1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */
15 #include <string_view>
20 #include "MDBalancer.h"
26 #include "MDSContext.h"
31 #include "events/EUpdate.h"
32 #include "events/EOpen.h"
34 #include "msg/Messenger.h"
35 #include "osdc/Objecter.h"
37 #include "messages/MInodeFileCaps.h"
38 #include "messages/MLock.h"
39 #include "messages/MClientLease.h"
40 #include "messages/MClientReply.h"
41 #include "messages/MClientCaps.h"
42 #include "messages/MClientCapRelease.h"
44 #include "messages/MMDSSlaveRequest.h"
48 #include "common/config.h"
51 #define dout_subsys ceph_subsys_mds
53 #define dout_context g_ceph_context
54 #define dout_prefix _prefix(_dout, mds)
55 static ostream
& _prefix(std::ostream
*_dout
, MDSRank
*mds
) {
56 return *_dout
<< "mds." << mds
->get_nodeid() << ".locker ";
60 class LockerContext
: public MDSContext
{
63 MDSRank
*get_mds() override
69 explicit LockerContext(Locker
*locker_
) : locker(locker_
) {
70 ceph_assert(locker
!= NULL
);
74 class LockerLogContext
: public MDSLogContextBase
{
77 MDSRank
*get_mds() override
83 explicit LockerLogContext(Locker
*locker_
) : locker(locker_
) {
84 ceph_assert(locker
!= NULL
);
88 Locker::Locker(MDSRank
*m
, MDCache
*c
) :
89 mds(m
), mdcache(c
), need_snapflush_inodes(member_offset(CInode
, item_caps
)) {}
92 void Locker::dispatch(const Message::const_ref
&m
)
95 switch (m
->get_type()) {
98 handle_lock(MLock::msgref_cast(m
));
101 case MSG_MDS_INODEFILECAPS
:
102 handle_inode_file_caps(MInodeFileCaps::msgref_cast(m
));
105 case CEPH_MSG_CLIENT_CAPS
:
106 handle_client_caps(MClientCaps::msgref_cast(m
));
108 case CEPH_MSG_CLIENT_CAPRELEASE
:
109 handle_client_cap_release(MClientCapRelease::msgref_cast(m
));
111 case CEPH_MSG_CLIENT_LEASE
:
112 handle_client_lease(MClientLease::msgref_cast(m
));
115 derr
<< "locker unknown message " << m
->get_type() << dendl
;
116 ceph_abort_msg("locker unknown message");
133 void Locker::send_lock_message(SimpleLock
*lock
, int msg
)
135 for (const auto &it
: lock
->get_parent()->get_replicas()) {
136 if (mds
->is_cluster_degraded() &&
137 mds
->mdsmap
->get_state(it
.first
) < MDSMap::STATE_REJOIN
)
139 auto m
= MLock::create(lock
, msg
, mds
->get_nodeid());
140 mds
->send_message_mds(m
, it
.first
);
144 void Locker::send_lock_message(SimpleLock
*lock
, int msg
, const bufferlist
&data
)
146 for (const auto &it
: lock
->get_parent()->get_replicas()) {
147 if (mds
->is_cluster_degraded() &&
148 mds
->mdsmap
->get_state(it
.first
) < MDSMap::STATE_REJOIN
)
150 auto m
= MLock::create(lock
, msg
, mds
->get_nodeid());
152 mds
->send_message_mds(m
, it
.first
);
159 void Locker::include_snap_rdlocks(CInode
*in
, MutationImpl::LockOpVec
& lov
)
161 // rdlock ancestor snaps
163 while (t
->get_projected_parent_dn()) {
164 t
= t
->get_projected_parent_dn()->get_dir()->get_inode();
165 lov
.add_rdlock(&t
->snaplock
);
167 lov
.add_rdlock(&in
->snaplock
);
170 void Locker::include_snap_rdlocks_wlayout(CInode
*in
, MutationImpl::LockOpVec
& lov
,
171 file_layout_t
**layout
)
173 //rdlock ancestor snaps
175 lov
.add_rdlock(&in
->snaplock
);
176 lov
.add_rdlock(&in
->policylock
);
177 bool found_layout
= false;
179 lov
.add_rdlock(&t
->snaplock
);
181 lov
.add_rdlock(&t
->policylock
);
182 if (t
->get_projected_inode()->has_layout()) {
183 *layout
= &t
->get_projected_inode()->layout
;
187 if (t
->get_projected_parent_dn() &&
188 t
->get_projected_parent_dn()->get_dir())
189 t
= t
->get_projected_parent_dn()->get_dir()->get_inode();
194 struct MarkEventOnDestruct
{
196 std::string_view message
;
198 MarkEventOnDestruct(MDRequestRef
& _mdr
, std::string_view _message
) :
202 ~MarkEventOnDestruct() {
204 mdr
->mark_event(message
);
208 /* If this function returns false, the mdr has been placed
209 * on the appropriate wait list */
210 bool Locker::acquire_locks(MDRequestRef
& mdr
,
211 MutationImpl::LockOpVec
& lov
,
212 CInode
*auth_pin_freeze
,
213 bool auth_pin_nonblock
)
215 if (mdr
->done_locking
&&
216 !mdr
->is_slave()) { // not on slaves! master requests locks piecemeal.
217 dout(10) << "acquire_locks " << *mdr
<< " - done locking" << dendl
;
218 return true; // at least we had better be!
220 dout(10) << "acquire_locks " << *mdr
<< dendl
;
222 MarkEventOnDestruct
marker(mdr
, "failed to acquire_locks");
224 client_t client
= mdr
->get_client();
226 set
<MDSCacheObject
*> mustpin
; // items to authpin
229 for (int i
= 0, size
= lov
.size(); i
< size
; ++i
) {
231 SimpleLock
*lock
= p
.lock
;
232 MDSCacheObject
*object
= lock
->get_parent();
235 if ((lock
->get_type() == CEPH_LOCK_ISNAP
||
236 lock
->get_type() == CEPH_LOCK_IPOLICY
) &&
237 mds
->is_cluster_degraded() &&
239 !mdr
->is_queued_for_replay()) {
240 // waiting for recovering mds, to guarantee replayed requests and mksnap/setlayout
241 // get processed in proper order.
243 if (object
->is_auth()) {
244 if (!mdr
->locks
.count(lock
)) {
246 object
->list_replicas(ls
);
248 if (mds
->mdsmap
->get_state(m
) < MDSMap::STATE_ACTIVE
) {
255 // if the lock is the latest locked one, it's possible that slave mds got the lock
256 // while there are recovering mds.
257 if (!mdr
->locks
.count(lock
) || lock
== *mdr
->locks
.rbegin())
261 dout(10) << " must xlock " << *lock
<< " " << *object
262 << ", waiting for cluster recovered" << dendl
;
263 mds
->locker
->drop_locks(mdr
.get(), NULL
);
264 mdr
->drop_local_auth_pins();
265 mds
->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache
, mdr
));
270 dout(20) << " must xlock " << *lock
<< " " << *object
<< dendl
;
272 mustpin
.insert(object
);
274 // augment xlock with a versionlock?
275 if (lock
->get_type() == CEPH_LOCK_DN
) {
276 CDentry
*dn
= static_cast<CDentry
*>(object
);
279 if (mdr
->is_master()) {
280 // master. wrlock versionlock so we can pipeline dentry updates to journal.
281 lov
.add_wrlock(&dn
->versionlock
);
283 // slave. exclusively lock the dentry version (i.e. block other journal updates).
284 // this makes rollback safe.
285 lov
.add_xlock(&dn
->versionlock
);
288 if (lock
->get_type() > CEPH_LOCK_IVERSION
) {
289 // inode version lock?
290 CInode
*in
= static_cast<CInode
*>(object
);
293 if (mdr
->is_master()) {
294 // master. wrlock versionlock so we can pipeline inode updates to journal.
295 lov
.add_wrlock(&in
->versionlock
);
297 // slave. exclusively lock the inode version (i.e. block other journal updates).
298 // this makes rollback safe.
299 lov
.add_xlock(&in
->versionlock
);
302 } else if (p
.is_wrlock()) {
303 dout(20) << " must wrlock " << *lock
<< " " << *object
<< dendl
;
304 if (object
->is_auth()) {
305 mustpin
.insert(object
);
306 } else if (!object
->is_auth() &&
307 !lock
->can_wrlock(client
) && // we might have to request a scatter
308 !mdr
->is_slave()) { // if we are slave (remote_wrlock), the master already authpinned
309 dout(15) << " will also auth_pin " << *object
310 << " in case we need to request a scatter" << dendl
;
311 mustpin
.insert(object
);
313 } else if (p
.is_remote_wrlock()) {
314 dout(20) << " must remote_wrlock on mds." << p
.wrlock_target
<< " "
315 << *lock
<< " " << *object
<< dendl
;
316 mustpin
.insert(object
);
317 } else if (p
.is_rdlock()) {
319 dout(20) << " must rdlock " << *lock
<< " " << *object
<< dendl
;
320 if (object
->is_auth()) {
321 mustpin
.insert(object
);
322 } else if (!object
->is_auth() &&
323 !lock
->can_rdlock(client
)) { // we might have to request an rdlock
324 dout(15) << " will also auth_pin " << *object
325 << " in case we need to request a rdlock" << dendl
;
326 mustpin
.insert(object
);
329 ceph_assert(0 == "locker unknown lock operation");
333 lov
.sort_and_merge();
336 map
<mds_rank_t
, set
<MDSCacheObject
*> > mustpin_remote
; // mds -> (object set)
338 // can i auth pin them all now?
339 marker
.message
= "failed to authpin local pins";
340 for (const auto &p
: mustpin
) {
341 MDSCacheObject
*object
= p
;
343 dout(10) << " must authpin " << *object
<< dendl
;
345 if (mdr
->is_auth_pinned(object
)) {
346 if (object
!= (MDSCacheObject
*)auth_pin_freeze
)
348 if (mdr
->more()->is_remote_frozen_authpin
) {
349 if (mdr
->more()->rename_inode
== auth_pin_freeze
)
351 // unfreeze auth pin for the wrong inode
352 mustpin_remote
[mdr
->more()->rename_inode
->authority().first
].size();
356 if (!object
->is_auth()) {
357 if (!mdr
->locks
.empty())
358 drop_locks(mdr
.get());
359 if (object
->is_ambiguous_auth()) {
361 marker
.message
= "waiting for single auth, object is being migrated";
362 dout(10) << " ambiguous auth, waiting to authpin " << *object
<< dendl
;
363 object
->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
, new C_MDS_RetryRequest(mdcache
, mdr
));
364 mdr
->drop_local_auth_pins();
367 mustpin_remote
[object
->authority().first
].insert(object
);
371 if (!object
->can_auth_pin(&err
)) {
373 drop_locks(mdr
.get());
374 mdr
->drop_local_auth_pins();
375 if (auth_pin_nonblock
) {
376 dout(10) << " can't auth_pin (freezing?) " << *object
<< ", nonblocking" << dendl
;
380 if (err
== MDSCacheObject::ERR_EXPORTING_TREE
) {
381 marker
.message
= "failed to authpin, subtree is being exported";
382 } else if (err
== MDSCacheObject::ERR_FRAGMENTING_DIR
) {
383 marker
.message
= "failed to authpin, dir is being fragmented";
384 } else if (err
== MDSCacheObject::ERR_EXPORTING_INODE
) {
385 marker
.message
= "failed to authpin, inode is being exported";
387 dout(10) << " can't auth_pin (freezing?), waiting to authpin " << *object
<< dendl
;
388 object
->add_waiter(MDSCacheObject::WAIT_UNFREEZE
, new C_MDS_RetryRequest(mdcache
, mdr
));
390 if (!mdr
->remote_auth_pins
.empty())
391 notify_freeze_waiter(object
);
397 // ok, grab local auth pins
398 for (const auto& p
: mustpin
) {
399 MDSCacheObject
*object
= p
;
400 if (mdr
->is_auth_pinned(object
)) {
401 dout(10) << " already auth_pinned " << *object
<< dendl
;
402 } else if (object
->is_auth()) {
403 dout(10) << " auth_pinning " << *object
<< dendl
;
404 mdr
->auth_pin(object
);
408 // request remote auth_pins
409 if (!mustpin_remote
.empty()) {
410 marker
.message
= "requesting remote authpins";
411 for (const auto& p
: mdr
->remote_auth_pins
) {
412 if (mustpin
.count(p
.first
)) {
413 ceph_assert(p
.second
== p
.first
->authority().first
);
414 map
<mds_rank_t
, set
<MDSCacheObject
*> >::iterator q
= mustpin_remote
.find(p
.second
);
415 if (q
!= mustpin_remote
.end())
416 q
->second
.insert(p
.first
);
419 for (map
<mds_rank_t
, set
<MDSCacheObject
*> >::iterator p
= mustpin_remote
.begin();
420 p
!= mustpin_remote
.end();
422 dout(10) << "requesting remote auth_pins from mds." << p
->first
<< dendl
;
424 // wait for active auth
425 if (mds
->is_cluster_degraded() &&
426 !mds
->mdsmap
->is_clientreplay_or_active_or_stopping(p
->first
)) {
427 dout(10) << " mds." << p
->first
<< " is not active" << dendl
;
428 if (mdr
->more()->waiting_on_slave
.empty())
429 mds
->wait_for_active_peer(p
->first
, new C_MDS_RetryRequest(mdcache
, mdr
));
433 auto req
= MMDSSlaveRequest::create(mdr
->reqid
, mdr
->attempt
, MMDSSlaveRequest::OP_AUTHPIN
);
434 for (set
<MDSCacheObject
*>::iterator q
= p
->second
.begin();
435 q
!= p
->second
.end();
437 dout(10) << " req remote auth_pin of " << **q
<< dendl
;
438 MDSCacheObjectInfo info
;
439 (*q
)->set_object_info(info
);
440 req
->get_authpins().push_back(info
);
441 if (*q
== auth_pin_freeze
)
442 (*q
)->set_object_info(req
->get_authpin_freeze());
445 if (auth_pin_nonblock
)
446 req
->mark_nonblock();
447 mds
->send_message_mds(req
, p
->first
);
449 // put in waiting list
450 ceph_assert(mdr
->more()->waiting_on_slave
.count(p
->first
) == 0);
451 mdr
->more()->waiting_on_slave
.insert(p
->first
);
456 // caps i'll need to issue
457 set
<CInode
*> issue_set
;
461 // make sure they match currently acquired locks.
462 auto existing
= mdr
->locks
.begin();
463 for (const auto& p
: lov
) {
464 bool need_wrlock
= p
.is_wrlock();
465 bool need_remote_wrlock
= p
.is_remote_wrlock();
468 if (existing
!= mdr
->locks
.end() && existing
->lock
== p
.lock
) {
470 auto it
= existing
++;
471 auto have
= *it
; // don't reference
473 if (have
.is_xlock() && p
.is_xlock()) {
474 dout(10) << " already xlocked " << *have
.lock
<< " " << *have
.lock
->get_parent() << dendl
;
478 if (have
.is_remote_wrlock() &&
479 (!need_remote_wrlock
|| have
.wrlock_target
!= p
.wrlock_target
)) {
480 dout(10) << " unlocking remote_wrlock on wrong mds." << have
.wrlock_target
481 << " " << *have
.lock
<< " " << *have
.lock
->get_parent() << dendl
;
482 remote_wrlock_finish(it
, mdr
.get());
483 have
.clear_remote_wrlock();
486 if (need_wrlock
|| need_remote_wrlock
) {
487 if (need_wrlock
== have
.is_wrlock() &&
488 need_remote_wrlock
== have
.is_remote_wrlock()) {
490 dout(10) << " already wrlocked " << *have
.lock
<< " " << *have
.lock
->get_parent() << dendl
;
491 if (need_remote_wrlock
)
492 dout(10) << " already remote_wrlocked " << *have
.lock
<< " " << *have
.lock
->get_parent() << dendl
;
496 if (have
.is_wrlock()) {
498 dout(10) << " unlocking extra " << *have
.lock
<< " " << *have
.lock
->get_parent() << dendl
;
499 else if (need_remote_wrlock
) // acquire remote_wrlock first
500 dout(10) << " unlocking out-of-order " << *have
.lock
<< " " << *have
.lock
->get_parent() << dendl
;
501 bool need_issue
= false;
502 wrlock_finish(it
, mdr
.get(), &need_issue
);
504 issue_set
.insert(static_cast<CInode
*>(have
.lock
->get_parent()));
506 } else if (have
.is_rdlock() && p
.is_rdlock()) {
507 dout(10) << " already rdlocked " << *have
.lock
<< " " << *have
.lock
->get_parent() << dendl
;
512 // hose any stray locks
513 while (existing
!= mdr
->locks
.end()) {
514 auto it
= existing
++;
515 auto stray
= *it
; // don't reference
516 dout(10) << " unlocking out-of-order " << *stray
.lock
<< " " << *stray
.lock
->get_parent() << dendl
;
517 bool need_issue
= false;
518 if (stray
.is_xlock()) {
519 xlock_finish(it
, mdr
.get(), &need_issue
);
520 } else if (stray
.is_rdlock()) {
521 rdlock_finish(it
, mdr
.get(), &need_issue
);
523 // may have acquired both wrlock and remore wrlock
524 if (stray
.is_wrlock())
525 wrlock_finish(it
, mdr
.get(), &need_issue
);
526 if (stray
.is_remote_wrlock())
527 remote_wrlock_finish(it
, mdr
.get());
530 issue_set
.insert(static_cast<CInode
*>(stray
.lock
->get_parent()));
534 if (mdr
->locking
&& p
.lock
!= mdr
->locking
) {
535 cancel_locking(mdr
.get(), &issue_set
);
538 marker
.message
= "failed to xlock, waiting";
539 if (!xlock_start(p
.lock
, mdr
))
541 dout(10) << " got xlock on " << *p
.lock
<< " " << *p
.lock
->get_parent() << dendl
;
542 } else if (need_wrlock
|| need_remote_wrlock
) {
543 if (need_remote_wrlock
&& !mdr
->is_remote_wrlocked(p
)) {
544 marker
.message
= "waiting for remote wrlocks";
545 remote_wrlock_start(p
, p
.wrlock_target
, mdr
);
549 marker
.message
= "failed to wrlock, waiting";
550 if (need_remote_wrlock
&& !p
.lock
->can_wrlock(mdr
->get_client())) {
551 marker
.message
= "failed to wrlock, dropping remote wrlock and waiting";
552 // can't take the wrlock because the scatter lock is gathering. need to
553 // release the remote wrlock, so that the gathering process can finish.
554 auto it
= mdr
->locks
.end();
556 remote_wrlock_finish(it
, mdr
.get());
557 remote_wrlock_start(p
, p
.wrlock_target
, mdr
);
560 // nowait if we have already gotten remote wrlock
561 if (!wrlock_start(p
, mdr
, need_remote_wrlock
))
563 dout(10) << " got wrlock on " << *p
.lock
<< " " << *p
.lock
->get_parent() << dendl
;
566 ceph_assert(mdr
->is_master());
567 if (p
.lock
->needs_recover()) {
568 if (mds
->is_cluster_degraded()) {
569 if (!mdr
->is_queued_for_replay()) {
570 // see comments in SimpleLock::set_state_rejoin() and
571 // ScatterLock::encode_state_for_rejoin()
572 drop_locks(mdr
.get());
573 mds
->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache
, mdr
));
574 dout(10) << " rejoin recovering " << *p
.lock
<< " " << *p
.lock
->get_parent()
575 << ", waiting for cluster recovered" << dendl
;
576 marker
.message
= "rejoin recovering lock, waiting for cluster recovered";
580 p
.lock
->clear_need_recover();
584 marker
.message
= "failed to rdlock, waiting";
585 if (!rdlock_start(p
, mdr
))
587 dout(10) << " got rdlock on " << *p
.lock
<< " " << *p
.lock
->get_parent() << dendl
;
591 // any extra unneeded locks?
592 while (existing
!= mdr
->locks
.end()) {
593 auto it
= existing
++;
595 dout(10) << " unlocking extra " << *stray
.lock
<< " " << *stray
.lock
->get_parent() << dendl
;
596 bool need_issue
= false;
597 if (stray
.is_xlock()) {
598 xlock_finish(it
, mdr
.get(), &need_issue
);
599 } else if (stray
.is_rdlock()) {
600 rdlock_finish(it
, mdr
.get(), &need_issue
);
602 // may have acquired both wrlock and remore wrlock
603 if (stray
.is_wrlock())
604 wrlock_finish(it
, mdr
.get(), &need_issue
);
605 if (stray
.is_remote_wrlock())
606 remote_wrlock_finish(it
, mdr
.get());
609 issue_set
.insert(static_cast<CInode
*>(stray
.lock
->get_parent()));
612 mdr
->done_locking
= true;
613 mdr
->set_mds_stamp(ceph_clock_now());
615 marker
.message
= "acquired locks";
618 issue_caps_set(issue_set
);
622 void Locker::notify_freeze_waiter(MDSCacheObject
*o
)
625 if (CInode
*in
= dynamic_cast<CInode
*>(o
)) {
627 dir
= in
->get_parent_dir();
628 } else if (CDentry
*dn
= dynamic_cast<CDentry
*>(o
)) {
631 dir
= dynamic_cast<CDir
*>(o
);
635 if (dir
->is_freezing_dir())
636 mdcache
->fragment_freeze_inc_num_waiters(dir
);
637 if (dir
->is_freezing_tree()) {
638 while (!dir
->is_freezing_tree_root())
639 dir
= dir
->get_parent_dir();
640 mdcache
->migrator
->export_freeze_inc_num_waiters(dir
);
645 void Locker::set_xlocks_done(MutationImpl
*mut
, bool skip_dentry
)
647 for (const auto &p
: mut
->locks
) {
650 MDSCacheObject
*obj
= p
.lock
->get_parent();
651 ceph_assert(obj
->is_auth());
653 (p
.lock
->get_type() == CEPH_LOCK_DN
|| p
.lock
->get_type() == CEPH_LOCK_DVERSION
))
655 dout(10) << "set_xlocks_done on " << *p
.lock
<< " " << *obj
<< dendl
;
656 p
.lock
->set_xlock_done();
660 void Locker::_drop_locks(MutationImpl
*mut
, set
<CInode
*> *pneed_issue
,
663 set
<mds_rank_t
> slaves
;
665 for (auto it
= mut
->locks
.begin(); it
!= mut
->locks
.end(); ) {
666 SimpleLock
*lock
= it
->lock
;
667 MDSCacheObject
*obj
= lock
->get_parent();
669 if (it
->is_xlock()) {
670 if (obj
->is_auth()) {
672 xlock_finish(it
++, mut
, &ni
);
674 pneed_issue
->insert(static_cast<CInode
*>(obj
));
676 ceph_assert(lock
->get_sm()->can_remote_xlock
);
677 slaves
.insert(obj
->authority().first
);
679 mut
->locks
.erase(it
++);
681 } else if (it
->is_wrlock() || it
->is_remote_wrlock()) {
682 if (it
->is_remote_wrlock()) {
683 slaves
.insert(it
->wrlock_target
);
684 it
->clear_remote_wrlock();
686 if (it
->is_wrlock()) {
688 wrlock_finish(it
++, mut
, &ni
);
690 pneed_issue
->insert(static_cast<CInode
*>(obj
));
692 mut
->locks
.erase(it
++);
694 } else if (drop_rdlocks
&& it
->is_rdlock()) {
696 rdlock_finish(it
++, mut
, &ni
);
698 pneed_issue
->insert(static_cast<CInode
*>(obj
));
704 for (set
<mds_rank_t
>::iterator p
= slaves
.begin(); p
!= slaves
.end(); ++p
) {
705 if (!mds
->is_cluster_degraded() ||
706 mds
->mdsmap
->get_state(*p
) >= MDSMap::STATE_REJOIN
) {
707 dout(10) << "_drop_non_rdlocks dropping remote locks on mds." << *p
<< dendl
;
708 auto slavereq
= MMDSSlaveRequest::create(mut
->reqid
, mut
->attempt
, MMDSSlaveRequest::OP_DROPLOCKS
);
709 mds
->send_message_mds(slavereq
, *p
);
714 void Locker::cancel_locking(MutationImpl
*mut
, set
<CInode
*> *pneed_issue
)
716 SimpleLock
*lock
= mut
->locking
;
718 dout(10) << "cancel_locking " << *lock
<< " on " << *mut
<< dendl
;
720 if (lock
->get_parent()->is_auth()) {
721 bool need_issue
= false;
722 if (lock
->get_state() == LOCK_PREXLOCK
) {
723 _finish_xlock(lock
, -1, &need_issue
);
724 } else if (lock
->get_state() == LOCK_LOCK_XLOCK
) {
725 lock
->set_state(LOCK_XLOCKDONE
);
726 eval_gather(lock
, true, &need_issue
);
729 pneed_issue
->insert(static_cast<CInode
*>(lock
->get_parent()));
731 mut
->finish_locking(lock
);
734 void Locker::drop_locks(MutationImpl
*mut
, set
<CInode
*> *pneed_issue
)
737 set
<CInode
*> my_need_issue
;
739 pneed_issue
= &my_need_issue
;
742 cancel_locking(mut
, pneed_issue
);
743 _drop_locks(mut
, pneed_issue
, true);
745 if (pneed_issue
== &my_need_issue
)
746 issue_caps_set(*pneed_issue
);
747 mut
->done_locking
= false;
750 void Locker::drop_non_rdlocks(MutationImpl
*mut
, set
<CInode
*> *pneed_issue
)
752 set
<CInode
*> my_need_issue
;
754 pneed_issue
= &my_need_issue
;
756 _drop_locks(mut
, pneed_issue
, false);
758 if (pneed_issue
== &my_need_issue
)
759 issue_caps_set(*pneed_issue
);
762 void Locker::drop_rdlocks_for_early_reply(MutationImpl
*mut
)
764 set
<CInode
*> need_issue
;
766 for (auto it
= mut
->locks
.begin(); it
!= mut
->locks
.end(); ) {
767 if (!it
->is_rdlock()) {
771 SimpleLock
*lock
= it
->lock
;
772 // make later mksnap/setlayout (at other mds) wait for this unsafe request
773 if (lock
->get_type() == CEPH_LOCK_ISNAP
||
774 lock
->get_type() == CEPH_LOCK_IPOLICY
) {
779 rdlock_finish(it
++, mut
, &ni
);
781 need_issue
.insert(static_cast<CInode
*>(lock
->get_parent()));
784 issue_caps_set(need_issue
);
787 void Locker::drop_locks_for_fragment_unfreeze(MutationImpl
*mut
)
789 set
<CInode
*> need_issue
;
791 for (auto it
= mut
->locks
.begin(); it
!= mut
->locks
.end(); ) {
792 SimpleLock
*lock
= it
->lock
;
793 if (lock
->get_type() == CEPH_LOCK_IDFT
) {
798 wrlock_finish(it
++, mut
, &ni
);
800 need_issue
.insert(static_cast<CInode
*>(lock
->get_parent()));
802 issue_caps_set(need_issue
);
807 void Locker::eval_gather(SimpleLock
*lock
, bool first
, bool *pneed_issue
, MDSContext::vec
*pfinishers
)
809 dout(10) << "eval_gather " << *lock
<< " on " << *lock
->get_parent() << dendl
;
810 ceph_assert(!lock
->is_stable());
812 int next
= lock
->get_next_state();
815 bool caps
= lock
->get_cap_shift();
816 if (lock
->get_type() != CEPH_LOCK_DN
)
817 in
= static_cast<CInode
*>(lock
->get_parent());
819 bool need_issue
= false;
821 int loner_issued
= 0, other_issued
= 0, xlocker_issued
= 0;
822 ceph_assert(!caps
|| in
!= NULL
);
823 if (caps
&& in
->is_head()) {
824 in
->get_caps_issued(&loner_issued
, &other_issued
, &xlocker_issued
,
825 lock
->get_cap_shift(), lock
->get_cap_mask());
826 dout(10) << " next state is " << lock
->get_state_name(next
)
827 << " issued/allows loner " << gcap_string(loner_issued
)
828 << "/" << gcap_string(lock
->gcaps_allowed(CAP_LONER
, next
))
829 << " xlocker " << gcap_string(xlocker_issued
)
830 << "/" << gcap_string(lock
->gcaps_allowed(CAP_XLOCKER
, next
))
831 << " other " << gcap_string(other_issued
)
832 << "/" << gcap_string(lock
->gcaps_allowed(CAP_ANY
, next
))
835 if (first
&& ((~lock
->gcaps_allowed(CAP_ANY
, next
) & other_issued
) ||
836 (~lock
->gcaps_allowed(CAP_LONER
, next
) & loner_issued
) ||
837 (~lock
->gcaps_allowed(CAP_XLOCKER
, next
) & xlocker_issued
)))
841 #define IS_TRUE_AND_LT_AUTH(x, auth) (x && ((auth && x <= AUTH) || (!auth && x < AUTH)))
842 bool auth
= lock
->get_parent()->is_auth();
843 if (!lock
->is_gathering() &&
844 (IS_TRUE_AND_LT_AUTH(lock
->get_sm()->states
[next
].can_rdlock
, auth
) || !lock
->is_rdlocked()) &&
845 (IS_TRUE_AND_LT_AUTH(lock
->get_sm()->states
[next
].can_wrlock
, auth
) || !lock
->is_wrlocked()) &&
846 (IS_TRUE_AND_LT_AUTH(lock
->get_sm()->states
[next
].can_xlock
, auth
) || !lock
->is_xlocked()) &&
847 (IS_TRUE_AND_LT_AUTH(lock
->get_sm()->states
[next
].can_lease
, auth
) || !lock
->is_leased()) &&
848 !(lock
->get_parent()->is_auth() && lock
->is_flushing()) && // i.e. wait for scatter_writebehind!
849 (!caps
|| ((~lock
->gcaps_allowed(CAP_ANY
, next
) & other_issued
) == 0 &&
850 (~lock
->gcaps_allowed(CAP_LONER
, next
) & loner_issued
) == 0 &&
851 (~lock
->gcaps_allowed(CAP_XLOCKER
, next
) & xlocker_issued
) == 0)) &&
852 lock
->get_state() != LOCK_SYNC_MIX2
&& // these states need an explicit trigger from the auth mds
853 lock
->get_state() != LOCK_MIX_SYNC2
855 dout(7) << "eval_gather finished gather on " << *lock
856 << " on " << *lock
->get_parent() << dendl
;
858 if (lock
->get_sm() == &sm_filelock
) {
860 if (in
->state_test(CInode::STATE_RECOVERING
)) {
861 dout(7) << "eval_gather finished gather, but still recovering" << dendl
;
863 } else if (in
->state_test(CInode::STATE_NEEDSRECOVER
)) {
864 dout(7) << "eval_gather finished gather, but need to recover" << dendl
;
865 mds
->mdcache
->queue_file_recover(in
);
866 mds
->mdcache
->do_file_recover();
871 if (!lock
->get_parent()->is_auth()) {
872 // replica: tell auth
873 mds_rank_t auth
= lock
->get_parent()->authority().first
;
875 if (lock
->get_parent()->is_rejoining() &&
876 mds
->mdsmap
->get_state(auth
) == MDSMap::STATE_REJOIN
) {
877 dout(7) << "eval_gather finished gather, but still rejoining "
878 << *lock
->get_parent() << dendl
;
882 if (!mds
->is_cluster_degraded() ||
883 mds
->mdsmap
->get_state(auth
) >= MDSMap::STATE_REJOIN
) {
884 switch (lock
->get_state()) {
886 mds
->send_message_mds(MLock::create(lock
, LOCK_AC_LOCKACK
, mds
->get_nodeid()), auth
);
891 auto reply
= MLock::create(lock
, LOCK_AC_SYNCACK
, mds
->get_nodeid());
892 lock
->encode_locked_state(reply
->get_data());
893 mds
->send_message_mds(reply
, auth
);
894 next
= LOCK_MIX_SYNC2
;
895 (static_cast<ScatterLock
*>(lock
))->start_flush();
900 (static_cast<ScatterLock
*>(lock
))->finish_flush();
901 (static_cast<ScatterLock
*>(lock
))->clear_flushed();
904 // do nothing, we already acked
909 auto reply
= MLock::create(lock
, LOCK_AC_MIXACK
, mds
->get_nodeid());
910 mds
->send_message_mds(reply
, auth
);
911 next
= LOCK_SYNC_MIX2
;
918 lock
->encode_locked_state(data
);
919 mds
->send_message_mds(MLock::create(lock
, LOCK_AC_LOCKACK
, mds
->get_nodeid(), data
), auth
);
920 (static_cast<ScatterLock
*>(lock
))->start_flush();
921 // we'll get an AC_LOCKFLUSHED to complete
932 // once the first (local) stage of mix->lock gather complete we can
933 // gather from replicas
934 if (lock
->get_state() == LOCK_MIX_LOCK
&&
935 lock
->get_parent()->is_replicated()) {
936 dout(10) << " finished (local) gather for mix->lock, now gathering from replicas" << dendl
;
937 send_lock_message(lock
, LOCK_AC_LOCK
);
939 lock
->set_state(LOCK_MIX_LOCK2
);
943 if (lock
->is_dirty() && !lock
->is_flushed()) {
944 scatter_writebehind(static_cast<ScatterLock
*>(lock
));
948 lock
->clear_flushed();
950 switch (lock
->get_state()) {
956 in
->start_scatter(static_cast<ScatterLock
*>(lock
));
957 if (lock
->get_parent()->is_replicated()) {
959 lock
->encode_locked_state(softdata
);
960 send_lock_message(lock
, LOCK_AC_MIX
, softdata
);
962 (static_cast<ScatterLock
*>(lock
))->clear_scatter_wanted();
967 if (next
!= LOCK_SYNC
)
976 if (lock
->get_parent()->is_replicated()) {
978 lock
->encode_locked_state(softdata
);
979 send_lock_message(lock
, LOCK_AC_SYNC
, softdata
);
986 lock
->set_state(next
);
988 if (lock
->get_parent()->is_auth() &&
990 lock
->get_parent()->auth_unpin(lock
);
992 // drop loner before doing waiters
996 in
->get_wanted_loner() != in
->get_loner()) {
997 dout(10) << " trying to drop loner" << dendl
;
998 if (in
->try_drop_loner()) {
999 dout(10) << " dropped loner" << dendl
;
1005 lock
->take_waiting(SimpleLock::WAIT_STABLE
|SimpleLock::WAIT_WR
|SimpleLock::WAIT_RD
|SimpleLock::WAIT_XLOCK
,
1008 lock
->finish_waiters(SimpleLock::WAIT_STABLE
|SimpleLock::WAIT_WR
|SimpleLock::WAIT_RD
|SimpleLock::WAIT_XLOCK
);
1010 if (caps
&& in
->is_head())
1013 if (lock
->get_parent()->is_auth() &&
1015 try_eval(lock
, &need_issue
);
1020 *pneed_issue
= true;
1021 else if (in
->is_head())
1027 bool Locker::eval(CInode
*in
, int mask
, bool caps_imported
)
1029 bool need_issue
= caps_imported
;
1030 MDSContext::vec finishers
;
1032 dout(10) << "eval " << mask
<< " " << *in
<< dendl
;
1035 if (in
->is_auth() && in
->is_head()) {
1036 client_t orig_loner
= in
->get_loner();
1037 if (in
->choose_ideal_loner()) {
1038 dout(10) << "eval set loner: client." << orig_loner
<< " -> client." << in
->get_loner() << dendl
;
1041 } else if (in
->get_wanted_loner() != in
->get_loner()) {
1042 dout(10) << "eval want loner: client." << in
->get_wanted_loner() << " but failed to set it" << dendl
;
1048 if (mask
& CEPH_LOCK_IFILE
)
1049 eval_any(&in
->filelock
, &need_issue
, &finishers
, caps_imported
);
1050 if (mask
& CEPH_LOCK_IAUTH
)
1051 eval_any(&in
->authlock
, &need_issue
, &finishers
, caps_imported
);
1052 if (mask
& CEPH_LOCK_ILINK
)
1053 eval_any(&in
->linklock
, &need_issue
, &finishers
, caps_imported
);
1054 if (mask
& CEPH_LOCK_IXATTR
)
1055 eval_any(&in
->xattrlock
, &need_issue
, &finishers
, caps_imported
);
1056 if (mask
& CEPH_LOCK_INEST
)
1057 eval_any(&in
->nestlock
, &need_issue
, &finishers
, caps_imported
);
1058 if (mask
& CEPH_LOCK_IFLOCK
)
1059 eval_any(&in
->flocklock
, &need_issue
, &finishers
, caps_imported
);
1060 if (mask
& CEPH_LOCK_IPOLICY
)
1061 eval_any(&in
->policylock
, &need_issue
, &finishers
, caps_imported
);
1064 if (in
->is_auth() && in
->is_head() && in
->get_wanted_loner() != in
->get_loner()) {
1065 if (in
->try_drop_loner()) {
1067 if (in
->get_wanted_loner() >= 0) {
1068 dout(10) << "eval end set loner to client." << in
->get_loner() << dendl
;
1069 bool ok
= in
->try_set_loner();
1077 finish_contexts(g_ceph_context
, finishers
);
1079 if (need_issue
&& in
->is_head())
1082 dout(10) << "eval done" << dendl
;
1086 class C_Locker_Eval
: public LockerContext
{
1090 C_Locker_Eval(Locker
*l
, MDSCacheObject
*pp
, int m
) : LockerContext(l
), p(pp
), mask(m
) {
1091 // We are used as an MDSCacheObject waiter, so should
1092 // only be invoked by someone already holding the big lock.
1093 ceph_assert(locker
->mds
->mds_lock
.is_locked_by_me());
1094 p
->get(MDSCacheObject::PIN_PTRWAITER
);
1096 void finish(int r
) override
{
1097 locker
->try_eval(p
, mask
);
1098 p
->put(MDSCacheObject::PIN_PTRWAITER
);
1102 void Locker::try_eval(MDSCacheObject
*p
, int mask
)
1104 // unstable and ambiguous auth?
1105 if (p
->is_ambiguous_auth()) {
1106 dout(7) << "try_eval ambiguous auth, waiting on " << *p
<< dendl
;
1107 p
->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
, new C_Locker_Eval(this, p
, mask
));
1111 if (p
->is_auth() && p
->is_frozen()) {
1112 dout(7) << "try_eval frozen, waiting on " << *p
<< dendl
;
1113 p
->add_waiter(MDSCacheObject::WAIT_UNFREEZE
, new C_Locker_Eval(this, p
, mask
));
1117 if (mask
& CEPH_LOCK_DN
) {
1118 ceph_assert(mask
== CEPH_LOCK_DN
);
1119 bool need_issue
= false; // ignore this, no caps on dentries
1120 CDentry
*dn
= static_cast<CDentry
*>(p
);
1121 eval_any(&dn
->lock
, &need_issue
);
1123 CInode
*in
= static_cast<CInode
*>(p
);
1128 void Locker::try_eval(SimpleLock
*lock
, bool *pneed_issue
)
1130 MDSCacheObject
*p
= lock
->get_parent();
1132 // unstable and ambiguous auth?
1133 if (p
->is_ambiguous_auth()) {
1134 dout(7) << "try_eval " << *lock
<< " ambiguousauth, waiting on " << *p
<< dendl
;
1135 p
->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
, new C_Locker_Eval(this, p
, lock
->get_type()));
1139 if (!p
->is_auth()) {
1140 dout(7) << "try_eval " << *lock
<< " not auth for " << *p
<< dendl
;
1144 if (p
->is_frozen()) {
1145 dout(7) << "try_eval " << *lock
<< " frozen, waiting on " << *p
<< dendl
;
1146 p
->add_waiter(MDSCacheObject::WAIT_UNFREEZE
, new C_Locker_Eval(this, p
, lock
->get_type()));
1151 * We could have a situation like:
1153 * - mds A authpins item on mds B
1154 * - mds B starts to freeze tree containing item
1155 * - mds A tries wrlock_start on A, sends REQSCATTER to B
1156 * - mds B lock is unstable, sets scatter_wanted
1157 * - mds B lock stabilizes, calls try_eval.
1159 * We can defer while freezing without causing a deadlock. Honor
1160 * scatter_wanted flag here. This will never get deferred by the
1161 * checks above due to the auth_pin held by the master.
1163 if (lock
->is_scatterlock()) {
1164 ScatterLock
*slock
= static_cast<ScatterLock
*>(lock
);
1165 if (slock
->get_scatter_wanted() &&
1166 slock
->get_state() != LOCK_MIX
) {
1167 scatter_mix(slock
, pneed_issue
);
1168 if (!lock
->is_stable())
1170 } else if (slock
->get_unscatter_wanted() &&
1171 slock
->get_state() != LOCK_LOCK
) {
1172 simple_lock(slock
, pneed_issue
);
1173 if (!lock
->is_stable()) {
1179 if (lock
->get_type() != CEPH_LOCK_DN
&&
1180 lock
->get_type() != CEPH_LOCK_ISNAP
&&
1182 dout(7) << "try_eval " << *lock
<< " freezing, waiting on " << *p
<< dendl
;
1183 p
->add_waiter(MDSCacheObject::WAIT_UNFREEZE
, new C_Locker_Eval(this, p
, lock
->get_type()));
1187 eval(lock
, pneed_issue
);
1190 void Locker::eval_cap_gather(CInode
*in
, set
<CInode
*> *issue_set
)
1192 bool need_issue
= false;
1193 MDSContext::vec finishers
;
1196 if (!in
->filelock
.is_stable())
1197 eval_gather(&in
->filelock
, false, &need_issue
, &finishers
);
1198 if (!in
->authlock
.is_stable())
1199 eval_gather(&in
->authlock
, false, &need_issue
, &finishers
);
1200 if (!in
->linklock
.is_stable())
1201 eval_gather(&in
->linklock
, false, &need_issue
, &finishers
);
1202 if (!in
->xattrlock
.is_stable())
1203 eval_gather(&in
->xattrlock
, false, &need_issue
, &finishers
);
1205 if (need_issue
&& in
->is_head()) {
1207 issue_set
->insert(in
);
1212 finish_contexts(g_ceph_context
, finishers
);
1215 void Locker::eval_scatter_gathers(CInode
*in
)
1217 bool need_issue
= false;
1218 MDSContext::vec finishers
;
1220 dout(10) << "eval_scatter_gathers " << *in
<< dendl
;
1223 if (!in
->filelock
.is_stable())
1224 eval_gather(&in
->filelock
, false, &need_issue
, &finishers
);
1225 if (!in
->nestlock
.is_stable())
1226 eval_gather(&in
->nestlock
, false, &need_issue
, &finishers
);
1227 if (!in
->dirfragtreelock
.is_stable())
1228 eval_gather(&in
->dirfragtreelock
, false, &need_issue
, &finishers
);
1230 if (need_issue
&& in
->is_head())
1233 finish_contexts(g_ceph_context
, finishers
);
1236 void Locker::eval(SimpleLock
*lock
, bool *need_issue
)
1238 switch (lock
->get_type()) {
1239 case CEPH_LOCK_IFILE
:
1240 return file_eval(static_cast<ScatterLock
*>(lock
), need_issue
);
1241 case CEPH_LOCK_IDFT
:
1242 case CEPH_LOCK_INEST
:
1243 return scatter_eval(static_cast<ScatterLock
*>(lock
), need_issue
);
1245 return simple_eval(lock
, need_issue
);
1250 // ------------------
// Nudge a stable lock toward a rdlock-able state: on the auth, change the
// lock state locally; on a replica, ask the auth (LOCK_AC_REQRDLOCK).  Also
// bumps the recovery priority of a recovering file inode.
//
// NOTE(review): extraction dropped several lines (branch actions, return
// statements); gaps are marked below.  Braces restored for readability.
// Verify against upstream src/mds/Locker.cc before editing.
bool Locker::_rdlock_kick(SimpleLock *lock, bool as_anon)
{
  if (lock->is_stable()) {
    if (lock->get_parent()->is_auth()) {
      if (lock->get_sm() == &sm_scatterlock) {
        // not until tempsync is fully implemented
        //if (lock->get_parent()->is_replicated())
        //scatter_tempsync((ScatterLock*)lock);
        // [extraction gap: action for this branch dropped]
      } else if (lock->get_sm() == &sm_filelock) {
        CInode *in = static_cast<CInode *>(lock->get_parent());
        if (lock->get_state() == LOCK_EXCL &&
            in->get_target_loner() >= 0 &&
            !in->is_dir() && !as_anon)  // as_anon => caller wants SYNC, not XSYN
          ;  // [extraction gap: branch bodies and return dropped]
      }
      // [extraction gap: auth-path tail dropped]
    } else {
      // request rdlock state change from auth
      mds_rank_t auth = lock->get_parent()->authority().first;
      if (!mds->is_cluster_degraded() ||
          mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
        dout(10) << "requesting rdlock from auth on "
                 << *lock << " on " << *lock->get_parent() << dendl;
        mds->send_message_mds(MLock::create(lock, LOCK_AC_REQRDLOCK, mds->get_nodeid()), auth);
      }
      // [extraction gap: return dropped]
    }
  }

  if (lock->get_type() == CEPH_LOCK_IFILE) {
    CInode *in = static_cast<CInode *>(lock->get_parent());
    if (in->state_test(CInode::STATE_RECOVERING)) {
      mds->mdcache->recovery_queue.prioritize(in);
    }
  }
  // [extraction gap: final return dropped]
}
1297 bool Locker::rdlock_try(SimpleLock
*lock
, client_t client
, MDSContext
*con
)
1299 dout(7) << "rdlock_try on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1301 // can read? grab ref.
1302 if (lock
->can_rdlock(client
))
1305 _rdlock_kick(lock
, false);
1307 if (lock
->can_rdlock(client
))
1312 dout(7) << "rdlock_try waiting on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1313 lock
->add_waiter(SimpleLock::WAIT_STABLE
|SimpleLock::WAIT_RD
, con
);
// Acquire a rdlock for request 'mut'.  On success the lock op is recorded in
// mut->locks and true is returned; otherwise a retry waiter is queued and
// false is returned.  'as_anon' reads anonymously (no client credit) — used
// for snapped reads and for kicking a head inode's lock.
//
// NOTE(review): extraction dropped lines (retry-loop header, rdlock ref
// acquisition, returns, local declarations); gaps are marked below.  Braces
// restored for readability.  Verify against upstream src/mds/Locker.cc.
bool Locker::rdlock_start(SimpleLock *lock, MDRequestRef& mut, bool as_anon)
{
  dout(7) << "rdlock_start on " << *lock << " on " << *lock->get_parent() << dendl;

  // client may be allowed to rdlock the same item it has xlocked.
  // UNLESS someone passes in as_anon, or we're reading snapped version here.
  if (mut->snapid != CEPH_NOSNAP)
    ;  // [extraction gap: statement dropped (presumably forces as_anon)]
  client_t client = as_anon ? -1 : mut->get_client();

  // [extraction gap: declaration of CInode *in dropped]
  if (lock->get_type() != CEPH_LOCK_DN)
    in = static_cast<CInode *>(lock->get_parent());

  // NOTE(review): the next four lines appear to be a disabled
  // forward-to-auth path (they were whole comment lines in the source):
  //   if (!lock->get_parent()->is_auth() &&
  //       lock->fw_rdlock_to_auth()) {
  //     mdcache->request_forward(mut, lock->get_parent()->authority().first);
  // [extraction gap: remainder of this disabled path dropped]

  // [extraction gap: retry-loop header dropped]
  // can read?  grab ref.
  if (lock->can_rdlock(client)) {
    // [extraction gap: rdlock ref acquisition dropped]
    mut->locks.emplace_hint(mut->locks.end(), lock, MutationImpl::LockOp::RDLOCK);
    // [extraction gap: success return dropped]
  }

  // hmm, wait a second.
  if (in && !in->is_head() && in->is_auth() &&
      lock->get_state() == LOCK_SNAP_SYNC) {
    // okay, we actually need to kick the head's lock to get ourselves synced up.
    CInode *head = mdcache->get_inode(in->ino());
    // [extraction gap: assertion on head dropped]
    SimpleLock *hlock = head->get_lock(CEPH_LOCK_IFILE);
    if (hlock->get_state() == LOCK_SYNC)
      hlock = head->get_lock(lock->get_type());

    if (hlock->get_state() != LOCK_SYNC) {
      dout(10) << "rdlock_start trying head inode " << *head << dendl;
      if (!rdlock_start(hlock, mut, true))  // ** as_anon, no rdlock on EXCL **
        ;  // [extraction gap: failure return dropped]
      // oh, check our lock again then
    }
  }

  if (!_rdlock_kick(lock, as_anon))
    ;  // [extraction gap: loop exit dropped]

  // wait!
  // [extraction gap: declaration of wait_on dropped]
  if (lock->get_parent()->is_auth() && lock->is_stable())
    wait_on = SimpleLock::WAIT_RD;
  else  // (restored)
    wait_on = SimpleLock::WAIT_STABLE;  // REQRDLOCK is ignored if lock is unstable, so we need to retry.
  dout(7) << "rdlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl;
  lock->add_waiter(wait_on, new C_MDS_RetryRequest(mdcache, mut));
  // [extraction gap: tail (log nudge / return false) dropped]
}
1382 void Locker::nudge_log(SimpleLock
*lock
)
1384 dout(10) << "nudge_log " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1385 if (lock
->get_parent()->is_auth() && lock
->is_unstable_and_locked()) // as with xlockdone, or cap flush
1386 mds
->mdlog
->flush();
// Release a rdlock recorded at 'it' in mut->locks, then re-evaluate the lock
// if it is no longer rdlocked.
//
// NOTE(review): extraction dropped the rdlock ref release and a guard
// before the erase; gaps are marked below.  Braces restored for
// readability.  Verify against upstream src/mds/Locker.cc.
void Locker::rdlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut, bool *pneed_issue)
{
  ceph_assert(it->is_rdlock());
  SimpleLock *lock = it->lock;
  // [extraction gap: rdlock ref release (and possible guard) dropped]
  mut->locks.erase(it);

  dout(7) << "rdlock_finish on " << *lock << " on " << *lock->get_parent() << dendl;

  // last one?  re-evaluate the lock's state.
  if (!lock->is_rdlocked()) {
    if (!lock->is_stable())
      eval_gather(lock, false, pneed_issue);
    else if (lock->get_parent()->is_auth())
      try_eval(lock, pneed_issue);
  }
}
1410 bool Locker::can_rdlock_set(MutationImpl::LockOpVec
& lov
)
1412 dout(10) << "can_rdlock_set " << dendl
;
1413 for (const auto& p
: lov
) {
1414 ceph_assert(p
.is_rdlock());
1415 if (!p
.lock
->can_rdlock(-1)) {
1416 dout(10) << "can_rdlock_set can't rdlock " << *p
<< " on " << *p
.lock
->get_parent() << dendl
;
1424 void Locker::rdlock_take_set(MutationImpl::LockOpVec
& lov
, MutationRef
& mut
)
1426 dout(10) << "rdlock_take_set " << dendl
;
1427 for (const auto& p
: lov
) {
1428 ceph_assert(p
.is_rdlock());
1429 p
.lock
->get_rdlock();
1430 mut
->locks
.emplace(p
.lock
, MutationImpl::LockOp::RDLOCK
);
1434 // ------------------
1437 void Locker::wrlock_force(SimpleLock
*lock
, MutationRef
& mut
)
1439 if (lock
->get_type() == CEPH_LOCK_IVERSION
||
1440 lock
->get_type() == CEPH_LOCK_DVERSION
)
1441 return local_wrlock_grab(static_cast<LocalLock
*>(lock
), mut
);
1443 dout(7) << "wrlock_force on " << *lock
1444 << " on " << *lock
->get_parent() << dendl
;
1445 lock
->get_wrlock(true);
1446 mut
->locks
.emplace(lock
, MutationImpl::LockOp::WRLOCK
);
// Acquire a wrlock for request 'mut'.  Version locks redirect to the local
// lock path.  On the auth this may initiate a state change (scatter_mix);
// on a replica it requests a scatter from the auth.  'nowait' means the
// caller already holds an open log entry and must not block or trigger
// nested state changes on dirty scatter data.
//
// NOTE(review): extraction dropped lines (retry-loop header, wrlock ref
// acquisition, else-branches, returns); gaps are marked below.  Braces
// restored for readability.  Verify against upstream src/mds/Locker.cc.
bool Locker::wrlock_start(SimpleLock *lock, MDRequestRef& mut, bool nowait)
{
  if (lock->get_type() == CEPH_LOCK_IVERSION ||
      lock->get_type() == CEPH_LOCK_DVERSION)
    return local_wrlock_start(static_cast<LocalLock *>(lock), mut);

  dout(10) << "wrlock_start " << *lock << " on " << *lock->get_parent() << dendl;

  CInode *in = static_cast<CInode *>(lock->get_parent());
  client_t client = mut->get_client();
  bool want_scatter = !nowait && lock->get_parent()->is_auth() &&
                      (in->has_subtree_or_exporting_dirfrag() ||
                       static_cast<ScatterLock *>(lock)->get_scatter_wanted());

  // [extraction gap: retry-loop header dropped]
  if (lock->can_wrlock(client) &&
      (!want_scatter || lock->get_state() == LOCK_MIX)) {
    // [extraction gap: wrlock ref acquisition dropped]
    auto it = mut->locks.emplace_hint(mut->locks.end(), lock, MutationImpl::LockOp::WRLOCK);
    it->flags |= MutationImpl::LockOp::WRLOCK;  // may already remote_wrlocked
    // [extraction gap: success return dropped]
  }

  if (lock->get_type() == CEPH_LOCK_IFILE &&
      in->state_test(CInode::STATE_RECOVERING)) {
    mds->mdcache->recovery_queue.prioritize(in);
  }

  if (!lock->is_stable())
    ;  // [extraction gap: loop exit dropped]

  if (in->is_auth()) {
    // don't do nested lock state change if we have dirty scatterdata and
    // may scatter_writebehind or start_scatter, because nowait==true implies
    // that the caller already has a log entry open!
    if (nowait && lock->is_dirty())
      ;  // [extraction gap: early return dropped]

    // [extraction gap: want_scatter test dropped]
      scatter_mix(static_cast<ScatterLock *>(lock));
    // [extraction gap: else branch (non-scatter state change) dropped]

    if (nowait && !lock->can_wrlock(client))
      ;  // [extraction gap: early return dropped]
  } else {
    // replica.
    // auth should be auth_pinned (see acquire_locks wrlock weird mustpin case).
    mds_rank_t auth = lock->get_parent()->authority().first;
    if (!mds->is_cluster_degraded() ||
        mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
      dout(10) << "requesting scatter from auth on "
               << *lock << " on " << *lock->get_parent() << dendl;
      mds->send_message_mds(MLock::create(lock, LOCK_AC_REQSCATTER, mds->get_nodeid()), auth);
    }
  }

  // [extraction gap: wait-guard dropped]
  dout(7) << "wrlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl;
  lock->add_waiter(SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
  // [extraction gap: tail (log nudge / return false) dropped]
}
// Release a wrlock recorded at 'it' in mut->locks, then re-evaluate the lock
// if it is no longer wrlocked.  Version locks redirect to the local path.
//
// NOTE(review): extraction dropped the wrlock ref release and the
// remote-wrlock branch body; gaps are marked below.  Braces restored for
// readability.  Verify against upstream src/mds/Locker.cc.
void Locker::wrlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut, bool *pneed_issue)
{
  ceph_assert(it->is_wrlock());
  SimpleLock* lock = it->lock;

  if (lock->get_type() == CEPH_LOCK_IVERSION ||
      lock->get_type() == CEPH_LOCK_DVERSION)
    return local_wrlock_finish(it, mut);

  dout(7) << "wrlock_finish on " << *lock << " on " << *lock->get_parent() << dendl;

  // [extraction gap: wrlock ref release dropped]
  if (it->is_remote_wrlock())
    ;  // [extraction gap: clear local wrlock flag; else:]
  mut->locks.erase(it);

  if (!lock->is_wrlocked()) {
    if (!lock->is_stable())
      eval_gather(lock, false, pneed_issue);
    else if (lock->get_parent()->is_auth())
      try_eval(lock, pneed_issue);
  }
}
1547 void Locker::remote_wrlock_start(SimpleLock
*lock
, mds_rank_t target
, MDRequestRef
& mut
)
1549 dout(7) << "remote_wrlock_start mds." << target
<< " on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
1551 // wait for active target
1552 if (mds
->is_cluster_degraded() &&
1553 !mds
->mdsmap
->is_clientreplay_or_active_or_stopping(target
)) {
1554 dout(7) << " mds." << target
<< " is not active" << dendl
;
1555 if (mut
->more()->waiting_on_slave
.empty())
1556 mds
->wait_for_active_peer(target
, new C_MDS_RetryRequest(mdcache
, mut
));
1560 // send lock request
1561 mut
->start_locking(lock
, target
);
1562 mut
->more()->slaves
.insert(target
);
1563 auto r
= MMDSSlaveRequest::create(mut
->reqid
, mut
->attempt
, MMDSSlaveRequest::OP_WRLOCK
);
1564 r
->set_lock_type(lock
->get_type());
1565 lock
->get_parent()->set_object_info(r
->get_object_info());
1566 mds
->send_message_mds(r
, target
);
1568 ceph_assert(mut
->more()->waiting_on_slave
.count(target
) == 0);
1569 mut
->more()->waiting_on_slave
.insert(target
);
1572 void Locker::remote_wrlock_finish(const MutationImpl::lock_iterator
& it
, MutationImpl
*mut
)
1574 ceph_assert(it
->is_remote_wrlock());
1575 SimpleLock
*lock
= it
->lock
;
1576 mds_rank_t target
= it
->wrlock_target
;
1578 if (it
->is_wrlock())
1579 it
->clear_remote_wrlock();
1581 mut
->locks
.erase(it
);
1583 dout(7) << "remote_wrlock_finish releasing remote wrlock on mds." << target
1584 << " " << *lock
->get_parent() << dendl
;
1585 if (!mds
->is_cluster_degraded() ||
1586 mds
->mdsmap
->get_state(target
) >= MDSMap::STATE_REJOIN
) {
1587 auto slavereq
= MMDSSlaveRequest::create(mut
->reqid
, mut
->attempt
, MMDSSlaveRequest::OP_UNWRLOCK
);
1588 slavereq
->set_lock_type(lock
->get_type());
1589 lock
->get_parent()->set_object_info(slavereq
->get_object_info());
1590 mds
->send_message_mds(slavereq
, target
);
1595 // ------------------
// Acquire an xlock for request 'mut'.  On the auth: take the xlock when
// possible, else drive the lock toward LOCK_XLOCK and queue a retry.  On a
// replica (remote xlock): forward a slave OP_XLOCK to the auth and wait.
//
// NOTE(review): extraction dropped lines (retry-loop header, state-change
// branch bodies, returns, else); gaps are marked below.  Braces restored
// for readability.  Verify against upstream src/mds/Locker.cc.
bool Locker::xlock_start(SimpleLock *lock, MDRequestRef& mut)
{
  if (lock->get_type() == CEPH_LOCK_IVERSION ||
      lock->get_type() == CEPH_LOCK_DVERSION)
    return local_xlock_start(static_cast<LocalLock *>(lock), mut);

  dout(7) << "xlock_start on " << *lock << " on " << *lock->get_parent() << dendl;
  client_t client = mut->get_client();

  CInode *in = nullptr;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  // auth?
  if (lock->get_parent()->is_auth()) {
    // [extraction gap: retry-loop header dropped]
    if (lock->can_xlock(client) &&
        !(lock->get_state() == LOCK_LOCK_XLOCK &&   // client is not xlocker or
          in && in->issued_caps_need_gather(lock))) { // xlocker does not hold shared cap
      lock->set_state(LOCK_XLOCK);
      lock->get_xlock(mut, client);
      mut->locks.emplace_hint(mut->locks.end(), lock, MutationImpl::LockOp::XLOCK);
      mut->finish_locking(lock);
      // [extraction gap: success return dropped]
    }

    if (lock->get_type() == CEPH_LOCK_IFILE &&
        in->state_test(CInode::STATE_RECOVERING)) {
      mds->mdcache->recovery_queue.prioritize(in);
    }

    if (!lock->is_stable() && (lock->get_state() != LOCK_XLOCKDONE ||
                               lock->get_xlock_by_client() != client ||
                               lock->is_waiter_for(SimpleLock::WAIT_STABLE)))
      ;  // [extraction gap: loop exit dropped]

    if (lock->get_state() == LOCK_LOCK || lock->get_state() == LOCK_XLOCKDONE) {
      mut->start_locking(lock);
      // [extraction gap: xlock state-change call and else-branch dropped]
    }

    lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
    // [extraction gap: tail (log nudge / return false) dropped]
  } else {
    // replica: remote xlock
    ceph_assert(lock->get_sm()->can_remote_xlock);
    ceph_assert(!mut->slave_request);

    // wait for single auth
    if (lock->get_parent()->is_ambiguous_auth()) {
      lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH,
                                     new C_MDS_RetryRequest(mdcache, mut));
      // [extraction gap: return dropped]
    }

    // wait for active auth
    mds_rank_t auth = lock->get_parent()->authority().first;
    if (mds->is_cluster_degraded() &&
        !mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
      dout(7) << " mds." << auth << " is not active" << dendl;
      if (mut->more()->waiting_on_slave.empty())
        mds->wait_for_active_peer(auth, new C_MDS_RetryRequest(mdcache, mut));
      // [extraction gap: return dropped]
    }

    // send lock request
    mut->more()->slaves.insert(auth);
    mut->start_locking(lock, auth);
    auto r = MMDSSlaveRequest::create(mut->reqid, mut->attempt, MMDSSlaveRequest::OP_XLOCK);
    r->set_lock_type(lock->get_type());
    lock->get_parent()->set_object_info(r->get_object_info());
    mds->send_message_mds(r, auth);

    ceph_assert(mut->more()->waiting_on_slave.count(auth) == 0);
    mut->more()->waiting_on_slave.insert(auth);
    // [extraction gap: return dropped]
  }
}
1683 void Locker::_finish_xlock(SimpleLock
*lock
, client_t xlocker
, bool *pneed_issue
)
1685 ceph_assert(!lock
->is_stable());
1686 if (lock
->get_type() != CEPH_LOCK_DN
&&
1687 lock
->get_type() != CEPH_LOCK_ISNAP
&&
1688 lock
->get_num_rdlocks() == 0 &&
1689 lock
->get_num_wrlocks() == 0 &&
1690 !lock
->is_leased() &&
1691 lock
->get_state() != LOCK_XLOCKSNAP
) {
1692 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
1693 client_t loner
= in
->get_target_loner();
1694 if (loner
>= 0 && (xlocker
< 0 || xlocker
== loner
)) {
1695 lock
->set_state(LOCK_EXCL
);
1696 lock
->get_parent()->auth_unpin(lock
);
1697 lock
->finish_waiters(SimpleLock::WAIT_STABLE
|SimpleLock::WAIT_WR
|SimpleLock::WAIT_RD
);
1698 if (lock
->get_cap_shift())
1699 *pneed_issue
= true;
1700 if (lock
->get_parent()->is_auth() &&
1702 try_eval(lock
, pneed_issue
);
1706 // the xlocker may have CEPH_CAP_GSHARED, need to revoke it if next state is LOCK_LOCK
1707 eval_gather(lock
, lock
->get_state() != LOCK_XLOCKSNAP
, pneed_issue
);
// Release an xlock recorded at 'it'.  For a remote xlock, notify the auth
// (slave OP_UNXLOCK) and wake local waiters; for a local xlock, run the
// _finish_xlock tail once no xlock remains.  May flag a cap re-issue.
//
// NOTE(review): extraction dropped lines (xlock ref release, else, issue
// branch, returns); gaps are marked below.  Braces restored for
// readability.  Verify against upstream src/mds/Locker.cc.
void Locker::xlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut, bool *pneed_issue)
{
  ceph_assert(it->is_xlock());
  SimpleLock *lock = it->lock;

  if (lock->get_type() == CEPH_LOCK_IVERSION ||
      lock->get_type() == CEPH_LOCK_DVERSION)
    return local_xlock_finish(it, mut);

  dout(10) << "xlock_finish on " << *lock << " " << *lock->get_parent() << dendl;

  client_t xlocker = lock->get_xlock_by_client();

  // [extraction gap: xlock ref release dropped]
  mut->locks.erase(it);

  bool do_issue = false;

  // remote xlock?
  if (!lock->get_parent()->is_auth()) {
    ceph_assert(lock->get_sm()->can_remote_xlock);

    // tell auth
    dout(7) << "xlock_finish releasing remote xlock on " << *lock->get_parent() << dendl;
    mds_rank_t auth = lock->get_parent()->authority().first;
    if (!mds->is_cluster_degraded() ||
        mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) {
      auto slavereq = MMDSSlaveRequest::create(mut->reqid, mut->attempt, MMDSSlaveRequest::OP_UNXLOCK);
      slavereq->set_lock_type(lock->get_type());
      lock->get_parent()->set_object_info(slavereq->get_object_info());
      mds->send_message_mds(slavereq, auth);
    }
    // others waiting?
    lock->finish_waiters(SimpleLock::WAIT_STABLE |
                         SimpleLock::WAIT_WR |
                         SimpleLock::WAIT_RD, 0);
  } else {  // (restored)
    if (lock->get_num_xlocks() == 0 &&
        lock->get_state() != LOCK_LOCK_XLOCK) { // no one is taking xlock
      _finish_xlock(lock, xlocker, &do_issue);
    }
  }

  // [extraction gap: do_issue guard dropped]
  CInode *in = static_cast<CInode *>(lock->get_parent());
  if (in->is_head()) {
    // [extraction gap: pneed_issue null-check dropped]
    *pneed_issue = true;
    // [extraction gap: else (immediate issue_caps) dropped]
  }
}
// Drop an xlock as part of exporting its parent (which must be flagged
// STATE_AMBIGUOUSAUTH) to another MDS; park the lock in LOCK_LOCK.
//
// NOTE(review): extraction dropped the xlock ref release; gap marked below.
// Braces restored for readability.  Verify against upstream
// src/mds/Locker.cc.
void Locker::xlock_export(const MutationImpl::lock_iterator& it, MutationImpl *mut)
{
  ceph_assert(it->is_xlock());
  SimpleLock *lock = it->lock;
  dout(10) << "xlock_export on " << *lock << " " << *lock->get_parent() << dendl;

  // [extraction gap: xlock ref release dropped]
  mut->locks.erase(it);

  MDSCacheObject *p = lock->get_parent();
  ceph_assert(p->state_test(CInode::STATE_AMBIGUOUSAUTH));  // we are exporting this (inode)

  if (!lock->is_stable())
    lock->get_parent()->auth_unpin(lock);

  lock->set_state(LOCK_LOCK);
}
1784 void Locker::xlock_import(SimpleLock
*lock
)
1786 dout(10) << "xlock_import on " << *lock
<< " " << *lock
->get_parent() << dendl
;
1787 lock
->get_parent()->auth_pin(lock
);
1792 // file i/o -----------------------------------------
1794 version_t
Locker::issue_file_data_version(CInode
*in
)
1796 dout(7) << "issue_file_data_version on " << *in
<< dendl
;
1797 return in
->inode
.file_data_version
;
// Journal-flush completion context for cap-driven inode updates: once the
// log entry is safe, calls back into Locker::file_update_finish().  Pins the
// inode pointer (PIN_PTRWAITER) for the lifetime of the context.
//
// NOTE(review): extraction dropped the member declarations used by the
// constructor (in/mut/flags/client) and access specifiers; gap marked
// below.  Structure restored for readability.  Verify against upstream
// src/mds/Locker.cc.
class C_Locker_FileUpdate_finish : public LockerLogContext {
  // [extraction gap: CInode *in; MutationRef mut; unsigned flags;
  //  client_t client; and access specifiers dropped]
  MClientCaps::ref ack;
public:
  C_Locker_FileUpdate_finish(Locker *l, CInode *i, MutationRef& m, unsigned f,
                             const MClientCaps::ref &ack, client_t c=-1)
    : LockerLogContext(l), in(i), mut(m), flags(f), client(c), ack(ack) {
    in->get(CInode::PIN_PTRWAITER);  // keep the inode pointer valid until finish()
  }
  void finish(int r) override {
    locker->file_update_finish(in, mut, flags, client, ack);
    in->put(CInode::PIN_PTRWAITER);
  }
};
// Flag bits for Locker::file_update_finish()'s 'flags' argument.
// NOTE(review): the enclosing enum header/footer was lost in extraction.
  UPDATE_SHAREMAX = 1,    // consider sharing a larger max_size with the client
  UPDATE_NEEDSISSUE = 2,  // re-issue caps after the update
  UPDATE_SNAPFLUSH = 4,   // this update completes a snap cap flush
// Completion of a cap-driven inode update after the journal entry is safe:
// apply the projected inode, ack the client flush, drop request locks, and
// either re-issue caps (head inode) or account snap-flush completion
// (non-head).  Finally charge write activity to the balancer.
//
// NOTE(review): extraction dropped lines (mut->apply/cleanup, ack guard,
// else branches, snap-cap bookkeeping, iterator advances); gaps are marked
// below.  Braces restored for readability.  Verify against upstream
// src/mds/Locker.cc.
void Locker::file_update_finish(CInode *in, MutationRef& mut, unsigned flags,
                                client_t client, const MClientCaps::ref &ack)
{
  dout(10) << "file_update_finish on " << *in << dendl;
  in->pop_and_dirty_projected_inode(mut->ls);

  // [extraction gap: mutation apply and ack null-check dropped]
  Session *session = mds->get_session(client);
  // [extraction gap: session null-check dropped]
  // "oldest flush tid" > 0 means client uses unique TID for each flush
  if (ack->get_oldest_flush_tid() > 0)
    session->add_completed_flush(ack->get_client_tid());
  mds->send_message_client_counted(ack, session);
  // [extraction gap: else branch:]
  dout(10) << " no session for client." << client << " " << *ack << dendl;

  set<CInode*> need_issue;
  drop_locks(mut.get(), &need_issue);

  if (in->is_head()) {
    if ((flags & UPDATE_NEEDSISSUE) && need_issue.count(in) == 0) {
      Capability *cap = in->get_client_cap(client);
      if (cap && (cap->wanted() & ~cap->pending()))
        issue_caps(in, cap);
    }

    if ((flags & UPDATE_SHAREMAX) && in->is_auth() &&
        (in->filelock.gcaps_allowed(CAP_LONER) & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)))
      share_inode_max_size(in);

  } else if ((flags & UPDATE_SNAPFLUSH) && !in->client_snap_caps.empty()) {
    dout(10) << " client_snap_caps " << in->client_snap_caps << dendl;
    // check for snap writeback completion
    bool gather = false;
    auto p = in->client_snap_caps.begin();
    while (p != in->client_snap_caps.end()) {
      auto q = p->second.find(client);
      if (q != p->second.end()) {
        SimpleLock *lock = in->get_lock(p->first);
        // [extraction gap: assertion on lock dropped]
        dout(10) << " completing client_snap_caps for " << ccap_string(p->first)
                 << " lock " << *lock << " on " << *in << dendl;
        // [extraction gap: erase of q / lock ref release dropped]
        if (p->second.empty()) {
          // [extraction gap: lock release dropped]
          in->client_snap_caps.erase(p++);
          // [extraction gap: gather bookkeeping / else ++p dropped]
        }
      }
      // [extraction gap: loop advance for non-matching entries dropped]
    }
    if (in->client_snap_caps.empty()) {
      in->item_open_file.remove_myself();
      in->item_caps.remove_myself();
    }
    // [extraction gap: gather guard dropped]
    eval_cap_gather(in, &need_issue);
  }

  issue_caps_set(need_issue);

  mds->balancer->hit_inode(in, META_POP_IWR);

  // auth unpin after issuing caps
  // [extraction gap: cleanup tail dropped]
}
// Register (or augment) a client capability on an inode for an open with
// the given mode.  In replay, just try to reconnect the cap.  On the auth,
// twiddle lock states toward the wanted caps; on a replica, tell the auth
// what we want.  Cap messages are suppressed for a brand-new cap so they
// can be bundled with the open() reply.
//
// NOTE(review): extraction dropped the signature tail (mode/session/realm/
// is_replay parameters), the is_new bookkeeping and some guards; gaps are
// marked below.  Braces restored for readability.  Verify against upstream
// src/mds/Locker.cc.
Capability* Locker::issue_new_caps(CInode *in,
    /* [extraction gap: int mode, Session *session, SnapRealm *realm,
        and a replay flag parameter dropped] */)
{
  dout(7) << "issue_new_caps for mode " << mode << " on " << *in << dendl;
  // [extraction gap: is_new declaration dropped]

  // if replay, try to reconnect cap, and otherwise do nothing.
  // [extraction gap: replay guard dropped]
  return mds->mdcache->try_reconnect_cap(in, session);

  ceph_assert(session->info.inst.name.is_client());
  client_t my_client = session->get_client();
  int my_want = ceph_caps_for_mode(mode);

  // register a capability
  Capability *cap = in->get_client_cap(my_client);
  // [extraction gap: null-cap branch header / is_new bookkeeping dropped]
  cap = in->add_client_cap(my_client, session, realm);
  cap->set_wanted(my_want);
  cap->inc_suppress();  // suppress file cap messages for new cap (we'll bundle with the open() reply)
  // [extraction gap: else branch (existing cap) dropped]

  // make sure it wants sufficient caps
  if (my_want & ~cap->wanted()) {
    // augment wanted caps for this client
    cap->set_wanted(cap->wanted() | my_want);
  }

  if (in->is_auth()) {
    // [auth] twiddle mode?
    eval(in, CEPH_CAP_LOCKS);

    if (_need_flush_mdlog(in, my_want))
      mds->mdlog->flush();
  } else {
    // [replica] tell auth about any new caps wanted
    request_inode_file_caps(in);
  }

  // issue caps (pot. incl new one)
  //issue_caps(in);  // note: _eval above may have done this already...

  // re-issue whatever we can
  //cap->issue(cap->pending());

  // [extraction gap: new-cap guard dropped]
  cap->dec_suppress();

  // [extraction gap: return of cap dropped]
}
1958 void Locker::issue_caps_set(set
<CInode
*>& inset
)
1960 for (set
<CInode
*>::iterator p
= inset
.begin(); p
!= inset
.end(); ++p
)
// (Re)issue capabilities on a head inode: for each client cap (or just
// 'only_cap'), compute the allowed mask from the lock states (loner /
// xlocker / others), revoke anything pending beyond allowed, grant missing
// wanted+allowed bits, and send an MClientCaps GRANT/REVOKE.  Revocations
// are tracked in revoking_caps[_by_client] for stale-cap warnings.
//
// NOTE(review): extraction dropped many lines (counters, continue/else,
// seq declaration, several message-constructor arguments, the return);
// gaps are marked below.  Braces restored for readability.  Verify against
// upstream src/mds/Locker.cc.
int Locker::issue_caps(CInode *in, Capability *only_cap)
{
  // allowed caps are determined by the lock mode.
  int all_allowed = in->get_caps_allowed_by_type(CAP_ANY);
  int loner_allowed = in->get_caps_allowed_by_type(CAP_LONER);
  int xlocker_allowed = in->get_caps_allowed_by_type(CAP_XLOCKER);

  client_t loner = in->get_loner();
  // [extraction gap: loner >= 0 guard dropped]
  dout(7) << "issue_caps loner client." << loner
          << " allowed=" << ccap_string(loner_allowed)
          << ", xlocker allowed=" << ccap_string(xlocker_allowed)
          << ", others allowed=" << ccap_string(all_allowed)
          << " on " << *in << dendl;
  // [extraction gap: else branch:]
  dout(7) << "issue_caps allowed=" << ccap_string(all_allowed)
          << ", xlocker allowed=" << ccap_string(xlocker_allowed)
          << " on " << *in << dendl;

  ceph_assert(in->is_head());

  // count conflicts with
  // [extraction gap: issue counter declaration dropped]
  map<client_t, Capability>::iterator it;
  if (only_cap)  // [guard restored]
    it = in->client_caps.find(only_cap->get_client());
  else  // (restored)
    it = in->client_caps.begin();
  for (; it != in->client_caps.end(); ++it) {
    Capability *cap = &it->second;
    if (cap->is_stale())
      ;  // [extraction gap: continue dropped]

    // do not issue _new_ bits when size|mtime is projected
    // [extraction gap: declaration of allowed dropped]
    if (loner == it->first)
      allowed = loner_allowed;
    else  // (restored)
      allowed = all_allowed;

    // add in any xlocker-only caps (for locks this client is the xlocker for)
    allowed |= xlocker_allowed & in->get_xlocker_mask(it->first);

    if ((in->inode.inline_data.version != CEPH_INLINE_NONE &&
         cap->is_noinline()) ||
        (!in->inode.layout.pool_ns.empty() &&
         cap->is_nopoolns()))
      allowed &= ~(CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR);

    int pending = cap->pending();
    int wanted = cap->wanted();

    dout(20) << " client." << it->first
             << " pending " << ccap_string(pending)
             << " allowed " << ccap_string(allowed)
             << " wanted " << ccap_string(wanted)
             /* [extraction gap: stream tail dropped] */;

    if (!(pending & ~allowed)) {
      // skip if suppress or new, and not revocation
      if (cap->is_new() || cap->is_suppress()) {
        dout(20) << " !revoke and new|suppressed, skipping client." << it->first << dendl;
        // [extraction gap: continue dropped]
      }
    }

    // notify clients about deleted inode, to make sure they release caps ASAP.
    if (in->inode.nlink == 0)
      wanted |= CEPH_CAP_LINK_SHARED;

    // are there caps that the client _wants_ and can have, but aren't pending?
    // or do we need to revoke?
    if (((wanted & allowed) & ~pending) ||  // missing wanted+allowed caps
        (pending & ~allowed)) {             // need to revoke ~allowed caps.
      // [extraction gap: issue bookkeeping dropped]
      // include caps that clients generally like, while we're at it.
      int likes = in->get_caps_liked();
      int before = pending;
      // [extraction gap: seq declaration dropped]
      if (pending & ~allowed)
        seq = cap->issue((wanted|likes) & allowed & pending);  // if revoking, don't issue anything new.
      else  // (restored)
        seq = cap->issue((wanted|likes) & allowed);
      int after = cap->pending();

      if (cap->is_new()) {
        // haven't send caps to client yet
        if (before & ~after)
          cap->confirm_receipt(seq, after);
      } else {  // (restored)
        dout(7) << "   sending MClientCaps to client." << it->first
                << " seq " << cap->get_last_seq()
                << " new pending " << ccap_string(after) << " was " << ccap_string(before)
                /* [extraction gap: stream tail dropped] */;

        int op = (before & ~after) ? CEPH_CAP_OP_REVOKE : CEPH_CAP_OP_GRANT;
        if (op == CEPH_CAP_OP_REVOKE) {
          // track the revocation so stale clients can be warned/evicted
          revoking_caps.push_back(&cap->item_revoking_caps);
          revoking_caps_by_client[cap->get_client()].push_back(&cap->item_client_revoking_caps);
          cap->set_last_revoke_stamp(ceph_clock_now());
          cap->reset_num_revoke_warnings();
        }

        auto m = MClientCaps::create(op, in->ino(),
                                     in->find_snaprealm()->inode->ino(),
                                     /* [extraction gap: cap id arg dropped] */
                                     cap->get_last_seq(),
                                     /* [extraction gap: pending/wanted/mseq args dropped] */
                                     mds->get_osd_epoch_barrier());
        in->encode_cap_message(m, cap);

        mds->send_message_client_counted(m, cap->get_session());
      }
    }
  }
  // [extraction gap: return dropped]
}
// Tell every cap-holding client about a truncate (CEPH_CAP_OP_TRUNC), then
// re-check max_size for an auth file inode.
//
// NOTE(review): extraction dropped two MClientCaps::create() arguments;
// gaps are marked below.  Braces restored for readability.  Verify against
// upstream src/mds/Locker.cc.
void Locker::issue_truncate(CInode *in)
{
  dout(7) << "issue_truncate on " << *in << dendl;

  for (auto &p : in->client_caps) {
    Capability *cap = &p.second;
    auto m = MClientCaps::create(CEPH_CAP_OP_TRUNC,
                                 /* [extraction gap: inode number arg dropped] */
                                 in->find_snaprealm()->inode->ino(),
                                 cap->get_cap_id(), cap->get_last_seq(),
                                 cap->pending(), cap->wanted(), 0,
                                 /* [extraction gap: mseq arg dropped] */
                                 mds->get_osd_epoch_barrier());
    in->encode_cap_message(m, cap);
    mds->send_message_client_counted(m, p.first);
  }

  // should we increase max_size?
  if (in->is_auth() && in->is_file())
    check_inode_max_size(in);
}
// Revoke caps held by a stale session: collect affected inodes, invalidate
// the session's cap generation, then re-evaluate the cap-related locks and
// either try_eval locally (auth) or re-request caps from the auth (replica).
// An auth inode with a writeable client range is flagged NEEDSRECOVER.
//
// NOTE(review): extraction dropped lines (iterator advance, the actual
// revoke, continue/break, auth/replica branch); gaps are marked below.
// Braces restored for readability.  Verify against upstream
// src/mds/Locker.cc.
void Locker::revoke_stale_caps(Session *session)
{
  dout(10) << "revoke_stale_caps for " << session->info.inst.name << dendl;

  std::vector<CInode*> to_eval;

  for (auto p = session->caps.begin(); !p.end(); ) {
    Capability *cap = *p;
    // [extraction gap: iterator advance dropped]
    if (!cap->is_notable()) {
      // the rest ones are not being revoked and don't have writeable range
      // and don't want exclusive caps or want file read/write. They don't
      // need recover, they don't affect eval_gather()/try_eval()
      // [extraction gap: loop exit dropped]
    }

    int issued = cap->issued();
    if (!(issued & ~CEPH_CAP_PIN))
      ;  // [extraction gap: continue dropped]

    CInode *in = cap->get_inode();
    dout(10) << " revoking " << ccap_string(issued) << " on " << *in << dendl;
    // [extraction gap: the cap revocation itself dropped]

    if (in->is_auth() &&
        in->inode.client_ranges.count(cap->get_client()))
      in->state_set(CInode::STATE_NEEDSRECOVER);

    // eval lock/inode may finish contexts, which may modify other cap's position
    // in the session->caps.
    to_eval.push_back(in);
  }

  // invalidate the rest
  session->inc_cap_gen();

  for (auto in : to_eval) {
    if (in->state_test(CInode::STATE_EXPORTINGCAPS))
      ;  // [extraction gap: continue dropped]

    if (!in->filelock.is_stable())
      eval_gather(&in->filelock);
    if (!in->linklock.is_stable())
      eval_gather(&in->linklock);
    if (!in->authlock.is_stable())
      eval_gather(&in->authlock);
    if (!in->xattrlock.is_stable())
      eval_gather(&in->xattrlock);

    // [extraction gap: auth guard dropped]
    try_eval(in, CEPH_CAP_LOCKS);
    // [extraction gap: else branch:]
    request_inode_file_caps(in);
  }
}
// A formerly-stale session is back: clear the stale flag on its caps and
// re-issue them.  With CEPHFS_FEATURE_LAZY_CAP_WANTED, non-notable caps can
// be skipped wholesale (mirrors the early-out in revoke_stale_caps).  Inodes
// mid cap-export are flagged EVALSTALECAPS for later re-evaluation.
//
// NOTE(review): extraction dropped lines (iterator advance, stale-flag
// clear, continue); gaps are marked below.  Braces restored for
// readability.  Verify against upstream src/mds/Locker.cc.
void Locker::resume_stale_caps(Session *session)
{
  dout(10) << "resume_stale_caps for " << session->info.inst.name << dendl;

  bool lazy = session->info.has_feature(CEPHFS_FEATURE_LAZY_CAP_WANTED);
  for (xlist<Capability*>::iterator p = session->caps.begin(); !p.end(); ) {
    Capability *cap = *p;
    // [extraction gap: iterator advance dropped]
    if (lazy && !cap->is_notable())
      break;  // see revoke_stale_caps()

    CInode *in = cap->get_inode();
    ceph_assert(in->is_head());
    dout(10) << " clearing stale flag on " << *in << dendl;
    // [extraction gap: the stale-flag clear itself dropped]

    if (in->state_test(CInode::STATE_EXPORTINGCAPS)) {
      // if export succeeds, the cap will be removed. if export fails,
      // we need to re-issue the cap if it's not stale.
      in->state_set(CInode::STATE_EVALSTALECAPS);
      // [extraction gap: continue dropped]
    }

    if (!in->is_auth() || !eval(in, CEPH_CAP_LOCKS))
      issue_caps(in, cap);
  }
}
// Remove every dentry lease held by a stale session.
//
// NOTE(review): extraction dropped the loop header and iterator advance;
// gaps are marked below.  Braces restored for readability.  Verify against
// upstream src/mds/Locker.cc.
void Locker::remove_stale_leases(Session *session)
{
  dout(10) << "remove_stale_leases for " << session->info.inst.name << dendl;
  xlist<ClientLease*>::iterator p = session->leases.begin();
  // [extraction gap: loop header over p dropped]
  ClientLease *l = *p;
  // [extraction gap: iterator advance dropped]
  CDentry *parent = static_cast<CDentry*>(l->parent);
  dout(15) << " removing lease on " << *parent << dendl;
  parent->remove_client_lease(l, this);
}
// Deferred retry of Locker::request_inode_file_caps(): queued while the
// inode's auth is ambiguous or the auth MDS is rejoining.  Pins the inode
// pointer (PIN_PTRWAITER) until finish() runs.
//
// NOTE(review): extraction dropped the CInode* member declaration and a
// guard inside finish(); gaps are marked below.  Structure restored for
// readability.  Verify against upstream src/mds/Locker.cc.
class C_MDL_RequestInodeFileCaps : public LockerContext {
  // [extraction gap: CInode *in member declaration dropped]
public:
  C_MDL_RequestInodeFileCaps(Locker *l, CInode *i) : LockerContext(l), in(i) {
    in->get(CInode::PIN_PTRWAITER);
  }
  void finish(int r) override {
    // [extraction gap: guard before the retry dropped]
    locker->request_inode_file_caps(in);
    in->put(CInode::PIN_PTRWAITER);
  }
};
2224 void Locker::request_inode_file_caps(CInode
*in
)
2226 ceph_assert(!in
->is_auth());
2228 int wanted
= in
->get_caps_wanted() & in
->get_caps_allowed_ever() & ~CEPH_CAP_PIN
;
2229 if (wanted
!= in
->replica_caps_wanted
) {
2230 // wait for single auth
2231 if (in
->is_ambiguous_auth()) {
2232 in
->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
,
2233 new C_MDL_RequestInodeFileCaps(this, in
));
2237 mds_rank_t auth
= in
->authority().first
;
2238 if (mds
->is_cluster_degraded() &&
2239 mds
->mdsmap
->get_state(auth
) == MDSMap::STATE_REJOIN
) {
2240 mds
->wait_for_active_peer(auth
, new C_MDL_RequestInodeFileCaps(this, in
));
2244 dout(7) << "request_inode_file_caps " << ccap_string(wanted
)
2245 << " was " << ccap_string(in
->replica_caps_wanted
)
2246 << " on " << *in
<< " to mds." << auth
<< dendl
;
2248 in
->replica_caps_wanted
= wanted
;
2250 if (!mds
->is_cluster_degraded() ||
2251 mds
->mdsmap
->is_clientreplay_or_active_or_stopping(auth
))
2252 mds
->send_message_mds(MInodeFileCaps::create(in
->ino(), in
->replica_caps_wanted
), auth
);
// Handle a MInodeFileCaps message from a replica MDS advertising what caps
// its clients want on an inode we are auth for.  Records the wanted mask and
// re-evaluates the cap-related locks.
void Locker::handle_inode_file_caps(const MInodeFileCaps::const_ref &m)
{
  // nobody should be talking to us during recovery.
  if (mds->get_state() < MDSMap::STATE_CLIENTREPLAY) {
    if (mds->get_want_state() >= MDSMap::STATE_CLIENTREPLAY) {
      // we're on our way up; requeue until replay completes
      mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
      return;
    }
    ceph_abort_msg("got unexpected message during recovery");
  }

  CInode *in = mdcache->get_inode(m->get_ino());
  mds_rank_t from = mds_rank_t(m->get_source().num());
  ceph_assert(in->is_auth());

  dout(7) << "handle_inode_file_caps replica mds." << from << " wants caps " << ccap_string(m->get_caps()) << " on " << *in << dendl;

  // remember the replica's aggregate wanted mask for this rank
  in->set_mds_caps_wanted(from, m->get_caps());

  // the changed wanted mask may allow/require lock state transitions
  try_eval(in, CEPH_CAP_LOCKS);
}
// Waiter context: re-runs Locker::check_inode_max_size() with the saved
// parameters once the inode unfreezes or its filelock stabilizes.  Pins the
// inode while queued so the pointer remains valid.
class C_MDL_CheckMaxSize : public LockerContext {
  CInode *in;              // target inode, pinned with PIN_PTRWAITER
  uint64_t new_max_size;   // requested max_size (0 = no explicit request)
  uint64_t newsize;        // new file size to record (0 = no size update)
  utime_t mtime;           // mtime accompanying the size update
public:
  C_MDL_CheckMaxSize(Locker *l, CInode *i, uint64_t _new_max_size,
                     uint64_t _newsize, utime_t _mtime) :
    LockerContext(l), in(i),
    new_max_size(_new_max_size), newsize(_newsize), mtime(_mtime)
  {
    in->get(CInode::PIN_PTRWAITER);
  }
  void finish(int r) override {
    // NOTE(review): retry is gated on is_auth() — authority may have moved
    // while we waited; confirm against upstream before relying on this.
    if (in->is_auth())
      locker->check_inode_max_size(in, false, new_max_size, newsize, mtime);
    in->put(CInode::PIN_PTRWAITER);
  }
};
2303 uint64_t Locker::calc_new_max_size(CInode::mempool_inode
*pi
, uint64_t size
)
2305 uint64_t new_max
= (size
+ 1) << 1;
2306 uint64_t max_inc
= g_conf()->mds_client_writeable_range_max_inc_objs
;
2308 max_inc
*= pi
->layout
.object_size
;
2309 new_max
= std::min(new_max
, size
+ max_inc
);
2311 return round_up_to(new_max
, pi
->get_layout_size_increment());
// Recompute the per-client writeable ranges for an inode.  Clients holding
// (or wanting) any FILE_WR-class cap get a range extending to the new max
// size; all others are dropped from the map.  Sets *max_increased when any
// client's upper bound grew.  When 'update' is set, per-cap clientwriteable
// flags are kept in sync as a side effect.
void Locker::calc_new_client_ranges(CInode *in, uint64_t size, bool update,
				    CInode::mempool_inode::client_range_map *new_ranges,
				    bool *max_increased)
{
  auto latest = in->get_projected_inode();
  uint64_t ms;
  if (latest->has_layout()) {
    ms = calc_new_max_size(latest, size);
  } else {
    // Layout-less directories like ~mds0/, have zero size
    ms = 0;
  }

  // increase ranges as appropriate.
  // shrink to 0 if no WR|BUFFER caps issued.
  for (auto &p : in->client_caps) {
    if ((p.second.issued() | p.second.wanted()) & CEPH_CAP_ANY_FILE_WR) {
      client_writeable_range_t& nr = (*new_ranges)[p.first];
      nr.range.first = 0;
      if (latest->client_ranges.count(p.first)) {
	// extend the existing range, never shrink it here
	client_writeable_range_t& oldr = latest->client_ranges[p.first];
	if (ms > oldr.range.last)
	  *max_increased = true;
	nr.range.last = std::max(ms, oldr.range.last);
	nr.follows = oldr.follows;
      } else {
	// brand-new range for this client
	*max_increased = true;
	nr.range.last = ms;
	nr.follows = in->first - 1;
      }
      if (update)
	p.second.mark_clientwriteable();
    } else {
      // client has no write-class caps: leave it out of new_ranges
      if (update)
	p.second.clear_clientwriteable();
    }
  }
}
// Re-evaluate (and if needed journal) an auth file inode's size/mtime and
// per-client writeable ranges.  Returns true if an update was journaled,
// false if it was a no-op or had to be deferred (frozen inode / filelock not
// wrlockable).  new_size/new_mtime of 0 mean "no size update requested".
bool Locker::check_inode_max_size(CInode *in, bool force_wrlock,
				  uint64_t new_max_size, uint64_t new_size,
				  utime_t new_mtime)
{
  ceph_assert(in->is_auth());
  ceph_assert(in->is_file());

  CInode::mempool_inode *latest = in->get_projected_inode();
  CInode::mempool_inode::client_range_map new_ranges;
  uint64_t size = latest->size;
  bool update_size = new_size > 0;
  bool update_max = false;
  bool max_increased = false;

  if (update_size) {
    // never shrink; fold the request into the current projected values
    new_size = size = std::max(size, new_size);
    new_mtime = std::max(new_mtime, latest->mtime);
    if (latest->size == new_size && latest->mtime == new_mtime)
      update_size = false;
  }

  // can we proceed now?  1 = yes, -1 = frozen, -2 = filelock not wrlockable
  int can_update = 1;
  if (in->is_frozen()) {
    can_update = -1;
  } else if (!force_wrlock && !in->filelock.can_wrlock(in->get_loner())) {
    // try to nudge the filelock into a wrlockable state
    if (in->filelock.is_stable()) {
      if (in->get_target_loner() >= 0)
	file_excl(&in->filelock);
      else
	simple_lock(&in->filelock);
    }
    if (!in->filelock.can_wrlock(in->get_loner()))
      can_update = -2;
  }

  calc_new_client_ranges(in, std::max(new_max_size, size), can_update > 0,
			 &new_ranges, &max_increased);

  if (max_increased || latest->client_ranges != new_ranges)
    update_max = true;

  if (!update_size && !update_max) {
    dout(20) << "check_inode_max_size no-op on " << *in << dendl;
    return false;
  }

  dout(10) << "check_inode_max_size new_ranges " << new_ranges
	   << " update_size " << update_size
	   << " on " << *in << dendl;

  if (can_update < 0) {
    // defer: requeue ourselves (with the original arguments) for later
    auto cms = new C_MDL_CheckMaxSize(this, in, new_max_size, new_size, new_mtime);
    if (can_update == -1) {
      dout(10) << "check_inode_max_size frozen, waiting on " << *in << dendl;
      in->add_waiter(CInode::WAIT_UNFREEZE, cms);
    } else {
      in->filelock.add_waiter(SimpleLock::WAIT_STABLE, cms);
      dout(10) << "check_inode_max_size can't wrlock, waiting on " << *in << dendl;
    }
    return false;
  }

  MutationRef mut(new MutationImpl());
  mut->ls = mds->mdlog->get_current_segment();

  auto &pi = in->project_inode();
  pi.inode.version = in->pre_dirty();

  if (update_max) {
    dout(10) << "check_inode_max_size client_ranges " << pi.inode.client_ranges << " -> " << new_ranges << dendl;
    pi.inode.client_ranges = new_ranges;
  }

  if (update_size) {
    dout(10) << "check_inode_max_size size " << pi.inode.size << " -> " << new_size << dendl;
    pi.inode.size = new_size;
    pi.inode.rstat.rbytes = new_size;
    dout(10) << "check_inode_max_size mtime " << pi.inode.mtime << " -> " << new_mtime << dendl;
    pi.inode.mtime = new_mtime;
    // keep ctime/rctime monotonically consistent with the new mtime
    if (new_mtime > pi.inode.ctime) {
      pi.inode.ctime = new_mtime;
      if (new_mtime > pi.inode.rstat.rctime)
	pi.inode.rstat.rctime = new_mtime;
    }
  }

  // use EOpen if the file is still open; otherwise, use EUpdate.
  // this is just an optimization to push open files forward into
  // newer log segments.
  LogEvent *le;
  EMetaBlob *metablob;
  if (in->is_any_caps_wanted() && in->last == CEPH_NOSNAP) {
    EOpen *eo = new EOpen(mds->mdlog);
    eo->add_ino(in->ino());
    metablob = &eo->metablob;
    le = eo;
  } else {
    EUpdate *eu = new EUpdate(mds->mdlog, "check_inode_max_size");
    metablob = &eu->metablob;
    le = eu;
  }
  mds->mdlog->start_entry(le);
  if (update_size) {  // FIXME if/when we do max_size nested accounting
    mdcache->predirty_journal_parents(mut, metablob, in, 0, PREDIRTY_PRIMARY);
    // no cow, here!
    CDentry *parent = in->get_projected_parent_dn();
    metablob->add_primary_dentry(parent, in, true);
  } else {
    metablob->add_dir_context(in->get_projected_parent_dn()->get_dir());
    mdcache->journal_dirty_inode(mut.get(), metablob, in);
  }
  mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut,
      UPDATE_SHAREMAX, MClientCaps::ref()));
  wrlock_force(&in->filelock, mut);  // wrlock for duration of journal
  mut->auth_pin(in);

  // make max_size _increase_ timely
  if (max_increased)
    mds->mdlog->flush();

  return true;
}
// Broadcast the (possibly increased) max_size to clients holding WR/BUFFER
// caps on the inode.  If only_cap is non-null, only that client's cap is
// considered; otherwise all caps are scanned.
void Locker::share_inode_max_size(CInode *in, Capability *only_cap)
{
  /*
   * only share if currently issued a WR cap.  if client doesn't have it,
   * file_max doesn't matter, and the client will get it if/when they get
   * the cap later.
   */
  dout(10) << "share_inode_max_size on " << *in << dendl;
  map<client_t, Capability>::iterator it;
  if (only_cap)
    it = in->client_caps.find(only_cap->get_client());
  else
    it = in->client_caps.begin();
  for (; it != in->client_caps.end(); ++it) {
    const client_t client = it->first;
    Capability *cap = &it->second;
    if (cap->is_suppress())
      continue;
    if (cap->pending() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER)) {
      dout(10) << "share_inode_max_size with client." << client << dendl;
      // bump last_seq so the client can correlate this GRANT
      cap->inc_last_seq();
      auto m = MClientCaps::create(CEPH_CAP_OP_GRANT,
                                   in->ino(),
                                   in->find_snaprealm()->inode->ino(),
                                   cap->get_cap_id(),
                                   cap->get_last_seq(),
                                   cap->pending(),
                                   cap->wanted(), 0,
                                   cap->get_mseq(),
                                   mds->get_osd_epoch_barrier());
      in->encode_cap_message(m, cap);
      mds->send_message_client_counted(m, client);
    }
    if (only_cap)
      break;
  }
}
2516 bool Locker::_need_flush_mdlog(CInode
*in
, int wanted
)
2518 /* flush log if caps are wanted by client but corresponding lock is unstable and locked by
2519 * pending mutations. */
2520 if (((wanted
& (CEPH_CAP_FILE_RD
|CEPH_CAP_FILE_WR
|CEPH_CAP_FILE_SHARED
|CEPH_CAP_FILE_EXCL
)) &&
2521 in
->filelock
.is_unstable_and_locked()) ||
2522 ((wanted
& (CEPH_CAP_AUTH_SHARED
|CEPH_CAP_AUTH_EXCL
)) &&
2523 in
->authlock
.is_unstable_and_locked()) ||
2524 ((wanted
& (CEPH_CAP_LINK_SHARED
|CEPH_CAP_LINK_EXCL
)) &&
2525 in
->linklock
.is_unstable_and_locked()) ||
2526 ((wanted
& (CEPH_CAP_XATTR_SHARED
|CEPH_CAP_XATTR_EXCL
)) &&
2527 in
->xattrlock
.is_unstable_and_locked()))
// Apply a client's new 'wanted' mask to its cap, guarding against stale
// messages via issue_seq, then perform the follow-on work: forward wants to
// the auth if we are a replica, prioritize recovery, and log an EOpen when
// the open file table says this inode should be journaled as open.
void Locker::adjust_cap_wanted(Capability *cap, int wanted, int issue_seq)
{
  if (ceph_seq_cmp(issue_seq, cap->get_last_issue()) == 0) {
    // message reflects our latest issue; take the new wanted mask verbatim
    dout(10) << " wanted " << ccap_string(cap->wanted())
	     << " -> " << ccap_string(wanted) << dendl;
    cap->set_wanted(wanted);
  } else if (wanted & ~cap->wanted()) {
    // stale seq, but the client is asking for MORE — widen, never narrow
    dout(10) << " wanted " << ccap_string(cap->wanted())
	     << " -> " << ccap_string(wanted)
	     << " (added caps even though we had seq mismatch!)" << dendl;
    cap->set_wanted(wanted | cap->wanted());
  } else {
    // stale seq and nothing new requested: ignore
    dout(10) << " NOT changing wanted " << ccap_string(cap->wanted())
	     << " -> " << ccap_string(wanted)
	     << " (issue_seq " << issue_seq << " != last_issue "
	     << cap->get_last_issue() << ")" << dendl;
    return;
  }

  CInode *cur = cap->get_inode();
  if (!cur->is_auth()) {
    // replica: propagate the (changed) want to the auth MDS
    request_inode_file_caps(cur);
    return;
  }

  if (cap->wanted()) {
    if (cur->state_test(CInode::STATE_RECOVERING) &&
	(cap->wanted() & (CEPH_CAP_FILE_RD|CEPH_CAP_FILE_WR))) {
      // a client actively wants file I/O; recover this inode sooner
      mds->mdcache->recovery_queue.prioritize(cur);
    }

    if (mdcache->open_file_table.should_log_open(cur)) {
      ceph_assert(cur->last == CEPH_NOSNAP);
      EOpen *le = new EOpen(mds->mdlog);
      mds->mdlog->start_entry(le);
      le->add_clean_inode(cur);
      mds->mdlog->submit_entry(le);
    }
  }
}
// Nudge a stalled snapflush along for a snapped inode: kick a suitable lock
// on the head inode so clients are prodded to flush, or requeue the snapped
// inode if no lock can be kicked right now.
void Locker::snapflush_nudge(CInode *in)
{
  ceph_assert(in->last != CEPH_NOSNAP);
  if (in->client_snap_caps.empty())
    return;

  CInode *head = mdcache->get_inode(in->ino());
  // head inode gets unpinned when snapflush starts. It might get trimmed
  // before snapflush finishes.
  if (!head)
    return;

  ceph_assert(head->is_auth());
  if (head->client_need_snapflush.empty())
    return;

  SimpleLock *hlock = head->get_lock(CEPH_LOCK_IFILE);
  if (hlock->get_state() == LOCK_SYNC || !hlock->is_stable()) {
    // filelock can't be kicked; look for any other stable, non-SYNC lock
    hlock = NULL;
    for (int i = 0; i < num_cinode_locks; i++) {
      SimpleLock *lock = head->get_lock(cinode_lock_info[i].lock);
      if (lock->get_state() != LOCK_SYNC && lock->is_stable()) {
	hlock = lock;
	break;
      }
    }
  }
  if (hlock) {
    _rdlock_kick(hlock, true);
  } else {
    // also, requeue, in case of unstable lock
    need_snapflush_inodes.push_back(&in->item_caps);
  }
}
// Queue a snapped inode onto the need_snapflush list (once), stamping the
// time it was added so later processing can tell how long it has waited.
void Locker::mark_need_snapflush_inode(CInode *in)
{
  ceph_assert(in->last != CEPH_NOSNAP);
  if (!in->item_caps.is_on_list()) {
    need_snapflush_inodes.push_back(&in->item_caps);
    utime_t now = ceph_clock_now();
    // NOTE(review): last_dirstat_prop is reused here as the enqueue
    // timestamp — confirm against the consumer of this list.
    in->last_dirstat_prop = now;
    dout(10) << "mark_need_snapflush_inode " << *in << " - added at " << now << dendl;
  }
}
// Synthesize empty (NULL) snapflushes for every pending snap < 'last' that
// this client owes on head_in.  Used when we can infer the client will never
// send the real FLUSHSNAPs (e.g. it released all write caps).
void Locker::_do_null_snapflush(CInode *head_in, client_t client, snapid_t last)
{
  dout(10) << "_do_null_snapflush client." << client << " on " << *head_in << dendl;
  for (auto p = head_in->client_need_snapflush.begin();
       p != head_in->client_need_snapflush.end() && p->first < last; ) {
    snapid_t snapid = p->first;
    auto &clients = p->second;
    ++p;  // be careful, q loop below depends on this
    // advance before mutating: remove_need_snapflush below may erase the
    // entry we were pointing at
    if (clients.count(client)) {
      dout(10) << " doing async NULL snapflush on " << snapid << " from client." << client << dendl;
      CInode *sin = mdcache->pick_inode_snap(head_in, snapid - 1);
      ceph_assert(sin);
      ceph_assert(sin->first <= snapid);
      // dirty=0, no client message, no ack: a pure bookkeeping flush
      _do_snap_update(sin, snapid, 0, sin->first - 1, client, MClientCaps::ref(), MClientCaps::ref());
      head_in->remove_need_snapflush(sin, snapid, client);
    }
  }
}
2641 bool Locker::should_defer_client_cap_frozen(CInode
*in
)
2644 * This policy needs to be AT LEAST as permissive as allowing a client request
2645 * to go forward, or else a client request can release something, the release
2646 * gets deferred, but the request gets processed and deadlocks because when the
2647 * caps can't get revoked.
2649 * Currently, a request wait if anything locked is freezing (can't
2650 * auth_pin), which would avoid any deadlock with cap release. Thus @in
2651 * _MUST_ be in the lock/auth_pin set.
2653 * auth_pins==0 implies no unstable lock and not auth pinnned by
2654 * client request, otherwise continue even it's freezing.
2656 return (in
->is_freezing() && in
->get_num_auth_pins() == 0) || in
->is_frozen();
// Main entry point for CEPH_MSG_CLIENT_CAPS messages: cap flushes, snap
// flushes, updates, and releases from clients.  Validates session and MDS
// state, handles duplicate-flush acks, snapflush bookkeeping, applies dirty
// metadata, adjusts wanted masks, and re-evaluates locks.
void Locker::handle_client_caps(const MClientCaps::const_ref &m)
{
  client_t client = m->get_source().num();
  snapid_t follows = m->get_snap_follows();
  auto op = m->get_op();
  auto dirty = m->get_dirty();
  dout(7) << "handle_client_caps "
	  << " on " << m->get_ino()
	  << " tid " << m->get_client_tid() << " follows " << follows
	  << " op " << ceph_cap_op_name(op)
	  << " flags 0x" << std::hex << m->flags << std::dec << dendl;

  Session *session = mds->get_session(m);
  if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) {
    // not ready to process client caps yet
    if (!session) {
      dout(5) << " no session, dropping " << *m << dendl;
      return;
    }
    if (session->is_closed() ||
	session->is_closing() ||
	session->is_killing()) {
      dout(7) << " session closed|closing|killing, dropping " << *m << dendl;
      return;
    }
    if ((mds->is_reconnect() || mds->get_want_state() == MDSMap::STATE_RECONNECT) &&
	dirty && m->get_client_tid() > 0 &&
	!session->have_completed_flush(m->get_client_tid())) {
      // remember dirty caps seen during reconnect so replay can reconcile
      mdcache->set_reconnected_dirty_caps(client, m->get_ino(), dirty,
					  op == CEPH_CAP_OP_FLUSHSNAP);
    }
    mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
    return;
  }

  if (m->get_client_tid() > 0 && session &&
      session->have_completed_flush(m->get_client_tid())) {
    // duplicate of an already-completed flush: just re-ack
    dout(7) << "handle_client_caps already flushed tid " << m->get_client_tid()
	    << " for client." << client << dendl;
    MClientCaps::ref ack;
    if (op == CEPH_CAP_OP_FLUSHSNAP) {
      ack = MClientCaps::create(CEPH_CAP_OP_FLUSHSNAP_ACK, m->get_ino(), 0, 0, 0, 0, 0, dirty, 0, mds->get_osd_epoch_barrier());
    } else {
      ack = MClientCaps::create(CEPH_CAP_OP_FLUSH_ACK, m->get_ino(), 0, m->get_cap_id(), m->get_seq(), m->get_caps(), 0, dirty, 0, mds->get_osd_epoch_barrier());
    }
    ack->set_snap_follows(follows);
    ack->set_client_tid(m->get_client_tid());
    mds->send_message_client_counted(ack, m->get_connection());
    if (op == CEPH_CAP_OP_FLUSHSNAP) {
      return;
    } else {
      // fall-thru because the message may release some caps
      dirty = false;
      op = CEPH_CAP_OP_UPDATE;
    }
  }

  // "oldest flush tid" > 0 means client uses unique TID for each flush
  if (m->get_oldest_flush_tid() > 0 && session) {
    if (session->trim_completed_flushes(m->get_oldest_flush_tid())) {
      mds->mdlog->get_current_segment()->touched_sessions.insert(session->info.inst.name);

      if (session->get_num_trim_flushes_warnings() > 0 &&
	  session->get_num_completed_flushes() * 2 < g_conf()->mds_max_completed_flushes)
	session->reset_num_trim_flushes_warnings();
    } else {
      // client is not advancing oldest_flush_tid; warn with backoff
      if (session->get_num_completed_flushes() >=
	  (g_conf()->mds_max_completed_flushes << session->get_num_trim_flushes_warnings())) {
	session->inc_num_trim_flushes_warnings();
	stringstream ss;
	ss << "client." << session->get_client() << " does not advance its oldest_flush_tid ("
	   << m->get_oldest_flush_tid() << "), "
	   << session->get_num_completed_flushes()
	   << " completed flushes recorded in session";
	mds->clog->warn() << ss.str();
	dout(20) << __func__ << " " << ss.str() << dendl;
      }
    }
  }

  CInode *head_in = mdcache->get_inode(m->get_ino());
  if (!head_in) {
    if (mds->is_clientreplay()) {
      dout(7) << "handle_client_caps on unknown ino " << m->get_ino()
	      << ", will try again after replayed client requests" << dendl;
      mdcache->wait_replay_cap_reconnect(m->get_ino(), new C_MDS_RetryMessage(mds, m));
      return;
    }

    /*
     * "handle_client_caps on unknown ino xxx” is normal after migrating a subtree
     * Sequence of events that cause this are:
     *   - client sends caps message to mds.a
     *   - mds finishes subtree migration, send cap export to client
     *   - mds trim its cache
     *   - mds receives cap messages from client
     */
    dout(7) << "handle_client_caps on unknown ino " << m->get_ino() << ", dropping" << dendl;
    return;
  }

  if (m->osd_epoch_barrier && !mds->objecter->have_map(m->osd_epoch_barrier)) {
    // Pause RADOS operations until we see the required epoch
    mds->objecter->set_epoch_barrier(m->osd_epoch_barrier);
  }

  if (mds->get_osd_epoch_barrier() < m->osd_epoch_barrier) {
    // Record the barrier so that we will retransmit it to clients
    mds->set_osd_epoch_barrier(m->osd_epoch_barrier);
  }

  dout(10) << " head inode " << *head_in << dendl;

  Capability *cap = 0;
  cap = head_in->get_client_cap(client);
  if (!cap) {
    dout(7) << "handle_client_caps no cap for client." << client << " on " << *head_in << dendl;
    return;
  }

  // freezing|frozen?
  if (should_defer_client_cap_frozen(head_in)) {
    dout(7) << "handle_client_caps freezing|frozen on " << *head_in << dendl;
    head_in->add_waiter(CInode::WAIT_UNFREEZE, new C_MDS_RetryMessage(mds, m));
    return;
  }
  if (ceph_seq_cmp(m->get_mseq(), cap->get_mseq()) < 0) {
    // stale migration seq: the cap has since been exported/imported
    dout(7) << "handle_client_caps mseq " << m->get_mseq() << " < " << cap->get_mseq()
	    << ", dropping" << dendl;
    return;
  }

  bool need_unpin = false;

  // flushsnap?
  if (op == CEPH_CAP_OP_FLUSHSNAP) {
    if (!head_in->is_auth()) {
      dout(7) << " not auth, ignoring flushsnap on " << *head_in << dendl;
      goto out;
    }

    SnapRealm *realm = head_in->find_snaprealm();
    snapid_t snap = realm->get_snap_following(follows);
    dout(10) << "  flushsnap follows " << follows << " -> snap " << snap << dendl;

    // first do any pending NULL snapflushes that precede this snap
    auto p = head_in->client_need_snapflush.begin();
    if (p != head_in->client_need_snapflush.end() && p->first < snap) {
      head_in->auth_pin(this); // prevent subtree frozen
      need_unpin = true;
      _do_null_snapflush(head_in, client, snap);
    }

    CInode *in = head_in;
    if (snap != CEPH_NOSNAP) {
      in = mdcache->pick_inode_snap(head_in, snap - 1);
      if (in != head_in)
	dout(10) << " snapped inode " << *in << dendl;
    }

    // we can prepare the ack now, since this FLUSHEDSNAP is independent of any
    // other cap ops.  (except possibly duplicate FLUSHSNAP requests, but worst
    // case we get a dup response, so whatever.)
    MClientCaps::ref ack;
    if (dirty) {
      ack = MClientCaps::create(CEPH_CAP_OP_FLUSHSNAP_ACK, in->ino(), 0, 0, 0, 0, 0, dirty, 0, mds->get_osd_epoch_barrier());
      ack->set_snap_follows(follows);
      ack->set_client_tid(m->get_client_tid());
      ack->set_oldest_flush_tid(m->get_oldest_flush_tid());
    }

    if (in == head_in ||
	(head_in->client_need_snapflush.count(snap) &&
	 head_in->client_need_snapflush[snap].count(client))) {
      dout(7) << " flushsnap snap " << snap
	      << " client." << client << " on " << *in << dendl;

      // this cap now follows a later snap (i.e. the one initiating this flush, or later)
      if (in == head_in)
	cap->client_follows = snap < CEPH_NOSNAP ? snap : realm->get_newest_seq();

      _do_snap_update(in, snap, dirty, follows, client, m, ack);

      if (in != head_in)
	head_in->remove_need_snapflush(in, snap, client);
    } else {
      // unexpected flushsnap: nothing to record, but still ack if dirty
      dout(7) << " not expecting flushsnap " << snap << " from client." << client << " on " << *in << dendl;
      if (ack)
	mds->send_message_client_counted(ack, m->get_connection());
    }
    goto out;
  }

  if (cap->get_cap_id() != m->get_cap_id()) {
    // message refers to a cap we no longer have (reissued since)
    dout(7) << " ignoring client capid " << m->get_cap_id() << " != my " << cap->get_cap_id() << dendl;
  } else {
    CInode *in = head_in;
    if (follows > 0) {
      in = mdcache->pick_inode_snap(head_in, follows);
      // intermediate snap inodes
      while (in != head_in) {
	ceph_assert(in->last != CEPH_NOSNAP);
	if (in->is_auth() && dirty) {
	  dout(10) << " updating intermediate snapped inode " << *in << dendl;
	  _do_cap_update(in, NULL, dirty, follows, m, MClientCaps::ref());
	}
	in = mdcache->pick_inode_snap(head_in, in->last);
      }
    }

    // head inode, and cap
    MClientCaps::ref ack;

    int caps = m->get_caps();
    if (caps & ~cap->issued()) {
      // client claims caps we never issued; trust only the intersection
      dout(10) << " confirming not issued caps " << ccap_string(caps & ~cap->issued()) << dendl;
      caps &= cap->issued();
    }

    cap->confirm_receipt(m->get_seq(), caps);
    dout(10) << " follows " << follows
	     << " retains " << ccap_string(m->get_caps())
	     << " dirty " << ccap_string(dirty)
	     << " on " << *in << dendl;

    // missing/skipped snapflush?
    //  The client MAY send a snapflush if it is issued WR/EXCL caps, but
    //  presently only does so when it has actual dirty metadata.  But, we
    //  set up the need_snapflush stuff based on the issued caps.
    //  We can infer that the client WONT send a FLUSHSNAP once they have
    //  released all WR/EXCL caps (the FLUSHSNAP always comes before the cap
    //  update/release.)
    if (!head_in->client_need_snapflush.empty()) {
      if (!(cap->issued() & CEPH_CAP_ANY_FILE_WR) &&
	  !(m->flags & MClientCaps::FLAG_PENDING_CAPSNAP)) {
	head_in->auth_pin(this); // prevent subtree frozen
	need_unpin = true;
	_do_null_snapflush(head_in, client);
      } else {
	dout(10) << " revocation in progress, not making any conclusions about null snapflushes" << dendl;
      }
    }

    bool need_snapflush = cap->need_snapflush();
    if (dirty && in->is_auth()) {
      // client is flushing dirty metadata; prepare the FLUSH_ACK now
      dout(7) << " flush client." << client << " dirty " << ccap_string(dirty)
	      << " seq " << m->get_seq() << " on " << *in << dendl;
      ack = MClientCaps::create(CEPH_CAP_OP_FLUSH_ACK, in->ino(), 0, cap->get_cap_id(), m->get_seq(),
	  m->get_caps(), 0, dirty, 0, mds->get_osd_epoch_barrier());
      ack->set_client_tid(m->get_client_tid());
      ack->set_oldest_flush_tid(m->get_oldest_flush_tid());

      // client flushes and releases caps at the same time. make sure MDCache::cow_inode()
      // properly setup CInode::client_need_snapflush
      if ((dirty & ~cap->issued()) && !need_snapflush)
	cap->mark_needsnapflush();
    }

    // filter wanted based on what we could ever give out (given auth/replica status)
    bool need_flush = m->flags & MClientCaps::FLAG_SYNC;
    int new_wanted = m->get_wanted();
    if (new_wanted != cap->wanted()) {
      if (!need_flush && in->is_auth() && (new_wanted & ~cap->pending())) {
	// exapnding caps.  make sure we aren't waiting for a log flush
	need_flush = _need_flush_mdlog(head_in, new_wanted & ~cap->pending());
      }

      adjust_cap_wanted(cap, new_wanted, m->get_issue_seq());
    }

    bool updated = in->is_auth() &&
		   _do_cap_update(in, cap, dirty, follows, m, ack, &need_flush);

    if (cap->need_snapflush() &&
	(!need_snapflush || !(m->flags & MClientCaps::FLAG_PENDING_CAPSNAP)))
      cap->clear_needsnapflush();

    if (updated) {
      eval(in, CEPH_CAP_LOCKS);

      if (!need_flush && (cap->wanted() & ~cap->pending()))
	need_flush = _need_flush_mdlog(in, cap->wanted() & ~cap->pending());
    } else {
      // no update, ack now.
      if (ack)
	mds->send_message_client_counted(ack, m->get_connection());

      bool did_issue = eval(in, CEPH_CAP_LOCKS);
      if (!did_issue && (cap->wanted() & ~cap->pending()))
	issue_caps(in, cap);

      if (cap->get_last_seq() == 0 &&
	  (cap->pending() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER))) {
	share_inode_max_size(in, cap);
      }
    }

    if (need_flush)
      mds->mdlog->flush();
  }

 out:
  if (need_unpin)
    head_in->auth_unpin(this);
}
// Waiter context: retries a deferred cap release (saved by value) once the
// inode unfreezes.  Retried with a null MDRequestRef and an empty dname —
// the lease part of the release was already handled on the first pass.
class C_Locker_RetryRequestCapRelease : public LockerContext {
  client_t client;                   // releasing client
  ceph_mds_request_release item;     // the release record, copied
public:
  C_Locker_RetryRequestCapRelease(Locker *l, client_t c, const ceph_mds_request_release& it) :
    LockerContext(l), client(c), item(it) { }
  void finish(int r) override {
    string dname;
    MDRequestRef null_ref;
    locker->process_request_cap_release(null_ref, client, item, dname);
  }
};
// Process one cap/lease release embedded in a client request: drop the
// dentry lease (if dname given), confirm the released caps against what we
// actually issued, trigger null snapflushes if write caps are gone, adjust
// wanted, and re-evaluate locks.  mdr may be null (deferred retry path).
void Locker::process_request_cap_release(MDRequestRef& mdr, client_t client, const ceph_mds_request_release& item,
					 std::string_view dname)
{
  inodeno_t ino = (uint64_t)item.ino;
  uint64_t cap_id = item.cap_id;
  int caps = item.caps;
  int wanted = item.wanted;
  int seq = item.seq;
  int issue_seq = item.issue_seq;
  int mseq = item.mseq;

  CInode *in = mdcache->get_inode(ino);
  if (!in)
    return;

  if (dname.length()) {
    // the release also names a dentry lease to drop
    frag_t fg = in->pick_dirfrag(dname);
    CDir *dir = in->get_dirfrag(fg);
    if (dir) {
      CDentry *dn = dir->lookup(dname);
      if (dn) {
	ClientLease *l = dn->get_client_lease(client);
	if (l) {
	  dout(10) << "process_cap_release removing lease on " << *dn << dendl;
	  dn->remove_client_lease(l, this);
	} else {
	  dout(7) << "process_cap_release client." << client
		  << " doesn't have lease on " << *dn << dendl;
	}
      } else {
	dout(7) << "process_cap_release client." << client << " released lease on dn "
		<< dir->dirfrag() << "/" << dname << " which dne" << dendl;
      }
    }
  }

  Capability *cap = in->get_client_cap(client);
  if (!cap)
    return;

  dout(10) << "process_cap_release client." << client << " " << ccap_string(caps) << " on " << *in
	   << (mdr ? "" : " (DEFERRED, no mdr)")
	   << dendl;

  if (ceph_seq_cmp(mseq, cap->get_mseq()) < 0) {
    // stale migration seq: cap was exported/imported since this was sent
    dout(7) << " mseq " << mseq << " < " << cap->get_mseq() << ", dropping" << dendl;
    return;
  }

  if (cap->get_cap_id() != cap_id) {
    dout(7) << " cap_id " << cap_id << " != " << cap->get_cap_id() << ", dropping" << dendl;
    return;
  }

  if (should_defer_client_cap_frozen(in)) {
    // inode frozen/freezing: retry the release after unfreeze
    dout(7) << " frozen, deferring" << dendl;
    in->add_waiter(CInode::WAIT_UNFREEZE, new C_Locker_RetryRequestCapRelease(this, client, item));
    return;
  }

  if (caps & ~cap->issued()) {
    // never trust caps we did not issue
    dout(10) << " confirming not issued caps " << ccap_string(caps & ~cap->issued()) << dendl;
    caps &= cap->issued();
  }
  cap->confirm_receipt(seq, caps);

  if (!in->client_need_snapflush.empty() &&
      (cap->issued() & CEPH_CAP_ANY_FILE_WR) == 0) {
    // client gave up all write caps: it will never send the pending snapflushes
    _do_null_snapflush(in, client);
  }

  adjust_cap_wanted(cap, wanted, issue_seq);

  // suppress re-issue to this cap while the request's own eval runs
  if (mdr)
    cap->inc_suppress();
  eval(in, CEPH_CAP_LOCKS);
  if (mdr)
    cap->dec_suppress();

  // take note; we may need to reissue on this cap later
  if (mdr)
    mdr->cap_releases[in->vino()] = cap->get_last_seq();
}
// Waiter context: retries Locker::kick_issue_caps() for one (inode, client,
// seq) triple after the inode unfreezes.  Pins the inode while queued.
class C_Locker_RetryKickIssueCaps : public LockerContext {
  CInode *in;       // target inode, pinned with PIN_PTRWAITER
  client_t client;  // client whose cap may need reissuing
  ceph_seq_t seq;   // cap seq captured when the retry was scheduled
public:
  C_Locker_RetryKickIssueCaps(Locker *l, CInode *i, client_t c, ceph_seq_t s) :
    LockerContext(l), in(i), client(c), seq(s) {
    in->get(CInode::PIN_PTRWAITER);
  }
  void finish(int r) override {
    locker->kick_issue_caps(in, client, seq);
    in->put(CInode::PIN_PTRWAITER);
  }
};
// Reissue caps to a client whose release we recorded, but only if the cap's
// seq is unchanged since then (i.e. nothing else already reissued).  Defers
// through a waiter if the inode is frozen.
void Locker::kick_issue_caps(CInode *in, client_t client, ceph_seq_t seq)
{
  Capability *cap = in->get_client_cap(client);
  if (!cap || cap->get_last_seq() != seq)
    return;   // cap gone or already advanced; nothing to do
  if (in->is_frozen()) {
    dout(10) << "kick_issue_caps waiting for unfreeze on " << *in << dendl;
    in->add_waiter(CInode::WAIT_UNFREEZE,
	new C_Locker_RetryKickIssueCaps(this, in, client, seq));
    return;
  }
  dout(10) << "kick_issue_caps released at current seq " << seq
	   << ", reissuing" << dendl;
  issue_caps(in, cap);
}
// After a request finishes, walk the cap releases it recorded and reissue
// caps on each inode that is still in cache.
void Locker::kick_cap_releases(MDRequestRef& mdr)
{
  client_t client = mdr->get_client();
  for (map<vinodeno_t,ceph_seq_t>::iterator p = mdr->cap_releases.begin();
       p != mdr->cap_releases.end();
       ++p) {
    CInode *in = mdcache->get_inode(p->first);
    if (!in)
      continue;   // inode was trimmed; nothing to kick
    kick_issue_caps(in, client, p->second);
  }
}
/**
 * Apply a client's FLUSHSNAP metadata to the appropriate (old or snapped)
 * inode and journal it.  m and ack might be NULL, so don't dereference them
 * unless dirty != 0.
 */
void Locker::_do_snap_update(CInode *in, snapid_t snap, int dirty, snapid_t follows, client_t client, const MClientCaps::const_ref &m, const MClientCaps::ref &ack)
{
  dout(10) << "_do_snap_update dirty " << ccap_string(dirty)
	   << " follows " << follows << " snap " << snap
	   << " on " << *in << dendl;

  if (snap == CEPH_NOSNAP) {
    // hmm, i guess snap was already deleted?  just ack!
    dout(10) << " wow, the snap following " << follows
	     << " was already deleted.  nothing to record, just ack." << dendl;
    if (ack)
      mds->send_message_client_counted(ack, m->get_connection());
    return;
  }

  EUpdate *le = new EUpdate(mds->mdlog, "snap flush");
  mds->mdlog->start_entry(le);
  MutationRef mut = new MutationImpl();
  mut->ls = mds->mdlog->get_current_segment();

  // normal metadata updates that we can apply to the head as well.

  // update xattrs? only if the flush carries a newer xattr_version
  CInode::mempool_xattr_map *px = nullptr;
  bool xattrs = (dirty & CEPH_CAP_XATTR_EXCL) &&
                m->xattrbl.length() &&
                m->head.xattr_version > in->get_projected_inode()->xattr_version;

  CInode::mempool_old_inode *oi = 0;
  if (in->is_multiversion()) {
    oi = in->pick_old_inode(snap);
  }

  // choose where to write: an old_inode slot or the projected head inode
  CInode::mempool_inode *i;
  if (oi) {
    dout(10) << " writing into old inode" << dendl;
    auto &pi = in->project_inode();
    pi.inode.version = in->pre_dirty();
    if (snap > oi->first)
      in->split_old_inode(snap);
    i = &oi->inode;
    if (xattrs)
      px = &oi->xattrs;
  } else {
    auto &pi = in->project_inode(xattrs);
    pi.inode.version = in->pre_dirty();
    i = &pi.inode;
    if (xattrs)
      px = pi.xattrs.get();
  }

  _update_cap_fields(in, dirty, m, i);

  // xattr
  if (xattrs) {
    dout(7) << " xattrs v" << i->xattr_version << " -> " << m->head.xattr_version
	    << " len " << m->xattrbl.length() << dendl;
    i->xattr_version = m->head.xattr_version;
    auto p = m->xattrbl.cbegin();
    decode(*px, p);
  }

  {
    // the client's writeable range no longer applies past this snap
    auto it = i->client_ranges.find(client);
    if (it != i->client_ranges.end()) {
      if (in->last == snap) {
	dout(10) << "  removing client_range entirely" << dendl;
	i->client_ranges.erase(it);
      } else {
	dout(10) << "  client_range now follows " << snap << dendl;
	it->second.follows = snap;
      }
    }
  }

  mut->auth_pin(in);
  mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows);
  mdcache->journal_dirty_inode(mut.get(), &le->metablob, in, follows);

  // "oldest flush tid" > 0 means client uses unique TID for each flush
  if (ack && ack->get_oldest_flush_tid() > 0)
    le->metablob.add_client_flush(metareqid_t(m->get_source(), ack->get_client_tid()),
				  ack->get_oldest_flush_tid());

  mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut, UPDATE_SNAPFLUSH,
							      ack, client));
}
// Copy the dirty metadata fields from a client cap message into the target
// (projected) inode.  Which fields are honored depends on which cap classes
// are dirty (FILE_WR/EXCL for times/size/inline data, AUTH_EXCL for
// uid/gid/mode/btime); feature bits gate change_attr and btime.
void Locker::_update_cap_fields(CInode *in, int dirty, const MClientCaps::const_ref &m, CInode::mempool_inode *pi)
{
  if (dirty == 0)
    return;

  /* m must be valid if there are dirty caps */
  ceph_assert(m);
  uint64_t features = m->get_connection()->get_features();

  if (m->get_ctime() > pi->ctime) {
    dout(7) << "  ctime " << pi->ctime << " -> " << m->get_ctime()
	    << " for " << *in << dendl;
    pi->ctime = m->get_ctime();
    if (m->get_ctime() > pi->rstat.rctime)
      pi->rstat.rctime = m->get_ctime();
  }

  if ((features & CEPH_FEATURE_FS_CHANGE_ATTR) &&
      m->get_change_attr() > pi->change_attr) {
    dout(7) << "  change_attr " << pi->change_attr << " -> " << m->get_change_attr()
	    << " for " << *in << dendl;
    pi->change_attr = m->get_change_attr();
  }

  // file
  if (dirty & (CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)) {
    utime_t atime = m->get_atime();
    utime_t mtime = m->get_mtime();
    uint64_t size = m->get_size();
    version_t inline_version = m->inline_version;

    // WR only moves mtime forward; EXCL may set it to anything
    if (((dirty & CEPH_CAP_FILE_WR) && mtime > pi->mtime) ||
	((dirty & CEPH_CAP_FILE_EXCL) && mtime != pi->mtime)) {
      dout(7) << "  mtime " << pi->mtime << " -> " << mtime
	      << " for " << *in << dendl;
      pi->mtime = mtime;
      if (mtime > pi->rstat.rctime)
	pi->rstat.rctime = mtime;
    }
    if (in->inode.is_file() &&   // ONLY if regular file
	size > pi->size) {
      dout(7) << "  size " << pi->size << " -> " << size
	      << " for " << *in << dendl;
      pi->size = size;
      pi->rstat.rbytes = size;
    }
    if (in->inode.is_file() &&
        (dirty & CEPH_CAP_FILE_WR) &&
        inline_version > pi->inline_data.version) {
      pi->inline_data.version = inline_version;
      if (inline_version != CEPH_INLINE_NONE && m->inline_data.length() > 0)
	pi->inline_data.get_data() = m->inline_data;
      else
	pi->inline_data.free_data();
    }
    if ((dirty & CEPH_CAP_FILE_EXCL) && atime != pi->atime) {
      dout(7) << "  atime " << pi->atime << " -> " << atime
	      << " for " << *in << dendl;
      pi->atime = atime;
    }
    if ((dirty & CEPH_CAP_FILE_EXCL) &&
	ceph_seq_cmp(pi->time_warp_seq, m->get_time_warp_seq()) < 0) {
      dout(7) << "  time_warp_seq " << pi->time_warp_seq << " -> " << m->get_time_warp_seq()
	      << " for " << *in << dendl;
      pi->time_warp_seq = m->get_time_warp_seq();
    }
  }
  // auth
  if (dirty & CEPH_CAP_AUTH_EXCL) {
    if (m->head.uid != pi->uid) {
      dout(7) << "  uid " << pi->uid
	      << " -> " << m->head.uid
	      << " for " << *in << dendl;
      pi->uid = m->head.uid;
    }
    if (m->head.gid != pi->gid) {
      dout(7) << "  gid " << pi->gid
	      << " -> " << m->head.gid
	      << " for " << *in << dendl;
      pi->gid = m->head.gid;
    }
    if (m->head.mode != pi->mode) {
      dout(7) << "  mode " << oct << pi->mode
	      << " -> " << m->head.mode << dec
	      << " for " << *in << dendl;
      pi->mode = m->head.mode;
    }
    if ((features & CEPH_FEATURE_FS_BTIME) && m->get_btime() != pi->btime) {
      dout(7) << "  btime " << oct << pi->btime
	      << " -> " << m->get_btime() << dec
	      << " for " << *in << dendl;
      pi->btime = m->get_btime();
    }
  }
}
3295 * update inode based on cap flush|flushsnap|wanted.
3296 * adjust max_size, if needed.
3297 * if we update, return true; otherwise, false (no updated needed).
3299 bool Locker::_do_cap_update(CInode
*in
, Capability
*cap
,
3300 int dirty
, snapid_t follows
,
3301 const MClientCaps::const_ref
&m
, const MClientCaps::ref
&ack
,
3304 dout(10) << "_do_cap_update dirty " << ccap_string(dirty
)
3305 << " issued " << ccap_string(cap
? cap
->issued() : 0)
3306 << " wanted " << ccap_string(cap
? cap
->wanted() : 0)
3307 << " on " << *in
<< dendl
;
3308 ceph_assert(in
->is_auth());
3309 client_t client
= m
->get_source().num();
3310 CInode::mempool_inode
*latest
= in
->get_projected_inode();
3312 // increase or zero max_size?
3313 uint64_t size
= m
->get_size();
3314 bool change_max
= false;
3315 uint64_t old_max
= latest
->client_ranges
.count(client
) ? latest
->client_ranges
[client
].range
.last
: 0;
3316 uint64_t new_max
= old_max
;
3318 if (in
->is_file()) {
3319 bool forced_change_max
= false;
3320 dout(20) << "inode is file" << dendl
;
3321 if (cap
&& ((cap
->issued() | cap
->wanted()) & CEPH_CAP_ANY_FILE_WR
)) {
3322 dout(20) << "client has write caps; m->get_max_size="
3323 << m
->get_max_size() << "; old_max=" << old_max
<< dendl
;
3324 if (m
->get_max_size() > new_max
) {
3325 dout(10) << "client requests file_max " << m
->get_max_size()
3326 << " > max " << old_max
<< dendl
;
3328 forced_change_max
= true;
3329 new_max
= calc_new_max_size(latest
, m
->get_max_size());
3331 new_max
= calc_new_max_size(latest
, size
);
3333 if (new_max
> old_max
)
3345 if (in
->last
== CEPH_NOSNAP
&&
3347 !in
->filelock
.can_wrlock(client
) &&
3348 !in
->filelock
.can_force_wrlock(client
)) {
3349 dout(10) << " i want to change file_max, but lock won't allow it (yet)" << dendl
;
3350 if (in
->filelock
.is_stable()) {
3351 bool need_issue
= false;
3353 cap
->inc_suppress();
3354 if (in
->get_mds_caps_wanted().empty() &&
3355 (in
->get_loner() >= 0 || (in
->get_wanted_loner() >= 0 && in
->try_set_loner()))) {
3356 if (in
->filelock
.get_state() != LOCK_EXCL
)
3357 file_excl(&in
->filelock
, &need_issue
);
3359 simple_lock(&in
->filelock
, &need_issue
);
3363 cap
->dec_suppress();
3365 if (!in
->filelock
.can_wrlock(client
) &&
3366 !in
->filelock
.can_force_wrlock(client
)) {
3367 C_MDL_CheckMaxSize
*cms
= new C_MDL_CheckMaxSize(this, in
,
3368 forced_change_max
? new_max
: 0,
3371 in
->filelock
.add_waiter(SimpleLock::WAIT_STABLE
, cms
);
3377 if (m
->flockbl
.length()) {
3379 auto bli
= m
->flockbl
.cbegin();
3380 decode(num_locks
, bli
);
3381 for ( int i
=0; i
< num_locks
; ++i
) {
3382 ceph_filelock decoded_lock
;
3383 decode(decoded_lock
, bli
);
3384 in
->get_fcntl_lock_state()->held_locks
.
3385 insert(pair
<uint64_t, ceph_filelock
>(decoded_lock
.start
, decoded_lock
));
3386 ++in
->get_fcntl_lock_state()->client_held_lock_counts
[(client_t
)(decoded_lock
.client
)];
3388 decode(num_locks
, bli
);
3389 for ( int i
=0; i
< num_locks
; ++i
) {
3390 ceph_filelock decoded_lock
;
3391 decode(decoded_lock
, bli
);
3392 in
->get_flock_lock_state()->held_locks
.
3393 insert(pair
<uint64_t, ceph_filelock
>(decoded_lock
.start
, decoded_lock
));
3394 ++in
->get_flock_lock_state()->client_held_lock_counts
[(client_t
)(decoded_lock
.client
)];
3398 if (!dirty
&& !change_max
)
3401 Session
*session
= mds
->get_session(m
);
3402 if (session
->check_access(in
, MAY_WRITE
,
3403 m
->caller_uid
, m
->caller_gid
, NULL
, 0, 0) < 0) {
3404 dout(10) << "check_access failed, dropping cap update on " << *in
<< dendl
;
3409 EUpdate
*le
= new EUpdate(mds
->mdlog
, "cap update");
3410 mds
->mdlog
->start_entry(le
);
3412 bool xattr
= (dirty
& CEPH_CAP_XATTR_EXCL
) &&
3413 m
->xattrbl
.length() &&
3414 m
->head
.xattr_version
> in
->get_projected_inode()->xattr_version
;
3416 auto &pi
= in
->project_inode(xattr
);
3417 pi
.inode
.version
= in
->pre_dirty();
3419 MutationRef
mut(new MutationImpl());
3420 mut
->ls
= mds
->mdlog
->get_current_segment();
3422 _update_cap_fields(in
, dirty
, m
, &pi
.inode
);
3425 dout(7) << " max_size " << old_max
<< " -> " << new_max
3426 << " for " << *in
<< dendl
;
3428 auto &cr
= pi
.inode
.client_ranges
[client
];
3430 cr
.range
.last
= new_max
;
3431 cr
.follows
= in
->first
- 1;
3433 cap
->mark_clientwriteable();
3435 pi
.inode
.client_ranges
.erase(client
);
3437 cap
->clear_clientwriteable();
3441 if (change_max
|| (dirty
& (CEPH_CAP_FILE_EXCL
|CEPH_CAP_FILE_WR
)))
3442 wrlock_force(&in
->filelock
, mut
); // wrlock for duration of journal
3445 if (dirty
& CEPH_CAP_AUTH_EXCL
)
3446 wrlock_force(&in
->authlock
, mut
);
3450 dout(7) << " xattrs v" << pi
.inode
.xattr_version
<< " -> " << m
->head
.xattr_version
<< dendl
;
3451 pi
.inode
.xattr_version
= m
->head
.xattr_version
;
3452 auto p
= m
->xattrbl
.cbegin();
3453 decode(*pi
.xattrs
, p
);
3454 wrlock_force(&in
->xattrlock
, mut
);
3458 mdcache
->predirty_journal_parents(mut
, &le
->metablob
, in
, 0, PREDIRTY_PRIMARY
, 0, follows
);
3459 mdcache
->journal_dirty_inode(mut
.get(), &le
->metablob
, in
, follows
);
3461 // "oldest flush tid" > 0 means client uses unique TID for each flush
3462 if (ack
&& ack
->get_oldest_flush_tid() > 0)
3463 le
->metablob
.add_client_flush(metareqid_t(m
->get_source(), ack
->get_client_tid()),
3464 ack
->get_oldest_flush_tid());
3466 unsigned update_flags
= 0;
3468 update_flags
|= UPDATE_SHAREMAX
;
3470 update_flags
|= UPDATE_NEEDSISSUE
;
3471 mds
->mdlog
->submit_entry(le
, new C_Locker_FileUpdate_finish(this, in
, mut
, update_flags
,
3473 if (need_flush
&& !*need_flush
&&
3474 ((change_max
&& new_max
) || // max INCREASE
3475 _need_flush_mdlog(in
, dirty
)))
3481 void Locker::handle_client_cap_release(const MClientCapRelease::const_ref
&m
)
3483 client_t client
= m
->get_source().num();
3484 dout(10) << "handle_client_cap_release " << *m
<< dendl
;
3486 if (!mds
->is_clientreplay() && !mds
->is_active() && !mds
->is_stopping()) {
3487 mds
->wait_for_replay(new C_MDS_RetryMessage(mds
, m
));
3491 if (m
->osd_epoch_barrier
&& !mds
->objecter
->have_map(m
->osd_epoch_barrier
)) {
3492 // Pause RADOS operations until we see the required epoch
3493 mds
->objecter
->set_epoch_barrier(m
->osd_epoch_barrier
);
3496 if (mds
->get_osd_epoch_barrier() < m
->osd_epoch_barrier
) {
3497 // Record the barrier so that we will retransmit it to clients
3498 mds
->set_osd_epoch_barrier(m
->osd_epoch_barrier
);
3501 Session
*session
= mds
->get_session(m
);
3503 for (const auto &cap
: m
->caps
) {
3504 _do_cap_release(client
, inodeno_t((uint64_t)cap
.ino
) , cap
.cap_id
, cap
.migrate_seq
, cap
.seq
);
3508 session
->notify_cap_release(m
->caps
.size());
3512 class C_Locker_RetryCapRelease
: public LockerContext
{
3516 ceph_seq_t migrate_seq
;
3517 ceph_seq_t issue_seq
;
3519 C_Locker_RetryCapRelease(Locker
*l
, client_t c
, inodeno_t i
, uint64_t id
,
3520 ceph_seq_t mseq
, ceph_seq_t seq
) :
3521 LockerContext(l
), client(c
), ino(i
), cap_id(id
), migrate_seq(mseq
), issue_seq(seq
) {}
3522 void finish(int r
) override
{
3523 locker
->_do_cap_release(client
, ino
, cap_id
, migrate_seq
, issue_seq
);
3527 void Locker::_do_cap_release(client_t client
, inodeno_t ino
, uint64_t cap_id
,
3528 ceph_seq_t mseq
, ceph_seq_t seq
)
3530 CInode
*in
= mdcache
->get_inode(ino
);
3532 dout(7) << "_do_cap_release missing ino " << ino
<< dendl
;
3535 Capability
*cap
= in
->get_client_cap(client
);
3537 dout(7) << "_do_cap_release no cap for client" << client
<< " on "<< *in
<< dendl
;
3541 dout(7) << "_do_cap_release for client." << client
<< " on "<< *in
<< dendl
;
3542 if (cap
->get_cap_id() != cap_id
) {
3543 dout(7) << " capid " << cap_id
<< " != " << cap
->get_cap_id() << ", ignore" << dendl
;
3546 if (ceph_seq_cmp(mseq
, cap
->get_mseq()) < 0) {
3547 dout(7) << " mseq " << mseq
<< " < " << cap
->get_mseq() << ", ignore" << dendl
;
3550 if (should_defer_client_cap_frozen(in
)) {
3551 dout(7) << " freezing|frozen, deferring" << dendl
;
3552 in
->add_waiter(CInode::WAIT_UNFREEZE
,
3553 new C_Locker_RetryCapRelease(this, client
, ino
, cap_id
, mseq
, seq
));
3556 if (seq
!= cap
->get_last_issue()) {
3557 dout(7) << " issue_seq " << seq
<< " != " << cap
->get_last_issue() << dendl
;
3558 // clean out any old revoke history
3559 cap
->clean_revoke_from(seq
);
3560 eval_cap_gather(in
);
3563 remove_client_cap(in
, cap
);
3566 void Locker::remove_client_cap(CInode
*in
, Capability
*cap
)
3568 client_t client
= cap
->get_client();
3569 // clean out any pending snapflush state
3570 if (!in
->client_need_snapflush
.empty())
3571 _do_null_snapflush(in
, client
);
3573 bool notable
= cap
->is_notable();
3574 in
->remove_client_cap(client
);
3578 if (in
->is_auth()) {
3579 // make sure we clear out the client byte range
3580 if (in
->get_projected_inode()->client_ranges
.count(client
) &&
3581 !(in
->inode
.nlink
== 0 && !in
->is_any_caps())) // unless it's unlink + stray
3582 check_inode_max_size(in
);
3584 request_inode_file_caps(in
);
3587 try_eval(in
, CEPH_CAP_LOCKS
);
3592 * Return true if any currently revoking caps exceed the
3593 * session_timeout threshold.
3595 bool Locker::any_late_revoking_caps(xlist
<Capability
*> const &revoking
,
3596 double timeout
) const
3598 xlist
<Capability
*>::const_iterator p
= revoking
.begin();
3600 // No revoking caps at the moment
3603 utime_t now
= ceph_clock_now();
3604 utime_t age
= now
- (*p
)->get_last_revoke_stamp();
3605 if (age
<= timeout
) {
3613 void Locker::get_late_revoking_clients(std::list
<client_t
> *result
,
3614 double timeout
) const
3616 if (!any_late_revoking_caps(revoking_caps
, timeout
)) {
3617 // Fast path: no misbehaving clients, execute in O(1)
3621 // Slow path: execute in O(N_clients)
3622 for (auto &p
: revoking_caps_by_client
) {
3623 if (any_late_revoking_caps(p
.second
, timeout
)) {
3624 result
->push_back(p
.first
);
3629 // Hard-code instead of surfacing a config settings because this is
3630 // really a hack that should go away at some point when we have better
3631 // inspection tools for getting at detailed cap state (#7316)
3632 #define MAX_WARN_CAPS 100
3634 void Locker::caps_tick()
3636 utime_t now
= ceph_clock_now();
3638 if (!need_snapflush_inodes
.empty()) {
3639 // snap inodes that needs flush are auth pinned, they affect
3640 // subtree/difrarg freeze.
3641 utime_t cutoff
= now
;
3642 cutoff
-= g_conf()->mds_freeze_tree_timeout
/ 3;
3644 CInode
*last
= need_snapflush_inodes
.back();
3645 while (!need_snapflush_inodes
.empty()) {
3646 CInode
*in
= need_snapflush_inodes
.front();
3647 if (in
->last_dirstat_prop
>= cutoff
)
3649 in
->item_caps
.remove_myself();
3650 snapflush_nudge(in
);
3656 dout(20) << __func__
<< " " << revoking_caps
.size() << " revoking caps" << dendl
;
3658 now
= ceph_clock_now();
3660 for (xlist
<Capability
*>::iterator p
= revoking_caps
.begin(); !p
.end(); ++p
) {
3661 Capability
*cap
= *p
;
3663 utime_t age
= now
- cap
->get_last_revoke_stamp();
3664 dout(20) << __func__
<< " age = " << age
<< " client." << cap
->get_client() << "." << cap
->get_inode()->ino() << dendl
;
3665 if (age
<= mds
->mdsmap
->get_session_timeout()) {
3666 dout(20) << __func__
<< " age below timeout " << mds
->mdsmap
->get_session_timeout() << dendl
;
3670 if (n
> MAX_WARN_CAPS
) {
3671 dout(1) << __func__
<< " more than " << MAX_WARN_CAPS
<< " caps are late"
3672 << "revoking, ignoring subsequent caps" << dendl
;
3676 // exponential backoff of warning intervals
3677 if (age
> mds
->mdsmap
->get_session_timeout() * (1 << cap
->get_num_revoke_warnings())) {
3678 cap
->inc_num_revoke_warnings();
3680 ss
<< "client." << cap
->get_client() << " isn't responding to mclientcaps(revoke), ino "
3681 << cap
->get_inode()->ino() << " pending " << ccap_string(cap
->pending())
3682 << " issued " << ccap_string(cap
->issued()) << ", sent " << age
<< " seconds ago";
3683 mds
->clog
->warn() << ss
.str();
3684 dout(20) << __func__
<< " " << ss
.str() << dendl
;
3686 dout(20) << __func__
<< " silencing log message (backoff) for " << "client." << cap
->get_client() << "." << cap
->get_inode()->ino() << dendl
;
3692 void Locker::handle_client_lease(const MClientLease::const_ref
&m
)
3694 dout(10) << "handle_client_lease " << *m
<< dendl
;
3696 ceph_assert(m
->get_source().is_client());
3697 client_t client
= m
->get_source().num();
3699 CInode
*in
= mdcache
->get_inode(m
->get_ino(), m
->get_last());
3701 dout(7) << "handle_client_lease don't have ino " << m
->get_ino() << "." << m
->get_last() << dendl
;
3706 frag_t fg
= in
->pick_dirfrag(m
->dname
);
3707 CDir
*dir
= in
->get_dirfrag(fg
);
3709 dn
= dir
->lookup(m
->dname
);
3711 dout(7) << "handle_client_lease don't have dn " << m
->get_ino() << " " << m
->dname
<< dendl
;
3714 dout(10) << " on " << *dn
<< dendl
;
3717 ClientLease
*l
= dn
->get_client_lease(client
);
3719 dout(7) << "handle_client_lease didn't have lease for client." << client
<< " of " << *dn
<< dendl
;
3723 switch (m
->get_action()) {
3724 case CEPH_MDS_LEASE_REVOKE_ACK
:
3725 case CEPH_MDS_LEASE_RELEASE
:
3726 if (l
->seq
!= m
->get_seq()) {
3727 dout(7) << "handle_client_lease release - seq " << l
->seq
<< " != provided " << m
->get_seq() << dendl
;
3729 dout(7) << "handle_client_lease client." << client
3730 << " on " << *dn
<< dendl
;
3731 dn
->remove_client_lease(l
, this);
3735 case CEPH_MDS_LEASE_RENEW
:
3737 dout(7) << "handle_client_lease client." << client
<< " renew on " << *dn
3738 << (!dn
->lock
.can_lease(client
)?", revoking lease":"") << dendl
;
3739 if (dn
->lock
.can_lease(client
)) {
3740 auto reply
= MClientLease::create(*m
);
3741 int pool
= 1; // fixme.. do something smart!
3742 reply
->h
.duration_ms
= (int)(1000 * mdcache
->client_lease_durations
[pool
]);
3743 reply
->h
.seq
= ++l
->seq
;
3744 reply
->clear_payload();
3746 utime_t now
= ceph_clock_now();
3747 now
+= mdcache
->client_lease_durations
[pool
];
3748 mdcache
->touch_client_lease(l
, pool
, now
);
3750 mds
->send_message_client_counted(reply
, m
->get_connection());
3756 ceph_abort(); // implement me
3762 void Locker::issue_client_lease(CDentry
*dn
, client_t client
,
3763 bufferlist
&bl
, utime_t now
, Session
*session
)
3765 CInode
*diri
= dn
->get_dir()->get_inode();
3766 if (!diri
->is_stray() && // do not issue dn leases in stray dir!
3767 ((!diri
->filelock
.can_lease(client
) &&
3768 (diri
->get_client_cap_pending(client
) & (CEPH_CAP_FILE_SHARED
| CEPH_CAP_FILE_EXCL
)) == 0)) &&
3769 dn
->lock
.can_lease(client
)) {
3770 int pool
= 1; // fixme.. do something smart!
3771 // issue a dentry lease
3772 ClientLease
*l
= dn
->add_client_lease(client
, session
);
3773 session
->touch_lease(l
);
3775 now
+= mdcache
->client_lease_durations
[pool
];
3776 mdcache
->touch_client_lease(l
, pool
, now
);
3779 lstat
.mask
= 1 | CEPH_LOCK_DN
; // old and new bit values
3780 lstat
.duration_ms
= (uint32_t)(1000 * mdcache
->client_lease_durations
[pool
]);
3781 lstat
.seq
= ++l
->seq
;
3782 encode_lease(bl
, session
->info
, lstat
);
3783 dout(20) << "issue_client_lease seq " << lstat
.seq
<< " dur " << lstat
.duration_ms
<< "ms "
3784 << " on " << *dn
<< dendl
;
3788 encode_lease(bl
, session
->info
, lstat
);
3789 dout(20) << "issue_client_lease no/null lease on " << *dn
<< dendl
;
3794 void Locker::revoke_client_leases(SimpleLock
*lock
)
3797 CDentry
*dn
= static_cast<CDentry
*>(lock
->get_parent());
3798 for (map
<client_t
, ClientLease
*>::iterator p
= dn
->client_lease_map
.begin();
3799 p
!= dn
->client_lease_map
.end();
3801 ClientLease
*l
= p
->second
;
3804 ceph_assert(lock
->get_type() == CEPH_LOCK_DN
);
3806 CDentry
*dn
= static_cast<CDentry
*>(lock
->get_parent());
3807 int mask
= 1 | CEPH_LOCK_DN
; // old and new bits
3809 // i should also revoke the dir ICONTENT lease, if they have it!
3810 CInode
*diri
= dn
->get_dir()->get_inode();
3811 auto lease
= MClientLease::create(CEPH_MDS_LEASE_REVOKE
, l
->seq
, mask
, diri
->ino(), diri
->first
, CEPH_NOSNAP
, dn
->get_name());
3812 mds
->send_message_client_counted(lease
, l
->client
);
3816 void Locker::encode_lease(bufferlist
& bl
, const session_info_t
& info
,
3817 const LeaseStat
& ls
)
3819 if (info
.has_feature(CEPHFS_FEATURE_REPLY_ENCODING
)) {
3820 ENCODE_START(1, 1, bl
);
3821 encode(ls
.mask
, bl
);
3822 encode(ls
.duration_ms
, bl
);
3827 encode(ls
.mask
, bl
);
3828 encode(ls
.duration_ms
, bl
);
3833 // locks ----------------------------------------------------------------
3835 SimpleLock
*Locker::get_lock(int lock_type
, const MDSCacheObjectInfo
&info
)
3837 switch (lock_type
) {
3840 // be careful; info.dirfrag may have incorrect frag; recalculate based on dname.
3841 CInode
*diri
= mdcache
->get_inode(info
.dirfrag
.ino
);
3846 fg
= diri
->pick_dirfrag(info
.dname
);
3847 dir
= diri
->get_dirfrag(fg
);
3849 dn
= dir
->lookup(info
.dname
, info
.snapid
);
3852 dout(7) << "get_lock don't have dn " << info
.dirfrag
.ino
<< " " << info
.dname
<< dendl
;
3858 case CEPH_LOCK_IAUTH
:
3859 case CEPH_LOCK_ILINK
:
3860 case CEPH_LOCK_IDFT
:
3861 case CEPH_LOCK_IFILE
:
3862 case CEPH_LOCK_INEST
:
3863 case CEPH_LOCK_IXATTR
:
3864 case CEPH_LOCK_ISNAP
:
3865 case CEPH_LOCK_IFLOCK
:
3866 case CEPH_LOCK_IPOLICY
:
3868 CInode
*in
= mdcache
->get_inode(info
.ino
, info
.snapid
);
3870 dout(7) << "get_lock don't have ino " << info
.ino
<< dendl
;
3873 switch (lock_type
) {
3874 case CEPH_LOCK_IAUTH
: return &in
->authlock
;
3875 case CEPH_LOCK_ILINK
: return &in
->linklock
;
3876 case CEPH_LOCK_IDFT
: return &in
->dirfragtreelock
;
3877 case CEPH_LOCK_IFILE
: return &in
->filelock
;
3878 case CEPH_LOCK_INEST
: return &in
->nestlock
;
3879 case CEPH_LOCK_IXATTR
: return &in
->xattrlock
;
3880 case CEPH_LOCK_ISNAP
: return &in
->snaplock
;
3881 case CEPH_LOCK_IFLOCK
: return &in
->flocklock
;
3882 case CEPH_LOCK_IPOLICY
: return &in
->policylock
;
3887 dout(7) << "get_lock don't know lock_type " << lock_type
<< dendl
;
3895 void Locker::handle_lock(const MLock::const_ref
&m
)
3897 // nobody should be talking to us during recovery.
3898 ceph_assert(mds
->is_rejoin() || mds
->is_clientreplay() || mds
->is_active() || mds
->is_stopping());
3900 SimpleLock
*lock
= get_lock(m
->get_lock_type(), m
->get_object_info());
3902 dout(10) << "don't have object " << m
->get_object_info() << ", must have trimmed, dropping" << dendl
;
3906 switch (lock
->get_type()) {
3908 case CEPH_LOCK_IAUTH
:
3909 case CEPH_LOCK_ILINK
:
3910 case CEPH_LOCK_ISNAP
:
3911 case CEPH_LOCK_IXATTR
:
3912 case CEPH_LOCK_IFLOCK
:
3913 case CEPH_LOCK_IPOLICY
:
3914 handle_simple_lock(lock
, m
);
3917 case CEPH_LOCK_IDFT
:
3918 case CEPH_LOCK_INEST
:
3919 //handle_scatter_lock((ScatterLock*)lock, m);
3922 case CEPH_LOCK_IFILE
:
3923 handle_file_lock(static_cast<ScatterLock
*>(lock
), m
);
3927 dout(7) << "handle_lock got otype " << m
->get_lock_type() << dendl
;
3937 // ==========================================================================
3940 /** This function may take a reference to m if it needs one, but does
3941 * not put references. */
3942 void Locker::handle_reqrdlock(SimpleLock
*lock
, const MLock::const_ref
&m
)
3944 MDSCacheObject
*parent
= lock
->get_parent();
3945 if (parent
->is_auth() &&
3946 lock
->get_state() != LOCK_SYNC
&&
3947 !parent
->is_frozen()) {
3948 dout(7) << "handle_reqrdlock got rdlock request on " << *lock
3949 << " on " << *parent
<< dendl
;
3950 ceph_assert(parent
->is_auth()); // replica auth pinned if they're doing this!
3951 if (lock
->is_stable()) {
3954 dout(7) << "handle_reqrdlock delaying request until lock is stable" << dendl
;
3955 lock
->add_waiter(SimpleLock::WAIT_STABLE
| MDSCacheObject::WAIT_UNFREEZE
,
3956 new C_MDS_RetryMessage(mds
, m
));
3959 dout(7) << "handle_reqrdlock dropping rdlock request on " << *lock
3960 << " on " << *parent
<< dendl
;
3961 // replica should retry
3965 void Locker::handle_simple_lock(SimpleLock
*lock
, const MLock::const_ref
&m
)
3967 int from
= m
->get_asker();
3969 dout(10) << "handle_simple_lock " << *m
3970 << " on " << *lock
<< " " << *lock
->get_parent() << dendl
;
3972 if (mds
->is_rejoin()) {
3973 if (lock
->get_parent()->is_rejoining()) {
3974 dout(7) << "handle_simple_lock still rejoining " << *lock
->get_parent()
3975 << ", dropping " << *m
<< dendl
;
3980 switch (m
->get_action()) {
3983 ceph_assert(lock
->get_state() == LOCK_LOCK
);
3984 lock
->decode_locked_state(m
->get_data());
3985 lock
->set_state(LOCK_SYNC
);
3986 lock
->finish_waiters(SimpleLock::WAIT_RD
|SimpleLock::WAIT_STABLE
);
3990 ceph_assert(lock
->get_state() == LOCK_SYNC
);
3991 lock
->set_state(LOCK_SYNC_LOCK
);
3992 if (lock
->is_leased())
3993 revoke_client_leases(lock
);
3994 eval_gather(lock
, true);
3995 if (lock
->is_unstable_and_locked())
3996 mds
->mdlog
->flush();
4001 case LOCK_AC_LOCKACK
:
4002 ceph_assert(lock
->get_state() == LOCK_SYNC_LOCK
||
4003 lock
->get_state() == LOCK_SYNC_EXCL
);
4004 ceph_assert(lock
->is_gathering(from
));
4005 lock
->remove_gather(from
);
4007 if (lock
->is_gathering()) {
4008 dout(7) << "handle_simple_lock " << *lock
<< " on " << *lock
->get_parent() << " from " << from
4009 << ", still gathering " << lock
->get_gather_set() << dendl
;
4011 dout(7) << "handle_simple_lock " << *lock
<< " on " << *lock
->get_parent() << " from " << from
4012 << ", last one" << dendl
;
4017 case LOCK_AC_REQRDLOCK
:
4018 handle_reqrdlock(lock
, m
);
4024 /* unused, currently.
4026 class C_Locker_SimpleEval : public Context {
4030 C_Locker_SimpleEval(Locker *l, SimpleLock *lk) : locker(l), lock(lk) {}
4031 void finish(int r) {
4032 locker->try_simple_eval(lock);
4036 void Locker::try_simple_eval(SimpleLock *lock)
4038 // unstable and ambiguous auth?
4039 if (!lock->is_stable() &&
4040 lock->get_parent()->is_ambiguous_auth()) {
4041 dout(7) << "simple_eval not stable and ambiguous auth, waiting on " << *lock->get_parent() << dendl;
4042 //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
4043 lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_SimpleEval(this, lock));
4047 if (!lock->get_parent()->is_auth()) {
4048 dout(7) << "try_simple_eval not auth for " << *lock->get_parent() << dendl;
4052 if (!lock->get_parent()->can_auth_pin()) {
4053 dout(7) << "try_simple_eval can't auth_pin, waiting on " << *lock->get_parent() << dendl;
4054 //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
4055 lock->get_parent()->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_SimpleEval(this, lock));
4059 if (lock->is_stable())
4065 void Locker::simple_eval(SimpleLock
*lock
, bool *need_issue
)
4067 dout(10) << "simple_eval " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4069 ceph_assert(lock
->get_parent()->is_auth());
4070 ceph_assert(lock
->is_stable());
4072 if (lock
->get_parent()->is_freezing_or_frozen()) {
4073 // dentry/snap lock in unreadable state can block path traverse
4074 if ((lock
->get_type() != CEPH_LOCK_DN
&&
4075 lock
->get_type() != CEPH_LOCK_ISNAP
) ||
4076 lock
->get_state() == LOCK_SYNC
||
4077 lock
->get_parent()->is_frozen())
4081 if (mdcache
->is_readonly()) {
4082 if (lock
->get_state() != LOCK_SYNC
) {
4083 dout(10) << "simple_eval read-only FS, syncing " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4084 simple_sync(lock
, need_issue
);
4091 if (lock
->get_cap_shift()) {
4092 in
= static_cast<CInode
*>(lock
->get_parent());
4093 in
->get_caps_wanted(&wanted
, NULL
, lock
->get_cap_shift());
4097 if (lock
->get_state() != LOCK_EXCL
&&
4098 in
&& in
->get_target_loner() >= 0 &&
4099 (wanted
& CEPH_CAP_GEXCL
)) {
4100 dout(7) << "simple_eval stable, going to excl " << *lock
4101 << " on " << *lock
->get_parent() << dendl
;
4102 simple_excl(lock
, need_issue
);
4106 else if (lock
->get_state() != LOCK_SYNC
&&
4107 !lock
->is_wrlocked() &&
4108 ((!(wanted
& CEPH_CAP_GEXCL
) && !lock
->is_waiter_for(SimpleLock::WAIT_WR
)) ||
4109 (lock
->get_state() == LOCK_EXCL
&& in
&& in
->get_target_loner() < 0))) {
4110 dout(7) << "simple_eval stable, syncing " << *lock
4111 << " on " << *lock
->get_parent() << dendl
;
4112 simple_sync(lock
, need_issue
);
4119 bool Locker::simple_sync(SimpleLock
*lock
, bool *need_issue
)
4121 dout(7) << "simple_sync on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4122 ceph_assert(lock
->get_parent()->is_auth());
4123 ceph_assert(lock
->is_stable());
4126 if (lock
->get_cap_shift())
4127 in
= static_cast<CInode
*>(lock
->get_parent());
4129 int old_state
= lock
->get_state();
4131 if (old_state
!= LOCK_TSYN
) {
4133 switch (lock
->get_state()) {
4134 case LOCK_MIX
: lock
->set_state(LOCK_MIX_SYNC
); break;
4135 case LOCK_LOCK
: lock
->set_state(LOCK_LOCK_SYNC
); break;
4136 case LOCK_XSYN
: lock
->set_state(LOCK_XSYN_SYNC
); break;
4137 case LOCK_EXCL
: lock
->set_state(LOCK_EXCL_SYNC
); break;
4138 default: ceph_abort();
4142 if (lock
->is_wrlocked())
4145 if (lock
->get_parent()->is_replicated() && old_state
== LOCK_MIX
) {
4146 send_lock_message(lock
, LOCK_AC_SYNC
);
4147 lock
->init_gather();
4151 if (in
&& in
->is_head()) {
4152 if (in
->issued_caps_need_gather(lock
)) {
4161 bool need_recover
= false;
4162 if (lock
->get_type() == CEPH_LOCK_IFILE
) {
4164 if (in
->state_test(CInode::STATE_NEEDSRECOVER
)) {
4165 mds
->mdcache
->queue_file_recover(in
);
4166 need_recover
= true;
4171 if (!gather
&& lock
->is_dirty()) {
4172 lock
->get_parent()->auth_pin(lock
);
4173 scatter_writebehind(static_cast<ScatterLock
*>(lock
));
4174 mds
->mdlog
->flush();
4179 lock
->get_parent()->auth_pin(lock
);
4181 mds
->mdcache
->do_file_recover();
4186 if (lock
->get_parent()->is_replicated()) { // FIXME
4188 lock
->encode_locked_state(data
);
4189 send_lock_message(lock
, LOCK_AC_SYNC
, data
);
4191 lock
->set_state(LOCK_SYNC
);
4192 lock
->finish_waiters(SimpleLock::WAIT_RD
|SimpleLock::WAIT_STABLE
);
4193 if (in
&& in
->is_head()) {
4202 void Locker::simple_excl(SimpleLock
*lock
, bool *need_issue
)
4204 dout(7) << "simple_excl on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4205 ceph_assert(lock
->get_parent()->is_auth());
4206 ceph_assert(lock
->is_stable());
4209 if (lock
->get_cap_shift())
4210 in
= static_cast<CInode
*>(lock
->get_parent());
4212 switch (lock
->get_state()) {
4213 case LOCK_LOCK
: lock
->set_state(LOCK_LOCK_EXCL
); break;
4214 case LOCK_SYNC
: lock
->set_state(LOCK_SYNC_EXCL
); break;
4215 case LOCK_XSYN
: lock
->set_state(LOCK_XSYN_EXCL
); break;
4216 default: ceph_abort();
4220 if (lock
->is_rdlocked())
4222 if (lock
->is_wrlocked())
4225 if (lock
->get_parent()->is_replicated() &&
4226 lock
->get_state() != LOCK_LOCK_EXCL
&&
4227 lock
->get_state() != LOCK_XSYN_EXCL
) {
4228 send_lock_message(lock
, LOCK_AC_LOCK
);
4229 lock
->init_gather();
4233 if (in
&& in
->is_head()) {
4234 if (in
->issued_caps_need_gather(lock
)) {
4244 lock
->get_parent()->auth_pin(lock
);
4246 lock
->set_state(LOCK_EXCL
);
4247 lock
->finish_waiters(SimpleLock::WAIT_WR
|SimpleLock::WAIT_STABLE
);
4257 void Locker::simple_lock(SimpleLock
*lock
, bool *need_issue
)
4259 dout(7) << "simple_lock on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4260 ceph_assert(lock
->get_parent()->is_auth());
4261 ceph_assert(lock
->is_stable());
4262 ceph_assert(lock
->get_state() != LOCK_LOCK
);
4265 if (lock
->get_cap_shift())
4266 in
= static_cast<CInode
*>(lock
->get_parent());
4268 int old_state
= lock
->get_state();
4270 switch (lock
->get_state()) {
4271 case LOCK_SYNC
: lock
->set_state(LOCK_SYNC_LOCK
); break;
4272 case LOCK_XSYN
: lock
->set_state(LOCK_XSYN_LOCK
); break;
4273 case LOCK_EXCL
: lock
->set_state(LOCK_EXCL_LOCK
); break;
4274 case LOCK_MIX
: lock
->set_state(LOCK_MIX_LOCK
);
4275 (static_cast<ScatterLock
*>(lock
))->clear_unscatter_wanted();
4277 case LOCK_TSYN
: lock
->set_state(LOCK_TSYN_LOCK
); break;
4278 default: ceph_abort();
4282 if (lock
->is_leased()) {
4284 revoke_client_leases(lock
);
4286 if (lock
->is_rdlocked())
4288 if (in
&& in
->is_head()) {
4289 if (in
->issued_caps_need_gather(lock
)) {
4298 bool need_recover
= false;
4299 if (lock
->get_type() == CEPH_LOCK_IFILE
) {
4301 if(in
->state_test(CInode::STATE_NEEDSRECOVER
)) {
4302 mds
->mdcache
->queue_file_recover(in
);
4303 need_recover
= true;
4308 if (lock
->get_parent()->is_replicated() &&
4309 lock
->get_state() == LOCK_MIX_LOCK
&&
4311 dout(10) << " doing local stage of mix->lock gather before gathering from replicas" << dendl
;
4313 // move to second stage of gather now, so we don't send the lock action later.
4314 if (lock
->get_state() == LOCK_MIX_LOCK
)
4315 lock
->set_state(LOCK_MIX_LOCK2
);
4317 if (lock
->get_parent()->is_replicated() &&
4318 lock
->get_sm()->states
[old_state
].replica_state
!= LOCK_LOCK
) { // replica may already be LOCK
4320 send_lock_message(lock
, LOCK_AC_LOCK
);
4321 lock
->init_gather();
4325 if (!gather
&& lock
->is_dirty()) {
4326 lock
->get_parent()->auth_pin(lock
);
4327 scatter_writebehind(static_cast<ScatterLock
*>(lock
));
4328 mds
->mdlog
->flush();
4333 lock
->get_parent()->auth_pin(lock
);
4335 mds
->mdcache
->do_file_recover();
4337 lock
->set_state(LOCK_LOCK
);
4338 lock
->finish_waiters(ScatterLock::WAIT_XLOCK
|ScatterLock::WAIT_WR
|ScatterLock::WAIT_STABLE
);
4343 void Locker::simple_xlock(SimpleLock
*lock
)
4345 dout(7) << "simple_xlock on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4346 ceph_assert(lock
->get_parent()->is_auth());
4347 //assert(lock->is_stable());
4348 ceph_assert(lock
->get_state() != LOCK_XLOCK
);
4351 if (lock
->get_cap_shift())
4352 in
= static_cast<CInode
*>(lock
->get_parent());
4354 if (lock
->is_stable())
4355 lock
->get_parent()->auth_pin(lock
);
4357 switch (lock
->get_state()) {
4359 case LOCK_XLOCKDONE
: lock
->set_state(LOCK_LOCK_XLOCK
); break;
4360 default: ceph_abort();
4364 if (lock
->is_rdlocked())
4366 if (lock
->is_wrlocked())
4369 if (in
&& in
->is_head()) {
4370 if (in
->issued_caps_need_gather(lock
)) {
4377 lock
->set_state(LOCK_PREXLOCK
);
4378 //assert("shouldn't be called if we are already xlockable" == 0);
4386 // ==========================================================================
4391 Some notes on scatterlocks.
4393 - The scatter/gather is driven by the inode lock. The scatter always
4394 brings in the latest metadata from the fragments.
4396 - When in a scattered/MIX state, fragments are only allowed to
4397 update/be written to if the accounted stat matches the inode's
4400 - That means, on gather, we _only_ assimilate diffs for frag metadata
4401 that match the current version, because those are the only ones
4402 written during this scatter/gather cycle. (Others didn't permit
4403 it.) We increment the version and journal this to disk.
4405 - When possible, we also simultaneously update our local frag
4406 accounted stats to match.
4408 - On scatter, the new inode info is broadcast to frags, both local
4409 and remote. If possible (auth and !frozen), the dirfrag auth
4410 should update the accounted state (if it isn't already up to date).
4411 Note that this may occur on both the local inode auth node and
4412 inode replicas, so there are two potential paths. If it is NOT
4413 possible, they need to mark_stale to prevent any possible writes.
4415 - A scatter can be to MIX (potentially writeable) or to SYNC (read
4416 only). Both are opportunities to update the frag accounted stats,
4417 even though only the MIX case is affected by a stale dirfrag.
4419 - Because many scatter/gather cycles can potentially go by without a
4420 frag being able to update its accounted stats (due to being frozen
4421 by exports/refragments in progress), the frag may have (even very)
4422 old stat versions. That's fine. If when we do want to update it,
4423 we can update accounted_* and the version first.
// Journal-completion callback for Locker::scatter_writebehind(): once the
// EUpdate has been submitted/logged, hand control back to the Locker so it
// can pop the projected inode and finish the scatterlock flush.
// NOTE(review): this chunk is a lossy extraction — the member declarations
// (original lines ~4428-4430, presumably "Locker-owned ScatterLock* lock" and
// "MutationRef mut") and the closing braces were dropped; code kept byte-identical.
4427 class C_Locker_ScatterWB
: public LockerLogContext
{
// Captures the lock being flushed and the mutation holding its wrlock.
4431 C_Locker_ScatterWB(Locker
*l
, ScatterLock
*sl
, MutationRef
& m
) :
4432 LockerLogContext(l
), lock(sl
), mut(m
) {}
// Invoked after the log entry is durable; delegates to the Locker.
4433 void finish(int r
) override
{
4434 locker
->scatter_writebehind_finish(lock
, mut
);
// Flush gathered scatterlock data back to the auth inode: project a new
// inode version, fold the fragment stats in, and journal the update.  The
// C_Locker_ScatterWB callback completes the flush once the entry is logged.
// NOTE(review): lossy extraction — interior lines and closing braces are
// missing from this view; code kept byte-identical.
4438 void Locker::scatter_writebehind(ScatterLock
*lock
)
4440 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4441 dout(10) << "scatter_writebehind " << in
->inode
.mtime
<< " on " << *lock
<< " on " << *in
<< dendl
;
// Start a mutation pinned to the current log segment.
4444 MutationRef
mut(new MutationImpl());
4445 mut
->ls
= mds
->mdlog
->get_current_segment();
4447 // forcefully take a wrlock
4448 lock
->get_wrlock(true);
4449 mut
->locks
.emplace(lock
, MutationImpl::LockOp::WRLOCK
);
4451 in
->pre_cow_old_inode(); // avoid cow mayhem
// Project a new inode version so the scatter/gather result is journaled
// against a fresh, pre-dirtied version.
4453 auto &pi
= in
->project_inode();
4454 pi
.inode
.version
= in
->pre_dirty();
4456 in
->finish_scatter_gather_update(lock
->get_type());
4457 lock
->start_flush();
// Journal the update; parents are predirtied so rstats propagate.
4459 EUpdate
*le
= new EUpdate(mds
->mdlog
, "scatter_writebehind");
4460 mds
->mdlog
->start_entry(le
);
4462 mdcache
->predirty_journal_parents(mut
, &le
->metablob
, in
, 0, PREDIRTY_PRIMARY
);
4463 mdcache
->journal_dirty_inode(mut
.get(), &le
->metablob
, in
);
4465 in
->finish_scatter_gather_update_accounted(lock
->get_type(), mut
, &le
->metablob
);
// Completion runs scatter_writebehind_finish() when the entry is durable.
4467 mds
->mdlog
->submit_entry(le
, new C_Locker_ScatterWB(this, lock
, mut
));
// Completion half of scatter_writebehind(): pop/dirty the projected inode,
// mark the scatterlock flush complete, notify replicas if needed, and drop
// the mutation's locks.
// NOTE(review): lossy extraction — switch cases besides LOCK_MIX_LOCK2 and
// the closing braces are missing from this view; code kept byte-identical.
4470 void Locker::scatter_writebehind_finish(ScatterLock
*lock
, MutationRef
& mut
)
4472 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4473 dout(10) << "scatter_writebehind_finish on " << *lock
<< " on " << *in
<< dendl
;
4474 in
->pop_and_dirty_projected_inode(mut
->ls
);
4476 lock
->finish_flush();
4478 // if replicas may have flushed in a mix->lock state, send another
4479 // message so they can finish_flush().
4480 if (in
->is_replicated()) {
4481 switch (lock
->get_state()) {
4483 case LOCK_MIX_LOCK2
:
4486 send_lock_message(lock
, LOCK_AC_LOCKFLUSHED
);
4491 drop_locks(mut
.get());
// Wake anyone waiting for the lock to become stable again.
4494 if (lock
->is_stable())
4495 lock
->finish_waiters(ScatterLock::WAIT_STABLE
);
4497 //scatter_eval_gather(lock);
// Re-evaluate a stable scatterlock on the auth MDS and steer it toward the
// preferred state: MIX if scatter is wanted, MIX/LOCK for INEST depending on
// replication, SYNC when the inode is not a subtree/export delegation point.
// NOTE(review): lossy extraction — early returns, else branches and closing
// braces are missing from this view; code kept byte-identical.
4500 void Locker::scatter_eval(ScatterLock
*lock
, bool *need_issue
)
4502 dout(10) << "scatter_eval " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4504 ceph_assert(lock
->get_parent()->is_auth());
4505 ceph_assert(lock
->is_stable());
// Nothing to do while the parent is freezing/frozen (presumably returns here).
4507 if (lock
->get_parent()->is_freezing_or_frozen()) {
4508 dout(20) << " freezing|frozen" << dendl
;
// Read-only filesystem: force the lock toward SYNC.
4512 if (mdcache
->is_readonly()) {
4513 if (lock
->get_state() != LOCK_SYNC
) {
4514 dout(10) << "scatter_eval read-only FS, syncing " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4515 simple_sync(lock
, need_issue
);
// A replica asked for a scatter: move to MIX if no rdlocks block it.
4520 if (!lock
->is_rdlocked() &&
4521 lock
->get_state() != LOCK_MIX
&&
4522 lock
->get_scatter_wanted()) {
4523 dout(10) << "scatter_eval scatter_wanted, bump to mix " << *lock
4524 << " on " << *lock
->get_parent() << dendl
;
4525 scatter_mix(lock
, need_issue
);
4529 if (lock
->get_type() == CEPH_LOCK_INEST
) {
4530 // in general, we want to keep INEST writable at all times.
4531 if (!lock
->is_rdlocked()) {
4532 if (lock
->get_parent()->is_replicated()) {
4533 if (lock
->get_state() != LOCK_MIX
)
4534 scatter_mix(lock
, need_issue
);
// Unreplicated: plain LOCK keeps INEST writable locally.
4536 if (lock
->get_state() != LOCK_LOCK
)
4537 simple_lock(lock
, need_issue
);
// Non-nest scatterlocks on non-delegation-point inodes should sit in SYNC.
4543 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4544 if (!in
->has_subtree_or_exporting_dirfrag() || in
->is_base()) {
4545 // i _should_ be sync.
4546 if (!lock
->is_wrlocked() &&
4547 lock
->get_state() != LOCK_SYNC
) {
4548 dout(10) << "scatter_eval no wrlocks|xlocks, not subtree root inode, syncing" << dendl
;
4549 simple_sync(lock
, need_issue
);
// NOTE(review): the opening "/*" of this doc comment was dropped by the
// extraction; code below kept byte-identical.
4556 * mark a scatterlock to indicate that the dir fnode has some dirty data
// Queue a dirty scatterlock on updated_scatterlocks (stamped with the current
// time) so scatter_tick() will eventually nudge it and flush the fnode data.
// If it is already queued, leave its original stamp in place.
4558 void Locker::mark_updated_scatterlock(ScatterLock
*lock
)
4561 if (lock
->get_updated_item()->is_on_list()) {
4562 dout(10) << "mark_updated_scatterlock " << *lock
4563 << " - already on list since " << lock
->get_update_stamp() << dendl
;
// Not yet queued: append and record when it went dirty.
4565 updated_scatterlocks
.push_back(lock
->get_updated_item());
4566 utime_t now
= ceph_clock_now();
4567 lock
->set_update_stamp(now
);
4568 dout(10) << "mark_updated_scatterlock " << *lock
4569 << " - added at " << now
<< dendl
;
// NOTE(review): the opening "/*" of this doc comment was dropped by the
// extraction; code below kept byte-identical (many interior lines, e.g. the
// "count" loop variable declaration and several braces, are missing here).
4574 * this is called by scatter_tick and LogSegment::try_to_trim() when
4575 * trying to flush dirty scattered data (i.e. updated fnode) back to
4578 * we need to lock|scatter in order to push fnode changes into the
// Drive a dirty scatterlock through a lock-state change so its fnode data can
// be flushed: on auth, either write behind directly or cycle the lock state;
// on a replica, ask the auth MDS to scatter/unscatter via LOCK_AC_NUDGE.
// 'c' (may be null) is queued to run once the lock is usable again;
// 'forcelockchange' forbids the fast write-behind path.
4581 void Locker::scatter_nudge(ScatterLock
*lock
, MDSContext
*c
, bool forcelockchange
)
4583 CInode
*p
= static_cast<CInode
*>(lock
->get_parent());
// Frozen/freezing parent: wait for unfreeze (or just requeue if dirty).
4585 if (p
->is_frozen() || p
->is_freezing()) {
4586 dout(10) << "scatter_nudge waiting for unfreeze on " << *p
<< dendl
;
4588 p
->add_waiter(MDSCacheObject::WAIT_UNFREEZE
, c
);
4589 else if (lock
->is_dirty())
4590 // just requeue. not ideal.. starvation prone..
4591 updated_scatterlocks
.push_back(lock
->get_updated_item());
// Ambiguous auth (mid-migration): wait for a single auth (or requeue).
4595 if (p
->is_ambiguous_auth()) {
4596 dout(10) << "scatter_nudge waiting for single auth on " << *p
<< dendl
;
4598 p
->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
, c
);
4599 else if (lock
->is_dirty())
4600 // just requeue. not ideal.. starvation prone..
4601 updated_scatterlocks
.push_back(lock
->get_updated_item());
// Auth path: if stable we may flush immediately, otherwise cycle the state.
4608 if (lock
->is_stable()) {
4609 // can we do it now?
4610 // (only if we're not replicated.. if we are, we really do need
4611 // to nudge the lock state!)
4613 actually, even if we're not replicated, we can't stay in MIX, because another mds
4614 could discover and replicate us at any time. if that happens while we're flushing,
4615 they end up in MIX but their inode has the old scatterstat version.
4617 if (!forcelockchange && !lock->get_parent()->is_replicated() && lock->can_wrlock(-1)) {
4618 dout(10) << "scatter_nudge auth, propagating " << *lock << " on " << *p << dendl;
4619 scatter_writebehind(lock);
4621 lock->add_waiter(SimpleLock::WAIT_STABLE, c);
// Read-only filesystem: push the lock toward SYNC instead of scattering.
4626 if (mdcache
->is_readonly()) {
4627 if (lock
->get_state() != LOCK_SYNC
) {
4628 dout(10) << "scatter_nudge auth, read-only FS, syncing " << *lock
<< " on " << *p
<< dendl
;
4629 simple_sync(static_cast<ScatterLock
*>(lock
));
4634 // adjust lock state
4635 dout(10) << "scatter_nudge auth, scatter/unscattering " << *lock
<< " on " << *p
<< dendl
;
// Cycle MIX <-> LOCK/SYNC depending on lock type and replication, so the
// state transition forces the dirty fnode data to be gathered and flushed.
4636 switch (lock
->get_type()) {
4637 case CEPH_LOCK_IFILE
:
4638 if (p
->is_replicated() && lock
->get_state() != LOCK_MIX
)
4639 scatter_mix(static_cast<ScatterLock
*>(lock
));
4640 else if (lock
->get_state() != LOCK_LOCK
)
4641 simple_lock(static_cast<ScatterLock
*>(lock
));
4643 simple_sync(static_cast<ScatterLock
*>(lock
));
4646 case CEPH_LOCK_IDFT
:
4647 case CEPH_LOCK_INEST
:
4648 if (p
->is_replicated() && lock
->get_state() != LOCK_MIX
)
4650 else if (lock
->get_state() != LOCK_LOCK
)
// Still stable after two transitions: bail to avoid spinning forever.
// NOTE(review): the declaration/increment of "count" is among the lines this
// extraction dropped.
4659 if (lock
->is_stable() && count
== 2) {
4660 dout(10) << "scatter_nudge oh, stable after two cycles." << dendl
;
4661 // this should only really happen when called via
4662 // handle_file_lock due to AC_NUDGE, because the rest of the
4663 // time we are replicated or have dirty data and won't get
4664 // called. bailing here avoids an infinite loop.
4669 dout(10) << "scatter_nudge auth, waiting for stable " << *lock
<< " on " << *p
<< dendl
;
4671 lock
->add_waiter(SimpleLock::WAIT_STABLE
, c
);
// Replica path: ask the auth MDS to do the scatter/unscatter for us.
4676 dout(10) << "scatter_nudge replica, requesting scatter/unscatter of "
4677 << *lock
<< " on " << *p
<< dendl
;
4678 // request unscatter?
4679 mds_rank_t auth
= lock
->get_parent()->authority().first
;
4680 if (!mds
->is_cluster_degraded() || mds
->mdsmap
->is_clientreplay_or_active_or_stopping(auth
)) {
4681 mds
->send_message_mds(MLock::create(lock
, LOCK_AC_NUDGE
, mds
->get_nodeid()), auth
);
4686 lock
->add_waiter(SimpleLock::WAIT_STABLE
, c
);
4688 // also, requeue, in case we had wrong auth or something
4689 if (lock
->is_dirty())
4690 updated_scatterlocks
.push_back(lock
->get_updated_item());
// Periodic pass over updated_scatterlocks: drop entries that are no longer
// dirty, skip recently-stamped ones, and nudge the rest so their dirty fnode
// data gets flushed.  The initial size 'n' bounds the loop because
// scatter_nudge() may requeue entries onto the same list.
// NOTE(review): lossy extraction — some interior lines/braces are missing
// from this view; code kept byte-identical.
4694 void Locker::scatter_tick()
4696 dout(10) << "scatter_tick" << dendl
;
4699 utime_t now
= ceph_clock_now();
4700 int n
= updated_scatterlocks
.size();
4701 while (!updated_scatterlocks
.empty()) {
4702 ScatterLock
*lock
= updated_scatterlocks
.front();
4704 if (n
-- == 0) break; // scatter_nudge() may requeue; avoid looping
// Clean entry: just remove it from the list.
4706 if (!lock
->is_dirty()) {
4707 updated_scatterlocks
.pop_front();
4708 dout(10) << " removing from updated_scatterlocks "
4709 << *lock
<< " " << *lock
->get_parent() << dendl
;
// Too fresh: leave it for a later tick (presumably breaks out here, since
// the list is ordered by update stamp).
4712 if (now
- lock
->get_update_stamp() < g_conf()->mds_scatter_nudge_interval
)
4714 updated_scatterlocks
.pop_front();
4715 scatter_nudge(lock
, 0);
// Flush the log so any journaled lock transitions become durable promptly.
4717 mds
->mdlog
->flush();
// Move a stable auth scatterlock into the temporary-sync (TSYN) state.
// Currently dead: ceph_abort_msg() fires unconditionally before any state
// change ("not fully implemented, at least not for filelock").
// NOTE(review): lossy extraction — gather bookkeeping and closing braces are
// missing from this view; code kept byte-identical.
4721 void Locker::scatter_tempsync(ScatterLock
*lock
, bool *need_issue
)
4723 dout(10) << "scatter_tempsync " << *lock
4724 << " on " << *lock
->get_parent() << dendl
;
4725 ceph_assert(lock
->get_parent()->is_auth());
4726 ceph_assert(lock
->is_stable());
4728 ceph_abort_msg("not fully implemented, at least not for filelock");
4730 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
// Enter the appropriate intermediate *_TSYN state.
4732 switch (lock
->get_state()) {
4733 case LOCK_SYNC
: ceph_abort(); // this shouldn't happen
4734 case LOCK_LOCK
: lock
->set_state(LOCK_LOCK_TSYN
); break;
4735 case LOCK_MIX
: lock
->set_state(LOCK_MIX_TSYN
); break;
4736 default: ceph_abort();
// Gather outstanding wrlocks / client caps before completing the transition.
4740 if (lock
->is_wrlocked())
4743 if (lock
->get_cap_shift() &&
4745 in
->issued_caps_need_gather(lock
)) {
// Coming from MIX with replicas: they must drop to LOCK first.
4753 if (lock
->get_state() == LOCK_MIX_TSYN
&&
4754 in
->is_replicated()) {
4755 lock
->init_gather();
4756 send_lock_message(lock
, LOCK_AC_LOCK
);
// No gathering needed: land directly in TSYN and wake readers/waiters.
4764 lock
->set_state(LOCK_TSYN
);
4765 lock
->finish_waiters(ScatterLock::WAIT_RD
|ScatterLock::WAIT_STABLE
);
4766 if (lock
->get_cap_shift()) {
4777 // ==========================================================================
// Unconditionally take a wrlock on a LocalLock (caller has already verified
// can_wrlock() holds — asserted here) and record it in the mutation's lock
// set.  Unlike local_wrlock_start() this never waits.
// NOTE(review): lossy extraction — closing braces are missing from this
// view; code kept byte-identical.
4780 void Locker::local_wrlock_grab(LocalLock
*lock
, MutationRef
& mut
)
4782 dout(7) << "local_wrlock_grab on " << *lock
4783 << " on " << *lock
->get_parent() << dendl
;
4785 ceph_assert(lock
->get_parent()->is_auth());
4786 ceph_assert(lock
->can_wrlock());
4787 lock
->get_wrlock(mut
->get_client());
// The lock must not already be in this mutation's lock set.
4789 auto ret
= mut
->locks
.emplace(lock
, MutationImpl::LockOp::WRLOCK
);
4790 ceph_assert(ret
.second
);
// Try to wrlock a LocalLock for an MDS request.  On success the wrlock is
// recorded in the request's lock set (and presumably returns true); if the
// lock is not currently wrlockable the request is queued to retry and
// (presumably) false is returned.
// NOTE(review): lossy extraction — the return statements and braces are
// missing from this view; code kept byte-identical.
4793 bool Locker::local_wrlock_start(LocalLock
*lock
, MDRequestRef
& mut
)
4795 dout(7) << "local_wrlock_start on " << *lock
4796 << " on " << *lock
->get_parent() << dendl
;
4798 ceph_assert(lock
->get_parent()->is_auth());
4799 if (lock
->can_wrlock()) {
4800 lock
->get_wrlock(mut
->get_client());
// emplace_hint at end: lock acquisition order keeps the set sorted.
4801 auto it
= mut
->locks
.emplace_hint(mut
->locks
.end(), lock
, MutationImpl::LockOp::WRLOCK
);
4802 ceph_assert(it
->is_wrlock());
// Not wrlockable now — retry the request when the lock becomes usable.
4805 lock
->add_waiter(SimpleLock::WAIT_WR
|SimpleLock::WAIT_STABLE
, new C_MDS_RetryRequest(mdcache
, mut
));
// Release one wrlock on a LocalLock held by 'mut' (via its lock-set
// iterator) and, when the last wrlock goes away, wake all waiters.
// NOTE(review): lossy extraction — the put_wrlock() call and closing braces
// are missing from this view; code kept byte-identical.
4810 void Locker::local_wrlock_finish(const MutationImpl::lock_iterator
& it
, MutationImpl
*mut
)
4812 ceph_assert(it
->is_wrlock());
4813 LocalLock
*lock
= static_cast<LocalLock
*>(it
->lock
);
4814 dout(7) << "local_wrlock_finish on " << *lock
4815 << " on " << *lock
->get_parent() << dendl
;
4817 mut
->locks
.erase(it
);
// Last writer out: wake everything that was blocked on this lock.
4818 if (lock
->get_num_wrlocks() == 0) {
4819 lock
->finish_waiters(SimpleLock::WAIT_STABLE
|
4820 SimpleLock::WAIT_WR
|
4821 SimpleLock::WAIT_RD
);
// Try to xlock a LocalLock for an MDS request.  If it cannot be exclusively
// locked right now, queue the request for retry (and presumably return
// false); otherwise take the xlock, record it in the request's lock set,
// and presumably return true.
// NOTE(review): lossy extraction — the return statements and braces are
// missing from this view; code kept byte-identical.
4825 bool Locker::local_xlock_start(LocalLock
*lock
, MDRequestRef
& mut
)
4827 dout(7) << "local_xlock_start on " << *lock
4828 << " on " << *lock
->get_parent() << dendl
;
4830 ceph_assert(lock
->get_parent()->is_auth());
4831 if (!lock
->can_xlock_local()) {
4832 lock
->add_waiter(SimpleLock::WAIT_WR
|SimpleLock::WAIT_STABLE
, new C_MDS_RetryRequest(mdcache
, mut
));
// Lockable: take the xlock on behalf of this request's client.
4836 lock
->get_xlock(mut
, mut
->get_client());
4837 mut
->locks
.emplace_hint(mut
->locks
.end(), lock
, MutationImpl::LockOp::XLOCK
);
// Release the xlock on a LocalLock held by 'mut' (via its lock-set iterator)
// and wake all waiters.
// NOTE(review): lossy extraction — the put_xlock() call and closing braces
// are missing from this view; code kept byte-identical.
4841 void Locker::local_xlock_finish(const MutationImpl::lock_iterator
& it
, MutationImpl
*mut
)
4843 ceph_assert(it
->is_xlock());
4844 LocalLock
*lock
= static_cast<LocalLock
*>(it
->lock
);
4845 dout(7) << "local_xlock_finish on " << *lock
4846 << " on " << *lock
->get_parent() << dendl
;
4848 mut
->locks
.erase(it
);
// xlocks are exclusive, so dropping it always unblocks waiters.
4850 lock
->finish_waiters(SimpleLock::WAIT_STABLE
|
4851 SimpleLock::WAIT_WR
|
4852 SimpleLock::WAIT_RD
);
4857 // ==========================================================================
// Re-evaluate a stable auth file lock against the client capabilities that
// are wanted/issued, and transition it toward the best-fitting state:
//   EXCL  -> demote (to MIX or SYNC) when the loner no longer justifies it;
//   *     -> EXCL  when a loner client wants write/buffer caps;
//   *     -> MIX   when multiple writers (or scatter) are wanted;
//   *     -> SYNC  when nobody wants write caps and we are not a delegation
//                  point that must stay in MIX.
// NOTE(review): lossy extraction — returns, else branches and braces are
// missing from this view; code kept byte-identical.
4861 void Locker::file_eval(ScatterLock
*lock
, bool *need_issue
)
4863 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4864 int loner_wanted
, other_wanted
;
4865 int wanted
= in
->get_caps_wanted(&loner_wanted
, &other_wanted
, CEPH_CAP_SFILE
);
4866 dout(7) << "file_eval wanted=" << gcap_string(wanted
)
4867 << " loner_wanted=" << gcap_string(loner_wanted
)
4868 << " other_wanted=" << gcap_string(other_wanted
)
4869 << " filelock=" << *lock
<< " on " << *lock
->get_parent()
4872 ceph_assert(lock
->get_parent()->is_auth());
4873 ceph_assert(lock
->is_stable());
// No transitions while the parent is freezing/frozen.
4875 if (lock
->get_parent()->is_freezing_or_frozen())
// Read-only filesystem: force toward SYNC.
4878 if (mdcache
->is_readonly()) {
4879 if (lock
->get_state() != LOCK_SYNC
) {
4880 dout(10) << "file_eval read-only FS, syncing " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4881 simple_sync(lock
, need_issue
);
// Currently EXCL: decide whether the loner still deserves exclusivity.
4887 if (lock
->get_state() == LOCK_EXCL
) {
4888 dout(20) << " is excl" << dendl
;
4889 int loner_issued
, other_issued
, xlocker_issued
;
4890 in
->get_caps_issued(&loner_issued
, &other_issued
, &xlocker_issued
, CEPH_CAP_SFILE
);
4891 dout(7) << "file_eval loner_issued=" << gcap_string(loner_issued
)
4892 << " other_issued=" << gcap_string(other_issued
)
4893 << " xlocker_issued=" << gcap_string(xlocker_issued
)
// Lose EXCL if the loner neither wants nor holds excl/wr/buffer caps, if
// other clients want conflicting caps, or (dirs) if several clients hold
// non-stale caps.
4895 if (!((loner_wanted
|loner_issued
) & (CEPH_CAP_GEXCL
|CEPH_CAP_GWR
|CEPH_CAP_GBUFFER
)) ||
4896 (other_wanted
& (CEPH_CAP_GEXCL
|CEPH_CAP_GWR
|CEPH_CAP_GRD
)) ||
4897 (in
->inode
.is_dir() && in
->multiple_nonstale_caps())) { // FIXME.. :/
4898 dout(20) << " should lose it" << dendl
;
4899 // we should lose it.
4910 // -> any writer means MIX; RD doesn't matter.
4911 if (((other_wanted
|loner_wanted
) & CEPH_CAP_GWR
) ||
4912 lock
->is_waiter_for(SimpleLock::WAIT_WR
))
4913 scatter_mix(lock
, need_issue
);
4914 else if (!lock
->is_wrlocked()) // let excl wrlocks drain first
4915 simple_sync(lock
, need_issue
);
4917 dout(10) << " waiting for wrlock to drain" << dendl
;
// Promote to loner/EXCL: a single target loner wants write/buffer caps
// (or this is a non-delegation-point directory).
4922 else if (lock
->get_state() != LOCK_EXCL
&&
4923 !lock
->is_rdlocked() &&
4924 //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
4925 ((wanted
& (CEPH_CAP_GWR
|CEPH_CAP_GBUFFER
)) ||
4926 (in
->inode
.is_dir() && !in
->has_subtree_or_exporting_dirfrag())) &&
4927 in
->get_target_loner() >= 0) {
4928 dout(7) << "file_eval stable, bump to loner " << *lock
4929 << " on " << *lock
->get_parent() << dendl
;
4930 file_excl(lock
, need_issue
);
// Go MIX: scatter requested, or multiple clients want write caps.
4934 else if (lock
->get_state() != LOCK_MIX
&&
4935 !lock
->is_rdlocked() &&
4936 //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
4937 (lock
->get_scatter_wanted() ||
4938 (in
->get_target_loner() < 0 && (wanted
& CEPH_CAP_GWR
)))) {
4939 dout(7) << "file_eval stable, bump to mixed " << *lock
4940 << " on " << *lock
->get_parent() << dendl
;
4941 scatter_mix(lock
, need_issue
);
// Fall back to SYNC: no writers, no pending wrlocks, and not a MIX
// delegation point that must stay scattered.
4945 else if (lock
->get_state() != LOCK_SYNC
&&
4946 !lock
->is_wrlocked() && // drain wrlocks first!
4947 !lock
->is_waiter_for(SimpleLock::WAIT_WR
) &&
4948 !(wanted
& CEPH_CAP_GWR
) &&
4949 !((lock
->get_state() == LOCK_MIX
) &&
4950 in
->is_dir() && in
->has_subtree_or_exporting_dirfrag()) // if we are a delegation point, stay where we are
4951 //((wanted & CEPH_CAP_RD) ||
4952 //in->is_replicated() ||
4953 //lock->is_leased() ||
4954 //(!loner && lock->get_state() == LOCK_EXCL)) &&
4956 dout(7) << "file_eval stable, bump to sync " << *lock
4957 << " on " << *lock
->get_parent() << dendl
;
4958 simple_sync(lock
, need_issue
);
// Transition a stable auth scatterlock into MIX (scattered, per-frag
// writeable).  From LOCK the switch is immediate (replicas are notified with
// the encoded lock state); from SYNC/EXCL/XSYN/TSYN an intermediate *_MIX
// state is entered and rdlocks, leases, client caps and replica acks are
// gathered first, completing to MIX only when nothing is outstanding.
// NOTE(review): lossy extraction — the gather counting, caps issue/eval
// calls and closing braces are missing from this view; code kept
// byte-identical.
4964 void Locker::scatter_mix(ScatterLock
*lock
, bool *need_issue
)
4966 dout(7) << "scatter_mix " << *lock
<< " on " << *lock
->get_parent() << dendl
;
4968 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
4969 ceph_assert(in
->is_auth());
4970 ceph_assert(lock
->is_stable());
// Fast path: LOCK -> MIX needs no gathering.
4972 if (lock
->get_state() == LOCK_LOCK
) {
4973 in
->start_scatter(lock
);
4974 if (in
->is_replicated()) {
4976 bufferlist softdata
;
4977 lock
->encode_locked_state(softdata
);
4979 // bcast to replicas
4980 send_lock_message(lock
, LOCK_AC_MIX
, softdata
);
4984 lock
->set_state(LOCK_MIX
);
4985 lock
->clear_scatter_wanted();
// Cap-bearing locks (ifile): presumably re-issue caps to clients here.
4986 if (lock
->get_cap_shift()) {
// Slow path: enter the intermediate state and gather.
4994 switch (lock
->get_state()) {
4995 case LOCK_SYNC
: lock
->set_state(LOCK_SYNC_MIX
); break;
4996 case LOCK_EXCL
: lock
->set_state(LOCK_EXCL_MIX
); break;
4997 case LOCK_XSYN
: lock
->set_state(LOCK_XSYN_MIX
); break;
4998 case LOCK_TSYN
: lock
->set_state(LOCK_TSYN_MIX
); break;
4999 default: ceph_abort();
// Gather: rdlocks, replica acks (only SYNC had replicas in SYNC), leases,
// and client caps that must be revoked before MIX.
5003 if (lock
->is_rdlocked())
5005 if (in
->is_replicated()) {
5006 if (lock
->get_state() == LOCK_SYNC_MIX
) { // for the rest states, replicas are already LOCK
5007 send_lock_message(lock
, LOCK_AC_MIX
);
5008 lock
->init_gather();
5012 if (lock
->is_leased()) {
5013 revoke_client_leases(lock
);
5016 if (lock
->get_cap_shift() &&
5018 in
->issued_caps_need_gather(lock
)) {
// A needs-recover file must be queued for recovery before scattering.
5025 bool need_recover
= false;
5026 if (in
->state_test(CInode::STATE_NEEDSRECOVER
)) {
5027 mds
->mdcache
->queue_file_recover(in
);
5028 need_recover
= true;
// Something is outstanding: auth-pin and (if needed) kick off recovery,
// completing the transition later via eval_gather().
5033 lock
->get_parent()->auth_pin(lock
);
5035 mds
->mdcache
->do_file_recover();
// Nothing to gather: complete the transition to MIX immediately.
5037 in
->start_scatter(lock
);
5038 lock
->set_state(LOCK_MIX
);
5039 lock
->clear_scatter_wanted();
5040 if (in
->is_replicated()) {
5041 bufferlist softdata
;
5042 lock
->encode_locked_state(softdata
);
5043 send_lock_message(lock
, LOCK_AC_MIX
, softdata
);
5045 if (lock
->get_cap_shift()) {
// Transition a stable auth file lock into EXCL (single loner client holds
// exclusive caps).  Requires a valid loner with no other MDSs wanting caps,
// unless coming from XSYN.  Enters an intermediate *_EXCL state and gathers
// rdlocks/wrlocks, replica acks, leases and conflicting client caps before
// landing in EXCL.
// NOTE(review): lossy extraction — the gather counting, recovery/eval
// completion and closing braces are missing from this view; code kept
// byte-identical.
5056 void Locker::file_excl(ScatterLock
*lock
, bool *need_issue
)
5058 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
5059 dout(7) << "file_excl " << *lock
<< " on " << *lock
->get_parent() << dendl
;
5061 ceph_assert(in
->is_auth());
5062 ceph_assert(lock
->is_stable());
5064 ceph_assert((in
->get_loner() >= 0 && in
->get_mds_caps_wanted().empty()) ||
5065 (lock
->get_state() == LOCK_XSYN
)); // must do xsyn -> excl -> <anything else>
// Enter the appropriate intermediate *_EXCL state.
5067 switch (lock
->get_state()) {
5068 case LOCK_SYNC
: lock
->set_state(LOCK_SYNC_EXCL
); break;
5069 case LOCK_MIX
: lock
->set_state(LOCK_MIX_EXCL
); break;
5070 case LOCK_LOCK
: lock
->set_state(LOCK_LOCK_EXCL
); break;
5071 case LOCK_XSYN
: lock
->set_state(LOCK_XSYN_EXCL
); break;
5072 default: ceph_abort();
// Gather outstanding rdlocks/wrlocks.
5076 if (lock
->is_rdlocked())
5078 if (lock
->is_wrlocked())
// Replicas must drop to LOCK (unless they already were LOCK).
5081 if (in
->is_replicated() &&
5082 lock
->get_state() != LOCK_LOCK_EXCL
&&
5083 lock
->get_state() != LOCK_XSYN_EXCL
) { // if we were lock, replicas are already lock.
5084 send_lock_message(lock
, LOCK_AC_LOCK
);
5085 lock
->init_gather();
5088 if (lock
->is_leased()) {
5089 revoke_client_leases(lock
);
// Revoke client caps that conflict with EXCL.
5092 if (in
->is_head() &&
5093 in
->issued_caps_need_gather(lock
)) {
// Queue recovery if the file needs it before going exclusive.
5100 bool need_recover
= false;
5101 if (in
->state_test(CInode::STATE_NEEDSRECOVER
)) {
5102 mds
->mdcache
->queue_file_recover(in
);
5103 need_recover
= true;
// Something outstanding: auth-pin and finish via eval_gather() later.
5108 lock
->get_parent()->auth_pin(lock
);
5110 mds
->mdcache
->do_file_recover();
// Nothing to gather: land in EXCL immediately.
5112 lock
->set_state(LOCK_EXCL
);
// Transition a file lock from EXCL into XSYN.  Only valid with a loner and
// no other MDSs wanting caps; passes through EXCL_XSYN while wrlocks and
// conflicting client caps are gathered, then lands in XSYN and wakes
// readers/stable waiters.
// NOTE(review): lossy extraction — the gather counting and closing braces
// are missing from this view; code kept byte-identical.
5120 void Locker::file_xsyn(SimpleLock
*lock
, bool *need_issue
)
5122 dout(7) << "file_xsyn on " << *lock
<< " on " << *lock
->get_parent() << dendl
;
5123 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
5124 ceph_assert(in
->is_auth());
5125 ceph_assert(in
->get_loner() >= 0 && in
->get_mds_caps_wanted().empty());
// XSYN is only reachable from EXCL.
5127 switch (lock
->get_state()) {
5128 case LOCK_EXCL
: lock
->set_state(LOCK_EXCL_XSYN
); break;
5129 default: ceph_abort();
// Gather wrlocks and conflicting client caps first.
5133 if (lock
->is_wrlocked())
5136 if (in
->is_head() &&
5137 in
->issued_caps_need_gather(lock
)) {
// Something outstanding: auth-pin, completing via eval_gather() later.
5146 lock
->get_parent()->auth_pin(lock
);
// Nothing to gather: complete immediately.
5148 lock
->set_state(LOCK_XSYN
);
5149 lock
->finish_waiters(SimpleLock::WAIT_RD
|SimpleLock::WAIT_STABLE
);
// Move a file lock from PRE_SCAN to SCAN and queue the inode for file
// recovery (size/mtime probe).  Only called from
// MDCache::start_files_to_recover().
// NOTE(review): lossy extraction — in upstream Ceph the replica-notify block
// at original lines 5169-5173 is inside a commented-out /* */ region (hence
// the seemingly missing "&&"); the comment markers, gather counting and
// braces were dropped by this extraction.  Code kept byte-identical.
5157 void Locker::file_recover(ScatterLock
*lock
)
5159 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
5160 dout(7) << "file_recover " << *lock
<< " on " << *in
<< dendl
;
5162 ceph_assert(in
->is_auth());
5163 //assert(lock->is_stable());
5164 ceph_assert(lock
->get_state() == LOCK_PRE_SCAN
); // only called from MDCache::start_files_to_recover()
5169 if (in->is_replicated()
5170 lock->get_sm()->states[oldstate].replica_state != LOCK_LOCK) {
5171 send_lock_message(lock, LOCK_AC_LOCK);
5172 lock->init_gather();
// Revoke client caps that conflict with the SCAN state before scanning.
5176 if (in
->is_head() &&
5177 in
->issued_caps_need_gather(lock
)) {
5182 lock
->set_state(LOCK_SCAN
);
// Mark the inode for recovery and queue it with the MDCache.
5184 in
->state_set(CInode::STATE_NEEDSRECOVER
);
5186 mds
->mdcache
->queue_file_recover(in
);
5191 void Locker::handle_file_lock(ScatterLock
*lock
, const MLock::const_ref
&m
)
5193 CInode
*in
= static_cast<CInode
*>(lock
->get_parent());
5194 int from
= m
->get_asker();
5196 if (mds
->is_rejoin()) {
5197 if (in
->is_rejoining()) {
5198 dout(7) << "handle_file_lock still rejoining " << *in
5199 << ", dropping " << *m
<< dendl
;
5204 dout(7) << "handle_file_lock a=" << lock
->get_lock_action_name(m
->get_action())
5206 << " from mds." << from
<< " "
5209 bool caps
= lock
->get_cap_shift();
5211 switch (m
->get_action()) {
5214 ceph_assert(lock
->get_state() == LOCK_LOCK
||
5215 lock
->get_state() == LOCK_MIX
||
5216 lock
->get_state() == LOCK_MIX_SYNC2
);
5218 if (lock
->get_state() == LOCK_MIX
) {
5219 lock
->set_state(LOCK_MIX_SYNC
);
5220 eval_gather(lock
, true);
5221 if (lock
->is_unstable_and_locked())
5222 mds
->mdlog
->flush();
5226 (static_cast<ScatterLock
*>(lock
))->finish_flush();
5227 (static_cast<ScatterLock
*>(lock
))->clear_flushed();
5230 lock
->decode_locked_state(m
->get_data());
5231 lock
->set_state(LOCK_SYNC
);
5236 lock
->finish_waiters(SimpleLock::WAIT_RD
|SimpleLock::WAIT_STABLE
);
5241 switch (lock
->get_state()) {
5242 case LOCK_SYNC
: lock
->set_state(LOCK_SYNC_LOCK
); break;
5243 case LOCK_MIX
: lock
->set_state(LOCK_MIX_LOCK
); break;
5244 default: ceph_abort();
5247 eval_gather(lock
, true);
5248 if (lock
->is_unstable_and_locked())
5249 mds
->mdlog
->flush();
5253 case LOCK_AC_LOCKFLUSHED
:
5254 (static_cast<ScatterLock
*>(lock
))->finish_flush();
5255 (static_cast<ScatterLock
*>(lock
))->clear_flushed();
5256 // wake up scatter_nudge waiters
5257 if (lock
->is_stable())
5258 lock
->finish_waiters(SimpleLock::WAIT_STABLE
);
5262 ceph_assert(lock
->get_state() == LOCK_SYNC
||
5263 lock
->get_state() == LOCK_LOCK
||
5264 lock
->get_state() == LOCK_SYNC_MIX2
);
5266 if (lock
->get_state() == LOCK_SYNC
) {
5268 lock
->set_state(LOCK_SYNC_MIX
);
5269 eval_gather(lock
, true);
5270 if (lock
->is_unstable_and_locked())
5271 mds
->mdlog
->flush();
5276 lock
->set_state(LOCK_MIX
);
5277 lock
->decode_locked_state(m
->get_data());
5282 lock
->finish_waiters(SimpleLock::WAIT_WR
|SimpleLock::WAIT_STABLE
);
5287 case LOCK_AC_LOCKACK
:
5288 ceph_assert(lock
->get_state() == LOCK_SYNC_LOCK
||
5289 lock
->get_state() == LOCK_MIX_LOCK
||
5290 lock
->get_state() == LOCK_MIX_LOCK2
||
5291 lock
->get_state() == LOCK_MIX_EXCL
||
5292 lock
->get_state() == LOCK_SYNC_EXCL
||
5293 lock
->get_state() == LOCK_SYNC_MIX
||
5294 lock
->get_state() == LOCK_MIX_TSYN
);
5295 ceph_assert(lock
->is_gathering(from
));
5296 lock
->remove_gather(from
);
5298 if (lock
->get_state() == LOCK_MIX_LOCK
||
5299 lock
->get_state() == LOCK_MIX_LOCK2
||
5300 lock
->get_state() == LOCK_MIX_EXCL
||
5301 lock
->get_state() == LOCK_MIX_TSYN
) {
5302 lock
->decode_locked_state(m
->get_data());
5303 // replica is waiting for AC_LOCKFLUSHED, eval_gather() should not
5304 // delay calling scatter_writebehind().
5305 lock
->clear_flushed();
5308 if (lock
->is_gathering()) {
5309 dout(7) << "handle_file_lock " << *in
<< " from " << from
5310 << ", still gathering " << lock
->get_gather_set() << dendl
;
5312 dout(7) << "handle_file_lock " << *in
<< " from " << from
5313 << ", last one" << dendl
;
5318 case LOCK_AC_SYNCACK
:
5319 ceph_assert(lock
->get_state() == LOCK_MIX_SYNC
);
5320 ceph_assert(lock
->is_gathering(from
));
5321 lock
->remove_gather(from
);
5323 lock
->decode_locked_state(m
->get_data());
5325 if (lock
->is_gathering()) {
5326 dout(7) << "handle_file_lock " << *in
<< " from " << from
5327 << ", still gathering " << lock
->get_gather_set() << dendl
;
5329 dout(7) << "handle_file_lock " << *in
<< " from " << from
5330 << ", last one" << dendl
;
5335 case LOCK_AC_MIXACK
:
5336 ceph_assert(lock
->get_state() == LOCK_SYNC_MIX
);
5337 ceph_assert(lock
->is_gathering(from
));
5338 lock
->remove_gather(from
);
5340 if (lock
->is_gathering()) {
5341 dout(7) << "handle_file_lock " << *in
<< " from " << from
5342 << ", still gathering " << lock
->get_gather_set() << dendl
;
5344 dout(7) << "handle_file_lock " << *in
<< " from " << from
5345 << ", last one" << dendl
;
5352 case LOCK_AC_REQSCATTER
:
5353 if (lock
->is_stable()) {
5354 /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
5355 * because the replica should be holding an auth_pin if they're
5356 * doing this (and thus, we are freezing, not frozen, and indefinite
5357 * starvation isn't an issue).
5359 dout(7) << "handle_file_lock got scatter request on " << *lock
5360 << " on " << *lock
->get_parent() << dendl
;
5361 if (lock
->get_state() != LOCK_MIX
) // i.e., the reqscatter didn't race with an actual mix/scatter
5364 dout(7) << "handle_file_lock got scatter request, !stable, marking scatter_wanted on " << *lock
5365 << " on " << *lock
->get_parent() << dendl
;
5366 lock
->set_scatter_wanted();
5370 case LOCK_AC_REQUNSCATTER
:
5371 if (lock
->is_stable()) {
5372 /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
5373 * because the replica should be holding an auth_pin if they're
5374 * doing this (and thus, we are freezing, not frozen, and indefinite
5375 * starvation isn't an issue).
5377 dout(7) << "handle_file_lock got unscatter request on " << *lock
5378 << " on " << *lock
->get_parent() << dendl
;
5379 if (lock
->get_state() == LOCK_MIX
) // i.e., the reqscatter didn't race with an actual mix/scatter
5380 simple_lock(lock
); // FIXME tempsync?
5382 dout(7) << "handle_file_lock ignoring unscatter request on " << *lock
5383 << " on " << *lock
->get_parent() << dendl
;
5384 lock
->set_unscatter_wanted();
5388 case LOCK_AC_REQRDLOCK
:
5389 handle_reqrdlock(lock
, m
);
5393 if (!lock
->get_parent()->is_auth()) {
5394 dout(7) << "handle_file_lock IGNORING nudge on non-auth " << *lock
5395 << " on " << *lock
->get_parent() << dendl
;
5396 } else if (!lock
->get_parent()->is_replicated()) {
5397 dout(7) << "handle_file_lock IGNORING nudge on non-replicated " << *lock
5398 << " on " << *lock
->get_parent() << dendl
;
5400 dout(7) << "handle_file_lock trying nudge on " << *lock
5401 << " on " << *lock
->get_parent() << dendl
;
5402 scatter_nudge(lock
, 0, true);
5403 mds
->mdlog
->flush();