]> git.proxmox.com Git - ceph.git/blob - ceph/src/mds/Locker.cc
6cb9a73511130e643d1c2ee69cb51db1007befab
[ceph.git] / ceph / src / mds / Locker.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15
16 #include "MDSRank.h"
17 #include "MDCache.h"
18 #include "Locker.h"
19 #include "CInode.h"
20 #include "CDir.h"
21 #include "CDentry.h"
22 #include "Mutation.h"
23 #include "MDSContext.h"
24
25 #include "MDLog.h"
26 #include "MDSMap.h"
27
28 #include "events/EUpdate.h"
29 #include "events/EOpen.h"
30
31 #include "msg/Messenger.h"
32 #include "osdc/Objecter.h"
33
34 #include "messages/MInodeFileCaps.h"
35 #include "messages/MLock.h"
36 #include "messages/MClientLease.h"
37 #include "messages/MClientReply.h"
38 #include "messages/MClientCaps.h"
39 #include "messages/MClientCapRelease.h"
40
41 #include "messages/MMDSSlaveRequest.h"
42
43 #include <errno.h>
44
45 #include "common/config.h"
46
47
#define dout_subsys ceph_subsys_mds
#undef dout_prefix
#define dout_context g_ceph_context
#define dout_prefix _prefix(_dout, mds)
// Prefix every debug line emitted from this file with "mds.<rank>.locker ".
static ostream& _prefix(std::ostream *_dout, MDSRank *mds) {
  return *_dout << "mds." << mds->get_nodeid() << ".locker ";
}
55
56
// Base class for internal-context callbacks owned by the Locker.
// Resolves the owning MDSRank through the stored Locker pointer so
// subclasses only need to carry the Locker.
class LockerContext : public MDSInternalContextBase {
protected:
  Locker *locker;  // never NULL (asserted in the constructor)
  MDSRank *get_mds() override
  {
    return locker->mds;
  }

public:
  explicit LockerContext(Locker *locker_) : locker(locker_) {
    assert(locker != NULL);
  }
};
70
// Same as LockerContext, but for callbacks that run on journal (MDLog)
// completion paths via MDSLogContextBase.
class LockerLogContext : public MDSLogContextBase {
protected:
  Locker *locker;  // never NULL (asserted in the constructor)
  MDSRank *get_mds() override
  {
    return locker->mds;
  }

public:
  explicit LockerLogContext(Locker *locker_) : locker(locker_) {
    assert(locker != NULL);
  }
};
84
/* Route an incoming message to the appropriate handler.
 * This function DOES put the passed message before returning
 * (i.e. the handlers consume the message reference). */
void Locker::dispatch(Message *m)
{

  switch (m->get_type()) {

    // inter-mds locking
  case MSG_MDS_LOCK:
    handle_lock(static_cast<MLock*>(m));
    break;
    // inter-mds caps
  case MSG_MDS_INODEFILECAPS:
    handle_inode_file_caps(static_cast<MInodeFileCaps*>(m));
    break;

    // client sync
  case CEPH_MSG_CLIENT_CAPS:
    handle_client_caps(static_cast<MClientCaps*>(m));

    break;
  case CEPH_MSG_CLIENT_CAPRELEASE:
    handle_client_cap_release(static_cast<MClientCapRelease*>(m));
    break;
  case CEPH_MSG_CLIENT_LEASE:
    handle_client_lease(static_cast<MClientLease*>(m));
    break;

  default:
    // receiving a type we never subscribed to is a programming error
    derr << "locker unknown message " << m->get_type() << dendl;
    assert(0 == "locker unknown message");
  }
}
117
// Periodic maintenance hook; delegates to the scatterlock and
// capability tick routines (in that order).
void Locker::tick()
{
  scatter_tick();
  caps_tick();
}
123
124 /*
125 * locks vs rejoin
126 *
127 *
128 *
129 */
130
131 void Locker::send_lock_message(SimpleLock *lock, int msg)
132 {
133 for (compact_map<mds_rank_t,unsigned>::iterator it = lock->get_parent()->replicas_begin();
134 it != lock->get_parent()->replicas_end();
135 ++it) {
136 if (mds->is_cluster_degraded() &&
137 mds->mdsmap->get_state(it->first) < MDSMap::STATE_REJOIN)
138 continue;
139 MLock *m = new MLock(lock, msg, mds->get_nodeid());
140 mds->send_message_mds(m, it->first);
141 }
142 }
143
144 void Locker::send_lock_message(SimpleLock *lock, int msg, const bufferlist &data)
145 {
146 for (compact_map<mds_rank_t,unsigned>::iterator it = lock->get_parent()->replicas_begin();
147 it != lock->get_parent()->replicas_end();
148 ++it) {
149 if (mds->is_cluster_degraded() &&
150 mds->mdsmap->get_state(it->first) < MDSMap::STATE_REJOIN)
151 continue;
152 MLock *m = new MLock(lock, msg, mds->get_nodeid());
153 m->set_data(data);
154 mds->send_message_mds(m, it->first);
155 }
156 }
157
158
159
160
161 void Locker::include_snap_rdlocks(set<SimpleLock*>& rdlocks, CInode *in)
162 {
163 // rdlock ancestor snaps
164 CInode *t = in;
165 rdlocks.insert(&in->snaplock);
166 while (t->get_projected_parent_dn()) {
167 t = t->get_projected_parent_dn()->get_dir()->get_inode();
168 rdlocks.insert(&t->snaplock);
169 }
170 }
171
172 void Locker::include_snap_rdlocks_wlayout(set<SimpleLock*>& rdlocks, CInode *in,
173 file_layout_t **layout)
174 {
175 //rdlock ancestor snaps
176 CInode *t = in;
177 rdlocks.insert(&in->snaplock);
178 rdlocks.insert(&in->policylock);
179 bool found_layout = false;
180 while (t) {
181 rdlocks.insert(&t->snaplock);
182 if (!found_layout) {
183 rdlocks.insert(&t->policylock);
184 if (t->get_projected_inode()->has_layout()) {
185 *layout = &t->get_projected_inode()->layout;
186 found_layout = true;
187 }
188 }
189 if (t->get_projected_parent_dn() &&
190 t->get_projected_parent_dn()->get_dir())
191 t = t->get_projected_parent_dn()->get_dir()->get_inode();
192 else t = NULL;
193 }
194 }
195
// RAII helper: on destruction, records 'message' on the request's event
// trace (unless mark_event has been cleared).  Callers update 'message'
// as they progress so that any early return logs the most recent
// failure reason automatically.
struct MarkEventOnDestruct {
  MDRequestRef& mdr;     // request whose event trace we annotate
  const char* message;   // message recorded at destruction time
  bool mark_event;       // set false to disarm the destructor
  MarkEventOnDestruct(MDRequestRef& _mdr,
                      const char *_message) : mdr(_mdr),
                                              message(_message),
                                              mark_event(true) {}
  ~MarkEventOnDestruct() {
    if (mark_event)
      mdr->mark_event(message);
  }
};
209
/* Acquire, in canonical sort order, all of the rd/wr/x locks requested
 * for this mdr, auth-pinning (locally, and via slave requests remotely)
 * every object whose lock state may need to change along the way.
 *
 * If this function returns false, the mdr has been placed
 * on the appropriate wait list (or a remote authpin request is in
 * flight) and the caller will be retried later. */
bool Locker::acquire_locks(MDRequestRef& mdr,
			   set<SimpleLock*> &rdlocks,
			   set<SimpleLock*> &wrlocks,
			   set<SimpleLock*> &xlocks,
			   map<SimpleLock*,mds_rank_t> *remote_wrlocks,
			   CInode *auth_pin_freeze,
			   bool auth_pin_nonblock)
{
  if (mdr->done_locking &&
      !mdr->is_slave()) {  // not on slaves!  master requests locks piecemeal.
    dout(10) << "acquire_locks " << *mdr << " - done locking" << dendl;
    return true;  // at least we had better be!
  }
  dout(10) << "acquire_locks " << *mdr << dendl;

  // records the most recent failure reason on every early return
  MarkEventOnDestruct marker(mdr, "failed to acquire_locks");

  client_t client = mdr->get_client();

  set<SimpleLock*, SimpleLock::ptr_lt> sorted;  // sort everything we will lock
  set<MDSCacheObject*> mustpin;                 // items to authpin

  // xlocks
  for (set<SimpleLock*>::iterator p = xlocks.begin(); p != xlocks.end(); ++p) {
    dout(20) << " must xlock " << **p << " " << *(*p)->get_parent() << dendl;
    sorted.insert(*p);
    mustpin.insert((*p)->get_parent());

    // augment xlock with a versionlock?
    if ((*p)->get_type() == CEPH_LOCK_DN) {
      CDentry *dn = (CDentry*)(*p)->get_parent();
      if (!dn->is_auth())
	continue;

      if (xlocks.count(&dn->versionlock))
	continue;  // we're xlocking the versionlock too; don't wrlock it!

      if (mdr->is_master()) {
	// master.  wrlock versionlock so we can pipeline dentry updates to journal.
	wrlocks.insert(&dn->versionlock);
      } else {
	// slave.  exclusively lock the dentry version (i.e. block other journal updates).
	// this makes rollback safe.
	xlocks.insert(&dn->versionlock);
	sorted.insert(&dn->versionlock);
      }
    }
    if ((*p)->get_type() > CEPH_LOCK_IVERSION) {
      // inode version lock?
      CInode *in = (CInode*)(*p)->get_parent();
      if (!in->is_auth())
	continue;
      if (mdr->is_master()) {
	// master.  wrlock versionlock so we can pipeline inode updates to journal.
	wrlocks.insert(&in->versionlock);
      } else {
	// slave.  exclusively lock the inode version (i.e. block other journal updates).
	// this makes rollback safe.
	xlocks.insert(&in->versionlock);
	sorted.insert(&in->versionlock);
      }
    }
  }

  // wrlocks
  for (set<SimpleLock*>::iterator p = wrlocks.begin(); p != wrlocks.end(); ++p) {
    MDSCacheObject *object = (*p)->get_parent();
    dout(20) << " must wrlock " << **p << " " << *object << dendl;
    sorted.insert(*p);
    if (object->is_auth())
      mustpin.insert(object);
    else if (!object->is_auth() &&
	     !(*p)->can_wrlock(client) &&  // we might have to request a scatter
	     !mdr->is_slave()) {           // if we are slave (remote_wrlock), the master already authpinned
      dout(15) << " will also auth_pin " << *object
	       << " in case we need to request a scatter" << dendl;
      mustpin.insert(object);
    }
  }

  // remote_wrlocks
  if (remote_wrlocks) {
    for (map<SimpleLock*,mds_rank_t>::iterator p = remote_wrlocks->begin(); p != remote_wrlocks->end(); ++p) {
      MDSCacheObject *object = p->first->get_parent();
      dout(20) << " must remote_wrlock on mds." << p->second << " "
	       << *p->first << " " << *object << dendl;
      sorted.insert(p->first);
      mustpin.insert(object);
    }
  }

  // rdlocks
  for (set<SimpleLock*>::iterator p = rdlocks.begin();
       p != rdlocks.end();
       ++p) {
    MDSCacheObject *object = (*p)->get_parent();
    dout(20) << " must rdlock " << **p << " " << *object << dendl;
    sorted.insert(*p);
    if (object->is_auth())
      mustpin.insert(object);
    else if (!object->is_auth() &&
	     !(*p)->can_rdlock(client)) {  // we might have to request an rdlock
      dout(15) << " will also auth_pin " << *object
	       << " in case we need to request a rdlock" << dendl;
      mustpin.insert(object);
    }
  }


  // AUTH PINS
  map<mds_rank_t, set<MDSCacheObject*> > mustpin_remote;  // mds -> (object set)

  // can i auth pin them all now?
  marker.message = "failed to authpin local pins";
  for (set<MDSCacheObject*>::iterator p = mustpin.begin();
       p != mustpin.end();
       ++p) {
    MDSCacheObject *object = *p;

    dout(10) << " must authpin " << *object << dendl;

    if (mdr->is_auth_pinned(object)) {
      if (object != (MDSCacheObject*)auth_pin_freeze)
	continue;
      if (mdr->more()->is_remote_frozen_authpin) {
	if (mdr->more()->rename_inode == auth_pin_freeze)
	  continue;
	// unfreeze auth pin for the wrong inode: the .size() call is only
	// for its side effect of default-creating the map entry, so an
	// (empty) OP_AUTHPIN request is still sent to that mds below.
	mustpin_remote[mdr->more()->rename_inode->authority().first].size();
      }
    }

    if (!object->is_auth()) {
      if (!mdr->locks.empty())
	drop_locks(mdr.get());
      if (object->is_ambiguous_auth()) {
	// wait until auth is no longer ambiguous, then retry
	dout(10) << " ambiguous auth, waiting to authpin " << *object << dendl;
	object->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_MDS_RetryRequest(mdcache, mdr));
	mdr->drop_local_auth_pins();
	return false;
      }
      mustpin_remote[object->authority().first].insert(object);
      continue;
    }
    if (!object->can_auth_pin()) {
      // wait (or abort, in nonblocking mode)
      drop_locks(mdr.get());
      mdr->drop_local_auth_pins();
      if (auth_pin_nonblock) {
	dout(10) << " can't auth_pin (freezing?) " << *object << ", nonblocking" << dendl;
	mdr->aborted = true;
	return false;
      }
      dout(10) << " can't auth_pin (freezing?), waiting to authpin " << *object << dendl;
      object->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_MDS_RetryRequest(mdcache, mdr));
      return false;
    }
  }

  // ok, grab local auth pins
  for (set<MDSCacheObject*>::iterator p = mustpin.begin();
       p != mustpin.end();
       ++p) {
    MDSCacheObject *object = *p;
    if (mdr->is_auth_pinned(object)) {
      dout(10) << " already auth_pinned " << *object << dendl;
    } else if (object->is_auth()) {
      dout(10) << " auth_pinning " << *object << dendl;
      mdr->auth_pin(object);
    }
  }

  // request remote auth_pins
  if (!mustpin_remote.empty()) {
    marker.message = "requesting remote authpins";
    // fold already-held remote authpins into the per-mds request sets so
    // the slave re-pins everything we still need in one message
    for (map<MDSCacheObject*,mds_rank_t>::iterator p = mdr->remote_auth_pins.begin();
	 p != mdr->remote_auth_pins.end();
	 ++p) {
      if (mustpin.count(p->first)) {
	assert(p->second == p->first->authority().first);
	map<mds_rank_t, set<MDSCacheObject*> >::iterator q = mustpin_remote.find(p->second);
	if (q != mustpin_remote.end())
	  q->second.insert(p->first);
      }
    }
    for (map<mds_rank_t, set<MDSCacheObject*> >::iterator p = mustpin_remote.begin();
	 p != mustpin_remote.end();
	 ++p) {
      dout(10) << "requesting remote auth_pins from mds." << p->first << dendl;

      // wait for active auth
      if (mds->is_cluster_degraded() &&
	  !mds->mdsmap->is_clientreplay_or_active_or_stopping(p->first)) {
	dout(10) << " mds." << p->first << " is not active" << dendl;
	if (mdr->more()->waiting_on_slave.empty())
	  mds->wait_for_active_peer(p->first, new C_MDS_RetryRequest(mdcache, mdr));
	return false;
      }

      MMDSSlaveRequest *req = new MMDSSlaveRequest(mdr->reqid, mdr->attempt,
						   MMDSSlaveRequest::OP_AUTHPIN);
      for (set<MDSCacheObject*>::iterator q = p->second.begin();
	   q != p->second.end();
	   ++q) {
	dout(10) << " req remote auth_pin of " << **q << dendl;
	MDSCacheObjectInfo info;
	(*q)->set_object_info(info);
	req->get_authpins().push_back(info);
	if (*q == auth_pin_freeze)
	  (*q)->set_object_info(req->get_authpin_freeze());
	mdr->pin(*q);
      }
      if (auth_pin_nonblock)
	req->mark_nonblock();
      mds->send_message_mds(req, p->first);

      // put in waiting list
      assert(mdr->more()->waiting_on_slave.count(p->first) == 0);
      mdr->more()->waiting_on_slave.insert(p->first);
    }
    return false;
  }

  // caps i'll need to issue
  set<CInode*> issue_set;
  bool result = false;

  // acquire locks.
  // make sure they match currently acquired locks (mdr->locks and
  // 'sorted' share the same ordering, so we can walk them in lockstep).
  set<SimpleLock*, SimpleLock::ptr_lt>::iterator existing = mdr->locks.begin();
  for (set<SimpleLock*, SimpleLock::ptr_lt>::iterator p = sorted.begin();
       p != sorted.end();
       ++p) {
    bool need_wrlock = !!wrlocks.count(*p);
    bool need_remote_wrlock = !!(remote_wrlocks && remote_wrlocks->count(*p));

    // already locked?
    if (existing != mdr->locks.end() && *existing == *p) {
      // right kind?
      SimpleLock *have = *existing;
      ++existing;
      if (xlocks.count(have) && mdr->xlocks.count(have)) {
	dout(10) << " already xlocked " << *have << " " << *have->get_parent() << dendl;
	continue;
      }
      if (mdr->remote_wrlocks.count(have)) {
	if (!need_remote_wrlock ||
	    mdr->remote_wrlocks[have] != (*remote_wrlocks)[have]) {
	  dout(10) << " unlocking remote_wrlock on wrong mds." << mdr->remote_wrlocks[have]
		   << " " << *have << " " << *have->get_parent() << dendl;
	  remote_wrlock_finish(have, mdr->remote_wrlocks[have], mdr.get());
	}
      }
      if (need_wrlock || need_remote_wrlock) {
	if (need_wrlock == !!mdr->wrlocks.count(have) &&
	    need_remote_wrlock == !!mdr->remote_wrlocks.count(have)) {
	  if (need_wrlock)
	    dout(10) << " already wrlocked " << *have << " " << *have->get_parent() << dendl;
	  if (need_remote_wrlock)
	    dout(10) << " already remote_wrlocked " << *have << " " << *have->get_parent() << dendl;
	  continue;
	}
      }
      if (rdlocks.count(have) && mdr->rdlocks.count(have)) {
	dout(10) << " already rdlocked " << *have << " " << *have->get_parent() << dendl;
	continue;
      }
    }

    // hose any stray locks
    if (existing != mdr->locks.end() && *existing == *p) {
      assert(need_wrlock || need_remote_wrlock);
      SimpleLock *lock = *existing;
      if (mdr->wrlocks.count(lock)) {
	if (!need_wrlock)
	  dout(10) << " unlocking extra " << *lock << " " << *lock->get_parent() << dendl;
	else if (need_remote_wrlock)  // acquire remote_wrlock first
	  dout(10) << " unlocking out-of-order " << *lock << " " << *lock->get_parent() << dendl;
	bool need_issue = false;
	wrlock_finish(lock, mdr.get(), &need_issue);
	if (need_issue)
	  issue_set.insert(static_cast<CInode*>(lock->get_parent()));
      }
      ++existing;
    }
    while (existing != mdr->locks.end()) {
      SimpleLock *stray = *existing;
      ++existing;
      dout(10) << " unlocking out-of-order " << *stray << " " << *stray->get_parent() << dendl;
      bool need_issue = false;
      if (mdr->xlocks.count(stray)) {
	xlock_finish(stray, mdr.get(), &need_issue);
      } else if (mdr->rdlocks.count(stray)) {
	rdlock_finish(stray, mdr.get(), &need_issue);
      } else {
	// may have acquired both wrlock and remote wrlock
	if (mdr->wrlocks.count(stray))
	  wrlock_finish(stray, mdr.get(), &need_issue);
	if (mdr->remote_wrlocks.count(stray))
	  remote_wrlock_finish(stray, mdr->remote_wrlocks[stray], mdr.get());
      }
      if (need_issue)
	issue_set.insert(static_cast<CInode*>(stray->get_parent()));
    }

    // lock
    if (mdr->locking && *p != mdr->locking) {
      cancel_locking(mdr.get(), &issue_set);
    }
    if (xlocks.count(*p)) {
      marker.message = "failed to xlock, waiting";
      if (!xlock_start(*p, mdr))
	goto out;
      dout(10) << " got xlock on " << **p << " " << *(*p)->get_parent() << dendl;
    } else if (need_wrlock || need_remote_wrlock) {
      if (need_remote_wrlock && !mdr->remote_wrlocks.count(*p)) {
        marker.message = "waiting for remote wrlocks";
	remote_wrlock_start(*p, (*remote_wrlocks)[*p], mdr);
	goto out;
      }
      if (need_wrlock && !mdr->wrlocks.count(*p)) {
        marker.message = "failed to wrlock, waiting";
	if (need_remote_wrlock && !(*p)->can_wrlock(mdr->get_client())) {
	  marker.message = "failed to wrlock, dropping remote wrlock and waiting";
	  // can't take the wrlock because the scatter lock is gathering. need to
	  // release the remote wrlock, so that the gathering process can finish.
	  remote_wrlock_finish(*p, mdr->remote_wrlocks[*p], mdr.get());
	  remote_wrlock_start(*p, (*remote_wrlocks)[*p], mdr);
	  goto out;
	}
	// nowait if we have already gotten remote wrlock
	if (!wrlock_start(*p, mdr, need_remote_wrlock))
	  goto out;
	dout(10) << " got wrlock on " << **p << " " << *(*p)->get_parent() << dendl;
      }
    } else {
      assert(mdr->is_master());
      if ((*p)->is_scatterlock()) {
	ScatterLock *slock = static_cast<ScatterLock *>(*p);
	if (slock->is_rejoin_mix()) {
	  // If there is a recovering mds who replicated an object when it failed
	  // and the scatterlock in the object was in MIX state, it's possible that
	  // the recovering mds needs to take wrlock on the scatterlock when it
	  // replays unsafe requests. So this mds should delay taking rdlock on
	  // the scatterlock until the recovering mds finishes replaying unsafe.
	  // Otherwise unsafe requests may get replayed after current request.
	  //
	  // For example:
	  // The recovering mds is auth mds of a dirfrag, this mds is auth mds
	  // of corresponding inode. when 'rm -rf' the directory, this mds should
	  // delay the rmdir request until the recovering mds has replayed unlink
	  // requests.
	  if (mds->is_cluster_degraded()) {
	    if (!mdr->is_replay()) {
	      drop_locks(mdr.get());
	      mds->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache, mdr));
	      dout(10) << " rejoin mix scatterlock " << *slock << " " << *(*p)->get_parent()
		       << ", waiting for cluster recovered" << dendl;
	      marker.message = "rejoin mix scatterlock, waiting for cluster recovered";
	      return false;
	    }
	  } else {
	    slock->clear_rejoin_mix();
	  }
	}
      }

      marker.message = "failed to rdlock, waiting";
      if (!rdlock_start(*p, mdr))
	goto out;
      dout(10) << " got rdlock on " << **p << " " << *(*p)->get_parent() << dendl;
    }
  }

  // any extra unneeded locks?
  while (existing != mdr->locks.end()) {
    SimpleLock *stray = *existing;
    ++existing;
    dout(10) << " unlocking extra " << *stray << " " << *stray->get_parent() << dendl;
    bool need_issue = false;
    if (mdr->xlocks.count(stray)) {
      xlock_finish(stray, mdr.get(), &need_issue);
    } else if (mdr->rdlocks.count(stray)) {
      rdlock_finish(stray, mdr.get(), &need_issue);
    } else {
      // may have acquired both wrlock and remote wrlock
      if (mdr->wrlocks.count(stray))
	wrlock_finish(stray, mdr.get(), &need_issue);
      if (mdr->remote_wrlocks.count(stray))
	remote_wrlock_finish(stray, mdr->remote_wrlocks[stray], mdr.get());
    }
    if (need_issue)
      issue_set.insert(static_cast<CInode*>(stray->get_parent()));
  }

  mdr->done_locking = true;
  mdr->set_mds_stamp(ceph_clock_now());
  result = true;
  marker.message = "acquired locks";

 out:
  issue_caps_set(issue_set);
  return result;
}
617
618
619 void Locker::set_xlocks_done(MutationImpl *mut, bool skip_dentry)
620 {
621 for (set<SimpleLock*>::iterator p = mut->xlocks.begin();
622 p != mut->xlocks.end();
623 ++p) {
624 MDSCacheObject *object = (*p)->get_parent();
625 assert(object->is_auth());
626 if (skip_dentry &&
627 ((*p)->get_type() == CEPH_LOCK_DN || (*p)->get_type() == CEPH_LOCK_DVERSION))
628 continue;
629 dout(10) << "set_xlocks_done on " << **p << " " << *object << dendl;
630 (*p)->set_xlock_done();
631 }
632 }
633
634 void Locker::_drop_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
635 {
636 while (!mut->rdlocks.empty()) {
637 bool ni = false;
638 MDSCacheObject *p = (*mut->rdlocks.begin())->get_parent();
639 rdlock_finish(*mut->rdlocks.begin(), mut, &ni);
640 if (ni)
641 pneed_issue->insert(static_cast<CInode*>(p));
642 }
643 }
644
// Release all xlocks, remote wrlocks and wrlocks held by 'mut' (in that
// order), collecting inodes that may need cap reissue into *pneed_issue
// and telling each involved slave mds to drop its side of the locks.
void Locker::_drop_non_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
{
  set<mds_rank_t> slaves;  // peers we must send OP_DROPLOCKS to

  // xlocks; for non-auth (remote) xlocks we release our reference and
  // let the slave clean up via OP_DROPLOCKS below
  while (!mut->xlocks.empty()) {
    SimpleLock *lock = *mut->xlocks.begin();
    MDSCacheObject *p = lock->get_parent();
    if (!p->is_auth()) {
      assert(lock->get_sm()->can_remote_xlock);
      slaves.insert(p->authority().first);
      lock->put_xlock();
      mut->locks.erase(lock);
      mut->xlocks.erase(lock);
      continue;
    }
    bool ni = false;
    xlock_finish(lock, mut, &ni);
    if (ni)
      pneed_issue->insert(static_cast<CInode*>(p));
  }

  // remote wrlocks: only forget our record; the slave drops its state
  // when it receives OP_DROPLOCKS
  while (!mut->remote_wrlocks.empty()) {
    map<SimpleLock*,mds_rank_t>::iterator p = mut->remote_wrlocks.begin();
    slaves.insert(p->second);
    // keep the lock in mut->locks if we also hold a local wrlock on it
    if (mut->wrlocks.count(p->first) == 0)
      mut->locks.erase(p->first);
    mut->remote_wrlocks.erase(p);
  }

  // local wrlocks
  while (!mut->wrlocks.empty()) {
    bool ni = false;
    MDSCacheObject *p = (*mut->wrlocks.begin())->get_parent();
    wrlock_finish(*mut->wrlocks.begin(), mut, &ni);
    if (ni)
      pneed_issue->insert(static_cast<CInode*>(p));
  }

  // notify slaves (skip peers that have not reached rejoin while degraded)
  for (set<mds_rank_t>::iterator p = slaves.begin(); p != slaves.end(); ++p) {
    if (!mds->is_cluster_degraded() ||
	mds->mdsmap->get_state(*p) >= MDSMap::STATE_REJOIN) {
      dout(10) << "_drop_non_rdlocks dropping remote locks on mds." << *p << dendl;
      MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt,
							MMDSSlaveRequest::OP_DROPLOCKS);
      mds->send_message_mds(slavereq, *p);
    }
  }
}
692
// Abort the lock acquisition currently in progress on mut->locking,
// rolling the lock state machine back as needed and recording any inode
// whose caps may need reissuing.
void Locker::cancel_locking(MutationImpl *mut, set<CInode*> *pneed_issue)
{
  SimpleLock *lock = mut->locking;
  assert(lock);
  dout(10) << "cancel_locking " << *lock << " on " << *mut << dendl;

  if (lock->get_parent()->is_auth()) {
    bool need_issue = false;
    if (lock->get_state() == LOCK_PREXLOCK) {
      _finish_xlock(lock, -1, &need_issue);
    } else if (lock->get_state() == LOCK_LOCK_XLOCK &&
	       lock->get_num_xlocks() == 0) {
      // no xlocks remain, so short-circuit to XLOCKDONE and re-evaluate
      lock->set_state(LOCK_XLOCKDONE);
      eval_gather(lock, true, &need_issue);
    }
    if (need_issue)
      pneed_issue->insert(static_cast<CInode *>(lock->get_parent()));
  }
  mut->finish_locking(lock);
}
713
// Drop every lock held (or being acquired) by 'mut'.  If the caller
// does not supply a need-issue set, caps are reissued here directly.
void Locker::drop_locks(MutationImpl *mut, set<CInode*> *pneed_issue)
{
  // leftover locks
  set<CInode*> my_need_issue;
  if (!pneed_issue)
    pneed_issue = &my_need_issue;

  // cancel any in-flight acquisition first, then non-rdlocks, then rdlocks
  if (mut->locking)
    cancel_locking(mut, pneed_issue);
  _drop_non_rdlocks(mut, pneed_issue);
  _drop_rdlocks(mut, pneed_issue);

  if (pneed_issue == &my_need_issue)
    issue_caps_set(*pneed_issue);
  mut->done_locking = false;
}
730
731 void Locker::drop_non_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
732 {
733 set<CInode*> my_need_issue;
734 if (!pneed_issue)
735 pneed_issue = &my_need_issue;
736
737 _drop_non_rdlocks(mut, pneed_issue);
738
739 if (pneed_issue == &my_need_issue)
740 issue_caps_set(*pneed_issue);
741 }
742
743 void Locker::drop_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
744 {
745 set<CInode*> my_need_issue;
746 if (!pneed_issue)
747 pneed_issue = &my_need_issue;
748
749 _drop_rdlocks(mut, pneed_issue);
750
751 if (pneed_issue == &my_need_issue)
752 issue_caps_set(*pneed_issue);
753 }
754
755
756 // generics
757
// Check whether an unstable lock has finished gathering (no remaining
// rd/wr/x holders, leases or issued caps that conflict with the next
// state) and, if so, move it to its next state: replicas ack the auth,
// the auth notifies replicas and restarts any waiters.  'first' is true
// on the initial evaluation after the state change began.
void Locker::eval_gather(SimpleLock *lock, bool first, bool *pneed_issue, list<MDSInternalContextBase*> *pfinishers)
{
  dout(10) << "eval_gather " << *lock << " on " << *lock->get_parent() << dendl;
  assert(!lock->is_stable());

  int next = lock->get_next_state();

  CInode *in = 0;
  bool caps = lock->get_cap_shift();
  if (lock->get_type() != CEPH_LOCK_DN)
    in = static_cast<CInode *>(lock->get_parent());

  bool need_issue = false;

  int loner_issued = 0, other_issued = 0, xlocker_issued = 0;
  assert(!caps || in != NULL);
  if (caps && in->is_head()) {
    in->get_caps_issued(&loner_issued, &other_issued, &xlocker_issued,
			lock->get_cap_shift(), lock->get_cap_mask());
    dout(10) << " next state is " << lock->get_state_name(next)
	     << " issued/allows loner " << gcap_string(loner_issued)
	     << "/" << gcap_string(lock->gcaps_allowed(CAP_LONER, next))
	     << " xlocker " << gcap_string(xlocker_issued)
	     << "/" << gcap_string(lock->gcaps_allowed(CAP_XLOCKER, next))
	     << " other " << gcap_string(other_issued)
	     << "/" << gcap_string(lock->gcaps_allowed(CAP_ANY, next))
	     << dendl;

    // if any issued caps exceed what the next state allows, reissue
    if (first && ((~lock->gcaps_allowed(CAP_ANY, next) & other_issued) ||
		  (~lock->gcaps_allowed(CAP_LONER, next) & loner_issued) ||
		  (~lock->gcaps_allowed(CAP_XLOCKER, next) & xlocker_issued)))
      need_issue = true;
  }

#define IS_TRUE_AND_LT_AUTH(x, auth) (x && ((auth && x <= AUTH) || (!auth && x < AUTH)))
  bool auth = lock->get_parent()->is_auth();
  // the gather is finished when no capability of the next state is still
  // exceeded by current holders/leases/caps and nothing is still flushing
  if (!lock->is_gathering() &&
      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_rdlock, auth) || !lock->is_rdlocked()) &&
      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_wrlock, auth) || !lock->is_wrlocked()) &&
      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_xlock, auth) || !lock->is_xlocked()) &&
      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_lease, auth) || !lock->is_leased()) &&
      !(lock->get_parent()->is_auth() && lock->is_flushing()) &&  // i.e. wait for scatter_writebehind!
      (!caps || ((~lock->gcaps_allowed(CAP_ANY, next) & other_issued) == 0 &&
		 (~lock->gcaps_allowed(CAP_LONER, next) & loner_issued) == 0 &&
		 (~lock->gcaps_allowed(CAP_XLOCKER, next) & xlocker_issued) == 0)) &&
      lock->get_state() != LOCK_SYNC_MIX2 &&  // these states need an explicit trigger from the auth mds
      lock->get_state() != LOCK_MIX_SYNC2
      ) {
    dout(7) << "eval_gather finished gather on " << *lock
	    << " on " << *lock->get_parent() << dendl;

    if (lock->get_sm() == &sm_filelock) {
      assert(in);
      if (in->state_test(CInode::STATE_RECOVERING)) {
	dout(7) << "eval_gather finished gather, but still recovering" << dendl;
	return;
      } else if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
	dout(7) << "eval_gather finished gather, but need to recover" << dendl;
	mds->mdcache->queue_file_recover(in);
	mds->mdcache->do_file_recover();
	return;
      }
    }

    if (!lock->get_parent()->is_auth()) {
      // replica: tell auth
      mds_rank_t auth = lock->get_parent()->authority().first;

      if (lock->get_parent()->is_rejoining() &&
	  mds->mdsmap->get_state(auth) == MDSMap::STATE_REJOIN) {
	dout(7) << "eval_gather finished gather, but still rejoining "
		<< *lock->get_parent() << dendl;
	return;
      }

      if (!mds->is_cluster_degraded() ||
	  mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) {
	switch (lock->get_state()) {
	case LOCK_SYNC_LOCK:
	  mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid()),
				auth);
	  break;

	case LOCK_MIX_SYNC:
	  {
	    MLock *reply = new MLock(lock, LOCK_AC_SYNCACK, mds->get_nodeid());
	    lock->encode_locked_state(reply->get_data());
	    mds->send_message_mds(reply, auth);
	    next = LOCK_MIX_SYNC2;
	    (static_cast<ScatterLock *>(lock))->start_flush();
	  }
	  break;

	case LOCK_MIX_SYNC2:
	  (static_cast<ScatterLock *>(lock))->finish_flush();
	  (static_cast<ScatterLock *>(lock))->clear_flushed();
	  // fall-thru: like SYNC_MIX2, we already acked

	case LOCK_SYNC_MIX2:
	  // do nothing, we already acked
	  break;

	case LOCK_SYNC_MIX:
	  {
	    MLock *reply = new MLock(lock, LOCK_AC_MIXACK, mds->get_nodeid());
	    mds->send_message_mds(reply, auth);
	    next = LOCK_SYNC_MIX2;
	  }
	  break;

	case LOCK_MIX_LOCK:
	  {
	    bufferlist data;
	    lock->encode_locked_state(data);
	    mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid(), data), auth);
	    (static_cast<ScatterLock *>(lock))->start_flush();
	    // we'll get an AC_LOCKFLUSHED to complete
	  }
	  break;

	default:
	  ceph_abort();
	}
      }
    } else {
      // auth

      // once the first (local) stage of mix->lock gather complete we can
      // gather from replicas
      if (lock->get_state() == LOCK_MIX_LOCK &&
	  lock->get_parent()->is_replicated()) {
	dout(10) << " finished (local) gather for mix->lock, now gathering from replicas" << dendl;
	send_lock_message(lock, LOCK_AC_LOCK);
	lock->init_gather();
	lock->set_state(LOCK_MIX_LOCK2);
	return;
      }

      // dirty scatter state must be written behind before moving on
      if (lock->is_dirty() && !lock->is_flushed()) {
	scatter_writebehind(static_cast<ScatterLock *>(lock));
	mds->mdlog->flush();
	return;
      }
      lock->clear_flushed();

      switch (lock->get_state()) {
	// to mixed
      case LOCK_TSYN_MIX:
      case LOCK_SYNC_MIX:
      case LOCK_EXCL_MIX:
	in->start_scatter(static_cast<ScatterLock *>(lock));
	if (lock->get_parent()->is_replicated()) {
	  bufferlist softdata;
	  lock->encode_locked_state(softdata);
	  send_lock_message(lock, LOCK_AC_MIX, softdata);
	}
	(static_cast<ScatterLock *>(lock))->clear_scatter_wanted();
	break;

      case LOCK_XLOCK:
      case LOCK_XLOCKDONE:
	if (next != LOCK_SYNC)
	  break;
	// fall-thru

	// to sync
      case LOCK_EXCL_SYNC:
      case LOCK_LOCK_SYNC:
      case LOCK_MIX_SYNC:
      case LOCK_XSYN_SYNC:
	if (lock->get_parent()->is_replicated()) {
	  bufferlist softdata;
	  lock->encode_locked_state(softdata);
	  send_lock_message(lock, LOCK_AC_SYNC, softdata);
	}
	break;
      }

    }

    lock->set_state(next);

    if (lock->get_parent()->is_auth() &&
	lock->is_stable())
      lock->get_parent()->auth_unpin(lock);

    // drop loner before doing waiters
    if (caps &&
	in->is_head() &&
	in->is_auth() &&
	in->get_wanted_loner() != in->get_loner()) {
      dout(10) << " trying to drop loner" << dendl;
      if (in->try_drop_loner()) {
	dout(10) << " dropped loner" << dendl;
	need_issue = true;
      }
    }

    if (pfinishers)
      lock->take_waiting(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD|SimpleLock::WAIT_XLOCK,
			 *pfinishers);
    else
      lock->finish_waiters(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD|SimpleLock::WAIT_XLOCK);

    if (caps && in->is_head())
      need_issue = true;

    // re-evaluate now that we are stable
    if (lock->get_parent()->is_auth() &&
	lock->is_stable())
      try_eval(lock, &need_issue);
  }

  if (need_issue) {
    if (pneed_issue)
      *pneed_issue = true;
    else if (in->is_head())
      issue_caps(in);
  }

}
977
// Evaluate the inode locks selected by 'mask' (a CEPH_LOCK_I* bitmask,
// -1 for all), choosing/dropping the loner client as needed, and issue
// caps if anything changed.  Returns true if caps were (or need to be)
// reissued.
bool Locker::eval(CInode *in, int mask, bool caps_imported)
{
  bool need_issue = caps_imported;
  list<MDSInternalContextBase*> finishers;

  dout(10) << "eval " << mask << " " << *in << dendl;

  // choose loner?
  if (in->is_auth() && in->is_head()) {
    if (in->choose_ideal_loner() >= 0) {
      if (in->try_set_loner()) {
	dout(10) << "eval set loner to client." << in->get_loner() << dendl;
	need_issue = true;
	mask = -1;  // loner changed; re-evaluate all locks
      } else
	dout(10) << "eval want loner client." << in->get_wanted_loner() << " but failed to set it" << dendl;
    } else
      dout(10) << "eval doesn't want loner" << dendl;
  }

 retry:
  if (mask & CEPH_LOCK_IFILE)
    eval_any(&in->filelock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IAUTH)
    eval_any(&in->authlock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_ILINK)
    eval_any(&in->linklock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IXATTR)
    eval_any(&in->xattrlock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_INEST)
    eval_any(&in->nestlock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IFLOCK)
    eval_any(&in->flocklock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IPOLICY)
    eval_any(&in->policylock, &need_issue, &finishers, caps_imported);

  // drop loner?
  if (in->is_auth() && in->is_head() && in->get_wanted_loner() != in->get_loner()) {
    dout(10) << " trying to drop loner" << dendl;
    if (in->try_drop_loner()) {
      dout(10) << " dropped loner" << dendl;
      need_issue = true;

      if (in->get_wanted_loner() >= 0) {
	if (in->try_set_loner()) {
	  dout(10) << "eval end set loner to client." << in->get_loner() << dendl;
	  mask = -1;  // new loner; re-evaluate all locks from the top
	  goto retry;
	} else {
	  dout(10) << "eval want loner client." << in->get_wanted_loner() << " but failed to set it" << dendl;
	}
      }
    }
  }

  finish_contexts(g_ceph_context, finishers);

  if (need_issue && in->is_head())
    issue_caps(in);

  dout(10) << "eval done" << dendl;
  return need_issue;
}
1041
/**
 * Waiter context that re-runs Locker::try_eval() on a cache object once
 * the condition it was queued on (e.g. WAIT_SINGLEAUTH or WAIT_UNFREEZE)
 * fires.  Pins the object for the lifetime of the context.
 */
class C_Locker_Eval : public LockerContext {
  MDSCacheObject *p;
  int mask;  // lock mask passed back to try_eval()
public:
  C_Locker_Eval(Locker *l, MDSCacheObject *pp, int m) : LockerContext(l), p(pp), mask(m) {
    // We are used as an MDSCacheObject waiter, so should
    // only be invoked by someone already holding the big lock.
    assert(locker->mds->mds_lock.is_locked_by_me());
    p->get(MDSCacheObject::PIN_PTRWAITER);
  }
  void finish(int r) override {
    locker->try_eval(p, mask);
    p->put(MDSCacheObject::PIN_PTRWAITER);
  }
};
1057
1058 void Locker::try_eval(MDSCacheObject *p, int mask)
1059 {
1060 // unstable and ambiguous auth?
1061 if (p->is_ambiguous_auth()) {
1062 dout(7) << "try_eval ambiguous auth, waiting on " << *p << dendl;
1063 p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_Eval(this, p, mask));
1064 return;
1065 }
1066
1067 if (p->is_auth() && p->is_frozen()) {
1068 dout(7) << "try_eval frozen, waiting on " << *p << dendl;
1069 p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, mask));
1070 return;
1071 }
1072
1073 if (mask & CEPH_LOCK_DN) {
1074 assert(mask == CEPH_LOCK_DN);
1075 bool need_issue = false; // ignore this, no caps on dentries
1076 CDentry *dn = static_cast<CDentry *>(p);
1077 eval_any(&dn->lock, &need_issue);
1078 } else {
1079 CInode *in = static_cast<CInode *>(p);
1080 eval(in, mask);
1081 }
1082 }
1083
/**
 * Try to evaluate a single lock, deferring (via waiters) if the parent
 * object's auth is ambiguous, frozen, or freezing.  Honors pending
 * scatter/unscatter requests on scatterlocks before the generic eval.
 *
 * @param lock the lock to evaluate
 * @param pneed_issue set to true if client caps should be (re)issued
 */
void Locker::try_eval(SimpleLock *lock, bool *pneed_issue)
{
  MDSCacheObject *p = lock->get_parent();

  // unstable and ambiguous auth?
  if (p->is_ambiguous_auth()) {
    dout(7) << "try_eval " << *lock << " ambiguousauth, waiting on " << *p << dendl;
    p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_Eval(this, p, lock->get_type()));
    return;
  }

  // only the auth MDS drives lock state transitions
  if (!p->is_auth()) {
    dout(7) << "try_eval " << *lock << " not auth for " << *p << dendl;
    return;
  }

  if (p->is_frozen()) {
    dout(7) << "try_eval " << *lock << " frozen, waiting on " << *p << dendl;
    p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, lock->get_type()));
    return;
  }

  /*
   * We could have a situation like:
   *
   * - mds A authpins item on mds B
   * - mds B starts to freeze tree containing item
   * - mds A tries wrlock_start on A, sends REQSCATTER to B
   * - mds B lock is unstable, sets scatter_wanted
   * - mds B lock stabilizes, calls try_eval.
   *
   * We can defer while freezing without causing a deadlock.  Honor
   * scatter_wanted flag here.  This will never get deferred by the
   * checks above due to the auth_pin held by the master.
   */
  if (lock->is_scatterlock()) {
    ScatterLock *slock = static_cast<ScatterLock *>(lock);
    if (slock->get_scatter_wanted() &&
	slock->get_state() != LOCK_MIX) {
      // a replica asked us to scatter; do that first
      scatter_mix(slock, pneed_issue);
      if (!lock->is_stable())
	return;  // transition in progress; eval will be re-driven later
    } else if (slock->get_unscatter_wanted() &&
        slock->get_state() != LOCK_LOCK) {
      simple_lock(slock, pneed_issue);
      if (!lock->is_stable()) {
        return;
      }
    }
  }

  // note: checked *after* the scatter_wanted handling above (see comment)
  if (lock->get_type() != CEPH_LOCK_DN && p->is_freezing()) {
    dout(7) << "try_eval " << *lock << " freezing, waiting on " << *p << dendl;
    p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, lock->get_type()));
    return;
  }

  eval(lock, pneed_issue);
}
1143
1144 void Locker::eval_cap_gather(CInode *in, set<CInode*> *issue_set)
1145 {
1146 bool need_issue = false;
1147 list<MDSInternalContextBase*> finishers;
1148
1149 // kick locks now
1150 if (!in->filelock.is_stable())
1151 eval_gather(&in->filelock, false, &need_issue, &finishers);
1152 if (!in->authlock.is_stable())
1153 eval_gather(&in->authlock, false, &need_issue, &finishers);
1154 if (!in->linklock.is_stable())
1155 eval_gather(&in->linklock, false, &need_issue, &finishers);
1156 if (!in->xattrlock.is_stable())
1157 eval_gather(&in->xattrlock, false, &need_issue, &finishers);
1158
1159 if (need_issue && in->is_head()) {
1160 if (issue_set)
1161 issue_set->insert(in);
1162 else
1163 issue_caps(in);
1164 }
1165
1166 finish_contexts(g_ceph_context, finishers);
1167 }
1168
1169 void Locker::eval_scatter_gathers(CInode *in)
1170 {
1171 bool need_issue = false;
1172 list<MDSInternalContextBase*> finishers;
1173
1174 dout(10) << "eval_scatter_gathers " << *in << dendl;
1175
1176 // kick locks now
1177 if (!in->filelock.is_stable())
1178 eval_gather(&in->filelock, false, &need_issue, &finishers);
1179 if (!in->nestlock.is_stable())
1180 eval_gather(&in->nestlock, false, &need_issue, &finishers);
1181 if (!in->dirfragtreelock.is_stable())
1182 eval_gather(&in->dirfragtreelock, false, &need_issue, &finishers);
1183
1184 if (need_issue && in->is_head())
1185 issue_caps(in);
1186
1187 finish_contexts(g_ceph_context, finishers);
1188 }
1189
1190 void Locker::eval(SimpleLock *lock, bool *need_issue)
1191 {
1192 switch (lock->get_type()) {
1193 case CEPH_LOCK_IFILE:
1194 return file_eval(static_cast<ScatterLock*>(lock), need_issue);
1195 case CEPH_LOCK_IDFT:
1196 case CEPH_LOCK_INEST:
1197 return scatter_eval(static_cast<ScatterLock*>(lock), need_issue);
1198 default:
1199 return simple_eval(lock, need_issue);
1200 }
1201 }
1202
1203
1204 // ------------------
1205 // rdlock
1206
/**
 * Try to push a lock toward a readable state.
 *
 * On the auth MDS, move a stable lock toward SYNC (or XSYN for a loner
 * on a file lock).  On a replica, request a state change from the auth.
 *
 * @param lock the lock to kick
 * @param as_anon caller wants SYNC specifically, so don't pick XSYN
 * @return true if a local state change was started; false if we only
 *         sent a request (or could do nothing), so the caller must wait
 */
bool Locker::_rdlock_kick(SimpleLock *lock, bool as_anon)
{
  // kick the lock
  if (lock->is_stable()) {
    if (lock->get_parent()->is_auth()) {
      if (lock->get_sm() == &sm_scatterlock) {
	// not until tempsync is fully implemented
	//if (lock->get_parent()->is_replicated())
	//scatter_tempsync((ScatterLock*)lock);
	//else
	simple_sync(lock);
      } else if (lock->get_sm() == &sm_filelock) {
	CInode *in = static_cast<CInode*>(lock->get_parent());
	// prefer XSYN over SYNC for a loner on a plain file, so the
	// loner keeps its exclusive caps
	if (lock->get_state() == LOCK_EXCL &&
	    in->get_target_loner() >= 0 &&
	    !in->is_dir() && !as_anon)   // as_anon => caller wants SYNC, not XSYN
	  file_xsyn(lock);
	else
	  simple_sync(lock);
      } else
	simple_sync(lock);
      return true;
    } else {
      // request rdlock state change from auth
      mds_rank_t auth = lock->get_parent()->authority().first;
      if (!mds->is_cluster_degraded() ||
	  mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
	dout(10) << "requesting rdlock from auth on "
		 << *lock << " on " << *lock->get_parent() << dendl;
	mds->send_message_mds(new MLock(lock, LOCK_AC_REQRDLOCK, mds->get_nodeid()), auth);
      }
      return false;
    }
  }
  // unstable: if we're blocked on file recovery, bump its priority
  if (lock->get_type() == CEPH_LOCK_IFILE) {
    CInode *in = static_cast<CInode *>(lock->get_parent());
    if (in->state_test(CInode::STATE_RECOVERING)) {
      mds->mdcache->recovery_queue.prioritize(in);
    }
  }

  return false;
}
1250
1251 bool Locker::rdlock_try(SimpleLock *lock, client_t client, MDSInternalContextBase *con)
1252 {
1253 dout(7) << "rdlock_try on " << *lock << " on " << *lock->get_parent() << dendl;
1254
1255 // can read? grab ref.
1256 if (lock->can_rdlock(client))
1257 return true;
1258
1259 _rdlock_kick(lock, false);
1260
1261 if (lock->can_rdlock(client))
1262 return true;
1263
1264 // wait!
1265 if (con) {
1266 dout(7) << "rdlock_try waiting on " << *lock << " on " << *lock->get_parent() << dendl;
1267 lock->add_waiter(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_RD, con);
1268 }
1269 return false;
1270 }
1271
/**
 * Acquire a read lock for a request, retrying state kicks until the lock
 * is readable or we must wait.
 *
 * A client may rdlock an item it already xlocks, UNLESS as_anon is set or
 * we are reading a snapped version (then we lock as an anonymous client).
 *
 * @param lock the lock to rdlock
 * @param mut the request taking the lock
 * @param as_anon take the lock anonymously (forces SYNC rather than XSYN)
 * @return true if the rdlock was taken; false if the request was queued
 */
bool Locker::rdlock_start(SimpleLock *lock, MDRequestRef& mut, bool as_anon)
{
  dout(7) << "rdlock_start on " << *lock << " on " << *lock->get_parent() << dendl;

  // client may be allowed to rdlock the same item it has xlocked.
  // UNLESS someone passes in as_anon, or we're reading snapped version here.
  if (mut->snapid != CEPH_NOSNAP)
    as_anon = true;
  client_t client = as_anon ? -1 : mut->get_client();

  CInode *in = 0;
  if (lock->get_type() != CEPH_LOCK_DN)
    in = static_cast<CInode *>(lock->get_parent());

  /*
  if (!lock->get_parent()->is_auth() &&
      lock->fw_rdlock_to_auth()) {
    mdcache->request_forward(mut, lock->get_parent()->authority().first);
    return false;
  }
  */

  while (1) {
    // can read?  grab ref.
    if (lock->can_rdlock(client)) {
      lock->get_rdlock();
      mut->rdlocks.insert(lock);
      mut->locks.insert(lock);
      return true;
    }

    // hmm, wait a second.
    // snapped inode whose lock follows the head: sync the head first.
    if (in && !in->is_head() && in->is_auth() &&
	lock->get_state() == LOCK_SNAP_SYNC) {
      // okay, we actually need to kick the head's lock to get ourselves synced up.
      CInode *head = mdcache->get_inode(in->ino());
      assert(head);
      SimpleLock *hlock = head->get_lock(lock->get_type());
      if (hlock->get_state() != LOCK_SYNC) {
	dout(10) << "rdlock_start trying head inode " << *head << dendl;
	if (!rdlock_start(head->get_lock(lock->get_type()), mut, true)) // ** as_anon, no rdlock on EXCL **
	  return false;
	// oh, check our lock again then
      }
    }

    // kick returned false => no further local progress possible; wait.
    if (!_rdlock_kick(lock, as_anon))
      break;
  }

  // wait!
  int wait_on;
  if (lock->get_parent()->is_auth() && lock->is_stable())
    wait_on = SimpleLock::WAIT_RD;
  else
    wait_on = SimpleLock::WAIT_STABLE;  // REQRDLOCK is ignored if lock is unstable, so we need to retry.
  dout(7) << "rdlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl;
  lock->add_waiter(wait_on, new C_MDS_RetryRequest(mdcache, mut));
  nudge_log(lock);
  return false;
}
1333
1334 void Locker::nudge_log(SimpleLock *lock)
1335 {
1336 dout(10) << "nudge_log " << *lock << " on " << *lock->get_parent() << dendl;
1337 if (lock->get_parent()->is_auth() && lock->is_unstable_and_locked()) // as with xlockdone, or cap flush
1338 mds->mdlog->flush();
1339 }
1340
1341 void Locker::rdlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue)
1342 {
1343 // drop ref
1344 lock->put_rdlock();
1345 if (mut) {
1346 mut->rdlocks.erase(lock);
1347 mut->locks.erase(lock);
1348 }
1349
1350 dout(7) << "rdlock_finish on " << *lock << " on " << *lock->get_parent() << dendl;
1351
1352 // last one?
1353 if (!lock->is_rdlocked()) {
1354 if (!lock->is_stable())
1355 eval_gather(lock, false, pneed_issue);
1356 else if (lock->get_parent()->is_auth())
1357 try_eval(lock, pneed_issue);
1358 }
1359 }
1360
1361
1362 bool Locker::can_rdlock_set(set<SimpleLock*>& locks)
1363 {
1364 dout(10) << "can_rdlock_set " << locks << dendl;
1365 for (set<SimpleLock*>::iterator p = locks.begin(); p != locks.end(); ++p)
1366 if (!(*p)->can_rdlock(-1)) {
1367 dout(10) << "can_rdlock_set can't rdlock " << *p << " on " << *(*p)->get_parent() << dendl;
1368 return false;
1369 }
1370 return true;
1371 }
1372
1373 bool Locker::rdlock_try_set(set<SimpleLock*>& locks)
1374 {
1375 dout(10) << "rdlock_try_set " << locks << dendl;
1376 for (set<SimpleLock*>::iterator p = locks.begin(); p != locks.end(); ++p)
1377 if (!rdlock_try(*p, -1, NULL)) {
1378 dout(10) << "rdlock_try_set can't rdlock " << *p << " on " << *(*p)->get_parent() << dendl;
1379 return false;
1380 }
1381 return true;
1382 }
1383
1384 void Locker::rdlock_take_set(set<SimpleLock*>& locks, MutationRef& mut)
1385 {
1386 dout(10) << "rdlock_take_set " << locks << dendl;
1387 for (set<SimpleLock*>::iterator p = locks.begin(); p != locks.end(); ++p) {
1388 (*p)->get_rdlock();
1389 mut->rdlocks.insert(*p);
1390 mut->locks.insert(*p);
1391 }
1392 }
1393
1394 // ------------------
1395 // wrlock
1396
1397 void Locker::wrlock_force(SimpleLock *lock, MutationRef& mut)
1398 {
1399 if (lock->get_type() == CEPH_LOCK_IVERSION ||
1400 lock->get_type() == CEPH_LOCK_DVERSION)
1401 return local_wrlock_grab(static_cast<LocalLock*>(lock), mut);
1402
1403 dout(7) << "wrlock_force on " << *lock
1404 << " on " << *lock->get_parent() << dendl;
1405 lock->get_wrlock(true);
1406 mut->wrlocks.insert(lock);
1407 mut->locks.insert(lock);
1408 }
1409
/**
 * Acquire a write lock for a request.
 *
 * @param lock the lock to wrlock (version locks go to local_wrlock_start)
 * @param mut the request taking the lock
 * @param nowait if true, never block or open new log events; just fail.
 *        Callers pass nowait==true when they already have a log entry
 *        open and cannot tolerate nested lock-state journaling.
 * @return true if the wrlock was taken, false otherwise (waiter queued
 *         unless nowait)
 */
bool Locker::wrlock_start(SimpleLock *lock, MDRequestRef& mut, bool nowait)
{
  if (lock->get_type() == CEPH_LOCK_IVERSION ||
      lock->get_type() == CEPH_LOCK_DVERSION)
    return local_wrlock_start(static_cast<LocalLock*>(lock), mut);

  dout(10) << "wrlock_start " << *lock << " on " << *lock->get_parent() << dendl;

  CInode *in = static_cast<CInode *>(lock->get_parent());
  client_t client = mut->get_client();
  // prefer the scattered (MIX) state when this inode hosts subtree
  // bounds or a scatter was explicitly requested
  bool want_scatter = !nowait && lock->get_parent()->is_auth() &&
		      (in->has_subtree_or_exporting_dirfrag() ||
		       static_cast<ScatterLock*>(lock)->get_scatter_wanted());

  while (1) {
    // wrlock?
    if (lock->can_wrlock(client) &&
	(!want_scatter || lock->get_state() == LOCK_MIX)) {
      lock->get_wrlock();
      mut->wrlocks.insert(lock);
      mut->locks.insert(lock);
      return true;
    }

    // blocked on file recovery?  bump its priority.
    if (lock->get_type() == CEPH_LOCK_IFILE &&
	in->state_test(CInode::STATE_RECOVERING)) {
      mds->mdcache->recovery_queue.prioritize(in);
    }

    if (!lock->is_stable())
      break;

    if (in->is_auth()) {
      // don't do nested lock state change if we have dirty scatterdata and
      // may scatter_writebehind or start_scatter, because nowait==true implies
      // that the caller already has a log entry open!
      if (nowait && lock->is_dirty())
	return false;

      if (want_scatter)
	scatter_mix(static_cast<ScatterLock*>(lock));
      else
	simple_lock(lock);

      if (nowait && !lock->can_wrlock(client))
	return false;

    } else {
      // replica.
      // auth should be auth_pinned (see acquire_locks wrlock weird mustpin case).
      mds_rank_t auth = lock->get_parent()->authority().first;
      if (!mds->is_cluster_degraded() ||
	  mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
	dout(10) << "requesting scatter from auth on "
		 << *lock << " on " << *lock->get_parent() << dendl;
	mds->send_message_mds(new MLock(lock, LOCK_AC_REQSCATTER, mds->get_nodeid()), auth);
      }
      break;
    }
  }

  if (!nowait) {
    dout(7) << "wrlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl;
    lock->add_waiter(SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
    nudge_log(lock);
  }

  return false;
}
1479
1480 void Locker::wrlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue)
1481 {
1482 if (lock->get_type() == CEPH_LOCK_IVERSION ||
1483 lock->get_type() == CEPH_LOCK_DVERSION)
1484 return local_wrlock_finish(static_cast<LocalLock*>(lock), mut);
1485
1486 dout(7) << "wrlock_finish on " << *lock << " on " << *lock->get_parent() << dendl;
1487 lock->put_wrlock();
1488 if (mut) {
1489 mut->wrlocks.erase(lock);
1490 if (mut->remote_wrlocks.count(lock) == 0)
1491 mut->locks.erase(lock);
1492 }
1493
1494 if (!lock->is_wrlocked()) {
1495 if (!lock->is_stable())
1496 eval_gather(lock, false, pneed_issue);
1497 else if (lock->get_parent()->is_auth())
1498 try_eval(lock, pneed_issue);
1499 }
1500 }
1501
1502
1503 // remote wrlock
1504
1505 void Locker::remote_wrlock_start(SimpleLock *lock, mds_rank_t target, MDRequestRef& mut)
1506 {
1507 dout(7) << "remote_wrlock_start mds." << target << " on " << *lock << " on " << *lock->get_parent() << dendl;
1508
1509 // wait for active target
1510 if (mds->is_cluster_degraded() &&
1511 !mds->mdsmap->is_clientreplay_or_active_or_stopping(target)) {
1512 dout(7) << " mds." << target << " is not active" << dendl;
1513 if (mut->more()->waiting_on_slave.empty())
1514 mds->wait_for_active_peer(target, new C_MDS_RetryRequest(mdcache, mut));
1515 return;
1516 }
1517
1518 // send lock request
1519 mut->start_locking(lock, target);
1520 mut->more()->slaves.insert(target);
1521 MMDSSlaveRequest *r = new MMDSSlaveRequest(mut->reqid, mut->attempt,
1522 MMDSSlaveRequest::OP_WRLOCK);
1523 r->set_lock_type(lock->get_type());
1524 lock->get_parent()->set_object_info(r->get_object_info());
1525 mds->send_message_mds(r, target);
1526
1527 assert(mut->more()->waiting_on_slave.count(target) == 0);
1528 mut->more()->waiting_on_slave.insert(target);
1529 }
1530
1531 void Locker::remote_wrlock_finish(SimpleLock *lock, mds_rank_t target,
1532 MutationImpl *mut)
1533 {
1534 // drop ref
1535 mut->remote_wrlocks.erase(lock);
1536 if (mut->wrlocks.count(lock) == 0)
1537 mut->locks.erase(lock);
1538
1539 dout(7) << "remote_wrlock_finish releasing remote wrlock on mds." << target
1540 << " " << *lock->get_parent() << dendl;
1541 if (!mds->is_cluster_degraded() ||
1542 mds->mdsmap->get_state(target) >= MDSMap::STATE_REJOIN) {
1543 MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt,
1544 MMDSSlaveRequest::OP_UNWRLOCK);
1545 slavereq->set_lock_type(lock->get_type());
1546 lock->get_parent()->set_object_info(slavereq->get_object_info());
1547 mds->send_message_mds(slavereq, target);
1548 }
1549 }
1550
1551
1552 // ------------------
1553 // xlock
1554
/**
 * Acquire an exclusive lock for a request.
 *
 * Auth path: loop driving the lock toward XLOCK, queuing a waiter if the
 * lock is unstable in a way we can't work around.  Replica path: forward
 * an OP_XLOCK slave request to the auth MDS.
 *
 * @return true if the xlock was taken locally; false if we are waiting
 *         (waiter or slave request queued)
 */
bool Locker::xlock_start(SimpleLock *lock, MDRequestRef& mut)
{
  if (lock->get_type() == CEPH_LOCK_IVERSION ||
      lock->get_type() == CEPH_LOCK_DVERSION)
    return local_xlock_start(static_cast<LocalLock*>(lock), mut);

  dout(7) << "xlock_start on " << *lock << " on " << *lock->get_parent() << dendl;
  client_t client = mut->get_client();

  // auth?
  if (lock->get_parent()->is_auth()) {
    // auth
    while (1) {
      if (lock->can_xlock(client)) {
	lock->set_state(LOCK_XLOCK);
	lock->get_xlock(mut, client);
	mut->xlocks.insert(lock);
	mut->locks.insert(lock);
	mut->finish_locking(lock);
	return true;
      }

      // blocked on file recovery?  bump its priority.
      if (lock->get_type() == CEPH_LOCK_IFILE) {
	CInode *in = static_cast<CInode*>(lock->get_parent());
	if (in->state_test(CInode::STATE_RECOVERING)) {
	  mds->mdcache->recovery_queue.prioritize(in);
	}
      }

      // give up and wait, unless this is XLOCKDONE held by this same
      // client with no one else waiting for stability
      if (!lock->is_stable() && (lock->get_state() != LOCK_XLOCKDONE ||
				 lock->get_xlock_by_client() != client ||
				 lock->is_waiter_for(SimpleLock::WAIT_STABLE)))
	break;

      if (lock->get_state() == LOCK_LOCK || lock->get_state() == LOCK_XLOCKDONE) {
	mut->start_locking(lock);
	simple_xlock(lock);
      } else {
	simple_lock(lock);
      }
    }

    lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
    nudge_log(lock);
    return false;
  } else {
    // replica
    assert(lock->get_sm()->can_remote_xlock);
    assert(!mut->slave_request);

    // wait for single auth
    if (lock->get_parent()->is_ambiguous_auth()) {
      lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH,
				     new C_MDS_RetryRequest(mdcache, mut));
      return false;
    }

    // wait for active auth
    mds_rank_t auth = lock->get_parent()->authority().first;
    if (mds->is_cluster_degraded() &&
	!mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
      dout(7) << " mds." << auth << " is not active" << dendl;
      if (mut->more()->waiting_on_slave.empty())
	mds->wait_for_active_peer(auth, new C_MDS_RetryRequest(mdcache, mut));
      return false;
    }

    // send lock request
    mut->more()->slaves.insert(auth);
    mut->start_locking(lock, auth);
    MMDSSlaveRequest *r = new MMDSSlaveRequest(mut->reqid, mut->attempt,
					       MMDSSlaveRequest::OP_XLOCK);
    r->set_lock_type(lock->get_type());
    lock->get_parent()->set_object_info(r->get_object_info());
    mds->send_message_mds(r, auth);

    assert(mut->more()->waiting_on_slave.count(auth) == 0);
    mut->more()->waiting_on_slave.insert(auth);

    return false;
  }
}
1637
/**
 * Finish an xlock release on an (unstable) lock.  If the inode has a
 * target loner and nothing else holds the lock, jump straight to EXCL;
 * otherwise run the normal gather path.
 *
 * @param lock the lock whose last xlock was just dropped
 * @param xlocker client that held the xlock (-1 if unknown)
 * @param pneed_issue set to true if caps should be (re)issued
 */
void Locker::_finish_xlock(SimpleLock *lock, client_t xlocker, bool *pneed_issue)
{
  assert(!lock->is_stable());
  // fast path to EXCL: only valid when nothing else pins the lock state
  if (lock->get_num_rdlocks() == 0 &&
      lock->get_num_wrlocks() == 0 &&
      lock->get_num_client_lease() == 0 &&
      lock->get_state() != LOCK_XLOCKSNAP &&
      lock->get_type() != CEPH_LOCK_DN) {
    CInode *in = static_cast<CInode*>(lock->get_parent());
    client_t loner = in->get_target_loner();
    if (loner >= 0 && (xlocker < 0 || xlocker == loner)) {
      lock->set_state(LOCK_EXCL);
      lock->get_parent()->auth_unpin(lock);
      lock->finish_waiters(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD);
      if (lock->get_cap_shift())
	*pneed_issue = true;
      if (lock->get_parent()->is_auth() &&
	  lock->is_stable())
	try_eval(lock, pneed_issue);
      return;
    }
  }
  // the xlocker may have CEPH_CAP_GSHARED, need to revoke it if next state is LOCK_LOCK
  eval_gather(lock, lock->get_state() != LOCK_XLOCKSNAP, pneed_issue);
}
1663
/**
 * Release an exclusive lock held by 'mut'.  On a replica, notify the
 * auth (slave OP_UNXLOCK) and wake local waiters; on the auth, run
 * _finish_xlock once the last xlock is gone, then issue caps if needed.
 */
void Locker::xlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue)
{
  if (lock->get_type() == CEPH_LOCK_IVERSION ||
      lock->get_type() == CEPH_LOCK_DVERSION)
    return local_xlock_finish(static_cast<LocalLock*>(lock), mut);

  dout(10) << "xlock_finish on " << *lock << " " << *lock->get_parent() << dendl;

  // remember the holder before dropping the ref (needed by _finish_xlock)
  client_t xlocker = lock->get_xlock_by_client();

  // drop ref
  lock->put_xlock();
  assert(mut);
  mut->xlocks.erase(lock);
  mut->locks.erase(lock);

  bool do_issue = false;

  // remote xlock?
  if (!lock->get_parent()->is_auth()) {
    assert(lock->get_sm()->can_remote_xlock);

    // tell auth
    dout(7) << "xlock_finish releasing remote xlock on " << *lock->get_parent() << dendl;
    mds_rank_t auth = lock->get_parent()->authority().first;
    if (!mds->is_cluster_degraded() ||
	mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) {
      MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt,
							MMDSSlaveRequest::OP_UNXLOCK);
      slavereq->set_lock_type(lock->get_type());
      lock->get_parent()->set_object_info(slavereq->get_object_info());
      mds->send_message_mds(slavereq, auth);
    }
    // others waiting?
    lock->finish_waiters(SimpleLock::WAIT_STABLE |
			 SimpleLock::WAIT_WR |
			 SimpleLock::WAIT_RD, 0);
  } else {
    if (lock->get_num_xlocks() == 0) {
      if (lock->get_state() == LOCK_LOCK_XLOCK)
	lock->set_state(LOCK_XLOCKDONE);
      _finish_xlock(lock, xlocker, &do_issue);
    }
  }

  if (do_issue) {
    CInode *in = static_cast<CInode*>(lock->get_parent());
    if (in->is_head()) {
      if (pneed_issue)
	*pneed_issue = true;  // caller will issue
      else
	issue_caps(in);
    }
  }
}
1719
1720 void Locker::xlock_export(SimpleLock *lock, MutationImpl *mut)
1721 {
1722 dout(10) << "xlock_export on " << *lock << " " << *lock->get_parent() << dendl;
1723
1724 lock->put_xlock();
1725 mut->xlocks.erase(lock);
1726 mut->locks.erase(lock);
1727
1728 MDSCacheObject *p = lock->get_parent();
1729 assert(p->state_test(CInode::STATE_AMBIGUOUSAUTH)); // we are exporting this (inode)
1730
1731 if (!lock->is_stable())
1732 lock->get_parent()->auth_unpin(lock);
1733
1734 lock->set_state(LOCK_LOCK);
1735 }
1736
1737 void Locker::xlock_import(SimpleLock *lock)
1738 {
1739 dout(10) << "xlock_import on " << *lock << " " << *lock->get_parent() << dendl;
1740 lock->get_parent()->auth_pin(lock);
1741 }
1742
1743
1744
1745 // file i/o -----------------------------------------
1746
1747 version_t Locker::issue_file_data_version(CInode *in)
1748 {
1749 dout(7) << "issue_file_data_version on " << *in << dendl;
1750 return in->inode.file_data_version;
1751 }
1752
/**
 * Log-completion context for a file update: once the journal entry is
 * safe, calls Locker::file_update_finish() to apply the mutation, send
 * the client ack (if any), and re-issue caps.  Pins the inode for the
 * lifetime of the context.
 */
class C_Locker_FileUpdate_finish : public LockerLogContext {
  CInode *in;
  MutationRef mut;
  bool share_max;     // consider sharing a larger max_size afterwards
  bool need_issue;    // re-issue this client's caps afterwards
  client_t client;    // client to ack (if ack != 0)
  MClientCaps *ack;   // cap ack to send, or 0
public:
  C_Locker_FileUpdate_finish(Locker *l, CInode *i, MutationRef& m,
			     bool sm=false, bool ni=false, client_t c=-1,
			     MClientCaps *ac = 0)
    : LockerLogContext(l), in(i), mut(m), share_max(sm), need_issue(ni),
      client(c), ack(ac) {
    in->get(CInode::PIN_PTRWAITER);
  }
  void finish(int r) override {
    locker->file_update_finish(in, mut, share_max, need_issue, client, ack);
    in->put(CInode::PIN_PTRWAITER);
  }
};
1773
/**
 * Complete a journaled file update: apply the projected inode, ack the
 * client's cap flush (if requested), drop the mutation's locks, and
 * handle snap cap writeback completion or re-issue caps.
 *
 * @param in the updated inode
 * @param mut the mutation being completed
 * @param share_max also consider sharing a larger max_size with clients
 * @param issue_client_cap re-issue this client's caps if it wants more
 * @param client client being acked
 * @param ack cap message to send back to the client, or 0
 */
void Locker::file_update_finish(CInode *in, MutationRef& mut, bool share_max, bool issue_client_cap,
				client_t client, MClientCaps *ack)
{
  dout(10) << "file_update_finish on " << *in << dendl;
  in->pop_and_dirty_projected_inode(mut->ls);

  mut->apply();

  if (ack) {
    Session *session = mds->get_session(client);
    if (session) {
      // "oldest flush tid" > 0 means client uses unique TID for each flush
      if (ack->get_oldest_flush_tid() > 0)
	session->add_completed_flush(ack->get_client_tid());
      mds->send_message_client_counted(ack, session);
    } else {
      dout(10) << " no session for client." << client << " " << *ack << dendl;
      ack->put();  // session gone; drop the message
    }
  }

  set<CInode*> need_issue;
  drop_locks(mut.get(), &need_issue);

  if (!in->is_head() && !in->client_snap_caps.empty()) {
    dout(10) << " client_snap_caps " << in->client_snap_caps << dendl;
    // check for snap writeback completion
    bool gather = false;
    compact_map<int,set<client_t> >::iterator p = in->client_snap_caps.begin();
    while (p != in->client_snap_caps.end()) {
      SimpleLock *lock = in->get_lock(p->first);
      assert(lock);
      dout(10) << " completing client_snap_caps for " << ccap_string(p->first)
	       << " lock " << *lock << " on " << *in << dendl;
      lock->put_wrlock();

      p->second.erase(client);
      if (p->second.empty()) {
	gather = true;
	in->client_snap_caps.erase(p++);  // erase-then-advance (map iterator)
      } else
	++p;
    }
    if (gather) {
      if (in->client_snap_caps.empty())
	in->item_open_file.remove_myself();
      eval_cap_gather(in, &need_issue);
    }
  } else {
    // avoid double-issuing: drop_locks may already have flagged this inode
    if (issue_client_cap && need_issue.count(in) == 0) {
      Capability *cap = in->get_client_cap(client);
      if (cap && (cap->wanted() & ~cap->pending()))
	issue_caps(in, cap);
    }

    if (share_max && in->is_auth() &&
	(in->filelock.gcaps_allowed(CAP_LONER) & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)))
      share_inode_max_size(in);
  }
  issue_caps_set(need_issue);

  // auth unpin after issuing caps
  mut->cleanup();
}
1838
/**
 * Register (or augment) a client capability on an inode for an open with
 * the given mode, then evaluate locks so the caps can actually be issued.
 *
 * @param in the inode being opened
 * @param mode open mode (translated to wanted caps via ceph_caps_for_mode)
 * @param session the opening client's session
 * @param realm snap realm for a newly created cap
 * @param is_replay if true, only try to reconnect an existing cap
 * @return the client's Capability, or 0 in the replay case
 */
Capability* Locker::issue_new_caps(CInode *in,
				   int mode,
				   Session *session,
				   SnapRealm *realm,
				   bool is_replay)
{
  dout(7) << "issue_new_caps for mode " << mode << " on " << *in << dendl;
  bool is_new;

  // if replay, try to reconnect cap, and otherwise do nothing.
  if (is_replay) {
    mds->mdcache->try_reconnect_cap(in, session);
    return 0;
  }

  // my needs
  assert(session->info.inst.name.is_client());
  client_t my_client = session->info.inst.name.num();
  int my_want = ceph_caps_for_mode(mode);

  // register a capability
  Capability *cap = in->get_client_cap(my_client);
  if (!cap) {
    // new cap
    cap = in->add_client_cap(my_client, session, realm);
    cap->set_wanted(my_want);
    cap->mark_new();
    cap->inc_suppress();  // suppress file cap messages for new cap (we'll bundle with the open() reply)
    is_new = true;
  } else {
    is_new = false;
    // make sure it wants sufficient caps
    if (my_want & ~cap->wanted()) {
      // augment wanted caps for this client
      cap->set_wanted(cap->wanted() | my_want);
    }
  }

  if (in->is_auth()) {
    // [auth] twiddle mode?
    eval(in, CEPH_CAP_LOCKS);

    if (_need_flush_mdlog(in, my_want))
      mds->mdlog->flush();

  } else {
    // [replica] tell auth about any new caps wanted
    request_inode_file_caps(in);
  }

  // issue caps (pot. incl new one)
  //issue_caps(in);  // note: _eval above may have done this already...

  // re-issue whatever we can
  //cap->issue(cap->pending());

  if (is_new)
    cap->dec_suppress();

  return cap;
}
1900
1901
1902 void Locker::issue_caps_set(set<CInode*>& inset)
1903 {
1904 for (set<CInode*>::iterator p = inset.begin(); p != inset.end(); ++p)
1905 issue_caps(*p);
1906 }
1907
1908 bool Locker::issue_caps(CInode *in, Capability *only_cap)
1909 {
1910 // allowed caps are determined by the lock mode.
1911 int all_allowed = in->get_caps_allowed_by_type(CAP_ANY);
1912 int loner_allowed = in->get_caps_allowed_by_type(CAP_LONER);
1913 int xlocker_allowed = in->get_caps_allowed_by_type(CAP_XLOCKER);
1914
1915 client_t loner = in->get_loner();
1916 if (loner >= 0) {
1917 dout(7) << "issue_caps loner client." << loner
1918 << " allowed=" << ccap_string(loner_allowed)
1919 << ", xlocker allowed=" << ccap_string(xlocker_allowed)
1920 << ", others allowed=" << ccap_string(all_allowed)
1921 << " on " << *in << dendl;
1922 } else {
1923 dout(7) << "issue_caps allowed=" << ccap_string(all_allowed)
1924 << ", xlocker allowed=" << ccap_string(xlocker_allowed)
1925 << " on " << *in << dendl;
1926 }
1927
1928 assert(in->is_head());
1929
1930 // count conflicts with
1931 int nissued = 0;
1932
1933 // client caps
1934 map<client_t, Capability*>::iterator it;
1935 if (only_cap)
1936 it = in->client_caps.find(only_cap->get_client());
1937 else
1938 it = in->client_caps.begin();
1939 for (; it != in->client_caps.end(); ++it) {
1940 Capability *cap = it->second;
1941 if (cap->is_stale())
1942 continue;
1943
1944 // do not issue _new_ bits when size|mtime is projected
1945 int allowed;
1946 if (loner == it->first)
1947 allowed = loner_allowed;
1948 else
1949 allowed = all_allowed;
1950
1951 // add in any xlocker-only caps (for locks this client is the xlocker for)
1952 allowed |= xlocker_allowed & in->get_xlocker_mask(it->first);
1953
1954 Session *session = mds->get_session(it->first);
1955 if (in->inode.inline_data.version != CEPH_INLINE_NONE &&
1956 !(session && session->connection &&
1957 session->connection->has_feature(CEPH_FEATURE_MDS_INLINE_DATA)))
1958 allowed &= ~(CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR);
1959
1960 int pending = cap->pending();
1961 int wanted = cap->wanted();
1962
1963 dout(20) << " client." << it->first
1964 << " pending " << ccap_string(pending)
1965 << " allowed " << ccap_string(allowed)
1966 << " wanted " << ccap_string(wanted)
1967 << dendl;
1968
1969 if (!(pending & ~allowed)) {
1970 // skip if suppress or new, and not revocation
1971 if (cap->is_new() || cap->is_suppress()) {
1972 dout(20) << " !revoke and new|suppressed, skipping client." << it->first << dendl;
1973 continue;
1974 }
1975 }
1976
1977 // notify clients about deleted inode, to make sure they release caps ASAP.
1978 if (in->inode.nlink == 0)
1979 wanted |= CEPH_CAP_LINK_SHARED;
1980
1981 // are there caps that the client _wants_ and can have, but aren't pending?
1982 // or do we need to revoke?
1983 if (((wanted & allowed) & ~pending) || // missing wanted+allowed caps
1984 (pending & ~allowed)) { // need to revoke ~allowed caps.
1985 // issue
1986 nissued++;
1987
1988 // include caps that clients generally like, while we're at it.
1989 int likes = in->get_caps_liked();
1990 int before = pending;
1991 long seq;
1992 if (pending & ~allowed)
1993 seq = cap->issue((wanted|likes) & allowed & pending); // if revoking, don't issue anything new.
1994 else
1995 seq = cap->issue((wanted|likes) & allowed);
1996 int after = cap->pending();
1997
1998 if (cap->is_new()) {
1999 // haven't send caps to client yet
2000 if (before & ~after)
2001 cap->confirm_receipt(seq, after);
2002 } else {
2003 dout(7) << " sending MClientCaps to client." << it->first
2004 << " seq " << cap->get_last_seq()
2005 << " new pending " << ccap_string(after) << " was " << ccap_string(before)
2006 << dendl;
2007
2008 int op = (before & ~after) ? CEPH_CAP_OP_REVOKE : CEPH_CAP_OP_GRANT;
2009 if (op == CEPH_CAP_OP_REVOKE) {
2010 revoking_caps.push_back(&cap->item_revoking_caps);
2011 revoking_caps_by_client[cap->get_client()].push_back(&cap->item_client_revoking_caps);
2012 cap->set_last_revoke_stamp(ceph_clock_now());
2013 cap->reset_num_revoke_warnings();
2014 }
2015
2016 MClientCaps *m = new MClientCaps(op, in->ino(),
2017 in->find_snaprealm()->inode->ino(),
2018 cap->get_cap_id(), cap->get_last_seq(),
2019 after, wanted, 0,
2020 cap->get_mseq(),
2021 mds->get_osd_epoch_barrier());
2022 in->encode_cap_message(m, cap);
2023
2024 mds->send_message_client_counted(m, it->first);
2025 }
2026 }
2027
2028 if (only_cap)
2029 break;
2030 }
2031
2032 return (nissued == 0); // true if no re-issued, no callbacks
2033 }
2034
2035 void Locker::issue_truncate(CInode *in)
2036 {
2037 dout(7) << "issue_truncate on " << *in << dendl;
2038
2039 for (map<client_t, Capability*>::iterator it = in->client_caps.begin();
2040 it != in->client_caps.end();
2041 ++it) {
2042 Capability *cap = it->second;
2043 MClientCaps *m = new MClientCaps(CEPH_CAP_OP_TRUNC,
2044 in->ino(),
2045 in->find_snaprealm()->inode->ino(),
2046 cap->get_cap_id(), cap->get_last_seq(),
2047 cap->pending(), cap->wanted(), 0,
2048 cap->get_mseq(),
2049 mds->get_osd_epoch_barrier());
2050 in->encode_cap_message(m, cap);
2051 mds->send_message_client_counted(m, it->first);
2052 }
2053
2054 // should we increase max_size?
2055 if (in->is_auth() && in->is_file())
2056 check_inode_max_size(in);
2057 }
2058
2059
// Revoke all non-PIN caps issued via a stale cap, then kick the affected
// locks so state machines can progress without waiting on the (possibly
// dead) client.  NOTE(review): callers mark the cap stale first (see
// revoke_stale_caps(Session*)).
void Locker::revoke_stale_caps(Capability *cap)
{
  CInode *in = cap->get_inode();
  if (in->state_test(CInode::STATE_EXPORTINGCAPS)) {
    // if export succeeds, the cap will be removed. if export fails, we need to
    // revoke the cap if it's still stale.
    in->state_set(CInode::STATE_EVALSTALECAPS);
    return;
  }

  int issued = cap->issued();
  if (issued & ~CEPH_CAP_PIN) {
    dout(10) << " revoking " << ccap_string(issued) << " on " << *in << dendl;
    cap->revoke();

    // client held a writeable range on this inode; its size/mtime state may
    // be unknown now, so flag the inode for recovery.
    if (in->is_auth() &&
        in->inode.client_ranges.count(cap->get_client()))
      in->state_set(CInode::STATE_NEEDSRECOVER);

    // wake any in-progress gathers that may have been waiting on this
    // client's cap release.
    if (!in->filelock.is_stable()) eval_gather(&in->filelock);
    if (!in->linklock.is_stable()) eval_gather(&in->linklock);
    if (!in->authlock.is_stable()) eval_gather(&in->authlock);
    if (!in->xattrlock.is_stable()) eval_gather(&in->xattrlock);

    if (in->is_auth()) {
      try_eval(in, CEPH_CAP_LOCKS);
    } else {
      // replica: report our (now reduced) aggregate wanted caps to the auth
      request_inode_file_caps(in);
    }
  }
}
2091
2092 void Locker::revoke_stale_caps(Session *session)
2093 {
2094 dout(10) << "revoke_stale_caps for " << session->info.inst.name << dendl;
2095
2096 for (xlist<Capability*>::iterator p = session->caps.begin(); !p.end(); ++p) {
2097 Capability *cap = *p;
2098 cap->mark_stale();
2099 revoke_stale_caps(cap);
2100 }
2101 }
2102
2103 void Locker::resume_stale_caps(Session *session)
2104 {
2105 dout(10) << "resume_stale_caps for " << session->info.inst.name << dendl;
2106
2107 for (xlist<Capability*>::iterator p = session->caps.begin(); !p.end(); ++p) {
2108 Capability *cap = *p;
2109 CInode *in = cap->get_inode();
2110 assert(in->is_head());
2111 if (cap->is_stale()) {
2112 dout(10) << " clearing stale flag on " << *in << dendl;
2113 cap->clear_stale();
2114
2115 if (in->state_test(CInode::STATE_EXPORTINGCAPS)) {
2116 // if export succeeds, the cap will be removed. if export fails,
2117 // we need to re-issue the cap if it's not stale.
2118 in->state_set(CInode::STATE_EVALSTALECAPS);
2119 continue;
2120 }
2121
2122 if (!in->is_auth() || !eval(in, CEPH_CAP_LOCKS))
2123 issue_caps(in, cap);
2124 }
2125 }
2126 }
2127
2128 void Locker::remove_stale_leases(Session *session)
2129 {
2130 dout(10) << "remove_stale_leases for " << session->info.inst.name << dendl;
2131 xlist<ClientLease*>::iterator p = session->leases.begin();
2132 while (!p.end()) {
2133 ClientLease *l = *p;
2134 ++p;
2135 CDentry *parent = static_cast<CDentry*>(l->parent);
2136 dout(15) << " removing lease on " << *parent << dendl;
2137 parent->remove_client_lease(l, this);
2138 }
2139 }
2140
2141
// Waiter context: retry request_inode_file_caps() once the condition that
// blocked it (ambiguous auth, or a rejoining peer) clears.  The inode is
// pinned so the raw pointer stays valid while the context is queued.
class C_MDL_RequestInodeFileCaps : public LockerContext {
  CInode *in;
public:
  C_MDL_RequestInodeFileCaps(Locker *l, CInode *i) : LockerContext(l), in(i) {
    in->get(CInode::PIN_PTRWAITER);
  }
  void finish(int r) override {
    // only still relevant if we remained a replica for this inode
    if (!in->is_auth())
      locker->request_inode_file_caps(in);
    in->put(CInode::PIN_PTRWAITER);
  }
};
2154
2155 void Locker::request_inode_file_caps(CInode *in)
2156 {
2157 assert(!in->is_auth());
2158
2159 int wanted = in->get_caps_wanted() & ~CEPH_CAP_PIN;
2160 if (wanted != in->replica_caps_wanted) {
2161 // wait for single auth
2162 if (in->is_ambiguous_auth()) {
2163 in->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH,
2164 new C_MDL_RequestInodeFileCaps(this, in));
2165 return;
2166 }
2167
2168 mds_rank_t auth = in->authority().first;
2169 if (mds->is_cluster_degraded() &&
2170 mds->mdsmap->get_state(auth) == MDSMap::STATE_REJOIN) {
2171 mds->wait_for_active_peer(auth, new C_MDL_RequestInodeFileCaps(this, in));
2172 return;
2173 }
2174
2175 dout(7) << "request_inode_file_caps " << ccap_string(wanted)
2176 << " was " << ccap_string(in->replica_caps_wanted)
2177 << " on " << *in << " to mds." << auth << dendl;
2178
2179 in->replica_caps_wanted = wanted;
2180
2181 if (!mds->is_cluster_degraded() ||
2182 mds->mdsmap->is_clientreplay_or_active_or_stopping(auth))
2183 mds->send_message_mds(new MInodeFileCaps(in->ino(), in->replica_caps_wanted),
2184 auth);
2185 }
2186 }
2187
2188 /* This function DOES put the passed message before returning */
2189 void Locker::handle_inode_file_caps(MInodeFileCaps *m)
2190 {
2191 // nobody should be talking to us during recovery.
2192 assert(mds->is_clientreplay() || mds->is_active() || mds->is_stopping());
2193
2194 // ok
2195 CInode *in = mdcache->get_inode(m->get_ino());
2196 mds_rank_t from = mds_rank_t(m->get_source().num());
2197
2198 assert(in);
2199 assert(in->is_auth());
2200
2201 dout(7) << "handle_inode_file_caps replica mds." << from << " wants caps " << ccap_string(m->get_caps()) << " on " << *in << dendl;
2202
2203 if (m->get_caps())
2204 in->mds_caps_wanted[from] = m->get_caps();
2205 else
2206 in->mds_caps_wanted.erase(from);
2207
2208 try_eval(in, CEPH_CAP_LOCKS);
2209 m->put();
2210 }
2211
2212
// Waiter context: retry check_inode_max_size() with the originally
// requested values once the inode unfreezes or the filelock stabilizes.
// The inode is pinned so the raw pointer stays valid while queued.
class C_MDL_CheckMaxSize : public LockerContext {
  CInode *in;
  uint64_t new_max_size;
  uint64_t newsize;
  utime_t mtime;

public:
  C_MDL_CheckMaxSize(Locker *l, CInode *i, uint64_t _new_max_size,
                     uint64_t _newsize, utime_t _mtime) :
    LockerContext(l), in(i),
    new_max_size(_new_max_size), newsize(_newsize), mtime(_mtime)
  {
    in->get(CInode::PIN_PTRWAITER);
  }
  void finish(int r) override {
    // only retry if we are still auth for this inode
    if (in->is_auth())
      locker->check_inode_max_size(in, false, new_max_size, newsize, mtime);
    in->put(CInode::PIN_PTRWAITER);
  }
};
2233
2234 uint64_t Locker::calc_new_max_size(inode_t *pi, uint64_t size)
2235 {
2236 uint64_t new_max = (size + 1) << 1;
2237 uint64_t max_inc = g_conf->mds_client_writeable_range_max_inc_objs;
2238 if (max_inc > 0) {
2239 max_inc *= pi->get_layout_size_increment();
2240 new_max = MIN(new_max, size + max_inc);
2241 }
2242 return ROUND_UP_TO(new_max, pi->get_layout_size_increment());
2243 }
2244
2245 void Locker::calc_new_client_ranges(CInode *in, uint64_t size,
2246 map<client_t,client_writeable_range_t> *new_ranges,
2247 bool *max_increased)
2248 {
2249 inode_t *latest = in->get_projected_inode();
2250 uint64_t ms;
2251 if(latest->has_layout()) {
2252 ms = calc_new_max_size(latest, size);
2253 } else {
2254 // Layout-less directories like ~mds0/, have zero size
2255 ms = 0;
2256 }
2257
2258 // increase ranges as appropriate.
2259 // shrink to 0 if no WR|BUFFER caps issued.
2260 for (map<client_t,Capability*>::iterator p = in->client_caps.begin();
2261 p != in->client_caps.end();
2262 ++p) {
2263 if ((p->second->issued() | p->second->wanted()) & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER)) {
2264 client_writeable_range_t& nr = (*new_ranges)[p->first];
2265 nr.range.first = 0;
2266 if (latest->client_ranges.count(p->first)) {
2267 client_writeable_range_t& oldr = latest->client_ranges[p->first];
2268 if (ms > oldr.range.last)
2269 *max_increased = true;
2270 nr.range.last = MAX(ms, oldr.range.last);
2271 nr.follows = oldr.follows;
2272 } else {
2273 *max_increased = true;
2274 nr.range.last = ms;
2275 nr.follows = in->first - 1;
2276 }
2277 }
2278 }
2279 }
2280
// Re-evaluate (and if needed journal) the file's size/mtime and per-client
// writeable ranges (max_size).  Called on the auth MDS for regular files.
// new_size/new_mtime > 0 request a size/mtime bump (e.g. from a client cap
// flush); new_max_size requests a larger writeable range.  Returns true if
// an update was journaled, false if nothing changed or we had to defer
// (frozen inode / filelock not wrlockable) -- deferrals requeue themselves
// via C_MDL_CheckMaxSize.
bool Locker::check_inode_max_size(CInode *in, bool force_wrlock,
                                  uint64_t new_max_size, uint64_t new_size,
                                  utime_t new_mtime)
{
  assert(in->is_auth());
  assert(in->is_file());

  inode_t *latest = in->get_projected_inode();
  map<client_t, client_writeable_range_t> new_ranges;
  uint64_t size = latest->size;
  bool update_size = new_size > 0;
  bool update_max = false;
  bool max_increased = false;

  if (update_size) {
    // size/mtime only ever move forward here
    new_size = size = MAX(size, new_size);
    new_mtime = MAX(new_mtime, latest->mtime);
    if (latest->size == new_size && latest->mtime == new_mtime)
      update_size = false;
  }

  calc_new_client_ranges(in, max(new_max_size, size), &new_ranges, &max_increased);

  if (max_increased || latest->client_ranges != new_ranges)
    update_max = true;

  if (!update_size && !update_max) {
    dout(20) << "check_inode_max_size no-op on " << *in << dendl;
    return false;
  }

  dout(10) << "check_inode_max_size new_ranges " << new_ranges
           << " update_size " << update_size
           << " on " << *in << dendl;

  // defer while frozen; retry with the same requested values on unfreeze
  if (in->is_frozen()) {
    dout(10) << "check_inode_max_size frozen, waiting on " << *in << dendl;
    C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in,
                                                     new_max_size,
                                                     new_size,
                                                     new_mtime);
    in->add_waiter(CInode::WAIT_UNFREEZE, cms);
    return false;
  }
  if (!force_wrlock && !in->filelock.can_wrlock(in->get_loner())) {
    // lock?
    if (in->filelock.is_stable()) {
      // nudge the filelock toward a wrlockable state
      if (in->get_target_loner() >= 0)
        file_excl(&in->filelock);
      else
        simple_lock(&in->filelock);
    }
    if (!in->filelock.can_wrlock(in->get_loner())) {
      // try again later
      C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in,
                                                       new_max_size,
                                                       new_size,
                                                       new_mtime);

      in->filelock.add_waiter(SimpleLock::WAIT_STABLE, cms);
      dout(10) << "check_inode_max_size can't wrlock, waiting on " << *in << dendl;
      return false;
    }
  }

  // project the inode and journal the change
  MutationRef mut(new MutationImpl());
  mut->ls = mds->mdlog->get_current_segment();

  inode_t *pi = in->project_inode();
  pi->version = in->pre_dirty();

  if (update_max) {
    dout(10) << "check_inode_max_size client_ranges " << pi->client_ranges << " -> " << new_ranges << dendl;
    pi->client_ranges = new_ranges;
  }

  if (update_size) {
    dout(10) << "check_inode_max_size size " << pi->size << " -> " << new_size << dendl;
    pi->size = new_size;
    pi->rstat.rbytes = new_size;
    dout(10) << "check_inode_max_size mtime " << pi->mtime << " -> " << new_mtime << dendl;
    pi->mtime = new_mtime;
  }

  // use EOpen if the file is still open; otherwise, use EUpdate.
  // this is just an optimization to push open files forward into
  // newer log segments.
  LogEvent *le;
  EMetaBlob *metablob;
  if (in->is_any_caps_wanted() && in->last == CEPH_NOSNAP) {
    EOpen *eo = new EOpen(mds->mdlog);
    eo->add_ino(in->ino());
    metablob = &eo->metablob;
    le = eo;
    mut->ls->open_files.push_back(&in->item_open_file);
  } else {
    EUpdate *eu = new EUpdate(mds->mdlog, "check_inode_max_size");
    metablob = &eu->metablob;
    le = eu;
  }
  mds->mdlog->start_entry(le);
  if (update_size) {  // FIXME if/when we do max_size nested accounting
    mdcache->predirty_journal_parents(mut, metablob, in, 0, PREDIRTY_PRIMARY);
    // no cow, here!
    CDentry *parent = in->get_projected_parent_dn();
    metablob->add_primary_dentry(parent, in, true);
  } else {
    metablob->add_dir_context(in->get_projected_parent_dn()->get_dir());
    mdcache->journal_dirty_inode(mut.get(), metablob, in);
  }
  mds->mdlog->submit_entry(le,
      new C_Locker_FileUpdate_finish(this, in, mut, true));
  wrlock_force(&in->filelock, mut);  // wrlock for duration of journal
  mut->auth_pin(in);

  // make max_size _increase_ timely
  if (max_increased)
    mds->mdlog->flush();

  return true;
}
2402
2403
2404 void Locker::share_inode_max_size(CInode *in, Capability *only_cap)
2405 {
2406 /*
2407 * only share if currently issued a WR cap. if client doesn't have it,
2408 * file_max doesn't matter, and the client will get it if/when they get
2409 * the cap later.
2410 */
2411 dout(10) << "share_inode_max_size on " << *in << dendl;
2412 map<client_t, Capability*>::iterator it;
2413 if (only_cap)
2414 it = in->client_caps.find(only_cap->get_client());
2415 else
2416 it = in->client_caps.begin();
2417 for (; it != in->client_caps.end(); ++it) {
2418 const client_t client = it->first;
2419 Capability *cap = it->second;
2420 if (cap->is_suppress())
2421 continue;
2422 if (cap->pending() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER)) {
2423 dout(10) << "share_inode_max_size with client." << client << dendl;
2424 cap->inc_last_seq();
2425 MClientCaps *m = new MClientCaps(CEPH_CAP_OP_GRANT,
2426 in->ino(),
2427 in->find_snaprealm()->inode->ino(),
2428 cap->get_cap_id(), cap->get_last_seq(),
2429 cap->pending(), cap->wanted(), 0,
2430 cap->get_mseq(),
2431 mds->get_osd_epoch_barrier());
2432 in->encode_cap_message(m, cap);
2433 mds->send_message_client_counted(m, client);
2434 }
2435 if (only_cap)
2436 break;
2437 }
2438 }
2439
2440 bool Locker::_need_flush_mdlog(CInode *in, int wanted)
2441 {
2442 /* flush log if caps are wanted by client but corresponding lock is unstable and locked by
2443 * pending mutations. */
2444 if (((wanted & (CEPH_CAP_FILE_RD|CEPH_CAP_FILE_WR|CEPH_CAP_FILE_SHARED|CEPH_CAP_FILE_EXCL)) &&
2445 in->filelock.is_unstable_and_locked()) ||
2446 ((wanted & (CEPH_CAP_AUTH_SHARED|CEPH_CAP_AUTH_EXCL)) &&
2447 in->authlock.is_unstable_and_locked()) ||
2448 ((wanted & (CEPH_CAP_LINK_SHARED|CEPH_CAP_LINK_EXCL)) &&
2449 in->linklock.is_unstable_and_locked()) ||
2450 ((wanted & (CEPH_CAP_XATTR_SHARED|CEPH_CAP_XATTR_EXCL)) &&
2451 in->xattrlock.is_unstable_and_locked()))
2452 return true;
2453 return false;
2454 }
2455
// Apply a client's reported 'wanted' cap mask to the Capability, guarding
// against stale reports via issue_seq, then update the replica report /
// open-file list / recovery queue accordingly.
void Locker::adjust_cap_wanted(Capability *cap, int wanted, int issue_seq)
{
  if (ceph_seq_cmp(issue_seq, cap->get_last_issue()) == 0) {
    // client saw our latest issue; take its wanted mask verbatim
    dout(10) << " wanted " << ccap_string(cap->wanted())
             << " -> " << ccap_string(wanted) << dendl;
    cap->set_wanted(wanted);
  } else if (wanted & ~cap->wanted()) {
    // stale seq, but the client is asking for MORE -- honor the additions
    dout(10) << " wanted " << ccap_string(cap->wanted())
             << " -> " << ccap_string(wanted)
             << " (added caps even though we had seq mismatch!)" << dendl;
    cap->set_wanted(wanted | cap->wanted());
  } else {
    // stale seq and no new bits: ignore (don't drop wanted on stale info)
    dout(10) << " NOT changing wanted " << ccap_string(cap->wanted())
             << " -> " << ccap_string(wanted)
             << " (issue_seq " << issue_seq << " != last_issue "
             << cap->get_last_issue() << ")" << dendl;
    return;
  }

  CInode *cur = cap->get_inode();
  if (!cur->is_auth()) {
    // replica: forward our new aggregate wanted to the auth
    request_inode_file_caps(cur);
    return;
  }

  if (cap->wanted() == 0) {
    if (cur->item_open_file.is_on_list() &&
        !cur->is_any_caps_wanted()) {
      dout(10) << " removing unwanted file from open file list " << *cur << dendl;
      cur->item_open_file.remove_myself();
    }
  } else {
    // client wants RD/WR on a file still being recovered: bump its priority
    if (cur->state_test(CInode::STATE_RECOVERING) &&
        (cap->wanted() & (CEPH_CAP_FILE_RD |
                          CEPH_CAP_FILE_WR))) {
      mds->mdcache->recovery_queue.prioritize(cur);
    }

    if (!cur->item_open_file.is_on_list()) {
      // journal an EOpen so replay knows this inode is open
      dout(10) << " adding to open file list " << *cur << dendl;
      assert(cur->last == CEPH_NOSNAP);
      LogSegment *ls = mds->mdlog->get_current_segment();
      EOpen *le = new EOpen(mds->mdlog);
      mds->mdlog->start_entry(le);
      le->add_clean_inode(cur);
      ls->open_files.push_back(&cur->item_open_file);
      mds->mdlog->submit_entry(le);
    }
  }
}
2506
2507
2508
// Perform empty ("null") snapflushes on behalf of a client that we know
// will never send the real FLUSHSNAP (it released all WR/EXCL caps, or its
// session died), so pending client_need_snapflush entries can be retired.
void Locker::_do_null_snapflush(CInode *head_in, client_t client)
{
  dout(10) << "_do_null_snapflush client." << client << " on " << *head_in << dendl;
  compact_map<snapid_t, set<client_t> >::iterator p = head_in->client_need_snapflush.begin();
  while (p != head_in->client_need_snapflush.end()) {
    snapid_t snapid = p->first;
    set<client_t>& clients = p->second;
    ++p;  // be careful, q loop below depends on this

    if (clients.count(client)) {
      dout(10) << " doing async NULL snapflush on " << snapid << " from client." << client << dendl;
      CInode *sin = mdcache->get_inode(head_in->ino(), snapid);
      if (!sin) {
        // hrm, look forward until we find the inode.
        //  (we can only look it up by the last snapid it is valid for)
        dout(10) << " didn't have " << head_in->ino() << " snapid " << snapid << dendl;
        for (compact_map<snapid_t, set<client_t> >::iterator q = p;  // p is already at next entry
             q != head_in->client_need_snapflush.end();
             ++q) {
          dout(10) << " trying snapid " << q->first << dendl;
          sin = mdcache->get_inode(head_in->ino(), q->first);
          if (sin) {
            assert(sin->first <= snapid);
            break;
          }
          dout(10) << " didn't have " << head_in->ino() << " snapid " << q->first << dendl;
        }
        // multiversion head inode can stand in for the snapped inode
        if (!sin && head_in->is_multiversion())
          sin = head_in;
        assert(sin);
      }
      // dirty=0: journal a no-op snap update and retire the entry
      _do_snap_update(sin, snapid, 0, sin->first - 1, client, NULL, NULL);
      head_in->remove_need_snapflush(sin, snapid, client);
    }
  }
}
2545
2546
2547 bool Locker::should_defer_client_cap_frozen(CInode *in)
2548 {
2549 /*
2550 * This policy needs to be AT LEAST as permissive as allowing a client request
2551 * to go forward, or else a client request can release something, the release
2552 * gets deferred, but the request gets processed and deadlocks because when the
2553 * caps can't get revoked.
2554 *
2555 * Currently, a request wait if anything locked is freezing (can't
2556 * auth_pin), which would avoid any deadlock with cap release. Thus @in
2557 * _MUST_ be in the lock/auth_pin set.
2558 *
2559 * auth_pins==0 implies no unstable lock and not auth pinnned by
2560 * client request, otherwise continue even it's freezing.
2561 */
2562 return (in->is_freezing() && in->get_num_auth_pins() == 0) || in->is_frozen();
2563 }
2564
/*
 * This function DOES put the passed message before returning
 */
// Main dispatcher for client cap messages: gates on MDS/session state,
// dedups completed flushes, handles FLUSHSNAP, then applies cap
// confirmation / dirty-metadata flush / wanted adjustment to the head
// inode (and any intermediate snapped inodes).
void Locker::handle_client_caps(MClientCaps *m)
{
  Session *session = static_cast<Session *>(m->get_connection()->get_priv());
  client_t client = m->get_source().num();

  snapid_t follows = m->get_snap_follows();
  dout(7) << "handle_client_caps "
          << ((m->flags & CLIENT_CAPS_SYNC) ? "sync" : "async")
          << " on " << m->get_ino()
          << " tid " << m->get_client_tid() << " follows " << follows
          << " op " << ceph_cap_op_name(m->get_op()) << dendl;

  // not ready to process cap messages yet: drop (dead session) or requeue
  if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) {
    if (!session) {
      dout(5) << " no session, dropping " << *m << dendl;
      m->put();
      return;
    }
    if (session->is_closed() ||
        session->is_closing() ||
        session->is_killing()) {
      dout(7) << " session closed|closing|killing, dropping " << *m << dendl;
      m->put();
      return;
    }
    if (mds->is_reconnect() &&
        m->get_dirty() && m->get_client_tid() > 0 &&
        !session->have_completed_flush(m->get_client_tid())) {
      // remember dirty caps seen during reconnect for later processing
      mdcache->set_reconnected_dirty_caps(client, m->get_ino(), m->get_dirty());
    }
    mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
    return;
  }

  // duplicate of an already-completed flush: re-send the ack
  if (m->get_client_tid() > 0 && session &&
      session->have_completed_flush(m->get_client_tid())) {
    dout(7) << "handle_client_caps already flushed tid " << m->get_client_tid()
            << " for client." << client << dendl;
    MClientCaps *ack;
    if (m->get_op() == CEPH_CAP_OP_FLUSHSNAP) {
      ack = new MClientCaps(CEPH_CAP_OP_FLUSHSNAP_ACK, m->get_ino(), 0, 0, 0, 0, 0,
                            m->get_dirty(), 0, mds->get_osd_epoch_barrier());
    } else {
      ack = new MClientCaps(CEPH_CAP_OP_FLUSH_ACK, m->get_ino(), 0, m->get_cap_id(),
                            m->get_seq(), m->get_caps(), 0, m->get_dirty(), 0,
                            mds->get_osd_epoch_barrier());
    }
    ack->set_snap_follows(follows);
    ack->set_client_tid(m->get_client_tid());
    mds->send_message_client_counted(ack, m->get_connection());
    if (m->get_op() == CEPH_CAP_OP_FLUSHSNAP) {
      m->put();
      return;
    } else {
      // fall-thru because the message may release some caps
      m->clear_dirty();
      m->set_op(CEPH_CAP_OP_UPDATE);
    }
  }

  // "oldest flush tid" > 0 means client uses unique TID for each flush
  if (m->get_oldest_flush_tid() > 0 && session) {
    if (session->trim_completed_flushes(m->get_oldest_flush_tid())) {
      mds->mdlog->get_current_segment()->touched_sessions.insert(session->info.inst.name);

      if (session->get_num_trim_flushes_warnings() > 0 &&
          session->get_num_completed_flushes() * 2 < g_conf->mds_max_completed_flushes)
        session->reset_num_trim_flushes_warnings();
    } else {
      // client isn't advancing oldest_flush_tid; warn (with backoff)
      if (session->get_num_completed_flushes() >=
          (g_conf->mds_max_completed_flushes << session->get_num_trim_flushes_warnings())) {
        session->inc_num_trim_flushes_warnings();
        stringstream ss;
        ss << "client." << session->get_client() << " does not advance its oldest_flush_tid ("
           << m->get_oldest_flush_tid() << "), "
           << session->get_num_completed_flushes()
           << " completed flushes recorded in session";
        mds->clog->warn() << ss.str();
        dout(20) << __func__ << " " << ss.str() << dendl;
      }
    }
  }

  CInode *head_in = mdcache->get_inode(m->get_ino());
  if (!head_in) {
    if (mds->is_clientreplay()) {
      dout(7) << "handle_client_caps on unknown ino " << m->get_ino()
              << ", will try again after replayed client requests" << dendl;
      mdcache->wait_replay_cap_reconnect(m->get_ino(), new C_MDS_RetryMessage(mds, m));
      return;
    }
    dout(1) << "handle_client_caps on unknown ino " << m->get_ino() << ", dropping" << dendl;
    m->put();
    return;
  }

  if (m->osd_epoch_barrier && !mds->objecter->have_map(m->osd_epoch_barrier)) {
    // Pause RADOS operations until we see the required epoch
    mds->objecter->set_epoch_barrier(m->osd_epoch_barrier);
  }

  if (mds->get_osd_epoch_barrier() < m->osd_epoch_barrier) {
    // Record the barrier so that we will retransmit it to clients
    mds->set_osd_epoch_barrier(m->osd_epoch_barrier);
  }

  // resolve the snapped inode this message actually applies to
  CInode *in = head_in;
  if (follows > 0) {
    in = mdcache->pick_inode_snap(head_in, follows);
    if (in != head_in)
      dout(10) << " head inode " << *head_in << dendl;
  }
  dout(10) << " cap inode " << *in << dendl;

  Capability *cap = 0;
  cap = in->get_client_cap(client);
  if (!cap && in != head_in)
    cap = head_in->get_client_cap(client);
  if (!cap) {
    dout(7) << "handle_client_caps no cap for client." << client << " on " << *in << dendl;
    m->put();
    return;
  }
  assert(cap);

  // freezing|frozen?
  if (should_defer_client_cap_frozen(in)) {
    dout(7) << "handle_client_caps freezing|frozen on " << *in << dendl;
    in->add_waiter(CInode::WAIT_UNFREEZE, new C_MDS_RetryMessage(mds, m));
    return;
  }
  // out-of-date migration seq: drop (a newer exporter owns the cap now)
  if (ceph_seq_cmp(m->get_mseq(), cap->get_mseq()) < 0) {
    dout(7) << "handle_client_caps mseq " << m->get_mseq() << " < " << cap->get_mseq()
            << ", dropping" << dendl;
    m->put();
    return;
  }

  int op = m->get_op();

  // flushsnap?
  if (op == CEPH_CAP_OP_FLUSHSNAP) {
    if (!in->is_auth()) {
      dout(7) << " not auth, ignoring flushsnap on " << *in << dendl;
      goto out;
    }

    SnapRealm *realm = in->find_snaprealm();
    snapid_t snap = realm->get_snap_following(follows);
    dout(10) << " flushsnap follows " << follows << " -> snap " << snap << dendl;

    // we can prepare the ack now, since this FLUSHEDSNAP is independent of any
    // other cap ops. (except possibly duplicate FLUSHSNAP requests, but worst
    // case we get a dup response, so whatever.)
    MClientCaps *ack = 0;
    if (m->get_dirty()) {
      ack = new MClientCaps(CEPH_CAP_OP_FLUSHSNAP_ACK, in->ino(), 0, 0, 0, 0, 0, m->get_dirty(), 0, mds->get_osd_epoch_barrier());
      ack->set_snap_follows(follows);
      ack->set_client_tid(m->get_client_tid());
      ack->set_oldest_flush_tid(m->get_oldest_flush_tid());
    }

    if (in == head_in ||
        (head_in->client_need_snapflush.count(snap) &&
         head_in->client_need_snapflush[snap].count(client))) {
      dout(7) << " flushsnap snap " << snap
              << " client." << client << " on " << *in << dendl;

      // this cap now follows a later snap (i.e. the one initiating this flush, or later)
      if (in == head_in)
        cap->client_follows = snap < CEPH_NOSNAP ? snap : realm->get_newest_seq();

      _do_snap_update(in, snap, m->get_dirty(), follows, client, m, ack);

      if (in != head_in)
        head_in->remove_need_snapflush(in, snap, client);

    } else {
      dout(7) << " not expecting flushsnap " << snap << " from client." << client << " on " << *in << dendl;
      if (ack)
        mds->send_message_client_counted(ack, m->get_connection());
    }
    goto out;
  }

  if (cap->get_cap_id() != m->get_cap_id()) {
    dout(7) << " ignoring client capid " << m->get_cap_id() << " != my " << cap->get_cap_id() << dendl;
  } else {
    // intermediate snap inodes
    while (in != head_in) {
      assert(in->last != CEPH_NOSNAP);
      if (in->is_auth() && m->get_dirty()) {
        dout(10) << " updating intermediate snapped inode " << *in << dendl;
        _do_cap_update(in, NULL, m->get_dirty(), follows, m);
      }
      in = mdcache->pick_inode_snap(head_in, in->last);
    }

    // head inode, and cap
    MClientCaps *ack = 0;

    int caps = m->get_caps();
    if (caps & ~cap->issued()) {
      dout(10) << " confirming not issued caps " << ccap_string(caps & ~cap->issued()) << dendl;
      caps &= cap->issued();
    }

    cap->confirm_receipt(m->get_seq(), caps);
    dout(10) << " follows " << follows
             << " retains " << ccap_string(m->get_caps())
             << " dirty " << ccap_string(m->get_dirty())
             << " on " << *in << dendl;


    // missing/skipped snapflush?
    // The client MAY send a snapflush if it is issued WR/EXCL caps, but
    // presently only does so when it has actual dirty metadata. But, we
    // set up the need_snapflush stuff based on the issued caps.
    // We can infer that the client WONT send a FLUSHSNAP once they have
    // released all WR/EXCL caps (the FLUSHSNAP always comes before the cap
    // update/release).
    if (!head_in->client_need_snapflush.empty()) {
      if ((cap->issued() & CEPH_CAP_ANY_FILE_WR) == 0) {
        _do_null_snapflush(head_in, client);
      } else {
        dout(10) << " revocation in progress, not making any conclusions about null snapflushes" << dendl;
      }
    }

    if (m->get_dirty() && in->is_auth()) {
      dout(7) << " flush client." << client << " dirty " << ccap_string(m->get_dirty())
              << " seq " << m->get_seq() << " on " << *in << dendl;
      ack = new MClientCaps(CEPH_CAP_OP_FLUSH_ACK, in->ino(), 0, cap->get_cap_id(), m->get_seq(),
                            m->get_caps(), 0, m->get_dirty(), 0, mds->get_osd_epoch_barrier());
      ack->set_client_tid(m->get_client_tid());
      ack->set_oldest_flush_tid(m->get_oldest_flush_tid());
    }

    // filter wanted based on what we could ever give out (given auth/replica status)
    bool need_flush = m->flags & CLIENT_CAPS_SYNC;
    int new_wanted = m->get_wanted() & head_in->get_caps_allowed_ever();
    if (new_wanted != cap->wanted()) {
      if (!need_flush && (new_wanted & ~cap->pending())) {
        // expanding caps. make sure we aren't waiting for a log flush
        need_flush = _need_flush_mdlog(head_in, new_wanted & ~cap->pending());
      }

      adjust_cap_wanted(cap, new_wanted, m->get_issue_seq());
    }

    if (in->is_auth() &&
        _do_cap_update(in, cap, m->get_dirty(), follows, m, ack, &need_flush)) {
      // updated
      eval(in, CEPH_CAP_LOCKS);

      if (!need_flush && (cap->wanted() & ~cap->pending()))
        need_flush = _need_flush_mdlog(in, cap->wanted() & ~cap->pending());
    } else {
      // no update, ack now.
      if (ack)
        mds->send_message_client_counted(ack, m->get_connection());

      bool did_issue = eval(in, CEPH_CAP_LOCKS);
      if (!did_issue && (cap->wanted() & ~cap->pending()))
        issue_caps(in, cap);

      // first message on a new cap: tell the client its writeable range
      if (cap->get_last_seq() == 0 &&
          (cap->pending() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER))) {
        cap->issue_norevoke(cap->issued());
        share_inode_max_size(in, cap);
      }
    }

    if (need_flush)
      mds->mdlog->flush();
  }

 out:
  m->put();
}
2848
2849
// Waiter context: re-run process_request_cap_release() (without an mdr,
// and with an empty dname since the lease part was already handled) once
// the inode unfreezes.  The release item is captured by value.
class C_Locker_RetryRequestCapRelease : public LockerContext {
  client_t client;
  ceph_mds_request_release item;
public:
  C_Locker_RetryRequestCapRelease(Locker *l, client_t c, const ceph_mds_request_release& it) :
    LockerContext(l), client(c), item(it) { }
  void finish(int r) override {
    string dname;
    MDRequestRef null_ref;
    locker->process_request_cap_release(null_ref, client, item, dname);
  }
};
2862
// Process a cap/lease release embedded in a client request: drop the named
// dentry lease (if any), confirm the released cap bits, and re-evaluate
// lock state.  'mdr' may be null when retried from a waiter context.
void Locker::process_request_cap_release(MDRequestRef& mdr, client_t client, const ceph_mds_request_release& item,
                                         const string &dname)
{
  inodeno_t ino = (uint64_t)item.ino;
  uint64_t cap_id = item.cap_id;
  int caps = item.caps;
  int wanted = item.wanted;
  int seq = item.seq;
  int issue_seq = item.issue_seq;
  int mseq = item.mseq;

  CInode *in = mdcache->get_inode(ino);
  if (!in)
    return;

  // a non-empty dname means the release also covers a dentry lease
  if (dname.length()) {
    frag_t fg = in->pick_dirfrag(dname);
    CDir *dir = in->get_dirfrag(fg);
    if (dir) {
      CDentry *dn = dir->lookup(dname);
      if (dn) {
        ClientLease *l = dn->get_client_lease(client);
        if (l) {
          dout(10) << "process_cap_release removing lease on " << *dn << dendl;
          dn->remove_client_lease(l, this);
        } else {
          dout(7) << "process_cap_release client." << client
                  << " doesn't have lease on " << *dn << dendl;
        }
      } else {
        dout(7) << "process_cap_release client." << client << " released lease on dn "
                << dir->dirfrag() << "/" << dname << " which dne" << dendl;
      }
    }
  }

  Capability *cap = in->get_client_cap(client);
  if (!cap)
    return;

  dout(10) << "process_cap_release client." << client << " " << ccap_string(caps) << " on " << *in
           << (mdr ? "" : " (DEFERRED, no mdr)")
           << dendl;

  // stale migration seq: a newer cap export superseded this release
  if (ceph_seq_cmp(mseq, cap->get_mseq()) < 0) {
    dout(7) << " mseq " << mseq << " < " << cap->get_mseq() << ", dropping" << dendl;
    return;
  }

  if (cap->get_cap_id() != cap_id) {
    dout(7) << " cap_id " << cap_id << " != " << cap->get_cap_id() << ", dropping" << dendl;
    return;
  }

  if (should_defer_client_cap_frozen(in)) {
    dout(7) << " frozen, deferring" << dendl;
    in->add_waiter(CInode::WAIT_UNFREEZE, new C_Locker_RetryRequestCapRelease(this, client, item));
    return;
  }

  // never record retained caps we didn't actually issue
  if (caps & ~cap->issued()) {
    dout(10) << " confirming not issued caps " << ccap_string(caps & ~cap->issued()) << dendl;
    caps &= cap->issued();
  }
  cap->confirm_receipt(seq, caps);

  // client gave up all file write caps: fabricate any outstanding snapflushes
  if (!in->client_need_snapflush.empty() &&
      (cap->issued() & CEPH_CAP_ANY_FILE_WR) == 0) {
    _do_null_snapflush(in, client);
  }

  adjust_cap_wanted(cap, wanted, issue_seq);

  // suppress re-issue while evaluating in the context of the request
  if (mdr)
    cap->inc_suppress();
  eval(in, CEPH_CAP_LOCKS);
  if (mdr)
    cap->dec_suppress();

  // take note; we may need to reissue on this cap later
  if (mdr)
    mdr->cap_releases[in->vino()] = cap->get_last_seq();
}
2946
2947 class C_Locker_RetryKickIssueCaps : public LockerContext {
2948 CInode *in;
2949 client_t client;
2950 ceph_seq_t seq;
2951 public:
2952 C_Locker_RetryKickIssueCaps(Locker *l, CInode *i, client_t c, ceph_seq_t s) :
2953 LockerContext(l), in(i), client(c), seq(s) {
2954 in->get(CInode::PIN_PTRWAITER);
2955 }
2956 void finish(int r) override {
2957 locker->kick_issue_caps(in, client, seq);
2958 in->put(CInode::PIN_PTRWAITER);
2959 }
2960 };
2961
2962 void Locker::kick_issue_caps(CInode *in, client_t client, ceph_seq_t seq)
2963 {
2964 Capability *cap = in->get_client_cap(client);
2965 if (!cap || cap->get_last_sent() != seq)
2966 return;
2967 if (in->is_frozen()) {
2968 dout(10) << "kick_issue_caps waiting for unfreeze on " << *in << dendl;
2969 in->add_waiter(CInode::WAIT_UNFREEZE,
2970 new C_Locker_RetryKickIssueCaps(this, in, client, seq));
2971 return;
2972 }
2973 dout(10) << "kick_issue_caps released at current seq " << seq
2974 << ", reissuing" << dendl;
2975 issue_caps(in, cap);
2976 }
2977
2978 void Locker::kick_cap_releases(MDRequestRef& mdr)
2979 {
2980 client_t client = mdr->get_client();
2981 for (map<vinodeno_t,ceph_seq_t>::iterator p = mdr->cap_releases.begin();
2982 p != mdr->cap_releases.end();
2983 ++p) {
2984 CInode *in = mdcache->get_inode(p->first);
2985 if (!in)
2986 continue;
2987 kick_issue_caps(in, client, p->second);
2988 }
2989 }
2990
/**
 * Journal a client snap flush (FLUSHSNAP) of dirty metadata for @snap.
 *
 * m and ack might be NULL, so don't dereference them unless dirty != 0.
 * On completion the journaled update is finished (and @ack sent, if any)
 * by C_Locker_FileUpdate_finish.
 */
void Locker::_do_snap_update(CInode *in, snapid_t snap, int dirty, snapid_t follows, client_t client, MClientCaps *m, MClientCaps *ack)
{
  dout(10) << "_do_snap_update dirty " << ccap_string(dirty)
	   << " follows " << follows << " snap " << snap
	   << " on " << *in << dendl;

  if (snap == CEPH_NOSNAP) {
    // hmm, i guess snap was already deleted? just ack!
    dout(10) << " wow, the snap following " << follows
	     << " was already deleted. nothing to record, just ack." << dendl;
    if (ack)
      mds->send_message_client_counted(ack, m->get_connection());
    return;
  }

  EUpdate *le = new EUpdate(mds->mdlog, "snap flush");
  mds->mdlog->start_entry(le);
  MutationRef mut = new MutationImpl();
  mut->ls = mds->mdlog->get_current_segment();

  // normal metadata updates that we can apply to the head as well.

  // update xattrs?
  bool xattrs = false;
  map<string,bufferptr> *px = 0;
  if ((dirty & CEPH_CAP_XATTR_EXCL) &&
      m->xattrbl.length() &&
      m->head.xattr_version > in->get_projected_inode()->xattr_version)
    xattrs = true;

  // multiversion inode: the flush may target an old_inode, not the head
  old_inode_t *oi = 0;
  if (in->is_multiversion()) {
    oi = in->pick_old_inode(snap);
  }

  inode_t *pi;
  if (oi) {
    dout(10) << " writing into old inode" << dendl;
    // still project/pre_dirty the head so the journal versioning is
    // consistent, but direct the field updates at the old inode
    pi = in->project_inode();
    pi->version = in->pre_dirty();
    if (snap > oi->first)
      in->split_old_inode(snap);
    pi = &oi->inode;
    if (xattrs)
      px = &oi->xattrs;
  } else {
    if (xattrs)
      px = new map<string,bufferptr>;
    pi = in->project_inode(px);
    pi->version = in->pre_dirty();
  }

  _update_cap_fields(in, dirty, m, pi);

  // xattr
  if (px) {
    dout(7) << " xattrs v" << pi->xattr_version << " -> " << m->head.xattr_version
	    << " len " << m->xattrbl.length() << dendl;
    pi->xattr_version = m->head.xattr_version;
    bufferlist::iterator p = m->xattrbl.begin();
    ::decode(*px, p);
  }

  // trim the client's byte-range reservation back to this snap
  if (pi->client_ranges.count(client)) {
    if (in->last == snap) {
      dout(10) << " removing client_range entirely" << dendl;
      pi->client_ranges.erase(client);
    } else {
      dout(10) << " client_range now follows " << snap << dendl;
      pi->client_ranges[client].follows = snap;
    }
  }

  mut->auth_pin(in);
  mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows);
  mdcache->journal_dirty_inode(mut.get(), &le->metablob, in, follows);

  // "oldest flush tid" > 0 means client uses unique TID for each flush
  if (ack && ack->get_oldest_flush_tid() > 0)
    le->metablob.add_client_flush(metareqid_t(m->get_source(), ack->get_client_tid()),
				  ack->get_oldest_flush_tid());

  mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut, false, false,
							      client, ack));
}
3079
/**
 * Copy client-supplied metadata from cap flush message @m into inode @pi,
 * gated by which caps are @dirty:
 *  - FILE_EXCL/FILE_WR: mtime/size/inline data (and atime/time_warp_seq
 *    under FILE_EXCL only)
 *  - AUTH_EXCL: uid/gid/mode/btime
 * ctime and change_attr are advanced unconditionally (monotonically).
 */
void Locker::_update_cap_fields(CInode *in, int dirty, MClientCaps *m, inode_t *pi)
{
  if (dirty == 0)
    return;

  /* m must be valid if there are dirty caps */
  assert(m);
  uint64_t features = m->get_connection()->get_features();

  // ctime only ever moves forward
  if (m->get_ctime() > pi->ctime) {
    dout(7) << " ctime " << pi->ctime << " -> " << m->get_ctime()
	    << " for " << *in << dendl;
    pi->ctime = m->get_ctime();
  }

  if ((features & CEPH_FEATURE_FS_CHANGE_ATTR) &&
      m->get_change_attr() > pi->change_attr) {
    dout(7) << " change_attr " << pi->change_attr << " -> " << m->get_change_attr()
	    << " for " << *in << dendl;
    pi->change_attr = m->get_change_attr();
  }

  // file
  if (dirty & (CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)) {
    utime_t atime = m->get_atime();
    utime_t mtime = m->get_mtime();
    uint64_t size = m->get_size();
    version_t inline_version = m->inline_version;

    // with only WR, mtime may only advance; with EXCL the client owns it
    // outright and any change is accepted
    if (((dirty & CEPH_CAP_FILE_WR) && mtime > pi->mtime) ||
	((dirty & CEPH_CAP_FILE_EXCL) && mtime != pi->mtime)) {
      dout(7) << " mtime " << pi->mtime << " -> " << mtime
	      << " for " << *in << dendl;
      pi->mtime = mtime;
    }
    // size may only grow here; truncation goes through other paths
    if (in->inode.is_file() &&   // ONLY if regular file
	size > pi->size) {
      dout(7) << " size " << pi->size << " -> " << size
	      << " for " << *in << dendl;
      pi->size = size;
      pi->rstat.rbytes = size;
    }
    if (in->inode.is_file() &&
	(dirty & CEPH_CAP_FILE_WR) &&
	inline_version > pi->inline_data.version) {
      pi->inline_data.version = inline_version;
      if (inline_version != CEPH_INLINE_NONE && m->inline_data.length() > 0)
	pi->inline_data.get_data() = m->inline_data;
      else
	pi->inline_data.free_data();
    }
    if ((dirty & CEPH_CAP_FILE_EXCL) && atime != pi->atime) {
      dout(7) << " atime " << pi->atime << " -> " << atime
	      << " for " << *in << dendl;
      pi->atime = atime;
    }
    if ((dirty & CEPH_CAP_FILE_EXCL) &&
	ceph_seq_cmp(pi->time_warp_seq, m->get_time_warp_seq()) < 0) {
      dout(7) << " time_warp_seq " << pi->time_warp_seq << " -> " << m->get_time_warp_seq()
	      << " for " << *in << dendl;
      pi->time_warp_seq = m->get_time_warp_seq();
    }
  }
  // auth
  if (dirty & CEPH_CAP_AUTH_EXCL) {
    if (m->head.uid != pi->uid) {
      dout(7) << " uid " << pi->uid
	      << " -> " << m->head.uid
	      << " for " << *in << dendl;
      pi->uid = m->head.uid;
    }
    if (m->head.gid != pi->gid) {
      dout(7) << " gid " << pi->gid
	      << " -> " << m->head.gid
	      << " for " << *in << dendl;
      pi->gid = m->head.gid;
    }
    if (m->head.mode != pi->mode) {
      dout(7) << " mode " << oct << pi->mode
	      << " -> " << m->head.mode << dec
	      << " for " << *in << dendl;
      pi->mode = m->head.mode;
    }
    if ((features & CEPH_FEATURE_FS_BTIME) && m->get_btime() != pi->btime) {
      dout(7) << " btime " << oct << pi->btime
	      << " -> " << m->get_btime() << dec
	      << " for " << *in << dendl;
      pi->btime = m->get_btime();
    }
  }
}
3171
/*
 * update inode based on cap flush|flushsnap|wanted.
 * adjust max_size, if needed.
 * if we update, return true; otherwise, false (no update needed).
 *
 * Side effects when an update is journaled: projects the inode, force-
 * wrlocks the touched locks for the duration of the journal entry, and
 * submits an EUpdate whose completion (C_Locker_FileUpdate_finish) sends
 * @ack, if any.  *need_flush is set when the caller should flush the
 * mdlog promptly.
 */
bool Locker::_do_cap_update(CInode *in, Capability *cap,
			    int dirty, snapid_t follows,
			    MClientCaps *m, MClientCaps *ack,
			    bool *need_flush)
{
  dout(10) << "_do_cap_update dirty " << ccap_string(dirty)
	   << " issued " << ccap_string(cap ? cap->issued() : 0)
	   << " wanted " << ccap_string(cap ? cap->wanted() : 0)
	   << " on " << *in << dendl;
  assert(in->is_auth());
  client_t client = m->get_source().num();
  inode_t *latest = in->get_projected_inode();

  // increase or zero max_size?
  uint64_t size = m->get_size();
  bool change_max = false;
  uint64_t old_max = latest->client_ranges.count(client) ? latest->client_ranges[client].range.last : 0;
  uint64_t new_max = old_max;

  if (in->is_file()) {
    bool forced_change_max = false;
    dout(20) << "inode is file" << dendl;
    if (cap && ((cap->issued() | cap->wanted()) & CEPH_CAP_ANY_FILE_WR)) {
      dout(20) << "client has write caps; m->get_max_size="
	       << m->get_max_size() << "; old_max=" << old_max << dendl;
      if (m->get_max_size() > new_max) {
	// client explicitly asked for a larger max_size
	dout(10) << "client requests file_max " << m->get_max_size()
		 << " > max " << old_max << dendl;
	change_max = true;
	forced_change_max = true;
	new_max = calc_new_max_size(latest, m->get_max_size());
      } else {
	// otherwise grow max_size ahead of the reported size as needed
	new_max = calc_new_max_size(latest, size);

	if (new_max > old_max)
	  change_max = true;
	else
	  new_max = old_max;
      }
    } else {
      // no write caps issued or wanted: retire the reservation
      if (old_max) {
	change_max = true;
	new_max = 0;
      }
    }

    // changing max_size requires a wrlockable filelock; try to steer the
    // lock into a suitable state, else queue a recheck and give up for now
    if (in->last == CEPH_NOSNAP &&
	change_max &&
	!in->filelock.can_wrlock(client) &&
	!in->filelock.can_force_wrlock(client)) {
      dout(10) << " i want to change file_max, but lock won't allow it (yet)" << dendl;
      if (in->filelock.is_stable()) {
	bool need_issue = false;
	if (cap)
	  cap->inc_suppress();  // don't reissue caps mid-transition
	if (in->mds_caps_wanted.empty() &&
	    (in->get_loner() >= 0 || (in->get_wanted_loner() >= 0 && in->try_set_loner()))) {
	  if (in->filelock.get_state() != LOCK_EXCL)
	    file_excl(&in->filelock, &need_issue);
	} else
	  simple_lock(&in->filelock, &need_issue);
	if (need_issue)
	  issue_caps(in);
	if (cap)
	  cap->dec_suppress();
      }
      if (!in->filelock.can_wrlock(client) &&
	  !in->filelock.can_force_wrlock(client)) {
	// retry the max_size change once the filelock stabilizes
	C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in,
							 forced_change_max ? new_max : 0,
							 0, utime_t());

	in->filelock.add_waiter(SimpleLock::WAIT_STABLE, cms);
	change_max = false;
      }
    }
  }

  // import client-held file locks (fcntl then flock) carried in the message
  if (m->flockbl.length()) {
    int32_t num_locks;
    bufferlist::iterator bli = m->flockbl.begin();
    ::decode(num_locks, bli);
    for ( int i=0; i < num_locks; ++i) {
      ceph_filelock decoded_lock;
      ::decode(decoded_lock, bli);
      in->get_fcntl_lock_state()->held_locks.
	insert(pair<uint64_t, ceph_filelock>(decoded_lock.start, decoded_lock));
      ++in->get_fcntl_lock_state()->client_held_lock_counts[(client_t)(decoded_lock.client)];
    }
    ::decode(num_locks, bli);
    for ( int i=0; i < num_locks; ++i) {
      ceph_filelock decoded_lock;
      ::decode(decoded_lock, bli);
      in->get_flock_lock_state()->held_locks.
	insert(pair<uint64_t, ceph_filelock>(decoded_lock.start, decoded_lock));
      ++in->get_flock_lock_state()->client_held_lock_counts[(client_t)(decoded_lock.client)];
    }
  }

  if (!dirty && !change_max)
    return false;

  // get_priv() takes a ref on the session; balanced by put() below
  Session *session = static_cast<Session *>(m->get_connection()->get_priv());
  if (session->check_access(in, MAY_WRITE,
			    m->caller_uid, m->caller_gid, NULL, 0, 0) < 0) {
    session->put();
    dout(10) << "check_access failed, dropping cap update on " << *in << dendl;
    return false;
  }
  session->put();

  // do the update.
  EUpdate *le = new EUpdate(mds->mdlog, "cap update");
  mds->mdlog->start_entry(le);

  // xattrs update?
  map<string,bufferptr> *px = 0;
  if ((dirty & CEPH_CAP_XATTR_EXCL) &&
      m->xattrbl.length() &&
      m->head.xattr_version > in->get_projected_inode()->xattr_version)
    px = new map<string,bufferptr>;

  inode_t *pi = in->project_inode(px);
  pi->version = in->pre_dirty();

  MutationRef mut(new MutationImpl());
  mut->ls = mds->mdlog->get_current_segment();

  _update_cap_fields(in, dirty, m, pi);

  if (change_max) {
    dout(7) << " max_size " << old_max << " -> " << new_max
	    << " for " << *in << dendl;
    if (new_max) {
      pi->client_ranges[client].range.first = 0;
      pi->client_ranges[client].range.last = new_max;
      pi->client_ranges[client].follows = in->first - 1;
    } else
      pi->client_ranges.erase(client);
  }

  if (change_max || (dirty & (CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)))
    wrlock_force(&in->filelock, mut); // wrlock for duration of journal

  // auth
  if (dirty & CEPH_CAP_AUTH_EXCL)
    wrlock_force(&in->authlock, mut);

  // xattr
  if (px) {
    dout(7) << " xattrs v" << pi->xattr_version << " -> " << m->head.xattr_version << dendl;
    pi->xattr_version = m->head.xattr_version;
    bufferlist::iterator p = m->xattrbl.begin();
    ::decode(*px, p);

    wrlock_force(&in->xattrlock, mut);
  }

  mut->auth_pin(in);
  mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows);
  mdcache->journal_dirty_inode(mut.get(), &le->metablob, in, follows);

  // "oldest flush tid" > 0 means client uses unique TID for each flush
  if (ack && ack->get_oldest_flush_tid() > 0)
    le->metablob.add_client_flush(metareqid_t(m->get_source(), ack->get_client_tid()),
				  ack->get_oldest_flush_tid());

  mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut,
							      change_max, !!cap,
							      client, ack));
  if (need_flush && !*need_flush &&
      ((change_max && new_max) || // max INCREASE
       _need_flush_mdlog(in, dirty)))
    *need_flush = true;

  return true;
}
3354
3355 /* This function DOES put the passed message before returning */
3356 void Locker::handle_client_cap_release(MClientCapRelease *m)
3357 {
3358 client_t client = m->get_source().num();
3359 dout(10) << "handle_client_cap_release " << *m << dendl;
3360
3361 if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) {
3362 mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
3363 return;
3364 }
3365
3366 if (m->osd_epoch_barrier && !mds->objecter->have_map(m->osd_epoch_barrier)) {
3367 // Pause RADOS operations until we see the required epoch
3368 mds->objecter->set_epoch_barrier(m->osd_epoch_barrier);
3369 }
3370
3371 if (mds->get_osd_epoch_barrier() < m->osd_epoch_barrier) {
3372 // Record the barrier so that we will retransmit it to clients
3373 mds->set_osd_epoch_barrier(m->osd_epoch_barrier);
3374 }
3375
3376 Session *session = static_cast<Session *>(m->get_connection()->get_priv());
3377
3378 for (vector<ceph_mds_cap_item>::iterator p = m->caps.begin(); p != m->caps.end(); ++p) {
3379 _do_cap_release(client, inodeno_t((uint64_t)p->ino) , p->cap_id, p->migrate_seq, p->seq);
3380 }
3381
3382 if (session) {
3383 session->notify_cap_release(m->caps.size());
3384 }
3385
3386 m->put();
3387 }
3388
3389 class C_Locker_RetryCapRelease : public LockerContext {
3390 client_t client;
3391 inodeno_t ino;
3392 uint64_t cap_id;
3393 ceph_seq_t migrate_seq;
3394 ceph_seq_t issue_seq;
3395 public:
3396 C_Locker_RetryCapRelease(Locker *l, client_t c, inodeno_t i, uint64_t id,
3397 ceph_seq_t mseq, ceph_seq_t seq) :
3398 LockerContext(l), client(c), ino(i), cap_id(id), migrate_seq(mseq), issue_seq(seq) {}
3399 void finish(int r) override {
3400 locker->_do_cap_release(client, ino, cap_id, migrate_seq, issue_seq);
3401 }
3402 };
3403
/**
 * Process one release item from an MClientCapRelease: remove the
 * client's Capability on @ino unless the release is stale (wrong cap_id,
 * old mseq, or an issue seq that doesn't match our last issue).
 * Defers (and retries) if the inode is frozen.
 */
void Locker::_do_cap_release(client_t client, inodeno_t ino, uint64_t cap_id,
			     ceph_seq_t mseq, ceph_seq_t seq)
{
  CInode *in = mdcache->get_inode(ino);
  if (!in) {
    dout(7) << "_do_cap_release missing ino " << ino << dendl;
    return;
  }
  Capability *cap = in->get_client_cap(client);
  if (!cap) {
    dout(7) << "_do_cap_release no cap for client" << client << " on "<< *in << dendl;
    return;
  }

  dout(7) << "_do_cap_release for client." << client << " on "<< *in << dendl;
  // release refers to a cap instance we no longer track? ignore.
  if (cap->get_cap_id() != cap_id) {
    dout(7) << " capid " << cap_id << " != " << cap->get_cap_id() << ", ignore" << dendl;
    return;
  }
  // release predates a cap migration? ignore.
  if (ceph_seq_cmp(mseq, cap->get_mseq()) < 0) {
    dout(7) << " mseq " << mseq << " < " << cap->get_mseq() << ", ignore" << dendl;
    return;
  }
  // can't mutate cap state while frozen; retry after unfreeze
  if (should_defer_client_cap_frozen(in)) {
    dout(7) << " freezing|frozen, deferring" << dendl;
    in->add_waiter(CInode::WAIT_UNFREEZE,
		  new C_Locker_RetryCapRelease(this, client, ino, cap_id, mseq, seq));
    return;
  }
  // we issued caps again after this release was generated: the release
  // is obsolete, just clear the superseded revoke history
  if (seq != cap->get_last_issue()) {
    dout(7) << " issue_seq " << seq << " != " << cap->get_last_issue() << dendl;
    // clean out any old revoke history
    cap->clean_revoke_from(seq);
    eval_cap_gather(in);
    return;
  }
  remove_client_cap(in, client);
}
3442
/* Drop a client's capability on an inode and clean up dependent state. */
3444
3445 void Locker::remove_client_cap(CInode *in, client_t client)
3446 {
3447 // clean out any pending snapflush state
3448 if (!in->client_need_snapflush.empty())
3449 _do_null_snapflush(in, client);
3450
3451 in->remove_client_cap(client);
3452
3453 if (in->is_auth()) {
3454 // make sure we clear out the client byte range
3455 if (in->get_projected_inode()->client_ranges.count(client) &&
3456 !(in->inode.nlink == 0 && !in->is_any_caps())) // unless it's unlink + stray
3457 check_inode_max_size(in);
3458 } else {
3459 request_inode_file_caps(in);
3460 }
3461
3462 try_eval(in, CEPH_CAP_LOCKS);
3463 }
3464
3465
3466 /**
3467 * Return true if any currently revoking caps exceed the
3468 * mds_revoke_cap_timeout threshold.
3469 */
3470 bool Locker::any_late_revoking_caps(xlist<Capability*> const &revoking) const
3471 {
3472 xlist<Capability*>::const_iterator p = revoking.begin();
3473 if (p.end()) {
3474 // No revoking caps at the moment
3475 return false;
3476 } else {
3477 utime_t now = ceph_clock_now();
3478 utime_t age = now - (*p)->get_last_revoke_stamp();
3479 if (age <= g_conf->mds_revoke_cap_timeout) {
3480 return false;
3481 } else {
3482 return true;
3483 }
3484 }
3485 }
3486
3487
3488 void Locker::get_late_revoking_clients(std::list<client_t> *result) const
3489 {
3490 if (!any_late_revoking_caps(revoking_caps)) {
3491 // Fast path: no misbehaving clients, execute in O(1)
3492 return;
3493 }
3494
3495 // Slow path: execute in O(N_clients)
3496 std::map<client_t, xlist<Capability*> >::const_iterator client_rc_iter;
3497 for (client_rc_iter = revoking_caps_by_client.begin();
3498 client_rc_iter != revoking_caps_by_client.end(); ++client_rc_iter) {
3499 xlist<Capability*> const &client_rc = client_rc_iter->second;
3500 bool any_late = any_late_revoking_caps(client_rc);
3501 if (any_late) {
3502 result->push_back(client_rc_iter->first);
3503 }
3504 }
3505 }
3506
// Hard-code instead of surfacing a config setting because this is
3508 // really a hack that should go away at some point when we have better
3509 // inspection tools for getting at detailed cap state (#7316)
3510 #define MAX_WARN_CAPS 100
3511
3512 void Locker::caps_tick()
3513 {
3514 utime_t now = ceph_clock_now();
3515
3516 dout(20) << __func__ << " " << revoking_caps.size() << " revoking caps" << dendl;
3517
3518 int i = 0;
3519 for (xlist<Capability*>::iterator p = revoking_caps.begin(); !p.end(); ++p) {
3520 Capability *cap = *p;
3521
3522 utime_t age = now - cap->get_last_revoke_stamp();
3523 dout(20) << __func__ << " age = " << age << cap->get_client() << "." << cap->get_inode()->ino() << dendl;
3524 if (age <= g_conf->mds_revoke_cap_timeout) {
3525 dout(20) << __func__ << " age below timeout " << g_conf->mds_revoke_cap_timeout << dendl;
3526 break;
3527 } else {
3528 ++i;
3529 if (i > MAX_WARN_CAPS) {
3530 dout(1) << __func__ << " more than " << MAX_WARN_CAPS << " caps are late"
3531 << "revoking, ignoring subsequent caps" << dendl;
3532 break;
3533 }
3534 }
3535 // exponential backoff of warning intervals
3536 if (age > g_conf->mds_revoke_cap_timeout * (1 << cap->get_num_revoke_warnings())) {
3537 cap->inc_num_revoke_warnings();
3538 stringstream ss;
3539 ss << "client." << cap->get_client() << " isn't responding to mclientcaps(revoke), ino "
3540 << cap->get_inode()->ino() << " pending " << ccap_string(cap->pending())
3541 << " issued " << ccap_string(cap->issued()) << ", sent " << age << " seconds ago";
3542 mds->clog->warn() << ss.str();
3543 dout(20) << __func__ << " " << ss.str() << dendl;
3544 } else {
3545 dout(20) << __func__ << " silencing log message (backoff) for " << cap->get_client() << "." << cap->get_inode()->ino() << dendl;
3546 }
3547 }
3548 }
3549
3550
/**
 * Handle a client dentry-lease message: RELEASE / REVOKE_ACK drop the
 * lease (if the seq matches); RENEW reuses @m as the reply when the
 * lease can be extended.
 *
 * NOTE(review): on the RENEW path, when the lock no longer permits a
 * lease the message is neither replied to nor put — looks like a message
 * ref leak; confirm.
 */
void Locker::handle_client_lease(MClientLease *m)
{
  dout(10) << "handle_client_lease " << *m << dendl;

  assert(m->get_source().is_client());
  client_t client = m->get_source().num();

  CInode *in = mdcache->get_inode(m->get_ino(), m->get_last());
  if (!in) {
    dout(7) << "handle_client_lease don't have ino " << m->get_ino() << "." << m->get_last() << dendl;
    m->put();
    return;
  }
  CDentry *dn = 0;

  frag_t fg = in->pick_dirfrag(m->dname);
  CDir *dir = in->get_dirfrag(fg);
  if (dir)
    dn = dir->lookup(m->dname);
  if (!dn) {
    dout(7) << "handle_client_lease don't have dn " << m->get_ino() << " " << m->dname << dendl;
    m->put();
    return;
  }
  dout(10) << " on " << *dn << dendl;

  // replica and lock
  ClientLease *l = dn->get_client_lease(client);
  if (!l) {
    dout(7) << "handle_client_lease didn't have lease for client." << client << " of " << *dn << dendl;
    m->put();
    return;
  }

  switch (m->get_action()) {
  case CEPH_MDS_LEASE_REVOKE_ACK:
  case CEPH_MDS_LEASE_RELEASE:
    // seq mismatch means the ack/release is for an older lease instance
    if (l->seq != m->get_seq()) {
      dout(7) << "handle_client_lease release - seq " << l->seq << " != provided " << m->get_seq() << dendl;
    } else {
      dout(7) << "handle_client_lease client." << client
	      << " on " << *dn << dendl;
      dn->remove_client_lease(l, this);
    }
    m->put();
    break;

  case CEPH_MDS_LEASE_RENEW:
    {
      dout(7) << "handle_client_lease client." << client << " renew on " << *dn
	      << (!dn->lock.can_lease(client)?", revoking lease":"") << dendl;
      if (dn->lock.can_lease(client)) {
	int pool = 1;   // fixme.. do something smart!
	// rewrite the incoming message in place and send it back as the reply
	m->h.duration_ms = (int)(1000 * mdcache->client_lease_durations[pool]);
	m->h.seq = ++l->seq;
	m->clear_payload();

	utime_t now = ceph_clock_now();
	now += mdcache->client_lease_durations[pool];
	mdcache->touch_client_lease(l, pool, now);

	mds->send_message_client_counted(m, m->get_connection());
      }
    }
    break;

  default:
    ceph_abort(); // implement me
    break;
  }
}
3622
3623
/**
 * Encode a LeaseStat for @dn into @bl, issuing (or renewing) a dentry
 * lease to @client when permitted, otherwise encoding a null lease.
 *
 * NOTE(review): the middle condition reads as "client does NOT already
 * have dentry coverage via the dir's filelock lease or FILE_SHARED/EXCL
 * caps"; the doubled parentheses make the grouping easy to misread —
 * confirm intent against upstream before touching it.
 */
void Locker::issue_client_lease(CDentry *dn, client_t client,
			       bufferlist &bl, utime_t now, Session *session)
{
  CInode *diri = dn->get_dir()->get_inode();
  if (!diri->is_stray() && // do not issue dn leases in stray dir!
      ((!diri->filelock.can_lease(client) &&
	(diri->get_client_cap_pending(client) & (CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_EXCL)) == 0)) &&
      dn->lock.can_lease(client)) {
    int pool = 1; // fixme.. do something smart!
    // issue a dentry lease
    ClientLease *l = dn->add_client_lease(client, session);
    session->touch_lease(l);

    now += mdcache->client_lease_durations[pool];
    mdcache->touch_client_lease(l, pool, now);

    LeaseStat e;
    e.mask = 1 | CEPH_LOCK_DN; // old and new bit values
    e.seq = ++l->seq;
    e.duration_ms = (int)(1000 * mdcache->client_lease_durations[pool]);
    ::encode(e, bl);
    dout(20) << "issue_client_lease seq " << e.seq << " dur " << e.duration_ms << "ms "
	     << " on " << *dn << dendl;
  } else {
    // null lease
    LeaseStat e;
    e.mask = 0;
    e.seq = 0;
    e.duration_ms = 0;
    ::encode(e, bl);
    dout(20) << "issue_client_lease no/null lease on " << *dn << dendl;
  }
}
3657
3658
3659 void Locker::revoke_client_leases(SimpleLock *lock)
3660 {
3661 int n = 0;
3662 CDentry *dn = static_cast<CDentry*>(lock->get_parent());
3663 for (map<client_t, ClientLease*>::iterator p = dn->client_lease_map.begin();
3664 p != dn->client_lease_map.end();
3665 ++p) {
3666 ClientLease *l = p->second;
3667
3668 n++;
3669 assert(lock->get_type() == CEPH_LOCK_DN);
3670
3671 CDentry *dn = static_cast<CDentry*>(lock->get_parent());
3672 int mask = 1 | CEPH_LOCK_DN; // old and new bits
3673
3674 // i should also revoke the dir ICONTENT lease, if they have it!
3675 CInode *diri = dn->get_dir()->get_inode();
3676 mds->send_message_client_counted(new MClientLease(CEPH_MDS_LEASE_REVOKE, l->seq,
3677 mask,
3678 diri->ino(),
3679 diri->first, CEPH_NOSNAP,
3680 dn->get_name()),
3681 l->client);
3682 }
3683 assert(n == lock->get_num_client_lease());
3684 }
3685
3686
3687
3688 // locks ----------------------------------------------------------------
3689
/**
 * Resolve a (lock_type, object info) pair from a peer MDS into the
 * corresponding SimpleLock in our cache, or 0 if the object isn't cached.
 */
SimpleLock *Locker::get_lock(int lock_type, MDSCacheObjectInfo &info)
{
  switch (lock_type) {
  case CEPH_LOCK_DN:
    {
      // be careful; info.dirfrag may have incorrect frag; recalculate based on dname.
      CInode *diri = mdcache->get_inode(info.dirfrag.ino);
      frag_t fg;
      CDir *dir = 0;
      CDentry *dn = 0;
      if (diri) {
	fg = diri->pick_dirfrag(info.dname);
	dir = diri->get_dirfrag(fg);
	if (dir)
	  dn = dir->lookup(info.dname, info.snapid);
      }
      if (!dn) {
	dout(7) << "get_lock don't have dn " << info.dirfrag.ino << " " << info.dname << dendl;
	return 0;
      }
      return &dn->lock;
    }

  case CEPH_LOCK_IAUTH:
  case CEPH_LOCK_ILINK:
  case CEPH_LOCK_IDFT:
  case CEPH_LOCK_IFILE:
  case CEPH_LOCK_INEST:
  case CEPH_LOCK_IXATTR:
  case CEPH_LOCK_ISNAP:
  case CEPH_LOCK_IFLOCK:
  case CEPH_LOCK_IPOLICY:
    {
      CInode *in = mdcache->get_inode(info.ino, info.snapid);
      if (!in) {
	dout(7) << "get_lock don't have ino " << info.ino << dendl;
	return 0;
      }
      // the inner switch covers every case listed above, so control
      // should not fall out of this block into the default below
      switch (lock_type) {
      case CEPH_LOCK_IAUTH: return &in->authlock;
      case CEPH_LOCK_ILINK: return &in->linklock;
      case CEPH_LOCK_IDFT: return &in->dirfragtreelock;
      case CEPH_LOCK_IFILE: return &in->filelock;
      case CEPH_LOCK_INEST: return &in->nestlock;
      case CEPH_LOCK_IXATTR: return &in->xattrlock;
      case CEPH_LOCK_ISNAP: return &in->snaplock;
      case CEPH_LOCK_IFLOCK: return &in->flocklock;
      case CEPH_LOCK_IPOLICY: return &in->policylock;
      }
    }

  default:
    dout(7) << "get_lock don't know lock_type " << lock_type << dendl;
    ceph_abort();
    break;
  }

  return 0;
}
3749
/* This function DOES put the passed message before returning */
// Dispatch an inter-MDS lock message to the matching per-lock handler.
void Locker::handle_lock(MLock *m)
{
  // nobody should be talking to us during recovery.
  assert(mds->is_rejoin() || mds->is_clientreplay() || mds->is_active() || mds->is_stopping());

  SimpleLock *lock = get_lock(m->get_lock_type(), m->get_object_info());
  if (!lock) {
    dout(10) << "don't have object " << m->get_object_info() << ", must have trimmed, dropping" << dendl;
    m->put();
    return;
  }

  switch (lock->get_type()) {
  case CEPH_LOCK_DN:
  case CEPH_LOCK_IAUTH:
  case CEPH_LOCK_ILINK:
  case CEPH_LOCK_ISNAP:
  case CEPH_LOCK_IXATTR:
  case CEPH_LOCK_IFLOCK:
  case CEPH_LOCK_IPOLICY:
    handle_simple_lock(lock, m);
    break;

  // IDFT/INEST deliberately fall through to the scatterlock path shared
  // with IFILE (dedicated handler left commented out)
  case CEPH_LOCK_IDFT:
  case CEPH_LOCK_INEST:
    //handle_scatter_lock((ScatterLock*)lock, m);
    //break;

  case CEPH_LOCK_IFILE:
    handle_file_lock(static_cast<ScatterLock*>(lock), m);
    break;

  default:
    dout(7) << "handle_lock got otype " << m->get_lock_type() << dendl;
    ceph_abort();
    break;
  }
}
3789
3790
3791
3792
3793
3794 // ==========================================================================
3795 // simple lock
3796
/** This function may take a reference to m if it needs one, but does
 * not put references.
 *
 * A replica asked us (the auth) to move this lock toward SYNC so it can
 * rdlock.  If we're frozen or not auth the request is simply dropped —
 * the replica is expected to retry.
 */
void Locker::handle_reqrdlock(SimpleLock *lock, MLock *m)
{
  MDSCacheObject *parent = lock->get_parent();
  if (parent->is_auth() &&
      lock->get_state() != LOCK_SYNC &&
      !parent->is_frozen()) {
    dout(7) << "handle_reqrdlock got rdlock request on " << *lock
	    << " on " << *parent << dendl;
    assert(parent->is_auth()); // replica auth pinned if they're doing this!
    if (lock->is_stable()) {
      simple_sync(lock);
    } else {
      // mid-transition: re-deliver this message once the lock settles
      dout(7) << "handle_reqrdlock delaying request until lock is stable" << dendl;
      lock->add_waiter(SimpleLock::WAIT_STABLE | MDSCacheObject::WAIT_UNFREEZE,
                       new C_MDS_RetryMessage(mds, m->get()));
    }
  } else {
    dout(7) << "handle_reqrdlock dropping rdlock request on " << *lock
	    << " on " << *parent << dendl;
    // replica should retry
  }
}
3821
/* This function DOES put the passed message before returning */
// Run the simple-lock wire protocol: SYNC/LOCK directives on replicas,
// LOCKACK gathering and rdlock requests on the auth.
void Locker::handle_simple_lock(SimpleLock *lock, MLock *m)
{
  int from = m->get_asker();

  dout(10) << "handle_simple_lock " << *m
	   << " on " << *lock << " " << *lock->get_parent() << dendl;

  if (mds->is_rejoin()) {
    if (lock->get_parent()->is_rejoining()) {
      dout(7) << "handle_simple_lock still rejoining " << *lock->get_parent()
	      << ", dropping " << *m << dendl;
      m->put();
      return;
    }
  }

  switch (m->get_action()) {
    // -- replica --
  case LOCK_AC_SYNC:
    // auth granted SYNC: absorb the shipped lock state and wake readers
    assert(lock->get_state() == LOCK_LOCK);
    lock->decode_locked_state(m->get_data());
    lock->set_state(LOCK_SYNC);
    lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
    break;

  case LOCK_AC_LOCK:
    // auth wants LOCK: start SYNC->LOCK, revoking any client leases first
    assert(lock->get_state() == LOCK_SYNC);
    lock->set_state(LOCK_SYNC_LOCK);
    if (lock->is_leased())
      revoke_client_leases(lock);
    eval_gather(lock, true);
    if (lock->is_unstable_and_locked())
      mds->mdlog->flush();
    break;


    // -- auth --
  case LOCK_AC_LOCKACK:
    // one replica has acked the LOCK; finish the transition when the
    // gather set drains
    assert(lock->get_state() == LOCK_SYNC_LOCK ||
	   lock->get_state() == LOCK_SYNC_EXCL);
    assert(lock->is_gathering(from));
    lock->remove_gather(from);

    if (lock->is_gathering()) {
      dout(7) << "handle_simple_lock " << *lock << " on " << *lock->get_parent() << " from " << from
	      << ", still gathering " << lock->get_gather_set() << dendl;
    } else {
      dout(7) << "handle_simple_lock " << *lock << " on " << *lock->get_parent() << " from " << from
	      << ", last one" << dendl;
      eval_gather(lock);
    }
    break;

  case LOCK_AC_REQRDLOCK:
    handle_reqrdlock(lock, m);
    break;

  }

  m->put();
}
3884
3885 /* unused, currently.
3886
3887 class C_Locker_SimpleEval : public Context {
3888 Locker *locker;
3889 SimpleLock *lock;
3890 public:
3891 C_Locker_SimpleEval(Locker *l, SimpleLock *lk) : locker(l), lock(lk) {}
3892 void finish(int r) {
3893 locker->try_simple_eval(lock);
3894 }
3895 };
3896
3897 void Locker::try_simple_eval(SimpleLock *lock)
3898 {
3899 // unstable and ambiguous auth?
3900 if (!lock->is_stable() &&
3901 lock->get_parent()->is_ambiguous_auth()) {
3902 dout(7) << "simple_eval not stable and ambiguous auth, waiting on " << *lock->get_parent() << dendl;
3903 //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
3904 lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_SimpleEval(this, lock));
3905 return;
3906 }
3907
3908 if (!lock->get_parent()->is_auth()) {
3909 dout(7) << "try_simple_eval not auth for " << *lock->get_parent() << dendl;
3910 return;
3911 }
3912
3913 if (!lock->get_parent()->can_auth_pin()) {
3914 dout(7) << "try_simple_eval can't auth_pin, waiting on " << *lock->get_parent() << dendl;
3915 //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
3916 lock->get_parent()->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_SimpleEval(this, lock));
3917 return;
3918 }
3919
3920 if (lock->is_stable())
3921 simple_eval(lock);
3922 }
3923 */
3924
3925
// Evaluate a stable simple lock on an auth object and, based on the
// client caps wanted, transition it toward EXCL or SYNC.  If need_issue
// is non-null, cap (re)issue is deferred to the caller.
void Locker::simple_eval(SimpleLock *lock, bool *need_issue)
{
  dout(10) << "simple_eval " << *lock << " on " << *lock->get_parent() << dendl;

  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  if (lock->get_parent()->is_freezing_or_frozen()) {
    // dentry lock in unreadable state can block path traverse
    if ((lock->get_type() != CEPH_LOCK_DN ||
	 lock->get_state() == LOCK_SYNC ||
	 lock->get_parent()->is_frozen()))
      return;
  }

  if (mdcache->is_readonly()) {
    // read-only FS: force everything toward SYNC
    if (lock->get_state() != LOCK_SYNC) {
      dout(10) << "simple_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
      simple_sync(lock, need_issue);
    }
    return;
  }

  // only inode locks carry client caps; dentry locks have no cap shift
  CInode *in = 0;
  int wanted = 0;
  if (lock->get_type() != CEPH_LOCK_DN) {
    in = static_cast<CInode*>(lock->get_parent());
    in->get_caps_wanted(&wanted, NULL, lock->get_cap_shift());
  }

  // -> excl?
  // a loner client wanting EXCL caps pulls the lock to EXCL
  if (lock->get_state() != LOCK_EXCL &&
      in && in->get_target_loner() >= 0 &&
      (wanted & CEPH_CAP_GEXCL)) {
    dout(7) << "simple_eval stable, going to excl " << *lock
	    << " on " << *lock->get_parent() << dendl;
    simple_excl(lock, need_issue);
  }

  // stable -> sync?
  // fall back to SYNC when nobody wants EXCL (or the loner went away),
  // but never while write locks are held
  else if (lock->get_state() != LOCK_SYNC &&
	   !lock->is_wrlocked() &&
	   ((!(wanted & CEPH_CAP_GEXCL) && !lock->is_waiter_for(SimpleLock::WAIT_WR)) ||
	    (lock->get_state() == LOCK_EXCL && in && in->get_target_loner() < 0))) {
    dout(7) << "simple_eval stable, syncing " << *lock
	    << " on " << *lock->get_parent() << dendl;
    simple_sync(lock, need_issue);
  }
}
3975
3976
3977 // mid
3978
// Move an auth, stable lock to SYNC, entering a transitional gather
// state first when readers/writers/replicas/caps must be drained.
// Returns true if SYNC was reached immediately; false if a gather or
// scatter write-behind is in flight (the lock is auth_pinned and the
// transition completes asynchronously via eval_gather).
bool Locker::simple_sync(SimpleLock *lock, bool *need_issue)
{
  dout(7) << "simple_sync on " << *lock << " on " << *lock->get_parent() << dendl;
  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  int old_state = lock->get_state();

  // TSYN can go straight to SYNC; everything else may need to gather
  if (old_state != LOCK_TSYN) {

    switch (lock->get_state()) {
    case LOCK_MIX: lock->set_state(LOCK_MIX_SYNC); break;
    case LOCK_LOCK: lock->set_state(LOCK_LOCK_SYNC); break;
    case LOCK_XSYN: lock->set_state(LOCK_XSYN_SYNC); break;
    case LOCK_EXCL: lock->set_state(LOCK_EXCL_SYNC); break;
    default: ceph_abort();
    }

    int gather = 0;
    if (lock->is_wrlocked())
      gather++;

    // only MIX leaves replicas in a state they must ack out of
    if (lock->get_parent()->is_replicated() && old_state == LOCK_MIX) {
      send_lock_message(lock, LOCK_AC_SYNC);
      lock->init_gather();
      gather++;
    }

    if (in && in->is_head()) {
      if (in->issued_caps_need_gather(lock)) {
	// clients hold caps incompatible with SYNC; revoke and wait
	if (need_issue)
	  *need_issue = true;
	else
	  issue_caps(in);
	gather++;
      }
    }

    bool need_recover = false;
    if (lock->get_type() == CEPH_LOCK_IFILE) {
      assert(in);
      if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
        mds->mdcache->queue_file_recover(in);
	need_recover = true;
        gather++;
      }
    }

    if (!gather && lock->is_dirty()) {
      // nothing to gather but dirty scatter data: journal it first
      lock->get_parent()->auth_pin(lock);
      scatter_writebehind(static_cast<ScatterLock*>(lock));
      mds->mdlog->flush();
      return false;
    }

    if (gather) {
      lock->get_parent()->auth_pin(lock);
      if (need_recover)
	mds->mdcache->do_file_recover();
      return false;
    }
  }

  if (lock->get_parent()->is_replicated()) { // FIXME
    bufferlist data;
    lock->encode_locked_state(data);
    send_lock_message(lock, LOCK_AC_SYNC, data);
  }
  lock->set_state(LOCK_SYNC);
  lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
  if (in && in->is_head()) {
    if (need_issue)
      *need_issue = true;
    else
      issue_caps(in);
  }
  return true;
}
4061
// Move an auth, stable lock to EXCL, gathering readers/writers, replica
// acks, and incompatible client caps first if necessary.  If need_issue
// is non-null, cap (re)issue is deferred to the caller.
void Locker::simple_excl(SimpleLock *lock, bool *need_issue)
{
  dout(7) << "simple_excl on " << *lock << " on " << *lock->get_parent() << dendl;
  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  switch (lock->get_state()) {
  case LOCK_LOCK: lock->set_state(LOCK_LOCK_EXCL); break;
  case LOCK_SYNC: lock->set_state(LOCK_SYNC_EXCL); break;
  case LOCK_XSYN: lock->set_state(LOCK_XSYN_EXCL); break;
  default: ceph_abort();
  }

  int gather = 0;
  if (lock->is_rdlocked())
    gather++;
  if (lock->is_wrlocked())
    gather++;

  // from LOCK or XSYN the replicas are already in LOCK; no message needed
  if (lock->get_parent()->is_replicated() &&
      lock->get_state() != LOCK_LOCK_EXCL &&
      lock->get_state() != LOCK_XSYN_EXCL) {
    send_lock_message(lock, LOCK_AC_LOCK);
    lock->init_gather();
    gather++;
  }

  if (in && in->is_head()) {
    if (in->issued_caps_need_gather(lock)) {
      // revoke caps incompatible with EXCL before completing
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
      gather++;
    }
  }

  if (gather) {
    // hold an auth_pin until the async gather completes
    lock->get_parent()->auth_pin(lock);
  } else {
    lock->set_state(LOCK_EXCL);
    lock->finish_waiters(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE);
    if (in) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
    }
  }
}
4116
// Move an auth, stable lock to LOCK, gathering leases, readers, client
// caps, replica acks, and any pending file recovery first.  MIX->LOCK is
// a two-stage gather: local state is drained before replicas are told to
// lock (see below).  If need_issue is non-null, cap (re)issue is
// deferred to the caller.
void Locker::simple_lock(SimpleLock *lock, bool *need_issue)
{
  dout(7) << "simple_lock on " << *lock << " on " << *lock->get_parent() << dendl;
  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());
  assert(lock->get_state() != LOCK_LOCK);

  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  int old_state = lock->get_state();

  switch (lock->get_state()) {
  case LOCK_SYNC: lock->set_state(LOCK_SYNC_LOCK); break;
  case LOCK_XSYN:
    // XSYN must pass through EXCL first; bail if that didn't complete
    file_excl(static_cast<ScatterLock*>(lock), need_issue);
    if (lock->get_state() != LOCK_EXCL)
      return;
    // fall-thru
  case LOCK_EXCL: lock->set_state(LOCK_EXCL_LOCK); break;
  case LOCK_MIX: lock->set_state(LOCK_MIX_LOCK);
    (static_cast<ScatterLock *>(lock))->clear_unscatter_wanted();
    break;
  case LOCK_TSYN: lock->set_state(LOCK_TSYN_LOCK); break;
  default: ceph_abort();
  }

  int gather = 0;
  if (lock->is_leased()) {
    gather++;
    revoke_client_leases(lock);
  }
  if (lock->is_rdlocked())
    gather++;
  if (in && in->is_head()) {
    if (in->issued_caps_need_gather(lock)) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
      gather++;
    }
  }

  bool need_recover = false;
  if (lock->get_type() == CEPH_LOCK_IFILE) {
    assert(in);
    if(in->state_test(CInode::STATE_NEEDSRECOVER)) {
      mds->mdcache->queue_file_recover(in);
      need_recover = true;
      gather++;
    }
  }

  if (lock->get_parent()->is_replicated() &&
      lock->get_state() == LOCK_MIX_LOCK &&
      gather) {
    dout(10) << " doing local stage of mix->lock gather before gathering from replicas" << dendl;
  } else {
    // move to second stage of gather now, so we don't send the lock action later.
    if (lock->get_state() == LOCK_MIX_LOCK)
      lock->set_state(LOCK_MIX_LOCK2);

    if (lock->get_parent()->is_replicated() &&
	lock->get_sm()->states[old_state].replica_state != LOCK_LOCK) {  // replica may already be LOCK
      gather++;
      send_lock_message(lock, LOCK_AC_LOCK);
      lock->init_gather();
    }
  }

  if (!gather && lock->is_dirty()) {
    // nothing to gather but dirty scatter data: journal it first
    lock->get_parent()->auth_pin(lock);
    scatter_writebehind(static_cast<ScatterLock*>(lock));
    mds->mdlog->flush();
    return;
  }

  if (gather) {
    lock->get_parent()->auth_pin(lock);
    if (need_recover)
      mds->mdcache->do_file_recover();
  } else {
    lock->set_state(LOCK_LOCK);
    lock->finish_waiters(ScatterLock::WAIT_XLOCK|ScatterLock::WAIT_WR|ScatterLock::WAIT_STABLE);
  }
}
4205
4206
// Move an auth lock from LOCK/XLOCKDONE toward PREXLOCK so an xlock can
// be taken, gathering readers, writers, and incompatible client caps if
// necessary.  Must not already be in XLOCK.
void Locker::simple_xlock(SimpleLock *lock)
{
  dout(7) << "simple_xlock on " << *lock << " on " << *lock->get_parent() << dendl;
  assert(lock->get_parent()->is_auth());
  //assert(lock->is_stable());
  assert(lock->get_state() != LOCK_XLOCK);

  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  // pin only when entering from a stable state (an unstable lock is
  // already pinned by the transition in progress)
  if (lock->is_stable())
    lock->get_parent()->auth_pin(lock);

  switch (lock->get_state()) {
  case LOCK_LOCK:
  case LOCK_XLOCKDONE: lock->set_state(LOCK_LOCK_XLOCK); break;
  default: ceph_abort();
  }

  int gather = 0;
  if (lock->is_rdlocked())
    gather++;
  if (lock->is_wrlocked())
    gather++;

  if (in && in->is_head()) {
    if (in->issued_caps_need_gather(lock)) {
      issue_caps(in);
      gather++;
    }
  }

  if (!gather) {
    lock->set_state(LOCK_PREXLOCK);
    //assert("shouldn't be called if we are already xlockable" == 0);
  }
}
4245
4246
4247
4248
4249
4250 // ==========================================================================
4251 // scatter lock
4252
4253 /*
4254
4255 Some notes on scatterlocks.
4256
4257 - The scatter/gather is driven by the inode lock. The scatter always
4258 brings in the latest metadata from the fragments.
4259
4260 - When in a scattered/MIX state, fragments are only allowed to
4261 update/be written to if the accounted stat matches the inode's
4262 current version.
4263
4264 - That means, on gather, we _only_ assimilate diffs for frag metadata
4265 that match the current version, because those are the only ones
4266 written during this scatter/gather cycle. (Others didn't permit
4267 it.) We increment the version and journal this to disk.
4268
4269 - When possible, we also simultaneously update our local frag
4270 accounted stats to match.
4271
4272 - On scatter, the new inode info is broadcast to frags, both local
4273 and remote. If possible (auth and !frozen), the dirfrag auth
4274 should update the accounted state (if it isn't already up to date).
4275 Note that this may occur on both the local inode auth node and
4276 inode replicas, so there are two potential paths. If it is NOT
4277 possible, they need to mark_stale to prevent any possible writes.
4278
4279 - A scatter can be to MIX (potentially writeable) or to SYNC (read
4280 only). Both are opportunities to update the frag accounted stats,
4281 even though only the MIX case is affected by a stale dirfrag.
4282
4283 - Because many scatter/gather cycles can potentially go by without a
4284 frag being able to update its accounted stats (due to being frozen
4285 by exports/refragments in progress), the frag may have (even very)
4286 old stat versions. That's fine. If when we do want to update it,
4287 we can update accounted_* and the version first.
4288
4289 */
4290
// Journal-commit callback for scatter_writebehind(): once the EUpdate is
// safely logged, finish the write-behind (apply the mutation, finish the
// lock flush).
class C_Locker_ScatterWB : public LockerLogContext {
  ScatterLock *lock;
  MutationRef mut;
public:
  C_Locker_ScatterWB(Locker *l, ScatterLock *sl, MutationRef& m) :
    LockerLogContext(l), lock(sl), mut(m) {}
  void finish(int r) override {
    locker->scatter_writebehind_finish(lock, mut);
  }
};
4301
// Journal the gathered scatter state back into the inode: project the
// inode, fold in the accumulated fragment stats, and submit an EUpdate.
// Completion is handled asynchronously by scatter_writebehind_finish()
// via C_Locker_ScatterWB.  Caller must have auth_pinned the lock.
void Locker::scatter_writebehind(ScatterLock *lock)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  dout(10) << "scatter_writebehind " << in->inode.mtime << " on " << *lock << " on " << *in << dendl;

  // journal
  MutationRef mut(new MutationImpl());
  mut->ls = mds->mdlog->get_current_segment();

  // forcefully take a wrlock
  lock->get_wrlock(true);
  mut->wrlocks.insert(lock);
  mut->locks.insert(lock);

  in->pre_cow_old_inode();  // avoid cow mayhem

  inode_t *pi = in->project_inode();
  pi->version = in->pre_dirty();

  in->finish_scatter_gather_update(lock->get_type());
  lock->start_flush();

  EUpdate *le = new EUpdate(mds->mdlog, "scatter_writebehind");
  mds->mdlog->start_entry(le);

  mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY);
  mdcache->journal_dirty_inode(mut.get(), &le->metablob, in);
  
  in->finish_scatter_gather_update_accounted(lock->get_type(), mut, &le->metablob);

  mds->mdlog->submit_entry(le, new C_Locker_ScatterWB(this, lock, mut));
}
4334
// Journal-commit side of scatter_writebehind(): pop the projected inode,
// finish the lock flush, notify replicas if needed, then apply and clean
// up the mutation (dropping the forced wrlock).
void Locker::scatter_writebehind_finish(ScatterLock *lock, MutationRef& mut)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  dout(10) << "scatter_writebehind_finish on " << *lock << " on " << *in << dendl;
  in->pop_and_dirty_projected_inode(mut->ls);

  lock->finish_flush();

  // if replicas may have flushed in a mix->lock state, send another
  // message so they can finish_flush().
  if (in->is_replicated()) {
    switch (lock->get_state()) {
    case LOCK_MIX_LOCK:
    case LOCK_MIX_LOCK2:
    case LOCK_MIX_EXCL:
    case LOCK_MIX_TSYN:
      send_lock_message(lock, LOCK_AC_LOCKFLUSHED);
    }
  }

  mut->apply();
  drop_locks(mut.get());
  mut->cleanup();

  if (lock->is_stable())
    lock->finish_waiters(ScatterLock::WAIT_STABLE);

  //scatter_eval_gather(lock);
}
4364
// Evaluate a stable scatterlock on an auth object: honor scatter_wanted
// by moving to MIX, keep INEST writable (MIX when replicated, else
// LOCK), and otherwise drift back to SYNC when this inode is not a
// subtree delegation point.  If need_issue is non-null, cap (re)issue is
// deferred to the caller.
void Locker::scatter_eval(ScatterLock *lock, bool *need_issue)
{
  dout(10) << "scatter_eval " << *lock << " on " << *lock->get_parent() << dendl;

  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  if (lock->get_parent()->is_freezing_or_frozen()) {
    dout(20) << " freezing|frozen" << dendl;
    return;
  }

  if (mdcache->is_readonly()) {
    // read-only FS: force everything toward SYNC
    if (lock->get_state() != LOCK_SYNC) {
      dout(10) << "scatter_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
      simple_sync(lock, need_issue);
    }
    return;
  }
  
  if (!lock->is_rdlocked() &&
      lock->get_state() != LOCK_MIX &&
      lock->get_scatter_wanted()) {
    dout(10) << "scatter_eval scatter_wanted, bump to mix " << *lock
	     << " on " << *lock->get_parent() << dendl;
    scatter_mix(lock, need_issue);
    return;
  }

  if (lock->get_type() == CEPH_LOCK_INEST) {
    // in general, we want to keep INEST writable at all times.
    if (!lock->is_rdlocked()) {
      if (lock->get_parent()->is_replicated()) {
	if (lock->get_state() != LOCK_MIX)
	  scatter_mix(lock, need_issue);
      } else {
	if (lock->get_state() != LOCK_LOCK)
	  simple_lock(lock, need_issue);
      }
    }
    return;
  }

  CInode *in = static_cast<CInode*>(lock->get_parent());
  if (!in->has_subtree_or_exporting_dirfrag() || in->is_base()) {
    // i _should_ be sync.
    if (!lock->is_wrlocked() &&
	lock->get_state() != LOCK_SYNC) {
      dout(10) << "scatter_eval no wrlocks|xlocks, not subtree root inode, syncing" << dendl;
      simple_sync(lock, need_issue);
    }
  }
}
4418
4419
4420 /*
4421 * mark a scatterlock to indicate that the dir fnode has some dirty data
4422 */
4423 void Locker::mark_updated_scatterlock(ScatterLock *lock)
4424 {
4425 lock->mark_dirty();
4426 if (lock->get_updated_item()->is_on_list()) {
4427 dout(10) << "mark_updated_scatterlock " << *lock
4428 << " - already on list since " << lock->get_update_stamp() << dendl;
4429 } else {
4430 updated_scatterlocks.push_back(lock->get_updated_item());
4431 utime_t now = ceph_clock_now();
4432 lock->set_update_stamp(now);
4433 dout(10) << "mark_updated_scatterlock " << *lock
4434 << " - added at " << now << dendl;
4435 }
4436 }
4437
4438 /*
4439 * this is called by scatter_tick and LogSegment::try_to_trim() when
4440 * trying to flush dirty scattered data (i.e. updated fnode) back to
4441 * the inode.
4442 *
4443 * we need to lock|scatter in order to push fnode changes into the
4444 * inode.dirstat.
4445 */
// Nudge a scatterlock so its dirty fragment data gets folded back into
// the inode: on the auth, cycle the lock state (scatter/unscatter) until
// the data can flush; on a replica, ask the auth to do it.  If c is
// non-null it is queued as a waiter instead of requeueing the lock.
void Locker::scatter_nudge(ScatterLock *lock, MDSInternalContextBase *c, bool forcelockchange)
{
  CInode *p = static_cast<CInode *>(lock->get_parent());

  if (p->is_frozen() || p->is_freezing()) {
    dout(10) << "scatter_nudge waiting for unfreeze on " << *p << dendl;
    if (c) 
      p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, c);
    else
      // just requeue.  not ideal.. starvation prone..
      updated_scatterlocks.push_back(lock->get_updated_item());
    return;
  }

  if (p->is_ambiguous_auth()) {
    dout(10) << "scatter_nudge waiting for single auth on " << *p << dendl;
    if (c) 
      p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, c);
    else
      // just requeue.  not ideal.. starvation prone..
      updated_scatterlocks.push_back(lock->get_updated_item());
    return;
  }

  if (p->is_auth()) {
    int count = 0;
    while (true) {
      if (lock->is_stable()) {
	// can we do it now?
	//  (only if we're not replicated.. if we are, we really do need
	//   to nudge the lock state!)
	/*
	  actually, even if we're not replicated, we can't stay in MIX, because another mds
	  could discover and replicate us at any time.  if that happens while we're flushing,
	  they end up in MIX but their inode has the old scatterstat version.

	if (!forcelockchange && !lock->get_parent()->is_replicated() && lock->can_wrlock(-1)) {
	  dout(10) << "scatter_nudge auth, propagating " << *lock << " on " << *p << dendl;
	  scatter_writebehind(lock);
	  if (c)
	    lock->add_waiter(SimpleLock::WAIT_STABLE, c);
	  return;
	}
	*/

	if (mdcache->is_readonly()) {
	  if (lock->get_state() != LOCK_SYNC) {
	    dout(10) << "scatter_nudge auth, read-only FS, syncing " << *lock << " on " << *p << dendl;
	    simple_sync(static_cast<ScatterLock*>(lock));
	  }
	  break;
	}

	// adjust lock state
	dout(10) << "scatter_nudge auth, scatter/unscattering " << *lock << " on " << *p << dendl;
	switch (lock->get_type()) {
	case CEPH_LOCK_IFILE:
	  if (p->is_replicated() && lock->get_state() != LOCK_MIX)
	    scatter_mix(static_cast<ScatterLock*>(lock));
	  else if (lock->get_state() != LOCK_LOCK)
	    simple_lock(static_cast<ScatterLock*>(lock));
	  else
	    simple_sync(static_cast<ScatterLock*>(lock));
	  break;
	  
	case CEPH_LOCK_IDFT:
	case CEPH_LOCK_INEST:
	  if (p->is_replicated() && lock->get_state() != LOCK_MIX)
	    scatter_mix(lock);
	  else if (lock->get_state() != LOCK_LOCK)
	    simple_lock(lock);
	  else
	    simple_sync(lock);
	  break;
	default:
	  ceph_abort();
	}
	++count;
	if (lock->is_stable() && count == 2) {
	  dout(10) << "scatter_nudge oh, stable after two cycles." << dendl;
	  // this should only really happen when called via
	  // handle_file_lock due to AC_NUDGE, because the rest of the
	  // time we are replicated or have dirty data and won't get
	  // called.  bailing here avoids an infinite loop.
	  assert(!c); 
	  break;
	}
      } else {
	dout(10) << "scatter_nudge auth, waiting for stable " << *lock << " on " << *p << dendl;
	if (c) 
	  lock->add_waiter(SimpleLock::WAIT_STABLE, c);
	return;
      }
    }
  } else {
    dout(10) << "scatter_nudge replica, requesting scatter/unscatter of " 
	     << *lock << " on " << *p << dendl;
    // request unscatter?
    mds_rank_t auth = lock->get_parent()->authority().first;
    if (!mds->is_cluster_degraded() ||
	mds->mdsmap->is_clientreplay_or_active_or_stopping(auth))
      mds->send_message_mds(new MLock(lock, LOCK_AC_NUDGE, mds->get_nodeid()), auth);

    // wait...
    if (c)
      lock->add_waiter(SimpleLock::WAIT_STABLE, c);

    // also, requeue, in case we had wrong auth or something
    updated_scatterlocks.push_back(lock->get_updated_item());
  }
}
4557
// Periodic pass over updated_scatterlocks: nudge dirty locks whose
// update stamp is older than mds_scatter_nudge_interval.  The pass is
// bounded by the initial queue size because scatter_nudge() may requeue
// entries.
void Locker::scatter_tick()
{
  dout(10) << "scatter_tick" << dendl;
  
  // updated
  utime_t now = ceph_clock_now();
  int n = updated_scatterlocks.size();
  while (!updated_scatterlocks.empty()) {
    ScatterLock *lock = updated_scatterlocks.front();

    if (n-- == 0) break;  // scatter_nudge() may requeue; avoid looping
    
    if (!lock->is_dirty()) {
      // already flushed by some other path; just drop it from the queue
      updated_scatterlocks.pop_front();
      dout(10) << " removing from updated_scatterlocks " 
	       << *lock << " " << *lock->get_parent() << dendl;
      continue;
    }
    // the queue is stamp-ordered, so the first young entry ends the pass
    if (now - lock->get_update_stamp() < g_conf->mds_scatter_nudge_interval)
      break;
    updated_scatterlocks.pop_front();
    scatter_nudge(lock, 0);
  }
  mds->mdlog->flush();
}
4583
4584
// Move a stable scatterlock to TSYN (temporary sync).
// NOTE: currently disabled -- the unconditional assert below fires on
// entry; the code after it documents the intended transition.
void Locker::scatter_tempsync(ScatterLock *lock, bool *need_issue)
{
  dout(10) << "scatter_tempsync " << *lock
	   << " on " << *lock->get_parent() << dendl;
  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  assert(0 == "not fully implemented, at least not for filelock");

  CInode *in = static_cast<CInode *>(lock->get_parent());

  switch (lock->get_state()) {
  case LOCK_SYNC: ceph_abort();   // this shouldn't happen
  case LOCK_LOCK: lock->set_state(LOCK_LOCK_TSYN); break;
  case LOCK_MIX: lock->set_state(LOCK_MIX_TSYN); break;
  default: ceph_abort();
  }

  int gather = 0;
  if (lock->is_wrlocked())
    gather++;

  if (lock->get_cap_shift() &&
      in->is_head() &&
      in->issued_caps_need_gather(lock)) {
    if (need_issue)
      *need_issue = true;
    else
      issue_caps(in);
    gather++;
  }

  if (lock->get_state() == LOCK_MIX_TSYN &&
      in->is_replicated()) {
    lock->init_gather();
    send_lock_message(lock, LOCK_AC_LOCK);
    gather++;
  }

  if (gather) {
    in->auth_pin(lock);
  } else {
    // do tempsync
    lock->set_state(LOCK_TSYN);
    lock->finish_waiters(ScatterLock::WAIT_RD|ScatterLock::WAIT_STABLE);
    if (lock->get_cap_shift()) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
    }
  }
}
4638
4639
4640
4641 // ==========================================================================
4642 // local lock
4643
4644 void Locker::local_wrlock_grab(LocalLock *lock, MutationRef& mut)
4645 {
4646 dout(7) << "local_wrlock_grab on " << *lock
4647 << " on " << *lock->get_parent() << dendl;
4648
4649 assert(lock->get_parent()->is_auth());
4650 assert(lock->can_wrlock());
4651 assert(!mut->wrlocks.count(lock));
4652 lock->get_wrlock(mut->get_client());
4653 mut->wrlocks.insert(lock);
4654 mut->locks.insert(lock);
4655 }
4656
4657 bool Locker::local_wrlock_start(LocalLock *lock, MDRequestRef& mut)
4658 {
4659 dout(7) << "local_wrlock_start on " << *lock
4660 << " on " << *lock->get_parent() << dendl;
4661
4662 assert(lock->get_parent()->is_auth());
4663 if (lock->can_wrlock()) {
4664 assert(!mut->wrlocks.count(lock));
4665 lock->get_wrlock(mut->get_client());
4666 mut->wrlocks.insert(lock);
4667 mut->locks.insert(lock);
4668 return true;
4669 } else {
4670 lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
4671 return false;
4672 }
4673 }
4674
4675 void Locker::local_wrlock_finish(LocalLock *lock, MutationImpl *mut)
4676 {
4677 dout(7) << "local_wrlock_finish on " << *lock
4678 << " on " << *lock->get_parent() << dendl;
4679 lock->put_wrlock();
4680 mut->wrlocks.erase(lock);
4681 mut->locks.erase(lock);
4682 if (lock->get_num_wrlocks() == 0) {
4683 lock->finish_waiters(SimpleLock::WAIT_STABLE |
4684 SimpleLock::WAIT_WR |
4685 SimpleLock::WAIT_RD);
4686 }
4687 }
4688
4689 bool Locker::local_xlock_start(LocalLock *lock, MDRequestRef& mut)
4690 {
4691 dout(7) << "local_xlock_start on " << *lock
4692 << " on " << *lock->get_parent() << dendl;
4693
4694 assert(lock->get_parent()->is_auth());
4695 if (!lock->can_xlock_local()) {
4696 lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
4697 return false;
4698 }
4699
4700 lock->get_xlock(mut, mut->get_client());
4701 mut->xlocks.insert(lock);
4702 mut->locks.insert(lock);
4703 return true;
4704 }
4705
4706 void Locker::local_xlock_finish(LocalLock *lock, MutationImpl *mut)
4707 {
4708 dout(7) << "local_xlock_finish on " << *lock
4709 << " on " << *lock->get_parent() << dendl;
4710 lock->put_xlock();
4711 mut->xlocks.erase(lock);
4712 mut->locks.erase(lock);
4713
4714 lock->finish_waiters(SimpleLock::WAIT_STABLE |
4715 SimpleLock::WAIT_WR |
4716 SimpleLock::WAIT_RD);
4717 }
4718
4719
4720
4721 // ==========================================================================
4722 // file lock
4723
4724
// Evaluate a stable file lock on an auth inode and steer it toward
// EXCL / MIX / SYNC based on which caps clients want (loner vs. others).
// If need_issue is non-null, cap (re)issue is deferred to the caller.
void Locker::file_eval(ScatterLock *lock, bool *need_issue)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  int loner_wanted, other_wanted;
  int wanted = in->get_caps_wanted(&loner_wanted, &other_wanted, CEPH_CAP_SFILE);
  dout(7) << "file_eval wanted=" << gcap_string(wanted)
	  << " loner_wanted=" << gcap_string(loner_wanted)
	  << " other_wanted=" << gcap_string(other_wanted)
	  << "  filelock=" << *lock << " on " << *lock->get_parent()
	  << dendl;

  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  if (lock->get_parent()->is_freezing_or_frozen())
    return;

  if (mdcache->is_readonly()) {
    // read-only FS: force everything toward SYNC
    if (lock->get_state() != LOCK_SYNC) {
      dout(10) << "file_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
      simple_sync(lock, need_issue);
    }
    return;
  }

  // excl -> *?
  if (lock->get_state() == LOCK_EXCL) {
    dout(20) << " is excl" << dendl;
    int loner_issued, other_issued, xlocker_issued;
    in->get_caps_issued(&loner_issued, &other_issued, &xlocker_issued, CEPH_CAP_SFILE);
    dout(7) << "file_eval loner_issued=" << gcap_string(loner_issued)
            << " other_issued=" << gcap_string(other_issued)
	    << " xlocker_issued=" << gcap_string(xlocker_issued)
	    << dendl;
    // leave EXCL when the loner no longer justifies it: loner neither
    // wants nor holds write-ish caps, someone else wants caps, or a dir
    // has caps from multiple clients
    if (!((loner_wanted|loner_issued) & (CEPH_CAP_GEXCL|CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) ||
	(other_wanted & (CEPH_CAP_GEXCL|CEPH_CAP_GWR|CEPH_CAP_GRD)) ||
	(in->inode.is_dir() && in->multiple_nonstale_caps())) {  // FIXME.. :/
      dout(20) << " should lose it" << dendl;
      // we should lose it.
      //  loner  other   want
      //  R      R       SYNC
      //  R      R|W     MIX
      //  R      W       MIX
      //  R|W    R       MIX
      //  R|W    R|W     MIX
      //  R|W    W       MIX
      //  W      R       MIX
      //  W      R|W     MIX
      //  W      W       MIX
      // -> any writer means MIX; RD doesn't matter.
      if (((other_wanted|loner_wanted) & CEPH_CAP_GWR) ||
	  lock->is_waiter_for(SimpleLock::WAIT_WR))
	scatter_mix(lock, need_issue);
      else if (!lock->is_wrlocked())   // let excl wrlocks drain first
	simple_sync(lock, need_issue);
      else
	dout(10) << " waiting for wrlock to drain" << dendl;
    }
  }

  // * -> excl?
  // a loner wanting write/buffer caps (or a non-delegation-point dir)
  // pulls the lock to EXCL
  else if (lock->get_state() != LOCK_EXCL &&
	   !lock->is_rdlocked() &&
	   //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
	   ((wanted & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) ||
	    (in->inode.is_dir() && !in->has_subtree_or_exporting_dirfrag())) &&
	   in->get_target_loner() >= 0) {
    dout(7) << "file_eval stable, bump to loner " << *lock
	    << " on " << *lock->get_parent() << dendl;
    file_excl(lock, need_issue);
  }

  // * -> mixed?
  // multiple writers (no loner) or an explicit scatter request -> MIX
  else if (lock->get_state() != LOCK_MIX &&
	   !lock->is_rdlocked() &&
	   //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
	   (lock->get_scatter_wanted() ||
	    (in->get_wanted_loner() < 0 && (wanted & CEPH_CAP_GWR)))) {
    dout(7) << "file_eval stable, bump to mixed " << *lock
	    << " on " << *lock->get_parent() << dendl;
    scatter_mix(lock, need_issue);
  }

  // * -> sync?
  // nobody wants to write and no wrlocks remain -> SYNC, except keep
  // MIX on a subtree delegation point
  else if (lock->get_state() != LOCK_SYNC &&
	   !lock->is_wrlocked() &&   // drain wrlocks first!
	   !lock->is_waiter_for(SimpleLock::WAIT_WR) &&
	   !(wanted & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) &&
	   !((lock->get_state() == LOCK_MIX) &&
	     in->is_dir() && in->has_subtree_or_exporting_dirfrag())  // if we are a delegation point, stay where we are
	   //((wanted & CEPH_CAP_RD) || 
	   //in->is_replicated() || 
	   //lock->get_num_client_lease() || 
	   //(!loner && lock->get_state() == LOCK_EXCL)) &&
	   ) {
    dout(7) << "file_eval stable, bump to sync " << *lock 
	    << " on " << *lock->get_parent() << dendl;
    simple_sync(lock, need_issue);
  }
}
4825
4826
4827
// Move an auth, stable scatterlock to MIX (scattered, writeable on
// fragments).  LOCK->MIX is immediate; other states enter a transitional
// gather first (readers, replica acks, leases, client caps, recovery).
// If need_issue is non-null, cap (re)issue is deferred to the caller.
void Locker::scatter_mix(ScatterLock *lock, bool *need_issue)
{
  dout(7) << "scatter_mix " << *lock << " on " << *lock->get_parent() << dendl;

  CInode *in = static_cast<CInode*>(lock->get_parent());
  assert(in->is_auth());
  assert(lock->is_stable());

  if (lock->get_state() == LOCK_LOCK) {
    // fast path: nothing to gather, scatter immediately
    in->start_scatter(lock);
    if (in->is_replicated()) {
      // data
      bufferlist softdata;
      lock->encode_locked_state(softdata);

      // bcast to replicas
      send_lock_message(lock, LOCK_AC_MIX, softdata);
    }

    // change lock
    lock->set_state(LOCK_MIX);
    lock->clear_scatter_wanted();
    if (lock->get_cap_shift()) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
    }
  } else {
    // gather?
    switch (lock->get_state()) {
    case LOCK_SYNC: lock->set_state(LOCK_SYNC_MIX); break;
    case LOCK_XSYN:
      // XSYN must pass through EXCL first; bail if that didn't complete
      file_excl(lock, need_issue);
      if (lock->get_state() != LOCK_EXCL)
	return;
      // fall-thru
    case LOCK_EXCL: lock->set_state(LOCK_EXCL_MIX); break;
    case LOCK_TSYN: lock->set_state(LOCK_TSYN_MIX); break;
    default: ceph_abort();
    }

    int gather = 0;
    if (lock->is_rdlocked())
      gather++;
    if (in->is_replicated()) {
      if (lock->get_state() != LOCK_EXCL_MIX &&   // EXCL replica is already LOCK
	  lock->get_state() != LOCK_XSYN_EXCL) {  // XSYN replica is already LOCK;  ** FIXME here too!
	send_lock_message(lock, LOCK_AC_MIX);
	lock->init_gather();
	gather++;
      }
    }
    if (lock->is_leased()) {
      // client leases grant read access; revoke before scattering
      revoke_client_leases(lock);
      gather++;
    }
    if (lock->get_cap_shift() &&
	in->is_head() &&
	in->issued_caps_need_gather(lock)) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
      gather++;
    }
    bool need_recover = false;
    if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
      mds->mdcache->queue_file_recover(in);
      need_recover = true;
      gather++;
    }

    if (gather) {
      // hold an auth_pin until the async gather completes
      lock->get_parent()->auth_pin(lock);
      if (need_recover)
	mds->mdcache->do_file_recover();
    } else {
      in->start_scatter(lock);
      lock->set_state(LOCK_MIX);
      lock->clear_scatter_wanted();
      if (in->is_replicated()) {
	bufferlist softdata;
	lock->encode_locked_state(softdata);
	send_lock_message(lock, LOCK_AC_MIX, softdata);
      }
      if (lock->get_cap_shift()) {
	if (need_issue)
	  *need_issue = true;
	else
	  issue_caps(in);
      }
    }
  }
}
4923
4924
4925 void Locker::file_excl(ScatterLock *lock, bool *need_issue)
4926 {
4927 CInode *in = static_cast<CInode*>(lock->get_parent());
4928 dout(7) << "file_excl " << *lock << " on " << *lock->get_parent() << dendl;
4929
4930 assert(in->is_auth());
4931 assert(lock->is_stable());
4932
4933 assert((in->get_loner() >= 0 && in->mds_caps_wanted.empty()) ||
4934 (lock->get_state() == LOCK_XSYN)); // must do xsyn -> excl -> <anything else>
4935
4936 switch (lock->get_state()) {
4937 case LOCK_SYNC: lock->set_state(LOCK_SYNC_EXCL); break;
4938 case LOCK_MIX: lock->set_state(LOCK_MIX_EXCL); break;
4939 case LOCK_LOCK: lock->set_state(LOCK_LOCK_EXCL); break;
4940 case LOCK_XSYN: lock->set_state(LOCK_XSYN_EXCL); break;
4941 default: ceph_abort();
4942 }
4943 int gather = 0;
4944
4945 if (lock->is_rdlocked())
4946 gather++;
4947 if (lock->is_wrlocked())
4948 gather++;
4949
4950 if (in->is_replicated() &&
4951 lock->get_state() != LOCK_LOCK_EXCL &&
4952 lock->get_state() != LOCK_XSYN_EXCL) { // if we were lock, replicas are already lock.
4953 send_lock_message(lock, LOCK_AC_LOCK);
4954 lock->init_gather();
4955 gather++;
4956 }
4957 if (lock->is_leased()) {
4958 revoke_client_leases(lock);
4959 gather++;
4960 }
4961 if (in->is_head() &&
4962 in->issued_caps_need_gather(lock)) {
4963 if (need_issue)
4964 *need_issue = true;
4965 else
4966 issue_caps(in);
4967 gather++;
4968 }
4969 bool need_recover = false;
4970 if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
4971 mds->mdcache->queue_file_recover(in);
4972 need_recover = true;
4973 gather++;
4974 }
4975
4976 if (gather) {
4977 lock->get_parent()->auth_pin(lock);
4978 if (need_recover)
4979 mds->mdcache->do_file_recover();
4980 } else {
4981 lock->set_state(LOCK_EXCL);
4982 if (need_issue)
4983 *need_issue = true;
4984 else
4985 issue_caps(in);
4986 }
4987 }
4988
4989 void Locker::file_xsyn(SimpleLock *lock, bool *need_issue)
4990 {
4991 dout(7) << "file_xsyn on " << *lock << " on " << *lock->get_parent() << dendl;
4992 CInode *in = static_cast<CInode *>(lock->get_parent());
4993 assert(in->is_auth());
4994 assert(in->get_loner() >= 0 && in->mds_caps_wanted.empty());
4995
4996 switch (lock->get_state()) {
4997 case LOCK_EXCL: lock->set_state(LOCK_EXCL_XSYN); break;
4998 default: ceph_abort();
4999 }
5000
5001 int gather = 0;
5002 if (lock->is_wrlocked())
5003 gather++;
5004
5005 if (in->is_head() &&
5006 in->issued_caps_need_gather(lock)) {
5007 if (need_issue)
5008 *need_issue = true;
5009 else
5010 issue_caps(in);
5011 gather++;
5012 }
5013
5014 if (gather) {
5015 lock->get_parent()->auth_pin(lock);
5016 } else {
5017 lock->set_state(LOCK_XSYN);
5018 lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
5019 if (need_issue)
5020 *need_issue = true;
5021 else
5022 issue_caps(in);
5023 }
5024 }
5025
5026 void Locker::file_recover(ScatterLock *lock)
5027 {
5028 CInode *in = static_cast<CInode *>(lock->get_parent());
5029 dout(7) << "file_recover " << *lock << " on " << *in << dendl;
5030
5031 assert(in->is_auth());
5032 //assert(lock->is_stable());
5033 assert(lock->get_state() == LOCK_PRE_SCAN); // only called from MDCache::start_files_to_recover()
5034
5035 int gather = 0;
5036
5037 /*
5038 if (in->is_replicated()
5039 lock->get_sm()->states[oldstate].replica_state != LOCK_LOCK) {
5040 send_lock_message(lock, LOCK_AC_LOCK);
5041 lock->init_gather();
5042 gather++;
5043 }
5044 */
5045 if (in->is_head() &&
5046 in->issued_caps_need_gather(lock)) {
5047 issue_caps(in);
5048 gather++;
5049 }
5050
5051 lock->set_state(LOCK_SCAN);
5052 if (gather)
5053 in->state_set(CInode::STATE_NEEDSRECOVER);
5054 else
5055 mds->mdcache->queue_file_recover(in);
5056 }
5057
5058
5059 // messenger
5060 /* This function DOES put the passed message before returning */
/*
 * Handle an inter-MDS MLock message for a file ScatterLock.  The first
 * group of actions (SYNC/LOCK/LOCKFLUSHED/MIX) is processed on replica
 * MDSs; the ACK group (LOCKACK/SYNCACK/MIXACK) on the auth; the request
 * group (REQSCATTER/REQUNSCATTER/REQRDLOCK/NUDGE) asks the auth to
 * change state on a replica's behalf.
 *
 * This function DOES put (consume) the passed message before returning.
 */
void Locker::handle_file_lock(ScatterLock *lock, MLock *m)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  int from = m->get_asker();

  // while this rank is rejoining, lock messages for inodes still being
  // rejoined are dropped; state will be resynced by the rejoin itself
  if (mds->is_rejoin()) {
    if (in->is_rejoining()) {
      dout(7) << "handle_file_lock still rejoining " << *in
	      << ", dropping " << *m << dendl;
      m->put();
      return;
    }
  }

  dout(7) << "handle_file_lock a=" << get_lock_action_name(m->get_action())
	  << " on " << *lock
	  << " from mds." << from << " "
	  << *in << dendl;

  // does this lock gate client capabilities?
  bool caps = lock->get_cap_shift();

  switch (m->get_action()) {
    // -- replica --
  case LOCK_AC_SYNC:
    assert(lock->get_state() == LOCK_LOCK ||
	   lock->get_state() == LOCK_MIX ||
	   lock->get_state() == LOCK_MIX_SYNC2);

    if (lock->get_state() == LOCK_MIX) {
      // we may have dirty scattered state to flush back to the auth
      // before we can go SYNC; enter MIX_SYNC and kick the gather
      lock->set_state(LOCK_MIX_SYNC);
      eval_gather(lock, true);
      if (lock->is_unstable_and_locked())
	mds->mdlog->flush();
      break;
    }

    (static_cast<ScatterLock *>(lock))->finish_flush();
    (static_cast<ScatterLock *>(lock))->clear_flushed();

    // ok
    lock->decode_locked_state(m->get_data());
    lock->set_state(LOCK_SYNC);

    // hold a transient rdlock across cap issue and waiter wakeup
    lock->get_rdlock();
    if (caps)
      issue_caps(in);
    lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
    lock->put_rdlock();
    break;

  case LOCK_AC_LOCK:
    // auth wants us in LOCK; enter the matching intermediate state and
    // gather local readers/writers before acking
    switch (lock->get_state()) {
    case LOCK_SYNC: lock->set_state(LOCK_SYNC_LOCK); break;
    case LOCK_MIX: lock->set_state(LOCK_MIX_LOCK); break;
    default: ceph_abort();
    }

    eval_gather(lock, true);
    if (lock->is_unstable_and_locked())
      mds->mdlog->flush();

    break;

  case LOCK_AC_LOCKFLUSHED:
    // auth has persisted our flushed scattered state
    (static_cast<ScatterLock *>(lock))->finish_flush();
    (static_cast<ScatterLock *>(lock))->clear_flushed();
    // wake up scatter_nudge waiters
    if (lock->is_stable())
      lock->finish_waiters(SimpleLock::WAIT_STABLE);
    break;

  case LOCK_AC_MIX:
    assert(lock->get_state() == LOCK_SYNC ||
	   lock->get_state() == LOCK_LOCK ||
	   lock->get_state() == LOCK_SYNC_MIX2);

    if (lock->get_state() == LOCK_SYNC) {
      // MIXED
      // must gather local rdlocks before we can go MIX
      lock->set_state(LOCK_SYNC_MIX);
      eval_gather(lock, true);
      if (lock->is_unstable_and_locked())
	mds->mdlog->flush();
      break;
    }

    // ok
    lock->decode_locked_state(m->get_data());
    lock->set_state(LOCK_MIX);

    if (caps)
      issue_caps(in);

    lock->finish_waiters(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE);
    break;


    // -- auth --
  case LOCK_AC_LOCKACK:
    // a replica finished dropping to LOCK
    assert(lock->get_state() == LOCK_SYNC_LOCK ||
	   lock->get_state() == LOCK_MIX_LOCK ||
	   lock->get_state() == LOCK_MIX_LOCK2 ||
	   lock->get_state() == LOCK_MIX_EXCL ||
	   lock->get_state() == LOCK_SYNC_EXCL ||
	   lock->get_state() == LOCK_SYNC_MIX ||
	   lock->get_state() == LOCK_MIX_TSYN);
    assert(lock->is_gathering(from));
    lock->remove_gather(from);

    if (lock->get_state() == LOCK_MIX_LOCK ||
	lock->get_state() == LOCK_MIX_LOCK2 ||
	lock->get_state() == LOCK_MIX_EXCL ||
	lock->get_state() == LOCK_MIX_TSYN) {
      // coming out of MIX: the ack carries the replica's scattered data
      lock->decode_locked_state(m->get_data());
      // replica is waiting for AC_LOCKFLUSHED, eval_gather() should not
      // delay calling scatter_writebehind().
      lock->clear_flushed();
    }

    if (lock->is_gathering()) {
      dout(7) << "handle_file_lock " << *in << " from " << from
	      << ", still gathering " << lock->get_gather_set() << dendl;
    } else {
      dout(7) << "handle_file_lock " << *in << " from " << from
	      << ", last one" << dendl;
      eval_gather(lock);
    }
    break;

  case LOCK_AC_SYNCACK:
    // a replica finished flushing for MIX -> SYNC
    assert(lock->get_state() == LOCK_MIX_SYNC);
    assert(lock->is_gathering(from));
    lock->remove_gather(from);

    lock->decode_locked_state(m->get_data());

    if (lock->is_gathering()) {
      dout(7) << "handle_file_lock " << *in << " from " << from
	      << ", still gathering " << lock->get_gather_set() << dendl;
    } else {
      dout(7) << "handle_file_lock " << *in << " from " << from
	      << ", last one" << dendl;
      eval_gather(lock);
    }
    break;

  case LOCK_AC_MIXACK:
    // a replica finished its SYNC -> MIX transition
    assert(lock->get_state() == LOCK_SYNC_MIX);
    assert(lock->is_gathering(from));
    lock->remove_gather(from);

    if (lock->is_gathering()) {
      dout(7) << "handle_file_lock " << *in << " from " << from
	      << ", still gathering " << lock->get_gather_set() << dendl;
    } else {
      dout(7) << "handle_file_lock " << *in << " from " << from
	      << ", last one" << dendl;
      eval_gather(lock);
    }
    break;


    // requests....
  case LOCK_AC_REQSCATTER:
    if (lock->is_stable()) {
      /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
       * because the replica should be holding an auth_pin if they're
       * doing this (and thus, we are freezing, not frozen, and indefinite
       * starvation isn't an issue).
       */
      dout(7) << "handle_file_lock got scatter request on " << *lock
	      << " on " << *lock->get_parent() << dendl;
      if (lock->get_state() != LOCK_MIX) // i.e., the reqscatter didn't race with an actual mix/scatter
	scatter_mix(lock);
    } else {
      // not stable; remember the request for when the lock settles
      dout(7) << "handle_file_lock got scatter request, !stable, marking scatter_wanted on " << *lock
	      << " on " << *lock->get_parent() << dendl;
      lock->set_scatter_wanted();
    }
    break;

  case LOCK_AC_REQUNSCATTER:
    if (lock->is_stable()) {
      /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
       * because the replica should be holding an auth_pin if they're
       * doing this (and thus, we are freezing, not frozen, and indefinite
       * starvation isn't an issue).
       */
      dout(7) << "handle_file_lock got unscatter request on " << *lock
	      << " on " << *lock->get_parent() << dendl;
      if (lock->get_state() == LOCK_MIX) // i.e., the reqscatter didn't race with an actual mix/scatter
	simple_lock(lock); // FIXME tempsync?
    } else {
      // not stable; remember the request for when the lock settles
      dout(7) << "handle_file_lock ignoring unscatter request on " << *lock
	      << " on " << *lock->get_parent() << dendl;
      lock->set_unscatter_wanted();
    }
    break;

  case LOCK_AC_REQRDLOCK:
    handle_reqrdlock(lock, m);
    break;

  case LOCK_AC_NUDGE:
    // only meaningful on a replicated, auth copy of the lock
    if (!lock->get_parent()->is_auth()) {
      dout(7) << "handle_file_lock IGNORING nudge on non-auth " << *lock
	      << " on " << *lock->get_parent() << dendl;
    } else if (!lock->get_parent()->is_replicated()) {
      dout(7) << "handle_file_lock IGNORING nudge on non-replicated " << *lock
	      << " on " << *lock->get_parent() << dendl;
    } else {
      dout(7) << "handle_file_lock trying nudge on " << *lock
	      << " on " << *lock->get_parent() << dendl;
      scatter_nudge(lock, 0, true);
      mds->mdlog->flush();
    }
    break;

  default:
    ceph_abort();
  }

  m->put();
}
5284
5285
5286
5287
5288
5289