]> git.proxmox.com Git - ceph.git/blob - ceph/src/mds/Locker.cc
import ceph nautilus 14.2.2
[ceph.git] / ceph / src / mds / Locker.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include <string_view>
16
17 #include "MDSRank.h"
18 #include "MDCache.h"
19 #include "Locker.h"
20 #include "MDBalancer.h"
21 #include "Migrator.h"
22 #include "CInode.h"
23 #include "CDir.h"
24 #include "CDentry.h"
25 #include "Mutation.h"
26 #include "MDSContext.h"
27
28 #include "MDLog.h"
29 #include "MDSMap.h"
30
31 #include "events/EUpdate.h"
32 #include "events/EOpen.h"
33
34 #include "msg/Messenger.h"
35 #include "osdc/Objecter.h"
36
37 #include "messages/MInodeFileCaps.h"
38 #include "messages/MLock.h"
39 #include "messages/MClientLease.h"
40 #include "messages/MClientReply.h"
41 #include "messages/MClientCaps.h"
42 #include "messages/MClientCapRelease.h"
43
44 #include "messages/MMDSSlaveRequest.h"
45
46 #include <errno.h>
47
48 #include "common/config.h"
49
50
51 #define dout_subsys ceph_subsys_mds
52 #undef dout_prefix
53 #define dout_context g_ceph_context
54 #define dout_prefix _prefix(_dout, mds)
55 static ostream& _prefix(std::ostream *_dout, MDSRank *mds) {
56 return *_dout << "mds." << mds->get_nodeid() << ".locker ";
57 }
58
59
60 class LockerContext : public MDSContext {
61 protected:
62 Locker *locker;
63 MDSRank *get_mds() override
64 {
65 return locker->mds;
66 }
67
68 public:
69 explicit LockerContext(Locker *locker_) : locker(locker_) {
70 ceph_assert(locker != NULL);
71 }
72 };
73
74 class LockerLogContext : public MDSLogContextBase {
75 protected:
76 Locker *locker;
77 MDSRank *get_mds() override
78 {
79 return locker->mds;
80 }
81
82 public:
83 explicit LockerLogContext(Locker *locker_) : locker(locker_) {
84 ceph_assert(locker != NULL);
85 }
86 };
87
88 Locker::Locker(MDSRank *m, MDCache *c) :
89 mds(m), mdcache(c), need_snapflush_inodes(member_offset(CInode, item_caps)) {}
90
91
92 void Locker::dispatch(const Message::const_ref &m)
93 {
94
95 switch (m->get_type()) {
96 // inter-mds locking
97 case MSG_MDS_LOCK:
98 handle_lock(MLock::msgref_cast(m));
99 break;
100 // inter-mds caps
101 case MSG_MDS_INODEFILECAPS:
102 handle_inode_file_caps(MInodeFileCaps::msgref_cast(m));
103 break;
104 // client sync
105 case CEPH_MSG_CLIENT_CAPS:
106 handle_client_caps(MClientCaps::msgref_cast(m));
107 break;
108 case CEPH_MSG_CLIENT_CAPRELEASE:
109 handle_client_cap_release(MClientCapRelease::msgref_cast(m));
110 break;
111 case CEPH_MSG_CLIENT_LEASE:
112 handle_client_lease(MClientLease::msgref_cast(m));
113 break;
114 default:
115 derr << "locker unknown message " << m->get_type() << dendl;
116 ceph_abort_msg("locker unknown message");
117 }
118 }
119
120 void Locker::tick()
121 {
122 scatter_tick();
123 caps_tick();
124 }
125
126 /*
127 * locks vs rejoin
128 *
129 *
130 *
131 */
132
133 void Locker::send_lock_message(SimpleLock *lock, int msg)
134 {
135 for (const auto &it : lock->get_parent()->get_replicas()) {
136 if (mds->is_cluster_degraded() &&
137 mds->mdsmap->get_state(it.first) < MDSMap::STATE_REJOIN)
138 continue;
139 auto m = MLock::create(lock, msg, mds->get_nodeid());
140 mds->send_message_mds(m, it.first);
141 }
142 }
143
144 void Locker::send_lock_message(SimpleLock *lock, int msg, const bufferlist &data)
145 {
146 for (const auto &it : lock->get_parent()->get_replicas()) {
147 if (mds->is_cluster_degraded() &&
148 mds->mdsmap->get_state(it.first) < MDSMap::STATE_REJOIN)
149 continue;
150 auto m = MLock::create(lock, msg, mds->get_nodeid());
151 m->set_data(data);
152 mds->send_message_mds(m, it.first);
153 }
154 }
155
156
157
158
159 void Locker::include_snap_rdlocks(CInode *in, MutationImpl::LockOpVec& lov)
160 {
161 // rdlock ancestor snaps
162 CInode *t = in;
163 while (t->get_projected_parent_dn()) {
164 t = t->get_projected_parent_dn()->get_dir()->get_inode();
165 lov.add_rdlock(&t->snaplock);
166 }
167 lov.add_rdlock(&in->snaplock);
168 }
169
170 void Locker::include_snap_rdlocks_wlayout(CInode *in, MutationImpl::LockOpVec& lov,
171 file_layout_t **layout)
172 {
173 //rdlock ancestor snaps
174 CInode *t = in;
175 lov.add_rdlock(&in->snaplock);
176 lov.add_rdlock(&in->policylock);
177 bool found_layout = false;
178 while (t) {
179 lov.add_rdlock(&t->snaplock);
180 if (!found_layout) {
181 lov.add_rdlock(&t->policylock);
182 if (t->get_projected_inode()->has_layout()) {
183 *layout = &t->get_projected_inode()->layout;
184 found_layout = true;
185 }
186 }
187 if (t->get_projected_parent_dn() &&
188 t->get_projected_parent_dn()->get_dir())
189 t = t->get_projected_parent_dn()->get_dir()->get_inode();
190 else t = NULL;
191 }
192 }
193
194 struct MarkEventOnDestruct {
195 MDRequestRef& mdr;
196 std::string_view message;
197 bool mark_event;
198 MarkEventOnDestruct(MDRequestRef& _mdr, std::string_view _message) :
199 mdr(_mdr),
200 message(_message),
201 mark_event(true) {}
202 ~MarkEventOnDestruct() {
203 if (mark_event)
204 mdr->mark_event(message);
205 }
206 };
207
/* If this function returns false, the mdr has been placed
 * on the appropriate wait list */
/*
 * Attempt to take every lock in @lov (in sorted order) for request @mdr.
 * Phases:
 *   1. scan lov, collecting the set of objects that must be auth-pinned
 *      (and augmenting xlocks with version-lock ops);
 *   2. auth-pin locally, or ship OP_AUTHPIN slave requests for
 *      non-auth objects (returning false while we wait);
 *   3. walk lov against mdr->locks, reusing locks already held,
 *      dropping strays, and acquiring what is missing.
 * Returns true only once every lock is held; on any false return the
 * mdr has been queued on the appropriate waiter and will be retried.
 */
bool Locker::acquire_locks(MDRequestRef& mdr,
                           MutationImpl::LockOpVec& lov,
                           CInode *auth_pin_freeze,
                           bool auth_pin_nonblock)
{
  if (mdr->done_locking &&
      !mdr->is_slave()) { // not on slaves! master requests locks piecemeal.
    dout(10) << "acquire_locks " << *mdr << " - done locking" << dendl;
    return true; // at least we had better be!
  }
  dout(10) << "acquire_locks " << *mdr << dendl;

  // records "failed to acquire_locks" on the request unless we reach
  // the success path (which overwrites marker.message)
  MarkEventOnDestruct marker(mdr, "failed to acquire_locks");

  client_t client = mdr->get_client();

  set<MDSCacheObject*> mustpin; // items to authpin

  // xlocks
  // Phase 1: classify each requested lock op and build the auth-pin set.
  for (int i = 0, size = lov.size(); i < size; ++i) {
    auto& p = lov[i];
    SimpleLock *lock = p.lock;
    MDSCacheObject *object = lock->get_parent();

    if (p.is_xlock()) {
      if ((lock->get_type() == CEPH_LOCK_ISNAP ||
           lock->get_type() == CEPH_LOCK_IPOLICY) &&
          mds->is_cluster_degraded() &&
          mdr->is_master() &&
          !mdr->is_queued_for_replay()) {
        // waiting for recovering mds, to guarantee replayed requests and mksnap/setlayout
        // get processed in proper order.
        bool wait = false;
        if (object->is_auth()) {
          // only wait if we have not already taken this lock
          if (!mdr->locks.count(lock)) {
            set<mds_rank_t> ls;
            object->list_replicas(ls);
            for (auto m : ls) {
              if (mds->mdsmap->get_state(m) < MDSMap::STATE_ACTIVE) {
                wait = true;
                break;
              }
            }
          }
        } else {
          // if the lock is the latest locked one, it's possible that slave mds got the lock
          // while there are recovering mds.
          if (!mdr->locks.count(lock) || lock == *mdr->locks.rbegin())
            wait = true;
        }
        if (wait) {
          dout(10) << " must xlock " << *lock << " " << *object
                   << ", waiting for cluster recovered" << dendl;
          // drop everything and retry once the cluster has recovered
          mds->locker->drop_locks(mdr.get(), NULL);
          mdr->drop_local_auth_pins();
          mds->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache, mdr));
          return false;
        }
      }

      dout(20) << " must xlock " << *lock << " " << *object << dendl;

      mustpin.insert(object);

      // augment xlock with a versionlock?
      if (lock->get_type() == CEPH_LOCK_DN) {
        CDentry *dn = static_cast<CDentry*>(object);
        if (!dn->is_auth())
          continue;
        if (mdr->is_master()) {
          // master. wrlock versionlock so we can pipeline dentry updates to journal.
          lov.add_wrlock(&dn->versionlock);
        } else {
          // slave. exclusively lock the dentry version (i.e. block other journal updates).
          // this makes rollback safe.
          lov.add_xlock(&dn->versionlock);
        }
      }
      if (lock->get_type() > CEPH_LOCK_IVERSION) {
        // inode version lock?
        CInode *in = static_cast<CInode*>(object);
        if (!in->is_auth())
          continue;
        if (mdr->is_master()) {
          // master. wrlock versionlock so we can pipeline inode updates to journal.
          lov.add_wrlock(&in->versionlock);
        } else {
          // slave. exclusively lock the inode version (i.e. block other journal updates).
          // this makes rollback safe.
          lov.add_xlock(&in->versionlock);
        }
      }
    } else if (p.is_wrlock()) {
      dout(20) << " must wrlock " << *lock << " " << *object << dendl;
      if (object->is_auth()) {
        mustpin.insert(object);
      } else if (!object->is_auth() &&
                 !lock->can_wrlock(client) && // we might have to request a scatter
                 !mdr->is_slave()) { // if we are slave (remote_wrlock), the master already authpinned
        dout(15) << " will also auth_pin " << *object
                 << " in case we need to request a scatter" << dendl;
        mustpin.insert(object);
      }
    } else if (p.is_remote_wrlock()) {
      dout(20) << " must remote_wrlock on mds." << p.wrlock_target << " "
               << *lock << " " << *object << dendl;
      mustpin.insert(object);
    } else if (p.is_rdlock()) {

      dout(20) << " must rdlock " << *lock << " " << *object << dendl;
      if (object->is_auth()) {
        mustpin.insert(object);
      } else if (!object->is_auth() &&
                 !lock->can_rdlock(client)) { // we might have to request an rdlock
        dout(15) << " will also auth_pin " << *object
                 << " in case we need to request a rdlock" << dendl;
        mustpin.insert(object);
      }
    } else {
      ceph_assert(0 == "locker unknown lock operation");
    }
  }

  // canonical sorted order is what lets us match lov against
  // mdr->locks positionally below
  lov.sort_and_merge();

  // AUTH PINS
  map<mds_rank_t, set<MDSCacheObject*> > mustpin_remote; // mds -> (object set)

  // can i auth pin them all now?
  // Phase 2a: verify everything in mustpin can be pinned (or record
  // which remote mds must pin it for us).
  marker.message = "failed to authpin local pins";
  for (const auto &p : mustpin) {
    MDSCacheObject *object = p;

    dout(10) << " must authpin " << *object << dendl;

    if (mdr->is_auth_pinned(object)) {
      if (object != (MDSCacheObject*)auth_pin_freeze)
        continue;
      if (mdr->more()->is_remote_frozen_authpin) {
        if (mdr->more()->rename_inode == auth_pin_freeze)
          continue;
        // unfreeze auth pin for the wrong inode
        // NOTE: .size() is called only for its side effect of
        // default-constructing an empty entry, which makes us send an
        // (empty) OP_AUTHPIN to that mds below to drop the stale freeze.
        mustpin_remote[mdr->more()->rename_inode->authority().first].size();
      }
    }

    if (!object->is_auth()) {
      // drop any locks we already hold before blocking/remote-pinning
      if (!mdr->locks.empty())
        drop_locks(mdr.get());
      if (object->is_ambiguous_auth()) {
        // wait
        marker.message = "waiting for single auth, object is being migrated";
        dout(10) << " ambiguous auth, waiting to authpin " << *object << dendl;
        object->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_MDS_RetryRequest(mdcache, mdr));
        mdr->drop_local_auth_pins();
        return false;
      }
      mustpin_remote[object->authority().first].insert(object);
      continue;
    }
    int err = 0;
    if (!object->can_auth_pin(&err)) {
      // wait
      drop_locks(mdr.get());
      mdr->drop_local_auth_pins();
      if (auth_pin_nonblock) {
        // caller asked us not to block on a freezing object
        dout(10) << " can't auth_pin (freezing?) " << *object << ", nonblocking" << dendl;
        mdr->aborted = true;
        return false;
      }
      if (err == MDSCacheObject::ERR_EXPORTING_TREE) {
        marker.message = "failed to authpin, subtree is being exported";
      } else if (err == MDSCacheObject::ERR_FRAGMENTING_DIR) {
        marker.message = "failed to authpin, dir is being fragmented";
      } else if (err == MDSCacheObject::ERR_EXPORTING_INODE) {
        marker.message = "failed to authpin, inode is being exported";
      }
      dout(10) << " can't auth_pin (freezing?), waiting to authpin " << *object << dendl;
      object->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_MDS_RetryRequest(mdcache, mdr));

      // tell the freezer a remote-pinned request is now blocked on it
      if (!mdr->remote_auth_pins.empty())
        notify_freeze_waiter(object);

      return false;
    }
  }

  // ok, grab local auth pins
  for (const auto& p : mustpin) {
    MDSCacheObject *object = p;
    if (mdr->is_auth_pinned(object)) {
      dout(10) << " already auth_pinned " << *object << dendl;
    } else if (object->is_auth()) {
      dout(10) << " auth_pinning " << *object << dendl;
      mdr->auth_pin(object);
    }
  }

  // request remote auth_pins
  // Phase 2b: ship OP_AUTHPIN to each remote authority; we always
  // return false here and retry when the acks arrive.
  if (!mustpin_remote.empty()) {
    marker.message = "requesting remote authpins";
    // fold already-held remote pins into the request so the slave can
    // re-validate them
    for (const auto& p : mdr->remote_auth_pins) {
      if (mustpin.count(p.first)) {
        ceph_assert(p.second == p.first->authority().first);
        map<mds_rank_t, set<MDSCacheObject*> >::iterator q = mustpin_remote.find(p.second);
        if (q != mustpin_remote.end())
          q->second.insert(p.first);
      }
    }
    for (map<mds_rank_t, set<MDSCacheObject*> >::iterator p = mustpin_remote.begin();
         p != mustpin_remote.end();
         ++p) {
      dout(10) << "requesting remote auth_pins from mds." << p->first << dendl;

      // wait for active auth
      if (mds->is_cluster_degraded() &&
          !mds->mdsmap->is_clientreplay_or_active_or_stopping(p->first)) {
        dout(10) << " mds." << p->first << " is not active" << dendl;
        if (mdr->more()->waiting_on_slave.empty())
          mds->wait_for_active_peer(p->first, new C_MDS_RetryRequest(mdcache, mdr));
        return false;
      }

      auto req = MMDSSlaveRequest::create(mdr->reqid, mdr->attempt, MMDSSlaveRequest::OP_AUTHPIN);
      for (set<MDSCacheObject*>::iterator q = p->second.begin();
           q != p->second.end();
           ++q) {
        dout(10) << " req remote auth_pin of " << **q << dendl;
        MDSCacheObjectInfo info;
        (*q)->set_object_info(info);
        req->get_authpins().push_back(info);
        if (*q == auth_pin_freeze)
          (*q)->set_object_info(req->get_authpin_freeze());
        mdr->pin(*q);
      }
      if (auth_pin_nonblock)
        req->mark_nonblock();
      mds->send_message_mds(req, p->first);

      // put in waiting list
      ceph_assert(mdr->more()->waiting_on_slave.count(p->first) == 0);
      mdr->more()->waiting_on_slave.insert(p->first);
    }
    return false;
  }

  // caps i'll need to issue
  set<CInode*> issue_set;
  bool result = false;

  // acquire locks.
  // make sure they match currently acquired locks.
  // Phase 3: walk lov and mdr->locks in parallel (both sorted).
  auto existing = mdr->locks.begin();
  for (const auto& p : lov) {
    bool need_wrlock = p.is_wrlock();
    bool need_remote_wrlock = p.is_remote_wrlock();

    // already locked?
    if (existing != mdr->locks.end() && existing->lock == p.lock) {
      // right kind?
      auto it = existing++;
      auto have = *it; // don't reference

      if (have.is_xlock() && p.is_xlock()) {
        dout(10) << " already xlocked " << *have.lock << " " << *have.lock->get_parent() << dendl;
        continue;
      }

      if (have.is_remote_wrlock() &&
          (!need_remote_wrlock || have.wrlock_target != p.wrlock_target)) {
        dout(10) << " unlocking remote_wrlock on wrong mds." << have.wrlock_target
                 << " " << *have.lock << " " << *have.lock->get_parent() << dendl;
        remote_wrlock_finish(it, mdr.get());
        have.clear_remote_wrlock();
      }

      if (need_wrlock || need_remote_wrlock) {
        if (need_wrlock == have.is_wrlock() &&
            need_remote_wrlock == have.is_remote_wrlock()) {
          if (need_wrlock)
            dout(10) << " already wrlocked " << *have.lock << " " << *have.lock->get_parent() << dendl;
          if (need_remote_wrlock)
            dout(10) << " already remote_wrlocked " << *have.lock << " " << *have.lock->get_parent() << dendl;
          continue;
        }

        if (have.is_wrlock()) {
          if (!need_wrlock)
            dout(10) << " unlocking extra " << *have.lock << " " << *have.lock->get_parent() << dendl;
          else if (need_remote_wrlock) // acquire remote_wrlock first
            dout(10) << " unlocking out-of-order " << *have.lock << " " << *have.lock->get_parent() << dendl;
          bool need_issue = false;
          wrlock_finish(it, mdr.get(), &need_issue);
          if (need_issue)
            issue_set.insert(static_cast<CInode*>(have.lock->get_parent()));
        }
      } else if (have.is_rdlock() && p.is_rdlock()) {
        dout(10) << " already rdlocked " << *have.lock << " " << *have.lock->get_parent() << dendl;
        continue;
      }
    }

    // hose any stray locks
    // We are about to acquire p.lock fresh; to preserve global lock
    // ordering, every already-held lock that sorts after it must be
    // released first.
    while (existing != mdr->locks.end()) {
      auto it = existing++;
      auto stray = *it; // don't reference
      dout(10) << " unlocking out-of-order " << *stray.lock << " " << *stray.lock->get_parent() << dendl;
      bool need_issue = false;
      if (stray.is_xlock()) {
        xlock_finish(it, mdr.get(), &need_issue);
      } else if (stray.is_rdlock()) {
        rdlock_finish(it, mdr.get(), &need_issue);
      } else {
        // may have acquired both wrlock and remore wrlock
        if (stray.is_wrlock())
          wrlock_finish(it, mdr.get(), &need_issue);
        if (stray.is_remote_wrlock())
          remote_wrlock_finish(it, mdr.get());
      }
      if (need_issue)
        issue_set.insert(static_cast<CInode*>(stray.lock->get_parent()));
    }

    // lock
    // cancel any half-taken lock that isn't the one we want now
    if (mdr->locking && p.lock != mdr->locking) {
      cancel_locking(mdr.get(), &issue_set);
    }
    if (p.is_xlock()) {
      marker.message = "failed to xlock, waiting";
      if (!xlock_start(p.lock, mdr))
        goto out;
      dout(10) << " got xlock on " << *p.lock << " " << *p.lock->get_parent() << dendl;
    } else if (need_wrlock || need_remote_wrlock) {
      if (need_remote_wrlock && !mdr->is_remote_wrlocked(p)) {
        marker.message = "waiting for remote wrlocks";
        remote_wrlock_start(p, p.wrlock_target, mdr);
        goto out;
      }
      if (need_wrlock) {
        marker.message = "failed to wrlock, waiting";
        if (need_remote_wrlock && !p.lock->can_wrlock(mdr->get_client())) {
          marker.message = "failed to wrlock, dropping remote wrlock and waiting";
          // can't take the wrlock because the scatter lock is gathering. need to
          // release the remote wrlock, so that the gathering process can finish.
          // NOTE(review): incrementing end() of a standard container is
          // undefined behavior; the intent appears to be to reach the
          // entry for the remote wrlock just confirmed above — verify
          // against upstream (later releases look this iterator up).
          auto it = mdr->locks.end();
          ++it;
          remote_wrlock_finish(it, mdr.get());
          remote_wrlock_start(p, p.wrlock_target, mdr);
          goto out;
        }
        // nowait if we have already gotten remote wrlock
        if (!wrlock_start(p, mdr, need_remote_wrlock))
          goto out;
        dout(10) << " got wrlock on " << *p.lock << " " << *p.lock->get_parent() << dendl;
      }
    } else {
      ceph_assert(mdr->is_master());
      if (p.lock->needs_recover()) {
        if (mds->is_cluster_degraded()) {
          if (!mdr->is_queued_for_replay()) {
            // see comments in SimpleLock::set_state_rejoin() and
            // ScatterLock::encode_state_for_rejoin()
            drop_locks(mdr.get());
            mds->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache, mdr));
            dout(10) << " rejoin recovering " << *p.lock << " " << *p.lock->get_parent()
                     << ", waiting for cluster recovered" << dendl;
            marker.message = "rejoin recovering lock, waiting for cluster recovered";
            return false;
          }
        } else {
          p.lock->clear_need_recover();
        }
      }

      marker.message = "failed to rdlock, waiting";
      if (!rdlock_start(p, mdr))
        goto out;
      dout(10) << " got rdlock on " << *p.lock << " " << *p.lock->get_parent() << dendl;
    }
  }

  // any extra unneeded locks?
  // Everything remaining in mdr->locks sorts after the last lov entry
  // and is no longer wanted.
  while (existing != mdr->locks.end()) {
    auto it = existing++;
    auto stray = *it;
    dout(10) << " unlocking extra " << *stray.lock << " " << *stray.lock->get_parent() << dendl;
    bool need_issue = false;
    if (stray.is_xlock()) {
      xlock_finish(it, mdr.get(), &need_issue);
    } else if (stray.is_rdlock()) {
      rdlock_finish(it, mdr.get(), &need_issue);
    } else {
      // may have acquired both wrlock and remore wrlock
      if (stray.is_wrlock())
        wrlock_finish(it, mdr.get(), &need_issue);
      if (stray.is_remote_wrlock())
        remote_wrlock_finish(it, mdr.get());
    }
    if (need_issue)
      issue_set.insert(static_cast<CInode*>(stray.lock->get_parent()));
  }

  // success: all locks held
  mdr->done_locking = true;
  mdr->set_mds_stamp(ceph_clock_now());
  result = true;
  marker.message = "acquired locks";

 out:
  // (re)issue caps for any inode whose lock state changed along the way
  issue_caps_set(issue_set);
  return result;
}
621
622 void Locker::notify_freeze_waiter(MDSCacheObject *o)
623 {
624 CDir *dir = NULL;
625 if (CInode *in = dynamic_cast<CInode*>(o)) {
626 if (!in->is_root())
627 dir = in->get_parent_dir();
628 } else if (CDentry *dn = dynamic_cast<CDentry*>(o)) {
629 dir = dn->get_dir();
630 } else {
631 dir = dynamic_cast<CDir*>(o);
632 ceph_assert(dir);
633 }
634 if (dir) {
635 if (dir->is_freezing_dir())
636 mdcache->fragment_freeze_inc_num_waiters(dir);
637 if (dir->is_freezing_tree()) {
638 while (!dir->is_freezing_tree_root())
639 dir = dir->get_parent_dir();
640 mdcache->migrator->export_freeze_inc_num_waiters(dir);
641 }
642 }
643 }
644
645 void Locker::set_xlocks_done(MutationImpl *mut, bool skip_dentry)
646 {
647 for (const auto &p : mut->locks) {
648 if (!p.is_xlock())
649 continue;
650 MDSCacheObject *obj = p.lock->get_parent();
651 ceph_assert(obj->is_auth());
652 if (skip_dentry &&
653 (p.lock->get_type() == CEPH_LOCK_DN || p.lock->get_type() == CEPH_LOCK_DVERSION))
654 continue;
655 dout(10) << "set_xlocks_done on " << *p.lock << " " << *obj << dendl;
656 p.lock->set_xlock_done();
657 }
658 }
659
/*
 * Release the locks held by @mut: xlocks and wrlocks always,
 * rdlocks only when @drop_rdlocks is set.  Inodes whose released
 * locks require a cap (re)issue are collected into *pneed_issue.
 * Remote locks (remote xlocks / remote wrlocks) are released by
 * sending a single OP_DROPLOCKS to each involved slave mds.
 *
 * Note the it++ passed into the *_finish() helpers / erase(): the
 * element is removed from mut->locks inside the call, so the iterator
 * must be advanced before invalidation.
 */
void Locker::_drop_locks(MutationImpl *mut, set<CInode*> *pneed_issue,
                         bool drop_rdlocks)
{
  set<mds_rank_t> slaves;  // ranks that must be told to drop remote locks

  for (auto it = mut->locks.begin(); it != mut->locks.end(); ) {
    SimpleLock *lock = it->lock;
    MDSCacheObject *obj = lock->get_parent();

    if (it->is_xlock()) {
      if (obj->is_auth()) {
        bool ni = false;
        xlock_finish(it++, mut, &ni);
        if (ni)
          pneed_issue->insert(static_cast<CInode*>(obj));
      } else {
        // remotely-held xlock: drop our reference, notify the auth mds below
        ceph_assert(lock->get_sm()->can_remote_xlock);
        slaves.insert(obj->authority().first);
        lock->put_xlock();
        mut->locks.erase(it++);
      }
    } else if (it->is_wrlock() || it->is_remote_wrlock()) {
      if (it->is_remote_wrlock()) {
        slaves.insert(it->wrlock_target);
        it->clear_remote_wrlock();
      }
      if (it->is_wrlock()) {
        bool ni = false;
        wrlock_finish(it++, mut, &ni);
        if (ni)
          pneed_issue->insert(static_cast<CInode*>(obj));
      } else {
        // was remote-wrlock only; entry is now empty, remove it
        mut->locks.erase(it++);
      }
    } else if (drop_rdlocks && it->is_rdlock()) {
      bool ni = false;
      rdlock_finish(it++, mut, &ni);
      if (ni)
        pneed_issue->insert(static_cast<CInode*>(obj));
    } else {
      // rdlock kept (drop_rdlocks is false)
      ++it;
    }
  }

  // tell each slave to drop its side of the remote locks, unless that
  // rank has not reached REJOIN (it will clean up during recovery)
  for (set<mds_rank_t>::iterator p = slaves.begin(); p != slaves.end(); ++p) {
    if (!mds->is_cluster_degraded() ||
        mds->mdsmap->get_state(*p) >= MDSMap::STATE_REJOIN) {
      dout(10) << "_drop_non_rdlocks dropping remote locks on mds." << *p << dendl;
      auto slavereq = MMDSSlaveRequest::create(mut->reqid, mut->attempt, MMDSSlaveRequest::OP_DROPLOCKS);
      mds->send_message_mds(slavereq, *p);
    }
  }
}
713
714 void Locker::cancel_locking(MutationImpl *mut, set<CInode*> *pneed_issue)
715 {
716 SimpleLock *lock = mut->locking;
717 ceph_assert(lock);
718 dout(10) << "cancel_locking " << *lock << " on " << *mut << dendl;
719
720 if (lock->get_parent()->is_auth()) {
721 bool need_issue = false;
722 if (lock->get_state() == LOCK_PREXLOCK) {
723 _finish_xlock(lock, -1, &need_issue);
724 } else if (lock->get_state() == LOCK_LOCK_XLOCK) {
725 lock->set_state(LOCK_XLOCKDONE);
726 eval_gather(lock, true, &need_issue);
727 }
728 if (need_issue)
729 pneed_issue->insert(static_cast<CInode *>(lock->get_parent()));
730 }
731 mut->finish_locking(lock);
732 }
733
734 void Locker::drop_locks(MutationImpl *mut, set<CInode*> *pneed_issue)
735 {
736 // leftover locks
737 set<CInode*> my_need_issue;
738 if (!pneed_issue)
739 pneed_issue = &my_need_issue;
740
741 if (mut->locking)
742 cancel_locking(mut, pneed_issue);
743 _drop_locks(mut, pneed_issue, true);
744
745 if (pneed_issue == &my_need_issue)
746 issue_caps_set(*pneed_issue);
747 mut->done_locking = false;
748 }
749
750 void Locker::drop_non_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
751 {
752 set<CInode*> my_need_issue;
753 if (!pneed_issue)
754 pneed_issue = &my_need_issue;
755
756 _drop_locks(mut, pneed_issue, false);
757
758 if (pneed_issue == &my_need_issue)
759 issue_caps_set(*pneed_issue);
760 }
761
762 void Locker::drop_rdlocks_for_early_reply(MutationImpl *mut)
763 {
764 set<CInode*> need_issue;
765
766 for (auto it = mut->locks.begin(); it != mut->locks.end(); ) {
767 if (!it->is_rdlock()) {
768 ++it;
769 continue;
770 }
771 SimpleLock *lock = it->lock;
772 // make later mksnap/setlayout (at other mds) wait for this unsafe request
773 if (lock->get_type() == CEPH_LOCK_ISNAP ||
774 lock->get_type() == CEPH_LOCK_IPOLICY) {
775 ++it;
776 continue;
777 }
778 bool ni = false;
779 rdlock_finish(it++, mut, &ni);
780 if (ni)
781 need_issue.insert(static_cast<CInode*>(lock->get_parent()));
782 }
783
784 issue_caps_set(need_issue);
785 }
786
787 void Locker::drop_locks_for_fragment_unfreeze(MutationImpl *mut)
788 {
789 set<CInode*> need_issue;
790
791 for (auto it = mut->locks.begin(); it != mut->locks.end(); ) {
792 SimpleLock *lock = it->lock;
793 if (lock->get_type() == CEPH_LOCK_IDFT) {
794 ++it;
795 continue;
796 }
797 bool ni = false;
798 wrlock_finish(it++, mut, &ni);
799 if (ni)
800 need_issue.insert(static_cast<CInode*>(lock->get_parent()));
801 }
802 issue_caps_set(need_issue);
803 }
804
805 // generics
806
/*
 * Re-evaluate an unstable lock that is gathering (waiting for
 * rdlocks/wrlocks/xlocks/leases/caps to drain before it can move to
 * its next state).  If the gather is complete, advance the lock:
 * replicas ack the auth; the auth broadcasts the new state to
 * replicas, then wakes waiters and (re)issues caps.
 *
 * @param first        true on the first eval after starting the gather
 *                     (may trigger an immediate cap revoke/issue)
 * @param pneed_issue  if non-null, set to true instead of issuing caps
 *                     directly
 * @param pfinishers   if non-null, waiters are collected here instead
 *                     of being completed inline
 */
void Locker::eval_gather(SimpleLock *lock, bool first, bool *pneed_issue, MDSContext::vec *pfinishers)
{
  dout(10) << "eval_gather " << *lock << " on " << *lock->get_parent() << dendl;
  ceph_assert(!lock->is_stable());

  int next = lock->get_next_state();

  CInode *in = 0;
  bool caps = lock->get_cap_shift();  // nonzero shift => this lock gates caps
  if (lock->get_type() != CEPH_LOCK_DN)
    in = static_cast<CInode *>(lock->get_parent());

  bool need_issue = false;

  int loner_issued = 0, other_issued = 0, xlocker_issued = 0;
  ceph_assert(!caps || in != NULL);
  if (caps && in->is_head()) {
    in->get_caps_issued(&loner_issued, &other_issued, &xlocker_issued,
                        lock->get_cap_shift(), lock->get_cap_mask());
    dout(10) << " next state is " << lock->get_state_name(next)
             << " issued/allows loner " << gcap_string(loner_issued)
             << "/" << gcap_string(lock->gcaps_allowed(CAP_LONER, next))
             << " xlocker " << gcap_string(xlocker_issued)
             << "/" << gcap_string(lock->gcaps_allowed(CAP_XLOCKER, next))
             << " other " << gcap_string(other_issued)
             << "/" << gcap_string(lock->gcaps_allowed(CAP_ANY, next))
             << dendl;

    // on the first pass, kick a cap reissue if any issued caps exceed
    // what the next state will allow (i.e. they must be revoked)
    if (first && ((~lock->gcaps_allowed(CAP_ANY, next) & other_issued) ||
                  (~lock->gcaps_allowed(CAP_LONER, next) & loner_issued) ||
                  (~lock->gcaps_allowed(CAP_XLOCKER, next) & xlocker_issued)))
      need_issue = true;
  }

  // helper: is lock mode x held in a way that must drain before `next`?
#define IS_TRUE_AND_LT_AUTH(x, auth) (x && ((auth && x <= AUTH) || (!auth && x < AUTH)))
  bool auth = lock->get_parent()->is_auth();
  // the gather is complete only when no conflicting holds remain and
  // no caps beyond what `next` allows are still issued
  if (!lock->is_gathering() &&
      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_rdlock, auth) || !lock->is_rdlocked()) &&
      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_wrlock, auth) || !lock->is_wrlocked()) &&
      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_xlock, auth) || !lock->is_xlocked()) &&
      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_lease, auth) || !lock->is_leased()) &&
      !(lock->get_parent()->is_auth() && lock->is_flushing()) && // i.e. wait for scatter_writebehind!
      (!caps || ((~lock->gcaps_allowed(CAP_ANY, next) & other_issued) == 0 &&
                 (~lock->gcaps_allowed(CAP_LONER, next) & loner_issued) == 0 &&
                 (~lock->gcaps_allowed(CAP_XLOCKER, next) & xlocker_issued) == 0)) &&
      lock->get_state() != LOCK_SYNC_MIX2 && // these states need an explicit trigger from the auth mds
      lock->get_state() != LOCK_MIX_SYNC2
      ) {
    dout(7) << "eval_gather finished gather on " << *lock
            << " on " << *lock->get_parent() << dendl;

    if (lock->get_sm() == &sm_filelock) {
      ceph_assert(in);
      // file data must be recovered before the filelock can move on
      if (in->state_test(CInode::STATE_RECOVERING)) {
        dout(7) << "eval_gather finished gather, but still recovering" << dendl;
        return;
      } else if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
        dout(7) << "eval_gather finished gather, but need to recover" << dendl;
        mds->mdcache->queue_file_recover(in);
        mds->mdcache->do_file_recover();
        return;
      }
    }

    if (!lock->get_parent()->is_auth()) {
      // replica: tell auth
      mds_rank_t auth = lock->get_parent()->authority().first;

      if (lock->get_parent()->is_rejoining() &&
          mds->mdsmap->get_state(auth) == MDSMap::STATE_REJOIN) {
        dout(7) << "eval_gather finished gather, but still rejoining "
                << *lock->get_parent() << dendl;
        return;
      }

      if (!mds->is_cluster_degraded() ||
          mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) {
        switch (lock->get_state()) {
        case LOCK_SYNC_LOCK:
          mds->send_message_mds(MLock::create(lock, LOCK_AC_LOCKACK, mds->get_nodeid()), auth);
          break;

        case LOCK_MIX_SYNC:
          {
            auto reply = MLock::create(lock, LOCK_AC_SYNCACK, mds->get_nodeid());
            lock->encode_locked_state(reply->get_data());
            mds->send_message_mds(reply, auth);
            next = LOCK_MIX_SYNC2;
            (static_cast<ScatterLock *>(lock))->start_flush();
          }
          break;

        case LOCK_MIX_SYNC2:
          (static_cast<ScatterLock *>(lock))->finish_flush();
          (static_cast<ScatterLock *>(lock))->clear_flushed();
          // fall-thru: after completing the flush, MIX_SYNC2 behaves
          // like SYNC_MIX2 — the ack was already sent

        case LOCK_SYNC_MIX2:
          // do nothing, we already acked
          break;

        case LOCK_SYNC_MIX:
          {
            auto reply = MLock::create(lock, LOCK_AC_MIXACK, mds->get_nodeid());
            mds->send_message_mds(reply, auth);
            next = LOCK_SYNC_MIX2;
          }
          break;

        case LOCK_MIX_LOCK:
          {
            bufferlist data;
            lock->encode_locked_state(data);
            mds->send_message_mds(MLock::create(lock, LOCK_AC_LOCKACK, mds->get_nodeid(), data), auth);
            (static_cast<ScatterLock *>(lock))->start_flush();
            // we'll get an AC_LOCKFLUSHED to complete
          }
          break;

        default:
          ceph_abort();
        }
      }
    } else {
      // auth

      // once the first (local) stage of mix->lock gather complete we can
      // gather from replicas
      if (lock->get_state() == LOCK_MIX_LOCK &&
          lock->get_parent()->is_replicated()) {
        dout(10) << " finished (local) gather for mix->lock, now gathering from replicas" << dendl;
        send_lock_message(lock, LOCK_AC_LOCK);
        lock->init_gather();
        lock->set_state(LOCK_MIX_LOCK2);
        return;
      }

      // dirty scatter state must hit the journal before we move on
      if (lock->is_dirty() && !lock->is_flushed()) {
        scatter_writebehind(static_cast<ScatterLock *>(lock));
        mds->mdlog->flush();
        return;
      }
      lock->clear_flushed();

      switch (lock->get_state()) {
        // to mixed
      case LOCK_TSYN_MIX:
      case LOCK_SYNC_MIX:
      case LOCK_EXCL_MIX:
      case LOCK_XSYN_MIX:
        in->start_scatter(static_cast<ScatterLock *>(lock));
        if (lock->get_parent()->is_replicated()) {
          bufferlist softdata;
          lock->encode_locked_state(softdata);
          send_lock_message(lock, LOCK_AC_MIX, softdata);
        }
        (static_cast<ScatterLock *>(lock))->clear_scatter_wanted();
        break;

      case LOCK_XLOCK:
      case LOCK_XLOCKDONE:
        if (next != LOCK_SYNC)
          break;
        // fall-thru

        // to sync
      case LOCK_EXCL_SYNC:
      case LOCK_LOCK_SYNC:
      case LOCK_MIX_SYNC:
      case LOCK_XSYN_SYNC:
        if (lock->get_parent()->is_replicated()) {
          bufferlist softdata;
          lock->encode_locked_state(softdata);
          send_lock_message(lock, LOCK_AC_SYNC, softdata);
        }
        break;
      }

    }

    lock->set_state(next);

    // release the auth_pin taken when the lock went unstable
    if (lock->get_parent()->is_auth() &&
        lock->is_stable())
      lock->get_parent()->auth_unpin(lock);

    // drop loner before doing waiters
    if (caps &&
        in->is_head() &&
        in->is_auth() &&
        in->get_wanted_loner() != in->get_loner()) {
      dout(10) << " trying to drop loner" << dendl;
      if (in->try_drop_loner()) {
        dout(10) << " dropped loner" << dendl;
        need_issue = true;
      }
    }

    // wake (or collect) everyone waiting for this lock to settle
    if (pfinishers)
      lock->take_waiting(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD|SimpleLock::WAIT_XLOCK,
                         *pfinishers);
    else
      lock->finish_waiters(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD|SimpleLock::WAIT_XLOCK);

    if (caps && in->is_head())
      need_issue = true;

    // the lock may be able to move again immediately
    if (lock->get_parent()->is_auth() &&
        lock->is_stable())
      try_eval(lock, &need_issue);
  }

  if (need_issue) {
    if (pneed_issue)
      *pneed_issue = true;
    else if (in->is_head())
      issue_caps(in);
  }

}
1026
// Evaluate an inode's lock states and nudge them toward stable states,
// managing the "loner" client (a single client allowed extra/exclusive
// caps) along the way.
//
// @param in            inode to evaluate
// @param mask          bitmask of CEPH_LOCK_I* lock types to consider
//                      (-1 means all)
// @param caps_imported if true, force a cap issue at the end
// @return true if caps were issued (or flagged for issue)
bool Locker::eval(CInode *in, int mask, bool caps_imported)
{
  bool need_issue = caps_imported;
  MDSContext::vec finishers;

  dout(10) << "eval " << mask << " " << *in << dendl;

  // choose loner?  Only meaningful on the auth MDS and on the head
  // (live, non-snapshot) inode.
  if (in->is_auth() && in->is_head()) {
    client_t orig_loner = in->get_loner();
    if (in->choose_ideal_loner()) {
      dout(10) << "eval set loner: client." << orig_loner << " -> client." << in->get_loner() << dendl;
      need_issue = true;
      mask = -1;  // loner changed: re-evaluate every lock
    } else if (in->get_wanted_loner() != in->get_loner()) {
      dout(10) << "eval want loner: client." << in->get_wanted_loner() << " but failed to set it" << dendl;
      mask = -1;  // couldn't converge yet; evaluate everything anyway
    }
  }

 retry:
  if (mask & CEPH_LOCK_IFILE)
    eval_any(&in->filelock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IAUTH)
    eval_any(&in->authlock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_ILINK)
    eval_any(&in->linklock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IXATTR)
    eval_any(&in->xattrlock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_INEST)
    eval_any(&in->nestlock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IFLOCK)
    eval_any(&in->flocklock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IPOLICY)
    eval_any(&in->policylock, &need_issue, &finishers, caps_imported);

  // drop loner?  If evaluating the locks changed who we want as loner,
  // drop the old one and, if a new loner is wanted, set it and redo the
  // whole evaluation under the new loner.
  if (in->is_auth() && in->is_head() && in->get_wanted_loner() != in->get_loner()) {
    if (in->try_drop_loner()) {
      need_issue = true;
      if (in->get_wanted_loner() >= 0) {
        dout(10) << "eval end set loner to client." << in->get_loner() << dendl;
        bool ok = in->try_set_loner();
        ceph_assert(ok);
        mask = -1;
        goto retry;
      }
    }
  }

  finish_contexts(g_ceph_context, finishers);

  if (need_issue && in->is_head())
    issue_caps(in);

  dout(10) << "eval done" << dendl;
  return need_issue;
}
1085
1086 class C_Locker_Eval : public LockerContext {
1087 MDSCacheObject *p;
1088 int mask;
1089 public:
1090 C_Locker_Eval(Locker *l, MDSCacheObject *pp, int m) : LockerContext(l), p(pp), mask(m) {
1091 // We are used as an MDSCacheObject waiter, so should
1092 // only be invoked by someone already holding the big lock.
1093 ceph_assert(locker->mds->mds_lock.is_locked_by_me());
1094 p->get(MDSCacheObject::PIN_PTRWAITER);
1095 }
1096 void finish(int r) override {
1097 locker->try_eval(p, mask);
1098 p->put(MDSCacheObject::PIN_PTRWAITER);
1099 }
1100 };
1101
1102 void Locker::try_eval(MDSCacheObject *p, int mask)
1103 {
1104 // unstable and ambiguous auth?
1105 if (p->is_ambiguous_auth()) {
1106 dout(7) << "try_eval ambiguous auth, waiting on " << *p << dendl;
1107 p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_Eval(this, p, mask));
1108 return;
1109 }
1110
1111 if (p->is_auth() && p->is_frozen()) {
1112 dout(7) << "try_eval frozen, waiting on " << *p << dendl;
1113 p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, mask));
1114 return;
1115 }
1116
1117 if (mask & CEPH_LOCK_DN) {
1118 ceph_assert(mask == CEPH_LOCK_DN);
1119 bool need_issue = false; // ignore this, no caps on dentries
1120 CDentry *dn = static_cast<CDentry *>(p);
1121 eval_any(&dn->lock, &need_issue);
1122 } else {
1123 CInode *in = static_cast<CInode *>(p);
1124 eval(in, mask);
1125 }
1126 }
1127
// Try to evaluate a single lock, deferring (via waiters) if its parent
// object is in a state where no lock transition can happen.  Honors
// pending scatter/unscatter requests before the freezing check (see the
// deadlock note below).
void Locker::try_eval(SimpleLock *lock, bool *pneed_issue)
{
  MDSCacheObject *p = lock->get_parent();

  // unstable and ambiguous auth?
  if (p->is_ambiguous_auth()) {
    dout(7) << "try_eval " << *lock << " ambiguousauth, waiting on " << *p << dendl;
    p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_Eval(this, p, lock->get_type()));
    return;
  }

  // only the auth MDS drives lock state transitions
  if (!p->is_auth()) {
    dout(7) << "try_eval " << *lock << " not auth for " << *p << dendl;
    return;
  }

  if (p->is_frozen()) {
    dout(7) << "try_eval " << *lock << " frozen, waiting on " << *p << dendl;
    p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, lock->get_type()));
    return;
  }

  /*
   * We could have a situation like:
   *
   * - mds A authpins item on mds B
   * - mds B starts to freeze tree containing item
   * - mds A tries wrlock_start on A, sends REQSCATTER to B
   * - mds B lock is unstable, sets scatter_wanted
   * - mds B lock stabilizes, calls try_eval.
   *
   * We can defer while freezing without causing a deadlock.  Honor
   * scatter_wanted flag here.  This will never get deferred by the
   * checks above due to the auth_pin held by the master.
   */
  if (lock->is_scatterlock()) {
    ScatterLock *slock = static_cast<ScatterLock *>(lock);
    if (slock->get_scatter_wanted() &&
        slock->get_state() != LOCK_MIX) {
      // a peer asked us to scatter (REQSCATTER); move toward MIX
      scatter_mix(slock, pneed_issue);
      if (!lock->is_stable())
        return;
    } else if (slock->get_unscatter_wanted() &&
        slock->get_state() != LOCK_LOCK) {
      // conversely, move toward LOCK if an unscatter was requested
      simple_lock(slock, pneed_issue);
      if (!lock->is_stable()) {
        return;
      }
    }
  }

  // while a subtree is freezing, defer evaluation of all locks except
  // dentry and snap locks (checked after the scatter handling above)
  if (lock->get_type() != CEPH_LOCK_DN &&
      lock->get_type() != CEPH_LOCK_ISNAP &&
      p->is_freezing()) {
    dout(7) << "try_eval " << *lock << " freezing, waiting on " << *p << dendl;
    p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, lock->get_type()));
    return;
  }

  eval(lock, pneed_issue);
}
1189
1190 void Locker::eval_cap_gather(CInode *in, set<CInode*> *issue_set)
1191 {
1192 bool need_issue = false;
1193 MDSContext::vec finishers;
1194
1195 // kick locks now
1196 if (!in->filelock.is_stable())
1197 eval_gather(&in->filelock, false, &need_issue, &finishers);
1198 if (!in->authlock.is_stable())
1199 eval_gather(&in->authlock, false, &need_issue, &finishers);
1200 if (!in->linklock.is_stable())
1201 eval_gather(&in->linklock, false, &need_issue, &finishers);
1202 if (!in->xattrlock.is_stable())
1203 eval_gather(&in->xattrlock, false, &need_issue, &finishers);
1204
1205 if (need_issue && in->is_head()) {
1206 if (issue_set)
1207 issue_set->insert(in);
1208 else
1209 issue_caps(in);
1210 }
1211
1212 finish_contexts(g_ceph_context, finishers);
1213 }
1214
1215 void Locker::eval_scatter_gathers(CInode *in)
1216 {
1217 bool need_issue = false;
1218 MDSContext::vec finishers;
1219
1220 dout(10) << "eval_scatter_gathers " << *in << dendl;
1221
1222 // kick locks now
1223 if (!in->filelock.is_stable())
1224 eval_gather(&in->filelock, false, &need_issue, &finishers);
1225 if (!in->nestlock.is_stable())
1226 eval_gather(&in->nestlock, false, &need_issue, &finishers);
1227 if (!in->dirfragtreelock.is_stable())
1228 eval_gather(&in->dirfragtreelock, false, &need_issue, &finishers);
1229
1230 if (need_issue && in->is_head())
1231 issue_caps(in);
1232
1233 finish_contexts(g_ceph_context, finishers);
1234 }
1235
1236 void Locker::eval(SimpleLock *lock, bool *need_issue)
1237 {
1238 switch (lock->get_type()) {
1239 case CEPH_LOCK_IFILE:
1240 return file_eval(static_cast<ScatterLock*>(lock), need_issue);
1241 case CEPH_LOCK_IDFT:
1242 case CEPH_LOCK_INEST:
1243 return scatter_eval(static_cast<ScatterLock*>(lock), need_issue);
1244 default:
1245 return simple_eval(lock, need_issue);
1246 }
1247 }
1248
1249
1250 // ------------------
1251 // rdlock
1252
// Try to push a lock toward a state that allows rdlocks.  On the auth,
// start a transition toward SYNC (or XSYN for a loner on filelocks);
// on a replica, ask the auth for it.  Returns true if a local state
// change was initiated (so the caller should recheck can_rdlock).
bool Locker::_rdlock_kick(SimpleLock *lock, bool as_anon)
{
  // kick the lock
  if (lock->is_stable()) {
    if (lock->get_parent()->is_auth()) {
      if (lock->get_sm() == &sm_scatterlock) {
        // not until tempsync is fully implemented
        //if (lock->get_parent()->is_replicated())
        //scatter_tempsync((ScatterLock*)lock);
        //else
        simple_sync(lock);
      } else if (lock->get_sm() == &sm_filelock) {
        CInode *in = static_cast<CInode*>(lock->get_parent());
        if (lock->get_state() == LOCK_EXCL &&
            in->get_target_loner() >= 0 &&
            !in->is_dir() && !as_anon)  // as_anon => caller wants SYNC, not XSYN
          file_xsyn(lock);
        else
          simple_sync(lock);
      } else
        simple_sync(lock);
      return true;
    } else {
      // request rdlock state change from auth
      mds_rank_t auth = lock->get_parent()->authority().first;
      if (!mds->is_cluster_degraded() ||
          mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
        dout(10) << "requesting rdlock from auth on "
                 << *lock << " on " << *lock->get_parent() << dendl;
        mds->send_message_mds(MLock::create(lock, LOCK_AC_REQRDLOCK, mds->get_nodeid()), auth);
      }
      return false;
    }
  }
  // lock is unstable; if a file is being recovered, bump its priority
  // so the stuck lock can make progress sooner
  if (lock->get_type() == CEPH_LOCK_IFILE) {
    CInode *in = static_cast<CInode *>(lock->get_parent());
    if (in->state_test(CInode::STATE_RECOVERING)) {
      mds->mdcache->recovery_queue.prioritize(in);
    }
  }

  return false;
}
1296
1297 bool Locker::rdlock_try(SimpleLock *lock, client_t client, MDSContext *con)
1298 {
1299 dout(7) << "rdlock_try on " << *lock << " on " << *lock->get_parent() << dendl;
1300
1301 // can read? grab ref.
1302 if (lock->can_rdlock(client))
1303 return true;
1304
1305 _rdlock_kick(lock, false);
1306
1307 if (lock->can_rdlock(client))
1308 return true;
1309
1310 // wait!
1311 if (con) {
1312 dout(7) << "rdlock_try waiting on " << *lock << " on " << *lock->get_parent() << dendl;
1313 lock->add_waiter(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_RD, con);
1314 }
1315 return false;
1316 }
1317
// Acquire an rdlock for a request, kicking the lock state as needed.
// If the lock cannot be taken now, queue the request as a waiter and
// return false.  @as_anon forces an anonymous (client -1) rdlock, which
// prevents the "xlocker may also rdlock" special case.
bool Locker::rdlock_start(SimpleLock *lock, MDRequestRef& mut, bool as_anon)
{
  dout(7) << "rdlock_start on " << *lock << " on " << *lock->get_parent() << dendl;

  // client may be allowed to rdlock the same item it has xlocked.
  // UNLESS someone passes in as_anon, or we're reading snapped version here.
  if (mut->snapid != CEPH_NOSNAP)
    as_anon = true;
  client_t client = as_anon ? -1 : mut->get_client();

  CInode *in = 0;
  if (lock->get_type() != CEPH_LOCK_DN)
    in = static_cast<CInode *>(lock->get_parent());

  /*
  if (!lock->get_parent()->is_auth() &&
      lock->fw_rdlock_to_auth()) {
    mdcache->request_forward(mut, lock->get_parent()->authority().first);
    return false;
  }
  */

  while (1) {
    // can read?  grab ref.
    if (lock->can_rdlock(client)) {
      lock->get_rdlock();
      mut->locks.emplace_hint(mut->locks.end(), lock, MutationImpl::LockOp::RDLOCK);
      return true;
    }

    // hmm, wait a second.
    // reading a snapshotted inode whose lock is still tied to the head:
    // we must first get the head inode's corresponding lock to SYNC.
    if (in && !in->is_head() && in->is_auth() &&
        lock->get_state() == LOCK_SNAP_SYNC) {
      // okay, we actually need to kick the head's lock to get ourselves synced up.
      CInode *head = mdcache->get_inode(in->ino());
      ceph_assert(head);
      SimpleLock *hlock = head->get_lock(CEPH_LOCK_IFILE);
      if (hlock->get_state() == LOCK_SYNC)
        hlock = head->get_lock(lock->get_type());

      if (hlock->get_state() != LOCK_SYNC) {
        dout(10) << "rdlock_start trying head inode " << *head << dendl;
        if (!rdlock_start(hlock, mut, true))  // ** as_anon, no rdlock on EXCL **
          return false;
        // oh, check our lock again then
      }
    }

    // kick the lock; if nothing changed locally, give up and wait
    if (!_rdlock_kick(lock, as_anon))
      break;
  }

  // wait!
  int wait_on;
  if (lock->get_parent()->is_auth() && lock->is_stable())
    wait_on = SimpleLock::WAIT_RD;
  else
    wait_on = SimpleLock::WAIT_STABLE;  // REQRDLOCK is ignored if lock is unstable, so we need to retry.
  dout(7) << "rdlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl;
  lock->add_waiter(wait_on, new C_MDS_RetryRequest(mdcache, mut));
  nudge_log(lock);
  return false;
}
1381
1382 void Locker::nudge_log(SimpleLock *lock)
1383 {
1384 dout(10) << "nudge_log " << *lock << " on " << *lock->get_parent() << dendl;
1385 if (lock->get_parent()->is_auth() && lock->is_unstable_and_locked()) // as with xlockdone, or cap flush
1386 mds->mdlog->flush();
1387 }
1388
1389 void Locker::rdlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut, bool *pneed_issue)
1390 {
1391 ceph_assert(it->is_rdlock());
1392 SimpleLock *lock = it->lock;
1393 // drop ref
1394 lock->put_rdlock();
1395 if (mut)
1396 mut->locks.erase(it);
1397
1398 dout(7) << "rdlock_finish on " << *lock << " on " << *lock->get_parent() << dendl;
1399
1400 // last one?
1401 if (!lock->is_rdlocked()) {
1402 if (!lock->is_stable())
1403 eval_gather(lock, false, pneed_issue);
1404 else if (lock->get_parent()->is_auth())
1405 try_eval(lock, pneed_issue);
1406 }
1407 }
1408
1409
1410 bool Locker::can_rdlock_set(MutationImpl::LockOpVec& lov)
1411 {
1412 dout(10) << "can_rdlock_set " << dendl;
1413 for (const auto& p : lov) {
1414 ceph_assert(p.is_rdlock());
1415 if (!p.lock->can_rdlock(-1)) {
1416 dout(10) << "can_rdlock_set can't rdlock " << *p << " on " << *p.lock->get_parent() << dendl;
1417 return false;
1418 }
1419 }
1420 return true;
1421 }
1422
1423
1424 void Locker::rdlock_take_set(MutationImpl::LockOpVec& lov, MutationRef& mut)
1425 {
1426 dout(10) << "rdlock_take_set " << dendl;
1427 for (const auto& p : lov) {
1428 ceph_assert(p.is_rdlock());
1429 p.lock->get_rdlock();
1430 mut->locks.emplace(p.lock, MutationImpl::LockOp::RDLOCK);
1431 }
1432 }
1433
1434 // ------------------
1435 // wrlock
1436
1437 void Locker::wrlock_force(SimpleLock *lock, MutationRef& mut)
1438 {
1439 if (lock->get_type() == CEPH_LOCK_IVERSION ||
1440 lock->get_type() == CEPH_LOCK_DVERSION)
1441 return local_wrlock_grab(static_cast<LocalLock*>(lock), mut);
1442
1443 dout(7) << "wrlock_force on " << *lock
1444 << " on " << *lock->get_parent() << dendl;
1445 lock->get_wrlock(true);
1446 mut->locks.emplace(lock, MutationImpl::LockOp::WRLOCK);
1447 }
1448
// Acquire a wrlock for a request.  On the auth, may scatter (MIX) or
// simple-lock the lock first; on a replica, asks the auth to scatter.
// @nowait means the caller cannot block or open a new log entry: no
// waiter is queued and scatter-writebehind must be avoided.  Returns
// true once the wrlock is held.
bool Locker::wrlock_start(SimpleLock *lock, MDRequestRef& mut, bool nowait)
{
  if (lock->get_type() == CEPH_LOCK_IVERSION ||
      lock->get_type() == CEPH_LOCK_DVERSION)
    return local_wrlock_start(static_cast<LocalLock*>(lock), mut);

  dout(10) << "wrlock_start " << *lock << " on " << *lock->get_parent() << dendl;

  CInode *in = static_cast<CInode *>(lock->get_parent());
  client_t client = mut->get_client();
  // prefer MIX when the inode's dirfrags span subtrees (or a scatter
  // was explicitly requested), so multiple MDSs can wrlock in parallel
  bool want_scatter = !nowait && lock->get_parent()->is_auth() &&
                      (in->has_subtree_or_exporting_dirfrag() ||
                       static_cast<ScatterLock*>(lock)->get_scatter_wanted());

  while (1) {
    // wrlock?
    if (lock->can_wrlock(client) &&
        (!want_scatter || lock->get_state() == LOCK_MIX)) {
      lock->get_wrlock();
      auto it = mut->locks.emplace_hint(mut->locks.end(), lock, MutationImpl::LockOp::WRLOCK);
      it->flags |= MutationImpl::LockOp::WRLOCK;  // may already remote_wrlocked
      return true;
    }

    // a recovering file blocks the lock; bump its recovery priority
    if (lock->get_type() == CEPH_LOCK_IFILE &&
        in->state_test(CInode::STATE_RECOVERING)) {
      mds->mdcache->recovery_queue.prioritize(in);
    }

    if (!lock->is_stable())
      break;

    if (in->is_auth()) {
      // don't do nested lock state change if we have dirty scatterdata and
      // may scatter_writebehind or start_scatter, because nowait==true implies
      // that the caller already has a log entry open!
      if (nowait && lock->is_dirty())
        return false;

      if (want_scatter)
        scatter_mix(static_cast<ScatterLock*>(lock));
      else
        simple_lock(lock);

      if (nowait && !lock->can_wrlock(client))
        return false;

    } else {
      // replica.
      // auth should be auth_pinned (see acquire_locks wrlock weird mustpin case).
      mds_rank_t auth = lock->get_parent()->authority().first;
      if (!mds->is_cluster_degraded() ||
          mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
        dout(10) << "requesting scatter from auth on "
                 << *lock << " on " << *lock->get_parent() << dendl;
        mds->send_message_mds(MLock::create(lock, LOCK_AC_REQSCATTER, mds->get_nodeid()), auth);
      }
      break;
    }
  }

  if (!nowait) {
    dout(7) << "wrlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl;
    lock->add_waiter(SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
    nudge_log(lock);
  }

  return false;
}
1518
1519 void Locker::wrlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut, bool *pneed_issue)
1520 {
1521 ceph_assert(it->is_wrlock());
1522 SimpleLock* lock = it->lock;
1523
1524 if (lock->get_type() == CEPH_LOCK_IVERSION ||
1525 lock->get_type() == CEPH_LOCK_DVERSION)
1526 return local_wrlock_finish(it, mut);
1527
1528 dout(7) << "wrlock_finish on " << *lock << " on " << *lock->get_parent() << dendl;
1529 lock->put_wrlock();
1530
1531 if (it->is_remote_wrlock())
1532 it->clear_wrlock();
1533 else
1534 mut->locks.erase(it);
1535
1536 if (!lock->is_wrlocked()) {
1537 if (!lock->is_stable())
1538 eval_gather(lock, false, pneed_issue);
1539 else if (lock->get_parent()->is_auth())
1540 try_eval(lock, pneed_issue);
1541 }
1542 }
1543
1544
1545 // remote wrlock
1546
// Ask a remote MDS (@target) to wrlock a lock on our behalf by sending
// an OP_WRLOCK slave request.  The request is recorded as waiting on
// the slave; the caller resumes when the ack arrives.
void Locker::remote_wrlock_start(SimpleLock *lock, mds_rank_t target, MDRequestRef& mut)
{
  dout(7) << "remote_wrlock_start mds." << target << " on " << *lock << " on " << *lock->get_parent() << dendl;

  // wait for active target
  if (mds->is_cluster_degraded() &&
      !mds->mdsmap->is_clientreplay_or_active_or_stopping(target)) {
    dout(7) << " mds." << target << " is not active" << dendl;
    if (mut->more()->waiting_on_slave.empty())
      mds->wait_for_active_peer(target, new C_MDS_RetryRequest(mdcache, mut));
    return;
  }

  // send lock request
  mut->start_locking(lock, target);
  mut->more()->slaves.insert(target);
  auto r = MMDSSlaveRequest::create(mut->reqid, mut->attempt, MMDSSlaveRequest::OP_WRLOCK);
  r->set_lock_type(lock->get_type());
  lock->get_parent()->set_object_info(r->get_object_info());
  mds->send_message_mds(r, target);

  // mark the request as blocked on this slave's reply
  ceph_assert(mut->more()->waiting_on_slave.count(target) == 0);
  mut->more()->waiting_on_slave.insert(target);
}
1571
// Release a wrlock held on our behalf by a remote MDS: drop (or trim)
// the lock op from the mutation and send OP_UNWRLOCK to the target if
// it is still reachable.
void Locker::remote_wrlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut)
{
  ceph_assert(it->is_remote_wrlock());
  SimpleLock *lock = it->lock;
  mds_rank_t target = it->wrlock_target;

  if (it->is_wrlock())
    it->clear_remote_wrlock();  // keep the entry; a local wrlock remains
  else
    mut->locks.erase(it);

  dout(7) << "remote_wrlock_finish releasing remote wrlock on mds." << target
          << " " << *lock->get_parent() << dendl;
  // only message the target if it is far enough along to process it
  if (!mds->is_cluster_degraded() ||
      mds->mdsmap->get_state(target) >= MDSMap::STATE_REJOIN) {
    auto slavereq = MMDSSlaveRequest::create(mut->reqid, mut->attempt, MMDSSlaveRequest::OP_UNWRLOCK);
    slavereq->set_lock_type(lock->get_type());
    lock->get_parent()->set_object_info(slavereq->get_object_info());
    mds->send_message_mds(slavereq, target);
  }
}
1593
1594
1595 // ------------------
1596 // xlock
1597
// Acquire an exclusive (x)lock for a request.  On the auth this drives
// the lock through LOCK/XLOCK states; on a replica it forwards an
// OP_XLOCK slave request to the auth.  Returns true once the xlock is
// held, false if the request was queued or forwarded.
bool Locker::xlock_start(SimpleLock *lock, MDRequestRef& mut)
{
  if (lock->get_type() == CEPH_LOCK_IVERSION ||
      lock->get_type() == CEPH_LOCK_DVERSION)
    return local_xlock_start(static_cast<LocalLock*>(lock), mut);

  dout(7) << "xlock_start on " << *lock << " on " << *lock->get_parent() << dendl;
  client_t client = mut->get_client();

  CInode *in = nullptr;
  if (lock->get_cap_shift())  // only cap-bearing (inode) locks
    in = static_cast<CInode *>(lock->get_parent());

  // auth?
  if (lock->get_parent()->is_auth()) {
    // auth
    while (1) {
      if (lock->can_xlock(client) &&
          !(lock->get_state() == LOCK_LOCK_XLOCK &&       // client is not xlocker or
            in && in->issued_caps_need_gather(lock))) {   // xlocker does not hold shared cap
        lock->set_state(LOCK_XLOCK);
        lock->get_xlock(mut, client);
        mut->locks.emplace_hint(mut->locks.end(), lock, MutationImpl::LockOp::XLOCK);
        mut->finish_locking(lock);
        return true;
      }

      // a recovering file blocks the lock; bump its recovery priority
      if (lock->get_type() == CEPH_LOCK_IFILE &&
          in->state_test(CInode::STATE_RECOVERING)) {
        mds->mdcache->recovery_queue.prioritize(in);
      }

      // give up and wait, unless the lock is stable or we can reuse an
      // XLOCKDONE state we ourselves hold with no other stable-waiters
      if (!lock->is_stable() && (lock->get_state() != LOCK_XLOCKDONE ||
                                 lock->get_xlock_by_client() != client ||
                                 lock->is_waiter_for(SimpleLock::WAIT_STABLE)))
        break;

      if (lock->get_state() == LOCK_LOCK || lock->get_state() == LOCK_XLOCKDONE) {
        mut->start_locking(lock);
        simple_xlock(lock);
      } else {
        simple_lock(lock);
      }
    }

    lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
    nudge_log(lock);
    return false;
  } else {
    // replica
    ceph_assert(lock->get_sm()->can_remote_xlock);
    ceph_assert(!mut->slave_request);

    // wait for single auth
    if (lock->get_parent()->is_ambiguous_auth()) {
      lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH,
                                     new C_MDS_RetryRequest(mdcache, mut));
      return false;
    }

    // wait for active auth
    mds_rank_t auth = lock->get_parent()->authority().first;
    if (mds->is_cluster_degraded() &&
        !mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
      dout(7) << " mds." << auth << " is not active" << dendl;
      if (mut->more()->waiting_on_slave.empty())
        mds->wait_for_active_peer(auth, new C_MDS_RetryRequest(mdcache, mut));
      return false;
    }

    // send lock request
    mut->more()->slaves.insert(auth);
    mut->start_locking(lock, auth);
    auto r = MMDSSlaveRequest::create(mut->reqid, mut->attempt, MMDSSlaveRequest::OP_XLOCK);
    r->set_lock_type(lock->get_type());
    lock->get_parent()->set_object_info(r->get_object_info());
    mds->send_message_mds(r, auth);

    // mark the request as blocked on the auth's reply
    ceph_assert(mut->more()->waiting_on_slave.count(auth) == 0);
    mut->more()->waiting_on_slave.insert(auth);

    return false;
  }
}
1682
// Finish up after the last xlock on a lock is dropped.  Fast path: for
// cap-bearing locks with no other holders, jump straight to EXCL when
// there is a suitable loner (avoiding a round trip through LOCK);
// otherwise fall back to the generic gather evaluation.
void Locker::_finish_xlock(SimpleLock *lock, client_t xlocker, bool *pneed_issue)
{
  ceph_assert(!lock->is_stable());
  if (lock->get_type() != CEPH_LOCK_DN &&
      lock->get_type() != CEPH_LOCK_ISNAP &&
      lock->get_num_rdlocks() == 0 &&
      lock->get_num_wrlocks() == 0 &&
      !lock->is_leased() &&
      lock->get_state() != LOCK_XLOCKSNAP) {
    CInode *in = static_cast<CInode*>(lock->get_parent());
    client_t loner = in->get_target_loner();
    // only valid if the loner was the xlocker (or there was none)
    if (loner >= 0 && (xlocker < 0 || xlocker == loner)) {
      lock->set_state(LOCK_EXCL);
      lock->get_parent()->auth_unpin(lock);
      lock->finish_waiters(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD);
      if (lock->get_cap_shift())
        *pneed_issue = true;
      if (lock->get_parent()->is_auth() &&
          lock->is_stable())
        try_eval(lock, pneed_issue);
      return;
    }
  }
  // the xlocker may have CEPH_CAP_GSHARED, need to revoke it if next state is LOCK_LOCK
  eval_gather(lock, lock->get_state() != LOCK_XLOCKSNAP, pneed_issue);
}
1709
// Release an xlock held by @mut.  For remote xlocks, notify the auth
// (OP_UNXLOCK) and wake local waiters; for local ones, run
// _finish_xlock when no xlocker remains.  Issues caps (or flags
// *pneed_issue) if the lock change allows more caps.
void Locker::xlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut, bool *pneed_issue)
{
  ceph_assert(it->is_xlock());
  SimpleLock *lock = it->lock;

  if (lock->get_type() == CEPH_LOCK_IVERSION ||
      lock->get_type() == CEPH_LOCK_DVERSION)
    return local_xlock_finish(it, mut);

  dout(10) << "xlock_finish on " << *lock << " " << *lock->get_parent() << dendl;

  client_t xlocker = lock->get_xlock_by_client();

  // drop ref
  lock->put_xlock();
  ceph_assert(mut);
  mut->locks.erase(it);

  bool do_issue = false;

  // remote xlock?
  if (!lock->get_parent()->is_auth()) {
    ceph_assert(lock->get_sm()->can_remote_xlock);

    // tell auth
    dout(7) << "xlock_finish releasing remote xlock on " << *lock->get_parent() << dendl;
    mds_rank_t auth = lock->get_parent()->authority().first;
    if (!mds->is_cluster_degraded() ||
        mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) {
      auto slavereq = MMDSSlaveRequest::create(mut->reqid, mut->attempt, MMDSSlaveRequest::OP_UNXLOCK);
      slavereq->set_lock_type(lock->get_type());
      lock->get_parent()->set_object_info(slavereq->get_object_info());
      mds->send_message_mds(slavereq, auth);
    }
    // others waiting?
    lock->finish_waiters(SimpleLock::WAIT_STABLE |
                         SimpleLock::WAIT_WR |
                         SimpleLock::WAIT_RD, 0);
  } else {
    if (lock->get_num_xlocks() == 0 &&
        lock->get_state() != LOCK_LOCK_XLOCK) {  // no one is taking xlock
      _finish_xlock(lock, xlocker, &do_issue);
    }
  }

  if (do_issue) {
    CInode *in = static_cast<CInode*>(lock->get_parent());
    if (in->is_head()) {
      if (pneed_issue)
        *pneed_issue = true;
      else
        issue_caps(in);
    }
  }
}
1765
// Hand an xlock over to another MDS during subtree export: drop our
// xlock reference and auth pin, and park the lock in LOCK_LOCK for the
// importer (which will re-pin via xlock_import).
void Locker::xlock_export(const MutationImpl::lock_iterator& it, MutationImpl *mut)
{
  ceph_assert(it->is_xlock());
  SimpleLock *lock = it->lock;
  dout(10) << "xlock_export on " << *lock << " " << *lock->get_parent() << dendl;

  lock->put_xlock();
  mut->locks.erase(it);

  MDSCacheObject *p = lock->get_parent();
  ceph_assert(p->state_test(CInode::STATE_AMBIGUOUSAUTH));  // we are exporting this (inode)

  // unstable locks hold an auth pin; release it before parking the lock
  if (!lock->is_stable())
    lock->get_parent()->auth_unpin(lock);

  lock->set_state(LOCK_LOCK);
}
1783
1784 void Locker::xlock_import(SimpleLock *lock)
1785 {
1786 dout(10) << "xlock_import on " << *lock << " " << *lock->get_parent() << dendl;
1787 lock->get_parent()->auth_pin(lock);
1788 }
1789
1790
1791
1792 // file i/o -----------------------------------------
1793
1794 version_t Locker::issue_file_data_version(CInode *in)
1795 {
1796 dout(7) << "issue_file_data_version on " << *in << dendl;
1797 return in->inode.file_data_version;
1798 }
1799
// Journal-commit callback for cap-driven inode updates: invokes
// Locker::file_update_finish() once the EUpdate is safely logged.
// Holds a PTRWAITER pin on the inode for its lifetime.
class C_Locker_FileUpdate_finish : public LockerLogContext {
  CInode *in;           // inode being updated
  MutationRef mut;      // mutation carrying the projected inode + locks
  unsigned flags;       // UPDATE_* flags (see enum below)
  client_t client;      // client whose cap message triggered the update
  MClientCaps::ref ack; // optional cap ack to send back to the client
public:
  C_Locker_FileUpdate_finish(Locker *l, CInode *i, MutationRef& m, unsigned f,
                             const MClientCaps::ref &ack, client_t c=-1)
    : LockerLogContext(l), in(i), mut(m), flags(f), client(c), ack(ack) {
    in->get(CInode::PIN_PTRWAITER);
  }
  void finish(int r) override {
    locker->file_update_finish(in, mut, flags, client, ack);
    in->put(CInode::PIN_PTRWAITER);
  }
};
1817
// Flags for file_update_finish() / C_Locker_FileUpdate_finish.
enum {
  UPDATE_SHAREMAX = 1,    // consider sharing a new max_size with clients
  UPDATE_NEEDSISSUE = 2,  // re-issue caps to the updating client if wanted > pending
  UPDATE_SNAPFLUSH = 4,   // this update may complete a snap cap writeback
};
1823
1824 void Locker::file_update_finish(CInode *in, MutationRef& mut, unsigned flags,
1825 client_t client, const MClientCaps::ref &ack)
1826 {
1827 dout(10) << "file_update_finish on " << *in << dendl;
1828 in->pop_and_dirty_projected_inode(mut->ls);
1829
1830 mut->apply();
1831
1832 if (ack) {
1833 Session *session = mds->get_session(client);
1834 if (session) {
1835 // "oldest flush tid" > 0 means client uses unique TID for each flush
1836 if (ack->get_oldest_flush_tid() > 0)
1837 session->add_completed_flush(ack->get_client_tid());
1838 mds->send_message_client_counted(ack, session);
1839 } else {
1840 dout(10) << " no session for client." << client << " " << *ack << dendl;
1841 }
1842 }
1843
1844 set<CInode*> need_issue;
1845 drop_locks(mut.get(), &need_issue);
1846
1847 if (in->is_head()) {
1848 if ((flags & UPDATE_NEEDSISSUE) && need_issue.count(in) == 0) {
1849 Capability *cap = in->get_client_cap(client);
1850 if (cap && (cap->wanted() & ~cap->pending()))
1851 issue_caps(in, cap);
1852 }
1853
1854 if ((flags & UPDATE_SHAREMAX) && in->is_auth() &&
1855 (in->filelock.gcaps_allowed(CAP_LONER) & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)))
1856 share_inode_max_size(in);
1857
1858 } else if ((flags & UPDATE_SNAPFLUSH) && !in->client_snap_caps.empty()) {
1859 dout(10) << " client_snap_caps " << in->client_snap_caps << dendl;
1860 // check for snap writeback completion
1861 bool gather = false;
1862 auto p = in->client_snap_caps.begin();
1863 while (p != in->client_snap_caps.end()) {
1864 auto q = p->second.find(client);
1865 if (q != p->second.end()) {
1866 SimpleLock *lock = in->get_lock(p->first);
1867 ceph_assert(lock);
1868 dout(10) << " completing client_snap_caps for " << ccap_string(p->first)
1869 << " lock " << *lock << " on " << *in << dendl;
1870 lock->put_wrlock();
1871
1872 p->second.erase(q);
1873 if (p->second.empty()) {
1874 gather = true;
1875 in->client_snap_caps.erase(p++);
1876 } else
1877 ++p;
1878 }
1879 }
1880 if (gather) {
1881 if (in->client_snap_caps.empty()) {
1882 in->item_open_file.remove_myself();
1883 in->item_caps.remove_myself();
1884 }
1885 eval_cap_gather(in, &need_issue);
1886 }
1887 }
1888 issue_caps_set(need_issue);
1889
1890 mds->balancer->hit_inode(in, META_POP_IWR);
1891
1892 // auth unpin after issuing caps
1893 mut->cleanup();
1894 }
1895
// Register (or augment) a client capability on an inode for an open in
// the given mode, then kick lock evaluation so the caps can actually be
// issued.  During replay this reduces to reconnecting the old cap.
//
// @param in        inode being opened
// @param mode      CEPH_FILE_MODE_* open mode
// @param session   client session requesting the caps
// @param realm     snap realm for a newly created cap
// @param is_replay true if we are replaying a reconnect
// @return the client's Capability on @in
Capability* Locker::issue_new_caps(CInode *in,
                                   int mode,
                                   Session *session,
                                   SnapRealm *realm,
                                   bool is_replay)
{
  dout(7) << "issue_new_caps for mode " << mode << " on " << *in << dendl;
  bool is_new;

  // if replay, try to reconnect cap, and otherwise do nothing.
  if (is_replay)
    return mds->mdcache->try_reconnect_cap(in, session);


  // my needs
  ceph_assert(session->info.inst.name.is_client());
  client_t my_client = session->get_client();
  int my_want = ceph_caps_for_mode(mode);

  // register a capability
  Capability *cap = in->get_client_cap(my_client);
  if (!cap) {
    // new cap
    cap = in->add_client_cap(my_client, session, realm);
    cap->set_wanted(my_want);
    cap->mark_new();
    cap->inc_suppress();  // suppress file cap messages for new cap (we'll bundle with the open() reply)
    is_new = true;
  } else {
    is_new = false;
    // make sure it wants sufficient caps
    if (my_want & ~cap->wanted()) {
      // augment wanted caps for this client
      cap->set_wanted(cap->wanted() | my_want);
    }
  }

  if (in->is_auth()) {
    // [auth] twiddle mode?
    eval(in, CEPH_CAP_LOCKS);

    if (_need_flush_mdlog(in, my_want))
      mds->mdlog->flush();

  } else {
    // [replica] tell auth about any new caps wanted
    request_inode_file_caps(in);
  }

  // issue caps (pot. incl new one)
  //issue_caps(in);  // note: _eval above may have done this already...

  // re-issue whatever we can
  //cap->issue(cap->pending());

  if (is_new)
    cap->dec_suppress();

  return cap;
}
1956
1957
1958 void Locker::issue_caps_set(set<CInode*>& inset)
1959 {
1960 for (set<CInode*>::iterator p = inset.begin(); p != inset.end(); ++p)
1961 issue_caps(*p);
1962 }
1963
/**
 * Bring each client's pending caps on @in into agreement with what the
 * current lock states allow: grant newly-wanted+allowed bits, or start
 * revocation of bits that are no longer allowed.
 *
 * @param in       head inode whose client caps are evaluated
 * @param only_cap if non-NULL, restrict processing to that client's cap
 * @return number of caps that were (re)issued or put into revocation
 */
int Locker::issue_caps(CInode *in, Capability *only_cap)
{
  // allowed caps are determined by the lock mode.
  int all_allowed = in->get_caps_allowed_by_type(CAP_ANY);
  int loner_allowed = in->get_caps_allowed_by_type(CAP_LONER);
  int xlocker_allowed = in->get_caps_allowed_by_type(CAP_XLOCKER);

  client_t loner = in->get_loner();
  if (loner >= 0) {
    dout(7) << "issue_caps loner client." << loner
	    << " allowed=" << ccap_string(loner_allowed)
	    << ", xlocker allowed=" << ccap_string(xlocker_allowed)
	    << ", others allowed=" << ccap_string(all_allowed)
	    << " on " << *in << dendl;
  } else {
    dout(7) << "issue_caps allowed=" << ccap_string(all_allowed)
	    << ", xlocker allowed=" << ccap_string(xlocker_allowed)
	    << " on " << *in << dendl;
  }

  ceph_assert(in->is_head());

  // count conflicts with
  int nissued = 0;

  // client caps
  map<client_t, Capability>::iterator it;
  if (only_cap)
    it = in->client_caps.find(only_cap->get_client());
  else
    it = in->client_caps.begin();
  for (; it != in->client_caps.end(); ++it) {
    Capability *cap = &it->second;
    if (cap->is_stale())
      continue;  // NOTE(review): with only_cap set, this 'continue' bypasses
                 // the 'if (only_cap) break' at the loop bottom and falls
                 // through to other clients' caps — confirm this is intended.

    // do not issue _new_ bits when size|mtime is projected
    int allowed;
    if (loner == it->first)
      allowed = loner_allowed;
    else
      allowed = all_allowed;

    // add in any xlocker-only caps (for locks this client is the xlocker for)
    allowed |= xlocker_allowed & in->get_xlocker_mask(it->first);

    // clients that cannot handle inline data or pool namespaces must not be
    // given file read/write caps on inodes that use those features
    if ((in->inode.inline_data.version != CEPH_INLINE_NONE &&
	 cap->is_noinline()) ||
	(!in->inode.layout.pool_ns.empty() &&
	 cap->is_nopoolns()))
      allowed &= ~(CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR);

    int pending = cap->pending();
    int wanted = cap->wanted();

    dout(20) << " client." << it->first
	     << " pending " << ccap_string(pending)
	     << " allowed " << ccap_string(allowed)
	     << " wanted " << ccap_string(wanted)
	     << dendl;

    if (!(pending & ~allowed)) {
      // skip if suppress or new, and not revocation
      if (cap->is_new() || cap->is_suppress()) {
	dout(20) << "  !revoke and new|suppressed, skipping client." << it->first << dendl;
	continue;
      }
    }

    // notify clients about deleted inode, to make sure they release caps ASAP.
    if (in->inode.nlink == 0)
      wanted |= CEPH_CAP_LINK_SHARED;

    // are there caps that the client _wants_ and can have, but aren't pending?
    // or do we need to revoke?
    if (((wanted & allowed) & ~pending) ||  // missing wanted+allowed caps
	(pending & ~allowed)) {             // need to revoke ~allowed caps.
      // issue
      nissued++;

      // include caps that clients generally like, while we're at it.
      int likes = in->get_caps_liked();
      int before = pending;
      long seq;
      if (pending & ~allowed)
	seq = cap->issue((wanted|likes) & allowed & pending);  // if revoking, don't issue anything new.
      else
	seq = cap->issue((wanted|likes) & allowed);
      int after = cap->pending();

      if (cap->is_new()) {
	// haven't send caps to client yet
	if (before & ~after)
	  cap->confirm_receipt(seq, after);
      } else {
        dout(7) << "   sending MClientCaps to client." << it->first
		<< " seq " << cap->get_last_seq()
		<< " new pending " << ccap_string(after) << " was " << ccap_string(before)
		<< dendl;

	int op = (before & ~after) ? CEPH_CAP_OP_REVOKE : CEPH_CAP_OP_GRANT;
	if (op == CEPH_CAP_OP_REVOKE) {
		// track outstanding revocations so stuck clients can be warned about
		revoking_caps.push_back(&cap->item_revoking_caps);
		revoking_caps_by_client[cap->get_client()].push_back(&cap->item_client_revoking_caps);
		cap->set_last_revoke_stamp(ceph_clock_now());
		cap->reset_num_revoke_warnings();
	}

	auto m = MClientCaps::create(op, in->ino(),
				     in->find_snaprealm()->inode->ino(),
				     cap->get_cap_id(),
				     cap->get_last_seq(),
				     after, wanted, 0,
				     cap->get_mseq(),
				     mds->get_osd_epoch_barrier());
	in->encode_cap_message(m, cap);

	mds->send_message_client_counted(m, cap->get_session());
      }
    }

    if (only_cap)
      break;
  }

  return nissued;
}
2091
2092 void Locker::issue_truncate(CInode *in)
2093 {
2094 dout(7) << "issue_truncate on " << *in << dendl;
2095
2096 for (auto &p : in->client_caps) {
2097 Capability *cap = &p.second;
2098 auto m = MClientCaps::create(CEPH_CAP_OP_TRUNC,
2099 in->ino(),
2100 in->find_snaprealm()->inode->ino(),
2101 cap->get_cap_id(), cap->get_last_seq(),
2102 cap->pending(), cap->wanted(), 0,
2103 cap->get_mseq(),
2104 mds->get_osd_epoch_barrier());
2105 in->encode_cap_message(m, cap);
2106 mds->send_message_client_counted(m, p.first);
2107 }
2108
2109 // should we increase max_size?
2110 if (in->is_auth() && in->is_file())
2111 check_inode_max_size(in);
2112 }
2113
/**
 * Revoke the issued caps of a session that has gone stale, and re-evaluate
 * the affected locks/inodes afterwards.
 *
 * Only "notable" caps are revoked one-by-one; the remainder are invalidated
 * wholesale by bumping the session cap generation.
 */
void Locker::revoke_stale_caps(Session *session)
{
  dout(10) << "revoke_stale_caps for " << session->info.inst.name << dendl;

  std::vector<CInode*> to_eval;

  for (auto p = session->caps.begin(); !p.end(); ) {
    Capability *cap = *p;
    ++p;  // advance before touching cap; revoke may reorder the list
    if (!cap->is_notable()) {
      // the rest ones are not being revoked and don't have writeable range
      // and don't want exclusive caps or want file read/write. They don't
      // need recover, they don't affect eval_gather()/try_eval()
      // (assumes notable caps sort first in session->caps — TODO confirm)
      break;
    }

    int issued = cap->issued();
    if (!(issued & ~CEPH_CAP_PIN))
      continue;  // nothing beyond PIN issued; nothing to revoke

    CInode *in = cap->get_inode();
    dout(10) << " revoking " << ccap_string(issued) << " on " << *in << dendl;
    cap->revoke();

    // client may have had a writeable range open; file needs size/mtime recovery
    if (in->is_auth() &&
	in->inode.client_ranges.count(cap->get_client()))
      in->state_set(CInode::STATE_NEEDSRECOVER);

    // eval lock/inode may finish contexts, which may modify other cap's position
    // in the session->caps. So defer eval until the iteration is done.
    to_eval.push_back(in);
  }

  // invalidate the rest
  session->inc_cap_gen();

  for (auto in : to_eval) {
    if (in->state_test(CInode::STATE_EXPORTINGCAPS))
      continue;  // migration will take care of these caps

    if (!in->filelock.is_stable())
      eval_gather(&in->filelock);
    if (!in->linklock.is_stable())
      eval_gather(&in->linklock);
    if (!in->authlock.is_stable())
      eval_gather(&in->authlock);
    if (!in->xattrlock.is_stable())
      eval_gather(&in->xattrlock);

    if (in->is_auth())
      try_eval(in, CEPH_CAP_LOCKS);
    else
      request_inode_file_caps(in);  // replica: tell auth our wanted changed
  }
}
2169
2170 void Locker::resume_stale_caps(Session *session)
2171 {
2172 dout(10) << "resume_stale_caps for " << session->info.inst.name << dendl;
2173
2174 bool lazy = session->info.has_feature(CEPHFS_FEATURE_LAZY_CAP_WANTED);
2175 for (xlist<Capability*>::iterator p = session->caps.begin(); !p.end(); ) {
2176 Capability *cap = *p;
2177 ++p;
2178 if (lazy && !cap->is_notable())
2179 break; // see revoke_stale_caps()
2180
2181 CInode *in = cap->get_inode();
2182 ceph_assert(in->is_head());
2183 dout(10) << " clearing stale flag on " << *in << dendl;
2184
2185 if (in->state_test(CInode::STATE_EXPORTINGCAPS)) {
2186 // if export succeeds, the cap will be removed. if export fails,
2187 // we need to re-issue the cap if it's not stale.
2188 in->state_set(CInode::STATE_EVALSTALECAPS);
2189 continue;
2190 }
2191
2192 if (!in->is_auth() || !eval(in, CEPH_CAP_LOCKS))
2193 issue_caps(in, cap);
2194 }
2195 }
2196
2197 void Locker::remove_stale_leases(Session *session)
2198 {
2199 dout(10) << "remove_stale_leases for " << session->info.inst.name << dendl;
2200 xlist<ClientLease*>::iterator p = session->leases.begin();
2201 while (!p.end()) {
2202 ClientLease *l = *p;
2203 ++p;
2204 CDentry *parent = static_cast<CDentry*>(l->parent);
2205 dout(15) << " removing lease on " << *parent << dendl;
2206 parent->remove_client_lease(l, this);
2207 }
2208 }
2209
2210
// Waiter context: retry request_inode_file_caps() once the blocking
// condition (ambiguous auth / rejoining peer) clears. Pins the inode for
// the lifetime of the waiter so it cannot be trimmed underneath us.
class C_MDL_RequestInodeFileCaps : public LockerContext {
  CInode *in;
public:
  C_MDL_RequestInodeFileCaps(Locker *l, CInode *i) : LockerContext(l), in(i) {
    in->get(CInode::PIN_PTRWAITER);  // keep inode alive while queued
  }
  void finish(int r) override {
    // only retry if we are still a replica; auth computes caps locally
    if (!in->is_auth())
      locker->request_inode_file_caps(in);
    in->put(CInode::PIN_PTRWAITER);
  }
};
2223
/**
 * [replica] Tell the auth MDS which caps our local clients want on @in,
 * if that has changed since the last report.
 *
 * Defers (via a waiter) while the inode's authority is ambiguous or the
 * auth MDS is still rejoining.
 */
void Locker::request_inode_file_caps(CInode *in)
{
  ceph_assert(!in->is_auth());

  // mask down to caps we could ever be granted; PIN is implicit
  int wanted = in->get_caps_wanted() & in->get_caps_allowed_ever() & ~CEPH_CAP_PIN;
  if (wanted != in->replica_caps_wanted) {
    // wait for single auth
    if (in->is_ambiguous_auth()) {
      in->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH,
                     new C_MDL_RequestInodeFileCaps(this, in));
      return;
    }

    mds_rank_t auth = in->authority().first;
    if (mds->is_cluster_degraded() &&
	mds->mdsmap->get_state(auth) == MDSMap::STATE_REJOIN) {
      // auth is not ready to process cap messages yet; retry when active
      mds->wait_for_active_peer(auth, new C_MDL_RequestInodeFileCaps(this, in));
      return;
    }

    dout(7) << "request_inode_file_caps " << ccap_string(wanted)
            << " was " << ccap_string(in->replica_caps_wanted)
            << " on " << *in << " to mds." << auth << dendl;

    // record what we reported, even if the send below is skipped
    in->replica_caps_wanted = wanted;

    if (!mds->is_cluster_degraded() ||
	mds->mdsmap->is_clientreplay_or_active_or_stopping(auth))
      mds->send_message_mds(MInodeFileCaps::create(in->ino(), in->replica_caps_wanted), auth);
  }
}
2255
/**
 * [auth] Handle a replica MDS reporting the caps its clients want on an
 * inode we are auth for; record them and re-evaluate the cap-related locks.
 */
void Locker::handle_inode_file_caps(const MInodeFileCaps::const_ref &m)
{
  // nobody should be talking to us during recovery.
  if (mds->get_state() < MDSMap::STATE_CLIENTREPLAY) {
    if (mds->get_want_state() >= MDSMap::STATE_CLIENTREPLAY) {
      // we are about to reach clientreplay; park the message until then
      mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
      return;
    }
    ceph_abort_msg("got unexpected message during recovery");
  }

  // ok
  CInode *in = mdcache->get_inode(m->get_ino());
  mds_rank_t from = mds_rank_t(m->get_source().num());

  ceph_assert(in);
  ceph_assert(in->is_auth());

  dout(7) << "handle_inode_file_caps replica mds." << from << " wants caps " << ccap_string(m->get_caps()) << " on " << *in << dendl;

  in->set_mds_caps_wanted(from, m->get_caps());

  // wanted caps changed; locks may be able to move to a better state
  try_eval(in, CEPH_CAP_LOCKS);
}
2280
2281
// Waiter context: retry check_inode_max_size() with the same arguments once
// the blocking condition (frozen inode / unstable filelock) clears. Pins the
// inode so it cannot be trimmed while the waiter is queued.
class C_MDL_CheckMaxSize : public LockerContext {
  CInode *in;
  uint64_t new_max_size;  // requested max_size hint (0 = none)
  uint64_t newsize;       // requested size hint (0 = none)
  utime_t mtime;          // mtime accompanying the size update

public:
  C_MDL_CheckMaxSize(Locker *l, CInode *i, uint64_t _new_max_size,
                     uint64_t _newsize, utime_t _mtime) :
    LockerContext(l), in(i),
    new_max_size(_new_max_size), newsize(_newsize), mtime(_mtime)
  {
    in->get(CInode::PIN_PTRWAITER);  // keep inode alive while queued
  }
  void finish(int r) override {
    // only retry on the auth MDS; replicas don't manage client ranges
    if (in->is_auth())
      locker->check_inode_max_size(in, false, new_max_size, newsize, mtime);
    in->put(CInode::PIN_PTRWAITER);
  }
};
2302
2303 uint64_t Locker::calc_new_max_size(CInode::mempool_inode *pi, uint64_t size)
2304 {
2305 uint64_t new_max = (size + 1) << 1;
2306 uint64_t max_inc = g_conf()->mds_client_writeable_range_max_inc_objs;
2307 if (max_inc > 0) {
2308 max_inc *= pi->layout.object_size;
2309 new_max = std::min(new_max, size + max_inc);
2310 }
2311 return round_up_to(new_max, pi->get_layout_size_increment());
2312 }
2313
/**
 * Compute the new client_ranges map for @in assuming a file size of @size.
 *
 * Clients that hold or want any FILE_WR-class cap get a writeable range up
 * to the calculated max size; all other clients get no range (shrink to 0).
 *
 * @param size          effective file size to size the ranges from
 * @param update        if true, also toggle each cap's clientwriteable flag
 * @param new_ranges    out: the computed range map
 * @param max_increased out: set to true if any client's upper bound grew
 */
void Locker::calc_new_client_ranges(CInode *in, uint64_t size, bool update,
				    CInode::mempool_inode::client_range_map *new_ranges,
				    bool *max_increased)
{
  auto latest = in->get_projected_inode();
  uint64_t ms;
  if (latest->has_layout()) {
    ms = calc_new_max_size(latest, size);
  } else {
    // Layout-less directories like ~mds0/, have zero size
    ms = 0;
  }

  // increase ranges as appropriate.
  // shrink to 0 if no WR|BUFFER caps issued.
  for (auto &p : in->client_caps) {
    if ((p.second.issued() | p.second.wanted()) & CEPH_CAP_ANY_FILE_WR) {
      client_writeable_range_t& nr = (*new_ranges)[p.first];
      nr.range.first = 0;
      if (latest->client_ranges.count(p.first)) {
	// existing range: only ever grow the upper bound, never shrink it
	client_writeable_range_t& oldr = latest->client_ranges[p.first];
	if (ms > oldr.range.last)
	  *max_increased = true;
	nr.range.last = std::max(ms, oldr.range.last);
	nr.follows = oldr.follows;
      } else {
	// brand-new writeable range for this client
	*max_increased = true;
	nr.range.last = ms;
	nr.follows = in->first - 1;
      }
      if (update)
	p.second.mark_clientwriteable();
    } else {
      if (update)
	p.second.clear_clientwriteable();
    }
  }
}
2352
/**
 * [auth] Re-evaluate a file's size/mtime and client writeable ranges, and
 * journal an update if anything changed.
 *
 * @param force_wrlock  skip the can_wrlock check (caller already holds it)
 * @param new_max_size  desired max_size hint, 0 for none
 * @param new_size      new size hint, 0 for none
 * @param new_mtime     mtime accompanying the size update
 * @return true if a journaled update was started, false if this was a no-op
 *         or had to be deferred (frozen inode / unwrlockable filelock)
 */
bool Locker::check_inode_max_size(CInode *in, bool force_wrlock,
                                  uint64_t new_max_size, uint64_t new_size,
				  utime_t new_mtime)
{
  ceph_assert(in->is_auth());
  ceph_assert(in->is_file());

  CInode::mempool_inode *latest = in->get_projected_inode();
  CInode::mempool_inode::client_range_map new_ranges;
  uint64_t size = latest->size;
  bool update_size = new_size > 0;
  bool update_max = false;
  bool max_increased = false;

  if (update_size) {
    // size and mtime only ever move forward here
    new_size = size = std::max(size, new_size);
    new_mtime = std::max(new_mtime, latest->mtime);
    if (latest->size == new_size && latest->mtime == new_mtime)
      update_size = false;
  }

  // can_update: 1 = ok, -1 = frozen, -2 = filelock not wrlockable
  int can_update = 1;
  if (in->is_frozen()) {
    can_update = -1;
  } else if (!force_wrlock && !in->filelock.can_wrlock(in->get_loner())) {
    // lock?
    if (in->filelock.is_stable()) {
      // nudge the filelock toward a wrlockable state
      if (in->get_target_loner() >= 0)
	file_excl(&in->filelock);
      else
	simple_lock(&in->filelock);
    }
    if (!in->filelock.can_wrlock(in->get_loner()))
      can_update = -2;
  }

  calc_new_client_ranges(in, std::max(new_max_size, size), can_update > 0,
			 &new_ranges, &max_increased);

  if (max_increased || latest->client_ranges != new_ranges)
    update_max = true;

  if (!update_size && !update_max) {
    dout(20) << "check_inode_max_size no-op on " << *in << dendl;
    return false;
  }

  dout(10) << "check_inode_max_size new_ranges " << new_ranges
	   << " update_size " << update_size
	   << " on " << *in << dendl;

  if (can_update < 0) {
    // can't do it now; queue a retry with the same arguments
    auto cms = new C_MDL_CheckMaxSize(this, in, new_max_size, new_size, new_mtime);
    if (can_update == -1) {
      dout(10) << "check_inode_max_size frozen, waiting on " << *in << dendl;
      in->add_waiter(CInode::WAIT_UNFREEZE, cms);
    } else {
      in->filelock.add_waiter(SimpleLock::WAIT_STABLE, cms);
      dout(10) << "check_inode_max_size can't wrlock, waiting on " << *in << dendl;
    }
    return false;
  }

  MutationRef mut(new MutationImpl());
  mut->ls = mds->mdlog->get_current_segment();

  auto &pi = in->project_inode();
  pi.inode.version = in->pre_dirty();

  if (update_max) {
    dout(10) << "check_inode_max_size client_ranges " << pi.inode.client_ranges << " -> " << new_ranges << dendl;
    pi.inode.client_ranges = new_ranges;
  }

  if (update_size) {
    dout(10) << "check_inode_max_size size " << pi.inode.size << " -> " << new_size << dendl;
    pi.inode.size = new_size;
    pi.inode.rstat.rbytes = new_size;
    dout(10) << "check_inode_max_size mtime " << pi.inode.mtime << " -> " << new_mtime << dendl;
    pi.inode.mtime = new_mtime;
    if (new_mtime > pi.inode.ctime) {
      pi.inode.ctime = new_mtime;
      if (new_mtime > pi.inode.rstat.rctime)
	pi.inode.rstat.rctime = new_mtime;
    }
  }

  // use EOpen if the file is still open; otherwise, use EUpdate.
  // this is just an optimization to push open files forward into
  // newer log segments.
  LogEvent *le;
  EMetaBlob *metablob;
  if (in->is_any_caps_wanted() && in->last == CEPH_NOSNAP) {
    EOpen *eo = new EOpen(mds->mdlog);
    eo->add_ino(in->ino());
    metablob = &eo->metablob;
    le = eo;
  } else {
    EUpdate *eu = new EUpdate(mds->mdlog, "check_inode_max_size");
    metablob = &eu->metablob;
    le = eu;
  }
  mds->mdlog->start_entry(le);
  if (update_size) {  // FIXME if/when we do max_size nested accounting
    mdcache->predirty_journal_parents(mut, metablob, in, 0, PREDIRTY_PRIMARY);
    // no cow, here!
    CDentry *parent = in->get_projected_parent_dn();
    metablob->add_primary_dentry(parent, in, true);
  } else {
    metablob->add_dir_context(in->get_projected_parent_dn()->get_dir());
    mdcache->journal_dirty_inode(mut.get(), metablob, in);
  }
  mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut,
      UPDATE_SHAREMAX, MClientCaps::ref()));
  wrlock_force(&in->filelock, mut);  // wrlock for duration of journal
  mut->auth_pin(in);

  // make max_size _increase_ timely
  if (max_increased)
    mds->mdlog->flush();

  return true;
}
2476
2477
2478 void Locker::share_inode_max_size(CInode *in, Capability *only_cap)
2479 {
2480 /*
2481 * only share if currently issued a WR cap. if client doesn't have it,
2482 * file_max doesn't matter, and the client will get it if/when they get
2483 * the cap later.
2484 */
2485 dout(10) << "share_inode_max_size on " << *in << dendl;
2486 map<client_t, Capability>::iterator it;
2487 if (only_cap)
2488 it = in->client_caps.find(only_cap->get_client());
2489 else
2490 it = in->client_caps.begin();
2491 for (; it != in->client_caps.end(); ++it) {
2492 const client_t client = it->first;
2493 Capability *cap = &it->second;
2494 if (cap->is_suppress())
2495 continue;
2496 if (cap->pending() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER)) {
2497 dout(10) << "share_inode_max_size with client." << client << dendl;
2498 cap->inc_last_seq();
2499 auto m = MClientCaps::create(CEPH_CAP_OP_GRANT,
2500 in->ino(),
2501 in->find_snaprealm()->inode->ino(),
2502 cap->get_cap_id(),
2503 cap->get_last_seq(),
2504 cap->pending(),
2505 cap->wanted(), 0,
2506 cap->get_mseq(),
2507 mds->get_osd_epoch_barrier());
2508 in->encode_cap_message(m, cap);
2509 mds->send_message_client_counted(m, client);
2510 }
2511 if (only_cap)
2512 break;
2513 }
2514 }
2515
2516 bool Locker::_need_flush_mdlog(CInode *in, int wanted)
2517 {
2518 /* flush log if caps are wanted by client but corresponding lock is unstable and locked by
2519 * pending mutations. */
2520 if (((wanted & (CEPH_CAP_FILE_RD|CEPH_CAP_FILE_WR|CEPH_CAP_FILE_SHARED|CEPH_CAP_FILE_EXCL)) &&
2521 in->filelock.is_unstable_and_locked()) ||
2522 ((wanted & (CEPH_CAP_AUTH_SHARED|CEPH_CAP_AUTH_EXCL)) &&
2523 in->authlock.is_unstable_and_locked()) ||
2524 ((wanted & (CEPH_CAP_LINK_SHARED|CEPH_CAP_LINK_EXCL)) &&
2525 in->linklock.is_unstable_and_locked()) ||
2526 ((wanted & (CEPH_CAP_XATTR_SHARED|CEPH_CAP_XATTR_EXCL)) &&
2527 in->xattrlock.is_unstable_and_locked()))
2528 return true;
2529 return false;
2530 }
2531
/**
 * Update a cap's "wanted" mask from a client report.
 *
 * If @issue_seq matches the cap's last issue, the new mask replaces the old;
 * on a seq mismatch we only allow the mask to grow (added bits are honored,
 * dropped bits are ignored). Side effects: replicas re-report to auth,
 * RD/WR wants prioritize file recovery, and an EOpen may be journaled to
 * keep the open file pinned in the log.
 */
void Locker::adjust_cap_wanted(Capability *cap, int wanted, int issue_seq)
{
  if (ceph_seq_cmp(issue_seq, cap->get_last_issue()) == 0) {
    dout(10) << " wanted " << ccap_string(cap->wanted())
	     << " -> " << ccap_string(wanted) << dendl;
    cap->set_wanted(wanted);
  } else if (wanted & ~cap->wanted()) {
    dout(10) << " wanted " << ccap_string(cap->wanted())
	     << " -> " << ccap_string(wanted)
	     << " (added caps even though we had seq mismatch!)" << dendl;
    cap->set_wanted(wanted | cap->wanted());
  } else {
    dout(10) << " NOT changing wanted " << ccap_string(cap->wanted())
	     << " -> " << ccap_string(wanted)
	     << " (issue_seq " << issue_seq << " != last_issue "
	     << cap->get_last_issue() << ")" << dendl;
    return;
  }

  CInode *cur = cap->get_inode();
  if (!cur->is_auth()) {
    // replica: forward the (possibly changed) aggregate want to auth
    request_inode_file_caps(cur);
    return;
  }

  if (cap->wanted()) {
    if (cur->state_test(CInode::STATE_RECOVERING) &&
	(cap->wanted() & (CEPH_CAP_FILE_RD |
			  CEPH_CAP_FILE_WR))) {
      // client is blocked on file data; recover this inode first
      mds->mdcache->recovery_queue.prioritize(cur);
    }

    if (mdcache->open_file_table.should_log_open(cur)) {
      // journal an EOpen so the open file stays in recent log segments
      ceph_assert(cur->last == CEPH_NOSNAP);
      EOpen *le = new EOpen(mds->mdlog);
      mds->mdlog->start_entry(le);
      le->add_clean_inode(cur);
      mds->mdlog->submit_entry(le);
    }
  }
}
2573
/**
 * Try to make progress on a pending snapflush for snapshot inode @in by
 * kicking a lock on its head inode: picking a stable non-SYNC lock and
 * rd-kicking it drives the lock state machine, which in turn prompts the
 * client to flush its snap data. If no suitable lock exists, requeue.
 */
void Locker::snapflush_nudge(CInode *in)
{
  ceph_assert(in->last != CEPH_NOSNAP);
  if (in->client_snap_caps.empty())
    return;

  CInode *head = mdcache->get_inode(in->ino());
  // head inode gets unpinned when snapflush starts. It might get trimmed
  // before snapflush finishes.
  if (!head)
    return;

  ceph_assert(head->is_auth());
  if (head->client_need_snapflush.empty())
    return;

  // prefer the filelock; fall back to any stable non-SYNC cap-related lock
  SimpleLock *hlock = head->get_lock(CEPH_LOCK_IFILE);
  if (hlock->get_state() == LOCK_SYNC || !hlock->is_stable()) {
    hlock = NULL;
    for (int i = 0; i < num_cinode_locks; i++) {
      SimpleLock *lock = head->get_lock(cinode_lock_info[i].lock);
      if (lock->get_state() != LOCK_SYNC && lock->is_stable()) {
	hlock = lock;
	break;
      }
    }
  }
  if (hlock) {
    _rdlock_kick(hlock, true);
  } else {
    // also, requeue, in case of unstable lock
    need_snapflush_inodes.push_back(&in->item_caps);
  }
}
2608
2609 void Locker::mark_need_snapflush_inode(CInode *in)
2610 {
2611 ceph_assert(in->last != CEPH_NOSNAP);
2612 if (!in->item_caps.is_on_list()) {
2613 need_snapflush_inodes.push_back(&in->item_caps);
2614 utime_t now = ceph_clock_now();
2615 in->last_dirstat_prop = now;
2616 dout(10) << "mark_need_snapflush_inode " << *in << " - added at " << now << dendl;
2617 }
2618 }
2619
/**
 * Perform empty ("null") snapflushes on behalf of @client for every snapid
 * below @last that is still waiting on a snapflush from it. Used when we can
 * infer the client will never send the real FLUSHSNAP (it had no dirty data).
 */
void Locker::_do_null_snapflush(CInode *head_in, client_t client, snapid_t last)
{
  dout(10) << "_do_null_snapflush client." << client << " on " << *head_in << dendl;
  for (auto p = head_in->client_need_snapflush.begin();
       p != head_in->client_need_snapflush.end() && p->first < last; ) {
    snapid_t snapid = p->first;
    auto &clients = p->second;
    ++p;  // be careful, q loop below depends on this
          // (remove_need_snapflush below may erase the current entry)

    if (clients.count(client)) {
      dout(10) << " doing async NULL snapflush on " << snapid << " from client." << client << dendl;
      CInode *sin = mdcache->pick_inode_snap(head_in, snapid - 1);
      ceph_assert(sin);
      ceph_assert(sin->first <= snapid);
      // dirty=0: this is a no-data flush that just clears the bookkeeping
      _do_snap_update(sin, snapid, 0, sin->first - 1, client, MClientCaps::ref(), MClientCaps::ref());
      head_in->remove_need_snapflush(sin, snapid, client);
    }
  }
}
2639
2640
/**
 * Should processing of a client cap message on @in be deferred because the
 * inode is freezing/frozen?
 */
bool Locker::should_defer_client_cap_frozen(CInode *in)
{
  /*
   * This policy needs to be AT LEAST as permissive as allowing a client request
   * to go forward, or else a client request can release something, the release
   * gets deferred, but the request gets processed and deadlocks because the
   * caps can't get revoked.
   *
   * Currently, a request waits if anything locked is freezing (can't
   * auth_pin), which would avoid any deadlock with cap release.  Thus @in
   * _MUST_ be in the lock/auth_pin set.
   *
   * auth_pins==0 implies no unstable lock and not auth pinned by
   * client request; otherwise continue even if it's freezing.
   */
  return (in->is_freezing() && in->get_num_auth_pins() == 0) || in->is_frozen();
}
2658
/**
 * Main entry point for MClientCaps messages from clients: cap flushes,
 * snapflushes, releases and wanted-mask updates.
 *
 * Rough flow:
 *  1. gate on MDS state / session validity (defer or drop)
 *  2. short-circuit already-completed flush tids (dup ack)
 *  3. trim the session's completed-flush list
 *  4. look up the inode; defer if frozen; drop old mseq
 *  5. FLUSHSNAP: apply the snap update and ack
 *  6. otherwise: confirm receipt, apply dirty metadata via _do_cap_update,
 *     adjust wanted, re-eval locks, and flush the mdlog if needed
 */
void Locker::handle_client_caps(const MClientCaps::const_ref &m)
{
  client_t client = m->get_source().num();
  snapid_t follows = m->get_snap_follows();
  auto op = m->get_op();
  auto dirty = m->get_dirty();
  dout(7) << "handle_client_caps "
	  << " on " << m->get_ino()
	  << " tid " << m->get_client_tid() << " follows " << follows
	  << " op " << ceph_cap_op_name(op)
	  << " flags 0x" << std::hex << m->flags << std::dec << dendl;

  // --- 1. gate on MDS/session state ---
  Session *session = mds->get_session(m);
  if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) {
    if (!session) {
      dout(5) << " no session, dropping " << *m << dendl;
      return;
    }
    if (session->is_closed() ||
	session->is_closing() ||
	session->is_killing()) {
      dout(7) << " session closed|closing|killing, dropping " << *m << dendl;
      return;
    }
    if ((mds->is_reconnect() || mds->get_want_state() == MDSMap::STATE_RECONNECT) &&
	dirty && m->get_client_tid() > 0 &&
	!session->have_completed_flush(m->get_client_tid())) {
      // remember dirty caps seen during reconnect so replay can account for them
      mdcache->set_reconnected_dirty_caps(client, m->get_ino(), dirty,
					  op == CEPH_CAP_OP_FLUSHSNAP);
    }
    mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
    return;
  }

  // --- 2. duplicate flush tid: re-ack without reapplying ---
  if (m->get_client_tid() > 0 && session &&
      session->have_completed_flush(m->get_client_tid())) {
    dout(7) << "handle_client_caps already flushed tid " << m->get_client_tid()
	    << " for client." << client << dendl;
    MClientCaps::ref ack;
    if (op == CEPH_CAP_OP_FLUSHSNAP) {
      ack = MClientCaps::create(CEPH_CAP_OP_FLUSHSNAP_ACK, m->get_ino(), 0, 0, 0, 0, 0, dirty, 0, mds->get_osd_epoch_barrier());
    } else {
      ack = MClientCaps::create(CEPH_CAP_OP_FLUSH_ACK, m->get_ino(), 0, m->get_cap_id(), m->get_seq(), m->get_caps(), 0, dirty, 0, mds->get_osd_epoch_barrier());
    }
    ack->set_snap_follows(follows);
    ack->set_client_tid(m->get_client_tid());
    mds->send_message_client_counted(ack, m->get_connection());
    if (op == CEPH_CAP_OP_FLUSHSNAP) {
      return;
    } else {
      // fall-thru because the message may release some caps
      dirty = false;
      op = CEPH_CAP_OP_UPDATE;
    }
  }

  // --- 3. trim completed flushes ---
  // "oldest flush tid" > 0 means client uses unique TID for each flush
  if (m->get_oldest_flush_tid() > 0 && session) {
    if (session->trim_completed_flushes(m->get_oldest_flush_tid())) {
      mds->mdlog->get_current_segment()->touched_sessions.insert(session->info.inst.name);

      if (session->get_num_trim_flushes_warnings() > 0 &&
	  session->get_num_completed_flushes() * 2 < g_conf()->mds_max_completed_flushes)
	session->reset_num_trim_flushes_warnings();
    } else {
      // warn (with exponential backoff) about clients that never advance
      if (session->get_num_completed_flushes() >=
	  (g_conf()->mds_max_completed_flushes << session->get_num_trim_flushes_warnings())) {
	session->inc_num_trim_flushes_warnings();
	stringstream ss;
	ss << "client." << session->get_client() << " does not advance its oldest_flush_tid ("
	   << m->get_oldest_flush_tid() << "), "
	   << session->get_num_completed_flushes()
	   << " completed flushes recorded in session";
	mds->clog->warn() << ss.str();
	dout(20) << __func__ << " " << ss.str() << dendl;
      }
    }
  }

  // --- 4. resolve the inode ---
  CInode *head_in = mdcache->get_inode(m->get_ino());
  if (!head_in) {
    if (mds->is_clientreplay()) {
      dout(7) << "handle_client_caps on unknown ino " << m->get_ino()
	<< ", will try again after replayed client requests" << dendl;
      mdcache->wait_replay_cap_reconnect(m->get_ino(), new C_MDS_RetryMessage(mds, m));
      return;
    }

    /*
     * "handle_client_caps on unknown ino xxx" is normal after migrating a subtree
     * Sequence of events that cause this are:
     *   - client sends caps message to mds.a
     *   - mds finishes subtree migration, send cap export to client
     *   - mds trim its cache
     *   - mds receives cap messages from client
     */
    dout(7) << "handle_client_caps on unknown ino " << m->get_ino() << ", dropping" << dendl;
    return;
  }

  if (m->osd_epoch_barrier && !mds->objecter->have_map(m->osd_epoch_barrier)) {
    // Pause RADOS operations until we see the required epoch
    mds->objecter->set_epoch_barrier(m->osd_epoch_barrier);
  }

  if (mds->get_osd_epoch_barrier() < m->osd_epoch_barrier) {
    // Record the barrier so that we will retransmit it to clients
    mds->set_osd_epoch_barrier(m->osd_epoch_barrier);
  }

  dout(10) << " head inode " << *head_in << dendl;

  Capability *cap = 0;
  cap = head_in->get_client_cap(client);
  if (!cap) {
    dout(7) << "handle_client_caps no cap for client." << client << " on " << *head_in << dendl;
    return;
  }
  ceph_assert(cap);

  // freezing|frozen?
  if (should_defer_client_cap_frozen(head_in)) {
    dout(7) << "handle_client_caps freezing|frozen on " << *head_in << dendl;
    head_in->add_waiter(CInode::WAIT_UNFREEZE, new C_MDS_RetryMessage(mds, m));
    return;
  }
  if (ceph_seq_cmp(m->get_mseq(), cap->get_mseq()) < 0) {
    // message predates a cap import/export; drop it as stale
    dout(7) << "handle_client_caps mseq " << m->get_mseq() << " < " << cap->get_mseq()
	    << ", dropping" << dendl;
    return;
  }

  bool need_unpin = false;

  // --- 5. flushsnap? ---
  if (op == CEPH_CAP_OP_FLUSHSNAP) {
    if (!head_in->is_auth()) {
      dout(7) << " not auth, ignoring flushsnap on " << *head_in << dendl;
      goto out;
    }

    SnapRealm *realm = head_in->find_snaprealm();
    snapid_t snap = realm->get_snap_following(follows);
    dout(10) << "  flushsnap follows " << follows << " -> snap " << snap << dendl;

    // flush any older pending snapflushes from this client as null flushes first
    auto p = head_in->client_need_snapflush.begin();
    if (p != head_in->client_need_snapflush.end() && p->first < snap) {
      head_in->auth_pin(this); // prevent subtree frozen
      need_unpin = true;
      _do_null_snapflush(head_in, client, snap);
    }

    CInode *in = head_in;
    if (snap != CEPH_NOSNAP) {
      in = mdcache->pick_inode_snap(head_in, snap - 1);
      if (in != head_in)
	dout(10) << " snapped inode " << *in << dendl;
    }

    // we can prepare the ack now, since this FLUSHEDSNAP is independent of any
    // other cap ops.  (except possibly duplicate FLUSHSNAP requests, but worst
    // case we get a dup response, so whatever.)
    MClientCaps::ref ack;
    if (dirty) {
      ack = MClientCaps::create(CEPH_CAP_OP_FLUSHSNAP_ACK, in->ino(), 0, 0, 0, 0, 0, dirty, 0, mds->get_osd_epoch_barrier());
      ack->set_snap_follows(follows);
      ack->set_client_tid(m->get_client_tid());
      ack->set_oldest_flush_tid(m->get_oldest_flush_tid());
    }

    if (in == head_in ||
	(head_in->client_need_snapflush.count(snap) &&
	 head_in->client_need_snapflush[snap].count(client))) {
      dout(7) << " flushsnap snap " << snap
	      << " client." << client << " on " << *in << dendl;

      // this cap now follows a later snap (i.e. the one initiating this flush, or later)
      if (in == head_in)
	cap->client_follows = snap < CEPH_NOSNAP ? snap : realm->get_newest_seq();

      _do_snap_update(in, snap, dirty, follows, client, m, ack);

      if (in != head_in)
	head_in->remove_need_snapflush(in, snap, client);
    } else {
      dout(7) << " not expecting flushsnap " << snap << " from client." << client << " on " << *in << dendl;
      if (ack)
	mds->send_message_client_counted(ack, m->get_connection());
    }
    goto out;
  }

  // --- 6. regular cap update/flush/release ---
  if (cap->get_cap_id() != m->get_cap_id()) {
    dout(7) << " ignoring client capid " << m->get_cap_id() << " != my " << cap->get_cap_id() << dendl;
  } else {
    CInode *in = head_in;
    if (follows > 0) {
      in = mdcache->pick_inode_snap(head_in, follows);
      // intermediate snap inodes
      while (in != head_in) {
	ceph_assert(in->last != CEPH_NOSNAP);
	if (in->is_auth() && dirty) {
	  dout(10) << " updating intermediate snapped inode " << *in << dendl;
	  _do_cap_update(in, NULL, dirty, follows, m, MClientCaps::ref());
	}
	in = mdcache->pick_inode_snap(head_in, in->last);
      }
    }

    // head inode, and cap
    MClientCaps::ref ack;

    int caps = m->get_caps();
    if (caps & ~cap->issued()) {
      dout(10) << " confirming not issued caps " << ccap_string(caps & ~cap->issued()) << dendl;
      caps &= cap->issued();
    }

    cap->confirm_receipt(m->get_seq(), caps);
    dout(10) << " follows " << follows
	     << " retains " << ccap_string(m->get_caps())
	     << " dirty " << ccap_string(dirty)
	     << " on " << *in << dendl;


    // missing/skipped snapflush?
    //  The client MAY send a snapflush if it is issued WR/EXCL caps, but
    //  presently only does so when it has actual dirty metadata.  But, we
    //  set up the need_snapflush stuff based on the issued caps.
    //  We can infer that the client WONT send a FLUSHSNAP once they have
    //  released all WR/EXCL caps (the FLUSHSNAP always comes before the cap
    //  update/release).
    if (!head_in->client_need_snapflush.empty()) {
      if (!(cap->issued() & CEPH_CAP_ANY_FILE_WR) &&
	  !(m->flags & MClientCaps::FLAG_PENDING_CAPSNAP)) {
	head_in->auth_pin(this); // prevent subtree frozen
	need_unpin = true;
	_do_null_snapflush(head_in, client);
      } else {
	dout(10) << " revocation in progress, not making any conclusions about null snapflushes" << dendl;
      }
    }

    bool need_snapflush = cap->need_snapflush();
    if (dirty && in->is_auth()) {
      dout(7) << " flush client." << client << " dirty " << ccap_string(dirty)
	      << " seq " << m->get_seq() << " on " << *in << dendl;
      ack = MClientCaps::create(CEPH_CAP_OP_FLUSH_ACK, in->ino(), 0, cap->get_cap_id(), m->get_seq(),
	  m->get_caps(), 0, dirty, 0, mds->get_osd_epoch_barrier());
      ack->set_client_tid(m->get_client_tid());
      ack->set_oldest_flush_tid(m->get_oldest_flush_tid());

      // client flushes and releases caps at the same time. make sure MDCache::cow_inode()
      // properly setup CInode::client_need_snapflush
      if ((dirty & ~cap->issued()) && !need_snapflush)
	cap->mark_needsnapflush();
    }

    // filter wanted based on what we could ever give out (given auth/replica status)
    bool need_flush = m->flags & MClientCaps::FLAG_SYNC;
    int new_wanted = m->get_wanted();
    if (new_wanted != cap->wanted()) {
      if (!need_flush && in->is_auth() && (new_wanted & ~cap->pending())) {
	// expanding caps.  make sure we aren't waiting for a log flush
	need_flush = _need_flush_mdlog(head_in, new_wanted & ~cap->pending());
      }

      adjust_cap_wanted(cap, new_wanted, m->get_issue_seq());
    }

    bool updated = in->is_auth() &&
		   _do_cap_update(in, cap, dirty, follows, m, ack, &need_flush);

    if (cap->need_snapflush() &&
	(!need_snapflush || !(m->flags & MClientCaps::FLAG_PENDING_CAPSNAP)))
      cap->clear_needsnapflush();

    if (updated) {
      // _do_cap_update journaled something; ack is sent on journal commit
      eval(in, CEPH_CAP_LOCKS);

      if (!need_flush && (cap->wanted() & ~cap->pending()))
	need_flush = _need_flush_mdlog(in, cap->wanted() & ~cap->pending());
    } else {
      // no update, ack now.
      if (ack)
	mds->send_message_client_counted(ack, m->get_connection());

      bool did_issue = eval(in, CEPH_CAP_LOCKS);
      if (!did_issue && (cap->wanted() & ~cap->pending()))
	issue_caps(in, cap);

      if (cap->get_last_seq() == 0 &&
	  (cap->pending() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER))) {
	// freshly imported cap that can write: make sure the client has max_size
	share_inode_max_size(in, cap);
      }
    }

    if (need_flush)
      mds->mdlog->flush();
  }

 out:
  if (need_unpin)
    head_in->auth_unpin(this);
}
2964
2965
// Retry context for a cap release that had to be deferred because the
// inode was frozen (see process_request_cap_release).  Re-runs the
// release once the inode is unfrozen, with no MDRequest attached.
class C_Locker_RetryRequestCapRelease : public LockerContext {
  client_t client;
  ceph_mds_request_release item;
public:
  C_Locker_RetryRequestCapRelease(Locker *l, client_t c, const ceph_mds_request_release& it) :
    LockerContext(l), client(c), item(it) { }
  void finish(int r) override {
    string dname;
    MDRequestRef null_ref;
    // empty dname: the dentry-lease part (if any) was already handled
    // on the first pass before we deferred
    locker->process_request_cap_release(null_ref, client, item, dname);
  }
};
2978
/**
 * Process one embedded cap release from a client request.
 *
 * Drops a dentry lease if @dname names one, validates the release
 * against the current cap (mseq, cap_id), then confirms receipt and
 * re-evaluates lock/cap state.  Deferred (and retried later) if the
 * inode is frozen.
 *
 * @param mdr owning request, or null when retried after unfreeze
 * @param client releasing client
 * @param item the wire-format release record
 * @param dname dentry name whose lease is released, may be empty
 */
void Locker::process_request_cap_release(MDRequestRef& mdr, client_t client, const ceph_mds_request_release& item,
					 std::string_view dname)
{
  inodeno_t ino = (uint64_t)item.ino;
  uint64_t cap_id = item.cap_id;
  int caps = item.caps;
  int wanted = item.wanted;
  int seq = item.seq;
  int issue_seq = item.issue_seq;
  int mseq = item.mseq;

  CInode *in = mdcache->get_inode(ino);
  if (!in)
    return;

  // drop the client's dentry lease, if it names one
  if (dname.length()) {
    frag_t fg = in->pick_dirfrag(dname);
    CDir *dir = in->get_dirfrag(fg);
    if (dir) {
      CDentry *dn = dir->lookup(dname);
      if (dn) {
        ClientLease *l = dn->get_client_lease(client);
        if (l) {
          dout(10) << "process_cap_release removing lease on " << *dn << dendl;
          dn->remove_client_lease(l, this);
        } else {
          dout(7) << "process_cap_release client." << client
                  << " doesn't have lease on " << *dn << dendl;
        }
      } else {
        dout(7) << "process_cap_release client." << client << " released lease on dn "
                << dir->dirfrag() << "/" << dname << " which dne" << dendl;
      }
    }
  }

  Capability *cap = in->get_client_cap(client);
  if (!cap)
    return;

  dout(10) << "process_cap_release client." << client << " " << ccap_string(caps) << " on " << *in
           << (mdr ? "" : " (DEFERRED, no mdr)")
	   << dendl;

  // stale migration sequence: release refers to a previous cap instance
  if (ceph_seq_cmp(mseq, cap->get_mseq()) < 0) {
    dout(7) << " mseq " << mseq << " < " << cap->get_mseq() << ", dropping" << dendl;
    return;
  }

  if (cap->get_cap_id() != cap_id) {
    dout(7) << " cap_id " << cap_id << " != " << cap->get_cap_id() << ", dropping" << dendl;
    return;
  }

  if (should_defer_client_cap_frozen(in)) {
    dout(7) << " frozen, deferring" << dendl;
    in->add_waiter(CInode::WAIT_UNFREEZE, new C_Locker_RetryRequestCapRelease(this, client, item));
    return;
  }

  // client cannot retain caps we never issued
  if (caps & ~cap->issued()) {
    dout(10) << " confirming not issued caps " << ccap_string(caps & ~cap->issued()) << dendl;
    caps &= cap->issued();
  }
  cap->confirm_receipt(seq, caps);

  // once all WR/EXCL caps are gone the client will never send the
  // pending FLUSHSNAPs; synthesize null snapflushes
  if (!in->client_need_snapflush.empty() &&
      (cap->issued() & CEPH_CAP_ANY_FILE_WR) == 0) {
    _do_null_snapflush(in, client);
  }

  adjust_cap_wanted(cap, wanted, issue_seq);

  // suppress re-issue to this client while evaluating, if this came in
  // attached to a request
  if (mdr)
    cap->inc_suppress();
  eval(in, CEPH_CAP_LOCKS);
  if (mdr)
    cap->dec_suppress();

  // take note; we may need to reissue on this cap later
  if (mdr)
    mdr->cap_releases[in->vino()] = cap->get_last_seq();
}
3062
// Retry kick_issue_caps() once the inode is unfrozen.  Pins the inode
// (PIN_PTRWAITER) so the raw pointer stays valid while queued.
class C_Locker_RetryKickIssueCaps : public LockerContext {
  CInode *in;
  client_t client;
  ceph_seq_t seq;
public:
  C_Locker_RetryKickIssueCaps(Locker *l, CInode *i, client_t c, ceph_seq_t s) :
    LockerContext(l), in(i), client(c), seq(s) {
    in->get(CInode::PIN_PTRWAITER);
  }
  void finish(int r) override {
    locker->kick_issue_caps(in, client, seq);
    in->put(CInode::PIN_PTRWAITER);
  }
};
3077
3078 void Locker::kick_issue_caps(CInode *in, client_t client, ceph_seq_t seq)
3079 {
3080 Capability *cap = in->get_client_cap(client);
3081 if (!cap || cap->get_last_seq() != seq)
3082 return;
3083 if (in->is_frozen()) {
3084 dout(10) << "kick_issue_caps waiting for unfreeze on " << *in << dendl;
3085 in->add_waiter(CInode::WAIT_UNFREEZE,
3086 new C_Locker_RetryKickIssueCaps(this, in, client, seq));
3087 return;
3088 }
3089 dout(10) << "kick_issue_caps released at current seq " << seq
3090 << ", reissuing" << dendl;
3091 issue_caps(in, cap);
3092 }
3093
3094 void Locker::kick_cap_releases(MDRequestRef& mdr)
3095 {
3096 client_t client = mdr->get_client();
3097 for (map<vinodeno_t,ceph_seq_t>::iterator p = mdr->cap_releases.begin();
3098 p != mdr->cap_releases.end();
3099 ++p) {
3100 CInode *in = mdcache->get_inode(p->first);
3101 if (!in)
3102 continue;
3103 kick_issue_caps(in, client, p->second);
3104 }
3105 }
3106
/**
 * Apply a client's FLUSHSNAP metadata to the appropriate (old or
 * snapped) inode and journal the update.
 *
 * m and ack might be NULL, so don't dereference them unless dirty != 0
 */
void Locker::_do_snap_update(CInode *in, snapid_t snap, int dirty, snapid_t follows, client_t client, const MClientCaps::const_ref &m, const MClientCaps::ref &ack)
{
  dout(10) << "_do_snap_update dirty " << ccap_string(dirty)
	   << " follows " << follows << " snap " << snap
	   << " on " << *in << dendl;

  if (snap == CEPH_NOSNAP) {
    // hmm, i guess snap was already deleted?  just ack!
    dout(10) << " wow, the snap following " << follows
	     << " was already deleted.  nothing to record, just ack." << dendl;
    if (ack)
      mds->send_message_client_counted(ack, m->get_connection());
    return;
  }

  EUpdate *le = new EUpdate(mds->mdlog, "snap flush");
  mds->mdlog->start_entry(le);
  MutationRef mut = new MutationImpl();
  mut->ls = mds->mdlog->get_current_segment();

  // normal metadata updates that we can apply to the head as well.

  // update xattrs?
  CInode::mempool_xattr_map *px = nullptr;
  bool xattrs = (dirty & CEPH_CAP_XATTR_EXCL) &&
                m->xattrbl.length() &&
                m->head.xattr_version > in->get_projected_inode()->xattr_version;

  CInode::mempool_old_inode *oi = 0;
  if (in->is_multiversion()) {
    oi = in->pick_old_inode(snap);
  }

  // decide whether the flushed metadata lands in an old_inode or in a
  // fresh projected inode
  CInode::mempool_inode *i;
  if (oi) {
    dout(10) << " writing into old inode" << dendl;
    auto &pi = in->project_inode();
    pi.inode.version = in->pre_dirty();
    if (snap > oi->first)
      in->split_old_inode(snap);
    i = &oi->inode;
    if (xattrs)
      px = &oi->xattrs;
  } else {
    auto &pi = in->project_inode(xattrs);
    pi.inode.version = in->pre_dirty();
    i = &pi.inode;
    if (xattrs)
      px = pi.xattrs.get();
  }

  _update_cap_fields(in, dirty, m, i);

  // xattr
  if (xattrs) {
    dout(7) << " xattrs v" << i->xattr_version << " -> " << m->head.xattr_version
	    << " len " << m->xattrbl.length() << dendl;
    i->xattr_version = m->head.xattr_version;
    auto p = m->xattrbl.cbegin();
    decode(*px, p);
  }

  {
    // trim/remove the client's writable byte range now that the snap
    // has been flushed
    auto it = i->client_ranges.find(client);
    if (it != i->client_ranges.end()) {
      if (in->last == snap) {
	dout(10) << " removing client_range entirely" << dendl;
	i->client_ranges.erase(it);
      } else {
	dout(10) << " client_range now follows " << snap << dendl;
	it->second.follows = snap;
      }
    }
  }

  mut->auth_pin(in);
  mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows);
  mdcache->journal_dirty_inode(mut.get(), &le->metablob, in, follows);

  // "oldest flush tid" > 0 means client uses unique TID for each flush
  if (ack && ack->get_oldest_flush_tid() > 0)
    le->metablob.add_client_flush(metareqid_t(m->get_source(), ack->get_client_tid()),
				  ack->get_oldest_flush_tid());

  mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut, UPDATE_SNAPFLUSH,
							      ack, client));
}
3197
/**
 * Copy client-flushed metadata fields (times, size, inline data, auth
 * attrs, ...) from a cap message into an inode, gated by which cap
 * bits are dirty.  No-op if dirty == 0.
 */
void Locker::_update_cap_fields(CInode *in, int dirty, const MClientCaps::const_ref &m, CInode::mempool_inode *pi)
{
  if (dirty == 0)
    return;

  /* m must be valid if there are dirty caps */
  ceph_assert(m);
  uint64_t features = m->get_connection()->get_features();

  // ctime only moves forward
  if (m->get_ctime() > pi->ctime) {
    dout(7) << "  ctime " << pi->ctime << " -> " << m->get_ctime()
	    << " for " << *in << dendl;
    pi->ctime = m->get_ctime();
    if (m->get_ctime() > pi->rstat.rctime)
      pi->rstat.rctime = m->get_ctime();
  }

  if ((features & CEPH_FEATURE_FS_CHANGE_ATTR) &&
      m->get_change_attr() > pi->change_attr) {
    dout(7) << "  change_attr " << pi->change_attr << " -> " << m->get_change_attr()
	    << " for " << *in << dendl;
    pi->change_attr = m->get_change_attr();
  }

  // file
  if (dirty & (CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)) {
    utime_t atime = m->get_atime();
    utime_t mtime = m->get_mtime();
    uint64_t size = m->get_size();
    version_t inline_version = m->inline_version;

    // WR can only advance mtime; EXCL may set it to anything
    if (((dirty & CEPH_CAP_FILE_WR) && mtime > pi->mtime) ||
	((dirty & CEPH_CAP_FILE_EXCL) && mtime != pi->mtime)) {
      dout(7) << "  mtime " << pi->mtime << " -> " << mtime
	      << " for " << *in << dendl;
      pi->mtime = mtime;
      if (mtime > pi->rstat.rctime)
	pi->rstat.rctime = mtime;
    }
    if (in->inode.is_file() &&   // ONLY if regular file
	size > pi->size) {
      dout(7) << "  size " << pi->size << " -> " << size
	      << " for " << *in << dendl;
      pi->size = size;
      pi->rstat.rbytes = size;
    }
    if (in->inode.is_file() &&
        (dirty & CEPH_CAP_FILE_WR) &&
        inline_version > pi->inline_data.version) {
      pi->inline_data.version = inline_version;
      if (inline_version != CEPH_INLINE_NONE && m->inline_data.length() > 0)
	pi->inline_data.get_data() = m->inline_data;
      else
	pi->inline_data.free_data();
    }
    if ((dirty & CEPH_CAP_FILE_EXCL) && atime != pi->atime) {
      dout(7) << "  atime " << pi->atime << " -> " << atime
	      << " for " << *in << dendl;
      pi->atime = atime;
    }
    if ((dirty & CEPH_CAP_FILE_EXCL) &&
	ceph_seq_cmp(pi->time_warp_seq, m->get_time_warp_seq()) < 0) {
      dout(7) << "  time_warp_seq " << pi->time_warp_seq << " -> " << m->get_time_warp_seq()
	      << " for " << *in << dendl;
      pi->time_warp_seq = m->get_time_warp_seq();
    }
  }
  // auth
  if (dirty & CEPH_CAP_AUTH_EXCL) {
    if (m->head.uid != pi->uid) {
      dout(7) << "  uid " << pi->uid
	      << " -> " << m->head.uid
	      << " for " << *in << dendl;
      pi->uid = m->head.uid;
    }
    if (m->head.gid != pi->gid) {
      dout(7) << "  gid " << pi->gid
	      << " -> " << m->head.gid
	      << " for " << *in << dendl;
      pi->gid = m->head.gid;
    }
    if (m->head.mode != pi->mode) {
      dout(7) << "  mode " << oct << pi->mode
	      << " -> " << m->head.mode << dec
	      << " for " << *in << dendl;
      pi->mode = m->head.mode;
    }
    if ((features & CEPH_FEATURE_FS_BTIME) && m->get_btime() != pi->btime) {
      dout(7) << "  btime " << oct << pi->btime
	      << " -> " << m->get_btime() << dec
	      << " for " << *in << dendl;
      pi->btime = m->get_btime();
    }
  }
}
3293
/*
 * update inode based on cap flush|flushsnap|wanted.
 * adjust max_size, if needed.
 * if we update, return true; otherwise, false (no update needed).
 */
bool Locker::_do_cap_update(CInode *in, Capability *cap,
			    int dirty, snapid_t follows,
			    const MClientCaps::const_ref &m, const MClientCaps::ref &ack,
			    bool *need_flush)
{
  dout(10) << "_do_cap_update dirty " << ccap_string(dirty)
	   << " issued " << ccap_string(cap ? cap->issued() : 0)
	   << " wanted " << ccap_string(cap ? cap->wanted() : 0)
	   << " on " << *in << dendl;
  ceph_assert(in->is_auth());
  client_t client = m->get_source().num();
  CInode::mempool_inode *latest = in->get_projected_inode();

  // increase or zero max_size?
  uint64_t size = m->get_size();
  bool change_max = false;
  uint64_t old_max = latest->client_ranges.count(client) ? latest->client_ranges[client].range.last : 0;
  uint64_t new_max = old_max;

  if (in->is_file()) {
    bool forced_change_max = false;
    dout(20) << "inode is file" << dendl;
    if (cap && ((cap->issued() | cap->wanted()) & CEPH_CAP_ANY_FILE_WR)) {
      dout(20) << "client has write caps; m->get_max_size="
	       << m->get_max_size() << "; old_max=" << old_max << dendl;
      if (m->get_max_size() > new_max) {
	// client explicitly asked for a larger max; honor it
	dout(10) << "client requests file_max " << m->get_max_size()
		 << " > max " << old_max << dendl;
	change_max = true;
	forced_change_max = true;
	new_max = calc_new_max_size(latest, m->get_max_size());
      } else {
	new_max = calc_new_max_size(latest, size);

	if (new_max > old_max)
	  change_max = true;
	else
	  new_max = old_max;
      }
    } else {
      // no write caps: retract the client's writable range entirely
      if (old_max) {
	change_max = true;
	new_max = 0;
      }
    }

    if (in->last == CEPH_NOSNAP &&
	change_max &&
	!in->filelock.can_wrlock(client) &&
	!in->filelock.can_force_wrlock(client)) {
      dout(10) << " i want to change file_max, but lock won't allow it (yet)" << dendl;
      // try to twiddle the filelock into a wrlockable state
      if (in->filelock.is_stable()) {
	bool need_issue = false;
	if (cap)
	  cap->inc_suppress();
	if (in->get_mds_caps_wanted().empty() &&
	    (in->get_loner() >= 0 || (in->get_wanted_loner() >= 0 && in->try_set_loner()))) {
	  if (in->filelock.get_state() != LOCK_EXCL)
	    file_excl(&in->filelock, &need_issue);
	} else
	  simple_lock(&in->filelock, &need_issue);
	if (need_issue)
	  issue_caps(in);
	if (cap)
	  cap->dec_suppress();
      }
      if (!in->filelock.can_wrlock(client) &&
	  !in->filelock.can_force_wrlock(client)) {
	// still can't; retry the max_size change when the lock stabilizes
	C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in,
	                                                 forced_change_max ? new_max : 0,
	                                                 0, utime_t());

	in->filelock.add_waiter(SimpleLock::WAIT_STABLE, cms);
	change_max = false;
      }
    }
  }

  // decode fcntl/flock state carried on the message (used during
  // client reconnect)
  if (m->flockbl.length()) {
    int32_t num_locks;
    auto bli = m->flockbl.cbegin();
    decode(num_locks, bli);
    for ( int i=0; i < num_locks; ++i) {
      ceph_filelock decoded_lock;
      decode(decoded_lock, bli);
      in->get_fcntl_lock_state()->held_locks.
	insert(pair<uint64_t, ceph_filelock>(decoded_lock.start, decoded_lock));
      ++in->get_fcntl_lock_state()->client_held_lock_counts[(client_t)(decoded_lock.client)];
    }
    decode(num_locks, bli);
    for ( int i=0; i < num_locks; ++i) {
      ceph_filelock decoded_lock;
      decode(decoded_lock, bli);
      in->get_flock_lock_state()->held_locks.
	insert(pair<uint64_t, ceph_filelock>(decoded_lock.start, decoded_lock));
      ++in->get_flock_lock_state()->client_held_lock_counts[(client_t)(decoded_lock.client)];
    }
  }

  if (!dirty && !change_max)
    return false;

  // NOTE(review): session is used without a null check; presumably
  // get_session() cannot fail for a connected cap-sending client —
  // confirm.
  Session *session = mds->get_session(m);
  if (session->check_access(in, MAY_WRITE,
			    m->caller_uid, m->caller_gid, NULL, 0, 0) < 0) {
    dout(10) << "check_access failed, dropping cap update on " << *in << dendl;
    return false;
  }

  // do the update.
  EUpdate *le = new EUpdate(mds->mdlog, "cap update");
  mds->mdlog->start_entry(le);

  bool xattr = (dirty & CEPH_CAP_XATTR_EXCL) &&
               m->xattrbl.length() &&
               m->head.xattr_version > in->get_projected_inode()->xattr_version;

  auto &pi = in->project_inode(xattr);
  pi.inode.version = in->pre_dirty();

  MutationRef mut(new MutationImpl());
  mut->ls = mds->mdlog->get_current_segment();

  _update_cap_fields(in, dirty, m, &pi.inode);

  if (change_max) {
    dout(7) << "  max_size " << old_max << " -> " << new_max
	    << " for " << *in << dendl;
    if (new_max) {
      auto &cr = pi.inode.client_ranges[client];
      cr.range.first = 0;
      cr.range.last = new_max;
      cr.follows = in->first - 1;
      if (cap)
	cap->mark_clientwriteable();
    } else {
      pi.inode.client_ranges.erase(client);
      if (cap)
	cap->clear_clientwriteable();
    }
  }

  if (change_max || (dirty & (CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)))
    wrlock_force(&in->filelock, mut);  // wrlock for duration of journal

  // auth
  if (dirty & CEPH_CAP_AUTH_EXCL)
    wrlock_force(&in->authlock, mut);

  // xattrs update?
  if (xattr) {
    dout(7) << " xattrs v" << pi.inode.xattr_version << " -> " << m->head.xattr_version << dendl;
    pi.inode.xattr_version = m->head.xattr_version;
    auto p = m->xattrbl.cbegin();
    decode(*pi.xattrs, p);
    wrlock_force(&in->xattrlock, mut);
  }

  mut->auth_pin(in);
  mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows);
  mdcache->journal_dirty_inode(mut.get(), &le->metablob, in, follows);

  // "oldest flush tid" > 0 means client uses unique TID for each flush
  if (ack && ack->get_oldest_flush_tid() > 0)
    le->metablob.add_client_flush(metareqid_t(m->get_source(), ack->get_client_tid()),
				  ack->get_oldest_flush_tid());

  unsigned update_flags = 0;
  if (change_max)
    update_flags |= UPDATE_SHAREMAX;
  if (cap)
    update_flags |= UPDATE_NEEDSISSUE;
  mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut, update_flags,
							      ack, client));
  if (need_flush && !*need_flush &&
      ((change_max && new_max) || // max INCREASE
       _need_flush_mdlog(in, dirty)))
    *need_flush = true;

  return true;
}
3480
/**
 * Handle a batched MClientCapRelease message: apply each embedded
 * release via _do_cap_release(), honoring the client's OSD epoch
 * barrier first.
 */
void Locker::handle_client_cap_release(const MClientCapRelease::const_ref &m)
{
  client_t client = m->get_source().num();
  dout(10) << "handle_client_cap_release " << *m << dendl;

  // wait until we can process client messages
  if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) {
    mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
    return;
  }

  if (m->osd_epoch_barrier && !mds->objecter->have_map(m->osd_epoch_barrier)) {
    // Pause RADOS operations until we see the required epoch
    mds->objecter->set_epoch_barrier(m->osd_epoch_barrier);
  }

  if (mds->get_osd_epoch_barrier() < m->osd_epoch_barrier) {
    // Record the barrier so that we will retransmit it to clients
    mds->set_osd_epoch_barrier(m->osd_epoch_barrier);
  }

  Session *session = mds->get_session(m);

  for (const auto &cap : m->caps) {
    _do_cap_release(client, inodeno_t((uint64_t)cap.ino) , cap.cap_id, cap.migrate_seq, cap.seq);
  }

  // let the session account for the completed releases
  if (session) {
    session->notify_cap_release(m->caps.size());
  }
}
3511
// Retry a single cap release deferred because the inode was freezing
// or frozen (see _do_cap_release).
class C_Locker_RetryCapRelease : public LockerContext {
  client_t client;
  inodeno_t ino;
  uint64_t cap_id;
  ceph_seq_t migrate_seq;
  ceph_seq_t issue_seq;
public:
  C_Locker_RetryCapRelease(Locker *l, client_t c, inodeno_t i, uint64_t id,
			   ceph_seq_t mseq, ceph_seq_t seq) :
    LockerContext(l), client(c), ino(i), cap_id(id), migrate_seq(mseq), issue_seq(seq) {}
  void finish(int r) override {
    locker->_do_cap_release(client, ino, cap_id, migrate_seq, issue_seq);
  }
};
3526
/**
 * Apply one client cap release: validate it against the live cap
 * (cap_id, mseq, issue seq) and, if current, remove the cap entirely.
 * Deferred (and retried) if the inode is freezing/frozen.
 */
void Locker::_do_cap_release(client_t client, inodeno_t ino, uint64_t cap_id,
			     ceph_seq_t mseq, ceph_seq_t seq)
{
  CInode *in = mdcache->get_inode(ino);
  if (!in) {
    dout(7) << "_do_cap_release missing ino " << ino << dendl;
    return;
  }
  Capability *cap = in->get_client_cap(client);
  if (!cap) {
    dout(7) << "_do_cap_release no cap for client" << client << " on "<< *in << dendl;
    return;
  }

  dout(7) << "_do_cap_release for client." << client << " on "<< *in << dendl;
  if (cap->get_cap_id() != cap_id) {
    dout(7) << " capid " << cap_id << " != " << cap->get_cap_id() << ", ignore" << dendl;
    return;
  }
  if (ceph_seq_cmp(mseq, cap->get_mseq()) < 0) {
    // release refers to an older incarnation of the cap (pre-migration)
    dout(7) << " mseq " << mseq << " < " << cap->get_mseq() << ", ignore" << dendl;
    return;
  }
  if (should_defer_client_cap_frozen(in)) {
    dout(7) << " freezing|frozen, deferring" << dendl;
    in->add_waiter(CInode::WAIT_UNFREEZE,
                  new C_Locker_RetryCapRelease(this, client, ino, cap_id, mseq, seq));
    return;
  }
  if (seq != cap->get_last_issue()) {
    // caps were re-issued after the client sent the release; don't drop
    // the cap, just clear stale revoke history and re-evaluate
    dout(7) << " issue_seq " << seq << " != " << cap->get_last_issue() << dendl;
    // clean out any old revoke history
    cap->clean_revoke_from(seq);
    eval_cap_gather(in);
    return;
  }
  remove_client_cap(in, cap);
}
3565
/**
 * Remove a client's cap from an inode, flushing any pending null
 * snapflush state first, then clean up byte ranges / replica file caps
 * and re-evaluate lock state.
 */
void Locker::remove_client_cap(CInode *in, Capability *cap)
{
  client_t client = cap->get_client();
  // clean out any pending snapflush state
  if (!in->client_need_snapflush.empty())
    _do_null_snapflush(in, client);

  bool notable = cap->is_notable();
  in->remove_client_cap(client);
  if (!notable)
    return;

  if (in->is_auth()) {
    // make sure we clear out the client byte range
    if (in->get_projected_inode()->client_ranges.count(client) &&
	!(in->inode.nlink == 0 && !in->is_any_caps()))    // unless it's unlink + stray
      check_inode_max_size(in);
  } else {
    // replica: tell the auth MDS our aggregate file caps changed
    request_inode_file_caps(in);
  }

  try_eval(in, CEPH_CAP_LOCKS);
}
3589
3590
3591 /**
3592 * Return true if any currently revoking caps exceed the
3593 * session_timeout threshold.
3594 */
3595 bool Locker::any_late_revoking_caps(xlist<Capability*> const &revoking,
3596 double timeout) const
3597 {
3598 xlist<Capability*>::const_iterator p = revoking.begin();
3599 if (p.end()) {
3600 // No revoking caps at the moment
3601 return false;
3602 } else {
3603 utime_t now = ceph_clock_now();
3604 utime_t age = now - (*p)->get_last_revoke_stamp();
3605 if (age <= timeout) {
3606 return false;
3607 } else {
3608 return true;
3609 }
3610 }
3611 }
3612
3613 void Locker::get_late_revoking_clients(std::list<client_t> *result,
3614 double timeout) const
3615 {
3616 if (!any_late_revoking_caps(revoking_caps, timeout)) {
3617 // Fast path: no misbehaving clients, execute in O(1)
3618 return;
3619 }
3620
3621 // Slow path: execute in O(N_clients)
3622 for (auto &p : revoking_caps_by_client) {
3623 if (any_late_revoking_caps(p.second, timeout)) {
3624 result->push_back(p.first);
3625 }
3626 }
3627 }
3628
3629 // Hard-code instead of surfacing a config settings because this is
3630 // really a hack that should go away at some point when we have better
3631 // inspection tools for getting at detailed cap state (#7316)
3632 #define MAX_WARN_CAPS 100
3633
3634 void Locker::caps_tick()
3635 {
3636 utime_t now = ceph_clock_now();
3637
3638 if (!need_snapflush_inodes.empty()) {
3639 // snap inodes that needs flush are auth pinned, they affect
3640 // subtree/difrarg freeze.
3641 utime_t cutoff = now;
3642 cutoff -= g_conf()->mds_freeze_tree_timeout / 3;
3643
3644 CInode *last = need_snapflush_inodes.back();
3645 while (!need_snapflush_inodes.empty()) {
3646 CInode *in = need_snapflush_inodes.front();
3647 if (in->last_dirstat_prop >= cutoff)
3648 break;
3649 in->item_caps.remove_myself();
3650 snapflush_nudge(in);
3651 if (in == last)
3652 break;
3653 }
3654 }
3655
3656 dout(20) << __func__ << " " << revoking_caps.size() << " revoking caps" << dendl;
3657
3658 now = ceph_clock_now();
3659 int n = 0;
3660 for (xlist<Capability*>::iterator p = revoking_caps.begin(); !p.end(); ++p) {
3661 Capability *cap = *p;
3662
3663 utime_t age = now - cap->get_last_revoke_stamp();
3664 dout(20) << __func__ << " age = " << age << " client." << cap->get_client() << "." << cap->get_inode()->ino() << dendl;
3665 if (age <= mds->mdsmap->get_session_timeout()) {
3666 dout(20) << __func__ << " age below timeout " << mds->mdsmap->get_session_timeout() << dendl;
3667 break;
3668 } else {
3669 ++n;
3670 if (n > MAX_WARN_CAPS) {
3671 dout(1) << __func__ << " more than " << MAX_WARN_CAPS << " caps are late"
3672 << "revoking, ignoring subsequent caps" << dendl;
3673 break;
3674 }
3675 }
3676 // exponential backoff of warning intervals
3677 if (age > mds->mdsmap->get_session_timeout() * (1 << cap->get_num_revoke_warnings())) {
3678 cap->inc_num_revoke_warnings();
3679 stringstream ss;
3680 ss << "client." << cap->get_client() << " isn't responding to mclientcaps(revoke), ino "
3681 << cap->get_inode()->ino() << " pending " << ccap_string(cap->pending())
3682 << " issued " << ccap_string(cap->issued()) << ", sent " << age << " seconds ago";
3683 mds->clog->warn() << ss.str();
3684 dout(20) << __func__ << " " << ss.str() << dendl;
3685 } else {
3686 dout(20) << __func__ << " silencing log message (backoff) for " << "client." << cap->get_client() << "." << cap->get_inode()->ino() << dendl;
3687 }
3688 }
3689 }
3690
3691
/**
 * Handle a client dentry-lease message: RELEASE / REVOKE_ACK drop the
 * lease (if the seq matches), RENEW re-issues it when the dentry lock
 * still allows leasing.
 */
void Locker::handle_client_lease(const MClientLease::const_ref &m)
{
  dout(10) << "handle_client_lease " << *m << dendl;

  ceph_assert(m->get_source().is_client());
  client_t client = m->get_source().num();

  CInode *in = mdcache->get_inode(m->get_ino(), m->get_last());
  if (!in) {
    dout(7) << "handle_client_lease don't have ino " << m->get_ino() << "." << m->get_last() << dendl;
    return;
  }
  CDentry *dn = 0;

  frag_t fg = in->pick_dirfrag(m->dname);
  CDir *dir = in->get_dirfrag(fg);
  if (dir)
    dn = dir->lookup(m->dname);
  if (!dn) {
    dout(7) << "handle_client_lease don't have dn " << m->get_ino() << " " << m->dname << dendl;
    return;
  }
  dout(10) << " on " << *dn << dendl;

  // replica and lock
  ClientLease *l = dn->get_client_lease(client);
  if (!l) {
    dout(7) << "handle_client_lease didn't have lease for client." << client << " of " << *dn << dendl;
    return;
  }

  switch (m->get_action()) {
  case CEPH_MDS_LEASE_REVOKE_ACK:
  case CEPH_MDS_LEASE_RELEASE:
    // stale seq means the lease was re-issued since the client sent
    // this; ignore rather than drop the newer lease
    if (l->seq != m->get_seq()) {
      dout(7) << "handle_client_lease release - seq " << l->seq << " != provided " << m->get_seq() << dendl;
    } else {
      dout(7) << "handle_client_lease client." << client
	      << " on " << *dn << dendl;
      dn->remove_client_lease(l, this);
    }
    break;

  case CEPH_MDS_LEASE_RENEW:
    {
      dout(7) << "handle_client_lease client." << client << " renew on " << *dn
	      << (!dn->lock.can_lease(client)?", revoking lease":"") << dendl;
      if (dn->lock.can_lease(client)) {
	auto reply = MClientLease::create(*m);
	int pool = 1;   // fixme.. do something smart!
	reply->h.duration_ms = (int)(1000 * mdcache->client_lease_durations[pool]);
	reply->h.seq = ++l->seq;
	reply->clear_payload();

	utime_t now = ceph_clock_now();
	now += mdcache->client_lease_durations[pool];
	mdcache->touch_client_lease(l, pool, now);

	mds->send_message_client_counted(reply, m->get_connection());
      }
    }
    break;

  default:
    ceph_abort(); // implement me
    break;
  }
}
3760
3761
/**
 * Encode a dentry lease (or a null lease) for the client into @bl as
 * part of a reply.  A real lease is only issued when the dentry lock
 * allows it and the parent dir isn't covered by FILE_SHARED/EXCL caps.
 */
void Locker::issue_client_lease(CDentry *dn, client_t client,
			       bufferlist &bl, utime_t now, Session *session)
{
  CInode *diri = dn->get_dir()->get_inode();
  // NOTE(review): the doubled parentheses around the filelock clause
  // look accidental but do not change evaluation; confirm the intended
  // grouping against upstream before touching it.
  if (!diri->is_stray() &&  // do not issue dn leases in stray dir!
      ((!diri->filelock.can_lease(client) &&
	(diri->get_client_cap_pending(client) & (CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_EXCL)) == 0)) &&
      dn->lock.can_lease(client)) {
    int pool = 1;   // fixme.. do something smart!
    // issue a dentry lease
    ClientLease *l = dn->add_client_lease(client, session);
    session->touch_lease(l);

    now += mdcache->client_lease_durations[pool];
    mdcache->touch_client_lease(l, pool, now);

    LeaseStat lstat;
    lstat.mask = 1 | CEPH_LOCK_DN;  // old and new bit values
    lstat.duration_ms = (uint32_t)(1000 * mdcache->client_lease_durations[pool]);
    lstat.seq = ++l->seq;
    encode_lease(bl, session->info, lstat);
    dout(20) << "issue_client_lease seq " << lstat.seq << " dur " << lstat.duration_ms << "ms "
	     << " on " << *dn << dendl;
  } else {
    // null lease
    LeaseStat lstat;
    encode_lease(bl, session->info, lstat);
    dout(20) << "issue_client_lease no/null lease on " << *dn << dendl;
  }
}
3792
3793
3794 void Locker::revoke_client_leases(SimpleLock *lock)
3795 {
3796 int n = 0;
3797 CDentry *dn = static_cast<CDentry*>(lock->get_parent());
3798 for (map<client_t, ClientLease*>::iterator p = dn->client_lease_map.begin();
3799 p != dn->client_lease_map.end();
3800 ++p) {
3801 ClientLease *l = p->second;
3802
3803 n++;
3804 ceph_assert(lock->get_type() == CEPH_LOCK_DN);
3805
3806 CDentry *dn = static_cast<CDentry*>(lock->get_parent());
3807 int mask = 1 | CEPH_LOCK_DN; // old and new bits
3808
3809 // i should also revoke the dir ICONTENT lease, if they have it!
3810 CInode *diri = dn->get_dir()->get_inode();
3811 auto lease = MClientLease::create(CEPH_MDS_LEASE_REVOKE, l->seq, mask, diri->ino(), diri->first, CEPH_NOSNAP, dn->get_name());
3812 mds->send_message_client_counted(lease, l->client);
3813 }
3814 }
3815
3816 void Locker::encode_lease(bufferlist& bl, const session_info_t& info,
3817 const LeaseStat& ls)
3818 {
3819 if (info.has_feature(CEPHFS_FEATURE_REPLY_ENCODING)) {
3820 ENCODE_START(1, 1, bl);
3821 encode(ls.mask, bl);
3822 encode(ls.duration_ms, bl);
3823 encode(ls.seq, bl);
3824 ENCODE_FINISH(bl);
3825 }
3826 else {
3827 encode(ls.mask, bl);
3828 encode(ls.duration_ms, bl);
3829 encode(ls.seq, bl);
3830 }
3831 }
3832
3833 // locks ----------------------------------------------------------------
3834
/**
 * Resolve a (lock_type, object info) pair from a peer MDS into the
 * local SimpleLock, or nullptr if the object isn't in our cache.
 */
SimpleLock *Locker::get_lock(int lock_type, const MDSCacheObjectInfo &info)
{
  switch (lock_type) {
  case CEPH_LOCK_DN:
    {
      // be careful; info.dirfrag may have incorrect frag; recalculate based on dname.
      CInode *diri = mdcache->get_inode(info.dirfrag.ino);
      frag_t fg;
      CDir *dir = 0;
      CDentry *dn = 0;
      if (diri) {
	fg = diri->pick_dirfrag(info.dname);
	dir = diri->get_dirfrag(fg);
	if (dir)
	  dn = dir->lookup(info.dname, info.snapid);
      }
      if (!dn) {
	dout(7) << "get_lock don't have dn " << info.dirfrag.ino << " " << info.dname << dendl;
	return 0;
      }
      return &dn->lock;
    }

  case CEPH_LOCK_IAUTH:
  case CEPH_LOCK_ILINK:
  case CEPH_LOCK_IDFT:
  case CEPH_LOCK_IFILE:
  case CEPH_LOCK_INEST:
  case CEPH_LOCK_IXATTR:
  case CEPH_LOCK_ISNAP:
  case CEPH_LOCK_IFLOCK:
  case CEPH_LOCK_IPOLICY:
    {
      CInode *in = mdcache->get_inode(info.ino, info.snapid);
      if (!in) {
	dout(7) << "get_lock don't have ino " << info.ino << dendl;
	return 0;
      }
      // inner switch always returns: every outer inode-lock case is
      // covered here
      switch (lock_type) {
      case CEPH_LOCK_IAUTH: return &in->authlock;
      case CEPH_LOCK_ILINK: return &in->linklock;
      case CEPH_LOCK_IDFT: return &in->dirfragtreelock;
      case CEPH_LOCK_IFILE: return &in->filelock;
      case CEPH_LOCK_INEST: return &in->nestlock;
      case CEPH_LOCK_IXATTR: return &in->xattrlock;
      case CEPH_LOCK_ISNAP: return &in->snaplock;
      case CEPH_LOCK_IFLOCK: return &in->flocklock;
      case CEPH_LOCK_IPOLICY: return &in->policylock;
      }
    }

  default:
    dout(7) << "get_lock don't know lock_type " << lock_type << dendl;
    ceph_abort();
    break;
  }

  return 0;
}
3894
// Dispatch an inter-MDS lock protocol message to the handler for the
// lock's class.  If the target object is no longer in cache (trimmed),
// the message is dropped; the sender is expected to cope.
void Locker::handle_lock(const MLock::const_ref &m)
{
  // nobody should be talking to us during recovery.
  ceph_assert(mds->is_rejoin() || mds->is_clientreplay() || mds->is_active() || mds->is_stopping());

  SimpleLock *lock = get_lock(m->get_lock_type(), m->get_object_info());
  if (!lock) {
    dout(10) << "don't have object " << m->get_object_info() << ", must have trimmed, dropping" << dendl;
    return;
  }

  switch (lock->get_type()) {
  case CEPH_LOCK_DN:
  case CEPH_LOCK_IAUTH:
  case CEPH_LOCK_ILINK:
  case CEPH_LOCK_ISNAP:
  case CEPH_LOCK_IXATTR:
  case CEPH_LOCK_IFLOCK:
  case CEPH_LOCK_IPOLICY:
    handle_simple_lock(lock, m);
    break;

  case CEPH_LOCK_IDFT:
  case CEPH_LOCK_INEST:
    //handle_scatter_lock((ScatterLock*)lock, m);
    //break;
    // NOTE: deliberate fallthrough -- the dirfragtree/nest scatterlocks
    // share the file lock handler below.

  case CEPH_LOCK_IFILE:
    handle_file_lock(static_cast<ScatterLock*>(lock), m);
    break;

  default:
    dout(7) << "handle_lock got otype " << m->get_lock_type() << dendl;
    ceph_abort();
    break;
  }
}
3932
3933
3934
3935
3936
3937 // ==========================================================================
3938 // simple lock
3939
3940 /** This function may take a reference to m if it needs one, but does
3941 * not put references. */
3942 void Locker::handle_reqrdlock(SimpleLock *lock, const MLock::const_ref &m)
3943 {
3944 MDSCacheObject *parent = lock->get_parent();
3945 if (parent->is_auth() &&
3946 lock->get_state() != LOCK_SYNC &&
3947 !parent->is_frozen()) {
3948 dout(7) << "handle_reqrdlock got rdlock request on " << *lock
3949 << " on " << *parent << dendl;
3950 ceph_assert(parent->is_auth()); // replica auth pinned if they're doing this!
3951 if (lock->is_stable()) {
3952 simple_sync(lock);
3953 } else {
3954 dout(7) << "handle_reqrdlock delaying request until lock is stable" << dendl;
3955 lock->add_waiter(SimpleLock::WAIT_STABLE | MDSCacheObject::WAIT_UNFREEZE,
3956 new C_MDS_RetryMessage(mds, m));
3957 }
3958 } else {
3959 dout(7) << "handle_reqrdlock dropping rdlock request on " << *lock
3960 << " on " << *parent << dendl;
3961 // replica should retry
3962 }
3963 }
3964
// Process a simple-lock protocol message from a peer MDS.
//
// Replica side: LOCK_AC_SYNC / LOCK_AC_LOCK push state changes from the
// auth.  Auth side: LOCK_AC_LOCKACK collects replica acknowledgements
// during a gather, and LOCK_AC_REQRDLOCK services a replica's request to
// make the lock readable.
void Locker::handle_simple_lock(SimpleLock *lock, const MLock::const_ref &m)
{
  int from = m->get_asker();

  dout(10) << "handle_simple_lock " << *m
	   << " on " << *lock << " " << *lock->get_parent() << dendl;

  if (mds->is_rejoin()) {
    // while the object itself is still rejoining, its lock state is not
    // authoritative yet; drop the message.
    if (lock->get_parent()->is_rejoining()) {
      dout(7) << "handle_simple_lock still rejoining " << *lock->get_parent()
	      << ", dropping " << *m << dendl;
      return;
    }
  }

  switch (m->get_action()) {
    // -- replica --
  case LOCK_AC_SYNC:
    ceph_assert(lock->get_state() == LOCK_LOCK);
    lock->decode_locked_state(m->get_data());
    lock->set_state(LOCK_SYNC);
    lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
    break;

  case LOCK_AC_LOCK:
    ceph_assert(lock->get_state() == LOCK_SYNC);
    lock->set_state(LOCK_SYNC_LOCK);
    // any client leases granted off this replica must be revoked before
    // we can ack the lock back to the auth.
    if (lock->is_leased())
      revoke_client_leases(lock);
    eval_gather(lock, true);
    if (lock->is_unstable_and_locked())
      mds->mdlog->flush();
    break;


    // -- auth --
  case LOCK_AC_LOCKACK:
    ceph_assert(lock->get_state() == LOCK_SYNC_LOCK ||
		lock->get_state() == LOCK_SYNC_EXCL);
    ceph_assert(lock->is_gathering(from));
    lock->remove_gather(from);

    if (lock->is_gathering()) {
      dout(7) << "handle_simple_lock " << *lock << " on " << *lock->get_parent() << " from " << from
	      << ", still gathering " << lock->get_gather_set() << dendl;
    } else {
      // last outstanding ack; the transition can now complete.
      dout(7) << "handle_simple_lock " << *lock << " on " << *lock->get_parent() << " from " << from
	      << ", last one" << dendl;
      eval_gather(lock);
    }
    break;

  case LOCK_AC_REQRDLOCK:
    handle_reqrdlock(lock, m);
    break;

  }
}
4023
4024 /* unused, currently.
4025
4026 class C_Locker_SimpleEval : public Context {
4027 Locker *locker;
4028 SimpleLock *lock;
4029 public:
4030 C_Locker_SimpleEval(Locker *l, SimpleLock *lk) : locker(l), lock(lk) {}
4031 void finish(int r) {
4032 locker->try_simple_eval(lock);
4033 }
4034 };
4035
4036 void Locker::try_simple_eval(SimpleLock *lock)
4037 {
4038 // unstable and ambiguous auth?
4039 if (!lock->is_stable() &&
4040 lock->get_parent()->is_ambiguous_auth()) {
4041 dout(7) << "simple_eval not stable and ambiguous auth, waiting on " << *lock->get_parent() << dendl;
4042 //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
4043 lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_SimpleEval(this, lock));
4044 return;
4045 }
4046
4047 if (!lock->get_parent()->is_auth()) {
4048 dout(7) << "try_simple_eval not auth for " << *lock->get_parent() << dendl;
4049 return;
4050 }
4051
4052 if (!lock->get_parent()->can_auth_pin()) {
4053 dout(7) << "try_simple_eval can't auth_pin, waiting on " << *lock->get_parent() << dendl;
4054 //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
4055 lock->get_parent()->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_SimpleEval(this, lock));
4056 return;
4057 }
4058
4059 if (lock->is_stable())
4060 simple_eval(lock);
4061 }
4062 */
4063
4064
/**
 * Re-evaluate a stable simple lock on the auth MDS and, based on client
 * cap wants and pending waiters, nudge it toward LOCK_EXCL or LOCK_SYNC.
 *
 * @param need_issue if non-null, set to true instead of issuing caps
 *                   inline (caller batches the cap issue).
 */
void Locker::simple_eval(SimpleLock *lock, bool *need_issue)
{
  dout(10) << "simple_eval " << *lock << " on " << *lock->get_parent() << dendl;

  ceph_assert(lock->get_parent()->is_auth());
  ceph_assert(lock->is_stable());

  if (lock->get_parent()->is_freezing_or_frozen()) {
    // dentry/snap lock in unreadable state can block path traverse
    if ((lock->get_type() != CEPH_LOCK_DN &&
	 lock->get_type() != CEPH_LOCK_ISNAP) ||
	lock->get_state() == LOCK_SYNC ||
	lock->get_parent()->is_frozen())
      return;
  }

  if (mdcache->is_readonly()) {
    // read-only FS: SYNC is the only state we allow ourselves to settle in.
    if (lock->get_state() != LOCK_SYNC) {
      dout(10) << "simple_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
      simple_sync(lock, need_issue);
    }
    return;
  }

  // for cap-bearing (inode) locks, see which caps clients want on this
  // lock's cap bits.
  CInode *in = 0;
  int wanted = 0;
  if (lock->get_cap_shift()) {
    in = static_cast<CInode*>(lock->get_parent());
    in->get_caps_wanted(&wanted, NULL, lock->get_cap_shift());
  }

  // -> excl?
  if (lock->get_state() != LOCK_EXCL &&
      in && in->get_target_loner() >= 0 &&
      (wanted & CEPH_CAP_GEXCL)) {
    dout(7) << "simple_eval stable, going to excl " << *lock
	    << " on " << *lock->get_parent() << dendl;
    simple_excl(lock, need_issue);
  }

  // stable -> sync?
  else if (lock->get_state() != LOCK_SYNC &&
	   !lock->is_wrlocked() &&
	   ((!(wanted & CEPH_CAP_GEXCL) && !lock->is_waiter_for(SimpleLock::WAIT_WR)) ||
	    (lock->get_state() == LOCK_EXCL && in && in->get_target_loner() < 0))) {
    dout(7) << "simple_eval stable, syncing " << *lock
	    << " on " << *lock->get_parent() << dendl;
    simple_sync(lock, need_issue);
  }
}
4115
4116
4117 // mid
4118
/**
 * Move a stable simple lock (auth side) toward LOCK_SYNC.
 *
 * May need to gather first: wrlocks must drain, MIX replicas must ack,
 * conflicting client caps must be revoked, and a file needing recovery
 * must be queued.  If gathering (or a dirty-scatterlock writebehind) is
 * started, the lock is left in a transitional *_SYNC state and false is
 * returned; eval_gather() completes the transition later.
 *
 * @param need_issue if non-null, set instead of issuing caps inline.
 * @return true if the lock reached LOCK_SYNC now, false if still gathering.
 */
bool Locker::simple_sync(SimpleLock *lock, bool *need_issue)
{
  dout(7) << "simple_sync on " << *lock << " on " << *lock->get_parent() << dendl;
  ceph_assert(lock->get_parent()->is_auth());
  ceph_assert(lock->is_stable());

  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  int old_state = lock->get_state();

  if (old_state != LOCK_TSYN) {

    switch (lock->get_state()) {
    case LOCK_MIX: lock->set_state(LOCK_MIX_SYNC); break;
    case LOCK_LOCK: lock->set_state(LOCK_LOCK_SYNC); break;
    case LOCK_XSYN: lock->set_state(LOCK_XSYN_SYNC); break;
    case LOCK_EXCL: lock->set_state(LOCK_EXCL_SYNC); break;
    default: ceph_abort();
    }

    int gather = 0;
    if (lock->is_wrlocked())
      gather++;

    // MIX replicas hold scattered state; ask them all to sync back.
    if (lock->get_parent()->is_replicated() && old_state == LOCK_MIX) {
      send_lock_message(lock, LOCK_AC_SYNC);
      lock->init_gather();
      gather++;
    }

    if (in && in->is_head()) {
      if (in->issued_caps_need_gather(lock)) {
	if (need_issue)
	  *need_issue = true;
	else
	  issue_caps(in);
	gather++;
      }
    }

    bool need_recover = false;
    if (lock->get_type() == CEPH_LOCK_IFILE) {
      ceph_assert(in);
      if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
	mds->mdcache->queue_file_recover(in);
	need_recover = true;
	gather++;
      }
    }

    // nothing to gather but dirty scattered data: flush it back to the
    // inode first (writebehind completes the transition).
    if (!gather && lock->is_dirty()) {
      lock->get_parent()->auth_pin(lock);
      scatter_writebehind(static_cast<ScatterLock*>(lock));
      mds->mdlog->flush();
      return false;
    }

    if (gather) {
      lock->get_parent()->auth_pin(lock);
      if (need_recover)
	mds->mdcache->do_file_recover();
      return false;
    }
  }

  if (lock->get_parent()->is_replicated()) { // FIXME
    bufferlist data;
    lock->encode_locked_state(data);
    send_lock_message(lock, LOCK_AC_SYNC, data);
  }
  lock->set_state(LOCK_SYNC);
  lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
  if (in && in->is_head()) {
    if (need_issue)
      *need_issue = true;
    else
      issue_caps(in);
  }
  return true;
}
4201
/**
 * Move a stable simple lock (auth side) toward LOCK_EXCL so a loner
 * client can be granted exclusive caps.  Gathers outstanding rd/wrlocks,
 * replica acknowledgements, and client cap revocations first if needed;
 * eval_gather() finishes the transition once gathering completes.
 *
 * @param need_issue if non-null, set instead of issuing caps inline.
 */
void Locker::simple_excl(SimpleLock *lock, bool *need_issue)
{
  dout(7) << "simple_excl on " << *lock << " on " << *lock->get_parent() << dendl;
  ceph_assert(lock->get_parent()->is_auth());
  ceph_assert(lock->is_stable());

  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  switch (lock->get_state()) {
  case LOCK_LOCK: lock->set_state(LOCK_LOCK_EXCL); break;
  case LOCK_SYNC: lock->set_state(LOCK_SYNC_EXCL); break;
  case LOCK_XSYN: lock->set_state(LOCK_XSYN_EXCL); break;
  default: ceph_abort();
  }

  int gather = 0;
  if (lock->is_rdlocked())
    gather++;
  if (lock->is_wrlocked())
    gather++;

  // tell replicas to drop to LOCK, except for transitions where the
  // replica is presumably already in LOCK (LOCK_* / XSYN_* sources).
  if (lock->get_parent()->is_replicated() &&
      lock->get_state() != LOCK_LOCK_EXCL &&
      lock->get_state() != LOCK_XSYN_EXCL) {
    send_lock_message(lock, LOCK_AC_LOCK);
    lock->init_gather();
    gather++;
  }

  if (in && in->is_head()) {
    if (in->issued_caps_need_gather(lock)) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
      gather++;
    }
  }

  if (gather) {
    lock->get_parent()->auth_pin(lock);
  } else {
    lock->set_state(LOCK_EXCL);
    lock->finish_waiters(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE);
    if (in) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
    }
  }
}
4256
/**
 * Move a stable simple lock (auth side) toward LOCK_LOCK.
 *
 * Revokes client leases and conflicting caps, drains rdlocks, queues file
 * recovery when needed, and coordinates with replicas.  For MIX sources
 * the gather is two-staged: local state is collected first (LOCK_MIX_LOCK),
 * then replicas are asked to drop (LOCK_MIX_LOCK2).  Dirty scattered
 * state with nothing else to gather triggers a writebehind flush instead.
 *
 * @param need_issue if non-null, set instead of issuing caps inline.
 */
void Locker::simple_lock(SimpleLock *lock, bool *need_issue)
{
  dout(7) << "simple_lock on " << *lock << " on " << *lock->get_parent() << dendl;
  ceph_assert(lock->get_parent()->is_auth());
  ceph_assert(lock->is_stable());
  ceph_assert(lock->get_state() != LOCK_LOCK);

  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  int old_state = lock->get_state();

  switch (lock->get_state()) {
  case LOCK_SYNC: lock->set_state(LOCK_SYNC_LOCK); break;
  case LOCK_XSYN: lock->set_state(LOCK_XSYN_LOCK); break;
  case LOCK_EXCL: lock->set_state(LOCK_EXCL_LOCK); break;
  case LOCK_MIX: lock->set_state(LOCK_MIX_LOCK);
    (static_cast<ScatterLock *>(lock))->clear_unscatter_wanted();
    break;
  case LOCK_TSYN: lock->set_state(LOCK_TSYN_LOCK); break;
  default: ceph_abort();
  }

  int gather = 0;
  if (lock->is_leased()) {
    gather++;
    revoke_client_leases(lock);
  }
  if (lock->is_rdlocked())
    gather++;
  if (in && in->is_head()) {
    if (in->issued_caps_need_gather(lock)) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
      gather++;
    }
  }

  bool need_recover = false;
  if (lock->get_type() == CEPH_LOCK_IFILE) {
    ceph_assert(in);
    if(in->state_test(CInode::STATE_NEEDSRECOVER)) {
      mds->mdcache->queue_file_recover(in);
      need_recover = true;
      gather++;
    }
  }

  if (lock->get_parent()->is_replicated() &&
      lock->get_state() == LOCK_MIX_LOCK &&
      gather) {
    dout(10) << " doing local stage of mix->lock gather before gathering from replicas" << dendl;
  } else {
    // move to second stage of gather now, so we don't send the lock action later.
    if (lock->get_state() == LOCK_MIX_LOCK)
      lock->set_state(LOCK_MIX_LOCK2);

    if (lock->get_parent()->is_replicated() &&
	lock->get_sm()->states[old_state].replica_state != LOCK_LOCK) { // replica may already be LOCK
      gather++;
      send_lock_message(lock, LOCK_AC_LOCK);
      lock->init_gather();
    }
  }

  // nothing to gather but dirty scattered data: flush it back to the
  // inode; writebehind completion re-drives the transition.
  if (!gather && lock->is_dirty()) {
    lock->get_parent()->auth_pin(lock);
    scatter_writebehind(static_cast<ScatterLock*>(lock));
    mds->mdlog->flush();
    return;
  }

  if (gather) {
    lock->get_parent()->auth_pin(lock);
    if (need_recover)
      mds->mdcache->do_file_recover();
  } else {
    lock->set_state(LOCK_LOCK);
    lock->finish_waiters(ScatterLock::WAIT_XLOCK|ScatterLock::WAIT_WR|ScatterLock::WAIT_STABLE);
  }
}
4341
4342
/**
 * Begin moving a simple lock toward an exclusive (xlock) state.
 *
 * From LOCK_LOCK or LOCK_XLOCKDONE, enters LOCK_LOCK_XLOCK and gathers
 * outstanding rd/wrlocks and conflicting client caps.  If nothing needs
 * gathering, the lock goes straight to LOCK_PREXLOCK.
 */
void Locker::simple_xlock(SimpleLock *lock)
{
  dout(7) << "simple_xlock on " << *lock << " on " << *lock->get_parent() << dendl;
  ceph_assert(lock->get_parent()->is_auth());
  //assert(lock->is_stable());
  ceph_assert(lock->get_state() != LOCK_XLOCK);

  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  // pin only when coming from a stable state; unstable states already
  // carry an auth_pin from the transition that made them unstable.
  if (lock->is_stable())
    lock->get_parent()->auth_pin(lock);

  switch (lock->get_state()) {
  case LOCK_LOCK:
  case LOCK_XLOCKDONE: lock->set_state(LOCK_LOCK_XLOCK); break;
  default: ceph_abort();
  }

  int gather = 0;
  if (lock->is_rdlocked())
    gather++;
  if (lock->is_wrlocked())
    gather++;

  if (in && in->is_head()) {
    if (in->issued_caps_need_gather(lock)) {
      issue_caps(in);
      gather++;
    }
  }

  if (!gather) {
    lock->set_state(LOCK_PREXLOCK);
    //assert("shouldn't be called if we are already xlockable" == 0);
  }
}
4381
4382
4383
4384
4385
4386 // ==========================================================================
4387 // scatter lock
4388
4389 /*
4390
4391 Some notes on scatterlocks.
4392
4393 - The scatter/gather is driven by the inode lock. The scatter always
4394 brings in the latest metadata from the fragments.
4395
4396 - When in a scattered/MIX state, fragments are only allowed to
4397 update/be written to if the accounted stat matches the inode's
4398 current version.
4399
4400 - That means, on gather, we _only_ assimilate diffs for frag metadata
4401 that match the current version, because those are the only ones
4402 written during this scatter/gather cycle. (Others didn't permit
4403 it.) We increment the version and journal this to disk.
4404
4405 - When possible, we also simultaneously update our local frag
4406 accounted stats to match.
4407
4408 - On scatter, the new inode info is broadcast to frags, both local
4409 and remote. If possible (auth and !frozen), the dirfrag auth
4410 should update the accounted state (if it isn't already up to date).
4411 Note that this may occur on both the local inode auth node and
4412 inode replicas, so there are two potential paths. If it is NOT
4413 possible, they need to mark_stale to prevent any possible writes.
4414
4415 - A scatter can be to MIX (potentially writeable) or to SYNC (read
4416 only). Both are opportunities to update the frag accounted stats,
4417 even though only the MIX case is affected by a stale dirfrag.
4418
4419 - Because many scatter/gather cycles can potentially go by without a
4420 frag being able to update its accounted stats (due to being frozen
4421 by exports/refragments in progress), the frag may have (even very)
   old stat versions.  That's fine.  When we do want to update it,
   we can update accounted_* and the version first.
4424
4425 */
4426
// Journal-completion context for scatter_writebehind(): once the EUpdate
// commits, hand control back to the locker to apply the projected inode
// and finish the flush.
class C_Locker_ScatterWB : public LockerLogContext {
  ScatterLock *lock;   // lock whose scattered state is being flushed
  MutationRef mut;     // mutation carrying the projected inode + wrlock
public:
  C_Locker_ScatterWB(Locker *l, ScatterLock *sl, MutationRef& m) :
    LockerLogContext(l), lock(sl), mut(m) {}
  void finish(int r) override {
    locker->scatter_writebehind_finish(lock, mut);
  }
};
4437
/**
 * Flush dirty scattered state (accumulated in dirfrag fnodes) back into
 * the inode through the journal.
 *
 * Holds a forced wrlock for the duration of the flush; the journaled
 * EUpdate's completion (C_Locker_ScatterWB) applies the projected inode
 * and finishes up in scatter_writebehind_finish().
 */
void Locker::scatter_writebehind(ScatterLock *lock)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  dout(10) << "scatter_writebehind " << in->inode.mtime << " on " << *lock << " on " << *in << dendl;

  // journal
  MutationRef mut(new MutationImpl());
  mut->ls = mds->mdlog->get_current_segment();

  // forcefully take a wrlock
  lock->get_wrlock(true);
  mut->locks.emplace(lock, MutationImpl::LockOp::WRLOCK);

  in->pre_cow_old_inode(); // avoid cow mayhem

  auto &pi = in->project_inode();
  pi.inode.version = in->pre_dirty();

  in->finish_scatter_gather_update(lock->get_type());
  lock->start_flush();

  EUpdate *le = new EUpdate(mds->mdlog, "scatter_writebehind");
  mds->mdlog->start_entry(le);

  mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY);
  mdcache->journal_dirty_inode(mut.get(), &le->metablob, in);

  in->finish_scatter_gather_update_accounted(lock->get_type(), mut, &le->metablob);

  mds->mdlog->submit_entry(le, new C_Locker_ScatterWB(this, lock, mut));
}
4469
/**
 * Completion of scatter_writebehind(): the EUpdate has committed.
 * Applies the projected inode, finishes the flush, notifies replicas
 * that may be waiting on the flush, and drops the forced wrlock.
 */
void Locker::scatter_writebehind_finish(ScatterLock *lock, MutationRef& mut)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  dout(10) << "scatter_writebehind_finish on " << *lock << " on " << *in << dendl;
  in->pop_and_dirty_projected_inode(mut->ls);

  lock->finish_flush();

  // if replicas may have flushed in a mix->lock state, send another
  // message so they can finish_flush().
  if (in->is_replicated()) {
    switch (lock->get_state()) {
    case LOCK_MIX_LOCK:
    case LOCK_MIX_LOCK2:
    case LOCK_MIX_EXCL:
    case LOCK_MIX_TSYN:
      send_lock_message(lock, LOCK_AC_LOCKFLUSHED);
    }
  }

  mut->apply();
  drop_locks(mut.get());
  mut->cleanup();

  if (lock->is_stable())
    lock->finish_waiters(ScatterLock::WAIT_STABLE);

  //scatter_eval_gather(lock);
}
4499
/**
 * Re-evaluate a stable scatter lock on the auth MDS and choose its next
 * state: MIX when scattering is wanted or (for INEST) the inode is
 * replicated, LOCK for unreplicated INEST, or SYNC when there is nothing
 * keeping it scattered.
 *
 * @param need_issue if non-null, set instead of issuing caps inline.
 */
void Locker::scatter_eval(ScatterLock *lock, bool *need_issue)
{
  dout(10) << "scatter_eval " << *lock << " on " << *lock->get_parent() << dendl;

  ceph_assert(lock->get_parent()->is_auth());
  ceph_assert(lock->is_stable());

  if (lock->get_parent()->is_freezing_or_frozen()) {
    dout(20) << " freezing|frozen" << dendl;
    return;
  }

  if (mdcache->is_readonly()) {
    // read-only FS: settle in SYNC only.
    if (lock->get_state() != LOCK_SYNC) {
      dout(10) << "scatter_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
      simple_sync(lock, need_issue);
    }
    return;
  }

  if (!lock->is_rdlocked() &&
      lock->get_state() != LOCK_MIX &&
      lock->get_scatter_wanted()) {
    dout(10) << "scatter_eval scatter_wanted, bump to mix " << *lock
	     << " on " << *lock->get_parent() << dendl;
    scatter_mix(lock, need_issue);
    return;
  }

  if (lock->get_type() == CEPH_LOCK_INEST) {
    // in general, we want to keep INEST writable at all times.
    if (!lock->is_rdlocked()) {
      if (lock->get_parent()->is_replicated()) {
	if (lock->get_state() != LOCK_MIX)
	  scatter_mix(lock, need_issue);
      } else {
	if (lock->get_state() != LOCK_LOCK)
	  simple_lock(lock, need_issue);
      }
    }
    return;
  }

  CInode *in = static_cast<CInode*>(lock->get_parent());
  if (!in->has_subtree_or_exporting_dirfrag() || in->is_base()) {
    // i _should_ be sync.
    if (!lock->is_wrlocked() &&
	lock->get_state() != LOCK_SYNC) {
      dout(10) << "scatter_eval no wrlocks|xlocks, not subtree root inode, syncing" << dendl;
      simple_sync(lock, need_issue);
    }
  }
}
4553
4554
4555 /*
4556 * mark a scatterlock to indicate that the dir fnode has some dirty data
4557 */
4558 void Locker::mark_updated_scatterlock(ScatterLock *lock)
4559 {
4560 lock->mark_dirty();
4561 if (lock->get_updated_item()->is_on_list()) {
4562 dout(10) << "mark_updated_scatterlock " << *lock
4563 << " - already on list since " << lock->get_update_stamp() << dendl;
4564 } else {
4565 updated_scatterlocks.push_back(lock->get_updated_item());
4566 utime_t now = ceph_clock_now();
4567 lock->set_update_stamp(now);
4568 dout(10) << "mark_updated_scatterlock " << *lock
4569 << " - added at " << now << dendl;
4570 }
4571 }
4572
4573 /*
4574 * this is called by scatter_tick and LogSegment::try_to_trim() when
4575 * trying to flush dirty scattered data (i.e. updated fnode) back to
4576 * the inode.
4577 *
4578 * we need to lock|scatter in order to push fnode changes into the
4579 * inode.dirstat.
4580 */
/**
 * Nudge a scatterlock's state so that dirty scattered (fnode) data can be
 * flushed back into the inode.
 *
 * On the auth: cycle the lock through scatter/unscatter transitions until
 * a flush can happen, waiting (via @p c) when the parent is frozen,
 * ambiguous, or the lock is unstable.  On a replica: ask the auth to do
 * it with an AC_NUDGE message and requeue locally.
 *
 * @param c optional continuation run when the lock becomes usable.
 * @param forcelockchange documented by the commented-out shortcut below;
 *        currently unused on any live path.
 */
void Locker::scatter_nudge(ScatterLock *lock, MDSContext *c, bool forcelockchange)
{
  CInode *p = static_cast<CInode *>(lock->get_parent());

  if (p->is_frozen() || p->is_freezing()) {
    dout(10) << "scatter_nudge waiting for unfreeze on " << *p << dendl;
    if (c)
      p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, c);
    else if (lock->is_dirty())
      // just requeue. not ideal.. starvation prone..
      updated_scatterlocks.push_back(lock->get_updated_item());
    return;
  }

  if (p->is_ambiguous_auth()) {
    dout(10) << "scatter_nudge waiting for single auth on " << *p << dendl;
    if (c)
      p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, c);
    else if (lock->is_dirty())
      // just requeue. not ideal.. starvation prone..
      updated_scatterlocks.push_back(lock->get_updated_item());
    return;
  }

  if (p->is_auth()) {
    int count = 0;
    while (true) {
      if (lock->is_stable()) {
	// can we do it now?
	//  (only if we're not replicated.. if we are, we really do need
	//   to nudge the lock state!)
	/*
	  actually, even if we're not replicated, we can't stay in MIX, because another mds
	  could discover and replicate us at any time.  if that happens while we're flushing,
	  they end up in MIX but their inode has the old scatterstat version.

	if (!forcelockchange && !lock->get_parent()->is_replicated() && lock->can_wrlock(-1)) {
	  dout(10) << "scatter_nudge auth, propagating " << *lock << " on " << *p << dendl;
	  scatter_writebehind(lock);
	  if (c)
	    lock->add_waiter(SimpleLock::WAIT_STABLE, c);
	  return;
	}
	*/

	if (mdcache->is_readonly()) {
	  if (lock->get_state() != LOCK_SYNC) {
	    dout(10) << "scatter_nudge auth, read-only FS, syncing " << *lock << " on " << *p << dendl;
	    simple_sync(static_cast<ScatterLock*>(lock));
	  }
	  break;
	}

	// adjust lock state
	dout(10) << "scatter_nudge auth, scatter/unscattering " << *lock << " on " << *p << dendl;
	switch (lock->get_type()) {
	case CEPH_LOCK_IFILE:
	  if (p->is_replicated() && lock->get_state() != LOCK_MIX)
	    scatter_mix(static_cast<ScatterLock*>(lock));
	  else if (lock->get_state() != LOCK_LOCK)
	    simple_lock(static_cast<ScatterLock*>(lock));
	  else
	    simple_sync(static_cast<ScatterLock*>(lock));
	  break;

	case CEPH_LOCK_IDFT:
	case CEPH_LOCK_INEST:
	  if (p->is_replicated() && lock->get_state() != LOCK_MIX)
	    scatter_mix(lock);
	  else if (lock->get_state() != LOCK_LOCK)
	    simple_lock(lock);
	  else
	    simple_sync(lock);
	  break;
	default:
	  ceph_abort();
	}
	++count;
	if (lock->is_stable() && count == 2) {
	  dout(10) << "scatter_nudge oh, stable after two cycles." << dendl;
	  // this should only really happen when called via
	  // handle_file_lock due to AC_NUDGE, because the rest of the
	  // time we are replicated or have dirty data and won't get
	  // called.  bailing here avoids an infinite loop.
	  ceph_assert(!c);
	  break;
	}
      } else {
	dout(10) << "scatter_nudge auth, waiting for stable " << *lock << " on " << *p << dendl;
	if (c)
	  lock->add_waiter(SimpleLock::WAIT_STABLE, c);
	return;
      }
    }
  } else {
    dout(10) << "scatter_nudge replica, requesting scatter/unscatter of "
	     << *lock << " on " << *p << dendl;
    // request unscatter?
    mds_rank_t auth = lock->get_parent()->authority().first;
    if (!mds->is_cluster_degraded() || mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
      mds->send_message_mds(MLock::create(lock, LOCK_AC_NUDGE, mds->get_nodeid()), auth);
    }

    // wait...
    if (c)
      lock->add_waiter(SimpleLock::WAIT_STABLE, c);

    // also, requeue, in case we had wrong auth or something
    if (lock->is_dirty())
      updated_scatterlocks.push_back(lock->get_updated_item());
  }
}
4693
/**
 * Periodic pass over updated_scatterlocks: nudge any lock whose dirty
 * data has been queued for longer than mds_scatter_nudge_interval.
 *
 * The list is FIFO-ordered by update stamp, so the scan stops at the
 * first entry that is still too young.
 */
void Locker::scatter_tick()
{
  dout(10) << "scatter_tick" << dendl;

  // updated
  utime_t now = ceph_clock_now();
  // snapshot the queue length: scatter_nudge() may requeue entries, and
  // processing only the original n avoids spinning on them forever.
  int n = updated_scatterlocks.size();
  while (!updated_scatterlocks.empty()) {
    ScatterLock *lock = updated_scatterlocks.front();

    if (n-- == 0) break;  // scatter_nudge() may requeue; avoid looping

    if (!lock->is_dirty()) {
      // already flushed by some other path; just drop it from the queue.
      updated_scatterlocks.pop_front();
      dout(10) << " removing from updated_scatterlocks "
	       << *lock << " " << *lock->get_parent() << dendl;
      continue;
    }
    if (now - lock->get_update_stamp() < g_conf()->mds_scatter_nudge_interval)
      break;
    updated_scatterlocks.pop_front();
    scatter_nudge(lock, 0);
  }
  mds->mdlog->flush();
}
4719
4720
/**
 * Move a stable scatter lock toward the temporary-sync (TSYN) state.
 *
 * NOTE: the unconditional ceph_abort_msg() below means this path is
 * currently disabled; everything after it is effectively dead code kept
 * for reference until the filelock support is completed.
 */
void Locker::scatter_tempsync(ScatterLock *lock, bool *need_issue)
{
  dout(10) << "scatter_tempsync " << *lock
	   << " on " << *lock->get_parent() << dendl;
  ceph_assert(lock->get_parent()->is_auth());
  ceph_assert(lock->is_stable());

  ceph_abort_msg("not fully implemented, at least not for filelock");

  CInode *in = static_cast<CInode *>(lock->get_parent());

  switch (lock->get_state()) {
  case LOCK_SYNC: ceph_abort();   // this shouldn't happen
  case LOCK_LOCK: lock->set_state(LOCK_LOCK_TSYN); break;
  case LOCK_MIX: lock->set_state(LOCK_MIX_TSYN); break;
  default: ceph_abort();
  }

  int gather = 0;
  if (lock->is_wrlocked())
    gather++;

  if (lock->get_cap_shift() &&
      in->is_head() &&
      in->issued_caps_need_gather(lock)) {
    if (need_issue)
      *need_issue = true;
    else
      issue_caps(in);
    gather++;
  }

  // coming from MIX: replicas hold scattered state and must drop to LOCK.
  if (lock->get_state() == LOCK_MIX_TSYN &&
      in->is_replicated()) {
    lock->init_gather();
    send_lock_message(lock, LOCK_AC_LOCK);
    gather++;
  }

  if (gather) {
    in->auth_pin(lock);
  } else {
    // do tempsync
    lock->set_state(LOCK_TSYN);
    lock->finish_waiters(ScatterLock::WAIT_RD|ScatterLock::WAIT_STABLE);
    if (lock->get_cap_shift()) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
    }
  }
}
4774
4775
4776
4777 // ==========================================================================
4778 // local lock
4779
4780 void Locker::local_wrlock_grab(LocalLock *lock, MutationRef& mut)
4781 {
4782 dout(7) << "local_wrlock_grab on " << *lock
4783 << " on " << *lock->get_parent() << dendl;
4784
4785 ceph_assert(lock->get_parent()->is_auth());
4786 ceph_assert(lock->can_wrlock());
4787 lock->get_wrlock(mut->get_client());
4788
4789 auto ret = mut->locks.emplace(lock, MutationImpl::LockOp::WRLOCK);
4790 ceph_assert(ret.second);
4791 }
4792
4793 bool Locker::local_wrlock_start(LocalLock *lock, MDRequestRef& mut)
4794 {
4795 dout(7) << "local_wrlock_start on " << *lock
4796 << " on " << *lock->get_parent() << dendl;
4797
4798 ceph_assert(lock->get_parent()->is_auth());
4799 if (lock->can_wrlock()) {
4800 lock->get_wrlock(mut->get_client());
4801 auto it = mut->locks.emplace_hint(mut->locks.end(), lock, MutationImpl::LockOp::WRLOCK);
4802 ceph_assert(it->is_wrlock());
4803 return true;
4804 } else {
4805 lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
4806 return false;
4807 }
4808 }
4809
4810 void Locker::local_wrlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut)
4811 {
4812 ceph_assert(it->is_wrlock());
4813 LocalLock *lock = static_cast<LocalLock*>(it->lock);
4814 dout(7) << "local_wrlock_finish on " << *lock
4815 << " on " << *lock->get_parent() << dendl;
4816 lock->put_wrlock();
4817 mut->locks.erase(it);
4818 if (lock->get_num_wrlocks() == 0) {
4819 lock->finish_waiters(SimpleLock::WAIT_STABLE |
4820 SimpleLock::WAIT_WR |
4821 SimpleLock::WAIT_RD);
4822 }
4823 }
4824
4825 bool Locker::local_xlock_start(LocalLock *lock, MDRequestRef& mut)
4826 {
4827 dout(7) << "local_xlock_start on " << *lock
4828 << " on " << *lock->get_parent() << dendl;
4829
4830 ceph_assert(lock->get_parent()->is_auth());
4831 if (!lock->can_xlock_local()) {
4832 lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
4833 return false;
4834 }
4835
4836 lock->get_xlock(mut, mut->get_client());
4837 mut->locks.emplace_hint(mut->locks.end(), lock, MutationImpl::LockOp::XLOCK);
4838 return true;
4839 }
4840
4841 void Locker::local_xlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut)
4842 {
4843 ceph_assert(it->is_xlock());
4844 LocalLock *lock = static_cast<LocalLock*>(it->lock);
4845 dout(7) << "local_xlock_finish on " << *lock
4846 << " on " << *lock->get_parent() << dendl;
4847 lock->put_xlock();
4848 mut->locks.erase(it);
4849
4850 lock->finish_waiters(SimpleLock::WAIT_STABLE |
4851 SimpleLock::WAIT_WR |
4852 SimpleLock::WAIT_RD);
4853 }
4854
4855
4856
4857 // ==========================================================================
4858 // file lock
4859
4860
// Re-evaluate a stable, auth file ScatterLock and steer it toward the state
// that best serves what clients currently want (per get_caps_wanted for the
// CEPH_CAP_SFILE shift): EXCL for a lone writer, MIX for multiple writers,
// SYNC when nobody needs write.  If need_issue is non-null, cap reissue is
// deferred to the caller via *need_issue instead of calling issue_caps().
void Locker::file_eval(ScatterLock *lock, bool *need_issue)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  int loner_wanted, other_wanted;
  int wanted = in->get_caps_wanted(&loner_wanted, &other_wanted, CEPH_CAP_SFILE);
  dout(7) << "file_eval wanted=" << gcap_string(wanted)
	  << " loner_wanted=" << gcap_string(loner_wanted)
	  << " other_wanted=" << gcap_string(other_wanted)
	  << " filelock=" << *lock << " on " << *lock->get_parent()
	  << dendl;

  ceph_assert(lock->get_parent()->is_auth());
  ceph_assert(lock->is_stable());

  // don't touch locks on an inode that is freezing/frozen (e.g. mid-export)
  if (lock->get_parent()->is_freezing_or_frozen())
    return;

  // read-only FS: force everything toward SYNC and do nothing else
  if (mdcache->is_readonly()) {
    if (lock->get_state() != LOCK_SYNC) {
      dout(10) << "file_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
      simple_sync(lock, need_issue);
    }
    return;
  }

  // excl -> *?
  if (lock->get_state() == LOCK_EXCL) {
    dout(20) << " is excl" << dendl;
    int loner_issued, other_issued, xlocker_issued;
    in->get_caps_issued(&loner_issued, &other_issued, &xlocker_issued, CEPH_CAP_SFILE);
    dout(7) << "file_eval loner_issued=" << gcap_string(loner_issued)
	    << " other_issued=" << gcap_string(other_issued)
	    << " xlocker_issued=" << gcap_string(xlocker_issued)
	    << dendl;
    // Leave EXCL if the loner no longer justifies it (doesn't want/hold
    // excl/wr/buffer), or a non-loner wants conflicting caps, or (for dirs)
    // multiple clients hold non-stale caps.
    if (!((loner_wanted|loner_issued) & (CEPH_CAP_GEXCL|CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) ||
	(other_wanted & (CEPH_CAP_GEXCL|CEPH_CAP_GWR|CEPH_CAP_GRD)) ||
	(in->inode.is_dir() && in->multiple_nonstale_caps())) {  // FIXME.. :/
      dout(20) << " should lose it" << dendl;
      // we should lose it.
      //  loner  other   want
      //  R      R       SYNC
      //  R      R|W     MIX
      //  R      W       MIX
      //  R|W    R       MIX
      //  R|W    R|W     MIX
      //  R|W    W       MIX
      //  W      R       MIX
      //  W      R|W     MIX
      //  W      W       MIX
      // -> any writer means MIX; RD doesn't matter.
      if (((other_wanted|loner_wanted) & CEPH_CAP_GWR) ||
	  lock->is_waiter_for(SimpleLock::WAIT_WR))
	scatter_mix(lock, need_issue);
      else if (!lock->is_wrlocked())   // let excl wrlocks drain first
	simple_sync(lock, need_issue);
      else
	dout(10) << " waiting for wrlock to drain" << dendl;
    }
  }

  // * -> excl?
  // Promote to loner-exclusive when there is a target loner, no rdlocks held,
  // and either someone wants write/buffer caps or this is a leaf-ish dir.
  else if (lock->get_state() != LOCK_EXCL &&
	   !lock->is_rdlocked() &&
	   //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
	   ((wanted & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) ||
	    (in->inode.is_dir() && !in->has_subtree_or_exporting_dirfrag())) &&
	   in->get_target_loner() >= 0) {
    dout(7) << "file_eval stable, bump to loner " << *lock
	    << " on " << *lock->get_parent() << dendl;
    file_excl(lock, need_issue);
  }

  // * -> mixed?
  // Scatter when a scatter was requested, or writers exist but no single
  // loner can be chosen.
  else if (lock->get_state() != LOCK_MIX &&
	   !lock->is_rdlocked() &&
	   //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
	   (lock->get_scatter_wanted() ||
	    (in->get_target_loner() < 0 && (wanted & CEPH_CAP_GWR)))) {
    dout(7) << "file_eval stable, bump to mixed " << *lock
	    << " on " << *lock->get_parent() << dendl;
    scatter_mix(lock, need_issue);
  }

  // * -> sync?
  // Fall back to SYNC when nothing needs write; wrlocks must drain first,
  // and a MIX dir that is a subtree delegation point stays put.
  else if (lock->get_state() != LOCK_SYNC &&
	   !lock->is_wrlocked() &&   // drain wrlocks first!
	   !lock->is_waiter_for(SimpleLock::WAIT_WR) &&
	   !(wanted & CEPH_CAP_GWR) &&
	   !((lock->get_state() == LOCK_MIX) &&
	     in->is_dir() && in->has_subtree_or_exporting_dirfrag())  // if we are a delegation point, stay where we are
	   //((wanted & CEPH_CAP_RD) ||
	   //in->is_replicated() ||
	   //lock->is_leased() ||
	   //(!loner && lock->get_state() == LOCK_EXCL)) &&
	   ) {
    dout(7) << "file_eval stable, bump to sync " << *lock
	    << " on " << *lock->get_parent() << dendl;
    simple_sync(lock, need_issue);
  }
}
4961
4962
4963
// Transition a stable, auth ScatterLock toward LOCK_MIX (scattered state).
// From LOCK_LOCK the switch is immediate; from SYNC/EXCL/XSYN/TSYN we enter
// the corresponding transitional *_MIX state and gather outstanding
// obligations (rdlocks, replica acks, client leases, caps, file recovery)
// before completing.  If need_issue is non-null, cap reissue is deferred to
// the caller via *need_issue.
void Locker::scatter_mix(ScatterLock *lock, bool *need_issue)
{
  dout(7) << "scatter_mix " << *lock << " on " << *lock->get_parent() << dendl;

  CInode *in = static_cast<CInode*>(lock->get_parent());
  ceph_assert(in->is_auth());
  ceph_assert(lock->is_stable());

  if (lock->get_state() == LOCK_LOCK) {
    // LOCK -> MIX needs no gathering.
    in->start_scatter(lock);
    if (in->is_replicated()) {
      // data
      bufferlist softdata;
      lock->encode_locked_state(softdata);

      // bcast to replicas
      send_lock_message(lock, LOCK_AC_MIX, softdata);
    }

    // change lock
    lock->set_state(LOCK_MIX);
    lock->clear_scatter_wanted();
    if (lock->get_cap_shift()) {
      // this lock governs client caps; reissue under the new state
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
    }
  } else {
    // gather?
    switch (lock->get_state()) {
    case LOCK_SYNC: lock->set_state(LOCK_SYNC_MIX); break;
    case LOCK_EXCL: lock->set_state(LOCK_EXCL_MIX); break;
    case LOCK_XSYN: lock->set_state(LOCK_XSYN_MIX); break;
    case LOCK_TSYN: lock->set_state(LOCK_TSYN_MIX); break;
    default: ceph_abort();
    }

    // count everything we must wait for before reaching MIX
    int gather = 0;
    if (lock->is_rdlocked())
      gather++;
    if (in->is_replicated()) {
      if (lock->get_state() == LOCK_SYNC_MIX) { // for the rest states, replicas are already LOCK
	send_lock_message(lock, LOCK_AC_MIX);
	lock->init_gather();
	gather++;
      }
    }
    if (lock->is_leased()) {
      revoke_client_leases(lock);
      gather++;
    }
    if (lock->get_cap_shift() &&
	in->is_head() &&
	in->issued_caps_need_gather(lock)) {
      // clients hold caps incompatible with MIX; revoke via (re)issue
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
      gather++;
    }
    bool need_recover = false;
    if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
      mds->mdcache->queue_file_recover(in);
      need_recover = true;
      gather++;
    }

    if (gather) {
      // stay in the transitional state; auth_pin keeps the inode from
      // freezing while we wait for the gather to complete
      lock->get_parent()->auth_pin(lock);
      if (need_recover)
	mds->mdcache->do_file_recover();
    } else {
      // nothing to wait for: complete the transition now
      in->start_scatter(lock);
      lock->set_state(LOCK_MIX);
      lock->clear_scatter_wanted();
      if (in->is_replicated()) {
	bufferlist softdata;
	lock->encode_locked_state(softdata);
	send_lock_message(lock, LOCK_AC_MIX, softdata);
      }
      if (lock->get_cap_shift()) {
	if (need_issue)
	  *need_issue = true;
	else
	  issue_caps(in);
      }
    }
  }
}
5054
5055
// Transition a stable, auth file ScatterLock toward LOCK_EXCL so a single
// loner client can hold exclusive caps.  Enters the *_EXCL transitional
// state and gathers rdlocks/wrlocks, replica acks, leases, caps, and any
// pending file recovery before completing.  Requires either a chosen loner
// with no other-MDS cap interest, or the XSYN state (which must pass
// through EXCL).  If need_issue is non-null, cap reissue is deferred to
// the caller via *need_issue.
void Locker::file_excl(ScatterLock *lock, bool *need_issue)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  dout(7) << "file_excl " << *lock << " on " << *lock->get_parent() << dendl;

  ceph_assert(in->is_auth());
  ceph_assert(lock->is_stable());

  ceph_assert((in->get_loner() >= 0 && in->get_mds_caps_wanted().empty()) ||
	 (lock->get_state() == LOCK_XSYN));  // must do xsyn -> excl -> <anything else>

  switch (lock->get_state()) {
  case LOCK_SYNC: lock->set_state(LOCK_SYNC_EXCL); break;
  case LOCK_MIX: lock->set_state(LOCK_MIX_EXCL); break;
  case LOCK_LOCK: lock->set_state(LOCK_LOCK_EXCL); break;
  case LOCK_XSYN: lock->set_state(LOCK_XSYN_EXCL); break;
  default: ceph_abort();
  }
  int gather = 0;

  // both reader and writer holds block EXCL
  if (lock->is_rdlocked())
    gather++;
  if (lock->is_wrlocked())
    gather++;

  if (in->is_replicated() &&
      lock->get_state() != LOCK_LOCK_EXCL &&
      lock->get_state() != LOCK_XSYN_EXCL) {  // if we were lock, replicas are already lock.
    send_lock_message(lock, LOCK_AC_LOCK);
    lock->init_gather();
    gather++;
  }
  if (lock->is_leased()) {
    revoke_client_leases(lock);
    gather++;
  }
  if (in->is_head() &&
      in->issued_caps_need_gather(lock)) {
    // clients hold caps incompatible with EXCL; revoke via (re)issue
    if (need_issue)
      *need_issue = true;
    else
      issue_caps(in);
    gather++;
  }
  bool need_recover = false;
  if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
    mds->mdcache->queue_file_recover(in);
    need_recover = true;
    gather++;
  }

  if (gather) {
    // wait in the transitional state; auth_pin prevents freezing meanwhile
    lock->get_parent()->auth_pin(lock);
    if (need_recover)
      mds->mdcache->do_file_recover();
  } else {
    lock->set_state(LOCK_EXCL);
    if (need_issue)
      *need_issue = true;
    else
      issue_caps(in);
  }
}
5119
// Transition a file lock from LOCK_EXCL to LOCK_XSYN (via EXCL_XSYN).
// Only valid from EXCL (ceph_abort otherwise), and only with a loner and no
// other-MDS cap interest.  Gathers wrlocks and incompatible client caps;
// note rdlocks are not gathered here (unlike file_excl).  If need_issue is
// non-null, cap reissue is deferred to the caller via *need_issue.
void Locker::file_xsyn(SimpleLock *lock, bool *need_issue)
{
  dout(7) << "file_xsyn on " << *lock << " on " << *lock->get_parent() << dendl;
  CInode *in = static_cast<CInode *>(lock->get_parent());
  ceph_assert(in->is_auth());
  ceph_assert(in->get_loner() >= 0 && in->get_mds_caps_wanted().empty());

  switch (lock->get_state()) {
  case LOCK_EXCL: lock->set_state(LOCK_EXCL_XSYN); break;
  default: ceph_abort();
  }

  int gather = 0;
  if (lock->is_wrlocked())
    gather++;

  if (in->is_head() &&
      in->issued_caps_need_gather(lock)) {
    // clients hold caps incompatible with XSYN; revoke via (re)issue
    if (need_issue)
      *need_issue = true;
    else
      issue_caps(in);
    gather++;
  }

  if (gather) {
    // wait in EXCL_XSYN; auth_pin prevents the inode from freezing meanwhile
    lock->get_parent()->auth_pin(lock);
  } else {
    lock->set_state(LOCK_XSYN);
    lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
    if (need_issue)
      *need_issue = true;
    else
      issue_caps(in);
  }
}
5156
// Move a file lock from LOCK_PRE_SCAN to LOCK_SCAN as part of file size/
// mtime recovery.  If clients hold caps that must first be gathered, mark
// the inode NEEDSRECOVER (recovery is queued later, once caps are revoked);
// otherwise queue the inode for recovery immediately.
void Locker::file_recover(ScatterLock *lock)
{
  CInode *in = static_cast<CInode *>(lock->get_parent());
  dout(7) << "file_recover " << *lock << " on " << *in << dendl;

  ceph_assert(in->is_auth());
  //assert(lock->is_stable());
  ceph_assert(lock->get_state() == LOCK_PRE_SCAN); // only called from MDCache::start_files_to_recover()

  int gather = 0;

  /*
  if (in->is_replicated()
      lock->get_sm()->states[oldstate].replica_state != LOCK_LOCK) {
    send_lock_message(lock, LOCK_AC_LOCK);
    lock->init_gather();
    gather++;
  }
  */
  if (in->is_head() &&
      in->issued_caps_need_gather(lock)) {
    // revoke incompatible client caps before scanning
    issue_caps(in);
    gather++;
  }

  lock->set_state(LOCK_SCAN);
  if (gather)
    in->state_set(CInode::STATE_NEEDSRECOVER);
  else
    mds->mdcache->queue_file_recover(in);
}
5188
5189
5190 // messenger
5191 void Locker::handle_file_lock(ScatterLock *lock, const MLock::const_ref &m)
5192 {
5193 CInode *in = static_cast<CInode*>(lock->get_parent());
5194 int from = m->get_asker();
5195
5196 if (mds->is_rejoin()) {
5197 if (in->is_rejoining()) {
5198 dout(7) << "handle_file_lock still rejoining " << *in
5199 << ", dropping " << *m << dendl;
5200 return;
5201 }
5202 }
5203
5204 dout(7) << "handle_file_lock a=" << lock->get_lock_action_name(m->get_action())
5205 << " on " << *lock
5206 << " from mds." << from << " "
5207 << *in << dendl;
5208
5209 bool caps = lock->get_cap_shift();
5210
5211 switch (m->get_action()) {
5212 // -- replica --
5213 case LOCK_AC_SYNC:
5214 ceph_assert(lock->get_state() == LOCK_LOCK ||
5215 lock->get_state() == LOCK_MIX ||
5216 lock->get_state() == LOCK_MIX_SYNC2);
5217
5218 if (lock->get_state() == LOCK_MIX) {
5219 lock->set_state(LOCK_MIX_SYNC);
5220 eval_gather(lock, true);
5221 if (lock->is_unstable_and_locked())
5222 mds->mdlog->flush();
5223 break;
5224 }
5225
5226 (static_cast<ScatterLock *>(lock))->finish_flush();
5227 (static_cast<ScatterLock *>(lock))->clear_flushed();
5228
5229 // ok
5230 lock->decode_locked_state(m->get_data());
5231 lock->set_state(LOCK_SYNC);
5232
5233 lock->get_rdlock();
5234 if (caps)
5235 issue_caps(in);
5236 lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
5237 lock->put_rdlock();
5238 break;
5239
5240 case LOCK_AC_LOCK:
5241 switch (lock->get_state()) {
5242 case LOCK_SYNC: lock->set_state(LOCK_SYNC_LOCK); break;
5243 case LOCK_MIX: lock->set_state(LOCK_MIX_LOCK); break;
5244 default: ceph_abort();
5245 }
5246
5247 eval_gather(lock, true);
5248 if (lock->is_unstable_and_locked())
5249 mds->mdlog->flush();
5250
5251 break;
5252
5253 case LOCK_AC_LOCKFLUSHED:
5254 (static_cast<ScatterLock *>(lock))->finish_flush();
5255 (static_cast<ScatterLock *>(lock))->clear_flushed();
5256 // wake up scatter_nudge waiters
5257 if (lock->is_stable())
5258 lock->finish_waiters(SimpleLock::WAIT_STABLE);
5259 break;
5260
5261 case LOCK_AC_MIX:
5262 ceph_assert(lock->get_state() == LOCK_SYNC ||
5263 lock->get_state() == LOCK_LOCK ||
5264 lock->get_state() == LOCK_SYNC_MIX2);
5265
5266 if (lock->get_state() == LOCK_SYNC) {
5267 // MIXED
5268 lock->set_state(LOCK_SYNC_MIX);
5269 eval_gather(lock, true);
5270 if (lock->is_unstable_and_locked())
5271 mds->mdlog->flush();
5272 break;
5273 }
5274
5275 // ok
5276 lock->set_state(LOCK_MIX);
5277 lock->decode_locked_state(m->get_data());
5278
5279 if (caps)
5280 issue_caps(in);
5281
5282 lock->finish_waiters(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE);
5283 break;
5284
5285
5286 // -- auth --
5287 case LOCK_AC_LOCKACK:
5288 ceph_assert(lock->get_state() == LOCK_SYNC_LOCK ||
5289 lock->get_state() == LOCK_MIX_LOCK ||
5290 lock->get_state() == LOCK_MIX_LOCK2 ||
5291 lock->get_state() == LOCK_MIX_EXCL ||
5292 lock->get_state() == LOCK_SYNC_EXCL ||
5293 lock->get_state() == LOCK_SYNC_MIX ||
5294 lock->get_state() == LOCK_MIX_TSYN);
5295 ceph_assert(lock->is_gathering(from));
5296 lock->remove_gather(from);
5297
5298 if (lock->get_state() == LOCK_MIX_LOCK ||
5299 lock->get_state() == LOCK_MIX_LOCK2 ||
5300 lock->get_state() == LOCK_MIX_EXCL ||
5301 lock->get_state() == LOCK_MIX_TSYN) {
5302 lock->decode_locked_state(m->get_data());
5303 // replica is waiting for AC_LOCKFLUSHED, eval_gather() should not
5304 // delay calling scatter_writebehind().
5305 lock->clear_flushed();
5306 }
5307
5308 if (lock->is_gathering()) {
5309 dout(7) << "handle_file_lock " << *in << " from " << from
5310 << ", still gathering " << lock->get_gather_set() << dendl;
5311 } else {
5312 dout(7) << "handle_file_lock " << *in << " from " << from
5313 << ", last one" << dendl;
5314 eval_gather(lock);
5315 }
5316 break;
5317
5318 case LOCK_AC_SYNCACK:
5319 ceph_assert(lock->get_state() == LOCK_MIX_SYNC);
5320 ceph_assert(lock->is_gathering(from));
5321 lock->remove_gather(from);
5322
5323 lock->decode_locked_state(m->get_data());
5324
5325 if (lock->is_gathering()) {
5326 dout(7) << "handle_file_lock " << *in << " from " << from
5327 << ", still gathering " << lock->get_gather_set() << dendl;
5328 } else {
5329 dout(7) << "handle_file_lock " << *in << " from " << from
5330 << ", last one" << dendl;
5331 eval_gather(lock);
5332 }
5333 break;
5334
5335 case LOCK_AC_MIXACK:
5336 ceph_assert(lock->get_state() == LOCK_SYNC_MIX);
5337 ceph_assert(lock->is_gathering(from));
5338 lock->remove_gather(from);
5339
5340 if (lock->is_gathering()) {
5341 dout(7) << "handle_file_lock " << *in << " from " << from
5342 << ", still gathering " << lock->get_gather_set() << dendl;
5343 } else {
5344 dout(7) << "handle_file_lock " << *in << " from " << from
5345 << ", last one" << dendl;
5346 eval_gather(lock);
5347 }
5348 break;
5349
5350
5351 // requests....
5352 case LOCK_AC_REQSCATTER:
5353 if (lock->is_stable()) {
5354 /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
5355 * because the replica should be holding an auth_pin if they're
5356 * doing this (and thus, we are freezing, not frozen, and indefinite
5357 * starvation isn't an issue).
5358 */
5359 dout(7) << "handle_file_lock got scatter request on " << *lock
5360 << " on " << *lock->get_parent() << dendl;
5361 if (lock->get_state() != LOCK_MIX) // i.e., the reqscatter didn't race with an actual mix/scatter
5362 scatter_mix(lock);
5363 } else {
5364 dout(7) << "handle_file_lock got scatter request, !stable, marking scatter_wanted on " << *lock
5365 << " on " << *lock->get_parent() << dendl;
5366 lock->set_scatter_wanted();
5367 }
5368 break;
5369
5370 case LOCK_AC_REQUNSCATTER:
5371 if (lock->is_stable()) {
5372 /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
5373 * because the replica should be holding an auth_pin if they're
5374 * doing this (and thus, we are freezing, not frozen, and indefinite
5375 * starvation isn't an issue).
5376 */
5377 dout(7) << "handle_file_lock got unscatter request on " << *lock
5378 << " on " << *lock->get_parent() << dendl;
5379 if (lock->get_state() == LOCK_MIX) // i.e., the reqscatter didn't race with an actual mix/scatter
5380 simple_lock(lock); // FIXME tempsync?
5381 } else {
5382 dout(7) << "handle_file_lock ignoring unscatter request on " << *lock
5383 << " on " << *lock->get_parent() << dendl;
5384 lock->set_unscatter_wanted();
5385 }
5386 break;
5387
5388 case LOCK_AC_REQRDLOCK:
5389 handle_reqrdlock(lock, m);
5390 break;
5391
5392 case LOCK_AC_NUDGE:
5393 if (!lock->get_parent()->is_auth()) {
5394 dout(7) << "handle_file_lock IGNORING nudge on non-auth " << *lock
5395 << " on " << *lock->get_parent() << dendl;
5396 } else if (!lock->get_parent()->is_replicated()) {
5397 dout(7) << "handle_file_lock IGNORING nudge on non-replicated " << *lock
5398 << " on " << *lock->get_parent() << dendl;
5399 } else {
5400 dout(7) << "handle_file_lock trying nudge on " << *lock
5401 << " on " << *lock->get_parent() << dendl;
5402 scatter_nudge(lock, 0, true);
5403 mds->mdlog->flush();
5404 }
5405 break;
5406
5407 default:
5408 ceph_abort();
5409 }
5410 }