1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
23 #include "MDSContext.h"
28 #include "events/EUpdate.h"
29 #include "events/EOpen.h"
31 #include "msg/Messenger.h"
32 #include "osdc/Objecter.h"
34 #include "messages/MInodeFileCaps.h"
35 #include "messages/MLock.h"
36 #include "messages/MClientLease.h"
37 #include "messages/MClientReply.h"
38 #include "messages/MClientCaps.h"
39 #include "messages/MClientCapRelease.h"
41 #include "messages/MMDSSlaveRequest.h"
45 #include "common/config.h"
48 #define dout_subsys ceph_subsys_mds
50 #define dout_context g_ceph_context
51 #define dout_prefix _prefix(_dout, mds)
// Logging prefix helper for this translation unit: every dout/derr line
// emitted via dout_prefix is tagged "mds.<rank>.locker " so Locker output
// is attributable in the MDS log.
52 static ostream
& _prefix(std::ostream
*_dout
, MDSRank
*mds
) {
53 return *_dout
<< "mds." << mds
->get_nodeid() << ".locker ";
// Internal completion context bound to a Locker; exposes the Locker's
// MDSRank to the MDSInternalContextBase machinery via get_mds().
57 class LockerContext
: public MDSInternalContextBase
{
60 MDSRank
*get_mds() override
// Constructor asserts a non-null Locker so get_mds() is always safe.
66 explicit LockerContext(Locker
*locker_
) : locker(locker_
) {
67 assert(locker
!= NULL
);
// Journal-completion variant of LockerContext: same Locker binding, but
// derived from MDSLogContextBase so it runs after a log flush.
71 class LockerLogContext
: public MDSLogContextBase
{
74 MDSRank
*get_mds() override
// Constructor asserts a non-null Locker so get_mds() is always safe.
80 explicit LockerLogContext(Locker
*locker_
) : locker(locker_
) {
81 assert(locker
!= NULL
);
85 /* This function DOES put the passed message before returning */
// Top-level message router for the Locker subsystem: each recognized
// message type is cast to its concrete type and handed to the matching
// handler; an unrecognized type is treated as a fatal bug (assert).
86 void Locker::dispatch(Message
*m
)
89 switch (m
->get_type()) {
// inter-MDS lock traffic
93 handle_lock(static_cast<MLock
*>(m
));
96 case MSG_MDS_INODEFILECAPS
:
97 handle_inode_file_caps(static_cast<MInodeFileCaps
*>(m
));
// client cap and lease traffic
101 case CEPH_MSG_CLIENT_CAPS
:
102 handle_client_caps(static_cast<MClientCaps
*>(m
));
105 case CEPH_MSG_CLIENT_CAPRELEASE
:
106 handle_client_cap_release(static_cast<MClientCapRelease
*>(m
));
108 case CEPH_MSG_CLIENT_LEASE
:
109 handle_client_lease(static_cast<MClientLease
*>(m
));
113 derr
<< "locker unknown message " << m
->get_type() << dendl
;
114 assert(0 == "locker unknown message");
// Broadcast an MLock carrying action 'msg' for 'lock' to every replica of
// the lock's parent object. While the cluster is degraded, replicas on
// MDS ranks that have not reached REJOIN are skipped — they cannot
// process lock traffic yet and will resync state during rejoin.
131 void Locker::send_lock_message(SimpleLock
*lock
, int msg
)
133 for (compact_map
<mds_rank_t
,unsigned>::iterator it
= lock
->get_parent()->replicas_begin();
134 it
!= lock
->get_parent()->replicas_end();
136 if (mds
->is_cluster_degraded() &&
137 mds
->mdsmap
->get_state(it
->first
) < MDSMap::STATE_REJOIN
)
// One message per replica; send_message_mds takes ownership of m.
139 MLock
*m
= new MLock(lock
, msg
, mds
->get_nodeid());
140 mds
->send_message_mds(m
, it
->first
);
// Overload of the above that also attaches a data payload (e.g. encoded
// lock state) to each MLock. Same degraded-cluster replica filtering.
// NOTE(review): the line that sets 'data' on the message is not visible
// in this extraction — presumably m->set_data(data) between construction
// and send; confirm against the full source.
144 void Locker::send_lock_message(SimpleLock
*lock
, int msg
, const bufferlist
&data
)
146 for (compact_map
<mds_rank_t
,unsigned>::iterator it
= lock
->get_parent()->replicas_begin();
147 it
!= lock
->get_parent()->replicas_end();
149 if (mds
->is_cluster_degraded() &&
150 mds
->mdsmap
->get_state(it
->first
) < MDSMap::STATE_REJOIN
)
152 MLock
*m
= new MLock(lock
, msg
, mds
->get_nodeid());
154 mds
->send_message_mds(m
, it
->first
);
// Add the snaplock of 'in' and of every projected ancestor inode to
// 'rdlocks', so the snapshot context cannot change under the operation.
// NOTE(review): 't' is declared on a line not shown in this extraction —
// presumably initialized to 'in' before the walk; confirm.
161 void Locker::include_snap_rdlocks(set
<SimpleLock
*>& rdlocks
, CInode
*in
)
163 // rdlock ancestor snaps
165 rdlocks
.insert(&in
->snaplock
);
// walk up the projected parent chain to the root
166 while (t
->get_projected_parent_dn()) {
167 t
= t
->get_projected_parent_dn()->get_dir()->get_inode();
168 rdlocks
.insert(&t
->snaplock
);
// Like include_snap_rdlocks, but also rdlocks each policylock on the way
// up and reports (via *layout) the first projected inode that carries an
// explicit file layout. NOTE(review): 't' and 'found_layout' usage spans
// lines not visible in this extraction — presumably *layout is only set
// once (first/nearest layout wins); confirm against the full source.
172 void Locker::include_snap_rdlocks_wlayout(set
<SimpleLock
*>& rdlocks
, CInode
*in
,
173 file_layout_t
**layout
)
175 //rdlock ancestor snaps
177 rdlocks
.insert(&in
->snaplock
);
178 rdlocks
.insert(&in
->policylock
);
179 bool found_layout
= false;
181 rdlocks
.insert(&t
->snaplock
);
183 rdlocks
.insert(&t
->policylock
);
184 if (t
->get_projected_inode()->has_layout()) {
185 *layout
= &t
->get_projected_inode()->layout
;
// climb to the projected parent inode, if any
189 if (t
->get_projected_parent_dn() &&
190 t
->get_projected_parent_dn()->get_dir())
191 t
= t
->get_projected_parent_dn()->get_dir()->get_inode();
// RAII helper: on destruction, records 'message' as an event on the mdr.
// acquire_locks() updates marker.message as it progresses, so whatever
// string is current when the function exits describes how it ended.
196 struct MarkEventOnDestruct
{
200 MarkEventOnDestruct(MDRequestRef
& _mdr
,
201 const char *_message
) : mdr(_mdr
),
204 ~MarkEventOnDestruct() {
206 mdr
->mark_event(message
);
210 /* If this function returns false, the mdr has been placed
211 * on the appropriate wait list */
// Acquire, in canonical sort order, all rdlocks/wrlocks/xlocks (and
// optionally remote wrlocks) needed by 'mdr'. Steps visible below:
//  1. augment xlocks with version locks (wrlock on master, xlock on slave);
//  2. compute the set of objects that must be auth-pinned, locally and
//     on remote MDSs;
//  3. take local auth pins, request remote ones (waiting if needed);
//  4. walk the sorted lock set, reusing locks already held by the mdr,
//     releasing stray/out-of-order ones, and starting each missing lock
//     (returning false whenever we must wait);
//  5. drop any leftover locks, stamp the mdr, and issue caps.
212 bool Locker::acquire_locks(MDRequestRef
& mdr
,
213 set
<SimpleLock
*> &rdlocks
,
214 set
<SimpleLock
*> &wrlocks
,
215 set
<SimpleLock
*> &xlocks
,
216 map
<SimpleLock
*,mds_rank_t
> *remote_wrlocks
,
217 CInode
*auth_pin_freeze
,
218 bool auth_pin_nonblock
)
220 if (mdr
->done_locking
&&
221 !mdr
->is_slave()) { // not on slaves! master requests locks piecemeal.
222 dout(10) << "acquire_locks " << *mdr
<< " - done locking" << dendl
;
223 return true; // at least we had better be!
225 dout(10) << "acquire_locks " << *mdr
<< dendl
;
// marker.message is recorded on the mdr when this function returns.
227 MarkEventOnDestruct
marker(mdr
, "failed to acquire_locks");
229 client_t client
= mdr
->get_client();
231 set
<SimpleLock
*, SimpleLock::ptr_lt
> sorted
; // sort everything we will lock
232 set
<MDSCacheObject
*> mustpin
; // items to authpin
// Pass over xlocks: pin their parents and add companion version locks.
235 for (set
<SimpleLock
*>::iterator p
= xlocks
.begin(); p
!= xlocks
.end(); ++p
) {
236 dout(20) << " must xlock " << **p
<< " " << *(*p
)->get_parent() << dendl
;
238 mustpin
.insert((*p
)->get_parent());
240 // augment xlock with a versionlock?
241 if ((*p
)->get_type() == CEPH_LOCK_DN
) {
242 CDentry
*dn
= (CDentry
*)(*p
)->get_parent();
246 if (xlocks
.count(&dn
->versionlock
))
247 continue; // we're xlocking the versionlock too; don't wrlock it!
249 if (mdr
->is_master()) {
250 // master. wrlock versionlock so we can pipeline dentry updates to journal.
251 wrlocks
.insert(&dn
->versionlock
);
253 // slave. exclusively lock the dentry version (i.e. block other journal updates).
254 // this makes rollback safe.
255 xlocks
.insert(&dn
->versionlock
);
256 sorted
.insert(&dn
->versionlock
);
259 if ((*p
)->get_type() > CEPH_LOCK_IVERSION
) {
260 // inode version lock?
261 CInode
*in
= (CInode
*)(*p
)->get_parent();
264 if (mdr
->is_master()) {
265 // master. wrlock versionlock so we can pipeline inode updates to journal.
266 wrlocks
.insert(&in
->versionlock
);
268 // slave. exclusively lock the inode version (i.e. block other journal updates).
269 // this makes rollback safe.
270 xlocks
.insert(&in
->versionlock
);
271 sorted
.insert(&in
->versionlock
);
// Pass over wrlocks: pin auth parents; pin non-auth too when we may have
// to request a scatter from the auth.
277 for (set
<SimpleLock
*>::iterator p
= wrlocks
.begin(); p
!= wrlocks
.end(); ++p
) {
278 MDSCacheObject
*object
= (*p
)->get_parent();
279 dout(20) << " must wrlock " << **p
<< " " << *object
<< dendl
;
281 if (object
->is_auth())
282 mustpin
.insert(object
);
283 else if (!object
->is_auth() &&
284 !(*p
)->can_wrlock(client
) && // we might have to request a scatter
285 !mdr
->is_slave()) { // if we are slave (remote_wrlock), the master already authpinned
286 dout(15) << " will also auth_pin " << *object
287 << " in case we need to request a scatter" << dendl
;
288 mustpin
.insert(object
);
// Remote wrlocks: their locks go into the sorted set and their parents
// must be pinned on the remote auth.
293 if (remote_wrlocks
) {
294 for (map
<SimpleLock
*,mds_rank_t
>::iterator p
= remote_wrlocks
->begin(); p
!= remote_wrlocks
->end(); ++p
) {
295 MDSCacheObject
*object
= p
->first
->get_parent();
296 dout(20) << " must remote_wrlock on mds." << p
->second
<< " "
297 << *p
->first
<< " " << *object
<< dendl
;
298 sorted
.insert(p
->first
);
299 mustpin
.insert(object
);
// Pass over rdlocks: pin auth parents; pin non-auth when we may have to
// request an rdlock from the auth.
304 for (set
<SimpleLock
*>::iterator p
= rdlocks
.begin();
307 MDSCacheObject
*object
= (*p
)->get_parent();
308 dout(20) << " must rdlock " << **p
<< " " << *object
<< dendl
;
310 if (object
->is_auth())
311 mustpin
.insert(object
);
312 else if (!object
->is_auth() &&
313 !(*p
)->can_rdlock(client
)) { // we might have to request an rdlock
314 dout(15) << " will also auth_pin " << *object
315 << " in case we need to request a rdlock" << dendl
;
316 mustpin
.insert(object
);
322 map
<mds_rank_t
, set
<MDSCacheObject
*> > mustpin_remote
; // mds -> (object set)
324 // can i auth pin them all now?
325 marker
.message
= "failed to authpin local pins";
326 for (set
<MDSCacheObject
*>::iterator p
= mustpin
.begin();
329 MDSCacheObject
*object
= *p
;
331 dout(10) << " must authpin " << *object
<< dendl
;
333 if (mdr
->is_auth_pinned(object
)) {
334 if (object
!= (MDSCacheObject
*)auth_pin_freeze
)
336 if (mdr
->more()->is_remote_frozen_authpin
) {
337 if (mdr
->more()->rename_inode
== auth_pin_freeze
)
339 // unfreeze auth pin for the wrong inode
340 mustpin_remote
[mdr
->more()->rename_inode
->authority().first
].size();
344 if (!object
->is_auth()) {
345 if (!mdr
->locks
.empty())
346 drop_locks(mdr
.get());
347 if (object
->is_ambiguous_auth()) {
349 dout(10) << " ambiguous auth, waiting to authpin " << *object
<< dendl
;
350 object
->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
, new C_MDS_RetryRequest(mdcache
, mdr
));
351 mdr
->drop_local_auth_pins();
354 mustpin_remote
[object
->authority().first
].insert(object
);
357 if (!object
->can_auth_pin()) {
// Object is freezing/frozen: back out all locks and pins, then either
// fail fast (nonblock) or wait for the unfreeze.
359 drop_locks(mdr
.get());
360 mdr
->drop_local_auth_pins();
361 if (auth_pin_nonblock
) {
362 dout(10) << " can't auth_pin (freezing?) " << *object
<< ", nonblocking" << dendl
;
366 dout(10) << " can't auth_pin (freezing?), waiting to authpin " << *object
<< dendl
;
367 object
->add_waiter(MDSCacheObject::WAIT_UNFREEZE
, new C_MDS_RetryRequest(mdcache
, mdr
));
369 if (!mdr
->remote_auth_pins
.empty())
370 notify_freeze_waiter(object
);
376 // ok, grab local auth pins
377 for (set
<MDSCacheObject
*>::iterator p
= mustpin
.begin();
380 MDSCacheObject
*object
= *p
;
381 if (mdr
->is_auth_pinned(object
)) {
382 dout(10) << " already auth_pinned " << *object
<< dendl
;
383 } else if (object
->is_auth()) {
384 dout(10) << " auth_pinning " << *object
<< dendl
;
385 mdr
->auth_pin(object
);
389 // request remote auth_pins
390 if (!mustpin_remote
.empty()) {
391 marker
.message
= "requesting remote authpins";
// Fold objects already remotely pinned into the per-rank request sets so
// the slave re-confirms them.
392 for (map
<MDSCacheObject
*,mds_rank_t
>::iterator p
= mdr
->remote_auth_pins
.begin();
393 p
!= mdr
->remote_auth_pins
.end();
395 if (mustpin
.count(p
->first
)) {
396 assert(p
->second
== p
->first
->authority().first
);
397 map
<mds_rank_t
, set
<MDSCacheObject
*> >::iterator q
= mustpin_remote
.find(p
->second
);
398 if (q
!= mustpin_remote
.end())
399 q
->second
.insert(p
->first
);
402 for (map
<mds_rank_t
, set
<MDSCacheObject
*> >::iterator p
= mustpin_remote
.begin();
403 p
!= mustpin_remote
.end();
405 dout(10) << "requesting remote auth_pins from mds." << p
->first
<< dendl
;
407 // wait for active auth
408 if (mds
->is_cluster_degraded() &&
409 !mds
->mdsmap
->is_clientreplay_or_active_or_stopping(p
->first
)) {
410 dout(10) << " mds." << p
->first
<< " is not active" << dendl
;
411 if (mdr
->more()->waiting_on_slave
.empty())
412 mds
->wait_for_active_peer(p
->first
, new C_MDS_RetryRequest(mdcache
, mdr
));
416 MMDSSlaveRequest
*req
= new MMDSSlaveRequest(mdr
->reqid
, mdr
->attempt
,
417 MMDSSlaveRequest::OP_AUTHPIN
);
418 for (set
<MDSCacheObject
*>::iterator q
= p
->second
.begin();
419 q
!= p
->second
.end();
421 dout(10) << " req remote auth_pin of " << **q
<< dendl
;
422 MDSCacheObjectInfo info
;
423 (*q
)->set_object_info(info
);
424 req
->get_authpins().push_back(info
);
425 if (*q
== auth_pin_freeze
)
426 (*q
)->set_object_info(req
->get_authpin_freeze());
429 if (auth_pin_nonblock
)
430 req
->mark_nonblock();
431 mds
->send_message_mds(req
, p
->first
);
433 // put in waiting list
434 assert(mdr
->more()->waiting_on_slave
.count(p
->first
) == 0);
435 mdr
->more()->waiting_on_slave
.insert(p
->first
);
440 // caps i'll need to issue
441 set
<CInode
*> issue_set
;
445 // make sure they match currently acquired locks.
446 set
<SimpleLock
*, SimpleLock::ptr_lt
>::iterator existing
= mdr
->locks
.begin();
// Main acquisition loop: walk the sorted lock set in canonical order,
// advancing 'existing' (locks already held) in parallel.
447 for (set
<SimpleLock
*, SimpleLock::ptr_lt
>::iterator p
= sorted
.begin();
450 bool need_wrlock
= !!wrlocks
.count(*p
);
451 bool need_remote_wrlock
= !!(remote_wrlocks
&& remote_wrlocks
->count(*p
));
454 if (existing
!= mdr
->locks
.end() && *existing
== *p
) {
456 SimpleLock
*have
= *existing
;
458 if (xlocks
.count(have
) && mdr
->xlocks
.count(have
)) {
459 dout(10) << " already xlocked " << *have
<< " " << *have
->get_parent() << dendl
;
462 if (mdr
->remote_wrlocks
.count(have
)) {
463 if (!need_remote_wrlock
||
464 mdr
->remote_wrlocks
[have
] != (*remote_wrlocks
)[have
]) {
465 dout(10) << " unlocking remote_wrlock on wrong mds." << mdr
->remote_wrlocks
[have
]
466 << " " << *have
<< " " << *have
->get_parent() << dendl
;
467 remote_wrlock_finish(have
, mdr
->remote_wrlocks
[have
], mdr
.get());
470 if (need_wrlock
|| need_remote_wrlock
) {
471 if (need_wrlock
== !!mdr
->wrlocks
.count(have
) &&
472 need_remote_wrlock
== !!mdr
->remote_wrlocks
.count(have
)) {
474 dout(10) << " already wrlocked " << *have
<< " " << *have
->get_parent() << dendl
;
475 if (need_remote_wrlock
)
476 dout(10) << " already remote_wrlocked " << *have
<< " " << *have
->get_parent() << dendl
;
480 if (rdlocks
.count(have
) && mdr
->rdlocks
.count(have
)) {
481 dout(10) << " already rdlocked " << *have
<< " " << *have
->get_parent() << dendl
;
486 // hose any stray locks
487 if (existing
!= mdr
->locks
.end() && *existing
== *p
) {
488 assert(need_wrlock
|| need_remote_wrlock
);
489 SimpleLock
*lock
= *existing
;
490 if (mdr
->wrlocks
.count(lock
)) {
492 dout(10) << " unlocking extra " << *lock
<< " " << *lock
->get_parent() << dendl
;
493 else if (need_remote_wrlock
) // acquire remote_wrlock first
494 dout(10) << " unlocking out-of-order " << *lock
<< " " << *lock
->get_parent() << dendl
;
495 bool need_issue
= false;
496 wrlock_finish(lock
, mdr
.get(), &need_issue
);
498 issue_set
.insert(static_cast<CInode
*>(lock
->get_parent()));
// Drop any held locks that sort before *p — locks must be taken in order.
502 while (existing
!= mdr
->locks
.end()) {
503 SimpleLock
*stray
= *existing
;
505 dout(10) << " unlocking out-of-order " << *stray
<< " " << *stray
->get_parent() << dendl
;
506 bool need_issue
= false;
507 if (mdr
->xlocks
.count(stray
)) {
508 xlock_finish(stray
, mdr
.get(), &need_issue
);
509 } else if (mdr
->rdlocks
.count(stray
)) {
510 rdlock_finish(stray
, mdr
.get(), &need_issue
);
512 // may have acquired both wrlock and remote wrlock
513 if (mdr
->wrlocks
.count(stray
))
514 wrlock_finish(stray
, mdr
.get(), &need_issue
);
515 if (mdr
->remote_wrlocks
.count(stray
))
516 remote_wrlock_finish(stray
, mdr
->remote_wrlocks
[stray
], mdr
.get());
519 issue_set
.insert(static_cast<CInode
*>(stray
->get_parent()));
// If we were mid-acquisition on a different lock, cancel that attempt.
523 if (mdr
->locking
&& *p
!= mdr
->locking
) {
524 cancel_locking(mdr
.get(), &issue_set
);
526 if (xlocks
.count(*p
)) {
527 marker
.message
= "failed to xlock, waiting";
528 if (!xlock_start(*p
, mdr
))
530 dout(10) << " got xlock on " << **p
<< " " << *(*p
)->get_parent() << dendl
;
531 } else if (need_wrlock
|| need_remote_wrlock
) {
532 if (need_remote_wrlock
&& !mdr
->remote_wrlocks
.count(*p
)) {
533 marker
.message
= "waiting for remote wrlocks";
534 remote_wrlock_start(*p
, (*remote_wrlocks
)[*p
], mdr
);
537 if (need_wrlock
&& !mdr
->wrlocks
.count(*p
)) {
538 marker
.message
= "failed to wrlock, waiting";
539 if (need_remote_wrlock
&& !(*p
)->can_wrlock(mdr
->get_client())) {
540 marker
.message
= "failed to wrlock, dropping remote wrlock and waiting";
541 // can't take the wrlock because the scatter lock is gathering. need to
542 // release the remote wrlock, so that the gathering process can finish.
543 remote_wrlock_finish(*p
, mdr
->remote_wrlocks
[*p
], mdr
.get());
544 remote_wrlock_start(*p
, (*remote_wrlocks
)[*p
], mdr
);
547 // nowait if we have already gotten remote wrlock
548 if (!wrlock_start(*p
, mdr
, need_remote_wrlock
))
550 dout(10) << " got wrlock on " << **p
<< " " << *(*p
)->get_parent() << dendl
;
553 assert(mdr
->is_master());
554 if ((*p
)->is_scatterlock()) {
555 ScatterLock
*slock
= static_cast<ScatterLock
*>(*p
);
556 if (slock
->is_rejoin_mix()) {
557 // If there is a recovering mds who replicated an object when it failed
558 // and scatterlock in the object was in MIX state, It's possible that
559 // the recovering mds needs to take wrlock on the scatterlock when it
560 // replays unsafe requests. So this mds should delay taking rdlock on
561 // the scatterlock until the recovering mds finishes replaying unsafe.
562 // Otherwise unsafe requests may get replayed after current request.
565 // The recovering mds is auth mds of a dirfrag, this mds is auth mds
566 // of corresponding inode. when 'rm -rf' the directory, this mds should
567 // delay the rmdir request until the recovering mds has replayed unlink
569 if (mds
->is_cluster_degraded()) {
570 if (!mdr
->is_replay()) {
571 drop_locks(mdr
.get());
572 mds
->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache
, mdr
));
573 dout(10) << " rejoin mix scatterlock " << *slock
<< " " << *(*p
)->get_parent()
574 << ", waiting for cluster recovered" << dendl
;
575 marker
.message
= "rejoin mix scatterlock, waiting for cluster recovered";
579 slock
->clear_rejoin_mix();
584 marker
.message
= "failed to rdlock, waiting";
585 if (!rdlock_start(*p
, mdr
))
587 dout(10) << " got rdlock on " << **p
<< " " << *(*p
)->get_parent() << dendl
;
591 // any extra unneeded locks?
592 while (existing
!= mdr
->locks
.end()) {
593 SimpleLock
*stray
= *existing
;
595 dout(10) << " unlocking extra " << *stray
<< " " << *stray
->get_parent() << dendl
;
596 bool need_issue
= false;
597 if (mdr
->xlocks
.count(stray
)) {
598 xlock_finish(stray
, mdr
.get(), &need_issue
);
599 } else if (mdr
->rdlocks
.count(stray
)) {
600 rdlock_finish(stray
, mdr
.get(), &need_issue
);
602 // may have acquired both wrlock and remote wrlock
603 if (mdr
->wrlocks
.count(stray
))
604 wrlock_finish(stray
, mdr
.get(), &need_issue
);
605 if (mdr
->remote_wrlocks
.count(stray
))
606 remote_wrlock_finish(stray
, mdr
->remote_wrlocks
[stray
], mdr
.get());
609 issue_set
.insert(static_cast<CInode
*>(stray
->get_parent()));
// Success: record completion and (re)issue caps on all touched inodes.
612 mdr
->done_locking
= true;
613 mdr
->set_mds_stamp(ceph_clock_now());
615 marker
.message
= "acquired locks";
618 issue_caps_set(issue_set
);
// Tell the freezing machinery that someone is waiting on 'o': resolve o
// to a CDir (via parent dir for inodes/dentries, or directly), then bump
// the appropriate waiter counter on the fragmenting or exporting dir.
622 void Locker::notify_freeze_waiter(MDSCacheObject
*o
)
625 if (CInode
*in
= dynamic_cast<CInode
*>(o
)) {
627 dir
= in
->get_parent_dir();
628 } else if (CDentry
*dn
= dynamic_cast<CDentry
*>(o
)) {
631 dir
= dynamic_cast<CDir
*>(o
);
635 if (dir
->is_freezing_dir())
636 mdcache
->fragment_freeze_inc_num_waiters(dir
);
637 if (dir
->is_freezing_tree()) {
// walk up to the root of the freezing subtree before notifying export
638 while (!dir
->is_freezing_tree_root())
639 dir
= dir
->get_parent_dir();
640 mdcache
->migrator
->export_freeze_inc_num_waiters(dir
);
// Mark every xlock held by 'mut' as "done" (early release to readers)
// on this auth MDS; dentry/dentry-version locks can be skipped via
// 'skip_dentry' (the guard using it spans lines not shown here).
645 void Locker::set_xlocks_done(MutationImpl
*mut
, bool skip_dentry
)
647 for (set
<SimpleLock
*>::iterator p
= mut
->xlocks
.begin();
648 p
!= mut
->xlocks
.end();
650 MDSCacheObject
*object
= (*p
)->get_parent();
// only meaningful on the auth copy of the object
651 assert(object
->is_auth());
653 ((*p
)->get_type() == CEPH_LOCK_DN
|| (*p
)->get_type() == CEPH_LOCK_DVERSION
))
655 dout(10) << "set_xlocks_done on " << **p
<< " " << *object
<< dendl
;
656 (*p
)->set_xlock_done();
// Release every rdlock held by 'mut', collecting inodes that need a cap
// re-issue into *pneed_issue. NOTE(review): 'ni' is declared on a line
// not visible here — presumably a per-iteration bool fed to
// rdlock_finish and checked before the insert; confirm.
660 void Locker::_drop_rdlocks(MutationImpl
*mut
, set
<CInode
*> *pneed_issue
)
662 while (!mut
->rdlocks
.empty()) {
664 MDSCacheObject
*p
= (*mut
->rdlocks
.begin())->get_parent();
665 rdlock_finish(*mut
->rdlocks
.begin(), mut
, &ni
);
667 pneed_issue
->insert(static_cast<CInode
*>(p
));
// Release every xlock, remote wrlock, and wrlock held by 'mut'. Remote
// xlocks/wrlocks are dropped by notifying the owning slave MDSs with a
// single OP_DROPLOCKS per rank at the end; inodes needing cap re-issue
// are collected into *pneed_issue.
671 void Locker::_drop_non_rdlocks(MutationImpl
*mut
, set
<CInode
*> *pneed_issue
)
673 set
<mds_rank_t
> slaves
;
675 while (!mut
->xlocks
.empty()) {
676 SimpleLock
*lock
= *mut
->xlocks
.begin();
677 MDSCacheObject
*p
= lock
->get_parent();
// remote xlock: just record the slave rank and erase our bookkeeping
679 assert(lock
->get_sm()->can_remote_xlock
);
680 slaves
.insert(p
->authority().first
);
682 mut
->locks
.erase(lock
);
683 mut
->xlocks
.erase(lock
);
687 xlock_finish(lock
, mut
, &ni
);
689 pneed_issue
->insert(static_cast<CInode
*>(p
));
692 while (!mut
->remote_wrlocks
.empty()) {
693 map
<SimpleLock
*,mds_rank_t
>::iterator p
= mut
->remote_wrlocks
.begin();
694 slaves
.insert(p
->second
);
// keep the lock in mut->locks if we also hold a local wrlock on it
695 if (mut
->wrlocks
.count(p
->first
) == 0)
696 mut
->locks
.erase(p
->first
);
697 mut
->remote_wrlocks
.erase(p
);
700 while (!mut
->wrlocks
.empty()) {
702 MDSCacheObject
*p
= (*mut
->wrlocks
.begin())->get_parent();
703 wrlock_finish(*mut
->wrlocks
.begin(), mut
, &ni
);
705 pneed_issue
->insert(static_cast<CInode
*>(p
));
// Notify each slave rank once; skip ranks still below REJOIN while the
// cluster is degraded (they will clean up during rejoin).
708 for (set
<mds_rank_t
>::iterator p
= slaves
.begin(); p
!= slaves
.end(); ++p
) {
709 if (!mds
->is_cluster_degraded() ||
710 mds
->mdsmap
->get_state(*p
) >= MDSMap::STATE_REJOIN
) {
711 dout(10) << "_drop_non_rdlocks dropping remote locks on mds." << *p
<< dendl
;
712 MMDSSlaveRequest
*slavereq
= new MMDSSlaveRequest(mut
->reqid
, mut
->attempt
,
713 MMDSSlaveRequest::OP_DROPLOCKS
);
714 mds
->send_message_mds(slavereq
, *p
);
// Abort an in-progress lock acquisition (mut->locking). On the auth we
// unwind a partially-taken xlock (PREXLOCK -> _finish_xlock, or
// LOCK_XLOCK with no holders -> XLOCKDONE + re-eval) and note the parent
// inode for cap re-issue; finally clear the mut's locking marker.
719 void Locker::cancel_locking(MutationImpl
*mut
, set
<CInode
*> *pneed_issue
)
721 SimpleLock
*lock
= mut
->locking
;
723 dout(10) << "cancel_locking " << *lock
<< " on " << *mut
<< dendl
;
725 if (lock
->get_parent()->is_auth()) {
726 bool need_issue
= false;
727 if (lock
->get_state() == LOCK_PREXLOCK
) {
728 _finish_xlock(lock
, -1, &need_issue
);
729 } else if (lock
->get_state() == LOCK_LOCK_XLOCK
&&
730 lock
->get_num_xlocks() == 0) {
731 lock
->set_state(LOCK_XLOCKDONE
);
732 eval_gather(lock
, true, &need_issue
);
735 pneed_issue
->insert(static_cast<CInode
*>(lock
->get_parent()));
737 mut
->finish_locking(lock
);
// Drop ALL locks held by 'mut' (pending acquisition, then non-rdlocks,
// then rdlocks). If the caller did not supply a need-issue set, a local
// one is used and the caps are issued here before returning.
740 void Locker::drop_locks(MutationImpl
*mut
, set
<CInode
*> *pneed_issue
)
743 set
<CInode
*> my_need_issue
;
745 pneed_issue
= &my_need_issue
;
748 cancel_locking(mut
, pneed_issue
);
749 _drop_non_rdlocks(mut
, pneed_issue
);
750 _drop_rdlocks(mut
, pneed_issue
);
// issue caps only when we own the set (caller passed NULL)
752 if (pneed_issue
== &my_need_issue
)
753 issue_caps_set(*pneed_issue
);
754 mut
->done_locking
= false;
// Drop only the xlocks/wrlocks/remote-wrlocks held by 'mut', keeping its
// rdlocks. Same local need-issue-set convention as drop_locks().
757 void Locker::drop_non_rdlocks(MutationImpl
*mut
, set
<CInode
*> *pneed_issue
)
759 set
<CInode
*> my_need_issue
;
761 pneed_issue
= &my_need_issue
;
763 _drop_non_rdlocks(mut
, pneed_issue
);
765 if (pneed_issue
== &my_need_issue
)
766 issue_caps_set(*pneed_issue
);
// Drop only the rdlocks held by 'mut', keeping its write-side locks.
// Same local need-issue-set convention as drop_locks().
769 void Locker::drop_rdlocks(MutationImpl
*mut
, set
<CInode
*> *pneed_issue
)
771 set
<CInode
*> my_need_issue
;
773 pneed_issue
= &my_need_issue
;
775 _drop_rdlocks(mut
, pneed_issue
);
777 if (pneed_issue
== &my_need_issue
)
778 issue_caps_set(*pneed_issue
);
// Re-evaluate an unstable lock's gather: decide whether all conditions
// (no remaining rd/wr/x holders or leases incompatible with the next
// state, no caps issued beyond what the next state allows, no pending
// scatter flush) are met to transition 'lock' to its next state. On a
// replica this also sends the appropriate *ACK back to the auth; on the
// auth it may kick off replica gathers, scatter writebehind, or state
// broadcasts. 'first' marks the initial call after starting the gather;
// waiters/finishers are handed back via pneed_issue/pfinishers.
784 void Locker::eval_gather(SimpleLock
*lock
, bool first
, bool *pneed_issue
, list
<MDSInternalContextBase
*> *pfinishers
)
786 dout(10) << "eval_gather " << *lock
<< " on " << *lock
->get_parent() << dendl
;
787 assert(!lock
->is_stable());
789 int next
= lock
->get_next_state();
792 bool caps
= lock
->get_cap_shift();
793 if (lock
->get_type() != CEPH_LOCK_DN
)
794 in
= static_cast<CInode
*>(lock
->get_parent());
796 bool need_issue
= false;
798 int loner_issued
= 0, other_issued
= 0, xlocker_issued
= 0;
799 assert(!caps
|| in
!= NULL
);
800 if (caps
&& in
->is_head()) {
801 in
->get_caps_issued(&loner_issued
, &other_issued
, &xlocker_issued
,
802 lock
->get_cap_shift(), lock
->get_cap_mask());
803 dout(10) << " next state is " << lock
->get_state_name(next
)
804 << " issued/allows loner " << gcap_string(loner_issued
)
805 << "/" << gcap_string(lock
->gcaps_allowed(CAP_LONER
, next
))
806 << " xlocker " << gcap_string(xlocker_issued
)
807 << "/" << gcap_string(lock
->gcaps_allowed(CAP_XLOCKER
, next
))
808 << " other " << gcap_string(other_issued
)
809 << "/" << gcap_string(lock
->gcaps_allowed(CAP_ANY
, next
))
// on the first pass, any caps issued beyond what 'next' allows must be
// revoked (the revoke itself happens on a line not shown here)
812 if (first
&& ((~lock
->gcaps_allowed(CAP_ANY
, next
) & other_issued
) ||
813 (~lock
->gcaps_allowed(CAP_LONER
, next
) & loner_issued
) ||
814 (~lock
->gcaps_allowed(CAP_XLOCKER
, next
) & xlocker_issued
)))
818 #define IS_TRUE_AND_LT_AUTH(x, auth) (x && ((auth && x <= AUTH) || (!auth && x < AUTH)))
819 bool auth
= lock
->get_parent()->is_auth();
// Gather is complete only when every holder class is either permitted by
// the next state or empty, nothing is flushing, no excess caps remain,
// and we are not in a state awaiting an explicit auth trigger.
820 if (!lock
->is_gathering() &&
821 (IS_TRUE_AND_LT_AUTH(lock
->get_sm()->states
[next
].can_rdlock
, auth
) || !lock
->is_rdlocked()) &&
822 (IS_TRUE_AND_LT_AUTH(lock
->get_sm()->states
[next
].can_wrlock
, auth
) || !lock
->is_wrlocked()) &&
823 (IS_TRUE_AND_LT_AUTH(lock
->get_sm()->states
[next
].can_xlock
, auth
) || !lock
->is_xlocked()) &&
824 (IS_TRUE_AND_LT_AUTH(lock
->get_sm()->states
[next
].can_lease
, auth
) || !lock
->is_leased()) &&
825 !(lock
->get_parent()->is_auth() && lock
->is_flushing()) && // i.e. wait for scatter_writebehind!
826 (!caps
|| ((~lock
->gcaps_allowed(CAP_ANY
, next
) & other_issued
) == 0 &&
827 (~lock
->gcaps_allowed(CAP_LONER
, next
) & loner_issued
) == 0 &&
828 (~lock
->gcaps_allowed(CAP_XLOCKER
, next
) & xlocker_issued
) == 0)) &&
829 lock
->get_state() != LOCK_SYNC_MIX2
&& // these states need an explicit trigger from the auth mds
830 lock
->get_state() != LOCK_MIX_SYNC2
832 dout(7) << "eval_gather finished gather on " << *lock
833 << " on " << *lock
->get_parent() << dendl
;
// file locks: hold the transition while the inode is (or needs) recovery
835 if (lock
->get_sm() == &sm_filelock
) {
837 if (in
->state_test(CInode::STATE_RECOVERING
)) {
838 dout(7) << "eval_gather finished gather, but still recovering" << dendl
;
840 } else if (in
->state_test(CInode::STATE_NEEDSRECOVER
)) {
841 dout(7) << "eval_gather finished gather, but need to recover" << dendl
;
842 mds
->mdcache
->queue_file_recover(in
);
843 mds
->mdcache
->do_file_recover();
848 if (!lock
->get_parent()->is_auth()) {
849 // replica: tell auth
850 mds_rank_t auth
= lock
->get_parent()->authority().first
;
852 if (lock
->get_parent()->is_rejoining() &&
853 mds
->mdsmap
->get_state(auth
) == MDSMap::STATE_REJOIN
) {
854 dout(7) << "eval_gather finished gather, but still rejoining "
855 << *lock
->get_parent() << dendl
;
859 if (!mds
->is_cluster_degraded() ||
860 mds
->mdsmap
->get_state(auth
) >= MDSMap::STATE_REJOIN
) {
// per-state ACK back to the auth (case labels fall on lines not shown)
861 switch (lock
->get_state()) {
863 mds
->send_message_mds(new MLock(lock
, LOCK_AC_LOCKACK
, mds
->get_nodeid()),
869 MLock
*reply
= new MLock(lock
, LOCK_AC_SYNCACK
, mds
->get_nodeid());
870 lock
->encode_locked_state(reply
->get_data());
871 mds
->send_message_mds(reply
, auth
);
872 next
= LOCK_MIX_SYNC2
;
873 (static_cast<ScatterLock
*>(lock
))->start_flush();
878 (static_cast<ScatterLock
*>(lock
))->finish_flush();
879 (static_cast<ScatterLock
*>(lock
))->clear_flushed();
882 // do nothing, we already acked
887 MLock
*reply
= new MLock(lock
, LOCK_AC_MIXACK
, mds
->get_nodeid());
888 mds
->send_message_mds(reply
, auth
);
889 next
= LOCK_SYNC_MIX2
;
896 lock
->encode_locked_state(data
);
897 mds
->send_message_mds(new MLock(lock
, LOCK_AC_LOCKACK
, mds
->get_nodeid(), data
), auth
);
898 (static_cast<ScatterLock
*>(lock
))->start_flush();
899 // we'll get an AC_LOCKFLUSHED to complete
910 // once the first (local) stage of mix->lock gather complete we can
911 // gather from replicas
912 if (lock
->get_state() == LOCK_MIX_LOCK
&&
913 lock
->get_parent()->is_replicated()) {
914 dout(10) << " finished (local) gather for mix->lock, now gathering from replicas" << dendl
;
915 send_lock_message(lock
, LOCK_AC_LOCK
);
917 lock
->set_state(LOCK_MIX_LOCK2
);
// dirty scatterlock data must hit the journal before the transition
921 if (lock
->is_dirty() && !lock
->is_flushed()) {
922 scatter_writebehind(static_cast<ScatterLock
*>(lock
));
926 lock
->clear_flushed();
928 switch (lock
->get_state()) {
933 in
->start_scatter(static_cast<ScatterLock
*>(lock
));
934 if (lock
->get_parent()->is_replicated()) {
936 lock
->encode_locked_state(softdata
);
937 send_lock_message(lock
, LOCK_AC_MIX
, softdata
);
939 (static_cast<ScatterLock
*>(lock
))->clear_scatter_wanted();
944 if (next
!= LOCK_SYNC
)
953 if (lock
->get_parent()->is_replicated()) {
955 lock
->encode_locked_state(softdata
);
956 send_lock_message(lock
, LOCK_AC_SYNC
, softdata
);
// commit the transition
963 lock
->set_state(next
);
965 if (lock
->get_parent()->is_auth() &&
967 lock
->get_parent()->auth_unpin(lock
);
969 // drop loner before doing waiters
973 in
->get_wanted_loner() != in
->get_loner()) {
974 dout(10) << " trying to drop loner" << dendl
;
975 if (in
->try_drop_loner()) {
976 dout(10) << " dropped loner" << dendl
;
// wake everyone blocked on this lock becoming usable/stable
982 lock
->take_waiting(SimpleLock::WAIT_STABLE
|SimpleLock::WAIT_WR
|SimpleLock::WAIT_RD
|SimpleLock::WAIT_XLOCK
,
985 lock
->finish_waiters(SimpleLock::WAIT_STABLE
|SimpleLock::WAIT_WR
|SimpleLock::WAIT_RD
|SimpleLock::WAIT_XLOCK
);
987 if (caps
&& in
->is_head())
990 if (lock
->get_parent()->is_auth() &&
992 try_eval(lock
, &need_issue
);
998 else if (in
->is_head())
// Evaluate the inode locks selected by 'mask' on 'in': first try to set
// a wanted loner (auth+head only), then run eval_any on each masked
// lock, then retry loner drop/set after the lock states settle, and
// finally run accumulated finishers and (if needed) issue caps.
// 'caps_imported' seeds need_issue so freshly imported caps are issued.
1004 bool Locker::eval(CInode
*in
, int mask
, bool caps_imported
)
1006 bool need_issue
= caps_imported
;
1007 list
<MDSInternalContextBase
*> finishers
;
1009 dout(10) << "eval " << mask
<< " " << *in
<< dendl
;
// loner selection happens before lock eval so gcaps_allowed sees it
1012 if (in
->is_auth() && in
->is_head()) {
1013 if (in
->choose_ideal_loner() >= 0) {
1014 if (in
->try_set_loner()) {
1015 dout(10) << "eval set loner to client." << in
->get_loner() << dendl
;
1019 dout(10) << "eval want loner client." << in
->get_wanted_loner() << " but failed to set it" << dendl
;
1021 dout(10) << "eval doesn't want loner" << dendl
;
1025 if (mask
& CEPH_LOCK_IFILE
)
1026 eval_any(&in
->filelock
, &need_issue
, &finishers
, caps_imported
);
1027 if (mask
& CEPH_LOCK_IAUTH
)
1028 eval_any(&in
->authlock
, &need_issue
, &finishers
, caps_imported
);
1029 if (mask
& CEPH_LOCK_ILINK
)
1030 eval_any(&in
->linklock
, &need_issue
, &finishers
, caps_imported
);
1031 if (mask
& CEPH_LOCK_IXATTR
)
1032 eval_any(&in
->xattrlock
, &need_issue
, &finishers
, caps_imported
);
1033 if (mask
& CEPH_LOCK_INEST
)
1034 eval_any(&in
->nestlock
, &need_issue
, &finishers
, caps_imported
);
1035 if (mask
& CEPH_LOCK_IFLOCK
)
1036 eval_any(&in
->flocklock
, &need_issue
, &finishers
, caps_imported
);
1037 if (mask
& CEPH_LOCK_IPOLICY
)
1038 eval_any(&in
->policylock
, &need_issue
, &finishers
, caps_imported
);
// re-check loner now that lock states may have changed
1041 if (in
->is_auth() && in
->is_head() && in
->get_wanted_loner() != in
->get_loner()) {
1042 dout(10) << " trying to drop loner" << dendl
;
1043 if (in
->try_drop_loner()) {
1044 dout(10) << " dropped loner" << dendl
;
1047 if (in
->get_wanted_loner() >= 0) {
1048 if (in
->try_set_loner()) {
1049 dout(10) << "eval end set loner to client." << in
->get_loner() << dendl
;
1053 dout(10) << "eval want loner client." << in
->get_wanted_loner() << " but failed to set it" << dendl
;
1059 finish_contexts(g_ceph_context
, finishers
);
1061 if (need_issue
&& in
->is_head())
1064 dout(10) << "eval done" << dendl
;
// Deferred re-evaluation context: pins the target object for the life of
// the waiter and calls Locker::try_eval(p, mask) when fired.
1068 class C_Locker_Eval
: public LockerContext
{
1072 C_Locker_Eval(Locker
*l
, MDSCacheObject
*pp
, int m
) : LockerContext(l
), p(pp
), mask(m
) {
1073 // We are used as an MDSCacheObject waiter, so should
1074 // only be invoked by someone already holding the big lock.
1075 assert(locker
->mds
->mds_lock
.is_locked_by_me());
1076 p
->get(MDSCacheObject::PIN_PTRWAITER
);
1078 void finish(int r
) override
{
1079 locker
->try_eval(p
, mask
);
1080 p
->put(MDSCacheObject::PIN_PTRWAITER
);
// Object-level try_eval: defer (re-queue a C_Locker_Eval waiter) while
// the object has ambiguous auth or is frozen; otherwise evaluate either
// the dentry lock (mask == CEPH_LOCK_DN) or the inode locks in 'mask'.
1084 void Locker::try_eval(MDSCacheObject
*p
, int mask
)
1086 // unstable and ambiguous auth?
1087 if (p
->is_ambiguous_auth()) {
1088 dout(7) << "try_eval ambiguous auth, waiting on " << *p
<< dendl
;
1089 p
->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
, new C_Locker_Eval(this, p
, mask
));
1093 if (p
->is_auth() && p
->is_frozen()) {
1094 dout(7) << "try_eval frozen, waiting on " << *p
<< dendl
;
1095 p
->add_waiter(MDSCacheObject::WAIT_UNFREEZE
, new C_Locker_Eval(this, p
, mask
));
1099 if (mask
& CEPH_LOCK_DN
) {
1100 assert(mask
== CEPH_LOCK_DN
);
1101 bool need_issue
= false; // ignore this, no caps on dentries
1102 CDentry
*dn
= static_cast<CDentry
*>(p
);
1103 eval_any(&dn
->lock
, &need_issue
);
1105 CInode
*in
= static_cast<CInode
*>(p
);
// Lock-level try_eval: defer on ambiguous auth or frozen parent, skip
// non-auth objects, honor pending scatter/unscatter wants even while
// freezing (see the deadlock note below), and otherwise run eval().
1110 void Locker::try_eval(SimpleLock
*lock
, bool *pneed_issue
)
1112 MDSCacheObject
*p
= lock
->get_parent();
1114 // unstable and ambiguous auth?
1115 if (p
->is_ambiguous_auth()) {
1116 dout(7) << "try_eval " << *lock
<< " ambiguousauth, waiting on " << *p
<< dendl
;
1117 p
->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
, new C_Locker_Eval(this, p
, lock
->get_type()));
1121 if (!p
->is_auth()) {
1122 dout(7) << "try_eval " << *lock
<< " not auth for " << *p
<< dendl
;
1126 if (p
->is_frozen()) {
1127 dout(7) << "try_eval " << *lock
<< " frozen, waiting on " << *p
<< dendl
;
1128 p
->add_waiter(MDSCacheObject::WAIT_UNFREEZE
, new C_Locker_Eval(this, p
, lock
->get_type()));
1133 * We could have a situation like:
1135 * - mds A authpins item on mds B
1136 * - mds B starts to freeze tree containing item
1137 * - mds A tries wrlock_start on A, sends REQSCATTER to B
1138 * - mds B lock is unstable, sets scatter_wanted
1139 * - mds B lock stabilizes, calls try_eval.
1141 * We can defer while freezing without causing a deadlock. Honor
1142 * scatter_wanted flag here. This will never get deferred by the
1143 * checks above due to the auth_pin held by the master.
1145 if (lock
->is_scatterlock()) {
1146 ScatterLock
*slock
= static_cast<ScatterLock
*>(lock
);
1147 if (slock
->get_scatter_wanted() &&
1148 slock
->get_state() != LOCK_MIX
) {
1149 scatter_mix(slock
, pneed_issue
);
1150 if (!lock
->is_stable())
1152 } else if (slock
->get_unscatter_wanted() &&
1153 slock
->get_state() != LOCK_LOCK
) {
1154 simple_lock(slock
, pneed_issue
);
1155 if (!lock
->is_stable()) {
// freezing defers everything except dentry locks
1161 if (lock
->get_type() != CEPH_LOCK_DN
&& p
->is_freezing()) {
1162 dout(7) << "try_eval " << *lock
<< " freezing, waiting on " << *p
<< dendl
;
1163 p
->add_waiter(MDSCacheObject::WAIT_UNFREEZE
, new C_Locker_Eval(this, p
, lock
->get_type()));
1167 eval(lock
, pneed_issue
);
1170 void Locker::eval_cap_gather(CInode
*in
, set
<CInode
*> *issue_set
)
1172 bool need_issue
= false;
1173 list
<MDSInternalContextBase
*> finishers
;
1176 if (!in
->filelock
.is_stable())
1177 eval_gather(&in
->filelock
, false, &need_issue
, &finishers
);
1178 if (!in
->authlock
.is_stable())
1179 eval_gather(&in
->authlock
, false, &need_issue
, &finishers
);
1180 if (!in
->linklock
.is_stable())
1181 eval_gather(&in
->linklock
, false, &need_issue
, &finishers
);
1182 if (!in
->xattrlock
.is_stable())
1183 eval_gather(&in
->xattrlock
, false, &need_issue
, &finishers
);
1185 if (need_issue
&& in
->is_head()) {
1187 issue_set
->insert(in
);
1192 finish_contexts(g_ceph_context
, finishers
);
1195 void Locker::eval_scatter_gathers(CInode
*in
)
1197 bool need_issue
= false;
1198 list
<MDSInternalContextBase
*> finishers
;
1200 dout(10) << "eval_scatter_gathers " << *in
<< dendl
;
1203 if (!in
->filelock
.is_stable())
1204 eval_gather(&in
->filelock
, false, &need_issue
, &finishers
);
1205 if (!in
->nestlock
.is_stable())
1206 eval_gather(&in
->nestlock
, false, &need_issue
, &finishers
);
1207 if (!in
->dirfragtreelock
.is_stable())
1208 eval_gather(&in
->dirfragtreelock
, false, &need_issue
, &finishers
);
1210 if (need_issue
&& in
->is_head())
1213 finish_contexts(g_ceph_context
, finishers
);
1216 void Locker::eval(SimpleLock
*lock
, bool *need_issue
)
1218 switch (lock
->get_type()) {
1219 case CEPH_LOCK_IFILE
:
1220 return file_eval(static_cast<ScatterLock
*>(lock
), need_issue
);
1221 case CEPH_LOCK_IDFT
:
1222 case CEPH_LOCK_INEST
:
1223 return scatter_eval(static_cast<ScatterLock
*>(lock
), need_issue
);
1225 return simple_eval(lock
, need_issue
);
1230 // ------------------
1233 bool Locker::_rdlock_kick(SimpleLock
*lock
, bool as_anon
)
1236 if (lock
->is_stable()) {
1237 if (lock
->get_parent()->is_auth()) {
1238 if (lock
->get_sm() == &sm_scatterlock
) {
1239 // not until tempsync is fully implemented
1240 //if (lock->get_parent()->is_replicated())
1241 //scatter_tempsync((ScatterLock*)lock);
1244 } else if (lock
->get_sm() == &sm_filelock
) {
1245 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
1246 if (lock
->get_state() == LOCK_EXCL
&&
1247 in
->get_target_loner() >= 0 &&
1248 !in
->is_dir() && !as_anon
) // as_anon => caller wants SYNC, not XSYN
1256 // request rdlock state change from auth
1257 mds_rank_t auth
= lock
->get_parent()->authority().first
;
1258 if (!mds
->is_cluster_degraded() ||
1259 mds
->mdsmap
->is_clientreplay_or_active_or_stopping(auth
)) {
1260 dout(10) << "requesting rdlock from auth on "
1261 << *lock
<< " on " << *lock
->get_parent() << dendl
;
1262 mds
->send_message_mds(new MLock(lock
, LOCK_AC_REQRDLOCK
, mds
->get_nodeid()), auth
);
1267 if (lock
->get_type() == CEPH_LOCK_IFILE
) {
1268 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
1269 if (in
->state_test(CInode::STATE_RECOVERING
)) {
1270 mds
->mdcache
->recovery_queue
.prioritize(in
);
1277 bool Locker::rdlock_try(SimpleLock
*lock
, client_t client
, MDSInternalContextBase
*con
)
1279 dout(7) << "rdlock_try on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1281 // can read? grab ref.
1282 if (lock
->can_rdlock(client
))
1285 _rdlock_kick(lock
, false);
1287 if (lock
->can_rdlock(client
))
1292 dout(7) << "rdlock_try waiting on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1293 lock
->add_waiter(SimpleLock::WAIT_STABLE
|SimpleLock::WAIT_RD
, con
);
1298 bool Locker::rdlock_start(SimpleLock
*lock
, MDRequestRef
& mut
, bool as_anon
)
1300 dout(7) << "rdlock_start on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1302 // client may be allowed to rdlock the same item it has xlocked.
1303 // UNLESS someone passes in as_anon, or we're reading snapped version here.
1304 if (mut
->snapid
!= CEPH_NOSNAP
)
1306 client_t client
= as_anon
? -1 : mut
->get_client();
1309 if (lock
->get_type() != CEPH_LOCK_DN
)
1310 in
= static_cast<CInode
*>(lock
->get_parent());
1313 if (!lock->get_parent()->is_auth() &&
1314 lock->fw_rdlock_to_auth()) {
1315 mdcache->request_forward(mut, lock->get_parent()->authority().first);
1321 // can read? grab ref.
1322 if (lock
->can_rdlock(client
)) {
1324 mut
->rdlocks
.insert(lock
);
1325 mut
->locks
.insert(lock
);
1329 // hmm, wait a second.
1330 if (in
&& !in
->is_head() && in
->is_auth() &&
1331 lock
->get_state() == LOCK_SNAP_SYNC
) {
1332 // okay, we actually need to kick the head's lock to get ourselves synced up.
1333 CInode
*head
= mdcache
->get_inode(in
->ino());
1335 SimpleLock
*hlock
= head
->get_lock(lock
->get_type());
1336 if (hlock
->get_state() != LOCK_SYNC
) {
1337 dout(10) << "rdlock_start trying head inode " << *head
<< dendl
;
1338 if (!rdlock_start(head
->get_lock(lock
->get_type()), mut
, true)) // ** as_anon, no rdlock on EXCL **
1340 // oh, check our lock again then
1344 if (!_rdlock_kick(lock
, as_anon
))
1350 if (lock
->get_parent()->is_auth() && lock
->is_stable())
1351 wait_on
= SimpleLock::WAIT_RD
;
1353 wait_on
= SimpleLock::WAIT_STABLE
; // REQRDLOCK is ignored if lock is unstable, so we need to retry.
1354 dout(7) << "rdlock_start waiting on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1355 lock
->add_waiter(wait_on
, new C_MDS_RetryRequest(mdcache
, mut
));
1360 void Locker::nudge_log(SimpleLock
*lock
)
1362 dout(10) << "nudge_log " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1363 if (lock
->get_parent()->is_auth() && lock
->is_unstable_and_locked()) // as with xlockdone, or cap flush
1364 mds
->mdlog
->flush();
1367 void Locker::rdlock_finish(SimpleLock
*lock
, MutationImpl
*mut
, bool *pneed_issue
)
1372 mut
->rdlocks
.erase(lock
);
1373 mut
->locks
.erase(lock
);
1376 dout(7) << "rdlock_finish on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1379 if (!lock
->is_rdlocked()) {
1380 if (!lock
->is_stable())
1381 eval_gather(lock
, false, pneed_issue
);
1382 else if (lock
->get_parent()->is_auth())
1383 try_eval(lock
, pneed_issue
);
1388 bool Locker::can_rdlock_set(set
<SimpleLock
*>& locks
)
1390 dout(10) << "can_rdlock_set " << locks
<< dendl
;
1391 for (set
<SimpleLock
*>::iterator p
= locks
.begin(); p
!= locks
.end(); ++p
)
1392 if (!(*p
)->can_rdlock(-1)) {
1393 dout(10) << "can_rdlock_set can't rdlock " << *p
<< " on " << *(*p
)->get_parent() << dendl
;
1399 bool Locker::rdlock_try_set(set
<SimpleLock
*>& locks
)
1401 dout(10) << "rdlock_try_set " << locks
<< dendl
;
1402 for (set
<SimpleLock
*>::iterator p
= locks
.begin(); p
!= locks
.end(); ++p
)
1403 if (!rdlock_try(*p
, -1, NULL
)) {
1404 dout(10) << "rdlock_try_set can't rdlock " << *p
<< " on " << *(*p
)->get_parent() << dendl
;
1410 void Locker::rdlock_take_set(set
<SimpleLock
*>& locks
, MutationRef
& mut
)
1412 dout(10) << "rdlock_take_set " << locks
<< dendl
;
1413 for (set
<SimpleLock
*>::iterator p
= locks
.begin(); p
!= locks
.end(); ++p
) {
1415 mut
->rdlocks
.insert(*p
);
1416 mut
->locks
.insert(*p
);
1420 // ------------------
1423 void Locker::wrlock_force(SimpleLock
*lock
, MutationRef
& mut
)
1425 if (lock
->get_type() == CEPH_LOCK_IVERSION
||
1426 lock
->get_type() == CEPH_LOCK_DVERSION
)
1427 return local_wrlock_grab(static_cast<LocalLock
*>(lock
), mut
);
1429 dout(7) << "wrlock_force on " << *lock
1430 << " on " << *lock
->get_parent() << dendl
;
1431 lock
->get_wrlock(true);
1432 mut
->wrlocks
.insert(lock
);
1433 mut
->locks
.insert(lock
);
1436 bool Locker::wrlock_start(SimpleLock
*lock
, MDRequestRef
& mut
, bool nowait
)
1438 if (lock
->get_type() == CEPH_LOCK_IVERSION
||
1439 lock
->get_type() == CEPH_LOCK_DVERSION
)
1440 return local_wrlock_start(static_cast<LocalLock
*>(lock
), mut
);
1442 dout(10) << "wrlock_start " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1444 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
1445 client_t client
= mut
->get_client();
1446 bool want_scatter
= !nowait
&& lock
->get_parent()->is_auth() &&
1447 (in
->has_subtree_or_exporting_dirfrag() ||
1448 static_cast<ScatterLock
*>(lock
)->get_scatter_wanted());
1452 if (lock
->can_wrlock(client
) &&
1453 (!want_scatter
|| lock
->get_state() == LOCK_MIX
)) {
1455 mut
->wrlocks
.insert(lock
);
1456 mut
->locks
.insert(lock
);
1460 if (lock
->get_type() == CEPH_LOCK_IFILE
&&
1461 in
->state_test(CInode::STATE_RECOVERING
)) {
1462 mds
->mdcache
->recovery_queue
.prioritize(in
);
1465 if (!lock
->is_stable())
1468 if (in
->is_auth()) {
1469 // don't do nested lock state change if we have dirty scatterdata and
1470 // may scatter_writebehind or start_scatter, because nowait==true implies
1471 // that the caller already has a log entry open!
1472 if (nowait
&& lock
->is_dirty())
1476 scatter_mix(static_cast<ScatterLock
*>(lock
));
1480 if (nowait
&& !lock
->can_wrlock(client
))
1485 // auth should be auth_pinned (see acquire_locks wrlock weird mustpin case).
1486 mds_rank_t auth
= lock
->get_parent()->authority().first
;
1487 if (!mds
->is_cluster_degraded() ||
1488 mds
->mdsmap
->is_clientreplay_or_active_or_stopping(auth
)) {
1489 dout(10) << "requesting scatter from auth on "
1490 << *lock
<< " on " << *lock
->get_parent() << dendl
;
1491 mds
->send_message_mds(new MLock(lock
, LOCK_AC_REQSCATTER
, mds
->get_nodeid()), auth
);
1498 dout(7) << "wrlock_start waiting on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1499 lock
->add_waiter(SimpleLock::WAIT_STABLE
, new C_MDS_RetryRequest(mdcache
, mut
));
1506 void Locker::wrlock_finish(SimpleLock
*lock
, MutationImpl
*mut
, bool *pneed_issue
)
1508 if (lock
->get_type() == CEPH_LOCK_IVERSION
||
1509 lock
->get_type() == CEPH_LOCK_DVERSION
)
1510 return local_wrlock_finish(static_cast<LocalLock
*>(lock
), mut
);
1512 dout(7) << "wrlock_finish on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1515 mut
->wrlocks
.erase(lock
);
1516 if (mut
->remote_wrlocks
.count(lock
) == 0)
1517 mut
->locks
.erase(lock
);
1520 if (!lock
->is_wrlocked()) {
1521 if (!lock
->is_stable())
1522 eval_gather(lock
, false, pneed_issue
);
1523 else if (lock
->get_parent()->is_auth())
1524 try_eval(lock
, pneed_issue
);
1531 void Locker::remote_wrlock_start(SimpleLock
*lock
, mds_rank_t target
, MDRequestRef
& mut
)
1533 dout(7) << "remote_wrlock_start mds." << target
<< " on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1535 // wait for active target
1536 if (mds
->is_cluster_degraded() &&
1537 !mds
->mdsmap
->is_clientreplay_or_active_or_stopping(target
)) {
1538 dout(7) << " mds." << target
<< " is not active" << dendl
;
1539 if (mut
->more()->waiting_on_slave
.empty())
1540 mds
->wait_for_active_peer(target
, new C_MDS_RetryRequest(mdcache
, mut
));
1544 // send lock request
1545 mut
->start_locking(lock
, target
);
1546 mut
->more()->slaves
.insert(target
);
1547 MMDSSlaveRequest
*r
= new MMDSSlaveRequest(mut
->reqid
, mut
->attempt
,
1548 MMDSSlaveRequest::OP_WRLOCK
);
1549 r
->set_lock_type(lock
->get_type());
1550 lock
->get_parent()->set_object_info(r
->get_object_info());
1551 mds
->send_message_mds(r
, target
);
1553 assert(mut
->more()->waiting_on_slave
.count(target
) == 0);
1554 mut
->more()->waiting_on_slave
.insert(target
);
1557 void Locker::remote_wrlock_finish(SimpleLock
*lock
, mds_rank_t target
,
1561 mut
->remote_wrlocks
.erase(lock
);
1562 if (mut
->wrlocks
.count(lock
) == 0)
1563 mut
->locks
.erase(lock
);
1565 dout(7) << "remote_wrlock_finish releasing remote wrlock on mds." << target
1566 << " " << *lock
->get_parent() << dendl
;
1567 if (!mds
->is_cluster_degraded() ||
1568 mds
->mdsmap
->get_state(target
) >= MDSMap::STATE_REJOIN
) {
1569 MMDSSlaveRequest
*slavereq
= new MMDSSlaveRequest(mut
->reqid
, mut
->attempt
,
1570 MMDSSlaveRequest::OP_UNWRLOCK
);
1571 slavereq
->set_lock_type(lock
->get_type());
1572 lock
->get_parent()->set_object_info(slavereq
->get_object_info());
1573 mds
->send_message_mds(slavereq
, target
);
1578 // ------------------
1581 bool Locker::xlock_start(SimpleLock
*lock
, MDRequestRef
& mut
)
1583 if (lock
->get_type() == CEPH_LOCK_IVERSION
||
1584 lock
->get_type() == CEPH_LOCK_DVERSION
)
1585 return local_xlock_start(static_cast<LocalLock
*>(lock
), mut
);
1587 dout(7) << "xlock_start on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1588 client_t client
= mut
->get_client();
1591 if (lock
->get_parent()->is_auth()) {
1594 if (lock
->can_xlock(client
)) {
1595 lock
->set_state(LOCK_XLOCK
);
1596 lock
->get_xlock(mut
, client
);
1597 mut
->xlocks
.insert(lock
);
1598 mut
->locks
.insert(lock
);
1599 mut
->finish_locking(lock
);
1603 if (lock
->get_type() == CEPH_LOCK_IFILE
) {
1604 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
1605 if (in
->state_test(CInode::STATE_RECOVERING
)) {
1606 mds
->mdcache
->recovery_queue
.prioritize(in
);
1610 if (!lock
->is_stable() && (lock
->get_state() != LOCK_XLOCKDONE
||
1611 lock
->get_xlock_by_client() != client
||
1612 lock
->is_waiter_for(SimpleLock::WAIT_STABLE
)))
1615 if (lock
->get_state() == LOCK_LOCK
|| lock
->get_state() == LOCK_XLOCKDONE
) {
1616 mut
->start_locking(lock
);
1623 lock
->add_waiter(SimpleLock::WAIT_WR
|SimpleLock::WAIT_STABLE
, new C_MDS_RetryRequest(mdcache
, mut
));
1628 assert(lock
->get_sm()->can_remote_xlock
);
1629 assert(!mut
->slave_request
);
1631 // wait for single auth
1632 if (lock
->get_parent()->is_ambiguous_auth()) {
1633 lock
->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
,
1634 new C_MDS_RetryRequest(mdcache
, mut
));
1638 // wait for active auth
1639 mds_rank_t auth
= lock
->get_parent()->authority().first
;
1640 if (mds
->is_cluster_degraded() &&
1641 !mds
->mdsmap
->is_clientreplay_or_active_or_stopping(auth
)) {
1642 dout(7) << " mds." << auth
<< " is not active" << dendl
;
1643 if (mut
->more()->waiting_on_slave
.empty())
1644 mds
->wait_for_active_peer(auth
, new C_MDS_RetryRequest(mdcache
, mut
));
1648 // send lock request
1649 mut
->more()->slaves
.insert(auth
);
1650 mut
->start_locking(lock
, auth
);
1651 MMDSSlaveRequest
*r
= new MMDSSlaveRequest(mut
->reqid
, mut
->attempt
,
1652 MMDSSlaveRequest::OP_XLOCK
);
1653 r
->set_lock_type(lock
->get_type());
1654 lock
->get_parent()->set_object_info(r
->get_object_info());
1655 mds
->send_message_mds(r
, auth
);
1657 assert(mut
->more()->waiting_on_slave
.count(auth
) == 0);
1658 mut
->more()->waiting_on_slave
.insert(auth
);
1664 void Locker::_finish_xlock(SimpleLock
*lock
, client_t xlocker
, bool *pneed_issue
)
1666 assert(!lock
->is_stable());
1667 if (lock
->get_num_rdlocks() == 0 &&
1668 lock
->get_num_wrlocks() == 0 &&
1669 lock
->get_num_client_lease() == 0 &&
1670 lock
->get_state() != LOCK_XLOCKSNAP
&&
1671 lock
->get_type() != CEPH_LOCK_DN
) {
1672 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
1673 client_t loner
= in
->get_target_loner();
1674 if (loner
>= 0 && (xlocker
< 0 || xlocker
== loner
)) {
1675 lock
->set_state(LOCK_EXCL
);
1676 lock
->get_parent()->auth_unpin(lock
);
1677 lock
->finish_waiters(SimpleLock::WAIT_STABLE
|SimpleLock::WAIT_WR
|SimpleLock::WAIT_RD
);
1678 if (lock
->get_cap_shift())
1679 *pneed_issue
= true;
1680 if (lock
->get_parent()->is_auth() &&
1682 try_eval(lock
, pneed_issue
);
1686 // the xlocker may have CEPH_CAP_GSHARED, need to revoke it if next state is LOCK_LOCK
1687 eval_gather(lock
, lock
->get_state() != LOCK_XLOCKSNAP
, pneed_issue
);
1690 void Locker::xlock_finish(SimpleLock
*lock
, MutationImpl
*mut
, bool *pneed_issue
)
1692 if (lock
->get_type() == CEPH_LOCK_IVERSION
||
1693 lock
->get_type() == CEPH_LOCK_DVERSION
)
1694 return local_xlock_finish(static_cast<LocalLock
*>(lock
), mut
);
1696 dout(10) << "xlock_finish on " << *lock
<< " " << *lock
->get_parent() << dendl
;
1698 client_t xlocker
= lock
->get_xlock_by_client();
1703 mut
->xlocks
.erase(lock
);
1704 mut
->locks
.erase(lock
);
1706 bool do_issue
= false;
1709 if (!lock
->get_parent()->is_auth()) {
1710 assert(lock
->get_sm()->can_remote_xlock
);
1713 dout(7) << "xlock_finish releasing remote xlock on " << *lock
->get_parent() << dendl
;
1714 mds_rank_t auth
= lock
->get_parent()->authority().first
;
1715 if (!mds
->is_cluster_degraded() ||
1716 mds
->mdsmap
->get_state(auth
) >= MDSMap::STATE_REJOIN
) {
1717 MMDSSlaveRequest
*slavereq
= new MMDSSlaveRequest(mut
->reqid
, mut
->attempt
,
1718 MMDSSlaveRequest::OP_UNXLOCK
);
1719 slavereq
->set_lock_type(lock
->get_type());
1720 lock
->get_parent()->set_object_info(slavereq
->get_object_info());
1721 mds
->send_message_mds(slavereq
, auth
);
1724 lock
->finish_waiters(SimpleLock::WAIT_STABLE
|
1725 SimpleLock::WAIT_WR
|
1726 SimpleLock::WAIT_RD
, 0);
1728 if (lock
->get_num_xlocks() == 0) {
1729 if (lock
->get_state() == LOCK_LOCK_XLOCK
)
1730 lock
->set_state(LOCK_XLOCKDONE
);
1731 _finish_xlock(lock
, xlocker
, &do_issue
);
1736 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
1737 if (in
->is_head()) {
1739 *pneed_issue
= true;
1746 void Locker::xlock_export(SimpleLock
*lock
, MutationImpl
*mut
)
1748 dout(10) << "xlock_export on " << *lock
<< " " << *lock
->get_parent() << dendl
;
1751 mut
->xlocks
.erase(lock
);
1752 mut
->locks
.erase(lock
);
1754 MDSCacheObject
*p
= lock
->get_parent();
1755 assert(p
->state_test(CInode::STATE_AMBIGUOUSAUTH
)); // we are exporting this (inode)
1757 if (!lock
->is_stable())
1758 lock
->get_parent()->auth_unpin(lock
);
1760 lock
->set_state(LOCK_LOCK
);
1763 void Locker::xlock_import(SimpleLock
*lock
)
1765 dout(10) << "xlock_import on " << *lock
<< " " << *lock
->get_parent() << dendl
;
1766 lock
->get_parent()->auth_pin(lock
);
1771 // file i/o -----------------------------------------
1773 version_t
Locker::issue_file_data_version(CInode
*in
)
1775 dout(7) << "issue_file_data_version on " << *in
<< dendl
;
1776 return in
->inode
.file_data_version
;
1779 class C_Locker_FileUpdate_finish
: public LockerLogContext
{
1787 C_Locker_FileUpdate_finish(Locker
*l
, CInode
*i
, MutationRef
& m
,
1788 bool sm
=false, bool ni
=false, client_t c
=-1,
1789 MClientCaps
*ac
= 0)
1790 : LockerLogContext(l
), in(i
), mut(m
), share_max(sm
), need_issue(ni
),
1791 client(c
), ack(ac
) {
1792 in
->get(CInode::PIN_PTRWAITER
);
1794 void finish(int r
) override
{
1795 locker
->file_update_finish(in
, mut
, share_max
, need_issue
, client
, ack
);
1796 in
->put(CInode::PIN_PTRWAITER
);
1800 void Locker::file_update_finish(CInode
*in
, MutationRef
& mut
, bool share_max
, bool issue_client_cap
,
1801 client_t client
, MClientCaps
*ack
)
1803 dout(10) << "file_update_finish on " << *in
<< dendl
;
1804 in
->pop_and_dirty_projected_inode(mut
->ls
);
1809 Session
*session
= mds
->get_session(client
);
1811 // "oldest flush tid" > 0 means client uses unique TID for each flush
1812 if (ack
->get_oldest_flush_tid() > 0)
1813 session
->add_completed_flush(ack
->get_client_tid());
1814 mds
->send_message_client_counted(ack
, session
);
1816 dout(10) << " no session for client." << client
<< " " << *ack
<< dendl
;
1821 set
<CInode
*> need_issue
;
1822 drop_locks(mut
.get(), &need_issue
);
1824 if (!in
->is_head() && !in
->client_snap_caps
.empty()) {
1825 dout(10) << " client_snap_caps " << in
->client_snap_caps
<< dendl
;
1826 // check for snap writeback completion
1827 bool gather
= false;
1828 compact_map
<int,set
<client_t
> >::iterator p
= in
->client_snap_caps
.begin();
1829 while (p
!= in
->client_snap_caps
.end()) {
1830 SimpleLock
*lock
= in
->get_lock(p
->first
);
1832 dout(10) << " completing client_snap_caps for " << ccap_string(p
->first
)
1833 << " lock " << *lock
<< " on " << *in
<< dendl
;
1836 p
->second
.erase(client
);
1837 if (p
->second
.empty()) {
1839 in
->client_snap_caps
.erase(p
++);
1844 if (in
->client_snap_caps
.empty())
1845 in
->item_open_file
.remove_myself();
1846 eval_cap_gather(in
, &need_issue
);
1849 if (issue_client_cap
&& need_issue
.count(in
) == 0) {
1850 Capability
*cap
= in
->get_client_cap(client
);
1851 if (cap
&& (cap
->wanted() & ~cap
->pending()))
1852 issue_caps(in
, cap
);
1855 if (share_max
&& in
->is_auth() &&
1856 (in
->filelock
.gcaps_allowed(CAP_LONER
) & (CEPH_CAP_GWR
|CEPH_CAP_GBUFFER
)))
1857 share_inode_max_size(in
);
1859 issue_caps_set(need_issue
);
1861 // auth unpin after issuing caps
1865 Capability
* Locker::issue_new_caps(CInode
*in
,
1871 dout(7) << "issue_new_caps for mode " << mode
<< " on " << *in
<< dendl
;
1874 // if replay, try to reconnect cap, and otherwise do nothing.
1876 mds
->mdcache
->try_reconnect_cap(in
, session
);
1881 assert(session
->info
.inst
.name
.is_client());
1882 client_t my_client
= session
->info
.inst
.name
.num();
1883 int my_want
= ceph_caps_for_mode(mode
);
1885 // register a capability
1886 Capability
*cap
= in
->get_client_cap(my_client
);
1889 cap
= in
->add_client_cap(my_client
, session
, realm
);
1890 cap
->set_wanted(my_want
);
1892 cap
->inc_suppress(); // suppress file cap messages for new cap (we'll bundle with the open() reply)
1896 // make sure it wants sufficient caps
1897 if (my_want
& ~cap
->wanted()) {
1898 // augment wanted caps for this client
1899 cap
->set_wanted(cap
->wanted() | my_want
);
1903 if (in
->is_auth()) {
1904 // [auth] twiddle mode?
1905 eval(in
, CEPH_CAP_LOCKS
);
1907 if (_need_flush_mdlog(in
, my_want
))
1908 mds
->mdlog
->flush();
1911 // [replica] tell auth about any new caps wanted
1912 request_inode_file_caps(in
);
1915 // issue caps (pot. incl new one)
1916 //issue_caps(in); // note: _eval above may have done this already...
1918 // re-issue whatever we can
1919 //cap->issue(cap->pending());
1922 cap
->dec_suppress();
1928 void Locker::issue_caps_set(set
<CInode
*>& inset
)
1930 for (set
<CInode
*>::iterator p
= inset
.begin(); p
!= inset
.end(); ++p
)
1934 bool Locker::issue_caps(CInode
*in
, Capability
*only_cap
)
1936 // allowed caps are determined by the lock mode.
1937 int all_allowed
= in
->get_caps_allowed_by_type(CAP_ANY
);
1938 int loner_allowed
= in
->get_caps_allowed_by_type(CAP_LONER
);
1939 int xlocker_allowed
= in
->get_caps_allowed_by_type(CAP_XLOCKER
);
1941 client_t loner
= in
->get_loner();
1943 dout(7) << "issue_caps loner client." << loner
1944 << " allowed=" << ccap_string(loner_allowed
)
1945 << ", xlocker allowed=" << ccap_string(xlocker_allowed
)
1946 << ", others allowed=" << ccap_string(all_allowed
)
1947 << " on " << *in
<< dendl
;
1949 dout(7) << "issue_caps allowed=" << ccap_string(all_allowed
)
1950 << ", xlocker allowed=" << ccap_string(xlocker_allowed
)
1951 << " on " << *in
<< dendl
;
1954 assert(in
->is_head());
1956 // count conflicts with
1960 map
<client_t
, Capability
*>::iterator it
;
1962 it
= in
->client_caps
.find(only_cap
->get_client());
1964 it
= in
->client_caps
.begin();
1965 for (; it
!= in
->client_caps
.end(); ++it
) {
1966 Capability
*cap
= it
->second
;
1967 if (cap
->is_stale())
1970 // do not issue _new_ bits when size|mtime is projected
1972 if (loner
== it
->first
)
1973 allowed
= loner_allowed
;
1975 allowed
= all_allowed
;
1977 // add in any xlocker-only caps (for locks this client is the xlocker for)
1978 allowed
|= xlocker_allowed
& in
->get_xlocker_mask(it
->first
);
1980 Session
*session
= mds
->get_session(it
->first
);
1981 if (in
->inode
.inline_data
.version
!= CEPH_INLINE_NONE
&&
1982 !(session
&& session
->connection
&&
1983 session
->connection
->has_feature(CEPH_FEATURE_MDS_INLINE_DATA
)))
1984 allowed
&= ~(CEPH_CAP_FILE_RD
| CEPH_CAP_FILE_WR
);
1986 int pending
= cap
->pending();
1987 int wanted
= cap
->wanted();
1989 dout(20) << " client." << it
->first
1990 << " pending " << ccap_string(pending
)
1991 << " allowed " << ccap_string(allowed
)
1992 << " wanted " << ccap_string(wanted
)
1995 if (!(pending
& ~allowed
)) {
1996 // skip if suppress or new, and not revocation
1997 if (cap
->is_new() || cap
->is_suppress()) {
1998 dout(20) << " !revoke and new|suppressed, skipping client." << it
->first
<< dendl
;
2003 // notify clients about deleted inode, to make sure they release caps ASAP.
2004 if (in
->inode
.nlink
== 0)
2005 wanted
|= CEPH_CAP_LINK_SHARED
;
2007 // are there caps that the client _wants_ and can have, but aren't pending?
2008 // or do we need to revoke?
2009 if (((wanted
& allowed
) & ~pending
) || // missing wanted+allowed caps
2010 (pending
& ~allowed
)) { // need to revoke ~allowed caps.
2014 // include caps that clients generally like, while we're at it.
2015 int likes
= in
->get_caps_liked();
2016 int before
= pending
;
2018 if (pending
& ~allowed
)
2019 seq
= cap
->issue((wanted
|likes
) & allowed
& pending
); // if revoking, don't issue anything new.
2021 seq
= cap
->issue((wanted
|likes
) & allowed
);
2022 int after
= cap
->pending();
2024 if (cap
->is_new()) {
2025 // haven't send caps to client yet
2026 if (before
& ~after
)
2027 cap
->confirm_receipt(seq
, after
);
2029 dout(7) << " sending MClientCaps to client." << it
->first
2030 << " seq " << cap
->get_last_seq()
2031 << " new pending " << ccap_string(after
) << " was " << ccap_string(before
)
2034 int op
= (before
& ~after
) ? CEPH_CAP_OP_REVOKE
: CEPH_CAP_OP_GRANT
;
2035 if (op
== CEPH_CAP_OP_REVOKE
) {
2036 revoking_caps
.push_back(&cap
->item_revoking_caps
);
2037 revoking_caps_by_client
[cap
->get_client()].push_back(&cap
->item_client_revoking_caps
);
2038 cap
->set_last_revoke_stamp(ceph_clock_now());
2039 cap
->reset_num_revoke_warnings();
2042 MClientCaps
*m
= new MClientCaps(op
, in
->ino(),
2043 in
->find_snaprealm()->inode
->ino(),
2044 cap
->get_cap_id(), cap
->get_last_seq(),
2047 mds
->get_osd_epoch_barrier());
2048 in
->encode_cap_message(m
, cap
);
2050 mds
->send_message_client_counted(m
, it
->first
);
2058 return (nissued
== 0); // true if no re-issued, no callbacks
2061 void Locker::issue_truncate(CInode
*in
)
2063 dout(7) << "issue_truncate on " << *in
<< dendl
;
2065 for (map
<client_t
, Capability
*>::iterator it
= in
->client_caps
.begin();
2066 it
!= in
->client_caps
.end();
2068 Capability
*cap
= it
->second
;
2069 MClientCaps
*m
= new MClientCaps(CEPH_CAP_OP_TRUNC
,
2071 in
->find_snaprealm()->inode
->ino(),
2072 cap
->get_cap_id(), cap
->get_last_seq(),
2073 cap
->pending(), cap
->wanted(), 0,
2075 mds
->get_osd_epoch_barrier());
2076 in
->encode_cap_message(m
, cap
);
2077 mds
->send_message_client_counted(m
, it
->first
);
2080 // should we increase max_size?
2081 if (in
->is_auth() && in
->is_file())
2082 check_inode_max_size(in
);
2086 void Locker::revoke_stale_caps(Capability
*cap
)
2088 CInode
*in
= cap
->get_inode();
2089 if (in
->state_test(CInode::STATE_EXPORTINGCAPS
)) {
2090 // if export succeeds, the cap will be removed. if export fails, we need to
2091 // revoke the cap if it's still stale.
2092 in
->state_set(CInode::STATE_EVALSTALECAPS
);
2096 int issued
= cap
->issued();
2097 if (issued
& ~CEPH_CAP_PIN
) {
2098 dout(10) << " revoking " << ccap_string(issued
) << " on " << *in
<< dendl
;
2101 if (in
->is_auth() &&
2102 in
->inode
.client_ranges
.count(cap
->get_client()))
2103 in
->state_set(CInode::STATE_NEEDSRECOVER
);
2105 if (!in
->filelock
.is_stable()) eval_gather(&in
->filelock
);
2106 if (!in
->linklock
.is_stable()) eval_gather(&in
->linklock
);
2107 if (!in
->authlock
.is_stable()) eval_gather(&in
->authlock
);
2108 if (!in
->xattrlock
.is_stable()) eval_gather(&in
->xattrlock
);
2110 if (in
->is_auth()) {
2111 try_eval(in
, CEPH_CAP_LOCKS
);
2113 request_inode_file_caps(in
);
2118 void Locker::revoke_stale_caps(Session
*session
)
2120 dout(10) << "revoke_stale_caps for " << session
->info
.inst
.name
<< dendl
;
2122 for (xlist
<Capability
*>::iterator p
= session
->caps
.begin(); !p
.end(); ++p
) {
2123 Capability
*cap
= *p
;
2125 revoke_stale_caps(cap
);
2129 void Locker::resume_stale_caps(Session
*session
)
2131 dout(10) << "resume_stale_caps for " << session
->info
.inst
.name
<< dendl
;
2133 for (xlist
<Capability
*>::iterator p
= session
->caps
.begin(); !p
.end(); ++p
) {
2134 Capability
*cap
= *p
;
2135 CInode
*in
= cap
->get_inode();
2136 assert(in
->is_head());
2137 if (cap
->is_stale()) {
2138 dout(10) << " clearing stale flag on " << *in
<< dendl
;
2141 if (in
->state_test(CInode::STATE_EXPORTINGCAPS
)) {
2142 // if export succeeds, the cap will be removed. if export fails,
2143 // we need to re-issue the cap if it's not stale.
2144 in
->state_set(CInode::STATE_EVALSTALECAPS
);
2148 if (!in
->is_auth() || !eval(in
, CEPH_CAP_LOCKS
))
2149 issue_caps(in
, cap
);
2154 void Locker::remove_stale_leases(Session
*session
)
2156 dout(10) << "remove_stale_leases for " << session
->info
.inst
.name
<< dendl
;
2157 xlist
<ClientLease
*>::iterator p
= session
->leases
.begin();
2159 ClientLease
*l
= *p
;
2161 CDentry
*parent
= static_cast<CDentry
*>(l
->parent
);
2162 dout(15) << " removing lease on " << *parent
<< dendl
;
2163 parent
->remove_client_lease(l
, this);
2168 class C_MDL_RequestInodeFileCaps
: public LockerContext
{
2171 C_MDL_RequestInodeFileCaps(Locker
*l
, CInode
*i
) : LockerContext(l
), in(i
) {
2172 in
->get(CInode::PIN_PTRWAITER
);
2174 void finish(int r
) override
{
2176 locker
->request_inode_file_caps(in
);
2177 in
->put(CInode::PIN_PTRWAITER
);
2181 void Locker::request_inode_file_caps(CInode
*in
)
2183 assert(!in
->is_auth());
2185 int wanted
= in
->get_caps_wanted() & ~CEPH_CAP_PIN
;
2186 if (wanted
!= in
->replica_caps_wanted
) {
2187 // wait for single auth
2188 if (in
->is_ambiguous_auth()) {
2189 in
->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
,
2190 new C_MDL_RequestInodeFileCaps(this, in
));
2194 mds_rank_t auth
= in
->authority().first
;
2195 if (mds
->is_cluster_degraded() &&
2196 mds
->mdsmap
->get_state(auth
) == MDSMap::STATE_REJOIN
) {
2197 mds
->wait_for_active_peer(auth
, new C_MDL_RequestInodeFileCaps(this, in
));
2201 dout(7) << "request_inode_file_caps " << ccap_string(wanted
)
2202 << " was " << ccap_string(in
->replica_caps_wanted
)
2203 << " on " << *in
<< " to mds." << auth
<< dendl
;
2205 in
->replica_caps_wanted
= wanted
;
2207 if (!mds
->is_cluster_degraded() ||
2208 mds
->mdsmap
->is_clientreplay_or_active_or_stopping(auth
))
2209 mds
->send_message_mds(new MInodeFileCaps(in
->ino(), in
->replica_caps_wanted
),
2214 /* This function DOES put the passed message before returning */
2215 void Locker::handle_inode_file_caps(MInodeFileCaps
*m
)
2217 // nobody should be talking to us during recovery.
2218 assert(mds
->is_clientreplay() || mds
->is_active() || mds
->is_stopping());
2221 CInode
*in
= mdcache
->get_inode(m
->get_ino());
2222 mds_rank_t from
= mds_rank_t(m
->get_source().num());
2225 assert(in
->is_auth());
2227 dout(7) << "handle_inode_file_caps replica mds." << from
<< " wants caps " << ccap_string(m
->get_caps()) << " on " << *in
<< dendl
;
2230 in
->mds_caps_wanted
[from
] = m
->get_caps();
2232 in
->mds_caps_wanted
.erase(from
);
2234 try_eval(in
, CEPH_CAP_LOCKS
);
// Deferred-retry context for Locker::check_inode_max_size(): pins the inode
// (PIN_PTRWAITER) for the lifetime of the waiter, and on finish() re-runs
// check_inode_max_size with the captured new_max_size/newsize/mtime.
// NOTE(review): member declarations for the inode/newsize/mtime fields
// (original lines 2240, 2242-2245) are missing from this mangled chunk;
// code left byte-identical.
2239 class C_MDL_CheckMaxSize
: public LockerContext
{
2241 uint64_t new_max_size
;
2246 C_MDL_CheckMaxSize(Locker
*l
, CInode
*i
, uint64_t _new_max_size
,
2247 uint64_t _newsize
, utime_t _mtime
) :
2248 LockerContext(l
), in(i
),
2249 new_max_size(_new_max_size
), newsize(_newsize
), mtime(_mtime
)
2251 in
->get(CInode::PIN_PTRWAITER
);
2253 void finish(int r
) override
{
2255 locker
->check_inode_max_size(in
, false, new_max_size
, newsize
, mtime
);
2256 in
->put(CInode::PIN_PTRWAITER
);
// Compute a new client-writeable max_size for a file: roughly double the
// current size ((size+1)<<1), capped by mds_client_writeable_range_max_inc_objs
// layout objects beyond the current size, then rounded up to a whole layout
// object.  NOTE(review): the mangled paste drops original lines 2261, 2264,
// 2267 — presumably braces and a guard around the max_inc cap; confirm
// against upstream before relying on exact semantics.
2260 uint64_t Locker::calc_new_max_size(inode_t
*pi
, uint64_t size
)
2262 uint64_t new_max
= (size
+ 1) << 1;
2263 uint64_t max_inc
= g_conf
->mds_client_writeable_range_max_inc_objs
;
2265 max_inc
*= pi
->get_layout_size_increment();
2266 new_max
= MIN(new_max
, size
+ max_inc
);
2268 return ROUND_UP_TO(new_max
, pi
->get_layout_size_increment());
// Build the new per-client writeable-range map for an inode: for every
// client whose cap issues-or-wants FILE_WR|FILE_BUFFER, grant a range ending
// at ms (from calc_new_max_size), growing but never shrinking an existing
// range (MAX(ms, oldr.range.last)); sets *max_increased when any range grew
// or a new client entry appeared.  Clients without WR|BUFFER simply get no
// entry in *new_ranges (i.e. shrink to 0).
// NOTE(review): mangled paste — declaration of ms and several brace/loop
// lines (2274, 2276, 2279-2283, 2288, 2298, 2300, 2302+) are missing;
// code left byte-identical.
2271 void Locker::calc_new_client_ranges(CInode
*in
, uint64_t size
,
2272 map
<client_t
,client_writeable_range_t
> *new_ranges
,
2273 bool *max_increased
)
2275 inode_t
*latest
= in
->get_projected_inode();
2277 if(latest
->has_layout()) {
2278 ms
= calc_new_max_size(latest
, size
);
2280 // Layout-less directories like ~mds0/, have zero size
2284 // increase ranges as appropriate.
2285 // shrink to 0 if no WR|BUFFER caps issued.
2286 for (map
<client_t
,Capability
*>::iterator p
= in
->client_caps
.begin();
2287 p
!= in
->client_caps
.end();
2289 if ((p
->second
->issued() | p
->second
->wanted()) & (CEPH_CAP_FILE_WR
|CEPH_CAP_FILE_BUFFER
)) {
2290 client_writeable_range_t
& nr
= (*new_ranges
)[p
->first
];
2292 if (latest
->client_ranges
.count(p
->first
)) {
2293 client_writeable_range_t
& oldr
= latest
->client_ranges
[p
->first
];
2294 if (ms
> oldr
.range
.last
)
2295 *max_increased
= true;
2296 nr
.range
.last
= MAX(ms
, oldr
.range
.last
);
2297 nr
.follows
= oldr
.follows
;
2299 *max_increased
= true;
2301 nr
.follows
= in
->first
- 1;
// Auth-side: reconcile an inode's size/mtime/client writeable ranges and
// journal the result.  Computes new ranges via calc_new_client_ranges; no-ops
// if neither size nor max changed; defers (via C_MDL_CheckMaxSize waiter) if
// the inode is frozen or the filelock cannot be wrlocked; otherwise projects
// the inode, updates client_ranges/size/rstat.rbytes/mtime, journals with
// EOpen (file still open at head) or EUpdate, wrlock_force's the filelock for
// the duration of the journal entry, and flushes the log so max_size
// increases reach clients promptly.
// NOTE(review): mangled paste with many dropped lines (returns, waiter
// constructor arguments at 2344/2361, the `le` declaration near 2405, etc.);
// code left byte-identical, comments only.
2307 bool Locker::check_inode_max_size(CInode
*in
, bool force_wrlock
,
2308 uint64_t new_max_size
, uint64_t new_size
,
2311 assert(in
->is_auth());
2312 assert(in
->is_file());
2314 inode_t
*latest
= in
->get_projected_inode();
2315 map
<client_t
, client_writeable_range_t
> new_ranges
;
2316 uint64_t size
= latest
->size
;
2317 bool update_size
= new_size
> 0;
2318 bool update_max
= false;
2319 bool max_increased
= false;
// Size/mtime only ever move forward here.
2322 new_size
= size
= MAX(size
, new_size
);
2323 new_mtime
= MAX(new_mtime
, latest
->mtime
);
2324 if (latest
->size
== new_size
&& latest
->mtime
== new_mtime
)
2325 update_size
= false;
2328 calc_new_client_ranges(in
, max(new_max_size
, size
), &new_ranges
, &max_increased
);
2330 if (max_increased
|| latest
->client_ranges
!= new_ranges
)
2333 if (!update_size
&& !update_max
) {
2334 dout(20) << "check_inode_max_size no-op on " << *in
<< dendl
;
2338 dout(10) << "check_inode_max_size new_ranges " << new_ranges
2339 << " update_size " << update_size
2340 << " on " << *in
<< dendl
;
// Frozen inode: queue a retry context and bail.
2342 if (in
->is_frozen()) {
2343 dout(10) << "check_inode_max_size frozen, waiting on " << *in
<< dendl
;
2344 C_MDL_CheckMaxSize
*cms
= new C_MDL_CheckMaxSize(this, in
,
2348 in
->add_waiter(CInode::WAIT_UNFREEZE
, cms
);
// Can't wrlock the filelock: nudge it toward a wrlockable state
// (EXCL for a loner, else simple_lock) and retry when stable.
2351 if (!force_wrlock
&& !in
->filelock
.can_wrlock(in
->get_loner())) {
2353 if (in
->filelock
.is_stable()) {
2354 if (in
->get_target_loner() >= 0)
2355 file_excl(&in
->filelock
);
2357 simple_lock(&in
->filelock
);
2359 if (!in
->filelock
.can_wrlock(in
->get_loner())) {
2361 C_MDL_CheckMaxSize
*cms
= new C_MDL_CheckMaxSize(this, in
,
2366 in
->filelock
.add_waiter(SimpleLock::WAIT_STABLE
, cms
);
2367 dout(10) << "check_inode_max_size can't wrlock, waiting on " << *in
<< dendl
;
2372 MutationRef
mut(new MutationImpl());
2373 mut
->ls
= mds
->mdlog
->get_current_segment();
2375 inode_t
*pi
= in
->project_inode();
2376 pi
->version
= in
->pre_dirty();
2379 dout(10) << "check_inode_max_size client_ranges " << pi
->client_ranges
<< " -> " << new_ranges
<< dendl
;
2380 pi
->client_ranges
= new_ranges
;
2384 dout(10) << "check_inode_max_size size " << pi
->size
<< " -> " << new_size
<< dendl
;
2385 pi
->size
= new_size
;
2386 pi
->rstat
.rbytes
= new_size
;
2387 dout(10) << "check_inode_max_size mtime " << pi
->mtime
<< " -> " << new_mtime
<< dendl
;
2388 pi
->mtime
= new_mtime
;
2391 // use EOpen if the file is still open; otherwise, use EUpdate.
2392 // this is just an optimization to push open files forward into
2393 // newer log segments.
2395 EMetaBlob
*metablob
;
2396 if (in
->is_any_caps_wanted() && in
->last
== CEPH_NOSNAP
) {
2397 EOpen
*eo
= new EOpen(mds
->mdlog
);
2398 eo
->add_ino(in
->ino());
2399 metablob
= &eo
->metablob
;
2401 mut
->ls
->open_files
.push_back(&in
->item_open_file
);
2403 EUpdate
*eu
= new EUpdate(mds
->mdlog
, "check_inode_max_size");
2404 metablob
= &eu
->metablob
;
2407 mds
->mdlog
->start_entry(le
);
2408 if (update_size
) { // FIXME if/when we do max_size nested accounting
2409 mdcache
->predirty_journal_parents(mut
, metablob
, in
, 0, PREDIRTY_PRIMARY
);
2411 CDentry
*parent
= in
->get_projected_parent_dn();
2412 metablob
->add_primary_dentry(parent
, in
, true);
2414 metablob
->add_dir_context(in
->get_projected_parent_dn()->get_dir());
2415 mdcache
->journal_dirty_inode(mut
.get(), metablob
, in
);
2417 mds
->mdlog
->submit_entry(le
,
2418 new C_Locker_FileUpdate_finish(this, in
, mut
, true));
2419 wrlock_force(&in
->filelock
, mut
); // wrlock for duration of journal
2422 // make max_size _increase_ timely
2424 mds
->mdlog
->flush();
// Broadcast the (new) max_size to clients holding WR|BUFFER caps on this
// inode via CEPH_CAP_OP_GRANT MClientCaps messages; if only_cap is set,
// restrict to that single client's cap.  Suppressed caps are skipped;
// each message bumps the cap's last_seq.
// NOTE(review): mangled paste; loop/brace lines (2439, 2441, 2447, 2452,
// 2456, 2460+) missing; code left byte-identical.
2430 void Locker::share_inode_max_size(CInode
*in
, Capability
*only_cap
)
2433 * only share if currently issued a WR cap. if client doesn't have it,
2434 * file_max doesn't matter, and the client will get it if/when they get
2437 dout(10) << "share_inode_max_size on " << *in
<< dendl
;
2438 map
<client_t
, Capability
*>::iterator it
;
2440 it
= in
->client_caps
.find(only_cap
->get_client());
2442 it
= in
->client_caps
.begin();
2443 for (; it
!= in
->client_caps
.end(); ++it
) {
2444 const client_t client
= it
->first
;
2445 Capability
*cap
= it
->second
;
2446 if (cap
->is_suppress())
2448 if (cap
->pending() & (CEPH_CAP_FILE_WR
|CEPH_CAP_FILE_BUFFER
)) {
2449 dout(10) << "share_inode_max_size with client." << client
<< dendl
;
2450 cap
->inc_last_seq();
2451 MClientCaps
*m
= new MClientCaps(CEPH_CAP_OP_GRANT
,
2453 in
->find_snaprealm()->inode
->ino(),
2454 cap
->get_cap_id(), cap
->get_last_seq(),
2455 cap
->pending(), cap
->wanted(), 0,
2457 mds
->get_osd_epoch_barrier());
2458 in
->encode_cap_message(m
, cap
);
2459 mds
->send_message_client_counted(m
, client
);
// Return whether granting `wanted` caps is blocked on a journal flush:
// true when any lock family the wanted caps map to (file, auth, link,
// xattr) is unstable and held by pending (journaled-but-unflushed)
// mutations.  NOTE(review): mangled paste — the `return true;` /
// `return false;` lines after the condition are missing; code left
// byte-identical.
2466 bool Locker::_need_flush_mdlog(CInode
*in
, int wanted
)
2468 /* flush log if caps are wanted by client but corresponding lock is unstable and locked by
2469 * pending mutations. */
2470 if (((wanted
& (CEPH_CAP_FILE_RD
|CEPH_CAP_FILE_WR
|CEPH_CAP_FILE_SHARED
|CEPH_CAP_FILE_EXCL
)) &&
2471 in
->filelock
.is_unstable_and_locked()) ||
2472 ((wanted
& (CEPH_CAP_AUTH_SHARED
|CEPH_CAP_AUTH_EXCL
)) &&
2473 in
->authlock
.is_unstable_and_locked()) ||
2474 ((wanted
& (CEPH_CAP_LINK_SHARED
|CEPH_CAP_LINK_EXCL
)) &&
2475 in
->linklock
.is_unstable_and_locked()) ||
2476 ((wanted
& (CEPH_CAP_XATTR_SHARED
|CEPH_CAP_XATTR_EXCL
)) &&
2477 in
->xattrlock
.is_unstable_and_locked()))
// Update a cap's "wanted" mask from a client message, guarded by issue_seq:
// on an exact seq match the wanted mask is replaced; on a mismatch only
// *additional* bits are OR-ed in (never dropped), otherwise the change is
// ignored.  Then maintains bookkeeping on the inode: replicas forward the
// change to the auth MDS (request_inode_file_caps); wanted==0 removes the
// inode from the open-file list; otherwise a recovering inode is prioritized
// and, if newly wanted, the inode is journaled onto the open-file list via
// an EOpen entry.
// NOTE(review): mangled paste; else/brace and return lines (2483, 2493,
// 2498-2500, 2504-2506, 2512-2513, 2518-2519, 2529+) missing; code left
// byte-identical.
2482 void Locker::adjust_cap_wanted(Capability
*cap
, int wanted
, int issue_seq
)
2484 if (ceph_seq_cmp(issue_seq
, cap
->get_last_issue()) == 0) {
2485 dout(10) << " wanted " << ccap_string(cap
->wanted())
2486 << " -> " << ccap_string(wanted
) << dendl
;
2487 cap
->set_wanted(wanted
);
2488 } else if (wanted
& ~cap
->wanted()) {
2489 dout(10) << " wanted " << ccap_string(cap
->wanted())
2490 << " -> " << ccap_string(wanted
)
2491 << " (added caps even though we had seq mismatch!)" << dendl
;
2492 cap
->set_wanted(wanted
| cap
->wanted());
2494 dout(10) << " NOT changing wanted " << ccap_string(cap
->wanted())
2495 << " -> " << ccap_string(wanted
)
2496 << " (issue_seq " << issue_seq
<< " != last_issue "
2497 << cap
->get_last_issue() << ")" << dendl
;
2501 CInode
*cur
= cap
->get_inode();
2502 if (!cur
->is_auth()) {
2503 request_inode_file_caps(cur
);
2507 if (cap
->wanted() == 0) {
2508 if (cur
->item_open_file
.is_on_list() &&
2509 !cur
->is_any_caps_wanted()) {
2510 dout(10) << " removing unwanted file from open file list " << *cur
<< dendl
;
2511 cur
->item_open_file
.remove_myself();
2514 if (cur
->state_test(CInode::STATE_RECOVERING
) &&
2515 (cap
->wanted() & (CEPH_CAP_FILE_RD
|
2516 CEPH_CAP_FILE_WR
))) {
2517 mds
->mdcache
->recovery_queue
.prioritize(cur
);
2520 if (!cur
->item_open_file
.is_on_list()) {
2521 dout(10) << " adding to open file list " << *cur
<< dendl
;
2522 assert(cur
->last
== CEPH_NOSNAP
);
2523 LogSegment
*ls
= mds
->mdlog
->get_current_segment();
2524 EOpen
*le
= new EOpen(mds
->mdlog
);
2525 mds
->mdlog
->start_entry(le
);
2526 le
->add_clean_inode(cur
);
2527 ls
->open_files
.push_back(&cur
->item_open_file
);
2528 mds
->mdlog
->submit_entry(le
);
// Synthesize "NULL" snapflushes for a client that will never send real ones:
// walk head_in->client_need_snapflush, and for each snapid this client owes a
// flush for, find the covering snapped inode (searching forward through later
// snapids when the direct lookup misses), apply an empty _do_snap_update
// (dirty=0, no message/ack), and clear the need-snapflush record.
// NOTE(review): mangled paste; loop increment, brace, and multiversion
// fallthrough lines (2543, 2547, 2553, 2556, 2558-2559, 2561, 2563-2565,
// 2568+) missing; code left byte-identical.
2535 void Locker::_do_null_snapflush(CInode
*head_in
, client_t client
)
2537 dout(10) << "_do_null_snapflush client." << client
<< " on " << *head_in
<< dendl
;
2538 compact_map
<snapid_t
, set
<client_t
> >::iterator p
= head_in
->client_need_snapflush
.begin();
2539 while (p
!= head_in
->client_need_snapflush
.end()) {
2540 snapid_t snapid
= p
->first
;
2541 set
<client_t
>& clients
= p
->second
;
2542 ++p
; // be careful, q loop below depends on this
2544 if (clients
.count(client
)) {
2545 dout(10) << " doing async NULL snapflush on " << snapid
<< " from client." << client
<< dendl
;
2546 CInode
*sin
= mdcache
->get_inode(head_in
->ino(), snapid
);
2548 // hrm, look forward until we find the inode.
2549 // (we can only look it up by the last snapid it is valid for)
2550 dout(10) << " didn't have " << head_in
->ino() << " snapid " << snapid
<< dendl
;
2551 for (compact_map
<snapid_t
, set
<client_t
> >::iterator q
= p
; // p is already at next entry
2552 q
!= head_in
->client_need_snapflush
.end();
2554 dout(10) << " trying snapid " << q
->first
<< dendl
;
2555 sin
= mdcache
->get_inode(head_in
->ino(), q
->first
);
2557 assert(sin
->first
<= snapid
);
2560 dout(10) << " didn't have " << head_in
->ino() << " snapid " << q
->first
<< dendl
;
2562 if (!sin
&& head_in
->is_multiversion())
// dirty=0, no client message and no ack — this is the "null" flush.
2566 _do_snap_update(sin
, snapid
, 0, sin
->first
- 1, client
, NULL
, NULL
);
2567 head_in
->remove_need_snapflush(sin
, snapid
, client
);
// Decide whether a client cap message for `in` must be deferred because the
// inode is frozen, or is freezing with no auth pins (see the policy notes
// below — must be at least as permissive as client-request admission to
// avoid a release/revoke deadlock).
2573 bool Locker::should_defer_client_cap_frozen(CInode
*in
)
2576 * This policy needs to be AT LEAST as permissive as allowing a client request
2577 * to go forward, or else a client request can release something, the release
2578 * gets deferred, but the request gets processed and deadlocks because when the
2579 * caps can't get revoked.
2581 * Currently, a request wait if anything locked is freezing (can't
2582 * auth_pin), which would avoid any deadlock with cap release. Thus @in
2583 * _MUST_ be in the lock/auth_pin set.
2585 * auth_pins==0 implies no unstable lock and not auth pinnned by
2586 * client request, otherwise continue even it's freezing.
2588 return (in
->is_freezing() && in
->get_num_auth_pins() == 0) || in
->is_frozen();
// Main dispatcher for CEPH_MSG_CLIENT_CAPS.  Visible flow in this chunk:
// validate MDS state and session; during reconnect, stash reconnected dirty
// caps and retry after replay; short-circuit already-completed flush tids
// with an immediate FLUSHSNAP_ACK/FLUSH_ACK (converting dup FLUSHSNAP to
// UPDATE so cap releases still apply); trim completed-flush records and warn
// clients that never advance oldest_flush_tid; resolve the head inode and
// the snap-specific cap inode (pick_inode_snap when follows != 0 —
// presumably, the guard line is missing); defer on freezing/frozen inodes;
// drop stale-mseq messages; handle FLUSHSNAP (auth only: prepare ack, apply
// _do_snap_update or report an unexpected snap); then for cap UPDATE/FLUSH:
// walk intermediate snap inodes applying dirty metadata, confirm receipt,
// run null-snapflush inference, build a FLUSH_ACK for dirty data, filter and
// adjust wanted caps, call _do_cap_update, re-eval locks / reissue caps, and
// flush the mdlog when needed.
// NOTE(review): this block is heavily whitespace-mangled and many original
// lines (returns, braces, several guards) are dropped — code left
// byte-identical; confirm details against upstream Locker.cc.
2592 * This function DOES put the passed message before returning
2594 void Locker::handle_client_caps(MClientCaps
*m
)
2596 Session
*session
= static_cast<Session
*>(m
->get_connection()->get_priv());
2597 client_t client
= m
->get_source().num();
2599 snapid_t follows
= m
->get_snap_follows();
2600 dout(7) << "handle_client_caps "
2601 << ((m
->flags
& CLIENT_CAPS_SYNC
) ? "sync" : "async")
2602 << " on " << m
->get_ino()
2603 << " tid " << m
->get_client_tid() << " follows " << follows
2604 << " op " << ceph_cap_op_name(m
->get_op()) << dendl
;
2606 if (!mds
->is_clientreplay() && !mds
->is_active() && !mds
->is_stopping()) {
2608 dout(5) << " no session, dropping " << *m
<< dendl
;
2612 if (session
->is_closed() ||
2613 session
->is_closing() ||
2614 session
->is_killing()) {
2615 dout(7) << " session closed|closing|killing, dropping " << *m
<< dendl
;
2619 if (mds
->is_reconnect() &&
2620 m
->get_dirty() && m
->get_client_tid() > 0 &&
2621 !session
->have_completed_flush(m
->get_client_tid())) {
2622 mdcache
->set_reconnected_dirty_caps(client
, m
->get_ino(), m
->get_dirty());
2624 mds
->wait_for_replay(new C_MDS_RetryMessage(mds
, m
));
2628 if (m
->get_client_tid() > 0 && session
&&
2629 session
->have_completed_flush(m
->get_client_tid())) {
2630 dout(7) << "handle_client_caps already flushed tid " << m
->get_client_tid()
2631 << " for client." << client
<< dendl
;
2633 if (m
->get_op() == CEPH_CAP_OP_FLUSHSNAP
) {
2634 ack
= new MClientCaps(CEPH_CAP_OP_FLUSHSNAP_ACK
, m
->get_ino(), 0, 0, 0, 0, 0,
2635 m
->get_dirty(), 0, mds
->get_osd_epoch_barrier());
2637 ack
= new MClientCaps(CEPH_CAP_OP_FLUSH_ACK
, m
->get_ino(), 0, m
->get_cap_id(),
2638 m
->get_seq(), m
->get_caps(), 0, m
->get_dirty(), 0,
2639 mds
->get_osd_epoch_barrier());
2641 ack
->set_snap_follows(follows
);
2642 ack
->set_client_tid(m
->get_client_tid());
2643 mds
->send_message_client_counted(ack
, m
->get_connection());
2644 if (m
->get_op() == CEPH_CAP_OP_FLUSHSNAP
) {
2648 // fall-thru because the message may release some caps
2650 m
->set_op(CEPH_CAP_OP_UPDATE
);
2654 // "oldest flush tid" > 0 means client uses unique TID for each flush
2655 if (m
->get_oldest_flush_tid() > 0 && session
) {
2656 if (session
->trim_completed_flushes(m
->get_oldest_flush_tid())) {
2657 mds
->mdlog
->get_current_segment()->touched_sessions
.insert(session
->info
.inst
.name
);
2659 if (session
->get_num_trim_flushes_warnings() > 0 &&
2660 session
->get_num_completed_flushes() * 2 < g_conf
->mds_max_completed_flushes
)
2661 session
->reset_num_trim_flushes_warnings();
2663 if (session
->get_num_completed_flushes() >=
2664 (g_conf
->mds_max_completed_flushes
<< session
->get_num_trim_flushes_warnings())) {
2665 session
->inc_num_trim_flushes_warnings();
2667 ss
<< "client." << session
->get_client() << " does not advance its oldest_flush_tid ("
2668 << m
->get_oldest_flush_tid() << "), "
2669 << session
->get_num_completed_flushes()
2670 << " completed flushes recorded in session";
2671 mds
->clog
->warn() << ss
.str();
2672 dout(20) << __func__
<< " " << ss
.str() << dendl
;
2677 CInode
*head_in
= mdcache
->get_inode(m
->get_ino());
2679 if (mds
->is_clientreplay()) {
2680 dout(7) << "handle_client_caps on unknown ino " << m
->get_ino()
2681 << ", will try again after replayed client requests" << dendl
;
2682 mdcache
->wait_replay_cap_reconnect(m
->get_ino(), new C_MDS_RetryMessage(mds
, m
));
2685 dout(1) << "handle_client_caps on unknown ino " << m
->get_ino() << ", dropping" << dendl
;
2690 if (m
->osd_epoch_barrier
&& !mds
->objecter
->have_map(m
->osd_epoch_barrier
)) {
2691 // Pause RADOS operations until we see the required epoch
2692 mds
->objecter
->set_epoch_barrier(m
->osd_epoch_barrier
);
2695 if (mds
->get_osd_epoch_barrier() < m
->osd_epoch_barrier
) {
2696 // Record the barrier so that we will retransmit it to clients
2697 mds
->set_osd_epoch_barrier(m
->osd_epoch_barrier
);
2700 CInode
*in
= head_in
;
2702 in
= mdcache
->pick_inode_snap(head_in
, follows
);
2704 dout(10) << " head inode " << *head_in
<< dendl
;
2706 dout(10) << " cap inode " << *in
<< dendl
;
2708 Capability
*cap
= 0;
2709 cap
= in
->get_client_cap(client
);
2710 if (!cap
&& in
!= head_in
)
2711 cap
= head_in
->get_client_cap(client
);
2713 dout(7) << "handle_client_caps no cap for client." << client
<< " on " << *in
<< dendl
;
2720 if (should_defer_client_cap_frozen(in
)) {
2721 dout(7) << "handle_client_caps freezing|frozen on " << *in
<< dendl
;
2722 in
->add_waiter(CInode::WAIT_UNFREEZE
, new C_MDS_RetryMessage(mds
, m
));
2725 if (ceph_seq_cmp(m
->get_mseq(), cap
->get_mseq()) < 0) {
2726 dout(7) << "handle_client_caps mseq " << m
->get_mseq() << " < " << cap
->get_mseq()
2727 << ", dropping" << dendl
;
2732 int op
= m
->get_op();
2735 if (op
== CEPH_CAP_OP_FLUSHSNAP
) {
2736 if (!in
->is_auth()) {
2737 dout(7) << " not auth, ignoring flushsnap on " << *in
<< dendl
;
2741 SnapRealm
*realm
= in
->find_snaprealm();
2742 snapid_t snap
= realm
->get_snap_following(follows
);
2743 dout(10) << " flushsnap follows " << follows
<< " -> snap " << snap
<< dendl
;
2745 // we can prepare the ack now, since this FLUSHEDSNAP is independent of any
2746 // other cap ops. (except possibly duplicate FLUSHSNAP requests, but worst
2747 // case we get a dup response, so whatever.)
2748 MClientCaps
*ack
= 0;
2749 if (m
->get_dirty()) {
2750 ack
= new MClientCaps(CEPH_CAP_OP_FLUSHSNAP_ACK
, in
->ino(), 0, 0, 0, 0, 0, m
->get_dirty(), 0, mds
->get_osd_epoch_barrier());
2751 ack
->set_snap_follows(follows
);
2752 ack
->set_client_tid(m
->get_client_tid());
2753 ack
->set_oldest_flush_tid(m
->get_oldest_flush_tid());
2756 if (in
== head_in
||
2757 (head_in
->client_need_snapflush
.count(snap
) &&
2758 head_in
->client_need_snapflush
[snap
].count(client
))) {
2759 dout(7) << " flushsnap snap " << snap
2760 << " client." << client
<< " on " << *in
<< dendl
;
2762 // this cap now follows a later snap (i.e. the one initiating this flush, or later)
2764 cap
->client_follows
= snap
< CEPH_NOSNAP
? snap
: realm
->get_newest_seq();
2766 _do_snap_update(in
, snap
, m
->get_dirty(), follows
, client
, m
, ack
);
2769 head_in
->remove_need_snapflush(in
, snap
, client
);
2772 dout(7) << " not expecting flushsnap " << snap
<< " from client." << client
<< " on " << *in
<< dendl
;
2774 mds
->send_message_client_counted(ack
, m
->get_connection());
2779 if (cap
->get_cap_id() != m
->get_cap_id()) {
2780 dout(7) << " ignoring client capid " << m
->get_cap_id() << " != my " << cap
->get_cap_id() << dendl
;
2782 // intermediate snap inodes
2783 while (in
!= head_in
) {
2784 assert(in
->last
!= CEPH_NOSNAP
);
2785 if (in
->is_auth() && m
->get_dirty()) {
2786 dout(10) << " updating intermediate snapped inode " << *in
<< dendl
;
2787 _do_cap_update(in
, NULL
, m
->get_dirty(), follows
, m
);
2789 in
= mdcache
->pick_inode_snap(head_in
, in
->last
);
2792 // head inode, and cap
2793 MClientCaps
*ack
= 0;
2795 int caps
= m
->get_caps();
2796 if (caps
& ~cap
->issued()) {
2797 dout(10) << " confirming not issued caps " << ccap_string(caps
& ~cap
->issued()) << dendl
;
2798 caps
&= cap
->issued();
2801 cap
->confirm_receipt(m
->get_seq(), caps
);
2802 dout(10) << " follows " << follows
2803 << " retains " << ccap_string(m
->get_caps())
2804 << " dirty " << ccap_string(m
->get_dirty())
2805 << " on " << *in
<< dendl
;
2808 // missing/skipped snapflush?
2809 // The client MAY send a snapflush if it is issued WR/EXCL caps, but
2810 // presently only does so when it has actual dirty metadata. But, we
2811 // set up the need_snapflush stuff based on the issued caps.
2812 // We can infer that the client WONT send a FLUSHSNAP once they have
2813 // released all WR/EXCL caps (the FLUSHSNAP always comes before the cap
2815 if (!head_in
->client_need_snapflush
.empty()) {
2816 if ((cap
->issued() & CEPH_CAP_ANY_FILE_WR
) == 0) {
2817 _do_null_snapflush(head_in
, client
);
2819 dout(10) << " revocation in progress, not making any conclusions about null snapflushes" << dendl
;
2823 if (m
->get_dirty() && in
->is_auth()) {
2824 dout(7) << " flush client." << client
<< " dirty " << ccap_string(m
->get_dirty())
2825 << " seq " << m
->get_seq() << " on " << *in
<< dendl
;
2826 ack
= new MClientCaps(CEPH_CAP_OP_FLUSH_ACK
, in
->ino(), 0, cap
->get_cap_id(), m
->get_seq(),
2827 m
->get_caps(), 0, m
->get_dirty(), 0, mds
->get_osd_epoch_barrier());
2828 ack
->set_client_tid(m
->get_client_tid());
2829 ack
->set_oldest_flush_tid(m
->get_oldest_flush_tid());
2832 // filter wanted based on what we could ever give out (given auth/replica status)
2833 bool need_flush
= m
->flags
& CLIENT_CAPS_SYNC
;
2834 int new_wanted
= m
->get_wanted() & head_in
->get_caps_allowed_ever();
2835 if (new_wanted
!= cap
->wanted()) {
2836 if (!need_flush
&& (new_wanted
& ~cap
->pending())) {
2837 // exapnding caps. make sure we aren't waiting for a log flush
2838 need_flush
= _need_flush_mdlog(head_in
, new_wanted
& ~cap
->pending());
2841 adjust_cap_wanted(cap
, new_wanted
, m
->get_issue_seq());
2844 if (in
->is_auth() &&
2845 _do_cap_update(in
, cap
, m
->get_dirty(), follows
, m
, ack
, &need_flush
)) {
2847 eval(in
, CEPH_CAP_LOCKS
);
2849 if (!need_flush
&& (cap
->wanted() & ~cap
->pending()))
2850 need_flush
= _need_flush_mdlog(in
, cap
->wanted() & ~cap
->pending());
2852 // no update, ack now.
2854 mds
->send_message_client_counted(ack
, m
->get_connection());
2856 bool did_issue
= eval(in
, CEPH_CAP_LOCKS
);
2857 if (!did_issue
&& (cap
->wanted() & ~cap
->pending()))
2858 issue_caps(in
, cap
);
2860 if (cap
->get_last_seq() == 0 &&
2861 (cap
->pending() & (CEPH_CAP_FILE_WR
|CEPH_CAP_FILE_BUFFER
))) {
2862 cap
->issue_norevoke(cap
->issued());
2863 share_inode_max_size(in
, cap
);
2868 mds
->mdlog
->flush();
// Deferred-retry context for process_request_cap_release(): captures the
// client and the release item by value and replays the release with a null
// MDRequestRef when fired (e.g. after an unfreeze wait).
// NOTE(review): mangled paste — the client/dname member declarations
// (original 2877, 2879) are missing; code left byte-identical.
2876 class C_Locker_RetryRequestCapRelease
: public LockerContext
{
2878 ceph_mds_request_release item
;
2880 C_Locker_RetryRequestCapRelease(Locker
*l
, client_t c
, const ceph_mds_request_release
& it
) :
2881 LockerContext(l
), client(c
), item(it
) { }
2882 void finish(int r
) override
{
2884 MDRequestRef null_ref
;
2885 locker
->process_request_cap_release(null_ref
, client
, item
, dname
);
// Apply one embedded cap/lease release from a client request: drop a dentry
// lease if dname is given; then validate the cap (stale mseq or mismatched
// cap_id are dropped; frozen inodes defer via
// C_Locker_RetryRequestCapRelease), confirm receipt of the retained caps,
// run null-snapflush inference when the client no longer holds any FILE_WR
// caps, adjust wanted, and re-eval the inode's cap locks with suppression so
// the eval doesn't immediately reissue.  With an mdr, remembers the cap's
// last_seq in mdr->cap_releases for later kick_cap_releases().
// NOTE(review): mangled paste; `seq` extraction (~2896), several braces and
// returns are among the dropped lines; code left byte-identical.
2889 void Locker::process_request_cap_release(MDRequestRef
& mdr
, client_t client
, const ceph_mds_request_release
& item
,
2890 const string
&dname
)
2892 inodeno_t ino
= (uint64_t)item
.ino
;
2893 uint64_t cap_id
= item
.cap_id
;
2894 int caps
= item
.caps
;
2895 int wanted
= item
.wanted
;
2897 int issue_seq
= item
.issue_seq
;
2898 int mseq
= item
.mseq
;
2900 CInode
*in
= mdcache
->get_inode(ino
);
2904 if (dname
.length()) {
2905 frag_t fg
= in
->pick_dirfrag(dname
);
2906 CDir
*dir
= in
->get_dirfrag(fg
);
2908 CDentry
*dn
= dir
->lookup(dname
);
2910 ClientLease
*l
= dn
->get_client_lease(client
);
2912 dout(10) << "process_cap_release removing lease on " << *dn
<< dendl
;
2913 dn
->remove_client_lease(l
, this);
2915 dout(7) << "process_cap_release client." << client
2916 << " doesn't have lease on " << *dn
<< dendl
;
2919 dout(7) << "process_cap_release client." << client
<< " released lease on dn "
2920 << dir
->dirfrag() << "/" << dname
<< " which dne" << dendl
;
2925 Capability
*cap
= in
->get_client_cap(client
);
2929 dout(10) << "process_cap_release client." << client
<< " " << ccap_string(caps
) << " on " << *in
2930 << (mdr
? "" : " (DEFERRED, no mdr)")
2933 if (ceph_seq_cmp(mseq
, cap
->get_mseq()) < 0) {
2934 dout(7) << " mseq " << mseq
<< " < " << cap
->get_mseq() << ", dropping" << dendl
;
2938 if (cap
->get_cap_id() != cap_id
) {
2939 dout(7) << " cap_id " << cap_id
<< " != " << cap
->get_cap_id() << ", dropping" << dendl
;
2943 if (should_defer_client_cap_frozen(in
)) {
2944 dout(7) << " frozen, deferring" << dendl
;
2945 in
->add_waiter(CInode::WAIT_UNFREEZE
, new C_Locker_RetryRequestCapRelease(this, client
, item
));
2949 if (caps
& ~cap
->issued()) {
2950 dout(10) << " confirming not issued caps " << ccap_string(caps
& ~cap
->issued()) << dendl
;
2951 caps
&= cap
->issued();
2953 cap
->confirm_receipt(seq
, caps
);
2955 if (!in
->client_need_snapflush
.empty() &&
2956 (cap
->issued() & CEPH_CAP_ANY_FILE_WR
) == 0) {
2957 _do_null_snapflush(in
, client
);
2960 adjust_cap_wanted(cap
, wanted
, issue_seq
);
// Suppress the cap around eval so the release itself doesn't trigger
// an immediate reissue.
2963 cap
->inc_suppress();
2964 eval(in
, CEPH_CAP_LOCKS
);
2966 cap
->dec_suppress();
2968 // take note; we may need to reissue on this cap later
2970 mdr
->cap_releases
[in
->vino()] = cap
->get_last_seq();
// Deferred-retry context for kick_issue_caps(): pins the inode
// (PIN_PTRWAITER) while queued and re-invokes kick_issue_caps(in, client,
// seq) on finish().  NOTE(review): member declarations (original
// 2974-2977) are missing from this mangled chunk; code left byte-identical.
2973 class C_Locker_RetryKickIssueCaps
: public LockerContext
{
2978 C_Locker_RetryKickIssueCaps(Locker
*l
, CInode
*i
, client_t c
, ceph_seq_t s
) :
2979 LockerContext(l
), in(i
), client(c
), seq(s
) {
2980 in
->get(CInode::PIN_PTRWAITER
);
2982 void finish(int r
) override
{
2983 locker
->kick_issue_caps(in
, client
, seq
);
2984 in
->put(CInode::PIN_PTRWAITER
);
// Reissue caps to a client iff its cap's last_sent still equals the seq
// recorded at release time (i.e. nothing was issued in between); bails if
// the cap is gone or the seq moved on, and defers via
// C_Locker_RetryKickIssueCaps while the inode is frozen.
2988 void Locker::kick_issue_caps(CInode
*in
, client_t client
, ceph_seq_t seq
)
2990 Capability
*cap
= in
->get_client_cap(client
);
2991 if (!cap
|| cap
->get_last_sent() != seq
)
2993 if (in
->is_frozen()) {
2994 dout(10) << "kick_issue_caps waiting for unfreeze on " << *in
<< dendl
;
2995 in
->add_waiter(CInode::WAIT_UNFREEZE
,
2996 new C_Locker_RetryKickIssueCaps(this, in
, client
, seq
));
2999 dout(10) << "kick_issue_caps released at current seq " << seq
3000 << ", reissuing" << dendl
;
3001 issue_caps(in
, cap
);
// After a request completes, walk mdr->cap_releases (vino -> last_seq
// recorded by process_request_cap_release) and kick_issue_caps each inode
// still in cache so released caps can be reissued if needed.
// NOTE(review): mangled paste — loop increment and the null-inode guard
// (original 3009, 3011-3012) are missing; code left byte-identical.
3004 void Locker::kick_cap_releases(MDRequestRef
& mdr
)
3006 client_t client
= mdr
->get_client();
3007 for (map
<vinodeno_t
,ceph_seq_t
>::iterator p
= mdr
->cap_releases
.begin();
3008 p
!= mdr
->cap_releases
.end();
3010 CInode
*in
= mdcache
->get_inode(p
->first
);
3013 kick_issue_caps(in
, client
, p
->second
);
// Journal a client's snapflush for `snap`: if the snap is already gone
// (CEPH_NOSNAP) just ack; otherwise project the inode — into an old_inode_t
// for multiversion inodes (splitting it at `snap` if needed), else the head
// inode (with a fresh xattr map when dirty xattrs are newer) — apply the
// dirty fields via _update_cap_fields, trim or retarget the client's
// writeable range at the snap boundary, journal through an EUpdate ("snap
// flush"), record the client flush tid when the client uses unique flush
// tids, and submit with a C_Locker_FileUpdate_finish.
// NOTE(review): per the header comment, m/ack may be NULL when dirty==0
// (the null-snapflush path).  Mangled paste: the `pi` declaration, several
// guards/braces/returns (e.g. ~3030-3034, 3048-3049, 3053-3056, 3062-3066,
// 3070-3071, 3073-3075, 3080-3082, 3087, 3090-3093, 3096, 3101, 3103+) are
// missing; code left byte-identical.
3018 * m and ack might be NULL, so don't dereference them unless dirty != 0
3020 void Locker::_do_snap_update(CInode
*in
, snapid_t snap
, int dirty
, snapid_t follows
, client_t client
, MClientCaps
*m
, MClientCaps
*ack
)
3022 dout(10) << "_do_snap_update dirty " << ccap_string(dirty
)
3023 << " follows " << follows
<< " snap " << snap
3024 << " on " << *in
<< dendl
;
3026 if (snap
== CEPH_NOSNAP
) {
3027 // hmm, i guess snap was already deleted? just ack!
3028 dout(10) << " wow, the snap following " << follows
3029 << " was already deleted. nothing to record, just ack." << dendl
;
3031 mds
->send_message_client_counted(ack
, m
->get_connection());
3035 EUpdate
*le
= new EUpdate(mds
->mdlog
, "snap flush");
3036 mds
->mdlog
->start_entry(le
);
3037 MutationRef mut
= new MutationImpl();
3038 mut
->ls
= mds
->mdlog
->get_current_segment();
3040 // normal metadata updates that we can apply to the head as well.
3043 bool xattrs
= false;
3044 map
<string
,bufferptr
> *px
= 0;
3045 if ((dirty
& CEPH_CAP_XATTR_EXCL
) &&
3046 m
->xattrbl
.length() &&
3047 m
->head
.xattr_version
> in
->get_projected_inode()->xattr_version
)
3050 old_inode_t
*oi
= 0;
3051 if (in
->is_multiversion()) {
3052 oi
= in
->pick_old_inode(snap
);
3057 dout(10) << " writing into old inode" << dendl
;
3058 pi
= in
->project_inode();
3059 pi
->version
= in
->pre_dirty();
3060 if (snap
> oi
->first
)
3061 in
->split_old_inode(snap
);
3067 px
= new map
<string
,bufferptr
>;
3068 pi
= in
->project_inode(px
);
3069 pi
->version
= in
->pre_dirty();
3072 _update_cap_fields(in
, dirty
, m
, pi
);
3076 dout(7) << " xattrs v" << pi
->xattr_version
<< " -> " << m
->head
.xattr_version
3077 << " len " << m
->xattrbl
.length() << dendl
;
3078 pi
->xattr_version
= m
->head
.xattr_version
;
3079 bufferlist::iterator p
= m
->xattrbl
.begin();
3083 if (pi
->client_ranges
.count(client
)) {
3084 if (in
->last
== snap
) {
3085 dout(10) << " removing client_range entirely" << dendl
;
3086 pi
->client_ranges
.erase(client
);
3088 dout(10) << " client_range now follows " << snap
<< dendl
;
3089 pi
->client_ranges
[client
].follows
= snap
;
3094 mdcache
->predirty_journal_parents(mut
, &le
->metablob
, in
, 0, PREDIRTY_PRIMARY
, 0, follows
);
3095 mdcache
->journal_dirty_inode(mut
.get(), &le
->metablob
, in
, follows
);
3097 // "oldest flush tid" > 0 means client uses unique TID for each flush
3098 if (ack
&& ack
->get_oldest_flush_tid() > 0)
3099 le
->metablob
.add_client_flush(metareqid_t(m
->get_source(), ack
->get_client_tid()),
3100 ack
->get_oldest_flush_tid());
3102 mds
->mdlog
->submit_entry(le
, new C_Locker_FileUpdate_finish(this, in
, mut
, false, false,
// Copy the dirty fields from a client's cap message into a projected inode:
// ctime/change_attr only move forward; FILE_EXCL|FILE_WR dirt updates mtime
// (forward-only for WR, any change for EXCL), size/rstat.rbytes (regular
// files only), inline data by version, atime and time_warp_seq (EXCL only);
// AUTH_EXCL dirt updates uid/gid/mode and (feature-gated) btime.
// NOTE(review): per the comment at 3111, m must be non-NULL whenever
// dirty != 0.  Mangled paste: the dirty==0 early-return (~3108-3112) and
// several assignment/brace lines (e.g. pi->mtime/atime stores near 3139-3140
// and 3160-3161) are missing; code left byte-identical.
3106 void Locker::_update_cap_fields(CInode
*in
, int dirty
, MClientCaps
*m
, inode_t
*pi
)
3111 /* m must be valid if there are dirty caps */
3113 uint64_t features
= m
->get_connection()->get_features();
3115 if (m
->get_ctime() > pi
->ctime
) {
3116 dout(7) << " ctime " << pi
->ctime
<< " -> " << m
->get_ctime()
3117 << " for " << *in
<< dendl
;
3118 pi
->ctime
= m
->get_ctime();
3121 if ((features
& CEPH_FEATURE_FS_CHANGE_ATTR
) &&
3122 m
->get_change_attr() > pi
->change_attr
) {
3123 dout(7) << " change_attr " << pi
->change_attr
<< " -> " << m
->get_change_attr()
3124 << " for " << *in
<< dendl
;
3125 pi
->change_attr
= m
->get_change_attr();
3129 if (dirty
& (CEPH_CAP_FILE_EXCL
|CEPH_CAP_FILE_WR
)) {
3130 utime_t atime
= m
->get_atime();
3131 utime_t mtime
= m
->get_mtime();
3132 uint64_t size
= m
->get_size();
3133 version_t inline_version
= m
->inline_version
;
3135 if (((dirty
& CEPH_CAP_FILE_WR
) && mtime
> pi
->mtime
) ||
3136 ((dirty
& CEPH_CAP_FILE_EXCL
) && mtime
!= pi
->mtime
)) {
3137 dout(7) << " mtime " << pi
->mtime
<< " -> " << mtime
3138 << " for " << *in
<< dendl
;
3141 if (in
->inode
.is_file() && // ONLY if regular file
3143 dout(7) << " size " << pi
->size
<< " -> " << size
3144 << " for " << *in
<< dendl
;
3146 pi
->rstat
.rbytes
= size
;
3148 if (in
->inode
.is_file() &&
3149 (dirty
& CEPH_CAP_FILE_WR
) &&
3150 inline_version
> pi
->inline_data
.version
) {
3151 pi
->inline_data
.version
= inline_version
;
3152 if (inline_version
!= CEPH_INLINE_NONE
&& m
->inline_data
.length() > 0)
3153 pi
->inline_data
.get_data() = m
->inline_data
;
3155 pi
->inline_data
.free_data();
3157 if ((dirty
& CEPH_CAP_FILE_EXCL
) && atime
!= pi
->atime
) {
3158 dout(7) << " atime " << pi
->atime
<< " -> " << atime
3159 << " for " << *in
<< dendl
;
3162 if ((dirty
& CEPH_CAP_FILE_EXCL
) &&
3163 ceph_seq_cmp(pi
->time_warp_seq
, m
->get_time_warp_seq()) < 0) {
3164 dout(7) << " time_warp_seq " << pi
->time_warp_seq
<< " -> " << m
->get_time_warp_seq()
3165 << " for " << *in
<< dendl
;
3166 pi
->time_warp_seq
= m
->get_time_warp_seq();
3170 if (dirty
& CEPH_CAP_AUTH_EXCL
) {
3171 if (m
->head
.uid
!= pi
->uid
) {
3172 dout(7) << " uid " << pi
->uid
3173 << " -> " << m
->head
.uid
3174 << " for " << *in
<< dendl
;
3175 pi
->uid
= m
->head
.uid
;
3177 if (m
->head
.gid
!= pi
->gid
) {
3178 dout(7) << " gid " << pi
->gid
3179 << " -> " << m
->head
.gid
3180 << " for " << *in
<< dendl
;
3181 pi
->gid
= m
->head
.gid
;
3183 if (m
->head
.mode
!= pi
->mode
) {
3184 dout(7) << " mode " << oct
<< pi
->mode
3185 << " -> " << m
->head
.mode
<< dec
3186 << " for " << *in
<< dendl
;
3187 pi
->mode
= m
->head
.mode
;
3189 if ((features
& CEPH_FEATURE_FS_BTIME
) && m
->get_btime() != pi
->btime
) {
3190 dout(7) << " btime " << oct
<< pi
->btime
3191 << " -> " << m
->get_btime() << dec
3192 << " for " << *in
<< dendl
;
3193 pi
->btime
= m
->get_btime();
3199 * update inode based on cap flush|flushsnap|wanted.
3200 * adjust max_size, if needed.
3201 * if we update, return true; otherwise, false (no updated needed).
3203 bool Locker::_do_cap_update(CInode
*in
, Capability
*cap
,
3204 int dirty
, snapid_t follows
,
3205 MClientCaps
*m
, MClientCaps
*ack
,
3208 dout(10) << "_do_cap_update dirty " << ccap_string(dirty
)
3209 << " issued " << ccap_string(cap
? cap
->issued() : 0)
3210 << " wanted " << ccap_string(cap
? cap
->wanted() : 0)
3211 << " on " << *in
<< dendl
;
3212 assert(in
->is_auth());
3213 client_t client
= m
->get_source().num();
3214 inode_t
*latest
= in
->get_projected_inode();
3216 // increase or zero max_size?
3217 uint64_t size
= m
->get_size();
3218 bool change_max
= false;
3219 uint64_t old_max
= latest
->client_ranges
.count(client
) ? latest
->client_ranges
[client
].range
.last
: 0;
3220 uint64_t new_max
= old_max
;
3222 if (in
->is_file()) {
3223 bool forced_change_max
= false;
3224 dout(20) << "inode is file" << dendl
;
3225 if (cap
&& ((cap
->issued() | cap
->wanted()) & CEPH_CAP_ANY_FILE_WR
)) {
3226 dout(20) << "client has write caps; m->get_max_size="
3227 << m
->get_max_size() << "; old_max=" << old_max
<< dendl
;
3228 if (m
->get_max_size() > new_max
) {
3229 dout(10) << "client requests file_max " << m
->get_max_size()
3230 << " > max " << old_max
<< dendl
;
3232 forced_change_max
= true;
3233 new_max
= calc_new_max_size(latest
, m
->get_max_size());
3235 new_max
= calc_new_max_size(latest
, size
);
3237 if (new_max
> old_max
)
3249 if (in
->last
== CEPH_NOSNAP
&&
3251 !in
->filelock
.can_wrlock(client
) &&
3252 !in
->filelock
.can_force_wrlock(client
)) {
3253 dout(10) << " i want to change file_max, but lock won't allow it (yet)" << dendl
;
3254 if (in
->filelock
.is_stable()) {
3255 bool need_issue
= false;
3257 cap
->inc_suppress();
3258 if (in
->mds_caps_wanted
.empty() &&
3259 (in
->get_loner() >= 0 || (in
->get_wanted_loner() >= 0 && in
->try_set_loner()))) {
3260 if (in
->filelock
.get_state() != LOCK_EXCL
)
3261 file_excl(&in
->filelock
, &need_issue
);
3263 simple_lock(&in
->filelock
, &need_issue
);
3267 cap
->dec_suppress();
3269 if (!in
->filelock
.can_wrlock(client
) &&
3270 !in
->filelock
.can_force_wrlock(client
)) {
3271 C_MDL_CheckMaxSize
*cms
= new C_MDL_CheckMaxSize(this, in
,
3272 forced_change_max
? new_max
: 0,
3275 in
->filelock
.add_waiter(SimpleLock::WAIT_STABLE
, cms
);
3281 if (m
->flockbl
.length()) {
3283 bufferlist::iterator bli
= m
->flockbl
.begin();
3284 ::decode(num_locks
, bli
);
3285 for ( int i
=0; i
< num_locks
; ++i
) {
3286 ceph_filelock decoded_lock
;
3287 ::decode(decoded_lock
, bli
);
3288 in
->get_fcntl_lock_state()->held_locks
.
3289 insert(pair
<uint64_t, ceph_filelock
>(decoded_lock
.start
, decoded_lock
));
3290 ++in
->get_fcntl_lock_state()->client_held_lock_counts
[(client_t
)(decoded_lock
.client
)];
3292 ::decode(num_locks
, bli
);
3293 for ( int i
=0; i
< num_locks
; ++i
) {
3294 ceph_filelock decoded_lock
;
3295 ::decode(decoded_lock
, bli
);
3296 in
->get_flock_lock_state()->held_locks
.
3297 insert(pair
<uint64_t, ceph_filelock
>(decoded_lock
.start
, decoded_lock
));
3298 ++in
->get_flock_lock_state()->client_held_lock_counts
[(client_t
)(decoded_lock
.client
)];
3302 if (!dirty
&& !change_max
)
3305 Session
*session
= static_cast<Session
*>(m
->get_connection()->get_priv());
3306 if (session
->check_access(in
, MAY_WRITE
,
3307 m
->caller_uid
, m
->caller_gid
, NULL
, 0, 0) < 0) {
3309 dout(10) << "check_access failed, dropping cap update on " << *in
<< dendl
;
3315 EUpdate
*le
= new EUpdate(mds
->mdlog
, "cap update");
3316 mds
->mdlog
->start_entry(le
);
3319 map
<string
,bufferptr
> *px
= 0;
3320 if ((dirty
& CEPH_CAP_XATTR_EXCL
) &&
3321 m
->xattrbl
.length() &&
3322 m
->head
.xattr_version
> in
->get_projected_inode()->xattr_version
)
3323 px
= new map
<string
,bufferptr
>;
3325 inode_t
*pi
= in
->project_inode(px
);
3326 pi
->version
= in
->pre_dirty();
3328 MutationRef
mut(new MutationImpl());
3329 mut
->ls
= mds
->mdlog
->get_current_segment();
3331 _update_cap_fields(in
, dirty
, m
, pi
);
3334 dout(7) << " max_size " << old_max
<< " -> " << new_max
3335 << " for " << *in
<< dendl
;
3337 pi
->client_ranges
[client
].range
.first
= 0;
3338 pi
->client_ranges
[client
].range
.last
= new_max
;
3339 pi
->client_ranges
[client
].follows
= in
->first
- 1;
3341 pi
->client_ranges
.erase(client
);
3344 if (change_max
|| (dirty
& (CEPH_CAP_FILE_EXCL
|CEPH_CAP_FILE_WR
)))
3345 wrlock_force(&in
->filelock
, mut
); // wrlock for duration of journal
3348 if (dirty
& CEPH_CAP_AUTH_EXCL
)
3349 wrlock_force(&in
->authlock
, mut
);
3353 dout(7) << " xattrs v" << pi
->xattr_version
<< " -> " << m
->head
.xattr_version
<< dendl
;
3354 pi
->xattr_version
= m
->head
.xattr_version
;
3355 bufferlist::iterator p
= m
->xattrbl
.begin();
3358 wrlock_force(&in
->xattrlock
, mut
);
3362 mdcache
->predirty_journal_parents(mut
, &le
->metablob
, in
, 0, PREDIRTY_PRIMARY
, 0, follows
);
3363 mdcache
->journal_dirty_inode(mut
.get(), &le
->metablob
, in
, follows
);
3365 // "oldest flush tid" > 0 means client uses unique TID for each flush
3366 if (ack
&& ack
->get_oldest_flush_tid() > 0)
3367 le
->metablob
.add_client_flush(metareqid_t(m
->get_source(), ack
->get_client_tid()),
3368 ack
->get_oldest_flush_tid());
3370 mds
->mdlog
->submit_entry(le
, new C_Locker_FileUpdate_finish(this, in
, mut
,
3373 if (need_flush
&& !*need_flush
&&
3374 ((change_max
&& new_max
) || // max INCREASE
3375 _need_flush_mdlog(in
, dirty
)))
3381 /* This function DOES put the passed message before returning */
3382 void Locker::handle_client_cap_release(MClientCapRelease
*m
)
3384 client_t client
= m
->get_source().num();
3385 dout(10) << "handle_client_cap_release " << *m
<< dendl
;
3387 if (!mds
->is_clientreplay() && !mds
->is_active() && !mds
->is_stopping()) {
3388 mds
->wait_for_replay(new C_MDS_RetryMessage(mds
, m
));
3392 if (m
->osd_epoch_barrier
&& !mds
->objecter
->have_map(m
->osd_epoch_barrier
)) {
3393 // Pause RADOS operations until we see the required epoch
3394 mds
->objecter
->set_epoch_barrier(m
->osd_epoch_barrier
);
3397 if (mds
->get_osd_epoch_barrier() < m
->osd_epoch_barrier
) {
3398 // Record the barrier so that we will retransmit it to clients
3399 mds
->set_osd_epoch_barrier(m
->osd_epoch_barrier
);
3402 Session
*session
= static_cast<Session
*>(m
->get_connection()->get_priv());
3404 for (vector
<ceph_mds_cap_item
>::iterator p
= m
->caps
.begin(); p
!= m
->caps
.end(); ++p
) {
3405 _do_cap_release(client
, inodeno_t((uint64_t)p
->ino
) , p
->cap_id
, p
->migrate_seq
, p
->seq
);
3409 session
->notify_cap_release(m
->caps
.size());
3415 class C_Locker_RetryCapRelease
: public LockerContext
{
3419 ceph_seq_t migrate_seq
;
3420 ceph_seq_t issue_seq
;
3422 C_Locker_RetryCapRelease(Locker
*l
, client_t c
, inodeno_t i
, uint64_t id
,
3423 ceph_seq_t mseq
, ceph_seq_t seq
) :
3424 LockerContext(l
), client(c
), ino(i
), cap_id(id
), migrate_seq(mseq
), issue_seq(seq
) {}
3425 void finish(int r
) override
{
3426 locker
->_do_cap_release(client
, ino
, cap_id
, migrate_seq
, issue_seq
);
3430 void Locker::_do_cap_release(client_t client
, inodeno_t ino
, uint64_t cap_id
,
3431 ceph_seq_t mseq
, ceph_seq_t seq
)
3433 CInode
*in
= mdcache
->get_inode(ino
);
3435 dout(7) << "_do_cap_release missing ino " << ino
<< dendl
;
3438 Capability
*cap
= in
->get_client_cap(client
);
3440 dout(7) << "_do_cap_release no cap for client" << client
<< " on "<< *in
<< dendl
;
3444 dout(7) << "_do_cap_release for client." << client
<< " on "<< *in
<< dendl
;
3445 if (cap
->get_cap_id() != cap_id
) {
3446 dout(7) << " capid " << cap_id
<< " != " << cap
->get_cap_id() << ", ignore" << dendl
;
3449 if (ceph_seq_cmp(mseq
, cap
->get_mseq()) < 0) {
3450 dout(7) << " mseq " << mseq
<< " < " << cap
->get_mseq() << ", ignore" << dendl
;
3453 if (should_defer_client_cap_frozen(in
)) {
3454 dout(7) << " freezing|frozen, deferring" << dendl
;
3455 in
->add_waiter(CInode::WAIT_UNFREEZE
,
3456 new C_Locker_RetryCapRelease(this, client
, ino
, cap_id
, mseq
, seq
));
3459 if (seq
!= cap
->get_last_issue()) {
3460 dout(7) << " issue_seq " << seq
<< " != " << cap
->get_last_issue() << dendl
;
3461 // clean out any old revoke history
3462 cap
->clean_revoke_from(seq
);
3463 eval_cap_gather(in
);
3466 remove_client_cap(in
, client
);
3469 /* This function DOES put the passed message before returning */
3471 void Locker::remove_client_cap(CInode
*in
, client_t client
)
3473 // clean out any pending snapflush state
3474 if (!in
->client_need_snapflush
.empty())
3475 _do_null_snapflush(in
, client
);
3477 in
->remove_client_cap(client
);
3479 if (in
->is_auth()) {
3480 // make sure we clear out the client byte range
3481 if (in
->get_projected_inode()->client_ranges
.count(client
) &&
3482 !(in
->inode
.nlink
== 0 && !in
->is_any_caps())) // unless it's unlink + stray
3483 check_inode_max_size(in
);
3485 request_inode_file_caps(in
);
3488 try_eval(in
, CEPH_CAP_LOCKS
);
3493 * Return true if any currently revoking caps exceed the
3494 * mds_revoke_cap_timeout threshold.
3496 bool Locker::any_late_revoking_caps(xlist
<Capability
*> const &revoking
) const
3498 xlist
<Capability
*>::const_iterator p
= revoking
.begin();
3500 // No revoking caps at the moment
3503 utime_t now
= ceph_clock_now();
3504 utime_t age
= now
- (*p
)->get_last_revoke_stamp();
3505 if (age
<= g_conf
->mds_revoke_cap_timeout
) {
3514 void Locker::get_late_revoking_clients(std::list
<client_t
> *result
) const
3516 if (!any_late_revoking_caps(revoking_caps
)) {
3517 // Fast path: no misbehaving clients, execute in O(1)
3521 // Slow path: execute in O(N_clients)
3522 std::map
<client_t
, xlist
<Capability
*> >::const_iterator client_rc_iter
;
3523 for (client_rc_iter
= revoking_caps_by_client
.begin();
3524 client_rc_iter
!= revoking_caps_by_client
.end(); ++client_rc_iter
) {
3525 xlist
<Capability
*> const &client_rc
= client_rc_iter
->second
;
3526 bool any_late
= any_late_revoking_caps(client_rc
);
3528 result
->push_back(client_rc_iter
->first
);
3533 // Hard-code instead of surfacing a config settings because this is
3534 // really a hack that should go away at some point when we have better
3535 // inspection tools for getting at detailed cap state (#7316)
3536 #define MAX_WARN_CAPS 100
3538 void Locker::caps_tick()
3540 utime_t now
= ceph_clock_now();
3542 dout(20) << __func__
<< " " << revoking_caps
.size() << " revoking caps" << dendl
;
3545 for (xlist
<Capability
*>::iterator p
= revoking_caps
.begin(); !p
.end(); ++p
) {
3546 Capability
*cap
= *p
;
3548 utime_t age
= now
- cap
->get_last_revoke_stamp();
3549 dout(20) << __func__
<< " age = " << age
<< cap
->get_client() << "." << cap
->get_inode()->ino() << dendl
;
3550 if (age
<= g_conf
->mds_revoke_cap_timeout
) {
3551 dout(20) << __func__
<< " age below timeout " << g_conf
->mds_revoke_cap_timeout
<< dendl
;
3555 if (i
> MAX_WARN_CAPS
) {
3556 dout(1) << __func__
<< " more than " << MAX_WARN_CAPS
<< " caps are late"
3557 << "revoking, ignoring subsequent caps" << dendl
;
3561 // exponential backoff of warning intervals
3562 if (age
> g_conf
->mds_revoke_cap_timeout
* (1 << cap
->get_num_revoke_warnings())) {
3563 cap
->inc_num_revoke_warnings();
3565 ss
<< "client." << cap
->get_client() << " isn't responding to mclientcaps(revoke), ino "
3566 << cap
->get_inode()->ino() << " pending " << ccap_string(cap
->pending())
3567 << " issued " << ccap_string(cap
->issued()) << ", sent " << age
<< " seconds ago";
3568 mds
->clog
->warn() << ss
.str();
3569 dout(20) << __func__
<< " " << ss
.str() << dendl
;
3571 dout(20) << __func__
<< " silencing log message (backoff) for " << cap
->get_client() << "." << cap
->get_inode()->ino() << dendl
;
3577 void Locker::handle_client_lease(MClientLease
*m
)
3579 dout(10) << "handle_client_lease " << *m
<< dendl
;
3581 assert(m
->get_source().is_client());
3582 client_t client
= m
->get_source().num();
3584 CInode
*in
= mdcache
->get_inode(m
->get_ino(), m
->get_last());
3586 dout(7) << "handle_client_lease don't have ino " << m
->get_ino() << "." << m
->get_last() << dendl
;
3592 frag_t fg
= in
->pick_dirfrag(m
->dname
);
3593 CDir
*dir
= in
->get_dirfrag(fg
);
3595 dn
= dir
->lookup(m
->dname
);
3597 dout(7) << "handle_client_lease don't have dn " << m
->get_ino() << " " << m
->dname
<< dendl
;
3601 dout(10) << " on " << *dn
<< dendl
;
3604 ClientLease
*l
= dn
->get_client_lease(client
);
3606 dout(7) << "handle_client_lease didn't have lease for client." << client
<< " of " << *dn
<< dendl
;
3611 switch (m
->get_action()) {
3612 case CEPH_MDS_LEASE_REVOKE_ACK
:
3613 case CEPH_MDS_LEASE_RELEASE
:
3614 if (l
->seq
!= m
->get_seq()) {
3615 dout(7) << "handle_client_lease release - seq " << l
->seq
<< " != provided " << m
->get_seq() << dendl
;
3617 dout(7) << "handle_client_lease client." << client
3618 << " on " << *dn
<< dendl
;
3619 dn
->remove_client_lease(l
, this);
3624 case CEPH_MDS_LEASE_RENEW
:
3626 dout(7) << "handle_client_lease client." << client
<< " renew on " << *dn
3627 << (!dn
->lock
.can_lease(client
)?", revoking lease":"") << dendl
;
3628 if (dn
->lock
.can_lease(client
)) {
3629 int pool
= 1; // fixme.. do something smart!
3630 m
->h
.duration_ms
= (int)(1000 * mdcache
->client_lease_durations
[pool
]);
3631 m
->h
.seq
= ++l
->seq
;
3634 utime_t now
= ceph_clock_now();
3635 now
+= mdcache
->client_lease_durations
[pool
];
3636 mdcache
->touch_client_lease(l
, pool
, now
);
3638 mds
->send_message_client_counted(m
, m
->get_connection());
3644 ceph_abort(); // implement me
3650 void Locker::issue_client_lease(CDentry
*dn
, client_t client
,
3651 bufferlist
&bl
, utime_t now
, Session
*session
)
3653 CInode
*diri
= dn
->get_dir()->get_inode();
3654 if (!diri
->is_stray() && // do not issue dn leases in stray dir!
3655 ((!diri
->filelock
.can_lease(client
) &&
3656 (diri
->get_client_cap_pending(client
) & (CEPH_CAP_FILE_SHARED
| CEPH_CAP_FILE_EXCL
)) == 0)) &&
3657 dn
->lock
.can_lease(client
)) {
3658 int pool
= 1; // fixme.. do something smart!
3659 // issue a dentry lease
3660 ClientLease
*l
= dn
->add_client_lease(client
, session
);
3661 session
->touch_lease(l
);
3663 now
+= mdcache
->client_lease_durations
[pool
];
3664 mdcache
->touch_client_lease(l
, pool
, now
);
3667 e
.mask
= 1 | CEPH_LOCK_DN
; // old and new bit values
3669 e
.duration_ms
= (int)(1000 * mdcache
->client_lease_durations
[pool
]);
3671 dout(20) << "issue_client_lease seq " << e
.seq
<< " dur " << e
.duration_ms
<< "ms "
3672 << " on " << *dn
<< dendl
;
3680 dout(20) << "issue_client_lease no/null lease on " << *dn
<< dendl
;
3685 void Locker::revoke_client_leases(SimpleLock
*lock
)
3688 CDentry
*dn
= static_cast<CDentry
*>(lock
->get_parent());
3689 for (map
<client_t
, ClientLease
*>::iterator p
= dn
->client_lease_map
.begin();
3690 p
!= dn
->client_lease_map
.end();
3692 ClientLease
*l
= p
->second
;
3695 assert(lock
->get_type() == CEPH_LOCK_DN
);
3697 CDentry
*dn
= static_cast<CDentry
*>(lock
->get_parent());
3698 int mask
= 1 | CEPH_LOCK_DN
; // old and new bits
3700 // i should also revoke the dir ICONTENT lease, if they have it!
3701 CInode
*diri
= dn
->get_dir()->get_inode();
3702 mds
->send_message_client_counted(new MClientLease(CEPH_MDS_LEASE_REVOKE
, l
->seq
,
3705 diri
->first
, CEPH_NOSNAP
,
3709 assert(n
== lock
->get_num_client_lease());
3714 // locks ----------------------------------------------------------------
3716 SimpleLock
*Locker::get_lock(int lock_type
, MDSCacheObjectInfo
&info
)
3718 switch (lock_type
) {
3721 // be careful; info.dirfrag may have incorrect frag; recalculate based on dname.
3722 CInode
*diri
= mdcache
->get_inode(info
.dirfrag
.ino
);
3727 fg
= diri
->pick_dirfrag(info
.dname
);
3728 dir
= diri
->get_dirfrag(fg
);
3730 dn
= dir
->lookup(info
.dname
, info
.snapid
);
3733 dout(7) << "get_lock don't have dn " << info
.dirfrag
.ino
<< " " << info
.dname
<< dendl
;
3739 case CEPH_LOCK_IAUTH
:
3740 case CEPH_LOCK_ILINK
:
3741 case CEPH_LOCK_IDFT
:
3742 case CEPH_LOCK_IFILE
:
3743 case CEPH_LOCK_INEST
:
3744 case CEPH_LOCK_IXATTR
:
3745 case CEPH_LOCK_ISNAP
:
3746 case CEPH_LOCK_IFLOCK
:
3747 case CEPH_LOCK_IPOLICY
:
3749 CInode
*in
= mdcache
->get_inode(info
.ino
, info
.snapid
);
3751 dout(7) << "get_lock don't have ino " << info
.ino
<< dendl
;
3754 switch (lock_type
) {
3755 case CEPH_LOCK_IAUTH
: return &in
->authlock
;
3756 case CEPH_LOCK_ILINK
: return &in
->linklock
;
3757 case CEPH_LOCK_IDFT
: return &in
->dirfragtreelock
;
3758 case CEPH_LOCK_IFILE
: return &in
->filelock
;
3759 case CEPH_LOCK_INEST
: return &in
->nestlock
;
3760 case CEPH_LOCK_IXATTR
: return &in
->xattrlock
;
3761 case CEPH_LOCK_ISNAP
: return &in
->snaplock
;
3762 case CEPH_LOCK_IFLOCK
: return &in
->flocklock
;
3763 case CEPH_LOCK_IPOLICY
: return &in
->policylock
;
3768 dout(7) << "get_lock don't know lock_type " << lock_type
<< dendl
;
3776 /* This function DOES put the passed message before returning */
3777 void Locker::handle_lock(MLock
*m
)
3779 // nobody should be talking to us during recovery.
3780 assert(mds
->is_rejoin() || mds
->is_clientreplay() || mds
->is_active() || mds
->is_stopping());
3782 SimpleLock
*lock
= get_lock(m
->get_lock_type(), m
->get_object_info());
3784 dout(10) << "don't have object " << m
->get_object_info() << ", must have trimmed, dropping" << dendl
;
3789 switch (lock
->get_type()) {
3791 case CEPH_LOCK_IAUTH
:
3792 case CEPH_LOCK_ILINK
:
3793 case CEPH_LOCK_ISNAP
:
3794 case CEPH_LOCK_IXATTR
:
3795 case CEPH_LOCK_IFLOCK
:
3796 case CEPH_LOCK_IPOLICY
:
3797 handle_simple_lock(lock
, m
);
3800 case CEPH_LOCK_IDFT
:
3801 case CEPH_LOCK_INEST
:
3802 //handle_scatter_lock((ScatterLock*)lock, m);
3805 case CEPH_LOCK_IFILE
:
3806 handle_file_lock(static_cast<ScatterLock
*>(lock
), m
);
3810 dout(7) << "handle_lock got otype " << m
->get_lock_type() << dendl
;
3820 // ==========================================================================
3823 /** This function may take a reference to m if it needs one, but does
3824 * not put references. */
3825 void Locker::handle_reqrdlock(SimpleLock
*lock
, MLock
*m
)
3827 MDSCacheObject
*parent
= lock
->get_parent();
3828 if (parent
->is_auth() &&
3829 lock
->get_state() != LOCK_SYNC
&&
3830 !parent
->is_frozen()) {
3831 dout(7) << "handle_reqrdlock got rdlock request on " << *lock
3832 << " on " << *parent
<< dendl
;
3833 assert(parent
->is_auth()); // replica auth pinned if they're doing this!
3834 if (lock
->is_stable()) {
3837 dout(7) << "handle_reqrdlock delaying request until lock is stable" << dendl
;
3838 lock
->add_waiter(SimpleLock::WAIT_STABLE
| MDSCacheObject::WAIT_UNFREEZE
,
3839 new C_MDS_RetryMessage(mds
, m
->get()));
3842 dout(7) << "handle_reqrdlock dropping rdlock request on " << *lock
3843 << " on " << *parent
<< dendl
;
3844 // replica should retry
3848 /* This function DOES put the passed message before returning */
3849 void Locker::handle_simple_lock(SimpleLock
*lock
, MLock
*m
)
3851 int from
= m
->get_asker();
3853 dout(10) << "handle_simple_lock " << *m
3854 << " on " << *lock
<< " " << *lock
->get_parent() << dendl
;
3856 if (mds
->is_rejoin()) {
3857 if (lock
->get_parent()->is_rejoining()) {
3858 dout(7) << "handle_simple_lock still rejoining " << *lock
->get_parent()
3859 << ", dropping " << *m
<< dendl
;
3865 switch (m
->get_action()) {
3868 assert(lock
->get_state() == LOCK_LOCK
);
3869 lock
->decode_locked_state(m
->get_data());
3870 lock
->set_state(LOCK_SYNC
);
3871 lock
->finish_waiters(SimpleLock::WAIT_RD
|SimpleLock::WAIT_STABLE
);
3875 assert(lock
->get_state() == LOCK_SYNC
);
3876 lock
->set_state(LOCK_SYNC_LOCK
);
3877 if (lock
->is_leased())
3878 revoke_client_leases(lock
);
3879 eval_gather(lock
, true);
3880 if (lock
->is_unstable_and_locked())
3881 mds
->mdlog
->flush();
3886 case LOCK_AC_LOCKACK
:
3887 assert(lock
->get_state() == LOCK_SYNC_LOCK
||
3888 lock
->get_state() == LOCK_SYNC_EXCL
);
3889 assert(lock
->is_gathering(from
));
3890 lock
->remove_gather(from
);
3892 if (lock
->is_gathering()) {
3893 dout(7) << "handle_simple_lock " << *lock
<< " on " << *lock
->get_parent() << " from " << from
3894 << ", still gathering " << lock
->get_gather_set() << dendl
;
3896 dout(7) << "handle_simple_lock " << *lock
<< " on " << *lock
->get_parent() << " from " << from
3897 << ", last one" << dendl
;
3902 case LOCK_AC_REQRDLOCK
:
3903 handle_reqrdlock(lock
, m
);
3911 /* unused, currently.
3913 class C_Locker_SimpleEval : public Context {
3917 C_Locker_SimpleEval(Locker *l, SimpleLock *lk) : locker(l), lock(lk) {}
3918 void finish(int r) {
3919 locker->try_simple_eval(lock);
3923 void Locker::try_simple_eval(SimpleLock *lock)
3925 // unstable and ambiguous auth?
3926 if (!lock->is_stable() &&
3927 lock->get_parent()->is_ambiguous_auth()) {
3928 dout(7) << "simple_eval not stable and ambiguous auth, waiting on " << *lock->get_parent() << dendl;
3929 //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
3930 lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_SimpleEval(this, lock));
3934 if (!lock->get_parent()->is_auth()) {
3935 dout(7) << "try_simple_eval not auth for " << *lock->get_parent() << dendl;
3939 if (!lock->get_parent()->can_auth_pin()) {
3940 dout(7) << "try_simple_eval can't auth_pin, waiting on " << *lock->get_parent() << dendl;
3941 //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
3942 lock->get_parent()->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_SimpleEval(this, lock));
3946 if (lock->is_stable())
3952 void Locker::simple_eval(SimpleLock
*lock
, bool *need_issue
)
3954 dout(10) << "simple_eval " << *lock
<< " on " << *lock
->get_parent() << dendl
;
3956 assert(lock
->get_parent()->is_auth());
3957 assert(lock
->is_stable());
3959 if (lock
->get_parent()->is_freezing_or_frozen()) {
3960 // dentry lock in unreadable state can block path traverse
3961 if ((lock
->get_type() != CEPH_LOCK_DN
||
3962 lock
->get_state() == LOCK_SYNC
||
3963 lock
->get_parent()->is_frozen()))
3967 if (mdcache
->is_readonly()) {
3968 if (lock
->get_state() != LOCK_SYNC
) {
3969 dout(10) << "simple_eval read-only FS, syncing " << *lock
<< " on " << *lock
->get_parent() << dendl
;
3970 simple_sync(lock
, need_issue
);
3977 if (lock
->get_type() != CEPH_LOCK_DN
) {
3978 in
= static_cast<CInode
*>(lock
->get_parent());
3979 in
->get_caps_wanted(&wanted
, NULL
, lock
->get_cap_shift());
3983 if (lock
->get_state() != LOCK_EXCL
&&
3984 in
&& in
->get_target_loner() >= 0 &&
3985 (wanted
& CEPH_CAP_GEXCL
)) {
3986 dout(7) << "simple_eval stable, going to excl " << *lock
3987 << " on " << *lock
->get_parent() << dendl
;
3988 simple_excl(lock
, need_issue
);
3992 else if (lock
->get_state() != LOCK_SYNC
&&
3993 !lock
->is_wrlocked() &&
3994 ((!(wanted
& CEPH_CAP_GEXCL
) && !lock
->is_waiter_for(SimpleLock::WAIT_WR
)) ||
3995 (lock
->get_state() == LOCK_EXCL
&& in
&& in
->get_target_loner() < 0))) {
3996 dout(7) << "simple_eval stable, syncing " << *lock
3997 << " on " << *lock
->get_parent() << dendl
;
3998 simple_sync(lock
, need_issue
);
4005 bool Locker::simple_sync(SimpleLock
*lock
, bool *need_issue
)
4007 dout(7) << "simple_sync on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4008 assert(lock
->get_parent()->is_auth());
4009 assert(lock
->is_stable());
4012 if (lock
->get_cap_shift())
4013 in
= static_cast<CInode
*>(lock
->get_parent());
4015 int old_state
= lock
->get_state();
4017 if (old_state
!= LOCK_TSYN
) {
4019 switch (lock
->get_state()) {
4020 case LOCK_MIX
: lock
->set_state(LOCK_MIX_SYNC
); break;
4021 case LOCK_LOCK
: lock
->set_state(LOCK_LOCK_SYNC
); break;
4022 case LOCK_XSYN
: lock
->set_state(LOCK_XSYN_SYNC
); break;
4023 case LOCK_EXCL
: lock
->set_state(LOCK_EXCL_SYNC
); break;
4024 default: ceph_abort();
4028 if (lock
->is_wrlocked())
4031 if (lock
->get_parent()->is_replicated() && old_state
== LOCK_MIX
) {
4032 send_lock_message(lock
, LOCK_AC_SYNC
);
4033 lock
->init_gather();
4037 if (in
&& in
->is_head()) {
4038 if (in
->issued_caps_need_gather(lock
)) {
4047 bool need_recover
= false;
4048 if (lock
->get_type() == CEPH_LOCK_IFILE
) {
4050 if (in
->state_test(CInode::STATE_NEEDSRECOVER
)) {
4051 mds
->mdcache
->queue_file_recover(in
);
4052 need_recover
= true;
4057 if (!gather
&& lock
->is_dirty()) {
4058 lock
->get_parent()->auth_pin(lock
);
4059 scatter_writebehind(static_cast<ScatterLock
*>(lock
));
4060 mds
->mdlog
->flush();
4065 lock
->get_parent()->auth_pin(lock
);
4067 mds
->mdcache
->do_file_recover();
4072 if (lock
->get_parent()->is_replicated()) { // FIXME
4074 lock
->encode_locked_state(data
);
4075 send_lock_message(lock
, LOCK_AC_SYNC
, data
);
4077 lock
->set_state(LOCK_SYNC
);
4078 lock
->finish_waiters(SimpleLock::WAIT_RD
|SimpleLock::WAIT_STABLE
);
4079 if (in
&& in
->is_head()) {
4088 void Locker::simple_excl(SimpleLock
*lock
, bool *need_issue
)
4090 dout(7) << "simple_excl on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4091 assert(lock
->get_parent()->is_auth());
4092 assert(lock
->is_stable());
4095 if (lock
->get_cap_shift())
4096 in
= static_cast<CInode
*>(lock
->get_parent());
4098 switch (lock
->get_state()) {
4099 case LOCK_LOCK
: lock
->set_state(LOCK_LOCK_EXCL
); break;
4100 case LOCK_SYNC
: lock
->set_state(LOCK_SYNC_EXCL
); break;
4101 case LOCK_XSYN
: lock
->set_state(LOCK_XSYN_EXCL
); break;
4102 default: ceph_abort();
4106 if (lock
->is_rdlocked())
4108 if (lock
->is_wrlocked())
4111 if (lock
->get_parent()->is_replicated() &&
4112 lock
->get_state() != LOCK_LOCK_EXCL
&&
4113 lock
->get_state() != LOCK_XSYN_EXCL
) {
4114 send_lock_message(lock
, LOCK_AC_LOCK
);
4115 lock
->init_gather();
4119 if (in
&& in
->is_head()) {
4120 if (in
->issued_caps_need_gather(lock
)) {
4130 lock
->get_parent()->auth_pin(lock
);
4132 lock
->set_state(LOCK_EXCL
);
4133 lock
->finish_waiters(SimpleLock::WAIT_WR
|SimpleLock::WAIT_STABLE
);
4143 void Locker::simple_lock(SimpleLock
*lock
, bool *need_issue
)
4145 dout(7) << "simple_lock on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4146 assert(lock
->get_parent()->is_auth());
4147 assert(lock
->is_stable());
4148 assert(lock
->get_state() != LOCK_LOCK
);
4151 if (lock
->get_cap_shift())
4152 in
= static_cast<CInode
*>(lock
->get_parent());
4154 int old_state
= lock
->get_state();
4156 switch (lock
->get_state()) {
4157 case LOCK_SYNC
: lock
->set_state(LOCK_SYNC_LOCK
); break;
4159 file_excl(static_cast<ScatterLock
*>(lock
), need_issue
);
4160 if (lock
->get_state() != LOCK_EXCL
)
4163 case LOCK_EXCL
: lock
->set_state(LOCK_EXCL_LOCK
); break;
4164 case LOCK_MIX
: lock
->set_state(LOCK_MIX_LOCK
);
4165 (static_cast<ScatterLock
*>(lock
))->clear_unscatter_wanted();
4167 case LOCK_TSYN
: lock
->set_state(LOCK_TSYN_LOCK
); break;
4168 default: ceph_abort();
4172 if (lock
->is_leased()) {
4174 revoke_client_leases(lock
);
4176 if (lock
->is_rdlocked())
4178 if (in
&& in
->is_head()) {
4179 if (in
->issued_caps_need_gather(lock
)) {
4188 bool need_recover
= false;
4189 if (lock
->get_type() == CEPH_LOCK_IFILE
) {
4191 if(in
->state_test(CInode::STATE_NEEDSRECOVER
)) {
4192 mds
->mdcache
->queue_file_recover(in
);
4193 need_recover
= true;
4198 if (lock
->get_parent()->is_replicated() &&
4199 lock
->get_state() == LOCK_MIX_LOCK
&&
4201 dout(10) << " doing local stage of mix->lock gather before gathering from replicas" << dendl
;
4203 // move to second stage of gather now, so we don't send the lock action later.
4204 if (lock
->get_state() == LOCK_MIX_LOCK
)
4205 lock
->set_state(LOCK_MIX_LOCK2
);
4207 if (lock
->get_parent()->is_replicated() &&
4208 lock
->get_sm()->states
[old_state
].replica_state
!= LOCK_LOCK
) { // replica may already be LOCK
4210 send_lock_message(lock
, LOCK_AC_LOCK
);
4211 lock
->init_gather();
4215 if (!gather
&& lock
->is_dirty()) {
4216 lock
->get_parent()->auth_pin(lock
);
4217 scatter_writebehind(static_cast<ScatterLock
*>(lock
));
4218 mds
->mdlog
->flush();
4223 lock
->get_parent()->auth_pin(lock
);
4225 mds
->mdcache
->do_file_recover();
4227 lock
->set_state(LOCK_LOCK
);
4228 lock
->finish_waiters(ScatterLock::WAIT_XLOCK
|ScatterLock::WAIT_WR
|ScatterLock::WAIT_STABLE
);
4233 void Locker::simple_xlock(SimpleLock
*lock
)
4235 dout(7) << "simple_xlock on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4236 assert(lock
->get_parent()->is_auth());
4237 //assert(lock->is_stable());
4238 assert(lock
->get_state() != LOCK_XLOCK
);
4241 if (lock
->get_cap_shift())
4242 in
= static_cast<CInode
*>(lock
->get_parent());
4244 if (lock
->is_stable())
4245 lock
->get_parent()->auth_pin(lock
);
4247 switch (lock
->get_state()) {
4249 case LOCK_XLOCKDONE
: lock
->set_state(LOCK_LOCK_XLOCK
); break;
4250 default: ceph_abort();
4254 if (lock
->is_rdlocked())
4256 if (lock
->is_wrlocked())
4259 if (in
&& in
->is_head()) {
4260 if (in
->issued_caps_need_gather(lock
)) {
4267 lock
->set_state(LOCK_PREXLOCK
);
4268 //assert("shouldn't be called if we are already xlockable" == 0);
4276 // ==========================================================================
4281 Some notes on scatterlocks.
4283 - The scatter/gather is driven by the inode lock. The scatter always
4284 brings in the latest metadata from the fragments.
4286 - When in a scattered/MIX state, fragments are only allowed to
4287 update/be written to if the accounted stat matches the inode's
4290 - That means, on gather, we _only_ assimilate diffs for frag metadata
4291 that match the current version, because those are the only ones
4292 written during this scatter/gather cycle. (Others didn't permit
4293 it.) We increment the version and journal this to disk.
4295 - When possible, we also simultaneously update our local frag
4296 accounted stats to match.
4298 - On scatter, the new inode info is broadcast to frags, both local
4299 and remote. If possible (auth and !frozen), the dirfrag auth
4300 should update the accounted state (if it isn't already up to date).
4301 Note that this may occur on both the local inode auth node and
4302 inode replicas, so there are two potential paths. If it is NOT
4303 possible, they need to mark_stale to prevent any possible writes.
4305 - A scatter can be to MIX (potentially writeable) or to SYNC (read
4306 only). Both are opportunities to update the frag accounted stats,
4307 even though only the MIX case is affected by a stale dirfrag.
4309 - Because many scatter/gather cycles can potentially go by without a
4310 frag being able to update its accounted stats (due to being frozen
4311 by exports/refragments in progress), the frag may have (even very)
4312 old stat versions. That's fine. If when we do want to update it,
4313 we can update accounted_* and the version first.
4317 class C_Locker_ScatterWB
: public LockerLogContext
{
4321 C_Locker_ScatterWB(Locker
*l
, ScatterLock
*sl
, MutationRef
& m
) :
4322 LockerLogContext(l
), lock(sl
), mut(m
) {}
4323 void finish(int r
) override
{
4324 locker
->scatter_writebehind_finish(lock
, mut
);
4328 void Locker::scatter_writebehind(ScatterLock
*lock
)
4330 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4331 dout(10) << "scatter_writebehind " << in
->inode
.mtime
<< " on " << *lock
<< " on " << *in
<< dendl
;
4334 MutationRef
mut(new MutationImpl());
4335 mut
->ls
= mds
->mdlog
->get_current_segment();
4337 // forcefully take a wrlock
4338 lock
->get_wrlock(true);
4339 mut
->wrlocks
.insert(lock
);
4340 mut
->locks
.insert(lock
);
4342 in
->pre_cow_old_inode(); // avoid cow mayhem
4344 inode_t
*pi
= in
->project_inode();
4345 pi
->version
= in
->pre_dirty();
4347 in
->finish_scatter_gather_update(lock
->get_type());
4348 lock
->start_flush();
4350 EUpdate
*le
= new EUpdate(mds
->mdlog
, "scatter_writebehind");
4351 mds
->mdlog
->start_entry(le
);
4353 mdcache
->predirty_journal_parents(mut
, &le
->metablob
, in
, 0, PREDIRTY_PRIMARY
);
4354 mdcache
->journal_dirty_inode(mut
.get(), &le
->metablob
, in
);
4356 in
->finish_scatter_gather_update_accounted(lock
->get_type(), mut
, &le
->metablob
);
4358 mds
->mdlog
->submit_entry(le
, new C_Locker_ScatterWB(this, lock
, mut
));
4361 void Locker::scatter_writebehind_finish(ScatterLock
*lock
, MutationRef
& mut
)
4363 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4364 dout(10) << "scatter_writebehind_finish on " << *lock
<< " on " << *in
<< dendl
;
4365 in
->pop_and_dirty_projected_inode(mut
->ls
);
4367 lock
->finish_flush();
4369 // if replicas may have flushed in a mix->lock state, send another
4370 // message so they can finish_flush().
4371 if (in
->is_replicated()) {
4372 switch (lock
->get_state()) {
4374 case LOCK_MIX_LOCK2
:
4377 send_lock_message(lock
, LOCK_AC_LOCKFLUSHED
);
4382 drop_locks(mut
.get());
4385 if (lock
->is_stable())
4386 lock
->finish_waiters(ScatterLock::WAIT_STABLE
);
4388 //scatter_eval_gather(lock);
4391 void Locker::scatter_eval(ScatterLock
*lock
, bool *need_issue
)
4393 dout(10) << "scatter_eval " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4395 assert(lock
->get_parent()->is_auth());
4396 assert(lock
->is_stable());
4398 if (lock
->get_parent()->is_freezing_or_frozen()) {
4399 dout(20) << " freezing|frozen" << dendl
;
4403 if (mdcache
->is_readonly()) {
4404 if (lock
->get_state() != LOCK_SYNC
) {
4405 dout(10) << "scatter_eval read-only FS, syncing " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4406 simple_sync(lock
, need_issue
);
4411 if (!lock
->is_rdlocked() &&
4412 lock
->get_state() != LOCK_MIX
&&
4413 lock
->get_scatter_wanted()) {
4414 dout(10) << "scatter_eval scatter_wanted, bump to mix " << *lock
4415 << " on " << *lock
->get_parent() << dendl
;
4416 scatter_mix(lock
, need_issue
);
4420 if (lock
->get_type() == CEPH_LOCK_INEST
) {
4421 // in general, we want to keep INEST writable at all times.
4422 if (!lock
->is_rdlocked()) {
4423 if (lock
->get_parent()->is_replicated()) {
4424 if (lock
->get_state() != LOCK_MIX
)
4425 scatter_mix(lock
, need_issue
);
4427 if (lock
->get_state() != LOCK_LOCK
)
4428 simple_lock(lock
, need_issue
);
4434 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4435 if (!in
->has_subtree_or_exporting_dirfrag() || in
->is_base()) {
4436 // i _should_ be sync.
4437 if (!lock
->is_wrlocked() &&
4438 lock
->get_state() != LOCK_SYNC
) {
4439 dout(10) << "scatter_eval no wrlocks|xlocks, not subtree root inode, syncing" << dendl
;
4440 simple_sync(lock
, need_issue
);
4447 * mark a scatterlock to indicate that the dir fnode has some dirty data
4449 void Locker::mark_updated_scatterlock(ScatterLock
*lock
)
4452 if (lock
->get_updated_item()->is_on_list()) {
4453 dout(10) << "mark_updated_scatterlock " << *lock
4454 << " - already on list since " << lock
->get_update_stamp() << dendl
;
4456 updated_scatterlocks
.push_back(lock
->get_updated_item());
4457 utime_t now
= ceph_clock_now();
4458 lock
->set_update_stamp(now
);
4459 dout(10) << "mark_updated_scatterlock " << *lock
4460 << " - added at " << now
<< dendl
;
4465 * this is called by scatter_tick and LogSegment::try_to_trim() when
4466 * trying to flush dirty scattered data (i.e. updated fnode) back to
4469 * we need to lock|scatter in order to push fnode changes into the
4472 void Locker::scatter_nudge(ScatterLock
*lock
, MDSInternalContextBase
*c
, bool forcelockchange
)
4474 CInode
*p
= static_cast<CInode
*>(lock
->get_parent());
4476 if (p
->is_frozen() || p
->is_freezing()) {
4477 dout(10) << "scatter_nudge waiting for unfreeze on " << *p
<< dendl
;
4479 p
->add_waiter(MDSCacheObject::WAIT_UNFREEZE
, c
);
4481 // just requeue. not ideal.. starvation prone..
4482 updated_scatterlocks
.push_back(lock
->get_updated_item());
4486 if (p
->is_ambiguous_auth()) {
4487 dout(10) << "scatter_nudge waiting for single auth on " << *p
<< dendl
;
4489 p
->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
, c
);
4491 // just requeue. not ideal.. starvation prone..
4492 updated_scatterlocks
.push_back(lock
->get_updated_item());
4499 if (lock
->is_stable()) {
4500 // can we do it now?
4501 // (only if we're not replicated.. if we are, we really do need
4502 // to nudge the lock state!)
4504 actually, even if we're not replicated, we can't stay in MIX, because another mds
4505 could discover and replicate us at any time. if that happens while we're flushing,
4506 they end up in MIX but their inode has the old scatterstat version.
4508 if (!forcelockchange && !lock->get_parent()->is_replicated() && lock->can_wrlock(-1)) {
4509 dout(10) << "scatter_nudge auth, propagating " << *lock << " on " << *p << dendl;
4510 scatter_writebehind(lock);
4512 lock->add_waiter(SimpleLock::WAIT_STABLE, c);
4517 if (mdcache
->is_readonly()) {
4518 if (lock
->get_state() != LOCK_SYNC
) {
4519 dout(10) << "scatter_nudge auth, read-only FS, syncing " << *lock
<< " on " << *p
<< dendl
;
4520 simple_sync(static_cast<ScatterLock
*>(lock
));
4525 // adjust lock state
4526 dout(10) << "scatter_nudge auth, scatter/unscattering " << *lock
<< " on " << *p
<< dendl
;
4527 switch (lock
->get_type()) {
4528 case CEPH_LOCK_IFILE
:
4529 if (p
->is_replicated() && lock
->get_state() != LOCK_MIX
)
4530 scatter_mix(static_cast<ScatterLock
*>(lock
));
4531 else if (lock
->get_state() != LOCK_LOCK
)
4532 simple_lock(static_cast<ScatterLock
*>(lock
));
4534 simple_sync(static_cast<ScatterLock
*>(lock
));
4537 case CEPH_LOCK_IDFT
:
4538 case CEPH_LOCK_INEST
:
4539 if (p
->is_replicated() && lock
->get_state() != LOCK_MIX
)
4541 else if (lock
->get_state() != LOCK_LOCK
)
4550 if (lock
->is_stable() && count
== 2) {
4551 dout(10) << "scatter_nudge oh, stable after two cycles." << dendl
;
4552 // this should only realy happen when called via
4553 // handle_file_lock due to AC_NUDGE, because the rest of the
4554 // time we are replicated or have dirty data and won't get
4555 // called. bailing here avoids an infinite loop.
4560 dout(10) << "scatter_nudge auth, waiting for stable " << *lock
<< " on " << *p
<< dendl
;
4562 lock
->add_waiter(SimpleLock::WAIT_STABLE
, c
);
4567 dout(10) << "scatter_nudge replica, requesting scatter/unscatter of "
4568 << *lock
<< " on " << *p
<< dendl
;
4569 // request unscatter?
4570 mds_rank_t auth
= lock
->get_parent()->authority().first
;
4571 if (!mds
->is_cluster_degraded() ||
4572 mds
->mdsmap
->is_clientreplay_or_active_or_stopping(auth
))
4573 mds
->send_message_mds(new MLock(lock
, LOCK_AC_NUDGE
, mds
->get_nodeid()), auth
);
4577 lock
->add_waiter(SimpleLock::WAIT_STABLE
, c
);
4579 // also, requeue, in case we had wrong auth or something
4580 updated_scatterlocks
.push_back(lock
->get_updated_item());
4584 void Locker::scatter_tick()
4586 dout(10) << "scatter_tick" << dendl
;
4589 utime_t now
= ceph_clock_now();
4590 int n
= updated_scatterlocks
.size();
4591 while (!updated_scatterlocks
.empty()) {
4592 ScatterLock
*lock
= updated_scatterlocks
.front();
4594 if (n
-- == 0) break; // scatter_nudge() may requeue; avoid looping
4596 if (!lock
->is_dirty()) {
4597 updated_scatterlocks
.pop_front();
4598 dout(10) << " removing from updated_scatterlocks "
4599 << *lock
<< " " << *lock
->get_parent() << dendl
;
4602 if (now
- lock
->get_update_stamp() < g_conf
->mds_scatter_nudge_interval
)
4604 updated_scatterlocks
.pop_front();
4605 scatter_nudge(lock
, 0);
4607 mds
->mdlog
->flush();
4611 void Locker::scatter_tempsync(ScatterLock
*lock
, bool *need_issue
)
4613 dout(10) << "scatter_tempsync " << *lock
4614 << " on " << *lock
->get_parent() << dendl
;
4615 assert(lock
->get_parent()->is_auth());
4616 assert(lock
->is_stable());
4618 assert(0 == "not fully implemented, at least not for filelock");
4620 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4622 switch (lock
->get_state()) {
4623 case LOCK_SYNC
: ceph_abort(); // this shouldn't happen
4624 case LOCK_LOCK
: lock
->set_state(LOCK_LOCK_TSYN
); break;
4625 case LOCK_MIX
: lock
->set_state(LOCK_MIX_TSYN
); break;
4626 default: ceph_abort();
4630 if (lock
->is_wrlocked())
4633 if (lock
->get_cap_shift() &&
4635 in
->issued_caps_need_gather(lock
)) {
4643 if (lock
->get_state() == LOCK_MIX_TSYN
&&
4644 in
->is_replicated()) {
4645 lock
->init_gather();
4646 send_lock_message(lock
, LOCK_AC_LOCK
);
4654 lock
->set_state(LOCK_TSYN
);
4655 lock
->finish_waiters(ScatterLock::WAIT_RD
|ScatterLock::WAIT_STABLE
);
4656 if (lock
->get_cap_shift()) {
4667 // ==========================================================================
4670 void Locker::local_wrlock_grab(LocalLock
*lock
, MutationRef
& mut
)
4672 dout(7) << "local_wrlock_grab on " << *lock
4673 << " on " << *lock
->get_parent() << dendl
;
4675 assert(lock
->get_parent()->is_auth());
4676 assert(lock
->can_wrlock());
4677 assert(!mut
->wrlocks
.count(lock
));
4678 lock
->get_wrlock(mut
->get_client());
4679 mut
->wrlocks
.insert(lock
);
4680 mut
->locks
.insert(lock
);
4683 bool Locker::local_wrlock_start(LocalLock
*lock
, MDRequestRef
& mut
)
4685 dout(7) << "local_wrlock_start on " << *lock
4686 << " on " << *lock
->get_parent() << dendl
;
4688 assert(lock
->get_parent()->is_auth());
4689 if (lock
->can_wrlock()) {
4690 assert(!mut
->wrlocks
.count(lock
));
4691 lock
->get_wrlock(mut
->get_client());
4692 mut
->wrlocks
.insert(lock
);
4693 mut
->locks
.insert(lock
);
4696 lock
->add_waiter(SimpleLock::WAIT_WR
|SimpleLock::WAIT_STABLE
, new C_MDS_RetryRequest(mdcache
, mut
));
4701 void Locker::local_wrlock_finish(LocalLock
*lock
, MutationImpl
*mut
)
4703 dout(7) << "local_wrlock_finish on " << *lock
4704 << " on " << *lock
->get_parent() << dendl
;
4706 mut
->wrlocks
.erase(lock
);
4707 mut
->locks
.erase(lock
);
4708 if (lock
->get_num_wrlocks() == 0) {
4709 lock
->finish_waiters(SimpleLock::WAIT_STABLE
|
4710 SimpleLock::WAIT_WR
|
4711 SimpleLock::WAIT_RD
);
4715 bool Locker::local_xlock_start(LocalLock
*lock
, MDRequestRef
& mut
)
4717 dout(7) << "local_xlock_start on " << *lock
4718 << " on " << *lock
->get_parent() << dendl
;
4720 assert(lock
->get_parent()->is_auth());
4721 if (!lock
->can_xlock_local()) {
4722 lock
->add_waiter(SimpleLock::WAIT_WR
|SimpleLock::WAIT_STABLE
, new C_MDS_RetryRequest(mdcache
, mut
));
4726 lock
->get_xlock(mut
, mut
->get_client());
4727 mut
->xlocks
.insert(lock
);
4728 mut
->locks
.insert(lock
);
4732 void Locker::local_xlock_finish(LocalLock
*lock
, MutationImpl
*mut
)
4734 dout(7) << "local_xlock_finish on " << *lock
4735 << " on " << *lock
->get_parent() << dendl
;
4737 mut
->xlocks
.erase(lock
);
4738 mut
->locks
.erase(lock
);
4740 lock
->finish_waiters(SimpleLock::WAIT_STABLE
|
4741 SimpleLock::WAIT_WR
|
4742 SimpleLock::WAIT_RD
);
4747 // ==========================================================================
4751 void Locker::file_eval(ScatterLock
*lock
, bool *need_issue
)
4753 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4754 int loner_wanted
, other_wanted
;
4755 int wanted
= in
->get_caps_wanted(&loner_wanted
, &other_wanted
, CEPH_CAP_SFILE
);
4756 dout(7) << "file_eval wanted=" << gcap_string(wanted
)
4757 << " loner_wanted=" << gcap_string(loner_wanted
)
4758 << " other_wanted=" << gcap_string(other_wanted
)
4759 << " filelock=" << *lock
<< " on " << *lock
->get_parent()
4762 assert(lock
->get_parent()->is_auth());
4763 assert(lock
->is_stable());
4765 if (lock
->get_parent()->is_freezing_or_frozen())
4768 if (mdcache
->is_readonly()) {
4769 if (lock
->get_state() != LOCK_SYNC
) {
4770 dout(10) << "file_eval read-only FS, syncing " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4771 simple_sync(lock
, need_issue
);
4777 if (lock
->get_state() == LOCK_EXCL
) {
4778 dout(20) << " is excl" << dendl
;
4779 int loner_issued
, other_issued
, xlocker_issued
;
4780 in
->get_caps_issued(&loner_issued
, &other_issued
, &xlocker_issued
, CEPH_CAP_SFILE
);
4781 dout(7) << "file_eval loner_issued=" << gcap_string(loner_issued
)
4782 << " other_issued=" << gcap_string(other_issued
)
4783 << " xlocker_issued=" << gcap_string(xlocker_issued
)
4785 if (!((loner_wanted
|loner_issued
) & (CEPH_CAP_GEXCL
|CEPH_CAP_GWR
|CEPH_CAP_GBUFFER
)) ||
4786 (other_wanted
& (CEPH_CAP_GEXCL
|CEPH_CAP_GWR
|CEPH_CAP_GRD
)) ||
4787 (in
->inode
.is_dir() && in
->multiple_nonstale_caps())) { // FIXME.. :/
4788 dout(20) << " should lose it" << dendl
;
4789 // we should lose it.
4800 // -> any writer means MIX; RD doesn't matter.
4801 if (((other_wanted
|loner_wanted
) & CEPH_CAP_GWR
) ||
4802 lock
->is_waiter_for(SimpleLock::WAIT_WR
))
4803 scatter_mix(lock
, need_issue
);
4804 else if (!lock
->is_wrlocked()) // let excl wrlocks drain first
4805 simple_sync(lock
, need_issue
);
4807 dout(10) << " waiting for wrlock to drain" << dendl
;
4812 else if (lock
->get_state() != LOCK_EXCL
&&
4813 !lock
->is_rdlocked() &&
4814 //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
4815 ((wanted
& (CEPH_CAP_GWR
|CEPH_CAP_GBUFFER
)) ||
4816 (in
->inode
.is_dir() && !in
->has_subtree_or_exporting_dirfrag())) &&
4817 in
->get_target_loner() >= 0) {
4818 dout(7) << "file_eval stable, bump to loner " << *lock
4819 << " on " << *lock
->get_parent() << dendl
;
4820 file_excl(lock
, need_issue
);
4824 else if (lock
->get_state() != LOCK_MIX
&&
4825 !lock
->is_rdlocked() &&
4826 //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
4827 (lock
->get_scatter_wanted() ||
4828 (in
->get_wanted_loner() < 0 && (wanted
& CEPH_CAP_GWR
)))) {
4829 dout(7) << "file_eval stable, bump to mixed " << *lock
4830 << " on " << *lock
->get_parent() << dendl
;
4831 scatter_mix(lock
, need_issue
);
4835 else if (lock
->get_state() != LOCK_SYNC
&&
4836 !lock
->is_wrlocked() && // drain wrlocks first!
4837 !lock
->is_waiter_for(SimpleLock::WAIT_WR
) &&
4838 !(wanted
& (CEPH_CAP_GWR
|CEPH_CAP_GBUFFER
)) &&
4839 !((lock
->get_state() == LOCK_MIX
) &&
4840 in
->is_dir() && in
->has_subtree_or_exporting_dirfrag()) // if we are a delegation point, stay where we are
4841 //((wanted & CEPH_CAP_RD) ||
4842 //in->is_replicated() ||
4843 //lock->get_num_client_lease() ||
4844 //(!loner && lock->get_state() == LOCK_EXCL)) &&
4846 dout(7) << "file_eval stable, bump to sync " << *lock
4847 << " on " << *lock
->get_parent() << dendl
;
4848 simple_sync(lock
, need_issue
);
4854 void Locker::scatter_mix(ScatterLock
*lock
, bool *need_issue
)
4856 dout(7) << "scatter_mix " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4858 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4859 assert(in
->is_auth());
4860 assert(lock
->is_stable());
4862 if (lock
->get_state() == LOCK_LOCK
) {
4863 in
->start_scatter(lock
);
4864 if (in
->is_replicated()) {
4866 bufferlist softdata
;
4867 lock
->encode_locked_state(softdata
);
4869 // bcast to replicas
4870 send_lock_message(lock
, LOCK_AC_MIX
, softdata
);
4874 lock
->set_state(LOCK_MIX
);
4875 lock
->clear_scatter_wanted();
4876 if (lock
->get_cap_shift()) {
4884 switch (lock
->get_state()) {
4885 case LOCK_SYNC
: lock
->set_state(LOCK_SYNC_MIX
); break;
4887 file_excl(lock
, need_issue
);
4888 if (lock
->get_state() != LOCK_EXCL
)
4891 case LOCK_EXCL
: lock
->set_state(LOCK_EXCL_MIX
); break;
4892 case LOCK_TSYN
: lock
->set_state(LOCK_TSYN_MIX
); break;
4893 default: ceph_abort();
4897 if (lock
->is_rdlocked())
4899 if (in
->is_replicated()) {
4900 if (lock
->get_state() != LOCK_EXCL_MIX
&& // EXCL replica is already LOCK
4901 lock
->get_state() != LOCK_XSYN_EXCL
) { // XSYN replica is already LOCK; ** FIXME here too!
4902 send_lock_message(lock
, LOCK_AC_MIX
);
4903 lock
->init_gather();
4907 if (lock
->is_leased()) {
4908 revoke_client_leases(lock
);
4911 if (lock
->get_cap_shift() &&
4913 in
->issued_caps_need_gather(lock
)) {
4920 bool need_recover
= false;
4921 if (in
->state_test(CInode::STATE_NEEDSRECOVER
)) {
4922 mds
->mdcache
->queue_file_recover(in
);
4923 need_recover
= true;
4928 lock
->get_parent()->auth_pin(lock
);
4930 mds
->mdcache
->do_file_recover();
4932 in
->start_scatter(lock
);
4933 lock
->set_state(LOCK_MIX
);
4934 lock
->clear_scatter_wanted();
4935 if (in
->is_replicated()) {
4936 bufferlist softdata
;
4937 lock
->encode_locked_state(softdata
);
4938 send_lock_message(lock
, LOCK_AC_MIX
, softdata
);
4940 if (lock
->get_cap_shift()) {
4951 void Locker::file_excl(ScatterLock
*lock
, bool *need_issue
)
4953 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4954 dout(7) << "file_excl " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4956 assert(in
->is_auth());
4957 assert(lock
->is_stable());
4959 assert((in
->get_loner() >= 0 && in
->mds_caps_wanted
.empty()) ||
4960 (lock
->get_state() == LOCK_XSYN
)); // must do xsyn -> excl -> <anything else>
4962 switch (lock
->get_state()) {
4963 case LOCK_SYNC
: lock
->set_state(LOCK_SYNC_EXCL
); break;
4964 case LOCK_MIX
: lock
->set_state(LOCK_MIX_EXCL
); break;
4965 case LOCK_LOCK
: lock
->set_state(LOCK_LOCK_EXCL
); break;
4966 case LOCK_XSYN
: lock
->set_state(LOCK_XSYN_EXCL
); break;
4967 default: ceph_abort();
4971 if (lock
->is_rdlocked())
4973 if (lock
->is_wrlocked())
4976 if (in
->is_replicated() &&
4977 lock
->get_state() != LOCK_LOCK_EXCL
&&
4978 lock
->get_state() != LOCK_XSYN_EXCL
) { // if we were lock, replicas are already lock.
4979 send_lock_message(lock
, LOCK_AC_LOCK
);
4980 lock
->init_gather();
4983 if (lock
->is_leased()) {
4984 revoke_client_leases(lock
);
4987 if (in
->is_head() &&
4988 in
->issued_caps_need_gather(lock
)) {
4995 bool need_recover
= false;
4996 if (in
->state_test(CInode::STATE_NEEDSRECOVER
)) {
4997 mds
->mdcache
->queue_file_recover(in
);
4998 need_recover
= true;
5003 lock
->get_parent()->auth_pin(lock
);
5005 mds
->mdcache
->do_file_recover();
5007 lock
->set_state(LOCK_EXCL
);
5015 void Locker::file_xsyn(SimpleLock
*lock
, bool *need_issue
)
5017 dout(7) << "file_xsyn on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
5018 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
5019 assert(in
->is_auth());
5020 assert(in
->get_loner() >= 0 && in
->mds_caps_wanted
.empty());
5022 switch (lock
->get_state()) {
5023 case LOCK_EXCL
: lock
->set_state(LOCK_EXCL_XSYN
); break;
5024 default: ceph_abort();
5028 if (lock
->is_wrlocked())
5031 if (in
->is_head() &&
5032 in
->issued_caps_need_gather(lock
)) {
5041 lock
->get_parent()->auth_pin(lock
);
5043 lock
->set_state(LOCK_XSYN
);
5044 lock
->finish_waiters(SimpleLock::WAIT_RD
|SimpleLock::WAIT_STABLE
);
5052 void Locker::file_recover(ScatterLock
*lock
)
5054 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
5055 dout(7) << "file_recover " << *lock
<< " on " << *in
<< dendl
;
5057 assert(in
->is_auth());
5058 //assert(lock->is_stable());
5059 assert(lock
->get_state() == LOCK_PRE_SCAN
); // only called from MDCache::start_files_to_recover()
5064 if (in->is_replicated()
5065 lock->get_sm()->states[oldstate].replica_state != LOCK_LOCK) {
5066 send_lock_message(lock, LOCK_AC_LOCK);
5067 lock->init_gather();
5071 if (in
->is_head() &&
5072 in
->issued_caps_need_gather(lock
)) {
5077 lock
->set_state(LOCK_SCAN
);
5079 in
->state_set(CInode::STATE_NEEDSRECOVER
);
5081 mds
->mdcache
->queue_file_recover(in
);
5086 /* This function DOES put the passed message before returning */
5087 void Locker::handle_file_lock(ScatterLock
*lock
, MLock
*m
)
5089 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
5090 int from
= m
->get_asker();
5092 if (mds
->is_rejoin()) {
5093 if (in
->is_rejoining()) {
5094 dout(7) << "handle_file_lock still rejoining " << *in
5095 << ", dropping " << *m
<< dendl
;
5101 dout(7) << "handle_file_lock a=" << get_lock_action_name(m
->get_action())
5103 << " from mds." << from
<< " "
5106 bool caps
= lock
->get_cap_shift();
5108 switch (m
->get_action()) {
5111 assert(lock
->get_state() == LOCK_LOCK
||
5112 lock
->get_state() == LOCK_MIX
||
5113 lock
->get_state() == LOCK_MIX_SYNC2
);
5115 if (lock
->get_state() == LOCK_MIX
) {
5116 lock
->set_state(LOCK_MIX_SYNC
);
5117 eval_gather(lock
, true);
5118 if (lock
->is_unstable_and_locked())
5119 mds
->mdlog
->flush();
5123 (static_cast<ScatterLock
*>(lock
))->finish_flush();
5124 (static_cast<ScatterLock
*>(lock
))->clear_flushed();
5127 lock
->decode_locked_state(m
->get_data());
5128 lock
->set_state(LOCK_SYNC
);
5133 lock
->finish_waiters(SimpleLock::WAIT_RD
|SimpleLock::WAIT_STABLE
);
5138 switch (lock
->get_state()) {
5139 case LOCK_SYNC
: lock
->set_state(LOCK_SYNC_LOCK
); break;
5140 case LOCK_MIX
: lock
->set_state(LOCK_MIX_LOCK
); break;
5141 default: ceph_abort();
5144 eval_gather(lock
, true);
5145 if (lock
->is_unstable_and_locked())
5146 mds
->mdlog
->flush();
5150 case LOCK_AC_LOCKFLUSHED
:
5151 (static_cast<ScatterLock
*>(lock
))->finish_flush();
5152 (static_cast<ScatterLock
*>(lock
))->clear_flushed();
5153 // wake up scatter_nudge waiters
5154 if (lock
->is_stable())
5155 lock
->finish_waiters(SimpleLock::WAIT_STABLE
);
5159 assert(lock
->get_state() == LOCK_SYNC
||
5160 lock
->get_state() == LOCK_LOCK
||
5161 lock
->get_state() == LOCK_SYNC_MIX2
);
5163 if (lock
->get_state() == LOCK_SYNC
) {
5165 lock
->set_state(LOCK_SYNC_MIX
);
5166 eval_gather(lock
, true);
5167 if (lock
->is_unstable_and_locked())
5168 mds
->mdlog
->flush();
5173 lock
->decode_locked_state(m
->get_data());
5174 lock
->set_state(LOCK_MIX
);
5179 lock
->finish_waiters(SimpleLock::WAIT_WR
|SimpleLock::WAIT_STABLE
);
5184 case LOCK_AC_LOCKACK
:
5185 assert(lock
->get_state() == LOCK_SYNC_LOCK
||
5186 lock
->get_state() == LOCK_MIX_LOCK
||
5187 lock
->get_state() == LOCK_MIX_LOCK2
||
5188 lock
->get_state() == LOCK_MIX_EXCL
||
5189 lock
->get_state() == LOCK_SYNC_EXCL
||
5190 lock
->get_state() == LOCK_SYNC_MIX
||
5191 lock
->get_state() == LOCK_MIX_TSYN
);
5192 assert(lock
->is_gathering(from
));
5193 lock
->remove_gather(from
);
5195 if (lock
->get_state() == LOCK_MIX_LOCK
||
5196 lock
->get_state() == LOCK_MIX_LOCK2
||
5197 lock
->get_state() == LOCK_MIX_EXCL
||
5198 lock
->get_state() == LOCK_MIX_TSYN
) {
5199 lock
->decode_locked_state(m
->get_data());
5200 // replica is waiting for AC_LOCKFLUSHED, eval_gather() should not
5201 // delay calling scatter_writebehind().
5202 lock
->clear_flushed();
5205 if (lock
->is_gathering()) {
5206 dout(7) << "handle_file_lock " << *in
<< " from " << from
5207 << ", still gathering " << lock
->get_gather_set() << dendl
;
5209 dout(7) << "handle_file_lock " << *in
<< " from " << from
5210 << ", last one" << dendl
;
5215 case LOCK_AC_SYNCACK
:
5216 assert(lock
->get_state() == LOCK_MIX_SYNC
);
5217 assert(lock
->is_gathering(from
));
5218 lock
->remove_gather(from
);
5220 lock
->decode_locked_state(m
->get_data());
5222 if (lock
->is_gathering()) {
5223 dout(7) << "handle_file_lock " << *in
<< " from " << from
5224 << ", still gathering " << lock
->get_gather_set() << dendl
;
5226 dout(7) << "handle_file_lock " << *in
<< " from " << from
5227 << ", last one" << dendl
;
5232 case LOCK_AC_MIXACK
:
5233 assert(lock
->get_state() == LOCK_SYNC_MIX
);
5234 assert(lock
->is_gathering(from
));
5235 lock
->remove_gather(from
);
5237 if (lock
->is_gathering()) {
5238 dout(7) << "handle_file_lock " << *in
<< " from " << from
5239 << ", still gathering " << lock
->get_gather_set() << dendl
;
5241 dout(7) << "handle_file_lock " << *in
<< " from " << from
5242 << ", last one" << dendl
;
5249 case LOCK_AC_REQSCATTER
:
5250 if (lock
->is_stable()) {
5251 /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
5252 * because the replica should be holding an auth_pin if they're
5253 * doing this (and thus, we are freezing, not frozen, and indefinite
5254 * starvation isn't an issue).
5256 dout(7) << "handle_file_lock got scatter request on " << *lock
5257 << " on " << *lock
->get_parent() << dendl
;
5258 if (lock
->get_state() != LOCK_MIX
) // i.e., the reqscatter didn't race with an actual mix/scatter
5261 dout(7) << "handle_file_lock got scatter request, !stable, marking scatter_wanted on " << *lock
5262 << " on " << *lock
->get_parent() << dendl
;
5263 lock
->set_scatter_wanted();
5267 case LOCK_AC_REQUNSCATTER
:
5268 if (lock
->is_stable()) {
5269 /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
5270 * because the replica should be holding an auth_pin if they're
5271 * doing this (and thus, we are freezing, not frozen, and indefinite
5272 * starvation isn't an issue).
5274 dout(7) << "handle_file_lock got unscatter request on " << *lock
5275 << " on " << *lock
->get_parent() << dendl
;
5276 if (lock
->get_state() == LOCK_MIX
) // i.e., the reqscatter didn't race with an actual mix/scatter
5277 simple_lock(lock
); // FIXME tempsync?
5279 dout(7) << "handle_file_lock ignoring unscatter request on " << *lock
5280 << " on " << *lock
->get_parent() << dendl
;
5281 lock
->set_unscatter_wanted();
5285 case LOCK_AC_REQRDLOCK
:
5286 handle_reqrdlock(lock
, m
);
5290 if (!lock
->get_parent()->is_auth()) {
5291 dout(7) << "handle_file_lock IGNORING nudge on non-auth " << *lock
5292 << " on " << *lock
->get_parent() << dendl
;
5293 } else if (!lock
->get_parent()->is_replicated()) {
5294 dout(7) << "handle_file_lock IGNORING nudge on non-replicated " << *lock
5295 << " on " << *lock
->get_parent() << dendl
;
5297 dout(7) << "handle_file_lock trying nudge on " << *lock
5298 << " on " << *lock
->get_parent() << dendl
;
5299 scatter_nudge(lock
, 0, true);
5300 mds
->mdlog
->flush();