// Source: ceph/src/mds/Locker.cc — Ceph v14.2.4 ("Nautilus" point release),
// as mirrored in ceph.git at git.proxmox.com.
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include <string_view>
16
17 #include "MDSRank.h"
18 #include "MDCache.h"
19 #include "Locker.h"
20 #include "MDBalancer.h"
21 #include "Migrator.h"
22 #include "CInode.h"
23 #include "CDir.h"
24 #include "CDentry.h"
25 #include "Mutation.h"
26 #include "MDSContext.h"
27
28 #include "MDLog.h"
29 #include "MDSMap.h"
30
31 #include "events/EUpdate.h"
32 #include "events/EOpen.h"
33
34 #include "msg/Messenger.h"
35 #include "osdc/Objecter.h"
36
37 #include "messages/MInodeFileCaps.h"
38 #include "messages/MLock.h"
39 #include "messages/MClientLease.h"
40 #include "messages/MClientReply.h"
41 #include "messages/MClientCaps.h"
42 #include "messages/MClientCapRelease.h"
43
44 #include "messages/MMDSSlaveRequest.h"
45
46 #include <errno.h>
47
48 #include "common/config.h"
49
50
51 #define dout_subsys ceph_subsys_mds
52 #undef dout_prefix
53 #define dout_context g_ceph_context
54 #define dout_prefix _prefix(_dout, mds)
55 static ostream& _prefix(std::ostream *_dout, MDSRank *mds) {
56 return *_dout << "mds." << mds->get_nodeid() << ".locker ";
57 }
58
59
60 class LockerContext : public MDSContext {
61 protected:
62 Locker *locker;
63 MDSRank *get_mds() override
64 {
65 return locker->mds;
66 }
67
68 public:
69 explicit LockerContext(Locker *locker_) : locker(locker_) {
70 ceph_assert(locker != NULL);
71 }
72 };
73
74 class LockerLogContext : public MDSLogContextBase {
75 protected:
76 Locker *locker;
77 MDSRank *get_mds() override
78 {
79 return locker->mds;
80 }
81
82 public:
83 explicit LockerLogContext(Locker *locker_) : locker(locker_) {
84 ceph_assert(locker != NULL);
85 }
86 };
87
88 Locker::Locker(MDSRank *m, MDCache *c) :
89 mds(m), mdcache(c), need_snapflush_inodes(member_offset(CInode, item_caps)) {}
90
91
92 void Locker::dispatch(const Message::const_ref &m)
93 {
94
95 switch (m->get_type()) {
96 // inter-mds locking
97 case MSG_MDS_LOCK:
98 handle_lock(MLock::msgref_cast(m));
99 break;
100 // inter-mds caps
101 case MSG_MDS_INODEFILECAPS:
102 handle_inode_file_caps(MInodeFileCaps::msgref_cast(m));
103 break;
104 // client sync
105 case CEPH_MSG_CLIENT_CAPS:
106 handle_client_caps(MClientCaps::msgref_cast(m));
107 break;
108 case CEPH_MSG_CLIENT_CAPRELEASE:
109 handle_client_cap_release(MClientCapRelease::msgref_cast(m));
110 break;
111 case CEPH_MSG_CLIENT_LEASE:
112 handle_client_lease(MClientLease::msgref_cast(m));
113 break;
114 default:
115 derr << "locker unknown message " << m->get_type() << dendl;
116 ceph_abort_msg("locker unknown message");
117 }
118 }
119
120 void Locker::tick()
121 {
122 scatter_tick();
123 caps_tick();
124 }
125
126 /*
127 * locks vs rejoin
128 *
129 *
130 *
131 */
132
133 void Locker::send_lock_message(SimpleLock *lock, int msg)
134 {
135 for (const auto &it : lock->get_parent()->get_replicas()) {
136 if (mds->is_cluster_degraded() &&
137 mds->mdsmap->get_state(it.first) < MDSMap::STATE_REJOIN)
138 continue;
139 auto m = MLock::create(lock, msg, mds->get_nodeid());
140 mds->send_message_mds(m, it.first);
141 }
142 }
143
144 void Locker::send_lock_message(SimpleLock *lock, int msg, const bufferlist &data)
145 {
146 for (const auto &it : lock->get_parent()->get_replicas()) {
147 if (mds->is_cluster_degraded() &&
148 mds->mdsmap->get_state(it.first) < MDSMap::STATE_REJOIN)
149 continue;
150 auto m = MLock::create(lock, msg, mds->get_nodeid());
151 m->set_data(data);
152 mds->send_message_mds(m, it.first);
153 }
154 }
155
156
157
158
159 void Locker::include_snap_rdlocks(CInode *in, MutationImpl::LockOpVec& lov)
160 {
161 // rdlock ancestor snaps
162 CInode *t = in;
163 while (t->get_projected_parent_dn()) {
164 t = t->get_projected_parent_dn()->get_dir()->get_inode();
165 lov.add_rdlock(&t->snaplock);
166 }
167 lov.add_rdlock(&in->snaplock);
168 }
169
170 void Locker::include_snap_rdlocks_wlayout(CInode *in, MutationImpl::LockOpVec& lov,
171 file_layout_t **layout)
172 {
173 //rdlock ancestor snaps
174 CInode *t = in;
175 lov.add_rdlock(&in->snaplock);
176 lov.add_rdlock(&in->policylock);
177 bool found_layout = false;
178 while (t) {
179 lov.add_rdlock(&t->snaplock);
180 if (!found_layout) {
181 lov.add_rdlock(&t->policylock);
182 if (t->get_projected_inode()->has_layout()) {
183 *layout = &t->get_projected_inode()->layout;
184 found_layout = true;
185 }
186 }
187 if (t->get_projected_parent_dn() &&
188 t->get_projected_parent_dn()->get_dir())
189 t = t->get_projected_parent_dn()->get_dir()->get_inode();
190 else t = NULL;
191 }
192 }
193
194 struct MarkEventOnDestruct {
195 MDRequestRef& mdr;
196 std::string_view message;
197 bool mark_event;
198 MarkEventOnDestruct(MDRequestRef& _mdr, std::string_view _message) :
199 mdr(_mdr),
200 message(_message),
201 mark_event(true) {}
202 ~MarkEventOnDestruct() {
203 if (mark_event)
204 mdr->mark_event(message);
205 }
206 };
207
208 /* If this function returns false, the mdr has been placed
209 * on the appropriate wait list */
210 bool Locker::acquire_locks(MDRequestRef& mdr,
211 MutationImpl::LockOpVec& lov,
212 CInode *auth_pin_freeze,
213 bool auth_pin_nonblock)
214 {
215 if (mdr->done_locking &&
216 !mdr->is_slave()) { // not on slaves! master requests locks piecemeal.
217 dout(10) << "acquire_locks " << *mdr << " - done locking" << dendl;
218 return true; // at least we had better be!
219 }
220 dout(10) << "acquire_locks " << *mdr << dendl;
221
222 MarkEventOnDestruct marker(mdr, "failed to acquire_locks");
223
224 client_t client = mdr->get_client();
225
226 set<MDSCacheObject*> mustpin; // items to authpin
227
228 // xlocks
229 for (int i = 0, size = lov.size(); i < size; ++i) {
230 auto& p = lov[i];
231 SimpleLock *lock = p.lock;
232 MDSCacheObject *object = lock->get_parent();
233
234 if (p.is_xlock()) {
235 if ((lock->get_type() == CEPH_LOCK_ISNAP ||
236 lock->get_type() == CEPH_LOCK_IPOLICY) &&
237 mds->is_cluster_degraded() &&
238 mdr->is_master() &&
239 !mdr->is_queued_for_replay()) {
240 // waiting for recovering mds, to guarantee replayed requests and mksnap/setlayout
241 // get processed in proper order.
242 bool wait = false;
243 if (object->is_auth()) {
244 if (!mdr->locks.count(lock)) {
245 set<mds_rank_t> ls;
246 object->list_replicas(ls);
247 for (auto m : ls) {
248 if (mds->mdsmap->get_state(m) < MDSMap::STATE_ACTIVE) {
249 wait = true;
250 break;
251 }
252 }
253 }
254 } else {
255 // if the lock is the latest locked one, it's possible that slave mds got the lock
256 // while there are recovering mds.
257 if (!mdr->locks.count(lock) || lock == *mdr->locks.rbegin())
258 wait = true;
259 }
260 if (wait) {
261 dout(10) << " must xlock " << *lock << " " << *object
262 << ", waiting for cluster recovered" << dendl;
263 mds->locker->drop_locks(mdr.get(), NULL);
264 mdr->drop_local_auth_pins();
265 mds->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache, mdr));
266 return false;
267 }
268 }
269
270 dout(20) << " must xlock " << *lock << " " << *object << dendl;
271
272 mustpin.insert(object);
273
274 // augment xlock with a versionlock?
275 if (lock->get_type() == CEPH_LOCK_DN) {
276 CDentry *dn = static_cast<CDentry*>(object);
277 if (!dn->is_auth())
278 continue;
279 if (mdr->is_master()) {
280 // master. wrlock versionlock so we can pipeline dentry updates to journal.
281 lov.add_wrlock(&dn->versionlock);
282 } else {
283 // slave. exclusively lock the dentry version (i.e. block other journal updates).
284 // this makes rollback safe.
285 lov.add_xlock(&dn->versionlock);
286 }
287 }
288 if (lock->get_type() > CEPH_LOCK_IVERSION) {
289 // inode version lock?
290 CInode *in = static_cast<CInode*>(object);
291 if (!in->is_auth())
292 continue;
293 if (mdr->is_master()) {
294 // master. wrlock versionlock so we can pipeline inode updates to journal.
295 lov.add_wrlock(&in->versionlock);
296 } else {
297 // slave. exclusively lock the inode version (i.e. block other journal updates).
298 // this makes rollback safe.
299 lov.add_xlock(&in->versionlock);
300 }
301 }
302 } else if (p.is_wrlock()) {
303 dout(20) << " must wrlock " << *lock << " " << *object << dendl;
304 if (object->is_auth()) {
305 mustpin.insert(object);
306 } else if (!object->is_auth() &&
307 !lock->can_wrlock(client) && // we might have to request a scatter
308 !mdr->is_slave()) { // if we are slave (remote_wrlock), the master already authpinned
309 dout(15) << " will also auth_pin " << *object
310 << " in case we need to request a scatter" << dendl;
311 mustpin.insert(object);
312 }
313 } else if (p.is_remote_wrlock()) {
314 dout(20) << " must remote_wrlock on mds." << p.wrlock_target << " "
315 << *lock << " " << *object << dendl;
316 mustpin.insert(object);
317 } else if (p.is_rdlock()) {
318
319 dout(20) << " must rdlock " << *lock << " " << *object << dendl;
320 if (object->is_auth()) {
321 mustpin.insert(object);
322 } else if (!object->is_auth() &&
323 !lock->can_rdlock(client)) { // we might have to request an rdlock
324 dout(15) << " will also auth_pin " << *object
325 << " in case we need to request a rdlock" << dendl;
326 mustpin.insert(object);
327 }
328 } else {
329 ceph_assert(0 == "locker unknown lock operation");
330 }
331 }
332
333 lov.sort_and_merge();
334
335 // AUTH PINS
336 map<mds_rank_t, set<MDSCacheObject*> > mustpin_remote; // mds -> (object set)
337
338 // can i auth pin them all now?
339 marker.message = "failed to authpin local pins";
340 for (const auto &p : mustpin) {
341 MDSCacheObject *object = p;
342
343 dout(10) << " must authpin " << *object << dendl;
344
345 if (mdr->is_auth_pinned(object)) {
346 if (object != (MDSCacheObject*)auth_pin_freeze)
347 continue;
348 if (mdr->more()->is_remote_frozen_authpin) {
349 if (mdr->more()->rename_inode == auth_pin_freeze)
350 continue;
351 // unfreeze auth pin for the wrong inode
352 mustpin_remote[mdr->more()->rename_inode->authority().first].size();
353 }
354 }
355
356 if (!object->is_auth()) {
357 if (!mdr->locks.empty())
358 drop_locks(mdr.get());
359 if (object->is_ambiguous_auth()) {
360 // wait
361 marker.message = "waiting for single auth, object is being migrated";
362 dout(10) << " ambiguous auth, waiting to authpin " << *object << dendl;
363 object->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_MDS_RetryRequest(mdcache, mdr));
364 mdr->drop_local_auth_pins();
365 return false;
366 }
367 mustpin_remote[object->authority().first].insert(object);
368 continue;
369 }
370 int err = 0;
371 if (!object->can_auth_pin(&err)) {
372 // wait
373 drop_locks(mdr.get());
374 mdr->drop_local_auth_pins();
375 if (auth_pin_nonblock) {
376 dout(10) << " can't auth_pin (freezing?) " << *object << ", nonblocking" << dendl;
377 mdr->aborted = true;
378 return false;
379 }
380 if (err == MDSCacheObject::ERR_EXPORTING_TREE) {
381 marker.message = "failed to authpin, subtree is being exported";
382 } else if (err == MDSCacheObject::ERR_FRAGMENTING_DIR) {
383 marker.message = "failed to authpin, dir is being fragmented";
384 } else if (err == MDSCacheObject::ERR_EXPORTING_INODE) {
385 marker.message = "failed to authpin, inode is being exported";
386 }
387 dout(10) << " can't auth_pin (freezing?), waiting to authpin " << *object << dendl;
388 object->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_MDS_RetryRequest(mdcache, mdr));
389
390 if (!mdr->remote_auth_pins.empty())
391 notify_freeze_waiter(object);
392
393 return false;
394 }
395 }
396
397 // ok, grab local auth pins
398 for (const auto& p : mustpin) {
399 MDSCacheObject *object = p;
400 if (mdr->is_auth_pinned(object)) {
401 dout(10) << " already auth_pinned " << *object << dendl;
402 } else if (object->is_auth()) {
403 dout(10) << " auth_pinning " << *object << dendl;
404 mdr->auth_pin(object);
405 }
406 }
407
408 // request remote auth_pins
409 if (!mustpin_remote.empty()) {
410 marker.message = "requesting remote authpins";
411 for (const auto& p : mdr->remote_auth_pins) {
412 if (mustpin.count(p.first)) {
413 ceph_assert(p.second == p.first->authority().first);
414 map<mds_rank_t, set<MDSCacheObject*> >::iterator q = mustpin_remote.find(p.second);
415 if (q != mustpin_remote.end())
416 q->second.insert(p.first);
417 }
418 }
419 for (map<mds_rank_t, set<MDSCacheObject*> >::iterator p = mustpin_remote.begin();
420 p != mustpin_remote.end();
421 ++p) {
422 dout(10) << "requesting remote auth_pins from mds." << p->first << dendl;
423
424 // wait for active auth
425 if (mds->is_cluster_degraded() &&
426 !mds->mdsmap->is_clientreplay_or_active_or_stopping(p->first)) {
427 dout(10) << " mds." << p->first << " is not active" << dendl;
428 if (mdr->more()->waiting_on_slave.empty())
429 mds->wait_for_active_peer(p->first, new C_MDS_RetryRequest(mdcache, mdr));
430 return false;
431 }
432
433 auto req = MMDSSlaveRequest::create(mdr->reqid, mdr->attempt, MMDSSlaveRequest::OP_AUTHPIN);
434 for (set<MDSCacheObject*>::iterator q = p->second.begin();
435 q != p->second.end();
436 ++q) {
437 dout(10) << " req remote auth_pin of " << **q << dendl;
438 MDSCacheObjectInfo info;
439 (*q)->set_object_info(info);
440 req->get_authpins().push_back(info);
441 if (*q == auth_pin_freeze)
442 (*q)->set_object_info(req->get_authpin_freeze());
443 mdr->pin(*q);
444 }
445 if (auth_pin_nonblock)
446 req->mark_nonblock();
447 mds->send_message_mds(req, p->first);
448
449 // put in waiting list
450 ceph_assert(mdr->more()->waiting_on_slave.count(p->first) == 0);
451 mdr->more()->waiting_on_slave.insert(p->first);
452 }
453 return false;
454 }
455
456 // caps i'll need to issue
457 set<CInode*> issue_set;
458 bool result = false;
459
460 // acquire locks.
461 // make sure they match currently acquired locks.
462 auto existing = mdr->locks.begin();
463 for (const auto& p : lov) {
464 bool need_wrlock = p.is_wrlock();
465 bool need_remote_wrlock = p.is_remote_wrlock();
466
467 // already locked?
468 if (existing != mdr->locks.end() && existing->lock == p.lock) {
469 // right kind?
470 auto it = existing++;
471 auto have = *it; // don't reference
472
473 if (have.is_xlock() && p.is_xlock()) {
474 dout(10) << " already xlocked " << *have.lock << " " << *have.lock->get_parent() << dendl;
475 continue;
476 }
477
478 if (have.is_remote_wrlock() &&
479 (!need_remote_wrlock || have.wrlock_target != p.wrlock_target)) {
480 dout(10) << " unlocking remote_wrlock on wrong mds." << have.wrlock_target
481 << " " << *have.lock << " " << *have.lock->get_parent() << dendl;
482 remote_wrlock_finish(it, mdr.get());
483 have.clear_remote_wrlock();
484 }
485
486 if (need_wrlock || need_remote_wrlock) {
487 if (need_wrlock == have.is_wrlock() &&
488 need_remote_wrlock == have.is_remote_wrlock()) {
489 if (need_wrlock)
490 dout(10) << " already wrlocked " << *have.lock << " " << *have.lock->get_parent() << dendl;
491 if (need_remote_wrlock)
492 dout(10) << " already remote_wrlocked " << *have.lock << " " << *have.lock->get_parent() << dendl;
493 continue;
494 }
495
496 if (have.is_wrlock()) {
497 if (!need_wrlock)
498 dout(10) << " unlocking extra " << *have.lock << " " << *have.lock->get_parent() << dendl;
499 else if (need_remote_wrlock) // acquire remote_wrlock first
500 dout(10) << " unlocking out-of-order " << *have.lock << " " << *have.lock->get_parent() << dendl;
501 bool need_issue = false;
502 wrlock_finish(it, mdr.get(), &need_issue);
503 if (need_issue)
504 issue_set.insert(static_cast<CInode*>(have.lock->get_parent()));
505 }
506 } else if (have.is_rdlock() && p.is_rdlock()) {
507 dout(10) << " already rdlocked " << *have.lock << " " << *have.lock->get_parent() << dendl;
508 continue;
509 }
510 }
511
512 // hose any stray locks
513 while (existing != mdr->locks.end()) {
514 auto it = existing++;
515 auto stray = *it; // don't reference
516 dout(10) << " unlocking out-of-order " << *stray.lock << " " << *stray.lock->get_parent() << dendl;
517 bool need_issue = false;
518 if (stray.is_xlock()) {
519 xlock_finish(it, mdr.get(), &need_issue);
520 } else if (stray.is_rdlock()) {
521 rdlock_finish(it, mdr.get(), &need_issue);
522 } else {
523 // may have acquired both wrlock and remore wrlock
524 if (stray.is_wrlock())
525 wrlock_finish(it, mdr.get(), &need_issue);
526 if (stray.is_remote_wrlock())
527 remote_wrlock_finish(it, mdr.get());
528 }
529 if (need_issue)
530 issue_set.insert(static_cast<CInode*>(stray.lock->get_parent()));
531 }
532
533 // lock
534 if (mdr->locking && p.lock != mdr->locking) {
535 cancel_locking(mdr.get(), &issue_set);
536 }
537 if (p.is_xlock()) {
538 marker.message = "failed to xlock, waiting";
539 if (!xlock_start(p.lock, mdr))
540 goto out;
541 dout(10) << " got xlock on " << *p.lock << " " << *p.lock->get_parent() << dendl;
542 } else if (need_wrlock || need_remote_wrlock) {
543 if (need_remote_wrlock && !mdr->is_remote_wrlocked(p)) {
544 marker.message = "waiting for remote wrlocks";
545 remote_wrlock_start(p, p.wrlock_target, mdr);
546 goto out;
547 }
548 if (need_wrlock) {
549 marker.message = "failed to wrlock, waiting";
550 if (need_remote_wrlock && !p.lock->can_wrlock(mdr->get_client())) {
551 marker.message = "failed to wrlock, dropping remote wrlock and waiting";
552 // can't take the wrlock because the scatter lock is gathering. need to
553 // release the remote wrlock, so that the gathering process can finish.
554 auto it = mdr->locks.end();
555 ++it;
556 remote_wrlock_finish(it, mdr.get());
557 remote_wrlock_start(p, p.wrlock_target, mdr);
558 goto out;
559 }
560 // nowait if we have already gotten remote wrlock
561 if (!wrlock_start(p, mdr, need_remote_wrlock))
562 goto out;
563 dout(10) << " got wrlock on " << *p.lock << " " << *p.lock->get_parent() << dendl;
564 }
565 } else {
566 ceph_assert(mdr->is_master());
567 if (p.lock->needs_recover()) {
568 if (mds->is_cluster_degraded()) {
569 if (!mdr->is_queued_for_replay()) {
570 // see comments in SimpleLock::set_state_rejoin() and
571 // ScatterLock::encode_state_for_rejoin()
572 drop_locks(mdr.get());
573 mds->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache, mdr));
574 dout(10) << " rejoin recovering " << *p.lock << " " << *p.lock->get_parent()
575 << ", waiting for cluster recovered" << dendl;
576 marker.message = "rejoin recovering lock, waiting for cluster recovered";
577 return false;
578 }
579 } else {
580 p.lock->clear_need_recover();
581 }
582 }
583
584 marker.message = "failed to rdlock, waiting";
585 if (!rdlock_start(p, mdr))
586 goto out;
587 dout(10) << " got rdlock on " << *p.lock << " " << *p.lock->get_parent() << dendl;
588 }
589 }
590
591 // any extra unneeded locks?
592 while (existing != mdr->locks.end()) {
593 auto it = existing++;
594 auto stray = *it;
595 dout(10) << " unlocking extra " << *stray.lock << " " << *stray.lock->get_parent() << dendl;
596 bool need_issue = false;
597 if (stray.is_xlock()) {
598 xlock_finish(it, mdr.get(), &need_issue);
599 } else if (stray.is_rdlock()) {
600 rdlock_finish(it, mdr.get(), &need_issue);
601 } else {
602 // may have acquired both wrlock and remore wrlock
603 if (stray.is_wrlock())
604 wrlock_finish(it, mdr.get(), &need_issue);
605 if (stray.is_remote_wrlock())
606 remote_wrlock_finish(it, mdr.get());
607 }
608 if (need_issue)
609 issue_set.insert(static_cast<CInode*>(stray.lock->get_parent()));
610 }
611
612 mdr->done_locking = true;
613 mdr->set_mds_stamp(ceph_clock_now());
614 result = true;
615 marker.message = "acquired locks";
616
617 out:
618 issue_caps_set(issue_set);
619 return result;
620 }
621
622 void Locker::notify_freeze_waiter(MDSCacheObject *o)
623 {
624 CDir *dir = NULL;
625 if (CInode *in = dynamic_cast<CInode*>(o)) {
626 if (!in->is_root())
627 dir = in->get_parent_dir();
628 } else if (CDentry *dn = dynamic_cast<CDentry*>(o)) {
629 dir = dn->get_dir();
630 } else {
631 dir = dynamic_cast<CDir*>(o);
632 ceph_assert(dir);
633 }
634 if (dir) {
635 if (dir->is_freezing_dir())
636 mdcache->fragment_freeze_inc_num_waiters(dir);
637 if (dir->is_freezing_tree()) {
638 while (!dir->is_freezing_tree_root())
639 dir = dir->get_parent_dir();
640 mdcache->migrator->export_freeze_inc_num_waiters(dir);
641 }
642 }
643 }
644
645 void Locker::set_xlocks_done(MutationImpl *mut, bool skip_dentry)
646 {
647 for (const auto &p : mut->locks) {
648 if (!p.is_xlock())
649 continue;
650 MDSCacheObject *obj = p.lock->get_parent();
651 ceph_assert(obj->is_auth());
652 if (skip_dentry &&
653 (p.lock->get_type() == CEPH_LOCK_DN || p.lock->get_type() == CEPH_LOCK_DVERSION))
654 continue;
655 dout(10) << "set_xlocks_done on " << *p.lock << " " << *obj << dendl;
656 p.lock->set_xlock_done();
657 }
658 }
659
660 void Locker::_drop_locks(MutationImpl *mut, set<CInode*> *pneed_issue,
661 bool drop_rdlocks)
662 {
663 set<mds_rank_t> slaves;
664
665 for (auto it = mut->locks.begin(); it != mut->locks.end(); ) {
666 SimpleLock *lock = it->lock;
667 MDSCacheObject *obj = lock->get_parent();
668
669 if (it->is_xlock()) {
670 if (obj->is_auth()) {
671 bool ni = false;
672 xlock_finish(it++, mut, &ni);
673 if (ni)
674 pneed_issue->insert(static_cast<CInode*>(obj));
675 } else {
676 ceph_assert(lock->get_sm()->can_remote_xlock);
677 slaves.insert(obj->authority().first);
678 lock->put_xlock();
679 mut->locks.erase(it++);
680 }
681 } else if (it->is_wrlock() || it->is_remote_wrlock()) {
682 if (it->is_remote_wrlock()) {
683 slaves.insert(it->wrlock_target);
684 it->clear_remote_wrlock();
685 }
686 if (it->is_wrlock()) {
687 bool ni = false;
688 wrlock_finish(it++, mut, &ni);
689 if (ni)
690 pneed_issue->insert(static_cast<CInode*>(obj));
691 } else {
692 mut->locks.erase(it++);
693 }
694 } else if (drop_rdlocks && it->is_rdlock()) {
695 bool ni = false;
696 rdlock_finish(it++, mut, &ni);
697 if (ni)
698 pneed_issue->insert(static_cast<CInode*>(obj));
699 } else {
700 ++it;
701 }
702 }
703
704 for (set<mds_rank_t>::iterator p = slaves.begin(); p != slaves.end(); ++p) {
705 if (!mds->is_cluster_degraded() ||
706 mds->mdsmap->get_state(*p) >= MDSMap::STATE_REJOIN) {
707 dout(10) << "_drop_non_rdlocks dropping remote locks on mds." << *p << dendl;
708 auto slavereq = MMDSSlaveRequest::create(mut->reqid, mut->attempt, MMDSSlaveRequest::OP_DROPLOCKS);
709 mds->send_message_mds(slavereq, *p);
710 }
711 }
712 }
713
714 void Locker::cancel_locking(MutationImpl *mut, set<CInode*> *pneed_issue)
715 {
716 SimpleLock *lock = mut->locking;
717 ceph_assert(lock);
718 dout(10) << "cancel_locking " << *lock << " on " << *mut << dendl;
719
720 if (lock->get_parent()->is_auth()) {
721 bool need_issue = false;
722 if (lock->get_state() == LOCK_PREXLOCK) {
723 _finish_xlock(lock, -1, &need_issue);
724 } else if (lock->get_state() == LOCK_LOCK_XLOCK) {
725 lock->set_state(LOCK_XLOCKDONE);
726 eval_gather(lock, true, &need_issue);
727 }
728 if (need_issue)
729 pneed_issue->insert(static_cast<CInode *>(lock->get_parent()));
730 }
731 mut->finish_locking(lock);
732 }
733
734 void Locker::drop_locks(MutationImpl *mut, set<CInode*> *pneed_issue)
735 {
736 // leftover locks
737 set<CInode*> my_need_issue;
738 if (!pneed_issue)
739 pneed_issue = &my_need_issue;
740
741 if (mut->locking)
742 cancel_locking(mut, pneed_issue);
743 _drop_locks(mut, pneed_issue, true);
744
745 if (pneed_issue == &my_need_issue)
746 issue_caps_set(*pneed_issue);
747 mut->done_locking = false;
748 }
749
750 void Locker::drop_non_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
751 {
752 set<CInode*> my_need_issue;
753 if (!pneed_issue)
754 pneed_issue = &my_need_issue;
755
756 _drop_locks(mut, pneed_issue, false);
757
758 if (pneed_issue == &my_need_issue)
759 issue_caps_set(*pneed_issue);
760 }
761
762 void Locker::drop_rdlocks_for_early_reply(MutationImpl *mut)
763 {
764 set<CInode*> need_issue;
765
766 for (auto it = mut->locks.begin(); it != mut->locks.end(); ) {
767 if (!it->is_rdlock()) {
768 ++it;
769 continue;
770 }
771 SimpleLock *lock = it->lock;
772 // make later mksnap/setlayout (at other mds) wait for this unsafe request
773 if (lock->get_type() == CEPH_LOCK_ISNAP ||
774 lock->get_type() == CEPH_LOCK_IPOLICY) {
775 ++it;
776 continue;
777 }
778 bool ni = false;
779 rdlock_finish(it++, mut, &ni);
780 if (ni)
781 need_issue.insert(static_cast<CInode*>(lock->get_parent()));
782 }
783
784 issue_caps_set(need_issue);
785 }
786
787 void Locker::drop_locks_for_fragment_unfreeze(MutationImpl *mut)
788 {
789 set<CInode*> need_issue;
790
791 for (auto it = mut->locks.begin(); it != mut->locks.end(); ) {
792 SimpleLock *lock = it->lock;
793 if (lock->get_type() == CEPH_LOCK_IDFT) {
794 ++it;
795 continue;
796 }
797 bool ni = false;
798 wrlock_finish(it++, mut, &ni);
799 if (ni)
800 need_issue.insert(static_cast<CInode*>(lock->get_parent()));
801 }
802 issue_caps_set(need_issue);
803 }
804
805 // generics
806
807 void Locker::eval_gather(SimpleLock *lock, bool first, bool *pneed_issue, MDSContext::vec *pfinishers)
808 {
809 dout(10) << "eval_gather " << *lock << " on " << *lock->get_parent() << dendl;
810 ceph_assert(!lock->is_stable());
811
812 int next = lock->get_next_state();
813
814 CInode *in = 0;
815 bool caps = lock->get_cap_shift();
816 if (lock->get_type() != CEPH_LOCK_DN)
817 in = static_cast<CInode *>(lock->get_parent());
818
819 bool need_issue = false;
820
821 int loner_issued = 0, other_issued = 0, xlocker_issued = 0;
822 ceph_assert(!caps || in != NULL);
823 if (caps && in->is_head()) {
824 in->get_caps_issued(&loner_issued, &other_issued, &xlocker_issued,
825 lock->get_cap_shift(), lock->get_cap_mask());
826 dout(10) << " next state is " << lock->get_state_name(next)
827 << " issued/allows loner " << gcap_string(loner_issued)
828 << "/" << gcap_string(lock->gcaps_allowed(CAP_LONER, next))
829 << " xlocker " << gcap_string(xlocker_issued)
830 << "/" << gcap_string(lock->gcaps_allowed(CAP_XLOCKER, next))
831 << " other " << gcap_string(other_issued)
832 << "/" << gcap_string(lock->gcaps_allowed(CAP_ANY, next))
833 << dendl;
834
835 if (first && ((~lock->gcaps_allowed(CAP_ANY, next) & other_issued) ||
836 (~lock->gcaps_allowed(CAP_LONER, next) & loner_issued) ||
837 (~lock->gcaps_allowed(CAP_XLOCKER, next) & xlocker_issued)))
838 need_issue = true;
839 }
840
841 #define IS_TRUE_AND_LT_AUTH(x, auth) (x && ((auth && x <= AUTH) || (!auth && x < AUTH)))
842 bool auth = lock->get_parent()->is_auth();
843 if (!lock->is_gathering() &&
844 (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_rdlock, auth) || !lock->is_rdlocked()) &&
845 (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_wrlock, auth) || !lock->is_wrlocked()) &&
846 (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_xlock, auth) || !lock->is_xlocked()) &&
847 (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_lease, auth) || !lock->is_leased()) &&
848 !(lock->get_parent()->is_auth() && lock->is_flushing()) && // i.e. wait for scatter_writebehind!
849 (!caps || ((~lock->gcaps_allowed(CAP_ANY, next) & other_issued) == 0 &&
850 (~lock->gcaps_allowed(CAP_LONER, next) & loner_issued) == 0 &&
851 (~lock->gcaps_allowed(CAP_XLOCKER, next) & xlocker_issued) == 0)) &&
852 lock->get_state() != LOCK_SYNC_MIX2 && // these states need an explicit trigger from the auth mds
853 lock->get_state() != LOCK_MIX_SYNC2
854 ) {
855 dout(7) << "eval_gather finished gather on " << *lock
856 << " on " << *lock->get_parent() << dendl;
857
858 if (lock->get_sm() == &sm_filelock) {
859 ceph_assert(in);
860 if (in->state_test(CInode::STATE_RECOVERING)) {
861 dout(7) << "eval_gather finished gather, but still recovering" << dendl;
862 return;
863 } else if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
864 dout(7) << "eval_gather finished gather, but need to recover" << dendl;
865 mds->mdcache->queue_file_recover(in);
866 mds->mdcache->do_file_recover();
867 return;
868 }
869 }
870
871 if (!lock->get_parent()->is_auth()) {
872 // replica: tell auth
873 mds_rank_t auth = lock->get_parent()->authority().first;
874
875 if (lock->get_parent()->is_rejoining() &&
876 mds->mdsmap->get_state(auth) == MDSMap::STATE_REJOIN) {
877 dout(7) << "eval_gather finished gather, but still rejoining "
878 << *lock->get_parent() << dendl;
879 return;
880 }
881
882 if (!mds->is_cluster_degraded() ||
883 mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) {
884 switch (lock->get_state()) {
885 case LOCK_SYNC_LOCK:
886 mds->send_message_mds(MLock::create(lock, LOCK_AC_LOCKACK, mds->get_nodeid()), auth);
887 break;
888
889 case LOCK_MIX_SYNC:
890 {
891 auto reply = MLock::create(lock, LOCK_AC_SYNCACK, mds->get_nodeid());
892 lock->encode_locked_state(reply->get_data());
893 mds->send_message_mds(reply, auth);
894 next = LOCK_MIX_SYNC2;
895 (static_cast<ScatterLock *>(lock))->start_flush();
896 }
897 break;
898
899 case LOCK_MIX_SYNC2:
900 (static_cast<ScatterLock *>(lock))->finish_flush();
901 (static_cast<ScatterLock *>(lock))->clear_flushed();
902
903 case LOCK_SYNC_MIX2:
904 // do nothing, we already acked
905 break;
906
907 case LOCK_SYNC_MIX:
908 {
909 auto reply = MLock::create(lock, LOCK_AC_MIXACK, mds->get_nodeid());
910 mds->send_message_mds(reply, auth);
911 next = LOCK_SYNC_MIX2;
912 }
913 break;
914
915 case LOCK_MIX_LOCK:
916 {
917 bufferlist data;
918 lock->encode_locked_state(data);
919 mds->send_message_mds(MLock::create(lock, LOCK_AC_LOCKACK, mds->get_nodeid(), data), auth);
920 (static_cast<ScatterLock *>(lock))->start_flush();
921 // we'll get an AC_LOCKFLUSHED to complete
922 }
923 break;
924
925 default:
926 ceph_abort();
927 }
928 }
929 } else {
930 // auth
931
932 // once the first (local) stage of mix->lock gather complete we can
933 // gather from replicas
934 if (lock->get_state() == LOCK_MIX_LOCK &&
935 lock->get_parent()->is_replicated()) {
936 dout(10) << " finished (local) gather for mix->lock, now gathering from replicas" << dendl;
937 send_lock_message(lock, LOCK_AC_LOCK);
938 lock->init_gather();
939 lock->set_state(LOCK_MIX_LOCK2);
940 return;
941 }
942
943 if (lock->is_dirty() && !lock->is_flushed()) {
944 scatter_writebehind(static_cast<ScatterLock *>(lock));
945 mds->mdlog->flush();
946 return;
947 }
948 lock->clear_flushed();
949
950 switch (lock->get_state()) {
951 // to mixed
952 case LOCK_TSYN_MIX:
953 case LOCK_SYNC_MIX:
954 case LOCK_EXCL_MIX:
955 case LOCK_XSYN_MIX:
956 in->start_scatter(static_cast<ScatterLock *>(lock));
957 if (lock->get_parent()->is_replicated()) {
958 bufferlist softdata;
959 lock->encode_locked_state(softdata);
960 send_lock_message(lock, LOCK_AC_MIX, softdata);
961 }
962 (static_cast<ScatterLock *>(lock))->clear_scatter_wanted();
963 break;
964
965 case LOCK_XLOCK:
966 case LOCK_XLOCKDONE:
967 if (next != LOCK_SYNC)
968 break;
969 // fall-thru
970
971 // to sync
972 case LOCK_EXCL_SYNC:
973 case LOCK_LOCK_SYNC:
974 case LOCK_MIX_SYNC:
975 case LOCK_XSYN_SYNC:
976 if (lock->get_parent()->is_replicated()) {
977 bufferlist softdata;
978 lock->encode_locked_state(softdata);
979 send_lock_message(lock, LOCK_AC_SYNC, softdata);
980 }
981 break;
982 }
983
984 }
985
986 lock->set_state(next);
987
988 if (lock->get_parent()->is_auth() &&
989 lock->is_stable())
990 lock->get_parent()->auth_unpin(lock);
991
992 // drop loner before doing waiters
993 if (caps &&
994 in->is_head() &&
995 in->is_auth() &&
996 in->get_wanted_loner() != in->get_loner()) {
997 dout(10) << " trying to drop loner" << dendl;
998 if (in->try_drop_loner()) {
999 dout(10) << " dropped loner" << dendl;
1000 need_issue = true;
1001 }
1002 }
1003
1004 if (pfinishers)
1005 lock->take_waiting(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD|SimpleLock::WAIT_XLOCK,
1006 *pfinishers);
1007 else
1008 lock->finish_waiters(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD|SimpleLock::WAIT_XLOCK);
1009
1010 if (caps && in->is_head())
1011 need_issue = true;
1012
1013 if (lock->get_parent()->is_auth() &&
1014 lock->is_stable())
1015 try_eval(lock, &need_issue);
1016 }
1017
1018 if (need_issue) {
1019 if (pneed_issue)
1020 *pneed_issue = true;
1021 else if (in->is_head())
1022 issue_caps(in);
1023 }
1024
1025 }
1026
bool Locker::eval(CInode *in, int mask, bool caps_imported)
{
  // Evaluate the inode locks selected by 'mask' (CEPH_LOCK_I* bits; -1 means
  // all of them) and manage the inode's loner client.  Returns true if client
  // caps needed (re)issuing.
  bool need_issue = caps_imported;
  MDSContext::vec finishers;

  dout(10) << "eval " << mask << " " << *in << dendl;

  // choose loner?
  if (in->is_auth() && in->is_head()) {
    client_t orig_loner = in->get_loner();
    if (in->choose_ideal_loner()) {
      dout(10) << "eval set loner: client." << orig_loner << " -> client." << in->get_loner() << dendl;
      need_issue = true;
      mask = -1;  // loner changed: re-evaluate every lock
    } else if (in->get_wanted_loner() != in->get_loner()) {
      dout(10) << "eval want loner: client." << in->get_wanted_loner() << " but failed to set it" << dendl;
      mask = -1;  // still want a different loner; evaluate everything so it can settle
    }
  }

 retry:
  if (mask & CEPH_LOCK_IFILE)
    eval_any(&in->filelock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IAUTH)
    eval_any(&in->authlock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_ILINK)
    eval_any(&in->linklock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IXATTR)
    eval_any(&in->xattrlock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_INEST)
    eval_any(&in->nestlock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IFLOCK)
    eval_any(&in->flocklock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IPOLICY)
    eval_any(&in->policylock, &need_issue, &finishers, caps_imported);

  // drop loner?
  if (in->is_auth() && in->is_head() && in->get_wanted_loner() != in->get_loner()) {
    if (in->try_drop_loner()) {
      need_issue = true;
      if (in->get_wanted_loner() >= 0) {
        // a different loner is wanted; install it and re-run all locks
        dout(10) << "eval end set loner to client." << in->get_loner() << dendl;
        bool ok = in->try_set_loner();
        ceph_assert(ok);
        mask = -1;
        goto retry;
      }
    }
  }

  finish_contexts(g_ceph_context, finishers);

  if (need_issue && in->is_head())
    issue_caps(in);

  dout(10) << "eval done" << dendl;
  return need_issue;
}
1085
1086 class C_Locker_Eval : public LockerContext {
1087 MDSCacheObject *p;
1088 int mask;
1089 public:
1090 C_Locker_Eval(Locker *l, MDSCacheObject *pp, int m) : LockerContext(l), p(pp), mask(m) {
1091 // We are used as an MDSCacheObject waiter, so should
1092 // only be invoked by someone already holding the big lock.
1093 ceph_assert(locker->mds->mds_lock.is_locked_by_me());
1094 p->get(MDSCacheObject::PIN_PTRWAITER);
1095 }
1096 void finish(int r) override {
1097 locker->try_eval(p, mask);
1098 p->put(MDSCacheObject::PIN_PTRWAITER);
1099 }
1100 };
1101
1102 void Locker::try_eval(MDSCacheObject *p, int mask)
1103 {
1104 // unstable and ambiguous auth?
1105 if (p->is_ambiguous_auth()) {
1106 dout(7) << "try_eval ambiguous auth, waiting on " << *p << dendl;
1107 p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_Eval(this, p, mask));
1108 return;
1109 }
1110
1111 if (p->is_auth() && p->is_frozen()) {
1112 dout(7) << "try_eval frozen, waiting on " << *p << dendl;
1113 p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, mask));
1114 return;
1115 }
1116
1117 if (mask & CEPH_LOCK_DN) {
1118 ceph_assert(mask == CEPH_LOCK_DN);
1119 bool need_issue = false; // ignore this, no caps on dentries
1120 CDentry *dn = static_cast<CDentry *>(p);
1121 eval_any(&dn->lock, &need_issue);
1122 } else {
1123 CInode *in = static_cast<CInode *>(p);
1124 eval(in, mask);
1125 }
1126 }
1127
void Locker::try_eval(SimpleLock *lock, bool *pneed_issue)
{
  // Evaluate a single lock if circumstances allow, deferring (via a
  // C_Locker_Eval waiter) when the parent object cannot be touched right now.
  MDSCacheObject *p = lock->get_parent();

  // unstable and ambiguous auth?
  if (p->is_ambiguous_auth()) {
    dout(7) << "try_eval " << *lock << " ambiguousauth, waiting on " << *p << dendl;
    p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_Eval(this, p, lock->get_type()));
    return;
  }

  // only the auth evaluates lock state
  if (!p->is_auth()) {
    dout(7) << "try_eval " << *lock << " not auth for " << *p << dendl;
    return;
  }

  if (p->is_frozen()) {
    dout(7) << "try_eval " << *lock << " frozen, waiting on " << *p << dendl;
    p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, lock->get_type()));
    return;
  }

  /*
   * We could have a situation like:
   *
   * - mds A authpins item on mds B
   * - mds B starts to freeze tree containing item
   * - mds A tries wrlock_start on A, sends REQSCATTER to B
   * - mds B lock is unstable, sets scatter_wanted
   * - mds B lock stabilizes, calls try_eval.
   *
   * We can defer while freezing without causing a deadlock.  Honor
   * scatter_wanted flag here.  This will never get deferred by the
   * checks above due to the auth_pin held by the master.
   */
  if (lock->is_scatterlock()) {
    ScatterLock *slock = static_cast<ScatterLock *>(lock);
    if (slock->get_scatter_wanted() &&
        slock->get_state() != LOCK_MIX) {
      // a peer asked us to scatter; move toward MIX first
      scatter_mix(slock, pneed_issue);
      if (!lock->is_stable())
        return;
    } else if (slock->get_unscatter_wanted() &&
        slock->get_state() != LOCK_LOCK) {
      // a peer asked us to unscatter; move toward LOCK first
      simple_lock(slock, pneed_issue);
      if (!lock->is_stable()) {
        return;
      }
    }
  }

  // Dentry and snap locks may still be evaluated while the parent is
  // freezing; everything else waits for the freeze to resolve.
  if (lock->get_type() != CEPH_LOCK_DN &&
      lock->get_type() != CEPH_LOCK_ISNAP &&
      p->is_freezing()) {
    dout(7) << "try_eval " << *lock << " freezing, waiting on " << *p << dendl;
    p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, lock->get_type()));
    return;
  }

  eval(lock, pneed_issue);
}
1189
1190 void Locker::eval_cap_gather(CInode *in, set<CInode*> *issue_set)
1191 {
1192 bool need_issue = false;
1193 MDSContext::vec finishers;
1194
1195 // kick locks now
1196 if (!in->filelock.is_stable())
1197 eval_gather(&in->filelock, false, &need_issue, &finishers);
1198 if (!in->authlock.is_stable())
1199 eval_gather(&in->authlock, false, &need_issue, &finishers);
1200 if (!in->linklock.is_stable())
1201 eval_gather(&in->linklock, false, &need_issue, &finishers);
1202 if (!in->xattrlock.is_stable())
1203 eval_gather(&in->xattrlock, false, &need_issue, &finishers);
1204
1205 if (need_issue && in->is_head()) {
1206 if (issue_set)
1207 issue_set->insert(in);
1208 else
1209 issue_caps(in);
1210 }
1211
1212 finish_contexts(g_ceph_context, finishers);
1213 }
1214
1215 void Locker::eval_scatter_gathers(CInode *in)
1216 {
1217 bool need_issue = false;
1218 MDSContext::vec finishers;
1219
1220 dout(10) << "eval_scatter_gathers " << *in << dendl;
1221
1222 // kick locks now
1223 if (!in->filelock.is_stable())
1224 eval_gather(&in->filelock, false, &need_issue, &finishers);
1225 if (!in->nestlock.is_stable())
1226 eval_gather(&in->nestlock, false, &need_issue, &finishers);
1227 if (!in->dirfragtreelock.is_stable())
1228 eval_gather(&in->dirfragtreelock, false, &need_issue, &finishers);
1229
1230 if (need_issue && in->is_head())
1231 issue_caps(in);
1232
1233 finish_contexts(g_ceph_context, finishers);
1234 }
1235
1236 void Locker::eval(SimpleLock *lock, bool *need_issue)
1237 {
1238 switch (lock->get_type()) {
1239 case CEPH_LOCK_IFILE:
1240 return file_eval(static_cast<ScatterLock*>(lock), need_issue);
1241 case CEPH_LOCK_IDFT:
1242 case CEPH_LOCK_INEST:
1243 return scatter_eval(static_cast<ScatterLock*>(lock), need_issue);
1244 default:
1245 return simple_eval(lock, need_issue);
1246 }
1247 }
1248
1249
1250 // ------------------
1251 // rdlock
1252
bool Locker::_rdlock_kick(SimpleLock *lock, bool as_anon)
{
  // Try to push a lock toward a readable state.  Returns true when we
  // changed lock state locally (caller should re-check can_rdlock());
  // false when all we could do was ask the auth or nothing at all.
  // kick the lock
  if (lock->is_stable()) {
    if (lock->get_parent()->is_auth()) {
      if (lock->get_sm() == &sm_scatterlock) {
        // not until tempsync is fully implemented
        //if (lock->get_parent()->is_replicated())
        //scatter_tempsync((ScatterLock*)lock);
        //else
        simple_sync(lock);
      } else if (lock->get_sm() == &sm_filelock) {
        CInode *in = static_cast<CInode*>(lock->get_parent());
        if (lock->get_state() == LOCK_EXCL &&
            in->get_target_loner() >= 0 &&
            !in->is_dir() && !as_anon)  // as_anon => caller wants SYNC, not XSYN
          file_xsyn(lock);
        else
          simple_sync(lock);
      } else
        simple_sync(lock);
      return true;
    } else {
      // request rdlock state change from auth
      mds_rank_t auth = lock->get_parent()->authority().first;
      if (!mds->is_cluster_degraded() ||
          mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
        dout(10) << "requesting rdlock from auth on "
                 << *lock << " on " << *lock->get_parent() << dendl;
        mds->send_message_mds(MLock::create(lock, LOCK_AC_REQRDLOCK, mds->get_nodeid()), auth);
      }
      return false;
    }
  }
  // Unstable file lock on a recovering inode: bump the inode's recovery
  // priority so the lock can eventually stabilize.
  if (lock->get_type() == CEPH_LOCK_IFILE) {
    CInode *in = static_cast<CInode *>(lock->get_parent());
    if (in->state_test(CInode::STATE_RECOVERING)) {
      mds->mdcache->recovery_queue.prioritize(in);
    }
  }

  return false;
}
1296
1297 bool Locker::rdlock_try(SimpleLock *lock, client_t client, MDSContext *con)
1298 {
1299 dout(7) << "rdlock_try on " << *lock << " on " << *lock->get_parent() << dendl;
1300
1301 // can read? grab ref.
1302 if (lock->can_rdlock(client))
1303 return true;
1304
1305 _rdlock_kick(lock, false);
1306
1307 if (lock->can_rdlock(client))
1308 return true;
1309
1310 // wait!
1311 if (con) {
1312 dout(7) << "rdlock_try waiting on " << *lock << " on " << *lock->get_parent() << dendl;
1313 lock->add_waiter(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_RD, con);
1314 }
1315 return false;
1316 }
1317
bool Locker::rdlock_start(SimpleLock *lock, MDRequestRef& mut, bool as_anon)
{
  // Acquire a read lock for request 'mut'.  On success the rdlock is
  // recorded in mut->locks and true is returned; otherwise a retry waiter
  // is queued and false is returned.
  dout(7) << "rdlock_start on " << *lock << " on " << *lock->get_parent() << dendl;

  // client may be allowed to rdlock the same item it has xlocked.
  // UNLESS someone passes in as_anon, or we're reading snapped version here.
  if (mut->snapid != CEPH_NOSNAP)
    as_anon = true;
  client_t client = as_anon ? -1 : mut->get_client();

  CInode *in = 0;
  if (lock->get_type() != CEPH_LOCK_DN)
    in = static_cast<CInode *>(lock->get_parent());

  /*
  if (!lock->get_parent()->is_auth() &&
      lock->fw_rdlock_to_auth()) {
    mdcache->request_forward(mut, lock->get_parent()->authority().first);
    return false;
  }
  */

  while (1) {
    // can read?  grab ref.
    if (lock->can_rdlock(client)) {
      lock->get_rdlock();
      mut->locks.emplace_hint(mut->locks.end(), lock, MutationImpl::LockOp::RDLOCK);
      return true;
    }

    // hmm, wait a second.
    // For a snapped (non-head) inode we may first need to sync the head
    // inode's lock before this snapped metadata becomes readable.
    if (in && !in->is_head() && in->is_auth() &&
        lock->get_state() == LOCK_SNAP_SYNC) {
      // okay, we actually need to kick the head's lock to get ourselves synced up.
      CInode *head = mdcache->get_inode(in->ino());
      ceph_assert(head);
      SimpleLock *hlock = head->get_lock(CEPH_LOCK_IFILE);
      if (hlock->get_state() == LOCK_SYNC)
        hlock = head->get_lock(lock->get_type());

      if (hlock->get_state() != LOCK_SYNC) {
        dout(10) << "rdlock_start trying head inode " << *head << dendl;
        if (!rdlock_start(hlock, mut, true)) // ** as_anon, no rdlock on EXCL **
          return false;
        // oh, check our lock again then
      }
    }

    if (!_rdlock_kick(lock, as_anon))
      break;
  }

  // wait!
  int wait_on;
  if (lock->get_parent()->is_auth() && lock->is_stable())
    wait_on = SimpleLock::WAIT_RD;
  else
    wait_on = SimpleLock::WAIT_STABLE;  // REQRDLOCK is ignored if lock is unstable, so we need to retry.
  dout(7) << "rdlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl;
  lock->add_waiter(wait_on, new C_MDS_RetryRequest(mdcache, mut));
  nudge_log(lock);
  return false;
}
1381
1382 void Locker::nudge_log(SimpleLock *lock)
1383 {
1384 dout(10) << "nudge_log " << *lock << " on " << *lock->get_parent() << dendl;
1385 if (lock->get_parent()->is_auth() && lock->is_unstable_and_locked()) // as with xlockdone, or cap flush
1386 mds->mdlog->flush();
1387 }
1388
1389 void Locker::rdlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut, bool *pneed_issue)
1390 {
1391 ceph_assert(it->is_rdlock());
1392 SimpleLock *lock = it->lock;
1393 // drop ref
1394 lock->put_rdlock();
1395 if (mut)
1396 mut->locks.erase(it);
1397
1398 dout(7) << "rdlock_finish on " << *lock << " on " << *lock->get_parent() << dendl;
1399
1400 // last one?
1401 if (!lock->is_rdlocked()) {
1402 if (!lock->is_stable())
1403 eval_gather(lock, false, pneed_issue);
1404 else if (lock->get_parent()->is_auth())
1405 try_eval(lock, pneed_issue);
1406 }
1407 }
1408
1409
1410 bool Locker::can_rdlock_set(MutationImpl::LockOpVec& lov)
1411 {
1412 dout(10) << "can_rdlock_set " << dendl;
1413 for (const auto& p : lov) {
1414 ceph_assert(p.is_rdlock());
1415 if (!p.lock->can_rdlock(-1)) {
1416 dout(10) << "can_rdlock_set can't rdlock " << *p << " on " << *p.lock->get_parent() << dendl;
1417 return false;
1418 }
1419 }
1420 return true;
1421 }
1422
1423
1424 void Locker::rdlock_take_set(MutationImpl::LockOpVec& lov, MutationRef& mut)
1425 {
1426 dout(10) << "rdlock_take_set " << dendl;
1427 for (const auto& p : lov) {
1428 ceph_assert(p.is_rdlock());
1429 p.lock->get_rdlock();
1430 mut->locks.emplace(p.lock, MutationImpl::LockOp::RDLOCK);
1431 }
1432 }
1433
1434 // ------------------
1435 // wrlock
1436
1437 void Locker::wrlock_force(SimpleLock *lock, MutationRef& mut)
1438 {
1439 if (lock->get_type() == CEPH_LOCK_IVERSION ||
1440 lock->get_type() == CEPH_LOCK_DVERSION)
1441 return local_wrlock_grab(static_cast<LocalLock*>(lock), mut);
1442
1443 dout(7) << "wrlock_force on " << *lock
1444 << " on " << *lock->get_parent() << dendl;
1445 lock->get_wrlock(true);
1446 mut->locks.emplace(lock, MutationImpl::LockOp::WRLOCK);
1447 }
1448
bool Locker::wrlock_start(SimpleLock *lock, MDRequestRef& mut, bool nowait)
{
  // Acquire a write lock for request 'mut'.  When 'nowait' is set the caller
  // cannot block (it may already have a log entry open), so we never queue a
  // waiter and we avoid nested state changes that would require writeback.
  // Returns true on success (wrlock recorded in mut->locks).
  if (lock->get_type() == CEPH_LOCK_IVERSION ||
      lock->get_type() == CEPH_LOCK_DVERSION)
    return local_wrlock_start(static_cast<LocalLock*>(lock), mut);

  dout(10) << "wrlock_start " << *lock << " on " << *lock->get_parent() << dendl;

  CInode *in = static_cast<CInode *>(lock->get_parent());
  client_t client = mut->get_client();
  // prefer the scattered (MIX) state when fragstats need to flow to/from
  // subtree dirfrags, or when a scatter was explicitly requested
  bool want_scatter = !nowait && lock->get_parent()->is_auth() &&
                      (in->has_subtree_or_exporting_dirfrag() ||
                       static_cast<ScatterLock*>(lock)->get_scatter_wanted());

  while (1) {
    // wrlock?
    if (lock->can_wrlock(client) &&
        (!want_scatter || lock->get_state() == LOCK_MIX)) {
      lock->get_wrlock();
      auto it = mut->locks.emplace_hint(mut->locks.end(), lock, MutationImpl::LockOp::WRLOCK);
      it->flags |= MutationImpl::LockOp::WRLOCK; // may already remote_wrlocked
      return true;
    }

    // a recovering file lock can't progress; bump the inode's recovery priority
    if (lock->get_type() == CEPH_LOCK_IFILE &&
        in->state_test(CInode::STATE_RECOVERING)) {
      mds->mdcache->recovery_queue.prioritize(in);
    }

    if (!lock->is_stable())
      break;

    if (in->is_auth()) {
      // don't do nested lock state change if we have dirty scatterdata and
      // may scatter_writebehind or start_scatter, because nowait==true implies
      // that the caller already has a log entry open!
      if (nowait && lock->is_dirty())
        return false;

      if (want_scatter)
        scatter_mix(static_cast<ScatterLock*>(lock));
      else
        simple_lock(lock);

      if (nowait && !lock->can_wrlock(client))
        return false;

    } else {
      // replica.
      // auth should be auth_pinned (see acquire_locks wrlock weird mustpin case).
      mds_rank_t auth = lock->get_parent()->authority().first;
      if (!mds->is_cluster_degraded() ||
          mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
        dout(10) << "requesting scatter from auth on "
                 << *lock << " on " << *lock->get_parent() << dendl;
        mds->send_message_mds(MLock::create(lock, LOCK_AC_REQSCATTER, mds->get_nodeid()), auth);
      }
      break;
    }
  }

  if (!nowait) {
    dout(7) << "wrlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl;
    lock->add_waiter(SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
    nudge_log(lock);
  }

  return false;
}
1518
1519 void Locker::wrlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut, bool *pneed_issue)
1520 {
1521 ceph_assert(it->is_wrlock());
1522 SimpleLock* lock = it->lock;
1523
1524 if (lock->get_type() == CEPH_LOCK_IVERSION ||
1525 lock->get_type() == CEPH_LOCK_DVERSION)
1526 return local_wrlock_finish(it, mut);
1527
1528 dout(7) << "wrlock_finish on " << *lock << " on " << *lock->get_parent() << dendl;
1529 lock->put_wrlock();
1530
1531 if (it->is_remote_wrlock())
1532 it->clear_wrlock();
1533 else
1534 mut->locks.erase(it);
1535
1536 if (!lock->is_wrlocked()) {
1537 if (!lock->is_stable())
1538 eval_gather(lock, false, pneed_issue);
1539 else if (lock->get_parent()->is_auth())
1540 try_eval(lock, pneed_issue);
1541 }
1542 }
1543
1544
1545 // remote wrlock
1546
1547 void Locker::remote_wrlock_start(SimpleLock *lock, mds_rank_t target, MDRequestRef& mut)
1548 {
1549 dout(7) << "remote_wrlock_start mds." << target << " on " << *lock << " on " << *lock->get_parent() << dendl;
1550
1551 // wait for active target
1552 if (mds->is_cluster_degraded() &&
1553 !mds->mdsmap->is_clientreplay_or_active_or_stopping(target)) {
1554 dout(7) << " mds." << target << " is not active" << dendl;
1555 if (mut->more()->waiting_on_slave.empty())
1556 mds->wait_for_active_peer(target, new C_MDS_RetryRequest(mdcache, mut));
1557 return;
1558 }
1559
1560 // send lock request
1561 mut->start_locking(lock, target);
1562 mut->more()->slaves.insert(target);
1563 auto r = MMDSSlaveRequest::create(mut->reqid, mut->attempt, MMDSSlaveRequest::OP_WRLOCK);
1564 r->set_lock_type(lock->get_type());
1565 lock->get_parent()->set_object_info(r->get_object_info());
1566 mds->send_message_mds(r, target);
1567
1568 ceph_assert(mut->more()->waiting_on_slave.count(target) == 0);
1569 mut->more()->waiting_on_slave.insert(target);
1570 }
1571
1572 void Locker::remote_wrlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut)
1573 {
1574 ceph_assert(it->is_remote_wrlock());
1575 SimpleLock *lock = it->lock;
1576 mds_rank_t target = it->wrlock_target;
1577
1578 if (it->is_wrlock())
1579 it->clear_remote_wrlock();
1580 else
1581 mut->locks.erase(it);
1582
1583 dout(7) << "remote_wrlock_finish releasing remote wrlock on mds." << target
1584 << " " << *lock->get_parent() << dendl;
1585 if (!mds->is_cluster_degraded() ||
1586 mds->mdsmap->get_state(target) >= MDSMap::STATE_REJOIN) {
1587 auto slavereq = MMDSSlaveRequest::create(mut->reqid, mut->attempt, MMDSSlaveRequest::OP_UNWRLOCK);
1588 slavereq->set_lock_type(lock->get_type());
1589 lock->get_parent()->set_object_info(slavereq->get_object_info());
1590 mds->send_message_mds(slavereq, target);
1591 }
1592 }
1593
1594
1595 // ------------------
1596 // xlock
1597
bool Locker::xlock_start(SimpleLock *lock, MDRequestRef& mut)
{
  // Acquire an exclusive lock for request 'mut'.  On the auth we drive the
  // lock toward LOCK_XLOCK locally; on a replica we forward an OP_XLOCK
  // slave request to the auth.  Returns true only when the xlock is held.
  if (lock->get_type() == CEPH_LOCK_IVERSION ||
      lock->get_type() == CEPH_LOCK_DVERSION)
    return local_xlock_start(static_cast<LocalLock*>(lock), mut);

  dout(7) << "xlock_start on " << *lock << " on " << *lock->get_parent() << dendl;
  client_t client = mut->get_client();

  CInode *in = nullptr;
  if (lock->get_cap_shift())  // this lock type carries client caps
    in = static_cast<CInode *>(lock->get_parent());

  // auth?
  if (lock->get_parent()->is_auth()) {
    // auth
    while (1) {
      if (lock->can_xlock(client) &&
          !(lock->get_state() == LOCK_LOCK_XLOCK &&      // client is not xlocker or
            in && in->issued_caps_need_gather(lock))) { // xlocker does not hold shared cap
        lock->set_state(LOCK_XLOCK);
        lock->get_xlock(mut, client);
        mut->locks.emplace_hint(mut->locks.end(), lock, MutationImpl::LockOp::XLOCK);
        mut->finish_locking(lock);
        return true;
      }

      // a recovering file lock can't progress; bump recovery priority
      if (lock->get_type() == CEPH_LOCK_IFILE &&
          in->state_test(CInode::STATE_RECOVERING)) {
        mds->mdcache->recovery_queue.prioritize(in);
      }

      // stop pushing if the lock is unstable, unless this client already
      // holds it in XLOCKDONE with no one waiting on stability
      if (!lock->is_stable() && (lock->get_state() != LOCK_XLOCKDONE ||
                                 lock->get_xlock_by_client() != client ||
                                 lock->is_waiter_for(SimpleLock::WAIT_STABLE)))
        break;

      if (lock->get_state() == LOCK_LOCK || lock->get_state() == LOCK_XLOCKDONE) {
        mut->start_locking(lock);
        simple_xlock(lock);
      } else {
        simple_lock(lock);
      }
    }

    lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
    nudge_log(lock);
    return false;
  } else {
    // replica
    ceph_assert(lock->get_sm()->can_remote_xlock);
    ceph_assert(!mut->slave_request);

    // wait for single auth
    if (lock->get_parent()->is_ambiguous_auth()) {
      lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH,
                                     new C_MDS_RetryRequest(mdcache, mut));
      return false;
    }

    // wait for active auth
    mds_rank_t auth = lock->get_parent()->authority().first;
    if (mds->is_cluster_degraded() &&
        !mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
      dout(7) << " mds." << auth << " is not active" << dendl;
      if (mut->more()->waiting_on_slave.empty())
        mds->wait_for_active_peer(auth, new C_MDS_RetryRequest(mdcache, mut));
      return false;
    }

    // send lock request
    mut->more()->slaves.insert(auth);
    mut->start_locking(lock, auth);
    auto r = MMDSSlaveRequest::create(mut->reqid, mut->attempt, MMDSSlaveRequest::OP_XLOCK);
    r->set_lock_type(lock->get_type());
    lock->get_parent()->set_object_info(r->get_object_info());
    mds->send_message_mds(r, auth);

    ceph_assert(mut->more()->waiting_on_slave.count(auth) == 0);
    mut->more()->waiting_on_slave.insert(auth);

    return false;
  }
}
1682
void Locker::_finish_xlock(SimpleLock *lock, client_t xlocker, bool *pneed_issue)
{
  // Choose the lock's next state after the last xlock reference is dropped.
  // If a target loner exists (and the xlocker was that client, or unknown),
  // jump straight to LOCK_EXCL; otherwise fall back to the gather path.
  ceph_assert(!lock->is_stable());
  if (lock->get_type() != CEPH_LOCK_DN &&
      lock->get_type() != CEPH_LOCK_ISNAP &&
      lock->get_num_rdlocks() == 0 &&
      lock->get_num_wrlocks() == 0 &&
      !lock->is_leased() &&
      lock->get_state() != LOCK_XLOCKSNAP) {
    CInode *in = static_cast<CInode*>(lock->get_parent());
    client_t loner = in->get_target_loner();
    if (loner >= 0 && (xlocker < 0 || xlocker == loner)) {
      lock->set_state(LOCK_EXCL);
      lock->get_parent()->auth_unpin(lock);
      lock->finish_waiters(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD);
      if (lock->get_cap_shift())
        *pneed_issue = true;  // caps may change with the new lock state
      if (lock->get_parent()->is_auth() &&
          lock->is_stable())
        try_eval(lock, pneed_issue);
      return;
    }
  }
  // the xlocker may have CEPH_CAP_GSHARED, need to revoke it if next state is LOCK_LOCK
  eval_gather(lock, lock->get_state() != LOCK_XLOCKSNAP, pneed_issue);
}
1709
void Locker::xlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut, bool *pneed_issue)
{
  // Release an xlock held by mutation 'mut'.  On a replica, tell the auth
  // via OP_UNXLOCK; on the auth, move the lock onward once the last xlocker
  // is gone.
  ceph_assert(it->is_xlock());
  SimpleLock *lock = it->lock;

  if (lock->get_type() == CEPH_LOCK_IVERSION ||
      lock->get_type() == CEPH_LOCK_DVERSION)
    return local_xlock_finish(it, mut);

  dout(10) << "xlock_finish on " << *lock << " " << *lock->get_parent() << dendl;

  client_t xlocker = lock->get_xlock_by_client();

  // drop ref
  lock->put_xlock();
  ceph_assert(mut);
  mut->locks.erase(it);

  bool do_issue = false;

  // remote xlock?
  if (!lock->get_parent()->is_auth()) {
    ceph_assert(lock->get_sm()->can_remote_xlock);

    // tell auth
    dout(7) << "xlock_finish releasing remote xlock on " << *lock->get_parent() << dendl;
    mds_rank_t auth = lock->get_parent()->authority().first;
    if (!mds->is_cluster_degraded() ||
        mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) {
      auto slavereq = MMDSSlaveRequest::create(mut->reqid, mut->attempt, MMDSSlaveRequest::OP_UNXLOCK);
      slavereq->set_lock_type(lock->get_type());
      lock->get_parent()->set_object_info(slavereq->get_object_info());
      mds->send_message_mds(slavereq, auth);
    }
    // others waiting?
    lock->finish_waiters(SimpleLock::WAIT_STABLE |
                         SimpleLock::WAIT_WR |
                         SimpleLock::WAIT_RD, 0);
  } else {
    if (lock->get_num_xlocks() == 0 &&
        lock->get_state() != LOCK_LOCK_XLOCK) { // no one is taking xlock
      _finish_xlock(lock, xlocker, &do_issue);
    }
  }

  if (do_issue) {
    CInode *in = static_cast<CInode*>(lock->get_parent());
    if (in->is_head()) {
      if (pneed_issue)
        *pneed_issue = true;  // caller will issue caps
      else
        issue_caps(in);
    }
  }
}
1765
1766 void Locker::xlock_export(const MutationImpl::lock_iterator& it, MutationImpl *mut)
1767 {
1768 ceph_assert(it->is_xlock());
1769 SimpleLock *lock = it->lock;
1770 dout(10) << "xlock_export on " << *lock << " " << *lock->get_parent() << dendl;
1771
1772 lock->put_xlock();
1773 mut->locks.erase(it);
1774
1775 MDSCacheObject *p = lock->get_parent();
1776 ceph_assert(p->state_test(CInode::STATE_AMBIGUOUSAUTH)); // we are exporting this (inode)
1777
1778 if (!lock->is_stable())
1779 lock->get_parent()->auth_unpin(lock);
1780
1781 lock->set_state(LOCK_LOCK);
1782 }
1783
void Locker::xlock_import(SimpleLock *lock)
{
  // Importing counterpart of xlock_export(): take an auth_pin on the parent
  // for the imported lock (presumably pairing with the auth_unpin the
  // exporter performed for an unstable lock — confirm against Migrator).
  dout(10) << "xlock_import on " << *lock << " " << *lock->get_parent() << dendl;
  lock->get_parent()->auth_pin(lock);
}
1789
1790
1791
1792 // file i/o -----------------------------------------
1793
version_t Locker::issue_file_data_version(CInode *in)
{
  // Report the inode's current file_data_version; no state is modified.
  dout(7) << "issue_file_data_version on " << *in << dendl;
  return in->inode.file_data_version;
}
1799
// Journal-commit completion: runs Locker::file_update_finish() once the log
// entry is safe, optionally delivering 'ack' back to 'client'.
class C_Locker_FileUpdate_finish : public LockerLogContext {
  CInode *in;          // inode being updated; pinned until finish() runs
  MutationRef mut;
  unsigned flags;      // UPDATE_* bits interpreted by file_update_finish()
  client_t client;     // client to ack (or -1 for none)
  MClientCaps::ref ack;
public:
  C_Locker_FileUpdate_finish(Locker *l, CInode *i, MutationRef& m, unsigned f,
                             const MClientCaps::ref &ack, client_t c=-1)
    : LockerLogContext(l), in(i), mut(m), flags(f), client(c), ack(ack) {
    in->get(CInode::PIN_PTRWAITER);  // keep the inode alive while queued
  }
  void finish(int r) override {
    locker->file_update_finish(in, mut, flags, client, ack);
    in->put(CInode::PIN_PTRWAITER);
  }
};
1817
// Flag bits for file_update_finish(); combinable.
enum {
  UPDATE_SHAREMAX = 1,    // consider sharing a new max_size with clients
  UPDATE_NEEDSISSUE = 2,  // re-issue caps to the updating client if wanted
  UPDATE_SNAPFLUSH = 4,   // this update completes a snap cap writeback
};
1823
1824 void Locker::file_update_finish(CInode *in, MutationRef& mut, unsigned flags,
1825 client_t client, const MClientCaps::ref &ack)
1826 {
1827 dout(10) << "file_update_finish on " << *in << dendl;
1828 in->pop_and_dirty_projected_inode(mut->ls);
1829
1830 mut->apply();
1831
1832 if (ack) {
1833 Session *session = mds->get_session(client);
1834 if (session) {
1835 // "oldest flush tid" > 0 means client uses unique TID for each flush
1836 if (ack->get_oldest_flush_tid() > 0)
1837 session->add_completed_flush(ack->get_client_tid());
1838 mds->send_message_client_counted(ack, session);
1839 } else {
1840 dout(10) << " no session for client." << client << " " << *ack << dendl;
1841 }
1842 }
1843
1844 set<CInode*> need_issue;
1845 drop_locks(mut.get(), &need_issue);
1846
1847 if (in->is_head()) {
1848 if ((flags & UPDATE_NEEDSISSUE) && need_issue.count(in) == 0) {
1849 Capability *cap = in->get_client_cap(client);
1850 if (cap && (cap->wanted() & ~cap->pending()))
1851 issue_caps(in, cap);
1852 }
1853
1854 if ((flags & UPDATE_SHAREMAX) && in->is_auth() &&
1855 (in->filelock.gcaps_allowed(CAP_LONER) & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)))
1856 share_inode_max_size(in);
1857
1858 } else if ((flags & UPDATE_SNAPFLUSH) && !in->client_snap_caps.empty()) {
1859 dout(10) << " client_snap_caps " << in->client_snap_caps << dendl;
1860 // check for snap writeback completion
1861 bool gather = false;
1862 auto p = in->client_snap_caps.begin();
1863 while (p != in->client_snap_caps.end()) {
1864 auto q = p->second.find(client);
1865 if (q != p->second.end()) {
1866 SimpleLock *lock = in->get_lock(p->first);
1867 ceph_assert(lock);
1868 dout(10) << " completing client_snap_caps for " << ccap_string(p->first)
1869 << " lock " << *lock << " on " << *in << dendl;
1870 lock->put_wrlock();
1871
1872 p->second.erase(q);
1873 if (p->second.empty()) {
1874 gather = true;
1875 in->client_snap_caps.erase(p++);
1876 } else
1877 ++p;
1878 }
1879 }
1880 if (gather) {
1881 if (in->client_snap_caps.empty()) {
1882 in->item_open_file.remove_myself();
1883 in->item_caps.remove_myself();
1884 }
1885 eval_cap_gather(in, &need_issue);
1886 }
1887 }
1888 issue_caps_set(need_issue);
1889
1890 mds->balancer->hit_inode(in, META_POP_IWR);
1891
1892 // auth unpin after issuing caps
1893 mut->cleanup();
1894 }
1895
Capability* Locker::issue_new_caps(CInode *in,
                                   int mode,
                                   Session *session,
                                   SnapRealm *realm,
                                   bool is_replay)
{
  // Register (or augment) a capability on 'in' for the session's client,
  // with wanted bits derived from open mode 'mode'.  During replay we only
  // attempt to reconnect the existing cap.  Returns the Capability (possibly
  // freshly created).
  dout(7) << "issue_new_caps for mode " << mode << " on " << *in << dendl;
  bool is_new;

  // if replay, try to reconnect cap, and otherwise do nothing.
  if (is_replay)
    return mds->mdcache->try_reconnect_cap(in, session);


  // my needs
  ceph_assert(session->info.inst.name.is_client());
  client_t my_client = session->get_client();
  int my_want = ceph_caps_for_mode(mode);

  // register a capability
  Capability *cap = in->get_client_cap(my_client);
  if (!cap) {
    // new cap
    cap = in->add_client_cap(my_client, session, realm);
    cap->set_wanted(my_want);
    cap->mark_new();
    cap->inc_suppress(); // suppress file cap messages for new cap (we'll bundle with the open() reply)
    is_new = true;
  } else {
    is_new = false;
    // make sure it wants sufficient caps
    if (my_want & ~cap->wanted()) {
      // augment wanted caps for this client
      cap->set_wanted(cap->wanted() | my_want);
    }
  }

  if (in->is_auth()) {
    // [auth] twiddle mode?
    eval(in, CEPH_CAP_LOCKS);

    if (_need_flush_mdlog(in, my_want))
      mds->mdlog->flush();

  } else {
    // [replica] tell auth about any new caps wanted
    request_inode_file_caps(in);
  }

  // issue caps (pot. incl new one)
  //issue_caps(in);  // note: _eval above may have done this already...

  // re-issue whatever we can
  //cap->issue(cap->pending());

  if (is_new)
    cap->dec_suppress();

  return cap;
}
1956
1957
1958 void Locker::issue_caps_set(set<CInode*>& inset)
1959 {
1960 for (set<CInode*>::iterator p = inset.begin(); p != inset.end(); ++p)
1961 issue_caps(*p);
1962 }
1963
// Deferred revocation of a stale client cap; queued from issue_caps() and
// executed from the MDS waiter queue.
class C_Locker_RevokeStaleCap : public LockerContext {
  CInode *in;       // inode whose cap is revoked; pinned until finish()
  client_t client;  // client holding the stale cap
public:
  C_Locker_RevokeStaleCap(Locker *l, CInode *i, client_t c) :
    LockerContext(l), in(i), client(c) {
    in->get(CInode::PIN_PTRWAITER);
  }
  void finish(int r) override {
    locker->revoke_stale_cap(in, client);
    in->put(CInode::PIN_PTRWAITER);
  }
};
1977
/**
 * (Re)issue or revoke client capabilities on an inode so each client's
 * pending caps agree with what the current lock states allow.
 *
 * Allowed caps come in three flavors (generic, loner-only, xlocker-only,
 * via CInode::get_caps_allowed_by_type); each client's allowance is
 * assembled from those below.
 *
 * @param in        head inode whose client caps to evaluate
 * @param only_cap  if non-null, restrict processing to this single cap
 * @return number of caps for which a grant/revoke message was sent
 */
int Locker::issue_caps(CInode *in, Capability *only_cap)
{
  // allowed caps are determined by the lock mode.
  int all_allowed = in->get_caps_allowed_by_type(CAP_ANY);
  int loner_allowed = in->get_caps_allowed_by_type(CAP_LONER);
  int xlocker_allowed = in->get_caps_allowed_by_type(CAP_XLOCKER);

  client_t loner = in->get_loner();
  if (loner >= 0) {
    dout(7) << "issue_caps loner client." << loner
	    << " allowed=" << ccap_string(loner_allowed)
	    << ", xlocker allowed=" << ccap_string(xlocker_allowed)
	    << ", others allowed=" << ccap_string(all_allowed)
	    << " on " << *in << dendl;
  } else {
    dout(7) << "issue_caps allowed=" << ccap_string(all_allowed)
	    << ", xlocker allowed=" << ccap_string(xlocker_allowed)
	    << " on " << *in << dendl;
  }

  // only head inodes carry live client caps
  ceph_assert(in->is_head());

  // count conflicts with
  int nissued = 0;

  // client caps
  map<client_t, Capability>::iterator it;
  if (only_cap)
    it = in->client_caps.find(only_cap->get_client());
  else
    it = in->client_caps.begin();
  for (; it != in->client_caps.end(); ++it) {
    Capability *cap = &it->second;

    // do not issue _new_ bits when size|mtime is projected
    int allowed;
    if (loner == it->first)
      allowed = loner_allowed;
    else
      allowed = all_allowed;

    // add in any xlocker-only caps (for locks this client is the xlocker for)
    allowed |= xlocker_allowed & in->get_xlocker_mask(it->first);

    // clients that cannot handle inline data or pool namespaces must not
    // be given FILE_RD/WR for such inodes
    if ((in->inode.inline_data.version != CEPH_INLINE_NONE &&
	 cap->is_noinline()) ||
	(!in->inode.layout.pool_ns.empty() &&
	 cap->is_nopoolns()))
      allowed &= ~(CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR);

    int pending = cap->pending();
    int wanted = cap->wanted();

    dout(20) << " client." << it->first
	     << " pending " << ccap_string(pending)
	     << " allowed " << ccap_string(allowed)
	     << " wanted " << ccap_string(wanted)
	     << dendl;

    if (!(pending & ~allowed)) {
      // skip if suppress or new, and not revocation
      if (cap->is_new() || cap->is_suppress() || cap->is_stale()) {
	dout(20) << " !revoke and new|suppressed|stale, skipping client." << it->first << dendl;
	continue;
      }
    } else {
      // revocation needed.  A stale cap is revoked asynchronously via a
      // queued C_Locker_RevokeStaleCap instead of messaging the client.
      ceph_assert(!cap->is_new());
      if (cap->is_stale()) {
	dout(20) << " revoke stale cap from client." << it->first << dendl;
	ceph_assert(!cap->is_valid());
	cap->issue(allowed & pending, false);
	mds->queue_waiter_front(new C_Locker_RevokeStaleCap(this, in, it->first));
	continue;
      }
    }

    // notify clients about deleted inode, to make sure they release caps ASAP.
    if (in->inode.nlink == 0)
      wanted |= CEPH_CAP_LINK_SHARED;

    // are there caps that the client _wants_ and can have, but aren't pending?
    // or do we need to revoke?
    if ((pending & ~allowed) ||		   // need to revoke ~allowed caps.
	((wanted & allowed) & ~pending) || // missing wanted+allowed caps
	!cap->is_valid()) {		   // after stale->resume circle
      // issue
      nissued++;

      // include caps that clients generally like, while we're at it.
      int likes = in->get_caps_liked();
      int before = pending;
      long seq;
      if (pending & ~allowed)
	seq = cap->issue((wanted|likes) & allowed & pending, true); // if revoking, don't issue anything new.
      else
	seq = cap->issue((wanted|likes) & allowed, true);
      int after = cap->pending();

      dout(7) << " sending MClientCaps to client." << it->first
	      << " seq " << seq << " new pending " << ccap_string(after)
	      << " was " << ccap_string(before) << dendl;

      // track revocations so unresponsive clients can be warned about later
      int op = (before & ~after) ? CEPH_CAP_OP_REVOKE : CEPH_CAP_OP_GRANT;
      if (op == CEPH_CAP_OP_REVOKE) {
	revoking_caps.push_back(&cap->item_revoking_caps);
	revoking_caps_by_client[cap->get_client()].push_back(&cap->item_client_revoking_caps);
	cap->set_last_revoke_stamp(ceph_clock_now());
	cap->reset_num_revoke_warnings();
      }

      auto m = MClientCaps::create(op, in->ino(),
				   in->find_snaprealm()->inode->ino(),
				   cap->get_cap_id(),
				   cap->get_last_seq(),
				   after, wanted, 0,
				   cap->get_mseq(),
				   mds->get_osd_epoch_barrier());
      in->encode_cap_message(m, cap);

      mds->send_message_client_counted(m, cap->get_session());
    }

    if (only_cap)
      break;
  }

  return nissued;
}
2106
2107 void Locker::issue_truncate(CInode *in)
2108 {
2109 dout(7) << "issue_truncate on " << *in << dendl;
2110
2111 for (auto &p : in->client_caps) {
2112 Capability *cap = &p.second;
2113 auto m = MClientCaps::create(CEPH_CAP_OP_TRUNC,
2114 in->ino(),
2115 in->find_snaprealm()->inode->ino(),
2116 cap->get_cap_id(), cap->get_last_seq(),
2117 cap->pending(), cap->wanted(), 0,
2118 cap->get_mseq(),
2119 mds->get_osd_epoch_barrier());
2120 in->encode_cap_message(m, cap);
2121 mds->send_message_client_counted(m, p.first);
2122 }
2123
2124 // should we increase max_size?
2125 if (in->is_auth() && in->is_file())
2126 check_inode_max_size(in);
2127 }
2128
2129
/**
 * Complete revocation of a single stale client cap.
 *
 * If the cap is still revoking any writable caps we cannot safely take
 * them back (the client may hold unflushed dirty data), so the client is
 * evicted instead.  Otherwise the revocation is finished locally and the
 * affected locks/inode are re-evaluated.
 */
void Locker::revoke_stale_cap(CInode *in, client_t client)
{
  dout(7) << __func__ << " client." << client << " on " << *in << dendl;
  Capability *cap = in->get_client_cap(client);
  if (!cap)
    return;

  if (cap->revoking() & CEPH_CAP_ANY_WR) {
    // cannot reconstruct whatever the writable caps guard; evict the client
    std::stringstream ss;
    mds->evict_client(client.v, false, g_conf()->mds_session_blacklist_on_timeout, ss, nullptr);
    return;
  }

  cap->revoke();

  // client had a writeable range on this file; its size/mtime need recovery
  if (in->is_auth() && in->inode.client_ranges.count(cap->get_client()))
    in->state_set(CInode::STATE_NEEDSRECOVER);

  // an in-progress cap export will take care of the rest
  if (in->state_test(CInode::STATE_EXPORTINGCAPS))
    return;

  // unblock any lock transitions that were gathering on this cap
  if (!in->filelock.is_stable())
    eval_gather(&in->filelock);
  if (!in->linklock.is_stable())
    eval_gather(&in->linklock);
  if (!in->authlock.is_stable())
    eval_gather(&in->authlock);
  if (!in->xattrlock.is_stable())
    eval_gather(&in->xattrlock);

  if (in->is_auth())
    try_eval(in, CEPH_CAP_LOCKS);
  else
    request_inode_file_caps(in);
}
2165
/**
 * Invalidate all caps of a session that went stale, completing any
 * in-flight revocations.
 *
 * Returns false (caller should evict the client instead) as soon as a cap
 * is found revoking writable caps, since those may guard dirty client
 * data.  Lock/inode re-evaluation is deferred to a second pass because it
 * can run contexts that reposition other caps within session->caps.
 */
bool Locker::revoke_stale_caps(Session *session)
{
  dout(10) << "revoke_stale_caps for " << session->info.inst.name << dendl;

  // invalidate all caps
  session->inc_cap_gen();

  std::vector<CInode*> to_eval;

  for (auto p = session->caps.begin(); !p.end(); ) {
    Capability *cap = *p;
    ++p;  // advance before touching the current entry
    if (!cap->is_notable()) {
      // the rest ones are not being revoked and don't have writeable range
      // and don't want exclusive caps or want file read/write. They don't
      // need recover, they don't affect eval_gather()/try_eval()
      break;
    }

    int revoking = cap->revoking();
    if (!revoking)
      continue;

    if (revoking & CEPH_CAP_ANY_WR)
      return false;  // unsafe to force-revoke writable caps; evict instead

    int issued = cap->issued();
    CInode *in = cap->get_inode();
    dout(10) << " revoking " << ccap_string(issued) << " on " << *in << dendl;
    cap->revoke();

    // client had a writeable range; file size/mtime need recovery
    if (in->is_auth() &&
	in->inode.client_ranges.count(cap->get_client()))
      in->state_set(CInode::STATE_NEEDSRECOVER);

    // eval lock/inode may finish contexts, which may modify other cap's position
    // in the session->caps.
    to_eval.push_back(in);
  }

  // second pass: safe to re-evaluate now that iteration is finished
  for (auto in : to_eval) {
    if (in->state_test(CInode::STATE_EXPORTINGCAPS))
      continue;

    if (!in->filelock.is_stable())
      eval_gather(&in->filelock);
    if (!in->linklock.is_stable())
      eval_gather(&in->linklock);
    if (!in->authlock.is_stable())
      eval_gather(&in->authlock);
    if (!in->xattrlock.is_stable())
      eval_gather(&in->xattrlock);

    if (in->is_auth())
      try_eval(in, CEPH_CAP_LOCKS);
    else
      request_inode_file_caps(in);
  }

  return true;
}
2227
/**
 * A stale session has renewed: clear the stale state on its caps and
 * re-issue them.  If the client supports LAZY_CAP_WANTED, only "notable"
 * caps need handling (they sort first in session->caps; see
 * revoke_stale_caps()).
 */
void Locker::resume_stale_caps(Session *session)
{
  dout(10) << "resume_stale_caps for " << session->info.inst.name << dendl;

  bool lazy = session->info.has_feature(CEPHFS_FEATURE_LAZY_CAP_WANTED);
  for (xlist<Capability*>::iterator p = session->caps.begin(); !p.end(); ) {
    Capability *cap = *p;
    ++p;  // advance before the current cap may be re-issued / moved
    if (lazy && !cap->is_notable())
      break; // see revoke_stale_caps()

    CInode *in = cap->get_inode();
    ceph_assert(in->is_head());
    dout(10) << " clearing stale flag on " << *in << dendl;

    if (in->state_test(CInode::STATE_EXPORTINGCAPS)) {
      // if export succeeds, the cap will be removed. if export fails,
      // we need to re-issue the cap if it's not stale.
      in->state_set(CInode::STATE_EVALSTALECAPS);
      continue;
    }

    // re-issue directly unless eval() (auth case) already handled it
    if (!in->is_auth() || !eval(in, CEPH_CAP_LOCKS))
      issue_caps(in, cap);
  }
}
2254
2255 void Locker::remove_stale_leases(Session *session)
2256 {
2257 dout(10) << "remove_stale_leases for " << session->info.inst.name << dendl;
2258 xlist<ClientLease*>::iterator p = session->leases.begin();
2259 while (!p.end()) {
2260 ClientLease *l = *p;
2261 ++p;
2262 CDentry *parent = static_cast<CDentry*>(l->parent);
2263 dout(15) << " removing lease on " << *parent << dendl;
2264 parent->remove_client_lease(l, this);
2265 }
2266 }
2267
2268
/**
 * Deferred retry of request_inode_file_caps() once a waited-on event
 * (single auth resolved, peer active) fires.  Only re-requests if the
 * inode is still a replica; pins the inode while queued.
 */
class C_MDL_RequestInodeFileCaps : public LockerContext {
  CInode *in;  // pinned with PIN_PTRWAITER for the context's lifetime
public:
  C_MDL_RequestInodeFileCaps(Locker *l, CInode *i) : LockerContext(l), in(i) {
    in->get(CInode::PIN_PTRWAITER);
  }
  void finish(int r) override {
    // inode may have become auth while we waited; then nothing to request
    if (!in->is_auth())
      locker->request_inode_file_caps(in);
    in->put(CInode::PIN_PTRWAITER);  // drop pin taken in the constructor
  }
};
2281
/**
 * [replica] Tell the auth MDS which caps our clients want on this inode,
 * but only when that differs from what we last reported.  Defers via
 * C_MDL_RequestInodeFileCaps while authority is ambiguous or the auth
 * MDS is still rejoining.
 */
void Locker::request_inode_file_caps(CInode *in)
{
  ceph_assert(!in->is_auth());

  // never forward PIN; restrict to caps this inode could ever allow
  int wanted = in->get_caps_wanted() & in->get_caps_allowed_ever() & ~CEPH_CAP_PIN;
  if (wanted != in->replica_caps_wanted) {
    // wait for single auth
    if (in->is_ambiguous_auth()) {
      in->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH,
		     new C_MDL_RequestInodeFileCaps(this, in));
      return;
    }

    mds_rank_t auth = in->authority().first;
    if (mds->is_cluster_degraded() &&
	mds->mdsmap->get_state(auth) == MDSMap::STATE_REJOIN) {
      mds->wait_for_active_peer(auth, new C_MDL_RequestInodeFileCaps(this, in));
      return;
    }

    dout(7) << "request_inode_file_caps " << ccap_string(wanted)
	    << " was " << ccap_string(in->replica_caps_wanted)
	    << " on " << *in << " to mds." << auth << dendl;

    // record what we decided to report, even if we cannot send right now
    in->replica_caps_wanted = wanted;

    if (!mds->is_cluster_degraded() ||
	mds->mdsmap->is_clientreplay_or_active_or_stopping(auth))
      mds->send_message_mds(MInodeFileCaps::create(in->ino(), in->replica_caps_wanted), auth);
  }
}
2313
/**
 * [auth] Handle an MInodeFileCaps from a replica MDS announcing the caps
 * its clients want on one of our inodes; record them and re-evaluate the
 * inode's cap-related locks.
 */
void Locker::handle_inode_file_caps(const MInodeFileCaps::const_ref &m)
{
  // nobody should be talking to us during recovery.
  if (mds->get_state() < MDSMap::STATE_CLIENTREPLAY) {
    if (mds->get_want_state() >= MDSMap::STATE_CLIENTREPLAY) {
      // we are on our way up; retry once replay completes
      mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
      return;
    }
    ceph_abort_msg("got unexpected message during recovery");
  }

  // ok
  CInode *in = mdcache->get_inode(m->get_ino());
  mds_rank_t from = mds_rank_t(m->get_source().num());

  ceph_assert(in);
  ceph_assert(in->is_auth());

  dout(7) << "handle_inode_file_caps replica mds." << from << " wants caps " << ccap_string(m->get_caps()) << " on " << *in << dendl;

  in->set_mds_caps_wanted(from, m->get_caps());

  try_eval(in, CEPH_CAP_LOCKS);
}
2338
2339
/**
 * Deferred re-run of check_inode_max_size() with identical arguments,
 * queued when the inode was frozen or its filelock was not wrlockable.
 * Only re-runs if we are still auth; pins the inode while queued.
 */
class C_MDL_CheckMaxSize : public LockerContext {
  CInode *in;             // pinned with PIN_PTRWAITER while queued
  uint64_t new_max_size;  // requested new max_size, passed through
  uint64_t newsize;       // requested new file size (0 = no size update)
  utime_t mtime;          // mtime accompanying the size change

public:
  C_MDL_CheckMaxSize(Locker *l, CInode *i, uint64_t _new_max_size,
                     uint64_t _newsize, utime_t _mtime) :
    LockerContext(l), in(i),
    new_max_size(_new_max_size), newsize(_newsize), mtime(_mtime)
  {
    in->get(CInode::PIN_PTRWAITER);
  }
  void finish(int r) override {
    // if authority migrated away, checking max_size is no longer our job
    if (in->is_auth())
      locker->check_inode_max_size(in, false, new_max_size, newsize, mtime);
    in->put(CInode::PIN_PTRWAITER);
  }
};
2360
2361 uint64_t Locker::calc_new_max_size(CInode::mempool_inode *pi, uint64_t size)
2362 {
2363 uint64_t new_max = (size + 1) << 1;
2364 uint64_t max_inc = g_conf()->mds_client_writeable_range_max_inc_objs;
2365 if (max_inc > 0) {
2366 max_inc *= pi->layout.object_size;
2367 new_max = std::min(new_max, size + max_inc);
2368 }
2369 return round_up_to(new_max, pi->get_layout_size_increment());
2370 }
2371
/**
 * Recompute the per-client writeable ranges for an inode.
 *
 * Clients that hold or want any file-write cap get a [0, last] range
 * whose limit is derived from @size via calc_new_max_size(); an existing
 * range is never shrunk here (clients without WR|BUFFER simply get no
 * entry in @new_ranges).
 *
 * @param in             the inode
 * @param size           base size used to compute the new range limit
 * @param update         if true, also mark/clear each cap's clientwriteable flag
 * @param new_ranges     out: the newly computed client_range_map
 * @param max_increased  out: set true if any client's range limit grew
 */
void Locker::calc_new_client_ranges(CInode *in, uint64_t size, bool update,
				    CInode::mempool_inode::client_range_map *new_ranges,
				    bool *max_increased)
{
  auto latest = in->get_projected_inode();
  uint64_t ms;
  if (latest->has_layout()) {
    ms = calc_new_max_size(latest, size);
  } else {
    // Layout-less directories like ~mds0/, have zero size
    ms = 0;
  }

  // increase ranges as appropriate.
  // shrink to 0 if no WR|BUFFER caps issued.
  for (auto &p : in->client_caps) {
    if ((p.second.issued() | p.second.wanted()) & CEPH_CAP_ANY_FILE_WR) {
      client_writeable_range_t& nr = (*new_ranges)[p.first];
      nr.range.first = 0;
      if (latest->client_ranges.count(p.first)) {
	// existing range: keep the larger of the old and new limits
	client_writeable_range_t& oldr = latest->client_ranges[p.first];
	if (ms > oldr.range.last)
	  *max_increased = true;
	nr.range.last = std::max(ms, oldr.range.last);
	nr.follows = oldr.follows;
      } else {
	// brand-new writeable range for this client
	*max_increased = true;
	nr.range.last = ms;
	nr.follows = in->first - 1;
      }
      if (update)
	p.second.mark_clientwriteable();
    } else {
      if (update)
	p.second.clear_clientwriteable();
    }
  }
}
2410
/**
 * [auth] Re-derive a file's per-client writeable ranges and/or apply a
 * reported size/mtime, journaling the result (EOpen when the file is
 * still open, EUpdate otherwise).
 *
 * @param in            auth file inode to check
 * @param force_wrlock  treat the filelock as wrlockable (caller holds it)
 * @param new_max_size  requested max_size hint folded into the range calc
 * @param new_size      reported size (0 means no size update)
 * @param new_mtime     mtime accompanying new_size
 * @return true if an update was journaled; false if it was a no-op or had
 *         to be deferred (frozen inode / filelock not wrlockable)
 */
bool Locker::check_inode_max_size(CInode *in, bool force_wrlock,
				  uint64_t new_max_size, uint64_t new_size,
				  utime_t new_mtime)
{
  ceph_assert(in->is_auth());
  ceph_assert(in->is_file());

  CInode::mempool_inode *latest = in->get_projected_inode();
  CInode::mempool_inode::client_range_map new_ranges;
  uint64_t size = latest->size;
  bool update_size = new_size > 0;
  bool update_max = false;
  bool max_increased = false;

  if (update_size) {
    // size and mtime only ever move forward
    new_size = size = std::max(size, new_size);
    new_mtime = std::max(new_mtime, latest->mtime);
    if (latest->size == new_size && latest->mtime == new_mtime)
      update_size = false;
  }

  // can_update: 1 = proceed, -1 = inode frozen, -2 = filelock not wrlockable
  int can_update = 1;
  if (in->is_frozen()) {
    can_update = -1;
  } else if (!force_wrlock && !in->filelock.can_wrlock(in->get_loner())) {
    // lock?
    if (in->filelock.is_stable()) {
      if (in->get_target_loner() >= 0)
	file_excl(&in->filelock);
      else
	simple_lock(&in->filelock);
    }
    if (!in->filelock.can_wrlock(in->get_loner()))
      can_update = -2;
  }

  // only flag caps clientwriteable (update=true) when we can proceed
  calc_new_client_ranges(in, std::max(new_max_size, size), can_update > 0,
			 &new_ranges, &max_increased);

  if (max_increased || latest->client_ranges != new_ranges)
    update_max = true;

  if (!update_size && !update_max) {
    dout(20) << "check_inode_max_size no-op on " << *in << dendl;
    return false;
  }

  dout(10) << "check_inode_max_size new_ranges " << new_ranges
	   << " update_size " << update_size
	   << " on " << *in << dendl;

  if (can_update < 0) {
    // defer: retry with identical arguments once unblocked
    auto cms = new C_MDL_CheckMaxSize(this, in, new_max_size, new_size, new_mtime);
    if (can_update == -1) {
      dout(10) << "check_inode_max_size frozen, waiting on " << *in << dendl;
      in->add_waiter(CInode::WAIT_UNFREEZE, cms);
    } else {
      in->filelock.add_waiter(SimpleLock::WAIT_STABLE, cms);
      dout(10) << "check_inode_max_size can't wrlock, waiting on " << *in << dendl;
    }
    return false;
  }

  MutationRef mut(new MutationImpl());
  mut->ls = mds->mdlog->get_current_segment();

  auto &pi = in->project_inode();
  pi.inode.version = in->pre_dirty();

  if (update_max) {
    dout(10) << "check_inode_max_size client_ranges " << pi.inode.client_ranges << " -> " << new_ranges << dendl;
    pi.inode.client_ranges = new_ranges;
  }

  if (update_size) {
    dout(10) << "check_inode_max_size size " << pi.inode.size << " -> " << new_size << dendl;
    pi.inode.size = new_size;
    pi.inode.rstat.rbytes = new_size;
    dout(10) << "check_inode_max_size mtime " << pi.inode.mtime << " -> " << new_mtime << dendl;
    pi.inode.mtime = new_mtime;
    // keep ctime/rctime monotonic with the new mtime
    if (new_mtime > pi.inode.ctime) {
      pi.inode.ctime = new_mtime;
      if (new_mtime > pi.inode.rstat.rctime)
	pi.inode.rstat.rctime = new_mtime;
    }
  }

  // use EOpen if the file is still open; otherwise, use EUpdate.
  // this is just an optimization to push open files forward into
  // newer log segments.
  LogEvent *le;
  EMetaBlob *metablob;
  if (in->is_any_caps_wanted() && in->last == CEPH_NOSNAP) {
    EOpen *eo = new EOpen(mds->mdlog);
    eo->add_ino(in->ino());
    metablob = &eo->metablob;
    le = eo;
  } else {
    EUpdate *eu = new EUpdate(mds->mdlog, "check_inode_max_size");
    metablob = &eu->metablob;
    le = eu;
  }
  mds->mdlog->start_entry(le);
  if (update_size) {  // FIXME if/when we do max_size nested accounting
    mdcache->predirty_journal_parents(mut, metablob, in, 0, PREDIRTY_PRIMARY);
    // no cow, here!
    CDentry *parent = in->get_projected_parent_dn();
    metablob->add_primary_dentry(parent, in, true);
  } else {
    metablob->add_dir_context(in->get_projected_parent_dn()->get_dir());
    mdcache->journal_dirty_inode(mut.get(), metablob, in);
  }
  mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut,
							      UPDATE_SHAREMAX, MClientCaps::ref()));
  wrlock_force(&in->filelock, mut);  // wrlock for duration of journal
  mut->auth_pin(in);

  // make max_size _increase_ timely
  if (max_increased)
    mds->mdlog->flush();

  return true;
}
2534
2535
/**
 * Push the current max_size (via a GRANT message) to clients holding
 * FILE_WR|FILE_BUFFER caps, so writers learn the enlarged writeable range.
 *
 * @param in        the inode
 * @param only_cap  if non-null, start from (and normally stop at) this cap
 */
void Locker::share_inode_max_size(CInode *in, Capability *only_cap)
{
  /*
   * only share if currently issued a WR cap. if client doesn't have it,
   * file_max doesn't matter, and the client will get it if/when they get
   * the cap later.
   */
  dout(10) << "share_inode_max_size on " << *in << dendl;
  map<client_t, Capability>::iterator it;
  if (only_cap)
    it = in->client_caps.find(only_cap->get_client());
  else
    it = in->client_caps.begin();
  for (; it != in->client_caps.end(); ++it) {
    const client_t client = it->first;
    Capability *cap = &it->second;
    // NOTE(review): when only_cap is set and that cap is suppressed, this
    // 'continue' bypasses the 'break' at the bottom, so the loop moves on
    // to *other* clients' caps -- confirm that is intended.
    if (cap->is_suppress())
      continue;
    if (cap->pending() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER)) {
      dout(10) << "share_inode_max_size with client." << client << dendl;
      cap->inc_last_seq();
      auto m = MClientCaps::create(CEPH_CAP_OP_GRANT,
				   in->ino(),
				   in->find_snaprealm()->inode->ino(),
				   cap->get_cap_id(),
				   cap->get_last_seq(),
				   cap->pending(),
				   cap->wanted(), 0,
				   cap->get_mseq(),
				   mds->get_osd_epoch_barrier());
      in->encode_cap_message(m, cap);
      mds->send_message_client_counted(m, client);
    }
    if (only_cap)
      break;
  }
}
2573
2574 bool Locker::_need_flush_mdlog(CInode *in, int wanted)
2575 {
2576 /* flush log if caps are wanted by client but corresponding lock is unstable and locked by
2577 * pending mutations. */
2578 if (((wanted & (CEPH_CAP_FILE_RD|CEPH_CAP_FILE_WR|CEPH_CAP_FILE_SHARED|CEPH_CAP_FILE_EXCL)) &&
2579 in->filelock.is_unstable_and_locked()) ||
2580 ((wanted & (CEPH_CAP_AUTH_SHARED|CEPH_CAP_AUTH_EXCL)) &&
2581 in->authlock.is_unstable_and_locked()) ||
2582 ((wanted & (CEPH_CAP_LINK_SHARED|CEPH_CAP_LINK_EXCL)) &&
2583 in->linklock.is_unstable_and_locked()) ||
2584 ((wanted & (CEPH_CAP_XATTR_SHARED|CEPH_CAP_XATTR_EXCL)) &&
2585 in->xattrlock.is_unstable_and_locked()))
2586 return true;
2587 return false;
2588 }
2589
/**
 * Update a cap's "wanted" mask from a client cap message.
 *
 * The full mask is only trusted when the message's issue_seq matches the
 * cap's last issue (otherwise the client was acting on stale state); a
 * mismatched message may still *add* wanted bits, never drop them.
 * Side effects: may prioritize file recovery, log an EOpen for the open
 * file table, or (on a replica) forward the wants to the auth MDS.
 */
void Locker::adjust_cap_wanted(Capability *cap, int wanted, int issue_seq)
{
  if (ceph_seq_cmp(issue_seq, cap->get_last_issue()) == 0) {
    // client responded to our latest issue; trust the full wanted mask
    dout(10) << " wanted " << ccap_string(cap->wanted())
	     << " -> " << ccap_string(wanted) << dendl;
    cap->set_wanted(wanted);
  } else if (wanted & ~cap->wanted()) {
    // stale seq, but adding bits is always safe
    dout(10) << " wanted " << ccap_string(cap->wanted())
	     << " -> " << ccap_string(wanted)
	     << " (added caps even though we had seq mismatch!)" << dendl;
    cap->set_wanted(wanted | cap->wanted());
  } else {
    // stale seq and it would only drop bits; ignore
    dout(10) << " NOT changing wanted " << ccap_string(cap->wanted())
	     << " -> " << ccap_string(wanted)
	     << " (issue_seq " << issue_seq << " != last_issue "
	     << cap->get_last_issue() << ")" << dendl;
    return;
  }

  CInode *cur = cap->get_inode();
  if (!cur->is_auth()) {
    // replica: propagate the changed wants to the auth MDS
    request_inode_file_caps(cur);
    return;
  }

  if (cap->wanted()) {
    if (cur->state_test(CInode::STATE_RECOVERING) &&
	(cap->wanted() & (CEPH_CAP_FILE_RD |
			  CEPH_CAP_FILE_WR))) {
      // a client is waiting on file data; recover this inode sooner
      mds->mdcache->recovery_queue.prioritize(cur);
    }

    if (mdcache->open_file_table.should_log_open(cur)) {
      ceph_assert(cur->last == CEPH_NOSNAP);
      EOpen *le = new EOpen(mds->mdlog);
      mds->mdlog->start_entry(le);
      le->add_clean_inode(cur);
      mds->mdlog->submit_entry(le);
    }
  }
}
2631
/**
 * Nudge a pending snapflush along for a snapped inode: kick an rdlock on
 * some stable, non-SYNC lock of the head inode so lock evaluation makes
 * progress; if no suitable lock exists right now, requeue the inode.
 */
void Locker::snapflush_nudge(CInode *in)
{
  ceph_assert(in->last != CEPH_NOSNAP);
  if (in->client_snap_caps.empty())
    return;

  CInode *head = mdcache->get_inode(in->ino());
  // head inode gets unpinned when snapflush starts. It might get trimmed
  // before snapflush finishes.
  if (!head)
    return;

  ceph_assert(head->is_auth());
  if (head->client_need_snapflush.empty())
    return;

  // prefer the filelock; fall back to any stable lock not already in SYNC
  SimpleLock *hlock = head->get_lock(CEPH_LOCK_IFILE);
  if (hlock->get_state() == LOCK_SYNC || !hlock->is_stable()) {
    hlock = NULL;
    for (int i = 0; i < num_cinode_locks; i++) {
      SimpleLock *lock = head->get_lock(cinode_lock_info[i].lock);
      if (lock->get_state() != LOCK_SYNC && lock->is_stable()) {
	hlock = lock;
	break;
      }
    }
  }
  if (hlock) {
    _rdlock_kick(hlock, true);
  } else {
    // also, requeue, in case of unstable lock
    need_snapflush_inodes.push_back(&in->item_caps);
  }
}
2666
2667 void Locker::mark_need_snapflush_inode(CInode *in)
2668 {
2669 ceph_assert(in->last != CEPH_NOSNAP);
2670 if (!in->item_caps.is_on_list()) {
2671 need_snapflush_inodes.push_back(&in->item_caps);
2672 utime_t now = ceph_clock_now();
2673 in->last_dirstat_prop = now;
2674 dout(10) << "mark_need_snapflush_inode " << *in << " - added at " << now << dendl;
2675 }
2676 }
2677
2678 bool Locker::is_revoking_any_caps_from(client_t client)
2679 {
2680 auto it = revoking_caps_by_client.find(client);
2681 if (it == revoking_caps_by_client.end())
2682 return false;
2683 return !it->second.empty();
2684 }
2685
/**
 * Perform "null" (empty) snapflushes on behalf of a client for every
 * snapshot older than @last that still lists the client in
 * client_need_snapflush — used when we can infer the client will never
 * send the real FLUSHSNAP for those snaps.
 */
void Locker::_do_null_snapflush(CInode *head_in, client_t client, snapid_t last)
{
  dout(10) << "_do_null_snapflush client." << client << " on " << *head_in << dendl;
  for (auto p = head_in->client_need_snapflush.begin();
       p != head_in->client_need_snapflush.end() && p->first < last; ) {
    snapid_t snapid = p->first;
    auto &clients = p->second;
    ++p;  // be careful, q loop below depends on this

    if (clients.count(client)) {
      dout(10) << " doing async NULL snapflush on " << snapid << " from client." << client << dendl;
      // flush against the snapped inode that covers snapid - 1
      CInode *sin = mdcache->pick_inode_snap(head_in, snapid - 1);
      ceph_assert(sin);
      ceph_assert(sin->first <= snapid);
      // dirty=0 and null messages make this an empty (null) flush
      _do_snap_update(sin, snapid, 0, sin->first - 1, client, MClientCaps::ref(), MClientCaps::ref());
      head_in->remove_need_snapflush(sin, snapid, client);
    }
  }
}
2705
2706
2707 bool Locker::should_defer_client_cap_frozen(CInode *in)
2708 {
2709 /*
2710 * This policy needs to be AT LEAST as permissive as allowing a client request
2711 * to go forward, or else a client request can release something, the release
2712 * gets deferred, but the request gets processed and deadlocks because when the
2713 * caps can't get revoked.
2714 *
2715 * Currently, a request wait if anything locked is freezing (can't
2716 * auth_pin), which would avoid any deadlock with cap release. Thus @in
2717 * _MUST_ be in the lock/auth_pin set.
2718 *
2719 * auth_pins==0 implies no unstable lock and not auth pinnned by
2720 * client request, otherwise continue even it's freezing.
2721 */
2722 return (in->is_freezing() && in->get_num_auth_pins() == 0) || in->is_frozen();
2723 }
2724
2725 void Locker::handle_client_caps(const MClientCaps::const_ref &m)
2726 {
2727 client_t client = m->get_source().num();
2728 snapid_t follows = m->get_snap_follows();
2729 auto op = m->get_op();
2730 auto dirty = m->get_dirty();
2731 dout(7) << "handle_client_caps "
2732 << " on " << m->get_ino()
2733 << " tid " << m->get_client_tid() << " follows " << follows
2734 << " op " << ceph_cap_op_name(op)
2735 << " flags 0x" << std::hex << m->flags << std::dec << dendl;
2736
2737 Session *session = mds->get_session(m);
2738 if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) {
2739 if (!session) {
2740 dout(5) << " no session, dropping " << *m << dendl;
2741 return;
2742 }
2743 if (session->is_closed() ||
2744 session->is_closing() ||
2745 session->is_killing()) {
2746 dout(7) << " session closed|closing|killing, dropping " << *m << dendl;
2747 return;
2748 }
2749 if ((mds->is_reconnect() || mds->get_want_state() == MDSMap::STATE_RECONNECT) &&
2750 dirty && m->get_client_tid() > 0 &&
2751 !session->have_completed_flush(m->get_client_tid())) {
2752 mdcache->set_reconnected_dirty_caps(client, m->get_ino(), dirty,
2753 op == CEPH_CAP_OP_FLUSHSNAP);
2754 }
2755 mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
2756 return;
2757 }
2758
2759 if (m->get_client_tid() > 0 && session &&
2760 session->have_completed_flush(m->get_client_tid())) {
2761 dout(7) << "handle_client_caps already flushed tid " << m->get_client_tid()
2762 << " for client." << client << dendl;
2763 MClientCaps::ref ack;
2764 if (op == CEPH_CAP_OP_FLUSHSNAP) {
2765 ack = MClientCaps::create(CEPH_CAP_OP_FLUSHSNAP_ACK, m->get_ino(), 0, 0, 0, 0, 0, dirty, 0, mds->get_osd_epoch_barrier());
2766 } else {
2767 ack = MClientCaps::create(CEPH_CAP_OP_FLUSH_ACK, m->get_ino(), 0, m->get_cap_id(), m->get_seq(), m->get_caps(), 0, dirty, 0, mds->get_osd_epoch_barrier());
2768 }
2769 ack->set_snap_follows(follows);
2770 ack->set_client_tid(m->get_client_tid());
2771 mds->send_message_client_counted(ack, m->get_connection());
2772 if (op == CEPH_CAP_OP_FLUSHSNAP) {
2773 return;
2774 } else {
2775 // fall-thru because the message may release some caps
2776 dirty = false;
2777 op = CEPH_CAP_OP_UPDATE;
2778 }
2779 }
2780
2781 // "oldest flush tid" > 0 means client uses unique TID for each flush
2782 if (m->get_oldest_flush_tid() > 0 && session) {
2783 if (session->trim_completed_flushes(m->get_oldest_flush_tid())) {
2784 mds->mdlog->get_current_segment()->touched_sessions.insert(session->info.inst.name);
2785
2786 if (session->get_num_trim_flushes_warnings() > 0 &&
2787 session->get_num_completed_flushes() * 2 < g_conf()->mds_max_completed_flushes)
2788 session->reset_num_trim_flushes_warnings();
2789 } else {
2790 if (session->get_num_completed_flushes() >=
2791 (g_conf()->mds_max_completed_flushes << session->get_num_trim_flushes_warnings())) {
2792 session->inc_num_trim_flushes_warnings();
2793 stringstream ss;
2794 ss << "client." << session->get_client() << " does not advance its oldest_flush_tid ("
2795 << m->get_oldest_flush_tid() << "), "
2796 << session->get_num_completed_flushes()
2797 << " completed flushes recorded in session";
2798 mds->clog->warn() << ss.str();
2799 dout(20) << __func__ << " " << ss.str() << dendl;
2800 }
2801 }
2802 }
2803
2804 CInode *head_in = mdcache->get_inode(m->get_ino());
2805 if (!head_in) {
2806 if (mds->is_clientreplay()) {
2807 dout(7) << "handle_client_caps on unknown ino " << m->get_ino()
2808 << ", will try again after replayed client requests" << dendl;
2809 mdcache->wait_replay_cap_reconnect(m->get_ino(), new C_MDS_RetryMessage(mds, m));
2810 return;
2811 }
2812
2813 /*
2814 * "handle_client_caps on unknown ino xxx” is normal after migrating a subtree
2815 * Sequence of events that cause this are:
2816 * - client sends caps message to mds.a
2817 * - mds finishes subtree migration, send cap export to client
2818 * - mds trim its cache
2819 * - mds receives cap messages from client
2820 */
2821 dout(7) << "handle_client_caps on unknown ino " << m->get_ino() << ", dropping" << dendl;
2822 return;
2823 }
2824
2825 if (m->osd_epoch_barrier && !mds->objecter->have_map(m->osd_epoch_barrier)) {
2826 // Pause RADOS operations until we see the required epoch
2827 mds->objecter->set_epoch_barrier(m->osd_epoch_barrier);
2828 }
2829
2830 if (mds->get_osd_epoch_barrier() < m->osd_epoch_barrier) {
2831 // Record the barrier so that we will retransmit it to clients
2832 mds->set_osd_epoch_barrier(m->osd_epoch_barrier);
2833 }
2834
2835 dout(10) << " head inode " << *head_in << dendl;
2836
2837 Capability *cap = 0;
2838 cap = head_in->get_client_cap(client);
2839 if (!cap) {
2840 dout(7) << "handle_client_caps no cap for client." << client << " on " << *head_in << dendl;
2841 return;
2842 }
2843 ceph_assert(cap);
2844
2845 // freezing|frozen?
2846 if (should_defer_client_cap_frozen(head_in)) {
2847 dout(7) << "handle_client_caps freezing|frozen on " << *head_in << dendl;
2848 head_in->add_waiter(CInode::WAIT_UNFREEZE, new C_MDS_RetryMessage(mds, m));
2849 return;
2850 }
2851 if (ceph_seq_cmp(m->get_mseq(), cap->get_mseq()) < 0) {
2852 dout(7) << "handle_client_caps mseq " << m->get_mseq() << " < " << cap->get_mseq()
2853 << ", dropping" << dendl;
2854 return;
2855 }
2856
2857 bool need_unpin = false;
2858
2859 // flushsnap?
2860 if (op == CEPH_CAP_OP_FLUSHSNAP) {
2861 if (!head_in->is_auth()) {
2862 dout(7) << " not auth, ignoring flushsnap on " << *head_in << dendl;
2863 goto out;
2864 }
2865
2866 SnapRealm *realm = head_in->find_snaprealm();
2867 snapid_t snap = realm->get_snap_following(follows);
2868 dout(10) << " flushsnap follows " << follows << " -> snap " << snap << dendl;
2869
2870 auto p = head_in->client_need_snapflush.begin();
2871 if (p != head_in->client_need_snapflush.end() && p->first < snap) {
2872 head_in->auth_pin(this); // prevent subtree frozen
2873 need_unpin = true;
2874 _do_null_snapflush(head_in, client, snap);
2875 }
2876
2877 CInode *in = head_in;
2878 if (snap != CEPH_NOSNAP) {
2879 in = mdcache->pick_inode_snap(head_in, snap - 1);
2880 if (in != head_in)
2881 dout(10) << " snapped inode " << *in << dendl;
2882 }
2883
2884 // we can prepare the ack now, since this FLUSHEDSNAP is independent of any
2885 // other cap ops. (except possibly duplicate FLUSHSNAP requests, but worst
2886 // case we get a dup response, so whatever.)
2887 MClientCaps::ref ack;
2888 if (dirty) {
2889 ack = MClientCaps::create(CEPH_CAP_OP_FLUSHSNAP_ACK, in->ino(), 0, 0, 0, 0, 0, dirty, 0, mds->get_osd_epoch_barrier());
2890 ack->set_snap_follows(follows);
2891 ack->set_client_tid(m->get_client_tid());
2892 ack->set_oldest_flush_tid(m->get_oldest_flush_tid());
2893 }
2894
2895 if (in == head_in ||
2896 (head_in->client_need_snapflush.count(snap) &&
2897 head_in->client_need_snapflush[snap].count(client))) {
2898 dout(7) << " flushsnap snap " << snap
2899 << " client." << client << " on " << *in << dendl;
2900
2901 // this cap now follows a later snap (i.e. the one initiating this flush, or later)
2902 if (in == head_in)
2903 cap->client_follows = snap < CEPH_NOSNAP ? snap : realm->get_newest_seq();
2904
2905 _do_snap_update(in, snap, dirty, follows, client, m, ack);
2906
2907 if (in != head_in)
2908 head_in->remove_need_snapflush(in, snap, client);
2909 } else {
2910 dout(7) << " not expecting flushsnap " << snap << " from client." << client << " on " << *in << dendl;
2911 if (ack)
2912 mds->send_message_client_counted(ack, m->get_connection());
2913 }
2914 goto out;
2915 }
2916
2917 if (cap->get_cap_id() != m->get_cap_id()) {
2918 dout(7) << " ignoring client capid " << m->get_cap_id() << " != my " << cap->get_cap_id() << dendl;
2919 } else {
2920 CInode *in = head_in;
2921 if (follows > 0) {
2922 in = mdcache->pick_inode_snap(head_in, follows);
2923 // intermediate snap inodes
2924 while (in != head_in) {
2925 ceph_assert(in->last != CEPH_NOSNAP);
2926 if (in->is_auth() && dirty) {
2927 dout(10) << " updating intermediate snapped inode " << *in << dendl;
2928 _do_cap_update(in, NULL, dirty, follows, m, MClientCaps::ref());
2929 }
2930 in = mdcache->pick_inode_snap(head_in, in->last);
2931 }
2932 }
2933
2934 // head inode, and cap
2935 MClientCaps::ref ack;
2936
2937 int caps = m->get_caps();
2938 if (caps & ~cap->issued()) {
2939 dout(10) << " confirming not issued caps " << ccap_string(caps & ~cap->issued()) << dendl;
2940 caps &= cap->issued();
2941 }
2942
2943 cap->confirm_receipt(m->get_seq(), caps);
2944 dout(10) << " follows " << follows
2945 << " retains " << ccap_string(m->get_caps())
2946 << " dirty " << ccap_string(dirty)
2947 << " on " << *in << dendl;
2948
2949
2950 // missing/skipped snapflush?
2951 // The client MAY send a snapflush if it is issued WR/EXCL caps, but
2952 // presently only does so when it has actual dirty metadata. But, we
2953 // set up the need_snapflush stuff based on the issued caps.
2954 // We can infer that the client WONT send a FLUSHSNAP once they have
2955 // released all WR/EXCL caps (the FLUSHSNAP always comes before the cap
2956 // update/release).
2957 if (!head_in->client_need_snapflush.empty()) {
2958 if (!(cap->issued() & CEPH_CAP_ANY_FILE_WR) &&
2959 !(m->flags & MClientCaps::FLAG_PENDING_CAPSNAP)) {
2960 head_in->auth_pin(this); // prevent subtree frozen
2961 need_unpin = true;
2962 _do_null_snapflush(head_in, client);
2963 } else {
2964 dout(10) << " revocation in progress, not making any conclusions about null snapflushes" << dendl;
2965 }
2966 }
2967
2968 bool need_snapflush = cap->need_snapflush();
2969 if (dirty && in->is_auth()) {
2970 dout(7) << " flush client." << client << " dirty " << ccap_string(dirty)
2971 << " seq " << m->get_seq() << " on " << *in << dendl;
2972 ack = MClientCaps::create(CEPH_CAP_OP_FLUSH_ACK, in->ino(), 0, cap->get_cap_id(), m->get_seq(),
2973 m->get_caps(), 0, dirty, 0, mds->get_osd_epoch_barrier());
2974 ack->set_client_tid(m->get_client_tid());
2975 ack->set_oldest_flush_tid(m->get_oldest_flush_tid());
2976
2977 // client flushes and releases caps at the same time. make sure MDCache::cow_inode()
2978 // properly setup CInode::client_need_snapflush
2979 if (!need_snapflush && (dirty & ~cap->issued()) &&
2980 (m->flags & MClientCaps::FLAG_PENDING_CAPSNAP))
2981 cap->mark_needsnapflush();
2982 }
2983
2984 // filter wanted based on what we could ever give out (given auth/replica status)
2985 bool need_flush = m->flags & MClientCaps::FLAG_SYNC;
2986 int new_wanted = m->get_wanted();
2987 if (new_wanted != cap->wanted()) {
2988 if (!need_flush && in->is_auth() && (new_wanted & ~cap->pending())) {
2989 // exapnding caps. make sure we aren't waiting for a log flush
2990 need_flush = _need_flush_mdlog(head_in, new_wanted & ~cap->pending());
2991 }
2992
2993 adjust_cap_wanted(cap, new_wanted, m->get_issue_seq());
2994 }
2995
2996 bool updated = in->is_auth() &&
2997 _do_cap_update(in, cap, dirty, follows, m, ack, &need_flush);
2998
2999 if (cap->need_snapflush() &&
3000 (!need_snapflush || !(m->flags & MClientCaps::FLAG_PENDING_CAPSNAP)))
3001 cap->clear_needsnapflush();
3002
3003 if (updated) {
3004 eval(in, CEPH_CAP_LOCKS);
3005
3006 if (!need_flush && (cap->wanted() & ~cap->pending()))
3007 need_flush = _need_flush_mdlog(in, cap->wanted() & ~cap->pending());
3008 } else {
3009 // no update, ack now.
3010 if (ack)
3011 mds->send_message_client_counted(ack, m->get_connection());
3012
3013 bool did_issue = eval(in, CEPH_CAP_LOCKS);
3014 if (!did_issue && (cap->wanted() & ~cap->pending()))
3015 issue_caps(in, cap);
3016
3017 if (cap->get_last_seq() == 0 &&
3018 (cap->pending() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER))) {
3019 share_inode_max_size(in, cap);
3020 }
3021 }
3022
3023 if (need_flush)
3024 mds->mdlog->flush();
3025 }
3026
3027 out:
3028 if (need_unpin)
3029 head_in->auth_unpin(this);
3030 }
3031
3032
3033 class C_Locker_RetryRequestCapRelease : public LockerContext {
3034 client_t client;
3035 ceph_mds_request_release item;
3036 public:
3037 C_Locker_RetryRequestCapRelease(Locker *l, client_t c, const ceph_mds_request_release& it) :
3038 LockerContext(l), client(c), item(it) { }
3039 void finish(int r) override {
3040 string dname;
3041 MDRequestRef null_ref;
3042 locker->process_request_cap_release(null_ref, client, item, dname);
3043 }
3044 };
3045
/**
 * Process a cap release embedded in a client request.  mdr may be null
 * when the release is re-driven later with no request context (see
 * C_Locker_RetryRequestCapRelease).  If dname is non-empty, also drops the
 * client's lease on that dentry.
 */
void Locker::process_request_cap_release(MDRequestRef& mdr, client_t client, const ceph_mds_request_release& item,
					 std::string_view dname)
{
  // unpack the release fields sent by the client
  inodeno_t ino = (uint64_t)item.ino;
  uint64_t cap_id = item.cap_id;
  int caps = item.caps;
  int wanted = item.wanted;
  int seq = item.seq;
  int issue_seq = item.issue_seq;
  int mseq = item.mseq;

  CInode *in = mdcache->get_inode(ino);
  if (!in)
    return;  // inode not in cache; nothing to do

  // if a dentry name was supplied, drop this client's lease on it (if any)
  if (dname.length()) {
    frag_t fg = in->pick_dirfrag(dname);
    CDir *dir = in->get_dirfrag(fg);
    if (dir) {
      CDentry *dn = dir->lookup(dname);
      if (dn) {
	ClientLease *l = dn->get_client_lease(client);
	if (l) {
	  dout(10) << "process_cap_release removing lease on " << *dn << dendl;
	  dn->remove_client_lease(l, this);
	} else {
	  dout(7) << "process_cap_release client." << client
		  << " doesn't have lease on " << *dn << dendl;
	}
      } else {
	dout(7) << "process_cap_release client." << client << " released lease on dn "
		<< dir->dirfrag() << "/" << dname << " which dne" << dendl;
      }
    }
  }

  Capability *cap = in->get_client_cap(client);
  if (!cap)
    return;  // no cap from this client on this inode

  dout(10) << "process_cap_release client." << client << " " << ccap_string(caps) << " on " << *in
	   << (mdr ? "" : " (DEFERRED, no mdr)")
	   << dendl;

  // stale migrate seq: this release predates a cap migration we already processed
  if (ceph_seq_cmp(mseq, cap->get_mseq()) < 0) {
    dout(7) << " mseq " << mseq << " < " << cap->get_mseq() << ", dropping" << dendl;
    return;
  }

  // cap id mismatch: release refers to a cap instance we no longer hold
  if (cap->get_cap_id() != cap_id) {
    dout(7) << " cap_id " << cap_id << " != " << cap->get_cap_id() << ", dropping" << dendl;
    return;
  }

  // can't mutate cap state on a freezing/frozen inode; retry after unfreeze
  if (should_defer_client_cap_frozen(in)) {
    dout(7) << " frozen, deferring" << dendl;
    in->add_waiter(CInode::WAIT_UNFREEZE, new C_Locker_RetryRequestCapRelease(this, client, item));
    return;
  }

  // clamp the retained caps to what we actually issued
  if (caps & ~cap->issued()) {
    dout(10) << " confirming not issued caps " << ccap_string(caps & ~cap->issued()) << dendl;
    caps &= cap->issued();
  }
  cap->confirm_receipt(seq, caps);

  // if the client gave up all file write caps, it will not send the
  // outstanding FLUSHSNAPs; synthesize null snapflushes for it
  if (!in->client_need_snapflush.empty() &&
      (cap->issued() & CEPH_CAP_ANY_FILE_WR) == 0) {
    _do_null_snapflush(in, client);
  }

  adjust_cap_wanted(cap, wanted, issue_seq);

  // suppress cap messages while re-evaluating locks in request context
  // (NOTE(review): presumably so the request reply carries the new state;
  // confirm against Capability::inc_suppress semantics)
  if (mdr)
    cap->inc_suppress();
  eval(in, CEPH_CAP_LOCKS);
  if (mdr)
    cap->dec_suppress();

  // take note; we may need to reissue on this cap later
  if (mdr)
    mdr->cap_releases[in->vino()] = cap->get_last_seq();
}
3129
3130 class C_Locker_RetryKickIssueCaps : public LockerContext {
3131 CInode *in;
3132 client_t client;
3133 ceph_seq_t seq;
3134 public:
3135 C_Locker_RetryKickIssueCaps(Locker *l, CInode *i, client_t c, ceph_seq_t s) :
3136 LockerContext(l), in(i), client(c), seq(s) {
3137 in->get(CInode::PIN_PTRWAITER);
3138 }
3139 void finish(int r) override {
3140 locker->kick_issue_caps(in, client, seq);
3141 in->put(CInode::PIN_PTRWAITER);
3142 }
3143 };
3144
3145 void Locker::kick_issue_caps(CInode *in, client_t client, ceph_seq_t seq)
3146 {
3147 Capability *cap = in->get_client_cap(client);
3148 if (!cap || cap->get_last_seq() != seq)
3149 return;
3150 if (in->is_frozen()) {
3151 dout(10) << "kick_issue_caps waiting for unfreeze on " << *in << dendl;
3152 in->add_waiter(CInode::WAIT_UNFREEZE,
3153 new C_Locker_RetryKickIssueCaps(this, in, client, seq));
3154 return;
3155 }
3156 dout(10) << "kick_issue_caps released at current seq " << seq
3157 << ", reissuing" << dendl;
3158 issue_caps(in, cap);
3159 }
3160
3161 void Locker::kick_cap_releases(MDRequestRef& mdr)
3162 {
3163 client_t client = mdr->get_client();
3164 for (map<vinodeno_t,ceph_seq_t>::iterator p = mdr->cap_releases.begin();
3165 p != mdr->cap_releases.end();
3166 ++p) {
3167 CInode *in = mdcache->get_inode(p->first);
3168 if (!in)
3169 continue;
3170 kick_issue_caps(in, client, p->second);
3171 }
3172 }
3173
/**
 * Apply the metadata carried by a client FLUSHSNAP to the appropriate
 * (head or snapped) inode and journal the update.
 *
 * @param in      inode the flush applies to
 * @param snap    snapid being flushed; CEPH_NOSNAP if the snap is gone
 * @param dirty   dirty cap bits the client is flushing
 * @param follows snapid this flush follows
 * @param client  flushing client
 * @param m       client cap message
 * @param ack     prepared FLUSHSNAP_ACK (or null); sent directly on the
 *                no-op path, otherwise handed to the journal completion
 *
 * m and ack might be NULL, so don't dereference them unless dirty != 0
 */
void Locker::_do_snap_update(CInode *in, snapid_t snap, int dirty, snapid_t follows, client_t client, const MClientCaps::const_ref &m, const MClientCaps::ref &ack)
{
  dout(10) << "_do_snap_update dirty " << ccap_string(dirty)
	   << " follows " << follows << " snap " << snap
	   << " on " << *in << dendl;

  if (snap == CEPH_NOSNAP) {
    // hmm, i guess snap was already deleted?  just ack!
    dout(10) << " wow, the snap following " << follows
	     << " was already deleted.  nothing to record, just ack." << dendl;
    if (ack)
      mds->send_message_client_counted(ack, m->get_connection());
    return;
  }

  EUpdate *le = new EUpdate(mds->mdlog, "snap flush");
  mds->mdlog->start_entry(le);
  MutationRef mut = new MutationImpl();
  mut->ls = mds->mdlog->get_current_segment();

  // normal metadata updates that we can apply to the head as well.

  // update xattrs?
  CInode::mempool_xattr_map *px = nullptr;
  bool xattrs = (dirty & CEPH_CAP_XATTR_EXCL) &&
		m->xattrbl.length() &&
		m->head.xattr_version > in->get_projected_inode()->xattr_version;

  // multiversion inodes keep old per-snap inode copies; pick the one that
  // covers this snap, if any
  CInode::mempool_old_inode *oi = 0;
  if (in->is_multiversion()) {
    oi = in->pick_old_inode(snap);
  }

  // i points at the inode struct we actually write into (old-snap copy or
  // the freshly projected head inode)
  CInode::mempool_inode *i;
  if (oi) {
    dout(10) << " writing into old inode" << dendl;
    auto &pi = in->project_inode();
    pi.inode.version = in->pre_dirty();
    if (snap > oi->first)
      in->split_old_inode(snap);
    i = &oi->inode;
    if (xattrs)
      px = &oi->xattrs;
  } else {
    auto &pi = in->project_inode(xattrs);
    pi.inode.version = in->pre_dirty();
    i = &pi.inode;
    if (xattrs)
      px = pi.xattrs.get();
  }

  _update_cap_fields(in, dirty, m, i);

  // xattr
  if (xattrs) {
    dout(7) << " xattrs v" << i->xattr_version << " -> " << m->head.xattr_version
	    << " len " << m->xattrbl.length() << dendl;
    i->xattr_version = m->head.xattr_version;
    auto p = m->xattrbl.cbegin();
    decode(*px, p);
  }

  {
    // drop (or trim) the client's max_size range now that this snap is flushed
    auto it = i->client_ranges.find(client);
    if (it != i->client_ranges.end()) {
      if (in->last == snap) {
	dout(10) << " removing client_range entirely" << dendl;
	i->client_ranges.erase(it);
      } else {
	dout(10) << " client_range now follows " << snap << dendl;
	it->second.follows = snap;
      }
    }
  }

  mut->auth_pin(in);
  mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows);
  mdcache->journal_dirty_inode(mut.get(), &le->metablob, in, follows);

  // "oldest flush tid" > 0 means client uses unique TID for each flush
  if (ack && ack->get_oldest_flush_tid() > 0)
    le->metablob.add_client_flush(metareqid_t(m->get_source(), ack->get_client_tid()),
				  ack->get_oldest_flush_tid());

  mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut, UPDATE_SNAPFLUSH,
							      ack, client));
}
3264
/**
 * Copy client-flushed (dirty) metadata from a cap message onto an inode.
 *
 * Fields are updated only when the corresponding cap bit is dirty and the
 * client's value should win (e.g. WR lets timestamps advance, EXCL lets
 * the client's value replace ours outright).
 *
 * @param in    inode being updated (used for logging)
 * @param dirty dirty cap bits from the client; no-op when 0
 * @param m     cap message carrying the new values; must be non-null if dirty
 * @param pi    inode struct to write into (projected head or old-snap copy)
 */
void Locker::_update_cap_fields(CInode *in, int dirty, const MClientCaps::const_ref &m, CInode::mempool_inode *pi)
{
  if (dirty == 0)
    return;

  /* m must be valid if there are dirty caps */
  ceph_assert(m);
  uint64_t features = m->get_connection()->get_features();

  // ctime (and rctime) only move forward
  if (m->get_ctime() > pi->ctime) {
    dout(7) << " ctime " << pi->ctime << " -> " << m->get_ctime()
	    << " for " << *in << dendl;
    pi->ctime = m->get_ctime();
    if (m->get_ctime() > pi->rstat.rctime)
      pi->rstat.rctime = m->get_ctime();
  }

  // change_attr is monotonic; only honored for clients supporting the feature
  if ((features & CEPH_FEATURE_FS_CHANGE_ATTR) &&
      m->get_change_attr() > pi->change_attr) {
    dout(7) << " change_attr " << pi->change_attr << " -> " << m->get_change_attr()
	    << " for " << *in << dendl;
    pi->change_attr = m->get_change_attr();
  }

  // file
  if (dirty & (CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)) {
    utime_t atime = m->get_atime();
    utime_t mtime = m->get_mtime();
    uint64_t size = m->get_size();
    version_t inline_version = m->inline_version;

    // WR: mtime may only advance; EXCL: client's mtime wins even if older
    if (((dirty & CEPH_CAP_FILE_WR) && mtime > pi->mtime) ||
	((dirty & CEPH_CAP_FILE_EXCL) && mtime != pi->mtime)) {
      dout(7) << " mtime " << pi->mtime << " -> " << mtime
	      << " for " << *in << dendl;
      pi->mtime = mtime;
      if (mtime > pi->rstat.rctime)
	pi->rstat.rctime = mtime;
    }
    if (in->inode.is_file() &&   // ONLY if regular file
	size > pi->size) {
      dout(7) << " size " << pi->size << " -> " << size
	      << " for " << *in << dendl;
      pi->size = size;
      pi->rstat.rbytes = size;
    }
    // inline data: a newer version replaces (or frees) the stored data
    if (in->inode.is_file() &&
        (dirty & CEPH_CAP_FILE_WR) &&
        inline_version > pi->inline_data.version) {
      pi->inline_data.version = inline_version;
      if (inline_version != CEPH_INLINE_NONE && m->inline_data.length() > 0)
	pi->inline_data.get_data() = m->inline_data;
      else
	pi->inline_data.free_data();
    }
    // atime/time_warp_seq only accepted with EXCL
    if ((dirty & CEPH_CAP_FILE_EXCL) && atime != pi->atime) {
      dout(7) << " atime " << pi->atime << " -> " << atime
	      << " for " << *in << dendl;
      pi->atime = atime;
    }
    if ((dirty & CEPH_CAP_FILE_EXCL) &&
	ceph_seq_cmp(pi->time_warp_seq, m->get_time_warp_seq()) < 0) {
      dout(7) << " time_warp_seq " << pi->time_warp_seq << " -> " << m->get_time_warp_seq()
	      << " for " << *in << dendl;
      pi->time_warp_seq = m->get_time_warp_seq();
    }
  }
  // auth
  if (dirty & CEPH_CAP_AUTH_EXCL) {
    if (m->head.uid != pi->uid) {
      dout(7) << "  uid " << pi->uid
	      << " -> " << m->head.uid
	      << " for " << *in << dendl;
      pi->uid = m->head.uid;
    }
    if (m->head.gid != pi->gid) {
      dout(7) << "  gid " << pi->gid
	      << " -> " << m->head.gid
	      << " for " << *in << dendl;
      pi->gid = m->head.gid;
    }
    if (m->head.mode != pi->mode) {
      dout(7) << " mode " << oct << pi->mode
	      << " -> " << m->head.mode << dec
	      << " for " << *in << dendl;
      pi->mode = m->head.mode;
    }
    if ((features & CEPH_FEATURE_FS_BTIME) && m->get_btime() != pi->btime) {
      dout(7) << " btime " << oct << pi->btime
	      << " -> " << m->get_btime() << dec
	      << " for " << *in << dendl;
      pi->btime = m->get_btime();
    }
  }
}
3360
/*
 * update inode based on cap flush|flushsnap|wanted.
 * adjust max_size, if needed.
 * if we update, return true; otherwise, false (no updated needed).
 *
 * Also merges any advisory file-lock state carried in the message and
 * journals the resulting inode update.  'cap' may be null (e.g. when
 * updating intermediate snapped inodes); 'need_flush', if non-null, is set
 * when the caller should flush the mdlog.
 */
bool Locker::_do_cap_update(CInode *in, Capability *cap,
			    int dirty, snapid_t follows,
			    const MClientCaps::const_ref &m, const MClientCaps::ref &ack,
			    bool *need_flush)
{
  dout(10) << "_do_cap_update dirty " << ccap_string(dirty)
	   << " issued " << ccap_string(cap ? cap->issued() : 0)
	   << " wanted " << ccap_string(cap ? cap->wanted() : 0)
	   << " on " << *in << dendl;
  ceph_assert(in->is_auth());
  client_t client = m->get_source().num();
  CInode::mempool_inode *latest = in->get_projected_inode();

  // increase or zero max_size?
  uint64_t size = m->get_size();
  bool change_max = false;
  uint64_t old_max = latest->client_ranges.count(client) ? latest->client_ranges[client].range.last : 0;
  uint64_t new_max = old_max;

  if (in->is_file()) {
    bool forced_change_max = false;
    dout(20) << "inode is file" << dendl;
    if (cap && ((cap->issued() | cap->wanted()) & CEPH_CAP_ANY_FILE_WR)) {
      dout(20) << "client has write caps; m->get_max_size="
	       << m->get_max_size() << "; old_max=" << old_max << dendl;
      if (m->get_max_size() > new_max) {
	// client explicitly asked for a larger max_size
	dout(10) << "client requests file_max " << m->get_max_size()
		 << " > max " << old_max << dendl;
	change_max = true;
	forced_change_max = true;
	new_max = calc_new_max_size(latest, m->get_max_size());
      } else {
	// otherwise grow max_size ahead of the current file size if needed
	new_max = calc_new_max_size(latest, size);

	if (new_max > old_max)
	  change_max = true;
	else
	  new_max = old_max;
      }
    } else {
      // no write caps issued or wanted: retract any max_size grant
      if (old_max) {
	change_max = true;
	new_max = 0;
      }
    }

    // a max_size change needs the filelock wrlockable; try to steer the
    // lock there, otherwise queue a recheck and skip the change for now
    if (in->last == CEPH_NOSNAP &&
	change_max &&
	!in->filelock.can_wrlock(client) &&
	!in->filelock.can_force_wrlock(client)) {
      dout(10) << " i want to change file_max, but lock won't allow it (yet)" << dendl;
      if (in->filelock.is_stable()) {
	bool need_issue = false;
	if (cap)
	  cap->inc_suppress();
	if (in->get_mds_caps_wanted().empty() &&
	    (in->get_loner() >= 0 || (in->get_wanted_loner() >= 0 && in->try_set_loner()))) {
	  if (in->filelock.get_state() != LOCK_EXCL)
	    file_excl(&in->filelock, &need_issue);
	} else
	  simple_lock(&in->filelock, &need_issue);
	if (need_issue)
	  issue_caps(in);
	if (cap)
	  cap->dec_suppress();
      }
      if (!in->filelock.can_wrlock(client) &&
	  !in->filelock.can_force_wrlock(client)) {
	// still not wrlockable: re-run the max_size check once stable
	C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in,
							 forced_change_max ? new_max : 0,
							 0, utime_t());

	in->filelock.add_waiter(SimpleLock::WAIT_STABLE, cms);
	change_max = false;
      }
    }
  }

  // merge advisory file locks the client is flushing: first fcntl locks,
  // then flock locks (two length-prefixed lists in flockbl)
  if (m->flockbl.length()) {
    int32_t num_locks;
    auto bli = m->flockbl.cbegin();
    decode(num_locks, bli);
    for ( int i=0; i < num_locks; ++i) {
      ceph_filelock decoded_lock;
      decode(decoded_lock, bli);
      in->get_fcntl_lock_state()->held_locks.
	insert(pair<uint64_t, ceph_filelock>(decoded_lock.start, decoded_lock));
      ++in->get_fcntl_lock_state()->client_held_lock_counts[(client_t)(decoded_lock.client)];
    }
    decode(num_locks, bli);
    for ( int i=0; i < num_locks; ++i) {
      ceph_filelock decoded_lock;
      decode(decoded_lock, bli);
      in->get_flock_lock_state()->held_locks.
	insert(pair<uint64_t, ceph_filelock>(decoded_lock.start, decoded_lock));
      ++in->get_flock_lock_state()->client_held_lock_counts[(client_t)(decoded_lock.client)];
    }
  }

  if (!dirty && !change_max)
    return false;  // nothing to journal

  Session *session = mds->get_session(m);
  if (session->check_access(in, MAY_WRITE,
			    m->caller_uid, m->caller_gid, NULL, 0, 0) < 0) {
    dout(10) << "check_access failed, dropping cap update on " << *in << dendl;
    return false;
  }

  // do the update.
  EUpdate *le = new EUpdate(mds->mdlog, "cap update");
  mds->mdlog->start_entry(le);

  bool xattr = (dirty & CEPH_CAP_XATTR_EXCL) &&
	       m->xattrbl.length() &&
	       m->head.xattr_version > in->get_projected_inode()->xattr_version;

  auto &pi = in->project_inode(xattr);
  pi.inode.version = in->pre_dirty();

  MutationRef mut(new MutationImpl());
  mut->ls = mds->mdlog->get_current_segment();

  _update_cap_fields(in, dirty, m, &pi.inode);

  if (change_max) {
    dout(7) << "  max_size " << old_max << " -> " << new_max
	    << " for " << *in << dendl;
    if (new_max) {
      auto &cr = pi.inode.client_ranges[client];
      cr.range.first = 0;
      cr.range.last = new_max;
      cr.follows = in->first - 1;
      if (cap)
	cap->mark_clientwriteable();
    } else {
      pi.inode.client_ranges.erase(client);
      if (cap)
	cap->clear_clientwriteable();
    }
  }

  if (change_max || (dirty & (CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)))
    wrlock_force(&in->filelock, mut);  // wrlock for duration of journal

  // auth
  if (dirty & CEPH_CAP_AUTH_EXCL)
    wrlock_force(&in->authlock, mut);

  // xattrs update?
  if (xattr) {
    dout(7) << " xattrs v" << pi.inode.xattr_version << " -> " << m->head.xattr_version << dendl;
    pi.inode.xattr_version = m->head.xattr_version;
    auto p = m->xattrbl.cbegin();
    decode(*pi.xattrs, p);
    wrlock_force(&in->xattrlock, mut);
  }

  mut->auth_pin(in);
  mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows);
  mdcache->journal_dirty_inode(mut.get(), &le->metablob, in, follows);

  // "oldest flush tid" > 0 means client uses unique TID for each flush
  if (ack && ack->get_oldest_flush_tid() > 0)
    le->metablob.add_client_flush(metareqid_t(m->get_source(), ack->get_client_tid()),
				  ack->get_oldest_flush_tid());

  unsigned update_flags = 0;
  if (change_max)
    update_flags |= UPDATE_SHAREMAX;
  if (cap)
    update_flags |= UPDATE_NEEDSISSUE;
  mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut, update_flags,
							      ack, client));
  if (need_flush && !*need_flush &&
      ((change_max && new_max) || // max INCREASE
       _need_flush_mdlog(in, dirty)))
    *need_flush = true;

  return true;
}
3547
/**
 * Handle a standalone MClientCapRelease message: a batch of cap releases
 * sent by a client outside any request context.
 */
void Locker::handle_client_cap_release(const MClientCapRelease::const_ref &m)
{
  client_t client = m->get_source().num();
  dout(10) << "handle_client_cap_release " << *m << dendl;

  // only process in clientreplay/active/stopping; otherwise retry after replay
  if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) {
    mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
    return;
  }

  if (m->osd_epoch_barrier && !mds->objecter->have_map(m->osd_epoch_barrier)) {
    // Pause RADOS operations until we see the required epoch
    mds->objecter->set_epoch_barrier(m->osd_epoch_barrier);
  }

  if (mds->get_osd_epoch_barrier() < m->osd_epoch_barrier) {
    // Record the barrier so that we will retransmit it to clients
    mds->set_osd_epoch_barrier(m->osd_epoch_barrier);
  }

  Session *session = mds->get_session(m);

  // process each individual release
  for (const auto &cap : m->caps) {
    _do_cap_release(client, inodeno_t((uint64_t)cap.ino) , cap.cap_id, cap.migrate_seq, cap.seq);
  }

  // account the released caps against the session
  if (session) {
    session->notify_cap_release(m->caps.size());
  }
}
3578
3579 class C_Locker_RetryCapRelease : public LockerContext {
3580 client_t client;
3581 inodeno_t ino;
3582 uint64_t cap_id;
3583 ceph_seq_t migrate_seq;
3584 ceph_seq_t issue_seq;
3585 public:
3586 C_Locker_RetryCapRelease(Locker *l, client_t c, inodeno_t i, uint64_t id,
3587 ceph_seq_t mseq, ceph_seq_t seq) :
3588 LockerContext(l), client(c), ino(i), cap_id(id), migrate_seq(mseq), issue_seq(seq) {}
3589 void finish(int r) override {
3590 locker->_do_cap_release(client, ino, cap_id, migrate_seq, issue_seq);
3591 }
3592 };
3593
/**
 * Process one cap release for (client, ino).  Validates the release
 * against the cap we hold (cap id, migrate seq, issue seq) and removes the
 * cap, or defers/ignores as appropriate.
 */
void Locker::_do_cap_release(client_t client, inodeno_t ino, uint64_t cap_id,
			     ceph_seq_t mseq, ceph_seq_t seq)
{
  CInode *in = mdcache->get_inode(ino);
  if (!in) {
    dout(7) << "_do_cap_release missing ino " << ino << dendl;
    return;
  }
  Capability *cap = in->get_client_cap(client);
  if (!cap) {
    dout(7) << "_do_cap_release no cap for client" << client << " on "<< *in << dendl;
    return;
  }

  dout(7) << "_do_cap_release for client." << client << " on "<< *in << dendl;
  // stale cap id: release refers to a cap instance we no longer track
  if (cap->get_cap_id() != cap_id) {
    dout(7) << " capid " << cap_id << " != " << cap->get_cap_id() << ", ignore" << dendl;
    return;
  }
  // stale migrate seq: release predates a cap migration we already handled
  if (ceph_seq_cmp(mseq, cap->get_mseq()) < 0) {
    dout(7) << " mseq " << mseq << " < " << cap->get_mseq() << ", ignore" << dendl;
    return;
  }
  // can't remove the cap while the inode is freezing/frozen; retry later
  if (should_defer_client_cap_frozen(in)) {
    dout(7) << " freezing|frozen, deferring" << dendl;
    in->add_waiter(CInode::WAIT_UNFREEZE,
		  new C_Locker_RetryCapRelease(this, client, ino, cap_id, mseq, seq));
    return;
  }
  // issue seq mismatch: the cap was re-issued after this release was sent,
  // so keep the cap; just clear stale revoke state and re-evaluate
  if (seq != cap->get_last_issue()) {
    dout(7) << " issue_seq " << seq << " != " << cap->get_last_issue() << dendl;
    // clean out any old revoke history
    cap->clean_revoke_from(seq);
    eval_cap_gather(in);
    return;
  }
  remove_client_cap(in, cap);
}
3632
/**
 * Drop a client's capability on an inode and clean up dependent state
 * (pending snapflushes, client byte ranges), then re-evaluate cap locks.
 *
 * @param kill  if true, mark the inode NEEDSRECOVER instead of running a
 *              max_size check when a client byte range was outstanding
 */
void Locker::remove_client_cap(CInode *in, Capability *cap, bool kill)
{
  client_t client = cap->get_client();
  // clean out any pending snapflush state
  if (!in->client_need_snapflush.empty())
    _do_null_snapflush(in, client);

  bool notable = cap->is_notable();
  in->remove_client_cap(client);
  if (!notable)
    return;  // nothing further to clean up

  if (in->is_auth()) {
    // make sure we clear out the client byte range
    if (in->get_projected_inode()->client_ranges.count(client) &&
	!(in->inode.nlink == 0 && !in->is_any_caps())) { // unless it's unlink + stray
      if (kill)
	in->state_set(CInode::STATE_NEEDSRECOVER);
      else
	check_inode_max_size(in);
    }
  } else {
    // non-auth: let the auth mds know our file caps situation changed
    request_inode_file_caps(in);
  }

  try_eval(in, CEPH_CAP_LOCKS);
}
3660
3661
3662 /**
3663 * Return true if any currently revoking caps exceed the
3664 * session_timeout threshold.
3665 */
3666 bool Locker::any_late_revoking_caps(xlist<Capability*> const &revoking,
3667 double timeout) const
3668 {
3669 xlist<Capability*>::const_iterator p = revoking.begin();
3670 if (p.end()) {
3671 // No revoking caps at the moment
3672 return false;
3673 } else {
3674 utime_t now = ceph_clock_now();
3675 utime_t age = now - (*p)->get_last_revoke_stamp();
3676 if (age <= timeout) {
3677 return false;
3678 } else {
3679 return true;
3680 }
3681 }
3682 }
3683
3684 void Locker::get_late_revoking_clients(std::list<client_t> *result,
3685 double timeout) const
3686 {
3687 if (!any_late_revoking_caps(revoking_caps, timeout)) {
3688 // Fast path: no misbehaving clients, execute in O(1)
3689 return;
3690 }
3691
3692 // Slow path: execute in O(N_clients)
3693 for (auto &p : revoking_caps_by_client) {
3694 if (any_late_revoking_caps(p.second, timeout)) {
3695 result->push_back(p.first);
3696 }
3697 }
3698 }
3699
3700 // Hard-code instead of surfacing a config settings because this is
3701 // really a hack that should go away at some point when we have better
3702 // inspection tools for getting at detailed cap state (#7316)
3703 #define MAX_WARN_CAPS 100
3704
3705 void Locker::caps_tick()
3706 {
3707 utime_t now = ceph_clock_now();
3708
3709 if (!need_snapflush_inodes.empty()) {
3710 // snap inodes that needs flush are auth pinned, they affect
3711 // subtree/difrarg freeze.
3712 utime_t cutoff = now;
3713 cutoff -= g_conf()->mds_freeze_tree_timeout / 3;
3714
3715 CInode *last = need_snapflush_inodes.back();
3716 while (!need_snapflush_inodes.empty()) {
3717 CInode *in = need_snapflush_inodes.front();
3718 if (in->last_dirstat_prop >= cutoff)
3719 break;
3720 in->item_caps.remove_myself();
3721 snapflush_nudge(in);
3722 if (in == last)
3723 break;
3724 }
3725 }
3726
3727 dout(20) << __func__ << " " << revoking_caps.size() << " revoking caps" << dendl;
3728
3729 now = ceph_clock_now();
3730 int n = 0;
3731 for (xlist<Capability*>::iterator p = revoking_caps.begin(); !p.end(); ++p) {
3732 Capability *cap = *p;
3733
3734 utime_t age = now - cap->get_last_revoke_stamp();
3735 dout(20) << __func__ << " age = " << age << " client." << cap->get_client() << "." << cap->get_inode()->ino() << dendl;
3736 if (age <= mds->mdsmap->get_session_timeout()) {
3737 dout(20) << __func__ << " age below timeout " << mds->mdsmap->get_session_timeout() << dendl;
3738 break;
3739 } else {
3740 ++n;
3741 if (n > MAX_WARN_CAPS) {
3742 dout(1) << __func__ << " more than " << MAX_WARN_CAPS << " caps are late"
3743 << "revoking, ignoring subsequent caps" << dendl;
3744 break;
3745 }
3746 }
3747 // exponential backoff of warning intervals
3748 if (age > mds->mdsmap->get_session_timeout() * (1 << cap->get_num_revoke_warnings())) {
3749 cap->inc_num_revoke_warnings();
3750 stringstream ss;
3751 ss << "client." << cap->get_client() << " isn't responding to mclientcaps(revoke), ino "
3752 << cap->get_inode()->ino() << " pending " << ccap_string(cap->pending())
3753 << " issued " << ccap_string(cap->issued()) << ", sent " << age << " seconds ago";
3754 mds->clog->warn() << ss.str();
3755 dout(20) << __func__ << " " << ss.str() << dendl;
3756 } else {
3757 dout(20) << __func__ << " silencing log message (backoff) for " << "client." << cap->get_client() << "." << cap->get_inode()->ino() << dendl;
3758 }
3759 }
3760 }
3761
3762
/**
 * Handle a client dentry-lease message: RELEASE, REVOKE_ACK, or RENEW.
 * Looks up the (inode, dentry) named by the message and the client's
 * existing lease on it, then acts per the message's action.
 */
void Locker::handle_client_lease(const MClientLease::const_ref &m)
{
  dout(10) << "handle_client_lease " << *m << dendl;

  ceph_assert(m->get_source().is_client());
  client_t client = m->get_source().num();

  // resolve inode, then dentry by name within the right dirfrag
  CInode *in = mdcache->get_inode(m->get_ino(), m->get_last());
  if (!in) {
    dout(7) << "handle_client_lease don't have ino " << m->get_ino() << "." << m->get_last() << dendl;
    return;
  }
  CDentry *dn = 0;

  frag_t fg = in->pick_dirfrag(m->dname);
  CDir *dir = in->get_dirfrag(fg);
  if (dir)
    dn = dir->lookup(m->dname);
  if (!dn) {
    dout(7) << "handle_client_lease don't have dn " << m->get_ino() << " " << m->dname << dendl;
    return;
  }
  dout(10) << " on " << *dn << dendl;

  // replica and lock
  ClientLease *l = dn->get_client_lease(client);
  if (!l) {
    dout(7) << "handle_client_lease didn't have lease for client." << client << " of " << *dn << dendl;
    return;
  }

  switch (m->get_action()) {
  case CEPH_MDS_LEASE_REVOKE_ACK:
  case CEPH_MDS_LEASE_RELEASE:
    // only honor the release if it matches the lease's current seq
    if (l->seq != m->get_seq()) {
      dout(7) << "handle_client_lease release - seq " << l->seq << " != provided " << m->get_seq() << dendl;
    } else {
      dout(7) << "handle_client_lease client." << client
	      << " on " << *dn << dendl;
      dn->remove_client_lease(l, this);
    }
    break;

  case CEPH_MDS_LEASE_RENEW:
    {
      dout(7) << "handle_client_lease client." << client << " renew on " << *dn
	      << (!dn->lock.can_lease(client)?", revoking lease":"") << dendl;
      // only renew if the dentry lock still permits leasing to this client;
      // otherwise silence (the dout above notes the implied revoke)
      if (dn->lock.can_lease(client)) {
	auto reply = MClientLease::create(*m);
	int pool = 1;   // fixme.. do something smart!
	reply->h.duration_ms = (int)(1000 * mdcache->client_lease_durations[pool]);
	reply->h.seq = ++l->seq;
	reply->clear_payload();

	// extend the lease's local expiry before replying
	utime_t now = ceph_clock_now();
	now += mdcache->client_lease_durations[pool];
	mdcache->touch_client_lease(l, pool, now);

	mds->send_message_client_counted(reply, m->get_connection());
      }
    }
    break;

  default:
    ceph_abort(); // implement me
    break;
  }
}
3831
3832
/**
 * Append a dentry lease (or an explicit null lease) for dn to bl, which is
 * being assembled into a client reply.  Issues a real lease only when the
 * client is not already covered by the parent directory's filelock lease
 * or pending FILE_SHARED/EXCL caps, and the dentry lock allows leasing.
 */
void Locker::issue_client_lease(CDentry *dn, client_t client,
			       bufferlist &bl, utime_t now, Session *session)
{
  CInode *diri = dn->get_dir()->get_inode();
  // NOTE(review): the doubled parentheses group the two diri checks
  // together with &&; confirm this matches the intended precedence.
  if (!diri->is_stray() &&  // do not issue dn leases in stray dir!
      ((!diri->filelock.can_lease(client) &&
	(diri->get_client_cap_pending(client) & (CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_EXCL)) == 0)) &&
      dn->lock.can_lease(client)) {
    int pool = 1;   // fixme.. do something smart!
    // issue a dentry lease
    ClientLease *l = dn->add_client_lease(client, session);
    session->touch_lease(l);

    // record the local expiry for this lease
    now += mdcache->client_lease_durations[pool];
    mdcache->touch_client_lease(l, pool, now);

    LeaseStat lstat;
    lstat.mask = 1 | CEPH_LOCK_DN;  // old and new bit values
    lstat.duration_ms = (uint32_t)(1000 * mdcache->client_lease_durations[pool]);
    lstat.seq = ++l->seq;
    encode_lease(bl, session->info, lstat);
    dout(20) << "issue_client_lease seq " << lstat.seq << " dur " << lstat.duration_ms << "ms "
	     << " on " << *dn << dendl;
  } else {
    // null lease
    LeaseStat lstat;
    encode_lease(bl, session->info, lstat);
    dout(20) << "issue_client_lease no/null lease on " << *dn << dendl;
  }
}
3863
3864
3865 void Locker::revoke_client_leases(SimpleLock *lock)
3866 {
3867 int n = 0;
3868 CDentry *dn = static_cast<CDentry*>(lock->get_parent());
3869 for (map<client_t, ClientLease*>::iterator p = dn->client_lease_map.begin();
3870 p != dn->client_lease_map.end();
3871 ++p) {
3872 ClientLease *l = p->second;
3873
3874 n++;
3875 ceph_assert(lock->get_type() == CEPH_LOCK_DN);
3876
3877 CDentry *dn = static_cast<CDentry*>(lock->get_parent());
3878 int mask = 1 | CEPH_LOCK_DN; // old and new bits
3879
3880 // i should also revoke the dir ICONTENT lease, if they have it!
3881 CInode *diri = dn->get_dir()->get_inode();
3882 auto lease = MClientLease::create(CEPH_MDS_LEASE_REVOKE, l->seq, mask, diri->ino(), diri->first, CEPH_NOSNAP, dn->get_name());
3883 mds->send_message_client_counted(lease, l->client);
3884 }
3885 }
3886
/**
 * Encode a LeaseStat into @p bl.
 *
 * Clients advertising CEPHFS_FEATURE_REPLY_ENCODING get the versioned
 * format (wrapped in ENCODE_START/ENCODE_FINISH, v1); older clients get
 * the bare legacy layout.  The field order (mask, duration_ms, seq) is
 * identical in both branches, so the payload bytes only differ by the
 * version envelope.
 */
void Locker::encode_lease(bufferlist& bl, const session_info_t& info,
			  const LeaseStat& ls)
{
  if (info.has_feature(CEPHFS_FEATURE_REPLY_ENCODING)) {
    ENCODE_START(1, 1, bl);
    encode(ls.mask, bl);
    encode(ls.duration_ms, bl);
    encode(ls.seq, bl);
    ENCODE_FINISH(bl);
  }
  else {
    // legacy, unversioned encoding
    encode(ls.mask, bl);
    encode(ls.duration_ms, bl);
    encode(ls.seq, bl);
  }
}
3903
3904 // locks ----------------------------------------------------------------
3905
/**
 * Resolve an (lock_type, object info) pair from a peer MDS into a local
 * SimpleLock pointer.
 *
 * @return the lock, or nullptr if the referenced dentry/inode is not in
 *         our cache (e.g. it was trimmed) — callers treat that as
 *         "drop the message".
 */
SimpleLock *Locker::get_lock(int lock_type, const MDSCacheObjectInfo &info)
{
  switch (lock_type) {
  case CEPH_LOCK_DN:
    {
      // be careful; info.dirfrag may have incorrect frag; recalculate based on dname.
      CInode *diri = mdcache->get_inode(info.dirfrag.ino);
      frag_t fg;
      CDir *dir = 0;
      CDentry *dn = 0;
      if (diri) {
	fg = diri->pick_dirfrag(info.dname);
	dir = diri->get_dirfrag(fg);
	if (dir)
	  dn = dir->lookup(info.dname, info.snapid);
      }
      if (!dn) {
	dout(7) << "get_lock don't have dn " << info.dirfrag.ino << " " << info.dname << dendl;
	return 0;
      }
      return &dn->lock;
    }

  case CEPH_LOCK_IAUTH:
  case CEPH_LOCK_ILINK:
  case CEPH_LOCK_IDFT:
  case CEPH_LOCK_IFILE:
  case CEPH_LOCK_INEST:
  case CEPH_LOCK_IXATTR:
  case CEPH_LOCK_ISNAP:
  case CEPH_LOCK_IFLOCK:
  case CEPH_LOCK_IPOLICY:
    {
      // inode locks: find the inode, then map type -> member lock
      CInode *in = mdcache->get_inode(info.ino, info.snapid);
      if (!in) {
	dout(7) << "get_lock don't have ino " << info.ino << dendl;
	return 0;
      }
      switch (lock_type) {
      case CEPH_LOCK_IAUTH: return &in->authlock;
      case CEPH_LOCK_ILINK: return &in->linklock;
      case CEPH_LOCK_IDFT: return &in->dirfragtreelock;
      case CEPH_LOCK_IFILE: return &in->filelock;
      case CEPH_LOCK_INEST: return &in->nestlock;
      case CEPH_LOCK_IXATTR: return &in->xattrlock;
      case CEPH_LOCK_ISNAP: return &in->snaplock;
      case CEPH_LOCK_IFLOCK: return &in->flocklock;
      case CEPH_LOCK_IPOLICY: return &in->policylock;
      }
      // every known inode lock type returned above; an unknown type
      // falls through to the abort below
    }

  default:
    dout(7) << "get_lock don't know lock_type " << lock_type << dendl;
    ceph_abort();
    break;
  }

  return 0;
}
3965
/**
 * Entry point for MLock messages from peer MDSs: look up the referenced
 * lock and dispatch to the appropriate per-lock-class handler.  Messages
 * for objects we no longer cache are silently dropped.
 */
void Locker::handle_lock(const MLock::const_ref &m)
{
  // nobody should be talking to us during recovery.
  ceph_assert(mds->is_rejoin() || mds->is_clientreplay() || mds->is_active() || mds->is_stopping());

  SimpleLock *lock = get_lock(m->get_lock_type(), m->get_object_info());
  if (!lock) {
    dout(10) << "don't have object " << m->get_object_info() << ", must have trimmed, dropping" << dendl;
    return;
  }

  switch (lock->get_type()) {
  case CEPH_LOCK_DN:
  case CEPH_LOCK_IAUTH:
  case CEPH_LOCK_ILINK:
  case CEPH_LOCK_ISNAP:
  case CEPH_LOCK_IXATTR:
  case CEPH_LOCK_IFLOCK:
  case CEPH_LOCK_IPOLICY:
    handle_simple_lock(lock, m);
    break;

  case CEPH_LOCK_IDFT:
  case CEPH_LOCK_INEST:
    //handle_scatter_lock((ScatterLock*)lock, m);
    //break;
    // NOTE: deliberate fallthrough — scatter locks (IDFT/INEST) are
    // handled by the same code path as the file lock.

  case CEPH_LOCK_IFILE:
    handle_file_lock(static_cast<ScatterLock*>(lock), m);
    break;

  default:
    dout(7) << "handle_lock got otype " << m->get_lock_type() << dendl;
    ceph_abort();
    break;
  }
}
4003
4004
4005
4006
4007
4008 // ==========================================================================
4009 // simple lock
4010
4011 /** This function may take a reference to m if it needs one, but does
4012 * not put references. */
4013 void Locker::handle_reqrdlock(SimpleLock *lock, const MLock::const_ref &m)
4014 {
4015 MDSCacheObject *parent = lock->get_parent();
4016 if (parent->is_auth() &&
4017 lock->get_state() != LOCK_SYNC &&
4018 !parent->is_frozen()) {
4019 dout(7) << "handle_reqrdlock got rdlock request on " << *lock
4020 << " on " << *parent << dendl;
4021 ceph_assert(parent->is_auth()); // replica auth pinned if they're doing this!
4022 if (lock->is_stable()) {
4023 simple_sync(lock);
4024 } else {
4025 dout(7) << "handle_reqrdlock delaying request until lock is stable" << dendl;
4026 lock->add_waiter(SimpleLock::WAIT_STABLE | MDSCacheObject::WAIT_UNFREEZE,
4027 new C_MDS_RetryMessage(mds, m));
4028 }
4029 } else {
4030 dout(7) << "handle_reqrdlock dropping rdlock request on " << *lock
4031 << " on " << *parent << dendl;
4032 // replica should retry
4033 }
4034 }
4035
/**
 * Handle a simple-lock MLock message.  The replica-side actions (SYNC,
 * LOCK) apply state pushed by the auth; the auth-side actions (LOCKACK,
 * REQRDLOCK) account gathered acks and rdlock requests from replicas.
 */
void Locker::handle_simple_lock(SimpleLock *lock, const MLock::const_ref &m)
{
  int from = m->get_asker();

  dout(10) << "handle_simple_lock " << *m
	   << " on " << *lock << " " << *lock->get_parent() << dendl;

  if (mds->is_rejoin()) {
    if (lock->get_parent()->is_rejoining()) {
      // object's lock state will be resolved by rejoin; drop for now
      dout(7) << "handle_simple_lock still rejoining " << *lock->get_parent()
	      << ", dropping " << *m << dendl;
      return;
    }
  }

  switch (m->get_action()) {
    // -- replica --
  case LOCK_AC_SYNC:
    // auth granted read access: adopt pushed state and wake readers
    ceph_assert(lock->get_state() == LOCK_LOCK);
    lock->decode_locked_state(m->get_data());
    lock->set_state(LOCK_SYNC);
    lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
    break;

  case LOCK_AC_LOCK:
    // auth is revoking read access: revoke leases, then gather local
    // readers before acking (eval_gather sends the ack when done)
    ceph_assert(lock->get_state() == LOCK_SYNC);
    lock->set_state(LOCK_SYNC_LOCK);
    if (lock->is_leased())
      revoke_client_leases(lock);
    eval_gather(lock, true);
    if (lock->is_unstable_and_locked())
      mds->mdlog->flush();
    break;


    // -- auth --
  case LOCK_AC_LOCKACK:
    // one replica finished its gather; when the gather set empties,
    // re-evaluate to complete the transition
    ceph_assert(lock->get_state() == LOCK_SYNC_LOCK ||
	   lock->get_state() == LOCK_SYNC_EXCL);
    ceph_assert(lock->is_gathering(from));
    lock->remove_gather(from);

    if (lock->is_gathering()) {
      dout(7) << "handle_simple_lock " << *lock << " on " << *lock->get_parent() << " from " << from
	      << ", still gathering " << lock->get_gather_set() << dendl;
    } else {
      dout(7) << "handle_simple_lock " << *lock << " on " << *lock->get_parent() << " from " << from
	      << ", last one" << dendl;
      eval_gather(lock);
    }
    break;

  case LOCK_AC_REQRDLOCK:
    handle_reqrdlock(lock, m);
    break;

  }
}
4094
4095 /* unused, currently.
4096
4097 class C_Locker_SimpleEval : public Context {
4098 Locker *locker;
4099 SimpleLock *lock;
4100 public:
4101 C_Locker_SimpleEval(Locker *l, SimpleLock *lk) : locker(l), lock(lk) {}
4102 void finish(int r) {
4103 locker->try_simple_eval(lock);
4104 }
4105 };
4106
4107 void Locker::try_simple_eval(SimpleLock *lock)
4108 {
4109 // unstable and ambiguous auth?
4110 if (!lock->is_stable() &&
4111 lock->get_parent()->is_ambiguous_auth()) {
4112 dout(7) << "simple_eval not stable and ambiguous auth, waiting on " << *lock->get_parent() << dendl;
4113 //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
4114 lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_SimpleEval(this, lock));
4115 return;
4116 }
4117
4118 if (!lock->get_parent()->is_auth()) {
4119 dout(7) << "try_simple_eval not auth for " << *lock->get_parent() << dendl;
4120 return;
4121 }
4122
4123 if (!lock->get_parent()->can_auth_pin()) {
4124 dout(7) << "try_simple_eval can't auth_pin, waiting on " << *lock->get_parent() << dendl;
4125 //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
4126 lock->get_parent()->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_SimpleEval(this, lock));
4127 return;
4128 }
4129
4130 if (lock->is_stable())
4131 simple_eval(lock);
4132 }
4133 */
4134
4135
/**
 * Re-evaluate a stable simple lock on the auth node and move it toward
 * the state best matching current demand: EXCL when a loner client
 * wants exclusive caps, SYNC when nobody needs write access.
 *
 * @param need_issue if non-null, set to true instead of calling
 *                   issue_caps() directly (caller batches cap issue)
 */
void Locker::simple_eval(SimpleLock *lock, bool *need_issue)
{
  dout(10) << "simple_eval " << *lock << " on " << *lock->get_parent() << dendl;

  ceph_assert(lock->get_parent()->is_auth());
  ceph_assert(lock->is_stable());

  if (lock->get_parent()->is_freezing_or_frozen()) {
    // dentry/snap lock in unreadable state can block path traverse
    if ((lock->get_type() != CEPH_LOCK_DN &&
	 lock->get_type() != CEPH_LOCK_ISNAP) ||
	lock->get_state() == LOCK_SYNC ||
	lock->get_parent()->is_frozen())
      return;
  }

  if (mdcache->is_readonly()) {
    // read-only FS: everything should be readable, nothing writable
    if (lock->get_state() != LOCK_SYNC) {
      dout(10) << "simple_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
      simple_sync(lock, need_issue);
    }
    return;
  }

  // for inode locks with a cap shift, look at what clients want
  CInode *in = 0;
  int wanted = 0;
  if (lock->get_cap_shift()) {
    in = static_cast<CInode*>(lock->get_parent());
    in->get_caps_wanted(&wanted, NULL, lock->get_cap_shift());
  }

  // -> excl?
  if (lock->get_state() != LOCK_EXCL &&
      in && in->get_target_loner() >= 0 &&
      (wanted & CEPH_CAP_GEXCL)) {
    dout(7) << "simple_eval stable, going to excl " << *lock
	    << " on " << *lock->get_parent() << dendl;
    simple_excl(lock, need_issue);
  }

  // stable -> sync?
  else if (lock->get_state() != LOCK_SYNC &&
	   !lock->is_wrlocked() &&
	   ((!(wanted & CEPH_CAP_GEXCL) && !lock->is_waiter_for(SimpleLock::WAIT_WR)) ||
	    (lock->get_state() == LOCK_EXCL && in && in->get_target_loner() < 0))) {
    dout(7) << "simple_eval stable, syncing " << *lock
	    << " on " << *lock->get_parent() << dendl;
    simple_sync(lock, need_issue);
  }
}
4186
4187
4188 // mid
4189
/**
 * Transition a stable lock (auth side) to SYNC, gathering outstanding
 * wrlocks, replica state, client caps, and pending file recovery first
 * if necessary.
 *
 * @param need_issue if non-null, set instead of calling issue_caps()
 * @return true if the lock reached SYNC now; false if a gather (or a
 *         dirty-data writebehind) was started and the transition will
 *         complete asynchronously
 */
bool Locker::simple_sync(SimpleLock *lock, bool *need_issue)
{
  dout(7) << "simple_sync on " << *lock << " on " << *lock->get_parent() << dendl;
  ceph_assert(lock->get_parent()->is_auth());
  ceph_assert(lock->is_stable());

  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  int old_state = lock->get_state();

  // from TSYN we can go straight to SYNC; every other state needs the
  // intermediate *_SYNC gather state
  if (old_state != LOCK_TSYN) {

    switch (lock->get_state()) {
    case LOCK_MIX: lock->set_state(LOCK_MIX_SYNC); break;
    case LOCK_LOCK: lock->set_state(LOCK_LOCK_SYNC); break;
    case LOCK_XSYN: lock->set_state(LOCK_XSYN_SYNC); break;
    case LOCK_EXCL: lock->set_state(LOCK_EXCL_SYNC); break;
    default: ceph_abort();
    }

    // count outstanding conditions we must wait on
    int gather = 0;
    if (lock->is_wrlocked())
      gather++;

    // from MIX, replicas hold scattered state we must collect
    if (lock->get_parent()->is_replicated() && old_state == LOCK_MIX) {
      send_lock_message(lock, LOCK_AC_SYNC);
      lock->init_gather();
      gather++;
    }

    if (in && in->is_head()) {
      if (in->issued_caps_need_gather(lock)) {
	if (need_issue)
	  *need_issue = true;
	else
	  issue_caps(in);
	gather++;
      }
    }

    bool need_recover = false;
    if (lock->get_type() == CEPH_LOCK_IFILE) {
      ceph_assert(in);
      if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
        mds->mdcache->queue_file_recover(in);
        need_recover = true;
        gather++;
      }
    }

    // nothing to gather but dirty scattered data: flush it back to the
    // inode first; the transition resumes when writebehind completes
    if (!gather && lock->is_dirty()) {
      lock->get_parent()->auth_pin(lock);
      scatter_writebehind(static_cast<ScatterLock*>(lock));
      mds->mdlog->flush();
      return false;
    }

    if (gather) {
      lock->get_parent()->auth_pin(lock);
      if (need_recover)
	mds->mdcache->do_file_recover();
      return false;
    }
  }

  // no gather needed (or we came from TSYN): go SYNC now
  if (lock->get_parent()->is_replicated()) { // FIXME
    bufferlist data;
    lock->encode_locked_state(data);
    send_lock_message(lock, LOCK_AC_SYNC, data);
  }
  lock->set_state(LOCK_SYNC);
  lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
  if (in && in->is_head()) {
    if (need_issue)
      *need_issue = true;
    else
      issue_caps(in);
  }
  return true;
}
4272
/**
 * Transition a stable lock (auth side) toward EXCL (loner-exclusive),
 * first gathering rdlocks/wrlocks, replica locks, and conflicting
 * client caps.  If anything must be gathered, the lock is auth-pinned
 * in the intermediate *_EXCL state and completes later via eval_gather.
 *
 * @param need_issue if non-null, set instead of calling issue_caps()
 */
void Locker::simple_excl(SimpleLock *lock, bool *need_issue)
{
  dout(7) << "simple_excl on " << *lock << " on " << *lock->get_parent() << dendl;
  ceph_assert(lock->get_parent()->is_auth());
  ceph_assert(lock->is_stable());

  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  switch (lock->get_state()) {
  case LOCK_LOCK: lock->set_state(LOCK_LOCK_EXCL); break;
  case LOCK_SYNC: lock->set_state(LOCK_SYNC_EXCL); break;
  case LOCK_XSYN: lock->set_state(LOCK_XSYN_EXCL); break;
  default: ceph_abort();
  }

  int gather = 0;
  if (lock->is_rdlocked())
    gather++;
  if (lock->is_wrlocked())
    gather++;

  // replicas already hold LOCK when coming from LOCK/XSYN; only ask
  // them to drop state when coming from SYNC
  if (lock->get_parent()->is_replicated() &&
      lock->get_state() != LOCK_LOCK_EXCL &&
      lock->get_state() != LOCK_XSYN_EXCL) {
    send_lock_message(lock, LOCK_AC_LOCK);
    lock->init_gather();
    gather++;
  }

  if (in && in->is_head()) {
    if (in->issued_caps_need_gather(lock)) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
      gather++;
    }
  }

  if (gather) {
    lock->get_parent()->auth_pin(lock);
  } else {
    lock->set_state(LOCK_EXCL);
    lock->finish_waiters(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE);
    if (in) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
    }
  }
}
4327
/**
 * Transition a stable lock (auth side) toward LOCK (exclusive to the
 * MDS), gathering leases, rdlocks, client caps, file recovery, and
 * replica state as required.  The MIX->LOCK path is two-stage: local
 * gather first (MIX_LOCK), then replica gather (MIX_LOCK2).
 *
 * @param need_issue if non-null, set instead of calling issue_caps()
 */
void Locker::simple_lock(SimpleLock *lock, bool *need_issue)
{
  dout(7) << "simple_lock on " << *lock << " on " << *lock->get_parent() << dendl;
  ceph_assert(lock->get_parent()->is_auth());
  ceph_assert(lock->is_stable());
  ceph_assert(lock->get_state() != LOCK_LOCK);

  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  int old_state = lock->get_state();

  switch (lock->get_state()) {
  case LOCK_SYNC: lock->set_state(LOCK_SYNC_LOCK); break;
  case LOCK_XSYN: lock->set_state(LOCK_XSYN_LOCK); break;
  case LOCK_EXCL: lock->set_state(LOCK_EXCL_LOCK); break;
  case LOCK_MIX: lock->set_state(LOCK_MIX_LOCK);
    (static_cast<ScatterLock *>(lock))->clear_unscatter_wanted();
    break;
  case LOCK_TSYN: lock->set_state(LOCK_TSYN_LOCK); break;
  default: ceph_abort();
  }

  // count outstanding conditions we must gather before reaching LOCK
  int gather = 0;
  if (lock->is_leased()) {
    gather++;
    revoke_client_leases(lock);
  }
  if (lock->is_rdlocked())
    gather++;
  if (in && in->is_head()) {
    if (in->issued_caps_need_gather(lock)) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
      gather++;
    }
  }

  bool need_recover = false;
  if (lock->get_type() == CEPH_LOCK_IFILE) {
    ceph_assert(in);
    if(in->state_test(CInode::STATE_NEEDSRECOVER)) {
      mds->mdcache->queue_file_recover(in);
      need_recover = true;
      gather++;
    }
  }

  if (lock->get_parent()->is_replicated() &&
      lock->get_state() == LOCK_MIX_LOCK &&
      gather) {
    // stage 1 of MIX->LOCK: finish local gather before contacting replicas
    dout(10) << " doing local stage of mix->lock gather before gathering from replicas" << dendl;
  } else {
    // move to second stage of gather now, so we don't send the lock action later.
    if (lock->get_state() == LOCK_MIX_LOCK)
      lock->set_state(LOCK_MIX_LOCK2);

    if (lock->get_parent()->is_replicated() &&
	lock->get_sm()->states[old_state].replica_state != LOCK_LOCK) { // replica may already be LOCK
      gather++;
      send_lock_message(lock, LOCK_AC_LOCK);
      lock->init_gather();
    }
  }

  // nothing left to gather but scattered dirty data: write it behind
  // first; the transition resumes when the flush completes
  if (!gather && lock->is_dirty()) {
    lock->get_parent()->auth_pin(lock);
    scatter_writebehind(static_cast<ScatterLock*>(lock));
    mds->mdlog->flush();
    return;
  }

  if (gather) {
    lock->get_parent()->auth_pin(lock);
    if (need_recover)
      mds->mdcache->do_file_recover();
  } else {
    lock->set_state(LOCK_LOCK);
    lock->finish_waiters(ScatterLock::WAIT_XLOCK|ScatterLock::WAIT_WR|ScatterLock::WAIT_STABLE);
  }
}
4412
4413
/**
 * Move a lock from LOCK/XLOCKDONE toward PREXLOCK (the state from which
 * an xlock can be taken), gathering rdlocks, wrlocks, and conflicting
 * client caps first.  If a gather is needed the lock remains in
 * LOCK_XLOCK until eval_gather completes it.
 */
void Locker::simple_xlock(SimpleLock *lock)
{
  dout(7) << "simple_xlock on " << *lock << " on " << *lock->get_parent() << dendl;
  ceph_assert(lock->get_parent()->is_auth());
  //assert(lock->is_stable());
  ceph_assert(lock->get_state() != LOCK_XLOCK);

  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  // pin only on the stable->unstable edge; unstable locks are already pinned
  if (lock->is_stable())
    lock->get_parent()->auth_pin(lock);

  switch (lock->get_state()) {
  case LOCK_LOCK:
  case LOCK_XLOCKDONE: lock->set_state(LOCK_LOCK_XLOCK); break;
  default: ceph_abort();
  }

  int gather = 0;
  if (lock->is_rdlocked())
    gather++;
  if (lock->is_wrlocked())
    gather++;

  if (in && in->is_head()) {
    if (in->issued_caps_need_gather(lock)) {
      issue_caps(in);
      gather++;
    }
  }

  if (!gather) {
    lock->set_state(LOCK_PREXLOCK);
    //assert("shouldn't be called if we are already xlockable" == 0);
  }
}
4452
4453
4454
4455
4456
4457 // ==========================================================================
4458 // scatter lock
4459
4460 /*
4461
4462 Some notes on scatterlocks.
4463
4464 - The scatter/gather is driven by the inode lock. The scatter always
4465 brings in the latest metadata from the fragments.
4466
4467 - When in a scattered/MIX state, fragments are only allowed to
4468 update/be written to if the accounted stat matches the inode's
4469 current version.
4470
4471 - That means, on gather, we _only_ assimilate diffs for frag metadata
4472 that match the current version, because those are the only ones
4473 written during this scatter/gather cycle. (Others didn't permit
4474 it.) We increment the version and journal this to disk.
4475
4476 - When possible, we also simultaneously update our local frag
4477 accounted stats to match.
4478
4479 - On scatter, the new inode info is broadcast to frags, both local
4480 and remote. If possible (auth and !frozen), the dirfrag auth
4481 should update the accounted state (if it isn't already up to date).
4482 Note that this may occur on both the local inode auth node and
4483 inode replicas, so there are two potential paths. If it is NOT
4484 possible, they need to mark_stale to prevent any possible writes.
4485
4486 - A scatter can be to MIX (potentially writeable) or to SYNC (read
4487 only). Both are opportunities to update the frag accounted stats,
4488 even though only the MIX case is affected by a stale dirfrag.
4489
4490 - Because many scatter/gather cycles can potentially go by without a
4491 frag being able to update its accounted stats (due to being frozen
4492 by exports/refragments in progress), the frag may have (even very)
4493 old stat versions. That's fine. If when we do want to update it,
4494 we can update accounted_* and the version first.
4495
4496 */
4497
// Journal-commit completion for scatter_writebehind(): once the EUpdate
// is safely logged, apply the projected inode and finish the lock flush.
class C_Locker_ScatterWB : public LockerLogContext {
  ScatterLock *lock;   // lock whose dirty scattered state was flushed
  MutationRef mut;     // mutation carrying the projected update
public:
  C_Locker_ScatterWB(Locker *l, ScatterLock *sl, MutationRef& m) :
    LockerLogContext(l), lock(sl), mut(m) {}
  void finish(int r) override {
    locker->scatter_writebehind_finish(lock, mut);
  }
};
4508
/**
 * Flush dirty scattered (per-dirfrag) state back into the inode and
 * journal the result.  Takes a forced wrlock on @p lock for the
 * duration of the flush; completion is handled asynchronously by
 * C_Locker_ScatterWB -> scatter_writebehind_finish().
 */
void Locker::scatter_writebehind(ScatterLock *lock)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  dout(10) << "scatter_writebehind " << in->inode.mtime << " on " << *lock << " on " << *in << dendl;

  // journal
  MutationRef mut(new MutationImpl());
  mut->ls = mds->mdlog->get_current_segment();

  // forcefully take a wrlock
  lock->get_wrlock(true);
  mut->locks.emplace(lock, MutationImpl::LockOp::WRLOCK);

  in->pre_cow_old_inode(); // avoid cow mayhem

  auto &pi = in->project_inode();
  pi.inode.version = in->pre_dirty();

  // fold the accumulated dirfrag deltas into the projected inode
  in->finish_scatter_gather_update(lock->get_type());
  lock->start_flush();

  EUpdate *le = new EUpdate(mds->mdlog, "scatter_writebehind");
  mds->mdlog->start_entry(le);

  mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY);
  mdcache->journal_dirty_inode(mut.get(), &le->metablob, in);

  in->finish_scatter_gather_update_accounted(lock->get_type(), mut, &le->metablob);

  mds->mdlog->submit_entry(le, new C_Locker_ScatterWB(this, lock, mut));
}
4540
/**
 * Journal-commit side of scatter_writebehind(): pop the projected inode,
 * finish the lock flush, notify replicas that may also be flushing, and
 * release the mutation's locks.
 */
void Locker::scatter_writebehind_finish(ScatterLock *lock, MutationRef& mut)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  dout(10) << "scatter_writebehind_finish on " << *lock << " on " << *in << dendl;
  in->pop_and_dirty_projected_inode(mut->ls);

  lock->finish_flush();

  // if replicas may have flushed in a mix->lock state, send another
  // message so they can finish_flush().
  if (in->is_replicated()) {
    switch (lock->get_state()) {
    case LOCK_MIX_LOCK:
    case LOCK_MIX_LOCK2:
    case LOCK_MIX_EXCL:
    case LOCK_MIX_TSYN:
      send_lock_message(lock, LOCK_AC_LOCKFLUSHED);
    }
  }

  mut->apply();
  drop_locks(mut.get());
  mut->cleanup();

  if (lock->is_stable())
    lock->finish_waiters(ScatterLock::WAIT_STABLE);

  //scatter_eval_gather(lock);
}
4570
/**
 * Re-evaluate a stable scatter lock on the auth node.  Policy: honor an
 * explicit scatter request (-> MIX); keep INEST writable (MIX when
 * replicated, LOCK otherwise); otherwise prefer SYNC when the inode is
 * not a subtree root with distributed dirfrags.
 *
 * @param need_issue if non-null, set instead of calling issue_caps()
 */
void Locker::scatter_eval(ScatterLock *lock, bool *need_issue)
{
  dout(10) << "scatter_eval " << *lock << " on " << *lock->get_parent() << dendl;

  ceph_assert(lock->get_parent()->is_auth());
  ceph_assert(lock->is_stable());

  if (lock->get_parent()->is_freezing_or_frozen()) {
    dout(20) << " freezing|frozen" << dendl;
    return;
  }

  if (mdcache->is_readonly()) {
    if (lock->get_state() != LOCK_SYNC) {
      dout(10) << "scatter_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
      simple_sync(lock, need_issue);
    }
    return;
  }

  if (!lock->is_rdlocked() &&
      lock->get_state() != LOCK_MIX &&
      lock->get_scatter_wanted()) {
    dout(10) << "scatter_eval scatter_wanted, bump to mix " << *lock
	     << " on " << *lock->get_parent() << dendl;
    scatter_mix(lock, need_issue);
    return;
  }

  if (lock->get_type() == CEPH_LOCK_INEST) {
    // in general, we want to keep INEST writable at all times.
    if (!lock->is_rdlocked()) {
      if (lock->get_parent()->is_replicated()) {
	if (lock->get_state() != LOCK_MIX)
	  scatter_mix(lock, need_issue);
      } else {
	if (lock->get_state() != LOCK_LOCK)
	  simple_lock(lock, need_issue);
      }
    }
    return;
  }

  CInode *in = static_cast<CInode*>(lock->get_parent());
  if (!in->has_subtree_or_exporting_dirfrag() || in->is_base()) {
    // i _should_ be sync.
    if (!lock->is_wrlocked() &&
	lock->get_state() != LOCK_SYNC) {
      dout(10) << "scatter_eval no wrlocks|xlocks, not subtree root inode, syncing" << dendl;
      simple_sync(lock, need_issue);
    }
  }
}
4624
4625
4626 /*
4627 * mark a scatterlock to indicate that the dir fnode has some dirty data
4628 */
4629 void Locker::mark_updated_scatterlock(ScatterLock *lock)
4630 {
4631 lock->mark_dirty();
4632 if (lock->get_updated_item()->is_on_list()) {
4633 dout(10) << "mark_updated_scatterlock " << *lock
4634 << " - already on list since " << lock->get_update_stamp() << dendl;
4635 } else {
4636 updated_scatterlocks.push_back(lock->get_updated_item());
4637 utime_t now = ceph_clock_now();
4638 lock->set_update_stamp(now);
4639 dout(10) << "mark_updated_scatterlock " << *lock
4640 << " - added at " << now << dendl;
4641 }
4642 }
4643
4644 /*
4645 * this is called by scatter_tick and LogSegment::try_to_trim() when
4646 * trying to flush dirty scattered data (i.e. updated fnode) back to
4647 * the inode.
4648 *
4649 * we need to lock|scatter in order to push fnode changes into the
4650 * inode.dirstat.
4651 */
/*
 * this is called by scatter_tick and LogSegment::try_to_trim() when
 * trying to flush dirty scattered data (i.e. updated fnode) back to
 * the inode.
 *
 * we need to lock|scatter in order to push fnode changes into the
 * inode.dirstat.
 *
 * @param c               optional continuation, run once the lock is stable
 * @param forcelockchange force a lock state change even if a flush could
 *                        happen in the current state (see dead branch below)
 */
void Locker::scatter_nudge(ScatterLock *lock, MDSContext *c, bool forcelockchange)
{
  CInode *p = static_cast<CInode *>(lock->get_parent());

  if (p->is_frozen() || p->is_freezing()) {
    dout(10) << "scatter_nudge waiting for unfreeze on " << *p << dendl;
    if (c)
      p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, c);
    else if (lock->is_dirty())
      // just requeue. not ideal.. starvation prone..
      updated_scatterlocks.push_back(lock->get_updated_item());
    return;
  }

  if (p->is_ambiguous_auth()) {
    dout(10) << "scatter_nudge waiting for single auth on " << *p << dendl;
    if (c)
      p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, c);
    else if (lock->is_dirty())
      // just requeue. not ideal.. starvation prone..
      updated_scatterlocks.push_back(lock->get_updated_item());
    return;
  }

  if (p->is_auth()) {
    int count = 0;
    while (true) {
      if (lock->is_stable()) {
	// can we do it now?
	//  (only if we're not replicated.. if we are, we really do need
	//   to nudge the lock state!)
	/*
	  actually, even if we're not replicated, we can't stay in MIX, because another mds
	  could discover and replicate us at any time.  if that happens while we're flushing,
	  they end up in MIX but their inode has the old scatterstat version.

	if (!forcelockchange && !lock->get_parent()->is_replicated() && lock->can_wrlock(-1)) {
	  dout(10) << "scatter_nudge auth, propagating " << *lock << " on " << *p << dendl;
	  scatter_writebehind(lock);
	  if (c)
	    lock->add_waiter(SimpleLock::WAIT_STABLE, c);
	  return;
	}
	*/

	if (mdcache->is_readonly()) {
	  if (lock->get_state() != LOCK_SYNC) {
	    dout(10) << "scatter_nudge auth, read-only FS, syncing " << *lock << " on " << *p << dendl;
	    simple_sync(static_cast<ScatterLock*>(lock));
	  }
	  break;
	}

	// adjust lock state: cycle MIX -> LOCK -> SYNC (as applicable) so
	// the dirty fnode data gets folded back into the inode
	dout(10) << "scatter_nudge auth, scatter/unscattering " << *lock << " on " << *p << dendl;
	switch (lock->get_type()) {
	case CEPH_LOCK_IFILE:
	  if (p->is_replicated() && lock->get_state() != LOCK_MIX)
	    scatter_mix(static_cast<ScatterLock*>(lock));
	  else if (lock->get_state() != LOCK_LOCK)
	    simple_lock(static_cast<ScatterLock*>(lock));
	  else
	    simple_sync(static_cast<ScatterLock*>(lock));
	  break;

	case CEPH_LOCK_IDFT:
	case CEPH_LOCK_INEST:
	  if (p->is_replicated() && lock->get_state() != LOCK_MIX)
	    scatter_mix(lock);
	  else if (lock->get_state() != LOCK_LOCK)
	    simple_lock(lock);
	  else
	    simple_sync(lock);
	  break;
	default:
	  ceph_abort();
	}
	++count;
	if (lock->is_stable() && count == 2) {
	  dout(10) << "scatter_nudge oh, stable after two cycles." << dendl;
	  // this should only really happen when called via
	  // handle_file_lock due to AC_NUDGE, because the rest of the
	  // time we are replicated or have dirty data and won't get
	  // called.  bailing here avoids an infinite loop.
	  ceph_assert(!c);
	  break;
	}
      } else {
	dout(10) << "scatter_nudge auth, waiting for stable " << *lock << " on " << *p << dendl;
	if (c)
	  lock->add_waiter(SimpleLock::WAIT_STABLE, c);
	return;
      }
    }
  } else {
    dout(10) << "scatter_nudge replica, requesting scatter/unscatter of "
	     << *lock << " on " << *p << dendl;
    // request unscatter?
    mds_rank_t auth = lock->get_parent()->authority().first;
    if (!mds->is_cluster_degraded() || mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
      mds->send_message_mds(MLock::create(lock, LOCK_AC_NUDGE, mds->get_nodeid()), auth);
    }

    // wait...
    if (c)
      lock->add_waiter(SimpleLock::WAIT_STABLE, c);

    // also, requeue, in case we had wrong auth or something
    if (lock->is_dirty())
      updated_scatterlocks.push_back(lock->get_updated_item());
  }
}
4764
/**
 * Periodic pass over updated_scatterlocks: nudge each dirty lock whose
 * update stamp is older than mds_scatter_nudge_interval so its fnode
 * changes get flushed back to the inode.  The initial size() snapshot
 * bounds the loop because scatter_nudge() may requeue entries.
 */
void Locker::scatter_tick()
{
  dout(10) << "scatter_tick" << dendl;

  // updated
  utime_t now = ceph_clock_now();
  int n = updated_scatterlocks.size();
  while (!updated_scatterlocks.empty()) {
    ScatterLock *lock = updated_scatterlocks.front();

    if (n-- == 0) break;  // scatter_nudge() may requeue; avoid looping

    if (!lock->is_dirty()) {
      // flushed since it was queued; just drop it
      updated_scatterlocks.pop_front();
      dout(10) << " removing from updated_scatterlocks "
	       << *lock << " " << *lock->get_parent() << dendl;
      continue;
    }
    // the list is in enqueue order, so once we hit a too-recent entry
    // everything behind it is too recent as well
    if (now - lock->get_update_stamp() < g_conf()->mds_scatter_nudge_interval)
      break;
    updated_scatterlocks.pop_front();
    scatter_nudge(lock, 0);
  }
  mds->mdlog->flush();
}
4790
4791
/**
 * Transition a stable scatter lock toward TSYN (temporary sync),
 * gathering wrlocks, conflicting caps, and replica state first.
 *
 * NOTE: this path is disabled — the unconditional ceph_abort_msg()
 * below fires before any of the remaining logic runs; the code after
 * it is kept for reference only.
 */
void Locker::scatter_tempsync(ScatterLock *lock, bool *need_issue)
{
  dout(10) << "scatter_tempsync " << *lock
	   << " on " << *lock->get_parent() << dendl;
  ceph_assert(lock->get_parent()->is_auth());
  ceph_assert(lock->is_stable());

  ceph_abort_msg("not fully implemented, at least not for filelock");

  CInode *in = static_cast<CInode *>(lock->get_parent());

  switch (lock->get_state()) {
  case LOCK_SYNC: ceph_abort();   // this shouldn't happen
  case LOCK_LOCK: lock->set_state(LOCK_LOCK_TSYN); break;
  case LOCK_MIX: lock->set_state(LOCK_MIX_TSYN); break;
  default: ceph_abort();
  }

  int gather = 0;
  if (lock->is_wrlocked())
    gather++;

  if (lock->get_cap_shift() &&
      in->is_head() &&
      in->issued_caps_need_gather(lock)) {
    if (need_issue)
      *need_issue = true;
    else
      issue_caps(in);
    gather++;
  }

  // from MIX, replicas hold scattered state that must be collected
  if (lock->get_state() == LOCK_MIX_TSYN &&
      in->is_replicated()) {
    lock->init_gather();
    send_lock_message(lock, LOCK_AC_LOCK);
    gather++;
  }

  if (gather) {
    in->auth_pin(lock);
  } else {
    // do tempsync
    lock->set_state(LOCK_TSYN);
    lock->finish_waiters(ScatterLock::WAIT_RD|ScatterLock::WAIT_STABLE);
    if (lock->get_cap_shift()) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
    }
  }
}
4845
4846
4847
4848 // ==========================================================================
4849 // local lock
4850
/**
 * Unconditionally take a wrlock on a local lock and record it in the
 * mutation.  Caller must already know the wrlock is available
 * (can_wrlock() is asserted, not handled); use local_wrlock_start()
 * for the waiting variant.
 */
void Locker::local_wrlock_grab(LocalLock *lock, MutationRef& mut)
{
  dout(7) << "local_wrlock_grab  on " << *lock
	  << " on " << *lock->get_parent() << dendl;

  ceph_assert(lock->get_parent()->is_auth());
  ceph_assert(lock->can_wrlock());
  lock->get_wrlock(mut->get_client());

  auto ret = mut->locks.emplace(lock, MutationImpl::LockOp::WRLOCK);
  ceph_assert(ret.second);  // must not already hold this lock
}
4863
4864 bool Locker::local_wrlock_start(LocalLock *lock, MDRequestRef& mut)
4865 {
4866 dout(7) << "local_wrlock_start on " << *lock
4867 << " on " << *lock->get_parent() << dendl;
4868
4869 ceph_assert(lock->get_parent()->is_auth());
4870 if (lock->can_wrlock()) {
4871 lock->get_wrlock(mut->get_client());
4872 auto it = mut->locks.emplace_hint(mut->locks.end(), lock, MutationImpl::LockOp::WRLOCK);
4873 ceph_assert(it->is_wrlock());
4874 return true;
4875 } else {
4876 lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
4877 return false;
4878 }
4879 }
4880
/**
 * Release a wrlock on a local lock held by @p mut and, when the last
 * wrlock drops, wake anyone waiting on the lock.
 */
void Locker::local_wrlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut)
{
  ceph_assert(it->is_wrlock());
  LocalLock *lock = static_cast<LocalLock*>(it->lock);
  dout(7) << "local_wrlock_finish  on " << *lock
	  << " on " << *lock->get_parent() << dendl;
  lock->put_wrlock();
  mut->locks.erase(it);
  if (lock->get_num_wrlocks() == 0) {
    // last writer out: wake all waiters
    lock->finish_waiters(SimpleLock::WAIT_STABLE |
                         SimpleLock::WAIT_WR |
                         SimpleLock::WAIT_RD);
  }
}
4895
4896 bool Locker::local_xlock_start(LocalLock *lock, MDRequestRef& mut)
4897 {
4898 dout(7) << "local_xlock_start on " << *lock
4899 << " on " << *lock->get_parent() << dendl;
4900
4901 ceph_assert(lock->get_parent()->is_auth());
4902 if (!lock->can_xlock_local()) {
4903 lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
4904 return false;
4905 }
4906
4907 lock->get_xlock(mut, mut->get_client());
4908 mut->locks.emplace_hint(mut->locks.end(), lock, MutationImpl::LockOp::XLOCK);
4909 return true;
4910 }
4911
4912 void Locker::local_xlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut)
4913 {
4914 ceph_assert(it->is_xlock());
4915 LocalLock *lock = static_cast<LocalLock*>(it->lock);
4916 dout(7) << "local_xlock_finish on " << *lock
4917 << " on " << *lock->get_parent() << dendl;
4918 lock->put_xlock();
4919 mut->locks.erase(it);
4920
4921 lock->finish_waiters(SimpleLock::WAIT_STABLE |
4922 SimpleLock::WAIT_WR |
4923 SimpleLock::WAIT_RD);
4924 }
4925
4926
4927
4928 // ==========================================================================
4929 // file lock
4930
4931
/**
 * Re-evaluate a stable, auth filelock and steer it toward the stable
 * state that best matches the capabilities clients currently want:
 *
 *   - EXCL -> SYNC/MIX when the loner no longer deserves exclusivity,
 *   - *    -> EXCL when a single loner wants write/buffer caps,
 *   - *    -> MIX  when multiple writers (or a scatter request) exist,
 *   - *    -> SYNC when nobody wants write caps.
 *
 * No-op while the parent inode is freezing/frozen; on a read-only MDS
 * everything is forced toward SYNC.
 *
 * @param lock       the inode's filelock (must be auth and stable)
 * @param need_issue if non-null, set *need_issue instead of calling
 *                   issue_caps() directly (caller issues caps later)
 */
void Locker::file_eval(ScatterLock *lock, bool *need_issue)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  int loner_wanted, other_wanted;
  int wanted = in->get_caps_wanted(&loner_wanted, &other_wanted, CEPH_CAP_SFILE);
  dout(7) << "file_eval wanted=" << gcap_string(wanted)
	  << " loner_wanted=" << gcap_string(loner_wanted)
	  << " other_wanted=" << gcap_string(other_wanted)
	  << " filelock=" << *lock << " on " << *lock->get_parent()
	  << dendl;

  ceph_assert(lock->get_parent()->is_auth());
  ceph_assert(lock->is_stable());

  if (lock->get_parent()->is_freezing_or_frozen())
    return;

  if (mdcache->is_readonly()) {
    if (lock->get_state() != LOCK_SYNC) {
      dout(10) << "file_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
      simple_sync(lock, need_issue);
    }
    return;
  }

  // excl -> *?
  if (lock->get_state() == LOCK_EXCL) {
    dout(20) << " is excl" << dendl;
    int loner_issued, other_issued, xlocker_issued;
    in->get_caps_issued(&loner_issued, &other_issued, &xlocker_issued, CEPH_CAP_SFILE);
    dout(7) << "file_eval loner_issued=" << gcap_string(loner_issued)
	    << " other_issued=" << gcap_string(other_issued)
	    << " xlocker_issued=" << gcap_string(xlocker_issued)
	    << dendl;
    // Drop EXCL if the loner neither wants nor holds exclusive/write/
    // buffer caps, if anyone else wants conflicting caps, or (dirs) if
    // multiple clients hold non-stale caps.
    if (!((loner_wanted|loner_issued) & (CEPH_CAP_GEXCL|CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) ||
	(other_wanted & (CEPH_CAP_GEXCL|CEPH_CAP_GWR|CEPH_CAP_GRD)) ||
	(in->inode.is_dir() && in->multiple_nonstale_caps())) {  // FIXME.. :/
      dout(20) << " should lose it" << dendl;
      // we should lose it.
      //  loner  other   want
      //  R      R       SYNC
      //  R      R|W     MIX
      //  R      W       MIX
      //  R|W    R       MIX
      //  R|W    R|W     MIX
      //  R|W    W       MIX
      //  W      R       MIX
      //  W      R|W     MIX
      //  W      W       MIX
      // -> any writer means MIX; RD doesn't matter.
      if (((other_wanted|loner_wanted) & CEPH_CAP_GWR) ||
	  lock->is_waiter_for(SimpleLock::WAIT_WR))
	scatter_mix(lock, need_issue);
      else if (!lock->is_wrlocked())   // let excl wrlocks drain first
	simple_sync(lock, need_issue);
      else
	dout(10) << " waiting for wrlock to drain" << dendl;
    }
  }

  // * -> excl?
  else if (lock->get_state() != LOCK_EXCL &&
	   !lock->is_rdlocked() &&
	   //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
	   ((wanted & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) ||
	    (in->inode.is_dir() && !in->has_subtree_or_exporting_dirfrag())) &&
	   in->get_target_loner() >= 0) {
    dout(7) << "file_eval stable, bump to loner " << *lock
	    << " on " << *lock->get_parent() << dendl;
    file_excl(lock, need_issue);
  }

  // * -> mixed?
  else if (lock->get_state() != LOCK_MIX &&
	   !lock->is_rdlocked() &&
	   //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
	   (lock->get_scatter_wanted() ||
	    (in->get_target_loner() < 0 && (wanted & CEPH_CAP_GWR)))) {
    dout(7) << "file_eval stable, bump to mixed " << *lock
	    << " on " << *lock->get_parent() << dendl;
    scatter_mix(lock, need_issue);
  }

  // * -> sync?
  else if (lock->get_state() != LOCK_SYNC &&
	   !lock->is_wrlocked() &&   // drain wrlocks first!
	   !lock->is_waiter_for(SimpleLock::WAIT_WR) &&
	   !(wanted & CEPH_CAP_GWR) &&
	   !((lock->get_state() == LOCK_MIX) &&
	     in->is_dir() && in->has_subtree_or_exporting_dirfrag())  // if we are a delegation point, stay where we are
	   //((wanted & CEPH_CAP_RD) ||
	   //in->is_replicated() ||
	   //lock->is_leased() ||
	   //(!loner && lock->get_state() == LOCK_EXCL)) &&
	   ) {
    dout(7) << "file_eval stable, bump to sync " << *lock
	    << " on " << *lock->get_parent() << dendl;
    simple_sync(lock, need_issue);
  }
}
5032
5033
5034
/**
 * Move a stable, auth filelock toward LOCK_MIX (scattered writes).
 * From LOCK_LOCK the switch is immediate; from SYNC/EXCL/XSYN/TSYN we
 * enter the corresponding intermediate *_MIX state and gather rdlocks,
 * replica acks, client leases and conflicting client caps first
 * (auth-pinning the inode while the gather is outstanding).
 *
 * @param lock       the inode's filelock (must be auth and stable)
 * @param need_issue if non-null, set *need_issue instead of calling
 *                   issue_caps() directly
 */
void Locker::scatter_mix(ScatterLock *lock, bool *need_issue)
{
  dout(7) << "scatter_mix " << *lock << " on " << *lock->get_parent() << dendl;

  CInode *in = static_cast<CInode*>(lock->get_parent());
  ceph_assert(in->is_auth());
  ceph_assert(lock->is_stable());

  if (lock->get_state() == LOCK_LOCK) {
    // nothing to gather from LOCK: scatter immediately
    in->start_scatter(lock);
    if (in->is_replicated()) {
      // data
      bufferlist softdata;
      lock->encode_locked_state(softdata);

      // bcast to replicas
      send_lock_message(lock, LOCK_AC_MIX, softdata);
    }

    // change lock
    lock->set_state(LOCK_MIX);
    lock->clear_scatter_wanted();
    if (lock->get_cap_shift()) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
    }
  } else {
    // gather?
    switch (lock->get_state()) {
    case LOCK_SYNC: lock->set_state(LOCK_SYNC_MIX); break;
    case LOCK_EXCL: lock->set_state(LOCK_EXCL_MIX); break;
    case LOCK_XSYN: lock->set_state(LOCK_XSYN_MIX); break;
    case LOCK_TSYN: lock->set_state(LOCK_TSYN_MIX); break;
    default: ceph_abort();
    }

    int gather = 0;
    if (lock->is_rdlocked())
      gather++;
    if (in->is_replicated()) {
      if (lock->get_state() == LOCK_SYNC_MIX) { // for the rest states, replicas are already LOCK
	send_lock_message(lock, LOCK_AC_MIX);
	lock->init_gather();
	gather++;
      }
    }
    if (lock->is_leased()) {
      revoke_client_leases(lock);
      gather++;
    }
    if (lock->get_cap_shift() &&
	in->is_head() &&
	in->issued_caps_need_gather(lock)) {
      // issuing caps here revokes the ones incompatible with *_MIX
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
      gather++;
    }
    bool need_recover = false;
    if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
      mds->mdcache->queue_file_recover(in);
      need_recover = true;
      gather++;
    }

    if (gather) {
      // hold the inode pinned until eval_gather() completes the transition
      lock->get_parent()->auth_pin(lock);
      if (need_recover)
	mds->mdcache->do_file_recover();
    } else {
      // nothing outstanding: finish the transition to MIX right away
      in->start_scatter(lock);
      lock->set_state(LOCK_MIX);
      lock->clear_scatter_wanted();
      if (in->is_replicated()) {
	bufferlist softdata;
	lock->encode_locked_state(softdata);
	send_lock_message(lock, LOCK_AC_MIX, softdata);
      }
      if (lock->get_cap_shift()) {
	if (need_issue)
	  *need_issue = true;
	else
	  issue_caps(in);
      }
    }
  }
}
5125
5126
/**
 * Move a stable, auth filelock toward LOCK_EXCL so a single loner
 * client can hold exclusive caps.  Enters the *_EXCL intermediate
 * state and gathers rdlocks, wrlocks, replica acks, leases and
 * conflicting caps before completing; if nothing needs gathering the
 * transition finishes immediately.
 *
 * Precondition (asserted): either there is a loner with no other MDS
 * wanting caps, or we are coming from XSYN (xsyn -> excl is the only
 * legal way out of XSYN).
 *
 * @param lock       the inode's filelock (must be auth and stable)
 * @param need_issue if non-null, set *need_issue instead of calling
 *                   issue_caps() directly
 */
void Locker::file_excl(ScatterLock *lock, bool *need_issue)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  dout(7) << "file_excl " << *lock << " on " << *lock->get_parent() << dendl;

  ceph_assert(in->is_auth());
  ceph_assert(lock->is_stable());

  ceph_assert((in->get_loner() >= 0 && in->get_mds_caps_wanted().empty()) ||
	      (lock->get_state() == LOCK_XSYN));  // must do xsyn -> excl -> <anything else>

  switch (lock->get_state()) {
  case LOCK_SYNC: lock->set_state(LOCK_SYNC_EXCL); break;
  case LOCK_MIX: lock->set_state(LOCK_MIX_EXCL); break;
  case LOCK_LOCK: lock->set_state(LOCK_LOCK_EXCL); break;
  case LOCK_XSYN: lock->set_state(LOCK_XSYN_EXCL); break;
  default: ceph_abort();
  }
  int gather = 0;

  if (lock->is_rdlocked())
    gather++;
  if (lock->is_wrlocked())
    gather++;

  if (in->is_replicated() &&
      lock->get_state() != LOCK_LOCK_EXCL &&
      lock->get_state() != LOCK_XSYN_EXCL) {  // if we were lock, replicas are already lock.
    send_lock_message(lock, LOCK_AC_LOCK);
    lock->init_gather();
    gather++;
  }
  if (lock->is_leased()) {
    revoke_client_leases(lock);
    gather++;
  }
  if (in->is_head() &&
      in->issued_caps_need_gather(lock)) {
    // revoke caps incompatible with the *_EXCL intermediate state
    if (need_issue)
      *need_issue = true;
    else
      issue_caps(in);
    gather++;
  }
  bool need_recover = false;
  if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
    mds->mdcache->queue_file_recover(in);
    need_recover = true;
    gather++;
  }

  if (gather) {
    // pin until eval_gather() finishes the transition
    lock->get_parent()->auth_pin(lock);
    if (need_recover)
      mds->mdcache->do_file_recover();
  } else {
    lock->set_state(LOCK_EXCL);
    if (need_issue)
      *need_issue = true;
    else
      issue_caps(in);
  }
}
5190
/**
 * Move a filelock from EXCL to XSYN.  Only legal from LOCK_EXCL
 * (asserted via the switch) and only when there is a loner and no
 * other MDS wants caps.  Gathers outstanding wrlocks and conflicting
 * client caps first; otherwise completes immediately and wakes
 * readers/stable waiters.
 *
 * @param lock       the inode's filelock
 * @param need_issue if non-null, set *need_issue instead of calling
 *                   issue_caps() directly
 */
void Locker::file_xsyn(SimpleLock *lock, bool *need_issue)
{
  dout(7) << "file_xsyn on " << *lock << " on " << *lock->get_parent() << dendl;
  CInode *in = static_cast<CInode *>(lock->get_parent());
  ceph_assert(in->is_auth());
  ceph_assert(in->get_loner() >= 0 && in->get_mds_caps_wanted().empty());

  switch (lock->get_state()) {
  case LOCK_EXCL: lock->set_state(LOCK_EXCL_XSYN); break;
  default: ceph_abort();
  }

  int gather = 0;
  if (lock->is_wrlocked())
    gather++;

  if (in->is_head() &&
      in->issued_caps_need_gather(lock)) {
    if (need_issue)
      *need_issue = true;
    else
      issue_caps(in);
    gather++;
  }

  if (gather) {
    // pin until eval_gather() finishes the transition
    lock->get_parent()->auth_pin(lock);
  } else {
    lock->set_state(LOCK_XSYN);
    lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
    if (need_issue)
      *need_issue = true;
    else
      issue_caps(in);
  }
}
5227
/**
 * Advance a filelock from LOCK_PRE_SCAN to LOCK_SCAN so file recovery
 * can run.  If client caps conflict with SCAN, issue caps (to start
 * revocation) and mark the inode NEEDSRECOVER so recovery is retried
 * later; otherwise queue the inode for recovery immediately.
 *
 * Only called from MDCache::start_files_to_recover() (asserted via
 * the PRE_SCAN state check).
 */
void Locker::file_recover(ScatterLock *lock)
{
  CInode *in = static_cast<CInode *>(lock->get_parent());
  dout(7) << "file_recover " << *lock << " on " << *in << dendl;

  ceph_assert(in->is_auth());
  //assert(lock->is_stable());
  ceph_assert(lock->get_state() == LOCK_PRE_SCAN); // only called from MDCache::start_files_to_recover()

  int gather = 0;

  /*
  if (in->is_replicated()
      lock->get_sm()->states[oldstate].replica_state != LOCK_LOCK) {
    send_lock_message(lock, LOCK_AC_LOCK);
    lock->init_gather();
    gather++;
  }
  */
  if (in->is_head() &&
      in->issued_caps_need_gather(lock)) {
    issue_caps(in);
    gather++;
  }

  lock->set_state(LOCK_SCAN);
  if (gather)
    in->state_set(CInode::STATE_NEEDSRECOVER);
  else
    mds->mdcache->queue_file_recover(in);
}
5259
5260
5261 // messenger
// messenger
/**
 * Handle an MLock message for a file (scatter) lock.
 *
 * The first group of actions (SYNC/LOCK/LOCKFLUSHED/MIX) is processed
 * on a replica and carries state pushed by the auth MDS; the second
 * group (LOCKACK/SYNCACK/MIXACK) is processed on the auth MDS as
 * replica acknowledgements during a gather; the remaining actions are
 * replica-to-auth requests (scatter/unscatter/rdlock/nudge).
 *
 * Messages are dropped while this MDS is rejoining and the inode is
 * still being rejoined.
 */
void Locker::handle_file_lock(ScatterLock *lock, const MLock::const_ref &m)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  int from = m->get_asker();

  if (mds->is_rejoin()) {
    if (in->is_rejoining()) {
      dout(7) << "handle_file_lock still rejoining " << *in
	      << ", dropping " << *m << dendl;
      return;
    }
  }

  dout(7) << "handle_file_lock a=" << lock->get_lock_action_name(m->get_action())
	  << " on " << *lock
	  << " from mds." << from << " "
	  << *in << dendl;

  bool caps = lock->get_cap_shift();

  switch (m->get_action()) {
    // -- replica --
  case LOCK_AC_SYNC:
    ceph_assert(lock->get_state() == LOCK_LOCK ||
		lock->get_state() == LOCK_MIX ||
		lock->get_state() == LOCK_MIX_SYNC2);

    if (lock->get_state() == LOCK_MIX) {
      // coming from MIX we must flush our scattered state first
      lock->set_state(LOCK_MIX_SYNC);
      eval_gather(lock, true);
      if (lock->is_unstable_and_locked())
	mds->mdlog->flush();
      break;
    }

    (static_cast<ScatterLock *>(lock))->finish_flush();
    (static_cast<ScatterLock *>(lock))->clear_flushed();

    // ok
    lock->decode_locked_state(m->get_data());
    lock->set_state(LOCK_SYNC);

    // hold a temporary rdlock so waiters woken here see a readable lock
    lock->get_rdlock();
    if (caps)
      issue_caps(in);
    lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
    lock->put_rdlock();
    break;

  case LOCK_AC_LOCK:
    switch (lock->get_state()) {
    case LOCK_SYNC: lock->set_state(LOCK_SYNC_LOCK); break;
    case LOCK_MIX: lock->set_state(LOCK_MIX_LOCK); break;
    default: ceph_abort();
    }

    eval_gather(lock, true);
    if (lock->is_unstable_and_locked())
      mds->mdlog->flush();

    break;

  case LOCK_AC_LOCKFLUSHED:
    (static_cast<ScatterLock *>(lock))->finish_flush();
    (static_cast<ScatterLock *>(lock))->clear_flushed();
    // wake up scatter_nudge waiters
    if (lock->is_stable())
      lock->finish_waiters(SimpleLock::WAIT_STABLE);
    break;

  case LOCK_AC_MIX:
    ceph_assert(lock->get_state() == LOCK_SYNC ||
		lock->get_state() == LOCK_LOCK ||
		lock->get_state() == LOCK_SYNC_MIX2);

    if (lock->get_state() == LOCK_SYNC) {
      // MIXED
      lock->set_state(LOCK_SYNC_MIX);
      eval_gather(lock, true);
      if (lock->is_unstable_and_locked())
	mds->mdlog->flush();
      break;
    }

    // ok
    lock->set_state(LOCK_MIX);
    lock->decode_locked_state(m->get_data());

    if (caps)
      issue_caps(in);

    lock->finish_waiters(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE);
    break;


    // -- auth --
  case LOCK_AC_LOCKACK:
    ceph_assert(lock->get_state() == LOCK_SYNC_LOCK ||
		lock->get_state() == LOCK_MIX_LOCK ||
		lock->get_state() == LOCK_MIX_LOCK2 ||
		lock->get_state() == LOCK_MIX_EXCL ||
		lock->get_state() == LOCK_SYNC_EXCL ||
		lock->get_state() == LOCK_SYNC_MIX ||
		lock->get_state() == LOCK_MIX_TSYN);
    ceph_assert(lock->is_gathering(from));
    lock->remove_gather(from);

    if (lock->get_state() == LOCK_MIX_LOCK ||
	lock->get_state() == LOCK_MIX_LOCK2 ||
	lock->get_state() == LOCK_MIX_EXCL ||
	lock->get_state() == LOCK_MIX_TSYN) {
      lock->decode_locked_state(m->get_data());
      // replica is waiting for AC_LOCKFLUSHED, eval_gather() should not
      // delay calling scatter_writebehind().
      lock->clear_flushed();
    }

    if (lock->is_gathering()) {
      dout(7) << "handle_file_lock " << *in << " from " << from
	      << ", still gathering " << lock->get_gather_set() << dendl;
    } else {
      dout(7) << "handle_file_lock " << *in << " from " << from
	      << ", last one" << dendl;
      eval_gather(lock);
    }
    break;

  case LOCK_AC_SYNCACK:
    ceph_assert(lock->get_state() == LOCK_MIX_SYNC);
    ceph_assert(lock->is_gathering(from));
    lock->remove_gather(from);

    lock->decode_locked_state(m->get_data());

    if (lock->is_gathering()) {
      dout(7) << "handle_file_lock " << *in << " from " << from
	      << ", still gathering " << lock->get_gather_set() << dendl;
    } else {
      dout(7) << "handle_file_lock " << *in << " from " << from
	      << ", last one" << dendl;
      eval_gather(lock);
    }
    break;

  case LOCK_AC_MIXACK:
    ceph_assert(lock->get_state() == LOCK_SYNC_MIX);
    ceph_assert(lock->is_gathering(from));
    lock->remove_gather(from);

    if (lock->is_gathering()) {
      dout(7) << "handle_file_lock " << *in << " from " << from
	      << ", still gathering " << lock->get_gather_set() << dendl;
    } else {
      dout(7) << "handle_file_lock " << *in << " from " << from
	      << ", last one" << dendl;
      eval_gather(lock);
    }
    break;


    // requests....
  case LOCK_AC_REQSCATTER:
    if (lock->is_stable()) {
      /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
       * because the replica should be holding an auth_pin if they're
       * doing this (and thus, we are freezing, not frozen, and indefinite
       * starvation isn't an issue).
       */
      dout(7) << "handle_file_lock got scatter request on " << *lock
	      << " on " << *lock->get_parent() << dendl;
      if (lock->get_state() != LOCK_MIX)  // i.e., the reqscatter didn't race with an actual mix/scatter
	scatter_mix(lock);
    } else {
      dout(7) << "handle_file_lock got scatter request, !stable, marking scatter_wanted on " << *lock
	      << " on " << *lock->get_parent() << dendl;
      lock->set_scatter_wanted();
    }
    break;

  case LOCK_AC_REQUNSCATTER:
    if (lock->is_stable()) {
      /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
       * because the replica should be holding an auth_pin if they're
       * doing this (and thus, we are freezing, not frozen, and indefinite
       * starvation isn't an issue).
       */
      dout(7) << "handle_file_lock got unscatter request on " << *lock
	      << " on " << *lock->get_parent() << dendl;
      if (lock->get_state() == LOCK_MIX)  // i.e., the reqscatter didn't race with an actual mix/scatter
	simple_lock(lock);  // FIXME tempsync?
    } else {
      dout(7) << "handle_file_lock ignoring unscatter request on " << *lock
	      << " on " << *lock->get_parent() << dendl;
      lock->set_unscatter_wanted();
    }
    break;

  case LOCK_AC_REQRDLOCK:
    handle_reqrdlock(lock, m);
    break;

  case LOCK_AC_NUDGE:
    if (!lock->get_parent()->is_auth()) {
      dout(7) << "handle_file_lock IGNORING nudge on non-auth " << *lock
	      << " on " << *lock->get_parent() << dendl;
    } else if (!lock->get_parent()->is_replicated()) {
      dout(7) << "handle_file_lock IGNORING nudge on non-replicated " << *lock
	      << " on " << *lock->get_parent() << dendl;
    } else {
      dout(7) << "handle_file_lock trying nudge on " << *lock
	      << " on " << *lock->get_parent() << dendl;
      scatter_nudge(lock, 0, true);
      mds->mdlog->flush();
    }
    break;

  default:
    ceph_abort();
  }
}