]> git.proxmox.com Git - ceph.git/blame - ceph/src/mds/Locker.cc
update sources to v12.1.2
[ceph.git] / ceph / src / mds / Locker.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15
16#include "MDSRank.h"
17#include "MDCache.h"
18#include "Locker.h"
19#include "CInode.h"
20#include "CDir.h"
21#include "CDentry.h"
22#include "Mutation.h"
23#include "MDSContext.h"
24
25#include "MDLog.h"
26#include "MDSMap.h"
27
28#include "events/EUpdate.h"
29#include "events/EOpen.h"
30
31#include "msg/Messenger.h"
32#include "osdc/Objecter.h"
33
34#include "messages/MInodeFileCaps.h"
35#include "messages/MLock.h"
36#include "messages/MClientLease.h"
37#include "messages/MClientReply.h"
38#include "messages/MClientCaps.h"
39#include "messages/MClientCapRelease.h"
40
41#include "messages/MMDSSlaveRequest.h"
42
43#include <errno.h>
44
45#include "common/config.h"
46
47
48#define dout_subsys ceph_subsys_mds
49#undef dout_prefix
50#define dout_context g_ceph_context
51#define dout_prefix _prefix(_dout, mds)
52static ostream& _prefix(std::ostream *_dout, MDSRank *mds) {
53 return *_dout << "mds." << mds->get_nodeid() << ".locker ";
54}
55
56
57class LockerContext : public MDSInternalContextBase {
58protected:
59 Locker *locker;
60 MDSRank *get_mds() override
61 {
62 return locker->mds;
63 }
64
65public:
66 explicit LockerContext(Locker *locker_) : locker(locker_) {
67 assert(locker != NULL);
68 }
69};
70
71class LockerLogContext : public MDSLogContextBase {
72protected:
73 Locker *locker;
74 MDSRank *get_mds() override
75 {
76 return locker->mds;
77 }
78
79public:
80 explicit LockerLogContext(Locker *locker_) : locker(locker_) {
81 assert(locker != NULL);
82 }
83};
84
/* This function DOES put the passed message before returning */
/* Route an incoming message to the appropriate handler.  Each handle_*
 * callee is responsible for releasing (putting) the message; an
 * unrecognized type is a fatal protocol error. */
void Locker::dispatch(Message *m)
{

  switch (m->get_type()) {

  // inter-mds locking
  case MSG_MDS_LOCK:
    handle_lock(static_cast<MLock*>(m));
    break;
  // inter-mds caps
  case MSG_MDS_INODEFILECAPS:
    handle_inode_file_caps(static_cast<MInodeFileCaps*>(m));
    break;

  // client sync
  case CEPH_MSG_CLIENT_CAPS:
    handle_client_caps(static_cast<MClientCaps*>(m));

    break;
  case CEPH_MSG_CLIENT_CAPRELEASE:
    handle_client_cap_release(static_cast<MClientCapRelease*>(m));
    break;
  case CEPH_MSG_CLIENT_LEASE:
    handle_client_lease(static_cast<MClientLease*>(m));
    break;

  default:
    derr << "locker unknown message " << m->get_type() << dendl;
    assert(0 == "locker unknown message");
  }
}
117
/* Periodic upkeep, invoked from the MDS tick cycle.  Delegates to the
 * scatter-lock and client-capability maintenance routines (see
 * scatter_tick() / caps_tick() for the actual work). */
void Locker::tick()
{
  scatter_tick();
  caps_tick();
}
123
124/*
125 * locks vs rejoin
126 *
127 *
128 *
129 */
130
131void Locker::send_lock_message(SimpleLock *lock, int msg)
132{
133 for (compact_map<mds_rank_t,unsigned>::iterator it = lock->get_parent()->replicas_begin();
134 it != lock->get_parent()->replicas_end();
135 ++it) {
136 if (mds->is_cluster_degraded() &&
137 mds->mdsmap->get_state(it->first) < MDSMap::STATE_REJOIN)
138 continue;
139 MLock *m = new MLock(lock, msg, mds->get_nodeid());
140 mds->send_message_mds(m, it->first);
141 }
142}
143
144void Locker::send_lock_message(SimpleLock *lock, int msg, const bufferlist &data)
145{
146 for (compact_map<mds_rank_t,unsigned>::iterator it = lock->get_parent()->replicas_begin();
147 it != lock->get_parent()->replicas_end();
148 ++it) {
149 if (mds->is_cluster_degraded() &&
150 mds->mdsmap->get_state(it->first) < MDSMap::STATE_REJOIN)
151 continue;
152 MLock *m = new MLock(lock, msg, mds->get_nodeid());
153 m->set_data(data);
154 mds->send_message_mds(m, it->first);
155 }
156}
157
158
159
160
161void Locker::include_snap_rdlocks(set<SimpleLock*>& rdlocks, CInode *in)
162{
163 // rdlock ancestor snaps
164 CInode *t = in;
165 rdlocks.insert(&in->snaplock);
166 while (t->get_projected_parent_dn()) {
167 t = t->get_projected_parent_dn()->get_dir()->get_inode();
168 rdlocks.insert(&t->snaplock);
169 }
170}
171
172void Locker::include_snap_rdlocks_wlayout(set<SimpleLock*>& rdlocks, CInode *in,
173 file_layout_t **layout)
174{
175 //rdlock ancestor snaps
176 CInode *t = in;
177 rdlocks.insert(&in->snaplock);
178 rdlocks.insert(&in->policylock);
179 bool found_layout = false;
180 while (t) {
181 rdlocks.insert(&t->snaplock);
182 if (!found_layout) {
183 rdlocks.insert(&t->policylock);
184 if (t->get_projected_inode()->has_layout()) {
185 *layout = &t->get_projected_inode()->layout;
186 found_layout = true;
187 }
188 }
189 if (t->get_projected_parent_dn() &&
190 t->get_projected_parent_dn()->get_dir())
191 t = t->get_projected_parent_dn()->get_dir()->get_inode();
192 else t = NULL;
193 }
194}
195
// RAII helper: on destruction, records `message` as the mdr's current
// event (unless disarmed by clearing mark_event).  Callers update
// `message` as they progress so an early return is automatically
// annotated with the phase that blocked.
// NOTE: holds the MDRequestRef by reference — the referenced mdr must
// outlive this helper (stack use within one function only).
struct MarkEventOnDestruct {
  MDRequestRef& mdr;
  const char* message;
  bool mark_event;
  MarkEventOnDestruct(MDRequestRef& _mdr,
                      const char *_message) : mdr(_mdr),
                                              message(_message),
                                              mark_event(true) {}
  ~MarkEventOnDestruct() {
    if (mark_event)
      mdr->mark_event(message);
  }
};
209
/* Acquire every lock a request needs, in a global deadlock-avoiding order.
 *
 * Phases:
 *  1. Collect all requested locks into `sorted` (ordered via
 *     SimpleLock::ptr_lt) and the objects that need auth-pinning into
 *     `mustpin`.  Dentry/inode xlocks are augmented with the matching
 *     versionlock (wrlocked on the master, xlocked on a slave).
 *  2. Verify every local object can be auth-pinned; otherwise wait (or
 *     abort if auth_pin_nonblock) and return false.
 *  3. Take local auth pins; if remote pins are needed, send OP_AUTHPIN
 *     slave requests and return false while waiting for acks.
 *  4. Walk `sorted` in order: keep locks the mdr already holds in the
 *     right mode, release stray/out-of-order ones, and acquire the rest.
 *
 * If this function returns false, the mdr has been placed
 * on the appropriate wait list */
bool Locker::acquire_locks(MDRequestRef& mdr,
			   set<SimpleLock*> &rdlocks,
			   set<SimpleLock*> &wrlocks,
			   set<SimpleLock*> &xlocks,
			   map<SimpleLock*,mds_rank_t> *remote_wrlocks,
			   CInode *auth_pin_freeze,
			   bool auth_pin_nonblock)
{
  if (mdr->done_locking &&
      !mdr->is_slave()) { // not on slaves!  master requests locks piecemeal.
    dout(10) << "acquire_locks " << *mdr << " - done locking" << dendl;
    return true;  // at least we had better be!
  }
  dout(10) << "acquire_locks " << *mdr << dendl;

  // annotates the mdr with the phase we were in if we bail out early
  MarkEventOnDestruct marker(mdr, "failed to acquire_locks");

  client_t client = mdr->get_client();

  set<SimpleLock*, SimpleLock::ptr_lt> sorted;  // sort everything we will lock
  set<MDSCacheObject*> mustpin;            // items to authpin

  // xlocks
  for (set<SimpleLock*>::iterator p = xlocks.begin(); p != xlocks.end(); ++p) {
    dout(20) << " must xlock " << **p << " " << *(*p)->get_parent() << dendl;
    sorted.insert(*p);
    mustpin.insert((*p)->get_parent());

    // augment xlock with a versionlock?
    if ((*p)->get_type() == CEPH_LOCK_DN) {
      CDentry *dn = (CDentry*)(*p)->get_parent();
      if (!dn->is_auth())
	continue;

      if (xlocks.count(&dn->versionlock))
	continue;  // we're xlocking the versionlock too; don't wrlock it!

      if (mdr->is_master()) {
	// master.  wrlock versionlock so we can pipeline dentry updates to journal.
	wrlocks.insert(&dn->versionlock);
      } else {
	// slave.  exclusively lock the dentry version (i.e. block other journal updates).
	// this makes rollback safe.
	xlocks.insert(&dn->versionlock);
	sorted.insert(&dn->versionlock);
      }
    }
    if ((*p)->get_type() > CEPH_LOCK_IVERSION) {
      // inode version lock?
      CInode *in = (CInode*)(*p)->get_parent();
      if (!in->is_auth())
	continue;
      if (mdr->is_master()) {
	// master.  wrlock versionlock so we can pipeline inode updates to journal.
	wrlocks.insert(&in->versionlock);
      } else {
	// slave.  exclusively lock the inode version (i.e. block other journal updates).
	// this makes rollback safe.
	xlocks.insert(&in->versionlock);
	sorted.insert(&in->versionlock);
      }
    }
  }

  // wrlocks
  for (set<SimpleLock*>::iterator p = wrlocks.begin(); p != wrlocks.end(); ++p) {
    MDSCacheObject *object = (*p)->get_parent();
    dout(20) << " must wrlock " << **p << " " << *object << dendl;
    sorted.insert(*p);
    if (object->is_auth())
      mustpin.insert(object);
    else if (!object->is_auth() &&
	     !(*p)->can_wrlock(client) &&  // we might have to request a scatter
	     !mdr->is_slave()) {           // if we are slave (remote_wrlock), the master already authpinned
      dout(15) << " will also auth_pin " << *object
	       << " in case we need to request a scatter" << dendl;
      mustpin.insert(object);
    }
  }

  // remote_wrlocks
  if (remote_wrlocks) {
    for (map<SimpleLock*,mds_rank_t>::iterator p = remote_wrlocks->begin(); p != remote_wrlocks->end(); ++p) {
      MDSCacheObject *object = p->first->get_parent();
      dout(20) << " must remote_wrlock on mds." << p->second << " "
	       << *p->first << " " << *object << dendl;
      sorted.insert(p->first);
      mustpin.insert(object);
    }
  }

  // rdlocks
  for (set<SimpleLock*>::iterator p = rdlocks.begin();
       p != rdlocks.end();
       ++p) {
    MDSCacheObject *object = (*p)->get_parent();
    dout(20) << " must rdlock " << **p << " " << *object << dendl;
    sorted.insert(*p);
    if (object->is_auth())
      mustpin.insert(object);
    else if (!object->is_auth() &&
	     !(*p)->can_rdlock(client)) {      // we might have to request an rdlock
      dout(15) << " will also auth_pin " << *object
	       << " in case we need to request a rdlock" << dendl;
      mustpin.insert(object);
    }
  }


  // AUTH PINS
  map<mds_rank_t, set<MDSCacheObject*> > mustpin_remote;  // mds -> (object set)

  // can i auth pin them all now?
  marker.message = "failed to authpin local pins";
  for (set<MDSCacheObject*>::iterator p = mustpin.begin();
       p != mustpin.end();
       ++p) {
    MDSCacheObject *object = *p;

    dout(10) << " must authpin " << *object << dendl;

    if (mdr->is_auth_pinned(object)) {
      if (object != (MDSCacheObject*)auth_pin_freeze)
	continue;
      if (mdr->more()->is_remote_frozen_authpin) {
	if (mdr->more()->rename_inode == auth_pin_freeze)
	  continue;
	// unfreeze auth pin for the wrong inode
	// NOTE: operator[] materializes an (empty) entry for that rank so
	// an OP_AUTHPIN request is sent below; the size() result itself
	// is deliberately discarded.
	mustpin_remote[mdr->more()->rename_inode->authority().first].size();
      }
    }

    if (!object->is_auth()) {
      if (!mdr->locks.empty())
	drop_locks(mdr.get());
      if (object->is_ambiguous_auth()) {
	// wait until authority is unambiguous, then retry the whole request
	dout(10) << " ambiguous auth, waiting to authpin " << *object << dendl;
	object->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_MDS_RetryRequest(mdcache, mdr));
	mdr->drop_local_auth_pins();
	return false;
      }
      mustpin_remote[object->authority().first].insert(object);
      continue;
    }
    if (!object->can_auth_pin()) {
      // wait
      drop_locks(mdr.get());
      mdr->drop_local_auth_pins();
      if (auth_pin_nonblock) {
	dout(10) << " can't auth_pin (freezing?) " << *object << ", nonblocking" << dendl;
	mdr->aborted = true;
	return false;
      }
      dout(10) << " can't auth_pin (freezing?), waiting to authpin " << *object << dendl;
      object->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_MDS_RetryRequest(mdcache, mdr));

      if (!mdr->remote_auth_pins.empty())
	notify_freeze_waiter(object);

      return false;
    }
  }

  // ok, grab local auth pins
  for (set<MDSCacheObject*>::iterator p = mustpin.begin();
       p != mustpin.end();
       ++p) {
    MDSCacheObject *object = *p;
    if (mdr->is_auth_pinned(object)) {
      dout(10) << " already auth_pinned " << *object << dendl;
    } else if (object->is_auth()) {
      dout(10) << " auth_pinning " << *object << dendl;
      mdr->auth_pin(object);
    }
  }

  // request remote auth_pins
  if (!mustpin_remote.empty()) {
    marker.message = "requesting remote authpins";
    // fold objects we already hold remote pins on into the per-rank sets,
    // so each OP_AUTHPIN request carries the complete pin set for its rank
    for (map<MDSCacheObject*,mds_rank_t>::iterator p = mdr->remote_auth_pins.begin();
	 p != mdr->remote_auth_pins.end();
	 ++p) {
      if (mustpin.count(p->first)) {
	assert(p->second == p->first->authority().first);
	map<mds_rank_t, set<MDSCacheObject*> >::iterator q = mustpin_remote.find(p->second);
	if (q != mustpin_remote.end())
	  q->second.insert(p->first);
      }
    }
    for (map<mds_rank_t, set<MDSCacheObject*> >::iterator p = mustpin_remote.begin();
	 p != mustpin_remote.end();
	 ++p) {
      dout(10) << "requesting remote auth_pins from mds." << p->first << dendl;

      // wait for active auth
      if (mds->is_cluster_degraded() &&
	  !mds->mdsmap->is_clientreplay_or_active_or_stopping(p->first)) {
	dout(10) << " mds." << p->first << " is not active" << dendl;
	if (mdr->more()->waiting_on_slave.empty())
	  mds->wait_for_active_peer(p->first, new C_MDS_RetryRequest(mdcache, mdr));
	return false;
      }

      MMDSSlaveRequest *req = new MMDSSlaveRequest(mdr->reqid, mdr->attempt,
						   MMDSSlaveRequest::OP_AUTHPIN);
      for (set<MDSCacheObject*>::iterator q = p->second.begin();
	   q != p->second.end();
	   ++q) {
	dout(10) << " req remote auth_pin of " << **q << dendl;
	MDSCacheObjectInfo info;
	(*q)->set_object_info(info);
	req->get_authpins().push_back(info);
	if (*q == auth_pin_freeze)
	  (*q)->set_object_info(req->get_authpin_freeze());
	mdr->pin(*q);
      }
      if (auth_pin_nonblock)
	req->mark_nonblock();
      mds->send_message_mds(req, p->first);

      // put in waiting list
      assert(mdr->more()->waiting_on_slave.count(p->first) == 0);
      mdr->more()->waiting_on_slave.insert(p->first);
    }
    return false;
  }

  // caps i'll need to issue
  set<CInode*> issue_set;
  bool result = false;

  // acquire locks.
  // make sure they match currently acquired locks.
  set<SimpleLock*, SimpleLock::ptr_lt>::iterator existing = mdr->locks.begin();
  for (set<SimpleLock*, SimpleLock::ptr_lt>::iterator p = sorted.begin();
       p != sorted.end();
       ++p) {
    bool need_wrlock = !!wrlocks.count(*p);
    bool need_remote_wrlock = !!(remote_wrlocks && remote_wrlocks->count(*p));

    // already locked?
    if (existing != mdr->locks.end() && *existing == *p) {
      // right kind?
      SimpleLock *have = *existing;
      ++existing;
      if (xlocks.count(have) && mdr->xlocks.count(have)) {
	dout(10) << " already xlocked " << *have << " " << *have->get_parent() << dendl;
	continue;
      }
      if (mdr->remote_wrlocks.count(have)) {
	if (!need_remote_wrlock ||
	    mdr->remote_wrlocks[have] != (*remote_wrlocks)[have]) {
	  dout(10) << " unlocking remote_wrlock on wrong mds." << mdr->remote_wrlocks[have]
		   << " " << *have << " " << *have->get_parent() << dendl;
	  remote_wrlock_finish(have, mdr->remote_wrlocks[have], mdr.get());
	}
      }
      if (need_wrlock || need_remote_wrlock) {
	if (need_wrlock == !!mdr->wrlocks.count(have) &&
	    need_remote_wrlock == !!mdr->remote_wrlocks.count(have)) {
	  if (need_wrlock)
	    dout(10) << " already wrlocked " << *have << " " << *have->get_parent() << dendl;
	  if (need_remote_wrlock)
	    dout(10) << " already remote_wrlocked " << *have << " " << *have->get_parent() << dendl;
	  continue;
	}
      }
      if (rdlocks.count(have) && mdr->rdlocks.count(have)) {
	dout(10) << " already rdlocked " << *have << " " << *have->get_parent() << dendl;
	continue;
      }
    }

    // hose any stray locks
    if (existing != mdr->locks.end() && *existing == *p) {
      assert(need_wrlock || need_remote_wrlock);
      SimpleLock *lock = *existing;
      if (mdr->wrlocks.count(lock)) {
	if (!need_wrlock)
	  dout(10) << " unlocking extra " << *lock << " " << *lock->get_parent() << dendl;
	else if (need_remote_wrlock) // acquire remote_wrlock first
	  dout(10) << " unlocking out-of-order " << *lock << " " << *lock->get_parent() << dendl;
	bool need_issue = false;
	wrlock_finish(lock, mdr.get(), &need_issue);
	if (need_issue)
	  issue_set.insert(static_cast<CInode*>(lock->get_parent()));
      }
      ++existing;
    }
    while (existing != mdr->locks.end()) {
      SimpleLock *stray = *existing;
      ++existing;
      dout(10) << " unlocking out-of-order " << *stray << " " << *stray->get_parent() << dendl;
      bool need_issue = false;
      if (mdr->xlocks.count(stray)) {
	xlock_finish(stray, mdr.get(), &need_issue);
      } else if (mdr->rdlocks.count(stray)) {
	rdlock_finish(stray, mdr.get(), &need_issue);
      } else {
	// may have acquired both wrlock and remote wrlock
	if (mdr->wrlocks.count(stray))
	  wrlock_finish(stray, mdr.get(), &need_issue);
	if (mdr->remote_wrlocks.count(stray))
	  remote_wrlock_finish(stray, mdr->remote_wrlocks[stray], mdr.get());
      }
      if (need_issue)
	issue_set.insert(static_cast<CInode*>(stray->get_parent()));
    }

    // lock
    if (mdr->locking && *p != mdr->locking) {
      cancel_locking(mdr.get(), &issue_set);
    }
    if (xlocks.count(*p)) {
      marker.message = "failed to xlock, waiting";
      if (!xlock_start(*p, mdr))
	goto out;
      dout(10) << " got xlock on " << **p << " " << *(*p)->get_parent() << dendl;
    } else if (need_wrlock || need_remote_wrlock) {
      if (need_remote_wrlock && !mdr->remote_wrlocks.count(*p)) {
        marker.message = "waiting for remote wrlocks";
	remote_wrlock_start(*p, (*remote_wrlocks)[*p], mdr);
	goto out;
      }
      if (need_wrlock && !mdr->wrlocks.count(*p)) {
        marker.message = "failed to wrlock, waiting";
	if (need_remote_wrlock && !(*p)->can_wrlock(mdr->get_client())) {
	  marker.message = "failed to wrlock, dropping remote wrlock and waiting";
	  // can't take the wrlock because the scatter lock is gathering. need to
	  // release the remote wrlock, so that the gathering process can finish.
	  remote_wrlock_finish(*p, mdr->remote_wrlocks[*p], mdr.get());
	  remote_wrlock_start(*p, (*remote_wrlocks)[*p], mdr);
	  goto out;
	}
	// nowait if we have already gotten remote wrlock
	if (!wrlock_start(*p, mdr, need_remote_wrlock))
	  goto out;
	dout(10) << " got wrlock on " << **p << " " << *(*p)->get_parent() << dendl;
      }
    } else {
      assert(mdr->is_master());
      if ((*p)->is_scatterlock()) {
	ScatterLock *slock = static_cast<ScatterLock *>(*p);
	if (slock->is_rejoin_mix()) {
	  // If there is a recovering mds who replicated an object when it failed
	  // and scatterlock in the object was in MIX state, It's possible that
	  // the recovering mds needs to take wrlock on the scatterlock when it
	  // replays unsafe requests. So this mds should delay taking rdlock on
	  // the scatterlock until the recovering mds finishes replaying unsafe.
	  // Otherwise unsafe requests may get replayed after current request.
	  //
	  // For example:
	  // The recovering mds is auth mds of a dirfrag, this mds is auth mds
	  // of corresponding inode. when 'rm -rf' the directory, this mds should
	  // delay the rmdir request until the recovering mds has replayed unlink
	  // requests.
	  if (mds->is_cluster_degraded()) {
	    if (!mdr->is_replay()) {
	      drop_locks(mdr.get());
	      mds->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache, mdr));
	      dout(10) << " rejoin mix scatterlock " << *slock << " " << *(*p)->get_parent()
		       << ", waiting for cluster recovered" << dendl;
	      marker.message = "rejoin mix scatterlock, waiting for cluster recovered";
	      return false;
	    }
	  } else {
	    slock->clear_rejoin_mix();
	  }
	}
      }

      marker.message = "failed to rdlock, waiting";
      if (!rdlock_start(*p, mdr))
	goto out;
      dout(10) << " got rdlock on " << **p << " " << *(*p)->get_parent() << dendl;
    }
  }

  // any extra unneeded locks?
  while (existing != mdr->locks.end()) {
    SimpleLock *stray = *existing;
    ++existing;
    dout(10) << " unlocking extra " << *stray << " " << *stray->get_parent() << dendl;
    bool need_issue = false;
    if (mdr->xlocks.count(stray)) {
      xlock_finish(stray, mdr.get(), &need_issue);
    } else if (mdr->rdlocks.count(stray)) {
      rdlock_finish(stray, mdr.get(), &need_issue);
    } else {
      // may have acquired both wrlock and remote wrlock
      if (mdr->wrlocks.count(stray))
	wrlock_finish(stray, mdr.get(), &need_issue);
      if (mdr->remote_wrlocks.count(stray))
	remote_wrlock_finish(stray, mdr->remote_wrlocks[stray], mdr.get());
    }
    if (need_issue)
      issue_set.insert(static_cast<CInode*>(stray->get_parent()));
  }

  mdr->done_locking = true;
  mdr->set_mds_stamp(ceph_clock_now());
  result = true;
  marker.message = "acquired locks";

 out:
  issue_caps_set(issue_set);
  return result;
}
621
224ce89b
WB
/* Tell the freezing machinery that someone is now waiting on @o's
 * WAIT_UNFREEZE.  Resolves the relevant CDir from whichever cache object
 * type we were given (inode -> parent dir, dentry -> containing dir,
 * else the dir itself), then bumps the appropriate freeze-waiter counter.
 * NOTE(review): the dynamic_cast chain order matters; the final branch
 * asserts the object is one of the three known types. */
void Locker::notify_freeze_waiter(MDSCacheObject *o)
{
  CDir *dir = NULL;
  if (CInode *in = dynamic_cast<CInode*>(o)) {
    if (!in->is_root())
      dir = in->get_parent_dir();
  } else if (CDentry *dn = dynamic_cast<CDentry*>(o)) {
    dir = dn->get_dir();
  } else {
    dir = dynamic_cast<CDir*>(o);
    assert(dir);
  }
  if (dir) {
    if (dir->is_freezing_dir())
      mdcache->fragment_freeze_inc_num_waiters(dir);
    if (dir->is_freezing_tree()) {
      // walk up to the root of the freezing subtree before notifying
      while (!dir->is_freezing_tree_root())
	dir = dir->get_parent_dir();
      mdcache->migrator->export_freeze_inc_num_waiters(dir);
    }
  }
}
7c673cae
FG
644
645void Locker::set_xlocks_done(MutationImpl *mut, bool skip_dentry)
646{
647 for (set<SimpleLock*>::iterator p = mut->xlocks.begin();
648 p != mut->xlocks.end();
649 ++p) {
650 MDSCacheObject *object = (*p)->get_parent();
651 assert(object->is_auth());
652 if (skip_dentry &&
653 ((*p)->get_type() == CEPH_LOCK_DN || (*p)->get_type() == CEPH_LOCK_DVERSION))
654 continue;
655 dout(10) << "set_xlocks_done on " << **p << " " << *object << dendl;
656 (*p)->set_xlock_done();
657 }
658}
659
/* Release all rdlocks held by @mut, recording any inode whose caps need
 * re-issuing into *pneed_issue.
 * NOTE(review): the loop re-reads begin() instead of iterating because
 * rdlock_finish() apparently removes the lock from mut->rdlocks (the loop
 * would not terminate otherwise) — confirm against rdlock_finish. */
void Locker::_drop_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
{
  while (!mut->rdlocks.empty()) {
    bool ni = false;
    MDSCacheObject *p = (*mut->rdlocks.begin())->get_parent();
    rdlock_finish(*mut->rdlocks.begin(), mut, &ni);
    if (ni)
      pneed_issue->insert(static_cast<CInode*>(p));
  }
}
670
/* Release every xlock, remote wrlock and wrlock held by @mut.  Inodes
 * whose caps should be re-issued are added to *pneed_issue.  For locks
 * held on remote (non-auth) objects we only drop local bookkeeping here
 * and send a single OP_DROPLOCKS to each involved slave at the end. */
void Locker::_drop_non_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
{
  set<mds_rank_t> slaves;

  // drain xlocks; each *_finish() removes the lock from the mutation's sets
  while (!mut->xlocks.empty()) {
    SimpleLock *lock = *mut->xlocks.begin();
    MDSCacheObject *p = lock->get_parent();
    if (!p->is_auth()) {
      // remotely xlocked: drop our local reference and remember the slave
      assert(lock->get_sm()->can_remote_xlock);
      slaves.insert(p->authority().first);
      lock->put_xlock();
      mut->locks.erase(lock);
      mut->xlocks.erase(lock);
      continue;
    }
    bool ni = false;
    xlock_finish(lock, mut, &ni);
    if (ni)
      pneed_issue->insert(static_cast<CInode*>(p));
  }

  // drop remote wrlocks (bookkeeping only; actual release via OP_DROPLOCKS)
  while (!mut->remote_wrlocks.empty()) {
    map<SimpleLock*,mds_rank_t>::iterator p = mut->remote_wrlocks.begin();
    slaves.insert(p->second);
    // keep the entry in mut->locks if we also hold a local wrlock on it
    if (mut->wrlocks.count(p->first) == 0)
      mut->locks.erase(p->first);
    mut->remote_wrlocks.erase(p);
  }

  // drain local wrlocks
  while (!mut->wrlocks.empty()) {
    bool ni = false;
    MDSCacheObject *p = (*mut->wrlocks.begin())->get_parent();
    wrlock_finish(*mut->wrlocks.begin(), mut, &ni);
    if (ni)
      pneed_issue->insert(static_cast<CInode*>(p));
  }

  // one DROPLOCKS message per slave, skipping ranks that are pre-rejoin
  for (set<mds_rank_t>::iterator p = slaves.begin(); p != slaves.end(); ++p) {
    if (!mds->is_cluster_degraded() ||
	mds->mdsmap->get_state(*p) >= MDSMap::STATE_REJOIN) {
      dout(10) << "_drop_non_rdlocks dropping remote locks on mds." << *p << dendl;
      MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt,
							MMDSSlaveRequest::OP_DROPLOCKS);
      mds->send_message_mds(slavereq, *p);
    }
  }
}
718
/* Abort the lock acquisition @mut is currently blocked on (mut->locking).
 * On the auth side, unwind any partially-entered xlock state:
 *  - LOCK_PREXLOCK: finish the xlock attempt with an error;
 *  - LOCK_LOCK_XLOCK with no xlockers: mark done and re-run the gather.
 * Affected inodes are added to *pneed_issue for cap re-issue. */
void Locker::cancel_locking(MutationImpl *mut, set<CInode*> *pneed_issue)
{
  SimpleLock *lock = mut->locking;
  assert(lock);
  dout(10) << "cancel_locking " << *lock << " on " << *mut << dendl;

  if (lock->get_parent()->is_auth()) {
    bool need_issue = false;
    if (lock->get_state() == LOCK_PREXLOCK) {
      _finish_xlock(lock, -1, &need_issue);
    } else if (lock->get_state() == LOCK_LOCK_XLOCK &&
	       lock->get_num_xlocks() == 0) {
      lock->set_state(LOCK_XLOCKDONE);
      eval_gather(lock, true, &need_issue);
    }
    if (need_issue)
      pneed_issue->insert(static_cast<CInode *>(lock->get_parent()));
  }
  mut->finish_locking(lock);
}
739
740void Locker::drop_locks(MutationImpl *mut, set<CInode*> *pneed_issue)
741{
742 // leftover locks
743 set<CInode*> my_need_issue;
744 if (!pneed_issue)
745 pneed_issue = &my_need_issue;
746
747 if (mut->locking)
748 cancel_locking(mut, pneed_issue);
749 _drop_non_rdlocks(mut, pneed_issue);
750 _drop_rdlocks(mut, pneed_issue);
751
752 if (pneed_issue == &my_need_issue)
753 issue_caps_set(*pneed_issue);
754 mut->done_locking = false;
755}
756
757void Locker::drop_non_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
758{
759 set<CInode*> my_need_issue;
760 if (!pneed_issue)
761 pneed_issue = &my_need_issue;
762
763 _drop_non_rdlocks(mut, pneed_issue);
764
765 if (pneed_issue == &my_need_issue)
766 issue_caps_set(*pneed_issue);
767}
768
769void Locker::drop_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
770{
771 set<CInode*> my_need_issue;
772 if (!pneed_issue)
773 pneed_issue = &my_need_issue;
774
775 _drop_rdlocks(mut, pneed_issue);
776
777 if (pneed_issue == &my_need_issue)
778 issue_caps_set(*pneed_issue);
779}
780
781
782// generics
783
/* Re-evaluate an unstable lock that is gathering (waiting for rdlocks,
 * wrlocks, xlocks, leases and/or client caps to be released).  If the
 * gather is complete, transition the lock to its next state, notify the
 * auth (when we are a replica) or the replicas (when we are auth), wake
 * waiters, and flag cap re-issue via *pneed_issue / issue_caps().
 * @first: first evaluation after the state change; may trigger an early
 *         cap re-issue if currently-issued caps exceed what `next` allows.
 * @pfinishers: if non-null, woken waiters are collected here instead of
 *         being completed immediately. */
void Locker::eval_gather(SimpleLock *lock, bool first, bool *pneed_issue, list<MDSInternalContextBase*> *pfinishers)
{
  dout(10) << "eval_gather " << *lock << " on " << *lock->get_parent() << dendl;
  assert(!lock->is_stable());

  int next = lock->get_next_state();

  CInode *in = 0;
  bool caps = lock->get_cap_shift();
  if (lock->get_type() != CEPH_LOCK_DN)
    in = static_cast<CInode *>(lock->get_parent());

  bool need_issue = false;

  int loner_issued = 0, other_issued = 0, xlocker_issued = 0;
  assert(!caps || in != NULL);
  if (caps && in->is_head()) {
    in->get_caps_issued(&loner_issued, &other_issued, &xlocker_issued,
			lock->get_cap_shift(), lock->get_cap_mask());
    dout(10) << " next state is " << lock->get_state_name(next)
	     << " issued/allows loner " << gcap_string(loner_issued)
	     << "/" << gcap_string(lock->gcaps_allowed(CAP_LONER, next))
	     << " xlocker " << gcap_string(xlocker_issued)
	     << "/" << gcap_string(lock->gcaps_allowed(CAP_XLOCKER, next))
	     << " other " << gcap_string(other_issued)
	     << "/" << gcap_string(lock->gcaps_allowed(CAP_ANY, next))
	     << dendl;

    // issued caps not permitted in the next state must be revoked first
    if (first && ((~lock->gcaps_allowed(CAP_ANY, next) & other_issued) ||
		  (~lock->gcaps_allowed(CAP_LONER, next) & loner_issued) ||
		  (~lock->gcaps_allowed(CAP_XLOCKER, next) & xlocker_issued)))
      need_issue = true;
  }

#define IS_TRUE_AND_LT_AUTH(x, auth) (x && ((auth && x <= AUTH) || (!auth && x < AUTH)))
  bool auth = lock->get_parent()->is_auth();
  // the gather is complete only if every remaining holder type is either
  // permitted in the next state or gone, no flush is in flight, and no
  // disallowed caps remain issued
  if (!lock->is_gathering() &&
      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_rdlock, auth) || !lock->is_rdlocked()) &&
      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_wrlock, auth) || !lock->is_wrlocked()) &&
      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_xlock, auth) || !lock->is_xlocked()) &&
      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_lease, auth) || !lock->is_leased()) &&
      !(lock->get_parent()->is_auth() && lock->is_flushing()) &&  // i.e. wait for scatter_writebehind!
      (!caps || ((~lock->gcaps_allowed(CAP_ANY, next) & other_issued) == 0 &&
		 (~lock->gcaps_allowed(CAP_LONER, next) & loner_issued) == 0 &&
		 (~lock->gcaps_allowed(CAP_XLOCKER, next) & xlocker_issued) == 0)) &&
      lock->get_state() != LOCK_SYNC_MIX2 &&  // these states need an explicit trigger from the auth mds
      lock->get_state() != LOCK_MIX_SYNC2
      ) {
    dout(7) << "eval_gather finished gather on " << *lock
	    << " on " << *lock->get_parent() << dendl;

    if (lock->get_sm() == &sm_filelock) {
      assert(in);
      if (in->state_test(CInode::STATE_RECOVERING)) {
	dout(7) << "eval_gather finished gather, but still recovering" << dendl;
	return;
      } else if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
	dout(7) << "eval_gather finished gather, but need to recover" << dendl;
	mds->mdcache->queue_file_recover(in);
	mds->mdcache->do_file_recover();
	return;
      }
    }

    if (!lock->get_parent()->is_auth()) {
      // replica: tell auth
      mds_rank_t auth = lock->get_parent()->authority().first;

      if (lock->get_parent()->is_rejoining() &&
	  mds->mdsmap->get_state(auth) == MDSMap::STATE_REJOIN) {
	dout(7) << "eval_gather finished gather, but still rejoining "
		<< *lock->get_parent() << dendl;
	return;
      }

      if (!mds->is_cluster_degraded() ||
	  mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) {
	switch (lock->get_state()) {
	case LOCK_SYNC_LOCK:
	  mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid()),
				auth);
	  break;

	case LOCK_MIX_SYNC:
	  {
	    MLock *reply = new MLock(lock, LOCK_AC_SYNCACK, mds->get_nodeid());
	    lock->encode_locked_state(reply->get_data());
	    mds->send_message_mds(reply, auth);
	    next = LOCK_MIX_SYNC2;
	    (static_cast<ScatterLock *>(lock))->start_flush();
	  }
	  break;

	case LOCK_MIX_SYNC2:
	  (static_cast<ScatterLock *>(lock))->finish_flush();
	  (static_cast<ScatterLock *>(lock))->clear_flushed();
	  // fall-thru

	case LOCK_SYNC_MIX2:
	  // do nothing, we already acked
	  break;

	case LOCK_SYNC_MIX:
	  {
	    MLock *reply = new MLock(lock, LOCK_AC_MIXACK, mds->get_nodeid());
	    mds->send_message_mds(reply, auth);
	    next = LOCK_SYNC_MIX2;
	  }
	  break;

	case LOCK_MIX_LOCK:
	  {
	    bufferlist data;
	    lock->encode_locked_state(data);
	    mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid(), data), auth);
	    (static_cast<ScatterLock *>(lock))->start_flush();
	    // we'll get an AC_LOCKFLUSHED to complete
	  }
	  break;

	default:
	  ceph_abort();
	}
      }
    } else {
      // auth

      // once the first (local) stage of mix->lock gather complete we can
      // gather from replicas
      if (lock->get_state() == LOCK_MIX_LOCK &&
	  lock->get_parent()->is_replicated()) {
	dout(10) << " finished (local) gather for mix->lock, now gathering from replicas" << dendl;
	send_lock_message(lock, LOCK_AC_LOCK);
	lock->init_gather();
	lock->set_state(LOCK_MIX_LOCK2);
	return;
      }

      // dirty scatter data must be written behind before transitioning
      if (lock->is_dirty() && !lock->is_flushed()) {
	scatter_writebehind(static_cast<ScatterLock *>(lock));
	mds->mdlog->flush();
	return;
      }
      lock->clear_flushed();

      switch (lock->get_state()) {
	// to mixed
      case LOCK_TSYN_MIX:
      case LOCK_SYNC_MIX:
      case LOCK_EXCL_MIX:
	in->start_scatter(static_cast<ScatterLock *>(lock));
	if (lock->get_parent()->is_replicated()) {
	  bufferlist softdata;
	  lock->encode_locked_state(softdata);
	  send_lock_message(lock, LOCK_AC_MIX, softdata);
	}
	(static_cast<ScatterLock *>(lock))->clear_scatter_wanted();
	break;

      case LOCK_XLOCK:
      case LOCK_XLOCKDONE:
	if (next != LOCK_SYNC)
	  break;
	// fall-thru

	// to sync
      case LOCK_EXCL_SYNC:
      case LOCK_LOCK_SYNC:
      case LOCK_MIX_SYNC:
      case LOCK_XSYN_SYNC:
	if (lock->get_parent()->is_replicated()) {
	  bufferlist softdata;
	  lock->encode_locked_state(softdata);
	  send_lock_message(lock, LOCK_AC_SYNC, softdata);
	}
	break;
      }

    }

    lock->set_state(next);

    if (lock->get_parent()->is_auth() &&
	lock->is_stable())
      lock->get_parent()->auth_unpin(lock);

    // drop loner before doing waiters
    if (caps &&
	in->is_head() &&
	in->is_auth() &&
	in->get_wanted_loner() != in->get_loner()) {
      dout(10) << " trying to drop loner" << dendl;
      if (in->try_drop_loner()) {
	dout(10) << " dropped loner" << dendl;
	need_issue = true;
      }
    }

    if (pfinishers)
      lock->take_waiting(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD|SimpleLock::WAIT_XLOCK,
			 *pfinishers);
    else
      lock->finish_waiters(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD|SimpleLock::WAIT_XLOCK);

    if (caps && in->is_head())
      need_issue = true;

    if (lock->get_parent()->is_auth() &&
	lock->is_stable())
      try_eval(lock, &need_issue);
  }

  if (need_issue) {
    if (pneed_issue)
      *pneed_issue = true;
    else if (in->is_head())
      issue_caps(in);
  }

}
1003
/**
 * Evaluate the locks on an inode selected by @mask, nudging them toward
 * stable states, and decide whether client caps must be (re)issued.
 *
 * Also manages loner selection: a loner may be chosen before the lock
 * sweep, and dropped/re-chosen afterwards (which forces a full re-sweep
 * via the retry label).
 *
 * @param in inode whose locks are evaluated
 * @param mask bitmask of CEPH_LOCK_I* types to evaluate (-1 = all)
 * @param caps_imported if true, assume caps must be issued regardless
 * @return true if caps were issued (or flagged for issue)
 */
bool Locker::eval(CInode *in, int mask, bool caps_imported)
{
  bool need_issue = caps_imported;
  list<MDSInternalContextBase*> finishers;

  dout(10) << "eval " << mask << " " << *in << dendl;

  // choose loner?
  if (in->is_auth() && in->is_head()) {
    if (in->choose_ideal_loner() >= 0) {
      if (in->try_set_loner()) {
	dout(10) << "eval set loner to client." << in->get_loner() << dendl;
	need_issue = true;
	mask = -1;  // loner changed: every lock's allowed caps may change
      } else
	dout(10) << "eval want loner client." << in->get_wanted_loner() << " but failed to set it" << dendl;
    } else
      dout(10) << "eval doesn't want loner" << dendl;
  }

 retry:
  if (mask & CEPH_LOCK_IFILE)
    eval_any(&in->filelock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IAUTH)
    eval_any(&in->authlock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_ILINK)
    eval_any(&in->linklock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IXATTR)
    eval_any(&in->xattrlock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_INEST)
    eval_any(&in->nestlock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IFLOCK)
    eval_any(&in->flocklock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IPOLICY)
    eval_any(&in->policylock, &need_issue, &finishers, caps_imported);

  // drop loner?
  if (in->is_auth() && in->is_head() && in->get_wanted_loner() != in->get_loner()) {
    dout(10) << " trying to drop loner" << dendl;
    if (in->try_drop_loner()) {
      dout(10) << " dropped loner" << dendl;
      need_issue = true;

      if (in->get_wanted_loner() >= 0) {
	if (in->try_set_loner()) {
	  dout(10) << "eval end set loner to client." << in->get_loner() << dendl;
	  mask = -1;
	  goto retry;  // loner changed again: re-evaluate everything
	} else {
	  dout(10) << "eval want loner client." << in->get_wanted_loner() << " but failed to set it" << dendl;
	}
      }
    }
  }

  finish_contexts(g_ceph_context, finishers);

  if (need_issue && in->is_head())
    issue_caps(in);

  dout(10) << "eval done" << dendl;
  return need_issue;
}
1067
1068class C_Locker_Eval : public LockerContext {
1069 MDSCacheObject *p;
1070 int mask;
1071public:
1072 C_Locker_Eval(Locker *l, MDSCacheObject *pp, int m) : LockerContext(l), p(pp), mask(m) {
1073 // We are used as an MDSCacheObject waiter, so should
1074 // only be invoked by someone already holding the big lock.
1075 assert(locker->mds->mds_lock.is_locked_by_me());
1076 p->get(MDSCacheObject::PIN_PTRWAITER);
1077 }
1078 void finish(int r) override {
1079 locker->try_eval(p, mask);
1080 p->put(MDSCacheObject::PIN_PTRWAITER);
1081 }
1082};
1083
/**
 * Re-evaluate a cache object's lock(s) once it is safe to do so.
 * Defers (queueing a C_Locker_Eval waiter) while the object has
 * ambiguous auth or is frozen on the auth.
 *
 * @param p inode or dentry to evaluate
 * @param mask CEPH_LOCK_DN for a dentry; otherwise inode lock-type bits
 */
void Locker::try_eval(MDSCacheObject *p, int mask)
{
  // unstable and ambiguous auth?
  if (p->is_ambiguous_auth()) {
    dout(7) << "try_eval ambiguous auth, waiting on " << *p << dendl;
    p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_Eval(this, p, mask));
    return;
  }

  if (p->is_auth() && p->is_frozen()) {
    dout(7) << "try_eval frozen, waiting on " << *p << dendl;
    p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, mask));
    return;
  }

  if (mask & CEPH_LOCK_DN) {
    assert(mask == CEPH_LOCK_DN);
    bool need_issue = false;  // ignore this, no caps on dentries
    CDentry *dn = static_cast<CDentry *>(p);
    eval_any(&dn->lock, &need_issue);
  } else {
    CInode *in = static_cast<CInode *>(p);
    eval(in, mask);
  }
}
1109
/**
 * Re-evaluate a single lock, deferring while the parent object is in a
 * state (ambiguous auth, frozen) where lock transitions are unsafe.
 * Pending scatter/unscatter requests on scatterlocks are honored before
 * the general eval, even while the parent is freezing (see comment in
 * the body for why that is deadlock-free).
 *
 * @param lock lock to evaluate; no-op on replicas
 * @param pneed_issue set to true if client caps need issuing afterwards
 */
void Locker::try_eval(SimpleLock *lock, bool *pneed_issue)
{
  MDSCacheObject *p = lock->get_parent();

  // unstable and ambiguous auth?
  if (p->is_ambiguous_auth()) {
    dout(7) << "try_eval " << *lock << " ambiguousauth, waiting on " << *p << dendl;
    p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_Eval(this, p, lock->get_type()));
    return;
  }

  if (!p->is_auth()) {
    dout(7) << "try_eval " << *lock << " not auth for " << *p << dendl;
    return;
  }

  if (p->is_frozen()) {
    dout(7) << "try_eval " << *lock << " frozen, waiting on " << *p << dendl;
    p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, lock->get_type()));
    return;
  }

  /*
   * We could have a situation like:
   *
   * - mds A authpins item on mds B
   * - mds B starts to freeze tree containing item
   * - mds A tries wrlock_start on A, sends REQSCATTER to B
   * - mds B lock is unstable, sets scatter_wanted
   * - mds B lock stabilizes, calls try_eval.
   *
   * We can defer while freezing without causing a deadlock.  Honor
   * scatter_wanted flag here.  This will never get deferred by the
   * checks above due to the auth_pin held by the master.
   */
  if (lock->is_scatterlock()) {
    ScatterLock *slock = static_cast<ScatterLock *>(lock);
    if (slock->get_scatter_wanted() &&
	slock->get_state() != LOCK_MIX) {
      scatter_mix(slock, pneed_issue);
      if (!lock->is_stable())
	return;  // transition in progress; nothing further to do now
    } else if (slock->get_unscatter_wanted() &&
	slock->get_state() != LOCK_LOCK) {
      simple_lock(slock, pneed_issue);
      if (!lock->is_stable()) {
	return;
      }
    }
  }

  // dentry locks may be evaluated while freezing; others must wait
  if (lock->get_type() != CEPH_LOCK_DN && p->is_freezing()) {
    dout(7) << "try_eval " << *lock << " freezing, waiting on " << *p << dendl;
    p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, lock->get_type()));
    return;
  }

  eval(lock, pneed_issue);
}
1169
/**
 * Kick the unstable cap-related locks on an inode (file, auth, link,
 * xattr), typically after client cap revocations complete, then issue
 * caps or record the inode for later issuing.
 *
 * @param in inode whose unstable locks get re-evaluated
 * @param issue_set if non-NULL, collect inodes needing cap issue here
 *        instead of issuing immediately
 */
void Locker::eval_cap_gather(CInode *in, set<CInode*> *issue_set)
{
  bool need_issue = false;
  list<MDSInternalContextBase*> finishers;

  // kick locks now
  if (!in->filelock.is_stable())
    eval_gather(&in->filelock, false, &need_issue, &finishers);
  if (!in->authlock.is_stable())
    eval_gather(&in->authlock, false, &need_issue, &finishers);
  if (!in->linklock.is_stable())
    eval_gather(&in->linklock, false, &need_issue, &finishers);
  if (!in->xattrlock.is_stable())
    eval_gather(&in->xattrlock, false, &need_issue, &finishers);

  if (need_issue && in->is_head()) {
    if (issue_set)
      issue_set->insert(in);
    else
      issue_caps(in);
  }

  finish_contexts(g_ceph_context, finishers);
}
1194
/**
 * Kick the unstable scatter-type locks on an inode (file, nest,
 * dirfragtree), issuing caps afterwards if any evaluation said so.
 *
 * @param in inode whose scatter locks get re-evaluated
 */
void Locker::eval_scatter_gathers(CInode *in)
{
  bool need_issue = false;
  list<MDSInternalContextBase*> finishers;

  dout(10) << "eval_scatter_gathers " << *in << dendl;

  // kick locks now
  if (!in->filelock.is_stable())
    eval_gather(&in->filelock, false, &need_issue, &finishers);
  if (!in->nestlock.is_stable())
    eval_gather(&in->nestlock, false, &need_issue, &finishers);
  if (!in->dirfragtreelock.is_stable())
    eval_gather(&in->dirfragtreelock, false, &need_issue, &finishers);

  if (need_issue && in->is_head())
    issue_caps(in);

  finish_contexts(g_ceph_context, finishers);
}
1215
1216void Locker::eval(SimpleLock *lock, bool *need_issue)
1217{
1218 switch (lock->get_type()) {
1219 case CEPH_LOCK_IFILE:
1220 return file_eval(static_cast<ScatterLock*>(lock), need_issue);
1221 case CEPH_LOCK_IDFT:
1222 case CEPH_LOCK_INEST:
1223 return scatter_eval(static_cast<ScatterLock*>(lock), need_issue);
1224 default:
1225 return simple_eval(lock, need_issue);
1226 }
1227}
1228
1229
1230// ------------------
1231// rdlock
1232
/**
 * Try to nudge a lock toward a rdlock-able state.
 *
 * If we are auth and the lock is stable, start a transition to SYNC
 * (or XSYN for a loner-held file lock on a non-directory).  If we are
 * a replica, ask the auth to do it via LOCK_AC_REQRDLOCK.
 *
 * @param lock lock to kick
 * @param as_anon caller wants SYNC even where XSYN would be possible
 * @return true if we changed lock state locally (caller should re-check
 *         rdlock-ability); false otherwise
 */
bool Locker::_rdlock_kick(SimpleLock *lock, bool as_anon)
{
  // kick the lock
  if (lock->is_stable()) {
    if (lock->get_parent()->is_auth()) {
      if (lock->get_sm() == &sm_scatterlock) {
	// not until tempsync is fully implemented
	//if (lock->get_parent()->is_replicated())
	//scatter_tempsync((ScatterLock*)lock);
	//else
	simple_sync(lock);
      } else if (lock->get_sm() == &sm_filelock) {
	CInode *in = static_cast<CInode*>(lock->get_parent());
	if (lock->get_state() == LOCK_EXCL &&
	    in->get_target_loner() >= 0 &&
	    !in->is_dir() && !as_anon) // as_anon => caller wants SYNC, not XSYN
	  file_xsyn(lock);
	else
	  simple_sync(lock);
      } else
	simple_sync(lock);
      return true;
    } else {
      // request rdlock state change from auth
      mds_rank_t auth = lock->get_parent()->authority().first;
      if (!mds->is_cluster_degraded() ||
	  mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
	dout(10) << "requesting rdlock from auth on "
		 << *lock << " on " << *lock->get_parent() << dendl;
	mds->send_message_mds(new MLock(lock, LOCK_AC_REQRDLOCK, mds->get_nodeid()), auth);
      }
      return false;
    }
  }
  if (lock->get_type() == CEPH_LOCK_IFILE) {
    CInode *in = static_cast<CInode *>(lock->get_parent());
    if (in->state_test(CInode::STATE_RECOVERING)) {
      // reads are blocked on file recovery; bump this inode's priority
      mds->mdcache->recovery_queue.prioritize(in);
    }
  }

  return false;
}
1276
1277bool Locker::rdlock_try(SimpleLock *lock, client_t client, MDSInternalContextBase *con)
1278{
1279 dout(7) << "rdlock_try on " << *lock << " on " << *lock->get_parent() << dendl;
1280
1281 // can read? grab ref.
1282 if (lock->can_rdlock(client))
1283 return true;
1284
1285 _rdlock_kick(lock, false);
1286
1287 if (lock->can_rdlock(client))
1288 return true;
1289
1290 // wait!
1291 if (con) {
1292 dout(7) << "rdlock_try waiting on " << *lock << " on " << *lock->get_parent() << dendl;
1293 lock->add_waiter(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_RD, con);
1294 }
1295 return false;
1296}
1297
1298bool Locker::rdlock_start(SimpleLock *lock, MDRequestRef& mut, bool as_anon)
1299{
1300 dout(7) << "rdlock_start on " << *lock << " on " << *lock->get_parent() << dendl;
1301
1302 // client may be allowed to rdlock the same item it has xlocked.
1303 // UNLESS someone passes in as_anon, or we're reading snapped version here.
1304 if (mut->snapid != CEPH_NOSNAP)
1305 as_anon = true;
1306 client_t client = as_anon ? -1 : mut->get_client();
1307
1308 CInode *in = 0;
1309 if (lock->get_type() != CEPH_LOCK_DN)
1310 in = static_cast<CInode *>(lock->get_parent());
1311
1312 /*
1313 if (!lock->get_parent()->is_auth() &&
1314 lock->fw_rdlock_to_auth()) {
1315 mdcache->request_forward(mut, lock->get_parent()->authority().first);
1316 return false;
1317 }
1318 */
1319
1320 while (1) {
1321 // can read? grab ref.
1322 if (lock->can_rdlock(client)) {
1323 lock->get_rdlock();
1324 mut->rdlocks.insert(lock);
1325 mut->locks.insert(lock);
1326 return true;
1327 }
1328
1329 // hmm, wait a second.
1330 if (in && !in->is_head() && in->is_auth() &&
1331 lock->get_state() == LOCK_SNAP_SYNC) {
1332 // okay, we actually need to kick the head's lock to get ourselves synced up.
1333 CInode *head = mdcache->get_inode(in->ino());
1334 assert(head);
c07f9fc5
FG
1335 SimpleLock *hlock = head->get_lock(CEPH_LOCK_IFILE);
1336 if (hlock->get_state() == LOCK_SYNC)
1337 hlock = head->get_lock(lock->get_type());
1338
7c673cae
FG
1339 if (hlock->get_state() != LOCK_SYNC) {
1340 dout(10) << "rdlock_start trying head inode " << *head << dendl;
c07f9fc5 1341 if (!rdlock_start(hlock, mut, true)) // ** as_anon, no rdlock on EXCL **
7c673cae
FG
1342 return false;
1343 // oh, check our lock again then
1344 }
1345 }
1346
1347 if (!_rdlock_kick(lock, as_anon))
1348 break;
1349 }
1350
1351 // wait!
1352 int wait_on;
1353 if (lock->get_parent()->is_auth() && lock->is_stable())
1354 wait_on = SimpleLock::WAIT_RD;
1355 else
1356 wait_on = SimpleLock::WAIT_STABLE; // REQRDLOCK is ignored if lock is unstable, so we need to retry.
1357 dout(7) << "rdlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl;
1358 lock->add_waiter(wait_on, new C_MDS_RetryRequest(mdcache, mut));
1359 nudge_log(lock);
1360 return false;
1361}
1362
1363void Locker::nudge_log(SimpleLock *lock)
1364{
1365 dout(10) << "nudge_log " << *lock << " on " << *lock->get_parent() << dendl;
1366 if (lock->get_parent()->is_auth() && lock->is_unstable_and_locked()) // as with xlockdone, or cap flush
1367 mds->mdlog->flush();
1368}
1369
/**
 * Release one rdlock reference.  When the last rdlock goes away, either
 * continue an in-progress gather (if the lock is unstable) or try a
 * fresh evaluation (if we are auth).
 *
 * @param lock lock being released
 * @param mut mutation to un-record the lock from (may be NULL)
 * @param pneed_issue set to true if caps need issuing afterwards
 */
void Locker::rdlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue)
{
  // drop ref
  lock->put_rdlock();
  if (mut) {
    mut->rdlocks.erase(lock);
    mut->locks.erase(lock);
  }

  dout(7) << "rdlock_finish on " << *lock << " on " << *lock->get_parent() << dendl;

  // last one?
  if (!lock->is_rdlocked()) {
    if (!lock->is_stable())
      eval_gather(lock, false, pneed_issue);
    else if (lock->get_parent()->is_auth())
      try_eval(lock, pneed_issue);
  }
}
1389
1390
1391bool Locker::can_rdlock_set(set<SimpleLock*>& locks)
1392{
1393 dout(10) << "can_rdlock_set " << locks << dendl;
1394 for (set<SimpleLock*>::iterator p = locks.begin(); p != locks.end(); ++p)
1395 if (!(*p)->can_rdlock(-1)) {
1396 dout(10) << "can_rdlock_set can't rdlock " << *p << " on " << *(*p)->get_parent() << dendl;
1397 return false;
1398 }
1399 return true;
1400}
1401
1402bool Locker::rdlock_try_set(set<SimpleLock*>& locks)
1403{
1404 dout(10) << "rdlock_try_set " << locks << dendl;
1405 for (set<SimpleLock*>::iterator p = locks.begin(); p != locks.end(); ++p)
1406 if (!rdlock_try(*p, -1, NULL)) {
1407 dout(10) << "rdlock_try_set can't rdlock " << *p << " on " << *(*p)->get_parent() << dendl;
1408 return false;
1409 }
1410 return true;
1411}
1412
1413void Locker::rdlock_take_set(set<SimpleLock*>& locks, MutationRef& mut)
1414{
1415 dout(10) << "rdlock_take_set " << locks << dendl;
1416 for (set<SimpleLock*>::iterator p = locks.begin(); p != locks.end(); ++p) {
1417 (*p)->get_rdlock();
1418 mut->rdlocks.insert(*p);
1419 mut->locks.insert(*p);
1420 }
1421}
1422
1423// ------------------
1424// wrlock
1425
1426void Locker::wrlock_force(SimpleLock *lock, MutationRef& mut)
1427{
1428 if (lock->get_type() == CEPH_LOCK_IVERSION ||
1429 lock->get_type() == CEPH_LOCK_DVERSION)
1430 return local_wrlock_grab(static_cast<LocalLock*>(lock), mut);
1431
1432 dout(7) << "wrlock_force on " << *lock
1433 << " on " << *lock->get_parent() << dendl;
1434 lock->get_wrlock(true);
1435 mut->wrlocks.insert(lock);
1436 mut->locks.insert(lock);
1437}
1438
/**
 * Acquire a wrlock for a request.  May transition the lock (to MIX for
 * scatter writes, or LOCK otherwise) on the auth, or send REQSCATTER to
 * the auth from a replica.
 *
 * @param lock lock to wrlock (version locks are delegated)
 * @param mut request taking the lock
 * @param nowait if true, never queue a waiter and avoid transitions that
 *        would require opening a new log entry (caller already has one)
 * @return true if the wrlock was taken, false otherwise
 */
bool Locker::wrlock_start(SimpleLock *lock, MDRequestRef& mut, bool nowait)
{
  if (lock->get_type() == CEPH_LOCK_IVERSION ||
      lock->get_type() == CEPH_LOCK_DVERSION)
    return local_wrlock_start(static_cast<LocalLock*>(lock), mut);

  dout(10) << "wrlock_start " << *lock << " on " << *lock->get_parent() << dendl;

  CInode *in = static_cast<CInode *>(lock->get_parent());
  client_t client = mut->get_client();
  // prefer the MIX (scattered) state when this inode has subtree bounds
  // or an explicit scatter request is pending
  bool want_scatter = !nowait && lock->get_parent()->is_auth() &&
		      (in->has_subtree_or_exporting_dirfrag() ||
		       static_cast<ScatterLock*>(lock)->get_scatter_wanted());

  while (1) {
    // wrlock?
    if (lock->can_wrlock(client) &&
	(!want_scatter || lock->get_state() == LOCK_MIX)) {
      lock->get_wrlock();
      mut->wrlocks.insert(lock);
      mut->locks.insert(lock);
      return true;
    }

    if (lock->get_type() == CEPH_LOCK_IFILE &&
	in->state_test(CInode::STATE_RECOVERING)) {
      mds->mdcache->recovery_queue.prioritize(in);
    }

    if (!lock->is_stable())
      break;

    if (in->is_auth()) {
      // don't do nested lock state change if we have dirty scatterdata and
      // may scatter_writebehind or start_scatter, because nowait==true implies
      // that the caller already has a log entry open!
      if (nowait && lock->is_dirty())
	return false;

      if (want_scatter)
	scatter_mix(static_cast<ScatterLock*>(lock));
      else
	simple_lock(lock);

      if (nowait && !lock->can_wrlock(client))
	return false;

    } else {
      // replica.
      // auth should be auth_pinned (see acquire_locks wrlock weird mustpin case).
      mds_rank_t auth = lock->get_parent()->authority().first;
      if (!mds->is_cluster_degraded() ||
	  mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
	dout(10) << "requesting scatter from auth on "
		 << *lock << " on " << *lock->get_parent() << dendl;
	mds->send_message_mds(new MLock(lock, LOCK_AC_REQSCATTER, mds->get_nodeid()), auth);
      }
      break;
    }
  }

  if (!nowait) {
    dout(7) << "wrlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl;
    lock->add_waiter(SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
    nudge_log(lock);
  }

  return false;
}
1508
/**
 * Release one wrlock reference; when the last one goes away, continue a
 * gather or re-evaluate the lock.  The mutation keeps the lock in its
 * generic 'locks' set while a remote wrlock on it is still held.
 *
 * @param lock lock being released (version locks are delegated)
 * @param mut mutation to un-record the lock from (may be NULL)
 * @param pneed_issue set to true if caps need issuing afterwards
 */
void Locker::wrlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue)
{
  if (lock->get_type() == CEPH_LOCK_IVERSION ||
      lock->get_type() == CEPH_LOCK_DVERSION)
    return local_wrlock_finish(static_cast<LocalLock*>(lock), mut);

  dout(7) << "wrlock_finish on " << *lock << " on " << *lock->get_parent() << dendl;
  lock->put_wrlock();
  if (mut) {
    mut->wrlocks.erase(lock);
    if (mut->remote_wrlocks.count(lock) == 0)
      mut->locks.erase(lock);
  }

  if (!lock->is_wrlocked()) {
    if (!lock->is_stable())
      eval_gather(lock, false, pneed_issue);
    else if (lock->get_parent()->is_auth())
      try_eval(lock, pneed_issue);
  }
}
1530
1531
1532// remote wrlock
1533
/**
 * Ask another (auth) MDS to take a wrlock on our behalf by sending an
 * OP_WRLOCK slave request.  If the target is not yet active, waits for
 * it and retries the whole request instead.
 *
 * @param lock lock to wrlock remotely
 * @param target rank of the MDS that should take the lock
 * @param mut request; records the in-flight slave request
 */
void Locker::remote_wrlock_start(SimpleLock *lock, mds_rank_t target, MDRequestRef& mut)
{
  dout(7) << "remote_wrlock_start mds." << target << " on " << *lock << " on " << *lock->get_parent() << dendl;

  // wait for active target
  if (mds->is_cluster_degraded() &&
      !mds->mdsmap->is_clientreplay_or_active_or_stopping(target)) {
    dout(7) << " mds." << target << " is not active" << dendl;
    if (mut->more()->waiting_on_slave.empty())
      mds->wait_for_active_peer(target, new C_MDS_RetryRequest(mdcache, mut));
    return;
  }

  // send lock request
  mut->start_locking(lock, target);
  mut->more()->slaves.insert(target);
  MMDSSlaveRequest *r = new MMDSSlaveRequest(mut->reqid, mut->attempt,
					     MMDSSlaveRequest::OP_WRLOCK);
  r->set_lock_type(lock->get_type());
  lock->get_parent()->set_object_info(r->get_object_info());
  mds->send_message_mds(r, target);

  assert(mut->more()->waiting_on_slave.count(target) == 0);
  mut->more()->waiting_on_slave.insert(target);
}
1559
/**
 * Release a wrlock held for us by another MDS: un-record it locally and
 * send OP_UNWRLOCK to the target (unless the target has not reached
 * rejoin, in which case it will clean up on its own).
 *
 * @param lock lock that was remotely wrlocked
 * @param target rank of the MDS holding the lock for us
 * @param mut mutation the lock was recorded in
 */
void Locker::remote_wrlock_finish(SimpleLock *lock, mds_rank_t target,
                                  MutationImpl *mut)
{
  // drop ref
  mut->remote_wrlocks.erase(lock);
  if (mut->wrlocks.count(lock) == 0)
    mut->locks.erase(lock);

  dout(7) << "remote_wrlock_finish releasing remote wrlock on mds." << target
	  << " " << *lock->get_parent() << dendl;
  if (!mds->is_cluster_degraded() ||
      mds->mdsmap->get_state(target) >= MDSMap::STATE_REJOIN) {
    MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt,
						      MMDSSlaveRequest::OP_UNWRLOCK);
    slavereq->set_lock_type(lock->get_type());
    lock->get_parent()->set_object_info(slavereq->get_object_info());
    mds->send_message_mds(slavereq, target);
  }
}
1579
1580
1581// ------------------
1582// xlock
1583
/**
 * Acquire an exclusive (x) lock for a request.
 *
 * On the auth, loop driving the lock toward an xlockable state
 * (LOCK/XLOCKDONE -> simple_xlock, otherwise simple_lock) until either
 * the xlock is taken or we must wait.  On a replica, forward the xlock
 * to the auth via an OP_XLOCK slave request.
 *
 * @param lock lock to xlock (version locks are delegated)
 * @param mut request taking the lock
 * @return true if the xlock was taken, false if we must wait
 */
bool Locker::xlock_start(SimpleLock *lock, MDRequestRef& mut)
{
  if (lock->get_type() == CEPH_LOCK_IVERSION ||
      lock->get_type() == CEPH_LOCK_DVERSION)
    return local_xlock_start(static_cast<LocalLock*>(lock), mut);

  dout(7) << "xlock_start on " << *lock << " on " << *lock->get_parent() << dendl;
  client_t client = mut->get_client();

  // auth?
  if (lock->get_parent()->is_auth()) {
    // auth
    while (1) {
      if (lock->can_xlock(client)) {
	lock->set_state(LOCK_XLOCK);
	lock->get_xlock(mut, client);
	mut->xlocks.insert(lock);
	mut->locks.insert(lock);
	mut->finish_locking(lock);
	return true;
      }

      if (lock->get_type() == CEPH_LOCK_IFILE) {
	CInode *in = static_cast<CInode*>(lock->get_parent());
	if (in->state_test(CInode::STATE_RECOVERING)) {
	  mds->mdcache->recovery_queue.prioritize(in);
	}
      }

      // give up unless the lock is stable, or it is XLOCKDONE held by
      // this same client with no one waiting on stability
      if (!lock->is_stable() && (lock->get_state() != LOCK_XLOCKDONE ||
				 lock->get_xlock_by_client() != client ||
				 lock->is_waiter_for(SimpleLock::WAIT_STABLE)))
	break;

      if (lock->get_state() == LOCK_LOCK || lock->get_state() == LOCK_XLOCKDONE) {
	mut->start_locking(lock);
	simple_xlock(lock);
      } else {
	simple_lock(lock);
      }
    }

    lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
    nudge_log(lock);
    return false;
  } else {
    // replica
    assert(lock->get_sm()->can_remote_xlock);
    assert(!mut->slave_request);

    // wait for single auth
    if (lock->get_parent()->is_ambiguous_auth()) {
      lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH,
				     new C_MDS_RetryRequest(mdcache, mut));
      return false;
    }

    // wait for active auth
    mds_rank_t auth = lock->get_parent()->authority().first;
    if (mds->is_cluster_degraded() &&
	!mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
      dout(7) << " mds." << auth << " is not active" << dendl;
      if (mut->more()->waiting_on_slave.empty())
	mds->wait_for_active_peer(auth, new C_MDS_RetryRequest(mdcache, mut));
      return false;
    }

    // send lock request
    mut->more()->slaves.insert(auth);
    mut->start_locking(lock, auth);
    MMDSSlaveRequest *r = new MMDSSlaveRequest(mut->reqid, mut->attempt,
					       MMDSSlaveRequest::OP_XLOCK);
    r->set_lock_type(lock->get_type());
    lock->get_parent()->set_object_info(r->get_object_info());
    mds->send_message_mds(r, auth);

    assert(mut->more()->waiting_on_slave.count(auth) == 0);
    mut->more()->waiting_on_slave.insert(auth);

    return false;
  }
}
1666
/**
 * Finish releasing an xlock on an (unstable) lock.  If nothing else
 * holds the lock and the target loner matches the xlocker, jump
 * straight to LOCK_EXCL; otherwise fall back to the normal gather path.
 *
 * @param lock lock whose last xlock was just dropped
 * @param xlocker client that held the xlock (-1 if none/unknown)
 * @param pneed_issue set to true if caps need issuing afterwards
 */
void Locker::_finish_xlock(SimpleLock *lock, client_t xlocker, bool *pneed_issue)
{
  assert(!lock->is_stable());
  if (lock->get_num_rdlocks() == 0 &&
      lock->get_num_wrlocks() == 0 &&
      lock->get_num_client_lease() == 0 &&
      lock->get_state() != LOCK_XLOCKSNAP &&
      lock->get_type() != CEPH_LOCK_DN) {
    CInode *in = static_cast<CInode*>(lock->get_parent());
    client_t loner = in->get_target_loner();
    if (loner >= 0 && (xlocker < 0 || xlocker == loner)) {
      // fast path: hand the lock straight to the loner client
      lock->set_state(LOCK_EXCL);
      lock->get_parent()->auth_unpin(lock);
      lock->finish_waiters(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD);
      if (lock->get_cap_shift())
	*pneed_issue = true;
      if (lock->get_parent()->is_auth() &&
	  lock->is_stable())
	try_eval(lock, pneed_issue);
      return;
    }
  }
  // the xlocker may have CEPH_CAP_GSHARED, need to revoke it if next state is LOCK_LOCK
  eval_gather(lock, lock->get_state() != LOCK_XLOCKSNAP, pneed_issue);
}
1692
/**
 * Release an xlock reference taken by a mutation.  For a remote xlock,
 * notify the auth (OP_UNXLOCK) and wake local waiters; on the auth, run
 * _finish_xlock once the last xlock reference is gone.
 *
 * @param lock lock being released (version locks are delegated)
 * @param mut mutation that held the xlock (must be non-NULL)
 * @param pneed_issue set to true if caps need issuing afterwards; if
 *        NULL, caps are issued directly here
 */
void Locker::xlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue)
{
  if (lock->get_type() == CEPH_LOCK_IVERSION ||
      lock->get_type() == CEPH_LOCK_DVERSION)
    return local_xlock_finish(static_cast<LocalLock*>(lock), mut);

  dout(10) << "xlock_finish on " << *lock << " " << *lock->get_parent() << dendl;

  client_t xlocker = lock->get_xlock_by_client();

  // drop ref
  lock->put_xlock();
  assert(mut);
  mut->xlocks.erase(lock);
  mut->locks.erase(lock);

  bool do_issue = false;

  // remote xlock?
  if (!lock->get_parent()->is_auth()) {
    assert(lock->get_sm()->can_remote_xlock);

    // tell auth
    dout(7) << "xlock_finish releasing remote xlock on " << *lock->get_parent() << dendl;
    mds_rank_t auth = lock->get_parent()->authority().first;
    if (!mds->is_cluster_degraded() ||
	mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) {
      MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt,
							MMDSSlaveRequest::OP_UNXLOCK);
      slavereq->set_lock_type(lock->get_type());
      lock->get_parent()->set_object_info(slavereq->get_object_info());
      mds->send_message_mds(slavereq, auth);
    }
    // others waiting?
    lock->finish_waiters(SimpleLock::WAIT_STABLE |
			 SimpleLock::WAIT_WR |
			 SimpleLock::WAIT_RD, 0);
  } else {
    if (lock->get_num_xlocks() == 0) {
      if (lock->get_state() == LOCK_LOCK_XLOCK)
	lock->set_state(LOCK_XLOCKDONE);
      _finish_xlock(lock, xlocker, &do_issue);
    }
  }

  if (do_issue) {
    CInode *in = static_cast<CInode*>(lock->get_parent());
    if (in->is_head()) {
      if (pneed_issue)
	*pneed_issue = true;
      else
	issue_caps(in);
    }
  }
}
1748
/**
 * Drop an xlock as part of exporting its parent inode to another MDS:
 * release the reference, un-record it from the mutation, and leave the
 * lock in the stable LOCK state for the importer.
 *
 * @param lock xlocked lock on the inode being exported
 * @param mut mutation holding the xlock
 */
void Locker::xlock_export(SimpleLock *lock, MutationImpl *mut)
{
  dout(10) << "xlock_export on " << *lock << " " << *lock->get_parent() << dendl;

  lock->put_xlock();
  mut->xlocks.erase(lock);
  mut->locks.erase(lock);

  MDSCacheObject *p = lock->get_parent();
  assert(p->state_test(CInode::STATE_AMBIGUOUSAUTH));  // we are exporting this (inode)

  // unstable locks carry an auth_pin (see eval_gather); release it now
  if (!lock->is_stable())
    lock->get_parent()->auth_unpin(lock);

  lock->set_state(LOCK_LOCK);
}
1765
/**
 * Counterpart of xlock_export on the importing MDS: take the auth_pin
 * that an xlocked lock is expected to hold on its parent.
 *
 * @param lock imported xlocked lock
 */
void Locker::xlock_import(SimpleLock *lock)
{
  dout(10) << "xlock_import on " << *lock << " " << *lock->get_parent() << dendl;
  lock->get_parent()->auth_pin(lock);
}
1771
1772
1773
1774// file i/o -----------------------------------------
1775
1776version_t Locker::issue_file_data_version(CInode *in)
1777{
1778 dout(7) << "issue_file_data_version on " << *in << dendl;
1779 return in->inode.file_data_version;
1780}
1781
// Journal-commit callback for file/cap updates: once the log entry is
// safe, applies the projected inode and finishes cap bookkeeping via
// Locker::file_update_finish().  Pins the inode across the wait.
class C_Locker_FileUpdate_finish : public LockerLogContext {
  CInode *in;
  MutationRef mut;
  bool share_max;    // afterwards, share new max_size with clients
  bool need_issue;   // afterwards, re-issue caps to @client
  client_t client;
  MClientCaps *ack;  // cap ack to send to @client (may be NULL)
public:
  C_Locker_FileUpdate_finish(Locker *l, CInode *i, MutationRef& m,
				bool sm=false, bool ni=false, client_t c=-1,
				MClientCaps *ac = 0)
    : LockerLogContext(l), in(i), mut(m), share_max(sm), need_issue(ni),
      client(c), ack(ac) {
    in->get(CInode::PIN_PTRWAITER);
  }
  void finish(int r) override {
    locker->file_update_finish(in, mut, share_max, need_issue, client, ack);
    in->put(CInode::PIN_PTRWAITER);
  }
};
1802
/**
 * Complete a journaled file/cap update: apply the projected inode, send
 * the cap ack (if any), drop the mutation's locks, and settle snap
 * writeback accounting or re-issue caps as appropriate.
 *
 * @param in inode that was updated
 * @param mut mutation whose log entry just committed
 * @param share_max if true, consider sharing a new max_size
 * @param issue_client_cap if true, re-issue caps to @client if it still
 *        wants more than is pending
 * @param client client that triggered the update
 * @param ack cap ack message to send to @client (may be NULL)
 */
void Locker::file_update_finish(CInode *in, MutationRef& mut, bool share_max, bool issue_client_cap,
				client_t client, MClientCaps *ack)
{
  dout(10) << "file_update_finish on " << *in << dendl;
  in->pop_and_dirty_projected_inode(mut->ls);

  mut->apply();

  if (ack) {
    Session *session = mds->get_session(client);
    if (session) {
      // "oldest flush tid" > 0 means client uses unique TID for each flush
      if (ack->get_oldest_flush_tid() > 0)
	session->add_completed_flush(ack->get_client_tid());
      mds->send_message_client_counted(ack, session);
    } else {
      dout(10) << " no session for client." << client << " " << *ack << dendl;
      ack->put();
    }
  }

  set<CInode*> need_issue;
  drop_locks(mut.get(), &need_issue);

  if (!in->is_head() && !in->client_snap_caps.empty()) {
    dout(10) << " client_snap_caps " << in->client_snap_caps << dendl;
    // check for snap writeback completion
    bool gather = false;
    compact_map<int,set<client_t> >::iterator p = in->client_snap_caps.begin();
    while (p != in->client_snap_caps.end()) {
      SimpleLock *lock = in->get_lock(p->first);
      assert(lock);
      dout(10) << " completing client_snap_caps for " << ccap_string(p->first)
	       << " lock " << *lock << " on " << *in << dendl;
      lock->put_wrlock();  // drop the wrlock held for this snap flush

      p->second.erase(client);
      if (p->second.empty()) {
	gather = true;
	in->client_snap_caps.erase(p++);
      } else
	++p;
    }
    if (gather) {
      if (in->client_snap_caps.empty())
	in->item_open_file.remove_myself();
      eval_cap_gather(in, &need_issue);
    }
  } else {
    if (issue_client_cap && need_issue.count(in) == 0) {
      Capability *cap = in->get_client_cap(client);
      if (cap && (cap->wanted() & ~cap->pending()))
	issue_caps(in, cap);
    }

    if (share_max && in->is_auth() &&
	(in->filelock.gcaps_allowed(CAP_LONER) & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)))
      share_inode_max_size(in);
  }
  issue_caps_set(need_issue);

  // auth unpin after issuing caps
  mut->cleanup();
}
1867
1868Capability* Locker::issue_new_caps(CInode *in,
1869 int mode,
1870 Session *session,
1871 SnapRealm *realm,
1872 bool is_replay)
1873{
1874 dout(7) << "issue_new_caps for mode " << mode << " on " << *in << dendl;
1875 bool is_new;
1876
1877 // if replay, try to reconnect cap, and otherwise do nothing.
1878 if (is_replay) {
1879 mds->mdcache->try_reconnect_cap(in, session);
1880 return 0;
1881 }
1882
1883 // my needs
1884 assert(session->info.inst.name.is_client());
31f18b77 1885 client_t my_client = session->info.inst.name.num();
7c673cae
FG
1886 int my_want = ceph_caps_for_mode(mode);
1887
1888 // register a capability
1889 Capability *cap = in->get_client_cap(my_client);
1890 if (!cap) {
1891 // new cap
1892 cap = in->add_client_cap(my_client, session, realm);
1893 cap->set_wanted(my_want);
1894 cap->mark_new();
1895 cap->inc_suppress(); // suppress file cap messages for new cap (we'll bundle with the open() reply)
1896 is_new = true;
1897 } else {
1898 is_new = false;
1899 // make sure it wants sufficient caps
1900 if (my_want & ~cap->wanted()) {
1901 // augment wanted caps for this client
1902 cap->set_wanted(cap->wanted() | my_want);
1903 }
1904 }
1905
1906 if (in->is_auth()) {
1907 // [auth] twiddle mode?
1908 eval(in, CEPH_CAP_LOCKS);
1909
1910 if (_need_flush_mdlog(in, my_want))
1911 mds->mdlog->flush();
1912
1913 } else {
1914 // [replica] tell auth about any new caps wanted
1915 request_inode_file_caps(in);
1916 }
1917
1918 // issue caps (pot. incl new one)
1919 //issue_caps(in); // note: _eval above may have done this already...
1920
1921 // re-issue whatever we can
1922 //cap->issue(cap->pending());
1923
1924 if (is_new)
1925 cap->dec_suppress();
1926
1927 return cap;
1928}
1929
1930
1931void Locker::issue_caps_set(set<CInode*>& inset)
1932{
1933 for (set<CInode*>::iterator p = inset.begin(); p != inset.end(); ++p)
1934 issue_caps(*p);
1935}
1936
/*
 * Issue (or revoke) client capabilities on @in so that each client's
 * pending caps fall within what the current lock states allow.
 *
 * @param in        head inode whose caps are being (re)issued
 * @param only_cap  if non-NULL, restrict processing to this one client's cap
 * @return true if nothing was (re)issued, false if at least one cap
 *         message/adjustment was generated
 */
bool Locker::issue_caps(CInode *in, Capability *only_cap)
{
  // allowed caps are determined by the lock mode.
  int all_allowed = in->get_caps_allowed_by_type(CAP_ANY);
  int loner_allowed = in->get_caps_allowed_by_type(CAP_LONER);
  int xlocker_allowed = in->get_caps_allowed_by_type(CAP_XLOCKER);

  client_t loner = in->get_loner();
  if (loner >= 0) {
    dout(7) << "issue_caps loner client." << loner
	    << " allowed=" << ccap_string(loner_allowed)
	    << ", xlocker allowed=" << ccap_string(xlocker_allowed)
	    << ", others allowed=" << ccap_string(all_allowed)
	    << " on " << *in << dendl;
  } else {
    dout(7) << "issue_caps allowed=" << ccap_string(all_allowed)
	    << ", xlocker allowed=" << ccap_string(xlocker_allowed)
	    << " on " << *in << dendl;
  }

  // caps are only issued on head (non-snapped) inodes
  assert(in->is_head());

  // count conflicts with
  int nissued = 0;

  // client caps: either walk all caps, or seek directly to the one
  // requested via only_cap.
  map<client_t, Capability*>::iterator it;
  if (only_cap)
    it = in->client_caps.find(only_cap->get_client());
  else
    it = in->client_caps.begin();
  for (; it != in->client_caps.end(); ++it) {
    Capability *cap = it->second;
    // stale caps are handled by revoke_stale_caps(), not here
    if (cap->is_stale())
      continue;

    // do not issue _new_ bits when size|mtime is projected
    int allowed;
    if (loner == it->first)
      allowed = loner_allowed;
    else
      allowed = all_allowed;

    // add in any xlocker-only caps (for locks this client is the xlocker for)
    allowed |= xlocker_allowed & in->get_xlocker_mask(it->first);

    // clients without the inline-data feature must not get FILE_RD/WR while
    // the inode still carries inline data they could not interpret.
    Session *session = mds->get_session(it->first);
    if (in->inode.inline_data.version != CEPH_INLINE_NONE &&
	!(session && session->connection &&
	  session->connection->has_feature(CEPH_FEATURE_MDS_INLINE_DATA)))
      allowed &= ~(CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR);

    int pending = cap->pending();
    int wanted = cap->wanted();

    dout(20) << " client." << it->first
	     << " pending " << ccap_string(pending)
	     << " allowed " << ccap_string(allowed)
	     << " wanted " << ccap_string(wanted)
	     << dendl;

    if (!(pending & ~allowed)) {
      // skip if suppress or new, and not revocation
      if (cap->is_new() || cap->is_suppress()) {
	dout(20) << " !revoke and new|suppressed, skipping client." << it->first << dendl;
	continue;
      }
    }

    // notify clients about deleted inode, to make sure they release caps ASAP.
    if (in->inode.nlink == 0)
      wanted |= CEPH_CAP_LINK_SHARED;

    // are there caps that the client _wants_ and can have, but aren't pending?
    // or do we need to revoke?
    if (((wanted & allowed) & ~pending) ||  // missing wanted+allowed caps
	(pending & ~allowed)) {             // need to revoke ~allowed caps.
      // issue
      nissued++;

      // include caps that clients generally like, while we're at it.
      int likes = in->get_caps_liked();
      int before = pending;
      long seq;
      if (pending & ~allowed)
	seq = cap->issue((wanted|likes) & allowed & pending);  // if revoking, don't issue anything new.
      else
	seq = cap->issue((wanted|likes) & allowed);
      int after = cap->pending();

      if (cap->is_new()) {
	// haven't send caps to client yet
	if (before & ~after)
	  cap->confirm_receipt(seq, after);
      } else {
        dout(7) << " sending MClientCaps to client." << it->first
		<< " seq " << cap->get_last_seq()
		<< " new pending " << ccap_string(after) << " was " << ccap_string(before)
		<< dendl;

	// track in-flight revocations so we can warn about unresponsive clients
	int op = (before & ~after) ? CEPH_CAP_OP_REVOKE : CEPH_CAP_OP_GRANT;
	if (op == CEPH_CAP_OP_REVOKE) {
		revoking_caps.push_back(&cap->item_revoking_caps);
		revoking_caps_by_client[cap->get_client()].push_back(&cap->item_client_revoking_caps);
		cap->set_last_revoke_stamp(ceph_clock_now());
		cap->reset_num_revoke_warnings();
	}

	MClientCaps *m = new MClientCaps(op, in->ino(),
					 in->find_snaprealm()->inode->ino(),
					 cap->get_cap_id(), cap->get_last_seq(),
					 after, wanted, 0,
					 cap->get_mseq(),
					 mds->get_osd_epoch_barrier());
	in->encode_cap_message(m, cap);

	mds->send_message_client_counted(m, it->first);
      }
    }

    // only_cap mode: we seeked to one entry; stop after it.
    if (only_cap)
      break;
  }

  return (nissued == 0);  // true if no re-issued, no callbacks
}
2063
2064void Locker::issue_truncate(CInode *in)
2065{
2066 dout(7) << "issue_truncate on " << *in << dendl;
2067
2068 for (map<client_t, Capability*>::iterator it = in->client_caps.begin();
2069 it != in->client_caps.end();
2070 ++it) {
2071 Capability *cap = it->second;
2072 MClientCaps *m = new MClientCaps(CEPH_CAP_OP_TRUNC,
2073 in->ino(),
2074 in->find_snaprealm()->inode->ino(),
2075 cap->get_cap_id(), cap->get_last_seq(),
2076 cap->pending(), cap->wanted(), 0,
2077 cap->get_mseq(),
2078 mds->get_osd_epoch_barrier());
2079 in->encode_cap_message(m, cap);
2080 mds->send_message_client_counted(m, it->first);
2081 }
2082
2083 // should we increase max_size?
2084 if (in->is_auth() && in->is_file())
2085 check_inode_max_size(in);
2086}
2087
2088
2089void Locker::revoke_stale_caps(Capability *cap)
2090{
2091 CInode *in = cap->get_inode();
2092 if (in->state_test(CInode::STATE_EXPORTINGCAPS)) {
2093 // if export succeeds, the cap will be removed. if export fails, we need to
2094 // revoke the cap if it's still stale.
2095 in->state_set(CInode::STATE_EVALSTALECAPS);
2096 return;
2097 }
2098
2099 int issued = cap->issued();
2100 if (issued & ~CEPH_CAP_PIN) {
2101 dout(10) << " revoking " << ccap_string(issued) << " on " << *in << dendl;
2102 cap->revoke();
2103
2104 if (in->is_auth() &&
2105 in->inode.client_ranges.count(cap->get_client()))
2106 in->state_set(CInode::STATE_NEEDSRECOVER);
2107
2108 if (!in->filelock.is_stable()) eval_gather(&in->filelock);
2109 if (!in->linklock.is_stable()) eval_gather(&in->linklock);
2110 if (!in->authlock.is_stable()) eval_gather(&in->authlock);
2111 if (!in->xattrlock.is_stable()) eval_gather(&in->xattrlock);
2112
2113 if (in->is_auth()) {
2114 try_eval(in, CEPH_CAP_LOCKS);
2115 } else {
2116 request_inode_file_caps(in);
2117 }
2118 }
2119}
2120
2121void Locker::revoke_stale_caps(Session *session)
2122{
2123 dout(10) << "revoke_stale_caps for " << session->info.inst.name << dendl;
2124
2125 for (xlist<Capability*>::iterator p = session->caps.begin(); !p.end(); ++p) {
2126 Capability *cap = *p;
2127 cap->mark_stale();
2128 revoke_stale_caps(cap);
2129 }
2130}
2131
2132void Locker::resume_stale_caps(Session *session)
2133{
2134 dout(10) << "resume_stale_caps for " << session->info.inst.name << dendl;
2135
2136 for (xlist<Capability*>::iterator p = session->caps.begin(); !p.end(); ++p) {
2137 Capability *cap = *p;
2138 CInode *in = cap->get_inode();
2139 assert(in->is_head());
2140 if (cap->is_stale()) {
2141 dout(10) << " clearing stale flag on " << *in << dendl;
2142 cap->clear_stale();
2143
2144 if (in->state_test(CInode::STATE_EXPORTINGCAPS)) {
2145 // if export succeeds, the cap will be removed. if export fails,
2146 // we need to re-issue the cap if it's not stale.
2147 in->state_set(CInode::STATE_EVALSTALECAPS);
2148 continue;
2149 }
2150
2151 if (!in->is_auth() || !eval(in, CEPH_CAP_LOCKS))
2152 issue_caps(in, cap);
2153 }
2154 }
2155}
2156
2157void Locker::remove_stale_leases(Session *session)
2158{
2159 dout(10) << "remove_stale_leases for " << session->info.inst.name << dendl;
2160 xlist<ClientLease*>::iterator p = session->leases.begin();
2161 while (!p.end()) {
2162 ClientLease *l = *p;
2163 ++p;
2164 CDentry *parent = static_cast<CDentry*>(l->parent);
2165 dout(15) << " removing lease on " << *parent << dendl;
2166 parent->remove_client_lease(l, this);
2167 }
2168}
2169
2170
// Completion context: retry request_inode_file_caps() once the inode's
// authority becomes unambiguous / the auth peer becomes active.
// Pins the inode so the pointer stays valid until finish() runs.
class C_MDL_RequestInodeFileCaps : public LockerContext {
  CInode *in;
public:
  C_MDL_RequestInodeFileCaps(Locker *l, CInode *i) : LockerContext(l), in(i) {
    in->get(CInode::PIN_PTRWAITER);
  }
  void finish(int r) override {
    // only retry if we are still a replica; auth handles caps directly
    if (!in->is_auth())
      locker->request_inode_file_caps(in);
    in->put(CInode::PIN_PTRWAITER);
  }
};
2183
2184void Locker::request_inode_file_caps(CInode *in)
2185{
2186 assert(!in->is_auth());
2187
2188 int wanted = in->get_caps_wanted() & ~CEPH_CAP_PIN;
2189 if (wanted != in->replica_caps_wanted) {
2190 // wait for single auth
2191 if (in->is_ambiguous_auth()) {
2192 in->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH,
2193 new C_MDL_RequestInodeFileCaps(this, in));
2194 return;
2195 }
2196
2197 mds_rank_t auth = in->authority().first;
2198 if (mds->is_cluster_degraded() &&
2199 mds->mdsmap->get_state(auth) == MDSMap::STATE_REJOIN) {
2200 mds->wait_for_active_peer(auth, new C_MDL_RequestInodeFileCaps(this, in));
2201 return;
2202 }
2203
2204 dout(7) << "request_inode_file_caps " << ccap_string(wanted)
2205 << " was " << ccap_string(in->replica_caps_wanted)
2206 << " on " << *in << " to mds." << auth << dendl;
2207
2208 in->replica_caps_wanted = wanted;
2209
2210 if (!mds->is_cluster_degraded() ||
2211 mds->mdsmap->is_clientreplay_or_active_or_stopping(auth))
2212 mds->send_message_mds(new MInodeFileCaps(in->ino(), in->replica_caps_wanted),
2213 auth);
2214 }
2215}
2216
2217/* This function DOES put the passed message before returning */
2218void Locker::handle_inode_file_caps(MInodeFileCaps *m)
2219{
2220 // nobody should be talking to us during recovery.
2221 assert(mds->is_clientreplay() || mds->is_active() || mds->is_stopping());
2222
2223 // ok
2224 CInode *in = mdcache->get_inode(m->get_ino());
2225 mds_rank_t from = mds_rank_t(m->get_source().num());
2226
2227 assert(in);
2228 assert(in->is_auth());
2229
2230 dout(7) << "handle_inode_file_caps replica mds." << from << " wants caps " << ccap_string(m->get_caps()) << " on " << *in << dendl;
2231
2232 if (m->get_caps())
2233 in->mds_caps_wanted[from] = m->get_caps();
2234 else
2235 in->mds_caps_wanted.erase(from);
2236
2237 try_eval(in, CEPH_CAP_LOCKS);
2238 m->put();
2239}
2240
2241
// Completion context: re-run check_inode_max_size() with the same requested
// size/max_size/mtime once the inode unfreezes or its filelock stabilizes.
// Pins the inode so the pointer stays valid until finish() runs.
class C_MDL_CheckMaxSize : public LockerContext {
  CInode *in;
  uint64_t new_max_size;  // requested max_size (0 = derive from size)
  uint64_t newsize;       // requested file size (0 = no size update)
  utime_t mtime;          // mtime to apply with the size update

public:
  C_MDL_CheckMaxSize(Locker *l, CInode *i, uint64_t _new_max_size,
                     uint64_t _newsize, utime_t _mtime) :
    LockerContext(l), in(i),
    new_max_size(_new_max_size), newsize(_newsize), mtime(_mtime)
  {
    in->get(CInode::PIN_PTRWAITER);
  }
  void finish(int r) override {
    // only meaningful on the auth MDS; authority may have changed meanwhile
    if (in->is_auth())
      locker->check_inode_max_size(in, false, new_max_size, newsize, mtime);
    in->put(CInode::PIN_PTRWAITER);
  }
};
2262
31f18b77
FG
// Compute a new client writeable-range max_size for a file of @size bytes:
// roughly double the current size, optionally capped at
// size + mds_client_writeable_range_max_inc_objs objects, then rounded up
// to a whole layout object.
// NOTE(review): (size + 1) << 1 could overflow for sizes near 2^63 —
// presumably unreachable in practice; confirm.
uint64_t Locker::calc_new_max_size(inode_t *pi, uint64_t size)
{
  uint64_t new_max = (size + 1) << 1;  // double, and make sure size=0 grows
  uint64_t max_inc = g_conf->mds_client_writeable_range_max_inc_objs;
  if (max_inc > 0) {
    // cap the growth at max_inc layout objects beyond the current size
    max_inc *= pi->get_layout_size_increment();
    new_max = MIN(new_max, size + max_inc);
  }
  // align up to the layout object size
  return ROUND_UP_TO(new_max, pi->get_layout_size_increment());
}
7c673cae
FG
2273
// Recompute the per-client writeable ranges for @in assuming a file size of
// @size.  Clients holding or wanting FILE_WR|FILE_BUFFER get a range
// [0, max_size]; ranges only ever grow here (clients with no WR|BUFFER caps
// are simply omitted from *new_ranges, i.e. shrunk to nothing).
// *max_increased is set when any client's upper bound grows or a new range
// appears.
void Locker::calc_new_client_ranges(CInode *in, uint64_t size,
				    map<client_t,client_writeable_range_t> *new_ranges,
				    bool *max_increased)
{
  inode_t *latest = in->get_projected_inode();
  uint64_t ms;
  if(latest->has_layout()) {
    ms = calc_new_max_size(latest, size);
  } else {
    // Layout-less directories like ~mds0/, have zero size
    ms = 0;
  }

  // increase ranges as appropriate.
  // shrink to 0 if no WR|BUFFER caps issued.
  for (map<client_t,Capability*>::iterator p = in->client_caps.begin();
       p != in->client_caps.end();
       ++p) {
    if ((p->second->issued() | p->second->wanted()) & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER)) {
      client_writeable_range_t& nr = (*new_ranges)[p->first];
      nr.range.first = 0;
      if (latest->client_ranges.count(p->first)) {
	// existing range: never shrink the upper bound, keep follows
	client_writeable_range_t& oldr = latest->client_ranges[p->first];
	if (ms > oldr.range.last)
	  *max_increased = true;
	nr.range.last = MAX(ms, oldr.range.last);
	nr.follows = oldr.follows;
      } else {
	// brand-new range for this client
	*max_increased = true;
	nr.range.last = ms;
	nr.follows = in->first - 1;
      }
    }
  }
}
2309
/*
 * Update a file inode's size/mtime and/or its per-client writeable ranges
 * (max_size), journaling the change.  Called on the auth MDS.
 *
 * @param force_wrlock   take the wrlock even if can_wrlock() says no
 * @param new_max_size   requested max_size hint (0 = derive from size)
 * @param new_size       new file size (0 = no size update)
 * @param new_mtime      mtime to apply alongside a size update
 * @return true if an update was journaled, false if it was a no-op or had
 *         to be deferred (frozen inode / unlockable filelock)
 */
bool Locker::check_inode_max_size(CInode *in, bool force_wrlock,
				  uint64_t new_max_size, uint64_t new_size,
				  utime_t new_mtime)
{
  assert(in->is_auth());
  assert(in->is_file());

  inode_t *latest = in->get_projected_inode();
  map<client_t, client_writeable_range_t> new_ranges;
  uint64_t size = latest->size;
  bool update_size = new_size > 0;
  bool update_max = false;
  bool max_increased = false;

  if (update_size) {
    // sizes/mtimes only move forward; skip if nothing actually changes
    new_size = size = MAX(size, new_size);
    new_mtime = MAX(new_mtime, latest->mtime);
    if (latest->size == new_size && latest->mtime == new_mtime)
      update_size = false;
  }

  calc_new_client_ranges(in, max(new_max_size, size), &new_ranges, &max_increased);

  if (max_increased || latest->client_ranges != new_ranges)
    update_max = true;

  if (!update_size && !update_max) {
    dout(20) << "check_inode_max_size no-op on " << *in << dendl;
    return false;
  }

  dout(10) << "check_inode_max_size new_ranges " << new_ranges
	   << " update_size " << update_size
	   << " on " << *in << dendl;

  if (in->is_frozen()) {
    // cannot project/journal now; retry with the same args after unfreeze
    dout(10) << "check_inode_max_size frozen, waiting on " << *in << dendl;
    C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in,
                                                     new_max_size,
                                                     new_size,
                                                     new_mtime);
    in->add_waiter(CInode::WAIT_UNFREEZE, cms);
    return false;
  }
  if (!force_wrlock && !in->filelock.can_wrlock(in->get_loner())) {
    // lock?
    if (in->filelock.is_stable()) {
      // nudge the filelock toward a wrlockable state
      if (in->get_target_loner() >= 0)
	file_excl(&in->filelock);
      else
	simple_lock(&in->filelock);
    }
    if (!in->filelock.can_wrlock(in->get_loner())) {
      // try again later
      C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in,
                                                       new_max_size,
                                                       new_size,
                                                       new_mtime);

      in->filelock.add_waiter(SimpleLock::WAIT_STABLE, cms);
      dout(10) << "check_inode_max_size can't wrlock, waiting on " << *in << dendl;
      return false;
    }
  }

  MutationRef mut(new MutationImpl());
  mut->ls = mds->mdlog->get_current_segment();

  inode_t *pi = in->project_inode();
  pi->version = in->pre_dirty();

  if (update_max) {
    dout(10) << "check_inode_max_size client_ranges " << pi->client_ranges << " -> " << new_ranges << dendl;
    pi->client_ranges = new_ranges;
  }

  if (update_size) {
    dout(10) << "check_inode_max_size size " << pi->size << " -> " << new_size << dendl;
    pi->size = new_size;
    pi->rstat.rbytes = new_size;
    dout(10) << "check_inode_max_size mtime " << pi->mtime << " -> " << new_mtime << dendl;
    pi->mtime = new_mtime;
  }

  // use EOpen if the file is still open; otherwise, use EUpdate.
  // this is just an optimization to push open files forward into
  // newer log segments.
  LogEvent *le;
  EMetaBlob *metablob;
  if (in->is_any_caps_wanted() && in->last == CEPH_NOSNAP) {
    EOpen *eo = new EOpen(mds->mdlog);
    eo->add_ino(in->ino());
    metablob = &eo->metablob;
    le = eo;
    mut->ls->open_files.push_back(&in->item_open_file);
  } else {
    EUpdate *eu = new EUpdate(mds->mdlog, "check_inode_max_size");
    metablob = &eu->metablob;
    le = eu;
  }
  mds->mdlog->start_entry(le);
  if (update_size) {  // FIXME if/when we do max_size nested accounting
    mdcache->predirty_journal_parents(mut, metablob, in, 0, PREDIRTY_PRIMARY);
    // no cow, here!
    CDentry *parent = in->get_projected_parent_dn();
    metablob->add_primary_dentry(parent, in, true);
  } else {
    metablob->add_dir_context(in->get_projected_parent_dn()->get_dir());
    mdcache->journal_dirty_inode(mut.get(), metablob, in);
  }
  mds->mdlog->submit_entry(le,
          new C_Locker_FileUpdate_finish(this, in, mut, true));
  wrlock_force(&in->filelock, mut);  // wrlock for duration of journal
  mut->auth_pin(in);

  // make max_size _increase_ timely
  if (max_increased)
    mds->mdlog->flush();

  return true;
}
2431
2432
2433void Locker::share_inode_max_size(CInode *in, Capability *only_cap)
2434{
2435 /*
2436 * only share if currently issued a WR cap. if client doesn't have it,
2437 * file_max doesn't matter, and the client will get it if/when they get
2438 * the cap later.
2439 */
2440 dout(10) << "share_inode_max_size on " << *in << dendl;
2441 map<client_t, Capability*>::iterator it;
2442 if (only_cap)
2443 it = in->client_caps.find(only_cap->get_client());
2444 else
2445 it = in->client_caps.begin();
2446 for (; it != in->client_caps.end(); ++it) {
2447 const client_t client = it->first;
2448 Capability *cap = it->second;
2449 if (cap->is_suppress())
2450 continue;
2451 if (cap->pending() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER)) {
2452 dout(10) << "share_inode_max_size with client." << client << dendl;
2453 cap->inc_last_seq();
2454 MClientCaps *m = new MClientCaps(CEPH_CAP_OP_GRANT,
2455 in->ino(),
2456 in->find_snaprealm()->inode->ino(),
2457 cap->get_cap_id(), cap->get_last_seq(),
2458 cap->pending(), cap->wanted(), 0,
2459 cap->get_mseq(),
2460 mds->get_osd_epoch_barrier());
2461 in->encode_cap_message(m, cap);
2462 mds->send_message_client_counted(m, client);
2463 }
2464 if (only_cap)
2465 break;
2466 }
2467}
2468
2469bool Locker::_need_flush_mdlog(CInode *in, int wanted)
2470{
2471 /* flush log if caps are wanted by client but corresponding lock is unstable and locked by
2472 * pending mutations. */
2473 if (((wanted & (CEPH_CAP_FILE_RD|CEPH_CAP_FILE_WR|CEPH_CAP_FILE_SHARED|CEPH_CAP_FILE_EXCL)) &&
2474 in->filelock.is_unstable_and_locked()) ||
2475 ((wanted & (CEPH_CAP_AUTH_SHARED|CEPH_CAP_AUTH_EXCL)) &&
2476 in->authlock.is_unstable_and_locked()) ||
2477 ((wanted & (CEPH_CAP_LINK_SHARED|CEPH_CAP_LINK_EXCL)) &&
2478 in->linklock.is_unstable_and_locked()) ||
2479 ((wanted & (CEPH_CAP_XATTR_SHARED|CEPH_CAP_XATTR_EXCL)) &&
2480 in->xattrlock.is_unstable_and_locked()))
2481 return true;
2482 return false;
2483}
2484
/*
 * Update a cap's "wanted" mask from a client message, guarding against
 * reordered messages via issue_seq, then keep the inode's open-file /
 * recovery bookkeeping in sync with the new wanted state.
 */
void Locker::adjust_cap_wanted(Capability *cap, int wanted, int issue_seq)
{
  if (ceph_seq_cmp(issue_seq, cap->get_last_issue()) == 0) {
    // message matches our last issue; trust the full wanted mask
    dout(10) << " wanted " << ccap_string(cap->wanted())
	     << " -> " << ccap_string(wanted) << dendl;
    cap->set_wanted(wanted);
  } else if (wanted & ~cap->wanted()) {
    // stale seq, but only ADD bits (never drop on a possibly-stale message)
    dout(10) << " wanted " << ccap_string(cap->wanted())
	     << " -> " << ccap_string(wanted)
	     << " (added caps even though we had seq mismatch!)" << dendl;
    cap->set_wanted(wanted | cap->wanted());
  } else {
    // stale seq and nothing new wanted; ignore entirely
    dout(10) << " NOT changing wanted " << ccap_string(cap->wanted())
	     << " -> " << ccap_string(wanted)
	     << " (issue_seq " << issue_seq << " != last_issue "
	     << cap->get_last_issue() << ")" << dendl;
    return;
  }

  CInode *cur = cap->get_inode();
  if (!cur->is_auth()) {
    // replica: forward the (possibly changed) wanted mask to the auth MDS
    request_inode_file_caps(cur);
    return;
  }

  if (cap->wanted() == 0) {
    if (cur->item_open_file.is_on_list() &&
	!cur->is_any_caps_wanted()) {
      dout(10) << " removing unwanted file from open file list " << *cur << dendl;
      cur->item_open_file.remove_myself();
    }
  } else {
    // file data is actively wanted; prioritize any pending recovery
    if (cur->state_test(CInode::STATE_RECOVERING) &&
	(cap->wanted() & (CEPH_CAP_FILE_RD |
			  CEPH_CAP_FILE_WR))) {
      mds->mdcache->recovery_queue.prioritize(cur);
    }

    if (!cur->item_open_file.is_on_list()) {
      // journal an EOpen so replay knows this file is open
      dout(10) << " adding to open file list " << *cur << dendl;
      assert(cur->last == CEPH_NOSNAP);
      LogSegment *ls = mds->mdlog->get_current_segment();
      EOpen *le = new EOpen(mds->mdlog);
      mds->mdlog->start_entry(le);
      le->add_clean_inode(cur);
      ls->open_files.push_back(&cur->item_open_file);
      mds->mdlog->submit_entry(le);
    }
  }
}
2535
2536
2537
/*
 * Perform "null" (empty) snapflushes on behalf of @client for every snapid
 * < @last that is still waiting on a snapflush from that client.  Used when
 * we can infer the client will never send the real FLUSHSNAP (e.g. it has
 * released all WR/EXCL caps).
 */
void Locker::_do_null_snapflush(CInode *head_in, client_t client, snapid_t last)
{
  dout(10) << "_do_null_snapflush client." << client << " on " << *head_in << dendl;
  for (auto p = head_in->client_need_snapflush.begin();
       p != head_in->client_need_snapflush.end() && p->first < last; ) {
    snapid_t snapid = p->first;
    set<client_t>& clients = p->second;
    ++p;  // be careful, q loop below depends on this
          // (also: _do_snap_update may erase the current entry)

    if (clients.count(client)) {
      dout(10) << " doing async NULL snapflush on " << snapid << " from client." << client << dendl;
      CInode *sin = mdcache->get_inode(head_in->ino(), snapid);
      if (!sin) {
	// hrm, look forward until we find the inode.
	//  (we can only look it up by the last snapid it is valid for)
	dout(10) << " didn't have " << head_in->ino() << " snapid " << snapid << dendl;
	for (compact_map<snapid_t, set<client_t> >::iterator q = p;  // p is already at next entry
	     q != head_in->client_need_snapflush.end();
	     ++q) {
	  dout(10) << " trying snapid " << q->first << dendl;
	  sin = mdcache->get_inode(head_in->ino(), q->first);
	  if (sin) {
	    assert(sin->first <= snapid);
	    break;
	  }
	  dout(10) << " didn't have " << head_in->ino() << " snapid " << q->first << dendl;
	}
	// multiversion head inode covers old snapids itself
	if (!sin && head_in->is_multiversion())
	  sin = head_in;
	assert(sin);
      }
      // dirty=0, no message, no ack: this is the "null" flush
      _do_snap_update(sin, snapid, 0, sin->first - 1, client, NULL, NULL);
      head_in->remove_need_snapflush(sin, snapid, client);
    }
  }
}
2574
2575
2576bool Locker::should_defer_client_cap_frozen(CInode *in)
2577{
2578 /*
2579 * This policy needs to be AT LEAST as permissive as allowing a client request
2580 * to go forward, or else a client request can release something, the release
2581 * gets deferred, but the request gets processed and deadlocks because when the
2582 * caps can't get revoked.
2583 *
2584 * Currently, a request wait if anything locked is freezing (can't
2585 * auth_pin), which would avoid any deadlock with cap release. Thus @in
2586 * _MUST_ be in the lock/auth_pin set.
2587 *
2588 * auth_pins==0 implies no unstable lock and not auth pinnned by
2589 * client request, otherwise continue even it's freezing.
2590 */
2591 return (in->is_freezing() && in->get_num_auth_pins() == 0) || in->is_frozen();
2592}
2593
/*
 * Main entry point for client cap messages (UPDATE/FLUSH/FLUSHSNAP/...).
 * Validates the session/state, replays barriers, resolves the (possibly
 * snapped) inode and cap, then dispatches to snap-flush or regular cap
 * update handling.
 * This function DOES put the passed message before returning
 */
void Locker::handle_client_caps(MClientCaps *m)
{
  // NOTE(review): get_priv() presumably returns a referenced Session with no
  // matching put() here — verify against the Connection priv refcount rules.
  Session *session = static_cast<Session *>(m->get_connection()->get_priv());
  client_t client = m->get_source().num();

  snapid_t follows = m->get_snap_follows();
  dout(7) << "handle_client_caps "
	  << ((m->flags & CLIENT_CAPS_SYNC) ? "sync" : "async")
	  << " on " << m->get_ino()
	  << " tid " << m->get_client_tid() << " follows " << follows
	  << " op " << ceph_cap_op_name(m->get_op()) << dendl;

  // not yet active: drop bad-session messages, remember dirty caps seen
  // during reconnect, and retry everything else after replay.
  if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) {
    if (!session) {
      dout(5) << " no session, dropping " << *m << dendl;
      m->put();
      return;
    }
    if (session->is_closed() ||
	session->is_closing() ||
	session->is_killing()) {
      dout(7) << " session closed|closing|killing, dropping " << *m << dendl;
      m->put();
      return;
    }
    if (mds->is_reconnect() &&
	m->get_dirty() && m->get_client_tid() > 0 &&
	!session->have_completed_flush(m->get_client_tid())) {
      mdcache->set_reconnected_dirty_caps(client, m->get_ino(), m->get_dirty());
    }
    mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
    return;
  }

  // duplicate flush (client retransmit): just re-ack the recorded tid
  if (m->get_client_tid() > 0 && session &&
      session->have_completed_flush(m->get_client_tid())) {
    dout(7) << "handle_client_caps already flushed tid " << m->get_client_tid()
	    << " for client." << client << dendl;
    MClientCaps *ack;
    if (m->get_op() == CEPH_CAP_OP_FLUSHSNAP) {
      ack = new MClientCaps(CEPH_CAP_OP_FLUSHSNAP_ACK, m->get_ino(), 0, 0, 0, 0, 0,
			    m->get_dirty(), 0, mds->get_osd_epoch_barrier());
    } else {
      ack = new MClientCaps(CEPH_CAP_OP_FLUSH_ACK, m->get_ino(), 0, m->get_cap_id(),
			    m->get_seq(), m->get_caps(), 0, m->get_dirty(), 0,
			    mds->get_osd_epoch_barrier());
    }
    ack->set_snap_follows(follows);
    ack->set_client_tid(m->get_client_tid());
    mds->send_message_client_counted(ack, m->get_connection());
    if (m->get_op() == CEPH_CAP_OP_FLUSHSNAP) {
      m->put();
      return;
    } else {
      // fall-thru because the message may release some caps
      m->clear_dirty();
      m->set_op(CEPH_CAP_OP_UPDATE);
    }
  }

  // "oldest flush tid" > 0 means client uses unique TID for each flush
  if (m->get_oldest_flush_tid() > 0 && session) {
    if (session->trim_completed_flushes(m->get_oldest_flush_tid())) {
      mds->mdlog->get_current_segment()->touched_sessions.insert(session->info.inst.name);

      if (session->get_num_trim_flushes_warnings() > 0 &&
	  session->get_num_completed_flushes() * 2 < g_conf->mds_max_completed_flushes)
	session->reset_num_trim_flushes_warnings();
    } else {
      // client is not advancing oldest_flush_tid; warn with backoff
      if (session->get_num_completed_flushes() >=
	  (g_conf->mds_max_completed_flushes << session->get_num_trim_flushes_warnings())) {
	session->inc_num_trim_flushes_warnings();
	stringstream ss;
	ss << "client." << session->get_client() << " does not advance its oldest_flush_tid ("
	   << m->get_oldest_flush_tid() << "), "
	   << session->get_num_completed_flushes()
	   << " completed flushes recorded in session";
	mds->clog->warn() << ss.str();
	dout(20) << __func__ << " " << ss.str() << dendl;
      }
    }
  }

  CInode *head_in = mdcache->get_inode(m->get_ino());
  if (!head_in) {
    if (mds->is_clientreplay()) {
      dout(7) << "handle_client_caps on unknown ino " << m->get_ino()
	<< ", will try again after replayed client requests" << dendl;
      mdcache->wait_replay_cap_reconnect(m->get_ino(), new C_MDS_RetryMessage(mds, m));
      return;
    }
    dout(1) << "handle_client_caps on unknown ino " << m->get_ino() << ", dropping" << dendl;
    m->put();
    return;
  }

  if (m->osd_epoch_barrier && !mds->objecter->have_map(m->osd_epoch_barrier)) {
    // Pause RADOS operations until we see the required epoch
    mds->objecter->set_epoch_barrier(m->osd_epoch_barrier);
  }

  if (mds->get_osd_epoch_barrier() < m->osd_epoch_barrier) {
    // Record the barrier so that we will retransmit it to clients
    mds->set_osd_epoch_barrier(m->osd_epoch_barrier);
  }

  // resolve the snapped inode this message actually refers to
  CInode *in = head_in;
  if (follows > 0) {
    in = mdcache->pick_inode_snap(head_in, follows);
    if (in != head_in)
      dout(10) << " head inode " << *head_in << dendl;
  }
  dout(10) << " cap inode " << *in << dendl;

  Capability *cap = 0;
  cap = in->get_client_cap(client);
  if (!cap && in != head_in)
    cap = head_in->get_client_cap(client);
  if (!cap) {
    dout(7) << "handle_client_caps no cap for client." << client << " on " << *in << dendl;
    m->put();
    return;
  }
  assert(cap);

  // freezing|frozen?
  if (should_defer_client_cap_frozen(in)) {
    dout(7) << "handle_client_caps freezing|frozen on " << *in << dendl;
    in->add_waiter(CInode::WAIT_UNFREEZE, new C_MDS_RetryMessage(mds, m));
    return;
  }
  // stale migration seq means the message predates a cap export; drop it
  if (ceph_seq_cmp(m->get_mseq(), cap->get_mseq()) < 0) {
    dout(7) << "handle_client_caps mseq " << m->get_mseq() << " < " << cap->get_mseq()
	    << ", dropping" << dendl;
    m->put();
    return;
  }

  int op = m->get_op();

  // flushsnap?
  if (op == CEPH_CAP_OP_FLUSHSNAP) {
    if (!in->is_auth()) {
      dout(7) << " not auth, ignoring flushsnap on " << *in << dendl;
      goto out;
    }

    SnapRealm *realm = in->find_snaprealm();
    snapid_t snap = realm->get_snap_following(follows);
    dout(10) << " flushsnap follows " << follows << " -> snap " << snap << dendl;

    // we can prepare the ack now, since this FLUSHEDSNAP is independent of any
    // other cap ops. (except possibly duplicate FLUSHSNAP requests, but worst
    // case we get a dup response, so whatever.)
    MClientCaps *ack = 0;
    if (m->get_dirty()) {
      ack = new MClientCaps(CEPH_CAP_OP_FLUSHSNAP_ACK, in->ino(), 0, 0, 0, 0, 0, m->get_dirty(), 0, mds->get_osd_epoch_barrier());
      ack->set_snap_follows(follows);
      ack->set_client_tid(m->get_client_tid());
      ack->set_oldest_flush_tid(m->get_oldest_flush_tid());
    }

    if (in == head_in ||
	(head_in->client_need_snapflush.count(snap) &&
	 head_in->client_need_snapflush[snap].count(client))) {
      dout(7) << " flushsnap snap " << snap
	      << " client." << client << " on " << *in << dendl;

      // this cap now follows a later snap (i.e. the one initiating this flush, or later)
      if (in == head_in)
	cap->client_follows = snap < CEPH_NOSNAP ? snap : realm->get_newest_seq();
      else if (head_in->client_need_snapflush.begin()->first < snap)
	// older snapids were skipped by the client; null-flush them first
	_do_null_snapflush(head_in, client, snap);

      _do_snap_update(in, snap, m->get_dirty(), follows, client, m, ack);

      if (in != head_in)
	head_in->remove_need_snapflush(in, snap, client);

    } else {
      dout(7) << " not expecting flushsnap " << snap << " from client." << client << " on " << *in << dendl;
      if (ack)
	mds->send_message_client_counted(ack, m->get_connection());
    }
    goto out;
  }

  if (cap->get_cap_id() != m->get_cap_id()) {
    dout(7) << " ignoring client capid " << m->get_cap_id() << " != my " << cap->get_cap_id() << dendl;
  } else {
    // intermediate snap inodes: apply the dirty metadata to each snapped
    // inode between the flushed one and the head.
    while (in != head_in) {
      assert(in->last != CEPH_NOSNAP);
      if (in->is_auth() && m->get_dirty()) {
	dout(10) << " updating intermediate snapped inode " << *in << dendl;
	_do_cap_update(in, NULL, m->get_dirty(), follows, m);
      }
      in = mdcache->pick_inode_snap(head_in, in->last);
    }

    // head inode, and cap
    MClientCaps *ack = 0;

    int caps = m->get_caps();
    if (caps & ~cap->issued()) {
      dout(10) << " confirming not issued caps " << ccap_string(caps & ~cap->issued()) << dendl;
      caps &= cap->issued();
    }

    cap->confirm_receipt(m->get_seq(), caps);
    dout(10) << " follows " << follows
	     << " retains " << ccap_string(m->get_caps())
	     << " dirty " << ccap_string(m->get_dirty())
	     << " on " << *in << dendl;


    // missing/skipped snapflush?
    // The client MAY send a snapflush if it is issued WR/EXCL caps, but
    // presently only does so when it has actual dirty metadata. But, we
    // set up the need_snapflush stuff based on the issued caps.
    // We can infer that the client WONT send a FLUSHSNAP once they have
    // released all WR/EXCL caps (the FLUSHSNAP always comes before the cap
    // update/release).
    if (!head_in->client_need_snapflush.empty()) {
      if ((cap->issued() & CEPH_CAP_ANY_FILE_WR) == 0) {
	_do_null_snapflush(head_in, client);
      } else {
	dout(10) << " revocation in progress, not making any conclusions about null snapflushes" << dendl;
      }
    }

    if (m->get_dirty() && in->is_auth()) {
      dout(7) << " flush client." << client << " dirty " << ccap_string(m->get_dirty())
	      << " seq " << m->get_seq() << " on " << *in << dendl;
      ack = new MClientCaps(CEPH_CAP_OP_FLUSH_ACK, in->ino(), 0, cap->get_cap_id(), m->get_seq(),
			    m->get_caps(), 0, m->get_dirty(), 0, mds->get_osd_epoch_barrier());
      ack->set_client_tid(m->get_client_tid());
      ack->set_oldest_flush_tid(m->get_oldest_flush_tid());
    }

    // filter wanted based on what we could ever give out (given auth/replica status)
    bool need_flush = m->flags & CLIENT_CAPS_SYNC;
    int new_wanted = m->get_wanted() & head_in->get_caps_allowed_ever();
    if (new_wanted != cap->wanted()) {
      if (!need_flush && (new_wanted & ~cap->pending())) {
	// exapnding caps. make sure we aren't waiting for a log flush
	need_flush = _need_flush_mdlog(head_in, new_wanted & ~cap->pending());
      }

      adjust_cap_wanted(cap, new_wanted, m->get_issue_seq());
    }

    if (in->is_auth() &&
	_do_cap_update(in, cap, m->get_dirty(), follows, m, ack, &need_flush)) {
      // updated
      eval(in, CEPH_CAP_LOCKS);

      if (!need_flush && (cap->wanted() & ~cap->pending()))
	need_flush = _need_flush_mdlog(in, cap->wanted() & ~cap->pending());
    } else {
      // no update, ack now.
      if (ack)
	mds->send_message_client_counted(ack, m->get_connection());

      bool did_issue = eval(in, CEPH_CAP_LOCKS);
      if (!did_issue && (cap->wanted() & ~cap->pending()))
	issue_caps(in, cap);

      // first cap issue to this client for this inode: share max_size now
      if (cap->get_last_seq() == 0 &&
	  (cap->pending() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER))) {
	cap->issue_norevoke(cap->issued());
	share_inode_max_size(in, cap);
      }
    }

    if (need_flush)
      mds->mdlog->flush();
  }

 out:
  m->put();
}
2879
2880
// Completion context: retry a deferred per-request cap release once the
// blocking condition (e.g. frozen inode) clears.  The release item is
// copied by value; the retry runs without an MDRequest (null_ref) and
// without a dentry name.
class C_Locker_RetryRequestCapRelease : public LockerContext {
  client_t client;
  ceph_mds_request_release item;
public:
  C_Locker_RetryRequestCapRelease(Locker *l, client_t c, const ceph_mds_request_release& it) :
    LockerContext(l), client(c), item(it) { }
  void finish(int r) override {
    string dname;
    MDRequestRef null_ref;
    locker->process_request_cap_release(null_ref, client, item, dname);
  }
};
2893
/**
 * Apply a cap release carried inside a client request.  Called either
 * with a live request (mdr) or, when the inode was frozen at the time,
 * re-driven later by C_Locker_RetryRequestCapRelease with a null mdr.
 *
 * @param mdr    originating request, or null ref if deferred
 * @param client releasing client
 * @param item   wire release record (ino, cap_id, caps, wanted, seq, issue_seq, mseq)
 * @param dname  if non-empty, also drop this client's lease on that dentry
 */
void Locker::process_request_cap_release(MDRequestRef& mdr, client_t client, const ceph_mds_request_release& item,
					 const string &dname)
{
  inodeno_t ino = (uint64_t)item.ino;
  uint64_t cap_id = item.cap_id;
  int caps = item.caps;
  int wanted = item.wanted;
  int seq = item.seq;
  int issue_seq = item.issue_seq;
  int mseq = item.mseq;

  CInode *in = mdcache->get_inode(ino);
  if (!in)
    return;

  // drop the named dentry lease, if the client still holds one
  if (dname.length()) {
    frag_t fg = in->pick_dirfrag(dname);
    CDir *dir = in->get_dirfrag(fg);
    if (dir) {
      CDentry *dn = dir->lookup(dname);
      if (dn) {
	ClientLease *l = dn->get_client_lease(client);
	if (l) {
	  dout(10) << "process_cap_release removing lease on " << *dn << dendl;
	  dn->remove_client_lease(l, this);
	} else {
	  dout(7) << "process_cap_release client." << client
		  << " doesn't have lease on " << *dn << dendl;
	}
      } else {
	dout(7) << "process_cap_release client." << client << " released lease on dn "
		<< dir->dirfrag() << "/" << dname << " which dne" << dendl;
      }
    }
  }

  Capability *cap = in->get_client_cap(client);
  if (!cap)
    return;

  dout(10) << "process_cap_release client." << client << " " << ccap_string(caps) << " on " << *in
	   << (mdr ? "" : " (DEFERRED, no mdr)")
	   << dendl;

  // stale migration seq: the release raced with a cap export/import
  if (ceph_seq_cmp(mseq, cap->get_mseq()) < 0) {
    dout(7) << " mseq " << mseq << " < " << cap->get_mseq() << ", dropping" << dendl;
    return;
  }

  // release refers to an older cap instance; ignore
  if (cap->get_cap_id() != cap_id) {
    dout(7) << " cap_id " << cap_id << " != " << cap->get_cap_id() << ", dropping" << dendl;
    return;
  }

  // can't mutate cap state on a freezing/frozen inode; retry after unfreeze
  if (should_defer_client_cap_frozen(in)) {
    dout(7) << " frozen, deferring" << dendl;
    in->add_waiter(CInode::WAIT_UNFREEZE, new C_Locker_RetryRequestCapRelease(this, client, item));
    return;
  }

  // a client can only release caps it was actually issued
  if (caps & ~cap->issued()) {
    dout(10) << " confirming not issued caps " << ccap_string(caps & ~cap->issued()) << dendl;
    caps &= cap->issued();
  }
  cap->confirm_receipt(seq, caps);

  // if the client dropped all FILE write-ish caps, resolve any pending
  // snapflush state with null flushes
  if (!in->client_need_snapflush.empty() &&
      (cap->issued() & CEPH_CAP_ANY_FILE_WR) == 0) {
    _do_null_snapflush(in, client);
  }

  adjust_cap_wanted(cap, wanted, issue_seq);

  // suppress cap re-issue while evaluating in request context; the
  // request reply itself will carry any new issue
  if (mdr)
    cap->inc_suppress();
  eval(in, CEPH_CAP_LOCKS);
  if (mdr)
    cap->dec_suppress();

  // take note; we may need to reissue on this cap later
  if (mdr)
    mdr->cap_releases[in->vino()] = cap->get_last_seq();
}
2977
2978class C_Locker_RetryKickIssueCaps : public LockerContext {
2979 CInode *in;
2980 client_t client;
2981 ceph_seq_t seq;
2982public:
2983 C_Locker_RetryKickIssueCaps(Locker *l, CInode *i, client_t c, ceph_seq_t s) :
2984 LockerContext(l), in(i), client(c), seq(s) {
2985 in->get(CInode::PIN_PTRWAITER);
2986 }
2987 void finish(int r) override {
2988 locker->kick_issue_caps(in, client, seq);
2989 in->put(CInode::PIN_PTRWAITER);
2990 }
2991};
2992
2993void Locker::kick_issue_caps(CInode *in, client_t client, ceph_seq_t seq)
2994{
2995 Capability *cap = in->get_client_cap(client);
2996 if (!cap || cap->get_last_sent() != seq)
2997 return;
2998 if (in->is_frozen()) {
2999 dout(10) << "kick_issue_caps waiting for unfreeze on " << *in << dendl;
3000 in->add_waiter(CInode::WAIT_UNFREEZE,
3001 new C_Locker_RetryKickIssueCaps(this, in, client, seq));
3002 return;
3003 }
3004 dout(10) << "kick_issue_caps released at current seq " << seq
3005 << ", reissuing" << dendl;
3006 issue_caps(in, cap);
3007}
3008
3009void Locker::kick_cap_releases(MDRequestRef& mdr)
3010{
3011 client_t client = mdr->get_client();
3012 for (map<vinodeno_t,ceph_seq_t>::iterator p = mdr->cap_releases.begin();
3013 p != mdr->cap_releases.end();
3014 ++p) {
3015 CInode *in = mdcache->get_inode(p->first);
3016 if (!in)
3017 continue;
3018 kick_issue_caps(in, client, p->second);
3019 }
3020}
3021
7c673cae
FG
3022/**
3023 * m and ack might be NULL, so don't dereference them unless dirty != 0
3024 */
3025void Locker::_do_snap_update(CInode *in, snapid_t snap, int dirty, snapid_t follows, client_t client, MClientCaps *m, MClientCaps *ack)
3026{
3027 dout(10) << "_do_snap_update dirty " << ccap_string(dirty)
3028 << " follows " << follows << " snap " << snap
3029 << " on " << *in << dendl;
3030
3031 if (snap == CEPH_NOSNAP) {
3032 // hmm, i guess snap was already deleted? just ack!
3033 dout(10) << " wow, the snap following " << follows
3034 << " was already deleted. nothing to record, just ack." << dendl;
3035 if (ack)
3036 mds->send_message_client_counted(ack, m->get_connection());
3037 return;
3038 }
3039
3040 EUpdate *le = new EUpdate(mds->mdlog, "snap flush");
3041 mds->mdlog->start_entry(le);
3042 MutationRef mut = new MutationImpl();
3043 mut->ls = mds->mdlog->get_current_segment();
3044
3045 // normal metadata updates that we can apply to the head as well.
3046
3047 // update xattrs?
3048 bool xattrs = false;
3049 map<string,bufferptr> *px = 0;
3050 if ((dirty & CEPH_CAP_XATTR_EXCL) &&
3051 m->xattrbl.length() &&
3052 m->head.xattr_version > in->get_projected_inode()->xattr_version)
3053 xattrs = true;
3054
3055 old_inode_t *oi = 0;
3056 if (in->is_multiversion()) {
3057 oi = in->pick_old_inode(snap);
3058 }
3059
3060 inode_t *pi;
3061 if (oi) {
3062 dout(10) << " writing into old inode" << dendl;
3063 pi = in->project_inode();
3064 pi->version = in->pre_dirty();
3065 if (snap > oi->first)
3066 in->split_old_inode(snap);
3067 pi = &oi->inode;
3068 if (xattrs)
3069 px = &oi->xattrs;
3070 } else {
3071 if (xattrs)
3072 px = new map<string,bufferptr>;
3073 pi = in->project_inode(px);
3074 pi->version = in->pre_dirty();
3075 }
3076
3077 _update_cap_fields(in, dirty, m, pi);
3078
3079 // xattr
3080 if (px) {
3081 dout(7) << " xattrs v" << pi->xattr_version << " -> " << m->head.xattr_version
3082 << " len " << m->xattrbl.length() << dendl;
3083 pi->xattr_version = m->head.xattr_version;
3084 bufferlist::iterator p = m->xattrbl.begin();
3085 ::decode(*px, p);
3086 }
3087
3088 if (pi->client_ranges.count(client)) {
3089 if (in->last == snap) {
3090 dout(10) << " removing client_range entirely" << dendl;
3091 pi->client_ranges.erase(client);
3092 } else {
3093 dout(10) << " client_range now follows " << snap << dendl;
3094 pi->client_ranges[client].follows = snap;
3095 }
3096 }
3097
3098 mut->auth_pin(in);
3099 mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows);
3100 mdcache->journal_dirty_inode(mut.get(), &le->metablob, in, follows);
3101
3102 // "oldest flush tid" > 0 means client uses unique TID for each flush
3103 if (ack && ack->get_oldest_flush_tid() > 0)
3104 le->metablob.add_client_flush(metareqid_t(m->get_source(), ack->get_client_tid()),
3105 ack->get_oldest_flush_tid());
3106
3107 mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut, false, false,
3108 client, ack));
3109}
3110
/**
 * Copy client-flushed metadata fields from a cap message into a
 * (projected) inode, honoring which caps are actually dirty.
 *
 * @param in    inode being updated (used for logging / is_file checks)
 * @param dirty mask of dirty caps reported by the client; 0 is a no-op
 * @param m     cap message carrying the new field values (must be
 *              non-null when dirty != 0)
 * @param pi    projected inode (head or old_inode) receiving the values
 */
void Locker::_update_cap_fields(CInode *in, int dirty, MClientCaps *m, inode_t *pi)
{
  if (dirty == 0)
    return;

  /* m must be valid if there are dirty caps */
  assert(m);
  uint64_t features = m->get_connection()->get_features();

  // ctime only moves forward
  if (m->get_ctime() > pi->ctime) {
    dout(7) << "  ctime " << pi->ctime << " -> " << m->get_ctime()
	    << " for " << *in << dendl;
    pi->ctime = m->get_ctime();
  }

  if ((features & CEPH_FEATURE_FS_CHANGE_ATTR) &&
      m->get_change_attr() > pi->change_attr) {
    dout(7) << "  change_attr " << pi->change_attr << " -> " << m->get_change_attr()
	    << " for " << *in << dendl;
    pi->change_attr = m->get_change_attr();
  }

  // file
  if (dirty & (CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)) {
    utime_t atime = m->get_atime();
    utime_t mtime = m->get_mtime();
    uint64_t size = m->get_size();
    version_t inline_version = m->inline_version;

    // with only WR, mtime may only advance; with EXCL the client owns
    // the timestamp outright and may set it backwards too
    if (((dirty & CEPH_CAP_FILE_WR) && mtime > pi->mtime) ||
	((dirty & CEPH_CAP_FILE_EXCL) && mtime != pi->mtime)) {
      dout(7) << "  mtime " << pi->mtime << " -> " << mtime
	      << " for " << *in << dendl;
      pi->mtime = mtime;
    }
    if (in->inode.is_file() &&   // ONLY if regular file
	size > pi->size) {
      dout(7) << "  size " << pi->size << " -> " << size
	      << " for " << *in << dendl;
      pi->size = size;
      pi->rstat.rbytes = size;
    }
    if (in->inode.is_file() &&
        (dirty & CEPH_CAP_FILE_WR) &&
        inline_version > pi->inline_data.version) {
      pi->inline_data.version = inline_version;
      if (inline_version != CEPH_INLINE_NONE && m->inline_data.length() > 0)
	pi->inline_data.get_data() = m->inline_data;
      else
	pi->inline_data.free_data();
    }
    if ((dirty & CEPH_CAP_FILE_EXCL) && atime != pi->atime) {
      dout(7) << "  atime " << pi->atime << " -> " << atime
	      << " for " << *in << dendl;
      pi->atime = atime;
    }
    if ((dirty & CEPH_CAP_FILE_EXCL) &&
	ceph_seq_cmp(pi->time_warp_seq, m->get_time_warp_seq()) < 0) {
      dout(7) << "  time_warp_seq " << pi->time_warp_seq << " -> " << m->get_time_warp_seq()
	      << " for " << *in << dendl;
      pi->time_warp_seq = m->get_time_warp_seq();
    }
  }
  // auth
  if (dirty & CEPH_CAP_AUTH_EXCL) {
    if (m->head.uid != pi->uid) {
      dout(7) << "  uid " << pi->uid
	      << " -> " << m->head.uid
	      << " for " << *in << dendl;
      pi->uid = m->head.uid;
    }
    if (m->head.gid != pi->gid) {
      dout(7) << "  gid " << pi->gid
	      << " -> " << m->head.gid
	      << " for " << *in << dendl;
      pi->gid = m->head.gid;
    }
    if (m->head.mode != pi->mode) {
      dout(7) << "  mode " << oct << pi->mode
	      << " -> " << m->head.mode << dec
	      << " for " << *in << dendl;
      pi->mode = m->head.mode;
    }
    if ((features & CEPH_FEATURE_FS_BTIME) && m->get_btime() != pi->btime) {
      dout(7) << "  btime " << oct << pi->btime
	      << " -> " << m->get_btime() << dec
	      << " for " << *in << dendl;
      pi->btime = m->get_btime();
    }
  }
}
3202
3203/*
3204 * update inode based on cap flush|flushsnap|wanted.
3205 * adjust max_size, if needed.
3206 * if we update, return true; otherwise, false (no updated needed).
3207 */
3208bool Locker::_do_cap_update(CInode *in, Capability *cap,
3209 int dirty, snapid_t follows,
3210 MClientCaps *m, MClientCaps *ack,
3211 bool *need_flush)
3212{
3213 dout(10) << "_do_cap_update dirty " << ccap_string(dirty)
3214 << " issued " << ccap_string(cap ? cap->issued() : 0)
3215 << " wanted " << ccap_string(cap ? cap->wanted() : 0)
3216 << " on " << *in << dendl;
3217 assert(in->is_auth());
3218 client_t client = m->get_source().num();
3219 inode_t *latest = in->get_projected_inode();
3220
3221 // increase or zero max_size?
3222 uint64_t size = m->get_size();
3223 bool change_max = false;
3224 uint64_t old_max = latest->client_ranges.count(client) ? latest->client_ranges[client].range.last : 0;
3225 uint64_t new_max = old_max;
3226
3227 if (in->is_file()) {
3228 bool forced_change_max = false;
3229 dout(20) << "inode is file" << dendl;
3230 if (cap && ((cap->issued() | cap->wanted()) & CEPH_CAP_ANY_FILE_WR)) {
3231 dout(20) << "client has write caps; m->get_max_size="
3232 << m->get_max_size() << "; old_max=" << old_max << dendl;
3233 if (m->get_max_size() > new_max) {
3234 dout(10) << "client requests file_max " << m->get_max_size()
3235 << " > max " << old_max << dendl;
3236 change_max = true;
3237 forced_change_max = true;
31f18b77 3238 new_max = calc_new_max_size(latest, m->get_max_size());
7c673cae 3239 } else {
31f18b77 3240 new_max = calc_new_max_size(latest, size);
7c673cae
FG
3241
3242 if (new_max > old_max)
3243 change_max = true;
3244 else
3245 new_max = old_max;
3246 }
3247 } else {
3248 if (old_max) {
3249 change_max = true;
3250 new_max = 0;
3251 }
3252 }
3253
3254 if (in->last == CEPH_NOSNAP &&
3255 change_max &&
3256 !in->filelock.can_wrlock(client) &&
3257 !in->filelock.can_force_wrlock(client)) {
3258 dout(10) << " i want to change file_max, but lock won't allow it (yet)" << dendl;
3259 if (in->filelock.is_stable()) {
3260 bool need_issue = false;
3261 if (cap)
3262 cap->inc_suppress();
3263 if (in->mds_caps_wanted.empty() &&
3264 (in->get_loner() >= 0 || (in->get_wanted_loner() >= 0 && in->try_set_loner()))) {
3265 if (in->filelock.get_state() != LOCK_EXCL)
3266 file_excl(&in->filelock, &need_issue);
3267 } else
3268 simple_lock(&in->filelock, &need_issue);
3269 if (need_issue)
3270 issue_caps(in);
3271 if (cap)
3272 cap->dec_suppress();
3273 }
3274 if (!in->filelock.can_wrlock(client) &&
3275 !in->filelock.can_force_wrlock(client)) {
3276 C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in,
3277 forced_change_max ? new_max : 0,
3278 0, utime_t());
3279
3280 in->filelock.add_waiter(SimpleLock::WAIT_STABLE, cms);
3281 change_max = false;
3282 }
3283 }
3284 }
3285
3286 if (m->flockbl.length()) {
3287 int32_t num_locks;
3288 bufferlist::iterator bli = m->flockbl.begin();
3289 ::decode(num_locks, bli);
3290 for ( int i=0; i < num_locks; ++i) {
3291 ceph_filelock decoded_lock;
3292 ::decode(decoded_lock, bli);
3293 in->get_fcntl_lock_state()->held_locks.
3294 insert(pair<uint64_t, ceph_filelock>(decoded_lock.start, decoded_lock));
3295 ++in->get_fcntl_lock_state()->client_held_lock_counts[(client_t)(decoded_lock.client)];
3296 }
3297 ::decode(num_locks, bli);
3298 for ( int i=0; i < num_locks; ++i) {
3299 ceph_filelock decoded_lock;
3300 ::decode(decoded_lock, bli);
3301 in->get_flock_lock_state()->held_locks.
3302 insert(pair<uint64_t, ceph_filelock>(decoded_lock.start, decoded_lock));
3303 ++in->get_flock_lock_state()->client_held_lock_counts[(client_t)(decoded_lock.client)];
3304 }
3305 }
3306
3307 if (!dirty && !change_max)
3308 return false;
3309
3310 Session *session = static_cast<Session *>(m->get_connection()->get_priv());
3311 if (session->check_access(in, MAY_WRITE,
3312 m->caller_uid, m->caller_gid, NULL, 0, 0) < 0) {
3313 session->put();
3314 dout(10) << "check_access failed, dropping cap update on " << *in << dendl;
3315 return false;
3316 }
3317 session->put();
3318
3319 // do the update.
3320 EUpdate *le = new EUpdate(mds->mdlog, "cap update");
3321 mds->mdlog->start_entry(le);
3322
3323 // xattrs update?
3324 map<string,bufferptr> *px = 0;
3325 if ((dirty & CEPH_CAP_XATTR_EXCL) &&
3326 m->xattrbl.length() &&
3327 m->head.xattr_version > in->get_projected_inode()->xattr_version)
3328 px = new map<string,bufferptr>;
3329
3330 inode_t *pi = in->project_inode(px);
3331 pi->version = in->pre_dirty();
3332
3333 MutationRef mut(new MutationImpl());
3334 mut->ls = mds->mdlog->get_current_segment();
3335
3336 _update_cap_fields(in, dirty, m, pi);
3337
3338 if (change_max) {
3339 dout(7) << " max_size " << old_max << " -> " << new_max
3340 << " for " << *in << dendl;
3341 if (new_max) {
3342 pi->client_ranges[client].range.first = 0;
3343 pi->client_ranges[client].range.last = new_max;
3344 pi->client_ranges[client].follows = in->first - 1;
3345 } else
3346 pi->client_ranges.erase(client);
3347 }
3348
3349 if (change_max || (dirty & (CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)))
3350 wrlock_force(&in->filelock, mut); // wrlock for duration of journal
3351
3352 // auth
3353 if (dirty & CEPH_CAP_AUTH_EXCL)
3354 wrlock_force(&in->authlock, mut);
3355
3356 // xattr
3357 if (px) {
3358 dout(7) << " xattrs v" << pi->xattr_version << " -> " << m->head.xattr_version << dendl;
3359 pi->xattr_version = m->head.xattr_version;
3360 bufferlist::iterator p = m->xattrbl.begin();
3361 ::decode(*px, p);
3362
3363 wrlock_force(&in->xattrlock, mut);
3364 }
3365
3366 mut->auth_pin(in);
3367 mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows);
3368 mdcache->journal_dirty_inode(mut.get(), &le->metablob, in, follows);
3369
3370 // "oldest flush tid" > 0 means client uses unique TID for each flush
3371 if (ack && ack->get_oldest_flush_tid() > 0)
3372 le->metablob.add_client_flush(metareqid_t(m->get_source(), ack->get_client_tid()),
3373 ack->get_oldest_flush_tid());
3374
3375 mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut,
3376 change_max, !!cap,
3377 client, ack));
3378 if (need_flush && !*need_flush &&
3379 ((change_max && new_max) || // max INCREASE
3380 _need_flush_mdlog(in, dirty)))
3381 *need_flush = true;
3382
3383 return true;
3384}
3385
3386/* This function DOES put the passed message before returning */
3387void Locker::handle_client_cap_release(MClientCapRelease *m)
3388{
3389 client_t client = m->get_source().num();
3390 dout(10) << "handle_client_cap_release " << *m << dendl;
3391
3392 if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) {
3393 mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
3394 return;
3395 }
3396
3397 if (m->osd_epoch_barrier && !mds->objecter->have_map(m->osd_epoch_barrier)) {
3398 // Pause RADOS operations until we see the required epoch
3399 mds->objecter->set_epoch_barrier(m->osd_epoch_barrier);
3400 }
3401
3402 if (mds->get_osd_epoch_barrier() < m->osd_epoch_barrier) {
3403 // Record the barrier so that we will retransmit it to clients
3404 mds->set_osd_epoch_barrier(m->osd_epoch_barrier);
3405 }
3406
3407 Session *session = static_cast<Session *>(m->get_connection()->get_priv());
3408
3409 for (vector<ceph_mds_cap_item>::iterator p = m->caps.begin(); p != m->caps.end(); ++p) {
3410 _do_cap_release(client, inodeno_t((uint64_t)p->ino) , p->cap_id, p->migrate_seq, p->seq);
3411 }
3412
3413 if (session) {
3414 session->notify_cap_release(m->caps.size());
3415 }
3416
3417 m->put();
3418}
3419
3420class C_Locker_RetryCapRelease : public LockerContext {
3421 client_t client;
3422 inodeno_t ino;
3423 uint64_t cap_id;
3424 ceph_seq_t migrate_seq;
3425 ceph_seq_t issue_seq;
3426public:
3427 C_Locker_RetryCapRelease(Locker *l, client_t c, inodeno_t i, uint64_t id,
3428 ceph_seq_t mseq, ceph_seq_t seq) :
3429 LockerContext(l), client(c), ino(i), cap_id(id), migrate_seq(mseq), issue_seq(seq) {}
3430 void finish(int r) override {
3431 locker->_do_cap_release(client, ino, cap_id, migrate_seq, issue_seq);
3432 }
3433};
3434
/**
 * Apply one cap release from an MClientCapRelease message.  The release
 * is only honored if it matches the current cap instance (cap_id),
 * isn't stale w.r.t. migration (mseq), and refers to the last issue we
 * sent (seq); otherwise it is ignored or used to trim revoke history.
 */
void Locker::_do_cap_release(client_t client, inodeno_t ino, uint64_t cap_id,
			     ceph_seq_t mseq, ceph_seq_t seq)
{
  CInode *in = mdcache->get_inode(ino);
  if (!in) {
    dout(7) << "_do_cap_release missing ino " << ino << dendl;
    return;
  }
  Capability *cap = in->get_client_cap(client);
  if (!cap) {
    dout(7) << "_do_cap_release no cap for client" << client << " on "<< *in << dendl;
    return;
  }

  dout(7) << "_do_cap_release for client." << client << " on "<< *in << dendl;
  // release refers to an older cap instance
  if (cap->get_cap_id() != cap_id) {
    dout(7) << " capid " << cap_id << " != " << cap->get_cap_id() << ", ignore" << dendl;
    return;
  }
  // release raced with a cap migration
  if (ceph_seq_cmp(mseq, cap->get_mseq()) < 0) {
    dout(7) << " mseq " << mseq << " < " << cap->get_mseq() << ", ignore" << dendl;
    return;
  }
  // can't remove caps while the inode is freezing/frozen; retry later
  if (should_defer_client_cap_frozen(in)) {
    dout(7) << " freezing|frozen, deferring" << dendl;
    in->add_waiter(CInode::WAIT_UNFREEZE,
                  new C_Locker_RetryCapRelease(this, client, ino, cap_id, mseq, seq));
    return;
  }
  // we issued something newer since this release was sent: don't drop
  // the cap, but the release does confirm receipt up through 'seq'
  if (seq != cap->get_last_issue()) {
    dout(7) << " issue_seq " << seq << " != " << cap->get_last_issue() << dendl;
    // clean out any old revoke history
    cap->clean_revoke_from(seq);
    eval_cap_gather(in);
    return;
  }
  remove_client_cap(in, client);
}
3473
/**
 * Drop a client's capability on an inode and re-evaluate lock state.
 * (NOTE(review): the old "DOES put the passed message" comment was
 * misplaced — this function takes no message.)
 *
 * Resolves any pending snapflush state for the client first, then
 * cleans up the client's writable range (auth) or tells the auth MDS
 * our aggregate caps changed (replica).
 */
void Locker::remove_client_cap(CInode *in, client_t client)
{
  // clean out any pending snapflush state
  if (!in->client_need_snapflush.empty())
    _do_null_snapflush(in, client);

  in->remove_client_cap(client);

  if (in->is_auth()) {
    // make sure we clear out the client byte range
    if (in->get_projected_inode()->client_ranges.count(client) &&
	!(in->inode.nlink == 0 && !in->is_any_caps()))    // unless it's unlink + stray
      check_inode_max_size(in);
  } else {
    // replica: let the auth know what we (collectively) want now
    request_inode_file_caps(in);
  }

  try_eval(in, CEPH_CAP_LOCKS);
}
3495
3496
3497/**
3498 * Return true if any currently revoking caps exceed the
3499 * mds_revoke_cap_timeout threshold.
3500 */
3501bool Locker::any_late_revoking_caps(xlist<Capability*> const &revoking) const
3502{
3503 xlist<Capability*>::const_iterator p = revoking.begin();
3504 if (p.end()) {
3505 // No revoking caps at the moment
3506 return false;
3507 } else {
3508 utime_t now = ceph_clock_now();
3509 utime_t age = now - (*p)->get_last_revoke_stamp();
3510 if (age <= g_conf->mds_revoke_cap_timeout) {
3511 return false;
3512 } else {
3513 return true;
3514 }
3515 }
3516}
3517
3518
3519void Locker::get_late_revoking_clients(std::list<client_t> *result) const
3520{
3521 if (!any_late_revoking_caps(revoking_caps)) {
3522 // Fast path: no misbehaving clients, execute in O(1)
3523 return;
3524 }
3525
3526 // Slow path: execute in O(N_clients)
3527 std::map<client_t, xlist<Capability*> >::const_iterator client_rc_iter;
3528 for (client_rc_iter = revoking_caps_by_client.begin();
3529 client_rc_iter != revoking_caps_by_client.end(); ++client_rc_iter) {
3530 xlist<Capability*> const &client_rc = client_rc_iter->second;
3531 bool any_late = any_late_revoking_caps(client_rc);
3532 if (any_late) {
3533 result->push_back(client_rc_iter->first);
3534 }
3535 }
3536}
3537
3538// Hard-code instead of surfacing a config settings because this is
3539// really a hack that should go away at some point when we have better
3540// inspection tools for getting at detailed cap state (#7316)
3541#define MAX_WARN_CAPS 100
3542
3543void Locker::caps_tick()
3544{
3545 utime_t now = ceph_clock_now();
3546
3547 dout(20) << __func__ << " " << revoking_caps.size() << " revoking caps" << dendl;
3548
3549 int i = 0;
3550 for (xlist<Capability*>::iterator p = revoking_caps.begin(); !p.end(); ++p) {
3551 Capability *cap = *p;
3552
3553 utime_t age = now - cap->get_last_revoke_stamp();
3554 dout(20) << __func__ << " age = " << age << cap->get_client() << "." << cap->get_inode()->ino() << dendl;
3555 if (age <= g_conf->mds_revoke_cap_timeout) {
3556 dout(20) << __func__ << " age below timeout " << g_conf->mds_revoke_cap_timeout << dendl;
3557 break;
3558 } else {
3559 ++i;
3560 if (i > MAX_WARN_CAPS) {
3561 dout(1) << __func__ << " more than " << MAX_WARN_CAPS << " caps are late"
3562 << "revoking, ignoring subsequent caps" << dendl;
3563 break;
3564 }
3565 }
3566 // exponential backoff of warning intervals
3567 if (age > g_conf->mds_revoke_cap_timeout * (1 << cap->get_num_revoke_warnings())) {
3568 cap->inc_num_revoke_warnings();
3569 stringstream ss;
3570 ss << "client." << cap->get_client() << " isn't responding to mclientcaps(revoke), ino "
3571 << cap->get_inode()->ino() << " pending " << ccap_string(cap->pending())
3572 << " issued " << ccap_string(cap->issued()) << ", sent " << age << " seconds ago";
3573 mds->clog->warn() << ss.str();
3574 dout(20) << __func__ << " " << ss.str() << dendl;
3575 } else {
3576 dout(20) << __func__ << " silencing log message (backoff) for " << cap->get_client() << "." << cap->get_inode()->ino() << dendl;
3577 }
3578 }
3579}
3580
3581
/**
 * Handle a client dentry-lease message (release, revoke-ack, or renew).
 * Ownership of m: it is put in every branch except a successful RENEW,
 * where the message object is reused and sent back as the reply.
 */
void Locker::handle_client_lease(MClientLease *m)
{
  dout(10) << "handle_client_lease " << *m << dendl;

  assert(m->get_source().is_client());
  client_t client = m->get_source().num();

  CInode *in = mdcache->get_inode(m->get_ino(), m->get_last());
  if (!in) {
    dout(7) << "handle_client_lease don't have ino " << m->get_ino() << "." << m->get_last() << dendl;
    m->put();
    return;
  }
  CDentry *dn = 0;

  frag_t fg = in->pick_dirfrag(m->dname);
  CDir *dir = in->get_dirfrag(fg);
  if (dir)
    dn = dir->lookup(m->dname);
  if (!dn) {
    dout(7) << "handle_client_lease don't have dn " << m->get_ino() << " " << m->dname << dendl;
    m->put();
    return;
  }
  dout(10) << " on " << *dn << dendl;

  // replica and lock
  ClientLease *l = dn->get_client_lease(client);
  if (!l) {
    dout(7) << "handle_client_lease didn't have lease for client." << client << " of " << *dn << dendl;
    m->put();
    return;
  }

  switch (m->get_action()) {
  case CEPH_MDS_LEASE_REVOKE_ACK:
  case CEPH_MDS_LEASE_RELEASE:
    // only drop the lease if the seq matches; otherwise it's a stale ack
    if (l->seq != m->get_seq()) {
      dout(7) << "handle_client_lease release - seq " << l->seq << " != provided " << m->get_seq() << dendl;
    } else {
      dout(7) << "handle_client_lease client." << client
	      << " on " << *dn << dendl;
      dn->remove_client_lease(l, this);
    }
    m->put();
    break;

  case CEPH_MDS_LEASE_RENEW:
    {
      dout(7) << "handle_client_lease client." << client << " renew on " << *dn
	      << (!dn->lock.can_lease(client)?", revoking lease":"") << dendl;
      if (dn->lock.can_lease(client)) {
	int pool = 1;   // fixme.. do something smart!
	// refresh duration/seq in place and send the same message back
	m->h.duration_ms = (int)(1000 * mdcache->client_lease_durations[pool]);
	m->h.seq = ++l->seq;
	m->clear_payload();

	utime_t now = ceph_clock_now();
	now += mdcache->client_lease_durations[pool];
	mdcache->touch_client_lease(l, pool, now);

	mds->send_message_client_counted(m, m->get_connection());
      }
    }
    break;

  default:
    ceph_abort(); // implement me
    break;
  }
}
3653
3654
/**
 * Encode a dentry lease (LeaseStat) for a client into bl, as part of a
 * reply.  A real lease is only issued when the dentry lock allows it
 * and the client isn't already covered by dir FILE_SHARED/EXCL caps;
 * otherwise a null (mask=0) lease is encoded.
 */
void Locker::issue_client_lease(CDentry *dn, client_t client,
			       bufferlist &bl, utime_t now, Session *session)
{
  CInode *diri = dn->get_dir()->get_inode();
  if (!diri->is_stray() &&  // do not issue dn leases in stray dir!
      ((!diri->filelock.can_lease(client) &&
	(diri->get_client_cap_pending(client) & (CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_EXCL)) == 0)) &&
      dn->lock.can_lease(client)) {
    int pool = 1;   // fixme.. do something smart!
    // issue a dentry lease
    ClientLease *l = dn->add_client_lease(client, session);
    session->touch_lease(l);

    now += mdcache->client_lease_durations[pool];
    mdcache->touch_client_lease(l, pool, now);

    LeaseStat e;
    e.mask = 1 | CEPH_LOCK_DN;  // old and new bit values
    e.seq = ++l->seq;
    e.duration_ms = (int)(1000 * mdcache->client_lease_durations[pool]);
    ::encode(e, bl);
    dout(20) << "issue_client_lease seq " << e.seq << " dur " << e.duration_ms << "ms "
	     << " on " << *dn << dendl;
  } else {
    // null lease
    LeaseStat e;
    e.mask = 0;
    e.seq = 0;
    e.duration_ms = 0;
    ::encode(e, bl);
    dout(20) << "issue_client_lease no/null lease on " << *dn << dendl;
  }
}
3688
3689
3690void Locker::revoke_client_leases(SimpleLock *lock)
3691{
3692 int n = 0;
3693 CDentry *dn = static_cast<CDentry*>(lock->get_parent());
3694 for (map<client_t, ClientLease*>::iterator p = dn->client_lease_map.begin();
3695 p != dn->client_lease_map.end();
3696 ++p) {
3697 ClientLease *l = p->second;
3698
3699 n++;
3700 assert(lock->get_type() == CEPH_LOCK_DN);
3701
3702 CDentry *dn = static_cast<CDentry*>(lock->get_parent());
3703 int mask = 1 | CEPH_LOCK_DN; // old and new bits
3704
3705 // i should also revoke the dir ICONTENT lease, if they have it!
3706 CInode *diri = dn->get_dir()->get_inode();
3707 mds->send_message_client_counted(new MClientLease(CEPH_MDS_LEASE_REVOKE, l->seq,
3708 mask,
3709 diri->ino(),
3710 diri->first, CEPH_NOSNAP,
3711 dn->get_name()),
3712 l->client);
3713 }
3714 assert(n == lock->get_num_client_lease());
3715}
3716
3717
3718
3719// locks ----------------------------------------------------------------
3720
/**
 * Resolve a wire (lock_type, object_info) pair to the in-memory
 * SimpleLock it refers to, or NULL when the object isn't cached.
 */
SimpleLock *Locker::get_lock(int lock_type, MDSCacheObjectInfo &info)
{
  switch (lock_type) {
  case CEPH_LOCK_DN:
    {
      // be careful; info.dirfrag may have incorrect frag; recalculate based on dname.
      CInode *diri = mdcache->get_inode(info.dirfrag.ino);
      frag_t fg;
      CDir *dir = 0;
      CDentry *dn = 0;
      if (diri) {
	fg = diri->pick_dirfrag(info.dname);
	dir = diri->get_dirfrag(fg);
	if (dir)
	  dn = dir->lookup(info.dname, info.snapid);
      }
      if (!dn) {
	dout(7) << "get_lock don't have dn " << info.dirfrag.ino << " " << info.dname << dendl;
	return 0;
      }
      return &dn->lock;
    }

  case CEPH_LOCK_IAUTH:
  case CEPH_LOCK_ILINK:
  case CEPH_LOCK_IDFT:
  case CEPH_LOCK_IFILE:
  case CEPH_LOCK_INEST:
  case CEPH_LOCK_IXATTR:
  case CEPH_LOCK_ISNAP:
  case CEPH_LOCK_IFLOCK:
  case CEPH_LOCK_IPOLICY:
    {
      CInode *in = mdcache->get_inode(info.ino, info.snapid);
      if (!in) {
	dout(7) << "get_lock don't have ino " << info.ino << dendl;
	return 0;
      }
      // map the type to the matching lock member; every inode lock type
      // listed above returns here, so control never reaches 'default'
      switch (lock_type) {
      case CEPH_LOCK_IAUTH: return &in->authlock;
      case CEPH_LOCK_ILINK: return &in->linklock;
      case CEPH_LOCK_IDFT: return &in->dirfragtreelock;
      case CEPH_LOCK_IFILE: return &in->filelock;
      case CEPH_LOCK_INEST: return &in->nestlock;
      case CEPH_LOCK_IXATTR: return &in->xattrlock;
      case CEPH_LOCK_ISNAP: return &in->snaplock;
      case CEPH_LOCK_IFLOCK: return &in->flocklock;
      case CEPH_LOCK_IPOLICY: return &in->policylock;
      }
    }

  default:
    dout(7) << "get_lock don't know lock_type " << lock_type << dendl;
    ceph_abort();
    break;
  }

  return 0;
}
3780
/* This function DOES put the passed message before returning */
// Top-level dispatcher for inter-MDS lock messages: look up the target
// lock and hand off to the simple- or file-lock handler by lock type.
void Locker::handle_lock(MLock *m)
{
  // nobody should be talking to us during recovery.
  assert(mds->is_rejoin() || mds->is_clientreplay() || mds->is_active() || mds->is_stopping());

  SimpleLock *lock = get_lock(m->get_lock_type(), m->get_object_info());
  if (!lock) {
    dout(10) << "don't have object " << m->get_object_info() << ", must have trimmed, dropping" << dendl;
    m->put();
    return;
  }

  switch (lock->get_type()) {
  case CEPH_LOCK_DN:
  case CEPH_LOCK_IAUTH:
  case CEPH_LOCK_ILINK:
  case CEPH_LOCK_ISNAP:
  case CEPH_LOCK_IXATTR:
  case CEPH_LOCK_IFLOCK:
  case CEPH_LOCK_IPOLICY:
    handle_simple_lock(lock, m);
    break;

  case CEPH_LOCK_IDFT:
  case CEPH_LOCK_INEST:
    // intentional fallthrough: scatter locks (dirfragtree, nest) are
    // handled by the file-lock path
    //handle_scatter_lock((ScatterLock*)lock, m);
    //break;

  case CEPH_LOCK_IFILE:
    handle_file_lock(static_cast<ScatterLock*>(lock), m);
    break;

  default:
    dout(7) << "handle_lock got otype " << m->get_lock_type() << dendl;
    ceph_abort();
    break;
  }
}
3820
3821
3822
3823
3824
3825// ==========================================================================
3826// simple lock
3827
/** This function may take a reference to m if it needs one, but does
 * not put references.
 *
 * A replica asked us (the auth) to move this lock to SYNC so it can
 * grant a rdlock.  If we can, sync now; if the lock is mid-transition,
 * retry the message once it stabilizes; otherwise drop the request and
 * let the replica retry.
 */
void Locker::handle_reqrdlock(SimpleLock *lock, MLock *m)
{
  MDSCacheObject *parent = lock->get_parent();
  if (parent->is_auth() &&
      lock->get_state() != LOCK_SYNC &&
      !parent->is_frozen()) {
    dout(7) << "handle_reqrdlock got rdlock request on " << *lock
	    << " on " << *parent << dendl;
    assert(parent->is_auth()); // replica auth pinned if they're doing this!
    if (lock->is_stable()) {
      simple_sync(lock);
    } else {
      dout(7) << "handle_reqrdlock delaying request until lock is stable" << dendl;
      // m->get() takes our own ref for the deferred retry
      lock->add_waiter(SimpleLock::WAIT_STABLE | MDSCacheObject::WAIT_UNFREEZE,
                       new C_MDS_RetryMessage(mds, m->get()));
    }
  } else {
    dout(7) << "handle_reqrdlock dropping rdlock request on " << *lock
	    << " on " << *parent << dendl;
    // replica should retry
  }
}
3852
/* This function DOES put the passed message before returning */
// State-machine handler for simple (non-scatter) lock messages, both
// replica-side (SYNC/LOCK from auth) and auth-side (LOCKACK/REQRDLOCK
// from replicas).
void Locker::handle_simple_lock(SimpleLock *lock, MLock *m)
{
  int from = m->get_asker();

  dout(10) << "handle_simple_lock " << *m
	   << " on " << *lock << " " << *lock->get_parent() << dendl;

  if (mds->is_rejoin()) {
    if (lock->get_parent()->is_rejoining()) {
      dout(7) << "handle_simple_lock still rejoining " << *lock->get_parent()
	      << ", dropping " << *m << dendl;
      m->put();
      return;
    }
  }

  switch (m->get_action()) {
    // -- replica --
  case LOCK_AC_SYNC:
    // auth granted SYNC: adopt its state and wake readers
    assert(lock->get_state() == LOCK_LOCK);
    lock->decode_locked_state(m->get_data());
    lock->set_state(LOCK_SYNC);
    lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
    break;

  case LOCK_AC_LOCK:
    // auth wants LOCK: revoke leases, gather, and flush the log if the
    // transition is blocked on an uncommitted journal entry
    assert(lock->get_state() == LOCK_SYNC);
    lock->set_state(LOCK_SYNC_LOCK);
    if (lock->is_leased())
      revoke_client_leases(lock);
    eval_gather(lock, true);
    if (lock->is_unstable_and_locked())
      mds->mdlog->flush();
    break;


    // -- auth --
  case LOCK_AC_LOCKACK:
    // a replica finished releasing; re-evaluate once the gather is done
    assert(lock->get_state() == LOCK_SYNC_LOCK ||
	   lock->get_state() == LOCK_SYNC_EXCL);
    assert(lock->is_gathering(from));
    lock->remove_gather(from);

    if (lock->is_gathering()) {
      dout(7) << "handle_simple_lock " << *lock << " on " << *lock->get_parent() << " from " << from
	      << ", still gathering " << lock->get_gather_set() << dendl;
    } else {
      dout(7) << "handle_simple_lock " << *lock << " on " << *lock->get_parent() << " from " << from
	      << ", last one" << dendl;
      eval_gather(lock);
    }
    break;

  case LOCK_AC_REQRDLOCK:
    handle_reqrdlock(lock, m);
    break;

  }

  m->put();
}
3915
3916/* unused, currently.
3917
3918class C_Locker_SimpleEval : public Context {
3919 Locker *locker;
3920 SimpleLock *lock;
3921public:
3922 C_Locker_SimpleEval(Locker *l, SimpleLock *lk) : locker(l), lock(lk) {}
3923 void finish(int r) {
3924 locker->try_simple_eval(lock);
3925 }
3926};
3927
3928void Locker::try_simple_eval(SimpleLock *lock)
3929{
3930 // unstable and ambiguous auth?
3931 if (!lock->is_stable() &&
3932 lock->get_parent()->is_ambiguous_auth()) {
3933 dout(7) << "simple_eval not stable and ambiguous auth, waiting on " << *lock->get_parent() << dendl;
3934 //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
3935 lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_SimpleEval(this, lock));
3936 return;
3937 }
3938
3939 if (!lock->get_parent()->is_auth()) {
3940 dout(7) << "try_simple_eval not auth for " << *lock->get_parent() << dendl;
3941 return;
3942 }
3943
3944 if (!lock->get_parent()->can_auth_pin()) {
3945 dout(7) << "try_simple_eval can't auth_pin, waiting on " << *lock->get_parent() << dendl;
3946 //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
3947 lock->get_parent()->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_SimpleEval(this, lock));
3948 return;
3949 }
3950
3951 if (lock->is_stable())
3952 simple_eval(lock);
3953}
3954*/
3955
3956
/**
 * Evaluate a stable simple lock on the auth MDS and, based on which
 * caps clients want, move it toward a better state: EXCL when there is
 * a target loner wanting exclusive caps, otherwise back to SYNC.
 *
 * @param lock        auth, stable lock to evaluate
 * @param need_issue  if non-null, set to true instead of calling
 *                    issue_caps() directly (caller issues later)
 */
void Locker::simple_eval(SimpleLock *lock, bool *need_issue)
{
  dout(10) << "simple_eval " << *lock << " on " << *lock->get_parent() << dendl;

  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  if (lock->get_parent()->is_freezing_or_frozen()) {
    // dentry lock in unreadable state can block path traverse
    if ((lock->get_type() != CEPH_LOCK_DN ||
	 lock->get_state() == LOCK_SYNC ||
	 lock->get_parent()->is_frozen()))
      return;
  }

  // read-only FS: force everything to the readable SYNC state
  if (mdcache->is_readonly()) {
    if (lock->get_state() != LOCK_SYNC) {
      dout(10) << "simple_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
      simple_sync(lock, need_issue);
    }
    return;
  }

  // for inode locks, look at the caps clients want in this lock's range;
  // dentry locks have no associated caps
  CInode *in = 0;
  int wanted = 0;
  if (lock->get_type() != CEPH_LOCK_DN) {
    in = static_cast<CInode*>(lock->get_parent());
    in->get_caps_wanted(&wanted, NULL, lock->get_cap_shift());
  }

  // -> excl?
  if (lock->get_state() != LOCK_EXCL &&
      in && in->get_target_loner() >= 0 &&
      (wanted & CEPH_CAP_GEXCL)) {
    dout(7) << "simple_eval stable, going to excl " << *lock
	    << " on " << *lock->get_parent() << dendl;
    simple_excl(lock, need_issue);
  }

  // stable -> sync?
  // (only if nobody wants EXCL/WR, or we are EXCL with no loner left;
  //  wrlocks must drain first)
  else if (lock->get_state() != LOCK_SYNC &&
	   !lock->is_wrlocked() &&
	   ((!(wanted & CEPH_CAP_GEXCL) && !lock->is_waiter_for(SimpleLock::WAIT_WR)) ||
	    (lock->get_state() == LOCK_EXCL && in && in->get_target_loner() < 0))) {
    dout(7) << "simple_eval stable, syncing " << *lock
	    << " on " << *lock->get_parent() << dendl;
    simple_sync(lock, need_issue);
  }
}
4006
4007
4008// mid
4009
/**
 * Move a lock (on the auth MDS) toward the SYNC state.
 *
 * If anything must be gathered first (wrlocks draining, replica acks,
 * client cap revocation, file recovery, or dirty scattered data to
 * write back) the lock is left in an intermediate *_SYNC state and
 * false is returned; eval_gather() completes the transition later.
 *
 * @param lock        auth, stable lock to transition
 * @param need_issue  if non-null, set to true instead of calling
 *                    issue_caps() directly
 * @return true if the lock reached LOCK_SYNC now, false if gathering
 */
bool Locker::simple_sync(SimpleLock *lock, bool *need_issue)
{
  dout(7) << "simple_sync on " << *lock << " on " << *lock->get_parent() << dendl;
  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  // locks with a cap shift are inode locks whose state maps to client caps
  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  int old_state = lock->get_state();

  if (old_state != LOCK_TSYN) {

    switch (lock->get_state()) {
    case LOCK_MIX: lock->set_state(LOCK_MIX_SYNC); break;
    case LOCK_LOCK: lock->set_state(LOCK_LOCK_SYNC); break;
    case LOCK_XSYN: lock->set_state(LOCK_XSYN_SYNC); break;
    case LOCK_EXCL: lock->set_state(LOCK_EXCL_SYNC); break;
    default: ceph_abort();
    }

    int gather = 0;
    if (lock->is_wrlocked())
      gather++; // wait for wrlocks to drain

    // coming from MIX, replicas hold scattered state; ask them to sync
    if (lock->get_parent()->is_replicated() && old_state == LOCK_MIX) {
      send_lock_message(lock, LOCK_AC_SYNC);
      lock->init_gather();
      gather++;
    }

    // revoke client caps that conflict with SYNC
    if (in && in->is_head()) {
      if (in->issued_caps_need_gather(lock)) {
	if (need_issue)
	  *need_issue = true;
	else
	  issue_caps(in);
	gather++;
      }
    }

    // file locks may require object recovery before becoming readable
    bool need_recover = false;
    if (lock->get_type() == CEPH_LOCK_IFILE) {
      assert(in);
      if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
	mds->mdcache->queue_file_recover(in);
	need_recover = true;
	gather++;
      }
    }

    // nothing to gather but scattered data is dirty: write it back first
    if (!gather && lock->is_dirty()) {
      lock->get_parent()->auth_pin(lock);
      scatter_writebehind(static_cast<ScatterLock*>(lock));
      mds->mdlog->flush();
      return false;
    }

    if (gather) {
      // pin while in the transitional state; eval_gather() finishes up
      lock->get_parent()->auth_pin(lock);
      if (need_recover)
	mds->mdcache->do_file_recover();
      return false;
    }
  }

  // nothing to wait for: complete the transition to SYNC now
  if (lock->get_parent()->is_replicated()) { // FIXME
    bufferlist data;
    lock->encode_locked_state(data);
    send_lock_message(lock, LOCK_AC_SYNC, data);
  }
  lock->set_state(LOCK_SYNC);
  lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
  if (in && in->is_head()) {
    if (need_issue)
      *need_issue = true;
    else
      issue_caps(in);
  }
  return true;
}
4092
/**
 * Move a lock (on the auth MDS) toward the EXCL (loner) state,
 * gathering rdlocks, wrlocks, replica acks and conflicting client caps
 * as needed.  If a gather is required the lock stays in an intermediate
 * *_EXCL state and eval_gather() completes the transition later.
 *
 * @param lock        auth, stable lock to transition
 * @param need_issue  if non-null, set to true instead of calling
 *                    issue_caps() directly
 */
void Locker::simple_excl(SimpleLock *lock, bool *need_issue)
{
  dout(7) << "simple_excl on " << *lock << " on " << *lock->get_parent() << dendl;
  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  switch (lock->get_state()) {
  case LOCK_LOCK: lock->set_state(LOCK_LOCK_EXCL); break;
  case LOCK_SYNC: lock->set_state(LOCK_SYNC_EXCL); break;
  case LOCK_XSYN: lock->set_state(LOCK_XSYN_EXCL); break;
  default: ceph_abort();
  }

  int gather = 0;
  if (lock->is_rdlocked())
    gather++;
  if (lock->is_wrlocked())
    gather++;

  // replicas only need a LOCK message when their state isn't already LOCK
  if (lock->get_parent()->is_replicated() &&
      lock->get_state() != LOCK_LOCK_EXCL &&
      lock->get_state() != LOCK_XSYN_EXCL) {
    send_lock_message(lock, LOCK_AC_LOCK);
    lock->init_gather();
    gather++;
  }

  // revoke client caps that conflict with EXCL
  if (in && in->is_head()) {
    if (in->issued_caps_need_gather(lock)) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
      gather++;
    }
  }

  if (gather) {
    // pin while transitional; eval_gather() finishes the transition
    lock->get_parent()->auth_pin(lock);
  } else {
    lock->set_state(LOCK_EXCL);
    lock->finish_waiters(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE);
    if (in) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
    }
  }
}
4147
/**
 * Move a lock (on the auth MDS) toward the LOCK state (nobody can read
 * or write; only the auth MDS may access the protected metadata).
 *
 * From XSYN we first go through file_excl().  From MIX the gather is
 * two-stage: local wrlocks drain first (LOCK_MIX_LOCK), then replicas
 * are contacted (LOCK_MIX_LOCK2).  Dirty scattered data is written
 * back before completing.
 *
 * @param lock        auth, stable lock (must not already be LOCK)
 * @param need_issue  if non-null, set to true instead of calling
 *                    issue_caps() directly
 */
void Locker::simple_lock(SimpleLock *lock, bool *need_issue)
{
  dout(7) << "simple_lock on " << *lock << " on " << *lock->get_parent() << dendl;
  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());
  assert(lock->get_state() != LOCK_LOCK);

  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  int old_state = lock->get_state();

  switch (lock->get_state()) {
  case LOCK_SYNC: lock->set_state(LOCK_SYNC_LOCK); break;
  case LOCK_XSYN:
    // XSYN must pass through EXCL first; bail if that didn't finish
    file_excl(static_cast<ScatterLock*>(lock), need_issue);
    if (lock->get_state() != LOCK_EXCL)
      return;
    // fall-thru
  case LOCK_EXCL: lock->set_state(LOCK_EXCL_LOCK); break;
  case LOCK_MIX: lock->set_state(LOCK_MIX_LOCK);
    (static_cast<ScatterLock *>(lock))->clear_unscatter_wanted();
    break;
  case LOCK_TSYN: lock->set_state(LOCK_TSYN_LOCK); break;
  default: ceph_abort();
  }

  int gather = 0;
  if (lock->is_leased()) {
    gather++;
    revoke_client_leases(lock);
  }
  if (lock->is_rdlocked())
    gather++;
  // revoke client caps that conflict with LOCK
  if (in && in->is_head()) {
    if (in->issued_caps_need_gather(lock)) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
      gather++;
    }
  }

  // file locks may require object recovery first
  bool need_recover = false;
  if (lock->get_type() == CEPH_LOCK_IFILE) {
    assert(in);
    if(in->state_test(CInode::STATE_NEEDSRECOVER)) {
      mds->mdcache->queue_file_recover(in);
      need_recover = true;
      gather++;
    }
  }

  if (lock->get_parent()->is_replicated() &&
      lock->get_state() == LOCK_MIX_LOCK &&
      gather) {
    dout(10) << " doing local stage of mix->lock gather before gathering from replicas" << dendl;
  } else {
    // move to second stage of gather now, so we don't send the lock action later.
    if (lock->get_state() == LOCK_MIX_LOCK)
      lock->set_state(LOCK_MIX_LOCK2);

    if (lock->get_parent()->is_replicated() &&
	lock->get_sm()->states[old_state].replica_state != LOCK_LOCK) {  // replica may already be LOCK
      gather++;
      send_lock_message(lock, LOCK_AC_LOCK);
      lock->init_gather();
    }
  }

  // nothing to gather but dirty scattered data: flush it back first
  if (!gather && lock->is_dirty()) {
    lock->get_parent()->auth_pin(lock);
    scatter_writebehind(static_cast<ScatterLock*>(lock));
    mds->mdlog->flush();
    return;
  }

  if (gather) {
    // pin while transitional; eval_gather() finishes the transition
    lock->get_parent()->auth_pin(lock);
    if (need_recover)
      mds->mdcache->do_file_recover();
  } else {
    lock->set_state(LOCK_LOCK);
    lock->finish_waiters(ScatterLock::WAIT_XLOCK|ScatterLock::WAIT_WR|ScatterLock::WAIT_STABLE);
  }
}
4236
4237
/**
 * Begin moving a lock toward an exclusive (xlock) hold: from LOCK or
 * XLOCKDONE into LOCK_XLOCK via LOCK_LOCK_XLOCK, gathering outstanding
 * rdlocks, wrlocks and conflicting client caps.  If nothing needs
 * gathering the lock goes straight to LOCK_PREXLOCK.
 */
void Locker::simple_xlock(SimpleLock *lock)
{
  dout(7) << "simple_xlock on " << *lock << " on " << *lock->get_parent() << dendl;
  assert(lock->get_parent()->is_auth());
  //assert(lock->is_stable());
  assert(lock->get_state() != LOCK_XLOCK);

  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  // pin only when entering from a stable state (unstable is already pinned)
  if (lock->is_stable())
    lock->get_parent()->auth_pin(lock);

  switch (lock->get_state()) {
  case LOCK_LOCK:
  case LOCK_XLOCKDONE: lock->set_state(LOCK_LOCK_XLOCK); break;
  default: ceph_abort();
  }

  int gather = 0;
  if (lock->is_rdlocked())
    gather++;
  if (lock->is_wrlocked())
    gather++;

  // revoke client caps that conflict with the xlock
  if (in && in->is_head()) {
    if (in->issued_caps_need_gather(lock)) {
      issue_caps(in);
      gather++;
    }
  }

  if (!gather) {
    lock->set_state(LOCK_PREXLOCK);
    //assert("shouldn't be called if we are already xlockable" == 0);
  }
}
4276
4277
4278
4279
4280
4281// ==========================================================================
4282// scatter lock
4283
4284/*
4285
4286Some notes on scatterlocks.
4287
4288 - The scatter/gather is driven by the inode lock. The scatter always
4289 brings in the latest metadata from the fragments.
4290
4291 - When in a scattered/MIX state, fragments are only allowed to
4292 update/be written to if the accounted stat matches the inode's
4293 current version.
4294
4295 - That means, on gather, we _only_ assimilate diffs for frag metadata
4296 that match the current version, because those are the only ones
4297 written during this scatter/gather cycle. (Others didn't permit
4298 it.) We increment the version and journal this to disk.
4299
4300 - When possible, we also simultaneously update our local frag
4301 accounted stats to match.
4302
4303 - On scatter, the new inode info is broadcast to frags, both local
4304 and remote. If possible (auth and !frozen), the dirfrag auth
4305 should update the accounted state (if it isn't already up to date).
4306 Note that this may occur on both the local inode auth node and
4307 inode replicas, so there are two potential paths. If it is NOT
4308 possible, they need to mark_stale to prevent any possible writes.
4309
4310 - A scatter can be to MIX (potentially writeable) or to SYNC (read
4311 only). Both are opportunities to update the frag accounted stats,
4312 even though only the MIX case is affected by a stale dirfrag.
4313
4314 - Because many scatter/gather cycles can potentially go by without a
4315 frag being able to update its accounted stats (due to being frozen
4316 by exports/refragments in progress), the frag may have (even very)
4317 old stat versions. That's fine. If when we do want to update it,
4318 we can update accounted_* and the version first.
4319
4320*/
4321
/// Journal-commit callback for scatter_writebehind(): once the EUpdate
/// submitted there hits the log, finish the writeback (pop the projected
/// inode, finish the lock flush, apply and clean up the mutation).
class C_Locker_ScatterWB : public LockerLogContext {
  ScatterLock *lock;  // lock whose scattered data was written back
  MutationRef mut;    // mutation carrying the journaled update
public:
  C_Locker_ScatterWB(Locker *l, ScatterLock *sl, MutationRef& m) :
    LockerLogContext(l), lock(sl), mut(m) {}
  void finish(int r) override {
    locker->scatter_writebehind_finish(lock, mut);
  }
};
4332
/**
 * Write dirty scattered data (gathered from dirfrags) back into the
 * inode and journal it.  Takes a forced wrlock on the lock, projects
 * the inode, runs the scatter-gather update, and submits an EUpdate;
 * C_Locker_ScatterWB finishes up when the journal entry commits.
 *
 * Caller has already auth_pinned the lock's parent.
 */
void Locker::scatter_writebehind(ScatterLock *lock)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  dout(10) << "scatter_writebehind " << in->inode.mtime << " on " << *lock << " on " << *in << dendl;

  // journal
  MutationRef mut(new MutationImpl());
  mut->ls = mds->mdlog->get_current_segment();

  // forcefully take a wrlock
  lock->get_wrlock(true);
  mut->wrlocks.insert(lock);
  mut->locks.insert(lock);

  in->pre_cow_old_inode(); // avoid cow mayhem

  inode_t *pi = in->project_inode();
  pi->version = in->pre_dirty();

  // fold the dirfrag deltas into the projected inode and start flushing
  in->finish_scatter_gather_update(lock->get_type());
  lock->start_flush();

  EUpdate *le = new EUpdate(mds->mdlog, "scatter_writebehind");
  mds->mdlog->start_entry(le);

  mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY);
  mdcache->journal_dirty_inode(mut.get(), &le->metablob, in);

  in->finish_scatter_gather_update_accounted(lock->get_type(), mut, &le->metablob);

  // completion runs scatter_writebehind_finish()
  mds->mdlog->submit_entry(le, new C_Locker_ScatterWB(this, lock, mut));
}
4365
/**
 * Completion for scatter_writebehind(): runs after the EUpdate commits.
 * Pops/dirties the projected inode, finishes the lock flush, notifies
 * replicas if needed, and applies/cleans up the mutation.
 */
void Locker::scatter_writebehind_finish(ScatterLock *lock, MutationRef& mut)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  dout(10) << "scatter_writebehind_finish on " << *lock << " on " << *in << dendl;
  in->pop_and_dirty_projected_inode(mut->ls);

  lock->finish_flush();

  // if replicas may have flushed in a mix->lock state, send another
  // message so they can finish_flush().
  if (in->is_replicated()) {
    switch (lock->get_state()) {
    case LOCK_MIX_LOCK:
    case LOCK_MIX_LOCK2:
    case LOCK_MIX_EXCL:
    case LOCK_MIX_TSYN:
      send_lock_message(lock, LOCK_AC_LOCKFLUSHED);
    }
  }

  mut->apply();
  drop_locks(mut.get());
  mut->cleanup();

  // wake anyone who was waiting for the flush to complete
  if (lock->is_stable())
    lock->finish_waiters(ScatterLock::WAIT_STABLE);

  //scatter_eval_gather(lock);
}
4395
/**
 * Evaluate a stable scatter lock on the auth MDS and pick a better
 * state: MIX when scattering is wanted, LOCK/MIX for INEST (kept
 * writable), or SYNC when the inode is not a subtree/export point.
 *
 * @param lock        auth, stable scatter lock
 * @param need_issue  if non-null, set instead of calling issue_caps()
 */
void Locker::scatter_eval(ScatterLock *lock, bool *need_issue)
{
  dout(10) << "scatter_eval " << *lock << " on " << *lock->get_parent() << dendl;

  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  if (lock->get_parent()->is_freezing_or_frozen()) {
    dout(20) << " freezing|frozen" << dendl;
    return;
  }

  // read-only FS: keep everything in the readable SYNC state
  if (mdcache->is_readonly()) {
    if (lock->get_state() != LOCK_SYNC) {
      dout(10) << "scatter_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
      simple_sync(lock, need_issue);
    }
    return;
  }

  // someone asked for a scatter (MIX)?
  if (!lock->is_rdlocked() &&
      lock->get_state() != LOCK_MIX &&
      lock->get_scatter_wanted()) {
    dout(10) << "scatter_eval scatter_wanted, bump to mix " << *lock
	     << " on " << *lock->get_parent() << dendl;
    scatter_mix(lock, need_issue);
    return;
  }

  if (lock->get_type() == CEPH_LOCK_INEST) {
    // in general, we want to keep INEST writable at all times.
    if (!lock->is_rdlocked()) {
      if (lock->get_parent()->is_replicated()) {
	if (lock->get_state() != LOCK_MIX)
	  scatter_mix(lock, need_issue);
      } else {
	if (lock->get_state() != LOCK_LOCK)
	  simple_lock(lock, need_issue);
      }
    }
    return;
  }

  CInode *in = static_cast<CInode*>(lock->get_parent());
  if (!in->has_subtree_or_exporting_dirfrag() || in->is_base()) {
    // i _should_ be sync.
    if (!lock->is_wrlocked() &&
	lock->get_state() != LOCK_SYNC) {
      dout(10) << "scatter_eval no wrlocks|xlocks, not subtree root inode, syncing" << dendl;
      simple_sync(lock, need_issue);
    }
  }
}
4449
4450
4451/*
4452 * mark a scatterlock to indicate that the dir fnode has some dirty data
4453 */
4454void Locker::mark_updated_scatterlock(ScatterLock *lock)
4455{
4456 lock->mark_dirty();
4457 if (lock->get_updated_item()->is_on_list()) {
4458 dout(10) << "mark_updated_scatterlock " << *lock
4459 << " - already on list since " << lock->get_update_stamp() << dendl;
4460 } else {
4461 updated_scatterlocks.push_back(lock->get_updated_item());
4462 utime_t now = ceph_clock_now();
4463 lock->set_update_stamp(now);
4464 dout(10) << "mark_updated_scatterlock " << *lock
4465 << " - added at " << now << dendl;
4466 }
4467}
4468
/*
 * this is called by scatter_tick and LogSegment::try_to_trim() when
 * trying to flush dirty scattered data (i.e. updated fnode) back to
 * the inode.
 *
 * we need to lock|scatter in order to push fnode changes into the
 * inode.dirstat.
 *
 * @param lock            the dirty scatterlock to nudge
 * @param c               optional completion, fired when the lock is
 *                        stable again (or requeued if none given)
 * @param forcelockchange currently only referenced from disabled code
 */
void Locker::scatter_nudge(ScatterLock *lock, MDSInternalContextBase *c, bool forcelockchange)
{
  CInode *p = static_cast<CInode *>(lock->get_parent());

  // frozen/freezing parent: wait (or requeue) until it thaws
  if (p->is_frozen() || p->is_freezing()) {
    dout(10) << "scatter_nudge waiting for unfreeze on " << *p << dendl;
    if (c)
      p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, c);
    else
      // just requeue.  not ideal.. starvation prone..
      updated_scatterlocks.push_back(lock->get_updated_item());
    return;
  }

  // mid-migration: wait (or requeue) until auth is unambiguous
  if (p->is_ambiguous_auth()) {
    dout(10) << "scatter_nudge waiting for single auth on " << *p << dendl;
    if (c)
      p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, c);
    else
      // just requeue.  not ideal.. starvation prone..
      updated_scatterlocks.push_back(lock->get_updated_item());
    return;
  }

  if (p->is_auth()) {
    int count = 0;
    while (true) {
      if (lock->is_stable()) {
	// can we do it now?
	//  (only if we're not replicated.. if we are, we really do need
	//   to nudge the lock state!)
	/*
	  actually, even if we're not replicated, we can't stay in MIX, because another mds
	  could discover and replicate us at any time.  if that happens while we're flushing,
	  they end up in MIX but their inode has the old scatterstat version.

	if (!forcelockchange && !lock->get_parent()->is_replicated() && lock->can_wrlock(-1)) {
	  dout(10) << "scatter_nudge auth, propagating " << *lock << " on " << *p << dendl;
	  scatter_writebehind(lock);
	  if (c)
	    lock->add_waiter(SimpleLock::WAIT_STABLE, c);
	  return;
	}
	*/

	// read-only FS: just sync, nothing to write back
	if (mdcache->is_readonly()) {
	  if (lock->get_state() != LOCK_SYNC) {
	    dout(10) << "scatter_nudge auth, read-only FS, syncing " << *lock << " on " << *p << dendl;
	    simple_sync(static_cast<ScatterLock*>(lock));
	  }
	  break;
	}

	// adjust lock state; cycling scatter<->unscatter is what flushes
	// the dirty fnode data back into the inode
	dout(10) << "scatter_nudge auth, scatter/unscattering " << *lock << " on " << *p << dendl;
	switch (lock->get_type()) {
	case CEPH_LOCK_IFILE:
	  if (p->is_replicated() && lock->get_state() != LOCK_MIX)
	    scatter_mix(static_cast<ScatterLock*>(lock));
	  else if (lock->get_state() != LOCK_LOCK)
	    simple_lock(static_cast<ScatterLock*>(lock));
	  else
	    simple_sync(static_cast<ScatterLock*>(lock));
	  break;

	case CEPH_LOCK_IDFT:
	case CEPH_LOCK_INEST:
	  if (p->is_replicated() && lock->get_state() != LOCK_MIX)
	    scatter_mix(lock);
	  else if (lock->get_state() != LOCK_LOCK)
	    simple_lock(lock);
	  else
	    simple_sync(lock);
	  break;
	default:
	  ceph_abort();
	}
	++count;
	if (lock->is_stable() && count == 2) {
	  dout(10) << "scatter_nudge oh, stable after two cycles." << dendl;
	  // this should only really happen when called via
	  // handle_file_lock due to AC_NUDGE, because the rest of the
	  // time we are replicated or have dirty data and won't get
	  // called.  bailing here avoids an infinite loop.
	  assert(!c);
	  break;
	}
      } else {
	// unstable: wait for the in-flight transition to finish
	dout(10) << "scatter_nudge auth, waiting for stable " << *lock << " on " << *p << dendl;
	if (c)
	  lock->add_waiter(SimpleLock::WAIT_STABLE, c);
	return;
      }
    }
  } else {
    dout(10) << "scatter_nudge replica, requesting scatter/unscatter of "
	     << *lock << " on " << *p << dendl;
    // request unscatter?
    mds_rank_t auth = lock->get_parent()->authority().first;
    if (!mds->is_cluster_degraded() ||
	mds->mdsmap->is_clientreplay_or_active_or_stopping(auth))
      mds->send_message_mds(new MLock(lock, LOCK_AC_NUDGE, mds->get_nodeid()), auth);

    // wait...
    if (c)
      lock->add_waiter(SimpleLock::WAIT_STABLE, c);

    // also, requeue, in case we had wrong auth or something
    updated_scatterlocks.push_back(lock->get_updated_item());
  }
}
4588
4589void Locker::scatter_tick()
4590{
4591 dout(10) << "scatter_tick" << dendl;
4592
4593 // updated
4594 utime_t now = ceph_clock_now();
4595 int n = updated_scatterlocks.size();
4596 while (!updated_scatterlocks.empty()) {
4597 ScatterLock *lock = updated_scatterlocks.front();
4598
4599 if (n-- == 0) break; // scatter_nudge() may requeue; avoid looping
4600
4601 if (!lock->is_dirty()) {
4602 updated_scatterlocks.pop_front();
4603 dout(10) << " removing from updated_scatterlocks "
4604 << *lock << " " << *lock->get_parent() << dendl;
4605 continue;
4606 }
4607 if (now - lock->get_update_stamp() < g_conf->mds_scatter_nudge_interval)
4608 break;
4609 updated_scatterlocks.pop_front();
4610 scatter_nudge(lock, 0);
4611 }
4612 mds->mdlog->flush();
4613}
4614
4615
/**
 * Move a scatter lock toward the temporary-sync (TSYN) state.
 *
 * NOTE: this path is currently disabled — the assert below fires
 * unconditionally ("not fully implemented, at least not for filelock").
 * The code after it documents the intended transition.
 */
void Locker::scatter_tempsync(ScatterLock *lock, bool *need_issue)
{
  dout(10) << "scatter_tempsync " << *lock
	   << " on " << *lock->get_parent() << dendl;
  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  assert(0 == "not fully implemented, at least not for filelock");

  CInode *in = static_cast<CInode *>(lock->get_parent());

  switch (lock->get_state()) {
  case LOCK_SYNC: ceph_abort();   // this shouldn't happen
  case LOCK_LOCK: lock->set_state(LOCK_LOCK_TSYN); break;
  case LOCK_MIX: lock->set_state(LOCK_MIX_TSYN); break;
  default: ceph_abort();
  }

  int gather = 0;
  if (lock->is_wrlocked())
    gather++;

  // revoke client caps that conflict with TSYN
  if (lock->get_cap_shift() &&
      in->is_head() &&
      in->issued_caps_need_gather(lock)) {
    if (need_issue)
      *need_issue = true;
    else
      issue_caps(in);
    gather++;
  }

  // coming from MIX with replicas: they must lock first
  if (lock->get_state() == LOCK_MIX_TSYN &&
      in->is_replicated()) {
    lock->init_gather();
    send_lock_message(lock, LOCK_AC_LOCK);
    gather++;
  }

  if (gather) {
    in->auth_pin(lock);
  } else {
    // do tempsync
    lock->set_state(LOCK_TSYN);
    lock->finish_waiters(ScatterLock::WAIT_RD|ScatterLock::WAIT_STABLE);
    if (lock->get_cap_shift()) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
    }
  }
}
4669
4670
4671
4672// ==========================================================================
4673// local lock
4674
4675void Locker::local_wrlock_grab(LocalLock *lock, MutationRef& mut)
4676{
4677 dout(7) << "local_wrlock_grab on " << *lock
4678 << " on " << *lock->get_parent() << dendl;
4679
4680 assert(lock->get_parent()->is_auth());
4681 assert(lock->can_wrlock());
4682 assert(!mut->wrlocks.count(lock));
4683 lock->get_wrlock(mut->get_client());
4684 mut->wrlocks.insert(lock);
4685 mut->locks.insert(lock);
4686}
4687
4688bool Locker::local_wrlock_start(LocalLock *lock, MDRequestRef& mut)
4689{
4690 dout(7) << "local_wrlock_start on " << *lock
4691 << " on " << *lock->get_parent() << dendl;
4692
4693 assert(lock->get_parent()->is_auth());
4694 if (lock->can_wrlock()) {
4695 assert(!mut->wrlocks.count(lock));
4696 lock->get_wrlock(mut->get_client());
4697 mut->wrlocks.insert(lock);
4698 mut->locks.insert(lock);
4699 return true;
4700 } else {
4701 lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
4702 return false;
4703 }
4704}
4705
4706void Locker::local_wrlock_finish(LocalLock *lock, MutationImpl *mut)
4707{
4708 dout(7) << "local_wrlock_finish on " << *lock
4709 << " on " << *lock->get_parent() << dendl;
4710 lock->put_wrlock();
4711 mut->wrlocks.erase(lock);
4712 mut->locks.erase(lock);
4713 if (lock->get_num_wrlocks() == 0) {
4714 lock->finish_waiters(SimpleLock::WAIT_STABLE |
4715 SimpleLock::WAIT_WR |
4716 SimpleLock::WAIT_RD);
4717 }
4718}
4719
4720bool Locker::local_xlock_start(LocalLock *lock, MDRequestRef& mut)
4721{
4722 dout(7) << "local_xlock_start on " << *lock
4723 << " on " << *lock->get_parent() << dendl;
4724
4725 assert(lock->get_parent()->is_auth());
4726 if (!lock->can_xlock_local()) {
4727 lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
4728 return false;
4729 }
4730
4731 lock->get_xlock(mut, mut->get_client());
4732 mut->xlocks.insert(lock);
4733 mut->locks.insert(lock);
4734 return true;
4735}
4736
4737void Locker::local_xlock_finish(LocalLock *lock, MutationImpl *mut)
4738{
4739 dout(7) << "local_xlock_finish on " << *lock
4740 << " on " << *lock->get_parent() << dendl;
4741 lock->put_xlock();
4742 mut->xlocks.erase(lock);
4743 mut->locks.erase(lock);
4744
4745 lock->finish_waiters(SimpleLock::WAIT_STABLE |
4746 SimpleLock::WAIT_WR |
4747 SimpleLock::WAIT_RD);
4748}
4749
4750
4751
4752// ==========================================================================
4753// file lock
4754
4755
/**
 * Evaluate a stable file (IFILE) lock on the auth MDS against the caps
 * clients currently want, and move it toward the best-fitting state:
 * EXCL for a single loner writer, MIX for multiple writers, SYNC when
 * nobody needs write/buffer caps.
 *
 * @param lock        auth, stable file lock
 * @param need_issue  if non-null, set instead of calling issue_caps()
 */
void Locker::file_eval(ScatterLock *lock, bool *need_issue)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  int loner_wanted, other_wanted;
  int wanted = in->get_caps_wanted(&loner_wanted, &other_wanted, CEPH_CAP_SFILE);
  dout(7) << "file_eval wanted=" << gcap_string(wanted)
	  << " loner_wanted=" << gcap_string(loner_wanted)
	  << " other_wanted=" << gcap_string(other_wanted)
	  << "  filelock=" << *lock << " on " << *lock->get_parent()
	  << dendl;

  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  if (lock->get_parent()->is_freezing_or_frozen())
    return;

  // read-only FS: force the readable SYNC state
  if (mdcache->is_readonly()) {
    if (lock->get_state() != LOCK_SYNC) {
      dout(10) << "file_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
      simple_sync(lock, need_issue);
    }
    return;
  }

  // excl -> *?
  if (lock->get_state() == LOCK_EXCL) {
    dout(20) << " is excl" << dendl;
    int loner_issued, other_issued, xlocker_issued;
    in->get_caps_issued(&loner_issued, &other_issued, &xlocker_issued, CEPH_CAP_SFILE);
    dout(7) << "file_eval loner_issued=" << gcap_string(loner_issued)
            << " other_issued=" << gcap_string(other_issued)
	    << " xlocker_issued=" << gcap_string(xlocker_issued)
	    << dendl;
    // leave EXCL if the loner no longer justifies it, someone else
    // wants conflicting caps, or a dir has multiple non-stale caps
    if (!((loner_wanted|loner_issued) & (CEPH_CAP_GEXCL|CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) ||
	(other_wanted & (CEPH_CAP_GEXCL|CEPH_CAP_GWR|CEPH_CAP_GRD)) ||
	(in->inode.is_dir() && in->multiple_nonstale_caps())) {  // FIXME.. :/
      dout(20) << " should lose it" << dendl;
      // we should lose it.
      //  loner  other   want
      //  R      R       SYNC
      //  R      R|W     MIX
      //  R      W       MIX
      //  R|W    R       MIX
      //  R|W    R|W     MIX
      //  R|W    W       MIX
      //  W      R       MIX
      //  W      R|W     MIX
      //  W      W       MIX
      // -> any writer means MIX; RD doesn't matter.
      if (((other_wanted|loner_wanted) & CEPH_CAP_GWR) ||
	  lock->is_waiter_for(SimpleLock::WAIT_WR))
	scatter_mix(lock, need_issue);
      else if (!lock->is_wrlocked())   // let excl wrlocks drain first
	simple_sync(lock, need_issue);
      else
	dout(10) << " waiting for wrlock to drain" << dendl;
    }
  }

  // * -> excl?
  else if (lock->get_state() != LOCK_EXCL &&
	   !lock->is_rdlocked() &&
	   //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
	   ((wanted & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) ||
	    (in->inode.is_dir() && !in->has_subtree_or_exporting_dirfrag())) &&
	   in->get_target_loner() >= 0) {
    dout(7) << "file_eval stable, bump to loner " << *lock
	    << " on " << *lock->get_parent() << dendl;
    file_excl(lock, need_issue);
  }

  // * -> mixed?
  else if (lock->get_state() != LOCK_MIX &&
	   !lock->is_rdlocked() &&
	   //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
	   (lock->get_scatter_wanted() ||
	    (in->get_wanted_loner() < 0 && (wanted & CEPH_CAP_GWR)))) {
    dout(7) << "file_eval stable, bump to mixed " << *lock
	    << " on " << *lock->get_parent() << dendl;
    scatter_mix(lock, need_issue);
  }

  // * -> sync?
  else if (lock->get_state() != LOCK_SYNC &&
	   !lock->is_wrlocked() &&   // drain wrlocks first!
	   !lock->is_waiter_for(SimpleLock::WAIT_WR) &&
	   !(wanted & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) &&
	   !((lock->get_state() == LOCK_MIX) &&
	     in->is_dir() && in->has_subtree_or_exporting_dirfrag())  // if we are a delegation point, stay where we are
	   //((wanted & CEPH_CAP_RD) ||
	   //in->is_replicated() ||
	   //lock->get_num_client_lease() ||
	   //(!loner && lock->get_state() == LOCK_EXCL)) &&
	   ) {
    dout(7) << "file_eval stable, bump to sync " << *lock
	    << " on " << *lock->get_parent() << dendl;
    simple_sync(lock, need_issue);
  }
}
4856
4857
4858
/**
 * Move a scatter lock (on the auth MDS) to the MIX (scattered,
 * multi-writer) state.  From LOCK the switch is immediate; from
 * SYNC/EXCL/TSYN (and XSYN via file_excl) a gather of rdlocks, replica
 * acks, client leases/caps and file recovery may be required first.
 *
 * @param lock        auth, stable scatter lock
 * @param need_issue  if non-null, set to true instead of calling
 *                    issue_caps() directly
 */
void Locker::scatter_mix(ScatterLock *lock, bool *need_issue)
{
  dout(7) << "scatter_mix " << *lock << " on " << *lock->get_parent() << dendl;

  CInode *in = static_cast<CInode*>(lock->get_parent());
  assert(in->is_auth());
  assert(lock->is_stable());

  if (lock->get_state() == LOCK_LOCK) {
    // from LOCK nothing can be outstanding: scatter immediately
    in->start_scatter(lock);
    if (in->is_replicated()) {
      // data
      bufferlist softdata;
      lock->encode_locked_state(softdata);

      // bcast to replicas
      send_lock_message(lock, LOCK_AC_MIX, softdata);
    }

    // change lock
    lock->set_state(LOCK_MIX);
    lock->clear_scatter_wanted();
    if (lock->get_cap_shift()) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
    }
  } else {
    // gather?
    switch (lock->get_state()) {
    case LOCK_SYNC: lock->set_state(LOCK_SYNC_MIX); break;
    case LOCK_XSYN:
      // XSYN must pass through EXCL first; bail if that didn't finish
      file_excl(lock, need_issue);
      if (lock->get_state() != LOCK_EXCL)
	return;
      // fall-thru
    case LOCK_EXCL: lock->set_state(LOCK_EXCL_MIX); break;
    case LOCK_TSYN: lock->set_state(LOCK_TSYN_MIX); break;
    default: ceph_abort();
    }

    int gather = 0;
    if (lock->is_rdlocked())
      gather++;
    if (in->is_replicated()) {
      if (lock->get_state() != LOCK_EXCL_MIX &&   // EXCL replica is already LOCK
	  lock->get_state() != LOCK_XSYN_EXCL) {  // XSYN replica is already LOCK;  ** FIXME here too!
	send_lock_message(lock, LOCK_AC_MIX);
	lock->init_gather();
	gather++;
      }
    }
    if (lock->is_leased()) {
      revoke_client_leases(lock);
      gather++;
    }
    // revoke client caps that conflict with MIX
    if (lock->get_cap_shift() &&
	in->is_head() &&
	in->issued_caps_need_gather(lock)) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
      gather++;
    }
    bool need_recover = false;
    if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
      mds->mdcache->queue_file_recover(in);
      need_recover = true;
      gather++;
    }

    if (gather) {
      // pin while transitional; eval_gather() finishes the transition
      lock->get_parent()->auth_pin(lock);
      if (need_recover)
	mds->mdcache->do_file_recover();
    } else {
      // nothing outstanding: complete the scatter now
      in->start_scatter(lock);
      lock->set_state(LOCK_MIX);
      lock->clear_scatter_wanted();
      if (in->is_replicated()) {
	bufferlist softdata;
	lock->encode_locked_state(softdata);
	send_lock_message(lock, LOCK_AC_MIX, softdata);
      }
      if (lock->get_cap_shift()) {
	if (need_issue)
	  *need_issue = true;
	else
	  issue_caps(in);
      }
    }
  }
}
4954
4955
4956void Locker::file_excl(ScatterLock *lock, bool *need_issue)
4957{
4958 CInode *in = static_cast<CInode*>(lock->get_parent());
4959 dout(7) << "file_excl " << *lock << " on " << *lock->get_parent() << dendl;
4960
4961 assert(in->is_auth());
4962 assert(lock->is_stable());
4963
4964 assert((in->get_loner() >= 0 && in->mds_caps_wanted.empty()) ||
4965 (lock->get_state() == LOCK_XSYN)); // must do xsyn -> excl -> <anything else>
4966
4967 switch (lock->get_state()) {
4968 case LOCK_SYNC: lock->set_state(LOCK_SYNC_EXCL); break;
4969 case LOCK_MIX: lock->set_state(LOCK_MIX_EXCL); break;
4970 case LOCK_LOCK: lock->set_state(LOCK_LOCK_EXCL); break;
4971 case LOCK_XSYN: lock->set_state(LOCK_XSYN_EXCL); break;
4972 default: ceph_abort();
4973 }
4974 int gather = 0;
4975
4976 if (lock->is_rdlocked())
4977 gather++;
4978 if (lock->is_wrlocked())
4979 gather++;
4980
4981 if (in->is_replicated() &&
4982 lock->get_state() != LOCK_LOCK_EXCL &&
4983 lock->get_state() != LOCK_XSYN_EXCL) { // if we were lock, replicas are already lock.
4984 send_lock_message(lock, LOCK_AC_LOCK);
4985 lock->init_gather();
4986 gather++;
4987 }
4988 if (lock->is_leased()) {
4989 revoke_client_leases(lock);
4990 gather++;
4991 }
4992 if (in->is_head() &&
4993 in->issued_caps_need_gather(lock)) {
4994 if (need_issue)
4995 *need_issue = true;
4996 else
4997 issue_caps(in);
4998 gather++;
4999 }
5000 bool need_recover = false;
5001 if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
5002 mds->mdcache->queue_file_recover(in);
5003 need_recover = true;
5004 gather++;
5005 }
5006
5007 if (gather) {
5008 lock->get_parent()->auth_pin(lock);
5009 if (need_recover)
5010 mds->mdcache->do_file_recover();
5011 } else {
5012 lock->set_state(LOCK_EXCL);
5013 if (need_issue)
5014 *need_issue = true;
5015 else
5016 issue_caps(in);
5017 }
5018}
5019
5020void Locker::file_xsyn(SimpleLock *lock, bool *need_issue)
5021{
5022 dout(7) << "file_xsyn on " << *lock << " on " << *lock->get_parent() << dendl;
5023 CInode *in = static_cast<CInode *>(lock->get_parent());
5024 assert(in->is_auth());
5025 assert(in->get_loner() >= 0 && in->mds_caps_wanted.empty());
5026
5027 switch (lock->get_state()) {
5028 case LOCK_EXCL: lock->set_state(LOCK_EXCL_XSYN); break;
5029 default: ceph_abort();
5030 }
5031
5032 int gather = 0;
5033 if (lock->is_wrlocked())
5034 gather++;
5035
5036 if (in->is_head() &&
5037 in->issued_caps_need_gather(lock)) {
5038 if (need_issue)
5039 *need_issue = true;
5040 else
5041 issue_caps(in);
5042 gather++;
5043 }
5044
5045 if (gather) {
5046 lock->get_parent()->auth_pin(lock);
5047 } else {
5048 lock->set_state(LOCK_XSYN);
5049 lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
5050 if (need_issue)
5051 *need_issue = true;
5052 else
5053 issue_caps(in);
5054 }
5055}
5056
5057void Locker::file_recover(ScatterLock *lock)
5058{
5059 CInode *in = static_cast<CInode *>(lock->get_parent());
5060 dout(7) << "file_recover " << *lock << " on " << *in << dendl;
5061
5062 assert(in->is_auth());
5063 //assert(lock->is_stable());
5064 assert(lock->get_state() == LOCK_PRE_SCAN); // only called from MDCache::start_files_to_recover()
5065
5066 int gather = 0;
5067
5068 /*
5069 if (in->is_replicated()
5070 lock->get_sm()->states[oldstate].replica_state != LOCK_LOCK) {
5071 send_lock_message(lock, LOCK_AC_LOCK);
5072 lock->init_gather();
5073 gather++;
5074 }
5075 */
5076 if (in->is_head() &&
5077 in->issued_caps_need_gather(lock)) {
5078 issue_caps(in);
5079 gather++;
5080 }
5081
5082 lock->set_state(LOCK_SCAN);
5083 if (gather)
5084 in->state_set(CInode::STATE_NEEDSRECOVER);
5085 else
5086 mds->mdcache->queue_file_recover(in);
5087}
5088
5089
5090// messenger
5091/* This function DOES put the passed message before returning */
5092void Locker::handle_file_lock(ScatterLock *lock, MLock *m)
5093{
5094 CInode *in = static_cast<CInode*>(lock->get_parent());
5095 int from = m->get_asker();
5096
5097 if (mds->is_rejoin()) {
5098 if (in->is_rejoining()) {
5099 dout(7) << "handle_file_lock still rejoining " << *in
5100 << ", dropping " << *m << dendl;
5101 m->put();
5102 return;
5103 }
5104 }
5105
5106 dout(7) << "handle_file_lock a=" << get_lock_action_name(m->get_action())
5107 << " on " << *lock
5108 << " from mds." << from << " "
5109 << *in << dendl;
5110
5111 bool caps = lock->get_cap_shift();
5112
5113 switch (m->get_action()) {
5114 // -- replica --
5115 case LOCK_AC_SYNC:
5116 assert(lock->get_state() == LOCK_LOCK ||
5117 lock->get_state() == LOCK_MIX ||
5118 lock->get_state() == LOCK_MIX_SYNC2);
5119
5120 if (lock->get_state() == LOCK_MIX) {
5121 lock->set_state(LOCK_MIX_SYNC);
5122 eval_gather(lock, true);
31f18b77
FG
5123 if (lock->is_unstable_and_locked())
5124 mds->mdlog->flush();
7c673cae
FG
5125 break;
5126 }
5127
5128 (static_cast<ScatterLock *>(lock))->finish_flush();
5129 (static_cast<ScatterLock *>(lock))->clear_flushed();
5130
5131 // ok
5132 lock->decode_locked_state(m->get_data());
5133 lock->set_state(LOCK_SYNC);
5134
5135 lock->get_rdlock();
5136 if (caps)
5137 issue_caps(in);
5138 lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
5139 lock->put_rdlock();
5140 break;
5141
5142 case LOCK_AC_LOCK:
5143 switch (lock->get_state()) {
5144 case LOCK_SYNC: lock->set_state(LOCK_SYNC_LOCK); break;
5145 case LOCK_MIX: lock->set_state(LOCK_MIX_LOCK); break;
5146 default: ceph_abort();
5147 }
5148
5149 eval_gather(lock, true);
31f18b77
FG
5150 if (lock->is_unstable_and_locked())
5151 mds->mdlog->flush();
5152
7c673cae
FG
5153 break;
5154
5155 case LOCK_AC_LOCKFLUSHED:
5156 (static_cast<ScatterLock *>(lock))->finish_flush();
5157 (static_cast<ScatterLock *>(lock))->clear_flushed();
5158 // wake up scatter_nudge waiters
5159 if (lock->is_stable())
5160 lock->finish_waiters(SimpleLock::WAIT_STABLE);
5161 break;
5162
5163 case LOCK_AC_MIX:
5164 assert(lock->get_state() == LOCK_SYNC ||
5165 lock->get_state() == LOCK_LOCK ||
5166 lock->get_state() == LOCK_SYNC_MIX2);
5167
5168 if (lock->get_state() == LOCK_SYNC) {
5169 // MIXED
5170 lock->set_state(LOCK_SYNC_MIX);
5171 eval_gather(lock, true);
31f18b77
FG
5172 if (lock->is_unstable_and_locked())
5173 mds->mdlog->flush();
7c673cae
FG
5174 break;
5175 }
c07f9fc5 5176
7c673cae 5177 // ok
7c673cae 5178 lock->set_state(LOCK_MIX);
c07f9fc5 5179 lock->decode_locked_state(m->get_data());
7c673cae
FG
5180
5181 if (caps)
5182 issue_caps(in);
5183
5184 lock->finish_waiters(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE);
5185 break;
5186
5187
5188 // -- auth --
5189 case LOCK_AC_LOCKACK:
5190 assert(lock->get_state() == LOCK_SYNC_LOCK ||
5191 lock->get_state() == LOCK_MIX_LOCK ||
5192 lock->get_state() == LOCK_MIX_LOCK2 ||
5193 lock->get_state() == LOCK_MIX_EXCL ||
5194 lock->get_state() == LOCK_SYNC_EXCL ||
5195 lock->get_state() == LOCK_SYNC_MIX ||
5196 lock->get_state() == LOCK_MIX_TSYN);
5197 assert(lock->is_gathering(from));
5198 lock->remove_gather(from);
5199
5200 if (lock->get_state() == LOCK_MIX_LOCK ||
5201 lock->get_state() == LOCK_MIX_LOCK2 ||
5202 lock->get_state() == LOCK_MIX_EXCL ||
5203 lock->get_state() == LOCK_MIX_TSYN) {
5204 lock->decode_locked_state(m->get_data());
5205 // replica is waiting for AC_LOCKFLUSHED, eval_gather() should not
5206 // delay calling scatter_writebehind().
5207 lock->clear_flushed();
5208 }
5209
5210 if (lock->is_gathering()) {
5211 dout(7) << "handle_file_lock " << *in << " from " << from
5212 << ", still gathering " << lock->get_gather_set() << dendl;
5213 } else {
5214 dout(7) << "handle_file_lock " << *in << " from " << from
5215 << ", last one" << dendl;
5216 eval_gather(lock);
5217 }
5218 break;
5219
5220 case LOCK_AC_SYNCACK:
5221 assert(lock->get_state() == LOCK_MIX_SYNC);
5222 assert(lock->is_gathering(from));
5223 lock->remove_gather(from);
5224
5225 lock->decode_locked_state(m->get_data());
5226
5227 if (lock->is_gathering()) {
5228 dout(7) << "handle_file_lock " << *in << " from " << from
5229 << ", still gathering " << lock->get_gather_set() << dendl;
5230 } else {
5231 dout(7) << "handle_file_lock " << *in << " from " << from
5232 << ", last one" << dendl;
5233 eval_gather(lock);
5234 }
5235 break;
5236
5237 case LOCK_AC_MIXACK:
5238 assert(lock->get_state() == LOCK_SYNC_MIX);
5239 assert(lock->is_gathering(from));
5240 lock->remove_gather(from);
5241
5242 if (lock->is_gathering()) {
5243 dout(7) << "handle_file_lock " << *in << " from " << from
5244 << ", still gathering " << lock->get_gather_set() << dendl;
5245 } else {
5246 dout(7) << "handle_file_lock " << *in << " from " << from
5247 << ", last one" << dendl;
5248 eval_gather(lock);
5249 }
5250 break;
5251
5252
5253 // requests....
5254 case LOCK_AC_REQSCATTER:
5255 if (lock->is_stable()) {
5256 /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
5257 * because the replica should be holding an auth_pin if they're
5258 * doing this (and thus, we are freezing, not frozen, and indefinite
5259 * starvation isn't an issue).
5260 */
5261 dout(7) << "handle_file_lock got scatter request on " << *lock
5262 << " on " << *lock->get_parent() << dendl;
5263 if (lock->get_state() != LOCK_MIX) // i.e., the reqscatter didn't race with an actual mix/scatter
5264 scatter_mix(lock);
5265 } else {
5266 dout(7) << "handle_file_lock got scatter request, !stable, marking scatter_wanted on " << *lock
5267 << " on " << *lock->get_parent() << dendl;
5268 lock->set_scatter_wanted();
5269 }
5270 break;
5271
5272 case LOCK_AC_REQUNSCATTER:
5273 if (lock->is_stable()) {
5274 /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
5275 * because the replica should be holding an auth_pin if they're
5276 * doing this (and thus, we are freezing, not frozen, and indefinite
5277 * starvation isn't an issue).
5278 */
5279 dout(7) << "handle_file_lock got unscatter request on " << *lock
5280 << " on " << *lock->get_parent() << dendl;
5281 if (lock->get_state() == LOCK_MIX) // i.e., the reqscatter didn't race with an actual mix/scatter
5282 simple_lock(lock); // FIXME tempsync?
5283 } else {
5284 dout(7) << "handle_file_lock ignoring unscatter request on " << *lock
5285 << " on " << *lock->get_parent() << dendl;
5286 lock->set_unscatter_wanted();
5287 }
5288 break;
5289
5290 case LOCK_AC_REQRDLOCK:
5291 handle_reqrdlock(lock, m);
5292 break;
5293
5294 case LOCK_AC_NUDGE:
5295 if (!lock->get_parent()->is_auth()) {
5296 dout(7) << "handle_file_lock IGNORING nudge on non-auth " << *lock
5297 << " on " << *lock->get_parent() << dendl;
5298 } else if (!lock->get_parent()->is_replicated()) {
5299 dout(7) << "handle_file_lock IGNORING nudge on non-replicated " << *lock
5300 << " on " << *lock->get_parent() << dendl;
5301 } else {
5302 dout(7) << "handle_file_lock trying nudge on " << *lock
5303 << " on " << *lock->get_parent() << dendl;
5304 scatter_nudge(lock, 0, true);
5305 mds->mdlog->flush();
5306 }
5307 break;
5308
5309 default:
5310 ceph_abort();
5311 }
5312
5313 m->put();
5314}
5315
5316
5317
5318
5319
5320