1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
23 #include "MDSContext.h"
28 #include "events/EUpdate.h"
29 #include "events/EOpen.h"
31 #include "msg/Messenger.h"
32 #include "osdc/Objecter.h"
34 #include "messages/MInodeFileCaps.h"
35 #include "messages/MLock.h"
36 #include "messages/MClientLease.h"
37 #include "messages/MClientReply.h"
38 #include "messages/MClientCaps.h"
39 #include "messages/MClientCapRelease.h"
41 #include "messages/MMDSSlaveRequest.h"
45 #include "common/config.h"
48 #define dout_subsys ceph_subsys_mds
50 #define dout_context g_ceph_context
51 #define dout_prefix _prefix(_dout, mds)
52 static ostream
& _prefix(std::ostream
*_dout
, MDSRank
*mds
) {
53 return *_dout
<< "mds." << mds
->get_nodeid() << ".locker ";
57 class LockerContext
: public MDSInternalContextBase
{
60 MDSRank
*get_mds() override
66 explicit LockerContext(Locker
*locker_
) : locker(locker_
) {
67 assert(locker
!= NULL
);
71 class LockerLogContext
: public MDSLogContextBase
{
74 MDSRank
*get_mds() override
80 explicit LockerLogContext(Locker
*locker_
) : locker(locker_
) {
81 assert(locker
!= NULL
);
85 /* This function DOES put the passed message before returning */
86 void Locker::dispatch(Message
*m
)
89 switch (m
->get_type()) {
93 handle_lock(static_cast<MLock
*>(m
));
96 case MSG_MDS_INODEFILECAPS
:
97 handle_inode_file_caps(static_cast<MInodeFileCaps
*>(m
));
101 case CEPH_MSG_CLIENT_CAPS
:
102 handle_client_caps(static_cast<MClientCaps
*>(m
));
105 case CEPH_MSG_CLIENT_CAPRELEASE
:
106 handle_client_cap_release(static_cast<MClientCapRelease
*>(m
));
108 case CEPH_MSG_CLIENT_LEASE
:
109 handle_client_lease(static_cast<MClientLease
*>(m
));
113 derr
<< "locker unknown message " << m
->get_type() << dendl
;
114 assert(0 == "locker unknown message");
131 void Locker::send_lock_message(SimpleLock
*lock
, int msg
)
133 for (const auto &it
: lock
->get_parent()->get_replicas()) {
134 if (mds
->is_cluster_degraded() &&
135 mds
->mdsmap
->get_state(it
.first
) < MDSMap::STATE_REJOIN
)
137 MLock
*m
= new MLock(lock
, msg
, mds
->get_nodeid());
138 mds
->send_message_mds(m
, it
.first
);
142 void Locker::send_lock_message(SimpleLock
*lock
, int msg
, const bufferlist
&data
)
144 for (const auto &it
: lock
->get_parent()->get_replicas()) {
145 if (mds
->is_cluster_degraded() &&
146 mds
->mdsmap
->get_state(it
.first
) < MDSMap::STATE_REJOIN
)
148 MLock
*m
= new MLock(lock
, msg
, mds
->get_nodeid());
150 mds
->send_message_mds(m
, it
.first
);
157 void Locker::include_snap_rdlocks(set
<SimpleLock
*>& rdlocks
, CInode
*in
)
159 // rdlock ancestor snaps
161 rdlocks
.insert(&in
->snaplock
);
162 while (t
->get_projected_parent_dn()) {
163 t
= t
->get_projected_parent_dn()->get_dir()->get_inode();
164 rdlocks
.insert(&t
->snaplock
);
168 void Locker::include_snap_rdlocks_wlayout(set
<SimpleLock
*>& rdlocks
, CInode
*in
,
169 file_layout_t
**layout
)
171 //rdlock ancestor snaps
173 rdlocks
.insert(&in
->snaplock
);
174 rdlocks
.insert(&in
->policylock
);
175 bool found_layout
= false;
177 rdlocks
.insert(&t
->snaplock
);
179 rdlocks
.insert(&t
->policylock
);
180 if (t
->get_projected_inode()->has_layout()) {
181 *layout
= &t
->get_projected_inode()->layout
;
185 if (t
->get_projected_parent_dn() &&
186 t
->get_projected_parent_dn()->get_dir())
187 t
= t
->get_projected_parent_dn()->get_dir()->get_inode();
192 struct MarkEventOnDestruct
{
196 MarkEventOnDestruct(MDRequestRef
& _mdr
,
197 const char *_message
) : mdr(_mdr
),
200 ~MarkEventOnDestruct() {
202 mdr
->mark_event(message
);
206 /* If this function returns false, the mdr has been placed
207 * on the appropriate wait list */
208 bool Locker::acquire_locks(MDRequestRef
& mdr
,
209 set
<SimpleLock
*> &rdlocks
,
210 set
<SimpleLock
*> &wrlocks
,
211 set
<SimpleLock
*> &xlocks
,
212 map
<SimpleLock
*,mds_rank_t
> *remote_wrlocks
,
213 CInode
*auth_pin_freeze
,
214 bool auth_pin_nonblock
)
216 if (mdr
->done_locking
&&
217 !mdr
->is_slave()) { // not on slaves! master requests locks piecemeal.
218 dout(10) << "acquire_locks " << *mdr
<< " - done locking" << dendl
;
219 return true; // at least we had better be!
221 dout(10) << "acquire_locks " << *mdr
<< dendl
;
223 MarkEventOnDestruct
marker(mdr
, "failed to acquire_locks");
225 client_t client
= mdr
->get_client();
227 set
<SimpleLock
*, SimpleLock::ptr_lt
> sorted
; // sort everything we will lock
228 set
<MDSCacheObject
*> mustpin
; // items to authpin
231 for (set
<SimpleLock
*>::iterator p
= xlocks
.begin(); p
!= xlocks
.end(); ++p
) {
232 SimpleLock
*lock
= *p
;
234 if ((lock
->get_type() == CEPH_LOCK_ISNAP
||
235 lock
->get_type() == CEPH_LOCK_IPOLICY
) &&
236 mds
->is_cluster_degraded() &&
238 !mdr
->is_queued_for_replay()) {
239 // waiting for recovering mds, to guarantee replayed requests and mksnap/setlayout
240 // get processed in proper order.
242 if (lock
->get_parent()->is_auth()) {
243 if (!mdr
->locks
.count(lock
)) {
245 lock
->get_parent()->list_replicas(ls
);
247 if (mds
->mdsmap
->get_state(m
) < MDSMap::STATE_ACTIVE
) {
254 // if the lock is the latest locked one, it's possible that slave mds got the lock
255 // while there are recovering mds.
256 if (!mdr
->locks
.count(lock
) || lock
== *mdr
->locks
.rbegin())
260 dout(10) << " must xlock " << *lock
<< " " << *lock
->get_parent()
261 << ", waiting for cluster recovered" << dendl
;
262 mds
->locker
->drop_locks(mdr
.get(), NULL
);
263 mdr
->drop_local_auth_pins();
264 mds
->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache
, mdr
));
269 dout(20) << " must xlock " << *lock
<< " " << *lock
->get_parent() << dendl
;
272 mustpin
.insert(lock
->get_parent());
274 // augment xlock with a versionlock?
275 if ((*p
)->get_type() == CEPH_LOCK_DN
) {
276 CDentry
*dn
= (CDentry
*)lock
->get_parent();
280 if (xlocks
.count(&dn
->versionlock
))
281 continue; // we're xlocking the versionlock too; don't wrlock it!
283 if (mdr
->is_master()) {
284 // master. wrlock versionlock so we can pipeline dentry updates to journal.
285 wrlocks
.insert(&dn
->versionlock
);
287 // slave. exclusively lock the dentry version (i.e. block other journal updates).
288 // this makes rollback safe.
289 xlocks
.insert(&dn
->versionlock
);
290 sorted
.insert(&dn
->versionlock
);
293 if (lock
->get_type() > CEPH_LOCK_IVERSION
) {
294 // inode version lock?
295 CInode
*in
= (CInode
*)lock
->get_parent();
298 if (mdr
->is_master()) {
299 // master. wrlock versionlock so we can pipeline inode updates to journal.
300 wrlocks
.insert(&in
->versionlock
);
302 // slave. exclusively lock the inode version (i.e. block other journal updates).
303 // this makes rollback safe.
304 xlocks
.insert(&in
->versionlock
);
305 sorted
.insert(&in
->versionlock
);
311 for (set
<SimpleLock
*>::iterator p
= wrlocks
.begin(); p
!= wrlocks
.end(); ++p
) {
312 MDSCacheObject
*object
= (*p
)->get_parent();
313 dout(20) << " must wrlock " << **p
<< " " << *object
<< dendl
;
315 if (object
->is_auth())
316 mustpin
.insert(object
);
317 else if (!object
->is_auth() &&
318 !(*p
)->can_wrlock(client
) && // we might have to request a scatter
319 !mdr
->is_slave()) { // if we are slave (remote_wrlock), the master already authpinned
320 dout(15) << " will also auth_pin " << *object
321 << " in case we need to request a scatter" << dendl
;
322 mustpin
.insert(object
);
327 if (remote_wrlocks
) {
328 for (map
<SimpleLock
*,mds_rank_t
>::iterator p
= remote_wrlocks
->begin(); p
!= remote_wrlocks
->end(); ++p
) {
329 MDSCacheObject
*object
= p
->first
->get_parent();
330 dout(20) << " must remote_wrlock on mds." << p
->second
<< " "
331 << *p
->first
<< " " << *object
<< dendl
;
332 sorted
.insert(p
->first
);
333 mustpin
.insert(object
);
338 for (set
<SimpleLock
*>::iterator p
= rdlocks
.begin();
341 MDSCacheObject
*object
= (*p
)->get_parent();
342 dout(20) << " must rdlock " << **p
<< " " << *object
<< dendl
;
344 if (object
->is_auth())
345 mustpin
.insert(object
);
346 else if (!object
->is_auth() &&
347 !(*p
)->can_rdlock(client
)) { // we might have to request an rdlock
348 dout(15) << " will also auth_pin " << *object
349 << " in case we need to request a rdlock" << dendl
;
350 mustpin
.insert(object
);
356 map
<mds_rank_t
, set
<MDSCacheObject
*> > mustpin_remote
; // mds -> (object set)
358 // can i auth pin them all now?
359 marker
.message
= "failed to authpin local pins";
360 for (set
<MDSCacheObject
*>::iterator p
= mustpin
.begin();
363 MDSCacheObject
*object
= *p
;
365 dout(10) << " must authpin " << *object
<< dendl
;
367 if (mdr
->is_auth_pinned(object
)) {
368 if (object
!= (MDSCacheObject
*)auth_pin_freeze
)
370 if (mdr
->more()->is_remote_frozen_authpin
) {
371 if (mdr
->more()->rename_inode
== auth_pin_freeze
)
373 // unfreeze auth pin for the wrong inode
374 mustpin_remote
[mdr
->more()->rename_inode
->authority().first
].size();
378 if (!object
->is_auth()) {
379 if (!mdr
->locks
.empty())
380 drop_locks(mdr
.get());
381 if (object
->is_ambiguous_auth()) {
383 dout(10) << " ambiguous auth, waiting to authpin " << *object
<< dendl
;
384 object
->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
, new C_MDS_RetryRequest(mdcache
, mdr
));
385 mdr
->drop_local_auth_pins();
388 mustpin_remote
[object
->authority().first
].insert(object
);
391 if (!object
->can_auth_pin()) {
393 drop_locks(mdr
.get());
394 mdr
->drop_local_auth_pins();
395 if (auth_pin_nonblock
) {
396 dout(10) << " can't auth_pin (freezing?) " << *object
<< ", nonblocking" << dendl
;
400 dout(10) << " can't auth_pin (freezing?), waiting to authpin " << *object
<< dendl
;
401 object
->add_waiter(MDSCacheObject::WAIT_UNFREEZE
, new C_MDS_RetryRequest(mdcache
, mdr
));
403 if (!mdr
->remote_auth_pins
.empty())
404 notify_freeze_waiter(object
);
410 // ok, grab local auth pins
411 for (set
<MDSCacheObject
*>::iterator p
= mustpin
.begin();
414 MDSCacheObject
*object
= *p
;
415 if (mdr
->is_auth_pinned(object
)) {
416 dout(10) << " already auth_pinned " << *object
<< dendl
;
417 } else if (object
->is_auth()) {
418 dout(10) << " auth_pinning " << *object
<< dendl
;
419 mdr
->auth_pin(object
);
423 // request remote auth_pins
424 if (!mustpin_remote
.empty()) {
425 marker
.message
= "requesting remote authpins";
426 for (map
<MDSCacheObject
*,mds_rank_t
>::iterator p
= mdr
->remote_auth_pins
.begin();
427 p
!= mdr
->remote_auth_pins
.end();
429 if (mustpin
.count(p
->first
)) {
430 assert(p
->second
== p
->first
->authority().first
);
431 map
<mds_rank_t
, set
<MDSCacheObject
*> >::iterator q
= mustpin_remote
.find(p
->second
);
432 if (q
!= mustpin_remote
.end())
433 q
->second
.insert(p
->first
);
436 for (map
<mds_rank_t
, set
<MDSCacheObject
*> >::iterator p
= mustpin_remote
.begin();
437 p
!= mustpin_remote
.end();
439 dout(10) << "requesting remote auth_pins from mds." << p
->first
<< dendl
;
441 // wait for active auth
442 if (mds
->is_cluster_degraded() &&
443 !mds
->mdsmap
->is_clientreplay_or_active_or_stopping(p
->first
)) {
444 dout(10) << " mds." << p
->first
<< " is not active" << dendl
;
445 if (mdr
->more()->waiting_on_slave
.empty())
446 mds
->wait_for_active_peer(p
->first
, new C_MDS_RetryRequest(mdcache
, mdr
));
450 MMDSSlaveRequest
*req
= new MMDSSlaveRequest(mdr
->reqid
, mdr
->attempt
,
451 MMDSSlaveRequest::OP_AUTHPIN
);
452 for (set
<MDSCacheObject
*>::iterator q
= p
->second
.begin();
453 q
!= p
->second
.end();
455 dout(10) << " req remote auth_pin of " << **q
<< dendl
;
456 MDSCacheObjectInfo info
;
457 (*q
)->set_object_info(info
);
458 req
->get_authpins().push_back(info
);
459 if (*q
== auth_pin_freeze
)
460 (*q
)->set_object_info(req
->get_authpin_freeze());
463 if (auth_pin_nonblock
)
464 req
->mark_nonblock();
465 mds
->send_message_mds(req
, p
->first
);
467 // put in waiting list
468 assert(mdr
->more()->waiting_on_slave
.count(p
->first
) == 0);
469 mdr
->more()->waiting_on_slave
.insert(p
->first
);
474 // caps i'll need to issue
475 set
<CInode
*> issue_set
;
479 // make sure they match currently acquired locks.
480 set
<SimpleLock
*, SimpleLock::ptr_lt
>::iterator existing
= mdr
->locks
.begin();
481 for (set
<SimpleLock
*, SimpleLock::ptr_lt
>::iterator p
= sorted
.begin();
484 bool need_wrlock
= !!wrlocks
.count(*p
);
485 bool need_remote_wrlock
= !!(remote_wrlocks
&& remote_wrlocks
->count(*p
));
488 if (existing
!= mdr
->locks
.end() && *existing
== *p
) {
490 SimpleLock
*have
= *existing
;
492 if (xlocks
.count(have
) && mdr
->xlocks
.count(have
)) {
493 dout(10) << " already xlocked " << *have
<< " " << *have
->get_parent() << dendl
;
496 if (mdr
->remote_wrlocks
.count(have
)) {
497 if (!need_remote_wrlock
||
498 mdr
->remote_wrlocks
[have
] != (*remote_wrlocks
)[have
]) {
499 dout(10) << " unlocking remote_wrlock on wrong mds." << mdr
->remote_wrlocks
[have
]
500 << " " << *have
<< " " << *have
->get_parent() << dendl
;
501 remote_wrlock_finish(have
, mdr
->remote_wrlocks
[have
], mdr
.get());
504 if (need_wrlock
|| need_remote_wrlock
) {
505 if (need_wrlock
== !!mdr
->wrlocks
.count(have
) &&
506 need_remote_wrlock
== !!mdr
->remote_wrlocks
.count(have
)) {
508 dout(10) << " already wrlocked " << *have
<< " " << *have
->get_parent() << dendl
;
509 if (need_remote_wrlock
)
510 dout(10) << " already remote_wrlocked " << *have
<< " " << *have
->get_parent() << dendl
;
514 if (rdlocks
.count(have
) && mdr
->rdlocks
.count(have
)) {
515 dout(10) << " already rdlocked " << *have
<< " " << *have
->get_parent() << dendl
;
520 // hose any stray locks
521 if (existing
!= mdr
->locks
.end() && *existing
== *p
) {
522 assert(need_wrlock
|| need_remote_wrlock
);
523 SimpleLock
*lock
= *existing
;
524 if (mdr
->wrlocks
.count(lock
)) {
526 dout(10) << " unlocking extra " << *lock
<< " " << *lock
->get_parent() << dendl
;
527 else if (need_remote_wrlock
) // acquire remote_wrlock first
528 dout(10) << " unlocking out-of-order " << *lock
<< " " << *lock
->get_parent() << dendl
;
529 bool need_issue
= false;
530 wrlock_finish(lock
, mdr
.get(), &need_issue
);
532 issue_set
.insert(static_cast<CInode
*>(lock
->get_parent()));
536 while (existing
!= mdr
->locks
.end()) {
537 SimpleLock
*stray
= *existing
;
539 dout(10) << " unlocking out-of-order " << *stray
<< " " << *stray
->get_parent() << dendl
;
540 bool need_issue
= false;
541 if (mdr
->xlocks
.count(stray
)) {
542 xlock_finish(stray
, mdr
.get(), &need_issue
);
543 } else if (mdr
->rdlocks
.count(stray
)) {
544 rdlock_finish(stray
, mdr
.get(), &need_issue
);
546 // may have acquired both wrlock and remore wrlock
547 if (mdr
->wrlocks
.count(stray
))
548 wrlock_finish(stray
, mdr
.get(), &need_issue
);
549 if (mdr
->remote_wrlocks
.count(stray
))
550 remote_wrlock_finish(stray
, mdr
->remote_wrlocks
[stray
], mdr
.get());
553 issue_set
.insert(static_cast<CInode
*>(stray
->get_parent()));
557 if (mdr
->locking
&& *p
!= mdr
->locking
) {
558 cancel_locking(mdr
.get(), &issue_set
);
560 if (xlocks
.count(*p
)) {
561 marker
.message
= "failed to xlock, waiting";
562 if (!xlock_start(*p
, mdr
))
564 dout(10) << " got xlock on " << **p
<< " " << *(*p
)->get_parent() << dendl
;
565 } else if (need_wrlock
|| need_remote_wrlock
) {
566 if (need_remote_wrlock
&& !mdr
->remote_wrlocks
.count(*p
)) {
567 marker
.message
= "waiting for remote wrlocks";
568 remote_wrlock_start(*p
, (*remote_wrlocks
)[*p
], mdr
);
571 if (need_wrlock
&& !mdr
->wrlocks
.count(*p
)) {
572 marker
.message
= "failed to wrlock, waiting";
573 if (need_remote_wrlock
&& !(*p
)->can_wrlock(mdr
->get_client())) {
574 marker
.message
= "failed to wrlock, dropping remote wrlock and waiting";
575 // can't take the wrlock because the scatter lock is gathering. need to
576 // release the remote wrlock, so that the gathering process can finish.
577 remote_wrlock_finish(*p
, mdr
->remote_wrlocks
[*p
], mdr
.get());
578 remote_wrlock_start(*p
, (*remote_wrlocks
)[*p
], mdr
);
581 // nowait if we have already gotten remote wrlock
582 if (!wrlock_start(*p
, mdr
, need_remote_wrlock
))
584 dout(10) << " got wrlock on " << **p
<< " " << *(*p
)->get_parent() << dendl
;
587 assert(mdr
->is_master());
588 if ((*p
)->needs_recover()) {
589 if (mds
->is_cluster_degraded()) {
590 if (!mdr
->is_queued_for_replay()) {
591 // see comments in SimpleLock::set_state_rejoin() and
592 // ScatterLock::encode_state_for_rejoin()
593 drop_locks(mdr
.get());
594 mds
->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache
, mdr
));
595 dout(10) << " rejoin recovering " << **p
<< " " << *(*p
)->get_parent()
596 << ", waiting for cluster recovered" << dendl
;
597 marker
.message
= "rejoin recovering lock, waiting for cluster recovered";
601 (*p
)->clear_need_recover();
605 marker
.message
= "failed to rdlock, waiting";
606 if (!rdlock_start(*p
, mdr
))
608 dout(10) << " got rdlock on " << **p
<< " " << *(*p
)->get_parent() << dendl
;
612 // any extra unneeded locks?
613 while (existing
!= mdr
->locks
.end()) {
614 SimpleLock
*stray
= *existing
;
616 dout(10) << " unlocking extra " << *stray
<< " " << *stray
->get_parent() << dendl
;
617 bool need_issue
= false;
618 if (mdr
->xlocks
.count(stray
)) {
619 xlock_finish(stray
, mdr
.get(), &need_issue
);
620 } else if (mdr
->rdlocks
.count(stray
)) {
621 rdlock_finish(stray
, mdr
.get(), &need_issue
);
623 // may have acquired both wrlock and remore wrlock
624 if (mdr
->wrlocks
.count(stray
))
625 wrlock_finish(stray
, mdr
.get(), &need_issue
);
626 if (mdr
->remote_wrlocks
.count(stray
))
627 remote_wrlock_finish(stray
, mdr
->remote_wrlocks
[stray
], mdr
.get());
630 issue_set
.insert(static_cast<CInode
*>(stray
->get_parent()));
633 mdr
->done_locking
= true;
634 mdr
->set_mds_stamp(ceph_clock_now());
636 marker
.message
= "acquired locks";
639 issue_caps_set(issue_set
);
643 void Locker::notify_freeze_waiter(MDSCacheObject
*o
)
646 if (CInode
*in
= dynamic_cast<CInode
*>(o
)) {
648 dir
= in
->get_parent_dir();
649 } else if (CDentry
*dn
= dynamic_cast<CDentry
*>(o
)) {
652 dir
= dynamic_cast<CDir
*>(o
);
656 if (dir
->is_freezing_dir())
657 mdcache
->fragment_freeze_inc_num_waiters(dir
);
658 if (dir
->is_freezing_tree()) {
659 while (!dir
->is_freezing_tree_root())
660 dir
= dir
->get_parent_dir();
661 mdcache
->migrator
->export_freeze_inc_num_waiters(dir
);
666 void Locker::set_xlocks_done(MutationImpl
*mut
, bool skip_dentry
)
668 for (set
<SimpleLock
*>::iterator p
= mut
->xlocks
.begin();
669 p
!= mut
->xlocks
.end();
671 MDSCacheObject
*object
= (*p
)->get_parent();
672 assert(object
->is_auth());
674 ((*p
)->get_type() == CEPH_LOCK_DN
|| (*p
)->get_type() == CEPH_LOCK_DVERSION
))
676 dout(10) << "set_xlocks_done on " << **p
<< " " << *object
<< dendl
;
677 (*p
)->set_xlock_done();
681 void Locker::_drop_rdlocks(MutationImpl
*mut
, set
<CInode
*> *pneed_issue
)
683 while (!mut
->rdlocks
.empty()) {
685 MDSCacheObject
*p
= (*mut
->rdlocks
.begin())->get_parent();
686 rdlock_finish(*mut
->rdlocks
.begin(), mut
, &ni
);
688 pneed_issue
->insert(static_cast<CInode
*>(p
));
692 void Locker::_drop_non_rdlocks(MutationImpl
*mut
, set
<CInode
*> *pneed_issue
)
694 set
<mds_rank_t
> slaves
;
696 while (!mut
->xlocks
.empty()) {
697 SimpleLock
*lock
= *mut
->xlocks
.begin();
698 MDSCacheObject
*p
= lock
->get_parent();
700 assert(lock
->get_sm()->can_remote_xlock
);
701 slaves
.insert(p
->authority().first
);
703 mut
->locks
.erase(lock
);
704 mut
->xlocks
.erase(lock
);
708 xlock_finish(lock
, mut
, &ni
);
710 pneed_issue
->insert(static_cast<CInode
*>(p
));
713 while (!mut
->remote_wrlocks
.empty()) {
714 map
<SimpleLock
*,mds_rank_t
>::iterator p
= mut
->remote_wrlocks
.begin();
715 slaves
.insert(p
->second
);
716 if (mut
->wrlocks
.count(p
->first
) == 0)
717 mut
->locks
.erase(p
->first
);
718 mut
->remote_wrlocks
.erase(p
);
721 while (!mut
->wrlocks
.empty()) {
723 MDSCacheObject
*p
= (*mut
->wrlocks
.begin())->get_parent();
724 wrlock_finish(*mut
->wrlocks
.begin(), mut
, &ni
);
726 pneed_issue
->insert(static_cast<CInode
*>(p
));
729 for (set
<mds_rank_t
>::iterator p
= slaves
.begin(); p
!= slaves
.end(); ++p
) {
730 if (!mds
->is_cluster_degraded() ||
731 mds
->mdsmap
->get_state(*p
) >= MDSMap::STATE_REJOIN
) {
732 dout(10) << "_drop_non_rdlocks dropping remote locks on mds." << *p
<< dendl
;
733 MMDSSlaveRequest
*slavereq
= new MMDSSlaveRequest(mut
->reqid
, mut
->attempt
,
734 MMDSSlaveRequest::OP_DROPLOCKS
);
735 mds
->send_message_mds(slavereq
, *p
);
740 void Locker::cancel_locking(MutationImpl
*mut
, set
<CInode
*> *pneed_issue
)
742 SimpleLock
*lock
= mut
->locking
;
744 dout(10) << "cancel_locking " << *lock
<< " on " << *mut
<< dendl
;
746 if (lock
->get_parent()->is_auth()) {
747 bool need_issue
= false;
748 if (lock
->get_state() == LOCK_PREXLOCK
) {
749 _finish_xlock(lock
, -1, &need_issue
);
750 } else if (lock
->get_state() == LOCK_LOCK_XLOCK
&&
751 lock
->get_num_xlocks() == 0) {
752 lock
->set_state(LOCK_XLOCKDONE
);
753 eval_gather(lock
, true, &need_issue
);
756 pneed_issue
->insert(static_cast<CInode
*>(lock
->get_parent()));
758 mut
->finish_locking(lock
);
761 void Locker::drop_locks(MutationImpl
*mut
, set
<CInode
*> *pneed_issue
)
764 set
<CInode
*> my_need_issue
;
766 pneed_issue
= &my_need_issue
;
769 cancel_locking(mut
, pneed_issue
);
770 _drop_non_rdlocks(mut
, pneed_issue
);
771 _drop_rdlocks(mut
, pneed_issue
);
773 if (pneed_issue
== &my_need_issue
)
774 issue_caps_set(*pneed_issue
);
775 mut
->done_locking
= false;
778 void Locker::drop_non_rdlocks(MutationImpl
*mut
, set
<CInode
*> *pneed_issue
)
780 set
<CInode
*> my_need_issue
;
782 pneed_issue
= &my_need_issue
;
784 _drop_non_rdlocks(mut
, pneed_issue
);
786 if (pneed_issue
== &my_need_issue
)
787 issue_caps_set(*pneed_issue
);
790 void Locker::drop_rdlocks_for_early_reply(MutationImpl
*mut
)
792 set
<CInode
*> need_issue
;
794 for (auto p
= mut
->rdlocks
.begin(); p
!= mut
->rdlocks
.end(); ) {
795 SimpleLock
*lock
= *p
;
797 // make later mksnap/setlayout (at other mds) wait for this unsafe request
798 if (lock
->get_type() == CEPH_LOCK_ISNAP
||
799 lock
->get_type() == CEPH_LOCK_IPOLICY
)
802 rdlock_finish(lock
, mut
, &ni
);
804 need_issue
.insert(static_cast<CInode
*>(lock
->get_parent()));
807 issue_caps_set(need_issue
);
812 void Locker::eval_gather(SimpleLock
*lock
, bool first
, bool *pneed_issue
, list
<MDSInternalContextBase
*> *pfinishers
)
814 dout(10) << "eval_gather " << *lock
<< " on " << *lock
->get_parent() << dendl
;
815 assert(!lock
->is_stable());
817 int next
= lock
->get_next_state();
820 bool caps
= lock
->get_cap_shift();
821 if (lock
->get_type() != CEPH_LOCK_DN
)
822 in
= static_cast<CInode
*>(lock
->get_parent());
824 bool need_issue
= false;
826 int loner_issued
= 0, other_issued
= 0, xlocker_issued
= 0;
827 assert(!caps
|| in
!= NULL
);
828 if (caps
&& in
->is_head()) {
829 in
->get_caps_issued(&loner_issued
, &other_issued
, &xlocker_issued
,
830 lock
->get_cap_shift(), lock
->get_cap_mask());
831 dout(10) << " next state is " << lock
->get_state_name(next
)
832 << " issued/allows loner " << gcap_string(loner_issued
)
833 << "/" << gcap_string(lock
->gcaps_allowed(CAP_LONER
, next
))
834 << " xlocker " << gcap_string(xlocker_issued
)
835 << "/" << gcap_string(lock
->gcaps_allowed(CAP_XLOCKER
, next
))
836 << " other " << gcap_string(other_issued
)
837 << "/" << gcap_string(lock
->gcaps_allowed(CAP_ANY
, next
))
840 if (first
&& ((~lock
->gcaps_allowed(CAP_ANY
, next
) & other_issued
) ||
841 (~lock
->gcaps_allowed(CAP_LONER
, next
) & loner_issued
) ||
842 (~lock
->gcaps_allowed(CAP_XLOCKER
, next
) & xlocker_issued
)))
846 #define IS_TRUE_AND_LT_AUTH(x, auth) (x && ((auth && x <= AUTH) || (!auth && x < AUTH)))
847 bool auth
= lock
->get_parent()->is_auth();
848 if (!lock
->is_gathering() &&
849 (IS_TRUE_AND_LT_AUTH(lock
->get_sm()->states
[next
].can_rdlock
, auth
) || !lock
->is_rdlocked()) &&
850 (IS_TRUE_AND_LT_AUTH(lock
->get_sm()->states
[next
].can_wrlock
, auth
) || !lock
->is_wrlocked()) &&
851 (IS_TRUE_AND_LT_AUTH(lock
->get_sm()->states
[next
].can_xlock
, auth
) || !lock
->is_xlocked()) &&
852 (IS_TRUE_AND_LT_AUTH(lock
->get_sm()->states
[next
].can_lease
, auth
) || !lock
->is_leased()) &&
853 !(lock
->get_parent()->is_auth() && lock
->is_flushing()) && // i.e. wait for scatter_writebehind!
854 (!caps
|| ((~lock
->gcaps_allowed(CAP_ANY
, next
) & other_issued
) == 0 &&
855 (~lock
->gcaps_allowed(CAP_LONER
, next
) & loner_issued
) == 0 &&
856 (~lock
->gcaps_allowed(CAP_XLOCKER
, next
) & xlocker_issued
) == 0)) &&
857 lock
->get_state() != LOCK_SYNC_MIX2
&& // these states need an explicit trigger from the auth mds
858 lock
->get_state() != LOCK_MIX_SYNC2
860 dout(7) << "eval_gather finished gather on " << *lock
861 << " on " << *lock
->get_parent() << dendl
;
863 if (lock
->get_sm() == &sm_filelock
) {
865 if (in
->state_test(CInode::STATE_RECOVERING
)) {
866 dout(7) << "eval_gather finished gather, but still recovering" << dendl
;
868 } else if (in
->state_test(CInode::STATE_NEEDSRECOVER
)) {
869 dout(7) << "eval_gather finished gather, but need to recover" << dendl
;
870 mds
->mdcache
->queue_file_recover(in
);
871 mds
->mdcache
->do_file_recover();
876 if (!lock
->get_parent()->is_auth()) {
877 // replica: tell auth
878 mds_rank_t auth
= lock
->get_parent()->authority().first
;
880 if (lock
->get_parent()->is_rejoining() &&
881 mds
->mdsmap
->get_state(auth
) == MDSMap::STATE_REJOIN
) {
882 dout(7) << "eval_gather finished gather, but still rejoining "
883 << *lock
->get_parent() << dendl
;
887 if (!mds
->is_cluster_degraded() ||
888 mds
->mdsmap
->get_state(auth
) >= MDSMap::STATE_REJOIN
) {
889 switch (lock
->get_state()) {
891 mds
->send_message_mds(new MLock(lock
, LOCK_AC_LOCKACK
, mds
->get_nodeid()),
897 MLock
*reply
= new MLock(lock
, LOCK_AC_SYNCACK
, mds
->get_nodeid());
898 lock
->encode_locked_state(reply
->get_data());
899 mds
->send_message_mds(reply
, auth
);
900 next
= LOCK_MIX_SYNC2
;
901 (static_cast<ScatterLock
*>(lock
))->start_flush();
906 (static_cast<ScatterLock
*>(lock
))->finish_flush();
907 (static_cast<ScatterLock
*>(lock
))->clear_flushed();
910 // do nothing, we already acked
915 MLock
*reply
= new MLock(lock
, LOCK_AC_MIXACK
, mds
->get_nodeid());
916 mds
->send_message_mds(reply
, auth
);
917 next
= LOCK_SYNC_MIX2
;
924 lock
->encode_locked_state(data
);
925 mds
->send_message_mds(new MLock(lock
, LOCK_AC_LOCKACK
, mds
->get_nodeid(), data
), auth
);
926 (static_cast<ScatterLock
*>(lock
))->start_flush();
927 // we'll get an AC_LOCKFLUSHED to complete
938 // once the first (local) stage of mix->lock gather complete we can
939 // gather from replicas
940 if (lock
->get_state() == LOCK_MIX_LOCK
&&
941 lock
->get_parent()->is_replicated()) {
942 dout(10) << " finished (local) gather for mix->lock, now gathering from replicas" << dendl
;
943 send_lock_message(lock
, LOCK_AC_LOCK
);
945 lock
->set_state(LOCK_MIX_LOCK2
);
949 if (lock
->is_dirty() && !lock
->is_flushed()) {
950 scatter_writebehind(static_cast<ScatterLock
*>(lock
));
954 lock
->clear_flushed();
956 switch (lock
->get_state()) {
962 in
->start_scatter(static_cast<ScatterLock
*>(lock
));
963 if (lock
->get_parent()->is_replicated()) {
965 lock
->encode_locked_state(softdata
);
966 send_lock_message(lock
, LOCK_AC_MIX
, softdata
);
968 (static_cast<ScatterLock
*>(lock
))->clear_scatter_wanted();
973 if (next
!= LOCK_SYNC
)
982 if (lock
->get_parent()->is_replicated()) {
984 lock
->encode_locked_state(softdata
);
985 send_lock_message(lock
, LOCK_AC_SYNC
, softdata
);
992 lock
->set_state(next
);
994 if (lock
->get_parent()->is_auth() &&
996 lock
->get_parent()->auth_unpin(lock
);
998 // drop loner before doing waiters
1002 in
->get_wanted_loner() != in
->get_loner()) {
1003 dout(10) << " trying to drop loner" << dendl
;
1004 if (in
->try_drop_loner()) {
1005 dout(10) << " dropped loner" << dendl
;
1011 lock
->take_waiting(SimpleLock::WAIT_STABLE
|SimpleLock::WAIT_WR
|SimpleLock::WAIT_RD
|SimpleLock::WAIT_XLOCK
,
1014 lock
->finish_waiters(SimpleLock::WAIT_STABLE
|SimpleLock::WAIT_WR
|SimpleLock::WAIT_RD
|SimpleLock::WAIT_XLOCK
);
1016 if (caps
&& in
->is_head())
1019 if (lock
->get_parent()->is_auth() &&
1021 try_eval(lock
, &need_issue
);
1026 *pneed_issue
= true;
1027 else if (in
->is_head())
1033 bool Locker::eval(CInode
*in
, int mask
, bool caps_imported
)
1035 bool need_issue
= caps_imported
;
1036 list
<MDSInternalContextBase
*> finishers
;
1038 dout(10) << "eval " << mask
<< " " << *in
<< dendl
;
1041 if (in
->is_auth() && in
->is_head()) {
1042 client_t orig_loner
= in
->get_loner();
1043 if (in
->choose_ideal_loner()) {
1044 dout(10) << "eval set loner: client." << orig_loner
<< " -> client." << in
->get_loner() << dendl
;
1047 } else if (in
->get_wanted_loner() != in
->get_loner()) {
1048 dout(10) << "eval want loner: client." << in
->get_wanted_loner() << " but failed to set it" << dendl
;
1054 if (mask
& CEPH_LOCK_IFILE
)
1055 eval_any(&in
->filelock
, &need_issue
, &finishers
, caps_imported
);
1056 if (mask
& CEPH_LOCK_IAUTH
)
1057 eval_any(&in
->authlock
, &need_issue
, &finishers
, caps_imported
);
1058 if (mask
& CEPH_LOCK_ILINK
)
1059 eval_any(&in
->linklock
, &need_issue
, &finishers
, caps_imported
);
1060 if (mask
& CEPH_LOCK_IXATTR
)
1061 eval_any(&in
->xattrlock
, &need_issue
, &finishers
, caps_imported
);
1062 if (mask
& CEPH_LOCK_INEST
)
1063 eval_any(&in
->nestlock
, &need_issue
, &finishers
, caps_imported
);
1064 if (mask
& CEPH_LOCK_IFLOCK
)
1065 eval_any(&in
->flocklock
, &need_issue
, &finishers
, caps_imported
);
1066 if (mask
& CEPH_LOCK_IPOLICY
)
1067 eval_any(&in
->policylock
, &need_issue
, &finishers
, caps_imported
);
1070 if (in
->is_auth() && in
->is_head() && in
->get_wanted_loner() != in
->get_loner()) {
1071 if (in
->try_drop_loner()) {
1073 if (in
->get_wanted_loner() >= 0) {
1074 dout(10) << "eval end set loner to client." << in
->get_loner() << dendl
;
1075 bool ok
= in
->try_set_loner();
1083 finish_contexts(g_ceph_context
, finishers
);
1085 if (need_issue
&& in
->is_head())
1088 dout(10) << "eval done" << dendl
;
1092 class C_Locker_Eval
: public LockerContext
{
1096 C_Locker_Eval(Locker
*l
, MDSCacheObject
*pp
, int m
) : LockerContext(l
), p(pp
), mask(m
) {
1097 // We are used as an MDSCacheObject waiter, so should
1098 // only be invoked by someone already holding the big lock.
1099 assert(locker
->mds
->mds_lock
.is_locked_by_me());
1100 p
->get(MDSCacheObject::PIN_PTRWAITER
);
1102 void finish(int r
) override
{
1103 locker
->try_eval(p
, mask
);
1104 p
->put(MDSCacheObject::PIN_PTRWAITER
);
1108 void Locker::try_eval(MDSCacheObject
*p
, int mask
)
1110 // unstable and ambiguous auth?
1111 if (p
->is_ambiguous_auth()) {
1112 dout(7) << "try_eval ambiguous auth, waiting on " << *p
<< dendl
;
1113 p
->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
, new C_Locker_Eval(this, p
, mask
));
1117 if (p
->is_auth() && p
->is_frozen()) {
1118 dout(7) << "try_eval frozen, waiting on " << *p
<< dendl
;
1119 p
->add_waiter(MDSCacheObject::WAIT_UNFREEZE
, new C_Locker_Eval(this, p
, mask
));
1123 if (mask
& CEPH_LOCK_DN
) {
1124 assert(mask
== CEPH_LOCK_DN
);
1125 bool need_issue
= false; // ignore this, no caps on dentries
1126 CDentry
*dn
= static_cast<CDentry
*>(p
);
1127 eval_any(&dn
->lock
, &need_issue
);
1129 CInode
*in
= static_cast<CInode
*>(p
);
1134 void Locker::try_eval(SimpleLock
*lock
, bool *pneed_issue
)
1136 MDSCacheObject
*p
= lock
->get_parent();
1138 // unstable and ambiguous auth?
1139 if (p
->is_ambiguous_auth()) {
1140 dout(7) << "try_eval " << *lock
<< " ambiguousauth, waiting on " << *p
<< dendl
;
1141 p
->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
, new C_Locker_Eval(this, p
, lock
->get_type()));
1145 if (!p
->is_auth()) {
1146 dout(7) << "try_eval " << *lock
<< " not auth for " << *p
<< dendl
;
1150 if (p
->is_frozen()) {
1151 dout(7) << "try_eval " << *lock
<< " frozen, waiting on " << *p
<< dendl
;
1152 p
->add_waiter(MDSCacheObject::WAIT_UNFREEZE
, new C_Locker_Eval(this, p
, lock
->get_type()));
1157 * We could have a situation like:
1159 * - mds A authpins item on mds B
1160 * - mds B starts to freeze tree containing item
1161 * - mds A tries wrlock_start on A, sends REQSCATTER to B
1162 * - mds B lock is unstable, sets scatter_wanted
1163 * - mds B lock stabilizes, calls try_eval.
1165 * We can defer while freezing without causing a deadlock. Honor
1166 * scatter_wanted flag here. This will never get deferred by the
1167 * checks above due to the auth_pin held by the master.
1169 if (lock
->is_scatterlock()) {
1170 ScatterLock
*slock
= static_cast<ScatterLock
*>(lock
);
1171 if (slock
->get_scatter_wanted() &&
1172 slock
->get_state() != LOCK_MIX
) {
1173 scatter_mix(slock
, pneed_issue
);
1174 if (!lock
->is_stable())
1176 } else if (slock
->get_unscatter_wanted() &&
1177 slock
->get_state() != LOCK_LOCK
) {
1178 simple_lock(slock
, pneed_issue
);
1179 if (!lock
->is_stable()) {
1185 if (lock
->get_type() != CEPH_LOCK_DN
&& p
->is_freezing()) {
1186 dout(7) << "try_eval " << *lock
<< " freezing, waiting on " << *p
<< dendl
;
1187 p
->add_waiter(MDSCacheObject::WAIT_UNFREEZE
, new C_Locker_Eval(this, p
, lock
->get_type()));
1191 eval(lock
, pneed_issue
);
1194 void Locker::eval_cap_gather(CInode
*in
, set
<CInode
*> *issue_set
)
1196 bool need_issue
= false;
1197 list
<MDSInternalContextBase
*> finishers
;
1200 if (!in
->filelock
.is_stable())
1201 eval_gather(&in
->filelock
, false, &need_issue
, &finishers
);
1202 if (!in
->authlock
.is_stable())
1203 eval_gather(&in
->authlock
, false, &need_issue
, &finishers
);
1204 if (!in
->linklock
.is_stable())
1205 eval_gather(&in
->linklock
, false, &need_issue
, &finishers
);
1206 if (!in
->xattrlock
.is_stable())
1207 eval_gather(&in
->xattrlock
, false, &need_issue
, &finishers
);
1209 if (need_issue
&& in
->is_head()) {
1211 issue_set
->insert(in
);
1216 finish_contexts(g_ceph_context
, finishers
);
1219 void Locker::eval_scatter_gathers(CInode
*in
)
1221 bool need_issue
= false;
1222 list
<MDSInternalContextBase
*> finishers
;
1224 dout(10) << "eval_scatter_gathers " << *in
<< dendl
;
1227 if (!in
->filelock
.is_stable())
1228 eval_gather(&in
->filelock
, false, &need_issue
, &finishers
);
1229 if (!in
->nestlock
.is_stable())
1230 eval_gather(&in
->nestlock
, false, &need_issue
, &finishers
);
1231 if (!in
->dirfragtreelock
.is_stable())
1232 eval_gather(&in
->dirfragtreelock
, false, &need_issue
, &finishers
);
1234 if (need_issue
&& in
->is_head())
1237 finish_contexts(g_ceph_context
, finishers
);
1240 void Locker::eval(SimpleLock
*lock
, bool *need_issue
)
1242 switch (lock
->get_type()) {
1243 case CEPH_LOCK_IFILE
:
1244 return file_eval(static_cast<ScatterLock
*>(lock
), need_issue
);
1245 case CEPH_LOCK_IDFT
:
1246 case CEPH_LOCK_INEST
:
1247 return scatter_eval(static_cast<ScatterLock
*>(lock
), need_issue
);
1249 return simple_eval(lock
, need_issue
);
1254 // ------------------
1257 bool Locker::_rdlock_kick(SimpleLock
*lock
, bool as_anon
)
1260 if (lock
->is_stable()) {
1261 if (lock
->get_parent()->is_auth()) {
1262 if (lock
->get_sm() == &sm_scatterlock
) {
1263 // not until tempsync is fully implemented
1264 //if (lock->get_parent()->is_replicated())
1265 //scatter_tempsync((ScatterLock*)lock);
1268 } else if (lock
->get_sm() == &sm_filelock
) {
1269 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
1270 if (lock
->get_state() == LOCK_EXCL
&&
1271 in
->get_target_loner() >= 0 &&
1272 !in
->is_dir() && !as_anon
) // as_anon => caller wants SYNC, not XSYN
1280 // request rdlock state change from auth
1281 mds_rank_t auth
= lock
->get_parent()->authority().first
;
1282 if (!mds
->is_cluster_degraded() ||
1283 mds
->mdsmap
->is_clientreplay_or_active_or_stopping(auth
)) {
1284 dout(10) << "requesting rdlock from auth on "
1285 << *lock
<< " on " << *lock
->get_parent() << dendl
;
1286 mds
->send_message_mds(new MLock(lock
, LOCK_AC_REQRDLOCK
, mds
->get_nodeid()), auth
);
1291 if (lock
->get_type() == CEPH_LOCK_IFILE
) {
1292 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
1293 if (in
->state_test(CInode::STATE_RECOVERING
)) {
1294 mds
->mdcache
->recovery_queue
.prioritize(in
);
1301 bool Locker::rdlock_try(SimpleLock
*lock
, client_t client
, MDSInternalContextBase
*con
)
1303 dout(7) << "rdlock_try on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1305 // can read? grab ref.
1306 if (lock
->can_rdlock(client
))
1309 _rdlock_kick(lock
, false);
1311 if (lock
->can_rdlock(client
))
1316 dout(7) << "rdlock_try waiting on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1317 lock
->add_waiter(SimpleLock::WAIT_STABLE
|SimpleLock::WAIT_RD
, con
);
1322 bool Locker::rdlock_start(SimpleLock
*lock
, MDRequestRef
& mut
, bool as_anon
)
1324 dout(7) << "rdlock_start on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1326 // client may be allowed to rdlock the same item it has xlocked.
1327 // UNLESS someone passes in as_anon, or we're reading snapped version here.
1328 if (mut
->snapid
!= CEPH_NOSNAP
)
1330 client_t client
= as_anon
? -1 : mut
->get_client();
1333 if (lock
->get_type() != CEPH_LOCK_DN
)
1334 in
= static_cast<CInode
*>(lock
->get_parent());
1337 if (!lock->get_parent()->is_auth() &&
1338 lock->fw_rdlock_to_auth()) {
1339 mdcache->request_forward(mut, lock->get_parent()->authority().first);
1345 // can read? grab ref.
1346 if (lock
->can_rdlock(client
)) {
1348 mut
->rdlocks
.insert(lock
);
1349 mut
->locks
.insert(lock
);
1353 // hmm, wait a second.
1354 if (in
&& !in
->is_head() && in
->is_auth() &&
1355 lock
->get_state() == LOCK_SNAP_SYNC
) {
1356 // okay, we actually need to kick the head's lock to get ourselves synced up.
1357 CInode
*head
= mdcache
->get_inode(in
->ino());
1359 SimpleLock
*hlock
= head
->get_lock(CEPH_LOCK_IFILE
);
1360 if (hlock
->get_state() == LOCK_SYNC
)
1361 hlock
= head
->get_lock(lock
->get_type());
1363 if (hlock
->get_state() != LOCK_SYNC
) {
1364 dout(10) << "rdlock_start trying head inode " << *head
<< dendl
;
1365 if (!rdlock_start(hlock
, mut
, true)) // ** as_anon, no rdlock on EXCL **
1367 // oh, check our lock again then
1371 if (!_rdlock_kick(lock
, as_anon
))
1377 if (lock
->get_parent()->is_auth() && lock
->is_stable())
1378 wait_on
= SimpleLock::WAIT_RD
;
1380 wait_on
= SimpleLock::WAIT_STABLE
; // REQRDLOCK is ignored if lock is unstable, so we need to retry.
1381 dout(7) << "rdlock_start waiting on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1382 lock
->add_waiter(wait_on
, new C_MDS_RetryRequest(mdcache
, mut
));
1387 void Locker::nudge_log(SimpleLock
*lock
)
1389 dout(10) << "nudge_log " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1390 if (lock
->get_parent()->is_auth() && lock
->is_unstable_and_locked()) // as with xlockdone, or cap flush
1391 mds
->mdlog
->flush();
1394 void Locker::rdlock_finish(SimpleLock
*lock
, MutationImpl
*mut
, bool *pneed_issue
)
1399 mut
->rdlocks
.erase(lock
);
1400 mut
->locks
.erase(lock
);
1403 dout(7) << "rdlock_finish on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1406 if (!lock
->is_rdlocked()) {
1407 if (!lock
->is_stable())
1408 eval_gather(lock
, false, pneed_issue
);
1409 else if (lock
->get_parent()->is_auth())
1410 try_eval(lock
, pneed_issue
);
1415 bool Locker::can_rdlock_set(set
<SimpleLock
*>& locks
)
1417 dout(10) << "can_rdlock_set " << locks
<< dendl
;
1418 for (set
<SimpleLock
*>::iterator p
= locks
.begin(); p
!= locks
.end(); ++p
)
1419 if (!(*p
)->can_rdlock(-1)) {
1420 dout(10) << "can_rdlock_set can't rdlock " << *p
<< " on " << *(*p
)->get_parent() << dendl
;
1426 bool Locker::rdlock_try_set(set
<SimpleLock
*>& locks
)
1428 dout(10) << "rdlock_try_set " << locks
<< dendl
;
1429 for (set
<SimpleLock
*>::iterator p
= locks
.begin(); p
!= locks
.end(); ++p
)
1430 if (!rdlock_try(*p
, -1, NULL
)) {
1431 dout(10) << "rdlock_try_set can't rdlock " << *p
<< " on " << *(*p
)->get_parent() << dendl
;
1437 void Locker::rdlock_take_set(set
<SimpleLock
*>& locks
, MutationRef
& mut
)
1439 dout(10) << "rdlock_take_set " << locks
<< dendl
;
1440 for (set
<SimpleLock
*>::iterator p
= locks
.begin(); p
!= locks
.end(); ++p
) {
1442 mut
->rdlocks
.insert(*p
);
1443 mut
->locks
.insert(*p
);
1447 // ------------------
1450 void Locker::wrlock_force(SimpleLock
*lock
, MutationRef
& mut
)
1452 if (lock
->get_type() == CEPH_LOCK_IVERSION
||
1453 lock
->get_type() == CEPH_LOCK_DVERSION
)
1454 return local_wrlock_grab(static_cast<LocalLock
*>(lock
), mut
);
1456 dout(7) << "wrlock_force on " << *lock
1457 << " on " << *lock
->get_parent() << dendl
;
1458 lock
->get_wrlock(true);
1459 mut
->wrlocks
.insert(lock
);
1460 mut
->locks
.insert(lock
);
1463 bool Locker::wrlock_start(SimpleLock
*lock
, MDRequestRef
& mut
, bool nowait
)
1465 if (lock
->get_type() == CEPH_LOCK_IVERSION
||
1466 lock
->get_type() == CEPH_LOCK_DVERSION
)
1467 return local_wrlock_start(static_cast<LocalLock
*>(lock
), mut
);
1469 dout(10) << "wrlock_start " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1471 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
1472 client_t client
= mut
->get_client();
1473 bool want_scatter
= !nowait
&& lock
->get_parent()->is_auth() &&
1474 (in
->has_subtree_or_exporting_dirfrag() ||
1475 static_cast<ScatterLock
*>(lock
)->get_scatter_wanted());
1479 if (lock
->can_wrlock(client
) &&
1480 (!want_scatter
|| lock
->get_state() == LOCK_MIX
)) {
1482 mut
->wrlocks
.insert(lock
);
1483 mut
->locks
.insert(lock
);
1487 if (lock
->get_type() == CEPH_LOCK_IFILE
&&
1488 in
->state_test(CInode::STATE_RECOVERING
)) {
1489 mds
->mdcache
->recovery_queue
.prioritize(in
);
1492 if (!lock
->is_stable())
1495 if (in
->is_auth()) {
1496 // don't do nested lock state change if we have dirty scatterdata and
1497 // may scatter_writebehind or start_scatter, because nowait==true implies
1498 // that the caller already has a log entry open!
1499 if (nowait
&& lock
->is_dirty())
1503 scatter_mix(static_cast<ScatterLock
*>(lock
));
1507 if (nowait
&& !lock
->can_wrlock(client
))
1512 // auth should be auth_pinned (see acquire_locks wrlock weird mustpin case).
1513 mds_rank_t auth
= lock
->get_parent()->authority().first
;
1514 if (!mds
->is_cluster_degraded() ||
1515 mds
->mdsmap
->is_clientreplay_or_active_or_stopping(auth
)) {
1516 dout(10) << "requesting scatter from auth on "
1517 << *lock
<< " on " << *lock
->get_parent() << dendl
;
1518 mds
->send_message_mds(new MLock(lock
, LOCK_AC_REQSCATTER
, mds
->get_nodeid()), auth
);
1525 dout(7) << "wrlock_start waiting on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1526 lock
->add_waiter(SimpleLock::WAIT_STABLE
, new C_MDS_RetryRequest(mdcache
, mut
));
1533 void Locker::wrlock_finish(SimpleLock
*lock
, MutationImpl
*mut
, bool *pneed_issue
)
1535 if (lock
->get_type() == CEPH_LOCK_IVERSION
||
1536 lock
->get_type() == CEPH_LOCK_DVERSION
)
1537 return local_wrlock_finish(static_cast<LocalLock
*>(lock
), mut
);
1539 dout(7) << "wrlock_finish on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1542 mut
->wrlocks
.erase(lock
);
1543 if (mut
->remote_wrlocks
.count(lock
) == 0)
1544 mut
->locks
.erase(lock
);
1547 if (!lock
->is_wrlocked()) {
1548 if (!lock
->is_stable())
1549 eval_gather(lock
, false, pneed_issue
);
1550 else if (lock
->get_parent()->is_auth())
1551 try_eval(lock
, pneed_issue
);
1558 void Locker::remote_wrlock_start(SimpleLock
*lock
, mds_rank_t target
, MDRequestRef
& mut
)
1560 dout(7) << "remote_wrlock_start mds." << target
<< " on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1562 // wait for active target
1563 if (mds
->is_cluster_degraded() &&
1564 !mds
->mdsmap
->is_clientreplay_or_active_or_stopping(target
)) {
1565 dout(7) << " mds." << target
<< " is not active" << dendl
;
1566 if (mut
->more()->waiting_on_slave
.empty())
1567 mds
->wait_for_active_peer(target
, new C_MDS_RetryRequest(mdcache
, mut
));
1571 // send lock request
1572 mut
->start_locking(lock
, target
);
1573 mut
->more()->slaves
.insert(target
);
1574 MMDSSlaveRequest
*r
= new MMDSSlaveRequest(mut
->reqid
, mut
->attempt
,
1575 MMDSSlaveRequest::OP_WRLOCK
);
1576 r
->set_lock_type(lock
->get_type());
1577 lock
->get_parent()->set_object_info(r
->get_object_info());
1578 mds
->send_message_mds(r
, target
);
1580 assert(mut
->more()->waiting_on_slave
.count(target
) == 0);
1581 mut
->more()->waiting_on_slave
.insert(target
);
1584 void Locker::remote_wrlock_finish(SimpleLock
*lock
, mds_rank_t target
,
1588 mut
->remote_wrlocks
.erase(lock
);
1589 if (mut
->wrlocks
.count(lock
) == 0)
1590 mut
->locks
.erase(lock
);
1592 dout(7) << "remote_wrlock_finish releasing remote wrlock on mds." << target
1593 << " " << *lock
->get_parent() << dendl
;
1594 if (!mds
->is_cluster_degraded() ||
1595 mds
->mdsmap
->get_state(target
) >= MDSMap::STATE_REJOIN
) {
1596 MMDSSlaveRequest
*slavereq
= new MMDSSlaveRequest(mut
->reqid
, mut
->attempt
,
1597 MMDSSlaveRequest::OP_UNWRLOCK
);
1598 slavereq
->set_lock_type(lock
->get_type());
1599 lock
->get_parent()->set_object_info(slavereq
->get_object_info());
1600 mds
->send_message_mds(slavereq
, target
);
1605 // ------------------
1608 bool Locker::xlock_start(SimpleLock
*lock
, MDRequestRef
& mut
)
1610 if (lock
->get_type() == CEPH_LOCK_IVERSION
||
1611 lock
->get_type() == CEPH_LOCK_DVERSION
)
1612 return local_xlock_start(static_cast<LocalLock
*>(lock
), mut
);
1614 dout(7) << "xlock_start on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1615 client_t client
= mut
->get_client();
1618 if (lock
->get_parent()->is_auth()) {
1621 if (lock
->can_xlock(client
)) {
1622 lock
->set_state(LOCK_XLOCK
);
1623 lock
->get_xlock(mut
, client
);
1624 mut
->xlocks
.insert(lock
);
1625 mut
->locks
.insert(lock
);
1626 mut
->finish_locking(lock
);
1630 if (lock
->get_type() == CEPH_LOCK_IFILE
) {
1631 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
1632 if (in
->state_test(CInode::STATE_RECOVERING
)) {
1633 mds
->mdcache
->recovery_queue
.prioritize(in
);
1637 if (!lock
->is_stable() && (lock
->get_state() != LOCK_XLOCKDONE
||
1638 lock
->get_xlock_by_client() != client
||
1639 lock
->is_waiter_for(SimpleLock::WAIT_STABLE
)))
1642 if (lock
->get_state() == LOCK_LOCK
|| lock
->get_state() == LOCK_XLOCKDONE
) {
1643 mut
->start_locking(lock
);
1650 lock
->add_waiter(SimpleLock::WAIT_WR
|SimpleLock::WAIT_STABLE
, new C_MDS_RetryRequest(mdcache
, mut
));
1655 assert(lock
->get_sm()->can_remote_xlock
);
1656 assert(!mut
->slave_request
);
1658 // wait for single auth
1659 if (lock
->get_parent()->is_ambiguous_auth()) {
1660 lock
->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
,
1661 new C_MDS_RetryRequest(mdcache
, mut
));
1665 // wait for active auth
1666 mds_rank_t auth
= lock
->get_parent()->authority().first
;
1667 if (mds
->is_cluster_degraded() &&
1668 !mds
->mdsmap
->is_clientreplay_or_active_or_stopping(auth
)) {
1669 dout(7) << " mds." << auth
<< " is not active" << dendl
;
1670 if (mut
->more()->waiting_on_slave
.empty())
1671 mds
->wait_for_active_peer(auth
, new C_MDS_RetryRequest(mdcache
, mut
));
1675 // send lock request
1676 mut
->more()->slaves
.insert(auth
);
1677 mut
->start_locking(lock
, auth
);
1678 MMDSSlaveRequest
*r
= new MMDSSlaveRequest(mut
->reqid
, mut
->attempt
,
1679 MMDSSlaveRequest::OP_XLOCK
);
1680 r
->set_lock_type(lock
->get_type());
1681 lock
->get_parent()->set_object_info(r
->get_object_info());
1682 mds
->send_message_mds(r
, auth
);
1684 assert(mut
->more()->waiting_on_slave
.count(auth
) == 0);
1685 mut
->more()->waiting_on_slave
.insert(auth
);
1691 void Locker::_finish_xlock(SimpleLock
*lock
, client_t xlocker
, bool *pneed_issue
)
1693 assert(!lock
->is_stable());
1694 if (lock
->get_num_rdlocks() == 0 &&
1695 lock
->get_num_wrlocks() == 0 &&
1696 !lock
->is_leased() &&
1697 lock
->get_state() != LOCK_XLOCKSNAP
&&
1698 lock
->get_type() != CEPH_LOCK_DN
) {
1699 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
1700 client_t loner
= in
->get_target_loner();
1701 if (loner
>= 0 && (xlocker
< 0 || xlocker
== loner
)) {
1702 lock
->set_state(LOCK_EXCL
);
1703 lock
->get_parent()->auth_unpin(lock
);
1704 lock
->finish_waiters(SimpleLock::WAIT_STABLE
|SimpleLock::WAIT_WR
|SimpleLock::WAIT_RD
);
1705 if (lock
->get_cap_shift())
1706 *pneed_issue
= true;
1707 if (lock
->get_parent()->is_auth() &&
1709 try_eval(lock
, pneed_issue
);
1713 // the xlocker may have CEPH_CAP_GSHARED, need to revoke it if next state is LOCK_LOCK
1714 eval_gather(lock
, lock
->get_state() != LOCK_XLOCKSNAP
, pneed_issue
);
1717 void Locker::xlock_finish(SimpleLock
*lock
, MutationImpl
*mut
, bool *pneed_issue
)
1719 if (lock
->get_type() == CEPH_LOCK_IVERSION
||
1720 lock
->get_type() == CEPH_LOCK_DVERSION
)
1721 return local_xlock_finish(static_cast<LocalLock
*>(lock
), mut
);
1723 dout(10) << "xlock_finish on " << *lock
<< " " << *lock
->get_parent() << dendl
;
1725 client_t xlocker
= lock
->get_xlock_by_client();
1730 mut
->xlocks
.erase(lock
);
1731 mut
->locks
.erase(lock
);
1733 bool do_issue
= false;
1736 if (!lock
->get_parent()->is_auth()) {
1737 assert(lock
->get_sm()->can_remote_xlock
);
1740 dout(7) << "xlock_finish releasing remote xlock on " << *lock
->get_parent() << dendl
;
1741 mds_rank_t auth
= lock
->get_parent()->authority().first
;
1742 if (!mds
->is_cluster_degraded() ||
1743 mds
->mdsmap
->get_state(auth
) >= MDSMap::STATE_REJOIN
) {
1744 MMDSSlaveRequest
*slavereq
= new MMDSSlaveRequest(mut
->reqid
, mut
->attempt
,
1745 MMDSSlaveRequest::OP_UNXLOCK
);
1746 slavereq
->set_lock_type(lock
->get_type());
1747 lock
->get_parent()->set_object_info(slavereq
->get_object_info());
1748 mds
->send_message_mds(slavereq
, auth
);
1751 lock
->finish_waiters(SimpleLock::WAIT_STABLE
|
1752 SimpleLock::WAIT_WR
|
1753 SimpleLock::WAIT_RD
, 0);
1755 if (lock
->get_num_xlocks() == 0) {
1756 if (lock
->get_state() == LOCK_LOCK_XLOCK
)
1757 lock
->set_state(LOCK_XLOCKDONE
);
1758 _finish_xlock(lock
, xlocker
, &do_issue
);
1763 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
1764 if (in
->is_head()) {
1766 *pneed_issue
= true;
1773 void Locker::xlock_export(SimpleLock
*lock
, MutationImpl
*mut
)
1775 dout(10) << "xlock_export on " << *lock
<< " " << *lock
->get_parent() << dendl
;
1778 mut
->xlocks
.erase(lock
);
1779 mut
->locks
.erase(lock
);
1781 MDSCacheObject
*p
= lock
->get_parent();
1782 assert(p
->state_test(CInode::STATE_AMBIGUOUSAUTH
)); // we are exporting this (inode)
1784 if (!lock
->is_stable())
1785 lock
->get_parent()->auth_unpin(lock
);
1787 lock
->set_state(LOCK_LOCK
);
1790 void Locker::xlock_import(SimpleLock
*lock
)
1792 dout(10) << "xlock_import on " << *lock
<< " " << *lock
->get_parent() << dendl
;
1793 lock
->get_parent()->auth_pin(lock
);
1798 // file i/o -----------------------------------------
1800 version_t
Locker::issue_file_data_version(CInode
*in
)
1802 dout(7) << "issue_file_data_version on " << *in
<< dendl
;
1803 return in
->inode
.file_data_version
;
1806 class C_Locker_FileUpdate_finish
: public LockerLogContext
{
1814 C_Locker_FileUpdate_finish(Locker
*l
, CInode
*i
, MutationRef
& m
,
1815 bool sm
=false, bool ni
=false, client_t c
=-1,
1816 MClientCaps
*ac
= 0)
1817 : LockerLogContext(l
), in(i
), mut(m
), share_max(sm
), need_issue(ni
),
1818 client(c
), ack(ac
) {
1819 in
->get(CInode::PIN_PTRWAITER
);
1821 void finish(int r
) override
{
1822 locker
->file_update_finish(in
, mut
, share_max
, need_issue
, client
, ack
);
1823 in
->put(CInode::PIN_PTRWAITER
);
1827 void Locker::file_update_finish(CInode
*in
, MutationRef
& mut
, bool share_max
, bool issue_client_cap
,
1828 client_t client
, MClientCaps
*ack
)
1830 dout(10) << "file_update_finish on " << *in
<< dendl
;
1831 in
->pop_and_dirty_projected_inode(mut
->ls
);
1836 Session
*session
= mds
->get_session(client
);
1838 // "oldest flush tid" > 0 means client uses unique TID for each flush
1839 if (ack
->get_oldest_flush_tid() > 0)
1840 session
->add_completed_flush(ack
->get_client_tid());
1841 mds
->send_message_client_counted(ack
, session
);
1843 dout(10) << " no session for client." << client
<< " " << *ack
<< dendl
;
1848 set
<CInode
*> need_issue
;
1849 drop_locks(mut
.get(), &need_issue
);
1851 if (!in
->is_head() && !in
->client_snap_caps
.empty()) {
1852 dout(10) << " client_snap_caps " << in
->client_snap_caps
<< dendl
;
1853 // check for snap writeback completion
1854 bool gather
= false;
1855 compact_map
<int,set
<client_t
> >::iterator p
= in
->client_snap_caps
.begin();
1856 while (p
!= in
->client_snap_caps
.end()) {
1857 SimpleLock
*lock
= in
->get_lock(p
->first
);
1859 dout(10) << " completing client_snap_caps for " << ccap_string(p
->first
)
1860 << " lock " << *lock
<< " on " << *in
<< dendl
;
1863 p
->second
.erase(client
);
1864 if (p
->second
.empty()) {
1866 in
->client_snap_caps
.erase(p
++);
1871 if (in
->client_snap_caps
.empty())
1872 in
->item_open_file
.remove_myself();
1873 eval_cap_gather(in
, &need_issue
);
1876 if (issue_client_cap
&& need_issue
.count(in
) == 0) {
1877 Capability
*cap
= in
->get_client_cap(client
);
1878 if (cap
&& (cap
->wanted() & ~cap
->pending()))
1879 issue_caps(in
, cap
);
1882 if (share_max
&& in
->is_auth() &&
1883 (in
->filelock
.gcaps_allowed(CAP_LONER
) & (CEPH_CAP_GWR
|CEPH_CAP_GBUFFER
)))
1884 share_inode_max_size(in
);
1886 issue_caps_set(need_issue
);
1888 // auth unpin after issuing caps
1892 Capability
* Locker::issue_new_caps(CInode
*in
,
1898 dout(7) << "issue_new_caps for mode " << mode
<< " on " << *in
<< dendl
;
1901 // if replay, try to reconnect cap, and otherwise do nothing.
1903 mds
->mdcache
->try_reconnect_cap(in
, session
);
1908 assert(session
->info
.inst
.name
.is_client());
1909 client_t my_client
= session
->info
.inst
.name
.num();
1910 int my_want
= ceph_caps_for_mode(mode
);
1912 // register a capability
1913 Capability
*cap
= in
->get_client_cap(my_client
);
1916 cap
= in
->add_client_cap(my_client
, session
, realm
);
1917 cap
->set_wanted(my_want
);
1919 cap
->inc_suppress(); // suppress file cap messages for new cap (we'll bundle with the open() reply)
1923 // make sure it wants sufficient caps
1924 if (my_want
& ~cap
->wanted()) {
1925 // augment wanted caps for this client
1926 cap
->set_wanted(cap
->wanted() | my_want
);
1930 if (in
->is_auth()) {
1931 // [auth] twiddle mode?
1932 eval(in
, CEPH_CAP_LOCKS
);
1934 if (_need_flush_mdlog(in
, my_want
))
1935 mds
->mdlog
->flush();
1938 // [replica] tell auth about any new caps wanted
1939 request_inode_file_caps(in
);
1942 // issue caps (pot. incl new one)
1943 //issue_caps(in); // note: _eval above may have done this already...
1945 // re-issue whatever we can
1946 //cap->issue(cap->pending());
1949 cap
->dec_suppress();
1955 void Locker::issue_caps_set(set
<CInode
*>& inset
)
1957 for (set
<CInode
*>::iterator p
= inset
.begin(); p
!= inset
.end(); ++p
)
1961 bool Locker::issue_caps(CInode
*in
, Capability
*only_cap
)
1963 // allowed caps are determined by the lock mode.
1964 int all_allowed
= in
->get_caps_allowed_by_type(CAP_ANY
);
1965 int loner_allowed
= in
->get_caps_allowed_by_type(CAP_LONER
);
1966 int xlocker_allowed
= in
->get_caps_allowed_by_type(CAP_XLOCKER
);
1968 client_t loner
= in
->get_loner();
1970 dout(7) << "issue_caps loner client." << loner
1971 << " allowed=" << ccap_string(loner_allowed
)
1972 << ", xlocker allowed=" << ccap_string(xlocker_allowed
)
1973 << ", others allowed=" << ccap_string(all_allowed
)
1974 << " on " << *in
<< dendl
;
1976 dout(7) << "issue_caps allowed=" << ccap_string(all_allowed
)
1977 << ", xlocker allowed=" << ccap_string(xlocker_allowed
)
1978 << " on " << *in
<< dendl
;
1981 assert(in
->is_head());
1983 // count conflicts with
1987 map
<client_t
, Capability
*>::iterator it
;
1989 it
= in
->client_caps
.find(only_cap
->get_client());
1991 it
= in
->client_caps
.begin();
1992 for (; it
!= in
->client_caps
.end(); ++it
) {
1993 Capability
*cap
= it
->second
;
1994 if (cap
->is_stale())
1997 // do not issue _new_ bits when size|mtime is projected
1999 if (loner
== it
->first
)
2000 allowed
= loner_allowed
;
2002 allowed
= all_allowed
;
2004 // add in any xlocker-only caps (for locks this client is the xlocker for)
2005 allowed
|= xlocker_allowed
& in
->get_xlocker_mask(it
->first
);
2007 Session
*session
= mds
->get_session(it
->first
);
2008 if (in
->inode
.inline_data
.version
!= CEPH_INLINE_NONE
&&
2009 !(session
&& session
->connection
&&
2010 session
->connection
->has_feature(CEPH_FEATURE_MDS_INLINE_DATA
)))
2011 allowed
&= ~(CEPH_CAP_FILE_RD
| CEPH_CAP_FILE_WR
);
2013 int pending
= cap
->pending();
2014 int wanted
= cap
->wanted();
2016 dout(20) << " client." << it
->first
2017 << " pending " << ccap_string(pending
)
2018 << " allowed " << ccap_string(allowed
)
2019 << " wanted " << ccap_string(wanted
)
2022 if (!(pending
& ~allowed
)) {
2023 // skip if suppress or new, and not revocation
2024 if (cap
->is_new() || cap
->is_suppress()) {
2025 dout(20) << " !revoke and new|suppressed, skipping client." << it
->first
<< dendl
;
2030 // notify clients about deleted inode, to make sure they release caps ASAP.
2031 if (in
->inode
.nlink
== 0)
2032 wanted
|= CEPH_CAP_LINK_SHARED
;
2034 // are there caps that the client _wants_ and can have, but aren't pending?
2035 // or do we need to revoke?
2036 if (((wanted
& allowed
) & ~pending
) || // missing wanted+allowed caps
2037 (pending
& ~allowed
)) { // need to revoke ~allowed caps.
2041 // include caps that clients generally like, while we're at it.
2042 int likes
= in
->get_caps_liked();
2043 int before
= pending
;
2045 if (pending
& ~allowed
)
2046 seq
= cap
->issue((wanted
|likes
) & allowed
& pending
); // if revoking, don't issue anything new.
2048 seq
= cap
->issue((wanted
|likes
) & allowed
);
2049 int after
= cap
->pending();
2051 if (cap
->is_new()) {
2052 // haven't send caps to client yet
2053 if (before
& ~after
)
2054 cap
->confirm_receipt(seq
, after
);
2056 dout(7) << " sending MClientCaps to client." << it
->first
2057 << " seq " << cap
->get_last_seq()
2058 << " new pending " << ccap_string(after
) << " was " << ccap_string(before
)
2061 int op
= (before
& ~after
) ? CEPH_CAP_OP_REVOKE
: CEPH_CAP_OP_GRANT
;
2062 if (op
== CEPH_CAP_OP_REVOKE
) {
2063 revoking_caps
.push_back(&cap
->item_revoking_caps
);
2064 revoking_caps_by_client
[cap
->get_client()].push_back(&cap
->item_client_revoking_caps
);
2065 cap
->set_last_revoke_stamp(ceph_clock_now());
2066 cap
->reset_num_revoke_warnings();
2069 MClientCaps
*m
= new MClientCaps(op
, in
->ino(),
2070 in
->find_snaprealm()->inode
->ino(),
2071 cap
->get_cap_id(), cap
->get_last_seq(),
2074 mds
->get_osd_epoch_barrier());
2075 in
->encode_cap_message(m
, cap
);
2077 mds
->send_message_client_counted(m
, it
->first
);
2085 return (nissued
== 0); // true if no re-issued, no callbacks
2088 void Locker::issue_truncate(CInode
*in
)
2090 dout(7) << "issue_truncate on " << *in
<< dendl
;
2092 for (map
<client_t
, Capability
*>::iterator it
= in
->client_caps
.begin();
2093 it
!= in
->client_caps
.end();
2095 Capability
*cap
= it
->second
;
2096 MClientCaps
*m
= new MClientCaps(CEPH_CAP_OP_TRUNC
,
2098 in
->find_snaprealm()->inode
->ino(),
2099 cap
->get_cap_id(), cap
->get_last_seq(),
2100 cap
->pending(), cap
->wanted(), 0,
2102 mds
->get_osd_epoch_barrier());
2103 in
->encode_cap_message(m
, cap
);
2104 mds
->send_message_client_counted(m
, it
->first
);
2107 // should we increase max_size?
2108 if (in
->is_auth() && in
->is_file())
2109 check_inode_max_size(in
);
2113 void Locker::revoke_stale_caps(Capability
*cap
)
2115 CInode
*in
= cap
->get_inode();
2116 if (in
->state_test(CInode::STATE_EXPORTINGCAPS
)) {
2117 // if export succeeds, the cap will be removed. if export fails, we need to
2118 // revoke the cap if it's still stale.
2119 in
->state_set(CInode::STATE_EVALSTALECAPS
);
2123 int issued
= cap
->issued();
2124 if (issued
& ~CEPH_CAP_PIN
) {
2125 dout(10) << " revoking " << ccap_string(issued
) << " on " << *in
<< dendl
;
2128 if (in
->is_auth() &&
2129 in
->inode
.client_ranges
.count(cap
->get_client()))
2130 in
->state_set(CInode::STATE_NEEDSRECOVER
);
2132 if (!in
->filelock
.is_stable()) eval_gather(&in
->filelock
);
2133 if (!in
->linklock
.is_stable()) eval_gather(&in
->linklock
);
2134 if (!in
->authlock
.is_stable()) eval_gather(&in
->authlock
);
2135 if (!in
->xattrlock
.is_stable()) eval_gather(&in
->xattrlock
);
2137 if (in
->is_auth()) {
2138 try_eval(in
, CEPH_CAP_LOCKS
);
2140 request_inode_file_caps(in
);
2145 void Locker::revoke_stale_caps(Session
*session
)
2147 dout(10) << "revoke_stale_caps for " << session
->info
.inst
.name
<< dendl
;
2149 for (xlist
<Capability
*>::iterator p
= session
->caps
.begin(); !p
.end(); ++p
) {
2150 Capability
*cap
= *p
;
2152 revoke_stale_caps(cap
);
2156 void Locker::resume_stale_caps(Session
*session
)
2158 dout(10) << "resume_stale_caps for " << session
->info
.inst
.name
<< dendl
;
2160 for (xlist
<Capability
*>::iterator p
= session
->caps
.begin(); !p
.end(); ++p
) {
2161 Capability
*cap
= *p
;
2162 CInode
*in
= cap
->get_inode();
2163 assert(in
->is_head());
2164 if (cap
->is_stale()) {
2165 dout(10) << " clearing stale flag on " << *in
<< dendl
;
2168 if (in
->state_test(CInode::STATE_EXPORTINGCAPS
)) {
2169 // if export succeeds, the cap will be removed. if export fails,
2170 // we need to re-issue the cap if it's not stale.
2171 in
->state_set(CInode::STATE_EVALSTALECAPS
);
2175 if (!in
->is_auth() || !eval(in
, CEPH_CAP_LOCKS
))
2176 issue_caps(in
, cap
);
2181 void Locker::remove_stale_leases(Session
*session
)
2183 dout(10) << "remove_stale_leases for " << session
->info
.inst
.name
<< dendl
;
2184 xlist
<ClientLease
*>::iterator p
= session
->leases
.begin();
2186 ClientLease
*l
= *p
;
2188 CDentry
*parent
= static_cast<CDentry
*>(l
->parent
);
2189 dout(15) << " removing lease on " << *parent
<< dendl
;
2190 parent
->remove_client_lease(l
, this);
2195 class C_MDL_RequestInodeFileCaps
: public LockerContext
{
2198 C_MDL_RequestInodeFileCaps(Locker
*l
, CInode
*i
) : LockerContext(l
), in(i
) {
2199 in
->get(CInode::PIN_PTRWAITER
);
2201 void finish(int r
) override
{
2203 locker
->request_inode_file_caps(in
);
2204 in
->put(CInode::PIN_PTRWAITER
);
2208 void Locker::request_inode_file_caps(CInode
*in
)
2210 assert(!in
->is_auth());
2212 int wanted
= in
->get_caps_wanted() & ~CEPH_CAP_PIN
;
2213 if (wanted
!= in
->replica_caps_wanted
) {
2214 // wait for single auth
2215 if (in
->is_ambiguous_auth()) {
2216 in
->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
,
2217 new C_MDL_RequestInodeFileCaps(this, in
));
2221 mds_rank_t auth
= in
->authority().first
;
2222 if (mds
->is_cluster_degraded() &&
2223 mds
->mdsmap
->get_state(auth
) == MDSMap::STATE_REJOIN
) {
2224 mds
->wait_for_active_peer(auth
, new C_MDL_RequestInodeFileCaps(this, in
));
2228 dout(7) << "request_inode_file_caps " << ccap_string(wanted
)
2229 << " was " << ccap_string(in
->replica_caps_wanted
)
2230 << " on " << *in
<< " to mds." << auth
<< dendl
;
2232 in
->replica_caps_wanted
= wanted
;
2234 if (!mds
->is_cluster_degraded() ||
2235 mds
->mdsmap
->is_clientreplay_or_active_or_stopping(auth
))
2236 mds
->send_message_mds(new MInodeFileCaps(in
->ino(), in
->replica_caps_wanted
),
2241 /* This function DOES put the passed message before returning */
2242 void Locker::handle_inode_file_caps(MInodeFileCaps
*m
)
2244 // nobody should be talking to us during recovery.
2245 assert(mds
->is_clientreplay() || mds
->is_active() || mds
->is_stopping());
2248 CInode
*in
= mdcache
->get_inode(m
->get_ino());
2249 mds_rank_t from
= mds_rank_t(m
->get_source().num());
2252 assert(in
->is_auth());
2254 dout(7) << "handle_inode_file_caps replica mds." << from
<< " wants caps " << ccap_string(m
->get_caps()) << " on " << *in
<< dendl
;
2257 in
->mds_caps_wanted
[from
] = m
->get_caps();
2259 in
->mds_caps_wanted
.erase(from
);
2261 try_eval(in
, CEPH_CAP_LOCKS
);
2266 class C_MDL_CheckMaxSize
: public LockerContext
{
2268 uint64_t new_max_size
;
2273 C_MDL_CheckMaxSize(Locker
*l
, CInode
*i
, uint64_t _new_max_size
,
2274 uint64_t _newsize
, utime_t _mtime
) :
2275 LockerContext(l
), in(i
),
2276 new_max_size(_new_max_size
), newsize(_newsize
), mtime(_mtime
)
2278 in
->get(CInode::PIN_PTRWAITER
);
2280 void finish(int r
) override
{
2282 locker
->check_inode_max_size(in
, false, new_max_size
, newsize
, mtime
);
2283 in
->put(CInode::PIN_PTRWAITER
);
2287 uint64_t Locker::calc_new_max_size(inode_t
*pi
, uint64_t size
)
2289 uint64_t new_max
= (size
+ 1) << 1;
2290 uint64_t max_inc
= g_conf
->mds_client_writeable_range_max_inc_objs
;
2292 max_inc
*= pi
->layout
.object_size
;
2293 new_max
= MIN(new_max
, size
+ max_inc
);
2295 return ROUND_UP_TO(new_max
, pi
->get_layout_size_increment());
2298 void Locker::calc_new_client_ranges(CInode
*in
, uint64_t size
,
2299 map
<client_t
,client_writeable_range_t
> *new_ranges
,
2300 bool *max_increased
)
2302 inode_t
*latest
= in
->get_projected_inode();
2304 if(latest
->has_layout()) {
2305 ms
= calc_new_max_size(latest
, size
);
2307 // Layout-less directories like ~mds0/, have zero size
2311 // increase ranges as appropriate.
2312 // shrink to 0 if no WR|BUFFER caps issued.
2313 for (map
<client_t
,Capability
*>::iterator p
= in
->client_caps
.begin();
2314 p
!= in
->client_caps
.end();
2316 if ((p
->second
->issued() | p
->second
->wanted()) & (CEPH_CAP_FILE_WR
|CEPH_CAP_FILE_BUFFER
)) {
2317 client_writeable_range_t
& nr
= (*new_ranges
)[p
->first
];
2319 if (latest
->client_ranges
.count(p
->first
)) {
2320 client_writeable_range_t
& oldr
= latest
->client_ranges
[p
->first
];
2321 if (ms
> oldr
.range
.last
)
2322 *max_increased
= true;
2323 nr
.range
.last
= MAX(ms
, oldr
.range
.last
);
2324 nr
.follows
= oldr
.follows
;
2326 *max_increased
= true;
2328 nr
.follows
= in
->first
- 1;
2334 bool Locker::check_inode_max_size(CInode
*in
, bool force_wrlock
,
2335 uint64_t new_max_size
, uint64_t new_size
,
2338 assert(in
->is_auth());
2339 assert(in
->is_file());
2341 inode_t
*latest
= in
->get_projected_inode();
2342 map
<client_t
, client_writeable_range_t
> new_ranges
;
2343 uint64_t size
= latest
->size
;
2344 bool update_size
= new_size
> 0;
2345 bool update_max
= false;
2346 bool max_increased
= false;
2349 new_size
= size
= MAX(size
, new_size
);
2350 new_mtime
= MAX(new_mtime
, latest
->mtime
);
2351 if (latest
->size
== new_size
&& latest
->mtime
== new_mtime
)
2352 update_size
= false;
2355 calc_new_client_ranges(in
, max(new_max_size
, size
), &new_ranges
, &max_increased
);
2357 if (max_increased
|| latest
->client_ranges
!= new_ranges
)
2360 if (!update_size
&& !update_max
) {
2361 dout(20) << "check_inode_max_size no-op on " << *in
<< dendl
;
2365 dout(10) << "check_inode_max_size new_ranges " << new_ranges
2366 << " update_size " << update_size
2367 << " on " << *in
<< dendl
;
2369 if (in
->is_frozen()) {
2370 dout(10) << "check_inode_max_size frozen, waiting on " << *in
<< dendl
;
2371 C_MDL_CheckMaxSize
*cms
= new C_MDL_CheckMaxSize(this, in
,
2375 in
->add_waiter(CInode::WAIT_UNFREEZE
, cms
);
2378 if (!force_wrlock
&& !in
->filelock
.can_wrlock(in
->get_loner())) {
2380 if (in
->filelock
.is_stable()) {
2381 if (in
->get_target_loner() >= 0)
2382 file_excl(&in
->filelock
);
2384 simple_lock(&in
->filelock
);
2386 if (!in
->filelock
.can_wrlock(in
->get_loner())) {
2388 C_MDL_CheckMaxSize
*cms
= new C_MDL_CheckMaxSize(this, in
,
2393 in
->filelock
.add_waiter(SimpleLock::WAIT_STABLE
, cms
);
2394 dout(10) << "check_inode_max_size can't wrlock, waiting on " << *in
<< dendl
;
2399 MutationRef
mut(new MutationImpl());
2400 mut
->ls
= mds
->mdlog
->get_current_segment();
2402 inode_t
*pi
= in
->project_inode();
2403 pi
->version
= in
->pre_dirty();
2406 dout(10) << "check_inode_max_size client_ranges " << pi
->client_ranges
<< " -> " << new_ranges
<< dendl
;
2407 pi
->client_ranges
= new_ranges
;
2411 dout(10) << "check_inode_max_size size " << pi
->size
<< " -> " << new_size
<< dendl
;
2412 pi
->size
= new_size
;
2413 pi
->rstat
.rbytes
= new_size
;
2414 dout(10) << "check_inode_max_size mtime " << pi
->mtime
<< " -> " << new_mtime
<< dendl
;
2415 pi
->mtime
= new_mtime
;
2418 // use EOpen if the file is still open; otherwise, use EUpdate.
2419 // this is just an optimization to push open files forward into
2420 // newer log segments.
2422 EMetaBlob
*metablob
;
2423 if (in
->is_any_caps_wanted() && in
->last
== CEPH_NOSNAP
) {
2424 EOpen
*eo
= new EOpen(mds
->mdlog
);
2425 eo
->add_ino(in
->ino());
2426 metablob
= &eo
->metablob
;
2428 mut
->ls
->open_files
.push_back(&in
->item_open_file
);
2430 EUpdate
*eu
= new EUpdate(mds
->mdlog
, "check_inode_max_size");
2431 metablob
= &eu
->metablob
;
2434 mds
->mdlog
->start_entry(le
);
2435 if (update_size
) { // FIXME if/when we do max_size nested accounting
2436 mdcache
->predirty_journal_parents(mut
, metablob
, in
, 0, PREDIRTY_PRIMARY
);
2438 CDentry
*parent
= in
->get_projected_parent_dn();
2439 metablob
->add_primary_dentry(parent
, in
, true);
2441 metablob
->add_dir_context(in
->get_projected_parent_dn()->get_dir());
2442 mdcache
->journal_dirty_inode(mut
.get(), metablob
, in
);
2444 mds
->mdlog
->submit_entry(le
,
2445 new C_Locker_FileUpdate_finish(this, in
, mut
, true));
2446 wrlock_force(&in
->filelock
, mut
); // wrlock for duration of journal
2449 // make max_size _increase_ timely
2451 mds
->mdlog
->flush();
2457 void Locker::share_inode_max_size(CInode
*in
, Capability
*only_cap
)
2460 * only share if currently issued a WR cap. if client doesn't have it,
2461 * file_max doesn't matter, and the client will get it if/when they get
2464 dout(10) << "share_inode_max_size on " << *in
<< dendl
;
2465 map
<client_t
, Capability
*>::iterator it
;
2467 it
= in
->client_caps
.find(only_cap
->get_client());
2469 it
= in
->client_caps
.begin();
2470 for (; it
!= in
->client_caps
.end(); ++it
) {
2471 const client_t client
= it
->first
;
2472 Capability
*cap
= it
->second
;
2473 if (cap
->is_suppress())
2475 if (cap
->pending() & (CEPH_CAP_FILE_WR
|CEPH_CAP_FILE_BUFFER
)) {
2476 dout(10) << "share_inode_max_size with client." << client
<< dendl
;
2477 cap
->inc_last_seq();
2478 MClientCaps
*m
= new MClientCaps(CEPH_CAP_OP_GRANT
,
2480 in
->find_snaprealm()->inode
->ino(),
2481 cap
->get_cap_id(), cap
->get_last_seq(),
2482 cap
->pending(), cap
->wanted(), 0,
2484 mds
->get_osd_epoch_barrier());
2485 in
->encode_cap_message(m
, cap
);
2486 mds
->send_message_client_counted(m
, client
);
2493 bool Locker::_need_flush_mdlog(CInode
*in
, int wanted
)
2495 /* flush log if caps are wanted by client but corresponding lock is unstable and locked by
2496 * pending mutations. */
2497 if (((wanted
& (CEPH_CAP_FILE_RD
|CEPH_CAP_FILE_WR
|CEPH_CAP_FILE_SHARED
|CEPH_CAP_FILE_EXCL
)) &&
2498 in
->filelock
.is_unstable_and_locked()) ||
2499 ((wanted
& (CEPH_CAP_AUTH_SHARED
|CEPH_CAP_AUTH_EXCL
)) &&
2500 in
->authlock
.is_unstable_and_locked()) ||
2501 ((wanted
& (CEPH_CAP_LINK_SHARED
|CEPH_CAP_LINK_EXCL
)) &&
2502 in
->linklock
.is_unstable_and_locked()) ||
2503 ((wanted
& (CEPH_CAP_XATTR_SHARED
|CEPH_CAP_XATTR_EXCL
)) &&
2504 in
->xattrlock
.is_unstable_and_locked()))
2509 void Locker::adjust_cap_wanted(Capability
*cap
, int wanted
, int issue_seq
)
2511 if (ceph_seq_cmp(issue_seq
, cap
->get_last_issue()) == 0) {
2512 dout(10) << " wanted " << ccap_string(cap
->wanted())
2513 << " -> " << ccap_string(wanted
) << dendl
;
2514 cap
->set_wanted(wanted
);
2515 } else if (wanted
& ~cap
->wanted()) {
2516 dout(10) << " wanted " << ccap_string(cap
->wanted())
2517 << " -> " << ccap_string(wanted
)
2518 << " (added caps even though we had seq mismatch!)" << dendl
;
2519 cap
->set_wanted(wanted
| cap
->wanted());
2521 dout(10) << " NOT changing wanted " << ccap_string(cap
->wanted())
2522 << " -> " << ccap_string(wanted
)
2523 << " (issue_seq " << issue_seq
<< " != last_issue "
2524 << cap
->get_last_issue() << ")" << dendl
;
2528 CInode
*cur
= cap
->get_inode();
2529 if (!cur
->is_auth()) {
2530 request_inode_file_caps(cur
);
2534 if (cap
->wanted() == 0) {
2535 if (cur
->item_open_file
.is_on_list() &&
2536 !cur
->is_any_caps_wanted()) {
2537 dout(10) << " removing unwanted file from open file list " << *cur
<< dendl
;
2538 cur
->item_open_file
.remove_myself();
2541 if (cur
->state_test(CInode::STATE_RECOVERING
) &&
2542 (cap
->wanted() & (CEPH_CAP_FILE_RD
|
2543 CEPH_CAP_FILE_WR
))) {
2544 mds
->mdcache
->recovery_queue
.prioritize(cur
);
2547 if (!cur
->item_open_file
.is_on_list()) {
2548 dout(10) << " adding to open file list " << *cur
<< dendl
;
2549 assert(cur
->last
== CEPH_NOSNAP
);
2550 LogSegment
*ls
= mds
->mdlog
->get_current_segment();
2551 EOpen
*le
= new EOpen(mds
->mdlog
);
2552 mds
->mdlog
->start_entry(le
);
2553 le
->add_clean_inode(cur
);
2554 ls
->open_files
.push_back(&cur
->item_open_file
);
2555 mds
->mdlog
->submit_entry(le
);
2562 void Locker::_do_null_snapflush(CInode
*head_in
, client_t client
, snapid_t last
)
2564 dout(10) << "_do_null_snapflush client." << client
<< " on " << *head_in
<< dendl
;
2565 for (auto p
= head_in
->client_need_snapflush
.begin();
2566 p
!= head_in
->client_need_snapflush
.end() && p
->first
< last
; ) {
2567 snapid_t snapid
= p
->first
;
2568 set
<client_t
>& clients
= p
->second
;
2569 ++p
; // be careful, q loop below depends on this
2571 if (clients
.count(client
)) {
2572 dout(10) << " doing async NULL snapflush on " << snapid
<< " from client." << client
<< dendl
;
2573 CInode
*sin
= mdcache
->pick_inode_snap(head_in
, snapid
- 1);
2575 assert(sin
->first
<= snapid
);
2576 _do_snap_update(sin
, snapid
, 0, sin
->first
- 1, client
, NULL
, NULL
);
2577 head_in
->remove_need_snapflush(sin
, snapid
, client
);
2583 bool Locker::should_defer_client_cap_frozen(CInode
*in
)
2586 * This policy needs to be AT LEAST as permissive as allowing a client request
2587 * to go forward, or else a client request can release something, the release
2588 * gets deferred, but the request gets processed and deadlocks because when the
2589 * caps can't get revoked.
2591 * Currently, a request wait if anything locked is freezing (can't
2592 * auth_pin), which would avoid any deadlock with cap release. Thus @in
2593 * _MUST_ be in the lock/auth_pin set.
2595 * auth_pins==0 implies no unstable lock and not auth pinnned by
2596 * client request, otherwise continue even it's freezing.
2598 return (in
->is_freezing() && in
->get_num_auth_pins() == 0) || in
->is_frozen();
2602 * This function DOES put the passed message before returning
2604 void Locker::handle_client_caps(MClientCaps
*m
)
2606 Session
*session
= static_cast<Session
*>(m
->get_connection()->get_priv());
2607 client_t client
= m
->get_source().num();
2609 snapid_t follows
= m
->get_snap_follows();
2610 dout(7) << "handle_client_caps "
2611 << ((m
->flags
& CLIENT_CAPS_SYNC
) ? "sync" : "async")
2612 << " on " << m
->get_ino()
2613 << " tid " << m
->get_client_tid() << " follows " << follows
2614 << " op " << ceph_cap_op_name(m
->get_op()) << dendl
;
2616 if (!mds
->is_clientreplay() && !mds
->is_active() && !mds
->is_stopping()) {
2618 dout(5) << " no session, dropping " << *m
<< dendl
;
2622 if (session
->is_closed() ||
2623 session
->is_closing() ||
2624 session
->is_killing()) {
2625 dout(7) << " session closed|closing|killing, dropping " << *m
<< dendl
;
2629 if (mds
->is_reconnect() &&
2630 m
->get_dirty() && m
->get_client_tid() > 0 &&
2631 !session
->have_completed_flush(m
->get_client_tid())) {
2632 mdcache
->set_reconnected_dirty_caps(client
, m
->get_ino(), m
->get_dirty());
2634 mds
->wait_for_replay(new C_MDS_RetryMessage(mds
, m
));
2638 if (m
->get_client_tid() > 0 && session
&&
2639 session
->have_completed_flush(m
->get_client_tid())) {
2640 dout(7) << "handle_client_caps already flushed tid " << m
->get_client_tid()
2641 << " for client." << client
<< dendl
;
2643 if (m
->get_op() == CEPH_CAP_OP_FLUSHSNAP
) {
2644 ack
= new MClientCaps(CEPH_CAP_OP_FLUSHSNAP_ACK
, m
->get_ino(), 0, 0, 0, 0, 0,
2645 m
->get_dirty(), 0, mds
->get_osd_epoch_barrier());
2647 ack
= new MClientCaps(CEPH_CAP_OP_FLUSH_ACK
, m
->get_ino(), 0, m
->get_cap_id(),
2648 m
->get_seq(), m
->get_caps(), 0, m
->get_dirty(), 0,
2649 mds
->get_osd_epoch_barrier());
2651 ack
->set_snap_follows(follows
);
2652 ack
->set_client_tid(m
->get_client_tid());
2653 mds
->send_message_client_counted(ack
, m
->get_connection());
2654 if (m
->get_op() == CEPH_CAP_OP_FLUSHSNAP
) {
2658 // fall-thru because the message may release some caps
2660 m
->set_op(CEPH_CAP_OP_UPDATE
);
2664 // "oldest flush tid" > 0 means client uses unique TID for each flush
2665 if (m
->get_oldest_flush_tid() > 0 && session
) {
2666 if (session
->trim_completed_flushes(m
->get_oldest_flush_tid())) {
2667 mds
->mdlog
->get_current_segment()->touched_sessions
.insert(session
->info
.inst
.name
);
2669 if (session
->get_num_trim_flushes_warnings() > 0 &&
2670 session
->get_num_completed_flushes() * 2 < g_conf
->mds_max_completed_flushes
)
2671 session
->reset_num_trim_flushes_warnings();
2673 if (session
->get_num_completed_flushes() >=
2674 (g_conf
->mds_max_completed_flushes
<< session
->get_num_trim_flushes_warnings())) {
2675 session
->inc_num_trim_flushes_warnings();
2677 ss
<< "client." << session
->get_client() << " does not advance its oldest_flush_tid ("
2678 << m
->get_oldest_flush_tid() << "), "
2679 << session
->get_num_completed_flushes()
2680 << " completed flushes recorded in session";
2681 mds
->clog
->warn() << ss
.str();
2682 dout(20) << __func__
<< " " << ss
.str() << dendl
;
2687 CInode
*head_in
= mdcache
->get_inode(m
->get_ino());
2689 if (mds
->is_clientreplay()) {
2690 dout(7) << "handle_client_caps on unknown ino " << m
->get_ino()
2691 << ", will try again after replayed client requests" << dendl
;
2692 mdcache
->wait_replay_cap_reconnect(m
->get_ino(), new C_MDS_RetryMessage(mds
, m
));
2695 dout(1) << "handle_client_caps on unknown ino " << m
->get_ino() << ", dropping" << dendl
;
2700 if (m
->osd_epoch_barrier
&& !mds
->objecter
->have_map(m
->osd_epoch_barrier
)) {
2701 // Pause RADOS operations until we see the required epoch
2702 mds
->objecter
->set_epoch_barrier(m
->osd_epoch_barrier
);
2705 if (mds
->get_osd_epoch_barrier() < m
->osd_epoch_barrier
) {
2706 // Record the barrier so that we will retransmit it to clients
2707 mds
->set_osd_epoch_barrier(m
->osd_epoch_barrier
);
2710 dout(10) << " head inode " << *head_in
<< dendl
;
2712 Capability
*cap
= 0;
2713 cap
= head_in
->get_client_cap(client
);
2715 dout(7) << "handle_client_caps no cap for client." << client
<< " on " << *head_in
<< dendl
;
2722 if (should_defer_client_cap_frozen(head_in
)) {
2723 dout(7) << "handle_client_caps freezing|frozen on " << *head_in
<< dendl
;
2724 head_in
->add_waiter(CInode::WAIT_UNFREEZE
, new C_MDS_RetryMessage(mds
, m
));
2727 if (ceph_seq_cmp(m
->get_mseq(), cap
->get_mseq()) < 0) {
2728 dout(7) << "handle_client_caps mseq " << m
->get_mseq() << " < " << cap
->get_mseq()
2729 << ", dropping" << dendl
;
2734 int op
= m
->get_op();
2737 if (op
== CEPH_CAP_OP_FLUSHSNAP
) {
2738 if (!head_in
->is_auth()) {
2739 dout(7) << " not auth, ignoring flushsnap on " << *head_in
<< dendl
;
2743 SnapRealm
*realm
= head_in
->find_snaprealm();
2744 snapid_t snap
= realm
->get_snap_following(follows
);
2745 dout(10) << " flushsnap follows " << follows
<< " -> snap " << snap
<< dendl
;
2747 CInode
*in
= head_in
;
2748 if (snap
!= CEPH_NOSNAP
) {
2749 in
= mdcache
->pick_inode_snap(head_in
, snap
- 1);
2751 dout(10) << " snapped inode " << *in
<< dendl
;
2754 // we can prepare the ack now, since this FLUSHEDSNAP is independent of any
2755 // other cap ops. (except possibly duplicate FLUSHSNAP requests, but worst
2756 // case we get a dup response, so whatever.)
2757 MClientCaps
*ack
= 0;
2758 if (m
->get_dirty()) {
2759 ack
= new MClientCaps(CEPH_CAP_OP_FLUSHSNAP_ACK
, in
->ino(), 0, 0, 0, 0, 0, m
->get_dirty(), 0, mds
->get_osd_epoch_barrier());
2760 ack
->set_snap_follows(follows
);
2761 ack
->set_client_tid(m
->get_client_tid());
2762 ack
->set_oldest_flush_tid(m
->get_oldest_flush_tid());
2765 if (in
== head_in
||
2766 (head_in
->client_need_snapflush
.count(snap
) &&
2767 head_in
->client_need_snapflush
[snap
].count(client
))) {
2768 dout(7) << " flushsnap snap " << snap
2769 << " client." << client
<< " on " << *in
<< dendl
;
2771 // this cap now follows a later snap (i.e. the one initiating this flush, or later)
2773 cap
->client_follows
= snap
< CEPH_NOSNAP
? snap
: realm
->get_newest_seq();
2774 else if (head_in
->client_need_snapflush
.begin()->first
< snap
)
2775 _do_null_snapflush(head_in
, client
, snap
);
2777 _do_snap_update(in
, snap
, m
->get_dirty(), follows
, client
, m
, ack
);
2780 head_in
->remove_need_snapflush(in
, snap
, client
);
2782 dout(7) << " not expecting flushsnap " << snap
<< " from client." << client
<< " on " << *in
<< dendl
;
2784 mds
->send_message_client_counted(ack
, m
->get_connection());
2789 if (cap
->get_cap_id() != m
->get_cap_id()) {
2790 dout(7) << " ignoring client capid " << m
->get_cap_id() << " != my " << cap
->get_cap_id() << dendl
;
2792 CInode
*in
= head_in
;
2794 in
= mdcache
->pick_inode_snap(head_in
, follows
);
2795 // intermediate snap inodes
2796 while (in
!= head_in
) {
2797 assert(in
->last
!= CEPH_NOSNAP
);
2798 if (in
->is_auth() && m
->get_dirty()) {
2799 dout(10) << " updating intermediate snapped inode " << *in
<< dendl
;
2800 _do_cap_update(in
, NULL
, m
->get_dirty(), follows
, m
);
2802 in
= mdcache
->pick_inode_snap(head_in
, in
->last
);
2806 // head inode, and cap
2807 MClientCaps
*ack
= 0;
2809 int caps
= m
->get_caps();
2810 if (caps
& ~cap
->issued()) {
2811 dout(10) << " confirming not issued caps " << ccap_string(caps
& ~cap
->issued()) << dendl
;
2812 caps
&= cap
->issued();
2815 cap
->confirm_receipt(m
->get_seq(), caps
);
2816 dout(10) << " follows " << follows
2817 << " retains " << ccap_string(m
->get_caps())
2818 << " dirty " << ccap_string(m
->get_dirty())
2819 << " on " << *in
<< dendl
;
2822 // missing/skipped snapflush?
2823 // The client MAY send a snapflush if it is issued WR/EXCL caps, but
2824 // presently only does so when it has actual dirty metadata. But, we
2825 // set up the need_snapflush stuff based on the issued caps.
2826 // We can infer that the client WONT send a FLUSHSNAP once they have
2827 // released all WR/EXCL caps (the FLUSHSNAP always comes before the cap
2829 if (!head_in
->client_need_snapflush
.empty()) {
2830 if ((cap
->issued() & CEPH_CAP_ANY_FILE_WR
) == 0) {
2831 _do_null_snapflush(head_in
, client
);
2833 dout(10) << " revocation in progress, not making any conclusions about null snapflushes" << dendl
;
2837 if (m
->get_dirty() && in
->is_auth()) {
2838 dout(7) << " flush client." << client
<< " dirty " << ccap_string(m
->get_dirty())
2839 << " seq " << m
->get_seq() << " on " << *in
<< dendl
;
2840 ack
= new MClientCaps(CEPH_CAP_OP_FLUSH_ACK
, in
->ino(), 0, cap
->get_cap_id(), m
->get_seq(),
2841 m
->get_caps(), 0, m
->get_dirty(), 0, mds
->get_osd_epoch_barrier());
2842 ack
->set_client_tid(m
->get_client_tid());
2843 ack
->set_oldest_flush_tid(m
->get_oldest_flush_tid());
2846 // filter wanted based on what we could ever give out (given auth/replica status)
2847 bool need_flush
= m
->flags
& CLIENT_CAPS_SYNC
;
2848 int new_wanted
= m
->get_wanted() & head_in
->get_caps_allowed_ever();
2849 if (new_wanted
!= cap
->wanted()) {
2850 if (!need_flush
&& (new_wanted
& ~cap
->pending())) {
2851 // exapnding caps. make sure we aren't waiting for a log flush
2852 need_flush
= _need_flush_mdlog(head_in
, new_wanted
& ~cap
->pending());
2855 adjust_cap_wanted(cap
, new_wanted
, m
->get_issue_seq());
2858 if (in
->is_auth() &&
2859 _do_cap_update(in
, cap
, m
->get_dirty(), follows
, m
, ack
, &need_flush
)) {
2861 eval(in
, CEPH_CAP_LOCKS
);
2863 if (!need_flush
&& (cap
->wanted() & ~cap
->pending()))
2864 need_flush
= _need_flush_mdlog(in
, cap
->wanted() & ~cap
->pending());
2866 // no update, ack now.
2868 mds
->send_message_client_counted(ack
, m
->get_connection());
2870 bool did_issue
= eval(in
, CEPH_CAP_LOCKS
);
2871 if (!did_issue
&& (cap
->wanted() & ~cap
->pending()))
2872 issue_caps(in
, cap
);
2874 if (cap
->get_last_seq() == 0 &&
2875 (cap
->pending() & (CEPH_CAP_FILE_WR
|CEPH_CAP_FILE_BUFFER
))) {
2876 cap
->issue_norevoke(cap
->issued());
2877 share_inode_max_size(in
, cap
);
2882 mds
->mdlog
->flush();
2890 class C_Locker_RetryRequestCapRelease
: public LockerContext
{
2892 ceph_mds_request_release item
;
2894 C_Locker_RetryRequestCapRelease(Locker
*l
, client_t c
, const ceph_mds_request_release
& it
) :
2895 LockerContext(l
), client(c
), item(it
) { }
2896 void finish(int r
) override
{
2898 MDRequestRef null_ref
;
2899 locker
->process_request_cap_release(null_ref
, client
, item
, dname
);
2903 void Locker::process_request_cap_release(MDRequestRef
& mdr
, client_t client
, const ceph_mds_request_release
& item
,
2904 const string
&dname
)
2906 inodeno_t ino
= (uint64_t)item
.ino
;
2907 uint64_t cap_id
= item
.cap_id
;
2908 int caps
= item
.caps
;
2909 int wanted
= item
.wanted
;
2911 int issue_seq
= item
.issue_seq
;
2912 int mseq
= item
.mseq
;
2914 CInode
*in
= mdcache
->get_inode(ino
);
2918 if (dname
.length()) {
2919 frag_t fg
= in
->pick_dirfrag(dname
);
2920 CDir
*dir
= in
->get_dirfrag(fg
);
2922 CDentry
*dn
= dir
->lookup(dname
);
2924 ClientLease
*l
= dn
->get_client_lease(client
);
2926 dout(10) << "process_cap_release removing lease on " << *dn
<< dendl
;
2927 dn
->remove_client_lease(l
, this);
2929 dout(7) << "process_cap_release client." << client
2930 << " doesn't have lease on " << *dn
<< dendl
;
2933 dout(7) << "process_cap_release client." << client
<< " released lease on dn "
2934 << dir
->dirfrag() << "/" << dname
<< " which dne" << dendl
;
2939 Capability
*cap
= in
->get_client_cap(client
);
2943 dout(10) << "process_cap_release client." << client
<< " " << ccap_string(caps
) << " on " << *in
2944 << (mdr
? "" : " (DEFERRED, no mdr)")
2947 if (ceph_seq_cmp(mseq
, cap
->get_mseq()) < 0) {
2948 dout(7) << " mseq " << mseq
<< " < " << cap
->get_mseq() << ", dropping" << dendl
;
2952 if (cap
->get_cap_id() != cap_id
) {
2953 dout(7) << " cap_id " << cap_id
<< " != " << cap
->get_cap_id() << ", dropping" << dendl
;
2957 if (should_defer_client_cap_frozen(in
)) {
2958 dout(7) << " frozen, deferring" << dendl
;
2959 in
->add_waiter(CInode::WAIT_UNFREEZE
, new C_Locker_RetryRequestCapRelease(this, client
, item
));
2963 if (caps
& ~cap
->issued()) {
2964 dout(10) << " confirming not issued caps " << ccap_string(caps
& ~cap
->issued()) << dendl
;
2965 caps
&= cap
->issued();
2967 cap
->confirm_receipt(seq
, caps
);
2969 if (!in
->client_need_snapflush
.empty() &&
2970 (cap
->issued() & CEPH_CAP_ANY_FILE_WR
) == 0) {
2971 _do_null_snapflush(in
, client
);
2974 adjust_cap_wanted(cap
, wanted
, issue_seq
);
2977 cap
->inc_suppress();
2978 eval(in
, CEPH_CAP_LOCKS
);
2980 cap
->dec_suppress();
2982 // take note; we may need to reissue on this cap later
2984 mdr
->cap_releases
[in
->vino()] = cap
->get_last_seq();
2987 class C_Locker_RetryKickIssueCaps
: public LockerContext
{
2992 C_Locker_RetryKickIssueCaps(Locker
*l
, CInode
*i
, client_t c
, ceph_seq_t s
) :
2993 LockerContext(l
), in(i
), client(c
), seq(s
) {
2994 in
->get(CInode::PIN_PTRWAITER
);
2996 void finish(int r
) override
{
2997 locker
->kick_issue_caps(in
, client
, seq
);
2998 in
->put(CInode::PIN_PTRWAITER
);
3002 void Locker::kick_issue_caps(CInode
*in
, client_t client
, ceph_seq_t seq
)
3004 Capability
*cap
= in
->get_client_cap(client
);
3005 if (!cap
|| cap
->get_last_sent() != seq
)
3007 if (in
->is_frozen()) {
3008 dout(10) << "kick_issue_caps waiting for unfreeze on " << *in
<< dendl
;
3009 in
->add_waiter(CInode::WAIT_UNFREEZE
,
3010 new C_Locker_RetryKickIssueCaps(this, in
, client
, seq
));
3013 dout(10) << "kick_issue_caps released at current seq " << seq
3014 << ", reissuing" << dendl
;
3015 issue_caps(in
, cap
);
3018 void Locker::kick_cap_releases(MDRequestRef
& mdr
)
3020 client_t client
= mdr
->get_client();
3021 for (map
<vinodeno_t
,ceph_seq_t
>::iterator p
= mdr
->cap_releases
.begin();
3022 p
!= mdr
->cap_releases
.end();
3024 CInode
*in
= mdcache
->get_inode(p
->first
);
3027 kick_issue_caps(in
, client
, p
->second
);
3032 * m and ack might be NULL, so don't dereference them unless dirty != 0
3034 void Locker::_do_snap_update(CInode
*in
, snapid_t snap
, int dirty
, snapid_t follows
, client_t client
, MClientCaps
*m
, MClientCaps
*ack
)
3036 dout(10) << "_do_snap_update dirty " << ccap_string(dirty
)
3037 << " follows " << follows
<< " snap " << snap
3038 << " on " << *in
<< dendl
;
3040 if (snap
== CEPH_NOSNAP
) {
3041 // hmm, i guess snap was already deleted? just ack!
3042 dout(10) << " wow, the snap following " << follows
3043 << " was already deleted. nothing to record, just ack." << dendl
;
3045 mds
->send_message_client_counted(ack
, m
->get_connection());
3049 EUpdate
*le
= new EUpdate(mds
->mdlog
, "snap flush");
3050 mds
->mdlog
->start_entry(le
);
3051 MutationRef mut
= new MutationImpl();
3052 mut
->ls
= mds
->mdlog
->get_current_segment();
3054 // normal metadata updates that we can apply to the head as well.
3057 bool xattrs
= false;
3058 map
<string
,bufferptr
> *px
= 0;
3059 if ((dirty
& CEPH_CAP_XATTR_EXCL
) &&
3060 m
->xattrbl
.length() &&
3061 m
->head
.xattr_version
> in
->get_projected_inode()->xattr_version
)
3064 old_inode_t
*oi
= 0;
3065 if (in
->is_multiversion()) {
3066 oi
= in
->pick_old_inode(snap
);
3071 dout(10) << " writing into old inode" << dendl
;
3072 pi
= in
->project_inode();
3073 pi
->version
= in
->pre_dirty();
3074 if (snap
> oi
->first
)
3075 in
->split_old_inode(snap
);
3081 px
= new map
<string
,bufferptr
>;
3082 pi
= in
->project_inode(px
);
3083 pi
->version
= in
->pre_dirty();
3086 _update_cap_fields(in
, dirty
, m
, pi
);
3090 dout(7) << " xattrs v" << pi
->xattr_version
<< " -> " << m
->head
.xattr_version
3091 << " len " << m
->xattrbl
.length() << dendl
;
3092 pi
->xattr_version
= m
->head
.xattr_version
;
3093 bufferlist::iterator p
= m
->xattrbl
.begin();
3097 if (pi
->client_ranges
.count(client
)) {
3098 if (in
->last
== snap
) {
3099 dout(10) << " removing client_range entirely" << dendl
;
3100 pi
->client_ranges
.erase(client
);
3102 dout(10) << " client_range now follows " << snap
<< dendl
;
3103 pi
->client_ranges
[client
].follows
= snap
;
3108 mdcache
->predirty_journal_parents(mut
, &le
->metablob
, in
, 0, PREDIRTY_PRIMARY
, 0, follows
);
3109 mdcache
->journal_dirty_inode(mut
.get(), &le
->metablob
, in
, follows
);
3111 // "oldest flush tid" > 0 means client uses unique TID for each flush
3112 if (ack
&& ack
->get_oldest_flush_tid() > 0)
3113 le
->metablob
.add_client_flush(metareqid_t(m
->get_source(), ack
->get_client_tid()),
3114 ack
->get_oldest_flush_tid());
3116 mds
->mdlog
->submit_entry(le
, new C_Locker_FileUpdate_finish(this, in
, mut
, false, false,
3120 void Locker::_update_cap_fields(CInode
*in
, int dirty
, MClientCaps
*m
, inode_t
*pi
)
3125 /* m must be valid if there are dirty caps */
3127 uint64_t features
= m
->get_connection()->get_features();
3129 if (m
->get_ctime() > pi
->ctime
) {
3130 dout(7) << " ctime " << pi
->ctime
<< " -> " << m
->get_ctime()
3131 << " for " << *in
<< dendl
;
3132 pi
->ctime
= m
->get_ctime();
3135 if ((features
& CEPH_FEATURE_FS_CHANGE_ATTR
) &&
3136 m
->get_change_attr() > pi
->change_attr
) {
3137 dout(7) << " change_attr " << pi
->change_attr
<< " -> " << m
->get_change_attr()
3138 << " for " << *in
<< dendl
;
3139 pi
->change_attr
= m
->get_change_attr();
3143 if (dirty
& (CEPH_CAP_FILE_EXCL
|CEPH_CAP_FILE_WR
)) {
3144 utime_t atime
= m
->get_atime();
3145 utime_t mtime
= m
->get_mtime();
3146 uint64_t size
= m
->get_size();
3147 version_t inline_version
= m
->inline_version
;
3149 if (((dirty
& CEPH_CAP_FILE_WR
) && mtime
> pi
->mtime
) ||
3150 ((dirty
& CEPH_CAP_FILE_EXCL
) && mtime
!= pi
->mtime
)) {
3151 dout(7) << " mtime " << pi
->mtime
<< " -> " << mtime
3152 << " for " << *in
<< dendl
;
3155 if (in
->inode
.is_file() && // ONLY if regular file
3157 dout(7) << " size " << pi
->size
<< " -> " << size
3158 << " for " << *in
<< dendl
;
3160 pi
->rstat
.rbytes
= size
;
3162 if (in
->inode
.is_file() &&
3163 (dirty
& CEPH_CAP_FILE_WR
) &&
3164 inline_version
> pi
->inline_data
.version
) {
3165 pi
->inline_data
.version
= inline_version
;
3166 if (inline_version
!= CEPH_INLINE_NONE
&& m
->inline_data
.length() > 0)
3167 pi
->inline_data
.get_data() = m
->inline_data
;
3169 pi
->inline_data
.free_data();
3171 if ((dirty
& CEPH_CAP_FILE_EXCL
) && atime
!= pi
->atime
) {
3172 dout(7) << " atime " << pi
->atime
<< " -> " << atime
3173 << " for " << *in
<< dendl
;
3176 if ((dirty
& CEPH_CAP_FILE_EXCL
) &&
3177 ceph_seq_cmp(pi
->time_warp_seq
, m
->get_time_warp_seq()) < 0) {
3178 dout(7) << " time_warp_seq " << pi
->time_warp_seq
<< " -> " << m
->get_time_warp_seq()
3179 << " for " << *in
<< dendl
;
3180 pi
->time_warp_seq
= m
->get_time_warp_seq();
3184 if (dirty
& CEPH_CAP_AUTH_EXCL
) {
3185 if (m
->head
.uid
!= pi
->uid
) {
3186 dout(7) << " uid " << pi
->uid
3187 << " -> " << m
->head
.uid
3188 << " for " << *in
<< dendl
;
3189 pi
->uid
= m
->head
.uid
;
3191 if (m
->head
.gid
!= pi
->gid
) {
3192 dout(7) << " gid " << pi
->gid
3193 << " -> " << m
->head
.gid
3194 << " for " << *in
<< dendl
;
3195 pi
->gid
= m
->head
.gid
;
3197 if (m
->head
.mode
!= pi
->mode
) {
3198 dout(7) << " mode " << oct
<< pi
->mode
3199 << " -> " << m
->head
.mode
<< dec
3200 << " for " << *in
<< dendl
;
3201 pi
->mode
= m
->head
.mode
;
3203 if ((features
& CEPH_FEATURE_FS_BTIME
) && m
->get_btime() != pi
->btime
) {
3204 dout(7) << " btime " << oct
<< pi
->btime
3205 << " -> " << m
->get_btime() << dec
3206 << " for " << *in
<< dendl
;
3207 pi
->btime
= m
->get_btime();
3213 * update inode based on cap flush|flushsnap|wanted.
3214 * adjust max_size, if needed.
3215 * if we update, return true; otherwise, false (no updated needed).
3217 bool Locker::_do_cap_update(CInode
*in
, Capability
*cap
,
3218 int dirty
, snapid_t follows
,
3219 MClientCaps
*m
, MClientCaps
*ack
,
3222 dout(10) << "_do_cap_update dirty " << ccap_string(dirty
)
3223 << " issued " << ccap_string(cap
? cap
->issued() : 0)
3224 << " wanted " << ccap_string(cap
? cap
->wanted() : 0)
3225 << " on " << *in
<< dendl
;
3226 assert(in
->is_auth());
3227 client_t client
= m
->get_source().num();
3228 inode_t
*latest
= in
->get_projected_inode();
3230 // increase or zero max_size?
3231 uint64_t size
= m
->get_size();
3232 bool change_max
= false;
3233 uint64_t old_max
= latest
->client_ranges
.count(client
) ? latest
->client_ranges
[client
].range
.last
: 0;
3234 uint64_t new_max
= old_max
;
3236 if (in
->is_file()) {
3237 bool forced_change_max
= false;
3238 dout(20) << "inode is file" << dendl
;
3239 if (cap
&& ((cap
->issued() | cap
->wanted()) & CEPH_CAP_ANY_FILE_WR
)) {
3240 dout(20) << "client has write caps; m->get_max_size="
3241 << m
->get_max_size() << "; old_max=" << old_max
<< dendl
;
3242 if (m
->get_max_size() > new_max
) {
3243 dout(10) << "client requests file_max " << m
->get_max_size()
3244 << " > max " << old_max
<< dendl
;
3246 forced_change_max
= true;
3247 new_max
= calc_new_max_size(latest
, m
->get_max_size());
3249 new_max
= calc_new_max_size(latest
, size
);
3251 if (new_max
> old_max
)
3263 if (in
->last
== CEPH_NOSNAP
&&
3265 !in
->filelock
.can_wrlock(client
) &&
3266 !in
->filelock
.can_force_wrlock(client
)) {
3267 dout(10) << " i want to change file_max, but lock won't allow it (yet)" << dendl
;
3268 if (in
->filelock
.is_stable()) {
3269 bool need_issue
= false;
3271 cap
->inc_suppress();
3272 if (in
->mds_caps_wanted
.empty() &&
3273 (in
->get_loner() >= 0 || (in
->get_wanted_loner() >= 0 && in
->try_set_loner()))) {
3274 if (in
->filelock
.get_state() != LOCK_EXCL
)
3275 file_excl(&in
->filelock
, &need_issue
);
3277 simple_lock(&in
->filelock
, &need_issue
);
3281 cap
->dec_suppress();
3283 if (!in
->filelock
.can_wrlock(client
) &&
3284 !in
->filelock
.can_force_wrlock(client
)) {
3285 C_MDL_CheckMaxSize
*cms
= new C_MDL_CheckMaxSize(this, in
,
3286 forced_change_max
? new_max
: 0,
3289 in
->filelock
.add_waiter(SimpleLock::WAIT_STABLE
, cms
);
3295 if (m
->flockbl
.length()) {
3297 bufferlist::iterator bli
= m
->flockbl
.begin();
3298 ::decode(num_locks
, bli
);
3299 for ( int i
=0; i
< num_locks
; ++i
) {
3300 ceph_filelock decoded_lock
;
3301 ::decode(decoded_lock
, bli
);
3302 in
->get_fcntl_lock_state()->held_locks
.
3303 insert(pair
<uint64_t, ceph_filelock
>(decoded_lock
.start
, decoded_lock
));
3304 ++in
->get_fcntl_lock_state()->client_held_lock_counts
[(client_t
)(decoded_lock
.client
)];
3306 ::decode(num_locks
, bli
);
3307 for ( int i
=0; i
< num_locks
; ++i
) {
3308 ceph_filelock decoded_lock
;
3309 ::decode(decoded_lock
, bli
);
3310 in
->get_flock_lock_state()->held_locks
.
3311 insert(pair
<uint64_t, ceph_filelock
>(decoded_lock
.start
, decoded_lock
));
3312 ++in
->get_flock_lock_state()->client_held_lock_counts
[(client_t
)(decoded_lock
.client
)];
3316 if (!dirty
&& !change_max
)
3319 Session
*session
= static_cast<Session
*>(m
->get_connection()->get_priv());
3320 if (session
->check_access(in
, MAY_WRITE
,
3321 m
->caller_uid
, m
->caller_gid
, NULL
, 0, 0) < 0) {
3323 dout(10) << "check_access failed, dropping cap update on " << *in
<< dendl
;
3329 EUpdate
*le
= new EUpdate(mds
->mdlog
, "cap update");
3330 mds
->mdlog
->start_entry(le
);
3333 map
<string
,bufferptr
> *px
= 0;
3334 if ((dirty
& CEPH_CAP_XATTR_EXCL
) &&
3335 m
->xattrbl
.length() &&
3336 m
->head
.xattr_version
> in
->get_projected_inode()->xattr_version
)
3337 px
= new map
<string
,bufferptr
>;
3339 inode_t
*pi
= in
->project_inode(px
);
3340 pi
->version
= in
->pre_dirty();
3342 MutationRef
mut(new MutationImpl());
3343 mut
->ls
= mds
->mdlog
->get_current_segment();
3345 _update_cap_fields(in
, dirty
, m
, pi
);
3348 dout(7) << " max_size " << old_max
<< " -> " << new_max
3349 << " for " << *in
<< dendl
;
3351 pi
->client_ranges
[client
].range
.first
= 0;
3352 pi
->client_ranges
[client
].range
.last
= new_max
;
3353 pi
->client_ranges
[client
].follows
= in
->first
- 1;
3355 pi
->client_ranges
.erase(client
);
3358 if (change_max
|| (dirty
& (CEPH_CAP_FILE_EXCL
|CEPH_CAP_FILE_WR
)))
3359 wrlock_force(&in
->filelock
, mut
); // wrlock for duration of journal
3362 if (dirty
& CEPH_CAP_AUTH_EXCL
)
3363 wrlock_force(&in
->authlock
, mut
);
3367 dout(7) << " xattrs v" << pi
->xattr_version
<< " -> " << m
->head
.xattr_version
<< dendl
;
3368 pi
->xattr_version
= m
->head
.xattr_version
;
3369 bufferlist::iterator p
= m
->xattrbl
.begin();
3372 wrlock_force(&in
->xattrlock
, mut
);
3376 mdcache
->predirty_journal_parents(mut
, &le
->metablob
, in
, 0, PREDIRTY_PRIMARY
, 0, follows
);
3377 mdcache
->journal_dirty_inode(mut
.get(), &le
->metablob
, in
, follows
);
3379 // "oldest flush tid" > 0 means client uses unique TID for each flush
3380 if (ack
&& ack
->get_oldest_flush_tid() > 0)
3381 le
->metablob
.add_client_flush(metareqid_t(m
->get_source(), ack
->get_client_tid()),
3382 ack
->get_oldest_flush_tid());
3384 mds
->mdlog
->submit_entry(le
, new C_Locker_FileUpdate_finish(this, in
, mut
,
3387 if (need_flush
&& !*need_flush
&&
3388 ((change_max
&& new_max
) || // max INCREASE
3389 _need_flush_mdlog(in
, dirty
)))
3395 /* This function DOES put the passed message before returning */
3396 void Locker::handle_client_cap_release(MClientCapRelease
*m
)
3398 client_t client
= m
->get_source().num();
3399 dout(10) << "handle_client_cap_release " << *m
<< dendl
;
3401 if (!mds
->is_clientreplay() && !mds
->is_active() && !mds
->is_stopping()) {
3402 mds
->wait_for_replay(new C_MDS_RetryMessage(mds
, m
));
3406 if (m
->osd_epoch_barrier
&& !mds
->objecter
->have_map(m
->osd_epoch_barrier
)) {
3407 // Pause RADOS operations until we see the required epoch
3408 mds
->objecter
->set_epoch_barrier(m
->osd_epoch_barrier
);
3411 if (mds
->get_osd_epoch_barrier() < m
->osd_epoch_barrier
) {
3412 // Record the barrier so that we will retransmit it to clients
3413 mds
->set_osd_epoch_barrier(m
->osd_epoch_barrier
);
3416 Session
*session
= static_cast<Session
*>(m
->get_connection()->get_priv());
3418 for (vector
<ceph_mds_cap_item
>::iterator p
= m
->caps
.begin(); p
!= m
->caps
.end(); ++p
) {
3419 _do_cap_release(client
, inodeno_t((uint64_t)p
->ino
) , p
->cap_id
, p
->migrate_seq
, p
->seq
);
3423 session
->notify_cap_release(m
->caps
.size());
3429 class C_Locker_RetryCapRelease
: public LockerContext
{
3433 ceph_seq_t migrate_seq
;
3434 ceph_seq_t issue_seq
;
3436 C_Locker_RetryCapRelease(Locker
*l
, client_t c
, inodeno_t i
, uint64_t id
,
3437 ceph_seq_t mseq
, ceph_seq_t seq
) :
3438 LockerContext(l
), client(c
), ino(i
), cap_id(id
), migrate_seq(mseq
), issue_seq(seq
) {}
3439 void finish(int r
) override
{
3440 locker
->_do_cap_release(client
, ino
, cap_id
, migrate_seq
, issue_seq
);
3444 void Locker::_do_cap_release(client_t client
, inodeno_t ino
, uint64_t cap_id
,
3445 ceph_seq_t mseq
, ceph_seq_t seq
)
3447 CInode
*in
= mdcache
->get_inode(ino
);
3449 dout(7) << "_do_cap_release missing ino " << ino
<< dendl
;
3452 Capability
*cap
= in
->get_client_cap(client
);
3454 dout(7) << "_do_cap_release no cap for client" << client
<< " on "<< *in
<< dendl
;
3458 dout(7) << "_do_cap_release for client." << client
<< " on "<< *in
<< dendl
;
3459 if (cap
->get_cap_id() != cap_id
) {
3460 dout(7) << " capid " << cap_id
<< " != " << cap
->get_cap_id() << ", ignore" << dendl
;
3463 if (ceph_seq_cmp(mseq
, cap
->get_mseq()) < 0) {
3464 dout(7) << " mseq " << mseq
<< " < " << cap
->get_mseq() << ", ignore" << dendl
;
3467 if (should_defer_client_cap_frozen(in
)) {
3468 dout(7) << " freezing|frozen, deferring" << dendl
;
3469 in
->add_waiter(CInode::WAIT_UNFREEZE
,
3470 new C_Locker_RetryCapRelease(this, client
, ino
, cap_id
, mseq
, seq
));
3473 if (seq
!= cap
->get_last_issue()) {
3474 dout(7) << " issue_seq " << seq
<< " != " << cap
->get_last_issue() << dendl
;
3475 // clean out any old revoke history
3476 cap
->clean_revoke_from(seq
);
3477 eval_cap_gather(in
);
3480 remove_client_cap(in
, client
);
3483 /* This function DOES put the passed message before returning */
3485 void Locker::remove_client_cap(CInode
*in
, client_t client
)
3487 // clean out any pending snapflush state
3488 if (!in
->client_need_snapflush
.empty())
3489 _do_null_snapflush(in
, client
);
3491 in
->remove_client_cap(client
);
3493 if (in
->is_auth()) {
3494 // make sure we clear out the client byte range
3495 if (in
->get_projected_inode()->client_ranges
.count(client
) &&
3496 !(in
->inode
.nlink
== 0 && !in
->is_any_caps())) // unless it's unlink + stray
3497 check_inode_max_size(in
);
3499 request_inode_file_caps(in
);
3502 try_eval(in
, CEPH_CAP_LOCKS
);
3507 * Return true if any currently revoking caps exceed the
3508 * mds_session_timeout threshold.
3510 bool Locker::any_late_revoking_caps(xlist
<Capability
*> const &revoking
) const
3512 xlist
<Capability
*>::const_iterator p
= revoking
.begin();
3514 // No revoking caps at the moment
3517 utime_t now
= ceph_clock_now();
3518 utime_t age
= now
- (*p
)->get_last_revoke_stamp();
3519 if (age
<= g_conf
->mds_session_timeout
) {
3528 void Locker::get_late_revoking_clients(std::list
<client_t
> *result
) const
3530 if (!any_late_revoking_caps(revoking_caps
)) {
3531 // Fast path: no misbehaving clients, execute in O(1)
3535 // Slow path: execute in O(N_clients)
3536 std::map
<client_t
, xlist
<Capability
*> >::const_iterator client_rc_iter
;
3537 for (client_rc_iter
= revoking_caps_by_client
.begin();
3538 client_rc_iter
!= revoking_caps_by_client
.end(); ++client_rc_iter
) {
3539 xlist
<Capability
*> const &client_rc
= client_rc_iter
->second
;
3540 bool any_late
= any_late_revoking_caps(client_rc
);
3542 result
->push_back(client_rc_iter
->first
);
3547 // Hard-code instead of surfacing a config setting because this is
3548 // really a hack that should go away at some point when we have better
3549 // inspection tools for getting at detailed cap state (#7316)
3550 #define MAX_WARN_CAPS 100
3552 void Locker::caps_tick()
3554 utime_t now
= ceph_clock_now();
3556 dout(20) << __func__
<< " " << revoking_caps
.size() << " revoking caps" << dendl
;
3559 for (xlist
<Capability
*>::iterator p
= revoking_caps
.begin(); !p
.end(); ++p
) {
3560 Capability
*cap
= *p
;
3562 utime_t age
= now
- cap
->get_last_revoke_stamp();
3563 dout(20) << __func__
<< " age = " << age
<< cap
->get_client() << "." << cap
->get_inode()->ino() << dendl
;
3564 if (age
<= g_conf
->mds_session_timeout
) {
3565 dout(20) << __func__
<< " age below timeout " << g_conf
->mds_session_timeout
<< dendl
;
3569 if (i
> MAX_WARN_CAPS
) {
3570 dout(1) << __func__
<< " more than " << MAX_WARN_CAPS
<< " caps are late"
3571 << "revoking, ignoring subsequent caps" << dendl
;
3575 // exponential backoff of warning intervals
3576 if (age
> g_conf
->mds_session_timeout
* (1 << cap
->get_num_revoke_warnings())) {
3577 cap
->inc_num_revoke_warnings();
3579 ss
<< "client." << cap
->get_client() << " isn't responding to mclientcaps(revoke), ino "
3580 << cap
->get_inode()->ino() << " pending " << ccap_string(cap
->pending())
3581 << " issued " << ccap_string(cap
->issued()) << ", sent " << age
<< " seconds ago";
3582 mds
->clog
->warn() << ss
.str();
3583 dout(20) << __func__
<< " " << ss
.str() << dendl
;
3585 dout(20) << __func__
<< " silencing log message (backoff) for " << cap
->get_client() << "." << cap
->get_inode()->ino() << dendl
;
3591 void Locker::handle_client_lease(MClientLease
*m
)
3593 dout(10) << "handle_client_lease " << *m
<< dendl
;
3595 assert(m
->get_source().is_client());
3596 client_t client
= m
->get_source().num();
3598 CInode
*in
= mdcache
->get_inode(m
->get_ino(), m
->get_last());
3600 dout(7) << "handle_client_lease don't have ino " << m
->get_ino() << "." << m
->get_last() << dendl
;
3606 frag_t fg
= in
->pick_dirfrag(m
->dname
);
3607 CDir
*dir
= in
->get_dirfrag(fg
);
3609 dn
= dir
->lookup(m
->dname
);
3611 dout(7) << "handle_client_lease don't have dn " << m
->get_ino() << " " << m
->dname
<< dendl
;
3615 dout(10) << " on " << *dn
<< dendl
;
3618 ClientLease
*l
= dn
->get_client_lease(client
);
3620 dout(7) << "handle_client_lease didn't have lease for client." << client
<< " of " << *dn
<< dendl
;
3625 switch (m
->get_action()) {
3626 case CEPH_MDS_LEASE_REVOKE_ACK
:
3627 case CEPH_MDS_LEASE_RELEASE
:
3628 if (l
->seq
!= m
->get_seq()) {
3629 dout(7) << "handle_client_lease release - seq " << l
->seq
<< " != provided " << m
->get_seq() << dendl
;
3631 dout(7) << "handle_client_lease client." << client
3632 << " on " << *dn
<< dendl
;
3633 dn
->remove_client_lease(l
, this);
3638 case CEPH_MDS_LEASE_RENEW
:
3640 dout(7) << "handle_client_lease client." << client
<< " renew on " << *dn
3641 << (!dn
->lock
.can_lease(client
)?", revoking lease":"") << dendl
;
3642 if (dn
->lock
.can_lease(client
)) {
3643 int pool
= 1; // fixme.. do something smart!
3644 m
->h
.duration_ms
= (int)(1000 * mdcache
->client_lease_durations
[pool
]);
3645 m
->h
.seq
= ++l
->seq
;
3648 utime_t now
= ceph_clock_now();
3649 now
+= mdcache
->client_lease_durations
[pool
];
3650 mdcache
->touch_client_lease(l
, pool
, now
);
3652 mds
->send_message_client_counted(m
, m
->get_connection());
3658 ceph_abort(); // implement me
3664 void Locker::issue_client_lease(CDentry
*dn
, client_t client
,
3665 bufferlist
&bl
, utime_t now
, Session
*session
)
3667 CInode
*diri
= dn
->get_dir()->get_inode();
3668 if (!diri
->is_stray() && // do not issue dn leases in stray dir!
3669 ((!diri
->filelock
.can_lease(client
) &&
3670 (diri
->get_client_cap_pending(client
) & (CEPH_CAP_FILE_SHARED
| CEPH_CAP_FILE_EXCL
)) == 0)) &&
3671 dn
->lock
.can_lease(client
)) {
3672 int pool
= 1; // fixme.. do something smart!
3673 // issue a dentry lease
3674 ClientLease
*l
= dn
->add_client_lease(client
, session
);
3675 session
->touch_lease(l
);
3677 now
+= mdcache
->client_lease_durations
[pool
];
3678 mdcache
->touch_client_lease(l
, pool
, now
);
3681 e
.mask
= 1 | CEPH_LOCK_DN
; // old and new bit values
3683 e
.duration_ms
= (int)(1000 * mdcache
->client_lease_durations
[pool
]);
3685 dout(20) << "issue_client_lease seq " << e
.seq
<< " dur " << e
.duration_ms
<< "ms "
3686 << " on " << *dn
<< dendl
;
3694 dout(20) << "issue_client_lease no/null lease on " << *dn
<< dendl
;
3699 void Locker::revoke_client_leases(SimpleLock
*lock
)
3702 CDentry
*dn
= static_cast<CDentry
*>(lock
->get_parent());
3703 for (map
<client_t
, ClientLease
*>::iterator p
= dn
->client_lease_map
.begin();
3704 p
!= dn
->client_lease_map
.end();
3706 ClientLease
*l
= p
->second
;
3709 assert(lock
->get_type() == CEPH_LOCK_DN
);
3711 CDentry
*dn
= static_cast<CDentry
*>(lock
->get_parent());
3712 int mask
= 1 | CEPH_LOCK_DN
; // old and new bits
3714 // i should also revoke the dir ICONTENT lease, if they have it!
3715 CInode
*diri
= dn
->get_dir()->get_inode();
3716 mds
->send_message_client_counted(new MClientLease(CEPH_MDS_LEASE_REVOKE
, l
->seq
,
3719 diri
->first
, CEPH_NOSNAP
,
3727 // locks ----------------------------------------------------------------
3729 SimpleLock
*Locker::get_lock(int lock_type
, MDSCacheObjectInfo
&info
)
3731 switch (lock_type
) {
3734 // be careful; info.dirfrag may have incorrect frag; recalculate based on dname.
3735 CInode
*diri
= mdcache
->get_inode(info
.dirfrag
.ino
);
3740 fg
= diri
->pick_dirfrag(info
.dname
);
3741 dir
= diri
->get_dirfrag(fg
);
3743 dn
= dir
->lookup(info
.dname
, info
.snapid
);
3746 dout(7) << "get_lock don't have dn " << info
.dirfrag
.ino
<< " " << info
.dname
<< dendl
;
3752 case CEPH_LOCK_IAUTH
:
3753 case CEPH_LOCK_ILINK
:
3754 case CEPH_LOCK_IDFT
:
3755 case CEPH_LOCK_IFILE
:
3756 case CEPH_LOCK_INEST
:
3757 case CEPH_LOCK_IXATTR
:
3758 case CEPH_LOCK_ISNAP
:
3759 case CEPH_LOCK_IFLOCK
:
3760 case CEPH_LOCK_IPOLICY
:
3762 CInode
*in
= mdcache
->get_inode(info
.ino
, info
.snapid
);
3764 dout(7) << "get_lock don't have ino " << info
.ino
<< dendl
;
3767 switch (lock_type
) {
3768 case CEPH_LOCK_IAUTH
: return &in
->authlock
;
3769 case CEPH_LOCK_ILINK
: return &in
->linklock
;
3770 case CEPH_LOCK_IDFT
: return &in
->dirfragtreelock
;
3771 case CEPH_LOCK_IFILE
: return &in
->filelock
;
3772 case CEPH_LOCK_INEST
: return &in
->nestlock
;
3773 case CEPH_LOCK_IXATTR
: return &in
->xattrlock
;
3774 case CEPH_LOCK_ISNAP
: return &in
->snaplock
;
3775 case CEPH_LOCK_IFLOCK
: return &in
->flocklock
;
3776 case CEPH_LOCK_IPOLICY
: return &in
->policylock
;
3781 dout(7) << "get_lock don't know lock_type " << lock_type
<< dendl
;
3789 /* This function DOES put the passed message before returning */
3790 void Locker::handle_lock(MLock
*m
)
3792 // nobody should be talking to us during recovery.
3793 assert(mds
->is_rejoin() || mds
->is_clientreplay() || mds
->is_active() || mds
->is_stopping());
3795 SimpleLock
*lock
= get_lock(m
->get_lock_type(), m
->get_object_info());
3797 dout(10) << "don't have object " << m
->get_object_info() << ", must have trimmed, dropping" << dendl
;
3802 switch (lock
->get_type()) {
3804 case CEPH_LOCK_IAUTH
:
3805 case CEPH_LOCK_ILINK
:
3806 case CEPH_LOCK_ISNAP
:
3807 case CEPH_LOCK_IXATTR
:
3808 case CEPH_LOCK_IFLOCK
:
3809 case CEPH_LOCK_IPOLICY
:
3810 handle_simple_lock(lock
, m
);
3813 case CEPH_LOCK_IDFT
:
3814 case CEPH_LOCK_INEST
:
3815 //handle_scatter_lock((ScatterLock*)lock, m);
3818 case CEPH_LOCK_IFILE
:
3819 handle_file_lock(static_cast<ScatterLock
*>(lock
), m
);
3823 dout(7) << "handle_lock got otype " << m
->get_lock_type() << dendl
;
3833 // ==========================================================================
3836 /** This function may take a reference to m if it needs one, but does
3837 * not put references. */
3838 void Locker::handle_reqrdlock(SimpleLock
*lock
, MLock
*m
)
3840 MDSCacheObject
*parent
= lock
->get_parent();
3841 if (parent
->is_auth() &&
3842 lock
->get_state() != LOCK_SYNC
&&
3843 !parent
->is_frozen()) {
3844 dout(7) << "handle_reqrdlock got rdlock request on " << *lock
3845 << " on " << *parent
<< dendl
;
3846 assert(parent
->is_auth()); // replica auth pinned if they're doing this!
3847 if (lock
->is_stable()) {
3850 dout(7) << "handle_reqrdlock delaying request until lock is stable" << dendl
;
3851 lock
->add_waiter(SimpleLock::WAIT_STABLE
| MDSCacheObject::WAIT_UNFREEZE
,
3852 new C_MDS_RetryMessage(mds
, m
->get()));
3855 dout(7) << "handle_reqrdlock dropping rdlock request on " << *lock
3856 << " on " << *parent
<< dendl
;
3857 // replica should retry
3861 /* This function DOES put the passed message before returning */
3862 void Locker::handle_simple_lock(SimpleLock
*lock
, MLock
*m
)
3864 int from
= m
->get_asker();
3866 dout(10) << "handle_simple_lock " << *m
3867 << " on " << *lock
<< " " << *lock
->get_parent() << dendl
;
3869 if (mds
->is_rejoin()) {
3870 if (lock
->get_parent()->is_rejoining()) {
3871 dout(7) << "handle_simple_lock still rejoining " << *lock
->get_parent()
3872 << ", dropping " << *m
<< dendl
;
3878 switch (m
->get_action()) {
3881 assert(lock
->get_state() == LOCK_LOCK
);
3882 lock
->decode_locked_state(m
->get_data());
3883 lock
->set_state(LOCK_SYNC
);
3884 lock
->finish_waiters(SimpleLock::WAIT_RD
|SimpleLock::WAIT_STABLE
);
3888 assert(lock
->get_state() == LOCK_SYNC
);
3889 lock
->set_state(LOCK_SYNC_LOCK
);
3890 if (lock
->is_leased())
3891 revoke_client_leases(lock
);
3892 eval_gather(lock
, true);
3893 if (lock
->is_unstable_and_locked())
3894 mds
->mdlog
->flush();
3899 case LOCK_AC_LOCKACK
:
3900 assert(lock
->get_state() == LOCK_SYNC_LOCK
||
3901 lock
->get_state() == LOCK_SYNC_EXCL
);
3902 assert(lock
->is_gathering(from
));
3903 lock
->remove_gather(from
);
3905 if (lock
->is_gathering()) {
3906 dout(7) << "handle_simple_lock " << *lock
<< " on " << *lock
->get_parent() << " from " << from
3907 << ", still gathering " << lock
->get_gather_set() << dendl
;
3909 dout(7) << "handle_simple_lock " << *lock
<< " on " << *lock
->get_parent() << " from " << from
3910 << ", last one" << dendl
;
3915 case LOCK_AC_REQRDLOCK
:
3916 handle_reqrdlock(lock
, m
);
3924 /* unused, currently.
3926 class C_Locker_SimpleEval : public Context {
3930 C_Locker_SimpleEval(Locker *l, SimpleLock *lk) : locker(l), lock(lk) {}
3931 void finish(int r) {
3932 locker->try_simple_eval(lock);
3936 void Locker::try_simple_eval(SimpleLock *lock)
3938 // unstable and ambiguous auth?
3939 if (!lock->is_stable() &&
3940 lock->get_parent()->is_ambiguous_auth()) {
3941 dout(7) << "simple_eval not stable and ambiguous auth, waiting on " << *lock->get_parent() << dendl;
3942 //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
3943 lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_SimpleEval(this, lock));
3947 if (!lock->get_parent()->is_auth()) {
3948 dout(7) << "try_simple_eval not auth for " << *lock->get_parent() << dendl;
3952 if (!lock->get_parent()->can_auth_pin()) {
3953 dout(7) << "try_simple_eval can't auth_pin, waiting on " << *lock->get_parent() << dendl;
3954 //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
3955 lock->get_parent()->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_SimpleEval(this, lock));
3959 if (lock->is_stable())
3965 void Locker::simple_eval(SimpleLock
*lock
, bool *need_issue
)
3967 dout(10) << "simple_eval " << *lock
<< " on " << *lock
->get_parent() << dendl
;
3969 assert(lock
->get_parent()->is_auth());
3970 assert(lock
->is_stable());
3972 if (lock
->get_parent()->is_freezing_or_frozen()) {
3973 // dentry lock in unreadable state can block path traverse
3974 if ((lock
->get_type() != CEPH_LOCK_DN
||
3975 lock
->get_state() == LOCK_SYNC
||
3976 lock
->get_parent()->is_frozen()))
3980 if (mdcache
->is_readonly()) {
3981 if (lock
->get_state() != LOCK_SYNC
) {
3982 dout(10) << "simple_eval read-only FS, syncing " << *lock
<< " on " << *lock
->get_parent() << dendl
;
3983 simple_sync(lock
, need_issue
);
3990 if (lock
->get_type() != CEPH_LOCK_DN
) {
3991 in
= static_cast<CInode
*>(lock
->get_parent());
3992 in
->get_caps_wanted(&wanted
, NULL
, lock
->get_cap_shift());
3996 if (lock
->get_state() != LOCK_EXCL
&&
3997 in
&& in
->get_target_loner() >= 0 &&
3998 (wanted
& CEPH_CAP_GEXCL
)) {
3999 dout(7) << "simple_eval stable, going to excl " << *lock
4000 << " on " << *lock
->get_parent() << dendl
;
4001 simple_excl(lock
, need_issue
);
4005 else if (lock
->get_state() != LOCK_SYNC
&&
4006 !lock
->is_wrlocked() &&
4007 ((!(wanted
& CEPH_CAP_GEXCL
) && !lock
->is_waiter_for(SimpleLock::WAIT_WR
)) ||
4008 (lock
->get_state() == LOCK_EXCL
&& in
&& in
->get_target_loner() < 0))) {
4009 dout(7) << "simple_eval stable, syncing " << *lock
4010 << " on " << *lock
->get_parent() << dendl
;
4011 simple_sync(lock
, need_issue
);
4018 bool Locker::simple_sync(SimpleLock
*lock
, bool *need_issue
)
4020 dout(7) << "simple_sync on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4021 assert(lock
->get_parent()->is_auth());
4022 assert(lock
->is_stable());
4025 if (lock
->get_cap_shift())
4026 in
= static_cast<CInode
*>(lock
->get_parent());
4028 int old_state
= lock
->get_state();
4030 if (old_state
!= LOCK_TSYN
) {
4032 switch (lock
->get_state()) {
4033 case LOCK_MIX
: lock
->set_state(LOCK_MIX_SYNC
); break;
4034 case LOCK_LOCK
: lock
->set_state(LOCK_LOCK_SYNC
); break;
4035 case LOCK_XSYN
: lock
->set_state(LOCK_XSYN_SYNC
); break;
4036 case LOCK_EXCL
: lock
->set_state(LOCK_EXCL_SYNC
); break;
4037 default: ceph_abort();
4041 if (lock
->is_wrlocked())
4044 if (lock
->get_parent()->is_replicated() && old_state
== LOCK_MIX
) {
4045 send_lock_message(lock
, LOCK_AC_SYNC
);
4046 lock
->init_gather();
4050 if (in
&& in
->is_head()) {
4051 if (in
->issued_caps_need_gather(lock
)) {
4060 bool need_recover
= false;
4061 if (lock
->get_type() == CEPH_LOCK_IFILE
) {
4063 if (in
->state_test(CInode::STATE_NEEDSRECOVER
)) {
4064 mds
->mdcache
->queue_file_recover(in
);
4065 need_recover
= true;
4070 if (!gather
&& lock
->is_dirty()) {
4071 lock
->get_parent()->auth_pin(lock
);
4072 scatter_writebehind(static_cast<ScatterLock
*>(lock
));
4073 mds
->mdlog
->flush();
4078 lock
->get_parent()->auth_pin(lock
);
4080 mds
->mdcache
->do_file_recover();
4085 if (lock
->get_parent()->is_replicated()) { // FIXME
4087 lock
->encode_locked_state(data
);
4088 send_lock_message(lock
, LOCK_AC_SYNC
, data
);
4090 lock
->set_state(LOCK_SYNC
);
4091 lock
->finish_waiters(SimpleLock::WAIT_RD
|SimpleLock::WAIT_STABLE
);
4092 if (in
&& in
->is_head()) {
4101 void Locker::simple_excl(SimpleLock
*lock
, bool *need_issue
)
4103 dout(7) << "simple_excl on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4104 assert(lock
->get_parent()->is_auth());
4105 assert(lock
->is_stable());
4108 if (lock
->get_cap_shift())
4109 in
= static_cast<CInode
*>(lock
->get_parent());
4111 switch (lock
->get_state()) {
4112 case LOCK_LOCK
: lock
->set_state(LOCK_LOCK_EXCL
); break;
4113 case LOCK_SYNC
: lock
->set_state(LOCK_SYNC_EXCL
); break;
4114 case LOCK_XSYN
: lock
->set_state(LOCK_XSYN_EXCL
); break;
4115 default: ceph_abort();
4119 if (lock
->is_rdlocked())
4121 if (lock
->is_wrlocked())
4124 if (lock
->get_parent()->is_replicated() &&
4125 lock
->get_state() != LOCK_LOCK_EXCL
&&
4126 lock
->get_state() != LOCK_XSYN_EXCL
) {
4127 send_lock_message(lock
, LOCK_AC_LOCK
);
4128 lock
->init_gather();
4132 if (in
&& in
->is_head()) {
4133 if (in
->issued_caps_need_gather(lock
)) {
4143 lock
->get_parent()->auth_pin(lock
);
4145 lock
->set_state(LOCK_EXCL
);
4146 lock
->finish_waiters(SimpleLock::WAIT_WR
|SimpleLock::WAIT_STABLE
);
4156 void Locker::simple_lock(SimpleLock
*lock
, bool *need_issue
)
4158 dout(7) << "simple_lock on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4159 assert(lock
->get_parent()->is_auth());
4160 assert(lock
->is_stable());
4161 assert(lock
->get_state() != LOCK_LOCK
);
4164 if (lock
->get_cap_shift())
4165 in
= static_cast<CInode
*>(lock
->get_parent());
4167 int old_state
= lock
->get_state();
4169 switch (lock
->get_state()) {
4170 case LOCK_SYNC
: lock
->set_state(LOCK_SYNC_LOCK
); break;
4171 case LOCK_XSYN
: lock
->set_state(LOCK_XSYN_LOCK
); break;
4172 case LOCK_EXCL
: lock
->set_state(LOCK_EXCL_LOCK
); break;
4173 case LOCK_MIX
: lock
->set_state(LOCK_MIX_LOCK
);
4174 (static_cast<ScatterLock
*>(lock
))->clear_unscatter_wanted();
4176 case LOCK_TSYN
: lock
->set_state(LOCK_TSYN_LOCK
); break;
4177 default: ceph_abort();
4181 if (lock
->is_leased()) {
4183 revoke_client_leases(lock
);
4185 if (lock
->is_rdlocked())
4187 if (in
&& in
->is_head()) {
4188 if (in
->issued_caps_need_gather(lock
)) {
4197 bool need_recover
= false;
4198 if (lock
->get_type() == CEPH_LOCK_IFILE
) {
4200 if(in
->state_test(CInode::STATE_NEEDSRECOVER
)) {
4201 mds
->mdcache
->queue_file_recover(in
);
4202 need_recover
= true;
4207 if (lock
->get_parent()->is_replicated() &&
4208 lock
->get_state() == LOCK_MIX_LOCK
&&
4210 dout(10) << " doing local stage of mix->lock gather before gathering from replicas" << dendl
;
4212 // move to second stage of gather now, so we don't send the lock action later.
4213 if (lock
->get_state() == LOCK_MIX_LOCK
)
4214 lock
->set_state(LOCK_MIX_LOCK2
);
4216 if (lock
->get_parent()->is_replicated() &&
4217 lock
->get_sm()->states
[old_state
].replica_state
!= LOCK_LOCK
) { // replica may already be LOCK
4219 send_lock_message(lock
, LOCK_AC_LOCK
);
4220 lock
->init_gather();
4224 if (!gather
&& lock
->is_dirty()) {
4225 lock
->get_parent()->auth_pin(lock
);
4226 scatter_writebehind(static_cast<ScatterLock
*>(lock
));
4227 mds
->mdlog
->flush();
4232 lock
->get_parent()->auth_pin(lock
);
4234 mds
->mdcache
->do_file_recover();
4236 lock
->set_state(LOCK_LOCK
);
4237 lock
->finish_waiters(ScatterLock::WAIT_XLOCK
|ScatterLock::WAIT_WR
|ScatterLock::WAIT_STABLE
);
4242 void Locker::simple_xlock(SimpleLock
*lock
)
4244 dout(7) << "simple_xlock on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4245 assert(lock
->get_parent()->is_auth());
4246 //assert(lock->is_stable());
4247 assert(lock
->get_state() != LOCK_XLOCK
);
4250 if (lock
->get_cap_shift())
4251 in
= static_cast<CInode
*>(lock
->get_parent());
4253 if (lock
->is_stable())
4254 lock
->get_parent()->auth_pin(lock
);
4256 switch (lock
->get_state()) {
4258 case LOCK_XLOCKDONE
: lock
->set_state(LOCK_LOCK_XLOCK
); break;
4259 default: ceph_abort();
4263 if (lock
->is_rdlocked())
4265 if (lock
->is_wrlocked())
4268 if (in
&& in
->is_head()) {
4269 if (in
->issued_caps_need_gather(lock
)) {
4276 lock
->set_state(LOCK_PREXLOCK
);
4277 //assert("shouldn't be called if we are already xlockable" == 0);
4285 // ==========================================================================
4290 Some notes on scatterlocks.
4292 - The scatter/gather is driven by the inode lock. The scatter always
4293 brings in the latest metadata from the fragments.
4295 - When in a scattered/MIX state, fragments are only allowed to
4296 update/be written to if the accounted stat matches the inode's
4299 - That means, on gather, we _only_ assimilate diffs for frag metadata
4300 that match the current version, because those are the only ones
4301 written during this scatter/gather cycle. (Others didn't permit
4302 it.) We increment the version and journal this to disk.
4304 - When possible, we also simultaneously update our local frag
4305 accounted stats to match.
4307 - On scatter, the new inode info is broadcast to frags, both local
4308 and remote. If possible (auth and !frozen), the dirfrag auth
4309 should update the accounted state (if it isn't already up to date).
4310 Note that this may occur on both the local inode auth node and
4311 inode replicas, so there are two potential paths. If it is NOT
4312 possible, they need to mark_stale to prevent any possible writes.
4314 - A scatter can be to MIX (potentially writeable) or to SYNC (read
4315 only). Both are opportunities to update the frag accounted stats,
4316 even though only the MIX case is affected by a stale dirfrag.
4318 - Because many scatter/gather cycles can potentially go by without a
4319 frag being able to update its accounted stats (due to being frozen
4320 by exports/refragments in progress), the frag may have (even very)
4321 old stat versions. That's fine. If and when we do want to update it,
4322 we can update accounted_* and the version first.
4326 class C_Locker_ScatterWB
: public LockerLogContext
{
4330 C_Locker_ScatterWB(Locker
*l
, ScatterLock
*sl
, MutationRef
& m
) :
4331 LockerLogContext(l
), lock(sl
), mut(m
) {}
4332 void finish(int r
) override
{
4333 locker
->scatter_writebehind_finish(lock
, mut
);
4337 void Locker::scatter_writebehind(ScatterLock
*lock
)
4339 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4340 dout(10) << "scatter_writebehind " << in
->inode
.mtime
<< " on " << *lock
<< " on " << *in
<< dendl
;
4343 MutationRef
mut(new MutationImpl());
4344 mut
->ls
= mds
->mdlog
->get_current_segment();
4346 // forcefully take a wrlock
4347 lock
->get_wrlock(true);
4348 mut
->wrlocks
.insert(lock
);
4349 mut
->locks
.insert(lock
);
4351 in
->pre_cow_old_inode(); // avoid cow mayhem
4353 inode_t
*pi
= in
->project_inode();
4354 pi
->version
= in
->pre_dirty();
4356 in
->finish_scatter_gather_update(lock
->get_type());
4357 lock
->start_flush();
4359 EUpdate
*le
= new EUpdate(mds
->mdlog
, "scatter_writebehind");
4360 mds
->mdlog
->start_entry(le
);
4362 mdcache
->predirty_journal_parents(mut
, &le
->metablob
, in
, 0, PREDIRTY_PRIMARY
);
4363 mdcache
->journal_dirty_inode(mut
.get(), &le
->metablob
, in
);
4365 in
->finish_scatter_gather_update_accounted(lock
->get_type(), mut
, &le
->metablob
);
4367 mds
->mdlog
->submit_entry(le
, new C_Locker_ScatterWB(this, lock
, mut
));
4370 void Locker::scatter_writebehind_finish(ScatterLock
*lock
, MutationRef
& mut
)
4372 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4373 dout(10) << "scatter_writebehind_finish on " << *lock
<< " on " << *in
<< dendl
;
4374 in
->pop_and_dirty_projected_inode(mut
->ls
);
4376 lock
->finish_flush();
4378 // if replicas may have flushed in a mix->lock state, send another
4379 // message so they can finish_flush().
4380 if (in
->is_replicated()) {
4381 switch (lock
->get_state()) {
4383 case LOCK_MIX_LOCK2
:
4386 send_lock_message(lock
, LOCK_AC_LOCKFLUSHED
);
4391 drop_locks(mut
.get());
4394 if (lock
->is_stable())
4395 lock
->finish_waiters(ScatterLock::WAIT_STABLE
);
4397 //scatter_eval_gather(lock);
// scatter_eval(lock, need_issue): re-evaluate a stable scatterlock on its
// auth MDS (both conditions are asserted) and move it toward a better stable
// state. need_issue is forwarded to the state-change helpers.
// Visible checks, in order:
//  - parent freezing/frozen: logs " freezing|frozen" (what follows is missing);
//  - read-only cache: simple_sync() unless already LOCK_SYNC;
//  - not rdlocked, not LOCK_MIX and scatter wanted: scatter_mix();
//  - CEPH_LOCK_INEST ("keep INEST writable"): when not rdlocked, scatter_mix()
//    on a replicated parent that is not already LOCK_MIX; the simple_lock()
//    branch for non-LOCK_LOCK states follows a missing line (4435),
//    presumably an else;
//  - inode with no subtree/exporting dirfrag, or a base inode: simple_sync()
//    if not wrlocked and not already LOCK_SYNC.
// NOTE(review): damaged scrape (stray line numbers, split tokens). Gaps such
// as 4409-4411, 4416-4419, 4426-4428 and 4438-4442 likely held returns and
// braces. Verify against upstream before trusting the control flow shown here.
4400 void Locker::scatter_eval(ScatterLock
*lock
, bool *need_issue
)
4402 dout(10) << "scatter_eval " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4404 assert(lock
->get_parent()->is_auth());
4405 assert(lock
->is_stable());
4407 if (lock
->get_parent()->is_freezing_or_frozen()) {
4408 dout(20) << " freezing|frozen" << dendl
;
4412 if (mdcache
->is_readonly()) {
4413 if (lock
->get_state() != LOCK_SYNC
) {
4414 dout(10) << "scatter_eval read-only FS, syncing " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4415 simple_sync(lock
, need_issue
);
4420 if (!lock
->is_rdlocked() &&
4421 lock
->get_state() != LOCK_MIX
&&
4422 lock
->get_scatter_wanted()) {
4423 dout(10) << "scatter_eval scatter_wanted, bump to mix " << *lock
4424 << " on " << *lock
->get_parent() << dendl
;
4425 scatter_mix(lock
, need_issue
);
4429 if (lock
->get_type() == CEPH_LOCK_INEST
) {
4430 // in general, we want to keep INEST writable at all times.
4431 if (!lock
->is_rdlocked()) {
4432 if (lock
->get_parent()->is_replicated()) {
4433 if (lock
->get_state() != LOCK_MIX
)
4434 scatter_mix(lock
, need_issue
);
4436 if (lock
->get_state() != LOCK_LOCK
)
4437 simple_lock(lock
, need_issue
);
4443 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4444 if (!in
->has_subtree_or_exporting_dirfrag() || in
->is_base()) {
4445 // i _should_ be sync.
4446 if (!lock
->is_wrlocked() &&
4447 lock
->get_state() != LOCK_SYNC
) {
4448 dout(10) << "scatter_eval no wrlocks|xlocks, not subtree root inode, syncing" << dendl
;
4449 simple_sync(lock
, need_issue
);
// mark_updated_scatterlock(lock): make sure the lock's updated_item is queued.
// If it is already on a list, only log the existing update stamp. Otherwise
// push it to the back of updated_scatterlocks and stamp it with
// ceph_clock_now(). scatter_tick() later walks updated_scatterlocks.
// NOTE(review): damaged scrape. The line below is the body of a header
// comment whose opening (4455) and closing (4457) lines are missing. The
// numbering also skips 4459-4460 and 4464 (the else). At least one statement
// before the `if` may be missing, so verify against upstream.
4456 * mark a scatterlock to indicate that the dir fnode has some dirty data
4458 void Locker::mark_updated_scatterlock(ScatterLock
*lock
)
4461 if (lock
->get_updated_item()->is_on_list()) {
4462 dout(10) << "mark_updated_scatterlock " << *lock
4463 << " - already on list since " << lock
->get_update_stamp() << dendl
;
4465 updated_scatterlocks
.push_back(lock
->get_updated_item());
4466 utime_t now
= ceph_clock_now();
4467 lock
->set_update_stamp(now
);
4468 dout(10) << "mark_updated_scatterlock " << *lock
4469 << " - added at " << now
<< dendl
;
// scatter_nudge(lock, c, forcelockchange): try to flush dirty scattered data
// for `lock` (see the surviving header-comment lines below).
// Visible behaviour:
//  - parent frozen/freezing, or ambiguous auth: add `c` as a WAIT_UNFREEZE /
//    WAIT_SINGLEAUTH waiter. The else-branch instead requeues the lock on
//    updated_scatterlocks if it is dirty. The guarding `if` lines
//    (4487, 4497) are missing.
//  - auth side (per the "scatter_nudge auth" log prefix): when stable, a
//    read-only cache simple_sync()s. Otherwise the state is adjusted per lock
//    type. IFILE: scatter_mix() if replicated and not MIX, else simple_lock()
//    if not LOCK, plus a simple_sync() path. IDFT/INEST: same tests, with the
//    bodies missing. It bails when stable at `count == 2`; otherwise it waits
//    on WAIT_STABLE with `c`.
//  - replica side: send LOCK_AC_NUDGE to the auth MDS when the cluster is not
//    degraded or the auth is clientreplay/active/stopping, wait on WAIT_STABLE
//    with `c`, and requeue the lock if it is dirty.
// NOTE(review): damaged scrape (stray line numbers, split tokens). Lines are
// clearly missing:
//  - `count` (4559) is used but never declared in the visible text;
//  - the IDFT/INEST `if`/`else if` (4548/4550) have no bodies;
//  - gaps include 4502-4507, 4549 and 4551-4558.
// The condition separating the auth and replica paths is also missing.
4474 * this is called by scatter_tick and LogSegment::try_to_trim() when
4475 * trying to flush dirty scattered data (i.e. updated fnode) back to
4478 * we need to lock|scatter in order to push fnode changes into the
4481 void Locker::scatter_nudge(ScatterLock
*lock
, MDSInternalContextBase
*c
, bool forcelockchange
)
4483 CInode
*p
= static_cast<CInode
*>(lock
->get_parent());
4485 if (p
->is_frozen() || p
->is_freezing()) {
4486 dout(10) << "scatter_nudge waiting for unfreeze on " << *p
<< dendl
;
4488 p
->add_waiter(MDSCacheObject::WAIT_UNFREEZE
, c
);
4489 else if (lock
->is_dirty())
4490 // just requeue. not ideal.. starvation prone..
4491 updated_scatterlocks
.push_back(lock
->get_updated_item());
4495 if (p
->is_ambiguous_auth()) {
4496 dout(10) << "scatter_nudge waiting for single auth on " << *p
<< dendl
;
4498 p
->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
, c
);
4499 else if (lock
->is_dirty())
4500 // just requeue. not ideal.. starvation prone..
4501 updated_scatterlocks
.push_back(lock
->get_updated_item());
4508 if (lock
->is_stable()) {
4509 // can we do it now?
4510 // (only if we're not replicated.. if we are, we really do need
4511 // to nudge the lock state!)
// NOTE(review): 4513-4515 below are English prose with no comment markers,
// and 4517-4521 are not token-split like the live code around them. They
// probably sit inside a block comment whose delimiters (4512, 4522) were
// lost. Confirm against upstream before treating 4517-4521 as live code.
4513 actually, even if we're not replicated, we can't stay in MIX, because another mds
4514 could discover and replicate us at any time. if that happens while we're flushing,
4515 they end up in MIX but their inode has the old scatterstat version.
4517 if (!forcelockchange && !lock->get_parent()->is_replicated() && lock->can_wrlock(-1)) {
4518 dout(10) << "scatter_nudge auth, propagating " << *lock << " on " << *p << dendl;
4519 scatter_writebehind(lock);
4521 lock->add_waiter(SimpleLock::WAIT_STABLE, c);
4526 if (mdcache
->is_readonly()) {
4527 if (lock
->get_state() != LOCK_SYNC
) {
4528 dout(10) << "scatter_nudge auth, read-only FS, syncing " << *lock
<< " on " << *p
<< dendl
;
4529 simple_sync(static_cast<ScatterLock
*>(lock
));
4534 // adjust lock state
4535 dout(10) << "scatter_nudge auth, scatter/unscattering " << *lock
<< " on " << *p
<< dendl
;
4536 switch (lock
->get_type()) {
4537 case CEPH_LOCK_IFILE
:
4538 if (p
->is_replicated() && lock
->get_state() != LOCK_MIX
)
4539 scatter_mix(static_cast<ScatterLock
*>(lock
));
4540 else if (lock
->get_state() != LOCK_LOCK
)
4541 simple_lock(static_cast<ScatterLock
*>(lock
));
4543 simple_sync(static_cast<ScatterLock
*>(lock
));
4546 case CEPH_LOCK_IDFT
:
4547 case CEPH_LOCK_INEST
:
// NOTE(review): the statements under the next `if` (4549) and `else if`
// (4551) are missing from this listing.
4548 if (p
->is_replicated() && lock
->get_state() != LOCK_MIX
)
4550 else if (lock
->get_state() != LOCK_LOCK
)
4559 if (lock
->is_stable() && count
== 2) {
4560 dout(10) << "scatter_nudge oh, stable after two cycles." << dendl
;
4561 // this should only realy happen when called via
4562 // handle_file_lock due to AC_NUDGE, because the rest of the
4563 // time we are replicated or have dirty data and won't get
4564 // called. bailing here avoids an infinite loop.
4569 dout(10) << "scatter_nudge auth, waiting for stable " << *lock
<< " on " << *p
<< dendl
;
4571 lock
->add_waiter(SimpleLock::WAIT_STABLE
, c
);
4576 dout(10) << "scatter_nudge replica, requesting scatter/unscatter of "
4577 << *lock
<< " on " << *p
<< dendl
;
4578 // request unscatter?
4579 mds_rank_t auth
= lock
->get_parent()->authority().first
;
4580 if (!mds
->is_cluster_degraded() ||
4581 mds
->mdsmap
->is_clientreplay_or_active_or_stopping(auth
))
4582 mds
->send_message_mds(new MLock(lock
, LOCK_AC_NUDGE
, mds
->get_nodeid()), auth
);
4586 lock
->add_waiter(SimpleLock::WAIT_STABLE
, c
);
4588 // also, requeue, in case we had wrong auth or something
4589 if (lock
->is_dirty())
4590 updated_scatterlocks
.push_back(lock
->get_updated_item());
// scatter_tick(): periodic pass over updated_scatterlocks; the MDS log is
// flushed at the end.
// Visible per-entry logic:
//  - stop once the `n` entries present at the start have been visited
//    (scatter_nudge() may requeue);
//  - pop and log entries that are no longer dirty;
//  - compare the entry's update stamp with g_conf->mds_scatter_nudge_interval;
//  - otherwise pop the entry and call scatter_nudge(lock, 0).
// NOTE(review): damaged scrape (stray line numbers, split tokens). Lines 4610
// and 4613 are missing: the statements after the "removing" log and under the
// interval check, presumably `continue` and `break`. As shown, a non-dirty
// entry would be popped and then fall through to the stamp check, and the
// interval `if` would guard the next pop_front(). Restore from upstream.
4594 void Locker::scatter_tick()
4596 dout(10) << "scatter_tick" << dendl
;
4599 utime_t now
= ceph_clock_now();
4600 int n
= updated_scatterlocks
.size();
4601 while (!updated_scatterlocks
.empty()) {
4602 ScatterLock
*lock
= updated_scatterlocks
.front();
4604 if (n
-- == 0) break; // scatter_nudge() may requeue; avoid looping
4606 if (!lock
->is_dirty()) {
4607 updated_scatterlocks
.pop_front();
4608 dout(10) << " removing from updated_scatterlocks "
4609 << *lock
<< " " << *lock
->get_parent() << dendl
;
4612 if (now
- lock
->get_update_stamp() < g_conf
->mds_scatter_nudge_interval
)
4614 updated_scatterlocks
.pop_front();
4615 scatter_nudge(lock
, 0);
4617 mds
->mdlog
->flush();
// scatter_tempsync(lock, need_issue): move a stable scatterlock on an auth
// parent from LOCK_LOCK or LOCK_MIX toward LOCK_TSYN. LOCK_SYNC and any other
// state hit ceph_abort().
// NOTE(review): line 4628 asserts `0 == "not fully implemented, at least not
// for filelock"`. With assertions enabled, every call therefore aborts right
// after the log line and the two preceding asserts, and the transition code
// below is effectively dead. The listing is also a damaged scrape (stray line
// numbers, split tokens, gaps such as 4637-4639, 4641-4642, 4644, 4646-4652,
// 4657-4663 and the tail after 4666). Verify against upstream.
4621 void Locker::scatter_tempsync(ScatterLock
*lock
, bool *need_issue
)
4623 dout(10) << "scatter_tempsync " << *lock
4624 << " on " << *lock
->get_parent() << dendl
;
4625 assert(lock
->get_parent()->is_auth());
4626 assert(lock
->is_stable());
4628 assert(0 == "not fully implemented, at least not for filelock");
4630 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4632 switch (lock
->get_state()) {
4633 case LOCK_SYNC
: ceph_abort(); // this shouldn't happen
4634 case LOCK_LOCK
: lock
->set_state(LOCK_LOCK_TSYN
); break;
4635 case LOCK_MIX
: lock
->set_state(LOCK_MIX_TSYN
); break;
4636 default: ceph_abort();
4640 if (lock
->is_wrlocked())
4643 if (lock
->get_cap_shift() &&
4645 in
->issued_caps_need_gather(lock
)) {
4653 if (lock
->get_state() == LOCK_MIX_TSYN
&&
4654 in
->is_replicated()) {
4655 lock
->init_gather();
4656 send_lock_message(lock
, LOCK_AC_LOCK
);
4664 lock
->set_state(LOCK_TSYN
);
4665 lock
->finish_waiters(ScatterLock::WAIT_RD
|ScatterLock::WAIT_STABLE
);
4666 if (lock
->get_cap_shift()) {
4677 // ==========================================================================
4680 void Locker::local_wrlock_grab(LocalLock
*lock
, MutationRef
& mut
)
4682 dout(7) << "local_wrlock_grab on " << *lock
4683 << " on " << *lock
->get_parent() << dendl
;
4685 assert(lock
->get_parent()->is_auth());
4686 assert(lock
->can_wrlock());
4687 assert(!mut
->wrlocks
.count(lock
));
4688 lock
->get_wrlock(mut
->get_client());
4689 mut
->wrlocks
.insert(lock
);
4690 mut
->locks
.insert(lock
);
// local_wrlock_start(lock, mut): try to wrlock an auth LocalLock for request
// `mut`. If can_wrlock() holds, take the wrlock for mut's client and record it
// in mut->wrlocks and mut->locks. Otherwise add a C_MDS_RetryRequest waiter
// on WAIT_WR|WAIT_STABLE for this request.
// NOTE(review): damaged scrape (stray line numbers, split tokens). The function
// returns bool, but no return statement survives. Lines 4704-4709 are missing,
// presumably `return true;`, the else, `return false;` and the braces.
4693 bool Locker::local_wrlock_start(LocalLock
*lock
, MDRequestRef
& mut
)
4695 dout(7) << "local_wrlock_start on " << *lock
4696 << " on " << *lock
->get_parent() << dendl
;
4698 assert(lock
->get_parent()->is_auth());
4699 if (lock
->can_wrlock()) {
4700 assert(!mut
->wrlocks
.count(lock
));
4701 lock
->get_wrlock(mut
->get_client());
4702 mut
->wrlocks
.insert(lock
);
4703 mut
->locks
.insert(lock
);
4706 lock
->add_waiter(SimpleLock::WAIT_WR
|SimpleLock::WAIT_STABLE
, new C_MDS_RetryRequest(mdcache
, mut
));
// local_wrlock_finish(lock, mut): drop mut's wrlock on a LocalLock. It erases
// the lock from mut->wrlocks and mut->locks and, once get_num_wrlocks()
// reaches 0, wakes WAIT_STABLE|WAIT_WR|WAIT_RD waiters.
// NOTE(review): damaged scrape. Line 4715 is missing, and nothing visible here
// releases the wrlock taken by get_wrlock() in local_wrlock_start/grab. The
// missing line presumably does that (e.g. a put_wrlock() call); verify
// against upstream.
4711 void Locker::local_wrlock_finish(LocalLock
*lock
, MutationImpl
*mut
)
4713 dout(7) << "local_wrlock_finish on " << *lock
4714 << " on " << *lock
->get_parent() << dendl
;
4716 mut
->wrlocks
.erase(lock
);
4717 mut
->locks
.erase(lock
);
4718 if (lock
->get_num_wrlocks() == 0) {
4719 lock
->finish_waiters(SimpleLock::WAIT_STABLE
|
4720 SimpleLock::WAIT_WR
|
4721 SimpleLock::WAIT_RD
);
// local_xlock_start(lock, mut): try to xlock an auth LocalLock for request
// `mut`. If can_xlock_local() fails, add a C_MDS_RetryRequest waiter on
// WAIT_WR|WAIT_STABLE. Otherwise take the xlock for mut and mut's client and
// record it in mut->xlocks and mut->locks.
// NOTE(review): damaged scrape (stray line numbers, split tokens). The function
// returns bool, but no return statement survives. Lines 4733-4735 and
// 4739-4740 are missing, presumably `return false;`, the braces and
// `return true;`.
4725 bool Locker::local_xlock_start(LocalLock
*lock
, MDRequestRef
& mut
)
4727 dout(7) << "local_xlock_start on " << *lock
4728 << " on " << *lock
->get_parent() << dendl
;
4730 assert(lock
->get_parent()->is_auth());
4731 if (!lock
->can_xlock_local()) {
4732 lock
->add_waiter(SimpleLock::WAIT_WR
|SimpleLock::WAIT_STABLE
, new C_MDS_RetryRequest(mdcache
, mut
));
4736 lock
->get_xlock(mut
, mut
->get_client());
4737 mut
->xlocks
.insert(lock
);
4738 mut
->locks
.insert(lock
);
// local_xlock_finish(lock, mut): drop mut's xlock on a LocalLock. It erases
// the lock from mut->xlocks and mut->locks and wakes
// WAIT_STABLE|WAIT_WR|WAIT_RD waiters.
// NOTE(review): damaged scrape. Line 4749 is missing, and nothing visible here
// releases the xlock taken by get_xlock() in local_xlock_start. The missing
// line presumably does that; verify against upstream.
4742 void Locker::local_xlock_finish(LocalLock
*lock
, MutationImpl
*mut
)
4744 dout(7) << "local_xlock_finish on " << *lock
4745 << " on " << *lock
->get_parent() << dendl
;
4747 mut
->xlocks
.erase(lock
);
4748 mut
->locks
.erase(lock
);
4750 lock
->finish_waiters(SimpleLock::WAIT_STABLE
|
4751 SimpleLock::WAIT_WR
|
4752 SimpleLock::WAIT_RD
);
4757 // ==========================================================================
// file_eval(lock, need_issue): re-evaluate a stable file scatterlock on its
// auth inode against the CEPH_CAP_SFILE caps that clients want and hold, and
// pick a new state. need_issue is forwarded to the helpers.
// Visible decisions, in order:
//  - read-only cache: simple_sync() unless already LOCK_SYNC;
//  - LOCK_EXCL ("should lose it"): applies when the loner neither wants nor
//    holds EXCL|WR|BUFFER, or others want EXCL|WR|RD, or it is a dir with
//    multiple non-stale caps. Then scatter_mix() if anyone wants WR or a
//    WAIT_WR waiter exists, otherwise simple_sync() once the lock is not
//    wrlocked (else it logs "waiting for wrlock to drain");
//  - not EXCL, not rdlocked, (WR|BUFFER wanted, or a dir without a
//    subtree/exporting dirfrag) and a target loner exists: file_excl();
//  - not MIX, not rdlocked and (scatter wanted, or no target loner and WR
//    wanted): scatter_mix();
//  - not SYNC, not wrlocked, no WAIT_WR waiter, no WR|BUFFER wanted, and not a
//    MIX dir with a subtree/exporting dirfrag (delegation point):
//    simple_sync().
// NOTE(review): damaged scrape (stray line numbers, split tokens). Gaps such as
// 4770-4771, 4776-4777, 4782-4786, 4794, 4800-4809 and 4855 hide lines, e.g.
// the dendl terminators of the two multi-line dout()s, early returns, part of
// the EXCL branch and the closing of the last condition. Verify against
// upstream.
4761 void Locker::file_eval(ScatterLock
*lock
, bool *need_issue
)
4763 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4764 int loner_wanted
, other_wanted
;
4765 int wanted
= in
->get_caps_wanted(&loner_wanted
, &other_wanted
, CEPH_CAP_SFILE
);
4766 dout(7) << "file_eval wanted=" << gcap_string(wanted
)
4767 << " loner_wanted=" << gcap_string(loner_wanted
)
4768 << " other_wanted=" << gcap_string(other_wanted
)
4769 << " filelock=" << *lock
<< " on " << *lock
->get_parent()
4772 assert(lock
->get_parent()->is_auth());
4773 assert(lock
->is_stable());
4775 if (lock
->get_parent()->is_freezing_or_frozen())
4778 if (mdcache
->is_readonly()) {
4779 if (lock
->get_state() != LOCK_SYNC
) {
4780 dout(10) << "file_eval read-only FS, syncing " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4781 simple_sync(lock
, need_issue
);
4787 if (lock
->get_state() == LOCK_EXCL
) {
4788 dout(20) << " is excl" << dendl
;
4789 int loner_issued
, other_issued
, xlocker_issued
;
4790 in
->get_caps_issued(&loner_issued
, &other_issued
, &xlocker_issued
, CEPH_CAP_SFILE
);
4791 dout(7) << "file_eval loner_issued=" << gcap_string(loner_issued
)
4792 << " other_issued=" << gcap_string(other_issued
)
4793 << " xlocker_issued=" << gcap_string(xlocker_issued
)
4795 if (!((loner_wanted
|loner_issued
) & (CEPH_CAP_GEXCL
|CEPH_CAP_GWR
|CEPH_CAP_GBUFFER
)) ||
4796 (other_wanted
& (CEPH_CAP_GEXCL
|CEPH_CAP_GWR
|CEPH_CAP_GRD
)) ||
4797 (in
->inode
.is_dir() && in
->multiple_nonstale_caps())) { // FIXME.. :/
4798 dout(20) << " should lose it" << dendl
;
4799 // we should lose it.
4810 // -> any writer means MIX; RD doesn't matter.
4811 if (((other_wanted
|loner_wanted
) & CEPH_CAP_GWR
) ||
4812 lock
->is_waiter_for(SimpleLock::WAIT_WR
))
4813 scatter_mix(lock
, need_issue
);
4814 else if (!lock
->is_wrlocked()) // let excl wrlocks drain first
4815 simple_sync(lock
, need_issue
);
4817 dout(10) << " waiting for wrlock to drain" << dendl
;
// Not EXCL: prefer a loner (EXCL) when a target loner exists and
// WR/BUFFER is wanted (or this is a dir that is not a delegation point).
4822 else if (lock
->get_state() != LOCK_EXCL
&&
4823 !lock
->is_rdlocked() &&
4824 //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
4825 ((wanted
& (CEPH_CAP_GWR
|CEPH_CAP_GBUFFER
)) ||
4826 (in
->inode
.is_dir() && !in
->has_subtree_or_exporting_dirfrag())) &&
4827 in
->get_target_loner() >= 0) {
4828 dout(7) << "file_eval stable, bump to loner " << *lock
4829 << " on " << *lock
->get_parent() << dendl
;
4830 file_excl(lock
, need_issue
);
4834 else if (lock
->get_state() != LOCK_MIX
&&
4835 !lock
->is_rdlocked() &&
4836 //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
4837 (lock
->get_scatter_wanted() ||
4838 (in
->get_target_loner() < 0 && (wanted
& CEPH_CAP_GWR
)))) {
4839 dout(7) << "file_eval stable, bump to mixed " << *lock
4840 << " on " << *lock
->get_parent() << dendl
;
4841 scatter_mix(lock
, need_issue
);
4845 else if (lock
->get_state() != LOCK_SYNC
&&
4846 !lock
->is_wrlocked() && // drain wrlocks first!
4847 !lock
->is_waiter_for(SimpleLock::WAIT_WR
) &&
4848 !(wanted
& (CEPH_CAP_GWR
|CEPH_CAP_GBUFFER
)) &&
4849 !((lock
->get_state() == LOCK_MIX
) &&
4850 in
->is_dir() && in
->has_subtree_or_exporting_dirfrag()) // if we are a delegation point, stay where we are
4851 //((wanted & CEPH_CAP_RD) ||
4852 //in->is_replicated() ||
4853 //lock->is_leased() ||
4854 //(!loner && lock->get_state() == LOCK_EXCL)) &&
4856 dout(7) << "file_eval stable, bump to sync " << *lock
4857 << " on " << *lock
->get_parent() << dendl
;
4858 simple_sync(lock
, need_issue
);
// scatter_mix(lock, need_issue): move a stable scatterlock on an auth inode
// (both conditions are asserted) to LOCK_MIX.
//  - From LOCK_LOCK: start_scatter(); if replicated, broadcast LOCK_AC_MIX with
//    the encoded locked state; then set LOCK_MIX and clear scatter_wanted. The
//    cap handling after 4886 is missing.
//  - From SYNC/EXCL/XSYN/TSYN: enter the matching *_MIX transition state (any
//    other state aborts), then check what must be gathered:
//      * rdlocks;
//      * replicas: only from SYNC_MIX, where LOCK_AC_MIX is sent and
//        init_gather() is called (per the inline comment, replicas in the other
//        states are already LOCK);
//      * client leases, which are revoked;
//      * caps that need gathering.
//    A NEEDSRECOVER inode is queued for file recovery.
//    Visible afterwards: an auth_pin(lock) on the parent, a do_file_recover()
//    call, and a completion path (start_scatter(), LOCK_MIX, clear
//    scatter_wanted, broadcast LOCK_AC_MIX with state to replicas).
// NOTE(review): damaged scrape (stray line numbers, split tokens). The
// conditions choosing between these tail paths (4929-4936) and several
// braces/returns are missing; verify against upstream.
4864 void Locker::scatter_mix(ScatterLock
*lock
, bool *need_issue
)
4866 dout(7) << "scatter_mix " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4868 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4869 assert(in
->is_auth());
4870 assert(lock
->is_stable());
4872 if (lock
->get_state() == LOCK_LOCK
) {
4873 in
->start_scatter(lock
);
4874 if (in
->is_replicated()) {
4876 bufferlist softdata
;
4877 lock
->encode_locked_state(softdata
);
4879 // bcast to replicas
4880 send_lock_message(lock
, LOCK_AC_MIX
, softdata
);
4884 lock
->set_state(LOCK_MIX
);
4885 lock
->clear_scatter_wanted();
4886 if (lock
->get_cap_shift()) {
4894 switch (lock
->get_state()) {
4895 case LOCK_SYNC
: lock
->set_state(LOCK_SYNC_MIX
); break;
4896 case LOCK_EXCL
: lock
->set_state(LOCK_EXCL_MIX
); break;
4897 case LOCK_XSYN
: lock
->set_state(LOCK_XSYN_MIX
); break;
4898 case LOCK_TSYN
: lock
->set_state(LOCK_TSYN_MIX
); break;
4899 default: ceph_abort();
4903 if (lock
->is_rdlocked())
4905 if (in
->is_replicated()) {
4906 if (lock
->get_state() == LOCK_SYNC_MIX
) { // for the rest states, replicas are already LOCK
4907 send_lock_message(lock
, LOCK_AC_MIX
);
4908 lock
->init_gather();
4912 if (lock
->is_leased()) {
4913 revoke_client_leases(lock
);
4916 if (lock
->get_cap_shift() &&
4918 in
->issued_caps_need_gather(lock
)) {
4925 bool need_recover
= false;
4926 if (in
->state_test(CInode::STATE_NEEDSRECOVER
)) {
4927 mds
->mdcache
->queue_file_recover(in
);
4928 need_recover
= true;
4933 lock
->get_parent()->auth_pin(lock
);
4935 mds
->mdcache
->do_file_recover();
4937 in
->start_scatter(lock
);
4938 lock
->set_state(LOCK_MIX
);
4939 lock
->clear_scatter_wanted();
4940 if (in
->is_replicated()) {
4941 bufferlist softdata
;
4942 lock
->encode_locked_state(softdata
);
4943 send_lock_message(lock
, LOCK_AC_MIX
, softdata
);
4945 if (lock
->get_cap_shift()) {
// file_excl(lock, need_issue): move a stable file lock on an auth inode to
// LOCK_EXCL. It asserts that the inode has a loner and no mds_caps_wanted,
// unless the lock is in LOCK_XSYN ("must do xsyn -> excl -> <anything else>").
// It enters SYNC_EXCL/MIX_EXCL/LOCK_EXCL/XSYN_EXCL from the matching state
// (anything else aborts), then checks what must be gathered:
//  - rdlocks and wrlocks;
//  - replicas: LOCK_AC_LOCK plus init_gather(), unless the new state is
//    LOCK_EXCL or XSYN_EXCL (inline comment: if we were lock, replicas are
//    already lock);
//  - client leases, which are revoked;
//  - issued caps on a head inode.
// NEEDSRECOVER inodes are queued for file recovery. Visible tail: an
// auth_pin(lock) on the parent, do_file_recover(), and set_state(LOCK_EXCL).
// NOTE(review): damaged scrape (stray line numbers, split tokens). Several
// lines are missing: the bodies of the rdlock/wrlock checks (4977,
// 4979-4980), the gather bookkeeping, and the conditions selecting the tail
// (4994-4999, 5004-5011). Verify against upstream.
4956 void Locker::file_excl(ScatterLock
*lock
, bool *need_issue
)
4958 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4959 dout(7) << "file_excl " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4961 assert(in
->is_auth());
4962 assert(lock
->is_stable());
4964 assert((in
->get_loner() >= 0 && in
->mds_caps_wanted
.empty()) ||
4965 (lock
->get_state() == LOCK_XSYN
)); // must do xsyn -> excl -> <anything else>
4967 switch (lock
->get_state()) {
4968 case LOCK_SYNC
: lock
->set_state(LOCK_SYNC_EXCL
); break;
4969 case LOCK_MIX
: lock
->set_state(LOCK_MIX_EXCL
); break;
4970 case LOCK_LOCK
: lock
->set_state(LOCK_LOCK_EXCL
); break;
4971 case LOCK_XSYN
: lock
->set_state(LOCK_XSYN_EXCL
); break;
4972 default: ceph_abort();
4976 if (lock
->is_rdlocked())
4978 if (lock
->is_wrlocked())
4981 if (in
->is_replicated() &&
4982 lock
->get_state() != LOCK_LOCK_EXCL
&&
4983 lock
->get_state() != LOCK_XSYN_EXCL
) { // if we were lock, replicas are already lock.
4984 send_lock_message(lock
, LOCK_AC_LOCK
);
4985 lock
->init_gather();
4988 if (lock
->is_leased()) {
4989 revoke_client_leases(lock
);
4992 if (in
->is_head() &&
4993 in
->issued_caps_need_gather(lock
)) {
5000 bool need_recover
= false;
5001 if (in
->state_test(CInode::STATE_NEEDSRECOVER
)) {
5002 mds
->mdcache
->queue_file_recover(in
);
5003 need_recover
= true;
5008 lock
->get_parent()->auth_pin(lock
);
5010 mds
->mdcache
->do_file_recover();
5012 lock
->set_state(LOCK_EXCL
);
// file_xsyn(lock, need_issue): move a file lock on an auth inode from
// LOCK_EXCL to LOCK_XSYN via LOCK_EXCL_XSYN; any other starting state aborts.
// It asserts that the inode has a loner and no mds_caps_wanted. It checks
// wrlocks and issued caps on a head inode for gathering.
// Visible tail: an auth_pin(lock) on the parent and, presumably on the
// no-gather path, set_state(LOCK_XSYN) plus waking WAIT_RD|WAIT_STABLE waiters.
// NOTE(review): damaged scrape (stray line numbers, split tokens). Lines
// 5030-5032, 5034-5035, 5038-5045, 5047 and 5050 onward are missing (braces,
// gather bookkeeping, and the branch choosing the tail). Verify against
// upstream.
5020 void Locker::file_xsyn(SimpleLock
*lock
, bool *need_issue
)
5022 dout(7) << "file_xsyn on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
5023 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
5024 assert(in
->is_auth());
5025 assert(in
->get_loner() >= 0 && in
->mds_caps_wanted
.empty());
5027 switch (lock
->get_state()) {
5028 case LOCK_EXCL
: lock
->set_state(LOCK_EXCL_XSYN
); break;
5029 default: ceph_abort();
5033 if (lock
->is_wrlocked())
5036 if (in
->is_head() &&
5037 in
->issued_caps_need_gather(lock
)) {
5046 lock
->get_parent()->auth_pin(lock
);
5048 lock
->set_state(LOCK_XSYN
);
5049 lock
->finish_waiters(SimpleLock::WAIT_RD
|SimpleLock::WAIT_STABLE
);
// file_recover(lock): per its inline comment, this is only called from
// MDCache::start_files_to_recover(), with the lock in LOCK_PRE_SCAN on an auth
// inode (both asserted). Visible steps: check issued caps on a head inode for
// gathering, set LOCK_SCAN, mark the inode STATE_NEEDSRECOVER, and queue it
// with queue_file_recover().
// NOTE(review): damaged scrape (stray line numbers, split tokens). The gaps at
// 5083 and 5085 suggest state_set() and queue_file_recover() are alternatives
// rather than both being executed. Verify against upstream.
5057 void Locker::file_recover(ScatterLock
*lock
)
5059 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
5060 dout(7) << "file_recover " << *lock
<< " on " << *in
<< dendl
;
5062 assert(in
->is_auth());
5063 //assert(lock->is_stable());
5064 assert(lock
->get_state() == LOCK_PRE_SCAN
); // only called from MDCache::start_files_to_recover()
// NOTE(review): the next four lines are not valid C++ as shown: there is no
// operator after `is_replicated()` and `oldstate` is undeclared. Unlike the
// live code, they are not token-split. They are probably the body of a block
// comment whose delimiters were lost.
5069 if (in->is_replicated()
5070 lock->get_sm()->states[oldstate].replica_state != LOCK_LOCK) {
5071 send_lock_message(lock, LOCK_AC_LOCK);
5072 lock->init_gather();
5076 if (in
->is_head() &&
5077 in
->issued_caps_need_gather(lock
)) {
5082 lock
->set_state(LOCK_SCAN
);
5084 in
->state_set(CInode::STATE_NEEDSRECOVER
);
5086 mds
->mdcache
->queue_file_recover(in
);
5091 /* This function DOES put the passed message before returning */
5092 void Locker::handle_file_lock(ScatterLock
*lock
, MLock
*m
)
5094 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
5095 int from
= m
->get_asker();
5097 if (mds
->is_rejoin()) {
5098 if (in
->is_rejoining()) {
5099 dout(7) << "handle_file_lock still rejoining " << *in
5100 << ", dropping " << *m
<< dendl
;
5106 dout(7) << "handle_file_lock a=" << get_lock_action_name(m
->get_action())
5108 << " from mds." << from
<< " "
5111 bool caps
= lock
->get_cap_shift();
5113 switch (m
->get_action()) {
5116 assert(lock
->get_state() == LOCK_LOCK
||
5117 lock
->get_state() == LOCK_MIX
||
5118 lock
->get_state() == LOCK_MIX_SYNC2
);
5120 if (lock
->get_state() == LOCK_MIX
) {
5121 lock
->set_state(LOCK_MIX_SYNC
);
5122 eval_gather(lock
, true);
5123 if (lock
->is_unstable_and_locked())
5124 mds
->mdlog
->flush();
5128 (static_cast<ScatterLock
*>(lock
))->finish_flush();
5129 (static_cast<ScatterLock
*>(lock
))->clear_flushed();
5132 lock
->decode_locked_state(m
->get_data());
5133 lock
->set_state(LOCK_SYNC
);
5138 lock
->finish_waiters(SimpleLock::WAIT_RD
|SimpleLock::WAIT_STABLE
);
5143 switch (lock
->get_state()) {
5144 case LOCK_SYNC
: lock
->set_state(LOCK_SYNC_LOCK
); break;
5145 case LOCK_MIX
: lock
->set_state(LOCK_MIX_LOCK
); break;
5146 default: ceph_abort();
5149 eval_gather(lock
, true);
5150 if (lock
->is_unstable_and_locked())
5151 mds
->mdlog
->flush();
5155 case LOCK_AC_LOCKFLUSHED
:
5156 (static_cast<ScatterLock
*>(lock
))->finish_flush();
5157 (static_cast<ScatterLock
*>(lock
))->clear_flushed();
5158 // wake up scatter_nudge waiters
5159 if (lock
->is_stable())
5160 lock
->finish_waiters(SimpleLock::WAIT_STABLE
);
5164 assert(lock
->get_state() == LOCK_SYNC
||
5165 lock
->get_state() == LOCK_LOCK
||
5166 lock
->get_state() == LOCK_SYNC_MIX2
);
5168 if (lock
->get_state() == LOCK_SYNC
) {
5170 lock
->set_state(LOCK_SYNC_MIX
);
5171 eval_gather(lock
, true);
5172 if (lock
->is_unstable_and_locked())
5173 mds
->mdlog
->flush();
5178 lock
->set_state(LOCK_MIX
);
5179 lock
->decode_locked_state(m
->get_data());
5184 lock
->finish_waiters(SimpleLock::WAIT_WR
|SimpleLock::WAIT_STABLE
);
5189 case LOCK_AC_LOCKACK
:
5190 assert(lock
->get_state() == LOCK_SYNC_LOCK
||
5191 lock
->get_state() == LOCK_MIX_LOCK
||
5192 lock
->get_state() == LOCK_MIX_LOCK2
||
5193 lock
->get_state() == LOCK_MIX_EXCL
||
5194 lock
->get_state() == LOCK_SYNC_EXCL
||
5195 lock
->get_state() == LOCK_SYNC_MIX
||
5196 lock
->get_state() == LOCK_MIX_TSYN
);
5197 assert(lock
->is_gathering(from
));
5198 lock
->remove_gather(from
);
5200 if (lock
->get_state() == LOCK_MIX_LOCK
||
5201 lock
->get_state() == LOCK_MIX_LOCK2
||
5202 lock
->get_state() == LOCK_MIX_EXCL
||
5203 lock
->get_state() == LOCK_MIX_TSYN
) {
5204 lock
->decode_locked_state(m
->get_data());
5205 // replica is waiting for AC_LOCKFLUSHED, eval_gather() should not
5206 // delay calling scatter_writebehind().
5207 lock
->clear_flushed();
5210 if (lock
->is_gathering()) {
5211 dout(7) << "handle_file_lock " << *in
<< " from " << from
5212 << ", still gathering " << lock
->get_gather_set() << dendl
;
5214 dout(7) << "handle_file_lock " << *in
<< " from " << from
5215 << ", last one" << dendl
;
5220 case LOCK_AC_SYNCACK
:
5221 assert(lock
->get_state() == LOCK_MIX_SYNC
);
5222 assert(lock
->is_gathering(from
));
5223 lock
->remove_gather(from
);
5225 lock
->decode_locked_state(m
->get_data());
5227 if (lock
->is_gathering()) {
5228 dout(7) << "handle_file_lock " << *in
<< " from " << from
5229 << ", still gathering " << lock
->get_gather_set() << dendl
;
5231 dout(7) << "handle_file_lock " << *in
<< " from " << from
5232 << ", last one" << dendl
;
5237 case LOCK_AC_MIXACK
:
5238 assert(lock
->get_state() == LOCK_SYNC_MIX
);
5239 assert(lock
->is_gathering(from
));
5240 lock
->remove_gather(from
);
5242 if (lock
->is_gathering()) {
5243 dout(7) << "handle_file_lock " << *in
<< " from " << from
5244 << ", still gathering " << lock
->get_gather_set() << dendl
;
5246 dout(7) << "handle_file_lock " << *in
<< " from " << from
5247 << ", last one" << dendl
;
5254 case LOCK_AC_REQSCATTER
:
5255 if (lock
->is_stable()) {
5256 /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
5257 * because the replica should be holding an auth_pin if they're
5258 * doing this (and thus, we are freezing, not frozen, and indefinite
5259 * starvation isn't an issue).
5261 dout(7) << "handle_file_lock got scatter request on " << *lock
5262 << " on " << *lock
->get_parent() << dendl
;
5263 if (lock
->get_state() != LOCK_MIX
) // i.e., the reqscatter didn't race with an actual mix/scatter
5266 dout(7) << "handle_file_lock got scatter request, !stable, marking scatter_wanted on " << *lock
5267 << " on " << *lock
->get_parent() << dendl
;
5268 lock
->set_scatter_wanted();
5272 case LOCK_AC_REQUNSCATTER
:
5273 if (lock
->is_stable()) {
5274 /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
5275 * because the replica should be holding an auth_pin if they're
5276 * doing this (and thus, we are freezing, not frozen, and indefinite
5277 * starvation isn't an issue).
5279 dout(7) << "handle_file_lock got unscatter request on " << *lock
5280 << " on " << *lock
->get_parent() << dendl
;
5281 if (lock
->get_state() == LOCK_MIX
) // i.e., the reqscatter didn't race with an actual mix/scatter
5282 simple_lock(lock
); // FIXME tempsync?
5284 dout(7) << "handle_file_lock ignoring unscatter request on " << *lock
5285 << " on " << *lock
->get_parent() << dendl
;
5286 lock
->set_unscatter_wanted();
5290 case LOCK_AC_REQRDLOCK
:
5291 handle_reqrdlock(lock
, m
);
5295 if (!lock
->get_parent()->is_auth()) {
5296 dout(7) << "handle_file_lock IGNORING nudge on non-auth " << *lock
5297 << " on " << *lock
->get_parent() << dendl
;
5298 } else if (!lock
->get_parent()->is_replicated()) {
5299 dout(7) << "handle_file_lock IGNORING nudge on non-replicated " << *lock
5300 << " on " << *lock
->get_parent() << dendl
;
5302 dout(7) << "handle_file_lock trying nudge on " << *lock
5303 << " on " << *lock
->get_parent() << dendl
;
5304 scatter_nudge(lock
, 0, true);
5305 mds
->mdlog
->flush();