]> git.proxmox.com Git - ceph.git/blob - ceph/src/mds/Locker.cc
update sources to v12.2.1
[ceph.git] / ceph / src / mds / Locker.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15
16 #include "MDSRank.h"
17 #include "MDCache.h"
18 #include "Locker.h"
19 #include "CInode.h"
20 #include "CDir.h"
21 #include "CDentry.h"
22 #include "Mutation.h"
23 #include "MDSContext.h"
24
25 #include "MDLog.h"
26 #include "MDSMap.h"
27
28 #include "events/EUpdate.h"
29 #include "events/EOpen.h"
30
31 #include "msg/Messenger.h"
32 #include "osdc/Objecter.h"
33
34 #include "messages/MInodeFileCaps.h"
35 #include "messages/MLock.h"
36 #include "messages/MClientLease.h"
37 #include "messages/MClientReply.h"
38 #include "messages/MClientCaps.h"
39 #include "messages/MClientCapRelease.h"
40
41 #include "messages/MMDSSlaveRequest.h"
42
43 #include <errno.h>
44
45 #include "common/config.h"
46
47
48 #define dout_subsys ceph_subsys_mds
49 #undef dout_prefix
50 #define dout_context g_ceph_context
51 #define dout_prefix _prefix(_dout, mds)
52 static ostream& _prefix(std::ostream *_dout, MDSRank *mds) {
53 return *_dout << "mds." << mds->get_nodeid() << ".locker ";
54 }
55
56
57 class LockerContext : public MDSInternalContextBase {
58 protected:
59 Locker *locker;
60 MDSRank *get_mds() override
61 {
62 return locker->mds;
63 }
64
65 public:
66 explicit LockerContext(Locker *locker_) : locker(locker_) {
67 assert(locker != NULL);
68 }
69 };
70
71 class LockerLogContext : public MDSLogContextBase {
72 protected:
73 Locker *locker;
74 MDSRank *get_mds() override
75 {
76 return locker->mds;
77 }
78
79 public:
80 explicit LockerLogContext(Locker *locker_) : locker(locker_) {
81 assert(locker != NULL);
82 }
83 };
84
85 /* This function DOES put the passed message before returning */
86 void Locker::dispatch(Message *m)
87 {
88
89 switch (m->get_type()) {
90
91 // inter-mds locking
92 case MSG_MDS_LOCK:
93 handle_lock(static_cast<MLock*>(m));
94 break;
95 // inter-mds caps
96 case MSG_MDS_INODEFILECAPS:
97 handle_inode_file_caps(static_cast<MInodeFileCaps*>(m));
98 break;
99
100 // client sync
101 case CEPH_MSG_CLIENT_CAPS:
102 handle_client_caps(static_cast<MClientCaps*>(m));
103
104 break;
105 case CEPH_MSG_CLIENT_CAPRELEASE:
106 handle_client_cap_release(static_cast<MClientCapRelease*>(m));
107 break;
108 case CEPH_MSG_CLIENT_LEASE:
109 handle_client_lease(static_cast<MClientLease*>(m));
110 break;
111
112 default:
113 derr << "locker unknown message " << m->get_type() << dendl;
114 assert(0 == "locker unknown message");
115 }
116 }
117
118 void Locker::tick()
119 {
  // Periodic upkeep: run scatter-lock housekeeping, then cap housekeeping.
120 scatter_tick();
121 caps_tick();
122 }
123
124 /*
125 * locks vs rejoin
126 *
127 *
128 *
129 */
130
131 void Locker::send_lock_message(SimpleLock *lock, int msg)
132 {
133 for (const auto &it : lock->get_parent()->get_replicas()) {
134 if (mds->is_cluster_degraded() &&
135 mds->mdsmap->get_state(it.first) < MDSMap::STATE_REJOIN)
136 continue;
137 MLock *m = new MLock(lock, msg, mds->get_nodeid());
138 mds->send_message_mds(m, it.first);
139 }
140 }
141
142 void Locker::send_lock_message(SimpleLock *lock, int msg, const bufferlist &data)
143 {
144 for (const auto &it : lock->get_parent()->get_replicas()) {
145 if (mds->is_cluster_degraded() &&
146 mds->mdsmap->get_state(it.first) < MDSMap::STATE_REJOIN)
147 continue;
148 MLock *m = new MLock(lock, msg, mds->get_nodeid());
149 m->set_data(data);
150 mds->send_message_mds(m, it.first);
151 }
152 }
153
154
155
156
157 void Locker::include_snap_rdlocks(set<SimpleLock*>& rdlocks, CInode *in)
158 {
159 // rdlock ancestor snaps
160 CInode *t = in;
161 rdlocks.insert(&in->snaplock);
162 while (t->get_projected_parent_dn()) {
163 t = t->get_projected_parent_dn()->get_dir()->get_inode();
164 rdlocks.insert(&t->snaplock);
165 }
166 }
167
168 void Locker::include_snap_rdlocks_wlayout(set<SimpleLock*>& rdlocks, CInode *in,
169 file_layout_t **layout)
170 {
171 //rdlock ancestor snaps
172 CInode *t = in;
173 rdlocks.insert(&in->snaplock);
174 rdlocks.insert(&in->policylock);
175 bool found_layout = false;
176 while (t) {
177 rdlocks.insert(&t->snaplock);
178 if (!found_layout) {
179 rdlocks.insert(&t->policylock);
180 if (t->get_projected_inode()->has_layout()) {
181 *layout = &t->get_projected_inode()->layout;
182 found_layout = true;
183 }
184 }
185 if (t->get_projected_parent_dn() &&
186 t->get_projected_parent_dn()->get_dir())
187 t = t->get_projected_parent_dn()->get_dir()->get_inode();
188 else t = NULL;
189 }
190 }
191
192 struct MarkEventOnDestruct {
193 MDRequestRef& mdr;
194 const char* message;
195 bool mark_event;
196 MarkEventOnDestruct(MDRequestRef& _mdr,
197 const char *_message) : mdr(_mdr),
198 message(_message),
199 mark_event(true) {}
200 ~MarkEventOnDestruct() {
201 if (mark_event)
202 mdr->mark_event(message);
203 }
204 };
205
/*
 * Acquire, in canonical sort order, every rdlock/wrlock/xlock/remote-wrlock
 * the request needs, auth-pinning the parent objects (locally and, when the
 * object is not auth here, on the remote authoritative MDS) first.  Locks
 * the mdr already holds are kept; stray or out-of-order locks are released.
 */
206 /* If this function returns false, the mdr has been placed
207 * on the appropriate wait list */
208 bool Locker::acquire_locks(MDRequestRef& mdr,
209 set<SimpleLock*> &rdlocks,
210 set<SimpleLock*> &wrlocks,
211 set<SimpleLock*> &xlocks,
212 map<SimpleLock*,mds_rank_t> *remote_wrlocks,
213 CInode *auth_pin_freeze,
214 bool auth_pin_nonblock)
215 {
216 if (mdr->done_locking &&
217 !mdr->is_slave()) { // not on slaves! master requests locks piecemeal.
218 dout(10) << "acquire_locks " << *mdr << " - done locking" << dendl;
219 return true; // at least we had better be!
220 }
221 dout(10) << "acquire_locks " << *mdr << dendl;
222
  // If we return without completing, the destructor records the last
  // value assigned to marker.message as the request's blocked-on event.
223 MarkEventOnDestruct marker(mdr, "failed to acquire_locks");
224
225 client_t client = mdr->get_client();
226
227 set<SimpleLock*, SimpleLock::ptr_lt> sorted; // sort everything we will lock
228 set<MDSCacheObject*> mustpin; // items to authpin
229
230 // xlocks
231 for (set<SimpleLock*>::iterator p = xlocks.begin(); p != xlocks.end(); ++p) {
232 dout(20) << " must xlock " << **p << " " << *(*p)->get_parent() << dendl;
233 sorted.insert(*p);
234 mustpin.insert((*p)->get_parent());
235
236 // augment xlock with a versionlock?
237 if ((*p)->get_type() == CEPH_LOCK_DN) {
238 CDentry *dn = (CDentry*)(*p)->get_parent();
239 if (!dn->is_auth())
240 continue;
241
242 if (xlocks.count(&dn->versionlock))
243 continue; // we're xlocking the versionlock too; don't wrlock it!
244
245 if (mdr->is_master()) {
246 // master. wrlock versionlock so we can pipeline dentry updates to journal.
247 wrlocks.insert(&dn->versionlock);
248 } else {
249 // slave. exclusively lock the dentry version (i.e. block other journal updates).
250 // this makes rollback safe.
251 xlocks.insert(&dn->versionlock);
252 sorted.insert(&dn->versionlock);
253 }
254 }
255 if ((*p)->get_type() > CEPH_LOCK_IVERSION) {
256 // inode version lock?
257 CInode *in = (CInode*)(*p)->get_parent();
258 if (!in->is_auth())
259 continue;
260 if (mdr->is_master()) {
261 // master. wrlock versionlock so we can pipeline inode updates to journal.
262 wrlocks.insert(&in->versionlock);
263 } else {
264 // slave. exclusively lock the inode version (i.e. block other journal updates).
265 // this makes rollback safe.
266 xlocks.insert(&in->versionlock);
267 sorted.insert(&in->versionlock);
268 }
269 }
270 }
271
272 // wrlocks
273 for (set<SimpleLock*>::iterator p = wrlocks.begin(); p != wrlocks.end(); ++p) {
274 MDSCacheObject *object = (*p)->get_parent();
275 dout(20) << " must wrlock " << **p << " " << *object << dendl;
276 sorted.insert(*p);
277 if (object->is_auth())
278 mustpin.insert(object);
279 else if (!object->is_auth() &&
280 !(*p)->can_wrlock(client) && // we might have to request a scatter
281 !mdr->is_slave()) { // if we are slave (remote_wrlock), the master already authpinned
282 dout(15) << " will also auth_pin " << *object
283 << " in case we need to request a scatter" << dendl;
284 mustpin.insert(object);
285 }
286 }
287
288 // remote_wrlocks
289 if (remote_wrlocks) {
290 for (map<SimpleLock*,mds_rank_t>::iterator p = remote_wrlocks->begin(); p != remote_wrlocks->end(); ++p) {
291 MDSCacheObject *object = p->first->get_parent();
292 dout(20) << " must remote_wrlock on mds." << p->second << " "
293 << *p->first << " " << *object << dendl;
294 sorted.insert(p->first);
295 mustpin.insert(object);
296 }
297 }
298
299 // rdlocks
300 for (set<SimpleLock*>::iterator p = rdlocks.begin();
301 p != rdlocks.end();
302 ++p) {
303 MDSCacheObject *object = (*p)->get_parent();
304 dout(20) << " must rdlock " << **p << " " << *object << dendl;
305 sorted.insert(*p);
306 if (object->is_auth())
307 mustpin.insert(object);
308 else if (!object->is_auth() &&
309 !(*p)->can_rdlock(client)) { // we might have to request an rdlock
310 dout(15) << " will also auth_pin " << *object
311 << " in case we need to request a rdlock" << dendl;
312 mustpin.insert(object);
313 }
314 }
315
316
317 // AUTH PINS
318 map<mds_rank_t, set<MDSCacheObject*> > mustpin_remote; // mds -> (object set)
319
320 // can i auth pin them all now?
321 marker.message = "failed to authpin local pins";
322 for (set<MDSCacheObject*>::iterator p = mustpin.begin();
323 p != mustpin.end();
324 ++p) {
325 MDSCacheObject *object = *p;
326
327 dout(10) << " must authpin " << *object << dendl;
328
329 if (mdr->is_auth_pinned(object)) {
330 if (object != (MDSCacheObject*)auth_pin_freeze)
331 continue;
332 if (mdr->more()->is_remote_frozen_authpin) {
333 if (mdr->more()->rename_inode == auth_pin_freeze)
334 continue;
335 // unfreeze auth pin for the wrong inode
      // operator[] default-constructs an (empty) set for that rank so a
      // slave request is sent below; the .size() result is discarded.
336 mustpin_remote[mdr->more()->rename_inode->authority().first].size();
337 }
338 }
339
340 if (!object->is_auth()) {
341 if (!mdr->locks.empty())
342 drop_locks(mdr.get());
343 if (object->is_ambiguous_auth()) {
344 // wait
345 dout(10) << " ambiguous auth, waiting to authpin " << *object << dendl;
346 object->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_MDS_RetryRequest(mdcache, mdr));
347 mdr->drop_local_auth_pins();
348 return false;
349 }
350 mustpin_remote[object->authority().first].insert(object);
351 continue;
352 }
353 if (!object->can_auth_pin()) {
354 // wait
355 drop_locks(mdr.get());
356 mdr->drop_local_auth_pins();
357 if (auth_pin_nonblock) {
358 dout(10) << " can't auth_pin (freezing?) " << *object << ", nonblocking" << dendl;
359 mdr->aborted = true;
360 return false;
361 }
362 dout(10) << " can't auth_pin (freezing?), waiting to authpin " << *object << dendl;
363 object->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_MDS_RetryRequest(mdcache, mdr));
364
365 if (!mdr->remote_auth_pins.empty())
366 notify_freeze_waiter(object);
367
368 return false;
369 }
370 }
371
372 // ok, grab local auth pins
373 for (set<MDSCacheObject*>::iterator p = mustpin.begin();
374 p != mustpin.end();
375 ++p) {
376 MDSCacheObject *object = *p;
377 if (mdr->is_auth_pinned(object)) {
378 dout(10) << " already auth_pinned " << *object << dendl;
379 } else if (object->is_auth()) {
380 dout(10) << " auth_pinning " << *object << dendl;
381 mdr->auth_pin(object);
382 }
383 }
384
385 // request remote auth_pins
386 if (!mustpin_remote.empty()) {
387 marker.message = "requesting remote authpins";
    // re-include objects we already hold remote authpins on, so the slave
    // request carries the complete set for each rank
388 for (map<MDSCacheObject*,mds_rank_t>::iterator p = mdr->remote_auth_pins.begin();
389 p != mdr->remote_auth_pins.end();
390 ++p) {
391 if (mustpin.count(p->first)) {
392 assert(p->second == p->first->authority().first);
393 map<mds_rank_t, set<MDSCacheObject*> >::iterator q = mustpin_remote.find(p->second);
394 if (q != mustpin_remote.end())
395 q->second.insert(p->first);
396 }
397 }
398 for (map<mds_rank_t, set<MDSCacheObject*> >::iterator p = mustpin_remote.begin();
399 p != mustpin_remote.end();
400 ++p) {
401 dout(10) << "requesting remote auth_pins from mds." << p->first << dendl;
402
403 // wait for active auth
404 if (mds->is_cluster_degraded() &&
405 !mds->mdsmap->is_clientreplay_or_active_or_stopping(p->first)) {
406 dout(10) << " mds." << p->first << " is not active" << dendl;
407 if (mdr->more()->waiting_on_slave.empty())
408 mds->wait_for_active_peer(p->first, new C_MDS_RetryRequest(mdcache, mdr));
409 return false;
410 }
411
412 MMDSSlaveRequest *req = new MMDSSlaveRequest(mdr->reqid, mdr->attempt,
413 MMDSSlaveRequest::OP_AUTHPIN);
414 for (set<MDSCacheObject*>::iterator q = p->second.begin();
415 q != p->second.end();
416 ++q) {
417 dout(10) << " req remote auth_pin of " << **q << dendl;
418 MDSCacheObjectInfo info;
419 (*q)->set_object_info(info);
420 req->get_authpins().push_back(info);
421 if (*q == auth_pin_freeze)
422 (*q)->set_object_info(req->get_authpin_freeze());
423 mdr->pin(*q);
424 }
425 if (auth_pin_nonblock)
426 req->mark_nonblock();
427 mds->send_message_mds(req, p->first);
428
429 // put in waiting list
430 assert(mdr->more()->waiting_on_slave.count(p->first) == 0);
431 mdr->more()->waiting_on_slave.insert(p->first);
432 }
433 return false;
434 }
435
436 // caps i'll need to issue
437 set<CInode*> issue_set;
438 bool result = false;
439
440 // acquire locks.
441 // make sure they match currently acquired locks.
442 set<SimpleLock*, SimpleLock::ptr_lt>::iterator existing = mdr->locks.begin();
443 for (set<SimpleLock*, SimpleLock::ptr_lt>::iterator p = sorted.begin();
444 p != sorted.end();
445 ++p) {
446 bool need_wrlock = !!wrlocks.count(*p);
447 bool need_remote_wrlock = !!(remote_wrlocks && remote_wrlocks->count(*p));
448
449 // already locked?
450 if (existing != mdr->locks.end() && *existing == *p) {
451 // right kind?
452 SimpleLock *have = *existing;
453 ++existing;
454 if (xlocks.count(have) && mdr->xlocks.count(have)) {
455 dout(10) << " already xlocked " << *have << " " << *have->get_parent() << dendl;
456 continue;
457 }
458 if (mdr->remote_wrlocks.count(have)) {
459 if (!need_remote_wrlock ||
460 mdr->remote_wrlocks[have] != (*remote_wrlocks)[have]) {
461 dout(10) << " unlocking remote_wrlock on wrong mds." << mdr->remote_wrlocks[have]
462 << " " << *have << " " << *have->get_parent() << dendl;
463 remote_wrlock_finish(have, mdr->remote_wrlocks[have], mdr.get());
464 }
465 }
466 if (need_wrlock || need_remote_wrlock) {
467 if (need_wrlock == !!mdr->wrlocks.count(have) &&
468 need_remote_wrlock == !!mdr->remote_wrlocks.count(have)) {
469 if (need_wrlock)
470 dout(10) << " already wrlocked " << *have << " " << *have->get_parent() << dendl;
471 if (need_remote_wrlock)
472 dout(10) << " already remote_wrlocked " << *have << " " << *have->get_parent() << dendl;
473 continue;
474 }
475 }
476 if (rdlocks.count(have) && mdr->rdlocks.count(have)) {
477 dout(10) << " already rdlocked " << *have << " " << *have->get_parent() << dendl;
478 continue;
479 }
480 }
481
482 // hose any stray locks
483 if (existing != mdr->locks.end() && *existing == *p) {
484 assert(need_wrlock || need_remote_wrlock);
485 SimpleLock *lock = *existing;
486 if (mdr->wrlocks.count(lock)) {
487 if (!need_wrlock)
488 dout(10) << " unlocking extra " << *lock << " " << *lock->get_parent() << dendl;
489 else if (need_remote_wrlock) // acquire remote_wrlock first
490 dout(10) << " unlocking out-of-order " << *lock << " " << *lock->get_parent() << dendl;
491 bool need_issue = false;
492 wrlock_finish(lock, mdr.get(), &need_issue);
493 if (need_issue)
494 issue_set.insert(static_cast<CInode*>(lock->get_parent()));
495 }
496 ++existing;
497 }
  // release every held lock that sorts before the one we want next;
  // locks must always be taken in 'sorted' order
498 while (existing != mdr->locks.end()) {
499 SimpleLock *stray = *existing;
500 ++existing;
501 dout(10) << " unlocking out-of-order " << *stray << " " << *stray->get_parent() << dendl;
502 bool need_issue = false;
503 if (mdr->xlocks.count(stray)) {
504 xlock_finish(stray, mdr.get(), &need_issue);
505 } else if (mdr->rdlocks.count(stray)) {
506 rdlock_finish(stray, mdr.get(), &need_issue);
507 } else {
508 // may have acquired both wrlock and remore wrlock
509 if (mdr->wrlocks.count(stray))
510 wrlock_finish(stray, mdr.get(), &need_issue);
511 if (mdr->remote_wrlocks.count(stray))
512 remote_wrlock_finish(stray, mdr->remote_wrlocks[stray], mdr.get());
513 }
514 if (need_issue)
515 issue_set.insert(static_cast<CInode*>(stray->get_parent()));
516 }
517
518 // lock
519 if (mdr->locking && *p != mdr->locking) {
520 cancel_locking(mdr.get(), &issue_set);
521 }
522 if (xlocks.count(*p)) {
523 marker.message = "failed to xlock, waiting";
524 if (!xlock_start(*p, mdr))
525 goto out;
526 dout(10) << " got xlock on " << **p << " " << *(*p)->get_parent() << dendl;
527 } else if (need_wrlock || need_remote_wrlock) {
528 if (need_remote_wrlock && !mdr->remote_wrlocks.count(*p)) {
529 marker.message = "waiting for remote wrlocks";
530 remote_wrlock_start(*p, (*remote_wrlocks)[*p], mdr);
531 goto out;
532 }
533 if (need_wrlock && !mdr->wrlocks.count(*p)) {
534 marker.message = "failed to wrlock, waiting";
535 if (need_remote_wrlock && !(*p)->can_wrlock(mdr->get_client())) {
536 marker.message = "failed to wrlock, dropping remote wrlock and waiting";
537 // can't take the wrlock because the scatter lock is gathering. need to
538 // release the remote wrlock, so that the gathering process can finish.
539 remote_wrlock_finish(*p, mdr->remote_wrlocks[*p], mdr.get());
540 remote_wrlock_start(*p, (*remote_wrlocks)[*p], mdr);
541 goto out;
542 }
543 // nowait if we have already gotten remote wrlock
544 if (!wrlock_start(*p, mdr, need_remote_wrlock))
545 goto out;
546 dout(10) << " got wrlock on " << **p << " " << *(*p)->get_parent() << dendl;
547 }
548 } else {
549 assert(mdr->is_master());
550 if ((*p)->is_scatterlock()) {
551 ScatterLock *slock = static_cast<ScatterLock *>(*p);
552 if (slock->is_rejoin_mix()) {
553 // If there is a recovering mds who replcated an object when it failed
554 // and scatterlock in the object was in MIX state, It's possible that
555 // the recovering mds needs to take wrlock on the scatterlock when it
556 // replays unsafe requests. So this mds should delay taking rdlock on
557 // the scatterlock until the recovering mds finishes replaying unsafe.
558 // Otherwise unsafe requests may get replayed after current request.
559 //
560 // For example:
561 // The recovering mds is auth mds of a dirfrag, this mds is auth mds
562 // of correspinding inode. when 'rm -rf' the direcotry, this mds should
563 // delay the rmdir request until the recovering mds has replayed unlink
564 // requests.
565 if (mds->is_cluster_degraded()) {
566 if (!mdr->is_replay()) {
567 drop_locks(mdr.get());
568 mds->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache, mdr));
569 dout(10) << " rejoin mix scatterlock " << *slock << " " << *(*p)->get_parent()
570 << ", waiting for cluster recovered" << dendl;
571 marker.message = "rejoin mix scatterlock, waiting for cluster recovered";
572 return false;
573 }
574 } else {
575 slock->clear_rejoin_mix();
576 }
577 }
578 }
579
580 marker.message = "failed to rdlock, waiting";
581 if (!rdlock_start(*p, mdr))
582 goto out;
583 dout(10) << " got rdlock on " << **p << " " << *(*p)->get_parent() << dendl;
584 }
585 }
586
587 // any extra unneeded locks?
588 while (existing != mdr->locks.end()) {
589 SimpleLock *stray = *existing;
590 ++existing;
591 dout(10) << " unlocking extra " << *stray << " " << *stray->get_parent() << dendl;
592 bool need_issue = false;
593 if (mdr->xlocks.count(stray)) {
594 xlock_finish(stray, mdr.get(), &need_issue);
595 } else if (mdr->rdlocks.count(stray)) {
596 rdlock_finish(stray, mdr.get(), &need_issue);
597 } else {
598 // may have acquired both wrlock and remore wrlock
599 if (mdr->wrlocks.count(stray))
600 wrlock_finish(stray, mdr.get(), &need_issue);
601 if (mdr->remote_wrlocks.count(stray))
602 remote_wrlock_finish(stray, mdr->remote_wrlocks[stray], mdr.get());
603 }
604 if (need_issue)
605 issue_set.insert(static_cast<CInode*>(stray->get_parent()));
606 }
607
608 mdr->done_locking = true;
609 mdr->set_mds_stamp(ceph_clock_now());
610 result = true;
611 marker.message = "acquired locks";
612
613 out:
  // on both success and failure, re-issue caps for any inode whose lock
  // state changed while we were (un)locking
614 issue_caps_set(issue_set);
615 return result;
616 }
617
618 void Locker::notify_freeze_waiter(MDSCacheObject *o)
619 {
620 CDir *dir = NULL;
621 if (CInode *in = dynamic_cast<CInode*>(o)) {
622 if (!in->is_root())
623 dir = in->get_parent_dir();
624 } else if (CDentry *dn = dynamic_cast<CDentry*>(o)) {
625 dir = dn->get_dir();
626 } else {
627 dir = dynamic_cast<CDir*>(o);
628 assert(dir);
629 }
630 if (dir) {
631 if (dir->is_freezing_dir())
632 mdcache->fragment_freeze_inc_num_waiters(dir);
633 if (dir->is_freezing_tree()) {
634 while (!dir->is_freezing_tree_root())
635 dir = dir->get_parent_dir();
636 mdcache->migrator->export_freeze_inc_num_waiters(dir);
637 }
638 }
639 }
640
641 void Locker::set_xlocks_done(MutationImpl *mut, bool skip_dentry)
642 {
643 for (set<SimpleLock*>::iterator p = mut->xlocks.begin();
644 p != mut->xlocks.end();
645 ++p) {
646 MDSCacheObject *object = (*p)->get_parent();
647 assert(object->is_auth());
648 if (skip_dentry &&
649 ((*p)->get_type() == CEPH_LOCK_DN || (*p)->get_type() == CEPH_LOCK_DVERSION))
650 continue;
651 dout(10) << "set_xlocks_done on " << **p << " " << *object << dendl;
652 (*p)->set_xlock_done();
653 }
654 }
655
656 void Locker::_drop_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
657 {
658 while (!mut->rdlocks.empty()) {
659 bool ni = false;
660 MDSCacheObject *p = (*mut->rdlocks.begin())->get_parent();
661 rdlock_finish(*mut->rdlocks.begin(), mut, &ni);
662 if (ni)
663 pneed_issue->insert(static_cast<CInode*>(p));
664 }
665 }
666
/*
 * Release all xlocks, wrlocks and remote wrlocks held by 'mut' (rdlocks
 * are left alone).  Inodes whose caps may need re-issuing are collected
 * into *pneed_issue, and every slave MDS that held a remote lock for us
 * is told to drop its locks.
 */
667 void Locker::_drop_non_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
668 {
669 set<mds_rank_t> slaves;
670
671 while (!mut->xlocks.empty()) {
672 SimpleLock *lock = *mut->xlocks.begin();
673 MDSCacheObject *p = lock->get_parent();
674 if (!p->is_auth()) {
    // remote xlock: drop our local reference; the slave releases the
    // actual lock when it gets OP_DROPLOCKS below
675 assert(lock->get_sm()->can_remote_xlock);
676 slaves.insert(p->authority().first);
677 lock->put_xlock();
678 mut->locks.erase(lock);
679 mut->xlocks.erase(lock);
680 continue;
681 }
682 bool ni = false;
683 xlock_finish(lock, mut, &ni);
684 if (ni)
685 pneed_issue->insert(static_cast<CInode*>(p));
686 }
687
688 while (!mut->remote_wrlocks.empty()) {
689 map<SimpleLock*,mds_rank_t>::iterator p = mut->remote_wrlocks.begin();
690 slaves.insert(p->second);
  // only drop the generic lock record if we don't also hold a local wrlock
691 if (mut->wrlocks.count(p->first) == 0)
692 mut->locks.erase(p->first);
693 mut->remote_wrlocks.erase(p);
694 }
695
696 while (!mut->wrlocks.empty()) {
697 bool ni = false;
698 MDSCacheObject *p = (*mut->wrlocks.begin())->get_parent();
699 wrlock_finish(*mut->wrlocks.begin(), mut, &ni);
700 if (ni)
701 pneed_issue->insert(static_cast<CInode*>(p));
702 }
703
  // notify each involved slave (if reachable) to drop its locks
704 for (set<mds_rank_t>::iterator p = slaves.begin(); p != slaves.end(); ++p) {
705 if (!mds->is_cluster_degraded() ||
706 mds->mdsmap->get_state(*p) >= MDSMap::STATE_REJOIN) {
707 dout(10) << "_drop_non_rdlocks dropping remote locks on mds." << *p << dendl;
708 MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt,
709 MMDSSlaveRequest::OP_DROPLOCKS);
710 mds->send_message_mds(slavereq, *p);
711 }
712 }
713 }
714
/*
 * Abort the lock acquisition currently in progress on mut->locking,
 * unwinding any partially-entered xlock state on the auth side, then
 * clear the mutation's in-progress lock marker.
 */
715 void Locker::cancel_locking(MutationImpl *mut, set<CInode*> *pneed_issue)
716 {
717 SimpleLock *lock = mut->locking;
718 assert(lock);
719 dout(10) << "cancel_locking " << *lock << " on " << *mut << dendl;
720
721 if (lock->get_parent()->is_auth()) {
722 bool need_issue = false;
723 if (lock->get_state() == LOCK_PREXLOCK) {
724 _finish_xlock(lock, -1, &need_issue);
725 } else if (lock->get_state() == LOCK_LOCK_XLOCK &&
726 lock->get_num_xlocks() == 0) {
727 lock->set_state(LOCK_XLOCKDONE);
728 eval_gather(lock, true, &need_issue);
729 }
730 if (need_issue)
731 pneed_issue->insert(static_cast<CInode *>(lock->get_parent()));
732 }
733 mut->finish_locking(lock);
734 }
735
736 void Locker::drop_locks(MutationImpl *mut, set<CInode*> *pneed_issue)
737 {
738 // leftover locks
739 set<CInode*> my_need_issue;
740 if (!pneed_issue)
741 pneed_issue = &my_need_issue;
742
743 if (mut->locking)
744 cancel_locking(mut, pneed_issue);
745 _drop_non_rdlocks(mut, pneed_issue);
746 _drop_rdlocks(mut, pneed_issue);
747
748 if (pneed_issue == &my_need_issue)
749 issue_caps_set(*pneed_issue);
750 mut->done_locking = false;
751 }
752
753 void Locker::drop_non_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
754 {
755 set<CInode*> my_need_issue;
756 if (!pneed_issue)
757 pneed_issue = &my_need_issue;
758
759 _drop_non_rdlocks(mut, pneed_issue);
760
761 if (pneed_issue == &my_need_issue)
762 issue_caps_set(*pneed_issue);
763 }
764
765 void Locker::drop_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
766 {
767 set<CInode*> my_need_issue;
768 if (!pneed_issue)
769 pneed_issue = &my_need_issue;
770
771 _drop_rdlocks(mut, pneed_issue);
772
773 if (pneed_issue == &my_need_issue)
774 issue_caps_set(*pneed_issue);
775 }
776
777
778 // generics
779
/*
 * Check whether an unstable lock has finished gathering (outstanding
 * rdlocks/wrlocks/xlocks, leases, issued client caps, scatter flushes).
 * If so, move it to its next state: a replica acks the auth as needed,
 * the auth pushes the new state to replicas, and waiters are woken
 * (into *pfinishers when supplied).  Caps are re-issued, or *pneed_issue
 * set, when the transition may change what clients are allowed.
 */
780 void Locker::eval_gather(SimpleLock *lock, bool first, bool *pneed_issue, list<MDSInternalContextBase*> *pfinishers)
781 {
782 dout(10) << "eval_gather " << *lock << " on " << *lock->get_parent() << dendl;
783 assert(!lock->is_stable());
784
785 int next = lock->get_next_state();
786
787 CInode *in = 0;
788 bool caps = lock->get_cap_shift();
789 if (lock->get_type() != CEPH_LOCK_DN)
790 in = static_cast<CInode *>(lock->get_parent());
791
792 bool need_issue = false;
793
794 int loner_issued = 0, other_issued = 0, xlocker_issued = 0;
795 assert(!caps || in != NULL);
796 if (caps && in->is_head()) {
797 in->get_caps_issued(&loner_issued, &other_issued, &xlocker_issued,
798 lock->get_cap_shift(), lock->get_cap_mask());
799 dout(10) << " next state is " << lock->get_state_name(next)
800 << " issued/allows loner " << gcap_string(loner_issued)
801 << "/" << gcap_string(lock->gcaps_allowed(CAP_LONER, next))
802 << " xlocker " << gcap_string(xlocker_issued)
803 << "/" << gcap_string(lock->gcaps_allowed(CAP_XLOCKER, next))
804 << " other " << gcap_string(other_issued)
805 << "/" << gcap_string(lock->gcaps_allowed(CAP_ANY, next))
806 << dendl;
807
  // on the first pass, revoke caps that the next state will not allow
808 if (first && ((~lock->gcaps_allowed(CAP_ANY, next) & other_issued) ||
809 (~lock->gcaps_allowed(CAP_LONER, next) & loner_issued) ||
810 (~lock->gcaps_allowed(CAP_XLOCKER, next) & xlocker_issued)))
811 need_issue = true;
812 }
813
  // the gather is complete only when nothing the next state forbids is
  // still held: locks, leases, caps, or an in-flight scatter flush
814 #define IS_TRUE_AND_LT_AUTH(x, auth) (x && ((auth && x <= AUTH) || (!auth && x < AUTH)))
815 bool auth = lock->get_parent()->is_auth();
816 if (!lock->is_gathering() &&
817 (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_rdlock, auth) || !lock->is_rdlocked()) &&
818 (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_wrlock, auth) || !lock->is_wrlocked()) &&
819 (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_xlock, auth) || !lock->is_xlocked()) &&
820 (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_lease, auth) || !lock->is_leased()) &&
821 !(lock->get_parent()->is_auth() && lock->is_flushing()) && // i.e. wait for scatter_writebehind!
822 (!caps || ((~lock->gcaps_allowed(CAP_ANY, next) & other_issued) == 0 &&
823 (~lock->gcaps_allowed(CAP_LONER, next) & loner_issued) == 0 &&
824 (~lock->gcaps_allowed(CAP_XLOCKER, next) & xlocker_issued) == 0)) &&
825 lock->get_state() != LOCK_SYNC_MIX2 && // these states need an explicit trigger from the auth mds
826 lock->get_state() != LOCK_MIX_SYNC2
827 ) {
828 dout(7) << "eval_gather finished gather on " << *lock
829 << " on " << *lock->get_parent() << dendl;
830
831 if (lock->get_sm() == &sm_filelock) {
832 assert(in);
833 if (in->state_test(CInode::STATE_RECOVERING)) {
834 dout(7) << "eval_gather finished gather, but still recovering" << dendl;
835 return;
836 } else if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
837 dout(7) << "eval_gather finished gather, but need to recover" << dendl;
838 mds->mdcache->queue_file_recover(in);
839 mds->mdcache->do_file_recover();
840 return;
841 }
842 }
843
844 if (!lock->get_parent()->is_auth()) {
845 // replica: tell auth
846 mds_rank_t auth = lock->get_parent()->authority().first;
847
848 if (lock->get_parent()->is_rejoining() &&
849 mds->mdsmap->get_state(auth) == MDSMap::STATE_REJOIN) {
850 dout(7) << "eval_gather finished gather, but still rejoining "
851 << *lock->get_parent() << dendl;
852 return;
853 }
854
855 if (!mds->is_cluster_degraded() ||
856 mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) {
857 switch (lock->get_state()) {
858 case LOCK_SYNC_LOCK:
859 mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid()),
860 auth);
861 break;
862
863 case LOCK_MIX_SYNC:
864 {
865 MLock *reply = new MLock(lock, LOCK_AC_SYNCACK, mds->get_nodeid());
866 lock->encode_locked_state(reply->get_data());
867 mds->send_message_mds(reply, auth);
868 next = LOCK_MIX_SYNC2;
869 (static_cast<ScatterLock *>(lock))->start_flush();
870 }
871 break;
872
873 case LOCK_MIX_SYNC2:
874 (static_cast<ScatterLock *>(lock))->finish_flush();
875 (static_cast<ScatterLock *>(lock))->clear_flushed();
876
      // deliberate fall-through: like LOCK_SYNC_MIX2, the ack for
      // LOCK_MIX_SYNC2 was already sent in the earlier stage
877 case LOCK_SYNC_MIX2:
878 // do nothing, we already acked
879 break;
880
881 case LOCK_SYNC_MIX:
882 {
883 MLock *reply = new MLock(lock, LOCK_AC_MIXACK, mds->get_nodeid());
884 mds->send_message_mds(reply, auth);
885 next = LOCK_SYNC_MIX2;
886 }
887 break;
888
889 case LOCK_MIX_LOCK:
890 {
891 bufferlist data;
892 lock->encode_locked_state(data);
893 mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid(), data), auth);
894 (static_cast<ScatterLock *>(lock))->start_flush();
895 // we'll get an AC_LOCKFLUSHED to complete
896 }
897 break;
898
899 default:
900 ceph_abort();
901 }
902 }
903 } else {
904 // auth
905
906 // once the first (local) stage of mix->lock gather complete we can
907 // gather from replicas
908 if (lock->get_state() == LOCK_MIX_LOCK &&
909 lock->get_parent()->is_replicated()) {
910 dout(10) << " finished (local) gather for mix->lock, now gathering from replicas" << dendl;
911 send_lock_message(lock, LOCK_AC_LOCK);
912 lock->init_gather();
913 lock->set_state(LOCK_MIX_LOCK2);
914 return;
915 }
916
917 if (lock->is_dirty() && !lock->is_flushed()) {
918 scatter_writebehind(static_cast<ScatterLock *>(lock));
919 mds->mdlog->flush();
920 return;
921 }
922 lock->clear_flushed();
923
924 switch (lock->get_state()) {
925 // to mixed
926 case LOCK_TSYN_MIX:
927 case LOCK_SYNC_MIX:
928 case LOCK_EXCL_MIX:
929 in->start_scatter(static_cast<ScatterLock *>(lock));
930 if (lock->get_parent()->is_replicated()) {
931 bufferlist softdata;
932 lock->encode_locked_state(softdata);
933 send_lock_message(lock, LOCK_AC_MIX, softdata);
934 }
935 (static_cast<ScatterLock *>(lock))->clear_scatter_wanted();
936 break;
937
938 case LOCK_XLOCK:
939 case LOCK_XLOCKDONE:
940 if (next != LOCK_SYNC)
941 break;
942 // fall-thru
943
944 // to sync
945 case LOCK_EXCL_SYNC:
946 case LOCK_LOCK_SYNC:
947 case LOCK_MIX_SYNC:
948 case LOCK_XSYN_SYNC:
949 if (lock->get_parent()->is_replicated()) {
950 bufferlist softdata;
951 lock->encode_locked_state(softdata);
952 send_lock_message(lock, LOCK_AC_SYNC, softdata);
953 }
954 break;
955 }
956
957 }
958
959 lock->set_state(next);
960
961 if (lock->get_parent()->is_auth() &&
962 lock->is_stable())
963 lock->get_parent()->auth_unpin(lock);
964
965 // drop loner before doing waiters
966 if (caps &&
967 in->is_head() &&
968 in->is_auth() &&
969 in->get_wanted_loner() != in->get_loner()) {
970 dout(10) << " trying to drop loner" << dendl;
971 if (in->try_drop_loner()) {
972 dout(10) << " dropped loner" << dendl;
973 need_issue = true;
974 }
975 }
976
977 if (pfinishers)
978 lock->take_waiting(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD|SimpleLock::WAIT_XLOCK,
979 *pfinishers);
980 else
981 lock->finish_waiters(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD|SimpleLock::WAIT_XLOCK);
982
983 if (caps && in->is_head())
984 need_issue = true;
985
986 if (lock->get_parent()->is_auth() &&
987 lock->is_stable())
988 try_eval(lock, &need_issue);
989 }
990
991 if (need_issue) {
992 if (pneed_issue)
993 *pneed_issue = true;
994 else if (in->is_head())
995 issue_caps(in);
996 }
997
998 }
999
// Evaluate (and possibly transition) the locks on an inode selected by
// @mask (CEPH_LOCK_I* bits).  @caps_imported forces a cap re-issue even
// if no lock changed state.  Returns true if client caps were issued (or
// still need to be issued by the caller).
bool Locker::eval(CInode *in, int mask, bool caps_imported)
{
  bool need_issue = caps_imported;
  list<MDSInternalContextBase*> finishers;

  dout(10) << "eval " << mask << " " << *in << dendl;

  // choose loner?  only meaningful on the auth MDS for the head inode.
  if (in->is_auth() && in->is_head()) {
    if (in->choose_ideal_loner() >= 0) {
      if (in->try_set_loner()) {
	dout(10) << "eval set loner to client." << in->get_loner() << dendl;
	need_issue = true;
	mask = -1;  // loner changed: re-evaluate every lock below
      } else
	dout(10) << "eval want loner client." << in->get_wanted_loner() << " but failed to set it" << dendl;
    } else
      dout(10) << "eval doesn't want loner" << dendl;
  }

 retry:
  if (mask & CEPH_LOCK_IFILE)
    eval_any(&in->filelock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IAUTH)
    eval_any(&in->authlock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_ILINK)
    eval_any(&in->linklock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IXATTR)
    eval_any(&in->xattrlock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_INEST)
    eval_any(&in->nestlock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IFLOCK)
    eval_any(&in->flocklock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IPOLICY)
    eval_any(&in->policylock, &need_issue, &finishers, caps_imported);

  // drop loner?  the lock evaluation above may have changed the wanted
  // loner; if it no longer matches the current loner, drop it and, if a
  // new loner is wanted, set it and re-evaluate all locks from scratch.
  if (in->is_auth() && in->is_head() && in->get_wanted_loner() != in->get_loner()) {
    dout(10) << " trying to drop loner" << dendl;
    if (in->try_drop_loner()) {
      dout(10) << " dropped loner" << dendl;
      need_issue = true;

      if (in->get_wanted_loner() >= 0) {
	if (in->try_set_loner()) {
	  dout(10) << "eval end set loner to client." << in->get_loner() << dendl;
	  mask = -1;
	  goto retry;
	} else {
	  dout(10) << "eval want loner client." << in->get_wanted_loner() << " but failed to set it" << dendl;
	}
      }
    }
  }

  // run waiters released by the lock transitions above
  finish_contexts(g_ceph_context, finishers);

  if (need_issue && in->is_head())
    issue_caps(in);

  dout(10) << "eval done" << dendl;
  return need_issue;
}
1063
1064 class C_Locker_Eval : public LockerContext {
1065 MDSCacheObject *p;
1066 int mask;
1067 public:
1068 C_Locker_Eval(Locker *l, MDSCacheObject *pp, int m) : LockerContext(l), p(pp), mask(m) {
1069 // We are used as an MDSCacheObject waiter, so should
1070 // only be invoked by someone already holding the big lock.
1071 assert(locker->mds->mds_lock.is_locked_by_me());
1072 p->get(MDSCacheObject::PIN_PTRWAITER);
1073 }
1074 void finish(int r) override {
1075 locker->try_eval(p, mask);
1076 p->put(MDSCacheObject::PIN_PTRWAITER);
1077 }
1078 };
1079
1080 void Locker::try_eval(MDSCacheObject *p, int mask)
1081 {
1082 // unstable and ambiguous auth?
1083 if (p->is_ambiguous_auth()) {
1084 dout(7) << "try_eval ambiguous auth, waiting on " << *p << dendl;
1085 p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_Eval(this, p, mask));
1086 return;
1087 }
1088
1089 if (p->is_auth() && p->is_frozen()) {
1090 dout(7) << "try_eval frozen, waiting on " << *p << dendl;
1091 p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, mask));
1092 return;
1093 }
1094
1095 if (mask & CEPH_LOCK_DN) {
1096 assert(mask == CEPH_LOCK_DN);
1097 bool need_issue = false; // ignore this, no caps on dentries
1098 CDentry *dn = static_cast<CDentry *>(p);
1099 eval_any(&dn->lock, &need_issue);
1100 } else {
1101 CInode *in = static_cast<CInode *>(p);
1102 eval(in, mask);
1103 }
1104 }
1105
// Re-evaluate a single lock on an auth object, deferring if the object
// is in a state (ambiguous auth, frozen) where lock transitions are not
// currently allowed.  Honors pending scatter/unscatter requests before
// doing a generic eval.
void Locker::try_eval(SimpleLock *lock, bool *pneed_issue)
{
  MDSCacheObject *p = lock->get_parent();

  // unstable and ambiguous auth?
  if (p->is_ambiguous_auth()) {
    dout(7) << "try_eval " << *lock << " ambiguousauth, waiting on " << *p << dendl;
    p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_Eval(this, p, lock->get_type()));
    return;
  }

  // only the auth MDS drives lock state transitions
  if (!p->is_auth()) {
    dout(7) << "try_eval " << *lock << " not auth for " << *p << dendl;
    return;
  }

  if (p->is_frozen()) {
    dout(7) << "try_eval " << *lock << " frozen, waiting on " << *p << dendl;
    p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, lock->get_type()));
    return;
  }

  /*
   * We could have a situation like:
   *
   * - mds A authpins item on mds B
   * - mds B starts to freeze tree containing item
   * - mds A tries wrlock_start on A, sends REQSCATTER to B
   * - mds B lock is unstable, sets scatter_wanted
   * - mds B lock stabilizes, calls try_eval.
   *
   * We can defer while freezing without causing a deadlock.  Honor
   * scatter_wanted flag here.  This will never get deferred by the
   * checks above due to the auth_pin held by the master.
   */
  if (lock->is_scatterlock()) {
    ScatterLock *slock = static_cast<ScatterLock *>(lock);
    if (slock->get_scatter_wanted() &&
	slock->get_state() != LOCK_MIX) {
      scatter_mix(slock, pneed_issue);
      if (!lock->is_stable())
	return;  // mid-transition; eval_gather will finish the job later
    } else if (slock->get_unscatter_wanted() &&
        slock->get_state() != LOCK_LOCK) {
      simple_lock(slock, pneed_issue);
      if (!lock->is_stable()) {
	return;  // mid-transition; eval_gather will finish the job later
      }
    }
  }

  // defer non-dentry locks while the subtree is freezing (the
  // scatter_wanted handling above is exempt, per the comment there)
  if (lock->get_type() != CEPH_LOCK_DN && p->is_freezing()) {
    dout(7) << "try_eval " << *lock << " freezing, waiting on " << *p << dendl;
    p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, lock->get_type()));
    return;
  }

  eval(lock, pneed_issue);
}
1165
1166 void Locker::eval_cap_gather(CInode *in, set<CInode*> *issue_set)
1167 {
1168 bool need_issue = false;
1169 list<MDSInternalContextBase*> finishers;
1170
1171 // kick locks now
1172 if (!in->filelock.is_stable())
1173 eval_gather(&in->filelock, false, &need_issue, &finishers);
1174 if (!in->authlock.is_stable())
1175 eval_gather(&in->authlock, false, &need_issue, &finishers);
1176 if (!in->linklock.is_stable())
1177 eval_gather(&in->linklock, false, &need_issue, &finishers);
1178 if (!in->xattrlock.is_stable())
1179 eval_gather(&in->xattrlock, false, &need_issue, &finishers);
1180
1181 if (need_issue && in->is_head()) {
1182 if (issue_set)
1183 issue_set->insert(in);
1184 else
1185 issue_caps(in);
1186 }
1187
1188 finish_contexts(g_ceph_context, finishers);
1189 }
1190
1191 void Locker::eval_scatter_gathers(CInode *in)
1192 {
1193 bool need_issue = false;
1194 list<MDSInternalContextBase*> finishers;
1195
1196 dout(10) << "eval_scatter_gathers " << *in << dendl;
1197
1198 // kick locks now
1199 if (!in->filelock.is_stable())
1200 eval_gather(&in->filelock, false, &need_issue, &finishers);
1201 if (!in->nestlock.is_stable())
1202 eval_gather(&in->nestlock, false, &need_issue, &finishers);
1203 if (!in->dirfragtreelock.is_stable())
1204 eval_gather(&in->dirfragtreelock, false, &need_issue, &finishers);
1205
1206 if (need_issue && in->is_head())
1207 issue_caps(in);
1208
1209 finish_contexts(g_ceph_context, finishers);
1210 }
1211
1212 void Locker::eval(SimpleLock *lock, bool *need_issue)
1213 {
1214 switch (lock->get_type()) {
1215 case CEPH_LOCK_IFILE:
1216 return file_eval(static_cast<ScatterLock*>(lock), need_issue);
1217 case CEPH_LOCK_IDFT:
1218 case CEPH_LOCK_INEST:
1219 return scatter_eval(static_cast<ScatterLock*>(lock), need_issue);
1220 default:
1221 return simple_eval(lock, need_issue);
1222 }
1223 }
1224
1225
1226 // ------------------
1227 // rdlock
1228
// Try to push @lock toward a state that allows read locks.  Returns true
// if we initiated a transition locally (caller should re-check
// can_rdlock); false if nothing could be done here or we had to request
// a state change from the auth MDS.
bool Locker::_rdlock_kick(SimpleLock *lock, bool as_anon)
{
  // kick the lock
  if (lock->is_stable()) {
    if (lock->get_parent()->is_auth()) {
      if (lock->get_sm() == &sm_scatterlock) {
	// not until tempsync is fully implemented
	//if (lock->get_parent()->is_replicated())
	//scatter_tempsync((ScatterLock*)lock);
	//else
	simple_sync(lock);
      } else if (lock->get_sm() == &sm_filelock) {
	CInode *in = static_cast<CInode*>(lock->get_parent());
	// prefer XSYN over SYNC when a loner client holds EXCL on a
	// regular file, so the loner keeps its exclusive caps
	if (lock->get_state() == LOCK_EXCL &&
	    in->get_target_loner() >= 0 &&
	    !in->is_dir() && !as_anon) // as_anon => caller wants SYNC, not XSYN
	  file_xsyn(lock);
	else
	  simple_sync(lock);
      } else
	simple_sync(lock);
      return true;
    } else {
      // request rdlock state change from auth
      mds_rank_t auth = lock->get_parent()->authority().first;
      if (!mds->is_cluster_degraded() ||
	  mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
	dout(10) << "requesting rdlock from auth on "
		 << *lock << " on " << *lock->get_parent() << dendl;
	mds->send_message_mds(new MLock(lock, LOCK_AC_REQRDLOCK, mds->get_nodeid()), auth);
      }
      return false;
    }
  }
  // unstable file lock: an in-progress recovery may be what is blocking
  // us, so bump this inode to the front of the recovery queue
  if (lock->get_type() == CEPH_LOCK_IFILE) {
    CInode *in = static_cast<CInode *>(lock->get_parent());
    if (in->state_test(CInode::STATE_RECOVERING)) {
      mds->mdcache->recovery_queue.prioritize(in);
    }
  }

  return false;
}
1272
1273 bool Locker::rdlock_try(SimpleLock *lock, client_t client, MDSInternalContextBase *con)
1274 {
1275 dout(7) << "rdlock_try on " << *lock << " on " << *lock->get_parent() << dendl;
1276
1277 // can read? grab ref.
1278 if (lock->can_rdlock(client))
1279 return true;
1280
1281 _rdlock_kick(lock, false);
1282
1283 if (lock->can_rdlock(client))
1284 return true;
1285
1286 // wait!
1287 if (con) {
1288 dout(7) << "rdlock_try waiting on " << *lock << " on " << *lock->get_parent() << dendl;
1289 lock->add_waiter(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_RD, con);
1290 }
1291 return false;
1292 }
1293
// Take a read lock on behalf of a client request.  Returns true and
// records the rdlock in @mut on success; otherwise queues the request
// as a waiter on the lock and returns false.
// @as_anon: rdlock anonymously (used when reading a snapped version, or
// when the caller must not reuse the client's own xlock).
bool Locker::rdlock_start(SimpleLock *lock, MDRequestRef& mut, bool as_anon)
{
  dout(7) << "rdlock_start on " << *lock << " on " << *lock->get_parent() << dendl;

  // client may be allowed to rdlock the same item it has xlocked.
  // UNLESS someone passes in as_anon, or we're reading snapped version here.
  if (mut->snapid != CEPH_NOSNAP)
    as_anon = true;
  client_t client = as_anon ? -1 : mut->get_client();

  CInode *in = 0;
  if (lock->get_type() != CEPH_LOCK_DN)
    in = static_cast<CInode *>(lock->get_parent());

  /*
  if (!lock->get_parent()->is_auth() &&
      lock->fw_rdlock_to_auth()) {
    mdcache->request_forward(mut, lock->get_parent()->authority().first);
    return false;
  }
  */

  while (1) {
    // can read?  grab ref.
    if (lock->can_rdlock(client)) {
      lock->get_rdlock();
      mut->rdlocks.insert(lock);
      mut->locks.insert(lock);
      return true;
    }

    // hmm, wait a second.
    // snapped (non-head) inode whose lock is in SNAP_SYNC: the head
    // inode's lock must reach SYNC before the snapped data is readable,
    // so recurse onto the head's lock first.
    if (in && !in->is_head() && in->is_auth() &&
	lock->get_state() == LOCK_SNAP_SYNC) {
      // okay, we actually need to kick the head's lock to get ourselves synced up.
      CInode *head = mdcache->get_inode(in->ino());
      assert(head);
      SimpleLock *hlock = head->get_lock(CEPH_LOCK_IFILE);
      if (hlock->get_state() == LOCK_SYNC)
	hlock = head->get_lock(lock->get_type());

      if (hlock->get_state() != LOCK_SYNC) {
	dout(10) << "rdlock_start trying head inode " << *head << dendl;
	if (!rdlock_start(hlock, mut, true)) // ** as_anon, no rdlock on EXCL **
	  return false;
	// oh, check our lock again then
      }
    }

    if (!_rdlock_kick(lock, as_anon))
      break;  // nothing more we can do locally; fall through and wait
  }

  // wait!
  int wait_on;
  if (lock->get_parent()->is_auth() && lock->is_stable())
    wait_on = SimpleLock::WAIT_RD;
  else
    wait_on = SimpleLock::WAIT_STABLE;  // REQRDLOCK is ignored if lock is unstable, so we need to retry.
  dout(7) << "rdlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl;
  lock->add_waiter(wait_on, new C_MDS_RetryRequest(mdcache, mut));
  nudge_log(lock);
  return false;
}
1358
1359 void Locker::nudge_log(SimpleLock *lock)
1360 {
1361 dout(10) << "nudge_log " << *lock << " on " << *lock->get_parent() << dendl;
1362 if (lock->get_parent()->is_auth() && lock->is_unstable_and_locked()) // as with xlockdone, or cap flush
1363 mds->mdlog->flush();
1364 }
1365
1366 void Locker::rdlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue)
1367 {
1368 // drop ref
1369 lock->put_rdlock();
1370 if (mut) {
1371 mut->rdlocks.erase(lock);
1372 mut->locks.erase(lock);
1373 }
1374
1375 dout(7) << "rdlock_finish on " << *lock << " on " << *lock->get_parent() << dendl;
1376
1377 // last one?
1378 if (!lock->is_rdlocked()) {
1379 if (!lock->is_stable())
1380 eval_gather(lock, false, pneed_issue);
1381 else if (lock->get_parent()->is_auth())
1382 try_eval(lock, pneed_issue);
1383 }
1384 }
1385
1386
1387 bool Locker::can_rdlock_set(set<SimpleLock*>& locks)
1388 {
1389 dout(10) << "can_rdlock_set " << locks << dendl;
1390 for (set<SimpleLock*>::iterator p = locks.begin(); p != locks.end(); ++p)
1391 if (!(*p)->can_rdlock(-1)) {
1392 dout(10) << "can_rdlock_set can't rdlock " << *p << " on " << *(*p)->get_parent() << dendl;
1393 return false;
1394 }
1395 return true;
1396 }
1397
1398 bool Locker::rdlock_try_set(set<SimpleLock*>& locks)
1399 {
1400 dout(10) << "rdlock_try_set " << locks << dendl;
1401 for (set<SimpleLock*>::iterator p = locks.begin(); p != locks.end(); ++p)
1402 if (!rdlock_try(*p, -1, NULL)) {
1403 dout(10) << "rdlock_try_set can't rdlock " << *p << " on " << *(*p)->get_parent() << dendl;
1404 return false;
1405 }
1406 return true;
1407 }
1408
1409 void Locker::rdlock_take_set(set<SimpleLock*>& locks, MutationRef& mut)
1410 {
1411 dout(10) << "rdlock_take_set " << locks << dendl;
1412 for (set<SimpleLock*>::iterator p = locks.begin(); p != locks.end(); ++p) {
1413 (*p)->get_rdlock();
1414 mut->rdlocks.insert(*p);
1415 mut->locks.insert(*p);
1416 }
1417 }
1418
1419 // ------------------
1420 // wrlock
1421
1422 void Locker::wrlock_force(SimpleLock *lock, MutationRef& mut)
1423 {
1424 if (lock->get_type() == CEPH_LOCK_IVERSION ||
1425 lock->get_type() == CEPH_LOCK_DVERSION)
1426 return local_wrlock_grab(static_cast<LocalLock*>(lock), mut);
1427
1428 dout(7) << "wrlock_force on " << *lock
1429 << " on " << *lock->get_parent() << dendl;
1430 lock->get_wrlock(true);
1431 mut->wrlocks.insert(lock);
1432 mut->locks.insert(lock);
1433 }
1434
// Take a write lock for a client request.  Returns true and records the
// wrlock in @mut on success.  With @nowait the caller already has a log
// entry open and must not block: we avoid transitions that could
// scatter_writebehind, and return false instead of queueing a waiter.
bool Locker::wrlock_start(SimpleLock *lock, MDRequestRef& mut, bool nowait)
{
  // version locks are purely local; take them via the LocalLock path
  if (lock->get_type() == CEPH_LOCK_IVERSION ||
      lock->get_type() == CEPH_LOCK_DVERSION)
    return local_wrlock_start(static_cast<LocalLock*>(lock), mut);

  dout(10) << "wrlock_start " << *lock << " on " << *lock->get_parent() << dendl;

  CInode *in = static_cast<CInode *>(lock->get_parent());
  client_t client = mut->get_client();
  // prefer the MIX (scattered) state when the inode has subtree-bound or
  // exporting dirfrags, or when a scatter was explicitly requested
  bool want_scatter = !nowait && lock->get_parent()->is_auth() &&
		      (in->has_subtree_or_exporting_dirfrag() ||
		       static_cast<ScatterLock*>(lock)->get_scatter_wanted());

  while (1) {
    // wrlock?
    if (lock->can_wrlock(client) &&
	(!want_scatter || lock->get_state() == LOCK_MIX)) {
      lock->get_wrlock();
      mut->wrlocks.insert(lock);
      mut->locks.insert(lock);
      return true;
    }

    // an in-progress file recovery may be what's blocking us; expedite it
    if (lock->get_type() == CEPH_LOCK_IFILE &&
	in->state_test(CInode::STATE_RECOVERING)) {
      mds->mdcache->recovery_queue.prioritize(in);
    }

    if (!lock->is_stable())
      break;

    if (in->is_auth()) {
      // don't do nested lock state change if we have dirty scatterdata and
      // may scatter_writebehind or start_scatter, because nowait==true implies
      // that the caller already has a log entry open!
      if (nowait && lock->is_dirty())
	return false;

      if (want_scatter)
	scatter_mix(static_cast<ScatterLock*>(lock));
      else
	simple_lock(lock);

      // nowait caller can't block on the transition; give up now
      if (nowait && !lock->can_wrlock(client))
	return false;

    } else {
      // replica.
      // auth should be auth_pinned (see acquire_locks wrlock weird mustpin case).
      mds_rank_t auth = lock->get_parent()->authority().first;
      if (!mds->is_cluster_degraded() ||
	  mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
	dout(10) << "requesting scatter from auth on "
		 << *lock << " on " << *lock->get_parent() << dendl;
	mds->send_message_mds(new MLock(lock, LOCK_AC_REQSCATTER, mds->get_nodeid()), auth);
      }
      break;
    }
  }

  if (!nowait) {
    dout(7) << "wrlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl;
    lock->add_waiter(SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
    nudge_log(lock);
  }

  return false;
}
1504
1505 void Locker::wrlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue)
1506 {
1507 if (lock->get_type() == CEPH_LOCK_IVERSION ||
1508 lock->get_type() == CEPH_LOCK_DVERSION)
1509 return local_wrlock_finish(static_cast<LocalLock*>(lock), mut);
1510
1511 dout(7) << "wrlock_finish on " << *lock << " on " << *lock->get_parent() << dendl;
1512 lock->put_wrlock();
1513 if (mut) {
1514 mut->wrlocks.erase(lock);
1515 if (mut->remote_wrlocks.count(lock) == 0)
1516 mut->locks.erase(lock);
1517 }
1518
1519 if (!lock->is_wrlocked()) {
1520 if (!lock->is_stable())
1521 eval_gather(lock, false, pneed_issue);
1522 else if (lock->get_parent()->is_auth())
1523 try_eval(lock, pneed_issue);
1524 }
1525 }
1526
1527
1528 // remote wrlock
1529
1530 void Locker::remote_wrlock_start(SimpleLock *lock, mds_rank_t target, MDRequestRef& mut)
1531 {
1532 dout(7) << "remote_wrlock_start mds." << target << " on " << *lock << " on " << *lock->get_parent() << dendl;
1533
1534 // wait for active target
1535 if (mds->is_cluster_degraded() &&
1536 !mds->mdsmap->is_clientreplay_or_active_or_stopping(target)) {
1537 dout(7) << " mds." << target << " is not active" << dendl;
1538 if (mut->more()->waiting_on_slave.empty())
1539 mds->wait_for_active_peer(target, new C_MDS_RetryRequest(mdcache, mut));
1540 return;
1541 }
1542
1543 // send lock request
1544 mut->start_locking(lock, target);
1545 mut->more()->slaves.insert(target);
1546 MMDSSlaveRequest *r = new MMDSSlaveRequest(mut->reqid, mut->attempt,
1547 MMDSSlaveRequest::OP_WRLOCK);
1548 r->set_lock_type(lock->get_type());
1549 lock->get_parent()->set_object_info(r->get_object_info());
1550 mds->send_message_mds(r, target);
1551
1552 assert(mut->more()->waiting_on_slave.count(target) == 0);
1553 mut->more()->waiting_on_slave.insert(target);
1554 }
1555
1556 void Locker::remote_wrlock_finish(SimpleLock *lock, mds_rank_t target,
1557 MutationImpl *mut)
1558 {
1559 // drop ref
1560 mut->remote_wrlocks.erase(lock);
1561 if (mut->wrlocks.count(lock) == 0)
1562 mut->locks.erase(lock);
1563
1564 dout(7) << "remote_wrlock_finish releasing remote wrlock on mds." << target
1565 << " " << *lock->get_parent() << dendl;
1566 if (!mds->is_cluster_degraded() ||
1567 mds->mdsmap->get_state(target) >= MDSMap::STATE_REJOIN) {
1568 MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt,
1569 MMDSSlaveRequest::OP_UNWRLOCK);
1570 slavereq->set_lock_type(lock->get_type());
1571 lock->get_parent()->set_object_info(slavereq->get_object_info());
1572 mds->send_message_mds(slavereq, target);
1573 }
1574 }
1575
1576
1577 // ------------------
1578 // xlock
1579
// Take an exclusive lock for a client request.  On the auth MDS we drive
// the lock to LOCK_XLOCK ourselves; on a replica we forward an OP_XLOCK
// slave request to the auth and wait for its ack.  Returns true once the
// xlock is held and registered in @mut.
bool Locker::xlock_start(SimpleLock *lock, MDRequestRef& mut)
{
  if (lock->get_type() == CEPH_LOCK_IVERSION ||
      lock->get_type() == CEPH_LOCK_DVERSION)
    return local_xlock_start(static_cast<LocalLock*>(lock), mut);

  dout(7) << "xlock_start on " << *lock << " on " << *lock->get_parent() << dendl;
  client_t client = mut->get_client();

  // auth?
  if (lock->get_parent()->is_auth()) {
    // auth
    while (1) {
      if (lock->can_xlock(client)) {
	lock->set_state(LOCK_XLOCK);
	lock->get_xlock(mut, client);
	mut->xlocks.insert(lock);
	mut->locks.insert(lock);
	mut->finish_locking(lock);
	return true;
      }

      // a pending file recovery may be what's blocking us; expedite it
      if (lock->get_type() == CEPH_LOCK_IFILE) {
	CInode *in = static_cast<CInode*>(lock->get_parent());
	if (in->state_test(CInode::STATE_RECOVERING)) {
	  mds->mdcache->recovery_queue.prioritize(in);
	}
      }

      // give up and wait, unless the lock is in XLOCKDONE held by this
      // same client with no other stable-waiters (in which case we may
      // chain another xlock immediately)
      if (!lock->is_stable() && (lock->get_state() != LOCK_XLOCKDONE ||
				 lock->get_xlock_by_client() != client ||
				 lock->is_waiter_for(SimpleLock::WAIT_STABLE)))
	break;

      if (lock->get_state() == LOCK_LOCK || lock->get_state() == LOCK_XLOCKDONE) {
	mut->start_locking(lock);
	simple_xlock(lock);
      } else {
	simple_lock(lock);
      }
    }

    lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
    nudge_log(lock);
    return false;
  } else {
    // replica
    assert(lock->get_sm()->can_remote_xlock);
    assert(!mut->slave_request);

    // wait for single auth
    if (lock->get_parent()->is_ambiguous_auth()) {
      lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH,
				     new C_MDS_RetryRequest(mdcache, mut));
      return false;
    }

    // wait for active auth
    mds_rank_t auth = lock->get_parent()->authority().first;
    if (mds->is_cluster_degraded() &&
	!mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
      dout(7) << " mds." << auth << " is not active" << dendl;
      if (mut->more()->waiting_on_slave.empty())
	mds->wait_for_active_peer(auth, new C_MDS_RetryRequest(mdcache, mut));
      return false;
    }

    // send lock request
    mut->more()->slaves.insert(auth);
    mut->start_locking(lock, auth);
    MMDSSlaveRequest *r = new MMDSSlaveRequest(mut->reqid, mut->attempt,
					       MMDSSlaveRequest::OP_XLOCK);
    r->set_lock_type(lock->get_type());
    lock->get_parent()->set_object_info(r->get_object_info());
    mds->send_message_mds(r, auth);

    // block until the auth acknowledges the xlock
    assert(mut->more()->waiting_on_slave.count(auth) == 0);
    mut->more()->waiting_on_slave.insert(auth);

    return false;
  }
}
1662
// Called after the last xlock reference on @lock is dropped (auth side).
// If nothing else holds the lock and the target loner matches the
// xlocker, jump straight to LOCK_EXCL; otherwise fall back to a normal
// eval_gather to finish the transition.
void Locker::_finish_xlock(SimpleLock *lock, client_t xlocker, bool *pneed_issue)
{
  assert(!lock->is_stable());
  if (lock->get_num_rdlocks() == 0 &&
      lock->get_num_wrlocks() == 0 &&
      lock->get_num_client_lease() == 0 &&
      lock->get_state() != LOCK_XLOCKSNAP &&
      lock->get_type() != CEPH_LOCK_DN) {
    CInode *in = static_cast<CInode*>(lock->get_parent());
    client_t loner = in->get_target_loner();
    if (loner >= 0 && (xlocker < 0 || xlocker == loner)) {
      // fast path: hand the lock to the loner as EXCL without a gather
      lock->set_state(LOCK_EXCL);
      lock->get_parent()->auth_unpin(lock);  // stable again; drop the pin
      lock->finish_waiters(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD);
      if (lock->get_cap_shift())
	*pneed_issue = true;  // this lock gates caps; caller must re-issue
      if (lock->get_parent()->is_auth() &&
	  lock->is_stable())
	try_eval(lock, pneed_issue);
      return;
    }
  }
  // the xlocker may have CEPH_CAP_GSHARED, need to revoke it if next state is LOCK_LOCK
  eval_gather(lock, lock->get_state() != LOCK_XLOCKSNAP, pneed_issue);
}
1688
// Release an exclusive lock held by @mut.  On a replica this notifies
// the auth MDS to drop the remote xlock; on the auth it advances the
// lock's state machine and possibly re-issues client caps.
void Locker::xlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue)
{
  if (lock->get_type() == CEPH_LOCK_IVERSION ||
      lock->get_type() == CEPH_LOCK_DVERSION)
    return local_xlock_finish(static_cast<LocalLock*>(lock), mut);

  dout(10) << "xlock_finish on " << *lock << " " << *lock->get_parent() << dendl;

  // capture before put_xlock() clears it
  client_t xlocker = lock->get_xlock_by_client();

  // drop ref
  lock->put_xlock();
  assert(mut);
  mut->xlocks.erase(lock);
  mut->locks.erase(lock);

  bool do_issue = false;

  // remote xlock?
  if (!lock->get_parent()->is_auth()) {
    assert(lock->get_sm()->can_remote_xlock);

    // tell auth (unless it is too far behind to care)
    dout(7) << "xlock_finish releasing remote xlock on " << *lock->get_parent() << dendl;
    mds_rank_t auth = lock->get_parent()->authority().first;
    if (!mds->is_cluster_degraded() ||
	mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) {
      MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt,
							MMDSSlaveRequest::OP_UNXLOCK);
      slavereq->set_lock_type(lock->get_type());
      lock->get_parent()->set_object_info(slavereq->get_object_info());
      mds->send_message_mds(slavereq, auth);
    }
    // others waiting?
    lock->finish_waiters(SimpleLock::WAIT_STABLE |
			 SimpleLock::WAIT_WR |
			 SimpleLock::WAIT_RD, 0);
  } else {
    // auth: once the last xlock ref is gone, move the state machine along
    if (lock->get_num_xlocks() == 0) {
      if (lock->get_state() == LOCK_LOCK_XLOCK)
	lock->set_state(LOCK_XLOCKDONE);
      _finish_xlock(lock, xlocker, &do_issue);
    }
  }

  if (do_issue) {
    CInode *in = static_cast<CInode*>(lock->get_parent());
    if (in->is_head()) {
      if (pneed_issue)
	*pneed_issue = true;  // caller will batch the cap issue
      else
	issue_caps(in);
    }
  }
}
1744
1745 void Locker::xlock_export(SimpleLock *lock, MutationImpl *mut)
1746 {
1747 dout(10) << "xlock_export on " << *lock << " " << *lock->get_parent() << dendl;
1748
1749 lock->put_xlock();
1750 mut->xlocks.erase(lock);
1751 mut->locks.erase(lock);
1752
1753 MDSCacheObject *p = lock->get_parent();
1754 assert(p->state_test(CInode::STATE_AMBIGUOUSAUTH)); // we are exporting this (inode)
1755
1756 if (!lock->is_stable())
1757 lock->get_parent()->auth_unpin(lock);
1758
1759 lock->set_state(LOCK_LOCK);
1760 }
1761
1762 void Locker::xlock_import(SimpleLock *lock)
1763 {
1764 dout(10) << "xlock_import on " << *lock << " " << *lock->get_parent() << dendl;
1765 lock->get_parent()->auth_pin(lock);
1766 }
1767
1768
1769
1770 // file i/o -----------------------------------------
1771
1772 version_t Locker::issue_file_data_version(CInode *in)
1773 {
1774 dout(7) << "issue_file_data_version on " << *in << dendl;
1775 return in->inode.file_data_version;
1776 }
1777
1778 class C_Locker_FileUpdate_finish : public LockerLogContext {
1779 CInode *in;
1780 MutationRef mut;
1781 bool share_max;
1782 bool need_issue;
1783 client_t client;
1784 MClientCaps *ack;
1785 public:
1786 C_Locker_FileUpdate_finish(Locker *l, CInode *i, MutationRef& m,
1787 bool sm=false, bool ni=false, client_t c=-1,
1788 MClientCaps *ac = 0)
1789 : LockerLogContext(l), in(i), mut(m), share_max(sm), need_issue(ni),
1790 client(c), ack(ac) {
1791 in->get(CInode::PIN_PTRWAITER);
1792 }
1793 void finish(int r) override {
1794 locker->file_update_finish(in, mut, share_max, need_issue, client, ack);
1795 in->put(CInode::PIN_PTRWAITER);
1796 }
1797 };
1798
// Journal-commit completion for a file metadata update.  Applies the
// projected inode and mutation, acks the client's cap flush (if @ack),
// drops the mutation's locks, completes any snap-cap writeback, and
// re-issues caps / shares a new max_size as needed.
void Locker::file_update_finish(CInode *in, MutationRef& mut, bool share_max, bool issue_client_cap,
				client_t client, MClientCaps *ack)
{
  dout(10) << "file_update_finish on " << *in << dendl;
  in->pop_and_dirty_projected_inode(mut->ls);

  mut->apply();

  if (ack) {
    Session *session = mds->get_session(client);
    if (session) {
      // "oldest flush tid" > 0 means client uses unique TID for each flush
      if (ack->get_oldest_flush_tid() > 0)
	session->add_completed_flush(ack->get_client_tid());
      mds->send_message_client_counted(ack, session);
    } else {
      dout(10) << " no session for client." << client << " " << *ack << dendl;
      ack->put();  // session gone; drop the undeliverable message
    }
  }

  set<CInode*> need_issue;
  drop_locks(mut.get(), &need_issue);

  if (!in->is_head() && !in->client_snap_caps.empty()) {
    dout(10) << " client_snap_caps " << in->client_snap_caps << dendl;
    // check for snap writeback completion
    bool gather = false;
    compact_map<int,set<client_t> >::iterator p = in->client_snap_caps.begin();
    while (p != in->client_snap_caps.end()) {
      SimpleLock *lock = in->get_lock(p->first);
      assert(lock);
      dout(10) << " completing client_snap_caps for " << ccap_string(p->first)
	       << " lock " << *lock << " on " << *in << dendl;
      lock->put_wrlock();  // drop the wrlock pinned for this client's writeback

      p->second.erase(client);
      if (p->second.empty()) {
	gather = true;
	in->client_snap_caps.erase(p++);
      } else
	++p;
    }
    if (gather) {
      // all snap caps flushed: detach from the open-file list and let
      // any pending lock gathers proceed
      if (in->client_snap_caps.empty())
	in->item_open_file.remove_myself();
      eval_cap_gather(in, &need_issue);
    }
  } else {
    // issue directly only if drop_locks didn't already queue this inode
    if (issue_client_cap && need_issue.count(in) == 0) {
      Capability *cap = in->get_client_cap(client);
      if (cap && (cap->wanted() & ~cap->pending()))
	issue_caps(in, cap);
    }

    if (share_max && in->is_auth() &&
	(in->filelock.gcaps_allowed(CAP_LONER) & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)))
      share_inode_max_size(in);
  }
  issue_caps_set(need_issue);

  // auth unpin after issuing caps
  mut->cleanup();
}
1863
// Register (or augment) a client capability on @in for an open with the
// given @mode.  During replay we only try to reconnect the existing cap.
// Returns the Capability, or 0 on replay.
Capability* Locker::issue_new_caps(CInode *in,
				   int mode,
				   Session *session,
				   SnapRealm *realm,
				   bool is_replay)
{
  dout(7) << "issue_new_caps for mode " << mode << " on " << *in << dendl;
  bool is_new;

  // if replay, try to reconnect cap, and otherwise do nothing.
  if (is_replay) {
    mds->mdcache->try_reconnect_cap(in, session);
    return 0;
  }

  // my needs
  assert(session->info.inst.name.is_client());
  client_t my_client = session->info.inst.name.num();
  int my_want = ceph_caps_for_mode(mode);

  // register a capability
  Capability *cap = in->get_client_cap(my_client);
  if (!cap) {
    // new cap
    cap = in->add_client_cap(my_client, session, realm);
    cap->set_wanted(my_want);
    cap->mark_new();
    cap->inc_suppress(); // suppress file cap messages for new cap (we'll bundle with the open() reply)
    is_new = true;
  } else {
    is_new = false;
    // make sure it wants sufficient caps
    if (my_want & ~cap->wanted()) {
      // augment wanted caps for this client
      cap->set_wanted(cap->wanted() | my_want);
    }
  }

  if (in->is_auth()) {
    // [auth] twiddle mode?
    eval(in, CEPH_CAP_LOCKS);

    if (_need_flush_mdlog(in, my_want))
      mds->mdlog->flush();

  } else {
    // [replica] tell auth about any new caps wanted
    request_inode_file_caps(in);
  }

  // issue caps (pot. incl new one)
  //issue_caps(in);  // note: _eval above may have done this already...

  // re-issue whatever we can
  //cap->issue(cap->pending());

  if (is_new)
    cap->dec_suppress();

  return cap;
}
1925
1926
1927 void Locker::issue_caps_set(set<CInode*>& inset)
1928 {
1929 for (set<CInode*>::iterator p = inset.begin(); p != inset.end(); ++p)
1930 issue_caps(*p);
1931 }
1932
/**
 * Bring each client's pending caps on @p in in line with what the current
 * lock states allow: grant newly wanted+allowed bits, or start revoking
 * bits that are no longer allowed, sending an MClientCaps GRANT/REVOKE
 * message per affected cap (unless the cap is brand new and unsent).
 *
 * Three allowed masks are computed from the lock states: one for ordinary
 * clients (CAP_ANY), one for the loner client (CAP_LONER), and extra bits
 * for clients that hold an xlock (CAP_XLOCKER).
 *
 * @param in        inode whose client caps to (re)issue
 * @param only_cap  if non-NULL, process only this single capability
 * @return true if nothing was issued or revoked (no callbacks expected)
 */
bool Locker::issue_caps(CInode *in, Capability *only_cap)
{
  // allowed caps are determined by the lock mode.
  int all_allowed = in->get_caps_allowed_by_type(CAP_ANY);
  int loner_allowed = in->get_caps_allowed_by_type(CAP_LONER);
  int xlocker_allowed = in->get_caps_allowed_by_type(CAP_XLOCKER);

  client_t loner = in->get_loner();
  if (loner >= 0) {
    dout(7) << "issue_caps loner client." << loner
	    << " allowed=" << ccap_string(loner_allowed)
	    << ", xlocker allowed=" << ccap_string(xlocker_allowed)
	    << ", others allowed=" << ccap_string(all_allowed)
	    << " on " << *in << dendl;
  } else {
    dout(7) << "issue_caps allowed=" << ccap_string(all_allowed)
	    << ", xlocker allowed=" << ccap_string(xlocker_allowed)
	    << " on " << *in << dendl;
  }

  assert(in->is_head());

  // count conflicts with
  int nissued = 0;

  // client caps
  map<client_t, Capability*>::iterator it;
  if (only_cap)
    it = in->client_caps.find(only_cap->get_client());
  else
    it = in->client_caps.begin();
  for (; it != in->client_caps.end(); ++it) {
    Capability *cap = it->second;
    // stale caps are handled by revoke_stale_caps(), not here
    if (cap->is_stale())
      continue;

    // do not issue _new_ bits when size|mtime is projected
    int allowed;
    if (loner == it->first)
      allowed = loner_allowed;
    else
      allowed = all_allowed;

    // add in any xlocker-only caps (for locks this client is the xlocker for)
    allowed |= xlocker_allowed & in->get_xlocker_mask(it->first);

    // clients without inline-data support must not read/write inlined files
    Session *session = mds->get_session(it->first);
    if (in->inode.inline_data.version != CEPH_INLINE_NONE &&
	!(session && session->connection &&
	  session->connection->has_feature(CEPH_FEATURE_MDS_INLINE_DATA)))
      allowed &= ~(CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR);

    int pending = cap->pending();
    int wanted = cap->wanted();

    dout(20) << " client." << it->first
	     << " pending " << ccap_string(pending)
	     << " allowed " << ccap_string(allowed)
	     << " wanted " << ccap_string(wanted)
	     << dendl;

    if (!(pending & ~allowed)) {
      // skip if suppress or new, and not revocation
      if (cap->is_new() || cap->is_suppress()) {
	dout(20) << " !revoke and new|suppressed, skipping client." << it->first << dendl;
	continue;
      }
    }

    // notify clients about deleted inode, to make sure they release caps ASAP.
    if (in->inode.nlink == 0)
      wanted |= CEPH_CAP_LINK_SHARED;

    // are there caps that the client _wants_ and can have, but aren't pending?
    // or do we need to revoke?
    if (((wanted & allowed) & ~pending) || // missing wanted+allowed caps
	(pending & ~allowed)) { // need to revoke ~allowed caps.
      // issue
      nissued++;

      // include caps that clients generally like, while we're at it.
      int likes = in->get_caps_liked();
      int before = pending;
      long seq;
      if (pending & ~allowed)
	seq = cap->issue((wanted|likes) & allowed & pending); // if revoking, don't issue anything new.
      else
	seq = cap->issue((wanted|likes) & allowed);
      int after = cap->pending();

      if (cap->is_new()) {
	// haven't send caps to client yet
	if (before & ~after)
	  cap->confirm_receipt(seq, after);
      } else {
	dout(7) << " sending MClientCaps to client." << it->first
		<< " seq " << cap->get_last_seq()
		<< " new pending " << ccap_string(after) << " was " << ccap_string(before)
		<< dendl;

	int op = (before & ~after) ? CEPH_CAP_OP_REVOKE : CEPH_CAP_OP_GRANT;
	if (op == CEPH_CAP_OP_REVOKE) {
		// track the outstanding revocation so we can warn about
		// clients that fail to release in time
		revoking_caps.push_back(&cap->item_revoking_caps);
		revoking_caps_by_client[cap->get_client()].push_back(&cap->item_client_revoking_caps);
		cap->set_last_revoke_stamp(ceph_clock_now());
		cap->reset_num_revoke_warnings();
	}

	MClientCaps *m = new MClientCaps(op, in->ino(),
					 in->find_snaprealm()->inode->ino(),
					 cap->get_cap_id(), cap->get_last_seq(),
					 after, wanted, 0,
					 cap->get_mseq(),
					 mds->get_osd_epoch_barrier());
	in->encode_cap_message(m, cap);

	mds->send_message_client_counted(m, it->first);
      }
    }

    if (only_cap)
      break;
  }

  return (nissued == 0); // true if no re-issued, no callbacks
}
2059
2060 void Locker::issue_truncate(CInode *in)
2061 {
2062 dout(7) << "issue_truncate on " << *in << dendl;
2063
2064 for (map<client_t, Capability*>::iterator it = in->client_caps.begin();
2065 it != in->client_caps.end();
2066 ++it) {
2067 Capability *cap = it->second;
2068 MClientCaps *m = new MClientCaps(CEPH_CAP_OP_TRUNC,
2069 in->ino(),
2070 in->find_snaprealm()->inode->ino(),
2071 cap->get_cap_id(), cap->get_last_seq(),
2072 cap->pending(), cap->wanted(), 0,
2073 cap->get_mseq(),
2074 mds->get_osd_epoch_barrier());
2075 in->encode_cap_message(m, cap);
2076 mds->send_message_client_counted(m, it->first);
2077 }
2078
2079 // should we increase max_size?
2080 if (in->is_auth() && in->is_file())
2081 check_inode_max_size(in);
2082 }
2083
2084
/**
 * Revoke all non-pin caps issued on a stale capability and kick the
 * affected locks so the revocation can make progress.  If the inode is in
 * the middle of a cap export, defer: flag it with STATE_EVALSTALECAPS so
 * the caps get re-evaluated once the export resolves.
 */
void Locker::revoke_stale_caps(Capability *cap)
{
  CInode *in = cap->get_inode();
  if (in->state_test(CInode::STATE_EXPORTINGCAPS)) {
    // if export succeeds, the cap will be removed. if export fails, we need to
    // revoke the cap if it's still stale.
    in->state_set(CInode::STATE_EVALSTALECAPS);
    return;
  }

  int issued = cap->issued();
  if (issued & ~CEPH_CAP_PIN) {
    dout(10) << " revoking " << ccap_string(issued) << " on " << *in << dendl;
    cap->revoke();

    // the stale client had a writeable range on this file; flag it for recovery
    if (in->is_auth() &&
	in->inode.client_ranges.count(cap->get_client()))
      in->state_set(CInode::STATE_NEEDSRECOVER);

    // let any in-flight lock transitions finish now that caps are revoked
    if (!in->filelock.is_stable()) eval_gather(&in->filelock);
    if (!in->linklock.is_stable()) eval_gather(&in->linklock);
    if (!in->authlock.is_stable()) eval_gather(&in->authlock);
    if (!in->xattrlock.is_stable()) eval_gather(&in->xattrlock);

    if (in->is_auth()) {
      try_eval(in, CEPH_CAP_LOCKS);
    } else {
      // replica: tell auth what caps are (still) wanted locally
      request_inode_file_caps(in);
    }
  }
}
2116
2117 void Locker::revoke_stale_caps(Session *session)
2118 {
2119 dout(10) << "revoke_stale_caps for " << session->info.inst.name << dendl;
2120
2121 for (xlist<Capability*>::iterator p = session->caps.begin(); !p.end(); ++p) {
2122 Capability *cap = *p;
2123 cap->mark_stale();
2124 revoke_stale_caps(cap);
2125 }
2126 }
2127
/**
 * Clear the stale flag on every cap held by a (recovered) session and
 * re-issue what each client is entitled to under the current lock states.
 */
void Locker::resume_stale_caps(Session *session)
{
  dout(10) << "resume_stale_caps for " << session->info.inst.name << dendl;

  for (xlist<Capability*>::iterator p = session->caps.begin(); !p.end(); ++p) {
    Capability *cap = *p;
    CInode *in = cap->get_inode();
    assert(in->is_head());
    if (cap->is_stale()) {
      dout(10) << " clearing stale flag on " << *in << dendl;
      cap->clear_stale();

      if (in->state_test(CInode::STATE_EXPORTINGCAPS)) {
	// if export succeeds, the cap will be removed. if export fails,
	// we need to re-issue the cap if it's not stale.
	in->state_set(CInode::STATE_EVALSTALECAPS);
	continue;
      }

      // eval() on auth may already re-issue caps; if we're not auth, or it
      // didn't issue anything, re-issue this cap directly.
      if (!in->is_auth() || !eval(in, CEPH_CAP_LOCKS))
	issue_caps(in, cap);
    }
  }
}
2152
2153 void Locker::remove_stale_leases(Session *session)
2154 {
2155 dout(10) << "remove_stale_leases for " << session->info.inst.name << dendl;
2156 xlist<ClientLease*>::iterator p = session->leases.begin();
2157 while (!p.end()) {
2158 ClientLease *l = *p;
2159 ++p;
2160 CDentry *parent = static_cast<CDentry*>(l->parent);
2161 dout(15) << " removing lease on " << *parent << dendl;
2162 parent->remove_client_lease(l, this);
2163 }
2164 }
2165
2166
/**
 * Waiter context: retry request_inode_file_caps() on a replica inode once
 * the condition we waited on (single auth / active peer) is resolved.
 * Holds a PIN_PTRWAITER ref on the inode so it stays in cache meanwhile.
 */
class C_MDL_RequestInodeFileCaps : public LockerContext {
  CInode *in;
public:
  C_MDL_RequestInodeFileCaps(Locker *l, CInode *i) : LockerContext(l), in(i) {
    in->get(CInode::PIN_PTRWAITER);
  }
  void finish(int r) override {
    // only retry if we are still a replica; auth may have migrated to us
    if (!in->is_auth())
      locker->request_inode_file_caps(in);
    in->put(CInode::PIN_PTRWAITER);
  }
};
2179
/**
 * (Replica side.)  Tell the auth mds which caps our local clients want on
 * this inode, if that differs from what we last reported
 * (in->replica_caps_wanted).  Defers with a waiter while the inode is
 * ambiguous-auth, or while the auth mds is still in rejoin.
 */
void Locker::request_inode_file_caps(CInode *in)
{
  assert(!in->is_auth());

  int wanted = in->get_caps_wanted() & ~CEPH_CAP_PIN;
  if (wanted != in->replica_caps_wanted) {
    // wait for single auth
    if (in->is_ambiguous_auth()) {
      in->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH,
                     new C_MDL_RequestInodeFileCaps(this, in));
      return;
    }

    mds_rank_t auth = in->authority().first;
    if (mds->is_cluster_degraded() &&
	mds->mdsmap->get_state(auth) == MDSMap::STATE_REJOIN) {
      // auth is rejoining; retry once it becomes active
      mds->wait_for_active_peer(auth, new C_MDL_RequestInodeFileCaps(this, in));
      return;
    }

    dout(7) << "request_inode_file_caps " << ccap_string(wanted)
            << " was " << ccap_string(in->replica_caps_wanted)
            << " on " << *in << " to mds." << auth << dendl;

    // record what we are about to report, even if we can't send right now
    in->replica_caps_wanted = wanted;

    if (!mds->is_cluster_degraded() ||
	mds->mdsmap->is_clientreplay_or_active_or_stopping(auth))
      mds->send_message_mds(new MInodeFileCaps(in->ino(), in->replica_caps_wanted),
			    auth);
  }
}
2212
/* This function DOES put the passed message before returning */
/**
 * (Auth side.)  Record which caps a replica mds reports as wanted on our
 * inode (in->mds_caps_wanted), then re-evaluate the cap-related locks.
 * A zero caps value clears the replica's entry.
 */
void Locker::handle_inode_file_caps(MInodeFileCaps *m)
{
  // nobody should be talking to us during recovery.
  assert(mds->is_clientreplay() || mds->is_active() || mds->is_stopping());

  // ok
  CInode *in = mdcache->get_inode(m->get_ino());
  mds_rank_t from = mds_rank_t(m->get_source().num());

  assert(in);
  assert(in->is_auth());

  dout(7) << "handle_inode_file_caps replica mds." << from << " wants caps " << ccap_string(m->get_caps()) << " on " << *in << dendl;

  if (m->get_caps())
    in->mds_caps_wanted[from] = m->get_caps();
  else
    in->mds_caps_wanted.erase(from);

  try_eval(in, CEPH_CAP_LOCKS);
  m->put();
}
2236
2237
/**
 * Waiter context: retry check_inode_max_size() with the same requested
 * max_size/size/mtime once the blocking condition (frozen inode, unstable
 * filelock) clears.  Holds a PIN_PTRWAITER ref on the inode.
 */
class C_MDL_CheckMaxSize : public LockerContext {
  CInode *in;
  uint64_t new_max_size;   // requested max_size hint (0 = just recompute)
  uint64_t newsize;        // new file size to record (0 = leave alone)
  utime_t mtime;           // mtime accompanying the new size

public:
  C_MDL_CheckMaxSize(Locker *l, CInode *i, uint64_t _new_max_size,
                     uint64_t _newsize, utime_t _mtime) :
    LockerContext(l), in(i),
    new_max_size(_new_max_size), newsize(_newsize), mtime(_mtime)
  {
    in->get(CInode::PIN_PTRWAITER);
  }
  void finish(int r) override {
    // we may have lost auth while waiting; only auth updates max_size
    if (in->is_auth())
      locker->check_inode_max_size(in, false, new_max_size, newsize, mtime);
    in->put(CInode::PIN_PTRWAITER);
  }
};
2258
2259 uint64_t Locker::calc_new_max_size(inode_t *pi, uint64_t size)
2260 {
2261 uint64_t new_max = (size + 1) << 1;
2262 uint64_t max_inc = g_conf->mds_client_writeable_range_max_inc_objs;
2263 if (max_inc > 0) {
2264 max_inc *= pi->get_layout_size_increment();
2265 new_max = MIN(new_max, size + max_inc);
2266 }
2267 return ROUND_UP_TO(new_max, pi->get_layout_size_increment());
2268 }
2269
/**
 * Recompute per-client writeable ranges for @p in assuming file size
 * @p size.  Clients that hold or want WR|BUFFER caps get a range
 * [0, calc_new_max_size(size)); all other clients are omitted from
 * *new_ranges (i.e. effectively shrunk to nothing).
 *
 * @param[out] new_ranges     the recomputed range map
 * @param[out] max_increased  set true if any client's range.last grew
 *                            (never cleared here; caller initializes)
 */
void Locker::calc_new_client_ranges(CInode *in, uint64_t size,
				    map<client_t,client_writeable_range_t> *new_ranges,
				    bool *max_increased)
{
  inode_t *latest = in->get_projected_inode();
  uint64_t ms;
  if(latest->has_layout()) {
    ms = calc_new_max_size(latest, size);
  } else {
    // Layout-less directories like ~mds0/, have zero size
    ms = 0;
  }

  // increase ranges as appropriate.
  // shrink to 0 if no WR|BUFFER caps issued.
  for (map<client_t,Capability*>::iterator p = in->client_caps.begin();
       p != in->client_caps.end();
       ++p) {
    if ((p->second->issued() | p->second->wanted()) & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER)) {
      client_writeable_range_t& nr = (*new_ranges)[p->first];
      nr.range.first = 0;
      if (latest->client_ranges.count(p->first)) {
	// existing writer: never shrink range.last here, only grow it
	client_writeable_range_t& oldr = latest->client_ranges[p->first];
	if (ms > oldr.range.last)
	  *max_increased = true;
	nr.range.last = MAX(ms, oldr.range.last);
	nr.follows = oldr.follows;
      } else {
	// new writer: fresh range starting at the current snap boundary
	*max_increased = true;
	nr.range.last = ms;
	nr.follows = in->first - 1;
      }
    }
  }
}
2305
/**
 * Update the file's size/mtime and/or the per-client writeable ranges
 * (max_size), project the inode, and journal the change (EOpen if the file
 * is still open, EUpdate otherwise).
 *
 * @param in            auth file inode
 * @param force_wrlock  caller guarantees the filelock is wrlockable
 * @param new_max_size  requested max_size hint (0 = just recompute)
 * @param new_size      new size to record (0 = leave size/mtime alone)
 * @param new_mtime     mtime accompanying new_size
 * @return true if an update was journaled; false if nothing changed or we
 *         had to defer (a waiter is registered to retry in that case)
 */
bool Locker::check_inode_max_size(CInode *in, bool force_wrlock,
				  uint64_t new_max_size, uint64_t new_size,
				  utime_t new_mtime)
{
  assert(in->is_auth());
  assert(in->is_file());

  inode_t *latest = in->get_projected_inode();
  map<client_t, client_writeable_range_t> new_ranges;
  uint64_t size = latest->size;
  bool update_size = new_size > 0;
  bool update_max = false;
  bool max_increased = false;

  if (update_size) {
    // sizes/mtimes only ever move forward
    new_size = size = MAX(size, new_size);
    new_mtime = MAX(new_mtime, latest->mtime);
    if (latest->size == new_size && latest->mtime == new_mtime)
      update_size = false;
  }

  calc_new_client_ranges(in, max(new_max_size, size), &new_ranges, &max_increased);

  if (max_increased || latest->client_ranges != new_ranges)
    update_max = true;

  if (!update_size && !update_max) {
    dout(20) << "check_inode_max_size no-op on " << *in << dendl;
    return false;
  }

  dout(10) << "check_inode_max_size new_ranges " << new_ranges
	   << " update_size " << update_size
	   << " on " << *in << dendl;

  if (in->is_frozen()) {
    // retry after unfreeze
    dout(10) << "check_inode_max_size frozen, waiting on " << *in << dendl;
    C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in,
                                                     new_max_size,
                                                     new_size,
                                                     new_mtime);
    in->add_waiter(CInode::WAIT_UNFREEZE, cms);
    return false;
  }
  if (!force_wrlock && !in->filelock.can_wrlock(in->get_loner())) {
    // lock?
    if (in->filelock.is_stable()) {
      if (in->get_target_loner() >= 0)
	file_excl(&in->filelock);
      else
	simple_lock(&in->filelock);
    }
    if (!in->filelock.can_wrlock(in->get_loner())) {
      // try again later
      C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in,
                                                       new_max_size,
                                                       new_size,
                                                       new_mtime);

      in->filelock.add_waiter(SimpleLock::WAIT_STABLE, cms);
      dout(10) << "check_inode_max_size can't wrlock, waiting on " << *in << dendl;
      return false;
    }
  }

  MutationRef mut(new MutationImpl());
  mut->ls = mds->mdlog->get_current_segment();

  inode_t *pi = in->project_inode();
  pi->version = in->pre_dirty();

  if (update_max) {
    dout(10) << "check_inode_max_size client_ranges " << pi->client_ranges << " -> " << new_ranges << dendl;
    pi->client_ranges = new_ranges;
  }

  if (update_size) {
    dout(10) << "check_inode_max_size size " << pi->size << " -> " << new_size << dendl;
    pi->size = new_size;
    pi->rstat.rbytes = new_size;
    dout(10) << "check_inode_max_size mtime " << pi->mtime << " -> " << new_mtime << dendl;
    pi->mtime = new_mtime;
  }

  // use EOpen if the file is still open; otherwise, use EUpdate.
  // this is just an optimization to push open files forward into
  // newer log segments.
  LogEvent *le;
  EMetaBlob *metablob;
  if (in->is_any_caps_wanted() && in->last == CEPH_NOSNAP) {
    EOpen *eo = new EOpen(mds->mdlog);
    eo->add_ino(in->ino());
    metablob = &eo->metablob;
    le = eo;
    mut->ls->open_files.push_back(&in->item_open_file);
  } else {
    EUpdate *eu = new EUpdate(mds->mdlog, "check_inode_max_size");
    metablob = &eu->metablob;
    le = eu;
  }
  mds->mdlog->start_entry(le);
  if (update_size) { // FIXME if/when we do max_size nested accounting
    mdcache->predirty_journal_parents(mut, metablob, in, 0, PREDIRTY_PRIMARY);
    // no cow, here!
    CDentry *parent = in->get_projected_parent_dn();
    metablob->add_primary_dentry(parent, in, true);
  } else {
    metablob->add_dir_context(in->get_projected_parent_dn()->get_dir());
    mdcache->journal_dirty_inode(mut.get(), metablob, in);
  }
  mds->mdlog->submit_entry(le,
          new C_Locker_FileUpdate_finish(this, in, mut, true));
  wrlock_force(&in->filelock, mut); // wrlock for duration of journal
  mut->auth_pin(in);

  // make max_size _increase_ timely
  if (max_increased)
    mds->mdlog->flush();

  return true;
}
2427
2428
/**
 * Proactively send the current max_size (via a GRANT message) to clients
 * with pending WR|BUFFER caps, so they can keep writing without having to
 * ask.  Bumps each recipient cap's last_seq.
 *
 * @param only_cap  if non-NULL, notify only this single cap's client
 */
void Locker::share_inode_max_size(CInode *in, Capability *only_cap)
{
  /*
   * only share if currently issued a WR cap. if client doesn't have it,
   * file_max doesn't matter, and the client will get it if/when they get
   * the cap later.
   */
  dout(10) << "share_inode_max_size on " << *in << dendl;
  map<client_t, Capability*>::iterator it;
  if (only_cap)
    it = in->client_caps.find(only_cap->get_client());
  else
    it = in->client_caps.begin();
  for (; it != in->client_caps.end(); ++it) {
    const client_t client = it->first;
    Capability *cap = it->second;
    if (cap->is_suppress())
      continue;
    if (cap->pending() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER)) {
      dout(10) << "share_inode_max_size with client." << client << dendl;
      cap->inc_last_seq();
      MClientCaps *m = new MClientCaps(CEPH_CAP_OP_GRANT,
				       in->ino(),
				       in->find_snaprealm()->inode->ino(),
				       cap->get_cap_id(), cap->get_last_seq(),
				       cap->pending(), cap->wanted(), 0,
				       cap->get_mseq(),
				       mds->get_osd_epoch_barrier());
      in->encode_cap_message(m, cap);
      mds->send_message_client_counted(m, client);
    }
    if (only_cap)
      break;
  }
}
2464
2465 bool Locker::_need_flush_mdlog(CInode *in, int wanted)
2466 {
2467 /* flush log if caps are wanted by client but corresponding lock is unstable and locked by
2468 * pending mutations. */
2469 if (((wanted & (CEPH_CAP_FILE_RD|CEPH_CAP_FILE_WR|CEPH_CAP_FILE_SHARED|CEPH_CAP_FILE_EXCL)) &&
2470 in->filelock.is_unstable_and_locked()) ||
2471 ((wanted & (CEPH_CAP_AUTH_SHARED|CEPH_CAP_AUTH_EXCL)) &&
2472 in->authlock.is_unstable_and_locked()) ||
2473 ((wanted & (CEPH_CAP_LINK_SHARED|CEPH_CAP_LINK_EXCL)) &&
2474 in->linklock.is_unstable_and_locked()) ||
2475 ((wanted & (CEPH_CAP_XATTR_SHARED|CEPH_CAP_XATTR_EXCL)) &&
2476 in->xattrlock.is_unstable_and_locked()))
2477 return true;
2478 return false;
2479 }
2480
/**
 * Update the wanted caps recorded on @p cap from a client message.
 *
 * The new wanted set is taken verbatim only when the client's issue_seq
 * matches our last issue (i.e. the client has seen our latest grant).  On
 * a seq mismatch, wanted bits may only be *added*, never dropped, so a
 * racing grant can't lose wants.  Also maintains the open-file list (with
 * a journaled EOpen) and prioritizes file recovery for newly wanted files.
 */
void Locker::adjust_cap_wanted(Capability *cap, int wanted, int issue_seq)
{
  if (ceph_seq_cmp(issue_seq, cap->get_last_issue()) == 0) {
    dout(10) << " wanted " << ccap_string(cap->wanted())
	     << " -> " << ccap_string(wanted) << dendl;
    cap->set_wanted(wanted);
  } else if (wanted & ~cap->wanted()) {
    dout(10) << " wanted " << ccap_string(cap->wanted())
	     << " -> " << ccap_string(wanted)
	     << " (added caps even though we had seq mismatch!)" << dendl;
    cap->set_wanted(wanted | cap->wanted());
  } else {
    // seq mismatch and nothing new wanted: ignore the (stale) update
    dout(10) << " NOT changing wanted " << ccap_string(cap->wanted())
	     << " -> " << ccap_string(wanted)
	     << " (issue_seq " << issue_seq << " != last_issue "
	     << cap->get_last_issue() << ")" << dendl;
    return;
  }

  CInode *cur = cap->get_inode();
  if (!cur->is_auth()) {
    // replica: forward the (possibly changed) wants to the auth mds
    request_inode_file_caps(cur);
    return;
  }

  if (cap->wanted() == 0) {
    if (cur->item_open_file.is_on_list() &&
	!cur->is_any_caps_wanted()) {
      dout(10) << " removing unwanted file from open file list " << *cur << dendl;
      cur->item_open_file.remove_myself();
    }
  } else {
    if (cur->state_test(CInode::STATE_RECOVERING) &&
	(cap->wanted() & (CEPH_CAP_FILE_RD |
			  CEPH_CAP_FILE_WR))) {
      mds->mdcache->recovery_queue.prioritize(cur);
    }

    if (!cur->item_open_file.is_on_list()) {
      // journal an EOpen so replay knows this file is open
      dout(10) << " adding to open file list " << *cur << dendl;
      assert(cur->last == CEPH_NOSNAP);
      LogSegment *ls = mds->mdlog->get_current_segment();
      EOpen *le = new EOpen(mds->mdlog);
      mds->mdlog->start_entry(le);
      le->add_clean_inode(cur);
      ls->open_files.push_back(&cur->item_open_file);
      mds->mdlog->submit_entry(le);
    }
  }
}
2531
2532
2533
/**
 * Synthesize empty (NULL) snapflushes, in snapid order, for every
 * snapid < @p last that this client still owes a FLUSHSNAP for.  Used
 * when we can infer the client has nothing to flush (it sent a later
 * flush, or it released all its WR/EXCL caps).
 */
void Locker::_do_null_snapflush(CInode *head_in, client_t client, snapid_t last)
{
  dout(10) << "_do_null_snapflush client." << client << " on " << *head_in << dendl;
  for (auto p = head_in->client_need_snapflush.begin();
       p != head_in->client_need_snapflush.end() && p->first < last; ) {
    snapid_t snapid = p->first;
    set<client_t>& clients = p->second;
    ++p; // be careful, q loop below depends on this

    if (clients.count(client)) {
      dout(10) << " doing async NULL snapflush on " << snapid << " from client." << client << dendl;
      CInode *sin = mdcache->get_inode(head_in->ino(), snapid);
      if (!sin) {
	// hrm, look forward until we find the inode.
	// (we can only look it up by the last snapid it is valid for)
	dout(10) << " didn't have " << head_in->ino() << " snapid " << snapid << dendl;
	for (compact_map<snapid_t, set<client_t> >::iterator q = p; // p is already at next entry
	     q != head_in->client_need_snapflush.end();
	     ++q) {
	  dout(10) << " trying snapid " << q->first << dendl;
	  sin = mdcache->get_inode(head_in->ino(), q->first);
	  if (sin) {
	    assert(sin->first <= snapid);
	    break;
	  }
	  dout(10) << " didn't have " << head_in->ino() << " snapid " << q->first << dendl;
	}
	// fall back to the head inode for multiversion inodes
	if (!sin && head_in->is_multiversion())
	  sin = head_in;
	assert(sin);
      }
      // dirty=0: this is the "nothing was actually dirty" flush
      _do_snap_update(sin, snapid, 0, sin->first - 1, client, NULL, NULL);
      head_in->remove_need_snapflush(sin, snapid, client);
    }
  }
}
2570
2571
2572 bool Locker::should_defer_client_cap_frozen(CInode *in)
2573 {
2574 /*
2575 * This policy needs to be AT LEAST as permissive as allowing a client request
2576 * to go forward, or else a client request can release something, the release
2577 * gets deferred, but the request gets processed and deadlocks because when the
2578 * caps can't get revoked.
2579 *
2580 * Currently, a request wait if anything locked is freezing (can't
2581 * auth_pin), which would avoid any deadlock with cap release. Thus @in
2582 * _MUST_ be in the lock/auth_pin set.
2583 *
2584 * auth_pins==0 implies no unstable lock and not auth pinnned by
2585 * client request, otherwise continue even it's freezing.
2586 */
2587 return (in->is_freezing() && in->get_num_auth_pins() == 0) || in->is_frozen();
2588 }
2589
2590 /*
2591 * This function DOES put the passed message before returning
2592 */
2593 void Locker::handle_client_caps(MClientCaps *m)
2594 {
2595 Session *session = static_cast<Session *>(m->get_connection()->get_priv());
2596 client_t client = m->get_source().num();
2597
2598 snapid_t follows = m->get_snap_follows();
2599 dout(7) << "handle_client_caps "
2600 << ((m->flags & CLIENT_CAPS_SYNC) ? "sync" : "async")
2601 << " on " << m->get_ino()
2602 << " tid " << m->get_client_tid() << " follows " << follows
2603 << " op " << ceph_cap_op_name(m->get_op()) << dendl;
2604
2605 if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) {
2606 if (!session) {
2607 dout(5) << " no session, dropping " << *m << dendl;
2608 m->put();
2609 return;
2610 }
2611 if (session->is_closed() ||
2612 session->is_closing() ||
2613 session->is_killing()) {
2614 dout(7) << " session closed|closing|killing, dropping " << *m << dendl;
2615 m->put();
2616 return;
2617 }
2618 if (mds->is_reconnect() &&
2619 m->get_dirty() && m->get_client_tid() > 0 &&
2620 !session->have_completed_flush(m->get_client_tid())) {
2621 mdcache->set_reconnected_dirty_caps(client, m->get_ino(), m->get_dirty());
2622 }
2623 mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
2624 return;
2625 }
2626
2627 if (m->get_client_tid() > 0 && session &&
2628 session->have_completed_flush(m->get_client_tid())) {
2629 dout(7) << "handle_client_caps already flushed tid " << m->get_client_tid()
2630 << " for client." << client << dendl;
2631 MClientCaps *ack;
2632 if (m->get_op() == CEPH_CAP_OP_FLUSHSNAP) {
2633 ack = new MClientCaps(CEPH_CAP_OP_FLUSHSNAP_ACK, m->get_ino(), 0, 0, 0, 0, 0,
2634 m->get_dirty(), 0, mds->get_osd_epoch_barrier());
2635 } else {
2636 ack = new MClientCaps(CEPH_CAP_OP_FLUSH_ACK, m->get_ino(), 0, m->get_cap_id(),
2637 m->get_seq(), m->get_caps(), 0, m->get_dirty(), 0,
2638 mds->get_osd_epoch_barrier());
2639 }
2640 ack->set_snap_follows(follows);
2641 ack->set_client_tid(m->get_client_tid());
2642 mds->send_message_client_counted(ack, m->get_connection());
2643 if (m->get_op() == CEPH_CAP_OP_FLUSHSNAP) {
2644 m->put();
2645 return;
2646 } else {
2647 // fall-thru because the message may release some caps
2648 m->clear_dirty();
2649 m->set_op(CEPH_CAP_OP_UPDATE);
2650 }
2651 }
2652
2653 // "oldest flush tid" > 0 means client uses unique TID for each flush
2654 if (m->get_oldest_flush_tid() > 0 && session) {
2655 if (session->trim_completed_flushes(m->get_oldest_flush_tid())) {
2656 mds->mdlog->get_current_segment()->touched_sessions.insert(session->info.inst.name);
2657
2658 if (session->get_num_trim_flushes_warnings() > 0 &&
2659 session->get_num_completed_flushes() * 2 < g_conf->mds_max_completed_flushes)
2660 session->reset_num_trim_flushes_warnings();
2661 } else {
2662 if (session->get_num_completed_flushes() >=
2663 (g_conf->mds_max_completed_flushes << session->get_num_trim_flushes_warnings())) {
2664 session->inc_num_trim_flushes_warnings();
2665 stringstream ss;
2666 ss << "client." << session->get_client() << " does not advance its oldest_flush_tid ("
2667 << m->get_oldest_flush_tid() << "), "
2668 << session->get_num_completed_flushes()
2669 << " completed flushes recorded in session";
2670 mds->clog->warn() << ss.str();
2671 dout(20) << __func__ << " " << ss.str() << dendl;
2672 }
2673 }
2674 }
2675
2676 CInode *head_in = mdcache->get_inode(m->get_ino());
2677 if (!head_in) {
2678 if (mds->is_clientreplay()) {
2679 dout(7) << "handle_client_caps on unknown ino " << m->get_ino()
2680 << ", will try again after replayed client requests" << dendl;
2681 mdcache->wait_replay_cap_reconnect(m->get_ino(), new C_MDS_RetryMessage(mds, m));
2682 return;
2683 }
2684 dout(1) << "handle_client_caps on unknown ino " << m->get_ino() << ", dropping" << dendl;
2685 m->put();
2686 return;
2687 }
2688
2689 if (m->osd_epoch_barrier && !mds->objecter->have_map(m->osd_epoch_barrier)) {
2690 // Pause RADOS operations until we see the required epoch
2691 mds->objecter->set_epoch_barrier(m->osd_epoch_barrier);
2692 }
2693
2694 if (mds->get_osd_epoch_barrier() < m->osd_epoch_barrier) {
2695 // Record the barrier so that we will retransmit it to clients
2696 mds->set_osd_epoch_barrier(m->osd_epoch_barrier);
2697 }
2698
2699 CInode *in = head_in;
2700 if (follows > 0) {
2701 in = mdcache->pick_inode_snap(head_in, follows);
2702 if (in != head_in)
2703 dout(10) << " head inode " << *head_in << dendl;
2704 }
2705 dout(10) << " cap inode " << *in << dendl;
2706
2707 Capability *cap = 0;
2708 cap = in->get_client_cap(client);
2709 if (!cap && in != head_in)
2710 cap = head_in->get_client_cap(client);
2711 if (!cap) {
2712 dout(7) << "handle_client_caps no cap for client." << client << " on " << *in << dendl;
2713 m->put();
2714 return;
2715 }
2716 assert(cap);
2717
2718 // freezing|frozen?
2719 if (should_defer_client_cap_frozen(in)) {
2720 dout(7) << "handle_client_caps freezing|frozen on " << *in << dendl;
2721 in->add_waiter(CInode::WAIT_UNFREEZE, new C_MDS_RetryMessage(mds, m));
2722 return;
2723 }
2724 if (ceph_seq_cmp(m->get_mseq(), cap->get_mseq()) < 0) {
2725 dout(7) << "handle_client_caps mseq " << m->get_mseq() << " < " << cap->get_mseq()
2726 << ", dropping" << dendl;
2727 m->put();
2728 return;
2729 }
2730
2731 int op = m->get_op();
2732
2733 // flushsnap?
2734 if (op == CEPH_CAP_OP_FLUSHSNAP) {
2735 if (!in->is_auth()) {
2736 dout(7) << " not auth, ignoring flushsnap on " << *in << dendl;
2737 goto out;
2738 }
2739
2740 SnapRealm *realm = in->find_snaprealm();
2741 snapid_t snap = realm->get_snap_following(follows);
2742 dout(10) << " flushsnap follows " << follows << " -> snap " << snap << dendl;
2743
2744 // we can prepare the ack now, since this FLUSHEDSNAP is independent of any
2745 // other cap ops. (except possibly duplicate FLUSHSNAP requests, but worst
2746 // case we get a dup response, so whatever.)
2747 MClientCaps *ack = 0;
2748 if (m->get_dirty()) {
2749 ack = new MClientCaps(CEPH_CAP_OP_FLUSHSNAP_ACK, in->ino(), 0, 0, 0, 0, 0, m->get_dirty(), 0, mds->get_osd_epoch_barrier());
2750 ack->set_snap_follows(follows);
2751 ack->set_client_tid(m->get_client_tid());
2752 ack->set_oldest_flush_tid(m->get_oldest_flush_tid());
2753 }
2754
2755 if (in == head_in ||
2756 (head_in->client_need_snapflush.count(snap) &&
2757 head_in->client_need_snapflush[snap].count(client))) {
2758 dout(7) << " flushsnap snap " << snap
2759 << " client." << client << " on " << *in << dendl;
2760
2761 // this cap now follows a later snap (i.e. the one initiating this flush, or later)
2762 if (in == head_in)
2763 cap->client_follows = snap < CEPH_NOSNAP ? snap : realm->get_newest_seq();
2764 else if (head_in->client_need_snapflush.begin()->first < snap)
2765 _do_null_snapflush(head_in, client, snap);
2766
2767 _do_snap_update(in, snap, m->get_dirty(), follows, client, m, ack);
2768
2769 if (in != head_in)
2770 head_in->remove_need_snapflush(in, snap, client);
2771
2772 } else {
2773 dout(7) << " not expecting flushsnap " << snap << " from client." << client << " on " << *in << dendl;
2774 if (ack)
2775 mds->send_message_client_counted(ack, m->get_connection());
2776 }
2777 goto out;
2778 }
2779
2780 if (cap->get_cap_id() != m->get_cap_id()) {
2781 dout(7) << " ignoring client capid " << m->get_cap_id() << " != my " << cap->get_cap_id() << dendl;
2782 } else {
2783 // intermediate snap inodes
2784 while (in != head_in) {
2785 assert(in->last != CEPH_NOSNAP);
2786 if (in->is_auth() && m->get_dirty()) {
2787 dout(10) << " updating intermediate snapped inode " << *in << dendl;
2788 _do_cap_update(in, NULL, m->get_dirty(), follows, m);
2789 }
2790 in = mdcache->pick_inode_snap(head_in, in->last);
2791 }
2792
2793 // head inode, and cap
2794 MClientCaps *ack = 0;
2795
2796 int caps = m->get_caps();
2797 if (caps & ~cap->issued()) {
2798 dout(10) << " confirming not issued caps " << ccap_string(caps & ~cap->issued()) << dendl;
2799 caps &= cap->issued();
2800 }
2801
2802 cap->confirm_receipt(m->get_seq(), caps);
2803 dout(10) << " follows " << follows
2804 << " retains " << ccap_string(m->get_caps())
2805 << " dirty " << ccap_string(m->get_dirty())
2806 << " on " << *in << dendl;
2807
2808
2809 // missing/skipped snapflush?
2810 // The client MAY send a snapflush if it is issued WR/EXCL caps, but
2811 // presently only does so when it has actual dirty metadata. But, we
2812 // set up the need_snapflush stuff based on the issued caps.
2813 // We can infer that the client WONT send a FLUSHSNAP once they have
2814 // released all WR/EXCL caps (the FLUSHSNAP always comes before the cap
2815 // update/release).
2816 if (!head_in->client_need_snapflush.empty()) {
2817 if ((cap->issued() & CEPH_CAP_ANY_FILE_WR) == 0) {
2818 _do_null_snapflush(head_in, client);
2819 } else {
2820 dout(10) << " revocation in progress, not making any conclusions about null snapflushes" << dendl;
2821 }
2822 }
2823
2824 if (m->get_dirty() && in->is_auth()) {
2825 dout(7) << " flush client." << client << " dirty " << ccap_string(m->get_dirty())
2826 << " seq " << m->get_seq() << " on " << *in << dendl;
2827 ack = new MClientCaps(CEPH_CAP_OP_FLUSH_ACK, in->ino(), 0, cap->get_cap_id(), m->get_seq(),
2828 m->get_caps(), 0, m->get_dirty(), 0, mds->get_osd_epoch_barrier());
2829 ack->set_client_tid(m->get_client_tid());
2830 ack->set_oldest_flush_tid(m->get_oldest_flush_tid());
2831 }
2832
2833 // filter wanted based on what we could ever give out (given auth/replica status)
2834 bool need_flush = m->flags & CLIENT_CAPS_SYNC;
2835 int new_wanted = m->get_wanted() & head_in->get_caps_allowed_ever();
2836 if (new_wanted != cap->wanted()) {
2837 if (!need_flush && (new_wanted & ~cap->pending())) {
2838 // exapnding caps. make sure we aren't waiting for a log flush
2839 need_flush = _need_flush_mdlog(head_in, new_wanted & ~cap->pending());
2840 }
2841
2842 adjust_cap_wanted(cap, new_wanted, m->get_issue_seq());
2843 }
2844
2845 if (in->is_auth() &&
2846 _do_cap_update(in, cap, m->get_dirty(), follows, m, ack, &need_flush)) {
2847 // updated
2848 eval(in, CEPH_CAP_LOCKS);
2849
2850 if (!need_flush && (cap->wanted() & ~cap->pending()))
2851 need_flush = _need_flush_mdlog(in, cap->wanted() & ~cap->pending());
2852 } else {
2853 // no update, ack now.
2854 if (ack)
2855 mds->send_message_client_counted(ack, m->get_connection());
2856
2857 bool did_issue = eval(in, CEPH_CAP_LOCKS);
2858 if (!did_issue && (cap->wanted() & ~cap->pending()))
2859 issue_caps(in, cap);
2860
2861 if (cap->get_last_seq() == 0 &&
2862 (cap->pending() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER))) {
2863 cap->issue_norevoke(cap->issued());
2864 share_inode_max_size(in, cap);
2865 }
2866 }
2867
2868 if (need_flush)
2869 mds->mdlog->flush();
2870 }
2871
2872 out:
2873 m->put();
2874 }
2875
2876
/**
 * Waiter context: replay a deferred cap-release item once the blocking
 * condition clears.  Re-runs process_request_cap_release() with a null
 * mdr and an empty dentry name (the lease, if any, was handled on the
 * first pass).
 */
class C_Locker_RetryRequestCapRelease : public LockerContext {
  client_t client;
  ceph_mds_request_release item;
public:
  C_Locker_RetryRequestCapRelease(Locker *l, client_t c, const ceph_mds_request_release& it) :
    LockerContext(l), client(c), item(it) { }
  void finish(int r) override {
    string dname;
    MDRequestRef null_ref;
    locker->process_request_cap_release(null_ref, client, item, dname);
  }
};
2889
/**
 * Apply a cap release embedded in a client request.
 *
 * @param mdr    the active request, or a null ref when replayed from
 *               C_Locker_RetryRequestCapRelease (deferred path)
 * @param client the releasing client
 * @param item   wire-format release (ino, cap_id, caps, wanted, seqs)
 * @param dname  dentry name whose lease should also be dropped; may be empty
 *
 * Stale releases (old mseq or mismatched cap_id) are silently dropped.
 * If the inode is frozen, the release is re-queued until unfreeze.
 */
void Locker::process_request_cap_release(MDRequestRef& mdr, client_t client, const ceph_mds_request_release& item,
					 const string &dname)
{
  inodeno_t ino = (uint64_t)item.ino;
  uint64_t cap_id = item.cap_id;
  int caps = item.caps;
  int wanted = item.wanted;
  int seq = item.seq;
  int issue_seq = item.issue_seq;
  int mseq = item.mseq;

  CInode *in = mdcache->get_inode(ino);
  if (!in)
    return;

  // drop the dentry lease first, if one was named
  if (dname.length()) {
    frag_t fg = in->pick_dirfrag(dname);
    CDir *dir = in->get_dirfrag(fg);
    if (dir) {
      CDentry *dn = dir->lookup(dname);
      if (dn) {
	ClientLease *l = dn->get_client_lease(client);
	if (l) {
	  dout(10) << "process_cap_release removing lease on " << *dn << dendl;
	  dn->remove_client_lease(l, this);
	} else {
	  dout(7) << "process_cap_release client." << client
		  << " doesn't have lease on " << *dn << dendl;
	}
      } else {
	dout(7) << "process_cap_release client." << client << " released lease on dn "
		<< dir->dirfrag() << "/" << dname << " which dne" << dendl;
      }
    }
  }

  Capability *cap = in->get_client_cap(client);
  if (!cap)
    return;

  dout(10) << "process_cap_release client." << client << " " << ccap_string(caps) << " on " << *in
	   << (mdr ? "" : " (DEFERRED, no mdr)")
	   << dendl;

  // stale migration seq: a newer export/import superseded this release
  if (ceph_seq_cmp(mseq, cap->get_mseq()) < 0) {
    dout(7) << " mseq " << mseq << " < " << cap->get_mseq() << ", dropping" << dendl;
    return;
  }

  // release refers to a cap instance we no longer hold
  if (cap->get_cap_id() != cap_id) {
    dout(7) << " cap_id " << cap_id << " != " << cap->get_cap_id() << ", dropping" << dendl;
    return;
  }

  // can't mutate cap state while (un)freezing; retry after unfreeze
  if (should_defer_client_cap_frozen(in)) {
    dout(7) << " frozen, deferring" << dendl;
    in->add_waiter(CInode::WAIT_UNFREEZE, new C_Locker_RetryRequestCapRelease(this, client, item));
    return;
  }

  // never confirm caps we did not actually issue
  if (caps & ~cap->issued()) {
    dout(10) << " confirming not issued caps " << ccap_string(caps & ~cap->issued()) << dendl;
    caps &= cap->issued();
  }
  cap->confirm_receipt(seq, caps);

  // once the client holds no file-write caps, any pending snapflush for it
  // will never arrive; synthesize null snapflushes
  if (!in->client_need_snapflush.empty() &&
      (cap->issued() & CEPH_CAP_ANY_FILE_WR) == 0) {
    _do_null_snapflush(in, client);
  }

  adjust_cap_wanted(cap, wanted, issue_seq);

  // suppress reissue during eval when handled inside a request; the request
  // reply will carry the caps instead
  if (mdr)
    cap->inc_suppress();
  eval(in, CEPH_CAP_LOCKS);
  if (mdr)
    cap->dec_suppress();

  // take note; we may need to reissue on this cap later
  if (mdr)
    mdr->cap_releases[in->vino()] = cap->get_last_seq();
}
2973
2974 class C_Locker_RetryKickIssueCaps : public LockerContext {
2975 CInode *in;
2976 client_t client;
2977 ceph_seq_t seq;
2978 public:
2979 C_Locker_RetryKickIssueCaps(Locker *l, CInode *i, client_t c, ceph_seq_t s) :
2980 LockerContext(l), in(i), client(c), seq(s) {
2981 in->get(CInode::PIN_PTRWAITER);
2982 }
2983 void finish(int r) override {
2984 locker->kick_issue_caps(in, client, seq);
2985 in->put(CInode::PIN_PTRWAITER);
2986 }
2987 };
2988
2989 void Locker::kick_issue_caps(CInode *in, client_t client, ceph_seq_t seq)
2990 {
2991 Capability *cap = in->get_client_cap(client);
2992 if (!cap || cap->get_last_sent() != seq)
2993 return;
2994 if (in->is_frozen()) {
2995 dout(10) << "kick_issue_caps waiting for unfreeze on " << *in << dendl;
2996 in->add_waiter(CInode::WAIT_UNFREEZE,
2997 new C_Locker_RetryKickIssueCaps(this, in, client, seq));
2998 return;
2999 }
3000 dout(10) << "kick_issue_caps released at current seq " << seq
3001 << ", reissuing" << dendl;
3002 issue_caps(in, cap);
3003 }
3004
3005 void Locker::kick_cap_releases(MDRequestRef& mdr)
3006 {
3007 client_t client = mdr->get_client();
3008 for (map<vinodeno_t,ceph_seq_t>::iterator p = mdr->cap_releases.begin();
3009 p != mdr->cap_releases.end();
3010 ++p) {
3011 CInode *in = mdcache->get_inode(p->first);
3012 if (!in)
3013 continue;
3014 kick_issue_caps(in, client, p->second);
3015 }
3016 }
3017
3018 /**
3019 * m and ack might be NULL, so don't dereference them unless dirty != 0
3020 */
/**
 * Journal a snapped cap flush (FLUSHSNAP): apply the client's dirty metadata
 * to the snapshotted inode state and log an EUpdate.
 *
 * @param in      inode being flushed
 * @param snap    snapid the flush applies to (CEPH_NOSNAP = snap deleted)
 * @param dirty   dirty cap bits from the client
 * @param follows snapid the flush follows
 * @param client  flushing client
 * @param m       the MClientCaps flush message (must be valid if dirty != 0)
 * @param ack     ack to send once journaled; may be NULL
 */
void Locker::_do_snap_update(CInode *in, snapid_t snap, int dirty, snapid_t follows, client_t client, MClientCaps *m, MClientCaps *ack)
{
  dout(10) << "_do_snap_update dirty " << ccap_string(dirty)
	   << " follows " << follows << " snap " << snap
	   << " on " << *in << dendl;

  if (snap == CEPH_NOSNAP) {
    // hmm, i guess snap was already deleted? just ack!
    dout(10) << " wow, the snap following " << follows
	     << " was already deleted. nothing to record, just ack." << dendl;
    if (ack)
      mds->send_message_client_counted(ack, m->get_connection());
    return;
  }

  EUpdate *le = new EUpdate(mds->mdlog, "snap flush");
  mds->mdlog->start_entry(le);
  MutationRef mut = new MutationImpl();
  mut->ls = mds->mdlog->get_current_segment();

  // normal metadata updates that we can apply to the head as well.

  // update xattrs?
  bool xattrs = false;
  map<string,bufferptr> *px = 0;
  if ((dirty & CEPH_CAP_XATTR_EXCL) &&
      m->xattrbl.length() &&
      m->head.xattr_version > in->get_projected_inode()->xattr_version)
    xattrs = true;

  // multiversion inodes keep per-snap old_inode copies; pick the one
  // covering this snapid, if any
  old_inode_t *oi = 0;
  if (in->is_multiversion()) {
    oi = in->pick_old_inode(snap);
  }

  inode_t *pi;
  if (oi) {
    dout(10) << " writing into old inode" << dendl;
    // project/pre_dirty the head so the dirty version is journaled, but the
    // actual field updates below land in the old_inode copy for this snap
    pi = in->project_inode();
    pi->version = in->pre_dirty();
    if (snap > oi->first)
      in->split_old_inode(snap);
    pi = &oi->inode;
    if (xattrs)
      px = &oi->xattrs;
  } else {
    if (xattrs)
      px = new map<string,bufferptr>;
    pi = in->project_inode(px);
    pi->version = in->pre_dirty();
  }

  _update_cap_fields(in, dirty, m, pi);

  // xattr
  if (px) {
    dout(7) << " xattrs v" << pi->xattr_version << " -> " << m->head.xattr_version
	    << " len " << m->xattrbl.length() << dendl;
    pi->xattr_version = m->head.xattr_version;
    bufferlist::iterator p = m->xattrbl.begin();
    ::decode(*px, p);
  }

  // trim or drop this client's max_size range now that the snap is flushed
  if (pi->client_ranges.count(client)) {
    if (in->last == snap) {
      dout(10) << " removing client_range entirely" << dendl;
      pi->client_ranges.erase(client);
    } else {
      dout(10) << " client_range now follows " << snap << dendl;
      pi->client_ranges[client].follows = snap;
    }
  }

  mut->auth_pin(in);
  mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows);
  mdcache->journal_dirty_inode(mut.get(), &le->metablob, in, follows);

  // "oldest flush tid" > 0 means client uses unique TID for each flush
  if (ack && ack->get_oldest_flush_tid() > 0)
    le->metablob.add_client_flush(metareqid_t(m->get_source(), ack->get_client_tid()),
				  ack->get_oldest_flush_tid());

  mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut, false, false,
							      client, ack));
}
3106
/**
 * Copy dirty metadata fields from a client cap-flush message into a
 * (projected) inode.  Which fields are trusted depends on which cap bits
 * the client holds dirty:
 *  - FILE_WR:   mtime (forward only), size (grow only), inline data
 *  - FILE_EXCL: mtime/atime (any direction), time_warp_seq
 *  - AUTH_EXCL: uid/gid/mode/btime
 * ctime and change_attr only ever move forward.
 *
 * No-op when dirty == 0; m must be non-NULL otherwise.
 */
void Locker::_update_cap_fields(CInode *in, int dirty, MClientCaps *m, inode_t *pi)
{
  if (dirty == 0)
    return;

  /* m must be valid if there are dirty caps */
  assert(m);
  uint64_t features = m->get_connection()->get_features();

  // ctime only moves forward
  if (m->get_ctime() > pi->ctime) {
    dout(7) << "  ctime " << pi->ctime << " -> " << m->get_ctime()
	    << " for " << *in << dendl;
    pi->ctime = m->get_ctime();
  }

  // change_attr only moves forward, and only for clients that support it
  if ((features & CEPH_FEATURE_FS_CHANGE_ATTR) &&
      m->get_change_attr() > pi->change_attr) {
    dout(7) << "  change_attr " << pi->change_attr << " -> " << m->get_change_attr()
	    << " for " << *in << dendl;
    pi->change_attr = m->get_change_attr();
  }

  // file
  if (dirty & (CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)) {
    utime_t atime = m->get_atime();
    utime_t mtime = m->get_mtime();
    uint64_t size = m->get_size();
    version_t inline_version = m->inline_version;

    // WR may only advance mtime; EXCL may set it to anything
    if (((dirty & CEPH_CAP_FILE_WR) && mtime > pi->mtime) ||
	((dirty & CEPH_CAP_FILE_EXCL) && mtime != pi->mtime)) {
      dout(7) << "  mtime " << pi->mtime << " -> " << mtime
	      << " for " << *in << dendl;
      pi->mtime = mtime;
    }
    if (in->inode.is_file() &&   // ONLY if regular file
	size > pi->size) {
      dout(7) << "  size " << pi->size << " -> " << size
	      << " for " << *in << dendl;
      pi->size = size;
      pi->rstat.rbytes = size;
    }
    if (in->inode.is_file() &&
        (dirty & CEPH_CAP_FILE_WR) &&
        inline_version > pi->inline_data.version) {
      pi->inline_data.version = inline_version;
      if (inline_version != CEPH_INLINE_NONE && m->inline_data.length() > 0)
	pi->inline_data.get_data() = m->inline_data;
      else
	pi->inline_data.free_data();
    }
    if ((dirty & CEPH_CAP_FILE_EXCL) && atime != pi->atime) {
      dout(7) << "  atime " << pi->atime << " -> " << atime
	      << " for " << *in << dendl;
      pi->atime = atime;
    }
    // time_warp_seq is a seq counter; only accept a strictly newer value
    if ((dirty & CEPH_CAP_FILE_EXCL) &&
	ceph_seq_cmp(pi->time_warp_seq, m->get_time_warp_seq()) < 0) {
      dout(7) << "  time_warp_seq " << pi->time_warp_seq << " -> " << m->get_time_warp_seq()
	      << " for " << *in << dendl;
      pi->time_warp_seq = m->get_time_warp_seq();
    }
  }
  // auth
  if (dirty & CEPH_CAP_AUTH_EXCL) {
    if (m->head.uid != pi->uid) {
      dout(7) << "  uid " << pi->uid
	      << " -> " << m->head.uid
	      << " for " << *in << dendl;
      pi->uid = m->head.uid;
    }
    if (m->head.gid != pi->gid) {
      dout(7) << "  gid " << pi->gid
	      << " -> " << m->head.gid
	      << " for " << *in << dendl;
      pi->gid = m->head.gid;
    }
    if (m->head.mode != pi->mode) {
      dout(7) << "  mode " << oct << pi->mode
	      << " -> " << m->head.mode << dec
	      << " for " << *in << dendl;
      pi->mode = m->head.mode;
    }
    if ((features & CEPH_FEATURE_FS_BTIME) && m->get_btime() != pi->btime) {
      dout(7) << "  btime " << oct << pi->btime
	      << " -> " << m->get_btime() << dec
	      << " for " << *in << dendl;
      pi->btime = m->get_btime();
    }
  }
}
3198
3199 /*
3200 * update inode based on cap flush|flushsnap|wanted.
3201 * adjust max_size, if needed.
3202 * if we update, return true; otherwise, false (no updated needed).
3203 */
/**
 * Apply a client cap flush / wanted update to the head inode.
 *
 * Decides whether max_size must grow (client wrote near the limit or asked
 * for more) or be zeroed (client dropped write caps), merges any advisory
 * file locks carried on the message, checks write access, then journals the
 * dirty metadata as an EUpdate.
 *
 * @param in         auth inode being updated
 * @param cap        the client's cap; may be NULL
 * @param dirty      dirty cap bits from the client
 * @param follows    snapid the flush follows
 * @param m          the cap message (source of all new field values)
 * @param ack        ack to send after journaling; may be NULL
 * @param need_flush out: set to true if the mdlog should be flushed soon
 * @return true if an update was journaled, false if nothing to do (or
 *         access check failed / max_size change must wait for the lock)
 */
bool Locker::_do_cap_update(CInode *in, Capability *cap,
			    int dirty, snapid_t follows,
			    MClientCaps *m, MClientCaps *ack,
			    bool *need_flush)
{
  dout(10) << "_do_cap_update dirty " << ccap_string(dirty)
	   << " issued " << ccap_string(cap ? cap->issued() : 0)
	   << " wanted " << ccap_string(cap ? cap->wanted() : 0)
	   << " on " << *in << dendl;
  assert(in->is_auth());
  client_t client = m->get_source().num();
  inode_t *latest = in->get_projected_inode();

  // increase or zero max_size?
  uint64_t size = m->get_size();
  bool change_max = false;
  uint64_t old_max = latest->client_ranges.count(client) ? latest->client_ranges[client].range.last : 0;
  uint64_t new_max = old_max;

  if (in->is_file()) {
    bool forced_change_max = false;
    dout(20) << "inode is file" << dendl;
    if (cap && ((cap->issued() | cap->wanted()) & CEPH_CAP_ANY_FILE_WR)) {
      dout(20) << "client has write caps; m->get_max_size="
	       << m->get_max_size() << "; old_max=" << old_max << dendl;
      if (m->get_max_size() > new_max) {
	// client explicitly asked for a larger writable range
	dout(10) << "client requests file_max " << m->get_max_size()
		 << " > max " << old_max << dendl;
	change_max = true;
	forced_change_max = true;
	new_max = calc_new_max_size(latest, m->get_max_size());
      } else {
	// otherwise grow the range proactively if the file size is
	// approaching the current limit
	new_max = calc_new_max_size(latest, size);

	if (new_max > old_max)
	  change_max = true;
	else
	  new_max = old_max;
      }
    } else {
      // no write caps issued or wanted: retract the writable range
      if (old_max) {
	change_max = true;
	new_max = 0;
      }
    }

    // changing max_size requires a wrlock on filelock; if we can't take it
    // now, nudge the lock state and (if still blocked) queue a recheck
    if (in->last == CEPH_NOSNAP &&
	change_max &&
	!in->filelock.can_wrlock(client) &&
	!in->filelock.can_force_wrlock(client)) {
      dout(10) << " i want to change file_max, but lock won't allow it (yet)" << dendl;
      if (in->filelock.is_stable()) {
	bool need_issue = false;
	if (cap)
	  cap->inc_suppress();
	if (in->mds_caps_wanted.empty() &&
	    (in->get_loner() >= 0 || (in->get_wanted_loner() >= 0 && in->try_set_loner()))) {
	  if (in->filelock.get_state() != LOCK_EXCL)
	    file_excl(&in->filelock, &need_issue);
	} else
	  simple_lock(&in->filelock, &need_issue);
	if (need_issue)
	  issue_caps(in);
	if (cap)
	  cap->dec_suppress();
      }
      if (!in->filelock.can_wrlock(client) &&
	  !in->filelock.can_force_wrlock(client)) {
	C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in,
	                                                 forced_change_max ? new_max : 0,
	                                                 0, utime_t());

	in->filelock.add_waiter(SimpleLock::WAIT_STABLE, cms);
	change_max = false;
      }
    }
  }

  // merge advisory file locks (fcntl then flock) carried on the message
  if (m->flockbl.length()) {
    int32_t num_locks;
    bufferlist::iterator bli = m->flockbl.begin();
    ::decode(num_locks, bli);
    for ( int i=0; i < num_locks; ++i) {
      ceph_filelock decoded_lock;
      ::decode(decoded_lock, bli);
      in->get_fcntl_lock_state()->held_locks.
	insert(pair<uint64_t, ceph_filelock>(decoded_lock.start, decoded_lock));
      ++in->get_fcntl_lock_state()->client_held_lock_counts[(client_t)(decoded_lock.client)];
    }
    ::decode(num_locks, bli);
    for ( int i=0; i < num_locks; ++i) {
      ceph_filelock decoded_lock;
      ::decode(decoded_lock, bli);
      in->get_flock_lock_state()->held_locks.
	insert(pair<uint64_t, ceph_filelock>(decoded_lock.start, decoded_lock));
      ++in->get_flock_lock_state()->client_held_lock_counts[(client_t)(decoded_lock.client)];
    }
  }

  if (!dirty && !change_max)
    return false;

  // get_priv() takes a ref on the session; put() it on every path
  Session *session = static_cast<Session *>(m->get_connection()->get_priv());
  if (session->check_access(in, MAY_WRITE,
			    m->caller_uid, m->caller_gid, NULL, 0, 0) < 0) {
    session->put();
    dout(10) << "check_access failed, dropping cap update on " << *in << dendl;
    return false;
  }
  session->put();

  // do the update.
  EUpdate *le = new EUpdate(mds->mdlog, "cap update");
  mds->mdlog->start_entry(le);

  // xattrs update?
  map<string,bufferptr> *px = 0;
  if ((dirty & CEPH_CAP_XATTR_EXCL) &&
      m->xattrbl.length() &&
      m->head.xattr_version > in->get_projected_inode()->xattr_version)
    px = new map<string,bufferptr>;

  inode_t *pi = in->project_inode(px);
  pi->version = in->pre_dirty();

  MutationRef mut(new MutationImpl());
  mut->ls = mds->mdlog->get_current_segment();

  _update_cap_fields(in, dirty, m, pi);

  if (change_max) {
    dout(7) << "  max_size " << old_max << " -> " << new_max
	    << " for " << *in << dendl;
    if (new_max) {
      pi->client_ranges[client].range.first = 0;
      pi->client_ranges[client].range.last = new_max;
      pi->client_ranges[client].follows = in->first - 1;
    } else
      pi->client_ranges.erase(client);
  }

  if (change_max || (dirty & (CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)))
    wrlock_force(&in->filelock, mut);  // wrlock for duration of journal

  // auth
  if (dirty & CEPH_CAP_AUTH_EXCL)
    wrlock_force(&in->authlock, mut);

  // xattr
  if (px) {
    dout(7) << " xattrs v" << pi->xattr_version << " -> " << m->head.xattr_version << dendl;
    pi->xattr_version = m->head.xattr_version;
    bufferlist::iterator p = m->xattrbl.begin();
    ::decode(*px, p);

    wrlock_force(&in->xattrlock, mut);
  }

  mut->auth_pin(in);
  mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows);
  mdcache->journal_dirty_inode(mut.get(), &le->metablob, in, follows);

  // "oldest flush tid" > 0 means client uses unique TID for each flush
  if (ack && ack->get_oldest_flush_tid() > 0)
    le->metablob.add_client_flush(metareqid_t(m->get_source(), ack->get_client_tid()),
				  ack->get_oldest_flush_tid());

  mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut,
							      change_max, !!cap,
							      client, ack));
  if (need_flush && !*need_flush &&
      ((change_max && new_max) || // max INCREASE
       _need_flush_mdlog(in, dirty)))
    *need_flush = true;

  return true;
}
3381
3382 /* This function DOES put the passed message before returning */
3383 void Locker::handle_client_cap_release(MClientCapRelease *m)
3384 {
3385 client_t client = m->get_source().num();
3386 dout(10) << "handle_client_cap_release " << *m << dendl;
3387
3388 if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) {
3389 mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
3390 return;
3391 }
3392
3393 if (m->osd_epoch_barrier && !mds->objecter->have_map(m->osd_epoch_barrier)) {
3394 // Pause RADOS operations until we see the required epoch
3395 mds->objecter->set_epoch_barrier(m->osd_epoch_barrier);
3396 }
3397
3398 if (mds->get_osd_epoch_barrier() < m->osd_epoch_barrier) {
3399 // Record the barrier so that we will retransmit it to clients
3400 mds->set_osd_epoch_barrier(m->osd_epoch_barrier);
3401 }
3402
3403 Session *session = static_cast<Session *>(m->get_connection()->get_priv());
3404
3405 for (vector<ceph_mds_cap_item>::iterator p = m->caps.begin(); p != m->caps.end(); ++p) {
3406 _do_cap_release(client, inodeno_t((uint64_t)p->ino) , p->cap_id, p->migrate_seq, p->seq);
3407 }
3408
3409 if (session) {
3410 session->notify_cap_release(m->caps.size());
3411 }
3412
3413 m->put();
3414 }
3415
3416 class C_Locker_RetryCapRelease : public LockerContext {
3417 client_t client;
3418 inodeno_t ino;
3419 uint64_t cap_id;
3420 ceph_seq_t migrate_seq;
3421 ceph_seq_t issue_seq;
3422 public:
3423 C_Locker_RetryCapRelease(Locker *l, client_t c, inodeno_t i, uint64_t id,
3424 ceph_seq_t mseq, ceph_seq_t seq) :
3425 LockerContext(l), client(c), ino(i), cap_id(id), migrate_seq(mseq), issue_seq(seq) {}
3426 void finish(int r) override {
3427 locker->_do_cap_release(client, ino, cap_id, migrate_seq, issue_seq);
3428 }
3429 };
3430
/**
 * Process one cap release from an MClientCapRelease batch.
 *
 * Validates the release against the current cap (cap_id match, mseq not
 * stale, issue_seq current) and, if everything lines up, removes the
 * client's cap from the inode.  Frozen inodes defer via
 * C_Locker_RetryCapRelease.
 */
void Locker::_do_cap_release(client_t client, inodeno_t ino, uint64_t cap_id,
			     ceph_seq_t mseq, ceph_seq_t seq)
{
  CInode *in = mdcache->get_inode(ino);
  if (!in) {
    dout(7) << "_do_cap_release missing ino " << ino << dendl;
    return;
  }
  Capability *cap = in->get_client_cap(client);
  if (!cap) {
    dout(7) << "_do_cap_release no cap for client" << client << " on "<< *in << dendl;
    return;
  }

  dout(7) << "_do_cap_release for client." << client << " on "<< *in << dendl;
  if (cap->get_cap_id() != cap_id) {
    // release refers to a different cap instance (reissued since)
    dout(7) << " capid " << cap_id << " != " << cap->get_cap_id() << ", ignore" << dendl;
    return;
  }
  if (ceph_seq_cmp(mseq, cap->get_mseq()) < 0) {
    // stale migration seq; a newer export/import superseded this
    dout(7) << " mseq " << mseq << " < " << cap->get_mseq() << ", ignore" << dendl;
    return;
  }
  if (should_defer_client_cap_frozen(in)) {
    dout(7) << " freezing|frozen, deferring" << dendl;
    in->add_waiter(CInode::WAIT_UNFREEZE,
                  new C_Locker_RetryCapRelease(this, client, ino, cap_id, mseq, seq));
    return;
  }
  if (seq != cap->get_last_issue()) {
    // caps were reissued after the client sent this release; don't drop the
    // cap, just prune stale revoke history and re-evaluate
    dout(7) << " issue_seq " << seq << " != " << cap->get_last_issue() << dendl;
    // clean out any old revoke history
    cap->clean_revoke_from(seq);
    eval_cap_gather(in);
    return;
  }
  remove_client_cap(in, client);
}
3469
/* Drop a client's capability on an inode and clean up dependent state.
 * (Unlike the message handlers above, this takes no message to put.) */
3471
void Locker::remove_client_cap(CInode *in, client_t client)
{
  // clean out any pending snapflush state
  if (!in->client_need_snapflush.empty())
    _do_null_snapflush(in, client);

  in->remove_client_cap(client);

  if (in->is_auth()) {
    // make sure we clear out the client byte range
    if (in->get_projected_inode()->client_ranges.count(client) &&
	!(in->inode.nlink == 0 && !in->is_any_caps()))    // unless it's unlink + stray
      check_inode_max_size(in);
  } else {
    // replica: tell the auth MDS our aggregate wanted caps changed
    request_inode_file_caps(in);
  }

  // lock state may now be able to progress (e.g. finish a revoke gather)
  try_eval(in, CEPH_CAP_LOCKS);
}
3491
3492
3493 /**
3494 * Return true if any currently revoking caps exceed the
3495 * mds_revoke_cap_timeout threshold.
3496 */
3497 bool Locker::any_late_revoking_caps(xlist<Capability*> const &revoking) const
3498 {
3499 xlist<Capability*>::const_iterator p = revoking.begin();
3500 if (p.end()) {
3501 // No revoking caps at the moment
3502 return false;
3503 } else {
3504 utime_t now = ceph_clock_now();
3505 utime_t age = now - (*p)->get_last_revoke_stamp();
3506 if (age <= g_conf->mds_revoke_cap_timeout) {
3507 return false;
3508 } else {
3509 return true;
3510 }
3511 }
3512 }
3513
3514
3515 void Locker::get_late_revoking_clients(std::list<client_t> *result) const
3516 {
3517 if (!any_late_revoking_caps(revoking_caps)) {
3518 // Fast path: no misbehaving clients, execute in O(1)
3519 return;
3520 }
3521
3522 // Slow path: execute in O(N_clients)
3523 std::map<client_t, xlist<Capability*> >::const_iterator client_rc_iter;
3524 for (client_rc_iter = revoking_caps_by_client.begin();
3525 client_rc_iter != revoking_caps_by_client.end(); ++client_rc_iter) {
3526 xlist<Capability*> const &client_rc = client_rc_iter->second;
3527 bool any_late = any_late_revoking_caps(client_rc);
3528 if (any_late) {
3529 result->push_back(client_rc_iter->first);
3530 }
3531 }
3532 }
3533
// Hard-coded instead of surfacing a config setting because this is
// really a hack that should go away at some point when we have better
// inspection tools for getting at detailed cap state (#7316)
3537 #define MAX_WARN_CAPS 100
3538
3539 void Locker::caps_tick()
3540 {
3541 utime_t now = ceph_clock_now();
3542
3543 dout(20) << __func__ << " " << revoking_caps.size() << " revoking caps" << dendl;
3544
3545 int i = 0;
3546 for (xlist<Capability*>::iterator p = revoking_caps.begin(); !p.end(); ++p) {
3547 Capability *cap = *p;
3548
3549 utime_t age = now - cap->get_last_revoke_stamp();
3550 dout(20) << __func__ << " age = " << age << cap->get_client() << "." << cap->get_inode()->ino() << dendl;
3551 if (age <= g_conf->mds_revoke_cap_timeout) {
3552 dout(20) << __func__ << " age below timeout " << g_conf->mds_revoke_cap_timeout << dendl;
3553 break;
3554 } else {
3555 ++i;
3556 if (i > MAX_WARN_CAPS) {
3557 dout(1) << __func__ << " more than " << MAX_WARN_CAPS << " caps are late"
3558 << "revoking, ignoring subsequent caps" << dendl;
3559 break;
3560 }
3561 }
3562 // exponential backoff of warning intervals
3563 if (age > g_conf->mds_revoke_cap_timeout * (1 << cap->get_num_revoke_warnings())) {
3564 cap->inc_num_revoke_warnings();
3565 stringstream ss;
3566 ss << "client." << cap->get_client() << " isn't responding to mclientcaps(revoke), ino "
3567 << cap->get_inode()->ino() << " pending " << ccap_string(cap->pending())
3568 << " issued " << ccap_string(cap->issued()) << ", sent " << age << " seconds ago";
3569 mds->clog->warn() << ss.str();
3570 dout(20) << __func__ << " " << ss.str() << dendl;
3571 } else {
3572 dout(20) << __func__ << " silencing log message (backoff) for " << cap->get_client() << "." << cap->get_inode()->ino() << dendl;
3573 }
3574 }
3575 }
3576
3577
/**
 * Handle a dentry-lease message from a client: RELEASE / REVOKE_ACK drop
 * the lease (if the seq matches); RENEW extends it and echoes the message
 * back to the client with a fresh seq and duration.
 *
 * The message is consumed on every path except a successful RENEW, where
 * it is reused as the reply.
 */
void Locker::handle_client_lease(MClientLease *m)
{
  dout(10) << "handle_client_lease " << *m << dendl;

  assert(m->get_source().is_client());
  client_t client = m->get_source().num();

  CInode *in = mdcache->get_inode(m->get_ino(), m->get_last());
  if (!in) {
    dout(7) << "handle_client_lease don't have ino " << m->get_ino() << "." << m->get_last() << dendl;
    m->put();
    return;
  }
  CDentry *dn = 0;

  frag_t fg = in->pick_dirfrag(m->dname);
  CDir *dir = in->get_dirfrag(fg);
  if (dir)
    dn = dir->lookup(m->dname);
  if (!dn) {
    dout(7) << "handle_client_lease don't have dn " << m->get_ino() << " " << m->dname << dendl;
    m->put();
    return;
  }
  dout(10) << " on " << *dn << dendl;

  // replica and lock
  ClientLease *l = dn->get_client_lease(client);
  if (!l) {
    dout(7) << "handle_client_lease didn't have lease for client." << client << " of " << *dn << dendl;
    m->put();
    return;
  }

  switch (m->get_action()) {
  case CEPH_MDS_LEASE_REVOKE_ACK:
  case CEPH_MDS_LEASE_RELEASE:
    // only drop the lease if the client is acking the current seq;
    // otherwise the ack is for an older incarnation and is ignored
    if (l->seq != m->get_seq()) {
      dout(7) << "handle_client_lease release - seq " << l->seq << " != provided " << m->get_seq() << dendl;
    } else {
      dout(7) << "handle_client_lease client." << client
	      << " on " << *dn << dendl;
      dn->remove_client_lease(l, this);
    }
    m->put();
    break;

  case CEPH_MDS_LEASE_RENEW:
    {
      dout(7) << "handle_client_lease client." << client << " renew on " << *dn
	      << (!dn->lock.can_lease(client)?", revoking lease":"") << dendl;
      if (dn->lock.can_lease(client)) {
	int pool = 1;   // fixme.. do something smart!
	// reuse the incoming message as the renewal reply
	m->h.duration_ms = (int)(1000 * mdcache->client_lease_durations[pool]);
	m->h.seq = ++l->seq;
	m->clear_payload();

	utime_t now = ceph_clock_now();
	now += mdcache->client_lease_durations[pool];
	mdcache->touch_client_lease(l, pool, now);

	mds->send_message_client_counted(m, m->get_connection());
      }
      // NOTE(review): if the lock can no longer lease, the message appears
      // to be neither answered nor put here -- confirm against upstream.
    }
    break;

  default:
    ceph_abort(); // implement me
    break;
  }
}
3649
3650
/**
 * Encode a LeaseStat for a dentry into a client reply, issuing (or
 * refreshing) a dentry lease when conditions allow, otherwise encoding a
 * null lease (mask 0, duration 0).
 *
 * NOTE(review): the doubled parentheses around the middle conjunct mean
 * the condition reads (!can_lease(dir filelock) AND no SHARED/EXCL cap on
 * the dir) AND dn->lock.can_lease() -- i.e. a dentry lease is only issued
 * when the client can NOT rely on the directory's cap/lease. Verify this
 * grouping is intentional against upstream before changing it.
 */
void Locker::issue_client_lease(CDentry *dn, client_t client,
			       bufferlist &bl, utime_t now, Session *session)
{
  CInode *diri = dn->get_dir()->get_inode();
  if (!diri->is_stray() &&  // do not issue dn leases in stray dir!
      ((!diri->filelock.can_lease(client) &&
	(diri->get_client_cap_pending(client) & (CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_EXCL)) == 0)) &&
      dn->lock.can_lease(client)) {
    int pool = 1;   // fixme.. do something smart!
    // issue a dentry lease
    ClientLease *l = dn->add_client_lease(client, session);
    session->touch_lease(l);

    now += mdcache->client_lease_durations[pool];
    mdcache->touch_client_lease(l, pool, now);

    LeaseStat e;
    e.mask = 1 | CEPH_LOCK_DN;  // old and new bit values
    e.seq = ++l->seq;
    e.duration_ms = (int)(1000 * mdcache->client_lease_durations[pool]);
    ::encode(e, bl);
    dout(20) << "issue_client_lease seq " << e.seq << " dur " << e.duration_ms << "ms "
	     << " on " << *dn << dendl;
  } else {
    // null lease
    LeaseStat e;
    e.mask = 0;
    e.seq = 0;
    e.duration_ms = 0;
    ::encode(e, bl);
    dout(20) << "issue_client_lease no/null lease on " << *dn << dendl;
  }
}
3684
3685
3686 void Locker::revoke_client_leases(SimpleLock *lock)
3687 {
3688 int n = 0;
3689 CDentry *dn = static_cast<CDentry*>(lock->get_parent());
3690 for (map<client_t, ClientLease*>::iterator p = dn->client_lease_map.begin();
3691 p != dn->client_lease_map.end();
3692 ++p) {
3693 ClientLease *l = p->second;
3694
3695 n++;
3696 assert(lock->get_type() == CEPH_LOCK_DN);
3697
3698 CDentry *dn = static_cast<CDentry*>(lock->get_parent());
3699 int mask = 1 | CEPH_LOCK_DN; // old and new bits
3700
3701 // i should also revoke the dir ICONTENT lease, if they have it!
3702 CInode *diri = dn->get_dir()->get_inode();
3703 mds->send_message_client_counted(new MClientLease(CEPH_MDS_LEASE_REVOKE, l->seq,
3704 mask,
3705 diri->ino(),
3706 diri->first, CEPH_NOSNAP,
3707 dn->get_name()),
3708 l->client);
3709 }
3710 assert(n == lock->get_num_client_lease());
3711 }
3712
3713
3714
3715 // locks ----------------------------------------------------------------
3716
/**
 * Resolve a (lock_type, object info) pair from a peer MDS into the local
 * SimpleLock instance it refers to.
 *
 * @return the lock, or NULL if the referenced dentry/inode is not in cache
 *         (e.g. already trimmed); aborts on an unknown lock type.
 */
SimpleLock *Locker::get_lock(int lock_type, MDSCacheObjectInfo &info)
{
  switch (lock_type) {
  case CEPH_LOCK_DN:
    {
      // be careful; info.dirfrag may have incorrect frag; recalculate based on dname.
      CInode *diri = mdcache->get_inode(info.dirfrag.ino);
      frag_t fg;
      CDir *dir = 0;
      CDentry *dn = 0;
      if (diri) {
	fg = diri->pick_dirfrag(info.dname);
	dir = diri->get_dirfrag(fg);
	if (dir)
	  dn = dir->lookup(info.dname, info.snapid);
      }
      if (!dn) {
	dout(7) << "get_lock don't have dn " << info.dirfrag.ino << " " << info.dname << dendl;
	return 0;
      }
      return &dn->lock;
    }

  case CEPH_LOCK_IAUTH:
  case CEPH_LOCK_ILINK:
  case CEPH_LOCK_IDFT:
  case CEPH_LOCK_IFILE:
  case CEPH_LOCK_INEST:
  case CEPH_LOCK_IXATTR:
  case CEPH_LOCK_ISNAP:
  case CEPH_LOCK_IFLOCK:
  case CEPH_LOCK_IPOLICY:
    {
      // all inode locks share the same lookup; dispatch to the right member
      CInode *in = mdcache->get_inode(info.ino, info.snapid);
      if (!in) {
	dout(7) << "get_lock don't have ino " << info.ino << dendl;
	return 0;
      }
      switch (lock_type) {
      case CEPH_LOCK_IAUTH: return &in->authlock;
      case CEPH_LOCK_ILINK: return &in->linklock;
      case CEPH_LOCK_IDFT: return &in->dirfragtreelock;
      case CEPH_LOCK_IFILE: return &in->filelock;
      case CEPH_LOCK_INEST: return &in->nestlock;
      case CEPH_LOCK_IXATTR: return &in->xattrlock;
      case CEPH_LOCK_ISNAP: return &in->snaplock;
      case CEPH_LOCK_IFLOCK: return &in->flocklock;
      case CEPH_LOCK_IPOLICY: return &in->policylock;
      }
    }

  default:
    dout(7) << "get_lock don't know lock_type " << lock_type << dendl;
    ceph_abort();
    break;
  }

  return 0;
}
3776
3777 /* This function DOES put the passed message before returning */
/* This function DOES put the passed message before returning */
void Locker::handle_lock(MLock *m)
{
  // nobody should be talking to us during recovery.
  assert(mds->is_rejoin() || mds->is_clientreplay() || mds->is_active() || mds->is_stopping());

  SimpleLock *lock = get_lock(m->get_lock_type(), m->get_object_info());
  if (!lock) {
    dout(10) << "don't have object " << m->get_object_info() << ", must have trimmed, dropping" << dendl;
    m->put();
    return;
  }

  // dispatch by lock flavor; the sub-handlers own (and put) the message
  switch (lock->get_type()) {
  case CEPH_LOCK_DN:
  case CEPH_LOCK_IAUTH:
  case CEPH_LOCK_ILINK:
  case CEPH_LOCK_ISNAP:
  case CEPH_LOCK_IXATTR:
  case CEPH_LOCK_IFLOCK:
  case CEPH_LOCK_IPOLICY:
    handle_simple_lock(lock, m);
    break;

  case CEPH_LOCK_IDFT:
  case CEPH_LOCK_INEST:
    //handle_scatter_lock((ScatterLock*)lock, m);
    //break;
    // NOTE: IDFT/INEST deliberately fall through to the file-lock handler

  case CEPH_LOCK_IFILE:
    handle_file_lock(static_cast<ScatterLock*>(lock), m);
    break;

  default:
    dout(7) << "handle_lock got otype " << m->get_lock_type() << dendl;
    ceph_abort();
    break;
  }
}
3816
3817
3818
3819
3820
3821 // ==========================================================================
3822 // simple lock
3823
3824 /** This function may take a reference to m if it needs one, but does
3825 * not put references. */
/** This function may take a reference to m if it needs one, but does
 * not put references.
 *
 * A replica asked us (the auth) to move this lock to SYNC so it can grant
 * a local rdlock.  If the lock is stable, sync it now; if unstable, retry
 * once it settles.  If we're not auth / frozen / already SYNC, the request
 * is dropped and the replica is expected to retry.
 */
void Locker::handle_reqrdlock(SimpleLock *lock, MLock *m)
{
  MDSCacheObject *parent = lock->get_parent();
  if (parent->is_auth() &&
      lock->get_state() != LOCK_SYNC &&
      !parent->is_frozen()) {
    dout(7) << "handle_reqrdlock got rdlock request on " << *lock
	    << " on " << *parent << dendl;
    assert(parent->is_auth()); // replica auth pinned if they're doing this!
    if (lock->is_stable()) {
      simple_sync(lock);
    } else {
      dout(7) << "handle_reqrdlock delaying request until lock is stable" << dendl;
      // m->get(): the retry context needs its own ref to the message
      lock->add_waiter(SimpleLock::WAIT_STABLE | MDSCacheObject::WAIT_UNFREEZE,
                       new C_MDS_RetryMessage(mds, m->get()));
    }
  } else {
    dout(7) << "handle_reqrdlock dropping rdlock request on " << *lock
	    << " on " << *parent << dendl;
    // replica should retry
  }
}
3848
3849 /* This function DOES put the passed message before returning */
/* This function DOES put the passed message before returning */
void Locker::handle_simple_lock(SimpleLock *lock, MLock *m)
{
  int from = m->get_asker();

  dout(10) << "handle_simple_lock " << *m
	   << " on " << *lock << " " << *lock->get_parent() << dendl;

  // during rejoin, lock state for still-rejoining objects will be
  // re-established by the rejoin protocol; drop messages about them
  if (mds->is_rejoin()) {
    if (lock->get_parent()->is_rejoining()) {
      dout(7) << "handle_simple_lock still rejoining " << *lock->get_parent()
	      << ", dropping " << *m << dendl;
      m->put();
      return;
    }
  }

  switch (m->get_action()) {
    // -- replica --
  case LOCK_AC_SYNC:
    // auth granted SYNC: adopt its state and wake readers
    assert(lock->get_state() == LOCK_LOCK);
    lock->decode_locked_state(m->get_data());
    lock->set_state(LOCK_SYNC);
    lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
    break;

  case LOCK_AC_LOCK:
    // auth is revoking SYNC: enter the gather state, pull client leases,
    // and ack back (via eval_gather) once local readers drain
    assert(lock->get_state() == LOCK_SYNC);
    lock->set_state(LOCK_SYNC_LOCK);
    if (lock->is_leased())
      revoke_client_leases(lock);
    eval_gather(lock, true);
    if (lock->is_unstable_and_locked())
      mds->mdlog->flush();
    break;


    // -- auth --
  case LOCK_AC_LOCKACK:
    // a replica finished its part of the gather
    assert(lock->get_state() == LOCK_SYNC_LOCK ||
	   lock->get_state() == LOCK_SYNC_EXCL);
    assert(lock->is_gathering(from));
    lock->remove_gather(from);

    if (lock->is_gathering()) {
      dout(7) << "handle_simple_lock " << *lock << " on " << *lock->get_parent() << " from " << from
	      << ", still gathering " << lock->get_gather_set() << dendl;
    } else {
      dout(7) << "handle_simple_lock " << *lock << " on " << *lock->get_parent() << " from " << from
	      << ", last one" << dendl;
      eval_gather(lock);
    }
    break;

  case LOCK_AC_REQRDLOCK:
    handle_reqrdlock(lock, m);
    break;

  }

  m->put();
}
3911
3912 /* unused, currently.
3913
3914 class C_Locker_SimpleEval : public Context {
3915 Locker *locker;
3916 SimpleLock *lock;
3917 public:
3918 C_Locker_SimpleEval(Locker *l, SimpleLock *lk) : locker(l), lock(lk) {}
3919 void finish(int r) {
3920 locker->try_simple_eval(lock);
3921 }
3922 };
3923
3924 void Locker::try_simple_eval(SimpleLock *lock)
3925 {
3926 // unstable and ambiguous auth?
3927 if (!lock->is_stable() &&
3928 lock->get_parent()->is_ambiguous_auth()) {
3929 dout(7) << "simple_eval not stable and ambiguous auth, waiting on " << *lock->get_parent() << dendl;
3930 //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
3931 lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_SimpleEval(this, lock));
3932 return;
3933 }
3934
3935 if (!lock->get_parent()->is_auth()) {
3936 dout(7) << "try_simple_eval not auth for " << *lock->get_parent() << dendl;
3937 return;
3938 }
3939
3940 if (!lock->get_parent()->can_auth_pin()) {
3941 dout(7) << "try_simple_eval can't auth_pin, waiting on " << *lock->get_parent() << dendl;
3942 //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
3943 lock->get_parent()->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_SimpleEval(this, lock));
3944 return;
3945 }
3946
3947 if (lock->is_stable())
3948 simple_eval(lock);
3949 }
3950 */
3951
3952
// Re-evaluate a stable simple lock on the auth and move it toward the state
// that best matches current client cap wants: EXCL for a loner wanting
// exclusive caps, otherwise SYNC when nobody needs write access.
void Locker::simple_eval(SimpleLock *lock, bool *need_issue)
{
  dout(10) << "simple_eval " << *lock << " on " << *lock->get_parent() << dendl;

  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  if (lock->get_parent()->is_freezing_or_frozen()) {
    // dentry lock in unreadable state can block path traverse
    if ((lock->get_type() != CEPH_LOCK_DN ||
	 lock->get_state() == LOCK_SYNC ||
	 lock->get_parent()->is_frozen()))
      return;
  }

  if (mdcache->is_readonly()) {
    // read-only FS: force everything readable
    if (lock->get_state() != LOCK_SYNC) {
      dout(10) << "simple_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
      simple_sync(lock, need_issue);
    }
    return;
  }

  // caps only apply to inode locks, not dentry locks
  CInode *in = 0;
  int wanted = 0;
  if (lock->get_type() != CEPH_LOCK_DN) {
    in = static_cast<CInode*>(lock->get_parent());
    in->get_caps_wanted(&wanted, NULL, lock->get_cap_shift());
  }

  // -> excl?
  if (lock->get_state() != LOCK_EXCL &&
      in && in->get_target_loner() >= 0 &&
      (wanted & CEPH_CAP_GEXCL)) {
    dout(7) << "simple_eval stable, going to excl " << *lock
	    << " on " << *lock->get_parent() << dendl;
    simple_excl(lock, need_issue);
  }

  // stable -> sync?
  else if (lock->get_state() != LOCK_SYNC &&
	   !lock->is_wrlocked() &&
	   ((!(wanted & CEPH_CAP_GEXCL) && !lock->is_waiter_for(SimpleLock::WAIT_WR)) ||
	    (lock->get_state() == LOCK_EXCL && in && in->get_target_loner() < 0))) {
    dout(7) << "simple_eval stable, syncing " << *lock
	    << " on " << *lock->get_parent() << dendl;
    simple_sync(lock, need_issue);
  }
}
4002
4003
4004 // mid
4005
// Move an auth lock toward SYNC (readable everywhere).  Returns true if
// SYNC was reached immediately; returns false if we had to start a gather
// (wrlocks draining, replica acks, cap revocation, file recovery) or a
// scatter writeback, in which case eval_gather() will finish the job later.
bool Locker::simple_sync(SimpleLock *lock, bool *need_issue)
{
  dout(7) << "simple_sync on " << *lock << " on " << *lock->get_parent() << dendl;
  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  // inode locks with a cap shift affect client caps; dentry locks don't
  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  int old_state = lock->get_state();

  if (old_state != LOCK_TSYN) {

    // enter the appropriate *_SYNC intermediate state
    switch (lock->get_state()) {
    case LOCK_MIX: lock->set_state(LOCK_MIX_SYNC); break;
    case LOCK_LOCK: lock->set_state(LOCK_LOCK_SYNC); break;
    case LOCK_XSYN: lock->set_state(LOCK_XSYN_SYNC); break;
    case LOCK_EXCL: lock->set_state(LOCK_EXCL_SYNC); break;
    default: ceph_abort();
    }

    // count the things we must wait for before going SYNC
    int gather = 0;
    if (lock->is_wrlocked())
      gather++;

    // coming from MIX, replicas hold scattered state; gather their acks
    if (lock->get_parent()->is_replicated() && old_state == LOCK_MIX) {
      send_lock_message(lock, LOCK_AC_SYNC);
      lock->init_gather();
      gather++;
    }

    // revoke client caps incompatible with SYNC
    if (in && in->is_head()) {
      if (in->issued_caps_need_gather(lock)) {
	if (need_issue)
	  *need_issue = true;
	else
	  issue_caps(in);
	gather++;
      }
    }

    bool need_recover = false;
    if (lock->get_type() == CEPH_LOCK_IFILE) {
      assert(in);
      if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
        mds->mdcache->queue_file_recover(in);
	need_recover = true;
        gather++;
      }
    }

    // nothing to gather, but dirty scatter data must be journaled back
    // into the inode before we can go SYNC
    if (!gather && lock->is_dirty()) {
      lock->get_parent()->auth_pin(lock);
      scatter_writebehind(static_cast<ScatterLock*>(lock));
      mds->mdlog->flush();
      return false;
    }

    if (gather) {
      lock->get_parent()->auth_pin(lock);
      if (need_recover)
	mds->mdcache->do_file_recover();
      return false;
    }
  }

  // no gathering needed; broadcast the new state and go SYNC now
  if (lock->get_parent()->is_replicated()) { // FIXME
    bufferlist data;
    lock->encode_locked_state(data);
    send_lock_message(lock, LOCK_AC_SYNC, data);
  }
  lock->set_state(LOCK_SYNC);
  lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
  if (in && in->is_head()) {
    if (need_issue)
      *need_issue = true;
    else
      issue_caps(in);
  }
  return true;
}
4088
// Move an auth lock toward EXCL (exclusive caps for the loner client).
// Starts a gather if rdlocks/wrlocks are held, replicas must be locked,
// or client caps must be revoked; otherwise completes immediately.
void Locker::simple_excl(SimpleLock *lock, bool *need_issue)
{
  dout(7) << "simple_excl on " << *lock << " on " << *lock->get_parent() << dendl;
  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  // enter the appropriate *_EXCL intermediate state
  switch (lock->get_state()) {
  case LOCK_LOCK: lock->set_state(LOCK_LOCK_EXCL); break;
  case LOCK_SYNC: lock->set_state(LOCK_SYNC_EXCL); break;
  case LOCK_XSYN: lock->set_state(LOCK_XSYN_EXCL); break;
  default: ceph_abort();
  }

  int gather = 0;
  if (lock->is_rdlocked())
    gather++;
  if (lock->is_wrlocked())
    gather++;

  // from LOCK/XSYN the replicas are already in LOCK; only SYNC needs a
  // lock message round-trip
  if (lock->get_parent()->is_replicated() &&
      lock->get_state() != LOCK_LOCK_EXCL &&
      lock->get_state() != LOCK_XSYN_EXCL) {
    send_lock_message(lock, LOCK_AC_LOCK);
    lock->init_gather();
    gather++;
  }

  // revoke caps that conflict with the EXCL target state
  if (in && in->is_head()) {
    if (in->issued_caps_need_gather(lock)) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
      gather++;
    }
  }

  if (gather) {
    lock->get_parent()->auth_pin(lock);
  } else {
    lock->set_state(LOCK_EXCL);
    lock->finish_waiters(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE);
    if (in) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
    }
  }
}
4143
// Move an auth lock toward LOCK (exclusively held by the MDS, no client
// access).  Handles lease revocation, cap revocation, replica gathers, and
// the two-stage MIX->LOCK transition (local scatter data is gathered before
// replicas are told to lock).
void Locker::simple_lock(SimpleLock *lock, bool *need_issue)
{
  dout(7) << "simple_lock on " << *lock << " on " << *lock->get_parent() << dendl;
  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());
  assert(lock->get_state() != LOCK_LOCK);

  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  int old_state = lock->get_state();

  switch (lock->get_state()) {
  case LOCK_SYNC: lock->set_state(LOCK_SYNC_LOCK); break;
  case LOCK_XSYN:
    // XSYN must pass through EXCL first; bail if file_excl had to gather
    file_excl(static_cast<ScatterLock*>(lock), need_issue);
    if (lock->get_state() != LOCK_EXCL)
      return;
    // fall-thru
  case LOCK_EXCL: lock->set_state(LOCK_EXCL_LOCK); break;
  case LOCK_MIX: lock->set_state(LOCK_MIX_LOCK);
    (static_cast<ScatterLock *>(lock))->clear_unscatter_wanted();
    break;
  case LOCK_TSYN: lock->set_state(LOCK_TSYN_LOCK); break;
  default: ceph_abort();
  }

  int gather = 0;
  if (lock->is_leased()) {
    gather++;
    revoke_client_leases(lock);
  }
  if (lock->is_rdlocked())
    gather++;
  // revoke caps that conflict with the LOCK target state
  if (in && in->is_head()) {
    if (in->issued_caps_need_gather(lock)) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
      gather++;
    }
  }

  bool need_recover = false;
  if (lock->get_type() == CEPH_LOCK_IFILE) {
    assert(in);
    if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
      mds->mdcache->queue_file_recover(in);
      need_recover = true;
      gather++;
    }
  }

  if (lock->get_parent()->is_replicated() &&
      lock->get_state() == LOCK_MIX_LOCK &&
      gather) {
    // MIX->LOCK stage one: finish local gathering first; replicas keep
    // their scattered state until we move to MIX_LOCK2.
    dout(10) << " doing local stage of mix->lock gather before gathering from replicas" << dendl;
  } else {
    // move to second stage of gather now, so we don't send the lock action later.
    if (lock->get_state() == LOCK_MIX_LOCK)
      lock->set_state(LOCK_MIX_LOCK2);

    if (lock->get_parent()->is_replicated() &&
	lock->get_sm()->states[old_state].replica_state != LOCK_LOCK) {  // replica may already be LOCK
      gather++;
      send_lock_message(lock, LOCK_AC_LOCK);
      lock->init_gather();
    }
  }

  // dirty scatter data with nothing else to wait for: journal it back first
  if (!gather && lock->is_dirty()) {
    lock->get_parent()->auth_pin(lock);
    scatter_writebehind(static_cast<ScatterLock*>(lock));
    mds->mdlog->flush();
    return;
  }

  if (gather) {
    lock->get_parent()->auth_pin(lock);
    if (need_recover)
      mds->mdcache->do_file_recover();
  } else {
    lock->set_state(LOCK_LOCK);
    lock->finish_waiters(ScatterLock::WAIT_XLOCK|ScatterLock::WAIT_WR|ScatterLock::WAIT_STABLE);
  }
}
4232
4233
// Move an auth lock toward PREXLOCK so an xlock can be taken.  Gathers
// outstanding rdlocks/wrlocks and revokes conflicting client caps; if
// nothing needs gathering, jumps straight to PREXLOCK.
void Locker::simple_xlock(SimpleLock *lock)
{
  dout(7) << "simple_xlock on " << *lock << " on " << *lock->get_parent() << dendl;
  assert(lock->get_parent()->is_auth());
  //assert(lock->is_stable());
  assert(lock->get_state() != LOCK_XLOCK);

  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  // pin only when entering from a stable state; unstable states already
  // hold a pin from the transition that made them unstable
  if (lock->is_stable())
    lock->get_parent()->auth_pin(lock);

  switch (lock->get_state()) {
  case LOCK_LOCK:
  case LOCK_XLOCKDONE: lock->set_state(LOCK_LOCK_XLOCK); break;
  default: ceph_abort();
  }

  int gather = 0;
  if (lock->is_rdlocked())
    gather++;
  if (lock->is_wrlocked())
    gather++;

  // revoke client caps that conflict with xlocking
  if (in && in->is_head()) {
    if (in->issued_caps_need_gather(lock)) {
      issue_caps(in);
      gather++;
    }
  }

  if (!gather) {
    lock->set_state(LOCK_PREXLOCK);
    //assert("shouldn't be called if we are already xlockable" == 0);
  }
}
4272
4273
4274
4275
4276
4277 // ==========================================================================
4278 // scatter lock
4279
4280 /*
4281
4282 Some notes on scatterlocks.
4283
4284 - The scatter/gather is driven by the inode lock. The scatter always
4285 brings in the latest metadata from the fragments.
4286
4287 - When in a scattered/MIX state, fragments are only allowed to
4288 update/be written to if the accounted stat matches the inode's
4289 current version.
4290
4291 - That means, on gather, we _only_ assimilate diffs for frag metadata
4292 that match the current version, because those are the only ones
4293 written during this scatter/gather cycle. (Others didn't permit
4294 it.) We increment the version and journal this to disk.
4295
4296 - When possible, we also simultaneously update our local frag
4297 accounted stats to match.
4298
4299 - On scatter, the new inode info is broadcast to frags, both local
4300 and remote. If possible (auth and !frozen), the dirfrag auth
4301 should update the accounted state (if it isn't already up to date).
4302 Note that this may occur on both the local inode auth node and
4303 inode replicas, so there are two potential paths. If it is NOT
4304 possible, they need to mark_stale to prevent any possible writes.
4305
4306 - A scatter can be to MIX (potentially writeable) or to SYNC (read
4307 only). Both are opportunities to update the frag accounted stats,
4308 even though only the MIX case is affected by a stale dirfrag.
4309
4310 - Because many scatter/gather cycles can potentially go by without a
4311 frag being able to update its accounted stats (due to being frozen
4312 by exports/refragments in progress), the frag may have (even very)
4313 old stat versions. That's fine. If when we do want to update it,
4314 we can update accounted_* and the version first.
4315
4316 */
4317
// Journal-commit completion for scatter_writebehind(): once the EUpdate is
// safely logged, apply the mutation and finish the lock flush.
class C_Locker_ScatterWB : public LockerLogContext {
  ScatterLock *lock;   // lock whose dirty scatter state was journaled
  MutationRef mut;     // mutation carrying the projected inode update
public:
  C_Locker_ScatterWB(Locker *l, ScatterLock *sl, MutationRef& m) :
    LockerLogContext(l), lock(sl), mut(m) {}
  void finish(int r) override {
    locker->scatter_writebehind_finish(lock, mut);
  }
};
4328
// Fold gathered dirty fragment stats back into the inode and journal the
// result.  Takes a forced wrlock (released by scatter_writebehind_finish
// once the log entry commits) and starts the lock's flush cycle.
void Locker::scatter_writebehind(ScatterLock *lock)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  dout(10) << "scatter_writebehind " << in->inode.mtime << " on " << *lock << " on " << *in << dendl;

  // journal
  MutationRef mut(new MutationImpl());
  mut->ls = mds->mdlog->get_current_segment();

  // forcefully take a wrlock
  lock->get_wrlock(true);
  mut->wrlocks.insert(lock);
  mut->locks.insert(lock);

  in->pre_cow_old_inode(); // avoid cow mayhem

  inode_t *pi = in->project_inode();
  pi->version = in->pre_dirty();

  // pull fragment stats into the (projected) inode, then mark the lock
  // as flushing so further updates are tracked for the next cycle
  in->finish_scatter_gather_update(lock->get_type());
  lock->start_flush();

  EUpdate *le = new EUpdate(mds->mdlog, "scatter_writebehind");
  mds->mdlog->start_entry(le);

  mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY);
  mdcache->journal_dirty_inode(mut.get(), &le->metablob, in);

  in->finish_scatter_gather_update_accounted(lock->get_type(), mut, &le->metablob);

  // completion applies the mutation once the entry is durable
  mds->mdlog->submit_entry(le, new C_Locker_ScatterWB(this, lock, mut));
}
4361
// Journal-commit side of scatter_writebehind(): pop the projected inode,
// finish the flush, notify replicas if needed, and release the forced
// wrlock by applying/cleaning up the mutation.
void Locker::scatter_writebehind_finish(ScatterLock *lock, MutationRef& mut)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  dout(10) << "scatter_writebehind_finish on " << *lock << " on " << *in << dendl;
  in->pop_and_dirty_projected_inode(mut->ls);

  lock->finish_flush();

  // if replicas may have flushed in a mix->lock state, send another
  // message so they can finish_flush().
  if (in->is_replicated()) {
    switch (lock->get_state()) {
    case LOCK_MIX_LOCK:
    case LOCK_MIX_LOCK2:
    case LOCK_MIX_EXCL:
    case LOCK_MIX_TSYN:
      send_lock_message(lock, LOCK_AC_LOCKFLUSHED);
    }
  }

  mut->apply();
  drop_locks(mut.get());
  mut->cleanup();

  if (lock->is_stable())
    lock->finish_waiters(ScatterLock::WAIT_STABLE);

  //scatter_eval_gather(lock);
}
4391
// Re-evaluate a stable scatter lock on the auth: honor pending scatter
// requests, keep INEST writable, and fall back to SYNC for inodes that are
// not subtree/export delegation points.
void Locker::scatter_eval(ScatterLock *lock, bool *need_issue)
{
  dout(10) << "scatter_eval " << *lock << " on " << *lock->get_parent() << dendl;

  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  if (lock->get_parent()->is_freezing_or_frozen()) {
    dout(20) << " freezing|frozen" << dendl;
    return;
  }

  if (mdcache->is_readonly()) {
    // read-only FS: force everything readable
    if (lock->get_state() != LOCK_SYNC) {
      dout(10) << "scatter_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
      simple_sync(lock, need_issue);
    }
    return;
  }

  // someone (e.g. a replica via AC_NUDGE) asked for a scatter
  if (!lock->is_rdlocked() &&
      lock->get_state() != LOCK_MIX &&
      lock->get_scatter_wanted()) {
    dout(10) << "scatter_eval scatter_wanted, bump to mix " << *lock
	     << " on " << *lock->get_parent() << dendl;
    scatter_mix(lock, need_issue);
    return;
  }

  if (lock->get_type() == CEPH_LOCK_INEST) {
    // in general, we want to keep INEST writable at all times.
    if (!lock->is_rdlocked()) {
      if (lock->get_parent()->is_replicated()) {
	// replicated: MIX lets each replica update its local fragments
	if (lock->get_state() != LOCK_MIX)
	  scatter_mix(lock, need_issue);
      } else {
	// unreplicated: plain LOCK is sufficient and simpler
	if (lock->get_state() != LOCK_LOCK)
	  simple_lock(lock, need_issue);
      }
    }
    return;
  }

  CInode *in = static_cast<CInode*>(lock->get_parent());
  if (!in->has_subtree_or_exporting_dirfrag() || in->is_base()) {
    // i _should_ be sync.
    if (!lock->is_wrlocked() &&
	lock->get_state() != LOCK_SYNC) {
      dout(10) << "scatter_eval no wrlocks|xlocks, not subtree root inode, syncing" << dendl;
      simple_sync(lock, need_issue);
    }
  }
}
4445
4446
4447 /*
4448 * mark a scatterlock to indicate that the dir fnode has some dirty data
4449 */
4450 void Locker::mark_updated_scatterlock(ScatterLock *lock)
4451 {
4452 lock->mark_dirty();
4453 if (lock->get_updated_item()->is_on_list()) {
4454 dout(10) << "mark_updated_scatterlock " << *lock
4455 << " - already on list since " << lock->get_update_stamp() << dendl;
4456 } else {
4457 updated_scatterlocks.push_back(lock->get_updated_item());
4458 utime_t now = ceph_clock_now();
4459 lock->set_update_stamp(now);
4460 dout(10) << "mark_updated_scatterlock " << *lock
4461 << " - added at " << now << dendl;
4462 }
4463 }
4464
/*
 * this is called by scatter_tick and LogSegment::try_to_trim() when
 * trying to flush dirty scattered data (i.e. updated fnode) back to
 * the inode.
 *
 * we need to lock|scatter in order to push fnode changes into the
 * inode.dirstat.
 */
void Locker::scatter_nudge(ScatterLock *lock, MDSInternalContextBase *c, bool forcelockchange)
{
  CInode *p = static_cast<CInode *>(lock->get_parent());

  if (p->is_frozen() || p->is_freezing()) {
    dout(10) << "scatter_nudge waiting for unfreeze on " << *p << dendl;
    if (c)
      p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, c);
    else
      // just requeue.  not ideal.. starvation prone..
      updated_scatterlocks.push_back(lock->get_updated_item());
    return;
  }

  if (p->is_ambiguous_auth()) {
    dout(10) << "scatter_nudge waiting for single auth on " << *p << dendl;
    if (c)
      p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, c);
    else
      // just requeue.  not ideal.. starvation prone..
      updated_scatterlocks.push_back(lock->get_updated_item());
    return;
  }

  if (p->is_auth()) {
    // auth: cycle the lock state until the dirty data gets flushed.
    // `count` guards against the (rare) case where two transitions both
    // complete immediately without triggering a writebehind.
    int count = 0;
    while (true) {
      if (lock->is_stable()) {
	// can we do it now?
	//  (only if we're not replicated.. if we are, we really do need
	//   to nudge the lock state!)
	/*
	  actually, even if we're not replicated, we can't stay in MIX, because another mds
	  could discover and replicate us at any time.  if that happens while we're flushing,
	  they end up in MIX but their inode has the old scatterstat version.

	if (!forcelockchange && !lock->get_parent()->is_replicated() && lock->can_wrlock(-1)) {
	  dout(10) << "scatter_nudge auth, propagating " << *lock << " on " << *p << dendl;
	  scatter_writebehind(lock);
	  if (c)
	    lock->add_waiter(SimpleLock::WAIT_STABLE, c);
	  return;
	}
	*/

	if (mdcache->is_readonly()) {
	  // read-only FS: just force SYNC, no writeback possible
	  if (lock->get_state() != LOCK_SYNC) {
	    dout(10) << "scatter_nudge auth, read-only FS, syncing " << *lock << " on " << *p << dendl;
	    simple_sync(static_cast<ScatterLock*>(lock));
	  }
	  break;
	}

	// adjust lock state; any transition out of the current state will
	// trigger scatter_writebehind() when the lock is dirty
	dout(10) << "scatter_nudge auth, scatter/unscattering " << *lock << " on " << *p << dendl;
	switch (lock->get_type()) {
	case CEPH_LOCK_IFILE:
	  if (p->is_replicated() && lock->get_state() != LOCK_MIX)
	    scatter_mix(static_cast<ScatterLock*>(lock));
	  else if (lock->get_state() != LOCK_LOCK)
	    simple_lock(static_cast<ScatterLock*>(lock));
	  else
	    simple_sync(static_cast<ScatterLock*>(lock));
	  break;

	case CEPH_LOCK_IDFT:
	case CEPH_LOCK_INEST:
	  if (p->is_replicated() && lock->get_state() != LOCK_MIX)
	    scatter_mix(lock);
	  else if (lock->get_state() != LOCK_LOCK)
	    simple_lock(lock);
	  else
	    simple_sync(lock);
	  break;
	default:
	  ceph_abort();
	}
	++count;
	if (lock->is_stable() && count == 2) {
	  dout(10) << "scatter_nudge oh, stable after two cycles." << dendl;
	  // this should only really happen when called via
	  // handle_file_lock due to AC_NUDGE, because the rest of the
	  // time we are replicated or have dirty data and won't get
	  // called.  bailing here avoids an infinite loop.
	  assert(!c);
	  break;
	}
      } else {
	dout(10) << "scatter_nudge auth, waiting for stable " << *lock << " on " << *p << dendl;
	if (c)
	  lock->add_waiter(SimpleLock::WAIT_STABLE, c);
	return;
      }
    }
  } else {
    dout(10) << "scatter_nudge replica, requesting scatter/unscatter of "
	     << *lock << " on " << *p << dendl;
    // request unscatter?
    mds_rank_t auth = lock->get_parent()->authority().first;
    if (!mds->is_cluster_degraded() ||
	mds->mdsmap->is_clientreplay_or_active_or_stopping(auth))
      mds->send_message_mds(new MLock(lock, LOCK_AC_NUDGE, mds->get_nodeid()), auth);

    // wait...
    if (c)
      lock->add_waiter(SimpleLock::WAIT_STABLE, c);

    // also, requeue, in case we had wrong auth or something
    updated_scatterlocks.push_back(lock->get_updated_item());
  }
}
4584
// Periodic pass over the dirty-scatterlock queue: nudge locks whose dirty
// data has aged past mds_scatter_nudge_interval so it gets journaled back
// into the inode.
void Locker::scatter_tick()
{
  dout(10) << "scatter_tick" << dendl;

  // updated
  utime_t now = ceph_clock_now();
  // snapshot the queue length: scatter_nudge() may requeue entries, and we
  // must not process an entry twice in one tick
  int n = updated_scatterlocks.size();
  while (!updated_scatterlocks.empty()) {
    ScatterLock *lock = updated_scatterlocks.front();

    if (n-- == 0) break;  // scatter_nudge() may requeue; avoid looping

    if (!lock->is_dirty()) {
      // already flushed by some other path; drop it from the queue
      updated_scatterlocks.pop_front();
      dout(10) << " removing from updated_scatterlocks "
	       << *lock << " " << *lock->get_parent() << dendl;
      continue;
    }
    // queue is in insertion (stamp) order, so once one entry is too young
    // all remaining entries are too
    if (now - lock->get_update_stamp() < g_conf->mds_scatter_nudge_interval)
      break;
    updated_scatterlocks.pop_front();
    scatter_nudge(lock, 0);
  }
  mds->mdlog->flush();
}
4610
4611
// Move a scatter lock toward TSYN (temporarily readable on the auth only).
// NOTE: currently aborts unconditionally via the assert below -- the path
// is not fully implemented (at least not for the file lock); the code after
// it documents the intended transition.
void Locker::scatter_tempsync(ScatterLock *lock, bool *need_issue)
{
  dout(10) << "scatter_tempsync " << *lock
	   << " on " << *lock->get_parent() << dendl;
  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  assert(0 == "not fully implemented, at least not for filelock");

  CInode *in = static_cast<CInode *>(lock->get_parent());

  switch (lock->get_state()) {
  case LOCK_SYNC: ceph_abort();   // this shouldn't happen
  case LOCK_LOCK: lock->set_state(LOCK_LOCK_TSYN); break;
  case LOCK_MIX: lock->set_state(LOCK_MIX_TSYN); break;
  default: ceph_abort();
  }

  int gather = 0;
  if (lock->is_wrlocked())
    gather++;

  // revoke client caps that conflict with TSYN
  if (lock->get_cap_shift() &&
      in->is_head() &&
      in->issued_caps_need_gather(lock)) {
    if (need_issue)
      *need_issue = true;
    else
      issue_caps(in);
    gather++;
  }

  // coming from MIX, replicas hold scattered state; gather their acks
  if (lock->get_state() == LOCK_MIX_TSYN &&
      in->is_replicated()) {
    lock->init_gather();
    send_lock_message(lock, LOCK_AC_LOCK);
    gather++;
  }

  if (gather) {
    in->auth_pin(lock);
  } else {
    // do tempsync
    lock->set_state(LOCK_TSYN);
    lock->finish_waiters(ScatterLock::WAIT_RD|ScatterLock::WAIT_STABLE);
    if (lock->get_cap_shift()) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
    }
  }
}
4665
4666
4667
4668 // ==========================================================================
4669 // local lock
4670
4671 void Locker::local_wrlock_grab(LocalLock *lock, MutationRef& mut)
4672 {
4673 dout(7) << "local_wrlock_grab on " << *lock
4674 << " on " << *lock->get_parent() << dendl;
4675
4676 assert(lock->get_parent()->is_auth());
4677 assert(lock->can_wrlock());
4678 assert(!mut->wrlocks.count(lock));
4679 lock->get_wrlock(mut->get_client());
4680 mut->wrlocks.insert(lock);
4681 mut->locks.insert(lock);
4682 }
4683
4684 bool Locker::local_wrlock_start(LocalLock *lock, MDRequestRef& mut)
4685 {
4686 dout(7) << "local_wrlock_start on " << *lock
4687 << " on " << *lock->get_parent() << dendl;
4688
4689 assert(lock->get_parent()->is_auth());
4690 if (lock->can_wrlock()) {
4691 assert(!mut->wrlocks.count(lock));
4692 lock->get_wrlock(mut->get_client());
4693 mut->wrlocks.insert(lock);
4694 mut->locks.insert(lock);
4695 return true;
4696 } else {
4697 lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
4698 return false;
4699 }
4700 }
4701
4702 void Locker::local_wrlock_finish(LocalLock *lock, MutationImpl *mut)
4703 {
4704 dout(7) << "local_wrlock_finish on " << *lock
4705 << " on " << *lock->get_parent() << dendl;
4706 lock->put_wrlock();
4707 mut->wrlocks.erase(lock);
4708 mut->locks.erase(lock);
4709 if (lock->get_num_wrlocks() == 0) {
4710 lock->finish_waiters(SimpleLock::WAIT_STABLE |
4711 SimpleLock::WAIT_WR |
4712 SimpleLock::WAIT_RD);
4713 }
4714 }
4715
4716 bool Locker::local_xlock_start(LocalLock *lock, MDRequestRef& mut)
4717 {
4718 dout(7) << "local_xlock_start on " << *lock
4719 << " on " << *lock->get_parent() << dendl;
4720
4721 assert(lock->get_parent()->is_auth());
4722 if (!lock->can_xlock_local()) {
4723 lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
4724 return false;
4725 }
4726
4727 lock->get_xlock(mut, mut->get_client());
4728 mut->xlocks.insert(lock);
4729 mut->locks.insert(lock);
4730 return true;
4731 }
4732
4733 void Locker::local_xlock_finish(LocalLock *lock, MutationImpl *mut)
4734 {
4735 dout(7) << "local_xlock_finish on " << *lock
4736 << " on " << *lock->get_parent() << dendl;
4737 lock->put_xlock();
4738 mut->xlocks.erase(lock);
4739 mut->locks.erase(lock);
4740
4741 lock->finish_waiters(SimpleLock::WAIT_STABLE |
4742 SimpleLock::WAIT_WR |
4743 SimpleLock::WAIT_RD);
4744 }
4745
4746
4747
4748 // ==========================================================================
4749 // file lock
4750
4751
// Re-evaluate a stable file lock on the auth and pick the best state for
// the current mix of client cap wants: EXCL for a single writer (loner),
// MIX for multiple writers, SYNC when nobody wants to write.
void Locker::file_eval(ScatterLock *lock, bool *need_issue)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  int loner_wanted, other_wanted;
  int wanted = in->get_caps_wanted(&loner_wanted, &other_wanted, CEPH_CAP_SFILE);
  dout(7) << "file_eval wanted=" << gcap_string(wanted)
	  << " loner_wanted=" << gcap_string(loner_wanted)
	  << " other_wanted=" << gcap_string(other_wanted)
	  << "  filelock=" << *lock << " on " << *lock->get_parent()
	  << dendl;

  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  if (lock->get_parent()->is_freezing_or_frozen())
    return;

  if (mdcache->is_readonly()) {
    // read-only FS: force everything readable
    if (lock->get_state() != LOCK_SYNC) {
      dout(10) << "file_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
      simple_sync(lock, need_issue);
    }
    return;
  }

  // excl -> *?
  if (lock->get_state() == LOCK_EXCL) {
    dout(20) << " is excl" << dendl;
    int loner_issued, other_issued, xlocker_issued;
    in->get_caps_issued(&loner_issued, &other_issued, &xlocker_issued, CEPH_CAP_SFILE);
    dout(7) << "file_eval loner_issued=" << gcap_string(loner_issued)
            << " other_issued=" << gcap_string(other_issued)
	    << " xlocker_issued=" << gcap_string(xlocker_issued)
	    << dendl;
    // drop EXCL when the loner no longer needs it, someone else wants
    // caps, or (for dirs) caps are spread over multiple clients
    if (!((loner_wanted|loner_issued) & (CEPH_CAP_GEXCL|CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) ||
	(other_wanted & (CEPH_CAP_GEXCL|CEPH_CAP_GWR|CEPH_CAP_GRD)) ||
	(in->inode.is_dir() && in->multiple_nonstale_caps())) {  // FIXME.. :/
      dout(20) << " should lose it" << dendl;
      // we should lose it.
      //  loner  other   want
      //  R      R       SYNC
      //  R      R|W     MIX
      //  R      W       MIX
      //  R|W    R       MIX
      //  R|W    R|W     MIX
      //  R|W    W       MIX
      //  W      R       MIX
      //  W      R|W     MIX
      //  W      W       MIX
      // -> any writer means MIX; RD doesn't matter.
      if (((other_wanted|loner_wanted) & CEPH_CAP_GWR) ||
	  lock->is_waiter_for(SimpleLock::WAIT_WR))
	scatter_mix(lock, need_issue);
      else if (!lock->is_wrlocked())   // let excl wrlocks drain first
	simple_sync(lock, need_issue);
      else
	dout(10) << " waiting for wrlock to drain" << dendl;
    }
  }

  // * -> excl?
  else if (lock->get_state() != LOCK_EXCL &&
	   !lock->is_rdlocked() &&
	   //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
	   ((wanted & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) ||
	    (in->inode.is_dir() && !in->has_subtree_or_exporting_dirfrag())) &&
	   in->get_target_loner() >= 0) {
    dout(7) << "file_eval stable, bump to loner " << *lock
	    << " on " << *lock->get_parent() << dendl;
    file_excl(lock, need_issue);
  }

  // * -> mixed?
  else if (lock->get_state() != LOCK_MIX &&
	   !lock->is_rdlocked() &&
	   //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
	   (lock->get_scatter_wanted() ||
	    (in->get_wanted_loner() < 0 && (wanted & CEPH_CAP_GWR)))) {
    dout(7) << "file_eval stable, bump to mixed " << *lock
	    << " on " << *lock->get_parent() << dendl;
    scatter_mix(lock, need_issue);
  }

  // * -> sync?
  else if (lock->get_state() != LOCK_SYNC &&
	   !lock->is_wrlocked() &&   // drain wrlocks first!
	   !lock->is_waiter_for(SimpleLock::WAIT_WR) &&
	   !(wanted & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) &&
	   !((lock->get_state() == LOCK_MIX) &&
	     in->is_dir() && in->has_subtree_or_exporting_dirfrag())  // if we are a delegation point, stay where we are
	   //((wanted & CEPH_CAP_RD) ||
	   //in->is_replicated() ||
	   //lock->get_num_client_lease() ||
	   //(!loner && lock->get_state() == LOCK_EXCL)) &&
	   ) {
    dout(7) << "file_eval stable, bump to sync " << *lock
	    << " on " << *lock->get_parent() << dendl;
    simple_sync(lock, need_issue);
  }
}
4852
4853
4854
// Move an auth scatter lock toward MIX (scattered/writeable on all
// replicas).  From LOCK this completes immediately; from SYNC/EXCL/TSYN a
// gather (rdlocks, replica acks, cap revocation, file recovery) may be
// needed first.
void Locker::scatter_mix(ScatterLock *lock, bool *need_issue)
{
  dout(7) << "scatter_mix " << *lock << " on " << *lock->get_parent() << dendl;

  CInode *in = static_cast<CInode*>(lock->get_parent());
  assert(in->is_auth());
  assert(lock->is_stable());

  if (lock->get_state() == LOCK_LOCK) {
    // nothing can be holding the lock; scatter right away
    in->start_scatter(lock);
    if (in->is_replicated()) {
      // data
      bufferlist softdata;
      lock->encode_locked_state(softdata);

      // bcast to replicas
      send_lock_message(lock, LOCK_AC_MIX, softdata);
    }

    // change lock
    lock->set_state(LOCK_MIX);
    lock->clear_scatter_wanted();
    if (lock->get_cap_shift()) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
    }
  } else {
    // gather?
    switch (lock->get_state()) {
    case LOCK_SYNC: lock->set_state(LOCK_SYNC_MIX); break;
    case LOCK_XSYN:
      // XSYN must pass through EXCL first; bail if file_excl had to gather
      file_excl(lock, need_issue);
      if (lock->get_state() != LOCK_EXCL)
	return;
      // fall-thru
    case LOCK_EXCL: lock->set_state(LOCK_EXCL_MIX); break;
    case LOCK_TSYN: lock->set_state(LOCK_TSYN_MIX); break;
    default: ceph_abort();
    }

    int gather = 0;
    if (lock->is_rdlocked())
      gather++;
    if (in->is_replicated()) {
      if (lock->get_state() != LOCK_EXCL_MIX &&   // EXCL replica is already LOCK
	  lock->get_state() != LOCK_XSYN_EXCL) {  // XSYN replica is already LOCK;  ** FIXME here too!
	send_lock_message(lock, LOCK_AC_MIX);
	lock->init_gather();
	gather++;
      }
    }
    if (lock->is_leased()) {
      revoke_client_leases(lock);
      gather++;
    }
    // revoke client caps that conflict with MIX
    if (lock->get_cap_shift() &&
	in->is_head() &&
	in->issued_caps_need_gather(lock)) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
      gather++;
    }
    bool need_recover = false;
    if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
      mds->mdcache->queue_file_recover(in);
      need_recover = true;
      gather++;
    }

    if (gather) {
      lock->get_parent()->auth_pin(lock);
      if (need_recover)
	mds->mdcache->do_file_recover();
    } else {
      // nothing to wait for; scatter and broadcast now
      in->start_scatter(lock);
      lock->set_state(LOCK_MIX);
      lock->clear_scatter_wanted();
      if (in->is_replicated()) {
	bufferlist softdata;
	lock->encode_locked_state(softdata);
	send_lock_message(lock, LOCK_AC_MIX, softdata);
      }
      if (lock->get_cap_shift()) {
	if (need_issue)
	  *need_issue = true;
	else
	  issue_caps(in);
      }
    }
  }
}
4950
4951
/**
 * Move a file ScatterLock toward the EXCL (exclusive/loner) state.
 *
 * Runs on the auth MDS only, on a stable lock.  Enters the appropriate
 * intermediate state (SYNC_EXCL, MIX_EXCL, LOCK_EXCL, XSYN_EXCL) and
 * counts the obstacles — rd/wrlocks, replica acks, client leases,
 * conflicting caps, pending file recovery — that must drain before
 * eval_gather() can finish the transition.  If nothing needs gathering,
 * the lock goes to LOCK_EXCL immediately.
 *
 * @param lock the scatter lock to transition; its parent must be a CInode
 * @param need_issue if non-NULL, set *need_issue instead of calling
 *        issue_caps() directly, so the caller can batch cap issuance
 */
void Locker::file_excl(ScatterLock *lock, bool *need_issue)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  dout(7) << "file_excl " << *lock << " on " << *lock->get_parent() << dendl;

  assert(in->is_auth());
  assert(lock->is_stable());

  // EXCL requires a loner client with no other MDSs wanting caps,
  // except when we are stepping XSYN through EXCL on the way elsewhere
  assert((in->get_loner() >= 0 && in->mds_caps_wanted.empty()) ||
         (lock->get_state() == LOCK_XSYN));  // must do xsyn -> excl -> <anything else>

  switch (lock->get_state()) {
  case LOCK_SYNC: lock->set_state(LOCK_SYNC_EXCL); break;
  case LOCK_MIX: lock->set_state(LOCK_MIX_EXCL); break;
  case LOCK_LOCK: lock->set_state(LOCK_LOCK_EXCL); break;
  case LOCK_XSYN: lock->set_state(LOCK_XSYN_EXCL); break;
  default: ceph_abort();
  }
  int gather = 0;

  // local lock holders must drain first
  if (lock->is_rdlocked())
    gather++;
  if (lock->is_wrlocked())
    gather++;

  if (in->is_replicated() &&
      lock->get_state() != LOCK_LOCK_EXCL &&
      lock->get_state() != LOCK_XSYN_EXCL) {  // if we were lock, replicas are already lock.
    send_lock_message(lock, LOCK_AC_LOCK);
    lock->init_gather();
    gather++;
  }
  if (lock->is_leased()) {
    // client leases conflict with EXCL; revoke and wait
    revoke_client_leases(lock);
    gather++;
  }
  if (in->is_head() &&
      in->issued_caps_need_gather(lock)) {
    // conflicting client caps; re-issue (revoking) and wait for releases
    if (need_issue)
      *need_issue = true;
    else
      issue_caps(in);
    gather++;
  }
  bool need_recover = false;
  if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
    mds->mdcache->queue_file_recover(in);
    need_recover = true;
    gather++;
  }

  if (gather) {
    // pin while in the unstable intermediate state
    lock->get_parent()->auth_pin(lock);
    if (need_recover)
      mds->mdcache->do_file_recover();
  } else {
    // no obstacles; complete the transition immediately
    lock->set_state(LOCK_EXCL);
    if (need_issue)
      *need_issue = true;
    else
      issue_caps(in);
  }
}
5015
5016 void Locker::file_xsyn(SimpleLock *lock, bool *need_issue)
5017 {
5018 dout(7) << "file_xsyn on " << *lock << " on " << *lock->get_parent() << dendl;
5019 CInode *in = static_cast<CInode *>(lock->get_parent());
5020 assert(in->is_auth());
5021 assert(in->get_loner() >= 0 && in->mds_caps_wanted.empty());
5022
5023 switch (lock->get_state()) {
5024 case LOCK_EXCL: lock->set_state(LOCK_EXCL_XSYN); break;
5025 default: ceph_abort();
5026 }
5027
5028 int gather = 0;
5029 if (lock->is_wrlocked())
5030 gather++;
5031
5032 if (in->is_head() &&
5033 in->issued_caps_need_gather(lock)) {
5034 if (need_issue)
5035 *need_issue = true;
5036 else
5037 issue_caps(in);
5038 gather++;
5039 }
5040
5041 if (gather) {
5042 lock->get_parent()->auth_pin(lock);
5043 } else {
5044 lock->set_state(LOCK_XSYN);
5045 lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
5046 if (need_issue)
5047 *need_issue = true;
5048 else
5049 issue_caps(in);
5050 }
5051 }
5052
5053 void Locker::file_recover(ScatterLock *lock)
5054 {
5055 CInode *in = static_cast<CInode *>(lock->get_parent());
5056 dout(7) << "file_recover " << *lock << " on " << *in << dendl;
5057
5058 assert(in->is_auth());
5059 //assert(lock->is_stable());
5060 assert(lock->get_state() == LOCK_PRE_SCAN); // only called from MDCache::start_files_to_recover()
5061
5062 int gather = 0;
5063
5064 /*
5065 if (in->is_replicated()
5066 lock->get_sm()->states[oldstate].replica_state != LOCK_LOCK) {
5067 send_lock_message(lock, LOCK_AC_LOCK);
5068 lock->init_gather();
5069 gather++;
5070 }
5071 */
5072 if (in->is_head() &&
5073 in->issued_caps_need_gather(lock)) {
5074 issue_caps(in);
5075 gather++;
5076 }
5077
5078 lock->set_state(LOCK_SCAN);
5079 if (gather)
5080 in->state_set(CInode::STATE_NEEDSRECOVER);
5081 else
5082 mds->mdcache->queue_file_recover(in);
5083 }
5084
5085
// messenger
/**
 * Handle an incoming MLock message for a file/scatter lock.
 *
 * Dispatches on the message action:
 *  - replica-side actions from the auth: SYNC, LOCK, MIX, LOCKFLUSHED
 *  - auth-side acks from replicas: LOCKACK, SYNCACK, MIXACK
 *  - replica-to-auth requests: REQSCATTER, REQUNSCATTER, REQRDLOCK, NUDGE
 *
 * This function DOES put the passed message before returning.
 */
void Locker::handle_file_lock(ScatterLock *lock, MLock *m)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  int from = m->get_asker();

  // while this MDS is rejoining and the inode's state is not yet settled,
  // just drop the message
  if (mds->is_rejoin()) {
    if (in->is_rejoining()) {
      dout(7) << "handle_file_lock still rejoining " << *in
              << ", dropping " << *m << dendl;
      m->put();
      return;
    }
  }

  dout(7) << "handle_file_lock a=" << get_lock_action_name(m->get_action())
          << " on " << *lock
          << " from mds." << from << " "
          << *in << dendl;

  // does this lock gate client capabilities?
  bool caps = lock->get_cap_shift();

  switch (m->get_action()) {
    // -- replica --
  case LOCK_AC_SYNC:
    assert(lock->get_state() == LOCK_LOCK ||
           lock->get_state() == LOCK_MIX ||
           lock->get_state() == LOCK_MIX_SYNC2);

    if (lock->get_state() == LOCK_MIX) {
      // we may hold dirty scattered state; go through MIX_SYNC so
      // eval_gather() can flush before we ack
      lock->set_state(LOCK_MIX_SYNC);
      eval_gather(lock, true);
      if (lock->is_unstable_and_locked())
        mds->mdlog->flush();
      break;
    }

    (static_cast<ScatterLock *>(lock))->finish_flush();
    (static_cast<ScatterLock *>(lock))->clear_flushed();

    // ok
    lock->decode_locked_state(m->get_data());
    lock->set_state(LOCK_SYNC);

    // hold a transient rdlock while issuing caps and waking RD waiters
    lock->get_rdlock();
    if (caps)
      issue_caps(in);
    lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
    lock->put_rdlock();
    break;

  case LOCK_AC_LOCK:
    switch (lock->get_state()) {
    case LOCK_SYNC: lock->set_state(LOCK_SYNC_LOCK); break;
    case LOCK_MIX: lock->set_state(LOCK_MIX_LOCK); break;
    default: ceph_abort();
    }

    // re-evaluate; flush the log if we are now unstable and locked
    eval_gather(lock, true);
    if (lock->is_unstable_and_locked())
      mds->mdlog->flush();

    break;

  case LOCK_AC_LOCKFLUSHED:
    // the auth has accepted our flushed scatter data
    (static_cast<ScatterLock *>(lock))->finish_flush();
    (static_cast<ScatterLock *>(lock))->clear_flushed();
    // wake up scatter_nudge waiters
    if (lock->is_stable())
      lock->finish_waiters(SimpleLock::WAIT_STABLE);
    break;

  case LOCK_AC_MIX:
    assert(lock->get_state() == LOCK_SYNC ||
           lock->get_state() == LOCK_LOCK ||
           lock->get_state() == LOCK_SYNC_MIX2);

    if (lock->get_state() == LOCK_SYNC) {
      // MIXED
      lock->set_state(LOCK_SYNC_MIX);
      eval_gather(lock, true);
      if (lock->is_unstable_and_locked())
        mds->mdlog->flush();
      break;
    }

    // ok
    lock->set_state(LOCK_MIX);
    lock->decode_locked_state(m->get_data());

    if (caps)
      issue_caps(in);

    lock->finish_waiters(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE);
    break;


    // -- auth --
  case LOCK_AC_LOCKACK:
    assert(lock->get_state() == LOCK_SYNC_LOCK ||
           lock->get_state() == LOCK_MIX_LOCK ||
           lock->get_state() == LOCK_MIX_LOCK2 ||
           lock->get_state() == LOCK_MIX_EXCL ||
           lock->get_state() == LOCK_SYNC_EXCL ||
           lock->get_state() == LOCK_SYNC_MIX ||
           lock->get_state() == LOCK_MIX_TSYN);
    assert(lock->is_gathering(from));
    lock->remove_gather(from);

    if (lock->get_state() == LOCK_MIX_LOCK ||
        lock->get_state() == LOCK_MIX_LOCK2 ||
        lock->get_state() == LOCK_MIX_EXCL ||
        lock->get_state() == LOCK_MIX_TSYN) {
      // leaving MIX: the ack carries the replica's scattered state
      lock->decode_locked_state(m->get_data());
      // replica is waiting for AC_LOCKFLUSHED, eval_gather() should not
      // delay calling scatter_writebehind().
      lock->clear_flushed();
    }

    if (lock->is_gathering()) {
      dout(7) << "handle_file_lock " << *in << " from " << from
              << ", still gathering " << lock->get_gather_set() << dendl;
    } else {
      // that was the last outstanding ack; finish the transition
      dout(7) << "handle_file_lock " << *in << " from " << from
              << ", last one" << dendl;
      eval_gather(lock);
    }
    break;

  case LOCK_AC_SYNCACK:
    assert(lock->get_state() == LOCK_MIX_SYNC);
    assert(lock->is_gathering(from));
    lock->remove_gather(from);

    // merge the replica's flushed scattered state
    lock->decode_locked_state(m->get_data());

    if (lock->is_gathering()) {
      dout(7) << "handle_file_lock " << *in << " from " << from
              << ", still gathering " << lock->get_gather_set() << dendl;
    } else {
      // that was the last outstanding ack; finish the transition
      dout(7) << "handle_file_lock " << *in << " from " << from
              << ", last one" << dendl;
      eval_gather(lock);
    }
    break;

  case LOCK_AC_MIXACK:
    assert(lock->get_state() == LOCK_SYNC_MIX);
    assert(lock->is_gathering(from));
    lock->remove_gather(from);

    if (lock->is_gathering()) {
      dout(7) << "handle_file_lock " << *in << " from " << from
              << ", still gathering " << lock->get_gather_set() << dendl;
    } else {
      // that was the last outstanding ack; finish the transition
      dout(7) << "handle_file_lock " << *in << " from " << from
              << ", last one" << dendl;
      eval_gather(lock);
    }
    break;


    // requests....
  case LOCK_AC_REQSCATTER:
    if (lock->is_stable()) {
      /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
       * because the replica should be holding an auth_pin if they're
       * doing this (and thus, we are freezing, not frozen, and indefinite
       * starvation isn't an issue).
       */
      dout(7) << "handle_file_lock got scatter request on " << *lock
              << " on " << *lock->get_parent() << dendl;
      if (lock->get_state() != LOCK_MIX)  // i.e., the reqscatter didn't race with an actual mix/scatter
        scatter_mix(lock);
    } else {
      // not stable: remember the request so eval can honor it later
      dout(7) << "handle_file_lock got scatter request, !stable, marking scatter_wanted on " << *lock
              << " on " << *lock->get_parent() << dendl;
      lock->set_scatter_wanted();
    }
    break;

  case LOCK_AC_REQUNSCATTER:
    if (lock->is_stable()) {
      /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
       * because the replica should be holding an auth_pin if they're
       * doing this (and thus, we are freezing, not frozen, and indefinite
       * starvation isn't an issue).
       */
      dout(7) << "handle_file_lock got unscatter request on " << *lock
              << " on " << *lock->get_parent() << dendl;
      if (lock->get_state() == LOCK_MIX)  // i.e., the reqscatter didn't race with an actual mix/scatter
        simple_lock(lock);  // FIXME tempsync?
    } else {
      // not stable: remember the request so eval can honor it later
      dout(7) << "handle_file_lock ignoring unscatter request on " << *lock
              << " on " << *lock->get_parent() << dendl;
      lock->set_unscatter_wanted();
    }
    break;

  case LOCK_AC_REQRDLOCK:
    handle_reqrdlock(lock, m);
    break;

  case LOCK_AC_NUDGE:
    if (!lock->get_parent()->is_auth()) {
      dout(7) << "handle_file_lock IGNORING nudge on non-auth " << *lock
              << " on " << *lock->get_parent() << dendl;
    } else if (!lock->get_parent()->is_replicated()) {
      dout(7) << "handle_file_lock IGNORING nudge on non-replicated " << *lock
              << " on " << *lock->get_parent() << dendl;
    } else {
      dout(7) << "handle_file_lock trying nudge on " << *lock
              << " on " << *lock->get_parent() << dendl;
      scatter_nudge(lock, 0, true);
      mds->mdlog->flush();
    }
    break;

  default:
    ceph_abort();
  }

  m->put();
}
5311
5312
5313
5314
5315
5316