// ceph/src/mds/Locker.cc (git blob 2a587659afdef348ebc7706e899ceafc2136b4c6)
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15
16 #include "CDir.h"
17 #include "CDentry.h"
18 #include "CInode.h"
19 #include "common/config.h"
20 #include "events/EOpen.h"
21 #include "events/EUpdate.h"
22 #include "Locker.h"
23 #include "MDBalancer.h"
24 #include "MDCache.h"
25 #include "MDLog.h"
26 #include "MDSRank.h"
27 #include "MDSMap.h"
28 #include "messages/MInodeFileCaps.h"
29 #include "messages/MMDSSlaveRequest.h"
30 #include "Migrator.h"
31 #include "msg/Messenger.h"
32 #include "osdc/Objecter.h"
33
34 #define dout_subsys ceph_subsys_mds
35 #undef dout_prefix
36 #define dout_context g_ceph_context
37 #define dout_prefix _prefix(_dout, mds)
38 static ostream& _prefix(std::ostream *_dout, MDSRank *mds) {
39 return *_dout << "mds." << mds->get_nodeid() << ".locker ";
40 }
41
42
43 class LockerContext : public MDSContext {
44 protected:
45 Locker *locker;
46 MDSRank *get_mds() override
47 {
48 return locker->mds;
49 }
50
51 public:
52 explicit LockerContext(Locker *locker_) : locker(locker_) {
53 ceph_assert(locker != NULL);
54 }
55 };
56
57 class LockerLogContext : public MDSLogContextBase {
58 protected:
59 Locker *locker;
60 MDSRank *get_mds() override
61 {
62 return locker->mds;
63 }
64
65 public:
66 explicit LockerLogContext(Locker *locker_) : locker(locker_) {
67 ceph_assert(locker != NULL);
68 }
69 };
70
// Construct the Locker for rank 'm' backed by cache 'c'.
// need_snapflush_inodes is an intrusive list anchored at CInode::item_caps.
Locker::Locker(MDSRank *m, MDCache *c) :
  need_snapflush_inodes(member_offset(CInode, item_caps)), mds(m), mdcache(c) {}
73
74
75 void Locker::dispatch(const cref_t<Message> &m)
76 {
77
78 switch (m->get_type()) {
79 // inter-mds locking
80 case MSG_MDS_LOCK:
81 handle_lock(ref_cast<MLock>(m));
82 break;
83 // inter-mds caps
84 case MSG_MDS_INODEFILECAPS:
85 handle_inode_file_caps(ref_cast<MInodeFileCaps>(m));
86 break;
87 // client sync
88 case CEPH_MSG_CLIENT_CAPS:
89 handle_client_caps(ref_cast<MClientCaps>(m));
90 break;
91 case CEPH_MSG_CLIENT_CAPRELEASE:
92 handle_client_cap_release(ref_cast<MClientCapRelease>(m));
93 break;
94 case CEPH_MSG_CLIENT_LEASE:
95 handle_client_lease(ref_cast<MClientLease>(m));
96 break;
97 default:
98 derr << "locker unknown message " << m->get_type() << dendl;
99 ceph_abort_msg("locker unknown message");
100 }
101 }
102
// Periodic upkeep driven by the MDS tick: scatter-lock work first,
// then capability work.
void Locker::tick()
{
  scatter_tick();
  caps_tick();
}
108
109 /*
110 * locks vs rejoin
111 *
112 *
113 *
114 */
115
116 void Locker::send_lock_message(SimpleLock *lock, int msg)
117 {
118 for (const auto &it : lock->get_parent()->get_replicas()) {
119 if (mds->is_cluster_degraded() &&
120 mds->mdsmap->get_state(it.first) < MDSMap::STATE_REJOIN)
121 continue;
122 auto m = make_message<MLock>(lock, msg, mds->get_nodeid());
123 mds->send_message_mds(m, it.first);
124 }
125 }
126
127 void Locker::send_lock_message(SimpleLock *lock, int msg, const bufferlist &data)
128 {
129 for (const auto &it : lock->get_parent()->get_replicas()) {
130 if (mds->is_cluster_degraded() &&
131 mds->mdsmap->get_state(it.first) < MDSMap::STATE_REJOIN)
132 continue;
133 auto m = make_message<MLock>(lock, msg, mds->get_nodeid());
134 m->set_data(data);
135 mds->send_message_mds(m, it.first);
136 }
137 }
138
/**
 * Non-blocking attempt to rdlock the snaplock (and, when want_layout is
 * set, the policylock) of 'in' and of every ancestor up to the tree root.
 *
 * On success, records the root ino and ancestor depth in
 * mdr->dir_root[n] / mdr->dir_depth[n] and returns true.  On failure a
 * retry waiter is registered on the blocking lock, all of mdr's locks and
 * local auth pins are dropped, and false is returned.
 */
bool Locker::try_rdlock_snap_layout(CInode *in, MDRequestRef& mdr,
				    int n, bool want_layout)
{
  dout(10) << __func__ << " " << *mdr << " " << *in << dendl;
  // rdlock ancestor snaps
  inodeno_t root;
  int depth = -1;
  bool found_locked = false;
  bool found_layout = false;

  // layout lookup is only supported on the n == 0 path
  if (want_layout)
    ceph_assert(n == 0);

  client_t client = mdr->get_client();

  CInode *t = in;
  while (true) {
    ++depth;
    // once we hit an ancestor whose snaplock we already hold, stop taking
    // further snaplocks on the way up
    if (!found_locked && mdr->is_rdlocked(&t->snaplock))
      found_locked = true;

    if (!found_locked) {
      if (!t->snaplock.can_rdlock(client)) {
	t->snaplock.add_waiter(SimpleLock::WAIT_RD, new C_MDS_RetryRequest(mdcache, mdr));
	goto failed;
      }
      t->snaplock.get_rdlock();
      mdr->locks.emplace(&t->snaplock, MutationImpl::LockOp::RDLOCK);
      dout(20) << " got rdlock on " << t->snaplock << " " << *t << dendl;
    }
    if (want_layout && !found_layout) {
      if (!mdr->is_rdlocked(&t->policylock)) {
	if (!t->policylock.can_rdlock(client)) {
	  t->policylock.add_waiter(SimpleLock::WAIT_RD, new C_MDS_RetryRequest(mdcache, mdr));
	  goto failed;
	}
	t->policylock.get_rdlock();
	mdr->locks.emplace(&t->policylock, MutationImpl::LockOp::RDLOCK);
	dout(20) << " got rdlock on " << t->policylock << " " << *t << dendl;
      }
      // take the layout from the nearest ancestor that sets one
      if (t->get_projected_inode()->has_layout()) {
	mdr->dir_layout = t->get_projected_inode()->layout;
	found_layout = true;
      }
    }
    CDentry* pdn = t->get_projected_parent_dn();
    if (!pdn) {
      root = t->ino();
      break;
    }
    t = pdn->get_dir()->get_inode();
  }

  mdr->dir_root[n] = root;
  mdr->dir_depth[n] = depth;
  return true;

failed:
  dout(10) << __func__ << " failed" << dendl;

  drop_locks(mdr.get(), nullptr);
  mdr->drop_local_auth_pins();
  return false;
}
203
204 struct MarkEventOnDestruct {
205 MDRequestRef& mdr;
206 std::string_view message;
207 bool mark_event;
208 MarkEventOnDestruct(MDRequestRef& _mdr, std::string_view _message) :
209 mdr(_mdr),
210 message(_message),
211 mark_event(true) {}
212 ~MarkEventOnDestruct() {
213 if (mark_event)
214 mdr->mark_event(message);
215 }
216 };
217
/**
 * Acquire every lock in 'lov' on behalf of 'mdr', auth-pinning the lock
 * parents first (remotely where the parent is not auth here).
 *
 * Returns true once all locks are held.  If this function returns false,
 * the mdr has been placed on the appropriate wait list and, depending on
 * the failure, its locks and auth pins may already have been dropped.
 *
 * auth_pin_freeze: inode that must additionally be auth-pinned (frozen)
 * on its auth mds.  auth_pin_nonblocking: abort instead of waiting when
 * an auth pin would block.
 */
bool Locker::acquire_locks(MDRequestRef& mdr,
			   MutationImpl::LockOpVec& lov,
			   CInode *auth_pin_freeze,
			   bool auth_pin_nonblocking)
{
  dout(10) << "acquire_locks " << *mdr << dendl;

  // records a failure reason on the request unless disarmed on success
  MarkEventOnDestruct marker(mdr, "failed to acquire_locks");

  client_t client = mdr->get_client();

  set<MDSCacheObject*> mustpin;  // items to authpin
  if (auth_pin_freeze)
    mustpin.insert(auth_pin_freeze);

  // Phase 1: scan the lock vector, deciding what to auth-pin and
  // augmenting xlocks with version locks.
  // xlocks
  for (size_t i = 0; i < lov.size(); ++i) {
    auto& p = lov[i];
    SimpleLock *lock = p.lock;
    MDSCacheObject *object = lock->get_parent();

    if (p.is_xlock()) {
      if ((lock->get_type() == CEPH_LOCK_ISNAP ||
	   lock->get_type() == CEPH_LOCK_IPOLICY) &&
	  mds->is_cluster_degraded() &&
	  mdr->is_master() &&
	  !mdr->is_queued_for_replay()) {
	// waiting for recovering mds, to guarantee replayed requests and mksnap/setlayout
	// get processed in proper order.
	bool wait = false;
	if (object->is_auth()) {
	  if (!mdr->is_xlocked(lock)) {
	    set<mds_rank_t> ls;
	    object->list_replicas(ls);
	    for (auto m : ls) {
	      if (mds->mdsmap->get_state(m) < MDSMap::STATE_ACTIVE) {
		wait = true;
		break;
	      }
	    }
	  }
	} else {
	  // if the lock is the latest locked one, it's possible that slave mds got the lock
	  // while there are recovering mds.
	  if (!mdr->is_xlocked(lock) || mdr->is_last_locked(lock))
	    wait = true;
	}
	if (wait) {
	  dout(10) << " must xlock " << *lock << " " << *object
		   << ", waiting for cluster recovered" << dendl;
	  mds->locker->drop_locks(mdr.get(), NULL);
	  mdr->drop_local_auth_pins();
	  mds->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache, mdr));
	  return false;
	}
      }

      dout(20) << " must xlock " << *lock << " " << *object << dendl;

      mustpin.insert(object);

      // augment xlock with a versionlock?
      if (lock->get_type() == CEPH_LOCK_DN) {
	CDentry *dn = static_cast<CDentry*>(object);
	if (!dn->is_auth())
	  continue;
	if (mdr->is_master()) {
	  // master. wrlock versionlock so we can pipeline dentry updates to journal.
	  lov.add_wrlock(&dn->versionlock, i + 1);
	} else {
	  // slave. exclusively lock the dentry version (i.e. block other journal updates).
	  // this makes rollback safe.
	  lov.add_xlock(&dn->versionlock, i + 1);
	}
      }
      if (lock->get_type() >= CEPH_LOCK_IFIRST && lock->get_type() != CEPH_LOCK_IVERSION) {
	// inode version lock?
	CInode *in = static_cast<CInode*>(object);
	if (!in->is_auth())
	  continue;
	if (mdr->is_master()) {
	  // master. wrlock versionlock so we can pipeline inode updates to journal.
	  lov.add_wrlock(&in->versionlock, i + 1);
	} else {
	  // slave. exclusively lock the inode version (i.e. block other journal updates).
	  // this makes rollback safe.
	  lov.add_xlock(&in->versionlock, i + 1);
	}
      }
    } else if (p.is_wrlock()) {
      dout(20) << " must wrlock " << *lock << " " << *object << dendl;
      client_t _client = p.is_state_pin() ? lock->get_excl_client() : client;
      if (object->is_auth()) {
	mustpin.insert(object);
      } else if (!object->is_auth() &&
		 !lock->can_wrlock(_client) && // we might have to request a scatter
		 !mdr->is_slave()) {           // if we are slave (remote_wrlock), the master already authpinned
	dout(15) << " will also auth_pin " << *object
		 << " in case we need to request a scatter" << dendl;
	mustpin.insert(object);
      }
    } else if (p.is_remote_wrlock()) {
      dout(20) << " must remote_wrlock on mds." << p.wrlock_target << " "
	       << *lock << " " << *object << dendl;
      mustpin.insert(object);
    } else if (p.is_rdlock()) {

      dout(20) << " must rdlock " << *lock << " " << *object << dendl;
      if (object->is_auth()) {
	mustpin.insert(object);
      } else if (!object->is_auth() &&
		 !lock->can_rdlock(client)) { // we might have to request an rdlock
	dout(15) << " will also auth_pin " << *object
		 << " in case we need to request a rdlock" << dendl;
	mustpin.insert(object);
      }
    } else {
      ceph_assert(0 == "locker unknown lock operation");
    }
  }

  lov.sort_and_merge();

  // Phase 2: auth pins.
  // AUTH PINS
  map<mds_rank_t, set<MDSCacheObject*> > mustpin_remote;  // mds -> (object set)

  // can i auth pin them all now?
  marker.message = "failed to authpin local pins";
  for (const auto &p : mustpin) {
    MDSCacheObject *object = p;

    dout(10) << " must authpin " << *object << dendl;

    if (mdr->is_auth_pinned(object)) {
      if (object != (MDSCacheObject*)auth_pin_freeze)
	continue;
      if (mdr->more()->is_remote_frozen_authpin) {
	if (mdr->more()->rename_inode == auth_pin_freeze)
	  continue;
	// unfreeze auth pin for the wrong inode
	// (operator[] creates an empty entry so that mds is contacted below)
	mustpin_remote[mdr->more()->rename_inode->authority().first].size();
      }
    }

    if (!object->is_auth()) {
      if (mdr->lock_cache) { // debug
	ceph_assert(mdr->lock_cache->opcode == CEPH_MDS_OP_UNLINK);
	CDentry *dn = mdr->dn[0].back();
	ceph_assert(dn->get_projected_linkage()->is_remote());
      }

      if (object->is_ambiguous_auth()) {
	// wait
	dout(10) << " ambiguous auth, waiting to authpin " << *object << dendl;
	mdr->disable_lock_cache();
	drop_locks(mdr.get());
	mdr->drop_local_auth_pins();
	marker.message = "waiting for single auth, object is being migrated";
	object->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_MDS_RetryRequest(mdcache, mdr));
	return false;
      }
      mustpin_remote[object->authority().first].insert(object);
      continue;
    }
    int err = 0;
    if (!object->can_auth_pin(&err)) {
      if (mdr->lock_cache) {
	CDir *dir;
	if (CInode *in = dynamic_cast<CInode*>(object)) {
	  ceph_assert(!in->is_frozen_inode() && !in->is_frozen_auth_pin());
	  dir = in->get_projected_parent_dir();
	} else if (CDentry *dn = dynamic_cast<CDentry*>(object)) {
	  dir = dn->get_dir();
	} else {
	  ceph_assert(0 == "unknown type of lock parent");
	}
	if (dir->get_inode() == mdr->lock_cache->get_dir_inode()) {
	  // forcibly auth pin if there is lock cache on parent dir
	  continue;
	}

	{ // debug
	  ceph_assert(mdr->lock_cache->opcode == CEPH_MDS_OP_UNLINK);
	  CDentry *dn = mdr->dn[0].back();
	  ceph_assert(dn->get_projected_linkage()->is_remote());
	}
      }

      // wait
      mdr->disable_lock_cache();
      drop_locks(mdr.get());
      mdr->drop_local_auth_pins();
      if (auth_pin_nonblocking) {
	dout(10) << " can't auth_pin (freezing?) " << *object << ", nonblocking" << dendl;
	mdr->aborted = true;
	return false;
      }
      if (err == MDSCacheObject::ERR_EXPORTING_TREE) {
	marker.message = "failed to authpin, subtree is being exported";
      } else if (err == MDSCacheObject::ERR_FRAGMENTING_DIR) {
	marker.message = "failed to authpin, dir is being fragmented";
      } else if (err == MDSCacheObject::ERR_EXPORTING_INODE) {
	marker.message = "failed to authpin, inode is being exported";
      }
      dout(10) << " can't auth_pin (freezing?), waiting to authpin " << *object << dendl;
      object->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_MDS_RetryRequest(mdcache, mdr));

      if (mdr->is_any_remote_auth_pin())
	notify_freeze_waiter(object);

      return false;
    }
  }

  // ok, grab local auth pins
  for (const auto& p : mustpin) {
    MDSCacheObject *object = p;
    if (mdr->is_auth_pinned(object)) {
      dout(10) << " already auth_pinned " << *object << dendl;
    } else if (object->is_auth()) {
      dout(10) << " auth_pinning " << *object << dendl;
      mdr->auth_pin(object);
    }
  }

  // request remote auth_pins
  if (!mustpin_remote.empty()) {
    marker.message = "requesting remote authpins";
    for (const auto& p : mdr->object_states) {
      if (p.second.remote_auth_pinned == MDS_RANK_NONE)
	continue;
      ceph_assert(p.second.remote_auth_pinned == p.first->authority().first);
      auto q = mustpin_remote.find(p.second.remote_auth_pinned);
      if (q != mustpin_remote.end())
	q->second.insert(p.first);
    }

    for (auto& p : mustpin_remote) {
      dout(10) << "requesting remote auth_pins from mds." << p.first << dendl;

      // wait for active auth
      if (mds->is_cluster_degraded() &&
	  !mds->mdsmap->is_clientreplay_or_active_or_stopping(p.first)) {
	dout(10) << " mds." << p.first << " is not active" << dendl;
	if (mdr->more()->waiting_on_slave.empty())
	  mds->wait_for_active_peer(p.first, new C_MDS_RetryRequest(mdcache, mdr));
	return false;
      }

      auto req = make_message<MMDSSlaveRequest>(mdr->reqid, mdr->attempt,
						MMDSSlaveRequest::OP_AUTHPIN);
      for (auto& o : p.second) {
	dout(10) << " req remote auth_pin of " << *o << dendl;
	MDSCacheObjectInfo info;
	o->set_object_info(info);
	req->get_authpins().push_back(info);
	if (o == auth_pin_freeze)
	  o->set_object_info(req->get_authpin_freeze());
	mdr->pin(o);
      }
      if (auth_pin_nonblocking)
	req->mark_nonblocking();
      else if (!mdr->locks.empty())
	req->mark_notify_blocking();

      mds->send_message_mds(req, p.first);

      // put in waiting list
      auto ret = mdr->more()->waiting_on_slave.insert(p.first);
      ceph_assert(ret.second);
    }
    return false;
  }

  // Phase 3: take the locks in order.
  // caps i'll need to issue
  set<CInode*> issue_set;
  bool result = false;

  // acquire locks.
  // make sure they match currently acquired locks.
  for (const auto& p : lov) {
    auto lock = p.lock;
    if (p.is_xlock()) {
      if (mdr->is_xlocked(lock)) {
	dout(10) << " already xlocked " << *lock << " " << *lock->get_parent() << dendl;
	continue;
      }
      if (mdr->locking && lock != mdr->locking)
	cancel_locking(mdr.get(), &issue_set);
      if (!xlock_start(lock, mdr)) {
	marker.message = "failed to xlock, waiting";
	goto out;
      }
      dout(10) << " got xlock on " << *lock << " " << *lock->get_parent() << dendl;
    } else if (p.is_wrlock() || p.is_remote_wrlock()) {
      auto it = mdr->locks.find(lock);
      if (p.is_remote_wrlock()) {
	if (it != mdr->locks.end() && it->is_remote_wrlock()) {
	  dout(10) << " already remote_wrlocked " << *lock << " " << *lock->get_parent() << dendl;
	} else {
	  if (mdr->locking && lock != mdr->locking)
	    cancel_locking(mdr.get(), &issue_set);
	  marker.message = "waiting for remote wrlocks";
	  remote_wrlock_start(lock, p.wrlock_target, mdr);
	  goto out;
	}
      }
      if (p.is_wrlock()) {
	if (it != mdr->locks.end() && it->is_wrlock()) {
	  dout(10) << " already wrlocked " << *lock << " " << *lock->get_parent() << dendl;
	  continue;
	}
	client_t _client = p.is_state_pin() ? lock->get_excl_client() : client;
	if (p.is_remote_wrlock()) {
	  // nowait if we have already gotten remote wrlock
	  if (!wrlock_try(lock, mdr, _client)) {
	    marker.message = "failed to wrlock, dropping remote wrlock and waiting";
	    // can't take the wrlock because the scatter lock is gathering. need to
	    // release the remote wrlock, so that the gathering process can finish.
	    ceph_assert(it != mdr->locks.end());
	    remote_wrlock_finish(it, mdr.get());
	    remote_wrlock_start(lock, p.wrlock_target, mdr);
	    goto out;
	  }
	} else {
	  if (!wrlock_start(p, mdr)) {
	    ceph_assert(!p.is_remote_wrlock());
	    marker.message = "failed to wrlock, waiting";
	    goto out;
	  }
	}
	dout(10) << " got wrlock on " << *lock << " " << *lock->get_parent() << dendl;
      }
    } else {
      if (mdr->is_rdlocked(lock)) {
	dout(10) << " already rdlocked " << *lock << " " << *lock->get_parent() << dendl;
	continue;
      }

      ceph_assert(mdr->is_master());
      if (lock->needs_recover()) {
	if (mds->is_cluster_degraded()) {
	  if (!mdr->is_queued_for_replay()) {
	    // see comments in SimpleLock::set_state_rejoin() and
	    // ScatterLock::encode_state_for_rejoin()
	    drop_locks(mdr.get());
	    mds->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache, mdr));
	    dout(10) << " rejoin recovering " << *lock << " " << *lock->get_parent()
		     << ", waiting for cluster recovered" << dendl;
	    marker.message = "rejoin recovering lock, waiting for cluster recovered";
	    return false;
	  }
	} else {
	  lock->clear_need_recover();
	}
      }

      if (!rdlock_start(lock, mdr)) {
	marker.message = "failed to rdlock, waiting";
	goto out;
      }
      dout(10) << " got rdlock on " << *lock << " " << *lock->get_parent() << dendl;
    }
  }

  mdr->set_mds_stamp(ceph_clock_now());
  result = true;
  marker.message = "acquired locks";  // success: replaces the failure message

 out:
  issue_caps_set(issue_set);
  return result;
}
593
594 void Locker::notify_freeze_waiter(MDSCacheObject *o)
595 {
596 CDir *dir = NULL;
597 if (CInode *in = dynamic_cast<CInode*>(o)) {
598 if (!in->is_root())
599 dir = in->get_parent_dir();
600 } else if (CDentry *dn = dynamic_cast<CDentry*>(o)) {
601 dir = dn->get_dir();
602 } else {
603 dir = dynamic_cast<CDir*>(o);
604 ceph_assert(dir);
605 }
606 if (dir) {
607 if (dir->is_freezing_dir())
608 mdcache->fragment_freeze_inc_num_waiters(dir);
609 if (dir->is_freezing_tree()) {
610 while (!dir->is_freezing_tree_root())
611 dir = dir->get_parent_dir();
612 mdcache->migrator->export_freeze_inc_num_waiters(dir);
613 }
614 }
615 }
616
617 void Locker::set_xlocks_done(MutationImpl *mut, bool skip_dentry)
618 {
619 for (const auto &p : mut->locks) {
620 if (!p.is_xlock())
621 continue;
622 MDSCacheObject *obj = p.lock->get_parent();
623 ceph_assert(obj->is_auth());
624 if (skip_dentry &&
625 (p.lock->get_type() == CEPH_LOCK_DN || p.lock->get_type() == CEPH_LOCK_DVERSION))
626 continue;
627 dout(10) << "set_xlocks_done on " << *p.lock << " " << *obj << dendl;
628 p.lock->set_xlock_done();
629 }
630 }
631
/**
 * Release the locks held by 'mut': all xlocks, wrlocks and remote
 * wrlocks, plus rdlocks when drop_rdlocks is set.  Inodes whose client
 * caps may need re-issuing are collected into *pneed_issue, and any slave
 * MDS that held locks for us is sent OP_DROPLOCKS.
 *
 * Note the it++ pattern throughout: the *_finish() helpers and erase()
 * consume the current entry, so the iterator is advanced before they run.
 */
void Locker::_drop_locks(MutationImpl *mut, set<CInode*> *pneed_issue,
			 bool drop_rdlocks)
{
  set<mds_rank_t> slaves;

  for (auto it = mut->locks.begin(); it != mut->locks.end(); ) {
    SimpleLock *lock = it->lock;
    MDSCacheObject *obj = lock->get_parent();

    if (it->is_xlock()) {
      if (obj->is_auth()) {
	bool ni = false;
	xlock_finish(it++, mut, &ni);
	if (ni)
	  pneed_issue->insert(static_cast<CInode*>(obj));
      } else {
	// remotely xlocked: drop locally and tell the auth mds below
	ceph_assert(lock->get_sm()->can_remote_xlock);
	slaves.insert(obj->authority().first);
	lock->put_xlock();
	mut->locks.erase(it++);
      }
    } else if (it->is_wrlock() || it->is_remote_wrlock()) {
      if (it->is_remote_wrlock()) {
	slaves.insert(it->wrlock_target);
	it->clear_remote_wrlock();
      }
      if (it->is_wrlock()) {
	bool ni = false;
	wrlock_finish(it++, mut, &ni);
	if (ni)
	  pneed_issue->insert(static_cast<CInode*>(obj));
      } else {
	// was remote-wrlock only; nothing held locally any more
	mut->locks.erase(it++);
      }
    } else if (drop_rdlocks && it->is_rdlock()) {
      bool ni = false;
      rdlock_finish(it++, mut, &ni);
      if (ni)
	pneed_issue->insert(static_cast<CInode*>(obj));
    } else {
      ++it;
    }
  }

  if (drop_rdlocks) {
    // a full drop also releases our reference on the attached lock cache
    if (mut->lock_cache) {
      put_lock_cache(mut->lock_cache);
      mut->lock_cache = nullptr;
    }
  }

  for (set<mds_rank_t>::iterator p = slaves.begin(); p != slaves.end(); ++p) {
    // skip slaves that have not reached rejoin while degraded
    if (!mds->is_cluster_degraded() ||
	mds->mdsmap->get_state(*p) >= MDSMap::STATE_REJOIN) {
      dout(10) << "_drop_non_rdlocks dropping remote locks on mds." << *p << dendl;
      auto slavereq = make_message<MMDSSlaveRequest>(mut->reqid, mut->attempt,
						     MMDSSlaveRequest::OP_DROPLOCKS);
      mds->send_message_mds(slavereq, *p);
    }
  }
}
693
694 void Locker::cancel_locking(MutationImpl *mut, set<CInode*> *pneed_issue)
695 {
696 SimpleLock *lock = mut->locking;
697 ceph_assert(lock);
698 dout(10) << "cancel_locking " << *lock << " on " << *mut << dendl;
699
700 if (lock->get_parent()->is_auth()) {
701 bool need_issue = false;
702 if (lock->get_state() == LOCK_PREXLOCK) {
703 _finish_xlock(lock, -1, &need_issue);
704 } else if (lock->get_state() == LOCK_LOCK_XLOCK) {
705 lock->set_state(LOCK_XLOCKDONE);
706 eval_gather(lock, true, &need_issue);
707 }
708 if (need_issue)
709 pneed_issue->insert(static_cast<CInode *>(lock->get_parent()));
710 }
711 mut->finish_locking(lock);
712 }
713
714 void Locker::drop_locks(MutationImpl *mut, set<CInode*> *pneed_issue)
715 {
716 // leftover locks
717 set<CInode*> my_need_issue;
718 if (!pneed_issue)
719 pneed_issue = &my_need_issue;
720
721 if (mut->locking)
722 cancel_locking(mut, pneed_issue);
723 _drop_locks(mut, pneed_issue, true);
724
725 if (pneed_issue == &my_need_issue)
726 issue_caps_set(*pneed_issue);
727 mut->locking_state = 0;
728 }
729
730 void Locker::drop_non_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
731 {
732 set<CInode*> my_need_issue;
733 if (!pneed_issue)
734 pneed_issue = &my_need_issue;
735
736 _drop_locks(mut, pneed_issue, false);
737
738 if (pneed_issue == &my_need_issue)
739 issue_caps_set(*pneed_issue);
740 }
741
742 void Locker::drop_rdlocks_for_early_reply(MutationImpl *mut)
743 {
744 set<CInode*> need_issue;
745
746 for (auto it = mut->locks.begin(); it != mut->locks.end(); ) {
747 if (!it->is_rdlock()) {
748 ++it;
749 continue;
750 }
751 SimpleLock *lock = it->lock;
752 // make later mksnap/setlayout (at other mds) wait for this unsafe request
753 if (lock->get_type() == CEPH_LOCK_ISNAP ||
754 lock->get_type() == CEPH_LOCK_IPOLICY) {
755 ++it;
756 continue;
757 }
758 bool ni = false;
759 rdlock_finish(it++, mut, &ni);
760 if (ni)
761 need_issue.insert(static_cast<CInode*>(lock->get_parent()));
762 }
763
764 issue_caps_set(need_issue);
765 }
766
767 void Locker::drop_locks_for_fragment_unfreeze(MutationImpl *mut)
768 {
769 set<CInode*> need_issue;
770
771 for (auto it = mut->locks.begin(); it != mut->locks.end(); ) {
772 SimpleLock *lock = it->lock;
773 if (lock->get_type() == CEPH_LOCK_IDFT) {
774 ++it;
775 continue;
776 }
777 bool ni = false;
778 wrlock_finish(it++, mut, &ni);
779 if (ni)
780 need_issue.insert(static_cast<CInode*>(lock->get_parent()));
781 }
782 issue_caps_set(need_issue);
783 }
784
785 class C_MDL_DropCache : public LockerContext {
786 MDLockCache *lock_cache;
787 public:
788 C_MDL_DropCache(Locker *l, MDLockCache *lc) :
789 LockerContext(l), lock_cache(lc) { }
790 void finish(int r) override {
791 locker->drop_locks(lock_cache);
792 lock_cache->cleanup();
793 delete lock_cache;
794 }
795 };
796
797 void Locker::put_lock_cache(MDLockCache* lock_cache)
798 {
799 ceph_assert(lock_cache->ref > 0);
800 if (--lock_cache->ref > 0)
801 return;
802
803 ceph_assert(lock_cache->invalidating);
804
805 lock_cache->detach_locks();
806
807 CInode *diri = lock_cache->get_dir_inode();
808 for (auto dir : lock_cache->auth_pinned_dirfrags) {
809 if (dir->get_inode() != diri)
810 continue;
811 dir->enable_frozen_inode();
812 }
813
814 mds->queue_waiter(new C_MDL_DropCache(this, lock_cache));
815 }
816
817 int Locker::get_cap_bit_for_lock_cache(int op)
818 {
819 switch(op) {
820 case CEPH_MDS_OP_CREATE:
821 return CEPH_CAP_DIR_CREATE;
822 case CEPH_MDS_OP_UNLINK:
823 return CEPH_CAP_DIR_UNLINK;
824 default:
825 ceph_assert(0 == "unsupported operation");
826 return 0;
827 }
828 }
829
/**
 * Begin (or continue) invalidating 'lock_cache'.  Dirfrags are detached
 * immediately; if the guarding client cap bit is still issued we revoke
 * it and wait for the client, otherwise the cache is unlinked from the
 * cap list and released now.
 */
void Locker::invalidate_lock_cache(MDLockCache *lock_cache)
{
  ceph_assert(lock_cache->item_cap_lock_cache.is_on_list());
  if (lock_cache->invalidating) {
    // already in progress; the cap link must have been severed already
    ceph_assert(!lock_cache->client_cap);
  } else {
    lock_cache->invalidating = true;
    lock_cache->detach_dirfrags();
  }

  Capability *cap = lock_cache->client_cap;
  if (cap) {
    int cap_bit = get_cap_bit_for_lock_cache(lock_cache->opcode);
    cap->clear_lock_cache_allowed(cap_bit);
    if (cap->issued() & cap_bit)
      // bit still issued: revoke and wait for the client to release it
      issue_caps(lock_cache->get_dir_inode(), cap);
    else
      cap = nullptr;  // bit not issued; fall through and retire now
  }

  if (!cap) {
    lock_cache->item_cap_lock_cache.remove_myself();
    put_lock_cache(lock_cache);
  }
}
855
856 void Locker::eval_lock_caches(Capability *cap)
857 {
858 for (auto p = cap->lock_caches.begin(); !p.end(); ) {
859 MDLockCache *lock_cache = *p;
860 ++p;
861 if (!lock_cache->invalidating)
862 continue;
863 int cap_bit = get_cap_bit_for_lock_cache(lock_cache->opcode);
864 if (!(cap->issued() & cap_bit)) {
865 lock_cache->item_cap_lock_cache.remove_myself();
866 put_lock_cache(lock_cache);
867 }
868 }
869 }
870
871 // ask lock caches to release auth pins
872 void Locker::invalidate_lock_caches(CDir *dir)
873 {
874 dout(10) << "invalidate_lock_caches on " << *dir << dendl;
875 auto &lock_caches = dir->lock_caches_with_auth_pins;
876 while (!lock_caches.empty()) {
877 invalidate_lock_cache(lock_caches.front()->parent);
878 }
879 }
880
881 // ask lock caches to release locks
882 void Locker::invalidate_lock_caches(SimpleLock *lock)
883 {
884 dout(10) << "invalidate_lock_caches " << *lock << " on " << *lock->get_parent() << dendl;
885 if (lock->is_cached()) {
886 auto&& lock_caches = lock->get_active_caches();
887 for (auto& lc : lock_caches)
888 invalidate_lock_cache(lc);
889 }
890 }
891
/**
 * Attach a new lock cache to 'mdr' so later requests with the same opcode
 * from this client can reuse the pins and rd/wr locks held on 'diri' and
 * its ancestors.  Bails out (leaving mdr untouched) whenever caching is
 * unsafe: non-auth dir inode, outstanding slave requests, no client cap,
 * an existing cache for this opcode, unpinnable or freezing/frozen
 * ancestors or dirfrags, or unstable locks.
 */
void Locker::create_lock_cache(MDRequestRef& mdr, CInode *diri, file_layout_t *dir_layout)
{
  if (mdr->lock_cache)
    return;

  client_t client = mdr->get_client();
  int opcode = mdr->client_request->get_op();
  dout(10) << "create_lock_cache for client." << client << "/" << ceph_mds_op_name(opcode)<< " on " << *diri << dendl;

  if (!diri->is_auth()) {
    dout(10) << " dir inode is not auth, noop" << dendl;
    return;
  }

  if (mdr->has_more() && !mdr->more()->slaves.empty()) {
    dout(10) << " there are slaves requests for " << *mdr << ", noop" << dendl;
    return;
  }

  Capability *cap = diri->get_client_cap(client);
  if (!cap) {
    dout(10) << " there is no cap for client." << client << ", noop" << dendl;
    return;
  }

  // at most one cache per (cap, opcode)
  for (auto p = cap->lock_caches.begin(); !p.end(); ++p) {
    if ((*p)->opcode == opcode) {
      dout(10) << " lock cache already exists for " << ceph_mds_op_name(opcode) << ", noop" << dendl;
      return;
    }
  }

  // collect diri's ancestor inodes (projected parent chain)
  set<MDSCacheObject*> ancestors;
  for (CInode *in = diri; ; ) {
    CDentry *pdn = in->get_projected_parent_dn();
    if (!pdn)
      break;
    // ancestors.insert(pdn);
    in = pdn->get_dir()->get_inode();
    ancestors.insert(in);
  }

  // every auth-pinned object on the diri/ancestor chain (and its
  // projected parent dir) must still be pinnable
  for (auto& p : mdr->object_states) {
    if (p.first != diri && !ancestors.count(p.first))
      continue;
    auto& stat = p.second;
    if (stat.auth_pinned) {
      if (!p.first->can_auth_pin()) {
	dout(10) << " can't auth_pin(freezing?) lock parent " << *p.first << ", noop" << dendl;
	return;
      }
      if (CInode *in = dynamic_cast<CInode*>(p.first); in->is_parent_projected()) {
	CDir *dir = in->get_projected_parent_dir();
	if (!dir->can_auth_pin()) {
	  dout(10) << " can't auth_pin(!auth|freezing?) dirfrag " << *dir << ", noop" << dendl;
	  return;
	}
      }
    }
  }

  std::vector<CDir*> dfv;
  dfv.reserve(diri->get_num_dirfrags());

  // all of diri's dirfrags must be auth, pinnable, and free of
  // freezing/frozen inodes
  diri->get_dirfrags(dfv);
  for (auto dir : dfv) {
    if (!dir->is_auth() || !dir->can_auth_pin()) {
      dout(10) << " can't auth_pin(!auth|freezing?) dirfrag " << *dir << ", noop" << dendl;
      return;
    }
    if (dir->is_any_freezing_or_frozen_inode()) {
      dout(10) << " there is freezing/frozen inode in " << *dir << ", noop" << dendl;
      return;
    }
  }

  // every lock we hold on diri or an ancestor must be stable
  for (auto& p : mdr->locks) {
    MDSCacheObject *obj = p.lock->get_parent();
    if (obj != diri && !ancestors.count(obj))
      continue;
    if (!p.lock->is_stable()) {
      dout(10) << " unstable " << *p.lock << " on " << *obj << ", noop" << dendl;
      return;
    }
  }

  auto lock_cache = new MDLockCache(cap, opcode);
  if (dir_layout)
    lock_cache->set_dir_layout(*dir_layout);
  cap->set_lock_cache_allowed(get_cap_bit_for_lock_cache(opcode));

  for (auto dir : dfv) {
    // prevent subtree migration
    lock_cache->auth_pin(dir);
    // prevent frozen inode
    dir->disable_frozen_inode();
  }

  // transfer the request's pins on diri/ancestors to the cache, and note
  // each object's parent dirfrag for attach_dirfrags() below
  for (auto& p : mdr->object_states) {
    if (p.first != diri && !ancestors.count(p.first))
      continue;
    auto& stat = p.second;
    if (stat.auth_pinned)
      lock_cache->auth_pin(p.first);
    else
      lock_cache->pin(p.first);

    if (CInode *in = dynamic_cast<CInode*>(p.first)) {
      CDentry *pdn = in->get_projected_parent_dn();
      if (pdn)
	dfv.push_back(pdn->get_dir());
    } else if (CDentry *dn = dynamic_cast<CDentry*>(p.first)) {
      dfv.push_back(dn->get_dir());
    } else {
      ceph_assert(0 == "unknown type of lock parent");
    }
  }
  lock_cache->attach_dirfrags(std::move(dfv));

  // move the relevant rd/wr locks from the request into the cache
  for (auto it = mdr->locks.begin(); it != mdr->locks.end(); ) {
    MDSCacheObject *obj = it->lock->get_parent();
    if (obj != diri && !ancestors.count(obj)) {
      ++it;
      continue;
    }
    unsigned lock_flag = 0;
    if (it->is_wrlock()) {
      // skip wrlocks that were added by MDCache::predirty_journal_parent()
      if (obj == diri)
	lock_flag = MutationImpl::LockOp::WRLOCK;
    } else {
      ceph_assert(it->is_rdlock());
      lock_flag = MutationImpl::LockOp::RDLOCK;
    }
    if (lock_flag) {
      lock_cache->emplace_lock(it->lock, lock_flag);
      mdr->locks.erase(it++);
    } else {
      ++it;
    }
  }
  lock_cache->attach_locks();

  lock_cache->ref++;
  mdr->lock_cache = lock_cache;
}
1038
1039 bool Locker::find_and_attach_lock_cache(MDRequestRef& mdr, CInode *diri)
1040 {
1041 if (mdr->lock_cache)
1042 return true;
1043
1044 Capability *cap = diri->get_client_cap(mdr->get_client());
1045 if (!cap)
1046 return false;
1047
1048 int opcode = mdr->client_request->get_op();
1049 for (auto p = cap->lock_caches.begin(); !p.end(); ++p) {
1050 MDLockCache *lock_cache = *p;
1051 if (lock_cache->opcode == opcode) {
1052 dout(10) << "found lock cache for " << ceph_mds_op_name(opcode) << " on " << *diri << dendl;
1053 mdr->lock_cache = lock_cache;
1054 mdr->lock_cache->ref++;
1055 return true;
1056 }
1057 }
1058 return false;
1059 }
1060
1061 // generics
1062
/*
 * Evaluate an unstable (mid-transition) lock: decide whether the "gather"
 * required to reach the next state has completed -- conflicting
 * rd/wr/xlocks dropped, leases gone, over-issued client caps revoked,
 * replica acks received, dirty scatter data flushed -- and if so advance
 * the lock to its next state and wake waiters.
 *
 * @param lock        the lock to evaluate; must NOT be stable
 * @param first       true on the first evaluation after the transition
 *                    started; if issued caps exceed what the next state
 *                    allows, trigger a (re)issue so clients see the revoke
 * @param pneed_issue if non-null, set *pneed_issue instead of issuing
 *                    caps directly (caller issues later)
 * @param pfinishers  if non-null, collect woken waiters here instead of
 *                    completing them inline
 */
void Locker::eval_gather(SimpleLock *lock, bool first, bool *pneed_issue, MDSContext::vec *pfinishers)
{
  dout(10) << "eval_gather " << *lock << " on " << *lock->get_parent() << dendl;
  ceph_assert(!lock->is_stable());

  int next = lock->get_next_state();

  CInode *in = 0;
  bool caps = lock->get_cap_shift();  // nonzero iff this lock type gates client caps
  if (lock->get_type() != CEPH_LOCK_DN)
    in = static_cast<CInode *>(lock->get_parent());

  bool need_issue = false;

  int loner_issued = 0, other_issued = 0, xlocker_issued = 0;
  ceph_assert(!caps || in != NULL);
  if (caps && in->is_head()) {
    // snapshot of caps currently issued, split by loner/xlocker/other
    in->get_caps_issued(&loner_issued, &other_issued, &xlocker_issued,
			lock->get_cap_shift(), lock->get_cap_mask());
    dout(10) << " next state is " << lock->get_state_name(next)
	     << " issued/allows loner " << gcap_string(loner_issued)
	     << "/" << gcap_string(lock->gcaps_allowed(CAP_LONER, next))
	     << " xlocker " << gcap_string(xlocker_issued)
	     << "/" << gcap_string(lock->gcaps_allowed(CAP_XLOCKER, next))
	     << " other " << gcap_string(other_issued)
	     << "/" << gcap_string(lock->gcaps_allowed(CAP_ANY, next))
	     << dendl;

    // on the first pass, any issued cap bits the next state disallows
    // must be revoked via a cap (re)issue
    if (first && ((~lock->gcaps_allowed(CAP_ANY, next) & other_issued) ||
		  (~lock->gcaps_allowed(CAP_LONER, next) & loner_issued) ||
		  (~lock->gcaps_allowed(CAP_XLOCKER, next) & xlocker_issued)))
      need_issue = true;
  }

  // true iff x is set and below AUTH (strictly below on a replica);
  // used to decide whether an existing hold blocks the transition
#define IS_TRUE_AND_LT_AUTH(x, auth) (x && ((auth && x <= AUTH) || (!auth && x < AUTH)))
  bool auth = lock->get_parent()->is_auth();
  // gather is complete when: no replica acks outstanding, no conflicting
  // local holds, scatter data flushed, no over-issued caps, and we are not
  // in a state that needs an explicit trigger from the auth mds
  if (!lock->is_gathering() &&
      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_rdlock, auth) || !lock->is_rdlocked()) &&
      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_wrlock, auth) || !lock->is_wrlocked()) &&
      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_xlock, auth) || !lock->is_xlocked()) &&
      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_lease, auth) || !lock->is_leased()) &&
      !(lock->get_parent()->is_auth() && lock->is_flushing()) &&  // i.e. wait for scatter_writebehind!
      (!caps || ((~lock->gcaps_allowed(CAP_ANY, next) & other_issued) == 0 &&
		 (~lock->gcaps_allowed(CAP_LONER, next) & loner_issued) == 0 &&
		 (~lock->gcaps_allowed(CAP_XLOCKER, next) & xlocker_issued) == 0)) &&
      lock->get_state() != LOCK_SYNC_MIX2 &&  // these states need an explicit trigger from the auth mds
      lock->get_state() != LOCK_MIX_SYNC2
      ) {
    dout(7) << "eval_gather finished gather on " << *lock
	    << " on " << *lock->get_parent() << dendl;

    if (lock->get_sm() == &sm_filelock) {
      ceph_assert(in);
      // filelock cannot advance until file recovery has run
      if (in->state_test(CInode::STATE_RECOVERING)) {
	dout(7) << "eval_gather finished gather, but still recovering" << dendl;
	return;
      } else if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
	dout(7) << "eval_gather finished gather, but need to recover" << dendl;
	mds->mdcache->queue_file_recover(in);
	mds->mdcache->do_file_recover();
	return;
      }
    }

    if (!lock->get_parent()->is_auth()) {
      // replica: tell auth
      mds_rank_t auth = lock->get_parent()->authority().first;

      if (lock->get_parent()->is_rejoining() &&
	  mds->mdsmap->get_state(auth) == MDSMap::STATE_REJOIN) {
	dout(7) << "eval_gather finished gather, but still rejoining "
		<< *lock->get_parent() << dendl;
	return;
      }

      if (!mds->is_cluster_degraded() ||
	  mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) {
	switch (lock->get_state()) {
	case LOCK_SYNC_LOCK:
	  mds->send_message_mds(make_message<MLock>(lock, LOCK_AC_LOCKACK, mds->get_nodeid()), auth);
	  break;

	case LOCK_MIX_SYNC:
	  {
	    // ack with our locked state, then flush dirty scatter data
	    auto reply = make_message<MLock>(lock, LOCK_AC_SYNCACK, mds->get_nodeid());
	    lock->encode_locked_state(reply->get_data());
	    mds->send_message_mds(reply, auth);
	    next = LOCK_MIX_SYNC2;
	    (static_cast<ScatterLock *>(lock))->start_flush();
	  }
	  break;

	case LOCK_MIX_SYNC2:
	  (static_cast<ScatterLock *>(lock))->finish_flush();
	  (static_cast<ScatterLock *>(lock))->clear_flushed();
	  // fall-thru: the ack was already sent in LOCK_MIX_SYNC

	case LOCK_SYNC_MIX2:
	  // do nothing, we already acked
	  break;

	case LOCK_SYNC_MIX:
	  {
	    auto reply = make_message<MLock>(lock, LOCK_AC_MIXACK, mds->get_nodeid());
	    mds->send_message_mds(reply, auth);
	    next = LOCK_SYNC_MIX2;
	  }
	  break;

	case LOCK_MIX_LOCK:
	  {
	    bufferlist data;
	    lock->encode_locked_state(data);
	    mds->send_message_mds(make_message<MLock>(lock, LOCK_AC_LOCKACK, mds->get_nodeid(), data), auth);
	    (static_cast<ScatterLock *>(lock))->start_flush();
	    // we'll get an AC_LOCKFLUSHED to complete
	  }
	  break;

	default:
	  ceph_abort();
	}
      }
    } else {
      // auth

      // once the first (local) stage of mix->lock gather complete we can
      // gather from replicas
      if (lock->get_state() == LOCK_MIX_LOCK &&
	  lock->get_parent()->is_replicated()) {
	dout(10) << " finished (local) gather for mix->lock, now gathering from replicas" << dendl;
	send_lock_message(lock, LOCK_AC_LOCK);
	lock->init_gather();
	lock->set_state(LOCK_MIX_LOCK2);
	return;
      }

      // dirty scatter data must be journaled before the state may change
      if (lock->is_dirty() && !lock->is_flushed()) {
	scatter_writebehind(static_cast<ScatterLock *>(lock));
	mds->mdlog->flush();
	return;
      }
      lock->clear_flushed();

      switch (lock->get_state()) {
	// to mixed
      case LOCK_TSYN_MIX:
      case LOCK_SYNC_MIX:
      case LOCK_EXCL_MIX:
      case LOCK_XSYN_MIX:
	in->start_scatter(static_cast<ScatterLock *>(lock));
	if (lock->get_parent()->is_replicated()) {
	  bufferlist softdata;
	  lock->encode_locked_state(softdata);
	  send_lock_message(lock, LOCK_AC_MIX, softdata);
	}
	(static_cast<ScatterLock *>(lock))->clear_scatter_wanted();
	break;

      case LOCK_XLOCK:
      case LOCK_XLOCKDONE:
	if (next != LOCK_SYNC)
	  break;
	// fall-thru

	// to sync
      case LOCK_EXCL_SYNC:
      case LOCK_LOCK_SYNC:
      case LOCK_MIX_SYNC:
      case LOCK_XSYN_SYNC:
	if (lock->get_parent()->is_replicated()) {
	  bufferlist softdata;
	  lock->encode_locked_state(softdata);
	  send_lock_message(lock, LOCK_AC_SYNC, softdata);
	}
	break;
      }

    }

    lock->set_state(next);

    // back in a stable state: drop the auth_pin taken for the transition
    if (lock->get_parent()->is_auth() &&
	lock->is_stable())
      lock->get_parent()->auth_unpin(lock);

    // drop loner before doing waiters
    if (caps &&
	in->is_head() &&
	in->is_auth() &&
	in->get_wanted_loner() != in->get_loner()) {
      dout(10) << " trying to drop loner" << dendl;
      if (in->try_drop_loner()) {
	dout(10) << " dropped loner" << dendl;
	need_issue = true;
      }
    }

    if (pfinishers)
      lock->take_waiting(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD|SimpleLock::WAIT_XLOCK,
			 *pfinishers);
    else
      lock->finish_waiters(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD|SimpleLock::WAIT_XLOCK);

    if (caps && in->is_head())
      need_issue = true;

    // the lock may be able to move further now that it is stable
    if (lock->get_parent()->is_auth() &&
	lock->is_stable())
      try_eval(lock, &need_issue);
  }

  if (need_issue) {
    if (pneed_issue)
      *pneed_issue = true;
    else if (in->is_head())
      issue_caps(in);
  }

}
1282
/*
 * (Re)evaluate the inode locks selected by @mask (CEPH_LOCK_I* bits;
 * -1 means all), choosing or dropping the loner client as appropriate.
 * Returns true if caps were issued (or the caller should issue them).
 */
bool Locker::eval(CInode *in, int mask, bool caps_imported)
{
  bool need_issue = caps_imported;
  MDSContext::vec finishers;

  dout(10) << "eval " << mask << " " << *in << dendl;

  // choose loner?
  if (in->is_auth() && in->is_head()) {
    client_t orig_loner = in->get_loner();
    if (in->choose_ideal_loner()) {
      dout(10) << "eval set loner: client." << orig_loner << " -> client." << in->get_loner() << dendl;
      need_issue = true;
      mask = -1;  // loner changed: every lock must be re-evaluated
    } else if (in->get_wanted_loner() != in->get_loner()) {
      dout(10) << "eval want loner: client." << in->get_wanted_loner() << " but failed to set it" << dendl;
      mask = -1;
    }
  }

 retry:
  if (mask & CEPH_LOCK_IFILE)
    eval_any(&in->filelock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IAUTH)
    eval_any(&in->authlock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_ILINK)
    eval_any(&in->linklock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IXATTR)
    eval_any(&in->xattrlock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_INEST)
    eval_any(&in->nestlock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IFLOCK)
    eval_any(&in->flocklock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IPOLICY)
    eval_any(&in->policylock, &need_issue, &finishers, caps_imported);

  // drop loner?
  if (in->is_auth() && in->is_head() && in->get_wanted_loner() != in->get_loner()) {
    if (in->try_drop_loner()) {
      need_issue = true;
      if (in->get_wanted_loner() >= 0) {
	dout(10) << "eval end set loner to client." << in->get_loner() << dendl;
	bool ok = in->try_set_loner();
	ceph_assert(ok);
	mask = -1;  // a new loner was adopted: evaluate everything again
	goto retry;
      }
    }
  }

  finish_contexts(g_ceph_context, finishers);

  if (need_issue && in->is_head())
    issue_caps(in);

  dout(10) << "eval done" << dendl;
  return need_issue;
}
1341
1342 class C_Locker_Eval : public LockerContext {
1343 MDSCacheObject *p;
1344 int mask;
1345 public:
1346 C_Locker_Eval(Locker *l, MDSCacheObject *pp, int m) : LockerContext(l), p(pp), mask(m) {
1347 // We are used as an MDSCacheObject waiter, so should
1348 // only be invoked by someone already holding the big lock.
1349 ceph_assert(ceph_mutex_is_locked_by_me(locker->mds->mds_lock));
1350 p->get(MDSCacheObject::PIN_PTRWAITER);
1351 }
1352 void finish(int r) override {
1353 locker->try_eval(p, mask);
1354 p->put(MDSCacheObject::PIN_PTRWAITER);
1355 }
1356 };
1357
1358 void Locker::try_eval(MDSCacheObject *p, int mask)
1359 {
1360 // unstable and ambiguous auth?
1361 if (p->is_ambiguous_auth()) {
1362 dout(7) << "try_eval ambiguous auth, waiting on " << *p << dendl;
1363 p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_Eval(this, p, mask));
1364 return;
1365 }
1366
1367 if (p->is_auth() && p->is_frozen()) {
1368 dout(7) << "try_eval frozen, waiting on " << *p << dendl;
1369 p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, mask));
1370 return;
1371 }
1372
1373 if (mask & CEPH_LOCK_DN) {
1374 ceph_assert(mask == CEPH_LOCK_DN);
1375 bool need_issue = false; // ignore this, no caps on dentries
1376 CDentry *dn = static_cast<CDentry *>(p);
1377 eval_any(&dn->lock, &need_issue);
1378 } else {
1379 CInode *in = static_cast<CInode *>(p);
1380 eval(in, mask);
1381 }
1382 }
1383
/*
 * Try to re-evaluate a single lock on its parent object, deferring (via a
 * waiter) when the object is ambiguous-auth, frozen, or freezing.  Honors
 * pending scatter/unscatter requests before the generic eval.
 */
void Locker::try_eval(SimpleLock *lock, bool *pneed_issue)
{
  MDSCacheObject *p = lock->get_parent();

  // unstable and ambiguous auth?
  if (p->is_ambiguous_auth()) {
    dout(7) << "try_eval " << *lock << " ambiguousauth, waiting on " << *p << dendl;
    p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_Eval(this, p, lock->get_type()));
    return;
  }

  // only the auth drives state changes
  if (!p->is_auth()) {
    dout(7) << "try_eval " << *lock << " not auth for " << *p << dendl;
    return;
  }

  if (p->is_frozen()) {
    dout(7) << "try_eval " << *lock << " frozen, waiting on " << *p << dendl;
    p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, lock->get_type()));
    return;
  }

  /*
   * We could have a situation like:
   *
   * - mds A authpins item on mds B
   * - mds B starts to freeze tree containing item
   * - mds A tries wrlock_start on A, sends REQSCATTER to B
   * - mds B lock is unstable, sets scatter_wanted
   * - mds B lock stabilizes, calls try_eval.
   *
   * We can defer while freezing without causing a deadlock.  Honor
   * scatter_wanted flag here.  This will never get deferred by the
   * checks above due to the auth_pin held by the master.
   */
  if (lock->is_scatterlock()) {
    ScatterLock *slock = static_cast<ScatterLock *>(lock);
    if (slock->get_scatter_wanted() &&
	slock->get_state() != LOCK_MIX) {
      // a peer asked for MIX; try to move there now
      scatter_mix(slock, pneed_issue);
      if (!lock->is_stable())
	return;
    } else if (slock->get_unscatter_wanted() &&
        slock->get_state() != LOCK_LOCK) {
      // a peer asked for LOCK; try to move there now
      simple_lock(slock, pneed_issue);
      if (!lock->is_stable()) {
        return;
      }
    }
  }

  // note: the freezing check comes after the scatter_wanted handling above
  if (lock->get_type() != CEPH_LOCK_DN &&
      lock->get_type() != CEPH_LOCK_ISNAP &&
      lock->get_type() != CEPH_LOCK_IPOLICY &&
      p->is_freezing()) {
    dout(7) << "try_eval " << *lock << " freezing, waiting on " << *p << dendl;
    p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, lock->get_type()));
    return;
  }

  eval(lock, pneed_issue);
}
1446
1447 void Locker::eval_cap_gather(CInode *in, set<CInode*> *issue_set)
1448 {
1449 bool need_issue = false;
1450 MDSContext::vec finishers;
1451
1452 // kick locks now
1453 if (!in->filelock.is_stable())
1454 eval_gather(&in->filelock, false, &need_issue, &finishers);
1455 if (!in->authlock.is_stable())
1456 eval_gather(&in->authlock, false, &need_issue, &finishers);
1457 if (!in->linklock.is_stable())
1458 eval_gather(&in->linklock, false, &need_issue, &finishers);
1459 if (!in->xattrlock.is_stable())
1460 eval_gather(&in->xattrlock, false, &need_issue, &finishers);
1461
1462 if (need_issue && in->is_head()) {
1463 if (issue_set)
1464 issue_set->insert(in);
1465 else
1466 issue_caps(in);
1467 }
1468
1469 finish_contexts(g_ceph_context, finishers);
1470 }
1471
1472 void Locker::eval_scatter_gathers(CInode *in)
1473 {
1474 bool need_issue = false;
1475 MDSContext::vec finishers;
1476
1477 dout(10) << "eval_scatter_gathers " << *in << dendl;
1478
1479 // kick locks now
1480 if (!in->filelock.is_stable())
1481 eval_gather(&in->filelock, false, &need_issue, &finishers);
1482 if (!in->nestlock.is_stable())
1483 eval_gather(&in->nestlock, false, &need_issue, &finishers);
1484 if (!in->dirfragtreelock.is_stable())
1485 eval_gather(&in->dirfragtreelock, false, &need_issue, &finishers);
1486
1487 if (need_issue && in->is_head())
1488 issue_caps(in);
1489
1490 finish_contexts(g_ceph_context, finishers);
1491 }
1492
1493 void Locker::eval(SimpleLock *lock, bool *need_issue)
1494 {
1495 switch (lock->get_type()) {
1496 case CEPH_LOCK_IFILE:
1497 return file_eval(static_cast<ScatterLock*>(lock), need_issue);
1498 case CEPH_LOCK_IDFT:
1499 case CEPH_LOCK_INEST:
1500 return scatter_eval(static_cast<ScatterLock*>(lock), need_issue);
1501 default:
1502 return simple_eval(lock, need_issue);
1503 }
1504 }
1505
1506
1507 // ------------------
1508 // rdlock
1509
/*
 * Try to nudge @lock toward a rdlockable state.  Returns true if we
 * changed the state locally (caller should re-check rdlockability);
 * returns false if we could only ask the auth, queue recovery, or do
 * nothing -- in which case the caller must wait.
 *
 * @param as_anon caller wants SYNC, not XSYN (no specific client)
 */
bool Locker::_rdlock_kick(SimpleLock *lock, bool as_anon)
{
  // kick the lock
  if (lock->is_stable()) {
    if (lock->get_parent()->is_auth()) {
      if (lock->get_sm() == &sm_scatterlock) {
	// not until tempsync is fully implemented
	//if (lock->get_parent()->is_replicated())
	//scatter_tempsync((ScatterLock*)lock);
	//else
	simple_sync(lock);
      } else if (lock->get_sm() == &sm_filelock) {
	CInode *in = static_cast<CInode*>(lock->get_parent());
	// prefer XSYN for a non-dir with a target loner, so the loner
	// keeps exclusive caps while others read
	if (lock->get_state() == LOCK_EXCL &&
	    in->get_target_loner() >= 0 &&
	    !in->is_dir() && !as_anon) // as_anon => caller wants SYNC, not XSYN
	  file_xsyn(lock);
	else
	  simple_sync(lock);
      } else
	simple_sync(lock);
      return true;
    } else {
      // request rdlock state change from auth
      mds_rank_t auth = lock->get_parent()->authority().first;
      if (!mds->is_cluster_degraded() ||
	  mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
	dout(10) << "requesting rdlock from auth on "
		 << *lock << " on " << *lock->get_parent() << dendl;
	mds->send_message_mds(make_message<MLock>(lock, LOCK_AC_REQRDLOCK, mds->get_nodeid()), auth);
      }
      return false;
    }
  }
  // unstable filelock: make sure this inode's recovery is prioritized
  if (lock->get_type() == CEPH_LOCK_IFILE) {
    CInode *in = static_cast<CInode *>(lock->get_parent());
    if (in->state_test(CInode::STATE_RECOVERING)) {
      mds->mdcache->recovery_queue.prioritize(in);
    }
  }

  return false;
}
1553
1554 bool Locker::rdlock_try(SimpleLock *lock, client_t client)
1555 {
1556 dout(7) << "rdlock_try on " << *lock << " on " << *lock->get_parent() << dendl;
1557
1558 // can read? grab ref.
1559 if (lock->can_rdlock(client))
1560 return true;
1561
1562 _rdlock_kick(lock, false);
1563
1564 if (lock->can_rdlock(client))
1565 return true;
1566
1567 return false;
1568 }
1569
/*
 * Acquire a rdlock on @lock for @mut, driving the lock toward a readable
 * state when necessary.  Returns true with the rdlock held and recorded
 * in @mut, or false after registering a retry waiter.
 *
 * @param as_anon take the rdlock anonymously (no client attributed); this
 *        is forced for snapped reads, and prevents the XSYN optimization
 */
bool Locker::rdlock_start(SimpleLock *lock, MDRequestRef& mut, bool as_anon)
{
  dout(7) << "rdlock_start on " << *lock << " on " << *lock->get_parent() << dendl;

  // client may be allowed to rdlock the same item it has xlocked.
  // UNLESS someone passes in as_anon, or we're reading snapped version here.
  if (mut->snapid != CEPH_NOSNAP)
    as_anon = true;
  client_t client = as_anon ? -1 : mut->get_client();

  CInode *in = 0;
  if (lock->get_type() != CEPH_LOCK_DN)
    in = static_cast<CInode *>(lock->get_parent());

  /*
  if (!lock->get_parent()->is_auth() &&
      lock->fw_rdlock_to_auth()) {
    mdcache->request_forward(mut, lock->get_parent()->authority().first);
    return false;
  }
  */

  while (1) {
    // can read?  grab ref.
    if (lock->can_rdlock(client)) {
      lock->get_rdlock();
      mut->emplace_lock(lock, MutationImpl::LockOp::RDLOCK);
      return true;
    }

    // hmm, wait a second.
    // snapped inode on the auth: the head inode's lock must be SYNC
    // before the snapped version can be read
    if (in && !in->is_head() && in->is_auth() &&
	lock->get_state() == LOCK_SNAP_SYNC) {
      // okay, we actually need to kick the head's lock to get ourselves synced up.
      CInode *head = mdcache->get_inode(in->ino());
      ceph_assert(head);
      SimpleLock *hlock = head->get_lock(CEPH_LOCK_IFILE);
      if (hlock->get_state() == LOCK_SYNC)
	hlock = head->get_lock(lock->get_type());

      if (hlock->get_state() != LOCK_SYNC) {
	dout(10) << "rdlock_start trying head inode " << *head << dendl;
	if (!rdlock_start(hlock, mut, true)) // ** as_anon, no rdlock on EXCL **
	  return false;
	// oh, check our lock again then
      }
    }

    // stop looping once kicking the lock no longer changes its state
    if (!_rdlock_kick(lock, as_anon))
      break;
  }

  // wait!
  int wait_on;
  if (lock->get_parent()->is_auth() && lock->is_stable())
    wait_on = SimpleLock::WAIT_RD;
  else
    wait_on = SimpleLock::WAIT_STABLE;  // REQRDLOCK is ignored if lock is unstable, so we need to retry.
  dout(7) << "rdlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl;
  lock->add_waiter(wait_on, new C_MDS_RetryRequest(mdcache, mut));
  nudge_log(lock);
  return false;
}
1633
1634 void Locker::nudge_log(SimpleLock *lock)
1635 {
1636 dout(10) << "nudge_log " << *lock << " on " << *lock->get_parent() << dendl;
1637 if (lock->get_parent()->is_auth() && lock->is_unstable_and_locked()) // as with xlockdone, or cap flush
1638 mds->mdlog->flush();
1639 }
1640
1641 void Locker::rdlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut, bool *pneed_issue)
1642 {
1643 ceph_assert(it->is_rdlock());
1644 SimpleLock *lock = it->lock;
1645 // drop ref
1646 lock->put_rdlock();
1647 if (mut)
1648 mut->locks.erase(it);
1649
1650 dout(7) << "rdlock_finish on " << *lock << " on " << *lock->get_parent() << dendl;
1651
1652 // last one?
1653 if (!lock->is_rdlocked()) {
1654 if (!lock->is_stable())
1655 eval_gather(lock, false, pneed_issue);
1656 else if (lock->get_parent()->is_auth())
1657 try_eval(lock, pneed_issue);
1658 }
1659 }
1660
1661 bool Locker::rdlock_try_set(MutationImpl::LockOpVec& lov, MDRequestRef& mdr)
1662 {
1663 dout(10) << __func__ << dendl;
1664 for (const auto& p : lov) {
1665 auto lock = p.lock;
1666 ceph_assert(p.is_rdlock());
1667 if (!mdr->is_rdlocked(lock) && !rdlock_try(lock, mdr->get_client())) {
1668 lock->add_waiter(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_RD,
1669 new C_MDS_RetryRequest(mdcache, mdr));
1670 goto failed;
1671 }
1672 lock->get_rdlock();
1673 mdr->emplace_lock(lock, MutationImpl::LockOp::RDLOCK);
1674 dout(20) << " got rdlock on " << *lock << " " << *lock->get_parent() << dendl;
1675 }
1676
1677 return true;
1678 failed:
1679 dout(10) << __func__ << " failed" << dendl;
1680 drop_locks(mdr.get(), nullptr);
1681 mdr->drop_local_auth_pins();
1682 return false;
1683 }
1684
1685 bool Locker::rdlock_try_set(MutationImpl::LockOpVec& lov, MutationRef& mut)
1686 {
1687 dout(10) << __func__ << dendl;
1688 for (const auto& p : lov) {
1689 auto lock = p.lock;
1690 ceph_assert(p.is_rdlock());
1691 if (!lock->can_rdlock(mut->get_client()))
1692 return false;
1693 p.lock->get_rdlock();
1694 mut->emplace_lock(p.lock, MutationImpl::LockOp::RDLOCK);
1695 }
1696 return true;
1697 }
1698
1699 // ------------------
1700 // wrlock
1701
1702 void Locker::wrlock_force(SimpleLock *lock, MutationRef& mut)
1703 {
1704 if (lock->get_type() == CEPH_LOCK_IVERSION ||
1705 lock->get_type() == CEPH_LOCK_DVERSION)
1706 return local_wrlock_grab(static_cast<LocalLock*>(lock), mut);
1707
1708 dout(7) << "wrlock_force on " << *lock
1709 << " on " << *lock->get_parent() << dendl;
1710 lock->get_wrlock(true);
1711 mut->emplace_lock(lock, MutationImpl::LockOp::WRLOCK);
1712 }
1713
/*
 * Opportunistically take a wrlock for @client without blocking: we may
 * nudge a stable lock's state (simple_lock) and retry, but we never
 * register a waiter.  Returns true with the wrlock held and recorded in
 * @mut, or false with no side effect on the mutation.
 */
bool Locker::wrlock_try(SimpleLock *lock, const MutationRef& mut, client_t client)
{
  dout(10) << "wrlock_try " << *lock << " on " << *lock->get_parent() << dendl;
  if (client == -1)
    client = mut->get_client();

  while (1) {
    if (lock->can_wrlock(client)) {
      lock->get_wrlock();
      auto it = mut->emplace_lock(lock, MutationImpl::LockOp::WRLOCK);
      it->flags |= MutationImpl::LockOp::WRLOCK; // may already remote_wrlocked
      return true;
    }
    // only a stable lock on an auth inode can be nudged locally
    if (!lock->is_stable())
      break;
    CInode *in = static_cast<CInode *>(lock->get_parent());
    if (!in->is_auth())
      break;
    // caller may already have a log entry open; to avoid calling
    // scatter_writebehind or start_scatter, don't change the lock's
    // state if it has dirty scatter data.
    if (lock->is_dirty())
      break;
    // likewise, to avoid calling scatter_writebehind or start_scatter,
    // don't change the lock's state to MIX.
    ScatterLock *slock = static_cast<ScatterLock*>(lock);
    if (slock->get_scatter_wanted() || in->has_subtree_or_exporting_dirfrag())
      break;

    simple_lock(lock);
  }
  return false;
}
1747
/*
 * Acquire a wrlock on @op.lock for @mut.  Version locks use the local
 * path.  On the auth we may push the lock toward MIX (when the inode has
 * subtree/exporting dirfrags or a scatter is wanted) or LOCK; on a
 * replica we can only ask the auth to scatter.  Returns true with the
 * wrlock held, or false after registering a retry waiter.
 */
bool Locker::wrlock_start(const MutationImpl::LockOp &op, MDRequestRef& mut)
{
  SimpleLock *lock = op.lock;
  if (lock->get_type() == CEPH_LOCK_IVERSION ||
      lock->get_type() == CEPH_LOCK_DVERSION)
    return local_wrlock_start(static_cast<LocalLock*>(lock), mut);

  dout(10) << "wrlock_start " << *lock << " on " << *lock->get_parent() << dendl;

  CInode *in = static_cast<CInode *>(lock->get_parent());
  // state pins wrlock on behalf of the exclusive-cap client, not the requester
  client_t client = op.is_state_pin() ? lock->get_excl_client() : mut->get_client();
  bool want_scatter = lock->get_parent()->is_auth() &&
		      (in->has_subtree_or_exporting_dirfrag() ||
		       static_cast<ScatterLock*>(lock)->get_scatter_wanted());

  while (1) {
    // wrlock?
    if (lock->can_wrlock(client) &&
	(!want_scatter || lock->get_state() == LOCK_MIX)) {
      lock->get_wrlock();
      auto it = mut->emplace_lock(lock, MutationImpl::LockOp::WRLOCK);
      it->flags |= MutationImpl::LockOp::WRLOCK; // may already remote_wrlocked
      return true;
    }

    // unstable filelock blocks on recovery; bump this inode's priority
    if (lock->get_type() == CEPH_LOCK_IFILE &&
	in->state_test(CInode::STATE_RECOVERING)) {
      mds->mdcache->recovery_queue.prioritize(in);
    }

    if (!lock->is_stable())
      break;

    if (in->is_auth()) {
      // drive the lock toward a wrlockable state and loop to re-check
      if (want_scatter)
	scatter_mix(static_cast<ScatterLock*>(lock));
      else
	simple_lock(lock);
    } else {
      // replica.
      // auth should be auth_pinned (see acquire_locks wrlock weird mustpin case).
      mds_rank_t auth = lock->get_parent()->authority().first;
      if (!mds->is_cluster_degraded() ||
	  mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
	dout(10) << "requesting scatter from auth on "
		 << *lock << " on " << *lock->get_parent() << dendl;
	mds->send_message_mds(make_message<MLock>(lock, LOCK_AC_REQSCATTER, mds->get_nodeid()), auth);
      }
      break;
    }
  }

  dout(7) << "wrlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl;
  lock->add_waiter(SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
  nudge_log(lock);

  return false;
}
1806
/*
 * Release a wrlock held via @it.  Version locks use the local path.  If
 * the op also holds a remote wrlock, the entry is kept with only the
 * local flag cleared; otherwise it is removed from @mut.  The lock is
 * re-evaluated when this was the last wrlock or, while other wrlocks
 * remain, when an unstable lock has just finished flushing.
 */
void Locker::wrlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut, bool *pneed_issue)
{
  ceph_assert(it->is_wrlock());
  SimpleLock* lock = it->lock;

  if (lock->get_type() == CEPH_LOCK_IVERSION ||
      lock->get_type() == CEPH_LOCK_DVERSION)
    return local_wrlock_finish(it, mut);

  dout(7) << "wrlock_finish on " << *lock << " on " << *lock->get_parent() << dendl;
  lock->put_wrlock();

  if (it->is_remote_wrlock())
    it->clear_wrlock();
  else
    mut->locks.erase(it);

  if (lock->is_wrlocked()) {
    // Evaluate unstable lock after scatter_writebehind_finish(). Because
    // eval_gather() does not change lock's state when lock is flushing.
    if (!lock->is_stable() && lock->is_flushed() &&
	lock->get_parent()->is_auth())
      eval_gather(lock, false, pneed_issue);
  } else {
    // last wrlock dropped: let the lock move on
    if (!lock->is_stable())
      eval_gather(lock, false, pneed_issue);
    else if (lock->get_parent()->is_auth())
      try_eval(lock, pneed_issue);
  }
}
1837
1838
1839 // remote wrlock
1840
1841 void Locker::remote_wrlock_start(SimpleLock *lock, mds_rank_t target, MDRequestRef& mut)
1842 {
1843 dout(7) << "remote_wrlock_start mds." << target << " on " << *lock << " on " << *lock->get_parent() << dendl;
1844
1845 // wait for active target
1846 if (mds->is_cluster_degraded() &&
1847 !mds->mdsmap->is_clientreplay_or_active_or_stopping(target)) {
1848 dout(7) << " mds." << target << " is not active" << dendl;
1849 if (mut->more()->waiting_on_slave.empty())
1850 mds->wait_for_active_peer(target, new C_MDS_RetryRequest(mdcache, mut));
1851 return;
1852 }
1853
1854 // send lock request
1855 mut->start_locking(lock, target);
1856 mut->more()->slaves.insert(target);
1857 auto r = make_message<MMDSSlaveRequest>(mut->reqid, mut->attempt, MMDSSlaveRequest::OP_WRLOCK);
1858 r->set_lock_type(lock->get_type());
1859 lock->get_parent()->set_object_info(r->get_object_info());
1860 mds->send_message_mds(r, target);
1861
1862 ceph_assert(mut->more()->waiting_on_slave.count(target) == 0);
1863 mut->more()->waiting_on_slave.insert(target);
1864 }
1865
1866 void Locker::remote_wrlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut)
1867 {
1868 ceph_assert(it->is_remote_wrlock());
1869 SimpleLock *lock = it->lock;
1870 mds_rank_t target = it->wrlock_target;
1871
1872 if (it->is_wrlock())
1873 it->clear_remote_wrlock();
1874 else
1875 mut->locks.erase(it);
1876
1877 dout(7) << "remote_wrlock_finish releasing remote wrlock on mds." << target
1878 << " " << *lock->get_parent() << dendl;
1879 if (!mds->is_cluster_degraded() ||
1880 mds->mdsmap->get_state(target) >= MDSMap::STATE_REJOIN) {
1881 auto slavereq = make_message<MMDSSlaveRequest>(mut->reqid, mut->attempt, MMDSSlaveRequest::OP_UNWRLOCK);
1882 slavereq->set_lock_type(lock->get_type());
1883 lock->get_parent()->set_object_info(slavereq->get_object_info());
1884 mds->send_message_mds(slavereq, target);
1885 }
1886 }
1887
1888
1889 // ------------------
1890 // xlock
1891
/*
 * Acquire an xlock on @lock for @mut.  Version locks take the local
 * path.  On the auth we drive the lock toward an xlockable state and
 * register a waiter if we must block; on a replica we forward an
 * OP_XLOCK slave request to the auth.  Returns true only when the xlock
 * is held and recorded in @mut.
 */
bool Locker::xlock_start(SimpleLock *lock, MDRequestRef& mut)
{
  if (lock->get_type() == CEPH_LOCK_IVERSION ||
      lock->get_type() == CEPH_LOCK_DVERSION)
    return local_xlock_start(static_cast<LocalLock*>(lock), mut);

  dout(7) << "xlock_start on " << *lock << " on " << *lock->get_parent() << dendl;
  client_t client = mut->get_client();

  CInode *in = nullptr;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  // auth?
  if (lock->get_parent()->is_auth()) {
    // auth
    while (1) {
      if (mut->locking && // started xlock (not preempt other request)
	  lock->can_xlock(client) &&
	  !(lock->get_state() == LOCK_LOCK_XLOCK &&	// client is not xlocker or
	    in && in->issued_caps_need_gather(lock))) { // xlocker does not hold shared cap
	lock->set_state(LOCK_XLOCK);
	lock->get_xlock(mut, client);
	mut->emplace_lock(lock, MutationImpl::LockOp::XLOCK);
	mut->finish_locking(lock);
	return true;
      }

      // unstable filelock may be blocked on recovery; bump priority
      if (lock->get_type() == CEPH_LOCK_IFILE &&
	  in->state_test(CInode::STATE_RECOVERING)) {
	mds->mdcache->recovery_queue.prioritize(in);
      }

      // give up unless the unstable lock is our own XLOCKDONE with no
      // one else waiting for stability
      if (!lock->is_stable() && (lock->get_state() != LOCK_XLOCKDONE ||
				 lock->get_xlock_by_client() != client ||
				 lock->is_waiter_for(SimpleLock::WAIT_STABLE)))
	break;

      if (lock->get_state() == LOCK_LOCK || lock->get_state() == LOCK_XLOCKDONE) {
	// ready to start the xlock transition proper
	mut->start_locking(lock);
	simple_xlock(lock);
      } else {
	// first move the lock toward LOCK
	simple_lock(lock);
      }
    }

    lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
    nudge_log(lock);
    return false;
  } else {
    // replica
    ceph_assert(lock->get_sm()->can_remote_xlock);
    ceph_assert(!mut->slave_request);

    // wait for single auth
    if (lock->get_parent()->is_ambiguous_auth()) {
      lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH,
				     new C_MDS_RetryRequest(mdcache, mut));
      return false;
    }

    // wait for active auth
    mds_rank_t auth = lock->get_parent()->authority().first;
    if (mds->is_cluster_degraded() &&
	!mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
      dout(7) << " mds." << auth << " is not active" << dendl;
      if (mut->more()->waiting_on_slave.empty())
	mds->wait_for_active_peer(auth, new C_MDS_RetryRequest(mdcache, mut));
      return false;
    }

    // send lock request
    mut->more()->slaves.insert(auth);
    mut->start_locking(lock, auth);
    auto r = make_message<MMDSSlaveRequest>(mut->reqid, mut->attempt, MMDSSlaveRequest::OP_XLOCK);
    r->set_lock_type(lock->get_type());
    lock->get_parent()->set_object_info(r->get_object_info());
    mds->send_message_mds(r, auth);

    ceph_assert(mut->more()->waiting_on_slave.count(auth) == 0);
    mut->more()->waiting_on_slave.insert(auth);

    return false;
  }
}
1977
/*
 * Finish up after the last xlock on @lock dropped.  If no other holds or
 * leases remain and the (target) loner was the xlocker (or there was no
 * client xlocker), jump straight to LOCK_EXCL; otherwise fall back to a
 * normal eval_gather, which will revoke the xlocker's GSHARED cap if the
 * next state requires it.
 */
void Locker::_finish_xlock(SimpleLock *lock, client_t xlocker, bool *pneed_issue)
{
  ceph_assert(!lock->is_stable());
  if (lock->get_type() != CEPH_LOCK_DN &&
      lock->get_type() != CEPH_LOCK_ISNAP &&
      lock->get_type() != CEPH_LOCK_IPOLICY &&
      lock->get_num_rdlocks() == 0 &&
      lock->get_num_wrlocks() == 0 &&
      !lock->is_leased() &&
      lock->get_state() != LOCK_XLOCKSNAP) {
    CInode *in = static_cast<CInode*>(lock->get_parent());
    client_t loner = in->get_target_loner();
    if (loner >= 0 && (xlocker < 0 || xlocker == loner)) {
      // fast path: hand the lock straight to the loner as EXCL
      lock->set_state(LOCK_EXCL);
      lock->get_parent()->auth_unpin(lock);
      lock->finish_waiters(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD);
      if (lock->get_cap_shift())
	*pneed_issue = true;
      if (lock->get_parent()->is_auth() &&
	  lock->is_stable())
	try_eval(lock, pneed_issue);
      return;
    }
  }
  // the xlocker may have CEPH_CAP_GSHARED, need to revoke it if next state is LOCK_LOCK
  eval_gather(lock, lock->get_state() != LOCK_XLOCKSNAP, pneed_issue);
}
2005
/*
 * Release an xlock held via @it.  Version locks take the local path.
 * For a remote xlock we notify the auth (when reachable) and wake local
 * waiters; for a local xlock we run _finish_xlock once no xlocks remain
 * and no new xlock transition is pending.  Caps are (re)issued or
 * flagged via @pneed_issue as appropriate.
 */
void Locker::xlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut, bool *pneed_issue)
{
  ceph_assert(it->is_xlock());
  SimpleLock *lock = it->lock;

  if (lock->get_type() == CEPH_LOCK_IVERSION ||
      lock->get_type() == CEPH_LOCK_DVERSION)
    return local_xlock_finish(it, mut);

  dout(10) << "xlock_finish on " << *lock << " " << *lock->get_parent() << dendl;

  client_t xlocker = lock->get_xlock_by_client();

  // drop ref
  lock->put_xlock();
  ceph_assert(mut);
  mut->locks.erase(it);

  bool do_issue = false;

  // remote xlock?
  if (!lock->get_parent()->is_auth()) {
    ceph_assert(lock->get_sm()->can_remote_xlock);

    // tell auth
    dout(7) << "xlock_finish releasing remote xlock on " << *lock->get_parent() << dendl;
    mds_rank_t auth = lock->get_parent()->authority().first;
    if (!mds->is_cluster_degraded() ||
	mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) {
      auto slavereq = make_message<MMDSSlaveRequest>(mut->reqid, mut->attempt, MMDSSlaveRequest::OP_UNXLOCK);
      slavereq->set_lock_type(lock->get_type());
      lock->get_parent()->set_object_info(slavereq->get_object_info());
      mds->send_message_mds(slavereq, auth);
    }
    // others waiting?
    lock->finish_waiters(SimpleLock::WAIT_STABLE |
			 SimpleLock::WAIT_WR |
			 SimpleLock::WAIT_RD, 0);
  } else {
    if (lock->get_num_xlocks() == 0 &&
	lock->get_state() != LOCK_LOCK_XLOCK) { // no one is taking xlock
      _finish_xlock(lock, xlocker, &do_issue);
    }
  }

  if (do_issue) {
    CInode *in = static_cast<CInode*>(lock->get_parent());
    if (in->is_head()) {
      if (pneed_issue)
	*pneed_issue = true;
      else
	issue_caps(in);
    }
  }
}
2061
2062 void Locker::xlock_export(const MutationImpl::lock_iterator& it, MutationImpl *mut)
2063 {
2064 ceph_assert(it->is_xlock());
2065 SimpleLock *lock = it->lock;
2066 dout(10) << "xlock_export on " << *lock << " " << *lock->get_parent() << dendl;
2067
2068 lock->put_xlock();
2069 mut->locks.erase(it);
2070
2071 MDSCacheObject *p = lock->get_parent();
2072 ceph_assert(p->state_test(CInode::STATE_AMBIGUOUSAUTH)); // we are exporting this (inode)
2073
2074 if (!lock->is_stable())
2075 lock->get_parent()->auth_unpin(lock);
2076
2077 lock->set_state(LOCK_LOCK);
2078 }
2079
/**
 * Take over an xlock from an exporting mds: auth-pin the parent for the
 * duration of the imported xlock (the matching unpin happens when the
 * lock is finished/stabilized).
 */
void Locker::xlock_import(SimpleLock *lock)
{
  dout(10) << "xlock_import on " << *lock << " " << *lock->get_parent() << dendl;
  lock->get_parent()->auth_pin(lock);
}
2085
2086 void Locker::xlock_downgrade(SimpleLock *lock, MutationImpl *mut)
2087 {
2088 dout(10) << "xlock_downgrade on " << *lock << " " << *lock->get_parent() << dendl;
2089 auto it = mut->locks.find(lock);
2090 if (it->is_rdlock())
2091 return; // already downgraded
2092
2093 ceph_assert(lock->get_parent()->is_auth());
2094 ceph_assert(it != mut->locks.end());
2095 ceph_assert(it->is_xlock());
2096
2097 lock->set_xlock_done();
2098 lock->get_rdlock();
2099 xlock_finish(it, mut, nullptr);
2100 mut->emplace_lock(lock, MutationImpl::LockOp::RDLOCK);
2101 }
2102
2103
2104 // file i/o -----------------------------------------
2105
// Return the inode's current file data version (logged only; no state change).
version_t Locker::issue_file_data_version(CInode *in)
{
  dout(7) << "issue_file_data_version on " << *in << dendl;
  return in->inode.file_data_version;
}
2111
/**
 * Journal-completion context for file updates: once the log entry is
 * durable, calls Locker::file_update_finish() to apply the projected
 * inode and (optionally) ack the client.  Holds a PIN_PTRWAITER ref on
 * the inode so the pointer stays valid until finish() runs.
 */
class C_Locker_FileUpdate_finish : public LockerLogContext {
  CInode *in;
  MutationRef mut;
  unsigned flags;     // UPDATE_* flags forwarded to file_update_finish()
  client_t client;    // client to ack (-1 if no ack)
  ref_t<MClientCaps> ack;
public:
  C_Locker_FileUpdate_finish(Locker *l, CInode *i, MutationRef& m, unsigned f,
                             const ref_t<MClientCaps> &ack, client_t c=-1)
    : LockerLogContext(l), in(i), mut(m), flags(f), client(c), ack(ack) {
    in->get(CInode::PIN_PTRWAITER);
  }
  void finish(int r) override {
    locker->file_update_finish(in, mut, flags, client, ack);
    in->put(CInode::PIN_PTRWAITER);
  }
};
2129
// Flags for file_update_finish(), set by the journaling callers.
enum {
  UPDATE_SHAREMAX = 1,    // broadcast new max_size to writers afterwards
  UPDATE_NEEDSISSUE = 2,  // re-issue caps to the updating client afterwards
  UPDATE_SNAPFLUSH = 4,   // this update completes a client snap writeback
};
2135
/**
 * Completion of a journaled file update (run from the log-flush context).
 *
 * Applies the projected inode, acks the client (recording the flush tid
 * if the client tracks them), drops the mutation's locks, and performs
 * the follow-up work requested via UPDATE_* @flags.
 */
void Locker::file_update_finish(CInode *in, MutationRef& mut, unsigned flags,
                                client_t client, const ref_t<MClientCaps> &ack)
{
  dout(10) << "file_update_finish on " << *in << dendl;
  // the journal entry is durable; make the projected inode current
  in->pop_and_dirty_projected_inode(mut->ls);

  mut->apply();

  if (ack) {
    Session *session = mds->get_session(client);
    if (session && !session->is_closed()) {
      // "oldest flush tid" > 0 means client uses unique TID for each flush
      if (ack->get_oldest_flush_tid() > 0)
        session->add_completed_flush(ack->get_client_tid());
      mds->send_message_client_counted(ack, session);
    } else {
      dout(10) << " no session for client." << client << " " << *ack << dendl;
    }
  }

  set<CInode*> need_issue;
  drop_locks(mut.get(), &need_issue);

  if (in->is_head()) {
    if ((flags & UPDATE_NEEDSISSUE) && need_issue.count(in) == 0) {
      // re-issue to this client if it still wants caps it lacks
      Capability *cap = in->get_client_cap(client);
      if (cap && (cap->wanted() & ~cap->pending()))
        issue_caps(in, cap);
    }

    // share a larger max_size if the loner could be writing/buffering
    if ((flags & UPDATE_SHAREMAX) && in->is_auth() &&
        (in->filelock.gcaps_allowed(CAP_LONER) & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)))
      share_inode_max_size(in);

  } else if ((flags & UPDATE_SNAPFLUSH) && !in->client_snap_caps.empty()) {
    dout(10) << " client_snap_caps " << in->client_snap_caps << dendl;
    // check for snap writeback completion
    in->client_snap_caps.erase(client);
    if (in->client_snap_caps.empty()) {
      // last snapflush for this snap inode: drop the wrlocks held for the
      // flush and remove it from the pending lists
      for (int i = 0; i < num_cinode_locks; i++) {
        SimpleLock *lock = in->get_lock(cinode_lock_info[i].lock);
        ceph_assert(lock);
        lock->put_wrlock();
      }
      in->item_open_file.remove_myself();
      in->item_caps.remove_myself();
      eval_cap_gather(in, &need_issue);
    }
  }
  issue_caps_set(need_issue);

  mds->balancer->hit_inode(in, META_POP_IWR);

  // auth unpin after issuing caps
  mut->cleanup();
}
2192
/**
 * Register (or augment) a capability on @in for the client issuing @mdr.
 *
 * @param in     inode to issue caps on
 * @param mode   open mode; mapped to wanted cap bits via ceph_caps_for_mode()
 * @param mdr    the client request (supplies session and replay state)
 * @param realm  snap realm used when adding a brand-new cap
 * @return the (possibly pre-existing) Capability for this client
 */
Capability* Locker::issue_new_caps(CInode *in,
                                   int mode,
                                   MDRequestRef& mdr,
                                   SnapRealm *realm)
{
  dout(7) << "issue_new_caps for mode " << mode << " on " << *in << dendl;
  Session *session = mdr->session;
  bool new_inode = (mdr->alloc_ino || mdr->used_prealloc_ino);

  // if replay or async, try to reconnect cap, and otherwise do nothing.
  if (new_inode && mdr->client_request->is_queued_for_replay())
    return mds->mdcache->try_reconnect_cap(in, session);

  // my needs
  ceph_assert(session->info.inst.name.is_client());
  client_t my_client = session->get_client();
  int my_want = ceph_caps_for_mode(mode);

  // register a capability
  Capability *cap = in->get_client_cap(my_client);
  if (!cap) {
    // new cap
    cap = in->add_client_cap(my_client, session, realm, new_inode);
    cap->set_wanted(my_want);
    cap->mark_new();
  } else {
    // make sure it wants sufficient caps
    if (my_want & ~cap->wanted()) {
      // augment wanted caps for this client
      cap->set_wanted(cap->wanted() | my_want);
    }
  }
  cap->inc_suppress(); // suppress file cap messages (we'll bundle with the request reply)

  if (in->is_auth()) {
    // [auth] twiddle mode?
    eval(in, CEPH_CAP_LOCKS);

    // flush the journal if the wanted caps are blocked on journaling mutations
    if (_need_flush_mdlog(in, my_want))
      mds->mdlog->flush();

  } else {
    // [replica] tell auth about any new caps wanted
    request_inode_file_caps(in);
  }

  // issue caps (pot. incl new one)
  //issue_caps(in);  // note: _eval above may have done this already...

  // re-issue whatever we can
  //cap->issue(cap->pending());

  cap->dec_suppress();

  return cap;
}
2249
2250 void Locker::issue_caps_set(set<CInode*>& inset)
2251 {
2252 for (set<CInode*>::iterator p = inset.begin(); p != inset.end(); ++p)
2253 issue_caps(*p);
2254 }
2255
/**
 * Queued waiter context: revoke the stale cap of @client on @in once the
 * MDS processes its waiter queue.  Holds a PIN_PTRWAITER ref so the inode
 * pointer stays valid until finish() runs.
 */
class C_Locker_RevokeStaleCap : public LockerContext {
  CInode *in;
  client_t client;
public:
  C_Locker_RevokeStaleCap(Locker *l, CInode *i, client_t c) :
    LockerContext(l), in(i), client(c) {
    in->get(CInode::PIN_PTRWAITER);
  }
  void finish(int r) override {
    locker->revoke_stale_cap(in, client);
    in->put(CInode::PIN_PTRWAITER);
  }
};
2269
/**
 * (Re)issue capabilities on @in: revoke pending bits no longer allowed by
 * the current lock states, and grant wanted+allowed bits.
 *
 * @param in        head inode to issue caps on (asserted is_head())
 * @param only_cap  if non-null, restrict processing to that client's cap
 * @return number of caps for which a GRANT/REVOKE message was sent
 */
int Locker::issue_caps(CInode *in, Capability *only_cap)
{
  // allowed caps are determined by the lock mode.
  int all_allowed = in->get_caps_allowed_by_type(CAP_ANY);
  int loner_allowed = in->get_caps_allowed_by_type(CAP_LONER);
  int xlocker_allowed = in->get_caps_allowed_by_type(CAP_XLOCKER);

  client_t loner = in->get_loner();
  if (loner >= 0) {
    dout(7) << "issue_caps loner client." << loner
            << " allowed=" << ccap_string(loner_allowed)
            << ", xlocker allowed=" << ccap_string(xlocker_allowed)
            << ", others allowed=" << ccap_string(all_allowed)
            << " on " << *in << dendl;
  } else {
    dout(7) << "issue_caps allowed=" << ccap_string(all_allowed)
            << ", xlocker allowed=" << ccap_string(xlocker_allowed)
            << " on " << *in << dendl;
  }

  ceph_assert(in->is_head());

  // count conflicts with
  int nissued = 0;

  // client caps
  map<client_t, Capability>::iterator it;
  if (only_cap)
    it = in->client_caps.find(only_cap->get_client());
  else
    it = in->client_caps.begin();
  for (; it != in->client_caps.end(); ++it) {
    Capability *cap = &it->second;

    // do not issue _new_ bits when size|mtime is projected
    int allowed;
    if (loner == it->first)
      allowed = loner_allowed;
    else
      allowed = all_allowed;

    // add in any xlocker-only caps (for locks this client is the xlocker for)
    allowed |= xlocker_allowed & in->get_xlocker_mask(it->first);
    if (in->is_dir()) {
      // dir-op caps are managed through lock caches, not the generic path
      allowed &= ~CEPH_CAP_ANY_DIR_OPS;
      if (allowed & CEPH_CAP_FILE_EXCL)
        allowed |= cap->get_lock_cache_allowed();
    }

    // clients that can't handle inline data or pool namespaces must not
    // be given file read/write caps
    if ((in->inode.inline_data.version != CEPH_INLINE_NONE &&
         cap->is_noinline()) ||
        (!in->inode.layout.pool_ns.empty() &&
         cap->is_nopoolns()))
      allowed &= ~(CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR);

    int pending = cap->pending();
    int wanted = cap->wanted();

    dout(20) << " client." << it->first
             << " pending " << ccap_string(pending)
             << " allowed " << ccap_string(allowed)
             << " wanted " << ccap_string(wanted)
             << dendl;

    if (!(pending & ~allowed)) {
      // skip if suppress or new, and not revocation
      if (cap->is_new() || cap->is_suppress() || cap->is_stale()) {
        dout(20) << " !revoke and new|suppressed|stale, skipping client." << it->first << dendl;
        continue;
      }
    } else {
      // something must be revoked
      ceph_assert(!cap->is_new());
      if (cap->is_stale()) {
        // session is stale: shrink pending silently and queue an async
        // revoke (which may evict the client if WR caps are involved)
        dout(20) << " revoke stale cap from client." << it->first << dendl;
        ceph_assert(!cap->is_valid());
        cap->issue(allowed & pending, false);
        mds->queue_waiter_front(new C_Locker_RevokeStaleCap(this, in, it->first));
        continue;
      }

      if (!cap->is_valid() && (pending & ~CEPH_CAP_PIN)) {
        // After stale->resume circle, client thinks it only has CEPH_CAP_PIN.
        // mds needs to re-issue caps, then do revocation.
        long seq = cap->issue(pending, true);

        dout(7) << " sending MClientCaps to client." << it->first
                << " seq " << seq << " re-issue " << ccap_string(pending) << dendl;

        auto m = make_message<MClientCaps>(CEPH_CAP_OP_GRANT, in->ino(),
                                           in->find_snaprealm()->inode->ino(),
                                           cap->get_cap_id(), cap->get_last_seq(),
                                           pending, wanted, 0, cap->get_mseq(),
                                           mds->get_osd_epoch_barrier());
        in->encode_cap_message(m, cap);

        mds->send_message_client_counted(m, cap->get_session());
      }
    }

    // notify clients about deleted inode, to make sure they release caps ASAP.
    if (in->inode.nlink == 0)
      wanted |= CEPH_CAP_LINK_SHARED;

    // are there caps that the client _wants_ and can have, but aren't pending?
    // or do we need to revoke?
    if ((pending & ~allowed) ||             // need to revoke ~allowed caps.
        ((wanted & allowed) & ~pending) ||  // missing wanted+allowed caps
        !cap->is_valid()) {                 // after stale->resume circle
      // issue
      nissued++;

      // include caps that clients generally like, while we're at it.
      int likes = in->get_caps_liked();
      int before = pending;
      long seq;
      if (pending & ~allowed)
        seq = cap->issue((wanted|likes) & allowed & pending, true);  // if revoking, don't issue anything new.
      else
        seq = cap->issue((wanted|likes) & allowed, true);
      int after = cap->pending();

      dout(7) << " sending MClientCaps to client." << it->first
              << " seq " << seq << " new pending " << ccap_string(after)
              << " was " << ccap_string(before) << dendl;

      int op = (before & ~after) ? CEPH_CAP_OP_REVOKE : CEPH_CAP_OP_GRANT;
      if (op == CEPH_CAP_OP_REVOKE) {
        // track outstanding revocations so we can warn/evict on laggards
        revoking_caps.push_back(&cap->item_revoking_caps);
        revoking_caps_by_client[cap->get_client()].push_back(&cap->item_client_revoking_caps);
        cap->set_last_revoke_stamp(ceph_clock_now());
        cap->reset_num_revoke_warnings();
      }

      auto m = make_message<MClientCaps>(op, in->ino(),
                                         in->find_snaprealm()->inode->ino(),
                                         cap->get_cap_id(), cap->get_last_seq(),
                                         after, wanted, 0, cap->get_mseq(),
                                         mds->get_osd_epoch_barrier());
      in->encode_cap_message(m, cap);

      mds->send_message_client_counted(m, cap->get_session());
    }

    if (only_cap)
      break;
  }

  return nissued;
}
2419
2420 void Locker::issue_truncate(CInode *in)
2421 {
2422 dout(7) << "issue_truncate on " << *in << dendl;
2423
2424 for (auto &p : in->client_caps) {
2425 Capability *cap = &p.second;
2426 auto m = make_message<MClientCaps>(CEPH_CAP_OP_TRUNC,
2427 in->ino(),
2428 in->find_snaprealm()->inode->ino(),
2429 cap->get_cap_id(), cap->get_last_seq(),
2430 cap->pending(), cap->wanted(), 0,
2431 cap->get_mseq(),
2432 mds->get_osd_epoch_barrier());
2433 in->encode_cap_message(m, cap);
2434 mds->send_message_client_counted(m, p.first);
2435 }
2436
2437 // should we increase max_size?
2438 if (in->is_auth() && in->is_file())
2439 check_inode_max_size(in);
2440 }
2441
2442
/**
 * Forcibly complete the pending revocation on a stale client's cap.
 * If WR caps are being revoked we cannot safely yank them (the client
 * may have buffered writes), so the client is evicted instead.
 */
void Locker::revoke_stale_cap(CInode *in, client_t client)
{
  dout(7) << __func__ << " client." << client << " on " << *in << dendl;
  Capability *cap = in->get_client_cap(client);
  if (!cap)
    return;

  if (cap->revoking() & CEPH_CAP_ANY_WR) {
    // can't safely revoke write caps out from under the client; evict it
    std::stringstream ss;
    mds->evict_client(client.v, false, g_conf()->mds_session_blacklist_on_timeout, ss, nullptr);
    return;
  }

  cap->revoke();

  // if the client had a writeable range, size/mtime may need recovery
  if (in->is_auth() && in->inode.client_ranges.count(cap->get_client()))
    in->state_set(CInode::STATE_NEEDSRECOVER);

  if (in->state_test(CInode::STATE_EXPORTINGCAPS))
    return;

  // the revoke may unblock pending lock-state transitions
  if (!in->filelock.is_stable())
    eval_gather(&in->filelock);
  if (!in->linklock.is_stable())
    eval_gather(&in->linklock);
  if (!in->authlock.is_stable())
    eval_gather(&in->authlock);
  if (!in->xattrlock.is_stable())
    eval_gather(&in->xattrlock);

  if (in->is_auth())
    try_eval(in, CEPH_CAP_LOCKS);
  else
    request_inode_file_caps(in);
}
2478
/**
 * Revoke the revoking caps of every notable cap in a stale session.
 *
 * @return false if any cap was revoking WR bits (caller should evict the
 *         client instead); true otherwise.
 */
bool Locker::revoke_stale_caps(Session *session)
{
  dout(10) << "revoke_stale_caps for " << session->info.inst.name << dendl;

  // invalidate all caps
  session->inc_cap_gen();

  bool ret = true;
  std::vector<CInode*> to_eval;

  for (auto p = session->caps.begin(); !p.end(); ) {
    Capability *cap = *p;
    ++p;  // advance before any mutation of the cap list
    if (!cap->is_notable()) {
      // the rest ones are not being revoked and don't have writeable range
      // and don't want exclusive caps or want file read/write. They don't
      // need recover, they don't affect eval_gather()/try_eval()
      break;
    }

    int revoking = cap->revoking();
    if (!revoking)
      continue;

    if (revoking & CEPH_CAP_ANY_WR) {
      // unsafe to force-revoke write caps; let the caller evict instead
      ret = false;
      break;
    }

    int issued = cap->issued();
    CInode *in = cap->get_inode();
    dout(10) << " revoking " << ccap_string(issued) << " on " << *in << dendl;
    int revoked = cap->revoke();
    if (revoked & CEPH_CAP_ANY_DIR_OPS)
      eval_lock_caches(cap);

    if (in->is_auth() &&
        in->inode.client_ranges.count(cap->get_client()))
      in->state_set(CInode::STATE_NEEDSRECOVER);

    // eval lock/inode may finish contexts, which may modify other cap's position
    // in the session->caps.
    to_eval.push_back(in);
  }

  // evaluate affected inodes only after the cap walk above, to avoid
  // iterator invalidation from completion contexts
  for (auto in : to_eval) {
    if (in->state_test(CInode::STATE_EXPORTINGCAPS))
      continue;

    if (!in->filelock.is_stable())
      eval_gather(&in->filelock);
    if (!in->linklock.is_stable())
      eval_gather(&in->linklock);
    if (!in->authlock.is_stable())
      eval_gather(&in->authlock);
    if (!in->xattrlock.is_stable())
      eval_gather(&in->xattrlock);

    if (in->is_auth())
      try_eval(in, CEPH_CAP_LOCKS);
    else
      request_inode_file_caps(in);
  }

  return ret;
}
2545
/**
 * A stale session came back: re-validate and re-issue its caps.
 * With the LAZY_CAP_WANTED feature, only "notable" caps (which sort to
 * the front of the session cap list) need attention.
 */
void Locker::resume_stale_caps(Session *session)
{
  dout(10) << "resume_stale_caps for " << session->info.inst.name << dendl;

  bool lazy = session->info.has_feature(CEPHFS_FEATURE_LAZY_CAP_WANTED);
  for (xlist<Capability*>::iterator p = session->caps.begin(); !p.end(); ) {
    Capability *cap = *p;
    ++p;  // advance before issue_caps/eval may reorder the list
    if (lazy && !cap->is_notable())
      break; // see revoke_stale_caps()

    CInode *in = cap->get_inode();
    ceph_assert(in->is_head());
    dout(10) << " clearing stale flag on " << *in << dendl;

    if (in->state_test(CInode::STATE_EXPORTINGCAPS)) {
      // if export succeeds, the cap will be removed. if export fails,
      // we need to re-issue the cap if it's not stale.
      in->state_set(CInode::STATE_EVALSTALECAPS);
      continue;
    }

    // issue directly unless eval() already did it for us
    if (!in->is_auth() || !eval(in, CEPH_CAP_LOCKS))
      issue_caps(in, cap);
  }
}
2572
2573 void Locker::remove_stale_leases(Session *session)
2574 {
2575 dout(10) << "remove_stale_leases for " << session->info.inst.name << dendl;
2576 xlist<ClientLease*>::iterator p = session->leases.begin();
2577 while (!p.end()) {
2578 ClientLease *l = *p;
2579 ++p;
2580 CDentry *parent = static_cast<CDentry*>(l->parent);
2581 dout(15) << " removing lease on " << *parent << dendl;
2582 parent->remove_client_lease(l, this);
2583 }
2584 }
2585
2586
/**
 * Waiter context: retry request_inode_file_caps() once the blocking
 * condition (ambiguous auth / peer recovery) clears.  Pins the inode so
 * the pointer stays valid; skips the retry if we became auth meanwhile.
 */
class C_MDL_RequestInodeFileCaps : public LockerContext {
  CInode *in;
public:
  C_MDL_RequestInodeFileCaps(Locker *l, CInode *i) : LockerContext(l), in(i) {
    in->get(CInode::PIN_PTRWAITER);
  }
  void finish(int r) override {
    if (!in->is_auth())
      locker->request_inode_file_caps(in);
    in->put(CInode::PIN_PTRWAITER);
  }
};
2599
/**
 * [replica] Tell the auth mds what caps are wanted on @in, if that set
 * has changed since the last report.  Defers (via waiter contexts) while
 * the inode's auth is ambiguous or the auth mds is still rejoining.
 */
void Locker::request_inode_file_caps(CInode *in)
{
  ceph_assert(!in->is_auth());

  int wanted = in->get_caps_wanted() & in->get_caps_allowed_ever() & ~CEPH_CAP_PIN;
  if (wanted != in->replica_caps_wanted) {
    // wait for single auth
    if (in->is_ambiguous_auth()) {
      in->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH,
                     new C_MDL_RequestInodeFileCaps(this, in));
      return;
    }

    mds_rank_t auth = in->authority().first;
    if (mds->is_cluster_degraded() &&
        mds->mdsmap->get_state(auth) == MDSMap::STATE_REJOIN) {
      // auth is rejoining; retry once it is active
      mds->wait_for_active_peer(auth, new C_MDL_RequestInodeFileCaps(this, in));
      return;
    }

    dout(7) << "request_inode_file_caps " << ccap_string(wanted)
            << " was " << ccap_string(in->replica_caps_wanted)
            << " on " << *in << " to mds." << auth << dendl;

    // record what we reported, even if the send below is skipped
    in->replica_caps_wanted = wanted;

    if (!mds->is_cluster_degraded() ||
        mds->mdsmap->is_clientreplay_or_active_or_stopping(auth))
      mds->send_message_mds(make_message<MInodeFileCaps>(in->ino(), in->replica_caps_wanted), auth);
  }
}
2631
2632 void Locker::handle_inode_file_caps(const cref_t<MInodeFileCaps> &m)
2633 {
2634 // nobody should be talking to us during recovery.
2635 if (mds->get_state() < MDSMap::STATE_CLIENTREPLAY) {
2636 if (mds->get_want_state() >= MDSMap::STATE_CLIENTREPLAY) {
2637 mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
2638 return;
2639 }
2640 ceph_abort_msg("got unexpected message during recovery");
2641 }
2642
2643 // ok
2644 CInode *in = mdcache->get_inode(m->get_ino());
2645 mds_rank_t from = mds_rank_t(m->get_source().num());
2646
2647 ceph_assert(in);
2648 ceph_assert(in->is_auth());
2649
2650 dout(7) << "handle_inode_file_caps replica mds." << from << " wants caps " << ccap_string(m->get_caps()) << " on " << *in << dendl;
2651
2652 in->set_mds_caps_wanted(from, m->get_caps());
2653
2654 try_eval(in, CEPH_CAP_LOCKS);
2655 }
2656
2657
/**
 * Waiter context: re-run check_inode_max_size() with the same arguments
 * once a blocking condition (frozen inode, unstable filelock) clears.
 * Pins the inode; skips the retry if we lost authority meanwhile.
 */
class C_MDL_CheckMaxSize : public LockerContext {
  CInode *in;
  uint64_t new_max_size;  // requested max_size floor (0 = none)
  uint64_t newsize;       // requested file size (0 = no size update)
  utime_t mtime;

public:
  C_MDL_CheckMaxSize(Locker *l, CInode *i, uint64_t _new_max_size,
                     uint64_t _newsize, utime_t _mtime) :
    LockerContext(l), in(i),
    new_max_size(_new_max_size), newsize(_newsize), mtime(_mtime)
  {
    in->get(CInode::PIN_PTRWAITER);
  }
  void finish(int r) override {
    if (in->is_auth())
      locker->check_inode_max_size(in, false, new_max_size, newsize, mtime);
    in->put(CInode::PIN_PTRWAITER);
  }
};
2678
2679 uint64_t Locker::calc_new_max_size(CInode::mempool_inode *pi, uint64_t size)
2680 {
2681 uint64_t new_max = (size + 1) << 1;
2682 uint64_t max_inc = g_conf()->mds_client_writeable_range_max_inc_objs;
2683 if (max_inc > 0) {
2684 max_inc *= pi->layout.object_size;
2685 new_max = std::min(new_max, size + max_inc);
2686 }
2687 return round_up_to(new_max, pi->get_layout_size_increment());
2688 }
2689
/**
 * Check whether the projected client_ranges are stale relative to the
 * currently issued/wanted caps — i.e. whether calc_new_client_ranges()
 * would change anything.
 *
 * Returns true if an update is needed: a client with (potential) write
 * caps is missing a range entry, the entries' client set has drifted,
 * the desired max size exceeds a recorded range, or stale entries remain
 * for clients without write caps.
 */
bool Locker::check_client_ranges(CInode *in, uint64_t size)
{
  auto latest = in->get_projected_inode();
  uint64_t ms;
  if (latest->has_layout()) {
    ms = calc_new_max_size(latest, size);
  } else {
    // Layout-less directories like ~mds0/, have zero size
    ms = 0;
  }

  // Walk caps and ranges in parallel; both maps are keyed by client_t
  // and iterate in the same order.
  auto it = latest->client_ranges.begin();
  for (auto &p : in->client_caps) {
    if ((p.second.issued() | p.second.wanted()) & CEPH_CAP_ANY_FILE_WR) {
      if (it == latest->client_ranges.end())
        return true;
      if (it->first != p.first)
        return true;
      if (ms > it->second.range.last)
        return true;
      ++it;
    }
  }
  // any leftover range entries belong to clients without write caps
  return it != latest->client_ranges.end();
}
2715
/**
 * Rewrite the projected inode's client_ranges to match current caps:
 * grow (never shrink) the range of each client holding/wanting file
 * write caps, and drop entries for clients that no longer do.
 *
 * @param in             inode whose projected client_ranges to update
 * @param size           current (or requested) file size
 * @param max_increased  set to true if any client's max grew
 * @return true if anything changed
 */
bool Locker::calc_new_client_ranges(CInode *in, uint64_t size, bool *max_increased)
{
  auto pi = in->get_projected_inode();
  uint64_t ms;
  if (pi->has_layout()) {
    ms = calc_new_max_size(pi, size);
  } else {
    // Layout-less directories like ~mds0/, have zero size
    ms = 0;
  }

  bool updated = false;

  // increase ranges as appropriate.
  // shrink to 0 if no WR|BUFFER caps issued.
  // (caps and ranges are both keyed by client_t and iterate in order)
  auto it = pi->client_ranges.begin();
  for (auto &p : in->client_caps) {
    if ((p.second.issued() | p.second.wanted()) & CEPH_CAP_ANY_FILE_WR) {
      // drop stale entries for clients ordered before this one
      while (it != pi->client_ranges.end() && it->first < p.first) {
        it = pi->client_ranges.erase(it);
        updated = true;
      }

      if (it != pi->client_ranges.end() && it->first == p.first) {
        // existing entry: only ever grows
        if (ms > it->second.range.last) {
          it->second.range.last = ms;
          updated = true;
          if (max_increased)
            *max_increased = true;
        }
      } else {
        // new writer: create a fresh range entry
        it = pi->client_ranges.emplace_hint(it, std::piecewise_construct,
                                            std::forward_as_tuple(p.first),
                                            std::forward_as_tuple());
        it->second.range.last = ms;
        it->second.follows = in->first - 1;
        updated = true;
        if (max_increased)
          *max_increased = true;
      }
      p.second.mark_clientwriteable();
      ++it;
    } else {
      p.second.clear_clientwriteable();
    }
  }
  // drop any trailing entries for clients no longer writing
  while (it != pi->client_ranges.end()) {
    it = pi->client_ranges.erase(it);
    updated = true;
  }
  if (updated) {
    // keep the inode-level writeable flag in sync with the range map
    if (pi->client_ranges.empty())
      in->clear_clientwriteable();
    else
      in->mark_clientwriteable();
  }
  return updated;
}
2774
/**
 * Update a file's size/mtime and/or its clients' writeable ranges, and
 * journal the change (EOpen if the file is open, EUpdate otherwise).
 *
 * @param in            auth file inode
 * @param force_wrlock  caller already guarantees the filelock is wrlockable
 * @param new_max_size  requested max_size floor (0 = derive from size)
 * @param new_size      new file size (0 = no size update)
 * @param new_mtime     new mtime to accompany a size update
 * @return true if an update was journaled, false if it was a no-op or
 *         had to be deferred (frozen inode / unwrlockable filelock)
 */
bool Locker::check_inode_max_size(CInode *in, bool force_wrlock,
                                  uint64_t new_max_size, uint64_t new_size,
                                  utime_t new_mtime)
{
  ceph_assert(in->is_auth());
  ceph_assert(in->is_file());

  CInode::mempool_inode *latest = in->get_projected_inode();
  uint64_t size = latest->size;
  bool update_size = new_size > 0;

  if (update_size) {
    // sizes and mtimes only ever move forward
    new_size = size = std::max(size, new_size);
    new_mtime = std::max(new_mtime, latest->mtime);
    if (latest->size == new_size && latest->mtime == new_mtime)
      update_size = false;
  }

  bool new_ranges = check_client_ranges(in, std::max(new_max_size, size));
  if (!update_size && !new_ranges) {
    dout(20) << "check_inode_max_size no-op on " << *in << dendl;
    return false;
  }

  dout(10) << "check_inode_max_size new_ranges " << new_ranges
           << " update_size " << update_size
           << " on " << *in << dendl;

  if (in->is_frozen()) {
    // retry with the same arguments once the inode thaws
    dout(10) << "check_inode_max_size frozen, waiting on " << *in << dendl;
    in->add_waiter(CInode::WAIT_UNFREEZE,
                   new C_MDL_CheckMaxSize(this, in, new_max_size, new_size, new_mtime));
    return false;
  } else if (!force_wrlock && !in->filelock.can_wrlock(in->get_loner())) {
    // lock?
    if (in->filelock.is_stable()) {
      // nudge the filelock toward a wrlockable state
      if (in->get_target_loner() >= 0)
        file_excl(&in->filelock);
      else
        simple_lock(&in->filelock);
    }
    if (!in->filelock.can_wrlock(in->get_loner())) {
      // retry once the lock stabilizes
      dout(10) << "check_inode_max_size can't wrlock, waiting on " << *in << dendl;
      in->filelock.add_waiter(SimpleLock::WAIT_STABLE,
                              new C_MDL_CheckMaxSize(this, in, new_max_size, new_size, new_mtime));
      return false;
    }
  }

  MutationRef mut(new MutationImpl());
  mut->ls = mds->mdlog->get_current_segment();

  auto &pi = in->project_inode();
  pi.inode.version = in->pre_dirty();

  bool max_increased = false;
  if (new_ranges &&
      calc_new_client_ranges(in, std::max(new_max_size, size), &max_increased)) {
    dout(10) << "check_inode_max_size client_ranges "
             << in->get_previous_projected_inode()->client_ranges
             << " -> " << pi.inode.client_ranges << dendl;
  }

  if (update_size) {
    dout(10) << "check_inode_max_size size " << pi.inode.size << " -> " << new_size << dendl;
    pi.inode.size = new_size;
    pi.inode.rstat.rbytes = new_size;
    dout(10) << "check_inode_max_size mtime " << pi.inode.mtime << " -> " << new_mtime << dendl;
    pi.inode.mtime = new_mtime;
    if (new_mtime > pi.inode.ctime) {
      pi.inode.ctime = new_mtime;
      if (new_mtime > pi.inode.rstat.rctime)
        pi.inode.rstat.rctime = new_mtime;
    }
  }

  // use EOpen if the file is still open; otherwise, use EUpdate.
  // this is just an optimization to push open files forward into
  // newer log segments.
  LogEvent *le;
  EMetaBlob *metablob;
  if (in->is_any_caps_wanted() && in->last == CEPH_NOSNAP) {
    EOpen *eo = new EOpen(mds->mdlog);
    eo->add_ino(in->ino());
    metablob = &eo->metablob;
    le = eo;
  } else {
    EUpdate *eu = new EUpdate(mds->mdlog, "check_inode_max_size");
    metablob = &eu->metablob;
    le = eu;
  }
  mds->mdlog->start_entry(le);
  if (update_size) {  // FIXME if/when we do max_size nested accounting
    mdcache->predirty_journal_parents(mut, metablob, in, 0, PREDIRTY_PRIMARY);
    // no cow, here!
    CDentry *parent = in->get_projected_parent_dn();
    metablob->add_primary_dentry(parent, in, true);
  } else {
    metablob->add_dir_context(in->get_projected_parent_dn()->get_dir());
    mdcache->journal_dirty_inode(mut.get(), metablob, in);
  }
  mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut,
      UPDATE_SHAREMAX, ref_t<MClientCaps>()));
  wrlock_force(&in->filelock, mut);  // wrlock for duration of journal
  mut->auth_pin(in);

  // make max_size _increase_ timely
  if (max_increased)
    mds->mdlog->flush();

  return true;
}
2887
2888
2889 void Locker::share_inode_max_size(CInode *in, Capability *only_cap)
2890 {
2891 /*
2892 * only share if currently issued a WR cap. if client doesn't have it,
2893 * file_max doesn't matter, and the client will get it if/when they get
2894 * the cap later.
2895 */
2896 dout(10) << "share_inode_max_size on " << *in << dendl;
2897 map<client_t, Capability>::iterator it;
2898 if (only_cap)
2899 it = in->client_caps.find(only_cap->get_client());
2900 else
2901 it = in->client_caps.begin();
2902 for (; it != in->client_caps.end(); ++it) {
2903 const client_t client = it->first;
2904 Capability *cap = &it->second;
2905 if (cap->is_suppress())
2906 continue;
2907 if (cap->pending() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER)) {
2908 dout(10) << "share_inode_max_size with client." << client << dendl;
2909 cap->inc_last_seq();
2910 auto m = make_message<MClientCaps>(CEPH_CAP_OP_GRANT,
2911 in->ino(),
2912 in->find_snaprealm()->inode->ino(),
2913 cap->get_cap_id(),
2914 cap->get_last_seq(),
2915 cap->pending(),
2916 cap->wanted(), 0,
2917 cap->get_mseq(),
2918 mds->get_osd_epoch_barrier());
2919 in->encode_cap_message(m, cap);
2920 mds->send_message_client_counted(m, client);
2921 }
2922 if (only_cap)
2923 break;
2924 }
2925 }
2926
2927 bool Locker::_need_flush_mdlog(CInode *in, int wanted)
2928 {
2929 /* flush log if caps are wanted by client but corresponding lock is unstable and locked by
2930 * pending mutations. */
2931 if (((wanted & (CEPH_CAP_FILE_RD|CEPH_CAP_FILE_WR|CEPH_CAP_FILE_SHARED|CEPH_CAP_FILE_EXCL)) &&
2932 in->filelock.is_unstable_and_locked()) ||
2933 ((wanted & (CEPH_CAP_AUTH_SHARED|CEPH_CAP_AUTH_EXCL)) &&
2934 in->authlock.is_unstable_and_locked()) ||
2935 ((wanted & (CEPH_CAP_LINK_SHARED|CEPH_CAP_LINK_EXCL)) &&
2936 in->linklock.is_unstable_and_locked()) ||
2937 ((wanted & (CEPH_CAP_XATTR_SHARED|CEPH_CAP_XATTR_EXCL)) &&
2938 in->xattrlock.is_unstable_and_locked()))
2939 return true;
2940 return false;
2941 }
2942
/**
 * Update a capability's wanted set from a client cap message.
 *
 * The wanted set is replaced only when the message's issue_seq matches
 * our last issue (the client saw the latest grant); otherwise only
 * newly-added bits are honored.  Then nudge recovery / journaling /
 * the auth mds as appropriate for the new wanted set.
 */
void Locker::adjust_cap_wanted(Capability *cap, int wanted, int issue_seq)
{
  if (ceph_seq_cmp(issue_seq, cap->get_last_issue()) == 0) {
    dout(10) << " wanted " << ccap_string(cap->wanted())
             << " -> " << ccap_string(wanted) << dendl;
    cap->set_wanted(wanted);
  } else if (wanted & ~cap->wanted()) {
    dout(10) << " wanted " << ccap_string(cap->wanted())
             << " -> " << ccap_string(wanted)
             << " (added caps even though we had seq mismatch!)" << dendl;
    cap->set_wanted(wanted | cap->wanted());
  } else {
    dout(10) << " NOT changing wanted " << ccap_string(cap->wanted())
             << " -> " << ccap_string(wanted)
             << " (issue_seq " << issue_seq << " != last_issue "
             << cap->get_last_issue() << ")" << dendl;
    return;
  }

  CInode *cur = cap->get_inode();
  if (!cur->is_auth()) {
    // replica: forward the wanted change to the auth mds
    request_inode_file_caps(cur);
    return;
  }

  if (cap->wanted()) {
    if (cur->state_test(CInode::STATE_RECOVERING) &&
        (cap->wanted() & (CEPH_CAP_FILE_RD |
                          CEPH_CAP_FILE_WR))) {
      // client wants file data; bump this inode in the recovery queue
      mds->mdcache->recovery_queue.prioritize(cur);
    }

    if (mdcache->open_file_table.should_log_open(cur)) {
      // journal an EOpen so the open file is tracked across mds restart
      ceph_assert(cur->last == CEPH_NOSNAP);
      EOpen *le = new EOpen(mds->mdlog);
      mds->mdlog->start_entry(le);
      le->add_clean_inode(cur);
      mds->mdlog->submit_entry(le);
    }
  }
}
2984
2985 void Locker::snapflush_nudge(CInode *in)
2986 {
2987 ceph_assert(in->last != CEPH_NOSNAP);
2988 if (in->client_snap_caps.empty())
2989 return;
2990
2991 CInode *head = mdcache->get_inode(in->ino());
2992 // head inode gets unpinned when snapflush starts. It might get trimmed
2993 // before snapflush finishes.
2994 if (!head)
2995 return;
2996
2997 ceph_assert(head->is_auth());
2998 if (head->client_need_snapflush.empty())
2999 return;
3000
3001 SimpleLock *hlock = head->get_lock(CEPH_LOCK_IFILE);
3002 if (hlock->get_state() == LOCK_SYNC || !hlock->is_stable()) {
3003 hlock = NULL;
3004 for (int i = 0; i < num_cinode_locks; i++) {
3005 SimpleLock *lock = head->get_lock(cinode_lock_info[i].lock);
3006 if (lock->get_state() != LOCK_SYNC && lock->is_stable()) {
3007 hlock = lock;
3008 break;
3009 }
3010 }
3011 }
3012 if (hlock) {
3013 _rdlock_kick(hlock, true);
3014 } else {
3015 // also, requeue, in case of unstable lock
3016 need_snapflush_inodes.push_back(&in->item_caps);
3017 }
3018 }
3019
3020 void Locker::mark_need_snapflush_inode(CInode *in)
3021 {
3022 ceph_assert(in->last != CEPH_NOSNAP);
3023 if (!in->item_caps.is_on_list()) {
3024 need_snapflush_inodes.push_back(&in->item_caps);
3025 utime_t now = ceph_clock_now();
3026 in->last_dirstat_prop = now;
3027 dout(10) << "mark_need_snapflush_inode " << *in << " - added at " << now << dendl;
3028 }
3029 }
3030
3031 bool Locker::is_revoking_any_caps_from(client_t client)
3032 {
3033 auto it = revoking_caps_by_client.find(client);
3034 if (it == revoking_caps_by_client.end())
3035 return false;
3036 return !it->second.empty();
3037 }
3038
/**
 * Perform zero-dirty ("null") snapflush updates on behalf of @client for
 * every snapid < @last still recorded in head_in->client_need_snapflush.
 * Used when we can infer the client will never send the real FLUSHSNAP
 * (e.g. it no longer holds any file write caps).
 */
void Locker::_do_null_snapflush(CInode *head_in, client_t client, snapid_t last)
{
  dout(10) << "_do_null_snapflush client." << client << " on " << *head_in << dendl;
  for (auto p = head_in->client_need_snapflush.begin();
       p != head_in->client_need_snapflush.end() && p->first < last; ) {
    snapid_t snapid = p->first;
    auto &clients = p->second;
    // advance before any mutation: remove_need_snapflush() below may erase
    // the element p currently points at
    ++p;

    if (clients.count(client)) {
      dout(10) << " doing async NULL snapflush on " << snapid << " from client." << client << dendl;
      // resolve the snapped inode that covers this snapid
      CInode *sin = mdcache->pick_inode_snap(head_in, snapid - 1);
      ceph_assert(sin);
      ceph_assert(sin->first <= snapid);
      // dirty=0 and null message/ack: journal an empty snap update
      _do_snap_update(sin, snapid, 0, sin->first - 1, client, ref_t<MClientCaps>(), ref_t<MClientCaps>());
      head_in->remove_need_snapflush(sin, snapid, client);
    }
  }
}
3058
3059
3060 bool Locker::should_defer_client_cap_frozen(CInode *in)
3061 {
3062 if (in->is_frozen())
3063 return true;
3064
3065 /*
3066 * This policy needs to be AT LEAST as permissive as allowing a client
3067 * request to go forward, or else a client request can release something,
3068 * the release gets deferred, but the request gets processed and deadlocks
3069 * because when the caps can't get revoked.
3070 *
3071 * No auth_pin implies that there is no unstable lock and @in is not auth
3072 * pinnned by client request. If parent dirfrag is auth pinned by a lock
3073 * cache, later request from lock cache owner may forcibly auth pin the @in.
3074 */
3075 if (in->is_freezing() && in->get_num_auth_pins() == 0) {
3076 CDir* dir = in->get_parent_dir();
3077 if (!dir || !dir->is_auth_pinned_by_lock_cache())
3078 return true;
3079 }
3080 return false;
3081 }
3082
/**
 * Entry point for MClientCaps messages (cap update / flush / flushsnap).
 *
 * Phases, in order:
 *  1. MDS-state gating: outside clientreplay/active/stopping, record
 *     reconnect-time dirty caps and wait for replay.
 *  2. Duplicate-flush detection via the session's completed_flushes set;
 *     duplicates are acked immediately (UPDATEs still fall through, since
 *     the message may also release caps).
 *  3. Trim the session's completed-flush records using oldest_flush_tid,
 *     warning about clients that never advance it.
 *  4. Locate the head inode, honor the OSD epoch barrier, find the cap,
 *     and drop stale (frozen / old mseq) messages.
 *  5. FLUSHSNAP handling, including implied null snapflushes for earlier
 *     snaps.
 *  6. Regular cap update: confirm receipt, infer missed snapflushes,
 *     adjust wanted, journal dirty metadata via _do_cap_update(), and
 *     flush the mdlog when a client is waiting synchronously.
 */
void Locker::handle_client_caps(const cref_t<MClientCaps> &m)
{
  client_t client = m->get_source().num();
  snapid_t follows = m->get_snap_follows();
  auto op = m->get_op();
  auto dirty = m->get_dirty();
  dout(7) << "handle_client_caps "
          << " on " << m->get_ino()
          << " tid " << m->get_client_tid() << " follows " << follows
          << " op " << ceph_cap_op_name(op)
          << " flags 0x" << std::hex << m->flags << std::dec << dendl;

  // -- phase 1: gate on MDS state --
  Session *session = mds->get_session(m);
  if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) {
    if (!session) {
      dout(5) << " no session, dropping " << *m << dendl;
      return;
    }
    if (session->is_closed() ||
        session->is_closing() ||
        session->is_killing()) {
      dout(7) << " session closed|closing|killing, dropping " << *m << dendl;
      return;
    }
    // during reconnect, remember which caps are dirty so rejoin can
    // reconstruct flush state; skip tids we already completed
    if ((mds->is_reconnect() || mds->get_want_state() == MDSMap::STATE_RECONNECT) &&
        dirty && m->get_client_tid() > 0 &&
        !session->have_completed_flush(m->get_client_tid())) {
      mdcache->set_reconnected_dirty_caps(client, m->get_ino(), dirty,
                                          op == CEPH_CAP_OP_FLUSHSNAP);
    }
    mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
    return;
  }

  // -- phase 2: duplicate flush? ack from the completed-flushes record --
  if (m->get_client_tid() > 0 && session &&
      session->have_completed_flush(m->get_client_tid())) {
    dout(7) << "handle_client_caps already flushed tid " << m->get_client_tid()
            << " for client." << client << dendl;
    ref_t<MClientCaps> ack;
    if (op == CEPH_CAP_OP_FLUSHSNAP) {
      ack = make_message<MClientCaps>(CEPH_CAP_OP_FLUSHSNAP_ACK, m->get_ino(), 0, 0, 0, 0, 0, dirty, 0, mds->get_osd_epoch_barrier());
    } else {
      ack = make_message<MClientCaps>(CEPH_CAP_OP_FLUSH_ACK, m->get_ino(), 0, m->get_cap_id(), m->get_seq(), m->get_caps(), 0, dirty, 0, mds->get_osd_epoch_barrier());
    }
    ack->set_snap_follows(follows);
    ack->set_client_tid(m->get_client_tid());
    mds->send_message_client_counted(ack, m->get_connection());
    if (op == CEPH_CAP_OP_FLUSHSNAP) {
      return;
    } else {
      // fall-thru because the message may release some caps
      dirty = false;
      op = CEPH_CAP_OP_UPDATE;
    }
  }

  // -- phase 3: trim completed flushes --
  // "oldest flush tid" > 0 means client uses unique TID for each flush
  if (m->get_oldest_flush_tid() > 0 && session) {
    if (session->trim_completed_flushes(m->get_oldest_flush_tid())) {
      mds->mdlog->get_current_segment()->touched_sessions.insert(session->info.inst.name);

      if (session->get_num_trim_flushes_warnings() > 0 &&
          session->get_num_completed_flushes() * 2 < g_conf()->mds_max_completed_flushes)
        session->reset_num_trim_flushes_warnings();
    } else {
      // client is not advancing oldest_flush_tid; warn with backoff
      if (session->get_num_completed_flushes() >=
          (g_conf()->mds_max_completed_flushes << session->get_num_trim_flushes_warnings())) {
        session->inc_num_trim_flushes_warnings();
        stringstream ss;
        ss << "client." << session->get_client() << " does not advance its oldest_flush_tid ("
           << m->get_oldest_flush_tid() << "), "
           << session->get_num_completed_flushes()
           << " completed flushes recorded in session";
        mds->clog->warn() << ss.str();
        dout(20) << __func__ << " " << ss.str() << dendl;
      }
    }
  }

  // -- phase 4: locate inode/cap and drop stale messages --
  CInode *head_in = mdcache->get_inode(m->get_ino());
  if (!head_in) {
    if (mds->is_clientreplay()) {
      dout(7) << "handle_client_caps on unknown ino " << m->get_ino()
              << ", will try again after replayed client requests" << dendl;
      mdcache->wait_replay_cap_reconnect(m->get_ino(), new C_MDS_RetryMessage(mds, m));
      return;
    }

    /*
     * "handle_client_caps on unknown ino xxx" is normal after migrating a subtree
     * Sequence of events that cause this are:
     *   - client sends caps message to mds.a
     *   - mds finishes subtree migration, send cap export to client
     *   - mds trim its cache
     *   - mds receives cap messages from client
     */
    dout(7) << "handle_client_caps on unknown ino " << m->get_ino() << ", dropping" << dendl;
    return;
  }

  if (m->osd_epoch_barrier && !mds->objecter->have_map(m->osd_epoch_barrier)) {
    // Pause RADOS operations until we see the required epoch
    mds->objecter->set_epoch_barrier(m->osd_epoch_barrier);
  }

  if (mds->get_osd_epoch_barrier() < m->osd_epoch_barrier) {
    // Record the barrier so that we will retransmit it to clients
    mds->set_osd_epoch_barrier(m->osd_epoch_barrier);
  }

  dout(10) << " head inode " << *head_in << dendl;

  Capability *cap = 0;
  cap = head_in->get_client_cap(client);
  if (!cap) {
    dout(7) << "handle_client_caps no cap for client." << client << " on " << *head_in << dendl;
    return;
  }
  ceph_assert(cap);

  // freezing|frozen?
  if (should_defer_client_cap_frozen(head_in)) {
    dout(7) << "handle_client_caps freezing|frozen on " << *head_in << dendl;
    head_in->add_waiter(CInode::WAIT_UNFREEZE, new C_MDS_RetryMessage(mds, m));
    return;
  }
  // migration seq older than what we issued: message predates a cap export
  if (ceph_seq_cmp(m->get_mseq(), cap->get_mseq()) < 0) {
    dout(7) << "handle_client_caps mseq " << m->get_mseq() << " < " << cap->get_mseq()
            << ", dropping" << dendl;
    return;
  }

  bool need_unpin = false;

  // -- phase 5: flushsnap? --
  if (op == CEPH_CAP_OP_FLUSHSNAP) {
    if (!head_in->is_auth()) {
      dout(7) << " not auth, ignoring flushsnap on " << *head_in << dendl;
      goto out;
    }

    SnapRealm *realm = head_in->find_snaprealm();
    snapid_t snap = realm->get_snap_following(follows);
    dout(10) << " flushsnap follows " << follows << " -> snap " << snap << dendl;

    // earlier snaps this client will never flush explicitly: null-flush them
    auto p = head_in->client_need_snapflush.begin();
    if (p != head_in->client_need_snapflush.end() && p->first < snap) {
      head_in->auth_pin(this); // prevent subtree frozen
      need_unpin = true;
      _do_null_snapflush(head_in, client, snap);
    }

    CInode *in = head_in;
    if (snap != CEPH_NOSNAP) {
      in = mdcache->pick_inode_snap(head_in, snap - 1);
      if (in != head_in)
        dout(10) << " snapped inode " << *in << dendl;
    }

    // we can prepare the ack now, since this FLUSHEDSNAP is independent of any
    // other cap ops.  (except possibly duplicate FLUSHSNAP requests, but worst
    // case we get a dup response, so whatever.)
    ref_t<MClientCaps> ack;
    if (dirty) {
      ack = make_message<MClientCaps>(CEPH_CAP_OP_FLUSHSNAP_ACK, in->ino(), 0, 0, 0, 0, 0, dirty, 0, mds->get_osd_epoch_barrier());
      ack->set_snap_follows(follows);
      ack->set_client_tid(m->get_client_tid());
      ack->set_oldest_flush_tid(m->get_oldest_flush_tid());
    }

    if (in == head_in ||
        (head_in->client_need_snapflush.count(snap) &&
         head_in->client_need_snapflush[snap].count(client))) {
      dout(7) << " flushsnap snap " << snap
              << " client." << client << " on " << *in << dendl;

      // this cap now follows a later snap (i.e. the one initiating this flush, or later)
      if (in == head_in)
        cap->client_follows = snap < CEPH_NOSNAP ? snap : realm->get_newest_seq();

      _do_snap_update(in, snap, dirty, follows, client, m, ack);

      if (in != head_in)
        head_in->remove_need_snapflush(in, snap, client);
    } else {
      dout(7) << " not expecting flushsnap " << snap << " from client." << client << " on " << *in << dendl;
      if (ack)
        mds->send_message_client_counted(ack, m->get_connection());
    }
    goto out;
  }

  // -- phase 6: regular cap update --
  if (cap->get_cap_id() != m->get_cap_id()) {
    dout(7) << " ignoring client capid " << m->get_cap_id() << " != my " << cap->get_cap_id() << dendl;
  } else {
    CInode *in = head_in;
    if (follows > 0) {
      in = mdcache->pick_inode_snap(head_in, follows);
      // intermediate snap inodes
      while (in != head_in) {
        ceph_assert(in->last != CEPH_NOSNAP);
        if (in->is_auth() && dirty) {
          dout(10) << " updating intermediate snapped inode " << *in << dendl;
          _do_cap_update(in, NULL, dirty, follows, m, ref_t<MClientCaps>());
        }
        in = mdcache->pick_inode_snap(head_in, in->last);
      }
    }

    // head inode, and cap
    ref_t<MClientCaps> ack;

    int caps = m->get_caps();
    if (caps & ~cap->issued()) {
      dout(10) << " confirming not issued caps " << ccap_string(caps & ~cap->issued()) << dendl;
      caps &= cap->issued();
    }

    int revoked = cap->confirm_receipt(m->get_seq(), caps);
    dout(10) << " follows " << follows
             << " retains " << ccap_string(m->get_caps())
             << " dirty " << ccap_string(dirty)
             << " on " << *in << dendl;

    if (revoked & CEPH_CAP_ANY_DIR_OPS)
      eval_lock_caches(cap);

    // missing/skipped snapflush?
    //  The client MAY send a snapflush if it is issued WR/EXCL caps, but
    //  presently only does so when it has actual dirty metadata.  But, we
    //  set up the need_snapflush stuff based on the issued caps.
    //  We can infer that the client WONT send a FLUSHSNAP once they have
    //  released all WR/EXCL caps (the FLUSHSNAP always comes before the cap
    //  update/release).
    if (!head_in->client_need_snapflush.empty()) {
      if (!(cap->issued() & CEPH_CAP_ANY_FILE_WR) &&
          !(m->flags & MClientCaps::FLAG_PENDING_CAPSNAP)) {
        head_in->auth_pin(this); // prevent subtree frozen
        need_unpin = true;
        _do_null_snapflush(head_in, client);
      } else {
        dout(10) << " revocation in progress, not making any conclusions about null snapflushes" << dendl;
      }
    }
    if (cap->need_snapflush() && !(m->flags & MClientCaps::FLAG_PENDING_CAPSNAP))
      cap->clear_needsnapflush();

    if (dirty && in->is_auth()) {
      dout(7) << " flush client." << client << " dirty " << ccap_string(dirty)
              << " seq " << m->get_seq() << " on " << *in << dendl;
      ack = make_message<MClientCaps>(CEPH_CAP_OP_FLUSH_ACK, in->ino(), 0, cap->get_cap_id(), m->get_seq(),
          m->get_caps(), 0, dirty, 0, mds->get_osd_epoch_barrier());
      ack->set_client_tid(m->get_client_tid());
      ack->set_oldest_flush_tid(m->get_oldest_flush_tid());
    }

    // filter wanted based on what we could ever give out (given auth/replica status)
    bool need_flush = m->flags & MClientCaps::FLAG_SYNC;
    int new_wanted = m->get_wanted();
    if (new_wanted != cap->wanted()) {
      if (!need_flush && in->is_auth() && (new_wanted & ~cap->pending())) {
        // exapnding caps.  make sure we aren't waiting for a log flush
        need_flush = _need_flush_mdlog(head_in, new_wanted & ~cap->pending());
      }

      adjust_cap_wanted(cap, new_wanted, m->get_issue_seq());
    }

    if (in->is_auth() &&
        _do_cap_update(in, cap, dirty, follows, m, ack, &need_flush)) {
      // updated
      eval(in, CEPH_CAP_LOCKS);

      if (!need_flush && (cap->wanted() & ~cap->pending()))
        need_flush = _need_flush_mdlog(in, cap->wanted() & ~cap->pending());
    } else {
      // no update, ack now.
      if (ack)
        mds->send_message_client_counted(ack, m->get_connection());

      bool did_issue = eval(in, CEPH_CAP_LOCKS);
      if (!did_issue && (cap->wanted() & ~cap->pending()))
        issue_caps(in, cap);

      // new cap (seq 0) with buffered/write caps: share max_size right away
      if (cap->get_last_seq() == 0 &&
          (cap->pending() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER))) {
        share_inode_max_size(in, cap);
      }
    }

    if (need_flush)
      mds->mdlog->flush();
  }

 out:
  if (need_unpin)
    head_in->auth_unpin(this);
}
3381
3382
3383 class C_Locker_RetryRequestCapRelease : public LockerContext {
3384 client_t client;
3385 ceph_mds_request_release item;
3386 public:
3387 C_Locker_RetryRequestCapRelease(Locker *l, client_t c, const ceph_mds_request_release& it) :
3388 LockerContext(l), client(c), item(it) { }
3389 void finish(int r) override {
3390 string dname;
3391 MDRequestRef null_ref;
3392 locker->process_request_cap_release(null_ref, client, item, dname);
3393 }
3394 };
3395
/**
 * Process one cap release embedded in a client request (or deferred via
 * C_Locker_RetryRequestCapRelease, in which case @mdr is null).
 *
 * Drops the dentry lease named by @dname (if any), validates mseq/cap_id
 * against the current Capability, confirms receipt of the released caps,
 * runs any implied null snapflushes, adjusts wanted, and re-evaluates
 * lock state.  If @mdr is set, the cap's seq is recorded in
 * mdr->cap_releases so caps can be re-issued when the request finishes
 * (see kick_cap_releases).
 */
void Locker::process_request_cap_release(MDRequestRef& mdr, client_t client, const ceph_mds_request_release& item,
					 std::string_view dname)
{
  inodeno_t ino = (uint64_t)item.ino;
  uint64_t cap_id = item.cap_id;
  int caps = item.caps;
  int wanted = item.wanted;
  int seq = item.seq;
  int issue_seq = item.issue_seq;
  int mseq = item.mseq;

  CInode *in = mdcache->get_inode(ino);
  if (!in)
    return;

  // a non-empty dname means the client is also dropping a dentry lease
  if (dname.length()) {
    frag_t fg = in->pick_dirfrag(dname);
    CDir *dir = in->get_dirfrag(fg);
    if (dir) {
      CDentry *dn = dir->lookup(dname);
      if (dn) {
        ClientLease *l = dn->get_client_lease(client);
        if (l) {
          dout(10) << __func__ << " removing lease on " << *dn << dendl;
          dn->remove_client_lease(l, this);
        } else {
          dout(7) << __func__ << " client." << client
                  << " doesn't have lease on " << *dn << dendl;
        }
      } else {
        dout(7) << __func__ << " client." << client << " released lease on dn "
                << dir->dirfrag() << "/" << dname << " which dne" << dendl;
      }
    }
  }

  Capability *cap = in->get_client_cap(client);
  if (!cap)
    return;

  dout(10) << __func__ << " client." << client << " " << ccap_string(caps) << " on " << *in
           << (mdr ? "" : " (DEFERRED, no mdr)")
           << dendl;

  // stale migration seq (release predates a cap export): drop
  if (ceph_seq_cmp(mseq, cap->get_mseq()) < 0) {
    dout(7) << " mseq " << mseq << " < " << cap->get_mseq() << ", dropping" << dendl;
    return;
  }

  // release refers to an older cap instance: drop
  if (cap->get_cap_id() != cap_id) {
    dout(7) << " cap_id " << cap_id << " != " << cap->get_cap_id() << ", dropping" << dendl;
    return;
  }

  if (should_defer_client_cap_frozen(in)) {
    dout(7) << " frozen, deferring" << dendl;
    in->add_waiter(CInode::WAIT_UNFREEZE, new C_Locker_RetryRequestCapRelease(this, client, item));
    return;
  }

  if (caps & ~cap->issued()) {
    dout(10) << " confirming not issued caps " << ccap_string(caps & ~cap->issued()) << dendl;
    caps &= cap->issued();
  }
  int revoked = cap->confirm_receipt(seq, caps);
  if (revoked & CEPH_CAP_ANY_DIR_OPS)
    eval_lock_caches(cap);

  // if all file write caps are gone, pending snapflushes will never arrive
  if (!in->client_need_snapflush.empty() &&
      (cap->issued() & CEPH_CAP_ANY_FILE_WR) == 0) {
    _do_null_snapflush(in, client);
  }

  adjust_cap_wanted(cap, wanted, issue_seq);

  // suppress cap re-issue during eval while processing inside a request
  if (mdr)
    cap->inc_suppress();
  eval(in, CEPH_CAP_LOCKS);
  if (mdr)
    cap->dec_suppress();

  // take note; we may need to reissue on this cap later
  if (mdr)
    mdr->cap_releases[in->vino()] = cap->get_last_seq();
}
3481
3482 class C_Locker_RetryKickIssueCaps : public LockerContext {
3483 CInode *in;
3484 client_t client;
3485 ceph_seq_t seq;
3486 public:
3487 C_Locker_RetryKickIssueCaps(Locker *l, CInode *i, client_t c, ceph_seq_t s) :
3488 LockerContext(l), in(i), client(c), seq(s) {
3489 in->get(CInode::PIN_PTRWAITER);
3490 }
3491 void finish(int r) override {
3492 locker->kick_issue_caps(in, client, seq);
3493 in->put(CInode::PIN_PTRWAITER);
3494 }
3495 };
3496
3497 void Locker::kick_issue_caps(CInode *in, client_t client, ceph_seq_t seq)
3498 {
3499 Capability *cap = in->get_client_cap(client);
3500 if (!cap || cap->get_last_seq() != seq)
3501 return;
3502 if (in->is_frozen()) {
3503 dout(10) << "kick_issue_caps waiting for unfreeze on " << *in << dendl;
3504 in->add_waiter(CInode::WAIT_UNFREEZE,
3505 new C_Locker_RetryKickIssueCaps(this, in, client, seq));
3506 return;
3507 }
3508 dout(10) << "kick_issue_caps released at current seq " << seq
3509 << ", reissuing" << dendl;
3510 issue_caps(in, cap);
3511 }
3512
3513 void Locker::kick_cap_releases(MDRequestRef& mdr)
3514 {
3515 client_t client = mdr->get_client();
3516 for (map<vinodeno_t,ceph_seq_t>::iterator p = mdr->cap_releases.begin();
3517 p != mdr->cap_releases.end();
3518 ++p) {
3519 CInode *in = mdcache->get_inode(p->first);
3520 if (!in)
3521 continue;
3522 kick_issue_caps(in, client, p->second);
3523 }
3524 }
3525
/**
 * Journal a snapflush for snapped inode @in at @snap: apply the client's
 * dirty metadata (fields + xattrs) to the appropriate (old) inode
 * version, trim/retarget the client_range, and submit an EUpdate whose
 * completion sends @ack.
 *
 * m and ack might be NULL, so don't dereference them unless dirty != 0
 * (a null snapflush passes dirty == 0 with null message/ack).
 */
void Locker::_do_snap_update(CInode *in, snapid_t snap, int dirty, snapid_t follows, client_t client, const cref_t<MClientCaps> &m, const ref_t<MClientCaps> &ack)
{
  dout(10) << "_do_snap_update dirty " << ccap_string(dirty)
           << " follows " << follows << " snap " << snap
           << " on " << *in << dendl;

  if (snap == CEPH_NOSNAP) {
    // hmm, i guess snap was already deleted? just ack!
    dout(10) << " wow, the snap following " << follows
             << " was already deleted. nothing to record, just ack." << dendl;
    if (ack)
      mds->send_message_client_counted(ack, m->get_connection());
    return;
  }

  EUpdate *le = new EUpdate(mds->mdlog, "snap flush");
  mds->mdlog->start_entry(le);
  MutationRef mut = new MutationImpl();
  mut->ls = mds->mdlog->get_current_segment();

  // normal metadata updates that we can apply to the head as well.

  // update xattrs?
  CInode::mempool_xattr_map *px = nullptr;
  bool xattrs = (dirty & CEPH_CAP_XATTR_EXCL) &&
                m->xattrbl.length() &&
                m->head.xattr_version > in->get_projected_inode()->xattr_version;

  // multiversion inodes keep per-snap "old inode" records; write there
  CInode::mempool_old_inode *oi = 0;
  if (in->is_multiversion()) {
    oi = in->pick_old_inode(snap);
  }

  CInode::mempool_inode *i;
  if (oi) {
    dout(10) << " writing into old inode" << dendl;
    auto &pi = in->project_inode();
    pi.inode.version = in->pre_dirty();
    if (snap > oi->first)
      in->split_old_inode(snap);
    i = &oi->inode;
    if (xattrs)
      px = &oi->xattrs;
  } else {
    auto &pi = in->project_inode(xattrs);
    pi.inode.version = in->pre_dirty();
    i = &pi.inode;
    if (xattrs)
      px = pi.xattrs.get();
  }

  _update_cap_fields(in, dirty, m, i);

  // xattr
  if (xattrs) {
    dout(7) << " xattrs v" << i->xattr_version << " -> " << m->head.xattr_version
            << " len " << m->xattrbl.length() << dendl;
    i->xattr_version = m->head.xattr_version;
    auto p = m->xattrbl.cbegin();
    decode(*px, p);
  }

  {
    // this snap is flushed: either drop the client's writeable range
    // entirely (last snap of this inode) or advance its follows
    auto it = i->client_ranges.find(client);
    if (it != i->client_ranges.end()) {
      if (in->last == snap) {
        dout(10) << " removing client_range entirely" << dendl;
        i->client_ranges.erase(it);
      } else {
        dout(10) << " client_range now follows " << snap << dendl;
        it->second.follows = snap;
      }
    }
  }

  mut->auth_pin(in);
  mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows);
  mdcache->journal_dirty_inode(mut.get(), &le->metablob, in, follows);

  // "oldest flush tid" > 0 means client uses unique TID for each flush
  if (ack && ack->get_oldest_flush_tid() > 0)
    le->metablob.add_client_flush(metareqid_t(m->get_source(), ack->get_client_tid()),
                                  ack->get_oldest_flush_tid());

  mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut, UPDATE_SNAPFLUSH,
                                                              ack, client));
}
3616
3617 void Locker::_update_cap_fields(CInode *in, int dirty, const cref_t<MClientCaps> &m, CInode::mempool_inode *pi)
3618 {
3619 if (dirty == 0)
3620 return;
3621
3622 /* m must be valid if there are dirty caps */
3623 ceph_assert(m);
3624 uint64_t features = m->get_connection()->get_features();
3625
3626 if (m->get_ctime() > pi->ctime) {
3627 dout(7) << " ctime " << pi->ctime << " -> " << m->get_ctime()
3628 << " for " << *in << dendl;
3629 pi->ctime = m->get_ctime();
3630 if (m->get_ctime() > pi->rstat.rctime)
3631 pi->rstat.rctime = m->get_ctime();
3632 }
3633
3634 if ((features & CEPH_FEATURE_FS_CHANGE_ATTR) &&
3635 m->get_change_attr() > pi->change_attr) {
3636 dout(7) << " change_attr " << pi->change_attr << " -> " << m->get_change_attr()
3637 << " for " << *in << dendl;
3638 pi->change_attr = m->get_change_attr();
3639 }
3640
3641 // file
3642 if (dirty & (CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)) {
3643 utime_t atime = m->get_atime();
3644 utime_t mtime = m->get_mtime();
3645 uint64_t size = m->get_size();
3646 version_t inline_version = m->inline_version;
3647
3648 if (((dirty & CEPH_CAP_FILE_WR) && mtime > pi->mtime) ||
3649 ((dirty & CEPH_CAP_FILE_EXCL) && mtime != pi->mtime)) {
3650 dout(7) << " mtime " << pi->mtime << " -> " << mtime
3651 << " for " << *in << dendl;
3652 pi->mtime = mtime;
3653 if (mtime > pi->rstat.rctime)
3654 pi->rstat.rctime = mtime;
3655 }
3656 if (in->inode.is_file() && // ONLY if regular file
3657 size > pi->size) {
3658 dout(7) << " size " << pi->size << " -> " << size
3659 << " for " << *in << dendl;
3660 pi->size = size;
3661 pi->rstat.rbytes = size;
3662 }
3663 if (in->inode.is_file() &&
3664 (dirty & CEPH_CAP_FILE_WR) &&
3665 inline_version > pi->inline_data.version) {
3666 pi->inline_data.version = inline_version;
3667 if (inline_version != CEPH_INLINE_NONE && m->inline_data.length() > 0)
3668 pi->inline_data.get_data() = m->inline_data;
3669 else
3670 pi->inline_data.free_data();
3671 }
3672 if ((dirty & CEPH_CAP_FILE_EXCL) && atime != pi->atime) {
3673 dout(7) << " atime " << pi->atime << " -> " << atime
3674 << " for " << *in << dendl;
3675 pi->atime = atime;
3676 }
3677 if ((dirty & CEPH_CAP_FILE_EXCL) &&
3678 ceph_seq_cmp(pi->time_warp_seq, m->get_time_warp_seq()) < 0) {
3679 dout(7) << " time_warp_seq " << pi->time_warp_seq << " -> " << m->get_time_warp_seq()
3680 << " for " << *in << dendl;
3681 pi->time_warp_seq = m->get_time_warp_seq();
3682 }
3683 }
3684 // auth
3685 if (dirty & CEPH_CAP_AUTH_EXCL) {
3686 if (m->head.uid != pi->uid) {
3687 dout(7) << " uid " << pi->uid
3688 << " -> " << m->head.uid
3689 << " for " << *in << dendl;
3690 pi->uid = m->head.uid;
3691 }
3692 if (m->head.gid != pi->gid) {
3693 dout(7) << " gid " << pi->gid
3694 << " -> " << m->head.gid
3695 << " for " << *in << dendl;
3696 pi->gid = m->head.gid;
3697 }
3698 if (m->head.mode != pi->mode) {
3699 dout(7) << " mode " << oct << pi->mode
3700 << " -> " << m->head.mode << dec
3701 << " for " << *in << dendl;
3702 pi->mode = m->head.mode;
3703 }
3704 if ((features & CEPH_FEATURE_FS_BTIME) && m->get_btime() != pi->btime) {
3705 dout(7) << " btime " << oct << pi->btime
3706 << " -> " << m->get_btime() << dec
3707 << " for " << *in << dendl;
3708 pi->btime = m->get_btime();
3709 }
3710 }
3711 }
3712
/*
 * update inode based on cap flush|flushsnap|wanted.
 *  adjust max_size, if needed.
 * if we update, return true; otherwise, false (no updated needed).
 *
 * @param in      auth inode (head or snapped) being updated
 * @param cap     client's cap on the head inode, or NULL for an
 *                intermediate snapped inode
 * @param dirty   cap mask of metadata the client flushed
 * @param follows snapid this flush follows
 * @param m       incoming cap message (always valid here)
 * @param ack     pre-built ack to send from the journal completion, or null
 * @param need_flush set to true if the caller should flush the mdlog
 */
bool Locker::_do_cap_update(CInode *in, Capability *cap,
			    int dirty, snapid_t follows,
			    const cref_t<MClientCaps> &m, const ref_t<MClientCaps> &ack,
			    bool *need_flush)
{
  dout(10) << "_do_cap_update dirty " << ccap_string(dirty)
	   << " issued " << ccap_string(cap ? cap->issued() : 0)
	   << " wanted " << ccap_string(cap ? cap->wanted() : 0)
	   << " on " << *in << dendl;
  ceph_assert(in->is_auth());
  client_t client = m->get_source().num();
  CInode::mempool_inode *latest = in->get_projected_inode();

  // increase or zero max_size?
  uint64_t size = m->get_size();
  bool change_max = false;
  uint64_t old_max = latest->get_client_range(client);
  uint64_t new_max = old_max;

  if (in->is_file()) {
    bool forced_change_max = false;
    dout(20) << "inode is file" << dendl;
    if (cap && ((cap->issued() | cap->wanted()) & CEPH_CAP_ANY_FILE_WR)) {
      dout(20) << "client has write caps; m->get_max_size="
               << m->get_max_size() << "; old_max=" << old_max << dendl;
      if (m->get_max_size() > new_max) {
        // client explicitly asked for a larger max_size
        dout(10) << "client requests file_max " << m->get_max_size()
                 << " > max " << old_max << dendl;
        change_max = true;
        forced_change_max = true;
        new_max = calc_new_max_size(latest, m->get_max_size());
      } else {
        // otherwise grow max_size ahead of the current size as needed
        new_max = calc_new_max_size(latest, size);

        if (new_max > old_max)
          change_max = true;
        else
          new_max = old_max;
      }
    } else {
      // no write caps issued or wanted: retract the writeable range
      if (old_max) {
        change_max = true;
        new_max = 0;
      }
    }

    // changing max_size requires a wrlock on filelock; if we can't get
    // one, try to nudge the lock state, else defer via a waiter
    if (in->last == CEPH_NOSNAP &&
        change_max &&
        !in->filelock.can_wrlock(client) &&
        !in->filelock.can_force_wrlock(client)) {
      dout(10) << " i want to change file_max, but lock won't allow it (yet)" << dendl;
      if (in->filelock.is_stable()) {
        bool need_issue = false;
        if (cap)
          cap->inc_suppress();
        if (in->get_mds_caps_wanted().empty() &&
            (in->get_loner() >= 0 || (in->get_wanted_loner() >= 0 && in->try_set_loner()))) {
          if (in->filelock.get_state() != LOCK_EXCL)
            file_excl(&in->filelock, &need_issue);
        } else
          simple_lock(&in->filelock, &need_issue);
        if (need_issue)
          issue_caps(in);
        if (cap)
          cap->dec_suppress();
      }
      if (!in->filelock.can_wrlock(client) &&
          !in->filelock.can_force_wrlock(client)) {
        // still blocked: re-check max_size once the lock stabilizes
        C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in,
                                                         forced_change_max ? new_max : 0,
                                                         0, utime_t());

        in->filelock.add_waiter(SimpleLock::WAIT_STABLE, cms);
        change_max = false;
      }
    }
  }

  // import client-reported fcntl/flock lock state
  if (m->flockbl.length()) {
    int32_t num_locks;
    auto bli = m->flockbl.cbegin();
    decode(num_locks, bli);
    for ( int i=0; i < num_locks; ++i) {
      ceph_filelock decoded_lock;
      decode(decoded_lock, bli);
      in->get_fcntl_lock_state()->held_locks.
        insert(pair<uint64_t, ceph_filelock>(decoded_lock.start, decoded_lock));
      ++in->get_fcntl_lock_state()->client_held_lock_counts[(client_t)(decoded_lock.client)];
    }
    decode(num_locks, bli);
    for ( int i=0; i < num_locks; ++i) {
      ceph_filelock decoded_lock;
      decode(decoded_lock, bli);
      in->get_flock_lock_state()->held_locks.
        insert(pair<uint64_t, ceph_filelock>(decoded_lock.start, decoded_lock));
      ++in->get_flock_lock_state()->client_held_lock_counts[(client_t)(decoded_lock.client)];
    }
  }

  if (!dirty && !change_max)
    return false;

  Session *session = mds->get_session(m);
  if (session->check_access(in, MAY_WRITE,
			    m->caller_uid, m->caller_gid, NULL, 0, 0) < 0) {
    dout(10) << "check_access failed, dropping cap update on " << *in << dendl;
    return false;
  }

  // do the update.
  EUpdate *le = new EUpdate(mds->mdlog, "cap update");
  mds->mdlog->start_entry(le);

  bool xattr = (dirty & CEPH_CAP_XATTR_EXCL) &&
               m->xattrbl.length() &&
               m->head.xattr_version > in->get_projected_inode()->xattr_version;

  auto &pi = in->project_inode(xattr);
  pi.inode.version = in->pre_dirty();

  MutationRef mut(new MutationImpl());
  mut->ls = mds->mdlog->get_current_segment();

  _update_cap_fields(in, dirty, m, &pi.inode);

  if (change_max) {
    dout(7) << " max_size " << old_max << " -> " << new_max
            << " for " << *in << dendl;
    if (new_max) {
      auto &cr = pi.inode.client_ranges[client];
      cr.range.first = 0;
      cr.range.last = new_max;
      cr.follows = in->first - 1;
      in->mark_clientwriteable();
      if (cap)
        cap->mark_clientwriteable();
    } else {
      pi.inode.client_ranges.erase(client);
      if (pi.inode.client_ranges.empty())
        in->clear_clientwriteable();
      if (cap)
        cap->clear_clientwriteable();
    }
  }

  if (change_max || (dirty & (CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)))
    wrlock_force(&in->filelock, mut);  // wrlock for duration of journal

  // auth
  if (dirty & CEPH_CAP_AUTH_EXCL)
    wrlock_force(&in->authlock, mut);

  // xattrs update?
  if (xattr) {
    dout(7) << " xattrs v" << pi.inode.xattr_version << " -> " << m->head.xattr_version << dendl;
    pi.inode.xattr_version = m->head.xattr_version;
    auto p = m->xattrbl.cbegin();
    decode_noshare(*pi.xattrs, p);
    wrlock_force(&in->xattrlock, mut);
  }

  mut->auth_pin(in);
  mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows);
  mdcache->journal_dirty_inode(mut.get(), &le->metablob, in, follows);

  // "oldest flush tid" > 0 means client uses unique TID for each flush
  if (ack && ack->get_oldest_flush_tid() > 0)
    le->metablob.add_client_flush(metareqid_t(m->get_source(), ack->get_client_tid()),
				  ack->get_oldest_flush_tid());

  unsigned update_flags = 0;
  if (change_max)
    update_flags |= UPDATE_SHAREMAX;
  if (cap)
    update_flags |= UPDATE_NEEDSISSUE;
  mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut, update_flags,
							      ack, client));
  if (need_flush && !*need_flush &&
      ((change_max && new_max) || // max INCREASE
       _need_flush_mdlog(in, dirty)))
    *need_flush = true;

  return true;
}
3902
/*
 * Handle an MClientCapRelease message: the client is telling us it has
 * dropped one or more capabilities.  Deferred until we reach
 * clientreplay/active/stopping; also propagates the client's OSD epoch
 * barrier before processing each release via _do_cap_release().
 */
void Locker::handle_client_cap_release(const cref_t<MClientCapRelease> &m)
{
  client_t client = m->get_source().num();
  dout(10) << "handle_client_cap_release " << *m << dendl;

  // too early to process caps; requeue until clientreplay/active/stopping
  if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) {
    mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
    return;
  }

  if (m->osd_epoch_barrier && !mds->objecter->have_map(m->osd_epoch_barrier)) {
    // Pause RADOS operations until we see the required epoch
    mds->objecter->set_epoch_barrier(m->osd_epoch_barrier);
  }

  if (mds->get_osd_epoch_barrier() < m->osd_epoch_barrier) {
    // Record the barrier so that we will retransmit it to clients
    mds->set_osd_epoch_barrier(m->osd_epoch_barrier);
  }

  Session *session = mds->get_session(m);

  // apply each released cap individually
  for (const auto &cap : m->caps) {
    _do_cap_release(client, inodeno_t((uint64_t)cap.ino) , cap.cap_id, cap.migrate_seq, cap.seq);
  }

  if (session) {
    // tell the session how many cap releases were processed
    session->notify_cap_release(m->caps.size());
  }
}
3933
3934 class C_Locker_RetryCapRelease : public LockerContext {
3935 client_t client;
3936 inodeno_t ino;
3937 uint64_t cap_id;
3938 ceph_seq_t migrate_seq;
3939 ceph_seq_t issue_seq;
3940 public:
3941 C_Locker_RetryCapRelease(Locker *l, client_t c, inodeno_t i, uint64_t id,
3942 ceph_seq_t mseq, ceph_seq_t seq) :
3943 LockerContext(l), client(c), ino(i), cap_id(id), migrate_seq(mseq), issue_seq(seq) {}
3944 void finish(int r) override {
3945 locker->_do_cap_release(client, ino, cap_id, migrate_seq, issue_seq);
3946 }
3947 };
3948
/*
 * Apply a single cap release from a client.
 *
 * The release is ignored if the inode or cap is already gone, if the
 * cap id no longer matches (the cap was reissued), or if the migrate
 * seq is stale.  It is deferred while the inode is freezing/frozen.
 * If the issue seq doesn't match the last issue, the client raced with
 * a newer cap issue, so we only prune old revoke history; otherwise
 * the cap is actually removed.
 */
void Locker::_do_cap_release(client_t client, inodeno_t ino, uint64_t cap_id,
			     ceph_seq_t mseq, ceph_seq_t seq)
{
  CInode *in = mdcache->get_inode(ino);
  if (!in) {
    dout(7) << "_do_cap_release missing ino " << ino << dendl;
    return;
  }
  Capability *cap = in->get_client_cap(client);
  if (!cap) {
    dout(7) << "_do_cap_release no cap for client" << client << " on "<< *in << dendl;
    return;
  }

  dout(7) << "_do_cap_release for client." << client << " on "<< *in << dendl;
  if (cap->get_cap_id() != cap_id) {
    // the cap we hold was issued under a different id; stale release
    dout(7) << " capid " << cap_id << " != " << cap->get_cap_id() << ", ignore" << dendl;
    return;
  }
  if (ceph_seq_cmp(mseq, cap->get_mseq()) < 0) {
    // stale migrate seq
    dout(7) << " mseq " << mseq << " < " << cap->get_mseq() << ", ignore" << dendl;
    return;
  }
  if (should_defer_client_cap_frozen(in)) {
    // can't modify caps while freezing/frozen; retry after unfreeze
    dout(7) << " freezing|frozen, deferring" << dendl;
    in->add_waiter(CInode::WAIT_UNFREEZE,
                  new C_Locker_RetryCapRelease(this, client, ino, cap_id, mseq, seq));
    return;
  }
  if (seq != cap->get_last_issue()) {
    dout(7) << " issue_seq " << seq << " != " << cap->get_last_issue() << dendl;
    // clean out any old revoke history
    cap->clean_revoke_from(seq);
    eval_cap_gather(in);
    return;
  }
  remove_client_cap(in, cap);
}
3987
/*
 * Tear down a client's capability on an inode.
 *
 * Flushes any pending snapflush state for the client, detaches and
 * invalidates lock caches pinned by the cap, then removes the cap.
 * If the cap was notable, additionally: on the auth, clear the
 * client's writeable byte range (or mark NEEDSRECOVER when kill is
 * set); on a replica, re-request file caps from the auth.  Finally
 * re-evaluate the inode's cap-related locks.
 *
 * @param kill when true, mark STATE_NEEDSRECOVER instead of doing a
 *             max-size check — NOTE(review): presumably set when the
 *             client session is being killed; confirm against callers
 */
void Locker::remove_client_cap(CInode *in, Capability *cap, bool kill)
{
  client_t client = cap->get_client();
  // clean out any pending snapflush state
  if (!in->client_need_snapflush.empty())
    _do_null_snapflush(in, client);

  // detach and invalidate any lock caches keyed on this cap
  while (!cap->lock_caches.empty()) {
    MDLockCache* lock_cache = cap->lock_caches.front();
    lock_cache->client_cap = nullptr;
    invalidate_lock_cache(lock_cache);
  }

  bool notable = cap->is_notable();
  in->remove_client_cap(client);
  if (!notable)
    return;  // nothing further to clean up

  if (in->is_auth()) {
    // make sure we clear out the client byte range
    if (in->get_projected_inode()->client_ranges.count(client) &&
	!(in->inode.nlink == 0 && !in->is_any_caps())) { // unless it's unlink + stray
      if (kill)
	in->state_set(CInode::STATE_NEEDSRECOVER);
      else
	check_inode_max_size(in);
    }
  } else {
    // replica: tell the auth mds we no longer hold file caps here
    request_inode_file_caps(in);
  }

  try_eval(in, CEPH_CAP_LOCKS);
}
4021
4022
4023 /**
4024 * Return true if any currently revoking caps exceed the
4025 * session_timeout threshold.
4026 */
4027 bool Locker::any_late_revoking_caps(xlist<Capability*> const &revoking,
4028 double timeout) const
4029 {
4030 xlist<Capability*>::const_iterator p = revoking.begin();
4031 if (p.end()) {
4032 // No revoking caps at the moment
4033 return false;
4034 } else {
4035 utime_t now = ceph_clock_now();
4036 utime_t age = now - (*p)->get_last_revoke_stamp();
4037 if (age <= timeout) {
4038 return false;
4039 } else {
4040 return true;
4041 }
4042 }
4043 }
4044
4045 std::set<client_t> Locker::get_late_revoking_clients(double timeout) const
4046 {
4047 std::set<client_t> result;
4048
4049 if (any_late_revoking_caps(revoking_caps, timeout)) {
4050 // Slow path: execute in O(N_clients)
4051 for (auto &p : revoking_caps_by_client) {
4052 if (any_late_revoking_caps(p.second, timeout)) {
4053 result.insert(p.first);
4054 }
4055 }
4056 } else {
4057 // Fast path: no misbehaving clients, execute in O(1)
4058 }
4059 return result;
4060 }
4061
4062 // Hard-code instead of surfacing a config settings because this is
4063 // really a hack that should go away at some point when we have better
4064 // inspection tools for getting at detailed cap state (#7316)
4065 #define MAX_WARN_CAPS 100
4066
4067 void Locker::caps_tick()
4068 {
4069 utime_t now = ceph_clock_now();
4070
4071 if (!need_snapflush_inodes.empty()) {
4072 // snap inodes that needs flush are auth pinned, they affect
4073 // subtree/difrarg freeze.
4074 utime_t cutoff = now;
4075 cutoff -= g_conf()->mds_freeze_tree_timeout / 3;
4076
4077 CInode *last = need_snapflush_inodes.back();
4078 while (!need_snapflush_inodes.empty()) {
4079 CInode *in = need_snapflush_inodes.front();
4080 if (in->last_dirstat_prop >= cutoff)
4081 break;
4082 in->item_caps.remove_myself();
4083 snapflush_nudge(in);
4084 if (in == last)
4085 break;
4086 }
4087 }
4088
4089 dout(20) << __func__ << " " << revoking_caps.size() << " revoking caps" << dendl;
4090
4091 now = ceph_clock_now();
4092 int n = 0;
4093 for (xlist<Capability*>::iterator p = revoking_caps.begin(); !p.end(); ++p) {
4094 Capability *cap = *p;
4095
4096 utime_t age = now - cap->get_last_revoke_stamp();
4097 dout(20) << __func__ << " age = " << age << " client." << cap->get_client() << "." << cap->get_inode()->ino() << dendl;
4098 if (age <= mds->mdsmap->get_session_timeout()) {
4099 dout(20) << __func__ << " age below timeout " << mds->mdsmap->get_session_timeout() << dendl;
4100 break;
4101 } else {
4102 ++n;
4103 if (n > MAX_WARN_CAPS) {
4104 dout(1) << __func__ << " more than " << MAX_WARN_CAPS << " caps are late"
4105 << "revoking, ignoring subsequent caps" << dendl;
4106 break;
4107 }
4108 }
4109 // exponential backoff of warning intervals
4110 if (age > mds->mdsmap->get_session_timeout() * (1 << cap->get_num_revoke_warnings())) {
4111 cap->inc_num_revoke_warnings();
4112 stringstream ss;
4113 ss << "client." << cap->get_client() << " isn't responding to mclientcaps(revoke), ino "
4114 << cap->get_inode()->ino() << " pending " << ccap_string(cap->pending())
4115 << " issued " << ccap_string(cap->issued()) << ", sent " << age << " seconds ago";
4116 mds->clog->warn() << ss.str();
4117 dout(20) << __func__ << " " << ss.str() << dendl;
4118 } else {
4119 dout(20) << __func__ << " silencing log message (backoff) for " << "client." << cap->get_client() << "." << cap->get_inode()->ino() << dendl;
4120 }
4121 }
4122 }
4123
4124
/*
 * Handle an MClientLease message from a client: either the
 * ack/release of a dentry lease we revoked, or a renewal request.
 */
void Locker::handle_client_lease(const cref_t<MClientLease> &m)
{
  dout(10) << "handle_client_lease " << *m << dendl;

  ceph_assert(m->get_source().is_client());
  client_t client = m->get_source().num();

  CInode *in = mdcache->get_inode(m->get_ino(), m->get_last());
  if (!in) {
    dout(7) << "handle_client_lease don't have ino " << m->get_ino() << "." << m->get_last() << dendl;
    return;
  }
  CDentry *dn = 0;

  // recompute the dirfrag from the dname (the client's frag may be stale)
  frag_t fg = in->pick_dirfrag(m->dname);
  CDir *dir = in->get_dirfrag(fg);
  if (dir)
    dn = dir->lookup(m->dname);
  if (!dn) {
    dout(7) << "handle_client_lease don't have dn " << m->get_ino() << " " << m->dname << dendl;
    return;
  }
  dout(10) << " on " << *dn << dendl;

  // replica and lock
  ClientLease *l = dn->get_client_lease(client);
  if (!l) {
    dout(7) << "handle_client_lease didn't have lease for client." << client << " of " << *dn << dendl;
    return;
  }

  switch (m->get_action()) {
  case CEPH_MDS_LEASE_REVOKE_ACK:
  case CEPH_MDS_LEASE_RELEASE:
    if (l->seq != m->get_seq()) {
      // stale ack/release: a newer lease seq has been issued since
      dout(7) << "handle_client_lease release - seq " << l->seq << " != provided " << m->get_seq() << dendl;
    } else {
      dout(7) << "handle_client_lease client." << client
	      << " on " << *dn << dendl;
      dn->remove_client_lease(l, this);
    }
    break;

  case CEPH_MDS_LEASE_RENEW:
    {
      dout(7) << "handle_client_lease client." << client << " renew on " << *dn
	      << (!dn->lock.can_lease(client)?", revoking lease":"") << dendl;
      if (dn->lock.can_lease(client)) {
	// grant the renewal: bump the seq, extend the expiry, and echo
	// the message back with the new duration
	auto reply = make_message<MClientLease>(*m);
	int pool = 1;   // fixme.. do something smart!
	reply->h.duration_ms = (int)(1000 * mdcache->client_lease_durations[pool]);
	reply->h.seq = ++l->seq;
	reply->clear_payload();

	utime_t now = ceph_clock_now();
	now += mdcache->client_lease_durations[pool];
	mdcache->touch_client_lease(l, pool, now);

	mds->send_message_client_counted(reply, m->get_connection());
      }
    }
    break;

  default:
    ceph_abort(); // implement me
    break;
  }
}
4193
4194
/*
 * Issue (or decline) a dentry lease to the requesting client and
 * encode the resulting LeaseStat into bl.
 *
 * A real lease is issued only for head (CEPH_NOSNAP) dentries when the
 * dentry lock permits it, the parent dir is not a stray dir, and the
 * client is not already covered by the dir inode's filelock lease or
 * pending FILE_SHARED/FILE_EXCL caps.  Otherwise a null lease (mask
 * without CEPH_LEASE_VALID) is encoded.
 */
void Locker::issue_client_lease(CDentry *dn, MDRequestRef &mdr, int mask,
                               utime_t now, bufferlist &bl)
{
  client_t client = mdr->get_client();
  Session *session = mdr->session;

  CInode *diri = dn->get_dir()->get_inode();
  if (mdr->snapid == CEPH_NOSNAP &&
      dn->lock.can_lease(client) &&
      !diri->is_stray() &&  // do not issue dn leases in stray dir!
      !diri->filelock.can_lease(client) &&
      !(diri->get_client_cap_pending(client) & (CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_EXCL))) {
    // issue a dentry lease
    ClientLease *l = dn->add_client_lease(client, session);
    session->touch_lease(l);

    int pool = 1;   // fixme.. do something smart!
    now += mdcache->client_lease_durations[pool];
    mdcache->touch_client_lease(l, pool, now);

    LeaseStat lstat;
    lstat.mask = CEPH_LEASE_VALID | mask;
    lstat.duration_ms = (uint32_t)(1000 * mdcache->client_lease_durations[pool]);
    lstat.seq = ++l->seq;
    encode_lease(bl, session->info, lstat);
    dout(20) << "issue_client_lease seq " << lstat.seq << " dur " << lstat.duration_ms << "ms "
	     << " on " << *dn << dendl;
  } else {
    // null lease
    LeaseStat lstat;
    lstat.mask = mask;
    encode_lease(bl, session->info, lstat);
    dout(20) << "issue_client_lease no/null lease on " << *dn << dendl;
  }
}
4230
4231
4232 void Locker::revoke_client_leases(SimpleLock *lock)
4233 {
4234 int n = 0;
4235 CDentry *dn = static_cast<CDentry*>(lock->get_parent());
4236 for (map<client_t, ClientLease*>::iterator p = dn->client_lease_map.begin();
4237 p != dn->client_lease_map.end();
4238 ++p) {
4239 ClientLease *l = p->second;
4240
4241 n++;
4242 ceph_assert(lock->get_type() == CEPH_LOCK_DN);
4243
4244 CDentry *dn = static_cast<CDentry*>(lock->get_parent());
4245 int mask = 1 | CEPH_LOCK_DN; // old and new bits
4246
4247 // i should also revoke the dir ICONTENT lease, if they have it!
4248 CInode *diri = dn->get_dir()->get_inode();
4249 auto lease = make_message<MClientLease>(CEPH_MDS_LEASE_REVOKE, l->seq, mask, diri->ino(), diri->first, CEPH_NOSNAP, dn->get_name());
4250 mds->send_message_client_counted(lease, l->client);
4251 }
4252 }
4253
4254 void Locker::encode_lease(bufferlist& bl, const session_info_t& info,
4255 const LeaseStat& ls)
4256 {
4257 if (info.has_feature(CEPHFS_FEATURE_REPLY_ENCODING)) {
4258 ENCODE_START(1, 1, bl);
4259 encode(ls.mask, bl);
4260 encode(ls.duration_ms, bl);
4261 encode(ls.seq, bl);
4262 ENCODE_FINISH(bl);
4263 }
4264 else {
4265 encode(ls.mask, bl);
4266 encode(ls.duration_ms, bl);
4267 encode(ls.seq, bl);
4268 }
4269 }
4270
4271 // locks ----------------------------------------------------------------
4272
/*
 * Map an (MDS lock type, cache object info) pair from a peer's lock
 * message onto the corresponding SimpleLock in our cache.  Returns
 * nullptr (0) if the referenced dentry/inode has been trimmed.
 */
SimpleLock *Locker::get_lock(int lock_type, const MDSCacheObjectInfo &info)
{
  switch (lock_type) {
  case CEPH_LOCK_DN:
    {
      // be careful; info.dirfrag may have incorrect frag; recalculate based on dname.
      CInode *diri = mdcache->get_inode(info.dirfrag.ino);
      frag_t fg;
      CDir *dir = 0;
      CDentry *dn = 0;
      if (diri) {
	fg = diri->pick_dirfrag(info.dname);
	dir = diri->get_dirfrag(fg);
	if (dir)
	  dn = dir->lookup(info.dname, info.snapid);
      }
      if (!dn) {
	dout(7) << "get_lock don't have dn " << info.dirfrag.ino << " " << info.dname << dendl;
	return 0;
      }
      return &dn->lock;
    }

  case CEPH_LOCK_IAUTH:
  case CEPH_LOCK_ILINK:
  case CEPH_LOCK_IDFT:
  case CEPH_LOCK_IFILE:
  case CEPH_LOCK_INEST:
  case CEPH_LOCK_IXATTR:
  case CEPH_LOCK_ISNAP:
  case CEPH_LOCK_IFLOCK:
  case CEPH_LOCK_IPOLICY:
    {
      // inode locks: look up the inode, then select the member lock.
      // The inner switch covers every outer case, so it always returns.
      CInode *in = mdcache->get_inode(info.ino, info.snapid);
      if (!in) {
	dout(7) << "get_lock don't have ino " << info.ino << dendl;
	return 0;
      }
      switch (lock_type) {
      case CEPH_LOCK_IAUTH: return &in->authlock;
      case CEPH_LOCK_ILINK: return &in->linklock;
      case CEPH_LOCK_IDFT: return &in->dirfragtreelock;
      case CEPH_LOCK_IFILE: return &in->filelock;
      case CEPH_LOCK_INEST: return &in->nestlock;
      case CEPH_LOCK_IXATTR: return &in->xattrlock;
      case CEPH_LOCK_ISNAP: return &in->snaplock;
      case CEPH_LOCK_IFLOCK: return &in->flocklock;
      case CEPH_LOCK_IPOLICY: return &in->policylock;
      }
    }

  default:
    dout(7) << "get_lock don't know lock_type " << lock_type << dendl;
    ceph_abort();
    break;
  }

  return 0;
}
4332
/*
 * Dispatch an incoming MLock message to the handler for its lock type.
 * Scatter-type locks (IDFT/INEST) deliberately fall through to the
 * IFILE case and share handle_file_lock(); the rest are simple locks.
 */
void Locker::handle_lock(const cref_t<MLock> &m)
{
  // nobody should be talking to us during recovery.
  ceph_assert(mds->is_rejoin() || mds->is_clientreplay() || mds->is_active() || mds->is_stopping());

  SimpleLock *lock = get_lock(m->get_lock_type(), m->get_object_info());
  if (!lock) {
    dout(10) << "don't have object " << m->get_object_info() << ", must have trimmed, dropping" << dendl;
    return;
  }

  switch (lock->get_type()) {
  case CEPH_LOCK_DN:
  case CEPH_LOCK_IAUTH:
  case CEPH_LOCK_ILINK:
  case CEPH_LOCK_ISNAP:
  case CEPH_LOCK_IXATTR:
  case CEPH_LOCK_IFLOCK:
  case CEPH_LOCK_IPOLICY:
    handle_simple_lock(lock, m);
    break;

  case CEPH_LOCK_IDFT:
  case CEPH_LOCK_INEST:
    //handle_scatter_lock((ScatterLock*)lock, m);
    //break;
    // intentional fall through: scatter locks use the file lock handler

  case CEPH_LOCK_IFILE:
    handle_file_lock(static_cast<ScatterLock*>(lock), m);
    break;

  default:
    dout(7) << "handle_lock got otype " << m->get_lock_type() << dendl;
    ceph_abort();
    break;
  }
}
4370
4371
4372
4373
4374
4375 // ==========================================================================
4376 // simple lock
4377
4378 /** This function may take a reference to m if it needs one, but does
4379 * not put references. */
4380 void Locker::handle_reqrdlock(SimpleLock *lock, const cref_t<MLock> &m)
4381 {
4382 MDSCacheObject *parent = lock->get_parent();
4383 if (parent->is_auth() &&
4384 lock->get_state() != LOCK_SYNC &&
4385 !parent->is_frozen()) {
4386 dout(7) << "handle_reqrdlock got rdlock request on " << *lock
4387 << " on " << *parent << dendl;
4388 ceph_assert(parent->is_auth()); // replica auth pinned if they're doing this!
4389 if (lock->is_stable()) {
4390 simple_sync(lock);
4391 } else {
4392 dout(7) << "handle_reqrdlock delaying request until lock is stable" << dendl;
4393 lock->add_waiter(SimpleLock::WAIT_STABLE | MDSCacheObject::WAIT_UNFREEZE,
4394 new C_MDS_RetryMessage(mds, m));
4395 }
4396 } else {
4397 dout(7) << "handle_reqrdlock dropping rdlock request on " << *lock
4398 << " on " << *parent << dendl;
4399 // replica should retry
4400 }
4401 }
4402
/*
 * Handle an MLock message for a simple lock.
 *
 * Replica-side actions (SYNC, LOCK) apply the auth's state change
 * locally; auth-side actions (LOCKACK, REQRDLOCK) record gather acks
 * from replicas or service rdlock requests.
 */
void Locker::handle_simple_lock(SimpleLock *lock, const cref_t<MLock> &m)
{
  int from = m->get_asker();

  dout(10) << "handle_simple_lock " << *m
	   << " on " << *lock << " " << *lock->get_parent() << dendl;

  if (mds->is_rejoin()) {
    if (lock->get_parent()->is_rejoining()) {
      // lock state for this object will be resolved by rejoin; drop
      dout(7) << "handle_simple_lock still rejoining " << *lock->get_parent()
	      << ", dropping " << *m << dendl;
      return;
    }
  }

  switch (m->get_action()) {
    // -- replica --
  case LOCK_AC_SYNC:
    ceph_assert(lock->get_state() == LOCK_LOCK);
    lock->decode_locked_state(m->get_data());
    lock->set_state(LOCK_SYNC);
    lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
    break;

  case LOCK_AC_LOCK:
    ceph_assert(lock->get_state() == LOCK_SYNC);
    lock->set_state(LOCK_SYNC_LOCK);
    // any client leases must be revoked before we can reach LOCK
    if (lock->is_leased())
      revoke_client_leases(lock);
    eval_gather(lock, true);
    if (lock->is_unstable_and_locked()) {
      if (lock->is_cached())
	invalidate_lock_caches(lock);
      mds->mdlog->flush();
    }
    break;


    // -- auth --
  case LOCK_AC_LOCKACK:
    ceph_assert(lock->get_state() == LOCK_SYNC_LOCK ||
		lock->get_state() == LOCK_SYNC_EXCL);
    ceph_assert(lock->is_gathering(from));
    lock->remove_gather(from);

    if (lock->is_gathering()) {
      dout(7) << "handle_simple_lock " << *lock << " on " << *lock->get_parent() << " from " << from
	      << ", still gathering " << lock->get_gather_set() << dendl;
    } else {
      dout(7) << "handle_simple_lock " << *lock << " on " << *lock->get_parent() << " from " << from
	      << ", last one" << dendl;
      eval_gather(lock);
    }
    break;

  case LOCK_AC_REQRDLOCK:
    handle_reqrdlock(lock, m);
    break;

  }
}
4464
4465 /* unused, currently.
4466
4467 class C_Locker_SimpleEval : public Context {
4468 Locker *locker;
4469 SimpleLock *lock;
4470 public:
4471 C_Locker_SimpleEval(Locker *l, SimpleLock *lk) : locker(l), lock(lk) {}
4472 void finish(int r) {
4473 locker->try_simple_eval(lock);
4474 }
4475 };
4476
4477 void Locker::try_simple_eval(SimpleLock *lock)
4478 {
4479 // unstable and ambiguous auth?
4480 if (!lock->is_stable() &&
4481 lock->get_parent()->is_ambiguous_auth()) {
4482 dout(7) << "simple_eval not stable and ambiguous auth, waiting on " << *lock->get_parent() << dendl;
4483 //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
4484 lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_SimpleEval(this, lock));
4485 return;
4486 }
4487
4488 if (!lock->get_parent()->is_auth()) {
4489 dout(7) << "try_simple_eval not auth for " << *lock->get_parent() << dendl;
4490 return;
4491 }
4492
4493 if (!lock->get_parent()->can_auth_pin()) {
4494 dout(7) << "try_simple_eval can't auth_pin, waiting on " << *lock->get_parent() << dendl;
4495 //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
4496 lock->get_parent()->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_SimpleEval(this, lock));
4497 return;
4498 }
4499
4500 if (lock->is_stable())
4501 simple_eval(lock);
4502 }
4503 */
4504
4505
/*
 * Re-evaluate a stable simple lock on the auth MDS: promote it toward
 * EXCL when a loner client wants exclusive caps, or fall back to SYNC
 * when nothing needs the stronger state.
 */
void Locker::simple_eval(SimpleLock *lock, bool *need_issue)
{
  dout(10) << "simple_eval " << *lock << " on " << *lock->get_parent() << dendl;

  ceph_assert(lock->get_parent()->is_auth());
  ceph_assert(lock->is_stable());

  if (lock->get_parent()->is_freezing_or_frozen()) {
    // dentry/snap lock in unreadable state can block path traverse
    if ((lock->get_type() != CEPH_LOCK_DN &&
	 lock->get_type() != CEPH_LOCK_ISNAP &&
	 lock->get_type() != CEPH_LOCK_IPOLICY) ||
	lock->get_state() == LOCK_SYNC ||
	lock->get_parent()->is_frozen())
      return;
  }

  if (mdcache->is_readonly()) {
    // read-only fs: keep everything in SYNC
    if (lock->get_state() != LOCK_SYNC) {
      dout(10) << "simple_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
      simple_sync(lock, need_issue);
    }
    return;
  }

  CInode *in = 0;
  int wanted = 0;
  if (lock->get_cap_shift()) {
    // inode lock with associated cap bits: factor in what clients want
    in = static_cast<CInode*>(lock->get_parent());
    in->get_caps_wanted(&wanted, NULL, lock->get_cap_shift());
  }

  // -> excl?
  if (lock->get_state() != LOCK_EXCL &&
      in && in->get_target_loner() >= 0 &&
      (wanted & CEPH_CAP_GEXCL)) {
    dout(7) << "simple_eval stable, going to excl " << *lock
	    << " on " << *lock->get_parent() << dendl;
    simple_excl(lock, need_issue);
  }

  // stable -> sync?
  else if (lock->get_state() != LOCK_SYNC &&
	   !lock->is_wrlocked() &&
	   ((!(wanted & CEPH_CAP_GEXCL) && !lock->is_waiter_for(SimpleLock::WAIT_WR)) ||
	    (lock->get_state() == LOCK_EXCL && in && in->get_target_loner() < 0))) {
    dout(7) << "simple_eval stable, syncing " << *lock
	    << " on " << *lock->get_parent() << dendl;
    simple_sync(lock, need_issue);
  }
}
4557
4558
4559 // mid
4560
/*
 * Move a stable lock toward SYNC (readable everywhere).
 *
 * May have to gather first: outstanding wrlocks, scattered replica
 * state (from MIX), conflicting client caps, and file recovery.
 * Returns true if the lock reached SYNC; false if a gather or
 * dirty-scatterlock writebehind is pending and the lock was left in a
 * transitional state.
 */
bool Locker::simple_sync(SimpleLock *lock, bool *need_issue)
{
  dout(7) << "simple_sync on " << *lock << " on " << *lock->get_parent() << dendl;
  ceph_assert(lock->get_parent()->is_auth());
  ceph_assert(lock->is_stable());

  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  int old_state = lock->get_state();

  if (old_state != LOCK_TSYN) {

    // enter the matching transitional state
    switch (lock->get_state()) {
    case LOCK_MIX: lock->set_state(LOCK_MIX_SYNC); break;
    case LOCK_LOCK: lock->set_state(LOCK_LOCK_SYNC); break;
    case LOCK_XSYN: lock->set_state(LOCK_XSYN_SYNC); break;
    case LOCK_EXCL: lock->set_state(LOCK_EXCL_SYNC); break;
    default: ceph_abort();
    }

    int gather = 0;
    if (lock->is_wrlocked()) {
      // wait for wrlocks to drain
      gather++;
      if (lock->is_cached())
	invalidate_lock_caches(lock);
    }

    if (lock->get_parent()->is_replicated() && old_state == LOCK_MIX) {
      // gather the scattered state back from replicas
      send_lock_message(lock, LOCK_AC_SYNC);
      lock->init_gather();
      gather++;
    }

    if (in && in->is_head()) {
      if (in->issued_caps_need_gather(lock)) {
	// recall conflicting client caps
	if (need_issue)
	  *need_issue = true;
	else
	  issue_caps(in);
	gather++;
      }
    }

    bool need_recover = false;
    if (lock->get_type() == CEPH_LOCK_IFILE) {
      ceph_assert(in);
      if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
	mds->mdcache->queue_file_recover(in);
	need_recover = true;
	gather++;
      }
    }

    if (!gather && lock->is_dirty()) {
      // nothing to gather, but dirty scatterlock state must be
      // journaled before we can go SYNC
      lock->get_parent()->auth_pin(lock);
      scatter_writebehind(static_cast<ScatterLock*>(lock));
      mds->mdlog->flush();
      return false;
    }

    if (gather) {
      // stay in the transitional state until the gather completes
      lock->get_parent()->auth_pin(lock);
      if (need_recover)
	mds->mdcache->do_file_recover();
      return false;
    }
  }

  if (lock->get_parent()->is_replicated()) { // FIXME
    // push the now-readable state out to replicas
    bufferlist data;
    lock->encode_locked_state(data);
    send_lock_message(lock, LOCK_AC_SYNC, data);
  }
  lock->set_state(LOCK_SYNC);
  lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
  if (in && in->is_head()) {
    if (need_issue)
      *need_issue = true;
    else
      issue_caps(in);
  }
  return true;
}
4646
/*
 * Move a stable lock toward EXCL (exclusive caps for the loner
 * client), first gathering rdlocks/wrlocks, replica downgrades, and
 * conflicting client caps if necessary.
 */
void Locker::simple_excl(SimpleLock *lock, bool *need_issue)
{
  dout(7) << "simple_excl on " << *lock << " on " << *lock->get_parent() << dendl;
  ceph_assert(lock->get_parent()->is_auth());
  ceph_assert(lock->is_stable());

  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  // enter the matching transitional state
  switch (lock->get_state()) {
  case LOCK_LOCK: lock->set_state(LOCK_LOCK_EXCL); break;
  case LOCK_SYNC: lock->set_state(LOCK_SYNC_EXCL); break;
  case LOCK_XSYN: lock->set_state(LOCK_XSYN_EXCL); break;
  default: ceph_abort();
  }

  int gather = 0;
  if (lock->is_rdlocked())
    gather++;
  if (lock->is_wrlocked())
    gather++;
  if (gather && lock->is_cached())
    invalidate_lock_caches(lock);

  if (lock->get_parent()->is_replicated() &&
      lock->get_state() != LOCK_LOCK_EXCL &&
      lock->get_state() != LOCK_XSYN_EXCL) {
    // replicas must drop to LOCK before we can hold EXCL
    send_lock_message(lock, LOCK_AC_LOCK);
    lock->init_gather();
    gather++;
  }

  if (in && in->is_head()) {
    if (in->issued_caps_need_gather(lock)) {
      // recall conflicting client caps
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
      gather++;
    }
  }

  if (gather) {
    // stay in the transitional state until the gather completes
    lock->get_parent()->auth_pin(lock);
  } else {
    lock->set_state(LOCK_EXCL);
    lock->finish_waiters(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE);
    if (in) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
    }
  }
}
4703
/*
 * Move a stable lock toward LOCK (held exclusively by the auth MDS):
 * revoke client leases, drain rdlocks, recall conflicting caps, queue
 * file recovery if needed, and — for MIX — run a two-stage gather
 * (local scatter state first, then replicas).
 */
void Locker::simple_lock(SimpleLock *lock, bool *need_issue)
{
  dout(7) << "simple_lock on " << *lock << " on " << *lock->get_parent() << dendl;
  ceph_assert(lock->get_parent()->is_auth());
  ceph_assert(lock->is_stable());
  ceph_assert(lock->get_state() != LOCK_LOCK);

  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  int old_state = lock->get_state();

  // enter the matching transitional state
  switch (lock->get_state()) {
  case LOCK_SYNC: lock->set_state(LOCK_SYNC_LOCK); break;
  case LOCK_XSYN: lock->set_state(LOCK_XSYN_LOCK); break;
  case LOCK_EXCL: lock->set_state(LOCK_EXCL_LOCK); break;
  case LOCK_MIX: lock->set_state(LOCK_MIX_LOCK);
    (static_cast<ScatterLock *>(lock))->clear_unscatter_wanted();
    break;
  case LOCK_TSYN: lock->set_state(LOCK_TSYN_LOCK); break;
  default: ceph_abort();
  }

  int gather = 0;
  if (lock->is_leased()) {
    // client dentry leases must be revoked first
    gather++;
    revoke_client_leases(lock);
  }
  if (lock->is_rdlocked()) {
    if (lock->is_cached())
      invalidate_lock_caches(lock);
    gather++;
  }
  if (in && in->is_head()) {
    if (in->issued_caps_need_gather(lock)) {
      // recall conflicting client caps
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
      gather++;
    }
  }

  bool need_recover = false;
  if (lock->get_type() == CEPH_LOCK_IFILE) {
    ceph_assert(in);
    if(in->state_test(CInode::STATE_NEEDSRECOVER)) {
      mds->mdcache->queue_file_recover(in);
      need_recover = true;
      gather++;
    }
  }

  if (lock->get_parent()->is_replicated() &&
      lock->get_state() == LOCK_MIX_LOCK &&
      gather) {
    dout(10) << " doing local stage of mix->lock gather before gathering from replicas" << dendl;
  } else {
    // move to second stage of gather now, so we don't send the lock action later.
    if (lock->get_state() == LOCK_MIX_LOCK)
      lock->set_state(LOCK_MIX_LOCK2);

    if (lock->get_parent()->is_replicated() &&
	lock->get_sm()->states[old_state].replica_state != LOCK_LOCK) { // replica may already be LOCK
      gather++;
      send_lock_message(lock, LOCK_AC_LOCK);
      lock->init_gather();
    }
  }

  if (!gather && lock->is_dirty()) {
    // no gathers needed, but dirty scatterlock state must be journaled
    // before we can reach LOCK
    lock->get_parent()->auth_pin(lock);
    scatter_writebehind(static_cast<ScatterLock*>(lock));
    mds->mdlog->flush();
    return;
  }

  if (gather) {
    // stay in the transitional state until the gather completes
    lock->get_parent()->auth_pin(lock);
    if (need_recover)
      mds->mdcache->do_file_recover();
  } else {
    lock->set_state(LOCK_LOCK);
    lock->finish_waiters(ScatterLock::WAIT_XLOCK|ScatterLock::WAIT_WR|ScatterLock::WAIT_STABLE);
  }
}
4791
4792
/*
 * Begin moving a lock toward XLOCK: from LOCK/XLOCKDONE enter
 * LOCK_LOCK_XLOCK, gather outstanding rd/wrlocks and conflicting
 * client caps, and advance to PREXLOCK once nothing needs gathering.
 */
void Locker::simple_xlock(SimpleLock *lock)
{
  dout(7) << "simple_xlock on " << *lock << " on " << *lock->get_parent() << dendl;
  ceph_assert(lock->get_parent()->is_auth());
  //assert(lock->is_stable());
  ceph_assert(lock->get_state() != LOCK_XLOCK);

  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  // NOTE(review): auth_pin only when stable — presumably an unstable
  // lock already holds its pin; confirm against auth_pin semantics
  if (lock->is_stable())
    lock->get_parent()->auth_pin(lock);

  switch (lock->get_state()) {
  case LOCK_LOCK:
  case LOCK_XLOCKDONE: lock->set_state(LOCK_LOCK_XLOCK); break;
  default: ceph_abort();
  }

  int gather = 0;
  if (lock->is_rdlocked())
    gather++;
  if (lock->is_wrlocked())
    gather++;
  if (gather && lock->is_cached())
    invalidate_lock_caches(lock);

  if (in && in->is_head()) {
    if (in->issued_caps_need_gather(lock)) {
      // recall conflicting client caps
      issue_caps(in);
      gather++;
    }
  }

  if (!gather) {
    lock->set_state(LOCK_PREXLOCK);
    //assert("shouldn't be called if we are already xlockable" == 0);
  }
}
4833
4834
4835
4836
4837
4838 // ==========================================================================
4839 // scatter lock
4840
4841 /*
4842
4843 Some notes on scatterlocks.
4844
4845 - The scatter/gather is driven by the inode lock. The scatter always
4846 brings in the latest metadata from the fragments.
4847
4848 - When in a scattered/MIX state, fragments are only allowed to
4849 update/be written to if the accounted stat matches the inode's
4850 current version.
4851
4852 - That means, on gather, we _only_ assimilate diffs for frag metadata
4853 that match the current version, because those are the only ones
4854 written during this scatter/gather cycle. (Others didn't permit
4855 it.) We increment the version and journal this to disk.
4856
4857 - When possible, we also simultaneously update our local frag
4858 accounted stats to match.
4859
4860 - On scatter, the new inode info is broadcast to frags, both local
4861 and remote. If possible (auth and !frozen), the dirfrag auth
4862 should update the accounted state (if it isn't already up to date).
4863 Note that this may occur on both the local inode auth node and
4864 inode replicas, so there are two potential paths. If it is NOT
4865 possible, they need to mark_stale to prevent any possible writes.
4866
4867 - A scatter can be to MIX (potentially writeable) or to SYNC (read
4868 only). Both are opportunities to update the frag accounted stats,
4869 even though only the MIX case is affected by a stale dirfrag.
4870
4871 - Because many scatter/gather cycles can potentially go by without a
4872 frag being able to update its accounted stats (due to being frozen
4873 by exports/refragments in progress), the frag may have (even very)
4874 old stat versions. That's fine. If when we do want to update it,
4875 we can update accounted_* and the version first.
4876
4877 */
4878
4879 class C_Locker_ScatterWB : public LockerLogContext {
4880 ScatterLock *lock;
4881 MutationRef mut;
4882 public:
4883 C_Locker_ScatterWB(Locker *l, ScatterLock *sl, MutationRef& m) :
4884 LockerLogContext(l), lock(sl), mut(m) {}
4885 void finish(int r) override {
4886 locker->scatter_writebehind_finish(lock, mut);
4887 }
4888 };
4889
/*
 * Journal gathered scatterlock state back into the inode.
 *
 * Takes a forced wrlock for the duration of the journaling, projects
 * and pre-dirties the inode, folds in the fragment deltas via
 * finish_scatter_gather_update(), and submits an EUpdate whose
 * completion (C_Locker_ScatterWB) runs scatter_writebehind_finish().
 */
void Locker::scatter_writebehind(ScatterLock *lock)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  dout(10) << "scatter_writebehind " << in->inode.mtime << " on " << *lock << " on " << *in << dendl;

  // journal
  MutationRef mut(new MutationImpl());
  mut->ls = mds->mdlog->get_current_segment();

  // forcefully take a wrlock
  lock->get_wrlock(true);
  mut->emplace_lock(lock, MutationImpl::LockOp::WRLOCK);

  in->pre_cow_old_inode(); // avoid cow mayhem

  auto &pi = in->project_inode();
  pi.inode.version = in->pre_dirty();

  in->finish_scatter_gather_update(lock->get_type());
  lock->start_flush();

  EUpdate *le = new EUpdate(mds->mdlog, "scatter_writebehind");
  mds->mdlog->start_entry(le);

  mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY);
  mdcache->journal_dirty_inode(mut.get(), &le->metablob, in);

  in->finish_scatter_gather_update_accounted(lock->get_type(), mut, &le->metablob);

  mds->mdlog->submit_entry(le, new C_Locker_ScatterWB(this, lock, mut));
}
4921
/*
 * Completion of scatter_writebehind(): the EUpdate is durable, so pop
 * the projected inode, finish the flush, and release the forced wrlock.
 */
void Locker::scatter_writebehind_finish(ScatterLock *lock, MutationRef& mut)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  dout(10) << "scatter_writebehind_finish on " << *lock << " on " << *in << dendl;
  in->pop_and_dirty_projected_inode(mut->ls);

  lock->finish_flush();

  // if replicas may have flushed in a mix->lock state, send another
  // message so they can finish_flush().
  if (in->is_replicated()) {
    switch (lock->get_state()) {
    case LOCK_MIX_LOCK:
    case LOCK_MIX_LOCK2:
    case LOCK_MIX_EXCL:
    case LOCK_MIX_TSYN:
      send_lock_message(lock, LOCK_AC_LOCKFLUSHED);
    }
  }

  // apply the mutation and drop the wrlock taken in scatter_writebehind()
  mut->apply();
  drop_locks(mut.get());
  mut->cleanup();

  if (lock->is_stable())
    lock->finish_waiters(ScatterLock::WAIT_STABLE);

  //scatter_eval_gather(lock);
}
4951
/*
 * Re-evaluate a stable, auth scatterlock and nudge it toward the state
 * that best fits current demand: MIX when scattering is wanted or the
 * inode is replicated (INEST), LOCK/SYNC otherwise.
 */
void Locker::scatter_eval(ScatterLock *lock, bool *need_issue)
{
  dout(10) << "scatter_eval " << *lock << " on " << *lock->get_parent() << dendl;

  ceph_assert(lock->get_parent()->is_auth());
  ceph_assert(lock->is_stable());

  // can't change lock state while the parent is freezing/frozen (migration)
  if (lock->get_parent()->is_freezing_or_frozen()) {
    dout(20) << " freezing|frozen" << dendl;
    return;
  }

  // read-only fs: force everything toward SYNC, no writeable states
  if (mdcache->is_readonly()) {
    if (lock->get_state() != LOCK_SYNC) {
      dout(10) << "scatter_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
      simple_sync(lock, need_issue);
    }
    return;
  }

  // someone (e.g. a replica via REQSCATTER) asked for MIX
  if (!lock->is_rdlocked() &&
      lock->get_state() != LOCK_MIX &&
      lock->get_scatter_wanted()) {
    dout(10) << "scatter_eval scatter_wanted, bump to mix " << *lock
	     << " on " << *lock->get_parent() << dendl;
    scatter_mix(lock, need_issue);
    return;
  }

  if (lock->get_type() == CEPH_LOCK_INEST) {
    // in general, we want to keep INEST writable at all times.
    if (!lock->is_rdlocked()) {
      if (lock->get_parent()->is_replicated()) {
	if (lock->get_state() != LOCK_MIX)
	  scatter_mix(lock, need_issue);
      } else {
	if (lock->get_state() != LOCK_LOCK)
	  simple_lock(lock, need_issue);
      }
    }
    return;
  }

  CInode *in = static_cast<CInode*>(lock->get_parent());
  if (!in->has_subtree_or_exporting_dirfrag() || in->is_base()) {
    // i _should_ be sync.
    if (!lock->is_wrlocked() &&
	lock->get_state() != LOCK_SYNC) {
      dout(10) << "scatter_eval no wrlocks|xlocks, not subtree root inode, syncing" << dendl;
      simple_sync(lock, need_issue);
    }
  }
}
5005
5006
5007 /*
5008 * mark a scatterlock to indicate that the dir fnode has some dirty data
5009 */
5010 void Locker::mark_updated_scatterlock(ScatterLock *lock)
5011 {
5012 lock->mark_dirty();
5013 if (lock->get_updated_item()->is_on_list()) {
5014 dout(10) << "mark_updated_scatterlock " << *lock
5015 << " - already on list since " << lock->get_update_stamp() << dendl;
5016 } else {
5017 updated_scatterlocks.push_back(lock->get_updated_item());
5018 utime_t now = ceph_clock_now();
5019 lock->set_update_stamp(now);
5020 dout(10) << "mark_updated_scatterlock " << *lock
5021 << " - added at " << now << dendl;
5022 }
5023 }
5024
5025 /*
5026 * this is called by scatter_tick and LogSegment::try_to_trim() when
5027 * trying to flush dirty scattered data (i.e. updated fnode) back to
5028 * the inode.
5029 *
5030 * we need to lock|scatter in order to push fnode changes into the
5031 * inode.dirstat.
5032 */
/*
 * Push a dirty scatterlock through a lock/scatter state change so its
 * fnode data gets folded back into the inode.  Called with an optional
 * completion @c (waits for stability) and @forcelockchange (from
 * AC_NUDGE, forces a state cycle even when we could flush in place).
 */
void Locker::scatter_nudge(ScatterLock *lock, MDSContext *c, bool forcelockchange)
{
  CInode *p = static_cast<CInode *>(lock->get_parent());

  // frozen/freezing: can't change state now; wait or requeue
  if (p->is_frozen() || p->is_freezing()) {
    dout(10) << "scatter_nudge waiting for unfreeze on " << *p << dendl;
    if (c)
      p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, c);
    else if (lock->is_dirty())
      // just requeue.  not ideal.. starvation prone..
      updated_scatterlocks.push_back(lock->get_updated_item());
    return;
  }

  // mid-migration: auth is ambiguous, defer until it settles
  if (p->is_ambiguous_auth()) {
    dout(10) << "scatter_nudge waiting for single auth on " << *p << dendl;
    if (c)
      p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, c);
    else if (lock->is_dirty())
      // just requeue.  not ideal.. starvation prone..
      updated_scatterlocks.push_back(lock->get_updated_item());
    return;
  }

  if (p->is_auth()) {
    int count = 0;
    while (true) {
      if (lock->is_stable()) {
	// can we do it now?
	//  (only if we're not replicated.. if we are, we really do need
	//   to nudge the lock state!)
	/*
	  actually, even if we're not replicated, we can't stay in MIX, because another mds
	  could discover and replicate us at any time.  if that happens while we're flushing,
	  they end up in MIX but their inode has the old scatterstat version.

	if (!forcelockchange && !lock->get_parent()->is_replicated() && lock->can_wrlock(-1)) {
	  dout(10) << "scatter_nudge auth, propagating " << *lock << " on " << *p << dendl;
	  scatter_writebehind(lock);
	  if (c)
	    lock->add_waiter(SimpleLock::WAIT_STABLE, c);
	  return;
	}
	*/

	if (mdcache->is_readonly()) {
	  if (lock->get_state() != LOCK_SYNC) {
	    dout(10) << "scatter_nudge auth, read-only FS, syncing " << *lock << " on " << *p << dendl;
	    simple_sync(static_cast<ScatterLock*>(lock));
	  }
	  break;
	}

	// adjust lock state: cycle toward MIX (replicated) or LOCK/SYNC,
	// which forces the dirty fnode data through a gather
	dout(10) << "scatter_nudge auth, scatter/unscattering " << *lock << " on " << *p << dendl;
	switch (lock->get_type()) {
	case CEPH_LOCK_IFILE:
	  if (p->is_replicated() && lock->get_state() != LOCK_MIX)
	    scatter_mix(static_cast<ScatterLock*>(lock));
	  else if (lock->get_state() != LOCK_LOCK)
	    simple_lock(static_cast<ScatterLock*>(lock));
	  else
	    simple_sync(static_cast<ScatterLock*>(lock));
	  break;

	case CEPH_LOCK_IDFT:
	case CEPH_LOCK_INEST:
	  if (p->is_replicated() && lock->get_state() != LOCK_MIX)
	    scatter_mix(lock);
	  else if (lock->get_state() != LOCK_LOCK)
	    simple_lock(lock);
	  else
	    simple_sync(lock);
	  break;
	default:
	  ceph_abort();
	}
	++count;
	if (lock->is_stable() && count == 2) {
	  dout(10) << "scatter_nudge oh, stable after two cycles." << dendl;
	  // this should only realy happen when called via
	  // handle_file_lock due to AC_NUDGE, because the rest of the
	  // time we are replicated or have dirty data and won't get
	  // called.  bailing here avoids an infinite loop.
	  ceph_assert(!c);
	  break;
	}
      } else {
	// unstable: a transition is already in flight; wait for it
	dout(10) << "scatter_nudge auth, waiting for stable " << *lock << " on " << *p << dendl;
	if (c)
	  lock->add_waiter(SimpleLock::WAIT_STABLE, c);
	return;
      }
    }
  } else {
    dout(10) << "scatter_nudge replica, requesting scatter/unscatter of "
	     << *lock << " on " << *p << dendl;
    // request unscatter?
    mds_rank_t auth = lock->get_parent()->authority().first;
    if (!mds->is_cluster_degraded() || mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
      mds->send_message_mds(make_message<MLock>(lock, LOCK_AC_NUDGE, mds->get_nodeid()), auth);
    }

    // wait...
    if (c)
      lock->add_waiter(SimpleLock::WAIT_STABLE, c);

    // also, requeue, in case we had wrong auth or something
    if (lock->is_dirty())
      updated_scatterlocks.push_back(lock->get_updated_item());
  }
}
5145
/*
 * Periodic tick: nudge dirty scatterlocks that have been queued longer
 * than mds_scatter_nudge_interval so their fnode data gets flushed.
 */
void Locker::scatter_tick()
{
  dout(10) << "scatter_tick" << dendl;

  // updated
  utime_t now = ceph_clock_now();
  int n = updated_scatterlocks.size();
  while (!updated_scatterlocks.empty()) {
    ScatterLock *lock = updated_scatterlocks.front();

    if (n-- == 0) break;  // scatter_nudge() may requeue; avoid looping

    // clean locks can just be dropped from the queue
    if (!lock->is_dirty()) {
      updated_scatterlocks.pop_front();
      dout(10) << " removing from updated_scatterlocks "
	       << *lock << " " << *lock->get_parent() << dendl;
      continue;
    }
    // queue is FIFO by update stamp: once we hit one that is too young,
    // everything behind it is younger too
    if (now - lock->get_update_stamp() < g_conf()->mds_scatter_nudge_interval)
      break;
    updated_scatterlocks.pop_front();
    scatter_nudge(lock, 0);
  }
  mds->mdlog->flush();
}
5171
5172
/*
 * Move a stable auth scatterlock to TSYN (temporary sync).
 * NOTE: currently unreachable in practice -- it aborts immediately
 * below; the remaining body documents the intended transition.
 */
void Locker::scatter_tempsync(ScatterLock *lock, bool *need_issue)
{
  dout(10) << "scatter_tempsync " << *lock
	   << " on " << *lock->get_parent() << dendl;
  ceph_assert(lock->get_parent()->is_auth());
  ceph_assert(lock->is_stable());

  ceph_abort_msg("not fully implemented, at least not for filelock");

  CInode *in = static_cast<CInode *>(lock->get_parent());

  switch (lock->get_state()) {
  case LOCK_SYNC: ceph_abort(); // this shouldn't happen
  case LOCK_LOCK: lock->set_state(LOCK_LOCK_TSYN); break;
  case LOCK_MIX: lock->set_state(LOCK_MIX_TSYN); break;
  default: ceph_abort();
  }

  // count outstanding obstacles; if any, we must gather before TSYN
  int gather = 0;
  if (lock->is_wrlocked()) {
    if (lock->is_cached())
      invalidate_lock_caches(lock);
    gather++;
  }

  // revoke caps that conflict with the target state
  if (lock->get_cap_shift() &&
      in->is_head() &&
      in->issued_caps_need_gather(lock)) {
    if (need_issue)
      *need_issue = true;
    else
      issue_caps(in);
    gather++;
  }

  // coming from MIX: replicas hold writeable state, gather it back
  if (lock->get_state() == LOCK_MIX_TSYN &&
      in->is_replicated()) {
    lock->init_gather();
    send_lock_message(lock, LOCK_AC_LOCK);
    gather++;
  }

  if (gather) {
    in->auth_pin(lock);
  } else {
    // do tempsync
    lock->set_state(LOCK_TSYN);
    lock->finish_waiters(ScatterLock::WAIT_RD|ScatterLock::WAIT_STABLE);
    if (lock->get_cap_shift()) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
    }
  }
}
5229
5230
5231
5232 // ==========================================================================
5233 // local lock
5234
5235 void Locker::local_wrlock_grab(LocalLock *lock, MutationRef& mut)
5236 {
5237 dout(7) << "local_wrlock_grab on " << *lock
5238 << " on " << *lock->get_parent() << dendl;
5239
5240 ceph_assert(lock->get_parent()->is_auth());
5241 ceph_assert(lock->can_wrlock());
5242 lock->get_wrlock(mut->get_client());
5243
5244 auto it = mut->emplace_lock(lock, MutationImpl::LockOp::WRLOCK);
5245 ceph_assert(it->is_wrlock());
5246 }
5247
5248 bool Locker::local_wrlock_start(LocalLock *lock, MDRequestRef& mut)
5249 {
5250 dout(7) << "local_wrlock_start on " << *lock
5251 << " on " << *lock->get_parent() << dendl;
5252
5253 ceph_assert(lock->get_parent()->is_auth());
5254 if (lock->can_wrlock()) {
5255 lock->get_wrlock(mut->get_client());
5256 auto it = mut->emplace_lock(lock, MutationImpl::LockOp::WRLOCK);
5257 ceph_assert(it->is_wrlock());
5258 return true;
5259 } else {
5260 lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
5261 return false;
5262 }
5263 }
5264
5265 void Locker::local_wrlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut)
5266 {
5267 ceph_assert(it->is_wrlock());
5268 LocalLock *lock = static_cast<LocalLock*>(it->lock);
5269 dout(7) << "local_wrlock_finish on " << *lock
5270 << " on " << *lock->get_parent() << dendl;
5271 lock->put_wrlock();
5272 mut->locks.erase(it);
5273 if (lock->get_num_wrlocks() == 0) {
5274 lock->finish_waiters(SimpleLock::WAIT_STABLE |
5275 SimpleLock::WAIT_WR |
5276 SimpleLock::WAIT_RD);
5277 }
5278 }
5279
5280 bool Locker::local_xlock_start(LocalLock *lock, MDRequestRef& mut)
5281 {
5282 dout(7) << "local_xlock_start on " << *lock
5283 << " on " << *lock->get_parent() << dendl;
5284
5285 ceph_assert(lock->get_parent()->is_auth());
5286 if (!lock->can_xlock_local()) {
5287 lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
5288 return false;
5289 }
5290
5291 lock->get_xlock(mut, mut->get_client());
5292 mut->emplace_lock(lock, MutationImpl::LockOp::XLOCK);
5293 return true;
5294 }
5295
5296 void Locker::local_xlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut)
5297 {
5298 ceph_assert(it->is_xlock());
5299 LocalLock *lock = static_cast<LocalLock*>(it->lock);
5300 dout(7) << "local_xlock_finish on " << *lock
5301 << " on " << *lock->get_parent() << dendl;
5302 lock->put_xlock();
5303 mut->locks.erase(it);
5304
5305 lock->finish_waiters(SimpleLock::WAIT_STABLE |
5306 SimpleLock::WAIT_WR |
5307 SimpleLock::WAIT_RD);
5308 }
5309
5310
5311
5312 // ==========================================================================
5313 // file lock
5314
5315
/*
 * Re-evaluate a stable auth file lock against what clients currently
 * want, and transition toward EXCL (single loner writer), MIX (multiple
 * writers), or SYNC (readers only) as appropriate.
 */
void Locker::file_eval(ScatterLock *lock, bool *need_issue)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  int loner_wanted, other_wanted;
  int wanted = in->get_caps_wanted(&loner_wanted, &other_wanted, CEPH_CAP_SFILE);
  dout(7) << "file_eval wanted=" << gcap_string(wanted)
	  << " loner_wanted=" << gcap_string(loner_wanted)
	  << " other_wanted=" << gcap_string(other_wanted)
	  << "  filelock=" << *lock << " on " << *lock->get_parent()
	  << dendl;

  ceph_assert(lock->get_parent()->is_auth());
  ceph_assert(lock->is_stable());

  // no state changes while the parent freezes (migration in progress)
  if (lock->get_parent()->is_freezing_or_frozen())
    return;

  if (mdcache->is_readonly()) {
    if (lock->get_state() != LOCK_SYNC) {
      dout(10) << "file_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
      simple_sync(lock, need_issue);
    }
    return;
  }

  // excl -> *?
  if (lock->get_state() == LOCK_EXCL) {
    dout(20) << " is excl" << dendl;
    int loner_issued, other_issued, xlocker_issued;
    in->get_caps_issued(&loner_issued, &other_issued, &xlocker_issued, CEPH_CAP_SFILE);
    dout(7) << "file_eval loner_issued=" << gcap_string(loner_issued)
            << " other_issued=" << gcap_string(other_issued)
	    << " xlocker_issued=" << gcap_string(xlocker_issued)
	    << dendl;
    // leave EXCL if the loner no longer wants/has write-ish caps, or
    // anyone else wants conflicting caps
    if (!((loner_wanted|loner_issued) & (CEPH_CAP_GEXCL|CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) ||
	(other_wanted & (CEPH_CAP_GEXCL|CEPH_CAP_GWR|CEPH_CAP_GRD)) ||
	(in->is_dir() && in->multiple_nonstale_caps())) {  // FIXME.. :/
      dout(20) << " should lose it" << dendl;
      // we should lose it.
      //  loner  other   want
      //  R      R       SYNC
      //  R      R|W     MIX
      //  R      W       MIX
      //  R|W    R       MIX
      //  R|W    R|W     MIX
      //  R|W    W       MIX
      //  W      R       MIX
      //  W      R|W     MIX
      //  W      W       MIX
      // -> any writer means MIX; RD doesn't matter.
      if (((other_wanted|loner_wanted) & CEPH_CAP_GWR) ||
	  lock->is_waiter_for(SimpleLock::WAIT_WR))
	scatter_mix(lock, need_issue);
      else if (!lock->is_wrlocked())   // let excl wrlocks drain first
	simple_sync(lock, need_issue);
      else
	dout(10) << " waiting for wrlock to drain" << dendl;
    }
  }

  // * -> excl?
  else if (lock->get_state() != LOCK_EXCL &&
	   !lock->is_rdlocked() &&
	   //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
	   in->get_target_loner() >= 0 &&
	   (in->is_dir() ?
	    !in->has_subtree_or_exporting_dirfrag() :
	    (wanted & (CEPH_CAP_GEXCL|CEPH_CAP_GWR|CEPH_CAP_GBUFFER)))) {
    dout(7) << "file_eval stable, bump to loner " << *lock
	    << " on " << *lock->get_parent() << dendl;
    file_excl(lock, need_issue);
  }

  // * -> mixed?
  else if (lock->get_state() != LOCK_MIX &&
	   !lock->is_rdlocked() &&
	   //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
	   (lock->get_scatter_wanted() ||
	    (in->get_target_loner() < 0 && (wanted & CEPH_CAP_GWR)))) {
    dout(7) << "file_eval stable, bump to mixed " << *lock
	    << " on " << *lock->get_parent() << dendl;
    scatter_mix(lock, need_issue);
  }

  // * -> sync?
  else if (lock->get_state() != LOCK_SYNC &&
	   !lock->is_wrlocked() &&   // drain wrlocks first!
	   !lock->is_waiter_for(SimpleLock::WAIT_WR) &&
	   !(wanted & CEPH_CAP_GWR) &&
	   !((lock->get_state() == LOCK_MIX) &&
	     in->is_dir() && in->has_subtree_or_exporting_dirfrag())  // if we are a delegation point, stay where we are
	   //((wanted & CEPH_CAP_RD) ||
	   //in->is_replicated() ||
	   //lock->is_leased() ||
	   //(!loner && lock->get_state() == LOCK_EXCL)) &&
	   ) {
    dout(7) << "file_eval stable, bump to sync " << *lock
	    << " on " << *lock->get_parent() << dendl;
    simple_sync(lock, need_issue);
  }
  else if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
    mds->mdcache->queue_file_recover(in);
  }
}
5420
5421
5422
/*
 * Move a stable auth scatterlock to MIX (scattered/writeable).  From
 * LOCK this is immediate; from other states we may need to gather
 * rdlocks, leases, replica state and conflicting caps first.
 */
void Locker::scatter_mix(ScatterLock *lock, bool *need_issue)
{
  dout(7) << "scatter_mix " << *lock << " on " << *lock->get_parent() << dendl;

  CInode *in = static_cast<CInode*>(lock->get_parent());
  ceph_assert(in->is_auth());
  ceph_assert(lock->is_stable());

  if (lock->get_state() == LOCK_LOCK) {
    // replicas are already LOCK; we can scatter immediately
    in->start_scatter(lock);
    if (in->is_replicated()) {
      // data
      bufferlist softdata;
      lock->encode_locked_state(softdata);

      // bcast to replicas
      send_lock_message(lock, LOCK_AC_MIX, softdata);
    }

    // change lock
    lock->set_state(LOCK_MIX);
    lock->clear_scatter_wanted();
    if (lock->get_cap_shift()) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
    }
  } else {
    // gather?
    switch (lock->get_state()) {
    case LOCK_SYNC: lock->set_state(LOCK_SYNC_MIX); break;
    case LOCK_EXCL: lock->set_state(LOCK_EXCL_MIX); break;
    case LOCK_XSYN: lock->set_state(LOCK_XSYN_MIX); break;
    case LOCK_TSYN: lock->set_state(LOCK_TSYN_MIX); break;
    default: ceph_abort();
    }

    // count everything that must drain before we can enter MIX
    int gather = 0;
    if (lock->is_rdlocked()) {
      if (lock->is_cached())
	invalidate_lock_caches(lock);
      gather++;
    }
    if (in->is_replicated()) {
      if (lock->get_state() == LOCK_SYNC_MIX) { // for the rest states, replicas are already LOCK
	send_lock_message(lock, LOCK_AC_MIX);
	lock->init_gather();
	gather++;
      }
    }
    if (lock->is_leased()) {
      revoke_client_leases(lock);
      gather++;
    }
    if (lock->get_cap_shift() &&
	in->is_head() &&
	in->issued_caps_need_gather(lock)) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
      gather++;
    }
    bool need_recover = false;
    if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
      mds->mdcache->queue_file_recover(in);
      need_recover = true;
      gather++;
    }

    if (gather) {
      // stay unstable until the gather completes (eval_gather finishes it)
      lock->get_parent()->auth_pin(lock);
      if (need_recover)
	mds->mdcache->do_file_recover();
    } else {
      // nothing to gather: complete the transition now
      in->start_scatter(lock);
      lock->set_state(LOCK_MIX);
      lock->clear_scatter_wanted();
      if (in->is_replicated()) {
	bufferlist softdata;
	lock->encode_locked_state(softdata);
	send_lock_message(lock, LOCK_AC_MIX, softdata);
      }
      if (lock->get_cap_shift()) {
	if (need_issue)
	  *need_issue = true;
	else
	  issue_caps(in);
      }
    }
  }
}
5516
5517
/*
 * Move a stable auth file lock to EXCL (single loner client holds
 * exclusive caps).  Gathers rdlocks/wrlocks, replica state, leases and
 * conflicting caps first if necessary.
 */
void Locker::file_excl(ScatterLock *lock, bool *need_issue)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  dout(7) << "file_excl " << *lock << " on " << *lock->get_parent() << dendl;

  ceph_assert(in->is_auth());
  ceph_assert(lock->is_stable());

  ceph_assert((in->get_loner() >= 0 && in->get_mds_caps_wanted().empty()) ||
	      (lock->get_state() == LOCK_XSYN));  // must do xsyn -> excl -> <anything else>

  switch (lock->get_state()) {
  case LOCK_SYNC: lock->set_state(LOCK_SYNC_EXCL); break;
  case LOCK_MIX: lock->set_state(LOCK_MIX_EXCL); break;
  case LOCK_LOCK: lock->set_state(LOCK_LOCK_EXCL); break;
  case LOCK_XSYN: lock->set_state(LOCK_XSYN_EXCL); break;
  default: ceph_abort();
  }
  int gather = 0;

  if (lock->is_rdlocked())
    gather++;
  if (lock->is_wrlocked())
    gather++;
  if (gather && lock->is_cached())
    invalidate_lock_caches(lock);

  if (in->is_replicated() &&
      lock->get_state() != LOCK_LOCK_EXCL &&
      lock->get_state() != LOCK_XSYN_EXCL) {  // if we were lock, replicas are already lock.
    send_lock_message(lock, LOCK_AC_LOCK);
    lock->init_gather();
    gather++;
  }
  if (lock->is_leased()) {
    revoke_client_leases(lock);
    gather++;
  }
  // revoke caps that conflict with EXCL from non-loner clients
  if (in->is_head() &&
      in->issued_caps_need_gather(lock)) {
    if (need_issue)
      *need_issue = true;
    else
      issue_caps(in);
    gather++;
  }
  bool need_recover = false;
  if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
    mds->mdcache->queue_file_recover(in);
    need_recover = true;
    gather++;
  }

  if (gather) {
    // remain in the intermediate state until eval_gather() completes it
    lock->get_parent()->auth_pin(lock);
    if (need_recover)
      mds->mdcache->do_file_recover();
  } else {
    lock->set_state(LOCK_EXCL);
    if (need_issue)
      *need_issue = true;
    else
      issue_caps(in);
  }
}
5583
/*
 * Move a file lock from EXCL to XSYN, letting the loner keep exclusive
 * caps minus Rd/Wr (reads/writes go through the MDS).  Only valid when
 * there is a loner and no other mds wants caps.
 */
void Locker::file_xsyn(SimpleLock *lock, bool *need_issue)
{
  dout(7) << "file_xsyn on " << *lock << " on " << *lock->get_parent() << dendl;
  CInode *in = static_cast<CInode *>(lock->get_parent());
  ceph_assert(in->is_auth());
  ceph_assert(in->get_loner() >= 0 && in->get_mds_caps_wanted().empty());

  switch (lock->get_state()) {
  case LOCK_EXCL: lock->set_state(LOCK_EXCL_XSYN); break;
  default: ceph_abort();
  }

  // drain wrlocks before completing the transition
  int gather = 0;
  if (lock->is_wrlocked()) {
    if (lock->is_cached())
      invalidate_lock_caches(lock);
    gather++;
  }

  // revoke caps that conflict with XSYN
  if (in->is_head() &&
      in->issued_caps_need_gather(lock)) {
    if (need_issue)
      *need_issue = true;
    else
      issue_caps(in);
    gather++;
  }

  if (gather) {
    // stay in EXCL_XSYN until eval_gather() finishes the transition
    lock->get_parent()->auth_pin(lock);
  } else {
    lock->set_state(LOCK_XSYN);
    lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
    if (need_issue)
      *need_issue = true;
    else
      issue_caps(in);
  }
}
5623
/*
 * Transition a file lock from PRE_SCAN to SCAN so file recovery (size/
 * mtime probe) can run; if conflicting caps must be revoked first, mark
 * the inode NEEDSRECOVER instead of queueing recovery immediately.
 */
void Locker::file_recover(ScatterLock *lock)
{
  CInode *in = static_cast<CInode *>(lock->get_parent());
  dout(7) << "file_recover " << *lock << " on " << *in << dendl;

  ceph_assert(in->is_auth());
  //assert(lock->is_stable());
  ceph_assert(lock->get_state() == LOCK_PRE_SCAN); // only called from MDCache::start_files_to_recover()

  int gather = 0;

  /*
  if (in->is_replicated()
      lock->get_sm()->states[oldstate].replica_state != LOCK_LOCK) {
    send_lock_message(lock, LOCK_AC_LOCK);
    lock->init_gather();
    gather++;
  }
  */
  if (in->is_head() &&
      in->issued_caps_need_gather(lock)) {
    issue_caps(in);
    gather++;
  }

  lock->set_state(LOCK_SCAN);
  if (gather)
    // recovery will be queued once the cap gather completes
    in->state_set(CInode::STATE_NEEDSRECOVER);
  else
    mds->mdcache->queue_file_recover(in);
}
5655
5656
5657 // messenger
// messenger
/*
 * Handle an inter-MDS lock protocol message for a file/scatter lock.
 * The first group of actions (SYNC/LOCK/LOCKFLUSHED/MIX) is processed
 * on replicas; the ACK actions on the auth; the REQ*/NUDGE actions are
 * replica requests handled by the auth.
 */
void Locker::handle_file_lock(ScatterLock *lock, const cref_t<MLock> &m)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  int from = m->get_asker();

  // during rejoin, lock state for still-rejoining inodes is not yet
  // settled; drop the message (peer will resolve via rejoin protocol)
  if (mds->is_rejoin()) {
    if (in->is_rejoining()) {
      dout(7) << "handle_file_lock still rejoining " << *in
	      << ", dropping " << *m << dendl;
      return;
    }
  }

  dout(7) << "handle_file_lock a=" << lock->get_lock_action_name(m->get_action())
	  << " on " << *lock
	  << " from mds." << from << " "
	  << *in << dendl;

  bool caps = lock->get_cap_shift();

  switch (m->get_action()) {
    // -- replica --
  case LOCK_AC_SYNC:
    ceph_assert(lock->get_state() == LOCK_LOCK ||
		lock->get_state() == LOCK_MIX ||
		lock->get_state() == LOCK_MIX_SYNC2);

    if (lock->get_state() == LOCK_MIX) {
      // must flush local MIX state back to auth before going SYNC
      lock->set_state(LOCK_MIX_SYNC);
      eval_gather(lock, true);
      if (lock->is_unstable_and_locked()) {
	if (lock->is_cached())
	  invalidate_lock_caches(lock);
	mds->mdlog->flush();
      }
      break;
    }

    (static_cast<ScatterLock *>(lock))->finish_flush();
    (static_cast<ScatterLock *>(lock))->clear_flushed();

    // ok
    lock->decode_locked_state(m->get_data());
    lock->set_state(LOCK_SYNC);

    // hold a temp rdlock so waiters run before state can change again
    lock->get_rdlock();
    if (caps)
      issue_caps(in);
    lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
    lock->put_rdlock();
    break;

  case LOCK_AC_LOCK:
    switch (lock->get_state()) {
    case LOCK_SYNC: lock->set_state(LOCK_SYNC_LOCK); break;
    case LOCK_MIX: lock->set_state(LOCK_MIX_LOCK); break;
    default: ceph_abort();
    }

    eval_gather(lock, true);
    if (lock->is_unstable_and_locked()) {
      if (lock->is_cached())
	invalidate_lock_caches(lock);
      mds->mdlog->flush();
    }

    break;

  case LOCK_AC_LOCKFLUSHED:
    (static_cast<ScatterLock *>(lock))->finish_flush();
    (static_cast<ScatterLock *>(lock))->clear_flushed();
    // wake up scatter_nudge waiters
    if (lock->is_stable())
      lock->finish_waiters(SimpleLock::WAIT_STABLE);
    break;

  case LOCK_AC_MIX:
    ceph_assert(lock->get_state() == LOCK_SYNC ||
		lock->get_state() == LOCK_LOCK ||
		lock->get_state() == LOCK_SYNC_MIX2);

    if (lock->get_state() == LOCK_SYNC) {
      // MIXED
      lock->set_state(LOCK_SYNC_MIX);
      eval_gather(lock, true);
      if (lock->is_unstable_and_locked()) {
	if (lock->is_cached())
	  invalidate_lock_caches(lock);
	mds->mdlog->flush();
      }
      break;
    }

    // ok
    lock->set_state(LOCK_MIX);
    lock->decode_locked_state(m->get_data());

    if (caps)
      issue_caps(in);

    lock->finish_waiters(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE);
    break;


    // -- auth --
  case LOCK_AC_LOCKACK:
    ceph_assert(lock->get_state() == LOCK_SYNC_LOCK ||
		lock->get_state() == LOCK_MIX_LOCK ||
		lock->get_state() == LOCK_MIX_LOCK2 ||
		lock->get_state() == LOCK_MIX_EXCL ||
		lock->get_state() == LOCK_SYNC_EXCL ||
		lock->get_state() == LOCK_SYNC_MIX ||
		lock->get_state() == LOCK_MIX_TSYN);
    ceph_assert(lock->is_gathering(from));
    lock->remove_gather(from);

    if (lock->get_state() == LOCK_MIX_LOCK ||
	lock->get_state() == LOCK_MIX_LOCK2 ||
	lock->get_state() == LOCK_MIX_EXCL ||
	lock->get_state() == LOCK_MIX_TSYN) {
      lock->decode_locked_state(m->get_data());
      // replica is waiting for AC_LOCKFLUSHED, eval_gather() should not
      // delay calling scatter_writebehind().
      lock->clear_flushed();
    }

    if (lock->is_gathering()) {
      dout(7) << "handle_file_lock " << *in << " from " << from
	      << ", still gathering " << lock->get_gather_set() << dendl;
    } else {
      dout(7) << "handle_file_lock " << *in << " from " << from
	      << ", last one" << dendl;
      eval_gather(lock);
    }
    break;

  case LOCK_AC_SYNCACK:
    ceph_assert(lock->get_state() == LOCK_MIX_SYNC);
    ceph_assert(lock->is_gathering(from));
    lock->remove_gather(from);

    lock->decode_locked_state(m->get_data());

    if (lock->is_gathering()) {
      dout(7) << "handle_file_lock " << *in << " from " << from
	      << ", still gathering " << lock->get_gather_set() << dendl;
    } else {
      dout(7) << "handle_file_lock " << *in << " from " << from
	      << ", last one" << dendl;
      eval_gather(lock);
    }
    break;

  case LOCK_AC_MIXACK:
    ceph_assert(lock->get_state() == LOCK_SYNC_MIX);
    ceph_assert(lock->is_gathering(from));
    lock->remove_gather(from);

    if (lock->is_gathering()) {
      dout(7) << "handle_file_lock " << *in << " from " << from
	      << ", still gathering " << lock->get_gather_set() << dendl;
    } else {
      dout(7) << "handle_file_lock " << *in << " from " << from
	      << ", last one" << dendl;
      eval_gather(lock);
    }
    break;


    // requests....
  case LOCK_AC_REQSCATTER:
    if (lock->is_stable()) {
      /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
       *  because the replica should be holding an auth_pin if they're
       *  doing this (and thus, we are freezing, not frozen, and indefinite
       *  starvation isn't an issue).
       */
      dout(7) << "handle_file_lock got scatter request on " << *lock
	      << " on " << *lock->get_parent() << dendl;
      if (lock->get_state() != LOCK_MIX)  // i.e., the reqscatter didn't race with an actual mix/scatter
	scatter_mix(lock);
    } else {
      dout(7) << "handle_file_lock got scatter request, !stable, marking scatter_wanted on " << *lock
	      << " on " << *lock->get_parent() << dendl;
      // remember the request; scatter_eval() acts on it once stable
      lock->set_scatter_wanted();
    }
    break;

  case LOCK_AC_REQUNSCATTER:
    if (lock->is_stable()) {
      /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
       *  because the replica should be holding an auth_pin if they're
       *  doing this (and thus, we are freezing, not frozen, and indefinite
       *  starvation isn't an issue).
       */
      dout(7) << "handle_file_lock got unscatter request on " << *lock
	      << " on " << *lock->get_parent() << dendl;
      if (lock->get_state() == LOCK_MIX)  // i.e., the reqscatter didn't race with an actual mix/scatter
	simple_lock(lock);  // FIXME tempsync?
    } else {
      dout(7) << "handle_file_lock ignoring unscatter request on " << *lock
	      << " on " << *lock->get_parent() << dendl;
      lock->set_unscatter_wanted();
    }
    break;

  case LOCK_AC_REQRDLOCK:
    handle_reqrdlock(lock, m);
    break;

  case LOCK_AC_NUDGE:
    if (!lock->get_parent()->is_auth()) {
      dout(7) << "handle_file_lock IGNORING nudge on non-auth " << *lock
	      << " on " << *lock->get_parent() << dendl;
    } else if (!lock->get_parent()->is_replicated()) {
      dout(7) << "handle_file_lock IGNORING nudge on non-replicated " << *lock
	      << " on " << *lock->get_parent() << dendl;
    } else {
      dout(7) << "handle_file_lock trying nudge on " << *lock
	      << " on " << *lock->get_parent() << dendl;
      scatter_nudge(lock, 0, true);
      mds->mdlog->flush();
    }
    break;

  default:
    ceph_abort();
  }
}