]> git.proxmox.com Git - ceph.git/blame - ceph/src/mds/Locker.cc
update sources to v12.2.5
[ceph.git] / ceph / src / mds / Locker.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
94b18763 15#include <boost/utility/string_view.hpp>
7c673cae
FG
16
17#include "MDSRank.h"
18#include "MDCache.h"
19#include "Locker.h"
20#include "CInode.h"
21#include "CDir.h"
22#include "CDentry.h"
23#include "Mutation.h"
24#include "MDSContext.h"
25
26#include "MDLog.h"
27#include "MDSMap.h"
28
29#include "events/EUpdate.h"
30#include "events/EOpen.h"
31
32#include "msg/Messenger.h"
33#include "osdc/Objecter.h"
34
35#include "messages/MInodeFileCaps.h"
36#include "messages/MLock.h"
37#include "messages/MClientLease.h"
38#include "messages/MClientReply.h"
39#include "messages/MClientCaps.h"
40#include "messages/MClientCapRelease.h"
41
42#include "messages/MMDSSlaveRequest.h"
43
44#include <errno.h>
45
46#include "common/config.h"
47
48
49#define dout_subsys ceph_subsys_mds
50#undef dout_prefix
51#define dout_context g_ceph_context
52#define dout_prefix _prefix(_dout, mds)
53static ostream& _prefix(std::ostream *_dout, MDSRank *mds) {
54 return *_dout << "mds." << mds->get_nodeid() << ".locker ";
55}
56
57
58class LockerContext : public MDSInternalContextBase {
59protected:
60 Locker *locker;
61 MDSRank *get_mds() override
62 {
63 return locker->mds;
64 }
65
66public:
67 explicit LockerContext(Locker *locker_) : locker(locker_) {
68 assert(locker != NULL);
69 }
70};
71
72class LockerLogContext : public MDSLogContextBase {
73protected:
74 Locker *locker;
75 MDSRank *get_mds() override
76 {
77 return locker->mds;
78 }
79
80public:
81 explicit LockerLogContext(Locker *locker_) : locker(locker_) {
82 assert(locker != NULL);
83 }
84};
85
86/* This function DOES put the passed message before returning */
87void Locker::dispatch(Message *m)
88{
89
90 switch (m->get_type()) {
91
92 // inter-mds locking
93 case MSG_MDS_LOCK:
94 handle_lock(static_cast<MLock*>(m));
95 break;
96 // inter-mds caps
97 case MSG_MDS_INODEFILECAPS:
98 handle_inode_file_caps(static_cast<MInodeFileCaps*>(m));
99 break;
100
101 // client sync
102 case CEPH_MSG_CLIENT_CAPS:
103 handle_client_caps(static_cast<MClientCaps*>(m));
104
105 break;
106 case CEPH_MSG_CLIENT_CAPRELEASE:
107 handle_client_cap_release(static_cast<MClientCapRelease*>(m));
108 break;
109 case CEPH_MSG_CLIENT_LEASE:
110 handle_client_lease(static_cast<MClientLease*>(m));
111 break;
112
113 default:
114 derr << "locker unknown message " << m->get_type() << dendl;
115 assert(0 == "locker unknown message");
116 }
117}
118
119void Locker::tick()
120{
121 scatter_tick();
122 caps_tick();
123}
124
125/*
126 * locks vs rejoin
127 *
128 *
129 *
130 */
131
132void Locker::send_lock_message(SimpleLock *lock, int msg)
133{
181888fb 134 for (const auto &it : lock->get_parent()->get_replicas()) {
7c673cae 135 if (mds->is_cluster_degraded() &&
181888fb 136 mds->mdsmap->get_state(it.first) < MDSMap::STATE_REJOIN)
7c673cae
FG
137 continue;
138 MLock *m = new MLock(lock, msg, mds->get_nodeid());
181888fb 139 mds->send_message_mds(m, it.first);
7c673cae
FG
140 }
141}
142
143void Locker::send_lock_message(SimpleLock *lock, int msg, const bufferlist &data)
144{
181888fb 145 for (const auto &it : lock->get_parent()->get_replicas()) {
7c673cae 146 if (mds->is_cluster_degraded() &&
181888fb 147 mds->mdsmap->get_state(it.first) < MDSMap::STATE_REJOIN)
7c673cae
FG
148 continue;
149 MLock *m = new MLock(lock, msg, mds->get_nodeid());
150 m->set_data(data);
181888fb 151 mds->send_message_mds(m, it.first);
7c673cae
FG
152 }
153}
154
155
156
157
158void Locker::include_snap_rdlocks(set<SimpleLock*>& rdlocks, CInode *in)
159{
160 // rdlock ancestor snaps
161 CInode *t = in;
162 rdlocks.insert(&in->snaplock);
163 while (t->get_projected_parent_dn()) {
164 t = t->get_projected_parent_dn()->get_dir()->get_inode();
165 rdlocks.insert(&t->snaplock);
166 }
167}
168
169void Locker::include_snap_rdlocks_wlayout(set<SimpleLock*>& rdlocks, CInode *in,
170 file_layout_t **layout)
171{
172 //rdlock ancestor snaps
173 CInode *t = in;
174 rdlocks.insert(&in->snaplock);
175 rdlocks.insert(&in->policylock);
176 bool found_layout = false;
177 while (t) {
178 rdlocks.insert(&t->snaplock);
179 if (!found_layout) {
180 rdlocks.insert(&t->policylock);
181 if (t->get_projected_inode()->has_layout()) {
182 *layout = &t->get_projected_inode()->layout;
183 found_layout = true;
184 }
185 }
186 if (t->get_projected_parent_dn() &&
187 t->get_projected_parent_dn()->get_dir())
188 t = t->get_projected_parent_dn()->get_dir()->get_inode();
189 else t = NULL;
190 }
191}
192
193struct MarkEventOnDestruct {
194 MDRequestRef& mdr;
195 const char* message;
196 bool mark_event;
197 MarkEventOnDestruct(MDRequestRef& _mdr,
198 const char *_message) : mdr(_mdr),
199 message(_message),
200 mark_event(true) {}
201 ~MarkEventOnDestruct() {
202 if (mark_event)
203 mdr->mark_event(message);
204 }
205};
206
/* If this function returns false, the mdr has been placed
 * on the appropriate wait list */
//
// Acquire all requested locks for a request in one pass:
//  1. build a single sorted lock set (xlocks get companion versionlocks),
//  2. auth-pin every object involved (locally, or by asking remote mds),
//  3. take each lock in sorted order, dropping any stale/out-of-order
//     locks the request already held.
// Returns true only when every lock is held; on any wait it queues a
// retry and returns false.
bool Locker::acquire_locks(MDRequestRef& mdr,
			   set<SimpleLock*> &rdlocks,
			   set<SimpleLock*> &wrlocks,
			   set<SimpleLock*> &xlocks,
			   map<SimpleLock*,mds_rank_t> *remote_wrlocks,
			   CInode *auth_pin_freeze,
			   bool auth_pin_nonblock)
{
  if (mdr->done_locking &&
      !mdr->is_slave()) { // not on slaves!  master requests locks piecemeal.
    dout(10) << "acquire_locks " << *mdr << " - done locking" << dendl;
    return true;  // at least we had better be!
  }
  dout(10) << "acquire_locks " << *mdr << dendl;

  // records the last marker.message as the request event if we bail out
  MarkEventOnDestruct marker(mdr, "failed to acquire_locks");

  client_t client = mdr->get_client();

  set<SimpleLock*, SimpleLock::ptr_lt> sorted;  // sort everything we will lock
  set<MDSCacheObject*> mustpin;            // items to authpin

  // xlocks
  for (set<SimpleLock*>::iterator p = xlocks.begin(); p != xlocks.end(); ++p) {
    SimpleLock *lock = *p;

    if ((lock->get_type() == CEPH_LOCK_ISNAP ||
	 lock->get_type() == CEPH_LOCK_IPOLICY) &&
	mds->is_cluster_degraded() &&
	mdr->is_master() &&
	!mdr->is_queued_for_replay()) {
      // waiting for recovering mds, to guarantee replayed requests and mksnap/setlayout
      // get processed in proper order.
      bool wait = false;
      if (lock->get_parent()->is_auth()) {
	if (!mdr->locks.count(lock)) {
	  set<mds_rank_t> ls;
	  lock->get_parent()->list_replicas(ls);
	  for (auto m : ls) {
	    if (mds->mdsmap->get_state(m) < MDSMap::STATE_ACTIVE) {
	      wait = true;
	      break;
	    }
	  }
	}
      } else {
	// if the lock is the latest locked one, it's possible that slave mds got the lock
	// while there are recovering mds.
	if (!mdr->locks.count(lock) || lock == *mdr->locks.rbegin())
	  wait = true;
      }
      if (wait) {
	dout(10) << " must xlock " << *lock << " " << *lock->get_parent()
		 << ", waiting for cluster recovered" << dendl;
	mds->locker->drop_locks(mdr.get(), NULL);
	mdr->drop_local_auth_pins();
	mds->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache, mdr));
	return false;
      }
    }

    dout(20) << " must xlock " << *lock << " " << *lock->get_parent() << dendl;

    sorted.insert(lock);
    mustpin.insert(lock->get_parent());

    // augment xlock with a versionlock?
    if ((*p)->get_type() == CEPH_LOCK_DN) {
      CDentry *dn = (CDentry*)lock->get_parent();
      if (!dn->is_auth())
	continue;

      if (xlocks.count(&dn->versionlock))
	continue;  // we're xlocking the versionlock too; don't wrlock it!

      if (mdr->is_master()) {
	// master.  wrlock versionlock so we can pipeline dentry updates to journal.
	wrlocks.insert(&dn->versionlock);
      } else {
	// slave.  exclusively lock the dentry version (i.e. block other journal updates).
	// this makes rollback safe.
	xlocks.insert(&dn->versionlock);
	sorted.insert(&dn->versionlock);
      }
    }
    if (lock->get_type() > CEPH_LOCK_IVERSION) {
      // inode version lock?
      CInode *in = (CInode*)lock->get_parent();
      if (!in->is_auth())
	continue;
      if (mdr->is_master()) {
	// master.  wrlock versionlock so we can pipeline inode updates to journal.
	wrlocks.insert(&in->versionlock);
      } else {
	// slave.  exclusively lock the inode version (i.e. block other journal updates).
	// this makes rollback safe.
	xlocks.insert(&in->versionlock);
	sorted.insert(&in->versionlock);
      }
    }
  }

  // wrlocks
  for (set<SimpleLock*>::iterator p = wrlocks.begin(); p != wrlocks.end(); ++p) {
    MDSCacheObject *object = (*p)->get_parent();
    dout(20) << " must wrlock " << **p << " " << *object << dendl;
    sorted.insert(*p);
    if (object->is_auth())
      mustpin.insert(object);
    else if (!object->is_auth() &&
	     !(*p)->can_wrlock(client) &&  // we might have to request a scatter
	     !mdr->is_slave()) {           // if we are slave (remote_wrlock), the master already authpinned
      dout(15) << " will also auth_pin " << *object
	       << " in case we need to request a scatter" << dendl;
      mustpin.insert(object);
    }
  }

  // remote_wrlocks
  if (remote_wrlocks) {
    for (map<SimpleLock*,mds_rank_t>::iterator p = remote_wrlocks->begin(); p != remote_wrlocks->end(); ++p) {
      MDSCacheObject *object = p->first->get_parent();
      dout(20) << " must remote_wrlock on mds." << p->second << " "
	       << *p->first << " " << *object << dendl;
      sorted.insert(p->first);
      mustpin.insert(object);
    }
  }

  // rdlocks
  for (set<SimpleLock*>::iterator p = rdlocks.begin();
       p != rdlocks.end();
       ++p) {
    MDSCacheObject *object = (*p)->get_parent();
    dout(20) << " must rdlock " << **p << " " << *object << dendl;
    sorted.insert(*p);
    if (object->is_auth())
      mustpin.insert(object);
    else if (!object->is_auth() &&
	     !(*p)->can_rdlock(client)) {      // we might have to request an rdlock
      dout(15) << " will also auth_pin " << *object
	       << " in case we need to request a rdlock" << dendl;
      mustpin.insert(object);
    }
  }


  // AUTH PINS
  map<mds_rank_t, set<MDSCacheObject*> > mustpin_remote;  // mds -> (object set)

  // can i auth pin them all now?
  marker.message = "failed to authpin local pins";
  for (set<MDSCacheObject*>::iterator p = mustpin.begin();
       p != mustpin.end();
       ++p) {
    MDSCacheObject *object = *p;

    dout(10) << " must authpin " << *object << dendl;

    if (mdr->is_auth_pinned(object)) {
      if (object != (MDSCacheObject*)auth_pin_freeze)
	continue;
      if (mdr->more()->is_remote_frozen_authpin) {
	if (mdr->more()->rename_inode == auth_pin_freeze)
	  continue;
	// unfreeze auth pin for the wrong inode
	// (operator[] default-constructs an empty set, ensuring a
	// slave request is sent to that rank below)
	mustpin_remote[mdr->more()->rename_inode->authority().first].size();
      }
    }

    if (!object->is_auth()) {
      if (!mdr->locks.empty())
	drop_locks(mdr.get());
      if (object->is_ambiguous_auth()) {
	// wait
	dout(10) << " ambiguous auth, waiting to authpin " << *object << dendl;
	object->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_MDS_RetryRequest(mdcache, mdr));
	mdr->drop_local_auth_pins();
	return false;
      }
      mustpin_remote[object->authority().first].insert(object);
      continue;
    }
    if (!object->can_auth_pin()) {
      // wait
      drop_locks(mdr.get());
      mdr->drop_local_auth_pins();
      if (auth_pin_nonblock) {
	dout(10) << " can't auth_pin (freezing?) " << *object << ", nonblocking" << dendl;
	mdr->aborted = true;
	return false;
      }
      dout(10) << " can't auth_pin (freezing?), waiting to authpin " << *object << dendl;
      object->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_MDS_RetryRequest(mdcache, mdr));

      if (!mdr->remote_auth_pins.empty())
	notify_freeze_waiter(object);

      return false;
    }
  }

  // ok, grab local auth pins
  for (set<MDSCacheObject*>::iterator p = mustpin.begin();
       p != mustpin.end();
       ++p) {
    MDSCacheObject *object = *p;
    if (mdr->is_auth_pinned(object)) {
      dout(10) << " already auth_pinned " << *object << dendl;
    } else if (object->is_auth()) {
      dout(10) << " auth_pinning " << *object << dendl;
      mdr->auth_pin(object);
    }
  }

  // request remote auth_pins
  if (!mustpin_remote.empty()) {
    marker.message = "requesting remote authpins";
    // merge already-held remote authpins into the per-rank request sets
    for (map<MDSCacheObject*,mds_rank_t>::iterator p = mdr->remote_auth_pins.begin();
	 p != mdr->remote_auth_pins.end();
	 ++p) {
      if (mustpin.count(p->first)) {
	assert(p->second == p->first->authority().first);
	map<mds_rank_t, set<MDSCacheObject*> >::iterator q = mustpin_remote.find(p->second);
	if (q != mustpin_remote.end())
	  q->second.insert(p->first);
      }
    }
    for (map<mds_rank_t, set<MDSCacheObject*> >::iterator p = mustpin_remote.begin();
	 p != mustpin_remote.end();
	 ++p) {
      dout(10) << "requesting remote auth_pins from mds." << p->first << dendl;

      // wait for active auth
      if (mds->is_cluster_degraded() &&
	  !mds->mdsmap->is_clientreplay_or_active_or_stopping(p->first)) {
	dout(10) << " mds." << p->first << " is not active" << dendl;
	if (mdr->more()->waiting_on_slave.empty())
	  mds->wait_for_active_peer(p->first, new C_MDS_RetryRequest(mdcache, mdr));
	return false;
      }

      MMDSSlaveRequest *req = new MMDSSlaveRequest(mdr->reqid, mdr->attempt,
						   MMDSSlaveRequest::OP_AUTHPIN);
      for (set<MDSCacheObject*>::iterator q = p->second.begin();
	   q != p->second.end();
	   ++q) {
	dout(10) << " req remote auth_pin of " << **q << dendl;
	MDSCacheObjectInfo info;
	(*q)->set_object_info(info);
	req->get_authpins().push_back(info);
	if (*q == auth_pin_freeze)
	  (*q)->set_object_info(req->get_authpin_freeze());
	mdr->pin(*q);
      }
      if (auth_pin_nonblock)
	req->mark_nonblock();
      mds->send_message_mds(req, p->first);

      // put in waiting list
      assert(mdr->more()->waiting_on_slave.count(p->first) == 0);
      mdr->more()->waiting_on_slave.insert(p->first);
    }
    return false;
  }

  // caps i'll need to issue
  set<CInode*> issue_set;
  bool result = false;

  // acquire locks.
  // make sure they match currently acquired locks.
  // `existing` walks the locks already held (same sort order as `sorted`);
  // anything held but not wanted — or held in the wrong mode — is dropped.
  set<SimpleLock*, SimpleLock::ptr_lt>::iterator existing = mdr->locks.begin();
  for (set<SimpleLock*, SimpleLock::ptr_lt>::iterator p = sorted.begin();
       p != sorted.end();
       ++p) {
    bool need_wrlock = !!wrlocks.count(*p);
    bool need_remote_wrlock = !!(remote_wrlocks && remote_wrlocks->count(*p));

    // already locked?
    if (existing != mdr->locks.end() && *existing == *p) {
      // right kind?
      SimpleLock *have = *existing;
      ++existing;
      if (xlocks.count(have) && mdr->xlocks.count(have)) {
	dout(10) << " already xlocked " << *have << " " << *have->get_parent() << dendl;
	continue;
      }
      if (mdr->remote_wrlocks.count(have)) {
	if (!need_remote_wrlock ||
	    mdr->remote_wrlocks[have] != (*remote_wrlocks)[have]) {
	  dout(10) << " unlocking remote_wrlock on wrong mds." << mdr->remote_wrlocks[have]
		   << " " << *have << " " << *have->get_parent() << dendl;
	  remote_wrlock_finish(have, mdr->remote_wrlocks[have], mdr.get());
	}
      }
      if (need_wrlock || need_remote_wrlock) {
	if (need_wrlock == !!mdr->wrlocks.count(have) &&
	    need_remote_wrlock == !!mdr->remote_wrlocks.count(have)) {
	  if (need_wrlock)
	    dout(10) << " already wrlocked " << *have << " " << *have->get_parent() << dendl;
	  if (need_remote_wrlock)
	    dout(10) << " already remote_wrlocked " << *have << " " << *have->get_parent() << dendl;
	  continue;
	}
      }
      if (rdlocks.count(have) && mdr->rdlocks.count(have)) {
	dout(10) << " already rdlocked " << *have << " " << *have->get_parent() << dendl;
	continue;
      }
    }

    // hose any stray locks
    if (existing != mdr->locks.end() && *existing == *p) {
      assert(need_wrlock || need_remote_wrlock);
      SimpleLock *lock = *existing;
      if (mdr->wrlocks.count(lock)) {
	if (!need_wrlock)
	  dout(10) << " unlocking extra " << *lock << " " << *lock->get_parent() << dendl;
	else if (need_remote_wrlock) // acquire remote_wrlock first
	  dout(10) << " unlocking out-of-order " << *lock << " " << *lock->get_parent() << dendl;
	bool need_issue = false;
	wrlock_finish(lock, mdr.get(), &need_issue);
	if (need_issue)
	  issue_set.insert(static_cast<CInode*>(lock->get_parent()));
      }
      ++existing;
    }
    while (existing != mdr->locks.end()) {
      SimpleLock *stray = *existing;
      ++existing;
      dout(10) << " unlocking out-of-order " << *stray << " " << *stray->get_parent() << dendl;
      bool need_issue = false;
      if (mdr->xlocks.count(stray)) {
	xlock_finish(stray, mdr.get(), &need_issue);
      } else if (mdr->rdlocks.count(stray)) {
	rdlock_finish(stray, mdr.get(), &need_issue);
      } else {
	// may have acquired both wrlock and remore wrlock
	if (mdr->wrlocks.count(stray))
	  wrlock_finish(stray, mdr.get(), &need_issue);
	if (mdr->remote_wrlocks.count(stray))
	  remote_wrlock_finish(stray, mdr->remote_wrlocks[stray], mdr.get());
      }
      if (need_issue)
	issue_set.insert(static_cast<CInode*>(stray->get_parent()));
    }

    // lock
    if (mdr->locking && *p != mdr->locking) {
      cancel_locking(mdr.get(), &issue_set);
    }
    if (xlocks.count(*p)) {
      marker.message = "failed to xlock, waiting";
      if (!xlock_start(*p, mdr))
	goto out;
      dout(10) << " got xlock on " << **p << " " << *(*p)->get_parent() << dendl;
    } else if (need_wrlock || need_remote_wrlock) {
      if (need_remote_wrlock && !mdr->remote_wrlocks.count(*p)) {
	marker.message = "waiting for remote wrlocks";
	remote_wrlock_start(*p, (*remote_wrlocks)[*p], mdr);
	goto out;
      }
      if (need_wrlock && !mdr->wrlocks.count(*p)) {
	marker.message = "failed to wrlock, waiting";
	if (need_remote_wrlock && !(*p)->can_wrlock(mdr->get_client())) {
	  marker.message = "failed to wrlock, dropping remote wrlock and waiting";
	  // can't take the wrlock because the scatter lock is gathering. need to
	  // release the remote wrlock, so that the gathering process can finish.
	  remote_wrlock_finish(*p, mdr->remote_wrlocks[*p], mdr.get());
	  remote_wrlock_start(*p, (*remote_wrlocks)[*p], mdr);
	  goto out;
	}
	// nowait if we have already gotten remote wrlock
	if (!wrlock_start(*p, mdr, need_remote_wrlock))
	  goto out;
	dout(10) << " got wrlock on " << **p << " " << *(*p)->get_parent() << dendl;
      }
    } else {
      assert(mdr->is_master());
      if ((*p)->needs_recover()) {
	if (mds->is_cluster_degraded()) {
	  if (!mdr->is_queued_for_replay()) {
	    // see comments in SimpleLock::set_state_rejoin() and
	    // ScatterLock::encode_state_for_rejoin()
	    drop_locks(mdr.get());
	    mds->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache, mdr));
	    dout(10) << " rejoin recovering " << **p << " " << *(*p)->get_parent()
		     << ", waiting for cluster recovered" << dendl;
	    marker.message = "rejoin recovering lock, waiting for cluster recovered";
	    return false;
	  }
	} else {
	  (*p)->clear_need_recover();
	}
      }

      marker.message = "failed to rdlock, waiting";
      if (!rdlock_start(*p, mdr))
	goto out;
      dout(10) << " got rdlock on " << **p << " " << *(*p)->get_parent() << dendl;
    }
  }

  // any extra unneeded locks?
  while (existing != mdr->locks.end()) {
    SimpleLock *stray = *existing;
    ++existing;
    dout(10) << " unlocking extra " << *stray << " " << *stray->get_parent() << dendl;
    bool need_issue = false;
    if (mdr->xlocks.count(stray)) {
      xlock_finish(stray, mdr.get(), &need_issue);
    } else if (mdr->rdlocks.count(stray)) {
      rdlock_finish(stray, mdr.get(), &need_issue);
    } else {
      // may have acquired both wrlock and remore wrlock
      if (mdr->wrlocks.count(stray))
	wrlock_finish(stray, mdr.get(), &need_issue);
      if (mdr->remote_wrlocks.count(stray))
	remote_wrlock_finish(stray, mdr->remote_wrlocks[stray], mdr.get());
    }
    if (need_issue)
      issue_set.insert(static_cast<CInode*>(stray->get_parent()));
  }

  mdr->done_locking = true;
  mdr->set_mds_stamp(ceph_clock_now());
  result = true;
  marker.message = "acquired locks";

 out:
  issue_caps_set(issue_set);
  return result;
}
643
224ce89b
WB
644void Locker::notify_freeze_waiter(MDSCacheObject *o)
645{
646 CDir *dir = NULL;
647 if (CInode *in = dynamic_cast<CInode*>(o)) {
648 if (!in->is_root())
649 dir = in->get_parent_dir();
650 } else if (CDentry *dn = dynamic_cast<CDentry*>(o)) {
651 dir = dn->get_dir();
652 } else {
653 dir = dynamic_cast<CDir*>(o);
654 assert(dir);
655 }
656 if (dir) {
657 if (dir->is_freezing_dir())
658 mdcache->fragment_freeze_inc_num_waiters(dir);
659 if (dir->is_freezing_tree()) {
660 while (!dir->is_freezing_tree_root())
661 dir = dir->get_parent_dir();
662 mdcache->migrator->export_freeze_inc_num_waiters(dir);
663 }
664 }
665}
7c673cae
FG
666
667void Locker::set_xlocks_done(MutationImpl *mut, bool skip_dentry)
668{
669 for (set<SimpleLock*>::iterator p = mut->xlocks.begin();
670 p != mut->xlocks.end();
671 ++p) {
672 MDSCacheObject *object = (*p)->get_parent();
673 assert(object->is_auth());
674 if (skip_dentry &&
675 ((*p)->get_type() == CEPH_LOCK_DN || (*p)->get_type() == CEPH_LOCK_DVERSION))
676 continue;
677 dout(10) << "set_xlocks_done on " << **p << " " << *object << dendl;
678 (*p)->set_xlock_done();
679 }
680}
681
682void Locker::_drop_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
683{
684 while (!mut->rdlocks.empty()) {
685 bool ni = false;
686 MDSCacheObject *p = (*mut->rdlocks.begin())->get_parent();
687 rdlock_finish(*mut->rdlocks.begin(), mut, &ni);
688 if (ni)
689 pneed_issue->insert(static_cast<CInode*>(p));
690 }
691}
692
// Release all xlocks, remote wrlocks and local wrlocks held by the
// mutation (rdlocks are left alone).  Remote holders are collected and
// sent a single OP_DROPLOCKS each at the end.
void Locker::_drop_non_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
{
  // ranks that hold remote locks on our behalf
  set<mds_rank_t> slaves;

  while (!mut->xlocks.empty()) {
    SimpleLock *lock = *mut->xlocks.begin();
    MDSCacheObject *p = lock->get_parent();
    if (!p->is_auth()) {
      // remote xlock: drop local bookkeeping only; the slave releases
      // its side when it receives OP_DROPLOCKS below
      assert(lock->get_sm()->can_remote_xlock);
      slaves.insert(p->authority().first);
      lock->put_xlock();
      mut->locks.erase(lock);
      mut->xlocks.erase(lock);
      continue;
    }
    bool ni = false;
    xlock_finish(lock, mut, &ni);
    if (ni)
      pneed_issue->insert(static_cast<CInode*>(p));
  }

  while (!mut->remote_wrlocks.empty()) {
    map<SimpleLock*,mds_rank_t>::iterator p = mut->remote_wrlocks.begin();
    slaves.insert(p->second);
    // keep the lock in mut->locks if we also hold a local wrlock on it;
    // the wrlock loop below will take care of it
    if (mut->wrlocks.count(p->first) == 0)
      mut->locks.erase(p->first);
    mut->remote_wrlocks.erase(p);
  }

  while (!mut->wrlocks.empty()) {
    bool ni = false;
    MDSCacheObject *p = (*mut->wrlocks.begin())->get_parent();
    wrlock_finish(*mut->wrlocks.begin(), mut, &ni);
    if (ni)
      pneed_issue->insert(static_cast<CInode*>(p));
  }

  // tell each involved slave to drop its locks, unless it has not yet
  // reached rejoin (it will resolve lock state during recovery)
  for (set<mds_rank_t>::iterator p = slaves.begin(); p != slaves.end(); ++p) {
    if (!mds->is_cluster_degraded() ||
	mds->mdsmap->get_state(*p) >= MDSMap::STATE_REJOIN) {
      dout(10) << "_drop_non_rdlocks dropping remote locks on mds." << *p << dendl;
      MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt,
							MMDSSlaveRequest::OP_DROPLOCKS);
      mds->send_message_mds(slavereq, *p);
    }
  }
}
740
741void Locker::cancel_locking(MutationImpl *mut, set<CInode*> *pneed_issue)
742{
743 SimpleLock *lock = mut->locking;
744 assert(lock);
745 dout(10) << "cancel_locking " << *lock << " on " << *mut << dendl;
746
747 if (lock->get_parent()->is_auth()) {
748 bool need_issue = false;
749 if (lock->get_state() == LOCK_PREXLOCK) {
750 _finish_xlock(lock, -1, &need_issue);
751 } else if (lock->get_state() == LOCK_LOCK_XLOCK &&
752 lock->get_num_xlocks() == 0) {
753 lock->set_state(LOCK_XLOCKDONE);
754 eval_gather(lock, true, &need_issue);
755 }
756 if (need_issue)
757 pneed_issue->insert(static_cast<CInode *>(lock->get_parent()));
758 }
759 mut->finish_locking(lock);
760}
761
762void Locker::drop_locks(MutationImpl *mut, set<CInode*> *pneed_issue)
763{
764 // leftover locks
765 set<CInode*> my_need_issue;
766 if (!pneed_issue)
767 pneed_issue = &my_need_issue;
768
769 if (mut->locking)
770 cancel_locking(mut, pneed_issue);
771 _drop_non_rdlocks(mut, pneed_issue);
772 _drop_rdlocks(mut, pneed_issue);
773
774 if (pneed_issue == &my_need_issue)
775 issue_caps_set(*pneed_issue);
776 mut->done_locking = false;
777}
778
779void Locker::drop_non_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
780{
781 set<CInode*> my_need_issue;
782 if (!pneed_issue)
783 pneed_issue = &my_need_issue;
784
785 _drop_non_rdlocks(mut, pneed_issue);
786
787 if (pneed_issue == &my_need_issue)
788 issue_caps_set(*pneed_issue);
789}
790
b32b8144 791void Locker::drop_rdlocks_for_early_reply(MutationImpl *mut)
7c673cae 792{
b32b8144 793 set<CInode*> need_issue;
7c673cae 794
b32b8144
FG
795 for (auto p = mut->rdlocks.begin(); p != mut->rdlocks.end(); ) {
796 SimpleLock *lock = *p;
797 ++p;
798 // make later mksnap/setlayout (at other mds) wait for this unsafe request
799 if (lock->get_type() == CEPH_LOCK_ISNAP ||
800 lock->get_type() == CEPH_LOCK_IPOLICY)
801 continue;
802 bool ni = false;
803 rdlock_finish(lock, mut, &ni);
804 if (ni)
805 need_issue.insert(static_cast<CInode*>(lock->get_parent()));
806 }
7c673cae 807
b32b8144 808 issue_caps_set(need_issue);
7c673cae
FG
809}
810
7c673cae
FG
811// generics
812
// Drive a non-stable lock toward its next state.  Once every condition
// blocking the transition has cleared (no gathering, no incompatible
// rd/wr/xlocks or leases, no conflicting issued caps, no pending scatter
// flush), move the lock to get_next_state(), notify the auth (if we are
// a replica) or the replicas (if we are auth), wake waiters, and
// re-evaluate.  If `first` is set, caps made revocable by the pending
// transition are flagged for (re)issue.  Completion callbacks go into
// *pfinishers when provided, otherwise they are run via finish_waiters.
void Locker::eval_gather(SimpleLock *lock, bool first, bool *pneed_issue, list<MDSInternalContextBase*> *pfinishers)
{
  dout(10) << "eval_gather " << *lock << " on " << *lock->get_parent() << dendl;
  assert(!lock->is_stable());

  int next = lock->get_next_state();

  CInode *in = 0;
  bool caps = lock->get_cap_shift();
  if (lock->get_type() != CEPH_LOCK_DN)
    in = static_cast<CInode *>(lock->get_parent());

  bool need_issue = false;

  int loner_issued = 0, other_issued = 0, xlocker_issued = 0;
  assert(!caps || in != NULL);
  if (caps && in->is_head()) {
    in->get_caps_issued(&loner_issued, &other_issued, &xlocker_issued,
			lock->get_cap_shift(), lock->get_cap_mask());
    dout(10) << " next state is " << lock->get_state_name(next)
	     << " issued/allows loner " << gcap_string(loner_issued)
	     << "/" << gcap_string(lock->gcaps_allowed(CAP_LONER, next))
	     << " xlocker " << gcap_string(xlocker_issued)
	     << "/" << gcap_string(lock->gcaps_allowed(CAP_XLOCKER, next))
	     << " other " << gcap_string(other_issued)
	     << "/" << gcap_string(lock->gcaps_allowed(CAP_ANY, next))
	     << dendl;

    // caps are issued that the next state won't allow: plan a re-issue
    if (first && ((~lock->gcaps_allowed(CAP_ANY, next) & other_issued) ||
		  (~lock->gcaps_allowed(CAP_LONER, next) & loner_issued) ||
		  (~lock->gcaps_allowed(CAP_XLOCKER, next) & xlocker_issued)))
      need_issue = true;
  }

#define IS_TRUE_AND_LT_AUTH(x, auth) (x && ((auth && x <= AUTH) || (!auth && x < AUTH)))
  bool auth = lock->get_parent()->is_auth();
  if (!lock->is_gathering() &&
      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_rdlock, auth) || !lock->is_rdlocked()) &&
      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_wrlock, auth) || !lock->is_wrlocked()) &&
      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_xlock, auth) || !lock->is_xlocked()) &&
      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_lease, auth) || !lock->is_leased()) &&
      !(lock->get_parent()->is_auth() && lock->is_flushing()) &&  // i.e. wait for scatter_writebehind!
      (!caps || ((~lock->gcaps_allowed(CAP_ANY, next) & other_issued) == 0 &&
		 (~lock->gcaps_allowed(CAP_LONER, next) & loner_issued) == 0 &&
		 (~lock->gcaps_allowed(CAP_XLOCKER, next) & xlocker_issued) == 0)) &&
      lock->get_state() != LOCK_SYNC_MIX2 &&  // these states need an explicit trigger from the auth mds
      lock->get_state() != LOCK_MIX_SYNC2
      ) {
    dout(7) << "eval_gather finished gather on " << *lock
	    << " on " << *lock->get_parent() << dendl;

    if (lock->get_sm() == &sm_filelock) {
      assert(in);
      if (in->state_test(CInode::STATE_RECOVERING)) {
	dout(7) << "eval_gather finished gather, but still recovering" << dendl;
	return;
      } else if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
	dout(7) << "eval_gather finished gather, but need to recover" << dendl;
	mds->mdcache->queue_file_recover(in);
	mds->mdcache->do_file_recover();
	return;
      }
    }

    if (!lock->get_parent()->is_auth()) {
      // replica: tell auth
      mds_rank_t auth = lock->get_parent()->authority().first;

      if (lock->get_parent()->is_rejoining() &&
	  mds->mdsmap->get_state(auth) == MDSMap::STATE_REJOIN) {
	dout(7) << "eval_gather finished gather, but still rejoining "
		<< *lock->get_parent() << dendl;
	return;
      }

      if (!mds->is_cluster_degraded() ||
	  mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) {
	switch (lock->get_state()) {
	case LOCK_SYNC_LOCK:
	  mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid()),
				auth);
	  break;

	case LOCK_MIX_SYNC:
	  {
	    MLock *reply = new MLock(lock, LOCK_AC_SYNCACK, mds->get_nodeid());
	    lock->encode_locked_state(reply->get_data());
	    mds->send_message_mds(reply, auth);
	    next = LOCK_MIX_SYNC2;
	    (static_cast<ScatterLock *>(lock))->start_flush();
	  }
	  break;

	case LOCK_MIX_SYNC2:
	  (static_cast<ScatterLock *>(lock))->finish_flush();
	  (static_cast<ScatterLock *>(lock))->clear_flushed();
	  // deliberate fall-through: like SYNC_MIX2, the ack was
	  // already sent, so nothing more to do here

	case LOCK_SYNC_MIX2:
	  // do nothing, we already acked
	  break;

	case LOCK_SYNC_MIX:
	  {
	    MLock *reply = new MLock(lock, LOCK_AC_MIXACK, mds->get_nodeid());
	    mds->send_message_mds(reply, auth);
	    next = LOCK_SYNC_MIX2;
	  }
	  break;

	case LOCK_MIX_LOCK:
	  {
	    bufferlist data;
	    lock->encode_locked_state(data);
	    mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid(), data), auth);
	    (static_cast<ScatterLock *>(lock))->start_flush();
	    // we'll get an AC_LOCKFLUSHED to complete
	  }
	  break;

	default:
	  ceph_abort();
	}
      }
    } else {
      // auth

      // once the first (local) stage of mix->lock gather complete we can
      // gather from replicas
      if (lock->get_state() == LOCK_MIX_LOCK &&
	  lock->get_parent()->is_replicated()) {
	dout(10) << " finished (local) gather for mix->lock, now gathering from replicas" << dendl;
	send_lock_message(lock, LOCK_AC_LOCK);
	lock->init_gather();
	lock->set_state(LOCK_MIX_LOCK2);
	return;
      }

      if (lock->is_dirty() && !lock->is_flushed()) {
	// write dirty scatter data back to the journal before moving on;
	// the flush completion will re-run eval
	scatter_writebehind(static_cast<ScatterLock *>(lock));
	mds->mdlog->flush();
	return;
      }
      lock->clear_flushed();

      switch (lock->get_state()) {
	// to mixed
      case LOCK_TSYN_MIX:
      case LOCK_SYNC_MIX:
      case LOCK_EXCL_MIX:
      case LOCK_XSYN_MIX:
	in->start_scatter(static_cast<ScatterLock *>(lock));
	if (lock->get_parent()->is_replicated()) {
	  bufferlist softdata;
	  lock->encode_locked_state(softdata);
	  send_lock_message(lock, LOCK_AC_MIX, softdata);
	}
	(static_cast<ScatterLock *>(lock))->clear_scatter_wanted();
	break;

      case LOCK_XLOCK:
      case LOCK_XLOCKDONE:
	if (next != LOCK_SYNC)
	  break;
	// fall-thru

	// to sync
      case LOCK_EXCL_SYNC:
      case LOCK_LOCK_SYNC:
      case LOCK_MIX_SYNC:
      case LOCK_XSYN_SYNC:
	if (lock->get_parent()->is_replicated()) {
	  bufferlist softdata;
	  lock->encode_locked_state(softdata);
	  send_lock_message(lock, LOCK_AC_SYNC, softdata);
	}
	break;
      }

    }

    lock->set_state(next);

    if (lock->get_parent()->is_auth() &&
	lock->is_stable())
      lock->get_parent()->auth_unpin(lock);

    // drop loner before doing waiters
    if (caps &&
	in->is_head() &&
	in->is_auth() &&
	in->get_wanted_loner() != in->get_loner()) {
      dout(10) << "  trying to drop loner" << dendl;
      if (in->try_drop_loner()) {
	dout(10) << "  dropped loner" << dendl;
	need_issue = true;
      }
    }

    if (pfinishers)
      lock->take_waiting(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD|SimpleLock::WAIT_XLOCK,
			 *pfinishers);
    else
      lock->finish_waiters(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD|SimpleLock::WAIT_XLOCK);

    if (caps && in->is_head())
      need_issue = true;

    if (lock->get_parent()->is_auth() &&
	lock->is_stable())
      try_eval(lock, &need_issue);
  }

  if (need_issue) {
    if (pneed_issue)
      *pneed_issue = true;
    else if (in->is_head())
      issue_caps(in);
  }

}
1033
// Evaluate an inode's lock set (selected by `mask`), managing the loner
// client along the way.
//   in            - the inode whose locks are evaluated
//   mask          - CEPH_LOCK_I* bits selecting which locks to run eval_any on
//   caps_imported - if true, start with need_issue=true so caps get issued
// Returns true if caps were issued (or flagged for issue).
bool Locker::eval(CInode *in, int mask, bool caps_imported)
{
  bool need_issue = caps_imported;
  list<MDSInternalContextBase*> finishers;

  dout(10) << "eval " << mask << " " << *in << dendl;

  // choose loner?  (only meaningful on the auth head inode)
  if (in->is_auth() && in->is_head()) {
    client_t orig_loner = in->get_loner();
    if (in->choose_ideal_loner()) {
      dout(10) << "eval set loner: client." << orig_loner << " -> client." << in->get_loner() << dendl;
      need_issue = true;
      mask = -1;   // loner changed: re-evaluate every lock
    } else if (in->get_wanted_loner() != in->get_loner()) {
      dout(10) << "eval want loner: client." << in->get_wanted_loner() << " but failed to set it" << dendl;
      mask = -1;   // couldn't set wanted loner: re-evaluate every lock
    }
  }

 retry:
  if (mask & CEPH_LOCK_IFILE)
    eval_any(&in->filelock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IAUTH)
    eval_any(&in->authlock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_ILINK)
    eval_any(&in->linklock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IXATTR)
    eval_any(&in->xattrlock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_INEST)
    eval_any(&in->nestlock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IFLOCK)
    eval_any(&in->flocklock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IPOLICY)
    eval_any(&in->policylock, &need_issue, &finishers, caps_imported);

  // drop loner?  (the lock evals above may have made that possible)
  if (in->is_auth() && in->is_head() && in->get_wanted_loner() != in->get_loner()) {
    if (in->try_drop_loner()) {
      need_issue = true;
      if (in->get_wanted_loner() >= 0) {
	dout(10) << "eval end set loner to client." << in->get_loner() << dendl;
	bool ok = in->try_set_loner();
	assert(ok);
	// new loner in place: run all lock evals again from the top
	mask = -1;
	goto retry;
      }
    }
  }

  finish_contexts(g_ceph_context, finishers);

  if (need_issue && in->is_head())
    issue_caps(in);

  dout(10) << "eval done" << dendl;
  return need_issue;
}
1092
1093class C_Locker_Eval : public LockerContext {
1094 MDSCacheObject *p;
1095 int mask;
1096public:
1097 C_Locker_Eval(Locker *l, MDSCacheObject *pp, int m) : LockerContext(l), p(pp), mask(m) {
1098 // We are used as an MDSCacheObject waiter, so should
1099 // only be invoked by someone already holding the big lock.
1100 assert(locker->mds->mds_lock.is_locked_by_me());
1101 p->get(MDSCacheObject::PIN_PTRWAITER);
1102 }
1103 void finish(int r) override {
1104 locker->try_eval(p, mask);
1105 p->put(MDSCacheObject::PIN_PTRWAITER);
1106 }
1107};
1108
1109void Locker::try_eval(MDSCacheObject *p, int mask)
1110{
1111 // unstable and ambiguous auth?
1112 if (p->is_ambiguous_auth()) {
1113 dout(7) << "try_eval ambiguous auth, waiting on " << *p << dendl;
1114 p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_Eval(this, p, mask));
1115 return;
1116 }
1117
1118 if (p->is_auth() && p->is_frozen()) {
1119 dout(7) << "try_eval frozen, waiting on " << *p << dendl;
1120 p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, mask));
1121 return;
1122 }
1123
1124 if (mask & CEPH_LOCK_DN) {
1125 assert(mask == CEPH_LOCK_DN);
1126 bool need_issue = false; // ignore this, no caps on dentries
1127 CDentry *dn = static_cast<CDentry *>(p);
1128 eval_any(&dn->lock, &need_issue);
1129 } else {
1130 CInode *in = static_cast<CInode *>(p);
1131 eval(in, mask);
1132 }
1133}
1134
// Evaluate a single lock, deferring while its parent object is in a state
// (ambiguous auth, frozen, freezing) where a state change would be unsafe.
// Honors pending scatter/unscatter requests before the freezing check (see
// the deadlock note below).
void Locker::try_eval(SimpleLock *lock, bool *pneed_issue)
{
  MDSCacheObject *p = lock->get_parent();

  // unstable and ambiguous auth?
  if (p->is_ambiguous_auth()) {
    dout(7) << "try_eval " << *lock << " ambiguousauth, waiting on " << *p << dendl;
    p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_Eval(this, p, lock->get_type()));
    return;
  }

  // only the auth evaluates; replicas bail out here
  if (!p->is_auth()) {
    dout(7) << "try_eval " << *lock << " not auth for " << *p << dendl;
    return;
  }

  if (p->is_frozen()) {
    dout(7) << "try_eval " << *lock << " frozen, waiting on " << *p << dendl;
    p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, lock->get_type()));
    return;
  }

  /*
   * We could have a situation like:
   *
   * - mds A authpins item on mds B
   * - mds B starts to freeze tree containing item
   * - mds A tries wrlock_start on A, sends REQSCATTER to B
   * - mds B lock is unstable, sets scatter_wanted
   * - mds B lock stabilizes, calls try_eval.
   *
   * We can defer while freezing without causing a deadlock.  Honor
   * scatter_wanted flag here.  This will never get deferred by the
   * checks above due to the auth_pin held by the master.
   */
  if (lock->is_scatterlock()) {
    ScatterLock *slock = static_cast<ScatterLock *>(lock);
    if (slock->get_scatter_wanted() &&
	slock->get_state() != LOCK_MIX) {
      // a scatter (-> MIX) was requested; start it now
      scatter_mix(slock, pneed_issue);
      if (!lock->is_stable())
	return;
    } else if (slock->get_unscatter_wanted() &&
        slock->get_state() != LOCK_LOCK) {
      // an unscatter (-> LOCK) was requested; start it now
      simple_lock(slock, pneed_issue);
      if (!lock->is_stable()) {
        return;
      }
    }
  }

  // freezing (but not yet frozen) also defers non-dentry locks
  if (lock->get_type() != CEPH_LOCK_DN && p->is_freezing()) {
    dout(7) << "try_eval " << *lock << " freezing, waiting on " << *p << dendl;
    p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, lock->get_type()));
    return;
  }

  eval(lock, pneed_issue);
}
1194
1195void Locker::eval_cap_gather(CInode *in, set<CInode*> *issue_set)
1196{
1197 bool need_issue = false;
1198 list<MDSInternalContextBase*> finishers;
1199
1200 // kick locks now
1201 if (!in->filelock.is_stable())
1202 eval_gather(&in->filelock, false, &need_issue, &finishers);
1203 if (!in->authlock.is_stable())
1204 eval_gather(&in->authlock, false, &need_issue, &finishers);
1205 if (!in->linklock.is_stable())
1206 eval_gather(&in->linklock, false, &need_issue, &finishers);
1207 if (!in->xattrlock.is_stable())
1208 eval_gather(&in->xattrlock, false, &need_issue, &finishers);
1209
1210 if (need_issue && in->is_head()) {
1211 if (issue_set)
1212 issue_set->insert(in);
1213 else
1214 issue_caps(in);
1215 }
1216
1217 finish_contexts(g_ceph_context, finishers);
1218}
1219
1220void Locker::eval_scatter_gathers(CInode *in)
1221{
1222 bool need_issue = false;
1223 list<MDSInternalContextBase*> finishers;
1224
1225 dout(10) << "eval_scatter_gathers " << *in << dendl;
1226
1227 // kick locks now
1228 if (!in->filelock.is_stable())
1229 eval_gather(&in->filelock, false, &need_issue, &finishers);
1230 if (!in->nestlock.is_stable())
1231 eval_gather(&in->nestlock, false, &need_issue, &finishers);
1232 if (!in->dirfragtreelock.is_stable())
1233 eval_gather(&in->dirfragtreelock, false, &need_issue, &finishers);
1234
1235 if (need_issue && in->is_head())
1236 issue_caps(in);
1237
1238 finish_contexts(g_ceph_context, finishers);
1239}
1240
1241void Locker::eval(SimpleLock *lock, bool *need_issue)
1242{
1243 switch (lock->get_type()) {
1244 case CEPH_LOCK_IFILE:
1245 return file_eval(static_cast<ScatterLock*>(lock), need_issue);
1246 case CEPH_LOCK_IDFT:
1247 case CEPH_LOCK_INEST:
1248 return scatter_eval(static_cast<ScatterLock*>(lock), need_issue);
1249 default:
1250 return simple_eval(lock, need_issue);
1251 }
1252}
1253
1254
1255// ------------------
1256// rdlock
1257
// Try to push a lock toward a rdlock-able state.
//   as_anon - caller wants SYNC specifically (not XSYN), e.g. for an
//             anonymous/snapped read
// Returns true if a local state change was initiated; false if we could only
// ask the auth (or do nothing).
bool Locker::_rdlock_kick(SimpleLock *lock, bool as_anon)
{
  // kick the lock
  if (lock->is_stable()) {
    if (lock->get_parent()->is_auth()) {
      if (lock->get_sm() == &sm_scatterlock) {
	// not until tempsync is fully implemented
	//if (lock->get_parent()->is_replicated())
	//scatter_tempsync((ScatterLock*)lock);
	//else
	simple_sync(lock);
      } else if (lock->get_sm() == &sm_filelock) {
	CInode *in = static_cast<CInode*>(lock->get_parent());
	// prefer XSYN over SYNC for an EXCL file lock with a loner,
	// unless the caller explicitly wants SYNC (as_anon)
	if (lock->get_state() == LOCK_EXCL &&
	    in->get_target_loner() >= 0 &&
	    !in->is_dir() && !as_anon)   // as_anon => caller wants SYNC, not XSYN
	  file_xsyn(lock);
	else
	  simple_sync(lock);
      } else
	simple_sync(lock);
      return true;
    } else {
      // request rdlock state change from auth
      mds_rank_t auth = lock->get_parent()->authority().first;
      if (!mds->is_cluster_degraded() ||
	  mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
	dout(10) << "requesting rdlock from auth on "
		 << *lock << " on " << *lock->get_parent() << dendl;
	mds->send_message_mds(new MLock(lock, LOCK_AC_REQRDLOCK, mds->get_nodeid()), auth);
      }
      return false;
    }
  }
  // unstable: if this is a file lock on a recovering inode, bump the inode
  // up the recovery queue so the lock can stabilize sooner
  if (lock->get_type() == CEPH_LOCK_IFILE) {
    CInode *in = static_cast<CInode *>(lock->get_parent());
    if (in->state_test(CInode::STATE_RECOVERING)) {
      mds->mdcache->recovery_queue.prioritize(in);
    }
  }

  return false;
}
1301
1302bool Locker::rdlock_try(SimpleLock *lock, client_t client, MDSInternalContextBase *con)
1303{
1304 dout(7) << "rdlock_try on " << *lock << " on " << *lock->get_parent() << dendl;
1305
1306 // can read? grab ref.
1307 if (lock->can_rdlock(client))
1308 return true;
1309
1310 _rdlock_kick(lock, false);
1311
1312 if (lock->can_rdlock(client))
1313 return true;
1314
1315 // wait!
1316 if (con) {
1317 dout(7) << "rdlock_try waiting on " << *lock << " on " << *lock->get_parent() << dendl;
1318 lock->add_waiter(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_RD, con);
1319 }
1320 return false;
1321}
1322
// Acquire a rdlock on behalf of a request, kicking the lock (and, for
// snapped inodes, the head inode's lock) as needed.  On failure, queues the
// request as a waiter and returns false.
//   as_anon - take the rdlock anonymously (client -1), so the client's own
//             xlock does not grant it the read
bool Locker::rdlock_start(SimpleLock *lock, MDRequestRef& mut, bool as_anon)
{
  dout(7) << "rdlock_start on " << *lock << " on " << *lock->get_parent() << dendl;

  // client may be allowed to rdlock the same item it has xlocked.
  // UNLESS someone passes in as_anon, or we're reading snapped version here.
  if (mut->snapid != CEPH_NOSNAP)
    as_anon = true;
  client_t client = as_anon ? -1 : mut->get_client();

  CInode *in = 0;
  if (lock->get_type() != CEPH_LOCK_DN)
    in = static_cast<CInode *>(lock->get_parent());

  /*
  if (!lock->get_parent()->is_auth() &&
      lock->fw_rdlock_to_auth()) {
    mdcache->request_forward(mut, lock->get_parent()->authority().first);
    return false;
  }
  */

  while (1) {
    // can read?  grab ref.
    if (lock->can_rdlock(client)) {
      lock->get_rdlock();
      mut->rdlocks.insert(lock);
      mut->locks.insert(lock);
      return true;
    }

    // hmm, wait a second.
    // snapped (non-head) inode whose lock is waiting on the head: we must
    // kick the head inode's lock to get ourselves synced up.
    if (in && !in->is_head() && in->is_auth() &&
	lock->get_state() == LOCK_SNAP_SYNC) {
      // okay, we actually need to kick the head's lock to get ourselves synced up.
      CInode *head = mdcache->get_inode(in->ino());
      assert(head);
      SimpleLock *hlock = head->get_lock(CEPH_LOCK_IFILE);
      if (hlock->get_state() == LOCK_SYNC)
	hlock = head->get_lock(lock->get_type());

      if (hlock->get_state() != LOCK_SYNC) {
	dout(10) << "rdlock_start trying head inode " << *head << dendl;
	if (!rdlock_start(hlock, mut, true)) // ** as_anon, no rdlock on EXCL **
	  return false;
	// oh, check our lock again then
      }
    }

    // stop looping once kicking no longer makes local progress
    if (!_rdlock_kick(lock, as_anon))
      break;
  }

  // wait!
  int wait_on;
  if (lock->get_parent()->is_auth() && lock->is_stable())
    wait_on = SimpleLock::WAIT_RD;
  else
    wait_on = SimpleLock::WAIT_STABLE;  // REQRDLOCK is ignored if lock is unstable, so we need to retry.
  dout(7) << "rdlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl;
  lock->add_waiter(wait_on, new C_MDS_RetryRequest(mdcache, mut));
  nudge_log(lock);
  return false;
}
1387
1388void Locker::nudge_log(SimpleLock *lock)
1389{
1390 dout(10) << "nudge_log " << *lock << " on " << *lock->get_parent() << dendl;
1391 if (lock->get_parent()->is_auth() && lock->is_unstable_and_locked()) // as with xlockdone, or cap flush
1392 mds->mdlog->flush();
1393}
1394
1395void Locker::rdlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue)
1396{
1397 // drop ref
1398 lock->put_rdlock();
1399 if (mut) {
1400 mut->rdlocks.erase(lock);
1401 mut->locks.erase(lock);
1402 }
1403
1404 dout(7) << "rdlock_finish on " << *lock << " on " << *lock->get_parent() << dendl;
1405
1406 // last one?
1407 if (!lock->is_rdlocked()) {
1408 if (!lock->is_stable())
1409 eval_gather(lock, false, pneed_issue);
1410 else if (lock->get_parent()->is_auth())
1411 try_eval(lock, pneed_issue);
1412 }
1413}
1414
1415
1416bool Locker::can_rdlock_set(set<SimpleLock*>& locks)
1417{
1418 dout(10) << "can_rdlock_set " << locks << dendl;
1419 for (set<SimpleLock*>::iterator p = locks.begin(); p != locks.end(); ++p)
1420 if (!(*p)->can_rdlock(-1)) {
1421 dout(10) << "can_rdlock_set can't rdlock " << *p << " on " << *(*p)->get_parent() << dendl;
1422 return false;
1423 }
1424 return true;
1425}
1426
1427bool Locker::rdlock_try_set(set<SimpleLock*>& locks)
1428{
1429 dout(10) << "rdlock_try_set " << locks << dendl;
1430 for (set<SimpleLock*>::iterator p = locks.begin(); p != locks.end(); ++p)
1431 if (!rdlock_try(*p, -1, NULL)) {
1432 dout(10) << "rdlock_try_set can't rdlock " << *p << " on " << *(*p)->get_parent() << dendl;
1433 return false;
1434 }
1435 return true;
1436}
1437
1438void Locker::rdlock_take_set(set<SimpleLock*>& locks, MutationRef& mut)
1439{
1440 dout(10) << "rdlock_take_set " << locks << dendl;
1441 for (set<SimpleLock*>::iterator p = locks.begin(); p != locks.end(); ++p) {
1442 (*p)->get_rdlock();
1443 mut->rdlocks.insert(*p);
1444 mut->locks.insert(*p);
1445 }
1446}
1447
1448// ------------------
1449// wrlock
1450
1451void Locker::wrlock_force(SimpleLock *lock, MutationRef& mut)
1452{
1453 if (lock->get_type() == CEPH_LOCK_IVERSION ||
1454 lock->get_type() == CEPH_LOCK_DVERSION)
1455 return local_wrlock_grab(static_cast<LocalLock*>(lock), mut);
1456
1457 dout(7) << "wrlock_force on " << *lock
1458 << " on " << *lock->get_parent() << dendl;
1459 lock->get_wrlock(true);
1460 mut->wrlocks.insert(lock);
1461 mut->locks.insert(lock);
1462}
1463
// Acquire a wrlock for a request.  On the auth, may initiate a state change
// (scatter_mix or simple_lock); on a replica, may ask the auth to scatter.
//   nowait - caller cannot block or open a new log entry; never queues a
//            waiter and refuses nested state changes on dirty scatter data
// Returns true once the wrlock is held; false otherwise.
bool Locker::wrlock_start(SimpleLock *lock, MDRequestRef& mut, bool nowait)
{
  if (lock->get_type() == CEPH_LOCK_IVERSION ||
      lock->get_type() == CEPH_LOCK_DVERSION)
    return local_wrlock_start(static_cast<LocalLock*>(lock), mut);

  dout(10) << "wrlock_start " << *lock << " on " << *lock->get_parent() << dendl;

  CInode *in = static_cast<CInode *>(lock->get_parent());
  client_t client = mut->get_client();
  // prefer the MIX (scattered) state when this dir has subtree/exporting
  // dirfrags or a scatter has been requested
  bool want_scatter = !nowait && lock->get_parent()->is_auth() &&
		      (in->has_subtree_or_exporting_dirfrag() ||
		       static_cast<ScatterLock*>(lock)->get_scatter_wanted());

  while (1) {
    // wrlock?
    if (lock->can_wrlock(client) &&
	(!want_scatter || lock->get_state() == LOCK_MIX)) {
      lock->get_wrlock();
      mut->wrlocks.insert(lock);
      mut->locks.insert(lock);
      return true;
    }

    // recovering file lock: bump the inode up the recovery queue
    if (lock->get_type() == CEPH_LOCK_IFILE &&
	in->state_test(CInode::STATE_RECOVERING)) {
      mds->mdcache->recovery_queue.prioritize(in);
    }

    if (!lock->is_stable())
      break;

    if (in->is_auth()) {
      // don't do nested lock state change if we have dirty scatterdata and
      // may scatter_writebehind or start_scatter, because nowait==true implies
      // that the caller already has a log entry open!
      if (nowait && lock->is_dirty())
	return false;

      if (want_scatter)
	scatter_mix(static_cast<ScatterLock*>(lock));
      else
	simple_lock(lock);

      if (nowait && !lock->can_wrlock(client))
	return false;

    } else {
      // replica.
      // auth should be auth_pinned (see acquire_locks wrlock weird mustpin case).
      mds_rank_t auth = lock->get_parent()->authority().first;
      if (!mds->is_cluster_degraded() ||
	  mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
	dout(10) << "requesting scatter from auth on "
		 << *lock << " on " << *lock->get_parent() << dendl;
	mds->send_message_mds(new MLock(lock, LOCK_AC_REQSCATTER, mds->get_nodeid()), auth);
      }
      break;
    }
  }

  // queue a waiter unless the caller asked for a non-blocking attempt
  if (!nowait) {
    dout(7) << "wrlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl;
    lock->add_waiter(SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
    nudge_log(lock);
  }

  return false;
}
1533
1534void Locker::wrlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue)
1535{
1536 if (lock->get_type() == CEPH_LOCK_IVERSION ||
1537 lock->get_type() == CEPH_LOCK_DVERSION)
1538 return local_wrlock_finish(static_cast<LocalLock*>(lock), mut);
1539
1540 dout(7) << "wrlock_finish on " << *lock << " on " << *lock->get_parent() << dendl;
1541 lock->put_wrlock();
1542 if (mut) {
1543 mut->wrlocks.erase(lock);
1544 if (mut->remote_wrlocks.count(lock) == 0)
1545 mut->locks.erase(lock);
1546 }
1547
1548 if (!lock->is_wrlocked()) {
1549 if (!lock->is_stable())
1550 eval_gather(lock, false, pneed_issue);
1551 else if (lock->get_parent()->is_auth())
1552 try_eval(lock, pneed_issue);
1553 }
1554}
1555
1556
1557// remote wrlock
1558
// Ask mds `target` to take a wrlock on our behalf (slave OP_WRLOCK).  If the
// target is not yet usable, the request is re-queued to wait for it; the
// reply is awaited via the mutation's waiting_on_slave set.
void Locker::remote_wrlock_start(SimpleLock *lock, mds_rank_t target, MDRequestRef& mut)
{
  dout(7) << "remote_wrlock_start mds." << target << " on " << *lock << " on " << *lock->get_parent() << dendl;

  // wait for active target
  if (mds->is_cluster_degraded() &&
      !mds->mdsmap->is_clientreplay_or_active_or_stopping(target)) {
    dout(7) << " mds." << target << " is not active" << dendl;
    // only register a retry if we are not already waiting on some slave
    if (mut->more()->waiting_on_slave.empty())
      mds->wait_for_active_peer(target, new C_MDS_RetryRequest(mdcache, mut));
    return;
  }

  // send lock request
  mut->start_locking(lock, target);
  mut->more()->slaves.insert(target);
  MMDSSlaveRequest *r = new MMDSSlaveRequest(mut->reqid, mut->attempt,
					     MMDSSlaveRequest::OP_WRLOCK);
  r->set_lock_type(lock->get_type());
  lock->get_parent()->set_object_info(r->get_object_info());
  mds->send_message_mds(r, target);

  // record that we are waiting for this slave's ack
  assert(mut->more()->waiting_on_slave.count(target) == 0);
  mut->more()->waiting_on_slave.insert(target);
}
1584
1585void Locker::remote_wrlock_finish(SimpleLock *lock, mds_rank_t target,
1586 MutationImpl *mut)
1587{
1588 // drop ref
1589 mut->remote_wrlocks.erase(lock);
1590 if (mut->wrlocks.count(lock) == 0)
1591 mut->locks.erase(lock);
1592
1593 dout(7) << "remote_wrlock_finish releasing remote wrlock on mds." << target
1594 << " " << *lock->get_parent() << dendl;
1595 if (!mds->is_cluster_degraded() ||
1596 mds->mdsmap->get_state(target) >= MDSMap::STATE_REJOIN) {
1597 MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt,
1598 MMDSSlaveRequest::OP_UNWRLOCK);
1599 slavereq->set_lock_type(lock->get_type());
1600 lock->get_parent()->set_object_info(slavereq->get_object_info());
1601 mds->send_message_mds(slavereq, target);
1602 }
1603}
1604
1605
1606// ------------------
1607// xlock
1608
// Acquire an xlock for a request.  On the auth, loops driving the lock into
// an xlockable state; on a replica, forwards an OP_XLOCK slave request to
// the auth.  Returns true once the xlock is held locally.
bool Locker::xlock_start(SimpleLock *lock, MDRequestRef& mut)
{
  if (lock->get_type() == CEPH_LOCK_IVERSION ||
      lock->get_type() == CEPH_LOCK_DVERSION)
    return local_xlock_start(static_cast<LocalLock*>(lock), mut);

  dout(7) << "xlock_start on " << *lock << " on " << *lock->get_parent() << dendl;
  client_t client = mut->get_client();

  // auth?
  if (lock->get_parent()->is_auth()) {
    // auth
    while (1) {
      if (lock->can_xlock(client)) {
	lock->set_state(LOCK_XLOCK);
	lock->get_xlock(mut, client);
	mut->xlocks.insert(lock);
	mut->locks.insert(lock);
	mut->finish_locking(lock);
	return true;
      }

      // recovering file lock: bump the inode up the recovery queue
      if (lock->get_type() == CEPH_LOCK_IFILE) {
	CInode *in = static_cast<CInode*>(lock->get_parent());
	if (in->state_test(CInode::STATE_RECOVERING)) {
	  mds->mdcache->recovery_queue.prioritize(in);
	}
      }

      // give up and wait unless the lock is stable, or it is XLOCKDONE held
      // by this same client with no one waiting for stability
      if (!lock->is_stable() && (lock->get_state() != LOCK_XLOCKDONE ||
				 lock->get_xlock_by_client() != client ||
				 lock->is_waiter_for(SimpleLock::WAIT_STABLE)))
	break;

      if (lock->get_state() == LOCK_LOCK || lock->get_state() == LOCK_XLOCKDONE) {
	mut->start_locking(lock);
	simple_xlock(lock);
      } else {
	simple_lock(lock);
      }
    }

    lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
    nudge_log(lock);
    return false;
  } else {
    // replica
    assert(lock->get_sm()->can_remote_xlock);
    assert(!mut->slave_request);

    // wait for single auth
    if (lock->get_parent()->is_ambiguous_auth()) {
      lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH,
				     new C_MDS_RetryRequest(mdcache, mut));
      return false;
    }

    // wait for active auth
    mds_rank_t auth = lock->get_parent()->authority().first;
    if (mds->is_cluster_degraded() &&
	!mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
      dout(7) << " mds." << auth << " is not active" << dendl;
      if (mut->more()->waiting_on_slave.empty())
	mds->wait_for_active_peer(auth, new C_MDS_RetryRequest(mdcache, mut));
      return false;
    }

    // send lock request
    mut->more()->slaves.insert(auth);
    mut->start_locking(lock, auth);
    MMDSSlaveRequest *r = new MMDSSlaveRequest(mut->reqid, mut->attempt,
					       MMDSSlaveRequest::OP_XLOCK);
    r->set_lock_type(lock->get_type());
    lock->get_parent()->set_object_info(r->get_object_info());
    mds->send_message_mds(r, auth);

    // record that we are waiting for the auth's ack
    assert(mut->more()->waiting_on_slave.count(auth) == 0);
    mut->more()->waiting_on_slave.insert(auth);

    return false;
  }
}
1691
// Transition a lock after its last xlock reference is dropped.  If the lock
// is otherwise unreferenced and the (former) xlocker matches the target
// loner, jump straight to EXCL; otherwise run eval_gather to pick the next
// state.
void Locker::_finish_xlock(SimpleLock *lock, client_t xlocker, bool *pneed_issue)
{
  assert(!lock->is_stable());
  // fast path to EXCL: no readers/writers/leases, not a snapped-xlock or
  // dentry lock, and the xlocker is (or is compatible with) the target loner
  if (lock->get_num_rdlocks() == 0 &&
      lock->get_num_wrlocks() == 0 &&
      !lock->is_leased() &&
      lock->get_state() != LOCK_XLOCKSNAP &&
      lock->get_type() != CEPH_LOCK_DN) {
    CInode *in = static_cast<CInode*>(lock->get_parent());
    client_t loner = in->get_target_loner();
    if (loner >= 0 && (xlocker < 0 || xlocker == loner)) {
      lock->set_state(LOCK_EXCL);
      lock->get_parent()->auth_unpin(lock);
      lock->finish_waiters(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD);
      if (lock->get_cap_shift())
	*pneed_issue = true;
      if (lock->get_parent()->is_auth() &&
	  lock->is_stable())
	try_eval(lock, pneed_issue);
      return;
    }
  }
  // the xlocker may have CEPH_CAP_GSHARED, need to revoke it if next state is LOCK_LOCK
  eval_gather(lock, lock->get_state() != LOCK_XLOCKSNAP, pneed_issue);
}
1717
// Release one xlock reference.  For a remote xlock, notify the auth
// (OP_UNXLOCK) and wake local waiters; for a local xlock, run the
// post-xlock transition once the last holder is gone, issuing caps if the
// transition calls for it.
void Locker::xlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue)
{
  if (lock->get_type() == CEPH_LOCK_IVERSION ||
      lock->get_type() == CEPH_LOCK_DVERSION)
    return local_xlock_finish(static_cast<LocalLock*>(lock), mut);

  dout(10) << "xlock_finish on " << *lock << " " << *lock->get_parent() << dendl;

  // remember who held it before we drop the reference
  client_t xlocker = lock->get_xlock_by_client();

  // drop ref
  lock->put_xlock();
  assert(mut);
  mut->xlocks.erase(lock);
  mut->locks.erase(lock);

  bool do_issue = false;

  // remote xlock?
  if (!lock->get_parent()->is_auth()) {
    assert(lock->get_sm()->can_remote_xlock);

    // tell auth
    dout(7) << "xlock_finish releasing remote xlock on " << *lock->get_parent() << dendl;
    mds_rank_t auth = lock->get_parent()->authority().first;
    if (!mds->is_cluster_degraded() ||
	mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) {
      MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt,
							MMDSSlaveRequest::OP_UNXLOCK);
      slavereq->set_lock_type(lock->get_type());
      lock->get_parent()->set_object_info(slavereq->get_object_info());
      mds->send_message_mds(slavereq, auth);
    }
    // others waiting?
    lock->finish_waiters(SimpleLock::WAIT_STABLE |
			 SimpleLock::WAIT_WR |
			 SimpleLock::WAIT_RD, 0);
  } else {
    // local: only transition once the last xlock holder is gone
    if (lock->get_num_xlocks() == 0) {
      if (lock->get_state() == LOCK_LOCK_XLOCK)
	lock->set_state(LOCK_XLOCKDONE);
      _finish_xlock(lock, xlocker, &do_issue);
    }
  }

  if (do_issue) {
    CInode *in = static_cast<CInode*>(lock->get_parent());
    if (in->is_head()) {
      if (pneed_issue)
	*pneed_issue = true;
      else
	issue_caps(in);
    }
  }
}
1773
1774void Locker::xlock_export(SimpleLock *lock, MutationImpl *mut)
1775{
1776 dout(10) << "xlock_export on " << *lock << " " << *lock->get_parent() << dendl;
1777
1778 lock->put_xlock();
1779 mut->xlocks.erase(lock);
1780 mut->locks.erase(lock);
1781
1782 MDSCacheObject *p = lock->get_parent();
1783 assert(p->state_test(CInode::STATE_AMBIGUOUSAUTH)); // we are exporting this (inode)
1784
1785 if (!lock->is_stable())
1786 lock->get_parent()->auth_unpin(lock);
1787
1788 lock->set_state(LOCK_LOCK);
1789}
1790
1791void Locker::xlock_import(SimpleLock *lock)
1792{
1793 dout(10) << "xlock_import on " << *lock << " " << *lock->get_parent() << dendl;
1794 lock->get_parent()->auth_pin(lock);
1795}
1796
1797
1798
1799// file i/o -----------------------------------------
1800
1801version_t Locker::issue_file_data_version(CInode *in)
1802{
1803 dout(7) << "issue_file_data_version on " << *in << dendl;
1804 return in->inode.file_data_version;
1805}
1806
1807class C_Locker_FileUpdate_finish : public LockerLogContext {
1808 CInode *in;
1809 MutationRef mut;
1810 bool share_max;
1811 bool need_issue;
1812 client_t client;
1813 MClientCaps *ack;
1814public:
1815 C_Locker_FileUpdate_finish(Locker *l, CInode *i, MutationRef& m,
1816 bool sm=false, bool ni=false, client_t c=-1,
1817 MClientCaps *ac = 0)
1818 : LockerLogContext(l), in(i), mut(m), share_max(sm), need_issue(ni),
1819 client(c), ack(ac) {
1820 in->get(CInode::PIN_PTRWAITER);
1821 }
1822 void finish(int r) override {
1823 locker->file_update_finish(in, mut, share_max, need_issue, client, ack);
1824 in->put(CInode::PIN_PTRWAITER);
1825 }
1826};
1827
// Completion path for a journaled file update: apply the projected inode,
// ack the client (if requested), drop the mutation's locks, and finish any
// snap-writeback cap gathers or follow-up cap issuing.
//   ack - cap message to send back to `client`; ownership is consumed here
void Locker::file_update_finish(CInode *in, MutationRef& mut, bool share_max, bool issue_client_cap,
				client_t client, MClientCaps *ack)
{
  dout(10) << "file_update_finish on " << *in << dendl;
  in->pop_and_dirty_projected_inode(mut->ls);

  mut->apply();

  if (ack) {
    Session *session = mds->get_session(client);
    if (session) {
      // "oldest flush tid" > 0 means client uses unique TID for each flush
      if (ack->get_oldest_flush_tid() > 0)
	session->add_completed_flush(ack->get_client_tid());
      mds->send_message_client_counted(ack, session);
    } else {
      // no session to deliver to; drop our reference on the message
      dout(10) << " no session for client." << client << " " << *ack << dendl;
      ack->put();
    }
  }

  set<CInode*> need_issue;
  drop_locks(mut.get(), &need_issue);

  if (!in->is_head() && !in->client_snap_caps.empty()) {
    dout(10) << " client_snap_caps " << in->client_snap_caps << dendl;
    // check for snap writeback completion
    bool gather = false;
    auto p = in->client_snap_caps.begin();
    while (p != in->client_snap_caps.end()) {
      SimpleLock *lock = in->get_lock(p->first);
      assert(lock);
      dout(10) << " completing client_snap_caps for " << ccap_string(p->first)
	       << " lock " << *lock << " on " << *in << dendl;
      lock->put_wrlock();

      p->second.erase(client);
      if (p->second.empty()) {
	gather = true;
	// post-increment keeps the iterator valid across the erase
	in->client_snap_caps.erase(p++);
      } else
	++p;
    }
    if (gather) {
      if (in->client_snap_caps.empty())
	in->item_open_file.remove_myself();
      eval_cap_gather(in, &need_issue);
    }
  } else {
    // head inode: reissue this client's cap if it wants more than pending
    if (issue_client_cap && need_issue.count(in) == 0) {
      Capability *cap = in->get_client_cap(client);
      if (cap && (cap->wanted() & ~cap->pending()))
	issue_caps(in, cap);
    }

    if (share_max && in->is_auth() &&
	(in->filelock.gcaps_allowed(CAP_LONER) & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)))
      share_inode_max_size(in);
  }
  issue_caps_set(need_issue);

  // auth unpin after issuing caps
  mut->cleanup();
}
1892
// Register (or augment) a client capability on an inode for an open with
// the given mode.  On replay, only attempts cap reconnect and returns 0.
//   mode  - client open mode; translated to wanted cap bits
//   realm - snap realm for a newly created cap
// Returns the (new or existing) Capability, or 0 on replay.
Capability* Locker::issue_new_caps(CInode *in,
				   int mode,
				   Session *session,
				   SnapRealm *realm,
				   bool is_replay)
{
  dout(7) << "issue_new_caps for mode " << mode << " on " << *in << dendl;
  bool is_new;

  // if replay, try to reconnect cap, and otherwise do nothing.
  if (is_replay) {
    mds->mdcache->try_reconnect_cap(in, session);
    return 0;
  }

  // my needs
  assert(session->info.inst.name.is_client());
  client_t my_client = session->info.inst.name.num();
  int my_want = ceph_caps_for_mode(mode);

  // register a capability
  Capability *cap = in->get_client_cap(my_client);
  if (!cap) {
    // new cap
    cap = in->add_client_cap(my_client, session, realm);
    cap->set_wanted(my_want);
    cap->mark_new();
    cap->inc_suppress(); // suppress file cap messages for new cap (we'll bundle with the open() reply)
    is_new = true;
  } else {
    is_new = false;
    // make sure it wants sufficient caps
    if (my_want & ~cap->wanted()) {
      // augment wanted caps for this client
      cap->set_wanted(cap->wanted() | my_want);
    }
  }

  if (in->is_auth()) {
    // [auth] twiddle mode?
    eval(in, CEPH_CAP_LOCKS);

    if (_need_flush_mdlog(in, my_want))
      mds->mdlog->flush();

  } else {
    // [replica] tell auth about any new caps wanted
    request_inode_file_caps(in);
  }

  // issue caps (pot. incl new one)
  //issue_caps(in);  // note: _eval above may have done this already...

  // re-issue whatever we can
  //cap->issue(cap->pending());

  if (is_new)
    cap->dec_suppress();

  return cap;
}
1954
1955
1956void Locker::issue_caps_set(set<CInode*>& inset)
1957{
1958 for (set<CInode*>::iterator p = inset.begin(); p != inset.end(); ++p)
1959 issue_caps(*p);
1960}
1961
1962bool Locker::issue_caps(CInode *in, Capability *only_cap)
1963{
1964 // allowed caps are determined by the lock mode.
1965 int all_allowed = in->get_caps_allowed_by_type(CAP_ANY);
1966 int loner_allowed = in->get_caps_allowed_by_type(CAP_LONER);
1967 int xlocker_allowed = in->get_caps_allowed_by_type(CAP_XLOCKER);
1968
1969 client_t loner = in->get_loner();
1970 if (loner >= 0) {
1971 dout(7) << "issue_caps loner client." << loner
1972 << " allowed=" << ccap_string(loner_allowed)
1973 << ", xlocker allowed=" << ccap_string(xlocker_allowed)
1974 << ", others allowed=" << ccap_string(all_allowed)
1975 << " on " << *in << dendl;
1976 } else {
1977 dout(7) << "issue_caps allowed=" << ccap_string(all_allowed)
1978 << ", xlocker allowed=" << ccap_string(xlocker_allowed)
1979 << " on " << *in << dendl;
1980 }
1981
1982 assert(in->is_head());
1983
1984 // count conflicts with
1985 int nissued = 0;
1986
1987 // client caps
1988 map<client_t, Capability*>::iterator it;
1989 if (only_cap)
1990 it = in->client_caps.find(only_cap->get_client());
1991 else
1992 it = in->client_caps.begin();
1993 for (; it != in->client_caps.end(); ++it) {
1994 Capability *cap = it->second;
1995 if (cap->is_stale())
1996 continue;
1997
1998 // do not issue _new_ bits when size|mtime is projected
1999 int allowed;
2000 if (loner == it->first)
2001 allowed = loner_allowed;
2002 else
2003 allowed = all_allowed;
2004
2005 // add in any xlocker-only caps (for locks this client is the xlocker for)
2006 allowed |= xlocker_allowed & in->get_xlocker_mask(it->first);
2007
2008 Session *session = mds->get_session(it->first);
2009 if (in->inode.inline_data.version != CEPH_INLINE_NONE &&
2010 !(session && session->connection &&
2011 session->connection->has_feature(CEPH_FEATURE_MDS_INLINE_DATA)))
2012 allowed &= ~(CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR);
2013
2014 int pending = cap->pending();
2015 int wanted = cap->wanted();
2016
2017 dout(20) << " client." << it->first
2018 << " pending " << ccap_string(pending)
2019 << " allowed " << ccap_string(allowed)
2020 << " wanted " << ccap_string(wanted)
2021 << dendl;
2022
2023 if (!(pending & ~allowed)) {
2024 // skip if suppress or new, and not revocation
2025 if (cap->is_new() || cap->is_suppress()) {
2026 dout(20) << " !revoke and new|suppressed, skipping client." << it->first << dendl;
2027 continue;
2028 }
2029 }
2030
2031 // notify clients about deleted inode, to make sure they release caps ASAP.
2032 if (in->inode.nlink == 0)
2033 wanted |= CEPH_CAP_LINK_SHARED;
2034
2035 // are there caps that the client _wants_ and can have, but aren't pending?
2036 // or do we need to revoke?
2037 if (((wanted & allowed) & ~pending) || // missing wanted+allowed caps
2038 (pending & ~allowed)) { // need to revoke ~allowed caps.
2039 // issue
2040 nissued++;
2041
2042 // include caps that clients generally like, while we're at it.
2043 int likes = in->get_caps_liked();
2044 int before = pending;
2045 long seq;
2046 if (pending & ~allowed)
2047 seq = cap->issue((wanted|likes) & allowed & pending); // if revoking, don't issue anything new.
2048 else
2049 seq = cap->issue((wanted|likes) & allowed);
2050 int after = cap->pending();
2051
2052 if (cap->is_new()) {
2053 // haven't send caps to client yet
2054 if (before & ~after)
2055 cap->confirm_receipt(seq, after);
2056 } else {
2057 dout(7) << " sending MClientCaps to client." << it->first
2058 << " seq " << cap->get_last_seq()
2059 << " new pending " << ccap_string(after) << " was " << ccap_string(before)
2060 << dendl;
2061
2062 int op = (before & ~after) ? CEPH_CAP_OP_REVOKE : CEPH_CAP_OP_GRANT;
2063 if (op == CEPH_CAP_OP_REVOKE) {
2064 revoking_caps.push_back(&cap->item_revoking_caps);
2065 revoking_caps_by_client[cap->get_client()].push_back(&cap->item_client_revoking_caps);
2066 cap->set_last_revoke_stamp(ceph_clock_now());
2067 cap->reset_num_revoke_warnings();
2068 }
2069
2070 MClientCaps *m = new MClientCaps(op, in->ino(),
2071 in->find_snaprealm()->inode->ino(),
2072 cap->get_cap_id(), cap->get_last_seq(),
2073 after, wanted, 0,
2074 cap->get_mseq(),
2075 mds->get_osd_epoch_barrier());
2076 in->encode_cap_message(m, cap);
2077
2078 mds->send_message_client_counted(m, it->first);
2079 }
2080 }
2081
2082 if (only_cap)
2083 break;
2084 }
2085
2086 return (nissued == 0); // true if no re-issued, no callbacks
2087}
2088
2089void Locker::issue_truncate(CInode *in)
2090{
2091 dout(7) << "issue_truncate on " << *in << dendl;
2092
2093 for (map<client_t, Capability*>::iterator it = in->client_caps.begin();
2094 it != in->client_caps.end();
2095 ++it) {
2096 Capability *cap = it->second;
2097 MClientCaps *m = new MClientCaps(CEPH_CAP_OP_TRUNC,
2098 in->ino(),
2099 in->find_snaprealm()->inode->ino(),
2100 cap->get_cap_id(), cap->get_last_seq(),
2101 cap->pending(), cap->wanted(), 0,
2102 cap->get_mseq(),
2103 mds->get_osd_epoch_barrier());
2104 in->encode_cap_message(m, cap);
2105 mds->send_message_client_counted(m, it->first);
2106 }
2107
2108 // should we increase max_size?
2109 if (in->is_auth() && in->is_file())
2110 check_inode_max_size(in);
2111}
2112
2113
2114void Locker::revoke_stale_caps(Capability *cap)
2115{
2116 CInode *in = cap->get_inode();
2117 if (in->state_test(CInode::STATE_EXPORTINGCAPS)) {
2118 // if export succeeds, the cap will be removed. if export fails, we need to
2119 // revoke the cap if it's still stale.
2120 in->state_set(CInode::STATE_EVALSTALECAPS);
2121 return;
2122 }
2123
2124 int issued = cap->issued();
2125 if (issued & ~CEPH_CAP_PIN) {
2126 dout(10) << " revoking " << ccap_string(issued) << " on " << *in << dendl;
2127 cap->revoke();
2128
2129 if (in->is_auth() &&
2130 in->inode.client_ranges.count(cap->get_client()))
2131 in->state_set(CInode::STATE_NEEDSRECOVER);
2132
2133 if (!in->filelock.is_stable()) eval_gather(&in->filelock);
2134 if (!in->linklock.is_stable()) eval_gather(&in->linklock);
2135 if (!in->authlock.is_stable()) eval_gather(&in->authlock);
2136 if (!in->xattrlock.is_stable()) eval_gather(&in->xattrlock);
2137
2138 if (in->is_auth()) {
2139 try_eval(in, CEPH_CAP_LOCKS);
2140 } else {
2141 request_inode_file_caps(in);
2142 }
2143 }
2144}
2145
2146void Locker::revoke_stale_caps(Session *session)
2147{
2148 dout(10) << "revoke_stale_caps for " << session->info.inst.name << dendl;
2149
2150 for (xlist<Capability*>::iterator p = session->caps.begin(); !p.end(); ++p) {
2151 Capability *cap = *p;
2152 cap->mark_stale();
2153 revoke_stale_caps(cap);
2154 }
2155}
2156
2157void Locker::resume_stale_caps(Session *session)
2158{
2159 dout(10) << "resume_stale_caps for " << session->info.inst.name << dendl;
2160
2161 for (xlist<Capability*>::iterator p = session->caps.begin(); !p.end(); ++p) {
2162 Capability *cap = *p;
2163 CInode *in = cap->get_inode();
2164 assert(in->is_head());
2165 if (cap->is_stale()) {
2166 dout(10) << " clearing stale flag on " << *in << dendl;
2167 cap->clear_stale();
2168
2169 if (in->state_test(CInode::STATE_EXPORTINGCAPS)) {
2170 // if export succeeds, the cap will be removed. if export fails,
2171 // we need to re-issue the cap if it's not stale.
2172 in->state_set(CInode::STATE_EVALSTALECAPS);
2173 continue;
2174 }
2175
2176 if (!in->is_auth() || !eval(in, CEPH_CAP_LOCKS))
2177 issue_caps(in, cap);
2178 }
2179 }
2180}
2181
2182void Locker::remove_stale_leases(Session *session)
2183{
2184 dout(10) << "remove_stale_leases for " << session->info.inst.name << dendl;
2185 xlist<ClientLease*>::iterator p = session->leases.begin();
2186 while (!p.end()) {
2187 ClientLease *l = *p;
2188 ++p;
2189 CDentry *parent = static_cast<CDentry*>(l->parent);
2190 dout(15) << " removing lease on " << *parent << dendl;
2191 parent->remove_client_lease(l, this);
2192 }
2193}
2194
2195
2196class C_MDL_RequestInodeFileCaps : public LockerContext {
2197 CInode *in;
2198public:
2199 C_MDL_RequestInodeFileCaps(Locker *l, CInode *i) : LockerContext(l), in(i) {
2200 in->get(CInode::PIN_PTRWAITER);
2201 }
2202 void finish(int r) override {
2203 if (!in->is_auth())
2204 locker->request_inode_file_caps(in);
2205 in->put(CInode::PIN_PTRWAITER);
2206 }
2207};
2208
2209void Locker::request_inode_file_caps(CInode *in)
2210{
2211 assert(!in->is_auth());
2212
2213 int wanted = in->get_caps_wanted() & ~CEPH_CAP_PIN;
2214 if (wanted != in->replica_caps_wanted) {
2215 // wait for single auth
2216 if (in->is_ambiguous_auth()) {
2217 in->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH,
2218 new C_MDL_RequestInodeFileCaps(this, in));
2219 return;
2220 }
2221
2222 mds_rank_t auth = in->authority().first;
2223 if (mds->is_cluster_degraded() &&
2224 mds->mdsmap->get_state(auth) == MDSMap::STATE_REJOIN) {
2225 mds->wait_for_active_peer(auth, new C_MDL_RequestInodeFileCaps(this, in));
2226 return;
2227 }
2228
2229 dout(7) << "request_inode_file_caps " << ccap_string(wanted)
2230 << " was " << ccap_string(in->replica_caps_wanted)
2231 << " on " << *in << " to mds." << auth << dendl;
2232
2233 in->replica_caps_wanted = wanted;
2234
2235 if (!mds->is_cluster_degraded() ||
2236 mds->mdsmap->is_clientreplay_or_active_or_stopping(auth))
2237 mds->send_message_mds(new MInodeFileCaps(in->ino(), in->replica_caps_wanted),
2238 auth);
2239 }
2240}
2241
2242/* This function DOES put the passed message before returning */
2243void Locker::handle_inode_file_caps(MInodeFileCaps *m)
2244{
2245 // nobody should be talking to us during recovery.
2246 assert(mds->is_clientreplay() || mds->is_active() || mds->is_stopping());
2247
2248 // ok
2249 CInode *in = mdcache->get_inode(m->get_ino());
2250 mds_rank_t from = mds_rank_t(m->get_source().num());
2251
2252 assert(in);
2253 assert(in->is_auth());
2254
2255 dout(7) << "handle_inode_file_caps replica mds." << from << " wants caps " << ccap_string(m->get_caps()) << " on " << *in << dendl;
2256
2257 if (m->get_caps())
2258 in->mds_caps_wanted[from] = m->get_caps();
2259 else
2260 in->mds_caps_wanted.erase(from);
2261
2262 try_eval(in, CEPH_CAP_LOCKS);
2263 m->put();
2264}
2265
2266
2267class C_MDL_CheckMaxSize : public LockerContext {
2268 CInode *in;
2269 uint64_t new_max_size;
2270 uint64_t newsize;
2271 utime_t mtime;
2272
2273public:
2274 C_MDL_CheckMaxSize(Locker *l, CInode *i, uint64_t _new_max_size,
2275 uint64_t _newsize, utime_t _mtime) :
2276 LockerContext(l), in(i),
2277 new_max_size(_new_max_size), newsize(_newsize), mtime(_mtime)
2278 {
2279 in->get(CInode::PIN_PTRWAITER);
2280 }
2281 void finish(int r) override {
2282 if (in->is_auth())
2283 locker->check_inode_max_size(in, false, new_max_size, newsize, mtime);
2284 in->put(CInode::PIN_PTRWAITER);
2285 }
2286};
2287
94b18763 2288uint64_t Locker::calc_new_max_size(CInode::mempool_inode *pi, uint64_t size)
31f18b77
FG
2289{
2290 uint64_t new_max = (size + 1) << 1;
2291 uint64_t max_inc = g_conf->mds_client_writeable_range_max_inc_objs;
2292 if (max_inc > 0) {
b32b8144 2293 max_inc *= pi->layout.object_size;
31f18b77
FG
2294 new_max = MIN(new_max, size + max_inc);
2295 }
2296 return ROUND_UP_TO(new_max, pi->get_layout_size_increment());
2297}
7c673cae
FG
2298
2299void Locker::calc_new_client_ranges(CInode *in, uint64_t size,
94b18763 2300 CInode::mempool_inode::client_range_map *new_ranges,
7c673cae
FG
2301 bool *max_increased)
2302{
94b18763 2303 auto latest = in->get_projected_inode();
7c673cae
FG
2304 uint64_t ms;
2305 if(latest->has_layout()) {
31f18b77 2306 ms = calc_new_max_size(latest, size);
7c673cae
FG
2307 } else {
2308 // Layout-less directories like ~mds0/, have zero size
2309 ms = 0;
2310 }
2311
2312 // increase ranges as appropriate.
2313 // shrink to 0 if no WR|BUFFER caps issued.
2314 for (map<client_t,Capability*>::iterator p = in->client_caps.begin();
2315 p != in->client_caps.end();
2316 ++p) {
2317 if ((p->second->issued() | p->second->wanted()) & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER)) {
2318 client_writeable_range_t& nr = (*new_ranges)[p->first];
2319 nr.range.first = 0;
2320 if (latest->client_ranges.count(p->first)) {
2321 client_writeable_range_t& oldr = latest->client_ranges[p->first];
2322 if (ms > oldr.range.last)
2323 *max_increased = true;
2324 nr.range.last = MAX(ms, oldr.range.last);
2325 nr.follows = oldr.follows;
2326 } else {
31f18b77 2327 *max_increased = true;
7c673cae
FG
2328 nr.range.last = ms;
2329 nr.follows = in->first - 1;
2330 }
2331 }
2332 }
2333}
2334
2335bool Locker::check_inode_max_size(CInode *in, bool force_wrlock,
2336 uint64_t new_max_size, uint64_t new_size,
2337 utime_t new_mtime)
2338{
2339 assert(in->is_auth());
2340 assert(in->is_file());
2341
94b18763
FG
2342 CInode::mempool_inode *latest = in->get_projected_inode();
2343 CInode::mempool_inode::client_range_map new_ranges;
7c673cae
FG
2344 uint64_t size = latest->size;
2345 bool update_size = new_size > 0;
2346 bool update_max = false;
2347 bool max_increased = false;
2348
2349 if (update_size) {
2350 new_size = size = MAX(size, new_size);
2351 new_mtime = MAX(new_mtime, latest->mtime);
2352 if (latest->size == new_size && latest->mtime == new_mtime)
2353 update_size = false;
2354 }
2355
2356 calc_new_client_ranges(in, max(new_max_size, size), &new_ranges, &max_increased);
2357
2358 if (max_increased || latest->client_ranges != new_ranges)
2359 update_max = true;
2360
2361 if (!update_size && !update_max) {
2362 dout(20) << "check_inode_max_size no-op on " << *in << dendl;
2363 return false;
2364 }
2365
2366 dout(10) << "check_inode_max_size new_ranges " << new_ranges
2367 << " update_size " << update_size
2368 << " on " << *in << dendl;
2369
2370 if (in->is_frozen()) {
2371 dout(10) << "check_inode_max_size frozen, waiting on " << *in << dendl;
2372 C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in,
2373 new_max_size,
2374 new_size,
2375 new_mtime);
2376 in->add_waiter(CInode::WAIT_UNFREEZE, cms);
2377 return false;
2378 }
2379 if (!force_wrlock && !in->filelock.can_wrlock(in->get_loner())) {
2380 // lock?
2381 if (in->filelock.is_stable()) {
2382 if (in->get_target_loner() >= 0)
2383 file_excl(&in->filelock);
2384 else
2385 simple_lock(&in->filelock);
2386 }
2387 if (!in->filelock.can_wrlock(in->get_loner())) {
2388 // try again later
2389 C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in,
2390 new_max_size,
2391 new_size,
2392 new_mtime);
2393
2394 in->filelock.add_waiter(SimpleLock::WAIT_STABLE, cms);
2395 dout(10) << "check_inode_max_size can't wrlock, waiting on " << *in << dendl;
2396 return false;
2397 }
2398 }
2399
2400 MutationRef mut(new MutationImpl());
2401 mut->ls = mds->mdlog->get_current_segment();
2402
94b18763
FG
2403 auto &pi = in->project_inode();
2404 pi.inode.version = in->pre_dirty();
7c673cae
FG
2405
2406 if (update_max) {
94b18763
FG
2407 dout(10) << "check_inode_max_size client_ranges " << pi.inode.client_ranges << " -> " << new_ranges << dendl;
2408 pi.inode.client_ranges = new_ranges;
7c673cae
FG
2409 }
2410
2411 if (update_size) {
94b18763
FG
2412 dout(10) << "check_inode_max_size size " << pi.inode.size << " -> " << new_size << dendl;
2413 pi.inode.size = new_size;
2414 pi.inode.rstat.rbytes = new_size;
2415 dout(10) << "check_inode_max_size mtime " << pi.inode.mtime << " -> " << new_mtime << dendl;
2416 pi.inode.mtime = new_mtime;
7c673cae
FG
2417 }
2418
2419 // use EOpen if the file is still open; otherwise, use EUpdate.
2420 // this is just an optimization to push open files forward into
2421 // newer log segments.
2422 LogEvent *le;
2423 EMetaBlob *metablob;
2424 if (in->is_any_caps_wanted() && in->last == CEPH_NOSNAP) {
2425 EOpen *eo = new EOpen(mds->mdlog);
2426 eo->add_ino(in->ino());
2427 metablob = &eo->metablob;
2428 le = eo;
2429 mut->ls->open_files.push_back(&in->item_open_file);
2430 } else {
2431 EUpdate *eu = new EUpdate(mds->mdlog, "check_inode_max_size");
2432 metablob = &eu->metablob;
2433 le = eu;
2434 }
2435 mds->mdlog->start_entry(le);
2436 if (update_size) { // FIXME if/when we do max_size nested accounting
2437 mdcache->predirty_journal_parents(mut, metablob, in, 0, PREDIRTY_PRIMARY);
2438 // no cow, here!
2439 CDentry *parent = in->get_projected_parent_dn();
2440 metablob->add_primary_dentry(parent, in, true);
2441 } else {
2442 metablob->add_dir_context(in->get_projected_parent_dn()->get_dir());
2443 mdcache->journal_dirty_inode(mut.get(), metablob, in);
2444 }
2445 mds->mdlog->submit_entry(le,
2446 new C_Locker_FileUpdate_finish(this, in, mut, true));
2447 wrlock_force(&in->filelock, mut); // wrlock for duration of journal
2448 mut->auth_pin(in);
2449
2450 // make max_size _increase_ timely
2451 if (max_increased)
2452 mds->mdlog->flush();
2453
2454 return true;
2455}
2456
2457
2458void Locker::share_inode_max_size(CInode *in, Capability *only_cap)
2459{
2460 /*
2461 * only share if currently issued a WR cap. if client doesn't have it,
2462 * file_max doesn't matter, and the client will get it if/when they get
2463 * the cap later.
2464 */
2465 dout(10) << "share_inode_max_size on " << *in << dendl;
2466 map<client_t, Capability*>::iterator it;
2467 if (only_cap)
2468 it = in->client_caps.find(only_cap->get_client());
2469 else
2470 it = in->client_caps.begin();
2471 for (; it != in->client_caps.end(); ++it) {
2472 const client_t client = it->first;
2473 Capability *cap = it->second;
2474 if (cap->is_suppress())
2475 continue;
2476 if (cap->pending() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER)) {
2477 dout(10) << "share_inode_max_size with client." << client << dendl;
2478 cap->inc_last_seq();
2479 MClientCaps *m = new MClientCaps(CEPH_CAP_OP_GRANT,
2480 in->ino(),
2481 in->find_snaprealm()->inode->ino(),
2482 cap->get_cap_id(), cap->get_last_seq(),
2483 cap->pending(), cap->wanted(), 0,
2484 cap->get_mseq(),
2485 mds->get_osd_epoch_barrier());
2486 in->encode_cap_message(m, cap);
2487 mds->send_message_client_counted(m, client);
2488 }
2489 if (only_cap)
2490 break;
2491 }
2492}
2493
2494bool Locker::_need_flush_mdlog(CInode *in, int wanted)
2495{
2496 /* flush log if caps are wanted by client but corresponding lock is unstable and locked by
2497 * pending mutations. */
2498 if (((wanted & (CEPH_CAP_FILE_RD|CEPH_CAP_FILE_WR|CEPH_CAP_FILE_SHARED|CEPH_CAP_FILE_EXCL)) &&
2499 in->filelock.is_unstable_and_locked()) ||
2500 ((wanted & (CEPH_CAP_AUTH_SHARED|CEPH_CAP_AUTH_EXCL)) &&
2501 in->authlock.is_unstable_and_locked()) ||
2502 ((wanted & (CEPH_CAP_LINK_SHARED|CEPH_CAP_LINK_EXCL)) &&
2503 in->linklock.is_unstable_and_locked()) ||
2504 ((wanted & (CEPH_CAP_XATTR_SHARED|CEPH_CAP_XATTR_EXCL)) &&
2505 in->xattrlock.is_unstable_and_locked()))
2506 return true;
2507 return false;
2508}
2509
2510void Locker::adjust_cap_wanted(Capability *cap, int wanted, int issue_seq)
2511{
2512 if (ceph_seq_cmp(issue_seq, cap->get_last_issue()) == 0) {
2513 dout(10) << " wanted " << ccap_string(cap->wanted())
2514 << " -> " << ccap_string(wanted) << dendl;
2515 cap->set_wanted(wanted);
2516 } else if (wanted & ~cap->wanted()) {
2517 dout(10) << " wanted " << ccap_string(cap->wanted())
2518 << " -> " << ccap_string(wanted)
2519 << " (added caps even though we had seq mismatch!)" << dendl;
2520 cap->set_wanted(wanted | cap->wanted());
2521 } else {
2522 dout(10) << " NOT changing wanted " << ccap_string(cap->wanted())
2523 << " -> " << ccap_string(wanted)
2524 << " (issue_seq " << issue_seq << " != last_issue "
2525 << cap->get_last_issue() << ")" << dendl;
2526 return;
2527 }
2528
2529 CInode *cur = cap->get_inode();
2530 if (!cur->is_auth()) {
2531 request_inode_file_caps(cur);
2532 return;
2533 }
2534
2535 if (cap->wanted() == 0) {
2536 if (cur->item_open_file.is_on_list() &&
2537 !cur->is_any_caps_wanted()) {
2538 dout(10) << " removing unwanted file from open file list " << *cur << dendl;
2539 cur->item_open_file.remove_myself();
2540 }
2541 } else {
2542 if (cur->state_test(CInode::STATE_RECOVERING) &&
2543 (cap->wanted() & (CEPH_CAP_FILE_RD |
2544 CEPH_CAP_FILE_WR))) {
2545 mds->mdcache->recovery_queue.prioritize(cur);
2546 }
2547
2548 if (!cur->item_open_file.is_on_list()) {
2549 dout(10) << " adding to open file list " << *cur << dendl;
2550 assert(cur->last == CEPH_NOSNAP);
2551 LogSegment *ls = mds->mdlog->get_current_segment();
2552 EOpen *le = new EOpen(mds->mdlog);
2553 mds->mdlog->start_entry(le);
2554 le->add_clean_inode(cur);
2555 ls->open_files.push_back(&cur->item_open_file);
2556 mds->mdlog->submit_entry(le);
2557 }
2558 }
2559}
2560
2561
2562
c07f9fc5 2563void Locker::_do_null_snapflush(CInode *head_in, client_t client, snapid_t last)
7c673cae
FG
2564{
2565 dout(10) << "_do_null_snapflush client." << client << " on " << *head_in << dendl;
c07f9fc5
FG
2566 for (auto p = head_in->client_need_snapflush.begin();
2567 p != head_in->client_need_snapflush.end() && p->first < last; ) {
7c673cae 2568 snapid_t snapid = p->first;
94b18763 2569 auto &clients = p->second;
7c673cae
FG
2570 ++p; // be careful, q loop below depends on this
2571
2572 if (clients.count(client)) {
2573 dout(10) << " doing async NULL snapflush on " << snapid << " from client." << client << dendl;
b32b8144
FG
2574 CInode *sin = mdcache->pick_inode_snap(head_in, snapid - 1);
2575 assert(sin);
2576 assert(sin->first <= snapid);
7c673cae
FG
2577 _do_snap_update(sin, snapid, 0, sin->first - 1, client, NULL, NULL);
2578 head_in->remove_need_snapflush(sin, snapid, client);
2579 }
2580 }
2581}
2582
2583
2584bool Locker::should_defer_client_cap_frozen(CInode *in)
2585{
2586 /*
2587 * This policy needs to be AT LEAST as permissive as allowing a client request
2588 * to go forward, or else a client request can release something, the release
2589 * gets deferred, but the request gets processed and deadlocks because when the
2590 * caps can't get revoked.
2591 *
2592 * Currently, a request wait if anything locked is freezing (can't
2593 * auth_pin), which would avoid any deadlock with cap release. Thus @in
2594 * _MUST_ be in the lock/auth_pin set.
2595 *
2596 * auth_pins==0 implies no unstable lock and not auth pinnned by
2597 * client request, otherwise continue even it's freezing.
2598 */
2599 return (in->is_freezing() && in->get_num_auth_pins() == 0) || in->is_frozen();
2600}
2601
2602/*
2603 * This function DOES put the passed message before returning
2604 */
2605void Locker::handle_client_caps(MClientCaps *m)
2606{
7c673cae 2607 client_t client = m->get_source().num();
7c673cae
FG
2608 snapid_t follows = m->get_snap_follows();
2609 dout(7) << "handle_client_caps "
2610 << ((m->flags & CLIENT_CAPS_SYNC) ? "sync" : "async")
2611 << " on " << m->get_ino()
2612 << " tid " << m->get_client_tid() << " follows " << follows
2613 << " op " << ceph_cap_op_name(m->get_op()) << dendl;
2614
94b18763 2615 Session *session = mds->get_session(m);
7c673cae
FG
2616 if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) {
2617 if (!session) {
2618 dout(5) << " no session, dropping " << *m << dendl;
2619 m->put();
2620 return;
2621 }
2622 if (session->is_closed() ||
2623 session->is_closing() ||
2624 session->is_killing()) {
2625 dout(7) << " session closed|closing|killing, dropping " << *m << dendl;
2626 m->put();
2627 return;
2628 }
2629 if (mds->is_reconnect() &&
2630 m->get_dirty() && m->get_client_tid() > 0 &&
2631 !session->have_completed_flush(m->get_client_tid())) {
2632 mdcache->set_reconnected_dirty_caps(client, m->get_ino(), m->get_dirty());
2633 }
2634 mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
2635 return;
2636 }
2637
2638 if (m->get_client_tid() > 0 && session &&
2639 session->have_completed_flush(m->get_client_tid())) {
2640 dout(7) << "handle_client_caps already flushed tid " << m->get_client_tid()
2641 << " for client." << client << dendl;
2642 MClientCaps *ack;
2643 if (m->get_op() == CEPH_CAP_OP_FLUSHSNAP) {
2644 ack = new MClientCaps(CEPH_CAP_OP_FLUSHSNAP_ACK, m->get_ino(), 0, 0, 0, 0, 0,
2645 m->get_dirty(), 0, mds->get_osd_epoch_barrier());
2646 } else {
2647 ack = new MClientCaps(CEPH_CAP_OP_FLUSH_ACK, m->get_ino(), 0, m->get_cap_id(),
2648 m->get_seq(), m->get_caps(), 0, m->get_dirty(), 0,
2649 mds->get_osd_epoch_barrier());
2650 }
2651 ack->set_snap_follows(follows);
2652 ack->set_client_tid(m->get_client_tid());
2653 mds->send_message_client_counted(ack, m->get_connection());
2654 if (m->get_op() == CEPH_CAP_OP_FLUSHSNAP) {
2655 m->put();
2656 return;
2657 } else {
2658 // fall-thru because the message may release some caps
2659 m->clear_dirty();
2660 m->set_op(CEPH_CAP_OP_UPDATE);
2661 }
2662 }
2663
2664 // "oldest flush tid" > 0 means client uses unique TID for each flush
2665 if (m->get_oldest_flush_tid() > 0 && session) {
2666 if (session->trim_completed_flushes(m->get_oldest_flush_tid())) {
2667 mds->mdlog->get_current_segment()->touched_sessions.insert(session->info.inst.name);
2668
2669 if (session->get_num_trim_flushes_warnings() > 0 &&
2670 session->get_num_completed_flushes() * 2 < g_conf->mds_max_completed_flushes)
2671 session->reset_num_trim_flushes_warnings();
2672 } else {
2673 if (session->get_num_completed_flushes() >=
2674 (g_conf->mds_max_completed_flushes << session->get_num_trim_flushes_warnings())) {
2675 session->inc_num_trim_flushes_warnings();
2676 stringstream ss;
2677 ss << "client." << session->get_client() << " does not advance its oldest_flush_tid ("
2678 << m->get_oldest_flush_tid() << "), "
2679 << session->get_num_completed_flushes()
2680 << " completed flushes recorded in session";
2681 mds->clog->warn() << ss.str();
2682 dout(20) << __func__ << " " << ss.str() << dendl;
2683 }
2684 }
2685 }
2686
2687 CInode *head_in = mdcache->get_inode(m->get_ino());
2688 if (!head_in) {
2689 if (mds->is_clientreplay()) {
2690 dout(7) << "handle_client_caps on unknown ino " << m->get_ino()
2691 << ", will try again after replayed client requests" << dendl;
2692 mdcache->wait_replay_cap_reconnect(m->get_ino(), new C_MDS_RetryMessage(mds, m));
2693 return;
2694 }
2695 dout(1) << "handle_client_caps on unknown ino " << m->get_ino() << ", dropping" << dendl;
2696 m->put();
2697 return;
2698 }
2699
2700 if (m->osd_epoch_barrier && !mds->objecter->have_map(m->osd_epoch_barrier)) {
2701 // Pause RADOS operations until we see the required epoch
2702 mds->objecter->set_epoch_barrier(m->osd_epoch_barrier);
2703 }
2704
2705 if (mds->get_osd_epoch_barrier() < m->osd_epoch_barrier) {
2706 // Record the barrier so that we will retransmit it to clients
2707 mds->set_osd_epoch_barrier(m->osd_epoch_barrier);
2708 }
2709
b32b8144 2710 dout(10) << " head inode " << *head_in << dendl;
7c673cae
FG
2711
2712 Capability *cap = 0;
b32b8144 2713 cap = head_in->get_client_cap(client);
7c673cae 2714 if (!cap) {
b32b8144 2715 dout(7) << "handle_client_caps no cap for client." << client << " on " << *head_in << dendl;
7c673cae
FG
2716 m->put();
2717 return;
2718 }
2719 assert(cap);
2720
2721 // freezing|frozen?
b32b8144
FG
2722 if (should_defer_client_cap_frozen(head_in)) {
2723 dout(7) << "handle_client_caps freezing|frozen on " << *head_in << dendl;
2724 head_in->add_waiter(CInode::WAIT_UNFREEZE, new C_MDS_RetryMessage(mds, m));
7c673cae
FG
2725 return;
2726 }
2727 if (ceph_seq_cmp(m->get_mseq(), cap->get_mseq()) < 0) {
2728 dout(7) << "handle_client_caps mseq " << m->get_mseq() << " < " << cap->get_mseq()
2729 << ", dropping" << dendl;
2730 m->put();
2731 return;
2732 }
2733
2734 int op = m->get_op();
2735
2736 // flushsnap?
2737 if (op == CEPH_CAP_OP_FLUSHSNAP) {
b32b8144
FG
2738 if (!head_in->is_auth()) {
2739 dout(7) << " not auth, ignoring flushsnap on " << *head_in << dendl;
7c673cae
FG
2740 goto out;
2741 }
2742
b32b8144 2743 SnapRealm *realm = head_in->find_snaprealm();
7c673cae
FG
2744 snapid_t snap = realm->get_snap_following(follows);
2745 dout(10) << " flushsnap follows " << follows << " -> snap " << snap << dendl;
2746
b32b8144
FG
2747 CInode *in = head_in;
2748 if (snap != CEPH_NOSNAP) {
2749 in = mdcache->pick_inode_snap(head_in, snap - 1);
2750 if (in != head_in)
2751 dout(10) << " snapped inode " << *in << dendl;
2752 }
2753
7c673cae
FG
2754 // we can prepare the ack now, since this FLUSHEDSNAP is independent of any
2755 // other cap ops. (except possibly duplicate FLUSHSNAP requests, but worst
2756 // case we get a dup response, so whatever.)
2757 MClientCaps *ack = 0;
2758 if (m->get_dirty()) {
2759 ack = new MClientCaps(CEPH_CAP_OP_FLUSHSNAP_ACK, in->ino(), 0, 0, 0, 0, 0, m->get_dirty(), 0, mds->get_osd_epoch_barrier());
2760 ack->set_snap_follows(follows);
2761 ack->set_client_tid(m->get_client_tid());
2762 ack->set_oldest_flush_tid(m->get_oldest_flush_tid());
2763 }
2764
2765 if (in == head_in ||
2766 (head_in->client_need_snapflush.count(snap) &&
2767 head_in->client_need_snapflush[snap].count(client))) {
2768 dout(7) << " flushsnap snap " << snap
2769 << " client." << client << " on " << *in << dendl;
2770
2771 // this cap now follows a later snap (i.e. the one initiating this flush, or later)
2772 if (in == head_in)
2773 cap->client_follows = snap < CEPH_NOSNAP ? snap : realm->get_newest_seq();
c07f9fc5
FG
2774 else if (head_in->client_need_snapflush.begin()->first < snap)
2775 _do_null_snapflush(head_in, client, snap);
7c673cae
FG
2776
2777 _do_snap_update(in, snap, m->get_dirty(), follows, client, m, ack);
2778
2779 if (in != head_in)
2780 head_in->remove_need_snapflush(in, snap, client);
7c673cae
FG
2781 } else {
2782 dout(7) << " not expecting flushsnap " << snap << " from client." << client << " on " << *in << dendl;
2783 if (ack)
2784 mds->send_message_client_counted(ack, m->get_connection());
2785 }
2786 goto out;
2787 }
2788
2789 if (cap->get_cap_id() != m->get_cap_id()) {
2790 dout(7) << " ignoring client capid " << m->get_cap_id() << " != my " << cap->get_cap_id() << dendl;
2791 } else {
b32b8144
FG
2792 CInode *in = head_in;
2793 if (follows > 0) {
2794 in = mdcache->pick_inode_snap(head_in, follows);
2795 // intermediate snap inodes
2796 while (in != head_in) {
2797 assert(in->last != CEPH_NOSNAP);
2798 if (in->is_auth() && m->get_dirty()) {
2799 dout(10) << " updating intermediate snapped inode " << *in << dendl;
2800 _do_cap_update(in, NULL, m->get_dirty(), follows, m);
2801 }
2802 in = mdcache->pick_inode_snap(head_in, in->last);
7c673cae 2803 }
7c673cae
FG
2804 }
2805
2806 // head inode, and cap
2807 MClientCaps *ack = 0;
2808
2809 int caps = m->get_caps();
2810 if (caps & ~cap->issued()) {
2811 dout(10) << " confirming not issued caps " << ccap_string(caps & ~cap->issued()) << dendl;
2812 caps &= cap->issued();
2813 }
2814
2815 cap->confirm_receipt(m->get_seq(), caps);
2816 dout(10) << " follows " << follows
2817 << " retains " << ccap_string(m->get_caps())
2818 << " dirty " << ccap_string(m->get_dirty())
2819 << " on " << *in << dendl;
2820
2821
2822 // missing/skipped snapflush?
2823 // The client MAY send a snapflush if it is issued WR/EXCL caps, but
2824 // presently only does so when it has actual dirty metadata. But, we
2825 // set up the need_snapflush stuff based on the issued caps.
2826 // We can infer that the client WONT send a FLUSHSNAP once they have
2827 // released all WR/EXCL caps (the FLUSHSNAP always comes before the cap
2828 // update/release).
2829 if (!head_in->client_need_snapflush.empty()) {
2830 if ((cap->issued() & CEPH_CAP_ANY_FILE_WR) == 0) {
2831 _do_null_snapflush(head_in, client);
2832 } else {
2833 dout(10) << " revocation in progress, not making any conclusions about null snapflushes" << dendl;
2834 }
2835 }
2836
2837 if (m->get_dirty() && in->is_auth()) {
2838 dout(7) << " flush client." << client << " dirty " << ccap_string(m->get_dirty())
2839 << " seq " << m->get_seq() << " on " << *in << dendl;
2840 ack = new MClientCaps(CEPH_CAP_OP_FLUSH_ACK, in->ino(), 0, cap->get_cap_id(), m->get_seq(),
2841 m->get_caps(), 0, m->get_dirty(), 0, mds->get_osd_epoch_barrier());
2842 ack->set_client_tid(m->get_client_tid());
2843 ack->set_oldest_flush_tid(m->get_oldest_flush_tid());
2844 }
2845
2846 // filter wanted based on what we could ever give out (given auth/replica status)
2847 bool need_flush = m->flags & CLIENT_CAPS_SYNC;
2848 int new_wanted = m->get_wanted() & head_in->get_caps_allowed_ever();
2849 if (new_wanted != cap->wanted()) {
2850 if (!need_flush && (new_wanted & ~cap->pending())) {
2851 // exapnding caps. make sure we aren't waiting for a log flush
2852 need_flush = _need_flush_mdlog(head_in, new_wanted & ~cap->pending());
2853 }
2854
2855 adjust_cap_wanted(cap, new_wanted, m->get_issue_seq());
2856 }
2857
2858 if (in->is_auth() &&
2859 _do_cap_update(in, cap, m->get_dirty(), follows, m, ack, &need_flush)) {
2860 // updated
2861 eval(in, CEPH_CAP_LOCKS);
2862
2863 if (!need_flush && (cap->wanted() & ~cap->pending()))
2864 need_flush = _need_flush_mdlog(in, cap->wanted() & ~cap->pending());
2865 } else {
2866 // no update, ack now.
2867 if (ack)
2868 mds->send_message_client_counted(ack, m->get_connection());
2869
2870 bool did_issue = eval(in, CEPH_CAP_LOCKS);
2871 if (!did_issue && (cap->wanted() & ~cap->pending()))
2872 issue_caps(in, cap);
2873
2874 if (cap->get_last_seq() == 0 &&
2875 (cap->pending() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER))) {
2876 cap->issue_norevoke(cap->issued());
2877 share_inode_max_size(in, cap);
2878 }
2879 }
2880
2881 if (need_flush)
2882 mds->mdlog->flush();
2883 }
2884
2885 out:
2886 m->put();
2887}
2888
2889
// Completion context used when a cap release embedded in a client request
// must be deferred (e.g. the inode was frozen).  It carries a copy of the
// ceph_mds_request_release item so the original request need not be kept
// alive; on finish it replays the release with a null MDRequestRef and an
// empty dname (the dentry lease part is not retried here).
class C_Locker_RetryRequestCapRelease : public LockerContext {
  client_t client;                 // client whose release is being retried
  ceph_mds_request_release item;   // copied release payload
public:
  C_Locker_RetryRequestCapRelease(Locker *l, client_t c, const ceph_mds_request_release& it) :
    LockerContext(l), client(c), item(it) { }
  void finish(int r) override {
    string dname;
    MDRequestRef null_ref;
    locker->process_request_cap_release(null_ref, client, item, dname);
  }
};
2902
// Process a cap release embedded in a client request: optionally drop a
// dentry lease (if dname is given), validate the release against the
// current Capability (mseq/cap_id), then confirm receipt and re-evaluate
// cap state.  mdr may be null when called from a deferred-retry context.
void Locker::process_request_cap_release(MDRequestRef& mdr, client_t client, const ceph_mds_request_release& item,
					 boost::string_view dname)
{
  inodeno_t ino = (uint64_t)item.ino;
  uint64_t cap_id = item.cap_id;
  int caps = item.caps;
  int wanted = item.wanted;
  int seq = item.seq;
  int issue_seq = item.issue_seq;
  int mseq = item.mseq;

  CInode *in = mdcache->get_inode(ino);
  if (!in)
    return;  // inode no longer cached; nothing to release

  // A non-empty dname means the client is also releasing its dentry lease.
  if (dname.length()) {
    frag_t fg = in->pick_dirfrag(dname);
    CDir *dir = in->get_dirfrag(fg);
    if (dir) {
      CDentry *dn = dir->lookup(dname);
      if (dn) {
	ClientLease *l = dn->get_client_lease(client);
	if (l) {
	  dout(10) << "process_cap_release removing lease on " << *dn << dendl;
	  dn->remove_client_lease(l, this);
	} else {
	  dout(7) << "process_cap_release client." << client
		  << " doesn't have lease on " << *dn << dendl;
	}
      } else {
	dout(7) << "process_cap_release client." << client << " released lease on dn "
		<< dir->dirfrag() << "/" << dname << " which dne" << dendl;
      }
    }
  }

  Capability *cap = in->get_client_cap(client);
  if (!cap)
    return;  // client no longer holds a cap here

  dout(10) << "process_cap_release client." << client << " " << ccap_string(caps) << " on " << *in
	   << (mdr ? "" : " (DEFERRED, no mdr)")
	   << dendl;

  // Stale migration seq: the cap moved since the client sent this; drop it.
  if (ceph_seq_cmp(mseq, cap->get_mseq()) < 0) {
    dout(7) << " mseq " << mseq << " < " << cap->get_mseq() << ", dropping" << dendl;
    return;
  }

  // Cap was reissued with a new id since the client sent this; drop it.
  if (cap->get_cap_id() != cap_id) {
    dout(7) << " cap_id " << cap_id << " != " << cap->get_cap_id() << ", dropping" << dendl;
    return;
  }

  // Frozen inode: retry after unfreeze (the retry runs without an mdr).
  if (should_defer_client_cap_frozen(in)) {
    dout(7) << " frozen, deferring" << dendl;
    in->add_waiter(CInode::WAIT_UNFREEZE, new C_Locker_RetryRequestCapRelease(this, client, item));
    return;
  }

  // The client can only confirm caps it was actually issued.
  if (caps & ~cap->issued()) {
    dout(10) << " confirming not issued caps " << ccap_string(caps & ~cap->issued()) << dendl;
    caps &= cap->issued();
  }
  cap->confirm_receipt(seq, caps);

  // If the client gave up all file write caps, any pending snapflush for it
  // will never arrive; synthesize null snapflushes.
  if (!in->client_need_snapflush.empty() &&
      (cap->issued() & CEPH_CAP_ANY_FILE_WR) == 0) {
    _do_null_snapflush(in, client);
  }

  adjust_cap_wanted(cap, wanted, issue_seq);

  // Suppress cap re-issue during eval while a request is in flight.
  if (mdr)
    cap->inc_suppress();
  eval(in, CEPH_CAP_LOCKS);
  if (mdr)
    cap->dec_suppress();

  // take note; we may need to reissue on this cap later
  if (mdr)
    mdr->cap_releases[in->vino()] = cap->get_last_seq();
}
2986
// Completion context that retries kick_issue_caps() once an inode is
// unfrozen.  The inode is pinned (PIN_PTRWAITER) for the lifetime of the
// context so the raw CInode* stays valid.
class C_Locker_RetryKickIssueCaps : public LockerContext {
  CInode *in;
  client_t client;
  ceph_seq_t seq;   // cap last_sent seq at the time of deferral
public:
  C_Locker_RetryKickIssueCaps(Locker *l, CInode *i, client_t c, ceph_seq_t s) :
    LockerContext(l), in(i), client(c), seq(s) {
    in->get(CInode::PIN_PTRWAITER);
  }
  void finish(int r) override {
    locker->kick_issue_caps(in, client, seq);
    in->put(CInode::PIN_PTRWAITER);
  }
};
3001
// Re-issue caps to a client whose cap is still at the sequence number it
// had when a request recorded the release.  If the cap has since been
// re-sent (seq mismatch) there is nothing to do; if the inode is frozen,
// retry after unfreeze.
void Locker::kick_issue_caps(CInode *in, client_t client, ceph_seq_t seq)
{
  Capability *cap = in->get_client_cap(client);
  if (!cap || cap->get_last_sent() != seq)
    return;  // cap gone or already re-issued since
  if (in->is_frozen()) {
    dout(10) << "kick_issue_caps waiting for unfreeze on " << *in << dendl;
    in->add_waiter(CInode::WAIT_UNFREEZE,
	new C_Locker_RetryKickIssueCaps(this, in, client, seq));
    return;
  }
  dout(10) << "kick_issue_caps released at current seq " << seq
    << ", reissuing" << dendl;
  issue_caps(in, cap);
}
3017
3018void Locker::kick_cap_releases(MDRequestRef& mdr)
3019{
3020 client_t client = mdr->get_client();
3021 for (map<vinodeno_t,ceph_seq_t>::iterator p = mdr->cap_releases.begin();
3022 p != mdr->cap_releases.end();
3023 ++p) {
3024 CInode *in = mdcache->get_inode(p->first);
3025 if (!in)
3026 continue;
3027 kick_issue_caps(in, client, p->second);
3028 }
3029}
3030
7c673cae
FG
/**
 * Apply a client FLUSHSNAP (snap cap flush) to the inode's metadata and
 * journal the result.
 *
 * m and ack might be NULL, so don't dereference them unless dirty != 0
 * (dirty != 0 implies a valid m; ack may still be NULL if no reply is due).
 */
void Locker::_do_snap_update(CInode *in, snapid_t snap, int dirty, snapid_t follows, client_t client, MClientCaps *m, MClientCaps *ack)
{
  dout(10) << "_do_snap_update dirty " << ccap_string(dirty)
	   << " follows " << follows << " snap " << snap
	   << " on " << *in << dendl;

  if (snap == CEPH_NOSNAP) {
    // hmm, i guess snap was already deleted?  just ack!
    dout(10) << " wow, the snap following " << follows
	     << " was already deleted.  nothing to record, just ack." << dendl;
    if (ack)
      mds->send_message_client_counted(ack, m->get_connection());
    return;
  }

  EUpdate *le = new EUpdate(mds->mdlog, "snap flush");
  mds->mdlog->start_entry(le);
  MutationRef mut = new MutationImpl();
  mut->ls = mds->mdlog->get_current_segment();

  // normal metadata updates that we can apply to the head as well.

  // update xattrs?
  CInode::mempool_xattr_map *px = nullptr;
  bool xattrs = (dirty & CEPH_CAP_XATTR_EXCL) &&
                m->xattrbl.length() &&
                m->head.xattr_version > in->get_projected_inode()->xattr_version;

  CInode::mempool_old_inode *oi = 0;
  if (in->is_multiversion()) {
    // multiversion inode: the flush may target an old (pre-snap) inode.
    oi = in->pick_old_inode(snap);
  }

  CInode::mempool_inode *i;
  if (oi) {
    // Write into the old inode; still project the head to bump its version.
    dout(10) << " writing into old inode" << dendl;
    auto &pi = in->project_inode();
    pi.inode.version = in->pre_dirty();
    if (snap > oi->first)
      in->split_old_inode(snap);
    i = &oi->inode;
    if (xattrs)
      px = &oi->xattrs;
  } else {
    auto &pi = in->project_inode(xattrs);
    pi.inode.version = in->pre_dirty();
    i = &pi.inode;
    if (xattrs)
      px = pi.xattrs.get();
  }

  // Apply the dirty field updates (size/mtime/uid/...) from the message.
  _update_cap_fields(in, dirty, m, i);

  // xattr
  if (xattrs) {
    dout(7) << " xattrs v" << i->xattr_version << " -> " << m->head.xattr_version
	    << " len " << m->xattrbl.length() << dendl;
    i->xattr_version = m->head.xattr_version;
    bufferlist::iterator p = m->xattrbl.begin();
    ::decode(*px, p);
  }

  {
    // Trim/adjust the client byte range: the flushed snap either ends this
    // client's range entirely (last snap of the inode) or moves its
    // 'follows' forward.
    auto it = i->client_ranges.find(client);
    if (it != i->client_ranges.end()) {
      if (in->last == snap) {
	dout(10) << " removing client_range entirely" << dendl;
	i->client_ranges.erase(it);
      } else {
	dout(10) << " client_range now follows " << snap << dendl;
	it->second.follows = snap;
      }
    }
  }

  mut->auth_pin(in);
  mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows);
  mdcache->journal_dirty_inode(mut.get(), &le->metablob, in, follows);

  // "oldest flush tid" > 0 means client uses unique TID for each flush
  if (ack && ack->get_oldest_flush_tid() > 0)
    le->metablob.add_client_flush(metareqid_t(m->get_source(), ack->get_client_tid()),
				  ack->get_oldest_flush_tid());

  // The ack (if any) is sent by the journal-completion callback.
  mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut, false, false,
							      client, ack));
}
3121
// Copy the metadata fields a client is allowed to dirty (per the 'dirty'
// cap mask) from a cap message into an inode.  No-op when dirty == 0.
// Feature-gated fields (change_attr, btime) are only applied when the
// client's connection advertises the corresponding feature bit.
void Locker::_update_cap_fields(CInode *in, int dirty, MClientCaps *m, CInode::mempool_inode *pi)
{
  if (dirty == 0)
    return;

  /* m must be valid if there are dirty caps */
  assert(m);
  uint64_t features = m->get_connection()->get_features();

  // ctime only moves forward.
  if (m->get_ctime() > pi->ctime) {
    dout(7) << "  ctime " << pi->ctime << " -> " << m->get_ctime()
	    << " for " << *in << dendl;
    pi->ctime = m->get_ctime();
  }

  if ((features & CEPH_FEATURE_FS_CHANGE_ATTR) &&
      m->get_change_attr() > pi->change_attr) {
    dout(7) << "  change_attr " << pi->change_attr << " -> " << m->get_change_attr()
	    << " for " << *in << dendl;
    pi->change_attr = m->get_change_attr();
  }

  // file
  if (dirty & (CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)) {
    utime_t atime = m->get_atime();
    utime_t mtime = m->get_mtime();
    uint64_t size = m->get_size();
    version_t inline_version = m->inline_version;

    // With only WR, mtime may only advance; with EXCL the client owns it
    // and may set it to anything.
    if (((dirty & CEPH_CAP_FILE_WR) && mtime > pi->mtime) ||
	((dirty & CEPH_CAP_FILE_EXCL) && mtime != pi->mtime)) {
      dout(7) << "  mtime " << pi->mtime << " -> " << mtime
	      << " for " << *in << dendl;
      pi->mtime = mtime;
    }
    if (in->inode.is_file() &&   // ONLY if regular file
	size > pi->size) {
      dout(7) << "  size " << pi->size << " -> " << size
	      << " for " << *in << dendl;
      pi->size = size;
      pi->rstat.rbytes = size;
    }
    if (in->inode.is_file() &&
        (dirty & CEPH_CAP_FILE_WR) &&
        inline_version > pi->inline_data.version) {
      pi->inline_data.version = inline_version;
      if (inline_version != CEPH_INLINE_NONE && m->inline_data.length() > 0)
	pi->inline_data.get_data() = m->inline_data;
      else
	pi->inline_data.free_data();
    }
    // atime/time_warp_seq changes require EXCL.
    if ((dirty & CEPH_CAP_FILE_EXCL) && atime != pi->atime) {
      dout(7) << "  atime " << pi->atime << " -> " << atime
	      << " for " << *in << dendl;
      pi->atime = atime;
    }
    if ((dirty & CEPH_CAP_FILE_EXCL) &&
	ceph_seq_cmp(pi->time_warp_seq, m->get_time_warp_seq()) < 0) {
      dout(7) << "  time_warp_seq " << pi->time_warp_seq << " -> " << m->get_time_warp_seq()
	      << " for " << *in << dendl;
      pi->time_warp_seq = m->get_time_warp_seq();
    }
  }
  // auth
  if (dirty & CEPH_CAP_AUTH_EXCL) {
    if (m->head.uid != pi->uid) {
      dout(7) << "  uid " << pi->uid
	      << " -> " << m->head.uid
	      << " for " << *in << dendl;
      pi->uid = m->head.uid;
    }
    if (m->head.gid != pi->gid) {
      dout(7) << "  gid " << pi->gid
	      << " -> " << m->head.gid
	      << " for " << *in << dendl;
      pi->gid = m->head.gid;
    }
    if (m->head.mode != pi->mode) {
      dout(7) << "  mode " << oct << pi->mode
	      << " -> " << m->head.mode << dec
	      << " for " << *in << dendl;
      pi->mode = m->head.mode;
    }
    if ((features & CEPH_FEATURE_FS_BTIME) && m->get_btime() != pi->btime) {
      dout(7) << "  btime " << oct << pi->btime
	      << " -> " << m->get_btime() << dec
	      << " for " << *in << dendl;
      pi->btime = m->get_btime();
    }
  }
}
3213
/*
 * update inode based on cap flush|flushsnap|wanted.
 *  adjust max_size, if needed.
 * if we update, return true; otherwise, false (no updated needed).
 *
 * Also decodes any piggy-backed file-lock state, performs the MAY_WRITE
 * access check, and journals the resulting inode update.  *need_flush is
 * set (never cleared) when the caller should flush the mdlog.
 */
bool Locker::_do_cap_update(CInode *in, Capability *cap,
			    int dirty, snapid_t follows,
			    MClientCaps *m, MClientCaps *ack,
			    bool *need_flush)
{
  dout(10) << "_do_cap_update dirty " << ccap_string(dirty)
	   << " issued " << ccap_string(cap ? cap->issued() : 0)
	   << " wanted " << ccap_string(cap ? cap->wanted() : 0)
	   << " on " << *in << dendl;
  assert(in->is_auth());
  client_t client = m->get_source().num();
  CInode::mempool_inode *latest = in->get_projected_inode();

  // increase or zero max_size?
  uint64_t size = m->get_size();
  bool change_max = false;
  uint64_t old_max = latest->client_ranges.count(client) ? latest->client_ranges[client].range.last : 0;
  uint64_t new_max = old_max;

  if (in->is_file()) {
    bool forced_change_max = false;
    dout(20) << "inode is file" << dendl;
    if (cap && ((cap->issued() | cap->wanted()) & CEPH_CAP_ANY_FILE_WR)) {
      // Client holds (or wants) write caps: keep a max_size headroom ahead
      // of what it asked for / has written.
      dout(20) << "client has write caps; m->get_max_size="
	       << m->get_max_size() << "; old_max=" << old_max << dendl;
      if (m->get_max_size() > new_max) {
	dout(10) << "client requests file_max " << m->get_max_size()
		 << " > max " << old_max << dendl;
	change_max = true;
	forced_change_max = true;
	new_max = calc_new_max_size(latest, m->get_max_size());
      } else {
	new_max = calc_new_max_size(latest, size);

	if (new_max > old_max)
	  change_max = true;
	else
	  new_max = old_max;
      }
    } else {
      // No write caps issued or wanted: drop the byte range.
      if (old_max) {
	change_max = true;
	new_max = 0;
      }
    }

    // Changing max_size needs a wrlock on filelock; if we can't get one,
    // try to move the lock toward a wrlockable state, else queue a
    // CheckMaxSize for when the lock stabilizes and skip the change now.
    if (in->last == CEPH_NOSNAP &&
	change_max &&
	!in->filelock.can_wrlock(client) &&
	!in->filelock.can_force_wrlock(client)) {
      dout(10) << " i want to change file_max, but lock won't allow it (yet)" << dendl;
      if (in->filelock.is_stable()) {
	bool need_issue = false;
	if (cap)
	  cap->inc_suppress();
	if (in->mds_caps_wanted.empty() &&
	    (in->get_loner() >= 0 || (in->get_wanted_loner() >= 0 && in->try_set_loner()))) {
	  if (in->filelock.get_state() != LOCK_EXCL)
	    file_excl(&in->filelock, &need_issue);
	} else
	  simple_lock(&in->filelock, &need_issue);
	if (need_issue)
	  issue_caps(in);
	if (cap)
	  cap->dec_suppress();
      }
      if (!in->filelock.can_wrlock(client) &&
	  !in->filelock.can_force_wrlock(client)) {
	C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in,
							 forced_change_max ? new_max : 0,
							 0, utime_t());

	in->filelock.add_waiter(SimpleLock::WAIT_STABLE, cms);
	change_max = false;
      }
    }
  }

  // Piggy-backed file lock state: fcntl locks first, then flock locks.
  if (m->flockbl.length()) {
    int32_t num_locks;
    bufferlist::iterator bli = m->flockbl.begin();
    ::decode(num_locks, bli);
    for ( int i=0; i < num_locks; ++i) {
      ceph_filelock decoded_lock;
      ::decode(decoded_lock, bli);
      in->get_fcntl_lock_state()->held_locks.
	insert(pair<uint64_t, ceph_filelock>(decoded_lock.start, decoded_lock));
      ++in->get_fcntl_lock_state()->client_held_lock_counts[(client_t)(decoded_lock.client)];
    }
    ::decode(num_locks, bli);
    for ( int i=0; i < num_locks; ++i) {
      ceph_filelock decoded_lock;
      ::decode(decoded_lock, bli);
      in->get_flock_lock_state()->held_locks.
	insert(pair<uint64_t, ceph_filelock>(decoded_lock.start, decoded_lock));
      ++in->get_flock_lock_state()->client_held_lock_counts[(client_t)(decoded_lock.client)];
    }
  }

  if (!dirty && !change_max)
    return false;  // nothing to journal

  Session *session = mds->get_session(m);
  if (session->check_access(in, MAY_WRITE,
			    m->caller_uid, m->caller_gid, NULL, 0, 0) < 0) {
    dout(10) << "check_access failed, dropping cap update on " << *in << dendl;
    return false;
  }

  // do the update.
  EUpdate *le = new EUpdate(mds->mdlog, "cap update");
  mds->mdlog->start_entry(le);

  bool xattr = (dirty & CEPH_CAP_XATTR_EXCL) &&
               m->xattrbl.length() &&
               m->head.xattr_version > in->get_projected_inode()->xattr_version;

  auto &pi = in->project_inode(xattr);
  pi.inode.version = in->pre_dirty();

  MutationRef mut(new MutationImpl());
  mut->ls = mds->mdlog->get_current_segment();

  _update_cap_fields(in, dirty, m, &pi.inode);

  if (change_max) {
    dout(7) << "  max_size " << old_max << " -> " << new_max
	    << " for " << *in << dendl;
    if (new_max) {
      auto &cr = pi.inode.client_ranges[client];
      cr.range.first = 0;
      cr.range.last = new_max;
      cr.follows = in->first - 1;
    } else
      pi.inode.client_ranges.erase(client);
  }

  if (change_max || (dirty & (CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)))
    wrlock_force(&in->filelock, mut);  // wrlock for duration of journal

  // auth
  if (dirty & CEPH_CAP_AUTH_EXCL)
    wrlock_force(&in->authlock, mut);

  // xattrs update?
  if (xattr) {
    dout(7) << " xattrs v" << pi.inode.xattr_version << " -> " << m->head.xattr_version << dendl;
    pi.inode.xattr_version = m->head.xattr_version;
    bufferlist::iterator p = m->xattrbl.begin();
    ::decode(*pi.xattrs, p);
    wrlock_force(&in->xattrlock, mut);
  }

  mut->auth_pin(in);
  mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows);
  mdcache->journal_dirty_inode(mut.get(), &le->metablob, in, follows);

  // "oldest flush tid" > 0 means client uses unique TID for each flush
  if (ack && ack->get_oldest_flush_tid() > 0)
    le->metablob.add_client_flush(metareqid_t(m->get_source(), ack->get_client_tid()),
				  ack->get_oldest_flush_tid());

  mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut,
							      change_max, !!cap,
							      client, ack));
  // Request an mdlog flush for max_size increases or other urgent dirties;
  // only ever sets *need_flush, never clears it.
  if (need_flush && !*need_flush &&
      ((change_max && new_max) || // max INCREASE
       _need_flush_mdlog(in, dirty)))
    *need_flush = true;

  return true;
}
3391
3392/* This function DOES put the passed message before returning */
3393void Locker::handle_client_cap_release(MClientCapRelease *m)
3394{
3395 client_t client = m->get_source().num();
3396 dout(10) << "handle_client_cap_release " << *m << dendl;
3397
3398 if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) {
3399 mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
3400 return;
3401 }
3402
3403 if (m->osd_epoch_barrier && !mds->objecter->have_map(m->osd_epoch_barrier)) {
3404 // Pause RADOS operations until we see the required epoch
3405 mds->objecter->set_epoch_barrier(m->osd_epoch_barrier);
3406 }
3407
3408 if (mds->get_osd_epoch_barrier() < m->osd_epoch_barrier) {
3409 // Record the barrier so that we will retransmit it to clients
3410 mds->set_osd_epoch_barrier(m->osd_epoch_barrier);
3411 }
3412
94b18763 3413 Session *session = mds->get_session(m);
7c673cae
FG
3414
3415 for (vector<ceph_mds_cap_item>::iterator p = m->caps.begin(); p != m->caps.end(); ++p) {
3416 _do_cap_release(client, inodeno_t((uint64_t)p->ino) , p->cap_id, p->migrate_seq, p->seq);
3417 }
3418
3419 if (session) {
3420 session->notify_cap_release(m->caps.size());
3421 }
3422
3423 m->put();
3424}
3425
// Completion context that retries a single deferred cap release (from
// _do_cap_release) once the inode is unfrozen.  Captures everything by
// value, so it is safe even if the original message is gone.
class C_Locker_RetryCapRelease : public LockerContext {
  client_t client;
  inodeno_t ino;
  uint64_t cap_id;
  ceph_seq_t migrate_seq;
  ceph_seq_t issue_seq;
public:
  C_Locker_RetryCapRelease(Locker *l, client_t c, inodeno_t i, uint64_t id,
			   ceph_seq_t mseq, ceph_seq_t seq) :
    LockerContext(l), client(c), ino(i), cap_id(id), migrate_seq(mseq), issue_seq(seq) {}
  void finish(int r) override {
    locker->_do_cap_release(client, ino, cap_id, migrate_seq, issue_seq);
  }
};
3440
// Apply one cap release: validate the inode/cap still match (cap_id, mseq,
// issue seq), defer if the inode is frozen, and finally remove the
// client's cap.  A seq mismatch only cleans revoke history rather than
// removing the cap, since the client may have been issued newer caps.
void Locker::_do_cap_release(client_t client, inodeno_t ino, uint64_t cap_id,
			     ceph_seq_t mseq, ceph_seq_t seq)
{
  CInode *in = mdcache->get_inode(ino);
  if (!in) {
    dout(7) << "_do_cap_release missing ino " << ino << dendl;
    return;
  }
  Capability *cap = in->get_client_cap(client);
  if (!cap) {
    dout(7) << "_do_cap_release no cap for client" << client << " on "<< *in << dendl;
    return;
  }

  dout(7) << "_do_cap_release for client." << client << " on "<< *in << dendl;
  if (cap->get_cap_id() != cap_id) {
    // Cap was reissued under a new id; this release is stale.
    dout(7) << " capid " << cap_id << " != " << cap->get_cap_id() << ", ignore" << dendl;
    return;
  }
  if (ceph_seq_cmp(mseq, cap->get_mseq()) < 0) {
    // Stale migration sequence.
    dout(7) << " mseq " << mseq << " < " << cap->get_mseq() << ", ignore" << dendl;
    return;
  }
  if (should_defer_client_cap_frozen(in)) {
    dout(7) << " freezing|frozen, deferring" << dendl;
    in->add_waiter(CInode::WAIT_UNFREEZE,
                  new C_Locker_RetryCapRelease(this, client, ino, cap_id, mseq, seq));
    return;
  }
  if (seq != cap->get_last_issue()) {
    dout(7) << " issue_seq " << seq << " != " << cap->get_last_issue() << dendl;
    // clean out any old revoke history
    cap->clean_revoke_from(seq);
    eval_cap_gather(in);
    return;
  }
  remove_client_cap(in, client);
}
3479
/* Note: unlike the message handlers above, remove_client_cap takes no
 * message at all — there is nothing to put here. */

// Remove a client's capability on an inode: flush out pending snapflush
// state for that client, drop the cap, then either reconcile the client
// byte range (auth) or update the auth MDS on our file caps (replica),
// and re-evaluate lock state.
void Locker::remove_client_cap(CInode *in, client_t client)
{
  // clean out any pending snapflush state
  if (!in->client_need_snapflush.empty())
    _do_null_snapflush(in, client);

  in->remove_client_cap(client);

  if (in->is_auth()) {
    // make sure we clear out the client byte range
    if (in->get_projected_inode()->client_ranges.count(client) &&
	!(in->inode.nlink == 0 && !in->is_any_caps()))    // unless it's unlink + stray
      check_inode_max_size(in);
  } else {
    // replica: tell the auth MDS our caps changed
    request_inode_file_caps(in);
  }

  try_eval(in, CEPH_CAP_LOCKS);
}
3501
3502
3503/**
3504 * Return true if any currently revoking caps exceed the
b32b8144 3505 * mds_session_timeout threshold.
7c673cae
FG
3506 */
3507bool Locker::any_late_revoking_caps(xlist<Capability*> const &revoking) const
3508{
3509 xlist<Capability*>::const_iterator p = revoking.begin();
3510 if (p.end()) {
3511 // No revoking caps at the moment
3512 return false;
3513 } else {
3514 utime_t now = ceph_clock_now();
3515 utime_t age = now - (*p)->get_last_revoke_stamp();
b32b8144 3516 if (age <= g_conf->mds_session_timeout) {
7c673cae
FG
3517 return false;
3518 } else {
3519 return true;
3520 }
3521 }
3522}
3523
3524
3525void Locker::get_late_revoking_clients(std::list<client_t> *result) const
3526{
3527 if (!any_late_revoking_caps(revoking_caps)) {
3528 // Fast path: no misbehaving clients, execute in O(1)
3529 return;
3530 }
3531
3532 // Slow path: execute in O(N_clients)
3533 std::map<client_t, xlist<Capability*> >::const_iterator client_rc_iter;
3534 for (client_rc_iter = revoking_caps_by_client.begin();
3535 client_rc_iter != revoking_caps_by_client.end(); ++client_rc_iter) {
3536 xlist<Capability*> const &client_rc = client_rc_iter->second;
3537 bool any_late = any_late_revoking_caps(client_rc);
3538 if (any_late) {
3539 result->push_back(client_rc_iter->first);
3540 }
3541 }
3542}
3543
// Hard-code instead of surfacing a config settings because this is
// really a hack that should go away at some point when we have better
// inspection tools for getting at detailed cap state (#7316)
#define MAX_WARN_CAPS 100

// Periodic tick: scan the revoking-caps list (oldest first) and log a
// cluster warning for clients that have not acked a revoke within
// mds_session_timeout, with exponential backoff between repeat warnings
// for the same cap.
void Locker::caps_tick()
{
  utime_t now = ceph_clock_now();

  dout(20) << __func__ << " " << revoking_caps.size() << " revoking caps" << dendl;

  int i = 0;
  for (xlist<Capability*>::iterator p = revoking_caps.begin(); !p.end(); ++p) {
    Capability *cap = *p;

    utime_t age = now - cap->get_last_revoke_stamp();
    dout(20) << __func__ << " age = " << age << cap->get_client() << "." << cap->get_inode()->ino() << dendl;
    if (age <= g_conf->mds_session_timeout) {
      // List is ordered by revoke stamp, so everything after this is newer.
      dout(20) << __func__ << " age below timeout " << g_conf->mds_session_timeout << dendl;
      break;
    } else {
      ++i;
      if (i > MAX_WARN_CAPS) {
	dout(1) << __func__ << " more than " << MAX_WARN_CAPS << " caps are late"
	  << "revoking, ignoring subsequent caps" << dendl;
	break;
      }
    }
    // exponential backoff of warning intervals
    if (age > g_conf->mds_session_timeout * (1 << cap->get_num_revoke_warnings())) {
      cap->inc_num_revoke_warnings();
      stringstream ss;
      ss << "client." << cap->get_client() << " isn't responding to mclientcaps(revoke), ino "
	 << cap->get_inode()->ino() << " pending " << ccap_string(cap->pending())
	 << " issued " << ccap_string(cap->issued()) << ", sent " << age << " seconds ago";
      mds->clog->warn() << ss.str();
      dout(20) << __func__ << " " << ss.str() << dendl;
    } else {
      dout(20) << __func__ << " silencing log message (backoff) for " << cap->get_client() << "." << cap->get_inode()->ino() << dendl;
    }
  }
}
3586
3587
// Handle an MClientLease from a client: look up the dentry the lease is
// on and either drop the lease (RELEASE / REVOKE_ACK) or renew it
// (RENEW, replying by reusing the same message).  The message is consumed
// on all paths except RENEW, where it is re-sent to the client.
void Locker::handle_client_lease(MClientLease *m)
{
  dout(10) << "handle_client_lease " << *m << dendl;

  assert(m->get_source().is_client());
  client_t client = m->get_source().num();

  CInode *in = mdcache->get_inode(m->get_ino(), m->get_last());
  if (!in) {
    dout(7) << "handle_client_lease don't have ino " << m->get_ino() << "." << m->get_last() << dendl;
    m->put();
    return;
  }
  CDentry *dn = 0;

  frag_t fg = in->pick_dirfrag(m->dname);
  CDir *dir = in->get_dirfrag(fg);
  if (dir)
    dn = dir->lookup(m->dname);
  if (!dn) {
    dout(7) << "handle_client_lease don't have dn " << m->get_ino() << " " << m->dname << dendl;
    m->put();
    return;
  }
  dout(10) << " on " << *dn << dendl;

  // replica and lock
  ClientLease *l = dn->get_client_lease(client);
  if (!l) {
    dout(7) << "handle_client_lease didn't have lease for client." << client << " of " << *dn << dendl;
    m->put();
    return;
  }

  switch (m->get_action()) {
  case CEPH_MDS_LEASE_REVOKE_ACK:
  case CEPH_MDS_LEASE_RELEASE:
    // Only drop the lease if the seq matches; otherwise it's stale.
    if (l->seq != m->get_seq()) {
      dout(7) << "handle_client_lease release - seq " << l->seq << " != provided " << m->get_seq() << dendl;
    } else {
      dout(7) << "handle_client_lease client." << client
	      << " on " << *dn << dendl;
      dn->remove_client_lease(l, this);
    }
    m->put();
    break;

  case CEPH_MDS_LEASE_RENEW:
    {
      dout(7) << "handle_client_lease client." << client << " renew on " << *dn
	      << (!dn->lock.can_lease(client)?", revoking lease":"") << dendl;
      if (dn->lock.can_lease(client)) {
	int pool = 1;   // fixme.. do something smart!
	// Reuse the incoming message as the reply with refreshed duration/seq.
	m->h.duration_ms = (int)(1000 * mdcache->client_lease_durations[pool]);
	m->h.seq = ++l->seq;
	m->clear_payload();

	utime_t now = ceph_clock_now();
	now += mdcache->client_lease_durations[pool];
	mdcache->touch_client_lease(l, pool, now);

	mds->send_message_client_counted(m, m->get_connection());
      }
    }
    break;

  default:
    ceph_abort(); // implement me
    break;
  }
}
3659
3660
// Encode a dentry lease (or a null lease) for the client into bl.  A real
// lease is only issued when the parent directory isn't a stray dir, the
// client isn't already covered by the dir's filelock lease / shared caps,
// and the dentry lock permits leasing.
void Locker::issue_client_lease(CDentry *dn, client_t client,
			       bufferlist &bl, utime_t now, Session *session)
{
  CInode *diri = dn->get_dir()->get_inode();
  if (!diri->is_stray() &&  // do not issue dn leases in stray dir!
      ((!diri->filelock.can_lease(client) &&
	(diri->get_client_cap_pending(client) & (CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_EXCL)) == 0)) &&
      dn->lock.can_lease(client)) {
    int pool = 1;   // fixme.. do something smart!
    // issue a dentry lease
    ClientLease *l = dn->add_client_lease(client, session);
    session->touch_lease(l);

    now += mdcache->client_lease_durations[pool];
    mdcache->touch_client_lease(l, pool, now);

    LeaseStat e;
    e.mask = 1 | CEPH_LOCK_DN;  // old and new bit values
    e.seq = ++l->seq;
    e.duration_ms = (int)(1000 * mdcache->client_lease_durations[pool]);
    ::encode(e, bl);
    dout(20) << "issue_client_lease seq " << e.seq << " dur " << e.duration_ms << "ms "
	     << " on " << *dn << dendl;
  } else {
    // null lease
    LeaseStat e;
    e.mask = 0;
    e.seq = 0;
    e.duration_ms = 0;
    ::encode(e, bl);
    dout(20) << "issue_client_lease no/null lease on " << *dn << dendl;
  }
}
3694
3695
3696void Locker::revoke_client_leases(SimpleLock *lock)
3697{
3698 int n = 0;
3699 CDentry *dn = static_cast<CDentry*>(lock->get_parent());
3700 for (map<client_t, ClientLease*>::iterator p = dn->client_lease_map.begin();
3701 p != dn->client_lease_map.end();
3702 ++p) {
3703 ClientLease *l = p->second;
3704
3705 n++;
3706 assert(lock->get_type() == CEPH_LOCK_DN);
3707
3708 CDentry *dn = static_cast<CDentry*>(lock->get_parent());
3709 int mask = 1 | CEPH_LOCK_DN; // old and new bits
3710
3711 // i should also revoke the dir ICONTENT lease, if they have it!
3712 CInode *diri = dn->get_dir()->get_inode();
3713 mds->send_message_client_counted(new MClientLease(CEPH_MDS_LEASE_REVOKE, l->seq,
3714 mask,
3715 diri->ino(),
3716 diri->first, CEPH_NOSNAP,
3717 dn->get_name()),
3718 l->client);
3719 }
7c673cae
FG
3720}
3721
3722
3723
3724// locks ----------------------------------------------------------------
3725
// Map an (lock_type, MDSCacheObjectInfo) pair from a wire message to the
// in-memory SimpleLock it refers to.  Returns nullptr when the object is
// no longer in cache (e.g. trimmed); aborts on an unknown lock type.
SimpleLock *Locker::get_lock(int lock_type, MDSCacheObjectInfo &info)
{
  switch (lock_type) {
  case CEPH_LOCK_DN:
    {
      // be careful; info.dirfrag may have incorrect frag; recalculate based on dname.
      CInode *diri = mdcache->get_inode(info.dirfrag.ino);
      frag_t fg;
      CDir *dir = 0;
      CDentry *dn = 0;
      if (diri) {
	fg = diri->pick_dirfrag(info.dname);
	dir = diri->get_dirfrag(fg);
	if (dir)
	  dn = dir->lookup(info.dname, info.snapid);
      }
      if (!dn) {
	dout(7) << "get_lock don't have dn " << info.dirfrag.ino << " " << info.dname << dendl;
	return 0;
      }
      return &dn->lock;
    }

  case CEPH_LOCK_IAUTH:
  case CEPH_LOCK_ILINK:
  case CEPH_LOCK_IDFT:
  case CEPH_LOCK_IFILE:
  case CEPH_LOCK_INEST:
  case CEPH_LOCK_IXATTR:
  case CEPH_LOCK_ISNAP:
  case CEPH_LOCK_IFLOCK:
  case CEPH_LOCK_IPOLICY:
    {
      // All inode locks: resolve the inode once, then pick the member lock.
      CInode *in = mdcache->get_inode(info.ino, info.snapid);
      if (!in) {
	dout(7) << "get_lock don't have ino " << info.ino << dendl;
	return 0;
      }
      switch (lock_type) {
      case CEPH_LOCK_IAUTH: return &in->authlock;
      case CEPH_LOCK_ILINK: return &in->linklock;
      case CEPH_LOCK_IDFT: return &in->dirfragtreelock;
      case CEPH_LOCK_IFILE: return &in->filelock;
      case CEPH_LOCK_INEST: return &in->nestlock;
      case CEPH_LOCK_IXATTR: return &in->xattrlock;
      case CEPH_LOCK_ISNAP: return &in->snaplock;
      case CEPH_LOCK_IFLOCK: return &in->flocklock;
      case CEPH_LOCK_IPOLICY: return &in->policylock;
      }
    }

  default:
    dout(7) << "get_lock don't know lock_type " << lock_type << dendl;
    ceph_abort();
    break;
  }

  return 0;
}
3785
/* This function DOES put the passed message before returning */
// Top-level MLock dispatcher: resolve the referenced lock and route the
// message to the simple-lock or file/scatter-lock handler based on the
// lock's type.
void Locker::handle_lock(MLock *m)
{
  // nobody should be talking to us during recovery.
  assert(mds->is_rejoin() || mds->is_clientreplay() || mds->is_active() || mds->is_stopping());

  SimpleLock *lock = get_lock(m->get_lock_type(), m->get_object_info());
  if (!lock) {
    dout(10) << "don't have object " << m->get_object_info() << ", must have trimmed, dropping" << dendl;
    m->put();
    return;
  }

  switch (lock->get_type()) {
  case CEPH_LOCK_DN:
  case CEPH_LOCK_IAUTH:
  case CEPH_LOCK_ILINK:
  case CEPH_LOCK_ISNAP:
  case CEPH_LOCK_IXATTR:
  case CEPH_LOCK_IFLOCK:
  case CEPH_LOCK_IPOLICY:
    handle_simple_lock(lock, m);
    break;

  case CEPH_LOCK_IDFT:
  case CEPH_LOCK_INEST:
    //handle_scatter_lock((ScatterLock*)lock, m);
    //break;
    // NOTE: IDFT/INEST deliberately fall through to the file-lock handler.

  case CEPH_LOCK_IFILE:
    handle_file_lock(static_cast<ScatterLock*>(lock), m);
    break;

  default:
    dout(7) << "handle_lock got otype " << m->get_lock_type() << dendl;
    ceph_abort();
    break;
  }
}
3825
3826
3827
3828
3829
3830// ==========================================================================
3831// simple lock
3832
/** This function may take a reference to m if it needs one, but does
 * not put references.
 *
 * Handle a replica's request that the auth move this lock to SYNC so
 * the replica can take a rdlock locally.
 */
void Locker::handle_reqrdlock(SimpleLock *lock, MLock *m)
{
  MDSCacheObject *parent = lock->get_parent();
  // only act if we are auth, the lock is not already SYNC, and the
  // parent is not frozen (frozen objects must not change lock state)
  if (parent->is_auth() &&
      lock->get_state() != LOCK_SYNC &&
      !parent->is_frozen()) {
    dout(7) << "handle_reqrdlock got rdlock request on " << *lock
	    << " on " << *parent << dendl;
    assert(parent->is_auth()); // replica auth pinned if they're doing this!
    if (lock->is_stable()) {
      simple_sync(lock);
    } else {
      // mid-transition: retry the message once the lock stabilizes/unfreezes
      dout(7) << "handle_reqrdlock delaying request until lock is stable" << dendl;
      lock->add_waiter(SimpleLock::WAIT_STABLE | MDSCacheObject::WAIT_UNFREEZE,
                       new C_MDS_RetryMessage(mds, m->get()));
    }
  } else {
    // can't satisfy the request right now; silently drop it
    dout(7) << "handle_reqrdlock dropping rdlock request on " << *lock
	    << " on " << *parent << dendl;
    // replica should retry
  }
}
3857
/* This function DOES put the passed message before returning */
void Locker::handle_simple_lock(SimpleLock *lock, MLock *m)
{
  int from = m->get_asker();

  dout(10) << "handle_simple_lock " << *m
	   << " on " << *lock << " " << *lock->get_parent() << dendl;

  if (mds->is_rejoin()) {
    if (lock->get_parent()->is_rejoining()) {
      // lock state for this object will be reestablished by rejoin; drop
      dout(7) << "handle_simple_lock still rejoining " << *lock->get_parent()
	      << ", dropping " << *m << dendl;
      m->put();
      return;
    }
  }

  switch (m->get_action()) {
    // -- replica --
  case LOCK_AC_SYNC:
    // auth moved to SYNC; adopt the shipped state and wake readers
    assert(lock->get_state() == LOCK_LOCK);
    lock->decode_locked_state(m->get_data());
    lock->set_state(LOCK_SYNC);
    lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
    break;

  case LOCK_AC_LOCK:
    // auth is gathering for LOCK; start our local sync->lock transition
    assert(lock->get_state() == LOCK_SYNC);
    lock->set_state(LOCK_SYNC_LOCK);
    if (lock->is_leased())
      revoke_client_leases(lock);
    eval_gather(lock, true);
    if (lock->is_unstable_and_locked())
      mds->mdlog->flush();
    break;


    // -- auth --
  case LOCK_AC_LOCKACK:
    // a replica finished its part of our gather
    assert(lock->get_state() == LOCK_SYNC_LOCK ||
	   lock->get_state() == LOCK_SYNC_EXCL);
    assert(lock->is_gathering(from));
    lock->remove_gather(from);

    if (lock->is_gathering()) {
      dout(7) << "handle_simple_lock " << *lock << " on " << *lock->get_parent() << " from " << from
	      << ", still gathering " << lock->get_gather_set() << dendl;
    } else {
      // last ack: re-evaluate to complete the transition
      dout(7) << "handle_simple_lock " << *lock << " on " << *lock->get_parent() << " from " << from
	      << ", last one" << dendl;
      eval_gather(lock);
    }
    break;

  case LOCK_AC_REQRDLOCK:
    handle_reqrdlock(lock, m);
    break;

  }

  m->put();
}
3920
3921/* unused, currently.
3922
3923class C_Locker_SimpleEval : public Context {
3924 Locker *locker;
3925 SimpleLock *lock;
3926public:
3927 C_Locker_SimpleEval(Locker *l, SimpleLock *lk) : locker(l), lock(lk) {}
3928 void finish(int r) {
3929 locker->try_simple_eval(lock);
3930 }
3931};
3932
3933void Locker::try_simple_eval(SimpleLock *lock)
3934{
3935 // unstable and ambiguous auth?
3936 if (!lock->is_stable() &&
3937 lock->get_parent()->is_ambiguous_auth()) {
3938 dout(7) << "simple_eval not stable and ambiguous auth, waiting on " << *lock->get_parent() << dendl;
3939 //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
3940 lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_SimpleEval(this, lock));
3941 return;
3942 }
3943
3944 if (!lock->get_parent()->is_auth()) {
3945 dout(7) << "try_simple_eval not auth for " << *lock->get_parent() << dendl;
3946 return;
3947 }
3948
3949 if (!lock->get_parent()->can_auth_pin()) {
3950 dout(7) << "try_simple_eval can't auth_pin, waiting on " << *lock->get_parent() << dendl;
3951 //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
3952 lock->get_parent()->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_SimpleEval(this, lock));
3953 return;
3954 }
3955
3956 if (lock->is_stable())
3957 simple_eval(lock);
3958}
3959*/
3960
3961
/**
 * Re-evaluate a stable simple lock on the auth MDS and move it toward
 * the state best matching current client cap wants (EXCL vs SYNC).
 * If need_issue is non-null, cap issuing is deferred to the caller.
 */
void Locker::simple_eval(SimpleLock *lock, bool *need_issue)
{
  dout(10) << "simple_eval " << *lock << " on " << *lock->get_parent() << dendl;

  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  if (lock->get_parent()->is_freezing_or_frozen()) {
    // dentry lock in unreadable state can block path traverse
    if ((lock->get_type() != CEPH_LOCK_DN ||
	 lock->get_state() == LOCK_SYNC ||
	 lock->get_parent()->is_frozen()))
      return;
  }

  if (mdcache->is_readonly()) {
    // read-only FS: everything should be readable, so force SYNC
    if (lock->get_state() != LOCK_SYNC) {
      dout(10) << "simple_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
      simple_sync(lock, need_issue);
    }
    return;
  }

  // for inode locks, consult caps wanted at this lock's cap shift
  CInode *in = 0;
  int wanted = 0;
  if (lock->get_type() != CEPH_LOCK_DN) {
    in = static_cast<CInode*>(lock->get_parent());
    in->get_caps_wanted(&wanted, NULL, lock->get_cap_shift());
  }

  // -> excl?
  if (lock->get_state() != LOCK_EXCL &&
      in && in->get_target_loner() >= 0 &&
      (wanted & CEPH_CAP_GEXCL)) {
    dout(7) << "simple_eval stable, going to excl " << *lock
	    << " on " << *lock->get_parent() << dendl;
    simple_excl(lock, need_issue);
  }

  // stable -> sync?
  else if (lock->get_state() != LOCK_SYNC &&
	   !lock->is_wrlocked() &&
	   ((!(wanted & CEPH_CAP_GEXCL) && !lock->is_waiter_for(SimpleLock::WAIT_WR)) ||
	    (lock->get_state() == LOCK_EXCL && in && in->get_target_loner() < 0))) {
    dout(7) << "simple_eval stable, syncing " << *lock
	    << " on " << *lock->get_parent() << dendl;
    simple_sync(lock, need_issue);
  }
}
4011
4012
4013// mid
4014
/**
 * Transition a stable auth lock toward SYNC.  May need to gather from
 * wrlockers, replicas, and clients holding conflicting caps first.
 *
 * @return true if the lock reached SYNC immediately, false if a gather
 *         (or dirty-data writeback) was started and completion is async.
 */
bool Locker::simple_sync(SimpleLock *lock, bool *need_issue)
{
  dout(7) << "simple_sync on " << *lock << " on " << *lock->get_parent() << dendl;
  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  // only inode locks with a cap shift interact with client caps
  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  int old_state = lock->get_state();

  if (old_state != LOCK_TSYN) {

    // enter the appropriate intermediate *_SYNC state
    switch (lock->get_state()) {
    case LOCK_MIX: lock->set_state(LOCK_MIX_SYNC); break;
    case LOCK_LOCK: lock->set_state(LOCK_LOCK_SYNC); break;
    case LOCK_XSYN: lock->set_state(LOCK_XSYN_SYNC); break;
    case LOCK_EXCL: lock->set_state(LOCK_EXCL_SYNC); break;
    default: ceph_abort();
    }

    // count outstanding obstacles we must wait for
    int gather = 0;
    if (lock->is_wrlocked())
      gather++;

    if (lock->get_parent()->is_replicated() && old_state == LOCK_MIX) {
      // replicas hold scattered state; ask them to sync back
      send_lock_message(lock, LOCK_AC_SYNC);
      lock->init_gather();
      gather++;
    }

    if (in && in->is_head()) {
      if (in->issued_caps_need_gather(lock)) {
	// some client caps conflict with SYNC; reissue to revoke them
	if (need_issue)
	  *need_issue = true;
	else
	  issue_caps(in);
	gather++;
      }
    }

    bool need_recover = false;
    if (lock->get_type() == CEPH_LOCK_IFILE) {
      assert(in);
      if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
        mds->mdcache->queue_file_recover(in);
        need_recover = true;
        gather++;
      }
    }

    if (!gather && lock->is_dirty()) {
      // nothing to gather but scattered data is dirty: write it behind first
      lock->get_parent()->auth_pin(lock);
      scatter_writebehind(static_cast<ScatterLock*>(lock));
      mds->mdlog->flush();
      return false;
    }

    if (gather) {
      // stay in the intermediate state until eval_gather completes it
      lock->get_parent()->auth_pin(lock);
      if (need_recover)
	mds->mdcache->do_file_recover();
      return false;
    }
  }

  if (lock->get_parent()->is_replicated()) { // FIXME
    // ship the locked state to replicas along with the SYNC notification
    bufferlist data;
    lock->encode_locked_state(data);
    send_lock_message(lock, LOCK_AC_SYNC, data);
  }
  lock->set_state(LOCK_SYNC);
  lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
  if (in && in->is_head()) {
    if (need_issue)
      *need_issue = true;
    else
      issue_caps(in);
  }
  return true;
}
4097
/**
 * Transition a stable auth lock toward EXCL (exclusive to the loner
 * client).  Gathers from rd/wrlockers, replicas, and conflicting caps.
 */
void Locker::simple_excl(SimpleLock *lock, bool *need_issue)
{
  dout(7) << "simple_excl on " << *lock << " on " << *lock->get_parent() << dendl;
  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  // enter the appropriate intermediate *_EXCL state
  switch (lock->get_state()) {
  case LOCK_LOCK: lock->set_state(LOCK_LOCK_EXCL); break;
  case LOCK_SYNC: lock->set_state(LOCK_SYNC_EXCL); break;
  case LOCK_XSYN: lock->set_state(LOCK_XSYN_EXCL); break;
  default: ceph_abort();
  }

  int gather = 0;
  if (lock->is_rdlocked())
    gather++;
  if (lock->is_wrlocked())
    gather++;

  if (lock->get_parent()->is_replicated() &&
      lock->get_state() != LOCK_LOCK_EXCL &&
      lock->get_state() != LOCK_XSYN_EXCL) {
    // replicas may hold readable state; tell them to LOCK and wait for acks
    send_lock_message(lock, LOCK_AC_LOCK);
    lock->init_gather();
    gather++;
  }

  if (in && in->is_head()) {
    if (in->issued_caps_need_gather(lock)) {
      // revoke client caps that conflict with EXCL
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
      gather++;
    }
  }

  if (gather) {
    // wait for gather; eval_gather finishes the transition
    lock->get_parent()->auth_pin(lock);
  } else {
    lock->set_state(LOCK_EXCL);
    lock->finish_waiters(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE);
    if (in) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
    }
  }
}
4152
/**
 * Transition a stable auth lock toward LOCK (exclusively held by the
 * MDS).  Revokes leases, gathers from rdlockers/replicas/caps, and for
 * scatterlocks may first write dirty scattered data behind.
 */
void Locker::simple_lock(SimpleLock *lock, bool *need_issue)
{
  dout(7) << "simple_lock on " << *lock << " on " << *lock->get_parent() << dendl;
  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());
  assert(lock->get_state() != LOCK_LOCK);

  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  int old_state = lock->get_state();

  // enter the appropriate intermediate *_LOCK state
  switch (lock->get_state()) {
  case LOCK_SYNC: lock->set_state(LOCK_SYNC_LOCK); break;
  case LOCK_XSYN: lock->set_state(LOCK_XSYN_LOCK); break;
  case LOCK_EXCL: lock->set_state(LOCK_EXCL_LOCK); break;
  case LOCK_MIX: lock->set_state(LOCK_MIX_LOCK);
    (static_cast<ScatterLock *>(lock))->clear_unscatter_wanted();
    break;
  case LOCK_TSYN: lock->set_state(LOCK_TSYN_LOCK); break;
  default: ceph_abort();
  }

  int gather = 0;
  if (lock->is_leased()) {
    // client leases allow reads; must be revoked before LOCK
    gather++;
    revoke_client_leases(lock);
  }
  if (lock->is_rdlocked())
    gather++;
  if (in && in->is_head()) {
    if (in->issued_caps_need_gather(lock)) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
      gather++;
    }
  }

  bool need_recover = false;
  if (lock->get_type() == CEPH_LOCK_IFILE) {
    assert(in);
    if(in->state_test(CInode::STATE_NEEDSRECOVER)) {
      mds->mdcache->queue_file_recover(in);
      need_recover = true;
      gather++;
    }
  }

  if (lock->get_parent()->is_replicated() &&
      lock->get_state() == LOCK_MIX_LOCK &&
      gather) {
    // mix->lock is two-stage: finish the local gather before asking replicas
    dout(10) << " doing local stage of mix->lock gather before gathering from replicas" << dendl;
  } else {
    // move to second stage of gather now, so we don't send the lock action later.
    if (lock->get_state() == LOCK_MIX_LOCK)
      lock->set_state(LOCK_MIX_LOCK2);

    if (lock->get_parent()->is_replicated() &&
	lock->get_sm()->states[old_state].replica_state != LOCK_LOCK) {  // replica may already be LOCK
      gather++;
      send_lock_message(lock, LOCK_AC_LOCK);
      lock->init_gather();
    }
  }

  if (!gather && lock->is_dirty()) {
    // dirty scattered data must be journaled back before completing
    lock->get_parent()->auth_pin(lock);
    scatter_writebehind(static_cast<ScatterLock*>(lock));
    mds->mdlog->flush();
    return;
  }

  if (gather) {
    lock->get_parent()->auth_pin(lock);
    if (need_recover)
      mds->mdcache->do_file_recover();
  } else {
    lock->set_state(LOCK_LOCK);
    lock->finish_waiters(ScatterLock::WAIT_XLOCK|ScatterLock::WAIT_WR|ScatterLock::WAIT_STABLE);
  }
}
4237
4238
/**
 * Begin moving an auth lock toward XLOCK (exclusive lock for a single
 * request).  Starts from LOCK/XLOCKDONE and gathers rd/wrlockers and
 * conflicting client caps; ends in PREXLOCK when nothing to gather.
 */
void Locker::simple_xlock(SimpleLock *lock)
{
  dout(7) << "simple_xlock on " << *lock << " on " << *lock->get_parent() << dendl;
  assert(lock->get_parent()->is_auth());
  //assert(lock->is_stable());
  assert(lock->get_state() != LOCK_XLOCK);

  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  // pin only when leaving a stable state (unstable locks are already pinned)
  if (lock->is_stable())
    lock->get_parent()->auth_pin(lock);

  switch (lock->get_state()) {
  case LOCK_LOCK:
  case LOCK_XLOCKDONE: lock->set_state(LOCK_LOCK_XLOCK); break;
  default: ceph_abort();
  }

  int gather = 0;
  if (lock->is_rdlocked())
    gather++;
  if (lock->is_wrlocked())
    gather++;

  if (in && in->is_head()) {
    if (in->issued_caps_need_gather(lock)) {
      // revoke conflicting client caps
      issue_caps(in);
      gather++;
    }
  }

  if (!gather) {
    lock->set_state(LOCK_PREXLOCK);
    //assert("shouldn't be called if we are already xlockable" == 0);
  }
}
4277
4278
4279
4280
4281
4282// ==========================================================================
4283// scatter lock
4284
4285/*
4286
4287Some notes on scatterlocks.
4288
4289 - The scatter/gather is driven by the inode lock. The scatter always
4290 brings in the latest metadata from the fragments.
4291
4292 - When in a scattered/MIX state, fragments are only allowed to
4293 update/be written to if the accounted stat matches the inode's
4294 current version.
4295
4296 - That means, on gather, we _only_ assimilate diffs for frag metadata
4297 that match the current version, because those are the only ones
4298 written during this scatter/gather cycle. (Others didn't permit
4299 it.) We increment the version and journal this to disk.
4300
4301 - When possible, we also simultaneously update our local frag
4302 accounted stats to match.
4303
4304 - On scatter, the new inode info is broadcast to frags, both local
4305 and remote. If possible (auth and !frozen), the dirfrag auth
4306 should update the accounted state (if it isn't already up to date).
4307 Note that this may occur on both the local inode auth node and
4308 inode replicas, so there are two potential paths. If it is NOT
4309 possible, they need to mark_stale to prevent any possible writes.
4310
4311 - A scatter can be to MIX (potentially writeable) or to SYNC (read
4312 only). Both are opportunities to update the frag accounted stats,
4313 even though only the MIX case is affected by a stale dirfrag.
4314
4315 - Because many scatter/gather cycles can potentially go by without a
4316 frag being able to update its accounted stats (due to being frozen
4317 by exports/refragments in progress), the frag may have (even very)
4318 old stat versions. That's fine. If when we do want to update it,
4319 we can update accounted_* and the version first.
4320
4321*/
4322
// Log-commit callback for scatter_writebehind(): once the EUpdate is
// journaled, finish the writebehind (apply projected inode, drop locks).
class C_Locker_ScatterWB : public LockerLogContext {
  ScatterLock *lock;   // the scatterlock being flushed
  MutationRef mut;     // mutation carrying the forced wrlock and projections
public:
  C_Locker_ScatterWB(Locker *l, ScatterLock *sl, MutationRef& m) :
    LockerLogContext(l), lock(sl), mut(m) {}
  void finish(int r) override {
    locker->scatter_writebehind_finish(lock, mut);
  }
};
4333
/**
 * Flush dirty scattered (fragment-accumulated) state back into the
 * inode: project the inode, fold in the gathered stats, and journal an
 * EUpdate.  Completion continues in scatter_writebehind_finish().
 */
void Locker::scatter_writebehind(ScatterLock *lock)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  dout(10) << "scatter_writebehind " << in->inode.mtime << " on " << *lock << " on " << *in << dendl;

  // journal
  MutationRef mut(new MutationImpl());
  mut->ls = mds->mdlog->get_current_segment();

  // forcefully take a wrlock
  lock->get_wrlock(true);
  mut->wrlocks.insert(lock);
  mut->locks.insert(lock);

  in->pre_cow_old_inode();  // avoid cow mayhem

  auto &pi = in->project_inode();
  pi.inode.version = in->pre_dirty();

  // fold fragment stats into the (projected) inode and mark the flush started
  in->finish_scatter_gather_update(lock->get_type());
  lock->start_flush();

  EUpdate *le = new EUpdate(mds->mdlog, "scatter_writebehind");
  mds->mdlog->start_entry(le);

  mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY);
  mdcache->journal_dirty_inode(mut.get(), &le->metablob, in);
  
  // also journal the updated frag accounted_* stats
  in->finish_scatter_gather_update_accounted(lock->get_type(), mut, &le->metablob);

  mds->mdlog->submit_entry(le, new C_Locker_ScatterWB(this, lock, mut));
}
4366
/**
 * Journal-committed continuation of scatter_writebehind(): pop the
 * projected inode, finish the flush, notify replicas if needed, and
 * release the forced wrlock.
 */
void Locker::scatter_writebehind_finish(ScatterLock *lock, MutationRef& mut)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  dout(10) << "scatter_writebehind_finish on " << *lock << " on " << *in << dendl;
  in->pop_and_dirty_projected_inode(mut->ls);

  lock->finish_flush();

  // if replicas may have flushed in a mix->lock state, send another
  // message so they can finish_flush().
  if (in->is_replicated()) {
    switch (lock->get_state()) {
    case LOCK_MIX_LOCK:
    case LOCK_MIX_LOCK2:
    case LOCK_MIX_EXCL:
    case LOCK_MIX_TSYN:
      send_lock_message(lock, LOCK_AC_LOCKFLUSHED);
    }
  }

  mut->apply();
  drop_locks(mut.get());
  mut->cleanup();

  if (lock->is_stable())
    lock->finish_waiters(ScatterLock::WAIT_STABLE);

  //scatter_eval_gather(lock);
}
4396
/**
 * Re-evaluate a stable scatterlock on the auth MDS and pick its best
 * steady state (MIX for scattered writes, LOCK, or SYNC).
 */
void Locker::scatter_eval(ScatterLock *lock, bool *need_issue)
{
  dout(10) << "scatter_eval " << *lock << " on " << *lock->get_parent() << dendl;

  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  if (lock->get_parent()->is_freezing_or_frozen()) {
    dout(20) << " freezing|frozen" << dendl;
    return;
  }

  if (mdcache->is_readonly()) {
    // read-only FS: keep everything readable
    if (lock->get_state() != LOCK_SYNC) {
      dout(10) << "scatter_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
      simple_sync(lock, need_issue);
    }
    return;
  }

  if (!lock->is_rdlocked() &&
      lock->get_state() != LOCK_MIX &&
      lock->get_scatter_wanted()) {
    // someone explicitly asked for a scatter; grant it
    dout(10) << "scatter_eval scatter_wanted, bump to mix " << *lock
	     << " on " << *lock->get_parent() << dendl;
    scatter_mix(lock, need_issue);
    return;
  }

  if (lock->get_type() == CEPH_LOCK_INEST) {
    // in general, we want to keep INEST writable at all times.
    if (!lock->is_rdlocked()) {
      if (lock->get_parent()->is_replicated()) {
	// replicated: MIX lets all replicas write
	if (lock->get_state() != LOCK_MIX)
	  scatter_mix(lock, need_issue);
      } else {
	// unreplicated: plain LOCK suffices for local writes
	if (lock->get_state() != LOCK_LOCK)
	  simple_lock(lock, need_issue);
      }
    }
    return;
  }

  CInode *in = static_cast<CInode*>(lock->get_parent());
  if (!in->has_subtree_or_exporting_dirfrag() || in->is_base()) {
    // i _should_ be sync.
    if (!lock->is_wrlocked() &&
	lock->get_state() != LOCK_SYNC) {
      dout(10) << "scatter_eval no wrlocks|xlocks, not subtree root inode, syncing" << dendl;
      simple_sync(lock, need_issue);
    }
  }
}
4450
4451
4452/*
4453 * mark a scatterlock to indicate that the dir fnode has some dirty data
4454 */
4455void Locker::mark_updated_scatterlock(ScatterLock *lock)
4456{
4457 lock->mark_dirty();
4458 if (lock->get_updated_item()->is_on_list()) {
4459 dout(10) << "mark_updated_scatterlock " << *lock
4460 << " - already on list since " << lock->get_update_stamp() << dendl;
4461 } else {
4462 updated_scatterlocks.push_back(lock->get_updated_item());
4463 utime_t now = ceph_clock_now();
4464 lock->set_update_stamp(now);
4465 dout(10) << "mark_updated_scatterlock " << *lock
4466 << " - added at " << now << dendl;
4467 }
4468}
4469
4470/*
4471 * this is called by scatter_tick and LogSegment::try_to_trim() when
4472 * trying to flush dirty scattered data (i.e. updated fnode) back to
4473 * the inode.
4474 *
4475 * we need to lock|scatter in order to push fnode changes into the
4476 * inode.dirstat.
4477 */
/**
 * Nudge a scatterlock through a lock/scatter cycle so dirty fragment
 * data gets pushed into the inode (see comment above callers).  If @c
 * is non-null the caller wants a callback once the lock is stable.
 */
void Locker::scatter_nudge(ScatterLock *lock, MDSInternalContextBase *c, bool forcelockchange)
{
  CInode *p = static_cast<CInode *>(lock->get_parent());

  if (p->is_frozen() || p->is_freezing()) {
    // can't change lock state while frozen; wait or requeue
    dout(10) << "scatter_nudge waiting for unfreeze on " << *p << dendl;
    if (c)
      p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, c);
    else if (lock->is_dirty())
      // just requeue.  not ideal.. starvation prone..
      updated_scatterlocks.push_back(lock->get_updated_item());
    return;
  }

  if (p->is_ambiguous_auth()) {
    // mid-migration; wait for auth to settle or requeue
    dout(10) << "scatter_nudge waiting for single auth on " << *p << dendl;
    if (c)
      p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, c);
    else if (lock->is_dirty())
      // just requeue.  not ideal.. starvation prone..
      updated_scatterlocks.push_back(lock->get_updated_item());
    return;
  }

  if (p->is_auth()) {
    int count = 0;
    while (true) {
      if (lock->is_stable()) {
	// can we do it now?
	//  (only if we're not replicated.. if we are, we really do need
	//   to nudge the lock state!)
	/*
	  actually, even if we're not replicated, we can't stay in MIX, because another mds
	  could discover and replicate us at any time.  if that happens while we're flushing,
	  they end up in MIX but their inode has the old scatterstat version.

	if (!forcelockchange && !lock->get_parent()->is_replicated() && lock->can_wrlock(-1)) {
	  dout(10) << "scatter_nudge auth, propagating " << *lock << " on " << *p << dendl;
	  scatter_writebehind(lock);
	  if (c)
	    lock->add_waiter(SimpleLock::WAIT_STABLE, c);
	  return;
	}
	*/

	if (mdcache->is_readonly()) {
	  // read-only FS: just sync, no write-behind possible
	  if (lock->get_state() != LOCK_SYNC) {
	    dout(10) << "scatter_nudge auth, read-only FS, syncing " << *lock << " on " << *p << dendl;
	    simple_sync(static_cast<ScatterLock*>(lock));
	  }
	  break;
	}

	// adjust lock state
	dout(10) << "scatter_nudge auth, scatter/unscattering " << *lock << " on " << *p << dendl;
	switch (lock->get_type()) {
	case CEPH_LOCK_IFILE:
	  if (p->is_replicated() && lock->get_state() != LOCK_MIX)
	    scatter_mix(static_cast<ScatterLock*>(lock));
	  else if (lock->get_state() != LOCK_LOCK)
	    simple_lock(static_cast<ScatterLock*>(lock));
	  else
	    simple_sync(static_cast<ScatterLock*>(lock));
	  break;
	  
	case CEPH_LOCK_IDFT:
	case CEPH_LOCK_INEST:
	  if (p->is_replicated() && lock->get_state() != LOCK_MIX)
	    scatter_mix(lock);
	  else if (lock->get_state() != LOCK_LOCK)
	    simple_lock(lock);
	  else
	    simple_sync(lock);
	  break;
	default:
	  ceph_abort();
	}
	++count;
	if (lock->is_stable() && count == 2) {
	  dout(10) << "scatter_nudge oh, stable after two cycles." << dendl;
	  // this should only really happen when called via
	  // handle_file_lock due to AC_NUDGE, because the rest of the
	  // time we are replicated or have dirty data and won't get
	  // called.  bailing here avoids an infinite loop.
	  assert(!c); 
	  break;
	}
      } else {
	// unstable: wait for the in-flight transition to finish
	dout(10) << "scatter_nudge auth, waiting for stable " << *lock << " on " << *p << dendl;
	if (c)
	  lock->add_waiter(SimpleLock::WAIT_STABLE, c);
	return;
      }
    }
  } else {
    dout(10) << "scatter_nudge replica, requesting scatter/unscatter of " 
	     << *lock << " on " << *p << dendl;
    // request unscatter?
    mds_rank_t auth = lock->get_parent()->authority().first;
    if (!mds->is_cluster_degraded() ||
	mds->mdsmap->is_clientreplay_or_active_or_stopping(auth))
      mds->send_message_mds(new MLock(lock, LOCK_AC_NUDGE, mds->get_nodeid()), auth);

    // wait...
    if (c)
      lock->add_waiter(SimpleLock::WAIT_STABLE, c);

    // also, requeue, in case we had wrong auth or something
    if (lock->is_dirty())
      updated_scatterlocks.push_back(lock->get_updated_item());
  }
}
4590
/**
 * Periodic tick: walk the updated_scatterlocks queue and nudge dirty
 * scatterlocks whose update stamp is older than the nudge interval.
 */
void Locker::scatter_tick()
{
  dout(10) << "scatter_tick" << dendl;
  
  // updated
  utime_t now = ceph_clock_now();
  int n = updated_scatterlocks.size();
  while (!updated_scatterlocks.empty()) {
    ScatterLock *lock = updated_scatterlocks.front();

    if (n-- == 0) break;  // scatter_nudge() may requeue; avoid looping
    
    if (!lock->is_dirty()) {
      // flushed elsewhere in the meantime; drop it from the queue
      updated_scatterlocks.pop_front();
      dout(10) << " removing from updated_scatterlocks " 
	       << *lock << " " << *lock->get_parent() << dendl;
      continue;
    }
    // queue is in insertion order, so once we hit a young entry we can stop
    if (now - lock->get_update_stamp() < g_conf->mds_scatter_nudge_interval)
      break;
    updated_scatterlocks.pop_front();
    scatter_nudge(lock, 0);
  }
  mds->mdlog->flush();
}
4616
4617
/**
 * Transition a stable scatterlock toward TSYN (temporary sync).
 * NOTE: intentionally aborts — not fully implemented (see assert).
 */
void Locker::scatter_tempsync(ScatterLock *lock, bool *need_issue)
{
  dout(10) << "scatter_tempsync " << *lock
	   << " on " << *lock->get_parent() << dendl;
  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  assert(0 == "not fully implemented, at least not for filelock");

  CInode *in = static_cast<CInode *>(lock->get_parent());

  // enter the appropriate intermediate *_TSYN state
  switch (lock->get_state()) {
  case LOCK_SYNC: ceph_abort();   // this shouldn't happen
  case LOCK_LOCK: lock->set_state(LOCK_LOCK_TSYN); break;
  case LOCK_MIX: lock->set_state(LOCK_MIX_TSYN); break;
  default: ceph_abort();
  }

  int gather = 0;
  if (lock->is_wrlocked())
    gather++;

  if (lock->get_cap_shift() &&
      in->is_head() &&
      in->issued_caps_need_gather(lock)) {
    // revoke conflicting client caps first
    if (need_issue)
      *need_issue = true;
    else
      issue_caps(in);
    gather++;
  }

  if (lock->get_state() == LOCK_MIX_TSYN &&
      in->is_replicated()) {
    // replicas hold scattered state; gather it back via LOCK
    lock->init_gather();
    send_lock_message(lock, LOCK_AC_LOCK);
    gather++;
  }

  if (gather) {
    in->auth_pin(lock);
  } else {
    // do tempsync
    lock->set_state(LOCK_TSYN);
    lock->finish_waiters(ScatterLock::WAIT_RD|ScatterLock::WAIT_STABLE);
    if (lock->get_cap_shift()) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
    }
  }
}
4671
4672
4673
4674// ==========================================================================
4675// local lock
4676
/**
 * Take a wrlock on a local lock unconditionally; caller must have
 * verified (or know) that the lock is wrlockable — asserted below.
 */
void Locker::local_wrlock_grab(LocalLock *lock, MutationRef& mut)
{
  dout(7) << "local_wrlock_grab  on " << *lock
	  << " on " << *lock->get_parent() << dendl;  
  
  assert(lock->get_parent()->is_auth());
  assert(lock->can_wrlock());
  assert(!mut->wrlocks.count(lock));
  lock->get_wrlock(mut->get_client());
  // record the lock in the mutation so it is released on cleanup
  mut->wrlocks.insert(lock);
  mut->locks.insert(lock);
}
4689
4690bool Locker::local_wrlock_start(LocalLock *lock, MDRequestRef& mut)
4691{
4692 dout(7) << "local_wrlock_start on " << *lock
4693 << " on " << *lock->get_parent() << dendl;
4694
4695 assert(lock->get_parent()->is_auth());
4696 if (lock->can_wrlock()) {
4697 assert(!mut->wrlocks.count(lock));
4698 lock->get_wrlock(mut->get_client());
4699 mut->wrlocks.insert(lock);
4700 mut->locks.insert(lock);
4701 return true;
4702 } else {
4703 lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
4704 return false;
4705 }
4706}
4707
/**
 * Release a wrlock on a local lock and, if this was the last wrlock,
 * wake anyone waiting on it.
 */
void Locker::local_wrlock_finish(LocalLock *lock, MutationImpl *mut)
{
  dout(7) << "local_wrlock_finish  on " << *lock
	  << " on " << *lock->get_parent() << dendl;  
  lock->put_wrlock();
  mut->wrlocks.erase(lock);
  mut->locks.erase(lock);
  if (lock->get_num_wrlocks() == 0) {
    // last writer gone: wake readers, writers, and stability waiters
    lock->finish_waiters(SimpleLock::WAIT_STABLE |
                         SimpleLock::WAIT_WR |
                         SimpleLock::WAIT_RD);
  }
}
4721
/**
 * Try to take an xlock on a local lock for a request.
 *
 * @return true on success; false if not xlockable now (request queued
 *         to retry when the lock becomes available).
 */
bool Locker::local_xlock_start(LocalLock *lock, MDRequestRef& mut)
{
  dout(7) << "local_xlock_start  on " << *lock
	  << " on " << *lock->get_parent() << dendl;  
  
  assert(lock->get_parent()->is_auth());
  if (!lock->can_xlock_local()) {
    // busy: queue a retry and fail for now
    lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
    return false;
  }

  lock->get_xlock(mut, mut->get_client());
  // track the held lock in the mutation for later release
  mut->xlocks.insert(lock);
  mut->locks.insert(lock);
  return true;
}
4738
/**
 * Release an xlock on a local lock and wake all waiters.
 */
void Locker::local_xlock_finish(LocalLock *lock, MutationImpl *mut)
{
  dout(7) << "local_xlock_finish  on " << *lock
	  << " on " << *lock->get_parent() << dendl;  
  lock->put_xlock();
  mut->xlocks.erase(lock);
  mut->locks.erase(lock);

  // xlock excluded everyone; wake readers, writers, and stability waiters
  lock->finish_waiters(SimpleLock::WAIT_STABLE | 
                       SimpleLock::WAIT_WR | 
                       SimpleLock::WAIT_RD);
}
4751
4752
4753
4754// ==========================================================================
4755// file lock
4756
4757
/**
 * Re-evaluate a stable file (IFILE) lock on the auth MDS against the
 * caps clients currently want/hold, and move it toward EXCL (loner),
 * MIX (multiple writers), or SYNC (readers) as appropriate.
 */
void Locker::file_eval(ScatterLock *lock, bool *need_issue)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  int loner_wanted, other_wanted;
  int wanted = in->get_caps_wanted(&loner_wanted, &other_wanted, CEPH_CAP_SFILE);
  dout(7) << "file_eval wanted=" << gcap_string(wanted)
	  << " loner_wanted=" << gcap_string(loner_wanted)
	  << " other_wanted=" << gcap_string(other_wanted)
	  << "  filelock=" << *lock << " on " << *lock->get_parent()
	  << dendl;

  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  if (lock->get_parent()->is_freezing_or_frozen())
    return;

  if (mdcache->is_readonly()) {
    // read-only FS: keep the file readable
    if (lock->get_state() != LOCK_SYNC) {
      dout(10) << "file_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
      simple_sync(lock, need_issue);
    }
    return;
  }

  // excl -> *?
  if (lock->get_state() == LOCK_EXCL) {
    dout(20) << " is excl" << dendl;
    int loner_issued, other_issued, xlocker_issued;
    in->get_caps_issued(&loner_issued, &other_issued, &xlocker_issued, CEPH_CAP_SFILE);
    dout(7) << "file_eval loner_issued=" << gcap_string(loner_issued)
            << " other_issued=" << gcap_string(other_issued)
	    << " xlocker_issued=" << gcap_string(xlocker_issued)
	    << dendl;
    // leave EXCL if the loner no longer justifies it, someone else wants
    // conflicting caps, or (for dirs) multiple clients hold caps
    if (!((loner_wanted|loner_issued) & (CEPH_CAP_GEXCL|CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) ||
	(other_wanted & (CEPH_CAP_GEXCL|CEPH_CAP_GWR|CEPH_CAP_GRD)) ||
	(in->inode.is_dir() && in->multiple_nonstale_caps())) {  // FIXME.. :/
      dout(20) << " should lose it" << dendl;
      // we should lose it.
      //  loner  other   want
      //  R      R       SYNC
      //  R      R|W     MIX
      //  R      W       MIX
      //  R|W    R       MIX
      //  R|W    R|W     MIX
      //  R|W    W       MIX
      //  W      R       MIX
      //  W      R|W     MIX
      //  W      W       MIX
      // -> any writer means MIX; RD doesn't matter.
      if (((other_wanted|loner_wanted) & CEPH_CAP_GWR) ||
	  lock->is_waiter_for(SimpleLock::WAIT_WR))
	scatter_mix(lock, need_issue);
      else if (!lock->is_wrlocked())   // let excl wrlocks drain first
	simple_sync(lock, need_issue);
      else
	dout(10) << " waiting for wrlock to drain" << dendl;
    }
  }

  // * -> excl?
  else if (lock->get_state() != LOCK_EXCL &&
	   !lock->is_rdlocked() &&
	   //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
	   ((wanted & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) ||
	    (in->inode.is_dir() && !in->has_subtree_or_exporting_dirfrag())) &&
	   in->get_target_loner() >= 0) {
    dout(7) << "file_eval stable, bump to loner " << *lock
	    << " on " << *lock->get_parent() << dendl;
    file_excl(lock, need_issue);
  }

  // * -> mixed?
  else if (lock->get_state() != LOCK_MIX &&
	   !lock->is_rdlocked() &&
	   //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
	   (lock->get_scatter_wanted() ||
	    (in->get_target_loner() < 0 && (wanted & CEPH_CAP_GWR)))) {
    dout(7) << "file_eval stable, bump to mixed " << *lock
	    << " on " << *lock->get_parent() << dendl;
    scatter_mix(lock, need_issue);
  }
  
  // * -> sync?
  else if (lock->get_state() != LOCK_SYNC &&
	   !lock->is_wrlocked() &&   // drain wrlocks first!
	   !lock->is_waiter_for(SimpleLock::WAIT_WR) &&
	   !(wanted & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) &&
	   !((lock->get_state() == LOCK_MIX) &&
	     in->is_dir() && in->has_subtree_or_exporting_dirfrag())  // if we are a delegation point, stay where we are
	   //((wanted & CEPH_CAP_RD) || 
	   //in->is_replicated() || 
	   //lock->is_leased() ||
	   //(!loner && lock->get_state() == LOCK_EXCL)) &&
	   ) {
    dout(7) << "file_eval stable, bump to sync " << *lock 
	    << " on " << *lock->get_parent() << dendl;
    simple_sync(lock, need_issue);
  }
}
4858
4859
4860
// NOTE(review): this is a git-blame rendering; the numeric line prefixes and
// commit-hash/author lines (e.g. "7c673cae"/"FG") are blame artifacts, not
// C++ tokens, and are preserved verbatim.
//
// scatter_mix(): on the auth MDS, move a stable scatterlock toward LOCK_MIX
// (the "scattered"/mixed state).  From LOCK_LOCK the transition is immediate:
// broadcast AC_MIX (with encoded lock state) to replicas and switch state.
// From SYNC/EXCL/XSYN/TSYN an intermediate *_MIX gather state is entered and
// a gather count is accumulated for: outstanding rdlocks, replica acks,
// client lease revocations, cap revocation, and pending file recovery.  If
// anything must be gathered, the parent is auth-pinned and the transition
// completes later via eval_gather(); otherwise it completes inline.
//
// @param lock        the scatterlock to transition (parent must be a CInode
//                    we are auth for; lock must be stable — both asserted).
// @param need_issue  if non-null, set *need_issue=true instead of calling
//                    issue_caps() directly (caller batches cap issuance).
 4861void Locker::scatter_mix(ScatterLock *lock, bool *need_issue)
 4862{
 4863  dout(7) << "scatter_mix " << *lock << " on " << *lock->get_parent() << dendl;
 4864
 4865  CInode *in = static_cast<CInode*>(lock->get_parent());
 4866  assert(in->is_auth());
 4867  assert(lock->is_stable());
 4868
// Fast path: coming from LOCK_LOCK nothing needs to be gathered.
 4869  if (lock->get_state() == LOCK_LOCK) {
 4870    in->start_scatter(lock);
 4871    if (in->is_replicated()) {
 4872      // data
 4873      bufferlist softdata;
 4874      lock->encode_locked_state(softdata);
 4875
 4876      // bcast to replicas
 4877      send_lock_message(lock, LOCK_AC_MIX, softdata);
 4878    }
 4879
 4880    // change lock
 4881    lock->set_state(LOCK_MIX);
 4882    lock->clear_scatter_wanted();
 4883    if (lock->get_cap_shift()) {
 4884      if (need_issue)
	*need_issue = true;
 4886      else
	issue_caps(in);
 4888    }
 4889  } else {
 4890    // gather?
// Enter the intermediate gather state corresponding to the current state.
 4891    switch (lock->get_state()) {
 4892    case LOCK_SYNC: lock->set_state(LOCK_SYNC_MIX); break;
7c673cae 4893    case LOCK_EXCL: lock->set_state(LOCK_EXCL_MIX); break;
b32b8144 4894    case LOCK_XSYN: lock->set_state(LOCK_XSYN_MIX); break;
7c673cae
FG
 4895    case LOCK_TSYN: lock->set_state(LOCK_TSYN_MIX); break;
 4896    default: ceph_abort();
 4897    }
 4898
// Count everything that must drain before we can reach LOCK_MIX.
 4899    int gather = 0;
 4900    if (lock->is_rdlocked())
 4901      gather++;
 4902    if (in->is_replicated()) {
b32b8144 4903      if (lock->get_state() == LOCK_SYNC_MIX) { // for the rest states, replicas are already LOCK
7c673cae
FG
	send_lock_message(lock, LOCK_AC_MIX);
	lock->init_gather();
	gather++;
 4907      }
 4908    }
 4909    if (lock->is_leased()) {
 4910      revoke_client_leases(lock);
 4911      gather++;
 4912    }
 4913    if (lock->get_cap_shift() &&
	in->is_head() &&
	in->issued_caps_need_gather(lock)) {
 4916      if (need_issue)
	*need_issue = true;
 4918      else
	issue_caps(in);
 4920      gather++;
 4921    }
 4922    bool need_recover = false;
 4923    if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
 4924      mds->mdcache->queue_file_recover(in);
 4925      need_recover = true;
 4926      gather++;
 4927    }
 4928
 4929    if (gather) {
// Pin the inode while unstable; eval_gather() finishes the transition later.
 4930      lock->get_parent()->auth_pin(lock);
 4931      if (need_recover)
	mds->mdcache->do_file_recover();
 4933    } else {
// Nothing to wait for: complete the transition to MIX inline.
 4934      in->start_scatter(lock);
 4935      lock->set_state(LOCK_MIX);
 4936      lock->clear_scatter_wanted();
 4937      if (in->is_replicated()) {
	bufferlist softdata;
	lock->encode_locked_state(softdata);
	send_lock_message(lock, LOCK_AC_MIX, softdata);
 4941      }
 4942      if (lock->get_cap_shift()) {
	if (need_issue)
	  *need_issue = true;
	else
	  issue_caps(in);
 4947      }
 4948    }
 4949  }
 4950}
4951
4952
// NOTE(review): git-blame rendering — numeric/commit-hash prefixes on lines
// are blame artifacts, preserved verbatim; they are not C++ tokens.
//
// file_excl(): on the auth MDS, move a stable file scatterlock toward
// LOCK_EXCL (exclusive, single loner client).  Asserts that either a loner
// exists and no other MDS wants caps, or we are coming from LOCK_XSYN
// (xsyn must pass through excl before anything else — see inline comment).
// Accumulates a gather count for rdlocks, wrlocks, replica acks, lease
// revocations, cap revocation, and pending recovery; if non-zero, auth-pins
// the parent and lets eval_gather() finish, otherwise completes inline.
//
// @param lock        the scatterlock to transition.
// @param need_issue  if non-null, flag the caller to issue caps instead of
//                    calling issue_caps() here.
 4953void Locker::file_excl(ScatterLock *lock, bool *need_issue)
 4954{
 4955  CInode *in = static_cast<CInode*>(lock->get_parent());
 4956  dout(7) << "file_excl " << *lock << " on " << *lock->get_parent() << dendl;
 4957
 4958  assert(in->is_auth());
 4959  assert(lock->is_stable());
 4960
 4961  assert((in->get_loner() >= 0 && in->mds_caps_wanted.empty()) ||
	 (lock->get_state() == LOCK_XSYN));  // must do xsyn -> excl -> <anything else>
 4963
// Enter the intermediate *_EXCL gather state for the current stable state.
 4964  switch (lock->get_state()) {
 4965  case LOCK_SYNC: lock->set_state(LOCK_SYNC_EXCL); break;
 4966  case LOCK_MIX: lock->set_state(LOCK_MIX_EXCL); break;
 4967  case LOCK_LOCK: lock->set_state(LOCK_LOCK_EXCL); break;
 4968  case LOCK_XSYN: lock->set_state(LOCK_XSYN_EXCL); break;
 4969  default: ceph_abort();
 4970  }
 4971  int gather = 0;
 4972
 4973  if (lock->is_rdlocked())
 4974    gather++;
 4975  if (lock->is_wrlocked())
 4976    gather++;
 4977
 4978  if (in->is_replicated() &&
 4979      lock->get_state() != LOCK_LOCK_EXCL &&
 4980      lock->get_state() != LOCK_XSYN_EXCL) {  // if we were lock, replicas are already lock.
 4981    send_lock_message(lock, LOCK_AC_LOCK);
 4982    lock->init_gather();
 4983    gather++;
 4984  }
 4985  if (lock->is_leased()) {
 4986    revoke_client_leases(lock);
 4987    gather++;
 4988  }
 4989  if (in->is_head() &&
 4990      in->issued_caps_need_gather(lock)) {
 4991    if (need_issue)
 4992      *need_issue = true;
 4993    else
 4994      issue_caps(in);
 4995    gather++;
 4996  }
 4997  bool need_recover = false;
 4998  if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
 4999    mds->mdcache->queue_file_recover(in);
 5000    need_recover = true;
 5001    gather++;
 5002  }
 5003  
 5004  if (gather) {
// Stay in the intermediate state; eval_gather() completes the transition.
 5005    lock->get_parent()->auth_pin(lock);
 5006    if (need_recover)
 5007      mds->mdcache->do_file_recover();
 5008  } else {
 5009    lock->set_state(LOCK_EXCL);
 5010    if (need_issue)
 5011      *need_issue = true;
 5012    else
 5013      issue_caps(in);
 5014  }
 5015}
5016
// NOTE(review): git-blame rendering — numeric line prefixes are blame
// artifacts, preserved verbatim.
//
// file_xsyn(): on the auth MDS, move a lock from LOCK_EXCL to LOCK_XSYN
// (only the EXCL -> XSYN transition is legal; anything else aborts).
// Gathers outstanding wrlocks and cap revocations; if any, auth-pins the
// parent and defers completion to eval_gather(), otherwise switches to
// LOCK_XSYN inline, wakes RD/STABLE waiters, and (re)issues caps.
//
// @param lock        the lock to transition (parent inode must be auth, and
//                    must have a loner with no other-MDS cap wants — asserted).
// @param need_issue  if non-null, flag the caller to issue caps instead of
//                    calling issue_caps() here.
 5017void Locker::file_xsyn(SimpleLock *lock, bool *need_issue)
 5018{
 5019  dout(7) << "file_xsyn on " << *lock << " on " << *lock->get_parent() << dendl;
 5020  CInode *in = static_cast<CInode *>(lock->get_parent());
 5021  assert(in->is_auth());
 5022  assert(in->get_loner() >= 0 && in->mds_caps_wanted.empty());
 5023
 5024  switch (lock->get_state()) {
 5025  case LOCK_EXCL: lock->set_state(LOCK_EXCL_XSYN); break;
 5026  default: ceph_abort();
 5027  }
 5028  
 5029  int gather = 0;
 5030  if (lock->is_wrlocked())
 5031    gather++;
 5032
 5033  if (in->is_head() &&
 5034      in->issued_caps_need_gather(lock)) {
 5035    if (need_issue)
 5036      *need_issue = true;
 5037    else
 5038      issue_caps(in);
 5039    gather++;
 5040  }
 5041  
 5042  if (gather) {
// Wait in LOCK_EXCL_XSYN; eval_gather() finishes once everything drains.
 5043    lock->get_parent()->auth_pin(lock);
 5044  } else {
 5045    lock->set_state(LOCK_XSYN);
 5046    lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
 5047    if (need_issue)
 5048      *need_issue = true;
 5049    else
 5050      issue_caps(in);
 5051  }
 5052}
5053
// NOTE(review): git-blame rendering — numeric line prefixes are blame
// artifacts, preserved verbatim.
//
// file_recover(): move a file lock from LOCK_PRE_SCAN to LOCK_SCAN as part
// of file recovery (per the assert's comment, only called from
// MDCache::start_files_to_recover()).  If issued client caps must first be
// revoked, caps are issued and the inode is marked STATE_NEEDSRECOVER so
// recovery is retried later; otherwise the inode is queued for recovery now.
//
// @param lock  the scatterlock being recovered; its parent inode must be
//              auth here (asserted).
 5054void Locker::file_recover(ScatterLock *lock)
 5055{
 5056  CInode *in = static_cast<CInode *>(lock->get_parent());
 5057  dout(7) << "file_recover " << *lock << " on " << *in << dendl;
 5058
 5059  assert(in->is_auth());
 5060  //assert(lock->is_stable());
 5061  assert(lock->get_state() == LOCK_PRE_SCAN); // only called from MDCache::start_files_to_recover()
 5062
 5063  int gather = 0;
 5064  
// Replica gathering is intentionally disabled (dead code kept for reference).
 5065  /*
 5066  if (in->is_replicated()
 5067      lock->get_sm()->states[oldstate].replica_state != LOCK_LOCK) {
 5068    send_lock_message(lock, LOCK_AC_LOCK);
 5069    lock->init_gather();
 5070    gather++;
 5071  }
 5072  */
 5073  if (in->is_head() &&
 5074      in->issued_caps_need_gather(lock)) {
 5075    issue_caps(in);
 5076    gather++;
 5077  }
 5078
 5079  lock->set_state(LOCK_SCAN);
 5080  if (gather)
 5081    in->state_set(CInode::STATE_NEEDSRECOVER);
 5082  else
 5083    mds->mdcache->queue_file_recover(in);
 5084}
5085
5086
5087// messenger
5088/* This function DOES put the passed message before returning */
// NOTE(review): git-blame rendering — numeric and commit-hash/author
// prefixes (e.g. "31f18b77"/"FG") are blame artifacts, preserved verbatim.
//
// handle_file_lock(): inter-MDS message handler for file scatterlock
// traffic.  Replica-side actions (AC_SYNC/AC_LOCK/AC_LOCKFLUSHED/AC_MIX)
// apply state pushed by the auth; auth-side acks (AC_LOCKACK/AC_SYNCACK/
// AC_MIXACK) remove the sender from the gather set and re-evaluate when the
// set empties; request actions (AC_REQSCATTER/AC_REQUNSCATTER/AC_REQRDLOCK/
// AC_NUDGE) either act immediately when the lock is stable or record a
// "wanted" flag for later.  While this MDS is rejoining and the inode is
// still rejoining, the message is dropped.  This function DOES put (release)
// the passed message before returning, on every path.
//
// @param lock  the scatterlock the message refers to (parent is a CInode).
// @param m     the MLock message; consumed (m->put()) by this function.
 5089void Locker::handle_file_lock(ScatterLock *lock, MLock *m)
 5090{
 5091  CInode *in = static_cast<CInode*>(lock->get_parent());
 5092  int from = m->get_asker();
 5093
 5094  if (mds->is_rejoin()) {
 5095    if (in->is_rejoining()) {
 5096      dout(7) << "handle_file_lock still rejoining " << *in
	      << ", dropping " << *m << dendl;
 5098      m->put();
 5099      return;
 5100    }
 5101  }
 5102
 5103  dout(7) << "handle_file_lock a=" << get_lock_action_name(m->get_action())
	  << " on " << *lock
	  << " from mds." << from << " "
	  << *in << dendl;
 5107
 5108  bool caps = lock->get_cap_shift();
 5109  
 5110  switch (m->get_action()) {
 5111    // -- replica --
 5112  case LOCK_AC_SYNC:
 5113    assert(lock->get_state() == LOCK_LOCK ||
	   lock->get_state() == LOCK_MIX ||
	   lock->get_state() == LOCK_MIX_SYNC2);
 5116    
// From MIX we must first flush dirty scattered state; enter MIX_SYNC and
// let eval_gather() drive the rest (a later AC_SYNC completes it).
 5117    if (lock->get_state() == LOCK_MIX) {
 5118      lock->set_state(LOCK_MIX_SYNC);
 5119      eval_gather(lock, true);
31f18b77
FG
 5120      if (lock->is_unstable_and_locked())
	mds->mdlog->flush();
7c673cae
FG
 5122      break;
 5123    }
 5124
 5125    (static_cast<ScatterLock *>(lock))->finish_flush();
 5126    (static_cast<ScatterLock *>(lock))->clear_flushed();
 5127
 5128    // ok
 5129    lock->decode_locked_state(m->get_data());
 5130    lock->set_state(LOCK_SYNC);
 5131
// Hold an rdlock across cap issue / waiter wakeup so the state can't change
// underneath them.
 5132    lock->get_rdlock();
 5133    if (caps)
 5134      issue_caps(in);
 5135    lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
 5136    lock->put_rdlock();
 5137    break;
 5138
 5139  case LOCK_AC_LOCK:
 5140    switch (lock->get_state()) {
 5141    case LOCK_SYNC: lock->set_state(LOCK_SYNC_LOCK); break;
 5142    case LOCK_MIX: lock->set_state(LOCK_MIX_LOCK); break;
 5143    default: ceph_abort();
 5144    }
 5145
 5146    eval_gather(lock, true);
31f18b77
FG
 5147    if (lock->is_unstable_and_locked())
 5148      mds->mdlog->flush();
 5149
7c673cae
FG
 5150    break;
 5151
 5152  case LOCK_AC_LOCKFLUSHED:
 5153    (static_cast<ScatterLock *>(lock))->finish_flush();
 5154    (static_cast<ScatterLock *>(lock))->clear_flushed();
 5155    // wake up scatter_nudge waiters
 5156    if (lock->is_stable())
 5157      lock->finish_waiters(SimpleLock::WAIT_STABLE);
 5158    break;
 5159    
 5160  case LOCK_AC_MIX:
 5161    assert(lock->get_state() == LOCK_SYNC ||
	   lock->get_state() == LOCK_LOCK ||
	   lock->get_state() == LOCK_SYNC_MIX2);
 5164    
// From SYNC we must first drain readers; enter SYNC_MIX and wait.
 5165    if (lock->get_state() == LOCK_SYNC) {
 5166      // MIXED
 5167      lock->set_state(LOCK_SYNC_MIX);
 5168      eval_gather(lock, true);
31f18b77
FG
 5169      if (lock->is_unstable_and_locked())
	mds->mdlog->flush();
7c673cae
FG
 5171      break;
 5172    } 
c07f9fc5 5173
7c673cae 5174    // ok
7c673cae 5175    lock->set_state(LOCK_MIX);
c07f9fc5 5176    lock->decode_locked_state(m->get_data());
7c673cae
FG
 5177
 5178    if (caps)
 5179      issue_caps(in);
 5180    
 5181    lock->finish_waiters(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE);
 5182    break;
 5183
 5184
 5185    // -- auth --
 5186  case LOCK_AC_LOCKACK:
 5187    assert(lock->get_state() == LOCK_SYNC_LOCK ||
	   lock->get_state() == LOCK_MIX_LOCK ||
	   lock->get_state() == LOCK_MIX_LOCK2 ||
	   lock->get_state() == LOCK_MIX_EXCL ||
	   lock->get_state() == LOCK_SYNC_EXCL ||
	   lock->get_state() == LOCK_SYNC_MIX ||
	   lock->get_state() == LOCK_MIX_TSYN);
 5194    assert(lock->is_gathering(from));
 5195    lock->remove_gather(from);
 5196    
// When leaving MIX, the ack carries the replica's dirty scattered state.
 5197    if (lock->get_state() == LOCK_MIX_LOCK ||
	lock->get_state() == LOCK_MIX_LOCK2 ||
	lock->get_state() == LOCK_MIX_EXCL ||
	lock->get_state() == LOCK_MIX_TSYN) {
 5201      lock->decode_locked_state(m->get_data());
 5202      // replica is waiting for AC_LOCKFLUSHED, eval_gather() should not
 5203      // delay calling scatter_writebehind().
 5204      lock->clear_flushed();
 5205    }
 5206
 5207    if (lock->is_gathering()) {
 5208      dout(7) << "handle_file_lock " << *in << " from " << from
	      << ", still gathering " << lock->get_gather_set() << dendl;
 5210    } else {
 5211      dout(7) << "handle_file_lock " << *in << " from " << from
	      << ", last one" << dendl;
 5213      eval_gather(lock);
 5214    }
 5215    break;
 5216    
 5217  case LOCK_AC_SYNCACK:
 5218    assert(lock->get_state() == LOCK_MIX_SYNC);
 5219    assert(lock->is_gathering(from));
 5220    lock->remove_gather(from);
 5221    
 5222    lock->decode_locked_state(m->get_data());
 5223
 5224    if (lock->is_gathering()) {
 5225      dout(7) << "handle_file_lock " << *in << " from " << from
	      << ", still gathering " << lock->get_gather_set() << dendl;
 5227    } else {
 5228      dout(7) << "handle_file_lock " << *in << " from " << from
	      << ", last one" << dendl;
 5230      eval_gather(lock);
 5231    }
 5232    break;
 5233
 5234  case LOCK_AC_MIXACK:
 5235    assert(lock->get_state() == LOCK_SYNC_MIX);
 5236    assert(lock->is_gathering(from));
 5237    lock->remove_gather(from);
 5238    
 5239    if (lock->is_gathering()) {
 5240      dout(7) << "handle_file_lock " << *in << " from " << from
	      << ", still gathering " << lock->get_gather_set() << dendl;
 5242    } else {
 5243      dout(7) << "handle_file_lock " << *in << " from " << from
	      << ", last one" << dendl;
 5245      eval_gather(lock);
 5246    }
 5247    break;
 5248

 5249
 5250    // requests....
 5251  case LOCK_AC_REQSCATTER:
 5252    if (lock->is_stable()) {
 5253      /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
 5254       * because the replica should be holding an auth_pin if they're
 5255       * doing this (and thus, we are freezing, not frozen, and indefinite
 5256       * starvation isn't an issue).
 5257       */
 5258      dout(7) << "handle_file_lock got scatter request on " << *lock
	      << " on " << *lock->get_parent() << dendl;
 5260      if (lock->get_state() != LOCK_MIX)  // i.e., the reqscatter didn't race with an actual mix/scatter
	scatter_mix(lock);
 5262    } else {
 5263      dout(7) << "handle_file_lock got scatter request, !stable, marking scatter_wanted on " << *lock
	      << " on " << *lock->get_parent() << dendl;
 5265      lock->set_scatter_wanted();
 5266    }
 5267    break;
 5268
 5269  case LOCK_AC_REQUNSCATTER:
 5270    if (lock->is_stable()) {
 5271      /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
 5272       * because the replica should be holding an auth_pin if they're
 5273       * doing this (and thus, we are freezing, not frozen, and indefinite
 5274       * starvation isn't an issue).
 5275       */
 5276      dout(7) << "handle_file_lock got unscatter request on " << *lock
	      << " on " << *lock->get_parent() << dendl;
 5278      if (lock->get_state() == LOCK_MIX)  // i.e., the reqscatter didn't race with an actual mix/scatter
	simple_lock(lock);  // FIXME tempsync?
 5280    } else {
 5281      dout(7) << "handle_file_lock ignoring unscatter request on " << *lock
	      << " on " << *lock->get_parent() << dendl;
 5283      lock->set_unscatter_wanted();
 5284    }
 5285    break;
 5286
 5287  case LOCK_AC_REQRDLOCK:
 5288    handle_reqrdlock(lock, m);
 5289    break;
 5290
 5291  case LOCK_AC_NUDGE:
 5292    if (!lock->get_parent()->is_auth()) {
 5293      dout(7) << "handle_file_lock IGNORING nudge on non-auth " << *lock
	      << " on " << *lock->get_parent() << dendl;
 5295    } else if (!lock->get_parent()->is_replicated()) {
 5296      dout(7) << "handle_file_lock IGNORING nudge on non-replicated " << *lock
	      << " on " << *lock->get_parent() << dendl;
 5298    } else {
 5299      dout(7) << "handle_file_lock trying nudge on " << *lock
	      << " on " << *lock->get_parent() << dendl;
 5301      scatter_nudge(lock, 0, true);
 5302      mds->mdlog->flush();
 5303    }
 5304    break;
 5305
 5306  default:
 5307    ceph_abort();
 5308  }
 5309
// Release our reference on the message (see function contract above).
 5310  m->put();
 5311}
5312
5313
5314
5315
5316
5317