1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */
15 #include <boost/utility/string_view.hpp>
20 #include "MDBalancer.h"
25 #include "MDSContext.h"
30 #include "events/EUpdate.h"
31 #include "events/EOpen.h"
33 #include "msg/Messenger.h"
34 #include "osdc/Objecter.h"
36 #include "messages/MInodeFileCaps.h"
37 #include "messages/MLock.h"
38 #include "messages/MClientLease.h"
39 #include "messages/MClientReply.h"
40 #include "messages/MClientCaps.h"
41 #include "messages/MClientCapRelease.h"
43 #include "messages/MMDSSlaveRequest.h"
47 #include "common/config.h"
50 #define dout_subsys ceph_subsys_mds
52 #define dout_context g_ceph_context
53 #define dout_prefix _prefix(_dout, mds)
54 static ostream
& _prefix(std::ostream
*_dout
, MDSRank
*mds
) {
55 return *_dout
<< "mds." << mds
->get_nodeid() << ".locker ";
59 class LockerContext
: public MDSInternalContextBase
{
62 MDSRank
*get_mds() override
68 explicit LockerContext(Locker
*locker_
) : locker(locker_
) {
69 assert(locker
!= NULL
);
73 class LockerLogContext
: public MDSLogContextBase
{
76 MDSRank
*get_mds() override
82 explicit LockerLogContext(Locker
*locker_
) : locker(locker_
) {
83 assert(locker
!= NULL
);
87 /* This function DOES put the passed message before returning */
88 void Locker::dispatch(Message
*m
)
91 switch (m
->get_type()) {
95 handle_lock(static_cast<MLock
*>(m
));
98 case MSG_MDS_INODEFILECAPS
:
99 handle_inode_file_caps(static_cast<MInodeFileCaps
*>(m
));
103 case CEPH_MSG_CLIENT_CAPS
:
104 handle_client_caps(static_cast<MClientCaps
*>(m
));
107 case CEPH_MSG_CLIENT_CAPRELEASE
:
108 handle_client_cap_release(static_cast<MClientCapRelease
*>(m
));
110 case CEPH_MSG_CLIENT_LEASE
:
111 handle_client_lease(static_cast<MClientLease
*>(m
));
115 derr
<< "locker unknown message " << m
->get_type() << dendl
;
116 assert(0 == "locker unknown message");
133 void Locker::send_lock_message(SimpleLock
*lock
, int msg
)
135 for (const auto &it
: lock
->get_parent()->get_replicas()) {
136 if (mds
->is_cluster_degraded() &&
137 mds
->mdsmap
->get_state(it
.first
) < MDSMap::STATE_REJOIN
)
139 MLock
*m
= new MLock(lock
, msg
, mds
->get_nodeid());
140 mds
->send_message_mds(m
, it
.first
);
144 void Locker::send_lock_message(SimpleLock
*lock
, int msg
, const bufferlist
&data
)
146 for (const auto &it
: lock
->get_parent()->get_replicas()) {
147 if (mds
->is_cluster_degraded() &&
148 mds
->mdsmap
->get_state(it
.first
) < MDSMap::STATE_REJOIN
)
150 MLock
*m
= new MLock(lock
, msg
, mds
->get_nodeid());
152 mds
->send_message_mds(m
, it
.first
);
159 void Locker::include_snap_rdlocks(set
<SimpleLock
*>& rdlocks
, CInode
*in
)
161 // rdlock ancestor snaps
163 rdlocks
.insert(&in
->snaplock
);
164 while (t
->get_projected_parent_dn()) {
165 t
= t
->get_projected_parent_dn()->get_dir()->get_inode();
166 rdlocks
.insert(&t
->snaplock
);
170 void Locker::include_snap_rdlocks_wlayout(set
<SimpleLock
*>& rdlocks
, CInode
*in
,
171 file_layout_t
**layout
)
173 //rdlock ancestor snaps
175 rdlocks
.insert(&in
->snaplock
);
176 rdlocks
.insert(&in
->policylock
);
177 bool found_layout
= false;
179 rdlocks
.insert(&t
->snaplock
);
181 rdlocks
.insert(&t
->policylock
);
182 if (t
->get_projected_inode()->has_layout()) {
183 *layout
= &t
->get_projected_inode()->layout
;
187 if (t
->get_projected_parent_dn() &&
188 t
->get_projected_parent_dn()->get_dir())
189 t
= t
->get_projected_parent_dn()->get_dir()->get_inode();
194 struct MarkEventOnDestruct
{
198 MarkEventOnDestruct(MDRequestRef
& _mdr
,
199 const char *_message
) : mdr(_mdr
),
202 ~MarkEventOnDestruct() {
204 mdr
->mark_event(message
);
208 /* If this function returns false, the mdr has been placed
209 * on the appropriate wait list */
210 bool Locker::acquire_locks(MDRequestRef
& mdr
,
211 set
<SimpleLock
*> &rdlocks
,
212 set
<SimpleLock
*> &wrlocks
,
213 set
<SimpleLock
*> &xlocks
,
214 map
<SimpleLock
*,mds_rank_t
> *remote_wrlocks
,
215 CInode
*auth_pin_freeze
,
216 bool auth_pin_nonblock
)
218 if (mdr
->done_locking
&&
219 !mdr
->is_slave()) { // not on slaves! master requests locks piecemeal.
220 dout(10) << "acquire_locks " << *mdr
<< " - done locking" << dendl
;
221 return true; // at least we had better be!
223 dout(10) << "acquire_locks " << *mdr
<< dendl
;
225 MarkEventOnDestruct
marker(mdr
, "failed to acquire_locks");
227 client_t client
= mdr
->get_client();
229 set
<SimpleLock
*, SimpleLock::ptr_lt
> sorted
; // sort everything we will lock
230 set
<MDSCacheObject
*> mustpin
; // items to authpin
233 for (set
<SimpleLock
*>::iterator p
= xlocks
.begin(); p
!= xlocks
.end(); ++p
) {
234 SimpleLock
*lock
= *p
;
236 if ((lock
->get_type() == CEPH_LOCK_ISNAP
||
237 lock
->get_type() == CEPH_LOCK_IPOLICY
) &&
238 mds
->is_cluster_degraded() &&
240 !mdr
->is_queued_for_replay()) {
241 // waiting for recovering mds, to guarantee replayed requests and mksnap/setlayout
242 // get processed in proper order.
244 if (lock
->get_parent()->is_auth()) {
245 if (!mdr
->locks
.count(lock
)) {
247 lock
->get_parent()->list_replicas(ls
);
249 if (mds
->mdsmap
->get_state(m
) < MDSMap::STATE_ACTIVE
) {
256 // if the lock is the latest locked one, it's possible that slave mds got the lock
257 // while there are recovering mds.
258 if (!mdr
->locks
.count(lock
) || lock
== *mdr
->locks
.rbegin())
262 dout(10) << " must xlock " << *lock
<< " " << *lock
->get_parent()
263 << ", waiting for cluster recovered" << dendl
;
264 mds
->locker
->drop_locks(mdr
.get(), NULL
);
265 mdr
->drop_local_auth_pins();
266 mds
->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache
, mdr
));
271 dout(20) << " must xlock " << *lock
<< " " << *lock
->get_parent() << dendl
;
274 mustpin
.insert(lock
->get_parent());
276 // augment xlock with a versionlock?
277 if ((*p
)->get_type() == CEPH_LOCK_DN
) {
278 CDentry
*dn
= (CDentry
*)lock
->get_parent();
282 if (xlocks
.count(&dn
->versionlock
))
283 continue; // we're xlocking the versionlock too; don't wrlock it!
285 if (mdr
->is_master()) {
286 // master. wrlock versionlock so we can pipeline dentry updates to journal.
287 wrlocks
.insert(&dn
->versionlock
);
289 // slave. exclusively lock the dentry version (i.e. block other journal updates).
290 // this makes rollback safe.
291 xlocks
.insert(&dn
->versionlock
);
292 sorted
.insert(&dn
->versionlock
);
295 if (lock
->get_type() > CEPH_LOCK_IVERSION
) {
296 // inode version lock?
297 CInode
*in
= (CInode
*)lock
->get_parent();
300 if (mdr
->is_master()) {
301 // master. wrlock versionlock so we can pipeline inode updates to journal.
302 wrlocks
.insert(&in
->versionlock
);
304 // slave. exclusively lock the inode version (i.e. block other journal updates).
305 // this makes rollback safe.
306 xlocks
.insert(&in
->versionlock
);
307 sorted
.insert(&in
->versionlock
);
313 for (set
<SimpleLock
*>::iterator p
= wrlocks
.begin(); p
!= wrlocks
.end(); ++p
) {
314 MDSCacheObject
*object
= (*p
)->get_parent();
315 dout(20) << " must wrlock " << **p
<< " " << *object
<< dendl
;
317 if (object
->is_auth())
318 mustpin
.insert(object
);
319 else if (!object
->is_auth() &&
320 !(*p
)->can_wrlock(client
) && // we might have to request a scatter
321 !mdr
->is_slave()) { // if we are slave (remote_wrlock), the master already authpinned
322 dout(15) << " will also auth_pin " << *object
323 << " in case we need to request a scatter" << dendl
;
324 mustpin
.insert(object
);
329 if (remote_wrlocks
) {
330 for (map
<SimpleLock
*,mds_rank_t
>::iterator p
= remote_wrlocks
->begin(); p
!= remote_wrlocks
->end(); ++p
) {
331 MDSCacheObject
*object
= p
->first
->get_parent();
332 dout(20) << " must remote_wrlock on mds." << p
->second
<< " "
333 << *p
->first
<< " " << *object
<< dendl
;
334 sorted
.insert(p
->first
);
335 mustpin
.insert(object
);
340 for (set
<SimpleLock
*>::iterator p
= rdlocks
.begin();
343 MDSCacheObject
*object
= (*p
)->get_parent();
344 dout(20) << " must rdlock " << **p
<< " " << *object
<< dendl
;
346 if (object
->is_auth())
347 mustpin
.insert(object
);
348 else if (!object
->is_auth() &&
349 !(*p
)->can_rdlock(client
)) { // we might have to request an rdlock
350 dout(15) << " will also auth_pin " << *object
351 << " in case we need to request a rdlock" << dendl
;
352 mustpin
.insert(object
);
358 map
<mds_rank_t
, set
<MDSCacheObject
*> > mustpin_remote
; // mds -> (object set)
360 // can i auth pin them all now?
361 marker
.message
= "failed to authpin local pins";
362 for (set
<MDSCacheObject
*>::iterator p
= mustpin
.begin();
365 MDSCacheObject
*object
= *p
;
367 dout(10) << " must authpin " << *object
<< dendl
;
369 if (mdr
->is_auth_pinned(object
)) {
370 if (object
!= (MDSCacheObject
*)auth_pin_freeze
)
372 if (mdr
->more()->is_remote_frozen_authpin
) {
373 if (mdr
->more()->rename_inode
== auth_pin_freeze
)
375 // unfreeze auth pin for the wrong inode
376 mustpin_remote
[mdr
->more()->rename_inode
->authority().first
].size();
380 if (!object
->is_auth()) {
381 if (!mdr
->locks
.empty())
382 drop_locks(mdr
.get());
383 if (object
->is_ambiguous_auth()) {
385 dout(10) << " ambiguous auth, waiting to authpin " << *object
<< dendl
;
386 object
->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
, new C_MDS_RetryRequest(mdcache
, mdr
));
387 mdr
->drop_local_auth_pins();
390 mustpin_remote
[object
->authority().first
].insert(object
);
393 if (!object
->can_auth_pin()) {
395 drop_locks(mdr
.get());
396 mdr
->drop_local_auth_pins();
397 if (auth_pin_nonblock
) {
398 dout(10) << " can't auth_pin (freezing?) " << *object
<< ", nonblocking" << dendl
;
402 dout(10) << " can't auth_pin (freezing?), waiting to authpin " << *object
<< dendl
;
403 object
->add_waiter(MDSCacheObject::WAIT_UNFREEZE
, new C_MDS_RetryRequest(mdcache
, mdr
));
405 if (!mdr
->remote_auth_pins
.empty())
406 notify_freeze_waiter(object
);
412 // ok, grab local auth pins
413 for (set
<MDSCacheObject
*>::iterator p
= mustpin
.begin();
416 MDSCacheObject
*object
= *p
;
417 if (mdr
->is_auth_pinned(object
)) {
418 dout(10) << " already auth_pinned " << *object
<< dendl
;
419 } else if (object
->is_auth()) {
420 dout(10) << " auth_pinning " << *object
<< dendl
;
421 mdr
->auth_pin(object
);
425 // request remote auth_pins
426 if (!mustpin_remote
.empty()) {
427 marker
.message
= "requesting remote authpins";
428 for (map
<MDSCacheObject
*,mds_rank_t
>::iterator p
= mdr
->remote_auth_pins
.begin();
429 p
!= mdr
->remote_auth_pins
.end();
431 if (mustpin
.count(p
->first
)) {
432 assert(p
->second
== p
->first
->authority().first
);
433 map
<mds_rank_t
, set
<MDSCacheObject
*> >::iterator q
= mustpin_remote
.find(p
->second
);
434 if (q
!= mustpin_remote
.end())
435 q
->second
.insert(p
->first
);
438 for (map
<mds_rank_t
, set
<MDSCacheObject
*> >::iterator p
= mustpin_remote
.begin();
439 p
!= mustpin_remote
.end();
441 dout(10) << "requesting remote auth_pins from mds." << p
->first
<< dendl
;
443 // wait for active auth
444 if (mds
->is_cluster_degraded() &&
445 !mds
->mdsmap
->is_clientreplay_or_active_or_stopping(p
->first
)) {
446 dout(10) << " mds." << p
->first
<< " is not active" << dendl
;
447 if (mdr
->more()->waiting_on_slave
.empty())
448 mds
->wait_for_active_peer(p
->first
, new C_MDS_RetryRequest(mdcache
, mdr
));
452 MMDSSlaveRequest
*req
= new MMDSSlaveRequest(mdr
->reqid
, mdr
->attempt
,
453 MMDSSlaveRequest::OP_AUTHPIN
);
454 for (set
<MDSCacheObject
*>::iterator q
= p
->second
.begin();
455 q
!= p
->second
.end();
457 dout(10) << " req remote auth_pin of " << **q
<< dendl
;
458 MDSCacheObjectInfo info
;
459 (*q
)->set_object_info(info
);
460 req
->get_authpins().push_back(info
);
461 if (*q
== auth_pin_freeze
)
462 (*q
)->set_object_info(req
->get_authpin_freeze());
465 if (auth_pin_nonblock
)
466 req
->mark_nonblock();
467 mds
->send_message_mds(req
, p
->first
);
469 // put in waiting list
470 assert(mdr
->more()->waiting_on_slave
.count(p
->first
) == 0);
471 mdr
->more()->waiting_on_slave
.insert(p
->first
);
476 // caps i'll need to issue
477 set
<CInode
*> issue_set
;
481 // make sure they match currently acquired locks.
482 set
<SimpleLock
*, SimpleLock::ptr_lt
>::iterator existing
= mdr
->locks
.begin();
483 for (set
<SimpleLock
*, SimpleLock::ptr_lt
>::iterator p
= sorted
.begin();
486 bool need_wrlock
= !!wrlocks
.count(*p
);
487 bool need_remote_wrlock
= !!(remote_wrlocks
&& remote_wrlocks
->count(*p
));
490 if (existing
!= mdr
->locks
.end() && *existing
== *p
) {
492 SimpleLock
*have
= *existing
;
494 if (xlocks
.count(have
) && mdr
->xlocks
.count(have
)) {
495 dout(10) << " already xlocked " << *have
<< " " << *have
->get_parent() << dendl
;
498 if (mdr
->remote_wrlocks
.count(have
)) {
499 if (!need_remote_wrlock
||
500 mdr
->remote_wrlocks
[have
] != (*remote_wrlocks
)[have
]) {
501 dout(10) << " unlocking remote_wrlock on wrong mds." << mdr
->remote_wrlocks
[have
]
502 << " " << *have
<< " " << *have
->get_parent() << dendl
;
503 remote_wrlock_finish(have
, mdr
->remote_wrlocks
[have
], mdr
.get());
506 if (need_wrlock
|| need_remote_wrlock
) {
507 if (need_wrlock
== !!mdr
->wrlocks
.count(have
) &&
508 need_remote_wrlock
== !!mdr
->remote_wrlocks
.count(have
)) {
510 dout(10) << " already wrlocked " << *have
<< " " << *have
->get_parent() << dendl
;
511 if (need_remote_wrlock
)
512 dout(10) << " already remote_wrlocked " << *have
<< " " << *have
->get_parent() << dendl
;
516 if (rdlocks
.count(have
) && mdr
->rdlocks
.count(have
)) {
517 dout(10) << " already rdlocked " << *have
<< " " << *have
->get_parent() << dendl
;
522 // hose any stray locks
523 if (existing
!= mdr
->locks
.end() && *existing
== *p
) {
524 assert(need_wrlock
|| need_remote_wrlock
);
525 SimpleLock
*lock
= *existing
;
526 if (mdr
->wrlocks
.count(lock
)) {
528 dout(10) << " unlocking extra " << *lock
<< " " << *lock
->get_parent() << dendl
;
529 else if (need_remote_wrlock
) // acquire remote_wrlock first
530 dout(10) << " unlocking out-of-order " << *lock
<< " " << *lock
->get_parent() << dendl
;
531 bool need_issue
= false;
532 wrlock_finish(lock
, mdr
.get(), &need_issue
);
534 issue_set
.insert(static_cast<CInode
*>(lock
->get_parent()));
538 while (existing
!= mdr
->locks
.end()) {
539 SimpleLock
*stray
= *existing
;
541 dout(10) << " unlocking out-of-order " << *stray
<< " " << *stray
->get_parent() << dendl
;
542 bool need_issue
= false;
543 if (mdr
->xlocks
.count(stray
)) {
544 xlock_finish(stray
, mdr
.get(), &need_issue
);
545 } else if (mdr
->rdlocks
.count(stray
)) {
546 rdlock_finish(stray
, mdr
.get(), &need_issue
);
548 // may have acquired both wrlock and remore wrlock
549 if (mdr
->wrlocks
.count(stray
))
550 wrlock_finish(stray
, mdr
.get(), &need_issue
);
551 if (mdr
->remote_wrlocks
.count(stray
))
552 remote_wrlock_finish(stray
, mdr
->remote_wrlocks
[stray
], mdr
.get());
555 issue_set
.insert(static_cast<CInode
*>(stray
->get_parent()));
559 if (mdr
->locking
&& *p
!= mdr
->locking
) {
560 cancel_locking(mdr
.get(), &issue_set
);
562 if (xlocks
.count(*p
)) {
563 marker
.message
= "failed to xlock, waiting";
564 if (!xlock_start(*p
, mdr
))
566 dout(10) << " got xlock on " << **p
<< " " << *(*p
)->get_parent() << dendl
;
567 } else if (need_wrlock
|| need_remote_wrlock
) {
568 if (need_remote_wrlock
&& !mdr
->remote_wrlocks
.count(*p
)) {
569 marker
.message
= "waiting for remote wrlocks";
570 remote_wrlock_start(*p
, (*remote_wrlocks
)[*p
], mdr
);
573 if (need_wrlock
&& !mdr
->wrlocks
.count(*p
)) {
574 marker
.message
= "failed to wrlock, waiting";
575 if (need_remote_wrlock
&& !(*p
)->can_wrlock(mdr
->get_client())) {
576 marker
.message
= "failed to wrlock, dropping remote wrlock and waiting";
577 // can't take the wrlock because the scatter lock is gathering. need to
578 // release the remote wrlock, so that the gathering process can finish.
579 remote_wrlock_finish(*p
, mdr
->remote_wrlocks
[*p
], mdr
.get());
580 remote_wrlock_start(*p
, (*remote_wrlocks
)[*p
], mdr
);
583 // nowait if we have already gotten remote wrlock
584 if (!wrlock_start(*p
, mdr
, need_remote_wrlock
))
586 dout(10) << " got wrlock on " << **p
<< " " << *(*p
)->get_parent() << dendl
;
589 assert(mdr
->is_master());
590 if ((*p
)->needs_recover()) {
591 if (mds
->is_cluster_degraded()) {
592 if (!mdr
->is_queued_for_replay()) {
593 // see comments in SimpleLock::set_state_rejoin() and
594 // ScatterLock::encode_state_for_rejoin()
595 drop_locks(mdr
.get());
596 mds
->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache
, mdr
));
597 dout(10) << " rejoin recovering " << **p
<< " " << *(*p
)->get_parent()
598 << ", waiting for cluster recovered" << dendl
;
599 marker
.message
= "rejoin recovering lock, waiting for cluster recovered";
603 (*p
)->clear_need_recover();
607 marker
.message
= "failed to rdlock, waiting";
608 if (!rdlock_start(*p
, mdr
))
610 dout(10) << " got rdlock on " << **p
<< " " << *(*p
)->get_parent() << dendl
;
614 // any extra unneeded locks?
615 while (existing
!= mdr
->locks
.end()) {
616 SimpleLock
*stray
= *existing
;
618 dout(10) << " unlocking extra " << *stray
<< " " << *stray
->get_parent() << dendl
;
619 bool need_issue
= false;
620 if (mdr
->xlocks
.count(stray
)) {
621 xlock_finish(stray
, mdr
.get(), &need_issue
);
622 } else if (mdr
->rdlocks
.count(stray
)) {
623 rdlock_finish(stray
, mdr
.get(), &need_issue
);
625 // may have acquired both wrlock and remore wrlock
626 if (mdr
->wrlocks
.count(stray
))
627 wrlock_finish(stray
, mdr
.get(), &need_issue
);
628 if (mdr
->remote_wrlocks
.count(stray
))
629 remote_wrlock_finish(stray
, mdr
->remote_wrlocks
[stray
], mdr
.get());
632 issue_set
.insert(static_cast<CInode
*>(stray
->get_parent()));
635 mdr
->done_locking
= true;
636 mdr
->set_mds_stamp(ceph_clock_now());
638 marker
.message
= "acquired locks";
641 issue_caps_set(issue_set
);
645 void Locker::notify_freeze_waiter(MDSCacheObject
*o
)
648 if (CInode
*in
= dynamic_cast<CInode
*>(o
)) {
650 dir
= in
->get_parent_dir();
651 } else if (CDentry
*dn
= dynamic_cast<CDentry
*>(o
)) {
654 dir
= dynamic_cast<CDir
*>(o
);
658 if (dir
->is_freezing_dir())
659 mdcache
->fragment_freeze_inc_num_waiters(dir
);
660 if (dir
->is_freezing_tree()) {
661 while (!dir
->is_freezing_tree_root())
662 dir
= dir
->get_parent_dir();
663 mdcache
->migrator
->export_freeze_inc_num_waiters(dir
);
668 void Locker::set_xlocks_done(MutationImpl
*mut
, bool skip_dentry
)
670 for (set
<SimpleLock
*>::iterator p
= mut
->xlocks
.begin();
671 p
!= mut
->xlocks
.end();
673 MDSCacheObject
*object
= (*p
)->get_parent();
674 assert(object
->is_auth());
676 ((*p
)->get_type() == CEPH_LOCK_DN
|| (*p
)->get_type() == CEPH_LOCK_DVERSION
))
678 dout(10) << "set_xlocks_done on " << **p
<< " " << *object
<< dendl
;
679 (*p
)->set_xlock_done();
683 void Locker::_drop_rdlocks(MutationImpl
*mut
, set
<CInode
*> *pneed_issue
)
685 while (!mut
->rdlocks
.empty()) {
687 MDSCacheObject
*p
= (*mut
->rdlocks
.begin())->get_parent();
688 rdlock_finish(*mut
->rdlocks
.begin(), mut
, &ni
);
690 pneed_issue
->insert(static_cast<CInode
*>(p
));
694 void Locker::_drop_non_rdlocks(MutationImpl
*mut
, set
<CInode
*> *pneed_issue
)
696 set
<mds_rank_t
> slaves
;
698 while (!mut
->xlocks
.empty()) {
699 SimpleLock
*lock
= *mut
->xlocks
.begin();
700 MDSCacheObject
*p
= lock
->get_parent();
702 assert(lock
->get_sm()->can_remote_xlock
);
703 slaves
.insert(p
->authority().first
);
705 mut
->locks
.erase(lock
);
706 mut
->xlocks
.erase(lock
);
710 xlock_finish(lock
, mut
, &ni
);
712 pneed_issue
->insert(static_cast<CInode
*>(p
));
715 while (!mut
->remote_wrlocks
.empty()) {
716 map
<SimpleLock
*,mds_rank_t
>::iterator p
= mut
->remote_wrlocks
.begin();
717 slaves
.insert(p
->second
);
718 if (mut
->wrlocks
.count(p
->first
) == 0)
719 mut
->locks
.erase(p
->first
);
720 mut
->remote_wrlocks
.erase(p
);
723 while (!mut
->wrlocks
.empty()) {
725 MDSCacheObject
*p
= (*mut
->wrlocks
.begin())->get_parent();
726 wrlock_finish(*mut
->wrlocks
.begin(), mut
, &ni
);
728 pneed_issue
->insert(static_cast<CInode
*>(p
));
731 for (set
<mds_rank_t
>::iterator p
= slaves
.begin(); p
!= slaves
.end(); ++p
) {
732 if (!mds
->is_cluster_degraded() ||
733 mds
->mdsmap
->get_state(*p
) >= MDSMap::STATE_REJOIN
) {
734 dout(10) << "_drop_non_rdlocks dropping remote locks on mds." << *p
<< dendl
;
735 MMDSSlaveRequest
*slavereq
= new MMDSSlaveRequest(mut
->reqid
, mut
->attempt
,
736 MMDSSlaveRequest::OP_DROPLOCKS
);
737 mds
->send_message_mds(slavereq
, *p
);
742 void Locker::cancel_locking(MutationImpl
*mut
, set
<CInode
*> *pneed_issue
)
744 SimpleLock
*lock
= mut
->locking
;
746 dout(10) << "cancel_locking " << *lock
<< " on " << *mut
<< dendl
;
748 if (lock
->get_parent()->is_auth()) {
749 bool need_issue
= false;
750 if (lock
->get_state() == LOCK_PREXLOCK
) {
751 _finish_xlock(lock
, -1, &need_issue
);
752 } else if (lock
->get_state() == LOCK_LOCK_XLOCK
&&
753 lock
->get_num_xlocks() == 0) {
754 lock
->set_state(LOCK_XLOCKDONE
);
755 eval_gather(lock
, true, &need_issue
);
758 pneed_issue
->insert(static_cast<CInode
*>(lock
->get_parent()));
760 mut
->finish_locking(lock
);
763 void Locker::drop_locks(MutationImpl
*mut
, set
<CInode
*> *pneed_issue
)
766 set
<CInode
*> my_need_issue
;
768 pneed_issue
= &my_need_issue
;
771 cancel_locking(mut
, pneed_issue
);
772 _drop_non_rdlocks(mut
, pneed_issue
);
773 _drop_rdlocks(mut
, pneed_issue
);
775 if (pneed_issue
== &my_need_issue
)
776 issue_caps_set(*pneed_issue
);
777 mut
->done_locking
= false;
780 void Locker::drop_non_rdlocks(MutationImpl
*mut
, set
<CInode
*> *pneed_issue
)
782 set
<CInode
*> my_need_issue
;
784 pneed_issue
= &my_need_issue
;
786 _drop_non_rdlocks(mut
, pneed_issue
);
788 if (pneed_issue
== &my_need_issue
)
789 issue_caps_set(*pneed_issue
);
792 void Locker::drop_rdlocks_for_early_reply(MutationImpl
*mut
)
794 set
<CInode
*> need_issue
;
796 for (auto p
= mut
->rdlocks
.begin(); p
!= mut
->rdlocks
.end(); ) {
797 SimpleLock
*lock
= *p
;
799 // make later mksnap/setlayout (at other mds) wait for this unsafe request
800 if (lock
->get_type() == CEPH_LOCK_ISNAP
||
801 lock
->get_type() == CEPH_LOCK_IPOLICY
)
804 rdlock_finish(lock
, mut
, &ni
);
806 need_issue
.insert(static_cast<CInode
*>(lock
->get_parent()));
809 issue_caps_set(need_issue
);
814 void Locker::eval_gather(SimpleLock
*lock
, bool first
, bool *pneed_issue
, list
<MDSInternalContextBase
*> *pfinishers
)
816 dout(10) << "eval_gather " << *lock
<< " on " << *lock
->get_parent() << dendl
;
817 assert(!lock
->is_stable());
819 int next
= lock
->get_next_state();
822 bool caps
= lock
->get_cap_shift();
823 if (lock
->get_type() != CEPH_LOCK_DN
)
824 in
= static_cast<CInode
*>(lock
->get_parent());
826 bool need_issue
= false;
828 int loner_issued
= 0, other_issued
= 0, xlocker_issued
= 0;
829 assert(!caps
|| in
!= NULL
);
830 if (caps
&& in
->is_head()) {
831 in
->get_caps_issued(&loner_issued
, &other_issued
, &xlocker_issued
,
832 lock
->get_cap_shift(), lock
->get_cap_mask());
833 dout(10) << " next state is " << lock
->get_state_name(next
)
834 << " issued/allows loner " << gcap_string(loner_issued
)
835 << "/" << gcap_string(lock
->gcaps_allowed(CAP_LONER
, next
))
836 << " xlocker " << gcap_string(xlocker_issued
)
837 << "/" << gcap_string(lock
->gcaps_allowed(CAP_XLOCKER
, next
))
838 << " other " << gcap_string(other_issued
)
839 << "/" << gcap_string(lock
->gcaps_allowed(CAP_ANY
, next
))
842 if (first
&& ((~lock
->gcaps_allowed(CAP_ANY
, next
) & other_issued
) ||
843 (~lock
->gcaps_allowed(CAP_LONER
, next
) & loner_issued
) ||
844 (~lock
->gcaps_allowed(CAP_XLOCKER
, next
) & xlocker_issued
)))
848 #define IS_TRUE_AND_LT_AUTH(x, auth) (x && ((auth && x <= AUTH) || (!auth && x < AUTH)))
849 bool auth
= lock
->get_parent()->is_auth();
850 if (!lock
->is_gathering() &&
851 (IS_TRUE_AND_LT_AUTH(lock
->get_sm()->states
[next
].can_rdlock
, auth
) || !lock
->is_rdlocked()) &&
852 (IS_TRUE_AND_LT_AUTH(lock
->get_sm()->states
[next
].can_wrlock
, auth
) || !lock
->is_wrlocked()) &&
853 (IS_TRUE_AND_LT_AUTH(lock
->get_sm()->states
[next
].can_xlock
, auth
) || !lock
->is_xlocked()) &&
854 (IS_TRUE_AND_LT_AUTH(lock
->get_sm()->states
[next
].can_lease
, auth
) || !lock
->is_leased()) &&
855 !(lock
->get_parent()->is_auth() && lock
->is_flushing()) && // i.e. wait for scatter_writebehind!
856 (!caps
|| ((~lock
->gcaps_allowed(CAP_ANY
, next
) & other_issued
) == 0 &&
857 (~lock
->gcaps_allowed(CAP_LONER
, next
) & loner_issued
) == 0 &&
858 (~lock
->gcaps_allowed(CAP_XLOCKER
, next
) & xlocker_issued
) == 0)) &&
859 lock
->get_state() != LOCK_SYNC_MIX2
&& // these states need an explicit trigger from the auth mds
860 lock
->get_state() != LOCK_MIX_SYNC2
862 dout(7) << "eval_gather finished gather on " << *lock
863 << " on " << *lock
->get_parent() << dendl
;
865 if (lock
->get_sm() == &sm_filelock
) {
867 if (in
->state_test(CInode::STATE_RECOVERING
)) {
868 dout(7) << "eval_gather finished gather, but still recovering" << dendl
;
870 } else if (in
->state_test(CInode::STATE_NEEDSRECOVER
)) {
871 dout(7) << "eval_gather finished gather, but need to recover" << dendl
;
872 mds
->mdcache
->queue_file_recover(in
);
873 mds
->mdcache
->do_file_recover();
878 if (!lock
->get_parent()->is_auth()) {
879 // replica: tell auth
880 mds_rank_t auth
= lock
->get_parent()->authority().first
;
882 if (lock
->get_parent()->is_rejoining() &&
883 mds
->mdsmap
->get_state(auth
) == MDSMap::STATE_REJOIN
) {
884 dout(7) << "eval_gather finished gather, but still rejoining "
885 << *lock
->get_parent() << dendl
;
889 if (!mds
->is_cluster_degraded() ||
890 mds
->mdsmap
->get_state(auth
) >= MDSMap::STATE_REJOIN
) {
891 switch (lock
->get_state()) {
893 mds
->send_message_mds(new MLock(lock
, LOCK_AC_LOCKACK
, mds
->get_nodeid()),
899 MLock
*reply
= new MLock(lock
, LOCK_AC_SYNCACK
, mds
->get_nodeid());
900 lock
->encode_locked_state(reply
->get_data());
901 mds
->send_message_mds(reply
, auth
);
902 next
= LOCK_MIX_SYNC2
;
903 (static_cast<ScatterLock
*>(lock
))->start_flush();
908 (static_cast<ScatterLock
*>(lock
))->finish_flush();
909 (static_cast<ScatterLock
*>(lock
))->clear_flushed();
912 // do nothing, we already acked
917 MLock
*reply
= new MLock(lock
, LOCK_AC_MIXACK
, mds
->get_nodeid());
918 mds
->send_message_mds(reply
, auth
);
919 next
= LOCK_SYNC_MIX2
;
926 lock
->encode_locked_state(data
);
927 mds
->send_message_mds(new MLock(lock
, LOCK_AC_LOCKACK
, mds
->get_nodeid(), data
), auth
);
928 (static_cast<ScatterLock
*>(lock
))->start_flush();
929 // we'll get an AC_LOCKFLUSHED to complete
940 // once the first (local) stage of mix->lock gather complete we can
941 // gather from replicas
942 if (lock
->get_state() == LOCK_MIX_LOCK
&&
943 lock
->get_parent()->is_replicated()) {
944 dout(10) << " finished (local) gather for mix->lock, now gathering from replicas" << dendl
;
945 send_lock_message(lock
, LOCK_AC_LOCK
);
947 lock
->set_state(LOCK_MIX_LOCK2
);
951 if (lock
->is_dirty() && !lock
->is_flushed()) {
952 scatter_writebehind(static_cast<ScatterLock
*>(lock
));
956 lock
->clear_flushed();
958 switch (lock
->get_state()) {
964 in
->start_scatter(static_cast<ScatterLock
*>(lock
));
965 if (lock
->get_parent()->is_replicated()) {
967 lock
->encode_locked_state(softdata
);
968 send_lock_message(lock
, LOCK_AC_MIX
, softdata
);
970 (static_cast<ScatterLock
*>(lock
))->clear_scatter_wanted();
975 if (next
!= LOCK_SYNC
)
984 if (lock
->get_parent()->is_replicated()) {
986 lock
->encode_locked_state(softdata
);
987 send_lock_message(lock
, LOCK_AC_SYNC
, softdata
);
994 lock
->set_state(next
);
996 if (lock
->get_parent()->is_auth() &&
998 lock
->get_parent()->auth_unpin(lock
);
1000 // drop loner before doing waiters
1004 in
->get_wanted_loner() != in
->get_loner()) {
1005 dout(10) << " trying to drop loner" << dendl
;
1006 if (in
->try_drop_loner()) {
1007 dout(10) << " dropped loner" << dendl
;
1013 lock
->take_waiting(SimpleLock::WAIT_STABLE
|SimpleLock::WAIT_WR
|SimpleLock::WAIT_RD
|SimpleLock::WAIT_XLOCK
,
1016 lock
->finish_waiters(SimpleLock::WAIT_STABLE
|SimpleLock::WAIT_WR
|SimpleLock::WAIT_RD
|SimpleLock::WAIT_XLOCK
);
1018 if (caps
&& in
->is_head())
1021 if (lock
->get_parent()->is_auth() &&
1023 try_eval(lock
, &need_issue
);
1028 *pneed_issue
= true;
1029 else if (in
->is_head())
1035 bool Locker::eval(CInode
*in
, int mask
, bool caps_imported
)
1037 bool need_issue
= caps_imported
;
1038 list
<MDSInternalContextBase
*> finishers
;
1040 dout(10) << "eval " << mask
<< " " << *in
<< dendl
;
1043 if (in
->is_auth() && in
->is_head()) {
1044 client_t orig_loner
= in
->get_loner();
1045 if (in
->choose_ideal_loner()) {
1046 dout(10) << "eval set loner: client." << orig_loner
<< " -> client." << in
->get_loner() << dendl
;
1049 } else if (in
->get_wanted_loner() != in
->get_loner()) {
1050 dout(10) << "eval want loner: client." << in
->get_wanted_loner() << " but failed to set it" << dendl
;
1056 if (mask
& CEPH_LOCK_IFILE
)
1057 eval_any(&in
->filelock
, &need_issue
, &finishers
, caps_imported
);
1058 if (mask
& CEPH_LOCK_IAUTH
)
1059 eval_any(&in
->authlock
, &need_issue
, &finishers
, caps_imported
);
1060 if (mask
& CEPH_LOCK_ILINK
)
1061 eval_any(&in
->linklock
, &need_issue
, &finishers
, caps_imported
);
1062 if (mask
& CEPH_LOCK_IXATTR
)
1063 eval_any(&in
->xattrlock
, &need_issue
, &finishers
, caps_imported
);
1064 if (mask
& CEPH_LOCK_INEST
)
1065 eval_any(&in
->nestlock
, &need_issue
, &finishers
, caps_imported
);
1066 if (mask
& CEPH_LOCK_IFLOCK
)
1067 eval_any(&in
->flocklock
, &need_issue
, &finishers
, caps_imported
);
1068 if (mask
& CEPH_LOCK_IPOLICY
)
1069 eval_any(&in
->policylock
, &need_issue
, &finishers
, caps_imported
);
1072 if (in
->is_auth() && in
->is_head() && in
->get_wanted_loner() != in
->get_loner()) {
1073 if (in
->try_drop_loner()) {
1075 if (in
->get_wanted_loner() >= 0) {
1076 dout(10) << "eval end set loner to client." << in
->get_loner() << dendl
;
1077 bool ok
= in
->try_set_loner();
1085 finish_contexts(g_ceph_context
, finishers
);
1087 if (need_issue
&& in
->is_head())
1090 dout(10) << "eval done" << dendl
;
1094 class C_Locker_Eval
: public LockerContext
{
1098 C_Locker_Eval(Locker
*l
, MDSCacheObject
*pp
, int m
) : LockerContext(l
), p(pp
), mask(m
) {
1099 // We are used as an MDSCacheObject waiter, so should
1100 // only be invoked by someone already holding the big lock.
1101 assert(locker
->mds
->mds_lock
.is_locked_by_me());
1102 p
->get(MDSCacheObject::PIN_PTRWAITER
);
1104 void finish(int r
) override
{
1105 locker
->try_eval(p
, mask
);
1106 p
->put(MDSCacheObject::PIN_PTRWAITER
);
1110 void Locker::try_eval(MDSCacheObject
*p
, int mask
)
1112 // unstable and ambiguous auth?
1113 if (p
->is_ambiguous_auth()) {
1114 dout(7) << "try_eval ambiguous auth, waiting on " << *p
<< dendl
;
1115 p
->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
, new C_Locker_Eval(this, p
, mask
));
1119 if (p
->is_auth() && p
->is_frozen()) {
1120 dout(7) << "try_eval frozen, waiting on " << *p
<< dendl
;
1121 p
->add_waiter(MDSCacheObject::WAIT_UNFREEZE
, new C_Locker_Eval(this, p
, mask
));
1125 if (mask
& CEPH_LOCK_DN
) {
1126 assert(mask
== CEPH_LOCK_DN
);
1127 bool need_issue
= false; // ignore this, no caps on dentries
1128 CDentry
*dn
= static_cast<CDentry
*>(p
);
1129 eval_any(&dn
->lock
, &need_issue
);
1131 CInode
*in
= static_cast<CInode
*>(p
);
1136 void Locker::try_eval(SimpleLock
*lock
, bool *pneed_issue
)
1138 MDSCacheObject
*p
= lock
->get_parent();
1140 // unstable and ambiguous auth?
1141 if (p
->is_ambiguous_auth()) {
1142 dout(7) << "try_eval " << *lock
<< " ambiguousauth, waiting on " << *p
<< dendl
;
1143 p
->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
, new C_Locker_Eval(this, p
, lock
->get_type()));
1147 if (!p
->is_auth()) {
1148 dout(7) << "try_eval " << *lock
<< " not auth for " << *p
<< dendl
;
1152 if (p
->is_frozen()) {
1153 dout(7) << "try_eval " << *lock
<< " frozen, waiting on " << *p
<< dendl
;
1154 p
->add_waiter(MDSCacheObject::WAIT_UNFREEZE
, new C_Locker_Eval(this, p
, lock
->get_type()));
1159 * We could have a situation like:
1161 * - mds A authpins item on mds B
1162 * - mds B starts to freeze tree containing item
1163 * - mds A tries wrlock_start on A, sends REQSCATTER to B
1164 * - mds B lock is unstable, sets scatter_wanted
1165 * - mds B lock stabilizes, calls try_eval.
1167 * We can defer while freezing without causing a deadlock. Honor
1168 * scatter_wanted flag here. This will never get deferred by the
1169 * checks above due to the auth_pin held by the master.
1171 if (lock
->is_scatterlock()) {
1172 ScatterLock
*slock
= static_cast<ScatterLock
*>(lock
);
1173 if (slock
->get_scatter_wanted() &&
1174 slock
->get_state() != LOCK_MIX
) {
1175 scatter_mix(slock
, pneed_issue
);
1176 if (!lock
->is_stable())
1178 } else if (slock
->get_unscatter_wanted() &&
1179 slock
->get_state() != LOCK_LOCK
) {
1180 simple_lock(slock
, pneed_issue
);
1181 if (!lock
->is_stable()) {
1187 if (lock
->get_type() != CEPH_LOCK_DN
&& p
->is_freezing()) {
1188 dout(7) << "try_eval " << *lock
<< " freezing, waiting on " << *p
<< dendl
;
1189 p
->add_waiter(MDSCacheObject::WAIT_UNFREEZE
, new C_Locker_Eval(this, p
, lock
->get_type()));
1193 eval(lock
, pneed_issue
);
1196 void Locker::eval_cap_gather(CInode
*in
, set
<CInode
*> *issue_set
)
1198 bool need_issue
= false;
1199 list
<MDSInternalContextBase
*> finishers
;
1202 if (!in
->filelock
.is_stable())
1203 eval_gather(&in
->filelock
, false, &need_issue
, &finishers
);
1204 if (!in
->authlock
.is_stable())
1205 eval_gather(&in
->authlock
, false, &need_issue
, &finishers
);
1206 if (!in
->linklock
.is_stable())
1207 eval_gather(&in
->linklock
, false, &need_issue
, &finishers
);
1208 if (!in
->xattrlock
.is_stable())
1209 eval_gather(&in
->xattrlock
, false, &need_issue
, &finishers
);
1211 if (need_issue
&& in
->is_head()) {
1213 issue_set
->insert(in
);
1218 finish_contexts(g_ceph_context
, finishers
);
1221 void Locker::eval_scatter_gathers(CInode
*in
)
1223 bool need_issue
= false;
1224 list
<MDSInternalContextBase
*> finishers
;
1226 dout(10) << "eval_scatter_gathers " << *in
<< dendl
;
1229 if (!in
->filelock
.is_stable())
1230 eval_gather(&in
->filelock
, false, &need_issue
, &finishers
);
1231 if (!in
->nestlock
.is_stable())
1232 eval_gather(&in
->nestlock
, false, &need_issue
, &finishers
);
1233 if (!in
->dirfragtreelock
.is_stable())
1234 eval_gather(&in
->dirfragtreelock
, false, &need_issue
, &finishers
);
1236 if (need_issue
&& in
->is_head())
1239 finish_contexts(g_ceph_context
, finishers
);
1242 void Locker::eval(SimpleLock
*lock
, bool *need_issue
)
1244 switch (lock
->get_type()) {
1245 case CEPH_LOCK_IFILE
:
1246 return file_eval(static_cast<ScatterLock
*>(lock
), need_issue
);
1247 case CEPH_LOCK_IDFT
:
1248 case CEPH_LOCK_INEST
:
1249 return scatter_eval(static_cast<ScatterLock
*>(lock
), need_issue
);
1251 return simple_eval(lock
, need_issue
);
1256 // ------------------
1259 bool Locker::_rdlock_kick(SimpleLock
*lock
, bool as_anon
)
1262 if (lock
->is_stable()) {
1263 if (lock
->get_parent()->is_auth()) {
1264 if (lock
->get_sm() == &sm_scatterlock
) {
1265 // not until tempsync is fully implemented
1266 //if (lock->get_parent()->is_replicated())
1267 //scatter_tempsync((ScatterLock*)lock);
1270 } else if (lock
->get_sm() == &sm_filelock
) {
1271 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
1272 if (lock
->get_state() == LOCK_EXCL
&&
1273 in
->get_target_loner() >= 0 &&
1274 !in
->is_dir() && !as_anon
) // as_anon => caller wants SYNC, not XSYN
1282 // request rdlock state change from auth
1283 mds_rank_t auth
= lock
->get_parent()->authority().first
;
1284 if (!mds
->is_cluster_degraded() ||
1285 mds
->mdsmap
->is_clientreplay_or_active_or_stopping(auth
)) {
1286 dout(10) << "requesting rdlock from auth on "
1287 << *lock
<< " on " << *lock
->get_parent() << dendl
;
1288 mds
->send_message_mds(new MLock(lock
, LOCK_AC_REQRDLOCK
, mds
->get_nodeid()), auth
);
1293 if (lock
->get_type() == CEPH_LOCK_IFILE
) {
1294 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
1295 if (in
->state_test(CInode::STATE_RECOVERING
)) {
1296 mds
->mdcache
->recovery_queue
.prioritize(in
);
1303 bool Locker::rdlock_try(SimpleLock
*lock
, client_t client
, MDSInternalContextBase
*con
)
1305 dout(7) << "rdlock_try on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1307 // can read? grab ref.
1308 if (lock
->can_rdlock(client
))
1311 _rdlock_kick(lock
, false);
1313 if (lock
->can_rdlock(client
))
1318 dout(7) << "rdlock_try waiting on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1319 lock
->add_waiter(SimpleLock::WAIT_STABLE
|SimpleLock::WAIT_RD
, con
);
1324 bool Locker::rdlock_start(SimpleLock
*lock
, MDRequestRef
& mut
, bool as_anon
)
1326 dout(7) << "rdlock_start on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1328 // client may be allowed to rdlock the same item it has xlocked.
1329 // UNLESS someone passes in as_anon, or we're reading snapped version here.
1330 if (mut
->snapid
!= CEPH_NOSNAP
)
1332 client_t client
= as_anon
? -1 : mut
->get_client();
1335 if (lock
->get_type() != CEPH_LOCK_DN
)
1336 in
= static_cast<CInode
*>(lock
->get_parent());
1339 if (!lock->get_parent()->is_auth() &&
1340 lock->fw_rdlock_to_auth()) {
1341 mdcache->request_forward(mut, lock->get_parent()->authority().first);
1347 // can read? grab ref.
1348 if (lock
->can_rdlock(client
)) {
1350 mut
->rdlocks
.insert(lock
);
1351 mut
->locks
.insert(lock
);
1355 // hmm, wait a second.
1356 if (in
&& !in
->is_head() && in
->is_auth() &&
1357 lock
->get_state() == LOCK_SNAP_SYNC
) {
1358 // okay, we actually need to kick the head's lock to get ourselves synced up.
1359 CInode
*head
= mdcache
->get_inode(in
->ino());
1361 SimpleLock
*hlock
= head
->get_lock(CEPH_LOCK_IFILE
);
1362 if (hlock
->get_state() == LOCK_SYNC
)
1363 hlock
= head
->get_lock(lock
->get_type());
1365 if (hlock
->get_state() != LOCK_SYNC
) {
1366 dout(10) << "rdlock_start trying head inode " << *head
<< dendl
;
1367 if (!rdlock_start(hlock
, mut
, true)) // ** as_anon, no rdlock on EXCL **
1369 // oh, check our lock again then
1373 if (!_rdlock_kick(lock
, as_anon
))
1379 if (lock
->get_parent()->is_auth() && lock
->is_stable())
1380 wait_on
= SimpleLock::WAIT_RD
;
1382 wait_on
= SimpleLock::WAIT_STABLE
; // REQRDLOCK is ignored if lock is unstable, so we need to retry.
1383 dout(7) << "rdlock_start waiting on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1384 lock
->add_waiter(wait_on
, new C_MDS_RetryRequest(mdcache
, mut
));
1389 void Locker::nudge_log(SimpleLock
*lock
)
1391 dout(10) << "nudge_log " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1392 if (lock
->get_parent()->is_auth() && lock
->is_unstable_and_locked()) // as with xlockdone, or cap flush
1393 mds
->mdlog
->flush();
1396 void Locker::rdlock_finish(SimpleLock
*lock
, MutationImpl
*mut
, bool *pneed_issue
)
1401 mut
->rdlocks
.erase(lock
);
1402 mut
->locks
.erase(lock
);
1405 dout(7) << "rdlock_finish on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1408 if (!lock
->is_rdlocked()) {
1409 if (!lock
->is_stable())
1410 eval_gather(lock
, false, pneed_issue
);
1411 else if (lock
->get_parent()->is_auth())
1412 try_eval(lock
, pneed_issue
);
1417 bool Locker::can_rdlock_set(set
<SimpleLock
*>& locks
)
1419 dout(10) << "can_rdlock_set " << locks
<< dendl
;
1420 for (set
<SimpleLock
*>::iterator p
= locks
.begin(); p
!= locks
.end(); ++p
)
1421 if (!(*p
)->can_rdlock(-1)) {
1422 dout(10) << "can_rdlock_set can't rdlock " << *p
<< " on " << *(*p
)->get_parent() << dendl
;
1428 bool Locker::rdlock_try_set(set
<SimpleLock
*>& locks
)
1430 dout(10) << "rdlock_try_set " << locks
<< dendl
;
1431 for (set
<SimpleLock
*>::iterator p
= locks
.begin(); p
!= locks
.end(); ++p
)
1432 if (!rdlock_try(*p
, -1, NULL
)) {
1433 dout(10) << "rdlock_try_set can't rdlock " << *p
<< " on " << *(*p
)->get_parent() << dendl
;
1439 void Locker::rdlock_take_set(set
<SimpleLock
*>& locks
, MutationRef
& mut
)
1441 dout(10) << "rdlock_take_set " << locks
<< dendl
;
1442 for (set
<SimpleLock
*>::iterator p
= locks
.begin(); p
!= locks
.end(); ++p
) {
1444 mut
->rdlocks
.insert(*p
);
1445 mut
->locks
.insert(*p
);
1449 // ------------------
1452 void Locker::wrlock_force(SimpleLock
*lock
, MutationRef
& mut
)
1454 if (lock
->get_type() == CEPH_LOCK_IVERSION
||
1455 lock
->get_type() == CEPH_LOCK_DVERSION
)
1456 return local_wrlock_grab(static_cast<LocalLock
*>(lock
), mut
);
1458 dout(7) << "wrlock_force on " << *lock
1459 << " on " << *lock
->get_parent() << dendl
;
1460 lock
->get_wrlock(true);
1461 mut
->wrlocks
.insert(lock
);
1462 mut
->locks
.insert(lock
);
1465 bool Locker::wrlock_start(SimpleLock
*lock
, MDRequestRef
& mut
, bool nowait
)
1467 if (lock
->get_type() == CEPH_LOCK_IVERSION
||
1468 lock
->get_type() == CEPH_LOCK_DVERSION
)
1469 return local_wrlock_start(static_cast<LocalLock
*>(lock
), mut
);
1471 dout(10) << "wrlock_start " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1473 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
1474 client_t client
= mut
->get_client();
1475 bool want_scatter
= !nowait
&& lock
->get_parent()->is_auth() &&
1476 (in
->has_subtree_or_exporting_dirfrag() ||
1477 static_cast<ScatterLock
*>(lock
)->get_scatter_wanted());
1481 if (lock
->can_wrlock(client
) &&
1482 (!want_scatter
|| lock
->get_state() == LOCK_MIX
)) {
1484 mut
->wrlocks
.insert(lock
);
1485 mut
->locks
.insert(lock
);
1489 if (lock
->get_type() == CEPH_LOCK_IFILE
&&
1490 in
->state_test(CInode::STATE_RECOVERING
)) {
1491 mds
->mdcache
->recovery_queue
.prioritize(in
);
1494 if (!lock
->is_stable())
1497 if (in
->is_auth()) {
1498 // don't do nested lock state change if we have dirty scatterdata and
1499 // may scatter_writebehind or start_scatter, because nowait==true implies
1500 // that the caller already has a log entry open!
1501 if (nowait
&& lock
->is_dirty())
1505 scatter_mix(static_cast<ScatterLock
*>(lock
));
1509 if (nowait
&& !lock
->can_wrlock(client
))
1514 // auth should be auth_pinned (see acquire_locks wrlock weird mustpin case).
1515 mds_rank_t auth
= lock
->get_parent()->authority().first
;
1516 if (!mds
->is_cluster_degraded() ||
1517 mds
->mdsmap
->is_clientreplay_or_active_or_stopping(auth
)) {
1518 dout(10) << "requesting scatter from auth on "
1519 << *lock
<< " on " << *lock
->get_parent() << dendl
;
1520 mds
->send_message_mds(new MLock(lock
, LOCK_AC_REQSCATTER
, mds
->get_nodeid()), auth
);
1527 dout(7) << "wrlock_start waiting on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1528 lock
->add_waiter(SimpleLock::WAIT_STABLE
, new C_MDS_RetryRequest(mdcache
, mut
));
1535 void Locker::wrlock_finish(SimpleLock
*lock
, MutationImpl
*mut
, bool *pneed_issue
)
1537 if (lock
->get_type() == CEPH_LOCK_IVERSION
||
1538 lock
->get_type() == CEPH_LOCK_DVERSION
)
1539 return local_wrlock_finish(static_cast<LocalLock
*>(lock
), mut
);
1541 dout(7) << "wrlock_finish on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1544 mut
->wrlocks
.erase(lock
);
1545 if (mut
->remote_wrlocks
.count(lock
) == 0)
1546 mut
->locks
.erase(lock
);
1549 if (!lock
->is_wrlocked()) {
1550 if (!lock
->is_stable())
1551 eval_gather(lock
, false, pneed_issue
);
1552 else if (lock
->get_parent()->is_auth())
1553 try_eval(lock
, pneed_issue
);
1560 void Locker::remote_wrlock_start(SimpleLock
*lock
, mds_rank_t target
, MDRequestRef
& mut
)
1562 dout(7) << "remote_wrlock_start mds." << target
<< " on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1564 // wait for active target
1565 if (mds
->is_cluster_degraded() &&
1566 !mds
->mdsmap
->is_clientreplay_or_active_or_stopping(target
)) {
1567 dout(7) << " mds." << target
<< " is not active" << dendl
;
1568 if (mut
->more()->waiting_on_slave
.empty())
1569 mds
->wait_for_active_peer(target
, new C_MDS_RetryRequest(mdcache
, mut
));
1573 // send lock request
1574 mut
->start_locking(lock
, target
);
1575 mut
->more()->slaves
.insert(target
);
1576 MMDSSlaveRequest
*r
= new MMDSSlaveRequest(mut
->reqid
, mut
->attempt
,
1577 MMDSSlaveRequest::OP_WRLOCK
);
1578 r
->set_lock_type(lock
->get_type());
1579 lock
->get_parent()->set_object_info(r
->get_object_info());
1580 mds
->send_message_mds(r
, target
);
1582 assert(mut
->more()->waiting_on_slave
.count(target
) == 0);
1583 mut
->more()->waiting_on_slave
.insert(target
);
1586 void Locker::remote_wrlock_finish(SimpleLock
*lock
, mds_rank_t target
,
1590 mut
->remote_wrlocks
.erase(lock
);
1591 if (mut
->wrlocks
.count(lock
) == 0)
1592 mut
->locks
.erase(lock
);
1594 dout(7) << "remote_wrlock_finish releasing remote wrlock on mds." << target
1595 << " " << *lock
->get_parent() << dendl
;
1596 if (!mds
->is_cluster_degraded() ||
1597 mds
->mdsmap
->get_state(target
) >= MDSMap::STATE_REJOIN
) {
1598 MMDSSlaveRequest
*slavereq
= new MMDSSlaveRequest(mut
->reqid
, mut
->attempt
,
1599 MMDSSlaveRequest::OP_UNWRLOCK
);
1600 slavereq
->set_lock_type(lock
->get_type());
1601 lock
->get_parent()->set_object_info(slavereq
->get_object_info());
1602 mds
->send_message_mds(slavereq
, target
);
1607 // ------------------
1610 bool Locker::xlock_start(SimpleLock
*lock
, MDRequestRef
& mut
)
1612 if (lock
->get_type() == CEPH_LOCK_IVERSION
||
1613 lock
->get_type() == CEPH_LOCK_DVERSION
)
1614 return local_xlock_start(static_cast<LocalLock
*>(lock
), mut
);
1616 dout(7) << "xlock_start on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1617 client_t client
= mut
->get_client();
1620 if (lock
->get_parent()->is_auth()) {
1623 if (lock
->can_xlock(client
)) {
1624 lock
->set_state(LOCK_XLOCK
);
1625 lock
->get_xlock(mut
, client
);
1626 mut
->xlocks
.insert(lock
);
1627 mut
->locks
.insert(lock
);
1628 mut
->finish_locking(lock
);
1632 if (lock
->get_type() == CEPH_LOCK_IFILE
) {
1633 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
1634 if (in
->state_test(CInode::STATE_RECOVERING
)) {
1635 mds
->mdcache
->recovery_queue
.prioritize(in
);
1639 if (!lock
->is_stable() && (lock
->get_state() != LOCK_XLOCKDONE
||
1640 lock
->get_xlock_by_client() != client
||
1641 lock
->is_waiter_for(SimpleLock::WAIT_STABLE
)))
1644 if (lock
->get_state() == LOCK_LOCK
|| lock
->get_state() == LOCK_XLOCKDONE
) {
1645 mut
->start_locking(lock
);
1652 lock
->add_waiter(SimpleLock::WAIT_WR
|SimpleLock::WAIT_STABLE
, new C_MDS_RetryRequest(mdcache
, mut
));
1657 assert(lock
->get_sm()->can_remote_xlock
);
1658 assert(!mut
->slave_request
);
1660 // wait for single auth
1661 if (lock
->get_parent()->is_ambiguous_auth()) {
1662 lock
->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
,
1663 new C_MDS_RetryRequest(mdcache
, mut
));
1667 // wait for active auth
1668 mds_rank_t auth
= lock
->get_parent()->authority().first
;
1669 if (mds
->is_cluster_degraded() &&
1670 !mds
->mdsmap
->is_clientreplay_or_active_or_stopping(auth
)) {
1671 dout(7) << " mds." << auth
<< " is not active" << dendl
;
1672 if (mut
->more()->waiting_on_slave
.empty())
1673 mds
->wait_for_active_peer(auth
, new C_MDS_RetryRequest(mdcache
, mut
));
1677 // send lock request
1678 mut
->more()->slaves
.insert(auth
);
1679 mut
->start_locking(lock
, auth
);
1680 MMDSSlaveRequest
*r
= new MMDSSlaveRequest(mut
->reqid
, mut
->attempt
,
1681 MMDSSlaveRequest::OP_XLOCK
);
1682 r
->set_lock_type(lock
->get_type());
1683 lock
->get_parent()->set_object_info(r
->get_object_info());
1684 mds
->send_message_mds(r
, auth
);
1686 assert(mut
->more()->waiting_on_slave
.count(auth
) == 0);
1687 mut
->more()->waiting_on_slave
.insert(auth
);
1693 void Locker::_finish_xlock(SimpleLock
*lock
, client_t xlocker
, bool *pneed_issue
)
1695 assert(!lock
->is_stable());
1696 if (lock
->get_num_rdlocks() == 0 &&
1697 lock
->get_num_wrlocks() == 0 &&
1698 !lock
->is_leased() &&
1699 lock
->get_state() != LOCK_XLOCKSNAP
&&
1700 lock
->get_type() != CEPH_LOCK_DN
) {
1701 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
1702 client_t loner
= in
->get_target_loner();
1703 if (loner
>= 0 && (xlocker
< 0 || xlocker
== loner
)) {
1704 lock
->set_state(LOCK_EXCL
);
1705 lock
->get_parent()->auth_unpin(lock
);
1706 lock
->finish_waiters(SimpleLock::WAIT_STABLE
|SimpleLock::WAIT_WR
|SimpleLock::WAIT_RD
);
1707 if (lock
->get_cap_shift())
1708 *pneed_issue
= true;
1709 if (lock
->get_parent()->is_auth() &&
1711 try_eval(lock
, pneed_issue
);
1715 // the xlocker may have CEPH_CAP_GSHARED, need to revoke it if next state is LOCK_LOCK
1716 eval_gather(lock
, lock
->get_state() != LOCK_XLOCKSNAP
, pneed_issue
);
1719 void Locker::xlock_finish(SimpleLock
*lock
, MutationImpl
*mut
, bool *pneed_issue
)
1721 if (lock
->get_type() == CEPH_LOCK_IVERSION
||
1722 lock
->get_type() == CEPH_LOCK_DVERSION
)
1723 return local_xlock_finish(static_cast<LocalLock
*>(lock
), mut
);
1725 dout(10) << "xlock_finish on " << *lock
<< " " << *lock
->get_parent() << dendl
;
1727 client_t xlocker
= lock
->get_xlock_by_client();
1732 mut
->xlocks
.erase(lock
);
1733 mut
->locks
.erase(lock
);
1735 bool do_issue
= false;
1738 if (!lock
->get_parent()->is_auth()) {
1739 assert(lock
->get_sm()->can_remote_xlock
);
1742 dout(7) << "xlock_finish releasing remote xlock on " << *lock
->get_parent() << dendl
;
1743 mds_rank_t auth
= lock
->get_parent()->authority().first
;
1744 if (!mds
->is_cluster_degraded() ||
1745 mds
->mdsmap
->get_state(auth
) >= MDSMap::STATE_REJOIN
) {
1746 MMDSSlaveRequest
*slavereq
= new MMDSSlaveRequest(mut
->reqid
, mut
->attempt
,
1747 MMDSSlaveRequest::OP_UNXLOCK
);
1748 slavereq
->set_lock_type(lock
->get_type());
1749 lock
->get_parent()->set_object_info(slavereq
->get_object_info());
1750 mds
->send_message_mds(slavereq
, auth
);
1753 lock
->finish_waiters(SimpleLock::WAIT_STABLE
|
1754 SimpleLock::WAIT_WR
|
1755 SimpleLock::WAIT_RD
, 0);
1757 if (lock
->get_num_xlocks() == 0) {
1758 if (lock
->get_state() == LOCK_LOCK_XLOCK
)
1759 lock
->set_state(LOCK_XLOCKDONE
);
1760 _finish_xlock(lock
, xlocker
, &do_issue
);
1765 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
1766 if (in
->is_head()) {
1768 *pneed_issue
= true;
1775 void Locker::xlock_export(SimpleLock
*lock
, MutationImpl
*mut
)
1777 dout(10) << "xlock_export on " << *lock
<< " " << *lock
->get_parent() << dendl
;
1780 mut
->xlocks
.erase(lock
);
1781 mut
->locks
.erase(lock
);
1783 MDSCacheObject
*p
= lock
->get_parent();
1784 assert(p
->state_test(CInode::STATE_AMBIGUOUSAUTH
)); // we are exporting this (inode)
1786 if (!lock
->is_stable())
1787 lock
->get_parent()->auth_unpin(lock
);
1789 lock
->set_state(LOCK_LOCK
);
1792 void Locker::xlock_import(SimpleLock
*lock
)
1794 dout(10) << "xlock_import on " << *lock
<< " " << *lock
->get_parent() << dendl
;
1795 lock
->get_parent()->auth_pin(lock
);
1800 // file i/o -----------------------------------------
1802 version_t
Locker::issue_file_data_version(CInode
*in
)
1804 dout(7) << "issue_file_data_version on " << *in
<< dendl
;
1805 return in
->inode
.file_data_version
;
1808 class C_Locker_FileUpdate_finish
: public LockerLogContext
{
1816 C_Locker_FileUpdate_finish(Locker
*l
, CInode
*i
, MutationRef
& m
,
1817 bool sm
=false, bool ni
=false, client_t c
=-1,
1818 MClientCaps
*ac
= 0)
1819 : LockerLogContext(l
), in(i
), mut(m
), share_max(sm
), need_issue(ni
),
1820 client(c
), ack(ac
) {
1821 in
->get(CInode::PIN_PTRWAITER
);
1823 void finish(int r
) override
{
1824 locker
->file_update_finish(in
, mut
, share_max
, need_issue
, client
, ack
);
1825 in
->put(CInode::PIN_PTRWAITER
);
1829 void Locker::file_update_finish(CInode
*in
, MutationRef
& mut
, bool share_max
, bool issue_client_cap
,
1830 client_t client
, MClientCaps
*ack
)
1832 dout(10) << "file_update_finish on " << *in
<< dendl
;
1833 in
->pop_and_dirty_projected_inode(mut
->ls
);
1838 Session
*session
= mds
->get_session(client
);
1840 // "oldest flush tid" > 0 means client uses unique TID for each flush
1841 if (ack
->get_oldest_flush_tid() > 0)
1842 session
->add_completed_flush(ack
->get_client_tid());
1843 mds
->send_message_client_counted(ack
, session
);
1845 dout(10) << " no session for client." << client
<< " " << *ack
<< dendl
;
1850 set
<CInode
*> need_issue
;
1851 drop_locks(mut
.get(), &need_issue
);
1853 if (!in
->is_head() && !in
->client_snap_caps
.empty()) {
1854 dout(10) << " client_snap_caps " << in
->client_snap_caps
<< dendl
;
1855 // check for snap writeback completion
1856 bool gather
= false;
1857 auto p
= in
->client_snap_caps
.begin();
1858 while (p
!= in
->client_snap_caps
.end()) {
1859 SimpleLock
*lock
= in
->get_lock(p
->first
);
1861 dout(10) << " completing client_snap_caps for " << ccap_string(p
->first
)
1862 << " lock " << *lock
<< " on " << *in
<< dendl
;
1865 p
->second
.erase(client
);
1866 if (p
->second
.empty()) {
1868 in
->client_snap_caps
.erase(p
++);
1873 if (in
->client_snap_caps
.empty())
1874 in
->item_open_file
.remove_myself();
1875 eval_cap_gather(in
, &need_issue
);
1878 if (issue_client_cap
&& need_issue
.count(in
) == 0) {
1879 Capability
*cap
= in
->get_client_cap(client
);
1880 if (cap
&& (cap
->wanted() & ~cap
->pending()))
1881 issue_caps(in
, cap
);
1884 if (share_max
&& in
->is_auth() &&
1885 (in
->filelock
.gcaps_allowed(CAP_LONER
) & (CEPH_CAP_GWR
|CEPH_CAP_GBUFFER
)))
1886 share_inode_max_size(in
);
1888 issue_caps_set(need_issue
);
1890 utime_t now
= ceph_clock_now();
1891 mds
->balancer
->hit_inode(now
, in
, META_POP_IWR
);
1893 // auth unpin after issuing caps
1897 Capability
* Locker::issue_new_caps(CInode
*in
,
1903 dout(7) << "issue_new_caps for mode " << mode
<< " on " << *in
<< dendl
;
1906 // if replay, try to reconnect cap, and otherwise do nothing.
1908 mds
->mdcache
->try_reconnect_cap(in
, session
);
1913 assert(session
->info
.inst
.name
.is_client());
1914 client_t my_client
= session
->info
.inst
.name
.num();
1915 int my_want
= ceph_caps_for_mode(mode
);
1917 // register a capability
1918 Capability
*cap
= in
->get_client_cap(my_client
);
1921 cap
= in
->add_client_cap(my_client
, session
, realm
);
1922 cap
->set_wanted(my_want
);
1924 cap
->inc_suppress(); // suppress file cap messages for new cap (we'll bundle with the open() reply)
1928 // make sure it wants sufficient caps
1929 if (my_want
& ~cap
->wanted()) {
1930 // augment wanted caps for this client
1931 cap
->set_wanted(cap
->wanted() | my_want
);
1935 if (in
->is_auth()) {
1936 // [auth] twiddle mode?
1937 eval(in
, CEPH_CAP_LOCKS
);
1939 if (_need_flush_mdlog(in
, my_want
))
1940 mds
->mdlog
->flush();
1943 // [replica] tell auth about any new caps wanted
1944 request_inode_file_caps(in
);
1947 // issue caps (pot. incl new one)
1948 //issue_caps(in); // note: _eval above may have done this already...
1950 // re-issue whatever we can
1951 //cap->issue(cap->pending());
1954 cap
->dec_suppress();
1960 void Locker::issue_caps_set(set
<CInode
*>& inset
)
1962 for (set
<CInode
*>::iterator p
= inset
.begin(); p
!= inset
.end(); ++p
)
1966 bool Locker::issue_caps(CInode
*in
, Capability
*only_cap
)
1968 // allowed caps are determined by the lock mode.
1969 int all_allowed
= in
->get_caps_allowed_by_type(CAP_ANY
);
1970 int loner_allowed
= in
->get_caps_allowed_by_type(CAP_LONER
);
1971 int xlocker_allowed
= in
->get_caps_allowed_by_type(CAP_XLOCKER
);
1973 client_t loner
= in
->get_loner();
1975 dout(7) << "issue_caps loner client." << loner
1976 << " allowed=" << ccap_string(loner_allowed
)
1977 << ", xlocker allowed=" << ccap_string(xlocker_allowed
)
1978 << ", others allowed=" << ccap_string(all_allowed
)
1979 << " on " << *in
<< dendl
;
1981 dout(7) << "issue_caps allowed=" << ccap_string(all_allowed
)
1982 << ", xlocker allowed=" << ccap_string(xlocker_allowed
)
1983 << " on " << *in
<< dendl
;
1986 assert(in
->is_head());
1988 // count conflicts with
1992 map
<client_t
, Capability
*>::iterator it
;
1994 it
= in
->client_caps
.find(only_cap
->get_client());
1996 it
= in
->client_caps
.begin();
1997 for (; it
!= in
->client_caps
.end(); ++it
) {
1998 Capability
*cap
= it
->second
;
1999 if (cap
->is_stale())
2002 // do not issue _new_ bits when size|mtime is projected
2004 if (loner
== it
->first
)
2005 allowed
= loner_allowed
;
2007 allowed
= all_allowed
;
2009 // add in any xlocker-only caps (for locks this client is the xlocker for)
2010 allowed
|= xlocker_allowed
& in
->get_xlocker_mask(it
->first
);
2012 Session
*session
= mds
->get_session(it
->first
);
2013 if (in
->inode
.inline_data
.version
!= CEPH_INLINE_NONE
&&
2014 !(session
&& session
->connection
&&
2015 session
->connection
->has_feature(CEPH_FEATURE_MDS_INLINE_DATA
)))
2016 allowed
&= ~(CEPH_CAP_FILE_RD
| CEPH_CAP_FILE_WR
);
2018 int pending
= cap
->pending();
2019 int wanted
= cap
->wanted();
2021 dout(20) << " client." << it
->first
2022 << " pending " << ccap_string(pending
)
2023 << " allowed " << ccap_string(allowed
)
2024 << " wanted " << ccap_string(wanted
)
2027 if (!(pending
& ~allowed
)) {
2028 // skip if suppress or new, and not revocation
2029 if (cap
->is_new() || cap
->is_suppress()) {
2030 dout(20) << " !revoke and new|suppressed, skipping client." << it
->first
<< dendl
;
2035 // notify clients about deleted inode, to make sure they release caps ASAP.
2036 if (in
->inode
.nlink
== 0)
2037 wanted
|= CEPH_CAP_LINK_SHARED
;
2039 // are there caps that the client _wants_ and can have, but aren't pending?
2040 // or do we need to revoke?
2041 if (((wanted
& allowed
) & ~pending
) || // missing wanted+allowed caps
2042 (pending
& ~allowed
)) { // need to revoke ~allowed caps.
2046 // include caps that clients generally like, while we're at it.
2047 int likes
= in
->get_caps_liked();
2048 int before
= pending
;
2050 if (pending
& ~allowed
)
2051 seq
= cap
->issue((wanted
|likes
) & allowed
& pending
); // if revoking, don't issue anything new.
2053 seq
= cap
->issue((wanted
|likes
) & allowed
);
2054 int after
= cap
->pending();
2056 if (cap
->is_new()) {
2057 // haven't send caps to client yet
2058 if (before
& ~after
)
2059 cap
->confirm_receipt(seq
, after
);
2061 dout(7) << " sending MClientCaps to client." << it
->first
2062 << " seq " << cap
->get_last_seq()
2063 << " new pending " << ccap_string(after
) << " was " << ccap_string(before
)
2066 int op
= (before
& ~after
) ? CEPH_CAP_OP_REVOKE
: CEPH_CAP_OP_GRANT
;
2067 if (op
== CEPH_CAP_OP_REVOKE
) {
2068 revoking_caps
.push_back(&cap
->item_revoking_caps
);
2069 revoking_caps_by_client
[cap
->get_client()].push_back(&cap
->item_client_revoking_caps
);
2070 cap
->set_last_revoke_stamp(ceph_clock_now());
2071 cap
->reset_num_revoke_warnings();
2074 MClientCaps
*m
= new MClientCaps(op
, in
->ino(),
2075 in
->find_snaprealm()->inode
->ino(),
2076 cap
->get_cap_id(), cap
->get_last_seq(),
2079 mds
->get_osd_epoch_barrier());
2080 in
->encode_cap_message(m
, cap
);
2082 mds
->send_message_client_counted(m
, it
->first
);
2090 return (nissued
== 0); // true if no re-issued, no callbacks
2093 void Locker::issue_truncate(CInode
*in
)
2095 dout(7) << "issue_truncate on " << *in
<< dendl
;
2097 for (map
<client_t
, Capability
*>::iterator it
= in
->client_caps
.begin();
2098 it
!= in
->client_caps
.end();
2100 Capability
*cap
= it
->second
;
2101 MClientCaps
*m
= new MClientCaps(CEPH_CAP_OP_TRUNC
,
2103 in
->find_snaprealm()->inode
->ino(),
2104 cap
->get_cap_id(), cap
->get_last_seq(),
2105 cap
->pending(), cap
->wanted(), 0,
2107 mds
->get_osd_epoch_barrier());
2108 in
->encode_cap_message(m
, cap
);
2109 mds
->send_message_client_counted(m
, it
->first
);
2112 // should we increase max_size?
2113 if (in
->is_auth() && in
->is_file())
2114 check_inode_max_size(in
);
2118 void Locker::revoke_stale_caps(Capability
*cap
)
2120 CInode
*in
= cap
->get_inode();
2121 if (in
->state_test(CInode::STATE_EXPORTINGCAPS
)) {
2122 // if export succeeds, the cap will be removed. if export fails, we need to
2123 // revoke the cap if it's still stale.
2124 in
->state_set(CInode::STATE_EVALSTALECAPS
);
2128 int issued
= cap
->issued();
2129 if (issued
& ~CEPH_CAP_PIN
) {
2130 dout(10) << " revoking " << ccap_string(issued
) << " on " << *in
<< dendl
;
2133 if (in
->is_auth() &&
2134 in
->inode
.client_ranges
.count(cap
->get_client()))
2135 in
->state_set(CInode::STATE_NEEDSRECOVER
);
2137 if (!in
->filelock
.is_stable()) eval_gather(&in
->filelock
);
2138 if (!in
->linklock
.is_stable()) eval_gather(&in
->linklock
);
2139 if (!in
->authlock
.is_stable()) eval_gather(&in
->authlock
);
2140 if (!in
->xattrlock
.is_stable()) eval_gather(&in
->xattrlock
);
2142 if (in
->is_auth()) {
2143 try_eval(in
, CEPH_CAP_LOCKS
);
2145 request_inode_file_caps(in
);
2150 void Locker::revoke_stale_caps(Session
*session
)
2152 dout(10) << "revoke_stale_caps for " << session
->info
.inst
.name
<< dendl
;
2154 for (xlist
<Capability
*>::iterator p
= session
->caps
.begin(); !p
.end(); ++p
) {
2155 Capability
*cap
= *p
;
2157 revoke_stale_caps(cap
);
2161 void Locker::resume_stale_caps(Session
*session
)
2163 dout(10) << "resume_stale_caps for " << session
->info
.inst
.name
<< dendl
;
2165 for (xlist
<Capability
*>::iterator p
= session
->caps
.begin(); !p
.end(); ++p
) {
2166 Capability
*cap
= *p
;
2167 CInode
*in
= cap
->get_inode();
2168 assert(in
->is_head());
2169 if (cap
->is_stale()) {
2170 dout(10) << " clearing stale flag on " << *in
<< dendl
;
2173 if (in
->state_test(CInode::STATE_EXPORTINGCAPS
)) {
2174 // if export succeeds, the cap will be removed. if export fails,
2175 // we need to re-issue the cap if it's not stale.
2176 in
->state_set(CInode::STATE_EVALSTALECAPS
);
2180 if (!in
->is_auth() || !eval(in
, CEPH_CAP_LOCKS
))
2181 issue_caps(in
, cap
);
2186 void Locker::remove_stale_leases(Session
*session
)
2188 dout(10) << "remove_stale_leases for " << session
->info
.inst
.name
<< dendl
;
2189 xlist
<ClientLease
*>::iterator p
= session
->leases
.begin();
2191 ClientLease
*l
= *p
;
2193 CDentry
*parent
= static_cast<CDentry
*>(l
->parent
);
2194 dout(15) << " removing lease on " << *parent
<< dendl
;
2195 parent
->remove_client_lease(l
, this);
2200 class C_MDL_RequestInodeFileCaps
: public LockerContext
{
2203 C_MDL_RequestInodeFileCaps(Locker
*l
, CInode
*i
) : LockerContext(l
), in(i
) {
2204 in
->get(CInode::PIN_PTRWAITER
);
2206 void finish(int r
) override
{
2208 locker
->request_inode_file_caps(in
);
2209 in
->put(CInode::PIN_PTRWAITER
);
2213 void Locker::request_inode_file_caps(CInode
*in
)
2215 assert(!in
->is_auth());
2217 int wanted
= in
->get_caps_wanted() & ~CEPH_CAP_PIN
;
2218 if (wanted
!= in
->replica_caps_wanted
) {
2219 // wait for single auth
2220 if (in
->is_ambiguous_auth()) {
2221 in
->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
,
2222 new C_MDL_RequestInodeFileCaps(this, in
));
2226 mds_rank_t auth
= in
->authority().first
;
2227 if (mds
->is_cluster_degraded() &&
2228 mds
->mdsmap
->get_state(auth
) == MDSMap::STATE_REJOIN
) {
2229 mds
->wait_for_active_peer(auth
, new C_MDL_RequestInodeFileCaps(this, in
));
2233 dout(7) << "request_inode_file_caps " << ccap_string(wanted
)
2234 << " was " << ccap_string(in
->replica_caps_wanted
)
2235 << " on " << *in
<< " to mds." << auth
<< dendl
;
2237 in
->replica_caps_wanted
= wanted
;
2239 if (!mds
->is_cluster_degraded() ||
2240 mds
->mdsmap
->is_clientreplay_or_active_or_stopping(auth
))
2241 mds
->send_message_mds(new MInodeFileCaps(in
->ino(), in
->replica_caps_wanted
),
2246 /* This function DOES put the passed message before returning */
2247 void Locker::handle_inode_file_caps(MInodeFileCaps
*m
)
2249 // nobody should be talking to us during recovery.
2250 assert(mds
->is_clientreplay() || mds
->is_active() || mds
->is_stopping());
2253 CInode
*in
= mdcache
->get_inode(m
->get_ino());
2254 mds_rank_t from
= mds_rank_t(m
->get_source().num());
2257 assert(in
->is_auth());
2259 dout(7) << "handle_inode_file_caps replica mds." << from
<< " wants caps " << ccap_string(m
->get_caps()) << " on " << *in
<< dendl
;
2262 in
->mds_caps_wanted
[from
] = m
->get_caps();
2264 in
->mds_caps_wanted
.erase(from
);
2266 try_eval(in
, CEPH_CAP_LOCKS
);
2271 class C_MDL_CheckMaxSize
: public LockerContext
{
2273 uint64_t new_max_size
;
2278 C_MDL_CheckMaxSize(Locker
*l
, CInode
*i
, uint64_t _new_max_size
,
2279 uint64_t _newsize
, utime_t _mtime
) :
2280 LockerContext(l
), in(i
),
2281 new_max_size(_new_max_size
), newsize(_newsize
), mtime(_mtime
)
2283 in
->get(CInode::PIN_PTRWAITER
);
2285 void finish(int r
) override
{
2287 locker
->check_inode_max_size(in
, false, new_max_size
, newsize
, mtime
);
2288 in
->put(CInode::PIN_PTRWAITER
);
2292 uint64_t Locker::calc_new_max_size(CInode::mempool_inode
*pi
, uint64_t size
)
2294 uint64_t new_max
= (size
+ 1) << 1;
2295 uint64_t max_inc
= g_conf
->mds_client_writeable_range_max_inc_objs
;
2297 max_inc
*= pi
->layout
.object_size
;
2298 new_max
= MIN(new_max
, size
+ max_inc
);
2300 return ROUND_UP_TO(new_max
, pi
->get_layout_size_increment());
2303 void Locker::calc_new_client_ranges(CInode
*in
, uint64_t size
,
2304 CInode::mempool_inode::client_range_map
*new_ranges
,
2305 bool *max_increased
)
2307 auto latest
= in
->get_projected_inode();
2309 if(latest
->has_layout()) {
2310 ms
= calc_new_max_size(latest
, size
);
2312 // Layout-less directories like ~mds0/, have zero size
2316 // increase ranges as appropriate.
2317 // shrink to 0 if no WR|BUFFER caps issued.
2318 for (map
<client_t
,Capability
*>::iterator p
= in
->client_caps
.begin();
2319 p
!= in
->client_caps
.end();
2321 if ((p
->second
->issued() | p
->second
->wanted()) & (CEPH_CAP_FILE_WR
|CEPH_CAP_FILE_BUFFER
)) {
2322 client_writeable_range_t
& nr
= (*new_ranges
)[p
->first
];
2324 if (latest
->client_ranges
.count(p
->first
)) {
2325 client_writeable_range_t
& oldr
= latest
->client_ranges
[p
->first
];
2326 if (ms
> oldr
.range
.last
)
2327 *max_increased
= true;
2328 nr
.range
.last
= MAX(ms
, oldr
.range
.last
);
2329 nr
.follows
= oldr
.follows
;
2331 *max_increased
= true;
2333 nr
.follows
= in
->first
- 1;
2339 bool Locker::check_inode_max_size(CInode
*in
, bool force_wrlock
,
2340 uint64_t new_max_size
, uint64_t new_size
,
2343 assert(in
->is_auth());
2344 assert(in
->is_file());
2346 CInode::mempool_inode
*latest
= in
->get_projected_inode();
2347 CInode::mempool_inode::client_range_map new_ranges
;
2348 uint64_t size
= latest
->size
;
2349 bool update_size
= new_size
> 0;
2350 bool update_max
= false;
2351 bool max_increased
= false;
2354 new_size
= size
= MAX(size
, new_size
);
2355 new_mtime
= MAX(new_mtime
, latest
->mtime
);
2356 if (latest
->size
== new_size
&& latest
->mtime
== new_mtime
)
2357 update_size
= false;
2360 calc_new_client_ranges(in
, max(new_max_size
, size
), &new_ranges
, &max_increased
);
2362 if (max_increased
|| latest
->client_ranges
!= new_ranges
)
2365 if (!update_size
&& !update_max
) {
2366 dout(20) << "check_inode_max_size no-op on " << *in
<< dendl
;
2370 dout(10) << "check_inode_max_size new_ranges " << new_ranges
2371 << " update_size " << update_size
2372 << " on " << *in
<< dendl
;
2374 if (in
->is_frozen()) {
2375 dout(10) << "check_inode_max_size frozen, waiting on " << *in
<< dendl
;
2376 C_MDL_CheckMaxSize
*cms
= new C_MDL_CheckMaxSize(this, in
,
2380 in
->add_waiter(CInode::WAIT_UNFREEZE
, cms
);
2383 if (!force_wrlock
&& !in
->filelock
.can_wrlock(in
->get_loner())) {
2385 if (in
->filelock
.is_stable()) {
2386 if (in
->get_target_loner() >= 0)
2387 file_excl(&in
->filelock
);
2389 simple_lock(&in
->filelock
);
2391 if (!in
->filelock
.can_wrlock(in
->get_loner())) {
2393 C_MDL_CheckMaxSize
*cms
= new C_MDL_CheckMaxSize(this, in
,
2398 in
->filelock
.add_waiter(SimpleLock::WAIT_STABLE
, cms
);
2399 dout(10) << "check_inode_max_size can't wrlock, waiting on " << *in
<< dendl
;
2404 MutationRef
mut(new MutationImpl());
2405 mut
->ls
= mds
->mdlog
->get_current_segment();
2407 auto &pi
= in
->project_inode();
2408 pi
.inode
.version
= in
->pre_dirty();
2411 dout(10) << "check_inode_max_size client_ranges " << pi
.inode
.client_ranges
<< " -> " << new_ranges
<< dendl
;
2412 pi
.inode
.client_ranges
= new_ranges
;
2416 dout(10) << "check_inode_max_size size " << pi
.inode
.size
<< " -> " << new_size
<< dendl
;
2417 pi
.inode
.size
= new_size
;
2418 pi
.inode
.rstat
.rbytes
= new_size
;
2419 dout(10) << "check_inode_max_size mtime " << pi
.inode
.mtime
<< " -> " << new_mtime
<< dendl
;
2420 pi
.inode
.mtime
= new_mtime
;
2421 if (new_mtime
> pi
.inode
.ctime
)
2422 pi
.inode
.ctime
= pi
.inode
.rstat
.rctime
= new_mtime
;
2425 // use EOpen if the file is still open; otherwise, use EUpdate.
2426 // this is just an optimization to push open files forward into
2427 // newer log segments.
2429 EMetaBlob
*metablob
;
2430 if (in
->is_any_caps_wanted() && in
->last
== CEPH_NOSNAP
) {
2431 EOpen
*eo
= new EOpen(mds
->mdlog
);
2432 eo
->add_ino(in
->ino());
2433 metablob
= &eo
->metablob
;
2435 mut
->ls
->open_files
.push_back(&in
->item_open_file
);
2437 EUpdate
*eu
= new EUpdate(mds
->mdlog
, "check_inode_max_size");
2438 metablob
= &eu
->metablob
;
2441 mds
->mdlog
->start_entry(le
);
2442 if (update_size
) { // FIXME if/when we do max_size nested accounting
2443 mdcache
->predirty_journal_parents(mut
, metablob
, in
, 0, PREDIRTY_PRIMARY
);
2445 CDentry
*parent
= in
->get_projected_parent_dn();
2446 metablob
->add_primary_dentry(parent
, in
, true);
2448 metablob
->add_dir_context(in
->get_projected_parent_dn()->get_dir());
2449 mdcache
->journal_dirty_inode(mut
.get(), metablob
, in
);
2451 mds
->mdlog
->submit_entry(le
,
2452 new C_Locker_FileUpdate_finish(this, in
, mut
, true));
2453 wrlock_force(&in
->filelock
, mut
); // wrlock for duration of journal
2456 // make max_size _increase_ timely
2458 mds
->mdlog
->flush();
2464 void Locker::share_inode_max_size(CInode
*in
, Capability
*only_cap
)
2467 * only share if currently issued a WR cap. if client doesn't have it,
2468 * file_max doesn't matter, and the client will get it if/when they get
2471 dout(10) << "share_inode_max_size on " << *in
<< dendl
;
2472 map
<client_t
, Capability
*>::iterator it
;
2474 it
= in
->client_caps
.find(only_cap
->get_client());
2476 it
= in
->client_caps
.begin();
2477 for (; it
!= in
->client_caps
.end(); ++it
) {
2478 const client_t client
= it
->first
;
2479 Capability
*cap
= it
->second
;
2480 if (cap
->is_suppress())
2482 if (cap
->pending() & (CEPH_CAP_FILE_WR
|CEPH_CAP_FILE_BUFFER
)) {
2483 dout(10) << "share_inode_max_size with client." << client
<< dendl
;
2484 cap
->inc_last_seq();
2485 MClientCaps
*m
= new MClientCaps(CEPH_CAP_OP_GRANT
,
2487 in
->find_snaprealm()->inode
->ino(),
2488 cap
->get_cap_id(), cap
->get_last_seq(),
2489 cap
->pending(), cap
->wanted(), 0,
2491 mds
->get_osd_epoch_barrier());
2492 in
->encode_cap_message(m
, cap
);
2493 mds
->send_message_client_counted(m
, client
);
2500 bool Locker::_need_flush_mdlog(CInode
*in
, int wanted
)
2502 /* flush log if caps are wanted by client but corresponding lock is unstable and locked by
2503 * pending mutations. */
2504 if (((wanted
& (CEPH_CAP_FILE_RD
|CEPH_CAP_FILE_WR
|CEPH_CAP_FILE_SHARED
|CEPH_CAP_FILE_EXCL
)) &&
2505 in
->filelock
.is_unstable_and_locked()) ||
2506 ((wanted
& (CEPH_CAP_AUTH_SHARED
|CEPH_CAP_AUTH_EXCL
)) &&
2507 in
->authlock
.is_unstable_and_locked()) ||
2508 ((wanted
& (CEPH_CAP_LINK_SHARED
|CEPH_CAP_LINK_EXCL
)) &&
2509 in
->linklock
.is_unstable_and_locked()) ||
2510 ((wanted
& (CEPH_CAP_XATTR_SHARED
|CEPH_CAP_XATTR_EXCL
)) &&
2511 in
->xattrlock
.is_unstable_and_locked()))
2516 void Locker::adjust_cap_wanted(Capability
*cap
, int wanted
, int issue_seq
)
2518 if (ceph_seq_cmp(issue_seq
, cap
->get_last_issue()) == 0) {
2519 dout(10) << " wanted " << ccap_string(cap
->wanted())
2520 << " -> " << ccap_string(wanted
) << dendl
;
2521 cap
->set_wanted(wanted
);
2522 } else if (wanted
& ~cap
->wanted()) {
2523 dout(10) << " wanted " << ccap_string(cap
->wanted())
2524 << " -> " << ccap_string(wanted
)
2525 << " (added caps even though we had seq mismatch!)" << dendl
;
2526 cap
->set_wanted(wanted
| cap
->wanted());
2528 dout(10) << " NOT changing wanted " << ccap_string(cap
->wanted())
2529 << " -> " << ccap_string(wanted
)
2530 << " (issue_seq " << issue_seq
<< " != last_issue "
2531 << cap
->get_last_issue() << ")" << dendl
;
2535 CInode
*cur
= cap
->get_inode();
2536 if (!cur
->is_auth()) {
2537 request_inode_file_caps(cur
);
2541 if (cap
->wanted() == 0) {
2542 if (cur
->item_open_file
.is_on_list() &&
2543 !cur
->is_any_caps_wanted()) {
2544 dout(10) << " removing unwanted file from open file list " << *cur
<< dendl
;
2545 cur
->item_open_file
.remove_myself();
2548 if (cur
->state_test(CInode::STATE_RECOVERING
) &&
2549 (cap
->wanted() & (CEPH_CAP_FILE_RD
|
2550 CEPH_CAP_FILE_WR
))) {
2551 mds
->mdcache
->recovery_queue
.prioritize(cur
);
2554 if (!cur
->item_open_file
.is_on_list()) {
2555 dout(10) << " adding to open file list " << *cur
<< dendl
;
2556 assert(cur
->last
== CEPH_NOSNAP
);
2557 LogSegment
*ls
= mds
->mdlog
->get_current_segment();
2558 EOpen
*le
= new EOpen(mds
->mdlog
);
2559 mds
->mdlog
->start_entry(le
);
2560 le
->add_clean_inode(cur
);
2561 ls
->open_files
.push_back(&cur
->item_open_file
);
2562 mds
->mdlog
->submit_entry(le
);
2569 void Locker::_do_null_snapflush(CInode
*head_in
, client_t client
, snapid_t last
)
2571 dout(10) << "_do_null_snapflush client." << client
<< " on " << *head_in
<< dendl
;
2572 for (auto p
= head_in
->client_need_snapflush
.begin();
2573 p
!= head_in
->client_need_snapflush
.end() && p
->first
< last
; ) {
2574 snapid_t snapid
= p
->first
;
2575 auto &clients
= p
->second
;
2576 ++p
; // be careful, q loop below depends on this
2578 if (clients
.count(client
)) {
2579 dout(10) << " doing async NULL snapflush on " << snapid
<< " from client." << client
<< dendl
;
2580 CInode
*sin
= mdcache
->pick_inode_snap(head_in
, snapid
- 1);
2582 assert(sin
->first
<= snapid
);
2583 _do_snap_update(sin
, snapid
, 0, sin
->first
- 1, client
, NULL
, NULL
);
2584 head_in
->remove_need_snapflush(sin
, snapid
, client
);
2590 bool Locker::should_defer_client_cap_frozen(CInode
*in
)
2593 * This policy needs to be AT LEAST as permissive as allowing a client request
2594 * to go forward, or else a client request can release something, the release
2595 * gets deferred, but the request gets processed and deadlocks because when the
2596 * caps can't get revoked.
2598 * Currently, a request wait if anything locked is freezing (can't
2599 * auth_pin), which would avoid any deadlock with cap release. Thus @in
2600 * _MUST_ be in the lock/auth_pin set.
2602 * auth_pins==0 implies no unstable lock and not auth pinnned by
2603 * client request, otherwise continue even it's freezing.
2605 return (in
->is_freezing() && in
->get_num_auth_pins() == 0) || in
->is_frozen();
2609 * This function DOES put the passed message before returning
2611 void Locker::handle_client_caps(MClientCaps
*m
)
2613 client_t client
= m
->get_source().num();
2614 snapid_t follows
= m
->get_snap_follows();
2615 dout(7) << "handle_client_caps "
2616 << ((m
->flags
& CLIENT_CAPS_SYNC
) ? "sync" : "async")
2617 << " on " << m
->get_ino()
2618 << " tid " << m
->get_client_tid() << " follows " << follows
2619 << " op " << ceph_cap_op_name(m
->get_op()) << dendl
;
2621 Session
*session
= mds
->get_session(m
);
2622 if (!mds
->is_clientreplay() && !mds
->is_active() && !mds
->is_stopping()) {
2624 dout(5) << " no session, dropping " << *m
<< dendl
;
2628 if (session
->is_closed() ||
2629 session
->is_closing() ||
2630 session
->is_killing()) {
2631 dout(7) << " session closed|closing|killing, dropping " << *m
<< dendl
;
2635 if (mds
->is_reconnect() &&
2636 m
->get_dirty() && m
->get_client_tid() > 0 &&
2637 !session
->have_completed_flush(m
->get_client_tid())) {
2638 mdcache
->set_reconnected_dirty_caps(client
, m
->get_ino(), m
->get_dirty());
2640 mds
->wait_for_replay(new C_MDS_RetryMessage(mds
, m
));
2644 if (m
->get_client_tid() > 0 && session
&&
2645 session
->have_completed_flush(m
->get_client_tid())) {
2646 dout(7) << "handle_client_caps already flushed tid " << m
->get_client_tid()
2647 << " for client." << client
<< dendl
;
2649 if (m
->get_op() == CEPH_CAP_OP_FLUSHSNAP
) {
2650 ack
= new MClientCaps(CEPH_CAP_OP_FLUSHSNAP_ACK
, m
->get_ino(), 0, 0, 0, 0, 0,
2651 m
->get_dirty(), 0, mds
->get_osd_epoch_barrier());
2653 ack
= new MClientCaps(CEPH_CAP_OP_FLUSH_ACK
, m
->get_ino(), 0, m
->get_cap_id(),
2654 m
->get_seq(), m
->get_caps(), 0, m
->get_dirty(), 0,
2655 mds
->get_osd_epoch_barrier());
2657 ack
->set_snap_follows(follows
);
2658 ack
->set_client_tid(m
->get_client_tid());
2659 mds
->send_message_client_counted(ack
, m
->get_connection());
2660 if (m
->get_op() == CEPH_CAP_OP_FLUSHSNAP
) {
2664 // fall-thru because the message may release some caps
2666 m
->set_op(CEPH_CAP_OP_UPDATE
);
2670 // "oldest flush tid" > 0 means client uses unique TID for each flush
2671 if (m
->get_oldest_flush_tid() > 0 && session
) {
2672 if (session
->trim_completed_flushes(m
->get_oldest_flush_tid())) {
2673 mds
->mdlog
->get_current_segment()->touched_sessions
.insert(session
->info
.inst
.name
);
2675 if (session
->get_num_trim_flushes_warnings() > 0 &&
2676 session
->get_num_completed_flushes() * 2 < g_conf
->mds_max_completed_flushes
)
2677 session
->reset_num_trim_flushes_warnings();
2679 if (session
->get_num_completed_flushes() >=
2680 (g_conf
->mds_max_completed_flushes
<< session
->get_num_trim_flushes_warnings())) {
2681 session
->inc_num_trim_flushes_warnings();
2683 ss
<< "client." << session
->get_client() << " does not advance its oldest_flush_tid ("
2684 << m
->get_oldest_flush_tid() << "), "
2685 << session
->get_num_completed_flushes()
2686 << " completed flushes recorded in session";
2687 mds
->clog
->warn() << ss
.str();
2688 dout(20) << __func__
<< " " << ss
.str() << dendl
;
2693 CInode
*head_in
= mdcache
->get_inode(m
->get_ino());
2695 if (mds
->is_clientreplay()) {
2696 dout(7) << "handle_client_caps on unknown ino " << m
->get_ino()
2697 << ", will try again after replayed client requests" << dendl
;
2698 mdcache
->wait_replay_cap_reconnect(m
->get_ino(), new C_MDS_RetryMessage(mds
, m
));
2703 * "handle_client_caps on unknown ino xxx” is normal after migrating a subtree
2704 * Sequence of events that cause this are:
2705 * - client sends caps message to mds.a
2706 * - mds finishes subtree migration, send cap export to client
2707 * - mds trim its cache
2708 * - mds receives cap messages from client
2710 dout(7) << "handle_client_caps on unknown ino " << m
->get_ino() << ", dropping" << dendl
;
2715 if (m
->osd_epoch_barrier
&& !mds
->objecter
->have_map(m
->osd_epoch_barrier
)) {
2716 // Pause RADOS operations until we see the required epoch
2717 mds
->objecter
->set_epoch_barrier(m
->osd_epoch_barrier
);
2720 if (mds
->get_osd_epoch_barrier() < m
->osd_epoch_barrier
) {
2721 // Record the barrier so that we will retransmit it to clients
2722 mds
->set_osd_epoch_barrier(m
->osd_epoch_barrier
);
2725 dout(10) << " head inode " << *head_in
<< dendl
;
2727 Capability
*cap
= 0;
2728 cap
= head_in
->get_client_cap(client
);
2730 dout(7) << "handle_client_caps no cap for client." << client
<< " on " << *head_in
<< dendl
;
2737 if (should_defer_client_cap_frozen(head_in
)) {
2738 dout(7) << "handle_client_caps freezing|frozen on " << *head_in
<< dendl
;
2739 head_in
->add_waiter(CInode::WAIT_UNFREEZE
, new C_MDS_RetryMessage(mds
, m
));
2742 if (ceph_seq_cmp(m
->get_mseq(), cap
->get_mseq()) < 0) {
2743 dout(7) << "handle_client_caps mseq " << m
->get_mseq() << " < " << cap
->get_mseq()
2744 << ", dropping" << dendl
;
2749 int op
= m
->get_op();
2752 if (op
== CEPH_CAP_OP_FLUSHSNAP
) {
2753 if (!head_in
->is_auth()) {
2754 dout(7) << " not auth, ignoring flushsnap on " << *head_in
<< dendl
;
2758 SnapRealm
*realm
= head_in
->find_snaprealm();
2759 snapid_t snap
= realm
->get_snap_following(follows
);
2760 dout(10) << " flushsnap follows " << follows
<< " -> snap " << snap
<< dendl
;
2762 CInode
*in
= head_in
;
2763 if (snap
!= CEPH_NOSNAP
) {
2764 in
= mdcache
->pick_inode_snap(head_in
, snap
- 1);
2766 dout(10) << " snapped inode " << *in
<< dendl
;
2769 // we can prepare the ack now, since this FLUSHEDSNAP is independent of any
2770 // other cap ops. (except possibly duplicate FLUSHSNAP requests, but worst
2771 // case we get a dup response, so whatever.)
2772 MClientCaps
*ack
= 0;
2773 if (m
->get_dirty()) {
2774 ack
= new MClientCaps(CEPH_CAP_OP_FLUSHSNAP_ACK
, in
->ino(), 0, 0, 0, 0, 0, m
->get_dirty(), 0, mds
->get_osd_epoch_barrier());
2775 ack
->set_snap_follows(follows
);
2776 ack
->set_client_tid(m
->get_client_tid());
2777 ack
->set_oldest_flush_tid(m
->get_oldest_flush_tid());
2780 if (in
== head_in
||
2781 (head_in
->client_need_snapflush
.count(snap
) &&
2782 head_in
->client_need_snapflush
[snap
].count(client
))) {
2783 dout(7) << " flushsnap snap " << snap
2784 << " client." << client
<< " on " << *in
<< dendl
;
2786 // this cap now follows a later snap (i.e. the one initiating this flush, or later)
2788 cap
->client_follows
= snap
< CEPH_NOSNAP
? snap
: realm
->get_newest_seq();
2789 else if (head_in
->client_need_snapflush
.begin()->first
< snap
)
2790 _do_null_snapflush(head_in
, client
, snap
);
2792 _do_snap_update(in
, snap
, m
->get_dirty(), follows
, client
, m
, ack
);
2795 head_in
->remove_need_snapflush(in
, snap
, client
);
2797 dout(7) << " not expecting flushsnap " << snap
<< " from client." << client
<< " on " << *in
<< dendl
;
2799 mds
->send_message_client_counted(ack
, m
->get_connection());
2804 if (cap
->get_cap_id() != m
->get_cap_id()) {
2805 dout(7) << " ignoring client capid " << m
->get_cap_id() << " != my " << cap
->get_cap_id() << dendl
;
2807 CInode
*in
= head_in
;
2809 in
= mdcache
->pick_inode_snap(head_in
, follows
);
2810 // intermediate snap inodes
2811 while (in
!= head_in
) {
2812 assert(in
->last
!= CEPH_NOSNAP
);
2813 if (in
->is_auth() && m
->get_dirty()) {
2814 dout(10) << " updating intermediate snapped inode " << *in
<< dendl
;
2815 _do_cap_update(in
, NULL
, m
->get_dirty(), follows
, m
);
2817 in
= mdcache
->pick_inode_snap(head_in
, in
->last
);
2821 // head inode, and cap
2822 MClientCaps
*ack
= 0;
2824 int caps
= m
->get_caps();
2825 if (caps
& ~cap
->issued()) {
2826 dout(10) << " confirming not issued caps " << ccap_string(caps
& ~cap
->issued()) << dendl
;
2827 caps
&= cap
->issued();
2830 cap
->confirm_receipt(m
->get_seq(), caps
);
2831 dout(10) << " follows " << follows
2832 << " retains " << ccap_string(m
->get_caps())
2833 << " dirty " << ccap_string(m
->get_dirty())
2834 << " on " << *in
<< dendl
;
2837 // missing/skipped snapflush?
2838 // The client MAY send a snapflush if it is issued WR/EXCL caps, but
2839 // presently only does so when it has actual dirty metadata. But, we
2840 // set up the need_snapflush stuff based on the issued caps.
2841 // We can infer that the client WONT send a FLUSHSNAP once they have
2842 // released all WR/EXCL caps (the FLUSHSNAP always comes before the cap
2844 if (!head_in
->client_need_snapflush
.empty()) {
2845 if ((cap
->issued() & CEPH_CAP_ANY_FILE_WR
) == 0) {
2846 _do_null_snapflush(head_in
, client
);
2848 dout(10) << " revocation in progress, not making any conclusions about null snapflushes" << dendl
;
2852 if (m
->get_dirty() && in
->is_auth()) {
2853 dout(7) << " flush client." << client
<< " dirty " << ccap_string(m
->get_dirty())
2854 << " seq " << m
->get_seq() << " on " << *in
<< dendl
;
2855 ack
= new MClientCaps(CEPH_CAP_OP_FLUSH_ACK
, in
->ino(), 0, cap
->get_cap_id(), m
->get_seq(),
2856 m
->get_caps(), 0, m
->get_dirty(), 0, mds
->get_osd_epoch_barrier());
2857 ack
->set_client_tid(m
->get_client_tid());
2858 ack
->set_oldest_flush_tid(m
->get_oldest_flush_tid());
2861 // filter wanted based on what we could ever give out (given auth/replica status)
2862 bool need_flush
= m
->flags
& CLIENT_CAPS_SYNC
;
2863 int new_wanted
= m
->get_wanted() & head_in
->get_caps_allowed_ever();
2864 if (new_wanted
!= cap
->wanted()) {
2865 if (!need_flush
&& (new_wanted
& ~cap
->pending())) {
2866 // exapnding caps. make sure we aren't waiting for a log flush
2867 need_flush
= _need_flush_mdlog(head_in
, new_wanted
& ~cap
->pending());
2870 adjust_cap_wanted(cap
, new_wanted
, m
->get_issue_seq());
2873 if (in
->is_auth() &&
2874 _do_cap_update(in
, cap
, m
->get_dirty(), follows
, m
, ack
, &need_flush
)) {
2876 eval(in
, CEPH_CAP_LOCKS
);
2878 if (!need_flush
&& (cap
->wanted() & ~cap
->pending()))
2879 need_flush
= _need_flush_mdlog(in
, cap
->wanted() & ~cap
->pending());
2881 // no update, ack now.
2883 mds
->send_message_client_counted(ack
, m
->get_connection());
2885 bool did_issue
= eval(in
, CEPH_CAP_LOCKS
);
2886 if (!did_issue
&& (cap
->wanted() & ~cap
->pending()))
2887 issue_caps(in
, cap
);
2889 if (cap
->get_last_seq() == 0 &&
2890 (cap
->pending() & (CEPH_CAP_FILE_WR
|CEPH_CAP_FILE_BUFFER
))) {
2891 cap
->issue_norevoke(cap
->issued());
2892 share_inode_max_size(in
, cap
);
2897 mds
->mdlog
->flush();
2905 class C_Locker_RetryRequestCapRelease
: public LockerContext
{
2907 ceph_mds_request_release item
;
2909 C_Locker_RetryRequestCapRelease(Locker
*l
, client_t c
, const ceph_mds_request_release
& it
) :
2910 LockerContext(l
), client(c
), item(it
) { }
2911 void finish(int r
) override
{
2913 MDRequestRef null_ref
;
2914 locker
->process_request_cap_release(null_ref
, client
, item
, dname
);
2918 void Locker::process_request_cap_release(MDRequestRef
& mdr
, client_t client
, const ceph_mds_request_release
& item
,
2919 boost::string_view dname
)
2921 inodeno_t ino
= (uint64_t)item
.ino
;
2922 uint64_t cap_id
= item
.cap_id
;
2923 int caps
= item
.caps
;
2924 int wanted
= item
.wanted
;
2926 int issue_seq
= item
.issue_seq
;
2927 int mseq
= item
.mseq
;
2929 CInode
*in
= mdcache
->get_inode(ino
);
2933 if (dname
.length()) {
2934 frag_t fg
= in
->pick_dirfrag(dname
);
2935 CDir
*dir
= in
->get_dirfrag(fg
);
2937 CDentry
*dn
= dir
->lookup(dname
);
2939 ClientLease
*l
= dn
->get_client_lease(client
);
2941 dout(10) << "process_cap_release removing lease on " << *dn
<< dendl
;
2942 dn
->remove_client_lease(l
, this);
2944 dout(7) << "process_cap_release client." << client
2945 << " doesn't have lease on " << *dn
<< dendl
;
2948 dout(7) << "process_cap_release client." << client
<< " released lease on dn "
2949 << dir
->dirfrag() << "/" << dname
<< " which dne" << dendl
;
2954 Capability
*cap
= in
->get_client_cap(client
);
2958 dout(10) << "process_cap_release client." << client
<< " " << ccap_string(caps
) << " on " << *in
2959 << (mdr
? "" : " (DEFERRED, no mdr)")
2962 if (ceph_seq_cmp(mseq
, cap
->get_mseq()) < 0) {
2963 dout(7) << " mseq " << mseq
<< " < " << cap
->get_mseq() << ", dropping" << dendl
;
2967 if (cap
->get_cap_id() != cap_id
) {
2968 dout(7) << " cap_id " << cap_id
<< " != " << cap
->get_cap_id() << ", dropping" << dendl
;
2972 if (should_defer_client_cap_frozen(in
)) {
2973 dout(7) << " frozen, deferring" << dendl
;
2974 in
->add_waiter(CInode::WAIT_UNFREEZE
, new C_Locker_RetryRequestCapRelease(this, client
, item
));
2978 if (caps
& ~cap
->issued()) {
2979 dout(10) << " confirming not issued caps " << ccap_string(caps
& ~cap
->issued()) << dendl
;
2980 caps
&= cap
->issued();
2982 cap
->confirm_receipt(seq
, caps
);
2984 if (!in
->client_need_snapflush
.empty() &&
2985 (cap
->issued() & CEPH_CAP_ANY_FILE_WR
) == 0) {
2986 _do_null_snapflush(in
, client
);
2989 adjust_cap_wanted(cap
, wanted
, issue_seq
);
2992 cap
->inc_suppress();
2993 eval(in
, CEPH_CAP_LOCKS
);
2995 cap
->dec_suppress();
2997 // take note; we may need to reissue on this cap later
2999 mdr
->cap_releases
[in
->vino()] = cap
->get_last_seq();
3002 class C_Locker_RetryKickIssueCaps
: public LockerContext
{
3007 C_Locker_RetryKickIssueCaps(Locker
*l
, CInode
*i
, client_t c
, ceph_seq_t s
) :
3008 LockerContext(l
), in(i
), client(c
), seq(s
) {
3009 in
->get(CInode::PIN_PTRWAITER
);
3011 void finish(int r
) override
{
3012 locker
->kick_issue_caps(in
, client
, seq
);
3013 in
->put(CInode::PIN_PTRWAITER
);
3017 void Locker::kick_issue_caps(CInode
*in
, client_t client
, ceph_seq_t seq
)
3019 Capability
*cap
= in
->get_client_cap(client
);
3020 if (!cap
|| cap
->get_last_sent() != seq
)
3022 if (in
->is_frozen()) {
3023 dout(10) << "kick_issue_caps waiting for unfreeze on " << *in
<< dendl
;
3024 in
->add_waiter(CInode::WAIT_UNFREEZE
,
3025 new C_Locker_RetryKickIssueCaps(this, in
, client
, seq
));
3028 dout(10) << "kick_issue_caps released at current seq " << seq
3029 << ", reissuing" << dendl
;
3030 issue_caps(in
, cap
);
3033 void Locker::kick_cap_releases(MDRequestRef
& mdr
)
3035 client_t client
= mdr
->get_client();
3036 for (map
<vinodeno_t
,ceph_seq_t
>::iterator p
= mdr
->cap_releases
.begin();
3037 p
!= mdr
->cap_releases
.end();
3039 CInode
*in
= mdcache
->get_inode(p
->first
);
3042 kick_issue_caps(in
, client
, p
->second
);
3047 * m and ack might be NULL, so don't dereference them unless dirty != 0
3049 void Locker::_do_snap_update(CInode
*in
, snapid_t snap
, int dirty
, snapid_t follows
, client_t client
, MClientCaps
*m
, MClientCaps
*ack
)
3051 dout(10) << "_do_snap_update dirty " << ccap_string(dirty
)
3052 << " follows " << follows
<< " snap " << snap
3053 << " on " << *in
<< dendl
;
3055 if (snap
== CEPH_NOSNAP
) {
3056 // hmm, i guess snap was already deleted? just ack!
3057 dout(10) << " wow, the snap following " << follows
3058 << " was already deleted. nothing to record, just ack." << dendl
;
3060 mds
->send_message_client_counted(ack
, m
->get_connection());
3064 EUpdate
*le
= new EUpdate(mds
->mdlog
, "snap flush");
3065 mds
->mdlog
->start_entry(le
);
3066 MutationRef mut
= new MutationImpl();
3067 mut
->ls
= mds
->mdlog
->get_current_segment();
3069 // normal metadata updates that we can apply to the head as well.
3072 CInode::mempool_xattr_map
*px
= nullptr;
3073 bool xattrs
= (dirty
& CEPH_CAP_XATTR_EXCL
) &&
3074 m
->xattrbl
.length() &&
3075 m
->head
.xattr_version
> in
->get_projected_inode()->xattr_version
;
3077 CInode::mempool_old_inode
*oi
= 0;
3078 if (in
->is_multiversion()) {
3079 oi
= in
->pick_old_inode(snap
);
3082 CInode::mempool_inode
*i
;
3084 dout(10) << " writing into old inode" << dendl
;
3085 auto &pi
= in
->project_inode();
3086 pi
.inode
.version
= in
->pre_dirty();
3087 if (snap
> oi
->first
)
3088 in
->split_old_inode(snap
);
3093 auto &pi
= in
->project_inode(xattrs
);
3094 pi
.inode
.version
= in
->pre_dirty();
3097 px
= pi
.xattrs
.get();
3100 _update_cap_fields(in
, dirty
, m
, i
);
3104 dout(7) << " xattrs v" << i
->xattr_version
<< " -> " << m
->head
.xattr_version
3105 << " len " << m
->xattrbl
.length() << dendl
;
3106 i
->xattr_version
= m
->head
.xattr_version
;
3107 bufferlist::iterator p
= m
->xattrbl
.begin();
3112 auto it
= i
->client_ranges
.find(client
);
3113 if (it
!= i
->client_ranges
.end()) {
3114 if (in
->last
== snap
) {
3115 dout(10) << " removing client_range entirely" << dendl
;
3116 i
->client_ranges
.erase(it
);
3118 dout(10) << " client_range now follows " << snap
<< dendl
;
3119 it
->second
.follows
= snap
;
3125 mdcache
->predirty_journal_parents(mut
, &le
->metablob
, in
, 0, PREDIRTY_PRIMARY
, 0, follows
);
3126 mdcache
->journal_dirty_inode(mut
.get(), &le
->metablob
, in
, follows
);
3128 // "oldest flush tid" > 0 means client uses unique TID for each flush
3129 if (ack
&& ack
->get_oldest_flush_tid() > 0)
3130 le
->metablob
.add_client_flush(metareqid_t(m
->get_source(), ack
->get_client_tid()),
3131 ack
->get_oldest_flush_tid());
3133 mds
->mdlog
->submit_entry(le
, new C_Locker_FileUpdate_finish(this, in
, mut
, false, false,
3137 void Locker::_update_cap_fields(CInode
*in
, int dirty
, MClientCaps
*m
, CInode::mempool_inode
*pi
)
3142 /* m must be valid if there are dirty caps */
3144 uint64_t features
= m
->get_connection()->get_features();
3146 if (m
->get_ctime() > pi
->ctime
) {
3147 dout(7) << " ctime " << pi
->ctime
<< " -> " << m
->get_ctime()
3148 << " for " << *in
<< dendl
;
3149 pi
->ctime
= pi
->rstat
.rctime
= m
->get_ctime();
3152 if ((features
& CEPH_FEATURE_FS_CHANGE_ATTR
) &&
3153 m
->get_change_attr() > pi
->change_attr
) {
3154 dout(7) << " change_attr " << pi
->change_attr
<< " -> " << m
->get_change_attr()
3155 << " for " << *in
<< dendl
;
3156 pi
->change_attr
= m
->get_change_attr();
3160 if (dirty
& (CEPH_CAP_FILE_EXCL
|CEPH_CAP_FILE_WR
)) {
3161 utime_t atime
= m
->get_atime();
3162 utime_t mtime
= m
->get_mtime();
3163 uint64_t size
= m
->get_size();
3164 version_t inline_version
= m
->inline_version
;
3166 if (((dirty
& CEPH_CAP_FILE_WR
) && mtime
> pi
->mtime
) ||
3167 ((dirty
& CEPH_CAP_FILE_EXCL
) && mtime
!= pi
->mtime
)) {
3168 dout(7) << " mtime " << pi
->mtime
<< " -> " << mtime
3169 << " for " << *in
<< dendl
;
3172 if (in
->inode
.is_file() && // ONLY if regular file
3174 dout(7) << " size " << pi
->size
<< " -> " << size
3175 << " for " << *in
<< dendl
;
3177 pi
->rstat
.rbytes
= size
;
3179 if (in
->inode
.is_file() &&
3180 (dirty
& CEPH_CAP_FILE_WR
) &&
3181 inline_version
> pi
->inline_data
.version
) {
3182 pi
->inline_data
.version
= inline_version
;
3183 if (inline_version
!= CEPH_INLINE_NONE
&& m
->inline_data
.length() > 0)
3184 pi
->inline_data
.get_data() = m
->inline_data
;
3186 pi
->inline_data
.free_data();
3188 if ((dirty
& CEPH_CAP_FILE_EXCL
) && atime
!= pi
->atime
) {
3189 dout(7) << " atime " << pi
->atime
<< " -> " << atime
3190 << " for " << *in
<< dendl
;
3193 if ((dirty
& CEPH_CAP_FILE_EXCL
) &&
3194 ceph_seq_cmp(pi
->time_warp_seq
, m
->get_time_warp_seq()) < 0) {
3195 dout(7) << " time_warp_seq " << pi
->time_warp_seq
<< " -> " << m
->get_time_warp_seq()
3196 << " for " << *in
<< dendl
;
3197 pi
->time_warp_seq
= m
->get_time_warp_seq();
3201 if (dirty
& CEPH_CAP_AUTH_EXCL
) {
3202 if (m
->head
.uid
!= pi
->uid
) {
3203 dout(7) << " uid " << pi
->uid
3204 << " -> " << m
->head
.uid
3205 << " for " << *in
<< dendl
;
3206 pi
->uid
= m
->head
.uid
;
3208 if (m
->head
.gid
!= pi
->gid
) {
3209 dout(7) << " gid " << pi
->gid
3210 << " -> " << m
->head
.gid
3211 << " for " << *in
<< dendl
;
3212 pi
->gid
= m
->head
.gid
;
3214 if (m
->head
.mode
!= pi
->mode
) {
3215 dout(7) << " mode " << oct
<< pi
->mode
3216 << " -> " << m
->head
.mode
<< dec
3217 << " for " << *in
<< dendl
;
3218 pi
->mode
= m
->head
.mode
;
3220 if ((features
& CEPH_FEATURE_FS_BTIME
) && m
->get_btime() != pi
->btime
) {
3221 dout(7) << " btime " << oct
<< pi
->btime
3222 << " -> " << m
->get_btime() << dec
3223 << " for " << *in
<< dendl
;
3224 pi
->btime
= m
->get_btime();
3230 * update inode based on cap flush|flushsnap|wanted.
3231 * adjust max_size, if needed.
3232 * if we update, return true; otherwise, false (no updated needed).
3234 bool Locker::_do_cap_update(CInode
*in
, Capability
*cap
,
3235 int dirty
, snapid_t follows
,
3236 MClientCaps
*m
, MClientCaps
*ack
,
3239 dout(10) << "_do_cap_update dirty " << ccap_string(dirty
)
3240 << " issued " << ccap_string(cap
? cap
->issued() : 0)
3241 << " wanted " << ccap_string(cap
? cap
->wanted() : 0)
3242 << " on " << *in
<< dendl
;
3243 assert(in
->is_auth());
3244 client_t client
= m
->get_source().num();
3245 CInode::mempool_inode
*latest
= in
->get_projected_inode();
3247 // increase or zero max_size?
3248 uint64_t size
= m
->get_size();
3249 bool change_max
= false;
3250 uint64_t old_max
= latest
->client_ranges
.count(client
) ? latest
->client_ranges
[client
].range
.last
: 0;
3251 uint64_t new_max
= old_max
;
3253 if (in
->is_file()) {
3254 bool forced_change_max
= false;
3255 dout(20) << "inode is file" << dendl
;
3256 if (cap
&& ((cap
->issued() | cap
->wanted()) & CEPH_CAP_ANY_FILE_WR
)) {
3257 dout(20) << "client has write caps; m->get_max_size="
3258 << m
->get_max_size() << "; old_max=" << old_max
<< dendl
;
3259 if (m
->get_max_size() > new_max
) {
3260 dout(10) << "client requests file_max " << m
->get_max_size()
3261 << " > max " << old_max
<< dendl
;
3263 forced_change_max
= true;
3264 new_max
= calc_new_max_size(latest
, m
->get_max_size());
3266 new_max
= calc_new_max_size(latest
, size
);
3268 if (new_max
> old_max
)
3280 if (in
->last
== CEPH_NOSNAP
&&
3282 !in
->filelock
.can_wrlock(client
) &&
3283 !in
->filelock
.can_force_wrlock(client
)) {
3284 dout(10) << " i want to change file_max, but lock won't allow it (yet)" << dendl
;
3285 if (in
->filelock
.is_stable()) {
3286 bool need_issue
= false;
3288 cap
->inc_suppress();
3289 if (in
->mds_caps_wanted
.empty() &&
3290 (in
->get_loner() >= 0 || (in
->get_wanted_loner() >= 0 && in
->try_set_loner()))) {
3291 if (in
->filelock
.get_state() != LOCK_EXCL
)
3292 file_excl(&in
->filelock
, &need_issue
);
3294 simple_lock(&in
->filelock
, &need_issue
);
3298 cap
->dec_suppress();
3300 if (!in
->filelock
.can_wrlock(client
) &&
3301 !in
->filelock
.can_force_wrlock(client
)) {
3302 C_MDL_CheckMaxSize
*cms
= new C_MDL_CheckMaxSize(this, in
,
3303 forced_change_max
? new_max
: 0,
3306 in
->filelock
.add_waiter(SimpleLock::WAIT_STABLE
, cms
);
3312 if (m
->flockbl
.length()) {
3314 bufferlist::iterator bli
= m
->flockbl
.begin();
3315 ::decode(num_locks
, bli
);
3316 for ( int i
=0; i
< num_locks
; ++i
) {
3317 ceph_filelock decoded_lock
;
3318 ::decode(decoded_lock
, bli
);
3319 in
->get_fcntl_lock_state()->held_locks
.
3320 insert(pair
<uint64_t, ceph_filelock
>(decoded_lock
.start
, decoded_lock
));
3321 ++in
->get_fcntl_lock_state()->client_held_lock_counts
[(client_t
)(decoded_lock
.client
)];
3323 ::decode(num_locks
, bli
);
3324 for ( int i
=0; i
< num_locks
; ++i
) {
3325 ceph_filelock decoded_lock
;
3326 ::decode(decoded_lock
, bli
);
3327 in
->get_flock_lock_state()->held_locks
.
3328 insert(pair
<uint64_t, ceph_filelock
>(decoded_lock
.start
, decoded_lock
));
3329 ++in
->get_flock_lock_state()->client_held_lock_counts
[(client_t
)(decoded_lock
.client
)];
3333 if (!dirty
&& !change_max
)
3336 Session
*session
= mds
->get_session(m
);
3337 if (session
->check_access(in
, MAY_WRITE
,
3338 m
->caller_uid
, m
->caller_gid
, NULL
, 0, 0) < 0) {
3339 dout(10) << "check_access failed, dropping cap update on " << *in
<< dendl
;
3344 EUpdate
*le
= new EUpdate(mds
->mdlog
, "cap update");
3345 mds
->mdlog
->start_entry(le
);
3347 bool xattr
= (dirty
& CEPH_CAP_XATTR_EXCL
) &&
3348 m
->xattrbl
.length() &&
3349 m
->head
.xattr_version
> in
->get_projected_inode()->xattr_version
;
3351 auto &pi
= in
->project_inode(xattr
);
3352 pi
.inode
.version
= in
->pre_dirty();
3354 MutationRef
mut(new MutationImpl());
3355 mut
->ls
= mds
->mdlog
->get_current_segment();
3357 _update_cap_fields(in
, dirty
, m
, &pi
.inode
);
3360 dout(7) << " max_size " << old_max
<< " -> " << new_max
3361 << " for " << *in
<< dendl
;
3363 auto &cr
= pi
.inode
.client_ranges
[client
];
3365 cr
.range
.last
= new_max
;
3366 cr
.follows
= in
->first
- 1;
3368 pi
.inode
.client_ranges
.erase(client
);
3371 if (change_max
|| (dirty
& (CEPH_CAP_FILE_EXCL
|CEPH_CAP_FILE_WR
)))
3372 wrlock_force(&in
->filelock
, mut
); // wrlock for duration of journal
3375 if (dirty
& CEPH_CAP_AUTH_EXCL
)
3376 wrlock_force(&in
->authlock
, mut
);
3380 dout(7) << " xattrs v" << pi
.inode
.xattr_version
<< " -> " << m
->head
.xattr_version
<< dendl
;
3381 pi
.inode
.xattr_version
= m
->head
.xattr_version
;
3382 bufferlist::iterator p
= m
->xattrbl
.begin();
3383 ::decode(*pi
.xattrs
, p
);
3384 wrlock_force(&in
->xattrlock
, mut
);
3388 mdcache
->predirty_journal_parents(mut
, &le
->metablob
, in
, 0, PREDIRTY_PRIMARY
, 0, follows
);
3389 mdcache
->journal_dirty_inode(mut
.get(), &le
->metablob
, in
, follows
);
3391 // "oldest flush tid" > 0 means client uses unique TID for each flush
3392 if (ack
&& ack
->get_oldest_flush_tid() > 0)
3393 le
->metablob
.add_client_flush(metareqid_t(m
->get_source(), ack
->get_client_tid()),
3394 ack
->get_oldest_flush_tid());
3396 mds
->mdlog
->submit_entry(le
, new C_Locker_FileUpdate_finish(this, in
, mut
,
3399 if (need_flush
&& !*need_flush
&&
3400 ((change_max
&& new_max
) || // max INCREASE
3401 _need_flush_mdlog(in
, dirty
)))
3407 /* This function DOES put the passed message before returning */
3408 void Locker::handle_client_cap_release(MClientCapRelease
*m
)
3410 client_t client
= m
->get_source().num();
3411 dout(10) << "handle_client_cap_release " << *m
<< dendl
;
3413 if (!mds
->is_clientreplay() && !mds
->is_active() && !mds
->is_stopping()) {
3414 mds
->wait_for_replay(new C_MDS_RetryMessage(mds
, m
));
3418 if (m
->osd_epoch_barrier
&& !mds
->objecter
->have_map(m
->osd_epoch_barrier
)) {
3419 // Pause RADOS operations until we see the required epoch
3420 mds
->objecter
->set_epoch_barrier(m
->osd_epoch_barrier
);
3423 if (mds
->get_osd_epoch_barrier() < m
->osd_epoch_barrier
) {
3424 // Record the barrier so that we will retransmit it to clients
3425 mds
->set_osd_epoch_barrier(m
->osd_epoch_barrier
);
3428 Session
*session
= mds
->get_session(m
);
3430 for (vector
<ceph_mds_cap_item
>::iterator p
= m
->caps
.begin(); p
!= m
->caps
.end(); ++p
) {
3431 _do_cap_release(client
, inodeno_t((uint64_t)p
->ino
) , p
->cap_id
, p
->migrate_seq
, p
->seq
);
3435 session
->notify_cap_release(m
->caps
.size());
3441 class C_Locker_RetryCapRelease
: public LockerContext
{
3445 ceph_seq_t migrate_seq
;
3446 ceph_seq_t issue_seq
;
3448 C_Locker_RetryCapRelease(Locker
*l
, client_t c
, inodeno_t i
, uint64_t id
,
3449 ceph_seq_t mseq
, ceph_seq_t seq
) :
3450 LockerContext(l
), client(c
), ino(i
), cap_id(id
), migrate_seq(mseq
), issue_seq(seq
) {}
3451 void finish(int r
) override
{
3452 locker
->_do_cap_release(client
, ino
, cap_id
, migrate_seq
, issue_seq
);
3456 void Locker::_do_cap_release(client_t client
, inodeno_t ino
, uint64_t cap_id
,
3457 ceph_seq_t mseq
, ceph_seq_t seq
)
3459 CInode
*in
= mdcache
->get_inode(ino
);
3461 dout(7) << "_do_cap_release missing ino " << ino
<< dendl
;
3464 Capability
*cap
= in
->get_client_cap(client
);
3466 dout(7) << "_do_cap_release no cap for client" << client
<< " on "<< *in
<< dendl
;
3470 dout(7) << "_do_cap_release for client." << client
<< " on "<< *in
<< dendl
;
3471 if (cap
->get_cap_id() != cap_id
) {
3472 dout(7) << " capid " << cap_id
<< " != " << cap
->get_cap_id() << ", ignore" << dendl
;
3475 if (ceph_seq_cmp(mseq
, cap
->get_mseq()) < 0) {
3476 dout(7) << " mseq " << mseq
<< " < " << cap
->get_mseq() << ", ignore" << dendl
;
3479 if (should_defer_client_cap_frozen(in
)) {
3480 dout(7) << " freezing|frozen, deferring" << dendl
;
3481 in
->add_waiter(CInode::WAIT_UNFREEZE
,
3482 new C_Locker_RetryCapRelease(this, client
, ino
, cap_id
, mseq
, seq
));
3485 if (seq
!= cap
->get_last_issue()) {
3486 dout(7) << " issue_seq " << seq
<< " != " << cap
->get_last_issue() << dendl
;
3487 // clean out any old revoke history
3488 cap
->clean_revoke_from(seq
);
3489 eval_cap_gather(in
);
3492 remove_client_cap(in
, client
);
3495 /* This function DOES put the passed message before returning */
3497 void Locker::remove_client_cap(CInode
*in
, client_t client
)
3499 // clean out any pending snapflush state
3500 if (!in
->client_need_snapflush
.empty())
3501 _do_null_snapflush(in
, client
);
3503 in
->remove_client_cap(client
);
3505 if (in
->is_auth()) {
3506 // make sure we clear out the client byte range
3507 if (in
->get_projected_inode()->client_ranges
.count(client
) &&
3508 !(in
->inode
.nlink
== 0 && !in
->is_any_caps())) // unless it's unlink + stray
3509 check_inode_max_size(in
);
3511 request_inode_file_caps(in
);
3514 try_eval(in
, CEPH_CAP_LOCKS
);
3519 * Return true if any currently revoking caps exceed the
3520 * mds_session_timeout threshold.
3522 bool Locker::any_late_revoking_caps(xlist
<Capability
*> const &revoking
) const
3524 xlist
<Capability
*>::const_iterator p
= revoking
.begin();
3526 // No revoking caps at the moment
3529 utime_t now
= ceph_clock_now();
3530 utime_t age
= now
- (*p
)->get_last_revoke_stamp();
3531 if (age
<= g_conf
->mds_session_timeout
) {
3540 void Locker::get_late_revoking_clients(std::list
<client_t
> *result
) const
3542 if (!any_late_revoking_caps(revoking_caps
)) {
3543 // Fast path: no misbehaving clients, execute in O(1)
3547 // Slow path: execute in O(N_clients)
3548 std::map
<client_t
, xlist
<Capability
*> >::const_iterator client_rc_iter
;
3549 for (client_rc_iter
= revoking_caps_by_client
.begin();
3550 client_rc_iter
!= revoking_caps_by_client
.end(); ++client_rc_iter
) {
3551 xlist
<Capability
*> const &client_rc
= client_rc_iter
->second
;
3552 bool any_late
= any_late_revoking_caps(client_rc
);
3554 result
->push_back(client_rc_iter
->first
);
3559 // Hard-code instead of surfacing a config settings because this is
3560 // really a hack that should go away at some point when we have better
3561 // inspection tools for getting at detailed cap state (#7316)
3562 #define MAX_WARN_CAPS 100
3564 void Locker::caps_tick()
3566 utime_t now
= ceph_clock_now();
3568 dout(20) << __func__
<< " " << revoking_caps
.size() << " revoking caps" << dendl
;
3571 for (xlist
<Capability
*>::iterator p
= revoking_caps
.begin(); !p
.end(); ++p
) {
3572 Capability
*cap
= *p
;
3574 utime_t age
= now
- cap
->get_last_revoke_stamp();
3575 dout(20) << __func__
<< " age = " << age
<< cap
->get_client() << "." << cap
->get_inode()->ino() << dendl
;
3576 if (age
<= g_conf
->mds_session_timeout
) {
3577 dout(20) << __func__
<< " age below timeout " << g_conf
->mds_session_timeout
<< dendl
;
3581 if (i
> MAX_WARN_CAPS
) {
3582 dout(1) << __func__
<< " more than " << MAX_WARN_CAPS
<< " caps are late"
3583 << "revoking, ignoring subsequent caps" << dendl
;
3587 // exponential backoff of warning intervals
3588 if (age
> g_conf
->mds_session_timeout
* (1 << cap
->get_num_revoke_warnings())) {
3589 cap
->inc_num_revoke_warnings();
3591 ss
<< "client." << cap
->get_client() << " isn't responding to mclientcaps(revoke), ino "
3592 << cap
->get_inode()->ino() << " pending " << ccap_string(cap
->pending())
3593 << " issued " << ccap_string(cap
->issued()) << ", sent " << age
<< " seconds ago";
3594 mds
->clog
->warn() << ss
.str();
3595 dout(20) << __func__
<< " " << ss
.str() << dendl
;
3597 dout(20) << __func__
<< " silencing log message (backoff) for " << cap
->get_client() << "." << cap
->get_inode()->ino() << dendl
;
3603 void Locker::handle_client_lease(MClientLease
*m
)
3605 dout(10) << "handle_client_lease " << *m
<< dendl
;
3607 assert(m
->get_source().is_client());
3608 client_t client
= m
->get_source().num();
3610 CInode
*in
= mdcache
->get_inode(m
->get_ino(), m
->get_last());
3612 dout(7) << "handle_client_lease don't have ino " << m
->get_ino() << "." << m
->get_last() << dendl
;
3618 frag_t fg
= in
->pick_dirfrag(m
->dname
);
3619 CDir
*dir
= in
->get_dirfrag(fg
);
3621 dn
= dir
->lookup(m
->dname
);
3623 dout(7) << "handle_client_lease don't have dn " << m
->get_ino() << " " << m
->dname
<< dendl
;
3627 dout(10) << " on " << *dn
<< dendl
;
3630 ClientLease
*l
= dn
->get_client_lease(client
);
3632 dout(7) << "handle_client_lease didn't have lease for client." << client
<< " of " << *dn
<< dendl
;
3637 switch (m
->get_action()) {
3638 case CEPH_MDS_LEASE_REVOKE_ACK
:
3639 case CEPH_MDS_LEASE_RELEASE
:
3640 if (l
->seq
!= m
->get_seq()) {
3641 dout(7) << "handle_client_lease release - seq " << l
->seq
<< " != provided " << m
->get_seq() << dendl
;
3643 dout(7) << "handle_client_lease client." << client
3644 << " on " << *dn
<< dendl
;
3645 dn
->remove_client_lease(l
, this);
3650 case CEPH_MDS_LEASE_RENEW
:
3652 dout(7) << "handle_client_lease client." << client
<< " renew on " << *dn
3653 << (!dn
->lock
.can_lease(client
)?", revoking lease":"") << dendl
;
3654 if (dn
->lock
.can_lease(client
)) {
3655 int pool
= 1; // fixme.. do something smart!
3656 m
->h
.duration_ms
= (int)(1000 * mdcache
->client_lease_durations
[pool
]);
3657 m
->h
.seq
= ++l
->seq
;
3660 utime_t now
= ceph_clock_now();
3661 now
+= mdcache
->client_lease_durations
[pool
];
3662 mdcache
->touch_client_lease(l
, pool
, now
);
3664 mds
->send_message_client_counted(m
, m
->get_connection());
3670 ceph_abort(); // implement me
3676 void Locker::issue_client_lease(CDentry
*dn
, client_t client
,
3677 bufferlist
&bl
, utime_t now
, Session
*session
)
3679 CInode
*diri
= dn
->get_dir()->get_inode();
3680 if (!diri
->is_stray() && // do not issue dn leases in stray dir!
3681 ((!diri
->filelock
.can_lease(client
) &&
3682 (diri
->get_client_cap_pending(client
) & (CEPH_CAP_FILE_SHARED
| CEPH_CAP_FILE_EXCL
)) == 0)) &&
3683 dn
->lock
.can_lease(client
)) {
3684 int pool
= 1; // fixme.. do something smart!
3685 // issue a dentry lease
3686 ClientLease
*l
= dn
->add_client_lease(client
, session
);
3687 session
->touch_lease(l
);
3689 now
+= mdcache
->client_lease_durations
[pool
];
3690 mdcache
->touch_client_lease(l
, pool
, now
);
3693 e
.mask
= 1 | CEPH_LOCK_DN
; // old and new bit values
3695 e
.duration_ms
= (int)(1000 * mdcache
->client_lease_durations
[pool
]);
3697 dout(20) << "issue_client_lease seq " << e
.seq
<< " dur " << e
.duration_ms
<< "ms "
3698 << " on " << *dn
<< dendl
;
3706 dout(20) << "issue_client_lease no/null lease on " << *dn
<< dendl
;
3711 void Locker::revoke_client_leases(SimpleLock
*lock
)
3714 CDentry
*dn
= static_cast<CDentry
*>(lock
->get_parent());
3715 for (map
<client_t
, ClientLease
*>::iterator p
= dn
->client_lease_map
.begin();
3716 p
!= dn
->client_lease_map
.end();
3718 ClientLease
*l
= p
->second
;
3721 assert(lock
->get_type() == CEPH_LOCK_DN
);
3723 CDentry
*dn
= static_cast<CDentry
*>(lock
->get_parent());
3724 int mask
= 1 | CEPH_LOCK_DN
; // old and new bits
3726 // i should also revoke the dir ICONTENT lease, if they have it!
3727 CInode
*diri
= dn
->get_dir()->get_inode();
3728 mds
->send_message_client_counted(new MClientLease(CEPH_MDS_LEASE_REVOKE
, l
->seq
,
3731 diri
->first
, CEPH_NOSNAP
,
3739 // locks ----------------------------------------------------------------
3741 SimpleLock
*Locker::get_lock(int lock_type
, MDSCacheObjectInfo
&info
)
3743 switch (lock_type
) {
3746 // be careful; info.dirfrag may have incorrect frag; recalculate based on dname.
3747 CInode
*diri
= mdcache
->get_inode(info
.dirfrag
.ino
);
3752 fg
= diri
->pick_dirfrag(info
.dname
);
3753 dir
= diri
->get_dirfrag(fg
);
3755 dn
= dir
->lookup(info
.dname
, info
.snapid
);
3758 dout(7) << "get_lock don't have dn " << info
.dirfrag
.ino
<< " " << info
.dname
<< dendl
;
3764 case CEPH_LOCK_IAUTH
:
3765 case CEPH_LOCK_ILINK
:
3766 case CEPH_LOCK_IDFT
:
3767 case CEPH_LOCK_IFILE
:
3768 case CEPH_LOCK_INEST
:
3769 case CEPH_LOCK_IXATTR
:
3770 case CEPH_LOCK_ISNAP
:
3771 case CEPH_LOCK_IFLOCK
:
3772 case CEPH_LOCK_IPOLICY
:
3774 CInode
*in
= mdcache
->get_inode(info
.ino
, info
.snapid
);
3776 dout(7) << "get_lock don't have ino " << info
.ino
<< dendl
;
3779 switch (lock_type
) {
3780 case CEPH_LOCK_IAUTH
: return &in
->authlock
;
3781 case CEPH_LOCK_ILINK
: return &in
->linklock
;
3782 case CEPH_LOCK_IDFT
: return &in
->dirfragtreelock
;
3783 case CEPH_LOCK_IFILE
: return &in
->filelock
;
3784 case CEPH_LOCK_INEST
: return &in
->nestlock
;
3785 case CEPH_LOCK_IXATTR
: return &in
->xattrlock
;
3786 case CEPH_LOCK_ISNAP
: return &in
->snaplock
;
3787 case CEPH_LOCK_IFLOCK
: return &in
->flocklock
;
3788 case CEPH_LOCK_IPOLICY
: return &in
->policylock
;
3793 dout(7) << "get_lock don't know lock_type " << lock_type
<< dendl
;
3801 /* This function DOES put the passed message before returning */
3802 void Locker::handle_lock(MLock
*m
)
3804 // nobody should be talking to us during recovery.
3805 assert(mds
->is_rejoin() || mds
->is_clientreplay() || mds
->is_active() || mds
->is_stopping());
3807 SimpleLock
*lock
= get_lock(m
->get_lock_type(), m
->get_object_info());
3809 dout(10) << "don't have object " << m
->get_object_info() << ", must have trimmed, dropping" << dendl
;
3814 switch (lock
->get_type()) {
3816 case CEPH_LOCK_IAUTH
:
3817 case CEPH_LOCK_ILINK
:
3818 case CEPH_LOCK_ISNAP
:
3819 case CEPH_LOCK_IXATTR
:
3820 case CEPH_LOCK_IFLOCK
:
3821 case CEPH_LOCK_IPOLICY
:
3822 handle_simple_lock(lock
, m
);
3825 case CEPH_LOCK_IDFT
:
3826 case CEPH_LOCK_INEST
:
3827 //handle_scatter_lock((ScatterLock*)lock, m);
3830 case CEPH_LOCK_IFILE
:
3831 handle_file_lock(static_cast<ScatterLock
*>(lock
), m
);
3835 dout(7) << "handle_lock got otype " << m
->get_lock_type() << dendl
;
3845 // ==========================================================================
3848 /** This function may take a reference to m if it needs one, but does
3849 * not put references. */
3850 void Locker::handle_reqrdlock(SimpleLock
*lock
, MLock
*m
)
3852 MDSCacheObject
*parent
= lock
->get_parent();
3853 if (parent
->is_auth() &&
3854 lock
->get_state() != LOCK_SYNC
&&
3855 !parent
->is_frozen()) {
3856 dout(7) << "handle_reqrdlock got rdlock request on " << *lock
3857 << " on " << *parent
<< dendl
;
3858 assert(parent
->is_auth()); // replica auth pinned if they're doing this!
3859 if (lock
->is_stable()) {
3862 dout(7) << "handle_reqrdlock delaying request until lock is stable" << dendl
;
3863 lock
->add_waiter(SimpleLock::WAIT_STABLE
| MDSCacheObject::WAIT_UNFREEZE
,
3864 new C_MDS_RetryMessage(mds
, m
->get()));
3867 dout(7) << "handle_reqrdlock dropping rdlock request on " << *lock
3868 << " on " << *parent
<< dendl
;
3869 // replica should retry
3873 /* This function DOES put the passed message before returning */
3874 void Locker::handle_simple_lock(SimpleLock
*lock
, MLock
*m
)
3876 int from
= m
->get_asker();
3878 dout(10) << "handle_simple_lock " << *m
3879 << " on " << *lock
<< " " << *lock
->get_parent() << dendl
;
3881 if (mds
->is_rejoin()) {
3882 if (lock
->get_parent()->is_rejoining()) {
3883 dout(7) << "handle_simple_lock still rejoining " << *lock
->get_parent()
3884 << ", dropping " << *m
<< dendl
;
3890 switch (m
->get_action()) {
3893 assert(lock
->get_state() == LOCK_LOCK
);
3894 lock
->decode_locked_state(m
->get_data());
3895 lock
->set_state(LOCK_SYNC
);
3896 lock
->finish_waiters(SimpleLock::WAIT_RD
|SimpleLock::WAIT_STABLE
);
3900 assert(lock
->get_state() == LOCK_SYNC
);
3901 lock
->set_state(LOCK_SYNC_LOCK
);
3902 if (lock
->is_leased())
3903 revoke_client_leases(lock
);
3904 eval_gather(lock
, true);
3905 if (lock
->is_unstable_and_locked())
3906 mds
->mdlog
->flush();
3911 case LOCK_AC_LOCKACK
:
3912 assert(lock
->get_state() == LOCK_SYNC_LOCK
||
3913 lock
->get_state() == LOCK_SYNC_EXCL
);
3914 assert(lock
->is_gathering(from
));
3915 lock
->remove_gather(from
);
3917 if (lock
->is_gathering()) {
3918 dout(7) << "handle_simple_lock " << *lock
<< " on " << *lock
->get_parent() << " from " << from
3919 << ", still gathering " << lock
->get_gather_set() << dendl
;
3921 dout(7) << "handle_simple_lock " << *lock
<< " on " << *lock
->get_parent() << " from " << from
3922 << ", last one" << dendl
;
3927 case LOCK_AC_REQRDLOCK
:
3928 handle_reqrdlock(lock
, m
);
3936 /* unused, currently.
3938 class C_Locker_SimpleEval : public Context {
3942 C_Locker_SimpleEval(Locker *l, SimpleLock *lk) : locker(l), lock(lk) {}
3943 void finish(int r) {
3944 locker->try_simple_eval(lock);
3948 void Locker::try_simple_eval(SimpleLock *lock)
3950 // unstable and ambiguous auth?
3951 if (!lock->is_stable() &&
3952 lock->get_parent()->is_ambiguous_auth()) {
3953 dout(7) << "simple_eval not stable and ambiguous auth, waiting on " << *lock->get_parent() << dendl;
3954 //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
3955 lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_SimpleEval(this, lock));
3959 if (!lock->get_parent()->is_auth()) {
3960 dout(7) << "try_simple_eval not auth for " << *lock->get_parent() << dendl;
3964 if (!lock->get_parent()->can_auth_pin()) {
3965 dout(7) << "try_simple_eval can't auth_pin, waiting on " << *lock->get_parent() << dendl;
3966 //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
3967 lock->get_parent()->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_SimpleEval(this, lock));
3971 if (lock->is_stable())
3977 void Locker::simple_eval(SimpleLock
*lock
, bool *need_issue
)
3979 dout(10) << "simple_eval " << *lock
<< " on " << *lock
->get_parent() << dendl
;
3981 assert(lock
->get_parent()->is_auth());
3982 assert(lock
->is_stable());
3984 if (lock
->get_parent()->is_freezing_or_frozen()) {
3985 // dentry lock in unreadable state can block path traverse
3986 if ((lock
->get_type() != CEPH_LOCK_DN
||
3987 lock
->get_state() == LOCK_SYNC
||
3988 lock
->get_parent()->is_frozen()))
3992 if (mdcache
->is_readonly()) {
3993 if (lock
->get_state() != LOCK_SYNC
) {
3994 dout(10) << "simple_eval read-only FS, syncing " << *lock
<< " on " << *lock
->get_parent() << dendl
;
3995 simple_sync(lock
, need_issue
);
4002 if (lock
->get_type() != CEPH_LOCK_DN
) {
4003 in
= static_cast<CInode
*>(lock
->get_parent());
4004 in
->get_caps_wanted(&wanted
, NULL
, lock
->get_cap_shift());
4008 if (lock
->get_state() != LOCK_EXCL
&&
4009 in
&& in
->get_target_loner() >= 0 &&
4010 (wanted
& CEPH_CAP_GEXCL
)) {
4011 dout(7) << "simple_eval stable, going to excl " << *lock
4012 << " on " << *lock
->get_parent() << dendl
;
4013 simple_excl(lock
, need_issue
);
4017 else if (lock
->get_state() != LOCK_SYNC
&&
4018 !lock
->is_wrlocked() &&
4019 ((!(wanted
& CEPH_CAP_GEXCL
) && !lock
->is_waiter_for(SimpleLock::WAIT_WR
)) ||
4020 (lock
->get_state() == LOCK_EXCL
&& in
&& in
->get_target_loner() < 0))) {
4021 dout(7) << "simple_eval stable, syncing " << *lock
4022 << " on " << *lock
->get_parent() << dendl
;
4023 simple_sync(lock
, need_issue
);
4030 bool Locker::simple_sync(SimpleLock
*lock
, bool *need_issue
)
4032 dout(7) << "simple_sync on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4033 assert(lock
->get_parent()->is_auth());
4034 assert(lock
->is_stable());
4037 if (lock
->get_cap_shift())
4038 in
= static_cast<CInode
*>(lock
->get_parent());
4040 int old_state
= lock
->get_state();
4042 if (old_state
!= LOCK_TSYN
) {
4044 switch (lock
->get_state()) {
4045 case LOCK_MIX
: lock
->set_state(LOCK_MIX_SYNC
); break;
4046 case LOCK_LOCK
: lock
->set_state(LOCK_LOCK_SYNC
); break;
4047 case LOCK_XSYN
: lock
->set_state(LOCK_XSYN_SYNC
); break;
4048 case LOCK_EXCL
: lock
->set_state(LOCK_EXCL_SYNC
); break;
4049 default: ceph_abort();
4053 if (lock
->is_wrlocked())
4056 if (lock
->get_parent()->is_replicated() && old_state
== LOCK_MIX
) {
4057 send_lock_message(lock
, LOCK_AC_SYNC
);
4058 lock
->init_gather();
4062 if (in
&& in
->is_head()) {
4063 if (in
->issued_caps_need_gather(lock
)) {
4072 bool need_recover
= false;
4073 if (lock
->get_type() == CEPH_LOCK_IFILE
) {
4075 if (in
->state_test(CInode::STATE_NEEDSRECOVER
)) {
4076 mds
->mdcache
->queue_file_recover(in
);
4077 need_recover
= true;
4082 if (!gather
&& lock
->is_dirty()) {
4083 lock
->get_parent()->auth_pin(lock
);
4084 scatter_writebehind(static_cast<ScatterLock
*>(lock
));
4085 mds
->mdlog
->flush();
4090 lock
->get_parent()->auth_pin(lock
);
4092 mds
->mdcache
->do_file_recover();
4097 if (lock
->get_parent()->is_replicated()) { // FIXME
4099 lock
->encode_locked_state(data
);
4100 send_lock_message(lock
, LOCK_AC_SYNC
, data
);
4102 lock
->set_state(LOCK_SYNC
);
4103 lock
->finish_waiters(SimpleLock::WAIT_RD
|SimpleLock::WAIT_STABLE
);
4104 if (in
&& in
->is_head()) {
4113 void Locker::simple_excl(SimpleLock
*lock
, bool *need_issue
)
4115 dout(7) << "simple_excl on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4116 assert(lock
->get_parent()->is_auth());
4117 assert(lock
->is_stable());
4120 if (lock
->get_cap_shift())
4121 in
= static_cast<CInode
*>(lock
->get_parent());
4123 switch (lock
->get_state()) {
4124 case LOCK_LOCK
: lock
->set_state(LOCK_LOCK_EXCL
); break;
4125 case LOCK_SYNC
: lock
->set_state(LOCK_SYNC_EXCL
); break;
4126 case LOCK_XSYN
: lock
->set_state(LOCK_XSYN_EXCL
); break;
4127 default: ceph_abort();
4131 if (lock
->is_rdlocked())
4133 if (lock
->is_wrlocked())
4136 if (lock
->get_parent()->is_replicated() &&
4137 lock
->get_state() != LOCK_LOCK_EXCL
&&
4138 lock
->get_state() != LOCK_XSYN_EXCL
) {
4139 send_lock_message(lock
, LOCK_AC_LOCK
);
4140 lock
->init_gather();
4144 if (in
&& in
->is_head()) {
4145 if (in
->issued_caps_need_gather(lock
)) {
4155 lock
->get_parent()->auth_pin(lock
);
4157 lock
->set_state(LOCK_EXCL
);
4158 lock
->finish_waiters(SimpleLock::WAIT_WR
|SimpleLock::WAIT_STABLE
);
4168 void Locker::simple_lock(SimpleLock
*lock
, bool *need_issue
)
4170 dout(7) << "simple_lock on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4171 assert(lock
->get_parent()->is_auth());
4172 assert(lock
->is_stable());
4173 assert(lock
->get_state() != LOCK_LOCK
);
4176 if (lock
->get_cap_shift())
4177 in
= static_cast<CInode
*>(lock
->get_parent());
4179 int old_state
= lock
->get_state();
4181 switch (lock
->get_state()) {
4182 case LOCK_SYNC
: lock
->set_state(LOCK_SYNC_LOCK
); break;
4183 case LOCK_XSYN
: lock
->set_state(LOCK_XSYN_LOCK
); break;
4184 case LOCK_EXCL
: lock
->set_state(LOCK_EXCL_LOCK
); break;
4185 case LOCK_MIX
: lock
->set_state(LOCK_MIX_LOCK
);
4186 (static_cast<ScatterLock
*>(lock
))->clear_unscatter_wanted();
4188 case LOCK_TSYN
: lock
->set_state(LOCK_TSYN_LOCK
); break;
4189 default: ceph_abort();
4193 if (lock
->is_leased()) {
4195 revoke_client_leases(lock
);
4197 if (lock
->is_rdlocked())
4199 if (in
&& in
->is_head()) {
4200 if (in
->issued_caps_need_gather(lock
)) {
4209 bool need_recover
= false;
4210 if (lock
->get_type() == CEPH_LOCK_IFILE
) {
4212 if(in
->state_test(CInode::STATE_NEEDSRECOVER
)) {
4213 mds
->mdcache
->queue_file_recover(in
);
4214 need_recover
= true;
4219 if (lock
->get_parent()->is_replicated() &&
4220 lock
->get_state() == LOCK_MIX_LOCK
&&
4222 dout(10) << " doing local stage of mix->lock gather before gathering from replicas" << dendl
;
4224 // move to second stage of gather now, so we don't send the lock action later.
4225 if (lock
->get_state() == LOCK_MIX_LOCK
)
4226 lock
->set_state(LOCK_MIX_LOCK2
);
4228 if (lock
->get_parent()->is_replicated() &&
4229 lock
->get_sm()->states
[old_state
].replica_state
!= LOCK_LOCK
) { // replica may already be LOCK
4231 send_lock_message(lock
, LOCK_AC_LOCK
);
4232 lock
->init_gather();
4236 if (!gather
&& lock
->is_dirty()) {
4237 lock
->get_parent()->auth_pin(lock
);
4238 scatter_writebehind(static_cast<ScatterLock
*>(lock
));
4239 mds
->mdlog
->flush();
4244 lock
->get_parent()->auth_pin(lock
);
4246 mds
->mdcache
->do_file_recover();
4248 lock
->set_state(LOCK_LOCK
);
4249 lock
->finish_waiters(ScatterLock::WAIT_XLOCK
|ScatterLock::WAIT_WR
|ScatterLock::WAIT_STABLE
);
4254 void Locker::simple_xlock(SimpleLock
*lock
)
4256 dout(7) << "simple_xlock on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4257 assert(lock
->get_parent()->is_auth());
4258 //assert(lock->is_stable());
4259 assert(lock
->get_state() != LOCK_XLOCK
);
4262 if (lock
->get_cap_shift())
4263 in
= static_cast<CInode
*>(lock
->get_parent());
4265 if (lock
->is_stable())
4266 lock
->get_parent()->auth_pin(lock
);
4268 switch (lock
->get_state()) {
4270 case LOCK_XLOCKDONE
: lock
->set_state(LOCK_LOCK_XLOCK
); break;
4271 default: ceph_abort();
4275 if (lock
->is_rdlocked())
4277 if (lock
->is_wrlocked())
4280 if (in
&& in
->is_head()) {
4281 if (in
->issued_caps_need_gather(lock
)) {
4288 lock
->set_state(LOCK_PREXLOCK
);
4289 //assert("shouldn't be called if we are already xlockable" == 0);
4297 // ==========================================================================
4302 Some notes on scatterlocks.
4304 - The scatter/gather is driven by the inode lock. The scatter always
4305 brings in the latest metadata from the fragments.
4307 - When in a scattered/MIX state, fragments are only allowed to
4308 update/be written to if the accounted stat matches the inode's
4311 - That means, on gather, we _only_ assimilate diffs for frag metadata
4312 that match the current version, because those are the only ones
4313 written during this scatter/gather cycle. (Others didn't permit
4314 it.) We increment the version and journal this to disk.
4316 - When possible, we also simultaneously update our local frag
4317 accounted stats to match.
4319 - On scatter, the new inode info is broadcast to frags, both local
4320 and remote. If possible (auth and !frozen), the dirfrag auth
4321 should update the accounted state (if it isn't already up to date).
4322 Note that this may occur on both the local inode auth node and
4323 inode replicas, so there are two potential paths. If it is NOT
4324 possible, they need to mark_stale to prevent any possible writes.
4326 - A scatter can be to MIX (potentially writeable) or to SYNC (read
4327 only). Both are opportunities to update the frag accounted stats,
4328 even though only the MIX case is affected by a stale dirfrag.
4330 - Because many scatter/gather cycles can potentially go by without a
4331 frag being able to update its accounted stats (due to being frozen
4332 by exports/refragments in progress), the frag may have (even very)
4333 old stat versions. That's fine. If when we do want to update it,
4334 we can update accounted_* and the version first.
4338 class C_Locker_ScatterWB
: public LockerLogContext
{
4342 C_Locker_ScatterWB(Locker
*l
, ScatterLock
*sl
, MutationRef
& m
) :
4343 LockerLogContext(l
), lock(sl
), mut(m
) {}
4344 void finish(int r
) override
{
4345 locker
->scatter_writebehind_finish(lock
, mut
);
4349 void Locker::scatter_writebehind(ScatterLock
*lock
)
4351 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4352 dout(10) << "scatter_writebehind " << in
->inode
.mtime
<< " on " << *lock
<< " on " << *in
<< dendl
;
4355 MutationRef
mut(new MutationImpl());
4356 mut
->ls
= mds
->mdlog
->get_current_segment();
4358 // forcefully take a wrlock
4359 lock
->get_wrlock(true);
4360 mut
->wrlocks
.insert(lock
);
4361 mut
->locks
.insert(lock
);
4363 in
->pre_cow_old_inode(); // avoid cow mayhem
4365 auto &pi
= in
->project_inode();
4366 pi
.inode
.version
= in
->pre_dirty();
4368 in
->finish_scatter_gather_update(lock
->get_type());
4369 lock
->start_flush();
4371 EUpdate
*le
= new EUpdate(mds
->mdlog
, "scatter_writebehind");
4372 mds
->mdlog
->start_entry(le
);
4374 mdcache
->predirty_journal_parents(mut
, &le
->metablob
, in
, 0, PREDIRTY_PRIMARY
);
4375 mdcache
->journal_dirty_inode(mut
.get(), &le
->metablob
, in
);
4377 in
->finish_scatter_gather_update_accounted(lock
->get_type(), mut
, &le
->metablob
);
4379 mds
->mdlog
->submit_entry(le
, new C_Locker_ScatterWB(this, lock
, mut
));
4382 void Locker::scatter_writebehind_finish(ScatterLock
*lock
, MutationRef
& mut
)
4384 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4385 dout(10) << "scatter_writebehind_finish on " << *lock
<< " on " << *in
<< dendl
;
4386 in
->pop_and_dirty_projected_inode(mut
->ls
);
4388 lock
->finish_flush();
4390 // if replicas may have flushed in a mix->lock state, send another
4391 // message so they can finish_flush().
4392 if (in
->is_replicated()) {
4393 switch (lock
->get_state()) {
4395 case LOCK_MIX_LOCK2
:
4398 send_lock_message(lock
, LOCK_AC_LOCKFLUSHED
);
4403 drop_locks(mut
.get());
4406 if (lock
->is_stable())
4407 lock
->finish_waiters(ScatterLock::WAIT_STABLE
);
4409 //scatter_eval_gather(lock);
4412 void Locker::scatter_eval(ScatterLock
*lock
, bool *need_issue
)
4414 dout(10) << "scatter_eval " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4416 assert(lock
->get_parent()->is_auth());
4417 assert(lock
->is_stable());
4419 if (lock
->get_parent()->is_freezing_or_frozen()) {
4420 dout(20) << " freezing|frozen" << dendl
;
4424 if (mdcache
->is_readonly()) {
4425 if (lock
->get_state() != LOCK_SYNC
) {
4426 dout(10) << "scatter_eval read-only FS, syncing " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4427 simple_sync(lock
, need_issue
);
4432 if (!lock
->is_rdlocked() &&
4433 lock
->get_state() != LOCK_MIX
&&
4434 lock
->get_scatter_wanted()) {
4435 dout(10) << "scatter_eval scatter_wanted, bump to mix " << *lock
4436 << " on " << *lock
->get_parent() << dendl
;
4437 scatter_mix(lock
, need_issue
);
4441 if (lock
->get_type() == CEPH_LOCK_INEST
) {
4442 // in general, we want to keep INEST writable at all times.
4443 if (!lock
->is_rdlocked()) {
4444 if (lock
->get_parent()->is_replicated()) {
4445 if (lock
->get_state() != LOCK_MIX
)
4446 scatter_mix(lock
, need_issue
);
4448 if (lock
->get_state() != LOCK_LOCK
)
4449 simple_lock(lock
, need_issue
);
4455 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4456 if (!in
->has_subtree_or_exporting_dirfrag() || in
->is_base()) {
4457 // i _should_ be sync.
4458 if (!lock
->is_wrlocked() &&
4459 lock
->get_state() != LOCK_SYNC
) {
4460 dout(10) << "scatter_eval no wrlocks|xlocks, not subtree root inode, syncing" << dendl
;
4461 simple_sync(lock
, need_issue
);
4468 * mark a scatterlock to indicate that the dir fnode has some dirty data
4470 void Locker::mark_updated_scatterlock(ScatterLock
*lock
)
4473 if (lock
->get_updated_item()->is_on_list()) {
4474 dout(10) << "mark_updated_scatterlock " << *lock
4475 << " - already on list since " << lock
->get_update_stamp() << dendl
;
4477 updated_scatterlocks
.push_back(lock
->get_updated_item());
4478 utime_t now
= ceph_clock_now();
4479 lock
->set_update_stamp(now
);
4480 dout(10) << "mark_updated_scatterlock " << *lock
4481 << " - added at " << now
<< dendl
;
4486 * this is called by scatter_tick and LogSegment::try_to_trim() when
4487 * trying to flush dirty scattered data (i.e. updated fnode) back to
4490 * we need to lock|scatter in order to push fnode changes into the
4493 void Locker::scatter_nudge(ScatterLock
*lock
, MDSInternalContextBase
*c
, bool forcelockchange
)
4495 CInode
*p
= static_cast<CInode
*>(lock
->get_parent());
4497 if (p
->is_frozen() || p
->is_freezing()) {
4498 dout(10) << "scatter_nudge waiting for unfreeze on " << *p
<< dendl
;
4500 p
->add_waiter(MDSCacheObject::WAIT_UNFREEZE
, c
);
4501 else if (lock
->is_dirty())
4502 // just requeue. not ideal.. starvation prone..
4503 updated_scatterlocks
.push_back(lock
->get_updated_item());
4507 if (p
->is_ambiguous_auth()) {
4508 dout(10) << "scatter_nudge waiting for single auth on " << *p
<< dendl
;
4510 p
->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
, c
);
4511 else if (lock
->is_dirty())
4512 // just requeue. not ideal.. starvation prone..
4513 updated_scatterlocks
.push_back(lock
->get_updated_item());
4520 if (lock
->is_stable()) {
4521 // can we do it now?
4522 // (only if we're not replicated.. if we are, we really do need
4523 // to nudge the lock state!)
4525 actually, even if we're not replicated, we can't stay in MIX, because another mds
4526 could discover and replicate us at any time. if that happens while we're flushing,
4527 they end up in MIX but their inode has the old scatterstat version.
4529 if (!forcelockchange && !lock->get_parent()->is_replicated() && lock->can_wrlock(-1)) {
4530 dout(10) << "scatter_nudge auth, propagating " << *lock << " on " << *p << dendl;
4531 scatter_writebehind(lock);
4533 lock->add_waiter(SimpleLock::WAIT_STABLE, c);
4538 if (mdcache
->is_readonly()) {
4539 if (lock
->get_state() != LOCK_SYNC
) {
4540 dout(10) << "scatter_nudge auth, read-only FS, syncing " << *lock
<< " on " << *p
<< dendl
;
4541 simple_sync(static_cast<ScatterLock
*>(lock
));
4546 // adjust lock state
4547 dout(10) << "scatter_nudge auth, scatter/unscattering " << *lock
<< " on " << *p
<< dendl
;
4548 switch (lock
->get_type()) {
4549 case CEPH_LOCK_IFILE
:
4550 if (p
->is_replicated() && lock
->get_state() != LOCK_MIX
)
4551 scatter_mix(static_cast<ScatterLock
*>(lock
));
4552 else if (lock
->get_state() != LOCK_LOCK
)
4553 simple_lock(static_cast<ScatterLock
*>(lock
));
4555 simple_sync(static_cast<ScatterLock
*>(lock
));
4558 case CEPH_LOCK_IDFT
:
4559 case CEPH_LOCK_INEST
:
4560 if (p
->is_replicated() && lock
->get_state() != LOCK_MIX
)
4562 else if (lock
->get_state() != LOCK_LOCK
)
4571 if (lock
->is_stable() && count
== 2) {
4572 dout(10) << "scatter_nudge oh, stable after two cycles." << dendl
;
4573 // this should only realy happen when called via
4574 // handle_file_lock due to AC_NUDGE, because the rest of the
4575 // time we are replicated or have dirty data and won't get
4576 // called. bailing here avoids an infinite loop.
4581 dout(10) << "scatter_nudge auth, waiting for stable " << *lock
<< " on " << *p
<< dendl
;
4583 lock
->add_waiter(SimpleLock::WAIT_STABLE
, c
);
4588 dout(10) << "scatter_nudge replica, requesting scatter/unscatter of "
4589 << *lock
<< " on " << *p
<< dendl
;
4590 // request unscatter?
4591 mds_rank_t auth
= lock
->get_parent()->authority().first
;
4592 if (!mds
->is_cluster_degraded() ||
4593 mds
->mdsmap
->is_clientreplay_or_active_or_stopping(auth
))
4594 mds
->send_message_mds(new MLock(lock
, LOCK_AC_NUDGE
, mds
->get_nodeid()), auth
);
4598 lock
->add_waiter(SimpleLock::WAIT_STABLE
, c
);
4600 // also, requeue, in case we had wrong auth or something
4601 if (lock
->is_dirty())
4602 updated_scatterlocks
.push_back(lock
->get_updated_item());
4606 void Locker::scatter_tick()
4608 dout(10) << "scatter_tick" << dendl
;
4611 utime_t now
= ceph_clock_now();
4612 int n
= updated_scatterlocks
.size();
4613 while (!updated_scatterlocks
.empty()) {
4614 ScatterLock
*lock
= updated_scatterlocks
.front();
4616 if (n
-- == 0) break; // scatter_nudge() may requeue; avoid looping
4618 if (!lock
->is_dirty()) {
4619 updated_scatterlocks
.pop_front();
4620 dout(10) << " removing from updated_scatterlocks "
4621 << *lock
<< " " << *lock
->get_parent() << dendl
;
4624 if (now
- lock
->get_update_stamp() < g_conf
->mds_scatter_nudge_interval
)
4626 updated_scatterlocks
.pop_front();
4627 scatter_nudge(lock
, 0);
4629 mds
->mdlog
->flush();
4633 void Locker::scatter_tempsync(ScatterLock
*lock
, bool *need_issue
)
4635 dout(10) << "scatter_tempsync " << *lock
4636 << " on " << *lock
->get_parent() << dendl
;
4637 assert(lock
->get_parent()->is_auth());
4638 assert(lock
->is_stable());
4640 assert(0 == "not fully implemented, at least not for filelock");
4642 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4644 switch (lock
->get_state()) {
4645 case LOCK_SYNC
: ceph_abort(); // this shouldn't happen
4646 case LOCK_LOCK
: lock
->set_state(LOCK_LOCK_TSYN
); break;
4647 case LOCK_MIX
: lock
->set_state(LOCK_MIX_TSYN
); break;
4648 default: ceph_abort();
4652 if (lock
->is_wrlocked())
4655 if (lock
->get_cap_shift() &&
4657 in
->issued_caps_need_gather(lock
)) {
4665 if (lock
->get_state() == LOCK_MIX_TSYN
&&
4666 in
->is_replicated()) {
4667 lock
->init_gather();
4668 send_lock_message(lock
, LOCK_AC_LOCK
);
4676 lock
->set_state(LOCK_TSYN
);
4677 lock
->finish_waiters(ScatterLock::WAIT_RD
|ScatterLock::WAIT_STABLE
);
4678 if (lock
->get_cap_shift()) {
4689 // ==========================================================================
4692 void Locker::local_wrlock_grab(LocalLock
*lock
, MutationRef
& mut
)
4694 dout(7) << "local_wrlock_grab on " << *lock
4695 << " on " << *lock
->get_parent() << dendl
;
4697 assert(lock
->get_parent()->is_auth());
4698 assert(lock
->can_wrlock());
4699 assert(!mut
->wrlocks
.count(lock
));
4700 lock
->get_wrlock(mut
->get_client());
4701 mut
->wrlocks
.insert(lock
);
4702 mut
->locks
.insert(lock
);
4705 bool Locker::local_wrlock_start(LocalLock
*lock
, MDRequestRef
& mut
)
4707 dout(7) << "local_wrlock_start on " << *lock
4708 << " on " << *lock
->get_parent() << dendl
;
4710 assert(lock
->get_parent()->is_auth());
4711 if (lock
->can_wrlock()) {
4712 assert(!mut
->wrlocks
.count(lock
));
4713 lock
->get_wrlock(mut
->get_client());
4714 mut
->wrlocks
.insert(lock
);
4715 mut
->locks
.insert(lock
);
4718 lock
->add_waiter(SimpleLock::WAIT_WR
|SimpleLock::WAIT_STABLE
, new C_MDS_RetryRequest(mdcache
, mut
));
4723 void Locker::local_wrlock_finish(LocalLock
*lock
, MutationImpl
*mut
)
4725 dout(7) << "local_wrlock_finish on " << *lock
4726 << " on " << *lock
->get_parent() << dendl
;
4728 mut
->wrlocks
.erase(lock
);
4729 mut
->locks
.erase(lock
);
4730 if (lock
->get_num_wrlocks() == 0) {
4731 lock
->finish_waiters(SimpleLock::WAIT_STABLE
|
4732 SimpleLock::WAIT_WR
|
4733 SimpleLock::WAIT_RD
);
4737 bool Locker::local_xlock_start(LocalLock
*lock
, MDRequestRef
& mut
)
4739 dout(7) << "local_xlock_start on " << *lock
4740 << " on " << *lock
->get_parent() << dendl
;
4742 assert(lock
->get_parent()->is_auth());
4743 if (!lock
->can_xlock_local()) {
4744 lock
->add_waiter(SimpleLock::WAIT_WR
|SimpleLock::WAIT_STABLE
, new C_MDS_RetryRequest(mdcache
, mut
));
4748 lock
->get_xlock(mut
, mut
->get_client());
4749 mut
->xlocks
.insert(lock
);
4750 mut
->locks
.insert(lock
);
4754 void Locker::local_xlock_finish(LocalLock
*lock
, MutationImpl
*mut
)
4756 dout(7) << "local_xlock_finish on " << *lock
4757 << " on " << *lock
->get_parent() << dendl
;
4759 mut
->xlocks
.erase(lock
);
4760 mut
->locks
.erase(lock
);
4762 lock
->finish_waiters(SimpleLock::WAIT_STABLE
|
4763 SimpleLock::WAIT_WR
|
4764 SimpleLock::WAIT_RD
);
4769 // ==========================================================================
4773 void Locker::file_eval(ScatterLock
*lock
, bool *need_issue
)
4775 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4776 int loner_wanted
, other_wanted
;
4777 int wanted
= in
->get_caps_wanted(&loner_wanted
, &other_wanted
, CEPH_CAP_SFILE
);
4778 dout(7) << "file_eval wanted=" << gcap_string(wanted
)
4779 << " loner_wanted=" << gcap_string(loner_wanted
)
4780 << " other_wanted=" << gcap_string(other_wanted
)
4781 << " filelock=" << *lock
<< " on " << *lock
->get_parent()
4784 assert(lock
->get_parent()->is_auth());
4785 assert(lock
->is_stable());
4787 if (lock
->get_parent()->is_freezing_or_frozen())
4790 if (mdcache
->is_readonly()) {
4791 if (lock
->get_state() != LOCK_SYNC
) {
4792 dout(10) << "file_eval read-only FS, syncing " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4793 simple_sync(lock
, need_issue
);
4799 if (lock
->get_state() == LOCK_EXCL
) {
4800 dout(20) << " is excl" << dendl
;
4801 int loner_issued
, other_issued
, xlocker_issued
;
4802 in
->get_caps_issued(&loner_issued
, &other_issued
, &xlocker_issued
, CEPH_CAP_SFILE
);
4803 dout(7) << "file_eval loner_issued=" << gcap_string(loner_issued
)
4804 << " other_issued=" << gcap_string(other_issued
)
4805 << " xlocker_issued=" << gcap_string(xlocker_issued
)
4807 if (!((loner_wanted
|loner_issued
) & (CEPH_CAP_GEXCL
|CEPH_CAP_GWR
|CEPH_CAP_GBUFFER
)) ||
4808 (other_wanted
& (CEPH_CAP_GEXCL
|CEPH_CAP_GWR
|CEPH_CAP_GRD
)) ||
4809 (in
->inode
.is_dir() && in
->multiple_nonstale_caps())) { // FIXME.. :/
4810 dout(20) << " should lose it" << dendl
;
4811 // we should lose it.
4822 // -> any writer means MIX; RD doesn't matter.
4823 if (((other_wanted
|loner_wanted
) & CEPH_CAP_GWR
) ||
4824 lock
->is_waiter_for(SimpleLock::WAIT_WR
))
4825 scatter_mix(lock
, need_issue
);
4826 else if (!lock
->is_wrlocked()) // let excl wrlocks drain first
4827 simple_sync(lock
, need_issue
);
4829 dout(10) << " waiting for wrlock to drain" << dendl
;
4834 else if (lock
->get_state() != LOCK_EXCL
&&
4835 !lock
->is_rdlocked() &&
4836 //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
4837 ((wanted
& (CEPH_CAP_GWR
|CEPH_CAP_GBUFFER
)) ||
4838 (in
->inode
.is_dir() && !in
->has_subtree_or_exporting_dirfrag())) &&
4839 in
->get_target_loner() >= 0) {
4840 dout(7) << "file_eval stable, bump to loner " << *lock
4841 << " on " << *lock
->get_parent() << dendl
;
4842 file_excl(lock
, need_issue
);
4846 else if (lock
->get_state() != LOCK_MIX
&&
4847 !lock
->is_rdlocked() &&
4848 //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
4849 (lock
->get_scatter_wanted() ||
4850 (in
->get_target_loner() < 0 && (wanted
& CEPH_CAP_GWR
)))) {
4851 dout(7) << "file_eval stable, bump to mixed " << *lock
4852 << " on " << *lock
->get_parent() << dendl
;
4853 scatter_mix(lock
, need_issue
);
4857 else if (lock
->get_state() != LOCK_SYNC
&&
4858 !lock
->is_wrlocked() && // drain wrlocks first!
4859 !lock
->is_waiter_for(SimpleLock::WAIT_WR
) &&
4860 !(wanted
& (CEPH_CAP_GWR
|CEPH_CAP_GBUFFER
)) &&
4861 !((lock
->get_state() == LOCK_MIX
) &&
4862 in
->is_dir() && in
->has_subtree_or_exporting_dirfrag()) // if we are a delegation point, stay where we are
4863 //((wanted & CEPH_CAP_RD) ||
4864 //in->is_replicated() ||
4865 //lock->is_leased() ||
4866 //(!loner && lock->get_state() == LOCK_EXCL)) &&
4868 dout(7) << "file_eval stable, bump to sync " << *lock
4869 << " on " << *lock
->get_parent() << dendl
;
4870 simple_sync(lock
, need_issue
);
4876 void Locker::scatter_mix(ScatterLock
*lock
, bool *need_issue
)
4878 dout(7) << "scatter_mix " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4880 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4881 assert(in
->is_auth());
4882 assert(lock
->is_stable());
4884 if (lock
->get_state() == LOCK_LOCK
) {
4885 in
->start_scatter(lock
);
4886 if (in
->is_replicated()) {
4888 bufferlist softdata
;
4889 lock
->encode_locked_state(softdata
);
4891 // bcast to replicas
4892 send_lock_message(lock
, LOCK_AC_MIX
, softdata
);
4896 lock
->set_state(LOCK_MIX
);
4897 lock
->clear_scatter_wanted();
4898 if (lock
->get_cap_shift()) {
4906 switch (lock
->get_state()) {
4907 case LOCK_SYNC
: lock
->set_state(LOCK_SYNC_MIX
); break;
4908 case LOCK_EXCL
: lock
->set_state(LOCK_EXCL_MIX
); break;
4909 case LOCK_XSYN
: lock
->set_state(LOCK_XSYN_MIX
); break;
4910 case LOCK_TSYN
: lock
->set_state(LOCK_TSYN_MIX
); break;
4911 default: ceph_abort();
4915 if (lock
->is_rdlocked())
4917 if (in
->is_replicated()) {
4918 if (lock
->get_state() == LOCK_SYNC_MIX
) { // for the rest states, replicas are already LOCK
4919 send_lock_message(lock
, LOCK_AC_MIX
);
4920 lock
->init_gather();
4924 if (lock
->is_leased()) {
4925 revoke_client_leases(lock
);
4928 if (lock
->get_cap_shift() &&
4930 in
->issued_caps_need_gather(lock
)) {
4937 bool need_recover
= false;
4938 if (in
->state_test(CInode::STATE_NEEDSRECOVER
)) {
4939 mds
->mdcache
->queue_file_recover(in
);
4940 need_recover
= true;
4945 lock
->get_parent()->auth_pin(lock
);
4947 mds
->mdcache
->do_file_recover();
4949 in
->start_scatter(lock
);
4950 lock
->set_state(LOCK_MIX
);
4951 lock
->clear_scatter_wanted();
4952 if (in
->is_replicated()) {
4953 bufferlist softdata
;
4954 lock
->encode_locked_state(softdata
);
4955 send_lock_message(lock
, LOCK_AC_MIX
, softdata
);
4957 if (lock
->get_cap_shift()) {
4968 void Locker::file_excl(ScatterLock
*lock
, bool *need_issue
)
4970 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4971 dout(7) << "file_excl " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4973 assert(in
->is_auth());
4974 assert(lock
->is_stable());
4976 assert((in
->get_loner() >= 0 && in
->mds_caps_wanted
.empty()) ||
4977 (lock
->get_state() == LOCK_XSYN
)); // must do xsyn -> excl -> <anything else>
4979 switch (lock
->get_state()) {
4980 case LOCK_SYNC
: lock
->set_state(LOCK_SYNC_EXCL
); break;
4981 case LOCK_MIX
: lock
->set_state(LOCK_MIX_EXCL
); break;
4982 case LOCK_LOCK
: lock
->set_state(LOCK_LOCK_EXCL
); break;
4983 case LOCK_XSYN
: lock
->set_state(LOCK_XSYN_EXCL
); break;
4984 default: ceph_abort();
4988 if (lock
->is_rdlocked())
4990 if (lock
->is_wrlocked())
4993 if (in
->is_replicated() &&
4994 lock
->get_state() != LOCK_LOCK_EXCL
&&
4995 lock
->get_state() != LOCK_XSYN_EXCL
) { // if we were lock, replicas are already lock.
4996 send_lock_message(lock
, LOCK_AC_LOCK
);
4997 lock
->init_gather();
5000 if (lock
->is_leased()) {
5001 revoke_client_leases(lock
);
5004 if (in
->is_head() &&
5005 in
->issued_caps_need_gather(lock
)) {
5012 bool need_recover
= false;
5013 if (in
->state_test(CInode::STATE_NEEDSRECOVER
)) {
5014 mds
->mdcache
->queue_file_recover(in
);
5015 need_recover
= true;
5020 lock
->get_parent()->auth_pin(lock
);
5022 mds
->mdcache
->do_file_recover();
5024 lock
->set_state(LOCK_EXCL
);
5032 void Locker::file_xsyn(SimpleLock
*lock
, bool *need_issue
)
5034 dout(7) << "file_xsyn on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
5035 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
5036 assert(in
->is_auth());
5037 assert(in
->get_loner() >= 0 && in
->mds_caps_wanted
.empty());
5039 switch (lock
->get_state()) {
5040 case LOCK_EXCL
: lock
->set_state(LOCK_EXCL_XSYN
); break;
5041 default: ceph_abort();
5045 if (lock
->is_wrlocked())
5048 if (in
->is_head() &&
5049 in
->issued_caps_need_gather(lock
)) {
5058 lock
->get_parent()->auth_pin(lock
);
5060 lock
->set_state(LOCK_XSYN
);
5061 lock
->finish_waiters(SimpleLock::WAIT_RD
|SimpleLock::WAIT_STABLE
);
5069 void Locker::file_recover(ScatterLock
*lock
)
5071 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
5072 dout(7) << "file_recover " << *lock
<< " on " << *in
<< dendl
;
5074 assert(in
->is_auth());
5075 //assert(lock->is_stable());
5076 assert(lock
->get_state() == LOCK_PRE_SCAN
); // only called from MDCache::start_files_to_recover()
5081 if (in->is_replicated()
5082 lock->get_sm()->states[oldstate].replica_state != LOCK_LOCK) {
5083 send_lock_message(lock, LOCK_AC_LOCK);
5084 lock->init_gather();
5088 if (in
->is_head() &&
5089 in
->issued_caps_need_gather(lock
)) {
5094 lock
->set_state(LOCK_SCAN
);
5096 in
->state_set(CInode::STATE_NEEDSRECOVER
);
5098 mds
->mdcache
->queue_file_recover(in
);
5103 /* This function DOES put the passed message before returning */
5104 void Locker::handle_file_lock(ScatterLock
*lock
, MLock
*m
)
5106 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
5107 int from
= m
->get_asker();
5109 if (mds
->is_rejoin()) {
5110 if (in
->is_rejoining()) {
5111 dout(7) << "handle_file_lock still rejoining " << *in
5112 << ", dropping " << *m
<< dendl
;
5118 dout(7) << "handle_file_lock a=" << get_lock_action_name(m
->get_action())
5120 << " from mds." << from
<< " "
5123 bool caps
= lock
->get_cap_shift();
5125 switch (m
->get_action()) {
5128 assert(lock
->get_state() == LOCK_LOCK
||
5129 lock
->get_state() == LOCK_MIX
||
5130 lock
->get_state() == LOCK_MIX_SYNC2
);
5132 if (lock
->get_state() == LOCK_MIX
) {
5133 lock
->set_state(LOCK_MIX_SYNC
);
5134 eval_gather(lock
, true);
5135 if (lock
->is_unstable_and_locked())
5136 mds
->mdlog
->flush();
5140 (static_cast<ScatterLock
*>(lock
))->finish_flush();
5141 (static_cast<ScatterLock
*>(lock
))->clear_flushed();
5144 lock
->decode_locked_state(m
->get_data());
5145 lock
->set_state(LOCK_SYNC
);
5150 lock
->finish_waiters(SimpleLock::WAIT_RD
|SimpleLock::WAIT_STABLE
);
5155 switch (lock
->get_state()) {
5156 case LOCK_SYNC
: lock
->set_state(LOCK_SYNC_LOCK
); break;
5157 case LOCK_MIX
: lock
->set_state(LOCK_MIX_LOCK
); break;
5158 default: ceph_abort();
5161 eval_gather(lock
, true);
5162 if (lock
->is_unstable_and_locked())
5163 mds
->mdlog
->flush();
5167 case LOCK_AC_LOCKFLUSHED
:
5168 (static_cast<ScatterLock
*>(lock
))->finish_flush();
5169 (static_cast<ScatterLock
*>(lock
))->clear_flushed();
5170 // wake up scatter_nudge waiters
5171 if (lock
->is_stable())
5172 lock
->finish_waiters(SimpleLock::WAIT_STABLE
);
5176 assert(lock
->get_state() == LOCK_SYNC
||
5177 lock
->get_state() == LOCK_LOCK
||
5178 lock
->get_state() == LOCK_SYNC_MIX2
);
5180 if (lock
->get_state() == LOCK_SYNC
) {
5182 lock
->set_state(LOCK_SYNC_MIX
);
5183 eval_gather(lock
, true);
5184 if (lock
->is_unstable_and_locked())
5185 mds
->mdlog
->flush();
5190 lock
->set_state(LOCK_MIX
);
5191 lock
->decode_locked_state(m
->get_data());
5196 lock
->finish_waiters(SimpleLock::WAIT_WR
|SimpleLock::WAIT_STABLE
);
5201 case LOCK_AC_LOCKACK
:
5202 assert(lock
->get_state() == LOCK_SYNC_LOCK
||
5203 lock
->get_state() == LOCK_MIX_LOCK
||
5204 lock
->get_state() == LOCK_MIX_LOCK2
||
5205 lock
->get_state() == LOCK_MIX_EXCL
||
5206 lock
->get_state() == LOCK_SYNC_EXCL
||
5207 lock
->get_state() == LOCK_SYNC_MIX
||
5208 lock
->get_state() == LOCK_MIX_TSYN
);
5209 assert(lock
->is_gathering(from
));
5210 lock
->remove_gather(from
);
5212 if (lock
->get_state() == LOCK_MIX_LOCK
||
5213 lock
->get_state() == LOCK_MIX_LOCK2
||
5214 lock
->get_state() == LOCK_MIX_EXCL
||
5215 lock
->get_state() == LOCK_MIX_TSYN
) {
5216 lock
->decode_locked_state(m
->get_data());
5217 // replica is waiting for AC_LOCKFLUSHED, eval_gather() should not
5218 // delay calling scatter_writebehind().
5219 lock
->clear_flushed();
5222 if (lock
->is_gathering()) {
5223 dout(7) << "handle_file_lock " << *in
<< " from " << from
5224 << ", still gathering " << lock
->get_gather_set() << dendl
;
5226 dout(7) << "handle_file_lock " << *in
<< " from " << from
5227 << ", last one" << dendl
;
5232 case LOCK_AC_SYNCACK
:
5233 assert(lock
->get_state() == LOCK_MIX_SYNC
);
5234 assert(lock
->is_gathering(from
));
5235 lock
->remove_gather(from
);
5237 lock
->decode_locked_state(m
->get_data());
5239 if (lock
->is_gathering()) {
5240 dout(7) << "handle_file_lock " << *in
<< " from " << from
5241 << ", still gathering " << lock
->get_gather_set() << dendl
;
5243 dout(7) << "handle_file_lock " << *in
<< " from " << from
5244 << ", last one" << dendl
;
5249 case LOCK_AC_MIXACK
:
5250 assert(lock
->get_state() == LOCK_SYNC_MIX
);
5251 assert(lock
->is_gathering(from
));
5252 lock
->remove_gather(from
);
5254 if (lock
->is_gathering()) {
5255 dout(7) << "handle_file_lock " << *in
<< " from " << from
5256 << ", still gathering " << lock
->get_gather_set() << dendl
;
5258 dout(7) << "handle_file_lock " << *in
<< " from " << from
5259 << ", last one" << dendl
;
5266 case LOCK_AC_REQSCATTER
:
5267 if (lock
->is_stable()) {
5268 /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
5269 * because the replica should be holding an auth_pin if they're
5270 * doing this (and thus, we are freezing, not frozen, and indefinite
5271 * starvation isn't an issue).
5273 dout(7) << "handle_file_lock got scatter request on " << *lock
5274 << " on " << *lock
->get_parent() << dendl
;
5275 if (lock
->get_state() != LOCK_MIX
) // i.e., the reqscatter didn't race with an actual mix/scatter
5278 dout(7) << "handle_file_lock got scatter request, !stable, marking scatter_wanted on " << *lock
5279 << " on " << *lock
->get_parent() << dendl
;
5280 lock
->set_scatter_wanted();
5284 case LOCK_AC_REQUNSCATTER
:
5285 if (lock
->is_stable()) {
5286 /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
5287 * because the replica should be holding an auth_pin if they're
5288 * doing this (and thus, we are freezing, not frozen, and indefinite
5289 * starvation isn't an issue).
5291 dout(7) << "handle_file_lock got unscatter request on " << *lock
5292 << " on " << *lock
->get_parent() << dendl
;
5293 if (lock
->get_state() == LOCK_MIX
) // i.e., the reqscatter didn't race with an actual mix/scatter
5294 simple_lock(lock
); // FIXME tempsync?
5296 dout(7) << "handle_file_lock ignoring unscatter request on " << *lock
5297 << " on " << *lock
->get_parent() << dendl
;
5298 lock
->set_unscatter_wanted();
5302 case LOCK_AC_REQRDLOCK
:
5303 handle_reqrdlock(lock
, m
);
5307 if (!lock
->get_parent()->is_auth()) {
5308 dout(7) << "handle_file_lock IGNORING nudge on non-auth " << *lock
5309 << " on " << *lock
->get_parent() << dendl
;
5310 } else if (!lock
->get_parent()->is_replicated()) {
5311 dout(7) << "handle_file_lock IGNORING nudge on non-replicated " << *lock
5312 << " on " << *lock
->get_parent() << dendl
;
5314 dout(7) << "handle_file_lock trying nudge on " << *lock
5315 << " on " << *lock
->get_parent() << dendl
;
5316 scatter_nudge(lock
, 0, true);
5317 mds
->mdlog
->flush();