1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */
15 #include <boost/utility/string_view.hpp>
20 #include "MDBalancer.h"
26 #include "MDSContext.h"
31 #include "events/EUpdate.h"
32 #include "events/EOpen.h"
34 #include "msg/Messenger.h"
35 #include "osdc/Objecter.h"
37 #include "messages/MInodeFileCaps.h"
38 #include "messages/MLock.h"
39 #include "messages/MClientLease.h"
40 #include "messages/MClientReply.h"
41 #include "messages/MClientCaps.h"
42 #include "messages/MClientCapRelease.h"
44 #include "messages/MMDSSlaveRequest.h"
48 #include "common/config.h"
51 #define dout_subsys ceph_subsys_mds
53 #define dout_context g_ceph_context
54 #define dout_prefix _prefix(_dout, mds)
55 static ostream
& _prefix(std::ostream
*_dout
, MDSRank
*mds
) {
56 return *_dout
<< "mds." << mds
->get_nodeid() << ".locker ";
60 class LockerContext
: public MDSInternalContextBase
{
63 MDSRank
*get_mds() override
69 explicit LockerContext(Locker
*locker_
) : locker(locker_
) {
70 assert(locker
!= NULL
);
74 class LockerLogContext
: public MDSLogContextBase
{
77 MDSRank
*get_mds() override
83 explicit LockerLogContext(Locker
*locker_
) : locker(locker_
) {
84 assert(locker
!= NULL
);
88 /* This function DOES put the passed message before returning */
89 void Locker::dispatch(Message
*m
)
92 switch (m
->get_type()) {
96 handle_lock(static_cast<MLock
*>(m
));
99 case MSG_MDS_INODEFILECAPS
:
100 handle_inode_file_caps(static_cast<MInodeFileCaps
*>(m
));
104 case CEPH_MSG_CLIENT_CAPS
:
105 handle_client_caps(static_cast<MClientCaps
*>(m
));
108 case CEPH_MSG_CLIENT_CAPRELEASE
:
109 handle_client_cap_release(static_cast<MClientCapRelease
*>(m
));
111 case CEPH_MSG_CLIENT_LEASE
:
112 handle_client_lease(static_cast<MClientLease
*>(m
));
116 derr
<< "locker unknown message " << m
->get_type() << dendl
;
117 assert(0 == "locker unknown message");
134 void Locker::send_lock_message(SimpleLock
*lock
, int msg
)
136 for (const auto &it
: lock
->get_parent()->get_replicas()) {
137 if (mds
->is_cluster_degraded() &&
138 mds
->mdsmap
->get_state(it
.first
) < MDSMap::STATE_REJOIN
)
140 MLock
*m
= new MLock(lock
, msg
, mds
->get_nodeid());
141 mds
->send_message_mds(m
, it
.first
);
145 void Locker::send_lock_message(SimpleLock
*lock
, int msg
, const bufferlist
&data
)
147 for (const auto &it
: lock
->get_parent()->get_replicas()) {
148 if (mds
->is_cluster_degraded() &&
149 mds
->mdsmap
->get_state(it
.first
) < MDSMap::STATE_REJOIN
)
151 MLock
*m
= new MLock(lock
, msg
, mds
->get_nodeid());
153 mds
->send_message_mds(m
, it
.first
);
160 void Locker::include_snap_rdlocks(set
<SimpleLock
*>& rdlocks
, CInode
*in
)
162 // rdlock ancestor snaps
164 rdlocks
.insert(&in
->snaplock
);
165 while (t
->get_projected_parent_dn()) {
166 t
= t
->get_projected_parent_dn()->get_dir()->get_inode();
167 rdlocks
.insert(&t
->snaplock
);
171 void Locker::include_snap_rdlocks_wlayout(set
<SimpleLock
*>& rdlocks
, CInode
*in
,
172 file_layout_t
**layout
)
174 //rdlock ancestor snaps
176 rdlocks
.insert(&in
->snaplock
);
177 rdlocks
.insert(&in
->policylock
);
178 bool found_layout
= false;
180 rdlocks
.insert(&t
->snaplock
);
182 rdlocks
.insert(&t
->policylock
);
183 if (t
->get_projected_inode()->has_layout()) {
184 *layout
= &t
->get_projected_inode()->layout
;
188 if (t
->get_projected_parent_dn() &&
189 t
->get_projected_parent_dn()->get_dir())
190 t
= t
->get_projected_parent_dn()->get_dir()->get_inode();
195 struct MarkEventOnDestruct
{
199 MarkEventOnDestruct(MDRequestRef
& _mdr
,
200 const char *_message
) : mdr(_mdr
),
203 ~MarkEventOnDestruct() {
205 mdr
->mark_event(message
);
209 /* If this function returns false, the mdr has been placed
210 * on the appropriate wait list */
211 bool Locker::acquire_locks(MDRequestRef
& mdr
,
212 set
<SimpleLock
*> &rdlocks
,
213 set
<SimpleLock
*> &wrlocks
,
214 set
<SimpleLock
*> &xlocks
,
215 map
<SimpleLock
*,mds_rank_t
> *remote_wrlocks
,
216 CInode
*auth_pin_freeze
,
217 bool auth_pin_nonblock
)
219 if (mdr
->done_locking
&&
220 !mdr
->is_slave()) { // not on slaves! master requests locks piecemeal.
221 dout(10) << "acquire_locks " << *mdr
<< " - done locking" << dendl
;
222 return true; // at least we had better be!
224 dout(10) << "acquire_locks " << *mdr
<< dendl
;
226 MarkEventOnDestruct
marker(mdr
, "failed to acquire_locks");
228 client_t client
= mdr
->get_client();
230 set
<SimpleLock
*, SimpleLock::ptr_lt
> sorted
; // sort everything we will lock
231 set
<MDSCacheObject
*> mustpin
; // items to authpin
234 for (set
<SimpleLock
*>::iterator p
= xlocks
.begin(); p
!= xlocks
.end(); ++p
) {
235 SimpleLock
*lock
= *p
;
237 if ((lock
->get_type() == CEPH_LOCK_ISNAP
||
238 lock
->get_type() == CEPH_LOCK_IPOLICY
) &&
239 mds
->is_cluster_degraded() &&
241 !mdr
->is_queued_for_replay()) {
242 // waiting for recovering mds, to guarantee replayed requests and mksnap/setlayout
243 // get processed in proper order.
245 if (lock
->get_parent()->is_auth()) {
246 if (!mdr
->locks
.count(lock
)) {
248 lock
->get_parent()->list_replicas(ls
);
250 if (mds
->mdsmap
->get_state(m
) < MDSMap::STATE_ACTIVE
) {
257 // if the lock is the latest locked one, it's possible that slave mds got the lock
258 // while there are recovering mds.
259 if (!mdr
->locks
.count(lock
) || lock
== *mdr
->locks
.rbegin())
263 dout(10) << " must xlock " << *lock
<< " " << *lock
->get_parent()
264 << ", waiting for cluster recovered" << dendl
;
265 mds
->locker
->drop_locks(mdr
.get(), NULL
);
266 mdr
->drop_local_auth_pins();
267 mds
->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache
, mdr
));
272 dout(20) << " must xlock " << *lock
<< " " << *lock
->get_parent() << dendl
;
275 mustpin
.insert(lock
->get_parent());
277 // augment xlock with a versionlock?
278 if ((*p
)->get_type() == CEPH_LOCK_DN
) {
279 CDentry
*dn
= (CDentry
*)lock
->get_parent();
283 if (xlocks
.count(&dn
->versionlock
))
284 continue; // we're xlocking the versionlock too; don't wrlock it!
286 if (mdr
->is_master()) {
287 // master. wrlock versionlock so we can pipeline dentry updates to journal.
288 wrlocks
.insert(&dn
->versionlock
);
290 // slave. exclusively lock the dentry version (i.e. block other journal updates).
291 // this makes rollback safe.
292 xlocks
.insert(&dn
->versionlock
);
293 sorted
.insert(&dn
->versionlock
);
296 if (lock
->get_type() > CEPH_LOCK_IVERSION
) {
297 // inode version lock?
298 CInode
*in
= (CInode
*)lock
->get_parent();
301 if (mdr
->is_master()) {
302 // master. wrlock versionlock so we can pipeline inode updates to journal.
303 wrlocks
.insert(&in
->versionlock
);
305 // slave. exclusively lock the inode version (i.e. block other journal updates).
306 // this makes rollback safe.
307 xlocks
.insert(&in
->versionlock
);
308 sorted
.insert(&in
->versionlock
);
314 for (set
<SimpleLock
*>::iterator p
= wrlocks
.begin(); p
!= wrlocks
.end(); ++p
) {
315 MDSCacheObject
*object
= (*p
)->get_parent();
316 dout(20) << " must wrlock " << **p
<< " " << *object
<< dendl
;
318 if (object
->is_auth())
319 mustpin
.insert(object
);
320 else if (!object
->is_auth() &&
321 !(*p
)->can_wrlock(client
) && // we might have to request a scatter
322 !mdr
->is_slave()) { // if we are slave (remote_wrlock), the master already authpinned
323 dout(15) << " will also auth_pin " << *object
324 << " in case we need to request a scatter" << dendl
;
325 mustpin
.insert(object
);
330 if (remote_wrlocks
) {
331 for (map
<SimpleLock
*,mds_rank_t
>::iterator p
= remote_wrlocks
->begin(); p
!= remote_wrlocks
->end(); ++p
) {
332 MDSCacheObject
*object
= p
->first
->get_parent();
333 dout(20) << " must remote_wrlock on mds." << p
->second
<< " "
334 << *p
->first
<< " " << *object
<< dendl
;
335 sorted
.insert(p
->first
);
336 mustpin
.insert(object
);
341 for (set
<SimpleLock
*>::iterator p
= rdlocks
.begin();
344 MDSCacheObject
*object
= (*p
)->get_parent();
345 dout(20) << " must rdlock " << **p
<< " " << *object
<< dendl
;
347 if (object
->is_auth())
348 mustpin
.insert(object
);
349 else if (!object
->is_auth() &&
350 !(*p
)->can_rdlock(client
)) { // we might have to request an rdlock
351 dout(15) << " will also auth_pin " << *object
352 << " in case we need to request a rdlock" << dendl
;
353 mustpin
.insert(object
);
359 map
<mds_rank_t
, set
<MDSCacheObject
*> > mustpin_remote
; // mds -> (object set)
361 // can i auth pin them all now?
362 marker
.message
= "failed to authpin local pins";
363 for (set
<MDSCacheObject
*>::iterator p
= mustpin
.begin();
366 MDSCacheObject
*object
= *p
;
368 dout(10) << " must authpin " << *object
<< dendl
;
370 if (mdr
->is_auth_pinned(object
)) {
371 if (object
!= (MDSCacheObject
*)auth_pin_freeze
)
373 if (mdr
->more()->is_remote_frozen_authpin
) {
374 if (mdr
->more()->rename_inode
== auth_pin_freeze
)
376 // unfreeze auth pin for the wrong inode
377 mustpin_remote
[mdr
->more()->rename_inode
->authority().first
].size();
381 if (!object
->is_auth()) {
382 if (!mdr
->locks
.empty())
383 drop_locks(mdr
.get());
384 if (object
->is_ambiguous_auth()) {
386 marker
.message
= "waiting for single auth, object is being migrated";
387 dout(10) << " ambiguous auth, waiting to authpin " << *object
<< dendl
;
388 object
->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
, new C_MDS_RetryRequest(mdcache
, mdr
));
389 mdr
->drop_local_auth_pins();
392 mustpin_remote
[object
->authority().first
].insert(object
);
396 if (!object
->can_auth_pin(&err
)) {
398 drop_locks(mdr
.get());
399 mdr
->drop_local_auth_pins();
400 if (auth_pin_nonblock
) {
401 dout(10) << " can't auth_pin (freezing?) " << *object
<< ", nonblocking" << dendl
;
405 if (err
== MDSCacheObject::ERR_EXPORTING_TREE
) {
406 marker
.message
= "failed to authpin, subtree is being exported";
407 } else if (err
== MDSCacheObject::ERR_FRAGMENTING_DIR
) {
408 marker
.message
= "failed to authpin, dir is being fragmented";
409 } else if (err
== MDSCacheObject::ERR_EXPORTING_INODE
) {
410 marker
.message
= "failed to authpin, inode is being exported";
412 dout(10) << " can't auth_pin (freezing?), waiting to authpin " << *object
<< dendl
;
413 object
->add_waiter(MDSCacheObject::WAIT_UNFREEZE
, new C_MDS_RetryRequest(mdcache
, mdr
));
415 if (!mdr
->remote_auth_pins
.empty())
416 notify_freeze_waiter(object
);
422 // ok, grab local auth pins
423 for (set
<MDSCacheObject
*>::iterator p
= mustpin
.begin();
426 MDSCacheObject
*object
= *p
;
427 if (mdr
->is_auth_pinned(object
)) {
428 dout(10) << " already auth_pinned " << *object
<< dendl
;
429 } else if (object
->is_auth()) {
430 dout(10) << " auth_pinning " << *object
<< dendl
;
431 mdr
->auth_pin(object
);
435 // request remote auth_pins
436 if (!mustpin_remote
.empty()) {
437 marker
.message
= "requesting remote authpins";
438 for (map
<MDSCacheObject
*,mds_rank_t
>::iterator p
= mdr
->remote_auth_pins
.begin();
439 p
!= mdr
->remote_auth_pins
.end();
441 if (mustpin
.count(p
->first
)) {
442 assert(p
->second
== p
->first
->authority().first
);
443 map
<mds_rank_t
, set
<MDSCacheObject
*> >::iterator q
= mustpin_remote
.find(p
->second
);
444 if (q
!= mustpin_remote
.end())
445 q
->second
.insert(p
->first
);
448 for (map
<mds_rank_t
, set
<MDSCacheObject
*> >::iterator p
= mustpin_remote
.begin();
449 p
!= mustpin_remote
.end();
451 dout(10) << "requesting remote auth_pins from mds." << p
->first
<< dendl
;
453 // wait for active auth
454 if (mds
->is_cluster_degraded() &&
455 !mds
->mdsmap
->is_clientreplay_or_active_or_stopping(p
->first
)) {
456 dout(10) << " mds." << p
->first
<< " is not active" << dendl
;
457 if (mdr
->more()->waiting_on_slave
.empty())
458 mds
->wait_for_active_peer(p
->first
, new C_MDS_RetryRequest(mdcache
, mdr
));
462 MMDSSlaveRequest
*req
= new MMDSSlaveRequest(mdr
->reqid
, mdr
->attempt
,
463 MMDSSlaveRequest::OP_AUTHPIN
);
464 for (set
<MDSCacheObject
*>::iterator q
= p
->second
.begin();
465 q
!= p
->second
.end();
467 dout(10) << " req remote auth_pin of " << **q
<< dendl
;
468 MDSCacheObjectInfo info
;
469 (*q
)->set_object_info(info
);
470 req
->get_authpins().push_back(info
);
471 if (*q
== auth_pin_freeze
)
472 (*q
)->set_object_info(req
->get_authpin_freeze());
475 if (auth_pin_nonblock
)
476 req
->mark_nonblock();
477 mds
->send_message_mds(req
, p
->first
);
479 // put in waiting list
480 assert(mdr
->more()->waiting_on_slave
.count(p
->first
) == 0);
481 mdr
->more()->waiting_on_slave
.insert(p
->first
);
486 // caps i'll need to issue
487 set
<CInode
*> issue_set
;
491 // make sure they match currently acquired locks.
492 set
<SimpleLock
*, SimpleLock::ptr_lt
>::iterator existing
= mdr
->locks
.begin();
493 for (set
<SimpleLock
*, SimpleLock::ptr_lt
>::iterator p
= sorted
.begin();
496 bool need_wrlock
= !!wrlocks
.count(*p
);
497 bool need_remote_wrlock
= !!(remote_wrlocks
&& remote_wrlocks
->count(*p
));
500 if (existing
!= mdr
->locks
.end() && *existing
== *p
) {
502 SimpleLock
*have
= *existing
;
504 if (xlocks
.count(have
) && mdr
->xlocks
.count(have
)) {
505 dout(10) << " already xlocked " << *have
<< " " << *have
->get_parent() << dendl
;
508 if (mdr
->remote_wrlocks
.count(have
)) {
509 if (!need_remote_wrlock
||
510 mdr
->remote_wrlocks
[have
] != (*remote_wrlocks
)[have
]) {
511 dout(10) << " unlocking remote_wrlock on wrong mds." << mdr
->remote_wrlocks
[have
]
512 << " " << *have
<< " " << *have
->get_parent() << dendl
;
513 remote_wrlock_finish(have
, mdr
->remote_wrlocks
[have
], mdr
.get());
516 if (need_wrlock
|| need_remote_wrlock
) {
517 if (need_wrlock
== !!mdr
->wrlocks
.count(have
) &&
518 need_remote_wrlock
== !!mdr
->remote_wrlocks
.count(have
)) {
520 dout(10) << " already wrlocked " << *have
<< " " << *have
->get_parent() << dendl
;
521 if (need_remote_wrlock
)
522 dout(10) << " already remote_wrlocked " << *have
<< " " << *have
->get_parent() << dendl
;
526 if (rdlocks
.count(have
) && mdr
->rdlocks
.count(have
)) {
527 dout(10) << " already rdlocked " << *have
<< " " << *have
->get_parent() << dendl
;
532 // hose any stray locks
533 if (existing
!= mdr
->locks
.end() && *existing
== *p
) {
534 assert(need_wrlock
|| need_remote_wrlock
);
535 SimpleLock
*lock
= *existing
;
536 if (mdr
->wrlocks
.count(lock
)) {
538 dout(10) << " unlocking extra " << *lock
<< " " << *lock
->get_parent() << dendl
;
539 else if (need_remote_wrlock
) // acquire remote_wrlock first
540 dout(10) << " unlocking out-of-order " << *lock
<< " " << *lock
->get_parent() << dendl
;
541 bool need_issue
= false;
542 wrlock_finish(lock
, mdr
.get(), &need_issue
);
544 issue_set
.insert(static_cast<CInode
*>(lock
->get_parent()));
548 while (existing
!= mdr
->locks
.end()) {
549 SimpleLock
*stray
= *existing
;
551 dout(10) << " unlocking out-of-order " << *stray
<< " " << *stray
->get_parent() << dendl
;
552 bool need_issue
= false;
553 if (mdr
->xlocks
.count(stray
)) {
554 xlock_finish(stray
, mdr
.get(), &need_issue
);
555 } else if (mdr
->rdlocks
.count(stray
)) {
556 rdlock_finish(stray
, mdr
.get(), &need_issue
);
558 // may have acquired both wrlock and remore wrlock
559 if (mdr
->wrlocks
.count(stray
))
560 wrlock_finish(stray
, mdr
.get(), &need_issue
);
561 if (mdr
->remote_wrlocks
.count(stray
))
562 remote_wrlock_finish(stray
, mdr
->remote_wrlocks
[stray
], mdr
.get());
565 issue_set
.insert(static_cast<CInode
*>(stray
->get_parent()));
569 if (mdr
->locking
&& *p
!= mdr
->locking
) {
570 cancel_locking(mdr
.get(), &issue_set
);
572 if (xlocks
.count(*p
)) {
573 marker
.message
= "failed to xlock, waiting";
574 if (!xlock_start(*p
, mdr
))
576 dout(10) << " got xlock on " << **p
<< " " << *(*p
)->get_parent() << dendl
;
577 } else if (need_wrlock
|| need_remote_wrlock
) {
578 if (need_remote_wrlock
&& !mdr
->remote_wrlocks
.count(*p
)) {
579 marker
.message
= "waiting for remote wrlocks";
580 remote_wrlock_start(*p
, (*remote_wrlocks
)[*p
], mdr
);
583 if (need_wrlock
&& !mdr
->wrlocks
.count(*p
)) {
584 marker
.message
= "failed to wrlock, waiting";
585 if (need_remote_wrlock
&& !(*p
)->can_wrlock(mdr
->get_client())) {
586 marker
.message
= "failed to wrlock, dropping remote wrlock and waiting";
587 // can't take the wrlock because the scatter lock is gathering. need to
588 // release the remote wrlock, so that the gathering process can finish.
589 remote_wrlock_finish(*p
, mdr
->remote_wrlocks
[*p
], mdr
.get());
590 remote_wrlock_start(*p
, (*remote_wrlocks
)[*p
], mdr
);
593 // nowait if we have already gotten remote wrlock
594 if (!wrlock_start(*p
, mdr
, need_remote_wrlock
))
596 dout(10) << " got wrlock on " << **p
<< " " << *(*p
)->get_parent() << dendl
;
599 assert(mdr
->is_master());
600 if ((*p
)->needs_recover()) {
601 if (mds
->is_cluster_degraded()) {
602 if (!mdr
->is_queued_for_replay()) {
603 // see comments in SimpleLock::set_state_rejoin() and
604 // ScatterLock::encode_state_for_rejoin()
605 drop_locks(mdr
.get());
606 mds
->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache
, mdr
));
607 dout(10) << " rejoin recovering " << **p
<< " " << *(*p
)->get_parent()
608 << ", waiting for cluster recovered" << dendl
;
609 marker
.message
= "rejoin recovering lock, waiting for cluster recovered";
613 (*p
)->clear_need_recover();
617 marker
.message
= "failed to rdlock, waiting";
618 if (!rdlock_start(*p
, mdr
))
620 dout(10) << " got rdlock on " << **p
<< " " << *(*p
)->get_parent() << dendl
;
624 // any extra unneeded locks?
625 while (existing
!= mdr
->locks
.end()) {
626 SimpleLock
*stray
= *existing
;
628 dout(10) << " unlocking extra " << *stray
<< " " << *stray
->get_parent() << dendl
;
629 bool need_issue
= false;
630 if (mdr
->xlocks
.count(stray
)) {
631 xlock_finish(stray
, mdr
.get(), &need_issue
);
632 } else if (mdr
->rdlocks
.count(stray
)) {
633 rdlock_finish(stray
, mdr
.get(), &need_issue
);
635 // may have acquired both wrlock and remore wrlock
636 if (mdr
->wrlocks
.count(stray
))
637 wrlock_finish(stray
, mdr
.get(), &need_issue
);
638 if (mdr
->remote_wrlocks
.count(stray
))
639 remote_wrlock_finish(stray
, mdr
->remote_wrlocks
[stray
], mdr
.get());
642 issue_set
.insert(static_cast<CInode
*>(stray
->get_parent()));
645 mdr
->done_locking
= true;
646 mdr
->set_mds_stamp(ceph_clock_now());
648 marker
.message
= "acquired locks";
651 issue_caps_set(issue_set
);
655 void Locker::notify_freeze_waiter(MDSCacheObject
*o
)
658 if (CInode
*in
= dynamic_cast<CInode
*>(o
)) {
660 dir
= in
->get_parent_dir();
661 } else if (CDentry
*dn
= dynamic_cast<CDentry
*>(o
)) {
664 dir
= dynamic_cast<CDir
*>(o
);
668 if (dir
->is_freezing_dir())
669 mdcache
->fragment_freeze_inc_num_waiters(dir
);
670 if (dir
->is_freezing_tree()) {
671 while (!dir
->is_freezing_tree_root())
672 dir
= dir
->get_parent_dir();
673 mdcache
->migrator
->export_freeze_inc_num_waiters(dir
);
678 void Locker::set_xlocks_done(MutationImpl
*mut
, bool skip_dentry
)
680 for (set
<SimpleLock
*>::iterator p
= mut
->xlocks
.begin();
681 p
!= mut
->xlocks
.end();
683 MDSCacheObject
*object
= (*p
)->get_parent();
684 assert(object
->is_auth());
686 ((*p
)->get_type() == CEPH_LOCK_DN
|| (*p
)->get_type() == CEPH_LOCK_DVERSION
))
688 dout(10) << "set_xlocks_done on " << **p
<< " " << *object
<< dendl
;
689 (*p
)->set_xlock_done();
693 void Locker::_drop_rdlocks(MutationImpl
*mut
, set
<CInode
*> *pneed_issue
)
695 while (!mut
->rdlocks
.empty()) {
697 MDSCacheObject
*p
= (*mut
->rdlocks
.begin())->get_parent();
698 rdlock_finish(*mut
->rdlocks
.begin(), mut
, &ni
);
700 pneed_issue
->insert(static_cast<CInode
*>(p
));
704 void Locker::_drop_non_rdlocks(MutationImpl
*mut
, set
<CInode
*> *pneed_issue
)
706 set
<mds_rank_t
> slaves
;
708 while (!mut
->xlocks
.empty()) {
709 SimpleLock
*lock
= *mut
->xlocks
.begin();
710 MDSCacheObject
*p
= lock
->get_parent();
712 assert(lock
->get_sm()->can_remote_xlock
);
713 slaves
.insert(p
->authority().first
);
715 mut
->locks
.erase(lock
);
716 mut
->xlocks
.erase(lock
);
720 xlock_finish(lock
, mut
, &ni
);
722 pneed_issue
->insert(static_cast<CInode
*>(p
));
725 while (!mut
->remote_wrlocks
.empty()) {
726 map
<SimpleLock
*,mds_rank_t
>::iterator p
= mut
->remote_wrlocks
.begin();
727 slaves
.insert(p
->second
);
728 if (mut
->wrlocks
.count(p
->first
) == 0)
729 mut
->locks
.erase(p
->first
);
730 mut
->remote_wrlocks
.erase(p
);
733 while (!mut
->wrlocks
.empty()) {
735 MDSCacheObject
*p
= (*mut
->wrlocks
.begin())->get_parent();
736 wrlock_finish(*mut
->wrlocks
.begin(), mut
, &ni
);
738 pneed_issue
->insert(static_cast<CInode
*>(p
));
741 for (set
<mds_rank_t
>::iterator p
= slaves
.begin(); p
!= slaves
.end(); ++p
) {
742 if (!mds
->is_cluster_degraded() ||
743 mds
->mdsmap
->get_state(*p
) >= MDSMap::STATE_REJOIN
) {
744 dout(10) << "_drop_non_rdlocks dropping remote locks on mds." << *p
<< dendl
;
745 MMDSSlaveRequest
*slavereq
= new MMDSSlaveRequest(mut
->reqid
, mut
->attempt
,
746 MMDSSlaveRequest::OP_DROPLOCKS
);
747 mds
->send_message_mds(slavereq
, *p
);
752 void Locker::cancel_locking(MutationImpl
*mut
, set
<CInode
*> *pneed_issue
)
754 SimpleLock
*lock
= mut
->locking
;
756 dout(10) << "cancel_locking " << *lock
<< " on " << *mut
<< dendl
;
758 if (lock
->get_parent()->is_auth()) {
759 bool need_issue
= false;
760 if (lock
->get_state() == LOCK_PREXLOCK
) {
761 _finish_xlock(lock
, -1, &need_issue
);
762 } else if (lock
->get_state() == LOCK_LOCK_XLOCK
&&
763 lock
->get_num_xlocks() == 0) {
764 lock
->set_state(LOCK_XLOCKDONE
);
765 eval_gather(lock
, true, &need_issue
);
768 pneed_issue
->insert(static_cast<CInode
*>(lock
->get_parent()));
770 mut
->finish_locking(lock
);
773 void Locker::drop_locks(MutationImpl
*mut
, set
<CInode
*> *pneed_issue
)
776 set
<CInode
*> my_need_issue
;
778 pneed_issue
= &my_need_issue
;
781 cancel_locking(mut
, pneed_issue
);
782 _drop_non_rdlocks(mut
, pneed_issue
);
783 _drop_rdlocks(mut
, pneed_issue
);
785 if (pneed_issue
== &my_need_issue
)
786 issue_caps_set(*pneed_issue
);
787 mut
->done_locking
= false;
790 void Locker::drop_non_rdlocks(MutationImpl
*mut
, set
<CInode
*> *pneed_issue
)
792 set
<CInode
*> my_need_issue
;
794 pneed_issue
= &my_need_issue
;
796 _drop_non_rdlocks(mut
, pneed_issue
);
798 if (pneed_issue
== &my_need_issue
)
799 issue_caps_set(*pneed_issue
);
802 void Locker::drop_rdlocks_for_early_reply(MutationImpl
*mut
)
804 set
<CInode
*> need_issue
;
806 for (auto p
= mut
->rdlocks
.begin(); p
!= mut
->rdlocks
.end(); ) {
807 SimpleLock
*lock
= *p
;
809 // make later mksnap/setlayout (at other mds) wait for this unsafe request
810 if (lock
->get_type() == CEPH_LOCK_ISNAP
||
811 lock
->get_type() == CEPH_LOCK_IPOLICY
)
814 rdlock_finish(lock
, mut
, &ni
);
816 need_issue
.insert(static_cast<CInode
*>(lock
->get_parent()));
819 issue_caps_set(need_issue
);
824 void Locker::eval_gather(SimpleLock
*lock
, bool first
, bool *pneed_issue
, list
<MDSInternalContextBase
*> *pfinishers
)
826 dout(10) << "eval_gather " << *lock
<< " on " << *lock
->get_parent() << dendl
;
827 assert(!lock
->is_stable());
829 int next
= lock
->get_next_state();
832 bool caps
= lock
->get_cap_shift();
833 if (lock
->get_type() != CEPH_LOCK_DN
)
834 in
= static_cast<CInode
*>(lock
->get_parent());
836 bool need_issue
= false;
838 int loner_issued
= 0, other_issued
= 0, xlocker_issued
= 0;
839 assert(!caps
|| in
!= NULL
);
840 if (caps
&& in
->is_head()) {
841 in
->get_caps_issued(&loner_issued
, &other_issued
, &xlocker_issued
,
842 lock
->get_cap_shift(), lock
->get_cap_mask());
843 dout(10) << " next state is " << lock
->get_state_name(next
)
844 << " issued/allows loner " << gcap_string(loner_issued
)
845 << "/" << gcap_string(lock
->gcaps_allowed(CAP_LONER
, next
))
846 << " xlocker " << gcap_string(xlocker_issued
)
847 << "/" << gcap_string(lock
->gcaps_allowed(CAP_XLOCKER
, next
))
848 << " other " << gcap_string(other_issued
)
849 << "/" << gcap_string(lock
->gcaps_allowed(CAP_ANY
, next
))
852 if (first
&& ((~lock
->gcaps_allowed(CAP_ANY
, next
) & other_issued
) ||
853 (~lock
->gcaps_allowed(CAP_LONER
, next
) & loner_issued
) ||
854 (~lock
->gcaps_allowed(CAP_XLOCKER
, next
) & xlocker_issued
)))
858 #define IS_TRUE_AND_LT_AUTH(x, auth) (x && ((auth && x <= AUTH) || (!auth && x < AUTH)))
859 bool auth
= lock
->get_parent()->is_auth();
860 if (!lock
->is_gathering() &&
861 (IS_TRUE_AND_LT_AUTH(lock
->get_sm()->states
[next
].can_rdlock
, auth
) || !lock
->is_rdlocked()) &&
862 (IS_TRUE_AND_LT_AUTH(lock
->get_sm()->states
[next
].can_wrlock
, auth
) || !lock
->is_wrlocked()) &&
863 (IS_TRUE_AND_LT_AUTH(lock
->get_sm()->states
[next
].can_xlock
, auth
) || !lock
->is_xlocked()) &&
864 (IS_TRUE_AND_LT_AUTH(lock
->get_sm()->states
[next
].can_lease
, auth
) || !lock
->is_leased()) &&
865 !(lock
->get_parent()->is_auth() && lock
->is_flushing()) && // i.e. wait for scatter_writebehind!
866 (!caps
|| ((~lock
->gcaps_allowed(CAP_ANY
, next
) & other_issued
) == 0 &&
867 (~lock
->gcaps_allowed(CAP_LONER
, next
) & loner_issued
) == 0 &&
868 (~lock
->gcaps_allowed(CAP_XLOCKER
, next
) & xlocker_issued
) == 0)) &&
869 lock
->get_state() != LOCK_SYNC_MIX2
&& // these states need an explicit trigger from the auth mds
870 lock
->get_state() != LOCK_MIX_SYNC2
872 dout(7) << "eval_gather finished gather on " << *lock
873 << " on " << *lock
->get_parent() << dendl
;
875 if (lock
->get_sm() == &sm_filelock
) {
877 if (in
->state_test(CInode::STATE_RECOVERING
)) {
878 dout(7) << "eval_gather finished gather, but still recovering" << dendl
;
880 } else if (in
->state_test(CInode::STATE_NEEDSRECOVER
)) {
881 dout(7) << "eval_gather finished gather, but need to recover" << dendl
;
882 mds
->mdcache
->queue_file_recover(in
);
883 mds
->mdcache
->do_file_recover();
888 if (!lock
->get_parent()->is_auth()) {
889 // replica: tell auth
890 mds_rank_t auth
= lock
->get_parent()->authority().first
;
892 if (lock
->get_parent()->is_rejoining() &&
893 mds
->mdsmap
->get_state(auth
) == MDSMap::STATE_REJOIN
) {
894 dout(7) << "eval_gather finished gather, but still rejoining "
895 << *lock
->get_parent() << dendl
;
899 if (!mds
->is_cluster_degraded() ||
900 mds
->mdsmap
->get_state(auth
) >= MDSMap::STATE_REJOIN
) {
901 switch (lock
->get_state()) {
903 mds
->send_message_mds(new MLock(lock
, LOCK_AC_LOCKACK
, mds
->get_nodeid()),
909 MLock
*reply
= new MLock(lock
, LOCK_AC_SYNCACK
, mds
->get_nodeid());
910 lock
->encode_locked_state(reply
->get_data());
911 mds
->send_message_mds(reply
, auth
);
912 next
= LOCK_MIX_SYNC2
;
913 (static_cast<ScatterLock
*>(lock
))->start_flush();
918 (static_cast<ScatterLock
*>(lock
))->finish_flush();
919 (static_cast<ScatterLock
*>(lock
))->clear_flushed();
922 // do nothing, we already acked
927 MLock
*reply
= new MLock(lock
, LOCK_AC_MIXACK
, mds
->get_nodeid());
928 mds
->send_message_mds(reply
, auth
);
929 next
= LOCK_SYNC_MIX2
;
936 lock
->encode_locked_state(data
);
937 mds
->send_message_mds(new MLock(lock
, LOCK_AC_LOCKACK
, mds
->get_nodeid(), data
), auth
);
938 (static_cast<ScatterLock
*>(lock
))->start_flush();
939 // we'll get an AC_LOCKFLUSHED to complete
950 // once the first (local) stage of mix->lock gather complete we can
951 // gather from replicas
952 if (lock
->get_state() == LOCK_MIX_LOCK
&&
953 lock
->get_parent()->is_replicated()) {
954 dout(10) << " finished (local) gather for mix->lock, now gathering from replicas" << dendl
;
955 send_lock_message(lock
, LOCK_AC_LOCK
);
957 lock
->set_state(LOCK_MIX_LOCK2
);
961 if (lock
->is_dirty() && !lock
->is_flushed()) {
962 scatter_writebehind(static_cast<ScatterLock
*>(lock
));
966 lock
->clear_flushed();
968 switch (lock
->get_state()) {
974 in
->start_scatter(static_cast<ScatterLock
*>(lock
));
975 if (lock
->get_parent()->is_replicated()) {
977 lock
->encode_locked_state(softdata
);
978 send_lock_message(lock
, LOCK_AC_MIX
, softdata
);
980 (static_cast<ScatterLock
*>(lock
))->clear_scatter_wanted();
985 if (next
!= LOCK_SYNC
)
994 if (lock
->get_parent()->is_replicated()) {
996 lock
->encode_locked_state(softdata
);
997 send_lock_message(lock
, LOCK_AC_SYNC
, softdata
);
1004 lock
->set_state(next
);
1006 if (lock
->get_parent()->is_auth() &&
1008 lock
->get_parent()->auth_unpin(lock
);
1010 // drop loner before doing waiters
1014 in
->get_wanted_loner() != in
->get_loner()) {
1015 dout(10) << " trying to drop loner" << dendl
;
1016 if (in
->try_drop_loner()) {
1017 dout(10) << " dropped loner" << dendl
;
1023 lock
->take_waiting(SimpleLock::WAIT_STABLE
|SimpleLock::WAIT_WR
|SimpleLock::WAIT_RD
|SimpleLock::WAIT_XLOCK
,
1026 lock
->finish_waiters(SimpleLock::WAIT_STABLE
|SimpleLock::WAIT_WR
|SimpleLock::WAIT_RD
|SimpleLock::WAIT_XLOCK
);
1028 if (caps
&& in
->is_head())
1031 if (lock
->get_parent()->is_auth() &&
1033 try_eval(lock
, &need_issue
);
1038 *pneed_issue
= true;
1039 else if (in
->is_head())
1045 bool Locker::eval(CInode
*in
, int mask
, bool caps_imported
)
1047 bool need_issue
= caps_imported
;
1048 list
<MDSInternalContextBase
*> finishers
;
1050 dout(10) << "eval " << mask
<< " " << *in
<< dendl
;
1053 if (in
->is_auth() && in
->is_head()) {
1054 client_t orig_loner
= in
->get_loner();
1055 if (in
->choose_ideal_loner()) {
1056 dout(10) << "eval set loner: client." << orig_loner
<< " -> client." << in
->get_loner() << dendl
;
1059 } else if (in
->get_wanted_loner() != in
->get_loner()) {
1060 dout(10) << "eval want loner: client." << in
->get_wanted_loner() << " but failed to set it" << dendl
;
1066 if (mask
& CEPH_LOCK_IFILE
)
1067 eval_any(&in
->filelock
, &need_issue
, &finishers
, caps_imported
);
1068 if (mask
& CEPH_LOCK_IAUTH
)
1069 eval_any(&in
->authlock
, &need_issue
, &finishers
, caps_imported
);
1070 if (mask
& CEPH_LOCK_ILINK
)
1071 eval_any(&in
->linklock
, &need_issue
, &finishers
, caps_imported
);
1072 if (mask
& CEPH_LOCK_IXATTR
)
1073 eval_any(&in
->xattrlock
, &need_issue
, &finishers
, caps_imported
);
1074 if (mask
& CEPH_LOCK_INEST
)
1075 eval_any(&in
->nestlock
, &need_issue
, &finishers
, caps_imported
);
1076 if (mask
& CEPH_LOCK_IFLOCK
)
1077 eval_any(&in
->flocklock
, &need_issue
, &finishers
, caps_imported
);
1078 if (mask
& CEPH_LOCK_IPOLICY
)
1079 eval_any(&in
->policylock
, &need_issue
, &finishers
, caps_imported
);
1082 if (in
->is_auth() && in
->is_head() && in
->get_wanted_loner() != in
->get_loner()) {
1083 if (in
->try_drop_loner()) {
1085 if (in
->get_wanted_loner() >= 0) {
1086 dout(10) << "eval end set loner to client." << in
->get_loner() << dendl
;
1087 bool ok
= in
->try_set_loner();
1095 finish_contexts(g_ceph_context
, finishers
);
1097 if (need_issue
&& in
->is_head())
1100 dout(10) << "eval done" << dendl
;
1104 class C_Locker_Eval
: public LockerContext
{
1108 C_Locker_Eval(Locker
*l
, MDSCacheObject
*pp
, int m
) : LockerContext(l
), p(pp
), mask(m
) {
1109 // We are used as an MDSCacheObject waiter, so should
1110 // only be invoked by someone already holding the big lock.
1111 assert(locker
->mds
->mds_lock
.is_locked_by_me());
1112 p
->get(MDSCacheObject::PIN_PTRWAITER
);
1114 void finish(int r
) override
{
1115 locker
->try_eval(p
, mask
);
1116 p
->put(MDSCacheObject::PIN_PTRWAITER
);
1120 void Locker::try_eval(MDSCacheObject
*p
, int mask
)
1122 // unstable and ambiguous auth?
1123 if (p
->is_ambiguous_auth()) {
1124 dout(7) << "try_eval ambiguous auth, waiting on " << *p
<< dendl
;
1125 p
->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
, new C_Locker_Eval(this, p
, mask
));
1129 if (p
->is_auth() && p
->is_frozen()) {
1130 dout(7) << "try_eval frozen, waiting on " << *p
<< dendl
;
1131 p
->add_waiter(MDSCacheObject::WAIT_UNFREEZE
, new C_Locker_Eval(this, p
, mask
));
1135 if (mask
& CEPH_LOCK_DN
) {
1136 assert(mask
== CEPH_LOCK_DN
);
1137 bool need_issue
= false; // ignore this, no caps on dentries
1138 CDentry
*dn
= static_cast<CDentry
*>(p
);
1139 eval_any(&dn
->lock
, &need_issue
);
1141 CInode
*in
= static_cast<CInode
*>(p
);
1146 void Locker::try_eval(SimpleLock
*lock
, bool *pneed_issue
)
1148 MDSCacheObject
*p
= lock
->get_parent();
1150 // unstable and ambiguous auth?
1151 if (p
->is_ambiguous_auth()) {
1152 dout(7) << "try_eval " << *lock
<< " ambiguousauth, waiting on " << *p
<< dendl
;
1153 p
->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
, new C_Locker_Eval(this, p
, lock
->get_type()));
1157 if (!p
->is_auth()) {
1158 dout(7) << "try_eval " << *lock
<< " not auth for " << *p
<< dendl
;
1162 if (p
->is_frozen()) {
1163 dout(7) << "try_eval " << *lock
<< " frozen, waiting on " << *p
<< dendl
;
1164 p
->add_waiter(MDSCacheObject::WAIT_UNFREEZE
, new C_Locker_Eval(this, p
, lock
->get_type()));
1169 * We could have a situation like:
1171 * - mds A authpins item on mds B
1172 * - mds B starts to freeze tree containing item
1173 * - mds A tries wrlock_start on A, sends REQSCATTER to B
1174 * - mds B lock is unstable, sets scatter_wanted
1175 * - mds B lock stabilizes, calls try_eval.
1177 * We can defer while freezing without causing a deadlock. Honor
1178 * scatter_wanted flag here. This will never get deferred by the
1179 * checks above due to the auth_pin held by the master.
1181 if (lock
->is_scatterlock()) {
1182 ScatterLock
*slock
= static_cast<ScatterLock
*>(lock
);
1183 if (slock
->get_scatter_wanted() &&
1184 slock
->get_state() != LOCK_MIX
) {
1185 scatter_mix(slock
, pneed_issue
);
1186 if (!lock
->is_stable())
1188 } else if (slock
->get_unscatter_wanted() &&
1189 slock
->get_state() != LOCK_LOCK
) {
1190 simple_lock(slock
, pneed_issue
);
1191 if (!lock
->is_stable()) {
1197 if (lock
->get_type() != CEPH_LOCK_DN
&& p
->is_freezing()) {
1198 dout(7) << "try_eval " << *lock
<< " freezing, waiting on " << *p
<< dendl
;
1199 p
->add_waiter(MDSCacheObject::WAIT_UNFREEZE
, new C_Locker_Eval(this, p
, lock
->get_type()));
1203 eval(lock
, pneed_issue
);
1206 void Locker::eval_cap_gather(CInode
*in
, set
<CInode
*> *issue_set
)
1208 bool need_issue
= false;
1209 list
<MDSInternalContextBase
*> finishers
;
1212 if (!in
->filelock
.is_stable())
1213 eval_gather(&in
->filelock
, false, &need_issue
, &finishers
);
1214 if (!in
->authlock
.is_stable())
1215 eval_gather(&in
->authlock
, false, &need_issue
, &finishers
);
1216 if (!in
->linklock
.is_stable())
1217 eval_gather(&in
->linklock
, false, &need_issue
, &finishers
);
1218 if (!in
->xattrlock
.is_stable())
1219 eval_gather(&in
->xattrlock
, false, &need_issue
, &finishers
);
1221 if (need_issue
&& in
->is_head()) {
1223 issue_set
->insert(in
);
1228 finish_contexts(g_ceph_context
, finishers
);
1231 void Locker::eval_scatter_gathers(CInode
*in
)
1233 bool need_issue
= false;
1234 list
<MDSInternalContextBase
*> finishers
;
1236 dout(10) << "eval_scatter_gathers " << *in
<< dendl
;
1239 if (!in
->filelock
.is_stable())
1240 eval_gather(&in
->filelock
, false, &need_issue
, &finishers
);
1241 if (!in
->nestlock
.is_stable())
1242 eval_gather(&in
->nestlock
, false, &need_issue
, &finishers
);
1243 if (!in
->dirfragtreelock
.is_stable())
1244 eval_gather(&in
->dirfragtreelock
, false, &need_issue
, &finishers
);
1246 if (need_issue
&& in
->is_head())
1249 finish_contexts(g_ceph_context
, finishers
);
1252 void Locker::eval(SimpleLock
*lock
, bool *need_issue
)
1254 switch (lock
->get_type()) {
1255 case CEPH_LOCK_IFILE
:
1256 return file_eval(static_cast<ScatterLock
*>(lock
), need_issue
);
1257 case CEPH_LOCK_IDFT
:
1258 case CEPH_LOCK_INEST
:
1259 return scatter_eval(static_cast<ScatterLock
*>(lock
), need_issue
);
1261 return simple_eval(lock
, need_issue
);
1266 // ------------------
1269 bool Locker::_rdlock_kick(SimpleLock
*lock
, bool as_anon
)
1272 if (lock
->is_stable()) {
1273 if (lock
->get_parent()->is_auth()) {
1274 if (lock
->get_sm() == &sm_scatterlock
) {
1275 // not until tempsync is fully implemented
1276 //if (lock->get_parent()->is_replicated())
1277 //scatter_tempsync((ScatterLock*)lock);
1280 } else if (lock
->get_sm() == &sm_filelock
) {
1281 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
1282 if (lock
->get_state() == LOCK_EXCL
&&
1283 in
->get_target_loner() >= 0 &&
1284 !in
->is_dir() && !as_anon
) // as_anon => caller wants SYNC, not XSYN
1292 // request rdlock state change from auth
1293 mds_rank_t auth
= lock
->get_parent()->authority().first
;
1294 if (!mds
->is_cluster_degraded() ||
1295 mds
->mdsmap
->is_clientreplay_or_active_or_stopping(auth
)) {
1296 dout(10) << "requesting rdlock from auth on "
1297 << *lock
<< " on " << *lock
->get_parent() << dendl
;
1298 mds
->send_message_mds(new MLock(lock
, LOCK_AC_REQRDLOCK
, mds
->get_nodeid()), auth
);
1303 if (lock
->get_type() == CEPH_LOCK_IFILE
) {
1304 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
1305 if (in
->state_test(CInode::STATE_RECOVERING
)) {
1306 mds
->mdcache
->recovery_queue
.prioritize(in
);
1313 bool Locker::rdlock_try(SimpleLock
*lock
, client_t client
, MDSInternalContextBase
*con
)
1315 dout(7) << "rdlock_try on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1317 // can read? grab ref.
1318 if (lock
->can_rdlock(client
))
1321 _rdlock_kick(lock
, false);
1323 if (lock
->can_rdlock(client
))
1328 dout(7) << "rdlock_try waiting on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1329 lock
->add_waiter(SimpleLock::WAIT_STABLE
|SimpleLock::WAIT_RD
, con
);
1334 bool Locker::rdlock_start(SimpleLock
*lock
, MDRequestRef
& mut
, bool as_anon
)
1336 dout(7) << "rdlock_start on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1338 // client may be allowed to rdlock the same item it has xlocked.
1339 // UNLESS someone passes in as_anon, or we're reading snapped version here.
1340 if (mut
->snapid
!= CEPH_NOSNAP
)
1342 client_t client
= as_anon
? -1 : mut
->get_client();
1345 if (lock
->get_type() != CEPH_LOCK_DN
)
1346 in
= static_cast<CInode
*>(lock
->get_parent());
1349 if (!lock->get_parent()->is_auth() &&
1350 lock->fw_rdlock_to_auth()) {
1351 mdcache->request_forward(mut, lock->get_parent()->authority().first);
1357 // can read? grab ref.
1358 if (lock
->can_rdlock(client
)) {
1360 mut
->rdlocks
.insert(lock
);
1361 mut
->locks
.insert(lock
);
1365 // hmm, wait a second.
1366 if (in
&& !in
->is_head() && in
->is_auth() &&
1367 lock
->get_state() == LOCK_SNAP_SYNC
) {
1368 // okay, we actually need to kick the head's lock to get ourselves synced up.
1369 CInode
*head
= mdcache
->get_inode(in
->ino());
1371 SimpleLock
*hlock
= head
->get_lock(CEPH_LOCK_IFILE
);
1372 if (hlock
->get_state() == LOCK_SYNC
)
1373 hlock
= head
->get_lock(lock
->get_type());
1375 if (hlock
->get_state() != LOCK_SYNC
) {
1376 dout(10) << "rdlock_start trying head inode " << *head
<< dendl
;
1377 if (!rdlock_start(hlock
, mut
, true)) // ** as_anon, no rdlock on EXCL **
1379 // oh, check our lock again then
1383 if (!_rdlock_kick(lock
, as_anon
))
1389 if (lock
->get_parent()->is_auth() && lock
->is_stable())
1390 wait_on
= SimpleLock::WAIT_RD
;
1392 wait_on
= SimpleLock::WAIT_STABLE
; // REQRDLOCK is ignored if lock is unstable, so we need to retry.
1393 dout(7) << "rdlock_start waiting on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1394 lock
->add_waiter(wait_on
, new C_MDS_RetryRequest(mdcache
, mut
));
1399 void Locker::nudge_log(SimpleLock
*lock
)
1401 dout(10) << "nudge_log " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1402 if (lock
->get_parent()->is_auth() && lock
->is_unstable_and_locked()) // as with xlockdone, or cap flush
1403 mds
->mdlog
->flush();
1406 void Locker::rdlock_finish(SimpleLock
*lock
, MutationImpl
*mut
, bool *pneed_issue
)
1411 mut
->rdlocks
.erase(lock
);
1412 mut
->locks
.erase(lock
);
1415 dout(7) << "rdlock_finish on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1418 if (!lock
->is_rdlocked()) {
1419 if (!lock
->is_stable())
1420 eval_gather(lock
, false, pneed_issue
);
1421 else if (lock
->get_parent()->is_auth())
1422 try_eval(lock
, pneed_issue
);
1427 bool Locker::can_rdlock_set(set
<SimpleLock
*>& locks
)
1429 dout(10) << "can_rdlock_set " << locks
<< dendl
;
1430 for (set
<SimpleLock
*>::iterator p
= locks
.begin(); p
!= locks
.end(); ++p
)
1431 if (!(*p
)->can_rdlock(-1)) {
1432 dout(10) << "can_rdlock_set can't rdlock " << *p
<< " on " << *(*p
)->get_parent() << dendl
;
1438 bool Locker::rdlock_try_set(set
<SimpleLock
*>& locks
)
1440 dout(10) << "rdlock_try_set " << locks
<< dendl
;
1441 for (set
<SimpleLock
*>::iterator p
= locks
.begin(); p
!= locks
.end(); ++p
)
1442 if (!rdlock_try(*p
, -1, NULL
)) {
1443 dout(10) << "rdlock_try_set can't rdlock " << *p
<< " on " << *(*p
)->get_parent() << dendl
;
1449 void Locker::rdlock_take_set(set
<SimpleLock
*>& locks
, MutationRef
& mut
)
1451 dout(10) << "rdlock_take_set " << locks
<< dendl
;
1452 for (set
<SimpleLock
*>::iterator p
= locks
.begin(); p
!= locks
.end(); ++p
) {
1454 mut
->rdlocks
.insert(*p
);
1455 mut
->locks
.insert(*p
);
1459 // ------------------
1462 void Locker::wrlock_force(SimpleLock
*lock
, MutationRef
& mut
)
1464 if (lock
->get_type() == CEPH_LOCK_IVERSION
||
1465 lock
->get_type() == CEPH_LOCK_DVERSION
)
1466 return local_wrlock_grab(static_cast<LocalLock
*>(lock
), mut
);
1468 dout(7) << "wrlock_force on " << *lock
1469 << " on " << *lock
->get_parent() << dendl
;
1470 lock
->get_wrlock(true);
1471 mut
->wrlocks
.insert(lock
);
1472 mut
->locks
.insert(lock
);
1475 bool Locker::wrlock_start(SimpleLock
*lock
, MDRequestRef
& mut
, bool nowait
)
1477 if (lock
->get_type() == CEPH_LOCK_IVERSION
||
1478 lock
->get_type() == CEPH_LOCK_DVERSION
)
1479 return local_wrlock_start(static_cast<LocalLock
*>(lock
), mut
);
1481 dout(10) << "wrlock_start " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1483 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
1484 client_t client
= mut
->get_client();
1485 bool want_scatter
= !nowait
&& lock
->get_parent()->is_auth() &&
1486 (in
->has_subtree_or_exporting_dirfrag() ||
1487 static_cast<ScatterLock
*>(lock
)->get_scatter_wanted());
1491 if (lock
->can_wrlock(client
) &&
1492 (!want_scatter
|| lock
->get_state() == LOCK_MIX
)) {
1494 mut
->wrlocks
.insert(lock
);
1495 mut
->locks
.insert(lock
);
1499 if (lock
->get_type() == CEPH_LOCK_IFILE
&&
1500 in
->state_test(CInode::STATE_RECOVERING
)) {
1501 mds
->mdcache
->recovery_queue
.prioritize(in
);
1504 if (!lock
->is_stable())
1507 if (in
->is_auth()) {
1508 // don't do nested lock state change if we have dirty scatterdata and
1509 // may scatter_writebehind or start_scatter, because nowait==true implies
1510 // that the caller already has a log entry open!
1511 if (nowait
&& lock
->is_dirty())
1515 scatter_mix(static_cast<ScatterLock
*>(lock
));
1519 if (nowait
&& !lock
->can_wrlock(client
))
1524 // auth should be auth_pinned (see acquire_locks wrlock weird mustpin case).
1525 mds_rank_t auth
= lock
->get_parent()->authority().first
;
1526 if (!mds
->is_cluster_degraded() ||
1527 mds
->mdsmap
->is_clientreplay_or_active_or_stopping(auth
)) {
1528 dout(10) << "requesting scatter from auth on "
1529 << *lock
<< " on " << *lock
->get_parent() << dendl
;
1530 mds
->send_message_mds(new MLock(lock
, LOCK_AC_REQSCATTER
, mds
->get_nodeid()), auth
);
1537 dout(7) << "wrlock_start waiting on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1538 lock
->add_waiter(SimpleLock::WAIT_STABLE
, new C_MDS_RetryRequest(mdcache
, mut
));
1545 void Locker::wrlock_finish(SimpleLock
*lock
, MutationImpl
*mut
, bool *pneed_issue
)
1547 if (lock
->get_type() == CEPH_LOCK_IVERSION
||
1548 lock
->get_type() == CEPH_LOCK_DVERSION
)
1549 return local_wrlock_finish(static_cast<LocalLock
*>(lock
), mut
);
1551 dout(7) << "wrlock_finish on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1554 mut
->wrlocks
.erase(lock
);
1555 if (mut
->remote_wrlocks
.count(lock
) == 0)
1556 mut
->locks
.erase(lock
);
1559 if (!lock
->is_wrlocked()) {
1560 if (!lock
->is_stable())
1561 eval_gather(lock
, false, pneed_issue
);
1562 else if (lock
->get_parent()->is_auth())
1563 try_eval(lock
, pneed_issue
);
1570 void Locker::remote_wrlock_start(SimpleLock
*lock
, mds_rank_t target
, MDRequestRef
& mut
)
1572 dout(7) << "remote_wrlock_start mds." << target
<< " on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1574 // wait for active target
1575 if (mds
->is_cluster_degraded() &&
1576 !mds
->mdsmap
->is_clientreplay_or_active_or_stopping(target
)) {
1577 dout(7) << " mds." << target
<< " is not active" << dendl
;
1578 if (mut
->more()->waiting_on_slave
.empty())
1579 mds
->wait_for_active_peer(target
, new C_MDS_RetryRequest(mdcache
, mut
));
1583 // send lock request
1584 mut
->start_locking(lock
, target
);
1585 mut
->more()->slaves
.insert(target
);
1586 MMDSSlaveRequest
*r
= new MMDSSlaveRequest(mut
->reqid
, mut
->attempt
,
1587 MMDSSlaveRequest::OP_WRLOCK
);
1588 r
->set_lock_type(lock
->get_type());
1589 lock
->get_parent()->set_object_info(r
->get_object_info());
1590 mds
->send_message_mds(r
, target
);
1592 assert(mut
->more()->waiting_on_slave
.count(target
) == 0);
1593 mut
->more()->waiting_on_slave
.insert(target
);
1596 void Locker::remote_wrlock_finish(SimpleLock
*lock
, mds_rank_t target
,
1600 mut
->remote_wrlocks
.erase(lock
);
1601 if (mut
->wrlocks
.count(lock
) == 0)
1602 mut
->locks
.erase(lock
);
1604 dout(7) << "remote_wrlock_finish releasing remote wrlock on mds." << target
1605 << " " << *lock
->get_parent() << dendl
;
1606 if (!mds
->is_cluster_degraded() ||
1607 mds
->mdsmap
->get_state(target
) >= MDSMap::STATE_REJOIN
) {
1608 MMDSSlaveRequest
*slavereq
= new MMDSSlaveRequest(mut
->reqid
, mut
->attempt
,
1609 MMDSSlaveRequest::OP_UNWRLOCK
);
1610 slavereq
->set_lock_type(lock
->get_type());
1611 lock
->get_parent()->set_object_info(slavereq
->get_object_info());
1612 mds
->send_message_mds(slavereq
, target
);
1617 // ------------------
1620 bool Locker::xlock_start(SimpleLock
*lock
, MDRequestRef
& mut
)
1622 if (lock
->get_type() == CEPH_LOCK_IVERSION
||
1623 lock
->get_type() == CEPH_LOCK_DVERSION
)
1624 return local_xlock_start(static_cast<LocalLock
*>(lock
), mut
);
1626 dout(7) << "xlock_start on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1627 client_t client
= mut
->get_client();
1630 if (lock
->get_parent()->is_auth()) {
1633 if (lock
->can_xlock(client
)) {
1634 lock
->set_state(LOCK_XLOCK
);
1635 lock
->get_xlock(mut
, client
);
1636 mut
->xlocks
.insert(lock
);
1637 mut
->locks
.insert(lock
);
1638 mut
->finish_locking(lock
);
1642 if (lock
->get_type() == CEPH_LOCK_IFILE
) {
1643 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
1644 if (in
->state_test(CInode::STATE_RECOVERING
)) {
1645 mds
->mdcache
->recovery_queue
.prioritize(in
);
1649 if (!lock
->is_stable() && (lock
->get_state() != LOCK_XLOCKDONE
||
1650 lock
->get_xlock_by_client() != client
||
1651 lock
->is_waiter_for(SimpleLock::WAIT_STABLE
)))
1654 if (lock
->get_state() == LOCK_LOCK
|| lock
->get_state() == LOCK_XLOCKDONE
) {
1655 mut
->start_locking(lock
);
1662 lock
->add_waiter(SimpleLock::WAIT_WR
|SimpleLock::WAIT_STABLE
, new C_MDS_RetryRequest(mdcache
, mut
));
1667 assert(lock
->get_sm()->can_remote_xlock
);
1668 assert(!mut
->slave_request
);
1670 // wait for single auth
1671 if (lock
->get_parent()->is_ambiguous_auth()) {
1672 lock
->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
,
1673 new C_MDS_RetryRequest(mdcache
, mut
));
1677 // wait for active auth
1678 mds_rank_t auth
= lock
->get_parent()->authority().first
;
1679 if (mds
->is_cluster_degraded() &&
1680 !mds
->mdsmap
->is_clientreplay_or_active_or_stopping(auth
)) {
1681 dout(7) << " mds." << auth
<< " is not active" << dendl
;
1682 if (mut
->more()->waiting_on_slave
.empty())
1683 mds
->wait_for_active_peer(auth
, new C_MDS_RetryRequest(mdcache
, mut
));
1687 // send lock request
1688 mut
->more()->slaves
.insert(auth
);
1689 mut
->start_locking(lock
, auth
);
1690 MMDSSlaveRequest
*r
= new MMDSSlaveRequest(mut
->reqid
, mut
->attempt
,
1691 MMDSSlaveRequest::OP_XLOCK
);
1692 r
->set_lock_type(lock
->get_type());
1693 lock
->get_parent()->set_object_info(r
->get_object_info());
1694 mds
->send_message_mds(r
, auth
);
1696 assert(mut
->more()->waiting_on_slave
.count(auth
) == 0);
1697 mut
->more()->waiting_on_slave
.insert(auth
);
1703 void Locker::_finish_xlock(SimpleLock
*lock
, client_t xlocker
, bool *pneed_issue
)
1705 assert(!lock
->is_stable());
1706 if (lock
->get_num_rdlocks() == 0 &&
1707 lock
->get_num_wrlocks() == 0 &&
1708 !lock
->is_leased() &&
1709 lock
->get_state() != LOCK_XLOCKSNAP
&&
1710 lock
->get_type() != CEPH_LOCK_DN
) {
1711 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
1712 client_t loner
= in
->get_target_loner();
1713 if (loner
>= 0 && (xlocker
< 0 || xlocker
== loner
)) {
1714 lock
->set_state(LOCK_EXCL
);
1715 lock
->get_parent()->auth_unpin(lock
);
1716 lock
->finish_waiters(SimpleLock::WAIT_STABLE
|SimpleLock::WAIT_WR
|SimpleLock::WAIT_RD
);
1717 if (lock
->get_cap_shift())
1718 *pneed_issue
= true;
1719 if (lock
->get_parent()->is_auth() &&
1721 try_eval(lock
, pneed_issue
);
1725 // the xlocker may have CEPH_CAP_GSHARED, need to revoke it if next state is LOCK_LOCK
1726 eval_gather(lock
, lock
->get_state() != LOCK_XLOCKSNAP
, pneed_issue
);
1729 void Locker::xlock_finish(SimpleLock
*lock
, MutationImpl
*mut
, bool *pneed_issue
)
1731 if (lock
->get_type() == CEPH_LOCK_IVERSION
||
1732 lock
->get_type() == CEPH_LOCK_DVERSION
)
1733 return local_xlock_finish(static_cast<LocalLock
*>(lock
), mut
);
1735 dout(10) << "xlock_finish on " << *lock
<< " " << *lock
->get_parent() << dendl
;
1737 client_t xlocker
= lock
->get_xlock_by_client();
1742 mut
->xlocks
.erase(lock
);
1743 mut
->locks
.erase(lock
);
1745 bool do_issue
= false;
1748 if (!lock
->get_parent()->is_auth()) {
1749 assert(lock
->get_sm()->can_remote_xlock
);
1752 dout(7) << "xlock_finish releasing remote xlock on " << *lock
->get_parent() << dendl
;
1753 mds_rank_t auth
= lock
->get_parent()->authority().first
;
1754 if (!mds
->is_cluster_degraded() ||
1755 mds
->mdsmap
->get_state(auth
) >= MDSMap::STATE_REJOIN
) {
1756 MMDSSlaveRequest
*slavereq
= new MMDSSlaveRequest(mut
->reqid
, mut
->attempt
,
1757 MMDSSlaveRequest::OP_UNXLOCK
);
1758 slavereq
->set_lock_type(lock
->get_type());
1759 lock
->get_parent()->set_object_info(slavereq
->get_object_info());
1760 mds
->send_message_mds(slavereq
, auth
);
1763 lock
->finish_waiters(SimpleLock::WAIT_STABLE
|
1764 SimpleLock::WAIT_WR
|
1765 SimpleLock::WAIT_RD
, 0);
1767 if (lock
->get_num_xlocks() == 0) {
1768 if (lock
->get_state() == LOCK_LOCK_XLOCK
)
1769 lock
->set_state(LOCK_XLOCKDONE
);
1770 _finish_xlock(lock
, xlocker
, &do_issue
);
1775 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
1776 if (in
->is_head()) {
1778 *pneed_issue
= true;
1785 void Locker::xlock_export(SimpleLock
*lock
, MutationImpl
*mut
)
1787 dout(10) << "xlock_export on " << *lock
<< " " << *lock
->get_parent() << dendl
;
1790 mut
->xlocks
.erase(lock
);
1791 mut
->locks
.erase(lock
);
1793 MDSCacheObject
*p
= lock
->get_parent();
1794 assert(p
->state_test(CInode::STATE_AMBIGUOUSAUTH
)); // we are exporting this (inode)
1796 if (!lock
->is_stable())
1797 lock
->get_parent()->auth_unpin(lock
);
1799 lock
->set_state(LOCK_LOCK
);
1802 void Locker::xlock_import(SimpleLock
*lock
)
1804 dout(10) << "xlock_import on " << *lock
<< " " << *lock
->get_parent() << dendl
;
1805 lock
->get_parent()->auth_pin(lock
);
1810 // file i/o -----------------------------------------
1812 version_t
Locker::issue_file_data_version(CInode
*in
)
1814 dout(7) << "issue_file_data_version on " << *in
<< dendl
;
1815 return in
->inode
.file_data_version
;
1818 class C_Locker_FileUpdate_finish
: public LockerLogContext
{
1826 C_Locker_FileUpdate_finish(Locker
*l
, CInode
*i
, MutationRef
& m
,
1827 bool sm
=false, bool ni
=false, client_t c
=-1,
1828 MClientCaps
*ac
= 0)
1829 : LockerLogContext(l
), in(i
), mut(m
), share_max(sm
), need_issue(ni
),
1830 client(c
), ack(ac
) {
1831 in
->get(CInode::PIN_PTRWAITER
);
1833 void finish(int r
) override
{
1834 locker
->file_update_finish(in
, mut
, share_max
, need_issue
, client
, ack
);
1835 in
->put(CInode::PIN_PTRWAITER
);
1839 void Locker::file_update_finish(CInode
*in
, MutationRef
& mut
, bool share_max
, bool issue_client_cap
,
1840 client_t client
, MClientCaps
*ack
)
1842 dout(10) << "file_update_finish on " << *in
<< dendl
;
1843 in
->pop_and_dirty_projected_inode(mut
->ls
);
1848 Session
*session
= mds
->get_session(client
);
1850 // "oldest flush tid" > 0 means client uses unique TID for each flush
1851 if (ack
->get_oldest_flush_tid() > 0)
1852 session
->add_completed_flush(ack
->get_client_tid());
1853 mds
->send_message_client_counted(ack
, session
);
1855 dout(10) << " no session for client." << client
<< " " << *ack
<< dendl
;
1860 set
<CInode
*> need_issue
;
1861 drop_locks(mut
.get(), &need_issue
);
1863 if (!in
->is_head() && !in
->client_snap_caps
.empty()) {
1864 dout(10) << " client_snap_caps " << in
->client_snap_caps
<< dendl
;
1865 // check for snap writeback completion
1866 bool gather
= false;
1867 auto p
= in
->client_snap_caps
.begin();
1868 while (p
!= in
->client_snap_caps
.end()) {
1869 SimpleLock
*lock
= in
->get_lock(p
->first
);
1871 dout(10) << " completing client_snap_caps for " << ccap_string(p
->first
)
1872 << " lock " << *lock
<< " on " << *in
<< dendl
;
1875 p
->second
.erase(client
);
1876 if (p
->second
.empty()) {
1878 in
->client_snap_caps
.erase(p
++);
1883 if (in
->client_snap_caps
.empty())
1884 in
->item_open_file
.remove_myself();
1885 eval_cap_gather(in
, &need_issue
);
1888 if (issue_client_cap
&& need_issue
.count(in
) == 0) {
1889 Capability
*cap
= in
->get_client_cap(client
);
1890 if (cap
&& (cap
->wanted() & ~cap
->pending()))
1891 issue_caps(in
, cap
);
1894 if (share_max
&& in
->is_auth() &&
1895 (in
->filelock
.gcaps_allowed(CAP_LONER
) & (CEPH_CAP_GWR
|CEPH_CAP_GBUFFER
)))
1896 share_inode_max_size(in
);
1898 issue_caps_set(need_issue
);
1900 utime_t now
= ceph_clock_now();
1901 mds
->balancer
->hit_inode(now
, in
, META_POP_IWR
);
1903 // auth unpin after issuing caps
1907 Capability
* Locker::issue_new_caps(CInode
*in
,
1913 dout(7) << "issue_new_caps for mode " << mode
<< " on " << *in
<< dendl
;
1916 // if replay, try to reconnect cap, and otherwise do nothing.
1918 mds
->mdcache
->try_reconnect_cap(in
, session
);
1923 assert(session
->info
.inst
.name
.is_client());
1924 client_t my_client
= session
->info
.inst
.name
.num();
1925 int my_want
= ceph_caps_for_mode(mode
);
1927 // register a capability
1928 Capability
*cap
= in
->get_client_cap(my_client
);
1931 cap
= in
->add_client_cap(my_client
, session
, realm
);
1932 cap
->set_wanted(my_want
);
1934 cap
->inc_suppress(); // suppress file cap messages for new cap (we'll bundle with the open() reply)
1938 // make sure it wants sufficient caps
1939 if (my_want
& ~cap
->wanted()) {
1940 // augment wanted caps for this client
1941 cap
->set_wanted(cap
->wanted() | my_want
);
1945 if (in
->is_auth()) {
1946 // [auth] twiddle mode?
1947 eval(in
, CEPH_CAP_LOCKS
);
1949 if (_need_flush_mdlog(in
, my_want
))
1950 mds
->mdlog
->flush();
1953 // [replica] tell auth about any new caps wanted
1954 request_inode_file_caps(in
);
1957 // issue caps (pot. incl new one)
1958 //issue_caps(in); // note: _eval above may have done this already...
1960 // re-issue whatever we can
1961 //cap->issue(cap->pending());
1964 cap
->dec_suppress();
1970 void Locker::issue_caps_set(set
<CInode
*>& inset
)
1972 for (set
<CInode
*>::iterator p
= inset
.begin(); p
!= inset
.end(); ++p
)
1976 bool Locker::issue_caps(CInode
*in
, Capability
*only_cap
)
1978 // allowed caps are determined by the lock mode.
1979 int all_allowed
= in
->get_caps_allowed_by_type(CAP_ANY
);
1980 int loner_allowed
= in
->get_caps_allowed_by_type(CAP_LONER
);
1981 int xlocker_allowed
= in
->get_caps_allowed_by_type(CAP_XLOCKER
);
1983 client_t loner
= in
->get_loner();
1985 dout(7) << "issue_caps loner client." << loner
1986 << " allowed=" << ccap_string(loner_allowed
)
1987 << ", xlocker allowed=" << ccap_string(xlocker_allowed
)
1988 << ", others allowed=" << ccap_string(all_allowed
)
1989 << " on " << *in
<< dendl
;
1991 dout(7) << "issue_caps allowed=" << ccap_string(all_allowed
)
1992 << ", xlocker allowed=" << ccap_string(xlocker_allowed
)
1993 << " on " << *in
<< dendl
;
1996 assert(in
->is_head());
1998 // count conflicts with
2002 map
<client_t
, Capability
*>::iterator it
;
2004 it
= in
->client_caps
.find(only_cap
->get_client());
2006 it
= in
->client_caps
.begin();
2007 for (; it
!= in
->client_caps
.end(); ++it
) {
2008 Capability
*cap
= it
->second
;
2009 if (cap
->is_stale())
2012 // do not issue _new_ bits when size|mtime is projected
2014 if (loner
== it
->first
)
2015 allowed
= loner_allowed
;
2017 allowed
= all_allowed
;
2019 // add in any xlocker-only caps (for locks this client is the xlocker for)
2020 allowed
|= xlocker_allowed
& in
->get_xlocker_mask(it
->first
);
2022 Session
*session
= mds
->get_session(it
->first
);
2023 if (in
->inode
.inline_data
.version
!= CEPH_INLINE_NONE
&&
2024 !(session
&& session
->connection
&&
2025 session
->connection
->has_feature(CEPH_FEATURE_MDS_INLINE_DATA
)))
2026 allowed
&= ~(CEPH_CAP_FILE_RD
| CEPH_CAP_FILE_WR
);
2028 int pending
= cap
->pending();
2029 int wanted
= cap
->wanted();
2031 dout(20) << " client." << it
->first
2032 << " pending " << ccap_string(pending
)
2033 << " allowed " << ccap_string(allowed
)
2034 << " wanted " << ccap_string(wanted
)
2037 if (!(pending
& ~allowed
)) {
2038 // skip if suppress or new, and not revocation
2039 if (cap
->is_new() || cap
->is_suppress()) {
2040 dout(20) << " !revoke and new|suppressed, skipping client." << it
->first
<< dendl
;
2045 // notify clients about deleted inode, to make sure they release caps ASAP.
2046 if (in
->inode
.nlink
== 0)
2047 wanted
|= CEPH_CAP_LINK_SHARED
;
2049 // are there caps that the client _wants_ and can have, but aren't pending?
2050 // or do we need to revoke?
2051 if (((wanted
& allowed
) & ~pending
) || // missing wanted+allowed caps
2052 (pending
& ~allowed
)) { // need to revoke ~allowed caps.
2056 // include caps that clients generally like, while we're at it.
2057 int likes
= in
->get_caps_liked();
2058 int before
= pending
;
2060 if (pending
& ~allowed
)
2061 seq
= cap
->issue((wanted
|likes
) & allowed
& pending
); // if revoking, don't issue anything new.
2063 seq
= cap
->issue((wanted
|likes
) & allowed
);
2064 int after
= cap
->pending();
2066 if (cap
->is_new()) {
2067 // haven't send caps to client yet
2068 if (before
& ~after
)
2069 cap
->confirm_receipt(seq
, after
);
2071 dout(7) << " sending MClientCaps to client." << it
->first
2072 << " seq " << cap
->get_last_seq()
2073 << " new pending " << ccap_string(after
) << " was " << ccap_string(before
)
2076 int op
= (before
& ~after
) ? CEPH_CAP_OP_REVOKE
: CEPH_CAP_OP_GRANT
;
2077 if (op
== CEPH_CAP_OP_REVOKE
) {
2078 revoking_caps
.push_back(&cap
->item_revoking_caps
);
2079 revoking_caps_by_client
[cap
->get_client()].push_back(&cap
->item_client_revoking_caps
);
2080 cap
->set_last_revoke_stamp(ceph_clock_now());
2081 cap
->reset_num_revoke_warnings();
2084 MClientCaps
*m
= new MClientCaps(op
, in
->ino(),
2085 in
->find_snaprealm()->inode
->ino(),
2086 cap
->get_cap_id(), cap
->get_last_seq(),
2089 mds
->get_osd_epoch_barrier());
2090 in
->encode_cap_message(m
, cap
);
2092 mds
->send_message_client_counted(m
, it
->first
);
2100 return (nissued
== 0); // true if no re-issued, no callbacks
2103 void Locker::issue_truncate(CInode
*in
)
2105 dout(7) << "issue_truncate on " << *in
<< dendl
;
2107 for (map
<client_t
, Capability
*>::iterator it
= in
->client_caps
.begin();
2108 it
!= in
->client_caps
.end();
2110 Capability
*cap
= it
->second
;
2111 MClientCaps
*m
= new MClientCaps(CEPH_CAP_OP_TRUNC
,
2113 in
->find_snaprealm()->inode
->ino(),
2114 cap
->get_cap_id(), cap
->get_last_seq(),
2115 cap
->pending(), cap
->wanted(), 0,
2117 mds
->get_osd_epoch_barrier());
2118 in
->encode_cap_message(m
, cap
);
2119 mds
->send_message_client_counted(m
, it
->first
);
2122 // should we increase max_size?
2123 if (in
->is_auth() && in
->is_file())
2124 check_inode_max_size(in
);
2128 void Locker::revoke_stale_caps(Capability
*cap
)
2130 CInode
*in
= cap
->get_inode();
2131 if (in
->state_test(CInode::STATE_EXPORTINGCAPS
)) {
2132 // if export succeeds, the cap will be removed. if export fails, we need to
2133 // revoke the cap if it's still stale.
2134 in
->state_set(CInode::STATE_EVALSTALECAPS
);
2138 int issued
= cap
->issued();
2139 if (issued
& ~CEPH_CAP_PIN
) {
2140 dout(10) << " revoking " << ccap_string(issued
) << " on " << *in
<< dendl
;
2143 if (in
->is_auth() &&
2144 in
->inode
.client_ranges
.count(cap
->get_client()))
2145 in
->state_set(CInode::STATE_NEEDSRECOVER
);
2147 if (!in
->filelock
.is_stable()) eval_gather(&in
->filelock
);
2148 if (!in
->linklock
.is_stable()) eval_gather(&in
->linklock
);
2149 if (!in
->authlock
.is_stable()) eval_gather(&in
->authlock
);
2150 if (!in
->xattrlock
.is_stable()) eval_gather(&in
->xattrlock
);
2152 if (in
->is_auth()) {
2153 try_eval(in
, CEPH_CAP_LOCKS
);
2155 request_inode_file_caps(in
);
2160 void Locker::revoke_stale_caps(Session
*session
)
2162 dout(10) << "revoke_stale_caps for " << session
->info
.inst
.name
<< dendl
;
2164 for (xlist
<Capability
*>::iterator p
= session
->caps
.begin(); !p
.end(); ++p
) {
2165 Capability
*cap
= *p
;
2167 revoke_stale_caps(cap
);
2171 void Locker::resume_stale_caps(Session
*session
)
2173 dout(10) << "resume_stale_caps for " << session
->info
.inst
.name
<< dendl
;
2175 for (xlist
<Capability
*>::iterator p
= session
->caps
.begin(); !p
.end(); ++p
) {
2176 Capability
*cap
= *p
;
2177 CInode
*in
= cap
->get_inode();
2178 assert(in
->is_head());
2179 if (cap
->is_stale()) {
2180 dout(10) << " clearing stale flag on " << *in
<< dendl
;
2183 if (in
->state_test(CInode::STATE_EXPORTINGCAPS
)) {
2184 // if export succeeds, the cap will be removed. if export fails,
2185 // we need to re-issue the cap if it's not stale.
2186 in
->state_set(CInode::STATE_EVALSTALECAPS
);
2190 if (!in
->is_auth() || !eval(in
, CEPH_CAP_LOCKS
))
2191 issue_caps(in
, cap
);
2196 void Locker::remove_stale_leases(Session
*session
)
2198 dout(10) << "remove_stale_leases for " << session
->info
.inst
.name
<< dendl
;
2199 xlist
<ClientLease
*>::iterator p
= session
->leases
.begin();
2201 ClientLease
*l
= *p
;
2203 CDentry
*parent
= static_cast<CDentry
*>(l
->parent
);
2204 dout(15) << " removing lease on " << *parent
<< dendl
;
2205 parent
->remove_client_lease(l
, this);
2210 class C_MDL_RequestInodeFileCaps
: public LockerContext
{
2213 C_MDL_RequestInodeFileCaps(Locker
*l
, CInode
*i
) : LockerContext(l
), in(i
) {
2214 in
->get(CInode::PIN_PTRWAITER
);
2216 void finish(int r
) override
{
2218 locker
->request_inode_file_caps(in
);
2219 in
->put(CInode::PIN_PTRWAITER
);
2223 void Locker::request_inode_file_caps(CInode
*in
)
2225 assert(!in
->is_auth());
2227 int wanted
= in
->get_caps_wanted() & in
->get_caps_allowed_ever() & ~CEPH_CAP_PIN
;
2228 if (wanted
!= in
->replica_caps_wanted
) {
2229 // wait for single auth
2230 if (in
->is_ambiguous_auth()) {
2231 in
->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
,
2232 new C_MDL_RequestInodeFileCaps(this, in
));
2236 mds_rank_t auth
= in
->authority().first
;
2237 if (mds
->is_cluster_degraded() &&
2238 mds
->mdsmap
->get_state(auth
) == MDSMap::STATE_REJOIN
) {
2239 mds
->wait_for_active_peer(auth
, new C_MDL_RequestInodeFileCaps(this, in
));
2243 dout(7) << "request_inode_file_caps " << ccap_string(wanted
)
2244 << " was " << ccap_string(in
->replica_caps_wanted
)
2245 << " on " << *in
<< " to mds." << auth
<< dendl
;
2247 in
->replica_caps_wanted
= wanted
;
2249 if (!mds
->is_cluster_degraded() ||
2250 mds
->mdsmap
->is_clientreplay_or_active_or_stopping(auth
))
2251 mds
->send_message_mds(new MInodeFileCaps(in
->ino(), in
->replica_caps_wanted
),
2256 /* This function DOES put the passed message before returning */
2257 void Locker::handle_inode_file_caps(MInodeFileCaps
*m
)
2259 // nobody should be talking to us during recovery.
2260 assert(mds
->is_clientreplay() || mds
->is_active() || mds
->is_stopping());
2263 CInode
*in
= mdcache
->get_inode(m
->get_ino());
2264 mds_rank_t from
= mds_rank_t(m
->get_source().num());
2267 assert(in
->is_auth());
2269 dout(7) << "handle_inode_file_caps replica mds." << from
<< " wants caps " << ccap_string(m
->get_caps()) << " on " << *in
<< dendl
;
2272 in
->mds_caps_wanted
[from
] = m
->get_caps();
2274 in
->mds_caps_wanted
.erase(from
);
2276 try_eval(in
, CEPH_CAP_LOCKS
);
2281 class C_MDL_CheckMaxSize
: public LockerContext
{
2283 uint64_t new_max_size
;
2288 C_MDL_CheckMaxSize(Locker
*l
, CInode
*i
, uint64_t _new_max_size
,
2289 uint64_t _newsize
, utime_t _mtime
) :
2290 LockerContext(l
), in(i
),
2291 new_max_size(_new_max_size
), newsize(_newsize
), mtime(_mtime
)
2293 in
->get(CInode::PIN_PTRWAITER
);
2295 void finish(int r
) override
{
2297 locker
->check_inode_max_size(in
, false, new_max_size
, newsize
, mtime
);
2298 in
->put(CInode::PIN_PTRWAITER
);
2302 uint64_t Locker::calc_new_max_size(CInode::mempool_inode
*pi
, uint64_t size
)
2304 uint64_t new_max
= (size
+ 1) << 1;
2305 uint64_t max_inc
= g_conf
->mds_client_writeable_range_max_inc_objs
;
2307 max_inc
*= pi
->layout
.object_size
;
2308 new_max
= MIN(new_max
, size
+ max_inc
);
2310 return ROUND_UP_TO(new_max
, pi
->get_layout_size_increment());
2313 void Locker::calc_new_client_ranges(CInode
*in
, uint64_t size
,
2314 CInode::mempool_inode::client_range_map
*new_ranges
,
2315 bool *max_increased
)
2317 auto latest
= in
->get_projected_inode();
2319 if(latest
->has_layout()) {
2320 ms
= calc_new_max_size(latest
, size
);
2322 // Layout-less directories like ~mds0/, have zero size
2326 // increase ranges as appropriate.
2327 // shrink to 0 if no WR|BUFFER caps issued.
2328 for (map
<client_t
,Capability
*>::iterator p
= in
->client_caps
.begin();
2329 p
!= in
->client_caps
.end();
2331 if ((p
->second
->issued() | p
->second
->wanted()) & (CEPH_CAP_FILE_WR
|CEPH_CAP_FILE_BUFFER
)) {
2332 client_writeable_range_t
& nr
= (*new_ranges
)[p
->first
];
2334 if (latest
->client_ranges
.count(p
->first
)) {
2335 client_writeable_range_t
& oldr
= latest
->client_ranges
[p
->first
];
2336 if (ms
> oldr
.range
.last
)
2337 *max_increased
= true;
2338 nr
.range
.last
= MAX(ms
, oldr
.range
.last
);
2339 nr
.follows
= oldr
.follows
;
2341 *max_increased
= true;
2343 nr
.follows
= in
->first
- 1;
2349 bool Locker::check_inode_max_size(CInode
*in
, bool force_wrlock
,
2350 uint64_t new_max_size
, uint64_t new_size
,
2353 assert(in
->is_auth());
2354 assert(in
->is_file());
2356 CInode::mempool_inode
*latest
= in
->get_projected_inode();
2357 CInode::mempool_inode::client_range_map new_ranges
;
2358 uint64_t size
= latest
->size
;
2359 bool update_size
= new_size
> 0;
2360 bool update_max
= false;
2361 bool max_increased
= false;
2364 new_size
= size
= MAX(size
, new_size
);
2365 new_mtime
= MAX(new_mtime
, latest
->mtime
);
2366 if (latest
->size
== new_size
&& latest
->mtime
== new_mtime
)
2367 update_size
= false;
2370 calc_new_client_ranges(in
, max(new_max_size
, size
), &new_ranges
, &max_increased
);
2372 if (max_increased
|| latest
->client_ranges
!= new_ranges
)
2375 if (!update_size
&& !update_max
) {
2376 dout(20) << "check_inode_max_size no-op on " << *in
<< dendl
;
2380 dout(10) << "check_inode_max_size new_ranges " << new_ranges
2381 << " update_size " << update_size
2382 << " on " << *in
<< dendl
;
2384 if (in
->is_frozen()) {
2385 dout(10) << "check_inode_max_size frozen, waiting on " << *in
<< dendl
;
2386 C_MDL_CheckMaxSize
*cms
= new C_MDL_CheckMaxSize(this, in
,
2390 in
->add_waiter(CInode::WAIT_UNFREEZE
, cms
);
2393 if (!force_wrlock
&& !in
->filelock
.can_wrlock(in
->get_loner())) {
2395 if (in
->filelock
.is_stable()) {
2396 if (in
->get_target_loner() >= 0)
2397 file_excl(&in
->filelock
);
2399 simple_lock(&in
->filelock
);
2401 if (!in
->filelock
.can_wrlock(in
->get_loner())) {
2403 C_MDL_CheckMaxSize
*cms
= new C_MDL_CheckMaxSize(this, in
,
2408 in
->filelock
.add_waiter(SimpleLock::WAIT_STABLE
, cms
);
2409 dout(10) << "check_inode_max_size can't wrlock, waiting on " << *in
<< dendl
;
2414 MutationRef
mut(new MutationImpl());
2415 mut
->ls
= mds
->mdlog
->get_current_segment();
2417 auto &pi
= in
->project_inode();
2418 pi
.inode
.version
= in
->pre_dirty();
2421 dout(10) << "check_inode_max_size client_ranges " << pi
.inode
.client_ranges
<< " -> " << new_ranges
<< dendl
;
2422 pi
.inode
.client_ranges
= new_ranges
;
2426 dout(10) << "check_inode_max_size size " << pi
.inode
.size
<< " -> " << new_size
<< dendl
;
2427 pi
.inode
.size
= new_size
;
2428 pi
.inode
.rstat
.rbytes
= new_size
;
2429 dout(10) << "check_inode_max_size mtime " << pi
.inode
.mtime
<< " -> " << new_mtime
<< dendl
;
2430 pi
.inode
.mtime
= new_mtime
;
2431 if (new_mtime
> pi
.inode
.ctime
) {
2432 pi
.inode
.ctime
= new_mtime
;
2433 if (new_mtime
> pi
.inode
.rstat
.rctime
)
2434 pi
.inode
.rstat
.rctime
= new_mtime
;
2438 // use EOpen if the file is still open; otherwise, use EUpdate.
2439 // this is just an optimization to push open files forward into
2440 // newer log segments.
2442 EMetaBlob
*metablob
;
2443 if (in
->is_any_caps_wanted() && in
->last
== CEPH_NOSNAP
) {
2444 EOpen
*eo
= new EOpen(mds
->mdlog
);
2445 eo
->add_ino(in
->ino());
2446 metablob
= &eo
->metablob
;
2448 mut
->ls
->open_files
.push_back(&in
->item_open_file
);
2450 EUpdate
*eu
= new EUpdate(mds
->mdlog
, "check_inode_max_size");
2451 metablob
= &eu
->metablob
;
2454 mds
->mdlog
->start_entry(le
);
2455 if (update_size
) { // FIXME if/when we do max_size nested accounting
2456 mdcache
->predirty_journal_parents(mut
, metablob
, in
, 0, PREDIRTY_PRIMARY
);
2458 CDentry
*parent
= in
->get_projected_parent_dn();
2459 metablob
->add_primary_dentry(parent
, in
, true);
2461 metablob
->add_dir_context(in
->get_projected_parent_dn()->get_dir());
2462 mdcache
->journal_dirty_inode(mut
.get(), metablob
, in
);
2464 mds
->mdlog
->submit_entry(le
,
2465 new C_Locker_FileUpdate_finish(this, in
, mut
, true));
2466 wrlock_force(&in
->filelock
, mut
); // wrlock for duration of journal
2469 // make max_size _increase_ timely
2471 mds
->mdlog
->flush();
2477 void Locker::share_inode_max_size(CInode
*in
, Capability
*only_cap
)
2480 * only share if currently issued a WR cap. if client doesn't have it,
2481 * file_max doesn't matter, and the client will get it if/when they get
2484 dout(10) << "share_inode_max_size on " << *in
<< dendl
;
2485 map
<client_t
, Capability
*>::iterator it
;
2487 it
= in
->client_caps
.find(only_cap
->get_client());
2489 it
= in
->client_caps
.begin();
2490 for (; it
!= in
->client_caps
.end(); ++it
) {
2491 const client_t client
= it
->first
;
2492 Capability
*cap
= it
->second
;
2493 if (cap
->is_suppress())
2495 if (cap
->pending() & (CEPH_CAP_FILE_WR
|CEPH_CAP_FILE_BUFFER
)) {
2496 dout(10) << "share_inode_max_size with client." << client
<< dendl
;
2497 cap
->inc_last_seq();
2498 MClientCaps
*m
= new MClientCaps(CEPH_CAP_OP_GRANT
,
2500 in
->find_snaprealm()->inode
->ino(),
2501 cap
->get_cap_id(), cap
->get_last_seq(),
2502 cap
->pending(), cap
->wanted(), 0,
2504 mds
->get_osd_epoch_barrier());
2505 in
->encode_cap_message(m
, cap
);
2506 mds
->send_message_client_counted(m
, client
);
2513 bool Locker::_need_flush_mdlog(CInode
*in
, int wanted
)
2515 /* flush log if caps are wanted by client but corresponding lock is unstable and locked by
2516 * pending mutations. */
2517 if (((wanted
& (CEPH_CAP_FILE_RD
|CEPH_CAP_FILE_WR
|CEPH_CAP_FILE_SHARED
|CEPH_CAP_FILE_EXCL
)) &&
2518 in
->filelock
.is_unstable_and_locked()) ||
2519 ((wanted
& (CEPH_CAP_AUTH_SHARED
|CEPH_CAP_AUTH_EXCL
)) &&
2520 in
->authlock
.is_unstable_and_locked()) ||
2521 ((wanted
& (CEPH_CAP_LINK_SHARED
|CEPH_CAP_LINK_EXCL
)) &&
2522 in
->linklock
.is_unstable_and_locked()) ||
2523 ((wanted
& (CEPH_CAP_XATTR_SHARED
|CEPH_CAP_XATTR_EXCL
)) &&
2524 in
->xattrlock
.is_unstable_and_locked()))
2529 void Locker::adjust_cap_wanted(Capability
*cap
, int wanted
, int issue_seq
)
2531 if (ceph_seq_cmp(issue_seq
, cap
->get_last_issue()) == 0) {
2532 dout(10) << " wanted " << ccap_string(cap
->wanted())
2533 << " -> " << ccap_string(wanted
) << dendl
;
2534 cap
->set_wanted(wanted
);
2535 } else if (wanted
& ~cap
->wanted()) {
2536 dout(10) << " wanted " << ccap_string(cap
->wanted())
2537 << " -> " << ccap_string(wanted
)
2538 << " (added caps even though we had seq mismatch!)" << dendl
;
2539 cap
->set_wanted(wanted
| cap
->wanted());
2541 dout(10) << " NOT changing wanted " << ccap_string(cap
->wanted())
2542 << " -> " << ccap_string(wanted
)
2543 << " (issue_seq " << issue_seq
<< " != last_issue "
2544 << cap
->get_last_issue() << ")" << dendl
;
2548 CInode
*cur
= cap
->get_inode();
2549 if (!cur
->is_auth()) {
2550 request_inode_file_caps(cur
);
2554 if (cap
->wanted() == 0) {
2555 if (cur
->item_open_file
.is_on_list() &&
2556 !cur
->is_any_caps_wanted()) {
2557 dout(10) << " removing unwanted file from open file list " << *cur
<< dendl
;
2558 cur
->item_open_file
.remove_myself();
2561 if (cur
->state_test(CInode::STATE_RECOVERING
) &&
2562 (cap
->wanted() & (CEPH_CAP_FILE_RD
|
2563 CEPH_CAP_FILE_WR
))) {
2564 mds
->mdcache
->recovery_queue
.prioritize(cur
);
2567 if (!cur
->item_open_file
.is_on_list()) {
2568 dout(10) << " adding to open file list " << *cur
<< dendl
;
2569 assert(cur
->last
== CEPH_NOSNAP
);
2570 LogSegment
*ls
= mds
->mdlog
->get_current_segment();
2571 EOpen
*le
= new EOpen(mds
->mdlog
);
2572 mds
->mdlog
->start_entry(le
);
2573 le
->add_clean_inode(cur
);
2574 ls
->open_files
.push_back(&cur
->item_open_file
);
2575 mds
->mdlog
->submit_entry(le
);
2582 void Locker::_do_null_snapflush(CInode
*head_in
, client_t client
, snapid_t last
)
2584 dout(10) << "_do_null_snapflush client." << client
<< " on " << *head_in
<< dendl
;
2585 for (auto p
= head_in
->client_need_snapflush
.begin();
2586 p
!= head_in
->client_need_snapflush
.end() && p
->first
< last
; ) {
2587 snapid_t snapid
= p
->first
;
2588 auto &clients
= p
->second
;
2589 ++p
; // be careful, q loop below depends on this
2591 if (clients
.count(client
)) {
2592 dout(10) << " doing async NULL snapflush on " << snapid
<< " from client." << client
<< dendl
;
2593 CInode
*sin
= mdcache
->pick_inode_snap(head_in
, snapid
- 1);
2595 assert(sin
->first
<= snapid
);
2596 _do_snap_update(sin
, snapid
, 0, sin
->first
- 1, client
, NULL
, NULL
);
2597 head_in
->remove_need_snapflush(sin
, snapid
, client
);
2603 bool Locker::should_defer_client_cap_frozen(CInode
*in
)
2606 * This policy needs to be AT LEAST as permissive as allowing a client request
2607 * to go forward, or else a client request can release something, the release
2608 * gets deferred, but the request gets processed and deadlocks because when the
2609 * caps can't get revoked.
2611 * Currently, a request wait if anything locked is freezing (can't
2612 * auth_pin), which would avoid any deadlock with cap release. Thus @in
2613 * _MUST_ be in the lock/auth_pin set.
2615 * auth_pins==0 implies no unstable lock and not auth pinnned by
2616 * client request, otherwise continue even it's freezing.
2618 return (in
->is_freezing() && in
->get_num_auth_pins() == 0) || in
->is_frozen();
2622 * This function DOES put the passed message before returning
2624 void Locker::handle_client_caps(MClientCaps
*m
)
2626 client_t client
= m
->get_source().num();
2627 snapid_t follows
= m
->get_snap_follows();
2628 dout(7) << "handle_client_caps "
2629 << ((m
->flags
& CLIENT_CAPS_SYNC
) ? "sync" : "async")
2630 << " on " << m
->get_ino()
2631 << " tid " << m
->get_client_tid() << " follows " << follows
2632 << " op " << ceph_cap_op_name(m
->get_op()) << dendl
;
2634 Session
*session
= mds
->get_session(m
);
2635 if (!mds
->is_clientreplay() && !mds
->is_active() && !mds
->is_stopping()) {
2637 dout(5) << " no session, dropping " << *m
<< dendl
;
2641 if (session
->is_closed() ||
2642 session
->is_closing() ||
2643 session
->is_killing()) {
2644 dout(7) << " session closed|closing|killing, dropping " << *m
<< dendl
;
2648 if (mds
->is_reconnect() &&
2649 m
->get_dirty() && m
->get_client_tid() > 0 &&
2650 !session
->have_completed_flush(m
->get_client_tid())) {
2651 mdcache
->set_reconnected_dirty_caps(client
, m
->get_ino(), m
->get_dirty());
2653 mds
->wait_for_replay(new C_MDS_RetryMessage(mds
, m
));
2657 if (m
->get_client_tid() > 0 && session
&&
2658 session
->have_completed_flush(m
->get_client_tid())) {
2659 dout(7) << "handle_client_caps already flushed tid " << m
->get_client_tid()
2660 << " for client." << client
<< dendl
;
2662 if (m
->get_op() == CEPH_CAP_OP_FLUSHSNAP
) {
2663 ack
= new MClientCaps(CEPH_CAP_OP_FLUSHSNAP_ACK
, m
->get_ino(), 0, 0, 0, 0, 0,
2664 m
->get_dirty(), 0, mds
->get_osd_epoch_barrier());
2666 ack
= new MClientCaps(CEPH_CAP_OP_FLUSH_ACK
, m
->get_ino(), 0, m
->get_cap_id(),
2667 m
->get_seq(), m
->get_caps(), 0, m
->get_dirty(), 0,
2668 mds
->get_osd_epoch_barrier());
2670 ack
->set_snap_follows(follows
);
2671 ack
->set_client_tid(m
->get_client_tid());
2672 mds
->send_message_client_counted(ack
, m
->get_connection());
2673 if (m
->get_op() == CEPH_CAP_OP_FLUSHSNAP
) {
2677 // fall-thru because the message may release some caps
2679 m
->set_op(CEPH_CAP_OP_UPDATE
);
2683 // "oldest flush tid" > 0 means client uses unique TID for each flush
2684 if (m
->get_oldest_flush_tid() > 0 && session
) {
2685 if (session
->trim_completed_flushes(m
->get_oldest_flush_tid())) {
2686 mds
->mdlog
->get_current_segment()->touched_sessions
.insert(session
->info
.inst
.name
);
2688 if (session
->get_num_trim_flushes_warnings() > 0 &&
2689 session
->get_num_completed_flushes() * 2 < g_conf
->mds_max_completed_flushes
)
2690 session
->reset_num_trim_flushes_warnings();
2692 if (session
->get_num_completed_flushes() >=
2693 (g_conf
->mds_max_completed_flushes
<< session
->get_num_trim_flushes_warnings())) {
2694 session
->inc_num_trim_flushes_warnings();
2696 ss
<< "client." << session
->get_client() << " does not advance its oldest_flush_tid ("
2697 << m
->get_oldest_flush_tid() << "), "
2698 << session
->get_num_completed_flushes()
2699 << " completed flushes recorded in session";
2700 mds
->clog
->warn() << ss
.str();
2701 dout(20) << __func__
<< " " << ss
.str() << dendl
;
2706 CInode
*head_in
= mdcache
->get_inode(m
->get_ino());
2708 if (mds
->is_clientreplay()) {
2709 dout(7) << "handle_client_caps on unknown ino " << m
->get_ino()
2710 << ", will try again after replayed client requests" << dendl
;
2711 mdcache
->wait_replay_cap_reconnect(m
->get_ino(), new C_MDS_RetryMessage(mds
, m
));
2716 * "handle_client_caps on unknown ino xxx” is normal after migrating a subtree
2717 * Sequence of events that cause this are:
2718 * - client sends caps message to mds.a
2719 * - mds finishes subtree migration, send cap export to client
2720 * - mds trim its cache
2721 * - mds receives cap messages from client
2723 dout(7) << "handle_client_caps on unknown ino " << m
->get_ino() << ", dropping" << dendl
;
2728 if (m
->osd_epoch_barrier
&& !mds
->objecter
->have_map(m
->osd_epoch_barrier
)) {
2729 // Pause RADOS operations until we see the required epoch
2730 mds
->objecter
->set_epoch_barrier(m
->osd_epoch_barrier
);
2733 if (mds
->get_osd_epoch_barrier() < m
->osd_epoch_barrier
) {
2734 // Record the barrier so that we will retransmit it to clients
2735 mds
->set_osd_epoch_barrier(m
->osd_epoch_barrier
);
2738 dout(10) << " head inode " << *head_in
<< dendl
;
2740 Capability
*cap
= 0;
2741 cap
= head_in
->get_client_cap(client
);
2743 dout(7) << "handle_client_caps no cap for client." << client
<< " on " << *head_in
<< dendl
;
2750 if (should_defer_client_cap_frozen(head_in
)) {
2751 dout(7) << "handle_client_caps freezing|frozen on " << *head_in
<< dendl
;
2752 head_in
->add_waiter(CInode::WAIT_UNFREEZE
, new C_MDS_RetryMessage(mds
, m
));
2755 if (ceph_seq_cmp(m
->get_mseq(), cap
->get_mseq()) < 0) {
2756 dout(7) << "handle_client_caps mseq " << m
->get_mseq() << " < " << cap
->get_mseq()
2757 << ", dropping" << dendl
;
2762 int op
= m
->get_op();
2765 if (op
== CEPH_CAP_OP_FLUSHSNAP
) {
2766 if (!head_in
->is_auth()) {
2767 dout(7) << " not auth, ignoring flushsnap on " << *head_in
<< dendl
;
2771 SnapRealm
*realm
= head_in
->find_snaprealm();
2772 snapid_t snap
= realm
->get_snap_following(follows
);
2773 dout(10) << " flushsnap follows " << follows
<< " -> snap " << snap
<< dendl
;
2775 CInode
*in
= head_in
;
2776 if (snap
!= CEPH_NOSNAP
) {
2777 in
= mdcache
->pick_inode_snap(head_in
, snap
- 1);
2779 dout(10) << " snapped inode " << *in
<< dendl
;
2782 // we can prepare the ack now, since this FLUSHEDSNAP is independent of any
2783 // other cap ops. (except possibly duplicate FLUSHSNAP requests, but worst
2784 // case we get a dup response, so whatever.)
2785 MClientCaps
*ack
= 0;
2786 if (m
->get_dirty()) {
2787 ack
= new MClientCaps(CEPH_CAP_OP_FLUSHSNAP_ACK
, in
->ino(), 0, 0, 0, 0, 0, m
->get_dirty(), 0, mds
->get_osd_epoch_barrier());
2788 ack
->set_snap_follows(follows
);
2789 ack
->set_client_tid(m
->get_client_tid());
2790 ack
->set_oldest_flush_tid(m
->get_oldest_flush_tid());
2793 if (in
== head_in
||
2794 (head_in
->client_need_snapflush
.count(snap
) &&
2795 head_in
->client_need_snapflush
[snap
].count(client
))) {
2796 dout(7) << " flushsnap snap " << snap
2797 << " client." << client
<< " on " << *in
<< dendl
;
2799 // this cap now follows a later snap (i.e. the one initiating this flush, or later)
2801 cap
->client_follows
= snap
< CEPH_NOSNAP
? snap
: realm
->get_newest_seq();
2802 else if (head_in
->client_need_snapflush
.begin()->first
< snap
)
2803 _do_null_snapflush(head_in
, client
, snap
);
2805 _do_snap_update(in
, snap
, m
->get_dirty(), follows
, client
, m
, ack
);
2808 head_in
->remove_need_snapflush(in
, snap
, client
);
2810 dout(7) << " not expecting flushsnap " << snap
<< " from client." << client
<< " on " << *in
<< dendl
;
2812 mds
->send_message_client_counted(ack
, m
->get_connection());
2817 if (cap
->get_cap_id() != m
->get_cap_id()) {
2818 dout(7) << " ignoring client capid " << m
->get_cap_id() << " != my " << cap
->get_cap_id() << dendl
;
2820 CInode
*in
= head_in
;
2822 in
= mdcache
->pick_inode_snap(head_in
, follows
);
2823 // intermediate snap inodes
2824 while (in
!= head_in
) {
2825 assert(in
->last
!= CEPH_NOSNAP
);
2826 if (in
->is_auth() && m
->get_dirty()) {
2827 dout(10) << " updating intermediate snapped inode " << *in
<< dendl
;
2828 _do_cap_update(in
, NULL
, m
->get_dirty(), follows
, m
);
2830 in
= mdcache
->pick_inode_snap(head_in
, in
->last
);
2834 // head inode, and cap
2835 MClientCaps
*ack
= 0;
2837 int caps
= m
->get_caps();
2838 if (caps
& ~cap
->issued()) {
2839 dout(10) << " confirming not issued caps " << ccap_string(caps
& ~cap
->issued()) << dendl
;
2840 caps
&= cap
->issued();
2843 cap
->confirm_receipt(m
->get_seq(), caps
);
2844 dout(10) << " follows " << follows
2845 << " retains " << ccap_string(m
->get_caps())
2846 << " dirty " << ccap_string(m
->get_dirty())
2847 << " on " << *in
<< dendl
;
2850 // missing/skipped snapflush?
2851 // The client MAY send a snapflush if it is issued WR/EXCL caps, but
2852 // presently only does so when it has actual dirty metadata. But, we
2853 // set up the need_snapflush stuff based on the issued caps.
2854 // We can infer that the client WONT send a FLUSHSNAP once they have
2855 // released all WR/EXCL caps (the FLUSHSNAP always comes before the cap
2857 if (!head_in
->client_need_snapflush
.empty()) {
2858 if ((cap
->issued() & CEPH_CAP_ANY_FILE_WR
) == 0) {
2859 _do_null_snapflush(head_in
, client
);
2861 dout(10) << " revocation in progress, not making any conclusions about null snapflushes" << dendl
;
2865 if (m
->get_dirty() && in
->is_auth()) {
2866 dout(7) << " flush client." << client
<< " dirty " << ccap_string(m
->get_dirty())
2867 << " seq " << m
->get_seq() << " on " << *in
<< dendl
;
2868 ack
= new MClientCaps(CEPH_CAP_OP_FLUSH_ACK
, in
->ino(), 0, cap
->get_cap_id(), m
->get_seq(),
2869 m
->get_caps(), 0, m
->get_dirty(), 0, mds
->get_osd_epoch_barrier());
2870 ack
->set_client_tid(m
->get_client_tid());
2871 ack
->set_oldest_flush_tid(m
->get_oldest_flush_tid());
2874 // filter wanted based on what we could ever give out (given auth/replica status)
2875 bool need_flush
= m
->flags
& CLIENT_CAPS_SYNC
;
2876 int new_wanted
= m
->get_wanted();
2877 if (new_wanted
!= cap
->wanted()) {
2878 if (!need_flush
&& in
->is_auth() && (new_wanted
& ~cap
->pending())) {
2879 // exapnding caps. make sure we aren't waiting for a log flush
2880 need_flush
= _need_flush_mdlog(head_in
, new_wanted
& ~cap
->pending());
2883 adjust_cap_wanted(cap
, new_wanted
, m
->get_issue_seq());
2886 if (in
->is_auth() &&
2887 _do_cap_update(in
, cap
, m
->get_dirty(), follows
, m
, ack
, &need_flush
)) {
2889 eval(in
, CEPH_CAP_LOCKS
);
2891 if (!need_flush
&& (cap
->wanted() & ~cap
->pending()))
2892 need_flush
= _need_flush_mdlog(in
, cap
->wanted() & ~cap
->pending());
2894 // no update, ack now.
2896 mds
->send_message_client_counted(ack
, m
->get_connection());
2898 bool did_issue
= eval(in
, CEPH_CAP_LOCKS
);
2899 if (!did_issue
&& (cap
->wanted() & ~cap
->pending()))
2900 issue_caps(in
, cap
);
2902 if (cap
->get_last_seq() == 0 &&
2903 (cap
->pending() & (CEPH_CAP_FILE_WR
|CEPH_CAP_FILE_BUFFER
))) {
2904 cap
->issue_norevoke(cap
->issued());
2905 share_inode_max_size(in
, cap
);
2910 mds
->mdlog
->flush();
2918 class C_Locker_RetryRequestCapRelease
: public LockerContext
{
2920 ceph_mds_request_release item
;
2922 C_Locker_RetryRequestCapRelease(Locker
*l
, client_t c
, const ceph_mds_request_release
& it
) :
2923 LockerContext(l
), client(c
), item(it
) { }
2924 void finish(int r
) override
{
2926 MDRequestRef null_ref
;
2927 locker
->process_request_cap_release(null_ref
, client
, item
, dname
);
2931 void Locker::process_request_cap_release(MDRequestRef
& mdr
, client_t client
, const ceph_mds_request_release
& item
,
2932 boost::string_view dname
)
2934 inodeno_t ino
= (uint64_t)item
.ino
;
2935 uint64_t cap_id
= item
.cap_id
;
2936 int caps
= item
.caps
;
2937 int wanted
= item
.wanted
;
2939 int issue_seq
= item
.issue_seq
;
2940 int mseq
= item
.mseq
;
2942 CInode
*in
= mdcache
->get_inode(ino
);
2946 if (dname
.length()) {
2947 frag_t fg
= in
->pick_dirfrag(dname
);
2948 CDir
*dir
= in
->get_dirfrag(fg
);
2950 CDentry
*dn
= dir
->lookup(dname
);
2952 ClientLease
*l
= dn
->get_client_lease(client
);
2954 dout(10) << "process_cap_release removing lease on " << *dn
<< dendl
;
2955 dn
->remove_client_lease(l
, this);
2957 dout(7) << "process_cap_release client." << client
2958 << " doesn't have lease on " << *dn
<< dendl
;
2961 dout(7) << "process_cap_release client." << client
<< " released lease on dn "
2962 << dir
->dirfrag() << "/" << dname
<< " which dne" << dendl
;
2967 Capability
*cap
= in
->get_client_cap(client
);
2971 dout(10) << "process_cap_release client." << client
<< " " << ccap_string(caps
) << " on " << *in
2972 << (mdr
? "" : " (DEFERRED, no mdr)")
2975 if (ceph_seq_cmp(mseq
, cap
->get_mseq()) < 0) {
2976 dout(7) << " mseq " << mseq
<< " < " << cap
->get_mseq() << ", dropping" << dendl
;
2980 if (cap
->get_cap_id() != cap_id
) {
2981 dout(7) << " cap_id " << cap_id
<< " != " << cap
->get_cap_id() << ", dropping" << dendl
;
2985 if (should_defer_client_cap_frozen(in
)) {
2986 dout(7) << " frozen, deferring" << dendl
;
2987 in
->add_waiter(CInode::WAIT_UNFREEZE
, new C_Locker_RetryRequestCapRelease(this, client
, item
));
2991 if (caps
& ~cap
->issued()) {
2992 dout(10) << " confirming not issued caps " << ccap_string(caps
& ~cap
->issued()) << dendl
;
2993 caps
&= cap
->issued();
2995 cap
->confirm_receipt(seq
, caps
);
2997 if (!in
->client_need_snapflush
.empty() &&
2998 (cap
->issued() & CEPH_CAP_ANY_FILE_WR
) == 0) {
2999 _do_null_snapflush(in
, client
);
3002 adjust_cap_wanted(cap
, wanted
, issue_seq
);
3005 cap
->inc_suppress();
3006 eval(in
, CEPH_CAP_LOCKS
);
3008 cap
->dec_suppress();
3010 // take note; we may need to reissue on this cap later
3012 mdr
->cap_releases
[in
->vino()] = cap
->get_last_seq();
3015 class C_Locker_RetryKickIssueCaps
: public LockerContext
{
3020 C_Locker_RetryKickIssueCaps(Locker
*l
, CInode
*i
, client_t c
, ceph_seq_t s
) :
3021 LockerContext(l
), in(i
), client(c
), seq(s
) {
3022 in
->get(CInode::PIN_PTRWAITER
);
3024 void finish(int r
) override
{
3025 locker
->kick_issue_caps(in
, client
, seq
);
3026 in
->put(CInode::PIN_PTRWAITER
);
3030 void Locker::kick_issue_caps(CInode
*in
, client_t client
, ceph_seq_t seq
)
3032 Capability
*cap
= in
->get_client_cap(client
);
3033 if (!cap
|| cap
->get_last_sent() != seq
)
3035 if (in
->is_frozen()) {
3036 dout(10) << "kick_issue_caps waiting for unfreeze on " << *in
<< dendl
;
3037 in
->add_waiter(CInode::WAIT_UNFREEZE
,
3038 new C_Locker_RetryKickIssueCaps(this, in
, client
, seq
));
3041 dout(10) << "kick_issue_caps released at current seq " << seq
3042 << ", reissuing" << dendl
;
3043 issue_caps(in
, cap
);
3046 void Locker::kick_cap_releases(MDRequestRef
& mdr
)
3048 client_t client
= mdr
->get_client();
3049 for (map
<vinodeno_t
,ceph_seq_t
>::iterator p
= mdr
->cap_releases
.begin();
3050 p
!= mdr
->cap_releases
.end();
3052 CInode
*in
= mdcache
->get_inode(p
->first
);
3055 kick_issue_caps(in
, client
, p
->second
);
3060 * m and ack might be NULL, so don't dereference them unless dirty != 0
3062 void Locker::_do_snap_update(CInode
*in
, snapid_t snap
, int dirty
, snapid_t follows
, client_t client
, MClientCaps
*m
, MClientCaps
*ack
)
3064 dout(10) << "_do_snap_update dirty " << ccap_string(dirty
)
3065 << " follows " << follows
<< " snap " << snap
3066 << " on " << *in
<< dendl
;
3068 if (snap
== CEPH_NOSNAP
) {
3069 // hmm, i guess snap was already deleted? just ack!
3070 dout(10) << " wow, the snap following " << follows
3071 << " was already deleted. nothing to record, just ack." << dendl
;
3073 mds
->send_message_client_counted(ack
, m
->get_connection());
3077 EUpdate
*le
= new EUpdate(mds
->mdlog
, "snap flush");
3078 mds
->mdlog
->start_entry(le
);
3079 MutationRef mut
= new MutationImpl();
3080 mut
->ls
= mds
->mdlog
->get_current_segment();
3082 // normal metadata updates that we can apply to the head as well.
3085 CInode::mempool_xattr_map
*px
= nullptr;
3086 bool xattrs
= (dirty
& CEPH_CAP_XATTR_EXCL
) &&
3087 m
->xattrbl
.length() &&
3088 m
->head
.xattr_version
> in
->get_projected_inode()->xattr_version
;
3090 CInode::mempool_old_inode
*oi
= 0;
3091 if (in
->is_multiversion()) {
3092 oi
= in
->pick_old_inode(snap
);
3095 CInode::mempool_inode
*i
;
3097 dout(10) << " writing into old inode" << dendl
;
3098 auto &pi
= in
->project_inode();
3099 pi
.inode
.version
= in
->pre_dirty();
3100 if (snap
> oi
->first
)
3101 in
->split_old_inode(snap
);
3106 auto &pi
= in
->project_inode(xattrs
);
3107 pi
.inode
.version
= in
->pre_dirty();
3110 px
= pi
.xattrs
.get();
3113 _update_cap_fields(in
, dirty
, m
, i
);
3117 dout(7) << " xattrs v" << i
->xattr_version
<< " -> " << m
->head
.xattr_version
3118 << " len " << m
->xattrbl
.length() << dendl
;
3119 i
->xattr_version
= m
->head
.xattr_version
;
3120 bufferlist::iterator p
= m
->xattrbl
.begin();
3125 auto it
= i
->client_ranges
.find(client
);
3126 if (it
!= i
->client_ranges
.end()) {
3127 if (in
->last
== snap
) {
3128 dout(10) << " removing client_range entirely" << dendl
;
3129 i
->client_ranges
.erase(it
);
3131 dout(10) << " client_range now follows " << snap
<< dendl
;
3132 it
->second
.follows
= snap
;
3138 mdcache
->predirty_journal_parents(mut
, &le
->metablob
, in
, 0, PREDIRTY_PRIMARY
, 0, follows
);
3139 mdcache
->journal_dirty_inode(mut
.get(), &le
->metablob
, in
, follows
);
3141 // "oldest flush tid" > 0 means client uses unique TID for each flush
3142 if (ack
&& ack
->get_oldest_flush_tid() > 0)
3143 le
->metablob
.add_client_flush(metareqid_t(m
->get_source(), ack
->get_client_tid()),
3144 ack
->get_oldest_flush_tid());
3146 mds
->mdlog
->submit_entry(le
, new C_Locker_FileUpdate_finish(this, in
, mut
, false, false,
3150 void Locker::_update_cap_fields(CInode
*in
, int dirty
, MClientCaps
*m
, CInode::mempool_inode
*pi
)
3155 /* m must be valid if there are dirty caps */
3157 uint64_t features
= m
->get_connection()->get_features();
3159 if (m
->get_ctime() > pi
->ctime
) {
3160 dout(7) << " ctime " << pi
->ctime
<< " -> " << m
->get_ctime()
3161 << " for " << *in
<< dendl
;
3162 pi
->ctime
= m
->get_ctime();
3163 if (m
->get_ctime() > pi
->rstat
.rctime
)
3164 pi
->rstat
.rctime
= m
->get_ctime();
3167 if ((features
& CEPH_FEATURE_FS_CHANGE_ATTR
) &&
3168 m
->get_change_attr() > pi
->change_attr
) {
3169 dout(7) << " change_attr " << pi
->change_attr
<< " -> " << m
->get_change_attr()
3170 << " for " << *in
<< dendl
;
3171 pi
->change_attr
= m
->get_change_attr();
3175 if (dirty
& (CEPH_CAP_FILE_EXCL
|CEPH_CAP_FILE_WR
)) {
3176 utime_t atime
= m
->get_atime();
3177 utime_t mtime
= m
->get_mtime();
3178 uint64_t size
= m
->get_size();
3179 version_t inline_version
= m
->inline_version
;
3181 if (((dirty
& CEPH_CAP_FILE_WR
) && mtime
> pi
->mtime
) ||
3182 ((dirty
& CEPH_CAP_FILE_EXCL
) && mtime
!= pi
->mtime
)) {
3183 dout(7) << " mtime " << pi
->mtime
<< " -> " << mtime
3184 << " for " << *in
<< dendl
;
3186 if (mtime
> pi
->rstat
.rctime
)
3187 pi
->rstat
.rctime
= mtime
;
3189 if (in
->inode
.is_file() && // ONLY if regular file
3191 dout(7) << " size " << pi
->size
<< " -> " << size
3192 << " for " << *in
<< dendl
;
3194 pi
->rstat
.rbytes
= size
;
3196 if (in
->inode
.is_file() &&
3197 (dirty
& CEPH_CAP_FILE_WR
) &&
3198 inline_version
> pi
->inline_data
.version
) {
3199 pi
->inline_data
.version
= inline_version
;
3200 if (inline_version
!= CEPH_INLINE_NONE
&& m
->inline_data
.length() > 0)
3201 pi
->inline_data
.get_data() = m
->inline_data
;
3203 pi
->inline_data
.free_data();
3205 if ((dirty
& CEPH_CAP_FILE_EXCL
) && atime
!= pi
->atime
) {
3206 dout(7) << " atime " << pi
->atime
<< " -> " << atime
3207 << " for " << *in
<< dendl
;
3210 if ((dirty
& CEPH_CAP_FILE_EXCL
) &&
3211 ceph_seq_cmp(pi
->time_warp_seq
, m
->get_time_warp_seq()) < 0) {
3212 dout(7) << " time_warp_seq " << pi
->time_warp_seq
<< " -> " << m
->get_time_warp_seq()
3213 << " for " << *in
<< dendl
;
3214 pi
->time_warp_seq
= m
->get_time_warp_seq();
3218 if (dirty
& CEPH_CAP_AUTH_EXCL
) {
3219 if (m
->head
.uid
!= pi
->uid
) {
3220 dout(7) << " uid " << pi
->uid
3221 << " -> " << m
->head
.uid
3222 << " for " << *in
<< dendl
;
3223 pi
->uid
= m
->head
.uid
;
3225 if (m
->head
.gid
!= pi
->gid
) {
3226 dout(7) << " gid " << pi
->gid
3227 << " -> " << m
->head
.gid
3228 << " for " << *in
<< dendl
;
3229 pi
->gid
= m
->head
.gid
;
3231 if (m
->head
.mode
!= pi
->mode
) {
3232 dout(7) << " mode " << oct
<< pi
->mode
3233 << " -> " << m
->head
.mode
<< dec
3234 << " for " << *in
<< dendl
;
3235 pi
->mode
= m
->head
.mode
;
3237 if ((features
& CEPH_FEATURE_FS_BTIME
) && m
->get_btime() != pi
->btime
) {
3238 dout(7) << " btime " << oct
<< pi
->btime
3239 << " -> " << m
->get_btime() << dec
3240 << " for " << *in
<< dendl
;
3241 pi
->btime
= m
->get_btime();
3247 * update inode based on cap flush|flushsnap|wanted.
3248 * adjust max_size, if needed.
3249 * if we update, return true; otherwise, false (no updated needed).
3251 bool Locker::_do_cap_update(CInode
*in
, Capability
*cap
,
3252 int dirty
, snapid_t follows
,
3253 MClientCaps
*m
, MClientCaps
*ack
,
3256 dout(10) << "_do_cap_update dirty " << ccap_string(dirty
)
3257 << " issued " << ccap_string(cap
? cap
->issued() : 0)
3258 << " wanted " << ccap_string(cap
? cap
->wanted() : 0)
3259 << " on " << *in
<< dendl
;
3260 assert(in
->is_auth());
3261 client_t client
= m
->get_source().num();
3262 CInode::mempool_inode
*latest
= in
->get_projected_inode();
3264 // increase or zero max_size?
3265 uint64_t size
= m
->get_size();
3266 bool change_max
= false;
3267 uint64_t old_max
= latest
->client_ranges
.count(client
) ? latest
->client_ranges
[client
].range
.last
: 0;
3268 uint64_t new_max
= old_max
;
3270 if (in
->is_file()) {
3271 bool forced_change_max
= false;
3272 dout(20) << "inode is file" << dendl
;
3273 if (cap
&& ((cap
->issued() | cap
->wanted()) & CEPH_CAP_ANY_FILE_WR
)) {
3274 dout(20) << "client has write caps; m->get_max_size="
3275 << m
->get_max_size() << "; old_max=" << old_max
<< dendl
;
3276 if (m
->get_max_size() > new_max
) {
3277 dout(10) << "client requests file_max " << m
->get_max_size()
3278 << " > max " << old_max
<< dendl
;
3280 forced_change_max
= true;
3281 new_max
= calc_new_max_size(latest
, m
->get_max_size());
3283 new_max
= calc_new_max_size(latest
, size
);
3285 if (new_max
> old_max
)
3297 if (in
->last
== CEPH_NOSNAP
&&
3299 !in
->filelock
.can_wrlock(client
) &&
3300 !in
->filelock
.can_force_wrlock(client
)) {
3301 dout(10) << " i want to change file_max, but lock won't allow it (yet)" << dendl
;
3302 if (in
->filelock
.is_stable()) {
3303 bool need_issue
= false;
3305 cap
->inc_suppress();
3306 if (in
->mds_caps_wanted
.empty() &&
3307 (in
->get_loner() >= 0 || (in
->get_wanted_loner() >= 0 && in
->try_set_loner()))) {
3308 if (in
->filelock
.get_state() != LOCK_EXCL
)
3309 file_excl(&in
->filelock
, &need_issue
);
3311 simple_lock(&in
->filelock
, &need_issue
);
3315 cap
->dec_suppress();
3317 if (!in
->filelock
.can_wrlock(client
) &&
3318 !in
->filelock
.can_force_wrlock(client
)) {
3319 C_MDL_CheckMaxSize
*cms
= new C_MDL_CheckMaxSize(this, in
,
3320 forced_change_max
? new_max
: 0,
3323 in
->filelock
.add_waiter(SimpleLock::WAIT_STABLE
, cms
);
3329 if (m
->flockbl
.length()) {
3331 bufferlist::iterator bli
= m
->flockbl
.begin();
3332 ::decode(num_locks
, bli
);
3333 for ( int i
=0; i
< num_locks
; ++i
) {
3334 ceph_filelock decoded_lock
;
3335 ::decode(decoded_lock
, bli
);
3336 in
->get_fcntl_lock_state()->held_locks
.
3337 insert(pair
<uint64_t, ceph_filelock
>(decoded_lock
.start
, decoded_lock
));
3338 ++in
->get_fcntl_lock_state()->client_held_lock_counts
[(client_t
)(decoded_lock
.client
)];
3340 ::decode(num_locks
, bli
);
3341 for ( int i
=0; i
< num_locks
; ++i
) {
3342 ceph_filelock decoded_lock
;
3343 ::decode(decoded_lock
, bli
);
3344 in
->get_flock_lock_state()->held_locks
.
3345 insert(pair
<uint64_t, ceph_filelock
>(decoded_lock
.start
, decoded_lock
));
3346 ++in
->get_flock_lock_state()->client_held_lock_counts
[(client_t
)(decoded_lock
.client
)];
3350 if (!dirty
&& !change_max
)
3353 Session
*session
= mds
->get_session(m
);
3354 if (session
->check_access(in
, MAY_WRITE
,
3355 m
->caller_uid
, m
->caller_gid
, NULL
, 0, 0) < 0) {
3356 dout(10) << "check_access failed, dropping cap update on " << *in
<< dendl
;
3361 EUpdate
*le
= new EUpdate(mds
->mdlog
, "cap update");
3362 mds
->mdlog
->start_entry(le
);
3364 bool xattr
= (dirty
& CEPH_CAP_XATTR_EXCL
) &&
3365 m
->xattrbl
.length() &&
3366 m
->head
.xattr_version
> in
->get_projected_inode()->xattr_version
;
3368 auto &pi
= in
->project_inode(xattr
);
3369 pi
.inode
.version
= in
->pre_dirty();
3371 MutationRef
mut(new MutationImpl());
3372 mut
->ls
= mds
->mdlog
->get_current_segment();
3374 _update_cap_fields(in
, dirty
, m
, &pi
.inode
);
3377 dout(7) << " max_size " << old_max
<< " -> " << new_max
3378 << " for " << *in
<< dendl
;
3380 auto &cr
= pi
.inode
.client_ranges
[client
];
3382 cr
.range
.last
= new_max
;
3383 cr
.follows
= in
->first
- 1;
3385 pi
.inode
.client_ranges
.erase(client
);
3388 if (change_max
|| (dirty
& (CEPH_CAP_FILE_EXCL
|CEPH_CAP_FILE_WR
)))
3389 wrlock_force(&in
->filelock
, mut
); // wrlock for duration of journal
3392 if (dirty
& CEPH_CAP_AUTH_EXCL
)
3393 wrlock_force(&in
->authlock
, mut
);
3397 dout(7) << " xattrs v" << pi
.inode
.xattr_version
<< " -> " << m
->head
.xattr_version
<< dendl
;
3398 pi
.inode
.xattr_version
= m
->head
.xattr_version
;
3399 bufferlist::iterator p
= m
->xattrbl
.begin();
3400 ::decode(*pi
.xattrs
, p
);
3401 wrlock_force(&in
->xattrlock
, mut
);
3405 mdcache
->predirty_journal_parents(mut
, &le
->metablob
, in
, 0, PREDIRTY_PRIMARY
, 0, follows
);
3406 mdcache
->journal_dirty_inode(mut
.get(), &le
->metablob
, in
, follows
);
3408 // "oldest flush tid" > 0 means client uses unique TID for each flush
3409 if (ack
&& ack
->get_oldest_flush_tid() > 0)
3410 le
->metablob
.add_client_flush(metareqid_t(m
->get_source(), ack
->get_client_tid()),
3411 ack
->get_oldest_flush_tid());
3413 mds
->mdlog
->submit_entry(le
, new C_Locker_FileUpdate_finish(this, in
, mut
,
3416 if (need_flush
&& !*need_flush
&&
3417 ((change_max
&& new_max
) || // max INCREASE
3418 _need_flush_mdlog(in
, dirty
)))
3424 /* This function DOES put the passed message before returning */
3425 void Locker::handle_client_cap_release(MClientCapRelease
*m
)
3427 client_t client
= m
->get_source().num();
3428 dout(10) << "handle_client_cap_release " << *m
<< dendl
;
3430 if (!mds
->is_clientreplay() && !mds
->is_active() && !mds
->is_stopping()) {
3431 mds
->wait_for_replay(new C_MDS_RetryMessage(mds
, m
));
3435 if (m
->osd_epoch_barrier
&& !mds
->objecter
->have_map(m
->osd_epoch_barrier
)) {
3436 // Pause RADOS operations until we see the required epoch
3437 mds
->objecter
->set_epoch_barrier(m
->osd_epoch_barrier
);
3440 if (mds
->get_osd_epoch_barrier() < m
->osd_epoch_barrier
) {
3441 // Record the barrier so that we will retransmit it to clients
3442 mds
->set_osd_epoch_barrier(m
->osd_epoch_barrier
);
3445 Session
*session
= mds
->get_session(m
);
3447 for (vector
<ceph_mds_cap_item
>::iterator p
= m
->caps
.begin(); p
!= m
->caps
.end(); ++p
) {
3448 _do_cap_release(client
, inodeno_t((uint64_t)p
->ino
) , p
->cap_id
, p
->migrate_seq
, p
->seq
);
3452 session
->notify_cap_release(m
->caps
.size());
3458 class C_Locker_RetryCapRelease
: public LockerContext
{
3462 ceph_seq_t migrate_seq
;
3463 ceph_seq_t issue_seq
;
3465 C_Locker_RetryCapRelease(Locker
*l
, client_t c
, inodeno_t i
, uint64_t id
,
3466 ceph_seq_t mseq
, ceph_seq_t seq
) :
3467 LockerContext(l
), client(c
), ino(i
), cap_id(id
), migrate_seq(mseq
), issue_seq(seq
) {}
3468 void finish(int r
) override
{
3469 locker
->_do_cap_release(client
, ino
, cap_id
, migrate_seq
, issue_seq
);
3473 void Locker::_do_cap_release(client_t client
, inodeno_t ino
, uint64_t cap_id
,
3474 ceph_seq_t mseq
, ceph_seq_t seq
)
3476 CInode
*in
= mdcache
->get_inode(ino
);
3478 dout(7) << "_do_cap_release missing ino " << ino
<< dendl
;
3481 Capability
*cap
= in
->get_client_cap(client
);
3483 dout(7) << "_do_cap_release no cap for client" << client
<< " on "<< *in
<< dendl
;
3487 dout(7) << "_do_cap_release for client." << client
<< " on "<< *in
<< dendl
;
3488 if (cap
->get_cap_id() != cap_id
) {
3489 dout(7) << " capid " << cap_id
<< " != " << cap
->get_cap_id() << ", ignore" << dendl
;
3492 if (ceph_seq_cmp(mseq
, cap
->get_mseq()) < 0) {
3493 dout(7) << " mseq " << mseq
<< " < " << cap
->get_mseq() << ", ignore" << dendl
;
3496 if (should_defer_client_cap_frozen(in
)) {
3497 dout(7) << " freezing|frozen, deferring" << dendl
;
3498 in
->add_waiter(CInode::WAIT_UNFREEZE
,
3499 new C_Locker_RetryCapRelease(this, client
, ino
, cap_id
, mseq
, seq
));
3502 if (seq
!= cap
->get_last_issue()) {
3503 dout(7) << " issue_seq " << seq
<< " != " << cap
->get_last_issue() << dendl
;
3504 // clean out any old revoke history
3505 cap
->clean_revoke_from(seq
);
3506 eval_cap_gather(in
);
3509 remove_client_cap(in
, client
);
3512 /* This function DOES put the passed message before returning */
3514 void Locker::remove_client_cap(CInode
*in
, client_t client
)
3516 // clean out any pending snapflush state
3517 if (!in
->client_need_snapflush
.empty())
3518 _do_null_snapflush(in
, client
);
3520 in
->remove_client_cap(client
);
3522 if (in
->is_auth()) {
3523 // make sure we clear out the client byte range
3524 if (in
->get_projected_inode()->client_ranges
.count(client
) &&
3525 !(in
->inode
.nlink
== 0 && !in
->is_any_caps())) // unless it's unlink + stray
3526 check_inode_max_size(in
);
3528 request_inode_file_caps(in
);
3531 try_eval(in
, CEPH_CAP_LOCKS
);
3536 * Return true if any currently revoking caps exceed the
3537 * session_timeout threshold.
3539 bool Locker::any_late_revoking_caps(xlist
<Capability
*> const &revoking
,
3540 double timeout
) const
3542 xlist
<Capability
*>::const_iterator p
= revoking
.begin();
3544 // No revoking caps at the moment
3547 utime_t now
= ceph_clock_now();
3548 utime_t age
= now
- (*p
)->get_last_revoke_stamp();
3549 if (age
<= timeout
) {
3557 void Locker::get_late_revoking_clients(std::list
<client_t
> *result
,
3558 double timeout
) const
3560 if (!any_late_revoking_caps(revoking_caps
, timeout
)) {
3561 // Fast path: no misbehaving clients, execute in O(1)
3565 // Slow path: execute in O(N_clients)
3566 for (auto &p
: revoking_caps_by_client
) {
3567 if (any_late_revoking_caps(p
.second
, timeout
)) {
3568 result
->push_back(p
.first
);
3573 // Hard-code instead of surfacing a config settings because this is
3574 // really a hack that should go away at some point when we have better
3575 // inspection tools for getting at detailed cap state (#7316)
3576 #define MAX_WARN_CAPS 100
3578 void Locker::caps_tick()
3580 utime_t now
= ceph_clock_now();
3582 dout(20) << __func__
<< " " << revoking_caps
.size() << " revoking caps" << dendl
;
3585 for (xlist
<Capability
*>::iterator p
= revoking_caps
.begin(); !p
.end(); ++p
) {
3586 Capability
*cap
= *p
;
3588 utime_t age
= now
- cap
->get_last_revoke_stamp();
3589 dout(20) << __func__
<< " age = " << age
<< cap
->get_client() << "." << cap
->get_inode()->ino() << dendl
;
3590 if (age
<= mds
->mdsmap
->get_session_timeout()) {
3591 dout(20) << __func__
<< " age below timeout " << mds
->mdsmap
->get_session_timeout() << dendl
;
3595 if (i
> MAX_WARN_CAPS
) {
3596 dout(1) << __func__
<< " more than " << MAX_WARN_CAPS
<< " caps are late"
3597 << "revoking, ignoring subsequent caps" << dendl
;
3601 // exponential backoff of warning intervals
3602 if (age
> mds
->mdsmap
->get_session_timeout() * (1 << cap
->get_num_revoke_warnings())) {
3603 cap
->inc_num_revoke_warnings();
3605 ss
<< "client." << cap
->get_client() << " isn't responding to mclientcaps(revoke), ino "
3606 << cap
->get_inode()->ino() << " pending " << ccap_string(cap
->pending())
3607 << " issued " << ccap_string(cap
->issued()) << ", sent " << age
<< " seconds ago";
3608 mds
->clog
->warn() << ss
.str();
3609 dout(20) << __func__
<< " " << ss
.str() << dendl
;
3611 dout(20) << __func__
<< " silencing log message (backoff) for " << cap
->get_client() << "." << cap
->get_inode()->ino() << dendl
;
3617 void Locker::handle_client_lease(MClientLease
*m
)
3619 dout(10) << "handle_client_lease " << *m
<< dendl
;
3621 assert(m
->get_source().is_client());
3622 client_t client
= m
->get_source().num();
3624 CInode
*in
= mdcache
->get_inode(m
->get_ino(), m
->get_last());
3626 dout(7) << "handle_client_lease don't have ino " << m
->get_ino() << "." << m
->get_last() << dendl
;
3632 frag_t fg
= in
->pick_dirfrag(m
->dname
);
3633 CDir
*dir
= in
->get_dirfrag(fg
);
3635 dn
= dir
->lookup(m
->dname
);
3637 dout(7) << "handle_client_lease don't have dn " << m
->get_ino() << " " << m
->dname
<< dendl
;
3641 dout(10) << " on " << *dn
<< dendl
;
3644 ClientLease
*l
= dn
->get_client_lease(client
);
3646 dout(7) << "handle_client_lease didn't have lease for client." << client
<< " of " << *dn
<< dendl
;
3651 switch (m
->get_action()) {
3652 case CEPH_MDS_LEASE_REVOKE_ACK
:
3653 case CEPH_MDS_LEASE_RELEASE
:
3654 if (l
->seq
!= m
->get_seq()) {
3655 dout(7) << "handle_client_lease release - seq " << l
->seq
<< " != provided " << m
->get_seq() << dendl
;
3657 dout(7) << "handle_client_lease client." << client
3658 << " on " << *dn
<< dendl
;
3659 dn
->remove_client_lease(l
, this);
3664 case CEPH_MDS_LEASE_RENEW
:
3666 dout(7) << "handle_client_lease client." << client
<< " renew on " << *dn
3667 << (!dn
->lock
.can_lease(client
)?", revoking lease":"") << dendl
;
3668 if (dn
->lock
.can_lease(client
)) {
3669 int pool
= 1; // fixme.. do something smart!
3670 m
->h
.duration_ms
= (int)(1000 * mdcache
->client_lease_durations
[pool
]);
3671 m
->h
.seq
= ++l
->seq
;
3674 utime_t now
= ceph_clock_now();
3675 now
+= mdcache
->client_lease_durations
[pool
];
3676 mdcache
->touch_client_lease(l
, pool
, now
);
3678 mds
->send_message_client_counted(m
, m
->get_connection());
3684 ceph_abort(); // implement me
3690 void Locker::issue_client_lease(CDentry
*dn
, client_t client
,
3691 bufferlist
&bl
, utime_t now
, Session
*session
)
3693 CInode
*diri
= dn
->get_dir()->get_inode();
3694 if (!diri
->is_stray() && // do not issue dn leases in stray dir!
3695 ((!diri
->filelock
.can_lease(client
) &&
3696 (diri
->get_client_cap_pending(client
) & (CEPH_CAP_FILE_SHARED
| CEPH_CAP_FILE_EXCL
)) == 0)) &&
3697 dn
->lock
.can_lease(client
)) {
3698 int pool
= 1; // fixme.. do something smart!
3699 // issue a dentry lease
3700 ClientLease
*l
= dn
->add_client_lease(client
, session
);
3701 session
->touch_lease(l
);
3703 now
+= mdcache
->client_lease_durations
[pool
];
3704 mdcache
->touch_client_lease(l
, pool
, now
);
3707 e
.mask
= 1 | CEPH_LOCK_DN
; // old and new bit values
3709 e
.duration_ms
= (int)(1000 * mdcache
->client_lease_durations
[pool
]);
3711 dout(20) << "issue_client_lease seq " << e
.seq
<< " dur " << e
.duration_ms
<< "ms "
3712 << " on " << *dn
<< dendl
;
3720 dout(20) << "issue_client_lease no/null lease on " << *dn
<< dendl
;
3725 void Locker::revoke_client_leases(SimpleLock
*lock
)
3728 CDentry
*dn
= static_cast<CDentry
*>(lock
->get_parent());
3729 for (map
<client_t
, ClientLease
*>::iterator p
= dn
->client_lease_map
.begin();
3730 p
!= dn
->client_lease_map
.end();
3732 ClientLease
*l
= p
->second
;
3735 assert(lock
->get_type() == CEPH_LOCK_DN
);
3737 CDentry
*dn
= static_cast<CDentry
*>(lock
->get_parent());
3738 int mask
= 1 | CEPH_LOCK_DN
; // old and new bits
3740 // i should also revoke the dir ICONTENT lease, if they have it!
3741 CInode
*diri
= dn
->get_dir()->get_inode();
3742 mds
->send_message_client_counted(new MClientLease(CEPH_MDS_LEASE_REVOKE
, l
->seq
,
3745 diri
->first
, CEPH_NOSNAP
,
3753 // locks ----------------------------------------------------------------
3755 SimpleLock
*Locker::get_lock(int lock_type
, MDSCacheObjectInfo
&info
)
3757 switch (lock_type
) {
3760 // be careful; info.dirfrag may have incorrect frag; recalculate based on dname.
3761 CInode
*diri
= mdcache
->get_inode(info
.dirfrag
.ino
);
3766 fg
= diri
->pick_dirfrag(info
.dname
);
3767 dir
= diri
->get_dirfrag(fg
);
3769 dn
= dir
->lookup(info
.dname
, info
.snapid
);
3772 dout(7) << "get_lock don't have dn " << info
.dirfrag
.ino
<< " " << info
.dname
<< dendl
;
3778 case CEPH_LOCK_IAUTH
:
3779 case CEPH_LOCK_ILINK
:
3780 case CEPH_LOCK_IDFT
:
3781 case CEPH_LOCK_IFILE
:
3782 case CEPH_LOCK_INEST
:
3783 case CEPH_LOCK_IXATTR
:
3784 case CEPH_LOCK_ISNAP
:
3785 case CEPH_LOCK_IFLOCK
:
3786 case CEPH_LOCK_IPOLICY
:
3788 CInode
*in
= mdcache
->get_inode(info
.ino
, info
.snapid
);
3790 dout(7) << "get_lock don't have ino " << info
.ino
<< dendl
;
3793 switch (lock_type
) {
3794 case CEPH_LOCK_IAUTH
: return &in
->authlock
;
3795 case CEPH_LOCK_ILINK
: return &in
->linklock
;
3796 case CEPH_LOCK_IDFT
: return &in
->dirfragtreelock
;
3797 case CEPH_LOCK_IFILE
: return &in
->filelock
;
3798 case CEPH_LOCK_INEST
: return &in
->nestlock
;
3799 case CEPH_LOCK_IXATTR
: return &in
->xattrlock
;
3800 case CEPH_LOCK_ISNAP
: return &in
->snaplock
;
3801 case CEPH_LOCK_IFLOCK
: return &in
->flocklock
;
3802 case CEPH_LOCK_IPOLICY
: return &in
->policylock
;
3807 dout(7) << "get_lock don't know lock_type " << lock_type
<< dendl
;
3815 /* This function DOES put the passed message before returning */
3816 void Locker::handle_lock(MLock
*m
)
3818 // nobody should be talking to us during recovery.
3819 assert(mds
->is_rejoin() || mds
->is_clientreplay() || mds
->is_active() || mds
->is_stopping());
3821 SimpleLock
*lock
= get_lock(m
->get_lock_type(), m
->get_object_info());
3823 dout(10) << "don't have object " << m
->get_object_info() << ", must have trimmed, dropping" << dendl
;
3828 switch (lock
->get_type()) {
3830 case CEPH_LOCK_IAUTH
:
3831 case CEPH_LOCK_ILINK
:
3832 case CEPH_LOCK_ISNAP
:
3833 case CEPH_LOCK_IXATTR
:
3834 case CEPH_LOCK_IFLOCK
:
3835 case CEPH_LOCK_IPOLICY
:
3836 handle_simple_lock(lock
, m
);
3839 case CEPH_LOCK_IDFT
:
3840 case CEPH_LOCK_INEST
:
3841 //handle_scatter_lock((ScatterLock*)lock, m);
3844 case CEPH_LOCK_IFILE
:
3845 handle_file_lock(static_cast<ScatterLock
*>(lock
), m
);
3849 dout(7) << "handle_lock got otype " << m
->get_lock_type() << dendl
;
3859 // ==========================================================================
3862 /** This function may take a reference to m if it needs one, but does
3863 * not put references. */
3864 void Locker::handle_reqrdlock(SimpleLock
*lock
, MLock
*m
)
3866 MDSCacheObject
*parent
= lock
->get_parent();
3867 if (parent
->is_auth() &&
3868 lock
->get_state() != LOCK_SYNC
&&
3869 !parent
->is_frozen()) {
3870 dout(7) << "handle_reqrdlock got rdlock request on " << *lock
3871 << " on " << *parent
<< dendl
;
3872 assert(parent
->is_auth()); // replica auth pinned if they're doing this!
3873 if (lock
->is_stable()) {
3876 dout(7) << "handle_reqrdlock delaying request until lock is stable" << dendl
;
3877 lock
->add_waiter(SimpleLock::WAIT_STABLE
| MDSCacheObject::WAIT_UNFREEZE
,
3878 new C_MDS_RetryMessage(mds
, m
->get()));
3881 dout(7) << "handle_reqrdlock dropping rdlock request on " << *lock
3882 << " on " << *parent
<< dendl
;
3883 // replica should retry
3887 /* This function DOES put the passed message before returning */
3888 void Locker::handle_simple_lock(SimpleLock
*lock
, MLock
*m
)
3890 int from
= m
->get_asker();
3892 dout(10) << "handle_simple_lock " << *m
3893 << " on " << *lock
<< " " << *lock
->get_parent() << dendl
;
3895 if (mds
->is_rejoin()) {
3896 if (lock
->get_parent()->is_rejoining()) {
3897 dout(7) << "handle_simple_lock still rejoining " << *lock
->get_parent()
3898 << ", dropping " << *m
<< dendl
;
3904 switch (m
->get_action()) {
3907 assert(lock
->get_state() == LOCK_LOCK
);
3908 lock
->decode_locked_state(m
->get_data());
3909 lock
->set_state(LOCK_SYNC
);
3910 lock
->finish_waiters(SimpleLock::WAIT_RD
|SimpleLock::WAIT_STABLE
);
3914 assert(lock
->get_state() == LOCK_SYNC
);
3915 lock
->set_state(LOCK_SYNC_LOCK
);
3916 if (lock
->is_leased())
3917 revoke_client_leases(lock
);
3918 eval_gather(lock
, true);
3919 if (lock
->is_unstable_and_locked())
3920 mds
->mdlog
->flush();
3925 case LOCK_AC_LOCKACK
:
3926 assert(lock
->get_state() == LOCK_SYNC_LOCK
||
3927 lock
->get_state() == LOCK_SYNC_EXCL
);
3928 assert(lock
->is_gathering(from
));
3929 lock
->remove_gather(from
);
3931 if (lock
->is_gathering()) {
3932 dout(7) << "handle_simple_lock " << *lock
<< " on " << *lock
->get_parent() << " from " << from
3933 << ", still gathering " << lock
->get_gather_set() << dendl
;
3935 dout(7) << "handle_simple_lock " << *lock
<< " on " << *lock
->get_parent() << " from " << from
3936 << ", last one" << dendl
;
3941 case LOCK_AC_REQRDLOCK
:
3942 handle_reqrdlock(lock
, m
);
3950 /* unused, currently.
3952 class C_Locker_SimpleEval : public Context {
3956 C_Locker_SimpleEval(Locker *l, SimpleLock *lk) : locker(l), lock(lk) {}
3957 void finish(int r) {
3958 locker->try_simple_eval(lock);
3962 void Locker::try_simple_eval(SimpleLock *lock)
3964 // unstable and ambiguous auth?
3965 if (!lock->is_stable() &&
3966 lock->get_parent()->is_ambiguous_auth()) {
3967 dout(7) << "simple_eval not stable and ambiguous auth, waiting on " << *lock->get_parent() << dendl;
3968 //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
3969 lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_SimpleEval(this, lock));
3973 if (!lock->get_parent()->is_auth()) {
3974 dout(7) << "try_simple_eval not auth for " << *lock->get_parent() << dendl;
3978 if (!lock->get_parent()->can_auth_pin()) {
3979 dout(7) << "try_simple_eval can't auth_pin, waiting on " << *lock->get_parent() << dendl;
3980 //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
3981 lock->get_parent()->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_SimpleEval(this, lock));
3985 if (lock->is_stable())
3991 void Locker::simple_eval(SimpleLock
*lock
, bool *need_issue
)
3993 dout(10) << "simple_eval " << *lock
<< " on " << *lock
->get_parent() << dendl
;
3995 assert(lock
->get_parent()->is_auth());
3996 assert(lock
->is_stable());
3998 if (lock
->get_parent()->is_freezing_or_frozen()) {
3999 // dentry lock in unreadable state can block path traverse
4000 if ((lock
->get_type() != CEPH_LOCK_DN
||
4001 lock
->get_state() == LOCK_SYNC
||
4002 lock
->get_parent()->is_frozen()))
4006 if (mdcache
->is_readonly()) {
4007 if (lock
->get_state() != LOCK_SYNC
) {
4008 dout(10) << "simple_eval read-only FS, syncing " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4009 simple_sync(lock
, need_issue
);
4016 if (lock
->get_type() != CEPH_LOCK_DN
) {
4017 in
= static_cast<CInode
*>(lock
->get_parent());
4018 in
->get_caps_wanted(&wanted
, NULL
, lock
->get_cap_shift());
4022 if (lock
->get_state() != LOCK_EXCL
&&
4023 in
&& in
->get_target_loner() >= 0 &&
4024 (wanted
& CEPH_CAP_GEXCL
)) {
4025 dout(7) << "simple_eval stable, going to excl " << *lock
4026 << " on " << *lock
->get_parent() << dendl
;
4027 simple_excl(lock
, need_issue
);
4031 else if (lock
->get_state() != LOCK_SYNC
&&
4032 !lock
->is_wrlocked() &&
4033 ((!(wanted
& CEPH_CAP_GEXCL
) && !lock
->is_waiter_for(SimpleLock::WAIT_WR
)) ||
4034 (lock
->get_state() == LOCK_EXCL
&& in
&& in
->get_target_loner() < 0))) {
4035 dout(7) << "simple_eval stable, syncing " << *lock
4036 << " on " << *lock
->get_parent() << dendl
;
4037 simple_sync(lock
, need_issue
);
4044 bool Locker::simple_sync(SimpleLock
*lock
, bool *need_issue
)
4046 dout(7) << "simple_sync on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4047 assert(lock
->get_parent()->is_auth());
4048 assert(lock
->is_stable());
4051 if (lock
->get_cap_shift())
4052 in
= static_cast<CInode
*>(lock
->get_parent());
4054 int old_state
= lock
->get_state();
4056 if (old_state
!= LOCK_TSYN
) {
4058 switch (lock
->get_state()) {
4059 case LOCK_MIX
: lock
->set_state(LOCK_MIX_SYNC
); break;
4060 case LOCK_LOCK
: lock
->set_state(LOCK_LOCK_SYNC
); break;
4061 case LOCK_XSYN
: lock
->set_state(LOCK_XSYN_SYNC
); break;
4062 case LOCK_EXCL
: lock
->set_state(LOCK_EXCL_SYNC
); break;
4063 default: ceph_abort();
4067 if (lock
->is_wrlocked())
4070 if (lock
->get_parent()->is_replicated() && old_state
== LOCK_MIX
) {
4071 send_lock_message(lock
, LOCK_AC_SYNC
);
4072 lock
->init_gather();
4076 if (in
&& in
->is_head()) {
4077 if (in
->issued_caps_need_gather(lock
)) {
4086 bool need_recover
= false;
4087 if (lock
->get_type() == CEPH_LOCK_IFILE
) {
4089 if (in
->state_test(CInode::STATE_NEEDSRECOVER
)) {
4090 mds
->mdcache
->queue_file_recover(in
);
4091 need_recover
= true;
4096 if (!gather
&& lock
->is_dirty()) {
4097 lock
->get_parent()->auth_pin(lock
);
4098 scatter_writebehind(static_cast<ScatterLock
*>(lock
));
4099 mds
->mdlog
->flush();
4104 lock
->get_parent()->auth_pin(lock
);
4106 mds
->mdcache
->do_file_recover();
4111 if (lock
->get_parent()->is_replicated()) { // FIXME
4113 lock
->encode_locked_state(data
);
4114 send_lock_message(lock
, LOCK_AC_SYNC
, data
);
4116 lock
->set_state(LOCK_SYNC
);
4117 lock
->finish_waiters(SimpleLock::WAIT_RD
|SimpleLock::WAIT_STABLE
);
4118 if (in
&& in
->is_head()) {
4127 void Locker::simple_excl(SimpleLock
*lock
, bool *need_issue
)
4129 dout(7) << "simple_excl on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4130 assert(lock
->get_parent()->is_auth());
4131 assert(lock
->is_stable());
4134 if (lock
->get_cap_shift())
4135 in
= static_cast<CInode
*>(lock
->get_parent());
4137 switch (lock
->get_state()) {
4138 case LOCK_LOCK
: lock
->set_state(LOCK_LOCK_EXCL
); break;
4139 case LOCK_SYNC
: lock
->set_state(LOCK_SYNC_EXCL
); break;
4140 case LOCK_XSYN
: lock
->set_state(LOCK_XSYN_EXCL
); break;
4141 default: ceph_abort();
4145 if (lock
->is_rdlocked())
4147 if (lock
->is_wrlocked())
4150 if (lock
->get_parent()->is_replicated() &&
4151 lock
->get_state() != LOCK_LOCK_EXCL
&&
4152 lock
->get_state() != LOCK_XSYN_EXCL
) {
4153 send_lock_message(lock
, LOCK_AC_LOCK
);
4154 lock
->init_gather();
4158 if (in
&& in
->is_head()) {
4159 if (in
->issued_caps_need_gather(lock
)) {
4169 lock
->get_parent()->auth_pin(lock
);
4171 lock
->set_state(LOCK_EXCL
);
4172 lock
->finish_waiters(SimpleLock::WAIT_WR
|SimpleLock::WAIT_STABLE
);
4182 void Locker::simple_lock(SimpleLock
*lock
, bool *need_issue
)
4184 dout(7) << "simple_lock on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4185 assert(lock
->get_parent()->is_auth());
4186 assert(lock
->is_stable());
4187 assert(lock
->get_state() != LOCK_LOCK
);
4190 if (lock
->get_cap_shift())
4191 in
= static_cast<CInode
*>(lock
->get_parent());
4193 int old_state
= lock
->get_state();
4195 switch (lock
->get_state()) {
4196 case LOCK_SYNC
: lock
->set_state(LOCK_SYNC_LOCK
); break;
4197 case LOCK_XSYN
: lock
->set_state(LOCK_XSYN_LOCK
); break;
4198 case LOCK_EXCL
: lock
->set_state(LOCK_EXCL_LOCK
); break;
4199 case LOCK_MIX
: lock
->set_state(LOCK_MIX_LOCK
);
4200 (static_cast<ScatterLock
*>(lock
))->clear_unscatter_wanted();
4202 case LOCK_TSYN
: lock
->set_state(LOCK_TSYN_LOCK
); break;
4203 default: ceph_abort();
4207 if (lock
->is_leased()) {
4209 revoke_client_leases(lock
);
4211 if (lock
->is_rdlocked())
4213 if (in
&& in
->is_head()) {
4214 if (in
->issued_caps_need_gather(lock
)) {
4223 bool need_recover
= false;
4224 if (lock
->get_type() == CEPH_LOCK_IFILE
) {
4226 if(in
->state_test(CInode::STATE_NEEDSRECOVER
)) {
4227 mds
->mdcache
->queue_file_recover(in
);
4228 need_recover
= true;
4233 if (lock
->get_parent()->is_replicated() &&
4234 lock
->get_state() == LOCK_MIX_LOCK
&&
4236 dout(10) << " doing local stage of mix->lock gather before gathering from replicas" << dendl
;
4238 // move to second stage of gather now, so we don't send the lock action later.
4239 if (lock
->get_state() == LOCK_MIX_LOCK
)
4240 lock
->set_state(LOCK_MIX_LOCK2
);
4242 if (lock
->get_parent()->is_replicated() &&
4243 lock
->get_sm()->states
[old_state
].replica_state
!= LOCK_LOCK
) { // replica may already be LOCK
4245 send_lock_message(lock
, LOCK_AC_LOCK
);
4246 lock
->init_gather();
4250 if (!gather
&& lock
->is_dirty()) {
4251 lock
->get_parent()->auth_pin(lock
);
4252 scatter_writebehind(static_cast<ScatterLock
*>(lock
));
4253 mds
->mdlog
->flush();
4258 lock
->get_parent()->auth_pin(lock
);
4260 mds
->mdcache
->do_file_recover();
4262 lock
->set_state(LOCK_LOCK
);
4263 lock
->finish_waiters(ScatterLock::WAIT_XLOCK
|ScatterLock::WAIT_WR
|ScatterLock::WAIT_STABLE
);
4268 void Locker::simple_xlock(SimpleLock
*lock
)
4270 dout(7) << "simple_xlock on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4271 assert(lock
->get_parent()->is_auth());
4272 //assert(lock->is_stable());
4273 assert(lock
->get_state() != LOCK_XLOCK
);
4276 if (lock
->get_cap_shift())
4277 in
= static_cast<CInode
*>(lock
->get_parent());
4279 if (lock
->is_stable())
4280 lock
->get_parent()->auth_pin(lock
);
4282 switch (lock
->get_state()) {
4284 case LOCK_XLOCKDONE
: lock
->set_state(LOCK_LOCK_XLOCK
); break;
4285 default: ceph_abort();
4289 if (lock
->is_rdlocked())
4291 if (lock
->is_wrlocked())
4294 if (in
&& in
->is_head()) {
4295 if (in
->issued_caps_need_gather(lock
)) {
4302 lock
->set_state(LOCK_PREXLOCK
);
4303 //assert("shouldn't be called if we are already xlockable" == 0);
4311 // ==========================================================================
4316 Some notes on scatterlocks.
4318 - The scatter/gather is driven by the inode lock. The scatter always
4319 brings in the latest metadata from the fragments.
4321 - When in a scattered/MIX state, fragments are only allowed to
4322 update/be written to if the accounted stat matches the inode's
4325 - That means, on gather, we _only_ assimilate diffs for frag metadata
4326 that match the current version, because those are the only ones
4327 written during this scatter/gather cycle. (Others didn't permit
4328 it.) We increment the version and journal this to disk.
4330 - When possible, we also simultaneously update our local frag
4331 accounted stats to match.
4333 - On scatter, the new inode info is broadcast to frags, both local
4334 and remote. If possible (auth and !frozen), the dirfrag auth
4335 should update the accounted state (if it isn't already up to date).
4336 Note that this may occur on both the local inode auth node and
4337 inode replicas, so there are two potential paths. If it is NOT
4338 possible, they need to mark_stale to prevent any possible writes.
4340 - A scatter can be to MIX (potentially writeable) or to SYNC (read
4341 only). Both are opportunities to update the frag accounted stats,
4342 even though only the MIX case is affected by a stale dirfrag.
4344 - Because many scatter/gather cycles can potentially go by without a
4345 frag being able to update its accounted stats (due to being frozen
4346 by exports/refragments in progress), the frag may have (even very)
4347 old stat versions. That's fine. If when we do want to update it,
4348 we can update accounted_* and the version first.
4352 class C_Locker_ScatterWB
: public LockerLogContext
{
4356 C_Locker_ScatterWB(Locker
*l
, ScatterLock
*sl
, MutationRef
& m
) :
4357 LockerLogContext(l
), lock(sl
), mut(m
) {}
4358 void finish(int r
) override
{
4359 locker
->scatter_writebehind_finish(lock
, mut
);
4363 void Locker::scatter_writebehind(ScatterLock
*lock
)
4365 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4366 dout(10) << "scatter_writebehind " << in
->inode
.mtime
<< " on " << *lock
<< " on " << *in
<< dendl
;
4369 MutationRef
mut(new MutationImpl());
4370 mut
->ls
= mds
->mdlog
->get_current_segment();
4372 // forcefully take a wrlock
4373 lock
->get_wrlock(true);
4374 mut
->wrlocks
.insert(lock
);
4375 mut
->locks
.insert(lock
);
4377 in
->pre_cow_old_inode(); // avoid cow mayhem
4379 auto &pi
= in
->project_inode();
4380 pi
.inode
.version
= in
->pre_dirty();
4382 in
->finish_scatter_gather_update(lock
->get_type());
4383 lock
->start_flush();
4385 EUpdate
*le
= new EUpdate(mds
->mdlog
, "scatter_writebehind");
4386 mds
->mdlog
->start_entry(le
);
4388 mdcache
->predirty_journal_parents(mut
, &le
->metablob
, in
, 0, PREDIRTY_PRIMARY
);
4389 mdcache
->journal_dirty_inode(mut
.get(), &le
->metablob
, in
);
4391 in
->finish_scatter_gather_update_accounted(lock
->get_type(), mut
, &le
->metablob
);
4393 mds
->mdlog
->submit_entry(le
, new C_Locker_ScatterWB(this, lock
, mut
));
4396 void Locker::scatter_writebehind_finish(ScatterLock
*lock
, MutationRef
& mut
)
4398 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4399 dout(10) << "scatter_writebehind_finish on " << *lock
<< " on " << *in
<< dendl
;
4400 in
->pop_and_dirty_projected_inode(mut
->ls
);
4402 lock
->finish_flush();
4404 // if replicas may have flushed in a mix->lock state, send another
4405 // message so they can finish_flush().
4406 if (in
->is_replicated()) {
4407 switch (lock
->get_state()) {
4409 case LOCK_MIX_LOCK2
:
4412 send_lock_message(lock
, LOCK_AC_LOCKFLUSHED
);
4417 drop_locks(mut
.get());
4420 if (lock
->is_stable())
4421 lock
->finish_waiters(ScatterLock::WAIT_STABLE
);
4423 //scatter_eval_gather(lock);
4426 void Locker::scatter_eval(ScatterLock
*lock
, bool *need_issue
)
4428 dout(10) << "scatter_eval " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4430 assert(lock
->get_parent()->is_auth());
4431 assert(lock
->is_stable());
4433 if (lock
->get_parent()->is_freezing_or_frozen()) {
4434 dout(20) << " freezing|frozen" << dendl
;
4438 if (mdcache
->is_readonly()) {
4439 if (lock
->get_state() != LOCK_SYNC
) {
4440 dout(10) << "scatter_eval read-only FS, syncing " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4441 simple_sync(lock
, need_issue
);
4446 if (!lock
->is_rdlocked() &&
4447 lock
->get_state() != LOCK_MIX
&&
4448 lock
->get_scatter_wanted()) {
4449 dout(10) << "scatter_eval scatter_wanted, bump to mix " << *lock
4450 << " on " << *lock
->get_parent() << dendl
;
4451 scatter_mix(lock
, need_issue
);
4455 if (lock
->get_type() == CEPH_LOCK_INEST
) {
4456 // in general, we want to keep INEST writable at all times.
4457 if (!lock
->is_rdlocked()) {
4458 if (lock
->get_parent()->is_replicated()) {
4459 if (lock
->get_state() != LOCK_MIX
)
4460 scatter_mix(lock
, need_issue
);
4462 if (lock
->get_state() != LOCK_LOCK
)
4463 simple_lock(lock
, need_issue
);
4469 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4470 if (!in
->has_subtree_or_exporting_dirfrag() || in
->is_base()) {
4471 // i _should_ be sync.
4472 if (!lock
->is_wrlocked() &&
4473 lock
->get_state() != LOCK_SYNC
) {
4474 dout(10) << "scatter_eval no wrlocks|xlocks, not subtree root inode, syncing" << dendl
;
4475 simple_sync(lock
, need_issue
);
4482 * mark a scatterlock to indicate that the dir fnode has some dirty data
4484 void Locker::mark_updated_scatterlock(ScatterLock
*lock
)
4487 if (lock
->get_updated_item()->is_on_list()) {
4488 dout(10) << "mark_updated_scatterlock " << *lock
4489 << " - already on list since " << lock
->get_update_stamp() << dendl
;
4491 updated_scatterlocks
.push_back(lock
->get_updated_item());
4492 utime_t now
= ceph_clock_now();
4493 lock
->set_update_stamp(now
);
4494 dout(10) << "mark_updated_scatterlock " << *lock
4495 << " - added at " << now
<< dendl
;
4500 * this is called by scatter_tick and LogSegment::try_to_trim() when
4501 * trying to flush dirty scattered data (i.e. updated fnode) back to
4504 * we need to lock|scatter in order to push fnode changes into the
4507 void Locker::scatter_nudge(ScatterLock
*lock
, MDSInternalContextBase
*c
, bool forcelockchange
)
4509 CInode
*p
= static_cast<CInode
*>(lock
->get_parent());
4511 if (p
->is_frozen() || p
->is_freezing()) {
4512 dout(10) << "scatter_nudge waiting for unfreeze on " << *p
<< dendl
;
4514 p
->add_waiter(MDSCacheObject::WAIT_UNFREEZE
, c
);
4515 else if (lock
->is_dirty())
4516 // just requeue. not ideal.. starvation prone..
4517 updated_scatterlocks
.push_back(lock
->get_updated_item());
4521 if (p
->is_ambiguous_auth()) {
4522 dout(10) << "scatter_nudge waiting for single auth on " << *p
<< dendl
;
4524 p
->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
, c
);
4525 else if (lock
->is_dirty())
4526 // just requeue. not ideal.. starvation prone..
4527 updated_scatterlocks
.push_back(lock
->get_updated_item());
4534 if (lock
->is_stable()) {
4535 // can we do it now?
4536 // (only if we're not replicated.. if we are, we really do need
4537 // to nudge the lock state!)
4539 actually, even if we're not replicated, we can't stay in MIX, because another mds
4540 could discover and replicate us at any time. if that happens while we're flushing,
4541 they end up in MIX but their inode has the old scatterstat version.
4543 if (!forcelockchange && !lock->get_parent()->is_replicated() && lock->can_wrlock(-1)) {
4544 dout(10) << "scatter_nudge auth, propagating " << *lock << " on " << *p << dendl;
4545 scatter_writebehind(lock);
4547 lock->add_waiter(SimpleLock::WAIT_STABLE, c);
4552 if (mdcache
->is_readonly()) {
4553 if (lock
->get_state() != LOCK_SYNC
) {
4554 dout(10) << "scatter_nudge auth, read-only FS, syncing " << *lock
<< " on " << *p
<< dendl
;
4555 simple_sync(static_cast<ScatterLock
*>(lock
));
4560 // adjust lock state
4561 dout(10) << "scatter_nudge auth, scatter/unscattering " << *lock
<< " on " << *p
<< dendl
;
4562 switch (lock
->get_type()) {
4563 case CEPH_LOCK_IFILE
:
4564 if (p
->is_replicated() && lock
->get_state() != LOCK_MIX
)
4565 scatter_mix(static_cast<ScatterLock
*>(lock
));
4566 else if (lock
->get_state() != LOCK_LOCK
)
4567 simple_lock(static_cast<ScatterLock
*>(lock
));
4569 simple_sync(static_cast<ScatterLock
*>(lock
));
4572 case CEPH_LOCK_IDFT
:
4573 case CEPH_LOCK_INEST
:
4574 if (p
->is_replicated() && lock
->get_state() != LOCK_MIX
)
4576 else if (lock
->get_state() != LOCK_LOCK
)
4585 if (lock
->is_stable() && count
== 2) {
4586 dout(10) << "scatter_nudge oh, stable after two cycles." << dendl
;
4587 // this should only realy happen when called via
4588 // handle_file_lock due to AC_NUDGE, because the rest of the
4589 // time we are replicated or have dirty data and won't get
4590 // called. bailing here avoids an infinite loop.
4595 dout(10) << "scatter_nudge auth, waiting for stable " << *lock
<< " on " << *p
<< dendl
;
4597 lock
->add_waiter(SimpleLock::WAIT_STABLE
, c
);
4602 dout(10) << "scatter_nudge replica, requesting scatter/unscatter of "
4603 << *lock
<< " on " << *p
<< dendl
;
4604 // request unscatter?
4605 mds_rank_t auth
= lock
->get_parent()->authority().first
;
4606 if (!mds
->is_cluster_degraded() ||
4607 mds
->mdsmap
->is_clientreplay_or_active_or_stopping(auth
))
4608 mds
->send_message_mds(new MLock(lock
, LOCK_AC_NUDGE
, mds
->get_nodeid()), auth
);
4612 lock
->add_waiter(SimpleLock::WAIT_STABLE
, c
);
4614 // also, requeue, in case we had wrong auth or something
4615 if (lock
->is_dirty())
4616 updated_scatterlocks
.push_back(lock
->get_updated_item());
4620 void Locker::scatter_tick()
4622 dout(10) << "scatter_tick" << dendl
;
4625 utime_t now
= ceph_clock_now();
4626 int n
= updated_scatterlocks
.size();
4627 while (!updated_scatterlocks
.empty()) {
4628 ScatterLock
*lock
= updated_scatterlocks
.front();
4630 if (n
-- == 0) break; // scatter_nudge() may requeue; avoid looping
4632 if (!lock
->is_dirty()) {
4633 updated_scatterlocks
.pop_front();
4634 dout(10) << " removing from updated_scatterlocks "
4635 << *lock
<< " " << *lock
->get_parent() << dendl
;
4638 if (now
- lock
->get_update_stamp() < g_conf
->mds_scatter_nudge_interval
)
4640 updated_scatterlocks
.pop_front();
4641 scatter_nudge(lock
, 0);
4643 mds
->mdlog
->flush();
4647 void Locker::scatter_tempsync(ScatterLock
*lock
, bool *need_issue
)
4649 dout(10) << "scatter_tempsync " << *lock
4650 << " on " << *lock
->get_parent() << dendl
;
4651 assert(lock
->get_parent()->is_auth());
4652 assert(lock
->is_stable());
4654 assert(0 == "not fully implemented, at least not for filelock");
4656 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4658 switch (lock
->get_state()) {
4659 case LOCK_SYNC
: ceph_abort(); // this shouldn't happen
4660 case LOCK_LOCK
: lock
->set_state(LOCK_LOCK_TSYN
); break;
4661 case LOCK_MIX
: lock
->set_state(LOCK_MIX_TSYN
); break;
4662 default: ceph_abort();
4666 if (lock
->is_wrlocked())
4669 if (lock
->get_cap_shift() &&
4671 in
->issued_caps_need_gather(lock
)) {
4679 if (lock
->get_state() == LOCK_MIX_TSYN
&&
4680 in
->is_replicated()) {
4681 lock
->init_gather();
4682 send_lock_message(lock
, LOCK_AC_LOCK
);
4690 lock
->set_state(LOCK_TSYN
);
4691 lock
->finish_waiters(ScatterLock::WAIT_RD
|ScatterLock::WAIT_STABLE
);
4692 if (lock
->get_cap_shift()) {
4703 // ==========================================================================
4706 void Locker::local_wrlock_grab(LocalLock
*lock
, MutationRef
& mut
)
4708 dout(7) << "local_wrlock_grab on " << *lock
4709 << " on " << *lock
->get_parent() << dendl
;
4711 assert(lock
->get_parent()->is_auth());
4712 assert(lock
->can_wrlock());
4713 assert(!mut
->wrlocks
.count(lock
));
4714 lock
->get_wrlock(mut
->get_client());
4715 mut
->wrlocks
.insert(lock
);
4716 mut
->locks
.insert(lock
);
4719 bool Locker::local_wrlock_start(LocalLock
*lock
, MDRequestRef
& mut
)
4721 dout(7) << "local_wrlock_start on " << *lock
4722 << " on " << *lock
->get_parent() << dendl
;
4724 assert(lock
->get_parent()->is_auth());
4725 if (lock
->can_wrlock()) {
4726 assert(!mut
->wrlocks
.count(lock
));
4727 lock
->get_wrlock(mut
->get_client());
4728 mut
->wrlocks
.insert(lock
);
4729 mut
->locks
.insert(lock
);
4732 lock
->add_waiter(SimpleLock::WAIT_WR
|SimpleLock::WAIT_STABLE
, new C_MDS_RetryRequest(mdcache
, mut
));
4737 void Locker::local_wrlock_finish(LocalLock
*lock
, MutationImpl
*mut
)
4739 dout(7) << "local_wrlock_finish on " << *lock
4740 << " on " << *lock
->get_parent() << dendl
;
4742 mut
->wrlocks
.erase(lock
);
4743 mut
->locks
.erase(lock
);
4744 if (lock
->get_num_wrlocks() == 0) {
4745 lock
->finish_waiters(SimpleLock::WAIT_STABLE
|
4746 SimpleLock::WAIT_WR
|
4747 SimpleLock::WAIT_RD
);
4751 bool Locker::local_xlock_start(LocalLock
*lock
, MDRequestRef
& mut
)
4753 dout(7) << "local_xlock_start on " << *lock
4754 << " on " << *lock
->get_parent() << dendl
;
4756 assert(lock
->get_parent()->is_auth());
4757 if (!lock
->can_xlock_local()) {
4758 lock
->add_waiter(SimpleLock::WAIT_WR
|SimpleLock::WAIT_STABLE
, new C_MDS_RetryRequest(mdcache
, mut
));
4762 lock
->get_xlock(mut
, mut
->get_client());
4763 mut
->xlocks
.insert(lock
);
4764 mut
->locks
.insert(lock
);
4768 void Locker::local_xlock_finish(LocalLock
*lock
, MutationImpl
*mut
)
4770 dout(7) << "local_xlock_finish on " << *lock
4771 << " on " << *lock
->get_parent() << dendl
;
4773 mut
->xlocks
.erase(lock
);
4774 mut
->locks
.erase(lock
);
4776 lock
->finish_waiters(SimpleLock::WAIT_STABLE
|
4777 SimpleLock::WAIT_WR
|
4778 SimpleLock::WAIT_RD
);
4783 // ==========================================================================
4787 void Locker::file_eval(ScatterLock
*lock
, bool *need_issue
)
4789 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4790 int loner_wanted
, other_wanted
;
4791 int wanted
= in
->get_caps_wanted(&loner_wanted
, &other_wanted
, CEPH_CAP_SFILE
);
4792 dout(7) << "file_eval wanted=" << gcap_string(wanted
)
4793 << " loner_wanted=" << gcap_string(loner_wanted
)
4794 << " other_wanted=" << gcap_string(other_wanted
)
4795 << " filelock=" << *lock
<< " on " << *lock
->get_parent()
4798 assert(lock
->get_parent()->is_auth());
4799 assert(lock
->is_stable());
4801 if (lock
->get_parent()->is_freezing_or_frozen())
4804 if (mdcache
->is_readonly()) {
4805 if (lock
->get_state() != LOCK_SYNC
) {
4806 dout(10) << "file_eval read-only FS, syncing " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4807 simple_sync(lock
, need_issue
);
4813 if (lock
->get_state() == LOCK_EXCL
) {
4814 dout(20) << " is excl" << dendl
;
4815 int loner_issued
, other_issued
, xlocker_issued
;
4816 in
->get_caps_issued(&loner_issued
, &other_issued
, &xlocker_issued
, CEPH_CAP_SFILE
);
4817 dout(7) << "file_eval loner_issued=" << gcap_string(loner_issued
)
4818 << " other_issued=" << gcap_string(other_issued
)
4819 << " xlocker_issued=" << gcap_string(xlocker_issued
)
4821 if (!((loner_wanted
|loner_issued
) & (CEPH_CAP_GEXCL
|CEPH_CAP_GWR
|CEPH_CAP_GBUFFER
)) ||
4822 (other_wanted
& (CEPH_CAP_GEXCL
|CEPH_CAP_GWR
|CEPH_CAP_GRD
)) ||
4823 (in
->inode
.is_dir() && in
->multiple_nonstale_caps())) { // FIXME.. :/
4824 dout(20) << " should lose it" << dendl
;
4825 // we should lose it.
4836 // -> any writer means MIX; RD doesn't matter.
4837 if (((other_wanted
|loner_wanted
) & CEPH_CAP_GWR
) ||
4838 lock
->is_waiter_for(SimpleLock::WAIT_WR
))
4839 scatter_mix(lock
, need_issue
);
4840 else if (!lock
->is_wrlocked()) // let excl wrlocks drain first
4841 simple_sync(lock
, need_issue
);
4843 dout(10) << " waiting for wrlock to drain" << dendl
;
4848 else if (lock
->get_state() != LOCK_EXCL
&&
4849 !lock
->is_rdlocked() &&
4850 //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
4851 ((wanted
& (CEPH_CAP_GWR
|CEPH_CAP_GBUFFER
)) ||
4852 (in
->inode
.is_dir() && !in
->has_subtree_or_exporting_dirfrag())) &&
4853 in
->get_target_loner() >= 0) {
4854 dout(7) << "file_eval stable, bump to loner " << *lock
4855 << " on " << *lock
->get_parent() << dendl
;
4856 file_excl(lock
, need_issue
);
4860 else if (lock
->get_state() != LOCK_MIX
&&
4861 !lock
->is_rdlocked() &&
4862 //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
4863 (lock
->get_scatter_wanted() ||
4864 (in
->get_target_loner() < 0 && (wanted
& CEPH_CAP_GWR
)))) {
4865 dout(7) << "file_eval stable, bump to mixed " << *lock
4866 << " on " << *lock
->get_parent() << dendl
;
4867 scatter_mix(lock
, need_issue
);
4871 else if (lock
->get_state() != LOCK_SYNC
&&
4872 !lock
->is_wrlocked() && // drain wrlocks first!
4873 !lock
->is_waiter_for(SimpleLock::WAIT_WR
) &&
4874 !(wanted
& CEPH_CAP_GWR
) &&
4875 !((lock
->get_state() == LOCK_MIX
) &&
4876 in
->is_dir() && in
->has_subtree_or_exporting_dirfrag()) // if we are a delegation point, stay where we are
4877 //((wanted & CEPH_CAP_RD) ||
4878 //in->is_replicated() ||
4879 //lock->is_leased() ||
4880 //(!loner && lock->get_state() == LOCK_EXCL)) &&
4882 dout(7) << "file_eval stable, bump to sync " << *lock
4883 << " on " << *lock
->get_parent() << dendl
;
4884 simple_sync(lock
, need_issue
);
4890 void Locker::scatter_mix(ScatterLock
*lock
, bool *need_issue
)
4892 dout(7) << "scatter_mix " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4894 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4895 assert(in
->is_auth());
4896 assert(lock
->is_stable());
4898 if (lock
->get_state() == LOCK_LOCK
) {
4899 in
->start_scatter(lock
);
4900 if (in
->is_replicated()) {
4902 bufferlist softdata
;
4903 lock
->encode_locked_state(softdata
);
4905 // bcast to replicas
4906 send_lock_message(lock
, LOCK_AC_MIX
, softdata
);
4910 lock
->set_state(LOCK_MIX
);
4911 lock
->clear_scatter_wanted();
4912 if (lock
->get_cap_shift()) {
4920 switch (lock
->get_state()) {
4921 case LOCK_SYNC
: lock
->set_state(LOCK_SYNC_MIX
); break;
4922 case LOCK_EXCL
: lock
->set_state(LOCK_EXCL_MIX
); break;
4923 case LOCK_XSYN
: lock
->set_state(LOCK_XSYN_MIX
); break;
4924 case LOCK_TSYN
: lock
->set_state(LOCK_TSYN_MIX
); break;
4925 default: ceph_abort();
4929 if (lock
->is_rdlocked())
4931 if (in
->is_replicated()) {
4932 if (lock
->get_state() == LOCK_SYNC_MIX
) { // for the rest states, replicas are already LOCK
4933 send_lock_message(lock
, LOCK_AC_MIX
);
4934 lock
->init_gather();
4938 if (lock
->is_leased()) {
4939 revoke_client_leases(lock
);
4942 if (lock
->get_cap_shift() &&
4944 in
->issued_caps_need_gather(lock
)) {
4951 bool need_recover
= false;
4952 if (in
->state_test(CInode::STATE_NEEDSRECOVER
)) {
4953 mds
->mdcache
->queue_file_recover(in
);
4954 need_recover
= true;
4959 lock
->get_parent()->auth_pin(lock
);
4961 mds
->mdcache
->do_file_recover();
4963 in
->start_scatter(lock
);
4964 lock
->set_state(LOCK_MIX
);
4965 lock
->clear_scatter_wanted();
4966 if (in
->is_replicated()) {
4967 bufferlist softdata
;
4968 lock
->encode_locked_state(softdata
);
4969 send_lock_message(lock
, LOCK_AC_MIX
, softdata
);
4971 if (lock
->get_cap_shift()) {
4982 void Locker::file_excl(ScatterLock
*lock
, bool *need_issue
)
4984 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4985 dout(7) << "file_excl " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4987 assert(in
->is_auth());
4988 assert(lock
->is_stable());
4990 assert((in
->get_loner() >= 0 && in
->mds_caps_wanted
.empty()) ||
4991 (lock
->get_state() == LOCK_XSYN
)); // must do xsyn -> excl -> <anything else>
4993 switch (lock
->get_state()) {
4994 case LOCK_SYNC
: lock
->set_state(LOCK_SYNC_EXCL
); break;
4995 case LOCK_MIX
: lock
->set_state(LOCK_MIX_EXCL
); break;
4996 case LOCK_LOCK
: lock
->set_state(LOCK_LOCK_EXCL
); break;
4997 case LOCK_XSYN
: lock
->set_state(LOCK_XSYN_EXCL
); break;
4998 default: ceph_abort();
5002 if (lock
->is_rdlocked())
5004 if (lock
->is_wrlocked())
5007 if (in
->is_replicated() &&
5008 lock
->get_state() != LOCK_LOCK_EXCL
&&
5009 lock
->get_state() != LOCK_XSYN_EXCL
) { // if we were lock, replicas are already lock.
5010 send_lock_message(lock
, LOCK_AC_LOCK
);
5011 lock
->init_gather();
5014 if (lock
->is_leased()) {
5015 revoke_client_leases(lock
);
5018 if (in
->is_head() &&
5019 in
->issued_caps_need_gather(lock
)) {
5026 bool need_recover
= false;
5027 if (in
->state_test(CInode::STATE_NEEDSRECOVER
)) {
5028 mds
->mdcache
->queue_file_recover(in
);
5029 need_recover
= true;
5034 lock
->get_parent()->auth_pin(lock
);
5036 mds
->mdcache
->do_file_recover();
5038 lock
->set_state(LOCK_EXCL
);
5046 void Locker::file_xsyn(SimpleLock
*lock
, bool *need_issue
)
5048 dout(7) << "file_xsyn on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
5049 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
5050 assert(in
->is_auth());
5051 assert(in
->get_loner() >= 0 && in
->mds_caps_wanted
.empty());
5053 switch (lock
->get_state()) {
5054 case LOCK_EXCL
: lock
->set_state(LOCK_EXCL_XSYN
); break;
5055 default: ceph_abort();
5059 if (lock
->is_wrlocked())
5062 if (in
->is_head() &&
5063 in
->issued_caps_need_gather(lock
)) {
5072 lock
->get_parent()->auth_pin(lock
);
5074 lock
->set_state(LOCK_XSYN
);
5075 lock
->finish_waiters(SimpleLock::WAIT_RD
|SimpleLock::WAIT_STABLE
);
5083 void Locker::file_recover(ScatterLock
*lock
)
5085 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
5086 dout(7) << "file_recover " << *lock
<< " on " << *in
<< dendl
;
5088 assert(in
->is_auth());
5089 //assert(lock->is_stable());
5090 assert(lock
->get_state() == LOCK_PRE_SCAN
); // only called from MDCache::start_files_to_recover()
5095 if (in->is_replicated()
5096 lock->get_sm()->states[oldstate].replica_state != LOCK_LOCK) {
5097 send_lock_message(lock, LOCK_AC_LOCK);
5098 lock->init_gather();
5102 if (in
->is_head() &&
5103 in
->issued_caps_need_gather(lock
)) {
5108 lock
->set_state(LOCK_SCAN
);
5110 in
->state_set(CInode::STATE_NEEDSRECOVER
);
5112 mds
->mdcache
->queue_file_recover(in
);
5117 /* This function DOES put the passed message before returning */
5118 void Locker::handle_file_lock(ScatterLock
*lock
, MLock
*m
)
5120 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
5121 int from
= m
->get_asker();
5123 if (mds
->is_rejoin()) {
5124 if (in
->is_rejoining()) {
5125 dout(7) << "handle_file_lock still rejoining " << *in
5126 << ", dropping " << *m
<< dendl
;
5132 dout(7) << "handle_file_lock a=" << get_lock_action_name(m
->get_action())
5134 << " from mds." << from
<< " "
5137 bool caps
= lock
->get_cap_shift();
5139 switch (m
->get_action()) {
5142 assert(lock
->get_state() == LOCK_LOCK
||
5143 lock
->get_state() == LOCK_MIX
||
5144 lock
->get_state() == LOCK_MIX_SYNC2
);
5146 if (lock
->get_state() == LOCK_MIX
) {
5147 lock
->set_state(LOCK_MIX_SYNC
);
5148 eval_gather(lock
, true);
5149 if (lock
->is_unstable_and_locked())
5150 mds
->mdlog
->flush();
5154 (static_cast<ScatterLock
*>(lock
))->finish_flush();
5155 (static_cast<ScatterLock
*>(lock
))->clear_flushed();
5158 lock
->decode_locked_state(m
->get_data());
5159 lock
->set_state(LOCK_SYNC
);
5164 lock
->finish_waiters(SimpleLock::WAIT_RD
|SimpleLock::WAIT_STABLE
);
5169 switch (lock
->get_state()) {
5170 case LOCK_SYNC
: lock
->set_state(LOCK_SYNC_LOCK
); break;
5171 case LOCK_MIX
: lock
->set_state(LOCK_MIX_LOCK
); break;
5172 default: ceph_abort();
5175 eval_gather(lock
, true);
5176 if (lock
->is_unstable_and_locked())
5177 mds
->mdlog
->flush();
5181 case LOCK_AC_LOCKFLUSHED
:
5182 (static_cast<ScatterLock
*>(lock
))->finish_flush();
5183 (static_cast<ScatterLock
*>(lock
))->clear_flushed();
5184 // wake up scatter_nudge waiters
5185 if (lock
->is_stable())
5186 lock
->finish_waiters(SimpleLock::WAIT_STABLE
);
5190 assert(lock
->get_state() == LOCK_SYNC
||
5191 lock
->get_state() == LOCK_LOCK
||
5192 lock
->get_state() == LOCK_SYNC_MIX2
);
5194 if (lock
->get_state() == LOCK_SYNC
) {
5196 lock
->set_state(LOCK_SYNC_MIX
);
5197 eval_gather(lock
, true);
5198 if (lock
->is_unstable_and_locked())
5199 mds
->mdlog
->flush();
5204 lock
->set_state(LOCK_MIX
);
5205 lock
->decode_locked_state(m
->get_data());
5210 lock
->finish_waiters(SimpleLock::WAIT_WR
|SimpleLock::WAIT_STABLE
);
5215 case LOCK_AC_LOCKACK
:
5216 assert(lock
->get_state() == LOCK_SYNC_LOCK
||
5217 lock
->get_state() == LOCK_MIX_LOCK
||
5218 lock
->get_state() == LOCK_MIX_LOCK2
||
5219 lock
->get_state() == LOCK_MIX_EXCL
||
5220 lock
->get_state() == LOCK_SYNC_EXCL
||
5221 lock
->get_state() == LOCK_SYNC_MIX
||
5222 lock
->get_state() == LOCK_MIX_TSYN
);
5223 assert(lock
->is_gathering(from
));
5224 lock
->remove_gather(from
);
5226 if (lock
->get_state() == LOCK_MIX_LOCK
||
5227 lock
->get_state() == LOCK_MIX_LOCK2
||
5228 lock
->get_state() == LOCK_MIX_EXCL
||
5229 lock
->get_state() == LOCK_MIX_TSYN
) {
5230 lock
->decode_locked_state(m
->get_data());
5231 // replica is waiting for AC_LOCKFLUSHED, eval_gather() should not
5232 // delay calling scatter_writebehind().
5233 lock
->clear_flushed();
5236 if (lock
->is_gathering()) {
5237 dout(7) << "handle_file_lock " << *in
<< " from " << from
5238 << ", still gathering " << lock
->get_gather_set() << dendl
;
5240 dout(7) << "handle_file_lock " << *in
<< " from " << from
5241 << ", last one" << dendl
;
5246 case LOCK_AC_SYNCACK
:
5247 assert(lock
->get_state() == LOCK_MIX_SYNC
);
5248 assert(lock
->is_gathering(from
));
5249 lock
->remove_gather(from
);
5251 lock
->decode_locked_state(m
->get_data());
5253 if (lock
->is_gathering()) {
5254 dout(7) << "handle_file_lock " << *in
<< " from " << from
5255 << ", still gathering " << lock
->get_gather_set() << dendl
;
5257 dout(7) << "handle_file_lock " << *in
<< " from " << from
5258 << ", last one" << dendl
;
5263 case LOCK_AC_MIXACK
:
5264 assert(lock
->get_state() == LOCK_SYNC_MIX
);
5265 assert(lock
->is_gathering(from
));
5266 lock
->remove_gather(from
);
5268 if (lock
->is_gathering()) {
5269 dout(7) << "handle_file_lock " << *in
<< " from " << from
5270 << ", still gathering " << lock
->get_gather_set() << dendl
;
5272 dout(7) << "handle_file_lock " << *in
<< " from " << from
5273 << ", last one" << dendl
;
5280 case LOCK_AC_REQSCATTER
:
5281 if (lock
->is_stable()) {
5282 /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
5283 * because the replica should be holding an auth_pin if they're
5284 * doing this (and thus, we are freezing, not frozen, and indefinite
5285 * starvation isn't an issue).
5287 dout(7) << "handle_file_lock got scatter request on " << *lock
5288 << " on " << *lock
->get_parent() << dendl
;
5289 if (lock
->get_state() != LOCK_MIX
) // i.e., the reqscatter didn't race with an actual mix/scatter
5292 dout(7) << "handle_file_lock got scatter request, !stable, marking scatter_wanted on " << *lock
5293 << " on " << *lock
->get_parent() << dendl
;
5294 lock
->set_scatter_wanted();
5298 case LOCK_AC_REQUNSCATTER
:
5299 if (lock
->is_stable()) {
5300 /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
5301 * because the replica should be holding an auth_pin if they're
5302 * doing this (and thus, we are freezing, not frozen, and indefinite
5303 * starvation isn't an issue).
5305 dout(7) << "handle_file_lock got unscatter request on " << *lock
5306 << " on " << *lock
->get_parent() << dendl
;
5307 if (lock
->get_state() == LOCK_MIX
) // i.e., the reqscatter didn't race with an actual mix/scatter
5308 simple_lock(lock
); // FIXME tempsync?
5310 dout(7) << "handle_file_lock ignoring unscatter request on " << *lock
5311 << " on " << *lock
->get_parent() << dendl
;
5312 lock
->set_unscatter_wanted();
5316 case LOCK_AC_REQRDLOCK
:
5317 handle_reqrdlock(lock
, m
);
5321 if (!lock
->get_parent()->is_auth()) {
5322 dout(7) << "handle_file_lock IGNORING nudge on non-auth " << *lock
5323 << " on " << *lock
->get_parent() << dendl
;
5324 } else if (!lock
->get_parent()->is_replicated()) {
5325 dout(7) << "handle_file_lock IGNORING nudge on non-replicated " << *lock
5326 << " on " << *lock
->get_parent() << dendl
;
5328 dout(7) << "handle_file_lock trying nudge on " << *lock
5329 << " on " << *lock
->get_parent() << dendl
;
5330 scatter_nudge(lock
, 0, true);
5331 mds
->mdlog
->flush();