// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */

#include "CDir.h"
#include "CDentry.h"
#include "CInode.h"
#include "common/config.h"
#include "events/EOpen.h"
#include "events/EUpdate.h"
#include "Locker.h"
#include "MDBalancer.h"
#include "MDCache.h"
#include "MDLog.h"
#include "MDSRank.h"
#include "MDSMap.h"
#include "messages/MInodeFileCaps.h"
#include "messages/MMDSSlaveRequest.h"
#include "Migrator.h"
#include "msg/Messenger.h"
#include "osdc/Objecter.h"

#define dout_subsys ceph_subsys_mds
#undef dout_prefix
#define dout_context g_ceph_context
#define dout_prefix _prefix(_dout, mds)
static ostream& _prefix(std::ostream *_dout, MDSRank *mds) {
  return *_dout << "mds." << mds->get_nodeid() << ".locker ";
}

class LockerContext : public MDSContext {
protected:
  Locker *locker;
  MDSRank *get_mds() override
  {
    return locker->mds;
  }

public:
  explicit LockerContext(Locker *locker_) : locker(locker_) {
    ceph_assert(locker != NULL);
  }
};

class LockerLogContext : public MDSLogContextBase {
protected:
  Locker *locker;
  MDSRank *get_mds() override
  {
    return locker->mds;
  }

public:
  explicit LockerLogContext(Locker *locker_) : locker(locker_) {
    ceph_assert(locker != NULL);
  }
};

Locker::Locker(MDSRank *m, MDCache *c) :
  need_snapflush_inodes(member_offset(CInode, item_caps)), mds(m), mdcache(c) {}

void Locker::dispatch(const cref_t<Message> &m)
{

  switch (m->get_type()) {
  // inter-mds locking
  case MSG_MDS_LOCK:
    handle_lock(ref_cast<MLock>(m));
    break;
  // inter-mds caps
  case MSG_MDS_INODEFILECAPS:
    handle_inode_file_caps(ref_cast<MInodeFileCaps>(m));
    break;
  // client sync
  case CEPH_MSG_CLIENT_CAPS:
    handle_client_caps(ref_cast<MClientCaps>(m));
    break;
  case CEPH_MSG_CLIENT_CAPRELEASE:
    handle_client_cap_release(ref_cast<MClientCapRelease>(m));
    break;
  case CEPH_MSG_CLIENT_LEASE:
    handle_client_lease(ref_cast<MClientLease>(m));
    break;
  default:
    derr << "locker unknown message " << m->get_type() << dendl;
    ceph_abort_msg("locker unknown message");
  }
}

void Locker::tick()
{
  scatter_tick();
  caps_tick();
}

/*
 * locks vs rejoin
 *
 *
 *
 */

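// Send an MLock message for this lock to every replica of its parent
// object, skipping replicas on MDS ranks that have not yet reached the
// rejoin state while the cluster is degraded.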
void Locker::send_lock_message(SimpleLock *lock, int msg)
{
  for (const auto &it : lock->get_parent()->get_replicas()) {
    if (mds->is_cluster_degraded() &&
        mds->mdsmap->get_state(it.first) < MDSMap::STATE_REJOIN)
      continue;
    auto m = make_message<MLock>(lock, msg, mds->get_nodeid());
    mds->send_message_mds(m, it.first);
  }
}

void Locker::send_lock_message(SimpleLock *lock, int msg, const bufferlist &data)
{
  for (const auto &it : lock->get_parent()->get_replicas()) {
    if (mds->is_cluster_degraded() &&
        mds->mdsmap->get_state(it.first) < MDSMap::STATE_REJOIN)
      continue;
    auto m = make_message<MLock>(lock, msg, mds->get_nodeid());
    m->set_data(data);
    mds->send_message_mds(m, it.first);
  }
}

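// Walk from 'in' up to its root, rdlocking each ancestor's snaplock (and,
// when want_layout is set, policylock, until a projected directory layout
// is found). On any lock miss, register a retry waiter, drop all locks
// and local auth pins, and return false.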
bool Locker::try_rdlock_snap_layout(CInode *in, MDRequestRef& mdr,
                                    int n, bool want_layout)
{
  dout(10) << __func__ << " " << *mdr << " " << *in << dendl;
  // rdlock ancestor snaps
  inodeno_t root;
  int depth = -1;
  bool found_locked = false;
  bool found_layout = false;

  if (want_layout)
    ceph_assert(n == 0);

  client_t client = mdr->get_client();

  CInode *t = in;
  while (true) {
    ++depth;
    if (!found_locked && mdr->is_rdlocked(&t->snaplock))
      found_locked = true;

    if (!found_locked) {
      if (!t->snaplock.can_rdlock(client)) {
        t->snaplock.add_waiter(SimpleLock::WAIT_RD, new C_MDS_RetryRequest(mdcache, mdr));
        goto failed;
      }
      t->snaplock.get_rdlock();
      mdr->locks.emplace(&t->snaplock, MutationImpl::LockOp::RDLOCK);
      dout(20) << " got rdlock on " << t->snaplock << " " << *t << dendl;
    }
    if (want_layout && !found_layout) {
      if (!mdr->is_rdlocked(&t->policylock)) {
        if (!t->policylock.can_rdlock(client)) {
          t->policylock.add_waiter(SimpleLock::WAIT_RD, new C_MDS_RetryRequest(mdcache, mdr));
          goto failed;
        }
        t->policylock.get_rdlock();
        mdr->locks.emplace(&t->policylock, MutationImpl::LockOp::RDLOCK);
        dout(20) << " got rdlock on " << t->policylock << " " << *t << dendl;
      }
      if (t->get_projected_inode()->has_layout()) {
        mdr->dir_layout = t->get_projected_inode()->layout;
        found_layout = true;
      }
    }
    CDentry* pdn = t->get_projected_parent_dn();
    if (!pdn) {
      root = t->ino();
      break;
    }
    t = pdn->get_dir()->get_inode();
  }

  mdr->dir_root[n] = root;
  mdr->dir_depth[n] = depth;
  return true;

failed:
  dout(10) << __func__ << " failed" << dendl;

  drop_locks(mdr.get(), nullptr);
  mdr->drop_local_auth_pins();
  return false;
}

struct MarkEventOnDestruct {
  MDRequestRef& mdr;
  std::string_view message;
  bool mark_event;
  MarkEventOnDestruct(MDRequestRef& _mdr, std::string_view _message) :
      mdr(_mdr),
      message(_message),
      mark_event(true) {}
  ~MarkEventOnDestruct() {
    if (mark_event)
      mdr->mark_event(message);
  }
};

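// Lock acquisition proceeds in phases: scan the lock vector and collect
// the objects that must be auth-pinned (adding versionlocks alongside
// xlocks), take local auth pins, request any remote auth pins from other
// ranks (returning false while waiting for their acks), and finally take
// the rd/wr/xlocks in sorted order, waiting and retrying on any miss.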
/* If this function returns false, the mdr has been placed
 * on the appropriate wait list */
bool Locker::acquire_locks(MDRequestRef& mdr,
                           MutationImpl::LockOpVec& lov,
                           CInode *auth_pin_freeze,
                           bool auth_pin_nonblocking)
{
  dout(10) << "acquire_locks " << *mdr << dendl;

  MarkEventOnDestruct marker(mdr, "failed to acquire_locks");

  client_t client = mdr->get_client();

  set<MDSCacheObject*> mustpin;  // items to authpin
  if (auth_pin_freeze)
    mustpin.insert(auth_pin_freeze);

  // xlocks
  for (size_t i = 0; i < lov.size(); ++i) {
    auto& p = lov[i];
    SimpleLock *lock = p.lock;
    MDSCacheObject *object = lock->get_parent();

    if (p.is_xlock()) {
      if ((lock->get_type() == CEPH_LOCK_ISNAP ||
           lock->get_type() == CEPH_LOCK_IPOLICY) &&
          mds->is_cluster_degraded() &&
          mdr->is_master() &&
          !mdr->is_queued_for_replay()) {
        // waiting for recovering mds, to guarantee replayed requests and mksnap/setlayout
        // get processed in proper order.
        bool wait = false;
        if (object->is_auth()) {
          if (!mdr->is_xlocked(lock)) {
            set<mds_rank_t> ls;
            object->list_replicas(ls);
            for (auto m : ls) {
              if (mds->mdsmap->get_state(m) < MDSMap::STATE_ACTIVE) {
                wait = true;
                break;
              }
            }
          }
        } else {
          // if the lock is the latest locked one, it's possible that slave mds got the lock
          // while there are recovering mds.
          if (!mdr->is_xlocked(lock) || mdr->is_last_locked(lock))
            wait = true;
        }
        if (wait) {
          dout(10) << " must xlock " << *lock << " " << *object
                   << ", waiting for cluster recovered" << dendl;
          mds->locker->drop_locks(mdr.get(), NULL);
          mdr->drop_local_auth_pins();
          mds->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache, mdr));
          return false;
        }
      }

      dout(20) << " must xlock " << *lock << " " << *object << dendl;

      mustpin.insert(object);

      // augment xlock with a versionlock?
      if (lock->get_type() == CEPH_LOCK_DN) {
        CDentry *dn = static_cast<CDentry*>(object);
        if (!dn->is_auth())
          continue;
        if (mdr->is_master()) {
          // master. wrlock versionlock so we can pipeline dentry updates to journal.
          lov.add_wrlock(&dn->versionlock, i + 1);
        } else {
          // slave. exclusively lock the dentry version (i.e. block other journal updates).
          // this makes rollback safe.
          lov.add_xlock(&dn->versionlock, i + 1);
        }
      }
      if (lock->get_type() >= CEPH_LOCK_IFIRST && lock->get_type() != CEPH_LOCK_IVERSION) {
        // inode version lock?
        CInode *in = static_cast<CInode*>(object);
        if (!in->is_auth())
          continue;
        if (mdr->is_master()) {
          // master. wrlock versionlock so we can pipeline inode updates to journal.
          lov.add_wrlock(&in->versionlock, i + 1);
        } else {
          // slave. exclusively lock the inode version (i.e. block other journal updates).
          // this makes rollback safe.
          lov.add_xlock(&in->versionlock, i + 1);
        }
      }
    } else if (p.is_wrlock()) {
      dout(20) << " must wrlock " << *lock << " " << *object << dendl;
      client_t _client = p.is_state_pin() ? lock->get_excl_client() : client;
      if (object->is_auth()) {
        mustpin.insert(object);
      } else if (!object->is_auth() &&
                 !lock->can_wrlock(_client) &&  // we might have to request a scatter
                 !mdr->is_slave()) {            // if we are slave (remote_wrlock), the master already authpinned
        dout(15) << " will also auth_pin " << *object
                 << " in case we need to request a scatter" << dendl;
        mustpin.insert(object);
      }
    } else if (p.is_remote_wrlock()) {
      dout(20) << " must remote_wrlock on mds." << p.wrlock_target << " "
               << *lock << " " << *object << dendl;
      mustpin.insert(object);
    } else if (p.is_rdlock()) {
      dout(20) << " must rdlock " << *lock << " " << *object << dendl;
      if (object->is_auth()) {
        mustpin.insert(object);
      } else if (!object->is_auth() &&
                 !lock->can_rdlock(client)) {  // we might have to request an rdlock
        dout(15) << " will also auth_pin " << *object
                 << " in case we need to request a rdlock" << dendl;
        mustpin.insert(object);
      }
    } else {
      ceph_assert(0 == "locker unknown lock operation");
    }
  }

  lov.sort_and_merge();

  // AUTH PINS
  map<mds_rank_t, set<MDSCacheObject*> > mustpin_remote;  // mds -> (object set)

  // can i auth pin them all now?
  marker.message = "failed to authpin local pins";
  for (const auto &p : mustpin) {
    MDSCacheObject *object = p;

    dout(10) << " must authpin " << *object << dendl;

    if (mdr->is_auth_pinned(object)) {
      if (object != (MDSCacheObject*)auth_pin_freeze)
        continue;
      if (mdr->more()->is_remote_frozen_authpin) {
        if (mdr->more()->rename_inode == auth_pin_freeze)
          continue;
        // unfreeze auth pin for the wrong inode
        mustpin_remote[mdr->more()->rename_inode->authority().first].size();
      }
    }

    if (!object->is_auth()) {
      if (mdr->lock_cache) {  // debug
        ceph_assert(mdr->lock_cache->opcode == CEPH_MDS_OP_UNLINK);
        CDentry *dn = mdr->dn[0].back();
        ceph_assert(dn->get_projected_linkage()->is_remote());
      }

      if (object->is_ambiguous_auth()) {
        // wait
        dout(10) << " ambiguous auth, waiting to authpin " << *object << dendl;
        mdr->disable_lock_cache();
        drop_locks(mdr.get());
        mdr->drop_local_auth_pins();
        marker.message = "waiting for single auth, object is being migrated";
        object->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_MDS_RetryRequest(mdcache, mdr));
        return false;
      }
      mustpin_remote[object->authority().first].insert(object);
      continue;
    }
    int err = 0;
    if (!object->can_auth_pin(&err)) {
      if (mdr->lock_cache) {
        CDir *dir;
        if (CInode *in = dynamic_cast<CInode*>(object)) {
          ceph_assert(!in->is_frozen_inode() && !in->is_frozen_auth_pin());
          dir = in->get_projected_parent_dir();
        } else if (CDentry *dn = dynamic_cast<CDentry*>(object)) {
          dir = dn->get_dir();
        } else {
          ceph_assert(0 == "unknown type of lock parent");
        }
        if (dir->get_inode() == mdr->lock_cache->get_dir_inode()) {
          // forcibly auth pin if there is lock cache on parent dir
          continue;
        }

        {  // debug
          ceph_assert(mdr->lock_cache->opcode == CEPH_MDS_OP_UNLINK);
          CDentry *dn = mdr->dn[0].back();
          ceph_assert(dn->get_projected_linkage()->is_remote());
        }
      }

      // wait
      mdr->disable_lock_cache();
      drop_locks(mdr.get());
      mdr->drop_local_auth_pins();
      if (auth_pin_nonblocking) {
        dout(10) << " can't auth_pin (freezing?) " << *object << ", nonblocking" << dendl;
        mdr->aborted = true;
        return false;
      }
      if (err == MDSCacheObject::ERR_EXPORTING_TREE) {
        marker.message = "failed to authpin, subtree is being exported";
      } else if (err == MDSCacheObject::ERR_FRAGMENTING_DIR) {
        marker.message = "failed to authpin, dir is being fragmented";
      } else if (err == MDSCacheObject::ERR_EXPORTING_INODE) {
        marker.message = "failed to authpin, inode is being exported";
      }
      dout(10) << " can't auth_pin (freezing?), waiting to authpin " << *object << dendl;
      object->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_MDS_RetryRequest(mdcache, mdr));

      if (mdr->is_any_remote_auth_pin())
        notify_freeze_waiter(object);

      return false;
    }
  }

  // ok, grab local auth pins
  for (const auto& p : mustpin) {
    MDSCacheObject *object = p;
    if (mdr->is_auth_pinned(object)) {
      dout(10) << " already auth_pinned " << *object << dendl;
    } else if (object->is_auth()) {
      dout(10) << " auth_pinning " << *object << dendl;
      mdr->auth_pin(object);
    }
  }

  // request remote auth_pins
  if (!mustpin_remote.empty()) {
    marker.message = "requesting remote authpins";
    for (const auto& p : mdr->object_states) {
      if (p.second.remote_auth_pinned == MDS_RANK_NONE)
        continue;
      ceph_assert(p.second.remote_auth_pinned == p.first->authority().first);
      auto q = mustpin_remote.find(p.second.remote_auth_pinned);
      if (q != mustpin_remote.end())
        q->second.insert(p.first);
    }

    for (auto& p : mustpin_remote) {
      dout(10) << "requesting remote auth_pins from mds." << p.first << dendl;

      // wait for active auth
      if (mds->is_cluster_degraded() &&
          !mds->mdsmap->is_clientreplay_or_active_or_stopping(p.first)) {
        dout(10) << " mds." << p.first << " is not active" << dendl;
        if (mdr->more()->waiting_on_slave.empty())
          mds->wait_for_active_peer(p.first, new C_MDS_RetryRequest(mdcache, mdr));
        return false;
      }

      auto req = make_message<MMDSSlaveRequest>(mdr->reqid, mdr->attempt,
                                                MMDSSlaveRequest::OP_AUTHPIN);
      for (auto& o : p.second) {
        dout(10) << " req remote auth_pin of " << *o << dendl;
        MDSCacheObjectInfo info;
        o->set_object_info(info);
        req->get_authpins().push_back(info);
        if (o == auth_pin_freeze)
          o->set_object_info(req->get_authpin_freeze());
        mdr->pin(o);
      }
      if (auth_pin_nonblocking)
        req->mark_nonblocking();
      else if (!mdr->locks.empty())
        req->mark_notify_blocking();

      mds->send_message_mds(req, p.first);

      // put in waiting list
      auto ret = mdr->more()->waiting_on_slave.insert(p.first);
      ceph_assert(ret.second);
    }
    return false;
  }

  // caps i'll need to issue
  set<CInode*> issue_set;
  bool result = false;

  // acquire locks.
  // make sure they match currently acquired locks.
  for (const auto& p : lov) {
    auto lock = p.lock;
    if (p.is_xlock()) {
      if (mdr->is_xlocked(lock)) {
        dout(10) << " already xlocked " << *lock << " " << *lock->get_parent() << dendl;
        continue;
      }
      if (mdr->locking && lock != mdr->locking)
        cancel_locking(mdr.get(), &issue_set);
      if (!xlock_start(lock, mdr)) {
        marker.message = "failed to xlock, waiting";
        goto out;
      }
      dout(10) << " got xlock on " << *lock << " " << *lock->get_parent() << dendl;
    } else if (p.is_wrlock() || p.is_remote_wrlock()) {
      auto it = mdr->locks.find(lock);
      if (p.is_remote_wrlock()) {
        if (it != mdr->locks.end() && it->is_remote_wrlock()) {
          dout(10) << " already remote_wrlocked " << *lock << " " << *lock->get_parent() << dendl;
        } else {
          if (mdr->locking && lock != mdr->locking)
            cancel_locking(mdr.get(), &issue_set);
          marker.message = "waiting for remote wrlocks";
          remote_wrlock_start(lock, p.wrlock_target, mdr);
          goto out;
        }
      }
      if (p.is_wrlock()) {
        if (it != mdr->locks.end() && it->is_wrlock()) {
          dout(10) << " already wrlocked " << *lock << " " << *lock->get_parent() << dendl;
          continue;
        }
        client_t _client = p.is_state_pin() ? lock->get_excl_client() : client;
        if (p.is_remote_wrlock()) {
          // nowait if we have already gotten remote wrlock
          if (!wrlock_try(lock, mdr, _client)) {
            marker.message = "failed to wrlock, dropping remote wrlock and waiting";
            // can't take the wrlock because the scatter lock is gathering. need to
            // release the remote wrlock, so that the gathering process can finish.
            ceph_assert(it != mdr->locks.end());
            remote_wrlock_finish(it, mdr.get());
            remote_wrlock_start(lock, p.wrlock_target, mdr);
            goto out;
          }
        } else {
          if (!wrlock_start(p, mdr)) {
            ceph_assert(!p.is_remote_wrlock());
            marker.message = "failed to wrlock, waiting";
            goto out;
          }
        }
        dout(10) << " got wrlock on " << *lock << " " << *lock->get_parent() << dendl;
      }
    } else {
      if (mdr->is_rdlocked(lock)) {
        dout(10) << " already rdlocked " << *lock << " " << *lock->get_parent() << dendl;
        continue;
      }

      ceph_assert(mdr->is_master());
      if (lock->needs_recover()) {
        if (mds->is_cluster_degraded()) {
          if (!mdr->is_queued_for_replay()) {
            // see comments in SimpleLock::set_state_rejoin() and
            // ScatterLock::encode_state_for_rejoin()
            drop_locks(mdr.get());
            mds->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache, mdr));
            dout(10) << " rejoin recovering " << *lock << " " << *lock->get_parent()
                     << ", waiting for cluster recovered" << dendl;
            marker.message = "rejoin recovering lock, waiting for cluster recovered";
            return false;
          }
        } else {
          lock->clear_need_recover();
        }
      }

      if (!rdlock_start(lock, mdr)) {
        marker.message = "failed to rdlock, waiting";
        goto out;
      }
      dout(10) << " got rdlock on " << *lock << " " << *lock->get_parent() << dendl;
    }
  }

  mdr->set_mds_stamp(ceph_clock_now());
  result = true;
  marker.message = "acquired locks";

 out:
  issue_caps_set(issue_set);
  return result;
}

void Locker::notify_freeze_waiter(MDSCacheObject *o)
{
  CDir *dir = NULL;
  if (CInode *in = dynamic_cast<CInode*>(o)) {
    if (!in->is_root())
      dir = in->get_parent_dir();
  } else if (CDentry *dn = dynamic_cast<CDentry*>(o)) {
    dir = dn->get_dir();
  } else {
    dir = dynamic_cast<CDir*>(o);
    ceph_assert(dir);
  }
  if (dir) {
    if (dir->is_freezing_dir())
      mdcache->fragment_freeze_inc_num_waiters(dir);
    if (dir->is_freezing_tree()) {
      while (!dir->is_freezing_tree_root())
        dir = dir->get_parent_dir();
      mdcache->migrator->export_freeze_inc_num_waiters(dir);
    }
  }
}

void Locker::set_xlocks_done(MutationImpl *mut, bool skip_dentry)
{
  for (const auto &p : mut->locks) {
    if (!p.is_xlock())
      continue;
    MDSCacheObject *obj = p.lock->get_parent();
    ceph_assert(obj->is_auth());
    if (skip_dentry &&
        (p.lock->get_type() == CEPH_LOCK_DN || p.lock->get_type() == CEPH_LOCK_DVERSION))
      continue;
    dout(10) << "set_xlocks_done on " << *p.lock << " " << *obj << dendl;
    p.lock->set_xlock_done();
  }
}

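// Drop this mutation's xlocks and wrlocks (and rdlocks too when
// drop_rdlocks is set). Remotely held locks are released by sending
// OP_DROPLOCKS to the slave MDS ranks that hold them.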
void Locker::_drop_locks(MutationImpl *mut, set<CInode*> *pneed_issue,
                         bool drop_rdlocks)
{
  set<mds_rank_t> slaves;

  for (auto it = mut->locks.begin(); it != mut->locks.end(); ) {
    SimpleLock *lock = it->lock;
    MDSCacheObject *obj = lock->get_parent();

    if (it->is_xlock()) {
      if (obj->is_auth()) {
        bool ni = false;
        xlock_finish(it++, mut, &ni);
        if (ni)
          pneed_issue->insert(static_cast<CInode*>(obj));
      } else {
        ceph_assert(lock->get_sm()->can_remote_xlock);
        slaves.insert(obj->authority().first);
        lock->put_xlock();
        mut->locks.erase(it++);
      }
    } else if (it->is_wrlock() || it->is_remote_wrlock()) {
      if (it->is_remote_wrlock()) {
        slaves.insert(it->wrlock_target);
        it->clear_remote_wrlock();
      }
      if (it->is_wrlock()) {
        bool ni = false;
        wrlock_finish(it++, mut, &ni);
        if (ni)
          pneed_issue->insert(static_cast<CInode*>(obj));
      } else {
        mut->locks.erase(it++);
      }
    } else if (drop_rdlocks && it->is_rdlock()) {
      bool ni = false;
      rdlock_finish(it++, mut, &ni);
      if (ni)
        pneed_issue->insert(static_cast<CInode*>(obj));
    } else {
      ++it;
    }
  }

  if (drop_rdlocks) {
    if (mut->lock_cache) {
      put_lock_cache(mut->lock_cache);
      mut->lock_cache = nullptr;
    }
  }

  for (set<mds_rank_t>::iterator p = slaves.begin(); p != slaves.end(); ++p) {
    if (!mds->is_cluster_degraded() ||
        mds->mdsmap->get_state(*p) >= MDSMap::STATE_REJOIN) {
      dout(10) << "_drop_non_rdlocks dropping remote locks on mds." << *p << dendl;
      auto slavereq = make_message<MMDSSlaveRequest>(mut->reqid, mut->attempt,
                                                     MMDSSlaveRequest::OP_DROPLOCKS);
      mds->send_message_mds(slavereq, *p);
    }
  }
}

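// Abort the lock acquisition recorded in mut->locking: undo a PREXLOCK,
// or park a LOCK->XLOCK transition in XLOCKDONE so eval_gather() can
// settle the lock state.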
void Locker::cancel_locking(MutationImpl *mut, set<CInode*> *pneed_issue)
{
  SimpleLock *lock = mut->locking;
  ceph_assert(lock);
  dout(10) << "cancel_locking " << *lock << " on " << *mut << dendl;

  if (lock->get_parent()->is_auth()) {
    bool need_issue = false;
    if (lock->get_state() == LOCK_PREXLOCK) {
      _finish_xlock(lock, -1, &need_issue);
    } else if (lock->get_state() == LOCK_LOCK_XLOCK) {
      lock->set_state(LOCK_XLOCKDONE);
      eval_gather(lock, true, &need_issue);
    }
    if (need_issue)
      pneed_issue->insert(static_cast<CInode *>(lock->get_parent()));
  }
  mut->finish_locking(lock);
}

void Locker::drop_locks(MutationImpl *mut, set<CInode*> *pneed_issue)
{
  // leftover locks
  set<CInode*> my_need_issue;
  if (!pneed_issue)
    pneed_issue = &my_need_issue;

  if (mut->locking)
    cancel_locking(mut, pneed_issue);
  _drop_locks(mut, pneed_issue, true);

  if (pneed_issue == &my_need_issue)
    issue_caps_set(*pneed_issue);
  mut->locking_state = 0;
}

void Locker::drop_non_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
{
  set<CInode*> my_need_issue;
  if (!pneed_issue)
    pneed_issue = &my_need_issue;

  _drop_locks(mut, pneed_issue, false);

  if (pneed_issue == &my_need_issue)
    issue_caps_set(*pneed_issue);
}

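// Once an early (unsafe) reply has been sent, rdlocks can be released,
// except snaplock/policylock rdlocks, which are kept so that later
// mksnap/setlayout requests on other MDSes wait for this unsafe request.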
void Locker::drop_rdlocks_for_early_reply(MutationImpl *mut)
{
  set<CInode*> need_issue;

  for (auto it = mut->locks.begin(); it != mut->locks.end(); ) {
    if (!it->is_rdlock()) {
      ++it;
      continue;
    }
    SimpleLock *lock = it->lock;
    // make later mksnap/setlayout (at other mds) wait for this unsafe request
    if (lock->get_type() == CEPH_LOCK_ISNAP ||
        lock->get_type() == CEPH_LOCK_IPOLICY) {
      ++it;
      continue;
    }
    bool ni = false;
    rdlock_finish(it++, mut, &ni);
    if (ni)
      need_issue.insert(static_cast<CInode*>(lock->get_parent()));
  }

  issue_caps_set(need_issue);
}

void Locker::drop_locks_for_fragment_unfreeze(MutationImpl *mut)
{
  set<CInode*> need_issue;

  for (auto it = mut->locks.begin(); it != mut->locks.end(); ) {
    SimpleLock *lock = it->lock;
    if (lock->get_type() == CEPH_LOCK_IDFT) {
      ++it;
      continue;
    }
    bool ni = false;
    wrlock_finish(it++, mut, &ni);
    if (ni)
      need_issue.insert(static_cast<CInode*>(lock->get_parent()));
  }
  issue_caps_set(need_issue);
}

class C_MDL_DropCache : public LockerContext {
  MDLockCache *lock_cache;
public:
  C_MDL_DropCache(Locker *l, MDLockCache *lc) :
    LockerContext(l), lock_cache(lc) { }
  void finish(int r) override {
    locker->drop_locks(lock_cache);
    lock_cache->cleanup();
    delete lock_cache;
  }
};

void Locker::put_lock_cache(MDLockCache* lock_cache)
{
  ceph_assert(lock_cache->ref > 0);
  if (--lock_cache->ref > 0)
    return;

  ceph_assert(lock_cache->invalidating);

  CInode *diri = lock_cache->get_dir_inode();
  for (auto dir : lock_cache->auth_pinned_dirfrags) {
    if (dir->get_inode() != diri)
      continue;
    dir->enable_frozen_inode();
  }

  mds->queue_waiter(new C_MDL_DropCache(this, lock_cache));
}

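// Map a cached operation to the directory cap bit that gates it; the
// lock cache stays usable only while the client holds this cap.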
int Locker::get_cap_bit_for_lock_cache(int op)
{
  switch(op) {
  case CEPH_MDS_OP_CREATE:
    return CEPH_CAP_DIR_CREATE;
  case CEPH_MDS_OP_UNLINK:
    return CEPH_CAP_DIR_UNLINK;
  default:
    ceph_assert(0 == "unsupported operation");
    return 0;
  }
}

void Locker::invalidate_lock_cache(MDLockCache *lock_cache)
{
  ceph_assert(lock_cache->item_cap_lock_cache.is_on_list());
  if (lock_cache->invalidating) {
    ceph_assert(!lock_cache->client_cap);
  } else {
    lock_cache->invalidating = true;
    lock_cache->detach_all();
  }

  Capability *cap = lock_cache->client_cap;
  if (cap) {
    int cap_bit = get_cap_bit_for_lock_cache(lock_cache->opcode);
    cap->clear_lock_cache_allowed(cap_bit);
    if (cap->issued() & cap_bit)
      issue_caps(lock_cache->get_dir_inode(), cap);
    else
      cap = nullptr;
  }

  if (!cap) {
    lock_cache->item_cap_lock_cache.remove_myself();
    put_lock_cache(lock_cache);
  }
}

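// Called when a client's issued caps change: any invalidating lock cache
// whose gating cap bit is no longer issued can now be dropped.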
void Locker::eval_lock_caches(Capability *cap)
{
  for (auto p = cap->lock_caches.begin(); !p.end(); ) {
    MDLockCache *lock_cache = *p;
    ++p;
    if (!lock_cache->invalidating)
      continue;
    int cap_bit = get_cap_bit_for_lock_cache(lock_cache->opcode);
    if (!(cap->issued() & cap_bit)) {
      lock_cache->item_cap_lock_cache.remove_myself();
      put_lock_cache(lock_cache);
    }
  }
}

// ask lock caches to release auth pins
void Locker::invalidate_lock_caches(CDir *dir)
{
  dout(10) << "invalidate_lock_caches on " << *dir << dendl;
  auto &lock_caches = dir->lock_caches_with_auth_pins;
  while (!lock_caches.empty()) {
    invalidate_lock_cache(lock_caches.front()->parent);
  }
}

// ask lock caches to release locks
void Locker::invalidate_lock_caches(SimpleLock *lock)
{
  dout(10) << "invalidate_lock_caches " << *lock << " on " << *lock->get_parent() << dendl;
  while (lock->is_cached()) {
    invalidate_lock_cache(lock->get_first_cache());
  }
}

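// Opportunistically set up a lock cache so that later requests of the
// same type (create/unlink) from this client can reuse the locks and
// auth pins taken by this request. Any failed precondition (non-auth
// dir inode, outstanding slave requests, unstable locks, unpinnable
// dirfrags) makes this a noop.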
void Locker::create_lock_cache(MDRequestRef& mdr, CInode *diri, file_layout_t *dir_layout)
{
  if (mdr->lock_cache)
    return;

  client_t client = mdr->get_client();
  int opcode = mdr->client_request->get_op();
  dout(10) << "create_lock_cache for client." << client << "/" << ceph_mds_op_name(opcode) << " on " << *diri << dendl;

  if (!diri->is_auth()) {
    dout(10) << " dir inode is not auth, noop" << dendl;
    return;
  }

  if (mdr->has_more() && !mdr->more()->slaves.empty()) {
    dout(10) << " there are slaves requests for " << *mdr << ", noop" << dendl;
    return;
  }

  Capability *cap = diri->get_client_cap(client);
  if (!cap) {
    dout(10) << " there is no cap for client." << client << ", noop" << dendl;
    return;
  }

  for (auto p = cap->lock_caches.begin(); !p.end(); ++p) {
    if ((*p)->opcode == opcode) {
      dout(10) << " lock cache already exists for " << ceph_mds_op_name(opcode) << ", noop" << dendl;
      return;
    }
  }

  set<MDSCacheObject*> ancestors;
  for (CInode *in = diri; ; ) {
    CDentry *pdn = in->get_projected_parent_dn();
    if (!pdn)
      break;
    // ancestors.insert(pdn);
    in = pdn->get_dir()->get_inode();
    ancestors.insert(in);
  }

  for (auto& p : mdr->object_states) {
    if (p.first != diri && !ancestors.count(p.first))
      continue;
    auto& stat = p.second;
    if (stat.auth_pinned) {
      if (!p.first->can_auth_pin()) {
        dout(10) << " can't auth_pin(freezing?) lock parent " << *p.first << ", noop" << dendl;
        return;
      }
      if (CInode *in = dynamic_cast<CInode*>(p.first); in->is_parent_projected()) {
        CDir *dir = in->get_projected_parent_dir();
        if (!dir->can_auth_pin()) {
          dout(10) << " can't auth_pin(!auth|freezing?) dirfrag " << *dir << ", noop" << dendl;
          return;
        }
      }
    }
  }

  std::vector<CDir*> dfv;
  dfv.reserve(diri->get_num_dirfrags());

  diri->get_dirfrags(dfv);
  for (auto dir : dfv) {
    if (!dir->is_auth() || !dir->can_auth_pin()) {
      dout(10) << " can't auth_pin(!auth|freezing?) dirfrag " << *dir << ", noop" << dendl;
      return;
    }
    if (dir->is_any_freezing_or_frozen_inode()) {
      dout(10) << " there is freezing/frozen inode in " << *dir << ", noop" << dendl;
      return;
    }
  }

  for (auto& p : mdr->locks) {
    MDSCacheObject *obj = p.lock->get_parent();
    if (obj != diri && !ancestors.count(obj))
      continue;
    if (!p.lock->is_stable()) {
      dout(10) << " unstable " << *p.lock << " on " << *obj << ", noop" << dendl;
      return;
    }
  }

  auto lock_cache = new MDLockCache(cap, opcode);
  if (dir_layout)
    lock_cache->set_dir_layout(*dir_layout);
  cap->set_lock_cache_allowed(get_cap_bit_for_lock_cache(opcode));

  for (auto dir : dfv) {
    // prevent subtree migration
    lock_cache->auth_pin(dir);
    // prevent frozen inode
    dir->disable_frozen_inode();
  }

  for (auto& p : mdr->object_states) {
    if (p.first != diri && !ancestors.count(p.first))
      continue;
    auto& stat = p.second;
    if (stat.auth_pinned)
      lock_cache->auth_pin(p.first);
    else
      lock_cache->pin(p.first);

    if (CInode *in = dynamic_cast<CInode*>(p.first)) {
      CDentry *pdn = in->get_projected_parent_dn();
      if (pdn)
        dfv.push_back(pdn->get_dir());
    } else if (CDentry *dn = dynamic_cast<CDentry*>(p.first)) {
      dfv.push_back(dn->get_dir());
    } else {
      ceph_assert(0 == "unknown type of lock parent");
    }
  }
  lock_cache->attach_dirfrags(std::move(dfv));

  for (auto it = mdr->locks.begin(); it != mdr->locks.end(); ) {
    MDSCacheObject *obj = it->lock->get_parent();
    if (obj != diri && !ancestors.count(obj)) {
      ++it;
      continue;
    }
    unsigned lock_flag = 0;
    if (it->is_wrlock()) {
      // skip wrlocks that were added by MDCache::predirty_journal_parent()
      if (obj == diri)
        lock_flag = MutationImpl::LockOp::WRLOCK;
    } else {
      ceph_assert(it->is_rdlock());
      lock_flag = MutationImpl::LockOp::RDLOCK;
    }
    if (lock_flag) {
      lock_cache->emplace_lock(it->lock, lock_flag);
      mdr->locks.erase(it++);
    } else {
      ++it;
    }
  }
  lock_cache->attach_locks();

  lock_cache->ref++;
  mdr->lock_cache = lock_cache;
}

bool Locker::find_and_attach_lock_cache(MDRequestRef& mdr, CInode *diri)
{
  if (mdr->lock_cache)
    return true;

  Capability *cap = diri->get_client_cap(mdr->get_client());
  if (!cap)
    return false;

  int opcode = mdr->client_request->get_op();
  for (auto p = cap->lock_caches.begin(); !p.end(); ++p) {
    MDLockCache *lock_cache = *p;
    if (lock_cache->opcode == opcode) {
      dout(10) << "found lock cache for " << ceph_mds_op_name(opcode) << " on " << *diri << dendl;
      mdr->lock_cache = lock_cache;
      mdr->lock_cache->ref++;
      return true;
    }
  }
  return false;
}

// generics

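// Re-evaluate a lock that is in a transitional (gathering) state. If no
// rdlock/wrlock/xlock/lease or issued client caps still conflict with the
// next state, move the lock forward: a replica acks the auth MDS, while
// the auth pushes the new state out to replicas and wakes waiters.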
void Locker::eval_gather(SimpleLock *lock, bool first, bool *pneed_issue, MDSContext::vec *pfinishers)
{
  dout(10) << "eval_gather " << *lock << " on " << *lock->get_parent() << dendl;
  ceph_assert(!lock->is_stable());

  int next = lock->get_next_state();

  CInode *in = 0;
  bool caps = lock->get_cap_shift();
  if (lock->get_type() != CEPH_LOCK_DN)
    in = static_cast<CInode *>(lock->get_parent());

  bool need_issue = false;

  int loner_issued = 0, other_issued = 0, xlocker_issued = 0;
  ceph_assert(!caps || in != NULL);
  if (caps && in->is_head()) {
    in->get_caps_issued(&loner_issued, &other_issued, &xlocker_issued,
                        lock->get_cap_shift(), lock->get_cap_mask());
    dout(10) << " next state is " << lock->get_state_name(next)
             << " issued/allows loner " << gcap_string(loner_issued)
             << "/" << gcap_string(lock->gcaps_allowed(CAP_LONER, next))
             << " xlocker " << gcap_string(xlocker_issued)
             << "/" << gcap_string(lock->gcaps_allowed(CAP_XLOCKER, next))
             << " other " << gcap_string(other_issued)
             << "/" << gcap_string(lock->gcaps_allowed(CAP_ANY, next))
             << dendl;

    if (first && ((~lock->gcaps_allowed(CAP_ANY, next) & other_issued) ||
                  (~lock->gcaps_allowed(CAP_LONER, next) & loner_issued) ||
                  (~lock->gcaps_allowed(CAP_XLOCKER, next) & xlocker_issued)))
      need_issue = true;
  }

#define IS_TRUE_AND_LT_AUTH(x, auth) (x && ((auth && x <= AUTH) || (!auth && x < AUTH)))
  bool auth = lock->get_parent()->is_auth();
  if (!lock->is_gathering() &&
      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_rdlock, auth) || !lock->is_rdlocked()) &&
      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_wrlock, auth) || !lock->is_wrlocked()) &&
      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_xlock, auth) || !lock->is_xlocked()) &&
      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_lease, auth) || !lock->is_leased()) &&
      !(lock->get_parent()->is_auth() && lock->is_flushing()) &&  // i.e. wait for scatter_writebehind!
      (!caps || ((~lock->gcaps_allowed(CAP_ANY, next) & other_issued) == 0 &&
                 (~lock->gcaps_allowed(CAP_LONER, next) & loner_issued) == 0 &&
                 (~lock->gcaps_allowed(CAP_XLOCKER, next) & xlocker_issued) == 0)) &&
      lock->get_state() != LOCK_SYNC_MIX2 &&  // these states need an explicit trigger from the auth mds
      lock->get_state() != LOCK_MIX_SYNC2
      ) {
    dout(7) << "eval_gather finished gather on " << *lock
            << " on " << *lock->get_parent() << dendl;

    if (lock->get_sm() == &sm_filelock) {
      ceph_assert(in);
      if (in->state_test(CInode::STATE_RECOVERING)) {
        dout(7) << "eval_gather finished gather, but still recovering" << dendl;
        return;
      } else if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
        dout(7) << "eval_gather finished gather, but need to recover" << dendl;
        mds->mdcache->queue_file_recover(in);
        mds->mdcache->do_file_recover();
        return;
      }
    }

    if (!lock->get_parent()->is_auth()) {
      // replica: tell auth
      mds_rank_t auth = lock->get_parent()->authority().first;

      if (lock->get_parent()->is_rejoining() &&
          mds->mdsmap->get_state(auth) == MDSMap::STATE_REJOIN) {
        dout(7) << "eval_gather finished gather, but still rejoining "
                << *lock->get_parent() << dendl;
        return;
      }

      if (!mds->is_cluster_degraded() ||
          mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) {
        switch (lock->get_state()) {
        case LOCK_SYNC_LOCK:
          mds->send_message_mds(make_message<MLock>(lock, LOCK_AC_LOCKACK, mds->get_nodeid()), auth);
          break;

        case LOCK_MIX_SYNC:
          {
            auto reply = make_message<MLock>(lock, LOCK_AC_SYNCACK, mds->get_nodeid());
            lock->encode_locked_state(reply->get_data());
            mds->send_message_mds(reply, auth);
            next = LOCK_MIX_SYNC2;
            (static_cast<ScatterLock *>(lock))->start_flush();
          }
          break;

        case LOCK_MIX_SYNC2:
          (static_cast<ScatterLock *>(lock))->finish_flush();
          (static_cast<ScatterLock *>(lock))->clear_flushed();

        case LOCK_SYNC_MIX2:
          // do nothing, we already acked
          break;

        case LOCK_SYNC_MIX:
          {
            auto reply = make_message<MLock>(lock, LOCK_AC_MIXACK, mds->get_nodeid());
            mds->send_message_mds(reply, auth);
            next = LOCK_SYNC_MIX2;
          }
          break;

        case LOCK_MIX_LOCK:
          {
            bufferlist data;
            lock->encode_locked_state(data);
            mds->send_message_mds(make_message<MLock>(lock, LOCK_AC_LOCKACK, mds->get_nodeid(), data), auth);
            (static_cast<ScatterLock *>(lock))->start_flush();
            // we'll get an AC_LOCKFLUSHED to complete
          }
          break;

        default:
          ceph_abort();
        }
      }
    } else {
      // auth

      // once the first (local) stage of mix->lock gather complete we can
      // gather from replicas
      if (lock->get_state() == LOCK_MIX_LOCK &&
          lock->get_parent()->is_replicated()) {
        dout(10) << " finished (local) gather for mix->lock, now gathering from replicas" << dendl;
        send_lock_message(lock, LOCK_AC_LOCK);
        lock->init_gather();
        lock->set_state(LOCK_MIX_LOCK2);
        return;
      }

      if (lock->is_dirty() && !lock->is_flushed()) {
        scatter_writebehind(static_cast<ScatterLock *>(lock));
        mds->mdlog->flush();
        return;
      }
      lock->clear_flushed();

      switch (lock->get_state()) {
        // to mixed
      case LOCK_TSYN_MIX:
      case LOCK_SYNC_MIX:
      case LOCK_EXCL_MIX:
      case LOCK_XSYN_MIX:
        in->start_scatter(static_cast<ScatterLock *>(lock));
        if (lock->get_parent()->is_replicated()) {
          bufferlist softdata;
          lock->encode_locked_state(softdata);
          send_lock_message(lock, LOCK_AC_MIX, softdata);
        }
        (static_cast<ScatterLock *>(lock))->clear_scatter_wanted();
        break;

      case LOCK_XLOCK:
      case LOCK_XLOCKDONE:
        if (next != LOCK_SYNC)
          break;
        // fall-thru

        // to sync
      case LOCK_EXCL_SYNC:
      case LOCK_LOCK_SYNC:
      case LOCK_MIX_SYNC:
      case LOCK_XSYN_SYNC:
        if (lock->get_parent()->is_replicated()) {
          bufferlist softdata;
          lock->encode_locked_state(softdata);
          send_lock_message(lock, LOCK_AC_SYNC, softdata);
        }
        break;
      }

    }

    lock->set_state(next);

    if (lock->get_parent()->is_auth() &&
        lock->is_stable())
      lock->get_parent()->auth_unpin(lock);

    // drop loner before doing waiters
    if (caps &&
        in->is_head() &&
        in->is_auth() &&
        in->get_wanted_loner() != in->get_loner()) {
      dout(10) << " trying to drop loner" << dendl;
      if (in->try_drop_loner()) {
        dout(10) << " dropped loner" << dendl;
        need_issue = true;
      }
    }

    if (pfinishers)
      lock->take_waiting(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD|SimpleLock::WAIT_XLOCK,
                         *pfinishers);
    else
      lock->finish_waiters(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD|SimpleLock::WAIT_XLOCK);

    if (caps && in->is_head())
      need_issue = true;

    if (lock->get_parent()->is_auth() &&
        lock->is_stable())
      try_eval(lock, &need_issue);
  }

  if (need_issue) {
    if (pneed_issue)
      *pneed_issue = true;
    else if (in->is_head())
      issue_caps(in);
  }

}

bool Locker::eval(CInode *in, int mask, bool caps_imported)
{
  bool need_issue = caps_imported;
  MDSContext::vec finishers;

  dout(10) << "eval " << mask << " " << *in << dendl;

  // choose loner?
  if (in->is_auth() && in->is_head()) {
    client_t orig_loner = in->get_loner();
    if (in->choose_ideal_loner()) {
      dout(10) << "eval set loner: client." << orig_loner << " -> client." << in->get_loner() << dendl;
      need_issue = true;
      mask = -1;
    } else if (in->get_wanted_loner() != in->get_loner()) {
      dout(10) << "eval want loner: client." << in->get_wanted_loner() << " but failed to set it" << dendl;
      mask = -1;
    }
  }

 retry:
  if (mask & CEPH_LOCK_IFILE)
    eval_any(&in->filelock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IAUTH)
    eval_any(&in->authlock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_ILINK)
    eval_any(&in->linklock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IXATTR)
    eval_any(&in->xattrlock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_INEST)
    eval_any(&in->nestlock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IFLOCK)
    eval_any(&in->flocklock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IPOLICY)
    eval_any(&in->policylock, &need_issue, &finishers, caps_imported);

  // drop loner?
  if (in->is_auth() && in->is_head() && in->get_wanted_loner() != in->get_loner()) {
    if (in->try_drop_loner()) {
      need_issue = true;
      if (in->get_wanted_loner() >= 0) {
        dout(10) << "eval end set loner to client." << in->get_loner() << dendl;
        bool ok = in->try_set_loner();
        ceph_assert(ok);
        mask = -1;
        goto retry;
      }
    }
  }

  finish_contexts(g_ceph_context, finishers);

  if (need_issue && in->is_head())
    issue_caps(in);

  dout(10) << "eval done" << dendl;
  return need_issue;
}

class C_Locker_Eval : public LockerContext {
  MDSCacheObject *p;
  int mask;
public:
  C_Locker_Eval(Locker *l, MDSCacheObject *pp, int m) : LockerContext(l), p(pp), mask(m) {
    // We are used as an MDSCacheObject waiter, so should
    // only be invoked by someone already holding the big lock.
    ceph_assert(ceph_mutex_is_locked_by_me(locker->mds->mds_lock));
    p->get(MDSCacheObject::PIN_PTRWAITER);
  }
  void finish(int r) override {
    locker->try_eval(p, mask);
    p->put(MDSCacheObject::PIN_PTRWAITER);
  }
};

void Locker::try_eval(MDSCacheObject *p, int mask)
{
  // unstable and ambiguous auth?
  if (p->is_ambiguous_auth()) {
    dout(7) << "try_eval ambiguous auth, waiting on " << *p << dendl;
    p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_Eval(this, p, mask));
    return;
  }

  if (p->is_auth() && p->is_frozen()) {
    dout(7) << "try_eval frozen, waiting on " << *p << dendl;
    p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, mask));
    return;
  }

  if (mask & CEPH_LOCK_DN) {
    ceph_assert(mask == CEPH_LOCK_DN);
    bool need_issue = false;  // ignore this, no caps on dentries
    CDentry *dn = static_cast<CDentry *>(p);
    eval_any(&dn->lock, &need_issue);
  } else {
    CInode *in = static_cast<CInode *>(p);
    eval(in, mask);
  }
}

void Locker::try_eval(SimpleLock *lock, bool *pneed_issue)
{
  MDSCacheObject *p = lock->get_parent();

  // unstable and ambiguous auth?
  if (p->is_ambiguous_auth()) {
    dout(7) << "try_eval " << *lock << " ambiguousauth, waiting on " << *p << dendl;
    p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_Eval(this, p, lock->get_type()));
    return;
  }

  if (!p->is_auth()) {
    dout(7) << "try_eval " << *lock << " not auth for " << *p << dendl;
    return;
  }

  if (p->is_frozen()) {
    dout(7) << "try_eval " << *lock << " frozen, waiting on " << *p << dendl;
    p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, lock->get_type()));
    return;
  }

  /*
   * We could have a situation like:
   *
   * - mds A authpins item on mds B
   * - mds B starts to freeze tree containing item
   * - mds A tries wrlock_start on A, sends REQSCATTER to B
   * - mds B lock is unstable, sets scatter_wanted
   * - mds B lock stabilizes, calls try_eval.
   *
   * We can defer while freezing without causing a deadlock.  Honor
   * scatter_wanted flag here.  This will never get deferred by the
   * checks above due to the auth_pin held by the master.
   */
  if (lock->is_scatterlock()) {
    ScatterLock *slock = static_cast<ScatterLock *>(lock);
    if (slock->get_scatter_wanted() &&
        slock->get_state() != LOCK_MIX) {
      scatter_mix(slock, pneed_issue);
      if (!lock->is_stable())
        return;
    } else if (slock->get_unscatter_wanted() &&
               slock->get_state() != LOCK_LOCK) {
      simple_lock(slock, pneed_issue);
      if (!lock->is_stable()) {
        return;
      }
    }
  }

  if (lock->get_type() != CEPH_LOCK_DN &&
      lock->get_type() != CEPH_LOCK_ISNAP &&
      lock->get_type() != CEPH_LOCK_IPOLICY &&
      p->is_freezing()) {
    dout(7) << "try_eval " << *lock << " freezing, waiting on " << *p << dendl;
    p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, lock->get_type()));
    return;
  }

  eval(lock, pneed_issue);
}

void Locker::eval_cap_gather(CInode *in, set<CInode*> *issue_set)
{
  bool need_issue = false;
  MDSContext::vec finishers;

  // kick locks now
  if (!in->filelock.is_stable())
    eval_gather(&in->filelock, false, &need_issue, &finishers);
  if (!in->authlock.is_stable())
    eval_gather(&in->authlock, false, &need_issue, &finishers);
  if (!in->linklock.is_stable())
    eval_gather(&in->linklock, false, &need_issue, &finishers);
  if (!in->xattrlock.is_stable())
    eval_gather(&in->xattrlock, false, &need_issue, &finishers);

  if (need_issue && in->is_head()) {
    if (issue_set)
      issue_set->insert(in);
    else
      issue_caps(in);
  }

  finish_contexts(g_ceph_context, finishers);
}

void Locker::eval_scatter_gathers(CInode *in)
{
  bool need_issue = false;
  MDSContext::vec finishers;

  dout(10) << "eval_scatter_gathers " << *in << dendl;

  // kick locks now
  if (!in->filelock.is_stable())
    eval_gather(&in->filelock, false, &need_issue, &finishers);
  if (!in->nestlock.is_stable())
    eval_gather(&in->nestlock, false, &need_issue, &finishers);
  if (!in->dirfragtreelock.is_stable())
    eval_gather(&in->dirfragtreelock, false, &need_issue, &finishers);

  if (need_issue && in->is_head())
    issue_caps(in);

  finish_contexts(g_ceph_context, finishers);
}

void Locker::eval(SimpleLock *lock, bool *need_issue)
{
  switch (lock->get_type()) {
  case CEPH_LOCK_IFILE:
    return file_eval(static_cast<ScatterLock*>(lock), need_issue);
  case CEPH_LOCK_IDFT:
  case CEPH_LOCK_INEST:
    return scatter_eval(static_cast<ScatterLock*>(lock), need_issue);
  default:
    return simple_eval(lock, need_issue);
  }
}

// ------------------
// rdlock

bool Locker::_rdlock_kick(SimpleLock *lock, bool as_anon)
{
  // kick the lock
  if (lock->is_stable()) {
    if (lock->get_parent()->is_auth()) {
      if (lock->get_sm() == &sm_scatterlock) {
        // not until tempsync is fully implemented
        //if (lock->get_parent()->is_replicated())
        //scatter_tempsync((ScatterLock*)lock);
        //else
        simple_sync(lock);
      } else if (lock->get_sm() == &sm_filelock) {
        CInode *in = static_cast<CInode*>(lock->get_parent());
        if (lock->get_state() == LOCK_EXCL &&
            in->get_target_loner() >= 0 &&
            !in->is_dir() && !as_anon)  // as_anon => caller wants SYNC, not XSYN
          file_xsyn(lock);
        else
          simple_sync(lock);
      } else
        simple_sync(lock);
      return true;
    } else {
      // request rdlock state change from auth
      mds_rank_t auth = lock->get_parent()->authority().first;
      if (!mds->is_cluster_degraded() ||
          mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
        dout(10) << "requesting rdlock from auth on "
                 << *lock << " on " << *lock->get_parent() << dendl;
        mds->send_message_mds(make_message<MLock>(lock, LOCK_AC_REQRDLOCK, mds->get_nodeid()), auth);
      }
      return false;
    }
  }
  if (lock->get_type() == CEPH_LOCK_IFILE) {
    CInode *in = static_cast<CInode *>(lock->get_parent());
    if (in->state_test(CInode::STATE_RECOVERING)) {
      mds->mdcache->recovery_queue.prioritize(in);
    }
  }

  return false;
}

bool Locker::rdlock_try(SimpleLock *lock, client_t client)
{
  dout(7) << "rdlock_try on " << *lock << " on " << *lock->get_parent() << dendl;

  // can read? grab ref.
  if (lock->can_rdlock(client))
    return true;

  _rdlock_kick(lock, false);

  if (lock->can_rdlock(client))
    return true;

  return false;
}

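// Unlike rdlock_try(), rdlock_start() records the rdlock on the mutation
// and, when the lock is not currently readable, leaves a waiter behind
// and nudges the journal before returning false.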
bool Locker::rdlock_start(SimpleLock *lock, MDRequestRef& mut, bool as_anon)
{
  dout(7) << "rdlock_start on " << *lock << " on " << *lock->get_parent() << dendl;

  // client may be allowed to rdlock the same item it has xlocked.
  // UNLESS someone passes in as_anon, or we're reading snapped version here.
  if (mut->snapid != CEPH_NOSNAP)
    as_anon = true;
  client_t client = as_anon ? -1 : mut->get_client();

  CInode *in = 0;
  if (lock->get_type() != CEPH_LOCK_DN)
    in = static_cast<CInode *>(lock->get_parent());

  /*
  if (!lock->get_parent()->is_auth() &&
      lock->fw_rdlock_to_auth()) {
    mdcache->request_forward(mut, lock->get_parent()->authority().first);
    return false;
  }
  */

  while (1) {
    // can read? grab ref.
    if (lock->can_rdlock(client)) {
      lock->get_rdlock();
      mut->emplace_lock(lock, MutationImpl::LockOp::RDLOCK);
      return true;
    }

    // hmm, wait a second.
    if (in && !in->is_head() && in->is_auth() &&
        lock->get_state() == LOCK_SNAP_SYNC) {
      // okay, we actually need to kick the head's lock to get ourselves synced up.
      CInode *head = mdcache->get_inode(in->ino());
      ceph_assert(head);
      SimpleLock *hlock = head->get_lock(CEPH_LOCK_IFILE);
      if (hlock->get_state() == LOCK_SYNC)
        hlock = head->get_lock(lock->get_type());

      if (hlock->get_state() != LOCK_SYNC) {
        dout(10) << "rdlock_start trying head inode " << *head << dendl;
        if (!rdlock_start(hlock, mut, true))  // ** as_anon, no rdlock on EXCL **
          return false;
        // oh, check our lock again then
      }
    }

    if (!_rdlock_kick(lock, as_anon))
      break;
  }

  // wait!
  int wait_on;
  if (lock->get_parent()->is_auth() && lock->is_stable())
    wait_on = SimpleLock::WAIT_RD;
  else
    wait_on = SimpleLock::WAIT_STABLE;  // REQRDLOCK is ignored if lock is unstable, so we need to retry.
  dout(7) << "rdlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl;
  lock->add_waiter(wait_on, new C_MDS_RetryRequest(mdcache, mut));
  nudge_log(lock);
  return false;
}

void Locker::nudge_log(SimpleLock *lock)
{
  dout(10) << "nudge_log " << *lock << " on " << *lock->get_parent() << dendl;
  if (lock->get_parent()->is_auth() && lock->is_unstable_and_locked())  // as with xlockdone, or cap flush
    mds->mdlog->flush();
}

void Locker::rdlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut, bool *pneed_issue)
{
  ceph_assert(it->is_rdlock());
  SimpleLock *lock = it->lock;
  // drop ref
  lock->put_rdlock();
  if (mut)
    mut->locks.erase(it);

  dout(7) << "rdlock_finish on " << *lock << " on " << *lock->get_parent() << dendl;

  // last one?
  if (!lock->is_rdlocked()) {
    if (!lock->is_stable())
      eval_gather(lock, false, pneed_issue);
    else if (lock->get_parent()->is_auth())
      try_eval(lock, pneed_issue);
  }
}

bool Locker::rdlock_try_set(MutationImpl::LockOpVec& lov, MDRequestRef& mdr)
{
  dout(10) << __func__ << dendl;
  for (const auto& p : lov) {
    auto lock = p.lock;
    ceph_assert(p.is_rdlock());
    if (!mdr->is_rdlocked(lock) && !rdlock_try(lock, mdr->get_client())) {
      lock->add_waiter(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_RD,
                       new C_MDS_RetryRequest(mdcache, mdr));
      goto failed;
    }
    lock->get_rdlock();
    mdr->emplace_lock(lock, MutationImpl::LockOp::RDLOCK);
    dout(20) << " got rdlock on " << *lock << " " << *lock->get_parent() << dendl;
  }

  return true;
failed:
  dout(10) << __func__ << " failed" << dendl;
  drop_locks(mdr.get(), nullptr);
  mdr->drop_local_auth_pins();
  return false;
}

bool Locker::rdlock_try_set(MutationImpl::LockOpVec& lov, MutationRef& mut)
{
  dout(10) << __func__ << dendl;
  for (const auto& p : lov) {
    auto lock = p.lock;
    ceph_assert(p.is_rdlock());
    if (!lock->can_rdlock(mut->get_client()))
      return false;
    p.lock->get_rdlock();
    mut->emplace_lock(p.lock, MutationImpl::LockOp::RDLOCK);
  }
  return true;
}

// ------------------
// wrlock

void Locker::wrlock_force(SimpleLock *lock, MutationRef& mut)
{
  if (lock->get_type() == CEPH_LOCK_IVERSION ||
      lock->get_type() == CEPH_LOCK_DVERSION)
    return local_wrlock_grab(static_cast<LocalLock*>(lock), mut);

  dout(7) << "wrlock_force on " << *lock
          << " on " << *lock->get_parent() << dendl;
  lock->get_wrlock(true);
  mut->emplace_lock(lock, MutationImpl::LockOp::WRLOCK);
}

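// Take a wrlock only if it is immediately obtainable; on the auth MDS
// this may move a stable lock to LOCK via simple_lock() and retry, but
// it never leaves a waiter behind.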
1710bool Locker::wrlock_try(SimpleLock *lock, const MutationRef& mut, client_t client)
1711{
1712 dout(10) << "wrlock_try " << *lock << " on " << *lock->get_parent() << dendl;
1713 if (client == -1)
1714 client = mut->get_client();
1715
1716 while (1) {
1717 if (lock->can_wrlock(client)) {
1718 lock->get_wrlock();
1719 auto it = mut->emplace_lock(lock, MutationImpl::LockOp::WRLOCK);
1720 it->flags |= MutationImpl::LockOp::WRLOCK; // may already remote_wrlocked
1721 return true;
1722 }
1723 if (!lock->is_stable())
1724 break;
1725 CInode *in = static_cast<CInode *>(lock->get_parent());
1726 if (!in->is_auth())
1727 break;
1728 // caller may already have a log entry open. To avoid calling
1729 // scatter_writebehind or start_scatter, don't change nest lock
1730 // state if it has dirty scatterdata.
1731 if (lock->is_dirty())
1732 break;
1733 // To avoid calling scatter_writebehind or start_scatter, don't
1734 // change nest lock state to MIX.
1735 ScatterLock *slock = static_cast<ScatterLock*>(lock);
1736 if (slock->get_scatter_wanted() || in->has_subtree_or_exporting_dirfrag())
1737 break;
1738
1739 simple_lock(lock);
1740 }
1741 return false;
7c673cae
FG
1742}
1743
9f95a23c 1744bool Locker::wrlock_start(const MutationImpl::LockOp &op, MDRequestRef& mut)
7c673cae 1745{
9f95a23c 1746 SimpleLock *lock = op.lock;
7c673cae
FG
1747 if (lock->get_type() == CEPH_LOCK_IVERSION ||
1748 lock->get_type() == CEPH_LOCK_DVERSION)
1749 return local_wrlock_start(static_cast<LocalLock*>(lock), mut);
1750
1751 dout(10) << "wrlock_start " << *lock << " on " << *lock->get_parent() << dendl;
1752
1753 CInode *in = static_cast<CInode *>(lock->get_parent());
9f95a23c
TL
1754 client_t client = op.is_state_pin() ? lock->get_excl_client() : mut->get_client();
1755 bool want_scatter = lock->get_parent()->is_auth() &&
7c673cae
FG
1756 (in->has_subtree_or_exporting_dirfrag() ||
1757 static_cast<ScatterLock*>(lock)->get_scatter_wanted());
1758
1759 while (1) {
1760 // wrlock?
1761 if (lock->can_wrlock(client) &&
1762 (!want_scatter || lock->get_state() == LOCK_MIX)) {
1763 lock->get_wrlock();
9f95a23c 1764 auto it = mut->emplace_lock(lock, MutationImpl::LockOp::WRLOCK);
11fdf7f2 1765 it->flags |= MutationImpl::LockOp::WRLOCK; // may already remote_wrlocked
7c673cae
FG
1766 return true;
1767 }
1768
1769 if (lock->get_type() == CEPH_LOCK_IFILE &&
1770 in->state_test(CInode::STATE_RECOVERING)) {
1771 mds->mdcache->recovery_queue.prioritize(in);
1772 }
1773
1774 if (!lock->is_stable())
1775 break;
1776
1777 if (in->is_auth()) {
7c673cae
FG
1778 if (want_scatter)
1779 scatter_mix(static_cast<ScatterLock*>(lock));
1780 else
1781 simple_lock(lock);
7c673cae
FG
1782 } else {
1783 // replica.
1784 // auth should be auth_pinned (see acquire_locks wrlock weird mustpin case).
1785 mds_rank_t auth = lock->get_parent()->authority().first;
1786 if (!mds->is_cluster_degraded() ||
1787 mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
1788 dout(10) << "requesting scatter from auth on "
1789 << *lock << " on " << *lock->get_parent() << dendl;
9f95a23c 1790 mds->send_message_mds(make_message<MLock>(lock, LOCK_AC_REQSCATTER, mds->get_nodeid()), auth);
7c673cae
FG
1791 }
1792 break;
1793 }
1794 }
1795
9f95a23c
TL
1796 dout(7) << "wrlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl;
1797 lock->add_waiter(SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
1798 nudge_log(lock);
7c673cae
FG
1799
1800 return false;
1801}
1802
11fdf7f2 1803void Locker::wrlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut, bool *pneed_issue)
7c673cae 1804{
11fdf7f2
TL
1805 ceph_assert(it->is_wrlock());
1806 SimpleLock* lock = it->lock;
1807
7c673cae
FG
1808 if (lock->get_type() == CEPH_LOCK_IVERSION ||
1809 lock->get_type() == CEPH_LOCK_DVERSION)
11fdf7f2 1810 return local_wrlock_finish(it, mut);
7c673cae
FG
1811
1812 dout(7) << "wrlock_finish on " << *lock << " on " << *lock->get_parent() << dendl;
1813 lock->put_wrlock();
11fdf7f2
TL
1814
1815 if (it->is_remote_wrlock())
1816 it->clear_wrlock();
1817 else
1818 mut->locks.erase(it);
7c673cae 1819
9f95a23c
TL
1820 if (lock->is_wrlocked()) {
1821 // Evaluate the unstable lock after scatter_writebehind_finish(), because
1822 // eval_gather() does not change the lock's state while the lock is flushing.
1823 if (!lock->is_stable() && lock->is_flushed() &&
1824 lock->get_parent()->is_auth())
1825 eval_gather(lock, false, pneed_issue);
1826 } else {
7c673cae
FG
1827 if (!lock->is_stable())
1828 eval_gather(lock, false, pneed_issue);
1829 else if (lock->get_parent()->is_auth())
1830 try_eval(lock, pneed_issue);
1831 }
1832}
1833
1834
1835// remote wrlock
1836
1837void Locker::remote_wrlock_start(SimpleLock *lock, mds_rank_t target, MDRequestRef& mut)
1838{
1839 dout(7) << "remote_wrlock_start mds." << target << " on " << *lock << " on " << *lock->get_parent() << dendl;
1840
1841 // wait for active target
1842 if (mds->is_cluster_degraded() &&
1843 !mds->mdsmap->is_clientreplay_or_active_or_stopping(target)) {
1844 dout(7) << " mds." << target << " is not active" << dendl;
1845 if (mut->more()->waiting_on_slave.empty())
1846 mds->wait_for_active_peer(target, new C_MDS_RetryRequest(mdcache, mut));
1847 return;
1848 }
1849
1850 // send lock request
1851 mut->start_locking(lock, target);
1852 mut->more()->slaves.insert(target);
9f95a23c 1853 auto r = make_message<MMDSSlaveRequest>(mut->reqid, mut->attempt, MMDSSlaveRequest::OP_WRLOCK);
7c673cae
FG
1854 r->set_lock_type(lock->get_type());
1855 lock->get_parent()->set_object_info(r->get_object_info());
1856 mds->send_message_mds(r, target);
1857
11fdf7f2 1858 ceph_assert(mut->more()->waiting_on_slave.count(target) == 0);
7c673cae
FG
1859 mut->more()->waiting_on_slave.insert(target);
1860}
1861
11fdf7f2 1862void Locker::remote_wrlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut)
7c673cae 1863{
11fdf7f2
TL
1864 ceph_assert(it->is_remote_wrlock());
1865 SimpleLock *lock = it->lock;
1866 mds_rank_t target = it->wrlock_target;
1867
1868 if (it->is_wrlock())
1869 it->clear_remote_wrlock();
1870 else
1871 mut->locks.erase(it);
7c673cae
FG
1872
1873 dout(7) << "remote_wrlock_finish releasing remote wrlock on mds." << target
1874 << " " << *lock->get_parent() << dendl;
1875 if (!mds->is_cluster_degraded() ||
1876 mds->mdsmap->get_state(target) >= MDSMap::STATE_REJOIN) {
9f95a23c 1877 auto slavereq = make_message<MMDSSlaveRequest>(mut->reqid, mut->attempt, MMDSSlaveRequest::OP_UNWRLOCK);
7c673cae
FG
1878 slavereq->set_lock_type(lock->get_type());
1879 lock->get_parent()->set_object_info(slavereq->get_object_info());
1880 mds->send_message_mds(slavereq, target);
1881 }
1882}
1883
1884
1885// ------------------
1886// xlock
1887
1888bool Locker::xlock_start(SimpleLock *lock, MDRequestRef& mut)
1889{
1890 if (lock->get_type() == CEPH_LOCK_IVERSION ||
1891 lock->get_type() == CEPH_LOCK_DVERSION)
1892 return local_xlock_start(static_cast<LocalLock*>(lock), mut);
1893
1894 dout(7) << "xlock_start on " << *lock << " on " << *lock->get_parent() << dendl;
1895 client_t client = mut->get_client();
1896
a8e16298
TL
1897 CInode *in = nullptr;
1898 if (lock->get_cap_shift())
1899 in = static_cast<CInode *>(lock->get_parent());
1900
7c673cae
FG
1901 // auth?
1902 if (lock->get_parent()->is_auth()) {
1903 // auth
1904 while (1) {
a8e16298
TL
1905 if (lock->can_xlock(client) &&
1906 !(lock->get_state() == LOCK_LOCK_XLOCK && // client is not xlocker or
1907 in && in->issued_caps_need_gather(lock))) { // xlocker does not hold shared cap
7c673cae
FG
1908 lock->set_state(LOCK_XLOCK);
1909 lock->get_xlock(mut, client);
9f95a23c 1910 mut->emplace_lock(lock, MutationImpl::LockOp::XLOCK);
7c673cae
FG
1911 mut->finish_locking(lock);
1912 return true;
1913 }
1914
a8e16298
TL
1915 if (lock->get_type() == CEPH_LOCK_IFILE &&
1916 in->state_test(CInode::STATE_RECOVERING)) {
1917 mds->mdcache->recovery_queue.prioritize(in);
7c673cae
FG
1918 }
1919
1920 if (!lock->is_stable() && (lock->get_state() != LOCK_XLOCKDONE ||
1921 lock->get_xlock_by_client() != client ||
1922 lock->is_waiter_for(SimpleLock::WAIT_STABLE)))
1923 break;
1924
1925 if (lock->get_state() == LOCK_LOCK || lock->get_state() == LOCK_XLOCKDONE) {
1926 mut->start_locking(lock);
1927 simple_xlock(lock);
1928 } else {
1929 simple_lock(lock);
1930 }
1931 }
1932
1933 lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
1934 nudge_log(lock);
1935 return false;
1936 } else {
1937 // replica
11fdf7f2
TL
1938 ceph_assert(lock->get_sm()->can_remote_xlock);
1939 ceph_assert(!mut->slave_request);
7c673cae
FG
1940
1941 // wait for single auth
1942 if (lock->get_parent()->is_ambiguous_auth()) {
1943 lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH,
1944 new C_MDS_RetryRequest(mdcache, mut));
1945 return false;
1946 }
1947
1948 // wait for active auth
1949 mds_rank_t auth = lock->get_parent()->authority().first;
1950 if (mds->is_cluster_degraded() &&
1951 !mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
1952 dout(7) << " mds." << auth << " is not active" << dendl;
1953 if (mut->more()->waiting_on_slave.empty())
1954 mds->wait_for_active_peer(auth, new C_MDS_RetryRequest(mdcache, mut));
1955 return false;
1956 }
1957
1958 // send lock request
1959 mut->more()->slaves.insert(auth);
1960 mut->start_locking(lock, auth);
9f95a23c 1961 auto r = make_message<MMDSSlaveRequest>(mut->reqid, mut->attempt, MMDSSlaveRequest::OP_XLOCK);
7c673cae
FG
1962 r->set_lock_type(lock->get_type());
1963 lock->get_parent()->set_object_info(r->get_object_info());
1964 mds->send_message_mds(r, auth);
1965
11fdf7f2 1966 ceph_assert(mut->more()->waiting_on_slave.count(auth) == 0);
7c673cae
FG
1967 mut->more()->waiting_on_slave.insert(auth);
1968
1969 return false;
1970 }
1971}
1972
1973void Locker::_finish_xlock(SimpleLock *lock, client_t xlocker, bool *pneed_issue)
1974{
11fdf7f2
TL
1975 ceph_assert(!lock->is_stable());
1976 if (lock->get_type() != CEPH_LOCK_DN &&
1977 lock->get_type() != CEPH_LOCK_ISNAP &&
9f95a23c 1978 lock->get_type() != CEPH_LOCK_IPOLICY &&
11fdf7f2 1979 lock->get_num_rdlocks() == 0 &&
7c673cae 1980 lock->get_num_wrlocks() == 0 &&
b32b8144 1981 !lock->is_leased() &&
11fdf7f2 1982 lock->get_state() != LOCK_XLOCKSNAP) {
7c673cae
FG
1983 CInode *in = static_cast<CInode*>(lock->get_parent());
1984 client_t loner = in->get_target_loner();
1985 if (loner >= 0 && (xlocker < 0 || xlocker == loner)) {
1986 lock->set_state(LOCK_EXCL);
1987 lock->get_parent()->auth_unpin(lock);
1988 lock->finish_waiters(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD);
1989 if (lock->get_cap_shift())
1990 *pneed_issue = true;
1991 if (lock->get_parent()->is_auth() &&
1992 lock->is_stable())
1993 try_eval(lock, pneed_issue);
1994 return;
1995 }
1996 }
1997 // the xlocker may have CEPH_CAP_GSHARED, need to revoke it if next state is LOCK_LOCK
1998 eval_gather(lock, lock->get_state() != LOCK_XLOCKSNAP, pneed_issue);
1999}
2000
11fdf7f2 2001void Locker::xlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut, bool *pneed_issue)
7c673cae 2002{
11fdf7f2
TL
2003 ceph_assert(it->is_xlock());
2004 SimpleLock *lock = it->lock;
2005
7c673cae
FG
2006 if (lock->get_type() == CEPH_LOCK_IVERSION ||
2007 lock->get_type() == CEPH_LOCK_DVERSION)
11fdf7f2 2008 return local_xlock_finish(it, mut);
7c673cae
FG
2009
2010 dout(10) << "xlock_finish on " << *lock << " " << *lock->get_parent() << dendl;
2011
2012 client_t xlocker = lock->get_xlock_by_client();
2013
2014 // drop ref
2015 lock->put_xlock();
11fdf7f2
TL
2016 ceph_assert(mut);
2017 mut->locks.erase(it);
7c673cae
FG
2018
2019 bool do_issue = false;
2020
2021 // remote xlock?
2022 if (!lock->get_parent()->is_auth()) {
11fdf7f2 2023 ceph_assert(lock->get_sm()->can_remote_xlock);
7c673cae
FG
2024
2025 // tell auth
2026 dout(7) << "xlock_finish releasing remote xlock on " << *lock->get_parent() << dendl;
2027 mds_rank_t auth = lock->get_parent()->authority().first;
2028 if (!mds->is_cluster_degraded() ||
2029 mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) {
9f95a23c 2030 auto slavereq = make_message<MMDSSlaveRequest>(mut->reqid, mut->attempt, MMDSSlaveRequest::OP_UNXLOCK);
7c673cae
FG
2031 slavereq->set_lock_type(lock->get_type());
2032 lock->get_parent()->set_object_info(slavereq->get_object_info());
2033 mds->send_message_mds(slavereq, auth);
2034 }
2035 // others waiting?
2036 lock->finish_waiters(SimpleLock::WAIT_STABLE |
2037 SimpleLock::WAIT_WR |
2038 SimpleLock::WAIT_RD, 0);
2039 } else {
a8e16298
TL
2040 if (lock->get_num_xlocks() == 0 &&
2041 lock->get_state() != LOCK_LOCK_XLOCK) { // no one is taking xlock
7c673cae
FG
2042 _finish_xlock(lock, xlocker, &do_issue);
2043 }
2044 }
2045
2046 if (do_issue) {
2047 CInode *in = static_cast<CInode*>(lock->get_parent());
2048 if (in->is_head()) {
2049 if (pneed_issue)
2050 *pneed_issue = true;
2051 else
2052 issue_caps(in);
2053 }
2054 }
2055}
2056
11fdf7f2 2057void Locker::xlock_export(const MutationImpl::lock_iterator& it, MutationImpl *mut)
7c673cae 2058{
11fdf7f2
TL
2059 ceph_assert(it->is_xlock());
2060 SimpleLock *lock = it->lock;
7c673cae
FG
2061 dout(10) << "xlock_export on " << *lock << " " << *lock->get_parent() << dendl;
2062
2063 lock->put_xlock();
11fdf7f2 2064 mut->locks.erase(it);
7c673cae
FG
2065
2066 MDSCacheObject *p = lock->get_parent();
11fdf7f2 2067 ceph_assert(p->state_test(CInode::STATE_AMBIGUOUSAUTH)); // we are exporting this (inode)
7c673cae
FG
2068
2069 if (!lock->is_stable())
2070 lock->get_parent()->auth_unpin(lock);
2071
2072 lock->set_state(LOCK_LOCK);
2073}
2074
2075void Locker::xlock_import(SimpleLock *lock)
2076{
2077 dout(10) << "xlock_import on " << *lock << " " << *lock->get_parent() << dendl;
2078 lock->get_parent()->auth_pin(lock);
2079}
2080
9f95a23c
TL
2081void Locker::xlock_downgrade(SimpleLock *lock, MutationImpl *mut)
2082{
2083 dout(10) << "xlock_downgrade on " << *lock << " " << *lock->get_parent() << dendl;
2084 auto it = mut->locks.find(lock);
2085 ceph_assert(it != mut->locks.end()); // check before dereferencing the iterator
2086 if (it->is_rdlock())
2087 return; // already downgraded
2088
2089 ceph_assert(lock->get_parent()->is_auth());
2090 ceph_assert(it->is_xlock());
2091
2092 lock->set_xlock_done();
2093 lock->get_rdlock();
2094 xlock_finish(it, mut, nullptr);
2095 mut->emplace_lock(lock, MutationImpl::LockOp::RDLOCK);
2096}
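// Note the ordering in xlock_downgrade() above: the rdlock is acquired
// (lock->get_rdlock()) before xlock_finish() drops the xlock, so the lock is
// held continuously across the downgrade and nothing can slip in between the
// xlock and rdlock states.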
7c673cae
FG
2097
2098
2099// file i/o -----------------------------------------
2100
2101version_t Locker::issue_file_data_version(CInode *in)
2102{
2103 dout(7) << "issue_file_data_version on " << *in << dendl;
2104 return in->inode.file_data_version;
2105}
2106
2107class C_Locker_FileUpdate_finish : public LockerLogContext {
2108 CInode *in;
2109 MutationRef mut;
11fdf7f2 2110 unsigned flags;
7c673cae 2111 client_t client;
9f95a23c 2112 ref_t<MClientCaps> ack;
7c673cae 2113public:
11fdf7f2 2114 C_Locker_FileUpdate_finish(Locker *l, CInode *i, MutationRef& m, unsigned f,
9f95a23c 2115 const ref_t<MClientCaps> &ack, client_t c=-1)
11fdf7f2 2116 : LockerLogContext(l), in(i), mut(m), flags(f), client(c), ack(ack) {
7c673cae
FG
2117 in->get(CInode::PIN_PTRWAITER);
2118 }
2119 void finish(int r) override {
11fdf7f2 2120 locker->file_update_finish(in, mut, flags, client, ack);
7c673cae
FG
2121 in->put(CInode::PIN_PTRWAITER);
2122 }
2123};
2124
11fdf7f2
TL
2125enum {
2126 UPDATE_SHAREMAX = 1,
2127 UPDATE_NEEDSISSUE = 2,
2128 UPDATE_SNAPFLUSH = 4,
2129};
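// These UPDATE_* values are bit flags, meant to be OR'd together (e.g.
// UPDATE_SHAREMAX | UPDATE_NEEDSISSUE); file_update_finish() below tests
// each one individually with '&'.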
2130
2131void Locker::file_update_finish(CInode *in, MutationRef& mut, unsigned flags,
9f95a23c 2132 client_t client, const ref_t<MClientCaps> &ack)
7c673cae
FG
2133{
2134 dout(10) << "file_update_finish on " << *in << dendl;
2135 in->pop_and_dirty_projected_inode(mut->ls);
2136
2137 mut->apply();
11fdf7f2 2138
7c673cae
FG
2139 if (ack) {
2140 Session *session = mds->get_session(client);
92f5a8d4 2141 if (session && !session->is_closed()) {
7c673cae
FG
2142 // "oldest flush tid" > 0 means client uses unique TID for each flush
2143 if (ack->get_oldest_flush_tid() > 0)
11fdf7f2 2144 session->add_completed_flush(ack->get_client_tid());
7c673cae
FG
2145 mds->send_message_client_counted(ack, session);
2146 } else {
2147 dout(10) << " no session for client." << client << " " << *ack << dendl;
7c673cae
FG
2148 }
2149 }
2150
2151 set<CInode*> need_issue;
2152 drop_locks(mut.get(), &need_issue);
2153
11fdf7f2
TL
2154 if (in->is_head()) {
2155 if ((flags & UPDATE_NEEDSISSUE) && need_issue.count(in) == 0) {
2156 Capability *cap = in->get_client_cap(client);
2157 if (cap && (cap->wanted() & ~cap->pending()))
2158 issue_caps(in, cap);
2159 }
2160
2161 if ((flags & UPDATE_SHAREMAX) && in->is_auth() &&
2162 (in->filelock.gcaps_allowed(CAP_LONER) & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)))
2163 share_inode_max_size(in);
2164
2165 } else if ((flags & UPDATE_SNAPFLUSH) && !in->client_snap_caps.empty()) {
7c673cae
FG
2166 dout(10) << " client_snap_caps " << in->client_snap_caps << dendl;
2167 // check for snap writeback completion
eafe8130
TL
2168 in->client_snap_caps.erase(client);
2169 if (in->client_snap_caps.empty()) {
2170 for (int i = 0; i < num_cinode_locks; i++) {
2171 SimpleLock *lock = in->get_lock(cinode_lock_info[i].lock);
11fdf7f2 2172 ceph_assert(lock);
11fdf7f2 2173 lock->put_wrlock();
11fdf7f2 2174 }
eafe8130
TL
2175 in->item_open_file.remove_myself();
2176 in->item_caps.remove_myself();
7c673cae
FG
2177 eval_cap_gather(in, &need_issue);
2178 }
7c673cae
FG
2179 }
2180 issue_caps_set(need_issue);
2181
11fdf7f2 2182 mds->balancer->hit_inode(in, META_POP_IWR);
28e407b8 2183
7c673cae
FG
2184 // auth unpin after issuing caps
2185 mut->cleanup();
2186}
2187
2188Capability* Locker::issue_new_caps(CInode *in,
2189 int mode,
9f95a23c
TL
2190 MDRequestRef& mdr,
2191 SnapRealm *realm)
7c673cae
FG
2192{
2193 dout(7) << "issue_new_caps for mode " << mode << " on " << *in << dendl;
9f95a23c
TL
2194 Session *session = mdr->session;
2195 bool new_inode = (mdr->alloc_ino || mdr->used_prealloc_ino);
7c673cae 2196
9f95a23c
TL
2197 // if replay or async, try to reconnect cap, and otherwise do nothing.
2198 if (new_inode && mdr->client_request->is_queued_for_replay())
a8e16298
TL
2199 return mds->mdcache->try_reconnect_cap(in, session);
2200
7c673cae 2201 // my needs
11fdf7f2
TL
2202 ceph_assert(session->info.inst.name.is_client());
2203 client_t my_client = session->get_client();
7c673cae
FG
2204 int my_want = ceph_caps_for_mode(mode);
2205
2206 // register a capability
2207 Capability *cap = in->get_client_cap(my_client);
2208 if (!cap) {
2209 // new cap
9f95a23c 2210 cap = in->add_client_cap(my_client, session, realm, new_inode);
7c673cae
FG
2211 cap->set_wanted(my_want);
2212 cap->mark_new();
7c673cae 2213 } else {
7c673cae
FG
2214 // make sure it wants sufficient caps
2215 if (my_want & ~cap->wanted()) {
2216 // augment wanted caps for this client
2217 cap->set_wanted(cap->wanted() | my_want);
2218 }
2219 }
9f95a23c 2220 cap->inc_suppress(); // suppress file cap messages (we'll bundle with the request reply)
7c673cae
FG
2221
2222 if (in->is_auth()) {
2223 // [auth] twiddle mode?
2224 eval(in, CEPH_CAP_LOCKS);
2225
2226 if (_need_flush_mdlog(in, my_want))
2227 mds->mdlog->flush();
2228
2229 } else {
2230 // [replica] tell auth about any new caps wanted
2231 request_inode_file_caps(in);
2232 }
2233
2234 // issue caps (pot. incl new one)
2235 //issue_caps(in); // note: _eval above may have done this already...
2236
2237 // re-issue whatever we can
2238 //cap->issue(cap->pending());
2239
9f95a23c 2240 cap->dec_suppress();
7c673cae
FG
2241
2242 return cap;
2243}
2244
7c673cae
FG
2245void Locker::issue_caps_set(set<CInode*>& inset)
2246{
2247 for (set<CInode*>::iterator p = inset.begin(); p != inset.end(); ++p)
2248 issue_caps(*p);
2249}
2250
494da23a
TL
2251class C_Locker_RevokeStaleCap : public LockerContext {
2252 CInode *in;
2253 client_t client;
2254public:
2255 C_Locker_RevokeStaleCap(Locker *l, CInode *i, client_t c) :
2256 LockerContext(l), in(i), client(c) {
2257 in->get(CInode::PIN_PTRWAITER);
2258 }
2259 void finish(int r) override {
2260 locker->revoke_stale_cap(in, client);
2261 in->put(CInode::PIN_PTRWAITER);
2262 }
2263};
2264
81eedcae 2265int Locker::issue_caps(CInode *in, Capability *only_cap)
7c673cae
FG
2266{
2267 // allowed caps are determined by the lock mode.
2268 int all_allowed = in->get_caps_allowed_by_type(CAP_ANY);
2269 int loner_allowed = in->get_caps_allowed_by_type(CAP_LONER);
2270 int xlocker_allowed = in->get_caps_allowed_by_type(CAP_XLOCKER);
2271
2272 client_t loner = in->get_loner();
2273 if (loner >= 0) {
2274 dout(7) << "issue_caps loner client." << loner
2275 << " allowed=" << ccap_string(loner_allowed)
2276 << ", xlocker allowed=" << ccap_string(xlocker_allowed)
2277 << ", others allowed=" << ccap_string(all_allowed)
2278 << " on " << *in << dendl;
2279 } else {
2280 dout(7) << "issue_caps allowed=" << ccap_string(all_allowed)
2281 << ", xlocker allowed=" << ccap_string(xlocker_allowed)
2282 << " on " << *in << dendl;
2283 }
2284
11fdf7f2 2285 ceph_assert(in->is_head());
7c673cae
FG
2286
2287 // count how many caps we actually (re)issue below
2288 int nissued = 0;
2289
2290 // client caps
11fdf7f2 2291 map<client_t, Capability>::iterator it;
7c673cae
FG
2292 if (only_cap)
2293 it = in->client_caps.find(only_cap->get_client());
2294 else
2295 it = in->client_caps.begin();
2296 for (; it != in->client_caps.end(); ++it) {
11fdf7f2 2297 Capability *cap = &it->second;
7c673cae
FG
2298
2299 // do not issue _new_ bits when size|mtime is projected
2300 int allowed;
2301 if (loner == it->first)
2302 allowed = loner_allowed;
2303 else
2304 allowed = all_allowed;
2305
2306 // add in any xlocker-only caps (for locks this client is the xlocker for)
2307 allowed |= xlocker_allowed & in->get_xlocker_mask(it->first);
9f95a23c
TL
2308 if (in->is_dir()) {
2309 allowed &= ~CEPH_CAP_ANY_DIR_OPS;
2310 if (allowed & CEPH_CAP_FILE_EXCL)
2311 allowed |= cap->get_lock_cache_allowed();
2312 }
7c673cae 2313
11fdf7f2
TL
2314 if ((in->inode.inline_data.version != CEPH_INLINE_NONE &&
2315 cap->is_noinline()) ||
2316 (!in->inode.layout.pool_ns.empty() &&
2317 cap->is_nopoolns()))
7c673cae
FG
2318 allowed &= ~(CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR);
2319
2320 int pending = cap->pending();
2321 int wanted = cap->wanted();
2322
2323 dout(20) << " client." << it->first
2324 << " pending " << ccap_string(pending)
2325 << " allowed " << ccap_string(allowed)
2326 << " wanted " << ccap_string(wanted)
2327 << dendl;
2328
2329 if (!(pending & ~allowed)) {
2330 // skip if suppress or new, and not revocation
494da23a
TL
2331 if (cap->is_new() || cap->is_suppress() || cap->is_stale()) {
2332 dout(20) << " !revoke and new|suppressed|stale, skipping client." << it->first << dendl;
2333 continue;
2334 }
2335 } else {
2336 ceph_assert(!cap->is_new());
2337 if (cap->is_stale()) {
2338 dout(20) << " revoke stale cap from client." << it->first << dendl;
2339 ceph_assert(!cap->is_valid());
2340 cap->issue(allowed & pending, false);
2341 mds->queue_waiter_front(new C_Locker_RevokeStaleCap(this, in, it->first));
7c673cae
FG
2342 continue;
2343 }
92f5a8d4
TL
2344
2345 if (!cap->is_valid() && (pending & ~CEPH_CAP_PIN)) {
2346 // After a stale->resume cycle, the client thinks it only has CEPH_CAP_PIN.
2347 // mds needs to re-issue caps, then do revocation.
2348 long seq = cap->issue(pending, true);
2349
2350 dout(7) << " sending MClientCaps to client." << it->first
2351 << " seq " << seq << " re-issue " << ccap_string(pending) << dendl;
2352
9f95a23c
TL
2353 auto m = make_message<MClientCaps>(CEPH_CAP_OP_GRANT, in->ino(),
2354 in->find_snaprealm()->inode->ino(),
2355 cap->get_cap_id(), cap->get_last_seq(),
2356 pending, wanted, 0, cap->get_mseq(),
2357 mds->get_osd_epoch_barrier());
92f5a8d4
TL
2358 in->encode_cap_message(m, cap);
2359
2360 mds->send_message_client_counted(m, cap->get_session());
2361 }
7c673cae
FG
2362 }
2363
2364 // notify clients about deleted inode, to make sure they release caps ASAP.
2365 if (in->inode.nlink == 0)
2366 wanted |= CEPH_CAP_LINK_SHARED;
2367
2368 // are there caps that the client _wants_ and can have, but aren't pending?
2369 // or do we need to revoke?
494da23a
TL
2370 if ((pending & ~allowed) || // need to revoke ~allowed caps.
2371 ((wanted & allowed) & ~pending) || // missing wanted+allowed caps
2372 !cap->is_valid()) { // after stale->resume cycle
7c673cae
FG
2373 // issue
2374 nissued++;
2375
2376 // include caps that clients generally like, while we're at it.
2377 int likes = in->get_caps_liked();
2378 int before = pending;
2379 long seq;
2380 if (pending & ~allowed)
494da23a 2381 seq = cap->issue((wanted|likes) & allowed & pending, true); // if revoking, don't issue anything new.
7c673cae 2382 else
494da23a 2383 seq = cap->issue((wanted|likes) & allowed, true);
7c673cae
FG
2384 int after = cap->pending();
2385
494da23a
TL
2386 dout(7) << " sending MClientCaps to client." << it->first
2387 << " seq " << seq << " new pending " << ccap_string(after)
2388 << " was " << ccap_string(before) << dendl;
7c673cae 2389
494da23a
TL
2390 int op = (before & ~after) ? CEPH_CAP_OP_REVOKE : CEPH_CAP_OP_GRANT;
2391 if (op == CEPH_CAP_OP_REVOKE) {
2392 revoking_caps.push_back(&cap->item_revoking_caps);
2393 revoking_caps_by_client[cap->get_client()].push_back(&cap->item_client_revoking_caps);
2394 cap->set_last_revoke_stamp(ceph_clock_now());
2395 cap->reset_num_revoke_warnings();
7c673cae 2396 }
494da23a 2397
9f95a23c
TL
2398 auto m = make_message<MClientCaps>(op, in->ino(),
2399 in->find_snaprealm()->inode->ino(),
2400 cap->get_cap_id(), cap->get_last_seq(),
2401 after, wanted, 0, cap->get_mseq(),
2402 mds->get_osd_epoch_barrier());
494da23a
TL
2403 in->encode_cap_message(m, cap);
2404
2405 mds->send_message_client_counted(m, cap->get_session());
7c673cae
FG
2406 }
2407
2408 if (only_cap)
2409 break;
2410 }
2411
81eedcae 2412 return nissued;
7c673cae
FG
2413}
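// Aside: the grant-vs-revoke decision at the heart of issue_caps() above,
// reduced to plain bitmask arithmetic. This is a simplified sketch -- the real
// code also folds in get_caps_liked() and the !cap->is_valid() re-issue case --
// and the bit values and revoke()/grant() helpers here are placeholders, not
// the real CEPH_CAP_* encoding or API:
//
//   enum { CAP_R = 1, CAP_W = 2 };
//
//   void decide(int pending, int allowed, int wanted) {
//     if (pending & ~allowed)
//       revoke(pending & ~allowed);           // client holds more than allowed
//     else if ((wanted & allowed) & ~pending)
//       grant((wanted & allowed) & ~pending); // can satisfy more of 'wanted'
//     // else: nothing to send
//   }
//
//   decide(CAP_R | CAP_W, CAP_R, CAP_R | CAP_W);  // -> revoke CAP_W
//   decide(CAP_R, CAP_R | CAP_W, CAP_R | CAP_W);  // -> grant CAP_W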
2414
2415void Locker::issue_truncate(CInode *in)
2416{
2417 dout(7) << "issue_truncate on " << *in << dendl;
2418
11fdf7f2
TL
2419 for (auto &p : in->client_caps) {
2420 Capability *cap = &p.second;
9f95a23c 2421 auto m = make_message<MClientCaps>(CEPH_CAP_OP_TRUNC,
11fdf7f2
TL
2422 in->ino(),
2423 in->find_snaprealm()->inode->ino(),
2424 cap->get_cap_id(), cap->get_last_seq(),
2425 cap->pending(), cap->wanted(), 0,
2426 cap->get_mseq(),
2427 mds->get_osd_epoch_barrier());
7c673cae 2428 in->encode_cap_message(m, cap);
11fdf7f2 2429 mds->send_message_client_counted(m, p.first);
7c673cae
FG
2430 }
2431
2432 // should we increase max_size?
2433 if (in->is_auth() && in->is_file())
2434 check_inode_max_size(in);
2435}
2436
494da23a
TL
2437
2438void Locker::revoke_stale_cap(CInode *in, client_t client)
2439{
2440 dout(7) << __func__ << " client." << client << " on " << *in << dendl;
2441 Capability *cap = in->get_client_cap(client);
2442 if (!cap)
2443 return;
2444
2445 if (cap->revoking() & CEPH_CAP_ANY_WR) {
2446 std::stringstream ss;
2447 mds->evict_client(client.v, false, g_conf()->mds_session_blacklist_on_timeout, ss, nullptr);
2448 return;
2449 }
2450
2451 cap->revoke();
2452
2453 if (in->is_auth() && in->inode.client_ranges.count(cap->get_client()))
2454 in->state_set(CInode::STATE_NEEDSRECOVER);
2455
2456 if (in->state_test(CInode::STATE_EXPORTINGCAPS))
2457 return;
2458
2459 if (!in->filelock.is_stable())
2460 eval_gather(&in->filelock);
2461 if (!in->linklock.is_stable())
2462 eval_gather(&in->linklock);
2463 if (!in->authlock.is_stable())
2464 eval_gather(&in->authlock);
2465 if (!in->xattrlock.is_stable())
2466 eval_gather(&in->xattrlock);
2467
2468 if (in->is_auth())
2469 try_eval(in, CEPH_CAP_LOCKS);
2470 else
2471 request_inode_file_caps(in);
2472}
2473
2474bool Locker::revoke_stale_caps(Session *session)
7c673cae 2475{
a8e16298 2476 dout(10) << "revoke_stale_caps for " << session->info.inst.name << dendl;
7c673cae 2477
494da23a
TL
2478 // invalidate all caps
2479 session->inc_cap_gen();
2480
92f5a8d4 2481 bool ret = true;
a8e16298
TL
2482 std::vector<CInode*> to_eval;
2483
2484 for (auto p = session->caps.begin(); !p.end(); ) {
2485 Capability *cap = *p;
2486 ++p;
2487 if (!cap->is_notable()) {
2488 // the remaining caps are not being revoked, have no writeable range,
2489 // and don't want exclusive caps or file read/write. They don't
2490 // need recovery and don't affect eval_gather()/try_eval()
2491 break;
2492 }
2493
494da23a
TL
2494 int revoking = cap->revoking();
2495 if (!revoking)
a8e16298
TL
2496 continue;
2497
92f5a8d4
TL
2498 if (revoking & CEPH_CAP_ANY_WR) {
2499 ret = false;
2500 break;
2501 }
494da23a
TL
2502
2503 int issued = cap->issued();
a8e16298 2504 CInode *in = cap->get_inode();
7c673cae 2505 dout(10) << " revoking " << ccap_string(issued) << " on " << *in << dendl;
9f95a23c
TL
2506 int revoked = cap->revoke();
2507 if (revoked & CEPH_CAP_ANY_DIR_OPS)
2508 eval_lock_caches(cap);
7c673cae
FG
2509
2510 if (in->is_auth() &&
2511 in->inode.client_ranges.count(cap->get_client()))
2512 in->state_set(CInode::STATE_NEEDSRECOVER);
2513
a8e16298
TL
2514 // evaluating the lock/inode may finish contexts, which may modify another
2515 // cap's position in session->caps.
2516 to_eval.push_back(in);
7c673cae 2517 }
7c673cae 2518
a8e16298
TL
2519 for (auto in : to_eval) {
2520 if (in->state_test(CInode::STATE_EXPORTINGCAPS))
2521 continue;
2522
2523 if (!in->filelock.is_stable())
2524 eval_gather(&in->filelock);
2525 if (!in->linklock.is_stable())
2526 eval_gather(&in->linklock);
2527 if (!in->authlock.is_stable())
2528 eval_gather(&in->authlock);
2529 if (!in->xattrlock.is_stable())
2530 eval_gather(&in->xattrlock);
2531
2532 if (in->is_auth())
2533 try_eval(in, CEPH_CAP_LOCKS);
2534 else
2535 request_inode_file_caps(in);
7c673cae 2536 }
494da23a 2537
92f5a8d4 2538 return ret;
7c673cae
FG
2539}
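// Both revoke_stale_caps() above and resume_stale_caps() below rely on
// session->caps keeping "notable" caps ahead of the rest of the list; that
// ordering is what makes the early break on the first !cap->is_notable()
// entry safe.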
2540
2541void Locker::resume_stale_caps(Session *session)
2542{
2543 dout(10) << "resume_stale_caps for " << session->info.inst.name << dendl;
2544
11fdf7f2 2545 bool lazy = session->info.has_feature(CEPHFS_FEATURE_LAZY_CAP_WANTED);
a8e16298 2546 for (xlist<Capability*>::iterator p = session->caps.begin(); !p.end(); ) {
7c673cae 2547 Capability *cap = *p;
a8e16298 2548 ++p;
11fdf7f2 2549 if (lazy && !cap->is_notable())
a8e16298
TL
2550 break; // see revoke_stale_caps()
2551
7c673cae 2552 CInode *in = cap->get_inode();
a8e16298
TL
2553 ceph_assert(in->is_head());
2554 dout(10) << " clearing stale flag on " << *in << dendl;
7c673cae 2555
a8e16298
TL
2556 if (in->state_test(CInode::STATE_EXPORTINGCAPS)) {
2557 // if export succeeds, the cap will be removed. if export fails,
2558 // we need to re-issue the cap if it's not stale.
2559 in->state_set(CInode::STATE_EVALSTALECAPS);
2560 continue;
7c673cae 2561 }
a8e16298
TL
2562
2563 if (!in->is_auth() || !eval(in, CEPH_CAP_LOCKS))
2564 issue_caps(in, cap);
7c673cae
FG
2565 }
2566}
2567
2568void Locker::remove_stale_leases(Session *session)
2569{
2570 dout(10) << "remove_stale_leases for " << session->info.inst.name << dendl;
2571 xlist<ClientLease*>::iterator p = session->leases.begin();
2572 while (!p.end()) {
2573 ClientLease *l = *p;
2574 ++p;
2575 CDentry *parent = static_cast<CDentry*>(l->parent);
2576 dout(15) << " removing lease on " << *parent << dendl;
2577 parent->remove_client_lease(l, this);
2578 }
2579}
2580
2581
2582class C_MDL_RequestInodeFileCaps : public LockerContext {
2583 CInode *in;
2584public:
2585 C_MDL_RequestInodeFileCaps(Locker *l, CInode *i) : LockerContext(l), in(i) {
2586 in->get(CInode::PIN_PTRWAITER);
2587 }
2588 void finish(int r) override {
2589 if (!in->is_auth())
2590 locker->request_inode_file_caps(in);
2591 in->put(CInode::PIN_PTRWAITER);
2592 }
2593};
2594
2595void Locker::request_inode_file_caps(CInode *in)
2596{
11fdf7f2 2597 ceph_assert(!in->is_auth());
7c673cae 2598
f64942e4 2599 int wanted = in->get_caps_wanted() & in->get_caps_allowed_ever() & ~CEPH_CAP_PIN;
7c673cae
FG
2600 if (wanted != in->replica_caps_wanted) {
2601 // wait for single auth
2602 if (in->is_ambiguous_auth()) {
2603 in->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH,
2604 new C_MDL_RequestInodeFileCaps(this, in));
2605 return;
2606 }
2607
2608 mds_rank_t auth = in->authority().first;
2609 if (mds->is_cluster_degraded() &&
2610 mds->mdsmap->get_state(auth) == MDSMap::STATE_REJOIN) {
2611 mds->wait_for_active_peer(auth, new C_MDL_RequestInodeFileCaps(this, in));
2612 return;
2613 }
2614
2615 dout(7) << "request_inode_file_caps " << ccap_string(wanted)
2616 << " was " << ccap_string(in->replica_caps_wanted)
2617 << " on " << *in << " to mds." << auth << dendl;
2618
2619 in->replica_caps_wanted = wanted;
2620
2621 if (!mds->is_cluster_degraded() ||
2622 mds->mdsmap->is_clientreplay_or_active_or_stopping(auth))
9f95a23c 2623 mds->send_message_mds(make_message<MInodeFileCaps>(in->ino(), in->replica_caps_wanted), auth);
7c673cae
FG
2624 }
2625}
2626
9f95a23c 2627void Locker::handle_inode_file_caps(const cref_t<MInodeFileCaps> &m)
7c673cae
FG
2628{
2629 // nobody should be talking to us during recovery.
a8e16298
TL
2630 if (mds->get_state() < MDSMap::STATE_CLIENTREPLAY) {
2631 if (mds->get_want_state() >= MDSMap::STATE_CLIENTREPLAY) {
2632 mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
2633 return;
2634 }
11fdf7f2 2635 ceph_abort_msg("got unexpected message during recovery");
a8e16298 2636 }
7c673cae
FG
2637
2638 // ok
2639 CInode *in = mdcache->get_inode(m->get_ino());
2640 mds_rank_t from = mds_rank_t(m->get_source().num());
2641
11fdf7f2
TL
2642 ceph_assert(in);
2643 ceph_assert(in->is_auth());
7c673cae
FG
2644
2645 dout(7) << "handle_inode_file_caps replica mds." << from << " wants caps " << ccap_string(m->get_caps()) << " on " << *in << dendl;
2646
11fdf7f2 2647 in->set_mds_caps_wanted(from, m->get_caps());
7c673cae
FG
2648
2649 try_eval(in, CEPH_CAP_LOCKS);
7c673cae
FG
2650}
2651
2652
2653class C_MDL_CheckMaxSize : public LockerContext {
2654 CInode *in;
2655 uint64_t new_max_size;
2656 uint64_t newsize;
2657 utime_t mtime;
2658
2659public:
2660 C_MDL_CheckMaxSize(Locker *l, CInode *i, uint64_t _new_max_size,
2661 uint64_t _newsize, utime_t _mtime) :
2662 LockerContext(l), in(i),
2663 new_max_size(_new_max_size), newsize(_newsize), mtime(_mtime)
2664 {
2665 in->get(CInode::PIN_PTRWAITER);
2666 }
2667 void finish(int r) override {
2668 if (in->is_auth())
2669 locker->check_inode_max_size(in, false, new_max_size, newsize, mtime);
2670 in->put(CInode::PIN_PTRWAITER);
2671 }
2672};
2673
94b18763 2674uint64_t Locker::calc_new_max_size(CInode::mempool_inode *pi, uint64_t size)
31f18b77
FG
2675{
2676 uint64_t new_max = (size + 1) << 1;
11fdf7f2 2677 uint64_t max_inc = g_conf()->mds_client_writeable_range_max_inc_objs;
31f18b77 2678 if (max_inc > 0) {
b32b8144 2679 max_inc *= pi->layout.object_size;
11fdf7f2 2680 new_max = std::min(new_max, size + max_inc);
31f18b77 2681 }
11fdf7f2 2682 return round_up_to(new_max, pi->get_layout_size_increment());
31f18b77 2683}
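// Worked example of calc_new_max_size(): a self-contained sketch of the
// arithmetic above using plain integers. Illustrative only; the object size,
// increment limit, and round_up helper are stand-ins for the real layout
// fields and round_up_to():
//
//   #include <algorithm>
//   #include <cstdint>
//
//   uint64_t round_up(uint64_t n, uint64_t a) { return (n + a - 1) / a * a; }
//
//   uint64_t new_max_size(uint64_t size,
//                         uint64_t object_size = 4u << 20,  // 4 MiB objects
//                         uint64_t max_inc_objs = 128) {
//     uint64_t new_max = (size + 1) << 1;                 // double the size
//     if (max_inc_objs > 0)                               // cap the growth
//       new_max = std::min(new_max, size + max_inc_objs * object_size);
//     return round_up(new_max, object_size);              // whole increments
//   }
//
//   // e.g. a 10 MiB file doubles to just over 20 MiB and rounds up to 24 MiB;
//   // a 1 GiB file is capped at 1 GiB + 128 * 4 MiB = 1.5 GiB, not 2 GiB.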
7c673cae 2684
a8e16298 2685void Locker::calc_new_client_ranges(CInode *in, uint64_t size, bool update,
94b18763 2686 CInode::mempool_inode::client_range_map *new_ranges,
7c673cae
FG
2687 bool *max_increased)
2688{
94b18763 2689 auto latest = in->get_projected_inode();
7c673cae 2690 uint64_t ms;
a8e16298 2691 if (latest->has_layout()) {
31f18b77 2692 ms = calc_new_max_size(latest, size);
7c673cae
FG
2693 } else {
2694 // Layout-less directories, like ~mds0/, have zero size
2695 ms = 0;
2696 }
2697
2698 // increase ranges as appropriate.
2699 // shrink to 0 if no WR|BUFFER caps issued.
11fdf7f2
TL
2700 for (auto &p : in->client_caps) {
2701 if ((p.second.issued() | p.second.wanted()) & CEPH_CAP_ANY_FILE_WR) {
2702 client_writeable_range_t& nr = (*new_ranges)[p.first];
7c673cae 2703 nr.range.first = 0;
11fdf7f2
TL
2704 if (latest->client_ranges.count(p.first)) {
2705 client_writeable_range_t& oldr = latest->client_ranges[p.first];
7c673cae
FG
2706 if (ms > oldr.range.last)
2707 *max_increased = true;
11fdf7f2 2708 nr.range.last = std::max(ms, oldr.range.last);
7c673cae
FG
2709 nr.follows = oldr.follows;
2710 } else {
31f18b77 2711 *max_increased = true;
7c673cae
FG
2712 nr.range.last = ms;
2713 nr.follows = in->first - 1;
2714 }
a8e16298 2715 if (update)
11fdf7f2 2716 p.second.mark_clientwriteable();
a8e16298
TL
2717 } else {
2718 if (update)
11fdf7f2 2719 p.second.clear_clientwriteable();
7c673cae
FG
2720 }
2721 }
2722}
2723
2724bool Locker::check_inode_max_size(CInode *in, bool force_wrlock,
2725 uint64_t new_max_size, uint64_t new_size,
2726 utime_t new_mtime)
2727{
11fdf7f2
TL
2728 ceph_assert(in->is_auth());
2729 ceph_assert(in->is_file());
7c673cae 2730
94b18763
FG
2731 CInode::mempool_inode *latest = in->get_projected_inode();
2732 CInode::mempool_inode::client_range_map new_ranges;
7c673cae
FG
2733 uint64_t size = latest->size;
2734 bool update_size = new_size > 0;
2735 bool update_max = false;
2736 bool max_increased = false;
2737
2738 if (update_size) {
11fdf7f2
TL
2739 new_size = size = std::max(size, new_size);
2740 new_mtime = std::max(new_mtime, latest->mtime);
7c673cae
FG
2741 if (latest->size == new_size && latest->mtime == new_mtime)
2742 update_size = false;
2743 }
2744
a8e16298
TL
2745 int can_update = 1;
2746 if (in->is_frozen()) {
2747 can_update = -1;
2748 } else if (!force_wrlock && !in->filelock.can_wrlock(in->get_loner())) {
2749 // lock?
2750 if (in->filelock.is_stable()) {
2751 if (in->get_target_loner() >= 0)
2752 file_excl(&in->filelock);
2753 else
2754 simple_lock(&in->filelock);
2755 }
2756 if (!in->filelock.can_wrlock(in->get_loner()))
2757 can_update = -2;
2758 }
2759
2760 calc_new_client_ranges(in, std::max(new_max_size, size), can_update > 0,
2761 &new_ranges, &max_increased);
7c673cae
FG
2762
2763 if (max_increased || latest->client_ranges != new_ranges)
2764 update_max = true;
2765
2766 if (!update_size && !update_max) {
2767 dout(20) << "check_inode_max_size no-op on " << *in << dendl;
2768 return false;
2769 }
2770
2771 dout(10) << "check_inode_max_size new_ranges " << new_ranges
2772 << " update_size " << update_size
2773 << " on " << *in << dendl;
2774
a8e16298
TL
2775 if (can_update < 0) {
2776 auto cms = new C_MDL_CheckMaxSize(this, in, new_max_size, new_size, new_mtime);
2777 if (can_update == -1) {
2778 dout(10) << "check_inode_max_size frozen, waiting on " << *in << dendl;
2779 in->add_waiter(CInode::WAIT_UNFREEZE, cms);
2780 } else {
7c673cae
FG
2781 in->filelock.add_waiter(SimpleLock::WAIT_STABLE, cms);
2782 dout(10) << "check_inode_max_size can't wrlock, waiting on " << *in << dendl;
7c673cae 2783 }
a8e16298 2784 return false;
7c673cae
FG
2785 }
2786
2787 MutationRef mut(new MutationImpl());
2788 mut->ls = mds->mdlog->get_current_segment();
2789
94b18763
FG
2790 auto &pi = in->project_inode();
2791 pi.inode.version = in->pre_dirty();
7c673cae
FG
2792
2793 if (update_max) {
94b18763
FG
2794 dout(10) << "check_inode_max_size client_ranges " << pi.inode.client_ranges << " -> " << new_ranges << dendl;
2795 pi.inode.client_ranges = new_ranges;
7c673cae
FG
2796 }
2797
2798 if (update_size) {
94b18763
FG
2799 dout(10) << "check_inode_max_size size " << pi.inode.size << " -> " << new_size << dendl;
2800 pi.inode.size = new_size;
2801 pi.inode.rstat.rbytes = new_size;
2802 dout(10) << "check_inode_max_size mtime " << pi.inode.mtime << " -> " << new_mtime << dendl;
2803 pi.inode.mtime = new_mtime;
91327a77
AA
2804 if (new_mtime > pi.inode.ctime) {
2805 pi.inode.ctime = new_mtime;
2806 if (new_mtime > pi.inode.rstat.rctime)
2807 pi.inode.rstat.rctime = new_mtime;
2808 }
7c673cae
FG
2809 }
2810
2811 // use EOpen if the file is still open; otherwise, use EUpdate.
2812 // this is just an optimization to push open files forward into
2813 // newer log segments.
2814 LogEvent *le;
2815 EMetaBlob *metablob;
2816 if (in->is_any_caps_wanted() && in->last == CEPH_NOSNAP) {
2817 EOpen *eo = new EOpen(mds->mdlog);
2818 eo->add_ino(in->ino());
2819 metablob = &eo->metablob;
2820 le = eo;
7c673cae
FG
2821 } else {
2822 EUpdate *eu = new EUpdate(mds->mdlog, "check_inode_max_size");
2823 metablob = &eu->metablob;
2824 le = eu;
2825 }
2826 mds->mdlog->start_entry(le);
2827 if (update_size) { // FIXME if/when we do max_size nested accounting
2828 mdcache->predirty_journal_parents(mut, metablob, in, 0, PREDIRTY_PRIMARY);
2829 // no cow, here!
2830 CDentry *parent = in->get_projected_parent_dn();
2831 metablob->add_primary_dentry(parent, in, true);
2832 } else {
2833 metablob->add_dir_context(in->get_projected_parent_dn()->get_dir());
2834 mdcache->journal_dirty_inode(mut.get(), metablob, in);
2835 }
11fdf7f2 2836 mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut,
9f95a23c 2837 UPDATE_SHAREMAX, ref_t<MClientCaps>()));
7c673cae
FG
2838 wrlock_force(&in->filelock, mut); // wrlock for duration of journal
2839 mut->auth_pin(in);
2840
2841 // make max_size _increase_ timely
2842 if (max_increased)
2843 mds->mdlog->flush();
2844
2845 return true;
2846}
2847
2848
2849void Locker::share_inode_max_size(CInode *in, Capability *only_cap)
2850{
2851 /*
2852 * only share if currently issued a WR cap. if client doesn't have it,
2853 * file_max doesn't matter, and the client will get it if/when they get
2854 * the cap later.
2855 */
2856 dout(10) << "share_inode_max_size on " << *in << dendl;
11fdf7f2 2857 map<client_t, Capability>::iterator it;
7c673cae
FG
2858 if (only_cap)
2859 it = in->client_caps.find(only_cap->get_client());
2860 else
2861 it = in->client_caps.begin();
2862 for (; it != in->client_caps.end(); ++it) {
2863 const client_t client = it->first;
11fdf7f2 2864 Capability *cap = &it->second;
7c673cae
FG
2865 if (cap->is_suppress())
2866 continue;
2867 if (cap->pending() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER)) {
2868 dout(10) << "share_inode_max_size with client." << client << dendl;
2869 cap->inc_last_seq();
9f95a23c 2870 auto m = make_message<MClientCaps>(CEPH_CAP_OP_GRANT,
11fdf7f2
TL
2871 in->ino(),
2872 in->find_snaprealm()->inode->ino(),
2873 cap->get_cap_id(),
2874 cap->get_last_seq(),
2875 cap->pending(),
2876 cap->wanted(), 0,
2877 cap->get_mseq(),
2878 mds->get_osd_epoch_barrier());
7c673cae
FG
2879 in->encode_cap_message(m, cap);
2880 mds->send_message_client_counted(m, client);
2881 }
2882 if (only_cap)
2883 break;
2884 }
2885}
2886
2887bool Locker::_need_flush_mdlog(CInode *in, int wanted)
2888{
2889 /* flush log if caps are wanted by client but corresponding lock is unstable and locked by
2890 * pending mutations. */
2891 if (((wanted & (CEPH_CAP_FILE_RD|CEPH_CAP_FILE_WR|CEPH_CAP_FILE_SHARED|CEPH_CAP_FILE_EXCL)) &&
2892 in->filelock.is_unstable_and_locked()) ||
2893 ((wanted & (CEPH_CAP_AUTH_SHARED|CEPH_CAP_AUTH_EXCL)) &&
2894 in->authlock.is_unstable_and_locked()) ||
2895 ((wanted & (CEPH_CAP_LINK_SHARED|CEPH_CAP_LINK_EXCL)) &&
2896 in->linklock.is_unstable_and_locked()) ||
2897 ((wanted & (CEPH_CAP_XATTR_SHARED|CEPH_CAP_XATTR_EXCL)) &&
2898 in->xattrlock.is_unstable_and_locked()))
2899 return true;
2900 return false;
2901}
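// For reference, the cap-bit -> lock associations tested above:
//   CEPH_CAP_FILE_{RD,WR,SHARED,EXCL}  -> filelock
//   CEPH_CAP_AUTH_{SHARED,EXCL}        -> authlock
//   CEPH_CAP_LINK_{SHARED,EXCL}        -> linklock
//   CEPH_CAP_XATTR_{SHARED,EXCL}       -> xattrlock
// Wanting a cap whose lock is unstable-and-locked means a journal flush is
// needed before the lock can move and the cap can be issued.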
2902
2903void Locker::adjust_cap_wanted(Capability *cap, int wanted, int issue_seq)
2904{
2905 if (ceph_seq_cmp(issue_seq, cap->get_last_issue()) == 0) {
2906 dout(10) << " wanted " << ccap_string(cap->wanted())
2907 << " -> " << ccap_string(wanted) << dendl;
2908 cap->set_wanted(wanted);
2909 } else if (wanted & ~cap->wanted()) {
2910 dout(10) << " wanted " << ccap_string(cap->wanted())
2911 << " -> " << ccap_string(wanted)
2912 << " (added caps even though we had seq mismatch!)" << dendl;
2913 cap->set_wanted(wanted | cap->wanted());
2914 } else {
2915 dout(10) << " NOT changing wanted " << ccap_string(cap->wanted())
2916 << " -> " << ccap_string(wanted)
2917 << " (issue_seq " << issue_seq << " != last_issue "
2918 << cap->get_last_issue() << ")" << dendl;
2919 return;
2920 }
2921
2922 CInode *cur = cap->get_inode();
2923 if (!cur->is_auth()) {
2924 request_inode_file_caps(cur);
2925 return;
2926 }
2927
11fdf7f2 2928 if (cap->wanted()) {
7c673cae
FG
2929 if (cur->state_test(CInode::STATE_RECOVERING) &&
2930 (cap->wanted() & (CEPH_CAP_FILE_RD |
2931 CEPH_CAP_FILE_WR))) {
2932 mds->mdcache->recovery_queue.prioritize(cur);
2933 }
2934
11fdf7f2
TL
2935 if (mdcache->open_file_table.should_log_open(cur)) {
2936 ceph_assert(cur->last == CEPH_NOSNAP);
7c673cae
FG
2937 EOpen *le = new EOpen(mds->mdlog);
2938 mds->mdlog->start_entry(le);
2939 le->add_clean_inode(cur);
7c673cae
FG
2940 mds->mdlog->submit_entry(le);
2941 }
2942 }
2943}
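// Aside: adjust_cap_wanted() above only trusts the new 'wanted' when the
// client's issue_seq matches the cap's last issue. A sketch of wrap-tolerant
// sequence comparison in the spirit of ceph_seq_cmp() -- the exact upstream
// definition may differ; this is illustrative only:
//
//   #include <cstdint>
//
//   int32_t seq_cmp(uint32_t a, uint32_t b) {
//     return static_cast<int32_t>(a - b);   // signed distance, wrap-safe
//   }
//
//   seq_cmp(5, 5) == 0;             // same issue_seq: accept new 'wanted'
//   seq_cmp(4, 5) < 0;              // stale message
//   seq_cmp(1, 0xffffffffu) > 0;    // counter wrapped around, still newer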
2944
11fdf7f2
TL
2945void Locker::snapflush_nudge(CInode *in)
2946{
2947 ceph_assert(in->last != CEPH_NOSNAP);
2948 if (in->client_snap_caps.empty())
2949 return;
2950
2951 CInode *head = mdcache->get_inode(in->ino());
2952 // head inode gets unpinned when snapflush starts. It might get trimmed
2953 // before snapflush finishes.
2954 if (!head)
2955 return;
7c673cae 2956
11fdf7f2
TL
2957 ceph_assert(head->is_auth());
2958 if (head->client_need_snapflush.empty())
2959 return;
2960
2961 SimpleLock *hlock = head->get_lock(CEPH_LOCK_IFILE);
2962 if (hlock->get_state() == LOCK_SYNC || !hlock->is_stable()) {
2963 hlock = NULL;
2964 for (int i = 0; i < num_cinode_locks; i++) {
2965 SimpleLock *lock = head->get_lock(cinode_lock_info[i].lock);
2966 if (lock->get_state() != LOCK_SYNC && lock->is_stable()) {
2967 hlock = lock;
2968 break;
2969 }
2970 }
2971 }
2972 if (hlock) {
2973 _rdlock_kick(hlock, true);
2974 } else {
2975 // also, requeue, in case of unstable lock
2976 need_snapflush_inodes.push_back(&in->item_caps);
2977 }
2978}
2979
2980void Locker::mark_need_snapflush_inode(CInode *in)
2981{
2982 ceph_assert(in->last != CEPH_NOSNAP);
2983 if (!in->item_caps.is_on_list()) {
2984 need_snapflush_inodes.push_back(&in->item_caps);
2985 utime_t now = ceph_clock_now();
2986 in->last_dirstat_prop = now;
2987 dout(10) << "mark_need_snapflush_inode " << *in << " - added at " << now << dendl;
2988 }
2989}
7c673cae 2990
494da23a
TL
2991bool Locker::is_revoking_any_caps_from(client_t client)
2992{
2993 auto it = revoking_caps_by_client.find(client);
2994 if (it == revoking_caps_by_client.end())
2995 return false;
2996 return !it->second.empty();
2997}
2998
c07f9fc5 2999void Locker::_do_null_snapflush(CInode *head_in, client_t client, snapid_t last)
7c673cae
FG
3000{
3001 dout(10) << "_do_null_snapflush client." << client << " on " << *head_in << dendl;
c07f9fc5
FG
3002 for (auto p = head_in->client_need_snapflush.begin();
3003 p != head_in->client_need_snapflush.end() && p->first < last; ) {
7c673cae 3004 snapid_t snapid = p->first;
94b18763 3005 auto &clients = p->second;
7c673cae
FG
3006 ++p; // be careful, q loop below depends on this
3007
3008 if (clients.count(client)) {
3009 dout(10) << " doing async NULL snapflush on " << snapid << " from client." << client << dendl;
b32b8144 3010 CInode *sin = mdcache->pick_inode_snap(head_in, snapid - 1);
11fdf7f2
TL
3011 ceph_assert(sin);
3012 ceph_assert(sin->first <= snapid);
9f95a23c 3013 _do_snap_update(sin, snapid, 0, sin->first - 1, client, ref_t<MClientCaps>(), ref_t<MClientCaps>());
7c673cae
FG
3014 head_in->remove_need_snapflush(sin, snapid, client);
3015 }
3016 }
3017}
3018
3019
3020bool Locker::should_defer_client_cap_frozen(CInode *in)
3021{
9f95a23c
TL
3022 if (in->is_frozen())
3023 return true;
3024
7c673cae 3025 /*
9f95a23c
TL
3026 * This policy needs to be AT LEAST as permissive as the policy that lets
3027 * a client request go forward; otherwise a client request can release
3028 * something, the release gets deferred, but the request still gets
3029 * processed and deadlocks because the caps can't be revoked.
7c673cae 3030 *
9f95a23c
TL
3031 * No auth_pin implies that there is no unstable lock and @in is not auth
3032 * pinned by a client request. If the parent dirfrag is auth pinned by a lock
3033 * cache, a later request from the lock cache owner may forcibly auth pin @in.
7c673cae 3034 */
9f95a23c
TL
3035 if (in->is_freezing() && in->get_num_auth_pins() == 0) {
3036 CDir* dir = in->get_parent_dir();
3037 if (!dir || !dir->is_auth_pinned_by_lock_cache())
3038 return true;
3039 }
3040 return false;
7c673cae
FG
3041}
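// The policy above, reduced to a standalone predicate over plain values
// (illustrative only; the real checks consult CInode/CDir state):
//
//   bool should_defer(bool frozen, bool freezing, int num_auth_pins,
//                     bool parent_pinned_by_lock_cache) {
//     if (frozen)
//       return true;                         // always wait for unfreeze
//     if (freezing && num_auth_pins == 0)    // nothing forces progress...
//       return !parent_pinned_by_lock_cache; // ...unless a lock cache might
//     return false;                          // pin the inode later
//   }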
3042
9f95a23c 3043void Locker::handle_client_caps(const cref_t<MClientCaps> &m)
7c673cae 3044{
7c673cae 3045 client_t client = m->get_source().num();
7c673cae 3046 snapid_t follows = m->get_snap_follows();
11fdf7f2
TL
3047 auto op = m->get_op();
3048 auto dirty = m->get_dirty();
7c673cae 3049 dout(7) << "handle_client_caps "
7c673cae
FG
3050 << " on " << m->get_ino()
3051 << " tid " << m->get_client_tid() << " follows " << follows
11fdf7f2
TL
3052 << " op " << ceph_cap_op_name(op)
3053 << " flags 0x" << std::hex << m->flags << std::dec << dendl;
7c673cae 3054
94b18763 3055 Session *session = mds->get_session(m);
7c673cae
FG
3056 if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) {
3057 if (!session) {
3058 dout(5) << " no session, dropping " << *m << dendl;
7c673cae
FG
3059 return;
3060 }
3061 if (session->is_closed() ||
3062 session->is_closing() ||
3063 session->is_killing()) {
3064 dout(7) << " session closed|closing|killing, dropping " << *m << dendl;
7c673cae
FG
3065 return;
3066 }
11fdf7f2
TL
3067 if ((mds->is_reconnect() || mds->get_want_state() == MDSMap::STATE_RECONNECT) &&
3068 dirty && m->get_client_tid() > 0 &&
7c673cae 3069 !session->have_completed_flush(m->get_client_tid())) {
11fdf7f2
TL
3070 mdcache->set_reconnected_dirty_caps(client, m->get_ino(), dirty,
3071 op == CEPH_CAP_OP_FLUSHSNAP);
7c673cae
FG
3072 }
3073 mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
3074 return;
3075 }
3076
3077 if (m->get_client_tid() > 0 && session &&
3078 session->have_completed_flush(m->get_client_tid())) {
3079 dout(7) << "handle_client_caps already flushed tid " << m->get_client_tid()
3080 << " for client." << client << dendl;
9f95a23c 3081 ref_t<MClientCaps> ack;
11fdf7f2 3082 if (op == CEPH_CAP_OP_FLUSHSNAP) {
9f95a23c 3083 ack = make_message<MClientCaps>(CEPH_CAP_OP_FLUSHSNAP_ACK, m->get_ino(), 0, 0, 0, 0, 0, dirty, 0, mds->get_osd_epoch_barrier());
7c673cae 3084 } else {
9f95a23c 3085 ack = make_message<MClientCaps>(CEPH_CAP_OP_FLUSH_ACK, m->get_ino(), 0, m->get_cap_id(), m->get_seq(), m->get_caps(), 0, dirty, 0, mds->get_osd_epoch_barrier());
7c673cae
FG
3086 }
3087 ack->set_snap_follows(follows);
3088 ack->set_client_tid(m->get_client_tid());
3089 mds->send_message_client_counted(ack, m->get_connection());
11fdf7f2 3090 if (op == CEPH_CAP_OP_FLUSHSNAP) {
7c673cae
FG
3091 return;
3092 } else {
3093 // fall-thru because the message may release some caps
11fdf7f2
TL
3094 dirty = false;
3095 op = CEPH_CAP_OP_UPDATE;
7c673cae
FG
3096 }
3097 }
3098
3099 // "oldest flush tid" > 0 means client uses unique TID for each flush
3100 if (m->get_oldest_flush_tid() > 0 && session) {
3101 if (session->trim_completed_flushes(m->get_oldest_flush_tid())) {
3102 mds->mdlog->get_current_segment()->touched_sessions.insert(session->info.inst.name);
3103
3104 if (session->get_num_trim_flushes_warnings() > 0 &&
11fdf7f2 3105 session->get_num_completed_flushes() * 2 < g_conf()->mds_max_completed_flushes)
7c673cae
FG
3106 session->reset_num_trim_flushes_warnings();
3107 } else {
3108 if (session->get_num_completed_flushes() >=
11fdf7f2 3109 (g_conf()->mds_max_completed_flushes << session->get_num_trim_flushes_warnings())) {
7c673cae
FG
3110 session->inc_num_trim_flushes_warnings();
3111 stringstream ss;
3112 ss << "client." << session->get_client() << " does not advance its oldest_flush_tid ("
3113 << m->get_oldest_flush_tid() << "), "
3114 << session->get_num_completed_flushes()
3115 << " completed flushes recorded in session";
3116 mds->clog->warn() << ss.str();
3117 dout(20) << __func__ << " " << ss.str() << dendl;
3118 }
3119 }
3120 }
3121
3122 CInode *head_in = mdcache->get_inode(m->get_ino());
3123 if (!head_in) {
3124 if (mds->is_clientreplay()) {
3125 dout(7) << "handle_client_caps on unknown ino " << m->get_ino()
3126 << ", will try again after replayed client requests" << dendl;
3127 mdcache->wait_replay_cap_reconnect(m->get_ino(), new C_MDS_RetryMessage(mds, m));
3128 return;
3129 }
1adf2230
AA
3130
3131 /*
3132 * "handle_client_caps on unknown ino xxx” is normal after migrating a subtree
3133 * Sequence of events that cause this are:
3134 * - client sends caps message to mds.a
3135 * - mds finishes subtree migration, send cap export to client
3136 * - mds trim its cache
3137 * - mds receives cap messages from client
3138 */
3139 dout(7) << "handle_client_caps on unknown ino " << m->get_ino() << ", dropping" << dendl;
7c673cae
FG
3140 return;
3141 }
3142
3143 if (m->osd_epoch_barrier && !mds->objecter->have_map(m->osd_epoch_barrier)) {
3144 // Pause RADOS operations until we see the required epoch
3145 mds->objecter->set_epoch_barrier(m->osd_epoch_barrier);
3146 }
3147
3148 if (mds->get_osd_epoch_barrier() < m->osd_epoch_barrier) {
3149 // Record the barrier so that we will retransmit it to clients
3150 mds->set_osd_epoch_barrier(m->osd_epoch_barrier);
3151 }
3152
b32b8144 3153 dout(10) << " head inode " << *head_in << dendl;
7c673cae
FG
3154
3155 Capability *cap = 0;
b32b8144 3156 cap = head_in->get_client_cap(client);
7c673cae 3157 if (!cap) {
b32b8144 3158 dout(7) << "handle_client_caps no cap for client." << client << " on " << *head_in << dendl;
7c673cae
FG
3159 return;
3160 }
11fdf7f2 3161 ceph_assert(cap);
7c673cae
FG
3162
3163 // freezing|frozen?
b32b8144
FG
3164 if (should_defer_client_cap_frozen(head_in)) {
3165 dout(7) << "handle_client_caps freezing|frozen on " << *head_in << dendl;
3166 head_in->add_waiter(CInode::WAIT_UNFREEZE, new C_MDS_RetryMessage(mds, m));
7c673cae
FG
3167 return;
3168 }
3169 if (ceph_seq_cmp(m->get_mseq(), cap->get_mseq()) < 0) {
3170 dout(7) << "handle_client_caps mseq " << m->get_mseq() << " < " << cap->get_mseq()
3171 << ", dropping" << dendl;
7c673cae
FG
3172 return;
3173 }
3174
11fdf7f2 3175 bool need_unpin = false;
7c673cae
FG
3176
3177 // flushsnap?
  if (op == CEPH_CAP_OP_FLUSHSNAP) {
    if (!head_in->is_auth()) {
      dout(7) << " not auth, ignoring flushsnap on " << *head_in << dendl;
      goto out;
    }

    SnapRealm *realm = head_in->find_snaprealm();
    snapid_t snap = realm->get_snap_following(follows);
    dout(10) << " flushsnap follows " << follows << " -> snap " << snap << dendl;

    auto p = head_in->client_need_snapflush.begin();
    if (p != head_in->client_need_snapflush.end() && p->first < snap) {
      head_in->auth_pin(this); // prevent subtree frozen
      need_unpin = true;
      _do_null_snapflush(head_in, client, snap);
    }

    CInode *in = head_in;
    if (snap != CEPH_NOSNAP) {
      in = mdcache->pick_inode_snap(head_in, snap - 1);
      if (in != head_in)
        dout(10) << " snapped inode " << *in << dendl;
    }

    // we can prepare the ack now, since this FLUSHEDSNAP is independent of any
    // other cap ops.  (except possibly duplicate FLUSHSNAP requests, but worst
    // case we get a dup response, so whatever.)
    ref_t<MClientCaps> ack;
    if (dirty) {
      ack = make_message<MClientCaps>(CEPH_CAP_OP_FLUSHSNAP_ACK, in->ino(), 0, 0, 0, 0, 0, dirty, 0, mds->get_osd_epoch_barrier());
      ack->set_snap_follows(follows);
      ack->set_client_tid(m->get_client_tid());
      ack->set_oldest_flush_tid(m->get_oldest_flush_tid());
    }

    if (in == head_in ||
        (head_in->client_need_snapflush.count(snap) &&
         head_in->client_need_snapflush[snap].count(client))) {
      dout(7) << " flushsnap snap " << snap
              << " client." << client << " on " << *in << dendl;

      // this cap now follows a later snap (i.e. the one initiating this flush, or later)
      if (in == head_in)
        cap->client_follows = snap < CEPH_NOSNAP ? snap : realm->get_newest_seq();

      _do_snap_update(in, snap, dirty, follows, client, m, ack);

      if (in != head_in)
        head_in->remove_need_snapflush(in, snap, client);
    } else {
      dout(7) << " not expecting flushsnap " << snap << " from client." << client << " on " << *in << dendl;
      if (ack)
        mds->send_message_client_counted(ack, m->get_connection());
    }
    goto out;
  }

  if (cap->get_cap_id() != m->get_cap_id()) {
    dout(7) << " ignoring client capid " << m->get_cap_id() << " != my " << cap->get_cap_id() << dendl;
  } else {
    CInode *in = head_in;
    if (follows > 0) {
      in = mdcache->pick_inode_snap(head_in, follows);
      // intermediate snap inodes
      while (in != head_in) {
        ceph_assert(in->last != CEPH_NOSNAP);
        if (in->is_auth() && dirty) {
          dout(10) << " updating intermediate snapped inode " << *in << dendl;
          _do_cap_update(in, NULL, dirty, follows, m, ref_t<MClientCaps>());
        }
        in = mdcache->pick_inode_snap(head_in, in->last);
      }
    }

    // head inode, and cap
    ref_t<MClientCaps> ack;

    int caps = m->get_caps();
    if (caps & ~cap->issued()) {
      dout(10) << " confirming not issued caps " << ccap_string(caps & ~cap->issued()) << dendl;
      caps &= cap->issued();
    }

    int revoked = cap->confirm_receipt(m->get_seq(), caps);
    dout(10) << " follows " << follows
             << " retains " << ccap_string(m->get_caps())
             << " dirty " << ccap_string(dirty)
             << " on " << *in << dendl;

    if (revoked & CEPH_CAP_ANY_DIR_OPS)
      eval_lock_caches(cap);

    // missing/skipped snapflush?
    //  The client MAY send a snapflush if it is issued WR/EXCL caps, but
    //  presently only does so when it has actual dirty metadata.  But, we
    //  set up the need_snapflush stuff based on the issued caps.
    //  We can infer that the client WONT send a FLUSHSNAP once they have
    //  released all WR/EXCL caps (the FLUSHSNAP always comes before the cap
    //  update/release).
    if (!head_in->client_need_snapflush.empty()) {
      if (!(cap->issued() & CEPH_CAP_ANY_FILE_WR) &&
          !(m->flags & MClientCaps::FLAG_PENDING_CAPSNAP)) {
        head_in->auth_pin(this); // prevent subtree frozen
        need_unpin = true;
        _do_null_snapflush(head_in, client);
      } else {
        dout(10) << " revocation in progress, not making any conclusions about null snapflushes" << dendl;
      }
    }
    if (cap->need_snapflush() && !(m->flags & MClientCaps::FLAG_PENDING_CAPSNAP))
      cap->clear_needsnapflush();

    if (dirty && in->is_auth()) {
      dout(7) << " flush client." << client << " dirty " << ccap_string(dirty)
              << " seq " << m->get_seq() << " on " << *in << dendl;
      ack = make_message<MClientCaps>(CEPH_CAP_OP_FLUSH_ACK, in->ino(), 0, cap->get_cap_id(), m->get_seq(),
                                      m->get_caps(), 0, dirty, 0, mds->get_osd_epoch_barrier());
      ack->set_client_tid(m->get_client_tid());
      ack->set_oldest_flush_tid(m->get_oldest_flush_tid());
    }

    // filter wanted based on what we could ever give out (given auth/replica status)
    bool need_flush = m->flags & MClientCaps::FLAG_SYNC;
    int new_wanted = m->get_wanted();
    if (new_wanted != cap->wanted()) {
      if (!need_flush && in->is_auth() && (new_wanted & ~cap->pending())) {
        // expanding caps.  make sure we aren't waiting for a log flush
        need_flush = _need_flush_mdlog(head_in, new_wanted & ~cap->pending());
      }

      adjust_cap_wanted(cap, new_wanted, m->get_issue_seq());
    }

    if (in->is_auth() &&
        _do_cap_update(in, cap, dirty, follows, m, ack, &need_flush)) {
      // updated
      eval(in, CEPH_CAP_LOCKS);

      if (!need_flush && (cap->wanted() & ~cap->pending()))
        need_flush = _need_flush_mdlog(in, cap->wanted() & ~cap->pending());
    } else {
      // no update, ack now.
      if (ack)
        mds->send_message_client_counted(ack, m->get_connection());

      bool did_issue = eval(in, CEPH_CAP_LOCKS);
      if (!did_issue && (cap->wanted() & ~cap->pending()))
        issue_caps(in, cap);

      if (cap->get_last_seq() == 0 &&
          (cap->pending() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER))) {
        share_inode_max_size(in, cap);
      }
    }

    if (need_flush)
      mds->mdlog->flush();
  }

 out:
  if (need_unpin)
    head_in->auth_unpin(this);
}


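// Retry a cap release that was deferred because the inode was frozen.  Note
// the null MDRequestRef: by the time we retry, the original request has moved
// on, so the release is processed standalone (hence "DEFERRED, no mdr").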
class C_Locker_RetryRequestCapRelease : public LockerContext {
  client_t client;
  ceph_mds_request_release item;
public:
  C_Locker_RetryRequestCapRelease(Locker *l, client_t c, const ceph_mds_request_release& it) :
    LockerContext(l), client(c), item(it) { }
  void finish(int r) override {
    string dname;
    MDRequestRef null_ref;
    locker->process_request_cap_release(null_ref, client, item, dname);
  }
};

void Locker::process_request_cap_release(MDRequestRef& mdr, client_t client, const ceph_mds_request_release& item,
                                         std::string_view dname)
{
  inodeno_t ino = (uint64_t)item.ino;
  uint64_t cap_id = item.cap_id;
  int caps = item.caps;
  int wanted = item.wanted;
  int seq = item.seq;
  int issue_seq = item.issue_seq;
  int mseq = item.mseq;

  CInode *in = mdcache->get_inode(ino);
  if (!in)
    return;

  if (dname.length()) {
    frag_t fg = in->pick_dirfrag(dname);
    CDir *dir = in->get_dirfrag(fg);
    if (dir) {
      CDentry *dn = dir->lookup(dname);
      if (dn) {
        ClientLease *l = dn->get_client_lease(client);
        if (l) {
          dout(10) << __func__ << " removing lease on " << *dn << dendl;
          dn->remove_client_lease(l, this);
        } else {
          dout(7) << __func__ << " client." << client
                  << " doesn't have lease on " << *dn << dendl;
        }
      } else {
        dout(7) << __func__ << " client." << client << " released lease on dn "
                << dir->dirfrag() << "/" << dname << " which dne" << dendl;
      }
    }
  }

  Capability *cap = in->get_client_cap(client);
  if (!cap)
    return;

  dout(10) << __func__ << " client." << client << " " << ccap_string(caps) << " on " << *in
           << (mdr ? "" : " (DEFERRED, no mdr)")
           << dendl;

  if (ceph_seq_cmp(mseq, cap->get_mseq()) < 0) {
    dout(7) << " mseq " << mseq << " < " << cap->get_mseq() << ", dropping" << dendl;
    return;
  }

  if (cap->get_cap_id() != cap_id) {
    dout(7) << " cap_id " << cap_id << " != " << cap->get_cap_id() << ", dropping" << dendl;
    return;
  }

  if (should_defer_client_cap_frozen(in)) {
    dout(7) << " frozen, deferring" << dendl;
    in->add_waiter(CInode::WAIT_UNFREEZE, new C_Locker_RetryRequestCapRelease(this, client, item));
    return;
  }

  if (caps & ~cap->issued()) {
    dout(10) << " confirming not issued caps " << ccap_string(caps & ~cap->issued()) << dendl;
    caps &= cap->issued();
  }
  int revoked = cap->confirm_receipt(seq, caps);
  if (revoked & CEPH_CAP_ANY_DIR_OPS)
    eval_lock_caches(cap);

  if (!in->client_need_snapflush.empty() &&
      (cap->issued() & CEPH_CAP_ANY_FILE_WR) == 0) {
    _do_null_snapflush(in, client);
  }

  adjust_cap_wanted(cap, wanted, issue_seq);

  if (mdr)
    cap->inc_suppress();
  eval(in, CEPH_CAP_LOCKS);
  if (mdr)
    cap->dec_suppress();

  // take note; we may need to reissue on this cap later
  if (mdr)
    mdr->cap_releases[in->vino()] = cap->get_last_seq();
}

class C_Locker_RetryKickIssueCaps : public LockerContext {
  CInode *in;
  client_t client;
  ceph_seq_t seq;
public:
  C_Locker_RetryKickIssueCaps(Locker *l, CInode *i, client_t c, ceph_seq_t s) :
    LockerContext(l), in(i), client(c), seq(s) {
    in->get(CInode::PIN_PTRWAITER);
  }
  void finish(int r) override {
    locker->kick_issue_caps(in, client, seq);
    in->put(CInode::PIN_PTRWAITER);
  }
};

void Locker::kick_issue_caps(CInode *in, client_t client, ceph_seq_t seq)
{
  Capability *cap = in->get_client_cap(client);
  if (!cap || cap->get_last_seq() != seq)
    return;
  if (in->is_frozen()) {
    dout(10) << "kick_issue_caps waiting for unfreeze on " << *in << dendl;
    in->add_waiter(CInode::WAIT_UNFREEZE,
                   new C_Locker_RetryKickIssueCaps(this, in, client, seq));
    return;
  }
  dout(10) << "kick_issue_caps released at current seq " << seq
           << ", reissuing" << dendl;
  issue_caps(in, cap);
}

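// Reissue caps that a client released as part of a request (recorded in
// mdr->cap_releases by process_request_cap_release above), for instance when
// the request is retried and the client will need those caps again.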
void Locker::kick_cap_releases(MDRequestRef& mdr)
{
  client_t client = mdr->get_client();
  for (map<vinodeno_t,ceph_seq_t>::iterator p = mdr->cap_releases.begin();
       p != mdr->cap_releases.end();
       ++p) {
    CInode *in = mdcache->get_inode(p->first);
    if (!in)
      continue;
    kick_issue_caps(in, client, p->second);
  }
}

/**
 * m and ack might be NULL, so don't dereference them unless dirty != 0
 */
void Locker::_do_snap_update(CInode *in, snapid_t snap, int dirty, snapid_t follows, client_t client, const cref_t<MClientCaps> &m, const ref_t<MClientCaps> &ack)
{
  dout(10) << "_do_snap_update dirty " << ccap_string(dirty)
           << " follows " << follows << " snap " << snap
           << " on " << *in << dendl;

  if (snap == CEPH_NOSNAP) {
    // hmm, i guess snap was already deleted?  just ack!
    dout(10) << " wow, the snap following " << follows
             << " was already deleted.  nothing to record, just ack." << dendl;
    if (ack)
      mds->send_message_client_counted(ack, m->get_connection());
    return;
  }

  EUpdate *le = new EUpdate(mds->mdlog, "snap flush");
  mds->mdlog->start_entry(le);
  MutationRef mut = new MutationImpl();
  mut->ls = mds->mdlog->get_current_segment();

  // normal metadata updates that we can apply to the head as well.

  // update xattrs?
  CInode::mempool_xattr_map *px = nullptr;
  bool xattrs = (dirty & CEPH_CAP_XATTR_EXCL) &&
                m->xattrbl.length() &&
                m->head.xattr_version > in->get_projected_inode()->xattr_version;

  CInode::mempool_old_inode *oi = 0;
  if (in->is_multiversion()) {
    oi = in->pick_old_inode(snap);
  }

  CInode::mempool_inode *i;
  if (oi) {
    dout(10) << " writing into old inode" << dendl;
    auto &pi = in->project_inode();
    pi.inode.version = in->pre_dirty();
    if (snap > oi->first)
      in->split_old_inode(snap);
    i = &oi->inode;
    if (xattrs)
      px = &oi->xattrs;
  } else {
    auto &pi = in->project_inode(xattrs);
    pi.inode.version = in->pre_dirty();
    i = &pi.inode;
    if (xattrs)
      px = pi.xattrs.get();
  }

  _update_cap_fields(in, dirty, m, i);

  // xattr
  if (xattrs) {
    dout(7) << " xattrs v" << i->xattr_version << " -> " << m->head.xattr_version
            << " len " << m->xattrbl.length() << dendl;
    i->xattr_version = m->head.xattr_version;
    auto p = m->xattrbl.cbegin();
    decode(*px, p);
  }

  {
    auto it = i->client_ranges.find(client);
    if (it != i->client_ranges.end()) {
      if (in->last == snap) {
        dout(10) << " removing client_range entirely" << dendl;
        i->client_ranges.erase(it);
      } else {
        dout(10) << " client_range now follows " << snap << dendl;
        it->second.follows = snap;
      }
    }
  }

  mut->auth_pin(in);
  mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows);
  mdcache->journal_dirty_inode(mut.get(), &le->metablob, in, follows);

  // "oldest flush tid" > 0 means client uses unique TID for each flush
  if (ack && ack->get_oldest_flush_tid() > 0)
    le->metablob.add_client_flush(metareqid_t(m->get_source(), ack->get_client_tid()),
                                  ack->get_oldest_flush_tid());
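  // (recording the flush tid in the journal lets a recovering MDS recognize
  //  a client's resent flush as already applied instead of reapplying it)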

  mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut, UPDATE_SNAPFLUSH,
                                                              ack, client));
}

void Locker::_update_cap_fields(CInode *in, int dirty, const cref_t<MClientCaps> &m, CInode::mempool_inode *pi)
{
  if (dirty == 0)
    return;

  /* m must be valid if there are dirty caps */
  ceph_assert(m);
  uint64_t features = m->get_connection()->get_features();

  if (m->get_ctime() > pi->ctime) {
    dout(7) << " ctime " << pi->ctime << " -> " << m->get_ctime()
            << " for " << *in << dendl;
    pi->ctime = m->get_ctime();
    if (m->get_ctime() > pi->rstat.rctime)
      pi->rstat.rctime = m->get_ctime();
  }

  if ((features & CEPH_FEATURE_FS_CHANGE_ATTR) &&
      m->get_change_attr() > pi->change_attr) {
    dout(7) << " change_attr " << pi->change_attr << " -> " << m->get_change_attr()
            << " for " << *in << dendl;
    pi->change_attr = m->get_change_attr();
  }

  // file
  if (dirty & (CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)) {
    utime_t atime = m->get_atime();
    utime_t mtime = m->get_mtime();
    uint64_t size = m->get_size();
    version_t inline_version = m->inline_version;

    if (((dirty & CEPH_CAP_FILE_WR) && mtime > pi->mtime) ||
        ((dirty & CEPH_CAP_FILE_EXCL) && mtime != pi->mtime)) {
      dout(7) << " mtime " << pi->mtime << " -> " << mtime
              << " for " << *in << dendl;
      pi->mtime = mtime;
      if (mtime > pi->rstat.rctime)
        pi->rstat.rctime = mtime;
    }
    if (in->inode.is_file() &&  // ONLY if regular file
        size > pi->size) {
      dout(7) << " size " << pi->size << " -> " << size
              << " for " << *in << dendl;
      pi->size = size;
      pi->rstat.rbytes = size;
    }
    if (in->inode.is_file() &&
        (dirty & CEPH_CAP_FILE_WR) &&
        inline_version > pi->inline_data.version) {
      pi->inline_data.version = inline_version;
      if (inline_version != CEPH_INLINE_NONE && m->inline_data.length() > 0)
        pi->inline_data.get_data() = m->inline_data;
      else
        pi->inline_data.free_data();
    }
    if ((dirty & CEPH_CAP_FILE_EXCL) && atime != pi->atime) {
      dout(7) << " atime " << pi->atime << " -> " << atime
              << " for " << *in << dendl;
      pi->atime = atime;
    }
    if ((dirty & CEPH_CAP_FILE_EXCL) &&
        ceph_seq_cmp(pi->time_warp_seq, m->get_time_warp_seq()) < 0) {
      dout(7) << " time_warp_seq " << pi->time_warp_seq << " -> " << m->get_time_warp_seq()
              << " for " << *in << dendl;
      pi->time_warp_seq = m->get_time_warp_seq();
    }
  }
  // auth
  if (dirty & CEPH_CAP_AUTH_EXCL) {
    if (m->head.uid != pi->uid) {
      dout(7) << " uid " << pi->uid
              << " -> " << m->head.uid
              << " for " << *in << dendl;
      pi->uid = m->head.uid;
    }
    if (m->head.gid != pi->gid) {
      dout(7) << " gid " << pi->gid
              << " -> " << m->head.gid
              << " for " << *in << dendl;
      pi->gid = m->head.gid;
    }
    if (m->head.mode != pi->mode) {
      dout(7) << " mode " << oct << pi->mode
              << " -> " << m->head.mode << dec
              << " for " << *in << dendl;
      pi->mode = m->head.mode;
    }
    if ((features & CEPH_FEATURE_FS_BTIME) && m->get_btime() != pi->btime) {
      dout(7) << " btime " << oct << pi->btime
              << " -> " << m->get_btime() << dec
              << " for " << *in << dendl;
      pi->btime = m->get_btime();
    }
  }
}

/*
 * update inode based on cap flush|flushsnap|wanted.
 *  adjust max_size, if needed.
 * if we update, return true; otherwise, false (no update needed).
 */
bool Locker::_do_cap_update(CInode *in, Capability *cap,
                            int dirty, snapid_t follows,
                            const cref_t<MClientCaps> &m, const ref_t<MClientCaps> &ack,
                            bool *need_flush)
{
  dout(10) << "_do_cap_update dirty " << ccap_string(dirty)
           << " issued " << ccap_string(cap ? cap->issued() : 0)
           << " wanted " << ccap_string(cap ? cap->wanted() : 0)
           << " on " << *in << dendl;
  ceph_assert(in->is_auth());
  client_t client = m->get_source().num();
  CInode::mempool_inode *latest = in->get_projected_inode();

  // increase or zero max_size?
  uint64_t size = m->get_size();
  bool change_max = false;
  uint64_t old_max = latest->client_ranges.count(client) ? latest->client_ranges[client].range.last : 0;
  uint64_t new_max = old_max;
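  // Typical flow: a client appending past its writeable range asks for a
  // larger max_size; if the file lock permits we grow the range below via
  // calc_new_max_size(), while a client that neither holds nor wants write
  // caps gets its range zeroed.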

  if (in->is_file()) {
    bool forced_change_max = false;
    dout(20) << "inode is file" << dendl;
    if (cap && ((cap->issued() | cap->wanted()) & CEPH_CAP_ANY_FILE_WR)) {
      dout(20) << "client has write caps; m->get_max_size="
               << m->get_max_size() << "; old_max=" << old_max << dendl;
      if (m->get_max_size() > new_max) {
        dout(10) << "client requests file_max " << m->get_max_size()
                 << " > max " << old_max << dendl;
        change_max = true;
        forced_change_max = true;
        new_max = calc_new_max_size(latest, m->get_max_size());
      } else {
        new_max = calc_new_max_size(latest, size);

        if (new_max > old_max)
          change_max = true;
        else
          new_max = old_max;
      }
    } else {
      if (old_max) {
        change_max = true;
        new_max = 0;
      }
    }

    if (in->last == CEPH_NOSNAP &&
        change_max &&
        !in->filelock.can_wrlock(client) &&
        !in->filelock.can_force_wrlock(client)) {
      dout(10) << " i want to change file_max, but lock won't allow it (yet)" << dendl;
      if (in->filelock.is_stable()) {
        bool need_issue = false;
        if (cap)
          cap->inc_suppress();
        if (in->get_mds_caps_wanted().empty() &&
            (in->get_loner() >= 0 || (in->get_wanted_loner() >= 0 && in->try_set_loner()))) {
          if (in->filelock.get_state() != LOCK_EXCL)
            file_excl(&in->filelock, &need_issue);
        } else
          simple_lock(&in->filelock, &need_issue);
        if (need_issue)
          issue_caps(in);
        if (cap)
          cap->dec_suppress();
      }
      if (!in->filelock.can_wrlock(client) &&
          !in->filelock.can_force_wrlock(client)) {
        C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in,
                                                         forced_change_max ? new_max : 0,
                                                         0, utime_t());

        in->filelock.add_waiter(SimpleLock::WAIT_STABLE, cms);
        change_max = false;
      }
    }
  }

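  // flockbl carries two encoded lists back to back: a count of fcntl (POSIX)
  // locks followed by that many ceph_filelock records, then the same for
  // BSD-style flock locks.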
  if (m->flockbl.length()) {
    int32_t num_locks;
    auto bli = m->flockbl.cbegin();
    decode(num_locks, bli);
    for (int i = 0; i < num_locks; ++i) {
      ceph_filelock decoded_lock;
      decode(decoded_lock, bli);
      in->get_fcntl_lock_state()->held_locks.
        insert(pair<uint64_t, ceph_filelock>(decoded_lock.start, decoded_lock));
      ++in->get_fcntl_lock_state()->client_held_lock_counts[(client_t)(decoded_lock.client)];
    }
    decode(num_locks, bli);
    for (int i = 0; i < num_locks; ++i) {
      ceph_filelock decoded_lock;
      decode(decoded_lock, bli);
      in->get_flock_lock_state()->held_locks.
        insert(pair<uint64_t, ceph_filelock>(decoded_lock.start, decoded_lock));
      ++in->get_flock_lock_state()->client_held_lock_counts[(client_t)(decoded_lock.client)];
    }
  }

  if (!dirty && !change_max)
    return false;

  Session *session = mds->get_session(m);
  if (session->check_access(in, MAY_WRITE,
                            m->caller_uid, m->caller_gid, NULL, 0, 0) < 0) {
    dout(10) << "check_access failed, dropping cap update on " << *in << dendl;
    return false;
  }

  // do the update.
  EUpdate *le = new EUpdate(mds->mdlog, "cap update");
  mds->mdlog->start_entry(le);

  bool xattr = (dirty & CEPH_CAP_XATTR_EXCL) &&
               m->xattrbl.length() &&
               m->head.xattr_version > in->get_projected_inode()->xattr_version;

  auto &pi = in->project_inode(xattr);
  pi.inode.version = in->pre_dirty();

  MutationRef mut(new MutationImpl());
  mut->ls = mds->mdlog->get_current_segment();

  _update_cap_fields(in, dirty, m, &pi.inode);

  if (change_max) {
    dout(7) << " max_size " << old_max << " -> " << new_max
            << " for " << *in << dendl;
    if (new_max) {
      auto &cr = pi.inode.client_ranges[client];
      cr.range.first = 0;
      cr.range.last = new_max;
      cr.follows = in->first - 1;
      if (cap)
        cap->mark_clientwriteable();
    } else {
      pi.inode.client_ranges.erase(client);
      if (cap)
        cap->clear_clientwriteable();
    }
  }

  if (change_max || (dirty & (CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)))
    wrlock_force(&in->filelock, mut);  // wrlock for duration of journal

  // auth
  if (dirty & CEPH_CAP_AUTH_EXCL)
    wrlock_force(&in->authlock, mut);

  // xattrs update?
  if (xattr) {
    dout(7) << " xattrs v" << pi.inode.xattr_version << " -> " << m->head.xattr_version << dendl;
    pi.inode.xattr_version = m->head.xattr_version;
    auto p = m->xattrbl.cbegin();
    decode(*pi.xattrs, p);
    wrlock_force(&in->xattrlock, mut);
  }

  mut->auth_pin(in);
  mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows);
  mdcache->journal_dirty_inode(mut.get(), &le->metablob, in, follows);

  // "oldest flush tid" > 0 means client uses unique TID for each flush
  if (ack && ack->get_oldest_flush_tid() > 0)
    le->metablob.add_client_flush(metareqid_t(m->get_source(), ack->get_client_tid()),
                                  ack->get_oldest_flush_tid());

  unsigned update_flags = 0;
  if (change_max)
    update_flags |= UPDATE_SHAREMAX;
  if (cap)
    update_flags |= UPDATE_NEEDSISSUE;
  mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut, update_flags,
                                                              ack, client));
  if (need_flush && !*need_flush &&
      ((change_max && new_max) || // max INCREASE
       _need_flush_mdlog(in, dirty)))
    *need_flush = true;

  return true;
}

void Locker::handle_client_cap_release(const cref_t<MClientCapRelease> &m)
{
  client_t client = m->get_source().num();
  dout(10) << "handle_client_cap_release " << *m << dendl;

  if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) {
    mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
    return;
  }

  if (m->osd_epoch_barrier && !mds->objecter->have_map(m->osd_epoch_barrier)) {
    // Pause RADOS operations until we see the required epoch
    mds->objecter->set_epoch_barrier(m->osd_epoch_barrier);
  }

  if (mds->get_osd_epoch_barrier() < m->osd_epoch_barrier) {
    // Record the barrier so that we will retransmit it to clients
    mds->set_osd_epoch_barrier(m->osd_epoch_barrier);
  }

  Session *session = mds->get_session(m);

  for (const auto &cap : m->caps) {
    _do_cap_release(client, inodeno_t((uint64_t)cap.ino), cap.cap_id, cap.migrate_seq, cap.seq);
  }

  if (session) {
    session->notify_cap_release(m->caps.size());
  }
}

class C_Locker_RetryCapRelease : public LockerContext {
  client_t client;
  inodeno_t ino;
  uint64_t cap_id;
  ceph_seq_t migrate_seq;
  ceph_seq_t issue_seq;
public:
  C_Locker_RetryCapRelease(Locker *l, client_t c, inodeno_t i, uint64_t id,
                           ceph_seq_t mseq, ceph_seq_t seq) :
    LockerContext(l), client(c), ino(i), cap_id(id), migrate_seq(mseq), issue_seq(seq) {}
  void finish(int r) override {
    locker->_do_cap_release(client, ino, cap_id, migrate_seq, issue_seq);
  }
};

void Locker::_do_cap_release(client_t client, inodeno_t ino, uint64_t cap_id,
                             ceph_seq_t mseq, ceph_seq_t seq)
{
  CInode *in = mdcache->get_inode(ino);
  if (!in) {
    dout(7) << "_do_cap_release missing ino " << ino << dendl;
    return;
  }
  Capability *cap = in->get_client_cap(client);
  if (!cap) {
    dout(7) << "_do_cap_release no cap for client." << client << " on " << *in << dendl;
    return;
  }

  dout(7) << "_do_cap_release for client." << client << " on " << *in << dendl;
  if (cap->get_cap_id() != cap_id) {
    dout(7) << " capid " << cap_id << " != " << cap->get_cap_id() << ", ignore" << dendl;
    return;
  }
  if (ceph_seq_cmp(mseq, cap->get_mseq()) < 0) {
    dout(7) << " mseq " << mseq << " < " << cap->get_mseq() << ", ignore" << dendl;
    return;
  }
  if (should_defer_client_cap_frozen(in)) {
    dout(7) << " freezing|frozen, deferring" << dendl;
    in->add_waiter(CInode::WAIT_UNFREEZE,
                   new C_Locker_RetryCapRelease(this, client, ino, cap_id, mseq, seq));
    return;
  }
  if (seq != cap->get_last_issue()) {
    dout(7) << " issue_seq " << seq << " != " << cap->get_last_issue() << dendl;
    // clean out any old revoke history
    cap->clean_revoke_from(seq);
    eval_cap_gather(in);
    return;
  }
  remove_client_cap(in, cap);
}

void Locker::remove_client_cap(CInode *in, Capability *cap, bool kill)
{
  client_t client = cap->get_client();
  // clean out any pending snapflush state
  if (!in->client_need_snapflush.empty())
    _do_null_snapflush(in, client);

  while (!cap->lock_caches.empty()) {
    MDLockCache* lock_cache = cap->lock_caches.front();
    lock_cache->client_cap = nullptr;
    invalidate_lock_cache(lock_cache);
  }

  bool notable = cap->is_notable();
  in->remove_client_cap(client);
  if (!notable)
    return;

  if (in->is_auth()) {
    // make sure we clear out the client byte range
    if (in->get_projected_inode()->client_ranges.count(client) &&
        !(in->inode.nlink == 0 && !in->is_any_caps())) {  // unless it's unlink + stray
      if (kill)
        in->state_set(CInode::STATE_NEEDSRECOVER);
      else
        check_inode_max_size(in);
    }
  } else {
    request_inode_file_caps(in);
  }

  try_eval(in, CEPH_CAP_LOCKS);
}


/**
 * Return true if any currently revoking caps exceed the
 * session_timeout threshold.
 */
bool Locker::any_late_revoking_caps(xlist<Capability*> const &revoking,
                                    double timeout) const
{
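  // Caps are appended to the revoking list when revocation starts, so the
  // front entry carries the oldest revoke stamp; if it isn't late, nothing
  // behind it can be either.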
  xlist<Capability*>::const_iterator p = revoking.begin();
  if (p.end()) {
    // No revoking caps at the moment
    return false;
  } else {
    utime_t now = ceph_clock_now();
    utime_t age = now - (*p)->get_last_revoke_stamp();
    if (age <= timeout) {
      return false;
    } else {
      return true;
    }
  }
}

std::set<client_t> Locker::get_late_revoking_clients(double timeout) const
{
  std::set<client_t> result;

  if (any_late_revoking_caps(revoking_caps, timeout)) {
    // Slow path: execute in O(N_clients)
    for (auto &p : revoking_caps_by_client) {
      if (any_late_revoking_caps(p.second, timeout)) {
        result.insert(p.first);
      }
    }
  } else {
    // Fast path: no misbehaving clients, execute in O(1)
  }
  return result;
}

// Hard-code instead of surfacing a config setting because this is
// really a hack that should go away at some point when we have better
// inspection tools for getting at detailed cap state (#7316)
#define MAX_WARN_CAPS 100

void Locker::caps_tick()
{
  utime_t now = ceph_clock_now();

  if (!need_snapflush_inodes.empty()) {
    // snap inodes that need flush are auth pinned, they affect
    // subtree/dirfrag freeze.
    utime_t cutoff = now;
    cutoff -= g_conf()->mds_freeze_tree_timeout / 3;

    CInode *last = need_snapflush_inodes.back();
    while (!need_snapflush_inodes.empty()) {
      CInode *in = need_snapflush_inodes.front();
      if (in->last_dirstat_prop >= cutoff)
        break;
      in->item_caps.remove_myself();
      snapflush_nudge(in);
      if (in == last)
        break;
    }
  }

  dout(20) << __func__ << " " << revoking_caps.size() << " revoking caps" << dendl;

  now = ceph_clock_now();
  int n = 0;
  for (xlist<Capability*>::iterator p = revoking_caps.begin(); !p.end(); ++p) {
    Capability *cap = *p;

    utime_t age = now - cap->get_last_revoke_stamp();
    dout(20) << __func__ << " age = " << age << " client." << cap->get_client() << "." << cap->get_inode()->ino() << dendl;
    if (age <= mds->mdsmap->get_session_timeout()) {
      dout(20) << __func__ << " age below timeout " << mds->mdsmap->get_session_timeout() << dendl;
      break;
    } else {
      ++n;
      if (n > MAX_WARN_CAPS) {
        dout(1) << __func__ << " more than " << MAX_WARN_CAPS << " caps are late"
                << " revoking, ignoring subsequent caps" << dendl;
        break;
      }
    }
    // exponential backoff of warning intervals
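    // e.g. with a 60s session timeout, a client is warned when a revoke has
    // been outstanding for ~60s, then again at ~120s, ~240s, and so on.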
    if (age > mds->mdsmap->get_session_timeout() * (1 << cap->get_num_revoke_warnings())) {
      cap->inc_num_revoke_warnings();
      stringstream ss;
      ss << "client." << cap->get_client() << " isn't responding to mclientcaps(revoke), ino "
         << cap->get_inode()->ino() << " pending " << ccap_string(cap->pending())
         << " issued " << ccap_string(cap->issued()) << ", sent " << age << " seconds ago";
      mds->clog->warn() << ss.str();
      dout(20) << __func__ << " " << ss.str() << dendl;
    } else {
      dout(20) << __func__ << " silencing log message (backoff) for " << "client." << cap->get_client() << "." << cap->get_inode()->ino() << dendl;
    }
  }
}


void Locker::handle_client_lease(const cref_t<MClientLease> &m)
{
  dout(10) << "handle_client_lease " << *m << dendl;

  ceph_assert(m->get_source().is_client());
  client_t client = m->get_source().num();

  CInode *in = mdcache->get_inode(m->get_ino(), m->get_last());
  if (!in) {
    dout(7) << "handle_client_lease don't have ino " << m->get_ino() << "." << m->get_last() << dendl;
    return;
  }
  CDentry *dn = 0;

  frag_t fg = in->pick_dirfrag(m->dname);
  CDir *dir = in->get_dirfrag(fg);
  if (dir)
    dn = dir->lookup(m->dname);
  if (!dn) {
    dout(7) << "handle_client_lease don't have dn " << m->get_ino() << " " << m->dname << dendl;
    return;
  }
  dout(10) << " on " << *dn << dendl;

  // replica and lock
  ClientLease *l = dn->get_client_lease(client);
  if (!l) {
    dout(7) << "handle_client_lease didn't have lease for client." << client << " of " << *dn << dendl;
    return;
  }

  switch (m->get_action()) {
  case CEPH_MDS_LEASE_REVOKE_ACK:
  case CEPH_MDS_LEASE_RELEASE:
    if (l->seq != m->get_seq()) {
      dout(7) << "handle_client_lease release - seq " << l->seq << " != provided " << m->get_seq() << dendl;
    } else {
      dout(7) << "handle_client_lease client." << client
              << " on " << *dn << dendl;
      dn->remove_client_lease(l, this);
    }
    break;

  case CEPH_MDS_LEASE_RENEW:
    {
      dout(7) << "handle_client_lease client." << client << " renew on " << *dn
              << (!dn->lock.can_lease(client) ? ", revoking lease" : "") << dendl;
      if (dn->lock.can_lease(client)) {
        auto reply = make_message<MClientLease>(*m);
        int pool = 1;  // fixme.. do something smart!
        reply->h.duration_ms = (int)(1000 * mdcache->client_lease_durations[pool]);
        reply->h.seq = ++l->seq;
        reply->clear_payload();

        utime_t now = ceph_clock_now();
        now += mdcache->client_lease_durations[pool];
        mdcache->touch_client_lease(l, pool, now);

        mds->send_message_client_counted(reply, m->get_connection());
      }
    }
    break;

  default:
    ceph_abort(); // implement me
    break;
  }
}


void Locker::issue_client_lease(CDentry *dn, MDRequestRef &mdr, int mask,
                                utime_t now, bufferlist &bl)
{
  client_t client = mdr->get_client();
  Session *session = mdr->session;

  CInode *diri = dn->get_dir()->get_inode();
  if (mdr->snapid == CEPH_NOSNAP &&
      dn->lock.can_lease(client) &&
      !diri->is_stray() &&  // do not issue dn leases in stray dir!
      !diri->filelock.can_lease(client) &&
      !(diri->get_client_cap_pending(client) & (CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_EXCL))) {
    // issue a dentry lease
    ClientLease *l = dn->add_client_lease(client, session);
    session->touch_lease(l);

    int pool = 1;  // fixme.. do something smart!
    now += mdcache->client_lease_durations[pool];
    mdcache->touch_client_lease(l, pool, now);

    LeaseStat lstat;
    lstat.mask = CEPH_LEASE_VALID | mask;
    lstat.duration_ms = (uint32_t)(1000 * mdcache->client_lease_durations[pool]);
    lstat.seq = ++l->seq;
    encode_lease(bl, session->info, lstat);
    dout(20) << "issue_client_lease seq " << lstat.seq << " dur " << lstat.duration_ms << "ms"
             << " on " << *dn << dendl;
  } else {
    // null lease
    LeaseStat lstat;
    lstat.mask = mask;
    encode_lease(bl, session->info, lstat);
    dout(20) << "issue_client_lease no/null lease on " << *dn << dendl;
  }
}


void Locker::revoke_client_leases(SimpleLock *lock)
{
  int n = 0;
  CDentry *dn = static_cast<CDentry*>(lock->get_parent());
  for (map<client_t, ClientLease*>::iterator p = dn->client_lease_map.begin();
       p != dn->client_lease_map.end();
       ++p) {
    ClientLease *l = p->second;

    n++;
    ceph_assert(lock->get_type() == CEPH_LOCK_DN);

    CDentry *dn = static_cast<CDentry*>(lock->get_parent());
    int mask = 1 | CEPH_LOCK_DN; // old and new bits

    // i should also revoke the dir ICONTENT lease, if they have it!
    CInode *diri = dn->get_dir()->get_inode();
    auto lease = make_message<MClientLease>(CEPH_MDS_LEASE_REVOKE, l->seq, mask, diri->ino(), diri->first, CEPH_NOSNAP, dn->get_name());
    mds->send_message_client_counted(lease, l->client);
  }
}

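// Clients that advertise CEPHFS_FEATURE_REPLY_ENCODING get the lease stat
// wrapped in a versioned ENCODE_START/FINISH envelope, so fields can be
// appended compatibly later; legacy clients get the bare fields.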
void Locker::encode_lease(bufferlist& bl, const session_info_t& info,
                          const LeaseStat& ls)
{
  if (info.has_feature(CEPHFS_FEATURE_REPLY_ENCODING)) {
    ENCODE_START(1, 1, bl);
    encode(ls.mask, bl);
    encode(ls.duration_ms, bl);
    encode(ls.seq, bl);
    ENCODE_FINISH(bl);
  }
  else {
    encode(ls.mask, bl);
    encode(ls.duration_ms, bl);
    encode(ls.seq, bl);
  }
}

// locks ----------------------------------------------------------------

SimpleLock *Locker::get_lock(int lock_type, const MDSCacheObjectInfo &info)
{
  switch (lock_type) {
  case CEPH_LOCK_DN:
    {
      // be careful; info.dirfrag may have incorrect frag; recalculate based on dname.
      CInode *diri = mdcache->get_inode(info.dirfrag.ino);
      frag_t fg;
      CDir *dir = 0;
      CDentry *dn = 0;
      if (diri) {
        fg = diri->pick_dirfrag(info.dname);
        dir = diri->get_dirfrag(fg);
        if (dir)
          dn = dir->lookup(info.dname, info.snapid);
      }
      if (!dn) {
        dout(7) << "get_lock don't have dn " << info.dirfrag.ino << " " << info.dname << dendl;
        return 0;
      }
      return &dn->lock;
    }

  case CEPH_LOCK_IAUTH:
  case CEPH_LOCK_ILINK:
  case CEPH_LOCK_IDFT:
  case CEPH_LOCK_IFILE:
  case CEPH_LOCK_INEST:
  case CEPH_LOCK_IXATTR:
  case CEPH_LOCK_ISNAP:
  case CEPH_LOCK_IFLOCK:
  case CEPH_LOCK_IPOLICY:
    {
      CInode *in = mdcache->get_inode(info.ino, info.snapid);
      if (!in) {
        dout(7) << "get_lock don't have ino " << info.ino << dendl;
        return 0;
      }
      switch (lock_type) {
      case CEPH_LOCK_IAUTH: return &in->authlock;
      case CEPH_LOCK_ILINK: return &in->linklock;
      case CEPH_LOCK_IDFT: return &in->dirfragtreelock;
      case CEPH_LOCK_IFILE: return &in->filelock;
      case CEPH_LOCK_INEST: return &in->nestlock;
      case CEPH_LOCK_IXATTR: return &in->xattrlock;
      case CEPH_LOCK_ISNAP: return &in->snaplock;
      case CEPH_LOCK_IFLOCK: return &in->flocklock;
      case CEPH_LOCK_IPOLICY: return &in->policylock;
      }
    }

  default:
    dout(7) << "get_lock don't know lock_type " << lock_type << dendl;
    ceph_abort();
    break;
  }

  return 0;
}

void Locker::handle_lock(const cref_t<MLock> &m)
{
  // nobody should be talking to us during recovery.
  ceph_assert(mds->is_rejoin() || mds->is_clientreplay() || mds->is_active() || mds->is_stopping());

  SimpleLock *lock = get_lock(m->get_lock_type(), m->get_object_info());
  if (!lock) {
    dout(10) << "don't have object " << m->get_object_info() << ", must have trimmed, dropping" << dendl;
    return;
  }

  switch (lock->get_type()) {
  case CEPH_LOCK_DN:
  case CEPH_LOCK_IAUTH:
  case CEPH_LOCK_ILINK:
  case CEPH_LOCK_ISNAP:
  case CEPH_LOCK_IXATTR:
  case CEPH_LOCK_IFLOCK:
  case CEPH_LOCK_IPOLICY:
    handle_simple_lock(lock, m);
    break;

  case CEPH_LOCK_IDFT:
  case CEPH_LOCK_INEST:
    //handle_scatter_lock((ScatterLock*)lock, m);
    //break;

  case CEPH_LOCK_IFILE:
    handle_file_lock(static_cast<ScatterLock*>(lock), m);
    break;

  default:
    dout(7) << "handle_lock got otype " << m->get_lock_type() << dendl;
    ceph_abort();
    break;
  }
}



// ==========================================================================
// simple lock

/** This function may take a reference to m if it needs one, but does
 * not put references. */
void Locker::handle_reqrdlock(SimpleLock *lock, const cref_t<MLock> &m)
{
  MDSCacheObject *parent = lock->get_parent();
  if (parent->is_auth() &&
      lock->get_state() != LOCK_SYNC &&
      !parent->is_frozen()) {
    dout(7) << "handle_reqrdlock got rdlock request on " << *lock
            << " on " << *parent << dendl;
    ceph_assert(parent->is_auth()); // replica auth pinned if they're doing this!
    if (lock->is_stable()) {
      simple_sync(lock);
    } else {
      dout(7) << "handle_reqrdlock delaying request until lock is stable" << dendl;
      lock->add_waiter(SimpleLock::WAIT_STABLE | MDSCacheObject::WAIT_UNFREEZE,
                       new C_MDS_RetryMessage(mds, m));
    }
  } else {
    dout(7) << "handle_reqrdlock dropping rdlock request on " << *lock
            << " on " << *parent << dendl;
    // replica should retry
  }
}

void Locker::handle_simple_lock(SimpleLock *lock, const cref_t<MLock> &m)
{
  int from = m->get_asker();

  dout(10) << "handle_simple_lock " << *m
           << " on " << *lock << " " << *lock->get_parent() << dendl;

  if (mds->is_rejoin()) {
    if (lock->get_parent()->is_rejoining()) {
      dout(7) << "handle_simple_lock still rejoining " << *lock->get_parent()
              << ", dropping " << *m << dendl;
      return;
    }
  }

  switch (m->get_action()) {
    // -- replica --
  case LOCK_AC_SYNC:
    ceph_assert(lock->get_state() == LOCK_LOCK);
    lock->decode_locked_state(m->get_data());
    lock->set_state(LOCK_SYNC);
    lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
    break;

  case LOCK_AC_LOCK:
    ceph_assert(lock->get_state() == LOCK_SYNC);
    lock->set_state(LOCK_SYNC_LOCK);
    if (lock->is_leased())
      revoke_client_leases(lock);
    eval_gather(lock, true);
    if (lock->is_unstable_and_locked()) {
      if (lock->is_cached())
        invalidate_lock_caches(lock);
      mds->mdlog->flush();
    }
    break;


    // -- auth --
  case LOCK_AC_LOCKACK:
    ceph_assert(lock->get_state() == LOCK_SYNC_LOCK ||
                lock->get_state() == LOCK_SYNC_EXCL);
    ceph_assert(lock->is_gathering(from));
    lock->remove_gather(from);

    if (lock->is_gathering()) {
      dout(7) << "handle_simple_lock " << *lock << " on " << *lock->get_parent() << " from " << from
              << ", still gathering " << lock->get_gather_set() << dendl;
    } else {
      dout(7) << "handle_simple_lock " << *lock << " on " << *lock->get_parent() << " from " << from
              << ", last one" << dendl;
      eval_gather(lock);
    }
    break;

  case LOCK_AC_REQRDLOCK:
    handle_reqrdlock(lock, m);
    break;

  }
}

/* unused, currently.

class C_Locker_SimpleEval : public Context {
  Locker *locker;
  SimpleLock *lock;
public:
  C_Locker_SimpleEval(Locker *l, SimpleLock *lk) : locker(l), lock(lk) {}
  void finish(int r) {
    locker->try_simple_eval(lock);
  }
};

void Locker::try_simple_eval(SimpleLock *lock)
{
  // unstable and ambiguous auth?
  if (!lock->is_stable() &&
      lock->get_parent()->is_ambiguous_auth()) {
    dout(7) << "simple_eval not stable and ambiguous auth, waiting on " << *lock->get_parent() << dendl;
    //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
    lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_SimpleEval(this, lock));
    return;
  }

  if (!lock->get_parent()->is_auth()) {
    dout(7) << "try_simple_eval not auth for " << *lock->get_parent() << dendl;
    return;
  }

  if (!lock->get_parent()->can_auth_pin()) {
    dout(7) << "try_simple_eval can't auth_pin, waiting on " << *lock->get_parent() << dendl;
    //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
    lock->get_parent()->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_SimpleEval(this, lock));
    return;
  }

  if (lock->is_stable())
    simple_eval(lock);
}
*/

void Locker::simple_eval(SimpleLock *lock, bool *need_issue)
{
  dout(10) << "simple_eval " << *lock << " on " << *lock->get_parent() << dendl;

  ceph_assert(lock->get_parent()->is_auth());
  ceph_assert(lock->is_stable());

  if (lock->get_parent()->is_freezing_or_frozen()) {
    // dentry/snap lock in unreadable state can block path traverse
    if ((lock->get_type() != CEPH_LOCK_DN &&
         lock->get_type() != CEPH_LOCK_ISNAP &&
         lock->get_type() != CEPH_LOCK_IPOLICY) ||
        lock->get_state() == LOCK_SYNC ||
        lock->get_parent()->is_frozen())
      return;
  }

  if (mdcache->is_readonly()) {
    if (lock->get_state() != LOCK_SYNC) {
      dout(10) << "simple_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
      simple_sync(lock, need_issue);
    }
    return;
  }

  CInode *in = 0;
  int wanted = 0;
  if (lock->get_cap_shift()) {
    in = static_cast<CInode*>(lock->get_parent());
    in->get_caps_wanted(&wanted, NULL, lock->get_cap_shift());
  }

  // -> excl?
  if (lock->get_state() != LOCK_EXCL &&
      in && in->get_target_loner() >= 0 &&
      (wanted & CEPH_CAP_GEXCL)) {
    dout(7) << "simple_eval stable, going to excl " << *lock
            << " on " << *lock->get_parent() << dendl;
    simple_excl(lock, need_issue);
  }

  // stable -> sync?
  else if (lock->get_state() != LOCK_SYNC &&
           !lock->is_wrlocked() &&
           ((!(wanted & CEPH_CAP_GEXCL) && !lock->is_waiter_for(SimpleLock::WAIT_WR)) ||
            (lock->get_state() == LOCK_EXCL && in && in->get_target_loner() < 0))) {
    dout(7) << "simple_eval stable, syncing " << *lock
            << " on " << *lock->get_parent() << dendl;
    simple_sync(lock, need_issue);
  }
}


// mid

bool Locker::simple_sync(SimpleLock *lock, bool *need_issue)
{
  dout(7) << "simple_sync on " << *lock << " on " << *lock->get_parent() << dendl;
  ceph_assert(lock->get_parent()->is_auth());
  ceph_assert(lock->is_stable());

  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  int old_state = lock->get_state();

  if (old_state != LOCK_TSYN) {

    switch (lock->get_state()) {
    case LOCK_MIX: lock->set_state(LOCK_MIX_SYNC); break;
    case LOCK_LOCK: lock->set_state(LOCK_LOCK_SYNC); break;
    case LOCK_XSYN: lock->set_state(LOCK_XSYN_SYNC); break;
    case LOCK_EXCL: lock->set_state(LOCK_EXCL_SYNC); break;
    default: ceph_abort();
    }

    int gather = 0;
    if (lock->is_wrlocked()) {
      gather++;
      if (lock->is_cached())
        invalidate_lock_caches(lock);
    }

    if (lock->get_parent()->is_replicated() && old_state == LOCK_MIX) {
      send_lock_message(lock, LOCK_AC_SYNC);
      lock->init_gather();
      gather++;
    }

    if (in && in->is_head()) {
      if (in->issued_caps_need_gather(lock)) {
        if (need_issue)
          *need_issue = true;
        else
          issue_caps(in);
        gather++;
      }
    }

    bool need_recover = false;
    if (lock->get_type() == CEPH_LOCK_IFILE) {
      ceph_assert(in);
      if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
        mds->mdcache->queue_file_recover(in);
        need_recover = true;
        gather++;
      }
    }

    if (!gather && lock->is_dirty()) {
      lock->get_parent()->auth_pin(lock);
      scatter_writebehind(static_cast<ScatterLock*>(lock));
      mds->mdlog->flush();
      return false;
    }

    if (gather) {
      lock->get_parent()->auth_pin(lock);
      if (need_recover)
        mds->mdcache->do_file_recover();
      return false;
    }
  }

  if (lock->get_parent()->is_replicated()) {  // FIXME
    bufferlist data;
    lock->encode_locked_state(data);
    send_lock_message(lock, LOCK_AC_SYNC, data);
  }
  lock->set_state(LOCK_SYNC);
  lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
  if (in && in->is_head()) {
    if (need_issue)
      *need_issue = true;
    else
      issue_caps(in);
  }
  return true;
}

void Locker::simple_excl(SimpleLock *lock, bool *need_issue)
{
  dout(7) << "simple_excl on " << *lock << " on " << *lock->get_parent() << dendl;
  ceph_assert(lock->get_parent()->is_auth());
  ceph_assert(lock->is_stable());

  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  switch (lock->get_state()) {
  case LOCK_LOCK: lock->set_state(LOCK_LOCK_EXCL); break;
  case LOCK_SYNC: lock->set_state(LOCK_SYNC_EXCL); break;
  case LOCK_XSYN: lock->set_state(LOCK_XSYN_EXCL); break;
  default: ceph_abort();
  }

  int gather = 0;
  if (lock->is_rdlocked())
    gather++;
  if (lock->is_wrlocked())
    gather++;
  if (gather && lock->is_cached())
    invalidate_lock_caches(lock);

  if (lock->get_parent()->is_replicated() &&
      lock->get_state() != LOCK_LOCK_EXCL &&
      lock->get_state() != LOCK_XSYN_EXCL) {
    send_lock_message(lock, LOCK_AC_LOCK);
    lock->init_gather();
    gather++;
  }

  if (in && in->is_head()) {
    if (in->issued_caps_need_gather(lock)) {
      if (need_issue)
        *need_issue = true;
      else
        issue_caps(in);
      gather++;
    }
  }

  if (gather) {
    lock->get_parent()->auth_pin(lock);
  } else {
    lock->set_state(LOCK_EXCL);
    lock->finish_waiters(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE);
    if (in) {
      if (need_issue)
        *need_issue = true;
      else
        issue_caps(in);
    }
  }
}

void Locker::simple_lock(SimpleLock *lock, bool *need_issue)
{
  dout(7) << "simple_lock on " << *lock << " on " << *lock->get_parent() << dendl;
  ceph_assert(lock->get_parent()->is_auth());
  ceph_assert(lock->is_stable());
  ceph_assert(lock->get_state() != LOCK_LOCK);

  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  int old_state = lock->get_state();

  switch (lock->get_state()) {
  case LOCK_SYNC: lock->set_state(LOCK_SYNC_LOCK); break;
  case LOCK_XSYN: lock->set_state(LOCK_XSYN_LOCK); break;
  case LOCK_EXCL: lock->set_state(LOCK_EXCL_LOCK); break;
  case LOCK_MIX: lock->set_state(LOCK_MIX_LOCK);
    (static_cast<ScatterLock *>(lock))->clear_unscatter_wanted();
    break;
  case LOCK_TSYN: lock->set_state(LOCK_TSYN_LOCK); break;
  default: ceph_abort();
  }

  int gather = 0;
  if (lock->is_leased()) {
    gather++;
    revoke_client_leases(lock);
  }
  if (lock->is_rdlocked()) {
    if (lock->is_cached())
      invalidate_lock_caches(lock);
    gather++;
  }
  if (in && in->is_head()) {
    if (in->issued_caps_need_gather(lock)) {
      if (need_issue)
        *need_issue = true;
      else
        issue_caps(in);
      gather++;
    }
  }

  bool need_recover = false;
  if (lock->get_type() == CEPH_LOCK_IFILE) {
    ceph_assert(in);
    if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
      mds->mdcache->queue_file_recover(in);
      need_recover = true;
      gather++;
    }
  }

  if (lock->get_parent()->is_replicated() &&
      lock->get_state() == LOCK_MIX_LOCK &&
      gather) {
    dout(10) << " doing local stage of mix->lock gather before gathering from replicas" << dendl;
  } else {
    // move to second stage of gather now, so we don't send the lock action later.
    if (lock->get_state() == LOCK_MIX_LOCK)
      lock->set_state(LOCK_MIX_LOCK2);

    if (lock->get_parent()->is_replicated() &&
        lock->get_sm()->states[old_state].replica_state != LOCK_LOCK) {  // replica may already be LOCK
      gather++;
      send_lock_message(lock, LOCK_AC_LOCK);
      lock->init_gather();
    }
  }

  if (!gather && lock->is_dirty()) {
    lock->get_parent()->auth_pin(lock);
    scatter_writebehind(static_cast<ScatterLock*>(lock));
    mds->mdlog->flush();
    return;
  }

  if (gather) {
    lock->get_parent()->auth_pin(lock);
    if (need_recover)
      mds->mdcache->do_file_recover();
  } else {
    lock->set_state(LOCK_LOCK);
    lock->finish_waiters(ScatterLock::WAIT_XLOCK|ScatterLock::WAIT_WR|ScatterLock::WAIT_STABLE);
  }
}


void Locker::simple_xlock(SimpleLock *lock)
{
  dout(7) << "simple_xlock on " << *lock << " on " << *lock->get_parent() << dendl;
  ceph_assert(lock->get_parent()->is_auth());
  //assert(lock->is_stable());
  ceph_assert(lock->get_state() != LOCK_XLOCK);

  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  if (lock->is_stable())
    lock->get_parent()->auth_pin(lock);

  switch (lock->get_state()) {
  case LOCK_LOCK:
  case LOCK_XLOCKDONE: lock->set_state(LOCK_LOCK_XLOCK); break;
  default: ceph_abort();
  }

  int gather = 0;
  if (lock->is_rdlocked())
    gather++;
  if (lock->is_wrlocked())
    gather++;
  if (gather && lock->is_cached())
    invalidate_lock_caches(lock);

  if (in && in->is_head()) {
    if (in->issued_caps_need_gather(lock)) {
      issue_caps(in);
      gather++;
    }
  }

  if (!gather) {
    lock->set_state(LOCK_PREXLOCK);
    //assert("shouldn't be called if we are already xlockable" == 0);
  }
}





// ==========================================================================
// scatter lock

/*

Some notes on scatterlocks.

 - The scatter/gather is driven by the inode lock.  The scatter always
   brings in the latest metadata from the fragments.

 - When in a scattered/MIX state, fragments are only allowed to
   update/be written to if the accounted stat matches the inode's
   current version.

 - That means, on gather, we _only_ assimilate diffs for frag metadata
   that match the current version, because those are the only ones
   written during this scatter/gather cycle.  (Others didn't permit
   it.)  We increment the version and journal this to disk.

 - When possible, we also simultaneously update our local frag
   accounted stats to match.

 - On scatter, the new inode info is broadcast to frags, both local
   and remote.  If possible (auth and !frozen), the dirfrag auth
   should update the accounted state (if it isn't already up to date).
   Note that this may occur on both the local inode auth node and
   inode replicas, so there are two potential paths.  If it is NOT
   possible, they need to mark_stale to prevent any possible writes.

 - A scatter can be to MIX (potentially writeable) or to SYNC (read
   only).  Both are opportunities to update the frag accounted stats,
   even though only the MIX case is affected by a stale dirfrag.

 - Because many scatter/gather cycles can potentially go by without a
   frag being able to update its accounted stats (due to being frozen
   by exports/refragments in progress), the frag may have (even very)
   old stat versions.  That's fine.  If/when we do want to update it,
   we can update accounted_* and the version first.

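 - Concretely, for the dirstat case: each dirfrag's fnode carries a
   fragstat and an accounted_fragstat.  On gather we assimilate deltas
   only from frags whose accounted version matches inode.dirstat.version,
   bump the version, and journal the result; lagging frags catch up on
   a later scatter/gather cycle.
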
*/

class C_Locker_ScatterWB : public LockerLogContext {
  ScatterLock *lock;
  MutationRef mut;
public:
  C_Locker_ScatterWB(Locker *l, ScatterLock *sl, MutationRef& m) :
    LockerLogContext(l), lock(sl), mut(m) {}
  void finish(int r) override {
    locker->scatter_writebehind_finish(lock, mut);
  }
};

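// Flush a dirty scatterlock's aggregated state back into the inode: take a
// forced wrlock for the duration of the journal entry, fold the dirfrag
// stats into a projected inode, and let C_Locker_ScatterWB finish the flush
// once the EUpdate commits.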
4847void Locker::scatter_writebehind(ScatterLock *lock)
4848{
4849 CInode *in = static_cast<CInode*>(lock->get_parent());
4850 dout(10) << "scatter_writebehind " << in->inode.mtime << " on " << *lock << " on " << *in << dendl;
4851
4852 // journal
4853 MutationRef mut(new MutationImpl());
4854 mut->ls = mds->mdlog->get_current_segment();
4855
4856 // forcefully take a wrlock
4857 lock->get_wrlock(true);
9f95a23c 4858 mut->emplace_lock(lock, MutationImpl::LockOp::WRLOCK);
7c673cae
FG
4859
4860 in->pre_cow_old_inode(); // avoid cow mayhem
4861
94b18763
FG
4862 auto &pi = in->project_inode();
4863 pi.inode.version = in->pre_dirty();
7c673cae
FG
4864
4865 in->finish_scatter_gather_update(lock->get_type());
4866 lock->start_flush();
4867
4868 EUpdate *le = new EUpdate(mds->mdlog, "scatter_writebehind");
4869 mds->mdlog->start_entry(le);
4870
4871 mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY);
4872 mdcache->journal_dirty_inode(mut.get(), &le->metablob, in);
4873
4874 in->finish_scatter_gather_update_accounted(lock->get_type(), mut, &le->metablob);
4875
4876 mds->mdlog->submit_entry(le, new C_Locker_ScatterWB(this, lock, mut));
4877}
4878
4879void Locker::scatter_writebehind_finish(ScatterLock *lock, MutationRef& mut)
4880{
4881 CInode *in = static_cast<CInode*>(lock->get_parent());
4882 dout(10) << "scatter_writebehind_finish on " << *lock << " on " << *in << dendl;
4883 in->pop_and_dirty_projected_inode(mut->ls);
4884
4885 lock->finish_flush();
4886
4887 // if replicas may have flushed in a mix->lock state, send another
4888 // message so they can finish_flush().
4889 if (in->is_replicated()) {
4890 switch (lock->get_state()) {
4891 case LOCK_MIX_LOCK:
4892 case LOCK_MIX_LOCK2:
4893 case LOCK_MIX_EXCL:
4894 case LOCK_MIX_TSYN:
4895 send_lock_message(lock, LOCK_AC_LOCKFLUSHED);
4896 }
4897 }
4898
4899 mut->apply();
4900 drop_locks(mut.get());
4901 mut->cleanup();
4902
4903 if (lock->is_stable())
4904 lock->finish_waiters(ScatterLock::WAIT_STABLE);
4905
4906 //scatter_eval_gather(lock);
4907}
4908
4909void Locker::scatter_eval(ScatterLock *lock, bool *need_issue)
4910{
4911 dout(10) << "scatter_eval " << *lock << " on " << *lock->get_parent() << dendl;
4912
11fdf7f2
TL
4913 ceph_assert(lock->get_parent()->is_auth());
4914 ceph_assert(lock->is_stable());
7c673cae
FG
4915
4916 if (lock->get_parent()->is_freezing_or_frozen()) {
4917 dout(20) << " freezing|frozen" << dendl;
4918 return;
4919 }
4920
4921 if (mdcache->is_readonly()) {
4922 if (lock->get_state() != LOCK_SYNC) {
4923 dout(10) << "scatter_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
4924 simple_sync(lock, need_issue);
4925 }
4926 return;
4927 }
4928
4929 if (!lock->is_rdlocked() &&
4930 lock->get_state() != LOCK_MIX &&
4931 lock->get_scatter_wanted()) {
4932 dout(10) << "scatter_eval scatter_wanted, bump to mix " << *lock
4933 << " on " << *lock->get_parent() << dendl;
4934 scatter_mix(lock, need_issue);
4935 return;
4936 }
4937
4938 if (lock->get_type() == CEPH_LOCK_INEST) {
4939 // in general, we want to keep INEST writable at all times.
4940 if (!lock->is_rdlocked()) {
4941 if (lock->get_parent()->is_replicated()) {
4942 if (lock->get_state() != LOCK_MIX)
4943 scatter_mix(lock, need_issue);
4944 } else {
4945 if (lock->get_state() != LOCK_LOCK)
4946 simple_lock(lock, need_issue);
4947 }
4948 }
4949 return;
4950 }
4951
4952 CInode *in = static_cast<CInode*>(lock->get_parent());
4953 if (!in->has_subtree_or_exporting_dirfrag() || in->is_base()) {
4954 // i _should_ be sync.
4955 if (!lock->is_wrlocked() &&
4956 lock->get_state() != LOCK_SYNC) {
4957 dout(10) << "scatter_eval no wrlocks|xlocks, not subtree root inode, syncing" << dendl;
4958 simple_sync(lock, need_issue);
4959 }
4960 }
4961}
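/*
 * Editor's note: for CEPH_LOCK_INEST the branch above deliberately
 * never picks SYNC; restated,
 *
 *   in->is_replicated() ? LOCK_MIX    // every replica keeps writing
 *                       : LOCK_LOCK   // auth-only writes
 *
 * which is what "keep INEST writable at all times" means in practice:
 * rstat updates are frequent, reads comparatively rare.
 */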
4962
4963
4964/*
4965 * mark a scatterlock to indicate that the dir fnode has some dirty data
4966 */
4967void Locker::mark_updated_scatterlock(ScatterLock *lock)
4968{
4969 lock->mark_dirty();
4970 if (lock->get_updated_item()->is_on_list()) {
4971 dout(10) << "mark_updated_scatterlock " << *lock
4972 << " - already on list since " << lock->get_update_stamp() << dendl;
4973 } else {
4974 updated_scatterlocks.push_back(lock->get_updated_item());
4975 utime_t now = ceph_clock_now();
4976 lock->set_update_stamp(now);
4977 dout(10) << "mark_updated_scatterlock " << *lock
4978 << " - added at " << now << dendl;
4979 }
4980}
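/*
 * Illustrative usage (editor's addition): a path that has just dirtied
 * a dirfrag's fnode marks the matching scatterlock, e.g.
 *
 *   mark_updated_scatterlock(&in->nestlock);  // or filelock,
 *                                             // dirfragtreelock
 *
 * The stamp recorded here is what scatter_tick() later compares
 * against mds_scatter_nudge_interval.
 */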
4981
4982/*
4983 * this is called by scatter_tick and LogSegment::try_to_trim() when
4984 * trying to flush dirty scattered data (i.e. updated fnode) back to
4985 * the inode.
4986 *
4987 * we need to lock|scatter in order to push fnode changes into the
4988 * inode.dirstat.
4989 */
11fdf7f2 4990void Locker::scatter_nudge(ScatterLock *lock, MDSContext *c, bool forcelockchange)
4991{
4992 CInode *p = static_cast<CInode *>(lock->get_parent());
4993
4994 if (p->is_frozen() || p->is_freezing()) {
4995 dout(10) << "scatter_nudge waiting for unfreeze on " << *p << dendl;
4996 if (c)
4997 p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, c);
b32b8144 4998 else if (lock->is_dirty())
4999 // just requeue. not ideal.. starvation prone..
5000 updated_scatterlocks.push_back(lock->get_updated_item());
5001 return;
5002 }
5003
5004 if (p->is_ambiguous_auth()) {
5005 dout(10) << "scatter_nudge waiting for single auth on " << *p << dendl;
5006 if (c)
5007 p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, c);
b32b8144 5008 else if (lock->is_dirty())
5009 // just requeue. not ideal.. starvation prone..
5010 updated_scatterlocks.push_back(lock->get_updated_item());
5011 return;
5012 }
5013
5014 if (p->is_auth()) {
5015 int count = 0;
5016 while (true) {
5017 if (lock->is_stable()) {
5018 // can we do it now?
5019 // (only if we're not replicated.. if we are, we really do need
5020 // to nudge the lock state!)
5021 /*
5022 actually, even if we're not replicated, we can't stay in MIX, because another mds
5023 could discover and replicate us at any time. if that happens while we're flushing,
5024 they end up in MIX but their inode has the old scatterstat version.
5025
5026 if (!forcelockchange && !lock->get_parent()->is_replicated() && lock->can_wrlock(-1)) {
5027 dout(10) << "scatter_nudge auth, propagating " << *lock << " on " << *p << dendl;
5028 scatter_writebehind(lock);
5029 if (c)
5030 lock->add_waiter(SimpleLock::WAIT_STABLE, c);
5031 return;
5032 }
5033 */
5034
5035 if (mdcache->is_readonly()) {
5036 if (lock->get_state() != LOCK_SYNC) {
5037 dout(10) << "scatter_nudge auth, read-only FS, syncing " << *lock << " on " << *p << dendl;
5038 simple_sync(static_cast<ScatterLock*>(lock));
5039 }
5040 break;
5041 }
5042
5043 // adjust lock state
5044 dout(10) << "scatter_nudge auth, scatter/unscattering " << *lock << " on " << *p << dendl;
5045 switch (lock->get_type()) {
5046 case CEPH_LOCK_IFILE:
5047 if (p->is_replicated() && lock->get_state() != LOCK_MIX)
5048 scatter_mix(static_cast<ScatterLock*>(lock));
5049 else if (lock->get_state() != LOCK_LOCK)
5050 simple_lock(static_cast<ScatterLock*>(lock));
5051 else
5052 simple_sync(static_cast<ScatterLock*>(lock));
5053 break;
5054
5055 case CEPH_LOCK_IDFT:
5056 case CEPH_LOCK_INEST:
5057 if (p->is_replicated() && lock->get_state() != LOCK_MIX)
5058 scatter_mix(lock);
5059 else if (lock->get_state() != LOCK_LOCK)
5060 simple_lock(lock);
5061 else
5062 simple_sync(lock);
5063 break;
5064 default:
5065 ceph_abort();
5066 }
5067 ++count;
5068 if (lock->is_stable() && count == 2) {
5069 dout(10) << "scatter_nudge oh, stable after two cycles." << dendl;
5070 // this should only really happen when called via
5071 // handle_file_lock due to AC_NUDGE, because the rest of the
5072 // time we are replicated or have dirty data and won't get
5073 // called. bailing here avoids an infinite loop.
11fdf7f2 5074 ceph_assert(!c);
5075 break;
5076 }
5077 } else {
5078 dout(10) << "scatter_nudge auth, waiting for stable " << *lock << " on " << *p << dendl;
5079 if (c)
5080 lock->add_waiter(SimpleLock::WAIT_STABLE, c);
5081 return;
5082 }
5083 }
5084 } else {
5085 dout(10) << "scatter_nudge replica, requesting scatter/unscatter of "
5086 << *lock << " on " << *p << dendl;
5087 // request unscatter?
5088 mds_rank_t auth = lock->get_parent()->authority().first;
11fdf7f2 5089 if (!mds->is_cluster_degraded() || mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
9f95a23c 5090 mds->send_message_mds(make_message<MLock>(lock, LOCK_AC_NUDGE, mds->get_nodeid()), auth);
11fdf7f2 5091 }
5092
5093 // wait...
5094 if (c)
5095 lock->add_waiter(SimpleLock::WAIT_STABLE, c);
5096
5097 // also, requeue, in case we had wrong auth or something
5098 if (lock->is_dirty())
5099 updated_scatterlocks.push_back(lock->get_updated_item());
5100 }
5101}
5102
5103void Locker::scatter_tick()
5104{
5105 dout(10) << "scatter_tick" << dendl;
5106
5107 // updated
5108 utime_t now = ceph_clock_now();
5109 int n = updated_scatterlocks.size();
5110 while (!updated_scatterlocks.empty()) {
5111 ScatterLock *lock = updated_scatterlocks.front();
5112
5113 if (n-- == 0) break; // scatter_nudge() may requeue; avoid looping
5114
5115 if (!lock->is_dirty()) {
5116 updated_scatterlocks.pop_front();
5117 dout(10) << " removing from updated_scatterlocks "
5118 << *lock << " " << *lock->get_parent() << dendl;
5119 continue;
5120 }
11fdf7f2 5121 if (now - lock->get_update_stamp() < g_conf()->mds_scatter_nudge_interval)
5122 break;
5123 updated_scatterlocks.pop_front();
5124 scatter_nudge(lock, 0);
5125 }
5126 mds->mdlog->flush();
5127}
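/*
 * Editor's note on the loop above: entries are stamped and appended
 * when first dirtied, so the front of updated_scatterlocks is normally
 * the oldest and the `break` on a too-young front entry is cheap
 * (anything requeued out of order is simply seen again next tick).
 * The `n` snapshot bounds the pass because scatter_nudge(lock, 0) can
 * push the same lock straight back (frozen, ambiguous-auth, or
 * replica cases), which would otherwise loop forever.
 */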
5128
5129
5130void Locker::scatter_tempsync(ScatterLock *lock, bool *need_issue)
5131{
5132 dout(10) << "scatter_tempsync " << *lock
5133 << " on " << *lock->get_parent() << dendl;
5134 ceph_assert(lock->get_parent()->is_auth());
5135 ceph_assert(lock->is_stable());
7c673cae 5136
11fdf7f2 5137 ceph_abort_msg("not fully implemented, at least not for filelock");
5138
5139 CInode *in = static_cast<CInode *>(lock->get_parent());
5140
5141 switch (lock->get_state()) {
5142 case LOCK_SYNC: ceph_abort(); // this shouldn't happen
5143 case LOCK_LOCK: lock->set_state(LOCK_LOCK_TSYN); break;
5144 case LOCK_MIX: lock->set_state(LOCK_MIX_TSYN); break;
5145 default: ceph_abort();
5146 }
5147
5148 int gather = 0;
5149 if (lock->is_wrlocked()) {
5150 if (lock->is_cached())
5151 invalidate_lock_caches(lock);
7c673cae 5152 gather++;
9f95a23c 5153 }
5154
5155 if (lock->get_cap_shift() &&
5156 in->is_head() &&
5157 in->issued_caps_need_gather(lock)) {
5158 if (need_issue)
5159 *need_issue = true;
5160 else
5161 issue_caps(in);
5162 gather++;
5163 }
5164
5165 if (lock->get_state() == LOCK_MIX_TSYN &&
5166 in->is_replicated()) {
5167 lock->init_gather();
5168 send_lock_message(lock, LOCK_AC_LOCK);
5169 gather++;
5170 }
5171
5172 if (gather) {
5173 in->auth_pin(lock);
5174 } else {
5175 // do tempsync
5176 lock->set_state(LOCK_TSYN);
5177 lock->finish_waiters(ScatterLock::WAIT_RD|ScatterLock::WAIT_STABLE);
5178 if (lock->get_cap_shift()) {
5179 if (need_issue)
5180 *need_issue = true;
5181 else
5182 issue_caps(in);
5183 }
5184 }
5185}
5186
5187
5188
5189// ==========================================================================
5190// local lock
5191
5192void Locker::local_wrlock_grab(LocalLock *lock, MutationRef& mut)
5193{
5194 dout(7) << "local_wrlock_grab on " << *lock
5195 << " on " << *lock->get_parent() << dendl;
5196
5197 ceph_assert(lock->get_parent()->is_auth());
5198 ceph_assert(lock->can_wrlock());
7c673cae 5199 lock->get_wrlock(mut->get_client());
11fdf7f2 5200
5201 auto it = mut->emplace_lock(lock, MutationImpl::LockOp::WRLOCK);
5202 ceph_assert(it->is_wrlock());
5203}
5204
5205bool Locker::local_wrlock_start(LocalLock *lock, MDRequestRef& mut)
5206{
5207 dout(7) << "local_wrlock_start on " << *lock
5208 << " on " << *lock->get_parent() << dendl;
5209
11fdf7f2 5210 ceph_assert(lock->get_parent()->is_auth());
7c673cae 5211 if (lock->can_wrlock()) {
7c673cae 5212 lock->get_wrlock(mut->get_client());
9f95a23c 5213 auto it = mut->emplace_lock(lock, MutationImpl::LockOp::WRLOCK);
11fdf7f2 5214 ceph_assert(it->is_wrlock());
5215 return true;
5216 } else {
5217 lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
5218 return false;
5219 }
5220}
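/*
 * Editor's sketch of the grab/start split above, with a hypothetical
 * caller; `versionlock` is the usual LocalLock target:
 *
 *   local_wrlock_grab(&in->versionlock, mut);    // caller knows it
 *                                                // cannot fail
 *   if (!local_wrlock_start(&in->versionlock, mdr))
 *     return;   // a C_MDS_RetryRequest waiter was queued
 *
 * grab asserts can_wrlock(); start is the retryable form.
 */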
5221
11fdf7f2 5222void Locker::local_wrlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut)
7c673cae 5223{
5224 ceph_assert(it->is_wrlock());
5225 LocalLock *lock = static_cast<LocalLock*>(it->lock);
5226 dout(7) << "local_wrlock_finish on " << *lock
5227 << " on " << *lock->get_parent() << dendl;
5228 lock->put_wrlock();
11fdf7f2 5229 mut->locks.erase(it);
5230 if (lock->get_num_wrlocks() == 0) {
5231 lock->finish_waiters(SimpleLock::WAIT_STABLE |
5232 SimpleLock::WAIT_WR |
5233 SimpleLock::WAIT_RD);
5234 }
5235}
5236
5237bool Locker::local_xlock_start(LocalLock *lock, MDRequestRef& mut)
5238{
5239 dout(7) << "local_xlock_start on " << *lock
5240 << " on " << *lock->get_parent() << dendl;
5241
11fdf7f2 5242 ceph_assert(lock->get_parent()->is_auth());
5243 if (!lock->can_xlock_local()) {
5244 lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
5245 return false;
5246 }
5247
5248 lock->get_xlock(mut, mut->get_client());
9f95a23c 5249 mut->emplace_lock(lock, MutationImpl::LockOp::XLOCK);
5250 return true;
5251}
5252
11fdf7f2 5253void Locker::local_xlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut)
7c673cae 5254{
5255 ceph_assert(it->is_xlock());
5256 LocalLock *lock = static_cast<LocalLock*>(it->lock);
5257 dout(7) << "local_xlock_finish on " << *lock
5258 << " on " << *lock->get_parent() << dendl;
5259 lock->put_xlock();
11fdf7f2 5260 mut->locks.erase(it);
5261
5262 lock->finish_waiters(SimpleLock::WAIT_STABLE |
5263 SimpleLock::WAIT_WR |
5264 SimpleLock::WAIT_RD);
5265}
5266
5267
5268
5269// ==========================================================================
5270// file lock
5271
5272
5273void Locker::file_eval(ScatterLock *lock, bool *need_issue)
5274{
5275 CInode *in = static_cast<CInode*>(lock->get_parent());
5276 int loner_wanted, other_wanted;
5277 int wanted = in->get_caps_wanted(&loner_wanted, &other_wanted, CEPH_CAP_SFILE);
5278 dout(7) << "file_eval wanted=" << gcap_string(wanted)
5279 << " loner_wanted=" << gcap_string(loner_wanted)
5280 << " other_wanted=" << gcap_string(other_wanted)
5281 << " filelock=" << *lock << " on " << *lock->get_parent()
5282 << dendl;
5283
5284 ceph_assert(lock->get_parent()->is_auth());
5285 ceph_assert(lock->is_stable());
5286
5287 if (lock->get_parent()->is_freezing_or_frozen())
5288 return;
5289
5290 if (mdcache->is_readonly()) {
5291 if (lock->get_state() != LOCK_SYNC) {
5292 dout(10) << "file_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
5293 simple_sync(lock, need_issue);
5294 }
5295 return;
5296 }
5297
5298 // excl -> *?
5299 if (lock->get_state() == LOCK_EXCL) {
5300 dout(20) << " is excl" << dendl;
5301 int loner_issued, other_issued, xlocker_issued;
5302 in->get_caps_issued(&loner_issued, &other_issued, &xlocker_issued, CEPH_CAP_SFILE);
5303 dout(7) << "file_eval loner_issued=" << gcap_string(loner_issued)
5304 << " other_issued=" << gcap_string(other_issued)
5305 << " xlocker_issued=" << gcap_string(xlocker_issued)
5306 << dendl;
5307 if (!((loner_wanted|loner_issued) & (CEPH_CAP_GEXCL|CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) ||
5308 (other_wanted & (CEPH_CAP_GEXCL|CEPH_CAP_GWR|CEPH_CAP_GRD)) ||
5309 (in->is_dir() && in->multiple_nonstale_caps())) { // FIXME.. :/
5310 dout(20) << " should lose it" << dendl;
5311 // we should lose it.
5312 // loner other want
5313 // R R SYNC
5314 // R R|W MIX
5315 // R W MIX
5316 // R|W R MIX
5317 // R|W R|W MIX
5318 // R|W W MIX
5319 // W R MIX
5320 // W R|W MIX
5321 // W W MIX
5322 // -> any writer means MIX; RD doesn't matter.
5323 if (((other_wanted|loner_wanted) & CEPH_CAP_GWR) ||
5324 lock->is_waiter_for(SimpleLock::WAIT_WR))
5325 scatter_mix(lock, need_issue);
5326 else if (!lock->is_wrlocked()) // let excl wrlocks drain first
5327 simple_sync(lock, need_issue);
5328 else
5329 dout(10) << " waiting for wrlock to drain" << dendl;
5330 }
5331 }
5332
5333 // * -> excl?
5334 else if (lock->get_state() != LOCK_EXCL &&
5335 !lock->is_rdlocked() &&
5336 //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
5337 in->get_target_loner() >= 0 &&
5338 (in->is_dir() ?
5339 !in->has_subtree_or_exporting_dirfrag() :
5340 (wanted & (CEPH_CAP_GEXCL|CEPH_CAP_GWR|CEPH_CAP_GBUFFER)))) {
5341 dout(7) << "file_eval stable, bump to loner " << *lock
5342 << " on " << *lock->get_parent() << dendl;
5343 file_excl(lock, need_issue);
5344 }
5345
5346 // * -> mixed?
5347 else if (lock->get_state() != LOCK_MIX &&
5348 !lock->is_rdlocked() &&
5349 //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
5350 (lock->get_scatter_wanted() ||
b32b8144 5351 (in->get_target_loner() < 0 && (wanted & CEPH_CAP_GWR)))) {
5352 dout(7) << "file_eval stable, bump to mixed " << *lock
5353 << " on " << *lock->get_parent() << dendl;
5354 scatter_mix(lock, need_issue);
5355 }
5356
5357 // * -> sync?
5358 else if (lock->get_state() != LOCK_SYNC &&
5359 !lock->is_wrlocked() && // drain wrlocks first!
5360 !lock->is_waiter_for(SimpleLock::WAIT_WR) &&
f64942e4 5361 !(wanted & CEPH_CAP_GWR) &&
5362 !((lock->get_state() == LOCK_MIX) &&
5363 in->is_dir() && in->has_subtree_or_exporting_dirfrag()) // if we are a delegation point, stay where we are
5364 //((wanted & CEPH_CAP_RD) ||
5365 //in->is_replicated() ||
b32b8144 5366 //lock->is_leased() ||
5367 //(!loner && lock->get_state() == LOCK_EXCL)) &&
5368 ) {
5369 dout(7) << "file_eval stable, bump to sync " << *lock
5370 << " on " << *lock->get_parent() << dendl;
5371 simple_sync(lock, need_issue);
5372 }
5373}
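/*
 * Editor's summary of file_eval's target choice, in schematic form
 * (`target` is illustrative, not a real variable; the real code also
 * drains rd/wrlocks first and refuses to leave MIX at subtree
 * delegation points):
 *
 *   if (wanted & CEPH_CAP_GWR)
 *     target = (in->get_target_loner() >= 0) ? LOCK_EXCL // one writer
 *                                            : LOCK_MIX; // shared
 *   else
 *     target = LOCK_SYNC;                                // readers
 */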
5374
5375
5376
5377void Locker::scatter_mix(ScatterLock *lock, bool *need_issue)
5378{
5379 dout(7) << "scatter_mix " << *lock << " on " << *lock->get_parent() << dendl;
5380
5381 CInode *in = static_cast<CInode*>(lock->get_parent());
5382 ceph_assert(in->is_auth());
5383 ceph_assert(lock->is_stable());
5384
5385 if (lock->get_state() == LOCK_LOCK) {
5386 in->start_scatter(lock);
5387 if (in->is_replicated()) {
5388 // data
5389 bufferlist softdata;
5390 lock->encode_locked_state(softdata);
5391
5392 // bcast to replicas
5393 send_lock_message(lock, LOCK_AC_MIX, softdata);
5394 }
5395
5396 // change lock
5397 lock->set_state(LOCK_MIX);
5398 lock->clear_scatter_wanted();
5399 if (lock->get_cap_shift()) {
5400 if (need_issue)
5401 *need_issue = true;
5402 else
5403 issue_caps(in);
5404 }
5405 } else {
5406 // gather?
5407 switch (lock->get_state()) {
5408 case LOCK_SYNC: lock->set_state(LOCK_SYNC_MIX); break;
7c673cae 5409 case LOCK_EXCL: lock->set_state(LOCK_EXCL_MIX); break;
b32b8144 5410 case LOCK_XSYN: lock->set_state(LOCK_XSYN_MIX); break;
5411 case LOCK_TSYN: lock->set_state(LOCK_TSYN_MIX); break;
5412 default: ceph_abort();
5413 }
5414
5415 int gather = 0;
5416 if (lock->is_rdlocked()) {
5417 if (lock->is_cached())
5418 invalidate_lock_caches(lock);
7c673cae 5419 gather++;
9f95a23c 5420 }
7c673cae 5421 if (in->is_replicated()) {
b32b8144 5422 if (lock->get_state() == LOCK_SYNC_MIX) { // for the other states, replicas are already LOCK
5423 send_lock_message(lock, LOCK_AC_MIX);
5424 lock->init_gather();
5425 gather++;
5426 }
5427 }
5428 if (lock->is_leased()) {
5429 revoke_client_leases(lock);
5430 gather++;
5431 }
5432 if (lock->get_cap_shift() &&
5433 in->is_head() &&
5434 in->issued_caps_need_gather(lock)) {
5435 if (need_issue)
5436 *need_issue = true;
5437 else
5438 issue_caps(in);
5439 gather++;
5440 }
5441 bool need_recover = false;
5442 if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
5443 mds->mdcache->queue_file_recover(in);
5444 need_recover = true;
5445 gather++;
5446 }
5447
5448 if (gather) {
5449 lock->get_parent()->auth_pin(lock);
5450 if (need_recover)
5451 mds->mdcache->do_file_recover();
5452 } else {
5453 in->start_scatter(lock);
5454 lock->set_state(LOCK_MIX);
5455 lock->clear_scatter_wanted();
5456 if (in->is_replicated()) {
5457 bufferlist softdata;
5458 lock->encode_locked_state(softdata);
5459 send_lock_message(lock, LOCK_AC_MIX, softdata);
5460 }
5461 if (lock->get_cap_shift()) {
5462 if (need_issue)
5463 *need_issue = true;
5464 else
5465 issue_caps(in);
5466 }
5467 }
5468 }
5469}
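/*
 * Editor's note: scatter_mix above, and file_excl/file_xsyn below, all
 * follow the same gather pattern. Schematically:
 *
 *   int gather = 0;
 *   if (lock->is_rdlocked())  gather++;  // readers must drain
 *   if (replicas_need_ack)    gather++;  // init_gather() + MLock bcast
 *   if (caps_need_revoke)     gather++;  // issue_caps() revokes
 *   if (gather)
 *     lock->get_parent()->auth_pin(lock); // park in *_MIX/*_EXCL/...
 *   else
 *     finish_now();                       // jump straight to stable
 *
 * replicas_need_ack, caps_need_revoke and finish_now() are stand-ins
 * for the concrete checks in each function.
 */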
5470
5471
5472void Locker::file_excl(ScatterLock *lock, bool *need_issue)
5473{
5474 CInode *in = static_cast<CInode*>(lock->get_parent());
5475 dout(7) << "file_excl " << *lock << " on " << *lock->get_parent() << dendl;
5476
5477 ceph_assert(in->is_auth());
5478 ceph_assert(lock->is_stable());
7c673cae 5479
11fdf7f2 5480 ceph_assert((in->get_loner() >= 0 && in->get_mds_caps_wanted().empty()) ||
5481 (lock->get_state() == LOCK_XSYN)); // must do xsyn -> excl -> <anything else>
5482
5483 switch (lock->get_state()) {
5484 case LOCK_SYNC: lock->set_state(LOCK_SYNC_EXCL); break;
5485 case LOCK_MIX: lock->set_state(LOCK_MIX_EXCL); break;
5486 case LOCK_LOCK: lock->set_state(LOCK_LOCK_EXCL); break;
5487 case LOCK_XSYN: lock->set_state(LOCK_XSYN_EXCL); break;
5488 default: ceph_abort();
5489 }
5490 int gather = 0;
5491
5492 if (lock->is_rdlocked())
5493 gather++;
5494 if (lock->is_wrlocked())
5495 gather++;
5496 if (gather && lock->is_cached())
5497 invalidate_lock_caches(lock);
5498
5499 if (in->is_replicated() &&
5500 lock->get_state() != LOCK_LOCK_EXCL &&
5501 lock->get_state() != LOCK_XSYN_EXCL) { // if we were lock, replicas are already lock.
5502 send_lock_message(lock, LOCK_AC_LOCK);
5503 lock->init_gather();
5504 gather++;
5505 }
5506 if (lock->is_leased()) {
5507 revoke_client_leases(lock);
5508 gather++;
5509 }
5510 if (in->is_head() &&
5511 in->issued_caps_need_gather(lock)) {
5512 if (need_issue)
5513 *need_issue = true;
5514 else
5515 issue_caps(in);
5516 gather++;
5517 }
5518 bool need_recover = false;
5519 if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
5520 mds->mdcache->queue_file_recover(in);
5521 need_recover = true;
5522 gather++;
5523 }
5524
5525 if (gather) {
5526 lock->get_parent()->auth_pin(lock);
5527 if (need_recover)
5528 mds->mdcache->do_file_recover();
5529 } else {
5530 lock->set_state(LOCK_EXCL);
5531 if (need_issue)
5532 *need_issue = true;
5533 else
5534 issue_caps(in);
5535 }
5536}
5537
5538void Locker::file_xsyn(SimpleLock *lock, bool *need_issue)
5539{
5540 dout(7) << "file_xsyn on " << *lock << " on " << *lock->get_parent() << dendl;
5541 CInode *in = static_cast<CInode *>(lock->get_parent());
5542 ceph_assert(in->is_auth());
5543 ceph_assert(in->get_loner() >= 0 && in->get_mds_caps_wanted().empty());
5544
5545 switch (lock->get_state()) {
5546 case LOCK_EXCL: lock->set_state(LOCK_EXCL_XSYN); break;
5547 default: ceph_abort();
5548 }
5549
5550 int gather = 0;
5551 if (lock->is_wrlocked()) {
5552 if (lock->is_cached())
5553 invalidate_lock_caches(lock);
7c673cae 5554 gather++;
9f95a23c 5555 }
5556
5557 if (in->is_head() &&
5558 in->issued_caps_need_gather(lock)) {
5559 if (need_issue)
5560 *need_issue = true;
5561 else
5562 issue_caps(in);
5563 gather++;
5564 }
5565
5566 if (gather) {
5567 lock->get_parent()->auth_pin(lock);
5568 } else {
5569 lock->set_state(LOCK_XSYN);
5570 lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
5571 if (need_issue)
5572 *need_issue = true;
5573 else
5574 issue_caps(in);
5575 }
5576}
5577
5578void Locker::file_recover(ScatterLock *lock)
5579{
5580 CInode *in = static_cast<CInode *>(lock->get_parent());
5581 dout(7) << "file_recover " << *lock << " on " << *in << dendl;
5582
11fdf7f2 5583 ceph_assert(in->is_auth());
7c673cae 5584 //assert(lock->is_stable());
11fdf7f2 5585 ceph_assert(lock->get_state() == LOCK_PRE_SCAN); // only called from MDCache::start_files_to_recover()
5586
5587 int gather = 0;
5588
5589 /*
5590 if (in->is_replicated()
5591 lock->get_sm()->states[oldstate].replica_state != LOCK_LOCK) {
5592 send_lock_message(lock, LOCK_AC_LOCK);
5593 lock->init_gather();
5594 gather++;
5595 }
5596 */
5597 if (in->is_head() &&
5598 in->issued_caps_need_gather(lock)) {
5599 issue_caps(in);
5600 gather++;
5601 }
5602
5603 lock->set_state(LOCK_SCAN);
5604 if (gather)
5605 in->state_set(CInode::STATE_NEEDSRECOVER);
5606 else
5607 mds->mdcache->queue_file_recover(in);
5608}
5609
5610
5611// messenger
9f95a23c 5612void Locker::handle_file_lock(ScatterLock *lock, const cref_t<MLock> &m)
5613{
5614 CInode *in = static_cast<CInode*>(lock->get_parent());
5615 int from = m->get_asker();
5616
5617 if (mds->is_rejoin()) {
5618 if (in->is_rejoining()) {
5619 dout(7) << "handle_file_lock still rejoining " << *in
5620 << ", dropping " << *m << dendl;
5621 return;
5622 }
5623 }
5624
11fdf7f2 5625 dout(7) << "handle_file_lock a=" << lock->get_lock_action_name(m->get_action())
5626 << " on " << *lock
5627 << " from mds." << from << " "
5628 << *in << dendl;
5629
5630 bool caps = lock->get_cap_shift();
5631
5632 switch (m->get_action()) {
5633 // -- replica --
5634 case LOCK_AC_SYNC:
11fdf7f2 5635 ceph_assert(lock->get_state() == LOCK_LOCK ||
5636 lock->get_state() == LOCK_MIX ||
5637 lock->get_state() == LOCK_MIX_SYNC2);
5638
5639 if (lock->get_state() == LOCK_MIX) {
5640 lock->set_state(LOCK_MIX_SYNC);
5641 eval_gather(lock, true);
5642 if (lock->is_unstable_and_locked()) {
5643 if (lock->is_cached())
5644 invalidate_lock_caches(lock);
31f18b77 5645 mds->mdlog->flush();
9f95a23c 5646 }
5647 break;
5648 }
5649
5650 (static_cast<ScatterLock *>(lock))->finish_flush();
5651 (static_cast<ScatterLock *>(lock))->clear_flushed();
5652
5653 // ok
5654 lock->decode_locked_state(m->get_data());
5655 lock->set_state(LOCK_SYNC);
5656
5657 lock->get_rdlock();
5658 if (caps)
5659 issue_caps(in);
5660 lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
5661 lock->put_rdlock();
5662 break;
5663
5664 case LOCK_AC_LOCK:
5665 switch (lock->get_state()) {
5666 case LOCK_SYNC: lock->set_state(LOCK_SYNC_LOCK); break;
5667 case LOCK_MIX: lock->set_state(LOCK_MIX_LOCK); break;
5668 default: ceph_abort();
5669 }
5670
5671 eval_gather(lock, true);
5672 if (lock->is_unstable_and_locked()) {
5673 if (lock->is_cached())
5674 invalidate_lock_caches(lock);
31f18b77 5675 mds->mdlog->flush();
9f95a23c 5676 }
31f18b77 5677
5678 break;
5679
5680 case LOCK_AC_LOCKFLUSHED:
5681 (static_cast<ScatterLock *>(lock))->finish_flush();
5682 (static_cast<ScatterLock *>(lock))->clear_flushed();
5683 // wake up scatter_nudge waiters
5684 if (lock->is_stable())
5685 lock->finish_waiters(SimpleLock::WAIT_STABLE);
5686 break;
5687
5688 case LOCK_AC_MIX:
11fdf7f2 5689 ceph_assert(lock->get_state() == LOCK_SYNC ||
5690 lock->get_state() == LOCK_LOCK ||
5691 lock->get_state() == LOCK_SYNC_MIX2);
5692
5693 if (lock->get_state() == LOCK_SYNC) {
5694 // MIXED
5695 lock->set_state(LOCK_SYNC_MIX);
5696 eval_gather(lock, true);
5697 if (lock->is_unstable_and_locked()) {
5698 if (lock->is_cached())
5699 invalidate_lock_caches(lock);
31f18b77 5700 mds->mdlog->flush();
9f95a23c 5701 }
5702 break;
5703 }
c07f9fc5 5704
7c673cae 5705 // ok
7c673cae 5706 lock->set_state(LOCK_MIX);
c07f9fc5 5707 lock->decode_locked_state(m->get_data());
5708
5709 if (caps)
5710 issue_caps(in);
5711
5712 lock->finish_waiters(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE);
5713 break;
5714
5715
5716 // -- auth --
5717 case LOCK_AC_LOCKACK:
11fdf7f2 5718 ceph_assert(lock->get_state() == LOCK_SYNC_LOCK ||
5719 lock->get_state() == LOCK_MIX_LOCK ||
5720 lock->get_state() == LOCK_MIX_LOCK2 ||
5721 lock->get_state() == LOCK_MIX_EXCL ||
5722 lock->get_state() == LOCK_SYNC_EXCL ||
5723 lock->get_state() == LOCK_SYNC_MIX ||
5724 lock->get_state() == LOCK_MIX_TSYN);
11fdf7f2 5725 ceph_assert(lock->is_gathering(from));
5726 lock->remove_gather(from);
5727
5728 if (lock->get_state() == LOCK_MIX_LOCK ||
5729 lock->get_state() == LOCK_MIX_LOCK2 ||
5730 lock->get_state() == LOCK_MIX_EXCL ||
5731 lock->get_state() == LOCK_MIX_TSYN) {
5732 lock->decode_locked_state(m->get_data());
5733 // replica is waiting for AC_LOCKFLUSHED, eval_gather() should not
5734 // delay calling scatter_writebehind().
5735 lock->clear_flushed();
5736 }
5737
5738 if (lock->is_gathering()) {
5739 dout(7) << "handle_file_lock " << *in << " from " << from
5740 << ", still gathering " << lock->get_gather_set() << dendl;
5741 } else {
5742 dout(7) << "handle_file_lock " << *in << " from " << from
5743 << ", last one" << dendl;
5744 eval_gather(lock);
5745 }
5746 break;
5747
5748 case LOCK_AC_SYNCACK:
5749 ceph_assert(lock->get_state() == LOCK_MIX_SYNC);
5750 ceph_assert(lock->is_gathering(from));
5751 lock->remove_gather(from);
5752
5753 lock->decode_locked_state(m->get_data());
5754
5755 if (lock->is_gathering()) {
5756 dout(7) << "handle_file_lock " << *in << " from " << from
5757 << ", still gathering " << lock->get_gather_set() << dendl;
5758 } else {
5759 dout(7) << "handle_file_lock " << *in << " from " << from
5760 << ", last one" << dendl;
5761 eval_gather(lock);
5762 }
5763 break;
5764
5765 case LOCK_AC_MIXACK:
5766 ceph_assert(lock->get_state() == LOCK_SYNC_MIX);
5767 ceph_assert(lock->is_gathering(from));
5768 lock->remove_gather(from);
5769
5770 if (lock->is_gathering()) {
5771 dout(7) << "handle_file_lock " << *in << " from " << from
5772 << ", still gathering " << lock->get_gather_set() << dendl;
5773 } else {
5774 dout(7) << "handle_file_lock " << *in << " from " << from
5775 << ", last one" << dendl;
5776 eval_gather(lock);
5777 }
5778 break;
5779
5780
5781 // requests....
5782 case LOCK_AC_REQSCATTER:
5783 if (lock->is_stable()) {
5784 /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
5785 * because the replica should be holding an auth_pin if they're
5786 * doing this (and thus, we are freezing, not frozen, and indefinite
5787 * starvation isn't an issue).
5788 */
5789 dout(7) << "handle_file_lock got scatter request on " << *lock
5790 << " on " << *lock->get_parent() << dendl;
5791 if (lock->get_state() != LOCK_MIX) // i.e., the reqscatter didn't race with an actual mix/scatter
5792 scatter_mix(lock);
5793 } else {
5794 dout(7) << "handle_file_lock got scatter request, !stable, marking scatter_wanted on " << *lock
5795 << " on " << *lock->get_parent() << dendl;
5796 lock->set_scatter_wanted();
5797 }
5798 break;
5799
5800 case LOCK_AC_REQUNSCATTER:
5801 if (lock->is_stable()) {
5802 /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
5803 * because the replica should be holding an auth_pin if they're
5804 * doing this (and thus, we are freezing, not frozen, and indefinite
5805 * starvation isn't an issue).
5806 */
5807 dout(7) << "handle_file_lock got unscatter request on " << *lock
5808 << " on " << *lock->get_parent() << dendl;
5809 if (lock->get_state() == LOCK_MIX) // i.e., the requnscatter didn't race with an actual unscatter
5810 simple_lock(lock); // FIXME tempsync?
5811 } else {
5812 dout(7) << "handle_file_lock ignoring unscatter request on " << *lock
5813 << " on " << *lock->get_parent() << dendl;
5814 lock->set_unscatter_wanted();
5815 }
5816 break;
5817
5818 case LOCK_AC_REQRDLOCK:
5819 handle_reqrdlock(lock, m);
5820 break;
5821
5822 case LOCK_AC_NUDGE:
5823 if (!lock->get_parent()->is_auth()) {
5824 dout(7) << "handle_file_lock IGNORING nudge on non-auth " << *lock
5825 << " on " << *lock->get_parent() << dendl;
5826 } else if (!lock->get_parent()->is_replicated()) {
5827 dout(7) << "handle_file_lock IGNORING nudge on non-replicated " << *lock
5828 << " on " << *lock->get_parent() << dendl;
5829 } else {
5830 dout(7) << "handle_file_lock trying nudge on " << *lock
5831 << " on " << *lock->get_parent() << dendl;
5832 scatter_nudge(lock, 0, true);
5833 mds->mdlog->flush();
5834 }
5835 break;
5836
5837 default:
5838 ceph_abort();
5839 }
7c673cae 5840}
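/*
 * Editor's summary of the MLock actions dispatched above, derived
 * from the switch itself rather than any external protocol spec:
 *
 *   replica side: LOCK_AC_SYNC/_LOCK/_MIX move the replica's state
 *     (through a gather state while a flush is still in flight), and
 *     LOCK_AC_LOCKFLUSHED lets a mix->lock flush complete.
 *   auth side: LOCK_AC_LOCKACK/_SYNCACK/_MIXACK retire one mds from
 *     the gather set and eval_gather() on the last ack;
 *     LOCK_AC_REQSCATTER/_REQUNSCATTER act only on a stable lock
 *     (otherwise a scatter/unscatter_wanted flag is left behind),
 *     while _REQRDLOCK and _NUDGE defer to handle_reqrdlock() and
 *     scatter_nudge().
 */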