1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */
15 #include <boost/utility/string_view.hpp>
20 #include "MDBalancer.h"
26 #include "MDSContext.h"
31 #include "events/EUpdate.h"
32 #include "events/EOpen.h"
34 #include "msg/Messenger.h"
35 #include "osdc/Objecter.h"
37 #include "messages/MInodeFileCaps.h"
38 #include "messages/MLock.h"
39 #include "messages/MClientLease.h"
40 #include "messages/MClientReply.h"
41 #include "messages/MClientCaps.h"
42 #include "messages/MClientCapRelease.h"
44 #include "messages/MMDSSlaveRequest.h"
48 #include "common/config.h"
51 #define dout_subsys ceph_subsys_mds
53 #define dout_context g_ceph_context
54 #define dout_prefix _prefix(_dout, mds)
55 static ostream
& _prefix(std::ostream
*_dout
, MDSRank
*mds
) {
56 return *_dout
<< "mds." << mds
->get_nodeid() << ".locker ";
60 class LockerContext
: public MDSInternalContextBase
{
63 MDSRank
*get_mds() override
69 explicit LockerContext(Locker
*locker_
) : locker(locker_
) {
70 assert(locker
!= NULL
);
74 class LockerLogContext
: public MDSLogContextBase
{
77 MDSRank
*get_mds() override
83 explicit LockerLogContext(Locker
*locker_
) : locker(locker_
) {
84 assert(locker
!= NULL
);
88 /* This function DOES put the passed message before returning */
89 void Locker::dispatch(Message
*m
)
92 switch (m
->get_type()) {
96 handle_lock(static_cast<MLock
*>(m
));
99 case MSG_MDS_INODEFILECAPS
:
100 handle_inode_file_caps(static_cast<MInodeFileCaps
*>(m
));
104 case CEPH_MSG_CLIENT_CAPS
:
105 handle_client_caps(static_cast<MClientCaps
*>(m
));
108 case CEPH_MSG_CLIENT_CAPRELEASE
:
109 handle_client_cap_release(static_cast<MClientCapRelease
*>(m
));
111 case CEPH_MSG_CLIENT_LEASE
:
112 handle_client_lease(static_cast<MClientLease
*>(m
));
116 derr
<< "locker unknown message " << m
->get_type() << dendl
;
117 assert(0 == "locker unknown message");
134 void Locker::send_lock_message(SimpleLock
*lock
, int msg
)
136 for (const auto &it
: lock
->get_parent()->get_replicas()) {
137 if (mds
->is_cluster_degraded() &&
138 mds
->mdsmap
->get_state(it
.first
) < MDSMap::STATE_REJOIN
)
140 MLock
*m
= new MLock(lock
, msg
, mds
->get_nodeid());
141 mds
->send_message_mds(m
, it
.first
);
145 void Locker::send_lock_message(SimpleLock
*lock
, int msg
, const bufferlist
&data
)
147 for (const auto &it
: lock
->get_parent()->get_replicas()) {
148 if (mds
->is_cluster_degraded() &&
149 mds
->mdsmap
->get_state(it
.first
) < MDSMap::STATE_REJOIN
)
151 MLock
*m
= new MLock(lock
, msg
, mds
->get_nodeid());
153 mds
->send_message_mds(m
, it
.first
);
160 void Locker::include_snap_rdlocks(set
<SimpleLock
*>& rdlocks
, CInode
*in
)
162 // rdlock ancestor snaps
164 rdlocks
.insert(&in
->snaplock
);
165 while (t
->get_projected_parent_dn()) {
166 t
= t
->get_projected_parent_dn()->get_dir()->get_inode();
167 rdlocks
.insert(&t
->snaplock
);
171 void Locker::include_snap_rdlocks_wlayout(set
<SimpleLock
*>& rdlocks
, CInode
*in
,
172 file_layout_t
**layout
)
174 //rdlock ancestor snaps
176 rdlocks
.insert(&in
->snaplock
);
177 rdlocks
.insert(&in
->policylock
);
178 bool found_layout
= false;
180 rdlocks
.insert(&t
->snaplock
);
182 rdlocks
.insert(&t
->policylock
);
183 if (t
->get_projected_inode()->has_layout()) {
184 *layout
= &t
->get_projected_inode()->layout
;
188 if (t
->get_projected_parent_dn() &&
189 t
->get_projected_parent_dn()->get_dir())
190 t
= t
->get_projected_parent_dn()->get_dir()->get_inode();
195 struct MarkEventOnDestruct
{
199 MarkEventOnDestruct(MDRequestRef
& _mdr
,
200 const char *_message
) : mdr(_mdr
),
203 ~MarkEventOnDestruct() {
205 mdr
->mark_event(message
);
209 /* If this function returns false, the mdr has been placed
210 * on the appropriate wait list */
211 bool Locker::acquire_locks(MDRequestRef
& mdr
,
212 set
<SimpleLock
*> &rdlocks
,
213 set
<SimpleLock
*> &wrlocks
,
214 set
<SimpleLock
*> &xlocks
,
215 map
<SimpleLock
*,mds_rank_t
> *remote_wrlocks
,
216 CInode
*auth_pin_freeze
,
217 bool auth_pin_nonblock
)
219 if (mdr
->done_locking
&&
220 !mdr
->is_slave()) { // not on slaves! master requests locks piecemeal.
221 dout(10) << "acquire_locks " << *mdr
<< " - done locking" << dendl
;
222 return true; // at least we had better be!
224 dout(10) << "acquire_locks " << *mdr
<< dendl
;
226 MarkEventOnDestruct
marker(mdr
, "failed to acquire_locks");
228 client_t client
= mdr
->get_client();
230 set
<SimpleLock
*, SimpleLock::ptr_lt
> sorted
; // sort everything we will lock
231 set
<MDSCacheObject
*> mustpin
; // items to authpin
234 for (set
<SimpleLock
*>::iterator p
= xlocks
.begin(); p
!= xlocks
.end(); ++p
) {
235 SimpleLock
*lock
= *p
;
237 if ((lock
->get_type() == CEPH_LOCK_ISNAP
||
238 lock
->get_type() == CEPH_LOCK_IPOLICY
) &&
239 mds
->is_cluster_degraded() &&
241 !mdr
->is_queued_for_replay()) {
242 // waiting for recovering mds, to guarantee replayed requests and mksnap/setlayout
243 // get processed in proper order.
245 if (lock
->get_parent()->is_auth()) {
246 if (!mdr
->locks
.count(lock
)) {
248 lock
->get_parent()->list_replicas(ls
);
250 if (mds
->mdsmap
->get_state(m
) < MDSMap::STATE_ACTIVE
) {
257 // if the lock is the latest locked one, it's possible that slave mds got the lock
258 // while there are recovering mds.
259 if (!mdr
->locks
.count(lock
) || lock
== *mdr
->locks
.rbegin())
263 dout(10) << " must xlock " << *lock
<< " " << *lock
->get_parent()
264 << ", waiting for cluster recovered" << dendl
;
265 mds
->locker
->drop_locks(mdr
.get(), NULL
);
266 mdr
->drop_local_auth_pins();
267 mds
->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache
, mdr
));
272 dout(20) << " must xlock " << *lock
<< " " << *lock
->get_parent() << dendl
;
275 mustpin
.insert(lock
->get_parent());
277 // augment xlock with a versionlock?
278 if ((*p
)->get_type() == CEPH_LOCK_DN
) {
279 CDentry
*dn
= (CDentry
*)lock
->get_parent();
283 if (xlocks
.count(&dn
->versionlock
))
284 continue; // we're xlocking the versionlock too; don't wrlock it!
286 if (mdr
->is_master()) {
287 // master. wrlock versionlock so we can pipeline dentry updates to journal.
288 wrlocks
.insert(&dn
->versionlock
);
290 // slave. exclusively lock the dentry version (i.e. block other journal updates).
291 // this makes rollback safe.
292 xlocks
.insert(&dn
->versionlock
);
293 sorted
.insert(&dn
->versionlock
);
296 if (lock
->get_type() > CEPH_LOCK_IVERSION
) {
297 // inode version lock?
298 CInode
*in
= (CInode
*)lock
->get_parent();
301 if (mdr
->is_master()) {
302 // master. wrlock versionlock so we can pipeline inode updates to journal.
303 wrlocks
.insert(&in
->versionlock
);
305 // slave. exclusively lock the inode version (i.e. block other journal updates).
306 // this makes rollback safe.
307 xlocks
.insert(&in
->versionlock
);
308 sorted
.insert(&in
->versionlock
);
314 for (set
<SimpleLock
*>::iterator p
= wrlocks
.begin(); p
!= wrlocks
.end(); ++p
) {
315 MDSCacheObject
*object
= (*p
)->get_parent();
316 dout(20) << " must wrlock " << **p
<< " " << *object
<< dendl
;
318 if (object
->is_auth())
319 mustpin
.insert(object
);
320 else if (!object
->is_auth() &&
321 !(*p
)->can_wrlock(client
) && // we might have to request a scatter
322 !mdr
->is_slave()) { // if we are slave (remote_wrlock), the master already authpinned
323 dout(15) << " will also auth_pin " << *object
324 << " in case we need to request a scatter" << dendl
;
325 mustpin
.insert(object
);
330 if (remote_wrlocks
) {
331 for (map
<SimpleLock
*,mds_rank_t
>::iterator p
= remote_wrlocks
->begin(); p
!= remote_wrlocks
->end(); ++p
) {
332 MDSCacheObject
*object
= p
->first
->get_parent();
333 dout(20) << " must remote_wrlock on mds." << p
->second
<< " "
334 << *p
->first
<< " " << *object
<< dendl
;
335 sorted
.insert(p
->first
);
336 mustpin
.insert(object
);
341 for (set
<SimpleLock
*>::iterator p
= rdlocks
.begin();
344 MDSCacheObject
*object
= (*p
)->get_parent();
345 dout(20) << " must rdlock " << **p
<< " " << *object
<< dendl
;
347 if (object
->is_auth())
348 mustpin
.insert(object
);
349 else if (!object
->is_auth() &&
350 !(*p
)->can_rdlock(client
)) { // we might have to request an rdlock
351 dout(15) << " will also auth_pin " << *object
352 << " in case we need to request a rdlock" << dendl
;
353 mustpin
.insert(object
);
359 map
<mds_rank_t
, set
<MDSCacheObject
*> > mustpin_remote
; // mds -> (object set)
361 // can i auth pin them all now?
362 marker
.message
= "failed to authpin local pins";
363 for (set
<MDSCacheObject
*>::iterator p
= mustpin
.begin();
366 MDSCacheObject
*object
= *p
;
368 dout(10) << " must authpin " << *object
<< dendl
;
370 if (mdr
->is_auth_pinned(object
)) {
371 if (object
!= (MDSCacheObject
*)auth_pin_freeze
)
373 if (mdr
->more()->is_remote_frozen_authpin
) {
374 if (mdr
->more()->rename_inode
== auth_pin_freeze
)
376 // unfreeze auth pin for the wrong inode
377 mustpin_remote
[mdr
->more()->rename_inode
->authority().first
].size();
381 if (!object
->is_auth()) {
382 if (!mdr
->locks
.empty())
383 drop_locks(mdr
.get());
384 if (object
->is_ambiguous_auth()) {
386 marker
.message
= "waiting for single auth, object is being migrated";
387 dout(10) << " ambiguous auth, waiting to authpin " << *object
<< dendl
;
388 object
->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
, new C_MDS_RetryRequest(mdcache
, mdr
));
389 mdr
->drop_local_auth_pins();
392 mustpin_remote
[object
->authority().first
].insert(object
);
396 if (!object
->can_auth_pin(&err
)) {
398 drop_locks(mdr
.get());
399 mdr
->drop_local_auth_pins();
400 if (auth_pin_nonblock
) {
401 dout(10) << " can't auth_pin (freezing?) " << *object
<< ", nonblocking" << dendl
;
405 if (err
== MDSCacheObject::ERR_EXPORTING_TREE
) {
406 marker
.message
= "failed to authpin, subtree is being exported";
407 } else if (err
== MDSCacheObject::ERR_FRAGMENTING_DIR
) {
408 marker
.message
= "failed to authpin, dir is being fragmented";
409 } else if (err
== MDSCacheObject::ERR_EXPORTING_INODE
) {
410 marker
.message
= "failed to authpin, inode is being exported";
412 dout(10) << " can't auth_pin (freezing?), waiting to authpin " << *object
<< dendl
;
413 object
->add_waiter(MDSCacheObject::WAIT_UNFREEZE
, new C_MDS_RetryRequest(mdcache
, mdr
));
415 if (!mdr
->remote_auth_pins
.empty())
416 notify_freeze_waiter(object
);
422 // ok, grab local auth pins
423 for (set
<MDSCacheObject
*>::iterator p
= mustpin
.begin();
426 MDSCacheObject
*object
= *p
;
427 if (mdr
->is_auth_pinned(object
)) {
428 dout(10) << " already auth_pinned " << *object
<< dendl
;
429 } else if (object
->is_auth()) {
430 dout(10) << " auth_pinning " << *object
<< dendl
;
431 mdr
->auth_pin(object
);
435 // request remote auth_pins
436 if (!mustpin_remote
.empty()) {
437 marker
.message
= "requesting remote authpins";
438 for (map
<MDSCacheObject
*,mds_rank_t
>::iterator p
= mdr
->remote_auth_pins
.begin();
439 p
!= mdr
->remote_auth_pins
.end();
441 if (mustpin
.count(p
->first
)) {
442 assert(p
->second
== p
->first
->authority().first
);
443 map
<mds_rank_t
, set
<MDSCacheObject
*> >::iterator q
= mustpin_remote
.find(p
->second
);
444 if (q
!= mustpin_remote
.end())
445 q
->second
.insert(p
->first
);
448 for (map
<mds_rank_t
, set
<MDSCacheObject
*> >::iterator p
= mustpin_remote
.begin();
449 p
!= mustpin_remote
.end();
451 dout(10) << "requesting remote auth_pins from mds." << p
->first
<< dendl
;
453 // wait for active auth
454 if (mds
->is_cluster_degraded() &&
455 !mds
->mdsmap
->is_clientreplay_or_active_or_stopping(p
->first
)) {
456 dout(10) << " mds." << p
->first
<< " is not active" << dendl
;
457 if (mdr
->more()->waiting_on_slave
.empty())
458 mds
->wait_for_active_peer(p
->first
, new C_MDS_RetryRequest(mdcache
, mdr
));
462 MMDSSlaveRequest
*req
= new MMDSSlaveRequest(mdr
->reqid
, mdr
->attempt
,
463 MMDSSlaveRequest::OP_AUTHPIN
);
464 for (set
<MDSCacheObject
*>::iterator q
= p
->second
.begin();
465 q
!= p
->second
.end();
467 dout(10) << " req remote auth_pin of " << **q
<< dendl
;
468 MDSCacheObjectInfo info
;
469 (*q
)->set_object_info(info
);
470 req
->get_authpins().push_back(info
);
471 if (*q
== auth_pin_freeze
)
472 (*q
)->set_object_info(req
->get_authpin_freeze());
475 if (auth_pin_nonblock
)
476 req
->mark_nonblock();
477 mds
->send_message_mds(req
, p
->first
);
479 // put in waiting list
480 assert(mdr
->more()->waiting_on_slave
.count(p
->first
) == 0);
481 mdr
->more()->waiting_on_slave
.insert(p
->first
);
486 // caps i'll need to issue
487 set
<CInode
*> issue_set
;
491 // make sure they match currently acquired locks.
492 set
<SimpleLock
*, SimpleLock::ptr_lt
>::iterator existing
= mdr
->locks
.begin();
493 for (set
<SimpleLock
*, SimpleLock::ptr_lt
>::iterator p
= sorted
.begin();
496 bool need_wrlock
= !!wrlocks
.count(*p
);
497 bool need_remote_wrlock
= !!(remote_wrlocks
&& remote_wrlocks
->count(*p
));
500 if (existing
!= mdr
->locks
.end() && *existing
== *p
) {
502 SimpleLock
*have
= *existing
;
504 if (xlocks
.count(have
) && mdr
->xlocks
.count(have
)) {
505 dout(10) << " already xlocked " << *have
<< " " << *have
->get_parent() << dendl
;
508 if (mdr
->remote_wrlocks
.count(have
)) {
509 if (!need_remote_wrlock
||
510 mdr
->remote_wrlocks
[have
] != (*remote_wrlocks
)[have
]) {
511 dout(10) << " unlocking remote_wrlock on wrong mds." << mdr
->remote_wrlocks
[have
]
512 << " " << *have
<< " " << *have
->get_parent() << dendl
;
513 remote_wrlock_finish(have
, mdr
->remote_wrlocks
[have
], mdr
.get());
516 if (need_wrlock
|| need_remote_wrlock
) {
517 if (need_wrlock
== !!mdr
->wrlocks
.count(have
) &&
518 need_remote_wrlock
== !!mdr
->remote_wrlocks
.count(have
)) {
520 dout(10) << " already wrlocked " << *have
<< " " << *have
->get_parent() << dendl
;
521 if (need_remote_wrlock
)
522 dout(10) << " already remote_wrlocked " << *have
<< " " << *have
->get_parent() << dendl
;
526 if (rdlocks
.count(have
) && mdr
->rdlocks
.count(have
)) {
527 dout(10) << " already rdlocked " << *have
<< " " << *have
->get_parent() << dendl
;
532 // hose any stray locks
533 if (existing
!= mdr
->locks
.end() && *existing
== *p
) {
534 assert(need_wrlock
|| need_remote_wrlock
);
535 SimpleLock
*lock
= *existing
;
536 if (mdr
->wrlocks
.count(lock
)) {
538 dout(10) << " unlocking extra " << *lock
<< " " << *lock
->get_parent() << dendl
;
539 else if (need_remote_wrlock
) // acquire remote_wrlock first
540 dout(10) << " unlocking out-of-order " << *lock
<< " " << *lock
->get_parent() << dendl
;
541 bool need_issue
= false;
542 wrlock_finish(lock
, mdr
.get(), &need_issue
);
544 issue_set
.insert(static_cast<CInode
*>(lock
->get_parent()));
548 while (existing
!= mdr
->locks
.end()) {
549 SimpleLock
*stray
= *existing
;
551 dout(10) << " unlocking out-of-order " << *stray
<< " " << *stray
->get_parent() << dendl
;
552 bool need_issue
= false;
553 if (mdr
->xlocks
.count(stray
)) {
554 xlock_finish(stray
, mdr
.get(), &need_issue
);
555 } else if (mdr
->rdlocks
.count(stray
)) {
556 rdlock_finish(stray
, mdr
.get(), &need_issue
);
558 // may have acquired both wrlock and remore wrlock
559 if (mdr
->wrlocks
.count(stray
))
560 wrlock_finish(stray
, mdr
.get(), &need_issue
);
561 if (mdr
->remote_wrlocks
.count(stray
))
562 remote_wrlock_finish(stray
, mdr
->remote_wrlocks
[stray
], mdr
.get());
565 issue_set
.insert(static_cast<CInode
*>(stray
->get_parent()));
569 if (mdr
->locking
&& *p
!= mdr
->locking
) {
570 cancel_locking(mdr
.get(), &issue_set
);
572 if (xlocks
.count(*p
)) {
573 marker
.message
= "failed to xlock, waiting";
574 if (!xlock_start(*p
, mdr
))
576 dout(10) << " got xlock on " << **p
<< " " << *(*p
)->get_parent() << dendl
;
577 } else if (need_wrlock
|| need_remote_wrlock
) {
578 if (need_remote_wrlock
&& !mdr
->remote_wrlocks
.count(*p
)) {
579 marker
.message
= "waiting for remote wrlocks";
580 remote_wrlock_start(*p
, (*remote_wrlocks
)[*p
], mdr
);
583 if (need_wrlock
&& !mdr
->wrlocks
.count(*p
)) {
584 marker
.message
= "failed to wrlock, waiting";
585 if (need_remote_wrlock
&& !(*p
)->can_wrlock(mdr
->get_client())) {
586 marker
.message
= "failed to wrlock, dropping remote wrlock and waiting";
587 // can't take the wrlock because the scatter lock is gathering. need to
588 // release the remote wrlock, so that the gathering process can finish.
589 remote_wrlock_finish(*p
, mdr
->remote_wrlocks
[*p
], mdr
.get());
590 remote_wrlock_start(*p
, (*remote_wrlocks
)[*p
], mdr
);
593 // nowait if we have already gotten remote wrlock
594 if (!wrlock_start(*p
, mdr
, need_remote_wrlock
))
596 dout(10) << " got wrlock on " << **p
<< " " << *(*p
)->get_parent() << dendl
;
599 assert(mdr
->is_master());
600 if ((*p
)->needs_recover()) {
601 if (mds
->is_cluster_degraded()) {
602 if (!mdr
->is_queued_for_replay()) {
603 // see comments in SimpleLock::set_state_rejoin() and
604 // ScatterLock::encode_state_for_rejoin()
605 drop_locks(mdr
.get());
606 mds
->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache
, mdr
));
607 dout(10) << " rejoin recovering " << **p
<< " " << *(*p
)->get_parent()
608 << ", waiting for cluster recovered" << dendl
;
609 marker
.message
= "rejoin recovering lock, waiting for cluster recovered";
613 (*p
)->clear_need_recover();
617 marker
.message
= "failed to rdlock, waiting";
618 if (!rdlock_start(*p
, mdr
))
620 dout(10) << " got rdlock on " << **p
<< " " << *(*p
)->get_parent() << dendl
;
624 // any extra unneeded locks?
625 while (existing
!= mdr
->locks
.end()) {
626 SimpleLock
*stray
= *existing
;
628 dout(10) << " unlocking extra " << *stray
<< " " << *stray
->get_parent() << dendl
;
629 bool need_issue
= false;
630 if (mdr
->xlocks
.count(stray
)) {
631 xlock_finish(stray
, mdr
.get(), &need_issue
);
632 } else if (mdr
->rdlocks
.count(stray
)) {
633 rdlock_finish(stray
, mdr
.get(), &need_issue
);
635 // may have acquired both wrlock and remore wrlock
636 if (mdr
->wrlocks
.count(stray
))
637 wrlock_finish(stray
, mdr
.get(), &need_issue
);
638 if (mdr
->remote_wrlocks
.count(stray
))
639 remote_wrlock_finish(stray
, mdr
->remote_wrlocks
[stray
], mdr
.get());
642 issue_set
.insert(static_cast<CInode
*>(stray
->get_parent()));
645 mdr
->done_locking
= true;
646 mdr
->set_mds_stamp(ceph_clock_now());
648 marker
.message
= "acquired locks";
651 issue_caps_set(issue_set
);
655 void Locker::notify_freeze_waiter(MDSCacheObject
*o
)
658 if (CInode
*in
= dynamic_cast<CInode
*>(o
)) {
660 dir
= in
->get_parent_dir();
661 } else if (CDentry
*dn
= dynamic_cast<CDentry
*>(o
)) {
664 dir
= dynamic_cast<CDir
*>(o
);
668 if (dir
->is_freezing_dir())
669 mdcache
->fragment_freeze_inc_num_waiters(dir
);
670 if (dir
->is_freezing_tree()) {
671 while (!dir
->is_freezing_tree_root())
672 dir
= dir
->get_parent_dir();
673 mdcache
->migrator
->export_freeze_inc_num_waiters(dir
);
678 void Locker::set_xlocks_done(MutationImpl
*mut
, bool skip_dentry
)
680 for (set
<SimpleLock
*>::iterator p
= mut
->xlocks
.begin();
681 p
!= mut
->xlocks
.end();
683 MDSCacheObject
*object
= (*p
)->get_parent();
684 assert(object
->is_auth());
686 ((*p
)->get_type() == CEPH_LOCK_DN
|| (*p
)->get_type() == CEPH_LOCK_DVERSION
))
688 dout(10) << "set_xlocks_done on " << **p
<< " " << *object
<< dendl
;
689 (*p
)->set_xlock_done();
693 void Locker::_drop_rdlocks(MutationImpl
*mut
, set
<CInode
*> *pneed_issue
)
695 while (!mut
->rdlocks
.empty()) {
697 MDSCacheObject
*p
= (*mut
->rdlocks
.begin())->get_parent();
698 rdlock_finish(*mut
->rdlocks
.begin(), mut
, &ni
);
700 pneed_issue
->insert(static_cast<CInode
*>(p
));
704 void Locker::_drop_non_rdlocks(MutationImpl
*mut
, set
<CInode
*> *pneed_issue
)
706 set
<mds_rank_t
> slaves
;
708 while (!mut
->xlocks
.empty()) {
709 SimpleLock
*lock
= *mut
->xlocks
.begin();
710 MDSCacheObject
*p
= lock
->get_parent();
712 assert(lock
->get_sm()->can_remote_xlock
);
713 slaves
.insert(p
->authority().first
);
715 mut
->locks
.erase(lock
);
716 mut
->xlocks
.erase(lock
);
720 xlock_finish(lock
, mut
, &ni
);
722 pneed_issue
->insert(static_cast<CInode
*>(p
));
725 while (!mut
->remote_wrlocks
.empty()) {
726 map
<SimpleLock
*,mds_rank_t
>::iterator p
= mut
->remote_wrlocks
.begin();
727 slaves
.insert(p
->second
);
728 if (mut
->wrlocks
.count(p
->first
) == 0)
729 mut
->locks
.erase(p
->first
);
730 mut
->remote_wrlocks
.erase(p
);
733 while (!mut
->wrlocks
.empty()) {
735 MDSCacheObject
*p
= (*mut
->wrlocks
.begin())->get_parent();
736 wrlock_finish(*mut
->wrlocks
.begin(), mut
, &ni
);
738 pneed_issue
->insert(static_cast<CInode
*>(p
));
741 for (set
<mds_rank_t
>::iterator p
= slaves
.begin(); p
!= slaves
.end(); ++p
) {
742 if (!mds
->is_cluster_degraded() ||
743 mds
->mdsmap
->get_state(*p
) >= MDSMap::STATE_REJOIN
) {
744 dout(10) << "_drop_non_rdlocks dropping remote locks on mds." << *p
<< dendl
;
745 MMDSSlaveRequest
*slavereq
= new MMDSSlaveRequest(mut
->reqid
, mut
->attempt
,
746 MMDSSlaveRequest::OP_DROPLOCKS
);
747 mds
->send_message_mds(slavereq
, *p
);
752 void Locker::cancel_locking(MutationImpl
*mut
, set
<CInode
*> *pneed_issue
)
754 SimpleLock
*lock
= mut
->locking
;
756 dout(10) << "cancel_locking " << *lock
<< " on " << *mut
<< dendl
;
758 if (lock
->get_parent()->is_auth()) {
759 bool need_issue
= false;
760 if (lock
->get_state() == LOCK_PREXLOCK
) {
761 _finish_xlock(lock
, -1, &need_issue
);
762 } else if (lock
->get_state() == LOCK_LOCK_XLOCK
) {
763 lock
->set_state(LOCK_XLOCKDONE
);
764 eval_gather(lock
, true, &need_issue
);
767 pneed_issue
->insert(static_cast<CInode
*>(lock
->get_parent()));
769 mut
->finish_locking(lock
);
772 void Locker::drop_locks(MutationImpl
*mut
, set
<CInode
*> *pneed_issue
)
775 set
<CInode
*> my_need_issue
;
777 pneed_issue
= &my_need_issue
;
780 cancel_locking(mut
, pneed_issue
);
781 _drop_non_rdlocks(mut
, pneed_issue
);
782 _drop_rdlocks(mut
, pneed_issue
);
784 if (pneed_issue
== &my_need_issue
)
785 issue_caps_set(*pneed_issue
);
786 mut
->done_locking
= false;
789 void Locker::drop_non_rdlocks(MutationImpl
*mut
, set
<CInode
*> *pneed_issue
)
791 set
<CInode
*> my_need_issue
;
793 pneed_issue
= &my_need_issue
;
795 _drop_non_rdlocks(mut
, pneed_issue
);
797 if (pneed_issue
== &my_need_issue
)
798 issue_caps_set(*pneed_issue
);
801 void Locker::drop_rdlocks_for_early_reply(MutationImpl
*mut
)
803 set
<CInode
*> need_issue
;
805 for (auto p
= mut
->rdlocks
.begin(); p
!= mut
->rdlocks
.end(); ) {
806 SimpleLock
*lock
= *p
;
808 // make later mksnap/setlayout (at other mds) wait for this unsafe request
809 if (lock
->get_type() == CEPH_LOCK_ISNAP
||
810 lock
->get_type() == CEPH_LOCK_IPOLICY
)
813 rdlock_finish(lock
, mut
, &ni
);
815 need_issue
.insert(static_cast<CInode
*>(lock
->get_parent()));
818 issue_caps_set(need_issue
);
821 void Locker::drop_locks_for_fragment_unfreeze(MutationImpl
*mut
)
823 set
<CInode
*> need_issue
;
825 for (auto it
= mut
->locks
.begin(); it
!= mut
->locks
.end(); ) {
826 SimpleLock
*lock
= *it
;
828 if (lock
->get_type() == CEPH_LOCK_IDFT
) {
832 wrlock_finish(lock
, mut
, &ni
);
834 need_issue
.insert(static_cast<CInode
*>(lock
->get_parent()));
836 issue_caps_set(need_issue
);
841 void Locker::eval_gather(SimpleLock
*lock
, bool first
, bool *pneed_issue
, list
<MDSInternalContextBase
*> *pfinishers
)
843 dout(10) << "eval_gather " << *lock
<< " on " << *lock
->get_parent() << dendl
;
844 assert(!lock
->is_stable());
846 int next
= lock
->get_next_state();
849 bool caps
= lock
->get_cap_shift();
850 if (lock
->get_type() != CEPH_LOCK_DN
)
851 in
= static_cast<CInode
*>(lock
->get_parent());
853 bool need_issue
= false;
855 int loner_issued
= 0, other_issued
= 0, xlocker_issued
= 0;
856 assert(!caps
|| in
!= NULL
);
857 if (caps
&& in
->is_head()) {
858 in
->get_caps_issued(&loner_issued
, &other_issued
, &xlocker_issued
,
859 lock
->get_cap_shift(), lock
->get_cap_mask());
860 dout(10) << " next state is " << lock
->get_state_name(next
)
861 << " issued/allows loner " << gcap_string(loner_issued
)
862 << "/" << gcap_string(lock
->gcaps_allowed(CAP_LONER
, next
))
863 << " xlocker " << gcap_string(xlocker_issued
)
864 << "/" << gcap_string(lock
->gcaps_allowed(CAP_XLOCKER
, next
))
865 << " other " << gcap_string(other_issued
)
866 << "/" << gcap_string(lock
->gcaps_allowed(CAP_ANY
, next
))
869 if (first
&& ((~lock
->gcaps_allowed(CAP_ANY
, next
) & other_issued
) ||
870 (~lock
->gcaps_allowed(CAP_LONER
, next
) & loner_issued
) ||
871 (~lock
->gcaps_allowed(CAP_XLOCKER
, next
) & xlocker_issued
)))
875 #define IS_TRUE_AND_LT_AUTH(x, auth) (x && ((auth && x <= AUTH) || (!auth && x < AUTH)))
876 bool auth
= lock
->get_parent()->is_auth();
877 if (!lock
->is_gathering() &&
878 (IS_TRUE_AND_LT_AUTH(lock
->get_sm()->states
[next
].can_rdlock
, auth
) || !lock
->is_rdlocked()) &&
879 (IS_TRUE_AND_LT_AUTH(lock
->get_sm()->states
[next
].can_wrlock
, auth
) || !lock
->is_wrlocked()) &&
880 (IS_TRUE_AND_LT_AUTH(lock
->get_sm()->states
[next
].can_xlock
, auth
) || !lock
->is_xlocked()) &&
881 (IS_TRUE_AND_LT_AUTH(lock
->get_sm()->states
[next
].can_lease
, auth
) || !lock
->is_leased()) &&
882 !(lock
->get_parent()->is_auth() && lock
->is_flushing()) && // i.e. wait for scatter_writebehind!
883 (!caps
|| ((~lock
->gcaps_allowed(CAP_ANY
, next
) & other_issued
) == 0 &&
884 (~lock
->gcaps_allowed(CAP_LONER
, next
) & loner_issued
) == 0 &&
885 (~lock
->gcaps_allowed(CAP_XLOCKER
, next
) & xlocker_issued
) == 0)) &&
886 lock
->get_state() != LOCK_SYNC_MIX2
&& // these states need an explicit trigger from the auth mds
887 lock
->get_state() != LOCK_MIX_SYNC2
889 dout(7) << "eval_gather finished gather on " << *lock
890 << " on " << *lock
->get_parent() << dendl
;
892 if (lock
->get_sm() == &sm_filelock
) {
894 if (in
->state_test(CInode::STATE_RECOVERING
)) {
895 dout(7) << "eval_gather finished gather, but still recovering" << dendl
;
897 } else if (in
->state_test(CInode::STATE_NEEDSRECOVER
)) {
898 dout(7) << "eval_gather finished gather, but need to recover" << dendl
;
899 mds
->mdcache
->queue_file_recover(in
);
900 mds
->mdcache
->do_file_recover();
905 if (!lock
->get_parent()->is_auth()) {
906 // replica: tell auth
907 mds_rank_t auth
= lock
->get_parent()->authority().first
;
909 if (lock
->get_parent()->is_rejoining() &&
910 mds
->mdsmap
->get_state(auth
) == MDSMap::STATE_REJOIN
) {
911 dout(7) << "eval_gather finished gather, but still rejoining "
912 << *lock
->get_parent() << dendl
;
916 if (!mds
->is_cluster_degraded() ||
917 mds
->mdsmap
->get_state(auth
) >= MDSMap::STATE_REJOIN
) {
918 switch (lock
->get_state()) {
920 mds
->send_message_mds(new MLock(lock
, LOCK_AC_LOCKACK
, mds
->get_nodeid()),
926 MLock
*reply
= new MLock(lock
, LOCK_AC_SYNCACK
, mds
->get_nodeid());
927 lock
->encode_locked_state(reply
->get_data());
928 mds
->send_message_mds(reply
, auth
);
929 next
= LOCK_MIX_SYNC2
;
930 (static_cast<ScatterLock
*>(lock
))->start_flush();
935 (static_cast<ScatterLock
*>(lock
))->finish_flush();
936 (static_cast<ScatterLock
*>(lock
))->clear_flushed();
939 // do nothing, we already acked
944 MLock
*reply
= new MLock(lock
, LOCK_AC_MIXACK
, mds
->get_nodeid());
945 mds
->send_message_mds(reply
, auth
);
946 next
= LOCK_SYNC_MIX2
;
953 lock
->encode_locked_state(data
);
954 mds
->send_message_mds(new MLock(lock
, LOCK_AC_LOCKACK
, mds
->get_nodeid(), data
), auth
);
955 (static_cast<ScatterLock
*>(lock
))->start_flush();
956 // we'll get an AC_LOCKFLUSHED to complete
967 // once the first (local) stage of mix->lock gather complete we can
968 // gather from replicas
969 if (lock
->get_state() == LOCK_MIX_LOCK
&&
970 lock
->get_parent()->is_replicated()) {
971 dout(10) << " finished (local) gather for mix->lock, now gathering from replicas" << dendl
;
972 send_lock_message(lock
, LOCK_AC_LOCK
);
974 lock
->set_state(LOCK_MIX_LOCK2
);
978 if (lock
->is_dirty() && !lock
->is_flushed()) {
979 scatter_writebehind(static_cast<ScatterLock
*>(lock
));
983 lock
->clear_flushed();
985 switch (lock
->get_state()) {
991 in
->start_scatter(static_cast<ScatterLock
*>(lock
));
992 if (lock
->get_parent()->is_replicated()) {
994 lock
->encode_locked_state(softdata
);
995 send_lock_message(lock
, LOCK_AC_MIX
, softdata
);
997 (static_cast<ScatterLock
*>(lock
))->clear_scatter_wanted();
1001 case LOCK_XLOCKDONE
:
1002 if (next
!= LOCK_SYNC
)
1007 case LOCK_EXCL_SYNC
:
1008 case LOCK_LOCK_SYNC
:
1010 case LOCK_XSYN_SYNC
:
1011 if (lock
->get_parent()->is_replicated()) {
1012 bufferlist softdata
;
1013 lock
->encode_locked_state(softdata
);
1014 send_lock_message(lock
, LOCK_AC_SYNC
, softdata
);
1021 lock
->set_state(next
);
1023 if (lock
->get_parent()->is_auth() &&
1025 lock
->get_parent()->auth_unpin(lock
);
1027 // drop loner before doing waiters
1031 in
->get_wanted_loner() != in
->get_loner()) {
1032 dout(10) << " trying to drop loner" << dendl
;
1033 if (in
->try_drop_loner()) {
1034 dout(10) << " dropped loner" << dendl
;
1040 lock
->take_waiting(SimpleLock::WAIT_STABLE
|SimpleLock::WAIT_WR
|SimpleLock::WAIT_RD
|SimpleLock::WAIT_XLOCK
,
1043 lock
->finish_waiters(SimpleLock::WAIT_STABLE
|SimpleLock::WAIT_WR
|SimpleLock::WAIT_RD
|SimpleLock::WAIT_XLOCK
);
1045 if (caps
&& in
->is_head())
1048 if (lock
->get_parent()->is_auth() &&
1050 try_eval(lock
, &need_issue
);
1055 *pneed_issue
= true;
1056 else if (in
->is_head())
1062 bool Locker::eval(CInode
*in
, int mask
, bool caps_imported
)
1064 bool need_issue
= caps_imported
;
1065 list
<MDSInternalContextBase
*> finishers
;
1067 dout(10) << "eval " << mask
<< " " << *in
<< dendl
;
1070 if (in
->is_auth() && in
->is_head()) {
1071 client_t orig_loner
= in
->get_loner();
1072 if (in
->choose_ideal_loner()) {
1073 dout(10) << "eval set loner: client." << orig_loner
<< " -> client." << in
->get_loner() << dendl
;
1076 } else if (in
->get_wanted_loner() != in
->get_loner()) {
1077 dout(10) << "eval want loner: client." << in
->get_wanted_loner() << " but failed to set it" << dendl
;
1083 if (mask
& CEPH_LOCK_IFILE
)
1084 eval_any(&in
->filelock
, &need_issue
, &finishers
, caps_imported
);
1085 if (mask
& CEPH_LOCK_IAUTH
)
1086 eval_any(&in
->authlock
, &need_issue
, &finishers
, caps_imported
);
1087 if (mask
& CEPH_LOCK_ILINK
)
1088 eval_any(&in
->linklock
, &need_issue
, &finishers
, caps_imported
);
1089 if (mask
& CEPH_LOCK_IXATTR
)
1090 eval_any(&in
->xattrlock
, &need_issue
, &finishers
, caps_imported
);
1091 if (mask
& CEPH_LOCK_INEST
)
1092 eval_any(&in
->nestlock
, &need_issue
, &finishers
, caps_imported
);
1093 if (mask
& CEPH_LOCK_IFLOCK
)
1094 eval_any(&in
->flocklock
, &need_issue
, &finishers
, caps_imported
);
1095 if (mask
& CEPH_LOCK_IPOLICY
)
1096 eval_any(&in
->policylock
, &need_issue
, &finishers
, caps_imported
);
1099 if (in
->is_auth() && in
->is_head() && in
->get_wanted_loner() != in
->get_loner()) {
1100 if (in
->try_drop_loner()) {
1102 if (in
->get_wanted_loner() >= 0) {
1103 dout(10) << "eval end set loner to client." << in
->get_loner() << dendl
;
1104 bool ok
= in
->try_set_loner();
1112 finish_contexts(g_ceph_context
, finishers
);
1114 if (need_issue
&& in
->is_head())
1117 dout(10) << "eval done" << dendl
;
1121 class C_Locker_Eval
: public LockerContext
{
1125 C_Locker_Eval(Locker
*l
, MDSCacheObject
*pp
, int m
) : LockerContext(l
), p(pp
), mask(m
) {
1126 // We are used as an MDSCacheObject waiter, so should
1127 // only be invoked by someone already holding the big lock.
1128 assert(locker
->mds
->mds_lock
.is_locked_by_me());
1129 p
->get(MDSCacheObject::PIN_PTRWAITER
);
1131 void finish(int r
) override
{
1132 locker
->try_eval(p
, mask
);
1133 p
->put(MDSCacheObject::PIN_PTRWAITER
);
1137 void Locker::try_eval(MDSCacheObject
*p
, int mask
)
1139 // unstable and ambiguous auth?
1140 if (p
->is_ambiguous_auth()) {
1141 dout(7) << "try_eval ambiguous auth, waiting on " << *p
<< dendl
;
1142 p
->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
, new C_Locker_Eval(this, p
, mask
));
1146 if (p
->is_auth() && p
->is_frozen()) {
1147 dout(7) << "try_eval frozen, waiting on " << *p
<< dendl
;
1148 p
->add_waiter(MDSCacheObject::WAIT_UNFREEZE
, new C_Locker_Eval(this, p
, mask
));
1152 if (mask
& CEPH_LOCK_DN
) {
1153 assert(mask
== CEPH_LOCK_DN
);
1154 bool need_issue
= false; // ignore this, no caps on dentries
1155 CDentry
*dn
= static_cast<CDentry
*>(p
);
1156 eval_any(&dn
->lock
, &need_issue
);
1158 CInode
*in
= static_cast<CInode
*>(p
);
1163 void Locker::try_eval(SimpleLock
*lock
, bool *pneed_issue
)
1165 MDSCacheObject
*p
= lock
->get_parent();
1167 // unstable and ambiguous auth?
1168 if (p
->is_ambiguous_auth()) {
1169 dout(7) << "try_eval " << *lock
<< " ambiguousauth, waiting on " << *p
<< dendl
;
1170 p
->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
, new C_Locker_Eval(this, p
, lock
->get_type()));
1174 if (!p
->is_auth()) {
1175 dout(7) << "try_eval " << *lock
<< " not auth for " << *p
<< dendl
;
1179 if (p
->is_frozen()) {
1180 dout(7) << "try_eval " << *lock
<< " frozen, waiting on " << *p
<< dendl
;
1181 p
->add_waiter(MDSCacheObject::WAIT_UNFREEZE
, new C_Locker_Eval(this, p
, lock
->get_type()));
1186 * We could have a situation like:
1188 * - mds A authpins item on mds B
1189 * - mds B starts to freeze tree containing item
1190 * - mds A tries wrlock_start on A, sends REQSCATTER to B
1191 * - mds B lock is unstable, sets scatter_wanted
1192 * - mds B lock stabilizes, calls try_eval.
1194 * We can defer while freezing without causing a deadlock. Honor
1195 * scatter_wanted flag here. This will never get deferred by the
1196 * checks above due to the auth_pin held by the master.
1198 if (lock
->is_scatterlock()) {
1199 ScatterLock
*slock
= static_cast<ScatterLock
*>(lock
);
1200 if (slock
->get_scatter_wanted() &&
1201 slock
->get_state() != LOCK_MIX
) {
1202 scatter_mix(slock
, pneed_issue
);
1203 if (!lock
->is_stable())
1205 } else if (slock
->get_unscatter_wanted() &&
1206 slock
->get_state() != LOCK_LOCK
) {
1207 simple_lock(slock
, pneed_issue
);
1208 if (!lock
->is_stable()) {
1214 if (lock
->get_type() != CEPH_LOCK_DN
&& p
->is_freezing()) {
1215 dout(7) << "try_eval " << *lock
<< " freezing, waiting on " << *p
<< dendl
;
1216 p
->add_waiter(MDSCacheObject::WAIT_UNFREEZE
, new C_Locker_Eval(this, p
, lock
->get_type()));
1220 eval(lock
, pneed_issue
);
1223 void Locker::eval_cap_gather(CInode
*in
, set
<CInode
*> *issue_set
)
1225 bool need_issue
= false;
1226 list
<MDSInternalContextBase
*> finishers
;
1229 if (!in
->filelock
.is_stable())
1230 eval_gather(&in
->filelock
, false, &need_issue
, &finishers
);
1231 if (!in
->authlock
.is_stable())
1232 eval_gather(&in
->authlock
, false, &need_issue
, &finishers
);
1233 if (!in
->linklock
.is_stable())
1234 eval_gather(&in
->linklock
, false, &need_issue
, &finishers
);
1235 if (!in
->xattrlock
.is_stable())
1236 eval_gather(&in
->xattrlock
, false, &need_issue
, &finishers
);
1238 if (need_issue
&& in
->is_head()) {
1240 issue_set
->insert(in
);
1245 finish_contexts(g_ceph_context
, finishers
);
1248 void Locker::eval_scatter_gathers(CInode
*in
)
1250 bool need_issue
= false;
1251 list
<MDSInternalContextBase
*> finishers
;
1253 dout(10) << "eval_scatter_gathers " << *in
<< dendl
;
1256 if (!in
->filelock
.is_stable())
1257 eval_gather(&in
->filelock
, false, &need_issue
, &finishers
);
1258 if (!in
->nestlock
.is_stable())
1259 eval_gather(&in
->nestlock
, false, &need_issue
, &finishers
);
1260 if (!in
->dirfragtreelock
.is_stable())
1261 eval_gather(&in
->dirfragtreelock
, false, &need_issue
, &finishers
);
1263 if (need_issue
&& in
->is_head())
1266 finish_contexts(g_ceph_context
, finishers
);
1269 void Locker::eval(SimpleLock
*lock
, bool *need_issue
)
1271 switch (lock
->get_type()) {
1272 case CEPH_LOCK_IFILE
:
1273 return file_eval(static_cast<ScatterLock
*>(lock
), need_issue
);
1274 case CEPH_LOCK_IDFT
:
1275 case CEPH_LOCK_INEST
:
1276 return scatter_eval(static_cast<ScatterLock
*>(lock
), need_issue
);
1278 return simple_eval(lock
, need_issue
);
1283 // ------------------
1286 bool Locker::_rdlock_kick(SimpleLock
*lock
, bool as_anon
)
1289 if (lock
->is_stable()) {
1290 if (lock
->get_parent()->is_auth()) {
1291 if (lock
->get_sm() == &sm_scatterlock
) {
1292 // not until tempsync is fully implemented
1293 //if (lock->get_parent()->is_replicated())
1294 //scatter_tempsync((ScatterLock*)lock);
1297 } else if (lock
->get_sm() == &sm_filelock
) {
1298 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
1299 if (lock
->get_state() == LOCK_EXCL
&&
1300 in
->get_target_loner() >= 0 &&
1301 !in
->is_dir() && !as_anon
) // as_anon => caller wants SYNC, not XSYN
1309 // request rdlock state change from auth
1310 mds_rank_t auth
= lock
->get_parent()->authority().first
;
1311 if (!mds
->is_cluster_degraded() ||
1312 mds
->mdsmap
->is_clientreplay_or_active_or_stopping(auth
)) {
1313 dout(10) << "requesting rdlock from auth on "
1314 << *lock
<< " on " << *lock
->get_parent() << dendl
;
1315 mds
->send_message_mds(new MLock(lock
, LOCK_AC_REQRDLOCK
, mds
->get_nodeid()), auth
);
1320 if (lock
->get_type() == CEPH_LOCK_IFILE
) {
1321 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
1322 if (in
->state_test(CInode::STATE_RECOVERING
)) {
1323 mds
->mdcache
->recovery_queue
.prioritize(in
);
1330 bool Locker::rdlock_try(SimpleLock
*lock
, client_t client
, MDSInternalContextBase
*con
)
1332 dout(7) << "rdlock_try on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1334 // can read? grab ref.
1335 if (lock
->can_rdlock(client
))
1338 _rdlock_kick(lock
, false);
1340 if (lock
->can_rdlock(client
))
1345 dout(7) << "rdlock_try waiting on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1346 lock
->add_waiter(SimpleLock::WAIT_STABLE
|SimpleLock::WAIT_RD
, con
);
1351 bool Locker::rdlock_start(SimpleLock
*lock
, MDRequestRef
& mut
, bool as_anon
)
1353 dout(7) << "rdlock_start on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1355 // client may be allowed to rdlock the same item it has xlocked.
1356 // UNLESS someone passes in as_anon, or we're reading snapped version here.
1357 if (mut
->snapid
!= CEPH_NOSNAP
)
1359 client_t client
= as_anon
? -1 : mut
->get_client();
1362 if (lock
->get_type() != CEPH_LOCK_DN
)
1363 in
= static_cast<CInode
*>(lock
->get_parent());
1366 if (!lock->get_parent()->is_auth() &&
1367 lock->fw_rdlock_to_auth()) {
1368 mdcache->request_forward(mut, lock->get_parent()->authority().first);
1374 // can read? grab ref.
1375 if (lock
->can_rdlock(client
)) {
1377 mut
->rdlocks
.insert(lock
);
1378 mut
->locks
.insert(lock
);
1382 // hmm, wait a second.
1383 if (in
&& !in
->is_head() && in
->is_auth() &&
1384 lock
->get_state() == LOCK_SNAP_SYNC
) {
1385 // okay, we actually need to kick the head's lock to get ourselves synced up.
1386 CInode
*head
= mdcache
->get_inode(in
->ino());
1388 SimpleLock
*hlock
= head
->get_lock(CEPH_LOCK_IFILE
);
1389 if (hlock
->get_state() == LOCK_SYNC
)
1390 hlock
= head
->get_lock(lock
->get_type());
1392 if (hlock
->get_state() != LOCK_SYNC
) {
1393 dout(10) << "rdlock_start trying head inode " << *head
<< dendl
;
1394 if (!rdlock_start(hlock
, mut
, true)) // ** as_anon, no rdlock on EXCL **
1396 // oh, check our lock again then
1400 if (!_rdlock_kick(lock
, as_anon
))
1406 if (lock
->get_parent()->is_auth() && lock
->is_stable())
1407 wait_on
= SimpleLock::WAIT_RD
;
1409 wait_on
= SimpleLock::WAIT_STABLE
; // REQRDLOCK is ignored if lock is unstable, so we need to retry.
1410 dout(7) << "rdlock_start waiting on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1411 lock
->add_waiter(wait_on
, new C_MDS_RetryRequest(mdcache
, mut
));
1416 void Locker::nudge_log(SimpleLock
*lock
)
1418 dout(10) << "nudge_log " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1419 if (lock
->get_parent()->is_auth() && lock
->is_unstable_and_locked()) // as with xlockdone, or cap flush
1420 mds
->mdlog
->flush();
1423 void Locker::rdlock_finish(SimpleLock
*lock
, MutationImpl
*mut
, bool *pneed_issue
)
1428 mut
->rdlocks
.erase(lock
);
1429 mut
->locks
.erase(lock
);
1432 dout(7) << "rdlock_finish on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1435 if (!lock
->is_rdlocked()) {
1436 if (!lock
->is_stable())
1437 eval_gather(lock
, false, pneed_issue
);
1438 else if (lock
->get_parent()->is_auth())
1439 try_eval(lock
, pneed_issue
);
1444 bool Locker::can_rdlock_set(set
<SimpleLock
*>& locks
)
1446 dout(10) << "can_rdlock_set " << locks
<< dendl
;
1447 for (set
<SimpleLock
*>::iterator p
= locks
.begin(); p
!= locks
.end(); ++p
)
1448 if (!(*p
)->can_rdlock(-1)) {
1449 dout(10) << "can_rdlock_set can't rdlock " << *p
<< " on " << *(*p
)->get_parent() << dendl
;
1455 bool Locker::rdlock_try_set(set
<SimpleLock
*>& locks
)
1457 dout(10) << "rdlock_try_set " << locks
<< dendl
;
1458 for (set
<SimpleLock
*>::iterator p
= locks
.begin(); p
!= locks
.end(); ++p
)
1459 if (!rdlock_try(*p
, -1, NULL
)) {
1460 dout(10) << "rdlock_try_set can't rdlock " << *p
<< " on " << *(*p
)->get_parent() << dendl
;
1466 void Locker::rdlock_take_set(set
<SimpleLock
*>& locks
, MutationRef
& mut
)
1468 dout(10) << "rdlock_take_set " << locks
<< dendl
;
1469 for (set
<SimpleLock
*>::iterator p
= locks
.begin(); p
!= locks
.end(); ++p
) {
1471 mut
->rdlocks
.insert(*p
);
1472 mut
->locks
.insert(*p
);
1476 // ------------------
1479 void Locker::wrlock_force(SimpleLock
*lock
, MutationRef
& mut
)
1481 if (lock
->get_type() == CEPH_LOCK_IVERSION
||
1482 lock
->get_type() == CEPH_LOCK_DVERSION
)
1483 return local_wrlock_grab(static_cast<LocalLock
*>(lock
), mut
);
1485 dout(7) << "wrlock_force on " << *lock
1486 << " on " << *lock
->get_parent() << dendl
;
1487 lock
->get_wrlock(true);
1488 mut
->wrlocks
.insert(lock
);
1489 mut
->locks
.insert(lock
);
1492 bool Locker::wrlock_start(SimpleLock
*lock
, MDRequestRef
& mut
, bool nowait
)
1494 if (lock
->get_type() == CEPH_LOCK_IVERSION
||
1495 lock
->get_type() == CEPH_LOCK_DVERSION
)
1496 return local_wrlock_start(static_cast<LocalLock
*>(lock
), mut
);
1498 dout(10) << "wrlock_start " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1500 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
1501 client_t client
= mut
->get_client();
1502 bool want_scatter
= !nowait
&& lock
->get_parent()->is_auth() &&
1503 (in
->has_subtree_or_exporting_dirfrag() ||
1504 static_cast<ScatterLock
*>(lock
)->get_scatter_wanted());
1508 if (lock
->can_wrlock(client
) &&
1509 (!want_scatter
|| lock
->get_state() == LOCK_MIX
)) {
1511 mut
->wrlocks
.insert(lock
);
1512 mut
->locks
.insert(lock
);
1516 if (lock
->get_type() == CEPH_LOCK_IFILE
&&
1517 in
->state_test(CInode::STATE_RECOVERING
)) {
1518 mds
->mdcache
->recovery_queue
.prioritize(in
);
1521 if (!lock
->is_stable())
1524 if (in
->is_auth()) {
1525 // don't do nested lock state change if we have dirty scatterdata and
1526 // may scatter_writebehind or start_scatter, because nowait==true implies
1527 // that the caller already has a log entry open!
1528 if (nowait
&& lock
->is_dirty())
1532 scatter_mix(static_cast<ScatterLock
*>(lock
));
1536 if (nowait
&& !lock
->can_wrlock(client
))
1541 // auth should be auth_pinned (see acquire_locks wrlock weird mustpin case).
1542 mds_rank_t auth
= lock
->get_parent()->authority().first
;
1543 if (!mds
->is_cluster_degraded() ||
1544 mds
->mdsmap
->is_clientreplay_or_active_or_stopping(auth
)) {
1545 dout(10) << "requesting scatter from auth on "
1546 << *lock
<< " on " << *lock
->get_parent() << dendl
;
1547 mds
->send_message_mds(new MLock(lock
, LOCK_AC_REQSCATTER
, mds
->get_nodeid()), auth
);
1554 dout(7) << "wrlock_start waiting on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1555 lock
->add_waiter(SimpleLock::WAIT_STABLE
, new C_MDS_RetryRequest(mdcache
, mut
));
1562 void Locker::wrlock_finish(SimpleLock
*lock
, MutationImpl
*mut
, bool *pneed_issue
)
1564 if (lock
->get_type() == CEPH_LOCK_IVERSION
||
1565 lock
->get_type() == CEPH_LOCK_DVERSION
)
1566 return local_wrlock_finish(static_cast<LocalLock
*>(lock
), mut
);
1568 dout(7) << "wrlock_finish on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1571 mut
->wrlocks
.erase(lock
);
1572 if (mut
->remote_wrlocks
.count(lock
) == 0)
1573 mut
->locks
.erase(lock
);
1576 if (!lock
->is_wrlocked()) {
1577 if (!lock
->is_stable())
1578 eval_gather(lock
, false, pneed_issue
);
1579 else if (lock
->get_parent()->is_auth())
1580 try_eval(lock
, pneed_issue
);
1587 void Locker::remote_wrlock_start(SimpleLock
*lock
, mds_rank_t target
, MDRequestRef
& mut
)
1589 dout(7) << "remote_wrlock_start mds." << target
<< " on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1591 // wait for active target
1592 if (mds
->is_cluster_degraded() &&
1593 !mds
->mdsmap
->is_clientreplay_or_active_or_stopping(target
)) {
1594 dout(7) << " mds." << target
<< " is not active" << dendl
;
1595 if (mut
->more()->waiting_on_slave
.empty())
1596 mds
->wait_for_active_peer(target
, new C_MDS_RetryRequest(mdcache
, mut
));
1600 // send lock request
1601 mut
->start_locking(lock
, target
);
1602 mut
->more()->slaves
.insert(target
);
1603 MMDSSlaveRequest
*r
= new MMDSSlaveRequest(mut
->reqid
, mut
->attempt
,
1604 MMDSSlaveRequest::OP_WRLOCK
);
1605 r
->set_lock_type(lock
->get_type());
1606 lock
->get_parent()->set_object_info(r
->get_object_info());
1607 mds
->send_message_mds(r
, target
);
1609 assert(mut
->more()->waiting_on_slave
.count(target
) == 0);
1610 mut
->more()->waiting_on_slave
.insert(target
);
1613 void Locker::remote_wrlock_finish(SimpleLock
*lock
, mds_rank_t target
,
1617 mut
->remote_wrlocks
.erase(lock
);
1618 if (mut
->wrlocks
.count(lock
) == 0)
1619 mut
->locks
.erase(lock
);
1621 dout(7) << "remote_wrlock_finish releasing remote wrlock on mds." << target
1622 << " " << *lock
->get_parent() << dendl
;
1623 if (!mds
->is_cluster_degraded() ||
1624 mds
->mdsmap
->get_state(target
) >= MDSMap::STATE_REJOIN
) {
1625 MMDSSlaveRequest
*slavereq
= new MMDSSlaveRequest(mut
->reqid
, mut
->attempt
,
1626 MMDSSlaveRequest::OP_UNWRLOCK
);
1627 slavereq
->set_lock_type(lock
->get_type());
1628 lock
->get_parent()->set_object_info(slavereq
->get_object_info());
1629 mds
->send_message_mds(slavereq
, target
);
1634 // ------------------
1637 bool Locker::xlock_start(SimpleLock
*lock
, MDRequestRef
& mut
)
1639 if (lock
->get_type() == CEPH_LOCK_IVERSION
||
1640 lock
->get_type() == CEPH_LOCK_DVERSION
)
1641 return local_xlock_start(static_cast<LocalLock
*>(lock
), mut
);
1643 dout(7) << "xlock_start on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1644 client_t client
= mut
->get_client();
1646 CInode
*in
= nullptr;
1647 if (lock
->get_cap_shift())
1648 in
= static_cast<CInode
*>(lock
->get_parent());
1651 if (lock
->get_parent()->is_auth()) {
1654 if (lock
->can_xlock(client
) &&
1655 !(lock
->get_state() == LOCK_LOCK_XLOCK
&& // client is not xlocker or
1656 in
&& in
->issued_caps_need_gather(lock
))) { // xlocker does not hold shared cap
1657 lock
->set_state(LOCK_XLOCK
);
1658 lock
->get_xlock(mut
, client
);
1659 mut
->xlocks
.insert(lock
);
1660 mut
->locks
.insert(lock
);
1661 mut
->finish_locking(lock
);
1665 if (lock
->get_type() == CEPH_LOCK_IFILE
&&
1666 in
->state_test(CInode::STATE_RECOVERING
)) {
1667 mds
->mdcache
->recovery_queue
.prioritize(in
);
1670 if (!lock
->is_stable() && (lock
->get_state() != LOCK_XLOCKDONE
||
1671 lock
->get_xlock_by_client() != client
||
1672 lock
->is_waiter_for(SimpleLock::WAIT_STABLE
)))
1675 if (lock
->get_state() == LOCK_LOCK
|| lock
->get_state() == LOCK_XLOCKDONE
) {
1676 mut
->start_locking(lock
);
1683 lock
->add_waiter(SimpleLock::WAIT_WR
|SimpleLock::WAIT_STABLE
, new C_MDS_RetryRequest(mdcache
, mut
));
1688 assert(lock
->get_sm()->can_remote_xlock
);
1689 assert(!mut
->slave_request
);
1691 // wait for single auth
1692 if (lock
->get_parent()->is_ambiguous_auth()) {
1693 lock
->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
,
1694 new C_MDS_RetryRequest(mdcache
, mut
));
1698 // wait for active auth
1699 mds_rank_t auth
= lock
->get_parent()->authority().first
;
1700 if (mds
->is_cluster_degraded() &&
1701 !mds
->mdsmap
->is_clientreplay_or_active_or_stopping(auth
)) {
1702 dout(7) << " mds." << auth
<< " is not active" << dendl
;
1703 if (mut
->more()->waiting_on_slave
.empty())
1704 mds
->wait_for_active_peer(auth
, new C_MDS_RetryRequest(mdcache
, mut
));
1708 // send lock request
1709 mut
->more()->slaves
.insert(auth
);
1710 mut
->start_locking(lock
, auth
);
1711 MMDSSlaveRequest
*r
= new MMDSSlaveRequest(mut
->reqid
, mut
->attempt
,
1712 MMDSSlaveRequest::OP_XLOCK
);
1713 r
->set_lock_type(lock
->get_type());
1714 lock
->get_parent()->set_object_info(r
->get_object_info());
1715 mds
->send_message_mds(r
, auth
);
1717 assert(mut
->more()->waiting_on_slave
.count(auth
) == 0);
1718 mut
->more()->waiting_on_slave
.insert(auth
);
1724 void Locker::_finish_xlock(SimpleLock
*lock
, client_t xlocker
, bool *pneed_issue
)
1726 assert(!lock
->is_stable());
1727 if (lock
->get_num_rdlocks() == 0 &&
1728 lock
->get_num_wrlocks() == 0 &&
1729 !lock
->is_leased() &&
1730 lock
->get_state() != LOCK_XLOCKSNAP
&&
1731 lock
->get_type() != CEPH_LOCK_DN
) {
1732 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
1733 client_t loner
= in
->get_target_loner();
1734 if (loner
>= 0 && (xlocker
< 0 || xlocker
== loner
)) {
1735 lock
->set_state(LOCK_EXCL
);
1736 lock
->get_parent()->auth_unpin(lock
);
1737 lock
->finish_waiters(SimpleLock::WAIT_STABLE
|SimpleLock::WAIT_WR
|SimpleLock::WAIT_RD
);
1738 if (lock
->get_cap_shift())
1739 *pneed_issue
= true;
1740 if (lock
->get_parent()->is_auth() &&
1742 try_eval(lock
, pneed_issue
);
1746 // the xlocker may have CEPH_CAP_GSHARED, need to revoke it if next state is LOCK_LOCK
1747 eval_gather(lock
, lock
->get_state() != LOCK_XLOCKSNAP
, pneed_issue
);
1750 void Locker::xlock_finish(SimpleLock
*lock
, MutationImpl
*mut
, bool *pneed_issue
)
1752 if (lock
->get_type() == CEPH_LOCK_IVERSION
||
1753 lock
->get_type() == CEPH_LOCK_DVERSION
)
1754 return local_xlock_finish(static_cast<LocalLock
*>(lock
), mut
);
1756 dout(10) << "xlock_finish on " << *lock
<< " " << *lock
->get_parent() << dendl
;
1758 client_t xlocker
= lock
->get_xlock_by_client();
1763 mut
->xlocks
.erase(lock
);
1764 mut
->locks
.erase(lock
);
1766 bool do_issue
= false;
1769 if (!lock
->get_parent()->is_auth()) {
1770 assert(lock
->get_sm()->can_remote_xlock
);
1773 dout(7) << "xlock_finish releasing remote xlock on " << *lock
->get_parent() << dendl
;
1774 mds_rank_t auth
= lock
->get_parent()->authority().first
;
1775 if (!mds
->is_cluster_degraded() ||
1776 mds
->mdsmap
->get_state(auth
) >= MDSMap::STATE_REJOIN
) {
1777 MMDSSlaveRequest
*slavereq
= new MMDSSlaveRequest(mut
->reqid
, mut
->attempt
,
1778 MMDSSlaveRequest::OP_UNXLOCK
);
1779 slavereq
->set_lock_type(lock
->get_type());
1780 lock
->get_parent()->set_object_info(slavereq
->get_object_info());
1781 mds
->send_message_mds(slavereq
, auth
);
1784 lock
->finish_waiters(SimpleLock::WAIT_STABLE
|
1785 SimpleLock::WAIT_WR
|
1786 SimpleLock::WAIT_RD
, 0);
1788 if (lock
->get_num_xlocks() == 0 &&
1789 lock
->get_state() != LOCK_LOCK_XLOCK
) { // no one is taking xlock
1790 _finish_xlock(lock
, xlocker
, &do_issue
);
1795 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
1796 if (in
->is_head()) {
1798 *pneed_issue
= true;
1805 void Locker::xlock_export(SimpleLock
*lock
, MutationImpl
*mut
)
1807 dout(10) << "xlock_export on " << *lock
<< " " << *lock
->get_parent() << dendl
;
1810 mut
->xlocks
.erase(lock
);
1811 mut
->locks
.erase(lock
);
1813 MDSCacheObject
*p
= lock
->get_parent();
1814 assert(p
->state_test(CInode::STATE_AMBIGUOUSAUTH
)); // we are exporting this (inode)
1816 if (!lock
->is_stable())
1817 lock
->get_parent()->auth_unpin(lock
);
1819 lock
->set_state(LOCK_LOCK
);
1822 void Locker::xlock_import(SimpleLock
*lock
)
1824 dout(10) << "xlock_import on " << *lock
<< " " << *lock
->get_parent() << dendl
;
1825 lock
->get_parent()->auth_pin(lock
);
1830 // file i/o -----------------------------------------
1832 version_t
Locker::issue_file_data_version(CInode
*in
)
1834 dout(7) << "issue_file_data_version on " << *in
<< dendl
;
1835 return in
->inode
.file_data_version
;
1838 class C_Locker_FileUpdate_finish
: public LockerLogContext
{
1846 C_Locker_FileUpdate_finish(Locker
*l
, CInode
*i
, MutationRef
& m
,
1847 bool sm
=false, bool ni
=false, client_t c
=-1,
1848 MClientCaps
*ac
= 0)
1849 : LockerLogContext(l
), in(i
), mut(m
), share_max(sm
), need_issue(ni
),
1850 client(c
), ack(ac
) {
1851 in
->get(CInode::PIN_PTRWAITER
);
1853 void finish(int r
) override
{
1854 locker
->file_update_finish(in
, mut
, share_max
, need_issue
, client
, ack
);
1855 in
->put(CInode::PIN_PTRWAITER
);
1859 void Locker::file_update_finish(CInode
*in
, MutationRef
& mut
, bool share_max
, bool issue_client_cap
,
1860 client_t client
, MClientCaps
*ack
)
1862 dout(10) << "file_update_finish on " << *in
<< dendl
;
1863 in
->pop_and_dirty_projected_inode(mut
->ls
);
1868 Session
*session
= mds
->get_session(client
);
1870 // "oldest flush tid" > 0 means client uses unique TID for each flush
1871 if (ack
->get_oldest_flush_tid() > 0)
1872 session
->add_completed_flush(ack
->get_client_tid());
1873 mds
->send_message_client_counted(ack
, session
);
1875 dout(10) << " no session for client." << client
<< " " << *ack
<< dendl
;
1880 set
<CInode
*> need_issue
;
1881 drop_locks(mut
.get(), &need_issue
);
1883 if (!in
->is_head() && !in
->client_snap_caps
.empty()) {
1884 dout(10) << " client_snap_caps " << in
->client_snap_caps
<< dendl
;
1885 // check for snap writeback completion
1886 bool gather
= false;
1887 auto p
= in
->client_snap_caps
.begin();
1888 while (p
!= in
->client_snap_caps
.end()) {
1889 SimpleLock
*lock
= in
->get_lock(p
->first
);
1891 dout(10) << " completing client_snap_caps for " << ccap_string(p
->first
)
1892 << " lock " << *lock
<< " on " << *in
<< dendl
;
1895 p
->second
.erase(client
);
1896 if (p
->second
.empty()) {
1898 in
->client_snap_caps
.erase(p
++);
1903 if (in
->client_snap_caps
.empty())
1904 in
->item_open_file
.remove_myself();
1905 eval_cap_gather(in
, &need_issue
);
1908 if (issue_client_cap
&& need_issue
.count(in
) == 0) {
1909 Capability
*cap
= in
->get_client_cap(client
);
1910 if (cap
&& (cap
->wanted() & ~cap
->pending()))
1911 issue_caps(in
, cap
);
1914 if (share_max
&& in
->is_auth() &&
1915 (in
->filelock
.gcaps_allowed(CAP_LONER
) & (CEPH_CAP_GWR
|CEPH_CAP_GBUFFER
)))
1916 share_inode_max_size(in
);
1918 issue_caps_set(need_issue
);
1920 utime_t now
= ceph_clock_now();
1921 mds
->balancer
->hit_inode(now
, in
, META_POP_IWR
);
1923 // auth unpin after issuing caps
1927 Capability
* Locker::issue_new_caps(CInode
*in
,
1933 dout(7) << "issue_new_caps for mode " << mode
<< " on " << *in
<< dendl
;
1936 // if replay, try to reconnect cap, and otherwise do nothing.
1938 return mds
->mdcache
->try_reconnect_cap(in
, session
);
1942 assert(session
->info
.inst
.name
.is_client());
1943 client_t my_client
= session
->info
.inst
.name
.num();
1944 int my_want
= ceph_caps_for_mode(mode
);
1946 // register a capability
1947 Capability
*cap
= in
->get_client_cap(my_client
);
1950 cap
= in
->add_client_cap(my_client
, session
, realm
);
1951 cap
->set_wanted(my_want
);
1953 cap
->inc_suppress(); // suppress file cap messages for new cap (we'll bundle with the open() reply)
1957 // make sure it wants sufficient caps
1958 if (my_want
& ~cap
->wanted()) {
1959 // augment wanted caps for this client
1960 cap
->set_wanted(cap
->wanted() | my_want
);
1964 if (in
->is_auth()) {
1965 // [auth] twiddle mode?
1966 eval(in
, CEPH_CAP_LOCKS
);
1968 if (_need_flush_mdlog(in
, my_want
))
1969 mds
->mdlog
->flush();
1972 // [replica] tell auth about any new caps wanted
1973 request_inode_file_caps(in
);
1976 // issue caps (pot. incl new one)
1977 //issue_caps(in); // note: _eval above may have done this already...
1979 // re-issue whatever we can
1980 //cap->issue(cap->pending());
1983 cap
->dec_suppress();
1989 void Locker::issue_caps_set(set
<CInode
*>& inset
)
1991 for (set
<CInode
*>::iterator p
= inset
.begin(); p
!= inset
.end(); ++p
)
1995 bool Locker::issue_caps(CInode
*in
, Capability
*only_cap
)
1997 // allowed caps are determined by the lock mode.
1998 int all_allowed
= in
->get_caps_allowed_by_type(CAP_ANY
);
1999 int loner_allowed
= in
->get_caps_allowed_by_type(CAP_LONER
);
2000 int xlocker_allowed
= in
->get_caps_allowed_by_type(CAP_XLOCKER
);
2002 client_t loner
= in
->get_loner();
2004 dout(7) << "issue_caps loner client." << loner
2005 << " allowed=" << ccap_string(loner_allowed
)
2006 << ", xlocker allowed=" << ccap_string(xlocker_allowed
)
2007 << ", others allowed=" << ccap_string(all_allowed
)
2008 << " on " << *in
<< dendl
;
2010 dout(7) << "issue_caps allowed=" << ccap_string(all_allowed
)
2011 << ", xlocker allowed=" << ccap_string(xlocker_allowed
)
2012 << " on " << *in
<< dendl
;
2015 assert(in
->is_head());
2017 // count conflicts with
2021 map
<client_t
, Capability
*>::iterator it
;
2023 it
= in
->client_caps
.find(only_cap
->get_client());
2025 it
= in
->client_caps
.begin();
2026 for (; it
!= in
->client_caps
.end(); ++it
) {
2027 Capability
*cap
= it
->second
;
2028 if (cap
->is_stale())
2031 // do not issue _new_ bits when size|mtime is projected
2033 if (loner
== it
->first
)
2034 allowed
= loner_allowed
;
2036 allowed
= all_allowed
;
2038 // add in any xlocker-only caps (for locks this client is the xlocker for)
2039 allowed
|= xlocker_allowed
& in
->get_xlocker_mask(it
->first
);
2041 Session
*session
= mds
->get_session(it
->first
);
2042 if (in
->inode
.inline_data
.version
!= CEPH_INLINE_NONE
&&
2043 !(session
&& session
->connection
&&
2044 session
->connection
->has_feature(CEPH_FEATURE_MDS_INLINE_DATA
)))
2045 allowed
&= ~(CEPH_CAP_FILE_RD
| CEPH_CAP_FILE_WR
);
2047 int pending
= cap
->pending();
2048 int wanted
= cap
->wanted();
2050 dout(20) << " client." << it
->first
2051 << " pending " << ccap_string(pending
)
2052 << " allowed " << ccap_string(allowed
)
2053 << " wanted " << ccap_string(wanted
)
2056 if (!(pending
& ~allowed
)) {
2057 // skip if suppress or new, and not revocation
2058 if (cap
->is_new() || cap
->is_suppress()) {
2059 dout(20) << " !revoke and new|suppressed, skipping client." << it
->first
<< dendl
;
2064 // notify clients about deleted inode, to make sure they release caps ASAP.
2065 if (in
->inode
.nlink
== 0)
2066 wanted
|= CEPH_CAP_LINK_SHARED
;
2068 // are there caps that the client _wants_ and can have, but aren't pending?
2069 // or do we need to revoke?
2070 if (((wanted
& allowed
) & ~pending
) || // missing wanted+allowed caps
2071 (pending
& ~allowed
)) { // need to revoke ~allowed caps.
2075 // include caps that clients generally like, while we're at it.
2076 int likes
= in
->get_caps_liked();
2077 int before
= pending
;
2079 if (pending
& ~allowed
)
2080 seq
= cap
->issue((wanted
|likes
) & allowed
& pending
); // if revoking, don't issue anything new.
2082 seq
= cap
->issue((wanted
|likes
) & allowed
);
2083 int after
= cap
->pending();
2085 if (cap
->is_new()) {
2086 // haven't send caps to client yet
2087 if (before
& ~after
)
2088 cap
->confirm_receipt(seq
, after
);
2090 dout(7) << " sending MClientCaps to client." << it
->first
2091 << " seq " << cap
->get_last_seq()
2092 << " new pending " << ccap_string(after
) << " was " << ccap_string(before
)
2095 int op
= (before
& ~after
) ? CEPH_CAP_OP_REVOKE
: CEPH_CAP_OP_GRANT
;
2096 if (op
== CEPH_CAP_OP_REVOKE
) {
2097 revoking_caps
.push_back(&cap
->item_revoking_caps
);
2098 revoking_caps_by_client
[cap
->get_client()].push_back(&cap
->item_client_revoking_caps
);
2099 cap
->set_last_revoke_stamp(ceph_clock_now());
2100 cap
->reset_num_revoke_warnings();
2103 MClientCaps
*m
= new MClientCaps(op
, in
->ino(),
2104 in
->find_snaprealm()->inode
->ino(),
2105 cap
->get_cap_id(), cap
->get_last_seq(),
2108 mds
->get_osd_epoch_barrier());
2109 in
->encode_cap_message(m
, cap
);
2111 mds
->send_message_client_counted(m
, it
->first
);
2119 return (nissued
== 0); // true if no re-issued, no callbacks
2122 void Locker::issue_truncate(CInode
*in
)
2124 dout(7) << "issue_truncate on " << *in
<< dendl
;
2126 for (map
<client_t
, Capability
*>::iterator it
= in
->client_caps
.begin();
2127 it
!= in
->client_caps
.end();
2129 Capability
*cap
= it
->second
;
2130 MClientCaps
*m
= new MClientCaps(CEPH_CAP_OP_TRUNC
,
2132 in
->find_snaprealm()->inode
->ino(),
2133 cap
->get_cap_id(), cap
->get_last_seq(),
2134 cap
->pending(), cap
->wanted(), 0,
2136 mds
->get_osd_epoch_barrier());
2137 in
->encode_cap_message(m
, cap
);
2138 mds
->send_message_client_counted(m
, it
->first
);
2141 // should we increase max_size?
2142 if (in
->is_auth() && in
->is_file())
2143 check_inode_max_size(in
);
2146 void Locker::revoke_stale_caps(Session
*session
)
2148 dout(10) << "revoke_stale_caps for " << session
->info
.inst
.name
<< dendl
;
2150 std::vector
<CInode
*> to_eval
;
2152 for (auto p
= session
->caps
.begin(); !p
.end(); ) {
2153 Capability
*cap
= *p
;
2155 if (!cap
->is_notable()) {
2156 // the rest ones are not being revoked and don't have writeable range
2157 // and don't want exclusive caps or want file read/write. They don't
2158 // need recover, they don't affect eval_gather()/try_eval()
2162 int issued
= cap
->issued();
2163 if (!(issued
& ~CEPH_CAP_PIN
))
2166 CInode
*in
= cap
->get_inode();
2167 dout(10) << " revoking " << ccap_string(issued
) << " on " << *in
<< dendl
;
2170 if (in
->is_auth() &&
2171 in
->inode
.client_ranges
.count(cap
->get_client()))
2172 in
->state_set(CInode::STATE_NEEDSRECOVER
);
2174 // eval lock/inode may finish contexts, which may modify other cap's position
2175 // in the session->caps.
2176 to_eval
.push_back(in
);
2179 // invalidate the rest
2180 session
->inc_cap_gen();
2182 for (auto in
: to_eval
) {
2183 if (in
->state_test(CInode::STATE_EXPORTINGCAPS
))
2186 if (!in
->filelock
.is_stable())
2187 eval_gather(&in
->filelock
);
2188 if (!in
->linklock
.is_stable())
2189 eval_gather(&in
->linklock
);
2190 if (!in
->authlock
.is_stable())
2191 eval_gather(&in
->authlock
);
2192 if (!in
->xattrlock
.is_stable())
2193 eval_gather(&in
->xattrlock
);
2196 try_eval(in
, CEPH_CAP_LOCKS
);
2198 request_inode_file_caps(in
);
2202 void Locker::resume_stale_caps(Session
*session
)
2204 dout(10) << "resume_stale_caps for " << session
->info
.inst
.name
<< dendl
;
2206 for (xlist
<Capability
*>::iterator p
= session
->caps
.begin(); !p
.end(); ) {
2207 Capability
*cap
= *p
;
2209 if (!cap
->is_notable())
2210 break; // see revoke_stale_caps()
2212 CInode
*in
= cap
->get_inode();
2213 ceph_assert(in
->is_head());
2214 dout(10) << " clearing stale flag on " << *in
<< dendl
;
2216 if (in
->state_test(CInode::STATE_EXPORTINGCAPS
)) {
2217 // if export succeeds, the cap will be removed. if export fails,
2218 // we need to re-issue the cap if it's not stale.
2219 in
->state_set(CInode::STATE_EVALSTALECAPS
);
2223 if (!in
->is_auth() || !eval(in
, CEPH_CAP_LOCKS
))
2224 issue_caps(in
, cap
);
2228 void Locker::remove_stale_leases(Session
*session
)
2230 dout(10) << "remove_stale_leases for " << session
->info
.inst
.name
<< dendl
;
2231 xlist
<ClientLease
*>::iterator p
= session
->leases
.begin();
2233 ClientLease
*l
= *p
;
2235 CDentry
*parent
= static_cast<CDentry
*>(l
->parent
);
2236 dout(15) << " removing lease on " << *parent
<< dendl
;
2237 parent
->remove_client_lease(l
, this);
2242 class C_MDL_RequestInodeFileCaps
: public LockerContext
{
2245 C_MDL_RequestInodeFileCaps(Locker
*l
, CInode
*i
) : LockerContext(l
), in(i
) {
2246 in
->get(CInode::PIN_PTRWAITER
);
2248 void finish(int r
) override
{
2250 locker
->request_inode_file_caps(in
);
2251 in
->put(CInode::PIN_PTRWAITER
);
2255 void Locker::request_inode_file_caps(CInode
*in
)
2257 assert(!in
->is_auth());
2259 int wanted
= in
->get_caps_wanted() & in
->get_caps_allowed_ever() & ~CEPH_CAP_PIN
;
2260 if (wanted
!= in
->replica_caps_wanted
) {
2261 // wait for single auth
2262 if (in
->is_ambiguous_auth()) {
2263 in
->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
,
2264 new C_MDL_RequestInodeFileCaps(this, in
));
2268 mds_rank_t auth
= in
->authority().first
;
2269 if (mds
->is_cluster_degraded() &&
2270 mds
->mdsmap
->get_state(auth
) == MDSMap::STATE_REJOIN
) {
2271 mds
->wait_for_active_peer(auth
, new C_MDL_RequestInodeFileCaps(this, in
));
2275 dout(7) << "request_inode_file_caps " << ccap_string(wanted
)
2276 << " was " << ccap_string(in
->replica_caps_wanted
)
2277 << " on " << *in
<< " to mds." << auth
<< dendl
;
2279 in
->replica_caps_wanted
= wanted
;
2281 if (!mds
->is_cluster_degraded() ||
2282 mds
->mdsmap
->is_clientreplay_or_active_or_stopping(auth
))
2283 mds
->send_message_mds(new MInodeFileCaps(in
->ino(), in
->replica_caps_wanted
),
2288 /* This function DOES put the passed message before returning */
2289 void Locker::handle_inode_file_caps(MInodeFileCaps
*m
)
2291 // nobody should be talking to us during recovery.
2292 if (mds
->get_state() < MDSMap::STATE_CLIENTREPLAY
) {
2293 if (mds
->get_want_state() >= MDSMap::STATE_CLIENTREPLAY
) {
2294 mds
->wait_for_replay(new C_MDS_RetryMessage(mds
, m
));
2297 assert(!"got unexpected message during recovery");
2301 CInode
*in
= mdcache
->get_inode(m
->get_ino());
2302 mds_rank_t from
= mds_rank_t(m
->get_source().num());
2305 assert(in
->is_auth());
2307 dout(7) << "handle_inode_file_caps replica mds." << from
<< " wants caps " << ccap_string(m
->get_caps()) << " on " << *in
<< dendl
;
2310 in
->mds_caps_wanted
[from
] = m
->get_caps();
2312 in
->mds_caps_wanted
.erase(from
);
2314 try_eval(in
, CEPH_CAP_LOCKS
);
2319 class C_MDL_CheckMaxSize
: public LockerContext
{
2321 uint64_t new_max_size
;
2326 C_MDL_CheckMaxSize(Locker
*l
, CInode
*i
, uint64_t _new_max_size
,
2327 uint64_t _newsize
, utime_t _mtime
) :
2328 LockerContext(l
), in(i
),
2329 new_max_size(_new_max_size
), newsize(_newsize
), mtime(_mtime
)
2331 in
->get(CInode::PIN_PTRWAITER
);
2333 void finish(int r
) override
{
2335 locker
->check_inode_max_size(in
, false, new_max_size
, newsize
, mtime
);
2336 in
->put(CInode::PIN_PTRWAITER
);
2340 uint64_t Locker::calc_new_max_size(CInode::mempool_inode
*pi
, uint64_t size
)
2342 uint64_t new_max
= (size
+ 1) << 1;
2343 uint64_t max_inc
= g_conf
->mds_client_writeable_range_max_inc_objs
;
2345 max_inc
*= pi
->layout
.object_size
;
2346 new_max
= MIN(new_max
, size
+ max_inc
);
2348 return ROUND_UP_TO(new_max
, pi
->get_layout_size_increment());
2351 void Locker::calc_new_client_ranges(CInode
*in
, uint64_t size
, bool update
,
2352 CInode::mempool_inode::client_range_map
*new_ranges
,
2353 bool *max_increased
)
2355 auto latest
= in
->get_projected_inode();
2357 if (latest
->has_layout()) {
2358 ms
= calc_new_max_size(latest
, size
);
2360 // Layout-less directories like ~mds0/, have zero size
2364 // increase ranges as appropriate.
2365 // shrink to 0 if no WR|BUFFER caps issued.
2366 for (map
<client_t
,Capability
*>::iterator p
= in
->client_caps
.begin();
2367 p
!= in
->client_caps
.end();
2369 if ((p
->second
->issued() | p
->second
->wanted()) & (CEPH_CAP_ANY_FILE_WR
)) {
2370 client_writeable_range_t
& nr
= (*new_ranges
)[p
->first
];
2372 if (latest
->client_ranges
.count(p
->first
)) {
2373 client_writeable_range_t
& oldr
= latest
->client_ranges
[p
->first
];
2374 if (ms
> oldr
.range
.last
)
2375 *max_increased
= true;
2376 nr
.range
.last
= MAX(ms
, oldr
.range
.last
);
2377 nr
.follows
= oldr
.follows
;
2379 *max_increased
= true;
2381 nr
.follows
= in
->first
- 1;
2384 p
->second
->mark_clientwriteable();
2387 p
->second
->clear_clientwriteable();
2392 bool Locker::check_inode_max_size(CInode
*in
, bool force_wrlock
,
2393 uint64_t new_max_size
, uint64_t new_size
,
2396 assert(in
->is_auth());
2397 assert(in
->is_file());
2399 CInode::mempool_inode
*latest
= in
->get_projected_inode();
2400 CInode::mempool_inode::client_range_map new_ranges
;
2401 uint64_t size
= latest
->size
;
2402 bool update_size
= new_size
> 0;
2403 bool update_max
= false;
2404 bool max_increased
= false;
2407 new_size
= size
= MAX(size
, new_size
);
2408 new_mtime
= MAX(new_mtime
, latest
->mtime
);
2409 if (latest
->size
== new_size
&& latest
->mtime
== new_mtime
)
2410 update_size
= false;
2414 if (in
->is_frozen()) {
2416 } else if (!force_wrlock
&& !in
->filelock
.can_wrlock(in
->get_loner())) {
2418 if (in
->filelock
.is_stable()) {
2419 if (in
->get_target_loner() >= 0)
2420 file_excl(&in
->filelock
);
2422 simple_lock(&in
->filelock
);
2424 if (!in
->filelock
.can_wrlock(in
->get_loner()))
2428 calc_new_client_ranges(in
, std::max(new_max_size
, size
), can_update
> 0,
2429 &new_ranges
, &max_increased
);
2431 if (max_increased
|| latest
->client_ranges
!= new_ranges
)
2434 if (!update_size
&& !update_max
) {
2435 dout(20) << "check_inode_max_size no-op on " << *in
<< dendl
;
2439 dout(10) << "check_inode_max_size new_ranges " << new_ranges
2440 << " update_size " << update_size
2441 << " on " << *in
<< dendl
;
2443 if (can_update
< 0) {
2444 auto cms
= new C_MDL_CheckMaxSize(this, in
, new_max_size
, new_size
, new_mtime
);
2445 if (can_update
== -1) {
2446 dout(10) << "check_inode_max_size frozen, waiting on " << *in
<< dendl
;
2447 in
->add_waiter(CInode::WAIT_UNFREEZE
, cms
);
2449 in
->filelock
.add_waiter(SimpleLock::WAIT_STABLE
, cms
);
2450 dout(10) << "check_inode_max_size can't wrlock, waiting on " << *in
<< dendl
;
2455 MutationRef
mut(new MutationImpl());
2456 mut
->ls
= mds
->mdlog
->get_current_segment();
2458 auto &pi
= in
->project_inode();
2459 pi
.inode
.version
= in
->pre_dirty();
2462 dout(10) << "check_inode_max_size client_ranges " << pi
.inode
.client_ranges
<< " -> " << new_ranges
<< dendl
;
2463 pi
.inode
.client_ranges
= new_ranges
;
2467 dout(10) << "check_inode_max_size size " << pi
.inode
.size
<< " -> " << new_size
<< dendl
;
2468 pi
.inode
.size
= new_size
;
2469 pi
.inode
.rstat
.rbytes
= new_size
;
2470 dout(10) << "check_inode_max_size mtime " << pi
.inode
.mtime
<< " -> " << new_mtime
<< dendl
;
2471 pi
.inode
.mtime
= new_mtime
;
2472 if (new_mtime
> pi
.inode
.ctime
) {
2473 pi
.inode
.ctime
= new_mtime
;
2474 if (new_mtime
> pi
.inode
.rstat
.rctime
)
2475 pi
.inode
.rstat
.rctime
= new_mtime
;
2479 // use EOpen if the file is still open; otherwise, use EUpdate.
2480 // this is just an optimization to push open files forward into
2481 // newer log segments.
2483 EMetaBlob
*metablob
;
2484 if (in
->is_any_caps_wanted() && in
->last
== CEPH_NOSNAP
) {
2485 EOpen
*eo
= new EOpen(mds
->mdlog
);
2486 eo
->add_ino(in
->ino());
2487 metablob
= &eo
->metablob
;
2489 mut
->ls
->open_files
.push_back(&in
->item_open_file
);
2491 EUpdate
*eu
= new EUpdate(mds
->mdlog
, "check_inode_max_size");
2492 metablob
= &eu
->metablob
;
2495 mds
->mdlog
->start_entry(le
);
2496 if (update_size
) { // FIXME if/when we do max_size nested accounting
2497 mdcache
->predirty_journal_parents(mut
, metablob
, in
, 0, PREDIRTY_PRIMARY
);
2499 CDentry
*parent
= in
->get_projected_parent_dn();
2500 metablob
->add_primary_dentry(parent
, in
, true);
2502 metablob
->add_dir_context(in
->get_projected_parent_dn()->get_dir());
2503 mdcache
->journal_dirty_inode(mut
.get(), metablob
, in
);
2505 mds
->mdlog
->submit_entry(le
,
2506 new C_Locker_FileUpdate_finish(this, in
, mut
, true));
2507 wrlock_force(&in
->filelock
, mut
); // wrlock for duration of journal
2510 // make max_size _increase_ timely
2512 mds
->mdlog
->flush();
2518 void Locker::share_inode_max_size(CInode
*in
, Capability
*only_cap
)
2521 * only share if currently issued a WR cap. if client doesn't have it,
2522 * file_max doesn't matter, and the client will get it if/when they get
2525 dout(10) << "share_inode_max_size on " << *in
<< dendl
;
2526 map
<client_t
, Capability
*>::iterator it
;
2528 it
= in
->client_caps
.find(only_cap
->get_client());
2530 it
= in
->client_caps
.begin();
2531 for (; it
!= in
->client_caps
.end(); ++it
) {
2532 const client_t client
= it
->first
;
2533 Capability
*cap
= it
->second
;
2534 if (cap
->is_suppress())
2536 if (cap
->pending() & (CEPH_CAP_FILE_WR
|CEPH_CAP_FILE_BUFFER
)) {
2537 dout(10) << "share_inode_max_size with client." << client
<< dendl
;
2538 cap
->inc_last_seq();
2539 MClientCaps
*m
= new MClientCaps(CEPH_CAP_OP_GRANT
,
2541 in
->find_snaprealm()->inode
->ino(),
2542 cap
->get_cap_id(), cap
->get_last_seq(),
2543 cap
->pending(), cap
->wanted(), 0,
2545 mds
->get_osd_epoch_barrier());
2546 in
->encode_cap_message(m
, cap
);
2547 mds
->send_message_client_counted(m
, client
);
2554 bool Locker::_need_flush_mdlog(CInode
*in
, int wanted
)
2556 /* flush log if caps are wanted by client but corresponding lock is unstable and locked by
2557 * pending mutations. */
2558 if (((wanted
& (CEPH_CAP_FILE_RD
|CEPH_CAP_FILE_WR
|CEPH_CAP_FILE_SHARED
|CEPH_CAP_FILE_EXCL
)) &&
2559 in
->filelock
.is_unstable_and_locked()) ||
2560 ((wanted
& (CEPH_CAP_AUTH_SHARED
|CEPH_CAP_AUTH_EXCL
)) &&
2561 in
->authlock
.is_unstable_and_locked()) ||
2562 ((wanted
& (CEPH_CAP_LINK_SHARED
|CEPH_CAP_LINK_EXCL
)) &&
2563 in
->linklock
.is_unstable_and_locked()) ||
2564 ((wanted
& (CEPH_CAP_XATTR_SHARED
|CEPH_CAP_XATTR_EXCL
)) &&
2565 in
->xattrlock
.is_unstable_and_locked()))
2570 void Locker::adjust_cap_wanted(Capability
*cap
, int wanted
, int issue_seq
)
2572 if (ceph_seq_cmp(issue_seq
, cap
->get_last_issue()) == 0) {
2573 dout(10) << " wanted " << ccap_string(cap
->wanted())
2574 << " -> " << ccap_string(wanted
) << dendl
;
2575 cap
->set_wanted(wanted
);
2576 } else if (wanted
& ~cap
->wanted()) {
2577 dout(10) << " wanted " << ccap_string(cap
->wanted())
2578 << " -> " << ccap_string(wanted
)
2579 << " (added caps even though we had seq mismatch!)" << dendl
;
2580 cap
->set_wanted(wanted
| cap
->wanted());
2582 dout(10) << " NOT changing wanted " << ccap_string(cap
->wanted())
2583 << " -> " << ccap_string(wanted
)
2584 << " (issue_seq " << issue_seq
<< " != last_issue "
2585 << cap
->get_last_issue() << ")" << dendl
;
2589 CInode
*cur
= cap
->get_inode();
2590 if (!cur
->is_auth()) {
2591 request_inode_file_caps(cur
);
2595 if (cap
->wanted() == 0) {
2596 if (cur
->item_open_file
.is_on_list() &&
2597 !cur
->is_any_caps_wanted()) {
2598 dout(10) << " removing unwanted file from open file list " << *cur
<< dendl
;
2599 cur
->item_open_file
.remove_myself();
2602 if (cur
->state_test(CInode::STATE_RECOVERING
) &&
2603 (cap
->wanted() & (CEPH_CAP_FILE_RD
|
2604 CEPH_CAP_FILE_WR
))) {
2605 mds
->mdcache
->recovery_queue
.prioritize(cur
);
2608 if (!cur
->item_open_file
.is_on_list()) {
2609 dout(10) << " adding to open file list " << *cur
<< dendl
;
2610 assert(cur
->last
== CEPH_NOSNAP
);
2611 LogSegment
*ls
= mds
->mdlog
->get_current_segment();
2612 EOpen
*le
= new EOpen(mds
->mdlog
);
2613 mds
->mdlog
->start_entry(le
);
2614 le
->add_clean_inode(cur
);
2615 ls
->open_files
.push_back(&cur
->item_open_file
);
2616 mds
->mdlog
->submit_entry(le
);
2623 void Locker::_do_null_snapflush(CInode
*head_in
, client_t client
, snapid_t last
)
2625 dout(10) << "_do_null_snapflush client." << client
<< " on " << *head_in
<< dendl
;
2626 for (auto p
= head_in
->client_need_snapflush
.begin();
2627 p
!= head_in
->client_need_snapflush
.end() && p
->first
< last
; ) {
2628 snapid_t snapid
= p
->first
;
2629 auto &clients
= p
->second
;
2630 ++p
; // be careful, q loop below depends on this
2632 if (clients
.count(client
)) {
2633 dout(10) << " doing async NULL snapflush on " << snapid
<< " from client." << client
<< dendl
;
2634 CInode
*sin
= mdcache
->pick_inode_snap(head_in
, snapid
- 1);
2636 assert(sin
->first
<= snapid
);
2637 _do_snap_update(sin
, snapid
, 0, sin
->first
- 1, client
, NULL
, NULL
);
2638 head_in
->remove_need_snapflush(sin
, snapid
, client
);
2644 bool Locker::should_defer_client_cap_frozen(CInode
*in
)
2647 * This policy needs to be AT LEAST as permissive as allowing a client request
2648 * to go forward, or else a client request can release something, the release
2649 * gets deferred, but the request gets processed and deadlocks because when the
2650 * caps can't get revoked.
2652 * Currently, a request wait if anything locked is freezing (can't
2653 * auth_pin), which would avoid any deadlock with cap release. Thus @in
2654 * _MUST_ be in the lock/auth_pin set.
2656 * auth_pins==0 implies no unstable lock and not auth pinnned by
2657 * client request, otherwise continue even it's freezing.
2659 return (in
->is_freezing() && in
->get_num_auth_pins() == 0) || in
->is_frozen();
2663 * This function DOES put the passed message before returning
2665 void Locker::handle_client_caps(MClientCaps
*m
)
2667 client_t client
= m
->get_source().num();
2668 snapid_t follows
= m
->get_snap_follows();
2669 dout(7) << "handle_client_caps "
2670 << ((m
->flags
& CLIENT_CAPS_SYNC
) ? "sync" : "async")
2671 << " on " << m
->get_ino()
2672 << " tid " << m
->get_client_tid() << " follows " << follows
2673 << " op " << ceph_cap_op_name(m
->get_op()) << dendl
;
2675 Session
*session
= mds
->get_session(m
);
2676 if (!mds
->is_clientreplay() && !mds
->is_active() && !mds
->is_stopping()) {
2678 dout(5) << " no session, dropping " << *m
<< dendl
;
2682 if (session
->is_closed() ||
2683 session
->is_closing() ||
2684 session
->is_killing()) {
2685 dout(7) << " session closed|closing|killing, dropping " << *m
<< dendl
;
2689 if (mds
->is_reconnect() &&
2690 m
->get_dirty() && m
->get_client_tid() > 0 &&
2691 !session
->have_completed_flush(m
->get_client_tid())) {
2692 mdcache
->set_reconnected_dirty_caps(client
, m
->get_ino(), m
->get_dirty());
2694 mds
->wait_for_replay(new C_MDS_RetryMessage(mds
, m
));
2698 if (m
->get_client_tid() > 0 && session
&&
2699 session
->have_completed_flush(m
->get_client_tid())) {
2700 dout(7) << "handle_client_caps already flushed tid " << m
->get_client_tid()
2701 << " for client." << client
<< dendl
;
2703 if (m
->get_op() == CEPH_CAP_OP_FLUSHSNAP
) {
2704 ack
= new MClientCaps(CEPH_CAP_OP_FLUSHSNAP_ACK
, m
->get_ino(), 0, 0, 0, 0, 0,
2705 m
->get_dirty(), 0, mds
->get_osd_epoch_barrier());
2707 ack
= new MClientCaps(CEPH_CAP_OP_FLUSH_ACK
, m
->get_ino(), 0, m
->get_cap_id(),
2708 m
->get_seq(), m
->get_caps(), 0, m
->get_dirty(), 0,
2709 mds
->get_osd_epoch_barrier());
2711 ack
->set_snap_follows(follows
);
2712 ack
->set_client_tid(m
->get_client_tid());
2713 mds
->send_message_client_counted(ack
, m
->get_connection());
2714 if (m
->get_op() == CEPH_CAP_OP_FLUSHSNAP
) {
2718 // fall-thru because the message may release some caps
2720 m
->set_op(CEPH_CAP_OP_UPDATE
);
2724 // "oldest flush tid" > 0 means client uses unique TID for each flush
2725 if (m
->get_oldest_flush_tid() > 0 && session
) {
2726 if (session
->trim_completed_flushes(m
->get_oldest_flush_tid())) {
2727 mds
->mdlog
->get_current_segment()->touched_sessions
.insert(session
->info
.inst
.name
);
2729 if (session
->get_num_trim_flushes_warnings() > 0 &&
2730 session
->get_num_completed_flushes() * 2 < g_conf
->mds_max_completed_flushes
)
2731 session
->reset_num_trim_flushes_warnings();
2733 if (session
->get_num_completed_flushes() >=
2734 (g_conf
->mds_max_completed_flushes
<< session
->get_num_trim_flushes_warnings())) {
2735 session
->inc_num_trim_flushes_warnings();
2737 ss
<< "client." << session
->get_client() << " does not advance its oldest_flush_tid ("
2738 << m
->get_oldest_flush_tid() << "), "
2739 << session
->get_num_completed_flushes()
2740 << " completed flushes recorded in session";
2741 mds
->clog
->warn() << ss
.str();
2742 dout(20) << __func__
<< " " << ss
.str() << dendl
;
2747 CInode
*head_in
= mdcache
->get_inode(m
->get_ino());
2749 if (mds
->is_clientreplay()) {
2750 dout(7) << "handle_client_caps on unknown ino " << m
->get_ino()
2751 << ", will try again after replayed client requests" << dendl
;
2752 mdcache
->wait_replay_cap_reconnect(m
->get_ino(), new C_MDS_RetryMessage(mds
, m
));
2757 * "handle_client_caps on unknown ino xxx” is normal after migrating a subtree
2758 * Sequence of events that cause this are:
2759 * - client sends caps message to mds.a
2760 * - mds finishes subtree migration, send cap export to client
2761 * - mds trim its cache
2762 * - mds receives cap messages from client
2764 dout(7) << "handle_client_caps on unknown ino " << m
->get_ino() << ", dropping" << dendl
;
2769 if (m
->osd_epoch_barrier
&& !mds
->objecter
->have_map(m
->osd_epoch_barrier
)) {
2770 // Pause RADOS operations until we see the required epoch
2771 mds
->objecter
->set_epoch_barrier(m
->osd_epoch_barrier
);
2774 if (mds
->get_osd_epoch_barrier() < m
->osd_epoch_barrier
) {
2775 // Record the barrier so that we will retransmit it to clients
2776 mds
->set_osd_epoch_barrier(m
->osd_epoch_barrier
);
2779 dout(10) << " head inode " << *head_in
<< dendl
;
2781 Capability
*cap
= 0;
2782 cap
= head_in
->get_client_cap(client
);
2784 dout(7) << "handle_client_caps no cap for client." << client
<< " on " << *head_in
<< dendl
;
2791 if (should_defer_client_cap_frozen(head_in
)) {
2792 dout(7) << "handle_client_caps freezing|frozen on " << *head_in
<< dendl
;
2793 head_in
->add_waiter(CInode::WAIT_UNFREEZE
, new C_MDS_RetryMessage(mds
, m
));
2796 if (ceph_seq_cmp(m
->get_mseq(), cap
->get_mseq()) < 0) {
2797 dout(7) << "handle_client_caps mseq " << m
->get_mseq() << " < " << cap
->get_mseq()
2798 << ", dropping" << dendl
;
2803 int op
= m
->get_op();
2806 if (op
== CEPH_CAP_OP_FLUSHSNAP
) {
2807 if (!head_in
->is_auth()) {
2808 dout(7) << " not auth, ignoring flushsnap on " << *head_in
<< dendl
;
2812 SnapRealm
*realm
= head_in
->find_snaprealm();
2813 snapid_t snap
= realm
->get_snap_following(follows
);
2814 dout(10) << " flushsnap follows " << follows
<< " -> snap " << snap
<< dendl
;
2816 CInode
*in
= head_in
;
2817 if (snap
!= CEPH_NOSNAP
) {
2818 in
= mdcache
->pick_inode_snap(head_in
, snap
- 1);
2820 dout(10) << " snapped inode " << *in
<< dendl
;
2823 // we can prepare the ack now, since this FLUSHEDSNAP is independent of any
2824 // other cap ops. (except possibly duplicate FLUSHSNAP requests, but worst
2825 // case we get a dup response, so whatever.)
2826 MClientCaps
*ack
= 0;
2827 if (m
->get_dirty()) {
2828 ack
= new MClientCaps(CEPH_CAP_OP_FLUSHSNAP_ACK
, in
->ino(), 0, 0, 0, 0, 0, m
->get_dirty(), 0, mds
->get_osd_epoch_barrier());
2829 ack
->set_snap_follows(follows
);
2830 ack
->set_client_tid(m
->get_client_tid());
2831 ack
->set_oldest_flush_tid(m
->get_oldest_flush_tid());
2834 if (in
== head_in
||
2835 (head_in
->client_need_snapflush
.count(snap
) &&
2836 head_in
->client_need_snapflush
[snap
].count(client
))) {
2837 dout(7) << " flushsnap snap " << snap
2838 << " client." << client
<< " on " << *in
<< dendl
;
2840 // this cap now follows a later snap (i.e. the one initiating this flush, or later)
2842 cap
->client_follows
= snap
< CEPH_NOSNAP
? snap
: realm
->get_newest_seq();
2843 else if (head_in
->client_need_snapflush
.begin()->first
< snap
)
2844 _do_null_snapflush(head_in
, client
, snap
);
2846 _do_snap_update(in
, snap
, m
->get_dirty(), follows
, client
, m
, ack
);
2849 head_in
->remove_need_snapflush(in
, snap
, client
);
2851 dout(7) << " not expecting flushsnap " << snap
<< " from client." << client
<< " on " << *in
<< dendl
;
2853 mds
->send_message_client_counted(ack
, m
->get_connection());
2858 if (cap
->get_cap_id() != m
->get_cap_id()) {
2859 dout(7) << " ignoring client capid " << m
->get_cap_id() << " != my " << cap
->get_cap_id() << dendl
;
2861 CInode
*in
= head_in
;
2863 in
= mdcache
->pick_inode_snap(head_in
, follows
);
2864 // intermediate snap inodes
2865 while (in
!= head_in
) {
2866 assert(in
->last
!= CEPH_NOSNAP
);
2867 if (in
->is_auth() && m
->get_dirty()) {
2868 dout(10) << " updating intermediate snapped inode " << *in
<< dendl
;
2869 _do_cap_update(in
, NULL
, m
->get_dirty(), follows
, m
);
2871 in
= mdcache
->pick_inode_snap(head_in
, in
->last
);
2875 // head inode, and cap
2876 MClientCaps
*ack
= 0;
2878 int caps
= m
->get_caps();
2879 if (caps
& ~cap
->issued()) {
2880 dout(10) << " confirming not issued caps " << ccap_string(caps
& ~cap
->issued()) << dendl
;
2881 caps
&= cap
->issued();
2884 cap
->confirm_receipt(m
->get_seq(), caps
);
2885 dout(10) << " follows " << follows
2886 << " retains " << ccap_string(m
->get_caps())
2887 << " dirty " << ccap_string(m
->get_dirty())
2888 << " on " << *in
<< dendl
;
2891 // missing/skipped snapflush?
2892 // The client MAY send a snapflush if it is issued WR/EXCL caps, but
2893 // presently only does so when it has actual dirty metadata. But, we
2894 // set up the need_snapflush stuff based on the issued caps.
2895 // We can infer that the client WONT send a FLUSHSNAP once they have
2896 // released all WR/EXCL caps (the FLUSHSNAP always comes before the cap
2898 if (!head_in
->client_need_snapflush
.empty()) {
2899 if ((cap
->issued() & CEPH_CAP_ANY_FILE_WR
) == 0) {
2900 _do_null_snapflush(head_in
, client
);
2902 dout(10) << " revocation in progress, not making any conclusions about null snapflushes" << dendl
;
2906 if (m
->get_dirty() && in
->is_auth()) {
2907 dout(7) << " flush client." << client
<< " dirty " << ccap_string(m
->get_dirty())
2908 << " seq " << m
->get_seq() << " on " << *in
<< dendl
;
2909 ack
= new MClientCaps(CEPH_CAP_OP_FLUSH_ACK
, in
->ino(), 0, cap
->get_cap_id(), m
->get_seq(),
2910 m
->get_caps(), 0, m
->get_dirty(), 0, mds
->get_osd_epoch_barrier());
2911 ack
->set_client_tid(m
->get_client_tid());
2912 ack
->set_oldest_flush_tid(m
->get_oldest_flush_tid());
2915 // filter wanted based on what we could ever give out (given auth/replica status)
2916 bool need_flush
= m
->flags
& CLIENT_CAPS_SYNC
;
2917 int new_wanted
= m
->get_wanted();
2918 if (new_wanted
!= cap
->wanted()) {
2919 if (!need_flush
&& in
->is_auth() && (new_wanted
& ~cap
->pending())) {
2920 // exapnding caps. make sure we aren't waiting for a log flush
2921 need_flush
= _need_flush_mdlog(head_in
, new_wanted
& ~cap
->pending());
2924 adjust_cap_wanted(cap
, new_wanted
, m
->get_issue_seq());
2927 if (in
->is_auth() &&
2928 _do_cap_update(in
, cap
, m
->get_dirty(), follows
, m
, ack
, &need_flush
)) {
2930 eval(in
, CEPH_CAP_LOCKS
);
2932 if (!need_flush
&& (cap
->wanted() & ~cap
->pending()))
2933 need_flush
= _need_flush_mdlog(in
, cap
->wanted() & ~cap
->pending());
2935 // no update, ack now.
2937 mds
->send_message_client_counted(ack
, m
->get_connection());
2939 bool did_issue
= eval(in
, CEPH_CAP_LOCKS
);
2940 if (!did_issue
&& (cap
->wanted() & ~cap
->pending()))
2941 issue_caps(in
, cap
);
2943 if (cap
->get_last_seq() == 0 &&
2944 (cap
->pending() & (CEPH_CAP_FILE_WR
|CEPH_CAP_FILE_BUFFER
))) {
2945 cap
->issue_norevoke(cap
->issued());
2946 share_inode_max_size(in
, cap
);
2951 mds
->mdlog
->flush();
2959 class C_Locker_RetryRequestCapRelease
: public LockerContext
{
2961 ceph_mds_request_release item
;
2963 C_Locker_RetryRequestCapRelease(Locker
*l
, client_t c
, const ceph_mds_request_release
& it
) :
2964 LockerContext(l
), client(c
), item(it
) { }
2965 void finish(int r
) override
{
2967 MDRequestRef null_ref
;
2968 locker
->process_request_cap_release(null_ref
, client
, item
, dname
);
2972 void Locker::process_request_cap_release(MDRequestRef
& mdr
, client_t client
, const ceph_mds_request_release
& item
,
2973 boost::string_view dname
)
2975 inodeno_t ino
= (uint64_t)item
.ino
;
2976 uint64_t cap_id
= item
.cap_id
;
2977 int caps
= item
.caps
;
2978 int wanted
= item
.wanted
;
2980 int issue_seq
= item
.issue_seq
;
2981 int mseq
= item
.mseq
;
2983 CInode
*in
= mdcache
->get_inode(ino
);
2987 if (dname
.length()) {
2988 frag_t fg
= in
->pick_dirfrag(dname
);
2989 CDir
*dir
= in
->get_dirfrag(fg
);
2991 CDentry
*dn
= dir
->lookup(dname
);
2993 ClientLease
*l
= dn
->get_client_lease(client
);
2995 dout(10) << "process_cap_release removing lease on " << *dn
<< dendl
;
2996 dn
->remove_client_lease(l
, this);
2998 dout(7) << "process_cap_release client." << client
2999 << " doesn't have lease on " << *dn
<< dendl
;
3002 dout(7) << "process_cap_release client." << client
<< " released lease on dn "
3003 << dir
->dirfrag() << "/" << dname
<< " which dne" << dendl
;
3008 Capability
*cap
= in
->get_client_cap(client
);
3012 dout(10) << "process_cap_release client." << client
<< " " << ccap_string(caps
) << " on " << *in
3013 << (mdr
? "" : " (DEFERRED, no mdr)")
3016 if (ceph_seq_cmp(mseq
, cap
->get_mseq()) < 0) {
3017 dout(7) << " mseq " << mseq
<< " < " << cap
->get_mseq() << ", dropping" << dendl
;
3021 if (cap
->get_cap_id() != cap_id
) {
3022 dout(7) << " cap_id " << cap_id
<< " != " << cap
->get_cap_id() << ", dropping" << dendl
;
3026 if (should_defer_client_cap_frozen(in
)) {
3027 dout(7) << " frozen, deferring" << dendl
;
3028 in
->add_waiter(CInode::WAIT_UNFREEZE
, new C_Locker_RetryRequestCapRelease(this, client
, item
));
3032 if (caps
& ~cap
->issued()) {
3033 dout(10) << " confirming not issued caps " << ccap_string(caps
& ~cap
->issued()) << dendl
;
3034 caps
&= cap
->issued();
3036 cap
->confirm_receipt(seq
, caps
);
3038 if (!in
->client_need_snapflush
.empty() &&
3039 (cap
->issued() & CEPH_CAP_ANY_FILE_WR
) == 0) {
3040 _do_null_snapflush(in
, client
);
3043 adjust_cap_wanted(cap
, wanted
, issue_seq
);
3046 cap
->inc_suppress();
3047 eval(in
, CEPH_CAP_LOCKS
);
3049 cap
->dec_suppress();
3051 // take note; we may need to reissue on this cap later
3053 mdr
->cap_releases
[in
->vino()] = cap
->get_last_seq();
3056 class C_Locker_RetryKickIssueCaps
: public LockerContext
{
3061 C_Locker_RetryKickIssueCaps(Locker
*l
, CInode
*i
, client_t c
, ceph_seq_t s
) :
3062 LockerContext(l
), in(i
), client(c
), seq(s
) {
3063 in
->get(CInode::PIN_PTRWAITER
);
3065 void finish(int r
) override
{
3066 locker
->kick_issue_caps(in
, client
, seq
);
3067 in
->put(CInode::PIN_PTRWAITER
);
3071 void Locker::kick_issue_caps(CInode
*in
, client_t client
, ceph_seq_t seq
)
3073 Capability
*cap
= in
->get_client_cap(client
);
3074 if (!cap
|| cap
->get_last_seq() != seq
)
3076 if (in
->is_frozen()) {
3077 dout(10) << "kick_issue_caps waiting for unfreeze on " << *in
<< dendl
;
3078 in
->add_waiter(CInode::WAIT_UNFREEZE
,
3079 new C_Locker_RetryKickIssueCaps(this, in
, client
, seq
));
3082 dout(10) << "kick_issue_caps released at current seq " << seq
3083 << ", reissuing" << dendl
;
3084 issue_caps(in
, cap
);
3087 void Locker::kick_cap_releases(MDRequestRef
& mdr
)
3089 client_t client
= mdr
->get_client();
3090 for (map
<vinodeno_t
,ceph_seq_t
>::iterator p
= mdr
->cap_releases
.begin();
3091 p
!= mdr
->cap_releases
.end();
3093 CInode
*in
= mdcache
->get_inode(p
->first
);
3096 kick_issue_caps(in
, client
, p
->second
);
3101 * m and ack might be NULL, so don't dereference them unless dirty != 0
3103 void Locker::_do_snap_update(CInode
*in
, snapid_t snap
, int dirty
, snapid_t follows
, client_t client
, MClientCaps
*m
, MClientCaps
*ack
)
3105 dout(10) << "_do_snap_update dirty " << ccap_string(dirty
)
3106 << " follows " << follows
<< " snap " << snap
3107 << " on " << *in
<< dendl
;
3109 if (snap
== CEPH_NOSNAP
) {
3110 // hmm, i guess snap was already deleted? just ack!
3111 dout(10) << " wow, the snap following " << follows
3112 << " was already deleted. nothing to record, just ack." << dendl
;
3114 mds
->send_message_client_counted(ack
, m
->get_connection());
3118 EUpdate
*le
= new EUpdate(mds
->mdlog
, "snap flush");
3119 mds
->mdlog
->start_entry(le
);
3120 MutationRef mut
= new MutationImpl();
3121 mut
->ls
= mds
->mdlog
->get_current_segment();
3123 // normal metadata updates that we can apply to the head as well.
3126 CInode::mempool_xattr_map
*px
= nullptr;
3127 bool xattrs
= (dirty
& CEPH_CAP_XATTR_EXCL
) &&
3128 m
->xattrbl
.length() &&
3129 m
->head
.xattr_version
> in
->get_projected_inode()->xattr_version
;
3131 CInode::mempool_old_inode
*oi
= 0;
3132 if (in
->is_multiversion()) {
3133 oi
= in
->pick_old_inode(snap
);
3136 CInode::mempool_inode
*i
;
3138 dout(10) << " writing into old inode" << dendl
;
3139 auto &pi
= in
->project_inode();
3140 pi
.inode
.version
= in
->pre_dirty();
3141 if (snap
> oi
->first
)
3142 in
->split_old_inode(snap
);
3147 auto &pi
= in
->project_inode(xattrs
);
3148 pi
.inode
.version
= in
->pre_dirty();
3151 px
= pi
.xattrs
.get();
3154 _update_cap_fields(in
, dirty
, m
, i
);
3158 dout(7) << " xattrs v" << i
->xattr_version
<< " -> " << m
->head
.xattr_version
3159 << " len " << m
->xattrbl
.length() << dendl
;
3160 i
->xattr_version
= m
->head
.xattr_version
;
3161 bufferlist::iterator p
= m
->xattrbl
.begin();
3166 auto it
= i
->client_ranges
.find(client
);
3167 if (it
!= i
->client_ranges
.end()) {
3168 if (in
->last
== snap
) {
3169 dout(10) << " removing client_range entirely" << dendl
;
3170 i
->client_ranges
.erase(it
);
3172 dout(10) << " client_range now follows " << snap
<< dendl
;
3173 it
->second
.follows
= snap
;
3179 mdcache
->predirty_journal_parents(mut
, &le
->metablob
, in
, 0, PREDIRTY_PRIMARY
, 0, follows
);
3180 mdcache
->journal_dirty_inode(mut
.get(), &le
->metablob
, in
, follows
);
3182 // "oldest flush tid" > 0 means client uses unique TID for each flush
3183 if (ack
&& ack
->get_oldest_flush_tid() > 0)
3184 le
->metablob
.add_client_flush(metareqid_t(m
->get_source(), ack
->get_client_tid()),
3185 ack
->get_oldest_flush_tid());
3187 mds
->mdlog
->submit_entry(le
, new C_Locker_FileUpdate_finish(this, in
, mut
, false, false,
3191 void Locker::_update_cap_fields(CInode
*in
, int dirty
, MClientCaps
*m
, CInode::mempool_inode
*pi
)
3196 /* m must be valid if there are dirty caps */
3198 uint64_t features
= m
->get_connection()->get_features();
3200 if (m
->get_ctime() > pi
->ctime
) {
3201 dout(7) << " ctime " << pi
->ctime
<< " -> " << m
->get_ctime()
3202 << " for " << *in
<< dendl
;
3203 pi
->ctime
= m
->get_ctime();
3204 if (m
->get_ctime() > pi
->rstat
.rctime
)
3205 pi
->rstat
.rctime
= m
->get_ctime();
3208 if ((features
& CEPH_FEATURE_FS_CHANGE_ATTR
) &&
3209 m
->get_change_attr() > pi
->change_attr
) {
3210 dout(7) << " change_attr " << pi
->change_attr
<< " -> " << m
->get_change_attr()
3211 << " for " << *in
<< dendl
;
3212 pi
->change_attr
= m
->get_change_attr();
3216 if (dirty
& (CEPH_CAP_FILE_EXCL
|CEPH_CAP_FILE_WR
)) {
3217 utime_t atime
= m
->get_atime();
3218 utime_t mtime
= m
->get_mtime();
3219 uint64_t size
= m
->get_size();
3220 version_t inline_version
= m
->inline_version
;
3222 if (((dirty
& CEPH_CAP_FILE_WR
) && mtime
> pi
->mtime
) ||
3223 ((dirty
& CEPH_CAP_FILE_EXCL
) && mtime
!= pi
->mtime
)) {
3224 dout(7) << " mtime " << pi
->mtime
<< " -> " << mtime
3225 << " for " << *in
<< dendl
;
3227 if (mtime
> pi
->rstat
.rctime
)
3228 pi
->rstat
.rctime
= mtime
;
3230 if (in
->inode
.is_file() && // ONLY if regular file
3232 dout(7) << " size " << pi
->size
<< " -> " << size
3233 << " for " << *in
<< dendl
;
3235 pi
->rstat
.rbytes
= size
;
3237 if (in
->inode
.is_file() &&
3238 (dirty
& CEPH_CAP_FILE_WR
) &&
3239 inline_version
> pi
->inline_data
.version
) {
3240 pi
->inline_data
.version
= inline_version
;
3241 if (inline_version
!= CEPH_INLINE_NONE
&& m
->inline_data
.length() > 0)
3242 pi
->inline_data
.get_data() = m
->inline_data
;
3244 pi
->inline_data
.free_data();
3246 if ((dirty
& CEPH_CAP_FILE_EXCL
) && atime
!= pi
->atime
) {
3247 dout(7) << " atime " << pi
->atime
<< " -> " << atime
3248 << " for " << *in
<< dendl
;
3251 if ((dirty
& CEPH_CAP_FILE_EXCL
) &&
3252 ceph_seq_cmp(pi
->time_warp_seq
, m
->get_time_warp_seq()) < 0) {
3253 dout(7) << " time_warp_seq " << pi
->time_warp_seq
<< " -> " << m
->get_time_warp_seq()
3254 << " for " << *in
<< dendl
;
3255 pi
->time_warp_seq
= m
->get_time_warp_seq();
3259 if (dirty
& CEPH_CAP_AUTH_EXCL
) {
3260 if (m
->head
.uid
!= pi
->uid
) {
3261 dout(7) << " uid " << pi
->uid
3262 << " -> " << m
->head
.uid
3263 << " for " << *in
<< dendl
;
3264 pi
->uid
= m
->head
.uid
;
3266 if (m
->head
.gid
!= pi
->gid
) {
3267 dout(7) << " gid " << pi
->gid
3268 << " -> " << m
->head
.gid
3269 << " for " << *in
<< dendl
;
3270 pi
->gid
= m
->head
.gid
;
3272 if (m
->head
.mode
!= pi
->mode
) {
3273 dout(7) << " mode " << oct
<< pi
->mode
3274 << " -> " << m
->head
.mode
<< dec
3275 << " for " << *in
<< dendl
;
3276 pi
->mode
= m
->head
.mode
;
3278 if ((features
& CEPH_FEATURE_FS_BTIME
) && m
->get_btime() != pi
->btime
) {
3279 dout(7) << " btime " << oct
<< pi
->btime
3280 << " -> " << m
->get_btime() << dec
3281 << " for " << *in
<< dendl
;
3282 pi
->btime
= m
->get_btime();
3288 * update inode based on cap flush|flushsnap|wanted.
3289 * adjust max_size, if needed.
3290 * if we update, return true; otherwise, false (no updated needed).
3292 bool Locker::_do_cap_update(CInode
*in
, Capability
*cap
,
3293 int dirty
, snapid_t follows
,
3294 MClientCaps
*m
, MClientCaps
*ack
,
3297 dout(10) << "_do_cap_update dirty " << ccap_string(dirty
)
3298 << " issued " << ccap_string(cap
? cap
->issued() : 0)
3299 << " wanted " << ccap_string(cap
? cap
->wanted() : 0)
3300 << " on " << *in
<< dendl
;
3301 assert(in
->is_auth());
3302 client_t client
= m
->get_source().num();
3303 CInode::mempool_inode
*latest
= in
->get_projected_inode();
3305 // increase or zero max_size?
3306 uint64_t size
= m
->get_size();
3307 bool change_max
= false;
3308 uint64_t old_max
= latest
->client_ranges
.count(client
) ? latest
->client_ranges
[client
].range
.last
: 0;
3309 uint64_t new_max
= old_max
;
3311 if (in
->is_file()) {
3312 bool forced_change_max
= false;
3313 dout(20) << "inode is file" << dendl
;
3314 if (cap
&& ((cap
->issued() | cap
->wanted()) & CEPH_CAP_ANY_FILE_WR
)) {
3315 dout(20) << "client has write caps; m->get_max_size="
3316 << m
->get_max_size() << "; old_max=" << old_max
<< dendl
;
3317 if (m
->get_max_size() > new_max
) {
3318 dout(10) << "client requests file_max " << m
->get_max_size()
3319 << " > max " << old_max
<< dendl
;
3321 forced_change_max
= true;
3322 new_max
= calc_new_max_size(latest
, m
->get_max_size());
3324 new_max
= calc_new_max_size(latest
, size
);
3326 if (new_max
> old_max
)
3338 if (in
->last
== CEPH_NOSNAP
&&
3340 !in
->filelock
.can_wrlock(client
) &&
3341 !in
->filelock
.can_force_wrlock(client
)) {
3342 dout(10) << " i want to change file_max, but lock won't allow it (yet)" << dendl
;
3343 if (in
->filelock
.is_stable()) {
3344 bool need_issue
= false;
3346 cap
->inc_suppress();
3347 if (in
->mds_caps_wanted
.empty() &&
3348 (in
->get_loner() >= 0 || (in
->get_wanted_loner() >= 0 && in
->try_set_loner()))) {
3349 if (in
->filelock
.get_state() != LOCK_EXCL
)
3350 file_excl(&in
->filelock
, &need_issue
);
3352 simple_lock(&in
->filelock
, &need_issue
);
3356 cap
->dec_suppress();
3358 if (!in
->filelock
.can_wrlock(client
) &&
3359 !in
->filelock
.can_force_wrlock(client
)) {
3360 C_MDL_CheckMaxSize
*cms
= new C_MDL_CheckMaxSize(this, in
,
3361 forced_change_max
? new_max
: 0,
3364 in
->filelock
.add_waiter(SimpleLock::WAIT_STABLE
, cms
);
3370 if (m
->flockbl
.length()) {
3372 bufferlist::iterator bli
= m
->flockbl
.begin();
3373 ::decode(num_locks
, bli
);
3374 for ( int i
=0; i
< num_locks
; ++i
) {
3375 ceph_filelock decoded_lock
;
3376 ::decode(decoded_lock
, bli
);
3377 in
->get_fcntl_lock_state()->held_locks
.
3378 insert(pair
<uint64_t, ceph_filelock
>(decoded_lock
.start
, decoded_lock
));
3379 ++in
->get_fcntl_lock_state()->client_held_lock_counts
[(client_t
)(decoded_lock
.client
)];
3381 ::decode(num_locks
, bli
);
3382 for ( int i
=0; i
< num_locks
; ++i
) {
3383 ceph_filelock decoded_lock
;
3384 ::decode(decoded_lock
, bli
);
3385 in
->get_flock_lock_state()->held_locks
.
3386 insert(pair
<uint64_t, ceph_filelock
>(decoded_lock
.start
, decoded_lock
));
3387 ++in
->get_flock_lock_state()->client_held_lock_counts
[(client_t
)(decoded_lock
.client
)];
3391 if (!dirty
&& !change_max
)
3394 Session
*session
= mds
->get_session(m
);
3395 if (session
->check_access(in
, MAY_WRITE
,
3396 m
->caller_uid
, m
->caller_gid
, NULL
, 0, 0) < 0) {
3397 dout(10) << "check_access failed, dropping cap update on " << *in
<< dendl
;
3402 EUpdate
*le
= new EUpdate(mds
->mdlog
, "cap update");
3403 mds
->mdlog
->start_entry(le
);
3405 bool xattr
= (dirty
& CEPH_CAP_XATTR_EXCL
) &&
3406 m
->xattrbl
.length() &&
3407 m
->head
.xattr_version
> in
->get_projected_inode()->xattr_version
;
3409 auto &pi
= in
->project_inode(xattr
);
3410 pi
.inode
.version
= in
->pre_dirty();
3412 MutationRef
mut(new MutationImpl());
3413 mut
->ls
= mds
->mdlog
->get_current_segment();
3415 _update_cap_fields(in
, dirty
, m
, &pi
.inode
);
3418 dout(7) << " max_size " << old_max
<< " -> " << new_max
3419 << " for " << *in
<< dendl
;
3421 auto &cr
= pi
.inode
.client_ranges
[client
];
3423 cr
.range
.last
= new_max
;
3424 cr
.follows
= in
->first
- 1;
3426 cap
->mark_clientwriteable();
3428 pi
.inode
.client_ranges
.erase(client
);
3430 cap
->clear_clientwriteable();
3434 if (change_max
|| (dirty
& (CEPH_CAP_FILE_EXCL
|CEPH_CAP_FILE_WR
)))
3435 wrlock_force(&in
->filelock
, mut
); // wrlock for duration of journal
3438 if (dirty
& CEPH_CAP_AUTH_EXCL
)
3439 wrlock_force(&in
->authlock
, mut
);
3443 dout(7) << " xattrs v" << pi
.inode
.xattr_version
<< " -> " << m
->head
.xattr_version
<< dendl
;
3444 pi
.inode
.xattr_version
= m
->head
.xattr_version
;
3445 bufferlist::iterator p
= m
->xattrbl
.begin();
3446 ::decode(*pi
.xattrs
, p
);
3447 wrlock_force(&in
->xattrlock
, mut
);
3451 mdcache
->predirty_journal_parents(mut
, &le
->metablob
, in
, 0, PREDIRTY_PRIMARY
, 0, follows
);
3452 mdcache
->journal_dirty_inode(mut
.get(), &le
->metablob
, in
, follows
);
3454 // "oldest flush tid" > 0 means client uses unique TID for each flush
3455 if (ack
&& ack
->get_oldest_flush_tid() > 0)
3456 le
->metablob
.add_client_flush(metareqid_t(m
->get_source(), ack
->get_client_tid()),
3457 ack
->get_oldest_flush_tid());
3459 mds
->mdlog
->submit_entry(le
, new C_Locker_FileUpdate_finish(this, in
, mut
,
3462 if (need_flush
&& !*need_flush
&&
3463 ((change_max
&& new_max
) || // max INCREASE
3464 _need_flush_mdlog(in
, dirty
)))
3470 /* This function DOES put the passed message before returning */
3471 void Locker::handle_client_cap_release(MClientCapRelease
*m
)
3473 client_t client
= m
->get_source().num();
3474 dout(10) << "handle_client_cap_release " << *m
<< dendl
;
3476 if (!mds
->is_clientreplay() && !mds
->is_active() && !mds
->is_stopping()) {
3477 mds
->wait_for_replay(new C_MDS_RetryMessage(mds
, m
));
3481 if (m
->osd_epoch_barrier
&& !mds
->objecter
->have_map(m
->osd_epoch_barrier
)) {
3482 // Pause RADOS operations until we see the required epoch
3483 mds
->objecter
->set_epoch_barrier(m
->osd_epoch_barrier
);
3486 if (mds
->get_osd_epoch_barrier() < m
->osd_epoch_barrier
) {
3487 // Record the barrier so that we will retransmit it to clients
3488 mds
->set_osd_epoch_barrier(m
->osd_epoch_barrier
);
3491 Session
*session
= mds
->get_session(m
);
3493 for (vector
<ceph_mds_cap_item
>::iterator p
= m
->caps
.begin(); p
!= m
->caps
.end(); ++p
) {
3494 _do_cap_release(client
, inodeno_t((uint64_t)p
->ino
) , p
->cap_id
, p
->migrate_seq
, p
->seq
);
3498 session
->notify_cap_release(m
->caps
.size());
3504 class C_Locker_RetryCapRelease
: public LockerContext
{
3508 ceph_seq_t migrate_seq
;
3509 ceph_seq_t issue_seq
;
3511 C_Locker_RetryCapRelease(Locker
*l
, client_t c
, inodeno_t i
, uint64_t id
,
3512 ceph_seq_t mseq
, ceph_seq_t seq
) :
3513 LockerContext(l
), client(c
), ino(i
), cap_id(id
), migrate_seq(mseq
), issue_seq(seq
) {}
3514 void finish(int r
) override
{
3515 locker
->_do_cap_release(client
, ino
, cap_id
, migrate_seq
, issue_seq
);
3519 void Locker::_do_cap_release(client_t client
, inodeno_t ino
, uint64_t cap_id
,
3520 ceph_seq_t mseq
, ceph_seq_t seq
)
3522 CInode
*in
= mdcache
->get_inode(ino
);
3524 dout(7) << "_do_cap_release missing ino " << ino
<< dendl
;
3527 Capability
*cap
= in
->get_client_cap(client
);
3529 dout(7) << "_do_cap_release no cap for client" << client
<< " on "<< *in
<< dendl
;
3533 dout(7) << "_do_cap_release for client." << client
<< " on "<< *in
<< dendl
;
3534 if (cap
->get_cap_id() != cap_id
) {
3535 dout(7) << " capid " << cap_id
<< " != " << cap
->get_cap_id() << ", ignore" << dendl
;
3538 if (ceph_seq_cmp(mseq
, cap
->get_mseq()) < 0) {
3539 dout(7) << " mseq " << mseq
<< " < " << cap
->get_mseq() << ", ignore" << dendl
;
3542 if (should_defer_client_cap_frozen(in
)) {
3543 dout(7) << " freezing|frozen, deferring" << dendl
;
3544 in
->add_waiter(CInode::WAIT_UNFREEZE
,
3545 new C_Locker_RetryCapRelease(this, client
, ino
, cap_id
, mseq
, seq
));
3548 if (seq
!= cap
->get_last_issue()) {
3549 dout(7) << " issue_seq " << seq
<< " != " << cap
->get_last_issue() << dendl
;
3550 // clean out any old revoke history
3551 cap
->clean_revoke_from(seq
);
3552 eval_cap_gather(in
);
3555 remove_client_cap(in
, cap
);
3558 void Locker::remove_client_cap(CInode
*in
, Capability
*cap
)
3560 client_t client
= cap
->get_client();
3561 // clean out any pending snapflush state
3562 if (!in
->client_need_snapflush
.empty())
3563 _do_null_snapflush(in
, client
);
3565 bool notable
= cap
->is_notable();
3566 in
->remove_client_cap(client
);
3570 if (in
->is_auth()) {
3571 // make sure we clear out the client byte range
3572 if (in
->get_projected_inode()->client_ranges
.count(client
) &&
3573 !(in
->inode
.nlink
== 0 && !in
->is_any_caps())) // unless it's unlink + stray
3574 check_inode_max_size(in
);
3576 request_inode_file_caps(in
);
3579 try_eval(in
, CEPH_CAP_LOCKS
);
3584 * Return true if any currently revoking caps exceed the
3585 * session_timeout threshold.
3587 bool Locker::any_late_revoking_caps(xlist
<Capability
*> const &revoking
,
3588 double timeout
) const
3590 xlist
<Capability
*>::const_iterator p
= revoking
.begin();
3592 // No revoking caps at the moment
3595 utime_t now
= ceph_clock_now();
3596 utime_t age
= now
- (*p
)->get_last_revoke_stamp();
3597 if (age
<= timeout
) {
3605 void Locker::get_late_revoking_clients(std::list
<client_t
> *result
,
3606 double timeout
) const
3608 if (!any_late_revoking_caps(revoking_caps
, timeout
)) {
3609 // Fast path: no misbehaving clients, execute in O(1)
3613 // Slow path: execute in O(N_clients)
3614 for (auto &p
: revoking_caps_by_client
) {
3615 if (any_late_revoking_caps(p
.second
, timeout
)) {
3616 result
->push_back(p
.first
);
3621 // Hard-code instead of surfacing a config settings because this is
3622 // really a hack that should go away at some point when we have better
3623 // inspection tools for getting at detailed cap state (#7316)
3624 #define MAX_WARN_CAPS 100
3626 void Locker::caps_tick()
3628 utime_t now
= ceph_clock_now();
3630 dout(20) << __func__
<< " " << revoking_caps
.size() << " revoking caps" << dendl
;
3633 for (xlist
<Capability
*>::iterator p
= revoking_caps
.begin(); !p
.end(); ++p
) {
3634 Capability
*cap
= *p
;
3636 utime_t age
= now
- cap
->get_last_revoke_stamp();
3637 dout(20) << __func__
<< " age = " << age
<< cap
->get_client() << "." << cap
->get_inode()->ino() << dendl
;
3638 if (age
<= mds
->mdsmap
->get_session_timeout()) {
3639 dout(20) << __func__
<< " age below timeout " << mds
->mdsmap
->get_session_timeout() << dendl
;
3643 if (i
> MAX_WARN_CAPS
) {
3644 dout(1) << __func__
<< " more than " << MAX_WARN_CAPS
<< " caps are late"
3645 << "revoking, ignoring subsequent caps" << dendl
;
3649 // exponential backoff of warning intervals
3650 if (age
> mds
->mdsmap
->get_session_timeout() * (1 << cap
->get_num_revoke_warnings())) {
3651 cap
->inc_num_revoke_warnings();
3653 ss
<< "client." << cap
->get_client() << " isn't responding to mclientcaps(revoke), ino "
3654 << cap
->get_inode()->ino() << " pending " << ccap_string(cap
->pending())
3655 << " issued " << ccap_string(cap
->issued()) << ", sent " << age
<< " seconds ago";
3656 mds
->clog
->warn() << ss
.str();
3657 dout(20) << __func__
<< " " << ss
.str() << dendl
;
3659 dout(20) << __func__
<< " silencing log message (backoff) for " << cap
->get_client() << "." << cap
->get_inode()->ino() << dendl
;
3665 void Locker::handle_client_lease(MClientLease
*m
)
3667 dout(10) << "handle_client_lease " << *m
<< dendl
;
3669 assert(m
->get_source().is_client());
3670 client_t client
= m
->get_source().num();
3672 CInode
*in
= mdcache
->get_inode(m
->get_ino(), m
->get_last());
3674 dout(7) << "handle_client_lease don't have ino " << m
->get_ino() << "." << m
->get_last() << dendl
;
3680 frag_t fg
= in
->pick_dirfrag(m
->dname
);
3681 CDir
*dir
= in
->get_dirfrag(fg
);
3683 dn
= dir
->lookup(m
->dname
);
3685 dout(7) << "handle_client_lease don't have dn " << m
->get_ino() << " " << m
->dname
<< dendl
;
3689 dout(10) << " on " << *dn
<< dendl
;
3692 ClientLease
*l
= dn
->get_client_lease(client
);
3694 dout(7) << "handle_client_lease didn't have lease for client." << client
<< " of " << *dn
<< dendl
;
3699 switch (m
->get_action()) {
3700 case CEPH_MDS_LEASE_REVOKE_ACK
:
3701 case CEPH_MDS_LEASE_RELEASE
:
3702 if (l
->seq
!= m
->get_seq()) {
3703 dout(7) << "handle_client_lease release - seq " << l
->seq
<< " != provided " << m
->get_seq() << dendl
;
3705 dout(7) << "handle_client_lease client." << client
3706 << " on " << *dn
<< dendl
;
3707 dn
->remove_client_lease(l
, this);
3712 case CEPH_MDS_LEASE_RENEW
:
3714 dout(7) << "handle_client_lease client." << client
<< " renew on " << *dn
3715 << (!dn
->lock
.can_lease(client
)?", revoking lease":"") << dendl
;
3716 if (dn
->lock
.can_lease(client
)) {
3717 int pool
= 1; // fixme.. do something smart!
3718 m
->h
.duration_ms
= (int)(1000 * mdcache
->client_lease_durations
[pool
]);
3719 m
->h
.seq
= ++l
->seq
;
3722 utime_t now
= ceph_clock_now();
3723 now
+= mdcache
->client_lease_durations
[pool
];
3724 mdcache
->touch_client_lease(l
, pool
, now
);
3726 mds
->send_message_client_counted(m
, m
->get_connection());
3732 ceph_abort(); // implement me
3738 void Locker::issue_client_lease(CDentry
*dn
, client_t client
,
3739 bufferlist
&bl
, utime_t now
, Session
*session
)
3741 CInode
*diri
= dn
->get_dir()->get_inode();
3742 if (!diri
->is_stray() && // do not issue dn leases in stray dir!
3743 ((!diri
->filelock
.can_lease(client
) &&
3744 (diri
->get_client_cap_pending(client
) & (CEPH_CAP_FILE_SHARED
| CEPH_CAP_FILE_EXCL
)) == 0)) &&
3745 dn
->lock
.can_lease(client
)) {
3746 int pool
= 1; // fixme.. do something smart!
3747 // issue a dentry lease
3748 ClientLease
*l
= dn
->add_client_lease(client
, session
);
3749 session
->touch_lease(l
);
3751 now
+= mdcache
->client_lease_durations
[pool
];
3752 mdcache
->touch_client_lease(l
, pool
, now
);
3755 e
.mask
= 1 | CEPH_LOCK_DN
; // old and new bit values
3757 e
.duration_ms
= (int)(1000 * mdcache
->client_lease_durations
[pool
]);
3759 dout(20) << "issue_client_lease seq " << e
.seq
<< " dur " << e
.duration_ms
<< "ms "
3760 << " on " << *dn
<< dendl
;
3768 dout(20) << "issue_client_lease no/null lease on " << *dn
<< dendl
;
3773 void Locker::revoke_client_leases(SimpleLock
*lock
)
3776 CDentry
*dn
= static_cast<CDentry
*>(lock
->get_parent());
3777 for (map
<client_t
, ClientLease
*>::iterator p
= dn
->client_lease_map
.begin();
3778 p
!= dn
->client_lease_map
.end();
3780 ClientLease
*l
= p
->second
;
3783 assert(lock
->get_type() == CEPH_LOCK_DN
);
3785 CDentry
*dn
= static_cast<CDentry
*>(lock
->get_parent());
3786 int mask
= 1 | CEPH_LOCK_DN
; // old and new bits
3788 // i should also revoke the dir ICONTENT lease, if they have it!
3789 CInode
*diri
= dn
->get_dir()->get_inode();
3790 mds
->send_message_client_counted(new MClientLease(CEPH_MDS_LEASE_REVOKE
, l
->seq
,
3793 diri
->first
, CEPH_NOSNAP
,
3801 // locks ----------------------------------------------------------------
3803 SimpleLock
*Locker::get_lock(int lock_type
, MDSCacheObjectInfo
&info
)
3805 switch (lock_type
) {
3808 // be careful; info.dirfrag may have incorrect frag; recalculate based on dname.
3809 CInode
*diri
= mdcache
->get_inode(info
.dirfrag
.ino
);
3814 fg
= diri
->pick_dirfrag(info
.dname
);
3815 dir
= diri
->get_dirfrag(fg
);
3817 dn
= dir
->lookup(info
.dname
, info
.snapid
);
3820 dout(7) << "get_lock don't have dn " << info
.dirfrag
.ino
<< " " << info
.dname
<< dendl
;
3826 case CEPH_LOCK_IAUTH
:
3827 case CEPH_LOCK_ILINK
:
3828 case CEPH_LOCK_IDFT
:
3829 case CEPH_LOCK_IFILE
:
3830 case CEPH_LOCK_INEST
:
3831 case CEPH_LOCK_IXATTR
:
3832 case CEPH_LOCK_ISNAP
:
3833 case CEPH_LOCK_IFLOCK
:
3834 case CEPH_LOCK_IPOLICY
:
3836 CInode
*in
= mdcache
->get_inode(info
.ino
, info
.snapid
);
3838 dout(7) << "get_lock don't have ino " << info
.ino
<< dendl
;
3841 switch (lock_type
) {
3842 case CEPH_LOCK_IAUTH
: return &in
->authlock
;
3843 case CEPH_LOCK_ILINK
: return &in
->linklock
;
3844 case CEPH_LOCK_IDFT
: return &in
->dirfragtreelock
;
3845 case CEPH_LOCK_IFILE
: return &in
->filelock
;
3846 case CEPH_LOCK_INEST
: return &in
->nestlock
;
3847 case CEPH_LOCK_IXATTR
: return &in
->xattrlock
;
3848 case CEPH_LOCK_ISNAP
: return &in
->snaplock
;
3849 case CEPH_LOCK_IFLOCK
: return &in
->flocklock
;
3850 case CEPH_LOCK_IPOLICY
: return &in
->policylock
;
3855 dout(7) << "get_lock don't know lock_type " << lock_type
<< dendl
;
3863 /* This function DOES put the passed message before returning */
3864 void Locker::handle_lock(MLock
*m
)
3866 // nobody should be talking to us during recovery.
3867 assert(mds
->is_rejoin() || mds
->is_clientreplay() || mds
->is_active() || mds
->is_stopping());
3869 SimpleLock
*lock
= get_lock(m
->get_lock_type(), m
->get_object_info());
3871 dout(10) << "don't have object " << m
->get_object_info() << ", must have trimmed, dropping" << dendl
;
3876 switch (lock
->get_type()) {
3878 case CEPH_LOCK_IAUTH
:
3879 case CEPH_LOCK_ILINK
:
3880 case CEPH_LOCK_ISNAP
:
3881 case CEPH_LOCK_IXATTR
:
3882 case CEPH_LOCK_IFLOCK
:
3883 case CEPH_LOCK_IPOLICY
:
3884 handle_simple_lock(lock
, m
);
3887 case CEPH_LOCK_IDFT
:
3888 case CEPH_LOCK_INEST
:
3889 //handle_scatter_lock((ScatterLock*)lock, m);
3892 case CEPH_LOCK_IFILE
:
3893 handle_file_lock(static_cast<ScatterLock
*>(lock
), m
);
3897 dout(7) << "handle_lock got otype " << m
->get_lock_type() << dendl
;
3907 // ==========================================================================
3910 /** This function may take a reference to m if it needs one, but does
3911 * not put references. */
3912 void Locker::handle_reqrdlock(SimpleLock
*lock
, MLock
*m
)
3914 MDSCacheObject
*parent
= lock
->get_parent();
3915 if (parent
->is_auth() &&
3916 lock
->get_state() != LOCK_SYNC
&&
3917 !parent
->is_frozen()) {
3918 dout(7) << "handle_reqrdlock got rdlock request on " << *lock
3919 << " on " << *parent
<< dendl
;
3920 assert(parent
->is_auth()); // replica auth pinned if they're doing this!
3921 if (lock
->is_stable()) {
3924 dout(7) << "handle_reqrdlock delaying request until lock is stable" << dendl
;
3925 lock
->add_waiter(SimpleLock::WAIT_STABLE
| MDSCacheObject::WAIT_UNFREEZE
,
3926 new C_MDS_RetryMessage(mds
, m
->get()));
3929 dout(7) << "handle_reqrdlock dropping rdlock request on " << *lock
3930 << " on " << *parent
<< dendl
;
3931 // replica should retry
3935 /* This function DOES put the passed message before returning */
3936 void Locker::handle_simple_lock(SimpleLock
*lock
, MLock
*m
)
3938 int from
= m
->get_asker();
3940 dout(10) << "handle_simple_lock " << *m
3941 << " on " << *lock
<< " " << *lock
->get_parent() << dendl
;
3943 if (mds
->is_rejoin()) {
3944 if (lock
->get_parent()->is_rejoining()) {
3945 dout(7) << "handle_simple_lock still rejoining " << *lock
->get_parent()
3946 << ", dropping " << *m
<< dendl
;
3952 switch (m
->get_action()) {
3955 assert(lock
->get_state() == LOCK_LOCK
);
3956 lock
->decode_locked_state(m
->get_data());
3957 lock
->set_state(LOCK_SYNC
);
3958 lock
->finish_waiters(SimpleLock::WAIT_RD
|SimpleLock::WAIT_STABLE
);
3962 assert(lock
->get_state() == LOCK_SYNC
);
3963 lock
->set_state(LOCK_SYNC_LOCK
);
3964 if (lock
->is_leased())
3965 revoke_client_leases(lock
);
3966 eval_gather(lock
, true);
3967 if (lock
->is_unstable_and_locked())
3968 mds
->mdlog
->flush();
3973 case LOCK_AC_LOCKACK
:
3974 assert(lock
->get_state() == LOCK_SYNC_LOCK
||
3975 lock
->get_state() == LOCK_SYNC_EXCL
);
3976 assert(lock
->is_gathering(from
));
3977 lock
->remove_gather(from
);
3979 if (lock
->is_gathering()) {
3980 dout(7) << "handle_simple_lock " << *lock
<< " on " << *lock
->get_parent() << " from " << from
3981 << ", still gathering " << lock
->get_gather_set() << dendl
;
3983 dout(7) << "handle_simple_lock " << *lock
<< " on " << *lock
->get_parent() << " from " << from
3984 << ", last one" << dendl
;
3989 case LOCK_AC_REQRDLOCK
:
3990 handle_reqrdlock(lock
, m
);
3998 /* unused, currently.
4000 class C_Locker_SimpleEval : public Context {
4004 C_Locker_SimpleEval(Locker *l, SimpleLock *lk) : locker(l), lock(lk) {}
4005 void finish(int r) {
4006 locker->try_simple_eval(lock);
4010 void Locker::try_simple_eval(SimpleLock *lock)
4012 // unstable and ambiguous auth?
4013 if (!lock->is_stable() &&
4014 lock->get_parent()->is_ambiguous_auth()) {
4015 dout(7) << "simple_eval not stable and ambiguous auth, waiting on " << *lock->get_parent() << dendl;
4016 //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
4017 lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_SimpleEval(this, lock));
4021 if (!lock->get_parent()->is_auth()) {
4022 dout(7) << "try_simple_eval not auth for " << *lock->get_parent() << dendl;
4026 if (!lock->get_parent()->can_auth_pin()) {
4027 dout(7) << "try_simple_eval can't auth_pin, waiting on " << *lock->get_parent() << dendl;
4028 //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
4029 lock->get_parent()->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_SimpleEval(this, lock));
4033 if (lock->is_stable())
4039 void Locker::simple_eval(SimpleLock
*lock
, bool *need_issue
)
4041 dout(10) << "simple_eval " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4043 assert(lock
->get_parent()->is_auth());
4044 assert(lock
->is_stable());
4046 if (lock
->get_parent()->is_freezing_or_frozen()) {
4047 // dentry lock in unreadable state can block path traverse
4048 if ((lock
->get_type() != CEPH_LOCK_DN
||
4049 lock
->get_state() == LOCK_SYNC
||
4050 lock
->get_parent()->is_frozen()))
4054 if (mdcache
->is_readonly()) {
4055 if (lock
->get_state() != LOCK_SYNC
) {
4056 dout(10) << "simple_eval read-only FS, syncing " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4057 simple_sync(lock
, need_issue
);
4064 if (lock
->get_type() != CEPH_LOCK_DN
) {
4065 in
= static_cast<CInode
*>(lock
->get_parent());
4066 in
->get_caps_wanted(&wanted
, NULL
, lock
->get_cap_shift());
4070 if (lock
->get_state() != LOCK_EXCL
&&
4071 in
&& in
->get_target_loner() >= 0 &&
4072 (wanted
& CEPH_CAP_GEXCL
)) {
4073 dout(7) << "simple_eval stable, going to excl " << *lock
4074 << " on " << *lock
->get_parent() << dendl
;
4075 simple_excl(lock
, need_issue
);
4079 else if (lock
->get_state() != LOCK_SYNC
&&
4080 !lock
->is_wrlocked() &&
4081 ((!(wanted
& CEPH_CAP_GEXCL
) && !lock
->is_waiter_for(SimpleLock::WAIT_WR
)) ||
4082 (lock
->get_state() == LOCK_EXCL
&& in
&& in
->get_target_loner() < 0))) {
4083 dout(7) << "simple_eval stable, syncing " << *lock
4084 << " on " << *lock
->get_parent() << dendl
;
4085 simple_sync(lock
, need_issue
);
4092 bool Locker::simple_sync(SimpleLock
*lock
, bool *need_issue
)
4094 dout(7) << "simple_sync on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4095 assert(lock
->get_parent()->is_auth());
4096 assert(lock
->is_stable());
4099 if (lock
->get_cap_shift())
4100 in
= static_cast<CInode
*>(lock
->get_parent());
4102 int old_state
= lock
->get_state();
4104 if (old_state
!= LOCK_TSYN
) {
4106 switch (lock
->get_state()) {
4107 case LOCK_MIX
: lock
->set_state(LOCK_MIX_SYNC
); break;
4108 case LOCK_LOCK
: lock
->set_state(LOCK_LOCK_SYNC
); break;
4109 case LOCK_XSYN
: lock
->set_state(LOCK_XSYN_SYNC
); break;
4110 case LOCK_EXCL
: lock
->set_state(LOCK_EXCL_SYNC
); break;
4111 default: ceph_abort();
4115 if (lock
->is_wrlocked())
4118 if (lock
->get_parent()->is_replicated() && old_state
== LOCK_MIX
) {
4119 send_lock_message(lock
, LOCK_AC_SYNC
);
4120 lock
->init_gather();
4124 if (in
&& in
->is_head()) {
4125 if (in
->issued_caps_need_gather(lock
)) {
4134 bool need_recover
= false;
4135 if (lock
->get_type() == CEPH_LOCK_IFILE
) {
4137 if (in
->state_test(CInode::STATE_NEEDSRECOVER
)) {
4138 mds
->mdcache
->queue_file_recover(in
);
4139 need_recover
= true;
4144 if (!gather
&& lock
->is_dirty()) {
4145 lock
->get_parent()->auth_pin(lock
);
4146 scatter_writebehind(static_cast<ScatterLock
*>(lock
));
4147 mds
->mdlog
->flush();
4152 lock
->get_parent()->auth_pin(lock
);
4154 mds
->mdcache
->do_file_recover();
4159 if (lock
->get_parent()->is_replicated()) { // FIXME
4161 lock
->encode_locked_state(data
);
4162 send_lock_message(lock
, LOCK_AC_SYNC
, data
);
4164 lock
->set_state(LOCK_SYNC
);
4165 lock
->finish_waiters(SimpleLock::WAIT_RD
|SimpleLock::WAIT_STABLE
);
4166 if (in
&& in
->is_head()) {
4175 void Locker::simple_excl(SimpleLock
*lock
, bool *need_issue
)
4177 dout(7) << "simple_excl on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4178 assert(lock
->get_parent()->is_auth());
4179 assert(lock
->is_stable());
4182 if (lock
->get_cap_shift())
4183 in
= static_cast<CInode
*>(lock
->get_parent());
4185 switch (lock
->get_state()) {
4186 case LOCK_LOCK
: lock
->set_state(LOCK_LOCK_EXCL
); break;
4187 case LOCK_SYNC
: lock
->set_state(LOCK_SYNC_EXCL
); break;
4188 case LOCK_XSYN
: lock
->set_state(LOCK_XSYN_EXCL
); break;
4189 default: ceph_abort();
4193 if (lock
->is_rdlocked())
4195 if (lock
->is_wrlocked())
4198 if (lock
->get_parent()->is_replicated() &&
4199 lock
->get_state() != LOCK_LOCK_EXCL
&&
4200 lock
->get_state() != LOCK_XSYN_EXCL
) {
4201 send_lock_message(lock
, LOCK_AC_LOCK
);
4202 lock
->init_gather();
4206 if (in
&& in
->is_head()) {
4207 if (in
->issued_caps_need_gather(lock
)) {
4217 lock
->get_parent()->auth_pin(lock
);
4219 lock
->set_state(LOCK_EXCL
);
4220 lock
->finish_waiters(SimpleLock::WAIT_WR
|SimpleLock::WAIT_STABLE
);
4230 void Locker::simple_lock(SimpleLock
*lock
, bool *need_issue
)
4232 dout(7) << "simple_lock on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4233 assert(lock
->get_parent()->is_auth());
4234 assert(lock
->is_stable());
4235 assert(lock
->get_state() != LOCK_LOCK
);
4238 if (lock
->get_cap_shift())
4239 in
= static_cast<CInode
*>(lock
->get_parent());
4241 int old_state
= lock
->get_state();
4243 switch (lock
->get_state()) {
4244 case LOCK_SYNC
: lock
->set_state(LOCK_SYNC_LOCK
); break;
4245 case LOCK_XSYN
: lock
->set_state(LOCK_XSYN_LOCK
); break;
4246 case LOCK_EXCL
: lock
->set_state(LOCK_EXCL_LOCK
); break;
4247 case LOCK_MIX
: lock
->set_state(LOCK_MIX_LOCK
);
4248 (static_cast<ScatterLock
*>(lock
))->clear_unscatter_wanted();
4250 case LOCK_TSYN
: lock
->set_state(LOCK_TSYN_LOCK
); break;
4251 default: ceph_abort();
4255 if (lock
->is_leased()) {
4257 revoke_client_leases(lock
);
4259 if (lock
->is_rdlocked())
4261 if (in
&& in
->is_head()) {
4262 if (in
->issued_caps_need_gather(lock
)) {
4271 bool need_recover
= false;
4272 if (lock
->get_type() == CEPH_LOCK_IFILE
) {
4274 if(in
->state_test(CInode::STATE_NEEDSRECOVER
)) {
4275 mds
->mdcache
->queue_file_recover(in
);
4276 need_recover
= true;
4281 if (lock
->get_parent()->is_replicated() &&
4282 lock
->get_state() == LOCK_MIX_LOCK
&&
4284 dout(10) << " doing local stage of mix->lock gather before gathering from replicas" << dendl
;
4286 // move to second stage of gather now, so we don't send the lock action later.
4287 if (lock
->get_state() == LOCK_MIX_LOCK
)
4288 lock
->set_state(LOCK_MIX_LOCK2
);
4290 if (lock
->get_parent()->is_replicated() &&
4291 lock
->get_sm()->states
[old_state
].replica_state
!= LOCK_LOCK
) { // replica may already be LOCK
4293 send_lock_message(lock
, LOCK_AC_LOCK
);
4294 lock
->init_gather();
4298 if (!gather
&& lock
->is_dirty()) {
4299 lock
->get_parent()->auth_pin(lock
);
4300 scatter_writebehind(static_cast<ScatterLock
*>(lock
));
4301 mds
->mdlog
->flush();
4306 lock
->get_parent()->auth_pin(lock
);
4308 mds
->mdcache
->do_file_recover();
4310 lock
->set_state(LOCK_LOCK
);
4311 lock
->finish_waiters(ScatterLock::WAIT_XLOCK
|ScatterLock::WAIT_WR
|ScatterLock::WAIT_STABLE
);
4316 void Locker::simple_xlock(SimpleLock
*lock
)
4318 dout(7) << "simple_xlock on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4319 assert(lock
->get_parent()->is_auth());
4320 //assert(lock->is_stable());
4321 assert(lock
->get_state() != LOCK_XLOCK
);
4324 if (lock
->get_cap_shift())
4325 in
= static_cast<CInode
*>(lock
->get_parent());
4327 if (lock
->is_stable())
4328 lock
->get_parent()->auth_pin(lock
);
4330 switch (lock
->get_state()) {
4332 case LOCK_XLOCKDONE
: lock
->set_state(LOCK_LOCK_XLOCK
); break;
4333 default: ceph_abort();
4337 if (lock
->is_rdlocked())
4339 if (lock
->is_wrlocked())
4342 if (in
&& in
->is_head()) {
4343 if (in
->issued_caps_need_gather(lock
)) {
4350 lock
->set_state(LOCK_PREXLOCK
);
4351 //assert("shouldn't be called if we are already xlockable" == 0);
4359 // ==========================================================================
4364 Some notes on scatterlocks.
4366 - The scatter/gather is driven by the inode lock. The scatter always
4367 brings in the latest metadata from the fragments.
4369 - When in a scattered/MIX state, fragments are only allowed to
4370 update/be written to if the accounted stat matches the inode's
4373 - That means, on gather, we _only_ assimilate diffs for frag metadata
4374 that match the current version, because those are the only ones
4375 written during this scatter/gather cycle. (Others didn't permit
4376 it.) We increment the version and journal this to disk.
4378 - When possible, we also simultaneously update our local frag
4379 accounted stats to match.
4381 - On scatter, the new inode info is broadcast to frags, both local
4382 and remote. If possible (auth and !frozen), the dirfrag auth
4383 should update the accounted state (if it isn't already up to date).
4384 Note that this may occur on both the local inode auth node and
4385 inode replicas, so there are two potential paths. If it is NOT
4386 possible, they need to mark_stale to prevent any possible writes.
4388 - A scatter can be to MIX (potentially writeable) or to SYNC (read
4389 only). Both are opportunities to update the frag accounted stats,
4390 even though only the MIX case is affected by a stale dirfrag.
4392 - Because many scatter/gather cycles can potentially go by without a
4393 frag being able to update its accounted stats (due to being frozen
4394 by exports/refragments in progress), the frag may have (even very)
4395 old stat versions. That's fine. If when we do want to update it,
4396 we can update accounted_* and the version first.
4400 class C_Locker_ScatterWB
: public LockerLogContext
{
4404 C_Locker_ScatterWB(Locker
*l
, ScatterLock
*sl
, MutationRef
& m
) :
4405 LockerLogContext(l
), lock(sl
), mut(m
) {}
4406 void finish(int r
) override
{
4407 locker
->scatter_writebehind_finish(lock
, mut
);
4411 void Locker::scatter_writebehind(ScatterLock
*lock
)
4413 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4414 dout(10) << "scatter_writebehind " << in
->inode
.mtime
<< " on " << *lock
<< " on " << *in
<< dendl
;
4417 MutationRef
mut(new MutationImpl());
4418 mut
->ls
= mds
->mdlog
->get_current_segment();
4420 // forcefully take a wrlock
4421 lock
->get_wrlock(true);
4422 mut
->wrlocks
.insert(lock
);
4423 mut
->locks
.insert(lock
);
4425 in
->pre_cow_old_inode(); // avoid cow mayhem
4427 auto &pi
= in
->project_inode();
4428 pi
.inode
.version
= in
->pre_dirty();
4430 in
->finish_scatter_gather_update(lock
->get_type());
4431 lock
->start_flush();
4433 EUpdate
*le
= new EUpdate(mds
->mdlog
, "scatter_writebehind");
4434 mds
->mdlog
->start_entry(le
);
4436 mdcache
->predirty_journal_parents(mut
, &le
->metablob
, in
, 0, PREDIRTY_PRIMARY
);
4437 mdcache
->journal_dirty_inode(mut
.get(), &le
->metablob
, in
);
4439 in
->finish_scatter_gather_update_accounted(lock
->get_type(), mut
, &le
->metablob
);
4441 mds
->mdlog
->submit_entry(le
, new C_Locker_ScatterWB(this, lock
, mut
));
4444 void Locker::scatter_writebehind_finish(ScatterLock
*lock
, MutationRef
& mut
)
4446 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4447 dout(10) << "scatter_writebehind_finish on " << *lock
<< " on " << *in
<< dendl
;
4448 in
->pop_and_dirty_projected_inode(mut
->ls
);
4450 lock
->finish_flush();
4452 // if replicas may have flushed in a mix->lock state, send another
4453 // message so they can finish_flush().
4454 if (in
->is_replicated()) {
4455 switch (lock
->get_state()) {
4457 case LOCK_MIX_LOCK2
:
4460 send_lock_message(lock
, LOCK_AC_LOCKFLUSHED
);
4465 drop_locks(mut
.get());
4468 if (lock
->is_stable())
4469 lock
->finish_waiters(ScatterLock::WAIT_STABLE
);
4471 //scatter_eval_gather(lock);
4474 void Locker::scatter_eval(ScatterLock
*lock
, bool *need_issue
)
4476 dout(10) << "scatter_eval " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4478 assert(lock
->get_parent()->is_auth());
4479 assert(lock
->is_stable());
4481 if (lock
->get_parent()->is_freezing_or_frozen()) {
4482 dout(20) << " freezing|frozen" << dendl
;
4486 if (mdcache
->is_readonly()) {
4487 if (lock
->get_state() != LOCK_SYNC
) {
4488 dout(10) << "scatter_eval read-only FS, syncing " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4489 simple_sync(lock
, need_issue
);
4494 if (!lock
->is_rdlocked() &&
4495 lock
->get_state() != LOCK_MIX
&&
4496 lock
->get_scatter_wanted()) {
4497 dout(10) << "scatter_eval scatter_wanted, bump to mix " << *lock
4498 << " on " << *lock
->get_parent() << dendl
;
4499 scatter_mix(lock
, need_issue
);
4503 if (lock
->get_type() == CEPH_LOCK_INEST
) {
4504 // in general, we want to keep INEST writable at all times.
4505 if (!lock
->is_rdlocked()) {
4506 if (lock
->get_parent()->is_replicated()) {
4507 if (lock
->get_state() != LOCK_MIX
)
4508 scatter_mix(lock
, need_issue
);
4510 if (lock
->get_state() != LOCK_LOCK
)
4511 simple_lock(lock
, need_issue
);
4517 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4518 if (!in
->has_subtree_or_exporting_dirfrag() || in
->is_base()) {
4519 // i _should_ be sync.
4520 if (!lock
->is_wrlocked() &&
4521 lock
->get_state() != LOCK_SYNC
) {
4522 dout(10) << "scatter_eval no wrlocks|xlocks, not subtree root inode, syncing" << dendl
;
4523 simple_sync(lock
, need_issue
);
4530 * mark a scatterlock to indicate that the dir fnode has some dirty data
4532 void Locker::mark_updated_scatterlock(ScatterLock
*lock
)
4535 if (lock
->get_updated_item()->is_on_list()) {
4536 dout(10) << "mark_updated_scatterlock " << *lock
4537 << " - already on list since " << lock
->get_update_stamp() << dendl
;
4539 updated_scatterlocks
.push_back(lock
->get_updated_item());
4540 utime_t now
= ceph_clock_now();
4541 lock
->set_update_stamp(now
);
4542 dout(10) << "mark_updated_scatterlock " << *lock
4543 << " - added at " << now
<< dendl
;
4548 * this is called by scatter_tick and LogSegment::try_to_trim() when
4549 * trying to flush dirty scattered data (i.e. updated fnode) back to
4552 * we need to lock|scatter in order to push fnode changes into the
4555 void Locker::scatter_nudge(ScatterLock
*lock
, MDSInternalContextBase
*c
, bool forcelockchange
)
4557 CInode
*p
= static_cast<CInode
*>(lock
->get_parent());
4559 if (p
->is_frozen() || p
->is_freezing()) {
4560 dout(10) << "scatter_nudge waiting for unfreeze on " << *p
<< dendl
;
4562 p
->add_waiter(MDSCacheObject::WAIT_UNFREEZE
, c
);
4563 else if (lock
->is_dirty())
4564 // just requeue. not ideal.. starvation prone..
4565 updated_scatterlocks
.push_back(lock
->get_updated_item());
4569 if (p
->is_ambiguous_auth()) {
4570 dout(10) << "scatter_nudge waiting for single auth on " << *p
<< dendl
;
4572 p
->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
, c
);
4573 else if (lock
->is_dirty())
4574 // just requeue. not ideal.. starvation prone..
4575 updated_scatterlocks
.push_back(lock
->get_updated_item());
4582 if (lock
->is_stable()) {
4583 // can we do it now?
4584 // (only if we're not replicated.. if we are, we really do need
4585 // to nudge the lock state!)
4587 actually, even if we're not replicated, we can't stay in MIX, because another mds
4588 could discover and replicate us at any time. if that happens while we're flushing,
4589 they end up in MIX but their inode has the old scatterstat version.
4591 if (!forcelockchange && !lock->get_parent()->is_replicated() && lock->can_wrlock(-1)) {
4592 dout(10) << "scatter_nudge auth, propagating " << *lock << " on " << *p << dendl;
4593 scatter_writebehind(lock);
4595 lock->add_waiter(SimpleLock::WAIT_STABLE, c);
4600 if (mdcache
->is_readonly()) {
4601 if (lock
->get_state() != LOCK_SYNC
) {
4602 dout(10) << "scatter_nudge auth, read-only FS, syncing " << *lock
<< " on " << *p
<< dendl
;
4603 simple_sync(static_cast<ScatterLock
*>(lock
));
4608 // adjust lock state
4609 dout(10) << "scatter_nudge auth, scatter/unscattering " << *lock
<< " on " << *p
<< dendl
;
4610 switch (lock
->get_type()) {
4611 case CEPH_LOCK_IFILE
:
4612 if (p
->is_replicated() && lock
->get_state() != LOCK_MIX
)
4613 scatter_mix(static_cast<ScatterLock
*>(lock
));
4614 else if (lock
->get_state() != LOCK_LOCK
)
4615 simple_lock(static_cast<ScatterLock
*>(lock
));
4617 simple_sync(static_cast<ScatterLock
*>(lock
));
4620 case CEPH_LOCK_IDFT
:
4621 case CEPH_LOCK_INEST
:
4622 if (p
->is_replicated() && lock
->get_state() != LOCK_MIX
)
4624 else if (lock
->get_state() != LOCK_LOCK
)
4633 if (lock
->is_stable() && count
== 2) {
4634 dout(10) << "scatter_nudge oh, stable after two cycles." << dendl
;
4635 // this should only realy happen when called via
4636 // handle_file_lock due to AC_NUDGE, because the rest of the
4637 // time we are replicated or have dirty data and won't get
4638 // called. bailing here avoids an infinite loop.
4643 dout(10) << "scatter_nudge auth, waiting for stable " << *lock
<< " on " << *p
<< dendl
;
4645 lock
->add_waiter(SimpleLock::WAIT_STABLE
, c
);
4650 dout(10) << "scatter_nudge replica, requesting scatter/unscatter of "
4651 << *lock
<< " on " << *p
<< dendl
;
4652 // request unscatter?
4653 mds_rank_t auth
= lock
->get_parent()->authority().first
;
4654 if (!mds
->is_cluster_degraded() ||
4655 mds
->mdsmap
->is_clientreplay_or_active_or_stopping(auth
))
4656 mds
->send_message_mds(new MLock(lock
, LOCK_AC_NUDGE
, mds
->get_nodeid()), auth
);
4660 lock
->add_waiter(SimpleLock::WAIT_STABLE
, c
);
4662 // also, requeue, in case we had wrong auth or something
4663 if (lock
->is_dirty())
4664 updated_scatterlocks
.push_back(lock
->get_updated_item());
4668 void Locker::scatter_tick()
4670 dout(10) << "scatter_tick" << dendl
;
4673 utime_t now
= ceph_clock_now();
4674 int n
= updated_scatterlocks
.size();
4675 while (!updated_scatterlocks
.empty()) {
4676 ScatterLock
*lock
= updated_scatterlocks
.front();
4678 if (n
-- == 0) break; // scatter_nudge() may requeue; avoid looping
4680 if (!lock
->is_dirty()) {
4681 updated_scatterlocks
.pop_front();
4682 dout(10) << " removing from updated_scatterlocks "
4683 << *lock
<< " " << *lock
->get_parent() << dendl
;
4686 if (now
- lock
->get_update_stamp() < g_conf
->mds_scatter_nudge_interval
)
4688 updated_scatterlocks
.pop_front();
4689 scatter_nudge(lock
, 0);
4691 mds
->mdlog
->flush();
4695 void Locker::scatter_tempsync(ScatterLock
*lock
, bool *need_issue
)
4697 dout(10) << "scatter_tempsync " << *lock
4698 << " on " << *lock
->get_parent() << dendl
;
4699 assert(lock
->get_parent()->is_auth());
4700 assert(lock
->is_stable());
4702 assert(0 == "not fully implemented, at least not for filelock");
4704 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4706 switch (lock
->get_state()) {
4707 case LOCK_SYNC
: ceph_abort(); // this shouldn't happen
4708 case LOCK_LOCK
: lock
->set_state(LOCK_LOCK_TSYN
); break;
4709 case LOCK_MIX
: lock
->set_state(LOCK_MIX_TSYN
); break;
4710 default: ceph_abort();
4714 if (lock
->is_wrlocked())
4717 if (lock
->get_cap_shift() &&
4719 in
->issued_caps_need_gather(lock
)) {
4727 if (lock
->get_state() == LOCK_MIX_TSYN
&&
4728 in
->is_replicated()) {
4729 lock
->init_gather();
4730 send_lock_message(lock
, LOCK_AC_LOCK
);
4738 lock
->set_state(LOCK_TSYN
);
4739 lock
->finish_waiters(ScatterLock::WAIT_RD
|ScatterLock::WAIT_STABLE
);
4740 if (lock
->get_cap_shift()) {
4751 // ==========================================================================
4754 void Locker::local_wrlock_grab(LocalLock
*lock
, MutationRef
& mut
)
4756 dout(7) << "local_wrlock_grab on " << *lock
4757 << " on " << *lock
->get_parent() << dendl
;
4759 assert(lock
->get_parent()->is_auth());
4760 assert(lock
->can_wrlock());
4761 assert(!mut
->wrlocks
.count(lock
));
4762 lock
->get_wrlock(mut
->get_client());
4763 mut
->wrlocks
.insert(lock
);
4764 mut
->locks
.insert(lock
);
4767 bool Locker::local_wrlock_start(LocalLock
*lock
, MDRequestRef
& mut
)
4769 dout(7) << "local_wrlock_start on " << *lock
4770 << " on " << *lock
->get_parent() << dendl
;
4772 assert(lock
->get_parent()->is_auth());
4773 if (lock
->can_wrlock()) {
4774 assert(!mut
->wrlocks
.count(lock
));
4775 lock
->get_wrlock(mut
->get_client());
4776 mut
->wrlocks
.insert(lock
);
4777 mut
->locks
.insert(lock
);
4780 lock
->add_waiter(SimpleLock::WAIT_WR
|SimpleLock::WAIT_STABLE
, new C_MDS_RetryRequest(mdcache
, mut
));
4785 void Locker::local_wrlock_finish(LocalLock
*lock
, MutationImpl
*mut
)
4787 dout(7) << "local_wrlock_finish on " << *lock
4788 << " on " << *lock
->get_parent() << dendl
;
4790 mut
->wrlocks
.erase(lock
);
4791 mut
->locks
.erase(lock
);
4792 if (lock
->get_num_wrlocks() == 0) {
4793 lock
->finish_waiters(SimpleLock::WAIT_STABLE
|
4794 SimpleLock::WAIT_WR
|
4795 SimpleLock::WAIT_RD
);
4799 bool Locker::local_xlock_start(LocalLock
*lock
, MDRequestRef
& mut
)
4801 dout(7) << "local_xlock_start on " << *lock
4802 << " on " << *lock
->get_parent() << dendl
;
4804 assert(lock
->get_parent()->is_auth());
4805 if (!lock
->can_xlock_local()) {
4806 lock
->add_waiter(SimpleLock::WAIT_WR
|SimpleLock::WAIT_STABLE
, new C_MDS_RetryRequest(mdcache
, mut
));
4810 lock
->get_xlock(mut
, mut
->get_client());
4811 mut
->xlocks
.insert(lock
);
4812 mut
->locks
.insert(lock
);
4816 void Locker::local_xlock_finish(LocalLock
*lock
, MutationImpl
*mut
)
4818 dout(7) << "local_xlock_finish on " << *lock
4819 << " on " << *lock
->get_parent() << dendl
;
4821 mut
->xlocks
.erase(lock
);
4822 mut
->locks
.erase(lock
);
4824 lock
->finish_waiters(SimpleLock::WAIT_STABLE
|
4825 SimpleLock::WAIT_WR
|
4826 SimpleLock::WAIT_RD
);
4831 // ==========================================================================
4835 void Locker::file_eval(ScatterLock
*lock
, bool *need_issue
)
4837 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4838 int loner_wanted
, other_wanted
;
4839 int wanted
= in
->get_caps_wanted(&loner_wanted
, &other_wanted
, CEPH_CAP_SFILE
);
4840 dout(7) << "file_eval wanted=" << gcap_string(wanted
)
4841 << " loner_wanted=" << gcap_string(loner_wanted
)
4842 << " other_wanted=" << gcap_string(other_wanted
)
4843 << " filelock=" << *lock
<< " on " << *lock
->get_parent()
4846 assert(lock
->get_parent()->is_auth());
4847 assert(lock
->is_stable());
4849 if (lock
->get_parent()->is_freezing_or_frozen())
4852 if (mdcache
->is_readonly()) {
4853 if (lock
->get_state() != LOCK_SYNC
) {
4854 dout(10) << "file_eval read-only FS, syncing " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4855 simple_sync(lock
, need_issue
);
4861 if (lock
->get_state() == LOCK_EXCL
) {
4862 dout(20) << " is excl" << dendl
;
4863 int loner_issued
, other_issued
, xlocker_issued
;
4864 in
->get_caps_issued(&loner_issued
, &other_issued
, &xlocker_issued
, CEPH_CAP_SFILE
);
4865 dout(7) << "file_eval loner_issued=" << gcap_string(loner_issued
)
4866 << " other_issued=" << gcap_string(other_issued
)
4867 << " xlocker_issued=" << gcap_string(xlocker_issued
)
4869 if (!((loner_wanted
|loner_issued
) & (CEPH_CAP_GEXCL
|CEPH_CAP_GWR
|CEPH_CAP_GBUFFER
)) ||
4870 (other_wanted
& (CEPH_CAP_GEXCL
|CEPH_CAP_GWR
|CEPH_CAP_GRD
)) ||
4871 (in
->inode
.is_dir() && in
->multiple_nonstale_caps())) { // FIXME.. :/
4872 dout(20) << " should lose it" << dendl
;
4873 // we should lose it.
4884 // -> any writer means MIX; RD doesn't matter.
4885 if (((other_wanted
|loner_wanted
) & CEPH_CAP_GWR
) ||
4886 lock
->is_waiter_for(SimpleLock::WAIT_WR
))
4887 scatter_mix(lock
, need_issue
);
4888 else if (!lock
->is_wrlocked()) // let excl wrlocks drain first
4889 simple_sync(lock
, need_issue
);
4891 dout(10) << " waiting for wrlock to drain" << dendl
;
4896 else if (lock
->get_state() != LOCK_EXCL
&&
4897 !lock
->is_rdlocked() &&
4898 //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
4899 ((wanted
& (CEPH_CAP_GWR
|CEPH_CAP_GBUFFER
)) ||
4900 (in
->inode
.is_dir() && !in
->has_subtree_or_exporting_dirfrag())) &&
4901 in
->get_target_loner() >= 0) {
4902 dout(7) << "file_eval stable, bump to loner " << *lock
4903 << " on " << *lock
->get_parent() << dendl
;
4904 file_excl(lock
, need_issue
);
4908 else if (lock
->get_state() != LOCK_MIX
&&
4909 !lock
->is_rdlocked() &&
4910 //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
4911 (lock
->get_scatter_wanted() ||
4912 (in
->get_target_loner() < 0 && (wanted
& CEPH_CAP_GWR
)))) {
4913 dout(7) << "file_eval stable, bump to mixed " << *lock
4914 << " on " << *lock
->get_parent() << dendl
;
4915 scatter_mix(lock
, need_issue
);
4919 else if (lock
->get_state() != LOCK_SYNC
&&
4920 !lock
->is_wrlocked() && // drain wrlocks first!
4921 !lock
->is_waiter_for(SimpleLock::WAIT_WR
) &&
4922 !(wanted
& CEPH_CAP_GWR
) &&
4923 !((lock
->get_state() == LOCK_MIX
) &&
4924 in
->is_dir() && in
->has_subtree_or_exporting_dirfrag()) // if we are a delegation point, stay where we are
4925 //((wanted & CEPH_CAP_RD) ||
4926 //in->is_replicated() ||
4927 //lock->is_leased() ||
4928 //(!loner && lock->get_state() == LOCK_EXCL)) &&
4930 dout(7) << "file_eval stable, bump to sync " << *lock
4931 << " on " << *lock
->get_parent() << dendl
;
4932 simple_sync(lock
, need_issue
);
4938 void Locker::scatter_mix(ScatterLock
*lock
, bool *need_issue
)
4940 dout(7) << "scatter_mix " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4942 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4943 assert(in
->is_auth());
4944 assert(lock
->is_stable());
4946 if (lock
->get_state() == LOCK_LOCK
) {
4947 in
->start_scatter(lock
);
4948 if (in
->is_replicated()) {
4950 bufferlist softdata
;
4951 lock
->encode_locked_state(softdata
);
4953 // bcast to replicas
4954 send_lock_message(lock
, LOCK_AC_MIX
, softdata
);
4958 lock
->set_state(LOCK_MIX
);
4959 lock
->clear_scatter_wanted();
4960 if (lock
->get_cap_shift()) {
4968 switch (lock
->get_state()) {
4969 case LOCK_SYNC
: lock
->set_state(LOCK_SYNC_MIX
); break;
4970 case LOCK_EXCL
: lock
->set_state(LOCK_EXCL_MIX
); break;
4971 case LOCK_XSYN
: lock
->set_state(LOCK_XSYN_MIX
); break;
4972 case LOCK_TSYN
: lock
->set_state(LOCK_TSYN_MIX
); break;
4973 default: ceph_abort();
4977 if (lock
->is_rdlocked())
4979 if (in
->is_replicated()) {
4980 if (lock
->get_state() == LOCK_SYNC_MIX
) { // for the rest states, replicas are already LOCK
4981 send_lock_message(lock
, LOCK_AC_MIX
);
4982 lock
->init_gather();
4986 if (lock
->is_leased()) {
4987 revoke_client_leases(lock
);
4990 if (lock
->get_cap_shift() &&
4992 in
->issued_caps_need_gather(lock
)) {
4999 bool need_recover
= false;
5000 if (in
->state_test(CInode::STATE_NEEDSRECOVER
)) {
5001 mds
->mdcache
->queue_file_recover(in
);
5002 need_recover
= true;
5007 lock
->get_parent()->auth_pin(lock
);
5009 mds
->mdcache
->do_file_recover();
5011 in
->start_scatter(lock
);
5012 lock
->set_state(LOCK_MIX
);
5013 lock
->clear_scatter_wanted();
5014 if (in
->is_replicated()) {
5015 bufferlist softdata
;
5016 lock
->encode_locked_state(softdata
);
5017 send_lock_message(lock
, LOCK_AC_MIX
, softdata
);
5019 if (lock
->get_cap_shift()) {
5030 void Locker::file_excl(ScatterLock
*lock
, bool *need_issue
)
5032 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
5033 dout(7) << "file_excl " << *lock
<< " on " << *lock
->get_parent() << dendl
;
5035 assert(in
->is_auth());
5036 assert(lock
->is_stable());
5038 assert((in
->get_loner() >= 0 && in
->mds_caps_wanted
.empty()) ||
5039 (lock
->get_state() == LOCK_XSYN
)); // must do xsyn -> excl -> <anything else>
5041 switch (lock
->get_state()) {
5042 case LOCK_SYNC
: lock
->set_state(LOCK_SYNC_EXCL
); break;
5043 case LOCK_MIX
: lock
->set_state(LOCK_MIX_EXCL
); break;
5044 case LOCK_LOCK
: lock
->set_state(LOCK_LOCK_EXCL
); break;
5045 case LOCK_XSYN
: lock
->set_state(LOCK_XSYN_EXCL
); break;
5046 default: ceph_abort();
5050 if (lock
->is_rdlocked())
5052 if (lock
->is_wrlocked())
5055 if (in
->is_replicated() &&
5056 lock
->get_state() != LOCK_LOCK_EXCL
&&
5057 lock
->get_state() != LOCK_XSYN_EXCL
) { // if we were lock, replicas are already lock.
5058 send_lock_message(lock
, LOCK_AC_LOCK
);
5059 lock
->init_gather();
5062 if (lock
->is_leased()) {
5063 revoke_client_leases(lock
);
5066 if (in
->is_head() &&
5067 in
->issued_caps_need_gather(lock
)) {
5074 bool need_recover
= false;
5075 if (in
->state_test(CInode::STATE_NEEDSRECOVER
)) {
5076 mds
->mdcache
->queue_file_recover(in
);
5077 need_recover
= true;
5082 lock
->get_parent()->auth_pin(lock
);
5084 mds
->mdcache
->do_file_recover();
5086 lock
->set_state(LOCK_EXCL
);
5094 void Locker::file_xsyn(SimpleLock
*lock
, bool *need_issue
)
5096 dout(7) << "file_xsyn on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
5097 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
5098 assert(in
->is_auth());
5099 assert(in
->get_loner() >= 0 && in
->mds_caps_wanted
.empty());
5101 switch (lock
->get_state()) {
5102 case LOCK_EXCL
: lock
->set_state(LOCK_EXCL_XSYN
); break;
5103 default: ceph_abort();
5107 if (lock
->is_wrlocked())
5110 if (in
->is_head() &&
5111 in
->issued_caps_need_gather(lock
)) {
5120 lock
->get_parent()->auth_pin(lock
);
5122 lock
->set_state(LOCK_XSYN
);
5123 lock
->finish_waiters(SimpleLock::WAIT_RD
|SimpleLock::WAIT_STABLE
);
5131 void Locker::file_recover(ScatterLock
*lock
)
5133 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
5134 dout(7) << "file_recover " << *lock
<< " on " << *in
<< dendl
;
5136 assert(in
->is_auth());
5137 //assert(lock->is_stable());
5138 assert(lock
->get_state() == LOCK_PRE_SCAN
); // only called from MDCache::start_files_to_recover()
5143 if (in->is_replicated()
5144 lock->get_sm()->states[oldstate].replica_state != LOCK_LOCK) {
5145 send_lock_message(lock, LOCK_AC_LOCK);
5146 lock->init_gather();
5150 if (in
->is_head() &&
5151 in
->issued_caps_need_gather(lock
)) {
5156 lock
->set_state(LOCK_SCAN
);
5158 in
->state_set(CInode::STATE_NEEDSRECOVER
);
5160 mds
->mdcache
->queue_file_recover(in
);
5165 /* This function DOES put the passed message before returning */
5166 void Locker::handle_file_lock(ScatterLock
*lock
, MLock
*m
)
5168 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
5169 int from
= m
->get_asker();
5171 if (mds
->is_rejoin()) {
5172 if (in
->is_rejoining()) {
5173 dout(7) << "handle_file_lock still rejoining " << *in
5174 << ", dropping " << *m
<< dendl
;
5180 dout(7) << "handle_file_lock a=" << get_lock_action_name(m
->get_action())
5182 << " from mds." << from
<< " "
5185 bool caps
= lock
->get_cap_shift();
5187 switch (m
->get_action()) {
5190 assert(lock
->get_state() == LOCK_LOCK
||
5191 lock
->get_state() == LOCK_MIX
||
5192 lock
->get_state() == LOCK_MIX_SYNC2
);
5194 if (lock
->get_state() == LOCK_MIX
) {
5195 lock
->set_state(LOCK_MIX_SYNC
);
5196 eval_gather(lock
, true);
5197 if (lock
->is_unstable_and_locked())
5198 mds
->mdlog
->flush();
5202 (static_cast<ScatterLock
*>(lock
))->finish_flush();
5203 (static_cast<ScatterLock
*>(lock
))->clear_flushed();
5206 lock
->decode_locked_state(m
->get_data());
5207 lock
->set_state(LOCK_SYNC
);
5212 lock
->finish_waiters(SimpleLock::WAIT_RD
|SimpleLock::WAIT_STABLE
);
5217 switch (lock
->get_state()) {
5218 case LOCK_SYNC
: lock
->set_state(LOCK_SYNC_LOCK
); break;
5219 case LOCK_MIX
: lock
->set_state(LOCK_MIX_LOCK
); break;
5220 default: ceph_abort();
5223 eval_gather(lock
, true);
5224 if (lock
->is_unstable_and_locked())
5225 mds
->mdlog
->flush();
5229 case LOCK_AC_LOCKFLUSHED
:
5230 (static_cast<ScatterLock
*>(lock
))->finish_flush();
5231 (static_cast<ScatterLock
*>(lock
))->clear_flushed();
5232 // wake up scatter_nudge waiters
5233 if (lock
->is_stable())
5234 lock
->finish_waiters(SimpleLock::WAIT_STABLE
);
5238 assert(lock
->get_state() == LOCK_SYNC
||
5239 lock
->get_state() == LOCK_LOCK
||
5240 lock
->get_state() == LOCK_SYNC_MIX2
);
5242 if (lock
->get_state() == LOCK_SYNC
) {
5244 lock
->set_state(LOCK_SYNC_MIX
);
5245 eval_gather(lock
, true);
5246 if (lock
->is_unstable_and_locked())
5247 mds
->mdlog
->flush();
5252 lock
->set_state(LOCK_MIX
);
5253 lock
->decode_locked_state(m
->get_data());
5258 lock
->finish_waiters(SimpleLock::WAIT_WR
|SimpleLock::WAIT_STABLE
);
5263 case LOCK_AC_LOCKACK
:
5264 assert(lock
->get_state() == LOCK_SYNC_LOCK
||
5265 lock
->get_state() == LOCK_MIX_LOCK
||
5266 lock
->get_state() == LOCK_MIX_LOCK2
||
5267 lock
->get_state() == LOCK_MIX_EXCL
||
5268 lock
->get_state() == LOCK_SYNC_EXCL
||
5269 lock
->get_state() == LOCK_SYNC_MIX
||
5270 lock
->get_state() == LOCK_MIX_TSYN
);
5271 assert(lock
->is_gathering(from
));
5272 lock
->remove_gather(from
);
5274 if (lock
->get_state() == LOCK_MIX_LOCK
||
5275 lock
->get_state() == LOCK_MIX_LOCK2
||
5276 lock
->get_state() == LOCK_MIX_EXCL
||
5277 lock
->get_state() == LOCK_MIX_TSYN
) {
5278 lock
->decode_locked_state(m
->get_data());
5279 // replica is waiting for AC_LOCKFLUSHED, eval_gather() should not
5280 // delay calling scatter_writebehind().
5281 lock
->clear_flushed();
5284 if (lock
->is_gathering()) {
5285 dout(7) << "handle_file_lock " << *in
<< " from " << from
5286 << ", still gathering " << lock
->get_gather_set() << dendl
;
5288 dout(7) << "handle_file_lock " << *in
<< " from " << from
5289 << ", last one" << dendl
;
5294 case LOCK_AC_SYNCACK
:
5295 assert(lock
->get_state() == LOCK_MIX_SYNC
);
5296 assert(lock
->is_gathering(from
));
5297 lock
->remove_gather(from
);
5299 lock
->decode_locked_state(m
->get_data());
5301 if (lock
->is_gathering()) {
5302 dout(7) << "handle_file_lock " << *in
<< " from " << from
5303 << ", still gathering " << lock
->get_gather_set() << dendl
;
5305 dout(7) << "handle_file_lock " << *in
<< " from " << from
5306 << ", last one" << dendl
;
5311 case LOCK_AC_MIXACK
:
5312 assert(lock
->get_state() == LOCK_SYNC_MIX
);
5313 assert(lock
->is_gathering(from
));
5314 lock
->remove_gather(from
);
5316 if (lock
->is_gathering()) {
5317 dout(7) << "handle_file_lock " << *in
<< " from " << from
5318 << ", still gathering " << lock
->get_gather_set() << dendl
;
5320 dout(7) << "handle_file_lock " << *in
<< " from " << from
5321 << ", last one" << dendl
;
5328 case LOCK_AC_REQSCATTER
:
5329 if (lock
->is_stable()) {
5330 /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
5331 * because the replica should be holding an auth_pin if they're
5332 * doing this (and thus, we are freezing, not frozen, and indefinite
5333 * starvation isn't an issue).
5335 dout(7) << "handle_file_lock got scatter request on " << *lock
5336 << " on " << *lock
->get_parent() << dendl
;
5337 if (lock
->get_state() != LOCK_MIX
) // i.e., the reqscatter didn't race with an actual mix/scatter
5340 dout(7) << "handle_file_lock got scatter request, !stable, marking scatter_wanted on " << *lock
5341 << " on " << *lock
->get_parent() << dendl
;
5342 lock
->set_scatter_wanted();
5346 case LOCK_AC_REQUNSCATTER
:
5347 if (lock
->is_stable()) {
5348 /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
5349 * because the replica should be holding an auth_pin if they're
5350 * doing this (and thus, we are freezing, not frozen, and indefinite
5351 * starvation isn't an issue).
5353 dout(7) << "handle_file_lock got unscatter request on " << *lock
5354 << " on " << *lock
->get_parent() << dendl
;
5355 if (lock
->get_state() == LOCK_MIX
) // i.e., the reqscatter didn't race with an actual mix/scatter
5356 simple_lock(lock
); // FIXME tempsync?
5358 dout(7) << "handle_file_lock ignoring unscatter request on " << *lock
5359 << " on " << *lock
->get_parent() << dendl
;
5360 lock
->set_unscatter_wanted();
5364 case LOCK_AC_REQRDLOCK
:
5365 handle_reqrdlock(lock
, m
);
5369 if (!lock
->get_parent()->is_auth()) {
5370 dout(7) << "handle_file_lock IGNORING nudge on non-auth " << *lock
5371 << " on " << *lock
->get_parent() << dendl
;
5372 } else if (!lock
->get_parent()->is_replicated()) {
5373 dout(7) << "handle_file_lock IGNORING nudge on non-replicated " << *lock
5374 << " on " << *lock
->get_parent() << dendl
;
5376 dout(7) << "handle_file_lock trying nudge on " << *lock
5377 << " on " << *lock
->get_parent() << dendl
;
5378 scatter_nudge(lock
, 0, true);
5379 mds
->mdlog
->flush();