]> git.proxmox.com Git - ceph.git/blame - ceph/src/mds/Locker.cc
update sources to 12.2.7
[ceph.git] / ceph / src / mds / Locker.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
94b18763 15#include <boost/utility/string_view.hpp>
7c673cae
FG
16
17#include "MDSRank.h"
18#include "MDCache.h"
19#include "Locker.h"
28e407b8 20#include "MDBalancer.h"
7c673cae
FG
21#include "CInode.h"
22#include "CDir.h"
23#include "CDentry.h"
24#include "Mutation.h"
25#include "MDSContext.h"
26
27#include "MDLog.h"
28#include "MDSMap.h"
29
30#include "events/EUpdate.h"
31#include "events/EOpen.h"
32
33#include "msg/Messenger.h"
34#include "osdc/Objecter.h"
35
36#include "messages/MInodeFileCaps.h"
37#include "messages/MLock.h"
38#include "messages/MClientLease.h"
39#include "messages/MClientReply.h"
40#include "messages/MClientCaps.h"
41#include "messages/MClientCapRelease.h"
42
43#include "messages/MMDSSlaveRequest.h"
44
45#include <errno.h>
46
47#include "common/config.h"
48
49
50#define dout_subsys ceph_subsys_mds
51#undef dout_prefix
52#define dout_context g_ceph_context
53#define dout_prefix _prefix(_dout, mds)
54static ostream& _prefix(std::ostream *_dout, MDSRank *mds) {
55 return *_dout << "mds." << mds->get_nodeid() << ".locker ";
56}
57
58
59class LockerContext : public MDSInternalContextBase {
60protected:
61 Locker *locker;
62 MDSRank *get_mds() override
63 {
64 return locker->mds;
65 }
66
67public:
68 explicit LockerContext(Locker *locker_) : locker(locker_) {
69 assert(locker != NULL);
70 }
71};
72
73class LockerLogContext : public MDSLogContextBase {
74protected:
75 Locker *locker;
76 MDSRank *get_mds() override
77 {
78 return locker->mds;
79 }
80
81public:
82 explicit LockerLogContext(Locker *locker_) : locker(locker_) {
83 assert(locker != NULL);
84 }
85};
86
87/* This function DOES put the passed message before returning */
88void Locker::dispatch(Message *m)
89{
90
91 switch (m->get_type()) {
92
93 // inter-mds locking
94 case MSG_MDS_LOCK:
95 handle_lock(static_cast<MLock*>(m));
96 break;
97 // inter-mds caps
98 case MSG_MDS_INODEFILECAPS:
99 handle_inode_file_caps(static_cast<MInodeFileCaps*>(m));
100 break;
101
102 // client sync
103 case CEPH_MSG_CLIENT_CAPS:
104 handle_client_caps(static_cast<MClientCaps*>(m));
105
106 break;
107 case CEPH_MSG_CLIENT_CAPRELEASE:
108 handle_client_cap_release(static_cast<MClientCapRelease*>(m));
109 break;
110 case CEPH_MSG_CLIENT_LEASE:
111 handle_client_lease(static_cast<MClientLease*>(m));
112 break;
113
114 default:
115 derr << "locker unknown message " << m->get_type() << dendl;
116 assert(0 == "locker unknown message");
117 }
118}
119
120void Locker::tick()
121{
122 scatter_tick();
123 caps_tick();
124}
125
126/*
127 * locks vs rejoin
128 *
129 *
130 *
131 */
132
133void Locker::send_lock_message(SimpleLock *lock, int msg)
134{
181888fb 135 for (const auto &it : lock->get_parent()->get_replicas()) {
7c673cae 136 if (mds->is_cluster_degraded() &&
181888fb 137 mds->mdsmap->get_state(it.first) < MDSMap::STATE_REJOIN)
7c673cae
FG
138 continue;
139 MLock *m = new MLock(lock, msg, mds->get_nodeid());
181888fb 140 mds->send_message_mds(m, it.first);
7c673cae
FG
141 }
142}
143
144void Locker::send_lock_message(SimpleLock *lock, int msg, const bufferlist &data)
145{
181888fb 146 for (const auto &it : lock->get_parent()->get_replicas()) {
7c673cae 147 if (mds->is_cluster_degraded() &&
181888fb 148 mds->mdsmap->get_state(it.first) < MDSMap::STATE_REJOIN)
7c673cae
FG
149 continue;
150 MLock *m = new MLock(lock, msg, mds->get_nodeid());
151 m->set_data(data);
181888fb 152 mds->send_message_mds(m, it.first);
7c673cae
FG
153 }
154}
155
156
157
158
159void Locker::include_snap_rdlocks(set<SimpleLock*>& rdlocks, CInode *in)
160{
161 // rdlock ancestor snaps
162 CInode *t = in;
163 rdlocks.insert(&in->snaplock);
164 while (t->get_projected_parent_dn()) {
165 t = t->get_projected_parent_dn()->get_dir()->get_inode();
166 rdlocks.insert(&t->snaplock);
167 }
168}
169
170void Locker::include_snap_rdlocks_wlayout(set<SimpleLock*>& rdlocks, CInode *in,
171 file_layout_t **layout)
172{
173 //rdlock ancestor snaps
174 CInode *t = in;
175 rdlocks.insert(&in->snaplock);
176 rdlocks.insert(&in->policylock);
177 bool found_layout = false;
178 while (t) {
179 rdlocks.insert(&t->snaplock);
180 if (!found_layout) {
181 rdlocks.insert(&t->policylock);
182 if (t->get_projected_inode()->has_layout()) {
183 *layout = &t->get_projected_inode()->layout;
184 found_layout = true;
185 }
186 }
187 if (t->get_projected_parent_dn() &&
188 t->get_projected_parent_dn()->get_dir())
189 t = t->get_projected_parent_dn()->get_dir()->get_inode();
190 else t = NULL;
191 }
192}
193
194struct MarkEventOnDestruct {
195 MDRequestRef& mdr;
196 const char* message;
197 bool mark_event;
198 MarkEventOnDestruct(MDRequestRef& _mdr,
199 const char *_message) : mdr(_mdr),
200 message(_message),
201 mark_event(true) {}
202 ~MarkEventOnDestruct() {
203 if (mark_event)
204 mdr->mark_event(message);
205 }
206};
207
208/* If this function returns false, the mdr has been placed
209 * on the appropriate wait list */
210bool Locker::acquire_locks(MDRequestRef& mdr,
211 set<SimpleLock*> &rdlocks,
212 set<SimpleLock*> &wrlocks,
213 set<SimpleLock*> &xlocks,
214 map<SimpleLock*,mds_rank_t> *remote_wrlocks,
215 CInode *auth_pin_freeze,
216 bool auth_pin_nonblock)
217{
218 if (mdr->done_locking &&
219 !mdr->is_slave()) { // not on slaves! master requests locks piecemeal.
220 dout(10) << "acquire_locks " << *mdr << " - done locking" << dendl;
221 return true; // at least we had better be!
222 }
223 dout(10) << "acquire_locks " << *mdr << dendl;
224
225 MarkEventOnDestruct marker(mdr, "failed to acquire_locks");
226
227 client_t client = mdr->get_client();
228
229 set<SimpleLock*, SimpleLock::ptr_lt> sorted; // sort everything we will lock
230 set<MDSCacheObject*> mustpin; // items to authpin
231
232 // xlocks
233 for (set<SimpleLock*>::iterator p = xlocks.begin(); p != xlocks.end(); ++p) {
b32b8144
FG
234 SimpleLock *lock = *p;
235
236 if ((lock->get_type() == CEPH_LOCK_ISNAP ||
237 lock->get_type() == CEPH_LOCK_IPOLICY) &&
238 mds->is_cluster_degraded() &&
239 mdr->is_master() &&
240 !mdr->is_queued_for_replay()) {
241 // waiting for recovering mds, to guarantee replayed requests and mksnap/setlayout
242 // get processed in proper order.
243 bool wait = false;
244 if (lock->get_parent()->is_auth()) {
245 if (!mdr->locks.count(lock)) {
246 set<mds_rank_t> ls;
247 lock->get_parent()->list_replicas(ls);
248 for (auto m : ls) {
249 if (mds->mdsmap->get_state(m) < MDSMap::STATE_ACTIVE) {
250 wait = true;
251 break;
252 }
253 }
254 }
255 } else {
256 // if the lock is the latest locked one, it's possible that slave mds got the lock
257 // while there are recovering mds.
258 if (!mdr->locks.count(lock) || lock == *mdr->locks.rbegin())
259 wait = true;
260 }
261 if (wait) {
262 dout(10) << " must xlock " << *lock << " " << *lock->get_parent()
263 << ", waiting for cluster recovered" << dendl;
264 mds->locker->drop_locks(mdr.get(), NULL);
265 mdr->drop_local_auth_pins();
266 mds->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache, mdr));
267 return false;
268 }
269 }
270
271 dout(20) << " must xlock " << *lock << " " << *lock->get_parent() << dendl;
272
273 sorted.insert(lock);
274 mustpin.insert(lock->get_parent());
7c673cae
FG
275
276 // augment xlock with a versionlock?
277 if ((*p)->get_type() == CEPH_LOCK_DN) {
b32b8144 278 CDentry *dn = (CDentry*)lock->get_parent();
7c673cae
FG
279 if (!dn->is_auth())
280 continue;
281
282 if (xlocks.count(&dn->versionlock))
283 continue; // we're xlocking the versionlock too; don't wrlock it!
284
285 if (mdr->is_master()) {
286 // master. wrlock versionlock so we can pipeline dentry updates to journal.
287 wrlocks.insert(&dn->versionlock);
288 } else {
289 // slave. exclusively lock the dentry version (i.e. block other journal updates).
290 // this makes rollback safe.
291 xlocks.insert(&dn->versionlock);
292 sorted.insert(&dn->versionlock);
293 }
294 }
b32b8144 295 if (lock->get_type() > CEPH_LOCK_IVERSION) {
7c673cae 296 // inode version lock?
b32b8144 297 CInode *in = (CInode*)lock->get_parent();
7c673cae
FG
298 if (!in->is_auth())
299 continue;
300 if (mdr->is_master()) {
301 // master. wrlock versionlock so we can pipeline inode updates to journal.
302 wrlocks.insert(&in->versionlock);
303 } else {
304 // slave. exclusively lock the inode version (i.e. block other journal updates).
305 // this makes rollback safe.
306 xlocks.insert(&in->versionlock);
307 sorted.insert(&in->versionlock);
308 }
309 }
310 }
311
312 // wrlocks
313 for (set<SimpleLock*>::iterator p = wrlocks.begin(); p != wrlocks.end(); ++p) {
314 MDSCacheObject *object = (*p)->get_parent();
315 dout(20) << " must wrlock " << **p << " " << *object << dendl;
316 sorted.insert(*p);
317 if (object->is_auth())
318 mustpin.insert(object);
319 else if (!object->is_auth() &&
320 !(*p)->can_wrlock(client) && // we might have to request a scatter
321 !mdr->is_slave()) { // if we are slave (remote_wrlock), the master already authpinned
322 dout(15) << " will also auth_pin " << *object
323 << " in case we need to request a scatter" << dendl;
324 mustpin.insert(object);
325 }
326 }
327
328 // remote_wrlocks
329 if (remote_wrlocks) {
330 for (map<SimpleLock*,mds_rank_t>::iterator p = remote_wrlocks->begin(); p != remote_wrlocks->end(); ++p) {
331 MDSCacheObject *object = p->first->get_parent();
332 dout(20) << " must remote_wrlock on mds." << p->second << " "
333 << *p->first << " " << *object << dendl;
334 sorted.insert(p->first);
335 mustpin.insert(object);
336 }
337 }
338
339 // rdlocks
340 for (set<SimpleLock*>::iterator p = rdlocks.begin();
341 p != rdlocks.end();
342 ++p) {
343 MDSCacheObject *object = (*p)->get_parent();
344 dout(20) << " must rdlock " << **p << " " << *object << dendl;
345 sorted.insert(*p);
346 if (object->is_auth())
347 mustpin.insert(object);
348 else if (!object->is_auth() &&
349 !(*p)->can_rdlock(client)) { // we might have to request an rdlock
350 dout(15) << " will also auth_pin " << *object
351 << " in case we need to request a rdlock" << dendl;
352 mustpin.insert(object);
353 }
354 }
355
356
357 // AUTH PINS
358 map<mds_rank_t, set<MDSCacheObject*> > mustpin_remote; // mds -> (object set)
359
360 // can i auth pin them all now?
361 marker.message = "failed to authpin local pins";
362 for (set<MDSCacheObject*>::iterator p = mustpin.begin();
363 p != mustpin.end();
364 ++p) {
365 MDSCacheObject *object = *p;
366
367 dout(10) << " must authpin " << *object << dendl;
368
369 if (mdr->is_auth_pinned(object)) {
370 if (object != (MDSCacheObject*)auth_pin_freeze)
371 continue;
372 if (mdr->more()->is_remote_frozen_authpin) {
373 if (mdr->more()->rename_inode == auth_pin_freeze)
374 continue;
375 // unfreeze auth pin for the wrong inode
376 mustpin_remote[mdr->more()->rename_inode->authority().first].size();
377 }
378 }
379
380 if (!object->is_auth()) {
381 if (!mdr->locks.empty())
382 drop_locks(mdr.get());
383 if (object->is_ambiguous_auth()) {
384 // wait
385 dout(10) << " ambiguous auth, waiting to authpin " << *object << dendl;
386 object->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_MDS_RetryRequest(mdcache, mdr));
387 mdr->drop_local_auth_pins();
388 return false;
389 }
390 mustpin_remote[object->authority().first].insert(object);
391 continue;
392 }
393 if (!object->can_auth_pin()) {
394 // wait
395 drop_locks(mdr.get());
396 mdr->drop_local_auth_pins();
397 if (auth_pin_nonblock) {
398 dout(10) << " can't auth_pin (freezing?) " << *object << ", nonblocking" << dendl;
399 mdr->aborted = true;
400 return false;
401 }
402 dout(10) << " can't auth_pin (freezing?), waiting to authpin " << *object << dendl;
403 object->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_MDS_RetryRequest(mdcache, mdr));
224ce89b
WB
404
405 if (!mdr->remote_auth_pins.empty())
406 notify_freeze_waiter(object);
407
7c673cae
FG
408 return false;
409 }
410 }
411
412 // ok, grab local auth pins
413 for (set<MDSCacheObject*>::iterator p = mustpin.begin();
414 p != mustpin.end();
415 ++p) {
416 MDSCacheObject *object = *p;
417 if (mdr->is_auth_pinned(object)) {
418 dout(10) << " already auth_pinned " << *object << dendl;
419 } else if (object->is_auth()) {
420 dout(10) << " auth_pinning " << *object << dendl;
421 mdr->auth_pin(object);
422 }
423 }
424
425 // request remote auth_pins
426 if (!mustpin_remote.empty()) {
427 marker.message = "requesting remote authpins";
428 for (map<MDSCacheObject*,mds_rank_t>::iterator p = mdr->remote_auth_pins.begin();
429 p != mdr->remote_auth_pins.end();
430 ++p) {
431 if (mustpin.count(p->first)) {
432 assert(p->second == p->first->authority().first);
433 map<mds_rank_t, set<MDSCacheObject*> >::iterator q = mustpin_remote.find(p->second);
434 if (q != mustpin_remote.end())
435 q->second.insert(p->first);
436 }
437 }
438 for (map<mds_rank_t, set<MDSCacheObject*> >::iterator p = mustpin_remote.begin();
439 p != mustpin_remote.end();
440 ++p) {
441 dout(10) << "requesting remote auth_pins from mds." << p->first << dendl;
442
443 // wait for active auth
444 if (mds->is_cluster_degraded() &&
445 !mds->mdsmap->is_clientreplay_or_active_or_stopping(p->first)) {
446 dout(10) << " mds." << p->first << " is not active" << dendl;
447 if (mdr->more()->waiting_on_slave.empty())
448 mds->wait_for_active_peer(p->first, new C_MDS_RetryRequest(mdcache, mdr));
449 return false;
450 }
451
452 MMDSSlaveRequest *req = new MMDSSlaveRequest(mdr->reqid, mdr->attempt,
453 MMDSSlaveRequest::OP_AUTHPIN);
454 for (set<MDSCacheObject*>::iterator q = p->second.begin();
455 q != p->second.end();
456 ++q) {
457 dout(10) << " req remote auth_pin of " << **q << dendl;
458 MDSCacheObjectInfo info;
459 (*q)->set_object_info(info);
460 req->get_authpins().push_back(info);
461 if (*q == auth_pin_freeze)
462 (*q)->set_object_info(req->get_authpin_freeze());
463 mdr->pin(*q);
464 }
465 if (auth_pin_nonblock)
466 req->mark_nonblock();
467 mds->send_message_mds(req, p->first);
468
469 // put in waiting list
470 assert(mdr->more()->waiting_on_slave.count(p->first) == 0);
471 mdr->more()->waiting_on_slave.insert(p->first);
472 }
473 return false;
474 }
475
476 // caps i'll need to issue
477 set<CInode*> issue_set;
478 bool result = false;
479
480 // acquire locks.
481 // make sure they match currently acquired locks.
482 set<SimpleLock*, SimpleLock::ptr_lt>::iterator existing = mdr->locks.begin();
483 for (set<SimpleLock*, SimpleLock::ptr_lt>::iterator p = sorted.begin();
484 p != sorted.end();
485 ++p) {
486 bool need_wrlock = !!wrlocks.count(*p);
487 bool need_remote_wrlock = !!(remote_wrlocks && remote_wrlocks->count(*p));
488
489 // already locked?
490 if (existing != mdr->locks.end() && *existing == *p) {
491 // right kind?
492 SimpleLock *have = *existing;
493 ++existing;
494 if (xlocks.count(have) && mdr->xlocks.count(have)) {
495 dout(10) << " already xlocked " << *have << " " << *have->get_parent() << dendl;
496 continue;
497 }
498 if (mdr->remote_wrlocks.count(have)) {
499 if (!need_remote_wrlock ||
500 mdr->remote_wrlocks[have] != (*remote_wrlocks)[have]) {
501 dout(10) << " unlocking remote_wrlock on wrong mds." << mdr->remote_wrlocks[have]
502 << " " << *have << " " << *have->get_parent() << dendl;
503 remote_wrlock_finish(have, mdr->remote_wrlocks[have], mdr.get());
504 }
505 }
506 if (need_wrlock || need_remote_wrlock) {
507 if (need_wrlock == !!mdr->wrlocks.count(have) &&
508 need_remote_wrlock == !!mdr->remote_wrlocks.count(have)) {
509 if (need_wrlock)
510 dout(10) << " already wrlocked " << *have << " " << *have->get_parent() << dendl;
511 if (need_remote_wrlock)
512 dout(10) << " already remote_wrlocked " << *have << " " << *have->get_parent() << dendl;
513 continue;
514 }
515 }
516 if (rdlocks.count(have) && mdr->rdlocks.count(have)) {
517 dout(10) << " already rdlocked " << *have << " " << *have->get_parent() << dendl;
518 continue;
519 }
520 }
521
522 // hose any stray locks
523 if (existing != mdr->locks.end() && *existing == *p) {
524 assert(need_wrlock || need_remote_wrlock);
525 SimpleLock *lock = *existing;
526 if (mdr->wrlocks.count(lock)) {
527 if (!need_wrlock)
528 dout(10) << " unlocking extra " << *lock << " " << *lock->get_parent() << dendl;
529 else if (need_remote_wrlock) // acquire remote_wrlock first
530 dout(10) << " unlocking out-of-order " << *lock << " " << *lock->get_parent() << dendl;
531 bool need_issue = false;
532 wrlock_finish(lock, mdr.get(), &need_issue);
533 if (need_issue)
534 issue_set.insert(static_cast<CInode*>(lock->get_parent()));
535 }
536 ++existing;
537 }
538 while (existing != mdr->locks.end()) {
539 SimpleLock *stray = *existing;
540 ++existing;
541 dout(10) << " unlocking out-of-order " << *stray << " " << *stray->get_parent() << dendl;
542 bool need_issue = false;
543 if (mdr->xlocks.count(stray)) {
544 xlock_finish(stray, mdr.get(), &need_issue);
545 } else if (mdr->rdlocks.count(stray)) {
546 rdlock_finish(stray, mdr.get(), &need_issue);
547 } else {
548 // may have acquired both wrlock and remore wrlock
549 if (mdr->wrlocks.count(stray))
550 wrlock_finish(stray, mdr.get(), &need_issue);
551 if (mdr->remote_wrlocks.count(stray))
552 remote_wrlock_finish(stray, mdr->remote_wrlocks[stray], mdr.get());
553 }
554 if (need_issue)
555 issue_set.insert(static_cast<CInode*>(stray->get_parent()));
556 }
557
558 // lock
559 if (mdr->locking && *p != mdr->locking) {
560 cancel_locking(mdr.get(), &issue_set);
561 }
562 if (xlocks.count(*p)) {
563 marker.message = "failed to xlock, waiting";
564 if (!xlock_start(*p, mdr))
565 goto out;
566 dout(10) << " got xlock on " << **p << " " << *(*p)->get_parent() << dendl;
567 } else if (need_wrlock || need_remote_wrlock) {
568 if (need_remote_wrlock && !mdr->remote_wrlocks.count(*p)) {
569 marker.message = "waiting for remote wrlocks";
570 remote_wrlock_start(*p, (*remote_wrlocks)[*p], mdr);
571 goto out;
572 }
573 if (need_wrlock && !mdr->wrlocks.count(*p)) {
574 marker.message = "failed to wrlock, waiting";
575 if (need_remote_wrlock && !(*p)->can_wrlock(mdr->get_client())) {
576 marker.message = "failed to wrlock, dropping remote wrlock and waiting";
577 // can't take the wrlock because the scatter lock is gathering. need to
578 // release the remote wrlock, so that the gathering process can finish.
579 remote_wrlock_finish(*p, mdr->remote_wrlocks[*p], mdr.get());
580 remote_wrlock_start(*p, (*remote_wrlocks)[*p], mdr);
581 goto out;
582 }
583 // nowait if we have already gotten remote wrlock
584 if (!wrlock_start(*p, mdr, need_remote_wrlock))
585 goto out;
586 dout(10) << " got wrlock on " << **p << " " << *(*p)->get_parent() << dendl;
587 }
588 } else {
589 assert(mdr->is_master());
b32b8144
FG
590 if ((*p)->needs_recover()) {
591 if (mds->is_cluster_degraded()) {
592 if (!mdr->is_queued_for_replay()) {
593 // see comments in SimpleLock::set_state_rejoin() and
594 // ScatterLock::encode_state_for_rejoin()
595 drop_locks(mdr.get());
596 mds->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache, mdr));
597 dout(10) << " rejoin recovering " << **p << " " << *(*p)->get_parent()
598 << ", waiting for cluster recovered" << dendl;
599 marker.message = "rejoin recovering lock, waiting for cluster recovered";
600 return false;
7c673cae 601 }
b32b8144
FG
602 } else {
603 (*p)->clear_need_recover();
7c673cae
FG
604 }
605 }
606
607 marker.message = "failed to rdlock, waiting";
608 if (!rdlock_start(*p, mdr))
609 goto out;
610 dout(10) << " got rdlock on " << **p << " " << *(*p)->get_parent() << dendl;
611 }
612 }
613
614 // any extra unneeded locks?
615 while (existing != mdr->locks.end()) {
616 SimpleLock *stray = *existing;
617 ++existing;
618 dout(10) << " unlocking extra " << *stray << " " << *stray->get_parent() << dendl;
619 bool need_issue = false;
620 if (mdr->xlocks.count(stray)) {
621 xlock_finish(stray, mdr.get(), &need_issue);
622 } else if (mdr->rdlocks.count(stray)) {
623 rdlock_finish(stray, mdr.get(), &need_issue);
624 } else {
625 // may have acquired both wrlock and remore wrlock
626 if (mdr->wrlocks.count(stray))
627 wrlock_finish(stray, mdr.get(), &need_issue);
628 if (mdr->remote_wrlocks.count(stray))
629 remote_wrlock_finish(stray, mdr->remote_wrlocks[stray], mdr.get());
630 }
631 if (need_issue)
632 issue_set.insert(static_cast<CInode*>(stray->get_parent()));
633 }
634
635 mdr->done_locking = true;
636 mdr->set_mds_stamp(ceph_clock_now());
637 result = true;
638 marker.message = "acquired locks";
639
640 out:
641 issue_caps_set(issue_set);
642 return result;
643}
644
224ce89b
WB
645void Locker::notify_freeze_waiter(MDSCacheObject *o)
646{
647 CDir *dir = NULL;
648 if (CInode *in = dynamic_cast<CInode*>(o)) {
649 if (!in->is_root())
650 dir = in->get_parent_dir();
651 } else if (CDentry *dn = dynamic_cast<CDentry*>(o)) {
652 dir = dn->get_dir();
653 } else {
654 dir = dynamic_cast<CDir*>(o);
655 assert(dir);
656 }
657 if (dir) {
658 if (dir->is_freezing_dir())
659 mdcache->fragment_freeze_inc_num_waiters(dir);
660 if (dir->is_freezing_tree()) {
661 while (!dir->is_freezing_tree_root())
662 dir = dir->get_parent_dir();
663 mdcache->migrator->export_freeze_inc_num_waiters(dir);
664 }
665 }
666}
7c673cae
FG
667
668void Locker::set_xlocks_done(MutationImpl *mut, bool skip_dentry)
669{
670 for (set<SimpleLock*>::iterator p = mut->xlocks.begin();
671 p != mut->xlocks.end();
672 ++p) {
673 MDSCacheObject *object = (*p)->get_parent();
674 assert(object->is_auth());
675 if (skip_dentry &&
676 ((*p)->get_type() == CEPH_LOCK_DN || (*p)->get_type() == CEPH_LOCK_DVERSION))
677 continue;
678 dout(10) << "set_xlocks_done on " << **p << " " << *object << dendl;
679 (*p)->set_xlock_done();
680 }
681}
682
683void Locker::_drop_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
684{
685 while (!mut->rdlocks.empty()) {
686 bool ni = false;
687 MDSCacheObject *p = (*mut->rdlocks.begin())->get_parent();
688 rdlock_finish(*mut->rdlocks.begin(), mut, &ni);
689 if (ni)
690 pneed_issue->insert(static_cast<CInode*>(p));
691 }
692}
693
694void Locker::_drop_non_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
695{
696 set<mds_rank_t> slaves;
697
698 while (!mut->xlocks.empty()) {
699 SimpleLock *lock = *mut->xlocks.begin();
700 MDSCacheObject *p = lock->get_parent();
701 if (!p->is_auth()) {
702 assert(lock->get_sm()->can_remote_xlock);
703 slaves.insert(p->authority().first);
704 lock->put_xlock();
705 mut->locks.erase(lock);
706 mut->xlocks.erase(lock);
707 continue;
708 }
709 bool ni = false;
710 xlock_finish(lock, mut, &ni);
711 if (ni)
712 pneed_issue->insert(static_cast<CInode*>(p));
713 }
714
715 while (!mut->remote_wrlocks.empty()) {
716 map<SimpleLock*,mds_rank_t>::iterator p = mut->remote_wrlocks.begin();
717 slaves.insert(p->second);
718 if (mut->wrlocks.count(p->first) == 0)
719 mut->locks.erase(p->first);
720 mut->remote_wrlocks.erase(p);
721 }
722
723 while (!mut->wrlocks.empty()) {
724 bool ni = false;
725 MDSCacheObject *p = (*mut->wrlocks.begin())->get_parent();
726 wrlock_finish(*mut->wrlocks.begin(), mut, &ni);
727 if (ni)
728 pneed_issue->insert(static_cast<CInode*>(p));
729 }
730
731 for (set<mds_rank_t>::iterator p = slaves.begin(); p != slaves.end(); ++p) {
732 if (!mds->is_cluster_degraded() ||
733 mds->mdsmap->get_state(*p) >= MDSMap::STATE_REJOIN) {
734 dout(10) << "_drop_non_rdlocks dropping remote locks on mds." << *p << dendl;
735 MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt,
736 MMDSSlaveRequest::OP_DROPLOCKS);
737 mds->send_message_mds(slavereq, *p);
738 }
739 }
740}
741
742void Locker::cancel_locking(MutationImpl *mut, set<CInode*> *pneed_issue)
743{
744 SimpleLock *lock = mut->locking;
745 assert(lock);
746 dout(10) << "cancel_locking " << *lock << " on " << *mut << dendl;
747
748 if (lock->get_parent()->is_auth()) {
749 bool need_issue = false;
750 if (lock->get_state() == LOCK_PREXLOCK) {
751 _finish_xlock(lock, -1, &need_issue);
752 } else if (lock->get_state() == LOCK_LOCK_XLOCK &&
753 lock->get_num_xlocks() == 0) {
754 lock->set_state(LOCK_XLOCKDONE);
755 eval_gather(lock, true, &need_issue);
756 }
757 if (need_issue)
758 pneed_issue->insert(static_cast<CInode *>(lock->get_parent()));
759 }
760 mut->finish_locking(lock);
761}
762
763void Locker::drop_locks(MutationImpl *mut, set<CInode*> *pneed_issue)
764{
765 // leftover locks
766 set<CInode*> my_need_issue;
767 if (!pneed_issue)
768 pneed_issue = &my_need_issue;
769
770 if (mut->locking)
771 cancel_locking(mut, pneed_issue);
772 _drop_non_rdlocks(mut, pneed_issue);
773 _drop_rdlocks(mut, pneed_issue);
774
775 if (pneed_issue == &my_need_issue)
776 issue_caps_set(*pneed_issue);
777 mut->done_locking = false;
778}
779
780void Locker::drop_non_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
781{
782 set<CInode*> my_need_issue;
783 if (!pneed_issue)
784 pneed_issue = &my_need_issue;
785
786 _drop_non_rdlocks(mut, pneed_issue);
787
788 if (pneed_issue == &my_need_issue)
789 issue_caps_set(*pneed_issue);
790}
791
b32b8144 792void Locker::drop_rdlocks_for_early_reply(MutationImpl *mut)
7c673cae 793{
b32b8144 794 set<CInode*> need_issue;
7c673cae 795
b32b8144
FG
796 for (auto p = mut->rdlocks.begin(); p != mut->rdlocks.end(); ) {
797 SimpleLock *lock = *p;
798 ++p;
799 // make later mksnap/setlayout (at other mds) wait for this unsafe request
800 if (lock->get_type() == CEPH_LOCK_ISNAP ||
801 lock->get_type() == CEPH_LOCK_IPOLICY)
802 continue;
803 bool ni = false;
804 rdlock_finish(lock, mut, &ni);
805 if (ni)
806 need_issue.insert(static_cast<CInode*>(lock->get_parent()));
807 }
7c673cae 808
b32b8144 809 issue_caps_set(need_issue);
7c673cae
FG
810}
811
7c673cae
FG
812// generics
813
814void Locker::eval_gather(SimpleLock *lock, bool first, bool *pneed_issue, list<MDSInternalContextBase*> *pfinishers)
815{
816 dout(10) << "eval_gather " << *lock << " on " << *lock->get_parent() << dendl;
817 assert(!lock->is_stable());
818
819 int next = lock->get_next_state();
820
821 CInode *in = 0;
822 bool caps = lock->get_cap_shift();
823 if (lock->get_type() != CEPH_LOCK_DN)
824 in = static_cast<CInode *>(lock->get_parent());
825
826 bool need_issue = false;
827
828 int loner_issued = 0, other_issued = 0, xlocker_issued = 0;
829 assert(!caps || in != NULL);
830 if (caps && in->is_head()) {
831 in->get_caps_issued(&loner_issued, &other_issued, &xlocker_issued,
832 lock->get_cap_shift(), lock->get_cap_mask());
833 dout(10) << " next state is " << lock->get_state_name(next)
834 << " issued/allows loner " << gcap_string(loner_issued)
835 << "/" << gcap_string(lock->gcaps_allowed(CAP_LONER, next))
836 << " xlocker " << gcap_string(xlocker_issued)
837 << "/" << gcap_string(lock->gcaps_allowed(CAP_XLOCKER, next))
838 << " other " << gcap_string(other_issued)
839 << "/" << gcap_string(lock->gcaps_allowed(CAP_ANY, next))
840 << dendl;
841
842 if (first && ((~lock->gcaps_allowed(CAP_ANY, next) & other_issued) ||
843 (~lock->gcaps_allowed(CAP_LONER, next) & loner_issued) ||
844 (~lock->gcaps_allowed(CAP_XLOCKER, next) & xlocker_issued)))
845 need_issue = true;
846 }
847
848#define IS_TRUE_AND_LT_AUTH(x, auth) (x && ((auth && x <= AUTH) || (!auth && x < AUTH)))
849 bool auth = lock->get_parent()->is_auth();
850 if (!lock->is_gathering() &&
851 (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_rdlock, auth) || !lock->is_rdlocked()) &&
852 (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_wrlock, auth) || !lock->is_wrlocked()) &&
853 (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_xlock, auth) || !lock->is_xlocked()) &&
854 (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_lease, auth) || !lock->is_leased()) &&
855 !(lock->get_parent()->is_auth() && lock->is_flushing()) && // i.e. wait for scatter_writebehind!
856 (!caps || ((~lock->gcaps_allowed(CAP_ANY, next) & other_issued) == 0 &&
857 (~lock->gcaps_allowed(CAP_LONER, next) & loner_issued) == 0 &&
858 (~lock->gcaps_allowed(CAP_XLOCKER, next) & xlocker_issued) == 0)) &&
859 lock->get_state() != LOCK_SYNC_MIX2 && // these states need an explicit trigger from the auth mds
860 lock->get_state() != LOCK_MIX_SYNC2
861 ) {
862 dout(7) << "eval_gather finished gather on " << *lock
863 << " on " << *lock->get_parent() << dendl;
864
865 if (lock->get_sm() == &sm_filelock) {
866 assert(in);
867 if (in->state_test(CInode::STATE_RECOVERING)) {
868 dout(7) << "eval_gather finished gather, but still recovering" << dendl;
869 return;
870 } else if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
871 dout(7) << "eval_gather finished gather, but need to recover" << dendl;
872 mds->mdcache->queue_file_recover(in);
873 mds->mdcache->do_file_recover();
874 return;
875 }
876 }
877
878 if (!lock->get_parent()->is_auth()) {
879 // replica: tell auth
880 mds_rank_t auth = lock->get_parent()->authority().first;
881
882 if (lock->get_parent()->is_rejoining() &&
883 mds->mdsmap->get_state(auth) == MDSMap::STATE_REJOIN) {
884 dout(7) << "eval_gather finished gather, but still rejoining "
885 << *lock->get_parent() << dendl;
886 return;
887 }
888
889 if (!mds->is_cluster_degraded() ||
890 mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) {
891 switch (lock->get_state()) {
892 case LOCK_SYNC_LOCK:
893 mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid()),
894 auth);
895 break;
896
897 case LOCK_MIX_SYNC:
898 {
899 MLock *reply = new MLock(lock, LOCK_AC_SYNCACK, mds->get_nodeid());
900 lock->encode_locked_state(reply->get_data());
901 mds->send_message_mds(reply, auth);
902 next = LOCK_MIX_SYNC2;
903 (static_cast<ScatterLock *>(lock))->start_flush();
904 }
905 break;
906
907 case LOCK_MIX_SYNC2:
908 (static_cast<ScatterLock *>(lock))->finish_flush();
909 (static_cast<ScatterLock *>(lock))->clear_flushed();
910
911 case LOCK_SYNC_MIX2:
912 // do nothing, we already acked
913 break;
914
915 case LOCK_SYNC_MIX:
916 {
917 MLock *reply = new MLock(lock, LOCK_AC_MIXACK, mds->get_nodeid());
918 mds->send_message_mds(reply, auth);
919 next = LOCK_SYNC_MIX2;
920 }
921 break;
922
923 case LOCK_MIX_LOCK:
924 {
925 bufferlist data;
926 lock->encode_locked_state(data);
927 mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid(), data), auth);
928 (static_cast<ScatterLock *>(lock))->start_flush();
929 // we'll get an AC_LOCKFLUSHED to complete
930 }
931 break;
932
933 default:
934 ceph_abort();
935 }
936 }
937 } else {
938 // auth
939
940 // once the first (local) stage of mix->lock gather complete we can
941 // gather from replicas
942 if (lock->get_state() == LOCK_MIX_LOCK &&
943 lock->get_parent()->is_replicated()) {
944 dout(10) << " finished (local) gather for mix->lock, now gathering from replicas" << dendl;
945 send_lock_message(lock, LOCK_AC_LOCK);
946 lock->init_gather();
947 lock->set_state(LOCK_MIX_LOCK2);
948 return;
949 }
950
951 if (lock->is_dirty() && !lock->is_flushed()) {
952 scatter_writebehind(static_cast<ScatterLock *>(lock));
953 mds->mdlog->flush();
954 return;
955 }
956 lock->clear_flushed();
957
958 switch (lock->get_state()) {
959 // to mixed
960 case LOCK_TSYN_MIX:
961 case LOCK_SYNC_MIX:
962 case LOCK_EXCL_MIX:
b32b8144 963 case LOCK_XSYN_MIX:
7c673cae
FG
964 in->start_scatter(static_cast<ScatterLock *>(lock));
965 if (lock->get_parent()->is_replicated()) {
966 bufferlist softdata;
967 lock->encode_locked_state(softdata);
968 send_lock_message(lock, LOCK_AC_MIX, softdata);
969 }
970 (static_cast<ScatterLock *>(lock))->clear_scatter_wanted();
971 break;
972
973 case LOCK_XLOCK:
974 case LOCK_XLOCKDONE:
975 if (next != LOCK_SYNC)
976 break;
977 // fall-thru
978
979 // to sync
980 case LOCK_EXCL_SYNC:
981 case LOCK_LOCK_SYNC:
982 case LOCK_MIX_SYNC:
983 case LOCK_XSYN_SYNC:
984 if (lock->get_parent()->is_replicated()) {
985 bufferlist softdata;
986 lock->encode_locked_state(softdata);
987 send_lock_message(lock, LOCK_AC_SYNC, softdata);
988 }
989 break;
990 }
991
992 }
993
994 lock->set_state(next);
995
996 if (lock->get_parent()->is_auth() &&
997 lock->is_stable())
998 lock->get_parent()->auth_unpin(lock);
999
1000 // drop loner before doing waiters
1001 if (caps &&
1002 in->is_head() &&
1003 in->is_auth() &&
1004 in->get_wanted_loner() != in->get_loner()) {
1005 dout(10) << " trying to drop loner" << dendl;
1006 if (in->try_drop_loner()) {
1007 dout(10) << " dropped loner" << dendl;
1008 need_issue = true;
1009 }
1010 }
1011
1012 if (pfinishers)
1013 lock->take_waiting(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD|SimpleLock::WAIT_XLOCK,
1014 *pfinishers);
1015 else
1016 lock->finish_waiters(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD|SimpleLock::WAIT_XLOCK);
1017
1018 if (caps && in->is_head())
1019 need_issue = true;
1020
1021 if (lock->get_parent()->is_auth() &&
1022 lock->is_stable())
1023 try_eval(lock, &need_issue);
1024 }
1025
1026 if (need_issue) {
1027 if (pneed_issue)
1028 *pneed_issue = true;
1029 else if (in->is_head())
1030 issue_caps(in);
1031 }
1032
1033}
1034
// Evaluate an inode's lock set (selected by `mask` bits) and its loner state.
// Called after lock refs are dropped or caps are imported.  Returns true if
// caps need to be (re)issued; issues them itself when the inode is head.
bool Locker::eval(CInode *in, int mask, bool caps_imported)
{
  bool need_issue = caps_imported;
  list<MDSInternalContextBase*> finishers;

  dout(10) << "eval " << mask << " " << *in << dendl;

  // choose loner?
  if (in->is_auth() && in->is_head()) {
    client_t orig_loner = in->get_loner();
    if (in->choose_ideal_loner()) {
      dout(10) << "eval set loner: client." << orig_loner << " -> client." << in->get_loner() << dendl;
      need_issue = true;
      mask = -1;  // loner changed: re-evaluate every lock below
    } else if (in->get_wanted_loner() != in->get_loner()) {
      dout(10) << "eval want loner: client." << in->get_wanted_loner() << " but failed to set it" << dendl;
      mask = -1;
    }
  }

 retry:
  // kick each lock selected by the mask
  if (mask & CEPH_LOCK_IFILE)
    eval_any(&in->filelock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IAUTH)
    eval_any(&in->authlock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_ILINK)
    eval_any(&in->linklock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IXATTR)
    eval_any(&in->xattrlock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_INEST)
    eval_any(&in->nestlock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IFLOCK)
    eval_any(&in->flocklock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IPOLICY)
    eval_any(&in->policylock, &need_issue, &finishers, caps_imported);

  // drop loner?
  if (in->is_auth() && in->is_head() && in->get_wanted_loner() != in->get_loner()) {
    if (in->try_drop_loner()) {
      need_issue = true;
      if (in->get_wanted_loner() >= 0) {
	dout(10) << "eval end set loner to client." << in->get_loner() << dendl;
	bool ok = in->try_set_loner();
	assert(ok);
	// new loner: all locks must be re-evaluated against it
	mask = -1;
	goto retry;
      }
    }
  }

  finish_contexts(g_ceph_context, finishers);

  if (need_issue && in->is_head())
    issue_caps(in);

  dout(10) << "eval done" << dendl;
  return need_issue;
}
1093
1094class C_Locker_Eval : public LockerContext {
1095 MDSCacheObject *p;
1096 int mask;
1097public:
1098 C_Locker_Eval(Locker *l, MDSCacheObject *pp, int m) : LockerContext(l), p(pp), mask(m) {
1099 // We are used as an MDSCacheObject waiter, so should
1100 // only be invoked by someone already holding the big lock.
1101 assert(locker->mds->mds_lock.is_locked_by_me());
1102 p->get(MDSCacheObject::PIN_PTRWAITER);
1103 }
1104 void finish(int r) override {
1105 locker->try_eval(p, mask);
1106 p->put(MDSCacheObject::PIN_PTRWAITER);
1107 }
1108};
1109
1110void Locker::try_eval(MDSCacheObject *p, int mask)
1111{
1112 // unstable and ambiguous auth?
1113 if (p->is_ambiguous_auth()) {
1114 dout(7) << "try_eval ambiguous auth, waiting on " << *p << dendl;
1115 p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_Eval(this, p, mask));
1116 return;
1117 }
1118
1119 if (p->is_auth() && p->is_frozen()) {
1120 dout(7) << "try_eval frozen, waiting on " << *p << dendl;
1121 p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, mask));
1122 return;
1123 }
1124
1125 if (mask & CEPH_LOCK_DN) {
1126 assert(mask == CEPH_LOCK_DN);
1127 bool need_issue = false; // ignore this, no caps on dentries
1128 CDentry *dn = static_cast<CDentry *>(p);
1129 eval_any(&dn->lock, &need_issue);
1130 } else {
1131 CInode *in = static_cast<CInode *>(p);
1132 eval(in, mask);
1133 }
1134}
1135
// Re-evaluate one lock on its parent object, honoring pending scatter /
// unscatter requests, and deferring when the parent cannot transition
// (ambiguous auth, frozen, or freezing).
void Locker::try_eval(SimpleLock *lock, bool *pneed_issue)
{
  MDSCacheObject *p = lock->get_parent();

  // unstable and ambiguous auth?
  if (p->is_ambiguous_auth()) {
    dout(7) << "try_eval " << *lock << " ambiguousauth, waiting on " << *p << dendl;
    p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_Eval(this, p, lock->get_type()));
    return;
  }

  // only the auth drives lock state transitions
  if (!p->is_auth()) {
    dout(7) << "try_eval " << *lock << " not auth for " << *p << dendl;
    return;
  }

  if (p->is_frozen()) {
    dout(7) << "try_eval " << *lock << " frozen, waiting on " << *p << dendl;
    p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, lock->get_type()));
    return;
  }

  /*
   * We could have a situation like:
   *
   * - mds A authpins item on mds B
   * - mds B starts to freeze tree containing item
   * - mds A tries wrlock_start on A, sends REQSCATTER to B
   * - mds B lock is unstable, sets scatter_wanted
   * - mds B lock stabilizes, calls try_eval.
   *
   * We can defer while freezing without causing a deadlock.  Honor
   * scatter_wanted flag here.  This will never get deferred by the
   * checks above due to the auth_pin held by the master.
   */
  if (lock->is_scatterlock()) {
    ScatterLock *slock = static_cast<ScatterLock *>(lock);
    if (slock->get_scatter_wanted() &&
	slock->get_state() != LOCK_MIX) {
      // a replica asked for MIX; start the transition even while freezing
      scatter_mix(slock, pneed_issue);
      if (!lock->is_stable())
	return;
    } else if (slock->get_unscatter_wanted() &&
	       slock->get_state() != LOCK_LOCK) {
      // conversely, move back toward LOCK if unscatter was requested
      simple_lock(slock, pneed_issue);
      if (!lock->is_stable()) {
	return;
      }
    }
  }

  // a freezing (not yet frozen) parent defers everything except dentry locks
  if (lock->get_type() != CEPH_LOCK_DN && p->is_freezing()) {
    dout(7) << "try_eval " << *lock << " freezing, waiting on " << *p << dendl;
    p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, lock->get_type()));
    return;
  }

  eval(lock, pneed_issue);
}
1195
1196void Locker::eval_cap_gather(CInode *in, set<CInode*> *issue_set)
1197{
1198 bool need_issue = false;
1199 list<MDSInternalContextBase*> finishers;
1200
1201 // kick locks now
1202 if (!in->filelock.is_stable())
1203 eval_gather(&in->filelock, false, &need_issue, &finishers);
1204 if (!in->authlock.is_stable())
1205 eval_gather(&in->authlock, false, &need_issue, &finishers);
1206 if (!in->linklock.is_stable())
1207 eval_gather(&in->linklock, false, &need_issue, &finishers);
1208 if (!in->xattrlock.is_stable())
1209 eval_gather(&in->xattrlock, false, &need_issue, &finishers);
1210
1211 if (need_issue && in->is_head()) {
1212 if (issue_set)
1213 issue_set->insert(in);
1214 else
1215 issue_caps(in);
1216 }
1217
1218 finish_contexts(g_ceph_context, finishers);
1219}
1220
1221void Locker::eval_scatter_gathers(CInode *in)
1222{
1223 bool need_issue = false;
1224 list<MDSInternalContextBase*> finishers;
1225
1226 dout(10) << "eval_scatter_gathers " << *in << dendl;
1227
1228 // kick locks now
1229 if (!in->filelock.is_stable())
1230 eval_gather(&in->filelock, false, &need_issue, &finishers);
1231 if (!in->nestlock.is_stable())
1232 eval_gather(&in->nestlock, false, &need_issue, &finishers);
1233 if (!in->dirfragtreelock.is_stable())
1234 eval_gather(&in->dirfragtreelock, false, &need_issue, &finishers);
1235
1236 if (need_issue && in->is_head())
1237 issue_caps(in);
1238
1239 finish_contexts(g_ceph_context, finishers);
1240}
1241
1242void Locker::eval(SimpleLock *lock, bool *need_issue)
1243{
1244 switch (lock->get_type()) {
1245 case CEPH_LOCK_IFILE:
1246 return file_eval(static_cast<ScatterLock*>(lock), need_issue);
1247 case CEPH_LOCK_IDFT:
1248 case CEPH_LOCK_INEST:
1249 return scatter_eval(static_cast<ScatterLock*>(lock), need_issue);
1250 default:
1251 return simple_eval(lock, need_issue);
1252 }
1253}
1254
1255
1256// ------------------
1257// rdlock
1258
// Try to push a stable lock toward a state that allows rdlocks.
// Returns true if we initiated a local state change (caller should
// re-check can_rdlock); false if we could only ask the auth / wait.
bool Locker::_rdlock_kick(SimpleLock *lock, bool as_anon)
{
  // kick the lock
  if (lock->is_stable()) {
    if (lock->get_parent()->is_auth()) {
      if (lock->get_sm() == &sm_scatterlock) {
	// not until tempsync is fully implemented
	//if (lock->get_parent()->is_replicated())
	//scatter_tempsync((ScatterLock*)lock);
	//else
	simple_sync(lock);
      } else if (lock->get_sm() == &sm_filelock) {
	CInode *in = static_cast<CInode*>(lock->get_parent());
	// prefer XSYN over SYNC when a loner holds EXCL on a regular file
	if (lock->get_state() == LOCK_EXCL &&
	    in->get_target_loner() >= 0 &&
	    !in->is_dir() && !as_anon) // as_anon => caller wants SYNC, not XSYN
	  file_xsyn(lock);
	else
	  simple_sync(lock);
      } else
	simple_sync(lock);
      return true;
    } else {
      // request rdlock state change from auth
      mds_rank_t auth = lock->get_parent()->authority().first;
      if (!mds->is_cluster_degraded() ||
	  mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
	dout(10) << "requesting rdlock from auth on "
		 << *lock << " on " << *lock->get_parent() << dendl;
	mds->send_message_mds(new MLock(lock, LOCK_AC_REQRDLOCK, mds->get_nodeid()), auth);
      }
      return false;
    }
  }
  // unstable filelock may be blocked on file recovery; raise its priority
  if (lock->get_type() == CEPH_LOCK_IFILE) {
    CInode *in = static_cast<CInode *>(lock->get_parent());
    if (in->state_test(CInode::STATE_RECOVERING)) {
      mds->mdcache->recovery_queue.prioritize(in);
    }
  }

  return false;
}
1302
1303bool Locker::rdlock_try(SimpleLock *lock, client_t client, MDSInternalContextBase *con)
1304{
1305 dout(7) << "rdlock_try on " << *lock << " on " << *lock->get_parent() << dendl;
1306
1307 // can read? grab ref.
1308 if (lock->can_rdlock(client))
1309 return true;
1310
1311 _rdlock_kick(lock, false);
1312
1313 if (lock->can_rdlock(client))
1314 return true;
1315
1316 // wait!
1317 if (con) {
1318 dout(7) << "rdlock_try waiting on " << *lock << " on " << *lock->get_parent() << dendl;
1319 lock->add_waiter(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_RD, con);
1320 }
1321 return false;
1322}
1323
// Acquire a rdlock for a request.  On success the ref is recorded in the
// mutation and true is returned; otherwise the request is parked as a
// waiter and false is returned.
bool Locker::rdlock_start(SimpleLock *lock, MDRequestRef& mut, bool as_anon)
{
  dout(7) << "rdlock_start on " << *lock << " on " << *lock->get_parent() << dendl;

  // client may be allowed to rdlock the same item it has xlocked.
  // UNLESS someone passes in as_anon, or we're reading snapped version here.
  if (mut->snapid != CEPH_NOSNAP)
    as_anon = true;
  client_t client = as_anon ? -1 : mut->get_client();

  CInode *in = 0;
  if (lock->get_type() != CEPH_LOCK_DN)
    in = static_cast<CInode *>(lock->get_parent());

  /*
  if (!lock->get_parent()->is_auth() &&
      lock->fw_rdlock_to_auth()) {
    mdcache->request_forward(mut, lock->get_parent()->authority().first);
    return false;
  }
  */

  while (1) {
    // can read?  grab ref.
    if (lock->can_rdlock(client)) {
      lock->get_rdlock();
      mut->rdlocks.insert(lock);
      mut->locks.insert(lock);
      return true;
    }

    // hmm, wait a second.
    // snapped (non-head) inode stuck in SNAP_SYNC: the head inode's lock
    // must first move to SYNC before this one can follow.
    if (in && !in->is_head() && in->is_auth() &&
	lock->get_state() == LOCK_SNAP_SYNC) {
      // okay, we actually need to kick the head's lock to get ourselves synced up.
      CInode *head = mdcache->get_inode(in->ino());
      assert(head);
      SimpleLock *hlock = head->get_lock(CEPH_LOCK_IFILE);
      if (hlock->get_state() == LOCK_SYNC)
	hlock = head->get_lock(lock->get_type());

      if (hlock->get_state() != LOCK_SYNC) {
	dout(10) << "rdlock_start trying head inode " << *head << dendl;
	if (!rdlock_start(hlock, mut, true)) // ** as_anon, no rdlock on EXCL **
	  return false;
	// oh, check our lock again then
      }
    }

    // kick the lock; if we couldn't change state locally, give up and wait
    if (!_rdlock_kick(lock, as_anon))
      break;
  }

  // wait!
  int wait_on;
  if (lock->get_parent()->is_auth() && lock->is_stable())
    wait_on = SimpleLock::WAIT_RD;
  else
    wait_on = SimpleLock::WAIT_STABLE;  // REQRDLOCK is ignored if lock is unstable, so we need to retry.
  dout(7) << "rdlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl;
  lock->add_waiter(wait_on, new C_MDS_RetryRequest(mdcache, mut));
  nudge_log(lock);
  return false;
}
1388
1389void Locker::nudge_log(SimpleLock *lock)
1390{
1391 dout(10) << "nudge_log " << *lock << " on " << *lock->get_parent() << dendl;
1392 if (lock->get_parent()->is_auth() && lock->is_unstable_and_locked()) // as with xlockdone, or cap flush
1393 mds->mdlog->flush();
1394}
1395
1396void Locker::rdlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue)
1397{
1398 // drop ref
1399 lock->put_rdlock();
1400 if (mut) {
1401 mut->rdlocks.erase(lock);
1402 mut->locks.erase(lock);
1403 }
1404
1405 dout(7) << "rdlock_finish on " << *lock << " on " << *lock->get_parent() << dendl;
1406
1407 // last one?
1408 if (!lock->is_rdlocked()) {
1409 if (!lock->is_stable())
1410 eval_gather(lock, false, pneed_issue);
1411 else if (lock->get_parent()->is_auth())
1412 try_eval(lock, pneed_issue);
1413 }
1414}
1415
1416
1417bool Locker::can_rdlock_set(set<SimpleLock*>& locks)
1418{
1419 dout(10) << "can_rdlock_set " << locks << dendl;
1420 for (set<SimpleLock*>::iterator p = locks.begin(); p != locks.end(); ++p)
1421 if (!(*p)->can_rdlock(-1)) {
1422 dout(10) << "can_rdlock_set can't rdlock " << *p << " on " << *(*p)->get_parent() << dendl;
1423 return false;
1424 }
1425 return true;
1426}
1427
1428bool Locker::rdlock_try_set(set<SimpleLock*>& locks)
1429{
1430 dout(10) << "rdlock_try_set " << locks << dendl;
1431 for (set<SimpleLock*>::iterator p = locks.begin(); p != locks.end(); ++p)
1432 if (!rdlock_try(*p, -1, NULL)) {
1433 dout(10) << "rdlock_try_set can't rdlock " << *p << " on " << *(*p)->get_parent() << dendl;
1434 return false;
1435 }
1436 return true;
1437}
1438
1439void Locker::rdlock_take_set(set<SimpleLock*>& locks, MutationRef& mut)
1440{
1441 dout(10) << "rdlock_take_set " << locks << dendl;
1442 for (set<SimpleLock*>::iterator p = locks.begin(); p != locks.end(); ++p) {
1443 (*p)->get_rdlock();
1444 mut->rdlocks.insert(*p);
1445 mut->locks.insert(*p);
1446 }
1447}
1448
1449// ------------------
1450// wrlock
1451
1452void Locker::wrlock_force(SimpleLock *lock, MutationRef& mut)
1453{
1454 if (lock->get_type() == CEPH_LOCK_IVERSION ||
1455 lock->get_type() == CEPH_LOCK_DVERSION)
1456 return local_wrlock_grab(static_cast<LocalLock*>(lock), mut);
1457
1458 dout(7) << "wrlock_force on " << *lock
1459 << " on " << *lock->get_parent() << dendl;
1460 lock->get_wrlock(true);
1461 mut->wrlocks.insert(lock);
1462 mut->locks.insert(lock);
1463}
1464
// Acquire a wrlock for a request.  With nowait=true the caller already has
// a log entry open, so no transition that might journal (writebehind /
// start_scatter) may be initiated and no waiter is queued on failure.
bool Locker::wrlock_start(SimpleLock *lock, MDRequestRef& mut, bool nowait)
{
  if (lock->get_type() == CEPH_LOCK_IVERSION ||
      lock->get_type() == CEPH_LOCK_DVERSION)
    return local_wrlock_start(static_cast<LocalLock*>(lock), mut);

  dout(10) << "wrlock_start " << *lock << " on " << *lock->get_parent() << dendl;

  CInode *in = static_cast<CInode *>(lock->get_parent());
  client_t client = mut->get_client();
  // prefer MIX when the dir has subtree/exporting frags or a replica asked
  bool want_scatter = !nowait && lock->get_parent()->is_auth() &&
		      (in->has_subtree_or_exporting_dirfrag() ||
		       static_cast<ScatterLock*>(lock)->get_scatter_wanted());

  while (1) {
    // wrlock?
    if (lock->can_wrlock(client) &&
	(!want_scatter || lock->get_state() == LOCK_MIX)) {
      lock->get_wrlock();
      mut->wrlocks.insert(lock);
      mut->locks.insert(lock);
      return true;
    }

    // unstable filelock may be blocked on recovery; raise its priority
    if (lock->get_type() == CEPH_LOCK_IFILE &&
	in->state_test(CInode::STATE_RECOVERING)) {
      mds->mdcache->recovery_queue.prioritize(in);
    }

    if (!lock->is_stable())
      break;

    if (in->is_auth()) {
      // don't do nested lock state change if we have dirty scatterdata and
      // may scatter_writebehind or start_scatter, because nowait==true implies
      // that the caller already has a log entry open!
      if (nowait && lock->is_dirty())
	return false;

      if (want_scatter)
	scatter_mix(static_cast<ScatterLock*>(lock));
      else
	simple_lock(lock);

      if (nowait && !lock->can_wrlock(client))
	return false;

    } else {
      // replica.
      // auth should be auth_pinned (see acquire_locks wrlock weird mustpin case).
      mds_rank_t auth = lock->get_parent()->authority().first;
      if (!mds->is_cluster_degraded() ||
	  mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
	dout(10) << "requesting scatter from auth on "
		 << *lock << " on " << *lock->get_parent() << dendl;
	mds->send_message_mds(new MLock(lock, LOCK_AC_REQSCATTER, mds->get_nodeid()), auth);
      }
      break;
    }
  }

  if (!nowait) {
    dout(7) << "wrlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl;
    lock->add_waiter(SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
    nudge_log(lock);
  }

  return false;
}
1534
1535void Locker::wrlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue)
1536{
1537 if (lock->get_type() == CEPH_LOCK_IVERSION ||
1538 lock->get_type() == CEPH_LOCK_DVERSION)
1539 return local_wrlock_finish(static_cast<LocalLock*>(lock), mut);
1540
1541 dout(7) << "wrlock_finish on " << *lock << " on " << *lock->get_parent() << dendl;
1542 lock->put_wrlock();
1543 if (mut) {
1544 mut->wrlocks.erase(lock);
1545 if (mut->remote_wrlocks.count(lock) == 0)
1546 mut->locks.erase(lock);
1547 }
1548
1549 if (!lock->is_wrlocked()) {
1550 if (!lock->is_stable())
1551 eval_gather(lock, false, pneed_issue);
1552 else if (lock->get_parent()->is_auth())
1553 try_eval(lock, pneed_issue);
1554 }
1555}
1556
1557
1558// remote wrlock
1559
1560void Locker::remote_wrlock_start(SimpleLock *lock, mds_rank_t target, MDRequestRef& mut)
1561{
1562 dout(7) << "remote_wrlock_start mds." << target << " on " << *lock << " on " << *lock->get_parent() << dendl;
1563
1564 // wait for active target
1565 if (mds->is_cluster_degraded() &&
1566 !mds->mdsmap->is_clientreplay_or_active_or_stopping(target)) {
1567 dout(7) << " mds." << target << " is not active" << dendl;
1568 if (mut->more()->waiting_on_slave.empty())
1569 mds->wait_for_active_peer(target, new C_MDS_RetryRequest(mdcache, mut));
1570 return;
1571 }
1572
1573 // send lock request
1574 mut->start_locking(lock, target);
1575 mut->more()->slaves.insert(target);
1576 MMDSSlaveRequest *r = new MMDSSlaveRequest(mut->reqid, mut->attempt,
1577 MMDSSlaveRequest::OP_WRLOCK);
1578 r->set_lock_type(lock->get_type());
1579 lock->get_parent()->set_object_info(r->get_object_info());
1580 mds->send_message_mds(r, target);
1581
1582 assert(mut->more()->waiting_on_slave.count(target) == 0);
1583 mut->more()->waiting_on_slave.insert(target);
1584}
1585
1586void Locker::remote_wrlock_finish(SimpleLock *lock, mds_rank_t target,
1587 MutationImpl *mut)
1588{
1589 // drop ref
1590 mut->remote_wrlocks.erase(lock);
1591 if (mut->wrlocks.count(lock) == 0)
1592 mut->locks.erase(lock);
1593
1594 dout(7) << "remote_wrlock_finish releasing remote wrlock on mds." << target
1595 << " " << *lock->get_parent() << dendl;
1596 if (!mds->is_cluster_degraded() ||
1597 mds->mdsmap->get_state(target) >= MDSMap::STATE_REJOIN) {
1598 MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt,
1599 MMDSSlaveRequest::OP_UNWRLOCK);
1600 slavereq->set_lock_type(lock->get_type());
1601 lock->get_parent()->set_object_info(slavereq->get_object_info());
1602 mds->send_message_mds(slavereq, target);
1603 }
1604}
1605
1606
1607// ------------------
1608// xlock
1609
// Acquire an xlock for a request.  On the auth, drive the lock toward
// XLOCK locally; on a replica, forward an OP_XLOCK slave request to the
// auth and wait.  Returns true only when the xlock is held.
bool Locker::xlock_start(SimpleLock *lock, MDRequestRef& mut)
{
  if (lock->get_type() == CEPH_LOCK_IVERSION ||
      lock->get_type() == CEPH_LOCK_DVERSION)
    return local_xlock_start(static_cast<LocalLock*>(lock), mut);

  dout(7) << "xlock_start on " << *lock << " on " << *lock->get_parent() << dendl;
  client_t client = mut->get_client();

  // auth?
  if (lock->get_parent()->is_auth()) {
    // auth
    while (1) {
      if (lock->can_xlock(client)) {
	lock->set_state(LOCK_XLOCK);
	lock->get_xlock(mut, client);
	mut->xlocks.insert(lock);
	mut->locks.insert(lock);
	mut->finish_locking(lock);
	return true;
      }

      // unstable filelock may be blocked on recovery; raise its priority
      if (lock->get_type() == CEPH_LOCK_IFILE) {
	CInode *in = static_cast<CInode*>(lock->get_parent());
	if (in->state_test(CInode::STATE_RECOVERING)) {
	  mds->mdcache->recovery_queue.prioritize(in);
	}
      }

      // give up unless the lock is stable, or it is XLOCKDONE held by this
      // same client with no stable-waiters (then we may re-xlock directly)
      if (!lock->is_stable() && (lock->get_state() != LOCK_XLOCKDONE ||
				 lock->get_xlock_by_client() != client ||
				 lock->is_waiter_for(SimpleLock::WAIT_STABLE)))
	break;

      if (lock->get_state() == LOCK_LOCK || lock->get_state() == LOCK_XLOCKDONE) {
	mut->start_locking(lock);
	simple_xlock(lock);
      } else {
	// first funnel through LOCK, then loop to try again
	simple_lock(lock);
      }
    }

    lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
    nudge_log(lock);
    return false;
  } else {
    // replica
    assert(lock->get_sm()->can_remote_xlock);
    assert(!mut->slave_request);

    // wait for single auth
    if (lock->get_parent()->is_ambiguous_auth()) {
      lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH,
				     new C_MDS_RetryRequest(mdcache, mut));
      return false;
    }

    // wait for active auth
    mds_rank_t auth = lock->get_parent()->authority().first;
    if (mds->is_cluster_degraded() &&
	!mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
      dout(7) << " mds." << auth << " is not active" << dendl;
      if (mut->more()->waiting_on_slave.empty())
	mds->wait_for_active_peer(auth, new C_MDS_RetryRequest(mdcache, mut));
      return false;
    }

    // send lock request
    mut->more()->slaves.insert(auth);
    mut->start_locking(lock, auth);
    MMDSSlaveRequest *r = new MMDSSlaveRequest(mut->reqid, mut->attempt,
					       MMDSSlaveRequest::OP_XLOCK);
    r->set_lock_type(lock->get_type());
    lock->get_parent()->set_object_info(r->get_object_info());
    mds->send_message_mds(r, auth);

    // only one outstanding op per slave at a time
    assert(mut->more()->waiting_on_slave.count(auth) == 0);
    mut->more()->waiting_on_slave.insert(auth);

    return false;
  }
}
1692
// Settle an (unstable) lock after its last xlock ref was dropped.  If the
// departing xlocker is (or can be) the loner and nothing else pins the
// state, jump straight to EXCL; otherwise run the generic gather path.
void Locker::_finish_xlock(SimpleLock *lock, client_t xlocker, bool *pneed_issue)
{
  assert(!lock->is_stable());
  // fast path preconditions: no readers/writers/leases, not a snap-xlock,
  // and not a dentry lock (dentries have no loner/EXCL notion)
  if (lock->get_num_rdlocks() == 0 &&
      lock->get_num_wrlocks() == 0 &&
      !lock->is_leased() &&
      lock->get_state() != LOCK_XLOCKSNAP &&
      lock->get_type() != CEPH_LOCK_DN) {
    CInode *in = static_cast<CInode*>(lock->get_parent());
    client_t loner = in->get_target_loner();
    if (loner >= 0 && (xlocker < 0 || xlocker == loner)) {
      lock->set_state(LOCK_EXCL);
      lock->get_parent()->auth_unpin(lock);
      lock->finish_waiters(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD);
      if (lock->get_cap_shift())
	*pneed_issue = true;
      if (lock->get_parent()->is_auth() &&
	  lock->is_stable())
	try_eval(lock, pneed_issue);
      return;
    }
  }
  // the xlocker may have CEPH_CAP_GSHARED, need to revoke it if next state is LOCK_LOCK
  eval_gather(lock, lock->get_state() != LOCK_XLOCKSNAP, pneed_issue);
}
1718
// Drop an xlock ref held by a mutation.  For a remote xlock, notify the
// auth (OP_UNXLOCK) and wake local waiters; on the auth, settle the lock
// via _finish_xlock once the last xlock ref is gone.
void Locker::xlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue)
{
  if (lock->get_type() == CEPH_LOCK_IVERSION ||
      lock->get_type() == CEPH_LOCK_DVERSION)
    return local_xlock_finish(static_cast<LocalLock*>(lock), mut);

  dout(10) << "xlock_finish on " << *lock << " " << *lock->get_parent() << dendl;

  // remember who held it before we drop the ref
  client_t xlocker = lock->get_xlock_by_client();

  // drop ref
  lock->put_xlock();
  assert(mut);
  mut->xlocks.erase(lock);
  mut->locks.erase(lock);

  bool do_issue = false;

  // remote xlock?
  if (!lock->get_parent()->is_auth()) {
    assert(lock->get_sm()->can_remote_xlock);

    // tell auth
    dout(7) << "xlock_finish releasing remote xlock on " << *lock->get_parent() << dendl;
    mds_rank_t auth = lock->get_parent()->authority().first;
    // skip the message if auth is below REJOIN; it has dropped state anyway
    if (!mds->is_cluster_degraded() ||
	mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) {
      MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt,
							MMDSSlaveRequest::OP_UNXLOCK);
      slavereq->set_lock_type(lock->get_type());
      lock->get_parent()->set_object_info(slavereq->get_object_info());
      mds->send_message_mds(slavereq, auth);
    }
    // others waiting?
    lock->finish_waiters(SimpleLock::WAIT_STABLE |
			 SimpleLock::WAIT_WR |
			 SimpleLock::WAIT_RD, 0);
  } else {
    if (lock->get_num_xlocks() == 0) {
      if (lock->get_state() == LOCK_LOCK_XLOCK)
	lock->set_state(LOCK_XLOCKDONE);
      _finish_xlock(lock, xlocker, &do_issue);
    }
  }

  if (do_issue) {
    CInode *in = static_cast<CInode*>(lock->get_parent());
    if (in->is_head()) {
      if (pneed_issue)
	*pneed_issue = true;
      else
	issue_caps(in);
    }
  }
}
1774
1775void Locker::xlock_export(SimpleLock *lock, MutationImpl *mut)
1776{
1777 dout(10) << "xlock_export on " << *lock << " " << *lock->get_parent() << dendl;
1778
1779 lock->put_xlock();
1780 mut->xlocks.erase(lock);
1781 mut->locks.erase(lock);
1782
1783 MDSCacheObject *p = lock->get_parent();
1784 assert(p->state_test(CInode::STATE_AMBIGUOUSAUTH)); // we are exporting this (inode)
1785
1786 if (!lock->is_stable())
1787 lock->get_parent()->auth_unpin(lock);
1788
1789 lock->set_state(LOCK_LOCK);
1790}
1791
1792void Locker::xlock_import(SimpleLock *lock)
1793{
1794 dout(10) << "xlock_import on " << *lock << " " << *lock->get_parent() << dendl;
1795 lock->get_parent()->auth_pin(lock);
1796}
1797
1798
1799
1800// file i/o -----------------------------------------
1801
1802version_t Locker::issue_file_data_version(CInode *in)
1803{
1804 dout(7) << "issue_file_data_version on " << *in << dendl;
1805 return in->inode.file_data_version;
1806}
1807
1808class C_Locker_FileUpdate_finish : public LockerLogContext {
1809 CInode *in;
1810 MutationRef mut;
1811 bool share_max;
1812 bool need_issue;
1813 client_t client;
1814 MClientCaps *ack;
1815public:
1816 C_Locker_FileUpdate_finish(Locker *l, CInode *i, MutationRef& m,
1817 bool sm=false, bool ni=false, client_t c=-1,
1818 MClientCaps *ac = 0)
1819 : LockerLogContext(l), in(i), mut(m), share_max(sm), need_issue(ni),
1820 client(c), ack(ac) {
1821 in->get(CInode::PIN_PTRWAITER);
1822 }
1823 void finish(int r) override {
1824 locker->file_update_finish(in, mut, share_max, need_issue, client, ack);
1825 in->put(CInode::PIN_PTRWAITER);
1826 }
1827};
1828
// Run after a file-update log entry commits: apply the projected inode and
// mutation, ack the client's cap flush (if any), release locks, and settle
// snap-writeback / cap-issue bookkeeping.
void Locker::file_update_finish(CInode *in, MutationRef& mut, bool share_max, bool issue_client_cap,
				client_t client, MClientCaps *ack)
{
  dout(10) << "file_update_finish on " << *in << dendl;
  in->pop_and_dirty_projected_inode(mut->ls);

  mut->apply();

  if (ack) {
    Session *session = mds->get_session(client);
    if (session) {
      // "oldest flush tid" > 0 means client uses unique TID for each flush
      if (ack->get_oldest_flush_tid() > 0)
	session->add_completed_flush(ack->get_client_tid());
      mds->send_message_client_counted(ack, session);
    } else {
      // session gone: nobody to ack; drop the message ref
      dout(10) << " no session for client." << client << " " << *ack << dendl;
      ack->put();
    }
  }

  set<CInode*> need_issue;
  drop_locks(mut.get(), &need_issue);

  if (!in->is_head() && !in->client_snap_caps.empty()) {
    dout(10) << " client_snap_caps " << in->client_snap_caps << dendl;
    // check for snap writeback completion
    bool gather = false;
    auto p = in->client_snap_caps.begin();
    while (p != in->client_snap_caps.end()) {
      SimpleLock *lock = in->get_lock(p->first);
      assert(lock);
      dout(10) << " completing client_snap_caps for " << ccap_string(p->first)
	       << " lock " << *lock << " on " << *in << dendl;
      lock->put_wrlock();

      p->second.erase(client);
      if (p->second.empty()) {
	gather = true;
	// post-increment keeps the iterator valid across the erase
	in->client_snap_caps.erase(p++);
      } else
	++p;
    }
    if (gather) {
      if (in->client_snap_caps.empty())
	in->item_open_file.remove_myself();
      eval_cap_gather(in, &need_issue);
    }
  } else {
    // re-issue only if the drop_locks pass didn't already queue this inode
    if (issue_client_cap && need_issue.count(in) == 0) {
      Capability *cap = in->get_client_cap(client);
      if (cap && (cap->wanted() & ~cap->pending()))
	issue_caps(in, cap);
    }

    if (share_max && in->is_auth() &&
	(in->filelock.gcaps_allowed(CAP_LONER) & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)))
      share_inode_max_size(in);
  }
  issue_caps_set(need_issue);

  // record write activity for the balancer
  utime_t now = ceph_clock_now();
  mds->balancer->hit_inode(now, in, META_POP_IWR);

  // auth unpin after issuing caps
  mut->cleanup();
}
1896
// Register (or augment) a client capability for an open with the given
// mode.  During replay only a cap reconnect is attempted and 0 is
// returned; otherwise the (possibly new) Capability is returned.
Capability* Locker::issue_new_caps(CInode *in,
				   int mode,
				   Session *session,
				   SnapRealm *realm,
				   bool is_replay)
{
  dout(7) << "issue_new_caps for mode " << mode << " on " << *in << dendl;
  bool is_new;

  // if replay, try to reconnect cap, and otherwise do nothing.
  if (is_replay) {
    mds->mdcache->try_reconnect_cap(in, session);
    return 0;
  }

  // my needs
  assert(session->info.inst.name.is_client());
  client_t my_client = session->info.inst.name.num();
  int my_want = ceph_caps_for_mode(mode);

  // register a capability
  Capability *cap = in->get_client_cap(my_client);
  if (!cap) {
    // new cap
    cap = in->add_client_cap(my_client, session, realm);
    cap->set_wanted(my_want);
    cap->mark_new();
    cap->inc_suppress(); // suppress file cap messages for new cap (we'll bundle with the open() reply)
    is_new = true;
  } else {
    is_new = false;
    // make sure it wants sufficient caps
    if (my_want & ~cap->wanted()) {
      // augment wanted caps for this client
      cap->set_wanted(cap->wanted() | my_want);
    }
  }

  if (in->is_auth()) {
    // [auth] twiddle mode?
    eval(in, CEPH_CAP_LOCKS);

    if (_need_flush_mdlog(in, my_want))
      mds->mdlog->flush();

  } else {
    // [replica] tell auth about any new caps wanted
    request_inode_file_caps(in);
  }

  // issue caps (pot. incl new one)
  //issue_caps(in);  // note: _eval above may have done this already...

  // re-issue whatever we can
  //cap->issue(cap->pending());

  if (is_new)
    cap->dec_suppress();

  return cap;
}
1958
1959
1960void Locker::issue_caps_set(set<CInode*>& inset)
1961{
1962 for (set<CInode*>::iterator p = inset.begin(); p != inset.end(); ++p)
1963 issue_caps(*p);
1964}
1965
/**
 * Reconcile each client's pending capabilities on @in with what the current
 * lock states allow: grant newly-wanted+allowed caps, revoke caps that are
 * no longer allowed.
 *
 * @param in        head inode whose client caps are reconciled
 * @param only_cap  if non-NULL, restrict processing to this one capability
 * @return true if nothing was (re)issued, false if at least one client was
 *         sent a grant/revoke message
 */
bool Locker::issue_caps(CInode *in, Capability *only_cap)
{
  // allowed caps are determined by the lock mode.
  int all_allowed = in->get_caps_allowed_by_type(CAP_ANY);
  int loner_allowed = in->get_caps_allowed_by_type(CAP_LONER);
  int xlocker_allowed = in->get_caps_allowed_by_type(CAP_XLOCKER);

  client_t loner = in->get_loner();
  if (loner >= 0) {
    dout(7) << "issue_caps loner client." << loner
	    << " allowed=" << ccap_string(loner_allowed) 
	    << ", xlocker allowed=" << ccap_string(xlocker_allowed)
	    << ", others allowed=" << ccap_string(all_allowed)
	    << " on " << *in << dendl;
  } else {
    dout(7) << "issue_caps allowed=" << ccap_string(all_allowed) 
	    << ", xlocker allowed=" << ccap_string(xlocker_allowed)
	    << " on " << *in << dendl;
  }

  assert(in->is_head());

  // count conflicts with
  int nissued = 0;        

  // client caps
  map<client_t, Capability*>::iterator it;
  if (only_cap)
    it = in->client_caps.find(only_cap->get_client());
  else
    it = in->client_caps.begin();
  for (; it != in->client_caps.end(); ++it) {
    Capability *cap = it->second;
    // stale caps are handled by revoke_stale_caps(); skip them here
    if (cap->is_stale())
      continue;

    // do not issue _new_ bits when size|mtime is projected
    int allowed;
    if (loner == it->first)
      allowed = loner_allowed;
    else
      allowed = all_allowed;

    // add in any xlocker-only caps (for locks this client is the xlocker for)
    allowed |= xlocker_allowed & in->get_xlocker_mask(it->first);

    // clients that don't understand inline data must not get FILE_RD/WR
    // while the inode still carries inline data
    Session *session = mds->get_session(it->first);
    if (in->inode.inline_data.version != CEPH_INLINE_NONE &&
	!(session && session->connection &&
	  session->connection->has_feature(CEPH_FEATURE_MDS_INLINE_DATA)))
      allowed &= ~(CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR);

    int pending = cap->pending();
    int wanted = cap->wanted();

    dout(20) << " client." << it->first
	     << " pending " << ccap_string(pending) 
	     << " allowed " << ccap_string(allowed) 
	     << " wanted " << ccap_string(wanted)
	     << dendl;

    if (!(pending & ~allowed)) {
      // skip if suppress or new, and not revocation
      if (cap->is_new() || cap->is_suppress()) {
	dout(20) << "  !revoke and new|suppressed, skipping client." << it->first << dendl;
	continue;
      }
    }

    // notify clients about deleted inode, to make sure they release caps ASAP.
    if (in->inode.nlink == 0)
      wanted |= CEPH_CAP_LINK_SHARED;

    // are there caps that the client _wants_ and can have, but aren't pending?
    // or do we need to revoke?
    if (((wanted & allowed) & ~pending) ||  // missing wanted+allowed caps
	(pending & ~allowed)) {             // need to revoke ~allowed caps.
      // issue
      nissued++;

      // include caps that clients generally like, while we're at it.
      int likes = in->get_caps_liked();      
      int before = pending;
      long seq;
      if (pending & ~allowed)
	seq = cap->issue((wanted|likes) & allowed & pending);  // if revoking, don't issue anything new.
      else
	seq = cap->issue((wanted|likes) & allowed);
      int after = cap->pending();

      if (cap->is_new()) {
	// haven't send caps to client yet
	if (before & ~after)
	  cap->confirm_receipt(seq, after);
      } else {
        dout(7) << "   sending MClientCaps to client." << it->first
		<< " seq " << cap->get_last_seq()
		<< " new pending " << ccap_string(after) << " was " << ccap_string(before) 
		<< dendl;

	// track revocations so we can warn about clients that sit on them
	int op = (before & ~after) ? CEPH_CAP_OP_REVOKE : CEPH_CAP_OP_GRANT;
	if (op == CEPH_CAP_OP_REVOKE) {
		revoking_caps.push_back(&cap->item_revoking_caps);
		revoking_caps_by_client[cap->get_client()].push_back(&cap->item_client_revoking_caps);
		cap->set_last_revoke_stamp(ceph_clock_now());
		cap->reset_num_revoke_warnings();
	}

	MClientCaps *m = new MClientCaps(op, in->ino(),
					 in->find_snaprealm()->inode->ino(),
					 cap->get_cap_id(), cap->get_last_seq(),
					 after, wanted, 0,
					 cap->get_mseq(),
                                         mds->get_osd_epoch_barrier());
	in->encode_cap_message(m, cap);

	mds->send_message_client_counted(m, it->first);
      }
    }

    if (only_cap)
      break;
  }

  return (nissued == 0);  // true if no re-issued, no callbacks
}
2092
2093void Locker::issue_truncate(CInode *in)
2094{
2095 dout(7) << "issue_truncate on " << *in << dendl;
2096
2097 for (map<client_t, Capability*>::iterator it = in->client_caps.begin();
2098 it != in->client_caps.end();
2099 ++it) {
2100 Capability *cap = it->second;
2101 MClientCaps *m = new MClientCaps(CEPH_CAP_OP_TRUNC,
2102 in->ino(),
2103 in->find_snaprealm()->inode->ino(),
2104 cap->get_cap_id(), cap->get_last_seq(),
2105 cap->pending(), cap->wanted(), 0,
2106 cap->get_mseq(),
2107 mds->get_osd_epoch_barrier());
2108 in->encode_cap_message(m, cap);
2109 mds->send_message_client_counted(m, it->first);
2110 }
2111
2112 // should we increase max_size?
2113 if (in->is_auth() && in->is_file())
2114 check_inode_max_size(in);
2115}
2116
2117
/**
 * Revoke all non-PIN caps issued through a stale capability (the client
 * missed its renew deadline).  If the inode is mid cap-export, defer: a
 * successful export removes the cap, a failed one re-evaluates it.
 */
void Locker::revoke_stale_caps(Capability *cap)
{
  CInode *in = cap->get_inode();
  if (in->state_test(CInode::STATE_EXPORTINGCAPS)) {
    // if export succeeds, the cap will be removed. if export fails, we need to
    // revoke the cap if it's still stale.
    in->state_set(CInode::STATE_EVALSTALECAPS);
    return;
  }

  int issued = cap->issued();
  if (issued & ~CEPH_CAP_PIN) {
    dout(10) << " revoking " << ccap_string(issued) << " on " << *in << dendl;
    cap->revoke();

    // client had a writeable range but can no longer flush: file size/mtime
    // may need recovery from the OSDs
    if (in->is_auth() &&
	in->inode.client_ranges.count(cap->get_client()))
      in->state_set(CInode::STATE_NEEDSRECOVER);

    // the revocation may be the last gather these locks were waiting on
    if (!in->filelock.is_stable()) eval_gather(&in->filelock);
    if (!in->linklock.is_stable()) eval_gather(&in->linklock);
    if (!in->authlock.is_stable()) eval_gather(&in->authlock);
    if (!in->xattrlock.is_stable()) eval_gather(&in->xattrlock);

    if (in->is_auth()) {
      try_eval(in, CEPH_CAP_LOCKS);
    } else {
      // replica: tell the auth our aggregate wanted set changed
      request_inode_file_caps(in);
    }
  }
}
2149
2150void Locker::revoke_stale_caps(Session *session)
2151{
2152 dout(10) << "revoke_stale_caps for " << session->info.inst.name << dendl;
2153
2154 for (xlist<Capability*>::iterator p = session->caps.begin(); !p.end(); ++p) {
2155 Capability *cap = *p;
2156 cap->mark_stale();
2157 revoke_stale_caps(cap);
2158 }
2159}
2160
/**
 * A session renewed in time after all: clear the stale flag on each of its
 * caps and re-issue whatever the locks currently allow.
 */
void Locker::resume_stale_caps(Session *session)
{
  dout(10) << "resume_stale_caps for " << session->info.inst.name << dendl;

  for (xlist<Capability*>::iterator p = session->caps.begin(); !p.end(); ++p) {
    Capability *cap = *p;
    CInode *in = cap->get_inode();
    assert(in->is_head());
    if (cap->is_stale()) {
      dout(10) << " clearing stale flag on " << *in << dendl;
      cap->clear_stale();

      if (in->state_test(CInode::STATE_EXPORTINGCAPS)) {
	// if export succeeds, the cap will be removed. if export fails,
	// we need to re-issue the cap if it's not stale.
	in->state_set(CInode::STATE_EVALSTALECAPS);
	continue;
      }

      // issue directly unless eval() on the auth already did it for us
      if (!in->is_auth() || !eval(in, CEPH_CAP_LOCKS))
	issue_caps(in, cap);
    }
  }
}
2185
2186void Locker::remove_stale_leases(Session *session)
2187{
2188 dout(10) << "remove_stale_leases for " << session->info.inst.name << dendl;
2189 xlist<ClientLease*>::iterator p = session->leases.begin();
2190 while (!p.end()) {
2191 ClientLease *l = *p;
2192 ++p;
2193 CDentry *parent = static_cast<CDentry*>(l->parent);
2194 dout(15) << " removing lease on " << *parent << dendl;
2195 parent->remove_client_lease(l, this);
2196 }
2197}
2198
2199
// Waiter context: retry request_inode_file_caps() on an inode once the
// condition it was parked on (single auth / active peer) is satisfied.
class C_MDL_RequestInodeFileCaps : public LockerContext {
  CInode *in;
public:
  C_MDL_RequestInodeFileCaps(Locker *l, CInode *i) : LockerContext(l), in(i) {
    // pin the inode so the raw pointer stays valid until finish() runs
    in->get(CInode::PIN_PTRWAITER);
  }
  void finish(int r) override {
    // only replicas ask the auth for caps; skip if we became auth meanwhile
    if (!in->is_auth())
      locker->request_inode_file_caps(in);
    in->put(CInode::PIN_PTRWAITER);
  }
};
2212
/**
 * (Replica only.)  Tell the auth MDS the aggregate set of caps our clients
 * want on @in, if it changed since the last report.  Defers (via waiter)
 * while the inode's authority is ambiguous or the auth is still rejoining.
 */
void Locker::request_inode_file_caps(CInode *in)
{
  assert(!in->is_auth());

  // PIN is implicit; never report it
  int wanted = in->get_caps_wanted() & ~CEPH_CAP_PIN;
  if (wanted != in->replica_caps_wanted) {
    // wait for single auth
    if (in->is_ambiguous_auth()) {
      in->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, 
                     new C_MDL_RequestInodeFileCaps(this, in));
      return;
    }

    mds_rank_t auth = in->authority().first;
    if (mds->is_cluster_degraded() &&
	mds->mdsmap->get_state(auth) == MDSMap::STATE_REJOIN) {
      mds->wait_for_active_peer(auth, new C_MDL_RequestInodeFileCaps(this, in));
      return;
    }

    dout(7) << "request_inode_file_caps " << ccap_string(wanted)
            << " was " << ccap_string(in->replica_caps_wanted) 
            << " on " << *in << " to mds." << auth << dendl;

    // record what we reported so we don't resend the same state
    in->replica_caps_wanted = wanted;

    if (!mds->is_cluster_degraded() ||
	mds->mdsmap->is_clientreplay_or_active_or_stopping(auth))
      mds->send_message_mds(new MInodeFileCaps(in->ino(), in->replica_caps_wanted),
			    auth);
  }
}
2245
2246/* This function DOES put the passed message before returning */
2247void Locker::handle_inode_file_caps(MInodeFileCaps *m)
2248{
2249 // nobody should be talking to us during recovery.
2250 assert(mds->is_clientreplay() || mds->is_active() || mds->is_stopping());
2251
2252 // ok
2253 CInode *in = mdcache->get_inode(m->get_ino());
2254 mds_rank_t from = mds_rank_t(m->get_source().num());
2255
2256 assert(in);
2257 assert(in->is_auth());
2258
2259 dout(7) << "handle_inode_file_caps replica mds." << from << " wants caps " << ccap_string(m->get_caps()) << " on " << *in << dendl;
2260
2261 if (m->get_caps())
2262 in->mds_caps_wanted[from] = m->get_caps();
2263 else
2264 in->mds_caps_wanted.erase(from);
2265
2266 try_eval(in, CEPH_CAP_LOCKS);
2267 m->put();
2268}
2269
2270
// Waiter context: re-run check_inode_max_size() with the same requested
// size/mtime once the inode unfreezes or its filelock stabilizes.
class C_MDL_CheckMaxSize : public LockerContext {
  CInode *in;
  uint64_t new_max_size;
  uint64_t newsize;
  utime_t mtime;

public:
  C_MDL_CheckMaxSize(Locker *l, CInode *i, uint64_t _new_max_size,
                     uint64_t _newsize, utime_t _mtime) :
    LockerContext(l), in(i),
    new_max_size(_new_max_size), newsize(_newsize), mtime(_mtime)
  {
    // pin the inode so the raw pointer stays valid until finish() runs
    in->get(CInode::PIN_PTRWAITER);
  }
  void finish(int r) override {
    // only meaningful on the auth; authority may have changed while waiting
    if (in->is_auth())
      locker->check_inode_max_size(in, false, new_max_size, newsize, mtime);
    in->put(CInode::PIN_PTRWAITER);
  }
};
2291
94b18763 2292uint64_t Locker::calc_new_max_size(CInode::mempool_inode *pi, uint64_t size)
31f18b77
FG
2293{
2294 uint64_t new_max = (size + 1) << 1;
2295 uint64_t max_inc = g_conf->mds_client_writeable_range_max_inc_objs;
2296 if (max_inc > 0) {
b32b8144 2297 max_inc *= pi->layout.object_size;
31f18b77
FG
2298 new_max = MIN(new_max, size + max_inc);
2299 }
2300 return ROUND_UP_TO(new_max, pi->get_layout_size_increment());
2301}
7c673cae
FG
2302
/**
 * Recompute the per-client writeable ranges for @in given a (possibly new)
 * file size.  A client keeps/gets a range only while it has or wants
 * WR|BUFFER caps; everyone else's range is dropped (absent from the new map).
 *
 * @param size            effective size to base the new max on
 * @param new_ranges      out: the freshly computed client_range_map
 * @param max_increased   out: set true if any client's range.last grew
 */
void Locker::calc_new_client_ranges(CInode *in, uint64_t size,
				    CInode::mempool_inode::client_range_map *new_ranges,
				    bool *max_increased)
{
  auto latest = in->get_projected_inode();
  uint64_t ms;
  if(latest->has_layout()) {
    ms = calc_new_max_size(latest, size);
  } else {
    // Layout-less directories like ~mds0/, have zero size
    ms = 0;
  }

  // increase ranges as appropriate.
  // shrink to 0 if no WR|BUFFER caps issued.
  for (map<client_t,Capability*>::iterator p = in->client_caps.begin();
       p != in->client_caps.end();
       ++p) {
    if ((p->second->issued() | p->second->wanted()) & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER)) {
      client_writeable_range_t& nr = (*new_ranges)[p->first];
      nr.range.first = 0;
      if (latest->client_ranges.count(p->first)) {
	// existing range: never shrink range.last here, only grow it
	client_writeable_range_t& oldr = latest->client_ranges[p->first];
	if (ms > oldr.range.last)
	  *max_increased = true;
	nr.range.last = MAX(ms, oldr.range.last);
	nr.follows = oldr.follows;
      } else {
	// brand-new range for this client
	*max_increased = true;
	nr.range.last = ms;
	nr.follows = in->first - 1;
      }
    }
  }
}
2338
/**
 * (Auth only.)  Project and journal new size/mtime and/or new per-client
 * writeable ranges (max_size) for a file.
 *
 * @param force_wrlock  skip the can_wrlock check (caller already holds it)
 * @param new_max_size  requested max_size floor (0 = just recompute)
 * @param new_size      new file size, 0 = leave size alone
 * @param new_mtime     mtime accompanying the size update
 * @return true if an update was journaled, false if it was a no-op or had
 *         to be deferred (frozen inode / unwrlockable filelock)
 */
bool Locker::check_inode_max_size(CInode *in, bool force_wrlock,
				  uint64_t new_max_size, uint64_t new_size,
				  utime_t new_mtime)
{
  assert(in->is_auth());
  assert(in->is_file());

  CInode::mempool_inode *latest = in->get_projected_inode();
  CInode::mempool_inode::client_range_map new_ranges;
  uint64_t size = latest->size;
  bool update_size = new_size > 0;
  bool update_max = false;
  bool max_increased = false;

  if (update_size) {
    // size and mtime only ever move forward
    new_size = size = MAX(size, new_size);
    new_mtime = MAX(new_mtime, latest->mtime);
    if (latest->size == new_size && latest->mtime == new_mtime)
      update_size = false;
  }

  calc_new_client_ranges(in, max(new_max_size, size), &new_ranges, &max_increased);

  if (max_increased || latest->client_ranges != new_ranges)
    update_max = true;

  if (!update_size && !update_max) {
    dout(20) << "check_inode_max_size no-op on " << *in << dendl;
    return false;
  }

  dout(10) << "check_inode_max_size new_ranges " << new_ranges
	   << " update_size " << update_size
	   << " on " << *in << dendl;

  // can't project/journal on a frozen inode; retry after unfreeze
  if (in->is_frozen()) {
    dout(10) << "check_inode_max_size frozen, waiting on " << *in << dendl;
    C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in,
                                                     new_max_size,
                                                     new_size,
                                                     new_mtime);
    in->add_waiter(CInode::WAIT_UNFREEZE, cms);
    return false;
  }
  if (!force_wrlock && !in->filelock.can_wrlock(in->get_loner())) {
    // lock?
    if (in->filelock.is_stable()) {
      // nudge the filelock toward a wrlockable state
      if (in->get_target_loner() >= 0)
	file_excl(&in->filelock);
      else
	simple_lock(&in->filelock);
    }
    if (!in->filelock.can_wrlock(in->get_loner())) {
      // try again later
      C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in,
                                                       new_max_size,
                                                       new_size,
                                                       new_mtime);

      in->filelock.add_waiter(SimpleLock::WAIT_STABLE, cms);
      dout(10) << "check_inode_max_size can't wrlock, waiting on " << *in << dendl;
      return false;
    }
  }

  MutationRef mut(new MutationImpl());
  mut->ls = mds->mdlog->get_current_segment();
    
  auto &pi = in->project_inode();
  pi.inode.version = in->pre_dirty();

  if (update_max) {
    dout(10) << "check_inode_max_size client_ranges " << pi.inode.client_ranges << " -> " << new_ranges << dendl;
    pi.inode.client_ranges = new_ranges;
  }

  if (update_size) {
    dout(10) << "check_inode_max_size size " << pi.inode.size << " -> " << new_size << dendl;
    pi.inode.size = new_size;
    pi.inode.rstat.rbytes = new_size;
    dout(10) << "check_inode_max_size mtime " << pi.inode.mtime << " -> " << new_mtime << dendl;
    pi.inode.mtime = new_mtime;
    if (new_mtime > pi.inode.ctime)
      pi.inode.ctime = pi.inode.rstat.rctime = new_mtime;
  }

  // use EOpen if the file is still open; otherwise, use EUpdate.
  // this is just an optimization to push open files forward into
  // newer log segments.
  LogEvent *le;
  EMetaBlob *metablob;
  if (in->is_any_caps_wanted() && in->last == CEPH_NOSNAP) {
    EOpen *eo = new EOpen(mds->mdlog);
    eo->add_ino(in->ino());
    metablob = &eo->metablob;
    le = eo;
    mut->ls->open_files.push_back(&in->item_open_file);
  } else {
    EUpdate *eu = new EUpdate(mds->mdlog, "check_inode_max_size");
    metablob = &eu->metablob;
    le = eu;
  }
  mds->mdlog->start_entry(le);
  if (update_size) {  // FIXME if/when we do max_size nested accounting
    mdcache->predirty_journal_parents(mut, metablob, in, 0, PREDIRTY_PRIMARY);
    // no cow, here!
    CDentry *parent = in->get_projected_parent_dn();
    metablob->add_primary_dentry(parent, in, true);
  } else {
    metablob->add_dir_context(in->get_projected_parent_dn()->get_dir());
    mdcache->journal_dirty_inode(mut.get(), metablob, in);
  }
  mds->mdlog->submit_entry(le,
      new C_Locker_FileUpdate_finish(this, in, mut, true));
  wrlock_force(&in->filelock, mut);  // wrlock for duration of journal
  mut->auth_pin(in);

  // make max_size _increase_ timely
  if (max_increased)
    mds->mdlog->flush();

  return true;
}
2462
2463
2464void Locker::share_inode_max_size(CInode *in, Capability *only_cap)
2465{
2466 /*
2467 * only share if currently issued a WR cap. if client doesn't have it,
2468 * file_max doesn't matter, and the client will get it if/when they get
2469 * the cap later.
2470 */
2471 dout(10) << "share_inode_max_size on " << *in << dendl;
2472 map<client_t, Capability*>::iterator it;
2473 if (only_cap)
2474 it = in->client_caps.find(only_cap->get_client());
2475 else
2476 it = in->client_caps.begin();
2477 for (; it != in->client_caps.end(); ++it) {
2478 const client_t client = it->first;
2479 Capability *cap = it->second;
2480 if (cap->is_suppress())
2481 continue;
2482 if (cap->pending() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER)) {
2483 dout(10) << "share_inode_max_size with client." << client << dendl;
2484 cap->inc_last_seq();
2485 MClientCaps *m = new MClientCaps(CEPH_CAP_OP_GRANT,
2486 in->ino(),
2487 in->find_snaprealm()->inode->ino(),
2488 cap->get_cap_id(), cap->get_last_seq(),
2489 cap->pending(), cap->wanted(), 0,
2490 cap->get_mseq(),
2491 mds->get_osd_epoch_barrier());
2492 in->encode_cap_message(m, cap);
2493 mds->send_message_client_counted(m, client);
2494 }
2495 if (only_cap)
2496 break;
2497 }
2498}
2499
2500bool Locker::_need_flush_mdlog(CInode *in, int wanted)
2501{
2502 /* flush log if caps are wanted by client but corresponding lock is unstable and locked by
2503 * pending mutations. */
2504 if (((wanted & (CEPH_CAP_FILE_RD|CEPH_CAP_FILE_WR|CEPH_CAP_FILE_SHARED|CEPH_CAP_FILE_EXCL)) &&
2505 in->filelock.is_unstable_and_locked()) ||
2506 ((wanted & (CEPH_CAP_AUTH_SHARED|CEPH_CAP_AUTH_EXCL)) &&
2507 in->authlock.is_unstable_and_locked()) ||
2508 ((wanted & (CEPH_CAP_LINK_SHARED|CEPH_CAP_LINK_EXCL)) &&
2509 in->linklock.is_unstable_and_locked()) ||
2510 ((wanted & (CEPH_CAP_XATTR_SHARED|CEPH_CAP_XATTR_EXCL)) &&
2511 in->xattrlock.is_unstable_and_locked()))
2512 return true;
2513 return false;
2514}
2515
/**
 * Update a cap's wanted mask from a client message, guarding against
 * reordered messages via issue_seq, then keep the open-file bookkeeping
 * (EOpen journaling, recovery priority) in sync with the new wanted set.
 */
void Locker::adjust_cap_wanted(Capability *cap, int wanted, int issue_seq)
{
  if (ceph_seq_cmp(issue_seq, cap->get_last_issue()) == 0) {
    // message reflects our latest issue; take the new wanted verbatim
    dout(10) << " wanted " << ccap_string(cap->wanted())
	     << " -> " << ccap_string(wanted) << dendl;
    cap->set_wanted(wanted);
  } else if (wanted & ~cap->wanted()) {
    // stale seq, but only adding bits is safe: never miss a client's want
    dout(10) << " wanted " << ccap_string(cap->wanted())
	     << " -> " << ccap_string(wanted)
	     << " (added caps even though we had seq mismatch!)" << dendl;
    cap->set_wanted(wanted | cap->wanted());
  } else {
    // stale seq and it would only drop bits; ignore
    dout(10) << " NOT changing wanted " << ccap_string(cap->wanted())
	     << " -> " << ccap_string(wanted)
	     << " (issue_seq " << issue_seq << " != last_issue "
	     << cap->get_last_issue() << ")" << dendl;
    return;
  }

  CInode *cur = cap->get_inode();
  if (!cur->is_auth()) {
    // replica: forward the aggregate wanted change to the auth
    request_inode_file_caps(cur);
    return;
  }

  if (cap->wanted() == 0) {
    if (cur->item_open_file.is_on_list() &&
	!cur->is_any_caps_wanted()) {
      dout(10) << " removing unwanted file from open file list " << *cur << dendl;
      cur->item_open_file.remove_myself();
    }
  } else {
    if (cur->state_test(CInode::STATE_RECOVERING) &&
	(cap->wanted() & (CEPH_CAP_FILE_RD |
			  CEPH_CAP_FILE_WR))) {
      mds->mdcache->recovery_queue.prioritize(cur);
    }

    if (!cur->item_open_file.is_on_list()) {
      dout(10) << " adding to open file list " << *cur << dendl;
      assert(cur->last == CEPH_NOSNAP);
      // journal an EOpen so replay keeps this inode in the open-file set
      LogSegment *ls = mds->mdlog->get_current_segment();
      EOpen *le = new EOpen(mds->mdlog);
      mds->mdlog->start_entry(le);
      le->add_clean_inode(cur);
      ls->open_files.push_back(&cur->item_open_file);
      mds->mdlog->submit_entry(le);
    }
  }
}
2566
2567
2568
c07f9fc5 2569void Locker::_do_null_snapflush(CInode *head_in, client_t client, snapid_t last)
7c673cae
FG
2570{
2571 dout(10) << "_do_null_snapflush client." << client << " on " << *head_in << dendl;
c07f9fc5
FG
2572 for (auto p = head_in->client_need_snapflush.begin();
2573 p != head_in->client_need_snapflush.end() && p->first < last; ) {
7c673cae 2574 snapid_t snapid = p->first;
94b18763 2575 auto &clients = p->second;
7c673cae
FG
2576 ++p; // be careful, q loop below depends on this
2577
2578 if (clients.count(client)) {
2579 dout(10) << " doing async NULL snapflush on " << snapid << " from client." << client << dendl;
b32b8144
FG
2580 CInode *sin = mdcache->pick_inode_snap(head_in, snapid - 1);
2581 assert(sin);
2582 assert(sin->first <= snapid);
7c673cae
FG
2583 _do_snap_update(sin, snapid, 0, sin->first - 1, client, NULL, NULL);
2584 head_in->remove_need_snapflush(sin, snapid, client);
2585 }
2586 }
2587}
2588
2589
2590bool Locker::should_defer_client_cap_frozen(CInode *in)
2591{
2592 /*
2593 * This policy needs to be AT LEAST as permissive as allowing a client request
2594 * to go forward, or else a client request can release something, the release
2595 * gets deferred, but the request gets processed and deadlocks because when the
2596 * caps can't get revoked.
2597 *
2598 * Currently, a request wait if anything locked is freezing (can't
2599 * auth_pin), which would avoid any deadlock with cap release. Thus @in
2600 * _MUST_ be in the lock/auth_pin set.
2601 *
2602 * auth_pins==0 implies no unstable lock and not auth pinnned by
2603 * client request, otherwise continue even it's freezing.
2604 */
2605 return (in->is_freezing() && in->get_num_auth_pins() == 0) || in->is_frozen();
2606}
2607
2608/*
2609 * This function DOES put the passed message before returning
2610 */
2611void Locker::handle_client_caps(MClientCaps *m)
2612{
7c673cae 2613 client_t client = m->get_source().num();
7c673cae
FG
2614 snapid_t follows = m->get_snap_follows();
2615 dout(7) << "handle_client_caps "
2616 << ((m->flags & CLIENT_CAPS_SYNC) ? "sync" : "async")
2617 << " on " << m->get_ino()
2618 << " tid " << m->get_client_tid() << " follows " << follows
2619 << " op " << ceph_cap_op_name(m->get_op()) << dendl;
2620
94b18763 2621 Session *session = mds->get_session(m);
7c673cae
FG
2622 if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) {
2623 if (!session) {
2624 dout(5) << " no session, dropping " << *m << dendl;
2625 m->put();
2626 return;
2627 }
2628 if (session->is_closed() ||
2629 session->is_closing() ||
2630 session->is_killing()) {
2631 dout(7) << " session closed|closing|killing, dropping " << *m << dendl;
2632 m->put();
2633 return;
2634 }
2635 if (mds->is_reconnect() &&
2636 m->get_dirty() && m->get_client_tid() > 0 &&
2637 !session->have_completed_flush(m->get_client_tid())) {
2638 mdcache->set_reconnected_dirty_caps(client, m->get_ino(), m->get_dirty());
2639 }
2640 mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
2641 return;
2642 }
2643
2644 if (m->get_client_tid() > 0 && session &&
2645 session->have_completed_flush(m->get_client_tid())) {
2646 dout(7) << "handle_client_caps already flushed tid " << m->get_client_tid()
2647 << " for client." << client << dendl;
2648 MClientCaps *ack;
2649 if (m->get_op() == CEPH_CAP_OP_FLUSHSNAP) {
2650 ack = new MClientCaps(CEPH_CAP_OP_FLUSHSNAP_ACK, m->get_ino(), 0, 0, 0, 0, 0,
2651 m->get_dirty(), 0, mds->get_osd_epoch_barrier());
2652 } else {
2653 ack = new MClientCaps(CEPH_CAP_OP_FLUSH_ACK, m->get_ino(), 0, m->get_cap_id(),
2654 m->get_seq(), m->get_caps(), 0, m->get_dirty(), 0,
2655 mds->get_osd_epoch_barrier());
2656 }
2657 ack->set_snap_follows(follows);
2658 ack->set_client_tid(m->get_client_tid());
2659 mds->send_message_client_counted(ack, m->get_connection());
2660 if (m->get_op() == CEPH_CAP_OP_FLUSHSNAP) {
2661 m->put();
2662 return;
2663 } else {
2664 // fall-thru because the message may release some caps
2665 m->clear_dirty();
2666 m->set_op(CEPH_CAP_OP_UPDATE);
2667 }
2668 }
2669
2670 // "oldest flush tid" > 0 means client uses unique TID for each flush
2671 if (m->get_oldest_flush_tid() > 0 && session) {
2672 if (session->trim_completed_flushes(m->get_oldest_flush_tid())) {
2673 mds->mdlog->get_current_segment()->touched_sessions.insert(session->info.inst.name);
2674
2675 if (session->get_num_trim_flushes_warnings() > 0 &&
2676 session->get_num_completed_flushes() * 2 < g_conf->mds_max_completed_flushes)
2677 session->reset_num_trim_flushes_warnings();
2678 } else {
2679 if (session->get_num_completed_flushes() >=
2680 (g_conf->mds_max_completed_flushes << session->get_num_trim_flushes_warnings())) {
2681 session->inc_num_trim_flushes_warnings();
2682 stringstream ss;
2683 ss << "client." << session->get_client() << " does not advance its oldest_flush_tid ("
2684 << m->get_oldest_flush_tid() << "), "
2685 << session->get_num_completed_flushes()
2686 << " completed flushes recorded in session";
2687 mds->clog->warn() << ss.str();
2688 dout(20) << __func__ << " " << ss.str() << dendl;
2689 }
2690 }
2691 }
2692
2693 CInode *head_in = mdcache->get_inode(m->get_ino());
2694 if (!head_in) {
2695 if (mds->is_clientreplay()) {
2696 dout(7) << "handle_client_caps on unknown ino " << m->get_ino()
2697 << ", will try again after replayed client requests" << dendl;
2698 mdcache->wait_replay_cap_reconnect(m->get_ino(), new C_MDS_RetryMessage(mds, m));
2699 return;
2700 }
2701 dout(1) << "handle_client_caps on unknown ino " << m->get_ino() << ", dropping" << dendl;
2702 m->put();
2703 return;
2704 }
2705
2706 if (m->osd_epoch_barrier && !mds->objecter->have_map(m->osd_epoch_barrier)) {
2707 // Pause RADOS operations until we see the required epoch
2708 mds->objecter->set_epoch_barrier(m->osd_epoch_barrier);
2709 }
2710
2711 if (mds->get_osd_epoch_barrier() < m->osd_epoch_barrier) {
2712 // Record the barrier so that we will retransmit it to clients
2713 mds->set_osd_epoch_barrier(m->osd_epoch_barrier);
2714 }
2715
b32b8144 2716 dout(10) << " head inode " << *head_in << dendl;
7c673cae
FG
2717
2718 Capability *cap = 0;
b32b8144 2719 cap = head_in->get_client_cap(client);
7c673cae 2720 if (!cap) {
b32b8144 2721 dout(7) << "handle_client_caps no cap for client." << client << " on " << *head_in << dendl;
7c673cae
FG
2722 m->put();
2723 return;
2724 }
2725 assert(cap);
2726
2727 // freezing|frozen?
b32b8144
FG
2728 if (should_defer_client_cap_frozen(head_in)) {
2729 dout(7) << "handle_client_caps freezing|frozen on " << *head_in << dendl;
2730 head_in->add_waiter(CInode::WAIT_UNFREEZE, new C_MDS_RetryMessage(mds, m));
7c673cae
FG
2731 return;
2732 }
2733 if (ceph_seq_cmp(m->get_mseq(), cap->get_mseq()) < 0) {
2734 dout(7) << "handle_client_caps mseq " << m->get_mseq() << " < " << cap->get_mseq()
2735 << ", dropping" << dendl;
2736 m->put();
2737 return;
2738 }
2739
2740 int op = m->get_op();
2741
2742 // flushsnap?
2743 if (op == CEPH_CAP_OP_FLUSHSNAP) {
b32b8144
FG
2744 if (!head_in->is_auth()) {
2745 dout(7) << " not auth, ignoring flushsnap on " << *head_in << dendl;
7c673cae
FG
2746 goto out;
2747 }
2748
b32b8144 2749 SnapRealm *realm = head_in->find_snaprealm();
7c673cae
FG
2750 snapid_t snap = realm->get_snap_following(follows);
2751 dout(10) << " flushsnap follows " << follows << " -> snap " << snap << dendl;
2752
b32b8144
FG
2753 CInode *in = head_in;
2754 if (snap != CEPH_NOSNAP) {
2755 in = mdcache->pick_inode_snap(head_in, snap - 1);
2756 if (in != head_in)
2757 dout(10) << " snapped inode " << *in << dendl;
2758 }
2759
7c673cae
FG
2760 // we can prepare the ack now, since this FLUSHEDSNAP is independent of any
2761 // other cap ops. (except possibly duplicate FLUSHSNAP requests, but worst
2762 // case we get a dup response, so whatever.)
2763 MClientCaps *ack = 0;
2764 if (m->get_dirty()) {
2765 ack = new MClientCaps(CEPH_CAP_OP_FLUSHSNAP_ACK, in->ino(), 0, 0, 0, 0, 0, m->get_dirty(), 0, mds->get_osd_epoch_barrier());
2766 ack->set_snap_follows(follows);
2767 ack->set_client_tid(m->get_client_tid());
2768 ack->set_oldest_flush_tid(m->get_oldest_flush_tid());
2769 }
2770
2771 if (in == head_in ||
2772 (head_in->client_need_snapflush.count(snap) &&
2773 head_in->client_need_snapflush[snap].count(client))) {
2774 dout(7) << " flushsnap snap " << snap
2775 << " client." << client << " on " << *in << dendl;
2776
2777 // this cap now follows a later snap (i.e. the one initiating this flush, or later)
2778 if (in == head_in)
2779 cap->client_follows = snap < CEPH_NOSNAP ? snap : realm->get_newest_seq();
c07f9fc5
FG
2780 else if (head_in->client_need_snapflush.begin()->first < snap)
2781 _do_null_snapflush(head_in, client, snap);
7c673cae
FG
2782
2783 _do_snap_update(in, snap, m->get_dirty(), follows, client, m, ack);
2784
2785 if (in != head_in)
2786 head_in->remove_need_snapflush(in, snap, client);
7c673cae
FG
2787 } else {
2788 dout(7) << " not expecting flushsnap " << snap << " from client." << client << " on " << *in << dendl;
2789 if (ack)
2790 mds->send_message_client_counted(ack, m->get_connection());
2791 }
2792 goto out;
2793 }
2794
2795 if (cap->get_cap_id() != m->get_cap_id()) {
2796 dout(7) << " ignoring client capid " << m->get_cap_id() << " != my " << cap->get_cap_id() << dendl;
2797 } else {
b32b8144
FG
2798 CInode *in = head_in;
2799 if (follows > 0) {
2800 in = mdcache->pick_inode_snap(head_in, follows);
2801 // intermediate snap inodes
2802 while (in != head_in) {
2803 assert(in->last != CEPH_NOSNAP);
2804 if (in->is_auth() && m->get_dirty()) {
2805 dout(10) << " updating intermediate snapped inode " << *in << dendl;
2806 _do_cap_update(in, NULL, m->get_dirty(), follows, m);
2807 }
2808 in = mdcache->pick_inode_snap(head_in, in->last);
7c673cae 2809 }
7c673cae
FG
2810 }
2811
2812 // head inode, and cap
2813 MClientCaps *ack = 0;
2814
2815 int caps = m->get_caps();
2816 if (caps & ~cap->issued()) {
2817 dout(10) << " confirming not issued caps " << ccap_string(caps & ~cap->issued()) << dendl;
2818 caps &= cap->issued();
2819 }
2820
2821 cap->confirm_receipt(m->get_seq(), caps);
2822 dout(10) << " follows " << follows
2823 << " retains " << ccap_string(m->get_caps())
2824 << " dirty " << ccap_string(m->get_dirty())
2825 << " on " << *in << dendl;
2826
2827
2828 // missing/skipped snapflush?
2829 // The client MAY send a snapflush if it is issued WR/EXCL caps, but
2830 // presently only does so when it has actual dirty metadata. But, we
2831 // set up the need_snapflush stuff based on the issued caps.
2832 // We can infer that the client WONT send a FLUSHSNAP once they have
2833 // released all WR/EXCL caps (the FLUSHSNAP always comes before the cap
2834 // update/release).
2835 if (!head_in->client_need_snapflush.empty()) {
2836 if ((cap->issued() & CEPH_CAP_ANY_FILE_WR) == 0) {
2837 _do_null_snapflush(head_in, client);
2838 } else {
2839 dout(10) << " revocation in progress, not making any conclusions about null snapflushes" << dendl;
2840 }
2841 }
2842
2843 if (m->get_dirty() && in->is_auth()) {
2844 dout(7) << " flush client." << client << " dirty " << ccap_string(m->get_dirty())
2845 << " seq " << m->get_seq() << " on " << *in << dendl;
2846 ack = new MClientCaps(CEPH_CAP_OP_FLUSH_ACK, in->ino(), 0, cap->get_cap_id(), m->get_seq(),
2847 m->get_caps(), 0, m->get_dirty(), 0, mds->get_osd_epoch_barrier());
2848 ack->set_client_tid(m->get_client_tid());
2849 ack->set_oldest_flush_tid(m->get_oldest_flush_tid());
2850 }
2851
2852 // filter wanted based on what we could ever give out (given auth/replica status)
2853 bool need_flush = m->flags & CLIENT_CAPS_SYNC;
2854 int new_wanted = m->get_wanted() & head_in->get_caps_allowed_ever();
2855 if (new_wanted != cap->wanted()) {
2856 if (!need_flush && (new_wanted & ~cap->pending())) {
2857 // exapnding caps. make sure we aren't waiting for a log flush
2858 need_flush = _need_flush_mdlog(head_in, new_wanted & ~cap->pending());
2859 }
2860
2861 adjust_cap_wanted(cap, new_wanted, m->get_issue_seq());
2862 }
2863
2864 if (in->is_auth() &&
2865 _do_cap_update(in, cap, m->get_dirty(), follows, m, ack, &need_flush)) {
2866 // updated
2867 eval(in, CEPH_CAP_LOCKS);
2868
2869 if (!need_flush && (cap->wanted() & ~cap->pending()))
2870 need_flush = _need_flush_mdlog(in, cap->wanted() & ~cap->pending());
2871 } else {
2872 // no update, ack now.
2873 if (ack)
2874 mds->send_message_client_counted(ack, m->get_connection());
2875
2876 bool did_issue = eval(in, CEPH_CAP_LOCKS);
2877 if (!did_issue && (cap->wanted() & ~cap->pending()))
2878 issue_caps(in, cap);
2879
2880 if (cap->get_last_seq() == 0 &&
2881 (cap->pending() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER))) {
2882 cap->issue_norevoke(cap->issued());
2883 share_inode_max_size(in, cap);
2884 }
2885 }
2886
2887 if (need_flush)
2888 mds->mdlog->flush();
2889 }
2890
2891 out:
2892 m->put();
2893}
2894
2895
2896class C_Locker_RetryRequestCapRelease : public LockerContext {
2897 client_t client;
2898 ceph_mds_request_release item;
2899public:
2900 C_Locker_RetryRequestCapRelease(Locker *l, client_t c, const ceph_mds_request_release& it) :
2901 LockerContext(l), client(c), item(it) { }
2902 void finish(int r) override {
2903 string dname;
2904 MDRequestRef null_ref;
2905 locker->process_request_cap_release(null_ref, client, item, dname);
2906 }
2907};
2908
/**
 * Process a cap release embedded in a client request.
 *
 * Releases any dentry lease named by @p dname, confirms receipt of the
 * released caps, and re-evaluates the inode's locks.  May defer itself
 * (via C_Locker_RetryRequestCapRelease) if the inode is frozen.
 *
 * @param mdr    originating request; may be a null ref when retried after
 *               an unfreeze (the "DEFERRED, no mdr" path)
 * @param client client performing the release
 * @param item   wire-format release (ino, cap_id, caps, wanted, seqs)
 * @param dname  dentry name whose lease should also be dropped; may be empty
 */
void Locker::process_request_cap_release(MDRequestRef& mdr, client_t client, const ceph_mds_request_release& item,
					 boost::string_view dname)
{
  inodeno_t ino = (uint64_t)item.ino;
  uint64_t cap_id = item.cap_id;
  int caps = item.caps;
  int wanted = item.wanted;
  int seq = item.seq;
  int issue_seq = item.issue_seq;
  int mseq = item.mseq;

  CInode *in = mdcache->get_inode(ino);
  if (!in)
    return;

  // drop the client's dentry lease, if one was named
  if (dname.length()) {
    frag_t fg = in->pick_dirfrag(dname);
    CDir *dir = in->get_dirfrag(fg);
    if (dir) {
      CDentry *dn = dir->lookup(dname);
      if (dn) {
	ClientLease *l = dn->get_client_lease(client);
	if (l) {
	  dout(10) << "process_cap_release removing lease on " << *dn << dendl;
	  dn->remove_client_lease(l, this);
	} else {
	  dout(7) << "process_cap_release client." << client
		  << " doesn't have lease on " << *dn << dendl;
	}
      } else {
	dout(7) << "process_cap_release client." << client << " released lease on dn "
		<< dir->dirfrag() << "/" << dname << " which dne" << dendl;
      }
    }
  }

  Capability *cap = in->get_client_cap(client);
  if (!cap)
    return;

  dout(10) << "process_cap_release client." << client << " " << ccap_string(caps) << " on " << *in
	   << (mdr ? "" : " (DEFERRED, no mdr)")
	   << dendl;

  // stale migration sequence: this release predates a cap export/import
  if (ceph_seq_cmp(mseq, cap->get_mseq()) < 0) {
    dout(7) << " mseq " << mseq << " < " << cap->get_mseq() << ", dropping" << dendl;
    return;
  }

  // release refers to a different cap instance than the one we hold
  if (cap->get_cap_id() != cap_id) {
    dout(7) << " cap_id " << cap_id << " != " << cap->get_cap_id() << ", dropping" << dendl;
    return;
  }

  // inode can't be modified while frozen; retry after unfreeze
  if (should_defer_client_cap_frozen(in)) {
    dout(7) << " frozen, deferring" << dendl;
    in->add_waiter(CInode::WAIT_UNFREEZE, new C_Locker_RetryRequestCapRelease(this, client, item));
    return;
  }

  // the client cannot retain caps we never issued; clamp to issued set
  if (caps & ~cap->issued()) {
    dout(10) << " confirming not issued caps " << ccap_string(caps & ~cap->issued()) << dendl;
    caps &= cap->issued();
  }
  cap->confirm_receipt(seq, caps);

  // no WR/EXCL caps left -> any pending snapflush will never arrive;
  // synthesize null snapflushes for outstanding snaps
  if (!in->client_need_snapflush.empty() &&
      (cap->issued() & CEPH_CAP_ANY_FILE_WR) == 0) {
    _do_null_snapflush(in, client);
  }

  adjust_cap_wanted(cap, wanted, issue_seq);

  // suppress cap reissue during eval when driven by a request; the request
  // path will reissue as needed (see kick_cap_releases)
  if (mdr)
    cap->inc_suppress();
  eval(in, CEPH_CAP_LOCKS);
  if (mdr)
    cap->dec_suppress();

  // take note; we may need to reissue on this cap later
  if (mdr)
    mdr->cap_releases[in->vino()] = cap->get_last_seq();
}
2992
2993class C_Locker_RetryKickIssueCaps : public LockerContext {
2994 CInode *in;
2995 client_t client;
2996 ceph_seq_t seq;
2997public:
2998 C_Locker_RetryKickIssueCaps(Locker *l, CInode *i, client_t c, ceph_seq_t s) :
2999 LockerContext(l), in(i), client(c), seq(s) {
3000 in->get(CInode::PIN_PTRWAITER);
3001 }
3002 void finish(int r) override {
3003 locker->kick_issue_caps(in, client, seq);
3004 in->put(CInode::PIN_PTRWAITER);
3005 }
3006};
3007
3008void Locker::kick_issue_caps(CInode *in, client_t client, ceph_seq_t seq)
3009{
3010 Capability *cap = in->get_client_cap(client);
3011 if (!cap || cap->get_last_sent() != seq)
3012 return;
3013 if (in->is_frozen()) {
3014 dout(10) << "kick_issue_caps waiting for unfreeze on " << *in << dendl;
3015 in->add_waiter(CInode::WAIT_UNFREEZE,
3016 new C_Locker_RetryKickIssueCaps(this, in, client, seq));
3017 return;
3018 }
3019 dout(10) << "kick_issue_caps released at current seq " << seq
3020 << ", reissuing" << dendl;
3021 issue_caps(in, cap);
3022}
3023
3024void Locker::kick_cap_releases(MDRequestRef& mdr)
3025{
3026 client_t client = mdr->get_client();
3027 for (map<vinodeno_t,ceph_seq_t>::iterator p = mdr->cap_releases.begin();
3028 p != mdr->cap_releases.end();
3029 ++p) {
3030 CInode *in = mdcache->get_inode(p->first);
3031 if (!in)
3032 continue;
3033 kick_issue_caps(in, client, p->second);
3034 }
3035}
3036
7c673cae
FG
3037/**
3038 * m and ack might be NULL, so don't dereference them unless dirty != 0
3039 */
3040void Locker::_do_snap_update(CInode *in, snapid_t snap, int dirty, snapid_t follows, client_t client, MClientCaps *m, MClientCaps *ack)
3041{
3042 dout(10) << "_do_snap_update dirty " << ccap_string(dirty)
3043 << " follows " << follows << " snap " << snap
3044 << " on " << *in << dendl;
3045
3046 if (snap == CEPH_NOSNAP) {
3047 // hmm, i guess snap was already deleted? just ack!
3048 dout(10) << " wow, the snap following " << follows
3049 << " was already deleted. nothing to record, just ack." << dendl;
3050 if (ack)
3051 mds->send_message_client_counted(ack, m->get_connection());
3052 return;
3053 }
3054
3055 EUpdate *le = new EUpdate(mds->mdlog, "snap flush");
3056 mds->mdlog->start_entry(le);
3057 MutationRef mut = new MutationImpl();
3058 mut->ls = mds->mdlog->get_current_segment();
3059
3060 // normal metadata updates that we can apply to the head as well.
3061
3062 // update xattrs?
94b18763
FG
3063 CInode::mempool_xattr_map *px = nullptr;
3064 bool xattrs = (dirty & CEPH_CAP_XATTR_EXCL) &&
3065 m->xattrbl.length() &&
3066 m->head.xattr_version > in->get_projected_inode()->xattr_version;
3067
3068 CInode::mempool_old_inode *oi = 0;
7c673cae
FG
3069 if (in->is_multiversion()) {
3070 oi = in->pick_old_inode(snap);
3071 }
3072
94b18763 3073 CInode::mempool_inode *i;
7c673cae
FG
3074 if (oi) {
3075 dout(10) << " writing into old inode" << dendl;
94b18763
FG
3076 auto &pi = in->project_inode();
3077 pi.inode.version = in->pre_dirty();
7c673cae
FG
3078 if (snap > oi->first)
3079 in->split_old_inode(snap);
94b18763 3080 i = &oi->inode;
7c673cae
FG
3081 if (xattrs)
3082 px = &oi->xattrs;
3083 } else {
94b18763
FG
3084 auto &pi = in->project_inode(xattrs);
3085 pi.inode.version = in->pre_dirty();
3086 i = &pi.inode;
7c673cae 3087 if (xattrs)
94b18763 3088 px = pi.xattrs.get();
7c673cae
FG
3089 }
3090
94b18763 3091 _update_cap_fields(in, dirty, m, i);
7c673cae
FG
3092
3093 // xattr
94b18763
FG
3094 if (xattrs) {
3095 dout(7) << " xattrs v" << i->xattr_version << " -> " << m->head.xattr_version
7c673cae 3096 << " len " << m->xattrbl.length() << dendl;
94b18763 3097 i->xattr_version = m->head.xattr_version;
7c673cae
FG
3098 bufferlist::iterator p = m->xattrbl.begin();
3099 ::decode(*px, p);
3100 }
3101
94b18763
FG
3102 {
3103 auto it = i->client_ranges.find(client);
3104 if (it != i->client_ranges.end()) {
3105 if (in->last == snap) {
3106 dout(10) << " removing client_range entirely" << dendl;
3107 i->client_ranges.erase(it);
3108 } else {
3109 dout(10) << " client_range now follows " << snap << dendl;
3110 it->second.follows = snap;
3111 }
7c673cae
FG
3112 }
3113 }
3114
3115 mut->auth_pin(in);
3116 mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows);
3117 mdcache->journal_dirty_inode(mut.get(), &le->metablob, in, follows);
3118
3119 // "oldest flush tid" > 0 means client uses unique TID for each flush
3120 if (ack && ack->get_oldest_flush_tid() > 0)
3121 le->metablob.add_client_flush(metareqid_t(m->get_source(), ack->get_client_tid()),
3122 ack->get_oldest_flush_tid());
3123
3124 mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut, false, false,
3125 client, ack));
3126}
3127
94b18763 3128void Locker::_update_cap_fields(CInode *in, int dirty, MClientCaps *m, CInode::mempool_inode *pi)
7c673cae
FG
3129{
3130 if (dirty == 0)
3131 return;
3132
3133 /* m must be valid if there are dirty caps */
3134 assert(m);
3135 uint64_t features = m->get_connection()->get_features();
3136
3137 if (m->get_ctime() > pi->ctime) {
3138 dout(7) << " ctime " << pi->ctime << " -> " << m->get_ctime()
3139 << " for " << *in << dendl;
28e407b8 3140 pi->ctime = pi->rstat.rctime = m->get_ctime();
7c673cae
FG
3141 }
3142
3143 if ((features & CEPH_FEATURE_FS_CHANGE_ATTR) &&
3144 m->get_change_attr() > pi->change_attr) {
3145 dout(7) << " change_attr " << pi->change_attr << " -> " << m->get_change_attr()
3146 << " for " << *in << dendl;
3147 pi->change_attr = m->get_change_attr();
3148 }
3149
3150 // file
3151 if (dirty & (CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)) {
3152 utime_t atime = m->get_atime();
3153 utime_t mtime = m->get_mtime();
3154 uint64_t size = m->get_size();
3155 version_t inline_version = m->inline_version;
3156
3157 if (((dirty & CEPH_CAP_FILE_WR) && mtime > pi->mtime) ||
3158 ((dirty & CEPH_CAP_FILE_EXCL) && mtime != pi->mtime)) {
3159 dout(7) << " mtime " << pi->mtime << " -> " << mtime
3160 << " for " << *in << dendl;
3161 pi->mtime = mtime;
3162 }
3163 if (in->inode.is_file() && // ONLY if regular file
3164 size > pi->size) {
3165 dout(7) << " size " << pi->size << " -> " << size
3166 << " for " << *in << dendl;
3167 pi->size = size;
3168 pi->rstat.rbytes = size;
3169 }
3170 if (in->inode.is_file() &&
3171 (dirty & CEPH_CAP_FILE_WR) &&
3172 inline_version > pi->inline_data.version) {
3173 pi->inline_data.version = inline_version;
3174 if (inline_version != CEPH_INLINE_NONE && m->inline_data.length() > 0)
3175 pi->inline_data.get_data() = m->inline_data;
3176 else
3177 pi->inline_data.free_data();
3178 }
3179 if ((dirty & CEPH_CAP_FILE_EXCL) && atime != pi->atime) {
3180 dout(7) << " atime " << pi->atime << " -> " << atime
3181 << " for " << *in << dendl;
3182 pi->atime = atime;
3183 }
3184 if ((dirty & CEPH_CAP_FILE_EXCL) &&
3185 ceph_seq_cmp(pi->time_warp_seq, m->get_time_warp_seq()) < 0) {
3186 dout(7) << " time_warp_seq " << pi->time_warp_seq << " -> " << m->get_time_warp_seq()
3187 << " for " << *in << dendl;
3188 pi->time_warp_seq = m->get_time_warp_seq();
3189 }
3190 }
3191 // auth
3192 if (dirty & CEPH_CAP_AUTH_EXCL) {
3193 if (m->head.uid != pi->uid) {
3194 dout(7) << " uid " << pi->uid
3195 << " -> " << m->head.uid
3196 << " for " << *in << dendl;
3197 pi->uid = m->head.uid;
3198 }
3199 if (m->head.gid != pi->gid) {
3200 dout(7) << " gid " << pi->gid
3201 << " -> " << m->head.gid
3202 << " for " << *in << dendl;
3203 pi->gid = m->head.gid;
3204 }
3205 if (m->head.mode != pi->mode) {
3206 dout(7) << " mode " << oct << pi->mode
3207 << " -> " << m->head.mode << dec
3208 << " for " << *in << dendl;
3209 pi->mode = m->head.mode;
3210 }
3211 if ((features & CEPH_FEATURE_FS_BTIME) && m->get_btime() != pi->btime) {
3212 dout(7) << " btime " << oct << pi->btime
3213 << " -> " << m->get_btime() << dec
3214 << " for " << *in << dendl;
3215 pi->btime = m->get_btime();
3216 }
3217 }
3218}
3219
3220/*
3221 * update inode based on cap flush|flushsnap|wanted.
3222 * adjust max_size, if needed.
3223 * if we update, return true; otherwise, false (no updated needed).
3224 */
3225bool Locker::_do_cap_update(CInode *in, Capability *cap,
3226 int dirty, snapid_t follows,
3227 MClientCaps *m, MClientCaps *ack,
3228 bool *need_flush)
3229{
3230 dout(10) << "_do_cap_update dirty " << ccap_string(dirty)
3231 << " issued " << ccap_string(cap ? cap->issued() : 0)
3232 << " wanted " << ccap_string(cap ? cap->wanted() : 0)
3233 << " on " << *in << dendl;
3234 assert(in->is_auth());
3235 client_t client = m->get_source().num();
94b18763 3236 CInode::mempool_inode *latest = in->get_projected_inode();
7c673cae
FG
3237
3238 // increase or zero max_size?
3239 uint64_t size = m->get_size();
3240 bool change_max = false;
3241 uint64_t old_max = latest->client_ranges.count(client) ? latest->client_ranges[client].range.last : 0;
3242 uint64_t new_max = old_max;
3243
3244 if (in->is_file()) {
3245 bool forced_change_max = false;
3246 dout(20) << "inode is file" << dendl;
3247 if (cap && ((cap->issued() | cap->wanted()) & CEPH_CAP_ANY_FILE_WR)) {
3248 dout(20) << "client has write caps; m->get_max_size="
3249 << m->get_max_size() << "; old_max=" << old_max << dendl;
3250 if (m->get_max_size() > new_max) {
3251 dout(10) << "client requests file_max " << m->get_max_size()
3252 << " > max " << old_max << dendl;
3253 change_max = true;
3254 forced_change_max = true;
31f18b77 3255 new_max = calc_new_max_size(latest, m->get_max_size());
7c673cae 3256 } else {
31f18b77 3257 new_max = calc_new_max_size(latest, size);
7c673cae
FG
3258
3259 if (new_max > old_max)
3260 change_max = true;
3261 else
3262 new_max = old_max;
3263 }
3264 } else {
3265 if (old_max) {
3266 change_max = true;
3267 new_max = 0;
3268 }
3269 }
3270
3271 if (in->last == CEPH_NOSNAP &&
3272 change_max &&
3273 !in->filelock.can_wrlock(client) &&
3274 !in->filelock.can_force_wrlock(client)) {
3275 dout(10) << " i want to change file_max, but lock won't allow it (yet)" << dendl;
3276 if (in->filelock.is_stable()) {
3277 bool need_issue = false;
3278 if (cap)
3279 cap->inc_suppress();
3280 if (in->mds_caps_wanted.empty() &&
3281 (in->get_loner() >= 0 || (in->get_wanted_loner() >= 0 && in->try_set_loner()))) {
3282 if (in->filelock.get_state() != LOCK_EXCL)
3283 file_excl(&in->filelock, &need_issue);
3284 } else
3285 simple_lock(&in->filelock, &need_issue);
3286 if (need_issue)
3287 issue_caps(in);
3288 if (cap)
3289 cap->dec_suppress();
3290 }
3291 if (!in->filelock.can_wrlock(client) &&
3292 !in->filelock.can_force_wrlock(client)) {
3293 C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in,
3294 forced_change_max ? new_max : 0,
3295 0, utime_t());
3296
3297 in->filelock.add_waiter(SimpleLock::WAIT_STABLE, cms);
3298 change_max = false;
3299 }
3300 }
3301 }
3302
3303 if (m->flockbl.length()) {
3304 int32_t num_locks;
3305 bufferlist::iterator bli = m->flockbl.begin();
3306 ::decode(num_locks, bli);
3307 for ( int i=0; i < num_locks; ++i) {
3308 ceph_filelock decoded_lock;
3309 ::decode(decoded_lock, bli);
3310 in->get_fcntl_lock_state()->held_locks.
3311 insert(pair<uint64_t, ceph_filelock>(decoded_lock.start, decoded_lock));
3312 ++in->get_fcntl_lock_state()->client_held_lock_counts[(client_t)(decoded_lock.client)];
3313 }
3314 ::decode(num_locks, bli);
3315 for ( int i=0; i < num_locks; ++i) {
3316 ceph_filelock decoded_lock;
3317 ::decode(decoded_lock, bli);
3318 in->get_flock_lock_state()->held_locks.
3319 insert(pair<uint64_t, ceph_filelock>(decoded_lock.start, decoded_lock));
3320 ++in->get_flock_lock_state()->client_held_lock_counts[(client_t)(decoded_lock.client)];
3321 }
3322 }
3323
3324 if (!dirty && !change_max)
3325 return false;
3326
94b18763 3327 Session *session = mds->get_session(m);
7c673cae
FG
3328 if (session->check_access(in, MAY_WRITE,
3329 m->caller_uid, m->caller_gid, NULL, 0, 0) < 0) {
7c673cae
FG
3330 dout(10) << "check_access failed, dropping cap update on " << *in << dendl;
3331 return false;
3332 }
7c673cae
FG
3333
3334 // do the update.
3335 EUpdate *le = new EUpdate(mds->mdlog, "cap update");
3336 mds->mdlog->start_entry(le);
3337
94b18763
FG
3338 bool xattr = (dirty & CEPH_CAP_XATTR_EXCL) &&
3339 m->xattrbl.length() &&
3340 m->head.xattr_version > in->get_projected_inode()->xattr_version;
7c673cae 3341
94b18763
FG
3342 auto &pi = in->project_inode(xattr);
3343 pi.inode.version = in->pre_dirty();
7c673cae
FG
3344
3345 MutationRef mut(new MutationImpl());
3346 mut->ls = mds->mdlog->get_current_segment();
3347
94b18763 3348 _update_cap_fields(in, dirty, m, &pi.inode);
7c673cae
FG
3349
3350 if (change_max) {
3351 dout(7) << " max_size " << old_max << " -> " << new_max
3352 << " for " << *in << dendl;
3353 if (new_max) {
94b18763
FG
3354 auto &cr = pi.inode.client_ranges[client];
3355 cr.range.first = 0;
3356 cr.range.last = new_max;
3357 cr.follows = in->first - 1;
7c673cae 3358 } else
94b18763 3359 pi.inode.client_ranges.erase(client);
7c673cae
FG
3360 }
3361
3362 if (change_max || (dirty & (CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)))
3363 wrlock_force(&in->filelock, mut); // wrlock for duration of journal
3364
3365 // auth
3366 if (dirty & CEPH_CAP_AUTH_EXCL)
3367 wrlock_force(&in->authlock, mut);
3368
94b18763
FG
3369 // xattrs update?
3370 if (xattr) {
3371 dout(7) << " xattrs v" << pi.inode.xattr_version << " -> " << m->head.xattr_version << dendl;
3372 pi.inode.xattr_version = m->head.xattr_version;
7c673cae 3373 bufferlist::iterator p = m->xattrbl.begin();
94b18763 3374 ::decode(*pi.xattrs, p);
7c673cae
FG
3375 wrlock_force(&in->xattrlock, mut);
3376 }
3377
3378 mut->auth_pin(in);
3379 mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows);
3380 mdcache->journal_dirty_inode(mut.get(), &le->metablob, in, follows);
3381
3382 // "oldest flush tid" > 0 means client uses unique TID for each flush
3383 if (ack && ack->get_oldest_flush_tid() > 0)
3384 le->metablob.add_client_flush(metareqid_t(m->get_source(), ack->get_client_tid()),
3385 ack->get_oldest_flush_tid());
3386
3387 mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut,
3388 change_max, !!cap,
3389 client, ack));
3390 if (need_flush && !*need_flush &&
3391 ((change_max && new_max) || // max INCREASE
3392 _need_flush_mdlog(in, dirty)))
3393 *need_flush = true;
3394
3395 return true;
3396}
3397
/* This function DOES put the passed message before returning */
void Locker::handle_client_cap_release(MClientCapRelease *m)
{
  client_t client = m->get_source().num();
  dout(10) << "handle_client_cap_release " << *m << dendl;

  // only process releases while in a state where cap state is authoritative;
  // otherwise requeue the message for after replay
  if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) {
    mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
    return;
  }

  if (m->osd_epoch_barrier && !mds->objecter->have_map(m->osd_epoch_barrier)) {
    // Pause RADOS operations until we see the required epoch
    mds->objecter->set_epoch_barrier(m->osd_epoch_barrier);
  }

  if (mds->get_osd_epoch_barrier() < m->osd_epoch_barrier) {
    // Record the barrier so that we will retransmit it to clients
    mds->set_osd_epoch_barrier(m->osd_epoch_barrier);
  }

  Session *session = mds->get_session(m);

  // process each released cap individually
  for (vector<ceph_mds_cap_item>::iterator p = m->caps.begin(); p != m->caps.end(); ++p) {
    _do_cap_release(client, inodeno_t((uint64_t)p->ino) , p->cap_id, p->migrate_seq, p->seq);
  }

  if (session) {
    session->notify_cap_release(m->caps.size());
  }

  m->put();
}
3431
3432class C_Locker_RetryCapRelease : public LockerContext {
3433 client_t client;
3434 inodeno_t ino;
3435 uint64_t cap_id;
3436 ceph_seq_t migrate_seq;
3437 ceph_seq_t issue_seq;
3438public:
3439 C_Locker_RetryCapRelease(Locker *l, client_t c, inodeno_t i, uint64_t id,
3440 ceph_seq_t mseq, ceph_seq_t seq) :
3441 LockerContext(l), client(c), ino(i), cap_id(id), migrate_seq(mseq), issue_seq(seq) {}
3442 void finish(int r) override {
3443 locker->_do_cap_release(client, ino, cap_id, migrate_seq, issue_seq);
3444 }
3445};
3446
/**
 * Handle a single cap release from an MClientCapRelease batch.
 *
 * Drops the client's cap on @p ino if the identifiers and sequence
 * numbers still match; otherwise the release is stale and is either
 * ignored or used only to trim revoke history.  Defers (and retries)
 * if the inode is frozen.
 */
void Locker::_do_cap_release(client_t client, inodeno_t ino, uint64_t cap_id,
			     ceph_seq_t mseq, ceph_seq_t seq)
{
  CInode *in = mdcache->get_inode(ino);
  if (!in) {
    dout(7) << "_do_cap_release missing ino " << ino << dendl;
    return;
  }
  Capability *cap = in->get_client_cap(client);
  if (!cap) {
    dout(7) << "_do_cap_release no cap for client" << client << " on "<< *in << dendl;
    return;
  }

  dout(7) << "_do_cap_release for client." << client << " on "<< *in << dendl;
  // release is for a different cap instance (e.g. after reconnect)
  if (cap->get_cap_id() != cap_id) {
    dout(7) << " capid " << cap_id << " != " << cap->get_cap_id() << ", ignore" << dendl;
    return;
  }
  // release predates a cap migration; ignore
  if (ceph_seq_cmp(mseq, cap->get_mseq()) < 0) {
    dout(7) << " mseq " << mseq << " < " << cap->get_mseq() << ", ignore" << dendl;
    return;
  }
  if (should_defer_client_cap_frozen(in)) {
    dout(7) << " freezing|frozen, deferring" << dendl;
    in->add_waiter(CInode::WAIT_UNFREEZE,
                  new C_Locker_RetryCapRelease(this, client, ino, cap_id, mseq, seq));
    return;
  }
  // the cap was reissued after the client sent this release: don't drop
  // the cap, just clear old revoke history and re-check gathering locks
  if (seq != cap->get_last_issue()) {
    dout(7) << " issue_seq " << seq << " != " << cap->get_last_issue() << dendl;
    // clean out any old revoke history
    cap->clean_revoke_from(seq);
    eval_cap_gather(in);
    return;
  }
  remove_client_cap(in, client);
}
3485
/*
 * Remove a client's capability on an inode and clean up associated state
 * (pending snapflushes, client byte ranges), then re-evaluate the locks.
 * Note: takes no message, so there is nothing to put here (the stale
 * "puts the passed message" note previously attached to this function
 * applied to the cap-release message handlers above).
 */
void Locker::remove_client_cap(CInode *in, client_t client)
{
  // clean out any pending snapflush state
  if (!in->client_need_snapflush.empty())
    _do_null_snapflush(in, client);

  in->remove_client_cap(client);

  if (in->is_auth()) {
    // make sure we clear out the client byte range
    if (in->get_projected_inode()->client_ranges.count(client) &&
	!(in->inode.nlink == 0 && !in->is_any_caps()))    // unless it's unlink + stray
      check_inode_max_size(in);
  } else {
    // replica: tell the auth MDS our aggregate wanted caps may have changed
    request_inode_file_caps(in);
  }

  try_eval(in, CEPH_CAP_LOCKS);
}
3507
3508
3509/**
3510 * Return true if any currently revoking caps exceed the
b32b8144 3511 * mds_session_timeout threshold.
7c673cae
FG
3512 */
3513bool Locker::any_late_revoking_caps(xlist<Capability*> const &revoking) const
3514{
3515 xlist<Capability*>::const_iterator p = revoking.begin();
3516 if (p.end()) {
3517 // No revoking caps at the moment
3518 return false;
3519 } else {
3520 utime_t now = ceph_clock_now();
3521 utime_t age = now - (*p)->get_last_revoke_stamp();
b32b8144 3522 if (age <= g_conf->mds_session_timeout) {
7c673cae
FG
3523 return false;
3524 } else {
3525 return true;
3526 }
3527 }
3528}
3529
3530
3531void Locker::get_late_revoking_clients(std::list<client_t> *result) const
3532{
3533 if (!any_late_revoking_caps(revoking_caps)) {
3534 // Fast path: no misbehaving clients, execute in O(1)
3535 return;
3536 }
3537
3538 // Slow path: execute in O(N_clients)
3539 std::map<client_t, xlist<Capability*> >::const_iterator client_rc_iter;
3540 for (client_rc_iter = revoking_caps_by_client.begin();
3541 client_rc_iter != revoking_caps_by_client.end(); ++client_rc_iter) {
3542 xlist<Capability*> const &client_rc = client_rc_iter->second;
3543 bool any_late = any_late_revoking_caps(client_rc);
3544 if (any_late) {
3545 result->push_back(client_rc_iter->first);
3546 }
3547 }
3548}
3549
3550// Hard-code instead of surfacing a config settings because this is
3551// really a hack that should go away at some point when we have better
3552// inspection tools for getting at detailed cap state (#7316)
3553#define MAX_WARN_CAPS 100
3554
/**
 * Periodic tick: scan the revoking-caps list and log warnings for
 * clients that are slow to acknowledge revocations.  Warning frequency
 * per cap backs off exponentially, and at most MAX_WARN_CAPS entries
 * are examined per tick.
 */
void Locker::caps_tick()
{
  utime_t now = ceph_clock_now();

  dout(20) << __func__ << " " << revoking_caps.size() << " revoking caps" << dendl;

  int i = 0;
  for (xlist<Capability*>::iterator p = revoking_caps.begin(); !p.end(); ++p) {
    Capability *cap = *p;

    utime_t age = now - cap->get_last_revoke_stamp();
    dout(20) << __func__ << " age = " << age << cap->get_client() << "." << cap->get_inode()->ino() << dendl;
    if (age <= g_conf->mds_session_timeout) {
      // entries past this point are younger still; stop scanning
      dout(20) << __func__ << " age below timeout " << g_conf->mds_session_timeout << dendl;
      break;
    } else {
      ++i;
      if (i > MAX_WARN_CAPS) {
	dout(1) << __func__ << " more than " << MAX_WARN_CAPS << " caps are late"
	  << "revoking, ignoring subsequent caps" << dendl;
	break;
      }
    }
    // exponential backoff of warning intervals
    if (age > g_conf->mds_session_timeout * (1 << cap->get_num_revoke_warnings())) {
      cap->inc_num_revoke_warnings();
      stringstream ss;
      ss << "client." << cap->get_client() << " isn't responding to mclientcaps(revoke), ino "
	 << cap->get_inode()->ino() << " pending " << ccap_string(cap->pending())
	 << " issued " << ccap_string(cap->issued()) << ", sent " << age << " seconds ago";
      mds->clog->warn() << ss.str();
      dout(20) << __func__ << " " << ss.str() << dendl;
    } else {
      dout(20) << __func__ << " silencing log message (backoff) for " << cap->get_client() << "." << cap->get_inode()->ino() << dendl;
    }
  }
}
3592
3593
/**
 * Handle a client dentry-lease message (release, revoke-ack, or renew).
 * Consumes (puts) the message on every path; on the renew path the same
 * message object is reused as the reply.
 */
void Locker::handle_client_lease(MClientLease *m)
{
  dout(10) << "handle_client_lease " << *m << dendl;

  assert(m->get_source().is_client());
  client_t client = m->get_source().num();

  CInode *in = mdcache->get_inode(m->get_ino(), m->get_last());
  if (!in) {
    dout(7) << "handle_client_lease don't have ino " << m->get_ino() << "." << m->get_last() << dendl;
    m->put();
    return;
  }
  CDentry *dn = 0;

  // locate the dentry the lease is on
  frag_t fg = in->pick_dirfrag(m->dname);
  CDir *dir = in->get_dirfrag(fg);
  if (dir)
    dn = dir->lookup(m->dname);
  if (!dn) {
    dout(7) << "handle_client_lease don't have dn " << m->get_ino() << " " << m->dname << dendl;
    m->put();
    return;
  }
  dout(10) << " on " << *dn << dendl;

  // replica and lock
  ClientLease *l = dn->get_client_lease(client);
  if (!l) {
    dout(7) << "handle_client_lease didn't have lease for client." << client << " of " << *dn << dendl;
    m->put();
    return;
  }

  switch (m->get_action()) {
  case CEPH_MDS_LEASE_REVOKE_ACK:
  case CEPH_MDS_LEASE_RELEASE:
    // only drop the lease if the seq matches; otherwise the message is stale
    if (l->seq != m->get_seq()) {
      dout(7) << "handle_client_lease release - seq " << l->seq << " != provided " << m->get_seq() << dendl;
    } else {
      dout(7) << "handle_client_lease client." << client
	      << " on " << *dn << dendl;
      dn->remove_client_lease(l, this);
    }
    m->put();
    break;

  case CEPH_MDS_LEASE_RENEW:
    {
      dout(7) << "handle_client_lease client." << client << " renew on " << *dn
	      << (!dn->lock.can_lease(client)?", revoking lease":"") << dendl;
      if (dn->lock.can_lease(client)) {
	int pool = 1;   // fixme.. do something smart!
	// rewrite the incoming message in place and bounce it back as the reply
	m->h.duration_ms = (int)(1000 * mdcache->client_lease_durations[pool]);
	m->h.seq = ++l->seq;
	m->clear_payload();

	utime_t now = ceph_clock_now();
	now += mdcache->client_lease_durations[pool];
	mdcache->touch_client_lease(l, pool, now);

	mds->send_message_client_counted(m, m->get_connection());
      }
    }
    break;

  default:
    ceph_abort(); // implement me
    break;
  }
}
3665
3666
/**
 * Encode a lease (or an explicit null lease) for @p dn into @p bl, as part
 * of a reply trace.  A real dentry lease is only issued when the dentry
 * lock allows it, the parent dir is not a stray dir, and the client isn't
 * already covered by dir caps (FILE_SHARED/FILE_EXCL) or a filelock lease
 * on the parent.
 */
void Locker::issue_client_lease(CDentry *dn, client_t client,
			       bufferlist &bl, utime_t now, Session *session)
{
  CInode *diri = dn->get_dir()->get_inode();
  if (!diri->is_stray() &&  // do not issue dn leases in stray dir!
      ((!diri->filelock.can_lease(client) &&
	(diri->get_client_cap_pending(client) & (CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_EXCL)) == 0)) &&
      dn->lock.can_lease(client)) {
    int pool = 1;   // fixme.. do something smart!
    // issue a dentry lease
    ClientLease *l = dn->add_client_lease(client, session);
    session->touch_lease(l);

    now += mdcache->client_lease_durations[pool];
    mdcache->touch_client_lease(l, pool, now);

    LeaseStat e;
    e.mask = 1 | CEPH_LOCK_DN;  // old and new bit values
    e.seq = ++l->seq;
    e.duration_ms = (int)(1000 * mdcache->client_lease_durations[pool]);
    ::encode(e, bl);
    dout(20) << "issue_client_lease seq " << e.seq << " dur " << e.duration_ms << "ms "
	     << " on " << *dn << dendl;
  } else {
    // null lease
    LeaseStat e;
    e.mask = 0;
    e.seq = 0;
    e.duration_ms = 0;
    ::encode(e, bl);
    dout(20) << "issue_client_lease no/null lease on " << *dn << dendl;
  }
}
3700
3701
3702void Locker::revoke_client_leases(SimpleLock *lock)
3703{
3704 int n = 0;
3705 CDentry *dn = static_cast<CDentry*>(lock->get_parent());
3706 for (map<client_t, ClientLease*>::iterator p = dn->client_lease_map.begin();
3707 p != dn->client_lease_map.end();
3708 ++p) {
3709 ClientLease *l = p->second;
3710
3711 n++;
3712 assert(lock->get_type() == CEPH_LOCK_DN);
3713
3714 CDentry *dn = static_cast<CDentry*>(lock->get_parent());
3715 int mask = 1 | CEPH_LOCK_DN; // old and new bits
3716
3717 // i should also revoke the dir ICONTENT lease, if they have it!
3718 CInode *diri = dn->get_dir()->get_inode();
3719 mds->send_message_client_counted(new MClientLease(CEPH_MDS_LEASE_REVOKE, l->seq,
3720 mask,
3721 diri->ino(),
3722 diri->first, CEPH_NOSNAP,
3723 dn->get_name()),
3724 l->client);
3725 }
7c673cae
FG
3726}
3727
3728
3729
3730// locks ----------------------------------------------------------------
3731
/**
 * Resolve a (lock type, cache-object info) pair from a wire message into
 * the corresponding SimpleLock in our cache.
 *
 * @return the lock, or 0 if the object isn't in cache (e.g. trimmed).
 */
SimpleLock *Locker::get_lock(int lock_type, MDSCacheObjectInfo &info)
{
  switch (lock_type) {
  case CEPH_LOCK_DN:
    {
      // be careful; info.dirfrag may have incorrect frag; recalculate based on dname.
      CInode *diri = mdcache->get_inode(info.dirfrag.ino);
      frag_t fg;
      CDir *dir = 0;
      CDentry *dn = 0;
      if (diri) {
	fg = diri->pick_dirfrag(info.dname);
	dir = diri->get_dirfrag(fg);
	if (dir)
	  dn = dir->lookup(info.dname, info.snapid);
      }
      if (!dn) {
	dout(7) << "get_lock don't have dn " << info.dirfrag.ino << " " << info.dname << dendl;
	return 0;
      }
      return &dn->lock;
    }

  case CEPH_LOCK_IAUTH:
  case CEPH_LOCK_ILINK:
  case CEPH_LOCK_IDFT:
  case CEPH_LOCK_IFILE:
  case CEPH_LOCK_INEST:
  case CEPH_LOCK_IXATTR:
  case CEPH_LOCK_ISNAP:
  case CEPH_LOCK_IFLOCK:
  case CEPH_LOCK_IPOLICY:
    {
      // all inode locks resolve through the inode; pick the member lock below
      CInode *in = mdcache->get_inode(info.ino, info.snapid);
      if (!in) {
	dout(7) << "get_lock don't have ino " << info.ino << dendl;
	return 0;
      }
      switch (lock_type) {
      case CEPH_LOCK_IAUTH: return &in->authlock;
      case CEPH_LOCK_ILINK: return &in->linklock;
      case CEPH_LOCK_IDFT: return &in->dirfragtreelock;
      case CEPH_LOCK_IFILE: return &in->filelock;
      case CEPH_LOCK_INEST: return &in->nestlock;
      case CEPH_LOCK_IXATTR: return &in->xattrlock;
      case CEPH_LOCK_ISNAP: return &in->snaplock;
      case CEPH_LOCK_IFLOCK: return &in->flocklock;
      case CEPH_LOCK_IPOLICY: return &in->policylock;
      }
    }

  default:
    dout(7) << "get_lock don't know lock_type " << lock_type << dendl;
    ceph_abort();
    break;
  }

  return 0;
}
3791
/* This function DOES put the passed message before returning */
void Locker::handle_lock(MLock *m)
{
  // nobody should be talking to us during recovery.
  assert(mds->is_rejoin() || mds->is_clientreplay() || mds->is_active() || mds->is_stopping());

  SimpleLock *lock = get_lock(m->get_lock_type(), m->get_object_info());
  if (!lock) {
    dout(10) << "don't have object " << m->get_object_info() << ", must have trimmed, dropping" << dendl;
    m->put();
    return;
  }

  // dispatch by lock family; the sub-handlers put the message
  switch (lock->get_type()) {
  case CEPH_LOCK_DN:
  case CEPH_LOCK_IAUTH:
  case CEPH_LOCK_ILINK:
  case CEPH_LOCK_ISNAP:
  case CEPH_LOCK_IXATTR:
  case CEPH_LOCK_IFLOCK:
  case CEPH_LOCK_IPOLICY:
    handle_simple_lock(lock, m);
    break;

  case CEPH_LOCK_IDFT:
  case CEPH_LOCK_INEST:
    // scatter locks (dft/nest) currently fall through to the file-lock handler
    //handle_scatter_lock((ScatterLock*)lock, m);
    //break;

  case CEPH_LOCK_IFILE:
    handle_file_lock(static_cast<ScatterLock*>(lock), m);
    break;

  default:
    dout(7) << "handle_lock got otype " << m->get_lock_type() << dendl;
    ceph_abort();
    break;
  }
}
3831
3832
3833
3834
3835
3836// ==========================================================================
3837// simple lock
3838
/** This function may take a reference to m if it needs one, but does
 * not put references. */
// Handle a replica's request that the auth move this lock to SYNC so the
// replica can satisfy rdlocks locally.  Only acts when we are auth, not
// already SYNC, and not frozen; otherwise the request is dropped and the
// replica is expected to retry.
void Locker::handle_reqrdlock(SimpleLock *lock, MLock *m)
{
  MDSCacheObject *parent = lock->get_parent();
  if (parent->is_auth() &&
      lock->get_state() != LOCK_SYNC &&
      !parent->is_frozen()) {
    dout(7) << "handle_reqrdlock got rdlock request on " << *lock
	    << " on " << *parent << dendl;
    assert(parent->is_auth()); // replica auth pinned if they're doing this!
    if (lock->is_stable()) {
      simple_sync(lock);
    } else {
      // mid-transition; re-handle the message once the lock settles
      dout(7) << "handle_reqrdlock delaying request until lock is stable" << dendl;
      lock->add_waiter(SimpleLock::WAIT_STABLE | MDSCacheObject::WAIT_UNFREEZE,
                       new C_MDS_RetryMessage(mds, m->get()));
    }
  } else {
    dout(7) << "handle_reqrdlock dropping rdlock request on " << *lock
	    << " on " << *parent << dendl;
    // replica should retry
  }
}
3863
/* This function DOES put the passed message before returning */
// Process a lock action on a SimpleLock.  The first group of actions is
// taken on replicas (state pushed from the auth); the LOCKACK action is
// taken on the auth as it gathers acks from replicas.
void Locker::handle_simple_lock(SimpleLock *lock, MLock *m)
{
  int from = m->get_asker();

  dout(10) << "handle_simple_lock " << *m
	   << " on " << *lock << " " << *lock->get_parent() << dendl;

  if (mds->is_rejoin()) {
    if (lock->get_parent()->is_rejoining()) {
      // lock state for this object will be settled by the rejoin process;
      // drop messages that raced with it
      dout(7) << "handle_simple_lock still rejoining " << *lock->get_parent()
	      << ", dropping " << *m << dendl;
      m->put();
      return;
    }
  }

  switch (m->get_action()) {
    // -- replica --
  case LOCK_AC_SYNC:
    // auth is granting read access; absorb its authoritative state
    assert(lock->get_state() == LOCK_LOCK);
    lock->decode_locked_state(m->get_data());
    lock->set_state(LOCK_SYNC);
    lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
    break;

  case LOCK_AC_LOCK:
    // auth wants exclusive access back; revoke local readers/leases and
    // ack (via eval_gather) once our local rdlocks drain
    assert(lock->get_state() == LOCK_SYNC);
    lock->set_state(LOCK_SYNC_LOCK);
    if (lock->is_leased())
      revoke_client_leases(lock);
    eval_gather(lock, true);
    // push any pending journal writes so the lock can stabilize promptly
    if (lock->is_unstable_and_locked())
      mds->mdlog->flush();
    break;


    // -- auth --
  case LOCK_AC_LOCKACK:
    assert(lock->get_state() == LOCK_SYNC_LOCK ||
	   lock->get_state() == LOCK_SYNC_EXCL);
    assert(lock->is_gathering(from));
    lock->remove_gather(from);

    if (lock->is_gathering()) {
      dout(7) << "handle_simple_lock " << *lock << " on " << *lock->get_parent() << " from " << from
	      << ", still gathering " << lock->get_gather_set() << dendl;
    } else {
      dout(7) << "handle_simple_lock " << *lock << " on " << *lock->get_parent() << " from " << from
	      << ", last one" << dendl;
      eval_gather(lock);
    }
    break;

  case LOCK_AC_REQRDLOCK:
    handle_reqrdlock(lock, m);
    break;

  }

  m->put();
}
3926
3927/* unused, currently.
3928
3929class C_Locker_SimpleEval : public Context {
3930 Locker *locker;
3931 SimpleLock *lock;
3932public:
3933 C_Locker_SimpleEval(Locker *l, SimpleLock *lk) : locker(l), lock(lk) {}
3934 void finish(int r) {
3935 locker->try_simple_eval(lock);
3936 }
3937};
3938
3939void Locker::try_simple_eval(SimpleLock *lock)
3940{
3941 // unstable and ambiguous auth?
3942 if (!lock->is_stable() &&
3943 lock->get_parent()->is_ambiguous_auth()) {
3944 dout(7) << "simple_eval not stable and ambiguous auth, waiting on " << *lock->get_parent() << dendl;
3945 //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
3946 lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_SimpleEval(this, lock));
3947 return;
3948 }
3949
3950 if (!lock->get_parent()->is_auth()) {
3951 dout(7) << "try_simple_eval not auth for " << *lock->get_parent() << dendl;
3952 return;
3953 }
3954
3955 if (!lock->get_parent()->can_auth_pin()) {
3956 dout(7) << "try_simple_eval can't auth_pin, waiting on " << *lock->get_parent() << dendl;
3957 //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
3958 lock->get_parent()->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_SimpleEval(this, lock));
3959 return;
3960 }
3961
3962 if (lock->is_stable())
3963 simple_eval(lock);
3964}
3965*/
3966
3967
// Evaluate a stable SimpleLock on the auth and, based on what caps
// clients want, decide whether to transition it toward EXCL or SYNC.
// need_issue (optional out): set instead of calling issue_caps() directly,
// so the caller can batch cap issuance.
void Locker::simple_eval(SimpleLock *lock, bool *need_issue)
{
  dout(10) << "simple_eval " << *lock << " on " << *lock->get_parent() << dendl;

  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  if (lock->get_parent()->is_freezing_or_frozen()) {
    // dentry lock in unreadable state can block path traverse
    if ((lock->get_type() != CEPH_LOCK_DN ||
	 lock->get_state() == LOCK_SYNC ||
	 lock->get_parent()->is_frozen()))
      return;
  }

  if (mdcache->is_readonly()) {
    // read-only FS: everything converges on SYNC
    if (lock->get_state() != LOCK_SYNC) {
      dout(10) << "simple_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
      simple_sync(lock, need_issue);
    }
    return;
  }

  // for inode locks, look at caps wanted at this lock's cap shift
  CInode *in = 0;
  int wanted = 0;
  if (lock->get_type() != CEPH_LOCK_DN) {
    in = static_cast<CInode*>(lock->get_parent());
    in->get_caps_wanted(&wanted, NULL, lock->get_cap_shift());
  }

  // -> excl?
  if (lock->get_state() != LOCK_EXCL &&
      in && in->get_target_loner() >= 0 &&
      (wanted & CEPH_CAP_GEXCL)) {
    dout(7) << "simple_eval stable, going to excl " << *lock
	    << " on " << *lock->get_parent() << dendl;
    simple_excl(lock, need_issue);
  }

  // stable -> sync?
  else if (lock->get_state() != LOCK_SYNC &&
	   !lock->is_wrlocked() &&
	   ((!(wanted & CEPH_CAP_GEXCL) && !lock->is_waiter_for(SimpleLock::WAIT_WR)) ||
	    (lock->get_state() == LOCK_EXCL && in && in->get_target_loner() < 0))) {
    dout(7) << "simple_eval stable, syncing " << *lock
	    << " on " << *lock->get_parent() << dendl;
    simple_sync(lock, need_issue);
  }
}
4017
4018
4019// mid
4020
// Transition a stable lock toward SYNC on the auth.  Counts outstanding
// obstacles (wrlocks, replica acks, caps that must be recalled, file
// recovery) into 'gather'; if anything must be waited for, enters the
// appropriate intermediate state and returns false.  Returns true once
// the lock has reached SYNC.
bool Locker::simple_sync(SimpleLock *lock, bool *need_issue)
{
  dout(7) << "simple_sync on " << *lock << " on " << *lock->get_parent() << dendl;
  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  int old_state = lock->get_state();

  if (old_state != LOCK_TSYN) {

    switch (lock->get_state()) {
    case LOCK_MIX: lock->set_state(LOCK_MIX_SYNC); break;
    case LOCK_LOCK: lock->set_state(LOCK_LOCK_SYNC); break;
    case LOCK_XSYN: lock->set_state(LOCK_XSYN_SYNC); break;
    case LOCK_EXCL: lock->set_state(LOCK_EXCL_SYNC); break;
    default: ceph_abort();
    }

    int gather = 0;
    if (lock->is_wrlocked())
      gather++;

    if (lock->get_parent()->is_replicated() && old_state == LOCK_MIX) {
      // MIX replicas hold state we must gather back before going SYNC
      send_lock_message(lock, LOCK_AC_SYNC);
      lock->init_gather();
      gather++;
    }

    if (in && in->is_head()) {
      if (in->issued_caps_need_gather(lock)) {
	// re-issue caps so clients release what SYNC doesn't allow
	if (need_issue)
	  *need_issue = true;
	else
	  issue_caps(in);
	gather++;
      }
    }

    bool need_recover = false;
    if (lock->get_type() == CEPH_LOCK_IFILE) {
      assert(in);
      if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
	mds->mdcache->queue_file_recover(in);
	need_recover = true;
	gather++;
      }
    }

    if (!gather && lock->is_dirty()) {
      // nothing to gather but dirty scattered data: flush it first
      lock->get_parent()->auth_pin(lock);
      scatter_writebehind(static_cast<ScatterLock*>(lock));
      mds->mdlog->flush();
      return false;
    }

    if (gather) {
      // stay in the intermediate state until eval_gather() completes it
      lock->get_parent()->auth_pin(lock);
      if (need_recover)
	mds->mdcache->do_file_recover();
      return false;
    }
  }

  if (lock->get_parent()->is_replicated()) { // FIXME
    bufferlist data;
    lock->encode_locked_state(data);
    send_lock_message(lock, LOCK_AC_SYNC, data);
  }
  lock->set_state(LOCK_SYNC);
  lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
  if (in && in->is_head()) {
    if (need_issue)
      *need_issue = true;
    else
      issue_caps(in);
  }
  return true;
}
4103
// Transition a stable lock toward EXCL (loner-exclusive) on the auth.
// Like simple_sync(): counts obstacles into 'gather' and either parks in
// an intermediate state (auth-pinned) or completes the transition.
void Locker::simple_excl(SimpleLock *lock, bool *need_issue)
{
  dout(7) << "simple_excl on " << *lock << " on " << *lock->get_parent() << dendl;
  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  switch (lock->get_state()) {
  case LOCK_LOCK: lock->set_state(LOCK_LOCK_EXCL); break;
  case LOCK_SYNC: lock->set_state(LOCK_SYNC_EXCL); break;
  case LOCK_XSYN: lock->set_state(LOCK_XSYN_EXCL); break;
  default: ceph_abort();
  }

  int gather = 0;
  if (lock->is_rdlocked())
    gather++;
  if (lock->is_wrlocked())
    gather++;

  // from LOCK/XSYN the replicas are already in LOCK, so only the
  // SYNC_EXCL path needs a replica gather
  if (lock->get_parent()->is_replicated() &&
      lock->get_state() != LOCK_LOCK_EXCL &&
      lock->get_state() != LOCK_XSYN_EXCL) {
    send_lock_message(lock, LOCK_AC_LOCK);
    lock->init_gather();
    gather++;
  }

  if (in && in->is_head()) {
    if (in->issued_caps_need_gather(lock)) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
      gather++;
    }
  }

  if (gather) {
    lock->get_parent()->auth_pin(lock);
  } else {
    lock->set_state(LOCK_EXCL);
    lock->finish_waiters(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE);
    if (in) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
    }
  }
}
4158
// Transition a stable lock toward LOCK (exclusive to the MDS) on the
// auth.  Revokes leases, recalls caps, gathers rdlocks and replica acks;
// for MIX->LOCK a two-stage gather is used so local scattered state is
// collected before replicas are told to lock.
void Locker::simple_lock(SimpleLock *lock, bool *need_issue)
{
  dout(7) << "simple_lock on " << *lock << " on " << *lock->get_parent() << dendl;
  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());
  assert(lock->get_state() != LOCK_LOCK);

  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  int old_state = lock->get_state();

  switch (lock->get_state()) {
  case LOCK_SYNC: lock->set_state(LOCK_SYNC_LOCK); break;
  case LOCK_XSYN: lock->set_state(LOCK_XSYN_LOCK); break;
  case LOCK_EXCL: lock->set_state(LOCK_EXCL_LOCK); break;
  case LOCK_MIX: lock->set_state(LOCK_MIX_LOCK);
    (static_cast<ScatterLock *>(lock))->clear_unscatter_wanted();
    break;
  case LOCK_TSYN: lock->set_state(LOCK_TSYN_LOCK); break;
  default: ceph_abort();
  }

  int gather = 0;
  if (lock->is_leased()) {
    gather++;
    revoke_client_leases(lock);
  }
  if (lock->is_rdlocked())
    gather++;
  if (in && in->is_head()) {
    if (in->issued_caps_need_gather(lock)) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
      gather++;
    }
  }

  bool need_recover = false;
  if (lock->get_type() == CEPH_LOCK_IFILE) {
    assert(in);
    if(in->state_test(CInode::STATE_NEEDSRECOVER)) {
      mds->mdcache->queue_file_recover(in);
      need_recover = true;
      gather++;
    }
  }

  if (lock->get_parent()->is_replicated() &&
      lock->get_state() == LOCK_MIX_LOCK &&
      gather) {
    // stage one: drain local MIX state first; replicas are messaged later
    dout(10) << " doing local stage of mix->lock gather before gathering from replicas" << dendl;
  } else {
    // move to second stage of gather now, so we don't send the lock action later.
    if (lock->get_state() == LOCK_MIX_LOCK)
      lock->set_state(LOCK_MIX_LOCK2);

    if (lock->get_parent()->is_replicated() &&
	lock->get_sm()->states[old_state].replica_state != LOCK_LOCK) {  // replica may already be LOCK
      gather++;
      send_lock_message(lock, LOCK_AC_LOCK);
      lock->init_gather();
    }
  }

  if (!gather && lock->is_dirty()) {
    // dirty scattered data must be flushed before we can settle in LOCK
    lock->get_parent()->auth_pin(lock);
    scatter_writebehind(static_cast<ScatterLock*>(lock));
    mds->mdlog->flush();
    return;
  }

  if (gather) {
    lock->get_parent()->auth_pin(lock);
    if (need_recover)
      mds->mdcache->do_file_recover();
  } else {
    lock->set_state(LOCK_LOCK);
    lock->finish_waiters(ScatterLock::WAIT_XLOCK|ScatterLock::WAIT_WR|ScatterLock::WAIT_STABLE);
  }
}
4243
4244
// Move a lock from LOCK/XLOCKDONE toward XLOCK.  Gathers rdlocks,
// wrlocks, and conflicting client caps; if nothing needs gathering the
// lock advances to PREXLOCK immediately.
void Locker::simple_xlock(SimpleLock *lock)
{
  dout(7) << "simple_xlock on " << *lock << " on " << *lock->get_parent() << dendl;
  assert(lock->get_parent()->is_auth());
  //assert(lock->is_stable());
  assert(lock->get_state() != LOCK_XLOCK);

  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  // only pin once: an unstable lock is already auth-pinned
  if (lock->is_stable())
    lock->get_parent()->auth_pin(lock);

  switch (lock->get_state()) {
  case LOCK_LOCK:
  case LOCK_XLOCKDONE: lock->set_state(LOCK_LOCK_XLOCK); break;
  default: ceph_abort();
  }

  int gather = 0;
  if (lock->is_rdlocked())
    gather++;
  if (lock->is_wrlocked())
    gather++;

  if (in && in->is_head()) {
    if (in->issued_caps_need_gather(lock)) {
      issue_caps(in);
      gather++;
    }
  }

  if (!gather) {
    lock->set_state(LOCK_PREXLOCK);
    //assert("shouldn't be called if we are already xlockable" == 0);
  }
}
4283
4284
4285
4286
4287
4288// ==========================================================================
4289// scatter lock
4290
4291/*
4292
4293Some notes on scatterlocks.
4294
4295 - The scatter/gather is driven by the inode lock. The scatter always
4296 brings in the latest metadata from the fragments.
4297
4298 - When in a scattered/MIX state, fragments are only allowed to
4299 update/be written to if the accounted stat matches the inode's
4300 current version.
4301
4302 - That means, on gather, we _only_ assimilate diffs for frag metadata
4303 that match the current version, because those are the only ones
4304 written during this scatter/gather cycle. (Others didn't permit
4305 it.) We increment the version and journal this to disk.
4306
4307 - When possible, we also simultaneously update our local frag
4308 accounted stats to match.
4309
4310 - On scatter, the new inode info is broadcast to frags, both local
4311 and remote. If possible (auth and !frozen), the dirfrag auth
4312 should update the accounted state (if it isn't already up to date).
4313 Note that this may occur on both the local inode auth node and
4314 inode replicas, so there are two potential paths. If it is NOT
4315 possible, they need to mark_stale to prevent any possible writes.
4316
4317 - A scatter can be to MIX (potentially writeable) or to SYNC (read
4318 only). Both are opportunities to update the frag accounted stats,
4319 even though only the MIX case is affected by a stale dirfrag.
4320
4321 - Because many scatter/gather cycles can potentially go by without a
4322 frag being able to update its accounted stats (due to being frozen
4323 by exports/refragments in progress), the frag may have (even very)
4324 old stat versions. That's fine. If when we do want to update it,
4325 we can update accounted_* and the version first.
4326
4327*/
4328
// Log-commit callback for scatter_writebehind(): once the EUpdate has
// been journaled, complete the writebehind (apply the projected inode
// and release the forced wrlock) via scatter_writebehind_finish().
class C_Locker_ScatterWB : public LockerLogContext {
  ScatterLock *lock;   // lock whose dirty scattered state was flushed
  MutationRef mut;     // mutation carrying the projected inode / held locks
public:
  C_Locker_ScatterWB(Locker *l, ScatterLock *sl, MutationRef& m) :
    LockerLogContext(l), lock(sl), mut(m) {}
  void finish(int r) override {
    locker->scatter_writebehind_finish(lock, mut);
  }
};
4339
// Flush dirty scattered state (accumulated in dirfrag fnodes) back into
// the inode and journal it.  Takes a forced wrlock for the duration of
// the flush; C_Locker_ScatterWB completes the cycle after the journal
// entry commits.
void Locker::scatter_writebehind(ScatterLock *lock)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  dout(10) << "scatter_writebehind " << in->inode.mtime << " on " << *lock << " on " << *in << dendl;

  // journal
  MutationRef mut(new MutationImpl());
  mut->ls = mds->mdlog->get_current_segment();

  // forcefully take a wrlock
  lock->get_wrlock(true);
  mut->wrlocks.insert(lock);
  mut->locks.insert(lock);

  in->pre_cow_old_inode();  // avoid cow mayhem

  auto &pi = in->project_inode();
  pi.inode.version = in->pre_dirty();

  in->finish_scatter_gather_update(lock->get_type());
  lock->start_flush();

  EUpdate *le = new EUpdate(mds->mdlog, "scatter_writebehind");
  mds->mdlog->start_entry(le);

  mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY);
  mdcache->journal_dirty_inode(mut.get(), &le->metablob, in);
  
  in->finish_scatter_gather_update_accounted(lock->get_type(), mut, &le->metablob);

  mds->mdlog->submit_entry(le, new C_Locker_ScatterWB(this, lock, mut));
}
4372
// Journal-commit half of scatter_writebehind(): apply the projected
// inode, finish the lock's flush, notify replicas if needed, and release
// the mutation's locks.
void Locker::scatter_writebehind_finish(ScatterLock *lock, MutationRef& mut)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  dout(10) << "scatter_writebehind_finish on " << *lock << " on " << *in << dendl;
  in->pop_and_dirty_projected_inode(mut->ls);

  lock->finish_flush();

  // if replicas may have flushed in a mix->lock state, send another
  // message so they can finish_flush().
  if (in->is_replicated()) {
    switch (lock->get_state()) {
    case LOCK_MIX_LOCK:
    case LOCK_MIX_LOCK2:
    case LOCK_MIX_EXCL:
    case LOCK_MIX_TSYN:
      send_lock_message(lock, LOCK_AC_LOCKFLUSHED);
    }
  }

  mut->apply();
  drop_locks(mut.get());
  mut->cleanup();

  if (lock->is_stable())
    lock->finish_waiters(ScatterLock::WAIT_STABLE);

  //scatter_eval_gather(lock);
}
4402
// Evaluate a stable ScatterLock on the auth and steer it toward the
// appropriate state: MIX when scatter is wanted or (for INEST) the inode
// is replicated, LOCK/SYNC otherwise.
void Locker::scatter_eval(ScatterLock *lock, bool *need_issue)
{
  dout(10) << "scatter_eval " << *lock << " on " << *lock->get_parent() << dendl;

  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  if (lock->get_parent()->is_freezing_or_frozen()) {
    dout(20) << "  freezing|frozen" << dendl;
    return;
  }

  if (mdcache->is_readonly()) {
    if (lock->get_state() != LOCK_SYNC) {
      dout(10) << "scatter_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
      simple_sync(lock, need_issue);
    }
    return;
  }

  if (!lock->is_rdlocked() &&
      lock->get_state() != LOCK_MIX &&
      lock->get_scatter_wanted()) {
    dout(10) << "scatter_eval scatter_wanted, bump to mix " << *lock
	     << " on " << *lock->get_parent() << dendl;
    scatter_mix(lock, need_issue);
    return;
  }

  if (lock->get_type() == CEPH_LOCK_INEST) {
    // in general, we want to keep INEST writable at all times.
    if (!lock->is_rdlocked()) {
      if (lock->get_parent()->is_replicated()) {
	if (lock->get_state() != LOCK_MIX)
	  scatter_mix(lock, need_issue);
      } else {
	if (lock->get_state() != LOCK_LOCK)
	  simple_lock(lock, need_issue);
      }
    }
    return;
  }

  CInode *in = static_cast<CInode*>(lock->get_parent());
  if (!in->has_subtree_or_exporting_dirfrag() || in->is_base()) {
    // i _should_ be sync.
    if (!lock->is_wrlocked() &&
	lock->get_state() != LOCK_SYNC) {
      dout(10) << "scatter_eval no wrlocks|xlocks, not subtree root inode, syncing" << dendl;
      simple_sync(lock, need_issue);
    }
  }
}
4456
4457
4458/*
4459 * mark a scatterlock to indicate that the dir fnode has some dirty data
4460 */
4461void Locker::mark_updated_scatterlock(ScatterLock *lock)
4462{
4463 lock->mark_dirty();
4464 if (lock->get_updated_item()->is_on_list()) {
4465 dout(10) << "mark_updated_scatterlock " << *lock
4466 << " - already on list since " << lock->get_update_stamp() << dendl;
4467 } else {
4468 updated_scatterlocks.push_back(lock->get_updated_item());
4469 utime_t now = ceph_clock_now();
4470 lock->set_update_stamp(now);
4471 dout(10) << "mark_updated_scatterlock " << *lock
4472 << " - added at " << now << dendl;
4473 }
4474}
4475
4476/*
4477 * this is called by scatter_tick and LogSegment::try_to_trim() when
4478 * trying to flush dirty scattered data (i.e. updated fnode) back to
4479 * the inode.
4480 *
4481 * we need to lock|scatter in order to push fnode changes into the
4482 * inode.dirstat.
4483 */
4484void Locker::scatter_nudge(ScatterLock *lock, MDSInternalContextBase *c, bool forcelockchange)
4485{
4486 CInode *p = static_cast<CInode *>(lock->get_parent());
4487
4488 if (p->is_frozen() || p->is_freezing()) {
4489 dout(10) << "scatter_nudge waiting for unfreeze on " << *p << dendl;
4490 if (c)
4491 p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, c);
b32b8144 4492 else if (lock->is_dirty())
7c673cae
FG
4493 // just requeue. not ideal.. starvation prone..
4494 updated_scatterlocks.push_back(lock->get_updated_item());
4495 return;
4496 }
4497
4498 if (p->is_ambiguous_auth()) {
4499 dout(10) << "scatter_nudge waiting for single auth on " << *p << dendl;
4500 if (c)
4501 p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, c);
b32b8144 4502 else if (lock->is_dirty())
7c673cae
FG
4503 // just requeue. not ideal.. starvation prone..
4504 updated_scatterlocks.push_back(lock->get_updated_item());
4505 return;
4506 }
4507
4508 if (p->is_auth()) {
4509 int count = 0;
4510 while (true) {
4511 if (lock->is_stable()) {
4512 // can we do it now?
4513 // (only if we're not replicated.. if we are, we really do need
4514 // to nudge the lock state!)
4515 /*
4516 actually, even if we're not replicated, we can't stay in MIX, because another mds
4517 could discover and replicate us at any time. if that happens while we're flushing,
4518 they end up in MIX but their inode has the old scatterstat version.
4519
4520 if (!forcelockchange && !lock->get_parent()->is_replicated() && lock->can_wrlock(-1)) {
4521 dout(10) << "scatter_nudge auth, propagating " << *lock << " on " << *p << dendl;
4522 scatter_writebehind(lock);
4523 if (c)
4524 lock->add_waiter(SimpleLock::WAIT_STABLE, c);
4525 return;
4526 }
4527 */
4528
4529 if (mdcache->is_readonly()) {
4530 if (lock->get_state() != LOCK_SYNC) {
4531 dout(10) << "scatter_nudge auth, read-only FS, syncing " << *lock << " on " << *p << dendl;
4532 simple_sync(static_cast<ScatterLock*>(lock));
4533 }
4534 break;
4535 }
4536
4537 // adjust lock state
4538 dout(10) << "scatter_nudge auth, scatter/unscattering " << *lock << " on " << *p << dendl;
4539 switch (lock->get_type()) {
4540 case CEPH_LOCK_IFILE:
4541 if (p->is_replicated() && lock->get_state() != LOCK_MIX)
4542 scatter_mix(static_cast<ScatterLock*>(lock));
4543 else if (lock->get_state() != LOCK_LOCK)
4544 simple_lock(static_cast<ScatterLock*>(lock));
4545 else
4546 simple_sync(static_cast<ScatterLock*>(lock));
4547 break;
4548
4549 case CEPH_LOCK_IDFT:
4550 case CEPH_LOCK_INEST:
4551 if (p->is_replicated() && lock->get_state() != LOCK_MIX)
4552 scatter_mix(lock);
4553 else if (lock->get_state() != LOCK_LOCK)
4554 simple_lock(lock);
4555 else
4556 simple_sync(lock);
4557 break;
4558 default:
4559 ceph_abort();
4560 }
4561 ++count;
4562 if (lock->is_stable() && count == 2) {
4563 dout(10) << "scatter_nudge oh, stable after two cycles." << dendl;
4564 // this should only realy happen when called via
4565 // handle_file_lock due to AC_NUDGE, because the rest of the
4566 // time we are replicated or have dirty data and won't get
4567 // called. bailing here avoids an infinite loop.
4568 assert(!c);
4569 break;
4570 }
4571 } else {
4572 dout(10) << "scatter_nudge auth, waiting for stable " << *lock << " on " << *p << dendl;
4573 if (c)
4574 lock->add_waiter(SimpleLock::WAIT_STABLE, c);
4575 return;
4576 }
4577 }
4578 } else {
4579 dout(10) << "scatter_nudge replica, requesting scatter/unscatter of "
4580 << *lock << " on " << *p << dendl;
4581 // request unscatter?
4582 mds_rank_t auth = lock->get_parent()->authority().first;
4583 if (!mds->is_cluster_degraded() ||
4584 mds->mdsmap->is_clientreplay_or_active_or_stopping(auth))
4585 mds->send_message_mds(new MLock(lock, LOCK_AC_NUDGE, mds->get_nodeid()), auth);
4586
4587 // wait...
4588 if (c)
4589 lock->add_waiter(SimpleLock::WAIT_STABLE, c);
4590
4591 // also, requeue, in case we had wrong auth or something
b32b8144
FG
4592 if (lock->is_dirty())
4593 updated_scatterlocks.push_back(lock->get_updated_item());
7c673cae
FG
4594 }
4595}
4596
// Periodic pass over updated_scatterlocks: nudge locks whose dirty data
// has been pending longer than mds_scatter_nudge_interval.  The list is
// in enqueue (stamp) order, so the first too-recent entry ends the pass.
void Locker::scatter_tick()
{
  dout(10) << "scatter_tick" << dendl;

  // updated
  utime_t now = ceph_clock_now();
  int n = updated_scatterlocks.size();
  while (!updated_scatterlocks.empty()) {
    ScatterLock *lock = updated_scatterlocks.front();

    if (n-- == 0) break;  // scatter_nudge() may requeue; avoid looping

    if (!lock->is_dirty()) {
      // flushed by some other path; just drop it from the list
      updated_scatterlocks.pop_front();
      dout(10) << " removing from updated_scatterlocks "
	       << *lock << " " << *lock->get_parent() << dendl;
      continue;
    }
    if (now - lock->get_update_stamp() < g_conf->mds_scatter_nudge_interval)
      break;
    updated_scatterlocks.pop_front();
    scatter_nudge(lock, 0);
  }
  mds->mdlog->flush();
}
4622
4623
// Transition a stable ScatterLock toward TSYN (temporary sync).
// NOTE: guarded by an unconditional assert below -- this path is not
// fully implemented (at least not for filelock) and will abort if hit.
void Locker::scatter_tempsync(ScatterLock *lock, bool *need_issue)
{
  dout(10) << "scatter_tempsync " << *lock
	   << " on " << *lock->get_parent() << dendl;
  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  assert(0 == "not fully implemented, at least not for filelock");

  CInode *in = static_cast<CInode *>(lock->get_parent());

  switch (lock->get_state()) {
  case LOCK_SYNC: ceph_abort();   // this shouldn't happen
  case LOCK_LOCK: lock->set_state(LOCK_LOCK_TSYN); break;
  case LOCK_MIX: lock->set_state(LOCK_MIX_TSYN); break;
  default: ceph_abort();
  }

  int gather = 0;
  if (lock->is_wrlocked())
    gather++;

  if (lock->get_cap_shift() &&
      in->is_head() &&
      in->issued_caps_need_gather(lock)) {
    if (need_issue)
      *need_issue = true;
    else
      issue_caps(in);
    gather++;
  }

  if (lock->get_state() == LOCK_MIX_TSYN &&
      in->is_replicated()) {
    lock->init_gather();
    send_lock_message(lock, LOCK_AC_LOCK);
    gather++;
  }

  if (gather) {
    in->auth_pin(lock);
  } else {
    // do tempsync
    lock->set_state(LOCK_TSYN);
    lock->finish_waiters(ScatterLock::WAIT_RD|ScatterLock::WAIT_STABLE);
    if (lock->get_cap_shift()) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
    }
  }
}
4677
4678
4679
4680// ==========================================================================
4681// local lock
4682
4683void Locker::local_wrlock_grab(LocalLock *lock, MutationRef& mut)
4684{
4685 dout(7) << "local_wrlock_grab on " << *lock
4686 << " on " << *lock->get_parent() << dendl;
4687
4688 assert(lock->get_parent()->is_auth());
4689 assert(lock->can_wrlock());
4690 assert(!mut->wrlocks.count(lock));
4691 lock->get_wrlock(mut->get_client());
4692 mut->wrlocks.insert(lock);
4693 mut->locks.insert(lock);
4694}
4695
4696bool Locker::local_wrlock_start(LocalLock *lock, MDRequestRef& mut)
4697{
4698 dout(7) << "local_wrlock_start on " << *lock
4699 << " on " << *lock->get_parent() << dendl;
4700
4701 assert(lock->get_parent()->is_auth());
4702 if (lock->can_wrlock()) {
4703 assert(!mut->wrlocks.count(lock));
4704 lock->get_wrlock(mut->get_client());
4705 mut->wrlocks.insert(lock);
4706 mut->locks.insert(lock);
4707 return true;
4708 } else {
4709 lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
4710 return false;
4711 }
4712}
4713
4714void Locker::local_wrlock_finish(LocalLock *lock, MutationImpl *mut)
4715{
4716 dout(7) << "local_wrlock_finish on " << *lock
4717 << " on " << *lock->get_parent() << dendl;
4718 lock->put_wrlock();
4719 mut->wrlocks.erase(lock);
4720 mut->locks.erase(lock);
4721 if (lock->get_num_wrlocks() == 0) {
4722 lock->finish_waiters(SimpleLock::WAIT_STABLE |
4723 SimpleLock::WAIT_WR |
4724 SimpleLock::WAIT_RD);
4725 }
4726}
4727
4728bool Locker::local_xlock_start(LocalLock *lock, MDRequestRef& mut)
4729{
4730 dout(7) << "local_xlock_start on " << *lock
4731 << " on " << *lock->get_parent() << dendl;
4732
4733 assert(lock->get_parent()->is_auth());
4734 if (!lock->can_xlock_local()) {
4735 lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
4736 return false;
4737 }
4738
4739 lock->get_xlock(mut, mut->get_client());
4740 mut->xlocks.insert(lock);
4741 mut->locks.insert(lock);
4742 return true;
4743}
4744
4745void Locker::local_xlock_finish(LocalLock *lock, MutationImpl *mut)
4746{
4747 dout(7) << "local_xlock_finish on " << *lock
4748 << " on " << *lock->get_parent() << dendl;
4749 lock->put_xlock();
4750 mut->xlocks.erase(lock);
4751 mut->locks.erase(lock);
4752
4753 lock->finish_waiters(SimpleLock::WAIT_STABLE |
4754 SimpleLock::WAIT_WR |
4755 SimpleLock::WAIT_RD);
4756}
4757
4758
4759
4760// ==========================================================================
4761// file lock
4762
4763
// Evaluate a stable file lock on the auth and pick its target state
// (EXCL / MIX / SYNC) from the caps clients want: a single writer loner
// favors EXCL, multiple writers favor MIX, readers-only favors SYNC.
void Locker::file_eval(ScatterLock *lock, bool *need_issue)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  int loner_wanted, other_wanted;
  int wanted = in->get_caps_wanted(&loner_wanted, &other_wanted, CEPH_CAP_SFILE);
  dout(7) << "file_eval wanted=" << gcap_string(wanted)
	  << " loner_wanted=" << gcap_string(loner_wanted)
	  << " other_wanted=" << gcap_string(other_wanted)
	  << " filelock=" << *lock << " on " << *lock->get_parent()
	  << dendl;

  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  if (lock->get_parent()->is_freezing_or_frozen())
    return;

  if (mdcache->is_readonly()) {
    if (lock->get_state() != LOCK_SYNC) {
      dout(10) << "file_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
      simple_sync(lock, need_issue);
    }
    return;
  }

  // excl -> *?
  if (lock->get_state() == LOCK_EXCL) {
    dout(20) << " is excl" << dendl;
    int loner_issued, other_issued, xlocker_issued;
    in->get_caps_issued(&loner_issued, &other_issued, &xlocker_issued, CEPH_CAP_SFILE);
    dout(7) << "file_eval loner_issued=" << gcap_string(loner_issued)
            << " other_issued=" << gcap_string(other_issued)
	    << " xlocker_issued=" << gcap_string(xlocker_issued)
	    << dendl;
    if (!((loner_wanted|loner_issued) & (CEPH_CAP_GEXCL|CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) ||
	(other_wanted & (CEPH_CAP_GEXCL|CEPH_CAP_GWR|CEPH_CAP_GRD)) ||
	(in->inode.is_dir() && in->multiple_nonstale_caps())) {  // FIXME.. :/
      dout(20) << " should lose it" << dendl;
      // we should lose it.
      //  loner  other   want
      //  R      R       SYNC
      //  R      R|W     MIX
      //  R      W       MIX
      //  R|W    R       MIX
      //  R|W    R|W     MIX
      //  R|W    W       MIX
      //  W      R       MIX
      //  W      R|W     MIX
      //  W      W       MIX
      // -> any writer means MIX; RD doesn't matter.
      if (((other_wanted|loner_wanted) & CEPH_CAP_GWR) ||
	  lock->is_waiter_for(SimpleLock::WAIT_WR))
	scatter_mix(lock, need_issue);
      else if (!lock->is_wrlocked())   // let excl wrlocks drain first
	simple_sync(lock, need_issue);
      else
	dout(10) << " waiting for wrlock to drain" << dendl;
    }
  }

  // * -> excl?
  else if (lock->get_state() != LOCK_EXCL &&
	   !lock->is_rdlocked() &&
	   //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
	   ((wanted & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) ||
	    (in->inode.is_dir() && !in->has_subtree_or_exporting_dirfrag())) &&
	   in->get_target_loner() >= 0) {
    dout(7) << "file_eval stable, bump to loner " << *lock
	    << " on " << *lock->get_parent() << dendl;
    file_excl(lock, need_issue);
  }

  // * -> mixed?
  else if (lock->get_state() != LOCK_MIX &&
	   !lock->is_rdlocked() &&
	   //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
	   (lock->get_scatter_wanted() ||
	    (in->get_target_loner() < 0 && (wanted & CEPH_CAP_GWR)))) {
    dout(7) << "file_eval stable, bump to mixed " << *lock
	    << " on " << *lock->get_parent() << dendl;
    scatter_mix(lock, need_issue);
  }

  // * -> sync?
  else if (lock->get_state() != LOCK_SYNC &&
	   !lock->is_wrlocked() &&   // drain wrlocks first!
	   !lock->is_waiter_for(SimpleLock::WAIT_WR) &&
	   !(wanted & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) &&
	   !((lock->get_state() == LOCK_MIX) &&
	     in->is_dir() && in->has_subtree_or_exporting_dirfrag())  // if we are a delegation point, stay where we are
	   //((wanted & CEPH_CAP_RD) ||
	   // in->is_replicated() ||
	   // lock->is_leased() ||
	   //(!loner && lock->get_state() == LOCK_EXCL)) &&
	   ) {
    dout(7) << "file_eval stable, bump to sync " << *lock
	    << " on " << *lock->get_parent() << dendl;
    simple_sync(lock, need_issue);
  }
}
4864
4865
4866
// Move a file ScatterLock toward LOCK_MIX (the "scattered", multi-writer
// state).  Must be called on the auth MDS with the lock stable (asserted).
// From LOCK_LOCK the transition is immediate; from SYNC/EXCL/XSYN/TSYN we
// enter the matching transient *_MIX state and gather outstanding rdlocks,
// replica acks, client leases and conflicting client caps first.  If
// need_issue is non-null the caller is asked to issue caps later instead
// of issue_caps() being called inline.
4867void Locker::scatter_mix(ScatterLock *lock, bool *need_issue)
4868{
4869 dout(7) << "scatter_mix " << *lock << " on " << *lock->get_parent() << dendl;
4870
4871 CInode *in = static_cast<CInode*>(lock->get_parent());
4872 assert(in->is_auth());
4873 assert(lock->is_stable());
4874
// From LOCK nothing conflicting (rdlocks, leases, sync caps) can be
// outstanding, so we can switch to MIX right away.
4875 if (lock->get_state() == LOCK_LOCK) {
4876 in->start_scatter(lock);
4877 if (in->is_replicated()) {
4878 // data
4879 bufferlist softdata;
4880 lock->encode_locked_state(softdata);
4881
4882 // bcast to replicas
4883 send_lock_message(lock, LOCK_AC_MIX, softdata);
4884 }
4885
4886 // change lock
4887 lock->set_state(LOCK_MIX);
4888 lock->clear_scatter_wanted();
// cap_shift != 0 means this lock covers client caps, so caps must be
// (re)issued after the state change.
4889 if (lock->get_cap_shift()) {
4890 if (need_issue)
4891 *need_issue = true;
4892 else
4893 issue_caps(in);
4894 }
4895 } else {
4896 // gather?
// Enter the transient state matching where we came from.
4897 switch (lock->get_state()) {
4898 case LOCK_SYNC: lock->set_state(LOCK_SYNC_MIX); break;
7c673cae 4899 case LOCK_EXCL: lock->set_state(LOCK_EXCL_MIX); break;
b32b8144 4900 case LOCK_XSYN: lock->set_state(LOCK_XSYN_MIX); break;
7c673cae
FG
4901 case LOCK_TSYN: lock->set_state(LOCK_TSYN_MIX); break;
4902 default: ceph_abort();
4903 }
4904
// Count everything we still have to wait for before completing the
// transition; if gather stays 0 we can finish synchronously below.
4905 int gather = 0;
4906 if (lock->is_rdlocked())
4907 gather++;
4908 if (in->is_replicated()) {
b32b8144 4909 if (lock->get_state() == LOCK_SYNC_MIX) { // for the rest states, replicas are already LOCK
7c673cae
FG
4910 send_lock_message(lock, LOCK_AC_MIX);
4911 lock->init_gather();
4912 gather++;
4913 }
4914 }
4915 if (lock->is_leased()) {
4916 revoke_client_leases(lock);
4917 gather++;
4918 }
4919 if (lock->get_cap_shift() &&
4920 in->is_head() &&
4921 in->issued_caps_need_gather(lock)) {
// Issue (or schedule issuing of) caps so clients release what
// conflicts with MIX, then wait for the releases.
4922 if (need_issue)
4923 *need_issue = true;
4924 else
4925 issue_caps(in);
4926 gather++;
4927 }
4928 bool need_recover = false;
4929 if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
4930 mds->mdcache->queue_file_recover(in);
4931 need_recover = true;
4932 gather++;
4933 }
4934
4935 if (gather) {
// Pin the parent so it cannot be frozen/migrated mid-gather.
4936 lock->get_parent()->auth_pin(lock);
4937 if (need_recover)
4938 mds->mdcache->do_file_recover();
4939 } else {
// Nothing to wait for: complete the transition now (same steps as
// the LOCK_LOCK fast path above).
4940 in->start_scatter(lock);
4941 lock->set_state(LOCK_MIX);
4942 lock->clear_scatter_wanted();
4943 if (in->is_replicated()) {
4944 bufferlist softdata;
4945 lock->encode_locked_state(softdata);
4946 send_lock_message(lock, LOCK_AC_MIX, softdata);
4947 }
4948 if (lock->get_cap_shift()) {
4949 if (need_issue)
4950 *need_issue = true;
4951 else
4952 issue_caps(in);
4953 }
4954 }
4955 }
4956}
4957
4958
// Move a file ScatterLock toward LOCK_EXCL (loner-exclusive state).
// Auth-only, lock must be stable (asserted).  Except when coming from
// XSYN, requires a loner client and no other-MDS cap interest (asserted):
// EXCL hands all capability bits to the loner.  Enters the matching
// transient *_EXCL state, then gathers rdlocks/wrlocks, replica acks,
// leases and conflicting caps; completes immediately if nothing is
// outstanding.  need_issue, if non-null, defers cap issuing to the caller.
4959void Locker::file_excl(ScatterLock *lock, bool *need_issue)
4960{
4961 CInode *in = static_cast<CInode*>(lock->get_parent());
4962 dout(7) << "file_excl " << *lock << " on " << *lock->get_parent() << dendl;
4963
4964 assert(in->is_auth());
4965 assert(lock->is_stable());
4966
4967 assert((in->get_loner() >= 0 && in->mds_caps_wanted.empty()) ||
4968 (lock->get_state() == LOCK_XSYN)); // must do xsyn -> excl -> <anything else>
4969
// Pick the transient state for the origin state.
4970 switch (lock->get_state()) {
4971 case LOCK_SYNC: lock->set_state(LOCK_SYNC_EXCL); break;
4972 case LOCK_MIX: lock->set_state(LOCK_MIX_EXCL); break;
4973 case LOCK_LOCK: lock->set_state(LOCK_LOCK_EXCL); break;
4974 case LOCK_XSYN: lock->set_state(LOCK_XSYN_EXCL); break;
4975 default: ceph_abort();
4976 }
// Count outstanding obstacles; gather == 0 lets us finish synchronously.
4977 int gather = 0;
4978
4979 if (lock->is_rdlocked())
4980 gather++;
4981 if (lock->is_wrlocked())
4982 gather++;
4983
4984 if (in->is_replicated() &&
4985 lock->get_state() != LOCK_LOCK_EXCL &&
4986 lock->get_state() != LOCK_XSYN_EXCL) { // if we were lock, replicas are already lock.
4987 send_lock_message(lock, LOCK_AC_LOCK);
4988 lock->init_gather();
4989 gather++;
4990 }
4991 if (lock->is_leased()) {
4992 revoke_client_leases(lock);
4993 gather++;
4994 }
// Recall client caps that conflict with the target state.
4995 if (in->is_head() &&
4996 in->issued_caps_need_gather(lock)) {
4997 if (need_issue)
4998 *need_issue = true;
4999 else
5000 issue_caps(in);
5001 gather++;
5002 }
5003 bool need_recover = false;
5004 if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
5005 mds->mdcache->queue_file_recover(in);
5006 need_recover = true;
5007 gather++;
5008 }
5009
5010 if (gather) {
// Pin the inode while gathering; eval_gather() completes the
// transition when the last obstacle clears.
5011 lock->get_parent()->auth_pin(lock);
5012 if (need_recover)
5013 mds->mdcache->do_file_recover();
5014 } else {
5015 lock->set_state(LOCK_EXCL);
5016 if (need_issue)
5017 *need_issue = true;
5018 else
5019 issue_caps(in);
5020 }
5021}
5022
5023void Locker::file_xsyn(SimpleLock *lock, bool *need_issue)
5024{
5025 dout(7) << "file_xsyn on " << *lock << " on " << *lock->get_parent() << dendl;
5026 CInode *in = static_cast<CInode *>(lock->get_parent());
5027 assert(in->is_auth());
5028 assert(in->get_loner() >= 0 && in->mds_caps_wanted.empty());
5029
5030 switch (lock->get_state()) {
5031 case LOCK_EXCL: lock->set_state(LOCK_EXCL_XSYN); break;
5032 default: ceph_abort();
5033 }
5034
5035 int gather = 0;
5036 if (lock->is_wrlocked())
5037 gather++;
5038
5039 if (in->is_head() &&
5040 in->issued_caps_need_gather(lock)) {
5041 if (need_issue)
5042 *need_issue = true;
5043 else
5044 issue_caps(in);
5045 gather++;
5046 }
5047
5048 if (gather) {
5049 lock->get_parent()->auth_pin(lock);
5050 } else {
5051 lock->set_state(LOCK_XSYN);
5052 lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
5053 if (need_issue)
5054 *need_issue = true;
5055 else
5056 issue_caps(in);
5057 }
5058}
5059
5060void Locker::file_recover(ScatterLock *lock)
5061{
5062 CInode *in = static_cast<CInode *>(lock->get_parent());
5063 dout(7) << "file_recover " << *lock << " on " << *in << dendl;
5064
5065 assert(in->is_auth());
5066 //assert(lock->is_stable());
5067 assert(lock->get_state() == LOCK_PRE_SCAN); // only called from MDCache::start_files_to_recover()
5068
5069 int gather = 0;
5070
5071 /*
5072 if (in->is_replicated()
5073 lock->get_sm()->states[oldstate].replica_state != LOCK_LOCK) {
5074 send_lock_message(lock, LOCK_AC_LOCK);
5075 lock->init_gather();
5076 gather++;
5077 }
5078 */
5079 if (in->is_head() &&
5080 in->issued_caps_need_gather(lock)) {
5081 issue_caps(in);
5082 gather++;
5083 }
5084
5085 lock->set_state(LOCK_SCAN);
5086 if (gather)
5087 in->state_set(CInode::STATE_NEEDSRECOVER);
5088 else
5089 mds->mdcache->queue_file_recover(in);
5090}
5091
5092
5093// messenger
5094/* This function DOES put the passed message before returning */
// Process an MLock message for a file (scatter) lock, dispatching on
// m->get_action().  The first group of actions (SYNC/LOCK/LOCKFLUSHED/MIX)
// is handled on replicas and applies state pushed by the auth; the *ACK
// actions run on the auth and retire members of the gather set; the REQ*
// and NUDGE actions are replica-originated requests.  Messages arriving
// while this MDS is rejoining and the inode is still rejoining are
// dropped.  This function DOES put the passed message before returning.
5095void Locker::handle_file_lock(ScatterLock *lock, MLock *m)
5096{
5097 CInode *in = static_cast<CInode*>(lock->get_parent());
5098 int from = m->get_asker();
5099
5100 if (mds->is_rejoin()) {
5101 if (in->is_rejoining()) {
5102 dout(7) << "handle_file_lock still rejoining " << *in
5103 << ", dropping " << *m << dendl;
5104 m->put();
5105 return;
5106 }
5107 }
5108
5109 dout(7) << "handle_file_lock a=" << get_lock_action_name(m->get_action())
5110 << " on " << *lock
5111 << " from mds." << from << " "
5112 << *in << dendl;
5113
// Non-zero cap shift means this lock covers client capability bits, so
// caps are (re)issued after state changes below.
5114 bool caps = lock->get_cap_shift();
5115
5116 switch (m->get_action()) {
5117 // -- replica --
5118 case LOCK_AC_SYNC:
5119 assert(lock->get_state() == LOCK_LOCK ||
5120 lock->get_state() == LOCK_MIX ||
5121 lock->get_state() == LOCK_MIX_SYNC2);
5122
// From MIX we must first flush our scattered dirty state; enter
// MIX_SYNC and let eval_gather() drive the flush.
5123 if (lock->get_state() == LOCK_MIX) {
5124 lock->set_state(LOCK_MIX_SYNC);
5125 eval_gather(lock, true);
31f18b77
FG
5126 if (lock->is_unstable_and_locked())
5127 mds->mdlog->flush();
7c673cae
FG
5128 break;
5129 }
5130
5131 (static_cast<ScatterLock *>(lock))->finish_flush();
5132 (static_cast<ScatterLock *>(lock))->clear_flushed();
5133
5134 // ok
// Adopt the auth's lock data, go SYNC, and wake readers; the
// temporary rdlock keeps the state pinned while caps are issued
// and waiters run.
5135 lock->decode_locked_state(m->get_data());
5136 lock->set_state(LOCK_SYNC);
5137
5138 lock->get_rdlock();
5139 if (caps)
5140 issue_caps(in);
5141 lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
5142 lock->put_rdlock();
5143 break;
5144
5145 case LOCK_AC_LOCK:
5146 switch (lock->get_state()) {
5147 case LOCK_SYNC: lock->set_state(LOCK_SYNC_LOCK); break;
5148 case LOCK_MIX: lock->set_state(LOCK_MIX_LOCK); break;
5149 default: ceph_abort();
5150 }
5151
5152 eval_gather(lock, true);
31f18b77
FG
5153 if (lock->is_unstable_and_locked())
5154 mds->mdlog->flush();
5155
7c673cae
FG
5156 break;
5157
5158 case LOCK_AC_LOCKFLUSHED:
// Auth acknowledged our flushed scatter data.
5159 (static_cast<ScatterLock *>(lock))->finish_flush();
5160 (static_cast<ScatterLock *>(lock))->clear_flushed();
5161 // wake up scatter_nudge waiters
5162 if (lock->is_stable())
5163 lock->finish_waiters(SimpleLock::WAIT_STABLE);
5164 break;
5165
5166 case LOCK_AC_MIX:
5167 assert(lock->get_state() == LOCK_SYNC ||
5168 lock->get_state() == LOCK_LOCK ||
5169 lock->get_state() == LOCK_SYNC_MIX2);
5170
// From SYNC we must gather local rdlocks first; enter SYNC_MIX and
// let eval_gather() finish the transition.
5171 if (lock->get_state() == LOCK_SYNC) {
5172 // MIXED
5173 lock->set_state(LOCK_SYNC_MIX);
5174 eval_gather(lock, true);
31f18b77
FG
5175 if (lock->is_unstable_and_locked())
5176 mds->mdlog->flush();
7c673cae
FG
5177 break;
5178 }
c07f9fc5 5179
7c673cae 5180 // ok
7c673cae 5181 lock->set_state(LOCK_MIX);
c07f9fc5 5182 lock->decode_locked_state(m->get_data());
7c673cae
FG
5183
5184 if (caps)
5185 issue_caps(in);
5186
5187 lock->finish_waiters(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE);
5188 break;
5189
5190
5191 // -- auth --
5192 case LOCK_AC_LOCKACK:
5193 assert(lock->get_state() == LOCK_SYNC_LOCK ||
5194 lock->get_state() == LOCK_MIX_LOCK ||
5195 lock->get_state() == LOCK_MIX_LOCK2 ||
5196 lock->get_state() == LOCK_MIX_EXCL ||
5197 lock->get_state() == LOCK_SYNC_EXCL ||
5198 lock->get_state() == LOCK_SYNC_MIX ||
5199 lock->get_state() == LOCK_MIX_TSYN);
5200 assert(lock->is_gathering(from));
5201 lock->remove_gather(from);
5202
// When leaving MIX the ack carries the replica's scatter data;
// merge it before the gather completes.
5203 if (lock->get_state() == LOCK_MIX_LOCK ||
5204 lock->get_state() == LOCK_MIX_LOCK2 ||
5205 lock->get_state() == LOCK_MIX_EXCL ||
5206 lock->get_state() == LOCK_MIX_TSYN) {
5207 lock->decode_locked_state(m->get_data());
5208 // replica is waiting for AC_LOCKFLUSHED, eval_gather() should not
5209 // delay calling scatter_writebehind().
5210 lock->clear_flushed();
5211 }
5212
5213 if (lock->is_gathering()) {
5214 dout(7) << "handle_file_lock " << *in << " from " << from
5215 << ", still gathering " << lock->get_gather_set() << dendl;
5216 } else {
5217 dout(7) << "handle_file_lock " << *in << " from " << from
5218 << ", last one" << dendl;
5219 eval_gather(lock);
5220 }
5221 break;
5222
5223 case LOCK_AC_SYNCACK:
5224 assert(lock->get_state() == LOCK_MIX_SYNC);
5225 assert(lock->is_gathering(from));
5226 lock->remove_gather(from);
5227
5228 lock->decode_locked_state(m->get_data());
5229
5230 if (lock->is_gathering()) {
5231 dout(7) << "handle_file_lock " << *in << " from " << from
5232 << ", still gathering " << lock->get_gather_set() << dendl;
5233 } else {
5234 dout(7) << "handle_file_lock " << *in << " from " << from
5235 << ", last one" << dendl;
5236 eval_gather(lock);
5237 }
5238 break;
5239
5240 case LOCK_AC_MIXACK:
5241 assert(lock->get_state() == LOCK_SYNC_MIX);
5242 assert(lock->is_gathering(from));
5243 lock->remove_gather(from);
5244
5245 if (lock->is_gathering()) {
5246 dout(7) << "handle_file_lock " << *in << " from " << from
5247 << ", still gathering " << lock->get_gather_set() << dendl;
5248 } else {
5249 dout(7) << "handle_file_lock " << *in << " from " << from
5250 << ", last one" << dendl;
5251 eval_gather(lock);
5252 }
5253 break;
5254
5255
5256 // requests....
5257 case LOCK_AC_REQSCATTER:
5258 if (lock->is_stable()) {
5259 /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
5260 * because the replica should be holding an auth_pin if they're
5261 * doing this (and thus, we are freezing, not frozen, and indefinite
5262 * starvation isn't an issue).
5263 */
5264 dout(7) << "handle_file_lock got scatter request on " << *lock
5265 << " on " << *lock->get_parent() << dendl;
5266 if (lock->get_state() != LOCK_MIX) // i.e., the reqscatter didn't race with an actual mix/scatter
5267 scatter_mix(lock);
5268 } else {
// Not stable: just record the wish; it is acted on later.
5269 dout(7) << "handle_file_lock got scatter request, !stable, marking scatter_wanted on " << *lock
5270 << " on " << *lock->get_parent() << dendl;
5271 lock->set_scatter_wanted();
5272 }
5273 break;
5274
5275 case LOCK_AC_REQUNSCATTER:
5276 if (lock->is_stable()) {
5277 /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
5278 * because the replica should be holding an auth_pin if they're
5279 * doing this (and thus, we are freezing, not frozen, and indefinite
5280 * starvation isn't an issue).
5281 */
5282 dout(7) << "handle_file_lock got unscatter request on " << *lock
5283 << " on " << *lock->get_parent() << dendl;
5284 if (lock->get_state() == LOCK_MIX) // i.e., the reqscatter didn't race with an actual mix/scatter
5285 simple_lock(lock); // FIXME tempsync?
5286 } else {
5287 dout(7) << "handle_file_lock ignoring unscatter request on " << *lock
5288 << " on " << *lock->get_parent() << dendl;
5289 lock->set_unscatter_wanted();
5290 }
5291 break;
5292
5293 case LOCK_AC_REQRDLOCK:
5294 handle_reqrdlock(lock, m);
5295 break;
5296
// NUDGE only makes sense on the auth copy of a replicated lock;
// anything else is ignored with a log message.
5297 case LOCK_AC_NUDGE:
5298 if (!lock->get_parent()->is_auth()) {
5299 dout(7) << "handle_file_lock IGNORING nudge on non-auth " << *lock
5300 << " on " << *lock->get_parent() << dendl;
5301 } else if (!lock->get_parent()->is_replicated()) {
5302 dout(7) << "handle_file_lock IGNORING nudge on non-replicated " << *lock
5303 << " on " << *lock->get_parent() << dendl;
5304 } else {
5305 dout(7) << "handle_file_lock trying nudge on " << *lock
5306 << " on " << *lock->get_parent() << dendl;
5307 scatter_nudge(lock, 0, true);
5308 mds->mdlog->flush();
5309 }
5310 break;
5311
5312 default:
5313 ceph_abort();
5314 }
5315
5316 m->put();
5317}
5318
5319
5320
5321
5322
5323