1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #include <boost/utility/string_view.hpp>
20 #include "MDBalancer.h"
25 #include "MDSContext.h"
30 #include "events/EUpdate.h"
31 #include "events/EOpen.h"
33 #include "msg/Messenger.h"
34 #include "osdc/Objecter.h"
36 #include "messages/MInodeFileCaps.h"
37 #include "messages/MLock.h"
38 #include "messages/MClientLease.h"
39 #include "messages/MClientReply.h"
40 #include "messages/MClientCaps.h"
41 #include "messages/MClientCapRelease.h"
43 #include "messages/MMDSSlaveRequest.h"
47 #include "common/config.h"
50 #define dout_subsys ceph_subsys_mds
52 #define dout_context g_ceph_context
53 #define dout_prefix _prefix(_dout, mds)
54 static ostream
& _prefix(std::ostream
*_dout
, MDSRank
*mds
) {
55 return *_dout
<< "mds." << mds
->get_nodeid() << ".locker ";
59 class LockerContext
: public MDSInternalContextBase
{
62 MDSRank
*get_mds() override
68 explicit LockerContext(Locker
*locker_
) : locker(locker_
) {
69 assert(locker
!= NULL
);
73 class LockerLogContext
: public MDSLogContextBase
{
76 MDSRank
*get_mds() override
82 explicit LockerLogContext(Locker
*locker_
) : locker(locker_
) {
83 assert(locker
!= NULL
);
87 /* This function DOES put the passed message before returning */
88 void Locker::dispatch(Message
*m
)
91 switch (m
->get_type()) {
95 handle_lock(static_cast<MLock
*>(m
));
98 case MSG_MDS_INODEFILECAPS
:
99 handle_inode_file_caps(static_cast<MInodeFileCaps
*>(m
));
103 case CEPH_MSG_CLIENT_CAPS
:
104 handle_client_caps(static_cast<MClientCaps
*>(m
));
107 case CEPH_MSG_CLIENT_CAPRELEASE
:
108 handle_client_cap_release(static_cast<MClientCapRelease
*>(m
));
110 case CEPH_MSG_CLIENT_LEASE
:
111 handle_client_lease(static_cast<MClientLease
*>(m
));
115 derr
<< "locker unknown message " << m
->get_type() << dendl
;
116 assert(0 == "locker unknown message");
133 void Locker::send_lock_message(SimpleLock
*lock
, int msg
)
135 for (const auto &it
: lock
->get_parent()->get_replicas()) {
136 if (mds
->is_cluster_degraded() &&
137 mds
->mdsmap
->get_state(it
.first
) < MDSMap::STATE_REJOIN
)
139 MLock
*m
= new MLock(lock
, msg
, mds
->get_nodeid());
140 mds
->send_message_mds(m
, it
.first
);
144 void Locker::send_lock_message(SimpleLock
*lock
, int msg
, const bufferlist
&data
)
146 for (const auto &it
: lock
->get_parent()->get_replicas()) {
147 if (mds
->is_cluster_degraded() &&
148 mds
->mdsmap
->get_state(it
.first
) < MDSMap::STATE_REJOIN
)
150 MLock
*m
= new MLock(lock
, msg
, mds
->get_nodeid());
152 mds
->send_message_mds(m
, it
.first
);
159 void Locker::include_snap_rdlocks(set
<SimpleLock
*>& rdlocks
, CInode
*in
)
161 // rdlock ancestor snaps
163 rdlocks
.insert(&in
->snaplock
);
164 while (t
->get_projected_parent_dn()) {
165 t
= t
->get_projected_parent_dn()->get_dir()->get_inode();
166 rdlocks
.insert(&t
->snaplock
);
170 void Locker::include_snap_rdlocks_wlayout(set
<SimpleLock
*>& rdlocks
, CInode
*in
,
171 file_layout_t
**layout
)
173 //rdlock ancestor snaps
175 rdlocks
.insert(&in
->snaplock
);
176 rdlocks
.insert(&in
->policylock
);
177 bool found_layout
= false;
179 rdlocks
.insert(&t
->snaplock
);
181 rdlocks
.insert(&t
->policylock
);
182 if (t
->get_projected_inode()->has_layout()) {
183 *layout
= &t
->get_projected_inode()->layout
;
187 if (t
->get_projected_parent_dn() &&
188 t
->get_projected_parent_dn()->get_dir())
189 t
= t
->get_projected_parent_dn()->get_dir()->get_inode();
194 struct MarkEventOnDestruct
{
198 MarkEventOnDestruct(MDRequestRef
& _mdr
,
199 const char *_message
) : mdr(_mdr
),
202 ~MarkEventOnDestruct() {
204 mdr
->mark_event(message
);
208 /* If this function returns false, the mdr has been placed
209 * on the appropriate wait list */
210 bool Locker::acquire_locks(MDRequestRef
& mdr
,
211 set
<SimpleLock
*> &rdlocks
,
212 set
<SimpleLock
*> &wrlocks
,
213 set
<SimpleLock
*> &xlocks
,
214 map
<SimpleLock
*,mds_rank_t
> *remote_wrlocks
,
215 CInode
*auth_pin_freeze
,
216 bool auth_pin_nonblock
)
218 if (mdr
->done_locking
&&
219 !mdr
->is_slave()) { // not on slaves! master requests locks piecemeal.
220 dout(10) << "acquire_locks " << *mdr
<< " - done locking" << dendl
;
221 return true; // at least we had better be!
223 dout(10) << "acquire_locks " << *mdr
<< dendl
;
225 MarkEventOnDestruct
marker(mdr
, "failed to acquire_locks");
227 client_t client
= mdr
->get_client();
229 set
<SimpleLock
*, SimpleLock::ptr_lt
> sorted
; // sort everything we will lock
230 set
<MDSCacheObject
*> mustpin
; // items to authpin
233 for (set
<SimpleLock
*>::iterator p
= xlocks
.begin(); p
!= xlocks
.end(); ++p
) {
234 SimpleLock
*lock
= *p
;
236 if ((lock
->get_type() == CEPH_LOCK_ISNAP
||
237 lock
->get_type() == CEPH_LOCK_IPOLICY
) &&
238 mds
->is_cluster_degraded() &&
240 !mdr
->is_queued_for_replay()) {
241 // waiting for recovering mds, to guarantee replayed requests and mksnap/setlayout
242 // get processed in proper order.
244 if (lock
->get_parent()->is_auth()) {
245 if (!mdr
->locks
.count(lock
)) {
247 lock
->get_parent()->list_replicas(ls
);
249 if (mds
->mdsmap
->get_state(m
) < MDSMap::STATE_ACTIVE
) {
256 // if the lock is the latest locked one, it's possible that slave mds got the lock
257 // while there are recovering mds.
258 if (!mdr
->locks
.count(lock
) || lock
== *mdr
->locks
.rbegin())
262 dout(10) << " must xlock " << *lock
<< " " << *lock
->get_parent()
263 << ", waiting for cluster recovered" << dendl
;
264 mds
->locker
->drop_locks(mdr
.get(), NULL
);
265 mdr
->drop_local_auth_pins();
266 mds
->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache
, mdr
));
271 dout(20) << " must xlock " << *lock
<< " " << *lock
->get_parent() << dendl
;
274 mustpin
.insert(lock
->get_parent());
276 // augment xlock with a versionlock?
277 if ((*p
)->get_type() == CEPH_LOCK_DN
) {
278 CDentry
*dn
= (CDentry
*)lock
->get_parent();
282 if (xlocks
.count(&dn
->versionlock
))
283 continue; // we're xlocking the versionlock too; don't wrlock it!
285 if (mdr
->is_master()) {
286 // master. wrlock versionlock so we can pipeline dentry updates to journal.
287 wrlocks
.insert(&dn
->versionlock
);
289 // slave. exclusively lock the dentry version (i.e. block other journal updates).
290 // this makes rollback safe.
291 xlocks
.insert(&dn
->versionlock
);
292 sorted
.insert(&dn
->versionlock
);
295 if (lock
->get_type() > CEPH_LOCK_IVERSION
) {
296 // inode version lock?
297 CInode
*in
= (CInode
*)lock
->get_parent();
300 if (mdr
->is_master()) {
301 // master. wrlock versionlock so we can pipeline inode updates to journal.
302 wrlocks
.insert(&in
->versionlock
);
304 // slave. exclusively lock the inode version (i.e. block other journal updates).
305 // this makes rollback safe.
306 xlocks
.insert(&in
->versionlock
);
307 sorted
.insert(&in
->versionlock
);
313 for (set
<SimpleLock
*>::iterator p
= wrlocks
.begin(); p
!= wrlocks
.end(); ++p
) {
314 MDSCacheObject
*object
= (*p
)->get_parent();
315 dout(20) << " must wrlock " << **p
<< " " << *object
<< dendl
;
317 if (object
->is_auth())
318 mustpin
.insert(object
);
319 else if (!object
->is_auth() &&
320 !(*p
)->can_wrlock(client
) && // we might have to request a scatter
321 !mdr
->is_slave()) { // if we are slave (remote_wrlock), the master already authpinned
322 dout(15) << " will also auth_pin " << *object
323 << " in case we need to request a scatter" << dendl
;
324 mustpin
.insert(object
);
329 if (remote_wrlocks
) {
330 for (map
<SimpleLock
*,mds_rank_t
>::iterator p
= remote_wrlocks
->begin(); p
!= remote_wrlocks
->end(); ++p
) {
331 MDSCacheObject
*object
= p
->first
->get_parent();
332 dout(20) << " must remote_wrlock on mds." << p
->second
<< " "
333 << *p
->first
<< " " << *object
<< dendl
;
334 sorted
.insert(p
->first
);
335 mustpin
.insert(object
);
340 for (set
<SimpleLock
*>::iterator p
= rdlocks
.begin();
343 MDSCacheObject
*object
= (*p
)->get_parent();
344 dout(20) << " must rdlock " << **p
<< " " << *object
<< dendl
;
346 if (object
->is_auth())
347 mustpin
.insert(object
);
348 else if (!object
->is_auth() &&
349 !(*p
)->can_rdlock(client
)) { // we might have to request an rdlock
350 dout(15) << " will also auth_pin " << *object
351 << " in case we need to request a rdlock" << dendl
;
352 mustpin
.insert(object
);
358 map
<mds_rank_t
, set
<MDSCacheObject
*> > mustpin_remote
; // mds -> (object set)
360 // can i auth pin them all now?
361 marker
.message
= "failed to authpin local pins";
362 for (set
<MDSCacheObject
*>::iterator p
= mustpin
.begin();
365 MDSCacheObject
*object
= *p
;
367 dout(10) << " must authpin " << *object
<< dendl
;
369 if (mdr
->is_auth_pinned(object
)) {
370 if (object
!= (MDSCacheObject
*)auth_pin_freeze
)
372 if (mdr
->more()->is_remote_frozen_authpin
) {
373 if (mdr
->more()->rename_inode
== auth_pin_freeze
)
375 // unfreeze auth pin for the wrong inode
376 mustpin_remote
[mdr
->more()->rename_inode
->authority().first
].size();
380 if (!object
->is_auth()) {
381 if (!mdr
->locks
.empty())
382 drop_locks(mdr
.get());
383 if (object
->is_ambiguous_auth()) {
385 dout(10) << " ambiguous auth, waiting to authpin " << *object
<< dendl
;
386 object
->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
, new C_MDS_RetryRequest(mdcache
, mdr
));
387 mdr
->drop_local_auth_pins();
390 mustpin_remote
[object
->authority().first
].insert(object
);
393 if (!object
->can_auth_pin()) {
395 drop_locks(mdr
.get());
396 mdr
->drop_local_auth_pins();
397 if (auth_pin_nonblock
) {
398 dout(10) << " can't auth_pin (freezing?) " << *object
<< ", nonblocking" << dendl
;
402 dout(10) << " can't auth_pin (freezing?), waiting to authpin " << *object
<< dendl
;
403 object
->add_waiter(MDSCacheObject::WAIT_UNFREEZE
, new C_MDS_RetryRequest(mdcache
, mdr
));
405 if (!mdr
->remote_auth_pins
.empty())
406 notify_freeze_waiter(object
);
412 // ok, grab local auth pins
413 for (set
<MDSCacheObject
*>::iterator p
= mustpin
.begin();
416 MDSCacheObject
*object
= *p
;
417 if (mdr
->is_auth_pinned(object
)) {
418 dout(10) << " already auth_pinned " << *object
<< dendl
;
419 } else if (object
->is_auth()) {
420 dout(10) << " auth_pinning " << *object
<< dendl
;
421 mdr
->auth_pin(object
);
425 // request remote auth_pins
426 if (!mustpin_remote
.empty()) {
427 marker
.message
= "requesting remote authpins";
428 for (map
<MDSCacheObject
*,mds_rank_t
>::iterator p
= mdr
->remote_auth_pins
.begin();
429 p
!= mdr
->remote_auth_pins
.end();
431 if (mustpin
.count(p
->first
)) {
432 assert(p
->second
== p
->first
->authority().first
);
433 map
<mds_rank_t
, set
<MDSCacheObject
*> >::iterator q
= mustpin_remote
.find(p
->second
);
434 if (q
!= mustpin_remote
.end())
435 q
->second
.insert(p
->first
);
438 for (map
<mds_rank_t
, set
<MDSCacheObject
*> >::iterator p
= mustpin_remote
.begin();
439 p
!= mustpin_remote
.end();
441 dout(10) << "requesting remote auth_pins from mds." << p
->first
<< dendl
;
443 // wait for active auth
444 if (mds
->is_cluster_degraded() &&
445 !mds
->mdsmap
->is_clientreplay_or_active_or_stopping(p
->first
)) {
446 dout(10) << " mds." << p
->first
<< " is not active" << dendl
;
447 if (mdr
->more()->waiting_on_slave
.empty())
448 mds
->wait_for_active_peer(p
->first
, new C_MDS_RetryRequest(mdcache
, mdr
));
452 MMDSSlaveRequest
*req
= new MMDSSlaveRequest(mdr
->reqid
, mdr
->attempt
,
453 MMDSSlaveRequest::OP_AUTHPIN
);
454 for (set
<MDSCacheObject
*>::iterator q
= p
->second
.begin();
455 q
!= p
->second
.end();
457 dout(10) << " req remote auth_pin of " << **q
<< dendl
;
458 MDSCacheObjectInfo info
;
459 (*q
)->set_object_info(info
);
460 req
->get_authpins().push_back(info
);
461 if (*q
== auth_pin_freeze
)
462 (*q
)->set_object_info(req
->get_authpin_freeze());
465 if (auth_pin_nonblock
)
466 req
->mark_nonblock();
467 mds
->send_message_mds(req
, p
->first
);
469 // put in waiting list
470 assert(mdr
->more()->waiting_on_slave
.count(p
->first
) == 0);
471 mdr
->more()->waiting_on_slave
.insert(p
->first
);
476 // caps i'll need to issue
477 set
<CInode
*> issue_set
;
481 // make sure they match currently acquired locks.
482 set
<SimpleLock
*, SimpleLock::ptr_lt
>::iterator existing
= mdr
->locks
.begin();
483 for (set
<SimpleLock
*, SimpleLock::ptr_lt
>::iterator p
= sorted
.begin();
486 bool need_wrlock
= !!wrlocks
.count(*p
);
487 bool need_remote_wrlock
= !!(remote_wrlocks
&& remote_wrlocks
->count(*p
));
490 if (existing
!= mdr
->locks
.end() && *existing
== *p
) {
492 SimpleLock
*have
= *existing
;
494 if (xlocks
.count(have
) && mdr
->xlocks
.count(have
)) {
495 dout(10) << " already xlocked " << *have
<< " " << *have
->get_parent() << dendl
;
498 if (mdr
->remote_wrlocks
.count(have
)) {
499 if (!need_remote_wrlock
||
500 mdr
->remote_wrlocks
[have
] != (*remote_wrlocks
)[have
]) {
501 dout(10) << " unlocking remote_wrlock on wrong mds." << mdr
->remote_wrlocks
[have
]
502 << " " << *have
<< " " << *have
->get_parent() << dendl
;
503 remote_wrlock_finish(have
, mdr
->remote_wrlocks
[have
], mdr
.get());
506 if (need_wrlock
|| need_remote_wrlock
) {
507 if (need_wrlock
== !!mdr
->wrlocks
.count(have
) &&
508 need_remote_wrlock
== !!mdr
->remote_wrlocks
.count(have
)) {
510 dout(10) << " already wrlocked " << *have
<< " " << *have
->get_parent() << dendl
;
511 if (need_remote_wrlock
)
512 dout(10) << " already remote_wrlocked " << *have
<< " " << *have
->get_parent() << dendl
;
516 if (rdlocks
.count(have
) && mdr
->rdlocks
.count(have
)) {
517 dout(10) << " already rdlocked " << *have
<< " " << *have
->get_parent() << dendl
;
522 // hose any stray locks
523 if (existing
!= mdr
->locks
.end() && *existing
== *p
) {
524 assert(need_wrlock
|| need_remote_wrlock
);
525 SimpleLock
*lock
= *existing
;
526 if (mdr
->wrlocks
.count(lock
)) {
528 dout(10) << " unlocking extra " << *lock
<< " " << *lock
->get_parent() << dendl
;
529 else if (need_remote_wrlock
) // acquire remote_wrlock first
530 dout(10) << " unlocking out-of-order " << *lock
<< " " << *lock
->get_parent() << dendl
;
531 bool need_issue
= false;
532 wrlock_finish(lock
, mdr
.get(), &need_issue
);
534 issue_set
.insert(static_cast<CInode
*>(lock
->get_parent()));
538 while (existing
!= mdr
->locks
.end()) {
539 SimpleLock
*stray
= *existing
;
541 dout(10) << " unlocking out-of-order " << *stray
<< " " << *stray
->get_parent() << dendl
;
542 bool need_issue
= false;
543 if (mdr
->xlocks
.count(stray
)) {
544 xlock_finish(stray
, mdr
.get(), &need_issue
);
545 } else if (mdr
->rdlocks
.count(stray
)) {
546 rdlock_finish(stray
, mdr
.get(), &need_issue
);
548 // may have acquired both wrlock and remore wrlock
549 if (mdr
->wrlocks
.count(stray
))
550 wrlock_finish(stray
, mdr
.get(), &need_issue
);
551 if (mdr
->remote_wrlocks
.count(stray
))
552 remote_wrlock_finish(stray
, mdr
->remote_wrlocks
[stray
], mdr
.get());
555 issue_set
.insert(static_cast<CInode
*>(stray
->get_parent()));
559 if (mdr
->locking
&& *p
!= mdr
->locking
) {
560 cancel_locking(mdr
.get(), &issue_set
);
562 if (xlocks
.count(*p
)) {
563 marker
.message
= "failed to xlock, waiting";
564 if (!xlock_start(*p
, mdr
))
566 dout(10) << " got xlock on " << **p
<< " " << *(*p
)->get_parent() << dendl
;
567 } else if (need_wrlock
|| need_remote_wrlock
) {
568 if (need_remote_wrlock
&& !mdr
->remote_wrlocks
.count(*p
)) {
569 marker
.message
= "waiting for remote wrlocks";
570 remote_wrlock_start(*p
, (*remote_wrlocks
)[*p
], mdr
);
573 if (need_wrlock
&& !mdr
->wrlocks
.count(*p
)) {
574 marker
.message
= "failed to wrlock, waiting";
575 if (need_remote_wrlock
&& !(*p
)->can_wrlock(mdr
->get_client())) {
576 marker
.message
= "failed to wrlock, dropping remote wrlock and waiting";
577 // can't take the wrlock because the scatter lock is gathering. need to
578 // release the remote wrlock, so that the gathering process can finish.
579 remote_wrlock_finish(*p
, mdr
->remote_wrlocks
[*p
], mdr
.get());
580 remote_wrlock_start(*p
, (*remote_wrlocks
)[*p
], mdr
);
583 // nowait if we have already gotten remote wrlock
584 if (!wrlock_start(*p
, mdr
, need_remote_wrlock
))
586 dout(10) << " got wrlock on " << **p
<< " " << *(*p
)->get_parent() << dendl
;
589 assert(mdr
->is_master());
590 if ((*p
)->needs_recover()) {
591 if (mds
->is_cluster_degraded()) {
592 if (!mdr
->is_queued_for_replay()) {
593 // see comments in SimpleLock::set_state_rejoin() and
594 // ScatterLock::encode_state_for_rejoin()
595 drop_locks(mdr
.get());
596 mds
->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache
, mdr
));
597 dout(10) << " rejoin recovering " << **p
<< " " << *(*p
)->get_parent()
598 << ", waiting for cluster recovered" << dendl
;
599 marker
.message
= "rejoin recovering lock, waiting for cluster recovered";
603 (*p
)->clear_need_recover();
607 marker
.message
= "failed to rdlock, waiting";
608 if (!rdlock_start(*p
, mdr
))
610 dout(10) << " got rdlock on " << **p
<< " " << *(*p
)->get_parent() << dendl
;
614 // any extra unneeded locks?
615 while (existing
!= mdr
->locks
.end()) {
616 SimpleLock
*stray
= *existing
;
618 dout(10) << " unlocking extra " << *stray
<< " " << *stray
->get_parent() << dendl
;
619 bool need_issue
= false;
620 if (mdr
->xlocks
.count(stray
)) {
621 xlock_finish(stray
, mdr
.get(), &need_issue
);
622 } else if (mdr
->rdlocks
.count(stray
)) {
623 rdlock_finish(stray
, mdr
.get(), &need_issue
);
625 // may have acquired both wrlock and remore wrlock
626 if (mdr
->wrlocks
.count(stray
))
627 wrlock_finish(stray
, mdr
.get(), &need_issue
);
628 if (mdr
->remote_wrlocks
.count(stray
))
629 remote_wrlock_finish(stray
, mdr
->remote_wrlocks
[stray
], mdr
.get());
632 issue_set
.insert(static_cast<CInode
*>(stray
->get_parent()));
635 mdr
->done_locking
= true;
636 mdr
->set_mds_stamp(ceph_clock_now());
638 marker
.message
= "acquired locks";
641 issue_caps_set(issue_set
);
645 void Locker::notify_freeze_waiter(MDSCacheObject
*o
)
648 if (CInode
*in
= dynamic_cast<CInode
*>(o
)) {
650 dir
= in
->get_parent_dir();
651 } else if (CDentry
*dn
= dynamic_cast<CDentry
*>(o
)) {
654 dir
= dynamic_cast<CDir
*>(o
);
658 if (dir
->is_freezing_dir())
659 mdcache
->fragment_freeze_inc_num_waiters(dir
);
660 if (dir
->is_freezing_tree()) {
661 while (!dir
->is_freezing_tree_root())
662 dir
= dir
->get_parent_dir();
663 mdcache
->migrator
->export_freeze_inc_num_waiters(dir
);
668 void Locker::set_xlocks_done(MutationImpl
*mut
, bool skip_dentry
)
670 for (set
<SimpleLock
*>::iterator p
= mut
->xlocks
.begin();
671 p
!= mut
->xlocks
.end();
673 MDSCacheObject
*object
= (*p
)->get_parent();
674 assert(object
->is_auth());
676 ((*p
)->get_type() == CEPH_LOCK_DN
|| (*p
)->get_type() == CEPH_LOCK_DVERSION
))
678 dout(10) << "set_xlocks_done on " << **p
<< " " << *object
<< dendl
;
679 (*p
)->set_xlock_done();
683 void Locker::_drop_rdlocks(MutationImpl
*mut
, set
<CInode
*> *pneed_issue
)
685 while (!mut
->rdlocks
.empty()) {
687 MDSCacheObject
*p
= (*mut
->rdlocks
.begin())->get_parent();
688 rdlock_finish(*mut
->rdlocks
.begin(), mut
, &ni
);
690 pneed_issue
->insert(static_cast<CInode
*>(p
));
694 void Locker::_drop_non_rdlocks(MutationImpl
*mut
, set
<CInode
*> *pneed_issue
)
696 set
<mds_rank_t
> slaves
;
698 while (!mut
->xlocks
.empty()) {
699 SimpleLock
*lock
= *mut
->xlocks
.begin();
700 MDSCacheObject
*p
= lock
->get_parent();
702 assert(lock
->get_sm()->can_remote_xlock
);
703 slaves
.insert(p
->authority().first
);
705 mut
->locks
.erase(lock
);
706 mut
->xlocks
.erase(lock
);
710 xlock_finish(lock
, mut
, &ni
);
712 pneed_issue
->insert(static_cast<CInode
*>(p
));
715 while (!mut
->remote_wrlocks
.empty()) {
716 map
<SimpleLock
*,mds_rank_t
>::iterator p
= mut
->remote_wrlocks
.begin();
717 slaves
.insert(p
->second
);
718 if (mut
->wrlocks
.count(p
->first
) == 0)
719 mut
->locks
.erase(p
->first
);
720 mut
->remote_wrlocks
.erase(p
);
723 while (!mut
->wrlocks
.empty()) {
725 MDSCacheObject
*p
= (*mut
->wrlocks
.begin())->get_parent();
726 wrlock_finish(*mut
->wrlocks
.begin(), mut
, &ni
);
728 pneed_issue
->insert(static_cast<CInode
*>(p
));
731 for (set
<mds_rank_t
>::iterator p
= slaves
.begin(); p
!= slaves
.end(); ++p
) {
732 if (!mds
->is_cluster_degraded() ||
733 mds
->mdsmap
->get_state(*p
) >= MDSMap::STATE_REJOIN
) {
734 dout(10) << "_drop_non_rdlocks dropping remote locks on mds." << *p
<< dendl
;
735 MMDSSlaveRequest
*slavereq
= new MMDSSlaveRequest(mut
->reqid
, mut
->attempt
,
736 MMDSSlaveRequest::OP_DROPLOCKS
);
737 mds
->send_message_mds(slavereq
, *p
);
742 void Locker::cancel_locking(MutationImpl
*mut
, set
<CInode
*> *pneed_issue
)
744 SimpleLock
*lock
= mut
->locking
;
746 dout(10) << "cancel_locking " << *lock
<< " on " << *mut
<< dendl
;
748 if (lock
->get_parent()->is_auth()) {
749 bool need_issue
= false;
750 if (lock
->get_state() == LOCK_PREXLOCK
) {
751 _finish_xlock(lock
, -1, &need_issue
);
752 } else if (lock
->get_state() == LOCK_LOCK_XLOCK
&&
753 lock
->get_num_xlocks() == 0) {
754 lock
->set_state(LOCK_XLOCKDONE
);
755 eval_gather(lock
, true, &need_issue
);
758 pneed_issue
->insert(static_cast<CInode
*>(lock
->get_parent()));
760 mut
->finish_locking(lock
);
763 void Locker::drop_locks(MutationImpl
*mut
, set
<CInode
*> *pneed_issue
)
766 set
<CInode
*> my_need_issue
;
768 pneed_issue
= &my_need_issue
;
771 cancel_locking(mut
, pneed_issue
);
772 _drop_non_rdlocks(mut
, pneed_issue
);
773 _drop_rdlocks(mut
, pneed_issue
);
775 if (pneed_issue
== &my_need_issue
)
776 issue_caps_set(*pneed_issue
);
777 mut
->done_locking
= false;
780 void Locker::drop_non_rdlocks(MutationImpl
*mut
, set
<CInode
*> *pneed_issue
)
782 set
<CInode
*> my_need_issue
;
784 pneed_issue
= &my_need_issue
;
786 _drop_non_rdlocks(mut
, pneed_issue
);
788 if (pneed_issue
== &my_need_issue
)
789 issue_caps_set(*pneed_issue
);
792 void Locker::drop_rdlocks_for_early_reply(MutationImpl
*mut
)
794 set
<CInode
*> need_issue
;
796 for (auto p
= mut
->rdlocks
.begin(); p
!= mut
->rdlocks
.end(); ) {
797 SimpleLock
*lock
= *p
;
799 // make later mksnap/setlayout (at other mds) wait for this unsafe request
800 if (lock
->get_type() == CEPH_LOCK_ISNAP
||
801 lock
->get_type() == CEPH_LOCK_IPOLICY
)
804 rdlock_finish(lock
, mut
, &ni
);
806 need_issue
.insert(static_cast<CInode
*>(lock
->get_parent()));
809 issue_caps_set(need_issue
);
814 void Locker::eval_gather(SimpleLock
*lock
, bool first
, bool *pneed_issue
, list
<MDSInternalContextBase
*> *pfinishers
)
816 dout(10) << "eval_gather " << *lock
<< " on " << *lock
->get_parent() << dendl
;
817 assert(!lock
->is_stable());
819 int next
= lock
->get_next_state();
822 bool caps
= lock
->get_cap_shift();
823 if (lock
->get_type() != CEPH_LOCK_DN
)
824 in
= static_cast<CInode
*>(lock
->get_parent());
826 bool need_issue
= false;
828 int loner_issued
= 0, other_issued
= 0, xlocker_issued
= 0;
829 assert(!caps
|| in
!= NULL
);
830 if (caps
&& in
->is_head()) {
831 in
->get_caps_issued(&loner_issued
, &other_issued
, &xlocker_issued
,
832 lock
->get_cap_shift(), lock
->get_cap_mask());
833 dout(10) << " next state is " << lock
->get_state_name(next
)
834 << " issued/allows loner " << gcap_string(loner_issued
)
835 << "/" << gcap_string(lock
->gcaps_allowed(CAP_LONER
, next
))
836 << " xlocker " << gcap_string(xlocker_issued
)
837 << "/" << gcap_string(lock
->gcaps_allowed(CAP_XLOCKER
, next
))
838 << " other " << gcap_string(other_issued
)
839 << "/" << gcap_string(lock
->gcaps_allowed(CAP_ANY
, next
))
842 if (first
&& ((~lock
->gcaps_allowed(CAP_ANY
, next
) & other_issued
) ||
843 (~lock
->gcaps_allowed(CAP_LONER
, next
) & loner_issued
) ||
844 (~lock
->gcaps_allowed(CAP_XLOCKER
, next
) & xlocker_issued
)))
848 #define IS_TRUE_AND_LT_AUTH(x, auth) (x && ((auth && x <= AUTH) || (!auth && x < AUTH)))
849 bool auth
= lock
->get_parent()->is_auth();
850 if (!lock
->is_gathering() &&
851 (IS_TRUE_AND_LT_AUTH(lock
->get_sm()->states
[next
].can_rdlock
, auth
) || !lock
->is_rdlocked()) &&
852 (IS_TRUE_AND_LT_AUTH(lock
->get_sm()->states
[next
].can_wrlock
, auth
) || !lock
->is_wrlocked()) &&
853 (IS_TRUE_AND_LT_AUTH(lock
->get_sm()->states
[next
].can_xlock
, auth
) || !lock
->is_xlocked()) &&
854 (IS_TRUE_AND_LT_AUTH(lock
->get_sm()->states
[next
].can_lease
, auth
) || !lock
->is_leased()) &&
855 !(lock
->get_parent()->is_auth() && lock
->is_flushing()) && // i.e. wait for scatter_writebehind!
856 (!caps
|| ((~lock
->gcaps_allowed(CAP_ANY
, next
) & other_issued
) == 0 &&
857 (~lock
->gcaps_allowed(CAP_LONER
, next
) & loner_issued
) == 0 &&
858 (~lock
->gcaps_allowed(CAP_XLOCKER
, next
) & xlocker_issued
) == 0)) &&
859 lock
->get_state() != LOCK_SYNC_MIX2
&& // these states need an explicit trigger from the auth mds
860 lock
->get_state() != LOCK_MIX_SYNC2
862 dout(7) << "eval_gather finished gather on " << *lock
863 << " on " << *lock
->get_parent() << dendl
;
865 if (lock
->get_sm() == &sm_filelock
) {
867 if (in
->state_test(CInode::STATE_RECOVERING
)) {
868 dout(7) << "eval_gather finished gather, but still recovering" << dendl
;
870 } else if (in
->state_test(CInode::STATE_NEEDSRECOVER
)) {
871 dout(7) << "eval_gather finished gather, but need to recover" << dendl
;
872 mds
->mdcache
->queue_file_recover(in
);
873 mds
->mdcache
->do_file_recover();
878 if (!lock
->get_parent()->is_auth()) {
879 // replica: tell auth
880 mds_rank_t auth
= lock
->get_parent()->authority().first
;
882 if (lock
->get_parent()->is_rejoining() &&
883 mds
->mdsmap
->get_state(auth
) == MDSMap::STATE_REJOIN
) {
884 dout(7) << "eval_gather finished gather, but still rejoining "
885 << *lock
->get_parent() << dendl
;
889 if (!mds
->is_cluster_degraded() ||
890 mds
->mdsmap
->get_state(auth
) >= MDSMap::STATE_REJOIN
) {
891 switch (lock
->get_state()) {
893 mds
->send_message_mds(new MLock(lock
, LOCK_AC_LOCKACK
, mds
->get_nodeid()),
899 MLock
*reply
= new MLock(lock
, LOCK_AC_SYNCACK
, mds
->get_nodeid());
900 lock
->encode_locked_state(reply
->get_data());
901 mds
->send_message_mds(reply
, auth
);
902 next
= LOCK_MIX_SYNC2
;
903 (static_cast<ScatterLock
*>(lock
))->start_flush();
908 (static_cast<ScatterLock
*>(lock
))->finish_flush();
909 (static_cast<ScatterLock
*>(lock
))->clear_flushed();
912 // do nothing, we already acked
917 MLock
*reply
= new MLock(lock
, LOCK_AC_MIXACK
, mds
->get_nodeid());
918 mds
->send_message_mds(reply
, auth
);
919 next
= LOCK_SYNC_MIX2
;
926 lock
->encode_locked_state(data
);
927 mds
->send_message_mds(new MLock(lock
, LOCK_AC_LOCKACK
, mds
->get_nodeid(), data
), auth
);
928 (static_cast<ScatterLock
*>(lock
))->start_flush();
929 // we'll get an AC_LOCKFLUSHED to complete
940 // once the first (local) stage of mix->lock gather complete we can
941 // gather from replicas
942 if (lock
->get_state() == LOCK_MIX_LOCK
&&
943 lock
->get_parent()->is_replicated()) {
944 dout(10) << " finished (local) gather for mix->lock, now gathering from replicas" << dendl
;
945 send_lock_message(lock
, LOCK_AC_LOCK
);
947 lock
->set_state(LOCK_MIX_LOCK2
);
951 if (lock
->is_dirty() && !lock
->is_flushed()) {
952 scatter_writebehind(static_cast<ScatterLock
*>(lock
));
956 lock
->clear_flushed();
958 switch (lock
->get_state()) {
964 in
->start_scatter(static_cast<ScatterLock
*>(lock
));
965 if (lock
->get_parent()->is_replicated()) {
967 lock
->encode_locked_state(softdata
);
968 send_lock_message(lock
, LOCK_AC_MIX
, softdata
);
970 (static_cast<ScatterLock
*>(lock
))->clear_scatter_wanted();
975 if (next
!= LOCK_SYNC
)
984 if (lock
->get_parent()->is_replicated()) {
986 lock
->encode_locked_state(softdata
);
987 send_lock_message(lock
, LOCK_AC_SYNC
, softdata
);
994 lock
->set_state(next
);
996 if (lock
->get_parent()->is_auth() &&
998 lock
->get_parent()->auth_unpin(lock
);
1000 // drop loner before doing waiters
1004 in
->get_wanted_loner() != in
->get_loner()) {
1005 dout(10) << " trying to drop loner" << dendl
;
1006 if (in
->try_drop_loner()) {
1007 dout(10) << " dropped loner" << dendl
;
1013 lock
->take_waiting(SimpleLock::WAIT_STABLE
|SimpleLock::WAIT_WR
|SimpleLock::WAIT_RD
|SimpleLock::WAIT_XLOCK
,
1016 lock
->finish_waiters(SimpleLock::WAIT_STABLE
|SimpleLock::WAIT_WR
|SimpleLock::WAIT_RD
|SimpleLock::WAIT_XLOCK
);
1018 if (caps
&& in
->is_head())
1021 if (lock
->get_parent()->is_auth() &&
1023 try_eval(lock
, &need_issue
);
1028 *pneed_issue
= true;
1029 else if (in
->is_head())
1035 bool Locker::eval(CInode
*in
, int mask
, bool caps_imported
)
1037 bool need_issue
= caps_imported
;
1038 list
<MDSInternalContextBase
*> finishers
;
1040 dout(10) << "eval " << mask
<< " " << *in
<< dendl
;
1043 if (in
->is_auth() && in
->is_head()) {
1044 client_t orig_loner
= in
->get_loner();
1045 if (in
->choose_ideal_loner()) {
1046 dout(10) << "eval set loner: client." << orig_loner
<< " -> client." << in
->get_loner() << dendl
;
1049 } else if (in
->get_wanted_loner() != in
->get_loner()) {
1050 dout(10) << "eval want loner: client." << in
->get_wanted_loner() << " but failed to set it" << dendl
;
1056 if (mask
& CEPH_LOCK_IFILE
)
1057 eval_any(&in
->filelock
, &need_issue
, &finishers
, caps_imported
);
1058 if (mask
& CEPH_LOCK_IAUTH
)
1059 eval_any(&in
->authlock
, &need_issue
, &finishers
, caps_imported
);
1060 if (mask
& CEPH_LOCK_ILINK
)
1061 eval_any(&in
->linklock
, &need_issue
, &finishers
, caps_imported
);
1062 if (mask
& CEPH_LOCK_IXATTR
)
1063 eval_any(&in
->xattrlock
, &need_issue
, &finishers
, caps_imported
);
1064 if (mask
& CEPH_LOCK_INEST
)
1065 eval_any(&in
->nestlock
, &need_issue
, &finishers
, caps_imported
);
1066 if (mask
& CEPH_LOCK_IFLOCK
)
1067 eval_any(&in
->flocklock
, &need_issue
, &finishers
, caps_imported
);
1068 if (mask
& CEPH_LOCK_IPOLICY
)
1069 eval_any(&in
->policylock
, &need_issue
, &finishers
, caps_imported
);
1072 if (in
->is_auth() && in
->is_head() && in
->get_wanted_loner() != in
->get_loner()) {
1073 if (in
->try_drop_loner()) {
1075 if (in
->get_wanted_loner() >= 0) {
1076 dout(10) << "eval end set loner to client." << in
->get_loner() << dendl
;
1077 bool ok
= in
->try_set_loner();
1085 finish_contexts(g_ceph_context
, finishers
);
1087 if (need_issue
&& in
->is_head())
1090 dout(10) << "eval done" << dendl
;
1094 class C_Locker_Eval
: public LockerContext
{
1098 C_Locker_Eval(Locker
*l
, MDSCacheObject
*pp
, int m
) : LockerContext(l
), p(pp
), mask(m
) {
1099 // We are used as an MDSCacheObject waiter, so should
1100 // only be invoked by someone already holding the big lock.
1101 assert(locker
->mds
->mds_lock
.is_locked_by_me());
1102 p
->get(MDSCacheObject::PIN_PTRWAITER
);
1104 void finish(int r
) override
{
1105 locker
->try_eval(p
, mask
);
1106 p
->put(MDSCacheObject::PIN_PTRWAITER
);
1110 void Locker::try_eval(MDSCacheObject
*p
, int mask
)
1112 // unstable and ambiguous auth?
1113 if (p
->is_ambiguous_auth()) {
1114 dout(7) << "try_eval ambiguous auth, waiting on " << *p
<< dendl
;
1115 p
->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
, new C_Locker_Eval(this, p
, mask
));
1119 if (p
->is_auth() && p
->is_frozen()) {
1120 dout(7) << "try_eval frozen, waiting on " << *p
<< dendl
;
1121 p
->add_waiter(MDSCacheObject::WAIT_UNFREEZE
, new C_Locker_Eval(this, p
, mask
));
1125 if (mask
& CEPH_LOCK_DN
) {
1126 assert(mask
== CEPH_LOCK_DN
);
1127 bool need_issue
= false; // ignore this, no caps on dentries
1128 CDentry
*dn
= static_cast<CDentry
*>(p
);
1129 eval_any(&dn
->lock
, &need_issue
);
1131 CInode
*in
= static_cast<CInode
*>(p
);
1136 void Locker::try_eval(SimpleLock
*lock
, bool *pneed_issue
)
1138 MDSCacheObject
*p
= lock
->get_parent();
1140 // unstable and ambiguous auth?
1141 if (p
->is_ambiguous_auth()) {
1142 dout(7) << "try_eval " << *lock
<< " ambiguousauth, waiting on " << *p
<< dendl
;
1143 p
->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
, new C_Locker_Eval(this, p
, lock
->get_type()));
1147 if (!p
->is_auth()) {
1148 dout(7) << "try_eval " << *lock
<< " not auth for " << *p
<< dendl
;
1152 if (p
->is_frozen()) {
1153 dout(7) << "try_eval " << *lock
<< " frozen, waiting on " << *p
<< dendl
;
1154 p
->add_waiter(MDSCacheObject::WAIT_UNFREEZE
, new C_Locker_Eval(this, p
, lock
->get_type()));
1159 * We could have a situation like:
1161 * - mds A authpins item on mds B
1162 * - mds B starts to freeze tree containing item
1163 * - mds A tries wrlock_start on A, sends REQSCATTER to B
1164 * - mds B lock is unstable, sets scatter_wanted
1165 * - mds B lock stabilizes, calls try_eval.
1167 * We can defer while freezing without causing a deadlock. Honor
1168 * scatter_wanted flag here. This will never get deferred by the
1169 * checks above due to the auth_pin held by the master.
1171 if (lock
->is_scatterlock()) {
1172 ScatterLock
*slock
= static_cast<ScatterLock
*>(lock
);
1173 if (slock
->get_scatter_wanted() &&
1174 slock
->get_state() != LOCK_MIX
) {
1175 scatter_mix(slock
, pneed_issue
);
1176 if (!lock
->is_stable())
1178 } else if (slock
->get_unscatter_wanted() &&
1179 slock
->get_state() != LOCK_LOCK
) {
1180 simple_lock(slock
, pneed_issue
);
1181 if (!lock
->is_stable()) {
1187 if (lock
->get_type() != CEPH_LOCK_DN
&& p
->is_freezing()) {
1188 dout(7) << "try_eval " << *lock
<< " freezing, waiting on " << *p
<< dendl
;
1189 p
->add_waiter(MDSCacheObject::WAIT_UNFREEZE
, new C_Locker_Eval(this, p
, lock
->get_type()));
1193 eval(lock
, pneed_issue
);
1196 void Locker::eval_cap_gather(CInode
*in
, set
<CInode
*> *issue_set
)
1198 bool need_issue
= false;
1199 list
<MDSInternalContextBase
*> finishers
;
1202 if (!in
->filelock
.is_stable())
1203 eval_gather(&in
->filelock
, false, &need_issue
, &finishers
);
1204 if (!in
->authlock
.is_stable())
1205 eval_gather(&in
->authlock
, false, &need_issue
, &finishers
);
1206 if (!in
->linklock
.is_stable())
1207 eval_gather(&in
->linklock
, false, &need_issue
, &finishers
);
1208 if (!in
->xattrlock
.is_stable())
1209 eval_gather(&in
->xattrlock
, false, &need_issue
, &finishers
);
1211 if (need_issue
&& in
->is_head()) {
1213 issue_set
->insert(in
);
1218 finish_contexts(g_ceph_context
, finishers
);
1221 void Locker::eval_scatter_gathers(CInode
*in
)
1223 bool need_issue
= false;
1224 list
<MDSInternalContextBase
*> finishers
;
1226 dout(10) << "eval_scatter_gathers " << *in
<< dendl
;
1229 if (!in
->filelock
.is_stable())
1230 eval_gather(&in
->filelock
, false, &need_issue
, &finishers
);
1231 if (!in
->nestlock
.is_stable())
1232 eval_gather(&in
->nestlock
, false, &need_issue
, &finishers
);
1233 if (!in
->dirfragtreelock
.is_stable())
1234 eval_gather(&in
->dirfragtreelock
, false, &need_issue
, &finishers
);
1236 if (need_issue
&& in
->is_head())
1239 finish_contexts(g_ceph_context
, finishers
);
1242 void Locker::eval(SimpleLock
*lock
, bool *need_issue
)
1244 switch (lock
->get_type()) {
1245 case CEPH_LOCK_IFILE
:
1246 return file_eval(static_cast<ScatterLock
*>(lock
), need_issue
);
1247 case CEPH_LOCK_IDFT
:
1248 case CEPH_LOCK_INEST
:
1249 return scatter_eval(static_cast<ScatterLock
*>(lock
), need_issue
);
1251 return simple_eval(lock
, need_issue
);
1256 // ------------------
1259 bool Locker::_rdlock_kick(SimpleLock
*lock
, bool as_anon
)
1262 if (lock
->is_stable()) {
1263 if (lock
->get_parent()->is_auth()) {
1264 if (lock
->get_sm() == &sm_scatterlock
) {
1265 // not until tempsync is fully implemented
1266 //if (lock->get_parent()->is_replicated())
1267 //scatter_tempsync((ScatterLock*)lock);
1270 } else if (lock
->get_sm() == &sm_filelock
) {
1271 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
1272 if (lock
->get_state() == LOCK_EXCL
&&
1273 in
->get_target_loner() >= 0 &&
1274 !in
->is_dir() && !as_anon
) // as_anon => caller wants SYNC, not XSYN
1282 // request rdlock state change from auth
1283 mds_rank_t auth
= lock
->get_parent()->authority().first
;
1284 if (!mds
->is_cluster_degraded() ||
1285 mds
->mdsmap
->is_clientreplay_or_active_or_stopping(auth
)) {
1286 dout(10) << "requesting rdlock from auth on "
1287 << *lock
<< " on " << *lock
->get_parent() << dendl
;
1288 mds
->send_message_mds(new MLock(lock
, LOCK_AC_REQRDLOCK
, mds
->get_nodeid()), auth
);
1293 if (lock
->get_type() == CEPH_LOCK_IFILE
) {
1294 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
1295 if (in
->state_test(CInode::STATE_RECOVERING
)) {
1296 mds
->mdcache
->recovery_queue
.prioritize(in
);
1303 bool Locker::rdlock_try(SimpleLock
*lock
, client_t client
, MDSInternalContextBase
*con
)
1305 dout(7) << "rdlock_try on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1307 // can read? grab ref.
1308 if (lock
->can_rdlock(client
))
1311 _rdlock_kick(lock
, false);
1313 if (lock
->can_rdlock(client
))
1318 dout(7) << "rdlock_try waiting on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1319 lock
->add_waiter(SimpleLock::WAIT_STABLE
|SimpleLock::WAIT_RD
, con
);
1324 bool Locker::rdlock_start(SimpleLock
*lock
, MDRequestRef
& mut
, bool as_anon
)
1326 dout(7) << "rdlock_start on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1328 // client may be allowed to rdlock the same item it has xlocked.
1329 // UNLESS someone passes in as_anon, or we're reading snapped version here.
1330 if (mut
->snapid
!= CEPH_NOSNAP
)
1332 client_t client
= as_anon
? -1 : mut
->get_client();
1335 if (lock
->get_type() != CEPH_LOCK_DN
)
1336 in
= static_cast<CInode
*>(lock
->get_parent());
1339 if (!lock->get_parent()->is_auth() &&
1340 lock->fw_rdlock_to_auth()) {
1341 mdcache->request_forward(mut, lock->get_parent()->authority().first);
1347 // can read? grab ref.
1348 if (lock
->can_rdlock(client
)) {
1350 mut
->rdlocks
.insert(lock
);
1351 mut
->locks
.insert(lock
);
1355 // hmm, wait a second.
1356 if (in
&& !in
->is_head() && in
->is_auth() &&
1357 lock
->get_state() == LOCK_SNAP_SYNC
) {
1358 // okay, we actually need to kick the head's lock to get ourselves synced up.
1359 CInode
*head
= mdcache
->get_inode(in
->ino());
1361 SimpleLock
*hlock
= head
->get_lock(CEPH_LOCK_IFILE
);
1362 if (hlock
->get_state() == LOCK_SYNC
)
1363 hlock
= head
->get_lock(lock
->get_type());
1365 if (hlock
->get_state() != LOCK_SYNC
) {
1366 dout(10) << "rdlock_start trying head inode " << *head
<< dendl
;
1367 if (!rdlock_start(hlock
, mut
, true)) // ** as_anon, no rdlock on EXCL **
1369 // oh, check our lock again then
1373 if (!_rdlock_kick(lock
, as_anon
))
1379 if (lock
->get_parent()->is_auth() && lock
->is_stable())
1380 wait_on
= SimpleLock::WAIT_RD
;
1382 wait_on
= SimpleLock::WAIT_STABLE
; // REQRDLOCK is ignored if lock is unstable, so we need to retry.
1383 dout(7) << "rdlock_start waiting on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1384 lock
->add_waiter(wait_on
, new C_MDS_RetryRequest(mdcache
, mut
));
1389 void Locker::nudge_log(SimpleLock
*lock
)
1391 dout(10) << "nudge_log " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1392 if (lock
->get_parent()->is_auth() && lock
->is_unstable_and_locked()) // as with xlockdone, or cap flush
1393 mds
->mdlog
->flush();
1396 void Locker::rdlock_finish(SimpleLock
*lock
, MutationImpl
*mut
, bool *pneed_issue
)
1401 mut
->rdlocks
.erase(lock
);
1402 mut
->locks
.erase(lock
);
1405 dout(7) << "rdlock_finish on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1408 if (!lock
->is_rdlocked()) {
1409 if (!lock
->is_stable())
1410 eval_gather(lock
, false, pneed_issue
);
1411 else if (lock
->get_parent()->is_auth())
1412 try_eval(lock
, pneed_issue
);
1417 bool Locker::can_rdlock_set(set
<SimpleLock
*>& locks
)
1419 dout(10) << "can_rdlock_set " << locks
<< dendl
;
1420 for (set
<SimpleLock
*>::iterator p
= locks
.begin(); p
!= locks
.end(); ++p
)
1421 if (!(*p
)->can_rdlock(-1)) {
1422 dout(10) << "can_rdlock_set can't rdlock " << *p
<< " on " << *(*p
)->get_parent() << dendl
;
1428 bool Locker::rdlock_try_set(set
<SimpleLock
*>& locks
)
1430 dout(10) << "rdlock_try_set " << locks
<< dendl
;
1431 for (set
<SimpleLock
*>::iterator p
= locks
.begin(); p
!= locks
.end(); ++p
)
1432 if (!rdlock_try(*p
, -1, NULL
)) {
1433 dout(10) << "rdlock_try_set can't rdlock " << *p
<< " on " << *(*p
)->get_parent() << dendl
;
1439 void Locker::rdlock_take_set(set
<SimpleLock
*>& locks
, MutationRef
& mut
)
1441 dout(10) << "rdlock_take_set " << locks
<< dendl
;
1442 for (set
<SimpleLock
*>::iterator p
= locks
.begin(); p
!= locks
.end(); ++p
) {
1444 mut
->rdlocks
.insert(*p
);
1445 mut
->locks
.insert(*p
);
1449 // ------------------
1452 void Locker::wrlock_force(SimpleLock
*lock
, MutationRef
& mut
)
1454 if (lock
->get_type() == CEPH_LOCK_IVERSION
||
1455 lock
->get_type() == CEPH_LOCK_DVERSION
)
1456 return local_wrlock_grab(static_cast<LocalLock
*>(lock
), mut
);
1458 dout(7) << "wrlock_force on " << *lock
1459 << " on " << *lock
->get_parent() << dendl
;
1460 lock
->get_wrlock(true);
1461 mut
->wrlocks
.insert(lock
);
1462 mut
->locks
.insert(lock
);
1465 bool Locker::wrlock_start(SimpleLock
*lock
, MDRequestRef
& mut
, bool nowait
)
1467 if (lock
->get_type() == CEPH_LOCK_IVERSION
||
1468 lock
->get_type() == CEPH_LOCK_DVERSION
)
1469 return local_wrlock_start(static_cast<LocalLock
*>(lock
), mut
);
1471 dout(10) << "wrlock_start " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1473 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
1474 client_t client
= mut
->get_client();
1475 bool want_scatter
= !nowait
&& lock
->get_parent()->is_auth() &&
1476 (in
->has_subtree_or_exporting_dirfrag() ||
1477 static_cast<ScatterLock
*>(lock
)->get_scatter_wanted());
1481 if (lock
->can_wrlock(client
) &&
1482 (!want_scatter
|| lock
->get_state() == LOCK_MIX
)) {
1484 mut
->wrlocks
.insert(lock
);
1485 mut
->locks
.insert(lock
);
1489 if (lock
->get_type() == CEPH_LOCK_IFILE
&&
1490 in
->state_test(CInode::STATE_RECOVERING
)) {
1491 mds
->mdcache
->recovery_queue
.prioritize(in
);
1494 if (!lock
->is_stable())
1497 if (in
->is_auth()) {
1498 // don't do nested lock state change if we have dirty scatterdata and
1499 // may scatter_writebehind or start_scatter, because nowait==true implies
1500 // that the caller already has a log entry open!
1501 if (nowait
&& lock
->is_dirty())
1505 scatter_mix(static_cast<ScatterLock
*>(lock
));
1509 if (nowait
&& !lock
->can_wrlock(client
))
1514 // auth should be auth_pinned (see acquire_locks wrlock weird mustpin case).
1515 mds_rank_t auth
= lock
->get_parent()->authority().first
;
1516 if (!mds
->is_cluster_degraded() ||
1517 mds
->mdsmap
->is_clientreplay_or_active_or_stopping(auth
)) {
1518 dout(10) << "requesting scatter from auth on "
1519 << *lock
<< " on " << *lock
->get_parent() << dendl
;
1520 mds
->send_message_mds(new MLock(lock
, LOCK_AC_REQSCATTER
, mds
->get_nodeid()), auth
);
1527 dout(7) << "wrlock_start waiting on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1528 lock
->add_waiter(SimpleLock::WAIT_STABLE
, new C_MDS_RetryRequest(mdcache
, mut
));
1535 void Locker::wrlock_finish(SimpleLock
*lock
, MutationImpl
*mut
, bool *pneed_issue
)
1537 if (lock
->get_type() == CEPH_LOCK_IVERSION
||
1538 lock
->get_type() == CEPH_LOCK_DVERSION
)
1539 return local_wrlock_finish(static_cast<LocalLock
*>(lock
), mut
);
1541 dout(7) << "wrlock_finish on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1544 mut
->wrlocks
.erase(lock
);
1545 if (mut
->remote_wrlocks
.count(lock
) == 0)
1546 mut
->locks
.erase(lock
);
1549 if (!lock
->is_wrlocked()) {
1550 if (!lock
->is_stable())
1551 eval_gather(lock
, false, pneed_issue
);
1552 else if (lock
->get_parent()->is_auth())
1553 try_eval(lock
, pneed_issue
);
1560 void Locker::remote_wrlock_start(SimpleLock
*lock
, mds_rank_t target
, MDRequestRef
& mut
)
1562 dout(7) << "remote_wrlock_start mds." << target
<< " on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1564 // wait for active target
1565 if (mds
->is_cluster_degraded() &&
1566 !mds
->mdsmap
->is_clientreplay_or_active_or_stopping(target
)) {
1567 dout(7) << " mds." << target
<< " is not active" << dendl
;
1568 if (mut
->more()->waiting_on_slave
.empty())
1569 mds
->wait_for_active_peer(target
, new C_MDS_RetryRequest(mdcache
, mut
));
1573 // send lock request
1574 mut
->start_locking(lock
, target
);
1575 mut
->more()->slaves
.insert(target
);
1576 MMDSSlaveRequest
*r
= new MMDSSlaveRequest(mut
->reqid
, mut
->attempt
,
1577 MMDSSlaveRequest::OP_WRLOCK
);
1578 r
->set_lock_type(lock
->get_type());
1579 lock
->get_parent()->set_object_info(r
->get_object_info());
1580 mds
->send_message_mds(r
, target
);
1582 assert(mut
->more()->waiting_on_slave
.count(target
) == 0);
1583 mut
->more()->waiting_on_slave
.insert(target
);
1586 void Locker::remote_wrlock_finish(SimpleLock
*lock
, mds_rank_t target
,
1590 mut
->remote_wrlocks
.erase(lock
);
1591 if (mut
->wrlocks
.count(lock
) == 0)
1592 mut
->locks
.erase(lock
);
1594 dout(7) << "remote_wrlock_finish releasing remote wrlock on mds." << target
1595 << " " << *lock
->get_parent() << dendl
;
1596 if (!mds
->is_cluster_degraded() ||
1597 mds
->mdsmap
->get_state(target
) >= MDSMap::STATE_REJOIN
) {
1598 MMDSSlaveRequest
*slavereq
= new MMDSSlaveRequest(mut
->reqid
, mut
->attempt
,
1599 MMDSSlaveRequest::OP_UNWRLOCK
);
1600 slavereq
->set_lock_type(lock
->get_type());
1601 lock
->get_parent()->set_object_info(slavereq
->get_object_info());
1602 mds
->send_message_mds(slavereq
, target
);
1607 // ------------------
1610 bool Locker::xlock_start(SimpleLock
*lock
, MDRequestRef
& mut
)
1612 if (lock
->get_type() == CEPH_LOCK_IVERSION
||
1613 lock
->get_type() == CEPH_LOCK_DVERSION
)
1614 return local_xlock_start(static_cast<LocalLock
*>(lock
), mut
);
1616 dout(7) << "xlock_start on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1617 client_t client
= mut
->get_client();
1620 if (lock
->get_parent()->is_auth()) {
1623 if (lock
->can_xlock(client
)) {
1624 lock
->set_state(LOCK_XLOCK
);
1625 lock
->get_xlock(mut
, client
);
1626 mut
->xlocks
.insert(lock
);
1627 mut
->locks
.insert(lock
);
1628 mut
->finish_locking(lock
);
1632 if (lock
->get_type() == CEPH_LOCK_IFILE
) {
1633 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
1634 if (in
->state_test(CInode::STATE_RECOVERING
)) {
1635 mds
->mdcache
->recovery_queue
.prioritize(in
);
1639 if (!lock
->is_stable() && (lock
->get_state() != LOCK_XLOCKDONE
||
1640 lock
->get_xlock_by_client() != client
||
1641 lock
->is_waiter_for(SimpleLock::WAIT_STABLE
)))
1644 if (lock
->get_state() == LOCK_LOCK
|| lock
->get_state() == LOCK_XLOCKDONE
) {
1645 mut
->start_locking(lock
);
1652 lock
->add_waiter(SimpleLock::WAIT_WR
|SimpleLock::WAIT_STABLE
, new C_MDS_RetryRequest(mdcache
, mut
));
1657 assert(lock
->get_sm()->can_remote_xlock
);
1658 assert(!mut
->slave_request
);
1660 // wait for single auth
1661 if (lock
->get_parent()->is_ambiguous_auth()) {
1662 lock
->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
,
1663 new C_MDS_RetryRequest(mdcache
, mut
));
1667 // wait for active auth
1668 mds_rank_t auth
= lock
->get_parent()->authority().first
;
1669 if (mds
->is_cluster_degraded() &&
1670 !mds
->mdsmap
->is_clientreplay_or_active_or_stopping(auth
)) {
1671 dout(7) << " mds." << auth
<< " is not active" << dendl
;
1672 if (mut
->more()->waiting_on_slave
.empty())
1673 mds
->wait_for_active_peer(auth
, new C_MDS_RetryRequest(mdcache
, mut
));
1677 // send lock request
1678 mut
->more()->slaves
.insert(auth
);
1679 mut
->start_locking(lock
, auth
);
1680 MMDSSlaveRequest
*r
= new MMDSSlaveRequest(mut
->reqid
, mut
->attempt
,
1681 MMDSSlaveRequest::OP_XLOCK
);
1682 r
->set_lock_type(lock
->get_type());
1683 lock
->get_parent()->set_object_info(r
->get_object_info());
1684 mds
->send_message_mds(r
, auth
);
1686 assert(mut
->more()->waiting_on_slave
.count(auth
) == 0);
1687 mut
->more()->waiting_on_slave
.insert(auth
);
1693 void Locker::_finish_xlock(SimpleLock
*lock
, client_t xlocker
, bool *pneed_issue
)
1695 assert(!lock
->is_stable());
1696 if (lock
->get_num_rdlocks() == 0 &&
1697 lock
->get_num_wrlocks() == 0 &&
1698 !lock
->is_leased() &&
1699 lock
->get_state() != LOCK_XLOCKSNAP
&&
1700 lock
->get_type() != CEPH_LOCK_DN
) {
1701 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
1702 client_t loner
= in
->get_target_loner();
1703 if (loner
>= 0 && (xlocker
< 0 || xlocker
== loner
)) {
1704 lock
->set_state(LOCK_EXCL
);
1705 lock
->get_parent()->auth_unpin(lock
);
1706 lock
->finish_waiters(SimpleLock::WAIT_STABLE
|SimpleLock::WAIT_WR
|SimpleLock::WAIT_RD
);
1707 if (lock
->get_cap_shift())
1708 *pneed_issue
= true;
1709 if (lock
->get_parent()->is_auth() &&
1711 try_eval(lock
, pneed_issue
);
1715 // the xlocker may have CEPH_CAP_GSHARED, need to revoke it if next state is LOCK_LOCK
1716 eval_gather(lock
, lock
->get_state() != LOCK_XLOCKSNAP
, pneed_issue
);
1719 void Locker::xlock_finish(SimpleLock
*lock
, MutationImpl
*mut
, bool *pneed_issue
)
1721 if (lock
->get_type() == CEPH_LOCK_IVERSION
||
1722 lock
->get_type() == CEPH_LOCK_DVERSION
)
1723 return local_xlock_finish(static_cast<LocalLock
*>(lock
), mut
);
1725 dout(10) << "xlock_finish on " << *lock
<< " " << *lock
->get_parent() << dendl
;
1727 client_t xlocker
= lock
->get_xlock_by_client();
1732 mut
->xlocks
.erase(lock
);
1733 mut
->locks
.erase(lock
);
1735 bool do_issue
= false;
1738 if (!lock
->get_parent()->is_auth()) {
1739 assert(lock
->get_sm()->can_remote_xlock
);
1742 dout(7) << "xlock_finish releasing remote xlock on " << *lock
->get_parent() << dendl
;
1743 mds_rank_t auth
= lock
->get_parent()->authority().first
;
1744 if (!mds
->is_cluster_degraded() ||
1745 mds
->mdsmap
->get_state(auth
) >= MDSMap::STATE_REJOIN
) {
1746 MMDSSlaveRequest
*slavereq
= new MMDSSlaveRequest(mut
->reqid
, mut
->attempt
,
1747 MMDSSlaveRequest::OP_UNXLOCK
);
1748 slavereq
->set_lock_type(lock
->get_type());
1749 lock
->get_parent()->set_object_info(slavereq
->get_object_info());
1750 mds
->send_message_mds(slavereq
, auth
);
1753 lock
->finish_waiters(SimpleLock::WAIT_STABLE
|
1754 SimpleLock::WAIT_WR
|
1755 SimpleLock::WAIT_RD
, 0);
1757 if (lock
->get_num_xlocks() == 0) {
1758 if (lock
->get_state() == LOCK_LOCK_XLOCK
)
1759 lock
->set_state(LOCK_XLOCKDONE
);
1760 _finish_xlock(lock
, xlocker
, &do_issue
);
1765 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
1766 if (in
->is_head()) {
1768 *pneed_issue
= true;
1775 void Locker::xlock_export(SimpleLock
*lock
, MutationImpl
*mut
)
1777 dout(10) << "xlock_export on " << *lock
<< " " << *lock
->get_parent() << dendl
;
1780 mut
->xlocks
.erase(lock
);
1781 mut
->locks
.erase(lock
);
1783 MDSCacheObject
*p
= lock
->get_parent();
1784 assert(p
->state_test(CInode::STATE_AMBIGUOUSAUTH
)); // we are exporting this (inode)
1786 if (!lock
->is_stable())
1787 lock
->get_parent()->auth_unpin(lock
);
1789 lock
->set_state(LOCK_LOCK
);
1792 void Locker::xlock_import(SimpleLock
*lock
)
1794 dout(10) << "xlock_import on " << *lock
<< " " << *lock
->get_parent() << dendl
;
1795 lock
->get_parent()->auth_pin(lock
);
1800 // file i/o -----------------------------------------
1802 version_t
Locker::issue_file_data_version(CInode
*in
)
1804 dout(7) << "issue_file_data_version on " << *in
<< dendl
;
1805 return in
->inode
.file_data_version
;
1808 class C_Locker_FileUpdate_finish
: public LockerLogContext
{
1816 C_Locker_FileUpdate_finish(Locker
*l
, CInode
*i
, MutationRef
& m
,
1817 bool sm
=false, bool ni
=false, client_t c
=-1,
1818 MClientCaps
*ac
= 0)
1819 : LockerLogContext(l
), in(i
), mut(m
), share_max(sm
), need_issue(ni
),
1820 client(c
), ack(ac
) {
1821 in
->get(CInode::PIN_PTRWAITER
);
1823 void finish(int r
) override
{
1824 locker
->file_update_finish(in
, mut
, share_max
, need_issue
, client
, ack
);
1825 in
->put(CInode::PIN_PTRWAITER
);
1829 void Locker::file_update_finish(CInode
*in
, MutationRef
& mut
, bool share_max
, bool issue_client_cap
,
1830 client_t client
, MClientCaps
*ack
)
1832 dout(10) << "file_update_finish on " << *in
<< dendl
;
1833 in
->pop_and_dirty_projected_inode(mut
->ls
);
1838 Session
*session
= mds
->get_session(client
);
1840 // "oldest flush tid" > 0 means client uses unique TID for each flush
1841 if (ack
->get_oldest_flush_tid() > 0)
1842 session
->add_completed_flush(ack
->get_client_tid());
1843 mds
->send_message_client_counted(ack
, session
);
1845 dout(10) << " no session for client." << client
<< " " << *ack
<< dendl
;
1850 set
<CInode
*> need_issue
;
1851 drop_locks(mut
.get(), &need_issue
);
1853 if (!in
->is_head() && !in
->client_snap_caps
.empty()) {
1854 dout(10) << " client_snap_caps " << in
->client_snap_caps
<< dendl
;
1855 // check for snap writeback completion
1856 bool gather
= false;
1857 auto p
= in
->client_snap_caps
.begin();
1858 while (p
!= in
->client_snap_caps
.end()) {
1859 SimpleLock
*lock
= in
->get_lock(p
->first
);
1861 dout(10) << " completing client_snap_caps for " << ccap_string(p
->first
)
1862 << " lock " << *lock
<< " on " << *in
<< dendl
;
1865 p
->second
.erase(client
);
1866 if (p
->second
.empty()) {
1868 in
->client_snap_caps
.erase(p
++);
1873 if (in
->client_snap_caps
.empty())
1874 in
->item_open_file
.remove_myself();
1875 eval_cap_gather(in
, &need_issue
);
1878 if (issue_client_cap
&& need_issue
.count(in
) == 0) {
1879 Capability
*cap
= in
->get_client_cap(client
);
1880 if (cap
&& (cap
->wanted() & ~cap
->pending()))
1881 issue_caps(in
, cap
);
1884 if (share_max
&& in
->is_auth() &&
1885 (in
->filelock
.gcaps_allowed(CAP_LONER
) & (CEPH_CAP_GWR
|CEPH_CAP_GBUFFER
)))
1886 share_inode_max_size(in
);
1888 issue_caps_set(need_issue
);
1890 utime_t now
= ceph_clock_now();
1891 mds
->balancer
->hit_inode(now
, in
, META_POP_IWR
);
1893 // auth unpin after issuing caps
1897 Capability
* Locker::issue_new_caps(CInode
*in
,
1903 dout(7) << "issue_new_caps for mode " << mode
<< " on " << *in
<< dendl
;
1906 // if replay, try to reconnect cap, and otherwise do nothing.
1908 mds
->mdcache
->try_reconnect_cap(in
, session
);
1913 assert(session
->info
.inst
.name
.is_client());
1914 client_t my_client
= session
->info
.inst
.name
.num();
1915 int my_want
= ceph_caps_for_mode(mode
);
1917 // register a capability
1918 Capability
*cap
= in
->get_client_cap(my_client
);
1921 cap
= in
->add_client_cap(my_client
, session
, realm
);
1922 cap
->set_wanted(my_want
);
1924 cap
->inc_suppress(); // suppress file cap messages for new cap (we'll bundle with the open() reply)
1928 // make sure it wants sufficient caps
1929 if (my_want
& ~cap
->wanted()) {
1930 // augment wanted caps for this client
1931 cap
->set_wanted(cap
->wanted() | my_want
);
1935 if (in
->is_auth()) {
1936 // [auth] twiddle mode?
1937 eval(in
, CEPH_CAP_LOCKS
);
1939 if (_need_flush_mdlog(in
, my_want
))
1940 mds
->mdlog
->flush();
1943 // [replica] tell auth about any new caps wanted
1944 request_inode_file_caps(in
);
1947 // issue caps (pot. incl new one)
1948 //issue_caps(in); // note: _eval above may have done this already...
1950 // re-issue whatever we can
1951 //cap->issue(cap->pending());
1954 cap
->dec_suppress();
1960 void Locker::issue_caps_set(set
<CInode
*>& inset
)
1962 for (set
<CInode
*>::iterator p
= inset
.begin(); p
!= inset
.end(); ++p
)
1966 bool Locker::issue_caps(CInode
*in
, Capability
*only_cap
)
1968 // allowed caps are determined by the lock mode.
1969 int all_allowed
= in
->get_caps_allowed_by_type(CAP_ANY
);
1970 int loner_allowed
= in
->get_caps_allowed_by_type(CAP_LONER
);
1971 int xlocker_allowed
= in
->get_caps_allowed_by_type(CAP_XLOCKER
);
1973 client_t loner
= in
->get_loner();
1975 dout(7) << "issue_caps loner client." << loner
1976 << " allowed=" << ccap_string(loner_allowed
)
1977 << ", xlocker allowed=" << ccap_string(xlocker_allowed
)
1978 << ", others allowed=" << ccap_string(all_allowed
)
1979 << " on " << *in
<< dendl
;
1981 dout(7) << "issue_caps allowed=" << ccap_string(all_allowed
)
1982 << ", xlocker allowed=" << ccap_string(xlocker_allowed
)
1983 << " on " << *in
<< dendl
;
1986 assert(in
->is_head());
1988 // count conflicts with
1992 map
<client_t
, Capability
*>::iterator it
;
1994 it
= in
->client_caps
.find(only_cap
->get_client());
1996 it
= in
->client_caps
.begin();
1997 for (; it
!= in
->client_caps
.end(); ++it
) {
1998 Capability
*cap
= it
->second
;
1999 if (cap
->is_stale())
2002 // do not issue _new_ bits when size|mtime is projected
2004 if (loner
== it
->first
)
2005 allowed
= loner_allowed
;
2007 allowed
= all_allowed
;
2009 // add in any xlocker-only caps (for locks this client is the xlocker for)
2010 allowed
|= xlocker_allowed
& in
->get_xlocker_mask(it
->first
);
2012 Session
*session
= mds
->get_session(it
->first
);
2013 if (in
->inode
.inline_data
.version
!= CEPH_INLINE_NONE
&&
2014 !(session
&& session
->connection
&&
2015 session
->connection
->has_feature(CEPH_FEATURE_MDS_INLINE_DATA
)))
2016 allowed
&= ~(CEPH_CAP_FILE_RD
| CEPH_CAP_FILE_WR
);
2018 int pending
= cap
->pending();
2019 int wanted
= cap
->wanted();
2021 dout(20) << " client." << it
->first
2022 << " pending " << ccap_string(pending
)
2023 << " allowed " << ccap_string(allowed
)
2024 << " wanted " << ccap_string(wanted
)
2027 if (!(pending
& ~allowed
)) {
2028 // skip if suppress or new, and not revocation
2029 if (cap
->is_new() || cap
->is_suppress()) {
2030 dout(20) << " !revoke and new|suppressed, skipping client." << it
->first
<< dendl
;
2035 // notify clients about deleted inode, to make sure they release caps ASAP.
2036 if (in
->inode
.nlink
== 0)
2037 wanted
|= CEPH_CAP_LINK_SHARED
;
2039 // are there caps that the client _wants_ and can have, but aren't pending?
2040 // or do we need to revoke?
2041 if (((wanted
& allowed
) & ~pending
) || // missing wanted+allowed caps
2042 (pending
& ~allowed
)) { // need to revoke ~allowed caps.
2046 // include caps that clients generally like, while we're at it.
2047 int likes
= in
->get_caps_liked();
2048 int before
= pending
;
2050 if (pending
& ~allowed
)
2051 seq
= cap
->issue((wanted
|likes
) & allowed
& pending
); // if revoking, don't issue anything new.
2053 seq
= cap
->issue((wanted
|likes
) & allowed
);
2054 int after
= cap
->pending();
2056 if (cap
->is_new()) {
2057 // haven't send caps to client yet
2058 if (before
& ~after
)
2059 cap
->confirm_receipt(seq
, after
);
2061 dout(7) << " sending MClientCaps to client." << it
->first
2062 << " seq " << cap
->get_last_seq()
2063 << " new pending " << ccap_string(after
) << " was " << ccap_string(before
)
2066 int op
= (before
& ~after
) ? CEPH_CAP_OP_REVOKE
: CEPH_CAP_OP_GRANT
;
2067 if (op
== CEPH_CAP_OP_REVOKE
) {
2068 revoking_caps
.push_back(&cap
->item_revoking_caps
);
2069 revoking_caps_by_client
[cap
->get_client()].push_back(&cap
->item_client_revoking_caps
);
2070 cap
->set_last_revoke_stamp(ceph_clock_now());
2071 cap
->reset_num_revoke_warnings();
2074 MClientCaps
*m
= new MClientCaps(op
, in
->ino(),
2075 in
->find_snaprealm()->inode
->ino(),
2076 cap
->get_cap_id(), cap
->get_last_seq(),
2079 mds
->get_osd_epoch_barrier());
2080 in
->encode_cap_message(m
, cap
);
2082 mds
->send_message_client_counted(m
, it
->first
);
2090 return (nissued
== 0); // true if no re-issued, no callbacks
2093 void Locker::issue_truncate(CInode
*in
)
2095 dout(7) << "issue_truncate on " << *in
<< dendl
;
2097 for (map
<client_t
, Capability
*>::iterator it
= in
->client_caps
.begin();
2098 it
!= in
->client_caps
.end();
2100 Capability
*cap
= it
->second
;
2101 MClientCaps
*m
= new MClientCaps(CEPH_CAP_OP_TRUNC
,
2103 in
->find_snaprealm()->inode
->ino(),
2104 cap
->get_cap_id(), cap
->get_last_seq(),
2105 cap
->pending(), cap
->wanted(), 0,
2107 mds
->get_osd_epoch_barrier());
2108 in
->encode_cap_message(m
, cap
);
2109 mds
->send_message_client_counted(m
, it
->first
);
2112 // should we increase max_size?
2113 if (in
->is_auth() && in
->is_file())
2114 check_inode_max_size(in
);
2118 void Locker::revoke_stale_caps(Capability
*cap
)
2120 CInode
*in
= cap
->get_inode();
2121 if (in
->state_test(CInode::STATE_EXPORTINGCAPS
)) {
2122 // if export succeeds, the cap will be removed. if export fails, we need to
2123 // revoke the cap if it's still stale.
2124 in
->state_set(CInode::STATE_EVALSTALECAPS
);
2128 int issued
= cap
->issued();
2129 if (issued
& ~CEPH_CAP_PIN
) {
2130 dout(10) << " revoking " << ccap_string(issued
) << " on " << *in
<< dendl
;
2133 if (in
->is_auth() &&
2134 in
->inode
.client_ranges
.count(cap
->get_client()))
2135 in
->state_set(CInode::STATE_NEEDSRECOVER
);
2137 if (!in
->filelock
.is_stable()) eval_gather(&in
->filelock
);
2138 if (!in
->linklock
.is_stable()) eval_gather(&in
->linklock
);
2139 if (!in
->authlock
.is_stable()) eval_gather(&in
->authlock
);
2140 if (!in
->xattrlock
.is_stable()) eval_gather(&in
->xattrlock
);
2142 if (in
->is_auth()) {
2143 try_eval(in
, CEPH_CAP_LOCKS
);
2145 request_inode_file_caps(in
);
2150 void Locker::revoke_stale_caps(Session
*session
)
2152 dout(10) << "revoke_stale_caps for " << session
->info
.inst
.name
<< dendl
;
2154 for (xlist
<Capability
*>::iterator p
= session
->caps
.begin(); !p
.end(); ++p
) {
2155 Capability
*cap
= *p
;
2157 revoke_stale_caps(cap
);
2161 void Locker::resume_stale_caps(Session
*session
)
2163 dout(10) << "resume_stale_caps for " << session
->info
.inst
.name
<< dendl
;
2165 for (xlist
<Capability
*>::iterator p
= session
->caps
.begin(); !p
.end(); ++p
) {
2166 Capability
*cap
= *p
;
2167 CInode
*in
= cap
->get_inode();
2168 assert(in
->is_head());
2169 if (cap
->is_stale()) {
2170 dout(10) << " clearing stale flag on " << *in
<< dendl
;
2173 if (in
->state_test(CInode::STATE_EXPORTINGCAPS
)) {
2174 // if export succeeds, the cap will be removed. if export fails,
2175 // we need to re-issue the cap if it's not stale.
2176 in
->state_set(CInode::STATE_EVALSTALECAPS
);
2180 if (!in
->is_auth() || !eval(in
, CEPH_CAP_LOCKS
))
2181 issue_caps(in
, cap
);
2186 void Locker::remove_stale_leases(Session
*session
)
2188 dout(10) << "remove_stale_leases for " << session
->info
.inst
.name
<< dendl
;
2189 xlist
<ClientLease
*>::iterator p
= session
->leases
.begin();
2191 ClientLease
*l
= *p
;
2193 CDentry
*parent
= static_cast<CDentry
*>(l
->parent
);
2194 dout(15) << " removing lease on " << *parent
<< dendl
;
2195 parent
->remove_client_lease(l
, this);
2200 class C_MDL_RequestInodeFileCaps
: public LockerContext
{
2203 C_MDL_RequestInodeFileCaps(Locker
*l
, CInode
*i
) : LockerContext(l
), in(i
) {
2204 in
->get(CInode::PIN_PTRWAITER
);
2206 void finish(int r
) override
{
2208 locker
->request_inode_file_caps(in
);
2209 in
->put(CInode::PIN_PTRWAITER
);
2213 void Locker::request_inode_file_caps(CInode
*in
)
2215 assert(!in
->is_auth());
2217 int wanted
= in
->get_caps_wanted() & ~CEPH_CAP_PIN
;
2218 if (wanted
!= in
->replica_caps_wanted
) {
2219 // wait for single auth
2220 if (in
->is_ambiguous_auth()) {
2221 in
->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
,
2222 new C_MDL_RequestInodeFileCaps(this, in
));
2226 mds_rank_t auth
= in
->authority().first
;
2227 if (mds
->is_cluster_degraded() &&
2228 mds
->mdsmap
->get_state(auth
) == MDSMap::STATE_REJOIN
) {
2229 mds
->wait_for_active_peer(auth
, new C_MDL_RequestInodeFileCaps(this, in
));
2233 dout(7) << "request_inode_file_caps " << ccap_string(wanted
)
2234 << " was " << ccap_string(in
->replica_caps_wanted
)
2235 << " on " << *in
<< " to mds." << auth
<< dendl
;
2237 in
->replica_caps_wanted
= wanted
;
2239 if (!mds
->is_cluster_degraded() ||
2240 mds
->mdsmap
->is_clientreplay_or_active_or_stopping(auth
))
2241 mds
->send_message_mds(new MInodeFileCaps(in
->ino(), in
->replica_caps_wanted
),
// handle_inode_file_caps: auth-side handler for MInodeFileCaps sent by a
// replica MDS. It records the caps that replica (`from`) wants in
// in->mds_caps_wanted[from], or erases that entry, and then re-evaluates the
// inode's locks with try_eval(in, CEPH_CAP_LOCKS).
// NOTE(review): damaged extraction. The condition choosing between the
// assignment (2262) and the erase (2264), and any handling of an unknown
// inode, are on missing lines. Presumably the entry is erased when the replica
// wants no caps; confirm against upstream.
2246 /* This function DOES put the passed message before returning */
2247 void Locker::handle_inode_file_caps(MInodeFileCaps
*m
)
2249 // nobody should be talking to us during recovery.
2250 assert(mds
->is_clientreplay() || mds
->is_active() || mds
->is_stopping());
2253 CInode
*in
= mdcache
->get_inode(m
->get_ino());
// The sender's rank is taken from the message source.
2254 mds_rank_t from
= mds_rank_t(m
->get_source().num());
2257 assert(in
->is_auth());
2259 dout(7) << "handle_inode_file_caps replica mds." << from
<< " wants caps " << ccap_string(m
->get_caps()) << " on " << *in
<< dendl
;
2262 in
->mds_caps_wanted
[from
] = m
->get_caps();
2264 in
->mds_caps_wanted
.erase(from
);
// Replica wants changed: locks may now be able to move to a different state.
2266 try_eval(in
, CEPH_CAP_LOCKS
);
// C_MDL_CheckMaxSize: waiter context that re-runs
// Locker::check_inode_max_size(in, false, new_max_size, newsize, mtime)
// (force_wrlock = false) when it fires. It holds a CInode::PIN_PTRWAITER
// reference on the inode from construction until finish().
// NOTE(review): damaged extraction. The declarations of `in`, `newsize` and
// `mtime` and the closing braces are on missing lines.
2271 class C_MDL_CheckMaxSize
: public LockerContext
{
2273 uint64_t new_max_size
;
2278 C_MDL_CheckMaxSize(Locker
*l
, CInode
*i
, uint64_t _new_max_size
,
2279 uint64_t _newsize
, utime_t _mtime
) :
2280 LockerContext(l
), in(i
),
2281 new_max_size(_new_max_size
), newsize(_newsize
), mtime(_mtime
)
2283 in
->get(CInode::PIN_PTRWAITER
);
2285 void finish(int r
) override
{
// Retry without forcing the wrlock, then drop the constructor's reference.
2287 locker
->check_inode_max_size(in
, false, new_max_size
, newsize
, mtime
);
2288 in
->put(CInode::PIN_PTRWAITER
);
// calc_new_max_size: propose a new max_size (upper end of a client's
// writeable range) for a file whose current size is `size`.
//  - Start from 2 * (size + 1).
//  - Cap growth at size + mds_client_writeable_range_max_inc_objs *
//    pi->layout.object_size.
//  - Round up to pi->get_layout_size_increment().
// NOTE(review): source lines 2296 and 2299 are missing from this listing, so
// whether the cap is applied unconditionally (e.g. also when the config value
// is 0) cannot be confirmed here.
2292 uint64_t Locker::calc_new_max_size(CInode::mempool_inode
*pi
, uint64_t size
)
2294 uint64_t new_max
= (size
+ 1) << 1;
2295 uint64_t max_inc
= g_conf
->mds_client_writeable_range_max_inc_objs
;
// Config is expressed in objects; convert to bytes using the file's layout.
2297 max_inc
*= pi
->layout
.object_size
;
2298 new_max
= MIN(new_max
, size
+ max_inc
);
2300 return ROUND_UP_TO(new_max
, pi
->get_layout_size_increment());
// calc_new_client_ranges: compute the writeable range each client should have
// on `in` and store it in *new_ranges (keyed by client).
//  - Entries are only created for clients whose issued|wanted caps include
//    CEPH_CAP_FILE_WR or CEPH_CAP_FILE_BUFFER.
//  - The target upper bound `ms` comes from calc_new_max_size(latest, size)
//    when the projected inode has a layout.
//  - An existing range is never shrunk (range.last = MAX(ms, old last)) and
//    keeps its `follows`. A new range gets follows = in->first - 1.
//  - *max_increased is set when a new range is added or ms exceeds an existing
//    range's last.
// NOTE(review): damaged extraction. The declaration of `ms`, the layout-less
// branch body, the loop increment and the new-range bounds assignment are on
// missing lines.
2303 void Locker::calc_new_client_ranges(CInode
*in
, uint64_t size
,
2304 CInode::mempool_inode::client_range_map
*new_ranges
,
2305 bool *max_increased
)
2307 auto latest
= in
->get_projected_inode();
2309 if(latest
->has_layout()) {
2310 ms
= calc_new_max_size(latest
, size
);
2312 // Layout-less directories like ~mds0/, have zero size
2316 // increase ranges as appropriate.
2317 // shrink to 0 if no WR|BUFFER caps issued.
2318 for (map
<client_t
,Capability
*>::iterator p
= in
->client_caps
.begin();
2319 p
!= in
->client_caps
.end();
2321 if ((p
->second
->issued() | p
->second
->wanted()) & (CEPH_CAP_FILE_WR
|CEPH_CAP_FILE_BUFFER
)) {
2322 client_writeable_range_t
& nr
= (*new_ranges
)[p
->first
];
// Client already had a range: grow it if needed, never shrink it.
2324 if (latest
->client_ranges
.count(p
->first
)) {
2325 client_writeable_range_t
& oldr
= latest
->client_ranges
[p
->first
];
2326 if (ms
> oldr
.range
.last
)
2327 *max_increased
= true;
2328 nr
.range
.last
= MAX(ms
, oldr
.range
.last
);
2329 nr
.follows
= oldr
.follows
;
2331 *max_increased
= true;
2333 nr
.follows
= in
->first
- 1;
// check_inode_max_size: for an auth regular-file inode, recompute the client
// writeable ranges (and, when new_size > 0, size/mtime). If anything changed,
// journal a projected inode update.
//  - new_max_size lets a caller request ranges sized for more than the current
//    size (the larger of the two is passed to calc_new_client_ranges).
//  - If the inode is frozen, a C_MDL_CheckMaxSize waiter is queued for
//    WAIT_UNFREEZE.
//  - If the filelock cannot be wrlocked and force_wrlock is false, the lock is
//    nudged toward EXCL or a simple lock and, if still not wrlockable, the
//    retry waits for WAIT_STABLE.
//  - Otherwise the update is projected and journaled. EOpen is used when the
//    file still has wanted caps and is the head inode; EUpdate otherwise. The
//    filelock is wrlock_force'd for the duration of the journal entry.
// Returns bool; the return statements are on lines missing from this listing.
// NOTE(review): damaged extraction. Among other missing lines, the new_mtime
// parameter (2341), the `le` declaration/assignments and several if/else
// lines are absent. Restore from upstream before building.
2339 bool Locker::check_inode_max_size(CInode
*in
, bool force_wrlock
,
2340 uint64_t new_max_size
, uint64_t new_size
,
2343 assert(in
->is_auth());
2344 assert(in
->is_file());
2346 CInode::mempool_inode
*latest
= in
->get_projected_inode();
2347 CInode::mempool_inode::client_range_map new_ranges
;
2348 uint64_t size
= latest
->size
;
2349 bool update_size
= new_size
> 0;
2350 bool update_max
= false;
2351 bool max_increased
= false;
// Size and mtime never move backwards relative to the projected inode; if
// neither actually changes there is no size update to journal. (The guard
// around these lines, 2352-2353, is missing here.)
2354 new_size
= size
= MAX(size
, new_size
);
2355 new_mtime
= MAX(new_mtime
, latest
->mtime
);
2356 if (latest
->size
== new_size
&& latest
->mtime
== new_mtime
)
2357 update_size
= false;
2360 calc_new_client_ranges(in
, max(new_max_size
, size
), &new_ranges
, &max_increased
);
2362 if (max_increased
|| latest
->client_ranges
!= new_ranges
)
2365 if (!update_size
&& !update_max
) {
2366 dout(20) << "check_inode_max_size no-op on " << *in
<< dendl
;
2370 dout(10) << "check_inode_max_size new_ranges " << new_ranges
2371 << " update_size " << update_size
2372 << " on " << *in
<< dendl
;
// A frozen inode cannot be projected now; retry after unfreeze.
2374 if (in
->is_frozen()) {
2375 dout(10) << "check_inode_max_size frozen, waiting on " << *in
<< dendl
;
2376 C_MDL_CheckMaxSize
*cms
= new C_MDL_CheckMaxSize(this, in
,
2380 in
->add_waiter(CInode::WAIT_UNFREEZE
, cms
);
// A wrlock on the filelock is needed (for the loner, if any) unless the caller
// forces it. A stable lock is moved to EXCL when there is a target loner,
// otherwise to a simple lock.
2383 if (!force_wrlock
&& !in
->filelock
.can_wrlock(in
->get_loner())) {
2385 if (in
->filelock
.is_stable()) {
2386 if (in
->get_target_loner() >= 0)
2387 file_excl(&in
->filelock
);
2389 simple_lock(&in
->filelock
);
2391 if (!in
->filelock
.can_wrlock(in
->get_loner())) {
2393 C_MDL_CheckMaxSize
*cms
= new C_MDL_CheckMaxSize(this, in
,
2398 in
->filelock
.add_waiter(SimpleLock::WAIT_STABLE
, cms
);
2399 dout(10) << "check_inode_max_size can't wrlock, waiting on " << *in
<< dendl
;
// Project the new inode state under a fresh mutation in the current segment.
2404 MutationRef
mut(new MutationImpl());
2405 mut
->ls
= mds
->mdlog
->get_current_segment();
2407 auto &pi
= in
->project_inode();
2408 pi
.inode
.version
= in
->pre_dirty();
2411 dout(10) << "check_inode_max_size client_ranges " << pi
.inode
.client_ranges
<< " -> " << new_ranges
<< dendl
;
2412 pi
.inode
.client_ranges
= new_ranges
;
2416 dout(10) << "check_inode_max_size size " << pi
.inode
.size
<< " -> " << new_size
<< dendl
;
2417 pi
.inode
.size
= new_size
;
2418 pi
.inode
.rstat
.rbytes
= new_size
;
2419 dout(10) << "check_inode_max_size mtime " << pi
.inode
.mtime
<< " -> " << new_mtime
<< dendl
;
2420 pi
.inode
.mtime
= new_mtime
;
2421 if (new_mtime
> pi
.inode
.ctime
)
2422 pi
.inode
.ctime
= pi
.inode
.rstat
.rctime
= new_mtime
;
2425 // use EOpen if the file is still open; otherwise, use EUpdate.
2426 // this is just an optimization to push open files forward into
2427 // newer log segments.
2429 EMetaBlob
*metablob
;
2430 if (in
->is_any_caps_wanted() && in
->last
== CEPH_NOSNAP
) {
2431 EOpen
*eo
= new EOpen(mds
->mdlog
);
2432 eo
->add_ino(in
->ino());
2433 metablob
= &eo
->metablob
;
2435 mut
->ls
->open_files
.push_back(&in
->item_open_file
);
2437 EUpdate
*eu
= new EUpdate(mds
->mdlog
, "check_inode_max_size");
2438 metablob
= &eu
->metablob
;
2441 mds
->mdlog
->start_entry(le
);
2442 if (update_size
) { // FIXME if/when we do max_size nested accounting
2443 mdcache
->predirty_journal_parents(mut
, metablob
, in
, 0, PREDIRTY_PRIMARY
);
2445 CDentry
*parent
= in
->get_projected_parent_dn();
2446 metablob
->add_primary_dentry(parent
, in
, true);
2448 metablob
->add_dir_context(in
->get_projected_parent_dn()->get_dir());
2449 mdcache
->journal_dirty_inode(mut
.get(), metablob
, in
);
2451 mds
->mdlog
->submit_entry(le
,
2452 new C_Locker_FileUpdate_finish(this, in
, mut
, true));
2453 wrlock_force(&in
->filelock
, mut
); // wrlock for duration of journal
2456 // make max_size _increase_ timely
// (the condition guarding this flush, line 2457, is missing from this listing)
2458 mds
->mdlog
->flush();
// share_inode_max_size: send clients a CEPH_CAP_OP_GRANT that re-states the
// caps already pending on their cap, with the inode state filled in by
// encode_cap_message() (presumably carrying the new max_size; confirm).
//  - If only_cap is given, iteration starts at that client's cap; otherwise it
//    starts at the first cap. The branch between the two, and any early loop
//    exit for only_cap, are on lines missing from this listing.
//  - Suppressed caps are skipped.
//  - Only caps with FILE_WR or FILE_BUFFER pending get a message. The cap's
//    last_seq is bumped before each send.
2464 void Locker::share_inode_max_size(CInode
*in
, Capability
*only_cap
)
2467 * only share if currently issued a WR cap. if client doesn't have it,
2468 * file_max doesn't matter, and the client will get it if/when they get
2471 dout(10) << "share_inode_max_size on " << *in
<< dendl
;
2472 map
<client_t
, Capability
*>::iterator it
;
2474 it
= in
->client_caps
.find(only_cap
->get_client());
2476 it
= in
->client_caps
.begin();
2477 for (; it
!= in
->client_caps
.end(); ++it
) {
2478 const client_t client
= it
->first
;
2479 Capability
*cap
= it
->second
;
2480 if (cap
->is_suppress())
2482 if (cap
->pending() & (CEPH_CAP_FILE_WR
|CEPH_CAP_FILE_BUFFER
)) {
2483 dout(10) << "share_inode_max_size with client." << client
<< dendl
;
2484 cap
->inc_last_seq();
2485 MClientCaps
*m
= new MClientCaps(CEPH_CAP_OP_GRANT
,
2487 in
->find_snaprealm()->inode
->ino(),
2488 cap
->get_cap_id(), cap
->get_last_seq(),
2489 cap
->pending(), cap
->wanted(), 0,
2491 mds
->get_osd_epoch_barrier());
2492 in
->encode_cap_message(m
, cap
);
2493 mds
->send_message_client_counted(m
, client
);
// _need_flush_mdlog: whether the MDS log should be flushed on behalf of a
// client asking for `wanted` caps. The condition is true when any requested cap
// class maps to an inode lock that is unstable and locked:
// file caps -> filelock, auth -> authlock, link -> linklock,
// xattr -> xattrlock. The return statements are on lines missing from this
// listing.
2500 bool Locker::_need_flush_mdlog(CInode
*in
, int wanted
)
2502 /* flush log if caps are wanted by client but corresponding lock is unstable and locked by
2503 * pending mutations. */
2504 if (((wanted
& (CEPH_CAP_FILE_RD
|CEPH_CAP_FILE_WR
|CEPH_CAP_FILE_SHARED
|CEPH_CAP_FILE_EXCL
)) &&
2505 in
->filelock
.is_unstable_and_locked()) ||
2506 ((wanted
& (CEPH_CAP_AUTH_SHARED
|CEPH_CAP_AUTH_EXCL
)) &&
2507 in
->authlock
.is_unstable_and_locked()) ||
2508 ((wanted
& (CEPH_CAP_LINK_SHARED
|CEPH_CAP_LINK_EXCL
)) &&
2509 in
->linklock
.is_unstable_and_locked()) ||
2510 ((wanted
& (CEPH_CAP_XATTR_SHARED
|CEPH_CAP_XATTR_EXCL
)) &&
2511 in
->xattrlock
.is_unstable_and_locked()))
// adjust_cap_wanted: update the set of caps a client says it wants on `cap`.
//  - If issue_seq matches the cap's last issue, `wanted` replaces the set.
//  - On a seq mismatch, newly wanted bits are still added (union) but none are
//    removed. If nothing new is wanted, the change is ignored and logged.
// Then, for the cap's inode:
//  - Non-auth: forward the change to the auth MDS via request_inode_file_caps
//    (an early return presumably follows on a missing line).
//  - Nothing wanted on this cap: drop the inode from the open-file list once
//    no client wants any caps on it.
//  - Otherwise (the else is on a missing line): prioritize recovery if the
//    inode is RECOVERING and FILE_RD/FILE_WR is wanted. If the inode is not
//    on the open-file list, journal an EOpen (add_clean_inode) and add it to
//    the current log segment's open_files.
// NOTE(review): damaged extraction; else/return/brace lines are missing.
2516 void Locker::adjust_cap_wanted(Capability
*cap
, int wanted
, int issue_seq
)
2518 if (ceph_seq_cmp(issue_seq
, cap
->get_last_issue()) == 0) {
2519 dout(10) << " wanted " << ccap_string(cap
->wanted())
2520 << " -> " << ccap_string(wanted
) << dendl
;
2521 cap
->set_wanted(wanted
);
2522 } else if (wanted
& ~cap
->wanted()) {
2523 dout(10) << " wanted " << ccap_string(cap
->wanted())
2524 << " -> " << ccap_string(wanted
)
2525 << " (added caps even though we had seq mismatch!)" << dendl
;
2526 cap
->set_wanted(wanted
| cap
->wanted());
2528 dout(10) << " NOT changing wanted " << ccap_string(cap
->wanted())
2529 << " -> " << ccap_string(wanted
)
2530 << " (issue_seq " << issue_seq
<< " != last_issue "
2531 << cap
->get_last_issue() << ")" << dendl
;
2535 CInode
*cur
= cap
->get_inode();
2536 if (!cur
->is_auth()) {
2537 request_inode_file_caps(cur
);
2541 if (cap
->wanted() == 0) {
2542 if (cur
->item_open_file
.is_on_list() &&
2543 !cur
->is_any_caps_wanted()) {
2544 dout(10) << " removing unwanted file from open file list " << *cur
<< dendl
;
2545 cur
->item_open_file
.remove_myself();
2548 if (cur
->state_test(CInode::STATE_RECOVERING
) &&
2549 (cap
->wanted() & (CEPH_CAP_FILE_RD
|
2550 CEPH_CAP_FILE_WR
))) {
2551 mds
->mdcache
->recovery_queue
.prioritize(cur
);
// Newly wanted file: journal it as open so it is tracked in the current log segment.
2554 if (!cur
->item_open_file
.is_on_list()) {
2555 dout(10) << " adding to open file list " << *cur
<< dendl
;
2556 assert(cur
->last
== CEPH_NOSNAP
);
2557 LogSegment
*ls
= mds
->mdlog
->get_current_segment();
2558 EOpen
*le
= new EOpen(mds
->mdlog
);
2559 mds
->mdlog
->start_entry(le
);
2560 le
->add_clean_inode(cur
);
2561 ls
->open_files
.push_back(&cur
->item_open_file
);
2562 mds
->mdlog
->submit_entry(le
);
// _do_null_snapflush: complete the snapflushes a client still owes on head_in
// as if it had flushed nothing. Used when the MDS concludes the client will
// not send those FLUSHSNAPs.
// For each snapid < `last` in head_in->client_need_snapflush that lists
// `client`:
//  - run _do_snap_update with dirty == 0 (and null m/ack) on the snapped inode
//    pick_inode_snap(head_in, snapid - 1);
//  - clear the need-snapflush entry.
// Some callers pass only (head_in, client), so `last` has a default in the
// declaration.
2569 void Locker::_do_null_snapflush(CInode
*head_in
, client_t client
, snapid_t last
)
2571 dout(10) << "_do_null_snapflush client." << client
<< " on " << *head_in
<< dendl
;
2572 for (auto p
= head_in
->client_need_snapflush
.begin();
2573 p
!= head_in
->client_need_snapflush
.end() && p
->first
< last
; ) {
2574 snapid_t snapid
= p
->first
;
2575 auto &clients
= p
->second
;
// p is advanced before the body runs, presumably because
// remove_need_snapflush() below can erase the entry p pointed at.
2576 ++p
; // be careful, q loop below depends on this
2578 if (clients
.count(client
)) {
2579 dout(10) << " doing async NULL snapflush on " << snapid
<< " from client." << client
<< dendl
;
2580 CInode
*sin
= mdcache
->pick_inode_snap(head_in
, snapid
- 1);
2582 assert(sin
->first
<= snapid
);
2583 _do_snap_update(sin
, snapid
, 0, sin
->first
- 1, client
, NULL
, NULL
);
2584 head_in
->remove_need_snapflush(sin
, snapid
, client
);
// should_defer_client_cap_frozen: true if a client cap message (or embedded
// release) for `in` should wait for WAIT_UNFREEZE. That is the case when the
// inode is frozen, or when it is freezing and has no auth pins. The rationale
// is given in the comment text below.
2590 bool Locker::should_defer_client_cap_frozen(CInode
*in
)
2593 * This policy needs to be AT LEAST as permissive as allowing a client request
2594 * to go forward, or else a client request can release something, the release
2595 * gets deferred, but the request gets processed and deadlocks because when the
2596 * caps can't get revoked.
2598 * Currently, a request wait if anything locked is freezing (can't
2599 * auth_pin), which would avoid any deadlock with cap release. Thus @in
2600 * _MUST_ be in the lock/auth_pin set.
2602 * auth_pins==0 implies no unstable lock and not auth pinnned by
2603 * client request, otherwise continue even it's freezing.
2605 return (in
->is_freezing() && in
->get_num_auth_pins() == 0) || in
->is_frozen();
// handle_client_caps: entry point for MClientCaps from a client (cap updates,
// dirty-metadata flushes and FLUSHSNAPs). Outline:
//  1. If the MDS is not clientreplay/active/stopping: drop messages without a
//     usable session. Otherwise (noting dirty caps while in reconnect) retry
//     the message after replay.
//  2. Flush tid already completed: re-ack it. FLUSHSNAP handling ends there
//     (lines 2661-2663 missing); other ops fall through as
//     CEPH_CAP_OP_UPDATE so any released caps are still applied.
//  3. Trim the session's completed flushes up to oldest_flush_tid, warning if
//     the client never advances it.
//  4. Find the head inode (retry after clientreplay if unknown) and honour the
//     client's OSD epoch barrier.
//  5. Find the client's cap: defer if frozen/freezing, drop a stale mseq.
//  6. FLUSHSNAP: apply to the matching snapped inode and ack.
//  7. Otherwise: apply dirty metadata to intermediate snapped inodes, then to
//     the head. Confirm released caps, update `wanted`, eval locks, (re)issue
//     caps, and flush the mdlog if required.
// NOTE(review): damaged extraction. Many return/else/brace lines and the
// `ack` declaration in step 2 are missing; restore from upstream.
2609 * This function DOES put the passed message before returning
2611 void Locker::handle_client_caps(MClientCaps
*m
)
2613 client_t client
= m
->get_source().num();
2614 snapid_t follows
= m
->get_snap_follows();
2615 dout(7) << "handle_client_caps "
2616 << ((m
->flags
& CLIENT_CAPS_SYNC
) ? "sync" : "async")
2617 << " on " << m
->get_ino()
2618 << " tid " << m
->get_client_tid() << " follows " << follows
2619 << " op " << ceph_cap_op_name(m
->get_op()) << dendl
;
// Step 1: session / MDS-state gate.
2621 Session
*session
= mds
->get_session(m
);
2622 if (!mds
->is_clientreplay() && !mds
->is_active() && !mds
->is_stopping()) {
2624 dout(5) << " no session, dropping " << *m
<< dendl
;
2628 if (session
->is_closed() ||
2629 session
->is_closing() ||
2630 session
->is_killing()) {
2631 dout(7) << " session closed|closing|killing, dropping " << *m
<< dendl
;
2635 if (mds
->is_reconnect() &&
2636 m
->get_dirty() && m
->get_client_tid() > 0 &&
2637 !session
->have_completed_flush(m
->get_client_tid())) {
2638 mdcache
->set_reconnected_dirty_caps(client
, m
->get_ino(), m
->get_dirty());
2640 mds
->wait_for_replay(new C_MDS_RetryMessage(mds
, m
));
// Step 2: this flush tid was already completed (client retransmit); re-ack it.
2644 if (m
->get_client_tid() > 0 && session
&&
2645 session
->have_completed_flush(m
->get_client_tid())) {
2646 dout(7) << "handle_client_caps already flushed tid " << m
->get_client_tid()
2647 << " for client." << client
<< dendl
;
2649 if (m
->get_op() == CEPH_CAP_OP_FLUSHSNAP
) {
2650 ack
= new MClientCaps(CEPH_CAP_OP_FLUSHSNAP_ACK
, m
->get_ino(), 0, 0, 0, 0, 0,
2651 m
->get_dirty(), 0, mds
->get_osd_epoch_barrier());
2653 ack
= new MClientCaps(CEPH_CAP_OP_FLUSH_ACK
, m
->get_ino(), 0, m
->get_cap_id(),
2654 m
->get_seq(), m
->get_caps(), 0, m
->get_dirty(), 0,
2655 mds
->get_osd_epoch_barrier());
2657 ack
->set_snap_follows(follows
);
2658 ack
->set_client_tid(m
->get_client_tid());
2659 mds
->send_message_client_counted(ack
, m
->get_connection());
2660 if (m
->get_op() == CEPH_CAP_OP_FLUSHSNAP
) {
2664 // fall-thru because the message may release some caps
2666 m
->set_op(CEPH_CAP_OP_UPDATE
);
// Step 3: bound the session's completed-flush bookkeeping.
2670 // "oldest flush tid" > 0 means client uses unique TID for each flush
2671 if (m
->get_oldest_flush_tid() > 0 && session
) {
2672 if (session
->trim_completed_flushes(m
->get_oldest_flush_tid())) {
2673 mds
->mdlog
->get_current_segment()->touched_sessions
.insert(session
->info
.inst
.name
);
2675 if (session
->get_num_trim_flushes_warnings() > 0 &&
2676 session
->get_num_completed_flushes() * 2 < g_conf
->mds_max_completed_flushes
)
2677 session
->reset_num_trim_flushes_warnings();
// The warning threshold doubles (left shift) with each warning already issued.
2679 if (session
->get_num_completed_flushes() >=
2680 (g_conf
->mds_max_completed_flushes
<< session
->get_num_trim_flushes_warnings())) {
2681 session
->inc_num_trim_flushes_warnings();
2683 ss
<< "client." << session
->get_client() << " does not advance its oldest_flush_tid ("
2684 << m
->get_oldest_flush_tid() << "), "
2685 << session
->get_num_completed_flushes()
2686 << " completed flushes recorded in session";
2687 mds
->clog
->warn() << ss
.str();
2688 dout(20) << __func__
<< " " << ss
.str() << dendl
;
// Step 4: look up the head inode.
2693 CInode
*head_in
= mdcache
->get_inode(m
->get_ino());
2695 if (mds
->is_clientreplay()) {
2696 dout(7) << "handle_client_caps on unknown ino " << m
->get_ino()
2697 << ", will try again after replayed client requests" << dendl
;
2698 mdcache
->wait_replay_cap_reconnect(m
->get_ino(), new C_MDS_RetryMessage(mds
, m
));
2701 dout(1) << "handle_client_caps on unknown ino " << m
->get_ino() << ", dropping" << dendl
;
2706 if (m
->osd_epoch_barrier
&& !mds
->objecter
->have_map(m
->osd_epoch_barrier
)) {
2707 // Pause RADOS operations until we see the required epoch
2708 mds
->objecter
->set_epoch_barrier(m
->osd_epoch_barrier
);
2711 if (mds
->get_osd_epoch_barrier() < m
->osd_epoch_barrier
) {
2712 // Record the barrier so that we will retransmit it to clients
2713 mds
->set_osd_epoch_barrier(m
->osd_epoch_barrier
);
2716 dout(10) << " head inode " << *head_in
<< dendl
;
// Step 5: the client's cap on the head inode.
2718 Capability
*cap
= 0;
2719 cap
= head_in
->get_client_cap(client
);
2721 dout(7) << "handle_client_caps no cap for client." << client
<< " on " << *head_in
<< dendl
;
2728 if (should_defer_client_cap_frozen(head_in
)) {
2729 dout(7) << "handle_client_caps freezing|frozen on " << *head_in
<< dendl
;
2730 head_in
->add_waiter(CInode::WAIT_UNFREEZE
, new C_MDS_RetryMessage(mds
, m
));
// A message carrying an older migration seq than our cap is stale.
2733 if (ceph_seq_cmp(m
->get_mseq(), cap
->get_mseq()) < 0) {
2734 dout(7) << "handle_client_caps mseq " << m
->get_mseq() << " < " << cap
->get_mseq()
2735 << ", dropping" << dendl
;
2740 int op
= m
->get_op();
// Step 6: FLUSHSNAP (the branch's closing lines are missing from this listing).
2743 if (op
== CEPH_CAP_OP_FLUSHSNAP
) {
2744 if (!head_in
->is_auth()) {
2745 dout(7) << " not auth, ignoring flushsnap on " << *head_in
<< dendl
;
2749 SnapRealm
*realm
= head_in
->find_snaprealm();
2750 snapid_t snap
= realm
->get_snap_following(follows
);
2751 dout(10) << " flushsnap follows " << follows
<< " -> snap " << snap
<< dendl
;
2753 CInode
*in
= head_in
;
2754 if (snap
!= CEPH_NOSNAP
) {
2755 in
= mdcache
->pick_inode_snap(head_in
, snap
- 1);
2757 dout(10) << " snapped inode " << *in
<< dendl
;
2760 // we can prepare the ack now, since this FLUSHEDSNAP is independent of any
2761 // other cap ops. (except possibly duplicate FLUSHSNAP requests, but worst
2762 // case we get a dup response, so whatever.)
2763 MClientCaps
*ack
= 0;
2764 if (m
->get_dirty()) {
2765 ack
= new MClientCaps(CEPH_CAP_OP_FLUSHSNAP_ACK
, in
->ino(), 0, 0, 0, 0, 0, m
->get_dirty(), 0, mds
->get_osd_epoch_barrier());
2766 ack
->set_snap_follows(follows
);
2767 ack
->set_client_tid(m
->get_client_tid());
2768 ack
->set_oldest_flush_tid(m
->get_oldest_flush_tid());
// Only apply the flush if it targets the head or a snap we are waiting on
// a snapflush for from this client.
2771 if (in
== head_in
||
2772 (head_in
->client_need_snapflush
.count(snap
) &&
2773 head_in
->client_need_snapflush
[snap
].count(client
))) {
2774 dout(7) << " flushsnap snap " << snap
2775 << " client." << client
<< " on " << *in
<< dendl
;
2777 // this cap now follows a later snap (i.e. the one initiating this flush, or later)
2779 cap
->client_follows
= snap
< CEPH_NOSNAP
? snap
: realm
->get_newest_seq();
2780 else if (head_in
->client_need_snapflush
.begin()->first
< snap
)
2781 _do_null_snapflush(head_in
, client
, snap
);
2783 _do_snap_update(in
, snap
, m
->get_dirty(), follows
, client
, m
, ack
);
2786 head_in
->remove_need_snapflush(in
, snap
, client
);
2788 dout(7) << " not expecting flushsnap " << snap
<< " from client." << client
<< " on " << *in
<< dendl
;
2790 mds
->send_message_client_counted(ack
, m
->get_connection());
// Step 7: ordinary cap update/flush. A cap id mismatch means the message is
// for a different incarnation of the cap.
2795 if (cap
->get_cap_id() != m
->get_cap_id()) {
2796 dout(7) << " ignoring client capid " << m
->get_cap_id() << " != my " << cap
->get_cap_id() << dendl
;
2798 CInode
*in
= head_in
;
2800 in
= mdcache
->pick_inode_snap(head_in
, follows
);
2801 // intermediate snap inodes
2802 while (in
!= head_in
) {
2803 assert(in
->last
!= CEPH_NOSNAP
);
2804 if (in
->is_auth() && m
->get_dirty()) {
2805 dout(10) << " updating intermediate snapped inode " << *in
<< dendl
;
2806 _do_cap_update(in
, NULL
, m
->get_dirty(), follows
, m
);
2808 in
= mdcache
->pick_inode_snap(head_in
, in
->last
);
2812 // head inode, and cap
2813 MClientCaps
*ack
= 0;
// Clients may only retain caps we actually issued; mask off anything else.
2815 int caps
= m
->get_caps();
2816 if (caps
& ~cap
->issued()) {
2817 dout(10) << " confirming not issued caps " << ccap_string(caps
& ~cap
->issued()) << dendl
;
2818 caps
&= cap
->issued();
2821 cap
->confirm_receipt(m
->get_seq(), caps
);
2822 dout(10) << " follows " << follows
2823 << " retains " << ccap_string(m
->get_caps())
2824 << " dirty " << ccap_string(m
->get_dirty())
2825 << " on " << *in
<< dendl
;
2828 // missing/skipped snapflush?
2829 // The client MAY send a snapflush if it is issued WR/EXCL caps, but
2830 // presently only does so when it has actual dirty metadata. But, we
2831 // set up the need_snapflush stuff based on the issued caps.
2832 // We can infer that the client WONT send a FLUSHSNAP once they have
2833 // released all WR/EXCL caps (the FLUSHSNAP always comes before the cap
2835 if (!head_in
->client_need_snapflush
.empty()) {
2836 if ((cap
->issued() & CEPH_CAP_ANY_FILE_WR
) == 0) {
2837 _do_null_snapflush(head_in
, client
);
2839 dout(10) << " revocation in progress, not making any conclusions about null snapflushes" << dendl
;
// Dirty metadata on an auth inode gets a FLUSH_ACK, sent now or after journaling.
2843 if (m
->get_dirty() && in
->is_auth()) {
2844 dout(7) << " flush client." << client
<< " dirty " << ccap_string(m
->get_dirty())
2845 << " seq " << m
->get_seq() << " on " << *in
<< dendl
;
2846 ack
= new MClientCaps(CEPH_CAP_OP_FLUSH_ACK
, in
->ino(), 0, cap
->get_cap_id(), m
->get_seq(),
2847 m
->get_caps(), 0, m
->get_dirty(), 0, mds
->get_osd_epoch_barrier());
2848 ack
->set_client_tid(m
->get_client_tid());
2849 ack
->set_oldest_flush_tid(m
->get_oldest_flush_tid());
2852 // filter wanted based on what we could ever give out (given auth/replica status)
2853 bool need_flush
= m
->flags
& CLIENT_CAPS_SYNC
;
2854 int new_wanted
= m
->get_wanted() & head_in
->get_caps_allowed_ever();
2855 if (new_wanted
!= cap
->wanted()) {
2856 if (!need_flush
&& (new_wanted
& ~cap
->pending())) {
2857 // exapnding caps. make sure we aren't waiting for a log flush
2858 need_flush
= _need_flush_mdlog(head_in
, new_wanted
& ~cap
->pending());
2861 adjust_cap_wanted(cap
, new_wanted
, m
->get_issue_seq());
// If _do_cap_update journaled an update, the ack is left to it (ack is passed
// in); otherwise ack immediately and try to issue wanted caps.
2864 if (in
->is_auth() &&
2865 _do_cap_update(in
, cap
, m
->get_dirty(), follows
, m
, ack
, &need_flush
)) {
2867 eval(in
, CEPH_CAP_LOCKS
);
2869 if (!need_flush
&& (cap
->wanted() & ~cap
->pending()))
2870 need_flush
= _need_flush_mdlog(in
, cap
->wanted() & ~cap
->pending());
2872 // no update, ack now.
2874 mds
->send_message_client_counted(ack
, m
->get_connection());
2876 bool did_issue
= eval(in
, CEPH_CAP_LOCKS
);
2877 if (!did_issue
&& (cap
->wanted() & ~cap
->pending()))
2878 issue_caps(in
, cap
);
// Nothing sent on this cap yet but WR/BUFFER pending: issue_norevoke() the
// issued caps and push max_size via share_inode_max_size().
2880 if (cap
->get_last_seq() == 0 &&
2881 (cap
->pending() & (CEPH_CAP_FILE_WR
|CEPH_CAP_FILE_BUFFER
))) {
2882 cap
->issue_norevoke(cap
->issued());
2883 share_inode_max_size(in
, cap
);
2888 mds
->mdlog
->flush();
// C_Locker_RetryRequestCapRelease: waiter context that replays a deferred
// request cap release through process_request_cap_release() with a null
// MDRequestRef.
// The constructor captures no dentry name, so the retry cannot carry one. In
// process_request_cap_release the dentry-lease handling runs before the
// frozen-inode deferral, so only the cap part needs to be retried.
// NOTE(review): the `client` member and the local `dname` are declared on lines
// missing from this listing (2897, 2903).
2896 class C_Locker_RetryRequestCapRelease
: public LockerContext
{
2898 ceph_mds_request_release item
;
2900 C_Locker_RetryRequestCapRelease(Locker
*l
, client_t c
, const ceph_mds_request_release
& it
) :
2901 LockerContext(l
), client(c
), item(it
) { }
2902 void finish(int r
) override
{
2904 MDRequestRef null_ref
;
2905 locker
->process_request_cap_release(null_ref
, client
, item
, dname
);
// process_request_cap_release: apply a cap release that a client embedded in a
// request (ceph_mds_request_release).
//  - If dname is non-empty, first drop the client's lease on that dentry under
//    inode `in`.
//  - Then validate the cap: mseq must not be older and cap_id must match.
//  - Defer via C_Locker_RetryRequestCapRelease if the inode is frozen or
//    freezing (see should_defer_client_cap_frozen).
//  - Confirm the released caps, and resolve pending snapflushes when no
//    file-write caps remain issued.
//  - Update `wanted`, then re-evaluate locks with the cap suppressed.
//  - Record the cap's last_seq in mdr->cap_releases so kick_cap_releases() can
//    reissue caps later.
// mdr may be null (deferred retry), as the log message below shows.
// NOTE(review): damaged extraction. The `seq` local (2916), null checks,
// returns, and the guards around the suppress/cap_releases statements are
// missing; those guards are presumably `if (mdr)` since mdr can be null.
// Confirm upstream.
2909 void Locker::process_request_cap_release(MDRequestRef
& mdr
, client_t client
, const ceph_mds_request_release
& item
,
2910 boost::string_view dname
)
2912 inodeno_t ino
= (uint64_t)item
.ino
;
2913 uint64_t cap_id
= item
.cap_id
;
2914 int caps
= item
.caps
;
2915 int wanted
= item
.wanted
;
2917 int issue_seq
= item
.issue_seq
;
2918 int mseq
= item
.mseq
;
2920 CInode
*in
= mdcache
->get_inode(ino
);
// Dentry lease release (done before any deferral below).
2924 if (dname
.length()) {
2925 frag_t fg
= in
->pick_dirfrag(dname
);
2926 CDir
*dir
= in
->get_dirfrag(fg
);
2928 CDentry
*dn
= dir
->lookup(dname
);
2930 ClientLease
*l
= dn
->get_client_lease(client
);
2932 dout(10) << "process_cap_release removing lease on " << *dn
<< dendl
;
2933 dn
->remove_client_lease(l
, this);
2935 dout(7) << "process_cap_release client." << client
2936 << " doesn't have lease on " << *dn
<< dendl
;
2939 dout(7) << "process_cap_release client." << client
<< " released lease on dn "
2940 << dir
->dirfrag() << "/" << dname
<< " which dne" << dendl
;
// Cap release proper.
2945 Capability
*cap
= in
->get_client_cap(client
);
2949 dout(10) << "process_cap_release client." << client
<< " " << ccap_string(caps
) << " on " << *in
2950 << (mdr
? "" : " (DEFERRED, no mdr)")
2953 if (ceph_seq_cmp(mseq
, cap
->get_mseq()) < 0) {
2954 dout(7) << " mseq " << mseq
<< " < " << cap
->get_mseq() << ", dropping" << dendl
;
2958 if (cap
->get_cap_id() != cap_id
) {
2959 dout(7) << " cap_id " << cap_id
<< " != " << cap
->get_cap_id() << ", dropping" << dendl
;
2963 if (should_defer_client_cap_frozen(in
)) {
2964 dout(7) << " frozen, deferring" << dendl
;
2965 in
->add_waiter(CInode::WAIT_UNFREEZE
, new C_Locker_RetryRequestCapRelease(this, client
, item
));
// Only caps we actually issued can be retained.
2969 if (caps
& ~cap
->issued()) {
2970 dout(10) << " confirming not issued caps " << ccap_string(caps
& ~cap
->issued()) << dendl
;
2971 caps
&= cap
->issued();
2973 cap
->confirm_receipt(seq
, caps
);
2975 if (!in
->client_need_snapflush
.empty() &&
2976 (cap
->issued() & CEPH_CAP_ANY_FILE_WR
) == 0) {
2977 _do_null_snapflush(in
, client
);
2980 adjust_cap_wanted(cap
, wanted
, issue_seq
);
2983 cap
->inc_suppress();
2984 eval(in
, CEPH_CAP_LOCKS
);
2986 cap
->dec_suppress();
2988 // take note; we may need to reissue on this cap later
2990 mdr
->cap_releases
[in
->vino()] = cap
->get_last_seq();
// C_Locker_RetryKickIssueCaps: waiter context that re-runs
// Locker::kick_issue_caps(in, client, seq). It holds a CInode::PIN_PTRWAITER
// reference on the inode from construction until finish().
// NOTE(review): the member declarations are on lines missing from this listing.
2993 class C_Locker_RetryKickIssueCaps
: public LockerContext
{
2998 C_Locker_RetryKickIssueCaps(Locker
*l
, CInode
*i
, client_t c
, ceph_seq_t s
) :
2999 LockerContext(l
), in(i
), client(c
), seq(s
) {
3000 in
->get(CInode::PIN_PTRWAITER
);
3002 void finish(int r
) override
{
3003 locker
->kick_issue_caps(in
, client
, seq
);
3004 in
->put(CInode::PIN_PTRWAITER
);
// kick_issue_caps: reissue caps to `client` on `in`, but only if the client
// still has a cap and its last sent seq is still `seq`, i.e. nothing newer was
// sent since the release was recorded. If the inode is frozen, a
// C_Locker_RetryKickIssueCaps waiter is queued for WAIT_UNFREEZE instead.
3008 void Locker::kick_issue_caps(CInode
*in
, client_t client
, ceph_seq_t seq
)
3010 Capability
*cap
= in
->get_client_cap(client
);
3011 if (!cap
|| cap
->get_last_sent() != seq
)
3013 if (in
->is_frozen()) {
3014 dout(10) << "kick_issue_caps waiting for unfreeze on " << *in
<< dendl
;
3015 in
->add_waiter(CInode::WAIT_UNFREEZE
,
3016 new C_Locker_RetryKickIssueCaps(this, in
, client
, seq
));
3019 dout(10) << "kick_issue_caps released at current seq " << seq
3020 << ", reissuing" << dendl
;
3021 issue_caps(in
, cap
);
// kick_cap_releases: for every inode whose cap release was recorded in
// mdr->cap_releases, call kick_issue_caps() with the recorded seq for the
// request's client.
// NOTE(review): the loop increment and the handling of inodes no longer in
// cache (lines 3029, 3031-3032) are missing from this listing.
3024 void Locker::kick_cap_releases(MDRequestRef
& mdr
)
3026 client_t client
= mdr
->get_client();
3027 for (map
<vinodeno_t
,ceph_seq_t
>::iterator p
= mdr
->cap_releases
.begin();
3028 p
!= mdr
->cap_releases
.end();
3030 CInode
*in
= mdcache
->get_inode(p
->first
);
3033 kick_issue_caps(in
, client
, p
->second
);
// _do_snap_update: journal a FLUSHSNAP, or a null snapflush (dirty == 0 with
// null m/ack), for snapshot `snap` of inode `in`.
//  - If snap == CEPH_NOSNAP there is nothing to record; just ack.
//  - Otherwise project the inode. For multiversion inodes the update is written
//    into the matching old inode, which is split first if snap > oi->first.
//  - Apply the flushed fields via _update_cap_fields() and install newer
//    xattrs.
//  - Update the client's client_range: erase it if in->last == snap, else set
//    its follows = snap.
//  - Journal an EUpdate "snap flush" whose completion is
//    C_Locker_FileUpdate_finish.
// NOTE(review): damaged extraction. The lines choosing `i`, the if/else around
// the two project_inode() calls, the xattr decode, and the guard on the early
// ack (presumably `if (ack)`) are missing.
3038 * m and ack might be NULL, so don't dereference them unless dirty != 0
3040 void Locker::_do_snap_update(CInode
*in
, snapid_t snap
, int dirty
, snapid_t follows
, client_t client
, MClientCaps
*m
, MClientCaps
*ack
)
3042 dout(10) << "_do_snap_update dirty " << ccap_string(dirty
)
3043 << " follows " << follows
<< " snap " << snap
3044 << " on " << *in
<< dendl
;
3046 if (snap
== CEPH_NOSNAP
) {
3047 // hmm, i guess snap was already deleted? just ack!
3048 dout(10) << " wow, the snap following " << follows
3049 << " was already deleted. nothing to record, just ack." << dendl
;
3051 mds
->send_message_client_counted(ack
, m
->get_connection());
3055 EUpdate
*le
= new EUpdate(mds
->mdlog
, "snap flush");
3056 mds
->mdlog
->start_entry(le
);
3057 MutationRef mut
= new MutationImpl();
3058 mut
->ls
= mds
->mdlog
->get_current_segment();
3060 // normal metadata updates that we can apply to the head as well.
3063 CInode::mempool_xattr_map
*px
= nullptr;
// Only take the client's xattrs if it held XATTR_EXCL, sent a blob, and the
// version is newer than the projected one (short-circuits before touching m).
3064 bool xattrs
= (dirty
& CEPH_CAP_XATTR_EXCL
) &&
3065 m
->xattrbl
.length() &&
3066 m
->head
.xattr_version
> in
->get_projected_inode()->xattr_version
;
3068 CInode::mempool_old_inode
*oi
= 0;
3069 if (in
->is_multiversion()) {
3070 oi
= in
->pick_old_inode(snap
);
3073 CInode::mempool_inode
*i
;
3075 dout(10) << " writing into old inode" << dendl
;
3076 auto &pi
= in
->project_inode();
3077 pi
.inode
.version
= in
->pre_dirty();
3078 if (snap
> oi
->first
)
3079 in
->split_old_inode(snap
);
3084 auto &pi
= in
->project_inode(xattrs
);
3085 pi
.inode
.version
= in
->pre_dirty();
3088 px
= pi
.xattrs
.get();
3091 _update_cap_fields(in
, dirty
, m
, i
);
3095 dout(7) << " xattrs v" << i
->xattr_version
<< " -> " << m
->head
.xattr_version
3096 << " len " << m
->xattrbl
.length() << dendl
;
3097 i
->xattr_version
= m
->head
.xattr_version
;
3098 bufferlist::iterator p
= m
->xattrbl
.begin();
// The client's writeable range is done with once this snap is flushed.
3103 auto it
= i
->client_ranges
.find(client
);
3104 if (it
!= i
->client_ranges
.end()) {
3105 if (in
->last
== snap
) {
3106 dout(10) << " removing client_range entirely" << dendl
;
3107 i
->client_ranges
.erase(it
);
3109 dout(10) << " client_range now follows " << snap
<< dendl
;
3110 it
->second
.follows
= snap
;
3116 mdcache
->predirty_journal_parents(mut
, &le
->metablob
, in
, 0, PREDIRTY_PRIMARY
, 0, follows
);
3117 mdcache
->journal_dirty_inode(mut
.get(), &le
->metablob
, in
, follows
);
3119 // "oldest flush tid" > 0 means client uses unique TID for each flush
3120 if (ack
&& ack
->get_oldest_flush_tid() > 0)
3121 le
->metablob
.add_client_flush(metareqid_t(m
->get_source(), ack
->get_client_tid()),
3122 ack
->get_oldest_flush_tid());
3124 mds
->mdlog
->submit_entry(le
, new C_Locker_FileUpdate_finish(this, in
, mut
, false, false,
// _update_cap_fields: copy inode fields the client flushed in `m` into the
// projected inode `pi`, according to which caps are `dirty`.
//  - ctime only moves forward. change_attr also only moves forward, and only
//    when the peer has CEPH_FEATURE_FS_CHANGE_ATTR.
//  - FILE_WR / FILE_EXCL:
//      * mtime: forward only with WR, any change with EXCL.
//      * size and rstat.rbytes for regular files (part of the condition is on
//        a missing line).
//      * inline data: regular files, WR, newer inline version only.
//      * EXCL also updates atime (any change) and time_warp_seq (forward
//        only).
//  - AUTH_EXCL: uid, gid, mode, and btime when the peer has
//    CEPH_FEATURE_FS_BTIME.
// NOTE(review): damaged extraction. Lines 3129-3132, 3161-3162, 3164, 3167,
// 3182-3183 and others are missing. Judging by the surrounding log lines, the
// mtime/atime/size assignments are among them; restore from upstream.
3128 void Locker::_update_cap_fields(CInode
*in
, int dirty
, MClientCaps
*m
, CInode::mempool_inode
*pi
)
3133 /* m must be valid if there are dirty caps */
3135 uint64_t features
= m
->get_connection()->get_features();
3137 if (m
->get_ctime() > pi
->ctime
) {
3138 dout(7) << " ctime " << pi
->ctime
<< " -> " << m
->get_ctime()
3139 << " for " << *in
<< dendl
;
3140 pi
->ctime
= pi
->rstat
.rctime
= m
->get_ctime();
3143 if ((features
& CEPH_FEATURE_FS_CHANGE_ATTR
) &&
3144 m
->get_change_attr() > pi
->change_attr
) {
3145 dout(7) << " change_attr " << pi
->change_attr
<< " -> " << m
->get_change_attr()
3146 << " for " << *in
<< dendl
;
3147 pi
->change_attr
= m
->get_change_attr();
// File data/metadata fields: only trusted when FILE_WR or FILE_EXCL is dirty.
3151 if (dirty
& (CEPH_CAP_FILE_EXCL
|CEPH_CAP_FILE_WR
)) {
3152 utime_t atime
= m
->get_atime();
3153 utime_t mtime
= m
->get_mtime();
3154 uint64_t size
= m
->get_size();
3155 version_t inline_version
= m
->inline_version
;
// With WR the client may only push mtime forward; with EXCL it owns mtime.
3157 if (((dirty
& CEPH_CAP_FILE_WR
) && mtime
> pi
->mtime
) ||
3158 ((dirty
& CEPH_CAP_FILE_EXCL
) && mtime
!= pi
->mtime
)) {
3159 dout(7) << " mtime " << pi
->mtime
<< " -> " << mtime
3160 << " for " << *in
<< dendl
;
3163 if (in
->inode
.is_file() && // ONLY if regular file
3165 dout(7) << " size " << pi
->size
<< " -> " << size
3166 << " for " << *in
<< dendl
;
3168 pi
->rstat
.rbytes
= size
;
3170 if (in
->inode
.is_file() &&
3171 (dirty
& CEPH_CAP_FILE_WR
) &&
3172 inline_version
> pi
->inline_data
.version
) {
3173 pi
->inline_data
.version
= inline_version
;
3174 if (inline_version
!= CEPH_INLINE_NONE
&& m
->inline_data
.length() > 0)
3175 pi
->inline_data
.get_data() = m
->inline_data
;
3177 pi
->inline_data
.free_data();
3179 if ((dirty
& CEPH_CAP_FILE_EXCL
) && atime
!= pi
->atime
) {
3180 dout(7) << " atime " << pi
->atime
<< " -> " << atime
3181 << " for " << *in
<< dendl
;
3184 if ((dirty
& CEPH_CAP_FILE_EXCL
) &&
3185 ceph_seq_cmp(pi
->time_warp_seq
, m
->get_time_warp_seq()) < 0) {
3186 dout(7) << " time_warp_seq " << pi
->time_warp_seq
<< " -> " << m
->get_time_warp_seq()
3187 << " for " << *in
<< dendl
;
3188 pi
->time_warp_seq
= m
->get_time_warp_seq();
// Ownership/permission fields: only with AUTH_EXCL dirty.
3192 if (dirty
& CEPH_CAP_AUTH_EXCL
) {
3193 if (m
->head
.uid
!= pi
->uid
) {
3194 dout(7) << " uid " << pi
->uid
3195 << " -> " << m
->head
.uid
3196 << " for " << *in
<< dendl
;
3197 pi
->uid
= m
->head
.uid
;
3199 if (m
->head
.gid
!= pi
->gid
) {
3200 dout(7) << " gid " << pi
->gid
3201 << " -> " << m
->head
.gid
3202 << " for " << *in
<< dendl
;
3203 pi
->gid
= m
->head
.gid
;
3205 if (m
->head
.mode
!= pi
->mode
) {
3206 dout(7) << " mode " << oct
<< pi
->mode
3207 << " -> " << m
->head
.mode
<< dec
3208 << " for " << *in
<< dendl
;
3209 pi
->mode
= m
->head
.mode
;
3211 if ((features
& CEPH_FEATURE_FS_BTIME
) && m
->get_btime() != pi
->btime
) {
// NOTE(review): `oct` here looks copied from the mode log above. btime is a
// timestamp, and any integers its operator<< writes would be printed in octal
// while the basefield is oct; confirm and drop the manipulators upstream.
3212 dout(7) << " btime " << oct
<< pi
->btime
3213 << " -> " << m
->get_btime() << dec
3214 << " for " << *in
<< dendl
;
3215 pi
->btime
= m
->get_btime();
3221 * update inode based on cap flush|flushsnap|wanted.
3222 * adjust max_size, if needed.
3223 * if we update, return true; otherwise, false (no updated needed).
3225 bool Locker::_do_cap_update(CInode
*in
, Capability
*cap
,
3226 int dirty
, snapid_t follows
,
3227 MClientCaps
*m
, MClientCaps
*ack
,
3230 dout(10) << "_do_cap_update dirty " << ccap_string(dirty
)
3231 << " issued " << ccap_string(cap
? cap
->issued() : 0)
3232 << " wanted " << ccap_string(cap
? cap
->wanted() : 0)
3233 << " on " << *in
<< dendl
;
3234 assert(in
->is_auth());
3235 client_t client
= m
->get_source().num();
3236 CInode::mempool_inode
*latest
= in
->get_projected_inode();
3238 // increase or zero max_size?
3239 uint64_t size
= m
->get_size();
3240 bool change_max
= false;
3241 uint64_t old_max
= latest
->client_ranges
.count(client
) ? latest
->client_ranges
[client
].range
.last
: 0;
3242 uint64_t new_max
= old_max
;
3244 if (in
->is_file()) {
3245 bool forced_change_max
= false;
3246 dout(20) << "inode is file" << dendl
;
3247 if (cap
&& ((cap
->issued() | cap
->wanted()) & CEPH_CAP_ANY_FILE_WR
)) {
3248 dout(20) << "client has write caps; m->get_max_size="
3249 << m
->get_max_size() << "; old_max=" << old_max
<< dendl
;
3250 if (m
->get_max_size() > new_max
) {
3251 dout(10) << "client requests file_max " << m
->get_max_size()
3252 << " > max " << old_max
<< dendl
;
3254 forced_change_max
= true;
3255 new_max
= calc_new_max_size(latest
, m
->get_max_size());
3257 new_max
= calc_new_max_size(latest
, size
);
3259 if (new_max
> old_max
)
3271 if (in
->last
== CEPH_NOSNAP
&&
3273 !in
->filelock
.can_wrlock(client
) &&
3274 !in
->filelock
.can_force_wrlock(client
)) {
3275 dout(10) << " i want to change file_max, but lock won't allow it (yet)" << dendl
;
3276 if (in
->filelock
.is_stable()) {
3277 bool need_issue
= false;
3279 cap
->inc_suppress();
3280 if (in
->mds_caps_wanted
.empty() &&
3281 (in
->get_loner() >= 0 || (in
->get_wanted_loner() >= 0 && in
->try_set_loner()))) {
3282 if (in
->filelock
.get_state() != LOCK_EXCL
)
3283 file_excl(&in
->filelock
, &need_issue
);
3285 simple_lock(&in
->filelock
, &need_issue
);
3289 cap
->dec_suppress();
3291 if (!in
->filelock
.can_wrlock(client
) &&
3292 !in
->filelock
.can_force_wrlock(client
)) {
3293 C_MDL_CheckMaxSize
*cms
= new C_MDL_CheckMaxSize(this, in
,
3294 forced_change_max
? new_max
: 0,
3297 in
->filelock
.add_waiter(SimpleLock::WAIT_STABLE
, cms
);
3303 if (m
->flockbl
.length()) {
3305 bufferlist::iterator bli
= m
->flockbl
.begin();
3306 ::decode(num_locks
, bli
);
3307 for ( int i
=0; i
< num_locks
; ++i
) {
3308 ceph_filelock decoded_lock
;
3309 ::decode(decoded_lock
, bli
);
3310 in
->get_fcntl_lock_state()->held_locks
.
3311 insert(pair
<uint64_t, ceph_filelock
>(decoded_lock
.start
, decoded_lock
));
3312 ++in
->get_fcntl_lock_state()->client_held_lock_counts
[(client_t
)(decoded_lock
.client
)];
3314 ::decode(num_locks
, bli
);
3315 for ( int i
=0; i
< num_locks
; ++i
) {
3316 ceph_filelock decoded_lock
;
3317 ::decode(decoded_lock
, bli
);
3318 in
->get_flock_lock_state()->held_locks
.
3319 insert(pair
<uint64_t, ceph_filelock
>(decoded_lock
.start
, decoded_lock
));
3320 ++in
->get_flock_lock_state()->client_held_lock_counts
[(client_t
)(decoded_lock
.client
)];
3324 if (!dirty
&& !change_max
)
3327 Session
*session
= mds
->get_session(m
);
3328 if (session
->check_access(in
, MAY_WRITE
,
3329 m
->caller_uid
, m
->caller_gid
, NULL
, 0, 0) < 0) {
3330 dout(10) << "check_access failed, dropping cap update on " << *in
<< dendl
;
3335 EUpdate
*le
= new EUpdate(mds
->mdlog
, "cap update");
3336 mds
->mdlog
->start_entry(le
);
3338 bool xattr
= (dirty
& CEPH_CAP_XATTR_EXCL
) &&
3339 m
->xattrbl
.length() &&
3340 m
->head
.xattr_version
> in
->get_projected_inode()->xattr_version
;
3342 auto &pi
= in
->project_inode(xattr
);
3343 pi
.inode
.version
= in
->pre_dirty();
3345 MutationRef
mut(new MutationImpl());
3346 mut
->ls
= mds
->mdlog
->get_current_segment();
3348 _update_cap_fields(in
, dirty
, m
, &pi
.inode
);
3351 dout(7) << " max_size " << old_max
<< " -> " << new_max
3352 << " for " << *in
<< dendl
;
3354 auto &cr
= pi
.inode
.client_ranges
[client
];
3356 cr
.range
.last
= new_max
;
3357 cr
.follows
= in
->first
- 1;
3359 pi
.inode
.client_ranges
.erase(client
);
3362 if (change_max
|| (dirty
& (CEPH_CAP_FILE_EXCL
|CEPH_CAP_FILE_WR
)))
3363 wrlock_force(&in
->filelock
, mut
); // wrlock for duration of journal
3366 if (dirty
& CEPH_CAP_AUTH_EXCL
)
3367 wrlock_force(&in
->authlock
, mut
);
3371 dout(7) << " xattrs v" << pi
.inode
.xattr_version
<< " -> " << m
->head
.xattr_version
<< dendl
;
3372 pi
.inode
.xattr_version
= m
->head
.xattr_version
;
3373 bufferlist::iterator p
= m
->xattrbl
.begin();
3374 ::decode(*pi
.xattrs
, p
);
3375 wrlock_force(&in
->xattrlock
, mut
);
3379 mdcache
->predirty_journal_parents(mut
, &le
->metablob
, in
, 0, PREDIRTY_PRIMARY
, 0, follows
);
3380 mdcache
->journal_dirty_inode(mut
.get(), &le
->metablob
, in
, follows
);
3382 // "oldest flush tid" > 0 means client uses unique TID for each flush
3383 if (ack
&& ack
->get_oldest_flush_tid() > 0)
3384 le
->metablob
.add_client_flush(metareqid_t(m
->get_source(), ack
->get_client_tid()),
3385 ack
->get_oldest_flush_tid());
3387 mds
->mdlog
->submit_entry(le
, new C_Locker_FileUpdate_finish(this, in
, mut
,
3390 if (need_flush
&& !*need_flush
&&
3391 ((change_max
&& new_max
) || // max INCREASE
3392 _need_flush_mdlog(in
, dirty
)))
3398 /* This function DOES put the passed message before returning */
3399 void Locker::handle_client_cap_release(MClientCapRelease
*m
)
3401 client_t client
= m
->get_source().num();
3402 dout(10) << "handle_client_cap_release " << *m
<< dendl
;
3404 if (!mds
->is_clientreplay() && !mds
->is_active() && !mds
->is_stopping()) {
3405 mds
->wait_for_replay(new C_MDS_RetryMessage(mds
, m
));
3409 if (m
->osd_epoch_barrier
&& !mds
->objecter
->have_map(m
->osd_epoch_barrier
)) {
3410 // Pause RADOS operations until we see the required epoch
3411 mds
->objecter
->set_epoch_barrier(m
->osd_epoch_barrier
);
3414 if (mds
->get_osd_epoch_barrier() < m
->osd_epoch_barrier
) {
3415 // Record the barrier so that we will retransmit it to clients
3416 mds
->set_osd_epoch_barrier(m
->osd_epoch_barrier
);
3419 Session
*session
= mds
->get_session(m
);
3421 for (vector
<ceph_mds_cap_item
>::iterator p
= m
->caps
.begin(); p
!= m
->caps
.end(); ++p
) {
3422 _do_cap_release(client
, inodeno_t((uint64_t)p
->ino
) , p
->cap_id
, p
->migrate_seq
, p
->seq
);
3426 session
->notify_cap_release(m
->caps
.size());
3432 class C_Locker_RetryCapRelease
: public LockerContext
{
3436 ceph_seq_t migrate_seq
;
3437 ceph_seq_t issue_seq
;
3439 C_Locker_RetryCapRelease(Locker
*l
, client_t c
, inodeno_t i
, uint64_t id
,
3440 ceph_seq_t mseq
, ceph_seq_t seq
) :
3441 LockerContext(l
), client(c
), ino(i
), cap_id(id
), migrate_seq(mseq
), issue_seq(seq
) {}
3442 void finish(int r
) override
{
3443 locker
->_do_cap_release(client
, ino
, cap_id
, migrate_seq
, issue_seq
);
3447 void Locker::_do_cap_release(client_t client
, inodeno_t ino
, uint64_t cap_id
,
3448 ceph_seq_t mseq
, ceph_seq_t seq
)
3450 CInode
*in
= mdcache
->get_inode(ino
);
3452 dout(7) << "_do_cap_release missing ino " << ino
<< dendl
;
3455 Capability
*cap
= in
->get_client_cap(client
);
3457 dout(7) << "_do_cap_release no cap for client" << client
<< " on "<< *in
<< dendl
;
3461 dout(7) << "_do_cap_release for client." << client
<< " on "<< *in
<< dendl
;
3462 if (cap
->get_cap_id() != cap_id
) {
3463 dout(7) << " capid " << cap_id
<< " != " << cap
->get_cap_id() << ", ignore" << dendl
;
3466 if (ceph_seq_cmp(mseq
, cap
->get_mseq()) < 0) {
3467 dout(7) << " mseq " << mseq
<< " < " << cap
->get_mseq() << ", ignore" << dendl
;
3470 if (should_defer_client_cap_frozen(in
)) {
3471 dout(7) << " freezing|frozen, deferring" << dendl
;
3472 in
->add_waiter(CInode::WAIT_UNFREEZE
,
3473 new C_Locker_RetryCapRelease(this, client
, ino
, cap_id
, mseq
, seq
));
3476 if (seq
!= cap
->get_last_issue()) {
3477 dout(7) << " issue_seq " << seq
<< " != " << cap
->get_last_issue() << dendl
;
3478 // clean out any old revoke history
3479 cap
->clean_revoke_from(seq
);
3480 eval_cap_gather(in
);
3483 remove_client_cap(in
, client
);
3486 /* This function DOES put the passed message before returning */
3488 void Locker::remove_client_cap(CInode
*in
, client_t client
)
3490 // clean out any pending snapflush state
3491 if (!in
->client_need_snapflush
.empty())
3492 _do_null_snapflush(in
, client
);
3494 in
->remove_client_cap(client
);
3496 if (in
->is_auth()) {
3497 // make sure we clear out the client byte range
3498 if (in
->get_projected_inode()->client_ranges
.count(client
) &&
3499 !(in
->inode
.nlink
== 0 && !in
->is_any_caps())) // unless it's unlink + stray
3500 check_inode_max_size(in
);
3502 request_inode_file_caps(in
);
3505 try_eval(in
, CEPH_CAP_LOCKS
);
3510 * Return true if any currently revoking caps exceed the
3511 * mds_session_timeout threshold.
3513 bool Locker::any_late_revoking_caps(xlist
<Capability
*> const &revoking
) const
3515 xlist
<Capability
*>::const_iterator p
= revoking
.begin();
3517 // No revoking caps at the moment
3520 utime_t now
= ceph_clock_now();
3521 utime_t age
= now
- (*p
)->get_last_revoke_stamp();
3522 if (age
<= g_conf
->mds_session_timeout
) {
3531 void Locker::get_late_revoking_clients(std::list
<client_t
> *result
) const
3533 if (!any_late_revoking_caps(revoking_caps
)) {
3534 // Fast path: no misbehaving clients, execute in O(1)
3538 // Slow path: execute in O(N_clients)
3539 std::map
<client_t
, xlist
<Capability
*> >::const_iterator client_rc_iter
;
3540 for (client_rc_iter
= revoking_caps_by_client
.begin();
3541 client_rc_iter
!= revoking_caps_by_client
.end(); ++client_rc_iter
) {
3542 xlist
<Capability
*> const &client_rc
= client_rc_iter
->second
;
3543 bool any_late
= any_late_revoking_caps(client_rc
);
3545 result
->push_back(client_rc_iter
->first
);
3550 // Hard-code instead of surfacing a config settings because this is
3551 // really a hack that should go away at some point when we have better
3552 // inspection tools for getting at detailed cap state (#7316)
3553 #define MAX_WARN_CAPS 100
3555 void Locker::caps_tick()
3557 utime_t now
= ceph_clock_now();
3559 dout(20) << __func__
<< " " << revoking_caps
.size() << " revoking caps" << dendl
;
3562 for (xlist
<Capability
*>::iterator p
= revoking_caps
.begin(); !p
.end(); ++p
) {
3563 Capability
*cap
= *p
;
3565 utime_t age
= now
- cap
->get_last_revoke_stamp();
3566 dout(20) << __func__
<< " age = " << age
<< cap
->get_client() << "." << cap
->get_inode()->ino() << dendl
;
3567 if (age
<= g_conf
->mds_session_timeout
) {
3568 dout(20) << __func__
<< " age below timeout " << g_conf
->mds_session_timeout
<< dendl
;
3572 if (i
> MAX_WARN_CAPS
) {
3573 dout(1) << __func__
<< " more than " << MAX_WARN_CAPS
<< " caps are late"
3574 << "revoking, ignoring subsequent caps" << dendl
;
3578 // exponential backoff of warning intervals
3579 if (age
> g_conf
->mds_session_timeout
* (1 << cap
->get_num_revoke_warnings())) {
3580 cap
->inc_num_revoke_warnings();
3582 ss
<< "client." << cap
->get_client() << " isn't responding to mclientcaps(revoke), ino "
3583 << cap
->get_inode()->ino() << " pending " << ccap_string(cap
->pending())
3584 << " issued " << ccap_string(cap
->issued()) << ", sent " << age
<< " seconds ago";
3585 mds
->clog
->warn() << ss
.str();
3586 dout(20) << __func__
<< " " << ss
.str() << dendl
;
3588 dout(20) << __func__
<< " silencing log message (backoff) for " << cap
->get_client() << "." << cap
->get_inode()->ino() << dendl
;
3594 void Locker::handle_client_lease(MClientLease
*m
)
3596 dout(10) << "handle_client_lease " << *m
<< dendl
;
3598 assert(m
->get_source().is_client());
3599 client_t client
= m
->get_source().num();
3601 CInode
*in
= mdcache
->get_inode(m
->get_ino(), m
->get_last());
3603 dout(7) << "handle_client_lease don't have ino " << m
->get_ino() << "." << m
->get_last() << dendl
;
3609 frag_t fg
= in
->pick_dirfrag(m
->dname
);
3610 CDir
*dir
= in
->get_dirfrag(fg
);
3612 dn
= dir
->lookup(m
->dname
);
3614 dout(7) << "handle_client_lease don't have dn " << m
->get_ino() << " " << m
->dname
<< dendl
;
3618 dout(10) << " on " << *dn
<< dendl
;
3621 ClientLease
*l
= dn
->get_client_lease(client
);
3623 dout(7) << "handle_client_lease didn't have lease for client." << client
<< " of " << *dn
<< dendl
;
3628 switch (m
->get_action()) {
3629 case CEPH_MDS_LEASE_REVOKE_ACK
:
3630 case CEPH_MDS_LEASE_RELEASE
:
3631 if (l
->seq
!= m
->get_seq()) {
3632 dout(7) << "handle_client_lease release - seq " << l
->seq
<< " != provided " << m
->get_seq() << dendl
;
3634 dout(7) << "handle_client_lease client." << client
3635 << " on " << *dn
<< dendl
;
3636 dn
->remove_client_lease(l
, this);
3641 case CEPH_MDS_LEASE_RENEW
:
3643 dout(7) << "handle_client_lease client." << client
<< " renew on " << *dn
3644 << (!dn
->lock
.can_lease(client
)?", revoking lease":"") << dendl
;
3645 if (dn
->lock
.can_lease(client
)) {
3646 int pool
= 1; // fixme.. do something smart!
3647 m
->h
.duration_ms
= (int)(1000 * mdcache
->client_lease_durations
[pool
]);
3648 m
->h
.seq
= ++l
->seq
;
3651 utime_t now
= ceph_clock_now();
3652 now
+= mdcache
->client_lease_durations
[pool
];
3653 mdcache
->touch_client_lease(l
, pool
, now
);
3655 mds
->send_message_client_counted(m
, m
->get_connection());
3661 ceph_abort(); // implement me
3667 void Locker::issue_client_lease(CDentry
*dn
, client_t client
,
3668 bufferlist
&bl
, utime_t now
, Session
*session
)
3670 CInode
*diri
= dn
->get_dir()->get_inode();
3671 if (!diri
->is_stray() && // do not issue dn leases in stray dir!
3672 ((!diri
->filelock
.can_lease(client
) &&
3673 (diri
->get_client_cap_pending(client
) & (CEPH_CAP_FILE_SHARED
| CEPH_CAP_FILE_EXCL
)) == 0)) &&
3674 dn
->lock
.can_lease(client
)) {
3675 int pool
= 1; // fixme.. do something smart!
3676 // issue a dentry lease
3677 ClientLease
*l
= dn
->add_client_lease(client
, session
);
3678 session
->touch_lease(l
);
3680 now
+= mdcache
->client_lease_durations
[pool
];
3681 mdcache
->touch_client_lease(l
, pool
, now
);
3684 e
.mask
= 1 | CEPH_LOCK_DN
; // old and new bit values
3686 e
.duration_ms
= (int)(1000 * mdcache
->client_lease_durations
[pool
]);
3688 dout(20) << "issue_client_lease seq " << e
.seq
<< " dur " << e
.duration_ms
<< "ms "
3689 << " on " << *dn
<< dendl
;
3697 dout(20) << "issue_client_lease no/null lease on " << *dn
<< dendl
;
3702 void Locker::revoke_client_leases(SimpleLock
*lock
)
3705 CDentry
*dn
= static_cast<CDentry
*>(lock
->get_parent());
3706 for (map
<client_t
, ClientLease
*>::iterator p
= dn
->client_lease_map
.begin();
3707 p
!= dn
->client_lease_map
.end();
3709 ClientLease
*l
= p
->second
;
3712 assert(lock
->get_type() == CEPH_LOCK_DN
);
3714 CDentry
*dn
= static_cast<CDentry
*>(lock
->get_parent());
3715 int mask
= 1 | CEPH_LOCK_DN
; // old and new bits
3717 // i should also revoke the dir ICONTENT lease, if they have it!
3718 CInode
*diri
= dn
->get_dir()->get_inode();
3719 mds
->send_message_client_counted(new MClientLease(CEPH_MDS_LEASE_REVOKE
, l
->seq
,
3722 diri
->first
, CEPH_NOSNAP
,
3730 // locks ----------------------------------------------------------------
3732 SimpleLock
*Locker::get_lock(int lock_type
, MDSCacheObjectInfo
&info
)
3734 switch (lock_type
) {
3737 // be careful; info.dirfrag may have incorrect frag; recalculate based on dname.
3738 CInode
*diri
= mdcache
->get_inode(info
.dirfrag
.ino
);
3743 fg
= diri
->pick_dirfrag(info
.dname
);
3744 dir
= diri
->get_dirfrag(fg
);
3746 dn
= dir
->lookup(info
.dname
, info
.snapid
);
3749 dout(7) << "get_lock don't have dn " << info
.dirfrag
.ino
<< " " << info
.dname
<< dendl
;
3755 case CEPH_LOCK_IAUTH
:
3756 case CEPH_LOCK_ILINK
:
3757 case CEPH_LOCK_IDFT
:
3758 case CEPH_LOCK_IFILE
:
3759 case CEPH_LOCK_INEST
:
3760 case CEPH_LOCK_IXATTR
:
3761 case CEPH_LOCK_ISNAP
:
3762 case CEPH_LOCK_IFLOCK
:
3763 case CEPH_LOCK_IPOLICY
:
3765 CInode
*in
= mdcache
->get_inode(info
.ino
, info
.snapid
);
3767 dout(7) << "get_lock don't have ino " << info
.ino
<< dendl
;
3770 switch (lock_type
) {
3771 case CEPH_LOCK_IAUTH
: return &in
->authlock
;
3772 case CEPH_LOCK_ILINK
: return &in
->linklock
;
3773 case CEPH_LOCK_IDFT
: return &in
->dirfragtreelock
;
3774 case CEPH_LOCK_IFILE
: return &in
->filelock
;
3775 case CEPH_LOCK_INEST
: return &in
->nestlock
;
3776 case CEPH_LOCK_IXATTR
: return &in
->xattrlock
;
3777 case CEPH_LOCK_ISNAP
: return &in
->snaplock
;
3778 case CEPH_LOCK_IFLOCK
: return &in
->flocklock
;
3779 case CEPH_LOCK_IPOLICY
: return &in
->policylock
;
3784 dout(7) << "get_lock don't know lock_type " << lock_type
<< dendl
;
3792 /* This function DOES put the passed message before returning */
3793 void Locker::handle_lock(MLock
*m
)
3795 // nobody should be talking to us during recovery.
3796 assert(mds
->is_rejoin() || mds
->is_clientreplay() || mds
->is_active() || mds
->is_stopping());
3798 SimpleLock
*lock
= get_lock(m
->get_lock_type(), m
->get_object_info());
3800 dout(10) << "don't have object " << m
->get_object_info() << ", must have trimmed, dropping" << dendl
;
3805 switch (lock
->get_type()) {
3807 case CEPH_LOCK_IAUTH
:
3808 case CEPH_LOCK_ILINK
:
3809 case CEPH_LOCK_ISNAP
:
3810 case CEPH_LOCK_IXATTR
:
3811 case CEPH_LOCK_IFLOCK
:
3812 case CEPH_LOCK_IPOLICY
:
3813 handle_simple_lock(lock
, m
);
3816 case CEPH_LOCK_IDFT
:
3817 case CEPH_LOCK_INEST
:
3818 //handle_scatter_lock((ScatterLock*)lock, m);
3821 case CEPH_LOCK_IFILE
:
3822 handle_file_lock(static_cast<ScatterLock
*>(lock
), m
);
3826 dout(7) << "handle_lock got otype " << m
->get_lock_type() << dendl
;
3836 // ==========================================================================
3839 /** This function may take a reference to m if it needs one, but does
3840 * not put references. */
3841 void Locker::handle_reqrdlock(SimpleLock
*lock
, MLock
*m
)
3843 MDSCacheObject
*parent
= lock
->get_parent();
3844 if (parent
->is_auth() &&
3845 lock
->get_state() != LOCK_SYNC
&&
3846 !parent
->is_frozen()) {
3847 dout(7) << "handle_reqrdlock got rdlock request on " << *lock
3848 << " on " << *parent
<< dendl
;
3849 assert(parent
->is_auth()); // replica auth pinned if they're doing this!
3850 if (lock
->is_stable()) {
3853 dout(7) << "handle_reqrdlock delaying request until lock is stable" << dendl
;
3854 lock
->add_waiter(SimpleLock::WAIT_STABLE
| MDSCacheObject::WAIT_UNFREEZE
,
3855 new C_MDS_RetryMessage(mds
, m
->get()));
3858 dout(7) << "handle_reqrdlock dropping rdlock request on " << *lock
3859 << " on " << *parent
<< dendl
;
3860 // replica should retry
3864 /* This function DOES put the passed message before returning */
3865 void Locker::handle_simple_lock(SimpleLock
*lock
, MLock
*m
)
3867 int from
= m
->get_asker();
3869 dout(10) << "handle_simple_lock " << *m
3870 << " on " << *lock
<< " " << *lock
->get_parent() << dendl
;
3872 if (mds
->is_rejoin()) {
3873 if (lock
->get_parent()->is_rejoining()) {
3874 dout(7) << "handle_simple_lock still rejoining " << *lock
->get_parent()
3875 << ", dropping " << *m
<< dendl
;
3881 switch (m
->get_action()) {
3884 assert(lock
->get_state() == LOCK_LOCK
);
3885 lock
->decode_locked_state(m
->get_data());
3886 lock
->set_state(LOCK_SYNC
);
3887 lock
->finish_waiters(SimpleLock::WAIT_RD
|SimpleLock::WAIT_STABLE
);
3891 assert(lock
->get_state() == LOCK_SYNC
);
3892 lock
->set_state(LOCK_SYNC_LOCK
);
3893 if (lock
->is_leased())
3894 revoke_client_leases(lock
);
3895 eval_gather(lock
, true);
3896 if (lock
->is_unstable_and_locked())
3897 mds
->mdlog
->flush();
3902 case LOCK_AC_LOCKACK
:
3903 assert(lock
->get_state() == LOCK_SYNC_LOCK
||
3904 lock
->get_state() == LOCK_SYNC_EXCL
);
3905 assert(lock
->is_gathering(from
));
3906 lock
->remove_gather(from
);
3908 if (lock
->is_gathering()) {
3909 dout(7) << "handle_simple_lock " << *lock
<< " on " << *lock
->get_parent() << " from " << from
3910 << ", still gathering " << lock
->get_gather_set() << dendl
;
3912 dout(7) << "handle_simple_lock " << *lock
<< " on " << *lock
->get_parent() << " from " << from
3913 << ", last one" << dendl
;
3918 case LOCK_AC_REQRDLOCK
:
3919 handle_reqrdlock(lock
, m
);
3927 /* unused, currently.
3929 class C_Locker_SimpleEval : public Context {
3933 C_Locker_SimpleEval(Locker *l, SimpleLock *lk) : locker(l), lock(lk) {}
3934 void finish(int r) {
3935 locker->try_simple_eval(lock);
3939 void Locker::try_simple_eval(SimpleLock *lock)
3941 // unstable and ambiguous auth?
3942 if (!lock->is_stable() &&
3943 lock->get_parent()->is_ambiguous_auth()) {
3944 dout(7) << "simple_eval not stable and ambiguous auth, waiting on " << *lock->get_parent() << dendl;
3945 //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
3946 lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_SimpleEval(this, lock));
3950 if (!lock->get_parent()->is_auth()) {
3951 dout(7) << "try_simple_eval not auth for " << *lock->get_parent() << dendl;
3955 if (!lock->get_parent()->can_auth_pin()) {
3956 dout(7) << "try_simple_eval can't auth_pin, waiting on " << *lock->get_parent() << dendl;
3957 //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
3958 lock->get_parent()->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_SimpleEval(this, lock));
3962 if (lock->is_stable())
3968 void Locker::simple_eval(SimpleLock
*lock
, bool *need_issue
)
3970 dout(10) << "simple_eval " << *lock
<< " on " << *lock
->get_parent() << dendl
;
3972 assert(lock
->get_parent()->is_auth());
3973 assert(lock
->is_stable());
3975 if (lock
->get_parent()->is_freezing_or_frozen()) {
3976 // dentry lock in unreadable state can block path traverse
3977 if ((lock
->get_type() != CEPH_LOCK_DN
||
3978 lock
->get_state() == LOCK_SYNC
||
3979 lock
->get_parent()->is_frozen()))
3983 if (mdcache
->is_readonly()) {
3984 if (lock
->get_state() != LOCK_SYNC
) {
3985 dout(10) << "simple_eval read-only FS, syncing " << *lock
<< " on " << *lock
->get_parent() << dendl
;
3986 simple_sync(lock
, need_issue
);
3993 if (lock
->get_type() != CEPH_LOCK_DN
) {
3994 in
= static_cast<CInode
*>(lock
->get_parent());
3995 in
->get_caps_wanted(&wanted
, NULL
, lock
->get_cap_shift());
3999 if (lock
->get_state() != LOCK_EXCL
&&
4000 in
&& in
->get_target_loner() >= 0 &&
4001 (wanted
& CEPH_CAP_GEXCL
)) {
4002 dout(7) << "simple_eval stable, going to excl " << *lock
4003 << " on " << *lock
->get_parent() << dendl
;
4004 simple_excl(lock
, need_issue
);
4008 else if (lock
->get_state() != LOCK_SYNC
&&
4009 !lock
->is_wrlocked() &&
4010 ((!(wanted
& CEPH_CAP_GEXCL
) && !lock
->is_waiter_for(SimpleLock::WAIT_WR
)) ||
4011 (lock
->get_state() == LOCK_EXCL
&& in
&& in
->get_target_loner() < 0))) {
4012 dout(7) << "simple_eval stable, syncing " << *lock
4013 << " on " << *lock
->get_parent() << dendl
;
4014 simple_sync(lock
, need_issue
);
4021 bool Locker::simple_sync(SimpleLock
*lock
, bool *need_issue
)
4023 dout(7) << "simple_sync on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4024 assert(lock
->get_parent()->is_auth());
4025 assert(lock
->is_stable());
4028 if (lock
->get_cap_shift())
4029 in
= static_cast<CInode
*>(lock
->get_parent());
4031 int old_state
= lock
->get_state();
4033 if (old_state
!= LOCK_TSYN
) {
4035 switch (lock
->get_state()) {
4036 case LOCK_MIX
: lock
->set_state(LOCK_MIX_SYNC
); break;
4037 case LOCK_LOCK
: lock
->set_state(LOCK_LOCK_SYNC
); break;
4038 case LOCK_XSYN
: lock
->set_state(LOCK_XSYN_SYNC
); break;
4039 case LOCK_EXCL
: lock
->set_state(LOCK_EXCL_SYNC
); break;
4040 default: ceph_abort();
4044 if (lock
->is_wrlocked())
4047 if (lock
->get_parent()->is_replicated() && old_state
== LOCK_MIX
) {
4048 send_lock_message(lock
, LOCK_AC_SYNC
);
4049 lock
->init_gather();
4053 if (in
&& in
->is_head()) {
4054 if (in
->issued_caps_need_gather(lock
)) {
4063 bool need_recover
= false;
4064 if (lock
->get_type() == CEPH_LOCK_IFILE
) {
4066 if (in
->state_test(CInode::STATE_NEEDSRECOVER
)) {
4067 mds
->mdcache
->queue_file_recover(in
);
4068 need_recover
= true;
4073 if (!gather
&& lock
->is_dirty()) {
4074 lock
->get_parent()->auth_pin(lock
);
4075 scatter_writebehind(static_cast<ScatterLock
*>(lock
));
4076 mds
->mdlog
->flush();
4081 lock
->get_parent()->auth_pin(lock
);
4083 mds
->mdcache
->do_file_recover();
4088 if (lock
->get_parent()->is_replicated()) { // FIXME
4090 lock
->encode_locked_state(data
);
4091 send_lock_message(lock
, LOCK_AC_SYNC
, data
);
4093 lock
->set_state(LOCK_SYNC
);
4094 lock
->finish_waiters(SimpleLock::WAIT_RD
|SimpleLock::WAIT_STABLE
);
4095 if (in
&& in
->is_head()) {
4104 void Locker::simple_excl(SimpleLock
*lock
, bool *need_issue
)
4106 dout(7) << "simple_excl on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4107 assert(lock
->get_parent()->is_auth());
4108 assert(lock
->is_stable());
4111 if (lock
->get_cap_shift())
4112 in
= static_cast<CInode
*>(lock
->get_parent());
4114 switch (lock
->get_state()) {
4115 case LOCK_LOCK
: lock
->set_state(LOCK_LOCK_EXCL
); break;
4116 case LOCK_SYNC
: lock
->set_state(LOCK_SYNC_EXCL
); break;
4117 case LOCK_XSYN
: lock
->set_state(LOCK_XSYN_EXCL
); break;
4118 default: ceph_abort();
4122 if (lock
->is_rdlocked())
4124 if (lock
->is_wrlocked())
4127 if (lock
->get_parent()->is_replicated() &&
4128 lock
->get_state() != LOCK_LOCK_EXCL
&&
4129 lock
->get_state() != LOCK_XSYN_EXCL
) {
4130 send_lock_message(lock
, LOCK_AC_LOCK
);
4131 lock
->init_gather();
4135 if (in
&& in
->is_head()) {
4136 if (in
->issued_caps_need_gather(lock
)) {
4146 lock
->get_parent()->auth_pin(lock
);
4148 lock
->set_state(LOCK_EXCL
);
4149 lock
->finish_waiters(SimpleLock::WAIT_WR
|SimpleLock::WAIT_STABLE
);
4159 void Locker::simple_lock(SimpleLock
*lock
, bool *need_issue
)
4161 dout(7) << "simple_lock on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4162 assert(lock
->get_parent()->is_auth());
4163 assert(lock
->is_stable());
4164 assert(lock
->get_state() != LOCK_LOCK
);
4167 if (lock
->get_cap_shift())
4168 in
= static_cast<CInode
*>(lock
->get_parent());
4170 int old_state
= lock
->get_state();
4172 switch (lock
->get_state()) {
4173 case LOCK_SYNC
: lock
->set_state(LOCK_SYNC_LOCK
); break;
4174 case LOCK_XSYN
: lock
->set_state(LOCK_XSYN_LOCK
); break;
4175 case LOCK_EXCL
: lock
->set_state(LOCK_EXCL_LOCK
); break;
4176 case LOCK_MIX
: lock
->set_state(LOCK_MIX_LOCK
);
4177 (static_cast<ScatterLock
*>(lock
))->clear_unscatter_wanted();
4179 case LOCK_TSYN
: lock
->set_state(LOCK_TSYN_LOCK
); break;
4180 default: ceph_abort();
4184 if (lock
->is_leased()) {
4186 revoke_client_leases(lock
);
4188 if (lock
->is_rdlocked())
4190 if (in
&& in
->is_head()) {
4191 if (in
->issued_caps_need_gather(lock
)) {
4200 bool need_recover
= false;
4201 if (lock
->get_type() == CEPH_LOCK_IFILE
) {
4203 if(in
->state_test(CInode::STATE_NEEDSRECOVER
)) {
4204 mds
->mdcache
->queue_file_recover(in
);
4205 need_recover
= true;
4210 if (lock
->get_parent()->is_replicated() &&
4211 lock
->get_state() == LOCK_MIX_LOCK
&&
4213 dout(10) << " doing local stage of mix->lock gather before gathering from replicas" << dendl
;
4215 // move to second stage of gather now, so we don't send the lock action later.
4216 if (lock
->get_state() == LOCK_MIX_LOCK
)
4217 lock
->set_state(LOCK_MIX_LOCK2
);
4219 if (lock
->get_parent()->is_replicated() &&
4220 lock
->get_sm()->states
[old_state
].replica_state
!= LOCK_LOCK
) { // replica may already be LOCK
4222 send_lock_message(lock
, LOCK_AC_LOCK
);
4223 lock
->init_gather();
4227 if (!gather
&& lock
->is_dirty()) {
4228 lock
->get_parent()->auth_pin(lock
);
4229 scatter_writebehind(static_cast<ScatterLock
*>(lock
));
4230 mds
->mdlog
->flush();
4235 lock
->get_parent()->auth_pin(lock
);
4237 mds
->mdcache
->do_file_recover();
4239 lock
->set_state(LOCK_LOCK
);
4240 lock
->finish_waiters(ScatterLock::WAIT_XLOCK
|ScatterLock::WAIT_WR
|ScatterLock::WAIT_STABLE
);
4245 void Locker::simple_xlock(SimpleLock
*lock
)
4247 dout(7) << "simple_xlock on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4248 assert(lock
->get_parent()->is_auth());
4249 //assert(lock->is_stable());
4250 assert(lock
->get_state() != LOCK_XLOCK
);
4253 if (lock
->get_cap_shift())
4254 in
= static_cast<CInode
*>(lock
->get_parent());
4256 if (lock
->is_stable())
4257 lock
->get_parent()->auth_pin(lock
);
4259 switch (lock
->get_state()) {
4261 case LOCK_XLOCKDONE
: lock
->set_state(LOCK_LOCK_XLOCK
); break;
4262 default: ceph_abort();
4266 if (lock
->is_rdlocked())
4268 if (lock
->is_wrlocked())
4271 if (in
&& in
->is_head()) {
4272 if (in
->issued_caps_need_gather(lock
)) {
4279 lock
->set_state(LOCK_PREXLOCK
);
4280 //assert("shouldn't be called if we are already xlockable" == 0);
4288 // ==========================================================================
4293 Some notes on scatterlocks.
4295 - The scatter/gather is driven by the inode lock. The scatter always
4296 brings in the latest metadata from the fragments.
4298 - When in a scattered/MIX state, fragments are only allowed to
4299 update/be written to if the accounted stat matches the inode's
4302 - That means, on gather, we _only_ assimilate diffs for frag metadata
4303 that match the current version, because those are the only ones
4304 written during this scatter/gather cycle. (Others didn't permit
4305 it.) We increment the version and journal this to disk.
4307 - When possible, we also simultaneously update our local frag
4308 accounted stats to match.
4310 - On scatter, the new inode info is broadcast to frags, both local
4311 and remote. If possible (auth and !frozen), the dirfrag auth
4312 should update the accounted state (if it isn't already up to date).
4313 Note that this may occur on both the local inode auth node and
4314 inode replicas, so there are two potential paths. If it is NOT
4315 possible, they need to mark_stale to prevent any possible writes.
4317 - A scatter can be to MIX (potentially writeable) or to SYNC (read
4318 only). Both are opportunities to update the frag accounted stats,
4319 even though only the MIX case is affected by a stale dirfrag.
4321 - Because many scatter/gather cycles can potentially go by without a
4322 frag being able to update its accounted stats (due to being frozen
4323 by exports/refragments in progress), the frag may have (even very)
4324 old stat versions. That's fine. If when we do want to update it,
4325 we can update accounted_* and the version first.
4329 class C_Locker_ScatterWB
: public LockerLogContext
{
4333 C_Locker_ScatterWB(Locker
*l
, ScatterLock
*sl
, MutationRef
& m
) :
4334 LockerLogContext(l
), lock(sl
), mut(m
) {}
4335 void finish(int r
) override
{
4336 locker
->scatter_writebehind_finish(lock
, mut
);
4340 void Locker::scatter_writebehind(ScatterLock
*lock
)
4342 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4343 dout(10) << "scatter_writebehind " << in
->inode
.mtime
<< " on " << *lock
<< " on " << *in
<< dendl
;
4346 MutationRef
mut(new MutationImpl());
4347 mut
->ls
= mds
->mdlog
->get_current_segment();
4349 // forcefully take a wrlock
4350 lock
->get_wrlock(true);
4351 mut
->wrlocks
.insert(lock
);
4352 mut
->locks
.insert(lock
);
4354 in
->pre_cow_old_inode(); // avoid cow mayhem
4356 auto &pi
= in
->project_inode();
4357 pi
.inode
.version
= in
->pre_dirty();
4359 in
->finish_scatter_gather_update(lock
->get_type());
4360 lock
->start_flush();
4362 EUpdate
*le
= new EUpdate(mds
->mdlog
, "scatter_writebehind");
4363 mds
->mdlog
->start_entry(le
);
4365 mdcache
->predirty_journal_parents(mut
, &le
->metablob
, in
, 0, PREDIRTY_PRIMARY
);
4366 mdcache
->journal_dirty_inode(mut
.get(), &le
->metablob
, in
);
4368 in
->finish_scatter_gather_update_accounted(lock
->get_type(), mut
, &le
->metablob
);
4370 mds
->mdlog
->submit_entry(le
, new C_Locker_ScatterWB(this, lock
, mut
));
4373 void Locker::scatter_writebehind_finish(ScatterLock
*lock
, MutationRef
& mut
)
4375 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4376 dout(10) << "scatter_writebehind_finish on " << *lock
<< " on " << *in
<< dendl
;
4377 in
->pop_and_dirty_projected_inode(mut
->ls
);
4379 lock
->finish_flush();
4381 // if replicas may have flushed in a mix->lock state, send another
4382 // message so they can finish_flush().
4383 if (in
->is_replicated()) {
4384 switch (lock
->get_state()) {
4386 case LOCK_MIX_LOCK2
:
4389 send_lock_message(lock
, LOCK_AC_LOCKFLUSHED
);
4394 drop_locks(mut
.get());
4397 if (lock
->is_stable())
4398 lock
->finish_waiters(ScatterLock::WAIT_STABLE
);
4400 //scatter_eval_gather(lock);
4403 void Locker::scatter_eval(ScatterLock
*lock
, bool *need_issue
)
4405 dout(10) << "scatter_eval " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4407 assert(lock
->get_parent()->is_auth());
4408 assert(lock
->is_stable());
4410 if (lock
->get_parent()->is_freezing_or_frozen()) {
4411 dout(20) << " freezing|frozen" << dendl
;
4415 if (mdcache
->is_readonly()) {
4416 if (lock
->get_state() != LOCK_SYNC
) {
4417 dout(10) << "scatter_eval read-only FS, syncing " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4418 simple_sync(lock
, need_issue
);
4423 if (!lock
->is_rdlocked() &&
4424 lock
->get_state() != LOCK_MIX
&&
4425 lock
->get_scatter_wanted()) {
4426 dout(10) << "scatter_eval scatter_wanted, bump to mix " << *lock
4427 << " on " << *lock
->get_parent() << dendl
;
4428 scatter_mix(lock
, need_issue
);
4432 if (lock
->get_type() == CEPH_LOCK_INEST
) {
4433 // in general, we want to keep INEST writable at all times.
4434 if (!lock
->is_rdlocked()) {
4435 if (lock
->get_parent()->is_replicated()) {
4436 if (lock
->get_state() != LOCK_MIX
)
4437 scatter_mix(lock
, need_issue
);
4439 if (lock
->get_state() != LOCK_LOCK
)
4440 simple_lock(lock
, need_issue
);
4446 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4447 if (!in
->has_subtree_or_exporting_dirfrag() || in
->is_base()) {
4448 // i _should_ be sync.
4449 if (!lock
->is_wrlocked() &&
4450 lock
->get_state() != LOCK_SYNC
) {
4451 dout(10) << "scatter_eval no wrlocks|xlocks, not subtree root inode, syncing" << dendl
;
4452 simple_sync(lock
, need_issue
);
4459 * mark a scatterlock to indicate that the dir fnode has some dirty data
4461 void Locker::mark_updated_scatterlock(ScatterLock
*lock
)
4464 if (lock
->get_updated_item()->is_on_list()) {
4465 dout(10) << "mark_updated_scatterlock " << *lock
4466 << " - already on list since " << lock
->get_update_stamp() << dendl
;
4468 updated_scatterlocks
.push_back(lock
->get_updated_item());
4469 utime_t now
= ceph_clock_now();
4470 lock
->set_update_stamp(now
);
4471 dout(10) << "mark_updated_scatterlock " << *lock
4472 << " - added at " << now
<< dendl
;
4477 * this is called by scatter_tick and LogSegment::try_to_trim() when
4478 * trying to flush dirty scattered data (i.e. updated fnode) back to
4481 * we need to lock|scatter in order to push fnode changes into the
4484 void Locker::scatter_nudge(ScatterLock
*lock
, MDSInternalContextBase
*c
, bool forcelockchange
)
4486 CInode
*p
= static_cast<CInode
*>(lock
->get_parent());
4488 if (p
->is_frozen() || p
->is_freezing()) {
4489 dout(10) << "scatter_nudge waiting for unfreeze on " << *p
<< dendl
;
4491 p
->add_waiter(MDSCacheObject::WAIT_UNFREEZE
, c
);
4492 else if (lock
->is_dirty())
4493 // just requeue. not ideal.. starvation prone..
4494 updated_scatterlocks
.push_back(lock
->get_updated_item());
4498 if (p
->is_ambiguous_auth()) {
4499 dout(10) << "scatter_nudge waiting for single auth on " << *p
<< dendl
;
4501 p
->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
, c
);
4502 else if (lock
->is_dirty())
4503 // just requeue. not ideal.. starvation prone..
4504 updated_scatterlocks
.push_back(lock
->get_updated_item());
4511 if (lock
->is_stable()) {
4512 // can we do it now?
4513 // (only if we're not replicated.. if we are, we really do need
4514 // to nudge the lock state!)
4516 actually, even if we're not replicated, we can't stay in MIX, because another mds
4517 could discover and replicate us at any time. if that happens while we're flushing,
4518 they end up in MIX but their inode has the old scatterstat version.
4520 if (!forcelockchange && !lock->get_parent()->is_replicated() && lock->can_wrlock(-1)) {
4521 dout(10) << "scatter_nudge auth, propagating " << *lock << " on " << *p << dendl;
4522 scatter_writebehind(lock);
4524 lock->add_waiter(SimpleLock::WAIT_STABLE, c);
4529 if (mdcache
->is_readonly()) {
4530 if (lock
->get_state() != LOCK_SYNC
) {
4531 dout(10) << "scatter_nudge auth, read-only FS, syncing " << *lock
<< " on " << *p
<< dendl
;
4532 simple_sync(static_cast<ScatterLock
*>(lock
));
4537 // adjust lock state
4538 dout(10) << "scatter_nudge auth, scatter/unscattering " << *lock
<< " on " << *p
<< dendl
;
4539 switch (lock
->get_type()) {
4540 case CEPH_LOCK_IFILE
:
4541 if (p
->is_replicated() && lock
->get_state() != LOCK_MIX
)
4542 scatter_mix(static_cast<ScatterLock
*>(lock
));
4543 else if (lock
->get_state() != LOCK_LOCK
)
4544 simple_lock(static_cast<ScatterLock
*>(lock
));
4546 simple_sync(static_cast<ScatterLock
*>(lock
));
4549 case CEPH_LOCK_IDFT
:
4550 case CEPH_LOCK_INEST
:
4551 if (p
->is_replicated() && lock
->get_state() != LOCK_MIX
)
4553 else if (lock
->get_state() != LOCK_LOCK
)
4562 if (lock
->is_stable() && count
== 2) {
4563 dout(10) << "scatter_nudge oh, stable after two cycles." << dendl
;
4564 // this should only realy happen when called via
4565 // handle_file_lock due to AC_NUDGE, because the rest of the
4566 // time we are replicated or have dirty data and won't get
4567 // called. bailing here avoids an infinite loop.
4572 dout(10) << "scatter_nudge auth, waiting for stable " << *lock
<< " on " << *p
<< dendl
;
4574 lock
->add_waiter(SimpleLock::WAIT_STABLE
, c
);
4579 dout(10) << "scatter_nudge replica, requesting scatter/unscatter of "
4580 << *lock
<< " on " << *p
<< dendl
;
4581 // request unscatter?
4582 mds_rank_t auth
= lock
->get_parent()->authority().first
;
4583 if (!mds
->is_cluster_degraded() ||
4584 mds
->mdsmap
->is_clientreplay_or_active_or_stopping(auth
))
4585 mds
->send_message_mds(new MLock(lock
, LOCK_AC_NUDGE
, mds
->get_nodeid()), auth
);
4589 lock
->add_waiter(SimpleLock::WAIT_STABLE
, c
);
4591 // also, requeue, in case we had wrong auth or something
4592 if (lock
->is_dirty())
4593 updated_scatterlocks
.push_back(lock
->get_updated_item());
4597 void Locker::scatter_tick()
4599 dout(10) << "scatter_tick" << dendl
;
4602 utime_t now
= ceph_clock_now();
4603 int n
= updated_scatterlocks
.size();
4604 while (!updated_scatterlocks
.empty()) {
4605 ScatterLock
*lock
= updated_scatterlocks
.front();
4607 if (n
-- == 0) break; // scatter_nudge() may requeue; avoid looping
4609 if (!lock
->is_dirty()) {
4610 updated_scatterlocks
.pop_front();
4611 dout(10) << " removing from updated_scatterlocks "
4612 << *lock
<< " " << *lock
->get_parent() << dendl
;
4615 if (now
- lock
->get_update_stamp() < g_conf
->mds_scatter_nudge_interval
)
4617 updated_scatterlocks
.pop_front();
4618 scatter_nudge(lock
, 0);
4620 mds
->mdlog
->flush();
4624 void Locker::scatter_tempsync(ScatterLock
*lock
, bool *need_issue
)
4626 dout(10) << "scatter_tempsync " << *lock
4627 << " on " << *lock
->get_parent() << dendl
;
4628 assert(lock
->get_parent()->is_auth());
4629 assert(lock
->is_stable());
4631 assert(0 == "not fully implemented, at least not for filelock");
4633 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4635 switch (lock
->get_state()) {
4636 case LOCK_SYNC
: ceph_abort(); // this shouldn't happen
4637 case LOCK_LOCK
: lock
->set_state(LOCK_LOCK_TSYN
); break;
4638 case LOCK_MIX
: lock
->set_state(LOCK_MIX_TSYN
); break;
4639 default: ceph_abort();
4643 if (lock
->is_wrlocked())
4646 if (lock
->get_cap_shift() &&
4648 in
->issued_caps_need_gather(lock
)) {
4656 if (lock
->get_state() == LOCK_MIX_TSYN
&&
4657 in
->is_replicated()) {
4658 lock
->init_gather();
4659 send_lock_message(lock
, LOCK_AC_LOCK
);
4667 lock
->set_state(LOCK_TSYN
);
4668 lock
->finish_waiters(ScatterLock::WAIT_RD
|ScatterLock::WAIT_STABLE
);
4669 if (lock
->get_cap_shift()) {
4680 // ==========================================================================
4683 void Locker::local_wrlock_grab(LocalLock
*lock
, MutationRef
& mut
)
4685 dout(7) << "local_wrlock_grab on " << *lock
4686 << " on " << *lock
->get_parent() << dendl
;
4688 assert(lock
->get_parent()->is_auth());
4689 assert(lock
->can_wrlock());
4690 assert(!mut
->wrlocks
.count(lock
));
4691 lock
->get_wrlock(mut
->get_client());
4692 mut
->wrlocks
.insert(lock
);
4693 mut
->locks
.insert(lock
);
4696 bool Locker::local_wrlock_start(LocalLock
*lock
, MDRequestRef
& mut
)
4698 dout(7) << "local_wrlock_start on " << *lock
4699 << " on " << *lock
->get_parent() << dendl
;
4701 assert(lock
->get_parent()->is_auth());
4702 if (lock
->can_wrlock()) {
4703 assert(!mut
->wrlocks
.count(lock
));
4704 lock
->get_wrlock(mut
->get_client());
4705 mut
->wrlocks
.insert(lock
);
4706 mut
->locks
.insert(lock
);
4709 lock
->add_waiter(SimpleLock::WAIT_WR
|SimpleLock::WAIT_STABLE
, new C_MDS_RetryRequest(mdcache
, mut
));
4714 void Locker::local_wrlock_finish(LocalLock
*lock
, MutationImpl
*mut
)
4716 dout(7) << "local_wrlock_finish on " << *lock
4717 << " on " << *lock
->get_parent() << dendl
;
4719 mut
->wrlocks
.erase(lock
);
4720 mut
->locks
.erase(lock
);
4721 if (lock
->get_num_wrlocks() == 0) {
4722 lock
->finish_waiters(SimpleLock::WAIT_STABLE
|
4723 SimpleLock::WAIT_WR
|
4724 SimpleLock::WAIT_RD
);
4728 bool Locker::local_xlock_start(LocalLock
*lock
, MDRequestRef
& mut
)
4730 dout(7) << "local_xlock_start on " << *lock
4731 << " on " << *lock
->get_parent() << dendl
;
4733 assert(lock
->get_parent()->is_auth());
4734 if (!lock
->can_xlock_local()) {
4735 lock
->add_waiter(SimpleLock::WAIT_WR
|SimpleLock::WAIT_STABLE
, new C_MDS_RetryRequest(mdcache
, mut
));
4739 lock
->get_xlock(mut
, mut
->get_client());
4740 mut
->xlocks
.insert(lock
);
4741 mut
->locks
.insert(lock
);
4745 void Locker::local_xlock_finish(LocalLock
*lock
, MutationImpl
*mut
)
4747 dout(7) << "local_xlock_finish on " << *lock
4748 << " on " << *lock
->get_parent() << dendl
;
4750 mut
->xlocks
.erase(lock
);
4751 mut
->locks
.erase(lock
);
4753 lock
->finish_waiters(SimpleLock::WAIT_STABLE
|
4754 SimpleLock::WAIT_WR
|
4755 SimpleLock::WAIT_RD
);
4760 // ==========================================================================
4764 void Locker::file_eval(ScatterLock
*lock
, bool *need_issue
)
4766 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4767 int loner_wanted
, other_wanted
;
4768 int wanted
= in
->get_caps_wanted(&loner_wanted
, &other_wanted
, CEPH_CAP_SFILE
);
4769 dout(7) << "file_eval wanted=" << gcap_string(wanted
)
4770 << " loner_wanted=" << gcap_string(loner_wanted
)
4771 << " other_wanted=" << gcap_string(other_wanted
)
4772 << " filelock=" << *lock
<< " on " << *lock
->get_parent()
4775 assert(lock
->get_parent()->is_auth());
4776 assert(lock
->is_stable());
4778 if (lock
->get_parent()->is_freezing_or_frozen())
4781 if (mdcache
->is_readonly()) {
4782 if (lock
->get_state() != LOCK_SYNC
) {
4783 dout(10) << "file_eval read-only FS, syncing " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4784 simple_sync(lock
, need_issue
);
4790 if (lock
->get_state() == LOCK_EXCL
) {
4791 dout(20) << " is excl" << dendl
;
4792 int loner_issued
, other_issued
, xlocker_issued
;
4793 in
->get_caps_issued(&loner_issued
, &other_issued
, &xlocker_issued
, CEPH_CAP_SFILE
);
4794 dout(7) << "file_eval loner_issued=" << gcap_string(loner_issued
)
4795 << " other_issued=" << gcap_string(other_issued
)
4796 << " xlocker_issued=" << gcap_string(xlocker_issued
)
4798 if (!((loner_wanted
|loner_issued
) & (CEPH_CAP_GEXCL
|CEPH_CAP_GWR
|CEPH_CAP_GBUFFER
)) ||
4799 (other_wanted
& (CEPH_CAP_GEXCL
|CEPH_CAP_GWR
|CEPH_CAP_GRD
)) ||
4800 (in
->inode
.is_dir() && in
->multiple_nonstale_caps())) { // FIXME.. :/
4801 dout(20) << " should lose it" << dendl
;
4802 // we should lose it.
4813 // -> any writer means MIX; RD doesn't matter.
4814 if (((other_wanted
|loner_wanted
) & CEPH_CAP_GWR
) ||
4815 lock
->is_waiter_for(SimpleLock::WAIT_WR
))
4816 scatter_mix(lock
, need_issue
);
4817 else if (!lock
->is_wrlocked()) // let excl wrlocks drain first
4818 simple_sync(lock
, need_issue
);
4820 dout(10) << " waiting for wrlock to drain" << dendl
;
4825 else if (lock
->get_state() != LOCK_EXCL
&&
4826 !lock
->is_rdlocked() &&
4827 //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
4828 ((wanted
& (CEPH_CAP_GWR
|CEPH_CAP_GBUFFER
)) ||
4829 (in
->inode
.is_dir() && !in
->has_subtree_or_exporting_dirfrag())) &&
4830 in
->get_target_loner() >= 0) {
4831 dout(7) << "file_eval stable, bump to loner " << *lock
4832 << " on " << *lock
->get_parent() << dendl
;
4833 file_excl(lock
, need_issue
);
4837 else if (lock
->get_state() != LOCK_MIX
&&
4838 !lock
->is_rdlocked() &&
4839 //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
4840 (lock
->get_scatter_wanted() ||
4841 (in
->get_target_loner() < 0 && (wanted
& CEPH_CAP_GWR
)))) {
4842 dout(7) << "file_eval stable, bump to mixed " << *lock
4843 << " on " << *lock
->get_parent() << dendl
;
4844 scatter_mix(lock
, need_issue
);
4848 else if (lock
->get_state() != LOCK_SYNC
&&
4849 !lock
->is_wrlocked() && // drain wrlocks first!
4850 !lock
->is_waiter_for(SimpleLock::WAIT_WR
) &&
4851 !(wanted
& (CEPH_CAP_GWR
|CEPH_CAP_GBUFFER
)) &&
4852 !((lock
->get_state() == LOCK_MIX
) &&
4853 in
->is_dir() && in
->has_subtree_or_exporting_dirfrag()) // if we are a delegation point, stay where we are
4854 //((wanted & CEPH_CAP_RD) ||
4855 //in->is_replicated() ||
4856 //lock->is_leased() ||
4857 //(!loner && lock->get_state() == LOCK_EXCL)) &&
4859 dout(7) << "file_eval stable, bump to sync " << *lock
4860 << " on " << *lock
->get_parent() << dendl
;
4861 simple_sync(lock
, need_issue
);
4867 void Locker::scatter_mix(ScatterLock
*lock
, bool *need_issue
)
4869 dout(7) << "scatter_mix " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4871 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4872 assert(in
->is_auth());
4873 assert(lock
->is_stable());
4875 if (lock
->get_state() == LOCK_LOCK
) {
4876 in
->start_scatter(lock
);
4877 if (in
->is_replicated()) {
4879 bufferlist softdata
;
4880 lock
->encode_locked_state(softdata
);
4882 // bcast to replicas
4883 send_lock_message(lock
, LOCK_AC_MIX
, softdata
);
4887 lock
->set_state(LOCK_MIX
);
4888 lock
->clear_scatter_wanted();
4889 if (lock
->get_cap_shift()) {
4897 switch (lock
->get_state()) {
4898 case LOCK_SYNC
: lock
->set_state(LOCK_SYNC_MIX
); break;
4899 case LOCK_EXCL
: lock
->set_state(LOCK_EXCL_MIX
); break;
4900 case LOCK_XSYN
: lock
->set_state(LOCK_XSYN_MIX
); break;
4901 case LOCK_TSYN
: lock
->set_state(LOCK_TSYN_MIX
); break;
4902 default: ceph_abort();
4906 if (lock
->is_rdlocked())
4908 if (in
->is_replicated()) {
4909 if (lock
->get_state() == LOCK_SYNC_MIX
) { // for the rest states, replicas are already LOCK
4910 send_lock_message(lock
, LOCK_AC_MIX
);
4911 lock
->init_gather();
4915 if (lock
->is_leased()) {
4916 revoke_client_leases(lock
);
4919 if (lock
->get_cap_shift() &&
4921 in
->issued_caps_need_gather(lock
)) {
4928 bool need_recover
= false;
4929 if (in
->state_test(CInode::STATE_NEEDSRECOVER
)) {
4930 mds
->mdcache
->queue_file_recover(in
);
4931 need_recover
= true;
4936 lock
->get_parent()->auth_pin(lock
);
4938 mds
->mdcache
->do_file_recover();
4940 in
->start_scatter(lock
);
4941 lock
->set_state(LOCK_MIX
);
4942 lock
->clear_scatter_wanted();
4943 if (in
->is_replicated()) {
4944 bufferlist softdata
;
4945 lock
->encode_locked_state(softdata
);
4946 send_lock_message(lock
, LOCK_AC_MIX
, softdata
);
4948 if (lock
->get_cap_shift()) {
4959 void Locker::file_excl(ScatterLock
*lock
, bool *need_issue
)
4961 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4962 dout(7) << "file_excl " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4964 assert(in
->is_auth());
4965 assert(lock
->is_stable());
4967 assert((in
->get_loner() >= 0 && in
->mds_caps_wanted
.empty()) ||
4968 (lock
->get_state() == LOCK_XSYN
)); // must do xsyn -> excl -> <anything else>
4970 switch (lock
->get_state()) {
4971 case LOCK_SYNC
: lock
->set_state(LOCK_SYNC_EXCL
); break;
4972 case LOCK_MIX
: lock
->set_state(LOCK_MIX_EXCL
); break;
4973 case LOCK_LOCK
: lock
->set_state(LOCK_LOCK_EXCL
); break;
4974 case LOCK_XSYN
: lock
->set_state(LOCK_XSYN_EXCL
); break;
4975 default: ceph_abort();
4979 if (lock
->is_rdlocked())
4981 if (lock
->is_wrlocked())
4984 if (in
->is_replicated() &&
4985 lock
->get_state() != LOCK_LOCK_EXCL
&&
4986 lock
->get_state() != LOCK_XSYN_EXCL
) { // if we were lock, replicas are already lock.
4987 send_lock_message(lock
, LOCK_AC_LOCK
);
4988 lock
->init_gather();
4991 if (lock
->is_leased()) {
4992 revoke_client_leases(lock
);
4995 if (in
->is_head() &&
4996 in
->issued_caps_need_gather(lock
)) {
5003 bool need_recover
= false;
5004 if (in
->state_test(CInode::STATE_NEEDSRECOVER
)) {
5005 mds
->mdcache
->queue_file_recover(in
);
5006 need_recover
= true;
5011 lock
->get_parent()->auth_pin(lock
);
5013 mds
->mdcache
->do_file_recover();
5015 lock
->set_state(LOCK_EXCL
);
5023 void Locker::file_xsyn(SimpleLock
*lock
, bool *need_issue
)
5025 dout(7) << "file_xsyn on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
5026 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
5027 assert(in
->is_auth());
5028 assert(in
->get_loner() >= 0 && in
->mds_caps_wanted
.empty());
5030 switch (lock
->get_state()) {
5031 case LOCK_EXCL
: lock
->set_state(LOCK_EXCL_XSYN
); break;
5032 default: ceph_abort();
5036 if (lock
->is_wrlocked())
5039 if (in
->is_head() &&
5040 in
->issued_caps_need_gather(lock
)) {
5049 lock
->get_parent()->auth_pin(lock
);
5051 lock
->set_state(LOCK_XSYN
);
5052 lock
->finish_waiters(SimpleLock::WAIT_RD
|SimpleLock::WAIT_STABLE
);
5060 void Locker::file_recover(ScatterLock
*lock
)
5062 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
5063 dout(7) << "file_recover " << *lock
<< " on " << *in
<< dendl
;
5065 assert(in
->is_auth());
5066 //assert(lock->is_stable());
5067 assert(lock
->get_state() == LOCK_PRE_SCAN
); // only called from MDCache::start_files_to_recover()
5072 if (in->is_replicated()
5073 lock->get_sm()->states[oldstate].replica_state != LOCK_LOCK) {
5074 send_lock_message(lock, LOCK_AC_LOCK);
5075 lock->init_gather();
5079 if (in
->is_head() &&
5080 in
->issued_caps_need_gather(lock
)) {
5085 lock
->set_state(LOCK_SCAN
);
5087 in
->state_set(CInode::STATE_NEEDSRECOVER
);
5089 mds
->mdcache
->queue_file_recover(in
);
5094 /* This function DOES put the passed message before returning */
5095 void Locker::handle_file_lock(ScatterLock
*lock
, MLock
*m
)
5097 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
5098 int from
= m
->get_asker();
5100 if (mds
->is_rejoin()) {
5101 if (in
->is_rejoining()) {
5102 dout(7) << "handle_file_lock still rejoining " << *in
5103 << ", dropping " << *m
<< dendl
;
5109 dout(7) << "handle_file_lock a=" << get_lock_action_name(m
->get_action())
5111 << " from mds." << from
<< " "
5114 bool caps
= lock
->get_cap_shift();
5116 switch (m
->get_action()) {
5119 assert(lock
->get_state() == LOCK_LOCK
||
5120 lock
->get_state() == LOCK_MIX
||
5121 lock
->get_state() == LOCK_MIX_SYNC2
);
5123 if (lock
->get_state() == LOCK_MIX
) {
5124 lock
->set_state(LOCK_MIX_SYNC
);
5125 eval_gather(lock
, true);
5126 if (lock
->is_unstable_and_locked())
5127 mds
->mdlog
->flush();
5131 (static_cast<ScatterLock
*>(lock
))->finish_flush();
5132 (static_cast<ScatterLock
*>(lock
))->clear_flushed();
5135 lock
->decode_locked_state(m
->get_data());
5136 lock
->set_state(LOCK_SYNC
);
5141 lock
->finish_waiters(SimpleLock::WAIT_RD
|SimpleLock::WAIT_STABLE
);
5146 switch (lock
->get_state()) {
5147 case LOCK_SYNC
: lock
->set_state(LOCK_SYNC_LOCK
); break;
5148 case LOCK_MIX
: lock
->set_state(LOCK_MIX_LOCK
); break;
5149 default: ceph_abort();
5152 eval_gather(lock
, true);
5153 if (lock
->is_unstable_and_locked())
5154 mds
->mdlog
->flush();
5158 case LOCK_AC_LOCKFLUSHED
:
5159 (static_cast<ScatterLock
*>(lock
))->finish_flush();
5160 (static_cast<ScatterLock
*>(lock
))->clear_flushed();
5161 // wake up scatter_nudge waiters
5162 if (lock
->is_stable())
5163 lock
->finish_waiters(SimpleLock::WAIT_STABLE
);
5167 assert(lock
->get_state() == LOCK_SYNC
||
5168 lock
->get_state() == LOCK_LOCK
||
5169 lock
->get_state() == LOCK_SYNC_MIX2
);
5171 if (lock
->get_state() == LOCK_SYNC
) {
5173 lock
->set_state(LOCK_SYNC_MIX
);
5174 eval_gather(lock
, true);
5175 if (lock
->is_unstable_and_locked())
5176 mds
->mdlog
->flush();
5181 lock
->set_state(LOCK_MIX
);
5182 lock
->decode_locked_state(m
->get_data());
5187 lock
->finish_waiters(SimpleLock::WAIT_WR
|SimpleLock::WAIT_STABLE
);
5192 case LOCK_AC_LOCKACK
:
5193 assert(lock
->get_state() == LOCK_SYNC_LOCK
||
5194 lock
->get_state() == LOCK_MIX_LOCK
||
5195 lock
->get_state() == LOCK_MIX_LOCK2
||
5196 lock
->get_state() == LOCK_MIX_EXCL
||
5197 lock
->get_state() == LOCK_SYNC_EXCL
||
5198 lock
->get_state() == LOCK_SYNC_MIX
||
5199 lock
->get_state() == LOCK_MIX_TSYN
);
5200 assert(lock
->is_gathering(from
));
5201 lock
->remove_gather(from
);
5203 if (lock
->get_state() == LOCK_MIX_LOCK
||
5204 lock
->get_state() == LOCK_MIX_LOCK2
||
5205 lock
->get_state() == LOCK_MIX_EXCL
||
5206 lock
->get_state() == LOCK_MIX_TSYN
) {
5207 lock
->decode_locked_state(m
->get_data());
5208 // replica is waiting for AC_LOCKFLUSHED, eval_gather() should not
5209 // delay calling scatter_writebehind().
5210 lock
->clear_flushed();
5213 if (lock
->is_gathering()) {
5214 dout(7) << "handle_file_lock " << *in
<< " from " << from
5215 << ", still gathering " << lock
->get_gather_set() << dendl
;
5217 dout(7) << "handle_file_lock " << *in
<< " from " << from
5218 << ", last one" << dendl
;
5223 case LOCK_AC_SYNCACK
:
5224 assert(lock
->get_state() == LOCK_MIX_SYNC
);
5225 assert(lock
->is_gathering(from
));
5226 lock
->remove_gather(from
);
5228 lock
->decode_locked_state(m
->get_data());
5230 if (lock
->is_gathering()) {
5231 dout(7) << "handle_file_lock " << *in
<< " from " << from
5232 << ", still gathering " << lock
->get_gather_set() << dendl
;
5234 dout(7) << "handle_file_lock " << *in
<< " from " << from
5235 << ", last one" << dendl
;
5240 case LOCK_AC_MIXACK
:
5241 assert(lock
->get_state() == LOCK_SYNC_MIX
);
5242 assert(lock
->is_gathering(from
));
5243 lock
->remove_gather(from
);
5245 if (lock
->is_gathering()) {
5246 dout(7) << "handle_file_lock " << *in
<< " from " << from
5247 << ", still gathering " << lock
->get_gather_set() << dendl
;
5249 dout(7) << "handle_file_lock " << *in
<< " from " << from
5250 << ", last one" << dendl
;
5257 case LOCK_AC_REQSCATTER
:
5258 if (lock
->is_stable()) {
5259 /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
5260 * because the replica should be holding an auth_pin if they're
5261 * doing this (and thus, we are freezing, not frozen, and indefinite
5262 * starvation isn't an issue).
5264 dout(7) << "handle_file_lock got scatter request on " << *lock
5265 << " on " << *lock
->get_parent() << dendl
;
5266 if (lock
->get_state() != LOCK_MIX
) // i.e., the reqscatter didn't race with an actual mix/scatter
5269 dout(7) << "handle_file_lock got scatter request, !stable, marking scatter_wanted on " << *lock
5270 << " on " << *lock
->get_parent() << dendl
;
5271 lock
->set_scatter_wanted();
5275 case LOCK_AC_REQUNSCATTER
:
5276 if (lock
->is_stable()) {
5277 /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
5278 * because the replica should be holding an auth_pin if they're
5279 * doing this (and thus, we are freezing, not frozen, and indefinite
5280 * starvation isn't an issue).
5282 dout(7) << "handle_file_lock got unscatter request on " << *lock
5283 << " on " << *lock
->get_parent() << dendl
;
5284 if (lock
->get_state() == LOCK_MIX
) // i.e., the reqscatter didn't race with an actual mix/scatter
5285 simple_lock(lock
); // FIXME tempsync?
5287 dout(7) << "handle_file_lock ignoring unscatter request on " << *lock
5288 << " on " << *lock
->get_parent() << dendl
;
5289 lock
->set_unscatter_wanted();
5293 case LOCK_AC_REQRDLOCK
:
5294 handle_reqrdlock(lock
, m
);
5298 if (!lock
->get_parent()->is_auth()) {
5299 dout(7) << "handle_file_lock IGNORING nudge on non-auth " << *lock
5300 << " on " << *lock
->get_parent() << dendl
;
5301 } else if (!lock
->get_parent()->is_replicated()) {
5302 dout(7) << "handle_file_lock IGNORING nudge on non-replicated " << *lock
5303 << " on " << *lock
->get_parent() << dendl
;
5305 dout(7) << "handle_file_lock trying nudge on " << *lock
5306 << " on " << *lock
->get_parent() << dendl
;
5307 scatter_nudge(lock
, 0, true);
5308 mds
->mdlog
->flush();