]> git.proxmox.com Git - ceph.git/blame - ceph/src/mds/Locker.cc
update sources to v12.1.1
[ceph.git] / ceph / src / mds / Locker.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15
16#include "MDSRank.h"
17#include "MDCache.h"
18#include "Locker.h"
19#include "CInode.h"
20#include "CDir.h"
21#include "CDentry.h"
22#include "Mutation.h"
23#include "MDSContext.h"
24
25#include "MDLog.h"
26#include "MDSMap.h"
27
28#include "events/EUpdate.h"
29#include "events/EOpen.h"
30
31#include "msg/Messenger.h"
32#include "osdc/Objecter.h"
33
34#include "messages/MInodeFileCaps.h"
35#include "messages/MLock.h"
36#include "messages/MClientLease.h"
37#include "messages/MClientReply.h"
38#include "messages/MClientCaps.h"
39#include "messages/MClientCapRelease.h"
40
41#include "messages/MMDSSlaveRequest.h"
42
43#include <errno.h>
44
45#include "common/config.h"
46
47
48#define dout_subsys ceph_subsys_mds
49#undef dout_prefix
50#define dout_context g_ceph_context
51#define dout_prefix _prefix(_dout, mds)
52static ostream& _prefix(std::ostream *_dout, MDSRank *mds) {
53 return *_dout << "mds." << mds->get_nodeid() << ".locker ";
54}
55
56
57class LockerContext : public MDSInternalContextBase {
58protected:
59 Locker *locker;
60 MDSRank *get_mds() override
61 {
62 return locker->mds;
63 }
64
65public:
66 explicit LockerContext(Locker *locker_) : locker(locker_) {
67 assert(locker != NULL);
68 }
69};
70
71class LockerLogContext : public MDSLogContextBase {
72protected:
73 Locker *locker;
74 MDSRank *get_mds() override
75 {
76 return locker->mds;
77 }
78
79public:
80 explicit LockerLogContext(Locker *locker_) : locker(locker_) {
81 assert(locker != NULL);
82 }
83};
84
85/* This function DOES put the passed message before returning */
86void Locker::dispatch(Message *m)
87{
88
89 switch (m->get_type()) {
90
91 // inter-mds locking
92 case MSG_MDS_LOCK:
93 handle_lock(static_cast<MLock*>(m));
94 break;
95 // inter-mds caps
96 case MSG_MDS_INODEFILECAPS:
97 handle_inode_file_caps(static_cast<MInodeFileCaps*>(m));
98 break;
99
100 // client sync
101 case CEPH_MSG_CLIENT_CAPS:
102 handle_client_caps(static_cast<MClientCaps*>(m));
103
104 break;
105 case CEPH_MSG_CLIENT_CAPRELEASE:
106 handle_client_cap_release(static_cast<MClientCapRelease*>(m));
107 break;
108 case CEPH_MSG_CLIENT_LEASE:
109 handle_client_lease(static_cast<MClientLease*>(m));
110 break;
111
112 default:
113 derr << "locker unknown message " << m->get_type() << dendl;
114 assert(0 == "locker unknown message");
115 }
116}
117
// Periodic maintenance entry point: delegates to the scatterlock and
// capability tick handlers (defined elsewhere in this class).
void Locker::tick()
{
  scatter_tick();
  caps_tick();
}
123
124/*
125 * locks vs rejoin
126 *
127 *
128 *
129 */
130
131void Locker::send_lock_message(SimpleLock *lock, int msg)
132{
133 for (compact_map<mds_rank_t,unsigned>::iterator it = lock->get_parent()->replicas_begin();
134 it != lock->get_parent()->replicas_end();
135 ++it) {
136 if (mds->is_cluster_degraded() &&
137 mds->mdsmap->get_state(it->first) < MDSMap::STATE_REJOIN)
138 continue;
139 MLock *m = new MLock(lock, msg, mds->get_nodeid());
140 mds->send_message_mds(m, it->first);
141 }
142}
143
144void Locker::send_lock_message(SimpleLock *lock, int msg, const bufferlist &data)
145{
146 for (compact_map<mds_rank_t,unsigned>::iterator it = lock->get_parent()->replicas_begin();
147 it != lock->get_parent()->replicas_end();
148 ++it) {
149 if (mds->is_cluster_degraded() &&
150 mds->mdsmap->get_state(it->first) < MDSMap::STATE_REJOIN)
151 continue;
152 MLock *m = new MLock(lock, msg, mds->get_nodeid());
153 m->set_data(data);
154 mds->send_message_mds(m, it->first);
155 }
156}
157
158
159
160
161void Locker::include_snap_rdlocks(set<SimpleLock*>& rdlocks, CInode *in)
162{
163 // rdlock ancestor snaps
164 CInode *t = in;
165 rdlocks.insert(&in->snaplock);
166 while (t->get_projected_parent_dn()) {
167 t = t->get_projected_parent_dn()->get_dir()->get_inode();
168 rdlocks.insert(&t->snaplock);
169 }
170}
171
172void Locker::include_snap_rdlocks_wlayout(set<SimpleLock*>& rdlocks, CInode *in,
173 file_layout_t **layout)
174{
175 //rdlock ancestor snaps
176 CInode *t = in;
177 rdlocks.insert(&in->snaplock);
178 rdlocks.insert(&in->policylock);
179 bool found_layout = false;
180 while (t) {
181 rdlocks.insert(&t->snaplock);
182 if (!found_layout) {
183 rdlocks.insert(&t->policylock);
184 if (t->get_projected_inode()->has_layout()) {
185 *layout = &t->get_projected_inode()->layout;
186 found_layout = true;
187 }
188 }
189 if (t->get_projected_parent_dn() &&
190 t->get_projected_parent_dn()->get_dir())
191 t = t->get_projected_parent_dn()->get_dir()->get_inode();
192 else t = NULL;
193 }
194}
195
196struct MarkEventOnDestruct {
197 MDRequestRef& mdr;
198 const char* message;
199 bool mark_event;
200 MarkEventOnDestruct(MDRequestRef& _mdr,
201 const char *_message) : mdr(_mdr),
202 message(_message),
203 mark_event(true) {}
204 ~MarkEventOnDestruct() {
205 if (mark_event)
206 mdr->mark_event(message);
207 }
208};
209
/* If this function returns false, the mdr has been placed
 * on the appropriate wait list.
 *
 * Acquire all locks needed by a request, in four phases:
 *   1. build a globally-sorted lock list (sorted by SimpleLock::ptr_lt
 *      to give a consistent acquisition order across requests) and the
 *      set of objects that must be auth-pinned;
 *   2. auth-pin everything locally, or request remote auth-pins;
 *   3. walk the sorted list, reconciling against locks the request
 *      already holds (dropping stray / wrong-kind locks in order);
 *   4. take each missing rdlock/wrlock/xlock, bailing out (false) the
 *      moment one of them must wait.
 */
bool Locker::acquire_locks(MDRequestRef& mdr,
                           set<SimpleLock*> &rdlocks,
                           set<SimpleLock*> &wrlocks,
                           set<SimpleLock*> &xlocks,
                           map<SimpleLock*,mds_rank_t> *remote_wrlocks,
                           CInode *auth_pin_freeze,
                           bool auth_pin_nonblock)
{
  if (mdr->done_locking &&
      !mdr->is_slave()) { // not on slaves! master requests locks piecemeal.
    dout(10) << "acquire_locks " << *mdr << " - done locking" << dendl;
    return true;  // at least we had better be!
  }
  dout(10) << "acquire_locks " << *mdr << dendl;

  // Records the current failure message on the request at scope exit;
  // the success path overwrites it with "acquired locks".
  MarkEventOnDestruct marker(mdr, "failed to acquire_locks");

  client_t client = mdr->get_client();

  set<SimpleLock*, SimpleLock::ptr_lt> sorted;  // sort everything we will lock
  set<MDSCacheObject*> mustpin;                 // items to authpin

  // xlocks
  for (set<SimpleLock*>::iterator p = xlocks.begin(); p != xlocks.end(); ++p) {
    dout(20) << " must xlock " << **p << " " << *(*p)->get_parent() << dendl;
    sorted.insert(*p);
    mustpin.insert((*p)->get_parent());

    // augment xlock with a versionlock?
    if ((*p)->get_type() == CEPH_LOCK_DN) {
      CDentry *dn = (CDentry*)(*p)->get_parent();
      if (!dn->is_auth())
	continue;

      if (xlocks.count(&dn->versionlock))
	continue; // we're xlocking the versionlock too; don't wrlock it!

      if (mdr->is_master()) {
	// master.  wrlock versionlock so we can pipeline dentry updates to journal.
	wrlocks.insert(&dn->versionlock);
      } else {
	// slave.  exclusively lock the dentry version (i.e. block other journal updates).
	// this makes rollback safe.
	xlocks.insert(&dn->versionlock);
	sorted.insert(&dn->versionlock);
      }
    }
    // NOTE(review): lock types above CEPH_LOCK_IVERSION are apparently
    // inode locks — confirm against the CEPH_LOCK_* enum ordering.
    if ((*p)->get_type() > CEPH_LOCK_IVERSION) {
      // inode version lock?
      CInode *in = (CInode*)(*p)->get_parent();
      if (!in->is_auth())
	continue;
      if (mdr->is_master()) {
	// master.  wrlock versionlock so we can pipeline inode updates to journal.
	wrlocks.insert(&in->versionlock);
      } else {
	// slave.  exclusively lock the inode version (i.e. block other journal updates).
	// this makes rollback safe.
	xlocks.insert(&in->versionlock);
	sorted.insert(&in->versionlock);
      }
    }
  }

  // wrlocks
  for (set<SimpleLock*>::iterator p = wrlocks.begin(); p != wrlocks.end(); ++p) {
    MDSCacheObject *object = (*p)->get_parent();
    dout(20) << " must wrlock " << **p << " " << *object << dendl;
    sorted.insert(*p);
    if (object->is_auth())
      mustpin.insert(object);
    else if (!object->is_auth() &&
	     !(*p)->can_wrlock(client) &&  // we might have to request a scatter
	     !mdr->is_slave()) {           // if we are slave (remote_wrlock), the master already authpinned
      dout(15) << " will also auth_pin " << *object
	       << " in case we need to request a scatter" << dendl;
      mustpin.insert(object);
    }
  }

  // remote_wrlocks
  if (remote_wrlocks) {
    for (map<SimpleLock*,mds_rank_t>::iterator p = remote_wrlocks->begin(); p != remote_wrlocks->end(); ++p) {
      MDSCacheObject *object = p->first->get_parent();
      dout(20) << " must remote_wrlock on mds." << p->second << " "
	       << *p->first << " " << *object << dendl;
      sorted.insert(p->first);
      mustpin.insert(object);
    }
  }

  // rdlocks
  for (set<SimpleLock*>::iterator p = rdlocks.begin();
       p != rdlocks.end();
       ++p) {
    MDSCacheObject *object = (*p)->get_parent();
    dout(20) << " must rdlock " << **p << " " << *object << dendl;
    sorted.insert(*p);
    if (object->is_auth())
      mustpin.insert(object);
    else if (!object->is_auth() &&
	     !(*p)->can_rdlock(client)) {  // we might have to request an rdlock
      dout(15) << " will also auth_pin " << *object
	       << " in case we need to request a rdlock" << dendl;
      mustpin.insert(object);
    }
  }


  // AUTH PINS
  map<mds_rank_t, set<MDSCacheObject*> > mustpin_remote;  // mds -> (object set)

  // can i auth pin them all now?
  marker.message = "failed to authpin local pins";
  for (set<MDSCacheObject*>::iterator p = mustpin.begin();
       p != mustpin.end();
       ++p) {
    MDSCacheObject *object = *p;

    dout(10) << " must authpin " << *object << dendl;

    if (mdr->is_auth_pinned(object)) {
      if (object != (MDSCacheObject*)auth_pin_freeze)
	continue;
      if (mdr->more()->is_remote_frozen_authpin) {
	if (mdr->more()->rename_inode == auth_pin_freeze)
	  continue;
	// unfreeze auth pin for the wrong inode
	// NOTE(review): operator[] default-constructs the entry; the
	// .size() call apparently exists only to force creation of an
	// (empty) per-rank set so a slave request is sent below — confirm.
	mustpin_remote[mdr->more()->rename_inode->authority().first].size();
      }
    }

    if (!object->is_auth()) {
      // Non-auth object: we cannot pin it locally.  Drop any locks we
      // hold before waiting or asking the auth MDS.
      if (!mdr->locks.empty())
	drop_locks(mdr.get());
      if (object->is_ambiguous_auth()) {
	// wait
	dout(10) << " ambiguous auth, waiting to authpin " << *object << dendl;
	object->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_MDS_RetryRequest(mdcache, mdr));
	mdr->drop_local_auth_pins();
	return false;
      }
      mustpin_remote[object->authority().first].insert(object);
      continue;
    }
    if (!object->can_auth_pin()) {
      // wait
      drop_locks(mdr.get());
      mdr->drop_local_auth_pins();
      if (auth_pin_nonblock) {
	dout(10) << " can't auth_pin (freezing?) " << *object << ", nonblocking" << dendl;
	mdr->aborted = true;
	return false;
      }
      dout(10) << " can't auth_pin (freezing?), waiting to authpin " << *object << dendl;
      object->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_MDS_RetryRequest(mdcache, mdr));

      // If this request holds remote pins, let whoever is freezing know
      // someone is blocked on it (see notify_freeze_waiter).
      if (!mdr->remote_auth_pins.empty())
	notify_freeze_waiter(object);

      return false;
    }
  }

  // ok, grab local auth pins
  for (set<MDSCacheObject*>::iterator p = mustpin.begin();
       p != mustpin.end();
       ++p) {
    MDSCacheObject *object = *p;
    if (mdr->is_auth_pinned(object)) {
      dout(10) << " already auth_pinned " << *object << dendl;
    } else if (object->is_auth()) {
      dout(10) << " auth_pinning " << *object << dendl;
      mdr->auth_pin(object);
    }
  }

  // request remote auth_pins
  if (!mustpin_remote.empty()) {
    marker.message = "requesting remote authpins";
    // Fold already-held remote pins into the per-rank request sets so
    // the slave re-confirms them along with the new pins.
    for (map<MDSCacheObject*,mds_rank_t>::iterator p = mdr->remote_auth_pins.begin();
	 p != mdr->remote_auth_pins.end();
	 ++p) {
      if (mustpin.count(p->first)) {
	assert(p->second == p->first->authority().first);
	map<mds_rank_t, set<MDSCacheObject*> >::iterator q = mustpin_remote.find(p->second);
	if (q != mustpin_remote.end())
	  q->second.insert(p->first);
      }
    }
    for (map<mds_rank_t, set<MDSCacheObject*> >::iterator p = mustpin_remote.begin();
	 p != mustpin_remote.end();
	 ++p) {
      dout(10) << "requesting remote auth_pins from mds." << p->first << dendl;

      // wait for active auth
      if (mds->is_cluster_degraded() &&
	  !mds->mdsmap->is_clientreplay_or_active_or_stopping(p->first)) {
	dout(10) << " mds." << p->first << " is not active" << dendl;
	if (mdr->more()->waiting_on_slave.empty())
	  mds->wait_for_active_peer(p->first, new C_MDS_RetryRequest(mdcache, mdr));
	return false;
      }

      MMDSSlaveRequest *req = new MMDSSlaveRequest(mdr->reqid, mdr->attempt,
						   MMDSSlaveRequest::OP_AUTHPIN);
      for (set<MDSCacheObject*>::iterator q = p->second.begin();
	   q != p->second.end();
	   ++q) {
	dout(10) << " req remote auth_pin of " << **q << dendl;
	MDSCacheObjectInfo info;
	(*q)->set_object_info(info);
	req->get_authpins().push_back(info);
	if (*q == auth_pin_freeze)
	  (*q)->set_object_info(req->get_authpin_freeze());
	mdr->pin(*q);
      }
      if (auth_pin_nonblock)
	req->mark_nonblock();
      mds->send_message_mds(req, p->first);

      // put in waiting list
      assert(mdr->more()->waiting_on_slave.count(p->first) == 0);
      mdr->more()->waiting_on_slave.insert(p->first);
    }
    return false;
  }

  // caps i'll need to issue
  set<CInode*> issue_set;
  bool result = false;

  // acquire locks.
  // make sure they match currently acquired locks.
  set<SimpleLock*, SimpleLock::ptr_lt>::iterator existing = mdr->locks.begin();
  for (set<SimpleLock*, SimpleLock::ptr_lt>::iterator p = sorted.begin();
       p != sorted.end();
       ++p) {
    bool need_wrlock = !!wrlocks.count(*p);
    bool need_remote_wrlock = !!(remote_wrlocks && remote_wrlocks->count(*p));

    // already locked?
    if (existing != mdr->locks.end() && *existing == *p) {
      // right kind?
      SimpleLock *have = *existing;
      ++existing;
      if (xlocks.count(have) && mdr->xlocks.count(have)) {
	dout(10) << " already xlocked " << *have << " " << *have->get_parent() << dendl;
	continue;
      }
      if (mdr->remote_wrlocks.count(have)) {
	if (!need_remote_wrlock ||
	    mdr->remote_wrlocks[have] != (*remote_wrlocks)[have]) {
	  dout(10) << " unlocking remote_wrlock on wrong mds." << mdr->remote_wrlocks[have]
		   << " " << *have << " " << *have->get_parent() << dendl;
	  remote_wrlock_finish(have, mdr->remote_wrlocks[have], mdr.get());
	}
      }
      if (need_wrlock || need_remote_wrlock) {
	if (need_wrlock == !!mdr->wrlocks.count(have) &&
	    need_remote_wrlock == !!mdr->remote_wrlocks.count(have)) {
	  if (need_wrlock)
	    dout(10) << " already wrlocked " << *have << " " << *have->get_parent() << dendl;
	  if (need_remote_wrlock)
	    dout(10) << " already remote_wrlocked " << *have << " " << *have->get_parent() << dendl;
	  continue;
	}
      }
      if (rdlocks.count(have) && mdr->rdlocks.count(have)) {
	dout(10) << " already rdlocked " << *have << " " << *have->get_parent() << dendl;
	continue;
      }
    }

    // hose any stray locks
    // (`existing` was NOT advanced past *p above only when the held
    // lock is the wrong kind; release it here before re-acquiring.)
    if (existing != mdr->locks.end() && *existing == *p) {
      assert(need_wrlock || need_remote_wrlock);
      SimpleLock *lock = *existing;
      if (mdr->wrlocks.count(lock)) {
	if (!need_wrlock)
	  dout(10) << " unlocking extra " << *lock << " " << *lock->get_parent() << dendl;
	else if (need_remote_wrlock) // acquire remote_wrlock first
	  dout(10) << " unlocking out-of-order " << *lock << " " << *lock->get_parent() << dendl;
	bool need_issue = false;
	wrlock_finish(lock, mdr.get(), &need_issue);
	if (need_issue)
	  issue_set.insert(static_cast<CInode*>(lock->get_parent()));
      }
      ++existing;
    }
    // Release every held lock that sorts before *p: it was acquired
    // out of order relative to the global lock ordering.
    while (existing != mdr->locks.end()) {
      SimpleLock *stray = *existing;
      ++existing;
      dout(10) << " unlocking out-of-order " << *stray << " " << *stray->get_parent() << dendl;
      bool need_issue = false;
      if (mdr->xlocks.count(stray)) {
	xlock_finish(stray, mdr.get(), &need_issue);
      } else if (mdr->rdlocks.count(stray)) {
	rdlock_finish(stray, mdr.get(), &need_issue);
      } else {
	// may have acquired both wrlock and remote wrlock
	if (mdr->wrlocks.count(stray))
	  wrlock_finish(stray, mdr.get(), &need_issue);
	if (mdr->remote_wrlocks.count(stray))
	  remote_wrlock_finish(stray, mdr->remote_wrlocks[stray], mdr.get());
      }
      if (need_issue)
	issue_set.insert(static_cast<CInode*>(stray->get_parent()));
    }

    // lock
    if (mdr->locking && *p != mdr->locking) {
      cancel_locking(mdr.get(), &issue_set);
    }
    if (xlocks.count(*p)) {
      marker.message = "failed to xlock, waiting";
      if (!xlock_start(*p, mdr))
	goto out;
      dout(10) << " got xlock on " << **p << " " << *(*p)->get_parent() << dendl;
    } else if (need_wrlock || need_remote_wrlock) {
      if (need_remote_wrlock && !mdr->remote_wrlocks.count(*p)) {
	marker.message = "waiting for remote wrlocks";
	remote_wrlock_start(*p, (*remote_wrlocks)[*p], mdr);
	goto out;
      }
      if (need_wrlock && !mdr->wrlocks.count(*p)) {
	marker.message = "failed to wrlock, waiting";
	if (need_remote_wrlock && !(*p)->can_wrlock(mdr->get_client())) {
	  marker.message = "failed to wrlock, dropping remote wrlock and waiting";
	  // can't take the wrlock because the scatter lock is gathering. need to
	  // release the remote wrlock, so that the gathering process can finish.
	  remote_wrlock_finish(*p, mdr->remote_wrlocks[*p], mdr.get());
	  remote_wrlock_start(*p, (*remote_wrlocks)[*p], mdr);
	  goto out;
	}
	// nowait if we have already gotten remote wrlock
	if (!wrlock_start(*p, mdr, need_remote_wrlock))
	  goto out;
	dout(10) << " got wrlock on " << **p << " " << *(*p)->get_parent() << dendl;
      }
    } else {
      assert(mdr->is_master());
      if ((*p)->is_scatterlock()) {
	ScatterLock *slock = static_cast<ScatterLock *>(*p);
	if (slock->is_rejoin_mix()) {
	  // If there is a recovering mds who replicated an object when it failed
	  // and scatterlock in the object was in MIX state, it's possible that
	  // the recovering mds needs to take wrlock on the scatterlock when it
	  // replays unsafe requests. So this mds should delay taking rdlock on
	  // the scatterlock until the recovering mds finishes replaying unsafe.
	  // Otherwise unsafe requests may get replayed after current request.
	  //
	  // For example:
	  // The recovering mds is auth mds of a dirfrag, this mds is auth mds
	  // of the corresponding inode. When 'rm -rf' the directory, this mds should
	  // delay the rmdir request until the recovering mds has replayed unlink
	  // requests.
	  if (mds->is_cluster_degraded()) {
	    if (!mdr->is_replay()) {
	      drop_locks(mdr.get());
	      mds->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache, mdr));
	      dout(10) << " rejoin mix scatterlock " << *slock << " " << *(*p)->get_parent()
		       << ", waiting for cluster recovered" << dendl;
	      marker.message = "rejoin mix scatterlock, waiting for cluster recovered";
	      return false;
	    }
	  } else {
	    slock->clear_rejoin_mix();
	  }
	}
      }

      marker.message = "failed to rdlock, waiting";
      if (!rdlock_start(*p, mdr))
	goto out;
      dout(10) << " got rdlock on " << **p << " " << *(*p)->get_parent() << dendl;
    }
  }

  // any extra unneeded locks?
  while (existing != mdr->locks.end()) {
    SimpleLock *stray = *existing;
    ++existing;
    dout(10) << " unlocking extra " << *stray << " " << *stray->get_parent() << dendl;
    bool need_issue = false;
    if (mdr->xlocks.count(stray)) {
      xlock_finish(stray, mdr.get(), &need_issue);
    } else if (mdr->rdlocks.count(stray)) {
      rdlock_finish(stray, mdr.get(), &need_issue);
    } else {
      // may have acquired both wrlock and remote wrlock
      if (mdr->wrlocks.count(stray))
	wrlock_finish(stray, mdr.get(), &need_issue);
      if (mdr->remote_wrlocks.count(stray))
	remote_wrlock_finish(stray, mdr->remote_wrlocks[stray], mdr.get());
    }
    if (need_issue)
      issue_set.insert(static_cast<CInode*>(stray->get_parent()));
  }

  mdr->done_locking = true;
  mdr->set_mds_stamp(ceph_clock_now());
  result = true;
  marker.message = "acquired locks";

 out:
  // Reissue caps on any inodes whose lock state changed while we were
  // releasing stray locks above, regardless of success/failure.
  issue_caps_set(issue_set);
  return result;
}
621
224ce89b
WB
622void Locker::notify_freeze_waiter(MDSCacheObject *o)
623{
624 CDir *dir = NULL;
625 if (CInode *in = dynamic_cast<CInode*>(o)) {
626 if (!in->is_root())
627 dir = in->get_parent_dir();
628 } else if (CDentry *dn = dynamic_cast<CDentry*>(o)) {
629 dir = dn->get_dir();
630 } else {
631 dir = dynamic_cast<CDir*>(o);
632 assert(dir);
633 }
634 if (dir) {
635 if (dir->is_freezing_dir())
636 mdcache->fragment_freeze_inc_num_waiters(dir);
637 if (dir->is_freezing_tree()) {
638 while (!dir->is_freezing_tree_root())
639 dir = dir->get_parent_dir();
640 mdcache->migrator->export_freeze_inc_num_waiters(dir);
641 }
642 }
643}
7c673cae
FG
644
645void Locker::set_xlocks_done(MutationImpl *mut, bool skip_dentry)
646{
647 for (set<SimpleLock*>::iterator p = mut->xlocks.begin();
648 p != mut->xlocks.end();
649 ++p) {
650 MDSCacheObject *object = (*p)->get_parent();
651 assert(object->is_auth());
652 if (skip_dentry &&
653 ((*p)->get_type() == CEPH_LOCK_DN || (*p)->get_type() == CEPH_LOCK_DVERSION))
654 continue;
655 dout(10) << "set_xlocks_done on " << **p << " " << *object << dendl;
656 (*p)->set_xlock_done();
657 }
658}
659
660void Locker::_drop_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
661{
662 while (!mut->rdlocks.empty()) {
663 bool ni = false;
664 MDSCacheObject *p = (*mut->rdlocks.begin())->get_parent();
665 rdlock_finish(*mut->rdlocks.begin(), mut, &ni);
666 if (ni)
667 pneed_issue->insert(static_cast<CInode*>(p));
668 }
669}
670
// Release every xlock, remote wrlock, and local wrlock held by `mut`,
// collecting inodes that need caps reissued into *pneed_issue and
// telling each involved slave MDS (once) to drop its side of the locks.
void Locker::_drop_non_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
{
  set<mds_rank_t> slaves;  // ranks that must be told to drop locks

  while (!mut->xlocks.empty()) {
    SimpleLock *lock = *mut->xlocks.begin();
    MDSCacheObject *p = lock->get_parent();
    if (!p->is_auth()) {
      // remotely-xlocked object: release our reference locally and let
      // the OP_DROPLOCKS message below release it on the auth mds.
      assert(lock->get_sm()->can_remote_xlock);
      slaves.insert(p->authority().first);
      lock->put_xlock();
      mut->locks.erase(lock);
      mut->xlocks.erase(lock);
      continue;
    }
    // xlock_finish() is expected to remove the lock from mut->xlocks.
    bool ni = false;
    xlock_finish(lock, mut, &ni);
    if (ni)
      pneed_issue->insert(static_cast<CInode*>(p));
  }

  while (!mut->remote_wrlocks.empty()) {
    map<SimpleLock*,mds_rank_t>::iterator p = mut->remote_wrlocks.begin();
    slaves.insert(p->second);
    // only drop the lock from mut->locks if we don't also hold a local
    // wrlock on it (the local wrlock loop below still needs it).
    if (mut->wrlocks.count(p->first) == 0)
      mut->locks.erase(p->first);
    mut->remote_wrlocks.erase(p);
  }

  while (!mut->wrlocks.empty()) {
    bool ni = false;
    MDSCacheObject *p = (*mut->wrlocks.begin())->get_parent();
    wrlock_finish(*mut->wrlocks.begin(), mut, &ni);
    if (ni)
      pneed_issue->insert(static_cast<CInode*>(p));
  }

  // one OP_DROPLOCKS per slave rank, skipping ranks that have not
  // reached rejoin while the cluster is degraded.
  for (set<mds_rank_t>::iterator p = slaves.begin(); p != slaves.end(); ++p) {
    if (!mds->is_cluster_degraded() ||
	mds->mdsmap->get_state(*p) >= MDSMap::STATE_REJOIN) {
      dout(10) << "_drop_non_rdlocks dropping remote locks on mds." << *p << dendl;
      MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt,
							MMDSSlaveRequest::OP_DROPLOCKS);
      mds->send_message_mds(slavereq, *p);
    }
  }
}
718
// Abort the lock acquisition `mut` is currently blocked on
// (mut->locking), undoing any state transition we started on the lock.
void Locker::cancel_locking(MutationImpl *mut, set<CInode*> *pneed_issue)
{
  SimpleLock *lock = mut->locking;
  assert(lock);
  dout(10) << "cancel_locking " << *lock << " on " << *mut << dendl;

  if (lock->get_parent()->is_auth()) {
    bool need_issue = false;
    if (lock->get_state() == LOCK_PREXLOCK) {
      // we started an xlock grab; back it out.
      _finish_xlock(lock, -1, &need_issue);
    } else if (lock->get_state() == LOCK_LOCK_XLOCK &&
	       lock->get_num_xlocks() == 0) {
      // transitioning toward xlock with no xlocks held: settle to
      // XLOCKDONE and re-evaluate waiters.
      lock->set_state(LOCK_XLOCKDONE);
      eval_gather(lock, true, &need_issue);
    }
    if (need_issue)
      pneed_issue->insert(static_cast<CInode *>(lock->get_parent()));
  }
  mut->finish_locking(lock);
}
739
740void Locker::drop_locks(MutationImpl *mut, set<CInode*> *pneed_issue)
741{
742 // leftover locks
743 set<CInode*> my_need_issue;
744 if (!pneed_issue)
745 pneed_issue = &my_need_issue;
746
747 if (mut->locking)
748 cancel_locking(mut, pneed_issue);
749 _drop_non_rdlocks(mut, pneed_issue);
750 _drop_rdlocks(mut, pneed_issue);
751
752 if (pneed_issue == &my_need_issue)
753 issue_caps_set(*pneed_issue);
754 mut->done_locking = false;
755}
756
757void Locker::drop_non_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
758{
759 set<CInode*> my_need_issue;
760 if (!pneed_issue)
761 pneed_issue = &my_need_issue;
762
763 _drop_non_rdlocks(mut, pneed_issue);
764
765 if (pneed_issue == &my_need_issue)
766 issue_caps_set(*pneed_issue);
767}
768
769void Locker::drop_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
770{
771 set<CInode*> my_need_issue;
772 if (!pneed_issue)
773 pneed_issue = &my_need_issue;
774
775 _drop_rdlocks(mut, pneed_issue);
776
777 if (pneed_issue == &my_need_issue)
778 issue_caps_set(*pneed_issue);
779}
780
781
782// generics
783
// Re-evaluate an unstable lock: if every condition blocking the
// transition to lock->get_next_state() has cleared (rdlocks/wrlocks/
// xlocks/leases drained, conflicting client caps revoked, scatter data
// flushed), complete the transition — notifying the auth mds if we are
// a replica, or the replicas if we are auth — then wake waiters.
// `first` indicates this is the first evaluation after the state
// change started (used to kick off cap revocation).
void Locker::eval_gather(SimpleLock *lock, bool first, bool *pneed_issue, list<MDSInternalContextBase*> *pfinishers)
{
  dout(10) << "eval_gather " << *lock << " on " << *lock->get_parent() << dendl;
  assert(!lock->is_stable());

  int next = lock->get_next_state();

  CInode *in = 0;
  bool caps = lock->get_cap_shift();  // does this lock type gate client caps?
  if (lock->get_type() != CEPH_LOCK_DN)
    in = static_cast<CInode *>(lock->get_parent());

  bool need_issue = false;

  int loner_issued = 0, other_issued = 0, xlocker_issued = 0;
  assert(!caps || in != NULL);
  if (caps && in->is_head()) {
    in->get_caps_issued(&loner_issued, &other_issued, &xlocker_issued,
			lock->get_cap_shift(), lock->get_cap_mask());
    dout(10) << " next state is " << lock->get_state_name(next)
	     << " issued/allows loner " << gcap_string(loner_issued)
	     << "/" << gcap_string(lock->gcaps_allowed(CAP_LONER, next))
	     << " xlocker " << gcap_string(xlocker_issued)
	     << "/" << gcap_string(lock->gcaps_allowed(CAP_XLOCKER, next))
	     << " other " << gcap_string(other_issued)
	     << "/" << gcap_string(lock->gcaps_allowed(CAP_ANY, next))
	     << dendl;

    // on first evaluation, start revoking any issued caps that the
    // next state does not allow.
    if (first && ((~lock->gcaps_allowed(CAP_ANY, next) & other_issued) ||
		  (~lock->gcaps_allowed(CAP_LONER, next) & loner_issued) ||
		  (~lock->gcaps_allowed(CAP_XLOCKER, next) & xlocker_issued)))
      need_issue = true;
  }

#define IS_TRUE_AND_LT_AUTH(x, auth) (x && ((auth && x <= AUTH) || (!auth && x < AUTH)))
  bool auth = lock->get_parent()->is_auth();
  // The gather is complete only when no lock holder, lease, issued cap
  // or pending flush conflicts with the target state.
  if (!lock->is_gathering() &&
      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_rdlock, auth) || !lock->is_rdlocked()) &&
      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_wrlock, auth) || !lock->is_wrlocked()) &&
      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_xlock, auth) || !lock->is_xlocked()) &&
      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_lease, auth) || !lock->is_leased()) &&
      !(lock->get_parent()->is_auth() && lock->is_flushing()) &&  // i.e. wait for scatter_writebehind!
      (!caps || ((~lock->gcaps_allowed(CAP_ANY, next) & other_issued) == 0 &&
		 (~lock->gcaps_allowed(CAP_LONER, next) & loner_issued) == 0 &&
		 (~lock->gcaps_allowed(CAP_XLOCKER, next) & xlocker_issued) == 0)) &&
      lock->get_state() != LOCK_SYNC_MIX2 &&  // these states need an explicit trigger from the auth mds
      lock->get_state() != LOCK_MIX_SYNC2
      ) {
    dout(7) << "eval_gather finished gather on " << *lock
	    << " on " << *lock->get_parent() << dendl;

    if (lock->get_sm() == &sm_filelock) {
      assert(in);
      // file locks cannot settle while the inode's file data is being
      // (or needs to be) recovered.
      if (in->state_test(CInode::STATE_RECOVERING)) {
	dout(7) << "eval_gather finished gather, but still recovering" << dendl;
	return;
      } else if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
	dout(7) << "eval_gather finished gather, but need to recover" << dendl;
	mds->mdcache->queue_file_recover(in);
	mds->mdcache->do_file_recover();
	return;
      }
    }

    if (!lock->get_parent()->is_auth()) {
      // replica: tell auth
      mds_rank_t auth = lock->get_parent()->authority().first;

      if (lock->get_parent()->is_rejoining() &&
	  mds->mdsmap->get_state(auth) == MDSMap::STATE_REJOIN) {
	dout(7) << "eval_gather finished gather, but still rejoining "
		<< *lock->get_parent() << dendl;
	return;
      }

      if (!mds->is_cluster_degraded() ||
	  mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) {
	switch (lock->get_state()) {
	case LOCK_SYNC_LOCK:
	  mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid()),
				auth);
	  break;

	case LOCK_MIX_SYNC:
	  {
	    MLock *reply = new MLock(lock, LOCK_AC_SYNCACK, mds->get_nodeid());
	    lock->encode_locked_state(reply->get_data());
	    mds->send_message_mds(reply, auth);
	    next = LOCK_MIX_SYNC2;
	    (static_cast<ScatterLock *>(lock))->start_flush();
	  }
	  break;

	case LOCK_MIX_SYNC2:
	  (static_cast<ScatterLock *>(lock))->finish_flush();
	  (static_cast<ScatterLock *>(lock))->clear_flushed();
	  // NOTE(review): no break — falls through to LOCK_SYNC_MIX2;
	  // apparently intentional (both states have already acked).

	case LOCK_SYNC_MIX2:
	  // do nothing, we already acked
	  break;

	case LOCK_SYNC_MIX:
	  {
	    MLock *reply = new MLock(lock, LOCK_AC_MIXACK, mds->get_nodeid());
	    mds->send_message_mds(reply, auth);
	    next = LOCK_SYNC_MIX2;
	  }
	  break;

	case LOCK_MIX_LOCK:
	  {
	    bufferlist data;
	    lock->encode_locked_state(data);
	    mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid(), data), auth);
	    (static_cast<ScatterLock *>(lock))->start_flush();
	    // we'll get an AC_LOCKFLUSHED to complete
	  }
	  break;

	default:
	  ceph_abort();
	}
      }
    } else {
      // auth

      // once the first (local) stage of mix->lock gather complete we can
      // gather from replicas
      if (lock->get_state() == LOCK_MIX_LOCK &&
	  lock->get_parent()->is_replicated()) {
	dout(10) << " finished (local) gather for mix->lock, now gathering from replicas" << dendl;
	send_lock_message(lock, LOCK_AC_LOCK);
	lock->init_gather();
	lock->set_state(LOCK_MIX_LOCK2);
	return;
      }

      // dirty scatter data must be journaled (writebehind) before the
      // transition can complete.
      if (lock->is_dirty() && !lock->is_flushed()) {
	scatter_writebehind(static_cast<ScatterLock *>(lock));
	mds->mdlog->flush();
	return;
      }
      lock->clear_flushed();

      switch (lock->get_state()) {
      // to mixed
      case LOCK_TSYN_MIX:
      case LOCK_SYNC_MIX:
      case LOCK_EXCL_MIX:
	in->start_scatter(static_cast<ScatterLock *>(lock));
	if (lock->get_parent()->is_replicated()) {
	  bufferlist softdata;
	  lock->encode_locked_state(softdata);
	  send_lock_message(lock, LOCK_AC_MIX, softdata);
	}
	(static_cast<ScatterLock *>(lock))->clear_scatter_wanted();
	break;

      case LOCK_XLOCK:
      case LOCK_XLOCKDONE:
	if (next != LOCK_SYNC)
	  break;
	// fall-thru

      // to sync
      case LOCK_EXCL_SYNC:
      case LOCK_LOCK_SYNC:
      case LOCK_MIX_SYNC:
      case LOCK_XSYN_SYNC:
	if (lock->get_parent()->is_replicated()) {
	  bufferlist softdata;
	  lock->encode_locked_state(softdata);
	  send_lock_message(lock, LOCK_AC_SYNC, softdata);
	}
	break;
      }

    }

    lock->set_state(next);

    if (lock->get_parent()->is_auth() &&
	lock->is_stable())
      lock->get_parent()->auth_unpin(lock);

    // drop loner before doing waiters
    if (caps &&
	in->is_head() &&
	in->is_auth() &&
	in->get_wanted_loner() != in->get_loner()) {
      dout(10) << " trying to drop loner" << dendl;
      if (in->try_drop_loner()) {
	dout(10) << " dropped loner" << dendl;
	need_issue = true;
      }
    }

    if (pfinishers)
      lock->take_waiting(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD|SimpleLock::WAIT_XLOCK,
			 *pfinishers);
    else
      lock->finish_waiters(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD|SimpleLock::WAIT_XLOCK);

    if (caps && in->is_head())
      need_issue = true;

    if (lock->get_parent()->is_auth() &&
	lock->is_stable())
      try_eval(lock, &need_issue);
  }

  if (need_issue) {
    if (pneed_issue)
      *pneed_issue = true;
    else if (in->is_head())
      issue_caps(in);
  }

}
1003
1004bool Locker::eval(CInode *in, int mask, bool caps_imported)
1005{
1006 bool need_issue = caps_imported;
1007 list<MDSInternalContextBase*> finishers;
1008
1009 dout(10) << "eval " << mask << " " << *in << dendl;
1010
1011 // choose loner?
1012 if (in->is_auth() && in->is_head()) {
1013 if (in->choose_ideal_loner() >= 0) {
1014 if (in->try_set_loner()) {
1015 dout(10) << "eval set loner to client." << in->get_loner() << dendl;
1016 need_issue = true;
1017 mask = -1;
1018 } else
1019 dout(10) << "eval want loner client." << in->get_wanted_loner() << " but failed to set it" << dendl;
1020 } else
1021 dout(10) << "eval doesn't want loner" << dendl;
1022 }
1023
1024 retry:
1025 if (mask & CEPH_LOCK_IFILE)
1026 eval_any(&in->filelock, &need_issue, &finishers, caps_imported);
1027 if (mask & CEPH_LOCK_IAUTH)
1028 eval_any(&in->authlock, &need_issue, &finishers, caps_imported);
1029 if (mask & CEPH_LOCK_ILINK)
1030 eval_any(&in->linklock, &need_issue, &finishers, caps_imported);
1031 if (mask & CEPH_LOCK_IXATTR)
1032 eval_any(&in->xattrlock, &need_issue, &finishers, caps_imported);
1033 if (mask & CEPH_LOCK_INEST)
1034 eval_any(&in->nestlock, &need_issue, &finishers, caps_imported);
1035 if (mask & CEPH_LOCK_IFLOCK)
1036 eval_any(&in->flocklock, &need_issue, &finishers, caps_imported);
1037 if (mask & CEPH_LOCK_IPOLICY)
1038 eval_any(&in->policylock, &need_issue, &finishers, caps_imported);
1039
1040 // drop loner?
1041 if (in->is_auth() && in->is_head() && in->get_wanted_loner() != in->get_loner()) {
1042 dout(10) << " trying to drop loner" << dendl;
1043 if (in->try_drop_loner()) {
1044 dout(10) << " dropped loner" << dendl;
1045 need_issue = true;
1046
1047 if (in->get_wanted_loner() >= 0) {
1048 if (in->try_set_loner()) {
1049 dout(10) << "eval end set loner to client." << in->get_loner() << dendl;
1050 mask = -1;
1051 goto retry;
1052 } else {
1053 dout(10) << "eval want loner client." << in->get_wanted_loner() << " but failed to set it" << dendl;
1054 }
1055 }
1056 }
1057 }
1058
1059 finish_contexts(g_ceph_context, finishers);
1060
1061 if (need_issue && in->is_head())
1062 issue_caps(in);
1063
1064 dout(10) << "eval done" << dendl;
1065 return need_issue;
1066}
1067
1068class C_Locker_Eval : public LockerContext {
1069 MDSCacheObject *p;
1070 int mask;
1071public:
1072 C_Locker_Eval(Locker *l, MDSCacheObject *pp, int m) : LockerContext(l), p(pp), mask(m) {
1073 // We are used as an MDSCacheObject waiter, so should
1074 // only be invoked by someone already holding the big lock.
1075 assert(locker->mds->mds_lock.is_locked_by_me());
1076 p->get(MDSCacheObject::PIN_PTRWAITER);
1077 }
1078 void finish(int r) override {
1079 locker->try_eval(p, mask);
1080 p->put(MDSCacheObject::PIN_PTRWAITER);
1081 }
1082};
1083
1084void Locker::try_eval(MDSCacheObject *p, int mask)
1085{
1086 // unstable and ambiguous auth?
1087 if (p->is_ambiguous_auth()) {
1088 dout(7) << "try_eval ambiguous auth, waiting on " << *p << dendl;
1089 p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_Eval(this, p, mask));
1090 return;
1091 }
1092
1093 if (p->is_auth() && p->is_frozen()) {
1094 dout(7) << "try_eval frozen, waiting on " << *p << dendl;
1095 p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, mask));
1096 return;
1097 }
1098
1099 if (mask & CEPH_LOCK_DN) {
1100 assert(mask == CEPH_LOCK_DN);
1101 bool need_issue = false; // ignore this, no caps on dentries
1102 CDentry *dn = static_cast<CDentry *>(p);
1103 eval_any(&dn->lock, &need_issue);
1104 } else {
1105 CInode *in = static_cast<CInode *>(p);
1106 eval(in, mask);
1107 }
1108}
1109
1110void Locker::try_eval(SimpleLock *lock, bool *pneed_issue)
1111{
1112 MDSCacheObject *p = lock->get_parent();
1113
1114 // unstable and ambiguous auth?
1115 if (p->is_ambiguous_auth()) {
1116 dout(7) << "try_eval " << *lock << " ambiguousauth, waiting on " << *p << dendl;
1117 p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_Eval(this, p, lock->get_type()));
1118 return;
1119 }
1120
1121 if (!p->is_auth()) {
1122 dout(7) << "try_eval " << *lock << " not auth for " << *p << dendl;
1123 return;
1124 }
1125
1126 if (p->is_frozen()) {
1127 dout(7) << "try_eval " << *lock << " frozen, waiting on " << *p << dendl;
1128 p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, lock->get_type()));
1129 return;
1130 }
1131
1132 /*
1133 * We could have a situation like:
1134 *
1135 * - mds A authpins item on mds B
1136 * - mds B starts to freeze tree containing item
1137 * - mds A tries wrlock_start on A, sends REQSCATTER to B
1138 * - mds B lock is unstable, sets scatter_wanted
1139 * - mds B lock stabilizes, calls try_eval.
1140 *
1141 * We can defer while freezing without causing a deadlock. Honor
1142 * scatter_wanted flag here. This will never get deferred by the
1143 * checks above due to the auth_pin held by the master.
1144 */
1145 if (lock->is_scatterlock()) {
1146 ScatterLock *slock = static_cast<ScatterLock *>(lock);
1147 if (slock->get_scatter_wanted() &&
1148 slock->get_state() != LOCK_MIX) {
1149 scatter_mix(slock, pneed_issue);
1150 if (!lock->is_stable())
1151 return;
1152 } else if (slock->get_unscatter_wanted() &&
1153 slock->get_state() != LOCK_LOCK) {
1154 simple_lock(slock, pneed_issue);
1155 if (!lock->is_stable()) {
1156 return;
1157 }
1158 }
1159 }
1160
1161 if (lock->get_type() != CEPH_LOCK_DN && p->is_freezing()) {
1162 dout(7) << "try_eval " << *lock << " freezing, waiting on " << *p << dendl;
1163 p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, lock->get_type()));
1164 return;
1165 }
1166
1167 eval(lock, pneed_issue);
1168}
1169
1170void Locker::eval_cap_gather(CInode *in, set<CInode*> *issue_set)
1171{
1172 bool need_issue = false;
1173 list<MDSInternalContextBase*> finishers;
1174
1175 // kick locks now
1176 if (!in->filelock.is_stable())
1177 eval_gather(&in->filelock, false, &need_issue, &finishers);
1178 if (!in->authlock.is_stable())
1179 eval_gather(&in->authlock, false, &need_issue, &finishers);
1180 if (!in->linklock.is_stable())
1181 eval_gather(&in->linklock, false, &need_issue, &finishers);
1182 if (!in->xattrlock.is_stable())
1183 eval_gather(&in->xattrlock, false, &need_issue, &finishers);
1184
1185 if (need_issue && in->is_head()) {
1186 if (issue_set)
1187 issue_set->insert(in);
1188 else
1189 issue_caps(in);
1190 }
1191
1192 finish_contexts(g_ceph_context, finishers);
1193}
1194
1195void Locker::eval_scatter_gathers(CInode *in)
1196{
1197 bool need_issue = false;
1198 list<MDSInternalContextBase*> finishers;
1199
1200 dout(10) << "eval_scatter_gathers " << *in << dendl;
1201
1202 // kick locks now
1203 if (!in->filelock.is_stable())
1204 eval_gather(&in->filelock, false, &need_issue, &finishers);
1205 if (!in->nestlock.is_stable())
1206 eval_gather(&in->nestlock, false, &need_issue, &finishers);
1207 if (!in->dirfragtreelock.is_stable())
1208 eval_gather(&in->dirfragtreelock, false, &need_issue, &finishers);
1209
1210 if (need_issue && in->is_head())
1211 issue_caps(in);
1212
1213 finish_contexts(g_ceph_context, finishers);
1214}
1215
1216void Locker::eval(SimpleLock *lock, bool *need_issue)
1217{
1218 switch (lock->get_type()) {
1219 case CEPH_LOCK_IFILE:
1220 return file_eval(static_cast<ScatterLock*>(lock), need_issue);
1221 case CEPH_LOCK_IDFT:
1222 case CEPH_LOCK_INEST:
1223 return scatter_eval(static_cast<ScatterLock*>(lock), need_issue);
1224 default:
1225 return simple_eval(lock, need_issue);
1226 }
1227}
1228
1229
1230// ------------------
1231// rdlock
1232
1233bool Locker::_rdlock_kick(SimpleLock *lock, bool as_anon)
1234{
1235 // kick the lock
1236 if (lock->is_stable()) {
1237 if (lock->get_parent()->is_auth()) {
1238 if (lock->get_sm() == &sm_scatterlock) {
1239 // not until tempsync is fully implemented
1240 //if (lock->get_parent()->is_replicated())
1241 //scatter_tempsync((ScatterLock*)lock);
1242 //else
1243 simple_sync(lock);
1244 } else if (lock->get_sm() == &sm_filelock) {
1245 CInode *in = static_cast<CInode*>(lock->get_parent());
1246 if (lock->get_state() == LOCK_EXCL &&
1247 in->get_target_loner() >= 0 &&
1248 !in->is_dir() && !as_anon) // as_anon => caller wants SYNC, not XSYN
1249 file_xsyn(lock);
1250 else
1251 simple_sync(lock);
1252 } else
1253 simple_sync(lock);
1254 return true;
1255 } else {
1256 // request rdlock state change from auth
1257 mds_rank_t auth = lock->get_parent()->authority().first;
1258 if (!mds->is_cluster_degraded() ||
1259 mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
1260 dout(10) << "requesting rdlock from auth on "
1261 << *lock << " on " << *lock->get_parent() << dendl;
1262 mds->send_message_mds(new MLock(lock, LOCK_AC_REQRDLOCK, mds->get_nodeid()), auth);
1263 }
1264 return false;
1265 }
1266 }
1267 if (lock->get_type() == CEPH_LOCK_IFILE) {
1268 CInode *in = static_cast<CInode *>(lock->get_parent());
1269 if (in->state_test(CInode::STATE_RECOVERING)) {
1270 mds->mdcache->recovery_queue.prioritize(in);
1271 }
1272 }
1273
1274 return false;
1275}
1276
1277bool Locker::rdlock_try(SimpleLock *lock, client_t client, MDSInternalContextBase *con)
1278{
1279 dout(7) << "rdlock_try on " << *lock << " on " << *lock->get_parent() << dendl;
1280
1281 // can read? grab ref.
1282 if (lock->can_rdlock(client))
1283 return true;
1284
1285 _rdlock_kick(lock, false);
1286
1287 if (lock->can_rdlock(client))
1288 return true;
1289
1290 // wait!
1291 if (con) {
1292 dout(7) << "rdlock_try waiting on " << *lock << " on " << *lock->get_parent() << dendl;
1293 lock->add_waiter(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_RD, con);
1294 }
1295 return false;
1296}
1297
1298bool Locker::rdlock_start(SimpleLock *lock, MDRequestRef& mut, bool as_anon)
1299{
1300 dout(7) << "rdlock_start on " << *lock << " on " << *lock->get_parent() << dendl;
1301
1302 // client may be allowed to rdlock the same item it has xlocked.
1303 // UNLESS someone passes in as_anon, or we're reading snapped version here.
1304 if (mut->snapid != CEPH_NOSNAP)
1305 as_anon = true;
1306 client_t client = as_anon ? -1 : mut->get_client();
1307
1308 CInode *in = 0;
1309 if (lock->get_type() != CEPH_LOCK_DN)
1310 in = static_cast<CInode *>(lock->get_parent());
1311
1312 /*
1313 if (!lock->get_parent()->is_auth() &&
1314 lock->fw_rdlock_to_auth()) {
1315 mdcache->request_forward(mut, lock->get_parent()->authority().first);
1316 return false;
1317 }
1318 */
1319
1320 while (1) {
1321 // can read? grab ref.
1322 if (lock->can_rdlock(client)) {
1323 lock->get_rdlock();
1324 mut->rdlocks.insert(lock);
1325 mut->locks.insert(lock);
1326 return true;
1327 }
1328
1329 // hmm, wait a second.
1330 if (in && !in->is_head() && in->is_auth() &&
1331 lock->get_state() == LOCK_SNAP_SYNC) {
1332 // okay, we actually need to kick the head's lock to get ourselves synced up.
1333 CInode *head = mdcache->get_inode(in->ino());
1334 assert(head);
1335 SimpleLock *hlock = head->get_lock(lock->get_type());
1336 if (hlock->get_state() != LOCK_SYNC) {
1337 dout(10) << "rdlock_start trying head inode " << *head << dendl;
1338 if (!rdlock_start(head->get_lock(lock->get_type()), mut, true)) // ** as_anon, no rdlock on EXCL **
1339 return false;
1340 // oh, check our lock again then
1341 }
1342 }
1343
1344 if (!_rdlock_kick(lock, as_anon))
1345 break;
1346 }
1347
1348 // wait!
1349 int wait_on;
1350 if (lock->get_parent()->is_auth() && lock->is_stable())
1351 wait_on = SimpleLock::WAIT_RD;
1352 else
1353 wait_on = SimpleLock::WAIT_STABLE; // REQRDLOCK is ignored if lock is unstable, so we need to retry.
1354 dout(7) << "rdlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl;
1355 lock->add_waiter(wait_on, new C_MDS_RetryRequest(mdcache, mut));
1356 nudge_log(lock);
1357 return false;
1358}
1359
1360void Locker::nudge_log(SimpleLock *lock)
1361{
1362 dout(10) << "nudge_log " << *lock << " on " << *lock->get_parent() << dendl;
1363 if (lock->get_parent()->is_auth() && lock->is_unstable_and_locked()) // as with xlockdone, or cap flush
1364 mds->mdlog->flush();
1365}
1366
1367void Locker::rdlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue)
1368{
1369 // drop ref
1370 lock->put_rdlock();
1371 if (mut) {
1372 mut->rdlocks.erase(lock);
1373 mut->locks.erase(lock);
1374 }
1375
1376 dout(7) << "rdlock_finish on " << *lock << " on " << *lock->get_parent() << dendl;
1377
1378 // last one?
1379 if (!lock->is_rdlocked()) {
1380 if (!lock->is_stable())
1381 eval_gather(lock, false, pneed_issue);
1382 else if (lock->get_parent()->is_auth())
1383 try_eval(lock, pneed_issue);
1384 }
1385}
1386
1387
1388bool Locker::can_rdlock_set(set<SimpleLock*>& locks)
1389{
1390 dout(10) << "can_rdlock_set " << locks << dendl;
1391 for (set<SimpleLock*>::iterator p = locks.begin(); p != locks.end(); ++p)
1392 if (!(*p)->can_rdlock(-1)) {
1393 dout(10) << "can_rdlock_set can't rdlock " << *p << " on " << *(*p)->get_parent() << dendl;
1394 return false;
1395 }
1396 return true;
1397}
1398
1399bool Locker::rdlock_try_set(set<SimpleLock*>& locks)
1400{
1401 dout(10) << "rdlock_try_set " << locks << dendl;
1402 for (set<SimpleLock*>::iterator p = locks.begin(); p != locks.end(); ++p)
1403 if (!rdlock_try(*p, -1, NULL)) {
1404 dout(10) << "rdlock_try_set can't rdlock " << *p << " on " << *(*p)->get_parent() << dendl;
1405 return false;
1406 }
1407 return true;
1408}
1409
1410void Locker::rdlock_take_set(set<SimpleLock*>& locks, MutationRef& mut)
1411{
1412 dout(10) << "rdlock_take_set " << locks << dendl;
1413 for (set<SimpleLock*>::iterator p = locks.begin(); p != locks.end(); ++p) {
1414 (*p)->get_rdlock();
1415 mut->rdlocks.insert(*p);
1416 mut->locks.insert(*p);
1417 }
1418}
1419
1420// ------------------
1421// wrlock
1422
1423void Locker::wrlock_force(SimpleLock *lock, MutationRef& mut)
1424{
1425 if (lock->get_type() == CEPH_LOCK_IVERSION ||
1426 lock->get_type() == CEPH_LOCK_DVERSION)
1427 return local_wrlock_grab(static_cast<LocalLock*>(lock), mut);
1428
1429 dout(7) << "wrlock_force on " << *lock
1430 << " on " << *lock->get_parent() << dendl;
1431 lock->get_wrlock(true);
1432 mut->wrlocks.insert(lock);
1433 mut->locks.insert(lock);
1434}
1435
1436bool Locker::wrlock_start(SimpleLock *lock, MDRequestRef& mut, bool nowait)
1437{
1438 if (lock->get_type() == CEPH_LOCK_IVERSION ||
1439 lock->get_type() == CEPH_LOCK_DVERSION)
1440 return local_wrlock_start(static_cast<LocalLock*>(lock), mut);
1441
1442 dout(10) << "wrlock_start " << *lock << " on " << *lock->get_parent() << dendl;
1443
1444 CInode *in = static_cast<CInode *>(lock->get_parent());
1445 client_t client = mut->get_client();
1446 bool want_scatter = !nowait && lock->get_parent()->is_auth() &&
1447 (in->has_subtree_or_exporting_dirfrag() ||
1448 static_cast<ScatterLock*>(lock)->get_scatter_wanted());
1449
1450 while (1) {
1451 // wrlock?
1452 if (lock->can_wrlock(client) &&
1453 (!want_scatter || lock->get_state() == LOCK_MIX)) {
1454 lock->get_wrlock();
1455 mut->wrlocks.insert(lock);
1456 mut->locks.insert(lock);
1457 return true;
1458 }
1459
1460 if (lock->get_type() == CEPH_LOCK_IFILE &&
1461 in->state_test(CInode::STATE_RECOVERING)) {
1462 mds->mdcache->recovery_queue.prioritize(in);
1463 }
1464
1465 if (!lock->is_stable())
1466 break;
1467
1468 if (in->is_auth()) {
1469 // don't do nested lock state change if we have dirty scatterdata and
1470 // may scatter_writebehind or start_scatter, because nowait==true implies
1471 // that the caller already has a log entry open!
1472 if (nowait && lock->is_dirty())
1473 return false;
1474
1475 if (want_scatter)
1476 scatter_mix(static_cast<ScatterLock*>(lock));
1477 else
1478 simple_lock(lock);
1479
1480 if (nowait && !lock->can_wrlock(client))
1481 return false;
1482
1483 } else {
1484 // replica.
1485 // auth should be auth_pinned (see acquire_locks wrlock weird mustpin case).
1486 mds_rank_t auth = lock->get_parent()->authority().first;
1487 if (!mds->is_cluster_degraded() ||
1488 mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
1489 dout(10) << "requesting scatter from auth on "
1490 << *lock << " on " << *lock->get_parent() << dendl;
1491 mds->send_message_mds(new MLock(lock, LOCK_AC_REQSCATTER, mds->get_nodeid()), auth);
1492 }
1493 break;
1494 }
1495 }
1496
1497 if (!nowait) {
1498 dout(7) << "wrlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl;
1499 lock->add_waiter(SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
1500 nudge_log(lock);
1501 }
1502
1503 return false;
1504}
1505
1506void Locker::wrlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue)
1507{
1508 if (lock->get_type() == CEPH_LOCK_IVERSION ||
1509 lock->get_type() == CEPH_LOCK_DVERSION)
1510 return local_wrlock_finish(static_cast<LocalLock*>(lock), mut);
1511
1512 dout(7) << "wrlock_finish on " << *lock << " on " << *lock->get_parent() << dendl;
1513 lock->put_wrlock();
1514 if (mut) {
1515 mut->wrlocks.erase(lock);
1516 if (mut->remote_wrlocks.count(lock) == 0)
1517 mut->locks.erase(lock);
1518 }
1519
1520 if (!lock->is_wrlocked()) {
1521 if (!lock->is_stable())
1522 eval_gather(lock, false, pneed_issue);
1523 else if (lock->get_parent()->is_auth())
1524 try_eval(lock, pneed_issue);
1525 }
1526}
1527
1528
1529// remote wrlock
1530
1531void Locker::remote_wrlock_start(SimpleLock *lock, mds_rank_t target, MDRequestRef& mut)
1532{
1533 dout(7) << "remote_wrlock_start mds." << target << " on " << *lock << " on " << *lock->get_parent() << dendl;
1534
1535 // wait for active target
1536 if (mds->is_cluster_degraded() &&
1537 !mds->mdsmap->is_clientreplay_or_active_or_stopping(target)) {
1538 dout(7) << " mds." << target << " is not active" << dendl;
1539 if (mut->more()->waiting_on_slave.empty())
1540 mds->wait_for_active_peer(target, new C_MDS_RetryRequest(mdcache, mut));
1541 return;
1542 }
1543
1544 // send lock request
1545 mut->start_locking(lock, target);
1546 mut->more()->slaves.insert(target);
1547 MMDSSlaveRequest *r = new MMDSSlaveRequest(mut->reqid, mut->attempt,
1548 MMDSSlaveRequest::OP_WRLOCK);
1549 r->set_lock_type(lock->get_type());
1550 lock->get_parent()->set_object_info(r->get_object_info());
1551 mds->send_message_mds(r, target);
1552
1553 assert(mut->more()->waiting_on_slave.count(target) == 0);
1554 mut->more()->waiting_on_slave.insert(target);
1555}
1556
1557void Locker::remote_wrlock_finish(SimpleLock *lock, mds_rank_t target,
1558 MutationImpl *mut)
1559{
1560 // drop ref
1561 mut->remote_wrlocks.erase(lock);
1562 if (mut->wrlocks.count(lock) == 0)
1563 mut->locks.erase(lock);
1564
1565 dout(7) << "remote_wrlock_finish releasing remote wrlock on mds." << target
1566 << " " << *lock->get_parent() << dendl;
1567 if (!mds->is_cluster_degraded() ||
1568 mds->mdsmap->get_state(target) >= MDSMap::STATE_REJOIN) {
1569 MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt,
1570 MMDSSlaveRequest::OP_UNWRLOCK);
1571 slavereq->set_lock_type(lock->get_type());
1572 lock->get_parent()->set_object_info(slavereq->get_object_info());
1573 mds->send_message_mds(slavereq, target);
1574 }
1575}
1576
1577
1578// ------------------
1579// xlock
1580
1581bool Locker::xlock_start(SimpleLock *lock, MDRequestRef& mut)
1582{
1583 if (lock->get_type() == CEPH_LOCK_IVERSION ||
1584 lock->get_type() == CEPH_LOCK_DVERSION)
1585 return local_xlock_start(static_cast<LocalLock*>(lock), mut);
1586
1587 dout(7) << "xlock_start on " << *lock << " on " << *lock->get_parent() << dendl;
1588 client_t client = mut->get_client();
1589
1590 // auth?
1591 if (lock->get_parent()->is_auth()) {
1592 // auth
1593 while (1) {
1594 if (lock->can_xlock(client)) {
1595 lock->set_state(LOCK_XLOCK);
1596 lock->get_xlock(mut, client);
1597 mut->xlocks.insert(lock);
1598 mut->locks.insert(lock);
1599 mut->finish_locking(lock);
1600 return true;
1601 }
1602
1603 if (lock->get_type() == CEPH_LOCK_IFILE) {
1604 CInode *in = static_cast<CInode*>(lock->get_parent());
1605 if (in->state_test(CInode::STATE_RECOVERING)) {
1606 mds->mdcache->recovery_queue.prioritize(in);
1607 }
1608 }
1609
1610 if (!lock->is_stable() && (lock->get_state() != LOCK_XLOCKDONE ||
1611 lock->get_xlock_by_client() != client ||
1612 lock->is_waiter_for(SimpleLock::WAIT_STABLE)))
1613 break;
1614
1615 if (lock->get_state() == LOCK_LOCK || lock->get_state() == LOCK_XLOCKDONE) {
1616 mut->start_locking(lock);
1617 simple_xlock(lock);
1618 } else {
1619 simple_lock(lock);
1620 }
1621 }
1622
1623 lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
1624 nudge_log(lock);
1625 return false;
1626 } else {
1627 // replica
1628 assert(lock->get_sm()->can_remote_xlock);
1629 assert(!mut->slave_request);
1630
1631 // wait for single auth
1632 if (lock->get_parent()->is_ambiguous_auth()) {
1633 lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH,
1634 new C_MDS_RetryRequest(mdcache, mut));
1635 return false;
1636 }
1637
1638 // wait for active auth
1639 mds_rank_t auth = lock->get_parent()->authority().first;
1640 if (mds->is_cluster_degraded() &&
1641 !mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
1642 dout(7) << " mds." << auth << " is not active" << dendl;
1643 if (mut->more()->waiting_on_slave.empty())
1644 mds->wait_for_active_peer(auth, new C_MDS_RetryRequest(mdcache, mut));
1645 return false;
1646 }
1647
1648 // send lock request
1649 mut->more()->slaves.insert(auth);
1650 mut->start_locking(lock, auth);
1651 MMDSSlaveRequest *r = new MMDSSlaveRequest(mut->reqid, mut->attempt,
1652 MMDSSlaveRequest::OP_XLOCK);
1653 r->set_lock_type(lock->get_type());
1654 lock->get_parent()->set_object_info(r->get_object_info());
1655 mds->send_message_mds(r, auth);
1656
1657 assert(mut->more()->waiting_on_slave.count(auth) == 0);
1658 mut->more()->waiting_on_slave.insert(auth);
1659
1660 return false;
1661 }
1662}
1663
1664void Locker::_finish_xlock(SimpleLock *lock, client_t xlocker, bool *pneed_issue)
1665{
1666 assert(!lock->is_stable());
1667 if (lock->get_num_rdlocks() == 0 &&
1668 lock->get_num_wrlocks() == 0 &&
1669 lock->get_num_client_lease() == 0 &&
1670 lock->get_state() != LOCK_XLOCKSNAP &&
1671 lock->get_type() != CEPH_LOCK_DN) {
1672 CInode *in = static_cast<CInode*>(lock->get_parent());
1673 client_t loner = in->get_target_loner();
1674 if (loner >= 0 && (xlocker < 0 || xlocker == loner)) {
1675 lock->set_state(LOCK_EXCL);
1676 lock->get_parent()->auth_unpin(lock);
1677 lock->finish_waiters(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD);
1678 if (lock->get_cap_shift())
1679 *pneed_issue = true;
1680 if (lock->get_parent()->is_auth() &&
1681 lock->is_stable())
1682 try_eval(lock, pneed_issue);
1683 return;
1684 }
1685 }
1686 // the xlocker may have CEPH_CAP_GSHARED, need to revoke it if next state is LOCK_LOCK
1687 eval_gather(lock, lock->get_state() != LOCK_XLOCKSNAP, pneed_issue);
1688}
1689
1690void Locker::xlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue)
1691{
1692 if (lock->get_type() == CEPH_LOCK_IVERSION ||
1693 lock->get_type() == CEPH_LOCK_DVERSION)
1694 return local_xlock_finish(static_cast<LocalLock*>(lock), mut);
1695
1696 dout(10) << "xlock_finish on " << *lock << " " << *lock->get_parent() << dendl;
1697
1698 client_t xlocker = lock->get_xlock_by_client();
1699
1700 // drop ref
1701 lock->put_xlock();
1702 assert(mut);
1703 mut->xlocks.erase(lock);
1704 mut->locks.erase(lock);
1705
1706 bool do_issue = false;
1707
1708 // remote xlock?
1709 if (!lock->get_parent()->is_auth()) {
1710 assert(lock->get_sm()->can_remote_xlock);
1711
1712 // tell auth
1713 dout(7) << "xlock_finish releasing remote xlock on " << *lock->get_parent() << dendl;
1714 mds_rank_t auth = lock->get_parent()->authority().first;
1715 if (!mds->is_cluster_degraded() ||
1716 mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) {
1717 MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt,
1718 MMDSSlaveRequest::OP_UNXLOCK);
1719 slavereq->set_lock_type(lock->get_type());
1720 lock->get_parent()->set_object_info(slavereq->get_object_info());
1721 mds->send_message_mds(slavereq, auth);
1722 }
1723 // others waiting?
1724 lock->finish_waiters(SimpleLock::WAIT_STABLE |
1725 SimpleLock::WAIT_WR |
1726 SimpleLock::WAIT_RD, 0);
1727 } else {
1728 if (lock->get_num_xlocks() == 0) {
1729 if (lock->get_state() == LOCK_LOCK_XLOCK)
1730 lock->set_state(LOCK_XLOCKDONE);
1731 _finish_xlock(lock, xlocker, &do_issue);
1732 }
1733 }
1734
1735 if (do_issue) {
1736 CInode *in = static_cast<CInode*>(lock->get_parent());
1737 if (in->is_head()) {
1738 if (pneed_issue)
1739 *pneed_issue = true;
1740 else
1741 issue_caps(in);
1742 }
1743 }
1744}
1745
1746void Locker::xlock_export(SimpleLock *lock, MutationImpl *mut)
1747{
1748 dout(10) << "xlock_export on " << *lock << " " << *lock->get_parent() << dendl;
1749
1750 lock->put_xlock();
1751 mut->xlocks.erase(lock);
1752 mut->locks.erase(lock);
1753
1754 MDSCacheObject *p = lock->get_parent();
1755 assert(p->state_test(CInode::STATE_AMBIGUOUSAUTH)); // we are exporting this (inode)
1756
1757 if (!lock->is_stable())
1758 lock->get_parent()->auth_unpin(lock);
1759
1760 lock->set_state(LOCK_LOCK);
1761}
1762
1763void Locker::xlock_import(SimpleLock *lock)
1764{
1765 dout(10) << "xlock_import on " << *lock << " " << *lock->get_parent() << dendl;
1766 lock->get_parent()->auth_pin(lock);
1767}
1768
1769
1770
1771// file i/o -----------------------------------------
1772
1773version_t Locker::issue_file_data_version(CInode *in)
1774{
1775 dout(7) << "issue_file_data_version on " << *in << dendl;
1776 return in->inode.file_data_version;
1777}
1778
1779class C_Locker_FileUpdate_finish : public LockerLogContext {
1780 CInode *in;
1781 MutationRef mut;
1782 bool share_max;
1783 bool need_issue;
1784 client_t client;
1785 MClientCaps *ack;
1786public:
1787 C_Locker_FileUpdate_finish(Locker *l, CInode *i, MutationRef& m,
1788 bool sm=false, bool ni=false, client_t c=-1,
1789 MClientCaps *ac = 0)
1790 : LockerLogContext(l), in(i), mut(m), share_max(sm), need_issue(ni),
1791 client(c), ack(ac) {
1792 in->get(CInode::PIN_PTRWAITER);
1793 }
1794 void finish(int r) override {
1795 locker->file_update_finish(in, mut, share_max, need_issue, client, ack);
1796 in->put(CInode::PIN_PTRWAITER);
1797 }
1798};
1799
1800void Locker::file_update_finish(CInode *in, MutationRef& mut, bool share_max, bool issue_client_cap,
1801 client_t client, MClientCaps *ack)
1802{
1803 dout(10) << "file_update_finish on " << *in << dendl;
1804 in->pop_and_dirty_projected_inode(mut->ls);
1805
1806 mut->apply();
1807
1808 if (ack) {
1809 Session *session = mds->get_session(client);
1810 if (session) {
1811 // "oldest flush tid" > 0 means client uses unique TID for each flush
1812 if (ack->get_oldest_flush_tid() > 0)
1813 session->add_completed_flush(ack->get_client_tid());
1814 mds->send_message_client_counted(ack, session);
1815 } else {
1816 dout(10) << " no session for client." << client << " " << *ack << dendl;
1817 ack->put();
1818 }
1819 }
1820
1821 set<CInode*> need_issue;
1822 drop_locks(mut.get(), &need_issue);
1823
1824 if (!in->is_head() && !in->client_snap_caps.empty()) {
1825 dout(10) << " client_snap_caps " << in->client_snap_caps << dendl;
1826 // check for snap writeback completion
1827 bool gather = false;
1828 compact_map<int,set<client_t> >::iterator p = in->client_snap_caps.begin();
1829 while (p != in->client_snap_caps.end()) {
1830 SimpleLock *lock = in->get_lock(p->first);
1831 assert(lock);
1832 dout(10) << " completing client_snap_caps for " << ccap_string(p->first)
1833 << " lock " << *lock << " on " << *in << dendl;
1834 lock->put_wrlock();
1835
1836 p->second.erase(client);
1837 if (p->second.empty()) {
1838 gather = true;
1839 in->client_snap_caps.erase(p++);
1840 } else
1841 ++p;
1842 }
1843 if (gather) {
1844 if (in->client_snap_caps.empty())
1845 in->item_open_file.remove_myself();
1846 eval_cap_gather(in, &need_issue);
1847 }
1848 } else {
1849 if (issue_client_cap && need_issue.count(in) == 0) {
1850 Capability *cap = in->get_client_cap(client);
1851 if (cap && (cap->wanted() & ~cap->pending()))
1852 issue_caps(in, cap);
1853 }
1854
1855 if (share_max && in->is_auth() &&
1856 (in->filelock.gcaps_allowed(CAP_LONER) & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)))
1857 share_inode_max_size(in);
1858 }
1859 issue_caps_set(need_issue);
1860
1861 // auth unpin after issuing caps
1862 mut->cleanup();
1863}
1864
1865Capability* Locker::issue_new_caps(CInode *in,
1866 int mode,
1867 Session *session,
1868 SnapRealm *realm,
1869 bool is_replay)
1870{
1871 dout(7) << "issue_new_caps for mode " << mode << " on " << *in << dendl;
1872 bool is_new;
1873
1874 // if replay, try to reconnect cap, and otherwise do nothing.
1875 if (is_replay) {
1876 mds->mdcache->try_reconnect_cap(in, session);
1877 return 0;
1878 }
1879
1880 // my needs
1881 assert(session->info.inst.name.is_client());
31f18b77 1882 client_t my_client = session->info.inst.name.num();
7c673cae
FG
1883 int my_want = ceph_caps_for_mode(mode);
1884
1885 // register a capability
1886 Capability *cap = in->get_client_cap(my_client);
1887 if (!cap) {
1888 // new cap
1889 cap = in->add_client_cap(my_client, session, realm);
1890 cap->set_wanted(my_want);
1891 cap->mark_new();
1892 cap->inc_suppress(); // suppress file cap messages for new cap (we'll bundle with the open() reply)
1893 is_new = true;
1894 } else {
1895 is_new = false;
1896 // make sure it wants sufficient caps
1897 if (my_want & ~cap->wanted()) {
1898 // augment wanted caps for this client
1899 cap->set_wanted(cap->wanted() | my_want);
1900 }
1901 }
1902
1903 if (in->is_auth()) {
1904 // [auth] twiddle mode?
1905 eval(in, CEPH_CAP_LOCKS);
1906
1907 if (_need_flush_mdlog(in, my_want))
1908 mds->mdlog->flush();
1909
1910 } else {
1911 // [replica] tell auth about any new caps wanted
1912 request_inode_file_caps(in);
1913 }
1914
1915 // issue caps (pot. incl new one)
1916 //issue_caps(in); // note: _eval above may have done this already...
1917
1918 // re-issue whatever we can
1919 //cap->issue(cap->pending());
1920
1921 if (is_new)
1922 cap->dec_suppress();
1923
1924 return cap;
1925}
1926
1927
1928void Locker::issue_caps_set(set<CInode*>& inset)
1929{
1930 for (set<CInode*>::iterator p = inset.begin(); p != inset.end(); ++p)
1931 issue_caps(*p);
1932}
1933
/**
 * Reconcile each client's pending caps on @in with what the current lock
 * states allow: grant newly wanted+allowed caps, or revoke caps that are
 * no longer allowed, sending MClientCaps GRANT/REVOKE messages as needed.
 *
 * @param in head inode to issue caps on (asserted to be a head inode)
 * @param only_cap if non-NULL, restrict processing to this one capability
 * @return true if nothing was (re)issued (no messages sent)
 */
bool Locker::issue_caps(CInode *in, Capability *only_cap)
{
  // allowed caps are determined by the lock mode.
  int all_allowed = in->get_caps_allowed_by_type(CAP_ANY);
  int loner_allowed = in->get_caps_allowed_by_type(CAP_LONER);
  int xlocker_allowed = in->get_caps_allowed_by_type(CAP_XLOCKER);

  client_t loner = in->get_loner();
  if (loner >= 0) {
    dout(7) << "issue_caps loner client." << loner
	    << " allowed=" << ccap_string(loner_allowed)
	    << ", xlocker allowed=" << ccap_string(xlocker_allowed)
	    << ", others allowed=" << ccap_string(all_allowed)
	    << " on " << *in << dendl;
  } else {
    dout(7) << "issue_caps allowed=" << ccap_string(all_allowed)
	    << ", xlocker allowed=" << ccap_string(xlocker_allowed)
	    << " on " << *in << dendl;
  }

  assert(in->is_head());

  // count conflicts with
  int nissued = 0;

  // client caps: either just the requested client's cap, or all of them.
  map<client_t, Capability*>::iterator it;
  if (only_cap)
    it = in->client_caps.find(only_cap->get_client());
  else
    it = in->client_caps.begin();
  for (; it != in->client_caps.end(); ++it) {
    Capability *cap = it->second;
    if (cap->is_stale())
      continue;

    // do not issue _new_ bits when size|mtime is projected
    int allowed;
    if (loner == it->first)
      allowed = loner_allowed;
    else
      allowed = all_allowed;

    // add in any xlocker-only caps (for locks this client is the xlocker for)
    allowed |= xlocker_allowed & in->get_xlocker_mask(it->first);

    // clients without inline-data support must not read/write inlined files
    Session *session = mds->get_session(it->first);
    if (in->inode.inline_data.version != CEPH_INLINE_NONE &&
	!(session && session->connection &&
	  session->connection->has_feature(CEPH_FEATURE_MDS_INLINE_DATA)))
      allowed &= ~(CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR);

    int pending = cap->pending();
    int wanted = cap->wanted();

    dout(20) << " client." << it->first
	     << " pending " << ccap_string(pending)
	     << " allowed " << ccap_string(allowed)
	     << " wanted " << ccap_string(wanted)
	     << dendl;

    if (!(pending & ~allowed)) {
      // skip if suppress or new, and not revocation
      if (cap->is_new() || cap->is_suppress()) {
	dout(20) << "  !revoke and new|suppressed, skipping client." << it->first << dendl;
	continue;
      }
    }

    // notify clients about deleted inode, to make sure they release caps ASAP.
    if (in->inode.nlink == 0)
      wanted |= CEPH_CAP_LINK_SHARED;

    // are there caps that the client _wants_ and can have, but aren't pending?
    // or do we need to revoke?
    if (((wanted & allowed) & ~pending) ||  // missing wanted+allowed caps
	(pending & ~allowed)) {             // need to revoke ~allowed caps.
      // issue
      nissued++;

      // include caps that clients generally like, while we're at it.
      int likes = in->get_caps_liked();
      int before = pending;
      long seq;
      if (pending & ~allowed)
	seq = cap->issue((wanted|likes) & allowed & pending);  // if revoking, don't issue anything new.
      else
	seq = cap->issue((wanted|likes) & allowed);
      int after = cap->pending();

      if (cap->is_new()) {
	// haven't sent caps to client yet; just fold the revoke in locally
	if (before & ~after)
	  cap->confirm_receipt(seq, after);
      } else {
        dout(7) << "   sending MClientCaps to client." << it->first
		<< " seq " << cap->get_last_seq()
		<< " new pending " << ccap_string(after) << " was " << ccap_string(before)
		<< dendl;

	int op = (before & ~after) ? CEPH_CAP_OP_REVOKE : CEPH_CAP_OP_GRANT;
	if (op == CEPH_CAP_OP_REVOKE) {
	  // track outstanding revocations so slow clients can be warned about
	  revoking_caps.push_back(&cap->item_revoking_caps);
	  revoking_caps_by_client[cap->get_client()].push_back(&cap->item_client_revoking_caps);
	  cap->set_last_revoke_stamp(ceph_clock_now());
	  cap->reset_num_revoke_warnings();
	}

	MClientCaps *m = new MClientCaps(op, in->ino(),
					 in->find_snaprealm()->inode->ino(),
					 cap->get_cap_id(), cap->get_last_seq(),
					 after, wanted, 0,
					 cap->get_mseq(),
					 mds->get_osd_epoch_barrier());
	in->encode_cap_message(m, cap);

	mds->send_message_client_counted(m, it->first);
      }
    }

    if (only_cap)
      break;
  }

  return (nissued == 0);  // true if no re-issued, no callbacks
}
2060
2061void Locker::issue_truncate(CInode *in)
2062{
2063 dout(7) << "issue_truncate on " << *in << dendl;
2064
2065 for (map<client_t, Capability*>::iterator it = in->client_caps.begin();
2066 it != in->client_caps.end();
2067 ++it) {
2068 Capability *cap = it->second;
2069 MClientCaps *m = new MClientCaps(CEPH_CAP_OP_TRUNC,
2070 in->ino(),
2071 in->find_snaprealm()->inode->ino(),
2072 cap->get_cap_id(), cap->get_last_seq(),
2073 cap->pending(), cap->wanted(), 0,
2074 cap->get_mseq(),
2075 mds->get_osd_epoch_barrier());
2076 in->encode_cap_message(m, cap);
2077 mds->send_message_client_counted(m, it->first);
2078 }
2079
2080 // should we increase max_size?
2081 if (in->is_auth() && in->is_file())
2082 check_inode_max_size(in);
2083}
2084
2085
/**
 * Revoke all non-pin caps issued through a now-stale capability.
 *
 * If the inode is mid cap-export we only mark it for re-evaluation:
 * a successful export removes the cap, a failed export re-checks it.
 * After revoking, kick any lock gathers that were waiting on this
 * client's caps, and re-evaluate (auth) or re-request (replica) state.
 */
void Locker::revoke_stale_caps(Capability *cap)
{
  CInode *in = cap->get_inode();
  if (in->state_test(CInode::STATE_EXPORTINGCAPS)) {
    // if export succeeds, the cap will be removed. if export fails, we need to
    // revoke the cap if it's still stale.
    in->state_set(CInode::STATE_EVALSTALECAPS);
    return;
  }

  int issued = cap->issued();
  if (issued & ~CEPH_CAP_PIN) {
    dout(10) << " revoking " << ccap_string(issued) << " on " << *in << dendl;
    cap->revoke();

    // the client may hold a writeable range; file content may need recovery
    if (in->is_auth() &&
	in->inode.client_ranges.count(cap->get_client()))
      in->state_set(CInode::STATE_NEEDSRECOVER);

    // unblock any in-progress lock transitions gathering on these caps
    if (!in->filelock.is_stable()) eval_gather(&in->filelock);
    if (!in->linklock.is_stable()) eval_gather(&in->linklock);
    if (!in->authlock.is_stable()) eval_gather(&in->authlock);
    if (!in->xattrlock.is_stable()) eval_gather(&in->xattrlock);

    if (in->is_auth()) {
      try_eval(in, CEPH_CAP_LOCKS);
    } else {
      request_inode_file_caps(in);
    }
  }
}
2117
/**
 * Mark every capability held by @session stale and revoke its issued
 * (non-pin) caps. Called when the client session has gone stale.
 */
void Locker::revoke_stale_caps(Session *session)
{
  dout(10) << "revoke_stale_caps for " << session->info.inst.name << dendl;

  for (xlist<Capability*>::iterator p = session->caps.begin(); !p.end(); ++p) {
    Capability *cap = *p;
    cap->mark_stale();
    revoke_stale_caps(cap);
  }
}
2128
2129void Locker::resume_stale_caps(Session *session)
2130{
2131 dout(10) << "resume_stale_caps for " << session->info.inst.name << dendl;
2132
2133 for (xlist<Capability*>::iterator p = session->caps.begin(); !p.end(); ++p) {
2134 Capability *cap = *p;
2135 CInode *in = cap->get_inode();
2136 assert(in->is_head());
2137 if (cap->is_stale()) {
2138 dout(10) << " clearing stale flag on " << *in << dendl;
2139 cap->clear_stale();
2140
2141 if (in->state_test(CInode::STATE_EXPORTINGCAPS)) {
2142 // if export succeeds, the cap will be removed. if export fails,
2143 // we need to re-issue the cap if it's not stale.
2144 in->state_set(CInode::STATE_EVALSTALECAPS);
2145 continue;
2146 }
2147
2148 if (!in->is_auth() || !eval(in, CEPH_CAP_LOCKS))
2149 issue_caps(in, cap);
2150 }
2151 }
2152}
2153
2154void Locker::remove_stale_leases(Session *session)
2155{
2156 dout(10) << "remove_stale_leases for " << session->info.inst.name << dendl;
2157 xlist<ClientLease*>::iterator p = session->leases.begin();
2158 while (!p.end()) {
2159 ClientLease *l = *p;
2160 ++p;
2161 CDentry *parent = static_cast<CDentry*>(l->parent);
2162 dout(15) << " removing lease on " << *parent << dendl;
2163 parent->remove_client_lease(l, this);
2164 }
2165}
2166
2167
/**
 * Deferred retry of request_inode_file_caps() for a replica inode,
 * queued while waiting (e.g. for a single auth). Pins the inode with
 * PIN_PTRWAITER for the lifetime of the context; the retry is skipped
 * if we became auth in the meantime.
 */
class C_MDL_RequestInodeFileCaps : public LockerContext {
  CInode *in;
public:
  C_MDL_RequestInodeFileCaps(Locker *l, CInode *i) :  LockerContext(l), in(i) {
    in->get(CInode::PIN_PTRWAITER);
  }
  void finish(int r) override {
    if (!in->is_auth())
      locker->request_inode_file_caps(in);
    in->put(CInode::PIN_PTRWAITER);
  }
};
2180
/**
 * On a replica, tell the auth MDS which caps our clients want on @in
 * (minus PIN). Only sends if the wanted mask changed since the last
 * request. Defers while auth is ambiguous or the auth MDS is still
 * rejoining; skips the send (but records the wanted mask) if the auth
 * is not yet in a state that can process it.
 */
void Locker::request_inode_file_caps(CInode *in)
{
  assert(!in->is_auth());

  int wanted = in->get_caps_wanted() & ~CEPH_CAP_PIN;
  if (wanted != in->replica_caps_wanted) {
    // wait for single auth
    if (in->is_ambiguous_auth()) {
      in->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH,
                     new C_MDL_RequestInodeFileCaps(this, in));
      return;
    }

    mds_rank_t auth = in->authority().first;
    if (mds->is_cluster_degraded() &&
	mds->mdsmap->get_state(auth) == MDSMap::STATE_REJOIN) {
      // auth is still rejoining; retry once it is active
      mds->wait_for_active_peer(auth, new C_MDL_RequestInodeFileCaps(this, in));
      return;
    }

    dout(7) << "request_inode_file_caps " << ccap_string(wanted)
            << " was " << ccap_string(in->replica_caps_wanted)
            << " on " << *in << " to mds." << auth << dendl;

    in->replica_caps_wanted = wanted;

    if (!mds->is_cluster_degraded() ||
	mds->mdsmap->is_clientreplay_or_active_or_stopping(auth))
      mds->send_message_mds(new MInodeFileCaps(in->ino(), in->replica_caps_wanted),
			    auth);
  }
}
2213
/* This function DOES put the passed message before returning */
/**
 * Auth-side handler for MInodeFileCaps: a replica MDS is reporting the
 * caps its clients want on this inode. Records (or clears) the replica's
 * wanted mask in mds_caps_wanted and re-evaluates the cap-related locks.
 *
 * NOTE: asserts the inode is present and we are auth; peers only send
 * this for inodes they replicate from us.
 */
void Locker::handle_inode_file_caps(MInodeFileCaps *m)
{
  // nobody should be talking to us during recovery.
  assert(mds->is_clientreplay() || mds->is_active() || mds->is_stopping());

  // ok
  CInode *in = mdcache->get_inode(m->get_ino());
  mds_rank_t from = mds_rank_t(m->get_source().num());

  assert(in);
  assert(in->is_auth());

  dout(7) << "handle_inode_file_caps replica mds." << from << " wants caps " << ccap_string(m->get_caps()) << " on " << *in << dendl;

  // an empty mask means the replica no longer wants anything
  if (m->get_caps())
    in->mds_caps_wanted[from] = m->get_caps();
  else
    in->mds_caps_wanted.erase(from);

  try_eval(in, CEPH_CAP_LOCKS);
  m->put();
}
2237
2238
/**
 * Deferred retry of check_inode_max_size(), queued while the inode is
 * frozen or its filelock cannot yet be wrlocked. Carries the pending
 * size/mtime/max_size values and pins the inode (PIN_PTRWAITER) for
 * the lifetime of the context; the retry only runs if we are still auth.
 */
class C_MDL_CheckMaxSize : public LockerContext {
  CInode *in;
  uint64_t new_max_size;
  uint64_t newsize;
  utime_t mtime;

public:
  C_MDL_CheckMaxSize(Locker *l, CInode *i, uint64_t _new_max_size,
                     uint64_t _newsize, utime_t _mtime) :
    LockerContext(l), in(i),
    new_max_size(_new_max_size), newsize(_newsize), mtime(_mtime)
  {
    in->get(CInode::PIN_PTRWAITER);
  }
  void finish(int r) override {
    if (in->is_auth())
      locker->check_inode_max_size(in, false, new_max_size, newsize, mtime);
    in->put(CInode::PIN_PTRWAITER);
  }
};
2259
31f18b77
FG
2260uint64_t Locker::calc_new_max_size(inode_t *pi, uint64_t size)
2261{
2262 uint64_t new_max = (size + 1) << 1;
2263 uint64_t max_inc = g_conf->mds_client_writeable_range_max_inc_objs;
2264 if (max_inc > 0) {
2265 max_inc *= pi->get_layout_size_increment();
2266 new_max = MIN(new_max, size + max_inc);
2267 }
2268 return ROUND_UP_TO(new_max, pi->get_layout_size_increment());
2269}
7c673cae
FG
2270
2271void Locker::calc_new_client_ranges(CInode *in, uint64_t size,
2272 map<client_t,client_writeable_range_t> *new_ranges,
2273 bool *max_increased)
2274{
2275 inode_t *latest = in->get_projected_inode();
2276 uint64_t ms;
2277 if(latest->has_layout()) {
31f18b77 2278 ms = calc_new_max_size(latest, size);
7c673cae
FG
2279 } else {
2280 // Layout-less directories like ~mds0/, have zero size
2281 ms = 0;
2282 }
2283
2284 // increase ranges as appropriate.
2285 // shrink to 0 if no WR|BUFFER caps issued.
2286 for (map<client_t,Capability*>::iterator p = in->client_caps.begin();
2287 p != in->client_caps.end();
2288 ++p) {
2289 if ((p->second->issued() | p->second->wanted()) & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER)) {
2290 client_writeable_range_t& nr = (*new_ranges)[p->first];
2291 nr.range.first = 0;
2292 if (latest->client_ranges.count(p->first)) {
2293 client_writeable_range_t& oldr = latest->client_ranges[p->first];
2294 if (ms > oldr.range.last)
2295 *max_increased = true;
2296 nr.range.last = MAX(ms, oldr.range.last);
2297 nr.follows = oldr.follows;
2298 } else {
31f18b77 2299 *max_increased = true;
7c673cae
FG
2300 nr.range.last = ms;
2301 nr.follows = in->first - 1;
2302 }
2303 }
2304 }
2305}
2306
/**
 * Update @in's size/mtime and/or per-client writeable ranges (max_size),
 * journaling the change. Called on the auth MDS for regular files.
 *
 * @param force_wrlock take the wrlock even if not normally permitted
 * @param new_max_size proposed max_size hint (0 = just recompute)
 * @param new_size     new file size (0 = no size update)
 * @param new_mtime    mtime accompanying a size update
 * @return true if an update was journaled; false if nothing to do or
 *         the update had to be deferred (frozen inode / unheld lock),
 *         in which case a C_MDL_CheckMaxSize retry has been queued.
 */
bool Locker::check_inode_max_size(CInode *in, bool force_wrlock,
				  uint64_t new_max_size, uint64_t new_size,
				  utime_t new_mtime)
{
  assert(in->is_auth());
  assert(in->is_file());

  inode_t *latest = in->get_projected_inode();
  map<client_t, client_writeable_range_t> new_ranges;
  uint64_t size = latest->size;
  bool update_size = new_size > 0;
  bool update_max = false;
  bool max_increased = false;

  if (update_size) {
    // sizes/mtimes only move forward
    new_size = size = MAX(size, new_size);
    new_mtime = MAX(new_mtime, latest->mtime);
    if (latest->size == new_size && latest->mtime == new_mtime)
      update_size = false;
  }

  calc_new_client_ranges(in, max(new_max_size, size), &new_ranges, &max_increased);

  if (max_increased || latest->client_ranges != new_ranges)
    update_max = true;

  if (!update_size && !update_max) {
    dout(20) << "check_inode_max_size no-op on " << *in << dendl;
    return false;
  }

  dout(10) << "check_inode_max_size new_ranges " << new_ranges
	   << " update_size " << update_size
	   << " on " << *in << dendl;

  if (in->is_frozen()) {
    // cannot touch a frozen inode; retry after unfreeze
    dout(10) << "check_inode_max_size frozen, waiting on " << *in << dendl;
    C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in,
                                                     new_max_size,
                                                     new_size,
                                                     new_mtime);
    in->add_waiter(CInode::WAIT_UNFREEZE, cms);
    return false;
  }
  if (!force_wrlock && !in->filelock.can_wrlock(in->get_loner())) {
    // lock? try to move the filelock toward a wrlockable state first.
    if (in->filelock.is_stable()) {
      if (in->get_target_loner() >= 0)
	file_excl(&in->filelock);
      else
	simple_lock(&in->filelock);
    }
    if (!in->filelock.can_wrlock(in->get_loner())) {
      // try again later
      C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in,
                                                       new_max_size,
                                                       new_size,
                                                       new_mtime);

      in->filelock.add_waiter(SimpleLock::WAIT_STABLE, cms);
      dout(10) << "check_inode_max_size can't wrlock, waiting on " << *in << dendl;
      return false;
    }
  }

  MutationRef mut(new MutationImpl());
  mut->ls = mds->mdlog->get_current_segment();

  inode_t *pi = in->project_inode();
  pi->version = in->pre_dirty();

  if (update_max) {
    dout(10) << "check_inode_max_size client_ranges " << pi->client_ranges << " -> " << new_ranges << dendl;
    pi->client_ranges = new_ranges;
  }

  if (update_size) {
    dout(10) << "check_inode_max_size size " << pi->size << " -> " << new_size << dendl;
    pi->size = new_size;
    pi->rstat.rbytes = new_size;
    dout(10) << "check_inode_max_size mtime " << pi->mtime << " -> " << new_mtime << dendl;
    pi->mtime = new_mtime;
  }

  // use EOpen if the file is still open; otherwise, use EUpdate.
  // this is just an optimization to push open files forward into
  // newer log segments.
  LogEvent *le;
  EMetaBlob *metablob;
  if (in->is_any_caps_wanted() && in->last == CEPH_NOSNAP) {
    EOpen *eo = new EOpen(mds->mdlog);
    eo->add_ino(in->ino());
    metablob = &eo->metablob;
    le = eo;
    mut->ls->open_files.push_back(&in->item_open_file);
  } else {
    EUpdate *eu = new EUpdate(mds->mdlog, "check_inode_max_size");
    metablob = &eu->metablob;
    le = eu;
  }
  mds->mdlog->start_entry(le);
  if (update_size) {  // FIXME if/when we do max_size nested accounting
    mdcache->predirty_journal_parents(mut, metablob, in, 0, PREDIRTY_PRIMARY);
    // no cow, here!
    CDentry *parent = in->get_projected_parent_dn();
    metablob->add_primary_dentry(parent, in, true);
  } else {
    metablob->add_dir_context(in->get_projected_parent_dn()->get_dir());
    mdcache->journal_dirty_inode(mut.get(), metablob, in);
  }
  mds->mdlog->submit_entry(le,
          new C_Locker_FileUpdate_finish(this, in, mut, true));
  wrlock_force(&in->filelock, mut);  // wrlock for duration of journal
  mut->auth_pin(in);

  // make max_size _increase_ timely
  if (max_increased)
    mds->mdlog->flush();

  return true;
}
2428
2429
/**
 * Push the current max_size (via a GRANT message) to clients that hold
 * FILE_WR|FILE_BUFFER caps on @in, so writers learn they may write
 * further without asking.
 *
 * @param only_cap if non-NULL, restrict to this one capability
 */
void Locker::share_inode_max_size(CInode *in, Capability *only_cap)
{
  /*
   * only share if currently issued a WR cap.  if client doesn't have it,
   * file_max doesn't matter, and the client will get it if/when they get
   * the cap later.
   */
  dout(10) << "share_inode_max_size on " << *in << dendl;
  map<client_t, Capability*>::iterator it;
  if (only_cap)
    it = in->client_caps.find(only_cap->get_client());
  else
    it = in->client_caps.begin();
  for (; it != in->client_caps.end(); ++it) {
    const client_t client = it->first;
    Capability *cap = it->second;
    if (cap->is_suppress())
      continue;
    if (cap->pending() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER)) {
      dout(10) << "share_inode_max_size with client." << client << dendl;
      cap->inc_last_seq();
      MClientCaps *m = new MClientCaps(CEPH_CAP_OP_GRANT,
				       in->ino(),
				       in->find_snaprealm()->inode->ino(),
				       cap->get_cap_id(), cap->get_last_seq(),
				       cap->pending(), cap->wanted(), 0,
				       cap->get_mseq(),
				       mds->get_osd_epoch_barrier());
      in->encode_cap_message(m, cap);
      mds->send_message_client_counted(m, client);
    }
    if (only_cap)
      break;
  }
}
2465
2466bool Locker::_need_flush_mdlog(CInode *in, int wanted)
2467{
2468 /* flush log if caps are wanted by client but corresponding lock is unstable and locked by
2469 * pending mutations. */
2470 if (((wanted & (CEPH_CAP_FILE_RD|CEPH_CAP_FILE_WR|CEPH_CAP_FILE_SHARED|CEPH_CAP_FILE_EXCL)) &&
2471 in->filelock.is_unstable_and_locked()) ||
2472 ((wanted & (CEPH_CAP_AUTH_SHARED|CEPH_CAP_AUTH_EXCL)) &&
2473 in->authlock.is_unstable_and_locked()) ||
2474 ((wanted & (CEPH_CAP_LINK_SHARED|CEPH_CAP_LINK_EXCL)) &&
2475 in->linklock.is_unstable_and_locked()) ||
2476 ((wanted & (CEPH_CAP_XATTR_SHARED|CEPH_CAP_XATTR_EXCL)) &&
2477 in->xattrlock.is_unstable_and_locked()))
2478 return true;
2479 return false;
2480}
2481
/**
 * Apply a client's new "wanted" cap mask to @cap.
 *
 * If @issue_seq matches the cap's last issue we take the new mask as-is;
 * on a seq mismatch we only ever ADD wanted bits (dropping bits based on
 * a stale view would be unsafe). Then maintain bookkeeping that depends
 * on wanted-ness: replicas forward the change to the auth, and on the
 * auth the inode is added to / removed from the open-file list (journaled
 * via EOpen) and, if being recovered, bumped in the recovery queue.
 */
void Locker::adjust_cap_wanted(Capability *cap, int wanted, int issue_seq)
{
  if (ceph_seq_cmp(issue_seq, cap->get_last_issue()) == 0) {
    dout(10) << " wanted " << ccap_string(cap->wanted())
	     << " -> " << ccap_string(wanted) << dendl;
    cap->set_wanted(wanted);
  } else if (wanted & ~cap->wanted()) {
    dout(10) << " wanted " << ccap_string(cap->wanted())
	     << " -> " << ccap_string(wanted)
	     << " (added caps even though we had seq mismatch!)" << dendl;
    cap->set_wanted(wanted | cap->wanted());
  } else {
    dout(10) << " NOT changing wanted " << ccap_string(cap->wanted())
	     << " -> " << ccap_string(wanted)
	     << " (issue_seq " << issue_seq << " != last_issue "
	     << cap->get_last_issue() << ")" << dendl;
    return;
  }

  CInode *cur = cap->get_inode();
  if (!cur->is_auth()) {
    // replica: relay the (aggregate) wanted mask to the auth MDS
    request_inode_file_caps(cur);
    return;
  }

  if (cap->wanted() == 0) {
    if (cur->item_open_file.is_on_list() &&
	!cur->is_any_caps_wanted()) {
      dout(10) << " removing unwanted file from open file list " << *cur << dendl;
      cur->item_open_file.remove_myself();
    }
  } else {
    if (cur->state_test(CInode::STATE_RECOVERING) &&
	(cap->wanted() & (CEPH_CAP_FILE_RD |
			  CEPH_CAP_FILE_WR))) {
      mds->mdcache->recovery_queue.prioritize(cur);
    }

    if (!cur->item_open_file.is_on_list()) {
      dout(10) << " adding to open file list " << *cur << dendl;
      assert(cur->last == CEPH_NOSNAP);
      // journal an EOpen so replay knows this file is open
      LogSegment *ls = mds->mdlog->get_current_segment();
      EOpen *le = new EOpen(mds->mdlog);
      mds->mdlog->start_entry(le);
      le->add_clean_inode(cur);
      ls->open_files.push_back(&cur->item_open_file);
      mds->mdlog->submit_entry(le);
    }
  }
}
2532
2533
2534
/**
 * Perform empty (NULL) snapflushes on behalf of @client for every snapid
 * it still owes a flush on @head_in. Used when we can infer the client
 * will never send the real FLUSHSNAP (it released all WR/EXCL caps).
 *
 * For each owed snapid we locate the corresponding snapped inode — it is
 * looked up by the last snapid it is valid for, so we may have to scan
 * forward through later owed snapids — then run a dirty-less snap update
 * and clear the need-snapflush record.
 */
void Locker::_do_null_snapflush(CInode *head_in, client_t client)
{
  dout(10) << "_do_null_snapflush client." << client << " on " << *head_in << dendl;
  compact_map<snapid_t, set<client_t> >::iterator p = head_in->client_need_snapflush.begin();
  while (p != head_in->client_need_snapflush.end()) {
    snapid_t snapid = p->first;
    set<client_t>& clients = p->second;
    ++p;  // be careful, q loop below depends on this

    if (clients.count(client)) {
      dout(10) << " doing async NULL snapflush on " << snapid << " from client." << client << dendl;
      CInode *sin = mdcache->get_inode(head_in->ino(), snapid);
      if (!sin) {
	// hrm, look forward until we find the inode.
	//  (we can only look it up by the last snapid it is valid for)
	dout(10) << " didn't have " << head_in->ino() << " snapid " << snapid << dendl;
	for (compact_map<snapid_t, set<client_t> >::iterator q = p;  // p is already at next entry
	     q != head_in->client_need_snapflush.end();
	     ++q) {
	  dout(10) << " trying snapid " << q->first << dendl;
	  sin = mdcache->get_inode(head_in->ino(), q->first);
	  if (sin) {
	    assert(sin->first <= snapid);
	    break;
	  }
	  dout(10) << " didn't have " << head_in->ino() << " snapid " << q->first << dendl;
	}
	if (!sin && head_in->is_multiversion())
	  sin = head_in;
	assert(sin);
      }
      _do_snap_update(sin, snapid, 0, sin->first - 1, client, NULL, NULL);
      head_in->remove_need_snapflush(sin, snapid, client);
    }
  }
}
2571
2572
2573bool Locker::should_defer_client_cap_frozen(CInode *in)
2574{
2575 /*
2576 * This policy needs to be AT LEAST as permissive as allowing a client request
2577 * to go forward, or else a client request can release something, the release
2578 * gets deferred, but the request gets processed and deadlocks because when the
2579 * caps can't get revoked.
2580 *
2581 * Currently, a request wait if anything locked is freezing (can't
2582 * auth_pin), which would avoid any deadlock with cap release. Thus @in
2583 * _MUST_ be in the lock/auth_pin set.
2584 *
2585 * auth_pins==0 implies no unstable lock and not auth pinnned by
2586 * client request, otherwise continue even it's freezing.
2587 */
2588 return (in->is_freezing() && in->get_num_auth_pins() == 0) || in->is_frozen();
2589}
2590
/*
 * Main handler for MClientCaps messages from clients: cap flushes,
 * snapflushes, wanted-mask updates and releases.
 *
 * This function DOES put the passed message before returning (either
 * directly, or by re-queueing it for retry).
 */
void Locker::handle_client_caps(MClientCaps *m)
{
  Session *session = static_cast<Session *>(m->get_connection()->get_priv());
  client_t client = m->get_source().num();

  snapid_t follows = m->get_snap_follows();
  dout(7) << "handle_client_caps "
	  << ((m->flags & CLIENT_CAPS_SYNC) ? "sync" : "async")
	  << " on " << m->get_ino()
	  << " tid " << m->get_client_tid() << " follows " << follows
	  << " op " << ceph_cap_op_name(m->get_op()) << dendl;

  // not serving yet? drop dead sessions' messages, remember reconnect-time
  // dirty caps, and defer the rest until replay completes.
  if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) {
    if (!session) {
      dout(5) << " no session, dropping " << *m << dendl;
      m->put();
      return;
    }
    if (session->is_closed() ||
	session->is_closing() ||
	session->is_killing()) {
      dout(7) << " session closed|closing|killing, dropping " << *m << dendl;
      m->put();
      return;
    }
    if (mds->is_reconnect() &&
	m->get_dirty() && m->get_client_tid() > 0 &&
	!session->have_completed_flush(m->get_client_tid())) {
      mdcache->set_reconnected_dirty_caps(client, m->get_ino(), m->get_dirty());
    }
    mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
    return;
  }

  // duplicate of an already-completed flush? re-send the ack.
  if (m->get_client_tid() > 0 && session &&
      session->have_completed_flush(m->get_client_tid())) {
    dout(7) << "handle_client_caps already flushed tid " << m->get_client_tid()
	    << " for client." << client << dendl;
    MClientCaps *ack;
    if (m->get_op() == CEPH_CAP_OP_FLUSHSNAP) {
      ack = new MClientCaps(CEPH_CAP_OP_FLUSHSNAP_ACK, m->get_ino(), 0, 0, 0, 0, 0,
			    m->get_dirty(), 0, mds->get_osd_epoch_barrier());
    } else {
      ack = new MClientCaps(CEPH_CAP_OP_FLUSH_ACK, m->get_ino(), 0, m->get_cap_id(),
			    m->get_seq(), m->get_caps(), 0, m->get_dirty(), 0,
			    mds->get_osd_epoch_barrier());
    }
    ack->set_snap_follows(follows);
    ack->set_client_tid(m->get_client_tid());
    mds->send_message_client_counted(ack, m->get_connection());
    if (m->get_op() == CEPH_CAP_OP_FLUSHSNAP) {
      m->put();
      return;
    } else {
      // fall-thru because the message may release some caps
      m->clear_dirty();
      m->set_op(CEPH_CAP_OP_UPDATE);
    }
  }

  // "oldest flush tid" > 0 means client uses unique TID for each flush
  if (m->get_oldest_flush_tid() > 0 && session) {
    if (session->trim_completed_flushes(m->get_oldest_flush_tid())) {
      mds->mdlog->get_current_segment()->touched_sessions.insert(session->info.inst.name);

      if (session->get_num_trim_flushes_warnings() > 0 &&
	  session->get_num_completed_flushes() * 2 < g_conf->mds_max_completed_flushes)
	session->reset_num_trim_flushes_warnings();
    } else {
      // client isn't advancing oldest_flush_tid; warn with backoff
      if (session->get_num_completed_flushes() >=
	  (g_conf->mds_max_completed_flushes << session->get_num_trim_flushes_warnings())) {
	session->inc_num_trim_flushes_warnings();
	stringstream ss;
	ss << "client." << session->get_client() << " does not advance its oldest_flush_tid ("
	   << m->get_oldest_flush_tid() << "), "
	   << session->get_num_completed_flushes()
	   << " completed flushes recorded in session";
	mds->clog->warn() << ss.str();
	dout(20) << __func__ << " " << ss.str() << dendl;
      }
    }
  }

  CInode *head_in = mdcache->get_inode(m->get_ino());
  if (!head_in) {
    if (mds->is_clientreplay()) {
      dout(7) << "handle_client_caps on unknown ino " << m->get_ino()
	<< ", will try again after replayed client requests" << dendl;
      mdcache->wait_replay_cap_reconnect(m->get_ino(), new C_MDS_RetryMessage(mds, m));
      return;
    }
    dout(1) << "handle_client_caps on unknown ino " << m->get_ino() << ", dropping" << dendl;
    m->put();
    return;
  }

  if (m->osd_epoch_barrier && !mds->objecter->have_map(m->osd_epoch_barrier)) {
    // Pause RADOS operations until we see the required epoch
    mds->objecter->set_epoch_barrier(m->osd_epoch_barrier);
  }

  if (mds->get_osd_epoch_barrier() < m->osd_epoch_barrier) {
    // Record the barrier so that we will retransmit it to clients
    mds->set_osd_epoch_barrier(m->osd_epoch_barrier);
  }

  // resolve the snapped inode this message actually applies to
  CInode *in = head_in;
  if (follows > 0) {
    in = mdcache->pick_inode_snap(head_in, follows);
    if (in != head_in)
      dout(10) << " head inode " << *head_in << dendl;
  }
  dout(10) << "  cap inode " << *in << dendl;

  Capability *cap = 0;
  cap = in->get_client_cap(client);
  if (!cap && in != head_in)
    cap = head_in->get_client_cap(client);
  if (!cap) {
    dout(7) << "handle_client_caps no cap for client." << client << " on " << *in << dendl;
    m->put();
    return;
  }
  assert(cap);

  // freezing|frozen?
  if (should_defer_client_cap_frozen(in)) {
    dout(7) << "handle_client_caps freezing|frozen on " << *in << dendl;
    in->add_waiter(CInode::WAIT_UNFREEZE, new C_MDS_RetryMessage(mds, m));
    return;
  }
  // stale migration seq (cap moved to another MDS since)? drop.
  if (ceph_seq_cmp(m->get_mseq(), cap->get_mseq()) < 0) {
    dout(7) << "handle_client_caps mseq " << m->get_mseq() << " < " << cap->get_mseq()
	    << ", dropping" << dendl;
    m->put();
    return;
  }

  int op = m->get_op();

  // flushsnap?
  if (op == CEPH_CAP_OP_FLUSHSNAP) {
    if (!in->is_auth()) {
      dout(7) << " not auth, ignoring flushsnap on " << *in << dendl;
      goto out;
    }

    SnapRealm *realm = in->find_snaprealm();
    snapid_t snap = realm->get_snap_following(follows);
    dout(10) << "  flushsnap follows " << follows << " -> snap " << snap << dendl;

    // we can prepare the ack now, since this FLUSHEDSNAP is independent of any
    // other cap ops.  (except possibly duplicate FLUSHSNAP requests, but worst
    // case we get a dup response, so whatever.)
    MClientCaps *ack = 0;
    if (m->get_dirty()) {
      ack = new MClientCaps(CEPH_CAP_OP_FLUSHSNAP_ACK, in->ino(), 0, 0, 0, 0, 0, m->get_dirty(), 0, mds->get_osd_epoch_barrier());
      ack->set_snap_follows(follows);
      ack->set_client_tid(m->get_client_tid());
      ack->set_oldest_flush_tid(m->get_oldest_flush_tid());
    }

    if (in == head_in ||
	(head_in->client_need_snapflush.count(snap) &&
	 head_in->client_need_snapflush[snap].count(client))) {
      dout(7) << " flushsnap snap " << snap
	      << " client." << client << " on " << *in << dendl;

      // this cap now follows a later snap (i.e. the one initiating this flush, or later)
      if (in == head_in)
	cap->client_follows = snap < CEPH_NOSNAP ? snap : realm->get_newest_seq();

      _do_snap_update(in, snap, m->get_dirty(), follows, client, m, ack);

      if (in != head_in)
	head_in->remove_need_snapflush(in, snap, client);

    } else {
      dout(7) << " not expecting flushsnap " << snap << " from client." << client << " on " << *in << dendl;
      if (ack)
	mds->send_message_client_counted(ack, m->get_connection());
    }
    goto out;
  }

  if (cap->get_cap_id() != m->get_cap_id()) {
    dout(7) << " ignoring client capid " << m->get_cap_id() << " != my " << cap->get_cap_id() << dendl;
  } else {
    // intermediate snap inodes: flush dirty metadata into each snapped
    // inode between the one addressed and the head.
    while (in != head_in) {
      assert(in->last != CEPH_NOSNAP);
      if (in->is_auth() && m->get_dirty()) {
	dout(10) << " updating intermediate snapped inode " << *in << dendl;
	_do_cap_update(in, NULL, m->get_dirty(), follows, m);
      }
      in = mdcache->pick_inode_snap(head_in, in->last);
    }

    // head inode, and cap
    MClientCaps *ack = 0;

    int caps = m->get_caps();
    if (caps & ~cap->issued()) {
      dout(10) << " confirming not issued caps " << ccap_string(caps & ~cap->issued()) << dendl;
      caps &= cap->issued();
    }

    cap->confirm_receipt(m->get_seq(), caps);
    dout(10) << " follows " << follows
	     << " retains " << ccap_string(m->get_caps())
	     << " dirty " << ccap_string(m->get_dirty())
	     << " on " << *in << dendl;


    // missing/skipped snapflush?
    //  The client MAY send a snapflush if it is issued WR/EXCL caps, but
    //  presently only does so when it has actual dirty metadata.  But, we
    //  set up the need_snapflush stuff based on the issued caps.
    //  We can infer that the client WONT send a FLUSHSNAP once they have
    //  released all WR/EXCL caps (the FLUSHSNAP always comes before the cap
    //  update/release).
    if (!head_in->client_need_snapflush.empty()) {
      if ((cap->issued() & CEPH_CAP_ANY_FILE_WR) == 0) {
	_do_null_snapflush(head_in, client);
      } else {
	dout(10) << " revocation in progress, not making any conclusions about null snapflushes" << dendl;
      }
    }

    if (m->get_dirty() && in->is_auth()) {
      dout(7) << " flush client." << client << " dirty " << ccap_string(m->get_dirty())
	      << " seq " << m->get_seq() << " on " << *in << dendl;
      ack = new MClientCaps(CEPH_CAP_OP_FLUSH_ACK, in->ino(), 0, cap->get_cap_id(), m->get_seq(),
			    m->get_caps(), 0, m->get_dirty(), 0, mds->get_osd_epoch_barrier());
      ack->set_client_tid(m->get_client_tid());
      ack->set_oldest_flush_tid(m->get_oldest_flush_tid());
    }

    // filter wanted based on what we could ever give out (given auth/replica status)
    bool need_flush = m->flags & CLIENT_CAPS_SYNC;
    int new_wanted = m->get_wanted() & head_in->get_caps_allowed_ever();
    if (new_wanted != cap->wanted()) {
      if (!need_flush && (new_wanted & ~cap->pending())) {
	// expanding caps.  make sure we aren't waiting for a log flush
	need_flush = _need_flush_mdlog(head_in, new_wanted & ~cap->pending());
      }

      adjust_cap_wanted(cap, new_wanted, m->get_issue_seq());
    }

    if (in->is_auth() &&
	_do_cap_update(in, cap, m->get_dirty(), follows, m, ack, &need_flush)) {
      // updated
      eval(in, CEPH_CAP_LOCKS);

      if (!need_flush && (cap->wanted() & ~cap->pending()))
	need_flush = _need_flush_mdlog(in, cap->wanted() & ~cap->pending());
    } else {
      // no update, ack now.
      if (ack)
	mds->send_message_client_counted(ack, m->get_connection());

      bool did_issue = eval(in, CEPH_CAP_LOCKS);
      if (!did_issue && (cap->wanted() & ~cap->pending()))
	issue_caps(in, cap);

      if (cap->get_last_seq() == 0 &&
	  (cap->pending() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER))) {
	// first issue after reconnect/import: let writers know max_size
	cap->issue_norevoke(cap->issued());
	share_inode_max_size(in, cap);
      }
    }

    if (need_flush)
      mds->mdlog->flush();
  }

 out:
  m->put();
}
2874
2875
/**
 * Deferred retry of process_request_cap_release() for a single release
 * item, queued while the inode was frozen. Runs without an associated
 * MDRequest (null_ref) and with no dentry name.
 */
class C_Locker_RetryRequestCapRelease : public LockerContext {
  client_t client;
  ceph_mds_request_release item;
public:
  C_Locker_RetryRequestCapRelease(Locker *l, client_t c, const ceph_mds_request_release& it) :
    LockerContext(l), client(c), item(it) { }
  void finish(int r) override {
    string dname;
    MDRequestRef null_ref;
    locker->process_request_cap_release(null_ref, client, item, dname);
  }
};
2888
/**
 * Apply a cap/lease release that a client piggybacked on an MDS request.
 *
 * @param mdr    originating request; may be a null ref when this is a
 *               deferred retry (see C_Locker_RetryRequestCapRelease)
 * @param client releasing client
 * @param item   the wire-format release record (ino, cap_id, caps, seqs)
 * @param dname  dentry name whose lease is released along with the caps;
 *               empty if no dentry lease is involved
 */
void Locker::process_request_cap_release(MDRequestRef& mdr, client_t client, const ceph_mds_request_release& item,
					 const string &dname)
{
  inodeno_t ino = (uint64_t)item.ino;
  uint64_t cap_id = item.cap_id;
  int caps = item.caps;
  int wanted = item.wanted;
  int seq = item.seq;
  int issue_seq = item.issue_seq;
  int mseq = item.mseq;

  CInode *in = mdcache->get_inode(ino);
  if (!in)
    return;

  // drop the dentry lease first, if one was named
  if (dname.length()) {
    frag_t fg = in->pick_dirfrag(dname);
    CDir *dir = in->get_dirfrag(fg);
    if (dir) {
      CDentry *dn = dir->lookup(dname);
      if (dn) {
	ClientLease *l = dn->get_client_lease(client);
	if (l) {
	  dout(10) << "process_cap_release removing lease on " << *dn << dendl;
	  dn->remove_client_lease(l, this);
	} else {
	  dout(7) << "process_cap_release client." << client
		  << " doesn't have lease on " << *dn << dendl;
	}
      } else {
	dout(7) << "process_cap_release client." << client << " released lease on dn "
		<< dir->dirfrag() << "/" << dname << " which dne" << dendl;
      }
    }
  }

  Capability *cap = in->get_client_cap(client);
  if (!cap)
    return;

  dout(10) << "process_cap_release client." << client << " " << ccap_string(caps) << " on " << *in
	   << (mdr ? "" : " (DEFERRED, no mdr)")
	   << dendl;

  // stale release from before a cap migration: ignore
  if (ceph_seq_cmp(mseq, cap->get_mseq()) < 0) {
    dout(7) << " mseq " << mseq << " < " << cap->get_mseq() << ", dropping" << dendl;
    return;
  }

  // release refers to a different cap instance than the one we hold now
  if (cap->get_cap_id() != cap_id) {
    dout(7) << " cap_id " << cap_id << " != " << cap->get_cap_id() << ", dropping" << dendl;
    return;
  }

  // can't touch cap state while the inode is frozen (e.g. mid-export);
  // retry after unfreeze with a null mdr
  if (should_defer_client_cap_frozen(in)) {
    dout(7) << " frozen, deferring" << dendl;
    in->add_waiter(CInode::WAIT_UNFREEZE, new C_Locker_RetryRequestCapRelease(this, client, item));
    return;
  }

  // client can't retain caps we never issued; clamp to what's issued
  if (caps & ~cap->issued()) {
    dout(10) << " confirming not issued caps " << ccap_string(caps & ~cap->issued()) << dendl;
    caps &= cap->issued();
  }
  cap->confirm_receipt(seq, caps);

  // once the client holds no file-write caps, any pending snapflush from
  // it will never arrive; synthesize null snapflushes
  if (!in->client_need_snapflush.empty() &&
      (cap->issued() & CEPH_CAP_ANY_FILE_WR) == 0) {
    _do_null_snapflush(in, client);
  }

  adjust_cap_wanted(cap, wanted, issue_seq);

  // suppress reissue during eval while the request is still in flight
  if (mdr)
    cap->inc_suppress();
  eval(in, CEPH_CAP_LOCKS);
  if (mdr)
    cap->dec_suppress();

  // take note; we may need to reissue on this cap later
  if (mdr)
    mdr->cap_releases[in->vino()] = cap->get_last_seq();
}
2972
2973class C_Locker_RetryKickIssueCaps : public LockerContext {
2974 CInode *in;
2975 client_t client;
2976 ceph_seq_t seq;
2977public:
2978 C_Locker_RetryKickIssueCaps(Locker *l, CInode *i, client_t c, ceph_seq_t s) :
2979 LockerContext(l), in(i), client(c), seq(s) {
2980 in->get(CInode::PIN_PTRWAITER);
2981 }
2982 void finish(int r) override {
2983 locker->kick_issue_caps(in, client, seq);
2984 in->put(CInode::PIN_PTRWAITER);
2985 }
2986};
2987
2988void Locker::kick_issue_caps(CInode *in, client_t client, ceph_seq_t seq)
2989{
2990 Capability *cap = in->get_client_cap(client);
2991 if (!cap || cap->get_last_sent() != seq)
2992 return;
2993 if (in->is_frozen()) {
2994 dout(10) << "kick_issue_caps waiting for unfreeze on " << *in << dendl;
2995 in->add_waiter(CInode::WAIT_UNFREEZE,
2996 new C_Locker_RetryKickIssueCaps(this, in, client, seq));
2997 return;
2998 }
2999 dout(10) << "kick_issue_caps released at current seq " << seq
3000 << ", reissuing" << dendl;
3001 issue_caps(in, cap);
3002}
3003
3004void Locker::kick_cap_releases(MDRequestRef& mdr)
3005{
3006 client_t client = mdr->get_client();
3007 for (map<vinodeno_t,ceph_seq_t>::iterator p = mdr->cap_releases.begin();
3008 p != mdr->cap_releases.end();
3009 ++p) {
3010 CInode *in = mdcache->get_inode(p->first);
3011 if (!in)
3012 continue;
3013 kick_issue_caps(in, client, p->second);
3014 }
3015}
3016
7c673cae
FG
/**
 * Record the metadata a client flushed for a snapshotted inode and
 * journal it ("snap flush").
 *
 * m and ack might be NULL, so don't dereference them unless dirty != 0.
 *
 * @param in      inode being flushed
 * @param snap    snapid the flushed state belongs to (CEPH_NOSNAP if the
 *                snap has since been deleted)
 * @param dirty   mask of dirtied cap bits
 * @param follows snapid the client data follows
 * @param client  flushing client
 * @param m       the MClientCaps flush message (may be NULL)
 * @param ack     reply to send once journaled (may be NULL)
 */
void Locker::_do_snap_update(CInode *in, snapid_t snap, int dirty, snapid_t follows, client_t client, MClientCaps *m, MClientCaps *ack)
{
  dout(10) << "_do_snap_update dirty " << ccap_string(dirty)
	   << " follows " << follows << " snap " << snap
	   << " on " << *in << dendl;

  if (snap == CEPH_NOSNAP) {
    // hmm, i guess snap was already deleted?  just ack!
    dout(10) << " wow, the snap following " << follows
	     << " was already deleted.  nothing to record, just ack." << dendl;
    if (ack)
      mds->send_message_client_counted(ack, m->get_connection());
    return;
  }

  EUpdate *le = new EUpdate(mds->mdlog, "snap flush");
  mds->mdlog->start_entry(le);
  MutationRef mut = new MutationImpl();
  mut->ls = mds->mdlog->get_current_segment();

  // normal metadata updates that we can apply to the head as well.

  // update xattrs?
  bool xattrs = false;
  map<string,bufferptr> *px = 0;
  if ((dirty & CEPH_CAP_XATTR_EXCL) &&
      m->xattrbl.length() &&
      m->head.xattr_version > in->get_projected_inode()->xattr_version)
    xattrs = true;

  // multiversion inodes keep per-snap old_inode copies; find the one
  // covering this snap, if any
  old_inode_t *oi = 0;
  if (in->is_multiversion()) {
    oi = in->pick_old_inode(snap);
  }

  inode_t *pi;
  if (oi) {
    dout(10) << " writing into old inode" << dendl;
    // project/pre_dirty the head so the change is versioned, but apply
    // the flushed fields to the matching old_inode below
    pi = in->project_inode();
    pi->version = in->pre_dirty();
    if (snap > oi->first)
      in->split_old_inode(snap);
    pi = &oi->inode;
    if (xattrs)
      px = &oi->xattrs;
  } else {
    if (xattrs)
      px = new map<string,bufferptr>;
    pi = in->project_inode(px);
    pi->version = in->pre_dirty();
  }

  _update_cap_fields(in, dirty, m, pi);

  // xattr
  if (px) {
    dout(7) << " xattrs v" << pi->xattr_version << " -> " << m->head.xattr_version
	    << " len " << m->xattrbl.length() << dendl;
    pi->xattr_version = m->head.xattr_version;
    bufferlist::iterator p = m->xattrbl.begin();
    ::decode(*px, p);
  }

  // trim/drop the client's writable byte range now that the flush for
  // this snap has arrived
  if (pi->client_ranges.count(client)) {
    if (in->last == snap) {
      dout(10) << " removing client_range entirely" << dendl;
      pi->client_ranges.erase(client);
    } else {
      dout(10) << " client_range now follows " << snap << dendl;
      pi->client_ranges[client].follows = snap;
    }
  }

  mut->auth_pin(in);
  mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows);
  mdcache->journal_dirty_inode(mut.get(), &le->metablob, in, follows);

  // "oldest flush tid" > 0 means client uses unique TID for each flush
  if (ack && ack->get_oldest_flush_tid() > 0)
    le->metablob.add_client_flush(metareqid_t(m->get_source(), ack->get_client_tid()),
				  ack->get_oldest_flush_tid());

  mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut, false, false,
							      client, ack));
}
3105
/**
 * Merge the fields a client dirtied (per the `dirty` cap mask) from cap
 * message m into projected inode pi.  Helper shared by _do_cap_update()
 * and _do_snap_update().
 */
void Locker::_update_cap_fields(CInode *in, int dirty, MClientCaps *m, inode_t *pi)
{
  if (dirty == 0)
    return;

  /* m must be valid if there are dirty caps */
  assert(m);
  uint64_t features = m->get_connection()->get_features();

  // ctime only moves forward
  if (m->get_ctime() > pi->ctime) {
    dout(7) << "  ctime " << pi->ctime << " -> " << m->get_ctime()
	    << " for " << *in << dendl;
    pi->ctime = m->get_ctime();
  }

  if ((features & CEPH_FEATURE_FS_CHANGE_ATTR) &&
      m->get_change_attr() > pi->change_attr) {
    dout(7) << "  change_attr " << pi->change_attr << " -> " << m->get_change_attr()
	    << " for " << *in << dendl;
    pi->change_attr = m->get_change_attr();
  }

  // file
  if (dirty & (CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)) {
    utime_t atime = m->get_atime();
    utime_t mtime = m->get_mtime();
    uint64_t size = m->get_size();
    version_t inline_version = m->inline_version;

    // WR can only move mtime forward; EXCL can set it arbitrarily
    if (((dirty & CEPH_CAP_FILE_WR) && mtime > pi->mtime) ||
	((dirty & CEPH_CAP_FILE_EXCL) && mtime != pi->mtime)) {
      dout(7) << "  mtime " << pi->mtime << " -> " << mtime
	      << " for " << *in << dendl;
      pi->mtime = mtime;
    }
    // size can only grow here; truncation is handled elsewhere
    if (in->inode.is_file() &&   // ONLY if regular file
	size > pi->size) {
      dout(7) << "  size " << pi->size << " -> " << size
	      << " for " << *in << dendl;
      pi->size = size;
      pi->rstat.rbytes = size;
    }
    if (in->inode.is_file() &&
        (dirty & CEPH_CAP_FILE_WR) &&
        inline_version > pi->inline_data.version) {
      pi->inline_data.version = inline_version;
      if (inline_version != CEPH_INLINE_NONE && m->inline_data.length() > 0)
	pi->inline_data.get_data() = m->inline_data;
      else
	pi->inline_data.free_data();
    }
    if ((dirty & CEPH_CAP_FILE_EXCL) && atime != pi->atime) {
      dout(7) << "  atime " << pi->atime << " -> " << atime
	      << " for " << *in << dendl;
      pi->atime = atime;
    }
    if ((dirty & CEPH_CAP_FILE_EXCL) &&
	ceph_seq_cmp(pi->time_warp_seq, m->get_time_warp_seq()) < 0) {
      dout(7) << "  time_warp_seq " << pi->time_warp_seq << " -> " << m->get_time_warp_seq()
	      << " for " << *in << dendl;
      pi->time_warp_seq = m->get_time_warp_seq();
    }
  }
  // auth
  if (dirty & CEPH_CAP_AUTH_EXCL) {
    if (m->head.uid != pi->uid) {
      dout(7) << "  uid " << pi->uid
	      << " -> " << m->head.uid
	      << " for " << *in << dendl;
      pi->uid = m->head.uid;
    }
    if (m->head.gid != pi->gid) {
      dout(7) << "  gid " << pi->gid
	      << " -> " << m->head.gid
	      << " for " << *in << dendl;
      pi->gid = m->head.gid;
    }
    if (m->head.mode != pi->mode) {
      dout(7) << "  mode " << oct << pi->mode
	      << " -> " << m->head.mode << dec
	      << " for " << *in << dendl;
      pi->mode = m->head.mode;
    }
    if ((features & CEPH_FEATURE_FS_BTIME) && m->get_btime() != pi->btime) {
      dout(7) << "  btime " << oct << pi->btime
	      << " -> " << m->get_btime() << dec
	      << " for " << *in << dendl;
      pi->btime = m->get_btime();
    }
  }
}
3197
/*
 * update inode based on cap flush|flushsnap|wanted.
 * adjust max_size, if needed.
 * if we update, return true; otherwise, false (no update needed).
 *
 * On update, journals an EUpdate; the ack (if any) is sent by
 * C_Locker_FileUpdate_finish once the entry commits.  On the no-update
 * path the caller is responsible for acking.  May set *need_flush when
 * the journaled state should be flushed promptly (e.g. max_size grew).
 */
bool Locker::_do_cap_update(CInode *in, Capability *cap,
			    int dirty, snapid_t follows,
			    MClientCaps *m, MClientCaps *ack,
			    bool *need_flush)
{
  dout(10) << "_do_cap_update dirty " << ccap_string(dirty)
	   << " issued " << ccap_string(cap ? cap->issued() : 0)
	   << " wanted " << ccap_string(cap ? cap->wanted() : 0)
	   << " on " << *in << dendl;
  assert(in->is_auth());
  client_t client = m->get_source().num();
  inode_t *latest = in->get_projected_inode();

  // increase or zero max_size?
  uint64_t size = m->get_size();
  bool change_max = false;
  uint64_t old_max = latest->client_ranges.count(client) ? latest->client_ranges[client].range.last : 0;
  uint64_t new_max = old_max;

  if (in->is_file()) {
    bool forced_change_max = false;
    dout(20) << "inode is file" << dendl;
    if (cap && ((cap->issued() | cap->wanted()) & CEPH_CAP_ANY_FILE_WR)) {
      dout(20) << "client has write caps; m->get_max_size="
               << m->get_max_size() << "; old_max=" << old_max << dendl;
      if (m->get_max_size() > new_max) {
	// client explicitly asked for a larger max_size
	dout(10) << "client requests file_max " << m->get_max_size()
		 << " > max " << old_max << dendl;
	change_max = true;
	forced_change_max = true;
	new_max = calc_new_max_size(latest, m->get_max_size());
      } else {
	// otherwise grow max_size ahead of the current size as needed
	new_max = calc_new_max_size(latest, size);

	if (new_max > old_max)
	  change_max = true;
	else
	  new_max = old_max;
      }
    } else {
      // client holds/wants no write caps; retract its byte range
      if (old_max) {
	change_max = true;
	new_max = 0;
      }
    }

    if (in->last == CEPH_NOSNAP &&
	change_max &&
	!in->filelock.can_wrlock(client) &&
	!in->filelock.can_force_wrlock(client)) {
      dout(10) << " i want to change file_max, but lock won't allow it (yet)" << dendl;
      // try to move the filelock into a state that permits the wrlock
      if (in->filelock.is_stable()) {
	bool need_issue = false;
	if (cap)
	  cap->inc_suppress();
	if (in->mds_caps_wanted.empty() &&
	    (in->get_loner() >= 0 || (in->get_wanted_loner() >= 0 && in->try_set_loner()))) {
	  if (in->filelock.get_state() != LOCK_EXCL)
	    file_excl(&in->filelock, &need_issue);
	} else
	  simple_lock(&in->filelock, &need_issue);
	if (need_issue)
	  issue_caps(in);
	if (cap)
	  cap->dec_suppress();
      }
      if (!in->filelock.can_wrlock(client) &&
	  !in->filelock.can_force_wrlock(client)) {
	// still can't; queue a re-check for when the lock stabilizes and
	// drop the max_size change from this update
	C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in,
	                                                 forced_change_max ? new_max : 0,
	                                                 0, utime_t());

	in->filelock.add_waiter(SimpleLock::WAIT_STABLE, cms);
	change_max = false;
      }
    }
  }

  // merge any file-lock (fcntl/flock) state the client flushed
  if (m->flockbl.length()) {
    int32_t num_locks;
    bufferlist::iterator bli = m->flockbl.begin();
    ::decode(num_locks, bli);
    for ( int i=0; i < num_locks; ++i) {
      ceph_filelock decoded_lock;
      ::decode(decoded_lock, bli);
      in->get_fcntl_lock_state()->held_locks.
	insert(pair<uint64_t, ceph_filelock>(decoded_lock.start, decoded_lock));
      ++in->get_fcntl_lock_state()->client_held_lock_counts[(client_t)(decoded_lock.client)];
    }
    ::decode(num_locks, bli);
    for ( int i=0; i < num_locks; ++i) {
      ceph_filelock decoded_lock;
      ::decode(decoded_lock, bli);
      in->get_flock_lock_state()->held_locks.
	insert(pair<uint64_t, ceph_filelock>(decoded_lock.start, decoded_lock));
      ++in->get_flock_lock_state()->client_held_lock_counts[(client_t)(decoded_lock.client)];
    }
  }

  if (!dirty && !change_max)
    return false;

  // get_priv() takes a ref on the session; put it on every path below
  Session *session = static_cast<Session *>(m->get_connection()->get_priv());
  if (session->check_access(in, MAY_WRITE,
			    m->caller_uid, m->caller_gid, NULL, 0, 0) < 0) {
    session->put();
    dout(10) << "check_access failed, dropping cap update on " << *in << dendl;
    return false;
  }
  session->put();

  // do the update.
  EUpdate *le = new EUpdate(mds->mdlog, "cap update");
  mds->mdlog->start_entry(le);

  // xattrs update?
  map<string,bufferptr> *px = 0;
  if ((dirty & CEPH_CAP_XATTR_EXCL) &&
      m->xattrbl.length() &&
      m->head.xattr_version > in->get_projected_inode()->xattr_version)
    px = new map<string,bufferptr>;

  inode_t *pi = in->project_inode(px);
  pi->version = in->pre_dirty();

  MutationRef mut(new MutationImpl());
  mut->ls = mds->mdlog->get_current_segment();

  _update_cap_fields(in, dirty, m, pi);

  if (change_max) {
    dout(7) << "  max_size " << old_max << " -> " << new_max
	    << " for " << *in << dendl;
    if (new_max) {
      pi->client_ranges[client].range.first = 0;
      pi->client_ranges[client].range.last = new_max;
      pi->client_ranges[client].follows = in->first - 1;
    } else
      pi->client_ranges.erase(client);
  }

  if (change_max || (dirty & (CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)))
    wrlock_force(&in->filelock, mut);  // wrlock for duration of journal

  // auth
  if (dirty & CEPH_CAP_AUTH_EXCL)
    wrlock_force(&in->authlock, mut);

  // xattr
  if (px) {
    dout(7) << " xattrs v" << pi->xattr_version << " -> " << m->head.xattr_version << dendl;
    pi->xattr_version = m->head.xattr_version;
    bufferlist::iterator p = m->xattrbl.begin();
    ::decode(*px, p);

    wrlock_force(&in->xattrlock, mut);
  }

  mut->auth_pin(in);
  mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows);
  mdcache->journal_dirty_inode(mut.get(), &le->metablob, in, follows);

  // "oldest flush tid" > 0 means client uses unique TID for each flush
  if (ack && ack->get_oldest_flush_tid() > 0)
    le->metablob.add_client_flush(metareqid_t(m->get_source(), ack->get_client_tid()),
				  ack->get_oldest_flush_tid());

  mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut,
							      change_max, !!cap,
							      client, ack));
  if (need_flush && !*need_flush &&
      ((change_max && new_max) || // max INCREASE
       _need_flush_mdlog(in, dirty)))
    *need_flush = true;

  return true;
}
3380
3381/* This function DOES put the passed message before returning */
3382void Locker::handle_client_cap_release(MClientCapRelease *m)
3383{
3384 client_t client = m->get_source().num();
3385 dout(10) << "handle_client_cap_release " << *m << dendl;
3386
3387 if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) {
3388 mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
3389 return;
3390 }
3391
3392 if (m->osd_epoch_barrier && !mds->objecter->have_map(m->osd_epoch_barrier)) {
3393 // Pause RADOS operations until we see the required epoch
3394 mds->objecter->set_epoch_barrier(m->osd_epoch_barrier);
3395 }
3396
3397 if (mds->get_osd_epoch_barrier() < m->osd_epoch_barrier) {
3398 // Record the barrier so that we will retransmit it to clients
3399 mds->set_osd_epoch_barrier(m->osd_epoch_barrier);
3400 }
3401
3402 Session *session = static_cast<Session *>(m->get_connection()->get_priv());
3403
3404 for (vector<ceph_mds_cap_item>::iterator p = m->caps.begin(); p != m->caps.end(); ++p) {
3405 _do_cap_release(client, inodeno_t((uint64_t)p->ino) , p->cap_id, p->migrate_seq, p->seq);
3406 }
3407
3408 if (session) {
3409 session->notify_cap_release(m->caps.size());
3410 }
3411
3412 m->put();
3413}
3414
3415class C_Locker_RetryCapRelease : public LockerContext {
3416 client_t client;
3417 inodeno_t ino;
3418 uint64_t cap_id;
3419 ceph_seq_t migrate_seq;
3420 ceph_seq_t issue_seq;
3421public:
3422 C_Locker_RetryCapRelease(Locker *l, client_t c, inodeno_t i, uint64_t id,
3423 ceph_seq_t mseq, ceph_seq_t seq) :
3424 LockerContext(l), client(c), ino(i), cap_id(id), migrate_seq(mseq), issue_seq(seq) {}
3425 void finish(int r) override {
3426 locker->_do_cap_release(client, ino, cap_id, migrate_seq, issue_seq);
3427 }
3428};
3429
/**
 * Process one cap-release record from an MClientCapRelease batch and,
 * if it is still current, remove the client's cap on the inode.
 *
 * Drops the record if the inode/cap is gone, the cap instance or
 * migration seq doesn't match, or the issue seq is stale (in which case
 * only old revoke history is pruned).  Defers via a waiter if the inode
 * is frozen.
 */
void Locker::_do_cap_release(client_t client, inodeno_t ino, uint64_t cap_id,
			     ceph_seq_t mseq, ceph_seq_t seq)
{
  CInode *in = mdcache->get_inode(ino);
  if (!in) {
    dout(7) << "_do_cap_release missing ino " << ino << dendl;
    return;
  }
  Capability *cap = in->get_client_cap(client);
  if (!cap) {
    dout(7) << "_do_cap_release no cap for client" << client << " on "<< *in << dendl;
    return;
  }

  dout(7) << "_do_cap_release for client." << client << " on "<< *in << dendl;
  // release refers to a different cap instance than the one we hold now
  if (cap->get_cap_id() != cap_id) {
    dout(7) << " capid " << cap_id << " != " << cap->get_cap_id() << ", ignore" << dendl;
    return;
  }
  // stale release from before a cap migration
  if (ceph_seq_cmp(mseq, cap->get_mseq()) < 0) {
    dout(7) << " mseq " << mseq << " < " << cap->get_mseq() << ", ignore" << dendl;
    return;
  }
  if (should_defer_client_cap_frozen(in)) {
    dout(7) << " freezing|frozen, deferring" << dendl;
    in->add_waiter(CInode::WAIT_UNFREEZE,
                  new C_Locker_RetryCapRelease(this, client, ino, cap_id, mseq, seq));
    return;
  }
  if (seq != cap->get_last_issue()) {
    dout(7) << " issue_seq " << seq << " != " << cap->get_last_issue() << dendl;
    // clean out any old revoke history
    cap->clean_revoke_from(seq);
    eval_cap_gather(in);
    return;
  }
  remove_client_cap(in, client);
}
3468
/* Remove a client's capability on an inode entirely (no message involved). */

void Locker::remove_client_cap(CInode *in, client_t client)
{
  // clean out any pending snapflush state
  if (!in->client_need_snapflush.empty())
    _do_null_snapflush(in, client);

  in->remove_client_cap(client);

  if (in->is_auth()) {
    // make sure we clear out the client byte range
    if (in->get_projected_inode()->client_ranges.count(client) &&
	!(in->inode.nlink == 0 && !in->is_any_caps()))  // unless it's unlink + stray
      check_inode_max_size(in);
  } else {
    // replica: tell the auth MDS our aggregate caps-wanted changed
    request_inode_file_caps(in);
  }

  try_eval(in, CEPH_CAP_LOCKS);
}
3490
3491
3492/**
3493 * Return true if any currently revoking caps exceed the
3494 * mds_revoke_cap_timeout threshold.
3495 */
3496bool Locker::any_late_revoking_caps(xlist<Capability*> const &revoking) const
3497{
3498 xlist<Capability*>::const_iterator p = revoking.begin();
3499 if (p.end()) {
3500 // No revoking caps at the moment
3501 return false;
3502 } else {
3503 utime_t now = ceph_clock_now();
3504 utime_t age = now - (*p)->get_last_revoke_stamp();
3505 if (age <= g_conf->mds_revoke_cap_timeout) {
3506 return false;
3507 } else {
3508 return true;
3509 }
3510 }
3511}
3512
3513
3514void Locker::get_late_revoking_clients(std::list<client_t> *result) const
3515{
3516 if (!any_late_revoking_caps(revoking_caps)) {
3517 // Fast path: no misbehaving clients, execute in O(1)
3518 return;
3519 }
3520
3521 // Slow path: execute in O(N_clients)
3522 std::map<client_t, xlist<Capability*> >::const_iterator client_rc_iter;
3523 for (client_rc_iter = revoking_caps_by_client.begin();
3524 client_rc_iter != revoking_caps_by_client.end(); ++client_rc_iter) {
3525 xlist<Capability*> const &client_rc = client_rc_iter->second;
3526 bool any_late = any_late_revoking_caps(client_rc);
3527 if (any_late) {
3528 result->push_back(client_rc_iter->first);
3529 }
3530 }
3531}
3532
3533// Hard-code instead of surfacing a config settings because this is
3534// really a hack that should go away at some point when we have better
3535// inspection tools for getting at detailed cap state (#7316)
3536#define MAX_WARN_CAPS 100
3537
3538void Locker::caps_tick()
3539{
3540 utime_t now = ceph_clock_now();
3541
3542 dout(20) << __func__ << " " << revoking_caps.size() << " revoking caps" << dendl;
3543
3544 int i = 0;
3545 for (xlist<Capability*>::iterator p = revoking_caps.begin(); !p.end(); ++p) {
3546 Capability *cap = *p;
3547
3548 utime_t age = now - cap->get_last_revoke_stamp();
3549 dout(20) << __func__ << " age = " << age << cap->get_client() << "." << cap->get_inode()->ino() << dendl;
3550 if (age <= g_conf->mds_revoke_cap_timeout) {
3551 dout(20) << __func__ << " age below timeout " << g_conf->mds_revoke_cap_timeout << dendl;
3552 break;
3553 } else {
3554 ++i;
3555 if (i > MAX_WARN_CAPS) {
3556 dout(1) << __func__ << " more than " << MAX_WARN_CAPS << " caps are late"
3557 << "revoking, ignoring subsequent caps" << dendl;
3558 break;
3559 }
3560 }
3561 // exponential backoff of warning intervals
3562 if (age > g_conf->mds_revoke_cap_timeout * (1 << cap->get_num_revoke_warnings())) {
3563 cap->inc_num_revoke_warnings();
3564 stringstream ss;
3565 ss << "client." << cap->get_client() << " isn't responding to mclientcaps(revoke), ino "
3566 << cap->get_inode()->ino() << " pending " << ccap_string(cap->pending())
3567 << " issued " << ccap_string(cap->issued()) << ", sent " << age << " seconds ago";
3568 mds->clog->warn() << ss.str();
3569 dout(20) << __func__ << " " << ss.str() << dendl;
3570 } else {
3571 dout(20) << __func__ << " silencing log message (backoff) for " << cap->get_client() << "." << cap->get_inode()->ino() << dendl;
3572 }
3573 }
3574}
3575
3576
/**
 * Handle a client dentry-lease message: a release / revoke-ack (drop
 * the lease) or a renew request (extend and echo the message back).
 *
 * Puts m on every path except the RENEW path that re-sends m to the
 * client (send_message_client_counted consumes the reference there).
 * NOTE(review): if can_lease() fails on RENEW, m appears to be neither
 * answered nor put — confirm against upstream.
 */
void Locker::handle_client_lease(MClientLease *m)
{
  dout(10) << "handle_client_lease " << *m << dendl;

  assert(m->get_source().is_client());
  client_t client = m->get_source().num();

  CInode *in = mdcache->get_inode(m->get_ino(), m->get_last());
  if (!in) {
    dout(7) << "handle_client_lease don't have ino " << m->get_ino() << "." << m->get_last() << dendl;
    m->put();
    return;
  }
  CDentry *dn = 0;

  frag_t fg = in->pick_dirfrag(m->dname);
  CDir *dir = in->get_dirfrag(fg);
  if (dir)
    dn = dir->lookup(m->dname);
  if (!dn) {
    dout(7) << "handle_client_lease don't have dn " << m->get_ino() << " " << m->dname << dendl;
    m->put();
    return;
  }
  dout(10) << " on " << *dn << dendl;

  // replica and lock
  ClientLease *l = dn->get_client_lease(client);
  if (!l) {
    dout(7) << "handle_client_lease didn't have lease for client." << client << " of " << *dn << dendl;
    m->put();
    return;
  }

  switch (m->get_action()) {
  case CEPH_MDS_LEASE_REVOKE_ACK:
  case CEPH_MDS_LEASE_RELEASE:
    // only honor the release if it matches the lease seq we issued
    if (l->seq != m->get_seq()) {
      dout(7) << "handle_client_lease release - seq " << l->seq << " != provided " << m->get_seq() << dendl;
    } else {
      dout(7) << "handle_client_lease client." << client
	      << " on " << *dn << dendl;
      dn->remove_client_lease(l, this);
    }
    m->put();
    break;

  case CEPH_MDS_LEASE_RENEW:
    {
      dout(7) << "handle_client_lease client." << client << " renew on " << *dn
	      << (!dn->lock.can_lease(client)?", revoking lease":"") << dendl;
      if (dn->lock.can_lease(client)) {
	int pool = 1;   // fixme.. do something smart!
	// reuse the incoming message as the reply: stamp new duration/seq
	m->h.duration_ms = (int)(1000 * mdcache->client_lease_durations[pool]);
	m->h.seq = ++l->seq;
	m->clear_payload();

	utime_t now = ceph_clock_now();
	now += mdcache->client_lease_durations[pool];
	mdcache->touch_client_lease(l, pool, now);

	mds->send_message_client_counted(m, m->get_connection());
      }
    }
    break;

  default:
    ceph_abort(); // implement me
    break;
  }
}
3648
3649
/**
 * Encode a dentry lease (or a null lease) for `client` into bl, as part
 * of building a client reply trace.
 *
 * A real lease is issued only when the dentry is not in a stray dir,
 * the dentry lock allows leasing, and the client isn't already covered
 * by dir caps/lease (it lacks both a filelock lease on the dir and
 * FILE_SHARED|FILE_EXCL caps on it).
 */
void Locker::issue_client_lease(CDentry *dn, client_t client,
			       bufferlist &bl, utime_t now, Session *session)
{
  CInode *diri = dn->get_dir()->get_inode();
  if (!diri->is_stray() &&  // do not issue dn leases in stray dir!
      ((!diri->filelock.can_lease(client) &&
	(diri->get_client_cap_pending(client) & (CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_EXCL)) == 0)) &&
      dn->lock.can_lease(client)) {
    int pool = 1;   // fixme.. do something smart!
    // issue a dentry lease
    ClientLease *l = dn->add_client_lease(client, session);
    session->touch_lease(l);

    now += mdcache->client_lease_durations[pool];
    mdcache->touch_client_lease(l, pool, now);

    LeaseStat e;
    e.mask = 1 | CEPH_LOCK_DN;  // old and new bit values
    e.seq = ++l->seq;
    e.duration_ms = (int)(1000 * mdcache->client_lease_durations[pool]);
    ::encode(e, bl);
    dout(20) << "issue_client_lease seq " << e.seq << " dur " << e.duration_ms << "ms "
	     << " on " << *dn << dendl;
  } else {
    // null lease
    LeaseStat e;
    e.mask = 0;
    e.seq = 0;
    e.duration_ms = 0;
    ::encode(e, bl);
    dout(20) << "issue_client_lease no/null lease on " << *dn << dendl;
  }
}
3683
3684
3685void Locker::revoke_client_leases(SimpleLock *lock)
3686{
3687 int n = 0;
3688 CDentry *dn = static_cast<CDentry*>(lock->get_parent());
3689 for (map<client_t, ClientLease*>::iterator p = dn->client_lease_map.begin();
3690 p != dn->client_lease_map.end();
3691 ++p) {
3692 ClientLease *l = p->second;
3693
3694 n++;
3695 assert(lock->get_type() == CEPH_LOCK_DN);
3696
3697 CDentry *dn = static_cast<CDentry*>(lock->get_parent());
3698 int mask = 1 | CEPH_LOCK_DN; // old and new bits
3699
3700 // i should also revoke the dir ICONTENT lease, if they have it!
3701 CInode *diri = dn->get_dir()->get_inode();
3702 mds->send_message_client_counted(new MClientLease(CEPH_MDS_LEASE_REVOKE, l->seq,
3703 mask,
3704 diri->ino(),
3705 diri->first, CEPH_NOSNAP,
3706 dn->get_name()),
3707 l->client);
3708 }
3709 assert(n == lock->get_num_client_lease());
3710}
3711
3712
3713
3714// locks ----------------------------------------------------------------
3715
/**
 * Resolve a (lock_type, object info) pair from a peer MDS message to
 * the corresponding in-memory SimpleLock, or NULL if the object is not
 * in our cache.
 */
SimpleLock *Locker::get_lock(int lock_type, MDSCacheObjectInfo &info)
{
  switch (lock_type) {
  case CEPH_LOCK_DN:
    {
      // be careful; info.dirfrag may have incorrect frag; recalculate based on dname.
      CInode *diri = mdcache->get_inode(info.dirfrag.ino);
      frag_t fg;
      CDir *dir = 0;
      CDentry *dn = 0;
      if (diri) {
	fg = diri->pick_dirfrag(info.dname);
	dir = diri->get_dirfrag(fg);
	if (dir)
	  dn = dir->lookup(info.dname, info.snapid);
      }
      if (!dn) {
	dout(7) << "get_lock don't have dn " << info.dirfrag.ino << " " << info.dname << dendl;
	return 0;
      }
      return &dn->lock;
    }

  case CEPH_LOCK_IAUTH:
  case CEPH_LOCK_ILINK:
  case CEPH_LOCK_IDFT:
  case CEPH_LOCK_IFILE:
  case CEPH_LOCK_INEST:
  case CEPH_LOCK_IXATTR:
  case CEPH_LOCK_ISNAP:
  case CEPH_LOCK_IFLOCK:
  case CEPH_LOCK_IPOLICY:
    {
      // all inode locks: look up the inode, then pick the member lock
      CInode *in = mdcache->get_inode(info.ino, info.snapid);
      if (!in) {
	dout(7) << "get_lock don't have ino " << info.ino << dendl;
	return 0;
      }
      switch (lock_type) {
      case CEPH_LOCK_IAUTH: return &in->authlock;
      case CEPH_LOCK_ILINK: return &in->linklock;
      case CEPH_LOCK_IDFT: return &in->dirfragtreelock;
      case CEPH_LOCK_IFILE: return &in->filelock;
      case CEPH_LOCK_INEST: return &in->nestlock;
      case CEPH_LOCK_IXATTR: return &in->xattrlock;
      case CEPH_LOCK_ISNAP: return &in->snaplock;
      case CEPH_LOCK_IFLOCK: return &in->flocklock;
      case CEPH_LOCK_IPOLICY: return &in->policylock;
      }
    }

  default:
    dout(7) << "get_lock don't know lock_type " << lock_type << dendl;
    ceph_abort();
    break;
  }

  return 0;
}
3775
3776/* This function DOES put the passed message before returning */
/* This function DOES put the passed message before returning.
 * Dispatch an inter-MDS lock message to the handler for the lock's
 * type; drops it if the referenced object is no longer cached. */
void Locker::handle_lock(MLock *m)
{
  // nobody should be talking to us during recovery.
  assert(mds->is_rejoin() || mds->is_clientreplay() || mds->is_active() || mds->is_stopping());

  SimpleLock *lock = get_lock(m->get_lock_type(), m->get_object_info());
  if (!lock) {
    dout(10) << "don't have object " << m->get_object_info() << ", must have trimmed, dropping" << dendl;
    m->put();
    return;
  }

  switch (lock->get_type()) {
  case CEPH_LOCK_DN:
  case CEPH_LOCK_IAUTH:
  case CEPH_LOCK_ILINK:
  case CEPH_LOCK_ISNAP:
  case CEPH_LOCK_IXATTR:
  case CEPH_LOCK_IFLOCK:
  case CEPH_LOCK_IPOLICY:
    handle_simple_lock(lock, m);
    break;

  case CEPH_LOCK_IDFT:
  case CEPH_LOCK_INEST:
    //handle_scatter_lock((ScatterLock*)lock, m);
    //break;
    // deliberate fall-through: scatter locks are handled by the file
    // lock path

  case CEPH_LOCK_IFILE:
    handle_file_lock(static_cast<ScatterLock*>(lock), m);
    break;

  default:
    dout(7) << "handle_lock got otype " << m->get_lock_type() << dendl;
    ceph_abort();
    break;
  }
}
3815
3816
3817
3818
3819
3820// ==========================================================================
3821// simple lock
3822
3823/** This function may take a reference to m if it needs one, but does
3824 * not put references. */
/** This function may take a reference to m if it needs one, but does
 * not put references.
 *
 * A replica asked us (the auth) to move this lock to SYNC so it can
 * rdlock.  Do it now if the lock is stable; otherwise wait for
 * stability/unfreeze and replay the message.  If we can't serve the
 * request (not auth / already SYNC / frozen), silently drop it — the
 * replica will retry. */
void Locker::handle_reqrdlock(SimpleLock *lock, MLock *m)
{
  MDSCacheObject *parent = lock->get_parent();
  if (parent->is_auth() &&
      lock->get_state() != LOCK_SYNC &&
      !parent->is_frozen()) {
    dout(7) << "handle_reqrdlock got rdlock request on " << *lock
	    << " on " << *parent << dendl;
    assert(parent->is_auth()); // replica auth pinned if they're doing this!
    if (lock->is_stable()) {
      simple_sync(lock);
    } else {
      dout(7) << "handle_reqrdlock delaying request until lock is stable" << dendl;
      lock->add_waiter(SimpleLock::WAIT_STABLE | MDSCacheObject::WAIT_UNFREEZE,
                       new C_MDS_RetryMessage(mds, m->get()));
    }
  } else {
    dout(7) << "handle_reqrdlock dropping rdlock request on " << *lock
	    << " on " << *parent << dendl;
    // replica should retry
  }
}
3847
3848/* This function DOES put the passed message before returning */
/* This function DOES put the passed message before returning.
 * Process a simple-lock protocol message: SYNC/LOCK state pushes when
 * we are a replica, LOCKACK gathers and REQRDLOCK requests when we are
 * the auth. */
void Locker::handle_simple_lock(SimpleLock *lock, MLock *m)
{
  int from = m->get_asker();

  dout(10) << "handle_simple_lock " << *m
	   << " on " << *lock << " " << *lock->get_parent() << dendl;

  if (mds->is_rejoin()) {
    if (lock->get_parent()->is_rejoining()) {
      dout(7) << "handle_simple_lock still rejoining " << *lock->get_parent()
	      << ", dropping " << *m << dendl;
      m->put();
      return;
    }
  }

  switch (m->get_action()) {
    // -- replica --
  case LOCK_AC_SYNC:
    assert(lock->get_state() == LOCK_LOCK);
    lock->decode_locked_state(m->get_data());
    lock->set_state(LOCK_SYNC);
    lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
    break;

  case LOCK_AC_LOCK:
    assert(lock->get_state() == LOCK_SYNC);
    lock->set_state(LOCK_SYNC_LOCK);
    // dentry leases are incompatible with LOCK; revoke them first
    if (lock->is_leased())
      revoke_client_leases(lock);
    eval_gather(lock, true);
    if (lock->is_unstable_and_locked())
      mds->mdlog->flush();
    break;


    // -- auth --
  case LOCK_AC_LOCKACK:
    assert(lock->get_state() == LOCK_SYNC_LOCK ||
	   lock->get_state() == LOCK_SYNC_EXCL);
    assert(lock->is_gathering(from));
    lock->remove_gather(from);

    if (lock->is_gathering()) {
      dout(7) << "handle_simple_lock " << *lock << " on " << *lock->get_parent() << " from " << from
	      << ", still gathering " << lock->get_gather_set() << dendl;
    } else {
      dout(7) << "handle_simple_lock " << *lock << " on " << *lock->get_parent() << " from " << from
	      << ", last one" << dendl;
      eval_gather(lock);
    }
    break;

  case LOCK_AC_REQRDLOCK:
    handle_reqrdlock(lock, m);
    break;

  }

  m->put();
}
3910
3911/* unused, currently.
3912
3913class C_Locker_SimpleEval : public Context {
3914 Locker *locker;
3915 SimpleLock *lock;
3916public:
3917 C_Locker_SimpleEval(Locker *l, SimpleLock *lk) : locker(l), lock(lk) {}
3918 void finish(int r) {
3919 locker->try_simple_eval(lock);
3920 }
3921};
3922
3923void Locker::try_simple_eval(SimpleLock *lock)
3924{
3925 // unstable and ambiguous auth?
3926 if (!lock->is_stable() &&
3927 lock->get_parent()->is_ambiguous_auth()) {
3928 dout(7) << "simple_eval not stable and ambiguous auth, waiting on " << *lock->get_parent() << dendl;
3929 //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
3930 lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_SimpleEval(this, lock));
3931 return;
3932 }
3933
3934 if (!lock->get_parent()->is_auth()) {
3935 dout(7) << "try_simple_eval not auth for " << *lock->get_parent() << dendl;
3936 return;
3937 }
3938
3939 if (!lock->get_parent()->can_auth_pin()) {
3940 dout(7) << "try_simple_eval can't auth_pin, waiting on " << *lock->get_parent() << dendl;
3941 //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
3942 lock->get_parent()->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_SimpleEval(this, lock));
3943 return;
3944 }
3945
3946 if (lock->is_stable())
3947 simple_eval(lock);
3948}
3949*/
3950
3951
// Re-evaluate a stable SimpleLock on the auth and, if client cap wants
// warrant it, kick off a transition toward EXCL or SYNC.  If need_issue
// is non-null, cap issuing is deferred to the caller.
void Locker::simple_eval(SimpleLock *lock, bool *need_issue)
{
  dout(10) << "simple_eval " << *lock << " on " << *lock->get_parent() << dendl;

  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  if (lock->get_parent()->is_freezing_or_frozen()) {
    // dentry lock in unreadable state can block path traverse
    if ((lock->get_type() != CEPH_LOCK_DN ||
	 lock->get_state() == LOCK_SYNC ||
	 lock->get_parent()->is_frozen()))
      return;
  }

  if (mdcache->is_readonly()) {
    // read-only FS: force everything toward SYNC (read-only) state.
    if (lock->get_state() != LOCK_SYNC) {
      dout(10) << "simple_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
      simple_sync(lock, need_issue);
    }
    return;
  }

  // for inode locks, consult what caps clients want at this lock's shift.
  CInode *in = 0;
  int wanted = 0;
  if (lock->get_type() != CEPH_LOCK_DN) {
    in = static_cast<CInode*>(lock->get_parent());
    in->get_caps_wanted(&wanted, NULL, lock->get_cap_shift());
  }

  // -> excl?
  if (lock->get_state() != LOCK_EXCL &&
      in && in->get_target_loner() >= 0 &&
      (wanted & CEPH_CAP_GEXCL)) {
    dout(7) << "simple_eval stable, going to excl " << *lock
	    << " on " << *lock->get_parent() << dendl;
    simple_excl(lock, need_issue);
  }

  // stable -> sync?
  else if (lock->get_state() != LOCK_SYNC &&
	   !lock->is_wrlocked() &&
	   ((!(wanted & CEPH_CAP_GEXCL) && !lock->is_waiter_for(SimpleLock::WAIT_WR)) ||
	    (lock->get_state() == LOCK_EXCL && in && in->get_target_loner() < 0))) {
    dout(7) << "simple_eval stable, syncing " << *lock
	    << " on " << *lock->get_parent() << dendl;
    simple_sync(lock, need_issue);
  }
}
4001
4002
4003// mid
4004
// Move a stable lock toward SYNC on the auth.  Returns true if the lock
// reached SYNC immediately; false if a gather (replicas, caps, wrlocks,
// recovery, or a dirty-scatterlock writebehind) was started, in which
// case the transition completes later via eval_gather().
bool Locker::simple_sync(SimpleLock *lock, bool *need_issue)
{
  dout(7) << "simple_sync on " << *lock << " on " << *lock->get_parent() << dendl;
  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  // locks that carry caps (cap_shift != 0) live on inodes
  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  int old_state = lock->get_state();

  if (old_state != LOCK_TSYN) {

    // enter the appropriate intermediate *_SYNC state
    switch (lock->get_state()) {
    case LOCK_MIX: lock->set_state(LOCK_MIX_SYNC); break;
    case LOCK_LOCK: lock->set_state(LOCK_LOCK_SYNC); break;
    case LOCK_XSYN: lock->set_state(LOCK_XSYN_SYNC); break;
    case LOCK_EXCL: lock->set_state(LOCK_EXCL_SYNC); break;
    default: ceph_abort();
    }

    // count outstanding conditions that must drain before we can go SYNC
    int gather = 0;
    if (lock->is_wrlocked())
      gather++;

    // leaving MIX: replicas hold scattered state; gather it back first
    if (lock->get_parent()->is_replicated() && old_state == LOCK_MIX) {
      send_lock_message(lock, LOCK_AC_SYNC);
      lock->init_gather();
      gather++;
    }

    // clients may hold caps incompatible with SYNC; revoke them
    if (in && in->is_head()) {
      if (in->issued_caps_need_gather(lock)) {
	if (need_issue)
	  *need_issue = true;
	else
	  issue_caps(in);
	gather++;
      }
    }

    bool need_recover = false;
    if (lock->get_type() == CEPH_LOCK_IFILE) {
      assert(in);
      if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
        mds->mdcache->queue_file_recover(in);
        need_recover = true;
        gather++;
      }
    }

    // nothing to gather but dirty scatter data: journal it back first
    if (!gather && lock->is_dirty()) {
      lock->get_parent()->auth_pin(lock);
      scatter_writebehind(static_cast<ScatterLock*>(lock));
      mds->mdlog->flush();
      return false;
    }

    if (gather) {
      // pin while unstable; eval_gather() finishes the transition
      lock->get_parent()->auth_pin(lock);
      if (need_recover)
	mds->mdcache->do_file_recover();
      return false;
    }
  }

  if (lock->get_parent()->is_replicated()) { // FIXME
    // push latest lock data to replicas along with the SYNC grant
    bufferlist data;
    lock->encode_locked_state(data);
    send_lock_message(lock, LOCK_AC_SYNC, data);
  }
  lock->set_state(LOCK_SYNC);
  lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
  if (in && in->is_head()) {
    if (need_issue)
      *need_issue = true;
    else
      issue_caps(in);
  }
  return true;
}
4087
// Move a stable lock toward EXCL (loner-exclusive) on the auth.  If
// rdlocks, wrlocks, replicas, or client caps must drain first, starts a
// gather and leaves the lock in an intermediate *_EXCL state.
void Locker::simple_excl(SimpleLock *lock, bool *need_issue)
{
  dout(7) << "simple_excl on " << *lock << " on " << *lock->get_parent() << dendl;
  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  switch (lock->get_state()) {
  case LOCK_LOCK: lock->set_state(LOCK_LOCK_EXCL); break;
  case LOCK_SYNC: lock->set_state(LOCK_SYNC_EXCL); break;
  case LOCK_XSYN: lock->set_state(LOCK_XSYN_EXCL); break;
  default: ceph_abort();
  }

  int gather = 0;
  if (lock->is_rdlocked())
    gather++;
  if (lock->is_wrlocked())
    gather++;

  // replicas must drop to LOCK, except when they are already there
  // (LOCK_LOCK_EXCL / LOCK_XSYN_EXCL imply replica state LOCK).
  if (lock->get_parent()->is_replicated() &&
      lock->get_state() != LOCK_LOCK_EXCL &&
      lock->get_state() != LOCK_XSYN_EXCL) {
    send_lock_message(lock, LOCK_AC_LOCK);
    lock->init_gather();
    gather++;
  }

  if (in && in->is_head()) {
    if (in->issued_caps_need_gather(lock)) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
      gather++;
    }
  }

  if (gather) {
    // stay pinned until eval_gather() completes the transition
    lock->get_parent()->auth_pin(lock);
  } else {
    lock->set_state(LOCK_EXCL);
    lock->finish_waiters(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE);
    if (in) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
    }
  }
}
4142
// Move a stable lock toward LOCK (exclusively held by the auth MDS).
// Handles lease revocation, cap gathers, replica gathers (including the
// two-stage MIX->LOCK gather), file recovery, and dirty-scatterlock
// writebehind along the way.
void Locker::simple_lock(SimpleLock *lock, bool *need_issue)
{
  dout(7) << "simple_lock on " << *lock << " on " << *lock->get_parent() << dendl;
  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());
  assert(lock->get_state() != LOCK_LOCK);

  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  int old_state = lock->get_state();

  switch (lock->get_state()) {
  case LOCK_SYNC: lock->set_state(LOCK_SYNC_LOCK); break;
  case LOCK_XSYN:
    // XSYN can't go directly to LOCK; bounce through EXCL first.
    file_excl(static_cast<ScatterLock*>(lock), need_issue);
    if (lock->get_state() != LOCK_EXCL)
      return;
    // fall-thru
  case LOCK_EXCL: lock->set_state(LOCK_EXCL_LOCK); break;
  case LOCK_MIX: lock->set_state(LOCK_MIX_LOCK);
    (static_cast<ScatterLock *>(lock))->clear_unscatter_wanted();
    break;
  case LOCK_TSYN: lock->set_state(LOCK_TSYN_LOCK); break;
  default: ceph_abort();
  }

  int gather = 0;
  if (lock->is_leased()) {
    gather++;
    revoke_client_leases(lock);
  }
  if (lock->is_rdlocked())
    gather++;
  if (in && in->is_head()) {
    if (in->issued_caps_need_gather(lock)) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
      gather++;
    }
  }

  bool need_recover = false;
  if (lock->get_type() == CEPH_LOCK_IFILE) {
    assert(in);
    if(in->state_test(CInode::STATE_NEEDSRECOVER)) {
      mds->mdcache->queue_file_recover(in);
      need_recover = true;
      gather++;
    }
  }

  if (lock->get_parent()->is_replicated() &&
      lock->get_state() == LOCK_MIX_LOCK &&
      gather) {
    // MIX->LOCK is done in two stages: drain local state first, then
    // (in eval_gather) ask replicas for their scattered state.
    dout(10) << " doing local stage of mix->lock gather before gathering from replicas" << dendl;
  } else {
    // move to second stage of gather now, so we don't send the lock action later.
    if (lock->get_state() == LOCK_MIX_LOCK)
      lock->set_state(LOCK_MIX_LOCK2);

    if (lock->get_parent()->is_replicated() &&
	lock->get_sm()->states[old_state].replica_state != LOCK_LOCK) {  // replica may already be LOCK
      gather++;
      send_lock_message(lock, LOCK_AC_LOCK);
      lock->init_gather();
    }
  }

  // nothing to gather but dirty scatter data: journal it back first
  if (!gather && lock->is_dirty()) {
    lock->get_parent()->auth_pin(lock);
    scatter_writebehind(static_cast<ScatterLock*>(lock));
    mds->mdlog->flush();
    return;
  }

  if (gather) {
    lock->get_parent()->auth_pin(lock);
    if (need_recover)
      mds->mdcache->do_file_recover();
  } else {
    lock->set_state(LOCK_LOCK);
    lock->finish_waiters(ScatterLock::WAIT_XLOCK|ScatterLock::WAIT_WR|ScatterLock::WAIT_STABLE);
  }
}
4231
4232
// Begin moving a lock toward XLOCK (exclusive for a single request).
// Starts from LOCK or XLOCKDONE; if rdlocks/wrlocks/caps must drain,
// leaves the lock in LOCK_XLOCK gathering, otherwise advances to
// PREXLOCK immediately.
void Locker::simple_xlock(SimpleLock *lock)
{
  dout(7) << "simple_xlock on " << *lock << " on " << *lock->get_parent() << dendl;
  assert(lock->get_parent()->is_auth());
  //assert(lock->is_stable());
  assert(lock->get_state() != LOCK_XLOCK);

  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  // pin only when entering from a stable state (unstable locks are
  // already pinned by the earlier transition)
  if (lock->is_stable())
    lock->get_parent()->auth_pin(lock);

  switch (lock->get_state()) {
  case LOCK_LOCK:
  case LOCK_XLOCKDONE: lock->set_state(LOCK_LOCK_XLOCK); break;
  default: ceph_abort();
  }

  int gather = 0;
  if (lock->is_rdlocked())
    gather++;
  if (lock->is_wrlocked())
    gather++;

  if (in && in->is_head()) {
    if (in->issued_caps_need_gather(lock)) {
      issue_caps(in);
      gather++;
    }
  }

  if (!gather) {
    lock->set_state(LOCK_PREXLOCK);
    //assert("shouldn't be called if we are already xlockable" == 0);
  }
}
4271
4272
4273
4274
4275
4276// ==========================================================================
4277// scatter lock
4278
4279/*
4280
4281Some notes on scatterlocks.
4282
4283 - The scatter/gather is driven by the inode lock. The scatter always
4284 brings in the latest metadata from the fragments.
4285
4286 - When in a scattered/MIX state, fragments are only allowed to
4287 update/be written to if the accounted stat matches the inode's
4288 current version.
4289
4290 - That means, on gather, we _only_ assimilate diffs for frag metadata
4291 that match the current version, because those are the only ones
4292 written during this scatter/gather cycle. (Others didn't permit
4293 it.) We increment the version and journal this to disk.
4294
4295 - When possible, we also simultaneously update our local frag
4296 accounted stats to match.
4297
4298 - On scatter, the new inode info is broadcast to frags, both local
4299 and remote. If possible (auth and !frozen), the dirfrag auth
4300 should update the accounted state (if it isn't already up to date).
4301 Note that this may occur on both the local inode auth node and
4302 inode replicas, so there are two potential paths. If it is NOT
4303 possible, they need to mark_stale to prevent any possible writes.
4304
4305 - A scatter can be to MIX (potentially writeable) or to SYNC (read
4306 only). Both are opportunities to update the frag accounted stats,
4307 even though only the MIX case is affected by a stale dirfrag.
4308
4309 - Because many scatter/gather cycles can potentially go by without a
4310 frag being able to update its accounted stats (due to being frozen
4311 by exports/refragments in progress), the frag may have (even very)
4312 old stat versions. That's fine. If when we do want to update it,
4313 we can update accounted_* and the version first.
4314
4315*/
4316
// Journal-commit context for scatter_writebehind(): once the EUpdate is
// safely logged, apply the mutation and finish the scatterlock flush.
class C_Locker_ScatterWB : public LockerLogContext {
  ScatterLock *lock;   // the scatterlock being flushed
  MutationRef mut;     // the mutation journaled by scatter_writebehind()
public:
  C_Locker_ScatterWB(Locker *l, ScatterLock *sl, MutationRef& m) :
    LockerLogContext(l), lock(sl), mut(m) {}
  void finish(int r) override {
    locker->scatter_writebehind_finish(lock, mut);
  }
};
4327
// Journal dirty scattered data (gathered fnode stats) back into the
// inode.  Takes a forced wrlock and starts a flush; completion is
// handled by C_Locker_ScatterWB -> scatter_writebehind_finish().
void Locker::scatter_writebehind(ScatterLock *lock)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  dout(10) << "scatter_writebehind " << in->inode.mtime << " on " << *lock << " on " << *in << dendl;

  // journal
  MutationRef mut(new MutationImpl());
  mut->ls = mds->mdlog->get_current_segment();

  // forcefully take a wrlock
  lock->get_wrlock(true);
  mut->wrlocks.insert(lock);
  mut->locks.insert(lock);

  in->pre_cow_old_inode();  // avoid cow mayhem

  inode_t *pi = in->project_inode();
  pi->version = in->pre_dirty();

  in->finish_scatter_gather_update(lock->get_type());
  lock->start_flush();

  EUpdate *le = new EUpdate(mds->mdlog, "scatter_writebehind");
  mds->mdlog->start_entry(le);

  mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY);
  mdcache->journal_dirty_inode(mut.get(), &le->metablob, in);

  in->finish_scatter_gather_update_accounted(lock->get_type(), mut, &le->metablob);

  mds->mdlog->submit_entry(le, new C_Locker_ScatterWB(this, lock, mut));
}
4360
// Journal-commit half of scatter_writebehind(): pop the projected
// inode, finish the flush, notify replicas if needed, and apply/clean
// up the mutation.
void Locker::scatter_writebehind_finish(ScatterLock *lock, MutationRef& mut)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  dout(10) << "scatter_writebehind_finish on " << *lock << " on " << *in << dendl;
  in->pop_and_dirty_projected_inode(mut->ls);

  lock->finish_flush();

  // if replicas may have flushed in a mix->lock state, send another
  // message so they can finish_flush().
  if (in->is_replicated()) {
    switch (lock->get_state()) {
    case LOCK_MIX_LOCK:
    case LOCK_MIX_LOCK2:
    case LOCK_MIX_EXCL:
    case LOCK_MIX_TSYN:
      send_lock_message(lock, LOCK_AC_LOCKFLUSHED);
    }
  }

  mut->apply();
  drop_locks(mut.get());
  mut->cleanup();

  if (lock->is_stable())
    lock->finish_waiters(ScatterLock::WAIT_STABLE);

  //scatter_eval_gather(lock);
}
4390
// Re-evaluate a stable scatterlock on the auth: honor scatter requests
// (-> MIX), keep INEST writable, and sync non-subtree-root inode locks.
void Locker::scatter_eval(ScatterLock *lock, bool *need_issue)
{
  dout(10) << "scatter_eval " << *lock << " on " << *lock->get_parent() << dendl;

  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  if (lock->get_parent()->is_freezing_or_frozen()) {
    dout(20) << " freezing|frozen" << dendl;
    return;
  }

  if (mdcache->is_readonly()) {
    // read-only FS: force everything toward SYNC.
    if (lock->get_state() != LOCK_SYNC) {
      dout(10) << "scatter_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
      simple_sync(lock, need_issue);
    }
    return;
  }

  // someone asked for a scatter (e.g. a replica with dirty frags)
  if (!lock->is_rdlocked() &&
      lock->get_state() != LOCK_MIX &&
      lock->get_scatter_wanted()) {
    dout(10) << "scatter_eval scatter_wanted, bump to mix " << *lock
	     << " on " << *lock->get_parent() << dendl;
    scatter_mix(lock, need_issue);
    return;
  }

  if (lock->get_type() == CEPH_LOCK_INEST) {
    // in general, we want to keep INEST writable at all times.
    if (!lock->is_rdlocked()) {
      if (lock->get_parent()->is_replicated()) {
	if (lock->get_state() != LOCK_MIX)
	  scatter_mix(lock, need_issue);
      } else {
	if (lock->get_state() != LOCK_LOCK)
	  simple_lock(lock, need_issue);
      }
    }
    return;
  }

  CInode *in = static_cast<CInode*>(lock->get_parent());
  if (!in->has_subtree_or_exporting_dirfrag() || in->is_base()) {
    // i _should_ be sync.
    if (!lock->is_wrlocked() &&
	lock->get_state() != LOCK_SYNC) {
      dout(10) << "scatter_eval no wrlocks|xlocks, not subtree root inode, syncing" << dendl;
      simple_sync(lock, need_issue);
    }
  }
}
4444
4445
4446/*
4447 * mark a scatterlock to indicate that the dir fnode has some dirty data
4448 */
4449void Locker::mark_updated_scatterlock(ScatterLock *lock)
4450{
4451 lock->mark_dirty();
4452 if (lock->get_updated_item()->is_on_list()) {
4453 dout(10) << "mark_updated_scatterlock " << *lock
4454 << " - already on list since " << lock->get_update_stamp() << dendl;
4455 } else {
4456 updated_scatterlocks.push_back(lock->get_updated_item());
4457 utime_t now = ceph_clock_now();
4458 lock->set_update_stamp(now);
4459 dout(10) << "mark_updated_scatterlock " << *lock
4460 << " - added at " << now << dendl;
4461 }
4462}
4463
/*
 * this is called by scatter_tick and LogSegment::try_to_trim() when
 * trying to flush dirty scattered data (i.e. updated fnode) back to
 * the inode.
 *
 * we need to lock|scatter in order to push fnode changes into the
 * inode.dirstat.
 */
void Locker::scatter_nudge(ScatterLock *lock, MDSInternalContextBase *c, bool forcelockchange)
{
  CInode *p = static_cast<CInode *>(lock->get_parent());

  if (p->is_frozen() || p->is_freezing()) {
    dout(10) << "scatter_nudge waiting for unfreeze on " << *p << dendl;
    if (c)
      p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, c);
    else
      // just requeue.  not ideal.. starvation prone..
      updated_scatterlocks.push_back(lock->get_updated_item());
    return;
  }

  if (p->is_ambiguous_auth()) {
    dout(10) << "scatter_nudge waiting for single auth on " << *p << dendl;
    if (c)
      p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, c);
    else
      // just requeue.  not ideal.. starvation prone..
      updated_scatterlocks.push_back(lock->get_updated_item());
    return;
  }

  if (p->is_auth()) {
    // auth: cycle the lock state (scatter<->unscatter) until the dirty
    // data gets flushed; bail if two cycles leave it stable.
    int count = 0;
    while (true) {
      if (lock->is_stable()) {
	// can we do it now?
	//  (only if we're not replicated.. if we are, we really do need
	//   to nudge the lock state!)
	/*
	  actually, even if we're not replicated, we can't stay in MIX, because another mds
	  could discover and replicate us at any time.  if that happens while we're flushing,
	  they end up in MIX but their inode has the old scatterstat version.

	if (!forcelockchange && !lock->get_parent()->is_replicated() && lock->can_wrlock(-1)) {
	  dout(10) << "scatter_nudge auth, propagating " << *lock << " on " << *p << dendl;
	  scatter_writebehind(lock);
	  if (c)
	    lock->add_waiter(SimpleLock::WAIT_STABLE, c);
	  return;
	}
	*/

	if (mdcache->is_readonly()) {
	  if (lock->get_state() != LOCK_SYNC) {
	    dout(10) << "scatter_nudge auth, read-only FS, syncing " << *lock << " on " << *p << dendl;
	    simple_sync(static_cast<ScatterLock*>(lock));
	  }
	  break;
	}

	// adjust lock state
	dout(10) << "scatter_nudge auth, scatter/unscattering " << *lock << " on " << *p << dendl;
	switch (lock->get_type()) {
	case CEPH_LOCK_IFILE:
	  if (p->is_replicated() && lock->get_state() != LOCK_MIX)
	    scatter_mix(static_cast<ScatterLock*>(lock));
	  else if (lock->get_state() != LOCK_LOCK)
	    simple_lock(static_cast<ScatterLock*>(lock));
	  else
	    simple_sync(static_cast<ScatterLock*>(lock));
	  break;

	case CEPH_LOCK_IDFT:
	case CEPH_LOCK_INEST:
	  if (p->is_replicated() && lock->get_state() != LOCK_MIX)
	    scatter_mix(lock);
	  else if (lock->get_state() != LOCK_LOCK)
	    simple_lock(lock);
	  else
	    simple_sync(lock);
	  break;
	default:
	  ceph_abort();
	}
	++count;
	if (lock->is_stable() && count == 2) {
	  dout(10) << "scatter_nudge oh, stable after two cycles." << dendl;
	  // this should only really happen when called via
	  // handle_file_lock due to AC_NUDGE, because the rest of the
	  // time we are replicated or have dirty data and won't get
	  // called.  bailing here avoids an infinite loop.
	  assert(!c);
	  break;
	}
      } else {
	dout(10) << "scatter_nudge auth, waiting for stable " << *lock << " on " << *p << dendl;
	if (c)
	  lock->add_waiter(SimpleLock::WAIT_STABLE, c);
	return;
      }
    }
  } else {
    dout(10) << "scatter_nudge replica, requesting scatter/unscatter of "
	     << *lock << " on " << *p << dendl;
    // request unscatter?
    mds_rank_t auth = lock->get_parent()->authority().first;
    if (!mds->is_cluster_degraded() ||
	mds->mdsmap->is_clientreplay_or_active_or_stopping(auth))
      mds->send_message_mds(new MLock(lock, LOCK_AC_NUDGE, mds->get_nodeid()), auth);

    // wait...
    if (c)
      lock->add_waiter(SimpleLock::WAIT_STABLE, c);

    // also, requeue, in case we had wrong auth or something
    updated_scatterlocks.push_back(lock->get_updated_item());
  }
}
4583
// Periodic tick: walk the updated_scatterlocks queue and nudge locks
// whose dirty data has aged past mds_scatter_nudge_interval.
void Locker::scatter_tick()
{
  dout(10) << "scatter_tick" << dendl;

  // updated
  utime_t now = ceph_clock_now();
  int n = updated_scatterlocks.size();
  while (!updated_scatterlocks.empty()) {
    ScatterLock *lock = updated_scatterlocks.front();

    if (n-- == 0) break;  // scatter_nudge() may requeue; avoid looping

    if (!lock->is_dirty()) {
      // already flushed by some other path; drop it from the queue
      updated_scatterlocks.pop_front();
      dout(10) << " removing from updated_scatterlocks "
	       << *lock << " " << *lock->get_parent() << dendl;
      continue;
    }
    // queue is in insertion order, so the first too-young entry ends the scan
    if (now - lock->get_update_stamp() < g_conf->mds_scatter_nudge_interval)
      break;
    updated_scatterlocks.pop_front();
    scatter_nudge(lock, 0);
  }
  mds->mdlog->flush();
}
4609
4610
// Move a stable scatterlock toward TSYN (temporarily readable on the
// auth only).  NOTE: deliberately disabled below — not fully
// implemented, at least not for filelock.
void Locker::scatter_tempsync(ScatterLock *lock, bool *need_issue)
{
  dout(10) << "scatter_tempsync " << *lock
	   << " on " << *lock->get_parent() << dendl;
  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  assert(0 == "not fully implemented, at least not for filelock");

  CInode *in = static_cast<CInode *>(lock->get_parent());

  switch (lock->get_state()) {
  case LOCK_SYNC: ceph_abort();   // this shouldn't happen
  case LOCK_LOCK: lock->set_state(LOCK_LOCK_TSYN); break;
  case LOCK_MIX: lock->set_state(LOCK_MIX_TSYN); break;
  default: ceph_abort();
  }

  int gather = 0;
  if (lock->is_wrlocked())
    gather++;

  if (lock->get_cap_shift() &&
      in->is_head() &&
      in->issued_caps_need_gather(lock)) {
    if (need_issue)
      *need_issue = true;
    else
      issue_caps(in);
    gather++;
  }

  // leaving MIX with replicas: gather their scattered state first
  if (lock->get_state() == LOCK_MIX_TSYN &&
      in->is_replicated()) {
    lock->init_gather();
    send_lock_message(lock, LOCK_AC_LOCK);
    gather++;
  }

  if (gather) {
    in->auth_pin(lock);
  } else {
    // do tempsync
    lock->set_state(LOCK_TSYN);
    lock->finish_waiters(ScatterLock::WAIT_RD|ScatterLock::WAIT_STABLE);
    if (lock->get_cap_shift()) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
    }
  }
}
4664
4665
4666
4667// ==========================================================================
4668// local lock
4669
4670void Locker::local_wrlock_grab(LocalLock *lock, MutationRef& mut)
4671{
4672 dout(7) << "local_wrlock_grab on " << *lock
4673 << " on " << *lock->get_parent() << dendl;
4674
4675 assert(lock->get_parent()->is_auth());
4676 assert(lock->can_wrlock());
4677 assert(!mut->wrlocks.count(lock));
4678 lock->get_wrlock(mut->get_client());
4679 mut->wrlocks.insert(lock);
4680 mut->locks.insert(lock);
4681}
4682
4683bool Locker::local_wrlock_start(LocalLock *lock, MDRequestRef& mut)
4684{
4685 dout(7) << "local_wrlock_start on " << *lock
4686 << " on " << *lock->get_parent() << dendl;
4687
4688 assert(lock->get_parent()->is_auth());
4689 if (lock->can_wrlock()) {
4690 assert(!mut->wrlocks.count(lock));
4691 lock->get_wrlock(mut->get_client());
4692 mut->wrlocks.insert(lock);
4693 mut->locks.insert(lock);
4694 return true;
4695 } else {
4696 lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
4697 return false;
4698 }
4699}
4700
4701void Locker::local_wrlock_finish(LocalLock *lock, MutationImpl *mut)
4702{
4703 dout(7) << "local_wrlock_finish on " << *lock
4704 << " on " << *lock->get_parent() << dendl;
4705 lock->put_wrlock();
4706 mut->wrlocks.erase(lock);
4707 mut->locks.erase(lock);
4708 if (lock->get_num_wrlocks() == 0) {
4709 lock->finish_waiters(SimpleLock::WAIT_STABLE |
4710 SimpleLock::WAIT_WR |
4711 SimpleLock::WAIT_RD);
4712 }
4713}
4714
4715bool Locker::local_xlock_start(LocalLock *lock, MDRequestRef& mut)
4716{
4717 dout(7) << "local_xlock_start on " << *lock
4718 << " on " << *lock->get_parent() << dendl;
4719
4720 assert(lock->get_parent()->is_auth());
4721 if (!lock->can_xlock_local()) {
4722 lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
4723 return false;
4724 }
4725
4726 lock->get_xlock(mut, mut->get_client());
4727 mut->xlocks.insert(lock);
4728 mut->locks.insert(lock);
4729 return true;
4730}
4731
4732void Locker::local_xlock_finish(LocalLock *lock, MutationImpl *mut)
4733{
4734 dout(7) << "local_xlock_finish on " << *lock
4735 << " on " << *lock->get_parent() << dendl;
4736 lock->put_xlock();
4737 mut->xlocks.erase(lock);
4738 mut->locks.erase(lock);
4739
4740 lock->finish_waiters(SimpleLock::WAIT_STABLE |
4741 SimpleLock::WAIT_WR |
4742 SimpleLock::WAIT_RD);
4743}
4744
4745
4746
4747// ==========================================================================
4748// file lock
4749
4750
// Re-evaluate a stable file (CEPH_CAP_SFILE) scatterlock on the auth
// and pick the best stable state (EXCL / MIX / SYNC) given what caps
// the loner and other clients want.
void Locker::file_eval(ScatterLock *lock, bool *need_issue)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  int loner_wanted, other_wanted;
  int wanted = in->get_caps_wanted(&loner_wanted, &other_wanted, CEPH_CAP_SFILE);
  dout(7) << "file_eval wanted=" << gcap_string(wanted)
	  << " loner_wanted=" << gcap_string(loner_wanted)
	  << " other_wanted=" << gcap_string(other_wanted)
	  << "  filelock=" << *lock << " on " << *lock->get_parent()
	  << dendl;

  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  if (lock->get_parent()->is_freezing_or_frozen())
    return;

  if (mdcache->is_readonly()) {
    // read-only FS: force everything toward SYNC.
    if (lock->get_state() != LOCK_SYNC) {
      dout(10) << "file_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
      simple_sync(lock, need_issue);
    }
    return;
  }

  // excl -> *?
  if (lock->get_state() == LOCK_EXCL) {
    dout(20) << " is excl" << dendl;
    int loner_issued, other_issued, xlocker_issued;
    in->get_caps_issued(&loner_issued, &other_issued, &xlocker_issued, CEPH_CAP_SFILE);
    dout(7) << "file_eval loner_issued=" << gcap_string(loner_issued)
            << " other_issued=" << gcap_string(other_issued)
	    << " xlocker_issued=" << gcap_string(xlocker_issued)
	    << dendl;
    // leave EXCL if the loner no longer needs write-ish caps, or if
    // anyone else wants conflicting caps
    if (!((loner_wanted|loner_issued) & (CEPH_CAP_GEXCL|CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) ||
	(other_wanted & (CEPH_CAP_GEXCL|CEPH_CAP_GWR|CEPH_CAP_GRD)) ||
	(in->inode.is_dir() && in->multiple_nonstale_caps())) {  // FIXME.. :/
      dout(20) << " should lose it" << dendl;
      // we should lose it.
      //  loner  other   want
      //  R      R       SYNC
      //  R      R|W     MIX
      //  R      W       MIX
      //  R|W    R       MIX
      //  R|W    R|W     MIX
      //  R|W    W       MIX
      //  W      R       MIX
      //  W      R|W     MIX
      //  W      W       MIX
      // -> any writer means MIX; RD doesn't matter.
      if (((other_wanted|loner_wanted) & CEPH_CAP_GWR) ||
	  lock->is_waiter_for(SimpleLock::WAIT_WR))
	scatter_mix(lock, need_issue);
      else if (!lock->is_wrlocked())   // let excl wrlocks drain first
	simple_sync(lock, need_issue);
      else
	dout(10) << " waiting for wrlock to drain" << dendl;
    }
  }

  // * -> excl?
  else if (lock->get_state() != LOCK_EXCL &&
	   !lock->is_rdlocked() &&
	   //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
	   ((wanted & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) ||
	    (in->inode.is_dir() && !in->has_subtree_or_exporting_dirfrag())) &&
	   in->get_target_loner() >= 0) {
    dout(7) << "file_eval stable, bump to loner " << *lock
	    << " on " << *lock->get_parent() << dendl;
    file_excl(lock, need_issue);
  }

  // * -> mixed?
  else if (lock->get_state() != LOCK_MIX &&
	   !lock->is_rdlocked() &&
	   //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
	   (lock->get_scatter_wanted() ||
	    (in->get_wanted_loner() < 0 && (wanted & CEPH_CAP_GWR)))) {
    dout(7) << "file_eval stable, bump to mixed " << *lock
	    << " on " << *lock->get_parent() << dendl;
    scatter_mix(lock, need_issue);
  }

  // * -> sync?
  else if (lock->get_state() != LOCK_SYNC &&
	   !lock->is_wrlocked() &&   // drain wrlocks first!
	   !lock->is_waiter_for(SimpleLock::WAIT_WR) &&
	   !(wanted & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) &&
	   !((lock->get_state() == LOCK_MIX) &&
	     in->is_dir() && in->has_subtree_or_exporting_dirfrag())  // if we are a delegation point, stay where we are
	   //((wanted & CEPH_CAP_RD) ||
	   //in->is_replicated() ||
	   //lock->get_num_client_lease() ||
	   //(!loner && lock->get_state() == LOCK_EXCL)) &&
	   ) {
    dout(7) << "file_eval stable, bump to sync " << *lock
	    << " on " << *lock->get_parent() << dendl;
    simple_sync(lock, need_issue);
  }
}
4851
4852
4853
// Move a stable scatterlock toward MIX (scattered/writable on frags).
// From LOCK the transition is immediate; from SYNC/XSYN/EXCL/TSYN a
// gather (rdlocks, replicas, leases, caps, recovery) may be required.
void Locker::scatter_mix(ScatterLock *lock, bool *need_issue)
{
  dout(7) << "scatter_mix " << *lock << " on " << *lock->get_parent() << dendl;

  CInode *in = static_cast<CInode*>(lock->get_parent());
  assert(in->is_auth());
  assert(lock->is_stable());

  if (lock->get_state() == LOCK_LOCK) {
    // LOCK -> MIX needs no gather: scatter immediately.
    in->start_scatter(lock);
    if (in->is_replicated()) {
      // data
      bufferlist softdata;
      lock->encode_locked_state(softdata);

      // bcast to replicas
      send_lock_message(lock, LOCK_AC_MIX, softdata);
    }

    // change lock
    lock->set_state(LOCK_MIX);
    lock->clear_scatter_wanted();
    if (lock->get_cap_shift()) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
    }
  } else {
    // gather?
    switch (lock->get_state()) {
    case LOCK_SYNC: lock->set_state(LOCK_SYNC_MIX); break;
    case LOCK_XSYN:
      // XSYN can't go directly to MIX; bounce through EXCL first.
      file_excl(lock, need_issue);
      if (lock->get_state() != LOCK_EXCL)
	return;
      // fall-thru
    case LOCK_EXCL: lock->set_state(LOCK_EXCL_MIX); break;
    case LOCK_TSYN: lock->set_state(LOCK_TSYN_MIX); break;
    default: ceph_abort();
    }

    int gather = 0;
    if (lock->is_rdlocked())
      gather++;
    if (in->is_replicated()) {
      if (lock->get_state() != LOCK_EXCL_MIX &&   // EXCL replica is already LOCK
	  lock->get_state() != LOCK_XSYN_EXCL) {  // XSYN replica is already LOCK;  ** FIXME here too!
	send_lock_message(lock, LOCK_AC_MIX);
	lock->init_gather();
	gather++;
      }
    }
    if (lock->is_leased()) {
      revoke_client_leases(lock);
      gather++;
    }
    if (lock->get_cap_shift() &&
	in->is_head() &&
	in->issued_caps_need_gather(lock)) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
      gather++;
    }
    bool need_recover = false;
    if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
      mds->mdcache->queue_file_recover(in);
      need_recover = true;
      gather++;
    }

    if (gather) {
      // pin while unstable; eval_gather() finishes the transition
      lock->get_parent()->auth_pin(lock);
      if (need_recover)
	mds->mdcache->do_file_recover();
    } else {
      // nothing to wait for: complete the scatter now
      in->start_scatter(lock);
      lock->set_state(LOCK_MIX);
      lock->clear_scatter_wanted();
      if (in->is_replicated()) {
	bufferlist softdata;
	lock->encode_locked_state(softdata);
	send_lock_message(lock, LOCK_AC_MIX, softdata);
      }
      if (lock->get_cap_shift()) {
	if (need_issue)
	  *need_issue = true;
	else
	  issue_caps(in);
      }
    }
  }
}
4949
4950
void Locker::file_excl(ScatterLock *lock, bool *need_issue)
{
  // Move a stable filelock toward LOCK_EXCL (exclusive caps for the loner
  // client).  If need_issue is non-null, cap reissue is deferred to the
  // caller via *need_issue; otherwise issue_caps() runs inline.
  CInode *in = static_cast<CInode*>(lock->get_parent());
  dout(7) << "file_excl " << *lock << " on " << *lock->get_parent() << dendl;

  assert(in->is_auth());
  assert(lock->is_stable());

  // Either a loner exists with no other MDS wanting caps, or we are
  // unwinding XSYN (which must pass through EXCL first).
  assert((in->get_loner() >= 0 && in->mds_caps_wanted.empty()) ||
	 (lock->get_state() == LOCK_XSYN));  // must do xsyn -> excl -> <anything else>

  switch (lock->get_state()) {
  case LOCK_SYNC: lock->set_state(LOCK_SYNC_EXCL); break;
  case LOCK_MIX: lock->set_state(LOCK_MIX_EXCL); break;
  case LOCK_LOCK: lock->set_state(LOCK_LOCK_EXCL); break;
  case LOCK_XSYN: lock->set_state(LOCK_XSYN_EXCL); break;
  default: ceph_abort();
  }
  // Count outstanding obstacles; eval_gather() completes the transition
  // once they all drain.
  int gather = 0;

  if (lock->is_rdlocked())
    gather++;
  if (lock->is_wrlocked())
    gather++;

  if (in->is_replicated() &&
      lock->get_state() != LOCK_LOCK_EXCL &&
      lock->get_state() != LOCK_XSYN_EXCL) {  // if we were lock, replicas are already lock.
    send_lock_message(lock, LOCK_AC_LOCK);
    lock->init_gather();
    gather++;
  }
  if (lock->is_leased()) {
    // client leases conflict with EXCL; revoke them
    revoke_client_leases(lock);
    gather++;
  }
  if (in->is_head() &&
      in->issued_caps_need_gather(lock)) {
    // clients hold caps incompatible with the target state; recall them
    if (need_issue)
      *need_issue = true;
    else
      issue_caps(in);
    gather++;
  }
  bool need_recover = false;
  if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
    mds->mdcache->queue_file_recover(in);
    need_recover = true;
    gather++;
  }

  if (gather) {
    // pin the inode while the lock sits in the unstable intermediate state
    lock->get_parent()->auth_pin(lock);
    if (need_recover)
      mds->mdcache->do_file_recover();
  } else {
    // nothing outstanding: finish immediately
    lock->set_state(LOCK_EXCL);
    if (need_issue)
      *need_issue = true;
    else
      issue_caps(in);
  }
}
5014
5015void Locker::file_xsyn(SimpleLock *lock, bool *need_issue)
5016{
5017 dout(7) << "file_xsyn on " << *lock << " on " << *lock->get_parent() << dendl;
5018 CInode *in = static_cast<CInode *>(lock->get_parent());
5019 assert(in->is_auth());
5020 assert(in->get_loner() >= 0 && in->mds_caps_wanted.empty());
5021
5022 switch (lock->get_state()) {
5023 case LOCK_EXCL: lock->set_state(LOCK_EXCL_XSYN); break;
5024 default: ceph_abort();
5025 }
5026
5027 int gather = 0;
5028 if (lock->is_wrlocked())
5029 gather++;
5030
5031 if (in->is_head() &&
5032 in->issued_caps_need_gather(lock)) {
5033 if (need_issue)
5034 *need_issue = true;
5035 else
5036 issue_caps(in);
5037 gather++;
5038 }
5039
5040 if (gather) {
5041 lock->get_parent()->auth_pin(lock);
5042 } else {
5043 lock->set_state(LOCK_XSYN);
5044 lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
5045 if (need_issue)
5046 *need_issue = true;
5047 else
5048 issue_caps(in);
5049 }
5050}
5051
5052void Locker::file_recover(ScatterLock *lock)
5053{
5054 CInode *in = static_cast<CInode *>(lock->get_parent());
5055 dout(7) << "file_recover " << *lock << " on " << *in << dendl;
5056
5057 assert(in->is_auth());
5058 //assert(lock->is_stable());
5059 assert(lock->get_state() == LOCK_PRE_SCAN); // only called from MDCache::start_files_to_recover()
5060
5061 int gather = 0;
5062
5063 /*
5064 if (in->is_replicated()
5065 lock->get_sm()->states[oldstate].replica_state != LOCK_LOCK) {
5066 send_lock_message(lock, LOCK_AC_LOCK);
5067 lock->init_gather();
5068 gather++;
5069 }
5070 */
5071 if (in->is_head() &&
5072 in->issued_caps_need_gather(lock)) {
5073 issue_caps(in);
5074 gather++;
5075 }
5076
5077 lock->set_state(LOCK_SCAN);
5078 if (gather)
5079 in->state_set(CInode::STATE_NEEDSRECOVER);
5080 else
5081 mds->mdcache->queue_file_recover(in);
5082}
5083
5084
5085// messenger
5086/* This function DOES put the passed message before returning */
void Locker::handle_file_lock(ScatterLock *lock, MLock *m)
{
  // Dispatch an MLock protocol message for a file scatterlock.
  // Replica-side actions (SYNC/LOCK/LOCKFLUSHED/MIX) apply state pushed by
  // the auth MDS; auth-side ACK actions (LOCKACK/SYNCACK/MIXACK) account
  // replies during a gather; REQ*/NUDGE actions are replica requests.
  // This function consumes (puts) the passed message before returning.
  CInode *in = static_cast<CInode*>(lock->get_parent());
  int from = m->get_asker();

  if (mds->is_rejoin()) {
    if (in->is_rejoining()) {
      // lock state for this inode will be settled by rejoin; drop the msg
      dout(7) << "handle_file_lock still rejoining " << *in
	      << ", dropping " << *m << dendl;
      m->put();
      return;
    }
  }

  dout(7) << "handle_file_lock a=" << get_lock_action_name(m->get_action())
	  << " on " << *lock
	  << " from mds." << from << " "
	  << *in << dendl;

  bool caps = lock->get_cap_shift();  // does this lock govern client caps?

  switch (m->get_action()) {
    // -- replica --
  case LOCK_AC_SYNC:
    assert(lock->get_state() == LOCK_LOCK ||
	   lock->get_state() == LOCK_MIX ||
	   lock->get_state() == LOCK_MIX_SYNC2);

    if (lock->get_state() == LOCK_MIX) {
      // flush our scattered state before we can ack SYNC
      lock->set_state(LOCK_MIX_SYNC);
      eval_gather(lock, true);
      if (lock->is_unstable_and_locked())
	mds->mdlog->flush();
      break;
    }

    (static_cast<ScatterLock *>(lock))->finish_flush();
    (static_cast<ScatterLock *>(lock))->clear_flushed();

    // ok
    lock->decode_locked_state(m->get_data());
    lock->set_state(LOCK_SYNC);

    // take a transient rdlock while issuing caps and waking RD waiters
    lock->get_rdlock();
    if (caps)
      issue_caps(in);
    lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
    lock->put_rdlock();
    break;

  case LOCK_AC_LOCK:
    switch (lock->get_state()) {
    case LOCK_SYNC: lock->set_state(LOCK_SYNC_LOCK); break;
    case LOCK_MIX: lock->set_state(LOCK_MIX_LOCK); break;
    default: ceph_abort();
    }

    eval_gather(lock, true);
    if (lock->is_unstable_and_locked())
      mds->mdlog->flush();

    break;

  case LOCK_AC_LOCKFLUSHED:
    (static_cast<ScatterLock *>(lock))->finish_flush();
    (static_cast<ScatterLock *>(lock))->clear_flushed();
    // wake up scatter_nudge waiters
    if (lock->is_stable())
      lock->finish_waiters(SimpleLock::WAIT_STABLE);
    break;

  case LOCK_AC_MIX:
    assert(lock->get_state() == LOCK_SYNC ||
	   lock->get_state() == LOCK_LOCK ||
	   lock->get_state() == LOCK_SYNC_MIX2);

    if (lock->get_state() == LOCK_SYNC) {
      // MIXED -- drain readers first via the SYNC_MIX intermediate state
      lock->set_state(LOCK_SYNC_MIX);
      eval_gather(lock, true);
      if (lock->is_unstable_and_locked())
	mds->mdlog->flush();
      break;
    }

    // ok
    lock->decode_locked_state(m->get_data());
    lock->set_state(LOCK_MIX);

    if (caps)
      issue_caps(in);

    lock->finish_waiters(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE);
    break;


    // -- auth --
  case LOCK_AC_LOCKACK:
    assert(lock->get_state() == LOCK_SYNC_LOCK ||
	   lock->get_state() == LOCK_MIX_LOCK ||
	   lock->get_state() == LOCK_MIX_LOCK2 ||
	   lock->get_state() == LOCK_MIX_EXCL ||
	   lock->get_state() == LOCK_SYNC_EXCL ||
	   lock->get_state() == LOCK_SYNC_MIX ||
	   lock->get_state() == LOCK_MIX_TSYN);
    assert(lock->is_gathering(from));
    lock->remove_gather(from);

    if (lock->get_state() == LOCK_MIX_LOCK ||
	lock->get_state() == LOCK_MIX_LOCK2 ||
	lock->get_state() == LOCK_MIX_EXCL ||
	lock->get_state() == LOCK_MIX_TSYN) {
      // coming from MIX: the ack carries the replica's scattered state
      lock->decode_locked_state(m->get_data());
      // replica is waiting for AC_LOCKFLUSHED, eval_gather() should not
      // delay calling scatter_writebehind().
      lock->clear_flushed();
    }

    if (lock->is_gathering()) {
      dout(7) << "handle_file_lock " << *in << " from " << from
	      << ", still gathering " << lock->get_gather_set() << dendl;
    } else {
      dout(7) << "handle_file_lock " << *in << " from " << from
	      << ", last one" << dendl;
      eval_gather(lock);  // last ack: finish the pending transition
    }
    break;

  case LOCK_AC_SYNCACK:
    assert(lock->get_state() == LOCK_MIX_SYNC);
    assert(lock->is_gathering(from));
    lock->remove_gather(from);

    // fold the replica's flushed scatter state into ours
    lock->decode_locked_state(m->get_data());

    if (lock->is_gathering()) {
      dout(7) << "handle_file_lock " << *in << " from " << from
	      << ", still gathering " << lock->get_gather_set() << dendl;
    } else {
      dout(7) << "handle_file_lock " << *in << " from " << from
	      << ", last one" << dendl;
      eval_gather(lock);
    }
    break;

  case LOCK_AC_MIXACK:
    assert(lock->get_state() == LOCK_SYNC_MIX);
    assert(lock->is_gathering(from));
    lock->remove_gather(from);

    if (lock->is_gathering()) {
      dout(7) << "handle_file_lock " << *in << " from " << from
	      << ", still gathering " << lock->get_gather_set() << dendl;
    } else {
      dout(7) << "handle_file_lock " << *in << " from " << from
	      << ", last one" << dendl;
      eval_gather(lock);
    }
    break;


    // requests....
  case LOCK_AC_REQSCATTER:
    if (lock->is_stable()) {
      /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
       * because the replica should be holding an auth_pin if they're
       * doing this (and thus, we are freezing, not frozen, and indefinite
       * starvation isn't an issue).
       */
      dout(7) << "handle_file_lock got scatter request on " << *lock
	      << " on " << *lock->get_parent() << dendl;
      if (lock->get_state() != LOCK_MIX)  // i.e., the reqscatter didn't race with an actual mix/scatter
	scatter_mix(lock);
    } else {
      // remember the request; eval will act on it once stable again
      dout(7) << "handle_file_lock got scatter request, !stable, marking scatter_wanted on " << *lock
	      << " on " << *lock->get_parent() << dendl;
      lock->set_scatter_wanted();
    }
    break;

  case LOCK_AC_REQUNSCATTER:
    if (lock->is_stable()) {
      /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
       * because the replica should be holding an auth_pin if they're
       * doing this (and thus, we are freezing, not frozen, and indefinite
       * starvation isn't an issue).
       */
      dout(7) << "handle_file_lock got unscatter request on " << *lock
	      << " on " << *lock->get_parent() << dendl;
      if (lock->get_state() == LOCK_MIX)  // i.e., the reqscatter didn't race with an actual mix/scatter
	simple_lock(lock);  // FIXME tempsync?
    } else {
      // remember the request for when the lock restabilizes
      dout(7) << "handle_file_lock ignoring unscatter request on " << *lock
	      << " on " << *lock->get_parent() << dendl;
      lock->set_unscatter_wanted();
    }
    break;

  case LOCK_AC_REQRDLOCK:
    handle_reqrdlock(lock, m);
    break;

  case LOCK_AC_NUDGE:
    if (!lock->get_parent()->is_auth()) {
      dout(7) << "handle_file_lock IGNORING nudge on non-auth " << *lock
	      << " on " << *lock->get_parent() << dendl;
    } else if (!lock->get_parent()->is_replicated()) {
      dout(7) << "handle_file_lock IGNORING nudge on non-replicated " << *lock
	      << " on " << *lock->get_parent() << dendl;
    } else {
      dout(7) << "handle_file_lock trying nudge on " << *lock
	      << " on " << *lock->get_parent() << dendl;
      scatter_nudge(lock, 0, true);
      mds->mdlog->flush();
    }
    break;

  default:
    ceph_abort();
  }

  m->put();
}
5310
5311
5312
5313
5314
5315