1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
23 #include "MDSContext.h"
28 #include "events/EUpdate.h"
29 #include "events/EOpen.h"
31 #include "msg/Messenger.h"
32 #include "osdc/Objecter.h"
34 #include "messages/MInodeFileCaps.h"
35 #include "messages/MLock.h"
36 #include "messages/MClientLease.h"
37 #include "messages/MClientReply.h"
38 #include "messages/MClientCaps.h"
39 #include "messages/MClientCapRelease.h"
41 #include "messages/MMDSSlaveRequest.h"
45 #include "common/config.h"
48 #define dout_subsys ceph_subsys_mds
50 #define dout_context g_ceph_context
51 #define dout_prefix _prefix(_dout, mds)
52 static ostream
& _prefix(std::ostream
*_dout
, MDSRank
*mds
) {
53 return *_dout
<< "mds." << mds
->get_nodeid() << ".locker ";
57 class LockerContext
: public MDSInternalContextBase
{
60 MDSRank
*get_mds() override
66 explicit LockerContext(Locker
*locker_
) : locker(locker_
) {
67 assert(locker
!= NULL
);
71 class LockerLogContext
: public MDSLogContextBase
{
74 MDSRank
*get_mds() override
80 explicit LockerLogContext(Locker
*locker_
) : locker(locker_
) {
81 assert(locker
!= NULL
);
85 /* This function DOES put the passed message before returning */
86 void Locker::dispatch(Message
*m
)
89 switch (m
->get_type()) {
93 handle_lock(static_cast<MLock
*>(m
));
96 case MSG_MDS_INODEFILECAPS
:
97 handle_inode_file_caps(static_cast<MInodeFileCaps
*>(m
));
101 case CEPH_MSG_CLIENT_CAPS
:
102 handle_client_caps(static_cast<MClientCaps
*>(m
));
105 case CEPH_MSG_CLIENT_CAPRELEASE
:
106 handle_client_cap_release(static_cast<MClientCapRelease
*>(m
));
108 case CEPH_MSG_CLIENT_LEASE
:
109 handle_client_lease(static_cast<MClientLease
*>(m
));
113 derr
<< "locker unknown message " << m
->get_type() << dendl
;
114 assert(0 == "locker unknown message");
131 void Locker::send_lock_message(SimpleLock
*lock
, int msg
)
133 for (compact_map
<mds_rank_t
,unsigned>::iterator it
= lock
->get_parent()->replicas_begin();
134 it
!= lock
->get_parent()->replicas_end();
136 if (mds
->is_cluster_degraded() &&
137 mds
->mdsmap
->get_state(it
->first
) < MDSMap::STATE_REJOIN
)
139 MLock
*m
= new MLock(lock
, msg
, mds
->get_nodeid());
140 mds
->send_message_mds(m
, it
->first
);
144 void Locker::send_lock_message(SimpleLock
*lock
, int msg
, const bufferlist
&data
)
146 for (compact_map
<mds_rank_t
,unsigned>::iterator it
= lock
->get_parent()->replicas_begin();
147 it
!= lock
->get_parent()->replicas_end();
149 if (mds
->is_cluster_degraded() &&
150 mds
->mdsmap
->get_state(it
->first
) < MDSMap::STATE_REJOIN
)
152 MLock
*m
= new MLock(lock
, msg
, mds
->get_nodeid());
154 mds
->send_message_mds(m
, it
->first
);
161 void Locker::include_snap_rdlocks(set
<SimpleLock
*>& rdlocks
, CInode
*in
)
163 // rdlock ancestor snaps
165 rdlocks
.insert(&in
->snaplock
);
166 while (t
->get_projected_parent_dn()) {
167 t
= t
->get_projected_parent_dn()->get_dir()->get_inode();
168 rdlocks
.insert(&t
->snaplock
);
172 void Locker::include_snap_rdlocks_wlayout(set
<SimpleLock
*>& rdlocks
, CInode
*in
,
173 file_layout_t
**layout
)
175 //rdlock ancestor snaps
177 rdlocks
.insert(&in
->snaplock
);
178 rdlocks
.insert(&in
->policylock
);
179 bool found_layout
= false;
181 rdlocks
.insert(&t
->snaplock
);
183 rdlocks
.insert(&t
->policylock
);
184 if (t
->get_projected_inode()->has_layout()) {
185 *layout
= &t
->get_projected_inode()->layout
;
189 if (t
->get_projected_parent_dn() &&
190 t
->get_projected_parent_dn()->get_dir())
191 t
= t
->get_projected_parent_dn()->get_dir()->get_inode();
196 struct MarkEventOnDestruct
{
200 MarkEventOnDestruct(MDRequestRef
& _mdr
,
201 const char *_message
) : mdr(_mdr
),
204 ~MarkEventOnDestruct() {
206 mdr
->mark_event(message
);
210 /* If this function returns false, the mdr has been placed
211 * on the appropriate wait list */
212 bool Locker::acquire_locks(MDRequestRef
& mdr
,
213 set
<SimpleLock
*> &rdlocks
,
214 set
<SimpleLock
*> &wrlocks
,
215 set
<SimpleLock
*> &xlocks
,
216 map
<SimpleLock
*,mds_rank_t
> *remote_wrlocks
,
217 CInode
*auth_pin_freeze
,
218 bool auth_pin_nonblock
)
220 if (mdr
->done_locking
&&
221 !mdr
->is_slave()) { // not on slaves! master requests locks piecemeal.
222 dout(10) << "acquire_locks " << *mdr
<< " - done locking" << dendl
;
223 return true; // at least we had better be!
225 dout(10) << "acquire_locks " << *mdr
<< dendl
;
227 MarkEventOnDestruct
marker(mdr
, "failed to acquire_locks");
229 client_t client
= mdr
->get_client();
231 set
<SimpleLock
*, SimpleLock::ptr_lt
> sorted
; // sort everything we will lock
232 set
<MDSCacheObject
*> mustpin
; // items to authpin
235 for (set
<SimpleLock
*>::iterator p
= xlocks
.begin(); p
!= xlocks
.end(); ++p
) {
236 dout(20) << " must xlock " << **p
<< " " << *(*p
)->get_parent() << dendl
;
238 mustpin
.insert((*p
)->get_parent());
240 // augment xlock with a versionlock?
241 if ((*p
)->get_type() == CEPH_LOCK_DN
) {
242 CDentry
*dn
= (CDentry
*)(*p
)->get_parent();
246 if (xlocks
.count(&dn
->versionlock
))
247 continue; // we're xlocking the versionlock too; don't wrlock it!
249 if (mdr
->is_master()) {
250 // master. wrlock versionlock so we can pipeline dentry updates to journal.
251 wrlocks
.insert(&dn
->versionlock
);
253 // slave. exclusively lock the dentry version (i.e. block other journal updates).
254 // this makes rollback safe.
255 xlocks
.insert(&dn
->versionlock
);
256 sorted
.insert(&dn
->versionlock
);
259 if ((*p
)->get_type() > CEPH_LOCK_IVERSION
) {
260 // inode version lock?
261 CInode
*in
= (CInode
*)(*p
)->get_parent();
264 if (mdr
->is_master()) {
265 // master. wrlock versionlock so we can pipeline inode updates to journal.
266 wrlocks
.insert(&in
->versionlock
);
268 // slave. exclusively lock the inode version (i.e. block other journal updates).
269 // this makes rollback safe.
270 xlocks
.insert(&in
->versionlock
);
271 sorted
.insert(&in
->versionlock
);
277 for (set
<SimpleLock
*>::iterator p
= wrlocks
.begin(); p
!= wrlocks
.end(); ++p
) {
278 MDSCacheObject
*object
= (*p
)->get_parent();
279 dout(20) << " must wrlock " << **p
<< " " << *object
<< dendl
;
281 if (object
->is_auth())
282 mustpin
.insert(object
);
283 else if (!object
->is_auth() &&
284 !(*p
)->can_wrlock(client
) && // we might have to request a scatter
285 !mdr
->is_slave()) { // if we are slave (remote_wrlock), the master already authpinned
286 dout(15) << " will also auth_pin " << *object
287 << " in case we need to request a scatter" << dendl
;
288 mustpin
.insert(object
);
293 if (remote_wrlocks
) {
294 for (map
<SimpleLock
*,mds_rank_t
>::iterator p
= remote_wrlocks
->begin(); p
!= remote_wrlocks
->end(); ++p
) {
295 MDSCacheObject
*object
= p
->first
->get_parent();
296 dout(20) << " must remote_wrlock on mds." << p
->second
<< " "
297 << *p
->first
<< " " << *object
<< dendl
;
298 sorted
.insert(p
->first
);
299 mustpin
.insert(object
);
304 for (set
<SimpleLock
*>::iterator p
= rdlocks
.begin();
307 MDSCacheObject
*object
= (*p
)->get_parent();
308 dout(20) << " must rdlock " << **p
<< " " << *object
<< dendl
;
310 if (object
->is_auth())
311 mustpin
.insert(object
);
312 else if (!object
->is_auth() &&
313 !(*p
)->can_rdlock(client
)) { // we might have to request an rdlock
314 dout(15) << " will also auth_pin " << *object
315 << " in case we need to request a rdlock" << dendl
;
316 mustpin
.insert(object
);
322 map
<mds_rank_t
, set
<MDSCacheObject
*> > mustpin_remote
; // mds -> (object set)
324 // can i auth pin them all now?
325 marker
.message
= "failed to authpin local pins";
326 for (set
<MDSCacheObject
*>::iterator p
= mustpin
.begin();
329 MDSCacheObject
*object
= *p
;
331 dout(10) << " must authpin " << *object
<< dendl
;
333 if (mdr
->is_auth_pinned(object
)) {
334 if (object
!= (MDSCacheObject
*)auth_pin_freeze
)
336 if (mdr
->more()->is_remote_frozen_authpin
) {
337 if (mdr
->more()->rename_inode
== auth_pin_freeze
)
339 // unfreeze auth pin for the wrong inode
340 mustpin_remote
[mdr
->more()->rename_inode
->authority().first
].size();
344 if (!object
->is_auth()) {
345 if (!mdr
->locks
.empty())
346 drop_locks(mdr
.get());
347 if (object
->is_ambiguous_auth()) {
349 dout(10) << " ambiguous auth, waiting to authpin " << *object
<< dendl
;
350 object
->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
, new C_MDS_RetryRequest(mdcache
, mdr
));
351 mdr
->drop_local_auth_pins();
354 mustpin_remote
[object
->authority().first
].insert(object
);
357 if (!object
->can_auth_pin()) {
359 drop_locks(mdr
.get());
360 mdr
->drop_local_auth_pins();
361 if (auth_pin_nonblock
) {
362 dout(10) << " can't auth_pin (freezing?) " << *object
<< ", nonblocking" << dendl
;
366 dout(10) << " can't auth_pin (freezing?), waiting to authpin " << *object
<< dendl
;
367 object
->add_waiter(MDSCacheObject::WAIT_UNFREEZE
, new C_MDS_RetryRequest(mdcache
, mdr
));
369 if (!mdr
->remote_auth_pins
.empty())
370 notify_freeze_waiter(object
);
376 // ok, grab local auth pins
377 for (set
<MDSCacheObject
*>::iterator p
= mustpin
.begin();
380 MDSCacheObject
*object
= *p
;
381 if (mdr
->is_auth_pinned(object
)) {
382 dout(10) << " already auth_pinned " << *object
<< dendl
;
383 } else if (object
->is_auth()) {
384 dout(10) << " auth_pinning " << *object
<< dendl
;
385 mdr
->auth_pin(object
);
389 // request remote auth_pins
390 if (!mustpin_remote
.empty()) {
391 marker
.message
= "requesting remote authpins";
392 for (map
<MDSCacheObject
*,mds_rank_t
>::iterator p
= mdr
->remote_auth_pins
.begin();
393 p
!= mdr
->remote_auth_pins
.end();
395 if (mustpin
.count(p
->first
)) {
396 assert(p
->second
== p
->first
->authority().first
);
397 map
<mds_rank_t
, set
<MDSCacheObject
*> >::iterator q
= mustpin_remote
.find(p
->second
);
398 if (q
!= mustpin_remote
.end())
399 q
->second
.insert(p
->first
);
402 for (map
<mds_rank_t
, set
<MDSCacheObject
*> >::iterator p
= mustpin_remote
.begin();
403 p
!= mustpin_remote
.end();
405 dout(10) << "requesting remote auth_pins from mds." << p
->first
<< dendl
;
407 // wait for active auth
408 if (mds
->is_cluster_degraded() &&
409 !mds
->mdsmap
->is_clientreplay_or_active_or_stopping(p
->first
)) {
410 dout(10) << " mds." << p
->first
<< " is not active" << dendl
;
411 if (mdr
->more()->waiting_on_slave
.empty())
412 mds
->wait_for_active_peer(p
->first
, new C_MDS_RetryRequest(mdcache
, mdr
));
416 MMDSSlaveRequest
*req
= new MMDSSlaveRequest(mdr
->reqid
, mdr
->attempt
,
417 MMDSSlaveRequest::OP_AUTHPIN
);
418 for (set
<MDSCacheObject
*>::iterator q
= p
->second
.begin();
419 q
!= p
->second
.end();
421 dout(10) << " req remote auth_pin of " << **q
<< dendl
;
422 MDSCacheObjectInfo info
;
423 (*q
)->set_object_info(info
);
424 req
->get_authpins().push_back(info
);
425 if (*q
== auth_pin_freeze
)
426 (*q
)->set_object_info(req
->get_authpin_freeze());
429 if (auth_pin_nonblock
)
430 req
->mark_nonblock();
431 mds
->send_message_mds(req
, p
->first
);
433 // put in waiting list
434 assert(mdr
->more()->waiting_on_slave
.count(p
->first
) == 0);
435 mdr
->more()->waiting_on_slave
.insert(p
->first
);
440 // caps i'll need to issue
441 set
<CInode
*> issue_set
;
445 // make sure they match currently acquired locks.
446 set
<SimpleLock
*, SimpleLock::ptr_lt
>::iterator existing
= mdr
->locks
.begin();
447 for (set
<SimpleLock
*, SimpleLock::ptr_lt
>::iterator p
= sorted
.begin();
450 bool need_wrlock
= !!wrlocks
.count(*p
);
451 bool need_remote_wrlock
= !!(remote_wrlocks
&& remote_wrlocks
->count(*p
));
454 if (existing
!= mdr
->locks
.end() && *existing
== *p
) {
456 SimpleLock
*have
= *existing
;
458 if (xlocks
.count(have
) && mdr
->xlocks
.count(have
)) {
459 dout(10) << " already xlocked " << *have
<< " " << *have
->get_parent() << dendl
;
462 if (mdr
->remote_wrlocks
.count(have
)) {
463 if (!need_remote_wrlock
||
464 mdr
->remote_wrlocks
[have
] != (*remote_wrlocks
)[have
]) {
465 dout(10) << " unlocking remote_wrlock on wrong mds." << mdr
->remote_wrlocks
[have
]
466 << " " << *have
<< " " << *have
->get_parent() << dendl
;
467 remote_wrlock_finish(have
, mdr
->remote_wrlocks
[have
], mdr
.get());
470 if (need_wrlock
|| need_remote_wrlock
) {
471 if (need_wrlock
== !!mdr
->wrlocks
.count(have
) &&
472 need_remote_wrlock
== !!mdr
->remote_wrlocks
.count(have
)) {
474 dout(10) << " already wrlocked " << *have
<< " " << *have
->get_parent() << dendl
;
475 if (need_remote_wrlock
)
476 dout(10) << " already remote_wrlocked " << *have
<< " " << *have
->get_parent() << dendl
;
480 if (rdlocks
.count(have
) && mdr
->rdlocks
.count(have
)) {
481 dout(10) << " already rdlocked " << *have
<< " " << *have
->get_parent() << dendl
;
486 // hose any stray locks
487 if (existing
!= mdr
->locks
.end() && *existing
== *p
) {
488 assert(need_wrlock
|| need_remote_wrlock
);
489 SimpleLock
*lock
= *existing
;
490 if (mdr
->wrlocks
.count(lock
)) {
492 dout(10) << " unlocking extra " << *lock
<< " " << *lock
->get_parent() << dendl
;
493 else if (need_remote_wrlock
) // acquire remote_wrlock first
494 dout(10) << " unlocking out-of-order " << *lock
<< " " << *lock
->get_parent() << dendl
;
495 bool need_issue
= false;
496 wrlock_finish(lock
, mdr
.get(), &need_issue
);
498 issue_set
.insert(static_cast<CInode
*>(lock
->get_parent()));
502 while (existing
!= mdr
->locks
.end()) {
503 SimpleLock
*stray
= *existing
;
505 dout(10) << " unlocking out-of-order " << *stray
<< " " << *stray
->get_parent() << dendl
;
506 bool need_issue
= false;
507 if (mdr
->xlocks
.count(stray
)) {
508 xlock_finish(stray
, mdr
.get(), &need_issue
);
509 } else if (mdr
->rdlocks
.count(stray
)) {
510 rdlock_finish(stray
, mdr
.get(), &need_issue
);
512 // may have acquired both wrlock and remore wrlock
513 if (mdr
->wrlocks
.count(stray
))
514 wrlock_finish(stray
, mdr
.get(), &need_issue
);
515 if (mdr
->remote_wrlocks
.count(stray
))
516 remote_wrlock_finish(stray
, mdr
->remote_wrlocks
[stray
], mdr
.get());
519 issue_set
.insert(static_cast<CInode
*>(stray
->get_parent()));
523 if (mdr
->locking
&& *p
!= mdr
->locking
) {
524 cancel_locking(mdr
.get(), &issue_set
);
526 if (xlocks
.count(*p
)) {
527 marker
.message
= "failed to xlock, waiting";
528 if (!xlock_start(*p
, mdr
))
530 dout(10) << " got xlock on " << **p
<< " " << *(*p
)->get_parent() << dendl
;
531 } else if (need_wrlock
|| need_remote_wrlock
) {
532 if (need_remote_wrlock
&& !mdr
->remote_wrlocks
.count(*p
)) {
533 marker
.message
= "waiting for remote wrlocks";
534 remote_wrlock_start(*p
, (*remote_wrlocks
)[*p
], mdr
);
537 if (need_wrlock
&& !mdr
->wrlocks
.count(*p
)) {
538 marker
.message
= "failed to wrlock, waiting";
539 if (need_remote_wrlock
&& !(*p
)->can_wrlock(mdr
->get_client())) {
540 marker
.message
= "failed to wrlock, dropping remote wrlock and waiting";
541 // can't take the wrlock because the scatter lock is gathering. need to
542 // release the remote wrlock, so that the gathering process can finish.
543 remote_wrlock_finish(*p
, mdr
->remote_wrlocks
[*p
], mdr
.get());
544 remote_wrlock_start(*p
, (*remote_wrlocks
)[*p
], mdr
);
547 // nowait if we have already gotten remote wrlock
548 if (!wrlock_start(*p
, mdr
, need_remote_wrlock
))
550 dout(10) << " got wrlock on " << **p
<< " " << *(*p
)->get_parent() << dendl
;
553 assert(mdr
->is_master());
554 if ((*p
)->is_scatterlock()) {
555 ScatterLock
*slock
= static_cast<ScatterLock
*>(*p
);
556 if (slock
->is_rejoin_mix()) {
557 // If there is a recovering mds who replcated an object when it failed
558 // and scatterlock in the object was in MIX state, It's possible that
559 // the recovering mds needs to take wrlock on the scatterlock when it
560 // replays unsafe requests. So this mds should delay taking rdlock on
561 // the scatterlock until the recovering mds finishes replaying unsafe.
562 // Otherwise unsafe requests may get replayed after current request.
565 // The recovering mds is auth mds of a dirfrag, this mds is auth mds
566 // of correspinding inode. when 'rm -rf' the direcotry, this mds should
567 // delay the rmdir request until the recovering mds has replayed unlink
569 if (mds
->is_cluster_degraded()) {
570 if (!mdr
->is_replay()) {
571 drop_locks(mdr
.get());
572 mds
->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache
, mdr
));
573 dout(10) << " rejoin mix scatterlock " << *slock
<< " " << *(*p
)->get_parent()
574 << ", waiting for cluster recovered" << dendl
;
575 marker
.message
= "rejoin mix scatterlock, waiting for cluster recovered";
579 slock
->clear_rejoin_mix();
584 marker
.message
= "failed to rdlock, waiting";
585 if (!rdlock_start(*p
, mdr
))
587 dout(10) << " got rdlock on " << **p
<< " " << *(*p
)->get_parent() << dendl
;
591 // any extra unneeded locks?
592 while (existing
!= mdr
->locks
.end()) {
593 SimpleLock
*stray
= *existing
;
595 dout(10) << " unlocking extra " << *stray
<< " " << *stray
->get_parent() << dendl
;
596 bool need_issue
= false;
597 if (mdr
->xlocks
.count(stray
)) {
598 xlock_finish(stray
, mdr
.get(), &need_issue
);
599 } else if (mdr
->rdlocks
.count(stray
)) {
600 rdlock_finish(stray
, mdr
.get(), &need_issue
);
602 // may have acquired both wrlock and remore wrlock
603 if (mdr
->wrlocks
.count(stray
))
604 wrlock_finish(stray
, mdr
.get(), &need_issue
);
605 if (mdr
->remote_wrlocks
.count(stray
))
606 remote_wrlock_finish(stray
, mdr
->remote_wrlocks
[stray
], mdr
.get());
609 issue_set
.insert(static_cast<CInode
*>(stray
->get_parent()));
612 mdr
->done_locking
= true;
613 mdr
->set_mds_stamp(ceph_clock_now());
615 marker
.message
= "acquired locks";
618 issue_caps_set(issue_set
);
622 void Locker::notify_freeze_waiter(MDSCacheObject
*o
)
625 if (CInode
*in
= dynamic_cast<CInode
*>(o
)) {
627 dir
= in
->get_parent_dir();
628 } else if (CDentry
*dn
= dynamic_cast<CDentry
*>(o
)) {
631 dir
= dynamic_cast<CDir
*>(o
);
635 if (dir
->is_freezing_dir())
636 mdcache
->fragment_freeze_inc_num_waiters(dir
);
637 if (dir
->is_freezing_tree()) {
638 while (!dir
->is_freezing_tree_root())
639 dir
= dir
->get_parent_dir();
640 mdcache
->migrator
->export_freeze_inc_num_waiters(dir
);
645 void Locker::set_xlocks_done(MutationImpl
*mut
, bool skip_dentry
)
647 for (set
<SimpleLock
*>::iterator p
= mut
->xlocks
.begin();
648 p
!= mut
->xlocks
.end();
650 MDSCacheObject
*object
= (*p
)->get_parent();
651 assert(object
->is_auth());
653 ((*p
)->get_type() == CEPH_LOCK_DN
|| (*p
)->get_type() == CEPH_LOCK_DVERSION
))
655 dout(10) << "set_xlocks_done on " << **p
<< " " << *object
<< dendl
;
656 (*p
)->set_xlock_done();
660 void Locker::_drop_rdlocks(MutationImpl
*mut
, set
<CInode
*> *pneed_issue
)
662 while (!mut
->rdlocks
.empty()) {
664 MDSCacheObject
*p
= (*mut
->rdlocks
.begin())->get_parent();
665 rdlock_finish(*mut
->rdlocks
.begin(), mut
, &ni
);
667 pneed_issue
->insert(static_cast<CInode
*>(p
));
671 void Locker::_drop_non_rdlocks(MutationImpl
*mut
, set
<CInode
*> *pneed_issue
)
673 set
<mds_rank_t
> slaves
;
675 while (!mut
->xlocks
.empty()) {
676 SimpleLock
*lock
= *mut
->xlocks
.begin();
677 MDSCacheObject
*p
= lock
->get_parent();
679 assert(lock
->get_sm()->can_remote_xlock
);
680 slaves
.insert(p
->authority().first
);
682 mut
->locks
.erase(lock
);
683 mut
->xlocks
.erase(lock
);
687 xlock_finish(lock
, mut
, &ni
);
689 pneed_issue
->insert(static_cast<CInode
*>(p
));
692 while (!mut
->remote_wrlocks
.empty()) {
693 map
<SimpleLock
*,mds_rank_t
>::iterator p
= mut
->remote_wrlocks
.begin();
694 slaves
.insert(p
->second
);
695 if (mut
->wrlocks
.count(p
->first
) == 0)
696 mut
->locks
.erase(p
->first
);
697 mut
->remote_wrlocks
.erase(p
);
700 while (!mut
->wrlocks
.empty()) {
702 MDSCacheObject
*p
= (*mut
->wrlocks
.begin())->get_parent();
703 wrlock_finish(*mut
->wrlocks
.begin(), mut
, &ni
);
705 pneed_issue
->insert(static_cast<CInode
*>(p
));
708 for (set
<mds_rank_t
>::iterator p
= slaves
.begin(); p
!= slaves
.end(); ++p
) {
709 if (!mds
->is_cluster_degraded() ||
710 mds
->mdsmap
->get_state(*p
) >= MDSMap::STATE_REJOIN
) {
711 dout(10) << "_drop_non_rdlocks dropping remote locks on mds." << *p
<< dendl
;
712 MMDSSlaveRequest
*slavereq
= new MMDSSlaveRequest(mut
->reqid
, mut
->attempt
,
713 MMDSSlaveRequest::OP_DROPLOCKS
);
714 mds
->send_message_mds(slavereq
, *p
);
719 void Locker::cancel_locking(MutationImpl
*mut
, set
<CInode
*> *pneed_issue
)
721 SimpleLock
*lock
= mut
->locking
;
723 dout(10) << "cancel_locking " << *lock
<< " on " << *mut
<< dendl
;
725 if (lock
->get_parent()->is_auth()) {
726 bool need_issue
= false;
727 if (lock
->get_state() == LOCK_PREXLOCK
) {
728 _finish_xlock(lock
, -1, &need_issue
);
729 } else if (lock
->get_state() == LOCK_LOCK_XLOCK
&&
730 lock
->get_num_xlocks() == 0) {
731 lock
->set_state(LOCK_XLOCKDONE
);
732 eval_gather(lock
, true, &need_issue
);
735 pneed_issue
->insert(static_cast<CInode
*>(lock
->get_parent()));
737 mut
->finish_locking(lock
);
740 void Locker::drop_locks(MutationImpl
*mut
, set
<CInode
*> *pneed_issue
)
743 set
<CInode
*> my_need_issue
;
745 pneed_issue
= &my_need_issue
;
748 cancel_locking(mut
, pneed_issue
);
749 _drop_non_rdlocks(mut
, pneed_issue
);
750 _drop_rdlocks(mut
, pneed_issue
);
752 if (pneed_issue
== &my_need_issue
)
753 issue_caps_set(*pneed_issue
);
754 mut
->done_locking
= false;
757 void Locker::drop_non_rdlocks(MutationImpl
*mut
, set
<CInode
*> *pneed_issue
)
759 set
<CInode
*> my_need_issue
;
761 pneed_issue
= &my_need_issue
;
763 _drop_non_rdlocks(mut
, pneed_issue
);
765 if (pneed_issue
== &my_need_issue
)
766 issue_caps_set(*pneed_issue
);
769 void Locker::drop_rdlocks(MutationImpl
*mut
, set
<CInode
*> *pneed_issue
)
771 set
<CInode
*> my_need_issue
;
773 pneed_issue
= &my_need_issue
;
775 _drop_rdlocks(mut
, pneed_issue
);
777 if (pneed_issue
== &my_need_issue
)
778 issue_caps_set(*pneed_issue
);
784 void Locker::eval_gather(SimpleLock
*lock
, bool first
, bool *pneed_issue
, list
<MDSInternalContextBase
*> *pfinishers
)
786 dout(10) << "eval_gather " << *lock
<< " on " << *lock
->get_parent() << dendl
;
787 assert(!lock
->is_stable());
789 int next
= lock
->get_next_state();
792 bool caps
= lock
->get_cap_shift();
793 if (lock
->get_type() != CEPH_LOCK_DN
)
794 in
= static_cast<CInode
*>(lock
->get_parent());
796 bool need_issue
= false;
798 int loner_issued
= 0, other_issued
= 0, xlocker_issued
= 0;
799 assert(!caps
|| in
!= NULL
);
800 if (caps
&& in
->is_head()) {
801 in
->get_caps_issued(&loner_issued
, &other_issued
, &xlocker_issued
,
802 lock
->get_cap_shift(), lock
->get_cap_mask());
803 dout(10) << " next state is " << lock
->get_state_name(next
)
804 << " issued/allows loner " << gcap_string(loner_issued
)
805 << "/" << gcap_string(lock
->gcaps_allowed(CAP_LONER
, next
))
806 << " xlocker " << gcap_string(xlocker_issued
)
807 << "/" << gcap_string(lock
->gcaps_allowed(CAP_XLOCKER
, next
))
808 << " other " << gcap_string(other_issued
)
809 << "/" << gcap_string(lock
->gcaps_allowed(CAP_ANY
, next
))
812 if (first
&& ((~lock
->gcaps_allowed(CAP_ANY
, next
) & other_issued
) ||
813 (~lock
->gcaps_allowed(CAP_LONER
, next
) & loner_issued
) ||
814 (~lock
->gcaps_allowed(CAP_XLOCKER
, next
) & xlocker_issued
)))
818 #define IS_TRUE_AND_LT_AUTH(x, auth) (x && ((auth && x <= AUTH) || (!auth && x < AUTH)))
819 bool auth
= lock
->get_parent()->is_auth();
820 if (!lock
->is_gathering() &&
821 (IS_TRUE_AND_LT_AUTH(lock
->get_sm()->states
[next
].can_rdlock
, auth
) || !lock
->is_rdlocked()) &&
822 (IS_TRUE_AND_LT_AUTH(lock
->get_sm()->states
[next
].can_wrlock
, auth
) || !lock
->is_wrlocked()) &&
823 (IS_TRUE_AND_LT_AUTH(lock
->get_sm()->states
[next
].can_xlock
, auth
) || !lock
->is_xlocked()) &&
824 (IS_TRUE_AND_LT_AUTH(lock
->get_sm()->states
[next
].can_lease
, auth
) || !lock
->is_leased()) &&
825 !(lock
->get_parent()->is_auth() && lock
->is_flushing()) && // i.e. wait for scatter_writebehind!
826 (!caps
|| ((~lock
->gcaps_allowed(CAP_ANY
, next
) & other_issued
) == 0 &&
827 (~lock
->gcaps_allowed(CAP_LONER
, next
) & loner_issued
) == 0 &&
828 (~lock
->gcaps_allowed(CAP_XLOCKER
, next
) & xlocker_issued
) == 0)) &&
829 lock
->get_state() != LOCK_SYNC_MIX2
&& // these states need an explicit trigger from the auth mds
830 lock
->get_state() != LOCK_MIX_SYNC2
832 dout(7) << "eval_gather finished gather on " << *lock
833 << " on " << *lock
->get_parent() << dendl
;
835 if (lock
->get_sm() == &sm_filelock
) {
837 if (in
->state_test(CInode::STATE_RECOVERING
)) {
838 dout(7) << "eval_gather finished gather, but still recovering" << dendl
;
840 } else if (in
->state_test(CInode::STATE_NEEDSRECOVER
)) {
841 dout(7) << "eval_gather finished gather, but need to recover" << dendl
;
842 mds
->mdcache
->queue_file_recover(in
);
843 mds
->mdcache
->do_file_recover();
848 if (!lock
->get_parent()->is_auth()) {
849 // replica: tell auth
850 mds_rank_t auth
= lock
->get_parent()->authority().first
;
852 if (lock
->get_parent()->is_rejoining() &&
853 mds
->mdsmap
->get_state(auth
) == MDSMap::STATE_REJOIN
) {
854 dout(7) << "eval_gather finished gather, but still rejoining "
855 << *lock
->get_parent() << dendl
;
859 if (!mds
->is_cluster_degraded() ||
860 mds
->mdsmap
->get_state(auth
) >= MDSMap::STATE_REJOIN
) {
861 switch (lock
->get_state()) {
863 mds
->send_message_mds(new MLock(lock
, LOCK_AC_LOCKACK
, mds
->get_nodeid()),
869 MLock
*reply
= new MLock(lock
, LOCK_AC_SYNCACK
, mds
->get_nodeid());
870 lock
->encode_locked_state(reply
->get_data());
871 mds
->send_message_mds(reply
, auth
);
872 next
= LOCK_MIX_SYNC2
;
873 (static_cast<ScatterLock
*>(lock
))->start_flush();
878 (static_cast<ScatterLock
*>(lock
))->finish_flush();
879 (static_cast<ScatterLock
*>(lock
))->clear_flushed();
882 // do nothing, we already acked
887 MLock
*reply
= new MLock(lock
, LOCK_AC_MIXACK
, mds
->get_nodeid());
888 mds
->send_message_mds(reply
, auth
);
889 next
= LOCK_SYNC_MIX2
;
896 lock
->encode_locked_state(data
);
897 mds
->send_message_mds(new MLock(lock
, LOCK_AC_LOCKACK
, mds
->get_nodeid(), data
), auth
);
898 (static_cast<ScatterLock
*>(lock
))->start_flush();
899 // we'll get an AC_LOCKFLUSHED to complete
910 // once the first (local) stage of mix->lock gather complete we can
911 // gather from replicas
912 if (lock
->get_state() == LOCK_MIX_LOCK
&&
913 lock
->get_parent()->is_replicated()) {
914 dout(10) << " finished (local) gather for mix->lock, now gathering from replicas" << dendl
;
915 send_lock_message(lock
, LOCK_AC_LOCK
);
917 lock
->set_state(LOCK_MIX_LOCK2
);
921 if (lock
->is_dirty() && !lock
->is_flushed()) {
922 scatter_writebehind(static_cast<ScatterLock
*>(lock
));
926 lock
->clear_flushed();
928 switch (lock
->get_state()) {
933 in
->start_scatter(static_cast<ScatterLock
*>(lock
));
934 if (lock
->get_parent()->is_replicated()) {
936 lock
->encode_locked_state(softdata
);
937 send_lock_message(lock
, LOCK_AC_MIX
, softdata
);
939 (static_cast<ScatterLock
*>(lock
))->clear_scatter_wanted();
944 if (next
!= LOCK_SYNC
)
953 if (lock
->get_parent()->is_replicated()) {
955 lock
->encode_locked_state(softdata
);
956 send_lock_message(lock
, LOCK_AC_SYNC
, softdata
);
963 lock
->set_state(next
);
965 if (lock
->get_parent()->is_auth() &&
967 lock
->get_parent()->auth_unpin(lock
);
969 // drop loner before doing waiters
973 in
->get_wanted_loner() != in
->get_loner()) {
974 dout(10) << " trying to drop loner" << dendl
;
975 if (in
->try_drop_loner()) {
976 dout(10) << " dropped loner" << dendl
;
982 lock
->take_waiting(SimpleLock::WAIT_STABLE
|SimpleLock::WAIT_WR
|SimpleLock::WAIT_RD
|SimpleLock::WAIT_XLOCK
,
985 lock
->finish_waiters(SimpleLock::WAIT_STABLE
|SimpleLock::WAIT_WR
|SimpleLock::WAIT_RD
|SimpleLock::WAIT_XLOCK
);
987 if (caps
&& in
->is_head())
990 if (lock
->get_parent()->is_auth() &&
992 try_eval(lock
, &need_issue
);
998 else if (in
->is_head())
1004 bool Locker::eval(CInode
*in
, int mask
, bool caps_imported
)
1006 bool need_issue
= caps_imported
;
1007 list
<MDSInternalContextBase
*> finishers
;
1009 dout(10) << "eval " << mask
<< " " << *in
<< dendl
;
1012 if (in
->is_auth() && in
->is_head()) {
1013 if (in
->choose_ideal_loner() >= 0) {
1014 if (in
->try_set_loner()) {
1015 dout(10) << "eval set loner to client." << in
->get_loner() << dendl
;
1019 dout(10) << "eval want loner client." << in
->get_wanted_loner() << " but failed to set it" << dendl
;
1021 dout(10) << "eval doesn't want loner" << dendl
;
1025 if (mask
& CEPH_LOCK_IFILE
)
1026 eval_any(&in
->filelock
, &need_issue
, &finishers
, caps_imported
);
1027 if (mask
& CEPH_LOCK_IAUTH
)
1028 eval_any(&in
->authlock
, &need_issue
, &finishers
, caps_imported
);
1029 if (mask
& CEPH_LOCK_ILINK
)
1030 eval_any(&in
->linklock
, &need_issue
, &finishers
, caps_imported
);
1031 if (mask
& CEPH_LOCK_IXATTR
)
1032 eval_any(&in
->xattrlock
, &need_issue
, &finishers
, caps_imported
);
1033 if (mask
& CEPH_LOCK_INEST
)
1034 eval_any(&in
->nestlock
, &need_issue
, &finishers
, caps_imported
);
1035 if (mask
& CEPH_LOCK_IFLOCK
)
1036 eval_any(&in
->flocklock
, &need_issue
, &finishers
, caps_imported
);
1037 if (mask
& CEPH_LOCK_IPOLICY
)
1038 eval_any(&in
->policylock
, &need_issue
, &finishers
, caps_imported
);
1041 if (in
->is_auth() && in
->is_head() && in
->get_wanted_loner() != in
->get_loner()) {
1042 dout(10) << " trying to drop loner" << dendl
;
1043 if (in
->try_drop_loner()) {
1044 dout(10) << " dropped loner" << dendl
;
1047 if (in
->get_wanted_loner() >= 0) {
1048 if (in
->try_set_loner()) {
1049 dout(10) << "eval end set loner to client." << in
->get_loner() << dendl
;
1053 dout(10) << "eval want loner client." << in
->get_wanted_loner() << " but failed to set it" << dendl
;
1059 finish_contexts(g_ceph_context
, finishers
);
1061 if (need_issue
&& in
->is_head())
1064 dout(10) << "eval done" << dendl
;
1068 class C_Locker_Eval
: public LockerContext
{
1072 C_Locker_Eval(Locker
*l
, MDSCacheObject
*pp
, int m
) : LockerContext(l
), p(pp
), mask(m
) {
1073 // We are used as an MDSCacheObject waiter, so should
1074 // only be invoked by someone already holding the big lock.
1075 assert(locker
->mds
->mds_lock
.is_locked_by_me());
1076 p
->get(MDSCacheObject::PIN_PTRWAITER
);
1078 void finish(int r
) override
{
1079 locker
->try_eval(p
, mask
);
1080 p
->put(MDSCacheObject::PIN_PTRWAITER
);
1084 void Locker::try_eval(MDSCacheObject
*p
, int mask
)
1086 // unstable and ambiguous auth?
1087 if (p
->is_ambiguous_auth()) {
1088 dout(7) << "try_eval ambiguous auth, waiting on " << *p
<< dendl
;
1089 p
->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
, new C_Locker_Eval(this, p
, mask
));
1093 if (p
->is_auth() && p
->is_frozen()) {
1094 dout(7) << "try_eval frozen, waiting on " << *p
<< dendl
;
1095 p
->add_waiter(MDSCacheObject::WAIT_UNFREEZE
, new C_Locker_Eval(this, p
, mask
));
1099 if (mask
& CEPH_LOCK_DN
) {
1100 assert(mask
== CEPH_LOCK_DN
);
1101 bool need_issue
= false; // ignore this, no caps on dentries
1102 CDentry
*dn
= static_cast<CDentry
*>(p
);
1103 eval_any(&dn
->lock
, &need_issue
);
1105 CInode
*in
= static_cast<CInode
*>(p
);
1110 void Locker::try_eval(SimpleLock
*lock
, bool *pneed_issue
)
1112 MDSCacheObject
*p
= lock
->get_parent();
1114 // unstable and ambiguous auth?
1115 if (p
->is_ambiguous_auth()) {
1116 dout(7) << "try_eval " << *lock
<< " ambiguousauth, waiting on " << *p
<< dendl
;
1117 p
->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
, new C_Locker_Eval(this, p
, lock
->get_type()));
1121 if (!p
->is_auth()) {
1122 dout(7) << "try_eval " << *lock
<< " not auth for " << *p
<< dendl
;
1126 if (p
->is_frozen()) {
1127 dout(7) << "try_eval " << *lock
<< " frozen, waiting on " << *p
<< dendl
;
1128 p
->add_waiter(MDSCacheObject::WAIT_UNFREEZE
, new C_Locker_Eval(this, p
, lock
->get_type()));
1133 * We could have a situation like:
1135 * - mds A authpins item on mds B
1136 * - mds B starts to freeze tree containing item
1137 * - mds A tries wrlock_start on A, sends REQSCATTER to B
1138 * - mds B lock is unstable, sets scatter_wanted
1139 * - mds B lock stabilizes, calls try_eval.
1141 * We can defer while freezing without causing a deadlock. Honor
1142 * scatter_wanted flag here. This will never get deferred by the
1143 * checks above due to the auth_pin held by the master.
1145 if (lock
->is_scatterlock()) {
1146 ScatterLock
*slock
= static_cast<ScatterLock
*>(lock
);
1147 if (slock
->get_scatter_wanted() &&
1148 slock
->get_state() != LOCK_MIX
) {
1149 scatter_mix(slock
, pneed_issue
);
1150 if (!lock
->is_stable())
1152 } else if (slock
->get_unscatter_wanted() &&
1153 slock
->get_state() != LOCK_LOCK
) {
1154 simple_lock(slock
, pneed_issue
);
1155 if (!lock
->is_stable()) {
1161 if (lock
->get_type() != CEPH_LOCK_DN
&& p
->is_freezing()) {
1162 dout(7) << "try_eval " << *lock
<< " freezing, waiting on " << *p
<< dendl
;
1163 p
->add_waiter(MDSCacheObject::WAIT_UNFREEZE
, new C_Locker_Eval(this, p
, lock
->get_type()));
1167 eval(lock
, pneed_issue
);
1170 void Locker::eval_cap_gather(CInode
*in
, set
<CInode
*> *issue_set
)
1172 bool need_issue
= false;
1173 list
<MDSInternalContextBase
*> finishers
;
1176 if (!in
->filelock
.is_stable())
1177 eval_gather(&in
->filelock
, false, &need_issue
, &finishers
);
1178 if (!in
->authlock
.is_stable())
1179 eval_gather(&in
->authlock
, false, &need_issue
, &finishers
);
1180 if (!in
->linklock
.is_stable())
1181 eval_gather(&in
->linklock
, false, &need_issue
, &finishers
);
1182 if (!in
->xattrlock
.is_stable())
1183 eval_gather(&in
->xattrlock
, false, &need_issue
, &finishers
);
1185 if (need_issue
&& in
->is_head()) {
1187 issue_set
->insert(in
);
1192 finish_contexts(g_ceph_context
, finishers
);
1195 void Locker::eval_scatter_gathers(CInode
*in
)
1197 bool need_issue
= false;
1198 list
<MDSInternalContextBase
*> finishers
;
1200 dout(10) << "eval_scatter_gathers " << *in
<< dendl
;
1203 if (!in
->filelock
.is_stable())
1204 eval_gather(&in
->filelock
, false, &need_issue
, &finishers
);
1205 if (!in
->nestlock
.is_stable())
1206 eval_gather(&in
->nestlock
, false, &need_issue
, &finishers
);
1207 if (!in
->dirfragtreelock
.is_stable())
1208 eval_gather(&in
->dirfragtreelock
, false, &need_issue
, &finishers
);
1210 if (need_issue
&& in
->is_head())
1213 finish_contexts(g_ceph_context
, finishers
);
1216 void Locker::eval(SimpleLock
*lock
, bool *need_issue
)
1218 switch (lock
->get_type()) {
1219 case CEPH_LOCK_IFILE
:
1220 return file_eval(static_cast<ScatterLock
*>(lock
), need_issue
);
1221 case CEPH_LOCK_IDFT
:
1222 case CEPH_LOCK_INEST
:
1223 return scatter_eval(static_cast<ScatterLock
*>(lock
), need_issue
);
1225 return simple_eval(lock
, need_issue
);
1230 // ------------------
1233 bool Locker::_rdlock_kick(SimpleLock
*lock
, bool as_anon
)
1236 if (lock
->is_stable()) {
1237 if (lock
->get_parent()->is_auth()) {
1238 if (lock
->get_sm() == &sm_scatterlock
) {
1239 // not until tempsync is fully implemented
1240 //if (lock->get_parent()->is_replicated())
1241 //scatter_tempsync((ScatterLock*)lock);
1244 } else if (lock
->get_sm() == &sm_filelock
) {
1245 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
1246 if (lock
->get_state() == LOCK_EXCL
&&
1247 in
->get_target_loner() >= 0 &&
1248 !in
->is_dir() && !as_anon
) // as_anon => caller wants SYNC, not XSYN
1256 // request rdlock state change from auth
1257 mds_rank_t auth
= lock
->get_parent()->authority().first
;
1258 if (!mds
->is_cluster_degraded() ||
1259 mds
->mdsmap
->is_clientreplay_or_active_or_stopping(auth
)) {
1260 dout(10) << "requesting rdlock from auth on "
1261 << *lock
<< " on " << *lock
->get_parent() << dendl
;
1262 mds
->send_message_mds(new MLock(lock
, LOCK_AC_REQRDLOCK
, mds
->get_nodeid()), auth
);
1267 if (lock
->get_type() == CEPH_LOCK_IFILE
) {
1268 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
1269 if (in
->state_test(CInode::STATE_RECOVERING
)) {
1270 mds
->mdcache
->recovery_queue
.prioritize(in
);
1277 bool Locker::rdlock_try(SimpleLock
*lock
, client_t client
, MDSInternalContextBase
*con
)
1279 dout(7) << "rdlock_try on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1281 // can read? grab ref.
1282 if (lock
->can_rdlock(client
))
1285 _rdlock_kick(lock
, false);
1287 if (lock
->can_rdlock(client
))
1292 dout(7) << "rdlock_try waiting on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1293 lock
->add_waiter(SimpleLock::WAIT_STABLE
|SimpleLock::WAIT_RD
, con
);
1298 bool Locker::rdlock_start(SimpleLock
*lock
, MDRequestRef
& mut
, bool as_anon
)
1300 dout(7) << "rdlock_start on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1302 // client may be allowed to rdlock the same item it has xlocked.
1303 // UNLESS someone passes in as_anon, or we're reading snapped version here.
1304 if (mut
->snapid
!= CEPH_NOSNAP
)
1306 client_t client
= as_anon
? -1 : mut
->get_client();
1309 if (lock
->get_type() != CEPH_LOCK_DN
)
1310 in
= static_cast<CInode
*>(lock
->get_parent());
1313 if (!lock->get_parent()->is_auth() &&
1314 lock->fw_rdlock_to_auth()) {
1315 mdcache->request_forward(mut, lock->get_parent()->authority().first);
1321 // can read? grab ref.
1322 if (lock
->can_rdlock(client
)) {
1324 mut
->rdlocks
.insert(lock
);
1325 mut
->locks
.insert(lock
);
1329 // hmm, wait a second.
1330 if (in
&& !in
->is_head() && in
->is_auth() &&
1331 lock
->get_state() == LOCK_SNAP_SYNC
) {
1332 // okay, we actually need to kick the head's lock to get ourselves synced up.
1333 CInode
*head
= mdcache
->get_inode(in
->ino());
1335 SimpleLock
*hlock
= head
->get_lock(CEPH_LOCK_IFILE
);
1336 if (hlock
->get_state() == LOCK_SYNC
)
1337 hlock
= head
->get_lock(lock
->get_type());
1339 if (hlock
->get_state() != LOCK_SYNC
) {
1340 dout(10) << "rdlock_start trying head inode " << *head
<< dendl
;
1341 if (!rdlock_start(hlock
, mut
, true)) // ** as_anon, no rdlock on EXCL **
1343 // oh, check our lock again then
1347 if (!_rdlock_kick(lock
, as_anon
))
1353 if (lock
->get_parent()->is_auth() && lock
->is_stable())
1354 wait_on
= SimpleLock::WAIT_RD
;
1356 wait_on
= SimpleLock::WAIT_STABLE
; // REQRDLOCK is ignored if lock is unstable, so we need to retry.
1357 dout(7) << "rdlock_start waiting on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1358 lock
->add_waiter(wait_on
, new C_MDS_RetryRequest(mdcache
, mut
));
1363 void Locker::nudge_log(SimpleLock
*lock
)
1365 dout(10) << "nudge_log " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1366 if (lock
->get_parent()->is_auth() && lock
->is_unstable_and_locked()) // as with xlockdone, or cap flush
1367 mds
->mdlog
->flush();
1370 void Locker::rdlock_finish(SimpleLock
*lock
, MutationImpl
*mut
, bool *pneed_issue
)
1375 mut
->rdlocks
.erase(lock
);
1376 mut
->locks
.erase(lock
);
1379 dout(7) << "rdlock_finish on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1382 if (!lock
->is_rdlocked()) {
1383 if (!lock
->is_stable())
1384 eval_gather(lock
, false, pneed_issue
);
1385 else if (lock
->get_parent()->is_auth())
1386 try_eval(lock
, pneed_issue
);
1391 bool Locker::can_rdlock_set(set
<SimpleLock
*>& locks
)
1393 dout(10) << "can_rdlock_set " << locks
<< dendl
;
1394 for (set
<SimpleLock
*>::iterator p
= locks
.begin(); p
!= locks
.end(); ++p
)
1395 if (!(*p
)->can_rdlock(-1)) {
1396 dout(10) << "can_rdlock_set can't rdlock " << *p
<< " on " << *(*p
)->get_parent() << dendl
;
1402 bool Locker::rdlock_try_set(set
<SimpleLock
*>& locks
)
1404 dout(10) << "rdlock_try_set " << locks
<< dendl
;
1405 for (set
<SimpleLock
*>::iterator p
= locks
.begin(); p
!= locks
.end(); ++p
)
1406 if (!rdlock_try(*p
, -1, NULL
)) {
1407 dout(10) << "rdlock_try_set can't rdlock " << *p
<< " on " << *(*p
)->get_parent() << dendl
;
1413 void Locker::rdlock_take_set(set
<SimpleLock
*>& locks
, MutationRef
& mut
)
1415 dout(10) << "rdlock_take_set " << locks
<< dendl
;
1416 for (set
<SimpleLock
*>::iterator p
= locks
.begin(); p
!= locks
.end(); ++p
) {
1418 mut
->rdlocks
.insert(*p
);
1419 mut
->locks
.insert(*p
);
1423 // ------------------
1426 void Locker::wrlock_force(SimpleLock
*lock
, MutationRef
& mut
)
1428 if (lock
->get_type() == CEPH_LOCK_IVERSION
||
1429 lock
->get_type() == CEPH_LOCK_DVERSION
)
1430 return local_wrlock_grab(static_cast<LocalLock
*>(lock
), mut
);
1432 dout(7) << "wrlock_force on " << *lock
1433 << " on " << *lock
->get_parent() << dendl
;
1434 lock
->get_wrlock(true);
1435 mut
->wrlocks
.insert(lock
);
1436 mut
->locks
.insert(lock
);
1439 bool Locker::wrlock_start(SimpleLock
*lock
, MDRequestRef
& mut
, bool nowait
)
1441 if (lock
->get_type() == CEPH_LOCK_IVERSION
||
1442 lock
->get_type() == CEPH_LOCK_DVERSION
)
1443 return local_wrlock_start(static_cast<LocalLock
*>(lock
), mut
);
1445 dout(10) << "wrlock_start " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1447 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
1448 client_t client
= mut
->get_client();
1449 bool want_scatter
= !nowait
&& lock
->get_parent()->is_auth() &&
1450 (in
->has_subtree_or_exporting_dirfrag() ||
1451 static_cast<ScatterLock
*>(lock
)->get_scatter_wanted());
1455 if (lock
->can_wrlock(client
) &&
1456 (!want_scatter
|| lock
->get_state() == LOCK_MIX
)) {
1458 mut
->wrlocks
.insert(lock
);
1459 mut
->locks
.insert(lock
);
1463 if (lock
->get_type() == CEPH_LOCK_IFILE
&&
1464 in
->state_test(CInode::STATE_RECOVERING
)) {
1465 mds
->mdcache
->recovery_queue
.prioritize(in
);
1468 if (!lock
->is_stable())
1471 if (in
->is_auth()) {
1472 // don't do nested lock state change if we have dirty scatterdata and
1473 // may scatter_writebehind or start_scatter, because nowait==true implies
1474 // that the caller already has a log entry open!
1475 if (nowait
&& lock
->is_dirty())
1479 scatter_mix(static_cast<ScatterLock
*>(lock
));
1483 if (nowait
&& !lock
->can_wrlock(client
))
1488 // auth should be auth_pinned (see acquire_locks wrlock weird mustpin case).
1489 mds_rank_t auth
= lock
->get_parent()->authority().first
;
1490 if (!mds
->is_cluster_degraded() ||
1491 mds
->mdsmap
->is_clientreplay_or_active_or_stopping(auth
)) {
1492 dout(10) << "requesting scatter from auth on "
1493 << *lock
<< " on " << *lock
->get_parent() << dendl
;
1494 mds
->send_message_mds(new MLock(lock
, LOCK_AC_REQSCATTER
, mds
->get_nodeid()), auth
);
1501 dout(7) << "wrlock_start waiting on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1502 lock
->add_waiter(SimpleLock::WAIT_STABLE
, new C_MDS_RetryRequest(mdcache
, mut
));
1509 void Locker::wrlock_finish(SimpleLock
*lock
, MutationImpl
*mut
, bool *pneed_issue
)
1511 if (lock
->get_type() == CEPH_LOCK_IVERSION
||
1512 lock
->get_type() == CEPH_LOCK_DVERSION
)
1513 return local_wrlock_finish(static_cast<LocalLock
*>(lock
), mut
);
1515 dout(7) << "wrlock_finish on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1518 mut
->wrlocks
.erase(lock
);
1519 if (mut
->remote_wrlocks
.count(lock
) == 0)
1520 mut
->locks
.erase(lock
);
1523 if (!lock
->is_wrlocked()) {
1524 if (!lock
->is_stable())
1525 eval_gather(lock
, false, pneed_issue
);
1526 else if (lock
->get_parent()->is_auth())
1527 try_eval(lock
, pneed_issue
);
1534 void Locker::remote_wrlock_start(SimpleLock
*lock
, mds_rank_t target
, MDRequestRef
& mut
)
1536 dout(7) << "remote_wrlock_start mds." << target
<< " on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1538 // wait for active target
1539 if (mds
->is_cluster_degraded() &&
1540 !mds
->mdsmap
->is_clientreplay_or_active_or_stopping(target
)) {
1541 dout(7) << " mds." << target
<< " is not active" << dendl
;
1542 if (mut
->more()->waiting_on_slave
.empty())
1543 mds
->wait_for_active_peer(target
, new C_MDS_RetryRequest(mdcache
, mut
));
1547 // send lock request
1548 mut
->start_locking(lock
, target
);
1549 mut
->more()->slaves
.insert(target
);
1550 MMDSSlaveRequest
*r
= new MMDSSlaveRequest(mut
->reqid
, mut
->attempt
,
1551 MMDSSlaveRequest::OP_WRLOCK
);
1552 r
->set_lock_type(lock
->get_type());
1553 lock
->get_parent()->set_object_info(r
->get_object_info());
1554 mds
->send_message_mds(r
, target
);
1556 assert(mut
->more()->waiting_on_slave
.count(target
) == 0);
1557 mut
->more()->waiting_on_slave
.insert(target
);
1560 void Locker::remote_wrlock_finish(SimpleLock
*lock
, mds_rank_t target
,
1564 mut
->remote_wrlocks
.erase(lock
);
1565 if (mut
->wrlocks
.count(lock
) == 0)
1566 mut
->locks
.erase(lock
);
1568 dout(7) << "remote_wrlock_finish releasing remote wrlock on mds." << target
1569 << " " << *lock
->get_parent() << dendl
;
1570 if (!mds
->is_cluster_degraded() ||
1571 mds
->mdsmap
->get_state(target
) >= MDSMap::STATE_REJOIN
) {
1572 MMDSSlaveRequest
*slavereq
= new MMDSSlaveRequest(mut
->reqid
, mut
->attempt
,
1573 MMDSSlaveRequest::OP_UNWRLOCK
);
1574 slavereq
->set_lock_type(lock
->get_type());
1575 lock
->get_parent()->set_object_info(slavereq
->get_object_info());
1576 mds
->send_message_mds(slavereq
, target
);
1581 // ------------------
1584 bool Locker::xlock_start(SimpleLock
*lock
, MDRequestRef
& mut
)
1586 if (lock
->get_type() == CEPH_LOCK_IVERSION
||
1587 lock
->get_type() == CEPH_LOCK_DVERSION
)
1588 return local_xlock_start(static_cast<LocalLock
*>(lock
), mut
);
1590 dout(7) << "xlock_start on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1591 client_t client
= mut
->get_client();
1594 if (lock
->get_parent()->is_auth()) {
1597 if (lock
->can_xlock(client
)) {
1598 lock
->set_state(LOCK_XLOCK
);
1599 lock
->get_xlock(mut
, client
);
1600 mut
->xlocks
.insert(lock
);
1601 mut
->locks
.insert(lock
);
1602 mut
->finish_locking(lock
);
1606 if (lock
->get_type() == CEPH_LOCK_IFILE
) {
1607 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
1608 if (in
->state_test(CInode::STATE_RECOVERING
)) {
1609 mds
->mdcache
->recovery_queue
.prioritize(in
);
1613 if (!lock
->is_stable() && (lock
->get_state() != LOCK_XLOCKDONE
||
1614 lock
->get_xlock_by_client() != client
||
1615 lock
->is_waiter_for(SimpleLock::WAIT_STABLE
)))
1618 if (lock
->get_state() == LOCK_LOCK
|| lock
->get_state() == LOCK_XLOCKDONE
) {
1619 mut
->start_locking(lock
);
1626 lock
->add_waiter(SimpleLock::WAIT_WR
|SimpleLock::WAIT_STABLE
, new C_MDS_RetryRequest(mdcache
, mut
));
1631 assert(lock
->get_sm()->can_remote_xlock
);
1632 assert(!mut
->slave_request
);
1634 // wait for single auth
1635 if (lock
->get_parent()->is_ambiguous_auth()) {
1636 lock
->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
,
1637 new C_MDS_RetryRequest(mdcache
, mut
));
1641 // wait for active auth
1642 mds_rank_t auth
= lock
->get_parent()->authority().first
;
1643 if (mds
->is_cluster_degraded() &&
1644 !mds
->mdsmap
->is_clientreplay_or_active_or_stopping(auth
)) {
1645 dout(7) << " mds." << auth
<< " is not active" << dendl
;
1646 if (mut
->more()->waiting_on_slave
.empty())
1647 mds
->wait_for_active_peer(auth
, new C_MDS_RetryRequest(mdcache
, mut
));
1651 // send lock request
1652 mut
->more()->slaves
.insert(auth
);
1653 mut
->start_locking(lock
, auth
);
1654 MMDSSlaveRequest
*r
= new MMDSSlaveRequest(mut
->reqid
, mut
->attempt
,
1655 MMDSSlaveRequest::OP_XLOCK
);
1656 r
->set_lock_type(lock
->get_type());
1657 lock
->get_parent()->set_object_info(r
->get_object_info());
1658 mds
->send_message_mds(r
, auth
);
1660 assert(mut
->more()->waiting_on_slave
.count(auth
) == 0);
1661 mut
->more()->waiting_on_slave
.insert(auth
);
1667 void Locker::_finish_xlock(SimpleLock
*lock
, client_t xlocker
, bool *pneed_issue
)
1669 assert(!lock
->is_stable());
1670 if (lock
->get_num_rdlocks() == 0 &&
1671 lock
->get_num_wrlocks() == 0 &&
1672 lock
->get_num_client_lease() == 0 &&
1673 lock
->get_state() != LOCK_XLOCKSNAP
&&
1674 lock
->get_type() != CEPH_LOCK_DN
) {
1675 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
1676 client_t loner
= in
->get_target_loner();
1677 if (loner
>= 0 && (xlocker
< 0 || xlocker
== loner
)) {
1678 lock
->set_state(LOCK_EXCL
);
1679 lock
->get_parent()->auth_unpin(lock
);
1680 lock
->finish_waiters(SimpleLock::WAIT_STABLE
|SimpleLock::WAIT_WR
|SimpleLock::WAIT_RD
);
1681 if (lock
->get_cap_shift())
1682 *pneed_issue
= true;
1683 if (lock
->get_parent()->is_auth() &&
1685 try_eval(lock
, pneed_issue
);
1689 // the xlocker may have CEPH_CAP_GSHARED, need to revoke it if next state is LOCK_LOCK
1690 eval_gather(lock
, lock
->get_state() != LOCK_XLOCKSNAP
, pneed_issue
);
1693 void Locker::xlock_finish(SimpleLock
*lock
, MutationImpl
*mut
, bool *pneed_issue
)
1695 if (lock
->get_type() == CEPH_LOCK_IVERSION
||
1696 lock
->get_type() == CEPH_LOCK_DVERSION
)
1697 return local_xlock_finish(static_cast<LocalLock
*>(lock
), mut
);
1699 dout(10) << "xlock_finish on " << *lock
<< " " << *lock
->get_parent() << dendl
;
1701 client_t xlocker
= lock
->get_xlock_by_client();
1706 mut
->xlocks
.erase(lock
);
1707 mut
->locks
.erase(lock
);
1709 bool do_issue
= false;
1712 if (!lock
->get_parent()->is_auth()) {
1713 assert(lock
->get_sm()->can_remote_xlock
);
1716 dout(7) << "xlock_finish releasing remote xlock on " << *lock
->get_parent() << dendl
;
1717 mds_rank_t auth
= lock
->get_parent()->authority().first
;
1718 if (!mds
->is_cluster_degraded() ||
1719 mds
->mdsmap
->get_state(auth
) >= MDSMap::STATE_REJOIN
) {
1720 MMDSSlaveRequest
*slavereq
= new MMDSSlaveRequest(mut
->reqid
, mut
->attempt
,
1721 MMDSSlaveRequest::OP_UNXLOCK
);
1722 slavereq
->set_lock_type(lock
->get_type());
1723 lock
->get_parent()->set_object_info(slavereq
->get_object_info());
1724 mds
->send_message_mds(slavereq
, auth
);
1727 lock
->finish_waiters(SimpleLock::WAIT_STABLE
|
1728 SimpleLock::WAIT_WR
|
1729 SimpleLock::WAIT_RD
, 0);
1731 if (lock
->get_num_xlocks() == 0) {
1732 if (lock
->get_state() == LOCK_LOCK_XLOCK
)
1733 lock
->set_state(LOCK_XLOCKDONE
);
1734 _finish_xlock(lock
, xlocker
, &do_issue
);
1739 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
1740 if (in
->is_head()) {
1742 *pneed_issue
= true;
1749 void Locker::xlock_export(SimpleLock
*lock
, MutationImpl
*mut
)
1751 dout(10) << "xlock_export on " << *lock
<< " " << *lock
->get_parent() << dendl
;
1754 mut
->xlocks
.erase(lock
);
1755 mut
->locks
.erase(lock
);
1757 MDSCacheObject
*p
= lock
->get_parent();
1758 assert(p
->state_test(CInode::STATE_AMBIGUOUSAUTH
)); // we are exporting this (inode)
1760 if (!lock
->is_stable())
1761 lock
->get_parent()->auth_unpin(lock
);
1763 lock
->set_state(LOCK_LOCK
);
1766 void Locker::xlock_import(SimpleLock
*lock
)
1768 dout(10) << "xlock_import on " << *lock
<< " " << *lock
->get_parent() << dendl
;
1769 lock
->get_parent()->auth_pin(lock
);
1774 // file i/o -----------------------------------------
1776 version_t
Locker::issue_file_data_version(CInode
*in
)
1778 dout(7) << "issue_file_data_version on " << *in
<< dendl
;
1779 return in
->inode
.file_data_version
;
1782 class C_Locker_FileUpdate_finish
: public LockerLogContext
{
1790 C_Locker_FileUpdate_finish(Locker
*l
, CInode
*i
, MutationRef
& m
,
1791 bool sm
=false, bool ni
=false, client_t c
=-1,
1792 MClientCaps
*ac
= 0)
1793 : LockerLogContext(l
), in(i
), mut(m
), share_max(sm
), need_issue(ni
),
1794 client(c
), ack(ac
) {
1795 in
->get(CInode::PIN_PTRWAITER
);
1797 void finish(int r
) override
{
1798 locker
->file_update_finish(in
, mut
, share_max
, need_issue
, client
, ack
);
1799 in
->put(CInode::PIN_PTRWAITER
);
1803 void Locker::file_update_finish(CInode
*in
, MutationRef
& mut
, bool share_max
, bool issue_client_cap
,
1804 client_t client
, MClientCaps
*ack
)
1806 dout(10) << "file_update_finish on " << *in
<< dendl
;
1807 in
->pop_and_dirty_projected_inode(mut
->ls
);
1812 Session
*session
= mds
->get_session(client
);
1814 // "oldest flush tid" > 0 means client uses unique TID for each flush
1815 if (ack
->get_oldest_flush_tid() > 0)
1816 session
->add_completed_flush(ack
->get_client_tid());
1817 mds
->send_message_client_counted(ack
, session
);
1819 dout(10) << " no session for client." << client
<< " " << *ack
<< dendl
;
1824 set
<CInode
*> need_issue
;
1825 drop_locks(mut
.get(), &need_issue
);
1827 if (!in
->is_head() && !in
->client_snap_caps
.empty()) {
1828 dout(10) << " client_snap_caps " << in
->client_snap_caps
<< dendl
;
1829 // check for snap writeback completion
1830 bool gather
= false;
1831 compact_map
<int,set
<client_t
> >::iterator p
= in
->client_snap_caps
.begin();
1832 while (p
!= in
->client_snap_caps
.end()) {
1833 SimpleLock
*lock
= in
->get_lock(p
->first
);
1835 dout(10) << " completing client_snap_caps for " << ccap_string(p
->first
)
1836 << " lock " << *lock
<< " on " << *in
<< dendl
;
1839 p
->second
.erase(client
);
1840 if (p
->second
.empty()) {
1842 in
->client_snap_caps
.erase(p
++);
1847 if (in
->client_snap_caps
.empty())
1848 in
->item_open_file
.remove_myself();
1849 eval_cap_gather(in
, &need_issue
);
1852 if (issue_client_cap
&& need_issue
.count(in
) == 0) {
1853 Capability
*cap
= in
->get_client_cap(client
);
1854 if (cap
&& (cap
->wanted() & ~cap
->pending()))
1855 issue_caps(in
, cap
);
1858 if (share_max
&& in
->is_auth() &&
1859 (in
->filelock
.gcaps_allowed(CAP_LONER
) & (CEPH_CAP_GWR
|CEPH_CAP_GBUFFER
)))
1860 share_inode_max_size(in
);
1862 issue_caps_set(need_issue
);
1864 // auth unpin after issuing caps
1868 Capability
* Locker::issue_new_caps(CInode
*in
,
1874 dout(7) << "issue_new_caps for mode " << mode
<< " on " << *in
<< dendl
;
1877 // if replay, try to reconnect cap, and otherwise do nothing.
1879 mds
->mdcache
->try_reconnect_cap(in
, session
);
1884 assert(session
->info
.inst
.name
.is_client());
1885 client_t my_client
= session
->info
.inst
.name
.num();
1886 int my_want
= ceph_caps_for_mode(mode
);
1888 // register a capability
1889 Capability
*cap
= in
->get_client_cap(my_client
);
1892 cap
= in
->add_client_cap(my_client
, session
, realm
);
1893 cap
->set_wanted(my_want
);
1895 cap
->inc_suppress(); // suppress file cap messages for new cap (we'll bundle with the open() reply)
1899 // make sure it wants sufficient caps
1900 if (my_want
& ~cap
->wanted()) {
1901 // augment wanted caps for this client
1902 cap
->set_wanted(cap
->wanted() | my_want
);
1906 if (in
->is_auth()) {
1907 // [auth] twiddle mode?
1908 eval(in
, CEPH_CAP_LOCKS
);
1910 if (_need_flush_mdlog(in
, my_want
))
1911 mds
->mdlog
->flush();
1914 // [replica] tell auth about any new caps wanted
1915 request_inode_file_caps(in
);
1918 // issue caps (pot. incl new one)
1919 //issue_caps(in); // note: _eval above may have done this already...
1921 // re-issue whatever we can
1922 //cap->issue(cap->pending());
1925 cap
->dec_suppress();
1931 void Locker::issue_caps_set(set
<CInode
*>& inset
)
1933 for (set
<CInode
*>::iterator p
= inset
.begin(); p
!= inset
.end(); ++p
)
1937 bool Locker::issue_caps(CInode
*in
, Capability
*only_cap
)
1939 // allowed caps are determined by the lock mode.
1940 int all_allowed
= in
->get_caps_allowed_by_type(CAP_ANY
);
1941 int loner_allowed
= in
->get_caps_allowed_by_type(CAP_LONER
);
1942 int xlocker_allowed
= in
->get_caps_allowed_by_type(CAP_XLOCKER
);
1944 client_t loner
= in
->get_loner();
1946 dout(7) << "issue_caps loner client." << loner
1947 << " allowed=" << ccap_string(loner_allowed
)
1948 << ", xlocker allowed=" << ccap_string(xlocker_allowed
)
1949 << ", others allowed=" << ccap_string(all_allowed
)
1950 << " on " << *in
<< dendl
;
1952 dout(7) << "issue_caps allowed=" << ccap_string(all_allowed
)
1953 << ", xlocker allowed=" << ccap_string(xlocker_allowed
)
1954 << " on " << *in
<< dendl
;
1957 assert(in
->is_head());
1959 // count conflicts with
1963 map
<client_t
, Capability
*>::iterator it
;
1965 it
= in
->client_caps
.find(only_cap
->get_client());
1967 it
= in
->client_caps
.begin();
1968 for (; it
!= in
->client_caps
.end(); ++it
) {
1969 Capability
*cap
= it
->second
;
1970 if (cap
->is_stale())
1973 // do not issue _new_ bits when size|mtime is projected
1975 if (loner
== it
->first
)
1976 allowed
= loner_allowed
;
1978 allowed
= all_allowed
;
1980 // add in any xlocker-only caps (for locks this client is the xlocker for)
1981 allowed
|= xlocker_allowed
& in
->get_xlocker_mask(it
->first
);
1983 Session
*session
= mds
->get_session(it
->first
);
1984 if (in
->inode
.inline_data
.version
!= CEPH_INLINE_NONE
&&
1985 !(session
&& session
->connection
&&
1986 session
->connection
->has_feature(CEPH_FEATURE_MDS_INLINE_DATA
)))
1987 allowed
&= ~(CEPH_CAP_FILE_RD
| CEPH_CAP_FILE_WR
);
1989 int pending
= cap
->pending();
1990 int wanted
= cap
->wanted();
1992 dout(20) << " client." << it
->first
1993 << " pending " << ccap_string(pending
)
1994 << " allowed " << ccap_string(allowed
)
1995 << " wanted " << ccap_string(wanted
)
1998 if (!(pending
& ~allowed
)) {
1999 // skip if suppress or new, and not revocation
2000 if (cap
->is_new() || cap
->is_suppress()) {
2001 dout(20) << " !revoke and new|suppressed, skipping client." << it
->first
<< dendl
;
2006 // notify clients about deleted inode, to make sure they release caps ASAP.
2007 if (in
->inode
.nlink
== 0)
2008 wanted
|= CEPH_CAP_LINK_SHARED
;
2010 // are there caps that the client _wants_ and can have, but aren't pending?
2011 // or do we need to revoke?
2012 if (((wanted
& allowed
) & ~pending
) || // missing wanted+allowed caps
2013 (pending
& ~allowed
)) { // need to revoke ~allowed caps.
2017 // include caps that clients generally like, while we're at it.
2018 int likes
= in
->get_caps_liked();
2019 int before
= pending
;
2021 if (pending
& ~allowed
)
2022 seq
= cap
->issue((wanted
|likes
) & allowed
& pending
); // if revoking, don't issue anything new.
2024 seq
= cap
->issue((wanted
|likes
) & allowed
);
2025 int after
= cap
->pending();
2027 if (cap
->is_new()) {
2028 // haven't send caps to client yet
2029 if (before
& ~after
)
2030 cap
->confirm_receipt(seq
, after
);
2032 dout(7) << " sending MClientCaps to client." << it
->first
2033 << " seq " << cap
->get_last_seq()
2034 << " new pending " << ccap_string(after
) << " was " << ccap_string(before
)
2037 int op
= (before
& ~after
) ? CEPH_CAP_OP_REVOKE
: CEPH_CAP_OP_GRANT
;
2038 if (op
== CEPH_CAP_OP_REVOKE
) {
2039 revoking_caps
.push_back(&cap
->item_revoking_caps
);
2040 revoking_caps_by_client
[cap
->get_client()].push_back(&cap
->item_client_revoking_caps
);
2041 cap
->set_last_revoke_stamp(ceph_clock_now());
2042 cap
->reset_num_revoke_warnings();
2045 MClientCaps
*m
= new MClientCaps(op
, in
->ino(),
2046 in
->find_snaprealm()->inode
->ino(),
2047 cap
->get_cap_id(), cap
->get_last_seq(),
2050 mds
->get_osd_epoch_barrier());
2051 in
->encode_cap_message(m
, cap
);
2053 mds
->send_message_client_counted(m
, it
->first
);
2061 return (nissued
== 0); // true if no re-issued, no callbacks
2064 void Locker::issue_truncate(CInode
*in
)
2066 dout(7) << "issue_truncate on " << *in
<< dendl
;
2068 for (map
<client_t
, Capability
*>::iterator it
= in
->client_caps
.begin();
2069 it
!= in
->client_caps
.end();
2071 Capability
*cap
= it
->second
;
2072 MClientCaps
*m
= new MClientCaps(CEPH_CAP_OP_TRUNC
,
2074 in
->find_snaprealm()->inode
->ino(),
2075 cap
->get_cap_id(), cap
->get_last_seq(),
2076 cap
->pending(), cap
->wanted(), 0,
2078 mds
->get_osd_epoch_barrier());
2079 in
->encode_cap_message(m
, cap
);
2080 mds
->send_message_client_counted(m
, it
->first
);
2083 // should we increase max_size?
2084 if (in
->is_auth() && in
->is_file())
2085 check_inode_max_size(in
);
2089 void Locker::revoke_stale_caps(Capability
*cap
)
2091 CInode
*in
= cap
->get_inode();
2092 if (in
->state_test(CInode::STATE_EXPORTINGCAPS
)) {
2093 // if export succeeds, the cap will be removed. if export fails, we need to
2094 // revoke the cap if it's still stale.
2095 in
->state_set(CInode::STATE_EVALSTALECAPS
);
2099 int issued
= cap
->issued();
2100 if (issued
& ~CEPH_CAP_PIN
) {
2101 dout(10) << " revoking " << ccap_string(issued
) << " on " << *in
<< dendl
;
2104 if (in
->is_auth() &&
2105 in
->inode
.client_ranges
.count(cap
->get_client()))
2106 in
->state_set(CInode::STATE_NEEDSRECOVER
);
2108 if (!in
->filelock
.is_stable()) eval_gather(&in
->filelock
);
2109 if (!in
->linklock
.is_stable()) eval_gather(&in
->linklock
);
2110 if (!in
->authlock
.is_stable()) eval_gather(&in
->authlock
);
2111 if (!in
->xattrlock
.is_stable()) eval_gather(&in
->xattrlock
);
2113 if (in
->is_auth()) {
2114 try_eval(in
, CEPH_CAP_LOCKS
);
2116 request_inode_file_caps(in
);
2121 void Locker::revoke_stale_caps(Session
*session
)
2123 dout(10) << "revoke_stale_caps for " << session
->info
.inst
.name
<< dendl
;
2125 for (xlist
<Capability
*>::iterator p
= session
->caps
.begin(); !p
.end(); ++p
) {
2126 Capability
*cap
= *p
;
2128 revoke_stale_caps(cap
);
2132 void Locker::resume_stale_caps(Session
*session
)
2134 dout(10) << "resume_stale_caps for " << session
->info
.inst
.name
<< dendl
;
2136 for (xlist
<Capability
*>::iterator p
= session
->caps
.begin(); !p
.end(); ++p
) {
2137 Capability
*cap
= *p
;
2138 CInode
*in
= cap
->get_inode();
2139 assert(in
->is_head());
2140 if (cap
->is_stale()) {
2141 dout(10) << " clearing stale flag on " << *in
<< dendl
;
2144 if (in
->state_test(CInode::STATE_EXPORTINGCAPS
)) {
2145 // if export succeeds, the cap will be removed. if export fails,
2146 // we need to re-issue the cap if it's not stale.
2147 in
->state_set(CInode::STATE_EVALSTALECAPS
);
2151 if (!in
->is_auth() || !eval(in
, CEPH_CAP_LOCKS
))
2152 issue_caps(in
, cap
);
2157 void Locker::remove_stale_leases(Session
*session
)
2159 dout(10) << "remove_stale_leases for " << session
->info
.inst
.name
<< dendl
;
2160 xlist
<ClientLease
*>::iterator p
= session
->leases
.begin();
2162 ClientLease
*l
= *p
;
2164 CDentry
*parent
= static_cast<CDentry
*>(l
->parent
);
2165 dout(15) << " removing lease on " << *parent
<< dendl
;
2166 parent
->remove_client_lease(l
, this);
2171 class C_MDL_RequestInodeFileCaps
: public LockerContext
{
2174 C_MDL_RequestInodeFileCaps(Locker
*l
, CInode
*i
) : LockerContext(l
), in(i
) {
2175 in
->get(CInode::PIN_PTRWAITER
);
2177 void finish(int r
) override
{
2179 locker
->request_inode_file_caps(in
);
2180 in
->put(CInode::PIN_PTRWAITER
);
2184 void Locker::request_inode_file_caps(CInode
*in
)
2186 assert(!in
->is_auth());
2188 int wanted
= in
->get_caps_wanted() & ~CEPH_CAP_PIN
;
2189 if (wanted
!= in
->replica_caps_wanted
) {
2190 // wait for single auth
2191 if (in
->is_ambiguous_auth()) {
2192 in
->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
,
2193 new C_MDL_RequestInodeFileCaps(this, in
));
2197 mds_rank_t auth
= in
->authority().first
;
2198 if (mds
->is_cluster_degraded() &&
2199 mds
->mdsmap
->get_state(auth
) == MDSMap::STATE_REJOIN
) {
2200 mds
->wait_for_active_peer(auth
, new C_MDL_RequestInodeFileCaps(this, in
));
2204 dout(7) << "request_inode_file_caps " << ccap_string(wanted
)
2205 << " was " << ccap_string(in
->replica_caps_wanted
)
2206 << " on " << *in
<< " to mds." << auth
<< dendl
;
2208 in
->replica_caps_wanted
= wanted
;
2210 if (!mds
->is_cluster_degraded() ||
2211 mds
->mdsmap
->is_clientreplay_or_active_or_stopping(auth
))
2212 mds
->send_message_mds(new MInodeFileCaps(in
->ino(), in
->replica_caps_wanted
),
2217 /* This function DOES put the passed message before returning */
2218 void Locker::handle_inode_file_caps(MInodeFileCaps
*m
)
2220 // nobody should be talking to us during recovery.
2221 assert(mds
->is_clientreplay() || mds
->is_active() || mds
->is_stopping());
2224 CInode
*in
= mdcache
->get_inode(m
->get_ino());
2225 mds_rank_t from
= mds_rank_t(m
->get_source().num());
2228 assert(in
->is_auth());
2230 dout(7) << "handle_inode_file_caps replica mds." << from
<< " wants caps " << ccap_string(m
->get_caps()) << " on " << *in
<< dendl
;
2233 in
->mds_caps_wanted
[from
] = m
->get_caps();
2235 in
->mds_caps_wanted
.erase(from
);
2237 try_eval(in
, CEPH_CAP_LOCKS
);
// C_MDL_CheckMaxSize: waiter that re-runs Locker::check_inode_max_size() for
// an inode whose max_size update had to wait. check_inode_max_size() itself
// queues it, for example on unfreeze or on the filelock becoming stable.
// The constructor takes a CInode::PIN_PTRWAITER reference on the inode and
// finish() drops it (presumably so the inode stays in cache while queued).
// NOTE(review): this copy is damaged. Missing: the `in`, `newsize` and `mtime`
// member declarations (2245-2248), line 2257 inside finish(), and the
// closing braces.
2242 class C_MDL_CheckMaxSize
: public LockerContext
{
2244 uint64_t new_max_size
;
2249 C_MDL_CheckMaxSize(Locker
*l
, CInode
*i
, uint64_t _new_max_size
,
2250 uint64_t _newsize
, utime_t _mtime
) :
2251 LockerContext(l
), in(i
),
2252 new_max_size(_new_max_size
), newsize(_newsize
), mtime(_mtime
)
2254 in
->get(CInode::PIN_PTRWAITER
);
2256 void finish(int r
) override
{
// Retry without forcing the wrlock (force_wrlock = false), then unpin.
2258 locker
->check_inode_max_size(in
, false, new_max_size
, newsize
, mtime
);
2259 in
->put(CInode::PIN_PTRWAITER
);
// calc_new_max_size: propose a new max_size for a file whose size is `size`.
//   pi   - projected inode supplying the layout size increment.
//   size - current (or requested) file size.
// Returns (size + 1) * 2, capped at size plus
// mds_client_writeable_range_max_inc_objs layout increments, then rounded up
// to a multiple of pi->get_layout_size_increment().
// NOTE(review): line 2267 is missing from this damaged copy. If it guarded
// the cap (e.g. on max_inc > 0), restore it. As written, a zero
// max_inc_objs setting would clamp new_max down to `size`.
// NOTE(review): (size + 1) << 1 wraps around for size >= 2^63 (uint64_t).
2263 uint64_t Locker::calc_new_max_size(inode_t
*pi
, uint64_t size
)
2265 uint64_t new_max
= (size
+ 1) << 1;
// Configured limit is in objects; convert it to the layout's size unit.
2266 uint64_t max_inc
= g_conf
->mds_client_writeable_range_max_inc_objs
;
2268 max_inc
*= pi
->get_layout_size_increment();
2269 new_max
= MIN(new_max
, size
+ max_inc
);
2271 return ROUND_UP_TO(new_max
, pi
->get_layout_size_increment());
// calc_new_client_ranges: compute the writeable range each client should
// hold on `in`.
//   in            - inode whose projected client_ranges are the baseline.
//   size          - size used to compute the target max, via
//                   calc_new_max_size() when the inode has a layout.
//   new_ranges    - output. Entries are added only for clients that have
//                   FILE_WR or FILE_BUFFER issued or wanted.
//   max_increased - output. Set to true when a client gets a new range or an
//                   existing range grows. In the visible text it is only ever
//                   set to true, so the caller must initialize it.
// Existing ranges never shrink (MAX with the old end) and keep their
// `follows`. New ranges follow in->first - 1.
// NOTE(review): this copy is damaged. Missing:
// - the declaration of `ms` (2279);
// - the layout-less branch (2282-2286);
// - the loop increment (2291);
// - lines 2294/2301/2303 (presumably range.first and, for new ranges,
//   range.last).
2274 void Locker::calc_new_client_ranges(CInode
*in
, uint64_t size
,
2275 map
<client_t
,client_writeable_range_t
> *new_ranges
,
2276 bool *max_increased
)
2278 inode_t
*latest
= in
->get_projected_inode();
2280 if(latest
->has_layout()) {
2281 ms
= calc_new_max_size(latest
, size
);
2283 // Layout-less directories like ~mds0/, have zero size
2287 // increase ranges as appropriate.
2288 // shrink to 0 if no WR|BUFFER caps issued.
2289 for (map
<client_t
,Capability
*>::iterator p
= in
->client_caps
.begin();
2290 p
!= in
->client_caps
.end();
// Clients without WR|BUFFER issued or wanted get no entry at all.
2292 if ((p
->second
->issued() | p
->second
->wanted()) & (CEPH_CAP_FILE_WR
|CEPH_CAP_FILE_BUFFER
)) {
2293 client_writeable_range_t
& nr
= (*new_ranges
)[p
->first
];
// Existing range: grow to ms if needed, never shrink, keep `follows`.
2295 if (latest
->client_ranges
.count(p
->first
)) {
2296 client_writeable_range_t
& oldr
= latest
->client_ranges
[p
->first
];
2297 if (ms
> oldr
.range
.last
)
2298 *max_increased
= true;
2299 nr
.range
.last
= MAX(ms
, oldr
.range
.last
);
2300 nr
.follows
= oldr
.follows
;
2302 *max_increased
= true;
2304 nr
.follows
= in
->first
- 1;
// check_inode_max_size: bring the projected inode's size/mtime and its
// per-client writeable ranges (client_ranges) up to date. The change is
// journaled when anything differs.
//   in           - auth, regular-file inode (both asserted).
//   force_wrlock - when false and the filelock cannot be wrlocked, the lock
//                  is nudged (file_excl/simple_lock). The call is then
//                  retried via C_MDL_CheckMaxSize on WAIT_STABLE.
//   new_max_size - lower bound fed to calc_new_client_ranges()
//                  (as max(new_max_size, size)).
//   new_size     - proposed size; 0 means "no size update".
//   new_mtime    - proposed mtime (its parameter line, 2312, is missing).
// Size and mtime only move forward (MAX with the projected values). A frozen
// inode is retried after unfreeze. The entry is an EOpen when the inode still
// has wanted caps and is the head (last == CEPH_NOSNAP); otherwise it is an
// EUpdate. The filelock is wrlock_force()d for the duration of the journal
// write.
// NOTE(review): this copy is damaged (fused line numbers, many missing
// lines). Missing items include:
// - every return statement;
// - presumably `update_max = true` after 2333;
// - the `if (update_size)` at 2324;
// - the declaration of `le` (2397);
// - the `} else {` between 2415 and 2417;
// - the trailing C_MDL_CheckMaxSize constructor arguments;
// - the guard on the final flush (presumably `if (max_increased)`, 2426).
// Presumably it returns false on the no-op/deferred paths and true once the
// update is journaled; confirm against the canonical source.
2310 bool Locker::check_inode_max_size(CInode
*in
, bool force_wrlock
,
2311 uint64_t new_max_size
, uint64_t new_size
,
2314 assert(in
->is_auth());
2315 assert(in
->is_file());
2317 inode_t
*latest
= in
->get_projected_inode();
2318 map
<client_t
, client_writeable_range_t
> new_ranges
;
2319 uint64_t size
= latest
->size
;
2320 bool update_size
= new_size
> 0;
2321 bool update_max
= false;
2322 bool max_increased
= false;
// Size and mtime never go backwards; drop the size update if nothing changes.
2325 new_size
= size
= MAX(size
, new_size
);
2326 new_mtime
= MAX(new_mtime
, latest
->mtime
);
2327 if (latest
->size
== new_size
&& latest
->mtime
== new_mtime
)
2328 update_size
= false;
// Compute the ranges each writer should hold; max_increased reports growth.
2331 calc_new_client_ranges(in
, max(new_max_size
, size
), &new_ranges
, &max_increased
);
2333 if (max_increased
|| latest
->client_ranges
!= new_ranges
)
2336 if (!update_size
&& !update_max
) {
2337 dout(20) << "check_inode_max_size no-op on " << *in
<< dendl
;
2341 dout(10) << "check_inode_max_size new_ranges " << new_ranges
2342 << " update_size " << update_size
2343 << " on " << *in
<< dendl
;
// Cannot project a frozen inode: retry after it unfreezes.
2345 if (in
->is_frozen()) {
2346 dout(10) << "check_inode_max_size frozen, waiting on " << *in
<< dendl
;
2347 C_MDL_CheckMaxSize
*cms
= new C_MDL_CheckMaxSize(this, in
,
2351 in
->add_waiter(CInode::WAIT_UNFREEZE
, cms
);
// Need a wrlockable filelock before projecting. Try to move the lock to a
// state that allows it: file_excl() when a target loner exists, otherwise
// (presumably; the `else` at 2359 is missing) simple_lock().
2354 if (!force_wrlock
&& !in
->filelock
.can_wrlock(in
->get_loner())) {
2356 if (in
->filelock
.is_stable()) {
2357 if (in
->get_target_loner() >= 0)
2358 file_excl(&in
->filelock
);
2360 simple_lock(&in
->filelock
);
2362 if (!in
->filelock
.can_wrlock(in
->get_loner())) {
2364 C_MDL_CheckMaxSize
*cms
= new C_MDL_CheckMaxSize(this, in
,
2369 in
->filelock
.add_waiter(SimpleLock::WAIT_STABLE
, cms
);
2370 dout(10) << "check_inode_max_size can't wrlock, waiting on " << *in
<< dendl
;
// Project the inode and apply the new ranges / size / mtime.
2375 MutationRef
mut(new MutationImpl());
2376 mut
->ls
= mds
->mdlog
->get_current_segment();
2378 inode_t
*pi
= in
->project_inode();
2379 pi
->version
= in
->pre_dirty();
2382 dout(10) << "check_inode_max_size client_ranges " << pi
->client_ranges
<< " -> " << new_ranges
<< dendl
;
2383 pi
->client_ranges
= new_ranges
;
2387 dout(10) << "check_inode_max_size size " << pi
->size
<< " -> " << new_size
<< dendl
;
2388 pi
->size
= new_size
;
2389 pi
->rstat
.rbytes
= new_size
;
2390 dout(10) << "check_inode_max_size mtime " << pi
->mtime
<< " -> " << new_mtime
<< dendl
;
2391 pi
->mtime
= new_mtime
;
2394 // use EOpen if the file is still open; otherwise, use EUpdate.
2395 // this is just an optimization to push open files forward into
2396 // newer log segments.
2398 EMetaBlob
*metablob
;
2399 if (in
->is_any_caps_wanted() && in
->last
== CEPH_NOSNAP
) {
2400 EOpen
*eo
= new EOpen(mds
->mdlog
);
2401 eo
->add_ino(in
->ino());
2402 metablob
= &eo
->metablob
;
2404 mut
->ls
->open_files
.push_back(&in
->item_open_file
);
2406 EUpdate
*eu
= new EUpdate(mds
->mdlog
, "check_inode_max_size");
2407 metablob
= &eu
->metablob
;
2410 mds
->mdlog
->start_entry(le
);
// A size change must also be accounted in the parents; otherwise only the
// inode itself is journaled (the `} else {` between the arms is missing).
2411 if (update_size
) { // FIXME if/when we do max_size nested accounting
2412 mdcache
->predirty_journal_parents(mut
, metablob
, in
, 0, PREDIRTY_PRIMARY
);
2414 CDentry
*parent
= in
->get_projected_parent_dn();
2415 metablob
->add_primary_dentry(parent
, in
, true);
2417 metablob
->add_dir_context(in
->get_projected_parent_dn()->get_dir());
2418 mdcache
->journal_dirty_inode(mut
.get(), metablob
, in
);
2420 mds
->mdlog
->submit_entry(le
,
2421 new C_Locker_FileUpdate_finish(this, in
, mut
, true));
2422 wrlock_force(&in
->filelock
, mut
); // wrlock for duration of journal
2425 // make max_size _increase_ timely
2427 mds
->mdlog
->flush();
// share_inode_max_size: send a CEPH_CAP_OP_GRANT cap message carrying the
// inode's current state (encode_cap_message) to every client whose pending
// caps include FILE_WR or FILE_BUFFER, so it learns the current max_size.
//   in       - the inode.
//   only_cap - when non-null, iteration starts at this client's cap.
//              Presumably only that cap is handled; the `break` is not
//              visible.
// Suppressed caps are skipped (presumably via `continue`, line 2450).
// NOTE(review): this copy is damaged. Missing:
// - the /* */ delimiters and the end of the comment at 2435-2438;
// - the `if (only_cap)`/`else` lines (2442/2444);
// - two MClientCaps constructor arguments (2455, 2459);
// - the closing lines.
2433 void Locker::share_inode_max_size(CInode
*in
, Capability
*only_cap
)
2436 * only share if currently issued a WR cap. if client doesn't have it,
2437 * file_max doesn't matter, and the client will get it if/when they get
2440 dout(10) << "share_inode_max_size on " << *in
<< dendl
;
2441 map
<client_t
, Capability
*>::iterator it
;
2443 it
= in
->client_caps
.find(only_cap
->get_client());
2445 it
= in
->client_caps
.begin();
2446 for (; it
!= in
->client_caps
.end(); ++it
) {
2447 const client_t client
= it
->first
;
2448 Capability
*cap
= it
->second
;
2449 if (cap
->is_suppress())
// Only writers (WR or BUFFER pending) care about max_size.
2451 if (cap
->pending() & (CEPH_CAP_FILE_WR
|CEPH_CAP_FILE_BUFFER
)) {
2452 dout(10) << "share_inode_max_size with client." << client
<< dendl
;
2453 cap
->inc_last_seq();
2454 MClientCaps
*m
= new MClientCaps(CEPH_CAP_OP_GRANT
,
2456 in
->find_snaprealm()->inode
->ino(),
2457 cap
->get_cap_id(), cap
->get_last_seq(),
2458 cap
->pending(), cap
->wanted(), 0,
2460 mds
->get_osd_epoch_barrier());
2461 in
->encode_cap_message(m
, cap
);
2462 mds
->send_message_client_counted(m
, client
);
// _need_flush_mdlog: decide whether the mdlog should be flushed before the
// caps in `wanted` can be granted.
//   in     - inode whose locks are checked.
//   wanted - cap bits the client is asking for.
// The condition holds when a wanted FILE/AUTH/LINK/XATTR cap maps to a lock
// that is unstable and locked (by pending mutations, per the comment below).
// NOTE(review): the `return true;` / `return false;` lines (2481-2482) are
// missing from this damaged copy.
2469 bool Locker::_need_flush_mdlog(CInode
*in
, int wanted
)
2471 /* flush log if caps are wanted by client but corresponding lock is unstable and locked by
2472 * pending mutations. */
2473 if (((wanted
& (CEPH_CAP_FILE_RD
|CEPH_CAP_FILE_WR
|CEPH_CAP_FILE_SHARED
|CEPH_CAP_FILE_EXCL
)) &&
2474 in
->filelock
.is_unstable_and_locked()) ||
2475 ((wanted
& (CEPH_CAP_AUTH_SHARED
|CEPH_CAP_AUTH_EXCL
)) &&
2476 in
->authlock
.is_unstable_and_locked()) ||
2477 ((wanted
& (CEPH_CAP_LINK_SHARED
|CEPH_CAP_LINK_EXCL
)) &&
2478 in
->linklock
.is_unstable_and_locked()) ||
2479 ((wanted
& (CEPH_CAP_XATTR_SHARED
|CEPH_CAP_XATTR_EXCL
)) &&
2480 in
->xattrlock
.is_unstable_and_locked()))
// adjust_cap_wanted: update the client's wanted caps on `cap`, then keep the
// open-file bookkeeping in sync.
//   cap       - the client capability being updated.
//   wanted    - the client's new wanted mask.
//   issue_seq - issue sequence the client's `wanted` was based on.
// If issue_seq matches cap->get_last_issue(), `wanted` replaces the mask. On
// a mismatch, newly added bits are still OR-ed in, but bits are never dropped
// (presumably because the client's view may be stale).
// A non-auth inode forwards the change via request_inode_file_caps().
// On the auth:
// - when nothing is wanted, the inode leaves the open-file list (if no other
//   caps are wanted);
// - otherwise it is prioritized for recovery when RD/WR are wanted, and is
//   added to the open-file list with an EOpen entry.
// NOTE(review): this copy is damaged. Missing lines:
// - 2496 ("} else {") and 2501-2503;
// - 2507-2509 (presumably a return after request_inode_file_caps);
// - 2515-2516 and the closing lines.
2485 void Locker::adjust_cap_wanted(Capability
*cap
, int wanted
, int issue_seq
)
2487 if (ceph_seq_cmp(issue_seq
, cap
->get_last_issue()) == 0) {
2488 dout(10) << " wanted " << ccap_string(cap
->wanted())
2489 << " -> " << ccap_string(wanted
) << dendl
;
2490 cap
->set_wanted(wanted
);
2491 } else if (wanted
& ~cap
->wanted()) {
2492 dout(10) << " wanted " << ccap_string(cap
->wanted())
2493 << " -> " << ccap_string(wanted
)
2494 << " (added caps even though we had seq mismatch!)" << dendl
;
2495 cap
->set_wanted(wanted
| cap
->wanted());
2497 dout(10) << " NOT changing wanted " << ccap_string(cap
->wanted())
2498 << " -> " << ccap_string(wanted
)
2499 << " (issue_seq " << issue_seq
<< " != last_issue "
2500 << cap
->get_last_issue() << ")" << dendl
;
// Propagate the change: a replica asks the auth MDS; the auth maintains the
// open-file list.
2504 CInode
*cur
= cap
->get_inode();
2505 if (!cur
->is_auth()) {
2506 request_inode_file_caps(cur
);
2510 if (cap
->wanted() == 0) {
2511 if (cur
->item_open_file
.is_on_list() &&
2512 !cur
->is_any_caps_wanted()) {
2513 dout(10) << " removing unwanted file from open file list " << *cur
<< dendl
;
2514 cur
->item_open_file
.remove_myself();
2517 if (cur
->state_test(CInode::STATE_RECOVERING
) &&
2518 (cap
->wanted() & (CEPH_CAP_FILE_RD
|
2519 CEPH_CAP_FILE_WR
))) {
2520 mds
->mdcache
->recovery_queue
.prioritize(cur
);
// Journal the inode as open (EOpen) and track it in the current segment.
2523 if (!cur
->item_open_file
.is_on_list()) {
2524 dout(10) << " adding to open file list " << *cur
<< dendl
;
2525 assert(cur
->last
== CEPH_NOSNAP
);
2526 LogSegment
*ls
= mds
->mdlog
->get_current_segment();
2527 EOpen
*le
= new EOpen(mds
->mdlog
);
2528 mds
->mdlog
->start_entry(le
);
2529 le
->add_clean_inode(cur
);
2530 ls
->open_files
.push_back(&cur
->item_open_file
);
2531 mds
->mdlog
->submit_entry(le
);
// _do_null_snapflush: resolve pending snapflushes for `client` on its behalf.
// For each snapid in head_in->client_need_snapflush that is below `last` and
// still waiting on `client`, it records an empty ("null") snapflush via
// _do_snap_update(dirty = 0) and removes the client from the waiting set.
//   head_in - head inode owning the client_need_snapflush map.
//   client  - client whose pending snapflushes are being resolved.
//   last    - exclusive upper bound on the snapids processed. Callers in
//             this chunk pass only two arguments, so the declaration
//             presumably supplies a default.
// A snapped inode can only be looked up by the last snapid it is valid for.
// If the lookup by snapid fails, later map entries are tried, presumably
// falling back to head_in when it is multiversion.
// NOTE(review): this copy is damaged. Missing lines (2550, 2556, 2559,
// 2561-2562, 2566-2568):
// - the `if (!sin)` / `if (sin)` guards;
// - the `++q` increment;
// - the `break`;
// - the fallback assignment.
2538 void Locker::_do_null_snapflush(CInode
*head_in
, client_t client
, snapid_t last
)
2540 dout(10) << "_do_null_snapflush client." << client
<< " on " << *head_in
<< dendl
;
2541 for (auto p
= head_in
->client_need_snapflush
.begin();
2542 p
!= head_in
->client_need_snapflush
.end() && p
->first
< last
; ) {
2543 snapid_t snapid
= p
->first
;
2544 set
<client_t
>& clients
= p
->second
;
// Advance p now. The forward search below starts at the next entry, and
// remove_need_snapflush() at the end of the body may (presumably) erase
// the current one.
2545 ++p
; // be careful, q loop below depends on this
2547 if (clients
.count(client
)) {
2548 dout(10) << " doing async NULL snapflush on " << snapid
<< " from client." << client
<< dendl
;
2549 CInode
*sin
= mdcache
->get_inode(head_in
->ino(), snapid
);
2551 // hrm, look forward until we find the inode.
2552 // (we can only look it up by the last snapid it is valid for)
2553 dout(10) << " didn't have " << head_in
->ino() << " snapid " << snapid
<< dendl
;
2554 for (compact_map
<snapid_t
, set
<client_t
> >::iterator q
= p
; // p is already at next entry
2555 q
!= head_in
->client_need_snapflush
.end();
2557 dout(10) << " trying snapid " << q
->first
<< dendl
;
2558 sin
= mdcache
->get_inode(head_in
->ino(), q
->first
);
2560 assert(sin
->first
<= snapid
);
2563 dout(10) << " didn't have " << head_in
->ino() << " snapid " << q
->first
<< dendl
;
2565 if (!sin
&& head_in
->is_multiversion())
2569 _do_snap_update(sin
, snapid
, 0, sin
->first
- 1, client
, NULL
, NULL
);
2570 head_in
->remove_need_snapflush(sin
, snapid
, client
);
// should_defer_client_cap_frozen: decide whether a client cap message or cap
// release on `in` must wait for the inode to unfreeze.
// Returns true when the inode is frozen, or when it is freezing and holds no
// auth pins. A freezing inode that still holds auth pins is processed now.
// NOTE(review): in this damaged copy the /* */ delimiters (2578, 2590) of the
// policy comment below are missing, so its text is not a comment here.
2576 bool Locker::should_defer_client_cap_frozen(CInode
*in
)
2579 * This policy needs to be AT LEAST as permissive as allowing a client request
2580 * to go forward, or else a client request can release something, the release
2581 * gets deferred, but the request gets processed and deadlocks because when the
2582 * caps can't get revoked.
2584 * Currently, a request wait if anything locked is freezing (can't
2585 * auth_pin), which would avoid any deadlock with cap release. Thus @in
2586 * _MUST_ be in the lock/auth_pin set.
2588 * auth_pins==0 implies no unstable lock and not auth pinnned by
2589 * client request, otherwise continue even it's freezing.
2591 return (in
->is_freezing() && in
->get_num_auth_pins() == 0) || in
->is_frozen();
// handle_client_caps: process an MClientCaps message from a client (cap
// update, flush, or FLUSHSNAP).
// Visible flow in this copy:
//  - When the MDS is not clientreplay/active/stopping: drop messages from
//    closed/closing/killing sessions. During reconnect, note dirty caps via
//    set_reconnected_dirty_caps(). Then wait for replay and retry.
//  - Re-ack flushes whose client tid the session already recorded as
//    completed, then continue. FLUSHSNAP presumably stops there; other ops
//    become UPDATE.
//  - Trim completed flushes up to the client's oldest_flush_tid, and warn
//    via clog when the client does not advance it.
//  - Apply the client's OSD epoch barrier.
//  - Resolve the cap inode (pick_inode_snap by `follows`) and its cap. Defer
//    while frozen; drop messages with a stale mseq.
//  - FLUSHSNAP: journal the snapped state via _do_snap_update().
//  - Otherwise:
//      - update intermediate snapped inodes;
//      - confirm the caps the client retains;
//      - resolve null snapflushes;
//      - adjust wanted caps;
//      - journal dirty metadata via _do_cap_update(), or ack immediately;
//      - re-evaluate locks and flush the mdlog when needed.
// NOTE(review): this copy is damaged (fused line numbers, many missing
// lines). Missing items include:
// - the declarations of `ack` (2635) and `ss` (2669);
// - the null checks on `session`, `head_in` and `cap`;
// - the m->put()/return statements and several else branches.
// The header fragment "2595 * This function DOES put..." has lost its /* */
// delimiters.
2595 * This function DOES put the passed message before returning
2597 void Locker::handle_client_caps(MClientCaps
*m
)
2599 Session
*session
= static_cast<Session
*>(m
->get_connection()->get_priv());
2600 client_t client
= m
->get_source().num();
2602 snapid_t follows
= m
->get_snap_follows();
2603 dout(7) << "handle_client_caps "
2604 << ((m
->flags
& CLIENT_CAPS_SYNC
) ? "sync" : "async")
2605 << " on " << m
->get_ino()
2606 << " tid " << m
->get_client_tid() << " follows " << follows
2607 << " op " << ceph_cap_op_name(m
->get_op()) << dendl
;
// Not serving clients yet: drop the message or park it until replay is done.
2609 if (!mds
->is_clientreplay() && !mds
->is_active() && !mds
->is_stopping()) {
2611 dout(5) << " no session, dropping " << *m
<< dendl
;
2615 if (session
->is_closed() ||
2616 session
->is_closing() ||
2617 session
->is_killing()) {
2618 dout(7) << " session closed|closing|killing, dropping " << *m
<< dendl
;
2622 if (mds
->is_reconnect() &&
2623 m
->get_dirty() && m
->get_client_tid() > 0 &&
2624 !session
->have_completed_flush(m
->get_client_tid())) {
2625 mdcache
->set_reconnected_dirty_caps(client
, m
->get_ino(), m
->get_dirty());
2627 mds
->wait_for_replay(new C_MDS_RetryMessage(mds
, m
));
// Duplicate of a flush already recorded as completed: re-send the ack.
2631 if (m
->get_client_tid() > 0 && session
&&
2632 session
->have_completed_flush(m
->get_client_tid())) {
2633 dout(7) << "handle_client_caps already flushed tid " << m
->get_client_tid()
2634 << " for client." << client
<< dendl
;
2636 if (m
->get_op() == CEPH_CAP_OP_FLUSHSNAP
) {
2637 ack
= new MClientCaps(CEPH_CAP_OP_FLUSHSNAP_ACK
, m
->get_ino(), 0, 0, 0, 0, 0,
2638 m
->get_dirty(), 0, mds
->get_osd_epoch_barrier());
2640 ack
= new MClientCaps(CEPH_CAP_OP_FLUSH_ACK
, m
->get_ino(), 0, m
->get_cap_id(),
2641 m
->get_seq(), m
->get_caps(), 0, m
->get_dirty(), 0,
2642 mds
->get_osd_epoch_barrier());
2644 ack
->set_snap_follows(follows
);
2645 ack
->set_client_tid(m
->get_client_tid());
2646 mds
->send_message_client_counted(ack
, m
->get_connection());
2647 if (m
->get_op() == CEPH_CAP_OP_FLUSHSNAP
) {
2651 // fall-thru because the message may release some caps
2653 m
->set_op(CEPH_CAP_OP_UPDATE
);
2657 // "oldest flush tid" > 0 means client uses unique TID for each flush
2658 if (m
->get_oldest_flush_tid() > 0 && session
) {
2659 if (session
->trim_completed_flushes(m
->get_oldest_flush_tid())) {
2660 mds
->mdlog
->get_current_segment()->touched_sessions
.insert(session
->info
.inst
.name
);
// Warning threshold doubles with each warning already issued; reset once
// the backlog falls below half the limit.
2662 if (session
->get_num_trim_flushes_warnings() > 0 &&
2663 session
->get_num_completed_flushes() * 2 < g_conf
->mds_max_completed_flushes
)
2664 session
->reset_num_trim_flushes_warnings();
2666 if (session
->get_num_completed_flushes() >=
2667 (g_conf
->mds_max_completed_flushes
<< session
->get_num_trim_flushes_warnings())) {
2668 session
->inc_num_trim_flushes_warnings();
2670 ss
<< "client." << session
->get_client() << " does not advance its oldest_flush_tid ("
2671 << m
->get_oldest_flush_tid() << "), "
2672 << session
->get_num_completed_flushes()
2673 << " completed flushes recorded in session";
2674 mds
->clog
->warn() << ss
.str();
2675 dout(20) << __func__
<< " " << ss
.str() << dendl
;
2680 CInode
*head_in
= mdcache
->get_inode(m
->get_ino());
2682 if (mds
->is_clientreplay()) {
2683 dout(7) << "handle_client_caps on unknown ino " << m
->get_ino()
2684 << ", will try again after replayed client requests" << dendl
;
2685 mdcache
->wait_replay_cap_reconnect(m
->get_ino(), new C_MDS_RetryMessage(mds
, m
));
2688 dout(1) << "handle_client_caps on unknown ino " << m
->get_ino() << ", dropping" << dendl
;
2693 if (m
->osd_epoch_barrier
&& !mds
->objecter
->have_map(m
->osd_epoch_barrier
)) {
2694 // Pause RADOS operations until we see the required epoch
2695 mds
->objecter
->set_epoch_barrier(m
->osd_epoch_barrier
);
2698 if (mds
->get_osd_epoch_barrier() < m
->osd_epoch_barrier
) {
2699 // Record the barrier so that we will retransmit it to clients
2700 mds
->set_osd_epoch_barrier(m
->osd_epoch_barrier
);
2703 CInode
*in
= head_in
;
2705 in
= mdcache
->pick_inode_snap(head_in
, follows
);
2707 dout(10) << " head inode " << *head_in
<< dendl
;
2709 dout(10) << " cap inode " << *in
<< dendl
;
// The cap lives on the head inode when the snapped inode has none.
2711 Capability
*cap
= 0;
2712 cap
= in
->get_client_cap(client
);
2713 if (!cap
&& in
!= head_in
)
2714 cap
= head_in
->get_client_cap(client
);
2716 dout(7) << "handle_client_caps no cap for client." << client
<< " on " << *in
<< dendl
;
2723 if (should_defer_client_cap_frozen(in
)) {
2724 dout(7) << "handle_client_caps freezing|frozen on " << *in
<< dendl
;
2725 in
->add_waiter(CInode::WAIT_UNFREEZE
, new C_MDS_RetryMessage(mds
, m
));
2728 if (ceph_seq_cmp(m
->get_mseq(), cap
->get_mseq()) < 0) {
2729 dout(7) << "handle_client_caps mseq " << m
->get_mseq() << " < " << cap
->get_mseq()
2730 << ", dropping" << dendl
;
2735 int op
= m
->get_op();
2738 if (op
== CEPH_CAP_OP_FLUSHSNAP
) {
2739 if (!in
->is_auth()) {
2740 dout(7) << " not auth, ignoring flushsnap on " << *in
<< dendl
;
2744 SnapRealm
*realm
= in
->find_snaprealm();
2745 snapid_t snap
= realm
->get_snap_following(follows
);
2746 dout(10) << " flushsnap follows " << follows
<< " -> snap " << snap
<< dendl
;
2748 // we can prepare the ack now, since this FLUSHEDSNAP is independent of any
2749 // other cap ops. (except possibly duplicate FLUSHSNAP requests, but worst
2750 // case we get a dup response, so whatever.)
2751 MClientCaps
*ack
= 0;
2752 if (m
->get_dirty()) {
2753 ack
= new MClientCaps(CEPH_CAP_OP_FLUSHSNAP_ACK
, in
->ino(), 0, 0, 0, 0, 0, m
->get_dirty(), 0, mds
->get_osd_epoch_barrier());
2754 ack
->set_snap_follows(follows
);
2755 ack
->set_client_tid(m
->get_client_tid());
2756 ack
->set_oldest_flush_tid(m
->get_oldest_flush_tid());
2759 if (in
== head_in
||
2760 (head_in
->client_need_snapflush
.count(snap
) &&
2761 head_in
->client_need_snapflush
[snap
].count(client
))) {
2762 dout(7) << " flushsnap snap " << snap
2763 << " client." << client
<< " on " << *in
<< dendl
;
2765 // this cap now follows a later snap (i.e. the one initiating this flush, or later)
2767 cap
->client_follows
= snap
< CEPH_NOSNAP
? snap
: realm
->get_newest_seq();
2768 else if (head_in
->client_need_snapflush
.begin()->first
< snap
)
2769 _do_null_snapflush(head_in
, client
, snap
);
2771 _do_snap_update(in
, snap
, m
->get_dirty(), follows
, client
, m
, ack
);
2774 head_in
->remove_need_snapflush(in
, snap
, client
);
2777 dout(7) << " not expecting flushsnap " << snap
<< " from client." << client
<< " on " << *in
<< dendl
;
2779 mds
->send_message_client_counted(ack
, m
->get_connection());
// Non-FLUSHSNAP path (the FLUSHSNAP branch above presumably returns).
2784 if (cap
->get_cap_id() != m
->get_cap_id()) {
2785 dout(7) << " ignoring client capid " << m
->get_cap_id() << " != my " << cap
->get_cap_id() << dendl
;
2787 // intermediate snap inodes
2788 while (in
!= head_in
) {
2789 assert(in
->last
!= CEPH_NOSNAP
);
2790 if (in
->is_auth() && m
->get_dirty()) {
2791 dout(10) << " updating intermediate snapped inode " << *in
<< dendl
;
2792 _do_cap_update(in
, NULL
, m
->get_dirty(), follows
, m
);
2794 in
= mdcache
->pick_inode_snap(head_in
, in
->last
);
2797 // head inode, and cap
2798 MClientCaps
*ack
= 0;
// Never let the client "retain" caps it was not issued.
2800 int caps
= m
->get_caps();
2801 if (caps
& ~cap
->issued()) {
2802 dout(10) << " confirming not issued caps " << ccap_string(caps
& ~cap
->issued()) << dendl
;
2803 caps
&= cap
->issued();
2806 cap
->confirm_receipt(m
->get_seq(), caps
);
2807 dout(10) << " follows " << follows
2808 << " retains " << ccap_string(m
->get_caps())
2809 << " dirty " << ccap_string(m
->get_dirty())
2810 << " on " << *in
<< dendl
;
2813 // missing/skipped snapflush?
2814 // The client MAY send a snapflush if it is issued WR/EXCL caps, but
2815 // presently only does so when it has actual dirty metadata. But, we
2816 // set up the need_snapflush stuff based on the issued caps.
2817 // We can infer that the client WONT send a FLUSHSNAP once they have
2818 // released all WR/EXCL caps (the FLUSHSNAP always comes before the cap
2820 if (!head_in
->client_need_snapflush
.empty()) {
2821 if ((cap
->issued() & CEPH_CAP_ANY_FILE_WR
) == 0) {
2822 _do_null_snapflush(head_in
, client
);
2824 dout(10) << " revocation in progress, not making any conclusions about null snapflushes" << dendl
;
2828 if (m
->get_dirty() && in
->is_auth()) {
2829 dout(7) << " flush client." << client
<< " dirty " << ccap_string(m
->get_dirty())
2830 << " seq " << m
->get_seq() << " on " << *in
<< dendl
;
2831 ack
= new MClientCaps(CEPH_CAP_OP_FLUSH_ACK
, in
->ino(), 0, cap
->get_cap_id(), m
->get_seq(),
2832 m
->get_caps(), 0, m
->get_dirty(), 0, mds
->get_osd_epoch_barrier());
2833 ack
->set_client_tid(m
->get_client_tid());
2834 ack
->set_oldest_flush_tid(m
->get_oldest_flush_tid());
2837 // filter wanted based on what we could ever give out (given auth/replica status)
2838 bool need_flush
= m
->flags
& CLIENT_CAPS_SYNC
;
2839 int new_wanted
= m
->get_wanted() & head_in
->get_caps_allowed_ever();
2840 if (new_wanted
!= cap
->wanted()) {
2841 if (!need_flush
&& (new_wanted
& ~cap
->pending())) {
2842 // expanding caps. make sure we aren't waiting for a log flush
2843 need_flush
= _need_flush_mdlog(head_in
, new_wanted
& ~cap
->pending());
2846 adjust_cap_wanted(cap
, new_wanted
, m
->get_issue_seq());
// _do_cap_update() returns true when it journaled an update.
2849 if (in
->is_auth() &&
2850 _do_cap_update(in
, cap
, m
->get_dirty(), follows
, m
, ack
, &need_flush
)) {
2852 eval(in
, CEPH_CAP_LOCKS
);
2854 if (!need_flush
&& (cap
->wanted() & ~cap
->pending()))
2855 need_flush
= _need_flush_mdlog(in
, cap
->wanted() & ~cap
->pending());
2857 // no update, ack now.
2859 mds
->send_message_client_counted(ack
, m
->get_connection());
2861 bool did_issue
= eval(in
, CEPH_CAP_LOCKS
);
2862 if (!did_issue
&& (cap
->wanted() & ~cap
->pending()))
2863 issue_caps(in
, cap
);
// No cap message sent on this cap yet (last_seq == 0) but WR/BUFFER
// pending: issue without revoke and share the current max_size.
2865 if (cap
->get_last_seq() == 0 &&
2866 (cap
->pending() & (CEPH_CAP_FILE_WR
|CEPH_CAP_FILE_BUFFER
))) {
2867 cap
->issue_norevoke(cap
->issued());
2868 share_inode_max_size(in
, cap
);
// Presumably guarded by `if (need_flush)` (line 2872 is missing).
2873 mds
->mdlog
->flush();
// C_Locker_RetryRequestCapRelease: waiter that re-runs
// process_request_cap_release() for a release deferred because the inode
// was frozen. The retry passes an empty MDRequestRef (null_ref), so it runs
// without a request.
// NOTE(review): this copy is damaged. The `client` member (2882) and the
// declaration of the `dname` used in finish() (2884/2888) are missing, so
// it is not visible from here what dname the retry passes.
2881 class C_Locker_RetryRequestCapRelease
: public LockerContext
{
2883 ceph_mds_request_release item
;
2885 C_Locker_RetryRequestCapRelease(Locker
*l
, client_t c
, const ceph_mds_request_release
& it
) :
2886 LockerContext(l
), client(c
), item(it
) { }
2887 void finish(int r
) override
{
2889 MDRequestRef null_ref
;
2890 locker
->process_request_cap_release(null_ref
, client
, item
, dname
);
// process_request_cap_release: apply a cap release, and an optional dentry
// lease release, that a client attached to an MDS request.
//   mdr    - the request. C_Locker_RetryRequestCapRelease passes an empty
//            MDRequestRef when retrying a deferred release.
//   client - releasing client.
//   item   - the release record (ino, cap_id, caps, wanted, issue_seq,
//            mseq, and a seq used below).
//   dname  - name of a dentry under `ino` whose lease is released; empty
//            for none.
// Stale releases (older mseq, or a different cap_id) are dropped, and a
// frozen inode defers the release. Otherwise:
// - the retained caps are confirmed;
// - null snapflushes are resolved;
// - wanted caps are adjusted;
// - locks are re-evaluated with the cap suppressed.
// The cap's last seq is recorded in mdr->cap_releases so kick_cap_releases()
// can reissue caps later.
// NOTE(review): this copy is damaged. Missing lines include:
// - the declaration of `seq` (2901);
// - the null checks on `in`, `dir`, `dn`, `l` and `cap`, and the early
//   returns;
// - presumably the `if (mdr)` guards around the suppress and cap_releases
//   lines. Without them the deferred (null mdr) call would dereference a
//   null mdr at 2975.
2894 void Locker::process_request_cap_release(MDRequestRef
& mdr
, client_t client
, const ceph_mds_request_release
& item
,
2895 const string
&dname
)
2897 inodeno_t ino
= (uint64_t)item
.ino
;
2898 uint64_t cap_id
= item
.cap_id
;
2899 int caps
= item
.caps
;
2900 int wanted
= item
.wanted
;
2902 int issue_seq
= item
.issue_seq
;
2903 int mseq
= item
.mseq
;
2905 CInode
*in
= mdcache
->get_inode(ino
);
// Release the client's lease on dentry `dname` under this directory inode.
2909 if (dname
.length()) {
2910 frag_t fg
= in
->pick_dirfrag(dname
);
2911 CDir
*dir
= in
->get_dirfrag(fg
);
2913 CDentry
*dn
= dir
->lookup(dname
);
2915 ClientLease
*l
= dn
->get_client_lease(client
);
2917 dout(10) << "process_cap_release removing lease on " << *dn
<< dendl
;
2918 dn
->remove_client_lease(l
, this);
2920 dout(7) << "process_cap_release client." << client
2921 << " doesn't have lease on " << *dn
<< dendl
;
2924 dout(7) << "process_cap_release client." << client
<< " released lease on dn "
2925 << dir
->dirfrag() << "/" << dname
<< " which dne" << dendl
;
2930 Capability
*cap
= in
->get_client_cap(client
);
2934 dout(10) << "process_cap_release client." << client
<< " " << ccap_string(caps
) << " on " << *in
2935 << (mdr
? "" : " (DEFERRED, no mdr)")
// Drop stale releases: older mseq, or a different cap_id.
2938 if (ceph_seq_cmp(mseq
, cap
->get_mseq()) < 0) {
2939 dout(7) << " mseq " << mseq
<< " < " << cap
->get_mseq() << ", dropping" << dendl
;
2943 if (cap
->get_cap_id() != cap_id
) {
2944 dout(7) << " cap_id " << cap_id
<< " != " << cap
->get_cap_id() << ", dropping" << dendl
;
2948 if (should_defer_client_cap_frozen(in
)) {
2949 dout(7) << " frozen, deferring" << dendl
;
2950 in
->add_waiter(CInode::WAIT_UNFREEZE
, new C_Locker_RetryRequestCapRelease(this, client
, item
));
// Never let the client "retain" caps it was not issued.
2954 if (caps
& ~cap
->issued()) {
2955 dout(10) << " confirming not issued caps " << ccap_string(caps
& ~cap
->issued()) << dendl
;
2956 caps
&= cap
->issued();
2958 cap
->confirm_receipt(seq
, caps
);
2960 if (!in
->client_need_snapflush
.empty() &&
2961 (cap
->issued() & CEPH_CAP_ANY_FILE_WR
) == 0) {
2962 _do_null_snapflush(in
, client
);
2965 adjust_cap_wanted(cap
, wanted
, issue_seq
);
// Re-evaluate locks with this cap suppressed (presumably so eval() does not
// message this client while its request is in flight).
2968 cap
->inc_suppress();
2969 eval(in
, CEPH_CAP_LOCKS
);
2971 cap
->dec_suppress();
2973 // take note; we may need to reissue on this cap later
2975 mdr
->cap_releases
[in
->vino()] = cap
->get_last_seq();
// C_Locker_RetryKickIssueCaps: waiter that re-runs
// kick_issue_caps(in, client, seq). kick_issue_caps() queues it on
// WAIT_UNFREEZE. The constructor takes a CInode::PIN_PTRWAITER reference on
// the inode and finish() drops it.
// NOTE(review): this copy is damaged. The member declarations (2979-2982) and
// the closing braces are missing.
2978 class C_Locker_RetryKickIssueCaps
: public LockerContext
{
2983 C_Locker_RetryKickIssueCaps(Locker
*l
, CInode
*i
, client_t c
, ceph_seq_t s
) :
2984 LockerContext(l
), in(i
), client(c
), seq(s
) {
2985 in
->get(CInode::PIN_PTRWAITER
);
2987 void finish(int r
) override
{
2988 locker
->kick_issue_caps(in
, client
, seq
);
2989 in
->put(CInode::PIN_PTRWAITER
);
// kick_issue_caps: reissue caps to `client` on `in`. This happens only when
// the client still has a cap and that cap's last-sent seq still equals
// `seq`. A frozen inode waits for unfreeze and retries via
// C_Locker_RetryKickIssueCaps.
// NOTE(review): this copy is damaged. The `return;` lines after the check at
// 2996 and after queuing the waiter (2997, 3002) are missing. As written
// here, the `if` at 2996 would govern the following `if` statement instead.
2993 void Locker::kick_issue_caps(CInode
*in
, client_t client
, ceph_seq_t seq
)
2995 Capability
*cap
= in
->get_client_cap(client
);
2996 if (!cap
|| cap
->get_last_sent() != seq
)
2998 if (in
->is_frozen()) {
2999 dout(10) << "kick_issue_caps waiting for unfreeze on " << *in
<< dendl
;
3000 in
->add_waiter(CInode::WAIT_UNFREEZE
,
3001 new C_Locker_RetryKickIssueCaps(this, in
, client
, seq
));
3004 dout(10) << "kick_issue_caps released at current seq " << seq
3005 << ", reissuing" << dendl
;
3006 issue_caps(in
, cap
);
// kick_cap_releases: for every inode recorded in mdr->cap_releases, call
// kick_issue_caps() with the recorded seq. This lets caps released as part
// of this request be reissued to the request's client.
// NOTE(review): this copy is damaged. The loop increment (3014) and the null
// check on `in` (3016-3017, presumably `if (!in) continue;`) are missing.
3009 void Locker::kick_cap_releases(MDRequestRef
& mdr
)
3011 client_t client
= mdr
->get_client();
3012 for (map
<vinodeno_t
,ceph_seq_t
>::iterator p
= mdr
->cap_releases
.begin();
3013 p
!= mdr
->cap_releases
.end();
3015 CInode
*in
= mdcache
->get_inode(p
->first
);
3018 kick_issue_caps(in
, client
, p
->second
);
// _do_snap_update: journal the metadata a client flushed for snapshot `snap`.
// This covers both a FLUSHSNAP and a null snapflush (dirty == 0).
//   in      - inode to update (a snapped inode, or the head).
//   snap    - snapid being flushed. CEPH_NOSNAP means the snap is already
//             gone, so only an ack is sent.
//   dirty   - dirty cap bits carried by the flush.
//   follows - snap seq the flush follows; passed to journaling.
//   client  - flushing client. Its client_range entry is removed when
//             in->last == snap, otherwise re-pointed to follow `snap`.
//   m, ack  - may be NULL (see the note below). In this copy, m is read:
//             - after the XATTR_EXCL dirty check;
//             - by _update_cap_fields();
//             - alongside a non-null ack.
//             Presumably a non-null ack implies a non-null m.
// For a multiversion inode, the old inode covering `snap` is used, split at
// `snap` if needed. Otherwise the update goes into a newly projected inode.
// The entry is submitted with C_Locker_FileUpdate_finish.
// NOTE(review): this copy is damaged. Missing lines include:
// - presumably an `if (ack)` guard around the send at 3036;
// - the declaration of `pi` (3046) and the `xattrs = true` assignment;
// - the `if (oi)`/else structure (3058-3061, 3067-3071);
// - the xattr decode (3085-3087);
// - the trailing C_Locker_FileUpdate_finish arguments.
3023 * m and ack might be NULL, so don't dereference them unless dirty != 0
3025 void Locker::_do_snap_update(CInode
*in
, snapid_t snap
, int dirty
, snapid_t follows
, client_t client
, MClientCaps
*m
, MClientCaps
*ack
)
3027 dout(10) << "_do_snap_update dirty " << ccap_string(dirty
)
3028 << " follows " << follows
<< " snap " << snap
3029 << " on " << *in
<< dendl
;
3031 if (snap
== CEPH_NOSNAP
) {
3032 // hmm, i guess snap was already deleted? just ack!
3033 dout(10) << " wow, the snap following " << follows
3034 << " was already deleted. nothing to record, just ack." << dendl
;
3036 mds
->send_message_client_counted(ack
, m
->get_connection());
3040 EUpdate
*le
= new EUpdate(mds
->mdlog
, "snap flush");
3041 mds
->mdlog
->start_entry(le
);
3042 MutationRef mut
= new MutationImpl();
3043 mut
->ls
= mds
->mdlog
->get_current_segment();
3045 // normal metadata updates that we can apply to the head as well.
// Xattrs are taken only when XATTR_EXCL is dirty and the client sent a newer
// xattr_version; the dirty check comes first, so m is not read otherwise.
3048 bool xattrs
= false;
3049 map
<string
,bufferptr
> *px
= 0;
3050 if ((dirty
& CEPH_CAP_XATTR_EXCL
) &&
3051 m
->xattrbl
.length() &&
3052 m
->head
.xattr_version
> in
->get_projected_inode()->xattr_version
)
3055 old_inode_t
*oi
= 0;
3056 if (in
->is_multiversion()) {
3057 oi
= in
->pick_old_inode(snap
);
3062 dout(10) << " writing into old inode" << dendl
;
3063 pi
= in
->project_inode();
3064 pi
->version
= in
->pre_dirty();
3065 if (snap
> oi
->first
)
3066 in
->split_old_inode(snap
);
3072 px
= new map
<string
,bufferptr
>;
3073 pi
= in
->project_inode(px
);
3074 pi
->version
= in
->pre_dirty();
3077 _update_cap_fields(in
, dirty
, m
, pi
);
3081 dout(7) << " xattrs v" << pi
->xattr_version
<< " -> " << m
->head
.xattr_version
3082 << " len " << m
->xattrbl
.length() << dendl
;
3083 pi
->xattr_version
= m
->head
.xattr_version
;
3084 bufferlist::iterator p
= m
->xattrbl
.begin();
// The snapshot no longer needs this client's writeable range at or before
// `snap`.
3088 if (pi
->client_ranges
.count(client
)) {
3089 if (in
->last
== snap
) {
3090 dout(10) << " removing client_range entirely" << dendl
;
3091 pi
->client_ranges
.erase(client
);
3093 dout(10) << " client_range now follows " << snap
<< dendl
;
3094 pi
->client_ranges
[client
].follows
= snap
;
3099 mdcache
->predirty_journal_parents(mut
, &le
->metablob
, in
, 0, PREDIRTY_PRIMARY
, 0, follows
);
3100 mdcache
->journal_dirty_inode(mut
.get(), &le
->metablob
, in
, follows
);
3102 // "oldest flush tid" > 0 means client uses unique TID for each flush
3103 if (ack
&& ack
->get_oldest_flush_tid() > 0)
3104 le
->metablob
.add_client_flush(metareqid_t(m
->get_source(), ack
->get_client_tid()),
3105 ack
->get_oldest_flush_tid());
3107 mds
->mdlog
->submit_entry(le
, new C_Locker_FileUpdate_finish(this, in
, mut
, false, false,
// _update_cap_fields: copy client-reported inode fields from cap message `m`
// into projected inode `pi`, for the field groups covered by `dirty`.
//   Always:      ctime, and change_attr when the connection has
//                CEPH_FEATURE_FS_CHANGE_ATTR; both only ever increase.
//   FILE_WR or FILE_EXCL:
//                - mtime: forward-only under WR, any change under EXCL;
//                - size/rbytes for regular files;
//                - inline data: WR only, newer version;
//                - atime and time_warp_seq: EXCL only.
//   AUTH_EXCL:   uid, gid, mode, and btime (with CEPH_FEATURE_FS_BTIME).
// NOTE(review): this copy is damaged. Missing lines include:
// - presumably an early exit when dirty == 0 (3112-3115), given the note that
//   m must be valid when caps are dirty;
// - the mtime (3144), size (3150) and atime (3165) assignments;
// - the size comparison that completes the condition at 3146-3147.
3111 void Locker::_update_cap_fields(CInode
*in
, int dirty
, MClientCaps
*m
, inode_t
*pi
)
3116 /* m must be valid if there are dirty caps */
3118 uint64_t features
= m
->get_connection()->get_features();
3120 if (m
->get_ctime() > pi
->ctime
) {
3121 dout(7) << " ctime " << pi
->ctime
<< " -> " << m
->get_ctime()
3122 << " for " << *in
<< dendl
;
3123 pi
->ctime
= m
->get_ctime();
3126 if ((features
& CEPH_FEATURE_FS_CHANGE_ATTR
) &&
3127 m
->get_change_attr() > pi
->change_attr
) {
3128 dout(7) << " change_attr " << pi
->change_attr
<< " -> " << m
->get_change_attr()
3129 << " for " << *in
<< dendl
;
3130 pi
->change_attr
= m
->get_change_attr();
3134 if (dirty
& (CEPH_CAP_FILE_EXCL
|CEPH_CAP_FILE_WR
)) {
3135 utime_t atime
= m
->get_atime();
3136 utime_t mtime
= m
->get_mtime();
3137 uint64_t size
= m
->get_size();
3138 version_t inline_version
= m
->inline_version
;
// Under WR mtime may only advance; an EXCL holder may set it arbitrarily.
3140 if (((dirty
& CEPH_CAP_FILE_WR
) && mtime
> pi
->mtime
) ||
3141 ((dirty
& CEPH_CAP_FILE_EXCL
) && mtime
!= pi
->mtime
)) {
3142 dout(7) << " mtime " << pi
->mtime
<< " -> " << mtime
3143 << " for " << *in
<< dendl
;
3146 if (in
->inode
.is_file() && // ONLY if regular file
3148 dout(7) << " size " << pi
->size
<< " -> " << size
3149 << " for " << *in
<< dendl
;
3151 pi
->rstat
.rbytes
= size
;
// Inline data: store the client's bytes when present, otherwise free the
// stored data.
3153 if (in
->inode
.is_file() &&
3154 (dirty
& CEPH_CAP_FILE_WR
) &&
3155 inline_version
> pi
->inline_data
.version
) {
3156 pi
->inline_data
.version
= inline_version
;
3157 if (inline_version
!= CEPH_INLINE_NONE
&& m
->inline_data
.length() > 0)
3158 pi
->inline_data
.get_data() = m
->inline_data
;
3160 pi
->inline_data
.free_data();
3162 if ((dirty
& CEPH_CAP_FILE_EXCL
) && atime
!= pi
->atime
) {
3163 dout(7) << " atime " << pi
->atime
<< " -> " << atime
3164 << " for " << *in
<< dendl
;
3167 if ((dirty
& CEPH_CAP_FILE_EXCL
) &&
3168 ceph_seq_cmp(pi
->time_warp_seq
, m
->get_time_warp_seq()) < 0) {
3169 dout(7) << " time_warp_seq " << pi
->time_warp_seq
<< " -> " << m
->get_time_warp_seq()
3170 << " for " << *in
<< dendl
;
3171 pi
->time_warp_seq
= m
->get_time_warp_seq();
3175 if (dirty
& CEPH_CAP_AUTH_EXCL
) {
3176 if (m
->head
.uid
!= pi
->uid
) {
3177 dout(7) << " uid " << pi
->uid
3178 << " -> " << m
->head
.uid
3179 << " for " << *in
<< dendl
;
3180 pi
->uid
= m
->head
.uid
;
3182 if (m
->head
.gid
!= pi
->gid
) {
3183 dout(7) << " gid " << pi
->gid
3184 << " -> " << m
->head
.gid
3185 << " for " << *in
<< dendl
;
3186 pi
->gid
= m
->head
.gid
;
3188 if (m
->head
.mode
!= pi
->mode
) {
3189 dout(7) << " mode " << oct
<< pi
->mode
3190 << " -> " << m
->head
.mode
<< dec
3191 << " for " << *in
<< dendl
;
3192 pi
->mode
= m
->head
.mode
;
3194 if ((features
& CEPH_FEATURE_FS_BTIME
) && m
->get_btime() != pi
->btime
) {
// NOTE(review): `oct`/`dec` below look copied from the mode line above. If
// utime_t's stream output formats integers, the btime values will be logged
// in octal. Consider dropping them (affects log output only).
3195 dout(7) << " btime " << oct
<< pi
->btime
3196 << " -> " << m
->get_btime() << dec
3197 << " for " << *in
<< dendl
;
3198 pi
->btime
= m
->get_btime();
3204 * update inode based on cap flush|flushsnap|wanted.
3205 * adjust max_size, if needed.
3206 * if we update, return true; otherwise, false (no updated needed).
3208 bool Locker::_do_cap_update(CInode
*in
, Capability
*cap
,
3209 int dirty
, snapid_t follows
,
3210 MClientCaps
*m
, MClientCaps
*ack
,
3213 dout(10) << "_do_cap_update dirty " << ccap_string(dirty
)
3214 << " issued " << ccap_string(cap
? cap
->issued() : 0)
3215 << " wanted " << ccap_string(cap
? cap
->wanted() : 0)
3216 << " on " << *in
<< dendl
;
3217 assert(in
->is_auth());
3218 client_t client
= m
->get_source().num();
3219 inode_t
*latest
= in
->get_projected_inode();
3221 // increase or zero max_size?
3222 uint64_t size
= m
->get_size();
3223 bool change_max
= false;
3224 uint64_t old_max
= latest
->client_ranges
.count(client
) ? latest
->client_ranges
[client
].range
.last
: 0;
3225 uint64_t new_max
= old_max
;
3227 if (in
->is_file()) {
3228 bool forced_change_max
= false;
3229 dout(20) << "inode is file" << dendl
;
3230 if (cap
&& ((cap
->issued() | cap
->wanted()) & CEPH_CAP_ANY_FILE_WR
)) {
3231 dout(20) << "client has write caps; m->get_max_size="
3232 << m
->get_max_size() << "; old_max=" << old_max
<< dendl
;
3233 if (m
->get_max_size() > new_max
) {
3234 dout(10) << "client requests file_max " << m
->get_max_size()
3235 << " > max " << old_max
<< dendl
;
3237 forced_change_max
= true;
3238 new_max
= calc_new_max_size(latest
, m
->get_max_size());
3240 new_max
= calc_new_max_size(latest
, size
);
3242 if (new_max
> old_max
)
3254 if (in
->last
== CEPH_NOSNAP
&&
3256 !in
->filelock
.can_wrlock(client
) &&
3257 !in
->filelock
.can_force_wrlock(client
)) {
3258 dout(10) << " i want to change file_max, but lock won't allow it (yet)" << dendl
;
3259 if (in
->filelock
.is_stable()) {
3260 bool need_issue
= false;
3262 cap
->inc_suppress();
3263 if (in
->mds_caps_wanted
.empty() &&
3264 (in
->get_loner() >= 0 || (in
->get_wanted_loner() >= 0 && in
->try_set_loner()))) {
3265 if (in
->filelock
.get_state() != LOCK_EXCL
)
3266 file_excl(&in
->filelock
, &need_issue
);
3268 simple_lock(&in
->filelock
, &need_issue
);
3272 cap
->dec_suppress();
3274 if (!in
->filelock
.can_wrlock(client
) &&
3275 !in
->filelock
.can_force_wrlock(client
)) {
3276 C_MDL_CheckMaxSize
*cms
= new C_MDL_CheckMaxSize(this, in
,
3277 forced_change_max
? new_max
: 0,
3280 in
->filelock
.add_waiter(SimpleLock::WAIT_STABLE
, cms
);
3286 if (m
->flockbl
.length()) {
3288 bufferlist::iterator bli
= m
->flockbl
.begin();
3289 ::decode(num_locks
, bli
);
3290 for ( int i
=0; i
< num_locks
; ++i
) {
3291 ceph_filelock decoded_lock
;
3292 ::decode(decoded_lock
, bli
);
3293 in
->get_fcntl_lock_state()->held_locks
.
3294 insert(pair
<uint64_t, ceph_filelock
>(decoded_lock
.start
, decoded_lock
));
3295 ++in
->get_fcntl_lock_state()->client_held_lock_counts
[(client_t
)(decoded_lock
.client
)];
3297 ::decode(num_locks
, bli
);
3298 for ( int i
=0; i
< num_locks
; ++i
) {
3299 ceph_filelock decoded_lock
;
3300 ::decode(decoded_lock
, bli
);
3301 in
->get_flock_lock_state()->held_locks
.
3302 insert(pair
<uint64_t, ceph_filelock
>(decoded_lock
.start
, decoded_lock
));
3303 ++in
->get_flock_lock_state()->client_held_lock_counts
[(client_t
)(decoded_lock
.client
)];
3307 if (!dirty
&& !change_max
)
3310 Session
*session
= static_cast<Session
*>(m
->get_connection()->get_priv());
3311 if (session
->check_access(in
, MAY_WRITE
,
3312 m
->caller_uid
, m
->caller_gid
, NULL
, 0, 0) < 0) {
3314 dout(10) << "check_access failed, dropping cap update on " << *in
<< dendl
;
3320 EUpdate
*le
= new EUpdate(mds
->mdlog
, "cap update");
3321 mds
->mdlog
->start_entry(le
);
3324 map
<string
,bufferptr
> *px
= 0;
3325 if ((dirty
& CEPH_CAP_XATTR_EXCL
) &&
3326 m
->xattrbl
.length() &&
3327 m
->head
.xattr_version
> in
->get_projected_inode()->xattr_version
)
3328 px
= new map
<string
,bufferptr
>;
3330 inode_t
*pi
= in
->project_inode(px
);
3331 pi
->version
= in
->pre_dirty();
3333 MutationRef
mut(new MutationImpl());
3334 mut
->ls
= mds
->mdlog
->get_current_segment();
3336 _update_cap_fields(in
, dirty
, m
, pi
);
3339 dout(7) << " max_size " << old_max
<< " -> " << new_max
3340 << " for " << *in
<< dendl
;
3342 pi
->client_ranges
[client
].range
.first
= 0;
3343 pi
->client_ranges
[client
].range
.last
= new_max
;
3344 pi
->client_ranges
[client
].follows
= in
->first
- 1;
3346 pi
->client_ranges
.erase(client
);
3349 if (change_max
|| (dirty
& (CEPH_CAP_FILE_EXCL
|CEPH_CAP_FILE_WR
)))
3350 wrlock_force(&in
->filelock
, mut
); // wrlock for duration of journal
3353 if (dirty
& CEPH_CAP_AUTH_EXCL
)
3354 wrlock_force(&in
->authlock
, mut
);
3358 dout(7) << " xattrs v" << pi
->xattr_version
<< " -> " << m
->head
.xattr_version
<< dendl
;
3359 pi
->xattr_version
= m
->head
.xattr_version
;
3360 bufferlist::iterator p
= m
->xattrbl
.begin();
3363 wrlock_force(&in
->xattrlock
, mut
);
3367 mdcache
->predirty_journal_parents(mut
, &le
->metablob
, in
, 0, PREDIRTY_PRIMARY
, 0, follows
);
3368 mdcache
->journal_dirty_inode(mut
.get(), &le
->metablob
, in
, follows
);
3370 // "oldest flush tid" > 0 means client uses unique TID for each flush
3371 if (ack
&& ack
->get_oldest_flush_tid() > 0)
3372 le
->metablob
.add_client_flush(metareqid_t(m
->get_source(), ack
->get_client_tid()),
3373 ack
->get_oldest_flush_tid());
3375 mds
->mdlog
->submit_entry(le
, new C_Locker_FileUpdate_finish(this, in
, mut
,
3378 if (need_flush
&& !*need_flush
&&
3379 ((change_max
&& new_max
) || // max INCREASE
3380 _need_flush_mdlog(in
, dirty
)))
3386 /* This function DOES put the passed message before returning */
3387 void Locker::handle_client_cap_release(MClientCapRelease
*m
)
3389 client_t client
= m
->get_source().num();
3390 dout(10) << "handle_client_cap_release " << *m
<< dendl
;
3392 if (!mds
->is_clientreplay() && !mds
->is_active() && !mds
->is_stopping()) {
3393 mds
->wait_for_replay(new C_MDS_RetryMessage(mds
, m
));
3397 if (m
->osd_epoch_barrier
&& !mds
->objecter
->have_map(m
->osd_epoch_barrier
)) {
3398 // Pause RADOS operations until we see the required epoch
3399 mds
->objecter
->set_epoch_barrier(m
->osd_epoch_barrier
);
3402 if (mds
->get_osd_epoch_barrier() < m
->osd_epoch_barrier
) {
3403 // Record the barrier so that we will retransmit it to clients
3404 mds
->set_osd_epoch_barrier(m
->osd_epoch_barrier
);
3407 Session
*session
= static_cast<Session
*>(m
->get_connection()->get_priv());
3409 for (vector
<ceph_mds_cap_item
>::iterator p
= m
->caps
.begin(); p
!= m
->caps
.end(); ++p
) {
3410 _do_cap_release(client
, inodeno_t((uint64_t)p
->ino
) , p
->cap_id
, p
->migrate_seq
, p
->seq
);
3414 session
->notify_cap_release(m
->caps
.size());
3420 class C_Locker_RetryCapRelease
: public LockerContext
{
3424 ceph_seq_t migrate_seq
;
3425 ceph_seq_t issue_seq
;
3427 C_Locker_RetryCapRelease(Locker
*l
, client_t c
, inodeno_t i
, uint64_t id
,
3428 ceph_seq_t mseq
, ceph_seq_t seq
) :
3429 LockerContext(l
), client(c
), ino(i
), cap_id(id
), migrate_seq(mseq
), issue_seq(seq
) {}
3430 void finish(int r
) override
{
3431 locker
->_do_cap_release(client
, ino
, cap_id
, migrate_seq
, issue_seq
);
3435 void Locker::_do_cap_release(client_t client
, inodeno_t ino
, uint64_t cap_id
,
3436 ceph_seq_t mseq
, ceph_seq_t seq
)
3438 CInode
*in
= mdcache
->get_inode(ino
);
3440 dout(7) << "_do_cap_release missing ino " << ino
<< dendl
;
3443 Capability
*cap
= in
->get_client_cap(client
);
3445 dout(7) << "_do_cap_release no cap for client" << client
<< " on "<< *in
<< dendl
;
3449 dout(7) << "_do_cap_release for client." << client
<< " on "<< *in
<< dendl
;
3450 if (cap
->get_cap_id() != cap_id
) {
3451 dout(7) << " capid " << cap_id
<< " != " << cap
->get_cap_id() << ", ignore" << dendl
;
3454 if (ceph_seq_cmp(mseq
, cap
->get_mseq()) < 0) {
3455 dout(7) << " mseq " << mseq
<< " < " << cap
->get_mseq() << ", ignore" << dendl
;
3458 if (should_defer_client_cap_frozen(in
)) {
3459 dout(7) << " freezing|frozen, deferring" << dendl
;
3460 in
->add_waiter(CInode::WAIT_UNFREEZE
,
3461 new C_Locker_RetryCapRelease(this, client
, ino
, cap_id
, mseq
, seq
));
3464 if (seq
!= cap
->get_last_issue()) {
3465 dout(7) << " issue_seq " << seq
<< " != " << cap
->get_last_issue() << dendl
;
3466 // clean out any old revoke history
3467 cap
->clean_revoke_from(seq
);
3468 eval_cap_gather(in
);
3471 remove_client_cap(in
, client
);
3474 /* This function DOES put the passed message before returning */
3476 void Locker::remove_client_cap(CInode
*in
, client_t client
)
3478 // clean out any pending snapflush state
3479 if (!in
->client_need_snapflush
.empty())
3480 _do_null_snapflush(in
, client
);
3482 in
->remove_client_cap(client
);
3484 if (in
->is_auth()) {
3485 // make sure we clear out the client byte range
3486 if (in
->get_projected_inode()->client_ranges
.count(client
) &&
3487 !(in
->inode
.nlink
== 0 && !in
->is_any_caps())) // unless it's unlink + stray
3488 check_inode_max_size(in
);
3490 request_inode_file_caps(in
);
3493 try_eval(in
, CEPH_CAP_LOCKS
);
3498 * Return true if any currently revoking caps exceed the
3499 * mds_revoke_cap_timeout threshold.
3501 bool Locker::any_late_revoking_caps(xlist
<Capability
*> const &revoking
) const
3503 xlist
<Capability
*>::const_iterator p
= revoking
.begin();
3505 // No revoking caps at the moment
3508 utime_t now
= ceph_clock_now();
3509 utime_t age
= now
- (*p
)->get_last_revoke_stamp();
3510 if (age
<= g_conf
->mds_revoke_cap_timeout
) {
3519 void Locker::get_late_revoking_clients(std::list
<client_t
> *result
) const
3521 if (!any_late_revoking_caps(revoking_caps
)) {
3522 // Fast path: no misbehaving clients, execute in O(1)
3526 // Slow path: execute in O(N_clients)
3527 std::map
<client_t
, xlist
<Capability
*> >::const_iterator client_rc_iter
;
3528 for (client_rc_iter
= revoking_caps_by_client
.begin();
3529 client_rc_iter
!= revoking_caps_by_client
.end(); ++client_rc_iter
) {
3530 xlist
<Capability
*> const &client_rc
= client_rc_iter
->second
;
3531 bool any_late
= any_late_revoking_caps(client_rc
);
3533 result
->push_back(client_rc_iter
->first
);
3538 // Hard-code instead of surfacing a config settings because this is
3539 // really a hack that should go away at some point when we have better
3540 // inspection tools for getting at detailed cap state (#7316)
3541 #define MAX_WARN_CAPS 100
3543 void Locker::caps_tick()
3545 utime_t now
= ceph_clock_now();
3547 dout(20) << __func__
<< " " << revoking_caps
.size() << " revoking caps" << dendl
;
3550 for (xlist
<Capability
*>::iterator p
= revoking_caps
.begin(); !p
.end(); ++p
) {
3551 Capability
*cap
= *p
;
3553 utime_t age
= now
- cap
->get_last_revoke_stamp();
3554 dout(20) << __func__
<< " age = " << age
<< cap
->get_client() << "." << cap
->get_inode()->ino() << dendl
;
3555 if (age
<= g_conf
->mds_revoke_cap_timeout
) {
3556 dout(20) << __func__
<< " age below timeout " << g_conf
->mds_revoke_cap_timeout
<< dendl
;
3560 if (i
> MAX_WARN_CAPS
) {
3561 dout(1) << __func__
<< " more than " << MAX_WARN_CAPS
<< " caps are late"
3562 << "revoking, ignoring subsequent caps" << dendl
;
3566 // exponential backoff of warning intervals
3567 if (age
> g_conf
->mds_revoke_cap_timeout
* (1 << cap
->get_num_revoke_warnings())) {
3568 cap
->inc_num_revoke_warnings();
3570 ss
<< "client." << cap
->get_client() << " isn't responding to mclientcaps(revoke), ino "
3571 << cap
->get_inode()->ino() << " pending " << ccap_string(cap
->pending())
3572 << " issued " << ccap_string(cap
->issued()) << ", sent " << age
<< " seconds ago";
3573 mds
->clog
->warn() << ss
.str();
3574 dout(20) << __func__
<< " " << ss
.str() << dendl
;
3576 dout(20) << __func__
<< " silencing log message (backoff) for " << cap
->get_client() << "." << cap
->get_inode()->ino() << dendl
;
3582 void Locker::handle_client_lease(MClientLease
*m
)
3584 dout(10) << "handle_client_lease " << *m
<< dendl
;
3586 assert(m
->get_source().is_client());
3587 client_t client
= m
->get_source().num();
3589 CInode
*in
= mdcache
->get_inode(m
->get_ino(), m
->get_last());
3591 dout(7) << "handle_client_lease don't have ino " << m
->get_ino() << "." << m
->get_last() << dendl
;
3597 frag_t fg
= in
->pick_dirfrag(m
->dname
);
3598 CDir
*dir
= in
->get_dirfrag(fg
);
3600 dn
= dir
->lookup(m
->dname
);
3602 dout(7) << "handle_client_lease don't have dn " << m
->get_ino() << " " << m
->dname
<< dendl
;
3606 dout(10) << " on " << *dn
<< dendl
;
3609 ClientLease
*l
= dn
->get_client_lease(client
);
3611 dout(7) << "handle_client_lease didn't have lease for client." << client
<< " of " << *dn
<< dendl
;
3616 switch (m
->get_action()) {
3617 case CEPH_MDS_LEASE_REVOKE_ACK
:
3618 case CEPH_MDS_LEASE_RELEASE
:
3619 if (l
->seq
!= m
->get_seq()) {
3620 dout(7) << "handle_client_lease release - seq " << l
->seq
<< " != provided " << m
->get_seq() << dendl
;
3622 dout(7) << "handle_client_lease client." << client
3623 << " on " << *dn
<< dendl
;
3624 dn
->remove_client_lease(l
, this);
3629 case CEPH_MDS_LEASE_RENEW
:
3631 dout(7) << "handle_client_lease client." << client
<< " renew on " << *dn
3632 << (!dn
->lock
.can_lease(client
)?", revoking lease":"") << dendl
;
3633 if (dn
->lock
.can_lease(client
)) {
3634 int pool
= 1; // fixme.. do something smart!
3635 m
->h
.duration_ms
= (int)(1000 * mdcache
->client_lease_durations
[pool
]);
3636 m
->h
.seq
= ++l
->seq
;
3639 utime_t now
= ceph_clock_now();
3640 now
+= mdcache
->client_lease_durations
[pool
];
3641 mdcache
->touch_client_lease(l
, pool
, now
);
3643 mds
->send_message_client_counted(m
, m
->get_connection());
3649 ceph_abort(); // implement me
3655 void Locker::issue_client_lease(CDentry
*dn
, client_t client
,
3656 bufferlist
&bl
, utime_t now
, Session
*session
)
3658 CInode
*diri
= dn
->get_dir()->get_inode();
3659 if (!diri
->is_stray() && // do not issue dn leases in stray dir!
3660 ((!diri
->filelock
.can_lease(client
) &&
3661 (diri
->get_client_cap_pending(client
) & (CEPH_CAP_FILE_SHARED
| CEPH_CAP_FILE_EXCL
)) == 0)) &&
3662 dn
->lock
.can_lease(client
)) {
3663 int pool
= 1; // fixme.. do something smart!
3664 // issue a dentry lease
3665 ClientLease
*l
= dn
->add_client_lease(client
, session
);
3666 session
->touch_lease(l
);
3668 now
+= mdcache
->client_lease_durations
[pool
];
3669 mdcache
->touch_client_lease(l
, pool
, now
);
3672 e
.mask
= 1 | CEPH_LOCK_DN
; // old and new bit values
3674 e
.duration_ms
= (int)(1000 * mdcache
->client_lease_durations
[pool
]);
3676 dout(20) << "issue_client_lease seq " << e
.seq
<< " dur " << e
.duration_ms
<< "ms "
3677 << " on " << *dn
<< dendl
;
3685 dout(20) << "issue_client_lease no/null lease on " << *dn
<< dendl
;
3690 void Locker::revoke_client_leases(SimpleLock
*lock
)
3693 CDentry
*dn
= static_cast<CDentry
*>(lock
->get_parent());
3694 for (map
<client_t
, ClientLease
*>::iterator p
= dn
->client_lease_map
.begin();
3695 p
!= dn
->client_lease_map
.end();
3697 ClientLease
*l
= p
->second
;
3700 assert(lock
->get_type() == CEPH_LOCK_DN
);
3702 CDentry
*dn
= static_cast<CDentry
*>(lock
->get_parent());
3703 int mask
= 1 | CEPH_LOCK_DN
; // old and new bits
3705 // i should also revoke the dir ICONTENT lease, if they have it!
3706 CInode
*diri
= dn
->get_dir()->get_inode();
3707 mds
->send_message_client_counted(new MClientLease(CEPH_MDS_LEASE_REVOKE
, l
->seq
,
3710 diri
->first
, CEPH_NOSNAP
,
3714 assert(n
== lock
->get_num_client_lease());
3719 // locks ----------------------------------------------------------------
3721 SimpleLock
*Locker::get_lock(int lock_type
, MDSCacheObjectInfo
&info
)
3723 switch (lock_type
) {
3726 // be careful; info.dirfrag may have incorrect frag; recalculate based on dname.
3727 CInode
*diri
= mdcache
->get_inode(info
.dirfrag
.ino
);
3732 fg
= diri
->pick_dirfrag(info
.dname
);
3733 dir
= diri
->get_dirfrag(fg
);
3735 dn
= dir
->lookup(info
.dname
, info
.snapid
);
3738 dout(7) << "get_lock don't have dn " << info
.dirfrag
.ino
<< " " << info
.dname
<< dendl
;
3744 case CEPH_LOCK_IAUTH
:
3745 case CEPH_LOCK_ILINK
:
3746 case CEPH_LOCK_IDFT
:
3747 case CEPH_LOCK_IFILE
:
3748 case CEPH_LOCK_INEST
:
3749 case CEPH_LOCK_IXATTR
:
3750 case CEPH_LOCK_ISNAP
:
3751 case CEPH_LOCK_IFLOCK
:
3752 case CEPH_LOCK_IPOLICY
:
3754 CInode
*in
= mdcache
->get_inode(info
.ino
, info
.snapid
);
3756 dout(7) << "get_lock don't have ino " << info
.ino
<< dendl
;
3759 switch (lock_type
) {
3760 case CEPH_LOCK_IAUTH
: return &in
->authlock
;
3761 case CEPH_LOCK_ILINK
: return &in
->linklock
;
3762 case CEPH_LOCK_IDFT
: return &in
->dirfragtreelock
;
3763 case CEPH_LOCK_IFILE
: return &in
->filelock
;
3764 case CEPH_LOCK_INEST
: return &in
->nestlock
;
3765 case CEPH_LOCK_IXATTR
: return &in
->xattrlock
;
3766 case CEPH_LOCK_ISNAP
: return &in
->snaplock
;
3767 case CEPH_LOCK_IFLOCK
: return &in
->flocklock
;
3768 case CEPH_LOCK_IPOLICY
: return &in
->policylock
;
3773 dout(7) << "get_lock don't know lock_type " << lock_type
<< dendl
;
3781 /* This function DOES put the passed message before returning */
3782 void Locker::handle_lock(MLock
*m
)
3784 // nobody should be talking to us during recovery.
3785 assert(mds
->is_rejoin() || mds
->is_clientreplay() || mds
->is_active() || mds
->is_stopping());
3787 SimpleLock
*lock
= get_lock(m
->get_lock_type(), m
->get_object_info());
3789 dout(10) << "don't have object " << m
->get_object_info() << ", must have trimmed, dropping" << dendl
;
3794 switch (lock
->get_type()) {
3796 case CEPH_LOCK_IAUTH
:
3797 case CEPH_LOCK_ILINK
:
3798 case CEPH_LOCK_ISNAP
:
3799 case CEPH_LOCK_IXATTR
:
3800 case CEPH_LOCK_IFLOCK
:
3801 case CEPH_LOCK_IPOLICY
:
3802 handle_simple_lock(lock
, m
);
3805 case CEPH_LOCK_IDFT
:
3806 case CEPH_LOCK_INEST
:
3807 //handle_scatter_lock((ScatterLock*)lock, m);
3810 case CEPH_LOCK_IFILE
:
3811 handle_file_lock(static_cast<ScatterLock
*>(lock
), m
);
3815 dout(7) << "handle_lock got otype " << m
->get_lock_type() << dendl
;
3825 // ==========================================================================
3828 /** This function may take a reference to m if it needs one, but does
3829 * not put references. */
3830 void Locker::handle_reqrdlock(SimpleLock
*lock
, MLock
*m
)
3832 MDSCacheObject
*parent
= lock
->get_parent();
3833 if (parent
->is_auth() &&
3834 lock
->get_state() != LOCK_SYNC
&&
3835 !parent
->is_frozen()) {
3836 dout(7) << "handle_reqrdlock got rdlock request on " << *lock
3837 << " on " << *parent
<< dendl
;
3838 assert(parent
->is_auth()); // replica auth pinned if they're doing this!
3839 if (lock
->is_stable()) {
3842 dout(7) << "handle_reqrdlock delaying request until lock is stable" << dendl
;
3843 lock
->add_waiter(SimpleLock::WAIT_STABLE
| MDSCacheObject::WAIT_UNFREEZE
,
3844 new C_MDS_RetryMessage(mds
, m
->get()));
3847 dout(7) << "handle_reqrdlock dropping rdlock request on " << *lock
3848 << " on " << *parent
<< dendl
;
3849 // replica should retry
3853 /* This function DOES put the passed message before returning */
3854 void Locker::handle_simple_lock(SimpleLock
*lock
, MLock
*m
)
3856 int from
= m
->get_asker();
3858 dout(10) << "handle_simple_lock " << *m
3859 << " on " << *lock
<< " " << *lock
->get_parent() << dendl
;
3861 if (mds
->is_rejoin()) {
3862 if (lock
->get_parent()->is_rejoining()) {
3863 dout(7) << "handle_simple_lock still rejoining " << *lock
->get_parent()
3864 << ", dropping " << *m
<< dendl
;
3870 switch (m
->get_action()) {
3873 assert(lock
->get_state() == LOCK_LOCK
);
3874 lock
->decode_locked_state(m
->get_data());
3875 lock
->set_state(LOCK_SYNC
);
3876 lock
->finish_waiters(SimpleLock::WAIT_RD
|SimpleLock::WAIT_STABLE
);
3880 assert(lock
->get_state() == LOCK_SYNC
);
3881 lock
->set_state(LOCK_SYNC_LOCK
);
3882 if (lock
->is_leased())
3883 revoke_client_leases(lock
);
3884 eval_gather(lock
, true);
3885 if (lock
->is_unstable_and_locked())
3886 mds
->mdlog
->flush();
3891 case LOCK_AC_LOCKACK
:
3892 assert(lock
->get_state() == LOCK_SYNC_LOCK
||
3893 lock
->get_state() == LOCK_SYNC_EXCL
);
3894 assert(lock
->is_gathering(from
));
3895 lock
->remove_gather(from
);
3897 if (lock
->is_gathering()) {
3898 dout(7) << "handle_simple_lock " << *lock
<< " on " << *lock
->get_parent() << " from " << from
3899 << ", still gathering " << lock
->get_gather_set() << dendl
;
3901 dout(7) << "handle_simple_lock " << *lock
<< " on " << *lock
->get_parent() << " from " << from
3902 << ", last one" << dendl
;
3907 case LOCK_AC_REQRDLOCK
:
3908 handle_reqrdlock(lock
, m
);
3916 /* unused, currently.
3918 class C_Locker_SimpleEval : public Context {
3922 C_Locker_SimpleEval(Locker *l, SimpleLock *lk) : locker(l), lock(lk) {}
3923 void finish(int r) {
3924 locker->try_simple_eval(lock);
3928 void Locker::try_simple_eval(SimpleLock *lock)
3930 // unstable and ambiguous auth?
3931 if (!lock->is_stable() &&
3932 lock->get_parent()->is_ambiguous_auth()) {
3933 dout(7) << "simple_eval not stable and ambiguous auth, waiting on " << *lock->get_parent() << dendl;
3934 //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
3935 lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_SimpleEval(this, lock));
3939 if (!lock->get_parent()->is_auth()) {
3940 dout(7) << "try_simple_eval not auth for " << *lock->get_parent() << dendl;
3944 if (!lock->get_parent()->can_auth_pin()) {
3945 dout(7) << "try_simple_eval can't auth_pin, waiting on " << *lock->get_parent() << dendl;
3946 //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
3947 lock->get_parent()->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_SimpleEval(this, lock));
3951 if (lock->is_stable())
3957 void Locker::simple_eval(SimpleLock
*lock
, bool *need_issue
)
3959 dout(10) << "simple_eval " << *lock
<< " on " << *lock
->get_parent() << dendl
;
3961 assert(lock
->get_parent()->is_auth());
3962 assert(lock
->is_stable());
3964 if (lock
->get_parent()->is_freezing_or_frozen()) {
3965 // dentry lock in unreadable state can block path traverse
3966 if ((lock
->get_type() != CEPH_LOCK_DN
||
3967 lock
->get_state() == LOCK_SYNC
||
3968 lock
->get_parent()->is_frozen()))
3972 if (mdcache
->is_readonly()) {
3973 if (lock
->get_state() != LOCK_SYNC
) {
3974 dout(10) << "simple_eval read-only FS, syncing " << *lock
<< " on " << *lock
->get_parent() << dendl
;
3975 simple_sync(lock
, need_issue
);
3982 if (lock
->get_type() != CEPH_LOCK_DN
) {
3983 in
= static_cast<CInode
*>(lock
->get_parent());
3984 in
->get_caps_wanted(&wanted
, NULL
, lock
->get_cap_shift());
3988 if (lock
->get_state() != LOCK_EXCL
&&
3989 in
&& in
->get_target_loner() >= 0 &&
3990 (wanted
& CEPH_CAP_GEXCL
)) {
3991 dout(7) << "simple_eval stable, going to excl " << *lock
3992 << " on " << *lock
->get_parent() << dendl
;
3993 simple_excl(lock
, need_issue
);
3997 else if (lock
->get_state() != LOCK_SYNC
&&
3998 !lock
->is_wrlocked() &&
3999 ((!(wanted
& CEPH_CAP_GEXCL
) && !lock
->is_waiter_for(SimpleLock::WAIT_WR
)) ||
4000 (lock
->get_state() == LOCK_EXCL
&& in
&& in
->get_target_loner() < 0))) {
4001 dout(7) << "simple_eval stable, syncing " << *lock
4002 << " on " << *lock
->get_parent() << dendl
;
4003 simple_sync(lock
, need_issue
);
4010 bool Locker::simple_sync(SimpleLock
*lock
, bool *need_issue
)
4012 dout(7) << "simple_sync on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4013 assert(lock
->get_parent()->is_auth());
4014 assert(lock
->is_stable());
4017 if (lock
->get_cap_shift())
4018 in
= static_cast<CInode
*>(lock
->get_parent());
4020 int old_state
= lock
->get_state();
4022 if (old_state
!= LOCK_TSYN
) {
4024 switch (lock
->get_state()) {
4025 case LOCK_MIX
: lock
->set_state(LOCK_MIX_SYNC
); break;
4026 case LOCK_LOCK
: lock
->set_state(LOCK_LOCK_SYNC
); break;
4027 case LOCK_XSYN
: lock
->set_state(LOCK_XSYN_SYNC
); break;
4028 case LOCK_EXCL
: lock
->set_state(LOCK_EXCL_SYNC
); break;
4029 default: ceph_abort();
4033 if (lock
->is_wrlocked())
4036 if (lock
->get_parent()->is_replicated() && old_state
== LOCK_MIX
) {
4037 send_lock_message(lock
, LOCK_AC_SYNC
);
4038 lock
->init_gather();
4042 if (in
&& in
->is_head()) {
4043 if (in
->issued_caps_need_gather(lock
)) {
4052 bool need_recover
= false;
4053 if (lock
->get_type() == CEPH_LOCK_IFILE
) {
4055 if (in
->state_test(CInode::STATE_NEEDSRECOVER
)) {
4056 mds
->mdcache
->queue_file_recover(in
);
4057 need_recover
= true;
4062 if (!gather
&& lock
->is_dirty()) {
4063 lock
->get_parent()->auth_pin(lock
);
4064 scatter_writebehind(static_cast<ScatterLock
*>(lock
));
4065 mds
->mdlog
->flush();
4070 lock
->get_parent()->auth_pin(lock
);
4072 mds
->mdcache
->do_file_recover();
4077 if (lock
->get_parent()->is_replicated()) { // FIXME
4079 lock
->encode_locked_state(data
);
4080 send_lock_message(lock
, LOCK_AC_SYNC
, data
);
4082 lock
->set_state(LOCK_SYNC
);
4083 lock
->finish_waiters(SimpleLock::WAIT_RD
|SimpleLock::WAIT_STABLE
);
4084 if (in
&& in
->is_head()) {
4093 void Locker::simple_excl(SimpleLock
*lock
, bool *need_issue
)
4095 dout(7) << "simple_excl on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4096 assert(lock
->get_parent()->is_auth());
4097 assert(lock
->is_stable());
4100 if (lock
->get_cap_shift())
4101 in
= static_cast<CInode
*>(lock
->get_parent());
4103 switch (lock
->get_state()) {
4104 case LOCK_LOCK
: lock
->set_state(LOCK_LOCK_EXCL
); break;
4105 case LOCK_SYNC
: lock
->set_state(LOCK_SYNC_EXCL
); break;
4106 case LOCK_XSYN
: lock
->set_state(LOCK_XSYN_EXCL
); break;
4107 default: ceph_abort();
4111 if (lock
->is_rdlocked())
4113 if (lock
->is_wrlocked())
4116 if (lock
->get_parent()->is_replicated() &&
4117 lock
->get_state() != LOCK_LOCK_EXCL
&&
4118 lock
->get_state() != LOCK_XSYN_EXCL
) {
4119 send_lock_message(lock
, LOCK_AC_LOCK
);
4120 lock
->init_gather();
4124 if (in
&& in
->is_head()) {
4125 if (in
->issued_caps_need_gather(lock
)) {
4135 lock
->get_parent()->auth_pin(lock
);
4137 lock
->set_state(LOCK_EXCL
);
4138 lock
->finish_waiters(SimpleLock::WAIT_WR
|SimpleLock::WAIT_STABLE
);
4148 void Locker::simple_lock(SimpleLock
*lock
, bool *need_issue
)
4150 dout(7) << "simple_lock on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4151 assert(lock
->get_parent()->is_auth());
4152 assert(lock
->is_stable());
4153 assert(lock
->get_state() != LOCK_LOCK
);
4156 if (lock
->get_cap_shift())
4157 in
= static_cast<CInode
*>(lock
->get_parent());
4159 int old_state
= lock
->get_state();
4161 switch (lock
->get_state()) {
4162 case LOCK_SYNC
: lock
->set_state(LOCK_SYNC_LOCK
); break;
4164 file_excl(static_cast<ScatterLock
*>(lock
), need_issue
);
4165 if (lock
->get_state() != LOCK_EXCL
)
4168 case LOCK_EXCL
: lock
->set_state(LOCK_EXCL_LOCK
); break;
4169 case LOCK_MIX
: lock
->set_state(LOCK_MIX_LOCK
);
4170 (static_cast<ScatterLock
*>(lock
))->clear_unscatter_wanted();
4172 case LOCK_TSYN
: lock
->set_state(LOCK_TSYN_LOCK
); break;
4173 default: ceph_abort();
4177 if (lock
->is_leased()) {
4179 revoke_client_leases(lock
);
4181 if (lock
->is_rdlocked())
4183 if (in
&& in
->is_head()) {
4184 if (in
->issued_caps_need_gather(lock
)) {
4193 bool need_recover
= false;
4194 if (lock
->get_type() == CEPH_LOCK_IFILE
) {
4196 if(in
->state_test(CInode::STATE_NEEDSRECOVER
)) {
4197 mds
->mdcache
->queue_file_recover(in
);
4198 need_recover
= true;
4203 if (lock
->get_parent()->is_replicated() &&
4204 lock
->get_state() == LOCK_MIX_LOCK
&&
4206 dout(10) << " doing local stage of mix->lock gather before gathering from replicas" << dendl
;
4208 // move to second stage of gather now, so we don't send the lock action later.
4209 if (lock
->get_state() == LOCK_MIX_LOCK
)
4210 lock
->set_state(LOCK_MIX_LOCK2
);
4212 if (lock
->get_parent()->is_replicated() &&
4213 lock
->get_sm()->states
[old_state
].replica_state
!= LOCK_LOCK
) { // replica may already be LOCK
4215 send_lock_message(lock
, LOCK_AC_LOCK
);
4216 lock
->init_gather();
4220 if (!gather
&& lock
->is_dirty()) {
4221 lock
->get_parent()->auth_pin(lock
);
4222 scatter_writebehind(static_cast<ScatterLock
*>(lock
));
4223 mds
->mdlog
->flush();
4228 lock
->get_parent()->auth_pin(lock
);
4230 mds
->mdcache
->do_file_recover();
4232 lock
->set_state(LOCK_LOCK
);
4233 lock
->finish_waiters(ScatterLock::WAIT_XLOCK
|ScatterLock::WAIT_WR
|ScatterLock::WAIT_STABLE
);
4238 void Locker::simple_xlock(SimpleLock
*lock
)
4240 dout(7) << "simple_xlock on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4241 assert(lock
->get_parent()->is_auth());
4242 //assert(lock->is_stable());
4243 assert(lock
->get_state() != LOCK_XLOCK
);
4246 if (lock
->get_cap_shift())
4247 in
= static_cast<CInode
*>(lock
->get_parent());
4249 if (lock
->is_stable())
4250 lock
->get_parent()->auth_pin(lock
);
4252 switch (lock
->get_state()) {
4254 case LOCK_XLOCKDONE
: lock
->set_state(LOCK_LOCK_XLOCK
); break;
4255 default: ceph_abort();
4259 if (lock
->is_rdlocked())
4261 if (lock
->is_wrlocked())
4264 if (in
&& in
->is_head()) {
4265 if (in
->issued_caps_need_gather(lock
)) {
4272 lock
->set_state(LOCK_PREXLOCK
);
4273 //assert("shouldn't be called if we are already xlockable" == 0);
4281 // ==========================================================================
4286 Some notes on scatterlocks.
4288 - The scatter/gather is driven by the inode lock. The scatter always
4289 brings in the latest metadata from the fragments.
4291 - When in a scattered/MIX state, fragments are only allowed to
4292 update/be written to if the accounted stat matches the inode's
4295 - That means, on gather, we _only_ assimilate diffs for frag metadata
4296 that match the current version, because those are the only ones
4297 written during this scatter/gather cycle. (Others didn't permit
4298 it.) We increment the version and journal this to disk.
4300 - When possible, we also simultaneously update our local frag
4301 accounted stats to match.
4303 - On scatter, the new inode info is broadcast to frags, both local
4304 and remote. If possible (auth and !frozen), the dirfrag auth
4305 should update the accounted state (if it isn't already up to date).
4306 Note that this may occur on both the local inode auth node and
4307 inode replicas, so there are two potential paths. If it is NOT
4308 possible, they need to mark_stale to prevent any possible writes.
4310 - A scatter can be to MIX (potentially writeable) or to SYNC (read
4311 only). Both are opportunities to update the frag accounted stats,
4312 even though only the MIX case is affected by a stale dirfrag.
4314 - Because many scatter/gather cycles can potentially go by without a
4315 frag being able to update its accounted stats (due to being frozen
4316 by exports/refragments in progress), the frag may have (even very)
4317 old stat versions. That's fine. If when we do want to update it,
4318 we can update accounted_* and the version first.
4322 class C_Locker_ScatterWB
: public LockerLogContext
{
4326 C_Locker_ScatterWB(Locker
*l
, ScatterLock
*sl
, MutationRef
& m
) :
4327 LockerLogContext(l
), lock(sl
), mut(m
) {}
4328 void finish(int r
) override
{
4329 locker
->scatter_writebehind_finish(lock
, mut
);
4333 void Locker::scatter_writebehind(ScatterLock
*lock
)
4335 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4336 dout(10) << "scatter_writebehind " << in
->inode
.mtime
<< " on " << *lock
<< " on " << *in
<< dendl
;
4339 MutationRef
mut(new MutationImpl());
4340 mut
->ls
= mds
->mdlog
->get_current_segment();
4342 // forcefully take a wrlock
4343 lock
->get_wrlock(true);
4344 mut
->wrlocks
.insert(lock
);
4345 mut
->locks
.insert(lock
);
4347 in
->pre_cow_old_inode(); // avoid cow mayhem
4349 inode_t
*pi
= in
->project_inode();
4350 pi
->version
= in
->pre_dirty();
4352 in
->finish_scatter_gather_update(lock
->get_type());
4353 lock
->start_flush();
4355 EUpdate
*le
= new EUpdate(mds
->mdlog
, "scatter_writebehind");
4356 mds
->mdlog
->start_entry(le
);
4358 mdcache
->predirty_journal_parents(mut
, &le
->metablob
, in
, 0, PREDIRTY_PRIMARY
);
4359 mdcache
->journal_dirty_inode(mut
.get(), &le
->metablob
, in
);
4361 in
->finish_scatter_gather_update_accounted(lock
->get_type(), mut
, &le
->metablob
);
4363 mds
->mdlog
->submit_entry(le
, new C_Locker_ScatterWB(this, lock
, mut
));
4366 void Locker::scatter_writebehind_finish(ScatterLock
*lock
, MutationRef
& mut
)
4368 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4369 dout(10) << "scatter_writebehind_finish on " << *lock
<< " on " << *in
<< dendl
;
4370 in
->pop_and_dirty_projected_inode(mut
->ls
);
4372 lock
->finish_flush();
4374 // if replicas may have flushed in a mix->lock state, send another
4375 // message so they can finish_flush().
4376 if (in
->is_replicated()) {
4377 switch (lock
->get_state()) {
4379 case LOCK_MIX_LOCK2
:
4382 send_lock_message(lock
, LOCK_AC_LOCKFLUSHED
);
4387 drop_locks(mut
.get());
4390 if (lock
->is_stable())
4391 lock
->finish_waiters(ScatterLock::WAIT_STABLE
);
4393 //scatter_eval_gather(lock);
4396 void Locker::scatter_eval(ScatterLock
*lock
, bool *need_issue
)
4398 dout(10) << "scatter_eval " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4400 assert(lock
->get_parent()->is_auth());
4401 assert(lock
->is_stable());
4403 if (lock
->get_parent()->is_freezing_or_frozen()) {
4404 dout(20) << " freezing|frozen" << dendl
;
4408 if (mdcache
->is_readonly()) {
4409 if (lock
->get_state() != LOCK_SYNC
) {
4410 dout(10) << "scatter_eval read-only FS, syncing " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4411 simple_sync(lock
, need_issue
);
4416 if (!lock
->is_rdlocked() &&
4417 lock
->get_state() != LOCK_MIX
&&
4418 lock
->get_scatter_wanted()) {
4419 dout(10) << "scatter_eval scatter_wanted, bump to mix " << *lock
4420 << " on " << *lock
->get_parent() << dendl
;
4421 scatter_mix(lock
, need_issue
);
4425 if (lock
->get_type() == CEPH_LOCK_INEST
) {
4426 // in general, we want to keep INEST writable at all times.
4427 if (!lock
->is_rdlocked()) {
4428 if (lock
->get_parent()->is_replicated()) {
4429 if (lock
->get_state() != LOCK_MIX
)
4430 scatter_mix(lock
, need_issue
);
4432 if (lock
->get_state() != LOCK_LOCK
)
4433 simple_lock(lock
, need_issue
);
4439 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4440 if (!in
->has_subtree_or_exporting_dirfrag() || in
->is_base()) {
4441 // i _should_ be sync.
4442 if (!lock
->is_wrlocked() &&
4443 lock
->get_state() != LOCK_SYNC
) {
4444 dout(10) << "scatter_eval no wrlocks|xlocks, not subtree root inode, syncing" << dendl
;
4445 simple_sync(lock
, need_issue
);
4452 * mark a scatterlock to indicate that the dir fnode has some dirty data
4454 void Locker::mark_updated_scatterlock(ScatterLock
*lock
)
4457 if (lock
->get_updated_item()->is_on_list()) {
4458 dout(10) << "mark_updated_scatterlock " << *lock
4459 << " - already on list since " << lock
->get_update_stamp() << dendl
;
4461 updated_scatterlocks
.push_back(lock
->get_updated_item());
4462 utime_t now
= ceph_clock_now();
4463 lock
->set_update_stamp(now
);
4464 dout(10) << "mark_updated_scatterlock " << *lock
4465 << " - added at " << now
<< dendl
;
4470 * this is called by scatter_tick and LogSegment::try_to_trim() when
4471 * trying to flush dirty scattered data (i.e. updated fnode) back to
4474 * we need to lock|scatter in order to push fnode changes into the
4477 void Locker::scatter_nudge(ScatterLock
*lock
, MDSInternalContextBase
*c
, bool forcelockchange
)
4479 CInode
*p
= static_cast<CInode
*>(lock
->get_parent());
4481 if (p
->is_frozen() || p
->is_freezing()) {
4482 dout(10) << "scatter_nudge waiting for unfreeze on " << *p
<< dendl
;
4484 p
->add_waiter(MDSCacheObject::WAIT_UNFREEZE
, c
);
4486 // just requeue. not ideal.. starvation prone..
4487 updated_scatterlocks
.push_back(lock
->get_updated_item());
4491 if (p
->is_ambiguous_auth()) {
4492 dout(10) << "scatter_nudge waiting for single auth on " << *p
<< dendl
;
4494 p
->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
, c
);
4496 // just requeue. not ideal.. starvation prone..
4497 updated_scatterlocks
.push_back(lock
->get_updated_item());
4504 if (lock
->is_stable()) {
4505 // can we do it now?
4506 // (only if we're not replicated.. if we are, we really do need
4507 // to nudge the lock state!)
4509 actually, even if we're not replicated, we can't stay in MIX, because another mds
4510 could discover and replicate us at any time. if that happens while we're flushing,
4511 they end up in MIX but their inode has the old scatterstat version.
4513 if (!forcelockchange && !lock->get_parent()->is_replicated() && lock->can_wrlock(-1)) {
4514 dout(10) << "scatter_nudge auth, propagating " << *lock << " on " << *p << dendl;
4515 scatter_writebehind(lock);
4517 lock->add_waiter(SimpleLock::WAIT_STABLE, c);
4522 if (mdcache
->is_readonly()) {
4523 if (lock
->get_state() != LOCK_SYNC
) {
4524 dout(10) << "scatter_nudge auth, read-only FS, syncing " << *lock
<< " on " << *p
<< dendl
;
4525 simple_sync(static_cast<ScatterLock
*>(lock
));
4530 // adjust lock state
4531 dout(10) << "scatter_nudge auth, scatter/unscattering " << *lock
<< " on " << *p
<< dendl
;
4532 switch (lock
->get_type()) {
4533 case CEPH_LOCK_IFILE
:
4534 if (p
->is_replicated() && lock
->get_state() != LOCK_MIX
)
4535 scatter_mix(static_cast<ScatterLock
*>(lock
));
4536 else if (lock
->get_state() != LOCK_LOCK
)
4537 simple_lock(static_cast<ScatterLock
*>(lock
));
4539 simple_sync(static_cast<ScatterLock
*>(lock
));
4542 case CEPH_LOCK_IDFT
:
4543 case CEPH_LOCK_INEST
:
4544 if (p
->is_replicated() && lock
->get_state() != LOCK_MIX
)
4546 else if (lock
->get_state() != LOCK_LOCK
)
4555 if (lock
->is_stable() && count
== 2) {
4556 dout(10) << "scatter_nudge oh, stable after two cycles." << dendl
;
4557 // this should only realy happen when called via
4558 // handle_file_lock due to AC_NUDGE, because the rest of the
4559 // time we are replicated or have dirty data and won't get
4560 // called. bailing here avoids an infinite loop.
4565 dout(10) << "scatter_nudge auth, waiting for stable " << *lock
<< " on " << *p
<< dendl
;
4567 lock
->add_waiter(SimpleLock::WAIT_STABLE
, c
);
4572 dout(10) << "scatter_nudge replica, requesting scatter/unscatter of "
4573 << *lock
<< " on " << *p
<< dendl
;
4574 // request unscatter?
4575 mds_rank_t auth
= lock
->get_parent()->authority().first
;
4576 if (!mds
->is_cluster_degraded() ||
4577 mds
->mdsmap
->is_clientreplay_or_active_or_stopping(auth
))
4578 mds
->send_message_mds(new MLock(lock
, LOCK_AC_NUDGE
, mds
->get_nodeid()), auth
);
4582 lock
->add_waiter(SimpleLock::WAIT_STABLE
, c
);
4584 // also, requeue, in case we had wrong auth or something
4585 updated_scatterlocks
.push_back(lock
->get_updated_item());
4589 void Locker::scatter_tick()
4591 dout(10) << "scatter_tick" << dendl
;
4594 utime_t now
= ceph_clock_now();
4595 int n
= updated_scatterlocks
.size();
4596 while (!updated_scatterlocks
.empty()) {
4597 ScatterLock
*lock
= updated_scatterlocks
.front();
4599 if (n
-- == 0) break; // scatter_nudge() may requeue; avoid looping
4601 if (!lock
->is_dirty()) {
4602 updated_scatterlocks
.pop_front();
4603 dout(10) << " removing from updated_scatterlocks "
4604 << *lock
<< " " << *lock
->get_parent() << dendl
;
4607 if (now
- lock
->get_update_stamp() < g_conf
->mds_scatter_nudge_interval
)
4609 updated_scatterlocks
.pop_front();
4610 scatter_nudge(lock
, 0);
4612 mds
->mdlog
->flush();
4616 void Locker::scatter_tempsync(ScatterLock
*lock
, bool *need_issue
)
4618 dout(10) << "scatter_tempsync " << *lock
4619 << " on " << *lock
->get_parent() << dendl
;
4620 assert(lock
->get_parent()->is_auth());
4621 assert(lock
->is_stable());
4623 assert(0 == "not fully implemented, at least not for filelock");
4625 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4627 switch (lock
->get_state()) {
4628 case LOCK_SYNC
: ceph_abort(); // this shouldn't happen
4629 case LOCK_LOCK
: lock
->set_state(LOCK_LOCK_TSYN
); break;
4630 case LOCK_MIX
: lock
->set_state(LOCK_MIX_TSYN
); break;
4631 default: ceph_abort();
4635 if (lock
->is_wrlocked())
4638 if (lock
->get_cap_shift() &&
4640 in
->issued_caps_need_gather(lock
)) {
4648 if (lock
->get_state() == LOCK_MIX_TSYN
&&
4649 in
->is_replicated()) {
4650 lock
->init_gather();
4651 send_lock_message(lock
, LOCK_AC_LOCK
);
4659 lock
->set_state(LOCK_TSYN
);
4660 lock
->finish_waiters(ScatterLock::WAIT_RD
|ScatterLock::WAIT_STABLE
);
4661 if (lock
->get_cap_shift()) {
4672 // ==========================================================================
4675 void Locker::local_wrlock_grab(LocalLock
*lock
, MutationRef
& mut
)
4677 dout(7) << "local_wrlock_grab on " << *lock
4678 << " on " << *lock
->get_parent() << dendl
;
4680 assert(lock
->get_parent()->is_auth());
4681 assert(lock
->can_wrlock());
4682 assert(!mut
->wrlocks
.count(lock
));
4683 lock
->get_wrlock(mut
->get_client());
4684 mut
->wrlocks
.insert(lock
);
4685 mut
->locks
.insert(lock
);
4688 bool Locker::local_wrlock_start(LocalLock
*lock
, MDRequestRef
& mut
)
4690 dout(7) << "local_wrlock_start on " << *lock
4691 << " on " << *lock
->get_parent() << dendl
;
4693 assert(lock
->get_parent()->is_auth());
4694 if (lock
->can_wrlock()) {
4695 assert(!mut
->wrlocks
.count(lock
));
4696 lock
->get_wrlock(mut
->get_client());
4697 mut
->wrlocks
.insert(lock
);
4698 mut
->locks
.insert(lock
);
4701 lock
->add_waiter(SimpleLock::WAIT_WR
|SimpleLock::WAIT_STABLE
, new C_MDS_RetryRequest(mdcache
, mut
));
4706 void Locker::local_wrlock_finish(LocalLock
*lock
, MutationImpl
*mut
)
4708 dout(7) << "local_wrlock_finish on " << *lock
4709 << " on " << *lock
->get_parent() << dendl
;
4711 mut
->wrlocks
.erase(lock
);
4712 mut
->locks
.erase(lock
);
4713 if (lock
->get_num_wrlocks() == 0) {
4714 lock
->finish_waiters(SimpleLock::WAIT_STABLE
|
4715 SimpleLock::WAIT_WR
|
4716 SimpleLock::WAIT_RD
);
4720 bool Locker::local_xlock_start(LocalLock
*lock
, MDRequestRef
& mut
)
4722 dout(7) << "local_xlock_start on " << *lock
4723 << " on " << *lock
->get_parent() << dendl
;
4725 assert(lock
->get_parent()->is_auth());
4726 if (!lock
->can_xlock_local()) {
4727 lock
->add_waiter(SimpleLock::WAIT_WR
|SimpleLock::WAIT_STABLE
, new C_MDS_RetryRequest(mdcache
, mut
));
4731 lock
->get_xlock(mut
, mut
->get_client());
4732 mut
->xlocks
.insert(lock
);
4733 mut
->locks
.insert(lock
);
4737 void Locker::local_xlock_finish(LocalLock
*lock
, MutationImpl
*mut
)
4739 dout(7) << "local_xlock_finish on " << *lock
4740 << " on " << *lock
->get_parent() << dendl
;
4742 mut
->xlocks
.erase(lock
);
4743 mut
->locks
.erase(lock
);
4745 lock
->finish_waiters(SimpleLock::WAIT_STABLE
|
4746 SimpleLock::WAIT_WR
|
4747 SimpleLock::WAIT_RD
);
4752 // ==========================================================================
4756 void Locker::file_eval(ScatterLock
*lock
, bool *need_issue
)
4758 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4759 int loner_wanted
, other_wanted
;
4760 int wanted
= in
->get_caps_wanted(&loner_wanted
, &other_wanted
, CEPH_CAP_SFILE
);
4761 dout(7) << "file_eval wanted=" << gcap_string(wanted
)
4762 << " loner_wanted=" << gcap_string(loner_wanted
)
4763 << " other_wanted=" << gcap_string(other_wanted
)
4764 << " filelock=" << *lock
<< " on " << *lock
->get_parent()
4767 assert(lock
->get_parent()->is_auth());
4768 assert(lock
->is_stable());
4770 if (lock
->get_parent()->is_freezing_or_frozen())
4773 if (mdcache
->is_readonly()) {
4774 if (lock
->get_state() != LOCK_SYNC
) {
4775 dout(10) << "file_eval read-only FS, syncing " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4776 simple_sync(lock
, need_issue
);
4782 if (lock
->get_state() == LOCK_EXCL
) {
4783 dout(20) << " is excl" << dendl
;
4784 int loner_issued
, other_issued
, xlocker_issued
;
4785 in
->get_caps_issued(&loner_issued
, &other_issued
, &xlocker_issued
, CEPH_CAP_SFILE
);
4786 dout(7) << "file_eval loner_issued=" << gcap_string(loner_issued
)
4787 << " other_issued=" << gcap_string(other_issued
)
4788 << " xlocker_issued=" << gcap_string(xlocker_issued
)
4790 if (!((loner_wanted
|loner_issued
) & (CEPH_CAP_GEXCL
|CEPH_CAP_GWR
|CEPH_CAP_GBUFFER
)) ||
4791 (other_wanted
& (CEPH_CAP_GEXCL
|CEPH_CAP_GWR
|CEPH_CAP_GRD
)) ||
4792 (in
->inode
.is_dir() && in
->multiple_nonstale_caps())) { // FIXME.. :/
4793 dout(20) << " should lose it" << dendl
;
4794 // we should lose it.
4805 // -> any writer means MIX; RD doesn't matter.
4806 if (((other_wanted
|loner_wanted
) & CEPH_CAP_GWR
) ||
4807 lock
->is_waiter_for(SimpleLock::WAIT_WR
))
4808 scatter_mix(lock
, need_issue
);
4809 else if (!lock
->is_wrlocked()) // let excl wrlocks drain first
4810 simple_sync(lock
, need_issue
);
4812 dout(10) << " waiting for wrlock to drain" << dendl
;
4817 else if (lock
->get_state() != LOCK_EXCL
&&
4818 !lock
->is_rdlocked() &&
4819 //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
4820 ((wanted
& (CEPH_CAP_GWR
|CEPH_CAP_GBUFFER
)) ||
4821 (in
->inode
.is_dir() && !in
->has_subtree_or_exporting_dirfrag())) &&
4822 in
->get_target_loner() >= 0) {
4823 dout(7) << "file_eval stable, bump to loner " << *lock
4824 << " on " << *lock
->get_parent() << dendl
;
4825 file_excl(lock
, need_issue
);
4829 else if (lock
->get_state() != LOCK_MIX
&&
4830 !lock
->is_rdlocked() &&
4831 //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
4832 (lock
->get_scatter_wanted() ||
4833 (in
->get_wanted_loner() < 0 && (wanted
& CEPH_CAP_GWR
)))) {
4834 dout(7) << "file_eval stable, bump to mixed " << *lock
4835 << " on " << *lock
->get_parent() << dendl
;
4836 scatter_mix(lock
, need_issue
);
4840 else if (lock
->get_state() != LOCK_SYNC
&&
4841 !lock
->is_wrlocked() && // drain wrlocks first!
4842 !lock
->is_waiter_for(SimpleLock::WAIT_WR
) &&
4843 !(wanted
& (CEPH_CAP_GWR
|CEPH_CAP_GBUFFER
)) &&
4844 !((lock
->get_state() == LOCK_MIX
) &&
4845 in
->is_dir() && in
->has_subtree_or_exporting_dirfrag()) // if we are a delegation point, stay where we are
4846 //((wanted & CEPH_CAP_RD) ||
4847 //in->is_replicated() ||
4848 //lock->get_num_client_lease() ||
4849 //(!loner && lock->get_state() == LOCK_EXCL)) &&
4851 dout(7) << "file_eval stable, bump to sync " << *lock
4852 << " on " << *lock
->get_parent() << dendl
;
4853 simple_sync(lock
, need_issue
);
4859 void Locker::scatter_mix(ScatterLock
*lock
, bool *need_issue
)
4861 dout(7) << "scatter_mix " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4863 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4864 assert(in
->is_auth());
4865 assert(lock
->is_stable());
4867 if (lock
->get_state() == LOCK_LOCK
) {
4868 in
->start_scatter(lock
);
4869 if (in
->is_replicated()) {
4871 bufferlist softdata
;
4872 lock
->encode_locked_state(softdata
);
4874 // bcast to replicas
4875 send_lock_message(lock
, LOCK_AC_MIX
, softdata
);
4879 lock
->set_state(LOCK_MIX
);
4880 lock
->clear_scatter_wanted();
4881 if (lock
->get_cap_shift()) {
4889 switch (lock
->get_state()) {
4890 case LOCK_SYNC
: lock
->set_state(LOCK_SYNC_MIX
); break;
4892 file_excl(lock
, need_issue
);
4893 if (lock
->get_state() != LOCK_EXCL
)
4896 case LOCK_EXCL
: lock
->set_state(LOCK_EXCL_MIX
); break;
4897 case LOCK_TSYN
: lock
->set_state(LOCK_TSYN_MIX
); break;
4898 default: ceph_abort();
4902 if (lock
->is_rdlocked())
4904 if (in
->is_replicated()) {
4905 if (lock
->get_state() != LOCK_EXCL_MIX
&& // EXCL replica is already LOCK
4906 lock
->get_state() != LOCK_XSYN_EXCL
) { // XSYN replica is already LOCK; ** FIXME here too!
4907 send_lock_message(lock
, LOCK_AC_MIX
);
4908 lock
->init_gather();
4912 if (lock
->is_leased()) {
4913 revoke_client_leases(lock
);
4916 if (lock
->get_cap_shift() &&
4918 in
->issued_caps_need_gather(lock
)) {
4925 bool need_recover
= false;
4926 if (in
->state_test(CInode::STATE_NEEDSRECOVER
)) {
4927 mds
->mdcache
->queue_file_recover(in
);
4928 need_recover
= true;
4933 lock
->get_parent()->auth_pin(lock
);
4935 mds
->mdcache
->do_file_recover();
4937 in
->start_scatter(lock
);
4938 lock
->set_state(LOCK_MIX
);
4939 lock
->clear_scatter_wanted();
4940 if (in
->is_replicated()) {
4941 bufferlist softdata
;
4942 lock
->encode_locked_state(softdata
);
4943 send_lock_message(lock
, LOCK_AC_MIX
, softdata
);
4945 if (lock
->get_cap_shift()) {
4956 void Locker::file_excl(ScatterLock
*lock
, bool *need_issue
)
4958 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4959 dout(7) << "file_excl " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4961 assert(in
->is_auth());
4962 assert(lock
->is_stable());
4964 assert((in
->get_loner() >= 0 && in
->mds_caps_wanted
.empty()) ||
4965 (lock
->get_state() == LOCK_XSYN
)); // must do xsyn -> excl -> <anything else>
4967 switch (lock
->get_state()) {
4968 case LOCK_SYNC
: lock
->set_state(LOCK_SYNC_EXCL
); break;
4969 case LOCK_MIX
: lock
->set_state(LOCK_MIX_EXCL
); break;
4970 case LOCK_LOCK
: lock
->set_state(LOCK_LOCK_EXCL
); break;
4971 case LOCK_XSYN
: lock
->set_state(LOCK_XSYN_EXCL
); break;
4972 default: ceph_abort();
4976 if (lock
->is_rdlocked())
4978 if (lock
->is_wrlocked())
4981 if (in
->is_replicated() &&
4982 lock
->get_state() != LOCK_LOCK_EXCL
&&
4983 lock
->get_state() != LOCK_XSYN_EXCL
) { // if we were lock, replicas are already lock.
4984 send_lock_message(lock
, LOCK_AC_LOCK
);
4985 lock
->init_gather();
4988 if (lock
->is_leased()) {
4989 revoke_client_leases(lock
);
4992 if (in
->is_head() &&
4993 in
->issued_caps_need_gather(lock
)) {
5000 bool need_recover
= false;
5001 if (in
->state_test(CInode::STATE_NEEDSRECOVER
)) {
5002 mds
->mdcache
->queue_file_recover(in
);
5003 need_recover
= true;
5008 lock
->get_parent()->auth_pin(lock
);
5010 mds
->mdcache
->do_file_recover();
5012 lock
->set_state(LOCK_EXCL
);
5020 void Locker::file_xsyn(SimpleLock
*lock
, bool *need_issue
)
5022 dout(7) << "file_xsyn on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
5023 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
5024 assert(in
->is_auth());
5025 assert(in
->get_loner() >= 0 && in
->mds_caps_wanted
.empty());
5027 switch (lock
->get_state()) {
5028 case LOCK_EXCL
: lock
->set_state(LOCK_EXCL_XSYN
); break;
5029 default: ceph_abort();
5033 if (lock
->is_wrlocked())
5036 if (in
->is_head() &&
5037 in
->issued_caps_need_gather(lock
)) {
5046 lock
->get_parent()->auth_pin(lock
);
5048 lock
->set_state(LOCK_XSYN
);
5049 lock
->finish_waiters(SimpleLock::WAIT_RD
|SimpleLock::WAIT_STABLE
);
5057 void Locker::file_recover(ScatterLock
*lock
)
5059 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
5060 dout(7) << "file_recover " << *lock
<< " on " << *in
<< dendl
;
5062 assert(in
->is_auth());
5063 //assert(lock->is_stable());
5064 assert(lock
->get_state() == LOCK_PRE_SCAN
); // only called from MDCache::start_files_to_recover()
5069 if (in->is_replicated()
5070 lock->get_sm()->states[oldstate].replica_state != LOCK_LOCK) {
5071 send_lock_message(lock, LOCK_AC_LOCK);
5072 lock->init_gather();
5076 if (in
->is_head() &&
5077 in
->issued_caps_need_gather(lock
)) {
5082 lock
->set_state(LOCK_SCAN
);
5084 in
->state_set(CInode::STATE_NEEDSRECOVER
);
5086 mds
->mdcache
->queue_file_recover(in
);
5091 /* This function DOES put the passed message before returning */
5092 void Locker::handle_file_lock(ScatterLock
*lock
, MLock
*m
)
5094 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
5095 int from
= m
->get_asker();
5097 if (mds
->is_rejoin()) {
5098 if (in
->is_rejoining()) {
5099 dout(7) << "handle_file_lock still rejoining " << *in
5100 << ", dropping " << *m
<< dendl
;
5106 dout(7) << "handle_file_lock a=" << get_lock_action_name(m
->get_action())
5108 << " from mds." << from
<< " "
5111 bool caps
= lock
->get_cap_shift();
5113 switch (m
->get_action()) {
5116 assert(lock
->get_state() == LOCK_LOCK
||
5117 lock
->get_state() == LOCK_MIX
||
5118 lock
->get_state() == LOCK_MIX_SYNC2
);
5120 if (lock
->get_state() == LOCK_MIX
) {
5121 lock
->set_state(LOCK_MIX_SYNC
);
5122 eval_gather(lock
, true);
5123 if (lock
->is_unstable_and_locked())
5124 mds
->mdlog
->flush();
5128 (static_cast<ScatterLock
*>(lock
))->finish_flush();
5129 (static_cast<ScatterLock
*>(lock
))->clear_flushed();
5132 lock
->decode_locked_state(m
->get_data());
5133 lock
->set_state(LOCK_SYNC
);
5138 lock
->finish_waiters(SimpleLock::WAIT_RD
|SimpleLock::WAIT_STABLE
);
5143 switch (lock
->get_state()) {
5144 case LOCK_SYNC
: lock
->set_state(LOCK_SYNC_LOCK
); break;
5145 case LOCK_MIX
: lock
->set_state(LOCK_MIX_LOCK
); break;
5146 default: ceph_abort();
5149 eval_gather(lock
, true);
5150 if (lock
->is_unstable_and_locked())
5151 mds
->mdlog
->flush();
5155 case LOCK_AC_LOCKFLUSHED
:
5156 (static_cast<ScatterLock
*>(lock
))->finish_flush();
5157 (static_cast<ScatterLock
*>(lock
))->clear_flushed();
5158 // wake up scatter_nudge waiters
5159 if (lock
->is_stable())
5160 lock
->finish_waiters(SimpleLock::WAIT_STABLE
);
5164 assert(lock
->get_state() == LOCK_SYNC
||
5165 lock
->get_state() == LOCK_LOCK
||
5166 lock
->get_state() == LOCK_SYNC_MIX2
);
5168 if (lock
->get_state() == LOCK_SYNC
) {
5170 lock
->set_state(LOCK_SYNC_MIX
);
5171 eval_gather(lock
, true);
5172 if (lock
->is_unstable_and_locked())
5173 mds
->mdlog
->flush();
5178 lock
->set_state(LOCK_MIX
);
5179 lock
->decode_locked_state(m
->get_data());
5184 lock
->finish_waiters(SimpleLock::WAIT_WR
|SimpleLock::WAIT_STABLE
);
5189 case LOCK_AC_LOCKACK
:
5190 assert(lock
->get_state() == LOCK_SYNC_LOCK
||
5191 lock
->get_state() == LOCK_MIX_LOCK
||
5192 lock
->get_state() == LOCK_MIX_LOCK2
||
5193 lock
->get_state() == LOCK_MIX_EXCL
||
5194 lock
->get_state() == LOCK_SYNC_EXCL
||
5195 lock
->get_state() == LOCK_SYNC_MIX
||
5196 lock
->get_state() == LOCK_MIX_TSYN
);
5197 assert(lock
->is_gathering(from
));
5198 lock
->remove_gather(from
);
5200 if (lock
->get_state() == LOCK_MIX_LOCK
||
5201 lock
->get_state() == LOCK_MIX_LOCK2
||
5202 lock
->get_state() == LOCK_MIX_EXCL
||
5203 lock
->get_state() == LOCK_MIX_TSYN
) {
5204 lock
->decode_locked_state(m
->get_data());
5205 // replica is waiting for AC_LOCKFLUSHED, eval_gather() should not
5206 // delay calling scatter_writebehind().
5207 lock
->clear_flushed();
5210 if (lock
->is_gathering()) {
5211 dout(7) << "handle_file_lock " << *in
<< " from " << from
5212 << ", still gathering " << lock
->get_gather_set() << dendl
;
5214 dout(7) << "handle_file_lock " << *in
<< " from " << from
5215 << ", last one" << dendl
;
5220 case LOCK_AC_SYNCACK
:
5221 assert(lock
->get_state() == LOCK_MIX_SYNC
);
5222 assert(lock
->is_gathering(from
));
5223 lock
->remove_gather(from
);
5225 lock
->decode_locked_state(m
->get_data());
5227 if (lock
->is_gathering()) {
5228 dout(7) << "handle_file_lock " << *in
<< " from " << from
5229 << ", still gathering " << lock
->get_gather_set() << dendl
;
5231 dout(7) << "handle_file_lock " << *in
<< " from " << from
5232 << ", last one" << dendl
;
5237 case LOCK_AC_MIXACK
:
5238 assert(lock
->get_state() == LOCK_SYNC_MIX
);
5239 assert(lock
->is_gathering(from
));
5240 lock
->remove_gather(from
);
5242 if (lock
->is_gathering()) {
5243 dout(7) << "handle_file_lock " << *in
<< " from " << from
5244 << ", still gathering " << lock
->get_gather_set() << dendl
;
5246 dout(7) << "handle_file_lock " << *in
<< " from " << from
5247 << ", last one" << dendl
;
5254 case LOCK_AC_REQSCATTER
:
5255 if (lock
->is_stable()) {
5256 /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
5257 * because the replica should be holding an auth_pin if they're
5258 * doing this (and thus, we are freezing, not frozen, and indefinite
5259 * starvation isn't an issue).
5261 dout(7) << "handle_file_lock got scatter request on " << *lock
5262 << " on " << *lock
->get_parent() << dendl
;
5263 if (lock
->get_state() != LOCK_MIX
) // i.e., the reqscatter didn't race with an actual mix/scatter
5266 dout(7) << "handle_file_lock got scatter request, !stable, marking scatter_wanted on " << *lock
5267 << " on " << *lock
->get_parent() << dendl
;
5268 lock
->set_scatter_wanted();
5272 case LOCK_AC_REQUNSCATTER
:
5273 if (lock
->is_stable()) {
5274 /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
5275 * because the replica should be holding an auth_pin if they're
5276 * doing this (and thus, we are freezing, not frozen, and indefinite
5277 * starvation isn't an issue).
5279 dout(7) << "handle_file_lock got unscatter request on " << *lock
5280 << " on " << *lock
->get_parent() << dendl
;
5281 if (lock
->get_state() == LOCK_MIX
) // i.e., the reqscatter didn't race with an actual mix/scatter
5282 simple_lock(lock
); // FIXME tempsync?
5284 dout(7) << "handle_file_lock ignoring unscatter request on " << *lock
5285 << " on " << *lock
->get_parent() << dendl
;
5286 lock
->set_unscatter_wanted();
5290 case LOCK_AC_REQRDLOCK
:
5291 handle_reqrdlock(lock
, m
);
5295 if (!lock
->get_parent()->is_auth()) {
5296 dout(7) << "handle_file_lock IGNORING nudge on non-auth " << *lock
5297 << " on " << *lock
->get_parent() << dendl
;
5298 } else if (!lock
->get_parent()->is_replicated()) {
5299 dout(7) << "handle_file_lock IGNORING nudge on non-replicated " << *lock
5300 << " on " << *lock
->get_parent() << dendl
;
5302 dout(7) << "handle_file_lock trying nudge on " << *lock
5303 << " on " << *lock
->get_parent() << dendl
;
5304 scatter_nudge(lock
, 0, true);
5305 mds
->mdlog
->flush();