]> git.proxmox.com Git - ceph.git/blob - ceph/src/mds/Locker.cc
a73c4225cbf27d58986db974401249b03058faee
[ceph.git] / ceph / src / mds / Locker.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15
16 #include "CDir.h"
17 #include "CDentry.h"
18 #include "CInode.h"
19 #include "common/config.h"
20 #include "events/EOpen.h"
21 #include "events/EUpdate.h"
22 #include "Locker.h"
23 #include "MDBalancer.h"
24 #include "MDCache.h"
25 #include "MDLog.h"
26 #include "MDSRank.h"
27 #include "MDSMap.h"
28 #include "messages/MInodeFileCaps.h"
29 #include "messages/MMDSPeerRequest.h"
30 #include "Migrator.h"
31 #include "msg/Messenger.h"
32 #include "osdc/Objecter.h"
33
34 #define dout_subsys ceph_subsys_mds
35 #undef dout_prefix
36 #define dout_context g_ceph_context
37 #define dout_prefix _prefix(_dout, mds)
38
39 using namespace std;
40
41 static ostream& _prefix(std::ostream *_dout, MDSRank *mds) {
42 return *_dout << "mds." << mds->get_nodeid() << ".locker ";
43 }
44
45
46 class LockerContext : public MDSContext {
47 protected:
48 Locker *locker;
49 MDSRank *get_mds() override
50 {
51 return locker->mds;
52 }
53
54 public:
55 explicit LockerContext(Locker *locker_) : locker(locker_) {
56 ceph_assert(locker != NULL);
57 }
58 };
59
60 class LockerLogContext : public MDSLogContextBase {
61 protected:
62 Locker *locker;
63 MDSRank *get_mds() override
64 {
65 return locker->mds;
66 }
67
68 public:
69 explicit LockerLogContext(Locker *locker_) : locker(locker_) {
70 ceph_assert(locker != NULL);
71 }
72 };
73
// Construct the Locker.  need_snapflush_inodes is an intrusive list anchored
// at CInode::item_caps (member_offset computes the link offset).  mds and
// mdcache are borrowed pointers owned by the enclosing MDSRank.
Locker::Locker(MDSRank *m, MDCache *c) :
  need_snapflush_inodes(member_offset(CInode, item_caps)), mds(m), mdcache(c) {}
76
77
// Route an incoming message to the matching Locker handler.
// Handles inter-MDS lock/caps traffic and client cap/lease messages.
// An unrecognized type is a protocol violation and aborts the daemon.
void Locker::dispatch(const cref_t<Message> &m)
{

  switch (m->get_type()) {
    // inter-mds locking
  case MSG_MDS_LOCK:
    handle_lock(ref_cast<MLock>(m));
    break;
    // inter-mds caps
  case MSG_MDS_INODEFILECAPS:
    handle_inode_file_caps(ref_cast<MInodeFileCaps>(m));
    break;
    // client sync
  case CEPH_MSG_CLIENT_CAPS:
    handle_client_caps(ref_cast<MClientCaps>(m));
    break;
  case CEPH_MSG_CLIENT_CAPRELEASE:
    handle_client_cap_release(ref_cast<MClientCapRelease>(m));
    break;
  case CEPH_MSG_CLIENT_LEASE:
    handle_client_lease(ref_cast<MClientLease>(m));
    break;
  default:
    derr << "locker unknown message " << m->get_type() << dendl;
    ceph_abort_msg("locker unknown message");
  }
}
105
// Periodic upkeep, invoked from the MDS tick: advance scatter-lock
// writebehind/unscatter work and run cap-related timers.
void Locker::tick()
{
  scatter_tick();
  caps_tick();
}
111
112 /*
113 * locks vs rejoin
114 *
115 *
116 *
117 */
118
119 void Locker::send_lock_message(SimpleLock *lock, int msg)
120 {
121 for (const auto &it : lock->get_parent()->get_replicas()) {
122 if (mds->is_cluster_degraded() &&
123 mds->mdsmap->get_state(it.first) < MDSMap::STATE_REJOIN)
124 continue;
125 auto m = make_message<MLock>(lock, msg, mds->get_nodeid());
126 mds->send_message_mds(m, it.first);
127 }
128 }
129
130 void Locker::send_lock_message(SimpleLock *lock, int msg, const bufferlist &data)
131 {
132 for (const auto &it : lock->get_parent()->get_replicas()) {
133 if (mds->is_cluster_degraded() &&
134 mds->mdsmap->get_state(it.first) < MDSMap::STATE_REJOIN)
135 continue;
136 auto m = make_message<MLock>(lock, msg, mds->get_nodeid());
137 m->set_data(data);
138 mds->send_message_mds(m, it.first);
139 }
140 }
141
// Try to rdlock the snaplock (and optionally policylock, to resolve the
// effective file layout) of `in` and every ancestor up to the root of its
// tree.  On success records the tree root ino and depth in mdr->dir_root[n]
// and mdr->dir_depth[n] and returns true.  On any lock conflict, a retry
// waiter is queued, all locks and local auth pins are dropped, and false is
// returned.
bool Locker::try_rdlock_snap_layout(CInode *in, MDRequestRef& mdr,
				    int n, bool want_layout)
{
  dout(10) << __func__ << " " << *mdr << " " << *in << dendl;
  // rdlock ancestor snaps
  inodeno_t root;
  int depth = -1;
  bool found_locked = false;   // already hold an ancestor's snaplock
  bool found_layout = false;   // nearest ancestor layout already resolved

  if (want_layout)
    ceph_assert(n == 0);

  client_t client = mdr->get_client();

  CInode *t = in;
  while (true) {
    ++depth;
    if (!found_locked && mdr->is_rdlocked(&t->snaplock))
      found_locked = true;

    if (!found_locked) {
      if (!t->snaplock.can_rdlock(client)) {
	t->snaplock.add_waiter(SimpleLock::WAIT_RD, new C_MDS_RetryRequest(mdcache, mdr));
	goto failed;
      }
      t->snaplock.get_rdlock();
      mdr->locks.emplace(&t->snaplock, MutationImpl::LockOp::RDLOCK);
      dout(20) << " got rdlock on " << t->snaplock << " " << *t << dendl;
    }
    if (want_layout && !found_layout) {
      if (!mdr->is_rdlocked(&t->policylock)) {
	if (!t->policylock.can_rdlock(client)) {
	  t->policylock.add_waiter(SimpleLock::WAIT_RD, new C_MDS_RetryRequest(mdcache, mdr));
	  goto failed;
	}
	t->policylock.get_rdlock();
	mdr->locks.emplace(&t->policylock, MutationImpl::LockOp::RDLOCK);
	dout(20) << " got rdlock on " << t->policylock << " " << *t << dendl;
      }
      // first ancestor with an explicit layout wins
      if (t->get_projected_inode()->has_layout()) {
	mdr->dir_layout = t->get_projected_inode()->layout;
	found_layout = true;
      }
    }
    CDentry* pdn = t->get_projected_parent_dn();
    if (!pdn) {
      // reached the root of this tree
      root = t->ino();
      break;
    }
    t = pdn->get_dir()->get_inode();
  }

  mdr->dir_root[n] = root;
  mdr->dir_depth[n] = depth;
  return true;

failed:
  dout(10) << __func__ << " failed" << dendl;

  drop_locks(mdr.get(), nullptr);
  mdr->drop_local_auth_pins();
  return false;
}
206
207 struct MarkEventOnDestruct {
208 MDRequestRef& mdr;
209 std::string_view message;
210 bool mark_event;
211 MarkEventOnDestruct(MDRequestRef& _mdr, std::string_view _message) :
212 mdr(_mdr),
213 message(_message),
214 mark_event(true) {}
215 ~MarkEventOnDestruct() {
216 if (mark_event)
217 mdr->mark_event(message);
218 }
219 };
220
/* If this function returns false, the mdr has been placed
 * on the appropriate wait list.
 *
 * Acquire every lock in `lov` for this request, in several phases:
 *  1. scan the lock ops, augmenting xlocks with version locks and collecting
 *     the set of objects that must be auth-pinned;
 *  2. verify every local object can be auth-pinned (else wait), then take
 *     local auth pins and request remote ones (returning false until the
 *     remote acks arrive);
 *  3. take the locks in sorted order, waiting (return false) on any that
 *     cannot be acquired immediately.
 *
 * auth_pin_freeze: inode that must be (remotely) frozen-auth-pinned, used by
 * rename.  auth_pin_nonblocking: abort instead of waiting when an auth pin
 * would block.
 */
bool Locker::acquire_locks(MDRequestRef& mdr,
			   MutationImpl::LockOpVec& lov,
			   CInode *auth_pin_freeze,
			   bool auth_pin_nonblocking)
{
  dout(10) << "acquire_locks " << *mdr << dendl;

  MarkEventOnDestruct marker(mdr, "failed to acquire_locks");

  client_t client = mdr->get_client();

  set<MDSCacheObject*> mustpin;  // items to authpin
  if (auth_pin_freeze)
    mustpin.insert(auth_pin_freeze);

  // xlocks
  for (size_t i = 0; i < lov.size(); ++i) {
    auto& p = lov[i];
    SimpleLock *lock = p.lock;
    MDSCacheObject *object = lock->get_parent();

    if (p.is_xlock()) {
      if ((lock->get_type() == CEPH_LOCK_ISNAP ||
	   lock->get_type() == CEPH_LOCK_IPOLICY) &&
	  mds->is_cluster_degraded() &&
	  mdr->is_leader() &&
	  !mdr->is_queued_for_replay()) {
	// waiting for recovering mds, to guarantee replayed requests and mksnap/setlayout
	// get processed in proper order.
	bool wait = false;
	if (object->is_auth()) {
	  if (!mdr->is_xlocked(lock)) {
	    set<mds_rank_t> ls;
	    object->list_replicas(ls);
	    for (auto m : ls) {
	      if (mds->mdsmap->get_state(m) < MDSMap::STATE_ACTIVE) {
		wait = true;
		break;
	      }
	    }
	  }
	} else {
	  // if the lock is the latest locked one, it's possible that peer mds got the lock
	  // while there are recovering mds.
	  if (!mdr->is_xlocked(lock) || mdr->is_last_locked(lock))
	    wait = true;
	}
	if (wait) {
	  dout(10) << " must xlock " << *lock << " " << *object
		   << ", waiting for cluster recovered" << dendl;
	  mds->locker->drop_locks(mdr.get(), NULL);
	  mdr->drop_local_auth_pins();
	  mds->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache, mdr));
	  return false;
	}
      }

      dout(20) << " must xlock " << *lock << " " << *object << dendl;

      mustpin.insert(object);

      // augment xlock with a versionlock?
      if (lock->get_type() == CEPH_LOCK_DN) {
	CDentry *dn = static_cast<CDentry*>(object);
	if (!dn->is_auth())
	  continue;
	if (mdr->is_leader()) {
	  // leader. wrlock versionlock so we can pipeline dentry updates to journal.
	  lov.add_wrlock(&dn->versionlock, i + 1);
	} else {
	  // peer. exclusively lock the dentry version (i.e. block other journal updates).
	  // this makes rollback safe.
	  lov.add_xlock(&dn->versionlock, i + 1);
	}
      }
      if (lock->get_type() >= CEPH_LOCK_IFIRST && lock->get_type() != CEPH_LOCK_IVERSION) {
	// inode version lock?
	CInode *in = static_cast<CInode*>(object);
	if (!in->is_auth())
	  continue;
	if (mdr->is_leader()) {
	  // leader. wrlock versionlock so we can pipeline inode updates to journal.
	  lov.add_wrlock(&in->versionlock, i + 1);
	} else {
	  // peer. exclusively lock the inode version (i.e. block other journal updates).
	  // this makes rollback safe.
	  lov.add_xlock(&in->versionlock, i + 1);
	}
      }
    } else if (p.is_wrlock()) {
      dout(20) << " must wrlock " << *lock << " " << *object << dendl;
      client_t _client = p.is_state_pin() ? lock->get_excl_client() : client;
      if (object->is_auth()) {
	mustpin.insert(object);
      } else if (!object->is_auth() &&
		 !lock->can_wrlock(_client) && // we might have to request a scatter
		 !mdr->is_peer()) { // if we are peer (remote_wrlock), the leader already authpinned
	dout(15) << " will also auth_pin " << *object
		 << " in case we need to request a scatter" << dendl;
	mustpin.insert(object);
      }
    } else if (p.is_remote_wrlock()) {
      dout(20) << " must remote_wrlock on mds." << p.wrlock_target << " "
	       << *lock << " " << *object << dendl;
      mustpin.insert(object);
    } else if (p.is_rdlock()) {

      dout(20) << " must rdlock " << *lock << " " << *object << dendl;
      if (object->is_auth()) {
	mustpin.insert(object);
      } else if (!object->is_auth() &&
		 !lock->can_rdlock(client)) { // we might have to request an rdlock
	dout(15) << " will also auth_pin " << *object
		 << " in case we need to request a rdlock" << dendl;
	mustpin.insert(object);
      }
    } else {
      ceph_assert(0 == "locker unknown lock operation");
    }
  }

  // canonical (deadlock-avoiding) acquisition order
  lov.sort_and_merge();

  // AUTH PINS
  map<mds_rank_t, set<MDSCacheObject*> > mustpin_remote;  // mds -> (object set)

  // can i auth pin them all now?
  marker.message = "failed to authpin local pins";
  for (const auto &p : mustpin) {
    MDSCacheObject *object = p;

    dout(10) << " must authpin " << *object << dendl;

    if (mdr->is_auth_pinned(object)) {
      if (object != (MDSCacheObject*)auth_pin_freeze)
	continue;
      if (mdr->more()->is_remote_frozen_authpin) {
	if (mdr->more()->rename_inode == auth_pin_freeze)
	  continue;
	// unfreeze auth pin for the wrong inode
	mustpin_remote[mdr->more()->rename_inode->authority().first].size();
      }
    }

    if (!object->is_auth()) {
      if (mdr->lock_cache) { // debug
	ceph_assert(mdr->lock_cache->opcode == CEPH_MDS_OP_UNLINK);
	CDentry *dn = mdr->dn[0].back();
	ceph_assert(dn->get_projected_linkage()->is_remote());
      }

      if (object->is_ambiguous_auth()) {
	// wait
	dout(10) << " ambiguous auth, waiting to authpin " << *object << dendl;
	mdr->disable_lock_cache();
	drop_locks(mdr.get());
	mdr->drop_local_auth_pins();
	marker.message = "waiting for single auth, object is being migrated";
	object->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_MDS_RetryRequest(mdcache, mdr));
	return false;
      }
      mustpin_remote[object->authority().first].insert(object);
      continue;
    }
    int err = 0;
    if (!object->can_auth_pin(&err)) {
      if (mdr->lock_cache) {
	CDir *dir;
	if (CInode *in = dynamic_cast<CInode*>(object)) {
	  ceph_assert(!in->is_frozen_inode() && !in->is_frozen_auth_pin());
	  dir = in->get_projected_parent_dir();
	} else if (CDentry *dn = dynamic_cast<CDentry*>(object)) {
	  dir = dn->get_dir();
	} else {
	  ceph_assert(0 == "unknown type of lock parent");
	}
	if (dir->get_inode() == mdr->lock_cache->get_dir_inode()) {
	  // forcibly auth pin if there is lock cache on parent dir
	  continue;
	}

	{ // debug
	  ceph_assert(mdr->lock_cache->opcode == CEPH_MDS_OP_UNLINK);
	  CDentry *dn = mdr->dn[0].back();
	  ceph_assert(dn->get_projected_linkage()->is_remote());
	}
      }

      // wait
      mdr->disable_lock_cache();
      drop_locks(mdr.get());
      mdr->drop_local_auth_pins();
      if (auth_pin_nonblocking) {
	dout(10) << " can't auth_pin (freezing?) " << *object << ", nonblocking" << dendl;
	mdr->aborted = true;
	return false;
      }
      if (err == MDSCacheObject::ERR_EXPORTING_TREE) {
	marker.message = "failed to authpin, subtree is being exported";
      } else if (err == MDSCacheObject::ERR_FRAGMENTING_DIR) {
	marker.message = "failed to authpin, dir is being fragmented";
      } else if (err == MDSCacheObject::ERR_EXPORTING_INODE) {
	marker.message = "failed to authpin, inode is being exported";
      }
      dout(10) << " can't auth_pin (freezing?), waiting to authpin " << *object << dendl;
      object->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_MDS_RetryRequest(mdcache, mdr));

      if (mdr->is_any_remote_auth_pin())
	notify_freeze_waiter(object);

      return false;
    }
  }

  // ok, grab local auth pins
  for (const auto& p : mustpin) {
    MDSCacheObject *object = p;
    if (mdr->is_auth_pinned(object)) {
      dout(10) << " already auth_pinned " << *object << dendl;
    } else if (object->is_auth()) {
      dout(10) << " auth_pinning " << *object << dendl;
      mdr->auth_pin(object);
    }
  }

  // request remote auth_pins
  if (!mustpin_remote.empty()) {
    marker.message = "requesting remote authpins";
    for (const auto& p : mdr->object_states) {
      if (p.second.remote_auth_pinned == MDS_RANK_NONE)
	continue;
      ceph_assert(p.second.remote_auth_pinned == p.first->authority().first);
      auto q = mustpin_remote.find(p.second.remote_auth_pinned);
      if (q != mustpin_remote.end())
	q->second.insert(p.first);
    }

    for (auto& p : mustpin_remote) {
      dout(10) << "requesting remote auth_pins from mds." << p.first << dendl;

      // wait for active auth
      if (mds->is_cluster_degraded() &&
	  !mds->mdsmap->is_clientreplay_or_active_or_stopping(p.first)) {
	dout(10) << " mds." << p.first << " is not active" << dendl;
	if (mdr->more()->waiting_on_peer.empty())
	  mds->wait_for_active_peer(p.first, new C_MDS_RetryRequest(mdcache, mdr));
	return false;
      }

      auto req = make_message<MMDSPeerRequest>(mdr->reqid, mdr->attempt,
					       MMDSPeerRequest::OP_AUTHPIN);
      for (auto& o : p.second) {
	dout(10) << " req remote auth_pin of " << *o << dendl;
	MDSCacheObjectInfo info;
	o->set_object_info(info);
	req->get_authpins().push_back(info);
	if (o == auth_pin_freeze)
	  o->set_object_info(req->get_authpin_freeze());
	mdr->pin(o);
      }
      if (auth_pin_nonblocking)
	req->mark_nonblocking();
      else if (!mdr->locks.empty())
	req->mark_notify_blocking();

      mds->send_message_mds(req, p.first);

      // put in waiting list
      auto ret = mdr->more()->waiting_on_peer.insert(p.first);
      ceph_assert(ret.second);
    }
    return false;
  }

  // caps i'll need to issue
  set<CInode*> issue_set;
  bool result = false;

  // acquire locks.
  // make sure they match currently acquired locks.
  for (const auto& p : lov) {
    auto lock = p.lock;
    if (p.is_xlock()) {
      if (mdr->is_xlocked(lock)) {
	dout(10) << " already xlocked " << *lock << " " << *lock->get_parent() << dendl;
	continue;
      }
      if (mdr->locking && lock != mdr->locking)
	cancel_locking(mdr.get(), &issue_set);
      if (!xlock_start(lock, mdr)) {
	marker.message = "failed to xlock, waiting";
	goto out;
      }
      dout(10) << " got xlock on " << *lock << " " << *lock->get_parent() << dendl;
    } else if (p.is_wrlock() || p.is_remote_wrlock()) {
      auto it = mdr->locks.find(lock);
      if (p.is_remote_wrlock()) {
	if (it != mdr->locks.end() && it->is_remote_wrlock()) {
	  dout(10) << " already remote_wrlocked " << *lock << " " << *lock->get_parent() << dendl;
	} else {
	  if (mdr->locking && lock != mdr->locking)
	    cancel_locking(mdr.get(), &issue_set);
	  marker.message = "waiting for remote wrlocks";
	  remote_wrlock_start(lock, p.wrlock_target, mdr);
	  goto out;
	}
      }
      if (p.is_wrlock()) {
	if (it != mdr->locks.end() && it->is_wrlock()) {
	  dout(10) << " already wrlocked " << *lock << " " << *lock->get_parent() << dendl;
	  continue;
	}
	client_t _client = p.is_state_pin() ? lock->get_excl_client() : client;
	if (p.is_remote_wrlock()) {
	  // nowait if we have already gotten remote wrlock
	  if (!wrlock_try(lock, mdr, _client)) {
	    marker.message = "failed to wrlock, dropping remote wrlock and waiting";
	    // can't take the wrlock because the scatter lock is gathering. need to
	    // release the remote wrlock, so that the gathering process can finish.
	    ceph_assert(it != mdr->locks.end());
	    remote_wrlock_finish(it, mdr.get());
	    remote_wrlock_start(lock, p.wrlock_target, mdr);
	    goto out;
	  }
	} else {
	  if (!wrlock_start(p, mdr)) {
	    ceph_assert(!p.is_remote_wrlock());
	    marker.message = "failed to wrlock, waiting";
	    goto out;
	  }
	}
	dout(10) << " got wrlock on " << *lock << " " << *lock->get_parent() << dendl;
      }
    } else {
      if (mdr->is_rdlocked(lock)) {
	dout(10) << " already rdlocked " << *lock << " " << *lock->get_parent() << dendl;
	continue;
      }

      ceph_assert(mdr->is_leader());
      if (lock->needs_recover()) {
	if (mds->is_cluster_degraded()) {
	  if (!mdr->is_queued_for_replay()) {
	    // see comments in SimpleLock::set_state_rejoin() and
	    // ScatterLock::encode_state_for_rejoin()
	    drop_locks(mdr.get());
	    mds->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache, mdr));
	    dout(10) << " rejoin recovering " << *lock << " " << *lock->get_parent()
		     << ", waiting for cluster recovered" << dendl;
	    marker.message = "rejoin recovering lock, waiting for cluster recovered";
	    return false;
	  }
	} else {
	  lock->clear_need_recover();
	}
      }

      if (!rdlock_start(lock, mdr)) {
	marker.message = "failed to rdlock, waiting";
	goto out;
      }
      dout(10) << " got rdlock on " << *lock << " " << *lock->get_parent() << dendl;
    }
  }

  mdr->set_mds_stamp(ceph_clock_now());
  result = true;
  marker.message = "acquired locks";

 out:
  // re-issue caps for any inodes whose issuable set changed while locking
  issue_caps_set(issue_set);
  return result;
}
596
// An object we need could not be auth-pinned because something is freezing.
// Map the object to the dirfrag whose freeze is blocking us and bump that
// dirfrag's freeze-waiter count so fragmentation/export can account for us.
void Locker::notify_freeze_waiter(MDSCacheObject *o)
{
  CDir *dir = NULL;
  if (CInode *in = dynamic_cast<CInode*>(o)) {
    if (!in->is_root())
      dir = in->get_parent_dir();
  } else if (CDentry *dn = dynamic_cast<CDentry*>(o)) {
    dir = dn->get_dir();
  } else {
    dir = dynamic_cast<CDir*>(o);
    ceph_assert(dir);
  }
  if (dir) {
    if (dir->is_freezing_dir())
      mdcache->fragment_freeze_inc_num_waiters(dir);
    if (dir->is_freezing_tree()) {
      // the export freeze is tracked at the root of the freezing subtree
      while (!dir->is_freezing_tree_root())
	dir = dir->get_parent_dir();
      mdcache->migrator->export_freeze_inc_num_waiters(dir);
    }
  }
}
619
620 void Locker::set_xlocks_done(MutationImpl *mut, bool skip_dentry)
621 {
622 for (const auto &p : mut->locks) {
623 if (!p.is_xlock())
624 continue;
625 MDSCacheObject *obj = p.lock->get_parent();
626 ceph_assert(obj->is_auth());
627 if (skip_dentry &&
628 (p.lock->get_type() == CEPH_LOCK_DN || p.lock->get_type() == CEPH_LOCK_DVERSION))
629 continue;
630 dout(10) << "set_xlocks_done on " << *p.lock << " " << *obj << dendl;
631 p.lock->set_xlock_done();
632 }
633 }
634
// Release the locks held by `mut` (rdlocks only if drop_rdlocks).  Inodes
// whose cap issuability may have changed are added to *pneed_issue.  Locks
// held on remote auth MDSs are released by sending OP_DROPLOCKS to each
// affected rank.  Note the it++ passed to the *_finish helpers: the iterator
// is advanced before the entry may be erased from mut->locks.
void Locker::_drop_locks(MutationImpl *mut, set<CInode*> *pneed_issue,
			 bool drop_rdlocks)
{
  set<mds_rank_t> peers;

  for (auto it = mut->locks.begin(); it != mut->locks.end(); ) {
    SimpleLock *lock = it->lock;
    MDSCacheObject *obj = lock->get_parent();

    if (it->is_xlock()) {
      if (obj->is_auth()) {
	bool ni = false;
	xlock_finish(it++, mut, &ni);
	if (ni)
	  pneed_issue->insert(static_cast<CInode*>(obj));
      } else {
	// remotely-held xlock: drop our ref and tell the auth mds below
	ceph_assert(lock->get_sm()->can_remote_xlock);
	peers.insert(obj->authority().first);
	lock->put_xlock();
	mut->locks.erase(it++);
      }
    } else if (it->is_wrlock() || it->is_remote_wrlock()) {
      if (it->is_remote_wrlock()) {
	peers.insert(it->wrlock_target);
	it->clear_remote_wrlock();
      }
      if (it->is_wrlock()) {
	bool ni = false;
	wrlock_finish(it++, mut, &ni);
	if (ni)
	  pneed_issue->insert(static_cast<CInode*>(obj));
      } else {
	mut->locks.erase(it++);
      }
    } else if (drop_rdlocks && it->is_rdlock()) {
      bool ni = false;
      rdlock_finish(it++, mut, &ni);
      if (ni)
	pneed_issue->insert(static_cast<CInode*>(obj));
    } else {
      ++it;
    }
  }

  if (drop_rdlocks) {
    // a full drop also releases our reference on any attached lock cache
    if (mut->lock_cache) {
      put_lock_cache(mut->lock_cache);
      mut->lock_cache = nullptr;
    }
  }

  for (set<mds_rank_t>::iterator p = peers.begin(); p != peers.end(); ++p) {
    // skip ranks that have not reached rejoin; they will clean up during recovery
    if (!mds->is_cluster_degraded() ||
	mds->mdsmap->get_state(*p) >= MDSMap::STATE_REJOIN) {
      dout(10) << "_drop_non_rdlocks dropping remote locks on mds." << *p << dendl;
      auto peerreq = make_message<MMDSPeerRequest>(mut->reqid, mut->attempt,
						   MMDSPeerRequest::OP_DROPLOCKS);
      mds->send_message_mds(peerreq, *p);
    }
  }
}
696
// Abort the in-progress lock acquisition recorded in mut->locking, rolling
// the lock back out of its transitional (PREXLOCK / LOCK_XLOCK) state.
// Inodes needing a cap re-issue are added to *pneed_issue.
void Locker::cancel_locking(MutationImpl *mut, set<CInode*> *pneed_issue)
{
  SimpleLock *lock = mut->locking;
  ceph_assert(lock);
  dout(10) << "cancel_locking " << *lock << " on " << *mut << dendl;

  if (lock->get_parent()->is_auth()) {
    bool need_issue = false;
    if (lock->get_state() == LOCK_PREXLOCK) {
      _finish_xlock(lock, -1, &need_issue);
    } else if (lock->get_state() == LOCK_LOCK_XLOCK) {
      lock->set_state(LOCK_XLOCKDONE);
      eval_gather(lock, true, &need_issue);
    }
    if (need_issue)
      pneed_issue->insert(static_cast<CInode *>(lock->get_parent()));
  }
  mut->finish_locking(lock);
}
716
717 void Locker::drop_locks(MutationImpl *mut, set<CInode*> *pneed_issue)
718 {
719 // leftover locks
720 set<CInode*> my_need_issue;
721 if (!pneed_issue)
722 pneed_issue = &my_need_issue;
723
724 if (mut->locking)
725 cancel_locking(mut, pneed_issue);
726 _drop_locks(mut, pneed_issue, true);
727
728 if (pneed_issue == &my_need_issue)
729 issue_caps_set(*pneed_issue);
730 mut->locking_state = 0;
731 }
732
733 void Locker::drop_non_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
734 {
735 set<CInode*> my_need_issue;
736 if (!pneed_issue)
737 pneed_issue = &my_need_issue;
738
739 _drop_locks(mut, pneed_issue, false);
740
741 if (pneed_issue == &my_need_issue)
742 issue_caps_set(*pneed_issue);
743 }
744
// Release the rdlocks held by `mut` so an early (unsafe) reply can be sent,
// but keep snaplock/policylock rdlocks: holding them forces later mksnap /
// setlayout requests on other MDSs to wait for this unsafe request.
void Locker::drop_rdlocks_for_early_reply(MutationImpl *mut)
{
  set<CInode*> need_issue;

  for (auto it = mut->locks.begin(); it != mut->locks.end(); ) {
    if (!it->is_rdlock()) {
      ++it;
      continue;
    }
    SimpleLock *lock = it->lock;
    // make later mksnap/setlayout (at other mds) wait for this unsafe request
    if (lock->get_type() == CEPH_LOCK_ISNAP ||
	lock->get_type() == CEPH_LOCK_IPOLICY) {
      ++it;
      continue;
    }
    // it++ advances before rdlock_finish may erase the entry
    bool ni = false;
    rdlock_finish(it++, mut, &ni);
    if (ni)
      need_issue.insert(static_cast<CInode*>(lock->get_parent()));
  }

  issue_caps_set(need_issue);
}
769
// Release locks held by a fragment operation so the dirfrags can unfreeze,
// keeping the dirfragtree (IDFT) locks that the fragment op still needs.
// NOTE(review): every non-IDFT entry is finished via wrlock_finish(), which
// presumes the mutation holds only wrlocks here — confirm against callers.
void Locker::drop_locks_for_fragment_unfreeze(MutationImpl *mut)
{
  set<CInode*> need_issue;

  for (auto it = mut->locks.begin(); it != mut->locks.end(); ) {
    SimpleLock *lock = it->lock;
    if (lock->get_type() == CEPH_LOCK_IDFT) {
      ++it;
      continue;
    }
    bool ni = false;
    wrlock_finish(it++, mut, &ni);
    if (ni)
      need_issue.insert(static_cast<CInode*>(lock->get_parent()));
  }
  issue_caps_set(need_issue);
}
787
// Deferred teardown of a lock cache: runs once queued waiters have been
// kicked, releasing the cached locks and destroying the MDLockCache (this
// context owns the final reference and is responsible for the delete).
class C_MDL_DropCache : public LockerContext {
  MDLockCache *lock_cache;
public:
  C_MDL_DropCache(Locker *l, MDLockCache *lc) :
    LockerContext(l), lock_cache(lc) { }
  void finish(int r) override {
    locker->drop_locks(lock_cache);
    lock_cache->cleanup();
    delete lock_cache;
  }
};
799
// Drop one reference on a lock cache.  When the last reference goes, the
// cache must already be invalidating; detach its locks, re-enable frozen
// inodes on the dirfrags of its directory inode (undoing what
// create_lock_cache disabled), and queue asynchronous destruction.
void Locker::put_lock_cache(MDLockCache* lock_cache)
{
  ceph_assert(lock_cache->ref > 0);
  if (--lock_cache->ref > 0)
    return;

  ceph_assert(lock_cache->invalidating);

  lock_cache->detach_locks();

  CInode *diri = lock_cache->get_dir_inode();
  for (auto dir : lock_cache->auth_pinned_dirfrags) {
    if (dir->get_inode() != diri)
      continue;
    dir->enable_frozen_inode();
  }

  // drop_locks + delete happen from a queued context, not inline
  mds->queue_waiter(new C_MDL_DropCache(this, lock_cache));
}
819
820 int Locker::get_cap_bit_for_lock_cache(int op)
821 {
822 switch(op) {
823 case CEPH_MDS_OP_CREATE:
824 return CEPH_CAP_DIR_CREATE;
825 case CEPH_MDS_OP_UNLINK:
826 return CEPH_CAP_DIR_UNLINK;
827 default:
828 ceph_assert(0 == "unsupported operation");
829 return 0;
830 }
831 }
832
// Begin (or continue) invalidating a lock cache.  Dirfrags are detached on
// the first call.  If the owning client cap still has the guarding cap bit
// issued, we revoke it via issue_caps() and wait for the client release
// (eval_lock_caches finishes the job); otherwise the cache's cap linkage is
// severed and its reference dropped immediately.
void Locker::invalidate_lock_cache(MDLockCache *lock_cache)
{
  ceph_assert(lock_cache->item_cap_lock_cache.is_on_list());
  if (lock_cache->invalidating) {
    // already invalidating: the cap linkage must have been severed already
    ceph_assert(!lock_cache->client_cap);
  } else {
    lock_cache->invalidating = true;
    lock_cache->detach_dirfrags();
  }

  Capability *cap = lock_cache->client_cap;
  if (cap) {
    int cap_bit = get_cap_bit_for_lock_cache(lock_cache->opcode);
    cap->clear_lock_cache_allowed(cap_bit);
    if (cap->issued() & cap_bit)
      issue_caps(lock_cache->get_dir_inode(), cap);
    else
      cap = nullptr;  // nothing to revoke; fall through to immediate drop
  }

  if (!cap) {
    lock_cache->item_cap_lock_cache.remove_myself();
    put_lock_cache(lock_cache);
  }
}
858
// Called after a cap changes: finish invalidation of any lock caches on this
// cap whose guarding cap bit the client has now released.  The iterator is
// advanced before put_lock_cache() may unlink the current entry.
void Locker::eval_lock_caches(Capability *cap)
{
  for (auto p = cap->lock_caches.begin(); !p.end(); ) {
    MDLockCache *lock_cache = *p;
    ++p;
    if (!lock_cache->invalidating)
      continue;
    int cap_bit = get_cap_bit_for_lock_cache(lock_cache->opcode);
    if (!(cap->issued() & cap_bit)) {
      // client no longer holds the bit: sever cap linkage and drop our ref
      lock_cache->item_cap_lock_cache.remove_myself();
      put_lock_cache(lock_cache);
    }
  }
}
873
// ask lock caches to release auth pins
// Loop invariant: invalidate_lock_cache() eventually removes the front
// element from dir->lock_caches_with_auth_pins, so the loop terminates.
void Locker::invalidate_lock_caches(CDir *dir)
{
  dout(10) << "invalidate_lock_caches on " << *dir << dendl;
  auto &lock_caches = dir->lock_caches_with_auth_pins;
  while (!lock_caches.empty()) {
    invalidate_lock_cache(lock_caches.front()->parent);
  }
}
883
884 // ask lock caches to release locks
885 void Locker::invalidate_lock_caches(SimpleLock *lock)
886 {
887 dout(10) << "invalidate_lock_caches " << *lock << " on " << *lock->get_parent() << dendl;
888 if (lock->is_cached()) {
889 auto&& lock_caches = lock->get_active_caches();
890 for (auto& lc : lock_caches)
891 invalidate_lock_cache(lc);
892 }
893 }
894
// Try to build a lock cache for this request so repeated create/unlink ops
// from the same client under `diri` can skip lock acquisition.  A series of
// veto checks (non-auth, peer requests, no cap, existing cache, freezing
// objects, unstable locks) each bail out with a "noop" log line.  On
// success the cache adopts the request's relevant auth pins and rd/wr locks.
void Locker::create_lock_cache(MDRequestRef& mdr, CInode *diri, file_layout_t *dir_layout)
{
  if (mdr->lock_cache)
    return;

  client_t client = mdr->get_client();
  int opcode = mdr->client_request->get_op();
  dout(10) << "create_lock_cache for client." << client << "/" << ceph_mds_op_name(opcode)<< " on " << *diri << dendl;

  if (!diri->is_auth()) {
    dout(10) << " dir inode is not auth, noop" << dendl;
    return;
  }

  if (mdr->has_more() && !mdr->more()->peers.empty()) {
    dout(10) << " there are peers requests for " << *mdr << ", noop" << dendl;
    return;
  }

  Capability *cap = diri->get_client_cap(client);
  if (!cap) {
    dout(10) << " there is no cap for client." << client << ", noop" << dendl;
    return;
  }

  // at most one lock cache per (cap, opcode)
  for (auto p = cap->lock_caches.begin(); !p.end(); ++p) {
    if ((*p)->opcode == opcode) {
      dout(10) << " lock cache already exists for " << ceph_mds_op_name(opcode) << ", noop" << dendl;
      return;
    }
  }

  // collect the ancestor inodes of diri; the cache will adopt the request's
  // state on diri and these ancestors only
  set<MDSCacheObject*> ancestors;
  for (CInode *in = diri; ; ) {
    CDentry *pdn = in->get_projected_parent_dn();
    if (!pdn)
      break;
    // ancestors.insert(pdn);
    in = pdn->get_dir()->get_inode();
    ancestors.insert(in);
  }

  for (auto& p : mdr->object_states) {
    if (p.first != diri && !ancestors.count(p.first))
      continue;
    auto& stat = p.second;
    if (stat.auth_pinned) {
      if (!p.first->can_auth_pin()) {
	dout(10) << " can't auth_pin(freezing?) lock parent " << *p.first << ", noop" << dendl;
	return;
      }
      if (CInode *in = dynamic_cast<CInode*>(p.first); in->is_parent_projected()) {
	CDir *dir = in->get_projected_parent_dir();
	if (!dir->can_auth_pin()) {
	  dout(10) << " can't auth_pin(!auth|freezing?) dirfrag " << *dir << ", noop" << dendl;
	  return;
	}
      }
    }
  }

  std::vector<CDir*> dfv;
  dfv.reserve(diri->get_num_dirfrags());

  diri->get_dirfrags(dfv);
  for (auto dir : dfv) {
    if (!dir->is_auth() || !dir->can_auth_pin()) {
      dout(10) << " can't auth_pin(!auth|freezing?) dirfrag " << *dir << ", noop" << dendl;
      return;
    }
    if (dir->is_any_freezing_or_frozen_inode()) {
      dout(10) << " there is freezing/frozen inode in " << *dir << ", noop" << dendl;
      return;
    }
  }

  for (auto& p : mdr->locks) {
    MDSCacheObject *obj = p.lock->get_parent();
    if (obj != diri && !ancestors.count(obj))
      continue;
    if (!p.lock->is_stable()) {
      dout(10) << " unstable " << *p.lock << " on " << *obj << ", noop" << dendl;
      return;
    }
  }

  // all veto checks passed; build the cache
  auto lock_cache = new MDLockCache(cap, opcode);
  if (dir_layout)
    lock_cache->set_dir_layout(*dir_layout);
  cap->set_lock_cache_allowed(get_cap_bit_for_lock_cache(opcode));

  for (auto dir : dfv) {
    // prevent subtree migration
    lock_cache->auth_pin(dir);
    // prevent frozen inode
    dir->disable_frozen_inode();
  }

  // transfer the request's (auth) pins on diri and its ancestors to the cache
  for (auto& p : mdr->object_states) {
    if (p.first != diri && !ancestors.count(p.first))
      continue;
    auto& stat = p.second;
    if (stat.auth_pinned)
      lock_cache->auth_pin(p.first);
    else
      lock_cache->pin(p.first);

    if (CInode *in = dynamic_cast<CInode*>(p.first)) {
      CDentry *pdn = in->get_projected_parent_dn();
      if (pdn)
	dfv.push_back(pdn->get_dir());
    } else if (CDentry *dn = dynamic_cast<CDentry*>(p.first)) {
      dfv.push_back(dn->get_dir());
    } else {
      ceph_assert(0 == "unknown type of lock parent");
    }
  }
  lock_cache->attach_dirfrags(std::move(dfv));

  // move the request's relevant rd/wr locks into the cache
  for (auto it = mdr->locks.begin(); it != mdr->locks.end(); ) {
    MDSCacheObject *obj = it->lock->get_parent();
    if (obj != diri && !ancestors.count(obj)) {
      ++it;
      continue;
    }
    unsigned lock_flag = 0;
    if (it->is_wrlock()) {
      // skip wrlocks that were added by MDCache::predirty_journal_parent()
      if (obj == diri)
	lock_flag = MutationImpl::LockOp::WRLOCK;
    } else {
      ceph_assert(it->is_rdlock());
      lock_flag = MutationImpl::LockOp::RDLOCK;
    }
    if (lock_flag) {
      lock_cache->emplace_lock(it->lock, lock_flag);
      mdr->locks.erase(it++);
    } else {
      ++it;
    }
  }
  lock_cache->attach_locks();

  // the request holds a reference to the cache it just created
  lock_cache->ref++;
  mdr->lock_cache = lock_cache;
}
1041
// If this client already has a lock cache for the request's opcode on diri,
// attach it to the request (taking a reference) and return true.  Returns
// true immediately if the request already carries a lock cache.
bool Locker::find_and_attach_lock_cache(MDRequestRef& mdr, CInode *diri)
{
  if (mdr->lock_cache)
    return true;

  Capability *cap = diri->get_client_cap(mdr->get_client());
  if (!cap)
    return false;

  int opcode = mdr->client_request->get_op();
  for (auto p = cap->lock_caches.begin(); !p.end(); ++p) {
    MDLockCache *lock_cache = *p;
    if (lock_cache->opcode == opcode) {
      dout(10) << "found lock cache for " << ceph_mds_op_name(opcode) << " on " << *diri << dendl;
      mdr->lock_cache = lock_cache;
      mdr->lock_cache->ref++;
      return true;
    }
  }
  return false;
}
1063
1064 // generics
1065
// Drive an unstable lock toward its pre-computed next state once every
// gathered condition has cleared (outstanding rd/wr/xlocks, leases, issued
// client caps, and any in-flight scatter flush).  On the replica side this
// sends the appropriate ack to the auth MDS; on the auth side it may start
// gathering from replicas, kick off scatter writebehind, or broadcast the
// new state.  Waiters are either collected into *pfinishers (if non-null)
// or finished inline; *pneed_issue is set (or caps issued directly) when
// client caps should be re-evaluated.
1066 void Locker::eval_gather(SimpleLock *lock, bool first, bool *pneed_issue, MDSContext::vec *pfinishers)
1067 {
1068 dout(10) << "eval_gather " << *lock << " on " << *lock->get_parent() << dendl;
1069 ceph_assert(!lock->is_stable());
1070
1071 int next = lock->get_next_state();
1072
  // dentry locks have no inode / no cap interaction
1073 CInode *in = 0;
1074 bool caps = lock->get_cap_shift();
1075 if (lock->get_type() != CEPH_LOCK_DN)
1076 in = static_cast<CInode *>(lock->get_parent());
1077
1078 bool need_issue = false;
1079
1080 int loner_issued = 0, other_issued = 0, xlocker_issued = 0;
1081 ceph_assert(!caps || in != NULL);
1082 if (caps && in->is_head()) {
1083 in->get_caps_issued(&loner_issued, &other_issued, &xlocker_issued,
1084 lock->get_cap_shift(), lock->get_cap_mask());
1085 dout(10) << " next state is " << lock->get_state_name(next)
1086 << " issued/allows loner " << gcap_string(loner_issued)
1087 << "/" << gcap_string(lock->gcaps_allowed(CAP_LONER, next))
1088 << " xlocker " << gcap_string(xlocker_issued)
1089 << "/" << gcap_string(lock->gcaps_allowed(CAP_XLOCKER, next))
1090 << " other " << gcap_string(other_issued)
1091 << "/" << gcap_string(lock->gcaps_allowed(CAP_ANY, next))
1092 << dendl;
1093
  // on the first pass, if any issued caps exceed what the next state
  // allows, we must (re)issue to start revoking them
1094 if (first && ((~lock->gcaps_allowed(CAP_ANY, next) & other_issued) ||
1095 (~lock->gcaps_allowed(CAP_LONER, next) & loner_issued) ||
1096 (~lock->gcaps_allowed(CAP_XLOCKER, next) & xlocker_issued)))
1097 need_issue = true;
1098 }
1099
  // NOTE(review): x here is the state table's allowed-mode value; the
  // <= AUTH vs < AUTH split appears to distinguish auth from replica
  // permissiveness -- confirm against the sm_* state tables.
1100 #define IS_TRUE_AND_LT_AUTH(x, auth) (x && ((auth && x <= AUTH) || (!auth && x < AUTH)))
1101 bool auth = lock->get_parent()->is_auth();
  // gather is finished only when nothing still blocks the transition:
  // no pending remote gathers, no conflicting holders, no in-progress
  // scatter flush, and all over-issued caps revoked
1102 if (!lock->is_gathering() &&
1103 (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_rdlock, auth) || !lock->is_rdlocked()) &&
1104 (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_wrlock, auth) || !lock->is_wrlocked()) &&
1105 (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_xlock, auth) || !lock->is_xlocked()) &&
1106 (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_lease, auth) || !lock->is_leased()) &&
1107 !(lock->get_parent()->is_auth() && lock->is_flushing()) && // i.e. wait for scatter_writebehind!
1108 (!caps || ((~lock->gcaps_allowed(CAP_ANY, next) & other_issued) == 0 &&
1109 (~lock->gcaps_allowed(CAP_LONER, next) & loner_issued) == 0 &&
1110 (~lock->gcaps_allowed(CAP_XLOCKER, next) & xlocker_issued) == 0)) &&
1111 lock->get_state() != LOCK_SYNC_MIX2 && // these states need an explicit trigger from the auth mds
1112 lock->get_state() != LOCK_MIX_SYNC2
1113 ) {
1114 dout(7) << "eval_gather finished gather on " << *lock
1115 << " on " << *lock->get_parent() << dendl;
1116
  // file locks cannot move on while the inode needs / is undergoing recovery
1117 if (lock->get_sm() == &sm_filelock) {
1118 ceph_assert(in);
1119 if (in->state_test(CInode::STATE_RECOVERING)) {
1120 dout(7) << "eval_gather finished gather, but still recovering" << dendl;
1121 return;
1122 } else if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
1123 dout(7) << "eval_gather finished gather, but need to recover" << dendl;
1124 mds->mdcache->queue_file_recover(in);
1125 mds->mdcache->do_file_recover();
1126 return;
1127 }
1128 }
1129
1130 if (!lock->get_parent()->is_auth()) {
1131 // replica: tell auth
1132 mds_rank_t auth = lock->get_parent()->authority().first;
1133
1134 if (lock->get_parent()->is_rejoining() &&
1135 mds->mdsmap->get_state(auth) == MDSMap::STATE_REJOIN) {
1136 dout(7) << "eval_gather finished gather, but still rejoining "
1137 << *lock->get_parent() << dendl;
1138 return;
1139 }
1140
1141 if (!mds->is_cluster_degraded() ||
1142 mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) {
1143 switch (lock->get_state()) {
1144 case LOCK_SYNC_LOCK:
1145 mds->send_message_mds(make_message<MLock>(lock, LOCK_AC_LOCKACK, mds->get_nodeid()), auth);
1146 break;
1147
1148 case LOCK_MIX_SYNC:
1149 {
1150 auto reply = make_message<MLock>(lock, LOCK_AC_SYNCACK, mds->get_nodeid());
1151 lock->encode_locked_state(reply->get_data());
1152 mds->send_message_mds(reply, auth);
1153 next = LOCK_MIX_SYNC2;
1154 (static_cast<ScatterLock *>(lock))->start_flush();
1155 }
1156 break;
1157
1158 case LOCK_MIX_SYNC2:
1159 (static_cast<ScatterLock *>(lock))->finish_flush();
1160 (static_cast<ScatterLock *>(lock))->clear_flushed();
1161
  // intentional fall-through into LOCK_SYNC_MIX2 (ack already sent)
1162 case LOCK_SYNC_MIX2:
1163 // do nothing, we already acked
1164 break;
1165
1166 case LOCK_SYNC_MIX:
1167 {
1168 auto reply = make_message<MLock>(lock, LOCK_AC_MIXACK, mds->get_nodeid());
1169 mds->send_message_mds(reply, auth);
1170 next = LOCK_SYNC_MIX2;
1171 }
1172 break;
1173
1174 case LOCK_MIX_LOCK:
1175 {
1176 bufferlist data;
1177 lock->encode_locked_state(data);
1178 mds->send_message_mds(make_message<MLock>(lock, LOCK_AC_LOCKACK, mds->get_nodeid(), data), auth);
1179 (static_cast<ScatterLock *>(lock))->start_flush();
1180 // we'll get an AC_LOCKFLUSHED to complete
1181 }
1182 break;
1183
1184 default:
1185 ceph_abort();
1186 }
1187 }
1188 } else {
1189 // auth
1190
1191 // once the first (local) stage of mix->lock gather complete we can
1192 // gather from replicas
1193 if (lock->get_state() == LOCK_MIX_LOCK &&
1194 lock->get_parent()->is_replicated()) {
1195 dout(10) << " finished (local) gather for mix->lock, now gathering from replicas" << dendl;
1196 send_lock_message(lock, LOCK_AC_LOCK);
1197 lock->init_gather();
1198 lock->set_state(LOCK_MIX_LOCK2);
1199 return;
1200 }
1201
  // dirty scatter data must be written behind before the state change
1202 if (lock->is_dirty() && !lock->is_flushed()) {
1203 scatter_writebehind(static_cast<ScatterLock *>(lock));
1204 return;
1205 }
1206 lock->clear_flushed();
1207
1208 switch (lock->get_state()) {
1209 // to mixed
1210 case LOCK_TSYN_MIX:
1211 case LOCK_SYNC_MIX:
1212 case LOCK_EXCL_MIX:
1213 case LOCK_XSYN_MIX:
1214 in->start_scatter(static_cast<ScatterLock *>(lock));
1215 if (lock->get_parent()->is_replicated()) {
1216 bufferlist softdata;
1217 lock->encode_locked_state(softdata);
1218 send_lock_message(lock, LOCK_AC_MIX, softdata);
1219 }
1220 (static_cast<ScatterLock *>(lock))->clear_scatter_wanted();
1221 break;
1222
1223 case LOCK_XLOCK:
1224 case LOCK_XLOCKDONE:
1225 if (next != LOCK_SYNC)
1226 break;
1227 // fall-thru
1228
1229 // to sync
1230 case LOCK_EXCL_SYNC:
1231 case LOCK_LOCK_SYNC:
1232 case LOCK_MIX_SYNC:
1233 case LOCK_XSYN_SYNC:
1234 if (lock->get_parent()->is_replicated()) {
1235 bufferlist softdata;
1236 lock->encode_locked_state(softdata);
1237 send_lock_message(lock, LOCK_AC_SYNC, softdata);
1238 }
1239 break;
1240 }
1241
1242 }
1243
1244 lock->set_state(next);
1245
  // stable again: release the auth_pin taken for the transition
1246 if (lock->get_parent()->is_auth() &&
1247 lock->is_stable())
1248 lock->get_parent()->auth_unpin(lock);
1249
1250 // drop loner before doing waiters
1251 if (caps &&
1252 in->is_head() &&
1253 in->is_auth() &&
1254 in->get_wanted_loner() != in->get_loner()) {
1255 dout(10) << " trying to drop loner" << dendl;
1256 if (in->try_drop_loner()) {
1257 dout(10) << " dropped loner" << dendl;
1258 need_issue = true;
1259 }
1260 }
1261
1262 if (pfinishers)
1263 lock->take_waiting(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD|SimpleLock::WAIT_XLOCK,
1264 *pfinishers);
1265 else
1266 lock->finish_waiters(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD|SimpleLock::WAIT_XLOCK);
1267
1268 if (caps && in->is_head())
1269 need_issue = true;
1270
1271 if (lock->get_parent()->is_auth() &&
1272 lock->is_stable())
1273 try_eval(lock, &need_issue);
1274 }
1275
1276 if (need_issue) {
1277 if (pneed_issue)
1278 *pneed_issue = true;
1279 else if (in->is_head())
1280 issue_caps(in);
1281 }
1282
1283 }
1284
// Evaluate the selected (mask) inode locks, choosing or dropping the loner
// client as needed.  When the loner changes, the full lock set is
// re-evaluated (mask forced to -1, with a retry after a successful loner
// drop+set).  Returns true if caps were (or need to be) reissued.
1285 bool Locker::eval(CInode *in, int mask, bool caps_imported)
1286 {
1287 bool need_issue = caps_imported;
1288 MDSContext::vec finishers;
1289
1290 dout(10) << "eval " << mask << " " << *in << dendl;
1291
1292 // choose loner?
1293 if (in->is_auth() && in->is_head()) {
1294 client_t orig_loner = in->get_loner();
1295 if (in->choose_ideal_loner()) {
1296 dout(10) << "eval set loner: client." << orig_loner << " -> client." << in->get_loner() << dendl;
1297 need_issue = true;
  // loner changed: re-evaluate every lock, not just the mask
1298 mask = -1;
1299 } else if (in->get_wanted_loner() != in->get_loner()) {
1300 dout(10) << "eval want loner: client." << in->get_wanted_loner() << " but failed to set it" << dendl;
1301 mask = -1;
1302 }
1303 }
1304
1305 retry:
1306 if (mask & CEPH_LOCK_IFILE)
1307 eval_any(&in->filelock, &need_issue, &finishers, caps_imported);
1308 if (mask & CEPH_LOCK_IAUTH)
1309 eval_any(&in->authlock, &need_issue, &finishers, caps_imported);
1310 if (mask & CEPH_LOCK_ILINK)
1311 eval_any(&in->linklock, &need_issue, &finishers, caps_imported);
1312 if (mask & CEPH_LOCK_IXATTR)
1313 eval_any(&in->xattrlock, &need_issue, &finishers, caps_imported);
1314 if (mask & CEPH_LOCK_INEST)
1315 eval_any(&in->nestlock, &need_issue, &finishers, caps_imported);
1316 if (mask & CEPH_LOCK_IFLOCK)
1317 eval_any(&in->flocklock, &need_issue, &finishers, caps_imported);
1318 if (mask & CEPH_LOCK_IPOLICY)
1319 eval_any(&in->policylock, &need_issue, &finishers, caps_imported);
1320
1321 // drop loner?
1322 if (in->is_auth() && in->is_head() && in->get_wanted_loner() != in->get_loner()) {
1323 if (in->try_drop_loner()) {
1324 need_issue = true;
1325 if (in->get_wanted_loner() >= 0) {
1326 dout(10) << "eval end set loner to client." << in->get_wanted_loner() << dendl;
1327 bool ok = in->try_set_loner();
1328 ceph_assert(ok);
1329 mask = -1;
  // new loner installed: evaluate all locks again
1330 goto retry;
1331 }
1332 }
1333 }
1334
1335 finish_contexts(g_ceph_context, finishers);
1336
1337 if (need_issue && in->is_head())
1338 issue_caps(in);
1339
1340 dout(10) << "eval done" << dendl;
1341 return need_issue;
1342 }
1343
1344 class C_Locker_Eval : public LockerContext {
1345 MDSCacheObject *p;
1346 int mask;
1347 public:
1348 C_Locker_Eval(Locker *l, MDSCacheObject *pp, int m) : LockerContext(l), p(pp), mask(m) {
1349 // We are used as an MDSCacheObject waiter, so should
1350 // only be invoked by someone already holding the big lock.
1351 ceph_assert(ceph_mutex_is_locked_by_me(locker->mds->mds_lock));
1352 p->get(MDSCacheObject::PIN_PTRWAITER);
1353 }
1354 void finish(int r) override {
1355 locker->try_eval(p, mask);
1356 p->put(MDSCacheObject::PIN_PTRWAITER);
1357 }
1358 };
1359
1360 void Locker::try_eval(MDSCacheObject *p, int mask)
1361 {
1362 // unstable and ambiguous auth?
1363 if (p->is_ambiguous_auth()) {
1364 dout(7) << "try_eval ambiguous auth, waiting on " << *p << dendl;
1365 p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_Eval(this, p, mask));
1366 return;
1367 }
1368
1369 if (p->is_auth() && p->is_frozen()) {
1370 dout(7) << "try_eval frozen, waiting on " << *p << dendl;
1371 p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, mask));
1372 return;
1373 }
1374
1375 if (mask & CEPH_LOCK_DN) {
1376 ceph_assert(mask == CEPH_LOCK_DN);
1377 bool need_issue = false; // ignore this, no caps on dentries
1378 CDentry *dn = static_cast<CDentry *>(p);
1379 eval_any(&dn->lock, &need_issue);
1380 } else {
1381 CInode *in = static_cast<CInode *>(p);
1382 eval(in, mask);
1383 }
1384 }
1385
// Evaluate a single lock on its (auth) parent, deferring while the parent
// has ambiguous authority or is frozen.  Honors pending scatter/unscatter
// requests before the generic eval, and tolerates a freezing parent for
// lock types that are safe to evaluate during a freeze.
1386 void Locker::try_eval(SimpleLock *lock, bool *pneed_issue)
1387 {
1388 MDSCacheObject *p = lock->get_parent();
1389
1390 // unstable and ambiguous auth?
1391 if (p->is_ambiguous_auth()) {
1392 dout(7) << "try_eval " << *lock << " ambiguousauth, waiting on " << *p << dendl;
1393 p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_Eval(this, p, lock->get_type()));
1394 return;
1395 }
1396
  // only the auth evaluates locks
1397 if (!p->is_auth()) {
1398 dout(7) << "try_eval " << *lock << " not auth for " << *p << dendl;
1399 return;
1400 }
1401
1402 if (p->is_frozen()) {
1403 dout(7) << "try_eval " << *lock << " frozen, waiting on " << *p << dendl;
1404 p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, lock->get_type()));
1405 return;
1406 }
1407
1408 /*
1409 * We could have a situation like:
1410 *
1411 * - mds A authpins item on mds B
1412 * - mds B starts to freeze tree containing item
1413 * - mds A tries wrlock_start on A, sends REQSCATTER to B
1414 * - mds B lock is unstable, sets scatter_wanted
1415 * - mds B lock stabilizes, calls try_eval.
1416 *
1417 * We can defer while freezing without causing a deadlock. Honor
1418 * scatter_wanted flag here. This will never get deferred by the
1419 * checks above due to the auth_pin held by the leader.
1420 */
1421 if (lock->is_scatterlock()) {
1422 ScatterLock *slock = static_cast<ScatterLock *>(lock);
1423 if (slock->get_scatter_wanted() &&
1424 slock->get_state() != LOCK_MIX) {
1425 scatter_mix(slock, pneed_issue);
  // if the transition did not complete, a gather is in progress; stop here
1426 if (!lock->is_stable())
1427 return;
1428 } else if (slock->get_unscatter_wanted() &&
1429 slock->get_state() != LOCK_LOCK) {
1430 simple_lock(slock, pneed_issue);
1431 if (!lock->is_stable()) {
1432 return;
1433 }
1434 }
1435 }
1436
  // NOTE(review): DN/ISNAP/IPOLICY are evaluated even while the parent is
  // freezing; other lock types wait for the unfreeze instead.
1437 if (lock->get_type() != CEPH_LOCK_DN &&
1438 lock->get_type() != CEPH_LOCK_ISNAP &&
1439 lock->get_type() != CEPH_LOCK_IPOLICY &&
1440 p->is_freezing()) {
1441 dout(7) << "try_eval " << *lock << " freezing, waiting on " << *p << dendl;
1442 p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, lock->get_type()));
1443 return;
1444 }
1445
1446 eval(lock, pneed_issue);
1447 }
1448
1449 void Locker::eval_cap_gather(CInode *in, set<CInode*> *issue_set)
1450 {
1451 bool need_issue = false;
1452 MDSContext::vec finishers;
1453
1454 // kick locks now
1455 if (!in->filelock.is_stable())
1456 eval_gather(&in->filelock, false, &need_issue, &finishers);
1457 if (!in->authlock.is_stable())
1458 eval_gather(&in->authlock, false, &need_issue, &finishers);
1459 if (!in->linklock.is_stable())
1460 eval_gather(&in->linklock, false, &need_issue, &finishers);
1461 if (!in->xattrlock.is_stable())
1462 eval_gather(&in->xattrlock, false, &need_issue, &finishers);
1463
1464 if (need_issue && in->is_head()) {
1465 if (issue_set)
1466 issue_set->insert(in);
1467 else
1468 issue_caps(in);
1469 }
1470
1471 finish_contexts(g_ceph_context, finishers);
1472 }
1473
1474 void Locker::eval_scatter_gathers(CInode *in)
1475 {
1476 bool need_issue = false;
1477 MDSContext::vec finishers;
1478
1479 dout(10) << "eval_scatter_gathers " << *in << dendl;
1480
1481 // kick locks now
1482 if (!in->filelock.is_stable())
1483 eval_gather(&in->filelock, false, &need_issue, &finishers);
1484 if (!in->nestlock.is_stable())
1485 eval_gather(&in->nestlock, false, &need_issue, &finishers);
1486 if (!in->dirfragtreelock.is_stable())
1487 eval_gather(&in->dirfragtreelock, false, &need_issue, &finishers);
1488
1489 if (need_issue && in->is_head())
1490 issue_caps(in);
1491
1492 finish_contexts(g_ceph_context, finishers);
1493 }
1494
1495 void Locker::eval(SimpleLock *lock, bool *need_issue)
1496 {
1497 switch (lock->get_type()) {
1498 case CEPH_LOCK_IFILE:
1499 return file_eval(static_cast<ScatterLock*>(lock), need_issue);
1500 case CEPH_LOCK_IDFT:
1501 case CEPH_LOCK_INEST:
1502 return scatter_eval(static_cast<ScatterLock*>(lock), need_issue);
1503 default:
1504 return simple_eval(lock, need_issue);
1505 }
1506 }
1507
1508
1509 // ------------------
1510 // rdlock
1511
// Try to nudge a lock toward a rdlockable state.  On a stable auth lock,
// start a transition toward SYNC (or XSYN for a loner's non-dir file lock);
// returns true to indicate the caller should re-check rdlockability.  On a
// stable replica, request the change from the auth and return false (the
// caller must wait).  For unstable file locks, bump recovery priority.
1512 bool Locker::_rdlock_kick(SimpleLock *lock, bool as_anon)
1513 {
1514 // kick the lock
1515 if (lock->is_stable()) {
1516 if (lock->get_parent()->is_auth()) {
1517 if (lock->get_sm() == &sm_scatterlock) {
1518 // not until tempsync is fully implemented
1519 //if (lock->get_parent()->is_replicated())
1520 //scatter_tempsync((ScatterLock*)lock);
1521 //else
1522 simple_sync(lock);
1523 } else if (lock->get_sm() == &sm_filelock) {
1524 CInode *in = static_cast<CInode*>(lock->get_parent());
1525 if (lock->get_state() == LOCK_EXCL &&
1526 in->get_target_loner() >= 0 &&
1527 !in->is_dir() && !as_anon) // as_anon => caller wants SYNC, not XSYN
1528 file_xsyn(lock);
1529 else
1530 simple_sync(lock);
1531 } else
1532 simple_sync(lock);
1533 return true;
1534 } else {
1535 // request rdlock state change from auth
1536 mds_rank_t auth = lock->get_parent()->authority().first;
1537 if (!mds->is_cluster_degraded() ||
1538 mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
1539 dout(10) << "requesting rdlock from auth on "
1540 << *lock << " on " << *lock->get_parent() << dendl;
1541 mds->send_message_mds(make_message<MLock>(lock, LOCK_AC_REQRDLOCK, mds->get_nodeid()), auth);
1542 }
1543 return false;
1544 }
1545 }
  // unstable file lock: the transition may be blocked on file recovery
1546 if (lock->get_type() == CEPH_LOCK_IFILE) {
1547 CInode *in = static_cast<CInode *>(lock->get_parent());
1548 if (in->state_test(CInode::STATE_RECOVERING)) {
1549 mds->mdcache->recovery_queue.prioritize(in);
1550 }
1551 }
1552
1553 return false;
1554 }
1555
1556 bool Locker::rdlock_try(SimpleLock *lock, client_t client)
1557 {
1558 dout(7) << "rdlock_try on " << *lock << " on " << *lock->get_parent() << dendl;
1559
1560 // can read? grab ref.
1561 if (lock->can_rdlock(client))
1562 return true;
1563
1564 _rdlock_kick(lock, false);
1565
1566 if (lock->can_rdlock(client))
1567 return true;
1568
1569 return false;
1570 }
1571
// Acquire a rdlock for a request, kicking the lock state as needed.  For
// snapped (non-head) inodes the head inode's lock may have to be synced
// first (recursive rdlock_start with as_anon).  On failure a retry waiter
// is queued and false is returned.
1572 bool Locker::rdlock_start(SimpleLock *lock, MDRequestRef& mut, bool as_anon)
1573 {
1574 dout(7) << "rdlock_start on " << *lock << " on " << *lock->get_parent() << dendl;
1575
1576 // client may be allowed to rdlock the same item it has xlocked.
1577 // UNLESS someone passes in as_anon, or we're reading snapped version here.
1578 if (mut->snapid != CEPH_NOSNAP)
1579 as_anon = true;
  // anonymous read: do not credit the client's own xlock
1580 client_t client = as_anon ? -1 : mut->get_client();
1581
1582 CInode *in = 0;
1583 if (lock->get_type() != CEPH_LOCK_DN)
1584 in = static_cast<CInode *>(lock->get_parent());
1585
1586 /*
1587 if (!lock->get_parent()->is_auth() &&
1588 lock->fw_rdlock_to_auth()) {
1589 mdcache->request_forward(mut, lock->get_parent()->authority().first);
1590 return false;
1591 }
1592 */
1593
1594 while (1) {
1595 // can read? grab ref.
1596 if (lock->can_rdlock(client)) {
1597 lock->get_rdlock();
1598 mut->emplace_lock(lock, MutationImpl::LockOp::RDLOCK);
1599 return true;
1600 }
1601
1602 // hmm, wait a second.
1603 if (in && !in->is_head() && in->is_auth() &&
1604 lock->get_state() == LOCK_SNAP_SYNC) {
1605 // okay, we actually need to kick the head's lock to get ourselves synced up.
1606 CInode *head = mdcache->get_inode(in->ino());
1607 ceph_assert(head);
1608 SimpleLock *hlock = head->get_lock(CEPH_LOCK_IFILE);
1609 if (hlock->get_state() == LOCK_SYNC)
1610 hlock = head->get_lock(lock->get_type());
1611
1612 if (hlock->get_state() != LOCK_SYNC) {
1613 dout(10) << "rdlock_start trying head inode " << *head << dendl;
1614 if (!rdlock_start(hlock, mut, true)) // ** as_anon, no rdlock on EXCL **
1615 return false;
1616 // oh, check our lock again then
1617 }
1618 }
1619
  // kick returned false: nothing more we can do locally; go wait
1620 if (!_rdlock_kick(lock, as_anon))
1621 break;
1622 }
1623
1624 // wait!
1625 int wait_on;
1626 if (lock->get_parent()->is_auth() && lock->is_stable())
1627 wait_on = SimpleLock::WAIT_RD;
1628 else
1629 wait_on = SimpleLock::WAIT_STABLE; // REQRDLOCK is ignored if lock is unstable, so we need to retry.
1630 dout(7) << "rdlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl;
1631 lock->add_waiter(wait_on, new C_MDS_RetryRequest(mdcache, mut));
1632 nudge_log(lock);
1633 return false;
1634 }
1635
1636 void Locker::nudge_log(SimpleLock *lock)
1637 {
1638 dout(10) << "nudge_log " << *lock << " on " << *lock->get_parent() << dendl;
1639 if (lock->get_parent()->is_auth() && lock->is_unstable_and_locked()) // as with xlockdone, or cap flush
1640 mds->mdlog->flush();
1641 }
1642
1643 void Locker::rdlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut, bool *pneed_issue)
1644 {
1645 ceph_assert(it->is_rdlock());
1646 SimpleLock *lock = it->lock;
1647 // drop ref
1648 lock->put_rdlock();
1649 if (mut)
1650 mut->locks.erase(it);
1651
1652 dout(7) << "rdlock_finish on " << *lock << " on " << *lock->get_parent() << dendl;
1653
1654 // last one?
1655 if (!lock->is_rdlocked()) {
1656 if (!lock->is_stable())
1657 eval_gather(lock, false, pneed_issue);
1658 else if (lock->get_parent()->is_auth())
1659 try_eval(lock, pneed_issue);
1660 }
1661 }
1662
1663 bool Locker::rdlock_try_set(MutationImpl::LockOpVec& lov, MDRequestRef& mdr)
1664 {
1665 dout(10) << __func__ << dendl;
1666 for (const auto& p : lov) {
1667 auto lock = p.lock;
1668 ceph_assert(p.is_rdlock());
1669 if (!mdr->is_rdlocked(lock) && !rdlock_try(lock, mdr->get_client())) {
1670 lock->add_waiter(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_RD,
1671 new C_MDS_RetryRequest(mdcache, mdr));
1672 goto failed;
1673 }
1674 lock->get_rdlock();
1675 mdr->emplace_lock(lock, MutationImpl::LockOp::RDLOCK);
1676 dout(20) << " got rdlock on " << *lock << " " << *lock->get_parent() << dendl;
1677 }
1678
1679 return true;
1680 failed:
1681 dout(10) << __func__ << " failed" << dendl;
1682 drop_locks(mdr.get(), nullptr);
1683 mdr->drop_local_auth_pins();
1684 return false;
1685 }
1686
1687 bool Locker::rdlock_try_set(MutationImpl::LockOpVec& lov, MutationRef& mut)
1688 {
1689 dout(10) << __func__ << dendl;
1690 for (const auto& p : lov) {
1691 auto lock = p.lock;
1692 ceph_assert(p.is_rdlock());
1693 if (!lock->can_rdlock(mut->get_client()))
1694 return false;
1695 p.lock->get_rdlock();
1696 mut->emplace_lock(p.lock, MutationImpl::LockOp::RDLOCK);
1697 }
1698 return true;
1699 }
1700
1701 // ------------------
1702 // wrlock
1703
1704 void Locker::wrlock_force(SimpleLock *lock, MutationRef& mut)
1705 {
1706 if (lock->get_type() == CEPH_LOCK_IVERSION ||
1707 lock->get_type() == CEPH_LOCK_DVERSION)
1708 return local_wrlock_grab(static_cast<LocalLockC*>(lock), mut);
1709
1710 dout(7) << "wrlock_force on " << *lock
1711 << " on " << *lock->get_parent() << dendl;
1712 lock->get_wrlock(true);
1713 mut->emplace_lock(lock, MutationImpl::LockOp::WRLOCK);
1714 }
1715
// Non-blocking wrlock attempt.  May move a stable auth lock toward LOCK
// via simple_lock() and retry, but deliberately avoids any transition that
// would trigger scatter_writebehind/start_scatter (dirty scatter data,
// scatter wanted, subtree bounds) since the caller may hold an open log
// entry.  Returns false without queueing any waiter.
1716 bool Locker::wrlock_try(SimpleLock *lock, const MutationRef& mut, client_t client)
1717 {
1718 dout(10) << "wrlock_try " << *lock << " on " << *lock->get_parent() << dendl;
1719 if (client == -1)
1720 client = mut->get_client();
1721
1722 while (1) {
1723 if (lock->can_wrlock(client)) {
1724 lock->get_wrlock();
1725 auto it = mut->emplace_lock(lock, MutationImpl::LockOp::WRLOCK);
1726 it->flags |= MutationImpl::LockOp::WRLOCK; // may already remote_wrlocked
1727 return true;
1728 }
  // can't change state of an unstable lock here; give up
1729 if (!lock->is_stable())
1730 break;
1731 CInode *in = static_cast<CInode *>(lock->get_parent());
1732 if (!in->is_auth())
1733 break;
1734 // caller may already has a log entry open. To avoid calling
1735 // scatter_writebehind or start_scatter. don't change nest lock
1736 // state if it has dirty scatterdata.
1737 if (lock->is_dirty())
1738 break;
1739 // To avoid calling scatter_writebehind or start_scatter. don't
1740 // change nest lock state to MIX.
1741 ScatterLock *slock = static_cast<ScatterLock*>(lock);
1742 if (slock->get_scatter_wanted() || in->has_subtree_or_exporting_dirfrag())
1743 break;
1744
  // safe to simply lock; loop and re-check can_wrlock
1745 simple_lock(lock);
1746 }
1747 return false;
1748 }
1749
// Acquire a wrlock for a request.  Version locks go through the local-lock
// path.  On the auth, drives the lock toward MIX (when scatter is wanted /
// subtree bounds exist) or LOCK; on a replica, asks the auth to scatter.
// On failure a stable-state retry waiter is queued and false is returned.
1750 bool Locker::wrlock_start(const MutationImpl::LockOp &op, MDRequestRef& mut)
1751 {
1752 SimpleLock *lock = op.lock;
1753 if (lock->get_type() == CEPH_LOCK_IVERSION ||
1754 lock->get_type() == CEPH_LOCK_DVERSION)
1755 return local_wrlock_start(static_cast<LocalLockC*>(lock), mut);
1756
1757 dout(10) << "wrlock_start " << *lock << " on " << *lock->get_parent() << dendl;
1758
1759 CInode *in = static_cast<CInode *>(lock->get_parent());
  // state-pin ops wrlock on behalf of the exclusive-cap client
1760 client_t client = op.is_state_pin() ? lock->get_excl_client() : mut->get_client();
1761 bool want_scatter = lock->get_parent()->is_auth() &&
1762 (in->has_subtree_or_exporting_dirfrag() ||
1763 static_cast<ScatterLock*>(lock)->get_scatter_wanted());
1764
1765 while (1) {
1766 // wrlock?
1767 if (lock->can_wrlock(client) &&
1768 (!want_scatter || lock->get_state() == LOCK_MIX)) {
1769 lock->get_wrlock();
1770 auto it = mut->emplace_lock(lock, MutationImpl::LockOp::WRLOCK);
1771 it->flags |= MutationImpl::LockOp::WRLOCK; // may already remote_wrlocked
1772 return true;
1773 }
1774
  // an in-recovery file lock may be what blocks us; bump its priority
1775 if (lock->get_type() == CEPH_LOCK_IFILE &&
1776 in->state_test(CInode::STATE_RECOVERING)) {
1777 mds->mdcache->recovery_queue.prioritize(in);
1778 }
1779
1780 if (!lock->is_stable())
1781 break;
1782
1783 if (in->is_auth()) {
1784 if (want_scatter)
1785 scatter_mix(static_cast<ScatterLock*>(lock));
1786 else
1787 simple_lock(lock);
1788 } else {
1789 // replica.
1790 // auth should be auth_pinned (see acquire_locks wrlock weird mustpin case).
1791 mds_rank_t auth = lock->get_parent()->authority().first;
1792 if (!mds->is_cluster_degraded() ||
1793 mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
1794 dout(10) << "requesting scatter from auth on "
1795 << *lock << " on " << *lock->get_parent() << dendl;
1796 mds->send_message_mds(make_message<MLock>(lock, LOCK_AC_REQSCATTER, mds->get_nodeid()), auth);
1797 }
1798 break;
1799 }
1800 }
1801
1802 dout(7) << "wrlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl;
1803 lock->add_waiter(SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
1804 nudge_log(lock);
1805
1806 return false;
1807 }
1808
// Release one wrlock ref held by 'mut'.  If the mutation also holds a
// remote wrlock on the same lock, only the local flag is cleared.  When
// writers remain, the unstable lock is only re-evaluated after a completed
// scatter flush; when the last writer leaves, the lock is re-evaluated
// normally.
1809 void Locker::wrlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut, bool *pneed_issue)
1810 {
1811 ceph_assert(it->is_wrlock());
1812 SimpleLock* lock = it->lock;
1813
1814 if (lock->get_type() == CEPH_LOCK_IVERSION ||
1815 lock->get_type() == CEPH_LOCK_DVERSION)
1816 return local_wrlock_finish(it, mut);
1817
1818 dout(7) << "wrlock_finish on " << *lock << " on " << *lock->get_parent() << dendl;
1819 lock->put_wrlock();
1820
  // keep the lock-op entry alive if a remote wrlock is still held
1821 if (it->is_remote_wrlock())
1822 it->clear_wrlock();
1823 else
1824 mut->locks.erase(it);
1825
1826 if (lock->is_wrlocked()) {
1827 // Evaluate unstable lock after scatter_writebehind_finish(). Because
1828 // eval_gather() does not change lock's state when lock is flushing.
1829 if (!lock->is_stable() && lock->is_flushed() &&
1830 lock->get_parent()->is_auth())
1831 eval_gather(lock, false, pneed_issue);
1832 } else {
1833 if (!lock->is_stable())
1834 eval_gather(lock, false, pneed_issue);
1835 else if (lock->get_parent()->is_auth())
1836 try_eval(lock, pneed_issue);
1837 }
1838 }
1839
1840
1841 // remote wrlock
1842
1843 void Locker::remote_wrlock_start(SimpleLock *lock, mds_rank_t target, MDRequestRef& mut)
1844 {
1845 dout(7) << "remote_wrlock_start mds." << target << " on " << *lock << " on " << *lock->get_parent() << dendl;
1846
1847 // wait for active target
1848 if (mds->is_cluster_degraded() &&
1849 !mds->mdsmap->is_clientreplay_or_active_or_stopping(target)) {
1850 dout(7) << " mds." << target << " is not active" << dendl;
1851 if (mut->more()->waiting_on_peer.empty())
1852 mds->wait_for_active_peer(target, new C_MDS_RetryRequest(mdcache, mut));
1853 return;
1854 }
1855
1856 // send lock request
1857 mut->start_locking(lock, target);
1858 mut->more()->peers.insert(target);
1859 auto r = make_message<MMDSPeerRequest>(mut->reqid, mut->attempt, MMDSPeerRequest::OP_WRLOCK);
1860 r->set_lock_type(lock->get_type());
1861 lock->get_parent()->set_object_info(r->get_object_info());
1862 mds->send_message_mds(r, target);
1863
1864 ceph_assert(mut->more()->waiting_on_peer.count(target) == 0);
1865 mut->more()->waiting_on_peer.insert(target);
1866 }
1867
1868 void Locker::remote_wrlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut)
1869 {
1870 ceph_assert(it->is_remote_wrlock());
1871 SimpleLock *lock = it->lock;
1872 mds_rank_t target = it->wrlock_target;
1873
1874 if (it->is_wrlock())
1875 it->clear_remote_wrlock();
1876 else
1877 mut->locks.erase(it);
1878
1879 dout(7) << "remote_wrlock_finish releasing remote wrlock on mds." << target
1880 << " " << *lock->get_parent() << dendl;
1881 if (!mds->is_cluster_degraded() ||
1882 mds->mdsmap->get_state(target) >= MDSMap::STATE_REJOIN) {
1883 auto peerreq = make_message<MMDSPeerRequest>(mut->reqid, mut->attempt, MMDSPeerRequest::OP_UNWRLOCK);
1884 peerreq->set_lock_type(lock->get_type());
1885 lock->get_parent()->set_object_info(peerreq->get_object_info());
1886 mds->send_message_mds(peerreq, target);
1887 }
1888 }
1889
1890
1891 // ------------------
1892 // xlock
1893
// Acquire an xlock for a request.  Version locks use the local-lock path.
// On the auth, loops driving the lock into LOCK_XLOCK (via simple_lock /
// simple_xlock) until it is grabbable or progress stalls, then queues a
// retry waiter.  On a replica (only for remotely-xlockable lock types),
// sends OP_XLOCK to the auth and waits for the reply.
1894 bool Locker::xlock_start(SimpleLock *lock, MDRequestRef& mut)
1895 {
1896 if (lock->get_type() == CEPH_LOCK_IVERSION ||
1897 lock->get_type() == CEPH_LOCK_DVERSION)
1898 return local_xlock_start(static_cast<LocalLockC*>(lock), mut);
1899
1900 dout(7) << "xlock_start on " << *lock << " on " << *lock->get_parent() << dendl;
1901 client_t client = mut->get_client();
1902
  // only cap-bearing locks need the inode for the cap-gather check below
1903 CInode *in = nullptr;
1904 if (lock->get_cap_shift())
1905 in = static_cast<CInode *>(lock->get_parent());
1906
1907 // auth?
1908 if (lock->get_parent()->is_auth()) {
1909 // auth
1910 while (1) {
1911 if (mut->locking && // started xlock (not preempt other request)
1912 lock->can_xlock(client) &&
1913 !(lock->get_state() == LOCK_LOCK_XLOCK && // client is not xlocker or
1914 in && in->issued_caps_need_gather(lock))) { // xlocker does not hold shared cap
1915 lock->set_state(LOCK_XLOCK);
1916 lock->get_xlock(mut, client);
1917 mut->emplace_lock(lock, MutationImpl::LockOp::XLOCK);
1918 mut->finish_locking(lock);
1919 return true;
1920 }
1921
1922 if (lock->get_type() == CEPH_LOCK_IFILE &&
1923 in->state_test(CInode::STATE_RECOVERING)) {
1924 mds->mdcache->recovery_queue.prioritize(in);
1925 }
1926
  // stop if the lock is unstable, unless it is our own XLOCKDONE with
  // no stable-waiters (then we may push it on below)
1927 if (!lock->is_stable() && (lock->get_state() != LOCK_XLOCKDONE ||
1928 lock->get_xlock_by_client() != client ||
1929 lock->is_waiter_for(SimpleLock::WAIT_STABLE)))
1930 break;
1931
1932 if (lock->get_state() == LOCK_LOCK || lock->get_state() == LOCK_XLOCKDONE) {
1933 mut->start_locking(lock);
1934 simple_xlock(lock);
1935 } else {
1936 simple_lock(lock);
1937 }
1938 }
1939
1940 lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
1941 nudge_log(lock);
1942 return false;
1943 } else {
1944 // replica
1945 ceph_assert(lock->get_sm()->can_remote_xlock);
1946 ceph_assert(!mut->peer_request);
1947
1948 // wait for single auth
1949 if (lock->get_parent()->is_ambiguous_auth()) {
1950 lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH,
1951 new C_MDS_RetryRequest(mdcache, mut));
1952 return false;
1953 }
1954
1955 // wait for active auth
1956 mds_rank_t auth = lock->get_parent()->authority().first;
1957 if (mds->is_cluster_degraded() &&
1958 !mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
1959 dout(7) << " mds." << auth << " is not active" << dendl;
1960 if (mut->more()->waiting_on_peer.empty())
1961 mds->wait_for_active_peer(auth, new C_MDS_RetryRequest(mdcache, mut));
1962 return false;
1963 }
1964
1965 // send lock request
1966 mut->more()->peers.insert(auth);
1967 mut->start_locking(lock, auth);
1968 auto r = make_message<MMDSPeerRequest>(mut->reqid, mut->attempt, MMDSPeerRequest::OP_XLOCK);
1969 r->set_lock_type(lock->get_type());
1970 lock->get_parent()->set_object_info(r->get_object_info());
1971 mds->send_message_mds(r, auth);
1972
1973 ceph_assert(mut->more()->waiting_on_peer.count(auth) == 0);
1974 mut->more()->waiting_on_peer.insert(auth);
1975
1976 return false;
1977 }
1978 }
1979
// Finish an xlock release on an unstable lock.  Fast path: if the lock is
// otherwise uncontended (no rd/wr locks, no lease, eligible lock type) and
// the inode has a target loner that matches the old xlocker, jump straight
// to LOCK_EXCL; otherwise fall back to eval_gather, which will revoke the
// xlocker's leftover GSHARED cap when heading to LOCK_LOCK.
1980 void Locker::_finish_xlock(SimpleLock *lock, client_t xlocker, bool *pneed_issue)
1981 {
1982 ceph_assert(!lock->is_stable());
1983 if (lock->get_type() != CEPH_LOCK_DN &&
1984 lock->get_type() != CEPH_LOCK_ISNAP &&
1985 lock->get_type() != CEPH_LOCK_IPOLICY &&
1986 lock->get_num_rdlocks() == 0 &&
1987 lock->get_num_wrlocks() == 0 &&
1988 !lock->is_leased() &&
1989 lock->get_state() != LOCK_XLOCKSNAP) {
1990 CInode *in = static_cast<CInode*>(lock->get_parent());
1991 client_t loner = in->get_target_loner();
1992 if (loner >= 0 && (xlocker < 0 || xlocker == loner)) {
1993 lock->set_state(LOCK_EXCL);
1994 lock->get_parent()->auth_unpin(lock);
1995 lock->finish_waiters(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD);
1996 if (lock->get_cap_shift())
1997 *pneed_issue = true;
1998 if (lock->get_parent()->is_auth() &&
1999 lock->is_stable())
2000 try_eval(lock, pneed_issue);
2001 return;
2002 }
2003 }
2004 // the xlocker may have CEPH_CAP_GSHARED, need to revoke it if next state is LOCK_LOCK
2005 eval_gather(lock, lock->get_state() != LOCK_XLOCKSNAP, pneed_issue);
2006 }
2007
// Release an xlock held by 'mut'.  Version locks use the local-lock path.
// On a non-auth holder, tell the auth to drop the remote xlock and wake
// local waiters; on the auth, run _finish_xlock once no xlocker (and no
// pending LOCK_LOCK_XLOCK transition) remains.  Cap reissue is either
// reported via *pneed_issue or done directly.
2008 void Locker::xlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut, bool *pneed_issue)
2009 {
2010 ceph_assert(it->is_xlock());
2011 SimpleLock *lock = it->lock;
2012
2013 if (lock->get_type() == CEPH_LOCK_IVERSION ||
2014 lock->get_type() == CEPH_LOCK_DVERSION)
2015 return local_xlock_finish(it, mut);
2016
2017 dout(10) << "xlock_finish on " << *lock << " " << *lock->get_parent() << dendl;
2018
  // remember who held the xlock before we drop it (used by _finish_xlock)
2019 client_t xlocker = lock->get_xlock_by_client();
2020
2021 // drop ref
2022 lock->put_xlock();
2023 ceph_assert(mut);
2024 mut->locks.erase(it);
2025
2026 bool do_issue = false;
2027
2028 // remote xlock?
2029 if (!lock->get_parent()->is_auth()) {
2030 ceph_assert(lock->get_sm()->can_remote_xlock);
2031
2032 // tell auth
2033 dout(7) << "xlock_finish releasing remote xlock on " << *lock->get_parent() << dendl;
2034 mds_rank_t auth = lock->get_parent()->authority().first;
2035 if (!mds->is_cluster_degraded() ||
2036 mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) {
2037 auto peerreq = make_message<MMDSPeerRequest>(mut->reqid, mut->attempt, MMDSPeerRequest::OP_UNXLOCK);
2038 peerreq->set_lock_type(lock->get_type());
2039 lock->get_parent()->set_object_info(peerreq->get_object_info());
2040 mds->send_message_mds(peerreq, auth);
2041 }
2042 // others waiting?
2043 lock->finish_waiters(SimpleLock::WAIT_STABLE |
2044 SimpleLock::WAIT_WR |
2045 SimpleLock::WAIT_RD, 0);
2046 } else {
2047 if (lock->get_num_xlocks() == 0 &&
2048 lock->get_state() != LOCK_LOCK_XLOCK) { // no one is taking xlock
2049 _finish_xlock(lock, xlocker, &do_issue);
2050 }
2051 }
2052
2053 if (do_issue) {
2054 CInode *in = static_cast<CInode*>(lock->get_parent());
2055 if (in->is_head()) {
2056 if (pneed_issue)
2057 *pneed_issue = true;
2058 else
2059 issue_caps(in);
2060 }
2061 }
2062 }
2063
2064 void Locker::xlock_export(const MutationImpl::lock_iterator& it, MutationImpl *mut)
2065 {
2066 ceph_assert(it->is_xlock());
2067 SimpleLock *lock = it->lock;
2068 dout(10) << "xlock_export on " << *lock << " " << *lock->get_parent() << dendl;
2069
2070 lock->put_xlock();
2071 mut->locks.erase(it);
2072
2073 MDSCacheObject *p = lock->get_parent();
2074 ceph_assert(p->state_test(CInode::STATE_AMBIGUOUSAUTH)); // we are exporting this (inode)
2075
2076 if (!lock->is_stable())
2077 lock->get_parent()->auth_unpin(lock);
2078
2079 lock->set_state(LOCK_LOCK);
2080 }
2081
2082 void Locker::xlock_import(SimpleLock *lock)
2083 {
2084 dout(10) << "xlock_import on " << *lock << " " << *lock->get_parent() << dendl;
2085 lock->get_parent()->auth_pin(lock);
2086 }
2087
2088 void Locker::xlock_downgrade(SimpleLock *lock, MutationImpl *mut)
2089 {
2090 dout(10) << "xlock_downgrade on " << *lock << " " << *lock->get_parent() << dendl;
2091 auto it = mut->locks.find(lock);
2092 if (it->is_rdlock())
2093 return; // already downgraded
2094
2095 ceph_assert(lock->get_parent()->is_auth());
2096 ceph_assert(it != mut->locks.end());
2097 ceph_assert(it->is_xlock());
2098
2099 lock->set_xlock_done();
2100 lock->get_rdlock();
2101 xlock_finish(it, mut, nullptr);
2102 mut->emplace_lock(lock, MutationImpl::LockOp::RDLOCK);
2103 }
2104
2105
2106 // file i/o -----------------------------------------
2107
2108 version_t Locker::issue_file_data_version(CInode *in)
2109 {
2110 dout(7) << "issue_file_data_version on " << *in << dendl;
2111 return in->get_inode()->file_data_version;
2112 }
2113
// Journal-commit callback for cap-driven inode updates: forwards to
// Locker::file_update_finish() with the mutation, UPDATE_* flags, and the
// optional client ack message.  Pins the inode so it cannot be trimmed
// while the log entry is in flight.
class C_Locker_FileUpdate_finish : public LockerLogContext {
  CInode *in;
  MutationRef mut;
  unsigned flags;     // bitmask of UPDATE_* (see enum below)
  client_t client;    // client to ack / reissue to; -1 if none
  ref_t<MClientCaps> ack;
public:
  C_Locker_FileUpdate_finish(Locker *l, CInode *i, MutationRef& m, unsigned f,
                             const ref_t<MClientCaps> &ack, client_t c=-1)
    : LockerLogContext(l), in(i), mut(m), flags(f), client(c), ack(ack) {
    in->get(CInode::PIN_PTRWAITER);
  }
  void finish(int r) override {
    locker->file_update_finish(in, mut, flags, client, ack);
    in->put(CInode::PIN_PTRWAITER);
  }
};
2131
// Flags passed to file_update_finish() via C_Locker_FileUpdate_finish.
enum {
  UPDATE_SHAREMAX = 1,    // broadcast new max_size to writing clients
  UPDATE_NEEDSISSUE = 2,  // re-issue caps to the requesting client
  UPDATE_SNAPFLUSH = 4,   // this update completed a snap cap writeback
};
2137
// Journal-commit completion for a cap-driven inode update: apply the
// projected metadata, ack the client's cap flush, drop the mutation's locks,
// and grant/share caps as directed by the UPDATE_* flags.
void Locker::file_update_finish(CInode *in, MutationRef& mut, unsigned flags,
                                client_t client, const ref_t<MClientCaps> &ack)
{
  dout(10) << "file_update_finish on " << *in << dendl;

  mut->apply();

  if (ack) {
    Session *session = mds->get_session(client);
    if (session && !session->is_closed()) {
      // "oldest flush tid" > 0 means client uses unique TID for each flush
      if (ack->get_oldest_flush_tid() > 0)
        session->add_completed_flush(ack->get_client_tid());
      mds->send_message_client_counted(ack, session);
    } else {
      dout(10) << " no session for client." << client << " " << *ack << dendl;
    }
  }

  set<CInode*> need_issue;
  drop_locks(mut.get(), &need_issue);

  if (in->is_head()) {
    if ((flags & UPDATE_NEEDSISSUE) && need_issue.count(in) == 0) {
      Capability *cap = in->get_client_cap(client);
      // only bother if the client still wants caps it doesn't have pending
      if (cap && (cap->wanted() & ~cap->pending()))
        issue_caps(in, cap);
    }

    if ((flags & UPDATE_SHAREMAX) && in->is_auth() &&
        (in->filelock.gcaps_allowed(CAP_LONER) & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)))
      share_inode_max_size(in);

  } else if ((flags & UPDATE_SNAPFLUSH) && !in->client_snap_caps.empty()) {
    dout(10) << " client_snap_caps " << in->client_snap_caps << dendl;
    // check for snap writeback completion
    in->client_snap_caps.erase(client);
    if (in->client_snap_caps.empty()) {
      // last snapflush for this snapped inode: drop one wrlock per tracked
      // cinode lock (presumably taken when the snapflush began — see
      // mark_need_snapflush_inode / snapflush machinery)
      for (int i = 0; i < num_cinode_locks; i++) {
        SimpleLock *lock = in->get_lock(cinode_lock_info[i].lock);
        ceph_assert(lock);
        lock->put_wrlock();
      }
      in->item_open_file.remove_myself();
      in->item_caps.remove_myself();
      eval_cap_gather(in, &need_issue);
    }
  }
  issue_caps_set(need_issue);

  mds->balancer->hit_inode(in, META_POP_IWR);

  // auth unpin after issuing caps
  mut->cleanup();
}
2193
// Register (or augment) a capability for the requesting client as part of
// an open/create, suppressing cap messages so the grant can ride on the
// request reply.  Returns the (possibly new) Capability.
Capability* Locker::issue_new_caps(CInode *in,
                                   int mode,
                                   MDRequestRef& mdr,
                                   SnapRealm *realm)
{
  dout(7) << "issue_new_caps for mode " << mode << " on " << *in << dendl;
  Session *session = mdr->session;
  bool new_inode = (mdr->alloc_ino || mdr->used_prealloc_ino);

  // if replay or async, try to reconnect cap, and otherwise do nothing.
  if (new_inode && mdr->client_request->is_queued_for_replay())
    return mds->mdcache->try_reconnect_cap(in, session);

  // my needs
  ceph_assert(session->info.inst.name.is_client());
  client_t my_client = session->get_client();
  int my_want = ceph_caps_for_mode(mode);

  // register a capability
  Capability *cap = in->get_client_cap(my_client);
  if (!cap) {
    // new cap
    cap = in->add_client_cap(my_client, session, realm, new_inode);
    cap->set_wanted(my_want);
    cap->mark_new();
  } else {
    // make sure it wants sufficient caps
    if (my_want & ~cap->wanted()) {
      // augment wanted caps for this client
      cap->set_wanted(cap->wanted() | my_want);
    }
  }
  cap->inc_suppress(); // suppress file cap messages (we'll bundle with the request reply)

  if (in->is_auth()) {
    // [auth] twiddle mode?
    eval(in, CEPH_CAP_LOCKS);

    int all_allowed = -1, loner_allowed = -1, xlocker_allowed = -1;
    int allowed = get_allowed_caps(in, cap, all_allowed, loner_allowed,
                                   xlocker_allowed);

    // if wanted caps are blocked behind journaled lock changes, push the log
    if (_need_flush_mdlog(in, my_want & ~allowed, true))
      mds->mdlog->flush();

  } else {
    // [replica] tell auth about any new caps wanted
    request_inode_file_caps(in);
  }

  // issue caps (pot. incl new one)
  //issue_caps(in);  // note: _eval above may have done this already...

  // re-issue whatever we can
  //cap->issue(cap->pending());

  cap->dec_suppress();

  return cap;
}
2254
2255 void Locker::issue_caps_set(set<CInode*>& inset)
2256 {
2257 for (set<CInode*>::iterator p = inset.begin(); p != inset.end(); ++p)
2258 issue_caps(*p);
2259 }
2260
// Deferred revocation of a stale client's cap on one inode; queued from
// issue_caps() and forwarded to Locker::revoke_stale_cap().  Pins the inode
// so the pointer stays valid until the callback runs.
class C_Locker_RevokeStaleCap : public LockerContext {
  CInode *in;
  client_t client;   // client whose stale cap will be revoked
public:
  C_Locker_RevokeStaleCap(Locker *l, CInode *i, client_t c) :
    LockerContext(l), in(i), client(c) {
    in->get(CInode::PIN_PTRWAITER);
  }
  void finish(int r) override {
    locker->revoke_stale_cap(in, client);
    in->put(CInode::PIN_PTRWAITER);
  }
};
2274
// Compute which cap bits this client may hold on the inode, given the
// current lock states.  The three *_allowed in/out params are lazily filled
// caches (-1 = not yet computed) so issue_caps() can reuse them across the
// whole client list.
int Locker::get_allowed_caps(CInode *in, Capability *cap,
                             int &all_allowed, int &loner_allowed,
                             int &xlocker_allowed)
{
  client_t client = cap->get_client();

  // allowed caps are determined by the lock mode.
  if (all_allowed == -1)
    all_allowed = in->get_caps_allowed_by_type(CAP_ANY);
  if (loner_allowed == -1)
    loner_allowed = in->get_caps_allowed_by_type(CAP_LONER);
  if (xlocker_allowed == -1)
    xlocker_allowed = in->get_caps_allowed_by_type(CAP_XLOCKER);

  client_t loner = in->get_loner();
  if (loner >= 0) {
    dout(7) << "get_allowed_caps loner client." << loner
            << " allowed=" << ccap_string(loner_allowed)
            << ", xlocker allowed=" << ccap_string(xlocker_allowed)
            << ", others allowed=" << ccap_string(all_allowed)
            << " on " << *in << dendl;
  } else {
    dout(7) << "get_allowed_caps allowed=" << ccap_string(all_allowed)
            << ", xlocker allowed=" << ccap_string(xlocker_allowed)
            << " on " << *in << dendl;
  }

  // do not issue _new_ bits when size|mtime is projected
  int allowed;
  if (loner == client)
    allowed = loner_allowed;
  else
    allowed = all_allowed;

  // add in any xlocker-only caps (for locks this client is the xlocker for)
  allowed |= xlocker_allowed & in->get_xlocker_mask(client);
  if (in->is_dir()) {
    allowed &= ~CEPH_CAP_ANY_DIR_OPS;
    if (allowed & CEPH_CAP_FILE_EXCL)
      allowed |= cap->get_lock_cache_allowed();
  }

  // clients that can't handle inline data or pool namespaces must not get
  // file rd/wr caps on inodes that use them
  if ((in->get_inode()->inline_data.version != CEPH_INLINE_NONE &&
       cap->is_noinline()) ||
      (!in->get_inode()->layout.pool_ns.empty() &&
       cap->is_nopoolns()))
    allowed &= ~(CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR);

  return allowed;
}
2325
// Reconcile each client's pending caps with what current lock states allow:
// revoke bits no longer allowed, grant wanted+allowed bits not yet pending.
// With only_cap, restrict to that one client.  Returns the number of
// grant/revoke messages sent.
int Locker::issue_caps(CInode *in, Capability *only_cap)
{
  // count conflicts with
  int nissued = 0;
  // lazily-computed caches shared across clients by get_allowed_caps()
  int all_allowed = -1, loner_allowed = -1, xlocker_allowed = -1;

  ceph_assert(in->is_head());

  // client caps
  map<client_t, Capability>::iterator it;
  if (only_cap)
    it = in->client_caps.find(only_cap->get_client());
  else
    it = in->client_caps.begin();
  for (; it != in->client_caps.end(); ++it) {
    Capability *cap = &it->second;
    int allowed = get_allowed_caps(in, cap, all_allowed, loner_allowed,
                                   xlocker_allowed);
    int pending = cap->pending();
    int wanted = cap->wanted();

    dout(20) << " client." << it->first
             << " pending " << ccap_string(pending)
             << " allowed " << ccap_string(allowed)
             << " wanted " << ccap_string(wanted)
             << dendl;

    if (!(pending & ~allowed)) {
      // skip if suppress or new, and not revocation
      if (cap->is_new() || cap->is_suppress() || cap->is_stale()) {
        dout(20) << " !revoke and new|suppressed|stale, skipping client." << it->first << dendl;
        continue;
      }
    } else {
      ceph_assert(!cap->is_new());
      if (cap->is_stale()) {
        // don't message a stale client; shrink the cap record locally and
        // queue the actual revocation for later
        dout(20) << " revoke stale cap from client." << it->first << dendl;
        ceph_assert(!cap->is_valid());
        cap->issue(allowed & pending, false);
        mds->queue_waiter_front(new C_Locker_RevokeStaleCap(this, in, it->first));
        continue;
      }

      if (!cap->is_valid() && (pending & ~CEPH_CAP_PIN)) {
        // After stale->resume circle, client thinks it only has CEPH_CAP_PIN.
        // mds needs to re-issue caps, then do revocation.
        long seq = cap->issue(pending, true);

        dout(7) << " sending MClientCaps to client." << it->first
                << " seq " << seq << " re-issue " << ccap_string(pending) << dendl;

        if (mds->logger) mds->logger->inc(l_mdss_ceph_cap_op_grant);

        auto m = make_message<MClientCaps>(CEPH_CAP_OP_GRANT, in->ino(),
                                           in->find_snaprealm()->inode->ino(),
                                           cap->get_cap_id(), cap->get_last_seq(),
                                           pending, wanted, 0, cap->get_mseq(),
                                           mds->get_osd_epoch_barrier());
        in->encode_cap_message(m, cap);

        mds->send_message_client_counted(m, cap->get_session());
      }
    }

    // notify clients about deleted inode, to make sure they release caps ASAP.
    if (in->get_inode()->nlink == 0)
      wanted |= CEPH_CAP_LINK_SHARED;

    // are there caps that the client _wants_ and can have, but aren't pending?
    // or do we need to revoke?
    if ((pending & ~allowed) ||             // need to revoke ~allowed caps.
        ((wanted & allowed) & ~pending) ||  // missing wanted+allowed caps
        !cap->is_valid()) {                 // after stale->resume circle
      // issue
      nissued++;

      // include caps that clients generally like, while we're at it.
      int likes = in->get_caps_liked();
      int before = pending;
      long seq;
      if (pending & ~allowed)
        seq = cap->issue((wanted|likes) & allowed & pending, true);  // if revoking, don't issue anything new.
      else
        seq = cap->issue((wanted|likes) & allowed, true);
      int after = cap->pending();

      dout(7) << " sending MClientCaps to client." << it->first
              << " seq " << seq << " new pending " << ccap_string(after)
              << " was " << ccap_string(before) << dendl;

      // any bit that went away is a revocation; track it for stale-warning
      int op = (before & ~after) ? CEPH_CAP_OP_REVOKE : CEPH_CAP_OP_GRANT;
      if (op == CEPH_CAP_OP_REVOKE) {
        if (mds->logger) mds->logger->inc(l_mdss_ceph_cap_op_revoke);
        revoking_caps.push_back(&cap->item_revoking_caps);
        revoking_caps_by_client[cap->get_client()].push_back(&cap->item_client_revoking_caps);
        cap->set_last_revoke_stamp(ceph_clock_now());
        cap->reset_num_revoke_warnings();
      } else {
        if (mds->logger) mds->logger->inc(l_mdss_ceph_cap_op_grant);
      }

      auto m = make_message<MClientCaps>(op, in->ino(),
                                         in->find_snaprealm()->inode->ino(),
                                         cap->get_cap_id(), cap->get_last_seq(),
                                         after, wanted, 0, cap->get_mseq(),
                                         mds->get_osd_epoch_barrier());
      in->encode_cap_message(m, cap);

      mds->send_message_client_counted(m, cap->get_session());
    }

    if (only_cap)
      break;
  }

  return nissued;
}
2443
2444 void Locker::issue_truncate(CInode *in)
2445 {
2446 dout(7) << "issue_truncate on " << *in << dendl;
2447
2448 for (auto &p : in->client_caps) {
2449 if (mds->logger) mds->logger->inc(l_mdss_ceph_cap_op_trunc);
2450 Capability *cap = &p.second;
2451 auto m = make_message<MClientCaps>(CEPH_CAP_OP_TRUNC,
2452 in->ino(),
2453 in->find_snaprealm()->inode->ino(),
2454 cap->get_cap_id(), cap->get_last_seq(),
2455 cap->pending(), cap->wanted(), 0,
2456 cap->get_mseq(),
2457 mds->get_osd_epoch_barrier());
2458 in->encode_cap_message(m, cap);
2459 mds->send_message_client_counted(m, p.first);
2460 }
2461
2462 // should we increase max_size?
2463 if (in->is_auth() && in->is_file())
2464 check_inode_max_size(in);
2465 }
2466
2467
// Actually revoke a stale client's cap on one inode (queued from
// issue_caps() via C_Locker_RevokeStaleCap).  If the client still holds
// writeable caps we cannot safely take them back: evict it instead.
void Locker::revoke_stale_cap(CInode *in, client_t client)
{
  dout(7) << __func__ << " client." << client << " on " << *in << dendl;
  Capability *cap = in->get_client_cap(client);
  if (!cap)
    return;  // cap disappeared in the meantime; nothing to do

  if (cap->revoking() & CEPH_CAP_ANY_WR) {
    // revoking a write cap from an unresponsive client risks data loss;
    // evict (and optionally blocklist) it instead
    CachedStackStringStream css;
    mds->evict_client(client.v, false, g_conf()->mds_session_blocklist_on_timeout, *css, nullptr);
    return;
  }

  cap->revoke();

  // client may have had a writeable range; file size may need recovery
  if (in->is_auth() && in->get_inode()->client_ranges.count(cap->get_client()))
    in->state_set(CInode::STATE_NEEDSRECOVER);

  if (in->state_test(CInode::STATE_EXPORTINGCAPS))
    return;  // cap migration in progress; let that machinery handle it

  // the revocation may unblock pending lock-state transitions
  if (!in->filelock.is_stable())
    eval_gather(&in->filelock);
  if (!in->linklock.is_stable())
    eval_gather(&in->linklock);
  if (!in->authlock.is_stable())
    eval_gather(&in->authlock);
  if (!in->xattrlock.is_stable())
    eval_gather(&in->xattrlock);

  if (in->is_auth())
    try_eval(in, CEPH_CAP_LOCKS);
  else
    request_inode_file_caps(in);
}
2503
// Invalidate and revoke all "notable" caps of a stale session.  Returns
// false (abort) if any cap is still mid-revocation of a writeable bit —
// the caller must not treat the session as quiesced in that case.
bool Locker::revoke_stale_caps(Session *session)
{
  dout(10) << "revoke_stale_caps for " << session->info.inst.name << dendl;

  // invalidate all caps
  session->inc_cap_gen();

  bool ret = true;
  std::vector<CInode*> to_eval;

  for (auto p = session->caps.begin(); !p.end(); ) {
    Capability *cap = *p;
    ++p;  // advance before touching cap; list position may change
    if (!cap->is_notable()) {
      // the rest ones are not being revoked and don't have writeable range
      // and don't want exclusive caps or want file read/write. They don't
      // need recover, they don't affect eval_gather()/try_eval()
      break;
    }

    int revoking = cap->revoking();
    if (!revoking)
      continue;

    if (revoking & CEPH_CAP_ANY_WR) {
      // can't safely pull a writeable cap; give up and report failure
      ret = false;
      break;
    }

    int issued = cap->issued();
    CInode *in = cap->get_inode();
    dout(10) << " revoking " << ccap_string(issued) << " on " << *in << dendl;
    int revoked = cap->revoke();
    if (revoked & CEPH_CAP_ANY_DIR_OPS)
      eval_lock_caches(cap);

    if (in->is_auth() &&
        in->get_inode()->client_ranges.count(cap->get_client()))
      in->state_set(CInode::STATE_NEEDSRECOVER);

    // eval lock/inode may finish contexts, which may modify other cap's position
    // in the session->caps.
    to_eval.push_back(in);
  }

  // second pass: now that iteration is done it is safe to run lock evals
  for (auto in : to_eval) {
    if (in->state_test(CInode::STATE_EXPORTINGCAPS))
      continue;

    if (!in->filelock.is_stable())
      eval_gather(&in->filelock);
    if (!in->linklock.is_stable())
      eval_gather(&in->linklock);
    if (!in->authlock.is_stable())
      eval_gather(&in->authlock);
    if (!in->xattrlock.is_stable())
      eval_gather(&in->xattrlock);

    if (in->is_auth())
      try_eval(in, CEPH_CAP_LOCKS);
    else
      request_inode_file_caps(in);
  }

  return ret;
}
2570
// A formerly-stale session came back: clear the stale flag by re-issuing
// caps for each of its inodes (or deferring to the cap-export machinery).
void Locker::resume_stale_caps(Session *session)
{
  dout(10) << "resume_stale_caps for " << session->info.inst.name << dendl;

  bool lazy = session->info.has_feature(CEPHFS_FEATURE_LAZY_CAP_WANTED);
  for (xlist<Capability*>::iterator p = session->caps.begin(); !p.end(); ) {
    Capability *cap = *p;
    ++p;  // advance first; issuing caps may reorder the list
    if (lazy && !cap->is_notable())
      break; // see revoke_stale_caps()

    CInode *in = cap->get_inode();
    ceph_assert(in->is_head());
    dout(10) << " clearing stale flag on " << *in << dendl;

    if (in->state_test(CInode::STATE_EXPORTINGCAPS)) {
      // if export succeeds, the cap will be removed. if export fails,
      // we need to re-issue the cap if it's not stale.
      in->state_set(CInode::STATE_EVALSTALECAPS);
      continue;
    }

    // eval() may already issue caps; only issue directly if it didn't run
    if (!in->is_auth() || !eval(in, CEPH_CAP_LOCKS))
      issue_caps(in, cap);
  }
}
2597
2598 void Locker::remove_stale_leases(Session *session)
2599 {
2600 dout(10) << "remove_stale_leases for " << session->info.inst.name << dendl;
2601 xlist<ClientLease*>::iterator p = session->leases.begin();
2602 while (!p.end()) {
2603 ClientLease *l = *p;
2604 ++p;
2605 CDentry *parent = static_cast<CDentry*>(l->parent);
2606 dout(15) << " removing lease on " << *parent << dendl;
2607 parent->remove_client_lease(l, this);
2608 }
2609 }
2610
2611
// Retry context: re-send our replica's caps-wanted to the auth mds once a
// wait condition (single auth / active peer) clears.  Pins the inode; the
// retry is skipped if we became auth in the meantime.
class C_MDL_RequestInodeFileCaps : public LockerContext {
  CInode *in;
public:
  C_MDL_RequestInodeFileCaps(Locker *l, CInode *i) : LockerContext(l), in(i) {
    in->get(CInode::PIN_PTRWAITER);
  }
  void finish(int r) override {
    if (!in->is_auth())
      locker->request_inode_file_caps(in);
    in->put(CInode::PIN_PTRWAITER);
  }
};
2624
// [replica] Tell the auth mds which caps our local clients want on this
// inode, but only when the wanted set changed since we last told it.
void Locker::request_inode_file_caps(CInode *in)
{
  ceph_assert(!in->is_auth());

  int wanted = in->get_caps_wanted() & in->get_caps_allowed_ever() & ~CEPH_CAP_PIN;
  if (wanted != in->replica_caps_wanted) {
    // wait for single auth
    if (in->is_ambiguous_auth()) {
      in->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH,
                     new C_MDL_RequestInodeFileCaps(this, in));
      return;
    }

    mds_rank_t auth = in->authority().first;
    // a rejoining auth isn't ready for cap messages yet; retry when active
    if (mds->is_cluster_degraded() &&
        mds->mdsmap->get_state(auth) == MDSMap::STATE_REJOIN) {
      mds->wait_for_active_peer(auth, new C_MDL_RequestInodeFileCaps(this, in));
      return;
    }

    dout(7) << "request_inode_file_caps " << ccap_string(wanted)
            << " was " << ccap_string(in->replica_caps_wanted)
            << " on " << *in << " to mds." << auth << dendl;

    // record what we told (or tried to tell) the auth
    in->replica_caps_wanted = wanted;

    if (!mds->is_cluster_degraded() ||
        mds->mdsmap->is_clientreplay_or_active_or_stopping(auth))
      mds->send_message_mds(make_message<MInodeFileCaps>(in->ino(), in->replica_caps_wanted), auth);
  }
}
2656
// [auth] Handle a replica mds telling us which caps its clients want on an
// inode we are auth for; record it and re-evaluate the cap-related locks.
void Locker::handle_inode_file_caps(const cref_t<MInodeFileCaps> &m)
{
  // nobody should be talking to us during recovery.
  if (mds->get_state() < MDSMap::STATE_CLIENTREPLAY) {
    if (mds->get_want_state() >= MDSMap::STATE_CLIENTREPLAY) {
      // we're about to get there; just requeue until replay completes
      mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
      return;
    }
    ceph_abort_msg("got unexpected message during recovery");
  }

  // ok
  CInode *in = mdcache->get_inode(m->get_ino());
  mds_rank_t from = mds_rank_t(m->get_source().num());

  ceph_assert(in);
  ceph_assert(in->is_auth());

  dout(7) << "handle_inode_file_caps replica mds." << from << " wants caps " << ccap_string(m->get_caps()) << " on " << *in << dendl;

  if (mds->logger) mds->logger->inc(l_mdss_handle_inode_file_caps);

  in->set_mds_caps_wanted(from, m->get_caps());

  try_eval(in, CEPH_CAP_LOCKS);
}
2683
2684
// Retry context for check_inode_max_size(): re-runs the check with the same
// requested max_size/size/mtime once a wait (unfreeze / lock stable)
// clears.  Pins the inode; skipped if we lost auth in the meantime.
class C_MDL_CheckMaxSize : public LockerContext {
  CInode *in;
  uint64_t new_max_size;   // requested max_size floor (0 = none)
  uint64_t newsize;        // client-reported size (0 = no size update)
  utime_t mtime;           // client-reported mtime

public:
  C_MDL_CheckMaxSize(Locker *l, CInode *i, uint64_t _new_max_size,
                     uint64_t _newsize, utime_t _mtime) :
    LockerContext(l), in(i),
    new_max_size(_new_max_size), newsize(_newsize), mtime(_mtime)
  {
    in->get(CInode::PIN_PTRWAITER);
  }
  void finish(int r) override {
    if (in->is_auth())
      locker->check_inode_max_size(in, false, new_max_size, newsize, mtime);
    in->put(CInode::PIN_PTRWAITER);
  }
};
2705
2706 uint64_t Locker::calc_new_max_size(const CInode::inode_const_ptr &pi, uint64_t size)
2707 {
2708 uint64_t new_max = (size + 1) << 1;
2709 uint64_t max_inc = g_conf()->mds_client_writeable_range_max_inc_objs;
2710 if (max_inc > 0) {
2711 max_inc *= pi->layout.object_size;
2712 new_max = std::min(new_max, size + max_inc);
2713 }
2714 return round_up_to(new_max, pi->get_layout_size_increment());
2715 }
2716
// Decide (without modifying anything) whether the inode's projected
// client_ranges need updating for the given size: a writing client is
// missing a range, has a range that is too small, or a stale range remains
// for a client that no longer writes.  Returns true if an update is needed.
bool Locker::check_client_ranges(CInode *in, uint64_t size)
{
  const auto& latest = in->get_projected_inode();
  uint64_t ms;
  if (latest->has_layout()) {
    ms = calc_new_max_size(latest, size);
  } else {
    // Layout-less directories like ~mds0/, have zero size
    ms = 0;
  }

  // walk client_ranges and cap-holding clients in parallel; both maps are
  // keyed (and ordered) by client id
  auto it = latest->client_ranges.begin();
  for (auto &p : in->client_caps) {
    if ((p.second.issued() | p.second.wanted()) & CEPH_CAP_ANY_FILE_WR) {
      if (it == latest->client_ranges.end())
        return true;   // writer with no range
      if (it->first != p.first)
        return true;   // ranges out of sync with writers
      if (ms > it->second.range.last)
        return true;   // range too small for the new max size
      ++it;
    }
  }
  // leftover ranges mean clients that stopped writing still have one
  return it != latest->client_ranges.end();
}
2742
// Rewrite the projected inode's client_ranges for the given size: grow the
// range of every client holding/wanting file write caps, drop ranges of
// clients that no longer write.  Returns true if anything changed;
// *max_increased is set when some client's max grew (caller may want to
// flush the log promptly).
bool Locker::calc_new_client_ranges(CInode *in, uint64_t size, bool *max_increased)
{
  const auto& latest = in->get_projected_inode();
  uint64_t ms;
  if (latest->has_layout()) {
    ms = calc_new_max_size(latest, size);
  } else {
    // Layout-less directories like ~mds0/, have zero size
    ms = 0;
  }

  auto pi = in->_get_projected_inode();
  bool updated = false;

  // increase ranges as appropriate.
  // shrink to 0 if no WR|BUFFER caps issued.
  // (both maps are keyed by client id, so we can merge-walk them)
  auto it = pi->client_ranges.begin();
  for (auto &p : in->client_caps) {
    if ((p.second.issued() | p.second.wanted()) & CEPH_CAP_ANY_FILE_WR) {
      // drop ranges for clients (sorted before this one) that no longer write
      while (it != pi->client_ranges.end() && it->first < p.first) {
        it = pi->client_ranges.erase(it);
        updated = true;
      }

      if (it != pi->client_ranges.end() && it->first == p.first) {
        // existing range: only ever grow it
        if (ms > it->second.range.last) {
          it->second.range.last = ms;
          updated = true;
          if (max_increased)
            *max_increased = true;
        }
      } else {
        // new writer: create a range starting at the current snap boundary
        it = pi->client_ranges.emplace_hint(it, std::piecewise_construct,
                                            std::forward_as_tuple(p.first),
                                            std::forward_as_tuple());
        it->second.range.last = ms;
        it->second.follows = in->first - 1;
        updated = true;
        if (max_increased)
          *max_increased = true;
      }
      p.second.mark_clientwriteable();
      ++it;
    } else {
      p.second.clear_clientwriteable();
    }
  }
  // anything left belongs to clients with no write caps at all
  while (it != pi->client_ranges.end()) {
    it = pi->client_ranges.erase(it);
    updated = true;
  }
  if (updated) {
    if (pi->client_ranges.empty())
      in->clear_clientwriteable();
    else
      in->mark_clientwriteable();
  }
  return updated;
}
2802
// [auth] Check whether a file's size/mtime and per-client writeable ranges
// need updating and, if so, journal the change (EOpen for open files,
// EUpdate otherwise).  new_size/new_mtime come from a client report (0 =
// none); new_max_size is a floor for the range computation.  Returns true
// if an update was started, false if nothing to do or we must wait.
bool Locker::check_inode_max_size(CInode *in, bool force_wrlock,
                                  uint64_t new_max_size, uint64_t new_size,
                                  utime_t new_mtime)
{
  ceph_assert(in->is_auth());
  ceph_assert(in->is_file());

  const auto& latest = in->get_projected_inode();
  uint64_t size = latest->size;
  bool update_size = new_size > 0;

  if (update_size) {
    // sizes/mtimes only ever move forward
    new_size = size = std::max(size, new_size);
    new_mtime = std::max(new_mtime, latest->mtime);
    if (latest->size == new_size && latest->mtime == new_mtime)
      update_size = false;
  }

  bool new_ranges = check_client_ranges(in, std::max(new_max_size, size));
  if (!update_size && !new_ranges) {
    dout(20) << "check_inode_max_size no-op on " << *in << dendl;
    return false;
  }

  dout(10) << "check_inode_max_size new_ranges " << new_ranges
           << " update_size " << update_size
           << " on " << *in << dendl;

  if (in->is_frozen()) {
    dout(10) << "check_inode_max_size frozen, waiting on " << *in << dendl;
    in->add_waiter(CInode::WAIT_UNFREEZE,
                   new C_MDL_CheckMaxSize(this, in, new_max_size, new_size, new_mtime));
    return false;
  } else if (!force_wrlock && !in->filelock.can_wrlock(in->get_loner())) {
    // lock?  try to move the filelock to a wrlockable state first
    if (in->filelock.is_stable()) {
      if (in->get_target_loner() >= 0)
        file_excl(&in->filelock);
      else
        simple_lock(&in->filelock);
    }
    if (!in->filelock.can_wrlock(in->get_loner())) {
      dout(10) << "check_inode_max_size can't wrlock, waiting on " << *in << dendl;
      in->filelock.add_waiter(SimpleLock::WAIT_STABLE,
                              new C_MDL_CheckMaxSize(this, in, new_max_size, new_size, new_mtime));
      return false;
    }
  }

  MutationRef mut(new MutationImpl());
  mut->ls = mds->mdlog->get_current_segment();

  auto pi = in->project_inode(mut);
  pi.inode->version = in->pre_dirty();

  bool max_increased = false;
  if (new_ranges &&
      calc_new_client_ranges(in, std::max(new_max_size, size), &max_increased)) {
    dout(10) << "check_inode_max_size client_ranges "
             << in->get_previous_projected_inode()->client_ranges
             << " -> " << pi.inode->client_ranges << dendl;
  }

  if (update_size) {
    dout(10) << "check_inode_max_size size " << pi.inode->size << " -> " << new_size << dendl;
    pi.inode->size = new_size;
    pi.inode->rstat.rbytes = new_size;
    dout(10) << "check_inode_max_size mtime " << pi.inode->mtime << " -> " << new_mtime << dendl;
    pi.inode->mtime = new_mtime;
    if (new_mtime > pi.inode->ctime) {
      pi.inode->ctime = new_mtime;
      if (new_mtime > pi.inode->rstat.rctime)
        pi.inode->rstat.rctime = new_mtime;
    }
  }

  // use EOpen if the file is still open; otherwise, use EUpdate.
  // this is just an optimization to push open files forward into
  // newer log segments.
  LogEvent *le;
  EMetaBlob *metablob;
  if (in->is_any_caps_wanted() && in->last == CEPH_NOSNAP) {
    EOpen *eo = new EOpen(mds->mdlog);
    eo->add_ino(in->ino());
    metablob = &eo->metablob;
    le = eo;
  } else {
    EUpdate *eu = new EUpdate(mds->mdlog, "check_inode_max_size");
    metablob = &eu->metablob;
    le = eu;
  }
  mds->mdlog->start_entry(le);

  mdcache->predirty_journal_parents(mut, metablob, in, 0, PREDIRTY_PRIMARY);
  // no cow, here!
  CDentry *parent = in->get_projected_parent_dn();
  metablob->add_primary_dentry(parent, in, true);
  mdcache->journal_dirty_inode(mut.get(), metablob, in);

  mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut,
                           UPDATE_SHAREMAX, ref_t<MClientCaps>()));
  wrlock_force(&in->filelock, mut);  // wrlock for duration of journal
  mut->auth_pin(in);

  // make max_size _increase_ timely
  if (max_increased)
    mds->mdlog->flush();

  return true;
}
2913
2914
// Broadcast the inode's (new) max_size via a cap GRANT to clients currently
// holding WR|BUFFER caps; others will pick it up whenever they next get the
// cap.  With only_cap, restrict to that one client.
void Locker::share_inode_max_size(CInode *in, Capability *only_cap)
{
  /*
   * only share if currently issued a WR cap.  if client doesn't have it,
   * file_max doesn't matter, and the client will get it if/when they get
   * the cap later.
   */
  dout(10) << "share_inode_max_size on " << *in << dendl;
  map<client_t, Capability>::iterator it;
  if (only_cap)
    it = in->client_caps.find(only_cap->get_client());
  else
    it = in->client_caps.begin();
  for (; it != in->client_caps.end(); ++it) {
    const client_t client = it->first;
    Capability *cap = &it->second;
    if (cap->is_suppress())
      continue;  // cap messages for this client are being bundled elsewhere
    if (cap->pending() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER)) {
      dout(10) << "share_inode_max_size with client." << client << dendl;
      if (mds->logger) mds->logger->inc(l_mdss_ceph_cap_op_grant);
      cap->inc_last_seq();
      auto m = make_message<MClientCaps>(CEPH_CAP_OP_GRANT,
                                         in->ino(),
                                         in->find_snaprealm()->inode->ino(),
                                         cap->get_cap_id(),
                                         cap->get_last_seq(),
                                         cap->pending(),
                                         cap->wanted(), 0,
                                         cap->get_mseq(),
                                         mds->get_osd_epoch_barrier());
      in->encode_cap_message(m, cap);
      mds->send_message_client_counted(m, client);
    }
    if (only_cap)
      break;
  }
}
2953
2954 bool Locker::_need_flush_mdlog(CInode *in, int wanted, bool lock_state_any)
2955 {
2956 /* flush log if caps are wanted by client but corresponding lock is unstable and locked by
2957 * pending mutations. */
2958 if (((wanted & (CEPH_CAP_FILE_RD|CEPH_CAP_FILE_WR|CEPH_CAP_FILE_SHARED|CEPH_CAP_FILE_EXCL)) &&
2959 (lock_state_any ? in->filelock.is_locked() : in->filelock.is_unstable_and_locked())) ||
2960 ((wanted & (CEPH_CAP_AUTH_SHARED|CEPH_CAP_AUTH_EXCL)) &&
2961 (lock_state_any ? in->authlock.is_locked() : in->authlock.is_unstable_and_locked())) ||
2962 ((wanted & (CEPH_CAP_LINK_SHARED|CEPH_CAP_LINK_EXCL)) &&
2963 (lock_state_any ? in->linklock.is_locked() : in->linklock.is_unstable_and_locked())) ||
2964 ((wanted & (CEPH_CAP_XATTR_SHARED|CEPH_CAP_XATTR_EXCL)) &&
2965 (lock_state_any ? in->xattrlock.is_locked() : in->xattrlock.is_unstable_and_locked())))
2966 return true;
2967 return false;
2968 }
2969
// Update a cap's wanted mask from a client message.  The issue_seq guards
// against races: on a mismatch, we only ever ADD wanted bits, never drop
// them.  Afterwards, kick recovery/journaling as the new wants require.
void Locker::adjust_cap_wanted(Capability *cap, int wanted, int issue_seq)
{
  if (ceph_seq_cmp(issue_seq, cap->get_last_issue()) == 0) {
    // client saw our latest issue; take its wanted mask verbatim
    dout(10) << " wanted " << ccap_string(cap->wanted())
             << " -> " << ccap_string(wanted) << dendl;
    cap->set_wanted(wanted);
  } else if (wanted & ~cap->wanted()) {
    dout(10) << " wanted " << ccap_string(cap->wanted())
             << " -> " << ccap_string(wanted)
             << " (added caps even though we had seq mismatch!)" << dendl;
    cap->set_wanted(wanted | cap->wanted());
  } else {
    // stale message and nothing new wanted; ignore
    dout(10) << " NOT changing wanted " << ccap_string(cap->wanted())
             << " -> " << ccap_string(wanted)
             << " (issue_seq " << issue_seq << " != last_issue "
             << cap->get_last_issue() << ")" << dendl;
    return;
  }

  CInode *cur = cap->get_inode();
  if (!cur->is_auth()) {
    // replicas just forward the aggregate wants to the auth
    request_inode_file_caps(cur);
    return;
  }

  if (cap->wanted()) {
    if (cur->state_test(CInode::STATE_RECOVERING) &&
        (cap->wanted() & (CEPH_CAP_FILE_RD |
                          CEPH_CAP_FILE_WR))) {
      // a client is waiting on file data; bump recovery priority
      mds->mdcache->recovery_queue.prioritize(cur);
    }

    if (mdcache->open_file_table.should_log_open(cur)) {
      ceph_assert(cur->last == CEPH_NOSNAP);
      EOpen *le = new EOpen(mds->mdlog);
      mds->mdlog->start_entry(le);
      le->add_clean_inode(cur);
      mds->mdlog->submit_entry(le);
    }
  }
}
3011
3012 void Locker::snapflush_nudge(CInode *in)
3013 {
3014 ceph_assert(in->last != CEPH_NOSNAP);
3015 if (in->client_snap_caps.empty())
3016 return;
3017
3018 CInode *head = mdcache->get_inode(in->ino());
3019 // head inode gets unpinned when snapflush starts. It might get trimmed
3020 // before snapflush finishes.
3021 if (!head)
3022 return;
3023
3024 ceph_assert(head->is_auth());
3025 if (head->client_need_snapflush.empty())
3026 return;
3027
3028 SimpleLock *hlock = head->get_lock(CEPH_LOCK_IFILE);
3029 if (hlock->get_state() == LOCK_SYNC || !hlock->is_stable()) {
3030 hlock = NULL;
3031 for (int i = 0; i < num_cinode_locks; i++) {
3032 SimpleLock *lock = head->get_lock(cinode_lock_info[i].lock);
3033 if (lock->get_state() != LOCK_SYNC && lock->is_stable()) {
3034 hlock = lock;
3035 break;
3036 }
3037 }
3038 }
3039 if (hlock) {
3040 _rdlock_kick(hlock, true);
3041 } else {
3042 // also, requeue, in case of unstable lock
3043 need_snapflush_inodes.push_back(&in->item_caps);
3044 }
3045 }
3046
3047 void Locker::mark_need_snapflush_inode(CInode *in)
3048 {
3049 ceph_assert(in->last != CEPH_NOSNAP);
3050 if (!in->item_caps.is_on_list()) {
3051 need_snapflush_inodes.push_back(&in->item_caps);
3052 utime_t now = ceph_clock_now();
3053 in->last_dirstat_prop = now;
3054 dout(10) << "mark_need_snapflush_inode " << *in << " - added at " << now << dendl;
3055 }
3056 }
3057
3058 bool Locker::is_revoking_any_caps_from(client_t client)
3059 {
3060 auto it = revoking_caps_by_client.find(client);
3061 if (it == revoking_caps_by_client.end())
3062 return false;
3063 return !it->second.empty();
3064 }
3065
/*
 * Perform zero-data ("null") snapflushes on behalf of @client for every
 * pending snap older than @last.  Used when we can infer the client has no
 * dirty data for those snaps (a real FLUSHSNAP always arrives first).
 */
void Locker::_do_null_snapflush(CInode *head_in, client_t client, snapid_t last)
{
  dout(10) << "_do_null_snapflush client." << client << " on " << *head_in << dendl;
  for (auto p = head_in->client_need_snapflush.begin();
       p != head_in->client_need_snapflush.end() && p->first < last; ) {
    snapid_t snapid = p->first;
    auto &clients = p->second;
    ++p; // be careful: remove_need_snapflush below may erase this entry

    if (clients.count(client)) {
      dout(10) << " doing async NULL snapflush on " << snapid << " from client." << client << dendl;
      // find the snapped inode covering this snapid
      CInode *sin = mdcache->pick_inode_snap(head_in, snapid - 1);
      ceph_assert(sin);
      ceph_assert(sin->first <= snapid);
      // dirty=0, no client message, no ack: just journal the (empty) update
      _do_snap_update(sin, snapid, 0, sin->first - 1, client, ref_t<MClientCaps>(), ref_t<MClientCaps>());
      head_in->remove_need_snapflush(sin, snapid, client);
    }
  }
}
3085
3086
3087 bool Locker::should_defer_client_cap_frozen(CInode *in)
3088 {
3089 if (in->is_frozen())
3090 return true;
3091
3092 /*
3093 * This policy needs to be AT LEAST as permissive as allowing a client
3094 * request to go forward, or else a client request can release something,
3095 * the release gets deferred, but the request gets processed and deadlocks
3096 * because when the caps can't get revoked.
3097 *
3098 * No auth_pin implies that there is no unstable lock and @in is not auth
3099 * pinnned by client request. If parent dirfrag is auth pinned by a lock
3100 * cache, later request from lock cache owner may forcibly auth pin the @in.
3101 */
3102 if (in->is_freezing() && in->get_num_auth_pins() == 0) {
3103 CDir* dir = in->get_parent_dir();
3104 if (!dir || !dir->is_auth_pinned_by_lock_cache())
3105 return true;
3106 }
3107 return false;
3108 }
3109
/*
 * Main entry point for MClientCaps messages from clients: cap updates,
 * releases, flushes and snapflushes.
 *
 * Phase ordering matters:
 *  - while not active/clientreplay, stash reconnect-time dirty caps and
 *    wait for replay;
 *  - a flush tid the session already completed is acked immediately (dup);
 *  - FLUSHSNAP is handled separately and journals via _do_snap_update();
 *  - otherwise confirm receipt, adjust wanted, and journal dirty metadata
 *    via _do_cap_update().
 * The 'out:' label centralizes the auth_unpin taken before null snapflushes.
 */
void Locker::handle_client_caps(const cref_t<MClientCaps> &m)
{
  client_t client = m->get_source().num();
  snapid_t follows = m->get_snap_follows();
  auto op = m->get_op();
  auto dirty = m->get_dirty();
  dout(7) << "handle_client_caps "
          << " on " << m->get_ino()
          << " tid " << m->get_client_tid() << " follows " << follows
          << " op " << ceph_cap_op_name(op)
          << " flags 0x" << std::hex << m->flags << std::dec << dendl;

  Session *session = mds->get_session(m);
  if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) {
    // not ready to process caps yet; drop, stash, or defer as appropriate
    if (!session) {
      dout(5) << " no session, dropping " << *m << dendl;
      return;
    }
    if (session->is_closed() ||
        session->is_closing() ||
        session->is_killing()) {
      dout(7) << " session closed|closing|killing, dropping " << *m << dendl;
      return;
    }
    if ((mds->is_reconnect() || mds->get_want_state() == MDSMap::STATE_RECONNECT) &&
        dirty && m->get_client_tid() > 0 &&
        !session->have_completed_flush(m->get_client_tid())) {
      // remember dirty caps seen during reconnect so replay can honor them
      mdcache->set_reconnected_dirty_caps(client, m->get_ino(), dirty,
                                          op == CEPH_CAP_OP_FLUSHSNAP);
    }
    mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
    return;
  }

  if (mds->logger) mds->logger->inc(l_mdss_handle_client_caps);
  if (dirty) {
    if (mds->logger) mds->logger->inc(l_mdss_handle_client_caps_dirty);
  }

  // duplicate flush (client resent)?  ack immediately from session record.
  if (m->get_client_tid() > 0 && session &&
      session->have_completed_flush(m->get_client_tid())) {
    dout(7) << "handle_client_caps already flushed tid " << m->get_client_tid()
            << " for client." << client << dendl;
    ref_t<MClientCaps> ack;
    if (op == CEPH_CAP_OP_FLUSHSNAP) {
      if (mds->logger) mds->logger->inc(l_mdss_ceph_cap_op_flushsnap_ack);
      ack = make_message<MClientCaps>(CEPH_CAP_OP_FLUSHSNAP_ACK, m->get_ino(), 0, 0, 0, 0, 0, dirty, 0, mds->get_osd_epoch_barrier());
    } else {
      if (mds->logger) mds->logger->inc(l_mdss_ceph_cap_op_flush_ack);
      ack = make_message<MClientCaps>(CEPH_CAP_OP_FLUSH_ACK, m->get_ino(), 0, m->get_cap_id(), m->get_seq(), m->get_caps(), 0, dirty, 0, mds->get_osd_epoch_barrier());
    }
    ack->set_snap_follows(follows);
    ack->set_client_tid(m->get_client_tid());
    mds->send_message_client_counted(ack, m->get_connection());
    if (op == CEPH_CAP_OP_FLUSHSNAP) {
      return;
    } else {
      // fall-thru because the message may release some caps
      dirty = false;
      op = CEPH_CAP_OP_UPDATE;
    }
  }

  // "oldest flush tid" > 0 means client uses unique TID for each flush
  if (m->get_oldest_flush_tid() > 0 && session) {
    if (session->trim_completed_flushes(m->get_oldest_flush_tid())) {
      mds->mdlog->get_current_segment()->touched_sessions.insert(session->info.inst.name);

      if (session->get_num_trim_flushes_warnings() > 0 &&
          session->get_num_completed_flushes() * 2 < g_conf()->mds_max_completed_flushes)
        session->reset_num_trim_flushes_warnings();
    } else {
      // client is not advancing oldest_flush_tid; warn with backoff
      if (session->get_num_completed_flushes() >=
          (g_conf()->mds_max_completed_flushes << session->get_num_trim_flushes_warnings())) {
        session->inc_num_trim_flushes_warnings();
        CachedStackStringStream css;
        *css << "client." << session->get_client() << " does not advance its oldest_flush_tid ("
             << m->get_oldest_flush_tid() << "), "
             << session->get_num_completed_flushes()
             << " completed flushes recorded in session";
        mds->clog->warn() << css->strv();
        dout(20) << __func__ << " " << css->strv() << dendl;
      }
    }
  }

  CInode *head_in = mdcache->get_inode(m->get_ino());
  if (!head_in) {
    if (mds->is_clientreplay()) {
      dout(7) << "handle_client_caps on unknown ino " << m->get_ino()
              << ", will try again after replayed client requests" << dendl;
      mdcache->wait_replay_cap_reconnect(m->get_ino(), new C_MDS_RetryMessage(mds, m));
      return;
    }

    /*
     * "handle_client_caps on unknown ino xxx" is normal after migrating a subtree
     * Sequence of events that cause this are:
     *   - client sends caps message to mds.a
     *   - mds finishes subtree migration, send cap export to client
     *   - mds trim its cache
     *   - mds receives cap messages from client
     */
    dout(7) << "handle_client_caps on unknown ino " << m->get_ino() << ", dropping" << dendl;
    return;
  }

  if (m->osd_epoch_barrier && !mds->objecter->have_map(m->osd_epoch_barrier)) {
    // Pause RADOS operations until we see the required epoch
    mds->objecter->set_epoch_barrier(m->osd_epoch_barrier);
  }

  if (mds->get_osd_epoch_barrier() < m->osd_epoch_barrier) {
    // Record the barrier so that we will retransmit it to clients
    mds->set_osd_epoch_barrier(m->osd_epoch_barrier);
  }

  dout(10) << " head inode " << *head_in << dendl;

  Capability *cap = 0;
  cap = head_in->get_client_cap(client);
  if (!cap) {
    dout(7) << "handle_client_caps no cap for client." << client << " on " << *head_in << dendl;
    return;
  }
  ceph_assert(cap);

  // freezing|frozen?
  if (should_defer_client_cap_frozen(head_in)) {
    dout(7) << "handle_client_caps freezing|frozen on " << *head_in << dendl;
    head_in->add_waiter(CInode::WAIT_UNFREEZE, new C_MDS_RetryMessage(mds, m));
    return;
  }
  // stale message from before a cap migration?
  if (ceph_seq_cmp(m->get_mseq(), cap->get_mseq()) < 0) {
    dout(7) << "handle_client_caps mseq " << m->get_mseq() << " < " << cap->get_mseq()
            << ", dropping" << dendl;
    return;
  }

  bool need_unpin = false;

  // flushsnap?
  if (op == CEPH_CAP_OP_FLUSHSNAP) {
    if (!head_in->is_auth()) {
      dout(7) << " not auth, ignoring flushsnap on " << *head_in << dendl;
      goto out;
    }

    SnapRealm *realm = head_in->find_snaprealm();
    snapid_t snap = realm->get_snap_following(follows);
    dout(10) << " flushsnap follows " << follows << " -> snap " << snap << dendl;

    // older snaps this client still owes flushes for are implicitly clean
    auto p = head_in->client_need_snapflush.begin();
    if (p != head_in->client_need_snapflush.end() && p->first < snap) {
      head_in->auth_pin(this); // prevent subtree frozen
      need_unpin = true;
      _do_null_snapflush(head_in, client, snap);
    }

    CInode *in = head_in;
    if (snap != CEPH_NOSNAP) {
      in = mdcache->pick_inode_snap(head_in, snap - 1);
      if (in != head_in)
        dout(10) << " snapped inode " << *in << dendl;
    }

    // we can prepare the ack now, since this FLUSHEDSNAP is independent of any
    // other cap ops. (except possibly duplicate FLUSHSNAP requests, but worst
    // case we get a dup response, so whatever.)
    ref_t<MClientCaps> ack;
    if (dirty) {
      ack = make_message<MClientCaps>(CEPH_CAP_OP_FLUSHSNAP_ACK, in->ino(), 0, 0, 0, 0, 0, dirty, 0, mds->get_osd_epoch_barrier());
      ack->set_snap_follows(follows);
      ack->set_client_tid(m->get_client_tid());
      ack->set_oldest_flush_tid(m->get_oldest_flush_tid());
    }

    if (in == head_in ||
        (head_in->client_need_snapflush.count(snap) &&
         head_in->client_need_snapflush[snap].count(client))) {
      dout(7) << " flushsnap snap " << snap
              << " client." << client << " on " << *in << dendl;

      // this cap now follows a later snap (i.e. the one initiating this flush, or later)
      if (in == head_in)
        cap->client_follows = snap < CEPH_NOSNAP ? snap : realm->get_newest_seq();

      _do_snap_update(in, snap, dirty, follows, client, m, ack);

      if (in != head_in)
        head_in->remove_need_snapflush(in, snap, client);
    } else {
      dout(7) << " not expecting flushsnap " << snap << " from client." << client << " on " << *in << dendl;
      if (ack) {
        if (mds->logger) mds->logger->inc(l_mdss_ceph_cap_op_flushsnap_ack);
        mds->send_message_client_counted(ack, m->get_connection());
      }
    }
    goto out;
  }

  if (cap->get_cap_id() != m->get_cap_id()) {
    dout(7) << " ignoring client capid " << m->get_cap_id() << " != my " << cap->get_cap_id() << dendl;
  } else {
    CInode *in = head_in;
    if (follows > 0) {
      in = mdcache->pick_inode_snap(head_in, follows);
      // intermediate snap inodes
      while (in != head_in) {
        ceph_assert(in->last != CEPH_NOSNAP);
        if (in->is_auth() && dirty) {
          dout(10) << " updating intermediate snapped inode " << *in << dendl;
          _do_cap_update(in, NULL, dirty, follows, m, ref_t<MClientCaps>());
        }
        in = mdcache->pick_inode_snap(head_in, in->last);
      }
    }

    // head inode, and cap
    ref_t<MClientCaps> ack;

    int caps = m->get_caps();
    if (caps & ~cap->issued()) {
      dout(10) << " confirming not issued caps " << ccap_string(caps & ~cap->issued()) << dendl;
      caps &= cap->issued();
    }

    int revoked = cap->confirm_receipt(m->get_seq(), caps);
    dout(10) << " follows " << follows
             << " retains " << ccap_string(m->get_caps())
             << " dirty " << ccap_string(dirty)
             << " on " << *in << dendl;

    if (revoked & CEPH_CAP_ANY_DIR_OPS)
      eval_lock_caches(cap);

    // missing/skipped snapflush?
    // The client MAY send a snapflush if it is issued WR/EXCL caps, but
    // presently only does so when it has actual dirty metadata. But, we
    // set up the need_snapflush stuff based on the issued caps.
    // We can infer that the client WONT send a FLUSHSNAP once they have
    // released all WR/EXCL caps (the FLUSHSNAP always comes before the cap
    // update/release).
    if (!head_in->client_need_snapflush.empty()) {
      if (!(cap->issued() & CEPH_CAP_ANY_FILE_WR) &&
          !(m->flags & MClientCaps::FLAG_PENDING_CAPSNAP)) {
        head_in->auth_pin(this); // prevent subtree frozen
        need_unpin = true;
        _do_null_snapflush(head_in, client);
      } else {
        dout(10) << " revocation in progress, not making any conclusions about null snapflushes" << dendl;
      }
    }
    if (cap->need_snapflush() && !(m->flags & MClientCaps::FLAG_PENDING_CAPSNAP))
      cap->clear_needsnapflush();

    if (dirty && in->is_auth()) {
      dout(7) << " flush client." << client << " dirty " << ccap_string(dirty)
              << " seq " << m->get_seq() << " on " << *in << dendl;
      ack = make_message<MClientCaps>(CEPH_CAP_OP_FLUSH_ACK, in->ino(), 0, cap->get_cap_id(), m->get_seq(),
                                      m->get_caps(), 0, dirty, 0, mds->get_osd_epoch_barrier());
      ack->set_client_tid(m->get_client_tid());
      ack->set_oldest_flush_tid(m->get_oldest_flush_tid());
    }

    // filter wanted based on what we could ever give out (given auth/replica status)
    bool need_flush = m->flags & MClientCaps::FLAG_SYNC;
    int new_wanted = m->get_wanted();
    if (new_wanted != cap->wanted()) {
      if (!need_flush && in->is_auth() && (new_wanted & ~cap->pending())) {
        // expanding caps. make sure we aren't waiting for a log flush
        need_flush = _need_flush_mdlog(head_in, new_wanted & ~cap->pending());
      }

      adjust_cap_wanted(cap, new_wanted, m->get_issue_seq());
    }

    if (in->is_auth() &&
        _do_cap_update(in, cap, dirty, follows, m, ack, &need_flush)) {
      // updated
      eval(in, CEPH_CAP_LOCKS);

      if (!need_flush && (cap->wanted() & ~cap->pending()))
        need_flush = _need_flush_mdlog(in, cap->wanted() & ~cap->pending());
    } else {
      // no update, ack now.
      if (ack) {
        if (mds->logger) mds->logger->inc(l_mdss_ceph_cap_op_flush_ack);
        mds->send_message_client_counted(ack, m->get_connection());
      }

      bool did_issue = eval(in, CEPH_CAP_LOCKS);
      if (!did_issue && (cap->wanted() & ~cap->pending()))
        issue_caps(in, cap);

      if (cap->get_last_seq() == 0 &&
          (cap->pending() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER))) {
        share_inode_max_size(in, cap);
      }
    }

    if (need_flush)
      mds->mdlog->flush();
  }

 out:
  if (need_unpin)
    head_in->auth_unpin(this);
}
3419
3420
3421 class C_Locker_RetryRequestCapRelease : public LockerContext {
3422 client_t client;
3423 ceph_mds_request_release item;
3424 public:
3425 C_Locker_RetryRequestCapRelease(Locker *l, client_t c, const ceph_mds_request_release& it) :
3426 LockerContext(l), client(c), item(it) { }
3427 void finish(int r) override {
3428 string dname;
3429 MDRequestRef null_ref;
3430 locker->process_request_cap_release(null_ref, client, item, dname);
3431 }
3432 };
3433
/*
 * Apply a cap/lease release embedded in a client request (or replayed via
 * C_Locker_RetryRequestCapRelease, in which case mdr is null).  Drops the
 * dentry lease named by @dname (if any), confirms receipt of the released
 * caps, and re-evaluates lock state.  Releases carrying a stale mseq or
 * cap_id are ignored.
 */
void Locker::process_request_cap_release(MDRequestRef& mdr, client_t client, const ceph_mds_request_release& item,
					 std::string_view dname)
{
  inodeno_t ino = (uint64_t)item.ino;
  uint64_t cap_id = item.cap_id;
  int caps = item.caps;
  int wanted = item.wanted;
  int seq = item.seq;
  int issue_seq = item.issue_seq;
  int mseq = item.mseq;

  CInode *in = mdcache->get_inode(ino);
  if (!in)
    return;

  // release the dentry lease, if one was named
  if (dname.length()) {
    frag_t fg = in->pick_dirfrag(dname);
    CDir *dir = in->get_dirfrag(fg);
    if (dir) {
      CDentry *dn = dir->lookup(dname);
      if (dn) {
	ClientLease *l = dn->get_client_lease(client);
	if (l) {
	  dout(10) << __func__ << " removing lease on " << *dn << dendl;
	  dn->remove_client_lease(l, this);
	} else {
	  dout(7) << __func__ << " client." << client
		  << " doesn't have lease on " << *dn << dendl;
	}
      } else {
	dout(7) << __func__ << " client." << client << " released lease on dn "
		<< dir->dirfrag() << "/" << dname << " which dne" << dendl;
      }
    }
  }

  Capability *cap = in->get_client_cap(client);
  if (!cap)
    return;

  dout(10) << __func__ << " client." << client << " " << ccap_string(caps) << " on " << *in
	   << (mdr ? "" : " (DEFERRED, no mdr)")
	   << dendl;

  // ignore releases that predate a cap migration
  if (ceph_seq_cmp(mseq, cap->get_mseq()) < 0) {
    dout(7) << " mseq " << mseq << " < " << cap->get_mseq() << ", dropping" << dendl;
    return;
  }

  // ignore releases referring to a different (older) cap instance
  if (cap->get_cap_id() != cap_id) {
    dout(7) << " cap_id " << cap_id << " != " << cap->get_cap_id() << ", dropping" << dendl;
    return;
  }

  if (should_defer_client_cap_frozen(in)) {
    dout(7) << " frozen, deferring" << dendl;
    in->add_waiter(CInode::WAIT_UNFREEZE, new C_Locker_RetryRequestCapRelease(this, client, item));
    return;
  }

  if (mds->logger) mds->logger->inc(l_mdss_process_request_cap_release);

  if (caps & ~cap->issued()) {
    dout(10) << " confirming not issued caps " << ccap_string(caps & ~cap->issued()) << dendl;
    caps &= cap->issued();
  }
  int revoked = cap->confirm_receipt(seq, caps);
  if (revoked & CEPH_CAP_ANY_DIR_OPS)
    eval_lock_caches(cap);

  // once the client holds no FILE_WR caps it cannot owe real snapflushes
  if (!in->client_need_snapflush.empty() &&
      (cap->issued() & CEPH_CAP_ANY_FILE_WR) == 0) {
    _do_null_snapflush(in, client);
  }

  adjust_cap_wanted(cap, wanted, issue_seq);

  // suppress cap re-issue while the originating request is still in flight
  if (mdr)
    cap->inc_suppress();
  eval(in, CEPH_CAP_LOCKS);
  if (mdr)
    cap->dec_suppress();

  // take note; we may need to reissue on this cap later
  if (mdr)
    mdr->cap_releases[in->vino()] = cap->get_last_seq();
}
3521
3522 class C_Locker_RetryKickIssueCaps : public LockerContext {
3523 CInode *in;
3524 client_t client;
3525 ceph_seq_t seq;
3526 public:
3527 C_Locker_RetryKickIssueCaps(Locker *l, CInode *i, client_t c, ceph_seq_t s) :
3528 LockerContext(l), in(i), client(c), seq(s) {
3529 in->get(CInode::PIN_PTRWAITER);
3530 }
3531 void finish(int r) override {
3532 locker->kick_issue_caps(in, client, seq);
3533 in->put(CInode::PIN_PTRWAITER);
3534 }
3535 };
3536
3537 void Locker::kick_issue_caps(CInode *in, client_t client, ceph_seq_t seq)
3538 {
3539 Capability *cap = in->get_client_cap(client);
3540 if (!cap || cap->get_last_seq() != seq)
3541 return;
3542 if (in->is_frozen()) {
3543 dout(10) << "kick_issue_caps waiting for unfreeze on " << *in << dendl;
3544 in->add_waiter(CInode::WAIT_UNFREEZE,
3545 new C_Locker_RetryKickIssueCaps(this, in, client, seq));
3546 return;
3547 }
3548 dout(10) << "kick_issue_caps released at current seq " << seq
3549 << ", reissuing" << dendl;
3550 issue_caps(in, cap);
3551 }
3552
3553 void Locker::kick_cap_releases(MDRequestRef& mdr)
3554 {
3555 client_t client = mdr->get_client();
3556 for (map<vinodeno_t,ceph_seq_t>::iterator p = mdr->cap_releases.begin();
3557 p != mdr->cap_releases.end();
3558 ++p) {
3559 CInode *in = mdcache->get_inode(p->first);
3560 if (!in)
3561 continue;
3562 kick_issue_caps(in, client, p->second);
3563 }
3564 }
3565
3566 /**
3567 * m and ack might be NULL, so don't dereference them unless dirty != 0
3568 */
3569 void Locker::_do_snap_update(CInode *in, snapid_t snap, int dirty, snapid_t follows, client_t client, const cref_t<MClientCaps> &m, const ref_t<MClientCaps> &ack)
3570 {
3571 dout(10) << "_do_snap_update dirty " << ccap_string(dirty)
3572 << " follows " << follows << " snap " << snap
3573 << " on " << *in << dendl;
3574
3575 if (snap == CEPH_NOSNAP) {
3576 // hmm, i guess snap was already deleted? just ack!
3577 dout(10) << " wow, the snap following " << follows
3578 << " was already deleted. nothing to record, just ack." << dendl;
3579 if (ack) {
3580 if (ack->get_op() == CEPH_CAP_OP_FLUSHSNAP_ACK) {
3581 if (mds->logger) mds->logger->inc(l_mdss_ceph_cap_op_flushsnap_ack);
3582 }
3583 mds->send_message_client_counted(ack, m->get_connection());
3584 }
3585 return;
3586 }
3587
3588 EUpdate *le = new EUpdate(mds->mdlog, "snap flush");
3589 mds->mdlog->start_entry(le);
3590 MutationRef mut = new MutationImpl();
3591 mut->ls = mds->mdlog->get_current_segment();
3592
3593 // normal metadata updates that we can apply to the head as well.
3594
3595 // update xattrs?
3596 CInode::mempool_xattr_map *px = nullptr;
3597 bool xattrs = (dirty & CEPH_CAP_XATTR_EXCL) &&
3598 m->xattrbl.length() &&
3599 m->head.xattr_version > in->get_projected_inode()->xattr_version;
3600
3601 CInode::mempool_old_inode *oi = nullptr;
3602 CInode::old_inode_map_ptr _old_inodes;
3603 if (in->is_any_old_inodes()) {
3604 auto last = in->pick_old_inode(snap);
3605 if (last) {
3606 _old_inodes = CInode::allocate_old_inode_map(*in->get_old_inodes());
3607 oi = &_old_inodes->at(last);
3608 if (snap > oi->first) {
3609 (*_old_inodes)[snap - 1] = *oi;;
3610 oi->first = snap;
3611 }
3612 }
3613 }
3614
3615 CInode::mempool_inode *i;
3616 if (oi) {
3617 dout(10) << " writing into old inode" << dendl;
3618 auto pi = in->project_inode(mut);
3619 pi.inode->version = in->pre_dirty();
3620 i = &oi->inode;
3621 if (xattrs)
3622 px = &oi->xattrs;
3623 } else {
3624 auto pi = in->project_inode(mut, xattrs);
3625 pi.inode->version = in->pre_dirty();
3626 i = pi.inode.get();
3627 if (xattrs)
3628 px = pi.xattrs.get();
3629 }
3630
3631 _update_cap_fields(in, dirty, m, i);
3632
3633 // xattr
3634 if (xattrs) {
3635 dout(7) << " xattrs v" << i->xattr_version << " -> " << m->head.xattr_version
3636 << " len " << m->xattrbl.length() << dendl;
3637 i->xattr_version = m->head.xattr_version;
3638 auto p = m->xattrbl.cbegin();
3639 decode(*px, p);
3640 }
3641
3642 {
3643 auto it = i->client_ranges.find(client);
3644 if (it != i->client_ranges.end()) {
3645 if (in->last == snap) {
3646 dout(10) << " removing client_range entirely" << dendl;
3647 i->client_ranges.erase(it);
3648 } else {
3649 dout(10) << " client_range now follows " << snap << dendl;
3650 it->second.follows = snap;
3651 }
3652 }
3653 }
3654
3655 if (_old_inodes)
3656 in->reset_old_inodes(std::move(_old_inodes));
3657
3658 mut->auth_pin(in);
3659 mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows);
3660 mdcache->journal_dirty_inode(mut.get(), &le->metablob, in, follows);
3661
3662 // "oldest flush tid" > 0 means client uses unique TID for each flush
3663 if (ack && ack->get_oldest_flush_tid() > 0)
3664 le->metablob.add_client_flush(metareqid_t(m->get_source(), ack->get_client_tid()),
3665 ack->get_oldest_flush_tid());
3666
3667 mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut, UPDATE_SNAPFLUSH,
3668 ack, client));
3669 }
3670
/*
 * Merge client-flushed metadata from cap message @m into projected inode
 * @pi, gated by which cap bits are @dirty:
 *  - ctime/change_attr only ever move forward;
 *  - FILE_WR lets mtime/size grow; FILE_EXCL lets the client set
 *    mtime/atime/time_warp_seq outright;
 *  - AUTH_EXCL covers uid/gid/mode/btime.
 */
void Locker::_update_cap_fields(CInode *in, int dirty, const cref_t<MClientCaps> &m, CInode::mempool_inode *pi)
{
  if (dirty == 0)
    return;

  /* m must be valid if there are dirty caps */
  ceph_assert(m);
  uint64_t features = m->get_connection()->get_features();

  if (m->get_ctime() > pi->ctime) {
    dout(7) << " ctime " << pi->ctime << " -> " << m->get_ctime()
	    << " for " << *in << dendl;
    pi->ctime = m->get_ctime();
    if (m->get_ctime() > pi->rstat.rctime)
      pi->rstat.rctime = m->get_ctime();
  }

  if ((features & CEPH_FEATURE_FS_CHANGE_ATTR) &&
      m->get_change_attr() > pi->change_attr) {
    dout(7) << " change_attr " << pi->change_attr << " -> " << m->get_change_attr()
	    << " for " << *in << dendl;
    pi->change_attr = m->get_change_attr();
  }

  // file
  if (dirty & (CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)) {
    utime_t atime = m->get_atime();
    utime_t mtime = m->get_mtime();
    uint64_t size = m->get_size();
    version_t inline_version = m->inline_version;

    // WR can only move mtime forward; EXCL may set it to any value
    if (((dirty & CEPH_CAP_FILE_WR) && mtime > pi->mtime) ||
	((dirty & CEPH_CAP_FILE_EXCL) && mtime != pi->mtime)) {
      dout(7) << " mtime " << pi->mtime << " -> " << mtime
	      << " for " << *in << dendl;
      pi->mtime = mtime;
      if (mtime > pi->rstat.rctime)
	pi->rstat.rctime = mtime;
    }
    if (in->is_file() && // ONLY if regular file
	size > pi->size) {
      dout(7) << " size " << pi->size << " -> " << size
	      << " for " << *in << dendl;
      pi->size = size;
      pi->rstat.rbytes = size;
    }
    if (in->is_file() &&
	(dirty & CEPH_CAP_FILE_WR) &&
	inline_version > pi->inline_data.version) {
      pi->inline_data.version = inline_version;
      if (inline_version != CEPH_INLINE_NONE && m->inline_data.length() > 0)
	pi->inline_data.set_data(m->inline_data);
      else
	pi->inline_data.free_data();
    }
    if ((dirty & CEPH_CAP_FILE_EXCL) && atime != pi->atime) {
      dout(7) << " atime " << pi->atime << " -> " << atime
	      << " for " << *in << dendl;
      pi->atime = atime;
    }
    if ((dirty & CEPH_CAP_FILE_EXCL) &&
	ceph_seq_cmp(pi->time_warp_seq, m->get_time_warp_seq()) < 0) {
      dout(7) << " time_warp_seq " << pi->time_warp_seq << " -> " << m->get_time_warp_seq()
	      << " for " << *in << dendl;
      pi->time_warp_seq = m->get_time_warp_seq();
    }
  }
  // auth
  if (dirty & CEPH_CAP_AUTH_EXCL) {
    if (m->head.uid != pi->uid) {
      dout(7) << " uid " << pi->uid
	      << " -> " << m->head.uid
	      << " for " << *in << dendl;
      pi->uid = m->head.uid;
    }
    if (m->head.gid != pi->gid) {
      dout(7) << " gid " << pi->gid
	      << " -> " << m->head.gid
	      << " for " << *in << dendl;
      pi->gid = m->head.gid;
    }
    if (m->head.mode != pi->mode) {
      dout(7) << " mode " << oct << pi->mode
	      << " -> " << m->head.mode << dec
	      << " for " << *in << dendl;
      pi->mode = m->head.mode;
    }
    if ((features & CEPH_FEATURE_FS_BTIME) && m->get_btime() != pi->btime) {
      dout(7) << " btime " << oct << pi->btime
	      << " -> " << m->get_btime() << dec
	      << " for " << *in << dendl;
      pi->btime = m->get_btime();
    }
  }
}
3766
3767 /*
3768 * update inode based on cap flush|flushsnap|wanted.
3769 * adjust max_size, if needed.
 * if we update, return true; otherwise, false (no update needed).
3771 */
3772 bool Locker::_do_cap_update(CInode *in, Capability *cap,
3773 int dirty, snapid_t follows,
3774 const cref_t<MClientCaps> &m, const ref_t<MClientCaps> &ack,
3775 bool *need_flush)
3776 {
3777 dout(10) << "_do_cap_update dirty " << ccap_string(dirty)
3778 << " issued " << ccap_string(cap ? cap->issued() : 0)
3779 << " wanted " << ccap_string(cap ? cap->wanted() : 0)
3780 << " on " << *in << dendl;
3781 ceph_assert(in->is_auth());
3782 client_t client = m->get_source().num();
3783 const auto& latest = in->get_projected_inode();
3784
3785 // increase or zero max_size?
3786 uint64_t size = m->get_size();
3787 bool change_max = false;
3788 uint64_t old_max = latest->get_client_range(client);
3789 uint64_t new_max = old_max;
3790
3791 if (in->is_file()) {
3792 bool forced_change_max = false;
3793 dout(20) << "inode is file" << dendl;
3794 if (cap && ((cap->issued() | cap->wanted()) & CEPH_CAP_ANY_FILE_WR)) {
3795 dout(20) << "client has write caps; m->get_max_size="
3796 << m->get_max_size() << "; old_max=" << old_max << dendl;
3797 if (m->get_max_size() > new_max) {
3798 dout(10) << "client requests file_max " << m->get_max_size()
3799 << " > max " << old_max << dendl;
3800 change_max = true;
3801 forced_change_max = true;
3802 new_max = calc_new_max_size(latest, m->get_max_size());
3803 } else {
3804 new_max = calc_new_max_size(latest, size);
3805
3806 if (new_max > old_max)
3807 change_max = true;
3808 else
3809 new_max = old_max;
3810 }
3811 } else {
3812 if (old_max) {
3813 change_max = true;
3814 new_max = 0;
3815 }
3816 }
3817
3818 if (in->last == CEPH_NOSNAP &&
3819 change_max &&
3820 !in->filelock.can_wrlock(client) &&
3821 !in->filelock.can_force_wrlock(client)) {
3822 dout(10) << " i want to change file_max, but lock won't allow it (yet)" << dendl;
3823 if (in->filelock.is_stable()) {
3824 bool need_issue = false;
3825 if (cap)
3826 cap->inc_suppress();
3827 if (in->get_mds_caps_wanted().empty() &&
3828 (in->get_loner() >= 0 || (in->get_wanted_loner() >= 0 && in->try_set_loner()))) {
3829 if (in->filelock.get_state() != LOCK_EXCL)
3830 file_excl(&in->filelock, &need_issue);
3831 } else
3832 simple_lock(&in->filelock, &need_issue);
3833 if (need_issue)
3834 issue_caps(in);
3835 if (cap)
3836 cap->dec_suppress();
3837 }
3838 if (!in->filelock.can_wrlock(client) &&
3839 !in->filelock.can_force_wrlock(client)) {
3840 C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in,
3841 forced_change_max ? new_max : 0,
3842 0, utime_t());
3843
3844 in->filelock.add_waiter(SimpleLock::WAIT_STABLE, cms);
3845 change_max = false;
3846 }
3847 }
3848 }
3849
3850 if (m->flockbl.length()) {
3851 int32_t num_locks;
3852 auto bli = m->flockbl.cbegin();
3853 decode(num_locks, bli);
3854 for ( int i=0; i < num_locks; ++i) {
3855 ceph_filelock decoded_lock;
3856 decode(decoded_lock, bli);
3857 in->get_fcntl_lock_state()->held_locks.
3858 insert(pair<uint64_t, ceph_filelock>(decoded_lock.start, decoded_lock));
3859 ++in->get_fcntl_lock_state()->client_held_lock_counts[(client_t)(decoded_lock.client)];
3860 }
3861 decode(num_locks, bli);
3862 for ( int i=0; i < num_locks; ++i) {
3863 ceph_filelock decoded_lock;
3864 decode(decoded_lock, bli);
3865 in->get_flock_lock_state()->held_locks.
3866 insert(pair<uint64_t, ceph_filelock>(decoded_lock.start, decoded_lock));
3867 ++in->get_flock_lock_state()->client_held_lock_counts[(client_t)(decoded_lock.client)];
3868 }
3869 }
3870
3871 if (!dirty && !change_max)
3872 return false;
3873
3874 Session *session = mds->get_session(m);
3875 if (session->check_access(in, MAY_WRITE,
3876 m->caller_uid, m->caller_gid, NULL, 0, 0) < 0) {
3877 dout(10) << "check_access failed, dropping cap update on " << *in << dendl;
3878 return false;
3879 }
3880
3881 // do the update.
3882 EUpdate *le = new EUpdate(mds->mdlog, "cap update");
3883 mds->mdlog->start_entry(le);
3884
3885 bool xattr = (dirty & CEPH_CAP_XATTR_EXCL) &&
3886 m->xattrbl.length() &&
3887 m->head.xattr_version > in->get_projected_inode()->xattr_version;
3888
3889 MutationRef mut(new MutationImpl());
3890 mut->ls = mds->mdlog->get_current_segment();
3891
3892 auto pi = in->project_inode(mut, xattr);
3893 pi.inode->version = in->pre_dirty();
3894
3895 _update_cap_fields(in, dirty, m, pi.inode.get());
3896
3897 if (change_max) {
3898 dout(7) << " max_size " << old_max << " -> " << new_max
3899 << " for " << *in << dendl;
3900 if (new_max) {
3901 auto &cr = pi.inode->client_ranges[client];
3902 cr.range.first = 0;
3903 cr.range.last = new_max;
3904 cr.follows = in->first - 1;
3905 in->mark_clientwriteable();
3906 if (cap)
3907 cap->mark_clientwriteable();
3908 } else {
3909 pi.inode->client_ranges.erase(client);
3910 if (pi.inode->client_ranges.empty())
3911 in->clear_clientwriteable();
3912 if (cap)
3913 cap->clear_clientwriteable();
3914 }
3915 }
3916
3917 if (change_max || (dirty & (CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)))
3918 wrlock_force(&in->filelock, mut); // wrlock for duration of journal
3919
3920 // auth
3921 if (dirty & CEPH_CAP_AUTH_EXCL)
3922 wrlock_force(&in->authlock, mut);
3923
3924 // xattrs update?
3925 if (xattr) {
3926 dout(7) << " xattrs v" << pi.inode->xattr_version << " -> " << m->head.xattr_version << dendl;
3927 pi.inode->xattr_version = m->head.xattr_version;
3928 auto p = m->xattrbl.cbegin();
3929 decode_noshare(*pi.xattrs, p);
3930 wrlock_force(&in->xattrlock, mut);
3931 }
3932
3933 mut->auth_pin(in);
3934 mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows);
3935 mdcache->journal_dirty_inode(mut.get(), &le->metablob, in, follows);
3936
3937 // "oldest flush tid" > 0 means client uses unique TID for each flush
3938 if (ack && ack->get_oldest_flush_tid() > 0)
3939 le->metablob.add_client_flush(metareqid_t(m->get_source(), ack->get_client_tid()),
3940 ack->get_oldest_flush_tid());
3941
3942 unsigned update_flags = 0;
3943 if (change_max)
3944 update_flags |= UPDATE_SHAREMAX;
3945 if (cap)
3946 update_flags |= UPDATE_NEEDSISSUE;
3947 mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut, update_flags,
3948 ack, client));
3949 if (need_flush && !*need_flush &&
3950 ((change_max && new_max) || // max INCREASE
3951 _need_flush_mdlog(in, dirty)))
3952 *need_flush = true;
3953
3954 return true;
3955 }
3956
3957 void Locker::handle_client_cap_release(const cref_t<MClientCapRelease> &m)
3958 {
3959 client_t client = m->get_source().num();
3960 dout(10) << "handle_client_cap_release " << *m << dendl;
3961
3962 if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) {
3963 mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
3964 return;
3965 }
3966
3967 if (mds->logger) mds->logger->inc(l_mdss_handle_client_cap_release);
3968
3969 if (m->osd_epoch_barrier && !mds->objecter->have_map(m->osd_epoch_barrier)) {
3970 // Pause RADOS operations until we see the required epoch
3971 mds->objecter->set_epoch_barrier(m->osd_epoch_barrier);
3972 }
3973
3974 if (mds->get_osd_epoch_barrier() < m->osd_epoch_barrier) {
3975 // Record the barrier so that we will retransmit it to clients
3976 mds->set_osd_epoch_barrier(m->osd_epoch_barrier);
3977 }
3978
3979 Session *session = mds->get_session(m);
3980
3981 for (const auto &cap : m->caps) {
3982 _do_cap_release(client, inodeno_t((uint64_t)cap.ino) , cap.cap_id, cap.migrate_seq, cap.seq);
3983 }
3984
3985 if (session) {
3986 session->notify_cap_release(m->caps.size());
3987 }
3988 }
3989
3990 class C_Locker_RetryCapRelease : public LockerContext {
3991 client_t client;
3992 inodeno_t ino;
3993 uint64_t cap_id;
3994 ceph_seq_t migrate_seq;
3995 ceph_seq_t issue_seq;
3996 public:
3997 C_Locker_RetryCapRelease(Locker *l, client_t c, inodeno_t i, uint64_t id,
3998 ceph_seq_t mseq, ceph_seq_t seq) :
3999 LockerContext(l), client(c), ino(i), cap_id(id), migrate_seq(mseq), issue_seq(seq) {}
4000 void finish(int r) override {
4001 locker->_do_cap_release(client, ino, cap_id, migrate_seq, issue_seq);
4002 }
4003 };
4004
4005 void Locker::_do_cap_release(client_t client, inodeno_t ino, uint64_t cap_id,
4006 ceph_seq_t mseq, ceph_seq_t seq)
4007 {
4008 CInode *in = mdcache->get_inode(ino);
4009 if (!in) {
4010 dout(7) << "_do_cap_release missing ino " << ino << dendl;
4011 return;
4012 }
4013 Capability *cap = in->get_client_cap(client);
4014 if (!cap) {
4015 dout(7) << "_do_cap_release no cap for client" << client << " on "<< *in << dendl;
4016 return;
4017 }
4018
4019 dout(7) << "_do_cap_release for client." << client << " on "<< *in << dendl;
4020 if (cap->get_cap_id() != cap_id) {
4021 dout(7) << " capid " << cap_id << " != " << cap->get_cap_id() << ", ignore" << dendl;
4022 return;
4023 }
4024 if (ceph_seq_cmp(mseq, cap->get_mseq()) < 0) {
4025 dout(7) << " mseq " << mseq << " < " << cap->get_mseq() << ", ignore" << dendl;
4026 return;
4027 }
4028 if (should_defer_client_cap_frozen(in)) {
4029 dout(7) << " freezing|frozen, deferring" << dendl;
4030 in->add_waiter(CInode::WAIT_UNFREEZE,
4031 new C_Locker_RetryCapRelease(this, client, ino, cap_id, mseq, seq));
4032 return;
4033 }
4034 if (seq != cap->get_last_issue()) {
4035 dout(7) << " issue_seq " << seq << " != " << cap->get_last_issue() << dendl;
4036 // clean out any old revoke history
4037 cap->clean_revoke_from(seq);
4038 eval_cap_gather(in);
4039 return;
4040 }
4041 remove_client_cap(in, cap);
4042 }
4043
/*
 * Remove a client's capability from an inode.
 *
 * Tears down state tied to the cap in a specific order: null-snapflush
 * any pending snapflush entries for this client, detach and invalidate
 * every lock cache referencing the cap, then drop the cap itself.  If
 * the cap was "notable", auth inodes also get their client byte range
 * reconciled (or marked NEEDSRECOVER when @kill is set) and locks are
 * re-evaluated.
 *
 * @param kill  if true, skip max-size recovery and flag the inode as
 *              needing file recovery instead.
 */
void Locker::remove_client_cap(CInode *in, Capability *cap, bool kill)
{
  client_t client = cap->get_client();
  // clean out any pending snapflush state
  if (!in->client_need_snapflush.empty())
    _do_null_snapflush(in, client);

  // detach every lock cache that still points at this cap;
  // invalidate_lock_cache() removes it from cap->lock_caches, so this
  // loop terminates.
  while (!cap->lock_caches.empty()) {
    MDLockCache* lock_cache = cap->lock_caches.front();
    lock_cache->client_cap = nullptr;
    invalidate_lock_cache(lock_cache);
  }

  bool notable = cap->is_notable();
  in->remove_client_cap(client);
  if (!notable)
    return;  // nothing else to reconcile for a non-notable cap

  if (in->is_auth()) {
    // make sure we clear out the client byte range
    if (in->get_projected_inode()->client_ranges.count(client) &&
	!(in->get_inode()->nlink == 0 && !in->is_any_caps())) { // unless it's unlink + stray
      if (kill)
	in->state_set(CInode::STATE_NEEDSRECOVER);
      else
	check_inode_max_size(in);
    }
  } else {
    // non-auth: tell the auth MDS our file caps situation changed
    request_inode_file_caps(in);
  }

  // dropping a cap may allow lock state to move forward
  try_eval(in, CEPH_CAP_LOCKS);
}
4077
4078
4079 /**
4080 * Return true if any currently revoking caps exceed the
4081 * session_timeout threshold.
4082 */
4083 bool Locker::any_late_revoking_caps(xlist<Capability*> const &revoking,
4084 double timeout) const
4085 {
4086 xlist<Capability*>::const_iterator p = revoking.begin();
4087 if (p.end()) {
4088 // No revoking caps at the moment
4089 return false;
4090 } else {
4091 utime_t now = ceph_clock_now();
4092 utime_t age = now - (*p)->get_last_revoke_stamp();
4093 if (age <= timeout) {
4094 return false;
4095 } else {
4096 return true;
4097 }
4098 }
4099 }
4100
4101 std::set<client_t> Locker::get_late_revoking_clients(double timeout) const
4102 {
4103 std::set<client_t> result;
4104
4105 if (any_late_revoking_caps(revoking_caps, timeout)) {
4106 // Slow path: execute in O(N_clients)
4107 for (auto &p : revoking_caps_by_client) {
4108 if (any_late_revoking_caps(p.second, timeout)) {
4109 result.insert(p.first);
4110 }
4111 }
4112 } else {
4113 // Fast path: no misbehaving clients, execute in O(1)
4114 }
4115 return result;
4116 }
4117
// Hard-coded instead of surfacing a config setting because this is
// really a hack that should go away at some point when we have better
// inspection tools for getting at detailed cap state (#7316)
4121 #define MAX_WARN_CAPS 100
4122
4123 void Locker::caps_tick()
4124 {
4125 utime_t now = ceph_clock_now();
4126
4127 if (!need_snapflush_inodes.empty()) {
4128 // snap inodes that needs flush are auth pinned, they affect
4129 // subtree/difrarg freeze.
4130 utime_t cutoff = now;
4131 cutoff -= g_conf()->mds_freeze_tree_timeout / 3;
4132
4133 CInode *last = need_snapflush_inodes.back();
4134 while (!need_snapflush_inodes.empty()) {
4135 CInode *in = need_snapflush_inodes.front();
4136 if (in->last_dirstat_prop >= cutoff)
4137 break;
4138 in->item_caps.remove_myself();
4139 snapflush_nudge(in);
4140 if (in == last)
4141 break;
4142 }
4143 }
4144
4145 dout(20) << __func__ << " " << revoking_caps.size() << " revoking caps" << dendl;
4146
4147 now = ceph_clock_now();
4148 int n = 0;
4149 for (xlist<Capability*>::iterator p = revoking_caps.begin(); !p.end(); ++p) {
4150 Capability *cap = *p;
4151
4152 utime_t age = now - cap->get_last_revoke_stamp();
4153 dout(20) << __func__ << " age = " << age << " client." << cap->get_client() << "." << cap->get_inode()->ino() << dendl;
4154 if (age <= mds->mdsmap->get_session_timeout()) {
4155 dout(20) << __func__ << " age below timeout " << mds->mdsmap->get_session_timeout() << dendl;
4156 break;
4157 } else {
4158 ++n;
4159 if (n > MAX_WARN_CAPS) {
4160 dout(1) << __func__ << " more than " << MAX_WARN_CAPS << " caps are late"
4161 << "revoking, ignoring subsequent caps" << dendl;
4162 break;
4163 }
4164 }
4165 // exponential backoff of warning intervals
4166 if (age > mds->mdsmap->get_session_timeout() * (1 << cap->get_num_revoke_warnings())) {
4167 cap->inc_num_revoke_warnings();
4168 CachedStackStringStream css;
4169 *css << "client." << cap->get_client() << " isn't responding to mclientcaps(revoke), ino "
4170 << cap->get_inode()->ino() << " pending " << ccap_string(cap->pending())
4171 << " issued " << ccap_string(cap->issued()) << ", sent " << age << " seconds ago";
4172 mds->clog->warn() << css->strv();
4173 dout(20) << __func__ << " " << css->strv() << dendl;
4174 } else {
4175 dout(20) << __func__ << " silencing log message (backoff) for " << "client." << cap->get_client() << "." << cap->get_inode()->ino() << dendl;
4176 }
4177 }
4178 }
4179
4180
/*
 * Handle an MClientLease from a client: either a release/revoke-ack of
 * a dentry lease, or a renewal request.  The lease is located by
 * (ino, dname) and the client's ClientLease record; stale or unknown
 * leases are logged and dropped.
 */
void Locker::handle_client_lease(const cref_t<MClientLease> &m)
{
  dout(10) << "handle_client_lease " << *m << dendl;

  ceph_assert(m->get_source().is_client());
  client_t client = m->get_source().num();

  CInode *in = mdcache->get_inode(m->get_ino(), m->get_last());
  if (!in) {
    dout(7) << "handle_client_lease don't have ino " << m->get_ino() << "." << m->get_last() << dendl;
    return;
  }
  CDentry *dn = 0;

  // dirfrag in the message may be stale; recompute from the dname
  frag_t fg = in->pick_dirfrag(m->dname);
  CDir *dir = in->get_dirfrag(fg);
  if (dir)
    dn = dir->lookup(m->dname);
  if (!dn) {
    dout(7) << "handle_client_lease don't have dn " << m->get_ino() << " " << m->dname << dendl;
    return;
  }
  dout(10) << " on " << *dn << dendl;

  // replica and lock
  ClientLease *l = dn->get_client_lease(client);
  if (!l) {
    dout(7) << "handle_client_lease didn't have lease for client." << client << " of " << *dn << dendl;
    return;
  }

  switch (m->get_action()) {
  case CEPH_MDS_LEASE_REVOKE_ACK:
  case CEPH_MDS_LEASE_RELEASE:
    // only drop the lease if the seq matches; otherwise the message is
    // for an older lease instance and is ignored
    if (l->seq != m->get_seq()) {
      dout(7) << "handle_client_lease release - seq " << l->seq << " != provided " << m->get_seq() << dendl;
    } else {
      dout(7) << "handle_client_lease client." << client
	      << " on " << *dn << dendl;
      dn->remove_client_lease(l, this);
    }
    break;

  case CEPH_MDS_LEASE_RENEW:
    {
      dout(7) << "handle_client_lease client." << client << " renew on " << *dn
	      << (!dn->lock.can_lease(client)?", revoking lease":"") << dendl;
      // renew only if the dentry lock still permits leasing; otherwise
      // silence is the revocation (no reply is sent)
      if (dn->lock.can_lease(client)) {
	auto reply = make_message<MClientLease>(*m);
	int pool = 1; // fixme.. do something smart!
	reply->h.duration_ms = (int)(1000 * mdcache->client_lease_durations[pool]);
	reply->h.seq = ++l->seq;
	reply->clear_payload();

	utime_t now = ceph_clock_now();
	now += mdcache->client_lease_durations[pool];
	mdcache->touch_client_lease(l, pool, now);

	mds->send_message_client_counted(reply, m->get_connection());
      }
    }
    break;

  default:
    ceph_abort(); // implement me
    break;
  }
}
4249
4250
/*
 * Encode a lease (or a null lease) for dentry @dn into @bl as part of a
 * client reply.
 *
 * A real dentry lease is only issued when all of the following hold:
 * the request is on the head snapshot, the dentry lock permits leasing
 * for this client, the parent dir is not a stray, and the client does
 * not already cover the dentry via the dir's filelock lease or
 * Fs/Fx caps.  Otherwise a lease with mask 0 is encoded.
 */
void Locker::issue_client_lease(CDentry *dn, CInode *in, MDRequestRef &mdr, utime_t now,
                               bufferlist &bl)
{
  client_t client = mdr->get_client();
  Session *session = mdr->session;

  CInode *diri = dn->get_dir()->get_inode();
  if (mdr->snapid == CEPH_NOSNAP &&
      dn->lock.can_lease(client) &&
      !diri->is_stray() && // do not issue dn leases in stray dir!
      !diri->filelock.can_lease(client) &&
      !(diri->get_client_cap_pending(client) & (CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_EXCL))) {
    int mask = 0;
    CDentry::linkage_t *dnl = dn->get_linkage(client, mdr);
    if (dnl->is_primary()) {
      ceph_assert(dnl->get_inode() == in);
      mask = CEPH_LEASE_PRIMARY_LINK;
    } else {
      // remote link must point at 'in'; null linkage means no inode
      if (dnl->is_remote())
	ceph_assert(dnl->get_remote_ino() == in->ino());
      else
	ceph_assert(!in);
    }
    // issue a dentry lease
    ClientLease *l = dn->add_client_lease(client, session);
    session->touch_lease(l);

    int pool = 1; // fixme.. do something smart!
    now += mdcache->client_lease_durations[pool];
    mdcache->touch_client_lease(l, pool, now);

    LeaseStat lstat;
    lstat.mask = CEPH_LEASE_VALID | mask;
    lstat.duration_ms = (uint32_t)(1000 * mdcache->client_lease_durations[pool]);
    lstat.seq = ++l->seq;
    lstat.alternate_name = std::string(dn->alternate_name);
    encode_lease(bl, session->info, lstat);
    dout(20) << "issue_client_lease seq " << lstat.seq << " dur " << lstat.duration_ms << "ms "
	     << " on " << *dn << dendl;
  } else {
    // null lease
    LeaseStat lstat;
    lstat.mask = 0;
    lstat.alternate_name = std::string(dn->alternate_name);
    encode_lease(bl, session->info, lstat);
    dout(20) << "issue_client_lease no/null lease on " << *dn << dendl;
  }
}
4299
4300
4301 void Locker::revoke_client_leases(SimpleLock *lock)
4302 {
4303 int n = 0;
4304 CDentry *dn = static_cast<CDentry*>(lock->get_parent());
4305 for (map<client_t, ClientLease*>::iterator p = dn->client_lease_map.begin();
4306 p != dn->client_lease_map.end();
4307 ++p) {
4308 ClientLease *l = p->second;
4309
4310 n++;
4311 ceph_assert(lock->get_type() == CEPH_LOCK_DN);
4312
4313 CDentry *dn = static_cast<CDentry*>(lock->get_parent());
4314 int mask = 1 | CEPH_LOCK_DN; // old and new bits
4315
4316 // i should also revoke the dir ICONTENT lease, if they have it!
4317 CInode *diri = dn->get_dir()->get_inode();
4318 auto lease = make_message<MClientLease>(CEPH_MDS_LEASE_REVOKE, l->seq, mask, diri->ino(), diri->first, CEPH_NOSNAP, dn->get_name());
4319 mds->send_message_client_counted(lease, l->client);
4320 }
4321 }
4322
4323 void Locker::encode_lease(bufferlist& bl, const session_info_t& info,
4324 const LeaseStat& ls)
4325 {
4326 if (info.has_feature(CEPHFS_FEATURE_REPLY_ENCODING)) {
4327 ENCODE_START(2, 1, bl);
4328 encode(ls.mask, bl);
4329 encode(ls.duration_ms, bl);
4330 encode(ls.seq, bl);
4331 encode(ls.alternate_name, bl);
4332 ENCODE_FINISH(bl);
4333 }
4334 else {
4335 encode(ls.mask, bl);
4336 encode(ls.duration_ms, bl);
4337 encode(ls.seq, bl);
4338 }
4339 }
4340
4341 // locks ----------------------------------------------------------------
4342
/*
 * Resolve an MDSCacheObjectInfo (from a peer's MLock message) to the
 * corresponding lock object in our cache, or nullptr if the object has
 * been trimmed.  Dentry locks are looked up by (ino, dname, snapid);
 * inode locks by (ino, snapid) plus the lock type.
 */
SimpleLock *Locker::get_lock(int lock_type, const MDSCacheObjectInfo &info)
{
  switch (lock_type) {
  case CEPH_LOCK_DN:
    {
      // be careful; info.dirfrag may have incorrect frag; recalculate based on dname.
      CInode *diri = mdcache->get_inode(info.dirfrag.ino);
      frag_t fg;
      CDir *dir = 0;
      CDentry *dn = 0;
      if (diri) {
	fg = diri->pick_dirfrag(info.dname);
	dir = diri->get_dirfrag(fg);
	if (dir)
	  dn = dir->lookup(info.dname, info.snapid);
      }
      if (!dn) {
	dout(7) << "get_lock don't have dn " << info.dirfrag.ino << " " << info.dname << dendl;
	return 0;
      }
      return &dn->lock;
    }

  case CEPH_LOCK_IAUTH:
  case CEPH_LOCK_ILINK:
  case CEPH_LOCK_IDFT:
  case CEPH_LOCK_IFILE:
  case CEPH_LOCK_INEST:
  case CEPH_LOCK_IXATTR:
  case CEPH_LOCK_ISNAP:
  case CEPH_LOCK_IFLOCK:
  case CEPH_LOCK_IPOLICY:
    {
      // all inode lock types share the inode lookup; then map the type
      // to the specific lock member
      CInode *in = mdcache->get_inode(info.ino, info.snapid);
      if (!in) {
	dout(7) << "get_lock don't have ino " << info.ino << dendl;
	return 0;
      }
      switch (lock_type) {
      case CEPH_LOCK_IAUTH: return &in->authlock;
      case CEPH_LOCK_ILINK: return &in->linklock;
      case CEPH_LOCK_IDFT: return &in->dirfragtreelock;
      case CEPH_LOCK_IFILE: return &in->filelock;
      case CEPH_LOCK_INEST: return &in->nestlock;
      case CEPH_LOCK_IXATTR: return &in->xattrlock;
      case CEPH_LOCK_ISNAP: return &in->snaplock;
      case CEPH_LOCK_IFLOCK: return &in->flocklock;
      case CEPH_LOCK_IPOLICY: return &in->policylock;
      }
    }

  default:
    dout(7) << "get_lock don't know lock_type " << lock_type << dendl;
    ceph_abort();
    break;
  }

  return 0;
}
4402
/*
 * Dispatch an inter-MDS MLock message to the handler for the lock's
 * type.  Messages for trimmed objects are silently dropped.
 */
void Locker::handle_lock(const cref_t<MLock> &m)
{
  // nobody should be talking to us during recovery.
  ceph_assert(mds->is_rejoin() || mds->is_clientreplay() || mds->is_active() || mds->is_stopping());

  SimpleLock *lock = get_lock(m->get_lock_type(), m->get_object_info());
  if (!lock) {
    dout(10) << "don't have object " << m->get_object_info() << ", must have trimmed, dropping" << dendl;
    return;
  }

  switch (lock->get_type()) {
  case CEPH_LOCK_DN:
  case CEPH_LOCK_IAUTH:
  case CEPH_LOCK_ILINK:
  case CEPH_LOCK_ISNAP:
  case CEPH_LOCK_IXATTR:
  case CEPH_LOCK_IFLOCK:
  case CEPH_LOCK_IPOLICY:
    handle_simple_lock(lock, m);
    break;

  case CEPH_LOCK_IDFT:
  case CEPH_LOCK_INEST:
    //handle_scatter_lock((ScatterLock*)lock, m);
    //break;

    // NOTE: deliberate fallthrough -- IDFT/INEST scatterlocks are
    // handled by the file-lock handler
  case CEPH_LOCK_IFILE:
    handle_file_lock(static_cast<ScatterLock*>(lock), m);
    break;

  default:
    dout(7) << "handle_lock got otype " << m->get_lock_type() << dendl;
    ceph_abort();
    break;
  }
}
4440
4441
4442
4443
4444
4445 // ==========================================================================
4446 // simple lock
4447
4448 /** This function may take a reference to m if it needs one, but does
4449 * not put references. */
4450 void Locker::handle_reqrdlock(SimpleLock *lock, const cref_t<MLock> &m)
4451 {
4452 MDSCacheObject *parent = lock->get_parent();
4453 if (parent->is_auth() &&
4454 lock->get_state() != LOCK_SYNC &&
4455 !parent->is_frozen()) {
4456 dout(7) << "handle_reqrdlock got rdlock request on " << *lock
4457 << " on " << *parent << dendl;
4458 ceph_assert(parent->is_auth()); // replica auth pinned if they're doing this!
4459 if (lock->is_stable()) {
4460 simple_sync(lock);
4461 } else {
4462 dout(7) << "handle_reqrdlock delaying request until lock is stable" << dendl;
4463 lock->add_waiter(SimpleLock::WAIT_STABLE | MDSCacheObject::WAIT_UNFREEZE,
4464 new C_MDS_RetryMessage(mds, m));
4465 }
4466 } else {
4467 dout(7) << "handle_reqrdlock dropping rdlock request on " << *lock
4468 << " on " << *parent << dendl;
4469 // replica should retry
4470 }
4471 }
4472
/*
 * Handle an MLock action on a simple lock.  Replica side: apply
 * SYNC/LOCK state pushed by the auth.  Auth side: collect LOCKACKs from
 * gathering replicas and finish the gather when the last one arrives;
 * also service REQRDLOCK requests from replicas.
 */
void Locker::handle_simple_lock(SimpleLock *lock, const cref_t<MLock> &m)
{
  int from = m->get_asker();

  dout(10) << "handle_simple_lock " << *m
	   << " on " << *lock << " " << *lock->get_parent() << dendl;

  if (mds->is_rejoin()) {
    // ignore lock traffic for objects still being rejoined
    if (lock->get_parent()->is_rejoining()) {
      dout(7) << "handle_simple_lock still rejoining " << *lock->get_parent()
	      << ", dropping " << *m << dendl;
      return;
    }
  }

  switch (m->get_action()) {
    // -- replica --
  case LOCK_AC_SYNC:
    ceph_assert(lock->get_state() == LOCK_LOCK);
    lock->decode_locked_state(m->get_data());
    lock->set_state(LOCK_SYNC);
    lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
    break;

  case LOCK_AC_LOCK:
    ceph_assert(lock->get_state() == LOCK_SYNC);
    lock->set_state(LOCK_SYNC_LOCK);
    if (lock->is_leased())
      revoke_client_leases(lock);
    eval_gather(lock, true);
    if (lock->is_unstable_and_locked()) {
      if (lock->is_cached())
	invalidate_lock_caches(lock);
      // flush the mdlog so the ack isn't held up behind journaling
      mds->mdlog->flush();
    }
    break;


    // -- auth --
  case LOCK_AC_LOCKACK:
    ceph_assert(lock->get_state() == LOCK_SYNC_LOCK ||
		lock->get_state() == LOCK_SYNC_EXCL);
    ceph_assert(lock->is_gathering(from));
    lock->remove_gather(from);

    if (lock->is_gathering()) {
      dout(7) << "handle_simple_lock " << *lock << " on " << *lock->get_parent() << " from " << from
	      << ", still gathering " << lock->get_gather_set() << dendl;
    } else {
      dout(7) << "handle_simple_lock " << *lock << " on " << *lock->get_parent() << " from " << from
	      << ", last one" << dendl;
      eval_gather(lock);
    }
    break;

  case LOCK_AC_REQRDLOCK:
    handle_reqrdlock(lock, m);
    break;

  }
}
4534
4535 /* unused, currently.
4536
4537 class C_Locker_SimpleEval : public Context {
4538 Locker *locker;
4539 SimpleLock *lock;
4540 public:
4541 C_Locker_SimpleEval(Locker *l, SimpleLock *lk) : locker(l), lock(lk) {}
4542 void finish(int r) {
4543 locker->try_simple_eval(lock);
4544 }
4545 };
4546
4547 void Locker::try_simple_eval(SimpleLock *lock)
4548 {
4549 // unstable and ambiguous auth?
4550 if (!lock->is_stable() &&
4551 lock->get_parent()->is_ambiguous_auth()) {
4552 dout(7) << "simple_eval not stable and ambiguous auth, waiting on " << *lock->get_parent() << dendl;
4553 //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
4554 lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_SimpleEval(this, lock));
4555 return;
4556 }
4557
4558 if (!lock->get_parent()->is_auth()) {
4559 dout(7) << "try_simple_eval not auth for " << *lock->get_parent() << dendl;
4560 return;
4561 }
4562
4563 if (!lock->get_parent()->can_auth_pin()) {
4564 dout(7) << "try_simple_eval can't auth_pin, waiting on " << *lock->get_parent() << dendl;
4565 //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
4566 lock->get_parent()->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_SimpleEval(this, lock));
4567 return;
4568 }
4569
4570 if (lock->is_stable())
4571 simple_eval(lock);
4572 }
4573 */
4574
4575
/*
 * Evaluate a stable, auth simple lock and decide whether to move it
 * towards EXCL (a loner wants exclusive caps) or back towards SYNC
 * (nobody needs exclusivity).  No-op while the parent is freezing or
 * the FS is read-only (beyond forcing SYNC in the latter case).
 *
 * @param need_issue  if non-null, set instead of calling issue_caps()
 *                    directly (caller batches the issue).
 */
void Locker::simple_eval(SimpleLock *lock, bool *need_issue)
{
  dout(10) << "simple_eval " << *lock << " on " << *lock->get_parent() << dendl;

  ceph_assert(lock->get_parent()->is_auth());
  ceph_assert(lock->is_stable());

  if (lock->get_parent()->is_freezing_or_frozen()) {
    // dentry/snap lock in unreadable state can block path traverse
    if ((lock->get_type() != CEPH_LOCK_DN &&
	 lock->get_type() != CEPH_LOCK_ISNAP &&
	 lock->get_type() != CEPH_LOCK_IPOLICY) ||
	lock->get_state() == LOCK_SYNC ||
	lock->get_parent()->is_frozen())
      return;
  }

  if (mdcache->is_readonly()) {
    // read-only FS: force everything to SYNC
    if (lock->get_state() != LOCK_SYNC) {
      dout(10) << "simple_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
      simple_sync(lock, need_issue);
    }
    return;
  }

  CInode *in = 0;
  int wanted = 0;
  if (lock->get_cap_shift()) {
    // lock covers client caps; consider what clients want
    in = static_cast<CInode*>(lock->get_parent());
    in->get_caps_wanted(&wanted, NULL, lock->get_cap_shift());
  }

  // -> excl?
  if (lock->get_state() != LOCK_EXCL &&
      in && in->get_target_loner() >= 0 &&
      (wanted & CEPH_CAP_GEXCL)) {
    dout(7) << "simple_eval stable, going to excl " << *lock
	    << " on " << *lock->get_parent() << dendl;
    simple_excl(lock, need_issue);
  }

  // stable -> sync?
  else if (lock->get_state() != LOCK_SYNC &&
	   !lock->is_wrlocked() &&
	   ((!(wanted & CEPH_CAP_GEXCL) && !lock->is_waiter_for(SimpleLock::WAIT_WR)) ||
	    (lock->get_state() == LOCK_EXCL && in && in->get_target_loner() < 0))) {
    dout(7) << "simple_eval stable, syncing " << *lock
	    << " on " << *lock->get_parent() << dendl;
    simple_sync(lock, need_issue);
  }
}
4627
4628
4629 // mid
4630
4631 bool Locker::simple_sync(SimpleLock *lock, bool *need_issue)
4632 {
4633 dout(7) << "simple_sync on " << *lock << " on " << *lock->get_parent() << dendl;
4634 ceph_assert(lock->get_parent()->is_auth());
4635 ceph_assert(lock->is_stable());
4636
4637 CInode *in = 0;
4638 if (lock->get_cap_shift())
4639 in = static_cast<CInode *>(lock->get_parent());
4640
4641 int old_state = lock->get_state();
4642
4643 if (old_state != LOCK_TSYN) {
4644
4645 switch (lock->get_state()) {
4646 case LOCK_MIX: lock->set_state(LOCK_MIX_SYNC); break;
4647 case LOCK_LOCK: lock->set_state(LOCK_LOCK_SYNC); break;
4648 case LOCK_XSYN: lock->set_state(LOCK_XSYN_SYNC); break;
4649 case LOCK_EXCL: lock->set_state(LOCK_EXCL_SYNC); break;
4650 default: ceph_abort();
4651 }
4652
4653 int gather = 0;
4654 if (lock->is_wrlocked()) {
4655 gather++;
4656 if (lock->is_cached())
4657 invalidate_lock_caches(lock);
4658
4659 // After a client request is early replied the mdlog won't be flushed
4660 // immediately, but before safe replied the request will hold the write
4661 // locks. So if the client sends another request to a different MDS
4662 // daemon, which then needs to request read lock from current MDS daemon,
4663 // then that daemon maybe stuck at most for 5 seconds. Which will lead
4664 // the client stuck at most 5 seconds.
4665 //
4666 // Let's try to flush the mdlog when the write lock is held, which will
4667 // release the write locks after mdlog is successfully flushed.
4668 mds->mdlog->flush();
4669 }
4670
4671 if (lock->get_parent()->is_replicated() && old_state == LOCK_MIX) {
4672 send_lock_message(lock, LOCK_AC_SYNC);
4673 lock->init_gather();
4674 gather++;
4675 }
4676
4677 if (in && in->is_head()) {
4678 if (in->issued_caps_need_gather(lock)) {
4679 if (need_issue)
4680 *need_issue = true;
4681 else
4682 issue_caps(in);
4683 gather++;
4684 }
4685 }
4686
4687 bool need_recover = false;
4688 if (lock->get_type() == CEPH_LOCK_IFILE) {
4689 ceph_assert(in);
4690 if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
4691 mds->mdcache->queue_file_recover(in);
4692 need_recover = true;
4693 gather++;
4694 }
4695 }
4696
4697 if (!gather && lock->is_dirty()) {
4698 lock->get_parent()->auth_pin(lock);
4699 scatter_writebehind(static_cast<ScatterLock*>(lock));
4700 return false;
4701 }
4702
4703 if (gather) {
4704 lock->get_parent()->auth_pin(lock);
4705 if (need_recover)
4706 mds->mdcache->do_file_recover();
4707 return false;
4708 }
4709 }
4710
4711 if (lock->get_parent()->is_replicated()) { // FIXME
4712 bufferlist data;
4713 lock->encode_locked_state(data);
4714 send_lock_message(lock, LOCK_AC_SYNC, data);
4715 }
4716 lock->set_state(LOCK_SYNC);
4717 lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
4718 if (in && in->is_head()) {
4719 if (need_issue)
4720 *need_issue = true;
4721 else
4722 issue_caps(in);
4723 }
4724 return true;
4725 }
4726
/*
 * Transition a stable, auth simple lock to EXCL (exclusive caps for a
 * loner client).  Gathers rdlocks/wrlocks, replica acks, and cap
 * revocations as needed; if anything must be gathered the lock stays in
 * the *_EXCL intermediate state and completes via eval_gather().
 *
 * @param need_issue  if non-null, set instead of calling issue_caps()
 *                    directly (caller batches the issue).
 */
void Locker::simple_excl(SimpleLock *lock, bool *need_issue)
{
  dout(7) << "simple_excl on " << *lock << " on " << *lock->get_parent() << dendl;
  ceph_assert(lock->get_parent()->is_auth());
  ceph_assert(lock->is_stable());

  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  switch (lock->get_state()) {
  case LOCK_LOCK: lock->set_state(LOCK_LOCK_EXCL); break;
  case LOCK_SYNC: lock->set_state(LOCK_SYNC_EXCL); break;
  case LOCK_XSYN: lock->set_state(LOCK_XSYN_EXCL); break;
  default: ceph_abort();
  }

  int gather = 0;
  if (lock->is_rdlocked())
    gather++;
  if (lock->is_wrlocked())
    gather++;
  if (gather && lock->is_cached())
    invalidate_lock_caches(lock);

  // replicas coming from LOCK/XSYN are already in LOCK; only others
  // need an explicit LOCK message
  if (lock->get_parent()->is_replicated() &&
      lock->get_state() != LOCK_LOCK_EXCL &&
      lock->get_state() != LOCK_XSYN_EXCL) {
    send_lock_message(lock, LOCK_AC_LOCK);
    lock->init_gather();
    gather++;
  }

  if (in && in->is_head()) {
    if (in->issued_caps_need_gather(lock)) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
      gather++;
    }
  }

  if (gather) {
    lock->get_parent()->auth_pin(lock);
  } else {
    lock->set_state(LOCK_EXCL);
    lock->finish_waiters(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE);
    if (in) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
    }
  }
}
4783
/*
 * Transition a stable, auth simple lock to LOCK.
 *
 * Gathers client leases, rdlocks, cap revocations, file recovery and
 * replica acks.  MIX->LOCK is two-stage: local wrlocks/caps drain first
 * (MIX_LOCK), then replicas are gathered (MIX_LOCK2), so the replicas'
 * scatter data is only collected once local writers are done.
 *
 * @param need_issue  if non-null, set instead of calling issue_caps()
 *                    directly (caller batches the issue).
 */
void Locker::simple_lock(SimpleLock *lock, bool *need_issue)
{
  dout(7) << "simple_lock on " << *lock << " on " << *lock->get_parent() << dendl;
  ceph_assert(lock->get_parent()->is_auth());
  ceph_assert(lock->is_stable());
  ceph_assert(lock->get_state() != LOCK_LOCK);

  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  int old_state = lock->get_state();

  switch (lock->get_state()) {
  case LOCK_SYNC: lock->set_state(LOCK_SYNC_LOCK); break;
  case LOCK_XSYN: lock->set_state(LOCK_XSYN_LOCK); break;
  case LOCK_EXCL: lock->set_state(LOCK_EXCL_LOCK); break;
  case LOCK_MIX: lock->set_state(LOCK_MIX_LOCK);
    (static_cast<ScatterLock *>(lock))->clear_unscatter_wanted();
    break;
  case LOCK_TSYN: lock->set_state(LOCK_TSYN_LOCK); break;
  default: ceph_abort();
  }

  int gather = 0;
  if (lock->is_leased()) {
    gather++;
    revoke_client_leases(lock);
  }
  if (lock->is_rdlocked()) {
    if (lock->is_cached())
      invalidate_lock_caches(lock);
    gather++;
  }
  if (in && in->is_head()) {
    if (in->issued_caps_need_gather(lock)) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
      gather++;
    }
  }

  bool need_recover = false;
  if (lock->get_type() == CEPH_LOCK_IFILE) {
    ceph_assert(in);
    if(in->state_test(CInode::STATE_NEEDSRECOVER)) {
      mds->mdcache->queue_file_recover(in);
      need_recover = true;
      gather++;
    }
  }

  if (lock->get_parent()->is_replicated() &&
      lock->get_state() == LOCK_MIX_LOCK &&
      gather) {
    dout(10) << " doing local stage of mix->lock gather before gathering from replicas" << dendl;
  } else {
    // move to second stage of gather now, so we don't send the lock action later.
    if (lock->get_state() == LOCK_MIX_LOCK)
      lock->set_state(LOCK_MIX_LOCK2);

    if (lock->get_parent()->is_replicated() &&
	lock->get_sm()->states[old_state].replica_state != LOCK_LOCK) { // replica may already be LOCK
      gather++;
      send_lock_message(lock, LOCK_AC_LOCK);
      lock->init_gather();
    }
  }

  if (!gather && lock->is_dirty()) {
    // dirty scatter state must be journaled before we can reach LOCK
    lock->get_parent()->auth_pin(lock);
    scatter_writebehind(static_cast<ScatterLock*>(lock));
    return;
  }

  if (gather) {
    lock->get_parent()->auth_pin(lock);
    if (need_recover)
      mds->mdcache->do_file_recover();
  } else {
    lock->set_state(LOCK_LOCK);
    lock->finish_waiters(ScatterLock::WAIT_XLOCK|ScatterLock::WAIT_WR|ScatterLock::WAIT_STABLE);
  }
}
4870
4871
/*
 * Move an auth simple lock from LOCK/XLOCKDONE towards PREXLOCK on the
 * way to an exclusive (xlock) hold.  Gathers outstanding rdlocks,
 * wrlocks and caps; if nothing needs gathering the lock lands in
 * PREXLOCK immediately.  Unlike the other simple_* transitions the
 * lock need not be stable on entry.
 */
void Locker::simple_xlock(SimpleLock *lock)
{
  dout(7) << "simple_xlock on " << *lock << " on " << *lock->get_parent() << dendl;
  ceph_assert(lock->get_parent()->is_auth());
  //assert(lock->is_stable());
  ceph_assert(lock->get_state() != LOCK_XLOCK);

  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  // pin only on the stable->unstable edge; unstable locks already hold one
  if (lock->is_stable())
    lock->get_parent()->auth_pin(lock);

  switch (lock->get_state()) {
  case LOCK_LOCK:
  case LOCK_XLOCKDONE: lock->set_state(LOCK_LOCK_XLOCK); break;
  default: ceph_abort();
  }

  int gather = 0;
  if (lock->is_rdlocked())
    gather++;
  if (lock->is_wrlocked())
    gather++;
  if (gather && lock->is_cached())
    invalidate_lock_caches(lock);

  if (in && in->is_head()) {
    if (in->issued_caps_need_gather(lock)) {
      issue_caps(in);
      gather++;
    }
  }

  if (!gather) {
    lock->set_state(LOCK_PREXLOCK);
    //assert("shouldn't be called if we are already xlockable" == 0);
  }
}
4912
4913
4914
4915
4916
4917 // ==========================================================================
4918 // scatter lock
4919
4920 /*
4921
4922 Some notes on scatterlocks.
4923
4924 - The scatter/gather is driven by the inode lock. The scatter always
4925 brings in the latest metadata from the fragments.
4926
4927 - When in a scattered/MIX state, fragments are only allowed to
4928 update/be written to if the accounted stat matches the inode's
4929 current version.
4930
4931 - That means, on gather, we _only_ assimilate diffs for frag metadata
4932 that match the current version, because those are the only ones
4933 written during this scatter/gather cycle. (Others didn't permit
4934 it.) We increment the version and journal this to disk.
4935
4936 - When possible, we also simultaneously update our local frag
4937 accounted stats to match.
4938
4939 - On scatter, the new inode info is broadcast to frags, both local
4940 and remote. If possible (auth and !frozen), the dirfrag auth
4941 should update the accounted state (if it isn't already up to date).
4942 Note that this may occur on both the local inode auth node and
4943 inode replicas, so there are two potential paths. If it is NOT
4944 possible, they need to mark_stale to prevent any possible writes.
4945
4946 - A scatter can be to MIX (potentially writeable) or to SYNC (read
4947 only). Both are opportunities to update the frag accounted stats,
4948 even though only the MIX case is affected by a stale dirfrag.
4949
4950 - Because many scatter/gather cycles can potentially go by without a
4951 frag being able to update its accounted stats (due to being frozen
4952 by exports/refragments in progress), the frag may have (even very)
old stat versions. That's fine. When we do want to update a frag,
we can update its accounted_* stats and the version first.
4955
4956 */
4957
// Journal-completion context for scatter_writebehind(): once the EUpdate
// has been written to the MDS log, hand control back to the Locker via
// scatter_writebehind_finish() to apply the mutation and notify replicas.
class C_Locker_ScatterWB : public LockerLogContext {
  ScatterLock *lock;   // the scatterlock being flushed
  MutationRef mut;     // mutation carrying the projected inode + wrlock
public:
  C_Locker_ScatterWB(Locker *l, ScatterLock *sl, MutationRef& m) :
    LockerLogContext(l), lock(sl), mut(m) {}
  void finish(int r) override {
    locker->scatter_writebehind_finish(lock, mut);
  }
};
4968
/*
 * Flush dirty scattered state back into the auth inode and journal it.
 * Projects a new inode version, folds the scattered (per-dirfrag) data in
 * via finish_scatter_gather_update(), and submits an EUpdate; completion
 * is asynchronous via C_Locker_ScatterWB -> scatter_writebehind_finish().
 * The lock is marked flushing (start_flush) until the journal commits.
 */
void Locker::scatter_writebehind(ScatterLock *lock)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  dout(10) << "scatter_writebehind " << in->get_inode()->mtime << " on " << *lock << " on " << *in << dendl;

  // journal
  MutationRef mut(new MutationImpl());
  mut->ls = mds->mdlog->get_current_segment();

  // forcefully take a wrlock
  lock->get_wrlock(true);
  mut->emplace_lock(lock, MutationImpl::LockOp::WRLOCK);

  in->pre_cow_old_inode(); // avoid cow mayhem

  auto pi = in->project_inode(mut);
  pi.inode->version = in->pre_dirty();

  // assimilate the scattered data into the projected inode, then mark the
  // lock as flushing so replicas know a flush cycle is in progress
  in->finish_scatter_gather_update(lock->get_type(), mut);
  lock->start_flush();

  EUpdate *le = new EUpdate(mds->mdlog, "scatter_writebehind");
  mds->mdlog->start_entry(le);

  mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY);
  mdcache->journal_dirty_inode(mut.get(), &le->metablob, in);

  in->finish_scatter_gather_update_accounted(lock->get_type(), &le->metablob);

  mds->mdlog->submit_entry(le, new C_Locker_ScatterWB(this, lock, mut));
  mds->mdlog->flush();
}
5001
/*
 * Journal-commit completion for scatter_writebehind(): apply the mutation,
 * clear the lock's flushing state, tell replicas (where relevant) that the
 * flush completed, and release the forced wrlock taken by the writebehind.
 */
void Locker::scatter_writebehind_finish(ScatterLock *lock, MutationRef& mut)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  dout(10) << "scatter_writebehind_finish on " << *lock << " on " << *in << dendl;

  mut->apply();

  lock->finish_flush();

  // if replicas may have flushed in a mix->lock state, send another
  // message so they can finish_flush().
  if (in->is_replicated()) {
    switch (lock->get_state()) {
    case LOCK_MIX_LOCK:
    case LOCK_MIX_LOCK2:
    case LOCK_MIX_EXCL:
    case LOCK_MIX_TSYN:
      send_lock_message(lock, LOCK_AC_LOCKFLUSHED);
    }
  }

  // drop the wrlock held by the mutation and clean it up
  drop_locks(mut.get());
  mut->cleanup();

  if (lock->is_stable())
    lock->finish_waiters(ScatterLock::WAIT_STABLE);

  //scatter_eval_gather(lock);
}
5031
/*
 * Evaluate a stable scatterlock on the auth MDS and decide which state it
 * should move toward: MIX when scattering is wanted or the inode is a
 * replicated nestlock, LOCK for an unreplicated nestlock, or SYNC when the
 * inode is not a subtree/export delegation point.  No-op while the parent
 * is freezing/frozen; forces SYNC on a read-only filesystem.
 */
void Locker::scatter_eval(ScatterLock *lock, bool *need_issue)
{
  dout(10) << "scatter_eval " << *lock << " on " << *lock->get_parent() << dendl;

  ceph_assert(lock->get_parent()->is_auth());
  ceph_assert(lock->is_stable());

  if (lock->get_parent()->is_freezing_or_frozen()) {
    dout(20) << " freezing|frozen" << dendl;
    return;
  }

  if (mdcache->is_readonly()) {
    // read-only FS: everything converges on SYNC
    if (lock->get_state() != LOCK_SYNC) {
      dout(10) << "scatter_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
      simple_sync(lock, need_issue);
    }
    return;
  }

  if (!lock->is_rdlocked() &&
      lock->get_state() != LOCK_MIX &&
      lock->get_scatter_wanted()) {
    // someone (e.g. a replica via REQSCATTER) asked for a scatter
    dout(10) << "scatter_eval scatter_wanted, bump to mix " << *lock
	     << " on " << *lock->get_parent() << dendl;
    scatter_mix(lock, need_issue);
    return;
  }

  if (lock->get_type() == CEPH_LOCK_INEST) {
    // in general, we want to keep INEST writable at all times.
    if (!lock->is_rdlocked()) {
      if (lock->get_parent()->is_replicated()) {
	if (lock->get_state() != LOCK_MIX)
	  scatter_mix(lock, need_issue);
      } else {
	if (lock->get_state() != LOCK_LOCK)
	  simple_lock(lock, need_issue);
      }
    }
    return;
  }

  CInode *in = static_cast<CInode*>(lock->get_parent());
  if (!in->has_subtree_or_exporting_dirfrag() || in->is_base()) {
    // i _should_ be sync.
    if (!lock->is_wrlocked() &&
	lock->get_state() != LOCK_SYNC) {
      dout(10) << "scatter_eval no wrlocks|xlocks, not subtree root inode, syncing" << dendl;
      simple_sync(lock, need_issue);
    }
  }
}
5085
5086
5087 /*
5088 * mark a scatterlock to indicate that the dir fnode has some dirty data
5089 */
5090 void Locker::mark_updated_scatterlock(ScatterLock *lock)
5091 {
5092 lock->mark_dirty();
5093 if (lock->get_updated_item()->is_on_list()) {
5094 dout(10) << "mark_updated_scatterlock " << *lock
5095 << " - already on list since " << lock->get_update_stamp() << dendl;
5096 } else {
5097 updated_scatterlocks.push_back(lock->get_updated_item());
5098 utime_t now = ceph_clock_now();
5099 lock->set_update_stamp(now);
5100 dout(10) << "mark_updated_scatterlock " << *lock
5101 << " - added at " << now << dendl;
5102 }
5103 }
5104
5105 /*
5106 * this is called by scatter_tick and LogSegment::try_to_trim() when
5107 * trying to flush dirty scattered data (i.e. updated fnode) back to
5108 * the inode.
5109 *
5110 * we need to lock|scatter in order to push fnode changes into the
5111 * inode.dirstat.
5112 */
/*
 * Nudge a dirty scatterlock so its fnode data gets flushed into the inode.
 * On the auth MDS this cycles the lock state (mix <-> lock/sync) until the
 * flush happens; on a replica it asks the auth to do so via LOCK_AC_NUDGE.
 * If the parent is frozen/freezing or has ambiguous auth, we wait (or
 * requeue when called without a continuation).
 *
 * @param c   optional continuation to run once the lock is stable again
 * @param forcelockchange  force a state change even when not replicated
 *                         (currently only affects the commented-out fast path)
 */
void Locker::scatter_nudge(ScatterLock *lock, MDSContext *c, bool forcelockchange)
{
  CInode *p = static_cast<CInode *>(lock->get_parent());

  if (p->is_frozen() || p->is_freezing()) {
    dout(10) << "scatter_nudge waiting for unfreeze on " << *p << dendl;
    if (c)
      p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, c);
    else if (lock->is_dirty())
      // just requeue. not ideal.. starvation prone..
      updated_scatterlocks.push_back(lock->get_updated_item());
    return;
  }

  if (p->is_ambiguous_auth()) {
    dout(10) << "scatter_nudge waiting for single auth on " << *p << dendl;
    if (c)
      p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, c);
    else if (lock->is_dirty())
      // just requeue. not ideal.. starvation prone..
      updated_scatterlocks.push_back(lock->get_updated_item());
    return;
  }

  if (p->is_auth()) {
    int count = 0;
    while (true) {
      if (lock->is_stable()) {
	// can we do it now?
	//  (only if we're not replicated.. if we are, we really do need
	//   to nudge the lock state!)
	/*
	  actually, even if we're not replicated, we can't stay in MIX, because another mds
	  could discover and replicate us at any time.  if that happens while we're flushing,
	  they end up in MIX but their inode has the old scatterstat version.

	if (!forcelockchange && !lock->get_parent()->is_replicated() && lock->can_wrlock(-1)) {
	  dout(10) << "scatter_nudge auth, propagating " << *lock << " on " << *p << dendl;
	  scatter_writebehind(lock);
	  if (c)
	    lock->add_waiter(SimpleLock::WAIT_STABLE, c);
	  return;
	}
	*/

	if (mdcache->is_readonly()) {
	  // read-only FS: just force SYNC, no write-behind possible
	  if (lock->get_state() != LOCK_SYNC) {
	    dout(10) << "scatter_nudge auth, read-only FS, syncing " << *lock << " on " << *p << dendl;
	    simple_sync(static_cast<ScatterLock*>(lock));
	  }
	  break;
	}

	// adjust lock state; moving out of MIX (or into it) triggers the
	// gather that flushes dirty scattered data
	dout(10) << "scatter_nudge auth, scatter/unscattering " << *lock << " on " << *p << dendl;
	switch (lock->get_type()) {
	case CEPH_LOCK_IFILE:
	  if (p->is_replicated() && lock->get_state() != LOCK_MIX)
	    scatter_mix(static_cast<ScatterLock*>(lock));
	  else if (lock->get_state() != LOCK_LOCK)
	    simple_lock(static_cast<ScatterLock*>(lock));
	  else
	    simple_sync(static_cast<ScatterLock*>(lock));
	  break;

	case CEPH_LOCK_IDFT:
	case CEPH_LOCK_INEST:
	  if (p->is_replicated() && lock->get_state() != LOCK_MIX)
	    scatter_mix(lock);
	  else if (lock->get_state() != LOCK_LOCK)
	    simple_lock(lock);
	  else
	    simple_sync(lock);
	  break;
	default:
	  ceph_abort();
	}
	++count;
	if (lock->is_stable() && count == 2) {
	  dout(10) << "scatter_nudge oh, stable after two cycles." << dendl;
	  // this should only really happen when called via
	  // handle_file_lock due to AC_NUDGE, because the rest of the
	  // time we are replicated or have dirty data and won't get
	  // called.  bailing here avoids an infinite loop.
	  ceph_assert(!c);
	  break;
	}
      } else {
	// unstable: wait for the in-flight transition to finish
	dout(10) << "scatter_nudge auth, waiting for stable " << *lock << " on " << *p << dendl;
	if (c)
	  lock->add_waiter(SimpleLock::WAIT_STABLE, c);
	return;
      }
    }
  } else {
    dout(10) << "scatter_nudge replica, requesting scatter/unscatter of "
	     << *lock << " on " << *p << dendl;
    // request unscatter?
    mds_rank_t auth = lock->get_parent()->authority().first;
    if (!mds->is_cluster_degraded() || mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
      mds->send_message_mds(make_message<MLock>(lock, LOCK_AC_NUDGE, mds->get_nodeid()), auth);
    }

    // wait...
    if (c)
      lock->add_waiter(SimpleLock::WAIT_STABLE, c);

    // also, requeue, in case we had wrong auth or something
    if (lock->is_dirty())
      updated_scatterlocks.push_back(lock->get_updated_item());
  }
}
5225
/*
 * Periodic pass over updated_scatterlocks: nudge every dirty scatterlock
 * whose update stamp is older than mds_scatter_nudge_interval so its data
 * gets flushed back into the inode.  Clean locks are dropped from the
 * list; the initial size snapshot bounds the loop because scatter_nudge()
 * may requeue entries onto the same list.
 */
void Locker::scatter_tick()
{
  dout(10) << "scatter_tick" << dendl;

  // updated
  utime_t now = ceph_clock_now();
  int n = updated_scatterlocks.size();
  while (!updated_scatterlocks.empty()) {
    ScatterLock *lock = updated_scatterlocks.front();

    if (n-- == 0) break;  // scatter_nudge() may requeue; avoid looping

    if (!lock->is_dirty()) {
      // flushed by some other path; just drop it from the queue
      updated_scatterlocks.pop_front();
      dout(10) << " removing from updated_scatterlocks "
	       << *lock << " " << *lock->get_parent() << dendl;
      continue;
    }
    if (now - lock->get_update_stamp() < g_conf()->mds_scatter_nudge_interval)
      break;  // list is in stamp order; the rest are younger still
    updated_scatterlocks.pop_front();
    scatter_nudge(lock, 0);
  }
  // push any journal entries queued by the nudges
  mds->mdlog->flush();
}
5251
5252
/*
 * Move a stable scatterlock toward the TSYN (temporary-sync) state.
 * NOTE: this path is currently disabled — the ceph_abort_msg() below fires
 * unconditionally, so everything after it is effectively dead code kept
 * for reference until the filelock support is completed.
 */
void Locker::scatter_tempsync(ScatterLock *lock, bool *need_issue)
{
  dout(10) << "scatter_tempsync " << *lock
	   << " on " << *lock->get_parent() << dendl;
  ceph_assert(lock->get_parent()->is_auth());
  ceph_assert(lock->is_stable());

  ceph_abort_msg("not fully implemented, at least not for filelock");

  CInode *in = static_cast<CInode *>(lock->get_parent());

  switch (lock->get_state()) {
  case LOCK_SYNC: ceph_abort();   // this shouldn't happen
  case LOCK_LOCK: lock->set_state(LOCK_LOCK_TSYN); break;
  case LOCK_MIX: lock->set_state(LOCK_MIX_TSYN); break;
  default: ceph_abort();
  }

  // count what must drain before we can reach TSYN
  int gather = 0;
  if (lock->is_wrlocked()) {
    if (lock->is_cached())
      invalidate_lock_caches(lock);
    gather++;
  }

  if (lock->get_cap_shift() &&
      in->is_head() &&
      in->issued_caps_need_gather(lock)) {
    // revoke conflicting client caps first
    if (need_issue)
      *need_issue = true;
    else
      issue_caps(in);
    gather++;
  }

  if (lock->get_state() == LOCK_MIX_TSYN &&
      in->is_replicated()) {
    // replicas hold MIX state; gather them to LOCK first
    lock->init_gather();
    send_lock_message(lock, LOCK_AC_LOCK);
    gather++;
  }

  if (gather) {
    in->auth_pin(lock);
  } else {
    // do tempsync
    lock->set_state(LOCK_TSYN);
    lock->finish_waiters(ScatterLock::WAIT_RD|ScatterLock::WAIT_STABLE);
    if (lock->get_cap_shift()) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
    }
  }
}
5309
5310
5311
5312 // ==========================================================================
5313 // local lock
5314
5315 void Locker::local_wrlock_grab(LocalLockC *lock, MutationRef& mut)
5316 {
5317 dout(7) << "local_wrlock_grab on " << *lock
5318 << " on " << *lock->get_parent() << dendl;
5319
5320 ceph_assert(lock->get_parent()->is_auth());
5321 ceph_assert(lock->can_wrlock());
5322 lock->get_wrlock(mut->get_client());
5323
5324 auto it = mut->emplace_lock(lock, MutationImpl::LockOp::WRLOCK);
5325 ceph_assert(it->is_wrlock());
5326 }
5327
5328 bool Locker::local_wrlock_start(LocalLockC *lock, MDRequestRef& mut)
5329 {
5330 dout(7) << "local_wrlock_start on " << *lock
5331 << " on " << *lock->get_parent() << dendl;
5332
5333 ceph_assert(lock->get_parent()->is_auth());
5334 if (lock->can_wrlock()) {
5335 lock->get_wrlock(mut->get_client());
5336 auto it = mut->emplace_lock(lock, MutationImpl::LockOp::WRLOCK);
5337 ceph_assert(it->is_wrlock());
5338 return true;
5339 } else {
5340 lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
5341 return false;
5342 }
5343 }
5344
5345 void Locker::local_wrlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut)
5346 {
5347 ceph_assert(it->is_wrlock());
5348 LocalLockC *lock = static_cast<LocalLockC*>(it->lock);
5349 dout(7) << "local_wrlock_finish on " << *lock
5350 << " on " << *lock->get_parent() << dendl;
5351 lock->put_wrlock();
5352 mut->locks.erase(it);
5353 if (lock->get_num_wrlocks() == 0) {
5354 lock->finish_waiters(SimpleLock::WAIT_STABLE |
5355 SimpleLock::WAIT_WR |
5356 SimpleLock::WAIT_RD);
5357 }
5358 }
5359
5360 bool Locker::local_xlock_start(LocalLockC *lock, MDRequestRef& mut)
5361 {
5362 dout(7) << "local_xlock_start on " << *lock
5363 << " on " << *lock->get_parent() << dendl;
5364
5365 ceph_assert(lock->get_parent()->is_auth());
5366 if (!lock->can_xlock_local()) {
5367 lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
5368 return false;
5369 }
5370
5371 lock->get_xlock(mut, mut->get_client());
5372 mut->emplace_lock(lock, MutationImpl::LockOp::XLOCK);
5373 return true;
5374 }
5375
5376 void Locker::local_xlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut)
5377 {
5378 ceph_assert(it->is_xlock());
5379 LocalLockC *lock = static_cast<LocalLockC*>(it->lock);
5380 dout(7) << "local_xlock_finish on " << *lock
5381 << " on " << *lock->get_parent() << dendl;
5382 lock->put_xlock();
5383 mut->locks.erase(it);
5384
5385 lock->finish_waiters(SimpleLock::WAIT_STABLE |
5386 SimpleLock::WAIT_WR |
5387 SimpleLock::WAIT_RD);
5388 }
5389
5390
5391
5392 // ==========================================================================
5393 // file lock
5394
5395
/*
 * Evaluate a stable file (IFILE) lock on the auth MDS and pick the best
 * target state based on what caps clients want/hold:
 *   EXCL -> MIX/SYNC when the loner no longer justifies exclusivity;
 *   *    -> EXCL when there is a target loner wanting write-ish caps;
 *   *    -> MIX when multiple writers (or a scatter) are wanted;
 *   *    -> SYNC when nobody wants write caps.
 * Also kicks file recovery if the inode still needs it.
 */
void Locker::file_eval(ScatterLock *lock, bool *need_issue)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  int loner_wanted, other_wanted;
  int wanted = in->get_caps_wanted(&loner_wanted, &other_wanted, CEPH_CAP_SFILE);
  dout(7) << "file_eval wanted=" << gcap_string(wanted)
	  << " loner_wanted=" << gcap_string(loner_wanted)
	  << " other_wanted=" << gcap_string(other_wanted)
	  << "  filelock=" << *lock << " on " << *lock->get_parent()
	  << dendl;

  ceph_assert(lock->get_parent()->is_auth());
  ceph_assert(lock->is_stable());

  if (lock->get_parent()->is_freezing_or_frozen())
    return;

  if (mdcache->is_readonly()) {
    // read-only FS: everything converges on SYNC
    if (lock->get_state() != LOCK_SYNC) {
      dout(10) << "file_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
      simple_sync(lock, need_issue);
    }
    return;
  }

  // excl -> *?
  if (lock->get_state() == LOCK_EXCL) {
    dout(20) << " is excl" << dendl;
    int loner_issued, other_issued, xlocker_issued;
    in->get_caps_issued(&loner_issued, &other_issued, &xlocker_issued, CEPH_CAP_SFILE);
    dout(7) << "file_eval loner_issued=" << gcap_string(loner_issued)
            << " other_issued=" << gcap_string(other_issued)
	    << " xlocker_issued=" << gcap_string(xlocker_issued)
	    << dendl;
    if (!((loner_wanted|loner_issued) & (CEPH_CAP_GEXCL|CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) ||
	(other_wanted & (CEPH_CAP_GEXCL|CEPH_CAP_GWR|CEPH_CAP_GRD)) ||
	(in->is_dir() && in->multiple_nonstale_caps())) {  // FIXME.. :/
      dout(20) << " should lose it" << dendl;
      // we should lose it.
      //  loner  other   want
      //  R      R       SYNC
      //  R      R|W     MIX
      //  R      W       MIX
      //  R|W    R       MIX
      //  R|W    R|W     MIX
      //  R|W    W       MIX
      //  W      R       MIX
      //  W      R|W     MIX
      //  W      W       MIX
      // -> any writer means MIX; RD doesn't matter.
      if (((other_wanted|loner_wanted) & CEPH_CAP_GWR) ||
	  lock->is_waiter_for(SimpleLock::WAIT_WR))
	scatter_mix(lock, need_issue);
      else if (!lock->is_wrlocked())   // let excl wrlocks drain first
	simple_sync(lock, need_issue);
      else
	dout(10) << " waiting for wrlock to drain" << dendl;
    }
  }

  // * -> excl?
  else if (lock->get_state() != LOCK_EXCL &&
	   !lock->is_rdlocked() &&
	   //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
	   in->get_target_loner() >= 0 &&
	   (in->is_dir() ?
	    !in->has_subtree_or_exporting_dirfrag() :
	    (wanted & (CEPH_CAP_GEXCL|CEPH_CAP_GWR|CEPH_CAP_GBUFFER)))) {
    dout(7) << "file_eval stable, bump to loner " << *lock
	    << " on " << *lock->get_parent() << dendl;
    file_excl(lock, need_issue);
  }

  // * -> mixed?
  else if (lock->get_state() != LOCK_MIX &&
	   !lock->is_rdlocked() &&
	   //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
	   (lock->get_scatter_wanted() ||
	    (in->get_target_loner() < 0 && (wanted & CEPH_CAP_GWR)))) {
    dout(7) << "file_eval stable, bump to mixed " << *lock
	    << " on " << *lock->get_parent() << dendl;
    scatter_mix(lock, need_issue);
  }

  // * -> sync?
  else if (lock->get_state() != LOCK_SYNC &&
	   !lock->is_wrlocked() &&   // drain wrlocks first!
	   !lock->is_waiter_for(SimpleLock::WAIT_WR) &&
	   !(wanted & CEPH_CAP_GWR) &&
	   !((lock->get_state() == LOCK_MIX) &&
	     in->is_dir() && in->has_subtree_or_exporting_dirfrag())  // if we are a delegation point, stay where we are
	   //((wanted & CEPH_CAP_RD) ||
	   //in->is_replicated() ||
	   //lock->is_leased() ||
	   //(!loner && lock->get_state() == LOCK_EXCL)) &&
	   ) {
    dout(7) << "file_eval stable, bump to sync " << *lock
	    << " on " << *lock->get_parent() << dendl;
    simple_sync(lock, need_issue);
  }
  else if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
    // can't change state, but the file still needs size/mtime recovery
    mds->mdcache->queue_file_recover(in);
    mds->mdcache->do_file_recover();
  }
}
5501
5502
5503
/*
 * Move a stable scatterlock on the auth MDS into the MIX (scattered,
 * multi-writer) state.  From LOCK this is immediate (replicas are already
 * LOCK); from SYNC/EXCL/XSYN/TSYN we enter an intermediate *_MIX state and
 * gather rdlocks, leases, conflicting client caps, and (from SYNC)
 * replica acks before completing.
 */
void Locker::scatter_mix(ScatterLock *lock, bool *need_issue)
{
  dout(7) << "scatter_mix " << *lock << " on " << *lock->get_parent() << dendl;

  CInode *in = static_cast<CInode*>(lock->get_parent());
  ceph_assert(in->is_auth());
  ceph_assert(lock->is_stable());

  if (lock->get_state() == LOCK_LOCK) {
    // fast path: no gather needed, scatter immediately
    in->start_scatter(lock);
    if (in->is_replicated()) {
      // data
      bufferlist softdata;
      lock->encode_locked_state(softdata);

      // bcast to replicas
      send_lock_message(lock, LOCK_AC_MIX, softdata);
    }

    // change lock
    lock->set_state(LOCK_MIX);
    lock->clear_scatter_wanted();
    if (lock->get_cap_shift()) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
    }
  } else {
    // gather?
    switch (lock->get_state()) {
    case LOCK_SYNC: lock->set_state(LOCK_SYNC_MIX); break;
    case LOCK_EXCL: lock->set_state(LOCK_EXCL_MIX); break;
    case LOCK_XSYN: lock->set_state(LOCK_XSYN_MIX); break;
    case LOCK_TSYN: lock->set_state(LOCK_TSYN_MIX); break;
    default: ceph_abort();
    }

    int gather = 0;
    if (lock->is_rdlocked()) {
      if (lock->is_cached())
	invalidate_lock_caches(lock);
      gather++;
    }
    if (in->is_replicated()) {
      if (lock->get_state() == LOCK_SYNC_MIX) { // for the rest states, replicas are already LOCK
	send_lock_message(lock, LOCK_AC_MIX);
	lock->init_gather();
	gather++;
      }
    }
    if (lock->is_leased()) {
      // client leases are incompatible with MIX; revoke them
      revoke_client_leases(lock);
      gather++;
    }
    if (lock->get_cap_shift() &&
	in->is_head() &&
	in->issued_caps_need_gather(lock)) {
      // revoke conflicting client caps first
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
      gather++;
    }
    bool need_recover = false;
    if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
      mds->mdcache->queue_file_recover(in);
      need_recover = true;
      gather++;
    }

    if (gather) {
      // stay in the intermediate state; pin until the gather completes
      lock->get_parent()->auth_pin(lock);
      if (need_recover)
	mds->mdcache->do_file_recover();
    } else {
      // nothing to wait for: complete the transition now
      in->start_scatter(lock);
      lock->set_state(LOCK_MIX);
      lock->clear_scatter_wanted();
      if (in->is_replicated()) {
	bufferlist softdata;
	lock->encode_locked_state(softdata);
	send_lock_message(lock, LOCK_AC_MIX, softdata);
      }
      if (lock->get_cap_shift()) {
	if (need_issue)
	  *need_issue = true;
	else
	  issue_caps(in);
      }
    }
  }
}
5597
5598
/*
 * Move a stable file lock toward the EXCL (loner) state so a single client
 * can hold exclusive/buffered caps.  Enters a *_EXCL intermediate state
 * and gathers rdlocks, wrlocks, replica acks (unless replicas are already
 * LOCK), leases, and conflicting client caps before completing.
 */
void Locker::file_excl(ScatterLock *lock, bool *need_issue)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  dout(7) << "file_excl " << *lock << " on " << *lock->get_parent() << dendl;

  ceph_assert(in->is_auth());
  ceph_assert(lock->is_stable());

  ceph_assert((in->get_loner() >= 0 && in->get_mds_caps_wanted().empty()) ||
	      (lock->get_state() == LOCK_XSYN));  // must do xsyn -> excl -> <anything else>

  switch (lock->get_state()) {
  case LOCK_SYNC: lock->set_state(LOCK_SYNC_EXCL); break;
  case LOCK_MIX: lock->set_state(LOCK_MIX_EXCL); break;
  case LOCK_LOCK: lock->set_state(LOCK_LOCK_EXCL); break;
  case LOCK_XSYN: lock->set_state(LOCK_XSYN_EXCL); break;
  default: ceph_abort();
  }
  int gather = 0;

  if (lock->is_rdlocked())
    gather++;
  if (lock->is_wrlocked())
    gather++;
  if (gather && lock->is_cached())
    invalidate_lock_caches(lock);

  if (in->is_replicated() &&
      lock->get_state() != LOCK_LOCK_EXCL &&
      lock->get_state() != LOCK_XSYN_EXCL) {  // if we were lock, replicas are already lock.
    send_lock_message(lock, LOCK_AC_LOCK);
    lock->init_gather();
    gather++;
  }
  if (lock->is_leased()) {
    revoke_client_leases(lock);
    gather++;
  }
  if (in->is_head() &&
      in->issued_caps_need_gather(lock)) {
    // revoke conflicting client caps first
    if (need_issue)
      *need_issue = true;
    else
      issue_caps(in);
    gather++;
  }
  bool need_recover = false;
  if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
    mds->mdcache->queue_file_recover(in);
    need_recover = true;
    gather++;
  }

  if (gather) {
    // pin the inode until the gather completes
    lock->get_parent()->auth_pin(lock);
    if (need_recover)
      mds->mdcache->do_file_recover();
  } else {
    lock->set_state(LOCK_EXCL);
    if (need_issue)
      *need_issue = true;
    else
      issue_caps(in);
  }
}
5664
/*
 * Move a file lock from EXCL to XSYN (exclusive-sync, loner keeps caps but
 * other readers are blocked).  Only valid from EXCL with a loner present;
 * gathers wrlocks and conflicting caps via the EXCL_XSYN intermediate
 * state before completing.
 */
void Locker::file_xsyn(SimpleLock *lock, bool *need_issue)
{
  dout(7) << "file_xsyn on " << *lock << " on " << *lock->get_parent() << dendl;
  CInode *in = static_cast<CInode *>(lock->get_parent());
  ceph_assert(in->is_auth());
  ceph_assert(in->get_loner() >= 0 && in->get_mds_caps_wanted().empty());

  switch (lock->get_state()) {
  case LOCK_EXCL: lock->set_state(LOCK_EXCL_XSYN); break;
  default: ceph_abort();
  }

  int gather = 0;
  if (lock->is_wrlocked()) {
    if (lock->is_cached())
      invalidate_lock_caches(lock);
    gather++;
  }

  if (in->is_head() &&
      in->issued_caps_need_gather(lock)) {
    // revoke conflicting client caps first
    if (need_issue)
      *need_issue = true;
    else
      issue_caps(in);
    gather++;
  }

  if (gather) {
    // pin until the gather completes
    lock->get_parent()->auth_pin(lock);
  } else {
    lock->set_state(LOCK_XSYN);
    lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
    if (need_issue)
      *need_issue = true;
    else
      issue_caps(in);
  }
}
5704
/*
 * Transition a file lock from PRE_SCAN to SCAN so file size/mtime recovery
 * can run.  If clients hold caps that conflict, revoke them and defer the
 * recovery (STATE_NEEDSRECOVER); otherwise queue the recovery immediately.
 * Only called from MDCache::start_files_to_recover().
 */
void Locker::file_recover(ScatterLock *lock)
{
  CInode *in = static_cast<CInode *>(lock->get_parent());
  dout(7) << "file_recover " << *lock << " on " << *in << dendl;

  ceph_assert(in->is_auth());
  //assert(lock->is_stable());
  ceph_assert(lock->get_state() == LOCK_PRE_SCAN); // only called from MDCache::start_files_to_recover()

  int gather = 0;

  /*
  if (in->is_replicated()
      lock->get_sm()->states[oldstate].replica_state != LOCK_LOCK) {
    send_lock_message(lock, LOCK_AC_LOCK);
    lock->init_gather();
    gather++;
  }
  */
  if (in->is_head() &&
      in->issued_caps_need_gather(lock)) {
    // revoke conflicting client caps before scanning
    issue_caps(in);
    gather++;
  }

  lock->set_state(LOCK_SCAN);
  if (gather)
    in->state_set(CInode::STATE_NEEDSRECOVER);
  else
    mds->mdcache->queue_file_recover(in);
}
5736
5737
5738 // messenger
// messenger
/*
 * Handle an MLock message for a file/scatter lock.  Replica-side actions
 * (SYNC/LOCK/MIX/LOCKFLUSHED) apply the auth's state change locally and
 * ack; auth-side actions (*ACK) record gather completions; request
 * actions (REQSCATTER/REQUNSCATTER/REQRDLOCK/NUDGE) let replicas ask the
 * auth to change state.  Messages are dropped while this MDS is rejoining
 * and the inode is still being rejoined.
 */
void Locker::handle_file_lock(ScatterLock *lock, const cref_t<MLock> &m)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  int from = m->get_asker();

  if (mds->is_rejoin()) {
    if (in->is_rejoining()) {
      dout(7) << "handle_file_lock still rejoining " << *in
	      << ", dropping " << *m << dendl;
      return;
    }
  }

  dout(7) << "handle_file_lock a=" << lock->get_lock_action_name(m->get_action())
	  << " on " << *lock
	  << " from mds." << from << " "
	  << *in << dendl;

  bool caps = lock->get_cap_shift();

  switch (m->get_action()) {
    // -- replica --
  case LOCK_AC_SYNC:
    ceph_assert(lock->get_state() == LOCK_LOCK ||
		lock->get_state() == LOCK_MIX ||
		lock->get_state() == LOCK_MIX_SYNC2);

    if (lock->get_state() == LOCK_MIX) {
      // must flush our dirty scattered data first; ack comes via eval_gather
      lock->set_state(LOCK_MIX_SYNC);
      eval_gather(lock, true);
      if (lock->is_unstable_and_locked()) {
	if (lock->is_cached())
	  invalidate_lock_caches(lock);
	mds->mdlog->flush();
      }
      break;
    }

    (static_cast<ScatterLock *>(lock))->finish_flush();
    (static_cast<ScatterLock *>(lock))->clear_flushed();

    // ok
    lock->decode_locked_state(m->get_data());
    lock->set_state(LOCK_SYNC);

    // hold a temporary rdlock so caps/waiters see a stable SYNC lock
    lock->get_rdlock();
    if (caps)
      issue_caps(in);
    lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
    lock->put_rdlock();
    break;

  case LOCK_AC_LOCK:
    switch (lock->get_state()) {
    case LOCK_SYNC: lock->set_state(LOCK_SYNC_LOCK); break;
    case LOCK_MIX: lock->set_state(LOCK_MIX_LOCK); break;
    default: ceph_abort();
    }

    // gather local rdlocks/caps; the LOCKACK is sent when the gather finishes
    eval_gather(lock, true);
    if (lock->is_unstable_and_locked()) {
      if (lock->is_cached())
	invalidate_lock_caches(lock);
      mds->mdlog->flush();
    }

    break;

  case LOCK_AC_LOCKFLUSHED:
    (static_cast<ScatterLock *>(lock))->finish_flush();
    (static_cast<ScatterLock *>(lock))->clear_flushed();
    // wake up scatter_nudge waiters
    if (lock->is_stable())
      lock->finish_waiters(SimpleLock::WAIT_STABLE);
    break;

  case LOCK_AC_MIX:
    ceph_assert(lock->get_state() == LOCK_SYNC ||
		lock->get_state() == LOCK_LOCK ||
		lock->get_state() == LOCK_SYNC_MIX2);

    if (lock->get_state() == LOCK_SYNC) {
      // MIXED
      lock->set_state(LOCK_SYNC_MIX);
      eval_gather(lock, true);
      if (lock->is_unstable_and_locked()) {
	if (lock->is_cached())
	  invalidate_lock_caches(lock);
	mds->mdlog->flush();
      }
      break;
    }

    // ok
    lock->set_state(LOCK_MIX);
    lock->decode_locked_state(m->get_data());

    if (caps)
      issue_caps(in);

    lock->finish_waiters(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE);
    break;


    // -- auth --
  case LOCK_AC_LOCKACK:
    ceph_assert(lock->get_state() == LOCK_SYNC_LOCK ||
		lock->get_state() == LOCK_MIX_LOCK ||
		lock->get_state() == LOCK_MIX_LOCK2 ||
		lock->get_state() == LOCK_MIX_EXCL ||
		lock->get_state() == LOCK_SYNC_EXCL ||
		lock->get_state() == LOCK_SYNC_MIX ||
		lock->get_state() == LOCK_MIX_TSYN);
    ceph_assert(lock->is_gathering(from));
    lock->remove_gather(from);

    if (lock->get_state() == LOCK_MIX_LOCK ||
	lock->get_state() == LOCK_MIX_LOCK2 ||
	lock->get_state() == LOCK_MIX_EXCL ||
	lock->get_state() == LOCK_MIX_TSYN) {
      lock->decode_locked_state(m->get_data());
      // replica is waiting for AC_LOCKFLUSHED, eval_gather() should not
      // delay calling scatter_writebehind().
      lock->clear_flushed();
    }

    if (lock->is_gathering()) {
      dout(7) << "handle_file_lock " << *in << " from " << from
	      << ", still gathering " << lock->get_gather_set() << dendl;
    } else {
      dout(7) << "handle_file_lock " << *in << " from " << from
	      << ", last one" << dendl;
      eval_gather(lock);
    }
    break;

  case LOCK_AC_SYNCACK:
    ceph_assert(lock->get_state() == LOCK_MIX_SYNC);
    ceph_assert(lock->is_gathering(from));
    lock->remove_gather(from);

    lock->decode_locked_state(m->get_data());

    if (lock->is_gathering()) {
      dout(7) << "handle_file_lock " << *in << " from " << from
	      << ", still gathering " << lock->get_gather_set() << dendl;
    } else {
      dout(7) << "handle_file_lock " << *in << " from " << from
	      << ", last one" << dendl;
      eval_gather(lock);
    }
    break;

  case LOCK_AC_MIXACK:
    ceph_assert(lock->get_state() == LOCK_SYNC_MIX);
    ceph_assert(lock->is_gathering(from));
    lock->remove_gather(from);

    if (lock->is_gathering()) {
      dout(7) << "handle_file_lock " << *in << " from " << from
	      << ", still gathering " << lock->get_gather_set() << dendl;
    } else {
      dout(7) << "handle_file_lock " << *in << " from " << from
	      << ", last one" << dendl;
      eval_gather(lock);
    }
    break;


    // requests....
  case LOCK_AC_REQSCATTER:
    if (lock->is_stable()) {
      /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
       *  because the replica should be holding an auth_pin if they're
       *  doing this (and thus, we are freezing, not frozen, and indefinite
       *  starvation isn't an issue).
       */
      dout(7) << "handle_file_lock got scatter request on " << *lock
	      << " on " << *lock->get_parent() << dendl;
      if (lock->get_state() != LOCK_MIX)  // i.e., the reqscatter didn't race with an actual mix/scatter
	scatter_mix(lock);
    } else {
      dout(7) << "handle_file_lock got scatter request, !stable, marking scatter_wanted on " << *lock
	      << " on " << *lock->get_parent() << dendl;
      lock->set_scatter_wanted();
    }
    break;

  case LOCK_AC_REQUNSCATTER:
    if (lock->is_stable()) {
      /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
       *  because the replica should be holding an auth_pin if they're
       *  doing this (and thus, we are freezing, not frozen, and indefinite
       *  starvation isn't an issue).
       */
      dout(7) << "handle_file_lock got unscatter request on " << *lock
	      << " on " << *lock->get_parent() << dendl;
      if (lock->get_state() == LOCK_MIX)  // i.e., the reqscatter didn't race with an actual mix/scatter
	simple_lock(lock);  // FIXME tempsync?
    } else {
      dout(7) << "handle_file_lock ignoring unscatter request on " << *lock
	      << " on " << *lock->get_parent() << dendl;
      lock->set_unscatter_wanted();
    }
    break;

  case LOCK_AC_REQRDLOCK:
    handle_reqrdlock(lock, m);
    break;

  case LOCK_AC_NUDGE:
    if (!lock->get_parent()->is_auth()) {
      dout(7) << "handle_file_lock IGNORING nudge on non-auth " << *lock
	      << " on " << *lock->get_parent() << dendl;
    } else if (!lock->get_parent()->is_replicated()) {
      dout(7) << "handle_file_lock IGNORING nudge on non-replicated " << *lock
	      << " on " << *lock->get_parent() << dendl;
    } else {
      dout(7) << "handle_file_lock trying nudge on " << *lock
	      << " on " << *lock->get_parent() << dendl;
      scatter_nudge(lock, 0, true);
      mds->mdlog->flush();
    }
    break;

  default:
    ceph_abort();
  }
}