1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
23 #include "MDSContext.h"
28 #include "events/EUpdate.h"
29 #include "events/EOpen.h"
31 #include "msg/Messenger.h"
32 #include "osdc/Objecter.h"
34 #include "messages/MInodeFileCaps.h"
35 #include "messages/MLock.h"
36 #include "messages/MClientLease.h"
37 #include "messages/MClientReply.h"
38 #include "messages/MClientCaps.h"
39 #include "messages/MClientCapRelease.h"
41 #include "messages/MMDSSlaveRequest.h"
45 #include "common/config.h"
48 #define dout_subsys ceph_subsys_mds
50 #define dout_context g_ceph_context
51 #define dout_prefix _prefix(_dout, mds)
// Log-line prefix helper referenced by the dout_prefix macro above: streams
// "mds.<nodeid>.locker " into *_dout (nodeid comes from mds->get_nodeid())
// and returns that same stream.
52 static ostream
& _prefix(std::ostream
*_dout
, MDSRank
*mds
) {
53 return *_dout
<< "mds." << mds
->get_nodeid() << ".locker ";
// Context base class (derived from MDSInternalContextBase) that is bound to a
// Locker. The constructor stores the Locker pointer in `locker` and asserts
// that it is not NULL.
57 class LockerContext
: public MDSInternalContextBase
{
// Overrides the base-class get_mds() hook (body not visible in this excerpt).
60 MDSRank
*get_mds() override
66 explicit LockerContext(Locker
*locker_
) : locker(locker_
) {
67 assert(locker
!= NULL
);
// Counterpart of LockerContext that derives from MDSLogContextBase. The
// constructor stores the Locker pointer in `locker` and asserts that it is
// not NULL.
71 class LockerLogContext
: public MDSLogContextBase
{
// Overrides the base-class get_mds() hook (body not visible in this excerpt).
74 MDSRank
*get_mds() override
80 explicit LockerLogContext(Locker
*locker_
) : locker(locker_
) {
81 assert(locker
!= NULL
);
85 /* This function DOES put the passed message before returning */
// Message entry point for the Locker: switches on m->get_type() and forwards
// the message, cast to its concrete type, to the matching handle_* method.
// A type with no handler is logged through derr and then trips an assert.
86 void Locker::dispatch(Message
*m
)
89 switch (m
->get_type()) {
// inter-MDS lock traffic
93 handle_lock(static_cast<MLock
*>(m
));
96 case MSG_MDS_INODEFILECAPS
:
97 handle_inode_file_caps(static_cast<MInodeFileCaps
*>(m
));
// client capability traffic
101 case CEPH_MSG_CLIENT_CAPS
:
102 handle_client_caps(static_cast<MClientCaps
*>(m
));
105 case CEPH_MSG_CLIENT_CAPRELEASE
:
106 handle_client_cap_release(static_cast<MClientCapRelease
*>(m
));
108 case CEPH_MSG_CLIENT_LEASE
:
109 handle_client_lease(static_cast<MClientLease
*>(m
));
// anything else is a programming error
113 derr
<< "locker unknown message " << m
->get_type() << dendl
;
114 assert(0 == "locker unknown message");
// Iterates over the replica map of the lock's parent object and sends each
// replica MDS a new MLock built from (lock, msg, this MDS's node id).
131 void Locker::send_lock_message(SimpleLock
*lock
, int msg
)
133 for (compact_map
<mds_rank_t
,unsigned>::iterator it
= lock
->get_parent()->replicas_begin();
134 it
!= lock
->get_parent()->replicas_end();
// While the cluster is degraded, single out replicas whose MDS state is below
// STATE_REJOIN. NOTE(review): the statement guarded by this test is not
// visible in this excerpt; presumably such replicas are skipped.
136 if (mds
->is_cluster_degraded() &&
137 mds
->mdsmap
->get_state(it
->first
) < MDSMap::STATE_REJOIN
)
139 MLock
*m
= new MLock(lock
, msg
, mds
->get_nodeid());
// it->first is the rank of the replica MDS
140 mds
->send_message_mds(m
, it
->first
);
// Overload of send_lock_message that additionally receives a bufferlist
// payload. Like the two-argument form it walks the parent's replica map and
// sends one MLock per replica.
// NOTE(review): the line that attaches `data` to the message is not visible
// in this excerpt; confirm against the full file.
144 void Locker::send_lock_message(SimpleLock
*lock
, int msg
, const bufferlist
&data
)
146 for (compact_map
<mds_rank_t
,unsigned>::iterator it
= lock
->get_parent()->replicas_begin();
147 it
!= lock
->get_parent()->replicas_end();
// Same degraded-cluster test as the two-argument overload (guarded statement
// not visible here).
149 if (mds
->is_cluster_degraded() &&
150 mds
->mdsmap
->get_state(it
->first
) < MDSMap::STATE_REJOIN
)
152 MLock
*m
= new MLock(lock
, msg
, mds
->get_nodeid());
154 mds
->send_message_mds(m
, it
->first
);
// Adds to `rdlocks` the snaplock of `in` and of every ancestor inode reached
// by repeatedly following get_projected_parent_dn()->get_dir()->get_inode().
// NOTE(review): the declaration of the cursor `t` is not visible in this
// excerpt; presumably it starts at `in`.
161 void Locker::include_snap_rdlocks(set
<SimpleLock
*>& rdlocks
, CInode
*in
)
163 // rdlock ancestor snaps
165 rdlocks
.insert(&in
->snaplock
);
// climb until an inode has no projected parent dentry
166 while (t
->get_projected_parent_dn()) {
167 t
= t
->get_projected_parent_dn()->get_dir()->get_inode();
168 rdlocks
.insert(&t
->snaplock
);
// Variant of include_snap_rdlocks that also collects policylocks and reports
// a file layout: snaplock and policylock of `in` are added first, then the
// cursor `t` has its snaplock and policylock added, and when t's projected
// inode has_layout() the address of that layout is stored through *layout.
// The cursor then moves to the inode of the projected parent dentry's dir.
// NOTE(review): the loop header, the declaration of `t`, and the statements
// that use `found_layout` are not visible in this excerpt.
172 void Locker::include_snap_rdlocks_wlayout(set
<SimpleLock
*>& rdlocks
, CInode
*in
,
173 file_layout_t
**layout
)
175 //rdlock ancestor snaps
177 rdlocks
.insert(&in
->snaplock
);
178 rdlocks
.insert(&in
->policylock
);
179 bool found_layout
= false;
181 rdlocks
.insert(&t
->snaplock
);
183 rdlocks
.insert(&t
->policylock
);
184 if (t
->get_projected_inode()->has_layout()) {
// hand back a pointer into the projected inode, not a copy
185 *layout
= &t
->get_projected_inode()->layout
;
// step up only when both the projected parent dentry and its dir exist
189 if (t
->get_projected_parent_dn() &&
190 t
->get_projected_parent_dn()->get_dir())
191 t
= t
->get_projected_parent_dn()->get_dir()->get_inode();
// Scope helper: holds a reference to a request and a message string, and its
// destructor reports that message through mdr->mark_event(message).
// acquire_locks() overwrites `message` as it progresses, so the event that is
// recorded reflects the last stage reached.
// NOTE(review): any guard around the mark_event call and the remaining member
// initialisers are not visible in this excerpt.
196 struct MarkEventOnDestruct
{
200 MarkEventOnDestruct(MDRequestRef
& _mdr
,
201 const char *_message
) : mdr(_mdr
),
204 ~MarkEventOnDestruct() {
206 mdr
->mark_event(message
);
210 /* If this function returns false, the mdr has been placed
211 * on the appropriate wait list */
// Acquires the rdlocks / wrlocks / xlocks (and optional remote_wrlocks) that
// the caller requests for `mdr`. Visible stages:
//   1. build `sorted` (ordered set of locks to take) and `mustpin` (objects
//      that need an auth pin), augmenting xlocks with version locks;
//   2. verify that every object in mustpin can be auth-pinned, take the local
//      pins, and request remote pins with MMDSSlaveRequest::OP_AUTHPIN;
//   3. walk `sorted` in order, reconciling with mdr->locks: keep locks that
//      already match, release stray / out-of-order ones, start missing ones;
//   4. release leftover locks, set mdr->done_locking, and call
//      issue_caps_set() for the inodes collected in issue_set.
// auth_pin_freeze names an inode whose auth pin is to be frozen remotely;
// auth_pin_nonblock makes the can't-auth-pin path non-waiting.
// NOTE(review): many return / continue statements are not visible in this
// excerpt; the comments below describe only what the visible lines show.
212 bool Locker::acquire_locks(MDRequestRef
& mdr
,
213 set
<SimpleLock
*> &rdlocks
,
214 set
<SimpleLock
*> &wrlocks
,
215 set
<SimpleLock
*> &xlocks
,
216 map
<SimpleLock
*,mds_rank_t
> *remote_wrlocks
,
217 CInode
*auth_pin_freeze
,
218 bool auth_pin_nonblock
)
// Fast path: a non-slave request that already finished locking returns true.
220 if (mdr
->done_locking
&&
221 !mdr
->is_slave()) { // not on slaves! master requests locks piecemeal.
222 dout(10) << "acquire_locks " << *mdr
<< " - done locking" << dendl
;
223 return true; // at least we had better be!
225 dout(10) << "acquire_locks " << *mdr
<< dendl
;
// marker.message starts as the generic failure text and is refined below.
227 MarkEventOnDestruct
marker(mdr
, "failed to acquire_locks");
229 client_t client
= mdr
->get_client();
231 set
<SimpleLock
*, SimpleLock::ptr_lt
> sorted
; // sort everything we will lock
232 set
<MDSCacheObject
*> mustpin
; // items to authpin
// xlocks: every xlocked object's parent must be auth-pinned.
235 for (set
<SimpleLock
*>::iterator p
= xlocks
.begin(); p
!= xlocks
.end(); ++p
) {
236 dout(20) << " must xlock " << **p
<< " " << *(*p
)->get_parent() << dendl
;
238 mustpin
.insert((*p
)->get_parent());
240 // augment xlock with a versionlock?
241 if ((*p
)->get_type() == CEPH_LOCK_DN
) {
242 CDentry
*dn
= (CDentry
*)(*p
)->get_parent();
246 if (xlocks
.count(&dn
->versionlock
))
247 continue; // we're xlocking the versionlock too; don't wrlock it!
249 if (mdr
->is_master()) {
250 // master. wrlock versionlock so we can pipeline dentry updates to journal.
251 wrlocks
.insert(&dn
->versionlock
);
253 // slave. exclusively lock the dentry version (i.e. block other journal updates).
254 // this makes rollback safe.
255 xlocks
.insert(&dn
->versionlock
);
256 sorted
.insert(&dn
->versionlock
);
// Lock types numerically above CEPH_LOCK_IVERSION get the same treatment
// using the inode's versionlock.
259 if ((*p
)->get_type() > CEPH_LOCK_IVERSION
) {
260 // inode version lock?
261 CInode
*in
= (CInode
*)(*p
)->get_parent();
264 if (mdr
->is_master()) {
265 // master. wrlock versionlock so we can pipeline inode updates to journal.
266 wrlocks
.insert(&in
->versionlock
);
268 // slave. exclusively lock the inode version (i.e. block other journal updates).
269 // this makes rollback safe.
270 xlocks
.insert(&in
->versionlock
);
271 sorted
.insert(&in
->versionlock
);
// wrlocks: auth objects are always pinned; a non-auth object is pinned too
// when this client cannot currently wrlock it and the request is not a slave.
277 for (set
<SimpleLock
*>::iterator p
= wrlocks
.begin(); p
!= wrlocks
.end(); ++p
) {
278 MDSCacheObject
*object
= (*p
)->get_parent();
279 dout(20) << " must wrlock " << **p
<< " " << *object
<< dendl
;
281 if (object
->is_auth())
282 mustpin
.insert(object
);
283 else if (!object
->is_auth() &&
284 !(*p
)->can_wrlock(client
) && // we might have to request a scatter
285 !mdr
->is_slave()) { // if we are slave (remote_wrlock), the master already authpinned
286 dout(15) << " will also auth_pin " << *object
287 << " in case we need to request a scatter" << dendl
;
288 mustpin
.insert(object
);
// remote wrlocks (optional map: lock -> target mds rank): each lock joins
// `sorted` and its parent joins `mustpin`.
293 if (remote_wrlocks
) {
294 for (map
<SimpleLock
*,mds_rank_t
>::iterator p
= remote_wrlocks
->begin(); p
!= remote_wrlocks
->end(); ++p
) {
295 MDSCacheObject
*object
= p
->first
->get_parent();
296 dout(20) << " must remote_wrlock on mds." << p
->second
<< " "
297 << *p
->first
<< " " << *object
<< dendl
;
298 sorted
.insert(p
->first
);
299 mustpin
.insert(object
);
// rdlocks: auth objects are pinned; a non-auth object is pinned when this
// client cannot currently rdlock it.
304 for (set
<SimpleLock
*>::iterator p
= rdlocks
.begin();
307 MDSCacheObject
*object
= (*p
)->get_parent();
308 dout(20) << " must rdlock " << **p
<< " " << *object
<< dendl
;
310 if (object
->is_auth())
311 mustpin
.insert(object
);
312 else if (!object
->is_auth() &&
313 !(*p
)->can_rdlock(client
)) { // we might have to request an rdlock
314 dout(15) << " will also auth_pin " << *object
315 << " in case we need to request a rdlock" << dendl
;
316 mustpin
.insert(object
);
322 map
<mds_rank_t
, set
<MDSCacheObject
*> > mustpin_remote
; // mds -> (object set)
324 // can i auth pin them all now?
325 marker
.message
= "failed to authpin local pins";
326 for (set
<MDSCacheObject
*>::iterator p
= mustpin
.begin();
329 MDSCacheObject
*object
= *p
;
331 dout(10) << " must authpin " << *object
<< dendl
;
// Already pinned by this request: only the auth_pin_freeze object needs more
// checks (the statements guarded by the next tests are not visible here).
333 if (mdr
->is_auth_pinned(object
)) {
334 if (object
!= (MDSCacheObject
*)auth_pin_freeze
)
336 if (mdr
->more()->is_remote_frozen_authpin
) {
337 if (mdr
->more()->rename_inode
== auth_pin_freeze
)
339 // unfreeze auth pin for the wrong inode
// NOTE(review): operator[] is used here only for its side effect of creating
// the map entry for that rank; the .size() result is discarded.
340 mustpin_remote
[mdr
->more()->rename_inode
->authority().first
].size();
// Not auth here: existing locks are dropped, then either wait for the
// ambiguous authority to resolve or queue the object for a remote auth pin.
344 if (!object
->is_auth()) {
345 if (!mdr
->locks
.empty())
346 drop_locks(mdr
.get());
347 if (object
->is_ambiguous_auth()) {
349 dout(10) << " ambiguous auth, waiting to authpin " << *object
<< dendl
;
350 object
->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
, new C_MDS_RetryRequest(mdcache
, mdr
));
351 mdr
->drop_local_auth_pins();
354 mustpin_remote
[object
->authority().first
].insert(object
);
// Auth here but cannot be pinned right now: drop locks and local pins, then
// either log the nonblocking case or register a WAIT_UNFREEZE retry.
357 if (!object
->can_auth_pin()) {
359 drop_locks(mdr
.get());
360 mdr
->drop_local_auth_pins();
361 if (auth_pin_nonblock
) {
362 dout(10) << " can't auth_pin (freezing?) " << *object
<< ", nonblocking" << dendl
;
366 dout(10) << " can't auth_pin (freezing?), waiting to authpin " << *object
<< dendl
;
367 object
->add_waiter(MDSCacheObject::WAIT_UNFREEZE
, new C_MDS_RetryRequest(mdcache
, mdr
));
372 // ok, grab local auth pins
373 for (set
<MDSCacheObject
*>::iterator p
= mustpin
.begin();
376 MDSCacheObject
*object
= *p
;
377 if (mdr
->is_auth_pinned(object
)) {
378 dout(10) << " already auth_pinned " << *object
<< dendl
;
379 } else if (object
->is_auth()) {
380 dout(10) << " auth_pinning " << *object
<< dendl
;
381 mdr
->auth_pin(object
);
385 // request remote auth_pins
386 if (!mustpin_remote
.empty()) {
387 marker
.message
= "requesting remote authpins";
// Remote pins this request already holds are added to the batch for a rank
// only when that rank already has an entry in mustpin_remote.
388 for (map
<MDSCacheObject
*,mds_rank_t
>::iterator p
= mdr
->remote_auth_pins
.begin();
389 p
!= mdr
->remote_auth_pins
.end();
391 if (mustpin
.count(p
->first
)) {
392 assert(p
->second
== p
->first
->authority().first
);
393 map
<mds_rank_t
, set
<MDSCacheObject
*> >::iterator q
= mustpin_remote
.find(p
->second
);
394 if (q
!= mustpin_remote
.end())
395 q
->second
.insert(p
->first
);
// One OP_AUTHPIN slave request per target rank.
398 for (map
<mds_rank_t
, set
<MDSCacheObject
*> >::iterator p
= mustpin_remote
.begin();
399 p
!= mustpin_remote
.end();
401 dout(10) << "requesting remote auth_pins from mds." << p
->first
<< dendl
;
403 // wait for active auth
404 if (mds
->is_cluster_degraded() &&
405 !mds
->mdsmap
->is_clientreplay_or_active_or_stopping(p
->first
)) {
406 dout(10) << " mds." << p
->first
<< " is not active" << dendl
;
407 if (mdr
->more()->waiting_on_slave
.empty())
408 mds
->wait_for_active_peer(p
->first
, new C_MDS_RetryRequest(mdcache
, mdr
));
412 MMDSSlaveRequest
*req
= new MMDSSlaveRequest(mdr
->reqid
, mdr
->attempt
,
413 MMDSSlaveRequest::OP_AUTHPIN
);
414 for (set
<MDSCacheObject
*>::iterator q
= p
->second
.begin();
415 q
!= p
->second
.end();
417 dout(10) << " req remote auth_pin of " << **q
<< dendl
;
418 MDSCacheObjectInfo info
;
419 (*q
)->set_object_info(info
);
420 req
->get_authpins().push_back(info
);
// the object to freeze is additionally recorded in the request's freeze slot
421 if (*q
== auth_pin_freeze
)
422 (*q
)->set_object_info(req
->get_authpin_freeze());
425 if (auth_pin_nonblock
)
426 req
->mark_nonblock();
427 mds
->send_message_mds(req
, p
->first
);
429 // put in waiting list
430 assert(mdr
->more()->waiting_on_slave
.count(p
->first
) == 0);
431 mdr
->more()->waiting_on_slave
.insert(p
->first
);
436 // caps i'll need to issue
437 set
<CInode
*> issue_set
;
441 // make sure they match currently acquired locks.
// `existing` walks the locks the request already holds while `p` walks the
// locks it should hold; both sets are declared with SimpleLock::ptr_lt.
442 set
<SimpleLock
*, SimpleLock::ptr_lt
>::iterator existing
= mdr
->locks
.begin();
443 for (set
<SimpleLock
*, SimpleLock::ptr_lt
>::iterator p
= sorted
.begin();
446 bool need_wrlock
= !!wrlocks
.count(*p
);
447 bool need_remote_wrlock
= !!(remote_wrlocks
&& remote_wrlocks
->count(*p
));
// Case: the lock at this position is already held. Check that it is held in
// the mode we need.
450 if (existing
!= mdr
->locks
.end() && *existing
== *p
) {
452 SimpleLock
*have
= *existing
;
454 if (xlocks
.count(have
) && mdr
->xlocks
.count(have
)) {
455 dout(10) << " already xlocked " << *have
<< " " << *have
->get_parent() << dendl
;
// A remote wrlock that is no longer wanted, or that is held on a different
// rank than requested, is released.
458 if (mdr
->remote_wrlocks
.count(have
)) {
459 if (!need_remote_wrlock
||
460 mdr
->remote_wrlocks
[have
] != (*remote_wrlocks
)[have
]) {
461 dout(10) << " unlocking remote_wrlock on wrong mds." << mdr
->remote_wrlocks
[have
]
462 << " " << *have
<< " " << *have
->get_parent() << dendl
;
463 remote_wrlock_finish(have
, mdr
->remote_wrlocks
[have
], mdr
.get());
466 if (need_wrlock
|| need_remote_wrlock
) {
467 if (need_wrlock
== !!mdr
->wrlocks
.count(have
) &&
468 need_remote_wrlock
== !!mdr
->remote_wrlocks
.count(have
)) {
470 dout(10) << " already wrlocked " << *have
<< " " << *have
->get_parent() << dendl
;
471 if (need_remote_wrlock
)
472 dout(10) << " already remote_wrlocked " << *have
<< " " << *have
->get_parent() << dendl
;
476 if (rdlocks
.count(have
) && mdr
->rdlocks
.count(have
)) {
477 dout(10) << " already rdlocked " << *have
<< " " << *have
->get_parent() << dendl
;
482 // hose any stray locks
// Same lock but held in a mismatched wrlock configuration: release the local
// wrlock so it can be re-acquired in the right order.
483 if (existing
!= mdr
->locks
.end() && *existing
== *p
) {
484 assert(need_wrlock
|| need_remote_wrlock
);
485 SimpleLock
*lock
= *existing
;
486 if (mdr
->wrlocks
.count(lock
)) {
488 dout(10) << " unlocking extra " << *lock
<< " " << *lock
->get_parent() << dendl
;
489 else if (need_remote_wrlock
) // acquire remote_wrlock first
490 dout(10) << " unlocking out-of-order " << *lock
<< " " << *lock
->get_parent() << dendl
;
491 bool need_issue
= false;
492 wrlock_finish(lock
, mdr
.get(), &need_issue
);
494 issue_set
.insert(static_cast<CInode
*>(lock
->get_parent()));
// Everything still held from this position onward is out of order relative
// to *p and is released in whatever mode it is held.
498 while (existing
!= mdr
->locks
.end()) {
499 SimpleLock
*stray
= *existing
;
501 dout(10) << " unlocking out-of-order " << *stray
<< " " << *stray
->get_parent() << dendl
;
502 bool need_issue
= false;
503 if (mdr
->xlocks
.count(stray
)) {
504 xlock_finish(stray
, mdr
.get(), &need_issue
);
505 } else if (mdr
->rdlocks
.count(stray
)) {
506 rdlock_finish(stray
, mdr
.get(), &need_issue
);
508 // may have acquired both wrlock and remote wrlock
509 if (mdr
->wrlocks
.count(stray
))
510 wrlock_finish(stray
, mdr
.get(), &need_issue
);
511 if (mdr
->remote_wrlocks
.count(stray
))
512 remote_wrlock_finish(stray
, mdr
->remote_wrlocks
[stray
], mdr
.get());
515 issue_set
.insert(static_cast<CInode
*>(stray
->get_parent()));
// An in-progress acquisition of a different lock is cancelled before
// starting this one.
519 if (mdr
->locking
&& *p
!= mdr
->locking
) {
520 cancel_locking(mdr
.get(), &issue_set
);
// Start the lock in the required mode: xlock, (remote) wrlock, or rdlock.
522 if (xlocks
.count(*p
)) {
523 marker
.message
= "failed to xlock, waiting";
524 if (!xlock_start(*p
, mdr
))
526 dout(10) << " got xlock on " << **p
<< " " << *(*p
)->get_parent() << dendl
;
527 } else if (need_wrlock
|| need_remote_wrlock
) {
528 if (need_remote_wrlock
&& !mdr
->remote_wrlocks
.count(*p
)) {
529 marker
.message
= "waiting for remote wrlocks";
530 remote_wrlock_start(*p
, (*remote_wrlocks
)[*p
], mdr
);
533 if (need_wrlock
&& !mdr
->wrlocks
.count(*p
)) {
534 marker
.message
= "failed to wrlock, waiting";
535 if (need_remote_wrlock
&& !(*p
)->can_wrlock(mdr
->get_client())) {
536 marker
.message
= "failed to wrlock, dropping remote wrlock and waiting";
537 // can't take the wrlock because the scatter lock is gathering. need to
538 // release the remote wrlock, so that the gathering process can finish.
539 remote_wrlock_finish(*p
, mdr
->remote_wrlocks
[*p
], mdr
.get());
540 remote_wrlock_start(*p
, (*remote_wrlocks
)[*p
], mdr
);
543 // nowait if we have already gotten remote wrlock
544 if (!wrlock_start(*p
, mdr
, need_remote_wrlock
))
546 dout(10) << " got wrlock on " << **p
<< " " << *(*p
)->get_parent() << dendl
;
// rdlock path: asserted to be reached only for master requests.
549 assert(mdr
->is_master());
550 if ((*p
)->is_scatterlock()) {
551 ScatterLock
*slock
= static_cast<ScatterLock
*>(*p
);
552 if (slock
->is_rejoin_mix()) {
553 // If there is a recovering mds who replicated an object when it failed
554 // and scatterlock in the object was in MIX state, It's possible that
555 // the recovering mds needs to take wrlock on the scatterlock when it
556 // replays unsafe requests. So this mds should delay taking rdlock on
557 // the scatterlock until the recovering mds finishes replaying unsafe.
558 // Otherwise unsafe requests may get replayed after current request.
561 // The recovering mds is auth mds of a dirfrag, this mds is auth mds
562 // of corresponding inode. when 'rm -rf' the directory, this mds should
563 // delay the rmdir request until the recovering mds has replayed unlink
565 if (mds
->is_cluster_degraded()) {
566 if (!mdr
->is_replay()) {
567 drop_locks(mdr
.get());
568 mds
->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache
, mdr
));
569 dout(10) << " rejoin mix scatterlock " << *slock
<< " " << *(*p
)->get_parent()
570 << ", waiting for cluster recovered" << dendl
;
571 marker
.message
= "rejoin mix scatterlock, waiting for cluster recovered";
575 slock
->clear_rejoin_mix();
580 marker
.message
= "failed to rdlock, waiting";
581 if (!rdlock_start(*p
, mdr
))
583 dout(10) << " got rdlock on " << **p
<< " " << *(*p
)->get_parent() << dendl
;
587 // any extra unneeded locks?
588 while (existing
!= mdr
->locks
.end()) {
589 SimpleLock
*stray
= *existing
;
591 dout(10) << " unlocking extra " << *stray
<< " " << *stray
->get_parent() << dendl
;
592 bool need_issue
= false;
593 if (mdr
->xlocks
.count(stray
)) {
594 xlock_finish(stray
, mdr
.get(), &need_issue
);
595 } else if (mdr
->rdlocks
.count(stray
)) {
596 rdlock_finish(stray
, mdr
.get(), &need_issue
);
598 // may have acquired both wrlock and remote wrlock
599 if (mdr
->wrlocks
.count(stray
))
600 wrlock_finish(stray
, mdr
.get(), &need_issue
);
601 if (mdr
->remote_wrlocks
.count(stray
))
602 remote_wrlock_finish(stray
, mdr
->remote_wrlocks
[stray
], mdr
.get());
605 issue_set
.insert(static_cast<CInode
*>(stray
->get_parent()));
// Success: record completion and timestamp, then issue caps for every inode
// collected in issue_set.
608 mdr
->done_locking
= true;
609 mdr
->set_mds_stamp(ceph_clock_now());
611 marker
.message
= "acquired locks";
614 issue_caps_set(issue_set
);
// Walks mut->xlocks, asserts that each lock's parent object is auth here, and
// calls set_xlock_done() on the lock. The test on CEPH_LOCK_DN /
// CEPH_LOCK_DVERSION singles out dentry locks.
// NOTE(review): the first half of that condition and the statement it guards
// are not visible in this excerpt; presumably dentry locks are skipped when
// skip_dentry is true.
619 void Locker::set_xlocks_done(MutationImpl
*mut
, bool skip_dentry
)
621 for (set
<SimpleLock
*>::iterator p
= mut
->xlocks
.begin();
622 p
!= mut
->xlocks
.end();
624 MDSCacheObject
*object
= (*p
)->get_parent();
625 assert(object
->is_auth());
627 ((*p
)->get_type() == CEPH_LOCK_DN
|| (*p
)->get_type() == CEPH_LOCK_DVERSION
))
629 dout(10) << "set_xlocks_done on " << **p
<< " " << *object
<< dendl
;
630 (*p
)->set_xlock_done();
// Releases every rdlock held by `mut`: repeatedly calls rdlock_finish() on
// the first element of mut->rdlocks until the set is empty, and inserts the
// lock's parent (cast to CInode*) into *pneed_issue.
// NOTE(review): the declaration of `ni` and any condition guarding the insert
// are not visible in this excerpt.
634 void Locker::_drop_rdlocks(MutationImpl
*mut
, set
<CInode
*> *pneed_issue
)
636 while (!mut
->rdlocks
.empty()) {
// capture the parent before rdlock_finish() is called on the lock
638 MDSCacheObject
*p
= (*mut
->rdlocks
.begin())->get_parent();
639 rdlock_finish(*mut
->rdlocks
.begin(), mut
, &ni
);
641 pneed_issue
->insert(static_cast<CInode
*>(p
));
// Releases every xlock, remote wrlock and wrlock held by `mut`. Ranks that
// hold remote state for this mutation are collected in `slaves`, and each one
// is finally sent an MMDSSlaveRequest::OP_DROPLOCKS message.
// NOTE(review): the branch structure inside the xlock loop, the declaration
// of `ni`, and any condition guarding the pneed_issue inserts are not visible
// in this excerpt.
645 void Locker::_drop_non_rdlocks(MutationImpl
*mut
, set
<CInode
*> *pneed_issue
)
647 set
<mds_rank_t
> slaves
;
// xlocks
649 while (!mut
->xlocks
.empty()) {
650 SimpleLock
*lock
= *mut
->xlocks
.begin();
651 MDSCacheObject
*p
= lock
->get_parent();
// remote-xlock handling: the lock's state machine must allow remote xlocks;
// remember the authority rank and forget the lock locally
653 assert(lock
->get_sm()->can_remote_xlock
);
654 slaves
.insert(p
->authority().first
);
656 mut
->locks
.erase(lock
);
657 mut
->xlocks
.erase(lock
);
// xlock released through the normal path
661 xlock_finish(lock
, mut
, &ni
);
663 pneed_issue
->insert(static_cast<CInode
*>(p
));
// remote wrlocks: remember the target rank; the lock leaves mut->locks only
// when no local wrlock on it is held
666 while (!mut
->remote_wrlocks
.empty()) {
667 map
<SimpleLock
*,mds_rank_t
>::iterator p
= mut
->remote_wrlocks
.begin();
668 slaves
.insert(p
->second
);
669 if (mut
->wrlocks
.count(p
->first
) == 0)
670 mut
->locks
.erase(p
->first
);
671 mut
->remote_wrlocks
.erase(p
);
// local wrlocks
674 while (!mut
->wrlocks
.empty()) {
676 MDSCacheObject
*p
= (*mut
->wrlocks
.begin())->get_parent();
677 wrlock_finish(*mut
->wrlocks
.begin(), mut
, &ni
);
679 pneed_issue
->insert(static_cast<CInode
*>(p
));
// Notify each collected rank, unless the cluster is degraded and that rank's
// state is below STATE_REJOIN.
682 for (set
<mds_rank_t
>::iterator p
= slaves
.begin(); p
!= slaves
.end(); ++p
) {
683 if (!mds
->is_cluster_degraded() ||
684 mds
->mdsmap
->get_state(*p
) >= MDSMap::STATE_REJOIN
) {
685 dout(10) << "_drop_non_rdlocks dropping remote locks on mds." << *p
<< dendl
;
686 MMDSSlaveRequest
*slavereq
= new MMDSSlaveRequest(mut
->reqid
, mut
->attempt
,
687 MMDSSlaveRequest::OP_DROPLOCKS
);
688 mds
->send_message_mds(slavereq
, *p
);
// Abandons the lock acquisition recorded in mut->locking. When the lock's
// parent is auth here, a lock in LOCK_PREXLOCK goes through _finish_xlock(),
// and a lock in LOCK_LOCK_XLOCK with no xlocks is set to LOCK_XLOCKDONE and
// re-evaluated with eval_gather(). The parent is inserted into *pneed_issue
// and mut->finish_locking(lock) is called.
// NOTE(review): any condition guarding the pneed_issue insert is not visible
// in this excerpt.
693 void Locker::cancel_locking(MutationImpl
*mut
, set
<CInode
*> *pneed_issue
)
695 SimpleLock
*lock
= mut
->locking
;
697 dout(10) << "cancel_locking " << *lock
<< " on " << *mut
<< dendl
;
699 if (lock
->get_parent()->is_auth()) {
700 bool need_issue
= false;
701 if (lock
->get_state() == LOCK_PREXLOCK
) {
702 _finish_xlock(lock
, -1, &need_issue
);
703 } else if (lock
->get_state() == LOCK_LOCK_XLOCK
&&
704 lock
->get_num_xlocks() == 0) {
705 lock
->set_state(LOCK_XLOCKDONE
);
706 eval_gather(lock
, true, &need_issue
);
709 pneed_issue
->insert(static_cast<CInode
*>(lock
->get_parent()));
711 mut
->finish_locking(lock
);
// Drops every lock held by `mut`: cancels any in-progress acquisition, then
// releases non-rdlocks followed by rdlocks, and clears mut->done_locking.
// When pneed_issue points at the local my_need_issue set, issue_caps_set() is
// called on it here.
// NOTE(review): the conditions guarding the pneed_issue assignment and the
// cancel_locking() call are not visible in this excerpt; presumably they are
// a null check on pneed_issue and a check on mut->locking respectively.
714 void Locker::drop_locks(MutationImpl
*mut
, set
<CInode
*> *pneed_issue
)
717 set
<CInode
*> my_need_issue
;
719 pneed_issue
= &my_need_issue
;
722 cancel_locking(mut
, pneed_issue
);
723 _drop_non_rdlocks(mut
, pneed_issue
);
724 _drop_rdlocks(mut
, pneed_issue
);
// only issue caps ourselves when we own the set
726 if (pneed_issue
== &my_need_issue
)
727 issue_caps_set(*pneed_issue
);
728 mut
->done_locking
= false;
// Public wrapper around _drop_non_rdlocks(). When pneed_issue points at the
// local my_need_issue set, issue_caps_set() is called on it here.
// NOTE(review): the condition guarding the pneed_issue assignment is not
// visible in this excerpt; presumably a null check on the argument.
731 void Locker::drop_non_rdlocks(MutationImpl
*mut
, set
<CInode
*> *pneed_issue
)
733 set
<CInode
*> my_need_issue
;
735 pneed_issue
= &my_need_issue
;
737 _drop_non_rdlocks(mut
, pneed_issue
);
739 if (pneed_issue
== &my_need_issue
)
740 issue_caps_set(*pneed_issue
);
// Public wrapper around _drop_rdlocks(). When pneed_issue points at the local
// my_need_issue set, issue_caps_set() is called on it here.
// NOTE(review): the condition guarding the pneed_issue assignment is not
// visible in this excerpt; presumably a null check on the argument.
743 void Locker::drop_rdlocks(MutationImpl
*mut
, set
<CInode
*> *pneed_issue
)
745 set
<CInode
*> my_need_issue
;
747 pneed_issue
= &my_need_issue
;
749 _drop_rdlocks(mut
, pneed_issue
);
751 if (pneed_issue
== &my_need_issue
)
752 issue_caps_set(*pneed_issue
);
// Checks whether an unstable lock (asserted via !lock->is_stable()) can move
// to its next state, taken from lock->get_next_state(). The move happens only
// when the lock is no longer gathering, no outstanding rd/wr/x locks or
// leases conflict with the next state, no flush is pending on an auth object,
// and no issued caps exceed what the next state allows. On a replica the auth
// MDS is told with an ack message; on the auth side replicas are messaged.
// Afterwards the state is set and waiters are taken or finished.
// NOTE(review): case labels, returns and several declarations (`in`, `data`,
// `softdata`) are not visible in this excerpt.
758 void Locker::eval_gather(SimpleLock
*lock
, bool first
, bool *pneed_issue
, list
<MDSInternalContextBase
*> *pfinishers
)
760 dout(10) << "eval_gather " << *lock
<< " on " << *lock
->get_parent() << dendl
;
761 assert(!lock
->is_stable());
763 int next
= lock
->get_next_state();
// `caps` is true when the lock has a non-zero cap shift
766 bool caps
= lock
->get_cap_shift();
767 if (lock
->get_type() != CEPH_LOCK_DN
)
768 in
= static_cast<CInode
*>(lock
->get_parent());
770 bool need_issue
= false;
772 int loner_issued
= 0, other_issued
= 0, xlocker_issued
= 0;
773 assert(!caps
|| in
!= NULL
);
// Collect the caps currently issued for this lock's cap range.
774 if (caps
&& in
->is_head()) {
775 in
->get_caps_issued(&loner_issued
, &other_issued
, &xlocker_issued
,
776 lock
->get_cap_shift(), lock
->get_cap_mask());
777 dout(10) << " next state is " << lock
->get_state_name(next
)
778 << " issued/allows loner " << gcap_string(loner_issued
)
779 << "/" << gcap_string(lock
->gcaps_allowed(CAP_LONER
, next
))
780 << " xlocker " << gcap_string(xlocker_issued
)
781 << "/" << gcap_string(lock
->gcaps_allowed(CAP_XLOCKER
, next
))
782 << " other " << gcap_string(other_issued
)
783 << "/" << gcap_string(lock
->gcaps_allowed(CAP_ANY
, next
))
// On the first evaluation, detect issued caps that the next state does not
// allow (the guarded statement is not visible here).
786 if (first
&& ((~lock
->gcaps_allowed(CAP_ANY
, next
) & other_issued
) ||
787 (~lock
->gcaps_allowed(CAP_LONER
, next
) & loner_issued
) ||
788 (~lock
->gcaps_allowed(CAP_XLOCKER
, next
) & xlocker_issued
)))
// True when x is non-zero and within the AUTH threshold: x <= AUTH when
// `auth` is true, x < AUTH otherwise.
792 #define IS_TRUE_AND_LT_AUTH(x, auth) (x && ((auth && x <= AUTH) || (!auth && x < AUTH)))
793 bool auth
= lock
->get_parent()->is_auth();
// Gather-complete test.
794 if (!lock
->is_gathering() &&
795 (IS_TRUE_AND_LT_AUTH(lock
->get_sm()->states
[next
].can_rdlock
, auth
) || !lock
->is_rdlocked()) &&
796 (IS_TRUE_AND_LT_AUTH(lock
->get_sm()->states
[next
].can_wrlock
, auth
) || !lock
->is_wrlocked()) &&
797 (IS_TRUE_AND_LT_AUTH(lock
->get_sm()->states
[next
].can_xlock
, auth
) || !lock
->is_xlocked()) &&
798 (IS_TRUE_AND_LT_AUTH(lock
->get_sm()->states
[next
].can_lease
, auth
) || !lock
->is_leased()) &&
799 !(lock
->get_parent()->is_auth() && lock
->is_flushing()) && // i.e. wait for scatter_writebehind!
800 (!caps
|| ((~lock
->gcaps_allowed(CAP_ANY
, next
) & other_issued
) == 0 &&
801 (~lock
->gcaps_allowed(CAP_LONER
, next
) & loner_issued
) == 0 &&
802 (~lock
->gcaps_allowed(CAP_XLOCKER
, next
) & xlocker_issued
) == 0)) &&
803 lock
->get_state() != LOCK_SYNC_MIX2
&& // these states need an explicit trigger from the auth mds
804 lock
->get_state() != LOCK_MIX_SYNC2
806 dout(7) << "eval_gather finished gather on " << *lock
807 << " on " << *lock
->get_parent() << dendl
;
// File locks: an inode that is recovering, or needs recovery, is handled
// first; in the latter case recovery is queued and started.
809 if (lock
->get_sm() == &sm_filelock
) {
811 if (in
->state_test(CInode::STATE_RECOVERING
)) {
812 dout(7) << "eval_gather finished gather, but still recovering" << dendl
;
814 } else if (in
->state_test(CInode::STATE_NEEDSRECOVER
)) {
815 dout(7) << "eval_gather finished gather, but need to recover" << dendl
;
816 mds
->mdcache
->queue_file_recover(in
);
817 mds
->mdcache
->do_file_recover();
822 if (!lock
->get_parent()->is_auth()) {
823 // replica: tell auth
// note: this local `auth` is the rank of the authority and shadows the bool
// declared above
824 mds_rank_t auth
= lock
->get_parent()->authority().first
;
826 if (lock
->get_parent()->is_rejoining() &&
827 mds
->mdsmap
->get_state(auth
) == MDSMap::STATE_REJOIN
) {
828 dout(7) << "eval_gather finished gather, but still rejoining "
829 << *lock
->get_parent() << dendl
;
// Acks are sent only when the cluster is not degraded or the auth rank has
// reached at least STATE_REJOIN.
833 if (!mds
->is_cluster_degraded() ||
834 mds
->mdsmap
->get_state(auth
) >= MDSMap::STATE_REJOIN
) {
835 switch (lock
->get_state()) {
837 mds
->send_message_mds(new MLock(lock
, LOCK_AC_LOCKACK
, mds
->get_nodeid()),
// SYNCACK carries the encoded locked state; the lock then moves to
// LOCK_MIX_SYNC2 and a flush is started
843 MLock
*reply
= new MLock(lock
, LOCK_AC_SYNCACK
, mds
->get_nodeid());
844 lock
->encode_locked_state(reply
->get_data());
845 mds
->send_message_mds(reply
, auth
);
846 next
= LOCK_MIX_SYNC2
;
847 (static_cast<ScatterLock
*>(lock
))->start_flush();
852 (static_cast<ScatterLock
*>(lock
))->finish_flush();
853 (static_cast<ScatterLock
*>(lock
))->clear_flushed();
856 // do nothing, we already acked
// MIXACK: the lock then moves to LOCK_SYNC_MIX2
861 MLock
*reply
= new MLock(lock
, LOCK_AC_MIXACK
, mds
->get_nodeid());
862 mds
->send_message_mds(reply
, auth
);
863 next
= LOCK_SYNC_MIX2
;
// LOCKACK with encoded locked state, followed by a flush start
870 lock
->encode_locked_state(data
);
871 mds
->send_message_mds(new MLock(lock
, LOCK_AC_LOCKACK
, mds
->get_nodeid(), data
), auth
);
872 (static_cast<ScatterLock
*>(lock
))->start_flush();
873 // we'll get an AC_LOCKFLUSHED to complete
884 // once the first (local) stage of mix->lock gather complete we can
885 // gather from replicas
886 if (lock
->get_state() == LOCK_MIX_LOCK
&&
887 lock
->get_parent()->is_replicated()) {
888 dout(10) << " finished (local) gather for mix->lock, now gathering from replicas" << dendl
;
889 send_lock_message(lock
, LOCK_AC_LOCK
);
891 lock
->set_state(LOCK_MIX_LOCK2
);
// Dirty, not-yet-flushed scatter state is written behind before proceeding.
895 if (lock
->is_dirty() && !lock
->is_flushed()) {
896 scatter_writebehind(static_cast<ScatterLock
*>(lock
));
900 lock
->clear_flushed();
902 switch (lock
->get_state()) {
// entering a mixed state: start scatter and tell replicas with LOCK_AC_MIX
907 in
->start_scatter(static_cast<ScatterLock
*>(lock
));
908 if (lock
->get_parent()->is_replicated()) {
910 lock
->encode_locked_state(softdata
);
911 send_lock_message(lock
, LOCK_AC_MIX
, softdata
);
913 (static_cast<ScatterLock
*>(lock
))->clear_scatter_wanted();
918 if (next
!= LOCK_SYNC
)
// entering sync: tell replicas with LOCK_AC_SYNC
927 if (lock
->get_parent()->is_replicated()) {
929 lock
->encode_locked_state(softdata
);
930 send_lock_message(lock
, LOCK_AC_SYNC
, softdata
);
// Commit the transition.
937 lock
->set_state(next
);
939 if (lock
->get_parent()->is_auth() &&
941 lock
->get_parent()->auth_unpin(lock
);
943 // drop loner before doing waiters
947 in
->get_wanted_loner() != in
->get_loner()) {
948 dout(10) << " trying to drop loner" << dendl
;
949 if (in
->try_drop_loner()) {
950 dout(10) << " dropped loner" << dendl
;
// Waiters are either taken (the argument after the mask is not visible here)
// or finished immediately.
956 lock
->take_waiting(SimpleLock::WAIT_STABLE
|SimpleLock::WAIT_WR
|SimpleLock::WAIT_RD
|SimpleLock::WAIT_XLOCK
,
959 lock
->finish_waiters(SimpleLock::WAIT_STABLE
|SimpleLock::WAIT_WR
|SimpleLock::WAIT_RD
|SimpleLock::WAIT_XLOCK
);
961 if (caps
&& in
->is_head())
964 if (lock
->get_parent()->is_auth() &&
966 try_eval(lock
, &need_issue
);
972 else if (in
->is_head())
// Evaluates the locks of inode `in` selected by `mask` (CEPH_LOCK_I* bits).
// For an auth head inode it first tries to settle the loner client, then
// calls eval_any() on each selected lock, then retries dropping / setting the
// loner when the wanted loner differs from the current one, and finally runs
// the collected finisher contexts. need_issue starts as caps_imported.
// NOTE(review): the return statement and the statement guarded by
// `need_issue && in->is_head()` are not visible in this excerpt.
978 bool Locker::eval(CInode
*in
, int mask
, bool caps_imported
)
980 bool need_issue
= caps_imported
;
981 list
<MDSInternalContextBase
*> finishers
;
983 dout(10) << "eval " << mask
<< " " << *in
<< dendl
;
// choose and (try to) set the loner up front
986 if (in
->is_auth() && in
->is_head()) {
987 if (in
->choose_ideal_loner() >= 0) {
988 if (in
->try_set_loner()) {
989 dout(10) << "eval set loner to client." << in
->get_loner() << dendl
;
993 dout(10) << "eval want loner client." << in
->get_wanted_loner() << " but failed to set it" << dendl
;
995 dout(10) << "eval doesn't want loner" << dendl
;
// evaluate each lock whose bit is set in mask
999 if (mask
& CEPH_LOCK_IFILE
)
1000 eval_any(&in
->filelock
, &need_issue
, &finishers
, caps_imported
);
1001 if (mask
& CEPH_LOCK_IAUTH
)
1002 eval_any(&in
->authlock
, &need_issue
, &finishers
, caps_imported
);
1003 if (mask
& CEPH_LOCK_ILINK
)
1004 eval_any(&in
->linklock
, &need_issue
, &finishers
, caps_imported
);
1005 if (mask
& CEPH_LOCK_IXATTR
)
1006 eval_any(&in
->xattrlock
, &need_issue
, &finishers
, caps_imported
);
1007 if (mask
& CEPH_LOCK_INEST
)
1008 eval_any(&in
->nestlock
, &need_issue
, &finishers
, caps_imported
);
1009 if (mask
& CEPH_LOCK_IFLOCK
)
1010 eval_any(&in
->flocklock
, &need_issue
, &finishers
, caps_imported
);
1011 if (mask
& CEPH_LOCK_IPOLICY
)
1012 eval_any(&in
->policylock
, &need_issue
, &finishers
, caps_imported
);
// second chance to change the loner after the locks were evaluated
1015 if (in
->is_auth() && in
->is_head() && in
->get_wanted_loner() != in
->get_loner()) {
1016 dout(10) << " trying to drop loner" << dendl
;
1017 if (in
->try_drop_loner()) {
1018 dout(10) << " dropped loner" << dendl
;
1021 if (in
->get_wanted_loner() >= 0) {
1022 if (in
->try_set_loner()) {
1023 dout(10) << "eval end set loner to client." << in
->get_loner() << dendl
;
1027 dout(10) << "eval want loner client." << in
->get_wanted_loner() << " but failed to set it" << dendl
;
1033 finish_contexts(g_ceph_context
, finishers
);
1035 if (need_issue
&& in
->is_head())
1038 dout(10) << "eval done" << dendl
;
// Waiter context that re-runs Locker::try_eval(p, mask) when it fires. The
// constructor takes a PIN_PTRWAITER reference on the cache object so it stays
// pinned while the context is queued; finish() releases that reference after
// calling try_eval().
1042 class C_Locker_Eval
: public LockerContext
{
1046 C_Locker_Eval(Locker
*l
, MDSCacheObject
*pp
, int m
) : LockerContext(l
), p(pp
), mask(m
) {
1047 // We are used as an MDSCacheObject waiter, so should
1048 // only be invoked by someone already holding the big lock.
1049 assert(locker
->mds
->mds_lock
.is_locked_by_me());
1050 p
->get(MDSCacheObject::PIN_PTRWAITER
);
// r is not used by the visible lines
1052 void finish(int r
) override
{
1053 locker
->try_eval(p
, mask
);
1054 p
->put(MDSCacheObject::PIN_PTRWAITER
);
// Object-level try_eval: evaluates the locks of cache object `p` selected by
// `mask`, or registers a C_Locker_Eval waiter when it cannot do so yet
// (ambiguous authority -> WAIT_SINGLEAUTH; auth and frozen -> WAIT_UNFREEZE).
// A mask containing CEPH_LOCK_DN must be exactly CEPH_LOCK_DN and treats p as
// a CDentry; otherwise p is treated as a CInode.
// NOTE(review): the return statements after the waiter registrations and the
// call made on the CInode are not visible in this excerpt.
1058 void Locker::try_eval(MDSCacheObject
*p
, int mask
)
1060 // unstable and ambiguous auth?
1061 if (p
->is_ambiguous_auth()) {
1062 dout(7) << "try_eval ambiguous auth, waiting on " << *p
<< dendl
;
1063 p
->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
, new C_Locker_Eval(this, p
, mask
));
1067 if (p
->is_auth() && p
->is_frozen()) {
1068 dout(7) << "try_eval frozen, waiting on " << *p
<< dendl
;
1069 p
->add_waiter(MDSCacheObject::WAIT_UNFREEZE
, new C_Locker_Eval(this, p
, mask
));
1073 if (mask
& CEPH_LOCK_DN
) {
1074 assert(mask
== CEPH_LOCK_DN
);
1075 bool need_issue
= false; // ignore this, no caps on dentries
1076 CDentry
*dn
= static_cast<CDentry
*>(p
);
1077 eval_any(&dn
->lock
, &need_issue
);
1079 CInode
*in
= static_cast<CInode
*>(p
);
// Lock-level try_eval: evaluates a single lock via eval(lock, pneed_issue),
// deferring with a C_Locker_Eval waiter (keyed by the lock's type) when the
// parent has ambiguous authority, is frozen, or -- for non-dentry locks -- is
// freezing. A non-auth parent is only logged. Before the freezing check, a
// scatterlock's scatter_wanted / unscatter_wanted flags are honoured by
// calling scatter_mix() / simple_lock().
// NOTE(review): the return statements of the early-exit paths are not visible
// in this excerpt.
1084 void Locker::try_eval(SimpleLock
*lock
, bool *pneed_issue
)
1086 MDSCacheObject
*p
= lock
->get_parent();
1088 // unstable and ambiguous auth?
1089 if (p
->is_ambiguous_auth()) {
1090 dout(7) << "try_eval " << *lock
<< " ambiguousauth, waiting on " << *p
<< dendl
;
1091 p
->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
, new C_Locker_Eval(this, p
, lock
->get_type()));
1095 if (!p
->is_auth()) {
1096 dout(7) << "try_eval " << *lock
<< " not auth for " << *p
<< dendl
;
1100 if (p
->is_frozen()) {
1101 dout(7) << "try_eval " << *lock
<< " frozen, waiting on " << *p
<< dendl
;
1102 p
->add_waiter(MDSCacheObject::WAIT_UNFREEZE
, new C_Locker_Eval(this, p
, lock
->get_type()));
1107 * We could have a situation like:
1109 * - mds A authpins item on mds B
1110 * - mds B starts to freeze tree containing item
1111 * - mds A tries wrlock_start on A, sends REQSCATTER to B
1112 * - mds B lock is unstable, sets scatter_wanted
1113 * - mds B lock stabilizes, calls try_eval.
1115 * We can defer while freezing without causing a deadlock. Honor
1116 * scatter_wanted flag here. This will never get deferred by the
1117 * checks above due to the auth_pin held by the master.
1119 if (lock
->is_scatterlock()) {
1120 ScatterLock
*slock
= static_cast<ScatterLock
*>(lock
);
// scatter requested and not yet in MIX: move towards MIX
1121 if (slock
->get_scatter_wanted() &&
1122 slock
->get_state() != LOCK_MIX
) {
1123 scatter_mix(slock
, pneed_issue
);
1124 if (!lock
->is_stable())
// unscatter requested and not yet in LOCK: move towards LOCK
1126 } else if (slock
->get_unscatter_wanted() &&
1127 slock
->get_state() != LOCK_LOCK
) {
1128 simple_lock(slock
, pneed_issue
);
1129 if (!lock
->is_stable()) {
1135 if (lock
->get_type() != CEPH_LOCK_DN
&& p
->is_freezing()) {
1136 dout(7) << "try_eval " << *lock
<< " freezing, waiting on " << *p
<< dendl
;
1137 p
->add_waiter(MDSCacheObject::WAIT_UNFREEZE
, new C_Locker_Eval(this, p
, lock
->get_type()));
1141 eval(lock
, pneed_issue
);
// Run eval_gather() on each unstable cap-related lock of `in` (filelock,
// authlock, linklock, xattrlock), accumulating a need_issue flag and a list
// of finisher contexts. On the visible path, a head inode that needs caps
// issued is inserted into *issue_set. Finishers are completed last with
// finish_contexts().
//
// @param in         inode whose locks are evaluated
// @param issue_set  set that collects inodes needing a cap issue
//
// NOTE(review): listing lines 1160 and 1162-1165 are not visible; there may
// be an alternate path for a null issue_set -- confirm.
1144 void Locker::eval_cap_gather(CInode
*in
, set
<CInode
*> *issue_set
)
1146 bool need_issue
= false;
1147 list
<MDSInternalContextBase
*> finishers
;
1150 if (!in
->filelock
.is_stable())
1151 eval_gather(&in
->filelock
, false, &need_issue
, &finishers
);
1152 if (!in
->authlock
.is_stable())
1153 eval_gather(&in
->authlock
, false, &need_issue
, &finishers
);
1154 if (!in
->linklock
.is_stable())
1155 eval_gather(&in
->linklock
, false, &need_issue
, &finishers
);
1156 if (!in
->xattrlock
.is_stable())
1157 eval_gather(&in
->xattrlock
, false, &need_issue
, &finishers
);
1159 if (need_issue
&& in
->is_head()) {
1161 issue_set
->insert(in
);
// Finishers run only after all four locks have been evaluated.
1166 finish_contexts(g_ceph_context
, finishers
);
// Run eval_gather() on each unstable scatter-type lock of `in` (filelock,
// nestlock, dirfragtreelock), collecting a need_issue flag and finisher
// contexts, then complete the finishers with finish_contexts().
//
// NOTE(review): the statement guarded by `need_issue && in->is_head()`
// (listing line 1185) is not visible here.
1169 void Locker::eval_scatter_gathers(CInode
*in
)
1171 bool need_issue
= false;
1172 list
<MDSInternalContextBase
*> finishers
;
1174 dout(10) << "eval_scatter_gathers " << *in
<< dendl
;
1177 if (!in
->filelock
.is_stable())
1178 eval_gather(&in
->filelock
, false, &need_issue
, &finishers
);
1179 if (!in
->nestlock
.is_stable())
1180 eval_gather(&in
->nestlock
, false, &need_issue
, &finishers
);
1181 if (!in
->dirfragtreelock
.is_stable())
1182 eval_gather(&in
->dirfragtreelock
, false, &need_issue
, &finishers
);
1184 if (need_issue
&& in
->is_head())
1187 finish_contexts(g_ceph_context
, finishers
);
// Dispatch lock evaluation by lock type:
//   CEPH_LOCK_IFILE                  -> file_eval()
//   CEPH_LOCK_IDFT / CEPH_LOCK_INEST -> scatter_eval()
//   remaining case                   -> simple_eval()
// (listing line 1198, presumably the `default:` label, is not visible).
1190 void Locker::eval(SimpleLock
*lock
, bool *need_issue
)
1192 switch (lock
->get_type()) {
1193 case CEPH_LOCK_IFILE
:
1194 return file_eval(static_cast<ScatterLock
*>(lock
), need_issue
);
1195 case CEPH_LOCK_IDFT
:
1196 case CEPH_LOCK_INEST
:
1197 return scatter_eval(static_cast<ScatterLock
*>(lock
), need_issue
);
1199 return simple_eval(lock
, need_issue
);
1204 // ------------------
// Nudge `lock` toward a state in which it can be read-locked.
//
// Visible logic:
//  - only stable locks are acted on;
//  - on the auth, the branch depends on the lock's state machine
//    (sm_scatterlock / sm_filelock);
//  - on a replica, LOCK_AC_REQRDLOCK is sent to the auth unless the cluster
//    is degraded and the auth is not clientreplay/active/stopping;
//  - for CEPH_LOCK_IFILE on an inode in STATE_RECOVERING, the inode is
//    moved up in the recovery queue.
//
// @param as_anon  per the inline comment, the caller wants SYNC, not XSYN
//
// NOTE(review): the actual state-change calls and every return statement
// are on lines missing from this listing; the return contract cannot be
// stated from here.
1207 bool Locker::_rdlock_kick(SimpleLock
*lock
, bool as_anon
)
1210 if (lock
->is_stable()) {
1211 if (lock
->get_parent()->is_auth()) {
1212 if (lock
->get_sm() == &sm_scatterlock
) {
1213 // not until tempsync is fully implemented
1214 //if (lock->get_parent()->is_replicated())
1215 //scatter_tempsync((ScatterLock*)lock);
1218 } else if (lock
->get_sm() == &sm_filelock
) {
1219 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
1220 if (lock
->get_state() == LOCK_EXCL
&&
1221 in
->get_target_loner() >= 0 &&
1222 !in
->is_dir() && !as_anon
) // as_anon => caller wants SYNC, not XSYN
1230 // request rdlock state change from auth
1231 mds_rank_t auth
= lock
->get_parent()->authority().first
;
1232 if (!mds
->is_cluster_degraded() ||
1233 mds
->mdsmap
->is_clientreplay_or_active_or_stopping(auth
)) {
1234 dout(10) << "requesting rdlock from auth on "
1235 << *lock
<< " on " << *lock
->get_parent() << dendl
;
1236 mds
->send_message_mds(new MLock(lock
, LOCK_AC_REQRDLOCK
, mds
->get_nodeid()), auth
);
// A file lock on a recovering inode: prioritize its recovery.
1241 if (lock
->get_type() == CEPH_LOCK_IFILE
) {
1242 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
1243 if (in
->state_test(CInode::STATE_RECOVERING
)) {
1244 mds
->mdcache
->recovery_queue
.prioritize(in
);
// Probe whether `client` can read-lock `lock`: check can_rdlock(), kick the
// lock once with _rdlock_kick(lock, false), then check again. On the
// visible failure path `con` is queued as a waiter for WAIT_STABLE|WAIT_RD.
//
// @param lock    lock to probe
// @param client  client identity passed to can_rdlock()
// @param con     context registered as a waiter on the failure path
//
// NOTE(review): the return statements and any null check on `con` (callers
// such as rdlock_try_set() pass NULL) are on lines missing from this
// listing; confirm.
1251 bool Locker::rdlock_try(SimpleLock
*lock
, client_t client
, MDSInternalContextBase
*con
)
1253 dout(7) << "rdlock_try on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1255 // can read? grab ref.
1256 if (lock
->can_rdlock(client
))
1259 _rdlock_kick(lock
, false);
// Re-check after the kick, which may have changed the lock state.
1261 if (lock
->can_rdlock(client
))
1266 dout(7) << "rdlock_try waiting on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1267 lock
->add_waiter(SimpleLock::WAIT_STABLE
|SimpleLock::WAIT_RD
, con
);
// Try to take a read lock on `lock` on behalf of request `mut`.
//
// Visible steps:
//  - choose the client identity: -1 when as_anon, else mut->get_client();
//  - for non-dentry locks, treat the parent as a CInode;
//  - when can_rdlock(client) holds, record the lock in mut->rdlocks and
//    mut->locks;
//  - for a non-head auth inode whose lock is in LOCK_SNAP_SYNC, kick the
//    head inode's lock of the same type via a recursive rdlock_start() with
//    as_anon = true;
//  - otherwise call _rdlock_kick();
//  - finally wait with a C_MDS_RetryRequest: WAIT_RD if the parent is auth
//    and the lock is stable, else WAIT_STABLE.
//
// NOTE(review): loop structure, return statements and the call that
// actually takes the rdlock reference are on lines missing from this
// listing. Listing lines 1287-1289 are not token-split like the surrounding
// code, which suggests they sit inside a comment in the original file --
// treat that forwarding code as possibly disabled and confirm.
1272 bool Locker::rdlock_start(SimpleLock
*lock
, MDRequestRef
& mut
, bool as_anon
)
1274 dout(7) << "rdlock_start on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1276 // client may be allowed to rdlock the same item it has xlocked.
1277 // UNLESS someone passes in as_anon, or we're reading snapped version here.
1278 if (mut
->snapid
!= CEPH_NOSNAP
)
1280 client_t client
= as_anon
? -1 : mut
->get_client();
1283 if (lock
->get_type() != CEPH_LOCK_DN
)
1284 in
= static_cast<CInode
*>(lock
->get_parent());
1287 if (!lock->get_parent()->is_auth() &&
1288 lock->fw_rdlock_to_auth()) {
1289 mdcache->request_forward(mut, lock->get_parent()->authority().first);
1295 // can read? grab ref.
1296 if (lock
->can_rdlock(client
)) {
1298 mut
->rdlocks
.insert(lock
);
1299 mut
->locks
.insert(lock
);
1303 // hmm, wait a second.
1304 if (in
&& !in
->is_head() && in
->is_auth() &&
1305 lock
->get_state() == LOCK_SNAP_SYNC
) {
1306 // okay, we actually need to kick the head's lock to get ourselves synced up.
1307 CInode
*head
= mdcache
->get_inode(in
->ino());
1309 SimpleLock
*hlock
= head
->get_lock(lock
->get_type());
1310 if (hlock
->get_state() != LOCK_SYNC
) {
1311 dout(10) << "rdlock_start trying head inode " << *head
<< dendl
;
1312 if (!rdlock_start(head
->get_lock(lock
->get_type()), mut
, true)) // ** as_anon, no rdlock on EXCL **
1314 // oh, check our lock again then
1318 if (!_rdlock_kick(lock
, as_anon
))
// Choose what to wait for before the request is retried.
1324 if (lock
->get_parent()->is_auth() && lock
->is_stable())
1325 wait_on
= SimpleLock::WAIT_RD
;
1327 wait_on
= SimpleLock::WAIT_STABLE
; // REQRDLOCK is ignored if lock is unstable, so we need to retry.
1328 dout(7) << "rdlock_start waiting on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1329 lock
->add_waiter(wait_on
, new C_MDS_RetryRequest(mdcache
, mut
));
// Log the call, then flush the MDS log (mds->mdlog->flush()) when the
// lock's parent is auth and the lock reports is_unstable_and_locked().
// Per the inline comment, this covers cases such as xlockdone or a cap
// flush.
1334 void Locker::nudge_log(SimpleLock
*lock
)
1336 dout(10) << "nudge_log " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1337 if (lock
->get_parent()->is_auth() && lock
->is_unstable_and_locked()) // as with xlockdone, or cap flush
1338 mds
->mdlog
->flush();
// Finish a read lock: drop `lock` from mut->rdlocks and mut->locks, and
// once no read locks remain on it, either run eval_gather() (lock
// unstable) or try_eval() (lock stable and parent auth).
//
// @param lock         lock being released
// @param mut          mutation whose lock sets are updated
// @param pneed_issue  forwarded to eval_gather() / try_eval()
//
// NOTE(review): listing lines 1343-1345 are not visible; the call that
// drops the rdlock reference and any guard on `mut` presumably live there
// -- confirm.
1341 void Locker::rdlock_finish(SimpleLock
*lock
, MutationImpl
*mut
, bool *pneed_issue
)
1346 mut
->rdlocks
.erase(lock
);
1347 mut
->locks
.erase(lock
);
1350 dout(7) << "rdlock_finish on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
// Re-evaluate only once the last reader is gone.
1353 if (!lock
->is_rdlocked()) {
1354 if (!lock
->is_stable())
1355 eval_gather(lock
, false, pneed_issue
);
1356 else if (lock
->get_parent()->is_auth())
1357 try_eval(lock
, pneed_issue
);
// Check every lock in `locks` with can_rdlock(-1) (client -1) and log any
// lock that cannot currently be read-locked.
//
// NOTE(review): the return statements (listing lines 1368-1370) are not
// visible; presumably false on the first failure and true otherwise --
// confirm.
1362 bool Locker::can_rdlock_set(set
<SimpleLock
*>& locks
)
1364 dout(10) << "can_rdlock_set " << locks
<< dendl
;
1365 for (set
<SimpleLock
*>::iterator p
= locks
.begin(); p
!= locks
.end(); ++p
)
1366 if (!(*p
)->can_rdlock(-1)) {
1367 dout(10) << "can_rdlock_set can't rdlock " << *p
<< " on " << *(*p
)->get_parent() << dendl
;
// Call rdlock_try(lock, -1, NULL) on every lock in `locks` (client -1, no
// waiter context) and log any lock that cannot be read-locked.
//
// NOTE(review): the return statements (listing lines 1379-1381) are not
// visible; presumably false on the first failure and true otherwise --
// confirm.
1373 bool Locker::rdlock_try_set(set
<SimpleLock
*>& locks
)
1375 dout(10) << "rdlock_try_set " << locks
<< dendl
;
1376 for (set
<SimpleLock
*>::iterator p
= locks
.begin(); p
!= locks
.end(); ++p
)
1377 if (!rdlock_try(*p
, -1, NULL
)) {
1378 dout(10) << "rdlock_try_set can't rdlock " << *p
<< " on " << *(*p
)->get_parent() << dendl
;
// Record every lock in `locks` as read-locked by `mut`, inserting each
// into mut->rdlocks and mut->locks.
//
// NOTE(review): listing line 1388 (first statement of the loop body) is not
// visible; the call that takes the rdlock reference presumably lives there
// -- confirm.
1384 void Locker::rdlock_take_set(set
<SimpleLock
*>& locks
, MutationRef
& mut
)
1386 dout(10) << "rdlock_take_set " << locks
<< dendl
;
1387 for (set
<SimpleLock
*>::iterator p
= locks
.begin(); p
!= locks
.end(); ++p
) {
1389 mut
->rdlocks
.insert(*p
);
1390 mut
->locks
.insert(*p
);
1394 // ------------------
// Take a write lock on `lock` for `mut` without checking whether the lock
// state allows it.
//
// Version locks (CEPH_LOCK_IVERSION / CEPH_LOCK_DVERSION) are delegated to
// local_wrlock_grab(). Any other lock gets get_wrlock(true) and is
// recorded in mut->wrlocks and mut->locks.
1397 void Locker::wrlock_force(SimpleLock
*lock
, MutationRef
& mut
)
1399 if (lock
->get_type() == CEPH_LOCK_IVERSION
||
1400 lock
->get_type() == CEPH_LOCK_DVERSION
)
1401 return local_wrlock_grab(static_cast<LocalLock
*>(lock
), mut
);
1403 dout(7) << "wrlock_force on " << *lock
1404 << " on " << *lock
->get_parent() << dendl
;
1405 lock
->get_wrlock(true);
1406 mut
->wrlocks
.insert(lock
);
1407 mut
->locks
.insert(lock
);
// Try to take a write lock on `lock` for request `mut`.
//
// Visible steps:
//  - version locks (IVERSION / DVERSION) are delegated to
//    local_wrlock_start();
//  - want_scatter is set when !nowait, the parent is auth, and either the
//    inode has a subtree/exporting dirfrag or the lock has scatter_wanted;
//  - when can_wrlock(client) holds and (no scatter wanted or the lock is
//    already LOCK_MIX), the lock is recorded in mut->wrlocks / mut->locks;
//  - an IFILE lock on a recovering inode prioritizes that inode's recovery;
//  - on the auth, scatter_mix() is among the visible state changes; on a
//    replica, LOCK_AC_REQSCATTER is sent to the auth unless the cluster is
//    degraded and the auth is not clientreplay/active/stopping;
//  - otherwise the request waits on WAIT_STABLE with a C_MDS_RetryRequest.
//
// @param nowait  per the inline comment, implies the caller already has a
//                log entry open, so nested state changes on dirty scatter
//                data are avoided
//
// NOTE(review): the loop structure, the alternative state-change call, the
// call that takes the wrlock reference and all return statements are on
// lines missing from this listing.
1410 bool Locker::wrlock_start(SimpleLock
*lock
, MDRequestRef
& mut
, bool nowait
)
1412 if (lock
->get_type() == CEPH_LOCK_IVERSION
||
1413 lock
->get_type() == CEPH_LOCK_DVERSION
)
1414 return local_wrlock_start(static_cast<LocalLock
*>(lock
), mut
);
1416 dout(10) << "wrlock_start " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1418 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
1419 client_t client
= mut
->get_client();
1420 bool want_scatter
= !nowait
&& lock
->get_parent()->is_auth() &&
1421 (in
->has_subtree_or_exporting_dirfrag() ||
1422 static_cast<ScatterLock
*>(lock
)->get_scatter_wanted());
// Success requires both write permission and, if scatter is wanted, that
// the lock is already in LOCK_MIX.
1426 if (lock
->can_wrlock(client
) &&
1427 (!want_scatter
|| lock
->get_state() == LOCK_MIX
)) {
1429 mut
->wrlocks
.insert(lock
);
1430 mut
->locks
.insert(lock
);
1434 if (lock
->get_type() == CEPH_LOCK_IFILE
&&
1435 in
->state_test(CInode::STATE_RECOVERING
)) {
1436 mds
->mdcache
->recovery_queue
.prioritize(in
);
1439 if (!lock
->is_stable())
1442 if (in
->is_auth()) {
1443 // don't do nested lock state change if we have dirty scatterdata and
1444 // may scatter_writebehind or start_scatter, because nowait==true implies
1445 // that the caller already has a log entry open!
1446 if (nowait
&& lock
->is_dirty())
1450 scatter_mix(static_cast<ScatterLock
*>(lock
));
1454 if (nowait
&& !lock
->can_wrlock(client
))
1459 // auth should be auth_pinned (see acquire_locks wrlock weird mustpin case).
1460 mds_rank_t auth
= lock
->get_parent()->authority().first
;
1461 if (!mds
->is_cluster_degraded() ||
1462 mds
->mdsmap
->is_clientreplay_or_active_or_stopping(auth
)) {
1463 dout(10) << "requesting scatter from auth on "
1464 << *lock
<< " on " << *lock
->get_parent() << dendl
;
1465 mds
->send_message_mds(new MLock(lock
, LOCK_AC_REQSCATTER
, mds
->get_nodeid()), auth
);
1472 dout(7) << "wrlock_start waiting on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1473 lock
->add_waiter(SimpleLock::WAIT_STABLE
, new C_MDS_RetryRequest(mdcache
, mut
));
// Finish a write lock held by `mut`.
//
// Version locks (IVERSION / DVERSION) are delegated to
// local_wrlock_finish(). Otherwise the lock is removed from mut->wrlocks,
// and from mut->locks only if `mut` does not also hold it as a remote
// wrlock. Once no write locks remain on it, the lock is re-evaluated:
// eval_gather() if unstable, try_eval() if stable and the parent is auth.
//
// NOTE(review): listing lines 1487-1488 are not visible; the call that
// drops the wrlock reference and any guard on `mut` presumably live there
// -- confirm.
1480 void Locker::wrlock_finish(SimpleLock
*lock
, MutationImpl
*mut
, bool *pneed_issue
)
1482 if (lock
->get_type() == CEPH_LOCK_IVERSION
||
1483 lock
->get_type() == CEPH_LOCK_DVERSION
)
1484 return local_wrlock_finish(static_cast<LocalLock
*>(lock
), mut
);
1486 dout(7) << "wrlock_finish on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1489 mut
->wrlocks
.erase(lock
);
// Keep the entry in mut->locks while a remote wrlock on it is still held.
1490 if (mut
->remote_wrlocks
.count(lock
) == 0)
1491 mut
->locks
.erase(lock
);
1494 if (!lock
->is_wrlocked()) {
1495 if (!lock
->is_stable())
1496 eval_gather(lock
, false, pneed_issue
);
1497 else if (lock
->get_parent()->is_auth())
1498 try_eval(lock
, pneed_issue
);
// Ask MDS rank `target` to write-lock `lock` on behalf of request `mut`.
//
// If the cluster is degraded and `target` is not clientreplay/active/
// stopping, the request is retried once the peer is active (registered
// only if mut is not already waiting on a slave). Otherwise the request
// calls mut->start_locking(), adds `target` to mut->more()->slaves, sends
// an MMDSSlaveRequest with OP_WRLOCK carrying the lock type and the parent
// object's info, and adds `target` to mut->more()->waiting_on_slave.
//
// NOTE(review): the early return after the "not active" branch (listing
// lines 1515-1516) is not visible; confirm.
1505 void Locker::remote_wrlock_start(SimpleLock
*lock
, mds_rank_t target
, MDRequestRef
& mut
)
1507 dout(7) << "remote_wrlock_start mds." << target
<< " on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1509 // wait for active target
1510 if (mds
->is_cluster_degraded() &&
1511 !mds
->mdsmap
->is_clientreplay_or_active_or_stopping(target
)) {
1512 dout(7) << " mds." << target
<< " is not active" << dendl
;
1513 if (mut
->more()->waiting_on_slave
.empty())
1514 mds
->wait_for_active_peer(target
, new C_MDS_RetryRequest(mdcache
, mut
));
1518 // send lock request
1519 mut
->start_locking(lock
, target
);
1520 mut
->more()->slaves
.insert(target
);
1521 MMDSSlaveRequest
*r
= new MMDSSlaveRequest(mut
->reqid
, mut
->attempt
,
1522 MMDSSlaveRequest::OP_WRLOCK
);
1523 r
->set_lock_type(lock
->get_type());
1524 lock
->get_parent()->set_object_info(r
->get_object_info());
1525 mds
->send_message_mds(r
, target
);
// We must not already be waiting on this peer for this request.
1527 assert(mut
->more()->waiting_on_slave
.count(target
) == 0);
1528 mut
->more()->waiting_on_slave
.insert(target
);
// Release a remote write lock that `mut` holds on MDS rank `target`.
//
// The lock is removed from mut->remote_wrlocks, and from mut->locks only
// if `mut` does not also hold it as a local wrlock. An MMDSSlaveRequest
// with OP_UNWRLOCK is then sent to `target`, unless the cluster is
// degraded and the target's state is below MDSMap::STATE_REJOIN.
//
// NOTE(review): the remainder of the signature (listing line 1532) is not
// visible in this listing.
1531 void Locker::remote_wrlock_finish(SimpleLock
*lock
, mds_rank_t target
,
1535 mut
->remote_wrlocks
.erase(lock
);
1536 if (mut
->wrlocks
.count(lock
) == 0)
1537 mut
->locks
.erase(lock
);
1539 dout(7) << "remote_wrlock_finish releasing remote wrlock on mds." << target
1540 << " " << *lock
->get_parent() << dendl
;
1541 if (!mds
->is_cluster_degraded() ||
1542 mds
->mdsmap
->get_state(target
) >= MDSMap::STATE_REJOIN
) {
1543 MMDSSlaveRequest
*slavereq
= new MMDSSlaveRequest(mut
->reqid
, mut
->attempt
,
1544 MMDSSlaveRequest::OP_UNWRLOCK
);
1545 slavereq
->set_lock_type(lock
->get_type());
1546 lock
->get_parent()->set_object_info(slavereq
->get_object_info());
1547 mds
->send_message_mds(slavereq
, target
);
1552 // ------------------
// Try to take an exclusive lock on `lock` for request `mut`.
//
// Version locks (IVERSION / DVERSION) are delegated to
// local_xlock_start().
//
// Auth path (parent is auth), as far as visible:
//  - when can_xlock(client) holds, the lock is set to LOCK_XLOCK, the
//    xlock is taken, the lock is recorded in mut->xlocks / mut->locks and
//    mut->finish_locking() is called;
//  - an IFILE lock on a recovering inode prioritizes that inode's recovery;
//  - from LOCK_LOCK or LOCK_XLOCKDONE, mut->start_locking() is called;
//  - otherwise the request waits on WAIT_WR|WAIT_STABLE with a
//    C_MDS_RetryRequest.
//
// Replica path: requires a remotely xlockable state machine and a
// non-slave request. It waits for single auth and for an active auth peer
// when needed, then sends an MMDSSlaveRequest with OP_XLOCK to the auth
// and records it in mut->more()->waiting_on_slave.
//
// NOTE(review): loop structure, the state-change call that follows
// start_locking(), and all return statements are on lines missing from
// this listing.
1555 bool Locker::xlock_start(SimpleLock
*lock
, MDRequestRef
& mut
)
1557 if (lock
->get_type() == CEPH_LOCK_IVERSION
||
1558 lock
->get_type() == CEPH_LOCK_DVERSION
)
1559 return local_xlock_start(static_cast<LocalLock
*>(lock
), mut
);
1561 dout(7) << "xlock_start on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1562 client_t client
= mut
->get_client();
1565 if (lock
->get_parent()->is_auth()) {
1568 if (lock
->can_xlock(client
)) {
1569 lock
->set_state(LOCK_XLOCK
);
1570 lock
->get_xlock(mut
, client
);
1571 mut
->xlocks
.insert(lock
);
1572 mut
->locks
.insert(lock
);
1573 mut
->finish_locking(lock
);
1577 if (lock
->get_type() == CEPH_LOCK_IFILE
) {
1578 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
1579 if (in
->state_test(CInode::STATE_RECOVERING
)) {
1580 mds
->mdcache
->recovery_queue
.prioritize(in
);
// An unstable lock is tolerated only in LOCK_XLOCKDONE, held by this same
// client, with nobody waiting for WAIT_STABLE.
1584 if (!lock
->is_stable() && (lock
->get_state() != LOCK_XLOCKDONE
||
1585 lock
->get_xlock_by_client() != client
||
1586 lock
->is_waiter_for(SimpleLock::WAIT_STABLE
)))
1589 if (lock
->get_state() == LOCK_LOCK
|| lock
->get_state() == LOCK_XLOCKDONE
) {
1590 mut
->start_locking(lock
);
1597 lock
->add_waiter(SimpleLock::WAIT_WR
|SimpleLock::WAIT_STABLE
, new C_MDS_RetryRequest(mdcache
, mut
));
1602 assert(lock
->get_sm()->can_remote_xlock
);
1603 assert(!mut
->slave_request
);
1605 // wait for single auth
1606 if (lock
->get_parent()->is_ambiguous_auth()) {
1607 lock
->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
,
1608 new C_MDS_RetryRequest(mdcache
, mut
));
1612 // wait for active auth
1613 mds_rank_t auth
= lock
->get_parent()->authority().first
;
1614 if (mds
->is_cluster_degraded() &&
1615 !mds
->mdsmap
->is_clientreplay_or_active_or_stopping(auth
)) {
1616 dout(7) << " mds." << auth
<< " is not active" << dendl
;
1617 if (mut
->more()->waiting_on_slave
.empty())
1618 mds
->wait_for_active_peer(auth
, new C_MDS_RetryRequest(mdcache
, mut
));
1622 // send lock request
1623 mut
->more()->slaves
.insert(auth
);
1624 mut
->start_locking(lock
, auth
);
1625 MMDSSlaveRequest
*r
= new MMDSSlaveRequest(mut
->reqid
, mut
->attempt
,
1626 MMDSSlaveRequest::OP_XLOCK
);
1627 r
->set_lock_type(lock
->get_type());
1628 lock
->get_parent()->set_object_info(r
->get_object_info());
1629 mds
->send_message_mds(r
, auth
);
1631 assert(mut
->more()->waiting_on_slave
.count(auth
) == 0);
1632 mut
->more()->waiting_on_slave
.insert(auth
);
// Settle an unstable lock after its last exclusive lock is released.
//
// Fast path: a non-dentry lock with no rdlocks, wrlocks or client leases,
// not in LOCK_XLOCKSNAP, on an inode whose target loner exists and either
// is the former xlocker or there was no xlocker client. The lock moves
// straight to LOCK_EXCL, the parent is auth-unpinned, STABLE/WR/RD waiters
// are finished, *pneed_issue is set if the lock has a cap shift, and
// try_eval() may follow on the auth.
//
// Otherwise eval_gather() drives the transition.
//
// @param xlocker  client that held the xlock (negative: none)
//
// NOTE(review): the second half of the condition guarding try_eval()
// (listing line 1655) and the return ending the fast path are not visible.
1638 void Locker::_finish_xlock(SimpleLock
*lock
, client_t xlocker
, bool *pneed_issue
)
1640 assert(!lock
->is_stable());
1641 if (lock
->get_num_rdlocks() == 0 &&
1642 lock
->get_num_wrlocks() == 0 &&
1643 lock
->get_num_client_lease() == 0 &&
1644 lock
->get_state() != LOCK_XLOCKSNAP
&&
1645 lock
->get_type() != CEPH_LOCK_DN
) {
1646 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
1647 client_t loner
= in
->get_target_loner();
1648 if (loner
>= 0 && (xlocker
< 0 || xlocker
== loner
)) {
1649 lock
->set_state(LOCK_EXCL
);
1650 lock
->get_parent()->auth_unpin(lock
);
1651 lock
->finish_waiters(SimpleLock::WAIT_STABLE
|SimpleLock::WAIT_WR
|SimpleLock::WAIT_RD
);
1652 if (lock
->get_cap_shift())
1653 *pneed_issue
= true;
1654 if (lock
->get_parent()->is_auth() &&
1656 try_eval(lock
, pneed_issue
);
1660 // the xlocker may have CEPH_CAP_GSHARED, need to revoke it if next state is LOCK_LOCK
1661 eval_gather(lock
, lock
->get_state() != LOCK_XLOCKSNAP
, pneed_issue
);
// Release an exclusive lock held by `mut`.
//
// Version locks (IVERSION / DVERSION) are delegated to
// local_xlock_finish(). Otherwise the xlocking client is remembered and
// the lock is removed from mut->xlocks and mut->locks.
//
// Replica: an MMDSSlaveRequest with OP_UNXLOCK goes to the auth (unless
// the cluster is degraded and the auth's state is below STATE_REJOIN), and
// STABLE/WR/RD waiters are finished.
// Auth: when no xlocks remain, LOCK_LOCK_XLOCK becomes LOCK_XLOCKDONE and
// _finish_xlock() settles the lock.
//
// On the visible final path, *pneed_issue is set for a head inode.
//
// NOTE(review): listing lines 1673-1676 (presumably the xlock reference
// drop), the else-branch structure, and the condition guarding the final
// cap-issue section are not visible; confirm.
1664 void Locker::xlock_finish(SimpleLock
*lock
, MutationImpl
*mut
, bool *pneed_issue
)
1666 if (lock
->get_type() == CEPH_LOCK_IVERSION
||
1667 lock
->get_type() == CEPH_LOCK_DVERSION
)
1668 return local_xlock_finish(static_cast<LocalLock
*>(lock
), mut
);
1670 dout(10) << "xlock_finish on " << *lock
<< " " << *lock
->get_parent() << dendl
;
// Captured before the bookkeeping below; passed to _finish_xlock().
1672 client_t xlocker
= lock
->get_xlock_by_client();
1677 mut
->xlocks
.erase(lock
);
1678 mut
->locks
.erase(lock
);
1680 bool do_issue
= false;
1683 if (!lock
->get_parent()->is_auth()) {
1684 assert(lock
->get_sm()->can_remote_xlock
);
1687 dout(7) << "xlock_finish releasing remote xlock on " << *lock
->get_parent() << dendl
;
1688 mds_rank_t auth
= lock
->get_parent()->authority().first
;
1689 if (!mds
->is_cluster_degraded() ||
1690 mds
->mdsmap
->get_state(auth
) >= MDSMap::STATE_REJOIN
) {
1691 MMDSSlaveRequest
*slavereq
= new MMDSSlaveRequest(mut
->reqid
, mut
->attempt
,
1692 MMDSSlaveRequest::OP_UNXLOCK
);
1693 slavereq
->set_lock_type(lock
->get_type());
1694 lock
->get_parent()->set_object_info(slavereq
->get_object_info());
1695 mds
->send_message_mds(slavereq
, auth
);
1698 lock
->finish_waiters(SimpleLock::WAIT_STABLE
|
1699 SimpleLock::WAIT_WR
|
1700 SimpleLock::WAIT_RD
, 0);
1702 if (lock
->get_num_xlocks() == 0) {
1703 if (lock
->get_state() == LOCK_LOCK_XLOCK
)
1704 lock
->set_state(LOCK_XLOCKDONE
);
1705 _finish_xlock(lock
, xlocker
, &do_issue
);
1710 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
1711 if (in
->is_head()) {
1713 *pneed_issue
= true;
// Drop `mut`'s exclusive lock on `lock` while its parent inode is being
// exported (the parent is asserted to be in STATE_AMBIGUOUSAUTH).
//
// The lock is removed from mut->xlocks and mut->locks. If it was unstable,
// the parent is auth-unpinned. The lock state is then forced to LOCK_LOCK.
//
// NOTE(review): listing lines 1723-1724 are not visible; the xlock
// reference drop presumably lives there -- confirm.
1720 void Locker::xlock_export(SimpleLock
*lock
, MutationImpl
*mut
)
1722 dout(10) << "xlock_export on " << *lock
<< " " << *lock
->get_parent() << dendl
;
1725 mut
->xlocks
.erase(lock
);
1726 mut
->locks
.erase(lock
);
1728 MDSCacheObject
*p
= lock
->get_parent();
1729 assert(p
->state_test(CInode::STATE_AMBIGUOUSAUTH
)); // we are exporting this (inode)
1731 if (!lock
->is_stable())
1732 lock
->get_parent()->auth_unpin(lock
);
1734 lock
->set_state(LOCK_LOCK
);
// Import side of an xlock transfer: log the call and auth-pin the lock's
// parent object on behalf of `lock`.
1737 void Locker::xlock_import(SimpleLock
*lock
)
1739 dout(10) << "xlock_import on " << *lock
<< " " << *lock
->get_parent() << dendl
;
1740 lock
->get_parent()->auth_pin(lock
);
1745 // file i/o -----------------------------------------
// Log the call and return the inode's current file_data_version
// (in->inode.file_data_version). No state is modified here.
1747 version_t
Locker::issue_file_data_version(CInode
*in
)
1749 dout(7) << "issue_file_data_version on " << *in
<< dendl
;
1750 return in
->inode
.file_data_version
;
// Log-completion context that calls Locker::file_update_finish() with the
// arguments captured at construction time.
//
// The inode is pinned with CInode::PIN_PTRWAITER in the constructor and
// unpinned after the callback in finish().
//
// Constructor defaults: sm (share_max) = false, ni (need_issue) = false,
// c (client) = -1, ac (ack message) = 0.
//
// NOTE(review): the member declarations (listing lines 1754-1760) are not
// visible in this listing.
1753 class C_Locker_FileUpdate_finish
: public LockerLogContext
{
1761 C_Locker_FileUpdate_finish(Locker
*l
, CInode
*i
, MutationRef
& m
,
1762 bool sm
=false, bool ni
=false, client_t c
=-1,
1763 MClientCaps
*ac
= 0)
1764 : LockerLogContext(l
), in(i
), mut(m
), share_max(sm
), need_issue(ni
),
1765 client(c
), ack(ac
) {
1766 in
->get(CInode::PIN_PTRWAITER
);
1768 void finish(int r
) override
{
1769 locker
->file_update_finish(in
, mut
, share_max
, need_issue
, client
, ack
);
1770 in
->put(CInode::PIN_PTRWAITER
);
// Completion handler for a journaled inode update triggered by client caps.
//
// Visible steps:
//  - pop and dirty the projected inode;
//  - look up the client's session; for clients using unique flush TIDs
//    (oldest_flush_tid > 0) record the completed flush, then send `ack` to
//    the client; the "no session" case is only logged;
//  - drop the mutation's locks, collecting inodes that need caps issued;
//  - for a non-head inode with pending client_snap_caps, erase `client`
//    from each entry, remove emptied entries, drop the inode from the
//    open-file list once none remain, and re-run eval_cap_gather();
//  - optionally re-issue caps to this client (when it wants more than is
//    pending) and share the new max size;
//  - issue caps for every inode collected in need_issue.
//
// @param issue_client_cap  re-issue to `client` unless `in` is already in
//                          the need_issue set
// @param share_max         call share_inode_max_size() when auth and the
//                          loner is allowed GWR or GBUFFER on the filelock
//
// NOTE(review): guards around the ack handling (e.g. a null check on
// `ack`), the rdlock/gather handling inside the snap-caps loop, and the
// trailing cleanup after "auth unpin after issuing caps" are on lines
// missing from this listing.
1774 void Locker::file_update_finish(CInode
*in
, MutationRef
& mut
, bool share_max
, bool issue_client_cap
,
1775 client_t client
, MClientCaps
*ack
)
1777 dout(10) << "file_update_finish on " << *in
<< dendl
;
1778 in
->pop_and_dirty_projected_inode(mut
->ls
);
1783 Session
*session
= mds
->get_session(client
);
1785 // "oldest flush tid" > 0 means client uses unique TID for each flush
1786 if (ack
->get_oldest_flush_tid() > 0)
1787 session
->add_completed_flush(ack
->get_client_tid());
1788 mds
->send_message_client_counted(ack
, session
);
1790 dout(10) << " no session for client." << client
<< " " << *ack
<< dendl
;
1795 set
<CInode
*> need_issue
;
1796 drop_locks(mut
.get(), &need_issue
);
1798 if (!in
->is_head() && !in
->client_snap_caps
.empty()) {
1799 dout(10) << " client_snap_caps " << in
->client_snap_caps
<< dendl
;
1800 // check for snap writeback completion
1801 bool gather
= false;
1802 compact_map
<int,set
<client_t
> >::iterator p
= in
->client_snap_caps
.begin();
1803 while (p
!= in
->client_snap_caps
.end()) {
1804 SimpleLock
*lock
= in
->get_lock(p
->first
);
1806 dout(10) << " completing client_snap_caps for " << ccap_string(p
->first
)
1807 << " lock " << *lock
<< " on " << *in
<< dendl
;
1810 p
->second
.erase(client
);
1811 if (p
->second
.empty()) {
// Post-increment keeps the loop iterator valid across the erase.
1813 in
->client_snap_caps
.erase(p
++);
1818 if (in
->client_snap_caps
.empty())
1819 in
->item_open_file
.remove_myself();
1820 eval_cap_gather(in
, &need_issue
);
1823 if (issue_client_cap
&& need_issue
.count(in
) == 0) {
1824 Capability
*cap
= in
->get_client_cap(client
);
1825 if (cap
&& (cap
->wanted() & ~cap
->pending()))
1826 issue_caps(in
, cap
);
1829 if (share_max
&& in
->is_auth() &&
1830 (in
->filelock
.gcaps_allowed(CAP_LONER
) & (CEPH_CAP_GWR
|CEPH_CAP_GBUFFER
)))
1831 share_inode_max_size(in
);
1833 issue_caps_set(need_issue
);
1835 // auth unpin after issuing caps
// Register (or augment) a client capability on `in` for an open in the
// given mode and return it.
//
// Visible steps:
//  - during replay, try to reconnect the cap via mdcache;
//  - derive the client id from the session and the wanted caps from the
//    open mode (ceph_caps_for_mode);
//  - look up the client's cap; on the new-cap path add it, set its wanted
//    caps and suppress cap messages (they are bundled with the open reply,
//    per the inline comment); on the existing-cap path widen the wanted
//    set if needed;
//  - on the auth, re-evaluate the inode's cap locks and flush the mdlog if
//    _need_flush_mdlog() says so; on a replica, tell the auth about the
//    new wanted caps;
//  - finally lift the suppression (dec_suppress).
//
// NOTE(review): the parameter list after `in` (listing lines 1840-1843),
// the replay condition, the new-vs-existing cap branch structure and the
// return statements are not visible here; `mode`, `session` and `realm`
// are presumably parameters -- confirm.
1839 Capability
* Locker::issue_new_caps(CInode
*in
,
1845 dout(7) << "issue_new_caps for mode " << mode
<< " on " << *in
<< dendl
;
1848 // if replay, try to reconnect cap, and otherwise do nothing.
1850 mds
->mdcache
->try_reconnect_cap(in
, session
);
1855 assert(session
->info
.inst
.name
.is_client());
1856 client_t my_client
= session
->info
.inst
.name
.num();
1857 int my_want
= ceph_caps_for_mode(mode
);
1859 // register a capability
1860 Capability
*cap
= in
->get_client_cap(my_client
);
1863 cap
= in
->add_client_cap(my_client
, session
, realm
);
1864 cap
->set_wanted(my_want
);
1866 cap
->inc_suppress(); // suppress file cap messages for new cap (we'll bundle with the open() reply)
1870 // make sure it wants sufficient caps
1871 if (my_want
& ~cap
->wanted()) {
1872 // augment wanted caps for this client
1873 cap
->set_wanted(cap
->wanted() | my_want
);
1877 if (in
->is_auth()) {
1878 // [auth] twiddle mode?
1879 eval(in
, CEPH_CAP_LOCKS
);
1881 if (_need_flush_mdlog(in
, my_want
))
1882 mds
->mdlog
->flush();
1885 // [replica] tell auth about any new caps wanted
1886 request_inode_file_caps(in
);
1889 // issue caps (pot. incl new one)
1890 //issue_caps(in); // note: _eval above may have done this already...
1892 // re-issue whatever we can
1893 //cap->issue(cap->pending());
1896 cap
->dec_suppress();
// Iterate over every inode in `inset`.
// NOTE(review): the loop body (listing line 1905) is not visible in this
// listing; judging by the function name it presumably calls issue_caps()
// on each inode -- confirm.
1902 void Locker::issue_caps_set(set
<CInode
*>& inset
)
1904 for (set
<CInode
*>::iterator p
= inset
.begin(); p
!= inset
.end(); ++p
)
// (Re)issue or revoke client capabilities on head inode `in` according to
// what the current lock states allow.
//
// For each considered client cap (only `only_cap`'s client on the
// only_cap path, otherwise all of in->client_caps):
//  - the allowed set is loner_allowed for the loner and all_allowed for
//    others, plus xlocker-only caps for locks that client xlocks;
//  - FILE_RD/FILE_WR are masked out when the inode has inline data and the
//    client's session lacks CEPH_FEATURE_MDS_INLINE_DATA;
//  - a new or suppressed cap is handled specially when nothing needs
//    revoking (see the "skipping client" log);
//  - for an unlinked inode (nlink == 0) LINK_SHARED is added to wanted so
//    that clients learn about the deletion;
//  - if wanted+allowed caps are missing or disallowed caps are pending,
//    cap->issue() is called (without new bits while revoking) and, unless
//    the cap is still new, an MClientCaps GRANT or REVOKE message is sent;
//    revocations are also tracked in revoking_caps /
//    revoking_caps_by_client with a timestamp.
//
// @return per the inline comment on the final statement: true if nothing
//         was re-issued (no callbacks)
//
// NOTE(review): many lines of this function are missing from the listing,
// including the declarations of `allowed`, `seq` and `nissued`, the
// loner/only_cap branch conditions, `continue` statements and several
// MClientCaps constructor arguments. The description covers only visible
// statements.
1908 bool Locker::issue_caps(CInode
*in
, Capability
*only_cap
)
1910 // allowed caps are determined by the lock mode.
1911 int all_allowed
= in
->get_caps_allowed_by_type(CAP_ANY
);
1912 int loner_allowed
= in
->get_caps_allowed_by_type(CAP_LONER
);
1913 int xlocker_allowed
= in
->get_caps_allowed_by_type(CAP_XLOCKER
);
1915 client_t loner
= in
->get_loner();
1917 dout(7) << "issue_caps loner client." << loner
1918 << " allowed=" << ccap_string(loner_allowed
)
1919 << ", xlocker allowed=" << ccap_string(xlocker_allowed
)
1920 << ", others allowed=" << ccap_string(all_allowed
)
1921 << " on " << *in
<< dendl
;
1923 dout(7) << "issue_caps allowed=" << ccap_string(all_allowed
)
1924 << ", xlocker allowed=" << ccap_string(xlocker_allowed
)
1925 << " on " << *in
<< dendl
;
1928 assert(in
->is_head());
1930 // count conflicts with
1934 map
<client_t
, Capability
*>::iterator it
;
1936 it
= in
->client_caps
.find(only_cap
->get_client());
1938 it
= in
->client_caps
.begin();
1939 for (; it
!= in
->client_caps
.end(); ++it
) {
1940 Capability
*cap
= it
->second
;
1941 if (cap
->is_stale())
1944 // do not issue _new_ bits when size|mtime is projected
1946 if (loner
== it
->first
)
1947 allowed
= loner_allowed
;
1949 allowed
= all_allowed
;
1951 // add in any xlocker-only caps (for locks this client is the xlocker for)
1952 allowed
|= xlocker_allowed
& in
->get_xlocker_mask(it
->first
);
// Clients without inline-data support must not read/write file data
// directly while the inode still carries inline data.
1954 Session
*session
= mds
->get_session(it
->first
);
1955 if (in
->inode
.inline_data
.version
!= CEPH_INLINE_NONE
&&
1956 !(session
&& session
->connection
&&
1957 session
->connection
->has_feature(CEPH_FEATURE_MDS_INLINE_DATA
)))
1958 allowed
&= ~(CEPH_CAP_FILE_RD
| CEPH_CAP_FILE_WR
);
1960 int pending
= cap
->pending();
1961 int wanted
= cap
->wanted();
1963 dout(20) << " client." << it
->first
1964 << " pending " << ccap_string(pending
)
1965 << " allowed " << ccap_string(allowed
)
1966 << " wanted " << ccap_string(wanted
)
1969 if (!(pending
& ~allowed
)) {
1970 // skip if suppress or new, and not revocation
1971 if (cap
->is_new() || cap
->is_suppress()) {
1972 dout(20) << " !revoke and new|suppressed, skipping client." << it
->first
<< dendl
;
1977 // notify clients about deleted inode, to make sure they release caps ASAP.
1978 if (in
->inode
.nlink
== 0)
1979 wanted
|= CEPH_CAP_LINK_SHARED
;
1981 // are there caps that the client _wants_ and can have, but aren't pending?
1982 // or do we need to revoke?
1983 if (((wanted
& allowed
) & ~pending
) || // missing wanted+allowed caps
1984 (pending
& ~allowed
)) { // need to revoke ~allowed caps.
1988 // include caps that clients generally like, while we're at it.
1989 int likes
= in
->get_caps_liked();
1990 int before
= pending
;
1992 if (pending
& ~allowed
)
1993 seq
= cap
->issue((wanted
|likes
) & allowed
& pending
); // if revoking, don't issue anything new.
1995 seq
= cap
->issue((wanted
|likes
) & allowed
);
1996 int after
= cap
->pending();
1998 if (cap
->is_new()) {
1999 // haven't send caps to client yet
2000 if (before
& ~after
)
2001 cap
->confirm_receipt(seq
, after
);
2003 dout(7) << " sending MClientCaps to client." << it
->first
2004 << " seq " << cap
->get_last_seq()
2005 << " new pending " << ccap_string(after
) << " was " << ccap_string(before
)
// Any bit present before but absent after makes this a revocation.
2008 int op
= (before
& ~after
) ? CEPH_CAP_OP_REVOKE
: CEPH_CAP_OP_GRANT
;
2009 if (op
== CEPH_CAP_OP_REVOKE
) {
2010 revoking_caps
.push_back(&cap
->item_revoking_caps
);
2011 revoking_caps_by_client
[cap
->get_client()].push_back(&cap
->item_client_revoking_caps
);
2012 cap
->set_last_revoke_stamp(ceph_clock_now());
2013 cap
->reset_num_revoke_warnings();
2016 MClientCaps
*m
= new MClientCaps(op
, in
->ino(),
2017 in
->find_snaprealm()->inode
->ino(),
2018 cap
->get_cap_id(), cap
->get_last_seq(),
2021 mds
->get_osd_epoch_barrier());
2022 in
->encode_cap_message(m
, cap
);
2024 mds
->send_message_client_counted(m
, it
->first
);
2032 return (nissued
== 0); // true if no re-issued, no callbacks
// Notify every client holding a cap on `in` of a truncation by sending an
// MClientCaps message with CEPH_CAP_OP_TRUNC (carrying the cap's id, last
// seq, pending and wanted caps). Afterwards, for an auth regular file,
// check whether max_size should grow via check_inode_max_size().
//
// NOTE(review): the loop increment and some MClientCaps constructor
// arguments (listing lines 2041, 2044, 2048) are not visible here.
2035 void Locker::issue_truncate(CInode
*in
)
2037 dout(7) << "issue_truncate on " << *in
<< dendl
;
2039 for (map
<client_t
, Capability
*>::iterator it
= in
->client_caps
.begin();
2040 it
!= in
->client_caps
.end();
2042 Capability
*cap
= it
->second
;
2043 MClientCaps
*m
= new MClientCaps(CEPH_CAP_OP_TRUNC
,
2045 in
->find_snaprealm()->inode
->ino(),
2046 cap
->get_cap_id(), cap
->get_last_seq(),
2047 cap
->pending(), cap
->wanted(), 0,
2049 mds
->get_osd_epoch_barrier());
2050 in
->encode_cap_message(m
, cap
);
2051 mds
->send_message_client_counted(m
, it
->first
);
2054 // should we increase max_size?
2055 if (in
->is_auth() && in
->is_file())
2056 check_inode_max_size(in
);
// Revoke everything a stale capability has issued beyond CEPH_CAP_PIN.
//
// If the inode's caps are being exported (STATE_EXPORTINGCAPS), only
// STATE_EVALSTALECAPS is set so the decision is revisited after the export
// (see the inline comment). Otherwise, when the cap has issued more than
// PIN:
//  - an auth inode with a client range for this client is flagged
//    STATE_NEEDSRECOVER;
//  - unstable file/link/auth/xattr locks are pushed along with
//    eval_gather();
//  - the auth re-evaluates the cap locks with try_eval(); a replica calls
//    request_inode_file_caps().
//
// NOTE(review): the early return after the export check and the call that
// actually revokes the cap (listing lines 2067-2069, 2073-2074) are not
// visible here.
2060 void Locker::revoke_stale_caps(Capability
*cap
)
2062 CInode
*in
= cap
->get_inode();
2063 if (in
->state_test(CInode::STATE_EXPORTINGCAPS
)) {
2064 // if export succeeds, the cap will be removed. if export fails, we need to
2065 // revoke the cap if it's still stale.
2066 in
->state_set(CInode::STATE_EVALSTALECAPS
);
2070 int issued
= cap
->issued();
2071 if (issued
& ~CEPH_CAP_PIN
) {
2072 dout(10) << " revoking " << ccap_string(issued
) << " on " << *in
<< dendl
;
2075 if (in
->is_auth() &&
2076 in
->inode
.client_ranges
.count(cap
->get_client()))
2077 in
->state_set(CInode::STATE_NEEDSRECOVER
);
2079 if (!in
->filelock
.is_stable()) eval_gather(&in
->filelock
);
2080 if (!in
->linklock
.is_stable()) eval_gather(&in
->linklock
);
2081 if (!in
->authlock
.is_stable()) eval_gather(&in
->authlock
);
2082 if (!in
->xattrlock
.is_stable()) eval_gather(&in
->xattrlock
);
2084 if (in
->is_auth()) {
2085 try_eval(in
, CEPH_CAP_LOCKS
);
2087 request_inode_file_caps(in
);
// Session-wide variant: walk every capability on `session` and call
// revoke_stale_caps(cap) for it.
//
// NOTE(review): listing line 2098 (between fetching the cap and revoking
// it) is not visible; the cap is presumably marked stale there -- confirm.
2092 void Locker::revoke_stale_caps(Session
*session
)
2094 dout(10) << "revoke_stale_caps for " << session
->info
.inst
.name
<< dendl
;
2096 for (xlist
<Capability
*>::iterator p
= session
->caps
.begin(); !p
.end(); ++p
) {
2097 Capability
*cap
= *p
;
2099 revoke_stale_caps(cap
);
// Resume the capabilities of a session that is no longer stale.
//
// For every stale cap on `session` (its inode is asserted to be a head
// inode):
//  - if the inode's caps are being exported, only STATE_EVALSTALECAPS is
//    set so the cap is re-examined after the export;
//  - otherwise caps are re-issued with issue_caps(in, cap) when the inode
//    is not auth or eval(in, CEPH_CAP_LOCKS) returns false.
//
// NOTE(review): the statement that clears the stale flag and the
// `continue` after the export check (listing lines 2113-2114, 2119-2121)
// are not visible here.
2103 void Locker::resume_stale_caps(Session
*session
)
2105 dout(10) << "resume_stale_caps for " << session
->info
.inst
.name
<< dendl
;
2107 for (xlist
<Capability
*>::iterator p
= session
->caps
.begin(); !p
.end(); ++p
) {
2108 Capability
*cap
= *p
;
2109 CInode
*in
= cap
->get_inode();
2110 assert(in
->is_head());
2111 if (cap
->is_stale()) {
2112 dout(10) << " clearing stale flag on " << *in
<< dendl
;
2115 if (in
->state_test(CInode::STATE_EXPORTINGCAPS
)) {
2116 // if export succeeds, the cap will be removed. if export fails,
2117 // we need to re-issue the cap if it's not stale.
2118 in
->state_set(CInode::STATE_EVALSTALECAPS
);
2122 if (!in
->is_auth() || !eval(in
, CEPH_CAP_LOCKS
))
2123 issue_caps(in
, cap
);
// Remove the client leases held by `session`: for each lease, the parent
// object is treated as a CDentry and remove_client_lease(l, this) is called
// on it.
//
// NOTE(review): the loop header and iterator advance (listing lines 2132,
// 2134) are not visible. Since removing a lease may unlink it from
// session->leases, the iterator is presumably advanced before the removal
// -- confirm.
2128 void Locker::remove_stale_leases(Session
*session
)
2130 dout(10) << "remove_stale_leases for " << session
->info
.inst
.name
<< dendl
;
2131 xlist
<ClientLease
*>::iterator p
= session
->leases
.begin();
2133 ClientLease
*l
= *p
;
2135 CDentry
*parent
= static_cast<CDentry
*>(l
->parent
);
2136 dout(15) << " removing lease on " << *parent
<< dendl
;
2137 parent
->remove_client_lease(l
, this);
// Retry context that calls Locker::request_inode_file_caps(in) when
// completed. The inode is pinned with CInode::PIN_PTRWAITER in the
// constructor and unpinned after the call in finish().
//
// NOTE(review): the member declaration and listing line 2149 inside
// finish() (possibly a guard before the call) are not visible here.
2142 class C_MDL_RequestInodeFileCaps
: public LockerContext
{
2145 C_MDL_RequestInodeFileCaps(Locker
*l
, CInode
*i
) : LockerContext(l
), in(i
) {
2146 in
->get(CInode::PIN_PTRWAITER
);
2148 void finish(int r
) override
{
2150 locker
->request_inode_file_caps(in
);
2151 in
->put(CInode::PIN_PTRWAITER
);
// On a replica (asserted: `in` is not auth), tell the auth MDS which caps
// local clients want on `in`.
//
// Nothing happens unless the wanted set (excluding CEPH_CAP_PIN) differs
// from in->replica_caps_wanted. The request is deferred with a
// C_MDL_RequestInodeFileCaps waiter while auth is ambiguous, or while the
// cluster is degraded and the auth is in STATE_REJOIN. Otherwise
// replica_caps_wanted is updated and an MInodeFileCaps message is sent to
// the auth, provided the cluster is not degraded or the auth is
// clientreplay/active/stopping.
//
// NOTE(review): the early returns after the two deferral branches and the
// final argument of send_message_mds() (listing line 2184) are not visible
// here.
2155 void Locker::request_inode_file_caps(CInode
*in
)
2157 assert(!in
->is_auth());
2159 int wanted
= in
->get_caps_wanted() & ~CEPH_CAP_PIN
;
2160 if (wanted
!= in
->replica_caps_wanted
) {
2161 // wait for single auth
2162 if (in
->is_ambiguous_auth()) {
2163 in
->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
,
2164 new C_MDL_RequestInodeFileCaps(this, in
));
2168 mds_rank_t auth
= in
->authority().first
;
2169 if (mds
->is_cluster_degraded() &&
2170 mds
->mdsmap
->get_state(auth
) == MDSMap::STATE_REJOIN
) {
2171 mds
->wait_for_active_peer(auth
, new C_MDL_RequestInodeFileCaps(this, in
));
2175 dout(7) << "request_inode_file_caps " << ccap_string(wanted
)
2176 << " was " << ccap_string(in
->replica_caps_wanted
)
2177 << " on " << *in
<< " to mds." << auth
<< dendl
;
// Recorded before sending, even if the send below is skipped.
2179 in
->replica_caps_wanted
= wanted
;
2181 if (!mds
->is_cluster_degraded() ||
2182 mds
->mdsmap
->is_clientreplay_or_active_or_stopping(auth
))
2183 mds
->send_message_mds(new MInodeFileCaps(in
->ino(), in
->replica_caps_wanted
),
2188 /* This function DOES put the passed message before returning */
// Handle an MInodeFileCaps message from a replica MDS reporting the caps
// its clients want on an inode we are auth for (asserted).
//
// The sender's rank is taken from the message source. The wanted caps are
// either stored in in->mds_caps_wanted[from] or that entry is erased, and
// the inode's cap locks are then re-evaluated with try_eval().
//
// NOTE(review): the condition choosing between store and erase (listing
// lines 2203, 2205), the null check on `in`, and the message put are on
// lines missing from this listing.
2189 void Locker::handle_inode_file_caps(MInodeFileCaps
*m
)
2191 // nobody should be talking to us during recovery.
2192 assert(mds
->is_clientreplay() || mds
->is_active() || mds
->is_stopping());
2195 CInode
*in
= mdcache
->get_inode(m
->get_ino());
2196 mds_rank_t from
= mds_rank_t(m
->get_source().num());
2199 assert(in
->is_auth());
2201 dout(7) << "handle_inode_file_caps replica mds." << from
<< " wants caps " << ccap_string(m
->get_caps()) << " on " << *in
<< dendl
;
2204 in
->mds_caps_wanted
[from
] = m
->get_caps();
2206 in
->mds_caps_wanted
.erase(from
);
2208 try_eval(in
, CEPH_CAP_LOCKS
);
// Retry context that re-runs Locker::check_inode_max_size(in, false,
// new_max_size, newsize, mtime) with the values captured at construction.
// The inode is pinned with CInode::PIN_PTRWAITER in the constructor and
// unpinned after the call in finish().
//
// NOTE(review): most member declarations and listing line 2228 inside
// finish() (possibly a guard before the call) are not visible here.
2213 class C_MDL_CheckMaxSize
: public LockerContext
{
2215 uint64_t new_max_size
;
2220 C_MDL_CheckMaxSize(Locker
*l
, CInode
*i
, uint64_t _new_max_size
,
2221 uint64_t _newsize
, utime_t _mtime
) :
2222 LockerContext(l
), in(i
),
2223 new_max_size(_new_max_size
), newsize(_newsize
), mtime(_mtime
)
2225 in
->get(CInode::PIN_PTRWAITER
);
2227 void finish(int r
) override
{
2229 locker
->check_inode_max_size(in
, false, new_max_size
, newsize
, mtime
);
2230 in
->put(CInode::PIN_PTRWAITER
);
// Compute a new writeable max size for a file of the given `size`.
//
// The starting value is (size + 1) * 2. On the visible path it is capped
// at size + mds_client_writeable_range_max_inc_objs layout increments. The
// result is rounded up to a multiple of the layout size increment.
//
// @param pi    inode supplying get_layout_size_increment()
// @param size  current file size in bytes
// @return      the rounded-up new max size
//
// NOTE(review): listing line 2238 is not visible; the cap is presumably
// applied only when the configured max_inc is non-zero -- confirm.
2234 uint64_t Locker::calc_new_max_size(inode_t
*pi
, uint64_t size
)
2236 uint64_t new_max
= (size
+ 1) << 1;
2237 uint64_t max_inc
= g_conf
->mds_client_writeable_range_max_inc_objs
;
// Convert the configured object count into bytes.
2239 max_inc
*= pi
->get_layout_size_increment();
2240 new_max
= MIN(new_max
, size
+ max_inc
);
2242 return ROUND_UP_TO(new_max
, pi
->get_layout_size_increment());
// Build the per-client writeable ranges for `in` into *new_ranges.
//
// The candidate max size `ms` comes from calc_new_max_size() when the
// projected inode has a layout. Every client whose issued or wanted caps
// include FILE_WR or FILE_BUFFER gets an entry:
//  - a client with an existing range keeps its `follows` value and gets
//    max(ms, old last); *max_increased is set if ms exceeds the old last;
//  - a new client sets *max_increased and gets follows = in->first - 1.
// Clients without those caps get no entry (per the inline comment, their
// range shrinks to 0).
//
// @param size           file size used to derive the new max
// @param new_ranges     output map, filled per client
// @param max_increased  set to true when any range grows or is added
//
// NOTE(review): the declaration of `ms`, the layout-less branch, the loop
// increment and some range assignments (listing lines 2250, 2253-2256,
// 2262, 2265, 2274) are not visible here.
2245 void Locker::calc_new_client_ranges(CInode
*in
, uint64_t size
,
2246 map
<client_t
,client_writeable_range_t
> *new_ranges
,
2247 bool *max_increased
)
2249 inode_t
*latest
= in
->get_projected_inode();
2251 if(latest
->has_layout()) {
2252 ms
= calc_new_max_size(latest
, size
);
2254 // Layout-less directories like ~mds0/, have zero size
2258 // increase ranges as appropriate.
2259 // shrink to 0 if no WR|BUFFER caps issued.
2260 for (map
<client_t
,Capability
*>::iterator p
= in
->client_caps
.begin();
2261 p
!= in
->client_caps
.end();
2263 if ((p
->second
->issued() | p
->second
->wanted()) & (CEPH_CAP_FILE_WR
|CEPH_CAP_FILE_BUFFER
)) {
2264 client_writeable_range_t
& nr
= (*new_ranges
)[p
->first
];
2266 if (latest
->client_ranges
.count(p
->first
)) {
2267 client_writeable_range_t
& oldr
= latest
->client_ranges
[p
->first
];
2268 if (ms
> oldr
.range
.last
)
2269 *max_increased
= true;
// An existing range never shrinks here.
2270 nr
.range
.last
= MAX(ms
, oldr
.range
.last
);
2271 nr
.follows
= oldr
.follows
;
2273 *max_increased
= true;
2275 nr
.follows
= in
->first
- 1;
// Re-computes the client writeable ranges (and optionally size/mtime) of a
// file inode and journals the result if anything changed.
//   in            inode; asserted to be auth and a regular file
//   force_wrlock  when true, the filelock can_wrlock() check is skipped
//   new_max_size  requested max size; max(new_max_size, size) is passed to
//                 calc_new_client_ranges()
//   new_size      candidate new size; only a value > 0 requests a size
//                 update
// When the inode is frozen, or the filelock cannot be wrlocked, a
// C_MDL_CheckMaxSize waiter is queued so the check can be retried later.
// NOTE(review): the rest of the parameter list (new_mtime is used below),
// every return statement and several guards are missing from this listing;
// confirm the return contract against the full file.
2281 bool Locker::check_inode_max_size(CInode
*in
, bool force_wrlock
,
2282 uint64_t new_max_size
, uint64_t new_size
,
2285 assert(in
->is_auth());
2286 assert(in
->is_file());
2288 inode_t
*latest
= in
->get_projected_inode();
2289 map
<client_t
, client_writeable_range_t
> new_ranges
;
2290 uint64_t size
= latest
->size
;
2291 bool update_size
= new_size
> 0;
2292 bool update_max
= false;
2293 bool max_increased
= false;
// Never move size or mtime backwards; if nothing would change, drop the
// size update.  NOTE(review): presumably guarded by update_size -- confirm.
2296 new_size
= size
= MAX(size
, new_size
);
2297 new_mtime
= MAX(new_mtime
, latest
->mtime
);
2298 if (latest
->size
== new_size
&& latest
->mtime
== new_mtime
)
2299 update_size
= false;
2302 calc_new_client_ranges(in
, max(new_max_size
, size
), &new_ranges
, &max_increased
);
// NOTE(review): the statement controlled by this test is not visible;
// presumably it sets update_max.
2304 if (max_increased
|| latest
->client_ranges
!= new_ranges
)
2307 if (!update_size
&& !update_max
) {
2308 dout(20) << "check_inode_max_size no-op on " << *in
<< dendl
;
2312 dout(10) << "check_inode_max_size new_ranges " << new_ranges
2313 << " update_size " << update_size
2314 << " on " << *in
<< dendl
;
// Frozen inode: queue a retry for when it unfreezes.
2316 if (in
->is_frozen()) {
2317 dout(10) << "check_inode_max_size frozen, waiting on " << *in
<< dendl
;
2318 C_MDL_CheckMaxSize
*cms
= new C_MDL_CheckMaxSize(this, in
,
2322 in
->add_waiter(CInode::WAIT_UNFREEZE
, cms
);
// Cannot wrlock the filelock right now: if it is stable, try to move it
// (file_excl when there is a target loner, otherwise simple_lock), and if
// it still cannot be wrlocked wait for it to become stable.
2325 if (!force_wrlock
&& !in
->filelock
.can_wrlock(in
->get_loner())) {
2327 if (in
->filelock
.is_stable()) {
2328 if (in
->get_target_loner() >= 0)
2329 file_excl(&in
->filelock
);
2331 simple_lock(&in
->filelock
);
2333 if (!in
->filelock
.can_wrlock(in
->get_loner())) {
2335 C_MDL_CheckMaxSize
*cms
= new C_MDL_CheckMaxSize(this, in
,
2340 in
->filelock
.add_waiter(SimpleLock::WAIT_STABLE
, cms
);
2341 dout(10) << "check_inode_max_size can't wrlock, waiting on " << *in
<< dendl
;
// Project the inode and apply the new ranges / size / mtime to the
// projection.
2346 MutationRef
mut(new MutationImpl());
2347 mut
->ls
= mds
->mdlog
->get_current_segment();
2349 inode_t
*pi
= in
->project_inode();
2350 pi
->version
= in
->pre_dirty();
2353 dout(10) << "check_inode_max_size client_ranges " << pi
->client_ranges
<< " -> " << new_ranges
<< dendl
;
2354 pi
->client_ranges
= new_ranges
;
2358 dout(10) << "check_inode_max_size size " << pi
->size
<< " -> " << new_size
<< dendl
;
2359 pi
->size
= new_size
;
2360 pi
->rstat
.rbytes
= new_size
;
2361 dout(10) << "check_inode_max_size mtime " << pi
->mtime
<< " -> " << new_mtime
<< dendl
;
2362 pi
->mtime
= new_mtime
;
2365 // use EOpen if the file is still open; otherwise, use EUpdate.
2366 // this is just an optimization to push open files forward into
2367 // newer log segments.
2369 EMetaBlob
*metablob
;
2370 if (in
->is_any_caps_wanted() && in
->last
== CEPH_NOSNAP
) {
2371 EOpen
*eo
= new EOpen(mds
->mdlog
);
2372 eo
->add_ino(in
->ino());
2373 metablob
= &eo
->metablob
;
2375 mut
->ls
->open_files
.push_back(&in
->item_open_file
);
2377 EUpdate
*eu
= new EUpdate(mds
->mdlog
, "check_inode_max_size");
2378 metablob
= &eu
->metablob
;
// NOTE(review): `le` is assigned on lines missing from this listing;
// presumably it points at whichever event was created above.
2381 mds
->mdlog
->start_entry(le
);
2382 if (update_size
) { // FIXME if/when we do max_size nested accounting
2383 mdcache
->predirty_journal_parents(mut
, metablob
, in
, 0, PREDIRTY_PRIMARY
);
2385 CDentry
*parent
= in
->get_projected_parent_dn();
2386 metablob
->add_primary_dentry(parent
, in
, true);
2388 metablob
->add_dir_context(in
->get_projected_parent_dn()->get_dir());
2389 mdcache
->journal_dirty_inode(mut
.get(), metablob
, in
);
2391 mds
->mdlog
->submit_entry(le
,
2392 new C_Locker_FileUpdate_finish(this, in
, mut
, true));
2393 wrlock_force(&in
->filelock
, mut
); // wrlock for duration of journal
2396 // make max_size _increase_ timely
2398 mds
->mdlog
->flush();
// Sends a CEPH_CAP_OP_GRANT MClientCaps message to clients holding a cap on
// `in` whose pending caps include FILE_WR or FILE_BUFFER.  Each such cap
// has its last_seq bumped first and the message is filled in by
// in->encode_cap_message().
//   in        inode whose client caps are visited
//   only_cap  when given, iteration starts at that cap's client instead of
//             at the beginning of client_caps
// NOTE(review): the test selecting between the two iterator initialisations,
// the statement following the is_suppress() check and any early loop exit
// for only_cap are missing from this listing.
2404 void Locker::share_inode_max_size(CInode
*in
, Capability
*only_cap
)
2407 * only share if currently issued a WR cap. if client doesn't have it,
2408 * file_max doesn't matter, and the client will get it if/when they get
2411 dout(10) << "share_inode_max_size on " << *in
<< dendl
;
2412 map
<client_t
, Capability
*>::iterator it
;
2414 it
= in
->client_caps
.find(only_cap
->get_client());
2416 it
= in
->client_caps
.begin();
2417 for (; it
!= in
->client_caps
.end(); ++it
) {
2418 const client_t client
= it
->first
;
2419 Capability
*cap
= it
->second
;
2420 if (cap
->is_suppress())
2422 if (cap
->pending() & (CEPH_CAP_FILE_WR
|CEPH_CAP_FILE_BUFFER
)) {
2423 dout(10) << "share_inode_max_size with client." << client
<< dendl
;
2424 cap
->inc_last_seq();
2425 MClientCaps
*m
= new MClientCaps(CEPH_CAP_OP_GRANT
,
2427 in
->find_snaprealm()->inode
->ino(),
2428 cap
->get_cap_id(), cap
->get_last_seq(),
2429 cap
->pending(), cap
->wanted(), 0,
2431 mds
->get_osd_epoch_barrier());
2432 in
->encode_cap_message(m
, cap
);
2433 mds
->send_message_client_counted(m
, client
);
2440 bool Locker::_need_flush_mdlog(CInode
*in
, int wanted
)
2442 /* flush log if caps are wanted by client but corresponding lock is unstable and locked by
2443 * pending mutations. */
2444 if (((wanted
& (CEPH_CAP_FILE_RD
|CEPH_CAP_FILE_WR
|CEPH_CAP_FILE_SHARED
|CEPH_CAP_FILE_EXCL
)) &&
2445 in
->filelock
.is_unstable_and_locked()) ||
2446 ((wanted
& (CEPH_CAP_AUTH_SHARED
|CEPH_CAP_AUTH_EXCL
)) &&
2447 in
->authlock
.is_unstable_and_locked()) ||
2448 ((wanted
& (CEPH_CAP_LINK_SHARED
|CEPH_CAP_LINK_EXCL
)) &&
2449 in
->linklock
.is_unstable_and_locked()) ||
2450 ((wanted
& (CEPH_CAP_XATTR_SHARED
|CEPH_CAP_XATTR_EXCL
)) &&
2451 in
->xattrlock
.is_unstable_and_locked()))
// Updates the `wanted` mask of a client capability and keeps the inode's
// open-file bookkeeping in step.
//   cap        capability to adjust
//   wanted     new wanted mask reported by the client
//   issue_seq  issue sequence the client based its request on
// If issue_seq matches cap->get_last_issue() the mask is replaced; on a
// mismatch only additional bits are merged in, otherwise nothing changes.
// For a non-auth inode request_inode_file_caps() is called.  When nothing
// is wanted any more the inode may be taken off the open file list;
// otherwise it is put on the list with a new EOpen entry if it is not
// already there.
// NOTE(review): else/return lines are missing from this listing, so the
// exact nesting of the branches below should be confirmed.
2456 void Locker::adjust_cap_wanted(Capability
*cap
, int wanted
, int issue_seq
)
2458 if (ceph_seq_cmp(issue_seq
, cap
->get_last_issue()) == 0) {
2459 dout(10) << " wanted " << ccap_string(cap
->wanted())
2460 << " -> " << ccap_string(wanted
) << dendl
;
2461 cap
->set_wanted(wanted
);
2462 } else if (wanted
& ~cap
->wanted()) {
2463 dout(10) << " wanted " << ccap_string(cap
->wanted())
2464 << " -> " << ccap_string(wanted
)
2465 << " (added caps even though we had seq mismatch!)" << dendl
;
2466 cap
->set_wanted(wanted
| cap
->wanted());
2468 dout(10) << " NOT changing wanted " << ccap_string(cap
->wanted())
2469 << " -> " << ccap_string(wanted
)
2470 << " (issue_seq " << issue_seq
<< " != last_issue "
2471 << cap
->get_last_issue() << ")" << dendl
;
2475 CInode
*cur
= cap
->get_inode();
2476 if (!cur
->is_auth()) {
2477 request_inode_file_caps(cur
);
// Nothing wanted by this cap: drop the inode from the open file list once
// no cap at all is wanted on it.
2481 if (cap
->wanted() == 0) {
2482 if (cur
->item_open_file
.is_on_list() &&
2483 !cur
->is_any_caps_wanted()) {
2484 dout(10) << " removing unwanted file from open file list " << *cur
<< dendl
;
2485 cur
->item_open_file
.remove_myself();
// A client wants to read or write a file that is still recovering: move it
// up in the recovery queue.
2488 if (cur
->state_test(CInode::STATE_RECOVERING
) &&
2489 (cap
->wanted() & (CEPH_CAP_FILE_RD
|
2490 CEPH_CAP_FILE_WR
))) {
2491 mds
->mdcache
->recovery_queue
.prioritize(cur
);
// Journal an EOpen and track the inode in the current log segment's
// open_files list.
2494 if (!cur
->item_open_file
.is_on_list()) {
2495 dout(10) << " adding to open file list " << *cur
<< dendl
;
2496 assert(cur
->last
== CEPH_NOSNAP
);
2497 LogSegment
*ls
= mds
->mdlog
->get_current_segment();
2498 EOpen
*le
= new EOpen(mds
->mdlog
);
2499 mds
->mdlog
->start_entry(le
);
2500 le
->add_clean_inode(cur
);
2501 ls
->open_files
.push_back(&cur
->item_open_file
);
2502 mds
->mdlog
->submit_entry(le
);
// Walks head_in->client_need_snapflush and, for every snapid that still
// lists `client`, performs a snap update with no dirty caps
// (_do_snap_update(..., dirty = 0, ..., NULL, NULL)) and removes the
// pending-snapflush record for that client.
//   head_in  head inode carrying the client_need_snapflush map
//   client   client whose outstanding snapflushes are being resolved
// NOTE(review): several lines of the lookup fallback (the loop increment,
// the tests on `sin` and whatever follows the is_multiversion() check) are
// missing from this listing.
2509 void Locker::_do_null_snapflush(CInode
*head_in
, client_t client
)
2511 dout(10) << "_do_null_snapflush client." << client
<< " on " << *head_in
<< dendl
;
2512 compact_map
<snapid_t
, set
<client_t
> >::iterator p
= head_in
->client_need_snapflush
.begin();
2513 while (p
!= head_in
->client_need_snapflush
.end()) {
2514 snapid_t snapid
= p
->first
;
2515 set
<client_t
>& clients
= p
->second
;
2516 ++p
; // be careful, q loop below depends on this
2518 if (clients
.count(client
)) {
2519 dout(10) << " doing async NULL snapflush on " << snapid
<< " from client." << client
<< dendl
;
2520 CInode
*sin
= mdcache
->get_inode(head_in
->ino(), snapid
);
2522 // hrm, look forward until we find the inode.
2523 // (we can only look it up by the last snapid it is valid for)
2524 dout(10) << " didn't have " << head_in
->ino() << " snapid " << snapid
<< dendl
;
2525 for (compact_map
<snapid_t
, set
<client_t
> >::iterator q
= p
; // p is already at next entry
2526 q
!= head_in
->client_need_snapflush
.end();
2528 dout(10) << " trying snapid " << q
->first
<< dendl
;
2529 sin
= mdcache
->get_inode(head_in
->ino(), q
->first
);
2531 assert(sin
->first
<= snapid
);
2534 dout(10) << " didn't have " << head_in
->ino() << " snapid " << q
->first
<< dendl
;
2536 if (!sin
&& head_in
->is_multiversion())
// No message and no ack are passed; follows is sin->first - 1.
2540 _do_snap_update(sin
, snapid
, 0, sin
->first
- 1, client
, NULL
, NULL
);
2541 head_in
->remove_need_snapflush(sin
, snapid
, client
);
2547 bool Locker::should_defer_client_cap_frozen(CInode
*in
)
2550 * This policy needs to be AT LEAST as permissive as allowing a client request
2551 * to go forward, or else a client request can release something, the release
2552 * gets deferred, but the request gets processed and deadlocks because when the
2553 * caps can't get revoked.
2555 * Currently, a request wait if anything locked is freezing (can't
2556 * auth_pin), which would avoid any deadlock with cap release. Thus @in
2557 * _MUST_ be in the lock/auth_pin set.
2559 * auth_pins==0 implies no unstable lock and not auth pinnned by
2560 * client request, otherwise continue even it's freezing.
2562 return (in
->is_freezing() && in
->get_num_auth_pins() == 0) || in
->is_frozen();
// Entry point for an MClientCaps message from a client.  In the order the
// code appears below it:
//   - drops or defers the message while the MDS is not
//     clientreplay/active/stopping,
//   - re-acks flushes whose tid the session already recorded as completed,
//   - trims the session's completed flushes and warns about clients that do
//     not advance oldest_flush_tid,
//   - resolves the head inode, the snapped inode and the client's cap,
//   - handles CEPH_CAP_OP_FLUSHSNAP via _do_snap_update(),
//   - otherwise confirms receipt, adjusts wanted caps, applies dirty
//     metadata via _do_cap_update() and re-evaluates/issues caps.
// NOTE(review): many lines (null checks, m->put()/return statements, else
// keywords, some declarations such as `ack` and `ss`) are missing from
// this listing, so the comments below describe only what is visible.
2566 * This function DOES put the passed message before returning
2568 void Locker::handle_client_caps(MClientCaps
*m
)
2570 Session
*session
= static_cast<Session
*>(m
->get_connection()->get_priv());
2571 client_t client
= m
->get_source().num();
2573 snapid_t follows
= m
->get_snap_follows();
2574 dout(7) << "handle_client_caps "
2575 << ((m
->flags
& CLIENT_CAPS_SYNC
) ? "sync" : "async")
2576 << " on " << m
->get_ino()
2577 << " tid " << m
->get_client_tid() << " follows " << follows
2578 << " op " << ceph_cap_op_name(m
->get_op()) << dendl
;
// MDS is not clientreplay/active/stopping: drop the message for unusable
// sessions, remember dirty caps seen during reconnect, and retry the
// message once replay is done.
2580 if (!mds
->is_clientreplay() && !mds
->is_active() && !mds
->is_stopping()) {
2582 dout(5) << " no session, dropping " << *m
<< dendl
;
2586 if (session
->is_closed() ||
2587 session
->is_closing() ||
2588 session
->is_killing()) {
2589 dout(7) << " session closed|closing|killing, dropping " << *m
<< dendl
;
2593 if (mds
->is_reconnect() &&
2594 m
->get_dirty() && m
->get_client_tid() > 0 &&
2595 !session
->have_completed_flush(m
->get_client_tid())) {
2596 mdcache
->set_reconnected_dirty_caps(client
, m
->get_ino(), m
->get_dirty());
2598 mds
->wait_for_replay(new C_MDS_RetryMessage(mds
, m
));
// The session already recorded this flush tid as completed: send the
// matching (FLUSHSNAP_ACK or FLUSH_ACK) acknowledgement again.
2602 if (m
->get_client_tid() > 0 && session
&&
2603 session
->have_completed_flush(m
->get_client_tid())) {
2604 dout(7) << "handle_client_caps already flushed tid " << m
->get_client_tid()
2605 << " for client." << client
<< dendl
;
2607 if (m
->get_op() == CEPH_CAP_OP_FLUSHSNAP
) {
2608 ack
= new MClientCaps(CEPH_CAP_OP_FLUSHSNAP_ACK
, m
->get_ino(), 0, 0, 0, 0, 0,
2609 m
->get_dirty(), 0, mds
->get_osd_epoch_barrier());
2611 ack
= new MClientCaps(CEPH_CAP_OP_FLUSH_ACK
, m
->get_ino(), 0, m
->get_cap_id(),
2612 m
->get_seq(), m
->get_caps(), 0, m
->get_dirty(), 0,
2613 mds
->get_osd_epoch_barrier());
2615 ack
->set_snap_follows(follows
);
2616 ack
->set_client_tid(m
->get_client_tid());
2617 mds
->send_message_client_counted(ack
, m
->get_connection());
2618 if (m
->get_op() == CEPH_CAP_OP_FLUSHSNAP
) {
2622 // fall-thru because the message may release some caps
2624 m
->set_op(CEPH_CAP_OP_UPDATE
);
2628 // "oldest flush tid" > 0 means client uses unique TID for each flush
2629 if (m
->get_oldest_flush_tid() > 0 && session
) {
2630 if (session
->trim_completed_flushes(m
->get_oldest_flush_tid())) {
2631 mds
->mdlog
->get_current_segment()->touched_sessions
.insert(session
->info
.inst
.name
);
2633 if (session
->get_num_trim_flushes_warnings() > 0 &&
2634 session
->get_num_completed_flushes() * 2 < g_conf
->mds_max_completed_flushes
)
2635 session
->reset_num_trim_flushes_warnings();
// The warning threshold doubles with every warning already issued
// (mds_max_completed_flushes << warnings).
2637 if (session
->get_num_completed_flushes() >=
2638 (g_conf
->mds_max_completed_flushes
<< session
->get_num_trim_flushes_warnings())) {
2639 session
->inc_num_trim_flushes_warnings();
2641 ss
<< "client." << session
->get_client() << " does not advance its oldest_flush_tid ("
2642 << m
->get_oldest_flush_tid() << "), "
2643 << session
->get_num_completed_flushes()
2644 << " completed flushes recorded in session";
2645 mds
->clog
->warn() << ss
.str();
2646 dout(20) << __func__
<< " " << ss
.str() << dendl
;
// Look up the head inode.  The visible branches cover an unknown ino:
// retry after client replay, or drop.
2651 CInode
*head_in
= mdcache
->get_inode(m
->get_ino());
2653 if (mds
->is_clientreplay()) {
2654 dout(7) << "handle_client_caps on unknown ino " << m
->get_ino()
2655 << ", will try again after replayed client requests" << dendl
;
2656 mdcache
->wait_replay_cap_reconnect(m
->get_ino(), new C_MDS_RetryMessage(mds
, m
));
2659 dout(1) << "handle_client_caps on unknown ino " << m
->get_ino() << ", dropping" << dendl
;
2664 if (m
->osd_epoch_barrier
&& !mds
->objecter
->have_map(m
->osd_epoch_barrier
)) {
2665 // Pause RADOS operations until we see the required epoch
2666 mds
->objecter
->set_epoch_barrier(m
->osd_epoch_barrier
);
2669 if (mds
->get_osd_epoch_barrier() < m
->osd_epoch_barrier
) {
2670 // Record the barrier so that we will retransmit it to clients
2671 mds
->set_osd_epoch_barrier(m
->osd_epoch_barrier
);
// Pick the inode version the message applies to.
// NOTE(review): the condition guarding pick_inode_snap() is not visible.
2674 CInode
*in
= head_in
;
2676 in
= mdcache
->pick_inode_snap(head_in
, follows
);
2678 dout(10) << " head inode " << *head_in
<< dendl
;
2680 dout(10) << " cap inode " << *in
<< dendl
;
// Find the client's cap, falling back to the head inode's cap.
2682 Capability
*cap
= 0;
2683 cap
= in
->get_client_cap(client
);
2684 if (!cap
&& in
!= head_in
)
2685 cap
= head_in
->get_client_cap(client
);
2687 dout(7) << "handle_client_caps no cap for client." << client
<< " on " << *in
<< dendl
;
// Defer the whole message until the inode unfreezes.
2694 if (should_defer_client_cap_frozen(in
)) {
2695 dout(7) << "handle_client_caps freezing|frozen on " << *in
<< dendl
;
2696 in
->add_waiter(CInode::WAIT_UNFREEZE
, new C_MDS_RetryMessage(mds
, m
));
// The message carries an older migration sequence than the cap.
2699 if (ceph_seq_cmp(m
->get_mseq(), cap
->get_mseq()) < 0) {
2700 dout(7) << "handle_client_caps mseq " << m
->get_mseq() << " < " << cap
->get_mseq()
2701 << ", dropping" << dendl
;
2706 int op
= m
->get_op();
// ---- FLUSHSNAP handling ----
2709 if (op
== CEPH_CAP_OP_FLUSHSNAP
) {
2710 if (!in
->is_auth()) {
2711 dout(7) << " not auth, ignoring flushsnap on " << *in
<< dendl
;
2715 SnapRealm
*realm
= in
->find_snaprealm();
2716 snapid_t snap
= realm
->get_snap_following(follows
);
2717 dout(10) << " flushsnap follows " << follows
<< " -> snap " << snap
<< dendl
;
2719 // we can prepare the ack now, since this FLUSHEDSNAP is independent of any
2720 // other cap ops. (except possibly duplicate FLUSHSNAP requests, but worst
2721 // case we get a dup response, so whatever.)
2722 MClientCaps
*ack
= 0;
2723 if (m
->get_dirty()) {
2724 ack
= new MClientCaps(CEPH_CAP_OP_FLUSHSNAP_ACK
, in
->ino(), 0, 0, 0, 0, 0, m
->get_dirty(), 0, mds
->get_osd_epoch_barrier());
2725 ack
->set_snap_follows(follows
);
2726 ack
->set_client_tid(m
->get_client_tid());
2727 ack
->set_oldest_flush_tid(m
->get_oldest_flush_tid());
// Apply the flush only if it targets the head inode or a snapflush from
// this client is actually pending for `snap`.
2730 if (in
== head_in
||
2731 (head_in
->client_need_snapflush
.count(snap
) &&
2732 head_in
->client_need_snapflush
[snap
].count(client
))) {
2733 dout(7) << " flushsnap snap " << snap
2734 << " client." << client
<< " on " << *in
<< dendl
;
2736 // this cap now follows a later snap (i.e. the one initiating this flush, or later)
2738 cap
->client_follows
= snap
< CEPH_NOSNAP
? snap
: realm
->get_newest_seq();
2740 _do_snap_update(in
, snap
, m
->get_dirty(), follows
, client
, m
, ack
);
2743 head_in
->remove_need_snapflush(in
, snap
, client
);
2746 dout(7) << " not expecting flushsnap " << snap
<< " from client." << client
<< " on " << *in
<< dendl
;
2748 mds
->send_message_client_counted(ack
, m
->get_connection());
// ---- regular cap update ----
2753 if (cap
->get_cap_id() != m
->get_cap_id()) {
2754 dout(7) << " ignoring client capid " << m
->get_cap_id() << " != my " << cap
->get_cap_id() << dendl
;
2756 // intermediate snap inodes
2757 while (in
!= head_in
) {
2758 assert(in
->last
!= CEPH_NOSNAP
);
2759 if (in
->is_auth() && m
->get_dirty()) {
2760 dout(10) << " updating intermediate snapped inode " << *in
<< dendl
;
2761 _do_cap_update(in
, NULL
, m
->get_dirty(), follows
, m
);
2763 in
= mdcache
->pick_inode_snap(head_in
, in
->last
);
2766 // head inode, and cap
2767 MClientCaps
*ack
= 0;
// Only caps that were actually issued can be confirmed as retained.
2769 int caps
= m
->get_caps();
2770 if (caps
& ~cap
->issued()) {
2771 dout(10) << " confirming not issued caps " << ccap_string(caps
& ~cap
->issued()) << dendl
;
2772 caps
&= cap
->issued();
2775 cap
->confirm_receipt(m
->get_seq(), caps
);
2776 dout(10) << " follows " << follows
2777 << " retains " << ccap_string(m
->get_caps())
2778 << " dirty " << ccap_string(m
->get_dirty())
2779 << " on " << *in
<< dendl
;
2782 // missing/skipped snapflush?
2783 // The client MAY send a snapflush if it is issued WR/EXCL caps, but
2784 // presently only does so when it has actual dirty metadata. But, we
2785 // set up the need_snapflush stuff based on the issued caps.
2786 // We can infer that the client WONT send a FLUSHSNAP once they have
2787 // released all WR/EXCL caps (the FLUSHSNAP always comes before the cap
2789 if (!head_in
->client_need_snapflush
.empty()) {
2790 if ((cap
->issued() & CEPH_CAP_ANY_FILE_WR
) == 0) {
2791 _do_null_snapflush(head_in
, client
);
2793 dout(10) << " revocation in progress, not making any conclusions about null snapflushes" << dendl
;
// Prepare a FLUSH_ACK when the client flushed dirty metadata to the auth.
2797 if (m
->get_dirty() && in
->is_auth()) {
2798 dout(7) << " flush client." << client
<< " dirty " << ccap_string(m
->get_dirty())
2799 << " seq " << m
->get_seq() << " on " << *in
<< dendl
;
2800 ack
= new MClientCaps(CEPH_CAP_OP_FLUSH_ACK
, in
->ino(), 0, cap
->get_cap_id(), m
->get_seq(),
2801 m
->get_caps(), 0, m
->get_dirty(), 0, mds
->get_osd_epoch_barrier());
2802 ack
->set_client_tid(m
->get_client_tid());
2803 ack
->set_oldest_flush_tid(m
->get_oldest_flush_tid());
2806 // filter wanted based on what we could ever give out (given auth/replica status)
2807 bool need_flush
= m
->flags
& CLIENT_CAPS_SYNC
;
2808 int new_wanted
= m
->get_wanted() & head_in
->get_caps_allowed_ever();
2809 if (new_wanted
!= cap
->wanted()) {
2810 if (!need_flush
&& (new_wanted
& ~cap
->pending())) {
2811 // expanding caps. make sure we aren't waiting for a log flush
2812 need_flush
= _need_flush_mdlog(head_in
, new_wanted
& ~cap
->pending());
2815 adjust_cap_wanted(cap
, new_wanted
, m
->get_issue_seq());
// _do_cap_update() returning true means an update was journaled; the
// lines after "no update, ack now." cover the opposite case.
2818 if (in
->is_auth() &&
2819 _do_cap_update(in
, cap
, m
->get_dirty(), follows
, m
, ack
, &need_flush
)) {
2821 eval(in
, CEPH_CAP_LOCKS
);
2823 if (!need_flush
&& (cap
->wanted() & ~cap
->pending()))
2824 need_flush
= _need_flush_mdlog(in
, cap
->wanted() & ~cap
->pending());
2826 // no update, ack now.
2828 mds
->send_message_client_counted(ack
, m
->get_connection());
2830 bool did_issue
= eval(in
, CEPH_CAP_LOCKS
);
2831 if (!did_issue
&& (cap
->wanted() & ~cap
->pending()))
2832 issue_caps(in
, cap
);
2834 if (cap
->get_last_seq() == 0 &&
2835 (cap
->pending() & (CEPH_CAP_FILE_WR
|CEPH_CAP_FILE_BUFFER
))) {
2836 cap
->issue_norevoke(cap
->issued());
2837 share_inode_max_size(in
, cap
);
// NOTE(review): presumably guarded by need_flush; the test is not visible.
2842 mds
->mdlog
->flush();
// Waiter context that re-runs Locker::process_request_cap_release() for a
// saved (client, release item) pair, passing a null MDRequestRef.
// NOTE(review): the declarations of the `client` and `dname` members are
// not visible in this listing; `dname` is not set by the constructor
// shown here, so it is presumably a default-constructed string -- confirm.
2850 class C_Locker_RetryRequestCapRelease
: public LockerContext
{
2852 ceph_mds_request_release item
;
2854 C_Locker_RetryRequestCapRelease(Locker
*l
, client_t c
, const ceph_mds_request_release
& it
) :
2855 LockerContext(l
), client(c
), item(it
) { }
2856 void finish(int r
) override
{
// The retry runs without an MDRequest; the completion code r is not used.
2858 MDRequestRef null_ref
;
2859 locker
->process_request_cap_release(null_ref
, client
, item
, dname
);
// Processes one cap release item that arrived with a client request.
//   mdr     request the release came with; the log text below labels a
//           null mdr as "(DEFERRED, no mdr)"
//   client  client releasing the caps
//   item    wire-format release record (ino, cap_id, caps, wanted,
//           issue_seq, mseq)
//   dname   optional dentry name; when non-empty the client's lease on that
//           dentry under the inode is removed as well
// Stale items (older mseq, mismatching cap_id) are dropped; if the inode is
// freezing/frozen the release is retried after unfreeze.  Otherwise receipt
// is confirmed, wanted caps are adjusted and the locks are re-evaluated
// with issuing suppressed on this cap.
// NOTE(review): null checks on in/dir/dn/l/cap, the return statements and
// the declaration of `seq` are missing from this listing.
2863 void Locker::process_request_cap_release(MDRequestRef
& mdr
, client_t client
, const ceph_mds_request_release
& item
,
2864 const string
&dname
)
2866 inodeno_t ino
= (uint64_t)item
.ino
;
2867 uint64_t cap_id
= item
.cap_id
;
2868 int caps
= item
.caps
;
2869 int wanted
= item
.wanted
;
2871 int issue_seq
= item
.issue_seq
;
2872 int mseq
= item
.mseq
;
2874 CInode
*in
= mdcache
->get_inode(ino
);
// Optional dentry lease release.
2878 if (dname
.length()) {
2879 frag_t fg
= in
->pick_dirfrag(dname
);
2880 CDir
*dir
= in
->get_dirfrag(fg
);
2882 CDentry
*dn
= dir
->lookup(dname
);
2884 ClientLease
*l
= dn
->get_client_lease(client
);
2886 dout(10) << "process_cap_release removing lease on " << *dn
<< dendl
;
2887 dn
->remove_client_lease(l
, this);
2889 dout(7) << "process_cap_release client." << client
2890 << " doesn't have lease on " << *dn
<< dendl
;
2893 dout(7) << "process_cap_release client." << client
<< " released lease on dn "
2894 << dir
->dirfrag() << "/" << dname
<< " which dne" << dendl
;
2899 Capability
*cap
= in
->get_client_cap(client
);
2903 dout(10) << "process_cap_release client." << client
<< " " << ccap_string(caps
) << " on " << *in
2904 << (mdr
? "" : " (DEFERRED, no mdr)")
2907 if (ceph_seq_cmp(mseq
, cap
->get_mseq()) < 0) {
2908 dout(7) << " mseq " << mseq
<< " < " << cap
->get_mseq() << ", dropping" << dendl
;
2912 if (cap
->get_cap_id() != cap_id
) {
2913 dout(7) << " cap_id " << cap_id
<< " != " << cap
->get_cap_id() << ", dropping" << dendl
;
2917 if (should_defer_client_cap_frozen(in
)) {
2918 dout(7) << " frozen, deferring" << dendl
;
2919 in
->add_waiter(CInode::WAIT_UNFREEZE
, new C_Locker_RetryRequestCapRelease(this, client
, item
));
// Only caps that were actually issued can be confirmed.
2923 if (caps
& ~cap
->issued()) {
2924 dout(10) << " confirming not issued caps " << ccap_string(caps
& ~cap
->issued()) << dendl
;
2925 caps
&= cap
->issued();
2927 cap
->confirm_receipt(seq
, caps
);
// With no file-write caps issued any more, resolve pending snapflushes.
2929 if (!in
->client_need_snapflush
.empty() &&
2930 (cap
->issued() & CEPH_CAP_ANY_FILE_WR
) == 0) {
2931 _do_null_snapflush(in
, client
);
2934 adjust_cap_wanted(cap
, wanted
, issue_seq
);
// Re-evaluate the locks while this cap is marked as suppressed.
2937 cap
->inc_suppress();
2938 eval(in
, CEPH_CAP_LOCKS
);
2940 cap
->dec_suppress();
2942 // take note; we may need to reissue on this cap later
2944 mdr
->cap_releases
[in
->vino()] = cap
->get_last_seq();
2947 class C_Locker_RetryKickIssueCaps
: public LockerContext
{
2952 C_Locker_RetryKickIssueCaps(Locker
*l
, CInode
*i
, client_t c
, ceph_seq_t s
) :
2953 LockerContext(l
), in(i
), client(c
), seq(s
) {
2954 in
->get(CInode::PIN_PTRWAITER
);
2956 void finish(int r
) override
{
2957 locker
->kick_issue_caps(in
, client
, seq
);
2958 in
->put(CInode::PIN_PTRWAITER
);
2962 void Locker::kick_issue_caps(CInode
*in
, client_t client
, ceph_seq_t seq
)
2964 Capability
*cap
= in
->get_client_cap(client
);
2965 if (!cap
|| cap
->get_last_sent() != seq
)
2967 if (in
->is_frozen()) {
2968 dout(10) << "kick_issue_caps waiting for unfreeze on " << *in
<< dendl
;
2969 in
->add_waiter(CInode::WAIT_UNFREEZE
,
2970 new C_Locker_RetryKickIssueCaps(this, in
, client
, seq
));
2973 dout(10) << "kick_issue_caps released at current seq " << seq
2974 << ", reissuing" << dendl
;
2975 issue_caps(in
, cap
);
// Calls kick_issue_caps() for the entries of mdr->cap_releases (vinodeno_t
// -> sequence number), using the request's client.
// NOTE(review): the loop increment and the lines between the inode lookup
// and the kick_issue_caps() call are missing from this listing --
// presumably entries whose inode is no longer in cache are skipped;
// confirm against the full file.
2978 void Locker::kick_cap_releases(MDRequestRef
& mdr
)
2980 client_t client
= mdr
->get_client();
2981 for (map
<vinodeno_t
,ceph_seq_t
>::iterator p
= mdr
->cap_releases
.begin();
2982 p
!= mdr
->cap_releases
.end();
2984 CInode
*in
= mdcache
->get_inode(p
->first
);
2987 kick_issue_caps(in
, client
, p
->second
);
// Journals a snapshot flush ("snap flush" EUpdate) for inode `in`.
//   in       inode being updated
//   snap     snapid the flush applies to; CEPH_NOSNAP is logged below as
//            the snap having been deleted already
//   dirty    dirty cap bits reported by the client (0 for a null snapflush)
//   follows  snapid the client's data follows
//   client   flushing client
//   m, ack   original message and prepared ack; either may be NULL
// The update is written either into an old_inode (multiversion inodes) or
// into a freshly projected inode, the client's writeable range is removed
// or re-pointed at `snap`, and the entry is submitted with a
// C_Locker_FileUpdate_finish completion.
// NOTE(review): many lines are missing from this listing (returns, the
// declaration of `pi`, the old-inode selection logic, xattr decoding and
// the tail of the submit_entry() call).
2992 * m and ack might be NULL, so don't dereference them unless dirty != 0
2994 void Locker::_do_snap_update(CInode
*in
, snapid_t snap
, int dirty
, snapid_t follows
, client_t client
, MClientCaps
*m
, MClientCaps
*ack
)
2996 dout(10) << "_do_snap_update dirty " << ccap_string(dirty
)
2997 << " follows " << follows
<< " snap " << snap
2998 << " on " << *in
<< dendl
;
3000 if (snap
== CEPH_NOSNAP
) {
3001 // hmm, i guess snap was already deleted? just ack!
3002 dout(10) << " wow, the snap following " << follows
3003 << " was already deleted. nothing to record, just ack." << dendl
;
3005 mds
->send_message_client_counted(ack
, m
->get_connection());
3009 EUpdate
*le
= new EUpdate(mds
->mdlog
, "snap flush");
3010 mds
->mdlog
->start_entry(le
);
3011 MutationRef mut
= new MutationImpl();
3012 mut
->ls
= mds
->mdlog
->get_current_segment();
3014 // normal metadata updates that we can apply to the head as well.
// xattrs are only taken from the message when XATTR_EXCL is dirty, the
// message carries an xattr blob and its xattr_version is newer than the
// projected inode's.
3017 bool xattrs
= false;
3018 map
<string
,bufferptr
> *px
= 0;
3019 if ((dirty
& CEPH_CAP_XATTR_EXCL
) &&
3020 m
->xattrbl
.length() &&
3021 m
->head
.xattr_version
> in
->get_projected_inode()->xattr_version
)
3024 old_inode_t
*oi
= 0;
3025 if (in
->is_multiversion()) {
3026 oi
= in
->pick_old_inode(snap
);
3031 dout(10) << " writing into old inode" << dendl
;
3032 pi
= in
->project_inode();
3033 pi
->version
= in
->pre_dirty();
3034 if (snap
> oi
->first
)
3035 in
->split_old_inode(snap
);
3041 px
= new map
<string
,bufferptr
>;
3042 pi
= in
->project_inode(px
);
3043 pi
->version
= in
->pre_dirty();
3046 _update_cap_fields(in
, dirty
, m
, pi
);
3050 dout(7) << " xattrs v" << pi
->xattr_version
<< " -> " << m
->head
.xattr_version
3051 << " len " << m
->xattrbl
.length() << dendl
;
3052 pi
->xattr_version
= m
->head
.xattr_version
;
3053 bufferlist::iterator p
= m
->xattrbl
.begin();
// Adjust the flushing client's writeable range.
3057 if (pi
->client_ranges
.count(client
)) {
3058 if (in
->last
== snap
) {
3059 dout(10) << " removing client_range entirely" << dendl
;
3060 pi
->client_ranges
.erase(client
);
3062 dout(10) << " client_range now follows " << snap
<< dendl
;
3063 pi
->client_ranges
[client
].follows
= snap
;
3068 mdcache
->predirty_journal_parents(mut
, &le
->metablob
, in
, 0, PREDIRTY_PRIMARY
, 0, follows
);
3069 mdcache
->journal_dirty_inode(mut
.get(), &le
->metablob
, in
, follows
);
3071 // "oldest flush tid" > 0 means client uses unique TID for each flush
3072 if (ack
&& ack
->get_oldest_flush_tid() > 0)
3073 le
->metablob
.add_client_flush(metareqid_t(m
->get_source(), ack
->get_client_tid()),
3074 ack
->get_oldest_flush_tid());
3076 mds
->mdlog
->submit_entry(le
, new C_Locker_FileUpdate_finish(this, in
, mut
, false, false,
// Copies client-reported metadata from a cap message into a projected
// inode, limited to the fields covered by the dirty cap bits.
//   in     inode being updated (used for logging and is_file() checks)
//   dirty  dirty cap bits from the client
//   m      cap message carrying the new values
//   pi     projected inode that receives the values
// ctime and (feature permitting) change_attr only move forward.  FILE_EXCL
// and FILE_WR cover mtime, size, inline data, atime and time_warp_seq;
// AUTH_EXCL covers uid, gid, mode and (feature permitting) btime.
// NOTE(review): several lines are missing from this listing (any early
// return for dirty == 0, the assignments of mtime/size/atime and the
// second half of the size condition).
3080 void Locker::_update_cap_fields(CInode
*in
, int dirty
, MClientCaps
*m
, inode_t
*pi
)
3085 /* m must be valid if there are dirty caps */
3087 uint64_t features
= m
->get_connection()->get_features();
3089 if (m
->get_ctime() > pi
->ctime
) {
3090 dout(7) << " ctime " << pi
->ctime
<< " -> " << m
->get_ctime()
3091 << " for " << *in
<< dendl
;
3092 pi
->ctime
= m
->get_ctime();
3095 if ((features
& CEPH_FEATURE_FS_CHANGE_ATTR
) &&
3096 m
->get_change_attr() > pi
->change_attr
) {
3097 dout(7) << " change_attr " << pi
->change_attr
<< " -> " << m
->get_change_attr()
3098 << " for " << *in
<< dendl
;
3099 pi
->change_attr
= m
->get_change_attr();
// ---- file attributes ----
3103 if (dirty
& (CEPH_CAP_FILE_EXCL
|CEPH_CAP_FILE_WR
)) {
3104 utime_t atime
= m
->get_atime();
3105 utime_t mtime
= m
->get_mtime();
3106 uint64_t size
= m
->get_size();
3107 version_t inline_version
= m
->inline_version
;
// With FILE_WR dirty mtime only moves forward; with FILE_EXCL dirty any
// different value is accepted.
3109 if (((dirty
& CEPH_CAP_FILE_WR
) && mtime
> pi
->mtime
) ||
3110 ((dirty
& CEPH_CAP_FILE_EXCL
) && mtime
!= pi
->mtime
)) {
3111 dout(7) << " mtime " << pi
->mtime
<< " -> " << mtime
3112 << " for " << *in
<< dendl
;
3115 if (in
->inode
.is_file() && // ONLY if regular file
3117 dout(7) << " size " << pi
->size
<< " -> " << size
3118 << " for " << *in
<< dendl
;
3120 pi
->rstat
.rbytes
= size
;
// Inline data is replaced only by a newer inline_version.
3122 if (in
->inode
.is_file() &&
3123 (dirty
& CEPH_CAP_FILE_WR
) &&
3124 inline_version
> pi
->inline_data
.version
) {
3125 pi
->inline_data
.version
= inline_version
;
3126 if (inline_version
!= CEPH_INLINE_NONE
&& m
->inline_data
.length() > 0)
3127 pi
->inline_data
.get_data() = m
->inline_data
;
3129 pi
->inline_data
.free_data();
3131 if ((dirty
& CEPH_CAP_FILE_EXCL
) && atime
!= pi
->atime
) {
3132 dout(7) << " atime " << pi
->atime
<< " -> " << atime
3133 << " for " << *in
<< dendl
;
3136 if ((dirty
& CEPH_CAP_FILE_EXCL
) &&
3137 ceph_seq_cmp(pi
->time_warp_seq
, m
->get_time_warp_seq()) < 0) {
3138 dout(7) << " time_warp_seq " << pi
->time_warp_seq
<< " -> " << m
->get_time_warp_seq()
3139 << " for " << *in
<< dendl
;
3140 pi
->time_warp_seq
= m
->get_time_warp_seq();
// ---- ownership / mode / btime ----
3144 if (dirty
& CEPH_CAP_AUTH_EXCL
) {
3145 if (m
->head
.uid
!= pi
->uid
) {
3146 dout(7) << " uid " << pi
->uid
3147 << " -> " << m
->head
.uid
3148 << " for " << *in
<< dendl
;
3149 pi
->uid
= m
->head
.uid
;
3151 if (m
->head
.gid
!= pi
->gid
) {
3152 dout(7) << " gid " << pi
->gid
3153 << " -> " << m
->head
.gid
3154 << " for " << *in
<< dendl
;
3155 pi
->gid
= m
->head
.gid
;
3157 if (m
->head
.mode
!= pi
->mode
) {
3158 dout(7) << " mode " << oct
<< pi
->mode
3159 << " -> " << m
->head
.mode
<< dec
3160 << " for " << *in
<< dendl
;
3161 pi
->mode
= m
->head
.mode
;
3163 if ((features
& CEPH_FEATURE_FS_BTIME
) && m
->get_btime() != pi
->btime
) {
// NOTE(review): oct/dec look copied from the mode log above; btime is a
// timestamp -- confirm that octal formatting is really intended here.
3164 dout(7) << " btime " << oct
<< pi
->btime
3165 << " -> " << m
->get_btime() << dec
3166 << " for " << *in
<< dendl
;
3167 pi
->btime
= m
->get_btime();
3173 * update inode based on cap flush|flushsnap|wanted.
3174 * adjust max_size, if needed.
3175 * if we update, return true; otherwise, false (no updated needed).
3177 bool Locker::_do_cap_update(CInode
*in
, Capability
*cap
,
3178 int dirty
, snapid_t follows
,
3179 MClientCaps
*m
, MClientCaps
*ack
,
3182 dout(10) << "_do_cap_update dirty " << ccap_string(dirty
)
3183 << " issued " << ccap_string(cap
? cap
->issued() : 0)
3184 << " wanted " << ccap_string(cap
? cap
->wanted() : 0)
3185 << " on " << *in
<< dendl
;
3186 assert(in
->is_auth());
3187 client_t client
= m
->get_source().num();
3188 inode_t
*latest
= in
->get_projected_inode();
3190 // increase or zero max_size?
3191 uint64_t size
= m
->get_size();
3192 bool change_max
= false;
3193 uint64_t old_max
= latest
->client_ranges
.count(client
) ? latest
->client_ranges
[client
].range
.last
: 0;
3194 uint64_t new_max
= old_max
;
3196 if (in
->is_file()) {
3197 bool forced_change_max
= false;
3198 dout(20) << "inode is file" << dendl
;
3199 if (cap
&& ((cap
->issued() | cap
->wanted()) & CEPH_CAP_ANY_FILE_WR
)) {
3200 dout(20) << "client has write caps; m->get_max_size="
3201 << m
->get_max_size() << "; old_max=" << old_max
<< dendl
;
3202 if (m
->get_max_size() > new_max
) {
3203 dout(10) << "client requests file_max " << m
->get_max_size()
3204 << " > max " << old_max
<< dendl
;
3206 forced_change_max
= true;
3207 new_max
= calc_new_max_size(latest
, m
->get_max_size());
3209 new_max
= calc_new_max_size(latest
, size
);
3211 if (new_max
> old_max
)
3223 if (in
->last
== CEPH_NOSNAP
&&
3225 !in
->filelock
.can_wrlock(client
) &&
3226 !in
->filelock
.can_force_wrlock(client
)) {
3227 dout(10) << " i want to change file_max, but lock won't allow it (yet)" << dendl
;
3228 if (in
->filelock
.is_stable()) {
3229 bool need_issue
= false;
3231 cap
->inc_suppress();
3232 if (in
->mds_caps_wanted
.empty() &&
3233 (in
->get_loner() >= 0 || (in
->get_wanted_loner() >= 0 && in
->try_set_loner()))) {
3234 if (in
->filelock
.get_state() != LOCK_EXCL
)
3235 file_excl(&in
->filelock
, &need_issue
);
3237 simple_lock(&in
->filelock
, &need_issue
);
3241 cap
->dec_suppress();
3243 if (!in
->filelock
.can_wrlock(client
) &&
3244 !in
->filelock
.can_force_wrlock(client
)) {
3245 C_MDL_CheckMaxSize
*cms
= new C_MDL_CheckMaxSize(this, in
,
3246 forced_change_max
? new_max
: 0,
3249 in
->filelock
.add_waiter(SimpleLock::WAIT_STABLE
, cms
);
3255 if (m
->flockbl
.length()) {
3257 bufferlist::iterator bli
= m
->flockbl
.begin();
3258 ::decode(num_locks
, bli
);
3259 for ( int i
=0; i
< num_locks
; ++i
) {
3260 ceph_filelock decoded_lock
;
3261 ::decode(decoded_lock
, bli
);
3262 in
->get_fcntl_lock_state()->held_locks
.
3263 insert(pair
<uint64_t, ceph_filelock
>(decoded_lock
.start
, decoded_lock
));
3264 ++in
->get_fcntl_lock_state()->client_held_lock_counts
[(client_t
)(decoded_lock
.client
)];
3266 ::decode(num_locks
, bli
);
3267 for ( int i
=0; i
< num_locks
; ++i
) {
3268 ceph_filelock decoded_lock
;
3269 ::decode(decoded_lock
, bli
);
3270 in
->get_flock_lock_state()->held_locks
.
3271 insert(pair
<uint64_t, ceph_filelock
>(decoded_lock
.start
, decoded_lock
));
3272 ++in
->get_flock_lock_state()->client_held_lock_counts
[(client_t
)(decoded_lock
.client
)];
3276 if (!dirty
&& !change_max
)
3279 Session
*session
= static_cast<Session
*>(m
->get_connection()->get_priv());
3280 if (session
->check_access(in
, MAY_WRITE
,
3281 m
->caller_uid
, m
->caller_gid
, NULL
, 0, 0) < 0) {
3283 dout(10) << "check_access failed, dropping cap update on " << *in
<< dendl
;
3289 EUpdate
*le
= new EUpdate(mds
->mdlog
, "cap update");
3290 mds
->mdlog
->start_entry(le
);
3293 map
<string
,bufferptr
> *px
= 0;
3294 if ((dirty
& CEPH_CAP_XATTR_EXCL
) &&
3295 m
->xattrbl
.length() &&
3296 m
->head
.xattr_version
> in
->get_projected_inode()->xattr_version
)
3297 px
= new map
<string
,bufferptr
>;
3299 inode_t
*pi
= in
->project_inode(px
);
3300 pi
->version
= in
->pre_dirty();
3302 MutationRef
mut(new MutationImpl());
3303 mut
->ls
= mds
->mdlog
->get_current_segment();
3305 _update_cap_fields(in
, dirty
, m
, pi
);
3308 dout(7) << " max_size " << old_max
<< " -> " << new_max
3309 << " for " << *in
<< dendl
;
3311 pi
->client_ranges
[client
].range
.first
= 0;
3312 pi
->client_ranges
[client
].range
.last
= new_max
;
3313 pi
->client_ranges
[client
].follows
= in
->first
- 1;
3315 pi
->client_ranges
.erase(client
);
3318 if (change_max
|| (dirty
& (CEPH_CAP_FILE_EXCL
|CEPH_CAP_FILE_WR
)))
3319 wrlock_force(&in
->filelock
, mut
); // wrlock for duration of journal
3322 if (dirty
& CEPH_CAP_AUTH_EXCL
)
3323 wrlock_force(&in
->authlock
, mut
);
3327 dout(7) << " xattrs v" << pi
->xattr_version
<< " -> " << m
->head
.xattr_version
<< dendl
;
3328 pi
->xattr_version
= m
->head
.xattr_version
;
3329 bufferlist::iterator p
= m
->xattrbl
.begin();
3332 wrlock_force(&in
->xattrlock
, mut
);
3336 mdcache
->predirty_journal_parents(mut
, &le
->metablob
, in
, 0, PREDIRTY_PRIMARY
, 0, follows
);
3337 mdcache
->journal_dirty_inode(mut
.get(), &le
->metablob
, in
, follows
);
3339 // "oldest flush tid" > 0 means client uses unique TID for each flush
3340 if (ack
&& ack
->get_oldest_flush_tid() > 0)
3341 le
->metablob
.add_client_flush(metareqid_t(m
->get_source(), ack
->get_client_tid()),
3342 ack
->get_oldest_flush_tid());
3344 mds
->mdlog
->submit_entry(le
, new C_Locker_FileUpdate_finish(this, in
, mut
,
3347 if (need_flush
&& !*need_flush
&&
3348 ((change_max
&& new_max
) || // max INCREASE
3349 _need_flush_mdlog(in
, dirty
)))
3355 /* This function DOES put the passed message before returning */
// Handle an MClientCapRelease from a client: every entry in m->caps is passed
// to _do_cap_release(), then the sender's Session is told how many caps were
// released via notify_cap_release().
// If the MDS is not in clientreplay/active/stopping state, the message is
// queued for a later retry through wait_for_replay().
// NOTE(review): this excerpt omits some source lines (braces, returns); confirm
// the exact control flow against the full file.
3356 void Locker::handle_client_cap_release(MClientCapRelease
*m
)
3358 client_t client
= m
->get_source().num();
3359 dout(10) << "handle_client_cap_release " << *m
<< dendl
;
// Not ready to process client messages yet: retry once replay has finished.
3361 if (!mds
->is_clientreplay() && !mds
->is_active() && !mds
->is_stopping()) {
3362 mds
->wait_for_replay(new C_MDS_RetryMessage(mds
, m
));
// A non-zero barrier epoch that our objecter has not seen yet.
3366 if (m
->osd_epoch_barrier
&& !mds
->objecter
->have_map(m
->osd_epoch_barrier
)) {
3367 // Pause RADOS operations until we see the required epoch
3368 mds
->objecter
->set_epoch_barrier(m
->osd_epoch_barrier
);
// Only ever raise the MDS-wide barrier, never lower it.
3371 if (mds
->get_osd_epoch_barrier() < m
->osd_epoch_barrier
) {
3372 // Record the barrier so that we will retransmit it to clients
3373 mds
->set_osd_epoch_barrier(m
->osd_epoch_barrier
);
// NOTE(review): the session pointer is dereferenced below; confirm that a null
// check exists in the lines not shown here.
3376 Session
*session
= static_cast<Session
*>(m
->get_connection()->get_priv());
// Release each (ino, cap_id, migrate_seq, seq) tuple named in the message.
3378 for (vector
<ceph_mds_cap_item
>::iterator p
= m
->caps
.begin(); p
!= m
->caps
.end(); ++p
) {
3379 _do_cap_release(client
, inodeno_t((uint64_t)p
->ino
) , p
->cap_id
, p
->migrate_seq
, p
->seq
);
3383 session
->notify_cap_release(m
->caps
.size());
// Deferred-retry context for a single cap release: stores the arguments of
// Locker::_do_cap_release() and re-issues the same call from finish().
// _do_cap_release() queues one of these on CInode::WAIT_UNFREEZE when the
// release has to be deferred.
// NOTE(review): some member declarations are not visible in this excerpt.
3389 class C_Locker_RetryCapRelease
: public LockerContext
{
3393 ceph_seq_t migrate_seq
;
3394 ceph_seq_t issue_seq
;
// Captures every argument needed to replay the release later.
3396 C_Locker_RetryCapRelease(Locker
*l
, client_t c
, inodeno_t i
, uint64_t id
,
3397 ceph_seq_t mseq
, ceph_seq_t seq
) :
3398 LockerContext(l
), client(c
), ino(i
), cap_id(id
), migrate_seq(mseq
), issue_seq(seq
) {}
// The completion code r is not used; the release is simply retried.
3399 void finish(int r
) override
{
3400 locker
->_do_cap_release(client
, ino
, cap_id
, migrate_seq
, issue_seq
);
// Release one client capability.
//   client  - client whose cap is being released
//   ino     - inode number the cap belongs to
//   cap_id  - cap id the client believes it holds; compared with the current cap
//   mseq    - migrate sequence from the client; compared with cap->get_mseq()
//   seq     - issue sequence from the client; compared with cap->get_last_issue()
// The release is logged and ignored when the inode or cap is missing, when the
// cap id differs, or when mseq is older than the cap's. It is deferred (via
// C_Locker_RetryCapRelease on WAIT_UNFREEZE) when
// should_defer_client_cap_frozen() says so.
// NOTE(review): this excerpt omits some source lines (braces, returns, null
// checks); confirm the exact control flow against the full file.
3404 void Locker::_do_cap_release(client_t client
, inodeno_t ino
, uint64_t cap_id
,
3405 ceph_seq_t mseq
, ceph_seq_t seq
)
3407 CInode
*in
= mdcache
->get_inode(ino
);
3409 dout(7) << "_do_cap_release missing ino " << ino
<< dendl
;
3412 Capability
*cap
= in
->get_client_cap(client
);
// NOTE(review): unlike the other messages here, this one prints "client" with
// no '.' before the id.
3414 dout(7) << "_do_cap_release no cap for client" << client
<< " on "<< *in
<< dendl
;
3418 dout(7) << "_do_cap_release for client." << client
<< " on "<< *in
<< dendl
;
// The client is talking about a different cap instance than the one we hold.
3419 if (cap
->get_cap_id() != cap_id
) {
3420 dout(7) << " capid " << cap_id
<< " != " << cap
->get_cap_id() << ", ignore" << dendl
;
// Release carries an older migrate sequence than the cap's current one.
3423 if (ceph_seq_cmp(mseq
, cap
->get_mseq()) < 0) {
3424 dout(7) << " mseq " << mseq
<< " < " << cap
->get_mseq() << ", ignore" << dendl
;
// Retry the identical release once the inode is unfrozen.
3427 if (should_defer_client_cap_frozen(in
)) {
3428 dout(7) << " freezing|frozen, deferring" << dendl
;
3429 in
->add_waiter(CInode::WAIT_UNFREEZE
,
3430 new C_Locker_RetryCapRelease(this, client
, ino
, cap_id
, mseq
, seq
));
// The client's issue seq does not match the last one we issued.
3433 if (seq
!= cap
->get_last_issue()) {
3434 dout(7) << " issue_seq " << seq
<< " != " << cap
->get_last_issue() << dendl
;
3435 // clean out any old revoke history
3436 cap
->clean_revoke_from(seq
);
3437 eval_cap_gather(in
);
3440 remove_client_cap(in
, client
);
3443 /* Remove client's capability from inode in and re-evaluate state that depended on it. This function takes no message, so there is nothing for it to put. */
//   in      - inode losing the capability
//   client  - client whose capability is removed
// NOTE(review): this excerpt omits some source lines (braces, else branches);
// confirm the exact control flow against the full file.
3445 void Locker::remove_client_cap(CInode
*in
, client_t client
)
3447 // clean out any pending snapflush state
3448 if (!in
->client_need_snapflush
.empty())
3449 _do_null_snapflush(in
, client
);
3451 in
->remove_client_cap(client
);
3453 if (in
->is_auth()) {
3454 // make sure we clear out the client byte range
// Skipped when nlink == 0 and no caps remain on the inode.
3455 if (in
->get_projected_inode()->client_ranges
.count(client
) &&
3456 !(in
->inode
.nlink
== 0 && !in
->is_any_caps())) // unless it's unlink + stray
3457 check_inode_max_size(in
);
3459 request_inode_file_caps(in
);
// Re-evaluate the cap-related locks now that this client's cap is gone.
3462 try_eval(in
, CEPH_CAP_LOCKS
);
3467 * Return true if any currently revoking caps exceed the
3468 * mds_revoke_cap_timeout threshold.
// Only the element at revoking.begin() is examined: its age is computed as
// now - get_last_revoke_stamp() and compared with g_conf->mds_revoke_cap_timeout.
// NOTE(review): presumably the list is ordered oldest-revoke-first, so the head
// is representative — confirm against where the list is populated.
// NOTE(review): the return statements are not visible in this excerpt.
3470 bool Locker::any_late_revoking_caps(xlist
<Capability
*> const &revoking
) const
3472 xlist
<Capability
*>::const_iterator p
= revoking
.begin();
3474 // No revoking caps at the moment
3477 utime_t now
= ceph_clock_now();
3478 utime_t age
= now
- (*p
)->get_last_revoke_stamp();
// An age equal to the timeout still counts as within the threshold.
3479 if (age
<= g_conf
->mds_revoke_cap_timeout
) {
// Collect the ids of clients that have late revoking caps.
//   result - output list; the client id (map key) of each qualifying entry of
//            revoking_caps_by_client is appended with push_back().
// The global revoking_caps list is checked first so the per-client walk can be
// avoided when nothing is late.
// NOTE(review): this excerpt omits some source lines (returns, the test on
// any_late); confirm the exact control flow against the full file.
3488 void Locker::get_late_revoking_clients(std::list
<client_t
> *result
) const
3490 if (!any_late_revoking_caps(revoking_caps
)) {
3491 // Fast path: no misbehaving clients, execute in O(1)
3495 // Slow path: execute in O(N_clients)
3496 std::map
<client_t
, xlist
<Capability
*> >::const_iterator client_rc_iter
;
3497 for (client_rc_iter
= revoking_caps_by_client
.begin();
3498 client_rc_iter
!= revoking_caps_by_client
.end(); ++client_rc_iter
) {
3499 xlist
<Capability
*> const &client_rc
= client_rc_iter
->second
;
// Apply the same lateness test to this client's own revoking list.
3500 bool any_late
= any_late_revoking_caps(client_rc
);
3502 result
->push_back(client_rc_iter
->first
);
3507 // Hard-code instead of surfacing a config setting because this is
3508 // really a hack that should go away at some point when we have better
3509 // inspection tools for getting at detailed cap state (#7316)
3510 #define MAX_WARN_CAPS 100
// Walk revoking_caps and emit a cluster-log warning for each capability whose
// revoke has been outstanding longer than g_conf->mds_revoke_cap_timeout.
// Warnings for one cap back off exponentially: the age must exceed
// timeout * 2^(number of warnings already issued) before the next one is sent.
// The walk compares a counter i against MAX_WARN_CAPS and logs that later caps
// are ignored once it is exceeded.
// NOTE(review): this excerpt omits some source lines (declaration and update of
// i and ss, breaks, else); confirm the exact control flow against the full file.
3512 void Locker::caps_tick()
3514 utime_t now
= ceph_clock_now();
3516 dout(20) << __func__
<< " " << revoking_caps
.size() << " revoking caps" << dendl
;
3519 for (xlist
<Capability
*>::iterator p
= revoking_caps
.begin(); !p
.end(); ++p
) {
3520 Capability
*cap
= *p
;
3522 utime_t age
= now
- cap
->get_last_revoke_stamp();
// NOTE(review): age and the client id are streamed back to back with no
// separator, so they run together in the log line.
3523 dout(20) << __func__
<< " age = " << age
<< cap
->get_client() << "." << cap
->get_inode()->ino() << dendl
;
3524 if (age
<= g_conf
->mds_revoke_cap_timeout
) {
3525 dout(20) << __func__
<< " age below timeout " << g_conf
->mds_revoke_cap_timeout
<< dendl
;
3529 if (i
> MAX_WARN_CAPS
) {
// NOTE(review): the two adjacent literals join without a space, so the message
// reads "caps are laterevoking".
3530 dout(1) << __func__
<< " more than " << MAX_WARN_CAPS
<< " caps are late"
3531 << "revoking, ignoring subsequent caps" << dendl
;
3535 // exponential backoff of warning intervals
3536 if (age
> g_conf
->mds_revoke_cap_timeout
* (1 << cap
->get_num_revoke_warnings())) {
3537 cap
->inc_num_revoke_warnings();
3539 ss
<< "client." << cap
->get_client() << " isn't responding to mclientcaps(revoke), ino "
3540 << cap
->get_inode()->ino() << " pending " << ccap_string(cap
->pending())
3541 << " issued " << ccap_string(cap
->issued()) << ", sent " << age
<< " seconds ago";
// The same text goes to the cluster log (warn) and to the local debug log.
3542 mds
->clog
->warn() << ss
.str();
3543 dout(20) << __func__
<< " " << ss
.str() << dendl
;
3545 dout(20) << __func__
<< " silencing log message (backoff) for " << cap
->get_client() << "." << cap
->get_inode()->ino() << dendl
;
// Handle an MClientLease message from a client about a dentry lease.
// The dentry is located from (ino, last) and m->dname; the client's existing
// ClientLease on it is then looked up. Each lookup has a matching "don't have"
// log message for the failure case.
// Actions handled by the visible switch:
//   REVOKE_ACK / RELEASE - remove the lease; a sequence mismatch is logged.
//   RENEW                - if the dentry lock can still be leased, bump the
//                          lease seq, refresh its expiry and fill m->h; the
//                          same message object is sent back to the client.
//   anything else        - ceph_abort().
// NOTE(review): this excerpt omits some source lines (braces, returns, else,
// breaks, the declaration of dn); confirm the exact control flow against the
// full file.
3551 void Locker::handle_client_lease(MClientLease
*m
)
3553 dout(10) << "handle_client_lease " << *m
<< dendl
;
3555 assert(m
->get_source().is_client());
3556 client_t client
= m
->get_source().num();
3558 CInode
*in
= mdcache
->get_inode(m
->get_ino(), m
->get_last());
3560 dout(7) << "handle_client_lease don't have ino " << m
->get_ino() << "." << m
->get_last() << dendl
;
// Find the dirfrag that would hold dname, then the dentry inside it.
3566 frag_t fg
= in
->pick_dirfrag(m
->dname
);
3567 CDir
*dir
= in
->get_dirfrag(fg
);
3569 dn
= dir
->lookup(m
->dname
);
3571 dout(7) << "handle_client_lease don't have dn " << m
->get_ino() << " " << m
->dname
<< dendl
;
3575 dout(10) << " on " << *dn
<< dendl
;
3578 ClientLease
*l
= dn
->get_client_lease(client
);
3580 dout(7) << "handle_client_lease didn't have lease for client." << client
<< " of " << *dn
<< dendl
;
3585 switch (m
->get_action()) {
3586 case CEPH_MDS_LEASE_REVOKE_ACK
:
3587 case CEPH_MDS_LEASE_RELEASE
:
3588 if (l
->seq
!= m
->get_seq()) {
3589 dout(7) << "handle_client_lease release - seq " << l
->seq
<< " != provided " << m
->get_seq() << dendl
;
3591 dout(7) << "handle_client_lease client." << client
3592 << " on " << *dn
<< dendl
;
3593 dn
->remove_client_lease(l
, this);
3598 case CEPH_MDS_LEASE_RENEW
:
3600 dout(7) << "handle_client_lease client." << client
<< " renew on " << *dn
3601 << (!dn
->lock
.can_lease(client
)?", revoking lease":"") << dendl
;
3602 if (dn
->lock
.can_lease(client
)) {
3603 int pool
= 1; // fixme.. do something smart!
// The lease duration is scaled by 1000 to fill the duration_ms field.
3604 m
->h
.duration_ms
= (int)(1000 * mdcache
->client_lease_durations
[pool
]);
3605 m
->h
.seq
= ++l
->seq
;
// New expiry = now + the lease duration for this pool.
3608 utime_t now
= ceph_clock_now();
3609 now
+= mdcache
->client_lease_durations
[pool
];
3610 mdcache
->touch_client_lease(l
, pool
, now
);
// The incoming message object itself is reused as the reply.
3612 mds
->send_message_client_counted(m
, m
->get_connection());
3618 ceph_abort(); // implement me
// Possibly issue a dentry lease on dn to client.
//   dn       - dentry to lease
//   client   - client receiving the lease
//   bl       - output buffer (NOTE(review): presumably the lease record e is
//              encoded into it in lines not shown here — confirm)
//   now      - current time; advanced by the lease duration to form the expiry
//   session  - client session that will own the lease
// A lease is issued only when all of the following hold:
//   - the containing directory inode is not a stray dir,
//   - the directory's filelock cannot be leased to the client AND the client
//     has neither FILE_SHARED nor FILE_EXCL pending on the directory,
//   - the dentry's own lock can be leased to the client.
// Otherwise the "no/null lease" path is logged.
// NOTE(review): this excerpt omits some source lines (declaration of e, else,
// encoding); confirm the exact control flow against the full file.
3624 void Locker::issue_client_lease(CDentry
*dn
, client_t client
,
3625 bufferlist
&bl
, utime_t now
, Session
*session
)
3627 CInode
*diri
= dn
->get_dir()->get_inode();
3628 if (!diri
->is_stray() && // do not issue dn leases in stray dir!
3629 ((!diri
->filelock
.can_lease(client
) &&
3630 (diri
->get_client_cap_pending(client
) & (CEPH_CAP_FILE_SHARED
| CEPH_CAP_FILE_EXCL
)) == 0)) &&
3631 dn
->lock
.can_lease(client
)) {
3632 int pool
= 1; // fixme.. do something smart!
3633 // issue a dentry lease
3634 ClientLease
*l
= dn
->add_client_lease(client
, session
);
3635 session
->touch_lease(l
);
// now becomes the lease's expiry time.
3637 now
+= mdcache
->client_lease_durations
[pool
];
3638 mdcache
->touch_client_lease(l
, pool
, now
);
3641 e
.mask
= 1 | CEPH_LOCK_DN
; // old and new bit values
3643 e
.duration_ms
= (int)(1000 * mdcache
->client_lease_durations
[pool
]);
3645 dout(20) << "issue_client_lease seq " << e
.seq
<< " dur " << e
.duration_ms
<< "ms "
3646 << " on " << *dn
<< dendl
;
3654 dout(20) << "issue_client_lease no/null lease on " << *dn
<< dendl
;
// Send a CEPH_MDS_LEASE_REVOKE MClientLease for every client lease recorded in
// the client_lease_map of the dentry that owns lock.
//   lock - the lock being revoked; asserted to be of type CEPH_LOCK_DN, and its
//          parent is cast to CDentry.
// After the loop, a counter n is asserted to equal lock->get_num_client_lease().
// NOTE(review): this excerpt omits some source lines (declaration and increment
// of n, loop increment, remaining MClientLease arguments); confirm against the
// full file.
3659 void Locker::revoke_client_leases(SimpleLock
*lock
)
3662 CDentry
*dn
= static_cast<CDentry
*>(lock
->get_parent());
3663 for (map
<client_t
, ClientLease
*>::iterator p
= dn
->client_lease_map
.begin();
3664 p
!= dn
->client_lease_map
.end();
3666 ClientLease
*l
= p
->second
;
3669 assert(lock
->get_type() == CEPH_LOCK_DN
);
// NOTE(review): this second dn is initialised from the same expression as the
// one declared before the loop.
3671 CDentry
*dn
= static_cast<CDentry
*>(lock
->get_parent());
3672 int mask
= 1 | CEPH_LOCK_DN
; // old and new bits
3674 // i should also revoke the dir ICONTENT lease, if they have it!
3675 CInode
*diri
= dn
->get_dir()->get_inode();
3676 mds
->send_message_client_counted(new MClientLease(CEPH_MDS_LEASE_REVOKE
, l
->seq
,
3679 diri
->first
, CEPH_NOSNAP
,
3683 assert(n
== lock
->get_num_client_lease());
3688 // locks ----------------------------------------------------------------
// Resolve a (lock_type, object info) pair to the SimpleLock it names in the
// local cache.
//   lock_type - one of the CEPH_LOCK_* constants
//   info      - identifies the cache object (dirfrag ino + dname + snapid for a
//               dentry, ino + snapid for an inode)
// For the inode lock types the address of the matching CInode lock member is
// returned. Missing dentries, missing inodes and unknown lock types are logged.
// NOTE(review): this excerpt omits some source lines (the dentry case label,
// null checks, the failure return values); confirm against the full file.
3690 SimpleLock
*Locker::get_lock(int lock_type
, MDSCacheObjectInfo
&info
)
3692 switch (lock_type
) {
3695 // be careful; info.dirfrag may have incorrect frag; recalculate based on dname.
3696 CInode
*diri
= mdcache
->get_inode(info
.dirfrag
.ino
);
3701 fg
= diri
->pick_dirfrag(info
.dname
);
3702 dir
= diri
->get_dirfrag(fg
);
3704 dn
= dir
->lookup(info
.dname
, info
.snapid
);
3707 dout(7) << "get_lock don't have dn " << info
.dirfrag
.ino
<< " " << info
.dname
<< dendl
;
// All inode-level lock types share one inode lookup.
3713 case CEPH_LOCK_IAUTH
:
3714 case CEPH_LOCK_ILINK
:
3715 case CEPH_LOCK_IDFT
:
3716 case CEPH_LOCK_IFILE
:
3717 case CEPH_LOCK_INEST
:
3718 case CEPH_LOCK_IXATTR
:
3719 case CEPH_LOCK_ISNAP
:
3720 case CEPH_LOCK_IFLOCK
:
3721 case CEPH_LOCK_IPOLICY
:
3723 CInode
*in
= mdcache
->get_inode(info
.ino
, info
.snapid
);
3725 dout(7) << "get_lock don't have ino " << info
.ino
<< dendl
;
// Map the lock type to the corresponding lock member of the inode.
3728 switch (lock_type
) {
3729 case CEPH_LOCK_IAUTH
: return &in
->authlock
;
3730 case CEPH_LOCK_ILINK
: return &in
->linklock
;
3731 case CEPH_LOCK_IDFT
: return &in
->dirfragtreelock
;
3732 case CEPH_LOCK_IFILE
: return &in
->filelock
;
3733 case CEPH_LOCK_INEST
: return &in
->nestlock
;
3734 case CEPH_LOCK_IXATTR
: return &in
->xattrlock
;
3735 case CEPH_LOCK_ISNAP
: return &in
->snaplock
;
3736 case CEPH_LOCK_IFLOCK
: return &in
->flocklock
;
3737 case CEPH_LOCK_IPOLICY
: return &in
->policylock
;
3742 dout(7) << "get_lock don't know lock_type " << lock_type
<< dendl
;
3750 /* This function DOES put the passed message before returning */
// Entry point for inter-MDS MLock messages: resolve the lock the message names
// with get_lock() and dispatch on the lock's type.
//   IAUTH / ILINK / ISNAP / IXATTR / IFLOCK / IPOLICY -> handle_simple_lock()
//   IFILE                                              -> handle_file_lock()
// If the object is not in cache the message is logged as dropped.
// NOTE(review): this excerpt omits some source lines (null check, the dentry
// case label, the call made for IDFT/INEST, breaks, default); confirm against
// the full file.
3751 void Locker::handle_lock(MLock
*m
)
3753 // nobody should be talking to us during recovery.
3754 assert(mds
->is_rejoin() || mds
->is_clientreplay() || mds
->is_active() || mds
->is_stopping());
3756 SimpleLock
*lock
= get_lock(m
->get_lock_type(), m
->get_object_info());
3758 dout(10) << "don't have object " << m
->get_object_info() << ", must have trimmed, dropping" << dendl
;
3763 switch (lock
->get_type()) {
3765 case CEPH_LOCK_IAUTH
:
3766 case CEPH_LOCK_ILINK
:
3767 case CEPH_LOCK_ISNAP
:
3768 case CEPH_LOCK_IXATTR
:
3769 case CEPH_LOCK_IFLOCK
:
3770 case CEPH_LOCK_IPOLICY
:
3771 handle_simple_lock(lock
, m
);
3774 case CEPH_LOCK_IDFT
:
3775 case CEPH_LOCK_INEST
:
3776 //handle_scatter_lock((ScatterLock*)lock, m);
3779 case CEPH_LOCK_IFILE
:
3780 handle_file_lock(static_cast<ScatterLock
*>(lock
), m
);
3784 dout(7) << "handle_lock got otype " << m
->get_lock_type() << dendl
;
3794 // ==========================================================================
3797 /** This function may take a reference to m if it needs one, but does
3798 * not put references. */
// Handle a request for a read lock on lock.
// The request is acted on only when the parent object is auth, the lock is not
// already LOCK_SYNC, and the parent is not frozen; otherwise it is logged as
// dropped.
// When the lock is not stable, the message is re-queued with a waiter on
// WAIT_STABLE | WAIT_UNFREEZE; m->get() is called for that retry context.
// NOTE(review): this excerpt omits some source lines (the action taken when the
// lock is stable, else); confirm against the full file.
3799 void Locker::handle_reqrdlock(SimpleLock
*lock
, MLock
*m
)
3801 MDSCacheObject
*parent
= lock
->get_parent();
3802 if (parent
->is_auth() &&
3803 lock
->get_state() != LOCK_SYNC
&&
3804 !parent
->is_frozen()) {
3805 dout(7) << "handle_reqrdlock got rdlock request on " << *lock
3806 << " on " << *parent
<< dendl
;
3807 assert(parent
->is_auth()); // replica auth pinned if they're doing this!
3808 if (lock
->is_stable()) {
3811 dout(7) << "handle_reqrdlock delaying request until lock is stable" << dendl
;
3812 lock
->add_waiter(SimpleLock::WAIT_STABLE
| MDSCacheObject::WAIT_UNFREEZE
,
3813 new C_MDS_RetryMessage(mds
, m
->get()));
3816 dout(7) << "handle_reqrdlock dropping rdlock request on " << *lock
3817 << " on " << *parent
<< dendl
;
3818 // replica should retry
3822 /* This function DOES put the passed message before returning */
// Process an MLock action for a simple lock.
//   lock - target lock (already resolved by the caller)
//   m    - message; m->get_asker() identifies the sending MDS rank
// While the MDS is in rejoin, messages for objects that are still rejoining are
// logged as dropped.
// The action branches visible here:
//   - assert state LOCK_LOCK, decode the locked state, move to LOCK_SYNC and
//     wake WAIT_RD | WAIT_STABLE waiters
//   - assert state LOCK_SYNC, move to LOCK_SYNC_LOCK, revoke client leases if
//     any, eval_gather(), and flush the journal if still unstable and locked
//   - LOCK_AC_LOCKACK: remove the sender from the gather set and log whether
//     others are still outstanding
//   - LOCK_AC_REQRDLOCK: delegate to handle_reqrdlock()
// NOTE(review): this excerpt omits some source lines (the first two case
// labels, breaks, returns, the call made when the gather completes); confirm
// against the full file.
3823 void Locker::handle_simple_lock(SimpleLock
*lock
, MLock
*m
)
3825 int from
= m
->get_asker();
3827 dout(10) << "handle_simple_lock " << *m
3828 << " on " << *lock
<< " " << *lock
->get_parent() << dendl
;
3830 if (mds
->is_rejoin()) {
3831 if (lock
->get_parent()->is_rejoining()) {
3832 dout(7) << "handle_simple_lock still rejoining " << *lock
->get_parent()
3833 << ", dropping " << *m
<< dendl
;
3839 switch (m
->get_action()) {
3842 assert(lock
->get_state() == LOCK_LOCK
);
3843 lock
->decode_locked_state(m
->get_data());
3844 lock
->set_state(LOCK_SYNC
);
3845 lock
->finish_waiters(SimpleLock::WAIT_RD
|SimpleLock::WAIT_STABLE
);
3849 assert(lock
->get_state() == LOCK_SYNC
);
3850 lock
->set_state(LOCK_SYNC_LOCK
);
3851 if (lock
->is_leased())
3852 revoke_client_leases(lock
);
3853 eval_gather(lock
, true);
// Still unstable after the eval: flush the journal.
3854 if (lock
->is_unstable_and_locked())
3855 mds
->mdlog
->flush();
3860 case LOCK_AC_LOCKACK
:
3861 assert(lock
->get_state() == LOCK_SYNC_LOCK
||
3862 lock
->get_state() == LOCK_SYNC_EXCL
);
3863 assert(lock
->is_gathering(from
));
3864 lock
->remove_gather(from
);
3866 if (lock
->is_gathering()) {
3867 dout(7) << "handle_simple_lock " << *lock
<< " on " << *lock
->get_parent() << " from " << from
3868 << ", still gathering " << lock
->get_gather_set() << dendl
;
3870 dout(7) << "handle_simple_lock " << *lock
<< " on " << *lock
->get_parent() << " from " << from
3871 << ", last one" << dendl
;
3876 case LOCK_AC_REQRDLOCK
:
3877 handle_reqrdlock(lock
, m
);
3885 /* unused, currently.
3887 class C_Locker_SimpleEval : public Context {
3891 C_Locker_SimpleEval(Locker *l, SimpleLock *lk) : locker(l), lock(lk) {}
3892 void finish(int r) {
3893 locker->try_simple_eval(lock);
3897 void Locker::try_simple_eval(SimpleLock *lock)
3899 // unstable and ambiguous auth?
3900 if (!lock->is_stable() &&
3901 lock->get_parent()->is_ambiguous_auth()) {
3902 dout(7) << "simple_eval not stable and ambiguous auth, waiting on " << *lock->get_parent() << dendl;
3903 //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
3904 lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_SimpleEval(this, lock));
3908 if (!lock->get_parent()->is_auth()) {
3909 dout(7) << "try_simple_eval not auth for " << *lock->get_parent() << dendl;
3913 if (!lock->get_parent()->can_auth_pin()) {
3914 dout(7) << "try_simple_eval can't auth_pin, waiting on " << *lock->get_parent() << dendl;
3915 //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
3916 lock->get_parent()->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_SimpleEval(this, lock));
3920 if (lock->is_stable())
// Decide whether a stable simple lock on an auth object should change state.
//   lock       - lock to evaluate; asserted to be on an auth parent and stable
//   need_issue - passed through to simple_sync() / simple_excl()
// Decisions visible here:
//   - on a read-only FS, any state other than LOCK_SYNC is moved toward sync
//   - if the lock is not EXCL, the inode has a target loner and GEXCL is in the
//     wanted caps, go to excl
//   - else if the lock is not SYNC, is not wrlocked, and either nobody wants
//     GEXCL and no WAIT_WR waiter exists, or it is EXCL with no target loner,
//     go to sync
// NOTE(review): this excerpt omits some source lines (returns, declarations of
// in and wanted); confirm against the full file.
3926 void Locker::simple_eval(SimpleLock
*lock
, bool *need_issue
)
3928 dout(10) << "simple_eval " << *lock
<< " on " << *lock
->get_parent() << dendl
;
3930 assert(lock
->get_parent()->is_auth());
3931 assert(lock
->is_stable());
3933 if (lock
->get_parent()->is_freezing_or_frozen()) {
3934 // dentry lock in unreadable state can block path traverse
3935 if ((lock
->get_type() != CEPH_LOCK_DN
||
3936 lock
->get_state() == LOCK_SYNC
||
3937 lock
->get_parent()->is_frozen()))
3941 if (mdcache
->is_readonly()) {
3942 if (lock
->get_state() != LOCK_SYNC
) {
3943 dout(10) << "simple_eval read-only FS, syncing " << *lock
<< " on " << *lock
->get_parent() << dendl
;
3944 simple_sync(lock
, need_issue
);
// Non-dentry locks belong to an inode: collect the caps wanted for this lock.
3951 if (lock
->get_type() != CEPH_LOCK_DN
) {
3952 in
= static_cast<CInode
*>(lock
->get_parent());
3953 in
->get_caps_wanted(&wanted
, NULL
, lock
->get_cap_shift());
3957 if (lock
->get_state() != LOCK_EXCL
&&
3958 in
&& in
->get_target_loner() >= 0 &&
3959 (wanted
& CEPH_CAP_GEXCL
)) {
3960 dout(7) << "simple_eval stable, going to excl " << *lock
3961 << " on " << *lock
->get_parent() << dendl
;
3962 simple_excl(lock
, need_issue
);
3966 else if (lock
->get_state() != LOCK_SYNC
&&
3967 !lock
->is_wrlocked() &&
3968 ((!(wanted
& CEPH_CAP_GEXCL
) && !lock
->is_waiter_for(SimpleLock::WAIT_WR
)) ||
3969 (lock
->get_state() == LOCK_EXCL
&& in
&& in
->get_target_loner() < 0))) {
3970 dout(7) << "simple_eval stable, syncing " << *lock
3971 << " on " << *lock
->get_parent() << dendl
;
3972 simple_sync(lock
, need_issue
);
// Drive a stable lock on an auth object toward LOCK_SYNC.
//   lock       - lock to sync; asserted to be on an auth parent and stable
//   need_issue - NOTE(review): its use is in lines not shown here; presumably an
//                out-flag telling the caller to issue caps — confirm
// Unless the lock starts in LOCK_TSYN, it is first moved to the matching
// transitional state (MIX_SYNC, LOCK_SYNC, XSYN_SYNC or EXCL_SYNC); any other
// starting state aborts. The visible code then looks at wrlocks, replicas (a
// LOCK_AC_SYNC message plus init_gather when coming from MIX), issued caps,
// pending file recovery and dirty scatter data. Finally the locked state is
// sent to replicas, the state is set to LOCK_SYNC, and WAIT_RD | WAIT_STABLE
// waiters are woken.
// NOTE(review): this excerpt omits many source lines (the gather bookkeeping,
// returns, else); confirm the exact control flow and the return value against
// the full file.
3979 bool Locker::simple_sync(SimpleLock
*lock
, bool *need_issue
)
3981 dout(7) << "simple_sync on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
3982 assert(lock
->get_parent()->is_auth());
3983 assert(lock
->is_stable());
// A non-zero cap shift means the parent is treated as a CInode here.
3986 if (lock
->get_cap_shift())
3987 in
= static_cast<CInode
*>(lock
->get_parent());
3989 int old_state
= lock
->get_state();
3991 if (old_state
!= LOCK_TSYN
) {
3993 switch (lock
->get_state()) {
3994 case LOCK_MIX
: lock
->set_state(LOCK_MIX_SYNC
); break;
3995 case LOCK_LOCK
: lock
->set_state(LOCK_LOCK_SYNC
); break;
3996 case LOCK_XSYN
: lock
->set_state(LOCK_XSYN_SYNC
); break;
3997 case LOCK_EXCL
: lock
->set_state(LOCK_EXCL_SYNC
); break;
3998 default: ceph_abort();
4002 if (lock
->is_wrlocked())
// Replicas are only messaged here when the lock was scattered (MIX).
4005 if (lock
->get_parent()->is_replicated() && old_state
== LOCK_MIX
) {
4006 send_lock_message(lock
, LOCK_AC_SYNC
);
4007 lock
->init_gather();
4011 if (in
&& in
->is_head()) {
4012 if (in
->issued_caps_need_gather(lock
)) {
4021 bool need_recover
= false;
4022 if (lock
->get_type() == CEPH_LOCK_IFILE
) {
4024 if (in
->state_test(CInode::STATE_NEEDSRECOVER
)) {
4025 mds
->mdcache
->queue_file_recover(in
);
4026 need_recover
= true;
// Nothing to gather but dirty scatter data exists: write it behind and flush.
4031 if (!gather
&& lock
->is_dirty()) {
4032 lock
->get_parent()->auth_pin(lock
);
4033 scatter_writebehind(static_cast<ScatterLock
*>(lock
));
4034 mds
->mdlog
->flush();
4039 lock
->get_parent()->auth_pin(lock
);
4041 mds
->mdcache
->do_file_recover();
4046 if (lock
->get_parent()->is_replicated()) { // FIXME
4048 lock
->encode_locked_state(data
);
4049 send_lock_message(lock
, LOCK_AC_SYNC
, data
);
4051 lock
->set_state(LOCK_SYNC
);
4052 lock
->finish_waiters(SimpleLock::WAIT_RD
|SimpleLock::WAIT_STABLE
);
4053 if (in
&& in
->is_head()) {
// Drive a stable lock on an auth object toward LOCK_EXCL.
//   lock       - lock to move; asserted to be on an auth parent and stable
//   need_issue - NOTE(review): its use is in lines not shown here; presumably an
//                out-flag telling the caller to issue caps — confirm
// The lock is first moved to the transitional state for its current state
// (LOCK_EXCL, SYNC_EXCL or XSYN_EXCL); any other starting state aborts.
// Replicas are sent LOCK_AC_LOCK only when the transitional state is neither
// LOCK_LOCK_EXCL nor LOCK_XSYN_EXCL. The final visible step sets LOCK_EXCL and
// wakes WAIT_WR | WAIT_STABLE waiters.
// NOTE(review): this excerpt omits many source lines (gather bookkeeping,
// else); confirm the exact control flow against the full file.
4062 void Locker::simple_excl(SimpleLock
*lock
, bool *need_issue
)
4064 dout(7) << "simple_excl on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4065 assert(lock
->get_parent()->is_auth());
4066 assert(lock
->is_stable());
4069 if (lock
->get_cap_shift())
4070 in
= static_cast<CInode
*>(lock
->get_parent());
4072 switch (lock
->get_state()) {
4073 case LOCK_LOCK
: lock
->set_state(LOCK_LOCK_EXCL
); break;
4074 case LOCK_SYNC
: lock
->set_state(LOCK_SYNC_EXCL
); break;
4075 case LOCK_XSYN
: lock
->set_state(LOCK_XSYN_EXCL
); break;
4076 default: ceph_abort();
4080 if (lock
->is_rdlocked())
4082 if (lock
->is_wrlocked())
4085 if (lock
->get_parent()->is_replicated() &&
4086 lock
->get_state() != LOCK_LOCK_EXCL
&&
4087 lock
->get_state() != LOCK_XSYN_EXCL
) {
4088 send_lock_message(lock
, LOCK_AC_LOCK
);
4089 lock
->init_gather();
4093 if (in
&& in
->is_head()) {
4094 if (in
->issued_caps_need_gather(lock
)) {
4104 lock
->get_parent()->auth_pin(lock
);
4106 lock
->set_state(LOCK_EXCL
);
4107 lock
->finish_waiters(SimpleLock::WAIT_WR
|SimpleLock::WAIT_STABLE
);
// Drive a stable lock on an auth object toward LOCK_LOCK.
//   lock       - lock to move; asserted auth, stable and not already LOCK_LOCK
//   need_issue - forwarded to file_excl(); NOTE(review): any other use is in
//                lines not shown here
// The lock is first moved to the transitional state for its current state
// (SYNC_LOCK, EXCL_LOCK, MIX_LOCK or TSYN_LOCK); leaving MIX also clears the
// scatterlock's unscatter-wanted flag. The visible code then revokes client
// leases, looks at rdlocks, issued caps, pending file recovery and replicas.
// Finally the state is set to LOCK_LOCK and WAIT_XLOCK | WAIT_WR | WAIT_STABLE
// waiters are woken.
// NOTE(review): this excerpt omits many source lines (gather bookkeeping, the
// case label preceding file_excl, returns, else); confirm the exact control
// flow against the full file.
4117 void Locker::simple_lock(SimpleLock
*lock
, bool *need_issue
)
4119 dout(7) << "simple_lock on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4120 assert(lock
->get_parent()->is_auth());
4121 assert(lock
->is_stable());
4122 assert(lock
->get_state() != LOCK_LOCK
);
4125 if (lock
->get_cap_shift())
4126 in
= static_cast<CInode
*>(lock
->get_parent());
// Remembered so the replica state of the starting state can be consulted below.
4128 int old_state
= lock
->get_state();
4130 switch (lock
->get_state()) {
4131 case LOCK_SYNC
: lock
->set_state(LOCK_SYNC_LOCK
); break;
4133 file_excl(static_cast<ScatterLock
*>(lock
), need_issue
);
4134 if (lock
->get_state() != LOCK_EXCL
)
4137 case LOCK_EXCL
: lock
->set_state(LOCK_EXCL_LOCK
); break;
4138 case LOCK_MIX
: lock
->set_state(LOCK_MIX_LOCK
);
4139 (static_cast<ScatterLock
*>(lock
))->clear_unscatter_wanted();
4141 case LOCK_TSYN
: lock
->set_state(LOCK_TSYN_LOCK
); break;
4142 default: ceph_abort();
4146 if (lock
->is_leased()) {
4148 revoke_client_leases(lock
);
4150 if (lock
->is_rdlocked())
4152 if (in
&& in
->is_head()) {
4153 if (in
->issued_caps_need_gather(lock
)) {
4162 bool need_recover
= false;
4163 if (lock
->get_type() == CEPH_LOCK_IFILE
) {
4165 if(in
->state_test(CInode::STATE_NEEDSRECOVER
)) {
4166 mds
->mdcache
->queue_file_recover(in
);
4167 need_recover
= true;
4172 if (lock
->get_parent()->is_replicated() &&
4173 lock
->get_state() == LOCK_MIX_LOCK
&&
4175 dout(10) << " doing local stage of mix->lock gather before gathering from replicas" << dendl
;
4177 // move to second stage of gather now, so we don't send the lock action later.
4178 if (lock
->get_state() == LOCK_MIX_LOCK
)
4179 lock
->set_state(LOCK_MIX_LOCK2
);
// Message replicas only if the starting state did not already have them in LOCK.
4181 if (lock
->get_parent()->is_replicated() &&
4182 lock
->get_sm()->states
[old_state
].replica_state
!= LOCK_LOCK
) { // replica may already be LOCK
4184 send_lock_message(lock
, LOCK_AC_LOCK
);
4185 lock
->init_gather();
// Nothing to gather but dirty scatter data exists: write it behind and flush.
4189 if (!gather
&& lock
->is_dirty()) {
4190 lock
->get_parent()->auth_pin(lock
);
4191 scatter_writebehind(static_cast<ScatterLock
*>(lock
));
4192 mds
->mdlog
->flush();
4197 lock
->get_parent()->auth_pin(lock
);
4199 mds
->mdcache
->do_file_recover();
4201 lock
->set_state(LOCK_LOCK
);
4202 lock
->finish_waiters(ScatterLock::WAIT_XLOCK
|ScatterLock::WAIT_WR
|ScatterLock::WAIT_STABLE
);
// Begin moving a lock on an auth object toward an exclusive (xlock) state.
//   lock - lock to move; asserted auth and not already LOCK_XLOCK. Stability is
//          NOT asserted (the assert is commented out).
// The parent is auth-pinned when the lock is stable on entry. From
// LOCK_XLOCKDONE the lock moves to LOCK_LOCK_XLOCK; unlisted states abort. The
// final visible step sets LOCK_PREXLOCK.
// NOTE(review): this excerpt omits many source lines (other case labels, gather
// bookkeeping, else); confirm the exact control flow against the full file.
4207 void Locker::simple_xlock(SimpleLock
*lock
)
4209 dout(7) << "simple_xlock on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4210 assert(lock
->get_parent()->is_auth());
4211 //assert(lock->is_stable());
4212 assert(lock
->get_state() != LOCK_XLOCK
);
4215 if (lock
->get_cap_shift())
4216 in
= static_cast<CInode
*>(lock
->get_parent());
4218 if (lock
->is_stable())
4219 lock
->get_parent()->auth_pin(lock
);
4221 switch (lock
->get_state()) {
4223 case LOCK_XLOCKDONE
: lock
->set_state(LOCK_LOCK_XLOCK
); break;
4224 default: ceph_abort();
4228 if (lock
->is_rdlocked())
4230 if (lock
->is_wrlocked())
4233 if (in
&& in
->is_head()) {
4234 if (in
->issued_caps_need_gather(lock
)) {
4241 lock
->set_state(LOCK_PREXLOCK
);
4242 //assert("shouldn't be called if we are already xlockable" == 0);
4250 // ==========================================================================
4255 Some notes on scatterlocks.
4257 - The scatter/gather is driven by the inode lock. The scatter always
4258 brings in the latest metadata from the fragments.
4260 - When in a scattered/MIX state, fragments are only allowed to
4261 update/be written to if the accounted stat matches the inode's
4264 - That means, on gather, we _only_ assimilate diffs for frag metadata
4265 that match the current version, because those are the only ones
4266 written during this scatter/gather cycle. (Others didn't permit
4267 it.) We increment the version and journal this to disk.
4269 - When possible, we also simultaneously update our local frag
4270 accounted stats to match.
4272 - On scatter, the new inode info is broadcast to frags, both local
4273 and remote. If possible (auth and !frozen), the dirfrag auth
4274 should update the accounted state (if it isn't already up to date).
4275 Note that this may occur on both the local inode auth node and
4276 inode replicas, so there are two potential paths. If it is NOT
4277 possible, they need to mark_stale to prevent any possible writes.
4279 - A scatter can be to MIX (potentially writeable) or to SYNC (read
4280 only). Both are opportunities to update the frag accounted stats,
4281 even though only the MIX case is affected by a stale dirfrag.
4283 - Because many scatter/gather cycles can potentially go by without a
4284 frag being able to update its accounted stats (due to being frozen
4285 by exports/refragments in progress), the frag may have (even very)
4286 old stat versions. That's fine. When we do want to update it,
4287 we can update accounted_* and the version first.
// Journal-completion context for scatter_writebehind(): holds the scatterlock
// and the mutation, and calls Locker::scatter_writebehind_finish() with them
// from finish().
// NOTE(review): the member declarations are not visible in this excerpt.
4291 class C_Locker_ScatterWB
: public LockerLogContext
{
4295 C_Locker_ScatterWB(Locker
*l
, ScatterLock
*sl
, MutationRef
& m
) :
4296 LockerLogContext(l
), lock(sl
), mut(m
) {}
// The completion code r is not used.
4297 void finish(int r
) override
{
4298 locker
->scatter_writebehind_finish(lock
, mut
);
// Journal the inode update that folds a scatterlock's gathered data back into
// its inode.
//   lock - scatterlock whose parent is cast to CInode
// Steps visible here: create a mutation bound to the current log segment,
// force a wrlock on the lock and record it in the mutation, project and
// pre-dirty the inode, run the inode's scatter-gather update, mark the lock as
// flushing, then journal an EUpdate ("scatter_writebehind").
// C_Locker_ScatterWB is passed to submit_entry() as the completion context.
// NOTE(review): this excerpt omits some source lines; confirm against the full
// file.
4302 void Locker::scatter_writebehind(ScatterLock
*lock
)
4304 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4305 dout(10) << "scatter_writebehind " << in
->inode
.mtime
<< " on " << *lock
<< " on " << *in
<< dendl
;
4308 MutationRef
mut(new MutationImpl());
4309 mut
->ls
= mds
->mdlog
->get_current_segment();
4311 // forcefully take a wrlock
4312 lock
->get_wrlock(true);
4313 mut
->wrlocks
.insert(lock
);
4314 mut
->locks
.insert(lock
);
4316 in
->pre_cow_old_inode(); // avoid cow mayhem
4318 inode_t
*pi
= in
->project_inode();
4319 pi
->version
= in
->pre_dirty();
4321 in
->finish_scatter_gather_update(lock
->get_type());
4322 lock
->start_flush();
4324 EUpdate
*le
= new EUpdate(mds
->mdlog
, "scatter_writebehind");
4325 mds
->mdlog
->start_entry(le
);
4327 mdcache
->predirty_journal_parents(mut
, &le
->metablob
, in
, 0, PREDIRTY_PRIMARY
);
4328 mdcache
->journal_dirty_inode(mut
.get(), &le
->metablob
, in
);
4330 in
->finish_scatter_gather_update_accounted(lock
->get_type(), mut
, &le
->metablob
);
4332 mds
->mdlog
->submit_entry(le
, new C_Locker_ScatterWB(this, lock
, mut
));
// Completion half of scatter_writebehind().
//   lock - scatterlock that was being flushed
//   mut  - mutation created by scatter_writebehind(); its locks are dropped here
// Pops and dirties the projected inode in the mutation's log segment, marks the
// flush finished, sends LOCK_AC_LOCKFLUSHED to replicas for the listed lock
// state(s), drops the mutation's locks, and wakes WAIT_STABLE waiters if the
// lock is stable.
// NOTE(review): this excerpt omits some source lines (other case labels,
// mut cleanup); confirm against the full file.
4335 void Locker::scatter_writebehind_finish(ScatterLock
*lock
, MutationRef
& mut
)
4337 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4338 dout(10) << "scatter_writebehind_finish on " << *lock
<< " on " << *in
<< dendl
;
4339 in
->pop_and_dirty_projected_inode(mut
->ls
);
4341 lock
->finish_flush();
4343 // if replicas may have flushed in a mix->lock state, send another
4344 // message so they can finish_flush().
4345 if (in
->is_replicated()) {
4346 switch (lock
->get_state()) {
4348 case LOCK_MIX_LOCK2
:
4351 send_lock_message(lock
, LOCK_AC_LOCKFLUSHED
);
4356 drop_locks(mut
.get());
4359 if (lock
->is_stable())
4360 lock
->finish_waiters(ScatterLock::WAIT_STABLE
);
4362 //scatter_eval_gather(lock);
// Decide whether a stable scatterlock on an auth object should change state.
//   lock       - lock to evaluate; asserted to be on an auth parent and stable
//   need_issue - passed through to simple_sync() / scatter_mix() / simple_lock()
// Decisions visible here:
//   - freezing or frozen parent: logged
//   - read-only FS: any state other than LOCK_SYNC is moved toward sync
//   - scatter wanted, not rdlocked, not already MIX: go to mix
//   - INEST lock, not rdlocked: go to MIX when replicated, and
//     (NOTE(review): presumably otherwise, the else is not shown) toward
//     LOCK_LOCK
//   - inode without a subtree/exporting dirfrag, or a base inode: go to sync
//     when not wrlocked and not already SYNC
// NOTE(review): this excerpt omits some source lines (returns, else); confirm
// the exact control flow against the full file.
4365 void Locker::scatter_eval(ScatterLock
*lock
, bool *need_issue
)
4367 dout(10) << "scatter_eval " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4369 assert(lock
->get_parent()->is_auth());
4370 assert(lock
->is_stable());
4372 if (lock
->get_parent()->is_freezing_or_frozen()) {
4373 dout(20) << " freezing|frozen" << dendl
;
4377 if (mdcache
->is_readonly()) {
4378 if (lock
->get_state() != LOCK_SYNC
) {
4379 dout(10) << "scatter_eval read-only FS, syncing " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4380 simple_sync(lock
, need_issue
);
4385 if (!lock
->is_rdlocked() &&
4386 lock
->get_state() != LOCK_MIX
&&
4387 lock
->get_scatter_wanted()) {
4388 dout(10) << "scatter_eval scatter_wanted, bump to mix " << *lock
4389 << " on " << *lock
->get_parent() << dendl
;
4390 scatter_mix(lock
, need_issue
);
4394 if (lock
->get_type() == CEPH_LOCK_INEST
) {
4395 // in general, we want to keep INEST writable at all times.
4396 if (!lock
->is_rdlocked()) {
4397 if (lock
->get_parent()->is_replicated()) {
4398 if (lock
->get_state() != LOCK_MIX
)
4399 scatter_mix(lock
, need_issue
);
4401 if (lock
->get_state() != LOCK_LOCK
)
4402 simple_lock(lock
, need_issue
);
4408 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4409 if (!in
->has_subtree_or_exporting_dirfrag() || in
->is_base()) {
4410 // i _should_ be sync.
4411 if (!lock
->is_wrlocked() &&
4412 lock
->get_state() != LOCK_SYNC
) {
4413 dout(10) << "scatter_eval no wrlocks|xlocks, not subtree root inode, syncing" << dendl
;
4414 simple_sync(lock
, need_issue
);
4421 * mark a scatterlock to indicate that the dir fnode has some dirty data
// Put lock on the updated_scatterlocks list and stamp it with the current time.
//   lock - scatterlock to mark
// If its list item is already on a list, the visible code logs the existing
// stamp.
// NOTE(review): the else separating the two paths is not visible in this
// excerpt; presumably an already-listed lock is not re-added or re-stamped —
// confirm against the full file.
4423 void Locker::mark_updated_scatterlock(ScatterLock
*lock
)
4426 if (lock
->get_updated_item()->is_on_list()) {
4427 dout(10) << "mark_updated_scatterlock " << *lock
4428 << " - already on list since " << lock
->get_update_stamp() << dendl
;
4430 updated_scatterlocks
.push_back(lock
->get_updated_item());
4431 utime_t now
= ceph_clock_now();
4432 lock
->set_update_stamp(now
);
4433 dout(10) << "mark_updated_scatterlock " << *lock
4434 << " - added at " << now
<< dendl
;
4439 * this is called by scatter_tick and LogSegment::try_to_trim() when
4440 * trying to flush dirty scattered data (i.e. updated fnode) back to
4443 * we need to lock|scatter in order to push fnode changes into the
// Nudge a scatterlock so that dirty scattered data can be pushed back.
//   lock:            scatterlock to nudge; its parent is used as a CInode.
//   c:               context registered through add_waiter() in the waiting
//                    paths below (WAIT_UNFREEZE, WAIT_SINGLEAUTH,
//                    WAIT_STABLE). scatter_tick() passes 0 for it.
//   forcelockchange: within this function it is only tested next to the
//                    scatter_writebehind() call; handle_file_lock() passes
//                    true when it handles a nudge.
// Visible behaviour, in source order:
//   - frozen or freezing inode: wait for WAIT_UNFREEZE and requeue the lock
//     on updated_scatterlocks;
//   - ambiguous auth: wait for WAIT_SINGLEAUTH and requeue;
//   - stable lock: on a read-only filesystem move toward LOCK_SYNC,
//     otherwise pick scatter_mix(), simple_lock() or simple_sync() by lock
//     type and current state, then wait for WAIT_STABLE;
//   - replica: send an MLock with LOCK_AC_NUDGE to the auth MDS, wait for
//     WAIT_STABLE and requeue.
// NOTE(review): presumably c may be null and the requeue is the fallback
// when no waiter is registered; confirm.
4446 void Locker::scatter_nudge(ScatterLock
*lock
, MDSInternalContextBase
*c
, bool forcelockchange
)
4448 CInode
*p
= static_cast<CInode
*>(lock
->get_parent());
4450 if (p
->is_frozen() || p
->is_freezing()) {
4451 dout(10) << "scatter_nudge waiting for unfreeze on " << *p
<< dendl
;
4453 p
->add_waiter(MDSCacheObject::WAIT_UNFREEZE
, c
);
4455 // just requeue. not ideal.. starvation prone..
4456 updated_scatterlocks
.push_back(lock
->get_updated_item());
4460 if (p
->is_ambiguous_auth()) {
4461 dout(10) << "scatter_nudge waiting for single auth on " << *p
<< dendl
;
4463 p
->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
, c
);
4465 // just requeue. not ideal.. starvation prone..
4466 updated_scatterlocks
.push_back(lock
->get_updated_item());
4473 if (lock
->is_stable()) {
4474 // can we do it now?
4475 // (only if we're not replicated.. if we are, we really do need
4476 // to nudge the lock state!)
4478 actually, even if we're not replicated, we can't stay in MIX, because another mds
4479 could discover and replicate us at any time. if that happens while we're flushing,
4480 they end up in MIX but their inode has the old scatterstat version.
4482 if (!forcelockchange && !lock->get_parent()->is_replicated() && lock->can_wrlock(-1)) {
4483 dout(10) << "scatter_nudge auth, propagating " << *lock << " on " << *p << dendl;
4484 scatter_writebehind(lock);
4486 lock->add_waiter(SimpleLock::WAIT_STABLE, c);
4491 if (mdcache
->is_readonly()) {
4492 if (lock
->get_state() != LOCK_SYNC
) {
4493 dout(10) << "scatter_nudge auth, read-only FS, syncing " << *lock
<< " on " << *p
<< dendl
;
4494 simple_sync(static_cast<ScatterLock
*>(lock
));
4499 // adjust lock state
4500 dout(10) << "scatter_nudge auth, scatter/unscattering " << *lock
<< " on " << *p
<< dendl
;
4501 switch (lock
->get_type()) {
4502 case CEPH_LOCK_IFILE
:
4503 if (p
->is_replicated() && lock
->get_state() != LOCK_MIX
)
4504 scatter_mix(static_cast<ScatterLock
*>(lock
));
4505 else if (lock
->get_state() != LOCK_LOCK
)
4506 simple_lock(static_cast<ScatterLock
*>(lock
));
4508 simple_sync(static_cast<ScatterLock
*>(lock
));
4511 case CEPH_LOCK_IDFT
:
4512 case CEPH_LOCK_INEST
:
4513 if (p
->is_replicated() && lock
->get_state() != LOCK_MIX
)
4515 else if (lock
->get_state() != LOCK_LOCK
)
4524 if (lock
->is_stable() && count
== 2) {
4525 dout(10) << "scatter_nudge oh, stable after two cycles." << dendl
;
4526 // this should only realy happen when called via
4527 // handle_file_lock due to AC_NUDGE, because the rest of the
4528 // time we are replicated or have dirty data and won't get
4529 // called. bailing here avoids an infinite loop.
4534 dout(10) << "scatter_nudge auth, waiting for stable " << *lock
<< " on " << *p
<< dendl
;
4536 lock
->add_waiter(SimpleLock::WAIT_STABLE
, c
);
// Replica side: ask the auth MDS to nudge the lock state for us.
4541 dout(10) << "scatter_nudge replica, requesting scatter/unscatter of "
4542 << *lock
<< " on " << *p
<< dendl
;
4543 // request unscatter?
4544 mds_rank_t auth
= lock
->get_parent()->authority().first
;
4545 if (!mds
->is_cluster_degraded() ||
4546 mds
->mdsmap
->is_clientreplay_or_active_or_stopping(auth
))
4547 mds
->send_message_mds(new MLock(lock
, LOCK_AC_NUDGE
, mds
->get_nodeid()), auth
);
4551 lock
->add_waiter(SimpleLock::WAIT_STABLE
, c
);
4553 // also, requeue, in case we had wrong auth or something
4554 updated_scatterlocks
.push_back(lock
->get_updated_item());
// Periodic pass over updated_scatterlocks.
// The queue length is sampled into n before the loop and decremented once
// per iteration, so at most that many entries are examined even though
// scatter_nudge() may put locks back on the queue.
// For each front entry:
//   - a lock that is no longer dirty is popped and logged;
//   - the age of the lock (now minus its update stamp) is compared with
//     g_conf->mds_scatter_nudge_interval;
//   - the entry is popped and passed to scatter_nudge(lock, 0).
// mds->mdlog->flush() is called as well.
// NOTE(review): presumably the age check stops the loop for entries that
// are still too recent; confirm.
4558 void Locker::scatter_tick()
4560 dout(10) << "scatter_tick" << dendl
;
4563 utime_t now
= ceph_clock_now();
4564 int n
= updated_scatterlocks
.size();
4565 while (!updated_scatterlocks
.empty()) {
4566 ScatterLock
*lock
= updated_scatterlocks
.front();
4568 if (n
-- == 0) break; // scatter_nudge() may requeue; avoid looping
4570 if (!lock
->is_dirty()) {
4571 updated_scatterlocks
.pop_front();
4572 dout(10) << " removing from updated_scatterlocks "
4573 << *lock
<< " " << *lock
->get_parent() << dendl
;
4576 if (now
- lock
->get_update_stamp() < g_conf
->mds_scatter_nudge_interval
)
4578 updated_scatterlocks
.pop_front();
4579 scatter_nudge(lock
, 0);
4581 mds
->mdlog
->flush();
// Start moving a scatterlock toward LOCK_TSYN.
//   lock:       asserted to be stable with an auth parent.
//   need_issue: not referenced on the lines shown here.
// NOTE: the assert(0 == ...) below compares 0 with a string literal, so it
// fails whenever assertions are active; everything after it is effectively
// a sketch of the intended transition.
// Intended transition as written:
//   LOCK_LOCK -> LOCK_LOCK_TSYN, LOCK_MIX -> LOCK_MIX_TSYN, anything else
//   (including LOCK_SYNC) aborts. Wrlocks, issued caps that need gathering
//   and, for LOCK_MIX_TSYN on a replicated inode, a LOCK_AC_LOCK round to
//   replicas are checked. The final state is LOCK_TSYN, with WAIT_RD and
//   WAIT_STABLE waiters finished.
4585 void Locker::scatter_tempsync(ScatterLock
*lock
, bool *need_issue
)
4587 dout(10) << "scatter_tempsync " << *lock
4588 << " on " << *lock
->get_parent() << dendl
;
4589 assert(lock
->get_parent()->is_auth());
4590 assert(lock
->is_stable());
4592 assert(0 == "not fully implemented, at least not for filelock");
4594 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4596 switch (lock
->get_state()) {
4597 case LOCK_SYNC
: ceph_abort(); // this shouldn't happen
4598 case LOCK_LOCK
: lock
->set_state(LOCK_LOCK_TSYN
); break;
4599 case LOCK_MIX
: lock
->set_state(LOCK_MIX_TSYN
); break;
4600 default: ceph_abort();
4604 if (lock
->is_wrlocked())
4607 if (lock
->get_cap_shift() &&
4609 in
->issued_caps_need_gather(lock
)) {
4617 if (lock
->get_state() == LOCK_MIX_TSYN
&&
4618 in
->is_replicated()) {
4619 lock
->init_gather();
4620 send_lock_message(lock
, LOCK_AC_LOCK
);
4628 lock
->set_state(LOCK_TSYN
);
4629 lock
->finish_waiters(ScatterLock::WAIT_RD
|ScatterLock::WAIT_STABLE
);
4630 if (lock
->get_cap_shift()) {
4641 // ==========================================================================
// Take a write lock on a LocalLock for mutation mut without any waiting
// path.
// Preconditions are asserted rather than handled: the parent must be auth,
// lock->can_wrlock() must hold, and the lock must not already be in
// mut->wrlocks.
// The wrlock is taken for mut->get_client() and the lock is recorded in
// both mut->wrlocks and mut->locks.
4644 void Locker::local_wrlock_grab(LocalLock
*lock
, MutationRef
& mut
)
4646 dout(7) << "local_wrlock_grab on " << *lock
4647 << " on " << *lock
->get_parent() << dendl
;
4649 assert(lock
->get_parent()->is_auth());
4650 assert(lock
->can_wrlock());
4651 assert(!mut
->wrlocks
.count(lock
));
4652 lock
->get_wrlock(mut
->get_client());
4653 mut
->wrlocks
.insert(lock
);
4654 mut
->locks
.insert(lock
);
// Try to take a write lock on a LocalLock for request mut.
// The parent is asserted to be auth. When lock->can_wrlock() holds, the
// wrlock is taken for mut->get_client() and recorded in mut->wrlocks and
// mut->locks (asserting it was not already in mut->wrlocks).
// The waiting path registers a C_MDS_RetryRequest for mut on
// WAIT_WR|WAIT_STABLE so the request is retried later.
// NOTE(review): presumably returns true when the lock was taken and false
// when the request was queued; confirm.
4657 bool Locker::local_wrlock_start(LocalLock
*lock
, MDRequestRef
& mut
)
4659 dout(7) << "local_wrlock_start on " << *lock
4660 << " on " << *lock
->get_parent() << dendl
;
4662 assert(lock
->get_parent()->is_auth());
4663 if (lock
->can_wrlock()) {
4664 assert(!mut
->wrlocks
.count(lock
));
4665 lock
->get_wrlock(mut
->get_client());
4666 mut
->wrlocks
.insert(lock
);
4667 mut
->locks
.insert(lock
);
4670 lock
->add_waiter(SimpleLock::WAIT_WR
|SimpleLock::WAIT_STABLE
, new C_MDS_RetryRequest(mdcache
, mut
));
// Finish a local write lock held by mutation mut.
// The lock is erased from mut->wrlocks and mut->locks. Once
// lock->get_num_wrlocks() reports zero, waiters on WAIT_STABLE, WAIT_WR and
// WAIT_RD are finished.
4675 void Locker::local_wrlock_finish(LocalLock
*lock
, MutationImpl
*mut
)
4677 dout(7) << "local_wrlock_finish on " << *lock
4678 << " on " << *lock
->get_parent() << dendl
;
4680 mut
->wrlocks
.erase(lock
);
4681 mut
->locks
.erase(lock
);
4682 if (lock
->get_num_wrlocks() == 0) {
4683 lock
->finish_waiters(SimpleLock::WAIT_STABLE
|
4684 SimpleLock::WAIT_WR
|
4685 SimpleLock::WAIT_RD
);
// Try to take an exclusive lock on a LocalLock for request mut.
// The parent is asserted to be auth. When lock->can_xlock_local() is false,
// a C_MDS_RetryRequest for mut is registered on WAIT_WR|WAIT_STABLE.
// The grab itself calls lock->get_xlock(mut, mut->get_client()) and records
// the lock in mut->xlocks and mut->locks.
// NOTE(review): presumably returns false on the waiting path and true after
// the grab; confirm.
4689 bool Locker::local_xlock_start(LocalLock
*lock
, MDRequestRef
& mut
)
4691 dout(7) << "local_xlock_start on " << *lock
4692 << " on " << *lock
->get_parent() << dendl
;
4694 assert(lock
->get_parent()->is_auth());
4695 if (!lock
->can_xlock_local()) {
4696 lock
->add_waiter(SimpleLock::WAIT_WR
|SimpleLock::WAIT_STABLE
, new C_MDS_RetryRequest(mdcache
, mut
));
4700 lock
->get_xlock(mut
, mut
->get_client());
4701 mut
->xlocks
.insert(lock
);
4702 mut
->locks
.insert(lock
);
// Finish a local exclusive lock held by mutation mut.
// The lock is erased from mut->xlocks and mut->locks, and waiters on
// WAIT_STABLE, WAIT_WR and WAIT_RD are finished.
4706 void Locker::local_xlock_finish(LocalLock
*lock
, MutationImpl
*mut
)
4708 dout(7) << "local_xlock_finish on " << *lock
4709 << " on " << *lock
->get_parent() << dendl
;
4711 mut
->xlocks
.erase(lock
);
4712 mut
->locks
.erase(lock
);
4714 lock
->finish_waiters(SimpleLock::WAIT_STABLE
|
4715 SimpleLock::WAIT_WR
|
4716 SimpleLock::WAIT_RD
);
4721 // ==========================================================================
// Evaluate a stable file lock on the auth MDS and choose the next state
// from the caps clients want and hold.
//   lock:       file scatterlock; asserted to be stable with an auth parent.
//   need_issue: passed through unchanged to the transition helpers.
// Wanted caps are read with in->get_caps_wanted(..., CEPH_CAP_SFILE), split
// into loner and other. The branches below lead to scatter_mix(),
// simple_sync() or file_excl().
// NOTE(review): presumably a freezing or frozen parent ends the evaluation
// right after the asserts; confirm.
4725 void Locker::file_eval(ScatterLock
*lock
, bool *need_issue
)
4727 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4728 int loner_wanted
, other_wanted
;
4729 int wanted
= in
->get_caps_wanted(&loner_wanted
, &other_wanted
, CEPH_CAP_SFILE
);
4730 dout(7) << "file_eval wanted=" << gcap_string(wanted
)
4731 << " loner_wanted=" << gcap_string(loner_wanted
)
4732 << " other_wanted=" << gcap_string(other_wanted
)
4733 << " filelock=" << *lock
<< " on " << *lock
->get_parent()
4736 assert(lock
->get_parent()->is_auth());
4737 assert(lock
->is_stable());
4739 if (lock
->get_parent()->is_freezing_or_frozen())
// Read-only filesystem: a lock that is not already LOCK_SYNC is handed to
// simple_sync().
4742 if (mdcache
->is_readonly()) {
4743 if (lock
->get_state() != LOCK_SYNC
) {
4744 dout(10) << "file_eval read-only FS, syncing " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4745 simple_sync(lock
, need_issue
);
// Currently LOCK_EXCL: use wanted and issued caps to decide whether the
// loner should lose exclusivity.
4751 if (lock
->get_state() == LOCK_EXCL
) {
4752 dout(20) << " is excl" << dendl
;
4753 int loner_issued
, other_issued
, xlocker_issued
;
4754 in
->get_caps_issued(&loner_issued
, &other_issued
, &xlocker_issued
, CEPH_CAP_SFILE
);
4755 dout(7) << "file_eval loner_issued=" << gcap_string(loner_issued
)
4756 << " other_issued=" << gcap_string(other_issued
)
4757 << " xlocker_issued=" << gcap_string(xlocker_issued
)
4759 if (!((loner_wanted
|loner_issued
) & (CEPH_CAP_GEXCL
|CEPH_CAP_GWR
|CEPH_CAP_GBUFFER
)) ||
4760 (other_wanted
& (CEPH_CAP_GEXCL
|CEPH_CAP_GWR
|CEPH_CAP_GRD
)) ||
4761 (in
->inode
.is_dir() && in
->multiple_nonstale_caps())) { // FIXME.. :/
4762 dout(20) << " should lose it" << dendl
;
4763 // we should lose it.
4774 // -> any writer means MIX; RD doesn't matter.
4775 if (((other_wanted
|loner_wanted
) & CEPH_CAP_GWR
) ||
4776 lock
->is_waiter_for(SimpleLock::WAIT_WR
))
4777 scatter_mix(lock
, need_issue
);
4778 else if (!lock
->is_wrlocked()) // let excl wrlocks drain first
4779 simple_sync(lock
, need_issue
);
4781 dout(10) << " waiting for wrlock to drain" << dendl
;
// Candidate for file_excl(): not LOCK_EXCL, no rdlocks, write or buffer
// caps wanted (or a directory without a subtree or exporting dirfrag), and
// in->get_target_loner() is non-negative.
4786 else if (lock
->get_state() != LOCK_EXCL
&&
4787 !lock
->is_rdlocked() &&
4788 //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
4789 ((wanted
& (CEPH_CAP_GWR
|CEPH_CAP_GBUFFER
)) ||
4790 (in
->inode
.is_dir() && !in
->has_subtree_or_exporting_dirfrag())) &&
4791 in
->get_target_loner() >= 0) {
4792 dout(7) << "file_eval stable, bump to loner " << *lock
4793 << " on " << *lock
->get_parent() << dendl
;
4794 file_excl(lock
, need_issue
);
// Candidate for scatter_mix(): not LOCK_MIX, no rdlocks, and either a
// scatter was requested or write caps are wanted while
// in->get_wanted_loner() is negative.
4798 else if (lock
->get_state() != LOCK_MIX
&&
4799 !lock
->is_rdlocked() &&
4800 //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
4801 (lock
->get_scatter_wanted() ||
4802 (in
->get_wanted_loner() < 0 && (wanted
& CEPH_CAP_GWR
)))) {
4803 dout(7) << "file_eval stable, bump to mixed " << *lock
4804 << " on " << *lock
->get_parent() << dendl
;
4805 scatter_mix(lock
, need_issue
);
// Candidate for simple_sync(): not LOCK_SYNC, no wrlocks, no WAIT_WR
// waiters, no write or buffer caps wanted, and not a LOCK_MIX directory
// that has a subtree or exporting dirfrag.
4809 else if (lock
->get_state() != LOCK_SYNC
&&
4810 !lock
->is_wrlocked() && // drain wrlocks first!
4811 !lock
->is_waiter_for(SimpleLock::WAIT_WR
) &&
4812 !(wanted
& (CEPH_CAP_GWR
|CEPH_CAP_GBUFFER
)) &&
4813 !((lock
->get_state() == LOCK_MIX
) &&
4814 in
->is_dir() && in
->has_subtree_or_exporting_dirfrag()) // if we are a delegation point, stay where we are
4815 //((wanted & CEPH_CAP_RD) ||
4816 //in->is_replicated() ||
4817 //lock->get_num_client_lease() ||
4818 //(!loner && lock->get_state() == LOCK_EXCL)) &&
4820 dout(7) << "file_eval stable, bump to sync " << *lock
4821 << " on " << *lock
->get_parent() << dendl
;
4822 simple_sync(lock
, need_issue
);
// Move a stable scatterlock on the auth MDS toward LOCK_MIX.
//   lock:       asserted to be stable; its parent CInode is asserted auth.
//   need_issue: forwarded to file_excl() in the visible code.
// From LOCK_LOCK the move is direct: in->start_scatter(lock), a LOCK_AC_MIX
// message carrying the encoded lock state to replicas (when replicated),
// then LOCK_MIX with scatter_wanted cleared.
// From other states the lock first enters an intermediate state
// (LOCK_SYNC_MIX, LOCK_EXCL_MIX, LOCK_TSYN_MIX; unknown states abort).
// The code then checks rdlocks, replicas (LOCK_AC_MIX plus init_gather),
// client leases, issued caps and pending file recovery. The closing
// sequence repeats the start_scatter / LOCK_MIX / LOCK_AC_MIX steps.
// NOTE(review): presumably the closing sequence runs only when nothing had
// to be gathered, with an auth_pin taken otherwise; confirm.
4828 void Locker::scatter_mix(ScatterLock
*lock
, bool *need_issue
)
4830 dout(7) << "scatter_mix " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4832 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4833 assert(in
->is_auth());
4834 assert(lock
->is_stable());
// Direct case: the lock is already LOCK_LOCK.
4836 if (lock
->get_state() == LOCK_LOCK
) {
4837 in
->start_scatter(lock
);
4838 if (in
->is_replicated()) {
4840 bufferlist softdata
;
4841 lock
->encode_locked_state(softdata
);
4843 // bcast to replicas
4844 send_lock_message(lock
, LOCK_AC_MIX
, softdata
);
4848 lock
->set_state(LOCK_MIX
);
4849 lock
->clear_scatter_wanted();
4850 if (lock
->get_cap_shift()) {
// Other stable states first enter an intermediate state on the way to
// LOCK_MIX.
4858 switch (lock
->get_state()) {
4859 case LOCK_SYNC
: lock
->set_state(LOCK_SYNC_MIX
); break;
4861 file_excl(lock
, need_issue
);
4862 if (lock
->get_state() != LOCK_EXCL
)
4865 case LOCK_EXCL
: lock
->set_state(LOCK_EXCL_MIX
); break;
4866 case LOCK_TSYN
: lock
->set_state(LOCK_TSYN_MIX
); break;
4867 default: ceph_abort();
4871 if (lock
->is_rdlocked())
4873 if (in
->is_replicated()) {
4874 if (lock
->get_state() != LOCK_EXCL_MIX
&& // EXCL replica is already LOCK
4875 lock
->get_state() != LOCK_XSYN_EXCL
) { // XSYN replica is already LOCK; ** FIXME here too!
4876 send_lock_message(lock
, LOCK_AC_MIX
);
4877 lock
->init_gather();
4881 if (lock
->is_leased()) {
4882 revoke_client_leases(lock
);
4885 if (lock
->get_cap_shift() &&
4887 in
->issued_caps_need_gather(lock
)) {
4894 bool need_recover
= false;
4895 if (in
->state_test(CInode::STATE_NEEDSRECOVER
)) {
4896 mds
->mdcache
->queue_file_recover(in
);
4897 need_recover
= true;
4902 lock
->get_parent()->auth_pin(lock
);
4904 mds
->mdcache
->do_file_recover();
4906 in
->start_scatter(lock
);
4907 lock
->set_state(LOCK_MIX
);
4908 lock
->clear_scatter_wanted();
4909 if (in
->is_replicated()) {
4910 bufferlist softdata
;
4911 lock
->encode_locked_state(softdata
);
4912 send_lock_message(lock
, LOCK_AC_MIX
, softdata
);
4914 if (lock
->get_cap_shift()) {
// Move a stable file lock on the auth MDS toward LOCK_EXCL.
//   lock:       asserted to be stable; its parent CInode is asserted auth.
//   need_issue: not referenced on the lines shown here.
// Asserted precondition: either the inode has a loner (get_loner() >= 0)
// and mds_caps_wanted is empty, or the lock is currently LOCK_XSYN.
// The lock enters the matching intermediate state (LOCK_SYNC_EXCL,
// LOCK_MIX_EXCL, LOCK_LOCK_EXCL, LOCK_XSYN_EXCL; other states abort).
// The code then checks rdlocks, wrlocks, replicas (LOCK_AC_LOCK plus
// init_gather, skipped for LOCK_LOCK_EXCL and LOCK_XSYN_EXCL), client
// leases, issued caps on a head inode and pending file recovery, and
// finally sets LOCK_EXCL.
// NOTE(review): presumably LOCK_EXCL is set immediately only when nothing
// had to be gathered, with an auth_pin taken otherwise; confirm.
4925 void Locker::file_excl(ScatterLock
*lock
, bool *need_issue
)
4927 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4928 dout(7) << "file_excl " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4930 assert(in
->is_auth());
4931 assert(lock
->is_stable());
4933 assert((in
->get_loner() >= 0 && in
->mds_caps_wanted
.empty()) ||
4934 (lock
->get_state() == LOCK_XSYN
)); // must do xsyn -> excl -> <anything else>
4936 switch (lock
->get_state()) {
4937 case LOCK_SYNC
: lock
->set_state(LOCK_SYNC_EXCL
); break;
4938 case LOCK_MIX
: lock
->set_state(LOCK_MIX_EXCL
); break;
4939 case LOCK_LOCK
: lock
->set_state(LOCK_LOCK_EXCL
); break;
4940 case LOCK_XSYN
: lock
->set_state(LOCK_XSYN_EXCL
); break;
4941 default: ceph_abort();
4945 if (lock
->is_rdlocked())
4947 if (lock
->is_wrlocked())
4950 if (in
->is_replicated() &&
4951 lock
->get_state() != LOCK_LOCK_EXCL
&&
4952 lock
->get_state() != LOCK_XSYN_EXCL
) { // if we were lock, replicas are already lock.
4953 send_lock_message(lock
, LOCK_AC_LOCK
);
4954 lock
->init_gather();
4957 if (lock
->is_leased()) {
4958 revoke_client_leases(lock
);
4961 if (in
->is_head() &&
4962 in
->issued_caps_need_gather(lock
)) {
4969 bool need_recover
= false;
4970 if (in
->state_test(CInode::STATE_NEEDSRECOVER
)) {
4971 mds
->mdcache
->queue_file_recover(in
);
4972 need_recover
= true;
4977 lock
->get_parent()->auth_pin(lock
);
4979 mds
->mdcache
->do_file_recover();
4981 lock
->set_state(LOCK_EXCL
);
// Move a file lock on the auth MDS from LOCK_EXCL toward LOCK_XSYN.
//   lock:       must currently be LOCK_EXCL; any other state aborts.
//   need_issue: not referenced on the lines shown here.
// Asserted preconditions: the parent CInode is auth, has a loner
// (get_loner() >= 0) and mds_caps_wanted is empty.
// The lock enters LOCK_EXCL_XSYN. The code then checks wrlocks and issued
// caps on a head inode, and finally sets LOCK_XSYN and finishes WAIT_RD
// and WAIT_STABLE waiters.
// NOTE(review): presumably LOCK_XSYN is set immediately only when nothing
// had to be gathered, with an auth_pin taken otherwise; confirm.
4989 void Locker::file_xsyn(SimpleLock
*lock
, bool *need_issue
)
4991 dout(7) << "file_xsyn on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4992 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4993 assert(in
->is_auth());
4994 assert(in
->get_loner() >= 0 && in
->mds_caps_wanted
.empty());
4996 switch (lock
->get_state()) {
4997 case LOCK_EXCL
: lock
->set_state(LOCK_EXCL_XSYN
); break;
4998 default: ceph_abort();
5002 if (lock
->is_wrlocked())
5005 if (in
->is_head() &&
5006 in
->issued_caps_need_gather(lock
)) {
5015 lock
->get_parent()->auth_pin(lock
);
5017 lock
->set_state(LOCK_XSYN
);
5018 lock
->finish_waiters(SimpleLock::WAIT_RD
|SimpleLock::WAIT_STABLE
);
// Advance a file lock from LOCK_PRE_SCAN to LOCK_SCAN for file recovery.
//   lock: asserted to be in LOCK_PRE_SCAN; its parent CInode is asserted
//         auth. Stability is deliberately not asserted (that assert is
//         commented out).
// The code checks issued caps on a head inode, sets LOCK_SCAN, and then
// either flags the inode with CInode::STATE_NEEDSRECOVER or queues it with
// mds->mdcache->queue_file_recover(in).
// NOTE(review): the replicated-inode snippet (send_lock_message with
// LOCK_AC_LOCK) has no operator between its two conditions and refers to
// oldstate, which is not declared here, so it is presumably disabled code;
// confirm.
// NOTE(review): presumably STATE_NEEDSRECOVER is set when caps still need
// gathering and the inode is queued directly otherwise; confirm.
5026 void Locker::file_recover(ScatterLock
*lock
)
5028 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
5029 dout(7) << "file_recover " << *lock
<< " on " << *in
<< dendl
;
5031 assert(in
->is_auth());
5032 //assert(lock->is_stable());
5033 assert(lock
->get_state() == LOCK_PRE_SCAN
); // only called from MDCache::start_files_to_recover()
5038 if (in->is_replicated()
5039 lock->get_sm()->states[oldstate].replica_state != LOCK_LOCK) {
5040 send_lock_message(lock, LOCK_AC_LOCK);
5041 lock->init_gather();
5045 if (in
->is_head() &&
5046 in
->issued_caps_need_gather(lock
)) {
5051 lock
->set_state(LOCK_SCAN
);
5053 in
->state_set(CInode::STATE_NEEDSRECOVER
);
5055 mds
->mdcache
->queue_file_recover(in
);
5060 /* This function DOES put the passed message before returning */
5061 void Locker::handle_file_lock(ScatterLock
*lock
, MLock
*m
)
5063 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
5064 int from
= m
->get_asker();
5066 if (mds
->is_rejoin()) {
5067 if (in
->is_rejoining()) {
5068 dout(7) << "handle_file_lock still rejoining " << *in
5069 << ", dropping " << *m
<< dendl
;
5075 dout(7) << "handle_file_lock a=" << get_lock_action_name(m
->get_action())
5077 << " from mds." << from
<< " "
5080 bool caps
= lock
->get_cap_shift();
5082 switch (m
->get_action()) {
5085 assert(lock
->get_state() == LOCK_LOCK
||
5086 lock
->get_state() == LOCK_MIX
||
5087 lock
->get_state() == LOCK_MIX_SYNC2
);
5089 if (lock
->get_state() == LOCK_MIX
) {
5090 lock
->set_state(LOCK_MIX_SYNC
);
5091 eval_gather(lock
, true);
5092 if (lock
->is_unstable_and_locked())
5093 mds
->mdlog
->flush();
5097 (static_cast<ScatterLock
*>(lock
))->finish_flush();
5098 (static_cast<ScatterLock
*>(lock
))->clear_flushed();
5101 lock
->decode_locked_state(m
->get_data());
5102 lock
->set_state(LOCK_SYNC
);
5107 lock
->finish_waiters(SimpleLock::WAIT_RD
|SimpleLock::WAIT_STABLE
);
5112 switch (lock
->get_state()) {
5113 case LOCK_SYNC
: lock
->set_state(LOCK_SYNC_LOCK
); break;
5114 case LOCK_MIX
: lock
->set_state(LOCK_MIX_LOCK
); break;
5115 default: ceph_abort();
5118 eval_gather(lock
, true);
5119 if (lock
->is_unstable_and_locked())
5120 mds
->mdlog
->flush();
5124 case LOCK_AC_LOCKFLUSHED
:
5125 (static_cast<ScatterLock
*>(lock
))->finish_flush();
5126 (static_cast<ScatterLock
*>(lock
))->clear_flushed();
5127 // wake up scatter_nudge waiters
5128 if (lock
->is_stable())
5129 lock
->finish_waiters(SimpleLock::WAIT_STABLE
);
5133 assert(lock
->get_state() == LOCK_SYNC
||
5134 lock
->get_state() == LOCK_LOCK
||
5135 lock
->get_state() == LOCK_SYNC_MIX2
);
5137 if (lock
->get_state() == LOCK_SYNC
) {
5139 lock
->set_state(LOCK_SYNC_MIX
);
5140 eval_gather(lock
, true);
5141 if (lock
->is_unstable_and_locked())
5142 mds
->mdlog
->flush();
5147 lock
->decode_locked_state(m
->get_data());
5148 lock
->set_state(LOCK_MIX
);
5153 lock
->finish_waiters(SimpleLock::WAIT_WR
|SimpleLock::WAIT_STABLE
);
5158 case LOCK_AC_LOCKACK
:
5159 assert(lock
->get_state() == LOCK_SYNC_LOCK
||
5160 lock
->get_state() == LOCK_MIX_LOCK
||
5161 lock
->get_state() == LOCK_MIX_LOCK2
||
5162 lock
->get_state() == LOCK_MIX_EXCL
||
5163 lock
->get_state() == LOCK_SYNC_EXCL
||
5164 lock
->get_state() == LOCK_SYNC_MIX
||
5165 lock
->get_state() == LOCK_MIX_TSYN
);
5166 assert(lock
->is_gathering(from
));
5167 lock
->remove_gather(from
);
5169 if (lock
->get_state() == LOCK_MIX_LOCK
||
5170 lock
->get_state() == LOCK_MIX_LOCK2
||
5171 lock
->get_state() == LOCK_MIX_EXCL
||
5172 lock
->get_state() == LOCK_MIX_TSYN
) {
5173 lock
->decode_locked_state(m
->get_data());
5174 // replica is waiting for AC_LOCKFLUSHED, eval_gather() should not
5175 // delay calling scatter_writebehind().
5176 lock
->clear_flushed();
5179 if (lock
->is_gathering()) {
5180 dout(7) << "handle_file_lock " << *in
<< " from " << from
5181 << ", still gathering " << lock
->get_gather_set() << dendl
;
5183 dout(7) << "handle_file_lock " << *in
<< " from " << from
5184 << ", last one" << dendl
;
5189 case LOCK_AC_SYNCACK
:
5190 assert(lock
->get_state() == LOCK_MIX_SYNC
);
5191 assert(lock
->is_gathering(from
));
5192 lock
->remove_gather(from
);
5194 lock
->decode_locked_state(m
->get_data());
5196 if (lock
->is_gathering()) {
5197 dout(7) << "handle_file_lock " << *in
<< " from " << from
5198 << ", still gathering " << lock
->get_gather_set() << dendl
;
5200 dout(7) << "handle_file_lock " << *in
<< " from " << from
5201 << ", last one" << dendl
;
5206 case LOCK_AC_MIXACK
:
5207 assert(lock
->get_state() == LOCK_SYNC_MIX
);
5208 assert(lock
->is_gathering(from
));
5209 lock
->remove_gather(from
);
5211 if (lock
->is_gathering()) {
5212 dout(7) << "handle_file_lock " << *in
<< " from " << from
5213 << ", still gathering " << lock
->get_gather_set() << dendl
;
5215 dout(7) << "handle_file_lock " << *in
<< " from " << from
5216 << ", last one" << dendl
;
5223 case LOCK_AC_REQSCATTER
:
5224 if (lock
->is_stable()) {
5225 /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
5226 * because the replica should be holding an auth_pin if they're
5227 * doing this (and thus, we are freezing, not frozen, and indefinite
5228 * starvation isn't an issue).
5230 dout(7) << "handle_file_lock got scatter request on " << *lock
5231 << " on " << *lock
->get_parent() << dendl
;
5232 if (lock
->get_state() != LOCK_MIX
) // i.e., the reqscatter didn't race with an actual mix/scatter
5235 dout(7) << "handle_file_lock got scatter request, !stable, marking scatter_wanted on " << *lock
5236 << " on " << *lock
->get_parent() << dendl
;
5237 lock
->set_scatter_wanted();
5241 case LOCK_AC_REQUNSCATTER
:
5242 if (lock
->is_stable()) {
5243 /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
5244 * because the replica should be holding an auth_pin if they're
5245 * doing this (and thus, we are freezing, not frozen, and indefinite
5246 * starvation isn't an issue).
5248 dout(7) << "handle_file_lock got unscatter request on " << *lock
5249 << " on " << *lock
->get_parent() << dendl
;
5250 if (lock
->get_state() == LOCK_MIX
) // i.e., the reqscatter didn't race with an actual mix/scatter
5251 simple_lock(lock
); // FIXME tempsync?
5253 dout(7) << "handle_file_lock ignoring unscatter request on " << *lock
5254 << " on " << *lock
->get_parent() << dendl
;
5255 lock
->set_unscatter_wanted();
5259 case LOCK_AC_REQRDLOCK
:
5260 handle_reqrdlock(lock
, m
);
5264 if (!lock
->get_parent()->is_auth()) {
5265 dout(7) << "handle_file_lock IGNORING nudge on non-auth " << *lock
5266 << " on " << *lock
->get_parent() << dendl
;
5267 } else if (!lock
->get_parent()->is_replicated()) {
5268 dout(7) << "handle_file_lock IGNORING nudge on non-replicated " << *lock
5269 << " on " << *lock
->get_parent() << dendl
;
5271 dout(7) << "handle_file_lock trying nudge on " << *lock
5272 << " on " << *lock
->get_parent() << dendl
;
5273 scatter_nudge(lock
, 0, true);
5274 mds
->mdlog
->flush();