]> git.proxmox.com Git - ceph.git/blob - ceph/src/mds/Locker.cc
bacc79a10381d63bdfc29cb1c0a357a18e4b05a1
[ceph.git] / ceph / src / mds / Locker.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15
16 #include "CDir.h"
17 #include "CDentry.h"
18 #include "CInode.h"
19 #include "common/config.h"
20 #include "events/EOpen.h"
21 #include "events/EUpdate.h"
22 #include "Locker.h"
23 #include "MDBalancer.h"
24 #include "MDCache.h"
25 #include "MDLog.h"
26 #include "MDSRank.h"
27 #include "MDSMap.h"
28 #include "messages/MInodeFileCaps.h"
29 #include "messages/MMDSPeerRequest.h"
30 #include "Migrator.h"
31 #include "msg/Messenger.h"
32 #include "osdc/Objecter.h"
33
34 #define dout_subsys ceph_subsys_mds
35 #undef dout_prefix
36 #define dout_context g_ceph_context
37 #define dout_prefix _prefix(_dout, mds)
38
39 using namespace std;
40
// Debug-output prefix for this subsystem: "mds.<rank>.locker ".
static ostream& _prefix(std::ostream *_dout, MDSRank *mds) {
  return *_dout << "mds." << mds->get_nodeid() << ".locker ";
}
44
45
/**
 * Base class for Locker-owned completion contexts.  Supplies the
 * MDSRank via the owning Locker so subclasses only implement finish().
 */
class LockerContext : public MDSContext {
protected:
  Locker *locker;  // owning Locker; never null (asserted in ctor)
  MDSRank *get_mds() override
  {
    return locker->mds;
  }

public:
  explicit LockerContext(Locker *locker_) : locker(locker_) {
    ceph_assert(locker != NULL);
  }
};
59
/**
 * Base class for Locker-owned journal (MDLog) completion contexts;
 * same shape as LockerContext but derived from MDSLogContextBase.
 */
class LockerLogContext : public MDSLogContextBase {
protected:
  Locker *locker;  // owning Locker; never null (asserted in ctor)
  MDSRank *get_mds() override
  {
    return locker->mds;
  }

public:
  explicit LockerLogContext(Locker *locker_) : locker(locker_) {
    ceph_assert(locker != NULL);
  }
};
73
// Construct the Locker; need_snapflush_inodes is an intrusive list keyed
// off CInode::item_caps.
Locker::Locker(MDSRank *m, MDCache *c) :
  need_snapflush_inodes(member_offset(CInode, item_caps)), mds(m), mdcache(c) {}
76
77
78 void Locker::dispatch(const cref_t<Message> &m)
79 {
80
81 switch (m->get_type()) {
82 // inter-mds locking
83 case MSG_MDS_LOCK:
84 handle_lock(ref_cast<MLock>(m));
85 break;
86 // inter-mds caps
87 case MSG_MDS_INODEFILECAPS:
88 handle_inode_file_caps(ref_cast<MInodeFileCaps>(m));
89 break;
90 // client sync
91 case CEPH_MSG_CLIENT_CAPS:
92 handle_client_caps(ref_cast<MClientCaps>(m));
93 break;
94 case CEPH_MSG_CLIENT_CAPRELEASE:
95 handle_client_cap_release(ref_cast<MClientCapRelease>(m));
96 break;
97 case CEPH_MSG_CLIENT_LEASE:
98 handle_client_lease(ref_cast<MClientLease>(m));
99 break;
100 default:
101 derr << "locker unknown message " << m->get_type() << dendl;
102 ceph_abort_msg("locker unknown message");
103 }
104 }
105
// Periodic upkeep: run the scatter-lock and capability timers.
void Locker::tick()
{
  scatter_tick();
  caps_tick();
}
111
112 /*
113 * locks vs rejoin
114 *
115 *
116 *
117 */
118
119 void Locker::send_lock_message(SimpleLock *lock, int msg)
120 {
121 for (const auto &it : lock->get_parent()->get_replicas()) {
122 if (mds->is_cluster_degraded() &&
123 mds->mdsmap->get_state(it.first) < MDSMap::STATE_REJOIN)
124 continue;
125 auto m = make_message<MLock>(lock, msg, mds->get_nodeid());
126 mds->send_message_mds(m, it.first);
127 }
128 }
129
130 void Locker::send_lock_message(SimpleLock *lock, int msg, const bufferlist &data)
131 {
132 for (const auto &it : lock->get_parent()->get_replicas()) {
133 if (mds->is_cluster_degraded() &&
134 mds->mdsmap->get_state(it.first) < MDSMap::STATE_REJOIN)
135 continue;
136 auto m = make_message<MLock>(lock, msg, mds->get_nodeid());
137 m->set_data(data);
138 mds->send_message_mds(m, it.first);
139 }
140 }
141
/**
 * Try to rdlock the snaplock (and optionally policylock) of @in and all
 * of its ancestors up to the root, without blocking.  On success,
 * records the root ino and ancestry depth in mdr->dir_root[n] /
 * mdr->dir_depth[n] and returns true.  On failure, a retry waiter has
 * been queued on the blocking lock, all locks and local auth pins held
 * by @mdr are dropped, and false is returned.
 *
 * @param in          starting inode; locks are taken walking up to root
 * @param mdr         request taking the locks
 * @param n           slot index for dir_root/dir_depth (must be 0 when
 *                    want_layout is set)
 * @param want_layout also rdlock policylocks until a projected layout is
 *                    found, copying it into mdr->dir_layout
 */
bool Locker::try_rdlock_snap_layout(CInode *in, MDRequestRef& mdr,
				    int n, bool want_layout)
{
  dout(10) << __func__ << " " << *mdr << " " << *in << dendl;
  // rdlock ancestor snaps
  inodeno_t root;
  int depth = -1;
  bool found_locked = false;  // already held a snaplock rdlock at/above here
  bool found_layout = false;  // nearest projected layout already captured

  if (want_layout)
    ceph_assert(n == 0);

  client_t client = mdr->get_client();

  CInode *t = in;
  while (true) {
    ++depth;
    // once we find a snaplock we already hold, all ancestors above it
    // are assumed locked too, so stop taking snaplocks
    if (!found_locked && mdr->is_rdlocked(&t->snaplock))
      found_locked = true;

    if (!found_locked) {
      if (!t->snaplock.can_rdlock(client)) {
	t->snaplock.add_waiter(SimpleLock::WAIT_RD, new C_MDS_RetryRequest(mdcache, mdr));
	goto failed;
      }
      t->snaplock.get_rdlock();
      mdr->locks.emplace(&t->snaplock, MutationImpl::LockOp::RDLOCK);
      dout(20) << " got rdlock on " << t->snaplock << " " << *t << dendl;
    }
    if (want_layout && !found_layout) {
      if (!mdr->is_rdlocked(&t->policylock)) {
	if (!t->policylock.can_rdlock(client)) {
	  t->policylock.add_waiter(SimpleLock::WAIT_RD, new C_MDS_RetryRequest(mdcache, mdr));
	  goto failed;
	}
	t->policylock.get_rdlock();
	mdr->locks.emplace(&t->policylock, MutationImpl::LockOp::RDLOCK);
	dout(20) << " got rdlock on " << t->policylock << " " << *t << dendl;
      }
      // the nearest ancestor with an explicit layout wins
      if (t->get_projected_inode()->has_layout()) {
	mdr->dir_layout = t->get_projected_inode()->layout;
	found_layout = true;
      }
    }
    CDentry* pdn = t->get_projected_parent_dn();
    if (!pdn) {
      // reached the root of this tree
      root = t->ino();
      break;
    }
    t = pdn->get_dir()->get_inode();
  }

  mdr->dir_root[n] = root;
  mdr->dir_depth[n] = depth;
  return true;

failed:
  dout(10) << __func__ << " failed" << dendl;

  // back out everything so the queued retry starts from a clean slate
  drop_locks(mdr.get(), nullptr);
  mdr->drop_local_auth_pins();
  return false;
}
206
/**
 * RAII helper: on scope exit, record @message as an event on the
 * MDRequest (unless mark_event was cleared).  Callers update `message`
 * as they progress so every early return leaves an accurate
 * "why we stopped here" note on the request.
 */
struct MarkEventOnDestruct {
  MDRequestRef& mdr;          // request to annotate at destruction
  std::string_view message;   // event text recorded at scope exit
  bool mark_event;            // set false to suppress the annotation
  MarkEventOnDestruct(MDRequestRef& _mdr, std::string_view _message) :
    mdr(_mdr),
    message(_message),
    mark_event(true) {}
  ~MarkEventOnDestruct() {
    if (mark_event)
      mdr->mark_event(message);
  }
};
220
/* Acquire (or make progress toward acquiring) every lock in @lov on
 * behalf of @mdr.
 *
 * Phases:
 *   1. scan the lock vector, deciding which parent objects must be
 *      auth-pinned and augmenting xlocks with version locks;
 *   2. auth-pin local objects (or bail and wait if any can't be pinned),
 *      and send remote auth-pin requests where needed;
 *   3. take the locks in sorted order, waiting on the first one that
 *      can't be acquired immediately.
 *
 * If this function returns false, the mdr has been placed
 * on the appropriate wait list (and any partial state dropped as
 * required); the request will be retried later. */
bool Locker::acquire_locks(MDRequestRef& mdr,
			   MutationImpl::LockOpVec& lov,
			   CInode *auth_pin_freeze,
			   bool auth_pin_nonblocking)
{
  dout(10) << "acquire_locks " << *mdr << dendl;

  // records "failed to ..." on the request at every early return
  MarkEventOnDestruct marker(mdr, "failed to acquire_locks");

  client_t client = mdr->get_client();

  set<MDSCacheObject*> mustpin;  // items to authpin
  if (auth_pin_freeze)
    mustpin.insert(auth_pin_freeze);

  // xlocks
  for (size_t i = 0; i < lov.size(); ++i) {
    auto& p = lov[i];
    SimpleLock *lock = p.lock;
    MDSCacheObject *object = lock->get_parent();

    if (p.is_xlock()) {
      if ((lock->get_type() == CEPH_LOCK_ISNAP ||
	   lock->get_type() == CEPH_LOCK_IPOLICY) &&
	  mds->is_cluster_degraded() &&
	  mdr->is_leader() &&
	  !mdr->is_queued_for_replay()) {
	// waiting for recovering mds, to guarantee replayed requests and mksnap/setlayout
	// get processed in proper order.
	bool wait = false;
	if (object->is_auth()) {
	  if (!mdr->is_xlocked(lock)) {
	    set<mds_rank_t> ls;
	    object->list_replicas(ls);
	    for (auto m : ls) {
	      if (mds->mdsmap->get_state(m) < MDSMap::STATE_ACTIVE) {
		wait = true;
		break;
	      }
	    }
	  }
	} else {
	  // if the lock is the latest locked one, it's possible that peer mds got the lock
	  // while there are recovering mds.
	  if (!mdr->is_xlocked(lock) || mdr->is_last_locked(lock))
	    wait = true;
	}
	if (wait) {
	  dout(10) << " must xlock " << *lock << " " << *object
		   << ", waiting for cluster recovered" << dendl;
	  mds->locker->drop_locks(mdr.get(), NULL);
	  mdr->drop_local_auth_pins();
	  mds->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache, mdr));
	  return false;
	}
      }

      dout(20) << " must xlock " << *lock << " " << *object << dendl;

      mustpin.insert(object);

      // augment xlock with a versionlock?
      if (lock->get_type() == CEPH_LOCK_DN) {
	CDentry *dn = static_cast<CDentry*>(object);
	if (!dn->is_auth())
	  continue;
	if (mdr->is_leader()) {
	  // leader.  wrlock versionlock so we can pipeline dentry updates to journal.
	  lov.add_wrlock(&dn->versionlock, i + 1);
	} else {
	  // peer.  exclusively lock the dentry version (i.e. block other journal updates).
	  //  this makes rollback safe.
	  lov.add_xlock(&dn->versionlock, i + 1);
	}
      }
      if (lock->get_type() >= CEPH_LOCK_IFIRST && lock->get_type() != CEPH_LOCK_IVERSION) {
	// inode version lock?
	CInode *in = static_cast<CInode*>(object);
	if (!in->is_auth())
	  continue;
	if (mdr->is_leader()) {
	  // leader.  wrlock versionlock so we can pipeline inode updates to journal.
	  lov.add_wrlock(&in->versionlock, i + 1);
	} else {
	  // peer.  exclusively lock the inode version (i.e. block other journal updates).
	  //  this makes rollback safe.
	  lov.add_xlock(&in->versionlock, i + 1);
	}
      }
    } else if (p.is_wrlock()) {
      dout(20) << " must wrlock " << *lock << " " << *object << dendl;
      client_t _client = p.is_state_pin() ? lock->get_excl_client() : client;
      if (object->is_auth()) {
	mustpin.insert(object);
      } else if (!object->is_auth() &&
		 !lock->can_wrlock(_client) && // we might have to request a scatter
		 !mdr->is_peer()) { // if we are peer (remote_wrlock), the leader already authpinned
	dout(15) << " will also auth_pin " << *object
		 << " in case we need to request a scatter" << dendl;
	mustpin.insert(object);
      }
    } else if (p.is_remote_wrlock()) {
      dout(20) << " must remote_wrlock on mds." << p.wrlock_target << " "
	       << *lock << " " << *object << dendl;
      mustpin.insert(object);
    } else if (p.is_rdlock()) {

      dout(20) << " must rdlock " << *lock << " " << *object << dendl;
      if (object->is_auth()) {
	mustpin.insert(object);
      } else if (!object->is_auth() &&
		 !lock->can_rdlock(client)) { // we might have to request an rdlock
	dout(15) << " will also auth_pin " << *object
		 << " in case we need to request a rdlock" << dendl;
	mustpin.insert(object);
      }
    } else {
      ceph_assert(0 == "locker unknown lock operation");
    }
  }

  // canonical ordering avoids deadlock between concurrent requests
  lov.sort_and_merge();

  // AUTH PINS
  map<mds_rank_t, set<MDSCacheObject*> > mustpin_remote;  // mds -> (object set)

  // can i auth pin them all now?
  marker.message = "failed to authpin local pins";
  for (const auto &p : mustpin) {
    MDSCacheObject *object = p;

    dout(10) << " must authpin " << *object << dendl;

    if (mdr->is_auth_pinned(object)) {
      if (object != (MDSCacheObject*)auth_pin_freeze)
	continue;
      if (mdr->more()->is_remote_frozen_authpin) {
	if (mdr->more()->rename_inode == auth_pin_freeze)
	  continue;
	// unfreeze auth pin for the wrong inode
	// (operator[] default-inserts an empty set; .size() is only
	// called to force that insertion)
	mustpin_remote[mdr->more()->rename_inode->authority().first].size();
      }
    }

    if (!object->is_auth()) {
      if (mdr->lock_cache) { // debug
	ceph_assert(mdr->lock_cache->opcode == CEPH_MDS_OP_UNLINK);
	CDentry *dn = mdr->dn[0].back();
	ceph_assert(dn->get_projected_linkage()->is_remote());
      }

      if (object->is_ambiguous_auth()) {
	// wait
	dout(10) << " ambiguous auth, waiting to authpin " << *object << dendl;
	mdr->disable_lock_cache();
	drop_locks(mdr.get());
	mdr->drop_local_auth_pins();
	marker.message = "waiting for single auth, object is being migrated";
	object->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_MDS_RetryRequest(mdcache, mdr));
	return false;
      }
      mustpin_remote[object->authority().first].insert(object);
      continue;
    }
    int err = 0;
    if (!object->can_auth_pin(&err)) {
      if (mdr->lock_cache) {
	CDir *dir;
	if (CInode *in = dynamic_cast<CInode*>(object)) {
	  ceph_assert(!in->is_frozen_inode() && !in->is_frozen_auth_pin());
	  dir = in->get_projected_parent_dir();
	} else if (CDentry *dn = dynamic_cast<CDentry*>(object)) {
	  dir = dn->get_dir();
	} else {
	  ceph_assert(0 == "unknown type of lock parent");
	}
	if (dir->get_inode() == mdr->lock_cache->get_dir_inode()) {
	  // forcibly auth pin if there is lock cache on parent dir
	  continue;
	}

	{ // debug
	  ceph_assert(mdr->lock_cache->opcode == CEPH_MDS_OP_UNLINK);
	  CDentry *dn = mdr->dn[0].back();
	  ceph_assert(dn->get_projected_linkage()->is_remote());
	}
      }

      // wait
      mdr->disable_lock_cache();
      drop_locks(mdr.get());
      mdr->drop_local_auth_pins();
      if (auth_pin_nonblocking) {
	dout(10) << " can't auth_pin (freezing?) " << *object << ", nonblocking" << dendl;
	mdr->aborted = true;
	return false;
      }
      if (err == MDSCacheObject::ERR_EXPORTING_TREE) {
	marker.message = "failed to authpin, subtree is being exported";
      } else if (err == MDSCacheObject::ERR_FRAGMENTING_DIR) {
	marker.message = "failed to authpin, dir is being fragmented";
      } else if (err == MDSCacheObject::ERR_EXPORTING_INODE) {
	marker.message = "failed to authpin, inode is being exported";
      }
      dout(10) << " can't auth_pin (freezing?), waiting to authpin " << *object << dendl;
      object->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_MDS_RetryRequest(mdcache, mdr));

      if (mdr->is_any_remote_auth_pin())
	notify_freeze_waiter(object);

      return false;
    }
  }

  // ok, grab local auth pins
  for (const auto& p : mustpin) {
    MDSCacheObject *object = p;
    if (mdr->is_auth_pinned(object)) {
      dout(10) << " already auth_pinned " << *object << dendl;
    } else if (object->is_auth()) {
      dout(10) << " auth_pinning " << *object << dendl;
      mdr->auth_pin(object);
    }
  }

  // request remote auth_pins
  if (!mustpin_remote.empty()) {
    marker.message = "requesting remote authpins";
    // fold already-remote-pinned objects into the per-rank request sets
    for (const auto& p : mdr->object_states) {
      if (p.second.remote_auth_pinned == MDS_RANK_NONE)
	continue;
      ceph_assert(p.second.remote_auth_pinned == p.first->authority().first);
      auto q = mustpin_remote.find(p.second.remote_auth_pinned);
      if (q != mustpin_remote.end())
	q->second.insert(p.first);
    }

    for (auto& p : mustpin_remote) {
      dout(10) << "requesting remote auth_pins from mds." << p.first << dendl;

      // wait for active auth
      if (mds->is_cluster_degraded() &&
	  !mds->mdsmap->is_clientreplay_or_active_or_stopping(p.first)) {
	dout(10) << " mds." << p.first << " is not active" << dendl;
	if (mdr->more()->waiting_on_peer.empty())
	  mds->wait_for_active_peer(p.first, new C_MDS_RetryRequest(mdcache, mdr));
	return false;
      }

      auto req = make_message<MMDSPeerRequest>(mdr->reqid, mdr->attempt,
					       MMDSPeerRequest::OP_AUTHPIN);
      for (auto& o : p.second) {
	dout(10) << " req remote auth_pin of " << *o << dendl;
	MDSCacheObjectInfo info;
	o->set_object_info(info);
	req->get_authpins().push_back(info);
	if (o == auth_pin_freeze)
	  o->set_object_info(req->get_authpin_freeze());
	mdr->pin(o);
      }
      if (auth_pin_nonblocking)
	req->mark_nonblocking();
      else if (!mdr->locks.empty())
	req->mark_notify_blocking();

      mds->send_message_mds(req, p.first);

      // put in waiting list
      auto ret = mdr->more()->waiting_on_peer.insert(p.first);
      ceph_assert(ret.second);
    }
    return false;
  }

  // caps i'll need to issue
  set<CInode*> issue_set;
  bool result = false;

  // acquire locks.
  // make sure they match currently acquired locks.
  for (const auto& p : lov) {
    auto lock = p.lock;
    if (p.is_xlock()) {
      if (mdr->is_xlocked(lock)) {
	dout(10) << " already xlocked " << *lock << " " << *lock->get_parent() << dendl;
	continue;
      }
      // only one lock acquisition may be in flight at a time
      if (mdr->locking && lock != mdr->locking)
	cancel_locking(mdr.get(), &issue_set);
      if (!xlock_start(lock, mdr)) {
	marker.message = "failed to xlock, waiting";
	goto out;
      }
      dout(10) << " got xlock on " << *lock << " " << *lock->get_parent() << dendl;
    } else if (p.is_wrlock() || p.is_remote_wrlock()) {
      auto it = mdr->locks.find(lock);
      if (p.is_remote_wrlock()) {
	if (it != mdr->locks.end() && it->is_remote_wrlock()) {
	  dout(10) << " already remote_wrlocked " << *lock << " " << *lock->get_parent() << dendl;
	} else {
	  if (mdr->locking && lock != mdr->locking)
	    cancel_locking(mdr.get(), &issue_set);
	  marker.message = "waiting for remote wrlocks";
	  remote_wrlock_start(lock, p.wrlock_target, mdr);
	  goto out;
	}
      }
      if (p.is_wrlock()) {
	if (it != mdr->locks.end() && it->is_wrlock()) {
	  dout(10) << " already wrlocked " << *lock << " " << *lock->get_parent() << dendl;
	  continue;
	}
	client_t _client = p.is_state_pin() ? lock->get_excl_client() : client;
	if (p.is_remote_wrlock()) {
	  // nowait if we have already gotten remote wrlock
	  if (!wrlock_try(lock, mdr, _client)) {
	    marker.message = "failed to wrlock, dropping remote wrlock and waiting";
	    // can't take the wrlock because the scatter lock is gathering. need to
	    // release the remote wrlock, so that the gathering process can finish.
	    ceph_assert(it != mdr->locks.end());
	    remote_wrlock_finish(it, mdr.get());
	    remote_wrlock_start(lock, p.wrlock_target, mdr);
	    goto out;
	  }
	} else {
	  if (!wrlock_start(p, mdr)) {
	    ceph_assert(!p.is_remote_wrlock());
	    marker.message = "failed to wrlock, waiting";
	    goto out;
	  }
	}
	dout(10) << " got wrlock on " << *lock << " " << *lock->get_parent() << dendl;
      }
    } else {
      if (mdr->is_rdlocked(lock)) {
	dout(10) << " already rdlocked " << *lock << " " << *lock->get_parent() << dendl;
	continue;
      }

      ceph_assert(mdr->is_leader());
      if (lock->needs_recover()) {
	if (mds->is_cluster_degraded()) {
	  if (!mdr->is_queued_for_replay()) {
	    // see comments in SimpleLock::set_state_rejoin() and
	    // ScatterLock::encode_state_for_rejoin()
	    drop_locks(mdr.get());
	    mds->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache, mdr));
	    dout(10) << " rejoin recovering " << *lock << " " << *lock->get_parent()
		     << ", waiting for cluster recovered" << dendl;
	    marker.message = "rejoin recovering lock, waiting for cluster recovered";
	    return false;
	  }
	} else {
	  lock->clear_need_recover();
	}
      }

      if (!rdlock_start(lock, mdr)) {
	marker.message = "failed to rdlock, waiting";
	goto out;
      }
      dout(10) << " got rdlock on " << *lock << " " << *lock->get_parent() << dendl;
    }
  }

  mdr->set_mds_stamp(ceph_clock_now());
  result = true;
  marker.message = "acquired locks";

 out:
  // re-issue caps for any inodes whose lock state changed along the way
  issue_caps_set(issue_set);
  return result;
}
596
597 void Locker::notify_freeze_waiter(MDSCacheObject *o)
598 {
599 CDir *dir = NULL;
600 if (CInode *in = dynamic_cast<CInode*>(o)) {
601 if (!in->is_root())
602 dir = in->get_parent_dir();
603 } else if (CDentry *dn = dynamic_cast<CDentry*>(o)) {
604 dir = dn->get_dir();
605 } else {
606 dir = dynamic_cast<CDir*>(o);
607 ceph_assert(dir);
608 }
609 if (dir) {
610 if (dir->is_freezing_dir())
611 mdcache->fragment_freeze_inc_num_waiters(dir);
612 if (dir->is_freezing_tree()) {
613 while (!dir->is_freezing_tree_root())
614 dir = dir->get_parent_dir();
615 mdcache->migrator->export_freeze_inc_num_waiters(dir);
616 }
617 }
618 }
619
620 void Locker::set_xlocks_done(MutationImpl *mut, bool skip_dentry)
621 {
622 for (const auto &p : mut->locks) {
623 if (!p.is_xlock())
624 continue;
625 MDSCacheObject *obj = p.lock->get_parent();
626 ceph_assert(obj->is_auth());
627 if (skip_dentry &&
628 (p.lock->get_type() == CEPH_LOCK_DN || p.lock->get_type() == CEPH_LOCK_DVERSION))
629 continue;
630 dout(10) << "set_xlocks_done on " << *p.lock << " " << *obj << dendl;
631 p.lock->set_xlock_done();
632 }
633 }
634
/**
 * Release locks held by @mut: always xlocks, wrlocks and remote
 * wrlocks; rdlocks too when @drop_rdlocks is set.  Inodes whose cap
 * issue may have changed are collected into *pneed_issue (caller
 * issues).  For locks whose auth is a remote MDS, a single OP_DROPLOCKS
 * message is sent to each involved peer at the end.
 *
 * Note: the *_finish() helpers erase the entry, so the iterator is
 * passed post-incremented (it++) throughout.
 */
void Locker::_drop_locks(MutationImpl *mut, set<CInode*> *pneed_issue,
			 bool drop_rdlocks)
{
  set<mds_rank_t> peers;  // remote MDSs that must drop their side

  for (auto it = mut->locks.begin(); it != mut->locks.end(); ) {
    SimpleLock *lock = it->lock;
    MDSCacheObject *obj = lock->get_parent();

    if (it->is_xlock()) {
      if (obj->is_auth()) {
	bool ni = false;
	xlock_finish(it++, mut, &ni);
	if (ni)
	  pneed_issue->insert(static_cast<CInode*>(obj));
      } else {
	// remotely-held xlock: release locally, tell auth below
	ceph_assert(lock->get_sm()->can_remote_xlock);
	peers.insert(obj->authority().first);
	lock->put_xlock();
	mut->locks.erase(it++);
      }
    } else if (it->is_wrlock() || it->is_remote_wrlock()) {
      if (it->is_remote_wrlock()) {
	peers.insert(it->wrlock_target);
	it->clear_remote_wrlock();
      }
      if (it->is_wrlock()) {
	bool ni = false;
	wrlock_finish(it++, mut, &ni);
	if (ni)
	  pneed_issue->insert(static_cast<CInode*>(obj));
      } else {
	// entry was remote-wrlock only; nothing left to hold
	mut->locks.erase(it++);
      }
    } else if (drop_rdlocks && it->is_rdlock()) {
      bool ni = false;
      rdlock_finish(it++, mut, &ni);
      if (ni)
	pneed_issue->insert(static_cast<CInode*>(obj));
    } else {
      ++it;
    }
  }

  if (drop_rdlocks) {
    // dropping everything also releases our reference on the lock cache
    if (mut->lock_cache) {
      put_lock_cache(mut->lock_cache);
      mut->lock_cache = nullptr;
    }
  }

  for (set<mds_rank_t>::iterator p = peers.begin(); p != peers.end(); ++p) {
    // skip peers that have not reached rejoin while degraded
    if (!mds->is_cluster_degraded() ||
	mds->mdsmap->get_state(*p) >= MDSMap::STATE_REJOIN) {
      dout(10) << "_drop_non_rdlocks dropping remote locks on mds." << *p << dendl;
      auto peerreq = make_message<MMDSPeerRequest>(mut->reqid, mut->attempt,
						   MMDSPeerRequest::OP_DROPLOCKS);
      mds->send_message_mds(peerreq, *p);
    }
  }
}
696
/**
 * Abort the lock acquisition currently in flight on @mut
 * (mut->locking).  If we are auth for the lock's parent, back the lock
 * out of its transitional state (PREXLOCK via _finish_xlock, or
 * LOCK->XLOCK via XLOCKDONE + eval_gather); if cap issue may change as
 * a result, the parent inode is added to *pneed_issue.
 */
void Locker::cancel_locking(MutationImpl *mut, set<CInode*> *pneed_issue)
{
  SimpleLock *lock = mut->locking;
  ceph_assert(lock);
  dout(10) << "cancel_locking " << *lock << " on " << *mut << dendl;

  if (lock->get_parent()->is_auth()) {
    bool need_issue = false;
    if (lock->get_state() == LOCK_PREXLOCK) {
      _finish_xlock(lock, -1, &need_issue);
    } else if (lock->get_state() == LOCK_LOCK_XLOCK) {
      lock->set_state(LOCK_XLOCKDONE);
      eval_gather(lock, true, &need_issue);
    }
    if (need_issue)
      pneed_issue->insert(static_cast<CInode *>(lock->get_parent()));
  }
  mut->finish_locking(lock);
}
716
717 void Locker::drop_locks(MutationImpl *mut, set<CInode*> *pneed_issue)
718 {
719 // leftover locks
720 set<CInode*> my_need_issue;
721 if (!pneed_issue)
722 pneed_issue = &my_need_issue;
723
724 if (mut->locking)
725 cancel_locking(mut, pneed_issue);
726 _drop_locks(mut, pneed_issue, true);
727
728 if (pneed_issue == &my_need_issue)
729 issue_caps_set(*pneed_issue);
730 mut->locking_state = 0;
731 }
732
733 void Locker::drop_non_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
734 {
735 set<CInode*> my_need_issue;
736 if (!pneed_issue)
737 pneed_issue = &my_need_issue;
738
739 _drop_locks(mut, pneed_issue, false);
740
741 if (pneed_issue == &my_need_issue)
742 issue_caps_set(*pneed_issue);
743 }
744
745 void Locker::drop_rdlocks_for_early_reply(MutationImpl *mut)
746 {
747 set<CInode*> need_issue;
748
749 for (auto it = mut->locks.begin(); it != mut->locks.end(); ) {
750 if (!it->is_rdlock()) {
751 ++it;
752 continue;
753 }
754 SimpleLock *lock = it->lock;
755 // make later mksnap/setlayout (at other mds) wait for this unsafe request
756 if (lock->get_type() == CEPH_LOCK_ISNAP ||
757 lock->get_type() == CEPH_LOCK_IPOLICY) {
758 ++it;
759 continue;
760 }
761 bool ni = false;
762 rdlock_finish(it++, mut, &ni);
763 if (ni)
764 need_issue.insert(static_cast<CInode*>(lock->get_parent()));
765 }
766
767 issue_caps_set(need_issue);
768 }
769
/**
 * Release the locks held by @mut except the dirfragtree (IDFT) locks,
 * so a directory fragment can unfreeze; caps are re-issued for any
 * affected inodes.
 *
 * NOTE(review): every non-IDFT entry is handed to wrlock_finish(), so
 * this assumes the mutation holds only wrlocks (plus IDFT locks) at
 * this point — confirm against the fragmenting callers in MDCache.
 */
void Locker::drop_locks_for_fragment_unfreeze(MutationImpl *mut)
{
  set<CInode*> need_issue;

  for (auto it = mut->locks.begin(); it != mut->locks.end(); ) {
    SimpleLock *lock = it->lock;
    if (lock->get_type() == CEPH_LOCK_IDFT) {
      ++it;
      continue;
    }
    bool ni = false;
    // wrlock_finish erases the entry, hence the post-increment
    wrlock_finish(it++, mut, &ni);
    if (ni)
      need_issue.insert(static_cast<CInode*>(lock->get_parent()));
  }
  issue_caps_set(need_issue);
}
787
/**
 * Deferred teardown of an MDLockCache: drops its locks, cleans it up
 * and deletes it.  Queued by put_lock_cache() once the refcount hits
 * zero, so the teardown runs from the waiter queue rather than inline.
 */
class C_MDL_DropCache : public LockerContext {
  MDLockCache *lock_cache;  // owned; deleted in finish()
public:
  C_MDL_DropCache(Locker *l, MDLockCache *lc) :
    LockerContext(l), lock_cache(lc) { }
  void finish(int r) override {
    locker->drop_locks(lock_cache);
    lock_cache->cleanup();
    delete lock_cache;
  }
};
799
/**
 * Drop one reference on @lock_cache.  When the last reference goes
 * (which may only happen while the cache is invalidating — asserted),
 * detach its locks, re-enable frozen-inode support on the dir inode's
 * auth-pinned dirfrags, and queue the final teardown (C_MDL_DropCache).
 */
void Locker::put_lock_cache(MDLockCache* lock_cache)
{
  ceph_assert(lock_cache->ref > 0);
  if (--lock_cache->ref > 0)
    return;

  ceph_assert(lock_cache->invalidating);

  lock_cache->detach_locks();

  // undo the disable_frozen_inode() done at creation, but only for the
  // dir inode's own dirfrags
  CInode *diri = lock_cache->get_dir_inode();
  for (auto dir : lock_cache->auth_pinned_dirfrags) {
    if (dir->get_inode() != diri)
      continue;
    dir->enable_frozen_inode();
  }

  mds->queue_waiter(new C_MDL_DropCache(this, lock_cache));
}
819
820 int Locker::get_cap_bit_for_lock_cache(int op)
821 {
822 switch(op) {
823 case CEPH_MDS_OP_CREATE:
824 return CEPH_CAP_DIR_CREATE;
825 case CEPH_MDS_OP_UNLINK:
826 return CEPH_CAP_DIR_UNLINK;
827 default:
828 ceph_assert(0 == "unsupported operation");
829 return 0;
830 }
831 }
832
/**
 * Begin (or continue) invalidating @lock_cache.  On first call, mark it
 * invalidating and detach its dirfrags.  If a client cap still holds
 * the cache, revoke the corresponding DIR cap bit and re-issue caps;
 * the cache is then released later via eval_lock_caches() once the
 * client drops the bit.  Otherwise the cap linkage is severed and the
 * cache reference dropped immediately.
 */
void Locker::invalidate_lock_cache(MDLockCache *lock_cache)
{
  ceph_assert(lock_cache->item_cap_lock_cache.is_on_list());
  if (lock_cache->invalidating) {
    // already invalidating: the cap must have been cleared previously
    ceph_assert(!lock_cache->client_cap);
  } else {
    lock_cache->invalidating = true;
    lock_cache->detach_dirfrags();
  }

  Capability *cap = lock_cache->client_cap;
  if (cap) {
    int cap_bit = get_cap_bit_for_lock_cache(lock_cache->opcode);
    cap->clear_lock_cache_allowed(cap_bit);
    if (cap->issued() & cap_bit)
      issue_caps(lock_cache->get_dir_inode(), cap);
    else
      cap = nullptr;  // bit not issued: release immediately below
  }

  if (!cap) {
    lock_cache->item_cap_lock_cache.remove_myself();
    put_lock_cache(lock_cache);
  }
}
858
/**
 * After a cap changes, release any invalidating lock caches on it whose
 * governing cap bit is no longer issued to the client.
 */
void Locker::eval_lock_caches(Capability *cap)
{
  for (auto p = cap->lock_caches.begin(); !p.end(); ) {
    MDLockCache *lock_cache = *p;
    // advance before remove_myself()/put_lock_cache may unlink the
    // current element
    ++p;
    if (!lock_cache->invalidating)
      continue;
    int cap_bit = get_cap_bit_for_lock_cache(lock_cache->opcode);
    if (!(cap->issued() & cap_bit)) {
      lock_cache->item_cap_lock_cache.remove_myself();
      put_lock_cache(lock_cache);
    }
  }
}
873
// ask lock caches to release auth pins
void Locker::invalidate_lock_caches(CDir *dir)
{
  dout(10) << "invalidate_lock_caches on " << *dir << dendl;
  auto &lock_caches = dir->lock_caches_with_auth_pins;
  // each invalidate_lock_cache() call is expected to detach the front
  // entry from this list (via detach_dirfrags), shrinking it; otherwise
  // this loop would never terminate
  while (!lock_caches.empty()) {
    invalidate_lock_cache(lock_caches.front()->parent);
  }
}
883
884 // ask lock caches to release locks
885 void Locker::invalidate_lock_caches(SimpleLock *lock)
886 {
887 dout(10) << "invalidate_lock_caches " << *lock << " on " << *lock->get_parent() << dendl;
888 if (lock->is_cached()) {
889 auto&& lock_caches = lock->get_active_caches();
890 for (auto& lc : lock_caches)
891 invalidate_lock_cache(lc);
892 }
893 }
894
/**
 * Try to build an MDLockCache for @mdr so that subsequent identical
 * operations (create/unlink) from the same client on @diri can reuse
 * its auth pins and locks without re-acquiring them.
 *
 * Bails out (leaving mdr->lock_cache unset) if: we are not auth for the
 * dir inode, the request has peer sub-requests, the client holds no cap
 * on the dir, a cache for this opcode already exists, any relevant
 * object can't be auth-pinned, any dirfrag is non-auth/freezing or
 * contains freezing/frozen inodes, or any relevant lock is unstable.
 *
 * On success, transfers the request's relevant auth pins and locks on
 * @diri and its ancestors into the new cache and attaches it to @mdr.
 */
void Locker::create_lock_cache(MDRequestRef& mdr, CInode *diri, file_layout_t *dir_layout)
{
  if (mdr->lock_cache)
    return;

  client_t client = mdr->get_client();
  int opcode = mdr->client_request->get_op();
  dout(10) << "create_lock_cache for client." << client << "/" << ceph_mds_op_name(opcode)<< " on " << *diri << dendl;

  if (!diri->is_auth()) {
    dout(10) << " dir inode is not auth, noop" << dendl;
    return;
  }

  if (mdr->has_more() && !mdr->more()->peers.empty()) {
    dout(10) << " there are peers requests for " << *mdr << ", noop" << dendl;
    return;
  }

  Capability *cap = diri->get_client_cap(client);
  if (!cap) {
    dout(10) << " there is no cap for client." << client << ", noop" << dendl;
    return;
  }

  // at most one lock cache per (cap, opcode)
  for (auto p = cap->lock_caches.begin(); !p.end(); ++p) {
    if ((*p)->opcode == opcode) {
      dout(10) << " lock cache already exists for " << ceph_mds_op_name(opcode) << ", noop" << dendl;
      return;
    }
  }

  // collect ancestor inodes of diri (inodes only, not dentries)
  set<MDSCacheObject*> ancestors;
  for (CInode *in = diri; ; ) {
    CDentry *pdn = in->get_projected_parent_dn();
    if (!pdn)
      break;
    // ancestors.insert(pdn);
    in = pdn->get_dir()->get_inode();
    ancestors.insert(in);
  }

  // verify every auth-pinned ancestor object can stay pinned
  for (auto& p : mdr->object_states) {
    if (p.first != diri && !ancestors.count(p.first))
      continue;
    auto& stat = p.second;
    if (stat.auth_pinned) {
      if (!p.first->can_auth_pin()) {
	dout(10) << " can't auth_pin(freezing?) lock parent " << *p.first << ", noop" << dendl;
	return;
      }
      // ancestors holds only inodes, so the cast cannot fail here
      if (CInode *in = dynamic_cast<CInode*>(p.first); in->is_parent_projected()) {
	CDir *dir = in->get_projected_parent_dir();
	if (!dir->can_auth_pin()) {
	  dout(10) << " can't auth_pin(!auth|freezing?) dirfrag " << *dir << ", noop" << dendl;
	  return;
	}
      }
    }
  }

  std::vector<CDir*> dfv;
  dfv.reserve(diri->get_num_dirfrags());

  // all of diri's dirfrags must be auth, pinnable and free of
  // freezing/frozen inodes
  diri->get_dirfrags(dfv);
  for (auto dir : dfv) {
    if (!dir->is_auth() || !dir->can_auth_pin()) {
      dout(10) << " can't auth_pin(!auth|freezing?) dirfrag " << *dir << ", noop" << dendl;
      return;
    }
    if (dir->is_any_freezing_or_frozen_inode()) {
      dout(10) << " there is freezing/frozen inode in " << *dir << ", noop" << dendl;
      return;
    }
  }

  // all relevant locks must be stable before we can cache them
  for (auto& p : mdr->locks) {
    MDSCacheObject *obj = p.lock->get_parent();
    if (obj != diri && !ancestors.count(obj))
      continue;
    if (!p.lock->is_stable()) {
      dout(10) << " unstable " << *p.lock << " on " << *obj << ", noop" << dendl;
      return;
    }
  }

  auto lock_cache = new MDLockCache(cap, opcode);
  if (dir_layout)
    lock_cache->set_dir_layout(*dir_layout);
  cap->set_lock_cache_allowed(get_cap_bit_for_lock_cache(opcode));

  for (auto dir : dfv) {
    // prevent subtree migration
    lock_cache->auth_pin(dir);
    // prevent frozen inode
    dir->disable_frozen_inode();
  }

  // move/mirror the request's pins onto the cache, and record the
  // containing dirfrags of each pinned object
  for (auto& p : mdr->object_states) {
    if (p.first != diri && !ancestors.count(p.first))
      continue;
    auto& stat = p.second;
    if (stat.auth_pinned)
      lock_cache->auth_pin(p.first);
    else
      lock_cache->pin(p.first);

    if (CInode *in = dynamic_cast<CInode*>(p.first)) {
      CDentry *pdn = in->get_projected_parent_dn();
      if (pdn)
	dfv.push_back(pdn->get_dir());
    } else if (CDentry *dn = dynamic_cast<CDentry*>(p.first)) {
      dfv.push_back(dn->get_dir());
    } else {
      ceph_assert(0 == "unknown type of lock parent");
    }
  }
  lock_cache->attach_dirfrags(std::move(dfv));

  // transfer eligible locks from the request into the cache
  for (auto it = mdr->locks.begin(); it != mdr->locks.end(); ) {
    MDSCacheObject *obj = it->lock->get_parent();
    if (obj != diri && !ancestors.count(obj)) {
      ++it;
      continue;
    }
    unsigned lock_flag = 0;
    if (it->is_wrlock()) {
      // skip wrlocks that were added by MDCache::predirty_journal_parent()
      if (obj == diri)
	lock_flag = MutationImpl::LockOp::WRLOCK;
    } else {
      ceph_assert(it->is_rdlock());
      lock_flag = MutationImpl::LockOp::RDLOCK;
    }
    if (lock_flag) {
      lock_cache->emplace_lock(it->lock, lock_flag);
      mdr->locks.erase(it++);
    } else {
      ++it;
    }
  }
  lock_cache->attach_locks();

  // the request itself holds one reference
  lock_cache->ref++;
  mdr->lock_cache = lock_cache;
}
1041
1042 bool Locker::find_and_attach_lock_cache(MDRequestRef& mdr, CInode *diri)
1043 {
1044 if (mdr->lock_cache)
1045 return true;
1046
1047 Capability *cap = diri->get_client_cap(mdr->get_client());
1048 if (!cap)
1049 return false;
1050
1051 int opcode = mdr->client_request->get_op();
1052 for (auto p = cap->lock_caches.begin(); !p.end(); ++p) {
1053 MDLockCache *lock_cache = *p;
1054 if (lock_cache->opcode == opcode) {
1055 dout(10) << "found lock cache for " << ceph_mds_op_name(opcode) << " on " << *diri << dendl;
1056 mdr->lock_cache = lock_cache;
1057 mdr->lock_cache->ref++;
1058 return true;
1059 }
1060 }
1061 return false;
1062 }
1063
1064 // generics
1065
/**
 * Evaluate an unstable (mid-transition) lock and, if its "gather" is
 * complete, finish the transition to the next state.
 *
 * A gather is complete when everything the target state forbids has been
 * released: rdlocks/wrlocks/xlocks/leases, client caps outside what the
 * next state allows, replica acks (is_gathering), and any pending scatter
 * flush on the auth.
 *
 * @param lock        the lock to evaluate; must NOT be stable
 * @param first       first evaluation for this transition — if the next
 *                    state requires revoking currently issued caps, flag
 *                    a cap (re)issue to start the revocation
 * @param pneed_issue if non-null, set *pneed_issue instead of calling
 *                    issue_caps() directly
 * @param pfinishers  if non-null, lock waiters are collected here for the
 *                    caller to run; otherwise they are finished inline
 */
void Locker::eval_gather(SimpleLock *lock, bool first, bool *pneed_issue, MDSContext::vec *pfinishers)
{
  dout(10) << "eval_gather " << *lock << " on " << *lock->get_parent() << dendl;
  ceph_assert(!lock->is_stable());

  int next = lock->get_next_state();

  // dentry locks carry no caps; all other lock types live on a CInode
  CInode *in = 0;
  bool caps = lock->get_cap_shift();
  if (lock->get_type() != CEPH_LOCK_DN)
    in = static_cast<CInode *>(lock->get_parent());

  bool need_issue = false;

  int loner_issued = 0, other_issued = 0, xlocker_issued = 0;
  ceph_assert(!caps || in != NULL);
  if (caps && in->is_head()) {
    // snapshot of what is currently issued in this lock's cap range,
    // broken out by client class (loner / xlocker / everyone else)
    in->get_caps_issued(&loner_issued, &other_issued, &xlocker_issued,
                        lock->get_cap_shift(), lock->get_cap_mask());
    dout(10) << " next state is " << lock->get_state_name(next)
             << " issued/allows loner " << gcap_string(loner_issued)
             << "/" << gcap_string(lock->gcaps_allowed(CAP_LONER, next))
             << " xlocker " << gcap_string(xlocker_issued)
             << "/" << gcap_string(lock->gcaps_allowed(CAP_XLOCKER, next))
             << " other " << gcap_string(other_issued)
             << "/" << gcap_string(lock->gcaps_allowed(CAP_ANY, next))
             << dendl;

    // kick a cap reissue on the first pass if anything currently issued
    // is disallowed by the target state (i.e. needs to be revoked)
    if (first && ((~lock->gcaps_allowed(CAP_ANY, next) & other_issued) ||
                  (~lock->gcaps_allowed(CAP_LONER, next) & loner_issued) ||
                  (~lock->gcaps_allowed(CAP_XLOCKER, next) & xlocker_issued)))
      need_issue = true;
  }

  // a hold is tolerable in the next state if the state machine allows it
  // at (auth: <= AUTH) / (replica: < AUTH) privilege
#define IS_TRUE_AND_LT_AUTH(x, auth) (x && ((auth && x <= AUTH) || (!auth && x < AUTH)))
  bool auth = lock->get_parent()->is_auth();
  if (!lock->is_gathering() &&
      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_rdlock, auth) || !lock->is_rdlocked()) &&
      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_wrlock, auth) || !lock->is_wrlocked()) &&
      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_xlock, auth) || !lock->is_xlocked()) &&
      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_lease, auth) || !lock->is_leased()) &&
      !(lock->get_parent()->is_auth() && lock->is_flushing()) &&  // i.e. wait for scatter_writebehind!
      (!caps || ((~lock->gcaps_allowed(CAP_ANY, next) & other_issued) == 0 &&
                 (~lock->gcaps_allowed(CAP_LONER, next) & loner_issued) == 0 &&
                 (~lock->gcaps_allowed(CAP_XLOCKER, next) & xlocker_issued) == 0)) &&
      lock->get_state() != LOCK_SYNC_MIX2 &&  // these states need an explicit trigger from the auth mds
      lock->get_state() != LOCK_MIX_SYNC2
      ) {
    dout(7) << "eval_gather finished gather on " << *lock
            << " on " << *lock->get_parent() << dendl;

    if (lock->get_sm() == &sm_filelock) {
      ceph_assert(in);
      // file data must be recovered before the filelock can move on
      if (in->state_test(CInode::STATE_RECOVERING)) {
        dout(7) << "eval_gather finished gather, but still recovering" << dendl;
        return;
      } else if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
        dout(7) << "eval_gather finished gather, but need to recover" << dendl;
        mds->mdcache->queue_file_recover(in);
        mds->mdcache->do_file_recover();
        return;
      }
    }

    if (!lock->get_parent()->is_auth()) {
      // replica: tell auth
      mds_rank_t auth = lock->get_parent()->authority().first;

      if (lock->get_parent()->is_rejoining() &&
          mds->mdsmap->get_state(auth) == MDSMap::STATE_REJOIN) {
        dout(7) << "eval_gather finished gather, but still rejoining "
                << *lock->get_parent() << dendl;
        return;
      }

      if (!mds->is_cluster_degraded() ||
          mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) {
        switch (lock->get_state()) {
        case LOCK_SYNC_LOCK:
          mds->send_message_mds(make_message<MLock>(lock, LOCK_AC_LOCKACK, mds->get_nodeid()), auth);
          break;

        case LOCK_MIX_SYNC:
          {
            // ack with our scatter data and begin the local flush; we'll
            // land in MIX_SYNC2 until the flush completes
            auto reply = make_message<MLock>(lock, LOCK_AC_SYNCACK, mds->get_nodeid());
            lock->encode_locked_state(reply->get_data());
            mds->send_message_mds(reply, auth);
            next = LOCK_MIX_SYNC2;
            (static_cast<ScatterLock *>(lock))->start_flush();
          }
          break;

        case LOCK_MIX_SYNC2:
          (static_cast<ScatterLock *>(lock))->finish_flush();
          (static_cast<ScatterLock *>(lock))->clear_flushed();
          // deliberate fall-through: the SYNCACK was already sent above

        case LOCK_SYNC_MIX2:
          // do nothing, we already acked
          break;

        case LOCK_SYNC_MIX:
          {
            auto reply = make_message<MLock>(lock, LOCK_AC_MIXACK, mds->get_nodeid());
            mds->send_message_mds(reply, auth);
            next = LOCK_SYNC_MIX2;
          }
          break;

        case LOCK_MIX_LOCK:
          {
            bufferlist data;
            lock->encode_locked_state(data);
            mds->send_message_mds(make_message<MLock>(lock, LOCK_AC_LOCKACK, mds->get_nodeid(), data), auth);
            (static_cast<ScatterLock *>(lock))->start_flush();
            // we'll get an AC_LOCKFLUSHED to complete
          }
          break;

        default:
          ceph_abort();
        }
      }
    } else {
      // auth

      // once the first (local) stage of mix->lock gather complete we can
      // gather from replicas
      if (lock->get_state() == LOCK_MIX_LOCK &&
          lock->get_parent()->is_replicated()) {
        dout(10) << " finished (local) gather for mix->lock, now gathering from replicas" << dendl;
        send_lock_message(lock, LOCK_AC_LOCK);
        lock->init_gather();
        lock->set_state(LOCK_MIX_LOCK2);
        return;
      }

      // dirty scatter data must be written behind before the state changes
      if (lock->is_dirty() && !lock->is_flushed()) {
        scatter_writebehind(static_cast<ScatterLock *>(lock));
        mds->mdlog->flush();
        return;
      }
      lock->clear_flushed();

      switch (lock->get_state()) {
        // to mixed
      case LOCK_TSYN_MIX:
      case LOCK_SYNC_MIX:
      case LOCK_EXCL_MIX:
      case LOCK_XSYN_MIX:
        in->start_scatter(static_cast<ScatterLock *>(lock));
        if (lock->get_parent()->is_replicated()) {
          bufferlist softdata;
          lock->encode_locked_state(softdata);
          send_lock_message(lock, LOCK_AC_MIX, softdata);
        }
        (static_cast<ScatterLock *>(lock))->clear_scatter_wanted();
        break;

      case LOCK_XLOCK:
      case LOCK_XLOCKDONE:
        if (next != LOCK_SYNC)
          break;
        // fall-thru

        // to sync
      case LOCK_EXCL_SYNC:
      case LOCK_LOCK_SYNC:
      case LOCK_MIX_SYNC:
      case LOCK_XSYN_SYNC:
        if (lock->get_parent()->is_replicated()) {
          bufferlist softdata;
          lock->encode_locked_state(softdata);
          send_lock_message(lock, LOCK_AC_SYNC, softdata);
        }
        break;
      }

    }

    lock->set_state(next);

    if (lock->get_parent()->is_auth() &&
        lock->is_stable())
      lock->get_parent()->auth_unpin(lock);

    // drop loner before doing waiters
    if (caps &&
        in->is_head() &&
        in->is_auth() &&
        in->get_wanted_loner() != in->get_loner()) {
      dout(10) << " trying to drop loner" << dendl;
      if (in->try_drop_loner()) {
        dout(10) << " dropped loner" << dendl;
        need_issue = true;
      }
    }

    if (pfinishers)
      lock->take_waiting(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD|SimpleLock::WAIT_XLOCK,
                         *pfinishers);
    else
      lock->finish_waiters(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD|SimpleLock::WAIT_XLOCK);

    if (caps && in->is_head())
      need_issue = true;

    if (lock->get_parent()->is_auth() &&
        lock->is_stable())
      try_eval(lock, &need_issue);
  }

  if (need_issue) {
    if (pneed_issue)
      *pneed_issue = true;
    else if (in->is_head())
      issue_caps(in);
  }

}
1285
/**
 * Evaluate (some of) an inode's locks, choosing/dropping the cap loner
 * as a side effect.
 *
 * @param in            inode whose locks to evaluate
 * @param mask          CEPH_LOCK_I* bitmask selecting which locks to eval;
 *                      widened to -1 (all) if the loner changes
 * @param caps_imported seed need_issue (and passed through to eval_any)
 * @return true if caps were (or should be) reissued
 */
bool Locker::eval(CInode *in, int mask, bool caps_imported)
{
  bool need_issue = caps_imported;
  MDSContext::vec finishers;

  dout(10) << "eval " << mask << " " << *in << dendl;

  // choose loner?
  if (in->is_auth() && in->is_head()) {
    client_t orig_loner = in->get_loner();
    if (in->choose_ideal_loner()) {
      dout(10) << "eval set loner: client." << orig_loner << " -> client." << in->get_loner() << dendl;
      need_issue = true;
      mask = -1;        // loner changed: re-evaluate every lock
    } else if (in->get_wanted_loner() != in->get_loner()) {
      dout(10) << "eval want loner: client." << in->get_wanted_loner() << " but failed to set it" << dendl;
      mask = -1;
    }
  }

 retry:
  if (mask & CEPH_LOCK_IFILE)
    eval_any(&in->filelock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IAUTH)
    eval_any(&in->authlock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_ILINK)
    eval_any(&in->linklock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IXATTR)
    eval_any(&in->xattrlock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_INEST)
    eval_any(&in->nestlock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IFLOCK)
    eval_any(&in->flocklock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IPOLICY)
    eval_any(&in->policylock, &need_issue, &finishers, caps_imported);

  // drop loner?
  if (in->is_auth() && in->is_head() && in->get_wanted_loner() != in->get_loner()) {
    if (in->try_drop_loner()) {
      need_issue = true;
      if (in->get_wanted_loner() >= 0) {
        // a new loner is wanted: set it and re-run the full evaluation
        dout(10) << "eval end set loner to client." << in->get_wanted_loner() << dendl;
        bool ok = in->try_set_loner();
        ceph_assert(ok);
        mask = -1;
        goto retry;
      }
    }
  }

  finish_contexts(g_ceph_context, finishers);

  if (need_issue && in->is_head())
    issue_caps(in);

  dout(10) << "eval done" << dendl;
  return need_issue;
}
1344
1345 class C_Locker_Eval : public LockerContext {
1346 MDSCacheObject *p;
1347 int mask;
1348 public:
1349 C_Locker_Eval(Locker *l, MDSCacheObject *pp, int m) : LockerContext(l), p(pp), mask(m) {
1350 // We are used as an MDSCacheObject waiter, so should
1351 // only be invoked by someone already holding the big lock.
1352 ceph_assert(ceph_mutex_is_locked_by_me(locker->mds->mds_lock));
1353 p->get(MDSCacheObject::PIN_PTRWAITER);
1354 }
1355 void finish(int r) override {
1356 locker->try_eval(p, mask);
1357 p->put(MDSCacheObject::PIN_PTRWAITER);
1358 }
1359 };
1360
1361 void Locker::try_eval(MDSCacheObject *p, int mask)
1362 {
1363 // unstable and ambiguous auth?
1364 if (p->is_ambiguous_auth()) {
1365 dout(7) << "try_eval ambiguous auth, waiting on " << *p << dendl;
1366 p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_Eval(this, p, mask));
1367 return;
1368 }
1369
1370 if (p->is_auth() && p->is_frozen()) {
1371 dout(7) << "try_eval frozen, waiting on " << *p << dendl;
1372 p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, mask));
1373 return;
1374 }
1375
1376 if (mask & CEPH_LOCK_DN) {
1377 ceph_assert(mask == CEPH_LOCK_DN);
1378 bool need_issue = false; // ignore this, no caps on dentries
1379 CDentry *dn = static_cast<CDentry *>(p);
1380 eval_any(&dn->lock, &need_issue);
1381 } else {
1382 CInode *in = static_cast<CInode *>(p);
1383 eval(in, mask);
1384 }
1385 }
1386
/**
 * Re-evaluate a single lock, honoring deferred scatter/unscatter requests
 * and deferring entirely when the parent object is ambiguous-auth,
 * non-auth, frozen, or (for most lock types) freezing.
 *
 * @param lock        lock to evaluate
 * @param pneed_issue set true if caps should be reissued as a result
 */
void Locker::try_eval(SimpleLock *lock, bool *pneed_issue)
{
  MDSCacheObject *p = lock->get_parent();

  // unstable and ambiguous auth?
  if (p->is_ambiguous_auth()) {
    dout(7) << "try_eval " << *lock << " ambiguousauth, waiting on " << *p << dendl;
    p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_Eval(this, p, lock->get_type()));
    return;
  }

  // only the auth evaluates locks
  if (!p->is_auth()) {
    dout(7) << "try_eval " << *lock << " not auth for " << *p << dendl;
    return;
  }

  if (p->is_frozen()) {
    dout(7) << "try_eval " << *lock << " frozen, waiting on " << *p << dendl;
    p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, lock->get_type()));
    return;
  }

  /*
   * We could have a situation like:
   *
   * - mds A authpins item on mds B
   * - mds B starts to freeze tree containing item
   * - mds A tries wrlock_start on A, sends REQSCATTER to B
   * - mds B lock is unstable, sets scatter_wanted
   * - mds B lock stabilizes, calls try_eval.
   *
   * We can defer while freezing without causing a deadlock.  Honor
   * scatter_wanted flag here.  This will never get deferred by the
   * checks above due to the auth_pin held by the leader.
   */
  if (lock->is_scatterlock()) {
    ScatterLock *slock = static_cast<ScatterLock *>(lock);
    if (slock->get_scatter_wanted() &&
        slock->get_state() != LOCK_MIX) {
      // a deferred REQSCATTER: move toward MIX now
      scatter_mix(slock, pneed_issue);
      if (!lock->is_stable())
        return;
    } else if (slock->get_unscatter_wanted() &&
               slock->get_state() != LOCK_LOCK) {
      // a deferred unscatter: move toward LOCK now
      simple_lock(slock, pneed_issue);
      if (!lock->is_stable()) {
        return;
      }
    }
  }

  // while freezing, defer evaluation for all lock types except these
  // (dentry/snap/policy locks), which are safe to proceed
  if (lock->get_type() != CEPH_LOCK_DN &&
      lock->get_type() != CEPH_LOCK_ISNAP &&
      lock->get_type() != CEPH_LOCK_IPOLICY &&
      p->is_freezing()) {
    dout(7) << "try_eval " << *lock << " freezing, waiting on " << *p << dendl;
    p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, lock->get_type()));
    return;
  }

  eval(lock, pneed_issue);
}
1449
1450 void Locker::eval_cap_gather(CInode *in, set<CInode*> *issue_set)
1451 {
1452 bool need_issue = false;
1453 MDSContext::vec finishers;
1454
1455 // kick locks now
1456 if (!in->filelock.is_stable())
1457 eval_gather(&in->filelock, false, &need_issue, &finishers);
1458 if (!in->authlock.is_stable())
1459 eval_gather(&in->authlock, false, &need_issue, &finishers);
1460 if (!in->linklock.is_stable())
1461 eval_gather(&in->linklock, false, &need_issue, &finishers);
1462 if (!in->xattrlock.is_stable())
1463 eval_gather(&in->xattrlock, false, &need_issue, &finishers);
1464
1465 if (need_issue && in->is_head()) {
1466 if (issue_set)
1467 issue_set->insert(in);
1468 else
1469 issue_caps(in);
1470 }
1471
1472 finish_contexts(g_ceph_context, finishers);
1473 }
1474
1475 void Locker::eval_scatter_gathers(CInode *in)
1476 {
1477 bool need_issue = false;
1478 MDSContext::vec finishers;
1479
1480 dout(10) << "eval_scatter_gathers " << *in << dendl;
1481
1482 // kick locks now
1483 if (!in->filelock.is_stable())
1484 eval_gather(&in->filelock, false, &need_issue, &finishers);
1485 if (!in->nestlock.is_stable())
1486 eval_gather(&in->nestlock, false, &need_issue, &finishers);
1487 if (!in->dirfragtreelock.is_stable())
1488 eval_gather(&in->dirfragtreelock, false, &need_issue, &finishers);
1489
1490 if (need_issue && in->is_head())
1491 issue_caps(in);
1492
1493 finish_contexts(g_ceph_context, finishers);
1494 }
1495
1496 void Locker::eval(SimpleLock *lock, bool *need_issue)
1497 {
1498 switch (lock->get_type()) {
1499 case CEPH_LOCK_IFILE:
1500 return file_eval(static_cast<ScatterLock*>(lock), need_issue);
1501 case CEPH_LOCK_IDFT:
1502 case CEPH_LOCK_INEST:
1503 return scatter_eval(static_cast<ScatterLock*>(lock), need_issue);
1504 default:
1505 return simple_eval(lock, need_issue);
1506 }
1507 }
1508
1509
1510 // ------------------
1511 // rdlock
1512
/**
 * Nudge a lock toward a state that permits rdlocks.
 *
 * On the auth with a stable lock this initiates a sync (or xsyn for a
 * loner-held EXCL filelock); on a replica it asks the auth for a rdlock-
 * able state via REQRDLOCK.  For an unstable filelock it prioritizes
 * file recovery.
 *
 * @param lock    the lock to kick
 * @param as_anon caller wants SYNC specifically (no XSYN shortcut)
 * @return true if a state change was started locally (caller may re-check
 *         can_rdlock), false if we can only wait
 */
bool Locker::_rdlock_kick(SimpleLock *lock, bool as_anon)
{
  // kick the lock
  if (lock->is_stable()) {
    if (lock->get_parent()->is_auth()) {
      if (lock->get_sm() == &sm_scatterlock) {
        // not until tempsync is fully implemented
        //if (lock->get_parent()->is_replicated())
        //scatter_tempsync((ScatterLock*)lock);
        //else
        simple_sync(lock);
      } else if (lock->get_sm() == &sm_filelock) {
        CInode *in = static_cast<CInode*>(lock->get_parent());
        if (lock->get_state() == LOCK_EXCL &&
            in->get_target_loner() >= 0 &&
            !in->is_dir() && !as_anon) // as_anon => caller wants SYNC, not XSYN
          file_xsyn(lock);
        else
          simple_sync(lock);
      } else
        simple_sync(lock);
      return true;
    } else {
      // request rdlock state change from auth
      mds_rank_t auth = lock->get_parent()->authority().first;
      if (!mds->is_cluster_degraded() ||
          mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
        dout(10) << "requesting rdlock from auth on "
                 << *lock << " on " << *lock->get_parent() << dendl;
        mds->send_message_mds(make_message<MLock>(lock, LOCK_AC_REQRDLOCK, mds->get_nodeid()), auth);
      }
      return false;
    }
  }
  // unstable filelock: make sure recovery of this inode is prioritized
  if (lock->get_type() == CEPH_LOCK_IFILE) {
    CInode *in = static_cast<CInode *>(lock->get_parent());
    if (in->state_test(CInode::STATE_RECOVERING)) {
      mds->mdcache->recovery_queue.prioritize(in);
    }
  }

  return false;
}
1556
1557 bool Locker::rdlock_try(SimpleLock *lock, client_t client)
1558 {
1559 dout(7) << "rdlock_try on " << *lock << " on " << *lock->get_parent() << dendl;
1560
1561 // can read? grab ref.
1562 if (lock->can_rdlock(client))
1563 return true;
1564
1565 _rdlock_kick(lock, false);
1566
1567 if (lock->can_rdlock(client))
1568 return true;
1569
1570 return false;
1571 }
1572
/**
 * Acquire a rdlock for a request, kicking the lock (and, for a snapped
 * inode, the head inode's corresponding lock) until it is readable, or
 * queue the request as a waiter and return false.
 *
 * @param lock    the lock to rdlock
 * @param mut     requesting mutation; on success the lock is recorded in it
 * @param as_anon take the lock anonymously (client gets no xlock-holder
 *                privilege); forced on for snapped reads
 * @return true if the rdlock was acquired, false if the request must wait
 */
bool Locker::rdlock_start(SimpleLock *lock, MDRequestRef& mut, bool as_anon)
{
  dout(7) << "rdlock_start on " << *lock << " on " << *lock->get_parent() << dendl;

  // client may be allowed to rdlock the same item it has xlocked.
  // UNLESS someone passes in as_anon, or we're reading snapped version here.
  if (mut->snapid != CEPH_NOSNAP)
    as_anon = true;
  client_t client = as_anon ? -1 : mut->get_client();

  CInode *in = 0;
  if (lock->get_type() != CEPH_LOCK_DN)
    in = static_cast<CInode *>(lock->get_parent());

  /*
  if (!lock->get_parent()->is_auth() &&
      lock->fw_rdlock_to_auth()) {
    mdcache->request_forward(mut, lock->get_parent()->authority().first);
    return false;
  }
  */

  while (1) {
    // can read?  grab ref.
    if (lock->can_rdlock(client)) {
      lock->get_rdlock();
      mut->emplace_lock(lock, MutationImpl::LockOp::RDLOCK);
      return true;
    }

    // hmm, wait a second.
    // snapped (non-head) inode on the auth whose lock is in SNAP_SYNC:
    // the head inode's lock has to be synced first
    if (in && !in->is_head() && in->is_auth() &&
        lock->get_state() == LOCK_SNAP_SYNC) {
      // okay, we actually need to kick the head's lock to get ourselves synced up.
      CInode *head = mdcache->get_inode(in->ino());
      ceph_assert(head);
      SimpleLock *hlock = head->get_lock(CEPH_LOCK_IFILE);
      if (hlock->get_state() == LOCK_SYNC)
        hlock = head->get_lock(lock->get_type());

      if (hlock->get_state() != LOCK_SYNC) {
        dout(10) << "rdlock_start trying head inode " << *head << dendl;
        if (!rdlock_start(hlock, mut, true)) // ** as_anon, no rdlock on EXCL **
          return false;
        // oh, check our lock again then
      }
    }

    // stop looping once kicking can make no further progress
    if (!_rdlock_kick(lock, as_anon))
      break;
  }

  // wait!
  int wait_on;
  if (lock->get_parent()->is_auth() && lock->is_stable())
    wait_on = SimpleLock::WAIT_RD;
  else
    wait_on = SimpleLock::WAIT_STABLE;  // REQRDLOCK is ignored if lock is unstable, so we need to retry.
  dout(7) << "rdlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl;
  lock->add_waiter(wait_on, new C_MDS_RetryRequest(mdcache, mut));
  nudge_log(lock);
  return false;
}
1636
1637 void Locker::nudge_log(SimpleLock *lock)
1638 {
1639 dout(10) << "nudge_log " << *lock << " on " << *lock->get_parent() << dendl;
1640 if (lock->get_parent()->is_auth() && lock->is_unstable_and_locked()) // as with xlockdone, or cap flush
1641 mds->mdlog->flush();
1642 }
1643
1644 void Locker::rdlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut, bool *pneed_issue)
1645 {
1646 ceph_assert(it->is_rdlock());
1647 SimpleLock *lock = it->lock;
1648 // drop ref
1649 lock->put_rdlock();
1650 if (mut)
1651 mut->locks.erase(it);
1652
1653 dout(7) << "rdlock_finish on " << *lock << " on " << *lock->get_parent() << dendl;
1654
1655 // last one?
1656 if (!lock->is_rdlocked()) {
1657 if (!lock->is_stable())
1658 eval_gather(lock, false, pneed_issue);
1659 else if (lock->get_parent()->is_auth())
1660 try_eval(lock, pneed_issue);
1661 }
1662 }
1663
1664 bool Locker::rdlock_try_set(MutationImpl::LockOpVec& lov, MDRequestRef& mdr)
1665 {
1666 dout(10) << __func__ << dendl;
1667 for (const auto& p : lov) {
1668 auto lock = p.lock;
1669 ceph_assert(p.is_rdlock());
1670 if (!mdr->is_rdlocked(lock) && !rdlock_try(lock, mdr->get_client())) {
1671 lock->add_waiter(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_RD,
1672 new C_MDS_RetryRequest(mdcache, mdr));
1673 goto failed;
1674 }
1675 lock->get_rdlock();
1676 mdr->emplace_lock(lock, MutationImpl::LockOp::RDLOCK);
1677 dout(20) << " got rdlock on " << *lock << " " << *lock->get_parent() << dendl;
1678 }
1679
1680 return true;
1681 failed:
1682 dout(10) << __func__ << " failed" << dendl;
1683 drop_locks(mdr.get(), nullptr);
1684 mdr->drop_local_auth_pins();
1685 return false;
1686 }
1687
1688 bool Locker::rdlock_try_set(MutationImpl::LockOpVec& lov, MutationRef& mut)
1689 {
1690 dout(10) << __func__ << dendl;
1691 for (const auto& p : lov) {
1692 auto lock = p.lock;
1693 ceph_assert(p.is_rdlock());
1694 if (!lock->can_rdlock(mut->get_client()))
1695 return false;
1696 p.lock->get_rdlock();
1697 mut->emplace_lock(p.lock, MutationImpl::LockOp::RDLOCK);
1698 }
1699 return true;
1700 }
1701
1702 // ------------------
1703 // wrlock
1704
1705 void Locker::wrlock_force(SimpleLock *lock, MutationRef& mut)
1706 {
1707 if (lock->get_type() == CEPH_LOCK_IVERSION ||
1708 lock->get_type() == CEPH_LOCK_DVERSION)
1709 return local_wrlock_grab(static_cast<LocalLockC*>(lock), mut);
1710
1711 dout(7) << "wrlock_force on " << *lock
1712 << " on " << *lock->get_parent() << dendl;
1713 lock->get_wrlock(true);
1714 mut->emplace_lock(lock, MutationImpl::LockOp::WRLOCK);
1715 }
1716
/**
 * Try to take a wrlock without blocking and without triggering a scatter
 * writeback.  May move a stable, clean, auth lock toward LOCK via
 * simple_lock() and retry; otherwise gives up.
 *
 * @param lock   the (scatter-capable) lock to wrlock
 * @param mut    mutation to record the lock in on success
 * @param client acting client; -1 means use mut's client
 * @return true if the wrlock was acquired
 */
bool Locker::wrlock_try(SimpleLock *lock, const MutationRef& mut, client_t client)
{
  dout(10) << "wrlock_try " << *lock << " on " << *lock->get_parent() << dendl;
  if (client == -1)
    client = mut->get_client();

  while (1) {
    if (lock->can_wrlock(client)) {
      lock->get_wrlock();
      auto it = mut->emplace_lock(lock, MutationImpl::LockOp::WRLOCK);
      it->flags |= MutationImpl::LockOp::WRLOCK; // may already remote_wrlocked
      return true;
    }
    // give up rather than wait on an unstable lock
    if (!lock->is_stable())
      break;
    CInode *in = static_cast<CInode *>(lock->get_parent());
    // only the auth may change lock state
    if (!in->is_auth())
      break;
    // caller may already have a log entry open.  To avoid calling
    // scatter_writebehind or start_scatter, don't change nest lock
    // state if it has dirty scatterdata.
    if (lock->is_dirty())
      break;
    // To avoid calling scatter_writebehind or start_scatter, don't
    // change nest lock state to MIX.
    ScatterLock *slock = static_cast<ScatterLock*>(lock);
    if (slock->get_scatter_wanted() || in->has_subtree_or_exporting_dirfrag())
      break;

    // safe to pull the lock toward LOCK and re-test can_wrlock
    simple_lock(lock);
  }
  return false;
}
1750
/**
 * Acquire a wrlock for a request, moving the lock toward MIX (when the
 * inode is scattered) or LOCK on the auth, or requesting a scatter from
 * the auth on a replica.  Queues a retry waiter and returns false if the
 * lock cannot be taken yet.
 *
 * @param op  lock op; is_state_pin() selects the lock's exclusive client
 *            instead of the request's client
 * @param mut the requesting mutation
 * @return true if the wrlock was acquired
 */
bool Locker::wrlock_start(const MutationImpl::LockOp &op, MDRequestRef& mut)
{
  SimpleLock *lock = op.lock;
  // version locks use the local-lock fast path
  if (lock->get_type() == CEPH_LOCK_IVERSION ||
      lock->get_type() == CEPH_LOCK_DVERSION)
    return local_wrlock_start(static_cast<LocalLockC*>(lock), mut);

  dout(10) << "wrlock_start " << *lock << " on " << *lock->get_parent() << dendl;

  CInode *in = static_cast<CInode *>(lock->get_parent());
  client_t client = op.is_state_pin() ? lock->get_excl_client() : mut->get_client();
  // prefer MIX when the dir is a subtree bound/export or a scatter was
  // explicitly requested
  bool want_scatter = lock->get_parent()->is_auth() &&
                      (in->has_subtree_or_exporting_dirfrag() ||
                       static_cast<ScatterLock*>(lock)->get_scatter_wanted());

  while (1) {
    // wrlock?
    if (lock->can_wrlock(client) &&
        (!want_scatter || lock->get_state() == LOCK_MIX)) {
      lock->get_wrlock();
      auto it = mut->emplace_lock(lock, MutationImpl::LockOp::WRLOCK);
      it->flags |= MutationImpl::LockOp::WRLOCK; // may already remote_wrlocked
      return true;
    }

    // recovering file inode: make sure it gets recovered soon
    if (lock->get_type() == CEPH_LOCK_IFILE &&
        in->state_test(CInode::STATE_RECOVERING)) {
      mds->mdcache->recovery_queue.prioritize(in);
    }

    // unstable: nothing more we can kick; wait below
    if (!lock->is_stable())
      break;

    if (in->is_auth()) {
      // start the transition we need and loop to re-check
      if (want_scatter)
        scatter_mix(static_cast<ScatterLock*>(lock));
      else
        simple_lock(lock);
    } else {
      // replica.
      // auth should be auth_pinned (see acquire_locks wrlock weird mustpin case).
      mds_rank_t auth = lock->get_parent()->authority().first;
      if (!mds->is_cluster_degraded() ||
          mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
        dout(10) << "requesting scatter from auth on "
                 << *lock << " on " << *lock->get_parent() << dendl;
        mds->send_message_mds(make_message<MLock>(lock, LOCK_AC_REQSCATTER, mds->get_nodeid()), auth);
      }
      break;
    }
  }

  dout(7) << "wrlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl;
  lock->add_waiter(SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
  nudge_log(lock);

  return false;
}
1809
/**
 * Release a wrlock reference held by @mut and re-evaluate the lock.
 * If the op also holds a remote wrlock, only the local wrlock flag is
 * cleared (the entry stays for the remote one).
 */
void Locker::wrlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut, bool *pneed_issue)
{
  ceph_assert(it->is_wrlock());
  SimpleLock* lock = it->lock;

  // version locks use the local-lock fast path
  if (lock->get_type() == CEPH_LOCK_IVERSION ||
      lock->get_type() == CEPH_LOCK_DVERSION)
    return local_wrlock_finish(it, mut);

  dout(7) << "wrlock_finish on " << *lock << " on " << *lock->get_parent() << dendl;
  lock->put_wrlock();

  // keep the entry if a remote wrlock remains, otherwise drop it
  if (it->is_remote_wrlock())
    it->clear_wrlock();
  else
    mut->locks.erase(it);

  if (lock->is_wrlocked()) {
    // Evaluate unstable lock after scatter_writebehind_finish(). Because
    // eval_gather() does not change lock's state when lock is flushing.
    if (!lock->is_stable() && lock->is_flushed() &&
        lock->get_parent()->is_auth())
      eval_gather(lock, false, pneed_issue);
  } else {
    // last wrlock gone: the lock may be able to progress now
    if (!lock->is_stable())
      eval_gather(lock, false, pneed_issue);
    else if (lock->get_parent()->is_auth())
      try_eval(lock, pneed_issue);
  }
}
1840
1841
1842 // remote wrlock
1843
1844 void Locker::remote_wrlock_start(SimpleLock *lock, mds_rank_t target, MDRequestRef& mut)
1845 {
1846 dout(7) << "remote_wrlock_start mds." << target << " on " << *lock << " on " << *lock->get_parent() << dendl;
1847
1848 // wait for active target
1849 if (mds->is_cluster_degraded() &&
1850 !mds->mdsmap->is_clientreplay_or_active_or_stopping(target)) {
1851 dout(7) << " mds." << target << " is not active" << dendl;
1852 if (mut->more()->waiting_on_peer.empty())
1853 mds->wait_for_active_peer(target, new C_MDS_RetryRequest(mdcache, mut));
1854 return;
1855 }
1856
1857 // send lock request
1858 mut->start_locking(lock, target);
1859 mut->more()->peers.insert(target);
1860 auto r = make_message<MMDSPeerRequest>(mut->reqid, mut->attempt, MMDSPeerRequest::OP_WRLOCK);
1861 r->set_lock_type(lock->get_type());
1862 lock->get_parent()->set_object_info(r->get_object_info());
1863 mds->send_message_mds(r, target);
1864
1865 ceph_assert(mut->more()->waiting_on_peer.count(target) == 0);
1866 mut->more()->waiting_on_peer.insert(target);
1867 }
1868
1869 void Locker::remote_wrlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut)
1870 {
1871 ceph_assert(it->is_remote_wrlock());
1872 SimpleLock *lock = it->lock;
1873 mds_rank_t target = it->wrlock_target;
1874
1875 if (it->is_wrlock())
1876 it->clear_remote_wrlock();
1877 else
1878 mut->locks.erase(it);
1879
1880 dout(7) << "remote_wrlock_finish releasing remote wrlock on mds." << target
1881 << " " << *lock->get_parent() << dendl;
1882 if (!mds->is_cluster_degraded() ||
1883 mds->mdsmap->get_state(target) >= MDSMap::STATE_REJOIN) {
1884 auto peerreq = make_message<MMDSPeerRequest>(mut->reqid, mut->attempt, MMDSPeerRequest::OP_UNWRLOCK);
1885 peerreq->set_lock_type(lock->get_type());
1886 lock->get_parent()->set_object_info(peerreq->get_object_info());
1887 mds->send_message_mds(peerreq, target);
1888 }
1889 }
1890
1891
1892 // ------------------
1893 // xlock
1894
/**
 * Acquire an xlock for a request.
 *
 * On the auth, repeatedly pushes the lock toward XLOCK (via simple_xlock/
 * simple_lock) until it can be taken or we must wait.  On a replica the
 * xlock is delegated to the auth with an OP_XLOCK peer request.
 *
 * @return true if the xlock was acquired locally; false if the request
 *         is now waiting (lock waiter, peer reply, auth resolution, ...)
 */
bool Locker::xlock_start(SimpleLock *lock, MDRequestRef& mut)
{
  // version locks use the local-lock fast path
  if (lock->get_type() == CEPH_LOCK_IVERSION ||
      lock->get_type() == CEPH_LOCK_DVERSION)
    return local_xlock_start(static_cast<LocalLockC*>(lock), mut);

  dout(7) << "xlock_start on " << *lock << " on " << *lock->get_parent() << dendl;
  client_t client = mut->get_client();

  CInode *in = nullptr;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  // auth?
  if (lock->get_parent()->is_auth()) {
    // auth
    while (1) {
      if (mut->locking &&      // started xlock (not preempt other request)
          lock->can_xlock(client) &&
          !(lock->get_state() == LOCK_LOCK_XLOCK &&      // client is not xlocker or
            in && in->issued_caps_need_gather(lock))) {  // xlocker does not hold shared cap
        lock->set_state(LOCK_XLOCK);
        lock->get_xlock(mut, client);
        mut->emplace_lock(lock, MutationImpl::LockOp::XLOCK);
        mut->finish_locking(lock);
        return true;
      }

      // recovering file inode: bump its recovery priority
      if (lock->get_type() == CEPH_LOCK_IFILE &&
          in->state_test(CInode::STATE_RECOVERING)) {
        mds->mdcache->recovery_queue.prioritize(in);
      }

      // unstable lock: only keep pushing if this client already holds the
      // xlock in XLOCKDONE and nobody else is waiting for stability
      if (!lock->is_stable() && (lock->get_state() != LOCK_XLOCKDONE ||
                                 lock->get_xlock_by_client() != client ||
                                 lock->is_waiter_for(SimpleLock::WAIT_STABLE)))
        break;

      if (lock->get_state() == LOCK_LOCK || lock->get_state() == LOCK_XLOCKDONE) {
        // one transition away: mark this mutation as the one driving it
        mut->start_locking(lock);
        simple_xlock(lock);
      } else {
        // first bring the lock to LOCK
        simple_lock(lock);
      }
    }

    lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
    nudge_log(lock);
    return false;
  } else {
    // replica
    ceph_assert(lock->get_sm()->can_remote_xlock);
    ceph_assert(!mut->peer_request);

    // wait for single auth
    if (lock->get_parent()->is_ambiguous_auth()) {
      lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH,
                                     new C_MDS_RetryRequest(mdcache, mut));
      return false;
    }

    // wait for active auth
    mds_rank_t auth = lock->get_parent()->authority().first;
    if (mds->is_cluster_degraded() &&
        !mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
      dout(7) << " mds." << auth << " is not active" << dendl;
      if (mut->more()->waiting_on_peer.empty())
        mds->wait_for_active_peer(auth, new C_MDS_RetryRequest(mdcache, mut));
      return false;
    }

    // send lock request
    mut->more()->peers.insert(auth);
    mut->start_locking(lock, auth);
    auto r = make_message<MMDSPeerRequest>(mut->reqid, mut->attempt, MMDSPeerRequest::OP_XLOCK);
    r->set_lock_type(lock->get_type());
    lock->get_parent()->set_object_info(r->get_object_info());
    mds->send_message_mds(r, auth);

    ceph_assert(mut->more()->waiting_on_peer.count(auth) == 0);
    mut->more()->waiting_on_peer.insert(auth);

    return false;
  }
}
1980
/**
 * Finish the state transition after the last xlock on a lock was dropped.
 *
 * Fast path: for cap-bearing lock types with no other holds, if the
 * inode's target loner exists and was (or could have been) the xlocker,
 * jump straight to LOCK_EXCL.  Otherwise fall back to eval_gather().
 *
 * @param lock        the lock (still unstable) whose xlock was released
 * @param xlocker     client that held the xlock, or -1
 * @param pneed_issue set true if caps should be reissued
 */
void Locker::_finish_xlock(SimpleLock *lock, client_t xlocker, bool *pneed_issue)
{
  ceph_assert(!lock->is_stable());
  if (lock->get_type() != CEPH_LOCK_DN &&
      lock->get_type() != CEPH_LOCK_ISNAP &&
      lock->get_type() != CEPH_LOCK_IPOLICY &&
      lock->get_num_rdlocks() == 0 &&
      lock->get_num_wrlocks() == 0 &&
      !lock->is_leased() &&
      lock->get_state() != LOCK_XLOCKSNAP) {
    CInode *in = static_cast<CInode*>(lock->get_parent());
    client_t loner = in->get_target_loner();
    if (loner >= 0 && (xlocker < 0 || xlocker == loner)) {
      // go directly to EXCL for the loner; no gather needed
      lock->set_state(LOCK_EXCL);
      lock->get_parent()->auth_unpin(lock);
      lock->finish_waiters(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD);
      if (lock->get_cap_shift())
        *pneed_issue = true;
      if (lock->get_parent()->is_auth() &&
          lock->is_stable())
        try_eval(lock, pneed_issue);
      return;
    }
  }
  // the xlocker may have CEPH_CAP_GSHARED, need to revoke it if next state is LOCK_LOCK
  eval_gather(lock, lock->get_state() != LOCK_XLOCKSNAP, pneed_issue);
}
2008
/**
 * Release an xlock held by @mut.  For a remote xlock (parent not auth),
 * notify the auth with OP_UNXLOCK and kick local waiters; for a local
 * xlock, complete the state transition via _finish_xlock() once no
 * xlocks remain, and reissue caps if that requires it.
 */
void Locker::xlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut, bool *pneed_issue)
{
  ceph_assert(it->is_xlock());
  SimpleLock *lock = it->lock;

  // version locks use the local-lock fast path
  if (lock->get_type() == CEPH_LOCK_IVERSION ||
      lock->get_type() == CEPH_LOCK_DVERSION)
    return local_xlock_finish(it, mut);

  dout(10) << "xlock_finish on " << *lock << " " << *lock->get_parent() << dendl;

  client_t xlocker = lock->get_xlock_by_client();

  // drop ref
  lock->put_xlock();
  ceph_assert(mut);
  mut->locks.erase(it);

  bool do_issue = false;

  // remote xlock?
  if (!lock->get_parent()->is_auth()) {
    ceph_assert(lock->get_sm()->can_remote_xlock);

    // tell auth
    dout(7) << "xlock_finish releasing remote xlock on " << *lock->get_parent() << dendl;
    mds_rank_t auth = lock->get_parent()->authority().first;
    if (!mds->is_cluster_degraded() ||
        mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) {
      auto peerreq = make_message<MMDSPeerRequest>(mut->reqid, mut->attempt, MMDSPeerRequest::OP_UNXLOCK);
      peerreq->set_lock_type(lock->get_type());
      lock->get_parent()->set_object_info(peerreq->get_object_info());
      mds->send_message_mds(peerreq, auth);
    }
    // others waiting?
    lock->finish_waiters(SimpleLock::WAIT_STABLE |
                         SimpleLock::WAIT_WR |
                         SimpleLock::WAIT_RD, 0);
  } else {
    if (lock->get_num_xlocks() == 0 &&
        lock->get_state() != LOCK_LOCK_XLOCK) { // no one is taking xlock
      _finish_xlock(lock, xlocker, &do_issue);
    }
  }

  if (do_issue) {
    CInode *in = static_cast<CInode*>(lock->get_parent());
    if (in->is_head()) {
      if (pneed_issue)
        *pneed_issue = true;
      else
        issue_caps(in);
    }
  }
}
2064
// Drop our xlock on @it->lock because the containing inode is being exported
// to another mds (parent must be in AMBIGUOUSAUTH). The lock is forced to
// LOCK_LOCK; the importer takes over lock state.
void Locker::xlock_export(const MutationImpl::lock_iterator& it, MutationImpl *mut)
{
  ceph_assert(it->is_xlock());
  SimpleLock *lock = it->lock;
  dout(10) << "xlock_export on " << *lock << " " << *lock->get_parent() << dendl;

  lock->put_xlock();
  mut->locks.erase(it);

  MDSCacheObject *p = lock->get_parent();
  ceph_assert(p->state_test(CInode::STATE_AMBIGUOUSAUTH));  // we are exporting this (inode)

  // drop the auth_pin that an unstable lock holds on its parent
  if (!lock->is_stable())
    lock->get_parent()->auth_unpin(lock);

  lock->set_state(LOCK_LOCK);
}
2082
// Counterpart of xlock_export() on the importing mds: re-take the auth_pin
// that the (unstable) xlocked lock holds on its parent.
void Locker::xlock_import(SimpleLock *lock)
{
  dout(10) << "xlock_import on " << *lock << " " << *lock->get_parent() << dendl;
  lock->get_parent()->auth_pin(lock);
}
2088
2089 void Locker::xlock_downgrade(SimpleLock *lock, MutationImpl *mut)
2090 {
2091 dout(10) << "xlock_downgrade on " << *lock << " " << *lock->get_parent() << dendl;
2092 auto it = mut->locks.find(lock);
2093 if (it->is_rdlock())
2094 return; // already downgraded
2095
2096 ceph_assert(lock->get_parent()->is_auth());
2097 ceph_assert(it != mut->locks.end());
2098 ceph_assert(it->is_xlock());
2099
2100 lock->set_xlock_done();
2101 lock->get_rdlock();
2102 xlock_finish(it, mut, nullptr);
2103 mut->emplace_lock(lock, MutationImpl::LockOp::RDLOCK);
2104 }
2105
2106
2107 // file i/o -----------------------------------------
2108
// Return the current file_data_version of @in's inode.
version_t Locker::issue_file_data_version(CInode *in)
{
  dout(7) << "issue_file_data_version on " << *in << dendl;
  return in->get_inode()->file_data_version;
}
2114
// Journal-commit completion for file metadata updates: calls
// Locker::file_update_finish(), which applies the mutation, sends the
// optional cap ack to @client, and re-evaluates caps.
class C_Locker_FileUpdate_finish : public LockerLogContext {
  CInode *in;
  MutationRef mut;
  unsigned flags;        // UPDATE_* bits interpreted by file_update_finish()
  client_t client;       // recipient of @ack; -1 if no ack
  ref_t<MClientCaps> ack;
public:
  C_Locker_FileUpdate_finish(Locker *l, CInode *i, MutationRef& m, unsigned f,
                             const ref_t<MClientCaps> &ack, client_t c=-1)
    : LockerLogContext(l), in(i), mut(m), flags(f), client(c), ack(ack) {
    in->get(CInode::PIN_PTRWAITER);  // keep the inode pinned until finish()
  }
  void finish(int r) override {
    locker->file_update_finish(in, mut, flags, client, ack);
    in->put(CInode::PIN_PTRWAITER);
  }
};
2132
// flag bits for file_update_finish() / C_Locker_FileUpdate_finish
enum {
  UPDATE_SHAREMAX = 1,    // maybe send a new max_size to writers (share_inode_max_size)
  UPDATE_NEEDSISSUE = 2,  // re-issue caps to the client if it wants more than pending
  UPDATE_SNAPFLUSH = 4,   // this update may complete a client snap writeback
};
2138
// Completion for a journaled file update: apply the projected inode, ack the
// client cap flush (if any), drop the mutation's locks, and re-issue caps /
// finish snapflush bookkeeping as indicated by @flags (UPDATE_* bits).
void Locker::file_update_finish(CInode *in, MutationRef& mut, unsigned flags,
                                client_t client, const ref_t<MClientCaps> &ack)
{
  dout(10) << "file_update_finish on " << *in << dendl;

  mut->apply();

  if (ack) {
    Session *session = mds->get_session(client);
    if (session && !session->is_closed()) {
      // "oldest flush tid" > 0 means client uses unique TID for each flush
      if (ack->get_oldest_flush_tid() > 0)
        session->add_completed_flush(ack->get_client_tid());
      mds->send_message_client_counted(ack, session);
    } else {
      dout(10) << " no session for client." << client << " " << *ack << dendl;
    }
  }

  set<CInode*> need_issue;
  drop_locks(mut.get(), &need_issue);

  if (in->is_head()) {
    if ((flags & UPDATE_NEEDSISSUE) && need_issue.count(in) == 0) {
      // drop_locks didn't already queue this inode; issue only if the client
      // still wants caps beyond what is pending
      Capability *cap = in->get_client_cap(client);
      if (cap && (cap->wanted() & ~cap->pending()))
        issue_caps(in, cap);
    }

    if ((flags & UPDATE_SHAREMAX) && in->is_auth() &&
        (in->filelock.gcaps_allowed(CAP_LONER) & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)))
      share_inode_max_size(in);

  } else if ((flags & UPDATE_SNAPFLUSH) && !in->client_snap_caps.empty()) {
    dout(10) << " client_snap_caps " << in->client_snap_caps << dendl;
    // check for snap writeback completion
    in->client_snap_caps.erase(client);
    if (in->client_snap_caps.empty()) {
      // last snapflush for this snapped inode: release the wrlocks that were
      // held across the flush, dequeue it, and re-eval gathered caps
      for (int i = 0; i < num_cinode_locks; i++) {
        SimpleLock *lock = in->get_lock(cinode_lock_info[i].lock);
        ceph_assert(lock);
        lock->put_wrlock();
      }
      in->item_open_file.remove_myself();
      in->item_caps.remove_myself();
      eval_cap_gather(in, &need_issue);
    }
  }
  issue_caps_set(need_issue);

  mds->balancer->hit_inode(in, META_POP_IWR);

  // auth unpin after issuing caps
  mut->cleanup();
}
2194
// Register (or augment) a capability on @in for the requesting client, with
// wanted caps derived from the open @mode. Cap messages are suppressed for
// the duration so the caps ride along with the request reply.
//  returns the client's Capability (never null on the normal path).
Capability* Locker::issue_new_caps(CInode *in,
                                   int mode,
                                   MDRequestRef& mdr,
                                   SnapRealm *realm)
{
  dout(7) << "issue_new_caps for mode " << mode << " on " << *in << dendl;
  Session *session = mdr->session;
  bool new_inode = (mdr->alloc_ino || mdr->used_prealloc_ino);

  // if replay or async, try to reconnect cap, and otherwise do nothing.
  if (new_inode && mdr->client_request->is_queued_for_replay())
    return mds->mdcache->try_reconnect_cap(in, session);

  // my needs
  ceph_assert(session->info.inst.name.is_client());
  client_t my_client = session->get_client();
  int my_want = ceph_caps_for_mode(mode);

  // register a capability
  Capability *cap = in->get_client_cap(my_client);
  if (!cap) {
    // new cap
    cap = in->add_client_cap(my_client, session, realm, new_inode);
    cap->set_wanted(my_want);
    cap->mark_new();
  } else {
    // make sure it wants sufficient caps
    if (my_want & ~cap->wanted()) {
      // augment wanted caps for this client
      cap->set_wanted(cap->wanted() | my_want);
    }
  }
  cap->inc_suppress(); // suppress file cap messages (we'll bundle with the request reply)

  if (in->is_auth()) {
    // [auth] twiddle mode?
    eval(in, CEPH_CAP_LOCKS);

    // flush the journal if wanted caps are blocked on unstable, journaled locks
    if (_need_flush_mdlog(in, my_want))
      mds->mdlog->flush();

  } else {
    // [replica] tell auth about any new caps wanted
    request_inode_file_caps(in);
  }

  // issue caps (pot. incl new one)
  //issue_caps(in);  // note: _eval above may have done this already...

  // re-issue whatever we can
  //cap->issue(cap->pending());

  cap->dec_suppress();

  return cap;
}
2251
2252 void Locker::issue_caps_set(set<CInode*>& inset)
2253 {
2254 for (set<CInode*>::iterator p = inset.begin(); p != inset.end(); ++p)
2255 issue_caps(*p);
2256 }
2257
// Deferred revocation of a stale cap, queued from issue_caps(); calls
// Locker::revoke_stale_cap() when it runs.
class C_Locker_RevokeStaleCap : public LockerContext {
  CInode *in;
  client_t client;
public:
  C_Locker_RevokeStaleCap(Locker *l, CInode *i, client_t c) :
    LockerContext(l), in(i), client(c) {
    in->get(CInode::PIN_PTRWAITER);  // keep inode pinned until finish()
  }
  void finish(int r) override {
    locker->revoke_stale_cap(in, client);
    in->put(CInode::PIN_PTRWAITER);
  }
};
2271
// Issue/revoke caps on @in so each client's pending caps agree with what the
// current lock states allow (per loner/xlocker/other classification). If
// @only_cap is given, process only that client's cap.
//  returns the number of caps (re)issued.
int Locker::issue_caps(CInode *in, Capability *only_cap)
{
  // allowed caps are determined by the lock mode.
  int all_allowed = in->get_caps_allowed_by_type(CAP_ANY);
  int loner_allowed = in->get_caps_allowed_by_type(CAP_LONER);
  int xlocker_allowed = in->get_caps_allowed_by_type(CAP_XLOCKER);

  client_t loner = in->get_loner();
  if (loner >= 0) {
    dout(7) << "issue_caps loner client." << loner
            << " allowed=" << ccap_string(loner_allowed)
            << ", xlocker allowed=" << ccap_string(xlocker_allowed)
            << ", others allowed=" << ccap_string(all_allowed)
            << " on " << *in << dendl;
  } else {
    dout(7) << "issue_caps allowed=" << ccap_string(all_allowed)
            << ", xlocker allowed=" << ccap_string(xlocker_allowed)
            << " on " << *in << dendl;
  }

  ceph_assert(in->is_head());

  // count conflicts with
  int nissued = 0;

  // client caps
  map<client_t, Capability>::iterator it;
  if (only_cap)
    it = in->client_caps.find(only_cap->get_client());
  else
    it = in->client_caps.begin();
  for (; it != in->client_caps.end(); ++it) {
    Capability *cap = &it->second;

    // do not issue _new_ bits when size|mtime is projected
    int allowed;
    if (loner == it->first)
      allowed = loner_allowed;
    else
      allowed = all_allowed;

    // add in any xlocker-only caps (for locks this client is the xlocker for)
    allowed |= xlocker_allowed & in->get_xlocker_mask(it->first);
    if (in->is_dir()) {
      allowed &= ~CEPH_CAP_ANY_DIR_OPS;
      if (allowed & CEPH_CAP_FILE_EXCL)
        allowed |= cap->get_lock_cache_allowed();
    }

    // clients that can't handle inline data or pool namespaces must not get
    // file read/write caps on inodes that use them
    if ((in->get_inode()->inline_data.version != CEPH_INLINE_NONE &&
         cap->is_noinline()) ||
        (!in->get_inode()->layout.pool_ns.empty() &&
         cap->is_nopoolns()))
      allowed &= ~(CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR);

    int pending = cap->pending();
    int wanted = cap->wanted();

    dout(20) << " client." << it->first
             << " pending " << ccap_string(pending)
             << " allowed " << ccap_string(allowed)
             << " wanted " << ccap_string(wanted)
             << dendl;

    if (!(pending & ~allowed)) {
      // skip if suppress or new, and not revocation
      if (cap->is_new() || cap->is_suppress() || cap->is_stale()) {
        dout(20) << " !revoke and new|suppressed|stale, skipping client." << it->first << dendl;
        continue;
      }
    } else {
      // revocation needed
      ceph_assert(!cap->is_new());
      if (cap->is_stale()) {
        dout(20) << " revoke stale cap from client." << it->first << dendl;
        ceph_assert(!cap->is_valid());
        // record the retained caps locally, then defer the actual revoke
        cap->issue(allowed & pending, false);
        mds->queue_waiter_front(new C_Locker_RevokeStaleCap(this, in, it->first));
        continue;
      }

      if (!cap->is_valid() && (pending & ~CEPH_CAP_PIN)) {
        // After stale->resume circle, client thinks it only has CEPH_CAP_PIN.
        // mds needs to re-issue caps, then do revocation.
        long seq = cap->issue(pending, true);

        dout(7) << " sending MClientCaps to client." << it->first
                << " seq " << seq << " re-issue " << ccap_string(pending) << dendl;

        if (mds->logger) mds->logger->inc(l_mdss_ceph_cap_op_grant);

        auto m = make_message<MClientCaps>(CEPH_CAP_OP_GRANT, in->ino(),
                                           in->find_snaprealm()->inode->ino(),
                                           cap->get_cap_id(), cap->get_last_seq(),
                                           pending, wanted, 0, cap->get_mseq(),
                                           mds->get_osd_epoch_barrier());
        in->encode_cap_message(m, cap);

        mds->send_message_client_counted(m, cap->get_session());
      }
    }

    // notify clients about deleted inode, to make sure they release caps ASAP.
    if (in->get_inode()->nlink == 0)
      wanted |= CEPH_CAP_LINK_SHARED;

    // are there caps that the client _wants_ and can have, but aren't pending?
    // or do we need to revoke?
    if ((pending & ~allowed) ||            // need to revoke ~allowed caps.
        ((wanted & allowed) & ~pending) || // missing wanted+allowed caps
        !cap->is_valid()) {                // after stale->resume circle
      // issue
      nissued++;

      // include caps that clients generally like, while we're at it.
      int likes = in->get_caps_liked();
      int before = pending;
      long seq;
      if (pending & ~allowed)
        seq = cap->issue((wanted|likes) & allowed & pending, true);  // if revoking, don't issue anything new.
      else
        seq = cap->issue((wanted|likes) & allowed, true);
      int after = cap->pending();

      dout(7) << " sending MClientCaps to client." << it->first
              << " seq " << seq << " new pending " << ccap_string(after)
              << " was " << ccap_string(before) << dendl;

      int op = (before & ~after) ? CEPH_CAP_OP_REVOKE : CEPH_CAP_OP_GRANT;
      if (op == CEPH_CAP_OP_REVOKE) {
        // track the revocation so we can warn/evict on slow clients
        if (mds->logger) mds->logger->inc(l_mdss_ceph_cap_op_revoke);
        revoking_caps.push_back(&cap->item_revoking_caps);
        revoking_caps_by_client[cap->get_client()].push_back(&cap->item_client_revoking_caps);
        cap->set_last_revoke_stamp(ceph_clock_now());
        cap->reset_num_revoke_warnings();
      } else {
        if (mds->logger) mds->logger->inc(l_mdss_ceph_cap_op_grant);
      }

      auto m = make_message<MClientCaps>(op, in->ino(),
                                         in->find_snaprealm()->inode->ino(),
                                         cap->get_cap_id(), cap->get_last_seq(),
                                         after, wanted, 0, cap->get_mseq(),
                                         mds->get_osd_epoch_barrier());
      in->encode_cap_message(m, cap);

      mds->send_message_client_counted(m, cap->get_session());
    }

    if (only_cap)
      break;
  }

  return nissued;
}
2426
// Notify every cap-holding client that @in was truncated (CEPH_CAP_OP_TRUNC),
// then reconsider the writeable range on the auth mds.
void Locker::issue_truncate(CInode *in)
{
  dout(7) << "issue_truncate on " << *in << dendl;

  for (auto &p : in->client_caps) {
    if (mds->logger) mds->logger->inc(l_mdss_ceph_cap_op_trunc);
    Capability *cap = &p.second;
    auto m = make_message<MClientCaps>(CEPH_CAP_OP_TRUNC,
                                       in->ino(),
                                       in->find_snaprealm()->inode->ino(),
                                       cap->get_cap_id(), cap->get_last_seq(),
                                       cap->pending(), cap->wanted(), 0,
                                       cap->get_mseq(),
                                       mds->get_osd_epoch_barrier());
    in->encode_cap_message(m, cap);
    mds->send_message_client_counted(m, p.first);
  }

  // should we increase max_size?
  if (in->is_auth() && in->is_file())
    check_inode_max_size(in);
}
2449
2450
// Forcibly revoke a stale client's cap on @in. If write caps were still being
// revoked, the client may have buffered dirty data we can't trust — evict it
// instead. Otherwise revoke and re-evaluate the inode's cap-related locks.
void Locker::revoke_stale_cap(CInode *in, client_t client)
{
  dout(7) << __func__ << " client." << client << " on " << *in << dendl;
  Capability *cap = in->get_client_cap(client);
  if (!cap)
    return;

  if (cap->revoking() & CEPH_CAP_ANY_WR) {
    // can't safely drop a write cap out from under the client; evict it
    CachedStackStringStream css;
    mds->evict_client(client.v, false, g_conf()->mds_session_blocklist_on_timeout, *css, nullptr);
    return;
  }

  cap->revoke();

  // client may have had a writeable range; file size needs recovery
  if (in->is_auth() && in->get_inode()->client_ranges.count(cap->get_client()))
    in->state_set(CInode::STATE_NEEDSRECOVER);

  // cap export in progress; the exporter handles re-evaluation
  if (in->state_test(CInode::STATE_EXPORTINGCAPS))
    return;

  if (!in->filelock.is_stable())
    eval_gather(&in->filelock);
  if (!in->linklock.is_stable())
    eval_gather(&in->linklock);
  if (!in->authlock.is_stable())
    eval_gather(&in->authlock);
  if (!in->xattrlock.is_stable())
    eval_gather(&in->xattrlock);

  if (in->is_auth())
    try_eval(in, CEPH_CAP_LOCKS);
  else
    request_inode_file_caps(in);
}
2486
// Revoke caps from a session that has gone stale. All caps are invalidated
// via inc_cap_gen(); notable caps that were mid-revocation get force-revoked.
//  returns false if any cap was revoking write caps (caller must not proceed
//  with the cheap path; see revoke_stale_cap's eviction handling).
bool Locker::revoke_stale_caps(Session *session)
{
  dout(10) << "revoke_stale_caps for " << session->info.inst.name << dendl;

  // invalidate all caps
  session->inc_cap_gen();

  bool ret = true;
  std::vector<CInode*> to_eval;

  for (auto p = session->caps.begin(); !p.end(); ) {
    Capability *cap = *p;
    ++p;  // advance first; revoke() may touch list position
    if (!cap->is_notable()) {
      // the rest ones are not being revoked and don't have writeable range
      // and don't want exclusive caps or want file read/write. They don't
      // need recover, they don't affect eval_gather()/try_eval()
      break;
    }

    int revoking = cap->revoking();
    if (!revoking)
      continue;

    if (revoking & CEPH_CAP_ANY_WR) {
      // possibly-dirty write caps outstanding; bail and let caller handle it
      ret = false;
      break;
    }

    int issued = cap->issued();
    CInode *in = cap->get_inode();
    dout(10) << " revoking " << ccap_string(issued) << " on " << *in << dendl;
    int revoked = cap->revoke();
    if (revoked & CEPH_CAP_ANY_DIR_OPS)
      eval_lock_caches(cap);

    if (in->is_auth() &&
        in->get_inode()->client_ranges.count(cap->get_client()))
      in->state_set(CInode::STATE_NEEDSRECOVER);

    // eval lock/inode may finish contexts, which may modify other cap's position
    // in the session->caps. so defer evaluation until after the scan.
    to_eval.push_back(in);
  }

  for (auto in : to_eval) {
    if (in->state_test(CInode::STATE_EXPORTINGCAPS))
      continue;

    if (!in->filelock.is_stable())
      eval_gather(&in->filelock);
    if (!in->linklock.is_stable())
      eval_gather(&in->linklock);
    if (!in->authlock.is_stable())
      eval_gather(&in->authlock);
    if (!in->xattrlock.is_stable())
      eval_gather(&in->xattrlock);

    if (in->is_auth())
      try_eval(in, CEPH_CAP_LOCKS);
    else
      request_inode_file_caps(in);
  }

  return ret;
}
2553
// A previously-stale session came back: re-issue caps for its inodes so the
// client's cap state catches up (the caps were invalidated by inc_cap_gen).
void Locker::resume_stale_caps(Session *session)
{
  dout(10) << "resume_stale_caps for " << session->info.inst.name << dendl;

  bool lazy = session->info.has_feature(CEPHFS_FEATURE_LAZY_CAP_WANTED);
  for (xlist<Capability*>::iterator p = session->caps.begin(); !p.end(); ) {
    Capability *cap = *p;
    ++p;  // advance before issue_caps/eval may reorder the list
    if (lazy && !cap->is_notable())
      break; // see revoke_stale_caps()

    CInode *in = cap->get_inode();
    ceph_assert(in->is_head());
    dout(10) << " clearing stale flag on " << *in << dendl;

    if (in->state_test(CInode::STATE_EXPORTINGCAPS)) {
      // if export succeeds, the cap will be removed. if export fails,
      // we need to re-issue the cap if it's not stale.
      in->state_set(CInode::STATE_EVALSTALECAPS);
      continue;
    }

    // eval() may itself issue; only issue directly if it didn't run (replica)
    // or reported nothing to do
    if (!in->is_auth() || !eval(in, CEPH_CAP_LOCKS))
      issue_caps(in, cap);
  }
}
2580
2581 void Locker::remove_stale_leases(Session *session)
2582 {
2583 dout(10) << "remove_stale_leases for " << session->info.inst.name << dendl;
2584 xlist<ClientLease*>::iterator p = session->leases.begin();
2585 while (!p.end()) {
2586 ClientLease *l = *p;
2587 ++p;
2588 CDentry *parent = static_cast<CDentry*>(l->parent);
2589 dout(15) << " removing lease on " << *parent << dendl;
2590 parent->remove_client_lease(l, this);
2591 }
2592 }
2593
2594
// Retry context for request_inode_file_caps(): re-invokes it once the inode
// is ready (single auth / peer active), but only if we are still a replica.
class C_MDL_RequestInodeFileCaps : public LockerContext {
  CInode *in;
public:
  C_MDL_RequestInodeFileCaps(Locker *l, CInode *i) : LockerContext(l), in(i) {
    in->get(CInode::PIN_PTRWAITER);  // keep inode pinned until finish()
  }
  void finish(int r) override {
    // auth may have migrated to us in the meantime; only replicas ask
    if (!in->is_auth())
      locker->request_inode_file_caps(in);
    in->put(CInode::PIN_PTRWAITER);
  }
};
2607
// [replica] Tell the auth mds which caps our local clients want on @in
// (MInodeFileCaps), deferring while auth is ambiguous or the peer is still
// rejoining. No-op if the wanted set hasn't changed.
void Locker::request_inode_file_caps(CInode *in)
{
  ceph_assert(!in->is_auth());

  int wanted = in->get_caps_wanted() & in->get_caps_allowed_ever() & ~CEPH_CAP_PIN;
  if (wanted != in->replica_caps_wanted) {
    // wait for single auth
    if (in->is_ambiguous_auth()) {
      in->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH,
                     new C_MDL_RequestInodeFileCaps(this, in));
      return;
    }

    mds_rank_t auth = in->authority().first;
    if (mds->is_cluster_degraded() &&
        mds->mdsmap->get_state(auth) == MDSMap::STATE_REJOIN) {
      // auth is mid-rejoin; retry when it becomes active
      mds->wait_for_active_peer(auth, new C_MDL_RequestInodeFileCaps(this, in));
      return;
    }

    dout(7) << "request_inode_file_caps " << ccap_string(wanted)
            << " was " << ccap_string(in->replica_caps_wanted)
            << " on " << *in << " to mds." << auth << dendl;

    // record what we asked for even if the send is skipped below
    in->replica_caps_wanted = wanted;

    if (!mds->is_cluster_degraded() ||
        mds->mdsmap->is_clientreplay_or_active_or_stopping(auth))
      mds->send_message_mds(make_message<MInodeFileCaps>(in->ino(), in->replica_caps_wanted), auth);
  }
}
2639
// [auth] Handle a replica's MInodeFileCaps: record the caps its clients want
// and re-evaluate lock states accordingly.
void Locker::handle_inode_file_caps(const cref_t<MInodeFileCaps> &m)
{
  // nobody should be talking to us during recovery.
  if (mds->get_state() < MDSMap::STATE_CLIENTREPLAY) {
    if (mds->get_want_state() >= MDSMap::STATE_CLIENTREPLAY) {
      // we're on our way up; requeue until replay completes
      mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
      return;
    }
    ceph_abort_msg("got unexpected message during recovery");
  }

  // ok
  CInode *in = mdcache->get_inode(m->get_ino());
  mds_rank_t from = mds_rank_t(m->get_source().num());

  ceph_assert(in);
  ceph_assert(in->is_auth());

  dout(7) << "handle_inode_file_caps replica mds." << from << " wants caps " << ccap_string(m->get_caps()) << " on " << *in << dendl;

  if (mds->logger) mds->logger->inc(l_mdss_handle_inode_file_caps);

  in->set_mds_caps_wanted(from, m->get_caps());

  try_eval(in, CEPH_CAP_LOCKS);
}
2666
2667
// Retry context for check_inode_max_size(): re-runs the check with the same
// requested sizes once the inode unfreezes or its filelock stabilizes.
class C_MDL_CheckMaxSize : public LockerContext {
  CInode *in;
  uint64_t new_max_size;
  uint64_t newsize;
  utime_t mtime;

public:
  C_MDL_CheckMaxSize(Locker *l, CInode *i, uint64_t _new_max_size,
                     uint64_t _newsize, utime_t _mtime) :
    LockerContext(l), in(i),
    new_max_size(_new_max_size), newsize(_newsize), mtime(_mtime)
  {
    in->get(CInode::PIN_PTRWAITER);  // keep inode pinned until finish()
  }
  void finish(int r) override {
    // the check only applies on the auth mds; auth may have moved away
    if (in->is_auth())
      locker->check_inode_max_size(in, false, new_max_size, newsize, mtime);
    in->put(CInode::PIN_PTRWAITER);
  }
};
2688
2689 uint64_t Locker::calc_new_max_size(const CInode::inode_const_ptr &pi, uint64_t size)
2690 {
2691 uint64_t new_max = (size + 1) << 1;
2692 uint64_t max_inc = g_conf()->mds_client_writeable_range_max_inc_objs;
2693 if (max_inc > 0) {
2694 max_inc *= pi->layout.object_size;
2695 new_max = std::min(new_max, size + max_inc);
2696 }
2697 return round_up_to(new_max, pi->get_layout_size_increment());
2698 }
2699
// Decide whether the projected inode's client_ranges need updating for the
// current set of cap holders: walks client_caps and client_ranges in parallel
// (both keyed/sorted by client_t) and reports any mismatch.
//  returns true if calc_new_client_ranges() would change something.
bool Locker::check_client_ranges(CInode *in, uint64_t size)
{
  const auto& latest = in->get_projected_inode();
  uint64_t ms;
  if (latest->has_layout()) {
    ms = calc_new_max_size(latest, size);
  } else {
    // Layout-less directories like ~mds0/, have zero size
    ms = 0;
  }

  auto it = latest->client_ranges.begin();
  for (auto &p : in->client_caps) {
    if ((p.second.issued() | p.second.wanted()) & CEPH_CAP_ANY_FILE_WR) {
      if (it == latest->client_ranges.end())
        return true;   // writer without a range
      if (it->first != p.first)
        return true;   // range list out of sync with writers
      if (ms > it->second.range.last)
        return true;   // range needs to grow
      ++it;
    }
  }
  // leftover ranges belong to clients that are no longer writers
  return it != latest->client_ranges.end();
}
2725
// Rewrite the projected inode's client_ranges to match the current writers:
// grow ranges to the new max size, add ranges for new writers, and drop
// ranges for clients without (issued|wanted) write caps.
//  @max_increased: set to true if any client's range limit grew (caller may
//  want a timely journal flush).
//  returns true if client_ranges were modified.
bool Locker::calc_new_client_ranges(CInode *in, uint64_t size, bool *max_increased)
{
  const auto& latest = in->get_projected_inode();
  uint64_t ms;
  if (latest->has_layout()) {
    ms = calc_new_max_size(latest, size);
  } else {
    // Layout-less directories like ~mds0/, have zero size
    ms = 0;
  }

  auto pi = in->_get_projected_inode();
  bool updated = false;

  // increase ranges as appropriate.
  // shrink to 0 if no WR|BUFFER caps issued.
  // client_ranges and client_caps are both sorted by client_t; walk in step.
  auto it = pi->client_ranges.begin();
  for (auto &p : in->client_caps) {
    if ((p.second.issued() | p.second.wanted()) & CEPH_CAP_ANY_FILE_WR) {
      // drop ranges for clients ordered before this writer (no longer writers)
      while (it != pi->client_ranges.end() && it->first < p.first) {
        it = pi->client_ranges.erase(it);
        updated = true;
      }

      if (it != pi->client_ranges.end() && it->first == p.first) {
        // existing range: grow if needed
        if (ms > it->second.range.last) {
          it->second.range.last = ms;
          updated = true;
          if (max_increased)
            *max_increased = true;
        }
      } else {
        // new writer: insert a fresh range
        it = pi->client_ranges.emplace_hint(it, std::piecewise_construct,
                                            std::forward_as_tuple(p.first),
                                            std::forward_as_tuple());
        it->second.range.last = ms;
        it->second.follows = in->first - 1;
        updated = true;
        if (max_increased)
          *max_increased = true;
      }
      p.second.mark_clientwriteable();
      ++it;
    } else {
      p.second.clear_clientwriteable();
    }
  }
  // drop any trailing ranges past the last writer
  while (it != pi->client_ranges.end()) {
    it = pi->client_ranges.erase(it);
    updated = true;
  }
  if (updated) {
    if (pi->client_ranges.empty())
      in->clear_clientwriteable();
    else
      in->mark_clientwriteable();
  }
  return updated;
}
2785
// [auth] Reconcile a file's size/mtime and client writeable ranges, journaling
// an update (EOpen or EUpdate) when anything changes.
//  @force_wrlock: skip the can_wrlock check (caller already holds/guarantees it)
//  @new_max_size / @new_size / @new_mtime: requested values (0/empty = unchanged)
//  returns true if an update was journaled, false if nothing to do or deferred
//  (frozen inode or unstable filelock; a retry context is queued in that case).
bool Locker::check_inode_max_size(CInode *in, bool force_wrlock,
                                  uint64_t new_max_size, uint64_t new_size,
                                  utime_t new_mtime)
{
  ceph_assert(in->is_auth());
  ceph_assert(in->is_file());

  const auto& latest = in->get_projected_inode();
  uint64_t size = latest->size;
  bool update_size = new_size > 0;

  if (update_size) {
    // sizes/mtimes only move forward
    new_size = size = std::max(size, new_size);
    new_mtime = std::max(new_mtime, latest->mtime);
    if (latest->size == new_size && latest->mtime == new_mtime)
      update_size = false;
  }

  bool new_ranges = check_client_ranges(in, std::max(new_max_size, size));
  if (!update_size && !new_ranges) {
    dout(20) << "check_inode_max_size no-op on " << *in << dendl;
    return false;
  }

  dout(10) << "check_inode_max_size new_ranges " << new_ranges
           << " update_size " << update_size
           << " on " << *in << dendl;

  if (in->is_frozen()) {
    dout(10) << "check_inode_max_size frozen, waiting on " << *in << dendl;
    in->add_waiter(CInode::WAIT_UNFREEZE,
                   new C_MDL_CheckMaxSize(this, in, new_max_size, new_size, new_mtime));
    return false;
  } else if (!force_wrlock && !in->filelock.can_wrlock(in->get_loner())) {
    // lock? nudge the filelock toward a wrlockable state
    if (in->filelock.is_stable()) {
      if (in->get_target_loner() >= 0)
        file_excl(&in->filelock);
      else
        simple_lock(&in->filelock);
    }
    if (!in->filelock.can_wrlock(in->get_loner())) {
      dout(10) << "check_inode_max_size can't wrlock, waiting on " << *in << dendl;
      in->filelock.add_waiter(SimpleLock::WAIT_STABLE,
                              new C_MDL_CheckMaxSize(this, in, new_max_size, new_size, new_mtime));
      return false;
    }
  }

  MutationRef mut(new MutationImpl());
  mut->ls = mds->mdlog->get_current_segment();

  auto pi = in->project_inode(mut);
  pi.inode->version = in->pre_dirty();

  bool max_increased = false;
  if (new_ranges &&
      calc_new_client_ranges(in, std::max(new_max_size, size), &max_increased)) {
    dout(10) << "check_inode_max_size client_ranges "
             << in->get_previous_projected_inode()->client_ranges
             << " -> " << pi.inode->client_ranges << dendl;
  }

  if (update_size) {
    dout(10) << "check_inode_max_size size " << pi.inode->size << " -> " << new_size << dendl;
    pi.inode->size = new_size;
    pi.inode->rstat.rbytes = new_size;
    dout(10) << "check_inode_max_size mtime " << pi.inode->mtime << " -> " << new_mtime << dendl;
    pi.inode->mtime = new_mtime;
    if (new_mtime > pi.inode->ctime) {
      pi.inode->ctime = new_mtime;
      if (new_mtime > pi.inode->rstat.rctime)
        pi.inode->rstat.rctime = new_mtime;
    }
  }

  // use EOpen if the file is still open; otherwise, use EUpdate.
  // this is just an optimization to push open files forward into
  // newer log segments.
  LogEvent *le;
  EMetaBlob *metablob;
  if (in->is_any_caps_wanted() && in->last == CEPH_NOSNAP) {
    EOpen *eo = new EOpen(mds->mdlog);
    eo->add_ino(in->ino());
    metablob = &eo->metablob;
    le = eo;
  } else {
    EUpdate *eu = new EUpdate(mds->mdlog, "check_inode_max_size");
    metablob = &eu->metablob;
    le = eu;
  }
  mds->mdlog->start_entry(le);

  mdcache->predirty_journal_parents(mut, metablob, in, 0, PREDIRTY_PRIMARY);
  // no cow, here!
  CDentry *parent = in->get_projected_parent_dn();
  metablob->add_primary_dentry(parent, in, true);
  mdcache->journal_dirty_inode(mut.get(), metablob, in);

  mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut,
                           UPDATE_SHAREMAX, ref_t<MClientCaps>()));
  wrlock_force(&in->filelock, mut);  // wrlock for duration of journal
  mut->auth_pin(in);

  // make max_size _increase_ timely
  if (max_increased)
    mds->mdlog->flush();

  return true;
}
2896
2897
// Send the current max_size (via a cap GRANT carrying the inode metadata) to
// clients holding WR|BUFFER caps on @in; optionally restricted to @only_cap.
void Locker::share_inode_max_size(CInode *in, Capability *only_cap)
{
  /*
   * only share if currently issued a WR cap.  if client doesn't have it,
   * file_max doesn't matter, and the client will get it if/when they get
   * the cap later.
   */
  dout(10) << "share_inode_max_size on " << *in << dendl;
  map<client_t, Capability>::iterator it;
  if (only_cap)
    it = in->client_caps.find(only_cap->get_client());
  else
    it = in->client_caps.begin();
  for (; it != in->client_caps.end(); ++it) {
    const client_t client = it->first;
    Capability *cap = &it->second;
    if (cap->is_suppress())
      continue;
    if (cap->pending() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER)) {
      dout(10) << "share_inode_max_size with client." << client << dendl;
      if (mds->logger) mds->logger->inc(l_mdss_ceph_cap_op_grant);
      cap->inc_last_seq();
      auto m = make_message<MClientCaps>(CEPH_CAP_OP_GRANT,
                                         in->ino(),
                                         in->find_snaprealm()->inode->ino(),
                                         cap->get_cap_id(),
                                         cap->get_last_seq(),
                                         cap->pending(),
                                         cap->wanted(), 0,
                                         cap->get_mseq(),
                                         mds->get_osd_epoch_barrier());
      in->encode_cap_message(m, cap);
      mds->send_message_client_counted(m, client);
    }
    if (only_cap)
      break;
  }
}
2936
2937 bool Locker::_need_flush_mdlog(CInode *in, int wanted)
2938 {
2939 /* flush log if caps are wanted by client but corresponding lock is unstable and locked by
2940 * pending mutations. */
2941 if (((wanted & (CEPH_CAP_FILE_RD|CEPH_CAP_FILE_WR|CEPH_CAP_FILE_SHARED|CEPH_CAP_FILE_EXCL)) &&
2942 in->filelock.is_unstable_and_locked()) ||
2943 ((wanted & (CEPH_CAP_AUTH_SHARED|CEPH_CAP_AUTH_EXCL)) &&
2944 in->authlock.is_unstable_and_locked()) ||
2945 ((wanted & (CEPH_CAP_LINK_SHARED|CEPH_CAP_LINK_EXCL)) &&
2946 in->linklock.is_unstable_and_locked()) ||
2947 ((wanted & (CEPH_CAP_XATTR_SHARED|CEPH_CAP_XATTR_EXCL)) &&
2948 in->xattrlock.is_unstable_and_locked()))
2949 return true;
2950 return false;
2951 }
2952
// Update a cap's wanted set from a client message. If @issue_seq matches the
// cap's last issue we trust the client fully (set); on a seq mismatch we only
// ever widen (or ignore) the wanted set. Then propagate: replicas tell auth,
// and on auth we prioritize recovery and maybe journal an EOpen.
void Locker::adjust_cap_wanted(Capability *cap, int wanted, int issue_seq)
{
  if (ceph_seq_cmp(issue_seq, cap->get_last_issue()) == 0) {
    dout(10) << " wanted " << ccap_string(cap->wanted())
             << " -> " << ccap_string(wanted) << dendl;
    cap->set_wanted(wanted);
  } else if (wanted & ~cap->wanted()) {
    dout(10) << " wanted " << ccap_string(cap->wanted())
             << " -> " << ccap_string(wanted)
             << " (added caps even though we had seq mismatch!)" << dendl;
    cap->set_wanted(wanted | cap->wanted());
  } else {
    // stale message; don't shrink wanted based on it
    dout(10) << " NOT changing wanted " << ccap_string(cap->wanted())
             << " -> " << ccap_string(wanted)
             << " (issue_seq " << issue_seq << " != last_issue "
             << cap->get_last_issue() << ")" << dendl;
    return;
  }

  CInode *cur = cap->get_inode();
  if (!cur->is_auth()) {
    // replica: forward the (possibly changed) wanted set to auth
    request_inode_file_caps(cur);
    return;
  }

  if (cap->wanted()) {
    if (cur->state_test(CInode::STATE_RECOVERING) &&
        (cap->wanted() & (CEPH_CAP_FILE_RD |
                          CEPH_CAP_FILE_WR))) {
      // a client is waiting on file data; recover this inode first
      mds->mdcache->recovery_queue.prioritize(cur);
    }

    if (mdcache->open_file_table.should_log_open(cur)) {
      ceph_assert(cur->last == CEPH_NOSNAP);
      EOpen *le = new EOpen(mds->mdlog);
      mds->mdlog->start_entry(le);
      le->add_clean_inode(cur);
      mds->mdlog->submit_entry(le);
    }
  }
}
2994
2995 void Locker::snapflush_nudge(CInode *in)
2996 {
2997 ceph_assert(in->last != CEPH_NOSNAP);
2998 if (in->client_snap_caps.empty())
2999 return;
3000
3001 CInode *head = mdcache->get_inode(in->ino());
3002 // head inode gets unpinned when snapflush starts. It might get trimmed
3003 // before snapflush finishes.
3004 if (!head)
3005 return;
3006
3007 ceph_assert(head->is_auth());
3008 if (head->client_need_snapflush.empty())
3009 return;
3010
3011 SimpleLock *hlock = head->get_lock(CEPH_LOCK_IFILE);
3012 if (hlock->get_state() == LOCK_SYNC || !hlock->is_stable()) {
3013 hlock = NULL;
3014 for (int i = 0; i < num_cinode_locks; i++) {
3015 SimpleLock *lock = head->get_lock(cinode_lock_info[i].lock);
3016 if (lock->get_state() != LOCK_SYNC && lock->is_stable()) {
3017 hlock = lock;
3018 break;
3019 }
3020 }
3021 }
3022 if (hlock) {
3023 _rdlock_kick(hlock, true);
3024 } else {
3025 // also, requeue, in case of unstable lock
3026 need_snapflush_inodes.push_back(&in->item_caps);
3027 }
3028 }
3029
3030 void Locker::mark_need_snapflush_inode(CInode *in)
3031 {
3032 ceph_assert(in->last != CEPH_NOSNAP);
3033 if (!in->item_caps.is_on_list()) {
3034 need_snapflush_inodes.push_back(&in->item_caps);
3035 utime_t now = ceph_clock_now();
3036 in->last_dirstat_prop = now;
3037 dout(10) << "mark_need_snapflush_inode " << *in << " - added at " << now << dendl;
3038 }
3039 }
3040
3041 bool Locker::is_revoking_any_caps_from(client_t client)
3042 {
3043 auto it = revoking_caps_by_client.find(client);
3044 if (it == revoking_caps_by_client.end())
3045 return false;
3046 return !it->second.empty();
3047 }
3048
/**
 * Perform empty (NULL) snapflush updates on behalf of @client for every
 * pending snapid < @last that the client skipped (it only sends a real
 * FLUSHSNAP when it has dirty metadata for that snap).  Each processed
 * snap is removed from the head inode's client_need_snapflush map.
 */
void Locker::_do_null_snapflush(CInode *head_in, client_t client, snapid_t last)
{
  dout(10) << "_do_null_snapflush client." << client << " on " << *head_in << dendl;
  for (auto p = head_in->client_need_snapflush.begin();
       p != head_in->client_need_snapflush.end() && p->first < last; ) {
    snapid_t snapid = p->first;
    auto &clients = p->second;
    // advance first: remove_need_snapflush() below may erase this entry
    ++p;

    if (clients.count(client)) {
      dout(10) << " doing async NULL snapflush on " << snapid << " from client." << client << dendl;
      CInode *sin = mdcache->pick_inode_snap(head_in, snapid - 1);
      ceph_assert(sin);
      ceph_assert(sin->first <= snapid);
      // dirty == 0 and null messages: records a no-op flush for this snap
      _do_snap_update(sin, snapid, 0, sin->first - 1, client, ref_t<MClientCaps>(), ref_t<MClientCaps>());
      head_in->remove_need_snapflush(sin, snapid, client);
    }
  }
}
3068
3069
3070 bool Locker::should_defer_client_cap_frozen(CInode *in)
3071 {
3072 if (in->is_frozen())
3073 return true;
3074
3075 /*
3076 * This policy needs to be AT LEAST as permissive as allowing a client
3077 * request to go forward, or else a client request can release something,
3078 * the release gets deferred, but the request gets processed and deadlocks
3079 * because when the caps can't get revoked.
3080 *
3081 * No auth_pin implies that there is no unstable lock and @in is not auth
3082 * pinnned by client request. If parent dirfrag is auth pinned by a lock
3083 * cache, later request from lock cache owner may forcibly auth pin the @in.
3084 */
3085 if (in->is_freezing() && in->get_num_auth_pins() == 0) {
3086 CDir* dir = in->get_parent_dir();
3087 if (!dir || !dir->is_auth_pinned_by_lock_cache())
3088 return true;
3089 }
3090 return false;
3091 }
3092
/**
 * Main entry point for MClientCaps messages from clients: cap flushes,
 * snapflushes, releases and wanted-set updates.
 *
 * Ordering of checks matters: replay gating, duplicate-flush ack,
 * flush-tid trimming, inode/cap lookup, freeze deferral, mseq filtering,
 * then either the FLUSHSNAP path or the regular cap update path.
 */
void Locker::handle_client_caps(const cref_t<MClientCaps> &m)
{
  client_t client = m->get_source().num();
  snapid_t follows = m->get_snap_follows();
  auto op = m->get_op();
  auto dirty = m->get_dirty();
  dout(7) << "handle_client_caps "
	  << " on " << m->get_ino()
	  << " tid " << m->get_client_tid() << " follows " << follows
	  << " op " << ceph_cap_op_name(op)
	  << " flags 0x" << std::hex << m->flags << std::dec << dendl;

  Session *session = mds->get_session(m);
  // not yet serving: drop dead sessions, remember reconnect-time dirty
  // caps, and defer everything else until replay completes
  if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) {
    if (!session) {
      dout(5) << " no session, dropping " << *m << dendl;
      return;
    }
    if (session->is_closed() ||
	session->is_closing() ||
	session->is_killing()) {
      dout(7) << " session closed|closing|killing, dropping " << *m << dendl;
      return;
    }
    if ((mds->is_reconnect() || mds->get_want_state() == MDSMap::STATE_RECONNECT) &&
	dirty && m->get_client_tid() > 0 &&
	!session->have_completed_flush(m->get_client_tid())) {
      mdcache->set_reconnected_dirty_caps(client, m->get_ino(), dirty,
					  op == CEPH_CAP_OP_FLUSHSNAP);
    }
    mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
    return;
  }

  if (mds->logger) mds->logger->inc(l_mdss_handle_client_caps);
  if (dirty) {
    if (mds->logger) mds->logger->inc(l_mdss_handle_client_caps_dirty);
  }

  // duplicate flush (client retransmit)?  re-ack without re-applying.
  if (m->get_client_tid() > 0 && session &&
      session->have_completed_flush(m->get_client_tid())) {
    dout(7) << "handle_client_caps already flushed tid " << m->get_client_tid()
	    << " for client." << client << dendl;
    ref_t<MClientCaps> ack;
    if (op == CEPH_CAP_OP_FLUSHSNAP) {
      if (mds->logger) mds->logger->inc(l_mdss_ceph_cap_op_flushsnap_ack);
      ack = make_message<MClientCaps>(CEPH_CAP_OP_FLUSHSNAP_ACK, m->get_ino(), 0, 0, 0, 0, 0, dirty, 0, mds->get_osd_epoch_barrier());
    } else {
      if (mds->logger) mds->logger->inc(l_mdss_ceph_cap_op_flush_ack);
      ack = make_message<MClientCaps>(CEPH_CAP_OP_FLUSH_ACK, m->get_ino(), 0, m->get_cap_id(), m->get_seq(), m->get_caps(), 0, dirty, 0, mds->get_osd_epoch_barrier());
    }
    ack->set_snap_follows(follows);
    ack->set_client_tid(m->get_client_tid());
    mds->send_message_client_counted(ack, m->get_connection());
    if (op == CEPH_CAP_OP_FLUSHSNAP) {
      return;
    } else {
      // fall-thru because the message may release some caps
      dirty = false;
      op = CEPH_CAP_OP_UPDATE;
    }
  }

  // "oldest flush tid" > 0 means client uses unique TID for each flush
  if (m->get_oldest_flush_tid() > 0 && session) {
    if (session->trim_completed_flushes(m->get_oldest_flush_tid())) {
      mds->mdlog->get_current_segment()->touched_sessions.insert(session->info.inst.name);

      if (session->get_num_trim_flushes_warnings() > 0 &&
	  session->get_num_completed_flushes() * 2 < g_conf()->mds_max_completed_flushes)
	session->reset_num_trim_flushes_warnings();
    } else {
      // client is not advancing oldest_flush_tid; warn with backoff
      if (session->get_num_completed_flushes() >=
	  (g_conf()->mds_max_completed_flushes << session->get_num_trim_flushes_warnings())) {
	session->inc_num_trim_flushes_warnings();
	CachedStackStringStream css;
	*css << "client." << session->get_client() << " does not advance its oldest_flush_tid ("
	     << m->get_oldest_flush_tid() << "), "
	     << session->get_num_completed_flushes()
	     << " completed flushes recorded in session";
	mds->clog->warn() << css->strv();
	dout(20) << __func__ << " " << css->strv() << dendl;
      }
    }
  }

  CInode *head_in = mdcache->get_inode(m->get_ino());
  if (!head_in) {
    if (mds->is_clientreplay()) {
      dout(7) << "handle_client_caps on unknown ino " << m->get_ino()
	      << ", will try again after replayed client requests" << dendl;
      mdcache->wait_replay_cap_reconnect(m->get_ino(), new C_MDS_RetryMessage(mds, m));
      return;
    }

    /*
     * "handle_client_caps on unknown ino xxx" is normal after migrating a subtree
     * Sequence of events that cause this are:
     *   - client sends caps message to mds.a
     *   - mds finishes subtree migration, send cap export to client
     *   - mds trim its cache
     *   - mds receives cap messages from client
     */
    dout(7) << "handle_client_caps on unknown ino " << m->get_ino() << ", dropping" << dendl;
    return;
  }

  if (m->osd_epoch_barrier && !mds->objecter->have_map(m->osd_epoch_barrier)) {
    // Pause RADOS operations until we see the required epoch
    mds->objecter->set_epoch_barrier(m->osd_epoch_barrier);
  }

  if (mds->get_osd_epoch_barrier() < m->osd_epoch_barrier) {
    // Record the barrier so that we will retransmit it to clients
    mds->set_osd_epoch_barrier(m->osd_epoch_barrier);
  }

  dout(10) << " head inode " << *head_in << dendl;

  Capability *cap = 0;
  cap = head_in->get_client_cap(client);
  if (!cap) {
    dout(7) << "handle_client_caps no cap for client." << client << " on " << *head_in << dendl;
    return;
  }
  ceph_assert(cap);

  // freezing|frozen?
  if (should_defer_client_cap_frozen(head_in)) {
    dout(7) << "handle_client_caps freezing|frozen on " << *head_in << dendl;
    head_in->add_waiter(CInode::WAIT_UNFREEZE, new C_MDS_RetryMessage(mds, m));
    return;
  }
  // stale migration sequence: the client's view of this cap predates a
  // migration; drop
  if (ceph_seq_cmp(m->get_mseq(), cap->get_mseq()) < 0) {
    dout(7) << "handle_client_caps mseq " << m->get_mseq() << " < " << cap->get_mseq()
	    << ", dropping" << dendl;
    return;
  }

  bool need_unpin = false;

  // flushsnap?
  if (op == CEPH_CAP_OP_FLUSHSNAP) {
    if (!head_in->is_auth()) {
      dout(7) << " not auth, ignoring flushsnap on " << *head_in << dendl;
      goto out;
    }

    SnapRealm *realm = head_in->find_snaprealm();
    snapid_t snap = realm->get_snap_following(follows);
    dout(10) << " flushsnap follows " << follows << " -> snap " << snap << dendl;

    // process any snaps the client skipped before this one
    auto p = head_in->client_need_snapflush.begin();
    if (p != head_in->client_need_snapflush.end() && p->first < snap) {
      head_in->auth_pin(this); // prevent subtree frozen
      need_unpin = true;
      _do_null_snapflush(head_in, client, snap);
    }

    CInode *in = head_in;
    if (snap != CEPH_NOSNAP) {
      in = mdcache->pick_inode_snap(head_in, snap - 1);
      if (in != head_in)
	dout(10) << " snapped inode " << *in << dendl;
    }

    // we can prepare the ack now, since this FLUSHEDSNAP is independent of any
    // other cap ops.  (except possibly duplicate FLUSHSNAP requests, but worst
    // case we get a dup response, so whatever.)
    ref_t<MClientCaps> ack;
    if (dirty) {
      ack = make_message<MClientCaps>(CEPH_CAP_OP_FLUSHSNAP_ACK, in->ino(), 0, 0, 0, 0, 0, dirty, 0, mds->get_osd_epoch_barrier());
      ack->set_snap_follows(follows);
      ack->set_client_tid(m->get_client_tid());
      ack->set_oldest_flush_tid(m->get_oldest_flush_tid());
    }

    if (in == head_in ||
	(head_in->client_need_snapflush.count(snap) &&
	 head_in->client_need_snapflush[snap].count(client))) {
      dout(7) << " flushsnap snap " << snap
	      << " client." << client << " on " << *in << dendl;

      // this cap now follows a later snap (i.e. the one initiating this flush, or later)
      if (in == head_in)
	cap->client_follows = snap < CEPH_NOSNAP ? snap : realm->get_newest_seq();

      _do_snap_update(in, snap, dirty, follows, client, m, ack);

      if (in != head_in)
	head_in->remove_need_snapflush(in, snap, client);
    } else {
      dout(7) << " not expecting flushsnap " << snap << " from client." << client << " on " << *in << dendl;
      if (ack) {
	if (mds->logger) mds->logger->inc(l_mdss_ceph_cap_op_flushsnap_ack);
	mds->send_message_client_counted(ack, m->get_connection());
      }
    }
    goto out;
  }

  if (cap->get_cap_id() != m->get_cap_id()) {
    dout(7) << " ignoring client capid " << m->get_cap_id() << " != my " << cap->get_cap_id() << dendl;
  } else {
    CInode *in = head_in;
    if (follows > 0) {
      in = mdcache->pick_inode_snap(head_in, follows);
      // intermediate snap inodes
      while (in != head_in) {
	ceph_assert(in->last != CEPH_NOSNAP);
	if (in->is_auth() && dirty) {
	  dout(10) << " updating intermediate snapped inode " << *in << dendl;
	  _do_cap_update(in, NULL, dirty, follows, m, ref_t<MClientCaps>());
	}
	in = mdcache->pick_inode_snap(head_in, in->last);
      }
    }

    // head inode, and cap
    ref_t<MClientCaps> ack;

    int caps = m->get_caps();
    if (caps & ~cap->issued()) {
      dout(10) << " confirming not issued caps " << ccap_string(caps & ~cap->issued()) << dendl;
      caps &= cap->issued();
    }

    int revoked = cap->confirm_receipt(m->get_seq(), caps);
    dout(10) << " follows " << follows
	     << " retains " << ccap_string(m->get_caps())
	     << " dirty " << ccap_string(dirty)
	     << " on " << *in << dendl;

    if (revoked & CEPH_CAP_ANY_DIR_OPS)
      eval_lock_caches(cap);

    // missing/skipped snapflush?
    //  The client MAY send a snapflush if it is issued WR/EXCL caps, but
    //  presently only does so when it has actual dirty metadata.  But, we
    //  set up the need_snapflush stuff based on the issued caps.
    //  We can infer that the client WONT send a FLUSHSNAP once they have
    //  released all WR/EXCL caps (the FLUSHSNAP always comes before the cap
    //  update/release).
    if (!head_in->client_need_snapflush.empty()) {
      if (!(cap->issued() & CEPH_CAP_ANY_FILE_WR) &&
	  !(m->flags & MClientCaps::FLAG_PENDING_CAPSNAP)) {
	head_in->auth_pin(this); // prevent subtree frozen
	need_unpin = true;
	_do_null_snapflush(head_in, client);
      } else {
	dout(10) << " revocation in progress, not making any conclusions about null snapflushes" << dendl;
      }
    }
    if (cap->need_snapflush() && !(m->flags & MClientCaps::FLAG_PENDING_CAPSNAP))
      cap->clear_needsnapflush();

    if (dirty && in->is_auth()) {
      dout(7) << " flush client." << client << " dirty " << ccap_string(dirty)
	      << " seq " << m->get_seq() << " on " << *in << dendl;
      ack = make_message<MClientCaps>(CEPH_CAP_OP_FLUSH_ACK, in->ino(), 0, cap->get_cap_id(), m->get_seq(),
				      m->get_caps(), 0, dirty, 0, mds->get_osd_epoch_barrier());
      ack->set_client_tid(m->get_client_tid());
      ack->set_oldest_flush_tid(m->get_oldest_flush_tid());
    }

    // filter wanted based on what we could ever give out (given auth/replica status)
    bool need_flush = m->flags & MClientCaps::FLAG_SYNC;
    int new_wanted = m->get_wanted();
    if (new_wanted != cap->wanted()) {
      if (!need_flush && in->is_auth() && (new_wanted & ~cap->pending())) {
	// expanding caps.  make sure we aren't waiting for a log flush
	need_flush = _need_flush_mdlog(head_in, new_wanted & ~cap->pending());
      }

      adjust_cap_wanted(cap, new_wanted, m->get_issue_seq());
    }

    if (in->is_auth() &&
	_do_cap_update(in, cap, dirty, follows, m, ack, &need_flush)) {
      // updated; the journaled update will send the ack on commit
      eval(in, CEPH_CAP_LOCKS);

      if (!need_flush && (cap->wanted() & ~cap->pending()))
	need_flush = _need_flush_mdlog(in, cap->wanted() & ~cap->pending());
    } else {
      // no update, ack now.
      if (ack) {
	if (mds->logger) mds->logger->inc(l_mdss_ceph_cap_op_flush_ack);
	mds->send_message_client_counted(ack, m->get_connection());
      }

      bool did_issue = eval(in, CEPH_CAP_LOCKS);
      if (!did_issue && (cap->wanted() & ~cap->pending()))
	issue_caps(in, cap);

      if (cap->get_last_seq() == 0 &&
	  (cap->pending() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER))) {
	share_inode_max_size(in, cap);
      }
    }

    if (need_flush)
      mds->mdlog->flush();
  }

 out:
  if (need_unpin)
    head_in->auth_unpin(this);
}
3402
3403
/**
 * Waiter context: re-runs a cap release that process_request_cap_release()
 * deferred (e.g. while the inode was frozen).  Stores copies of the client
 * id and release item so it can retry standalone.
 */
class C_Locker_RetryRequestCapRelease : public LockerContext {
  client_t client;
  ceph_mds_request_release item;
public:
  C_Locker_RetryRequestCapRelease(Locker *l, client_t c, const ceph_mds_request_release& it) :
    LockerContext(l), client(c), item(it) { }
  void finish(int r) override {
    string dname;
    MDRequestRef null_ref;
    // retry with no associated mdr and an empty dname
    locker->process_request_cap_release(null_ref, client, item, dname);
  }
};
3416
/**
 * Apply a cap/lease release embedded in a client request.
 *
 * Drops any dentry lease named by @dname, then confirms the cap release:
 * validates mseq/cap_id, defers if the inode is frozen, confirms receipt
 * of the retained caps, performs null snapflushes if the client gave up
 * all file write caps, and re-evaluates locks.  @mdr may be null when
 * retried from C_Locker_RetryRequestCapRelease.
 */
void Locker::process_request_cap_release(MDRequestRef& mdr, client_t client, const ceph_mds_request_release& item,
					 std::string_view dname)
{
  inodeno_t ino = (uint64_t)item.ino;
  uint64_t cap_id = item.cap_id;
  int caps = item.caps;
  int wanted = item.wanted;
  int seq = item.seq;
  int issue_seq = item.issue_seq;
  int mseq = item.mseq;

  CInode *in = mdcache->get_inode(ino);
  if (!in)
    return;

  // release the dentry lease, if one was named
  if (dname.length()) {
    frag_t fg = in->pick_dirfrag(dname);
    CDir *dir = in->get_dirfrag(fg);
    if (dir) {
      CDentry *dn = dir->lookup(dname);
      if (dn) {
	ClientLease *l = dn->get_client_lease(client);
	if (l) {
	  dout(10) << __func__ << " removing lease on " << *dn << dendl;
	  dn->remove_client_lease(l, this);
	} else {
	  dout(7) << __func__ << " client." << client
		  << " doesn't have lease on " << *dn << dendl;
	}
      } else {
	dout(7) << __func__ << " client." << client << " released lease on dn "
		<< dir->dirfrag() << "/" << dname << " which dne" << dendl;
      }
    }
  }

  Capability *cap = in->get_client_cap(client);
  if (!cap)
    return;

  dout(10) << __func__ << " client." << client << " " << ccap_string(caps) << " on " << *in
	   << (mdr ? "" : " (DEFERRED, no mdr)")
	   << dendl;

  // stale migration sequence: ignore
  if (ceph_seq_cmp(mseq, cap->get_mseq()) < 0) {
    dout(7) << " mseq " << mseq << " < " << cap->get_mseq() << ", dropping" << dendl;
    return;
  }

  if (cap->get_cap_id() != cap_id) {
    dout(7) << " cap_id " << cap_id << " != " << cap->get_cap_id() << ", dropping" << dendl;
    return;
  }

  if (should_defer_client_cap_frozen(in)) {
    dout(7) << " frozen, deferring" << dendl;
    in->add_waiter(CInode::WAIT_UNFREEZE, new C_Locker_RetryRequestCapRelease(this, client, item));
    return;
  }

  if (mds->logger) mds->logger->inc(l_mdss_process_request_cap_release);

  // never let the client "retain" caps we never issued
  if (caps & ~cap->issued()) {
    dout(10) << " confirming not issued caps " << ccap_string(caps & ~cap->issued()) << dendl;
    caps &= cap->issued();
  }
  int revoked = cap->confirm_receipt(seq, caps);
  if (revoked & CEPH_CAP_ANY_DIR_OPS)
    eval_lock_caches(cap);

  // client gave up all file write caps: it will not send the pending
  // snapflushes, so record them as null flushes now
  if (!in->client_need_snapflush.empty() &&
      (cap->issued() & CEPH_CAP_ANY_FILE_WR) == 0) {
    _do_null_snapflush(in, client);
  }

  adjust_cap_wanted(cap, wanted, issue_seq);

  // suppress cap reissue during eval while the request is in flight
  if (mdr)
    cap->inc_suppress();
  eval(in, CEPH_CAP_LOCKS);
  if (mdr)
    cap->dec_suppress();

  // take note; we may need to reissue on this cap later
  if (mdr)
    mdr->cap_releases[in->vino()] = cap->get_last_seq();
}
3504
/**
 * Waiter context: retries kick_issue_caps() once the inode unfreezes.
 * Pins the CInode (PIN_PTRWAITER) for the lifetime of the context so the
 * stored pointer stays valid.
 */
class C_Locker_RetryKickIssueCaps : public LockerContext {
  CInode *in;
  client_t client;
  ceph_seq_t seq;
public:
  C_Locker_RetryKickIssueCaps(Locker *l, CInode *i, client_t c, ceph_seq_t s) :
    LockerContext(l), in(i), client(c), seq(s) {
    in->get(CInode::PIN_PTRWAITER);
  }
  void finish(int r) override {
    locker->kick_issue_caps(in, client, seq);
    in->put(CInode::PIN_PTRWAITER);
  }
};
3519
3520 void Locker::kick_issue_caps(CInode *in, client_t client, ceph_seq_t seq)
3521 {
3522 Capability *cap = in->get_client_cap(client);
3523 if (!cap || cap->get_last_seq() != seq)
3524 return;
3525 if (in->is_frozen()) {
3526 dout(10) << "kick_issue_caps waiting for unfreeze on " << *in << dendl;
3527 in->add_waiter(CInode::WAIT_UNFREEZE,
3528 new C_Locker_RetryKickIssueCaps(this, in, client, seq));
3529 return;
3530 }
3531 dout(10) << "kick_issue_caps released at current seq " << seq
3532 << ", reissuing" << dendl;
3533 issue_caps(in, cap);
3534 }
3535
3536 void Locker::kick_cap_releases(MDRequestRef& mdr)
3537 {
3538 client_t client = mdr->get_client();
3539 for (map<vinodeno_t,ceph_seq_t>::iterator p = mdr->cap_releases.begin();
3540 p != mdr->cap_releases.end();
3541 ++p) {
3542 CInode *in = mdcache->get_inode(p->first);
3543 if (!in)
3544 continue;
3545 kick_issue_caps(in, client, p->second);
3546 }
3547 }
3548
3549 /**
3550 * m and ack might be NULL, so don't dereference them unless dirty != 0
3551 */
3552 void Locker::_do_snap_update(CInode *in, snapid_t snap, int dirty, snapid_t follows, client_t client, const cref_t<MClientCaps> &m, const ref_t<MClientCaps> &ack)
3553 {
3554 dout(10) << "_do_snap_update dirty " << ccap_string(dirty)
3555 << " follows " << follows << " snap " << snap
3556 << " on " << *in << dendl;
3557
3558 if (snap == CEPH_NOSNAP) {
3559 // hmm, i guess snap was already deleted? just ack!
3560 dout(10) << " wow, the snap following " << follows
3561 << " was already deleted. nothing to record, just ack." << dendl;
3562 if (ack) {
3563 if (ack->get_op() == CEPH_CAP_OP_FLUSHSNAP_ACK) {
3564 if (mds->logger) mds->logger->inc(l_mdss_ceph_cap_op_flushsnap_ack);
3565 }
3566 mds->send_message_client_counted(ack, m->get_connection());
3567 }
3568 return;
3569 }
3570
3571 EUpdate *le = new EUpdate(mds->mdlog, "snap flush");
3572 mds->mdlog->start_entry(le);
3573 MutationRef mut = new MutationImpl();
3574 mut->ls = mds->mdlog->get_current_segment();
3575
3576 // normal metadata updates that we can apply to the head as well.
3577
3578 // update xattrs?
3579 CInode::mempool_xattr_map *px = nullptr;
3580 bool xattrs = (dirty & CEPH_CAP_XATTR_EXCL) &&
3581 m->xattrbl.length() &&
3582 m->head.xattr_version > in->get_projected_inode()->xattr_version;
3583
3584 CInode::mempool_old_inode *oi = nullptr;
3585 CInode::old_inode_map_ptr _old_inodes;
3586 if (in->is_any_old_inodes()) {
3587 auto last = in->pick_old_inode(snap);
3588 if (last) {
3589 _old_inodes = CInode::allocate_old_inode_map(*in->get_old_inodes());
3590 oi = &_old_inodes->at(last);
3591 if (snap > oi->first) {
3592 (*_old_inodes)[snap - 1] = *oi;;
3593 oi->first = snap;
3594 }
3595 }
3596 }
3597
3598 CInode::mempool_inode *i;
3599 if (oi) {
3600 dout(10) << " writing into old inode" << dendl;
3601 auto pi = in->project_inode(mut);
3602 pi.inode->version = in->pre_dirty();
3603 i = &oi->inode;
3604 if (xattrs)
3605 px = &oi->xattrs;
3606 } else {
3607 auto pi = in->project_inode(mut, xattrs);
3608 pi.inode->version = in->pre_dirty();
3609 i = pi.inode.get();
3610 if (xattrs)
3611 px = pi.xattrs.get();
3612 }
3613
3614 _update_cap_fields(in, dirty, m, i);
3615
3616 // xattr
3617 if (xattrs) {
3618 dout(7) << " xattrs v" << i->xattr_version << " -> " << m->head.xattr_version
3619 << " len " << m->xattrbl.length() << dendl;
3620 i->xattr_version = m->head.xattr_version;
3621 auto p = m->xattrbl.cbegin();
3622 decode(*px, p);
3623 }
3624
3625 {
3626 auto it = i->client_ranges.find(client);
3627 if (it != i->client_ranges.end()) {
3628 if (in->last == snap) {
3629 dout(10) << " removing client_range entirely" << dendl;
3630 i->client_ranges.erase(it);
3631 } else {
3632 dout(10) << " client_range now follows " << snap << dendl;
3633 it->second.follows = snap;
3634 }
3635 }
3636 }
3637
3638 if (_old_inodes)
3639 in->reset_old_inodes(std::move(_old_inodes));
3640
3641 mut->auth_pin(in);
3642 mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows);
3643 mdcache->journal_dirty_inode(mut.get(), &le->metablob, in, follows);
3644
3645 // "oldest flush tid" > 0 means client uses unique TID for each flush
3646 if (ack && ack->get_oldest_flush_tid() > 0)
3647 le->metablob.add_client_flush(metareqid_t(m->get_source(), ack->get_client_tid()),
3648 ack->get_oldest_flush_tid());
3649
3650 mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut, UPDATE_SNAPFLUSH,
3651 ack, client));
3652 }
3653
/**
 * Copy the client-reported metadata named by @dirty from message @m into
 * inode @pi.  Monotonic fields (ctime, change_attr, size, time_warp_seq)
 * only move forward; EXCL caps additionally allow exact (non-monotonic)
 * mtime/atime updates.  No-op when dirty == 0.
 */
void Locker::_update_cap_fields(CInode *in, int dirty, const cref_t<MClientCaps> &m, CInode::mempool_inode *pi)
{
  if (dirty == 0)
    return;

  /* m must be valid if there are dirty caps */
  ceph_assert(m);
  uint64_t features = m->get_connection()->get_features();

  // ctime only advances
  if (m->get_ctime() > pi->ctime) {
    dout(7) << "  ctime " << pi->ctime << " -> " << m->get_ctime()
	    << " for " << *in << dendl;
    pi->ctime = m->get_ctime();
    if (m->get_ctime() > pi->rstat.rctime)
      pi->rstat.rctime = m->get_ctime();
  }

  if ((features & CEPH_FEATURE_FS_CHANGE_ATTR) &&
      m->get_change_attr() > pi->change_attr) {
    dout(7) << "  change_attr " << pi->change_attr << " -> " << m->get_change_attr()
	    << " for " << *in << dendl;
    pi->change_attr = m->get_change_attr();
  }

  // file
  if (dirty & (CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)) {
    utime_t atime = m->get_atime();
    utime_t mtime = m->get_mtime();
    uint64_t size = m->get_size();
    version_t inline_version = m->inline_version;

    // WR: mtime may only advance; EXCL: client owns mtime, take any change
    if (((dirty & CEPH_CAP_FILE_WR) && mtime > pi->mtime) ||
	((dirty & CEPH_CAP_FILE_EXCL) && mtime != pi->mtime)) {
      dout(7) << "  mtime " << pi->mtime << " -> " << mtime
	      << " for " << *in << dendl;
      pi->mtime = mtime;
      if (mtime > pi->rstat.rctime)
	pi->rstat.rctime = mtime;
    }
    if (in->is_file() &&   // ONLY if regular file
	size > pi->size) {
      dout(7) << "  size " << pi->size << " -> " << size
	      << " for " << *in << dendl;
      pi->size = size;
      pi->rstat.rbytes = size;
    }
    if (in->is_file() &&
	(dirty & CEPH_CAP_FILE_WR) &&
	inline_version > pi->inline_data.version) {
      pi->inline_data.version = inline_version;
      if (inline_version != CEPH_INLINE_NONE && m->inline_data.length() > 0)
	pi->inline_data.set_data(m->inline_data);
      else
	pi->inline_data.free_data();
    }
    // atime is only trusted with EXCL (client exclusively owns the field)
    if ((dirty & CEPH_CAP_FILE_EXCL) && atime != pi->atime) {
      dout(7) << "  atime " << pi->atime << " -> " << atime
	      << " for " << *in << dendl;
      pi->atime = atime;
    }
    if ((dirty & CEPH_CAP_FILE_EXCL) &&
	ceph_seq_cmp(pi->time_warp_seq, m->get_time_warp_seq()) < 0) {
      dout(7) << "  time_warp_seq " << pi->time_warp_seq << " -> " << m->get_time_warp_seq()
	      << " for " << *in << dendl;
      pi->time_warp_seq = m->get_time_warp_seq();
    }
  }
  // auth
  if (dirty & CEPH_CAP_AUTH_EXCL) {
    if (m->head.uid != pi->uid) {
      dout(7) << "  uid " << pi->uid
	      << " -> " << m->head.uid
	      << " for " << *in << dendl;
      pi->uid = m->head.uid;
    }
    if (m->head.gid != pi->gid) {
      dout(7) << "  gid " << pi->gid
	      << " -> " << m->head.gid
	      << " for " << *in << dendl;
      pi->gid = m->head.gid;
    }
    if (m->head.mode != pi->mode) {
      dout(7) << "  mode " << oct << pi->mode
	      << " -> " << m->head.mode << dec
	      << " for " << *in << dendl;
      pi->mode = m->head.mode;
    }
    if ((features & CEPH_FEATURE_FS_BTIME) && m->get_btime() != pi->btime) {
      dout(7) << "  btime " << oct << pi->btime
	      << " -> " << m->get_btime() << dec
	      << " for " << *in << dendl;
      pi->btime = m->get_btime();
    }
  }
}
3749
/*
 * update inode based on cap flush|flushsnap|wanted.
 * adjust max_size, if needed.
 * if we update, return true; otherwise, false (no update needed).
 *
 * @param cap may be NULL (e.g. for intermediate snap inodes)
 * @param ack pre-built ack to journal alongside the update (may be null)
 * @param need_flush if non-null, set to true when the caller should flush
 *        the mdlog so the client sees the update promptly
 */
bool Locker::_do_cap_update(CInode *in, Capability *cap,
			    int dirty, snapid_t follows,
			    const cref_t<MClientCaps> &m, const ref_t<MClientCaps> &ack,
			    bool *need_flush)
{
  dout(10) << "_do_cap_update dirty " << ccap_string(dirty)
	   << " issued " << ccap_string(cap ? cap->issued() : 0)
	   << " wanted " << ccap_string(cap ? cap->wanted() : 0)
	   << " on " << *in << dendl;
  ceph_assert(in->is_auth());
  client_t client = m->get_source().num();
  const auto& latest = in->get_projected_inode();

  // increase or zero max_size?
  uint64_t size = m->get_size();
  bool change_max = false;
  uint64_t old_max = latest->get_client_range(client);
  uint64_t new_max = old_max;

  if (in->is_file()) {
    bool forced_change_max = false;
    dout(20) << "inode is file" << dendl;
    if (cap && ((cap->issued() | cap->wanted()) & CEPH_CAP_ANY_FILE_WR)) {
      dout(20) << "client has write caps; m->get_max_size="
	       << m->get_max_size() << "; old_max=" << old_max << dendl;
      if (m->get_max_size() > new_max) {
	// client explicitly asked for a larger writeable range
	dout(10) << "client requests file_max " << m->get_max_size()
		 << " > max " << old_max << dendl;
	change_max = true;
	forced_change_max = true;
	new_max = calc_new_max_size(latest, m->get_max_size());
      } else {
	// grow max_size ahead of the reported size, if warranted
	new_max = calc_new_max_size(latest, size);

	if (new_max > old_max)
	  change_max = true;
	else
	  new_max = old_max;
      }
    } else {
      // no write caps issued or wanted: retract the writeable range
      if (old_max) {
	change_max = true;
	new_max = 0;
      }
    }

    if (in->last == CEPH_NOSNAP &&
	change_max &&
	!in->filelock.can_wrlock(client) &&
	!in->filelock.can_force_wrlock(client)) {
      dout(10) << " i want to change file_max, but lock won't allow it (yet)" << dendl;
      if (in->filelock.is_stable()) {
	// try to move the filelock to a wrlockable state
	bool need_issue = false;
	if (cap)
	  cap->inc_suppress();
	if (in->get_mds_caps_wanted().empty() &&
	    (in->get_loner() >= 0 || (in->get_wanted_loner() >= 0 && in->try_set_loner()))) {
	  if (in->filelock.get_state() != LOCK_EXCL)
	    file_excl(&in->filelock, &need_issue);
	} else
	  simple_lock(&in->filelock, &need_issue);
	if (need_issue)
	  issue_caps(in);
	if (cap)
	  cap->dec_suppress();
      }
      if (!in->filelock.can_wrlock(client) &&
	  !in->filelock.can_force_wrlock(client)) {
	// still blocked: retry the max_size change once the lock stabilizes
	C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in,
							 forced_change_max ? new_max : 0,
							 0, utime_t());

	in->filelock.add_waiter(SimpleLock::WAIT_STABLE, cms);
	change_max = false;
      }
    }
  }

  // import client-held advisory file locks (fcntl, then flock)
  if (m->flockbl.length()) {
    int32_t num_locks;
    auto bli = m->flockbl.cbegin();
    decode(num_locks, bli);
    for ( int i=0; i < num_locks; ++i) {
      ceph_filelock decoded_lock;
      decode(decoded_lock, bli);
      in->get_fcntl_lock_state()->held_locks.
	insert(pair<uint64_t, ceph_filelock>(decoded_lock.start, decoded_lock));
      ++in->get_fcntl_lock_state()->client_held_lock_counts[(client_t)(decoded_lock.client)];
    }
    decode(num_locks, bli);
    for ( int i=0; i < num_locks; ++i) {
      ceph_filelock decoded_lock;
      decode(decoded_lock, bli);
      in->get_flock_lock_state()->held_locks.
	insert(pair<uint64_t, ceph_filelock>(decoded_lock.start, decoded_lock));
      ++in->get_flock_lock_state()->client_held_lock_counts[(client_t)(decoded_lock.client)];
    }
  }

  if (!dirty && !change_max)
    return false;

  // NOTE(review): session is dereferenced without a null check here;
  // presumably a session always exists once we reach this point — confirm.
  Session *session = mds->get_session(m);
  if (session->check_access(in, MAY_WRITE,
			    m->caller_uid, m->caller_gid, NULL, 0, 0) < 0) {
    dout(10) << "check_access failed, dropping cap update on " << *in << dendl;
    return false;
  }

  // do the update.
  EUpdate *le = new EUpdate(mds->mdlog, "cap update");
  mds->mdlog->start_entry(le);

  bool xattr = (dirty & CEPH_CAP_XATTR_EXCL) &&
	       m->xattrbl.length() &&
	       m->head.xattr_version > in->get_projected_inode()->xattr_version;

  MutationRef mut(new MutationImpl());
  mut->ls = mds->mdlog->get_current_segment();

  auto pi = in->project_inode(mut, xattr);
  pi.inode->version = in->pre_dirty();

  _update_cap_fields(in, dirty, m, pi.inode.get());

  if (change_max) {
    dout(7) << "  max_size " << old_max << " -> " << new_max
	    << " for " << *in << dendl;
    if (new_max) {
      auto &cr = pi.inode->client_ranges[client];
      cr.range.first = 0;
      cr.range.last = new_max;
      cr.follows = in->first - 1;
      in->mark_clientwriteable();
      if (cap)
	cap->mark_clientwriteable();
    } else {
      pi.inode->client_ranges.erase(client);
      if (pi.inode->client_ranges.empty())
	in->clear_clientwriteable();
      if (cap)
	cap->clear_clientwriteable();
    }
  }

  if (change_max || (dirty & (CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)))
    wrlock_force(&in->filelock, mut);  // wrlock for duration of journal

  // auth
  if (dirty & CEPH_CAP_AUTH_EXCL)
    wrlock_force(&in->authlock, mut);

  // xattrs update?
  if (xattr) {
    dout(7) << " xattrs v" << pi.inode->xattr_version << " -> " << m->head.xattr_version << dendl;
    pi.inode->xattr_version = m->head.xattr_version;
    auto p = m->xattrbl.cbegin();
    decode_noshare(*pi.xattrs, p);
    wrlock_force(&in->xattrlock, mut);
  }

  mut->auth_pin(in);
  mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows);
  mdcache->journal_dirty_inode(mut.get(), &le->metablob, in, follows);

  // "oldest flush tid" > 0 means client uses unique TID for each flush
  if (ack && ack->get_oldest_flush_tid() > 0)
    le->metablob.add_client_flush(metareqid_t(m->get_source(), ack->get_client_tid()),
				  ack->get_oldest_flush_tid());

  unsigned update_flags = 0;
  if (change_max)
    update_flags |= UPDATE_SHAREMAX;
  if (cap)
    update_flags |= UPDATE_NEEDSISSUE;
  mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut, update_flags,
							      ack, client));
  if (need_flush && !*need_flush &&
      ((change_max && new_max) || // max INCREASE
       _need_flush_mdlog(in, dirty)))
    *need_flush = true;

  return true;
}
3939
/*
 * Handle a batched MClientCapRelease from a client: apply any OSD epoch
 * barrier it carries, then process each individual cap release.
 */
void Locker::handle_client_cap_release(const cref_t<MClientCapRelease> &m)
{
  client_t client = m->get_source().num();
  dout(10) << "handle_client_cap_release " << *m << dendl;

  // Only process releases in clientreplay/active/stopping; otherwise
  // requeue the message until replay completes.
  if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) {
    mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
    return;
  }

  if (mds->logger) mds->logger->inc(l_mdss_handle_client_cap_release);

  if (m->osd_epoch_barrier && !mds->objecter->have_map(m->osd_epoch_barrier)) {
    // Pause RADOS operations until we see the required epoch
    mds->objecter->set_epoch_barrier(m->osd_epoch_barrier);
  }

  if (mds->get_osd_epoch_barrier() < m->osd_epoch_barrier) {
    // Record the barrier so that we will retransmit it to clients
    mds->set_osd_epoch_barrier(m->osd_epoch_barrier);
  }

  Session *session = mds->get_session(m);

  // Process each released cap independently; _do_cap_release validates
  // cap_id/mseq/seq and may defer on frozen inodes.
  for (const auto &cap : m->caps) {
    _do_cap_release(client, inodeno_t((uint64_t)cap.ino) , cap.cap_id, cap.migrate_seq, cap.seq);
  }

  // session may be null if it was torn down while the message was queued
  if (session) {
    session->notify_cap_release(m->caps.size());
  }
}
3972
3973 class C_Locker_RetryCapRelease : public LockerContext {
3974 client_t client;
3975 inodeno_t ino;
3976 uint64_t cap_id;
3977 ceph_seq_t migrate_seq;
3978 ceph_seq_t issue_seq;
3979 public:
3980 C_Locker_RetryCapRelease(Locker *l, client_t c, inodeno_t i, uint64_t id,
3981 ceph_seq_t mseq, ceph_seq_t seq) :
3982 LockerContext(l), client(c), ino(i), cap_id(id), migrate_seq(mseq), issue_seq(seq) {}
3983 void finish(int r) override {
3984 locker->_do_cap_release(client, ino, cap_id, migrate_seq, issue_seq);
3985 }
3986 };
3987
3988 void Locker::_do_cap_release(client_t client, inodeno_t ino, uint64_t cap_id,
3989 ceph_seq_t mseq, ceph_seq_t seq)
3990 {
3991 CInode *in = mdcache->get_inode(ino);
3992 if (!in) {
3993 dout(7) << "_do_cap_release missing ino " << ino << dendl;
3994 return;
3995 }
3996 Capability *cap = in->get_client_cap(client);
3997 if (!cap) {
3998 dout(7) << "_do_cap_release no cap for client" << client << " on "<< *in << dendl;
3999 return;
4000 }
4001
4002 dout(7) << "_do_cap_release for client." << client << " on "<< *in << dendl;
4003 if (cap->get_cap_id() != cap_id) {
4004 dout(7) << " capid " << cap_id << " != " << cap->get_cap_id() << ", ignore" << dendl;
4005 return;
4006 }
4007 if (ceph_seq_cmp(mseq, cap->get_mseq()) < 0) {
4008 dout(7) << " mseq " << mseq << " < " << cap->get_mseq() << ", ignore" << dendl;
4009 return;
4010 }
4011 if (should_defer_client_cap_frozen(in)) {
4012 dout(7) << " freezing|frozen, deferring" << dendl;
4013 in->add_waiter(CInode::WAIT_UNFREEZE,
4014 new C_Locker_RetryCapRelease(this, client, ino, cap_id, mseq, seq));
4015 return;
4016 }
4017 if (seq != cap->get_last_issue()) {
4018 dout(7) << " issue_seq " << seq << " != " << cap->get_last_issue() << dendl;
4019 // clean out any old revoke history
4020 cap->clean_revoke_from(seq);
4021 eval_cap_gather(in);
4022 return;
4023 }
4024 remove_client_cap(in, cap);
4025 }
4026
/*
 * Drop @cap (held by its client on @in) and clean up everything that
 * hung off it: pending snapflush state, lock caches, and (on the auth)
 * the client's writeable byte range. Finally re-evaluate the inode's
 * cap-related locks.
 *
 * @param kill if true the client is being evicted, so instead of doing
 *             a max_size check we flag the inode for file recovery.
 */
void Locker::remove_client_cap(CInode *in, Capability *cap, bool kill)
{
  client_t client = cap->get_client();
  // clean out any pending snapflush state
  if (!in->client_need_snapflush.empty())
    _do_null_snapflush(in, client);

  // detach and invalidate every lock cache pinned by this cap;
  // invalidate_lock_cache() pops entries off cap->lock_caches, so the
  // loop terminates when the list drains
  while (!cap->lock_caches.empty()) {
    MDLockCache* lock_cache = cap->lock_caches.front();
    lock_cache->client_cap = nullptr;
    invalidate_lock_cache(lock_cache);
  }

  // if the cap was not "notable" (plain readonly-ish cap), nothing
  // below applies — just drop it
  bool notable = cap->is_notable();
  in->remove_client_cap(client);
  if (!notable)
    return;

  if (in->is_auth()) {
    // make sure we clear out the client byte range
    if (in->get_projected_inode()->client_ranges.count(client) &&
	!(in->get_inode()->nlink == 0 && !in->is_any_caps())) { // unless it's unlink + stray
      if (kill)
	in->state_set(CInode::STATE_NEEDSRECOVER);
      else
	check_inode_max_size(in);
    }
  } else {
    // replica: tell the auth our caps changed
    request_inode_file_caps(in);
  }

  try_eval(in, CEPH_CAP_LOCKS);
}
4060
4061
4062 /**
4063 * Return true if any currently revoking caps exceed the
4064 * session_timeout threshold.
4065 */
4066 bool Locker::any_late_revoking_caps(xlist<Capability*> const &revoking,
4067 double timeout) const
4068 {
4069 xlist<Capability*>::const_iterator p = revoking.begin();
4070 if (p.end()) {
4071 // No revoking caps at the moment
4072 return false;
4073 } else {
4074 utime_t now = ceph_clock_now();
4075 utime_t age = now - (*p)->get_last_revoke_stamp();
4076 if (age <= timeout) {
4077 return false;
4078 } else {
4079 return true;
4080 }
4081 }
4082 }
4083
4084 std::set<client_t> Locker::get_late_revoking_clients(double timeout) const
4085 {
4086 std::set<client_t> result;
4087
4088 if (any_late_revoking_caps(revoking_caps, timeout)) {
4089 // Slow path: execute in O(N_clients)
4090 for (auto &p : revoking_caps_by_client) {
4091 if (any_late_revoking_caps(p.second, timeout)) {
4092 result.insert(p.first);
4093 }
4094 }
4095 } else {
4096 // Fast path: no misbehaving clients, execute in O(1)
4097 }
4098 return result;
4099 }
4100
// Hard-coded instead of surfacing a config setting because this is
// really a hack that should go away at some point when we have better
// inspection tools for getting at detailed cap state (#7316)
4104 #define MAX_WARN_CAPS 100
4105
4106 void Locker::caps_tick()
4107 {
4108 utime_t now = ceph_clock_now();
4109
4110 if (!need_snapflush_inodes.empty()) {
4111 // snap inodes that needs flush are auth pinned, they affect
4112 // subtree/difrarg freeze.
4113 utime_t cutoff = now;
4114 cutoff -= g_conf()->mds_freeze_tree_timeout / 3;
4115
4116 CInode *last = need_snapflush_inodes.back();
4117 while (!need_snapflush_inodes.empty()) {
4118 CInode *in = need_snapflush_inodes.front();
4119 if (in->last_dirstat_prop >= cutoff)
4120 break;
4121 in->item_caps.remove_myself();
4122 snapflush_nudge(in);
4123 if (in == last)
4124 break;
4125 }
4126 }
4127
4128 dout(20) << __func__ << " " << revoking_caps.size() << " revoking caps" << dendl;
4129
4130 now = ceph_clock_now();
4131 int n = 0;
4132 for (xlist<Capability*>::iterator p = revoking_caps.begin(); !p.end(); ++p) {
4133 Capability *cap = *p;
4134
4135 utime_t age = now - cap->get_last_revoke_stamp();
4136 dout(20) << __func__ << " age = " << age << " client." << cap->get_client() << "." << cap->get_inode()->ino() << dendl;
4137 if (age <= mds->mdsmap->get_session_timeout()) {
4138 dout(20) << __func__ << " age below timeout " << mds->mdsmap->get_session_timeout() << dendl;
4139 break;
4140 } else {
4141 ++n;
4142 if (n > MAX_WARN_CAPS) {
4143 dout(1) << __func__ << " more than " << MAX_WARN_CAPS << " caps are late"
4144 << "revoking, ignoring subsequent caps" << dendl;
4145 break;
4146 }
4147 }
4148 // exponential backoff of warning intervals
4149 if (age > mds->mdsmap->get_session_timeout() * (1 << cap->get_num_revoke_warnings())) {
4150 cap->inc_num_revoke_warnings();
4151 CachedStackStringStream css;
4152 *css << "client." << cap->get_client() << " isn't responding to mclientcaps(revoke), ino "
4153 << cap->get_inode()->ino() << " pending " << ccap_string(cap->pending())
4154 << " issued " << ccap_string(cap->issued()) << ", sent " << age << " seconds ago";
4155 mds->clog->warn() << css->strv();
4156 dout(20) << __func__ << " " << css->strv() << dendl;
4157 } else {
4158 dout(20) << __func__ << " silencing log message (backoff) for " << "client." << cap->get_client() << "." << cap->get_inode()->ino() << dendl;
4159 }
4160 }
4161 }
4162
4163
/*
 * Handle a dentry-lease message from a client: a release, a revoke ack,
 * or a renewal request. Unknown inode/dentry/lease are silently ignored
 * (the state was already trimmed).
 */
void Locker::handle_client_lease(const cref_t<MClientLease> &m)
{
  dout(10) << "handle_client_lease " << *m << dendl;

  ceph_assert(m->get_source().is_client());
  client_t client = m->get_source().num();

  CInode *in = mdcache->get_inode(m->get_ino(), m->get_last());
  if (!in) {
    dout(7) << "handle_client_lease don't have ino " << m->get_ino() << "." << m->get_last() << dendl;
    return;
  }
  CDentry *dn = 0;

  // locate the dentry via its dirfrag
  frag_t fg = in->pick_dirfrag(m->dname);
  CDir *dir = in->get_dirfrag(fg);
  if (dir)
    dn = dir->lookup(m->dname);
  if (!dn) {
    dout(7) << "handle_client_lease don't have dn " << m->get_ino() << " " << m->dname << dendl;
    return;
  }
  dout(10) << " on " << *dn << dendl;

  // replica and lock
  ClientLease *l = dn->get_client_lease(client);
  if (!l) {
    dout(7) << "handle_client_lease didn't have lease for client." << client << " of " << *dn << dendl;
    return;
  }

  switch (m->get_action()) {
  case CEPH_MDS_LEASE_REVOKE_ACK:
  case CEPH_MDS_LEASE_RELEASE:
    // only drop the lease if the seq matches; a mismatch means the
    // message refers to an older lease instance
    if (l->seq != m->get_seq()) {
      dout(7) << "handle_client_lease release - seq " << l->seq << " != provided " << m->get_seq() << dendl;
    } else {
      dout(7) << "handle_client_lease client." << client
	      << " on " << *dn << dendl;
      dn->remove_client_lease(l, this);
    }
    break;

  case CEPH_MDS_LEASE_RENEW:
    {
      dout(7) << "handle_client_lease client." << client << " renew on " << *dn
	      << (!dn->lock.can_lease(client)?", revoking lease":"") << dendl;
      // renew only if the dentry lock still permits leasing; otherwise
      // we simply don't reply and the lease lapses
      if (dn->lock.can_lease(client)) {
	auto reply = make_message<MClientLease>(*m);
	int pool = 1; // fixme.. do something smart!
	reply->h.duration_ms = (int)(1000 * mdcache->client_lease_durations[pool]);
	reply->h.seq = ++l->seq;
	reply->clear_payload();

	utime_t now = ceph_clock_now();
	now += mdcache->client_lease_durations[pool];
	mdcache->touch_client_lease(l, pool, now);

	mds->send_message_client_counted(reply, m->get_connection());
      }
    }
    break;

  default:
    ceph_abort(); // implement me
    break;
  }
}
4232
4233
/*
 * Encode a dentry lease for @dn into @bl as part of a client reply.
 *
 * A real (CEPH_LEASE_VALID) lease is issued only when all conditions
 * hold: non-snapshot request, the dentry lock allows leasing for this
 * client, the parent dir is not a stray, and the client doesn't already
 * effectively cover the dentry via the dir's filelock lease or a
 * pending Fs/Fx cap. Otherwise a "null" lease (just @mask) is encoded.
 */
void Locker::issue_client_lease(CDentry *dn, MDRequestRef &mdr, int mask,
                                utime_t now, bufferlist &bl)
{
  client_t client = mdr->get_client();
  Session *session = mdr->session;

  CInode *diri = dn->get_dir()->get_inode();
  if (mdr->snapid == CEPH_NOSNAP &&
      dn->lock.can_lease(client) &&
      !diri->is_stray() &&  // do not issue dn leases in stray dir!
      !diri->filelock.can_lease(client) &&
      !(diri->get_client_cap_pending(client) & (CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_EXCL))) {
    // issue a dentry lease
    ClientLease *l = dn->add_client_lease(client, session);
    session->touch_lease(l);

    int pool = 1; // fixme.. do something smart!
    now += mdcache->client_lease_durations[pool];
    mdcache->touch_client_lease(l, pool, now);

    LeaseStat lstat;
    lstat.mask = CEPH_LEASE_VALID | mask;
    lstat.duration_ms = (uint32_t)(1000 * mdcache->client_lease_durations[pool]);
    lstat.seq = ++l->seq;
    lstat.alternate_name = std::string(dn->alternate_name);
    encode_lease(bl, session->info, lstat);
    dout(20) << "issue_client_lease seq " << lstat.seq << " dur " << lstat.duration_ms << "ms "
	     << " on " << *dn << dendl;
  } else {
    // null lease
    LeaseStat lstat;
    lstat.mask = mask;
    lstat.alternate_name = std::string(dn->alternate_name);
    encode_lease(bl, session->info, lstat);
    dout(20) << "issue_client_lease no/null lease on " << *dn << dendl;
  }
}
4271
4272
4273 void Locker::revoke_client_leases(SimpleLock *lock)
4274 {
4275 int n = 0;
4276 CDentry *dn = static_cast<CDentry*>(lock->get_parent());
4277 for (map<client_t, ClientLease*>::iterator p = dn->client_lease_map.begin();
4278 p != dn->client_lease_map.end();
4279 ++p) {
4280 ClientLease *l = p->second;
4281
4282 n++;
4283 ceph_assert(lock->get_type() == CEPH_LOCK_DN);
4284
4285 CDentry *dn = static_cast<CDentry*>(lock->get_parent());
4286 int mask = 1 | CEPH_LOCK_DN; // old and new bits
4287
4288 // i should also revoke the dir ICONTENT lease, if they have it!
4289 CInode *diri = dn->get_dir()->get_inode();
4290 auto lease = make_message<MClientLease>(CEPH_MDS_LEASE_REVOKE, l->seq, mask, diri->ino(), diri->first, CEPH_NOSNAP, dn->get_name());
4291 mds->send_message_client_counted(lease, l->client);
4292 }
4293 }
4294
4295 void Locker::encode_lease(bufferlist& bl, const session_info_t& info,
4296 const LeaseStat& ls)
4297 {
4298 if (info.has_feature(CEPHFS_FEATURE_REPLY_ENCODING)) {
4299 ENCODE_START(2, 1, bl);
4300 encode(ls.mask, bl);
4301 encode(ls.duration_ms, bl);
4302 encode(ls.seq, bl);
4303 encode(ls.alternate_name, bl);
4304 ENCODE_FINISH(bl);
4305 }
4306 else {
4307 encode(ls.mask, bl);
4308 encode(ls.duration_ms, bl);
4309 encode(ls.seq, bl);
4310 }
4311 }
4312
4313 // locks ----------------------------------------------------------------
4314
4315 SimpleLock *Locker::get_lock(int lock_type, const MDSCacheObjectInfo &info)
4316 {
4317 switch (lock_type) {
4318 case CEPH_LOCK_DN:
4319 {
4320 // be careful; info.dirfrag may have incorrect frag; recalculate based on dname.
4321 CInode *diri = mdcache->get_inode(info.dirfrag.ino);
4322 frag_t fg;
4323 CDir *dir = 0;
4324 CDentry *dn = 0;
4325 if (diri) {
4326 fg = diri->pick_dirfrag(info.dname);
4327 dir = diri->get_dirfrag(fg);
4328 if (dir)
4329 dn = dir->lookup(info.dname, info.snapid);
4330 }
4331 if (!dn) {
4332 dout(7) << "get_lock don't have dn " << info.dirfrag.ino << " " << info.dname << dendl;
4333 return 0;
4334 }
4335 return &dn->lock;
4336 }
4337
4338 case CEPH_LOCK_IAUTH:
4339 case CEPH_LOCK_ILINK:
4340 case CEPH_LOCK_IDFT:
4341 case CEPH_LOCK_IFILE:
4342 case CEPH_LOCK_INEST:
4343 case CEPH_LOCK_IXATTR:
4344 case CEPH_LOCK_ISNAP:
4345 case CEPH_LOCK_IFLOCK:
4346 case CEPH_LOCK_IPOLICY:
4347 {
4348 CInode *in = mdcache->get_inode(info.ino, info.snapid);
4349 if (!in) {
4350 dout(7) << "get_lock don't have ino " << info.ino << dendl;
4351 return 0;
4352 }
4353 switch (lock_type) {
4354 case CEPH_LOCK_IAUTH: return &in->authlock;
4355 case CEPH_LOCK_ILINK: return &in->linklock;
4356 case CEPH_LOCK_IDFT: return &in->dirfragtreelock;
4357 case CEPH_LOCK_IFILE: return &in->filelock;
4358 case CEPH_LOCK_INEST: return &in->nestlock;
4359 case CEPH_LOCK_IXATTR: return &in->xattrlock;
4360 case CEPH_LOCK_ISNAP: return &in->snaplock;
4361 case CEPH_LOCK_IFLOCK: return &in->flocklock;
4362 case CEPH_LOCK_IPOLICY: return &in->policylock;
4363 }
4364 }
4365
4366 default:
4367 dout(7) << "get_lock don't know lock_type " << lock_type << dendl;
4368 ceph_abort();
4369 break;
4370 }
4371
4372 return 0;
4373 }
4374
/*
 * Entry point for inter-MDS MLock messages: look up the referenced lock
 * and dispatch to the handler for its lock family.
 */
void Locker::handle_lock(const cref_t<MLock> &m)
{
  // nobody should be talking to us during recovery.
  ceph_assert(mds->is_rejoin() || mds->is_clientreplay() || mds->is_active() || mds->is_stopping());

  SimpleLock *lock = get_lock(m->get_lock_type(), m->get_object_info());
  if (!lock) {
    dout(10) << "don't have object " << m->get_object_info() << ", must have trimmed, dropping" << dendl;
    return;
  }

  switch (lock->get_type()) {
  case CEPH_LOCK_DN:
  case CEPH_LOCK_IAUTH:
  case CEPH_LOCK_ILINK:
  case CEPH_LOCK_ISNAP:
  case CEPH_LOCK_IXATTR:
  case CEPH_LOCK_IFLOCK:
  case CEPH_LOCK_IPOLICY:
    handle_simple_lock(lock, m);
    break;

  // IDFT/INEST intentionally fall through to the file-lock handler:
  // the dedicated scatter handler below is disabled.
  case CEPH_LOCK_IDFT:
  case CEPH_LOCK_INEST:
    //handle_scatter_lock((ScatterLock*)lock, m);
    //break;

  case CEPH_LOCK_IFILE:
    handle_file_lock(static_cast<ScatterLock*>(lock), m);
    break;

  default:
    dout(7) << "handle_lock got otype " << m->get_lock_type() << dendl;
    ceph_abort();
    break;
  }
}
4412
4413
4414
4415
4416
4417 // ==========================================================================
4418 // simple lock
4419
4420 /** This function may take a reference to m if it needs one, but does
4421 * not put references. */
4422 void Locker::handle_reqrdlock(SimpleLock *lock, const cref_t<MLock> &m)
4423 {
4424 MDSCacheObject *parent = lock->get_parent();
4425 if (parent->is_auth() &&
4426 lock->get_state() != LOCK_SYNC &&
4427 !parent->is_frozen()) {
4428 dout(7) << "handle_reqrdlock got rdlock request on " << *lock
4429 << " on " << *parent << dendl;
4430 ceph_assert(parent->is_auth()); // replica auth pinned if they're doing this!
4431 if (lock->is_stable()) {
4432 simple_sync(lock);
4433 } else {
4434 dout(7) << "handle_reqrdlock delaying request until lock is stable" << dendl;
4435 lock->add_waiter(SimpleLock::WAIT_STABLE | MDSCacheObject::WAIT_UNFREEZE,
4436 new C_MDS_RetryMessage(mds, m));
4437 }
4438 } else {
4439 dout(7) << "handle_reqrdlock dropping rdlock request on " << *lock
4440 << " on " << *parent << dendl;
4441 // replica should retry
4442 }
4443 }
4444
/*
 * Handle a simple-lock protocol message. Replica-side actions (SYNC,
 * LOCK) apply the auth's state change locally; auth-side LOCKACK
 * records one replica's acknowledgement of a gather. REQRDLOCK is a
 * replica asking the auth to go SYNC.
 */
void Locker::handle_simple_lock(SimpleLock *lock, const cref_t<MLock> &m)
{
  int from = m->get_asker();

  dout(10) << "handle_simple_lock " << *m
	   << " on " << *lock << " " << *lock->get_parent() << dendl;

  // during rejoin, drop messages for objects still being rejoined;
  // state will be reconciled by the rejoin process itself
  if (mds->is_rejoin()) {
    if (lock->get_parent()->is_rejoining()) {
      dout(7) << "handle_simple_lock still rejoining " << *lock->get_parent()
	      << ", dropping " << *m << dendl;
      return;
    }
  }

  switch (m->get_action()) {
    // -- replica --
  case LOCK_AC_SYNC:
    ceph_assert(lock->get_state() == LOCK_LOCK);
    lock->decode_locked_state(m->get_data());
    lock->set_state(LOCK_SYNC);
    lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
    break;

  case LOCK_AC_LOCK:
    ceph_assert(lock->get_state() == LOCK_SYNC);
    lock->set_state(LOCK_SYNC_LOCK);
    // going unreadable: pull back any client dentry leases first
    if (lock->is_leased())
      revoke_client_leases(lock);
    eval_gather(lock, true);
    if (lock->is_unstable_and_locked()) {
      if (lock->is_cached())
	invalidate_lock_caches(lock);
      // flush the journal so the lock can be released promptly
      mds->mdlog->flush();
    }
    break;


    // -- auth --
  case LOCK_AC_LOCKACK:
    ceph_assert(lock->get_state() == LOCK_SYNC_LOCK ||
		lock->get_state() == LOCK_SYNC_EXCL);
    ceph_assert(lock->is_gathering(from));
    lock->remove_gather(from);

    if (lock->is_gathering()) {
      dout(7) << "handle_simple_lock " << *lock << " on " << *lock->get_parent() << " from " << from
	      << ", still gathering " << lock->get_gather_set() << dendl;
    } else {
      dout(7) << "handle_simple_lock " << *lock << " on " << *lock->get_parent() << " from " << from
	      << ", last one" << dendl;
      eval_gather(lock);
    }
    break;

  case LOCK_AC_REQRDLOCK:
    handle_reqrdlock(lock, m);
    break;

  }
}
4506
4507 /* unused, currently.
4508
4509 class C_Locker_SimpleEval : public Context {
4510 Locker *locker;
4511 SimpleLock *lock;
4512 public:
4513 C_Locker_SimpleEval(Locker *l, SimpleLock *lk) : locker(l), lock(lk) {}
4514 void finish(int r) {
4515 locker->try_simple_eval(lock);
4516 }
4517 };
4518
4519 void Locker::try_simple_eval(SimpleLock *lock)
4520 {
4521 // unstable and ambiguous auth?
4522 if (!lock->is_stable() &&
4523 lock->get_parent()->is_ambiguous_auth()) {
4524 dout(7) << "simple_eval not stable and ambiguous auth, waiting on " << *lock->get_parent() << dendl;
4525 //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
4526 lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_SimpleEval(this, lock));
4527 return;
4528 }
4529
4530 if (!lock->get_parent()->is_auth()) {
4531 dout(7) << "try_simple_eval not auth for " << *lock->get_parent() << dendl;
4532 return;
4533 }
4534
4535 if (!lock->get_parent()->can_auth_pin()) {
4536 dout(7) << "try_simple_eval can't auth_pin, waiting on " << *lock->get_parent() << dendl;
4537 //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
4538 lock->get_parent()->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_SimpleEval(this, lock));
4539 return;
4540 }
4541
4542 if (lock->is_stable())
4543 simple_eval(lock);
4544 }
4545 */
4546
4547
/*
 * Re-evaluate a stable simple lock on the auth and move it toward the
 * state that best matches current demand: EXCL when a loner client
 * wants exclusive caps, otherwise back to SYNC when nothing needs
 * exclusivity.
 */
void Locker::simple_eval(SimpleLock *lock, bool *need_issue)
{
  dout(10) << "simple_eval " << *lock << " on " << *lock->get_parent() << dendl;

  ceph_assert(lock->get_parent()->is_auth());
  ceph_assert(lock->is_stable());

  if (lock->get_parent()->is_freezing_or_frozen()) {
    // dentry/snap lock in unreadable state can block path traverse
    if ((lock->get_type() != CEPH_LOCK_DN &&
	 lock->get_type() != CEPH_LOCK_ISNAP &&
	 lock->get_type() != CEPH_LOCK_IPOLICY) ||
	lock->get_state() == LOCK_SYNC ||
	lock->get_parent()->is_frozen())
      return;
  }

  // read-only filesystem: force everything toward SYNC
  if (mdcache->is_readonly()) {
    if (lock->get_state() != LOCK_SYNC) {
      dout(10) << "simple_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
      simple_sync(lock, need_issue);
    }
    return;
  }

  // only cap-bearing locks (on inodes) consult client "wanted" bits
  CInode *in = 0;
  int wanted = 0;
  if (lock->get_cap_shift()) {
    in = static_cast<CInode*>(lock->get_parent());
    in->get_caps_wanted(&wanted, NULL, lock->get_cap_shift());
  }

  // -> excl?
  if (lock->get_state() != LOCK_EXCL &&
      in && in->get_target_loner() >= 0 &&
      (wanted & CEPH_CAP_GEXCL)) {
    dout(7) << "simple_eval stable, going to excl " << *lock
	    << " on " << *lock->get_parent() << dendl;
    simple_excl(lock, need_issue);
  }

  // stable -> sync?
  else if (lock->get_state() != LOCK_SYNC &&
	   !lock->is_wrlocked() &&
	   ((!(wanted & CEPH_CAP_GEXCL) && !lock->is_waiter_for(SimpleLock::WAIT_WR)) ||
	    (lock->get_state() == LOCK_EXCL && in && in->get_target_loner() < 0))) {
    dout(7) << "simple_eval stable, syncing " << *lock
	    << " on " << *lock->get_parent() << dendl;
    simple_sync(lock, need_issue);
  }
}
4599
4600
4601 // mid
4602
/*
 * Drive an auth lock toward SYNC (readable by all).
 *
 * Returns true if the lock reached SYNC immediately; false if a gather
 * was started (wrlocks to drain, replicas or client caps to recall,
 * file recovery, or dirty scatter data to write behind) — in that case
 * eval_gather() completes the transition later.
 *
 * If @need_issue is non-null, cap reissue is deferred to the caller by
 * setting *need_issue instead of calling issue_caps() directly.
 */
bool Locker::simple_sync(SimpleLock *lock, bool *need_issue)
{
  dout(7) << "simple_sync on " << *lock << " on " << *lock->get_parent() << dendl;
  ceph_assert(lock->get_parent()->is_auth());
  ceph_assert(lock->is_stable());

  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  int old_state = lock->get_state();

  // TSYN can go straight to SYNC; every other state needs the
  // transitional *_SYNC state and possibly a gather
  if (old_state != LOCK_TSYN) {

    switch (lock->get_state()) {
    case LOCK_MIX: lock->set_state(LOCK_MIX_SYNC); break;
    case LOCK_LOCK: lock->set_state(LOCK_LOCK_SYNC); break;
    case LOCK_XSYN: lock->set_state(LOCK_XSYN_SYNC); break;
    case LOCK_EXCL: lock->set_state(LOCK_EXCL_SYNC); break;
    default: ceph_abort();
    }

    int gather = 0;
    if (lock->is_wrlocked()) {
      gather++;
      if (lock->is_cached())
	invalidate_lock_caches(lock);

      // After a client request is early replied the mdlog won't be flushed
      // immediately, but before safe replied the request will hold the write
      // locks. So if the client sends another request to a different MDS
      // daemon, which then needs to request read lock from current MDS daemon,
      // then that daemon maybe stuck at most for 5 seconds. Which will lead
      // the client stuck at most 5 seconds.
      //
      // Let's try to flush the mdlog when the write lock is held, which will
      // release the write locks after mdlog is successfully flushed.
      mds->mdlog->flush();
    }

    // from MIX, replicas hold scattered state we must gather back
    if (lock->get_parent()->is_replicated() && old_state == LOCK_MIX) {
      send_lock_message(lock, LOCK_AC_SYNC);
      lock->init_gather();
      gather++;
    }

    // recall client caps that conflict with SYNC
    if (in && in->is_head()) {
      if (in->issued_caps_need_gather(lock)) {
	if (need_issue)
	  *need_issue = true;
	else
	  issue_caps(in);
	gather++;
      }
    }

    bool need_recover = false;
    if (lock->get_type() == CEPH_LOCK_IFILE) {
      ceph_assert(in);
      if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
	mds->mdcache->queue_file_recover(in);
	need_recover = true;
	gather++;
      }
    }

    // nothing to gather but scatter data is dirty: journal it first
    if (!gather && lock->is_dirty()) {
      lock->get_parent()->auth_pin(lock);
      scatter_writebehind(static_cast<ScatterLock*>(lock));
      mds->mdlog->flush();
      return false;
    }

    if (gather) {
      lock->get_parent()->auth_pin(lock);
      if (need_recover)
	mds->mdcache->do_file_recover();
      return false;
    }
  }

  // reached SYNC: push state to replicas and wake readers
  if (lock->get_parent()->is_replicated()) { // FIXME
    bufferlist data;
    lock->encode_locked_state(data);
    send_lock_message(lock, LOCK_AC_SYNC, data);
  }
  lock->set_state(LOCK_SYNC);
  lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
  if (in && in->is_head()) {
    if (need_issue)
      *need_issue = true;
    else
      issue_caps(in);
  }
  return true;
}
4699
/*
 * Drive an auth lock toward EXCL (exclusive caps for the loner client).
 * Starts a gather (rdlocks/wrlocks to drain, replica locks, conflicting
 * client caps) if needed; otherwise transitions immediately.
 *
 * If @need_issue is non-null, cap reissue is deferred to the caller.
 */
void Locker::simple_excl(SimpleLock *lock, bool *need_issue)
{
  dout(7) << "simple_excl on " << *lock << " on " << *lock->get_parent() << dendl;
  ceph_assert(lock->get_parent()->is_auth());
  ceph_assert(lock->is_stable());

  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  switch (lock->get_state()) {
  case LOCK_LOCK: lock->set_state(LOCK_LOCK_EXCL); break;
  case LOCK_SYNC: lock->set_state(LOCK_SYNC_EXCL); break;
  case LOCK_XSYN: lock->set_state(LOCK_XSYN_EXCL); break;
  default: ceph_abort();
  }

  int gather = 0;
  if (lock->is_rdlocked())
    gather++;
  if (lock->is_wrlocked())
    gather++;
  if (gather && lock->is_cached())
    invalidate_lock_caches(lock);

  // replicas already hold LOCK in the LOCK_EXCL/XSYN_EXCL transitions,
  // so only SYNC_EXCL needs a replica gather
  if (lock->get_parent()->is_replicated() &&
      lock->get_state() != LOCK_LOCK_EXCL &&
      lock->get_state() != LOCK_XSYN_EXCL) {
    send_lock_message(lock, LOCK_AC_LOCK);
    lock->init_gather();
    gather++;
  }

  if (in && in->is_head()) {
    if (in->issued_caps_need_gather(lock)) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
      gather++;
    }
  }

  if (gather) {
    lock->get_parent()->auth_pin(lock);
  } else {
    lock->set_state(LOCK_EXCL);
    lock->finish_waiters(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE);
    if (in) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
    }
  }
}
4756
/*
 * Drive an auth lock toward LOCK (exclusive to the MDS; nothing issued
 * to clients or replicas). Gathers leases, rdlocks, conflicting caps,
 * replica locks and pending file recovery as needed. The MIX->LOCK
 * path is two-stage: local scatter state is gathered before replicas
 * are asked to drop to LOCK (see LOCK_MIX_LOCK2).
 *
 * If @need_issue is non-null, cap reissue is deferred to the caller.
 */
void Locker::simple_lock(SimpleLock *lock, bool *need_issue)
{
  dout(7) << "simple_lock on " << *lock << " on " << *lock->get_parent() << dendl;
  ceph_assert(lock->get_parent()->is_auth());
  ceph_assert(lock->is_stable());
  ceph_assert(lock->get_state() != LOCK_LOCK);

  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  int old_state = lock->get_state();

  switch (lock->get_state()) {
  case LOCK_SYNC: lock->set_state(LOCK_SYNC_LOCK); break;
  case LOCK_XSYN: lock->set_state(LOCK_XSYN_LOCK); break;
  case LOCK_EXCL: lock->set_state(LOCK_EXCL_LOCK); break;
  case LOCK_MIX: lock->set_state(LOCK_MIX_LOCK);
    (static_cast<ScatterLock *>(lock))->clear_unscatter_wanted();
    break;
  case LOCK_TSYN: lock->set_state(LOCK_TSYN_LOCK); break;
  default: ceph_abort();
  }

  int gather = 0;
  // client dentry leases must be revoked before going unreadable
  if (lock->is_leased()) {
    gather++;
    revoke_client_leases(lock);
  }
  if (lock->is_rdlocked()) {
    if (lock->is_cached())
      invalidate_lock_caches(lock);
    gather++;
  }
  if (in && in->is_head()) {
    if (in->issued_caps_need_gather(lock)) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
      gather++;
    }
  }

  bool need_recover = false;
  if (lock->get_type() == CEPH_LOCK_IFILE) {
    ceph_assert(in);
    if(in->state_test(CInode::STATE_NEEDSRECOVER)) {
      mds->mdcache->queue_file_recover(in);
      need_recover = true;
      gather++;
    }
  }

  if (lock->get_parent()->is_replicated() &&
      lock->get_state() == LOCK_MIX_LOCK &&
      gather) {
    dout(10) << " doing local stage of mix->lock gather before gathering from replicas" << dendl;
  } else {
    // move to second stage of gather now, so we don't send the lock action later.
    if (lock->get_state() == LOCK_MIX_LOCK)
      lock->set_state(LOCK_MIX_LOCK2);

    if (lock->get_parent()->is_replicated() &&
	lock->get_sm()->states[old_state].replica_state != LOCK_LOCK) { // replica may already be LOCK
      gather++;
      send_lock_message(lock, LOCK_AC_LOCK);
      lock->init_gather();
    }
  }

  // nothing to gather but scatter data is dirty: journal it first
  if (!gather && lock->is_dirty()) {
    lock->get_parent()->auth_pin(lock);
    scatter_writebehind(static_cast<ScatterLock*>(lock));
    mds->mdlog->flush();
    return;
  }

  if (gather) {
    lock->get_parent()->auth_pin(lock);
    if (need_recover)
      mds->mdcache->do_file_recover();
  } else {
    lock->set_state(LOCK_LOCK);
    lock->finish_waiters(ScatterLock::WAIT_XLOCK|ScatterLock::WAIT_WR|ScatterLock::WAIT_STABLE);
  }
}
4844
4845
/*
 * Drive an auth lock toward PREXLOCK on the way to XLOCK. Only valid
 * from LOCK or XLOCKDONE. If readers/writers or conflicting client
 * caps must first drain, the lock stays in LOCK_XLOCK transition and
 * the gather completes elsewhere (eval_gather).
 */
void Locker::simple_xlock(SimpleLock *lock)
{
  dout(7) << "simple_xlock on " << *lock << " on " << *lock->get_parent() << dendl;
  ceph_assert(lock->get_parent()->is_auth());
  //assert(lock->is_stable());
  ceph_assert(lock->get_state() != LOCK_XLOCK);

  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  // pin only on the stable->unstable edge; unstable locks are already pinned
  if (lock->is_stable())
    lock->get_parent()->auth_pin(lock);

  switch (lock->get_state()) {
  case LOCK_LOCK:
  case LOCK_XLOCKDONE: lock->set_state(LOCK_LOCK_XLOCK); break;
  default: ceph_abort();
  }

  int gather = 0;
  if (lock->is_rdlocked())
    gather++;
  if (lock->is_wrlocked())
    gather++;
  if (gather && lock->is_cached())
    invalidate_lock_caches(lock);

  if (in && in->is_head()) {
    if (in->issued_caps_need_gather(lock)) {
      issue_caps(in);
      gather++;
    }
  }

  if (!gather) {
    lock->set_state(LOCK_PREXLOCK);
    //assert("shouldn't be called if we are already xlockable" == 0);
  }
}
4886
4887
4888
4889
4890
4891 // ==========================================================================
4892 // scatter lock
4893
4894 /*
4895
4896 Some notes on scatterlocks.
4897
4898 - The scatter/gather is driven by the inode lock. The scatter always
4899 brings in the latest metadata from the fragments.
4900
4901 - When in a scattered/MIX state, fragments are only allowed to
4902 update/be written to if the accounted stat matches the inode's
4903 current version.
4904
4905 - That means, on gather, we _only_ assimilate diffs for frag metadata
4906 that match the current version, because those are the only ones
4907 written during this scatter/gather cycle. (Others didn't permit
4908 it.) We increment the version and journal this to disk.
4909
4910 - When possible, we also simultaneously update our local frag
4911 accounted stats to match.
4912
4913 - On scatter, the new inode info is broadcast to frags, both local
4914 and remote. If possible (auth and !frozen), the dirfrag auth
4915 should update the accounted state (if it isn't already up to date).
4916 Note that this may occur on both the local inode auth node and
4917 inode replicas, so there are two potential paths. If it is NOT
4918 possible, they need to mark_stale to prevent any possible writes.
4919
4920 - A scatter can be to MIX (potentially writeable) or to SYNC (read
4921 only). Both are opportunities to update the frag accounted stats,
4922 even though only the MIX case is affected by a stale dirfrag.
4923
4924 - Because many scatter/gather cycles can potentially go by without a
4925 frag being able to update its accounted stats (due to being frozen
4926 by exports/refragments in progress), the frag may have (even very)
4927 old stat versions. That's fine. If when we do want to update it,
4928 we can update accounted_* and the version first.
4929
4930 */
4931
4932 class C_Locker_ScatterWB : public LockerLogContext {
4933 ScatterLock *lock;
4934 MutationRef mut;
4935 public:
4936 C_Locker_ScatterWB(Locker *l, ScatterLock *sl, MutationRef& m) :
4937 LockerLogContext(l), lock(sl), mut(m) {}
4938 void finish(int r) override {
4939 locker->scatter_writebehind_finish(lock, mut);
4940 }
4941 };
4942
/*
 * Journal ("write behind") gathered dirfrag stats into the inode.
 * Forcefully takes a wrlock, projects the inode, folds in the fragment
 * deltas via finish_scatter_gather_update(), and submits an EUpdate.
 * The flush completes in scatter_writebehind_finish() once the log
 * entry is durable.
 */
void Locker::scatter_writebehind(ScatterLock *lock)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  dout(10) << "scatter_writebehind " << in->get_inode()->mtime << " on " << *lock << " on " << *in << dendl;

  // journal
  MutationRef mut(new MutationImpl());
  mut->ls = mds->mdlog->get_current_segment();

  // forcefully take a wrlock
  lock->get_wrlock(true);
  mut->emplace_lock(lock, MutationImpl::LockOp::WRLOCK);

  in->pre_cow_old_inode();  // avoid cow mayhem

  auto pi = in->project_inode(mut);
  pi.inode->version = in->pre_dirty();

  // fold fragment stats into the projected inode, then mark the
  // scatterlock as flushing so further dirtying is tracked separately
  in->finish_scatter_gather_update(lock->get_type(), mut);
  lock->start_flush();

  EUpdate *le = new EUpdate(mds->mdlog, "scatter_writebehind");
  mds->mdlog->start_entry(le);

  mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY);
  mdcache->journal_dirty_inode(mut.get(), &le->metablob, in);

  in->finish_scatter_gather_update_accounted(lock->get_type(), &le->metablob);

  mds->mdlog->submit_entry(le, new C_Locker_ScatterWB(this, lock, mut));
}
4974
/*
 * Completion for scatter_writebehind(): the EUpdate is now durable.
 * Apply the projected inode, finish the flush, notify replicas that may
 * be waiting on the flush, and drop the forced wrlock.
 */
void Locker::scatter_writebehind_finish(ScatterLock *lock, MutationRef& mut)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  dout(10) << "scatter_writebehind_finish on " << *lock << " on " << *in << dendl;

  mut->apply();

  lock->finish_flush();

  // if replicas may have flushed in a mix->lock state, send another
  // message so they can finish_flush().
  if (in->is_replicated()) {
    switch (lock->get_state()) {
    case LOCK_MIX_LOCK:
    case LOCK_MIX_LOCK2:
    case LOCK_MIX_EXCL:
    case LOCK_MIX_TSYN:
      send_lock_message(lock, LOCK_AC_LOCKFLUSHED);
    }
  }

  drop_locks(mut.get());
  mut->cleanup();

  if (lock->is_stable())
    lock->finish_waiters(ScatterLock::WAIT_STABLE);

  //scatter_eval_gather(lock);
}
5004
/*
 * (Re)evaluate a stable auth scatterlock and push it toward its
 * preferred state: SYNC on a read-only FS, MIX when a scatter is
 * wanted, writable (MIX or LOCK) for INEST, and SYNC when the inode is
 * not a subtree/export delegation point.
 */
void Locker::scatter_eval(ScatterLock *lock, bool *need_issue)
{
  dout(10) << "scatter_eval " << *lock << " on " << *lock->get_parent() << dendl;

  ceph_assert(lock->get_parent()->is_auth());
  ceph_assert(lock->is_stable());

  // no state changes possible while freezing/frozen
  if (lock->get_parent()->is_freezing_or_frozen()) {
    dout(20) << " freezing|frozen" << dendl;
    return;
  }

  if (mdcache->is_readonly()) {
    if (lock->get_state() != LOCK_SYNC) {
      dout(10) << "scatter_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
      simple_sync(lock, need_issue);
    }
    return;
  }

  // honor an explicit scatter request (from a replica or local nudge)
  if (!lock->is_rdlocked() &&
      lock->get_state() != LOCK_MIX &&
      lock->get_scatter_wanted()) {
    dout(10) << "scatter_eval scatter_wanted, bump to mix " << *lock
	     << " on " << *lock->get_parent() << dendl;
    scatter_mix(lock, need_issue);
    return;
  }

  if (lock->get_type() == CEPH_LOCK_INEST) {
    // in general, we want to keep INEST writable at all times.
    if (!lock->is_rdlocked()) {
      if (lock->get_parent()->is_replicated()) {
	if (lock->get_state() != LOCK_MIX)
	  scatter_mix(lock, need_issue);
      } else {
	if (lock->get_state() != LOCK_LOCK)
	  simple_lock(lock, need_issue);
      }
    }
    return;
  }

  CInode *in = static_cast<CInode*>(lock->get_parent());
  if (!in->has_subtree_or_exporting_dirfrag() || in->is_base()) {
    // i _should_ be sync.
    if (!lock->is_wrlocked() &&
	lock->get_state() != LOCK_SYNC) {
      dout(10) << "scatter_eval no wrlocks|xlocks, not subtree root inode, syncing" << dendl;
      simple_sync(lock, need_issue);
    }
  }
}
5058
5059
5060 /*
5061 * mark a scatterlock to indicate that the dir fnode has some dirty data
5062 */
5063 void Locker::mark_updated_scatterlock(ScatterLock *lock)
5064 {
5065 lock->mark_dirty();
5066 if (lock->get_updated_item()->is_on_list()) {
5067 dout(10) << "mark_updated_scatterlock " << *lock
5068 << " - already on list since " << lock->get_update_stamp() << dendl;
5069 } else {
5070 updated_scatterlocks.push_back(lock->get_updated_item());
5071 utime_t now = ceph_clock_now();
5072 lock->set_update_stamp(now);
5073 dout(10) << "mark_updated_scatterlock " << *lock
5074 << " - added at " << now << dendl;
5075 }
5076 }
5077
5078 /*
5079 * this is called by scatter_tick and LogSegment::try_to_trim() when
5080 * trying to flush dirty scattered data (i.e. updated fnode) back to
5081 * the inode.
5082 *
5083 * we need to lock|scatter in order to push fnode changes into the
5084 * inode.dirstat.
5085 */
5086 void Locker::scatter_nudge(ScatterLock *lock, MDSContext *c, bool forcelockchange)
5087 {
5088 CInode *p = static_cast<CInode *>(lock->get_parent());
5089
5090 if (p->is_frozen() || p->is_freezing()) {
5091 dout(10) << "scatter_nudge waiting for unfreeze on " << *p << dendl;
5092 if (c)
5093 p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, c);
5094 else if (lock->is_dirty())
5095 // just requeue. not ideal.. starvation prone..
5096 updated_scatterlocks.push_back(lock->get_updated_item());
5097 return;
5098 }
5099
5100 if (p->is_ambiguous_auth()) {
5101 dout(10) << "scatter_nudge waiting for single auth on " << *p << dendl;
5102 if (c)
5103 p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, c);
5104 else if (lock->is_dirty())
5105 // just requeue. not ideal.. starvation prone..
5106 updated_scatterlocks.push_back(lock->get_updated_item());
5107 return;
5108 }
5109
5110 if (p->is_auth()) {
5111 int count = 0;
5112 while (true) {
5113 if (lock->is_stable()) {
5114 // can we do it now?
5115 // (only if we're not replicated.. if we are, we really do need
5116 // to nudge the lock state!)
5117 /*
5118 actually, even if we're not replicated, we can't stay in MIX, because another mds
5119 could discover and replicate us at any time. if that happens while we're flushing,
5120 they end up in MIX but their inode has the old scatterstat version.
5121
5122 if (!forcelockchange && !lock->get_parent()->is_replicated() && lock->can_wrlock(-1)) {
5123 dout(10) << "scatter_nudge auth, propagating " << *lock << " on " << *p << dendl;
5124 scatter_writebehind(lock);
5125 if (c)
5126 lock->add_waiter(SimpleLock::WAIT_STABLE, c);
5127 return;
5128 }
5129 */
5130
5131 if (mdcache->is_readonly()) {
5132 if (lock->get_state() != LOCK_SYNC) {
5133 dout(10) << "scatter_nudge auth, read-only FS, syncing " << *lock << " on " << *p << dendl;
5134 simple_sync(static_cast<ScatterLock*>(lock));
5135 }
5136 break;
5137 }
5138
5139 // adjust lock state
5140 dout(10) << "scatter_nudge auth, scatter/unscattering " << *lock << " on " << *p << dendl;
5141 switch (lock->get_type()) {
5142 case CEPH_LOCK_IFILE:
5143 if (p->is_replicated() && lock->get_state() != LOCK_MIX)
5144 scatter_mix(static_cast<ScatterLock*>(lock));
5145 else if (lock->get_state() != LOCK_LOCK)
5146 simple_lock(static_cast<ScatterLock*>(lock));
5147 else
5148 simple_sync(static_cast<ScatterLock*>(lock));
5149 break;
5150
5151 case CEPH_LOCK_IDFT:
5152 case CEPH_LOCK_INEST:
5153 if (p->is_replicated() && lock->get_state() != LOCK_MIX)
5154 scatter_mix(lock);
5155 else if (lock->get_state() != LOCK_LOCK)
5156 simple_lock(lock);
5157 else
5158 simple_sync(lock);
5159 break;
5160 default:
5161 ceph_abort();
5162 }
5163 ++count;
5164 if (lock->is_stable() && count == 2) {
5165 dout(10) << "scatter_nudge oh, stable after two cycles." << dendl;
5166 // this should only realy happen when called via
5167 // handle_file_lock due to AC_NUDGE, because the rest of the
5168 // time we are replicated or have dirty data and won't get
5169 // called. bailing here avoids an infinite loop.
5170 ceph_assert(!c);
5171 break;
5172 }
5173 } else {
5174 dout(10) << "scatter_nudge auth, waiting for stable " << *lock << " on " << *p << dendl;
5175 if (c)
5176 lock->add_waiter(SimpleLock::WAIT_STABLE, c);
5177 return;
5178 }
5179 }
5180 } else {
5181 dout(10) << "scatter_nudge replica, requesting scatter/unscatter of "
5182 << *lock << " on " << *p << dendl;
5183 // request unscatter?
5184 mds_rank_t auth = lock->get_parent()->authority().first;
5185 if (!mds->is_cluster_degraded() || mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
5186 mds->send_message_mds(make_message<MLock>(lock, LOCK_AC_NUDGE, mds->get_nodeid()), auth);
5187 }
5188
5189 // wait...
5190 if (c)
5191 lock->add_waiter(SimpleLock::WAIT_STABLE, c);
5192
5193 // also, requeue, in case we had wrong auth or something
5194 if (lock->is_dirty())
5195 updated_scatterlocks.push_back(lock->get_updated_item());
5196 }
5197 }
5198
5199 void Locker::scatter_tick()
5200 {
5201 dout(10) << "scatter_tick" << dendl;
5202
5203 // updated
5204 utime_t now = ceph_clock_now();
5205 int n = updated_scatterlocks.size();
5206 while (!updated_scatterlocks.empty()) {
5207 ScatterLock *lock = updated_scatterlocks.front();
5208
5209 if (n-- == 0) break; // scatter_nudge() may requeue; avoid looping
5210
5211 if (!lock->is_dirty()) {
5212 updated_scatterlocks.pop_front();
5213 dout(10) << " removing from updated_scatterlocks "
5214 << *lock << " " << *lock->get_parent() << dendl;
5215 continue;
5216 }
5217 if (now - lock->get_update_stamp() < g_conf()->mds_scatter_nudge_interval)
5218 break;
5219 updated_scatterlocks.pop_front();
5220 scatter_nudge(lock, 0);
5221 }
5222 mds->mdlog->flush();
5223 }
5224
5225
/*
 * Move a stable auth scatterlock to the temporary-sync (TSYN) state.
 *
 * NOTE: this path is disabled — the ceph_abort_msg() below fires
 * unconditionally, so everything after it is currently unreachable and
 * kept only as a sketch of the intended implementation.
 */
void Locker::scatter_tempsync(ScatterLock *lock, bool *need_issue)
{
  dout(10) << "scatter_tempsync " << *lock
	   << " on " << *lock->get_parent() << dendl;
  ceph_assert(lock->get_parent()->is_auth());
  ceph_assert(lock->is_stable());

  ceph_abort_msg("not fully implemented, at least not for filelock");

  CInode *in = static_cast<CInode *>(lock->get_parent());

  switch (lock->get_state()) {
  case LOCK_SYNC: ceph_abort(); // this shouldn't happen
  case LOCK_LOCK: lock->set_state(LOCK_LOCK_TSYN); break;
  case LOCK_MIX: lock->set_state(LOCK_MIX_TSYN); break;
  default: ceph_abort();
  }

  // count things to drain before we can reach TSYN
  int gather = 0;
  if (lock->is_wrlocked()) {
    if (lock->is_cached())
      invalidate_lock_caches(lock);
    gather++;
  }

  // revoke conflicting client caps
  if (lock->get_cap_shift() &&
      in->is_head() &&
      in->issued_caps_need_gather(lock)) {
    if (need_issue)
      *need_issue = true;
    else
      issue_caps(in);
    gather++;
  }

  // coming from MIX: replicas must drop to LOCK first
  if (lock->get_state() == LOCK_MIX_TSYN &&
      in->is_replicated()) {
    lock->init_gather();
    send_lock_message(lock, LOCK_AC_LOCK);
    gather++;
  }

  if (gather) {
    in->auth_pin(lock);
  } else {
    // do tempsync
    lock->set_state(LOCK_TSYN);
    lock->finish_waiters(ScatterLock::WAIT_RD|ScatterLock::WAIT_STABLE);
    if (lock->get_cap_shift()) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
    }
  }
}
5282
5283
5284
5285 // ==========================================================================
5286 // local lock
5287
5288 void Locker::local_wrlock_grab(LocalLockC *lock, MutationRef& mut)
5289 {
5290 dout(7) << "local_wrlock_grab on " << *lock
5291 << " on " << *lock->get_parent() << dendl;
5292
5293 ceph_assert(lock->get_parent()->is_auth());
5294 ceph_assert(lock->can_wrlock());
5295 lock->get_wrlock(mut->get_client());
5296
5297 auto it = mut->emplace_lock(lock, MutationImpl::LockOp::WRLOCK);
5298 ceph_assert(it->is_wrlock());
5299 }
5300
5301 bool Locker::local_wrlock_start(LocalLockC *lock, MDRequestRef& mut)
5302 {
5303 dout(7) << "local_wrlock_start on " << *lock
5304 << " on " << *lock->get_parent() << dendl;
5305
5306 ceph_assert(lock->get_parent()->is_auth());
5307 if (lock->can_wrlock()) {
5308 lock->get_wrlock(mut->get_client());
5309 auto it = mut->emplace_lock(lock, MutationImpl::LockOp::WRLOCK);
5310 ceph_assert(it->is_wrlock());
5311 return true;
5312 } else {
5313 lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
5314 return false;
5315 }
5316 }
5317
5318 void Locker::local_wrlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut)
5319 {
5320 ceph_assert(it->is_wrlock());
5321 LocalLockC *lock = static_cast<LocalLockC*>(it->lock);
5322 dout(7) << "local_wrlock_finish on " << *lock
5323 << " on " << *lock->get_parent() << dendl;
5324 lock->put_wrlock();
5325 mut->locks.erase(it);
5326 if (lock->get_num_wrlocks() == 0) {
5327 lock->finish_waiters(SimpleLock::WAIT_STABLE |
5328 SimpleLock::WAIT_WR |
5329 SimpleLock::WAIT_RD);
5330 }
5331 }
5332
5333 bool Locker::local_xlock_start(LocalLockC *lock, MDRequestRef& mut)
5334 {
5335 dout(7) << "local_xlock_start on " << *lock
5336 << " on " << *lock->get_parent() << dendl;
5337
5338 ceph_assert(lock->get_parent()->is_auth());
5339 if (!lock->can_xlock_local()) {
5340 lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
5341 return false;
5342 }
5343
5344 lock->get_xlock(mut, mut->get_client());
5345 mut->emplace_lock(lock, MutationImpl::LockOp::XLOCK);
5346 return true;
5347 }
5348
5349 void Locker::local_xlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut)
5350 {
5351 ceph_assert(it->is_xlock());
5352 LocalLockC *lock = static_cast<LocalLockC*>(it->lock);
5353 dout(7) << "local_xlock_finish on " << *lock
5354 << " on " << *lock->get_parent() << dendl;
5355 lock->put_xlock();
5356 mut->locks.erase(it);
5357
5358 lock->finish_waiters(SimpleLock::WAIT_STABLE |
5359 SimpleLock::WAIT_WR |
5360 SimpleLock::WAIT_RD);
5361 }
5362
5363
5364
5365 // ==========================================================================
5366 // file lock
5367
5368
/*
 * (Re)evaluate a stable auth file lock against what clients currently
 * want and hold, and move it toward the best-fitting state:
 *   EXCL  - a single loner client wants exclusive/write/buffer caps
 *   MIX   - multiple writers (or a scatter was requested)
 *   SYNC  - readers only
 * May issue/revoke caps via the transition helpers as a side effect.
 */
void Locker::file_eval(ScatterLock *lock, bool *need_issue)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  int loner_wanted, other_wanted;
  int wanted = in->get_caps_wanted(&loner_wanted, &other_wanted, CEPH_CAP_SFILE);
  dout(7) << "file_eval wanted=" << gcap_string(wanted)
	  << " loner_wanted=" << gcap_string(loner_wanted)
	  << " other_wanted=" << gcap_string(other_wanted)
	  << "  filelock=" << *lock << " on " << *lock->get_parent()
	  << dendl;

  ceph_assert(lock->get_parent()->is_auth());
  ceph_assert(lock->is_stable());

  if (lock->get_parent()->is_freezing_or_frozen())
    return;

  if (mdcache->is_readonly()) {
    if (lock->get_state() != LOCK_SYNC) {
      dout(10) << "file_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
      simple_sync(lock, need_issue);
    }
    return;
  }

  // excl -> *?
  if (lock->get_state() == LOCK_EXCL) {
    dout(20) << " is excl" << dendl;
    int loner_issued, other_issued, xlocker_issued;
    in->get_caps_issued(&loner_issued, &other_issued, &xlocker_issued, CEPH_CAP_SFILE);
    dout(7) << "file_eval loner_issued=" << gcap_string(loner_issued)
            << " other_issued=" << gcap_string(other_issued)
	    << " xlocker_issued=" << gcap_string(xlocker_issued)
	    << dendl;
    if (!((loner_wanted|loner_issued) & (CEPH_CAP_GEXCL|CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) ||
	(other_wanted & (CEPH_CAP_GEXCL|CEPH_CAP_GWR|CEPH_CAP_GRD)) ||
	(in->is_dir() && in->multiple_nonstale_caps())) {  // FIXME.. :/
      dout(20) << " should lose it" << dendl;
      // we should lose it.
      //  loner  other   want
      //  R      R       SYNC
      //  R      R|W     MIX
      //  R      W       MIX
      //  R|W    R       MIX
      //  R|W    R|W     MIX
      //  R|W    W       MIX
      //  W      R       MIX
      //  W      R|W     MIX
      //  W      W       MIX
      // -> any writer means MIX; RD doesn't matter.
      if (((other_wanted|loner_wanted) & CEPH_CAP_GWR) ||
	  lock->is_waiter_for(SimpleLock::WAIT_WR))
	scatter_mix(lock, need_issue);
      else if (!lock->is_wrlocked())   // let excl wrlocks drain first
	simple_sync(lock, need_issue);
      else
	dout(10) << " waiting for wrlock to drain" << dendl;
    }
  }

  // * -> excl?
  else if (lock->get_state() != LOCK_EXCL &&
	   !lock->is_rdlocked() &&
	   //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
	   in->get_target_loner() >= 0 &&
	   (in->is_dir() ?
	    !in->has_subtree_or_exporting_dirfrag() :
	    (wanted & (CEPH_CAP_GEXCL|CEPH_CAP_GWR|CEPH_CAP_GBUFFER)))) {
    dout(7) << "file_eval stable, bump to loner " << *lock
	    << " on " << *lock->get_parent() << dendl;
    file_excl(lock, need_issue);
  }

  // * -> mixed?
  else if (lock->get_state() != LOCK_MIX &&
	   !lock->is_rdlocked() &&
	   //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
	   (lock->get_scatter_wanted() ||
	    (in->get_target_loner() < 0 && (wanted & CEPH_CAP_GWR)))) {
    dout(7) << "file_eval stable, bump to mixed " << *lock
	    << " on " << *lock->get_parent() << dendl;
    scatter_mix(lock, need_issue);
  }

  // * -> sync?
  else if (lock->get_state() != LOCK_SYNC &&
	   !lock->is_wrlocked() &&   // drain wrlocks first!
	   !lock->is_waiter_for(SimpleLock::WAIT_WR) &&
	   !(wanted & CEPH_CAP_GWR) &&
	   !((lock->get_state() == LOCK_MIX) &&
	     in->is_dir() && in->has_subtree_or_exporting_dirfrag())  // if we are a delegation point, stay where we are
	   //((wanted & CEPH_CAP_RD) ||
	   //in->is_replicated() ||
	   //lock->is_leased() ||
	   //(!loner && lock->get_state() == LOCK_EXCL)) &&
	   ) {
    dout(7) << "file_eval stable, bump to sync " << *lock
	    << " on " << *lock->get_parent() << dendl;
    simple_sync(lock, need_issue);
  }
  else if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
    // kick off size/mtime recovery if it is pending
    mds->mdcache->queue_file_recover(in);
    mds->mdcache->do_file_recover();
  }
}
5474
5475
5476
/*
 * Move a stable auth scatterlock to the MIX (scattered, writable)
 * state.  From LOCK we can go directly; from other states we enter an
 * intermediate *_MIX state and gather rdlocks, replica acks, leases,
 * and conflicting caps first.
 */
void Locker::scatter_mix(ScatterLock *lock, bool *need_issue)
{
  dout(7) << "scatter_mix " << *lock << " on " << *lock->get_parent() << dendl;

  CInode *in = static_cast<CInode*>(lock->get_parent());
  ceph_assert(in->is_auth());
  ceph_assert(lock->is_stable());

  if (lock->get_state() == LOCK_LOCK) {
    // replicas are already LOCK, so no gather is needed
    in->start_scatter(lock);
    if (in->is_replicated()) {
      // data
      bufferlist softdata;
      lock->encode_locked_state(softdata);

      // bcast to replicas
      send_lock_message(lock, LOCK_AC_MIX, softdata);
    }

    // change lock
    lock->set_state(LOCK_MIX);
    lock->clear_scatter_wanted();
    if (lock->get_cap_shift()) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
    }
  } else {
    // gather?
    switch (lock->get_state()) {
    case LOCK_SYNC: lock->set_state(LOCK_SYNC_MIX); break;
    case LOCK_EXCL: lock->set_state(LOCK_EXCL_MIX); break;
    case LOCK_XSYN: lock->set_state(LOCK_XSYN_MIX); break;
    case LOCK_TSYN: lock->set_state(LOCK_TSYN_MIX); break;
    default: ceph_abort();
    }

    int gather = 0;
    if (lock->is_rdlocked()) {
      if (lock->is_cached())
	invalidate_lock_caches(lock);
      gather++;
    }
    if (in->is_replicated()) {
      if (lock->get_state() == LOCK_SYNC_MIX) { // for the rest states, replicas are already LOCK
	send_lock_message(lock, LOCK_AC_MIX);
	lock->init_gather();
	gather++;
      }
    }
    if (lock->is_leased()) {
      revoke_client_leases(lock);
      gather++;
    }
    if (lock->get_cap_shift() &&
	in->is_head() &&
	in->issued_caps_need_gather(lock)) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
      gather++;
    }
    bool need_recover = false;
    if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
      mds->mdcache->queue_file_recover(in);
      need_recover = true;
      gather++;
    }

    if (gather) {
      // wait for the gather to finish; eval_gather() completes the move
      lock->get_parent()->auth_pin(lock);
      if (need_recover)
	mds->mdcache->do_file_recover();
    } else {
      // nothing to wait for: complete the transition now
      in->start_scatter(lock);
      lock->set_state(LOCK_MIX);
      lock->clear_scatter_wanted();
      if (in->is_replicated()) {
	bufferlist softdata;
	lock->encode_locked_state(softdata);
	send_lock_message(lock, LOCK_AC_MIX, softdata);
      }
      if (lock->get_cap_shift()) {
	if (need_issue)
	  *need_issue = true;
	else
	  issue_caps(in);
      }
    }
  }
}
5570
5571
/*
 * Move a stable auth file lock to the EXCL (loner) state, gathering
 * rdlocks/wrlocks, replica acks, leases, and conflicting caps along the
 * way.  Requires a loner target (or an XSYN origin, which must pass
 * through EXCL before going anywhere else).
 */
void Locker::file_excl(ScatterLock *lock, bool *need_issue)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  dout(7) << "file_excl " << *lock << " on " << *lock->get_parent() << dendl;

  ceph_assert(in->is_auth());
  ceph_assert(lock->is_stable());

  ceph_assert((in->get_loner() >= 0 && in->get_mds_caps_wanted().empty()) ||
	      (lock->get_state() == LOCK_XSYN));  // must do xsyn -> excl -> <anything else>

  switch (lock->get_state()) {
  case LOCK_SYNC: lock->set_state(LOCK_SYNC_EXCL); break;
  case LOCK_MIX: lock->set_state(LOCK_MIX_EXCL); break;
  case LOCK_LOCK: lock->set_state(LOCK_LOCK_EXCL); break;
  case LOCK_XSYN: lock->set_state(LOCK_XSYN_EXCL); break;
  default: ceph_abort();
  }
  int gather = 0;

  if (lock->is_rdlocked())
    gather++;
  if (lock->is_wrlocked())
    gather++;
  if (gather && lock->is_cached())
    invalidate_lock_caches(lock);

  if (in->is_replicated() &&
      lock->get_state() != LOCK_LOCK_EXCL &&
      lock->get_state() != LOCK_XSYN_EXCL) {  // if we were lock, replicas are already lock.
    send_lock_message(lock, LOCK_AC_LOCK);
    lock->init_gather();
    gather++;
  }
  if (lock->is_leased()) {
    revoke_client_leases(lock);
    gather++;
  }
  // claw back caps that conflict with EXCL from non-loner clients
  if (in->is_head() &&
      in->issued_caps_need_gather(lock)) {
    if (need_issue)
      *need_issue = true;
    else
      issue_caps(in);
    gather++;
  }
  bool need_recover = false;
  if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
    mds->mdcache->queue_file_recover(in);
    need_recover = true;
    gather++;
  }

  if (gather) {
    // wait for the gather; eval_gather() finishes the transition
    lock->get_parent()->auth_pin(lock);
    if (need_recover)
      mds->mdcache->do_file_recover();
  } else {
    lock->set_state(LOCK_EXCL);
    if (need_issue)
      *need_issue = true;
    else
      issue_caps(in);
  }
}
5637
5638 void Locker::file_xsyn(SimpleLock *lock, bool *need_issue)
5639 {
5640 dout(7) << "file_xsyn on " << *lock << " on " << *lock->get_parent() << dendl;
5641 CInode *in = static_cast<CInode *>(lock->get_parent());
5642 ceph_assert(in->is_auth());
5643 ceph_assert(in->get_loner() >= 0 && in->get_mds_caps_wanted().empty());
5644
5645 switch (lock->get_state()) {
5646 case LOCK_EXCL: lock->set_state(LOCK_EXCL_XSYN); break;
5647 default: ceph_abort();
5648 }
5649
5650 int gather = 0;
5651 if (lock->is_wrlocked()) {
5652 if (lock->is_cached())
5653 invalidate_lock_caches(lock);
5654 gather++;
5655 }
5656
5657 if (in->is_head() &&
5658 in->issued_caps_need_gather(lock)) {
5659 if (need_issue)
5660 *need_issue = true;
5661 else
5662 issue_caps(in);
5663 gather++;
5664 }
5665
5666 if (gather) {
5667 lock->get_parent()->auth_pin(lock);
5668 } else {
5669 lock->set_state(LOCK_XSYN);
5670 lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
5671 if (need_issue)
5672 *need_issue = true;
5673 else
5674 issue_caps(in);
5675 }
5676 }
5677
5678 void Locker::file_recover(ScatterLock *lock)
5679 {
5680 CInode *in = static_cast<CInode *>(lock->get_parent());
5681 dout(7) << "file_recover " << *lock << " on " << *in << dendl;
5682
5683 ceph_assert(in->is_auth());
5684 //assert(lock->is_stable());
5685 ceph_assert(lock->get_state() == LOCK_PRE_SCAN); // only called from MDCache::start_files_to_recover()
5686
5687 int gather = 0;
5688
5689 /*
5690 if (in->is_replicated()
5691 lock->get_sm()->states[oldstate].replica_state != LOCK_LOCK) {
5692 send_lock_message(lock, LOCK_AC_LOCK);
5693 lock->init_gather();
5694 gather++;
5695 }
5696 */
5697 if (in->is_head() &&
5698 in->issued_caps_need_gather(lock)) {
5699 issue_caps(in);
5700 gather++;
5701 }
5702
5703 lock->set_state(LOCK_SCAN);
5704 if (gather)
5705 in->state_set(CInode::STATE_NEEDSRECOVER);
5706 else
5707 mds->mdcache->queue_file_recover(in);
5708 }
5709
5710
5711 // messenger
/*
 * Handle an inter-MDS lock message (MLock) for a file/scatter lock.
 *
 * Replica-side actions (SYNC/LOCK/MIX/LOCKFLUSHED) apply a state change
 * pushed by the auth; auth-side acks (LOCKACK/SYNCACK/MIXACK) account
 * for a replica finishing its part of a gather; request actions
 * (REQSCATTER/REQUNSCATTER/REQRDLOCK/NUDGE) ask the auth to change
 * state on a replica's behalf.
 */
void Locker::handle_file_lock(ScatterLock *lock, const cref_t<MLock> &m)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  int from = m->get_asker();

  // during rejoin, lock state on still-rejoining inodes is not yet
  // authoritative; drop the message
  if (mds->is_rejoin()) {
    if (in->is_rejoining()) {
      dout(7) << "handle_file_lock still rejoining " << *in
	      << ", dropping " << *m << dendl;
      return;
    }
  }

  dout(7) << "handle_file_lock a=" << lock->get_lock_action_name(m->get_action())
	  << " on " << *lock
	  << " from mds." << from << " "
	  << *in << dendl;

  bool caps = lock->get_cap_shift();

  switch (m->get_action()) {
    // -- replica --
  case LOCK_AC_SYNC:
    ceph_assert(lock->get_state() == LOCK_LOCK ||
		lock->get_state() == LOCK_MIX ||
		lock->get_state() == LOCK_MIX_SYNC2);

    if (lock->get_state() == LOCK_MIX) {
      // must flush local dirty data before we can ack the sync
      lock->set_state(LOCK_MIX_SYNC);
      eval_gather(lock, true);
      if (lock->is_unstable_and_locked()) {
	if (lock->is_cached())
	  invalidate_lock_caches(lock);
	mds->mdlog->flush();
      }
      break;
    }

    (static_cast<ScatterLock *>(lock))->finish_flush();
    (static_cast<ScatterLock *>(lock))->clear_flushed();

    // ok
    lock->decode_locked_state(m->get_data());
    lock->set_state(LOCK_SYNC);

    // hold a temporary rdlock so issue_caps/waiters see a stable SYNC
    lock->get_rdlock();
    if (caps)
      issue_caps(in);
    lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
    lock->put_rdlock();
    break;

  case LOCK_AC_LOCK:
    switch (lock->get_state()) {
    case LOCK_SYNC: lock->set_state(LOCK_SYNC_LOCK); break;
    case LOCK_MIX: lock->set_state(LOCK_MIX_LOCK); break;
    default: ceph_abort();
    }

    eval_gather(lock, true);
    if (lock->is_unstable_and_locked()) {
      if (lock->is_cached())
	invalidate_lock_caches(lock);
      mds->mdlog->flush();
    }

    break;

  case LOCK_AC_LOCKFLUSHED:
    (static_cast<ScatterLock *>(lock))->finish_flush();
    (static_cast<ScatterLock *>(lock))->clear_flushed();
    // wake up scatter_nudge waiters
    if (lock->is_stable())
      lock->finish_waiters(SimpleLock::WAIT_STABLE);
    break;

  case LOCK_AC_MIX:
    ceph_assert(lock->get_state() == LOCK_SYNC ||
		lock->get_state() == LOCK_LOCK ||
		lock->get_state() == LOCK_SYNC_MIX2);

    if (lock->get_state() == LOCK_SYNC) {
      // MIXED
      lock->set_state(LOCK_SYNC_MIX);
      eval_gather(lock, true);
      if (lock->is_unstable_and_locked()) {
	if (lock->is_cached())
	  invalidate_lock_caches(lock);
	mds->mdlog->flush();
      }
      break;
    }

    // ok
    lock->set_state(LOCK_MIX);
    lock->decode_locked_state(m->get_data());

    if (caps)
      issue_caps(in);

    lock->finish_waiters(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE);
    break;


    // -- auth --
  case LOCK_AC_LOCKACK:
    ceph_assert(lock->get_state() == LOCK_SYNC_LOCK ||
		lock->get_state() == LOCK_MIX_LOCK ||
		lock->get_state() == LOCK_MIX_LOCK2 ||
		lock->get_state() == LOCK_MIX_EXCL ||
		lock->get_state() == LOCK_SYNC_EXCL ||
		lock->get_state() == LOCK_SYNC_MIX ||
		lock->get_state() == LOCK_MIX_TSYN);
    ceph_assert(lock->is_gathering(from));
    lock->remove_gather(from);

    if (lock->get_state() == LOCK_MIX_LOCK ||
	lock->get_state() == LOCK_MIX_LOCK2 ||
	lock->get_state() == LOCK_MIX_EXCL ||
	lock->get_state() == LOCK_MIX_TSYN) {
      lock->decode_locked_state(m->get_data());
      // replica is waiting for AC_LOCKFLUSHED, eval_gather() should not
      // delay calling scatter_writebehind().
      lock->clear_flushed();
    }

    if (lock->is_gathering()) {
      dout(7) << "handle_file_lock " << *in << " from " << from
	      << ", still gathering " << lock->get_gather_set() << dendl;
    } else {
      dout(7) << "handle_file_lock " << *in << " from " << from
	      << ", last one" << dendl;
      eval_gather(lock);
    }
    break;

  case LOCK_AC_SYNCACK:
    ceph_assert(lock->get_state() == LOCK_MIX_SYNC);
    ceph_assert(lock->is_gathering(from));
    lock->remove_gather(from);

    lock->decode_locked_state(m->get_data());

    if (lock->is_gathering()) {
      dout(7) << "handle_file_lock " << *in << " from " << from
	      << ", still gathering " << lock->get_gather_set() << dendl;
    } else {
      dout(7) << "handle_file_lock " << *in << " from " << from
	      << ", last one" << dendl;
      eval_gather(lock);
    }
    break;

  case LOCK_AC_MIXACK:
    ceph_assert(lock->get_state() == LOCK_SYNC_MIX);
    ceph_assert(lock->is_gathering(from));
    lock->remove_gather(from);

    if (lock->is_gathering()) {
      dout(7) << "handle_file_lock " << *in << " from " << from
	      << ", still gathering " << lock->get_gather_set() << dendl;
    } else {
      dout(7) << "handle_file_lock " << *in << " from " << from
	      << ", last one" << dendl;
      eval_gather(lock);
    }
    break;


    // requests....
  case LOCK_AC_REQSCATTER:
    if (lock->is_stable()) {
      /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
       *  because the replica should be holding an auth_pin if they're
       *  doing this (and thus, we are freezing, not frozen, and indefinite
       *  starvation isn't an issue).
       */
      dout(7) << "handle_file_lock got scatter request on " << *lock
	      << " on " << *lock->get_parent() << dendl;
      if (lock->get_state() != LOCK_MIX)  // i.e., the reqscatter didn't race with an actual mix/scatter
	scatter_mix(lock);
    } else {
      dout(7) << "handle_file_lock got scatter request, !stable, marking scatter_wanted on " << *lock
	      << " on " << *lock->get_parent() << dendl;
      lock->set_scatter_wanted();
    }
    break;

  case LOCK_AC_REQUNSCATTER:
    if (lock->is_stable()) {
      /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
       *  because the replica should be holding an auth_pin if they're
       *  doing this (and thus, we are freezing, not frozen, and indefinite
       *  starvation isn't an issue).
       */
      dout(7) << "handle_file_lock got unscatter request on " << *lock
	      << " on " << *lock->get_parent() << dendl;
      if (lock->get_state() == LOCK_MIX)  // i.e., the reqscatter didn't race with an actual mix/scatter
	simple_lock(lock);  // FIXME tempsync?
    } else {
      dout(7) << "handle_file_lock ignoring unscatter request on " << *lock
	      << " on " << *lock->get_parent() << dendl;
      lock->set_unscatter_wanted();
    }
    break;

  case LOCK_AC_REQRDLOCK:
    handle_reqrdlock(lock, m);
    break;

  case LOCK_AC_NUDGE:
    if (!lock->get_parent()->is_auth()) {
      dout(7) << "handle_file_lock IGNORING nudge on non-auth " << *lock
	      << " on " << *lock->get_parent() << dendl;
    } else if (!lock->get_parent()->is_replicated()) {
      dout(7) << "handle_file_lock IGNORING nudge on non-replicated " << *lock
	      << " on " << *lock->get_parent() << dendl;
    } else {
      dout(7) << "handle_file_lock trying nudge on " << *lock
	      << " on " << *lock->get_parent() << dendl;
      scatter_nudge(lock, 0, true);
      mds->mdlog->flush();
    }
    break;

  default:
    ceph_abort();
  }
}