git.proxmox.com Git - ceph.git/blob - ceph/src/mds/Locker.cc
update sources to v12.2.3
[ceph.git] / ceph / src / mds / Locker.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15
16 #include "MDSRank.h"
17 #include "MDCache.h"
18 #include "Locker.h"
19 #include "CInode.h"
20 #include "CDir.h"
21 #include "CDentry.h"
22 #include "Mutation.h"
23 #include "MDSContext.h"
24
25 #include "MDLog.h"
26 #include "MDSMap.h"
27
28 #include "events/EUpdate.h"
29 #include "events/EOpen.h"
30
31 #include "msg/Messenger.h"
32 #include "osdc/Objecter.h"
33
34 #include "messages/MInodeFileCaps.h"
35 #include "messages/MLock.h"
36 #include "messages/MClientLease.h"
37 #include "messages/MClientReply.h"
38 #include "messages/MClientCaps.h"
39 #include "messages/MClientCapRelease.h"
40
41 #include "messages/MMDSSlaveRequest.h"
42
43 #include <errno.h>
44
45 #include "common/config.h"
46
47
#define dout_subsys ceph_subsys_mds
#undef dout_prefix
#define dout_context g_ceph_context
#define dout_prefix _prefix(_dout, mds)
// Debug-log prefix helper used by the dout_prefix macro above: tags every
// log line from this file with "mds.<rank>.locker ".
static ostream& _prefix(std::ostream *_dout, MDSRank *mds) {
  return *_dout << "mds." << mds->get_nodeid() << ".locker ";
}
55
56
// Completion context bound to a Locker instance; provides the owning
// MDSRank (via locker->mds) to the MDSInternalContextBase machinery.
class LockerContext : public MDSInternalContextBase {
protected:
  Locker *locker;   // non-owning; must outlive this context
  MDSRank *get_mds() override
  {
    return locker->mds;
  }

public:
  explicit LockerContext(Locker *locker_) : locker(locker_) {
    assert(locker != NULL);
  }
};
70
// Same as LockerContext, but for journal (MDLog) completions: derives from
// MDSLogContextBase so it is invoked once the relevant log event is safe.
class LockerLogContext : public MDSLogContextBase {
protected:
  Locker *locker;   // non-owning; must outlive this context
  MDSRank *get_mds() override
  {
    return locker->mds;
  }

public:
  explicit LockerLogContext(Locker *locker_) : locker(locker_) {
    assert(locker != NULL);
  }
};
84
/* This function DOES put the passed message before returning */
// Top-level message router for the locker subsystem: fans incoming
// messages out to the matching handle_* method based on message type.
// Unknown types are a fatal programming error (assert).
void Locker::dispatch(Message *m)
{

  switch (m->get_type()) {

  // inter-mds locking
  case MSG_MDS_LOCK:
    handle_lock(static_cast<MLock*>(m));
    break;
  // inter-mds caps
  case MSG_MDS_INODEFILECAPS:
    handle_inode_file_caps(static_cast<MInodeFileCaps*>(m));
    break;

  // client sync
  case CEPH_MSG_CLIENT_CAPS:
    handle_client_caps(static_cast<MClientCaps*>(m));

    break;
  case CEPH_MSG_CLIENT_CAPRELEASE:
    handle_client_cap_release(static_cast<MClientCapRelease*>(m));
    break;
  case CEPH_MSG_CLIENT_LEASE:
    handle_client_lease(static_cast<MClientLease*>(m));
    break;

  default:
    derr << "locker unknown message " << m->get_type() << dendl;
    assert(0 == "locker unknown message");
  }
}
117
// Periodic maintenance entry point, driven by the MDS tick timer:
// runs scatter-lock upkeep first, then cap-related upkeep.
void Locker::tick()
{
  scatter_tick();
  caps_tick();
}
123
124 /*
125 * locks vs rejoin
126 *
127 *
128 *
129 */
130
131 void Locker::send_lock_message(SimpleLock *lock, int msg)
132 {
133 for (const auto &it : lock->get_parent()->get_replicas()) {
134 if (mds->is_cluster_degraded() &&
135 mds->mdsmap->get_state(it.first) < MDSMap::STATE_REJOIN)
136 continue;
137 MLock *m = new MLock(lock, msg, mds->get_nodeid());
138 mds->send_message_mds(m, it.first);
139 }
140 }
141
142 void Locker::send_lock_message(SimpleLock *lock, int msg, const bufferlist &data)
143 {
144 for (const auto &it : lock->get_parent()->get_replicas()) {
145 if (mds->is_cluster_degraded() &&
146 mds->mdsmap->get_state(it.first) < MDSMap::STATE_REJOIN)
147 continue;
148 MLock *m = new MLock(lock, msg, mds->get_nodeid());
149 m->set_data(data);
150 mds->send_message_mds(m, it.first);
151 }
152 }
153
154
155
156
157 void Locker::include_snap_rdlocks(set<SimpleLock*>& rdlocks, CInode *in)
158 {
159 // rdlock ancestor snaps
160 CInode *t = in;
161 rdlocks.insert(&in->snaplock);
162 while (t->get_projected_parent_dn()) {
163 t = t->get_projected_parent_dn()->get_dir()->get_inode();
164 rdlocks.insert(&t->snaplock);
165 }
166 }
167
168 void Locker::include_snap_rdlocks_wlayout(set<SimpleLock*>& rdlocks, CInode *in,
169 file_layout_t **layout)
170 {
171 //rdlock ancestor snaps
172 CInode *t = in;
173 rdlocks.insert(&in->snaplock);
174 rdlocks.insert(&in->policylock);
175 bool found_layout = false;
176 while (t) {
177 rdlocks.insert(&t->snaplock);
178 if (!found_layout) {
179 rdlocks.insert(&t->policylock);
180 if (t->get_projected_inode()->has_layout()) {
181 *layout = &t->get_projected_inode()->layout;
182 found_layout = true;
183 }
184 }
185 if (t->get_projected_parent_dn() &&
186 t->get_projected_parent_dn()->get_dir())
187 t = t->get_projected_parent_dn()->get_dir()->get_inode();
188 else t = NULL;
189 }
190 }
191
// RAII helper: on destruction, records 'message' as an event on the mdr
// unless mark_event has been cleared.  Used so that every early return
// from acquire_locks() automatically tags the request with the reason it
// is waiting; 'message' is updated as the function progresses.
struct MarkEventOnDestruct {
  MDRequestRef& mdr;
  const char* message;    // event text recorded at destruction
  bool mark_event;        // set false to suppress recording
  MarkEventOnDestruct(MDRequestRef& _mdr,
                      const char *_message) : mdr(_mdr),
                                              message(_message),
                                              mark_event(true) {}
  ~MarkEventOnDestruct() {
    if (mark_event)
      mdr->mark_event(message);
  }
};
205
/* If this function returns false, the mdr has been placed
 * on the appropriate wait list */
// Acquire the full set of rd/wr/x/remote-wr locks for a request, in
// globally sorted order to avoid deadlock.  Phases:
//   1. build 'sorted' (all locks, SimpleLock::ptr_lt order) and 'mustpin'
//      (objects needing an auth pin), augmenting xlocks with versionlocks;
//   2. auth-pin everything locally, or request remote auth pins and wait;
//   3. walk 'sorted' against the locks already held (mdr->locks),
//      releasing stray/out-of-order locks and acquiring missing ones.
// Returns true once all locks are held; false if the request was queued
// on a wait list (locks and possibly auth pins have been dropped).
bool Locker::acquire_locks(MDRequestRef& mdr,
			   set<SimpleLock*> &rdlocks,
			   set<SimpleLock*> &wrlocks,
			   set<SimpleLock*> &xlocks,
			   map<SimpleLock*,mds_rank_t> *remote_wrlocks,
			   CInode *auth_pin_freeze,
			   bool auth_pin_nonblock)
{
  if (mdr->done_locking &&
      !mdr->is_slave()) { // not on slaves! master requests locks piecemeal.
    dout(10) << "acquire_locks " << *mdr << " - done locking" << dendl;
    return true; // at least we had better be!
  }
  dout(10) << "acquire_locks " << *mdr << dendl;

  // records "failed to acquire_locks" (or the latest marker.message) on
  // the request if we bail out early
  MarkEventOnDestruct marker(mdr, "failed to acquire_locks");

  client_t client = mdr->get_client();

  set<SimpleLock*, SimpleLock::ptr_lt> sorted; // sort everything we will lock
  set<MDSCacheObject*> mustpin; // items to authpin

  // xlocks
  for (set<SimpleLock*>::iterator p = xlocks.begin(); p != xlocks.end(); ++p) {
    SimpleLock *lock = *p;

    if ((lock->get_type() == CEPH_LOCK_ISNAP ||
	 lock->get_type() == CEPH_LOCK_IPOLICY) &&
	mds->is_cluster_degraded() &&
	mdr->is_master() &&
	!mdr->is_queued_for_replay()) {
      // waiting for recovering mds, to guarantee replayed requests and mksnap/setlayout
      // get processed in proper order.
      bool wait = false;
      if (lock->get_parent()->is_auth()) {
	if (!mdr->locks.count(lock)) {
	  set<mds_rank_t> ls;
	  lock->get_parent()->list_replicas(ls);
	  for (auto m : ls) {
	    if (mds->mdsmap->get_state(m) < MDSMap::STATE_ACTIVE) {
	      wait = true;
	      break;
	    }
	  }
	}
      } else {
	// if the lock is the latest locked one, it's possible that slave mds got the lock
	// while there are recovering mds.
	if (!mdr->locks.count(lock) || lock == *mdr->locks.rbegin())
	  wait = true;
      }
      if (wait) {
	dout(10) << " must xlock " << *lock << " " << *lock->get_parent()
		 << ", waiting for cluster recovered" << dendl;
	mds->locker->drop_locks(mdr.get(), NULL);
	mdr->drop_local_auth_pins();
	mds->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache, mdr));
	return false;
      }
    }

    dout(20) << " must xlock " << *lock << " " << *lock->get_parent() << dendl;

    sorted.insert(lock);
    mustpin.insert(lock->get_parent());

    // augment xlock with a versionlock?
    if ((*p)->get_type() == CEPH_LOCK_DN) {
      CDentry *dn = (CDentry*)lock->get_parent();
      if (!dn->is_auth())
	continue;

      if (xlocks.count(&dn->versionlock))
	continue; // we're xlocking the versionlock too; don't wrlock it!

      if (mdr->is_master()) {
	// master. wrlock versionlock so we can pipeline dentry updates to journal.
	wrlocks.insert(&dn->versionlock);
      } else {
	// slave. exclusively lock the dentry version (i.e. block other journal updates).
	// this makes rollback safe.
	xlocks.insert(&dn->versionlock);
	sorted.insert(&dn->versionlock);
      }
    }
    if (lock->get_type() > CEPH_LOCK_IVERSION) {
      // inode version lock?
      CInode *in = (CInode*)lock->get_parent();
      if (!in->is_auth())
	continue;
      if (mdr->is_master()) {
	// master. wrlock versionlock so we can pipeline inode updates to journal.
	wrlocks.insert(&in->versionlock);
      } else {
	// slave. exclusively lock the inode version (i.e. block other journal updates).
	// this makes rollback safe.
	xlocks.insert(&in->versionlock);
	sorted.insert(&in->versionlock);
      }
    }
  }

  // wrlocks
  for (set<SimpleLock*>::iterator p = wrlocks.begin(); p != wrlocks.end(); ++p) {
    MDSCacheObject *object = (*p)->get_parent();
    dout(20) << " must wrlock " << **p << " " << *object << dendl;
    sorted.insert(*p);
    if (object->is_auth())
      mustpin.insert(object);
    else if (!object->is_auth() &&
	     !(*p)->can_wrlock(client) && // we might have to request a scatter
	     !mdr->is_slave()) { // if we are slave (remote_wrlock), the master already authpinned
      dout(15) << " will also auth_pin " << *object
	       << " in case we need to request a scatter" << dendl;
      mustpin.insert(object);
    }
  }

  // remote_wrlocks
  if (remote_wrlocks) {
    for (map<SimpleLock*,mds_rank_t>::iterator p = remote_wrlocks->begin(); p != remote_wrlocks->end(); ++p) {
      MDSCacheObject *object = p->first->get_parent();
      dout(20) << " must remote_wrlock on mds." << p->second << " "
	       << *p->first << " " << *object << dendl;
      sorted.insert(p->first);
      mustpin.insert(object);
    }
  }

  // rdlocks
  for (set<SimpleLock*>::iterator p = rdlocks.begin();
       p != rdlocks.end();
       ++p) {
    MDSCacheObject *object = (*p)->get_parent();
    dout(20) << " must rdlock " << **p << " " << *object << dendl;
    sorted.insert(*p);
    if (object->is_auth())
      mustpin.insert(object);
    else if (!object->is_auth() &&
	     !(*p)->can_rdlock(client)) { // we might have to request an rdlock
      dout(15) << " will also auth_pin " << *object
	       << " in case we need to request a rdlock" << dendl;
      mustpin.insert(object);
    }
  }


  // AUTH PINS
  map<mds_rank_t, set<MDSCacheObject*> > mustpin_remote;  // mds -> (object set)

  // can i auth pin them all now?
  marker.message = "failed to authpin local pins";
  for (set<MDSCacheObject*>::iterator p = mustpin.begin();
       p != mustpin.end();
       ++p) {
    MDSCacheObject *object = *p;

    dout(10) << " must authpin " << *object << dendl;

    if (mdr->is_auth_pinned(object)) {
      if (object != (MDSCacheObject*)auth_pin_freeze)
	continue;
      if (mdr->more()->is_remote_frozen_authpin) {
	if (mdr->more()->rename_inode == auth_pin_freeze)
	  continue;
	// unfreeze auth pin for the wrong inode
	// NOTE: operator[] default-inserts an (empty) entry for that mds;
	// the .size() call just forces the insertion so an OP_AUTHPIN
	// message is sent to it below.
	mustpin_remote[mdr->more()->rename_inode->authority().first].size();
      }
    }

    if (!object->is_auth()) {
      if (!mdr->locks.empty())
	drop_locks(mdr.get());
      if (object->is_ambiguous_auth()) {
	// wait
	dout(10) << " ambiguous auth, waiting to authpin " << *object << dendl;
	object->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_MDS_RetryRequest(mdcache, mdr));
	mdr->drop_local_auth_pins();
	return false;
      }
      mustpin_remote[object->authority().first].insert(object);
      continue;
    }
    if (!object->can_auth_pin()) {
      // wait
      drop_locks(mdr.get());
      mdr->drop_local_auth_pins();
      if (auth_pin_nonblock) {
	dout(10) << " can't auth_pin (freezing?) " << *object << ", nonblocking" << dendl;
	mdr->aborted = true;
	return false;
      }
      dout(10) << " can't auth_pin (freezing?), waiting to authpin " << *object << dendl;
      object->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_MDS_RetryRequest(mdcache, mdr));

      // tell the freezing dir/subtree that someone is now blocked on it
      if (!mdr->remote_auth_pins.empty())
	notify_freeze_waiter(object);

      return false;
    }
  }

  // ok, grab local auth pins
  for (set<MDSCacheObject*>::iterator p = mustpin.begin();
       p != mustpin.end();
       ++p) {
    MDSCacheObject *object = *p;
    if (mdr->is_auth_pinned(object)) {
      dout(10) << " already auth_pinned " << *object << dendl;
    } else if (object->is_auth()) {
      dout(10) << " auth_pinning " << *object << dendl;
      mdr->auth_pin(object);
    }
  }

  // request remote auth_pins
  if (!mustpin_remote.empty()) {
    marker.message = "requesting remote authpins";
    // re-request pins we already hold remotely so they are refreshed
    // together with the new ones
    for (map<MDSCacheObject*,mds_rank_t>::iterator p = mdr->remote_auth_pins.begin();
	 p != mdr->remote_auth_pins.end();
	 ++p) {
      if (mustpin.count(p->first)) {
	assert(p->second == p->first->authority().first);
	map<mds_rank_t, set<MDSCacheObject*> >::iterator q = mustpin_remote.find(p->second);
	if (q != mustpin_remote.end())
	  q->second.insert(p->first);
      }
    }
    for (map<mds_rank_t, set<MDSCacheObject*> >::iterator p = mustpin_remote.begin();
	 p != mustpin_remote.end();
	 ++p) {
      dout(10) << "requesting remote auth_pins from mds." << p->first << dendl;

      // wait for active auth
      if (mds->is_cluster_degraded() &&
	  !mds->mdsmap->is_clientreplay_or_active_or_stopping(p->first)) {
	dout(10) << " mds." << p->first << " is not active" << dendl;
	if (mdr->more()->waiting_on_slave.empty())
	  mds->wait_for_active_peer(p->first, new C_MDS_RetryRequest(mdcache, mdr));
	return false;
      }

      MMDSSlaveRequest *req = new MMDSSlaveRequest(mdr->reqid, mdr->attempt,
						   MMDSSlaveRequest::OP_AUTHPIN);
      for (set<MDSCacheObject*>::iterator q = p->second.begin();
	   q != p->second.end();
	   ++q) {
	dout(10) << " req remote auth_pin of " << **q << dendl;
	MDSCacheObjectInfo info;
	(*q)->set_object_info(info);
	req->get_authpins().push_back(info);
	if (*q == auth_pin_freeze)
	  (*q)->set_object_info(req->get_authpin_freeze());
	mdr->pin(*q);
      }
      if (auth_pin_nonblock)
	req->mark_nonblock();
      mds->send_message_mds(req, p->first);

      // put in waiting list
      assert(mdr->more()->waiting_on_slave.count(p->first) == 0);
      mdr->more()->waiting_on_slave.insert(p->first);
    }
    return false;
  }

  // caps i'll need to issue
  set<CInode*> issue_set;
  bool result = false;

  // acquire locks.
  // make sure they match currently acquired locks.
  // 'existing' walks mdr->locks in parallel with 'sorted'; any held lock
  // that is not (or is of the wrong kind) in 'sorted' is released here.
  set<SimpleLock*, SimpleLock::ptr_lt>::iterator existing = mdr->locks.begin();
  for (set<SimpleLock*, SimpleLock::ptr_lt>::iterator p = sorted.begin();
       p != sorted.end();
       ++p) {
    bool need_wrlock = !!wrlocks.count(*p);
    bool need_remote_wrlock = !!(remote_wrlocks && remote_wrlocks->count(*p));

    // already locked?
    if (existing != mdr->locks.end() && *existing == *p) {
      // right kind?
      SimpleLock *have = *existing;
      ++existing;
      if (xlocks.count(have) && mdr->xlocks.count(have)) {
	dout(10) << " already xlocked " << *have << " " << *have->get_parent() << dendl;
	continue;
      }
      if (mdr->remote_wrlocks.count(have)) {
	if (!need_remote_wrlock ||
	    mdr->remote_wrlocks[have] != (*remote_wrlocks)[have]) {
	  dout(10) << " unlocking remote_wrlock on wrong mds." << mdr->remote_wrlocks[have]
		   << " " << *have << " " << *have->get_parent() << dendl;
	  remote_wrlock_finish(have, mdr->remote_wrlocks[have], mdr.get());
	}
      }
      if (need_wrlock || need_remote_wrlock) {
	if (need_wrlock == !!mdr->wrlocks.count(have) &&
	    need_remote_wrlock == !!mdr->remote_wrlocks.count(have)) {
	  if (need_wrlock)
	    dout(10) << " already wrlocked " << *have << " " << *have->get_parent() << dendl;
	  if (need_remote_wrlock)
	    dout(10) << " already remote_wrlocked " << *have << " " << *have->get_parent() << dendl;
	  continue;
	}
      }
      if (rdlocks.count(have) && mdr->rdlocks.count(have)) {
	dout(10) << " already rdlocked " << *have << " " << *have->get_parent() << dendl;
	continue;
      }
    }

    // hose any stray locks
    // ('existing' was not advanced past *p above only when the held kind
    // did not match what we need; drop the mismatched wrlock first)
    if (existing != mdr->locks.end() && *existing == *p) {
      assert(need_wrlock || need_remote_wrlock);
      SimpleLock *lock = *existing;
      if (mdr->wrlocks.count(lock)) {
	if (!need_wrlock)
	  dout(10) << " unlocking extra " << *lock << " " << *lock->get_parent() << dendl;
	else if (need_remote_wrlock) // acquire remote_wrlock first
	  dout(10) << " unlocking out-of-order " << *lock << " " << *lock->get_parent() << dendl;
	bool need_issue = false;
	wrlock_finish(lock, mdr.get(), &need_issue);
	if (need_issue)
	  issue_set.insert(static_cast<CInode*>(lock->get_parent()));
      }
      ++existing;
    }
    // release every held lock that sorts before *p: it is not wanted
    // (or would be acquired out of order)
    while (existing != mdr->locks.end()) {
      SimpleLock *stray = *existing;
      ++existing;
      dout(10) << " unlocking out-of-order " << *stray << " " << *stray->get_parent() << dendl;
      bool need_issue = false;
      if (mdr->xlocks.count(stray)) {
	xlock_finish(stray, mdr.get(), &need_issue);
      } else if (mdr->rdlocks.count(stray)) {
	rdlock_finish(stray, mdr.get(), &need_issue);
      } else {
	// may have acquired both wrlock and remote wrlock
	if (mdr->wrlocks.count(stray))
	  wrlock_finish(stray, mdr.get(), &need_issue);
	if (mdr->remote_wrlocks.count(stray))
	  remote_wrlock_finish(stray, mdr->remote_wrlocks[stray], mdr.get());
      }
      if (need_issue)
	issue_set.insert(static_cast<CInode*>(stray->get_parent()));
    }

    // lock
    if (mdr->locking && *p != mdr->locking) {
      cancel_locking(mdr.get(), &issue_set);
    }
    if (xlocks.count(*p)) {
      marker.message = "failed to xlock, waiting";
      if (!xlock_start(*p, mdr))
	goto out;
      dout(10) << " got xlock on " << **p << " " << *(*p)->get_parent() << dendl;
    } else if (need_wrlock || need_remote_wrlock) {
      if (need_remote_wrlock && !mdr->remote_wrlocks.count(*p)) {
        marker.message = "waiting for remote wrlocks";
	remote_wrlock_start(*p, (*remote_wrlocks)[*p], mdr);
	goto out;
      }
      if (need_wrlock && !mdr->wrlocks.count(*p)) {
        marker.message = "failed to wrlock, waiting";
	if (need_remote_wrlock && !(*p)->can_wrlock(mdr->get_client())) {
	  marker.message = "failed to wrlock, dropping remote wrlock and waiting";
	  // can't take the wrlock because the scatter lock is gathering. need to
	  // release the remote wrlock, so that the gathering process can finish.
	  remote_wrlock_finish(*p, mdr->remote_wrlocks[*p], mdr.get());
	  remote_wrlock_start(*p, (*remote_wrlocks)[*p], mdr);
	  goto out;
	}
	// nowait if we have already gotten remote wrlock
	if (!wrlock_start(*p, mdr, need_remote_wrlock))
	  goto out;
	dout(10) << " got wrlock on " << **p << " " << *(*p)->get_parent() << dendl;
      }
    } else {
      assert(mdr->is_master());
      if ((*p)->needs_recover()) {
	if (mds->is_cluster_degraded()) {
	  if (!mdr->is_queued_for_replay()) {
	    // see comments in SimpleLock::set_state_rejoin() and
	    // ScatterLock::encode_state_for_rejoin()
	    drop_locks(mdr.get());
	    mds->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache, mdr));
	    dout(10) << " rejoin recovering " << **p << " " << *(*p)->get_parent()
		     << ", waiting for cluster recovered" << dendl;
	    marker.message = "rejoin recovering lock, waiting for cluster recovered";
	    return false;
	  }
	} else {
	  (*p)->clear_need_recover();
	}
      }

      marker.message = "failed to rdlock, waiting";
      if (!rdlock_start(*p, mdr))
	goto out;
      dout(10) << " got rdlock on " << **p << " " << *(*p)->get_parent() << dendl;
    }
  }

  // any extra unneeded locks?
  while (existing != mdr->locks.end()) {
    SimpleLock *stray = *existing;
    ++existing;
    dout(10) << " unlocking extra " << *stray << " " << *stray->get_parent() << dendl;
    bool need_issue = false;
    if (mdr->xlocks.count(stray)) {
      xlock_finish(stray, mdr.get(), &need_issue);
    } else if (mdr->rdlocks.count(stray)) {
      rdlock_finish(stray, mdr.get(), &need_issue);
    } else {
      // may have acquired both wrlock and remote wrlock
      if (mdr->wrlocks.count(stray))
	wrlock_finish(stray, mdr.get(), &need_issue);
      if (mdr->remote_wrlocks.count(stray))
	remote_wrlock_finish(stray, mdr->remote_wrlocks[stray], mdr.get());
    }
    if (need_issue)
      issue_set.insert(static_cast<CInode*>(stray->get_parent()));
  }

  mdr->done_locking = true;
  mdr->set_mds_stamp(ceph_clock_now());
  result = true;
  marker.message = "acquired locks";

 out:
  issue_caps_set(issue_set);
  return result;
}
642
643 void Locker::notify_freeze_waiter(MDSCacheObject *o)
644 {
645 CDir *dir = NULL;
646 if (CInode *in = dynamic_cast<CInode*>(o)) {
647 if (!in->is_root())
648 dir = in->get_parent_dir();
649 } else if (CDentry *dn = dynamic_cast<CDentry*>(o)) {
650 dir = dn->get_dir();
651 } else {
652 dir = dynamic_cast<CDir*>(o);
653 assert(dir);
654 }
655 if (dir) {
656 if (dir->is_freezing_dir())
657 mdcache->fragment_freeze_inc_num_waiters(dir);
658 if (dir->is_freezing_tree()) {
659 while (!dir->is_freezing_tree_root())
660 dir = dir->get_parent_dir();
661 mdcache->migrator->export_freeze_inc_num_waiters(dir);
662 }
663 }
664 }
665
666 void Locker::set_xlocks_done(MutationImpl *mut, bool skip_dentry)
667 {
668 for (set<SimpleLock*>::iterator p = mut->xlocks.begin();
669 p != mut->xlocks.end();
670 ++p) {
671 MDSCacheObject *object = (*p)->get_parent();
672 assert(object->is_auth());
673 if (skip_dentry &&
674 ((*p)->get_type() == CEPH_LOCK_DN || (*p)->get_type() == CEPH_LOCK_DVERSION))
675 continue;
676 dout(10) << "set_xlocks_done on " << **p << " " << *object << dendl;
677 (*p)->set_xlock_done();
678 }
679 }
680
681 void Locker::_drop_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
682 {
683 while (!mut->rdlocks.empty()) {
684 bool ni = false;
685 MDSCacheObject *p = (*mut->rdlocks.begin())->get_parent();
686 rdlock_finish(*mut->rdlocks.begin(), mut, &ni);
687 if (ni)
688 pneed_issue->insert(static_cast<CInode*>(p));
689 }
690 }
691
// Release all xlocks, remote wrlocks and wrlocks held by the mutation
// (rdlocks are left alone — see _drop_rdlocks).  For locks held on remote
// MDSs we just drop our local bookkeeping and send a single OP_DROPLOCKS
// to each involved peer at the end.  Inodes needing cap re-issue are
// collected into *pneed_issue.
void Locker::_drop_non_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
{
  set<mds_rank_t> slaves;   // peers we must tell to drop their locks

  // each *_finish() below erases the lock from its mut-> set, so these
  // loops drain from begin() until the sets are empty
  while (!mut->xlocks.empty()) {
    SimpleLock *lock = *mut->xlocks.begin();
    MDSCacheObject *p = lock->get_parent();
    if (!p->is_auth()) {
      // remotely xlocked: release our reference and let the auth mds
      // clean up when it receives OP_DROPLOCKS
      assert(lock->get_sm()->can_remote_xlock);
      slaves.insert(p->authority().first);
      lock->put_xlock();
      mut->locks.erase(lock);
      mut->xlocks.erase(lock);
      continue;
    }
    bool ni = false;
    xlock_finish(lock, mut, &ni);
    if (ni)
      pneed_issue->insert(static_cast<CInode*>(p));
  }

  while (!mut->remote_wrlocks.empty()) {
    map<SimpleLock*,mds_rank_t>::iterator p = mut->remote_wrlocks.begin();
    slaves.insert(p->second);
    // keep the entry in mut->locks if we also hold a local wrlock on it
    if (mut->wrlocks.count(p->first) == 0)
      mut->locks.erase(p->first);
    mut->remote_wrlocks.erase(p);
  }

  while (!mut->wrlocks.empty()) {
    bool ni = false;
    MDSCacheObject *p = (*mut->wrlocks.begin())->get_parent();
    wrlock_finish(*mut->wrlocks.begin(), mut, &ni);
    if (ni)
      pneed_issue->insert(static_cast<CInode*>(p));
  }

  // notify each involved peer once; skip peers that have not reached
  // rejoin in a degraded cluster (they will resolve state via rejoin)
  for (set<mds_rank_t>::iterator p = slaves.begin(); p != slaves.end(); ++p) {
    if (!mds->is_cluster_degraded() ||
	mds->mdsmap->get_state(*p) >= MDSMap::STATE_REJOIN) {
      dout(10) << "_drop_non_rdlocks dropping remote locks on mds." << *p << dendl;
      MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt,
							MMDSSlaveRequest::OP_DROPLOCKS);
      mds->send_message_mds(slavereq, *p);
    }
  }
}
739
// Abort the mutation's in-progress lock acquisition (mut->locking).  If we
// are auth for the lock, unwind the partial xlock state: a PREXLOCK is
// finished off via _finish_xlock, and a LOCK->XLOCK transition with no
// remaining xlockers is parked in XLOCKDONE and re-evaluated.  Inodes
// needing cap re-issue are added to *pneed_issue.
void Locker::cancel_locking(MutationImpl *mut, set<CInode*> *pneed_issue)
{
  SimpleLock *lock = mut->locking;
  assert(lock);
  dout(10) << "cancel_locking " << *lock << " on " << *mut << dendl;

  if (lock->get_parent()->is_auth()) {
    bool need_issue = false;
    if (lock->get_state() == LOCK_PREXLOCK) {
      _finish_xlock(lock, -1, &need_issue);
    } else if (lock->get_state() == LOCK_LOCK_XLOCK &&
	       lock->get_num_xlocks() == 0) {
      lock->set_state(LOCK_XLOCKDONE);
      eval_gather(lock, true, &need_issue);
    }
    if (need_issue)
      pneed_issue->insert(static_cast<CInode *>(lock->get_parent()));
  }
  mut->finish_locking(lock);
}
760
761 void Locker::drop_locks(MutationImpl *mut, set<CInode*> *pneed_issue)
762 {
763 // leftover locks
764 set<CInode*> my_need_issue;
765 if (!pneed_issue)
766 pneed_issue = &my_need_issue;
767
768 if (mut->locking)
769 cancel_locking(mut, pneed_issue);
770 _drop_non_rdlocks(mut, pneed_issue);
771 _drop_rdlocks(mut, pneed_issue);
772
773 if (pneed_issue == &my_need_issue)
774 issue_caps_set(*pneed_issue);
775 mut->done_locking = false;
776 }
777
778 void Locker::drop_non_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
779 {
780 set<CInode*> my_need_issue;
781 if (!pneed_issue)
782 pneed_issue = &my_need_issue;
783
784 _drop_non_rdlocks(mut, pneed_issue);
785
786 if (pneed_issue == &my_need_issue)
787 issue_caps_set(*pneed_issue);
788 }
789
790 void Locker::drop_rdlocks_for_early_reply(MutationImpl *mut)
791 {
792 set<CInode*> need_issue;
793
794 for (auto p = mut->rdlocks.begin(); p != mut->rdlocks.end(); ) {
795 SimpleLock *lock = *p;
796 ++p;
797 // make later mksnap/setlayout (at other mds) wait for this unsafe request
798 if (lock->get_type() == CEPH_LOCK_ISNAP ||
799 lock->get_type() == CEPH_LOCK_IPOLICY)
800 continue;
801 bool ni = false;
802 rdlock_finish(lock, mut, &ni);
803 if (ni)
804 need_issue.insert(static_cast<CInode*>(lock->get_parent()));
805 }
806
807 issue_caps_set(need_issue);
808 }
809
810 // generics
811
// Drive a non-stable lock toward its next state.  Once nothing blocks the
// transition (no gathering, no incompatible rd/wr/x/lease holders, no caps
// issued beyond what the next state allows, no pending scatter flush), the
// lock moves to 'next': a replica acks the auth mds as appropriate for the
// transition, while the auth side scatters/syncs data to replicas.  Waiters
// are then woken (or handed to *pfinishers) and caps re-issued as needed.
// 'first' requests an eager cap revoke on the first evaluation.
void Locker::eval_gather(SimpleLock *lock, bool first, bool *pneed_issue, list<MDSInternalContextBase*> *pfinishers)
{
  dout(10) << "eval_gather " << *lock << " on " << *lock->get_parent() << dendl;
  assert(!lock->is_stable());

  int next = lock->get_next_state();

  // 'in' is only meaningful for inode locks; dentry locks have no caps
  CInode *in = 0;
  bool caps = lock->get_cap_shift();
  if (lock->get_type() != CEPH_LOCK_DN)
    in = static_cast<CInode *>(lock->get_parent());

  bool need_issue = false;

  int loner_issued = 0, other_issued = 0, xlocker_issued = 0;
  assert(!caps || in != NULL);
  if (caps && in->is_head()) {
    in->get_caps_issued(&loner_issued, &other_issued, &xlocker_issued,
			lock->get_cap_shift(), lock->get_cap_mask());
    dout(10) << " next state is " << lock->get_state_name(next)
	     << " issued/allows loner " << gcap_string(loner_issued)
	     << "/" << gcap_string(lock->gcaps_allowed(CAP_LONER, next))
	     << " xlocker " << gcap_string(xlocker_issued)
	     << "/" << gcap_string(lock->gcaps_allowed(CAP_XLOCKER, next))
	     << " other " << gcap_string(other_issued)
	     << "/" << gcap_string(lock->gcaps_allowed(CAP_ANY, next))
	     << dendl;

    // on the first pass, start revoking caps the next state won't allow
    if (first && ((~lock->gcaps_allowed(CAP_ANY, next) & other_issued) ||
		  (~lock->gcaps_allowed(CAP_LONER, next) & loner_issued) ||
		  (~lock->gcaps_allowed(CAP_XLOCKER, next) & xlocker_issued)))
      need_issue = true;
  }

#define IS_TRUE_AND_LT_AUTH(x, auth) (x && ((auth && x <= AUTH) || (!auth && x < AUTH)))
  bool auth = lock->get_parent()->is_auth();
  // the big gate: the transition may complete only if nothing still held
  // or issued conflicts with the next state
  if (!lock->is_gathering() &&
      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_rdlock, auth) || !lock->is_rdlocked()) &&
      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_wrlock, auth) || !lock->is_wrlocked()) &&
      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_xlock, auth) || !lock->is_xlocked()) &&
      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_lease, auth) || !lock->is_leased()) &&
      !(lock->get_parent()->is_auth() && lock->is_flushing()) &&  // i.e. wait for scatter_writebehind!
      (!caps || ((~lock->gcaps_allowed(CAP_ANY, next) & other_issued) == 0 &&
		 (~lock->gcaps_allowed(CAP_LONER, next) & loner_issued) == 0 &&
		 (~lock->gcaps_allowed(CAP_XLOCKER, next) & xlocker_issued) == 0)) &&
      lock->get_state() != LOCK_SYNC_MIX2 &&  // these states need an explicit trigger from the auth mds
      lock->get_state() != LOCK_MIX_SYNC2
      ) {
    dout(7) << "eval_gather finished gather on " << *lock
	    << " on " << *lock->get_parent() << dendl;

    if (lock->get_sm() == &sm_filelock) {
      assert(in);
      // file locks must not transition while file recovery is pending
      if (in->state_test(CInode::STATE_RECOVERING)) {
	dout(7) << "eval_gather finished gather, but still recovering" << dendl;
	return;
      } else if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
	dout(7) << "eval_gather finished gather, but need to recover" << dendl;
	mds->mdcache->queue_file_recover(in);
	mds->mdcache->do_file_recover();
	return;
      }
    }

    if (!lock->get_parent()->is_auth()) {
      // replica: tell auth
      mds_rank_t auth = lock->get_parent()->authority().first;

      if (lock->get_parent()->is_rejoining() &&
	  mds->mdsmap->get_state(auth) == MDSMap::STATE_REJOIN) {
	dout(7) << "eval_gather finished gather, but still rejoining "
		<< *lock->get_parent() << dendl;
	return;
      }

      if (!mds->is_cluster_degraded() ||
	  mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) {
	switch (lock->get_state()) {
	case LOCK_SYNC_LOCK:
	  mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid()),
				auth);
	  break;

	case LOCK_MIX_SYNC:
	  {
	    MLock *reply = new MLock(lock, LOCK_AC_SYNCACK, mds->get_nodeid());
	    lock->encode_locked_state(reply->get_data());
	    mds->send_message_mds(reply, auth);
	    next = LOCK_MIX_SYNC2;
	    (static_cast<ScatterLock *>(lock))->start_flush();
	  }
	  break;

	case LOCK_MIX_SYNC2:
	  (static_cast<ScatterLock *>(lock))->finish_flush();
	  (static_cast<ScatterLock *>(lock))->clear_flushed();
	  // deliberate fall-through into LOCK_SYNC_MIX2: in both states the
	  // ack was already sent and nothing more needs to go to the auth
	  // mds — TODO(review) confirm the missing 'break' is intentional

	case LOCK_SYNC_MIX2:
	  // do nothing, we already acked
	  break;

	case LOCK_SYNC_MIX:
	  {
	    MLock *reply = new MLock(lock, LOCK_AC_MIXACK, mds->get_nodeid());
	    mds->send_message_mds(reply, auth);
	    next = LOCK_SYNC_MIX2;
	  }
	  break;

	case LOCK_MIX_LOCK:
	  {
	    bufferlist data;
	    lock->encode_locked_state(data);
	    mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid(), data), auth);
	    (static_cast<ScatterLock *>(lock))->start_flush();
	    // we'll get an AC_LOCKFLUSHED to complete
	  }
	  break;

	default:
	  ceph_abort();
	}
      }
    } else {
      // auth

      // once the first (local) stage of mix->lock gather complete we can
      // gather from replicas
      if (lock->get_state() == LOCK_MIX_LOCK &&
	  lock->get_parent()->is_replicated()) {
	dout(10) << " finished (local) gather for mix->lock, now gathering from replicas" << dendl;
	send_lock_message(lock, LOCK_AC_LOCK);
	lock->init_gather();
	lock->set_state(LOCK_MIX_LOCK2);
	return;
      }

      // dirty scatter data must be written back to the journal before the
      // transition can complete
      if (lock->is_dirty() && !lock->is_flushed()) {
	scatter_writebehind(static_cast<ScatterLock *>(lock));
	mds->mdlog->flush();
	return;
      }
      lock->clear_flushed();

      switch (lock->get_state()) {
	// to mixed
      case LOCK_TSYN_MIX:
      case LOCK_SYNC_MIX:
      case LOCK_EXCL_MIX:
      case LOCK_XSYN_MIX:
	in->start_scatter(static_cast<ScatterLock *>(lock));
	if (lock->get_parent()->is_replicated()) {
	  bufferlist softdata;
	  lock->encode_locked_state(softdata);
	  send_lock_message(lock, LOCK_AC_MIX, softdata);
	}
	(static_cast<ScatterLock *>(lock))->clear_scatter_wanted();
	break;

      case LOCK_XLOCK:
      case LOCK_XLOCKDONE:
	if (next != LOCK_SYNC)
	  break;
	// fall-thru

	// to sync
      case LOCK_EXCL_SYNC:
      case LOCK_LOCK_SYNC:
      case LOCK_MIX_SYNC:
      case LOCK_XSYN_SYNC:
	if (lock->get_parent()->is_replicated()) {
	  bufferlist softdata;
	  lock->encode_locked_state(softdata);
	  send_lock_message(lock, LOCK_AC_SYNC, softdata);
	}
	break;
      }

    }

    lock->set_state(next);

    if (lock->get_parent()->is_auth() &&
	lock->is_stable())
      lock->get_parent()->auth_unpin(lock);

    // drop loner before doing waiters
    if (caps &&
	in->is_head() &&
	in->is_auth() &&
	in->get_wanted_loner() != in->get_loner()) {
      dout(10) << " trying to drop loner" << dendl;
      if (in->try_drop_loner()) {
	dout(10) << " dropped loner" << dendl;
	need_issue = true;
      }
    }

    if (pfinishers)
      lock->take_waiting(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD|SimpleLock::WAIT_XLOCK,
			 *pfinishers);
    else
      lock->finish_waiters(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD|SimpleLock::WAIT_XLOCK);

    if (caps && in->is_head())
      need_issue = true;

    // re-evaluate now that we are stable
    if (lock->get_parent()->is_auth() &&
	lock->is_stable())
      try_eval(lock, &need_issue);
  }

  if (need_issue) {
    if (pneed_issue)
      *pneed_issue = true;
    else if (in->is_head())
      issue_caps(in);
  }

}
1032
bool Locker::eval(CInode *in, int mask, bool caps_imported)
{
  // Re-evaluate the inode locks selected by 'mask' (CEPH_LOCK_I* bits;
  // -1 means all of them), adjusting the loner client as needed, and
  // issue caps at the end if any lock change made that necessary.
  // Returns true if caps were issued (or 'caps_imported' already said
  // they must be).
  bool need_issue = caps_imported;
  list<MDSInternalContextBase*> finishers;

  dout(10) << "eval " << mask << " " << *in << dendl;

  // choose loner?
  if (in->is_auth() && in->is_head()) {
    client_t orig_loner = in->get_loner();
    if (in->choose_ideal_loner()) {
      dout(10) << "eval set loner: client." << orig_loner << " -> client." << in->get_loner() << dendl;
      need_issue = true;
      mask = -1;  // loner changed: every lock needs a fresh look
    } else if (in->get_wanted_loner() != in->get_loner()) {
      dout(10) << "eval want loner: client." << in->get_wanted_loner() << " but failed to set it" << dendl;
      mask = -1;
    }
  }

 retry:
  if (mask & CEPH_LOCK_IFILE)
    eval_any(&in->filelock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IAUTH)
    eval_any(&in->authlock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_ILINK)
    eval_any(&in->linklock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IXATTR)
    eval_any(&in->xattrlock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_INEST)
    eval_any(&in->nestlock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IFLOCK)
    eval_any(&in->flocklock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IPOLICY)
    eval_any(&in->policylock, &need_issue, &finishers, caps_imported);

  // drop loner?
  if (in->is_auth() && in->is_head() && in->get_wanted_loner() != in->get_loner()) {
    if (in->try_drop_loner()) {
      need_issue = true;
      if (in->get_wanted_loner() >= 0) {
	dout(10) << "eval end set loner to client." << in->get_loner() << dendl;
	bool ok = in->try_set_loner();
	assert(ok);
	mask = -1;
	goto retry;  // new loner chosen: re-evaluate all locks once more
      }
    }
  }

  finish_contexts(g_ceph_context, finishers);

  if (need_issue && in->is_head())
    issue_caps(in);

  dout(10) << "eval done" << dendl;
  return need_issue;
}
1091
1092 class C_Locker_Eval : public LockerContext {
1093 MDSCacheObject *p;
1094 int mask;
1095 public:
1096 C_Locker_Eval(Locker *l, MDSCacheObject *pp, int m) : LockerContext(l), p(pp), mask(m) {
1097 // We are used as an MDSCacheObject waiter, so should
1098 // only be invoked by someone already holding the big lock.
1099 assert(locker->mds->mds_lock.is_locked_by_me());
1100 p->get(MDSCacheObject::PIN_PTRWAITER);
1101 }
1102 void finish(int r) override {
1103 locker->try_eval(p, mask);
1104 p->put(MDSCacheObject::PIN_PTRWAITER);
1105 }
1106 };
1107
1108 void Locker::try_eval(MDSCacheObject *p, int mask)
1109 {
1110 // unstable and ambiguous auth?
1111 if (p->is_ambiguous_auth()) {
1112 dout(7) << "try_eval ambiguous auth, waiting on " << *p << dendl;
1113 p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_Eval(this, p, mask));
1114 return;
1115 }
1116
1117 if (p->is_auth() && p->is_frozen()) {
1118 dout(7) << "try_eval frozen, waiting on " << *p << dendl;
1119 p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, mask));
1120 return;
1121 }
1122
1123 if (mask & CEPH_LOCK_DN) {
1124 assert(mask == CEPH_LOCK_DN);
1125 bool need_issue = false; // ignore this, no caps on dentries
1126 CDentry *dn = static_cast<CDentry *>(p);
1127 eval_any(&dn->lock, &need_issue);
1128 } else {
1129 CInode *in = static_cast<CInode *>(p);
1130 eval(in, mask);
1131 }
1132 }
1133
void Locker::try_eval(SimpleLock *lock, bool *pneed_issue)
{
  // Evaluate a single lock on its parent, deferring with a waiter
  // whenever the parent is in a state (ambiguous auth, frozen/freezing)
  // in which a lock state change cannot be started right now.
  MDSCacheObject *p = lock->get_parent();

  // unstable and ambiguous auth?
  if (p->is_ambiguous_auth()) {
    dout(7) << "try_eval " << *lock << " ambiguousauth, waiting on " << *p << dendl;
    p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_Eval(this, p, lock->get_type()));
    return;
  }

  if (!p->is_auth()) {
    dout(7) << "try_eval " << *lock << " not auth for " << *p << dendl;
    return;
  }

  if (p->is_frozen()) {
    dout(7) << "try_eval " << *lock << " frozen, waiting on " << *p << dendl;
    p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, lock->get_type()));
    return;
  }

  /*
   * We could have a situation like:
   *
   * - mds A authpins item on mds B
   * - mds B starts to freeze tree containing item
   * - mds A tries wrlock_start on A, sends REQSCATTER to B
   * - mds B lock is unstable, sets scatter_wanted
   * - mds B lock stabilizes, calls try_eval.
   *
   * We can defer while freezing without causing a deadlock.  Honor
   * scatter_wanted flag here.  This will never get deferred by the
   * checks above due to the auth_pin held by the master.
   */
  if (lock->is_scatterlock()) {
    ScatterLock *slock = static_cast<ScatterLock *>(lock);
    if (slock->get_scatter_wanted() &&
	slock->get_state() != LOCK_MIX) {
      scatter_mix(slock, pneed_issue);
      if (!lock->is_stable())
	return;  // mid-transition; eval_gather will pick it up later
    } else if (slock->get_unscatter_wanted() &&
        slock->get_state() != LOCK_LOCK) {
      simple_lock(slock, pneed_issue);
      if (!lock->is_stable()) {
	return;
      }
    }
  }

  // 'freezing' (as opposed to fully frozen) only blocks non-dentry
  // locks here; scatter_wanted was deliberately honored above first.
  if (lock->get_type() != CEPH_LOCK_DN && p->is_freezing()) {
    dout(7) << "try_eval " << *lock << " freezing, waiting on " << *p << dendl;
    p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, lock->get_type()));
    return;
  }

  eval(lock, pneed_issue);
}
1193
1194 void Locker::eval_cap_gather(CInode *in, set<CInode*> *issue_set)
1195 {
1196 bool need_issue = false;
1197 list<MDSInternalContextBase*> finishers;
1198
1199 // kick locks now
1200 if (!in->filelock.is_stable())
1201 eval_gather(&in->filelock, false, &need_issue, &finishers);
1202 if (!in->authlock.is_stable())
1203 eval_gather(&in->authlock, false, &need_issue, &finishers);
1204 if (!in->linklock.is_stable())
1205 eval_gather(&in->linklock, false, &need_issue, &finishers);
1206 if (!in->xattrlock.is_stable())
1207 eval_gather(&in->xattrlock, false, &need_issue, &finishers);
1208
1209 if (need_issue && in->is_head()) {
1210 if (issue_set)
1211 issue_set->insert(in);
1212 else
1213 issue_caps(in);
1214 }
1215
1216 finish_contexts(g_ceph_context, finishers);
1217 }
1218
1219 void Locker::eval_scatter_gathers(CInode *in)
1220 {
1221 bool need_issue = false;
1222 list<MDSInternalContextBase*> finishers;
1223
1224 dout(10) << "eval_scatter_gathers " << *in << dendl;
1225
1226 // kick locks now
1227 if (!in->filelock.is_stable())
1228 eval_gather(&in->filelock, false, &need_issue, &finishers);
1229 if (!in->nestlock.is_stable())
1230 eval_gather(&in->nestlock, false, &need_issue, &finishers);
1231 if (!in->dirfragtreelock.is_stable())
1232 eval_gather(&in->dirfragtreelock, false, &need_issue, &finishers);
1233
1234 if (need_issue && in->is_head())
1235 issue_caps(in);
1236
1237 finish_contexts(g_ceph_context, finishers);
1238 }
1239
1240 void Locker::eval(SimpleLock *lock, bool *need_issue)
1241 {
1242 switch (lock->get_type()) {
1243 case CEPH_LOCK_IFILE:
1244 return file_eval(static_cast<ScatterLock*>(lock), need_issue);
1245 case CEPH_LOCK_IDFT:
1246 case CEPH_LOCK_INEST:
1247 return scatter_eval(static_cast<ScatterLock*>(lock), need_issue);
1248 default:
1249 return simple_eval(lock, need_issue);
1250 }
1251 }
1252
1253
1254 // ------------------
1255 // rdlock
1256
bool Locker::_rdlock_kick(SimpleLock *lock, bool as_anon)
{
  // Try to push 'lock' toward a rdlock-able state.  Returns true if we
  // initiated a state change locally (caller should re-test can_rdlock);
  // false if nothing could be started here (possibly after asking the
  // auth MDS to do it for us).
  // kick the lock
  if (lock->is_stable()) {
    if (lock->get_parent()->is_auth()) {
      if (lock->get_sm() == &sm_scatterlock) {
	// not until tempsync is fully implemented
	//if (lock->get_parent()->is_replicated())
	//scatter_tempsync((ScatterLock*)lock);
	//else
	simple_sync(lock);
      } else if (lock->get_sm() == &sm_filelock) {
	CInode *in = static_cast<CInode*>(lock->get_parent());
	if (lock->get_state() == LOCK_EXCL &&
	    in->get_target_loner() >= 0 &&
	    !in->is_dir() && !as_anon) // as_anon => caller wants SYNC, not XSYN
	  file_xsyn(lock);
	else
	  simple_sync(lock);
      } else
	simple_sync(lock);
      return true;
    } else {
      // request rdlock state change from auth
      mds_rank_t auth = lock->get_parent()->authority().first;
      if (!mds->is_cluster_degraded() ||
	  mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
	dout(10) << "requesting rdlock from auth on "
		 << *lock << " on " << *lock->get_parent() << dendl;
	mds->send_message_mds(new MLock(lock, LOCK_AC_REQRDLOCK, mds->get_nodeid()), auth);
      }
      return false;
    }
  }
  // Unstable file lock on a recovering inode: bump the inode up the
  // recovery queue so the lock can stabilize sooner.
  if (lock->get_type() == CEPH_LOCK_IFILE) {
    CInode *in = static_cast<CInode *>(lock->get_parent());
    if (in->state_test(CInode::STATE_RECOVERING)) {
      mds->mdcache->recovery_queue.prioritize(in);
    }
  }

  return false;
}
1300
1301 bool Locker::rdlock_try(SimpleLock *lock, client_t client, MDSInternalContextBase *con)
1302 {
1303 dout(7) << "rdlock_try on " << *lock << " on " << *lock->get_parent() << dendl;
1304
1305 // can read? grab ref.
1306 if (lock->can_rdlock(client))
1307 return true;
1308
1309 _rdlock_kick(lock, false);
1310
1311 if (lock->can_rdlock(client))
1312 return true;
1313
1314 // wait!
1315 if (con) {
1316 dout(7) << "rdlock_try waiting on " << *lock << " on " << *lock->get_parent() << dendl;
1317 lock->add_waiter(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_RD, con);
1318 }
1319 return false;
1320 }
1321
bool Locker::rdlock_start(SimpleLock *lock, MDRequestRef& mut, bool as_anon)
{
  // Acquire a rdlock for request 'mut', kicking the lock toward a
  // readable state as needed.  Returns true on success (the ref is
  // recorded on the mutation); false if the request was parked on a
  // waiter and will be retried.
  dout(7) << "rdlock_start on " << *lock << " on " << *lock->get_parent() << dendl;

  // client may be allowed to rdlock the same item it has xlocked.
  // UNLESS someone passes in as_anon, or we're reading snapped version here.
  if (mut->snapid != CEPH_NOSNAP)
    as_anon = true;
  client_t client = as_anon ? -1 : mut->get_client();

  CInode *in = 0;
  if (lock->get_type() != CEPH_LOCK_DN)
    in = static_cast<CInode *>(lock->get_parent());

  /*
  if (!lock->get_parent()->is_auth() &&
      lock->fw_rdlock_to_auth()) {
    mdcache->request_forward(mut, lock->get_parent()->authority().first);
    return false;
  }
  */

  while (1) {
    // can read?  grab ref.
    if (lock->can_rdlock(client)) {
      lock->get_rdlock();
      mut->rdlocks.insert(lock);
      mut->locks.insert(lock);
      return true;
    }

    // hmm, wait a second.
    if (in && !in->is_head() && in->is_auth() &&
	lock->get_state() == LOCK_SNAP_SYNC) {
      // okay, we actually need to kick the head's lock to get ourselves synced up.
      CInode *head = mdcache->get_inode(in->ino());
      assert(head);
      SimpleLock *hlock = head->get_lock(CEPH_LOCK_IFILE);
      if (hlock->get_state() == LOCK_SYNC)
	hlock = head->get_lock(lock->get_type());

      if (hlock->get_state() != LOCK_SYNC) {
	dout(10) << "rdlock_start trying head inode " << *head << dendl;
	if (!rdlock_start(hlock, mut, true)) // ** as_anon, no rdlock on EXCL **
	  return false;
	// oh, check our lock again then
      }
    }

    // if the kick didn't start a local state change, stop retrying
    if (!_rdlock_kick(lock, as_anon))
      break;
  }

  // wait!
  int wait_on;
  if (lock->get_parent()->is_auth() && lock->is_stable())
    wait_on = SimpleLock::WAIT_RD;
  else
    wait_on = SimpleLock::WAIT_STABLE;  // REQRDLOCK is ignored if lock is unstable, so we need to retry.
  dout(7) << "rdlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl;
  lock->add_waiter(wait_on, new C_MDS_RetryRequest(mdcache, mut));
  nudge_log(lock);
  return false;
}
1386
1387 void Locker::nudge_log(SimpleLock *lock)
1388 {
1389 dout(10) << "nudge_log " << *lock << " on " << *lock->get_parent() << dendl;
1390 if (lock->get_parent()->is_auth() && lock->is_unstable_and_locked()) // as with xlockdone, or cap flush
1391 mds->mdlog->flush();
1392 }
1393
1394 void Locker::rdlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue)
1395 {
1396 // drop ref
1397 lock->put_rdlock();
1398 if (mut) {
1399 mut->rdlocks.erase(lock);
1400 mut->locks.erase(lock);
1401 }
1402
1403 dout(7) << "rdlock_finish on " << *lock << " on " << *lock->get_parent() << dendl;
1404
1405 // last one?
1406 if (!lock->is_rdlocked()) {
1407 if (!lock->is_stable())
1408 eval_gather(lock, false, pneed_issue);
1409 else if (lock->get_parent()->is_auth())
1410 try_eval(lock, pneed_issue);
1411 }
1412 }
1413
1414
1415 bool Locker::can_rdlock_set(set<SimpleLock*>& locks)
1416 {
1417 dout(10) << "can_rdlock_set " << locks << dendl;
1418 for (set<SimpleLock*>::iterator p = locks.begin(); p != locks.end(); ++p)
1419 if (!(*p)->can_rdlock(-1)) {
1420 dout(10) << "can_rdlock_set can't rdlock " << *p << " on " << *(*p)->get_parent() << dendl;
1421 return false;
1422 }
1423 return true;
1424 }
1425
1426 bool Locker::rdlock_try_set(set<SimpleLock*>& locks)
1427 {
1428 dout(10) << "rdlock_try_set " << locks << dendl;
1429 for (set<SimpleLock*>::iterator p = locks.begin(); p != locks.end(); ++p)
1430 if (!rdlock_try(*p, -1, NULL)) {
1431 dout(10) << "rdlock_try_set can't rdlock " << *p << " on " << *(*p)->get_parent() << dendl;
1432 return false;
1433 }
1434 return true;
1435 }
1436
1437 void Locker::rdlock_take_set(set<SimpleLock*>& locks, MutationRef& mut)
1438 {
1439 dout(10) << "rdlock_take_set " << locks << dendl;
1440 for (set<SimpleLock*>::iterator p = locks.begin(); p != locks.end(); ++p) {
1441 (*p)->get_rdlock();
1442 mut->rdlocks.insert(*p);
1443 mut->locks.insert(*p);
1444 }
1445 }
1446
1447 // ------------------
1448 // wrlock
1449
1450 void Locker::wrlock_force(SimpleLock *lock, MutationRef& mut)
1451 {
1452 if (lock->get_type() == CEPH_LOCK_IVERSION ||
1453 lock->get_type() == CEPH_LOCK_DVERSION)
1454 return local_wrlock_grab(static_cast<LocalLock*>(lock), mut);
1455
1456 dout(7) << "wrlock_force on " << *lock
1457 << " on " << *lock->get_parent() << dendl;
1458 lock->get_wrlock(true);
1459 mut->wrlocks.insert(lock);
1460 mut->locks.insert(lock);
1461 }
1462
bool Locker::wrlock_start(SimpleLock *lock, MDRequestRef& mut, bool nowait)
{
  // Acquire a wrlock for request 'mut'.  With nowait=true we never queue
  // a waiter and never start a nested lock state change that could open
  // a log event; we simply return false.
  if (lock->get_type() == CEPH_LOCK_IVERSION ||
      lock->get_type() == CEPH_LOCK_DVERSION)
    return local_wrlock_start(static_cast<LocalLock*>(lock), mut);

  dout(10) << "wrlock_start " << *lock << " on " << *lock->get_parent() << dendl;

  CInode *in = static_cast<CInode *>(lock->get_parent());
  client_t client = mut->get_client();
  // prefer MIX (scatter) when the inode has subtree-bound dirfrags or a
  // scatter was explicitly requested
  bool want_scatter = !nowait && lock->get_parent()->is_auth() &&
		      (in->has_subtree_or_exporting_dirfrag() ||
		       static_cast<ScatterLock*>(lock)->get_scatter_wanted());

  while (1) {
    // wrlock?
    if (lock->can_wrlock(client) &&
	(!want_scatter || lock->get_state() == LOCK_MIX)) {
      lock->get_wrlock();
      mut->wrlocks.insert(lock);
      mut->locks.insert(lock);
      return true;
    }

    if (lock->get_type() == CEPH_LOCK_IFILE &&
	in->state_test(CInode::STATE_RECOVERING)) {
      mds->mdcache->recovery_queue.prioritize(in);
    }

    if (!lock->is_stable())
      break;

    if (in->is_auth()) {
      // don't do nested lock state change if we have dirty scatterdata and
      // may scatter_writebehind or start_scatter, because nowait==true implies
      // that the caller already has a log entry open!
      if (nowait && lock->is_dirty())
	return false;

      if (want_scatter)
	scatter_mix(static_cast<ScatterLock*>(lock));
      else
	simple_lock(lock);

      if (nowait && !lock->can_wrlock(client))
	return false;

    } else {
      // replica.
      // auth should be auth_pinned (see acquire_locks wrlock weird mustpin case).
      mds_rank_t auth = lock->get_parent()->authority().first;
      if (!mds->is_cluster_degraded() ||
	  mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
	dout(10) << "requesting scatter from auth on "
		 << *lock << " on " << *lock->get_parent() << dendl;
	mds->send_message_mds(new MLock(lock, LOCK_AC_REQSCATTER, mds->get_nodeid()), auth);
      }
      break;
    }
  }

  if (!nowait) {
    dout(7) << "wrlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl;
    lock->add_waiter(SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
    nudge_log(lock);
  }

  return false;
}
1532
1533 void Locker::wrlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue)
1534 {
1535 if (lock->get_type() == CEPH_LOCK_IVERSION ||
1536 lock->get_type() == CEPH_LOCK_DVERSION)
1537 return local_wrlock_finish(static_cast<LocalLock*>(lock), mut);
1538
1539 dout(7) << "wrlock_finish on " << *lock << " on " << *lock->get_parent() << dendl;
1540 lock->put_wrlock();
1541 if (mut) {
1542 mut->wrlocks.erase(lock);
1543 if (mut->remote_wrlocks.count(lock) == 0)
1544 mut->locks.erase(lock);
1545 }
1546
1547 if (!lock->is_wrlocked()) {
1548 if (!lock->is_stable())
1549 eval_gather(lock, false, pneed_issue);
1550 else if (lock->get_parent()->is_auth())
1551 try_eval(lock, pneed_issue);
1552 }
1553 }
1554
1555
1556 // remote wrlock
1557
1558 void Locker::remote_wrlock_start(SimpleLock *lock, mds_rank_t target, MDRequestRef& mut)
1559 {
1560 dout(7) << "remote_wrlock_start mds." << target << " on " << *lock << " on " << *lock->get_parent() << dendl;
1561
1562 // wait for active target
1563 if (mds->is_cluster_degraded() &&
1564 !mds->mdsmap->is_clientreplay_or_active_or_stopping(target)) {
1565 dout(7) << " mds." << target << " is not active" << dendl;
1566 if (mut->more()->waiting_on_slave.empty())
1567 mds->wait_for_active_peer(target, new C_MDS_RetryRequest(mdcache, mut));
1568 return;
1569 }
1570
1571 // send lock request
1572 mut->start_locking(lock, target);
1573 mut->more()->slaves.insert(target);
1574 MMDSSlaveRequest *r = new MMDSSlaveRequest(mut->reqid, mut->attempt,
1575 MMDSSlaveRequest::OP_WRLOCK);
1576 r->set_lock_type(lock->get_type());
1577 lock->get_parent()->set_object_info(r->get_object_info());
1578 mds->send_message_mds(r, target);
1579
1580 assert(mut->more()->waiting_on_slave.count(target) == 0);
1581 mut->more()->waiting_on_slave.insert(target);
1582 }
1583
1584 void Locker::remote_wrlock_finish(SimpleLock *lock, mds_rank_t target,
1585 MutationImpl *mut)
1586 {
1587 // drop ref
1588 mut->remote_wrlocks.erase(lock);
1589 if (mut->wrlocks.count(lock) == 0)
1590 mut->locks.erase(lock);
1591
1592 dout(7) << "remote_wrlock_finish releasing remote wrlock on mds." << target
1593 << " " << *lock->get_parent() << dendl;
1594 if (!mds->is_cluster_degraded() ||
1595 mds->mdsmap->get_state(target) >= MDSMap::STATE_REJOIN) {
1596 MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt,
1597 MMDSSlaveRequest::OP_UNWRLOCK);
1598 slavereq->set_lock_type(lock->get_type());
1599 lock->get_parent()->set_object_info(slavereq->get_object_info());
1600 mds->send_message_mds(slavereq, target);
1601 }
1602 }
1603
1604
1605 // ------------------
1606 // xlock
1607
bool Locker::xlock_start(SimpleLock *lock, MDRequestRef& mut)
{
  // Acquire an exclusive lock for request 'mut'.  On the auth we drive
  // the lock toward XLOCK locally; on a replica we forward an OP_XLOCK
  // slave request to the auth.  Returns true once the xlock is held.
  if (lock->get_type() == CEPH_LOCK_IVERSION ||
      lock->get_type() == CEPH_LOCK_DVERSION)
    return local_xlock_start(static_cast<LocalLock*>(lock), mut);

  dout(7) << "xlock_start on " << *lock << " on " << *lock->get_parent() << dendl;
  client_t client = mut->get_client();

  // auth?
  if (lock->get_parent()->is_auth()) {
    // auth
    while (1) {
      if (lock->can_xlock(client)) {
	lock->set_state(LOCK_XLOCK);
	lock->get_xlock(mut, client);
	mut->xlocks.insert(lock);
	mut->locks.insert(lock);
	mut->finish_locking(lock);
	return true;
      }

      if (lock->get_type() == CEPH_LOCK_IFILE) {
	CInode *in = static_cast<CInode*>(lock->get_parent());
	if (in->state_test(CInode::STATE_RECOVERING)) {
	  mds->mdcache->recovery_queue.prioritize(in);
	}
      }

      // give up and wait, unless the lock is XLOCKDONE for this same
      // client with no stable-waiters (then we can keep driving it)
      if (!lock->is_stable() && (lock->get_state() != LOCK_XLOCKDONE ||
				 lock->get_xlock_by_client() != client ||
				 lock->is_waiter_for(SimpleLock::WAIT_STABLE)))
	break;

      if (lock->get_state() == LOCK_LOCK || lock->get_state() == LOCK_XLOCKDONE) {
	mut->start_locking(lock);
	simple_xlock(lock);
      } else {
	simple_lock(lock);
      }
    }

    lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
    nudge_log(lock);
    return false;
  } else {
    // replica
    assert(lock->get_sm()->can_remote_xlock);
    assert(!mut->slave_request);

    // wait for single auth
    if (lock->get_parent()->is_ambiguous_auth()) {
      lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH,
				     new C_MDS_RetryRequest(mdcache, mut));
      return false;
    }

    // wait for active auth
    mds_rank_t auth = lock->get_parent()->authority().first;
    if (mds->is_cluster_degraded() &&
	!mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
      dout(7) << " mds." << auth << " is not active" << dendl;
      if (mut->more()->waiting_on_slave.empty())
	mds->wait_for_active_peer(auth, new C_MDS_RetryRequest(mdcache, mut));
      return false;
    }

    // send lock request
    mut->more()->slaves.insert(auth);
    mut->start_locking(lock, auth);
    MMDSSlaveRequest *r = new MMDSSlaveRequest(mut->reqid, mut->attempt,
					       MMDSSlaveRequest::OP_XLOCK);
    r->set_lock_type(lock->get_type());
    lock->get_parent()->set_object_info(r->get_object_info());
    mds->send_message_mds(r, auth);

    assert(mut->more()->waiting_on_slave.count(auth) == 0);
    mut->more()->waiting_on_slave.insert(auth);

    return false;
  }
}
1690
void Locker::_finish_xlock(SimpleLock *lock, client_t xlocker, bool *pneed_issue)
{
  // Called once the last xlock ref is dropped on the auth.  If the lock
  // can jump straight to EXCL for the (target) loner client, do that;
  // otherwise fall back to a regular eval_gather.
  assert(!lock->is_stable());
  if (lock->get_num_rdlocks() == 0 &&
      lock->get_num_wrlocks() == 0 &&
      !lock->is_leased() &&
      lock->get_state() != LOCK_XLOCKSNAP &&
      lock->get_type() != CEPH_LOCK_DN) {
    CInode *in = static_cast<CInode*>(lock->get_parent());
    client_t loner = in->get_target_loner();
    if (loner >= 0 && (xlocker < 0 || xlocker == loner)) {
      lock->set_state(LOCK_EXCL);
      lock->get_parent()->auth_unpin(lock);
      lock->finish_waiters(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD);
      if (lock->get_cap_shift())
	*pneed_issue = true;  // lock carries caps; they may now be issuable
      if (lock->get_parent()->is_auth() &&
	  lock->is_stable())
	try_eval(lock, pneed_issue);
      return;
    }
  }
  // the xlocker may have CEPH_CAP_GSHARED, need to revoke it if next state is LOCK_LOCK
  eval_gather(lock, lock->get_state() != LOCK_XLOCKSNAP, pneed_issue);
}
1716
void Locker::xlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue)
{
  // Drop one xlock ref held by 'mut'.  On a replica, notify the auth
  // that our remote xlock is released; on the auth, continue the lock
  // state machine once the last xlock is gone.
  if (lock->get_type() == CEPH_LOCK_IVERSION ||
      lock->get_type() == CEPH_LOCK_DVERSION)
    return local_xlock_finish(static_cast<LocalLock*>(lock), mut);

  dout(10) << "xlock_finish on " << *lock << " " << *lock->get_parent() << dendl;

  client_t xlocker = lock->get_xlock_by_client();

  // drop ref
  lock->put_xlock();
  assert(mut);
  mut->xlocks.erase(lock);
  mut->locks.erase(lock);

  bool do_issue = false;

  // remote xlock?
  if (!lock->get_parent()->is_auth()) {
    assert(lock->get_sm()->can_remote_xlock);

    // tell auth
    dout(7) << "xlock_finish releasing remote xlock on " << *lock->get_parent() << dendl;
    mds_rank_t auth = lock->get_parent()->authority().first;
    if (!mds->is_cluster_degraded() ||
	mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) {
      MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt,
							MMDSSlaveRequest::OP_UNXLOCK);
      slavereq->set_lock_type(lock->get_type());
      lock->get_parent()->set_object_info(slavereq->get_object_info());
      mds->send_message_mds(slavereq, auth);
    }
    // others waiting?
    lock->finish_waiters(SimpleLock::WAIT_STABLE |
			 SimpleLock::WAIT_WR |
			 SimpleLock::WAIT_RD, 0);
  } else {
    if (lock->get_num_xlocks() == 0) {
      if (lock->get_state() == LOCK_LOCK_XLOCK)
	lock->set_state(LOCK_XLOCKDONE);
      _finish_xlock(lock, xlocker, &do_issue);
    }
  }

  // issue now, or let the caller batch it via *pneed_issue
  if (do_issue) {
    CInode *in = static_cast<CInode*>(lock->get_parent());
    if (in->is_head()) {
      if (pneed_issue)
	*pneed_issue = true;
      else
	issue_caps(in);
    }
  }
}
1772
1773 void Locker::xlock_export(SimpleLock *lock, MutationImpl *mut)
1774 {
1775 dout(10) << "xlock_export on " << *lock << " " << *lock->get_parent() << dendl;
1776
1777 lock->put_xlock();
1778 mut->xlocks.erase(lock);
1779 mut->locks.erase(lock);
1780
1781 MDSCacheObject *p = lock->get_parent();
1782 assert(p->state_test(CInode::STATE_AMBIGUOUSAUTH)); // we are exporting this (inode)
1783
1784 if (!lock->is_stable())
1785 lock->get_parent()->auth_unpin(lock);
1786
1787 lock->set_state(LOCK_LOCK);
1788 }
1789
1790 void Locker::xlock_import(SimpleLock *lock)
1791 {
1792 dout(10) << "xlock_import on " << *lock << " " << *lock->get_parent() << dendl;
1793 lock->get_parent()->auth_pin(lock);
1794 }
1795
1796
1797
1798 // file i/o -----------------------------------------
1799
1800 version_t Locker::issue_file_data_version(CInode *in)
1801 {
1802 dout(7) << "issue_file_data_version on " << *in << dendl;
1803 return in->inode.file_data_version;
1804 }
1805
// Journal-commit callback for a file metadata update: once the update
// is durable it runs Locker::file_update_finish(), which applies the
// mutation and (optionally) sends the cap ack back to the client.
// Pins the inode for the lifetime of the context.
class C_Locker_FileUpdate_finish : public LockerLogContext {
  CInode *in;
  MutationRef mut;
  bool share_max;    // also consider sharing a new max_size (see file_update_finish)
  bool need_issue;   // re-issue caps to 'client' afterwards
  client_t client;
  MClientCaps *ack;  // cap ack to send to the client, if any
public:
  C_Locker_FileUpdate_finish(Locker *l, CInode *i, MutationRef& m,
			     bool sm=false, bool ni=false, client_t c=-1,
			     MClientCaps *ac = 0)
    : LockerLogContext(l), in(i), mut(m), share_max(sm), need_issue(ni),
      client(c), ack(ac) {
    in->get(CInode::PIN_PTRWAITER);  // keep the inode alive until finish()
  }
  void finish(int r) override {
    locker->file_update_finish(in, mut, share_max, need_issue, client, ack);
    in->put(CInode::PIN_PTRWAITER);
  }
};
1826
void Locker::file_update_finish(CInode *in, MutationRef& mut, bool share_max, bool issue_client_cap,
				client_t client, MClientCaps *ack)
{
  // Completion of a journaled file metadata update: make the projected
  // inode current, apply the mutation, ack the client's cap flush (if
  // any), then drop locks and re-issue caps as needed.
  dout(10) << "file_update_finish on " << *in << dendl;
  in->pop_and_dirty_projected_inode(mut->ls);

  mut->apply();

  if (ack) {
    Session *session = mds->get_session(client);
    if (session) {
      // "oldest flush tid" > 0 means client uses unique TID for each flush
      if (ack->get_oldest_flush_tid() > 0)
	session->add_completed_flush(ack->get_client_tid());
      mds->send_message_client_counted(ack, session);
    } else {
      // session is gone; just drop the ack message
      dout(10) << " no session for client." << client << " " << *ack << dendl;
      ack->put();
    }
  }

  set<CInode*> need_issue;
  drop_locks(mut.get(), &need_issue);

  if (!in->is_head() && !in->client_snap_caps.empty()) {
    dout(10) << " client_snap_caps " << in->client_snap_caps << dendl;
    // check for snap writeback completion
    bool gather = false;
    compact_map<int,set<client_t> >::iterator p = in->client_snap_caps.begin();
    while (p != in->client_snap_caps.end()) {
      SimpleLock *lock = in->get_lock(p->first);
      assert(lock);
      dout(10) << " completing client_snap_caps for " << ccap_string(p->first)
	       << " lock " << *lock << " on " << *in << dendl;
      lock->put_wrlock();

      p->second.erase(client);
      if (p->second.empty()) {
	gather = true;
	in->client_snap_caps.erase(p++);  // post-increment: erase without invalidating p
      } else
	++p;
    }
    if (gather) {
      if (in->client_snap_caps.empty())
	in->item_open_file.remove_myself();
      eval_cap_gather(in, &need_issue);
    }
  } else {
    // re-issue only if drop_locks didn't already queue an issue for 'in'
    if (issue_client_cap && need_issue.count(in) == 0) {
      Capability *cap = in->get_client_cap(client);
      if (cap && (cap->wanted() & ~cap->pending()))
	issue_caps(in, cap);
    }

    if (share_max && in->is_auth() &&
	(in->filelock.gcaps_allowed(CAP_LONER) & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)))
      share_inode_max_size(in);
  }
  issue_caps_set(need_issue);

  // auth unpin after issuing caps
  mut->cleanup();
}
1891
Capability* Locker::issue_new_caps(CInode *in,
				   int mode,
				   Session *session,
				   SnapRealm *realm,
				   bool is_replay)
{
  // Register (or augment) a client capability on 'in' for an open with
  // the given file mode, then evaluate locks so the caps can actually be
  // issued.  Returns the Capability, or 0 during replay.
  dout(7) << "issue_new_caps for mode " << mode << " on " << *in << dendl;
  bool is_new;

  // if replay, try to reconnect cap, and otherwise do nothing.
  if (is_replay) {
    mds->mdcache->try_reconnect_cap(in, session);
    return 0;
  }

  // my needs
  assert(session->info.inst.name.is_client());
  client_t my_client = session->info.inst.name.num();
  int my_want = ceph_caps_for_mode(mode);

  // register a capability
  Capability *cap = in->get_client_cap(my_client);
  if (!cap) {
    // new cap
    cap = in->add_client_cap(my_client, session, realm);
    cap->set_wanted(my_want);
    cap->mark_new();
    cap->inc_suppress(); // suppress file cap messages for new cap (we'll bundle with the open() reply)
    is_new = true;
  } else {
    is_new = false;
    // make sure it wants sufficient caps
    if (my_want & ~cap->wanted()) {
      // augment wanted caps for this client
      cap->set_wanted(cap->wanted() | my_want);
    }
  }

  if (in->is_auth()) {
    // [auth] twiddle mode?
    eval(in, CEPH_CAP_LOCKS);

    if (_need_flush_mdlog(in, my_want))
      mds->mdlog->flush();

  } else {
    // [replica] tell auth about any new caps wanted
    request_inode_file_caps(in);
  }

  // issue caps (pot. incl new one)
  //issue_caps(in);  // note: _eval above may have done this already...

  // re-issue whatever we can
  //cap->issue(cap->pending());

  if (is_new)
    cap->dec_suppress();

  return cap;
}
1953
1954
1955 void Locker::issue_caps_set(set<CInode*>& inset)
1956 {
1957 for (set<CInode*>::iterator p = inset.begin(); p != inset.end(); ++p)
1958 issue_caps(*p);
1959 }
1960
/*
 * Issue (or revoke) client capabilities on @in according to what the
 * inode's current lock states allow.
 *
 * If @only_cap is non-NULL, only that client's cap is (re)considered;
 * otherwise every cap on the inode is.
 *
 * Returns true if nothing needed to be (re)issued, false otherwise.
 */
bool Locker::issue_caps(CInode *in, Capability *only_cap)
{
  // allowed caps are determined by the lock mode.
  int all_allowed = in->get_caps_allowed_by_type(CAP_ANY);
  int loner_allowed = in->get_caps_allowed_by_type(CAP_LONER);
  int xlocker_allowed = in->get_caps_allowed_by_type(CAP_XLOCKER);

  client_t loner = in->get_loner();
  if (loner >= 0) {
    dout(7) << "issue_caps loner client." << loner
            << " allowed=" << ccap_string(loner_allowed)
            << ", xlocker allowed=" << ccap_string(xlocker_allowed)
            << ", others allowed=" << ccap_string(all_allowed)
            << " on " << *in << dendl;
  } else {
    dout(7) << "issue_caps allowed=" << ccap_string(all_allowed)
            << ", xlocker allowed=" << ccap_string(xlocker_allowed)
            << " on " << *in << dendl;
  }

  assert(in->is_head());

  // count conflicts with
  int nissued = 0;

  // client caps: iterate either the single requested cap or all of them
  map<client_t, Capability*>::iterator it;
  if (only_cap)
    it = in->client_caps.find(only_cap->get_client());
  else
    it = in->client_caps.begin();
  for (; it != in->client_caps.end(); ++it) {
    Capability *cap = it->second;
    if (cap->is_stale())
      continue;

    // do not issue _new_ bits when size|mtime is projected
    int allowed;
    if (loner == it->first)
      allowed = loner_allowed;
    else
      allowed = all_allowed;

    // add in any xlocker-only caps (for locks this client is the xlocker for)
    allowed |= xlocker_allowed & in->get_xlocker_mask(it->first);

    Session *session = mds->get_session(it->first);
    // clients lacking inline-data support must not read/write inlined files
    if (in->inode.inline_data.version != CEPH_INLINE_NONE &&
        !(session && session->connection &&
          session->connection->has_feature(CEPH_FEATURE_MDS_INLINE_DATA)))
      allowed &= ~(CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR);

    int pending = cap->pending();
    int wanted = cap->wanted();

    dout(20) << " client." << it->first
             << " pending " << ccap_string(pending)
             << " allowed " << ccap_string(allowed)
             << " wanted " << ccap_string(wanted)
             << dendl;

    if (!(pending & ~allowed)) {
      // skip if suppress or new, and not revocation
      if (cap->is_new() || cap->is_suppress()) {
        dout(20) << " !revoke and new|suppressed, skipping client." << it->first << dendl;
        continue;
      }
    }

    // notify clients about deleted inode, to make sure they release caps ASAP.
    if (in->inode.nlink == 0)
      wanted |= CEPH_CAP_LINK_SHARED;

    // are there caps that the client _wants_ and can have, but aren't pending?
    // or do we need to revoke?
    if (((wanted & allowed) & ~pending) ||  // missing wanted+allowed caps
        (pending & ~allowed)) {             // need to revoke ~allowed caps.
      // issue
      nissued++;

      // include caps that clients generally like, while we're at it.
      int likes = in->get_caps_liked();
      int before = pending;
      long seq;
      if (pending & ~allowed)
        seq = cap->issue((wanted|likes) & allowed & pending);  // if revoking, don't issue anything new.
      else
        seq = cap->issue((wanted|likes) & allowed);
      int after = cap->pending();

      if (cap->is_new()) {
        // haven't send caps to client yet
        if (before & ~after)
          cap->confirm_receipt(seq, after);
      } else {
        dout(7) << " sending MClientCaps to client." << it->first
                << " seq " << cap->get_last_seq()
                << " new pending " << ccap_string(after) << " was " << ccap_string(before)
                << dendl;

        int op = (before & ~after) ? CEPH_CAP_OP_REVOKE : CEPH_CAP_OP_GRANT;
        if (op == CEPH_CAP_OP_REVOKE) {
          // track outstanding revocations so slow clients can be warned about
          revoking_caps.push_back(&cap->item_revoking_caps);
          revoking_caps_by_client[cap->get_client()].push_back(&cap->item_client_revoking_caps);
          cap->set_last_revoke_stamp(ceph_clock_now());
          cap->reset_num_revoke_warnings();
        }

        MClientCaps *m = new MClientCaps(op, in->ino(),
                                         in->find_snaprealm()->inode->ino(),
                                         cap->get_cap_id(), cap->get_last_seq(),
                                         after, wanted, 0,
                                         cap->get_mseq(),
                                         mds->get_osd_epoch_barrier());
        in->encode_cap_message(m, cap);

        mds->send_message_client_counted(m, it->first);
      }
    }

    if (only_cap)
      break;
  }

  return (nissued == 0);  // true if no re-issued, no callbacks
}
2087
2088 void Locker::issue_truncate(CInode *in)
2089 {
2090 dout(7) << "issue_truncate on " << *in << dendl;
2091
2092 for (map<client_t, Capability*>::iterator it = in->client_caps.begin();
2093 it != in->client_caps.end();
2094 ++it) {
2095 Capability *cap = it->second;
2096 MClientCaps *m = new MClientCaps(CEPH_CAP_OP_TRUNC,
2097 in->ino(),
2098 in->find_snaprealm()->inode->ino(),
2099 cap->get_cap_id(), cap->get_last_seq(),
2100 cap->pending(), cap->wanted(), 0,
2101 cap->get_mseq(),
2102 mds->get_osd_epoch_barrier());
2103 in->encode_cap_message(m, cap);
2104 mds->send_message_client_counted(m, it->first);
2105 }
2106
2107 // should we increase max_size?
2108 if (in->is_auth() && in->is_file())
2109 check_inode_max_size(in);
2110 }
2111
2112
/*
 * Revoke everything (beyond PIN) issued on a cap that has gone stale
 * (its session timed out).
 */
void Locker::revoke_stale_caps(Capability *cap)
{
  CInode *in = cap->get_inode();
  if (in->state_test(CInode::STATE_EXPORTINGCAPS)) {
    // if export succeeds, the cap will be removed. if export fails, we need to
    // revoke the cap if it's still stale.
    in->state_set(CInode::STATE_EVALSTALECAPS);
    return;
  }

  int issued = cap->issued();
  if (issued & ~CEPH_CAP_PIN) {
    dout(10) << " revoking " << ccap_string(issued) << " on " << *in << dendl;
    cap->revoke();

    // client had a writeable range; file content may need recovery
    // (presumably size/mtime probing — see recovery_queue usage elsewhere)
    if (in->is_auth() &&
        in->inode.client_ranges.count(cap->get_client()))
      in->state_set(CInode::STATE_NEEDSRECOVER);

    // the revocation may allow gathering lock states to make progress
    if (!in->filelock.is_stable()) eval_gather(&in->filelock);
    if (!in->linklock.is_stable()) eval_gather(&in->linklock);
    if (!in->authlock.is_stable()) eval_gather(&in->authlock);
    if (!in->xattrlock.is_stable()) eval_gather(&in->xattrlock);

    if (in->is_auth()) {
      try_eval(in, CEPH_CAP_LOCKS);
    } else {
      // replica: let the auth know our wanted caps changed
      request_inode_file_caps(in);
    }
  }
}
2144
2145 void Locker::revoke_stale_caps(Session *session)
2146 {
2147 dout(10) << "revoke_stale_caps for " << session->info.inst.name << dendl;
2148
2149 for (xlist<Capability*>::iterator p = session->caps.begin(); !p.end(); ++p) {
2150 Capability *cap = *p;
2151 cap->mark_stale();
2152 revoke_stale_caps(cap);
2153 }
2154 }
2155
2156 void Locker::resume_stale_caps(Session *session)
2157 {
2158 dout(10) << "resume_stale_caps for " << session->info.inst.name << dendl;
2159
2160 for (xlist<Capability*>::iterator p = session->caps.begin(); !p.end(); ++p) {
2161 Capability *cap = *p;
2162 CInode *in = cap->get_inode();
2163 assert(in->is_head());
2164 if (cap->is_stale()) {
2165 dout(10) << " clearing stale flag on " << *in << dendl;
2166 cap->clear_stale();
2167
2168 if (in->state_test(CInode::STATE_EXPORTINGCAPS)) {
2169 // if export succeeds, the cap will be removed. if export fails,
2170 // we need to re-issue the cap if it's not stale.
2171 in->state_set(CInode::STATE_EVALSTALECAPS);
2172 continue;
2173 }
2174
2175 if (!in->is_auth() || !eval(in, CEPH_CAP_LOCKS))
2176 issue_caps(in, cap);
2177 }
2178 }
2179 }
2180
2181 void Locker::remove_stale_leases(Session *session)
2182 {
2183 dout(10) << "remove_stale_leases for " << session->info.inst.name << dendl;
2184 xlist<ClientLease*>::iterator p = session->leases.begin();
2185 while (!p.end()) {
2186 ClientLease *l = *p;
2187 ++p;
2188 CDentry *parent = static_cast<CDentry*>(l->parent);
2189 dout(15) << " removing lease on " << *parent << dendl;
2190 parent->remove_client_lease(l, this);
2191 }
2192 }
2193
2194
// Waiter context that retries request_inode_file_caps() once the inode
// becomes usable again (e.g. after single-auth resolution or a peer
// becoming active).  Pins the inode for the lifetime of the waiter.
class C_MDL_RequestInodeFileCaps : public LockerContext {
  CInode *in;
public:
  C_MDL_RequestInodeFileCaps(Locker *l, CInode *i) : LockerContext(l), in(i) {
    in->get(CInode::PIN_PTRWAITER);
  }
  void finish(int r) override {
    // only replicas request caps from the auth; skip if we became auth
    if (!in->is_auth())
      locker->request_inode_file_caps(in);
    in->put(CInode::PIN_PTRWAITER);
  }
};
2207
/*
 * (Replica only.)  Tell the auth MDS which caps our clients want on @in,
 * if that differs from what we last reported.
 */
void Locker::request_inode_file_caps(CInode *in)
{
  assert(!in->is_auth());

  int wanted = in->get_caps_wanted() & ~CEPH_CAP_PIN;
  if (wanted != in->replica_caps_wanted) {
    // wait for single auth
    if (in->is_ambiguous_auth()) {
      in->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH,
                     new C_MDL_RequestInodeFileCaps(this, in));
      return;
    }

    mds_rank_t auth = in->authority().first;
    // auth is still rejoining: retry once it is an active peer
    if (mds->is_cluster_degraded() &&
        mds->mdsmap->get_state(auth) == MDSMap::STATE_REJOIN) {
      mds->wait_for_active_peer(auth, new C_MDL_RequestInodeFileCaps(this, in));
      return;
    }

    dout(7) << "request_inode_file_caps " << ccap_string(wanted)
            << " was " << ccap_string(in->replica_caps_wanted)
            << " on " << *in << " to mds." << auth << dendl;

    // record what we are about to report, even if the send is skipped below
    in->replica_caps_wanted = wanted;

    if (!mds->is_cluster_degraded() ||
        mds->mdsmap->is_clientreplay_or_active_or_stopping(auth))
      mds->send_message_mds(new MInodeFileCaps(in->ino(), in->replica_caps_wanted),
                            auth);
  }
}
2240
/* This function DOES put the passed message before returning */
void Locker::handle_inode_file_caps(MInodeFileCaps *m)
{
  // (Auth only.)  A replica MDS reports the caps its clients want on an inode.
  // nobody should be talking to us during recovery.
  assert(mds->is_clientreplay() || mds->is_active() || mds->is_stopping());

  // ok
  CInode *in = mdcache->get_inode(m->get_ino());
  mds_rank_t from = mds_rank_t(m->get_source().num());

  assert(in);
  assert(in->is_auth());

  dout(7) << "handle_inode_file_caps replica mds." << from << " wants caps " << ccap_string(m->get_caps()) << " on " << *in << dendl;

  // record (or clear) the replica's wanted mask
  if (m->get_caps())
    in->mds_caps_wanted[from] = m->get_caps();
  else
    in->mds_caps_wanted.erase(from);

  // the changed wants may enable lock state transitions
  try_eval(in, CEPH_CAP_LOCKS);
  m->put();
}
2264
2265
// Waiter context that retries check_inode_max_size() with the same
// arguments once the inode is unfrozen or its filelock stabilizes.
// Pins the inode while waiting.
class C_MDL_CheckMaxSize : public LockerContext {
  CInode *in;
  uint64_t new_max_size;
  uint64_t newsize;
  utime_t mtime;

public:
  C_MDL_CheckMaxSize(Locker *l, CInode *i, uint64_t _new_max_size,
                     uint64_t _newsize, utime_t _mtime) :
    LockerContext(l), in(i),
    new_max_size(_new_max_size), newsize(_newsize), mtime(_mtime)
  {
    in->get(CInode::PIN_PTRWAITER);
  }
  void finish(int r) override {
    // only the auth MDS updates max_size; skip if authority moved away
    if (in->is_auth())
      locker->check_inode_max_size(in, false, new_max_size, newsize, mtime);
    in->put(CInode::PIN_PTRWAITER);
  }
};
2286
2287 uint64_t Locker::calc_new_max_size(inode_t *pi, uint64_t size)
2288 {
2289 uint64_t new_max = (size + 1) << 1;
2290 uint64_t max_inc = g_conf->mds_client_writeable_range_max_inc_objs;
2291 if (max_inc > 0) {
2292 max_inc *= pi->layout.object_size;
2293 new_max = MIN(new_max, size + max_inc);
2294 }
2295 return ROUND_UP_TO(new_max, pi->get_layout_size_increment());
2296 }
2297
/*
 * Recompute the per-client writeable ranges for @in given @size.
 *
 * A range is kept (and grown as needed) for every client that has or
 * wants FILE_WR|FILE_BUFFER; clients without those caps get no entry,
 * which effectively shrinks their range to nothing.
 *
 * @param new_ranges     out: the recomputed ranges
 * @param max_increased  out: set to true when any client's max grows;
 *                       never cleared here — caller must initialize it
 */
void Locker::calc_new_client_ranges(CInode *in, uint64_t size,
                                    map<client_t,client_writeable_range_t> *new_ranges,
                                    bool *max_increased)
{
  inode_t *latest = in->get_projected_inode();
  uint64_t ms;
  if(latest->has_layout()) {
    ms = calc_new_max_size(latest, size);
  } else {
    // Layout-less directories like ~mds0/, have zero size
    ms = 0;
  }

  // increase ranges as appropriate.
  // shrink to 0 if no WR|BUFFER caps issued.
  for (map<client_t,Capability*>::iterator p = in->client_caps.begin();
       p != in->client_caps.end();
       ++p) {
    if ((p->second->issued() | p->second->wanted()) & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER)) {
      client_writeable_range_t& nr = (*new_ranges)[p->first];
      nr.range.first = 0;
      if (latest->client_ranges.count(p->first)) {
        // existing writer: only ever grow the max, keep its follows
        client_writeable_range_t& oldr = latest->client_ranges[p->first];
        if (ms > oldr.range.last)
          *max_increased = true;
        nr.range.last = MAX(ms, oldr.range.last);
        nr.follows = oldr.follows;
      } else {
        // new writer: fresh range following the inode's first snap
        *max_increased = true;
        nr.range.last = ms;
        nr.follows = in->first - 1;
      }
    }
  }
}
2333
/*
 * Recompute @in's max_size / client writeable ranges and optionally apply
 * a new size and mtime, journaling any change (EOpen or EUpdate).
 *
 * @param force_wrlock  proceed even if the filelock cannot currently be
 *                      wrlocked (caller guarantees it is safe)
 * @param new_max_size  floor for the recomputed max (0 = just recompute)
 * @param new_size      new file size to apply (0 = leave size alone)
 * @param new_mtime     mtime to apply along with new_size
 * @return true if an update was journaled; false if it was a no-op or
 *         had to be deferred (a waiter is registered in that case)
 */
bool Locker::check_inode_max_size(CInode *in, bool force_wrlock,
                                  uint64_t new_max_size, uint64_t new_size,
                                  utime_t new_mtime)
{
  assert(in->is_auth());
  assert(in->is_file());

  inode_t *latest = in->get_projected_inode();
  map<client_t, client_writeable_range_t> new_ranges;
  uint64_t size = latest->size;
  bool update_size = new_size > 0;
  bool update_max = false;
  bool max_increased = false;

  if (update_size) {
    // size and mtime only move forward, never backward
    new_size = size = MAX(size, new_size);
    new_mtime = MAX(new_mtime, latest->mtime);
    if (latest->size == new_size && latest->mtime == new_mtime)
      update_size = false;
  }

  calc_new_client_ranges(in, max(new_max_size, size), &new_ranges, &max_increased);

  if (max_increased || latest->client_ranges != new_ranges)
    update_max = true;

  if (!update_size && !update_max) {
    dout(20) << "check_inode_max_size no-op on " << *in << dendl;
    return false;
  }

  dout(10) << "check_inode_max_size new_ranges " << new_ranges
           << " update_size " << update_size
           << " on " << *in << dendl;

  if (in->is_frozen()) {
    // retry with the same arguments once unfrozen
    dout(10) << "check_inode_max_size frozen, waiting on " << *in << dendl;
    C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in,
                                                     new_max_size,
                                                     new_size,
                                                     new_mtime);
    in->add_waiter(CInode::WAIT_UNFREEZE, cms);
    return false;
  }
  if (!force_wrlock && !in->filelock.can_wrlock(in->get_loner())) {
    // lock?
    if (in->filelock.is_stable()) {
      // nudge the filelock toward a wrlockable state
      if (in->get_target_loner() >= 0)
        file_excl(&in->filelock);
      else
        simple_lock(&in->filelock);
    }
    if (!in->filelock.can_wrlock(in->get_loner())) {
      // try again later
      C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in,
                                                       new_max_size,
                                                       new_size,
                                                       new_mtime);

      in->filelock.add_waiter(SimpleLock::WAIT_STABLE, cms);
      dout(10) << "check_inode_max_size can't wrlock, waiting on " << *in << dendl;
      return false;
    }
  }

  MutationRef mut(new MutationImpl());
  mut->ls = mds->mdlog->get_current_segment();

  inode_t *pi = in->project_inode();
  pi->version = in->pre_dirty();

  if (update_max) {
    dout(10) << "check_inode_max_size client_ranges " << pi->client_ranges << " -> " << new_ranges << dendl;
    pi->client_ranges = new_ranges;
  }

  if (update_size) {
    dout(10) << "check_inode_max_size size " << pi->size << " -> " << new_size << dendl;
    pi->size = new_size;
    pi->rstat.rbytes = new_size;
    dout(10) << "check_inode_max_size mtime " << pi->mtime << " -> " << new_mtime << dendl;
    pi->mtime = new_mtime;
  }

  // use EOpen if the file is still open; otherwise, use EUpdate.
  // this is just an optimization to push open files forward into
  // newer log segments.
  LogEvent *le;
  EMetaBlob *metablob;
  if (in->is_any_caps_wanted() && in->last == CEPH_NOSNAP) {
    EOpen *eo = new EOpen(mds->mdlog);
    eo->add_ino(in->ino());
    metablob = &eo->metablob;
    le = eo;
    mut->ls->open_files.push_back(&in->item_open_file);
  } else {
    EUpdate *eu = new EUpdate(mds->mdlog, "check_inode_max_size");
    metablob = &eu->metablob;
    le = eu;
  }
  mds->mdlog->start_entry(le);
  if (update_size) {  // FIXME if/when we do max_size nested accounting
    mdcache->predirty_journal_parents(mut, metablob, in, 0, PREDIRTY_PRIMARY);
    // no cow, here!
    CDentry *parent = in->get_projected_parent_dn();
    metablob->add_primary_dentry(parent, in, true);
  } else {
    metablob->add_dir_context(in->get_projected_parent_dn()->get_dir());
    mdcache->journal_dirty_inode(mut.get(), metablob, in);
  }
  mds->mdlog->submit_entry(le,
          new C_Locker_FileUpdate_finish(this, in, mut, true));
  wrlock_force(&in->filelock, mut);  // wrlock for duration of journal
  mut->auth_pin(in);

  // make max_size _increase_ timely
  if (max_increased)
    mds->mdlog->flush();

  return true;
}
2455
2456
// Send the current max_size to clients that are actively writing
// (either all caps on @in, or just @only_cap if non-NULL).
void Locker::share_inode_max_size(CInode *in, Capability *only_cap)
{
  /*
   * only share if currently issued a WR cap.  if client doesn't have it,
   * file_max doesn't matter, and the client will get it if/when they get
   * the cap later.
   */
  dout(10) << "share_inode_max_size on " << *in << dendl;
  map<client_t, Capability*>::iterator it;
  if (only_cap)
    it = in->client_caps.find(only_cap->get_client());
  else
    it = in->client_caps.begin();
  for (; it != in->client_caps.end(); ++it) {
    const client_t client = it->first;
    Capability *cap = it->second;
    if (cap->is_suppress())
      continue;
    if (cap->pending() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER)) {
      dout(10) << "share_inode_max_size with client." << client << dendl;
      // bump the seq so the client treats this GRANT as fresh
      cap->inc_last_seq();
      MClientCaps *m = new MClientCaps(CEPH_CAP_OP_GRANT,
                                       in->ino(),
                                       in->find_snaprealm()->inode->ino(),
                                       cap->get_cap_id(), cap->get_last_seq(),
                                       cap->pending(), cap->wanted(), 0,
                                       cap->get_mseq(),
                                       mds->get_osd_epoch_barrier());
      in->encode_cap_message(m, cap);
      mds->send_message_client_counted(m, client);
    }
    if (only_cap)
      break;
  }
}
2492
2493 bool Locker::_need_flush_mdlog(CInode *in, int wanted)
2494 {
2495 /* flush log if caps are wanted by client but corresponding lock is unstable and locked by
2496 * pending mutations. */
2497 if (((wanted & (CEPH_CAP_FILE_RD|CEPH_CAP_FILE_WR|CEPH_CAP_FILE_SHARED|CEPH_CAP_FILE_EXCL)) &&
2498 in->filelock.is_unstable_and_locked()) ||
2499 ((wanted & (CEPH_CAP_AUTH_SHARED|CEPH_CAP_AUTH_EXCL)) &&
2500 in->authlock.is_unstable_and_locked()) ||
2501 ((wanted & (CEPH_CAP_LINK_SHARED|CEPH_CAP_LINK_EXCL)) &&
2502 in->linklock.is_unstable_and_locked()) ||
2503 ((wanted & (CEPH_CAP_XATTR_SHARED|CEPH_CAP_XATTR_EXCL)) &&
2504 in->xattrlock.is_unstable_and_locked()))
2505 return true;
2506 return false;
2507 }
2508
/*
 * Update a cap's "wanted" mask from a client message.
 *
 * The new mask replaces the old one only when the client has seen our
 * latest issue (issue_seq matches last_issue); on a mismatch we only ever
 * add bits, never drop them, so wants can't be lost to a racing issue.
 */
void Locker::adjust_cap_wanted(Capability *cap, int wanted, int issue_seq)
{
  if (ceph_seq_cmp(issue_seq, cap->get_last_issue()) == 0) {
    dout(10) << " wanted " << ccap_string(cap->wanted())
             << " -> " << ccap_string(wanted) << dendl;
    cap->set_wanted(wanted);
  } else if (wanted & ~cap->wanted()) {
    dout(10) << " wanted " << ccap_string(cap->wanted())
             << " -> " << ccap_string(wanted)
             << " (added caps even though we had seq mismatch!)" << dendl;
    cap->set_wanted(wanted | cap->wanted());
  } else {
    dout(10) << " NOT changing wanted " << ccap_string(cap->wanted())
             << " -> " << ccap_string(wanted)
             << " (issue_seq " << issue_seq << " != last_issue "
             << cap->get_last_issue() << ")" << dendl;
    return;
  }

  CInode *cur = cap->get_inode();
  if (!cur->is_auth()) {
    // replicas forward the (changed) wants to the auth MDS
    request_inode_file_caps(cur);
    return;
  }

  if (cap->wanted() == 0) {
    // nothing wanted anymore: drop from the open-file list if possible
    if (cur->item_open_file.is_on_list() &&
        !cur->is_any_caps_wanted()) {
      dout(10) << " removing unwanted file from open file list " << *cur << dendl;
      cur->item_open_file.remove_myself();
    }
  } else {
    if (cur->state_test(CInode::STATE_RECOVERING) &&
        (cap->wanted() & (CEPH_CAP_FILE_RD |
                          CEPH_CAP_FILE_WR))) {
      // client wants file data on an inode still being recovered; bump it
      mds->mdcache->recovery_queue.prioritize(cur);
    }

    if (!cur->item_open_file.is_on_list()) {
      // journal an EOpen so this inode is tracked as open across restarts
      dout(10) << " adding to open file list " << *cur << dendl;
      assert(cur->last == CEPH_NOSNAP);
      LogSegment *ls = mds->mdlog->get_current_segment();
      EOpen *le = new EOpen(mds->mdlog);
      mds->mdlog->start_entry(le);
      le->add_clean_inode(cur);
      ls->open_files.push_back(&cur->item_open_file);
      mds->mdlog->submit_entry(le);
    }
  }
}
2559
2560
2561
/*
 * Perform an empty ("NULL") snapflush on behalf of @client for each snap
 * before @last that still awaits a flush from that client.  Used when we
 * can infer the client will never send the FLUSHSNAP itself.
 */
void Locker::_do_null_snapflush(CInode *head_in, client_t client, snapid_t last)
{
  dout(10) << "_do_null_snapflush client." << client << " on " << *head_in << dendl;
  for (auto p = head_in->client_need_snapflush.begin();
       p != head_in->client_need_snapflush.end() && p->first < last; ) {
    snapid_t snapid = p->first;
    set<client_t>& clients = p->second;
    ++p;  // be careful, q loop below depends on this

    if (clients.count(client)) {
      dout(10) << " doing async NULL snapflush on " << snapid << " from client." << client << dendl;
      // locate the snapped inode this flush applies to
      CInode *sin = mdcache->pick_inode_snap(head_in, snapid - 1);
      assert(sin);
      assert(sin->first <= snapid);
      // apply an update with no dirty caps and no message/ack
      _do_snap_update(sin, snapid, 0, sin->first - 1, client, NULL, NULL);
      head_in->remove_need_snapflush(sin, snapid, client);
    }
  }
}
2581
2582
2583 bool Locker::should_defer_client_cap_frozen(CInode *in)
2584 {
2585 /*
2586 * This policy needs to be AT LEAST as permissive as allowing a client request
2587 * to go forward, or else a client request can release something, the release
2588 * gets deferred, but the request gets processed and deadlocks because when the
2589 * caps can't get revoked.
2590 *
2591 * Currently, a request wait if anything locked is freezing (can't
2592 * auth_pin), which would avoid any deadlock with cap release. Thus @in
2593 * _MUST_ be in the lock/auth_pin set.
2594 *
2595 * auth_pins==0 implies no unstable lock and not auth pinnned by
2596 * client request, otherwise continue even it's freezing.
2597 */
2598 return (in->is_freezing() && in->get_num_auth_pins() == 0) || in->is_frozen();
2599 }
2600
/*
 * Handle a cap message (UPDATE/FLUSH/FLUSHSNAP/...) from a client.
 *
 * This function DOES put the passed message before returning
 */
void Locker::handle_client_caps(MClientCaps *m)
{
  Session *session = static_cast<Session *>(m->get_connection()->get_priv());
  client_t client = m->get_source().num();

  snapid_t follows = m->get_snap_follows();
  dout(7) << "handle_client_caps "
          << ((m->flags & CLIENT_CAPS_SYNC) ? "sync" : "async")
          << " on " << m->get_ino()
          << " tid " << m->get_client_tid() << " follows " << follows
          << " op " << ceph_cap_op_name(m->get_op()) << dendl;

  // not fully active yet: drop (bad session) or defer until replay is done,
  // remembering any reconnect-time dirty caps first
  if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) {
    if (!session) {
      dout(5) << " no session, dropping " << *m << dendl;
      m->put();
      return;
    }
    if (session->is_closed() ||
        session->is_closing() ||
        session->is_killing()) {
      dout(7) << " session closed|closing|killing, dropping " << *m << dendl;
      m->put();
      return;
    }
    if (mds->is_reconnect() &&
        m->get_dirty() && m->get_client_tid() > 0 &&
        !session->have_completed_flush(m->get_client_tid())) {
      mdcache->set_reconnected_dirty_caps(client, m->get_ino(), m->get_dirty());
    }
    mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
    return;
  }

  // duplicate flush (client retransmit): just re-ack, then for non-FLUSHSNAP
  // fall through as a plain UPDATE since the message may still release caps
  if (m->get_client_tid() > 0 && session &&
      session->have_completed_flush(m->get_client_tid())) {
    dout(7) << "handle_client_caps already flushed tid " << m->get_client_tid()
            << " for client." << client << dendl;
    MClientCaps *ack;
    if (m->get_op() == CEPH_CAP_OP_FLUSHSNAP) {
      ack = new MClientCaps(CEPH_CAP_OP_FLUSHSNAP_ACK, m->get_ino(), 0, 0, 0, 0, 0,
                            m->get_dirty(), 0, mds->get_osd_epoch_barrier());
    } else {
      ack = new MClientCaps(CEPH_CAP_OP_FLUSH_ACK, m->get_ino(), 0, m->get_cap_id(),
                            m->get_seq(), m->get_caps(), 0, m->get_dirty(), 0,
                            mds->get_osd_epoch_barrier());
    }
    ack->set_snap_follows(follows);
    ack->set_client_tid(m->get_client_tid());
    mds->send_message_client_counted(ack, m->get_connection());
    if (m->get_op() == CEPH_CAP_OP_FLUSHSNAP) {
      m->put();
      return;
    } else {
      // fall-thru because the message may release some caps
      m->clear_dirty();
      m->set_op(CEPH_CAP_OP_UPDATE);
    }
  }

  // "oldest flush tid" > 0 means client uses unique TID for each flush
  if (m->get_oldest_flush_tid() > 0 && session) {
    if (session->trim_completed_flushes(m->get_oldest_flush_tid())) {
      mds->mdlog->get_current_segment()->touched_sessions.insert(session->info.inst.name);

      if (session->get_num_trim_flushes_warnings() > 0 &&
          session->get_num_completed_flushes() * 2 < g_conf->mds_max_completed_flushes)
        session->reset_num_trim_flushes_warnings();
    } else {
      // client isn't advancing oldest_flush_tid; warn, with exponential backoff
      if (session->get_num_completed_flushes() >=
          (g_conf->mds_max_completed_flushes << session->get_num_trim_flushes_warnings())) {
        session->inc_num_trim_flushes_warnings();
        stringstream ss;
        ss << "client." << session->get_client() << " does not advance its oldest_flush_tid ("
           << m->get_oldest_flush_tid() << "), "
           << session->get_num_completed_flushes()
           << " completed flushes recorded in session";
        mds->clog->warn() << ss.str();
        dout(20) << __func__ << " " << ss.str() << dendl;
      }
    }
  }

  CInode *head_in = mdcache->get_inode(m->get_ino());
  if (!head_in) {
    if (mds->is_clientreplay()) {
      dout(7) << "handle_client_caps on unknown ino " << m->get_ino()
        << ", will try again after replayed client requests" << dendl;
      mdcache->wait_replay_cap_reconnect(m->get_ino(), new C_MDS_RetryMessage(mds, m));
      return;
    }
    dout(1) << "handle_client_caps on unknown ino " << m->get_ino() << ", dropping" << dendl;
    m->put();
    return;
  }

  if (m->osd_epoch_barrier && !mds->objecter->have_map(m->osd_epoch_barrier)) {
    // Pause RADOS operations until we see the required epoch
    mds->objecter->set_epoch_barrier(m->osd_epoch_barrier);
  }

  if (mds->get_osd_epoch_barrier() < m->osd_epoch_barrier) {
    // Record the barrier so that we will retransmit it to clients
    mds->set_osd_epoch_barrier(m->osd_epoch_barrier);
  }

  dout(10) << " head inode " << *head_in << dendl;

  Capability *cap = 0;
  cap = head_in->get_client_cap(client);
  if (!cap) {
    dout(7) << "handle_client_caps no cap for client." << client << " on " << *head_in << dendl;
    m->put();
    return;
  }
  assert(cap);

  // freezing|frozen?
  if (should_defer_client_cap_frozen(head_in)) {
    dout(7) << "handle_client_caps freezing|frozen on " << *head_in << dendl;
    head_in->add_waiter(CInode::WAIT_UNFREEZE, new C_MDS_RetryMessage(mds, m));
    return;
  }
  // stale migration seq: the client hasn't caught up with a cap export yet
  if (ceph_seq_cmp(m->get_mseq(), cap->get_mseq()) < 0) {
    dout(7) << "handle_client_caps mseq " << m->get_mseq() << " < " << cap->get_mseq()
            << ", dropping" << dendl;
    m->put();
    return;
  }

  int op = m->get_op();

  // flushsnap?
  if (op == CEPH_CAP_OP_FLUSHSNAP) {
    if (!head_in->is_auth()) {
      dout(7) << " not auth, ignoring flushsnap on " << *head_in << dendl;
      goto out;
    }

    SnapRealm *realm = head_in->find_snaprealm();
    snapid_t snap = realm->get_snap_following(follows);
    dout(10) << " flushsnap follows " << follows << " -> snap " << snap << dendl;

    CInode *in = head_in;
    if (snap != CEPH_NOSNAP) {
      in = mdcache->pick_inode_snap(head_in, snap - 1);
      if (in != head_in)
        dout(10) << " snapped inode " << *in << dendl;
    }

    // we can prepare the ack now, since this FLUSHEDSNAP is independent of any
    // other cap ops.  (except possibly duplicate FLUSHSNAP requests, but worst
    // case we get a dup response, so whatever.)
    MClientCaps *ack = 0;
    if (m->get_dirty()) {
      ack = new MClientCaps(CEPH_CAP_OP_FLUSHSNAP_ACK, in->ino(), 0, 0, 0, 0, 0, m->get_dirty(), 0, mds->get_osd_epoch_barrier());
      ack->set_snap_follows(follows);
      ack->set_client_tid(m->get_client_tid());
      ack->set_oldest_flush_tid(m->get_oldest_flush_tid());
    }

    if (in == head_in ||
        (head_in->client_need_snapflush.count(snap) &&
         head_in->client_need_snapflush[snap].count(client))) {
      dout(7) << " flushsnap snap " << snap
              << " client." << client << " on " << *in << dendl;

      // this cap now follows a later snap (i.e. the one initiating this flush, or later)
      if (in == head_in)
        cap->client_follows = snap < CEPH_NOSNAP ? snap : realm->get_newest_seq();
      else if (head_in->client_need_snapflush.begin()->first < snap)
        _do_null_snapflush(head_in, client, snap);

      _do_snap_update(in, snap, m->get_dirty(), follows, client, m, ack);

      if (in != head_in)
        head_in->remove_need_snapflush(in, snap, client);
    } else {
      dout(7) << " not expecting flushsnap " << snap << " from client." << client << " on " << *in << dendl;
      if (ack)
        mds->send_message_client_counted(ack, m->get_connection());
    }
    goto out;
  }

  if (cap->get_cap_id() != m->get_cap_id()) {
    dout(7) << " ignoring client capid " << m->get_cap_id() << " != my " << cap->get_cap_id() << dendl;
  } else {
    CInode *in = head_in;
    if (follows > 0) {
      in = mdcache->pick_inode_snap(head_in, follows);
      // intermediate snap inodes
      while (in != head_in) {
        assert(in->last != CEPH_NOSNAP);
        if (in->is_auth() && m->get_dirty()) {
          dout(10) << " updating intermediate snapped inode " << *in << dendl;
          _do_cap_update(in, NULL, m->get_dirty(), follows, m);
        }
        in = mdcache->pick_inode_snap(head_in, in->last);
      }
    }

    // head inode, and cap
    MClientCaps *ack = 0;

    // clamp the confirmed caps to what we actually issued
    int caps = m->get_caps();
    if (caps & ~cap->issued()) {
      dout(10) << " confirming not issued caps " << ccap_string(caps & ~cap->issued()) << dendl;
      caps &= cap->issued();
    }

    cap->confirm_receipt(m->get_seq(), caps);
    dout(10) << " follows " << follows
             << " retains " << ccap_string(m->get_caps())
             << " dirty " << ccap_string(m->get_dirty())
             << " on " << *in << dendl;


    // missing/skipped snapflush?
    //  The client MAY send a snapflush if it is issued WR/EXCL caps, but
    //  presently only does so when it has actual dirty metadata.  But, we
    //  set up the need_snapflush stuff based on the issued caps.
    //  We can infer that the client WONT send a FLUSHSNAP once they have
    //  released all WR/EXCL caps (the FLUSHSNAP always comes before the cap
    //  update/release).
    if (!head_in->client_need_snapflush.empty()) {
      if ((cap->issued() & CEPH_CAP_ANY_FILE_WR) == 0) {
        _do_null_snapflush(head_in, client);
      } else {
        dout(10) << " revocation in progress, not making any conclusions about null snapflushes" << dendl;
      }
    }

    if (m->get_dirty() && in->is_auth()) {
      // client is flushing dirty metadata: prepare the FLUSH_ACK
      dout(7) << " flush client." << client << " dirty " << ccap_string(m->get_dirty())
              << " seq " << m->get_seq() << " on " << *in << dendl;
      ack = new MClientCaps(CEPH_CAP_OP_FLUSH_ACK, in->ino(), 0, cap->get_cap_id(), m->get_seq(),
                            m->get_caps(), 0, m->get_dirty(), 0, mds->get_osd_epoch_barrier());
      ack->set_client_tid(m->get_client_tid());
      ack->set_oldest_flush_tid(m->get_oldest_flush_tid());
    }

    // filter wanted based on what we could ever give out (given auth/replica status)
    bool need_flush = m->flags & CLIENT_CAPS_SYNC;
    int new_wanted = m->get_wanted() & head_in->get_caps_allowed_ever();
    if (new_wanted != cap->wanted()) {
      if (!need_flush && (new_wanted & ~cap->pending())) {
        // exapnding caps.  make sure we aren't waiting for a log flush
        need_flush = _need_flush_mdlog(head_in, new_wanted & ~cap->pending());
      }

      adjust_cap_wanted(cap, new_wanted, m->get_issue_seq());
    }

    if (in->is_auth() &&
        _do_cap_update(in, cap, m->get_dirty(), follows, m, ack, &need_flush)) {
      // updated
      eval(in, CEPH_CAP_LOCKS);

      if (!need_flush && (cap->wanted() & ~cap->pending()))
        need_flush = _need_flush_mdlog(in, cap->wanted() & ~cap->pending());
    } else {
      // no update, ack now.
      if (ack)
        mds->send_message_client_counted(ack, m->get_connection());

      bool did_issue = eval(in, CEPH_CAP_LOCKS);
      if (!did_issue && (cap->wanted() & ~cap->pending()))
        issue_caps(in, cap);

      if (cap->get_last_seq() == 0 &&
          (cap->pending() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER))) {
        // no cap message has gone to the client yet (last_seq == 0):
        // mark issued caps non-revoking and share max_size with the writer
        cap->issue_norevoke(cap->issued());
        share_inode_max_size(in, cap);
      }
    }

    if (need_flush)
      mds->mdlog->flush();
  }

 out:
  m->put();
}
2888
2889
// Context that re-runs process_request_cap_release() for a deferred cap
// release (e.g. the inode was frozen when the release arrived).  The
// retry has no associated MDRequest and no dentry name.
class C_Locker_RetryRequestCapRelease : public LockerContext {
  client_t client;
  ceph_mds_request_release item;
public:
  C_Locker_RetryRequestCapRelease(Locker *l, client_t c, const ceph_mds_request_release& it) :
    LockerContext(l), client(c), item(it) { }
  void finish(int r) override {
    string dname;
    MDRequestRef null_ref;
    locker->process_request_cap_release(null_ref, client, item, dname);
  }
};
2902
// Apply a cap release that was embedded in a client request (or deferred
// and replayed without an mdr).  Optionally drops a dentry lease named by
// dname, validates the release against the current Capability (mseq and
// cap_id must match, inode must not be frozen), then confirms receipt and
// re-evaluates the cap locks.
void Locker::process_request_cap_release(MDRequestRef& mdr, client_t client, const ceph_mds_request_release& item,
					 const string &dname)
{
  inodeno_t ino = (uint64_t)item.ino;
  uint64_t cap_id = item.cap_id;
  int caps = item.caps;
  int wanted = item.wanted;
  int seq = item.seq;
  int issue_seq = item.issue_seq;
  int mseq = item.mseq;

  CInode *in = mdcache->get_inode(ino);
  if (!in)
    return;

  // a non-empty dname means the client is also releasing its dentry lease
  if (dname.length()) {
    frag_t fg = in->pick_dirfrag(dname);
    CDir *dir = in->get_dirfrag(fg);
    if (dir) {
      CDentry *dn = dir->lookup(dname);
      if (dn) {
        ClientLease *l = dn->get_client_lease(client);
        if (l) {
          dout(10) << "process_cap_release removing lease on " << *dn << dendl;
          dn->remove_client_lease(l, this);
        } else {
          dout(7) << "process_cap_release client." << client
		  << " doesn't have lease on " << *dn << dendl;
        }
      } else {
        dout(7) << "process_cap_release client." << client << " released lease on dn "
		<< dir->dirfrag() << "/" << dname << " which dne" << dendl;
      }
    }
  }

  Capability *cap = in->get_client_cap(client);
  if (!cap)
    return;

  dout(10) << "process_cap_release client." << client << " " << ccap_string(caps) << " on " << *in
	   << (mdr ? "" : " (DEFERRED, no mdr)")
	   << dendl;

  // stale migration seq: the cap has been exported/imported since this
  // release was generated, so it no longer applies
  if (ceph_seq_cmp(mseq, cap->get_mseq()) < 0) {
    dout(7) << " mseq " << mseq << " < " << cap->get_mseq() << ", dropping" << dendl;
    return;
  }

  // the release refers to an older incarnation of the cap
  if (cap->get_cap_id() != cap_id) {
    dout(7) << " cap_id " << cap_id << " != " << cap->get_cap_id() << ", dropping" << dendl;
    return;
  }

  // can't mutate cap state while the inode is freezing/frozen; retry on unfreeze
  if (should_defer_client_cap_frozen(in)) {
    dout(7) << " frozen, deferring" << dendl;
    in->add_waiter(CInode::WAIT_UNFREEZE, new C_Locker_RetryRequestCapRelease(this, client, item));
    return;
  }

  // clamp the retained caps to what is actually issued
  if (caps & ~cap->issued()) {
    dout(10) << " confirming not issued caps " << ccap_string(caps & ~cap->issued()) << dendl;
    caps &= cap->issued();
  }
  cap->confirm_receipt(seq, caps);

  // once the client holds no file write caps, any pending snapflush from
  // it can never arrive; synthesize null snapflushes
  if (!in->client_need_snapflush.empty() &&
      (cap->issued() & CEPH_CAP_ANY_FILE_WR) == 0) {
    _do_null_snapflush(in, client);
  }

  adjust_cap_wanted(cap, wanted, issue_seq);

  // suppress reissue during eval when we're inside a request; the request
  // reply will carry the caps instead
  if (mdr)
    cap->inc_suppress();
  eval(in, CEPH_CAP_LOCKS);
  if (mdr)
    cap->dec_suppress();

  // take note; we may need to reissue on this cap later
  if (mdr)
    mdr->cap_releases[in->vino()] = cap->get_last_seq();
}
2986
2987 class C_Locker_RetryKickIssueCaps : public LockerContext {
2988 CInode *in;
2989 client_t client;
2990 ceph_seq_t seq;
2991 public:
2992 C_Locker_RetryKickIssueCaps(Locker *l, CInode *i, client_t c, ceph_seq_t s) :
2993 LockerContext(l), in(i), client(c), seq(s) {
2994 in->get(CInode::PIN_PTRWAITER);
2995 }
2996 void finish(int r) override {
2997 locker->kick_issue_caps(in, client, seq);
2998 in->put(CInode::PIN_PTRWAITER);
2999 }
3000 };
3001
3002 void Locker::kick_issue_caps(CInode *in, client_t client, ceph_seq_t seq)
3003 {
3004 Capability *cap = in->get_client_cap(client);
3005 if (!cap || cap->get_last_sent() != seq)
3006 return;
3007 if (in->is_frozen()) {
3008 dout(10) << "kick_issue_caps waiting for unfreeze on " << *in << dendl;
3009 in->add_waiter(CInode::WAIT_UNFREEZE,
3010 new C_Locker_RetryKickIssueCaps(this, in, client, seq));
3011 return;
3012 }
3013 dout(10) << "kick_issue_caps released at current seq " << seq
3014 << ", reissuing" << dendl;
3015 issue_caps(in, cap);
3016 }
3017
3018 void Locker::kick_cap_releases(MDRequestRef& mdr)
3019 {
3020 client_t client = mdr->get_client();
3021 for (map<vinodeno_t,ceph_seq_t>::iterator p = mdr->cap_releases.begin();
3022 p != mdr->cap_releases.end();
3023 ++p) {
3024 CInode *in = mdcache->get_inode(p->first);
3025 if (!in)
3026 continue;
3027 kick_issue_caps(in, client, p->second);
3028 }
3029 }
3030
/**
 * Apply a client snapcap flush to the appropriate (possibly old) inode
 * version and journal the update.
 *
 * m and ack might be NULL, so don't dereference them unless dirty != 0
 */
void Locker::_do_snap_update(CInode *in, snapid_t snap, int dirty, snapid_t follows, client_t client, MClientCaps *m, MClientCaps *ack)
{
  dout(10) << "_do_snap_update dirty " << ccap_string(dirty)
	   << " follows " << follows << " snap " << snap
	   << " on " << *in << dendl;

  if (snap == CEPH_NOSNAP) {
    // hmm, i guess snap was already deleted?  just ack!
    dout(10) << " wow, the snap following " << follows
	     << " was already deleted.  nothing to record, just ack." << dendl;
    if (ack)
      mds->send_message_client_counted(ack, m->get_connection());
    return;
  }

  EUpdate *le = new EUpdate(mds->mdlog, "snap flush");
  mds->mdlog->start_entry(le);
  MutationRef mut = new MutationImpl();
  mut->ls = mds->mdlog->get_current_segment();

  // normal metadata updates that we can apply to the head as well.

  // update xattrs?
  bool xattrs = false;
  map<string,bufferptr> *px = 0;
  // only take the client's xattr blob if it is newer than what we have projected
  if ((dirty & CEPH_CAP_XATTR_EXCL) &&
      m->xattrbl.length() &&
      m->head.xattr_version > in->get_projected_inode()->xattr_version)
    xattrs = true;

  old_inode_t *oi = 0;
  if (in->is_multiversion()) {
    oi = in->pick_old_inode(snap);
  }

  inode_t *pi;
  if (oi) {
    // the flush lands in an old_inode_t; we still project/pre_dirty the
    // head so the version advances, then redirect pi at the old inode
    dout(10) << " writing into old inode" << dendl;
    pi = in->project_inode();
    pi->version = in->pre_dirty();
    if (snap > oi->first)
      in->split_old_inode(snap);
    pi = &oi->inode;
    if (xattrs)
      px = &oi->xattrs;
  } else {
    if (xattrs)
      px = new map<string,bufferptr>;
    pi = in->project_inode(px);
    pi->version = in->pre_dirty();
  }

  _update_cap_fields(in, dirty, m, pi);

  // xattr
  if (px) {
    dout(7) << " xattrs v" << pi->xattr_version << " -> " << m->head.xattr_version
	    << " len " << m->xattrbl.length() << dendl;
    pi->xattr_version = m->head.xattr_version;
    bufferlist::iterator p = m->xattrbl.begin();
    ::decode(*px, p);
  }

  // trim or drop the client's writable byte range now that this snap is flushed
  if (pi->client_ranges.count(client)) {
    if (in->last == snap) {
      dout(10) << " removing client_range entirely" << dendl;
      pi->client_ranges.erase(client);
    } else {
      dout(10) << " client_range now follows " << snap << dendl;
      pi->client_ranges[client].follows = snap;
    }
  }

  mut->auth_pin(in);
  mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows);
  mdcache->journal_dirty_inode(mut.get(), &le->metablob, in, follows);

  // "oldest flush tid" > 0 means client uses unique TID for each flush
  if (ack && ack->get_oldest_flush_tid() > 0)
    le->metablob.add_client_flush(metareqid_t(m->get_source(), ack->get_client_tid()),
				  ack->get_oldest_flush_tid());

  mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut, false, false,
							      client, ack));
}
3119
// Copy dirty metadata carried in a client cap message into the (projected)
// inode.  Which fields are taken depends on which cap bits are dirty:
// FILE_WR/FILE_EXCL govern mtime/size/inline data, FILE_EXCL alone governs
// atime/time_warp_seq, AUTH_EXCL governs uid/gid/mode/btime.
void Locker::_update_cap_fields(CInode *in, int dirty, MClientCaps *m, inode_t *pi)
{
  if (dirty == 0)
    return;

  /* m must be valid if there are dirty caps */
  assert(m);
  uint64_t features = m->get_connection()->get_features();

  // ctime only moves forward
  if (m->get_ctime() > pi->ctime) {
    dout(7) << "  ctime " << pi->ctime << " -> " << m->get_ctime()
	    << " for " << *in << dendl;
    pi->ctime = m->get_ctime();
  }

  // change_attr only moves forward, and only for clients that support it
  if ((features & CEPH_FEATURE_FS_CHANGE_ATTR) &&
      m->get_change_attr() > pi->change_attr) {
    dout(7) << "  change_attr " << pi->change_attr << " -> " << m->get_change_attr()
	    << " for " << *in << dendl;
    pi->change_attr = m->get_change_attr();
  }

  // file
  if (dirty & (CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)) {
    utime_t atime = m->get_atime();
    utime_t mtime = m->get_mtime();
    uint64_t size = m->get_size();
    version_t inline_version = m->inline_version;

    // WR may only advance mtime; EXCL may set it to anything
    if (((dirty & CEPH_CAP_FILE_WR) && mtime > pi->mtime) ||
	((dirty & CEPH_CAP_FILE_EXCL) && mtime != pi->mtime)) {
      dout(7) << "  mtime " << pi->mtime << " -> " << mtime
	      << " for " << *in << dendl;
      pi->mtime = mtime;
    }
    if (in->inode.is_file() &&   // ONLY if regular file
	size > pi->size) {
      dout(7) << "  size " << pi->size << " -> " << size
	      << " for " << *in << dendl;
      pi->size = size;
      pi->rstat.rbytes = size;
    }
    // inline data only updates with write caps and a newer version
    if (in->inode.is_file() &&
        (dirty & CEPH_CAP_FILE_WR) &&
        inline_version > pi->inline_data.version) {
      pi->inline_data.version = inline_version;
      if (inline_version != CEPH_INLINE_NONE && m->inline_data.length() > 0)
	pi->inline_data.get_data() = m->inline_data;
      else
	pi->inline_data.free_data();
    }
    if ((dirty & CEPH_CAP_FILE_EXCL) && atime != pi->atime) {
      dout(7) << "  atime " << pi->atime << " -> " << atime
	      << " for " << *in << dendl;
      pi->atime = atime;
    }
    if ((dirty & CEPH_CAP_FILE_EXCL) &&
	ceph_seq_cmp(pi->time_warp_seq, m->get_time_warp_seq()) < 0) {
      dout(7) << "  time_warp_seq " << pi->time_warp_seq << " -> " << m->get_time_warp_seq()
	      << " for " << *in << dendl;
      pi->time_warp_seq = m->get_time_warp_seq();
    }
  }
  // auth
  if (dirty & CEPH_CAP_AUTH_EXCL) {
    if (m->head.uid != pi->uid) {
      dout(7) << "  uid " << pi->uid
	      << " -> " << m->head.uid
	      << " for " << *in << dendl;
      pi->uid = m->head.uid;
    }
    if (m->head.gid != pi->gid) {
      dout(7) << "  gid " << pi->gid
	      << " -> " << m->head.gid
	      << " for " << *in << dendl;
      pi->gid = m->head.gid;
    }
    if (m->head.mode != pi->mode) {
      dout(7) << "  mode " << oct << pi->mode
	      << " -> " << m->head.mode << dec
	      << " for " << *in << dendl;
      pi->mode = m->head.mode;
    }
    if ((features & CEPH_FEATURE_FS_BTIME) && m->get_btime() != pi->btime) {
      dout(7) << "  btime " << oct << pi->btime
	      << " -> " << m->get_btime() << dec
	      << " for " << *in << dendl;
      pi->btime = m->get_btime();
    }
  }
}
3211
/*
 * update inode based on cap flush|flushsnap|wanted.
 * adjust max_size, if needed.
 * if we update, return true; otherwise, false (no updated needed).
 *
 * Only called on the auth MDS.  May set *need_flush if the journaled
 * update (e.g. a max_size increase) should be flushed promptly so the
 * client can proceed.
 */
bool Locker::_do_cap_update(CInode *in, Capability *cap,
			    int dirty, snapid_t follows,
			    MClientCaps *m, MClientCaps *ack,
			    bool *need_flush)
{
  dout(10) << "_do_cap_update dirty " << ccap_string(dirty)
	   << " issued " << ccap_string(cap ? cap->issued() : 0)
	   << " wanted " << ccap_string(cap ? cap->wanted() : 0)
	   << " on " << *in << dendl;
  assert(in->is_auth());
  client_t client = m->get_source().num();
  inode_t *latest = in->get_projected_inode();

  // increase or zero max_size?
  uint64_t size = m->get_size();
  bool change_max = false;
  uint64_t old_max = latest->client_ranges.count(client) ? latest->client_ranges[client].range.last : 0;
  uint64_t new_max = old_max;

  if (in->is_file()) {
    bool forced_change_max = false;
    dout(20) << "inode is file" << dendl;
    if (cap && ((cap->issued() | cap->wanted()) & CEPH_CAP_ANY_FILE_WR)) {
      dout(20) << "client has write caps; m->get_max_size="
	       << m->get_max_size() << "; old_max=" << old_max << dendl;
      if (m->get_max_size() > new_max) {
	// client explicitly asked for a larger writable range
	dout(10) << "client requests file_max " << m->get_max_size()
		 << " > max " << old_max << dendl;
	change_max = true;
	forced_change_max = true;
	new_max = calc_new_max_size(latest, m->get_max_size());
      } else {
	// otherwise grow max_size only if the reported size warrants it
	new_max = calc_new_max_size(latest, size);

	if (new_max > old_max)
	  change_max = true;
	else
	  new_max = old_max;
      }
    } else {
      // no write caps issued or wanted: drop the byte range entirely
      if (old_max) {
	change_max = true;
	new_max = 0;
      }
    }

    // changing max_size requires a wrlock on filelock; if we can't take
    // it, try to move the lock to a friendlier state, else queue a
    // deferred check_max_size and skip the change for now
    if (in->last == CEPH_NOSNAP &&
	change_max &&
	!in->filelock.can_wrlock(client) &&
	!in->filelock.can_force_wrlock(client)) {
      dout(10) << " i want to change file_max, but lock won't allow it (yet)" << dendl;
      if (in->filelock.is_stable()) {
	bool need_issue = false;
	if (cap)
	  cap->inc_suppress();
	if (in->mds_caps_wanted.empty() &&
	    (in->get_loner() >= 0 || (in->get_wanted_loner() >= 0 && in->try_set_loner()))) {
	  if (in->filelock.get_state() != LOCK_EXCL)
	    file_excl(&in->filelock, &need_issue);
	} else
	  simple_lock(&in->filelock, &need_issue);
	if (need_issue)
	  issue_caps(in);
	if (cap)
	  cap->dec_suppress();
      }
      if (!in->filelock.can_wrlock(client) &&
	  !in->filelock.can_force_wrlock(client)) {
	C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in,
	                                                 forced_change_max ? new_max : 0,
	                                                 0, utime_t());

	in->filelock.add_waiter(SimpleLock::WAIT_STABLE, cms);
	change_max = false;
      }
    }
  }

  // merge any fcntl/flock state the client is flushing (e.g. on reconnect)
  if (m->flockbl.length()) {
    int32_t num_locks;
    bufferlist::iterator bli = m->flockbl.begin();
    ::decode(num_locks, bli);
    for ( int i=0; i < num_locks; ++i) {
      ceph_filelock decoded_lock;
      ::decode(decoded_lock, bli);
      in->get_fcntl_lock_state()->held_locks.
	insert(pair<uint64_t, ceph_filelock>(decoded_lock.start, decoded_lock));
      ++in->get_fcntl_lock_state()->client_held_lock_counts[(client_t)(decoded_lock.client)];
    }
    ::decode(num_locks, bli);
    for ( int i=0; i < num_locks; ++i) {
      ceph_filelock decoded_lock;
      ::decode(decoded_lock, bli);
      in->get_flock_lock_state()->held_locks.
	insert(pair<uint64_t, ceph_filelock>(decoded_lock.start, decoded_lock));
      ++in->get_flock_lock_state()->client_held_lock_counts[(client_t)(decoded_lock.client)];
    }
  }

  if (!dirty && !change_max)
    return false;

  Session *session = static_cast<Session *>(m->get_connection()->get_priv());
  if (session->check_access(in, MAY_WRITE,
			    m->caller_uid, m->caller_gid, NULL, 0, 0) < 0) {
    session->put();
    dout(10) << "check_access failed, dropping cap update on " << *in << dendl;
    return false;
  }
  session->put();

  // do the update.
  EUpdate *le = new EUpdate(mds->mdlog, "cap update");
  mds->mdlog->start_entry(le);

  // xattrs update?
  map<string,bufferptr> *px = 0;
  if ((dirty & CEPH_CAP_XATTR_EXCL) &&
      m->xattrbl.length() &&
      m->head.xattr_version > in->get_projected_inode()->xattr_version)
    px = new map<string,bufferptr>;

  inode_t *pi = in->project_inode(px);
  pi->version = in->pre_dirty();

  MutationRef mut(new MutationImpl());
  mut->ls = mds->mdlog->get_current_segment();

  _update_cap_fields(in, dirty, m, pi);

  if (change_max) {
    dout(7) << "  max_size " << old_max << " -> " << new_max
	    << " for " << *in << dendl;
    if (new_max) {
      pi->client_ranges[client].range.first = 0;
      pi->client_ranges[client].range.last = new_max;
      pi->client_ranges[client].follows = in->first - 1;
    } else
      pi->client_ranges.erase(client);
  }

  if (change_max || (dirty & (CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)))
    wrlock_force(&in->filelock, mut);  // wrlock for duration of journal

  // auth
  if (dirty & CEPH_CAP_AUTH_EXCL)
    wrlock_force(&in->authlock, mut);

  // xattr
  if (px) {
    dout(7) << " xattrs v" << pi->xattr_version << " -> " << m->head.xattr_version << dendl;
    pi->xattr_version = m->head.xattr_version;
    bufferlist::iterator p = m->xattrbl.begin();
    ::decode(*px, p);

    wrlock_force(&in->xattrlock, mut);
  }

  mut->auth_pin(in);
  mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows);
  mdcache->journal_dirty_inode(mut.get(), &le->metablob, in, follows);

  // "oldest flush tid" > 0 means client uses unique TID for each flush
  if (ack && ack->get_oldest_flush_tid() > 0)
    le->metablob.add_client_flush(metareqid_t(m->get_source(), ack->get_client_tid()),
				  ack->get_oldest_flush_tid());

  mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut,
							      change_max, !!cap,
							      client, ack));
  if (need_flush && !*need_flush &&
      ((change_max && new_max) || // max INCREASE
       _need_flush_mdlog(in, dirty)))
    *need_flush = true;

  return true;
}
3394
3395 /* This function DOES put the passed message before returning */
3396 void Locker::handle_client_cap_release(MClientCapRelease *m)
3397 {
3398 client_t client = m->get_source().num();
3399 dout(10) << "handle_client_cap_release " << *m << dendl;
3400
3401 if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) {
3402 mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
3403 return;
3404 }
3405
3406 if (m->osd_epoch_barrier && !mds->objecter->have_map(m->osd_epoch_barrier)) {
3407 // Pause RADOS operations until we see the required epoch
3408 mds->objecter->set_epoch_barrier(m->osd_epoch_barrier);
3409 }
3410
3411 if (mds->get_osd_epoch_barrier() < m->osd_epoch_barrier) {
3412 // Record the barrier so that we will retransmit it to clients
3413 mds->set_osd_epoch_barrier(m->osd_epoch_barrier);
3414 }
3415
3416 Session *session = static_cast<Session *>(m->get_connection()->get_priv());
3417
3418 for (vector<ceph_mds_cap_item>::iterator p = m->caps.begin(); p != m->caps.end(); ++p) {
3419 _do_cap_release(client, inodeno_t((uint64_t)p->ino) , p->cap_id, p->migrate_seq, p->seq);
3420 }
3421
3422 if (session) {
3423 session->notify_cap_release(m->caps.size());
3424 }
3425
3426 m->put();
3427 }
3428
3429 class C_Locker_RetryCapRelease : public LockerContext {
3430 client_t client;
3431 inodeno_t ino;
3432 uint64_t cap_id;
3433 ceph_seq_t migrate_seq;
3434 ceph_seq_t issue_seq;
3435 public:
3436 C_Locker_RetryCapRelease(Locker *l, client_t c, inodeno_t i, uint64_t id,
3437 ceph_seq_t mseq, ceph_seq_t seq) :
3438 LockerContext(l), client(c), ino(i), cap_id(id), migrate_seq(mseq), issue_seq(seq) {}
3439 void finish(int r) override {
3440 locker->_do_cap_release(client, ino, cap_id, migrate_seq, issue_seq);
3441 }
3442 };
3443
// Process one cap release from a MClientCapRelease message.  The release
// is ignored if the inode/cap is gone, refers to an older cap incarnation
// (cap_id mismatch), or is stale w.r.t. migration (mseq).  If the issue
// seq doesn't match the last issue, the client raced with a reissue and
// we only trim revoke history; otherwise the cap is fully removed.
void Locker::_do_cap_release(client_t client, inodeno_t ino, uint64_t cap_id,
			     ceph_seq_t mseq, ceph_seq_t seq)
{
  CInode *in = mdcache->get_inode(ino);
  if (!in) {
    dout(7) << "_do_cap_release missing ino " << ino << dendl;
    return;
  }
  Capability *cap = in->get_client_cap(client);
  if (!cap) {
    dout(7) << "_do_cap_release no cap for client" << client << " on "<< *in << dendl;
    return;
  }

  dout(7) << "_do_cap_release for client." << client << " on "<< *in << dendl;
  if (cap->get_cap_id() != cap_id) {
    dout(7) << " capid " << cap_id << " != " << cap->get_cap_id() << ", ignore" << dendl;
    return;
  }
  if (ceph_seq_cmp(mseq, cap->get_mseq()) < 0) {
    dout(7) << " mseq " << mseq << " < " << cap->get_mseq() << ", ignore" << dendl;
    return;
  }
  // can't remove the cap while the inode is freezing/frozen; retry later
  if (should_defer_client_cap_frozen(in)) {
    dout(7) << " freezing|frozen, deferring" << dendl;
    in->add_waiter(CInode::WAIT_UNFREEZE,
                  new C_Locker_RetryCapRelease(this, client, ino, cap_id, mseq, seq));
    return;
  }
  if (seq != cap->get_last_issue()) {
    dout(7) << " issue_seq " << seq << " != " << cap->get_last_issue() << dendl;
    // clean out any old revoke history
    cap->clean_revoke_from(seq);
    eval_cap_gather(in);
    return;
  }
  remove_client_cap(in, client);
}
3482
/* Note: unlike the message handlers above, this function takes no message. */
3484
3485 void Locker::remove_client_cap(CInode *in, client_t client)
3486 {
3487 // clean out any pending snapflush state
3488 if (!in->client_need_snapflush.empty())
3489 _do_null_snapflush(in, client);
3490
3491 in->remove_client_cap(client);
3492
3493 if (in->is_auth()) {
3494 // make sure we clear out the client byte range
3495 if (in->get_projected_inode()->client_ranges.count(client) &&
3496 !(in->inode.nlink == 0 && !in->is_any_caps())) // unless it's unlink + stray
3497 check_inode_max_size(in);
3498 } else {
3499 request_inode_file_caps(in);
3500 }
3501
3502 try_eval(in, CEPH_CAP_LOCKS);
3503 }
3504
3505
3506 /**
3507 * Return true if any currently revoking caps exceed the
3508 * mds_session_timeout threshold.
3509 */
3510 bool Locker::any_late_revoking_caps(xlist<Capability*> const &revoking) const
3511 {
3512 xlist<Capability*>::const_iterator p = revoking.begin();
3513 if (p.end()) {
3514 // No revoking caps at the moment
3515 return false;
3516 } else {
3517 utime_t now = ceph_clock_now();
3518 utime_t age = now - (*p)->get_last_revoke_stamp();
3519 if (age <= g_conf->mds_session_timeout) {
3520 return false;
3521 } else {
3522 return true;
3523 }
3524 }
3525 }
3526
3527
3528 void Locker::get_late_revoking_clients(std::list<client_t> *result) const
3529 {
3530 if (!any_late_revoking_caps(revoking_caps)) {
3531 // Fast path: no misbehaving clients, execute in O(1)
3532 return;
3533 }
3534
3535 // Slow path: execute in O(N_clients)
3536 std::map<client_t, xlist<Capability*> >::const_iterator client_rc_iter;
3537 for (client_rc_iter = revoking_caps_by_client.begin();
3538 client_rc_iter != revoking_caps_by_client.end(); ++client_rc_iter) {
3539 xlist<Capability*> const &client_rc = client_rc_iter->second;
3540 bool any_late = any_late_revoking_caps(client_rc);
3541 if (any_late) {
3542 result->push_back(client_rc_iter->first);
3543 }
3544 }
3545 }
3546
3547 // Hard-code instead of surfacing a config settings because this is
3548 // really a hack that should go away at some point when we have better
3549 // inspection tools for getting at detailed cap state (#7316)
3550 #define MAX_WARN_CAPS 100
3551
3552 void Locker::caps_tick()
3553 {
3554 utime_t now = ceph_clock_now();
3555
3556 dout(20) << __func__ << " " << revoking_caps.size() << " revoking caps" << dendl;
3557
3558 int i = 0;
3559 for (xlist<Capability*>::iterator p = revoking_caps.begin(); !p.end(); ++p) {
3560 Capability *cap = *p;
3561
3562 utime_t age = now - cap->get_last_revoke_stamp();
3563 dout(20) << __func__ << " age = " << age << cap->get_client() << "." << cap->get_inode()->ino() << dendl;
3564 if (age <= g_conf->mds_session_timeout) {
3565 dout(20) << __func__ << " age below timeout " << g_conf->mds_session_timeout << dendl;
3566 break;
3567 } else {
3568 ++i;
3569 if (i > MAX_WARN_CAPS) {
3570 dout(1) << __func__ << " more than " << MAX_WARN_CAPS << " caps are late"
3571 << "revoking, ignoring subsequent caps" << dendl;
3572 break;
3573 }
3574 }
3575 // exponential backoff of warning intervals
3576 if (age > g_conf->mds_session_timeout * (1 << cap->get_num_revoke_warnings())) {
3577 cap->inc_num_revoke_warnings();
3578 stringstream ss;
3579 ss << "client." << cap->get_client() << " isn't responding to mclientcaps(revoke), ino "
3580 << cap->get_inode()->ino() << " pending " << ccap_string(cap->pending())
3581 << " issued " << ccap_string(cap->issued()) << ", sent " << age << " seconds ago";
3582 mds->clog->warn() << ss.str();
3583 dout(20) << __func__ << " " << ss.str() << dendl;
3584 } else {
3585 dout(20) << __func__ << " silencing log message (backoff) for " << cap->get_client() << "." << cap->get_inode()->ino() << dendl;
3586 }
3587 }
3588 }
3589
3590
// Handle a client dentry-lease message: RELEASE / REVOKE_ACK drop the
// lease (if the seq matches); RENEW extends it and echoes the message
// back to the client with a fresh seq and duration.
void Locker::handle_client_lease(MClientLease *m)
{
  dout(10) << "handle_client_lease " << *m << dendl;

  assert(m->get_source().is_client());
  client_t client = m->get_source().num();

  CInode *in = mdcache->get_inode(m->get_ino(), m->get_last());
  if (!in) {
    dout(7) << "handle_client_lease don't have ino " << m->get_ino() << "." << m->get_last() << dendl;
    m->put();
    return;
  }
  CDentry *dn = 0;

  // locate the dentry the lease is on
  frag_t fg = in->pick_dirfrag(m->dname);
  CDir *dir = in->get_dirfrag(fg);
  if (dir)
    dn = dir->lookup(m->dname);
  if (!dn) {
    dout(7) << "handle_client_lease don't have dn " << m->get_ino() << " " << m->dname << dendl;
    m->put();
    return;
  }
  dout(10) << " on " << *dn << dendl;

  // replica and lock
  ClientLease *l = dn->get_client_lease(client);
  if (!l) {
    dout(7) << "handle_client_lease didn't have lease for client." << client << " of " << *dn << dendl;
    m->put();
    return;
  }

  switch (m->get_action()) {
  case CEPH_MDS_LEASE_REVOKE_ACK:
  case CEPH_MDS_LEASE_RELEASE:
    // stale seq means the client is acking an older lease instance; ignore
    if (l->seq != m->get_seq()) {
      dout(7) << "handle_client_lease release - seq " << l->seq << " != provided " << m->get_seq() << dendl;
    } else {
      dout(7) << "handle_client_lease client." << client
	      << " on " << *dn << dendl;
      dn->remove_client_lease(l, this);
    }
    m->put();
    break;

  case CEPH_MDS_LEASE_RENEW:
    {
      dout(7) << "handle_client_lease client." << client << " renew on " << *dn
	      << (!dn->lock.can_lease(client)?", revoking lease":"") << dendl;
      if (dn->lock.can_lease(client)) {
	int pool = 1;   // fixme.. do something smart!
	// reuse the incoming message as the renewal reply
	m->h.duration_ms = (int)(1000 * mdcache->client_lease_durations[pool]);
	m->h.seq = ++l->seq;
	m->clear_payload();

	utime_t now = ceph_clock_now();
	now += mdcache->client_lease_durations[pool];
	mdcache->touch_client_lease(l, pool, now);

	mds->send_message_client_counted(m, m->get_connection());
      }
      // NOTE(review): when can_lease() is false, m appears to be neither
      // resent nor put here -- looks like a message leak; confirm upstream.
    }
    break;

  default:
    ceph_abort(); // implement me
    break;
  }
}
3662
3663
// Encode a dentry lease (or a null lease) for a client into bl, as part
// of a client reply.  A real lease also registers/touches the ClientLease
// on the dentry and on the session.
void Locker::issue_client_lease(CDentry *dn, client_t client,
			       bufferlist &bl, utime_t now, Session *session)
{
  CInode *diri = dn->get_dir()->get_inode();
  // issue a lease only when the dentry lock allows it and the client is
  // not already covered by dir caps.
  // NOTE(review): the middle clause is "dir filelock can't be leased AND
  // client holds no FILE_SHARED/EXCL cap on the dir" -- the intent of that
  // polarity is not obvious from here; confirm against upstream before
  // relying on it.
  if (!diri->is_stray() &&  // do not issue dn leases in stray dir!
      ((!diri->filelock.can_lease(client) &&
	(diri->get_client_cap_pending(client) & (CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_EXCL)) == 0)) &&
      dn->lock.can_lease(client)) {
    int pool = 1;   // fixme.. do something smart!
    // issue a dentry lease
    ClientLease *l = dn->add_client_lease(client, session);
    session->touch_lease(l);

    now += mdcache->client_lease_durations[pool];
    mdcache->touch_client_lease(l, pool, now);

    LeaseStat e;
    e.mask = 1 | CEPH_LOCK_DN;  // old and new bit values
    e.seq = ++l->seq;
    e.duration_ms = (int)(1000 * mdcache->client_lease_durations[pool]);
    ::encode(e, bl);
    dout(20) << "issue_client_lease seq " << e.seq << " dur " << e.duration_ms << "ms "
	     << " on " << *dn << dendl;
  } else {
    // null lease
    LeaseStat e;
    e.mask = 0;
    e.seq = 0;
    e.duration_ms = 0;
    ::encode(e, bl);
    dout(20) << "issue_client_lease no/null lease on " << *dn << dendl;
  }
}
3697
3698
3699 void Locker::revoke_client_leases(SimpleLock *lock)
3700 {
3701 int n = 0;
3702 CDentry *dn = static_cast<CDentry*>(lock->get_parent());
3703 for (map<client_t, ClientLease*>::iterator p = dn->client_lease_map.begin();
3704 p != dn->client_lease_map.end();
3705 ++p) {
3706 ClientLease *l = p->second;
3707
3708 n++;
3709 assert(lock->get_type() == CEPH_LOCK_DN);
3710
3711 CDentry *dn = static_cast<CDentry*>(lock->get_parent());
3712 int mask = 1 | CEPH_LOCK_DN; // old and new bits
3713
3714 // i should also revoke the dir ICONTENT lease, if they have it!
3715 CInode *diri = dn->get_dir()->get_inode();
3716 mds->send_message_client_counted(new MClientLease(CEPH_MDS_LEASE_REVOKE, l->seq,
3717 mask,
3718 diri->ino(),
3719 diri->first, CEPH_NOSNAP,
3720 dn->get_name()),
3721 l->client);
3722 }
3723 }
3724
3725
3726
3727 // locks ----------------------------------------------------------------
3728
// Resolve a (lock_type, object info) pair from a peer MDS into the
// corresponding in-memory SimpleLock, or nullptr if the object has been
// trimmed from the cache.
SimpleLock *Locker::get_lock(int lock_type, MDSCacheObjectInfo &info)
{
  switch (lock_type) {
  case CEPH_LOCK_DN:
    {
      // be careful; info.dirfrag may have incorrect frag; recalculate based on dname.
      CInode *diri = mdcache->get_inode(info.dirfrag.ino);
      frag_t fg;
      CDir *dir = 0;
      CDentry *dn = 0;
      if (diri) {
	fg = diri->pick_dirfrag(info.dname);
	dir = diri->get_dirfrag(fg);
	if (dir)
	  dn = dir->lookup(info.dname, info.snapid);
      }
      if (!dn) {
	dout(7) << "get_lock don't have dn " << info.dirfrag.ino << " " << info.dname << dendl;
	return 0;
      }
      return &dn->lock;
    }

  // all inode lock types resolve through the same inode lookup
  case CEPH_LOCK_IAUTH:
  case CEPH_LOCK_ILINK:
  case CEPH_LOCK_IDFT:
  case CEPH_LOCK_IFILE:
  case CEPH_LOCK_INEST:
  case CEPH_LOCK_IXATTR:
  case CEPH_LOCK_ISNAP:
  case CEPH_LOCK_IFLOCK:
  case CEPH_LOCK_IPOLICY:
    {
      CInode *in = mdcache->get_inode(info.ino, info.snapid);
      if (!in) {
	dout(7) << "get_lock don't have ino " << info.ino << dendl;
	return 0;
      }
      switch (lock_type) {
      case CEPH_LOCK_IAUTH: return &in->authlock;
      case CEPH_LOCK_ILINK: return &in->linklock;
      case CEPH_LOCK_IDFT: return &in->dirfragtreelock;
      case CEPH_LOCK_IFILE: return &in->filelock;
      case CEPH_LOCK_INEST: return &in->nestlock;
      case CEPH_LOCK_IXATTR: return &in->xattrlock;
      case CEPH_LOCK_ISNAP: return &in->snaplock;
      case CEPH_LOCK_IFLOCK: return &in->flocklock;
      case CEPH_LOCK_IPOLICY: return &in->policylock;
      }
    }

  default:
    dout(7) << "get_lock don't know lock_type " << lock_type << dendl;
    ceph_abort();
    break;
  }

  return 0;
}
3788
/* This function DOES put the passed message before returning */
// Top-level dispatcher for inter-MDS lock messages: resolves the lock,
// then routes to the simple- or scatter/file-lock handler by lock type.
void Locker::handle_lock(MLock *m)
{
  // nobody should be talking to us during recovery.
  assert(mds->is_rejoin() || mds->is_clientreplay() || mds->is_active() || mds->is_stopping());

  SimpleLock *lock = get_lock(m->get_lock_type(), m->get_object_info());
  if (!lock) {
    dout(10) << "don't have object " << m->get_object_info() << ", must have trimmed, dropping" << dendl;
    m->put();
    return;
  }

  switch (lock->get_type()) {
  case CEPH_LOCK_DN:
  case CEPH_LOCK_IAUTH:
  case CEPH_LOCK_ILINK:
  case CEPH_LOCK_ISNAP:
  case CEPH_LOCK_IXATTR:
  case CEPH_LOCK_IFLOCK:
  case CEPH_LOCK_IPOLICY:
    handle_simple_lock(lock, m);
    break;

  // IDFT/INEST intentionally fall through to the file-lock handler
  case CEPH_LOCK_IDFT:
  case CEPH_LOCK_INEST:
    //handle_scatter_lock((ScatterLock*)lock, m);
    //break;

  case CEPH_LOCK_IFILE:
    handle_file_lock(static_cast<ScatterLock*>(lock), m);
    break;

  default:
    dout(7) << "handle_lock got otype " << m->get_lock_type() << dendl;
    ceph_abort();
    break;
  }
}
3828
3829
3830
3831
3832
3833 // ==========================================================================
3834 // simple lock
3835
/** This function may take a reference to m if it needs one, but does
 * not put references. */
// A replica asked us (the auth) to move this lock toward SYNC so it can
// serve a rdlock.  If the lock is unstable, retry once it settles; if we
// are not in a position to act (non-auth/frozen/already SYNC), drop the
// request and let the replica retry.
void Locker::handle_reqrdlock(SimpleLock *lock, MLock *m)
{
  MDSCacheObject *parent = lock->get_parent();
  if (parent->is_auth() &&
      lock->get_state() != LOCK_SYNC &&
      !parent->is_frozen()) {
    dout(7) << "handle_reqrdlock got rdlock request on " << *lock
	    << " on " << *parent << dendl;
    assert(parent->is_auth()); // replica auth pinned if they're doing this!
    if (lock->is_stable()) {
      simple_sync(lock);
    } else {
      dout(7) << "handle_reqrdlock delaying request until lock is stable" << dendl;
      lock->add_waiter(SimpleLock::WAIT_STABLE | MDSCacheObject::WAIT_UNFREEZE,
                       new C_MDS_RetryMessage(mds, m->get()));
    }
  } else {
    dout(7) << "handle_reqrdlock dropping rdlock request on " << *lock
	    << " on " << *parent << dendl;
    // replica should retry
  }
}
3860
/* This function DOES put the passed message before returning */
// Protocol handler for simple (non-scatter) locks.  SYNC/LOCK actions are
// processed on replicas; LOCKACK is processed on the auth as part of a
// gather; REQRDLOCK is a replica asking the auth to go SYNC.
void Locker::handle_simple_lock(SimpleLock *lock, MLock *m)
{
  int from = m->get_asker();

  dout(10) << "handle_simple_lock " << *m
	   << " on " << *lock << " " << *lock->get_parent() << dendl;

  // during rejoin, ignore messages for objects still being rejoined
  if (mds->is_rejoin()) {
    if (lock->get_parent()->is_rejoining()) {
      dout(7) << "handle_simple_lock still rejoining " << *lock->get_parent()
	      << ", dropping " << *m << dendl;
      m->put();
      return;
    }
  }

  switch (m->get_action()) {
    // -- replica --
  case LOCK_AC_SYNC:
    assert(lock->get_state() == LOCK_LOCK);
    lock->decode_locked_state(m->get_data());
    lock->set_state(LOCK_SYNC);
    lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
    break;

  case LOCK_AC_LOCK:
    assert(lock->get_state() == LOCK_SYNC);
    lock->set_state(LOCK_SYNC_LOCK);
    // any client dentry leases must be revoked before we can ack
    if (lock->is_leased())
      revoke_client_leases(lock);
    eval_gather(lock, true);
    if (lock->is_unstable_and_locked())
      mds->mdlog->flush();
    break;


    // -- auth --
  case LOCK_AC_LOCKACK:
    assert(lock->get_state() == LOCK_SYNC_LOCK ||
	   lock->get_state() == LOCK_SYNC_EXCL);
    assert(lock->is_gathering(from));
    lock->remove_gather(from);

    if (lock->is_gathering()) {
      dout(7) << "handle_simple_lock " << *lock << " on " << *lock->get_parent() << " from " << from
	      << ", still gathering " << lock->get_gather_set() << dendl;
    } else {
      dout(7) << "handle_simple_lock " << *lock << " on " << *lock->get_parent() << " from " << from
	      << ", last one" << dendl;
      eval_gather(lock);
    }
    break;

  case LOCK_AC_REQRDLOCK:
    handle_reqrdlock(lock, m);
    break;

  }

  m->put();
}
3923
3924 /* unused, currently.
3925
3926 class C_Locker_SimpleEval : public Context {
3927 Locker *locker;
3928 SimpleLock *lock;
3929 public:
3930 C_Locker_SimpleEval(Locker *l, SimpleLock *lk) : locker(l), lock(lk) {}
3931 void finish(int r) {
3932 locker->try_simple_eval(lock);
3933 }
3934 };
3935
3936 void Locker::try_simple_eval(SimpleLock *lock)
3937 {
3938 // unstable and ambiguous auth?
3939 if (!lock->is_stable() &&
3940 lock->get_parent()->is_ambiguous_auth()) {
3941 dout(7) << "simple_eval not stable and ambiguous auth, waiting on " << *lock->get_parent() << dendl;
3942 //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
3943 lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_SimpleEval(this, lock));
3944 return;
3945 }
3946
3947 if (!lock->get_parent()->is_auth()) {
3948 dout(7) << "try_simple_eval not auth for " << *lock->get_parent() << dendl;
3949 return;
3950 }
3951
3952 if (!lock->get_parent()->can_auth_pin()) {
3953 dout(7) << "try_simple_eval can't auth_pin, waiting on " << *lock->get_parent() << dendl;
3954 //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
3955 lock->get_parent()->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_SimpleEval(this, lock));
3956 return;
3957 }
3958
3959 if (lock->is_stable())
3960 simple_eval(lock);
3961 }
3962 */
3963
3964
// Re-evaluate a stable simple lock on the auth node and, based on what
// caps clients want, steer it toward EXCL (single writer) or SYNC
// (shared readers).  Does nothing if the parent is freezing/frozen
// (with a narrow exception for unreadable dentry locks, which could
// block path traversal).
void Locker::simple_eval(SimpleLock *lock, bool *need_issue)
{
  dout(10) << "simple_eval " << *lock << " on " << *lock->get_parent() << dendl;

  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  if (lock->get_parent()->is_freezing_or_frozen()) {
    // dentry lock in unreadable state can block path traverse
    if ((lock->get_type() != CEPH_LOCK_DN ||
	 lock->get_state() == LOCK_SYNC ||
	 lock->get_parent()->is_frozen()))
      return;
  }

  if (mdcache->is_readonly()) {
    // read-only FS: everything should be readable, nothing writable.
    if (lock->get_state() != LOCK_SYNC) {
      dout(10) << "simple_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
      simple_sync(lock, need_issue);
    }
    return;
  }

  // only inode locks have client caps layered on top of them.
  CInode *in = 0;
  int wanted = 0;
  if (lock->get_type() != CEPH_LOCK_DN) {
    in = static_cast<CInode*>(lock->get_parent());
    in->get_caps_wanted(&wanted, NULL, lock->get_cap_shift());
  }

  // -> excl?
  if (lock->get_state() != LOCK_EXCL &&
      in && in->get_target_loner() >= 0 &&
      (wanted & CEPH_CAP_GEXCL)) {
    dout(7) << "simple_eval stable, going to excl " << *lock
	    << " on " << *lock->get_parent() << dendl;
    simple_excl(lock, need_issue);
  }

  // stable -> sync?
  else if (lock->get_state() != LOCK_SYNC &&
	   !lock->is_wrlocked() &&
	   ((!(wanted & CEPH_CAP_GEXCL) && !lock->is_waiter_for(SimpleLock::WAIT_WR)) ||
	    (lock->get_state() == LOCK_EXCL && in && in->get_target_loner() < 0))) {
    dout(7) << "simple_eval stable, syncing " << *lock
	    << " on " << *lock->get_parent() << dendl;
    simple_sync(lock, need_issue);
  }
}
4014
4015
4016 // mid
4017
// Move an auth, stable lock toward SYNC.  Returns true if the lock
// reached SYNC immediately; returns false if a gather was started
// (replica acks, cap revocation, wrlock drain, file recovery, or a
// dirty-scatterlock writebehind), in which case the transition
// completes later via eval_gather().
bool Locker::simple_sync(SimpleLock *lock, bool *need_issue)
{
  dout(7) << "simple_sync on " << *lock << " on " << *lock->get_parent() << dendl;
  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  // only inode locks with a cap shift carry client capabilities.
  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  int old_state = lock->get_state();

  // TSYN can go straight to SYNC; every other state needs the
  // intermediate *_SYNC transition and possibly a gather.
  if (old_state != LOCK_TSYN) {

    switch (lock->get_state()) {
    case LOCK_MIX: lock->set_state(LOCK_MIX_SYNC); break;
    case LOCK_LOCK: lock->set_state(LOCK_LOCK_SYNC); break;
    case LOCK_XSYN: lock->set_state(LOCK_XSYN_SYNC); break;
    case LOCK_EXCL: lock->set_state(LOCK_EXCL_SYNC); break;
    default: ceph_abort();
    }

    // count the things we must wait for before finishing the switch.
    int gather = 0;
    if (lock->is_wrlocked())
      gather++;

    // coming out of MIX, replicas hold dirty scatter state; ask them
    // to send it back and wait for their acks.
    if (lock->get_parent()->is_replicated() && old_state == LOCK_MIX) {
      send_lock_message(lock, LOCK_AC_SYNC);
      lock->init_gather();
      gather++;
    }

    // revoke client caps that conflict with SYNC.
    if (in && in->is_head()) {
      if (in->issued_caps_need_gather(lock)) {
	if (need_issue)
	  *need_issue = true;
	else
	  issue_caps(in);
	gather++;
      }
    }

    bool need_recover = false;
    if (lock->get_type() == CEPH_LOCK_IFILE) {
      assert(in);
      if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
        mds->mdcache->queue_file_recover(in);
        need_recover = true;
        gather++;
      }
    }

    // nothing to gather but dirty scatter data: journal it first.
    if (!gather && lock->is_dirty()) {
      lock->get_parent()->auth_pin(lock);
      scatter_writebehind(static_cast<ScatterLock*>(lock));
      mds->mdlog->flush();
      return false;
    }

    if (gather) {
      // pin until the gather completes; eval_gather() finishes the job.
      lock->get_parent()->auth_pin(lock);
      if (need_recover)
	mds->mdcache->do_file_recover();
      return false;
    }
  }

  // finish: publish state to replicas and wake readers.
  if (lock->get_parent()->is_replicated()) { // FIXME
    bufferlist data;
    lock->encode_locked_state(data);
    send_lock_message(lock, LOCK_AC_SYNC, data);
  }
  lock->set_state(LOCK_SYNC);
  lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
  if (in && in->is_head()) {
    if (need_issue)
      *need_issue = true;
    else
      issue_caps(in);
  }
  return true;
}
4100
// Move an auth, stable lock toward EXCL (loner client gets exclusive
// caps).  May start a gather (rdlock/wrlock drain, replica acks, cap
// revocation); the transition then completes via eval_gather().
void Locker::simple_excl(SimpleLock *lock, bool *need_issue)
{
  dout(7) << "simple_excl on " << *lock << " on " << *lock->get_parent() << dendl;
  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  // only inode locks with a cap shift carry client capabilities.
  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  switch (lock->get_state()) {
  case LOCK_LOCK: lock->set_state(LOCK_LOCK_EXCL); break;
  case LOCK_SYNC: lock->set_state(LOCK_SYNC_EXCL); break;
  case LOCK_XSYN: lock->set_state(LOCK_XSYN_EXCL); break;
  default: ceph_abort();
  }

  int gather = 0;
  if (lock->is_rdlocked())
    gather++;
  if (lock->is_wrlocked())
    gather++;

  // from LOCK/XSYN the replicas are already in LOCK; only the
  // SYNC->EXCL path needs to revoke replica read access.
  if (lock->get_parent()->is_replicated() &&
      lock->get_state() != LOCK_LOCK_EXCL &&
      lock->get_state() != LOCK_XSYN_EXCL) {
    send_lock_message(lock, LOCK_AC_LOCK);
    lock->init_gather();
    gather++;
  }

  // revoke client caps that conflict with the transition.
  if (in && in->is_head()) {
    if (in->issued_caps_need_gather(lock)) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
      gather++;
    }
  }

  if (gather) {
    // pin until the gather completes; eval_gather() finishes the job.
    lock->get_parent()->auth_pin(lock);
  } else {
    lock->set_state(LOCK_EXCL);
    lock->finish_waiters(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE);
    if (in) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
    }
  }
}
4155
// Move an auth, stable lock toward LOCK (mds-exclusive; no client
// read/write caps, no leases).  The MIX->LOCK path is two-staged:
// first the local gather (MIX_LOCK), then replicas are told to lock
// (MIX_LOCK2), so dirty replica scatter data is collected in order.
void Locker::simple_lock(SimpleLock *lock, bool *need_issue)
{
  dout(7) << "simple_lock on " << *lock << " on " << *lock->get_parent() << dendl;
  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());
  assert(lock->get_state() != LOCK_LOCK);

  // only inode locks with a cap shift carry client capabilities.
  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  int old_state = lock->get_state();

  switch (lock->get_state()) {
  case LOCK_SYNC: lock->set_state(LOCK_SYNC_LOCK); break;
  case LOCK_XSYN: lock->set_state(LOCK_XSYN_LOCK); break;
  case LOCK_EXCL: lock->set_state(LOCK_EXCL_LOCK); break;
  case LOCK_MIX: lock->set_state(LOCK_MIX_LOCK);
    (static_cast<ScatterLock *>(lock))->clear_unscatter_wanted();
    break;
  case LOCK_TSYN: lock->set_state(LOCK_TSYN_LOCK); break;
  default: ceph_abort();
  }

  // count everything we must wait for before reaching LOCK.
  int gather = 0;
  if (lock->is_leased()) {
    gather++;
    revoke_client_leases(lock);
  }
  if (lock->is_rdlocked())
    gather++;
  if (in && in->is_head()) {
    if (in->issued_caps_need_gather(lock)) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
      gather++;
    }
  }

  bool need_recover = false;
  if (lock->get_type() == CEPH_LOCK_IFILE) {
    assert(in);
    if(in->state_test(CInode::STATE_NEEDSRECOVER)) {
      mds->mdcache->queue_file_recover(in);
      need_recover = true;
      gather++;
    }
  }

  if (lock->get_parent()->is_replicated() &&
      lock->get_state() == LOCK_MIX_LOCK &&
      gather) {
    // stage one: wait for the local gather before asking replicas.
    dout(10) << " doing local stage of mix->lock gather before gathering from replicas" << dendl;
  } else {
    // move to second stage of gather now, so we don't send the lock action later.
    if (lock->get_state() == LOCK_MIX_LOCK)
      lock->set_state(LOCK_MIX_LOCK2);

    if (lock->get_parent()->is_replicated() &&
	lock->get_sm()->states[old_state].replica_state != LOCK_LOCK) {  // replica may already be LOCK
      gather++;
      send_lock_message(lock, LOCK_AC_LOCK);
      lock->init_gather();
    }
  }

  // nothing to gather but dirty scatter data: journal it first.
  if (!gather && lock->is_dirty()) {
    lock->get_parent()->auth_pin(lock);
    scatter_writebehind(static_cast<ScatterLock*>(lock));
    mds->mdlog->flush();
    return;
  }

  if (gather) {
    // pin until the gather completes; eval_gather() finishes the job.
    lock->get_parent()->auth_pin(lock);
    if (need_recover)
      mds->mdcache->do_file_recover();
  } else {
    lock->set_state(LOCK_LOCK);
    lock->finish_waiters(ScatterLock::WAIT_XLOCK|ScatterLock::WAIT_WR|ScatterLock::WAIT_STABLE);
  }
}
4240
4241
// Move a lock from LOCK/XLOCKDONE toward PREXLOCK so an xlock can be
// taken.  Unlike the other transitions this may be called on an
// unstable lock; it auth-pins only when the lock was stable.
void Locker::simple_xlock(SimpleLock *lock)
{
  dout(7) << "simple_xlock on " << *lock << " on " << *lock->get_parent() << dendl;
  assert(lock->get_parent()->is_auth());
  //assert(lock->is_stable());
  assert(lock->get_state() != LOCK_XLOCK);

  // only inode locks with a cap shift carry client capabilities.
  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  if (lock->is_stable())
    lock->get_parent()->auth_pin(lock);

  switch (lock->get_state()) {
  case LOCK_LOCK:
  case LOCK_XLOCKDONE: lock->set_state(LOCK_LOCK_XLOCK); break;
  default: ceph_abort();
  }

  // wait for readers/writers and conflicting client caps to drain.
  int gather = 0;
  if (lock->is_rdlocked())
    gather++;
  if (lock->is_wrlocked())
    gather++;

  if (in && in->is_head()) {
    if (in->issued_caps_need_gather(lock)) {
      issue_caps(in);
      gather++;
    }
  }

  if (!gather) {
    lock->set_state(LOCK_PREXLOCK);
    //assert("shouldn't be called if we are already xlockable" == 0);
  }
}
4280
4281
4282
4283
4284
4285 // ==========================================================================
4286 // scatter lock
4287
4288 /*
4289
4290 Some notes on scatterlocks.
4291
4292 - The scatter/gather is driven by the inode lock. The scatter always
4293 brings in the latest metadata from the fragments.
4294
4295 - When in a scattered/MIX state, fragments are only allowed to
4296 update/be written to if the accounted stat matches the inode's
4297 current version.
4298
4299 - That means, on gather, we _only_ assimilate diffs for frag metadata
4300 that match the current version, because those are the only ones
4301 written during this scatter/gather cycle. (Others didn't permit
4302 it.) We increment the version and journal this to disk.
4303
4304 - When possible, we also simultaneously update our local frag
4305 accounted stats to match.
4306
4307 - On scatter, the new inode info is broadcast to frags, both local
4308 and remote. If possible (auth and !frozen), the dirfrag auth
4309 should update the accounted state (if it isn't already up to date).
4310 Note that this may occur on both the local inode auth node and
4311 inode replicas, so there are two potential paths. If it is NOT
4312 possible, they need to mark_stale to prevent any possible writes.
4313
4314 - A scatter can be to MIX (potentially writeable) or to SYNC (read
4315 only). Both are opportunities to update the frag accounted stats,
4316 even though only the MIX case is affected by a stale dirfrag.
4317
4318 - Because many scatter/gather cycles can potentially go by without a
4319 frag being able to update its accounted stats (due to being frozen
4320 by exports/refragments in progress), the frag may have (even very)
   old stat versions.  That's fine; when we do want to update a frag,
   we can bring its accounted_* stats and version up to date first.
4323
4324 */
4325
4326 class C_Locker_ScatterWB : public LockerLogContext {
4327 ScatterLock *lock;
4328 MutationRef mut;
4329 public:
4330 C_Locker_ScatterWB(Locker *l, ScatterLock *sl, MutationRef& m) :
4331 LockerLogContext(l), lock(sl), mut(m) {}
4332 void finish(int r) override {
4333 locker->scatter_writebehind_finish(lock, mut);
4334 }
4335 };
4336
// Flush a dirty scatterlock's gathered data back into the inode by
// journaling an EUpdate.  Holds a forced wrlock for the duration of
// the flush; C_Locker_ScatterWB completes it once the entry commits.
void Locker::scatter_writebehind(ScatterLock *lock)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  dout(10) << "scatter_writebehind " << in->inode.mtime << " on " << *lock << " on " << *in << dendl;

  // journal
  MutationRef mut(new MutationImpl());
  mut->ls = mds->mdlog->get_current_segment();

  // forcefully take a wrlock
  lock->get_wrlock(true);
  mut->wrlocks.insert(lock);
  mut->locks.insert(lock);

  in->pre_cow_old_inode();  // avoid cow mayhem

  inode_t *pi = in->project_inode();
  pi->version = in->pre_dirty();

  // fold the per-frag deltas into the inode, then mark the lock as
  // flushing so further dirtying is tracked separately.
  in->finish_scatter_gather_update(lock->get_type());
  lock->start_flush();

  EUpdate *le = new EUpdate(mds->mdlog, "scatter_writebehind");
  mds->mdlog->start_entry(le);

  mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY);
  mdcache->journal_dirty_inode(mut.get(), &le->metablob, in);
  
  in->finish_scatter_gather_update_accounted(lock->get_type(), mut, &le->metablob);

  mds->mdlog->submit_entry(le, new C_Locker_ScatterWB(this, lock, mut));
}
4369
// Journal-commit half of scatter_writebehind(): pop the projected
// inode, finish the flush, notify replicas where needed, and release
// the forced wrlock taken by scatter_writebehind().
void Locker::scatter_writebehind_finish(ScatterLock *lock, MutationRef& mut)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  dout(10) << "scatter_writebehind_finish on " << *lock << " on " << *in << dendl;
  in->pop_and_dirty_projected_inode(mut->ls);

  lock->finish_flush();

  // if replicas may have flushed in a mix->lock state, send another
  // message so they can finish_flush().
  if (in->is_replicated()) {
    switch (lock->get_state()) {
    case LOCK_MIX_LOCK:
    case LOCK_MIX_LOCK2:
    case LOCK_MIX_EXCL:
    case LOCK_MIX_TSYN:
      send_lock_message(lock, LOCK_AC_LOCKFLUSHED);
    }
  }

  mut->apply();
  drop_locks(mut.get());
  mut->cleanup();

  if (lock->is_stable())
    lock->finish_waiters(ScatterLock::WAIT_STABLE);

  //scatter_eval_gather(lock);
}
4399
// Re-evaluate a stable scatterlock on the auth node: honor pending
// scatter requests, keep INEST writable, and otherwise fall back to
// SYNC when the inode is not a subtree/export delegation point.
void Locker::scatter_eval(ScatterLock *lock, bool *need_issue)
{
  dout(10) << "scatter_eval " << *lock << " on " << *lock->get_parent() << dendl;

  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  if (lock->get_parent()->is_freezing_or_frozen()) {
    dout(20) << " freezing|frozen" << dendl;
    return;
  }

  if (mdcache->is_readonly()) {
    // read-only FS: everything should be readable, nothing writable.
    if (lock->get_state() != LOCK_SYNC) {
      dout(10) << "scatter_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
      simple_sync(lock, need_issue);
    }
    return;
  }

  // someone asked for a scatter (e.g. a replica wants to write); go MIX.
  if (!lock->is_rdlocked() &&
      lock->get_state() != LOCK_MIX &&
      lock->get_scatter_wanted()) {
    dout(10) << "scatter_eval scatter_wanted, bump to mix " << *lock
	     << " on " << *lock->get_parent() << dendl;
    scatter_mix(lock, need_issue);
    return;
  }

  if (lock->get_type() == CEPH_LOCK_INEST) {
    // in general, we want to keep INEST writable at all times.
    if (!lock->is_rdlocked()) {
      if (lock->get_parent()->is_replicated()) {
	if (lock->get_state() != LOCK_MIX)
	  scatter_mix(lock, need_issue);
      } else {
	if (lock->get_state() != LOCK_LOCK)
	  simple_lock(lock, need_issue);
      }
    }
    return;
  }

  CInode *in = static_cast<CInode*>(lock->get_parent());
  if (!in->has_subtree_or_exporting_dirfrag() || in->is_base()) {
    // i _should_ be sync.
    if (!lock->is_wrlocked() &&
	lock->get_state() != LOCK_SYNC) {
      dout(10) << "scatter_eval no wrlocks|xlocks, not subtree root inode, syncing" << dendl;
      simple_sync(lock, need_issue);
    }
  }
}
4453
4454
4455 /*
4456 * mark a scatterlock to indicate that the dir fnode has some dirty data
4457 */
4458 void Locker::mark_updated_scatterlock(ScatterLock *lock)
4459 {
4460 lock->mark_dirty();
4461 if (lock->get_updated_item()->is_on_list()) {
4462 dout(10) << "mark_updated_scatterlock " << *lock
4463 << " - already on list since " << lock->get_update_stamp() << dendl;
4464 } else {
4465 updated_scatterlocks.push_back(lock->get_updated_item());
4466 utime_t now = ceph_clock_now();
4467 lock->set_update_stamp(now);
4468 dout(10) << "mark_updated_scatterlock " << *lock
4469 << " - added at " << now << dendl;
4470 }
4471 }
4472
4473 /*
4474 * this is called by scatter_tick and LogSegment::try_to_trim() when
4475 * trying to flush dirty scattered data (i.e. updated fnode) back to
4476 * the inode.
4477 *
4478 * we need to lock|scatter in order to push fnode changes into the
4479 * inode.dirstat.
4480 */
// Nudge a scatterlock through a lock/scatter cycle so its dirty fnode
// data gets folded back into the inode (see comment above).  If we are
// auth we drive the state change directly; otherwise we ask the auth
// to do it.  @param c optional continuation, run once the lock is
// stable again; @param forcelockchange currently only referenced from
// the commented-out fast path.
void Locker::scatter_nudge(ScatterLock *lock, MDSInternalContextBase *c, bool forcelockchange)
{
  CInode *p = static_cast<CInode *>(lock->get_parent());

  if (p->is_frozen() || p->is_freezing()) {
    dout(10) << "scatter_nudge waiting for unfreeze on " << *p << dendl;
    if (c)
      p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, c);
    else if (lock->is_dirty())
      // just requeue.  not ideal.. starvation prone..
      updated_scatterlocks.push_back(lock->get_updated_item());
    return;
  }

  if (p->is_ambiguous_auth()) {
    dout(10) << "scatter_nudge waiting for single auth on " << *p << dendl;
    if (c)
      p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, c);
    else if (lock->is_dirty())
      // just requeue.  not ideal.. starvation prone..
      updated_scatterlocks.push_back(lock->get_updated_item());
    return;
  }

  if (p->is_auth()) {
    int count = 0;
    while (true) {
      if (lock->is_stable()) {
	// can we do it now?
	//  (only if we're not replicated.. if we are, we really do need
	//   to nudge the lock state!)
	/*
	  actually, even if we're not replicated, we can't stay in MIX, because another mds
	  could discover and replicate us at any time.  if that happens while we're flushing,
	  they end up in MIX but their inode has the old scatterstat version.

	if (!forcelockchange && !lock->get_parent()->is_replicated() && lock->can_wrlock(-1)) {
	  dout(10) << "scatter_nudge auth, propagating " << *lock << " on " << *p << dendl;
	  scatter_writebehind(lock);
	  if (c)
	    lock->add_waiter(SimpleLock::WAIT_STABLE, c);
	  return;
	}
	*/

	if (mdcache->is_readonly()) {
	  // read-only FS: just force the lock toward SYNC.
	  if (lock->get_state() != LOCK_SYNC) {
	    dout(10) << "scatter_nudge auth, read-only FS, syncing " << *lock << " on " << *p << dendl;
	    simple_sync(static_cast<ScatterLock*>(lock));
	  }
	  break;
	}

	// adjust lock state
	dout(10) << "scatter_nudge auth, scatter/unscattering " << *lock << " on " << *p << dendl;
	switch (lock->get_type()) {
	case CEPH_LOCK_IFILE:
	  if (p->is_replicated() && lock->get_state() != LOCK_MIX)
	    scatter_mix(static_cast<ScatterLock*>(lock));
	  else if (lock->get_state() != LOCK_LOCK)
	    simple_lock(static_cast<ScatterLock*>(lock));
	  else
	    simple_sync(static_cast<ScatterLock*>(lock));
	  break;
	  
	case CEPH_LOCK_IDFT:
	case CEPH_LOCK_INEST:
	  if (p->is_replicated() && lock->get_state() != LOCK_MIX)
	    scatter_mix(lock);
	  else if (lock->get_state() != LOCK_LOCK)
	    simple_lock(lock);
	  else
	    simple_sync(lock);
	  break;
	default:
	  ceph_abort();
	}
	++count;
	if (lock->is_stable() && count == 2) {
	  dout(10) << "scatter_nudge oh, stable after two cycles." << dendl;
	  // this should only really happen when called via
	  // handle_file_lock due to AC_NUDGE, because the rest of the
	  // time we are replicated or have dirty data and won't get
	  // called.  bailing here avoids an infinite loop.
	  assert(!c); 
	  break;
	}
      } else {
	// transition in progress; run again (via c) once it settles.
	dout(10) << "scatter_nudge auth, waiting for stable " << *lock << " on " << *p << dendl;
	if (c)
	  lock->add_waiter(SimpleLock::WAIT_STABLE, c);
	return;
      }
    }
  } else {
    dout(10) << "scatter_nudge replica, requesting scatter/unscatter of "
	     << *lock << " on " << *p << dendl;
    // request unscatter?
    mds_rank_t auth = lock->get_parent()->authority().first;
    if (!mds->is_cluster_degraded() ||
	mds->mdsmap->is_clientreplay_or_active_or_stopping(auth))
      mds->send_message_mds(new MLock(lock, LOCK_AC_NUDGE, mds->get_nodeid()), auth);

    // wait...
    if (c)
      lock->add_waiter(SimpleLock::WAIT_STABLE, c);

    // also, requeue, in case we had wrong auth or something
    if (lock->is_dirty())
      updated_scatterlocks.push_back(lock->get_updated_item());
  }
}
4593
4594 void Locker::scatter_tick()
4595 {
4596 dout(10) << "scatter_tick" << dendl;
4597
4598 // updated
4599 utime_t now = ceph_clock_now();
4600 int n = updated_scatterlocks.size();
4601 while (!updated_scatterlocks.empty()) {
4602 ScatterLock *lock = updated_scatterlocks.front();
4603
4604 if (n-- == 0) break; // scatter_nudge() may requeue; avoid looping
4605
4606 if (!lock->is_dirty()) {
4607 updated_scatterlocks.pop_front();
4608 dout(10) << " removing from updated_scatterlocks "
4609 << *lock << " " << *lock->get_parent() << dendl;
4610 continue;
4611 }
4612 if (now - lock->get_update_stamp() < g_conf->mds_scatter_nudge_interval)
4613 break;
4614 updated_scatterlocks.pop_front();
4615 scatter_nudge(lock, 0);
4616 }
4617 mds->mdlog->flush();
4618 }
4619
4620
// Move a scatterlock toward TSYN (temporarily readable on the auth
// only).  NOTE: the assert below makes this path unreachable in
// practice; the implementation is declared incomplete, at least for
// the file lock.
void Locker::scatter_tempsync(ScatterLock *lock, bool *need_issue)
{
  dout(10) << "scatter_tempsync " << *lock
	   << " on " << *lock->get_parent() << dendl;
  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  assert(0 == "not fully implemented, at least not for filelock");

  CInode *in = static_cast<CInode *>(lock->get_parent());

  switch (lock->get_state()) {
  case LOCK_SYNC: ceph_abort();   // this shouldn't happen
  case LOCK_LOCK: lock->set_state(LOCK_LOCK_TSYN); break;
  case LOCK_MIX: lock->set_state(LOCK_MIX_TSYN); break;
  default: ceph_abort();
  }

  // count everything we must wait for before reaching TSYN.
  int gather = 0;
  if (lock->is_wrlocked())
    gather++;

  if (lock->get_cap_shift() &&
      in->is_head() &&
      in->issued_caps_need_gather(lock)) {
    if (need_issue)
      *need_issue = true;
    else
      issue_caps(in);
    gather++;
  }

  // from MIX, replicas hold scatter state; tell them to lock.
  if (lock->get_state() == LOCK_MIX_TSYN &&
      in->is_replicated()) {
    lock->init_gather();
    send_lock_message(lock, LOCK_AC_LOCK);
    gather++;
  }

  if (gather) {
    // pin until the gather completes; eval_gather() finishes the job.
    in->auth_pin(lock);
  } else {
    // do tempsync
    lock->set_state(LOCK_TSYN);
    lock->finish_waiters(ScatterLock::WAIT_RD|ScatterLock::WAIT_STABLE);
    if (lock->get_cap_shift()) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
    }
  }
}
4674
4675
4676
4677 // ==========================================================================
4678 // local lock
4679
4680 void Locker::local_wrlock_grab(LocalLock *lock, MutationRef& mut)
4681 {
4682 dout(7) << "local_wrlock_grab on " << *lock
4683 << " on " << *lock->get_parent() << dendl;
4684
4685 assert(lock->get_parent()->is_auth());
4686 assert(lock->can_wrlock());
4687 assert(!mut->wrlocks.count(lock));
4688 lock->get_wrlock(mut->get_client());
4689 mut->wrlocks.insert(lock);
4690 mut->locks.insert(lock);
4691 }
4692
4693 bool Locker::local_wrlock_start(LocalLock *lock, MDRequestRef& mut)
4694 {
4695 dout(7) << "local_wrlock_start on " << *lock
4696 << " on " << *lock->get_parent() << dendl;
4697
4698 assert(lock->get_parent()->is_auth());
4699 if (lock->can_wrlock()) {
4700 assert(!mut->wrlocks.count(lock));
4701 lock->get_wrlock(mut->get_client());
4702 mut->wrlocks.insert(lock);
4703 mut->locks.insert(lock);
4704 return true;
4705 } else {
4706 lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
4707 return false;
4708 }
4709 }
4710
4711 void Locker::local_wrlock_finish(LocalLock *lock, MutationImpl *mut)
4712 {
4713 dout(7) << "local_wrlock_finish on " << *lock
4714 << " on " << *lock->get_parent() << dendl;
4715 lock->put_wrlock();
4716 mut->wrlocks.erase(lock);
4717 mut->locks.erase(lock);
4718 if (lock->get_num_wrlocks() == 0) {
4719 lock->finish_waiters(SimpleLock::WAIT_STABLE |
4720 SimpleLock::WAIT_WR |
4721 SimpleLock::WAIT_RD);
4722 }
4723 }
4724
4725 bool Locker::local_xlock_start(LocalLock *lock, MDRequestRef& mut)
4726 {
4727 dout(7) << "local_xlock_start on " << *lock
4728 << " on " << *lock->get_parent() << dendl;
4729
4730 assert(lock->get_parent()->is_auth());
4731 if (!lock->can_xlock_local()) {
4732 lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
4733 return false;
4734 }
4735
4736 lock->get_xlock(mut, mut->get_client());
4737 mut->xlocks.insert(lock);
4738 mut->locks.insert(lock);
4739 return true;
4740 }
4741
4742 void Locker::local_xlock_finish(LocalLock *lock, MutationImpl *mut)
4743 {
4744 dout(7) << "local_xlock_finish on " << *lock
4745 << " on " << *lock->get_parent() << dendl;
4746 lock->put_xlock();
4747 mut->xlocks.erase(lock);
4748 mut->locks.erase(lock);
4749
4750 lock->finish_waiters(SimpleLock::WAIT_STABLE |
4751 SimpleLock::WAIT_WR |
4752 SimpleLock::WAIT_RD);
4753 }
4754
4755
4756
4757 // ==========================================================================
4758 // file lock
4759
4760
// Re-evaluate a stable file lock on the auth node and move it toward
// EXCL (single loner writer), MIX (multiple writers), or SYNC (shared
// readers) based on which caps clients want and hold.
void Locker::file_eval(ScatterLock *lock, bool *need_issue)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  int loner_wanted, other_wanted;
  int wanted = in->get_caps_wanted(&loner_wanted, &other_wanted, CEPH_CAP_SFILE);
  dout(7) << "file_eval wanted=" << gcap_string(wanted)
	  << " loner_wanted=" << gcap_string(loner_wanted)
	  << " other_wanted=" << gcap_string(other_wanted)
	  << "  filelock=" << *lock << " on " << *lock->get_parent()
	  << dendl;

  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  if (lock->get_parent()->is_freezing_or_frozen())
    return;

  if (mdcache->is_readonly()) {
    // read-only FS: everything should be readable, nothing writable.
    if (lock->get_state() != LOCK_SYNC) {
      dout(10) << "file_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
      simple_sync(lock, need_issue);
    }
    return;
  }

  // excl -> *?
  if (lock->get_state() == LOCK_EXCL) {
    dout(20) << " is excl" << dendl;
    int loner_issued, other_issued, xlocker_issued;
    in->get_caps_issued(&loner_issued, &other_issued, &xlocker_issued, CEPH_CAP_SFILE);
    dout(7) << "file_eval loner_issued=" << gcap_string(loner_issued)
            << " other_issued=" << gcap_string(other_issued)
	    << " xlocker_issued=" << gcap_string(xlocker_issued)
	    << dendl;
    // lose EXCL if the loner no longer justifies it: it neither wants
    // nor holds write-ish caps, someone else wants conflicting caps,
    // or (for dirs) several clients hold caps.
    if (!((loner_wanted|loner_issued) & (CEPH_CAP_GEXCL|CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) ||
	(other_wanted & (CEPH_CAP_GEXCL|CEPH_CAP_GWR|CEPH_CAP_GRD)) ||
	(in->inode.is_dir() && in->multiple_nonstale_caps())) {  // FIXME.. :/
      dout(20) << " should lose it" << dendl;
      // we should lose it.
      //  loner  other   want
      //  R      R       SYNC
      //  R      R|W     MIX
      //  R      W       MIX
      //  R|W    R       MIX
      //  R|W    R|W     MIX
      //  R|W    W       MIX
      //  W      R       MIX
      //  W      R|W     MIX
      //  W      W       MIX
      // -> any writer means MIX; RD doesn't matter.
      if (((other_wanted|loner_wanted) & CEPH_CAP_GWR) ||
	  lock->is_waiter_for(SimpleLock::WAIT_WR))
	scatter_mix(lock, need_issue);
      else if (!lock->is_wrlocked())   // let excl wrlocks drain first
	simple_sync(lock, need_issue);
      else
	dout(10) << " waiting for wrlock to drain" << dendl;
    }
  }

  // * -> excl?
  else if (lock->get_state() != LOCK_EXCL &&
	   !lock->is_rdlocked() &&
	   //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
	   ((wanted & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) ||
	    (in->inode.is_dir() && !in->has_subtree_or_exporting_dirfrag())) &&
	   in->get_target_loner() >= 0) {
    dout(7) << "file_eval stable, bump to loner " << *lock
	    << " on " << *lock->get_parent() << dendl;
    file_excl(lock, need_issue);
  }

  // * -> mixed?
  else if (lock->get_state() != LOCK_MIX &&
	   !lock->is_rdlocked() &&
	   //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
	   (lock->get_scatter_wanted() ||
	    (in->get_target_loner() < 0 && (wanted & CEPH_CAP_GWR)))) {
    dout(7) << "file_eval stable, bump to mixed " << *lock
	    << " on " << *lock->get_parent() << dendl;
    scatter_mix(lock, need_issue);
  }

  // * -> sync?
  else if (lock->get_state() != LOCK_SYNC &&
	   !lock->is_wrlocked() &&   // drain wrlocks first!
	   !lock->is_waiter_for(SimpleLock::WAIT_WR) &&
	   !(wanted & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) &&
	   !((lock->get_state() == LOCK_MIX) &&
	     in->is_dir() && in->has_subtree_or_exporting_dirfrag())  // if we are a delegation point, stay where we are
	   //((wanted & CEPH_CAP_RD) || 
	   //in->is_replicated() || 
	   //lock->is_leased() ||
	   //(!loner && lock->get_state() == LOCK_EXCL)) &&
	   ) {
    dout(7) << "file_eval stable, bump to sync " << *lock 
	    << " on " << *lock->get_parent() << dendl;
    simple_sync(lock, need_issue);
  }
}
4861
4862
4863
// Move an auth, stable scatterlock toward MIX (scattered/writeable on
// frags and replicas).  LOCK can scatter immediately; other states
// first enter a *_MIX transition and may need a gather (rdlock drain,
// replica acks, lease revocation, cap revocation, file recovery).
void Locker::scatter_mix(ScatterLock *lock, bool *need_issue)
{
  dout(7) << "scatter_mix " << *lock << " on " << *lock->get_parent() << dendl;

  CInode *in = static_cast<CInode*>(lock->get_parent());
  assert(in->is_auth());
  assert(lock->is_stable());

  if (lock->get_state() == LOCK_LOCK) {
    // no conflicting holders possible in LOCK: scatter right away.
    in->start_scatter(lock);
    if (in->is_replicated()) {
      // data
      bufferlist softdata;
      lock->encode_locked_state(softdata);

      // bcast to replicas
      send_lock_message(lock, LOCK_AC_MIX, softdata);
    }

    // change lock
    lock->set_state(LOCK_MIX);
    lock->clear_scatter_wanted();
    if (lock->get_cap_shift()) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
    }
  } else {
    // gather?
    switch (lock->get_state()) {
    case LOCK_SYNC: lock->set_state(LOCK_SYNC_MIX); break;
    case LOCK_EXCL: lock->set_state(LOCK_EXCL_MIX); break;
    case LOCK_XSYN: lock->set_state(LOCK_XSYN_MIX); break;
    case LOCK_TSYN: lock->set_state(LOCK_TSYN_MIX); break;
    default: ceph_abort();
    }

    int gather = 0;
    if (lock->is_rdlocked())
      gather++;
    if (in->is_replicated()) {
      if (lock->get_state() == LOCK_SYNC_MIX) {  // for the rest states, replicas are already LOCK
	send_lock_message(lock, LOCK_AC_MIX);
	lock->init_gather();
	gather++;
      }
    }
    if (lock->is_leased()) {
      revoke_client_leases(lock);
      gather++;
    }
    if (lock->get_cap_shift() &&
	in->is_head() &&
	in->issued_caps_need_gather(lock)) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
      gather++;
    }
    bool need_recover = false;
    if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
      mds->mdcache->queue_file_recover(in);
      need_recover = true;
      gather++;
    }

    if (gather) {
      // pin until the gather completes; eval_gather() finishes the job.
      lock->get_parent()->auth_pin(lock);
      if (need_recover)
	mds->mdcache->do_file_recover();
    } else {
      // nothing to wait for: scatter and publish state to replicas now.
      in->start_scatter(lock);
      lock->set_state(LOCK_MIX);
      lock->clear_scatter_wanted();
      if (in->is_replicated()) {
	bufferlist softdata;
	lock->encode_locked_state(softdata);
	send_lock_message(lock, LOCK_AC_MIX, softdata);
      }
      if (lock->get_cap_shift()) {
	if (need_issue)
	  *need_issue = true;
	else
	  issue_caps(in);
      }
    }
  }
}
4954
4955
// file_excl: move an auth, stable file scatterlock toward LOCK_EXCL
// (exclusive to the loner client).
//
// Precondition (asserted): either the inode has a loner client and no other
// MDS wants caps, or we are coming from LOCK_XSYN -- leaving XSYN must go
// through EXCL first ("xsyn -> excl -> <anything else>").
//
// Enters the matching *_EXCL intermediate state, then counts what must
// drain first: rd/wr locks, replica LOCK acks (skipped when replicas are
// already LOCK, i.e. coming from LOCK or XSYN), client leases, client caps
// that need revoking, and a pending file recover.  With gather > 0 we
// auth_pin and wait; otherwise LOCK_EXCL is set inline and caps reissued.
//
// @param lock        filelock to transition; auth + stable (asserted).
// @param need_issue  if non-null, set *need_issue = true instead of calling
//                    issue_caps() directly.
4956 void Locker::file_excl(ScatterLock *lock, bool *need_issue)
4957 {
4958 CInode *in = static_cast<CInode*>(lock->get_parent());
4959 dout(7) << "file_excl " << *lock << " on " << *lock->get_parent() << dendl;
4960
4961 assert(in->is_auth());
4962 assert(lock->is_stable());
4963
4964 assert((in->get_loner() >= 0 && in->mds_caps_wanted.empty()) ||
4965 (lock->get_state() == LOCK_XSYN)); // must do xsyn -> excl -> <anything else>
4966
// Pick the intermediate state; any other starting state is a bug.
4967 switch (lock->get_state()) {
4968 case LOCK_SYNC: lock->set_state(LOCK_SYNC_EXCL); break;
4969 case LOCK_MIX: lock->set_state(LOCK_MIX_EXCL); break;
4970 case LOCK_LOCK: lock->set_state(LOCK_LOCK_EXCL); break;
4971 case LOCK_XSYN: lock->set_state(LOCK_XSYN_EXCL); break;
4972 default: ceph_abort();
4973 }
// Count outstanding obstacles; each contributes one unit of "gather".
4974 int gather = 0;
4975
4976 if (lock->is_rdlocked())
4977 gather++;
4978 if (lock->is_wrlocked())
4979 gather++;
4980
4981 if (in->is_replicated() &&
4982 lock->get_state() != LOCK_LOCK_EXCL &&
4983 lock->get_state() != LOCK_XSYN_EXCL) { // if we were lock, replicas are already lock.
4984 send_lock_message(lock, LOCK_AC_LOCK);
4985 lock->init_gather();
4986 gather++;
4987 }
4988 if (lock->is_leased()) {
4989 revoke_client_leases(lock);
4990 gather++;
4991 }
4992 if (in->is_head() &&
4993 in->issued_caps_need_gather(lock)) {
// issue_caps() starts the cap revoke; client acks drain the gather.
4994 if (need_issue)
4995 *need_issue = true;
4996 else
4997 issue_caps(in);
4998 gather++;
4999 }
5000 bool need_recover = false;
5001 if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
5002 mds->mdcache->queue_file_recover(in);
5003 need_recover = true;
5004 gather++;
5005 }
5006
// Anything outstanding?  Pin and wait; otherwise complete inline.
5007 if (gather) {
5008 lock->get_parent()->auth_pin(lock);
5009 if (need_recover)
5010 mds->mdcache->do_file_recover();
5011 } else {
5012 lock->set_state(LOCK_EXCL);
5013 if (need_issue)
5014 *need_issue = true;
5015 else
5016 issue_caps(in);
5017 }
5018 }
5019
// file_xsyn: move an auth filelock from LOCK_EXCL to LOCK_XSYN.
//
// Only EXCL -> XSYN is legal (any other starting state aborts), and the
// inode must have a loner client with no other MDS wanting caps (asserted).
// Gathers outstanding wrlocks and any client caps that need revoking; when
// nothing is outstanding, sets LOCK_XSYN, wakes RD/STABLE waiters, and
// (re)issues caps.  Note: no replica messaging is done here.
//
// @param lock        the filelock (as SimpleLock) to transition.
// @param need_issue  if non-null, set *need_issue = true instead of calling
//                    issue_caps() directly.
5020 void Locker::file_xsyn(SimpleLock *lock, bool *need_issue)
5021 {
5022 dout(7) << "file_xsyn on " << *lock << " on " << *lock->get_parent() << dendl;
5023 CInode *in = static_cast<CInode *>(lock->get_parent());
5024 assert(in->is_auth());
5025 assert(in->get_loner() >= 0 && in->mds_caps_wanted.empty());
5026
5027 switch (lock->get_state()) {
5028 case LOCK_EXCL: lock->set_state(LOCK_EXCL_XSYN); break;
5029 default: ceph_abort();
5030 }
5031
// Count outstanding obstacles before the transition can finish.
5032 int gather = 0;
5033 if (lock->is_wrlocked())
5034 gather++;
5035
5036 if (in->is_head() &&
5037 in->issued_caps_need_gather(lock)) {
// issue_caps() starts the cap revoke; client acks drain the gather.
5038 if (need_issue)
5039 *need_issue = true;
5040 else
5041 issue_caps(in);
5042 gather++;
5043 }
5044
// Pin and wait if gathering; otherwise complete the transition inline.
5045 if (gather) {
5046 lock->get_parent()->auth_pin(lock);
5047 } else {
5048 lock->set_state(LOCK_XSYN);
5049 lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
5050 if (need_issue)
5051 *need_issue = true;
5052 else
5053 issue_caps(in);
5054 }
5055 }
5056
// file_recover: advance an inode's filelock from LOCK_PRE_SCAN to
// LOCK_SCAN and kick off file recovery.
//
// Only called from MDCache::start_files_to_recover() (per the assert on
// the PRE_SCAN state).  If clients still hold caps that must be revoked
// first, issue_caps() starts the revoke and the inode is marked
// NEEDSRECOVER so recovery happens once the caps drain; otherwise the
// inode is queued for recovery immediately.  The lock moves to LOCK_SCAN
// in either case.  Replica messaging is intentionally disabled (see the
// commented-out block below).
//
// @param lock  the filelock; parent must be an auth CInode (asserted).
5057 void Locker::file_recover(ScatterLock *lock)
5058 {
5059 CInode *in = static_cast<CInode *>(lock->get_parent());
5060 dout(7) << "file_recover " << *lock << " on " << *in << dendl;
5061
5062 assert(in->is_auth());
5063 //assert(lock->is_stable());
5064 assert(lock->get_state() == LOCK_PRE_SCAN); // only called from MDCache::start_files_to_recover()
5065
5066 int gather = 0;
5067
5068 /*
5069 if (in->is_replicated()
5070 lock->get_sm()->states[oldstate].replica_state != LOCK_LOCK) {
5071 send_lock_message(lock, LOCK_AC_LOCK);
5072 lock->init_gather();
5073 gather++;
5074 }
5075 */
5076 if (in->is_head() &&
5077 in->issued_caps_need_gather(lock)) {
// Start the cap revoke; recovery waits for the client acks.
5078 issue_caps(in);
5079 gather++;
5080 }
5081
5082 lock->set_state(LOCK_SCAN);
5083 if (gather)
5084 in->state_set(CInode::STATE_NEEDSRECOVER);
5085 else
5086 mds->mdcache->queue_file_recover(in);
5087 }
5088
5089
5090 // messenger
5091 /* This function DOES put the passed message before returning */
// handle_file_lock: process an inter-MDS MLock message for a file
// scatterlock.  This function DOES put (drop the reference of) the passed
// message before returning, on every path.
//
// The cases fall into three groups:
//  * replica-side (LOCK_AC_SYNC / LOCK / LOCKFLUSHED / MIX): the auth MDS
//    is driving a state change on our replica -- either complete it
//    immediately, or enter an intermediate state and let eval_gather()
//    drive the ack once local rd/wr locks drain;
//  * auth-side acks (LOCK_AC_LOCKACK / SYNCACK / MIXACK): a replica
//    finished its part of a gather -- remove it from the gather set and
//    call eval_gather() when the set empties;
//  * requests from replicas (LOCK_AC_REQSCATTER / REQUNSCATTER /
//    REQRDLOCK / NUDGE): act on them if the lock is stable, otherwise
//    record the wanted flag for later.
//
// While this MDS is in rejoin and the inode itself is still rejoining,
// the message is dropped unprocessed.
5092 void Locker::handle_file_lock(ScatterLock *lock, MLock *m)
5093 {
5094 CInode *in = static_cast<CInode*>(lock->get_parent());
5095 int from = m->get_asker();
5096
5097 if (mds->is_rejoin()) {
5098 if (in->is_rejoining()) {
5099 dout(7) << "handle_file_lock still rejoining " << *in
5100 << ", dropping " << *m << dendl;
5101 m->put();
5102 return;
5103 }
5104 }
5105
5106 dout(7) << "handle_file_lock a=" << get_lock_action_name(m->get_action())
5107 << " on " << *lock
5108 << " from mds." << from << " "
5109 << *in << dendl;
5110
// true iff this lock covers client caps (nonzero cap shift).
5111 bool caps = lock->get_cap_shift();
5112
5113 switch (m->get_action()) {
5114 // -- replica --
5115 case LOCK_AC_SYNC:
5116 assert(lock->get_state() == LOCK_LOCK ||
5117 lock->get_state() == LOCK_MIX ||
5118 lock->get_state() == LOCK_MIX_SYNC2);
5119
// From MIX we cannot go to SYNC directly: enter MIX_SYNC and let
// eval_gather() finish (flushing dirty scatter state first).
5120 if (lock->get_state() == LOCK_MIX) {
5121 lock->set_state(LOCK_MIX_SYNC);
5122 eval_gather(lock, true);
5123 if (lock->is_unstable_and_locked())
5124 mds->mdlog->flush();
5125 break;
5126 }
5127
5128 (static_cast<ScatterLock *>(lock))->finish_flush();
5129 (static_cast<ScatterLock *>(lock))->clear_flushed();
5130
5131 // ok
// Adopt the auth's lock state and go SYNC; hold a transient rdlock so
// the state stays put while caps are issued and waiters run.
5132 lock->decode_locked_state(m->get_data());
5133 lock->set_state(LOCK_SYNC);
5134
5135 lock->get_rdlock();
5136 if (caps)
5137 issue_caps(in);
5138 lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
5139 lock->put_rdlock();
5140 break;
5141
5142 case LOCK_AC_LOCK:
// Auth wants us LOCK: enter the intermediate state and let
// eval_gather() send the ack once local locks drain.
5143 switch (lock->get_state()) {
5144 case LOCK_SYNC: lock->set_state(LOCK_SYNC_LOCK); break;
5145 case LOCK_MIX: lock->set_state(LOCK_MIX_LOCK); break;
5146 default: ceph_abort();
5147 }
5148
5149 eval_gather(lock, true);
5150 if (lock->is_unstable_and_locked())
5151 mds->mdlog->flush();
5152
5153 break;
5154
5155 case LOCK_AC_LOCKFLUSHED:
5156 (static_cast<ScatterLock *>(lock))->finish_flush();
5157 (static_cast<ScatterLock *>(lock))->clear_flushed();
5158 // wake up scatter_nudge waiters
5159 if (lock->is_stable())
5160 lock->finish_waiters(SimpleLock::WAIT_STABLE);
5161 break;
5162
5163 case LOCK_AC_MIX:
5164 assert(lock->get_state() == LOCK_SYNC ||
5165 lock->get_state() == LOCK_LOCK ||
5166 lock->get_state() == LOCK_SYNC_MIX2);
5167
// From SYNC we must drain rdlocks first: enter SYNC_MIX and let
// eval_gather() complete the transition.
5168 if (lock->get_state() == LOCK_SYNC) {
5169 // MIXED
5170 lock->set_state(LOCK_SYNC_MIX);
5171 eval_gather(lock, true);
5172 if (lock->is_unstable_and_locked())
5173 mds->mdlog->flush();
5174 break;
5175 }
5176
5177 // ok
5178 lock->set_state(LOCK_MIX);
5179 lock->decode_locked_state(m->get_data());
5180
5181 if (caps)
5182 issue_caps(in);
5183
5184 lock->finish_waiters(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE);
5185 break;
5186
5187
5188 // -- auth --
5189 case LOCK_AC_LOCKACK:
5190 assert(lock->get_state() == LOCK_SYNC_LOCK ||
5191 lock->get_state() == LOCK_MIX_LOCK ||
5192 lock->get_state() == LOCK_MIX_LOCK2 ||
5193 lock->get_state() == LOCK_MIX_EXCL ||
5194 lock->get_state() == LOCK_SYNC_EXCL ||
5195 lock->get_state() == LOCK_SYNC_MIX ||
5196 lock->get_state() == LOCK_MIX_TSYN);
5197 assert(lock->is_gathering(from));
5198 lock->remove_gather(from);
5199
// When leaving MIX, the ack carries the replica's scattered state;
// merge it into ours.
5200 if (lock->get_state() == LOCK_MIX_LOCK ||
5201 lock->get_state() == LOCK_MIX_LOCK2 ||
5202 lock->get_state() == LOCK_MIX_EXCL ||
5203 lock->get_state() == LOCK_MIX_TSYN) {
5204 lock->decode_locked_state(m->get_data());
5205 // replica is waiting for AC_LOCKFLUSHED, eval_gather() should not
5206 // delay calling scatter_writebehind().
5207 lock->clear_flushed();
5208 }
5209
5210 if (lock->is_gathering()) {
5211 dout(7) << "handle_file_lock " << *in << " from " << from
5212 << ", still gathering " << lock->get_gather_set() << dendl;
5213 } else {
5214 dout(7) << "handle_file_lock " << *in << " from " << from
5215 << ", last one" << dendl;
5216 eval_gather(lock);
5217 }
5218 break;
5219
5220 case LOCK_AC_SYNCACK:
5221 assert(lock->get_state() == LOCK_MIX_SYNC);
5222 assert(lock->is_gathering(from));
5223 lock->remove_gather(from);
5224
// Merge the replica's scattered state before completing MIX -> SYNC.
5225 lock->decode_locked_state(m->get_data());
5226
5227 if (lock->is_gathering()) {
5228 dout(7) << "handle_file_lock " << *in << " from " << from
5229 << ", still gathering " << lock->get_gather_set() << dendl;
5230 } else {
5231 dout(7) << "handle_file_lock " << *in << " from " << from
5232 << ", last one" << dendl;
5233 eval_gather(lock);
5234 }
5235 break;
5236
5237 case LOCK_AC_MIXACK:
5238 assert(lock->get_state() == LOCK_SYNC_MIX);
5239 assert(lock->is_gathering(from));
5240 lock->remove_gather(from);
5241
5242 if (lock->is_gathering()) {
5243 dout(7) << "handle_file_lock " << *in << " from " << from
5244 << ", still gathering " << lock->get_gather_set() << dendl;
5245 } else {
5246 dout(7) << "handle_file_lock " << *in << " from " << from
5247 << ", last one" << dendl;
5248 eval_gather(lock);
5249 }
5250 break;
5251
5252
5253 // requests....
5254 case LOCK_AC_REQSCATTER:
5255 if (lock->is_stable()) {
5256 /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
5257 * because the replica should be holding an auth_pin if they're
5258 * doing this (and thus, we are freezing, not frozen, and indefinite
5259 * starvation isn't an issue).
5260 */
5261 dout(7) << "handle_file_lock got scatter request on " << *lock
5262 << " on " << *lock->get_parent() << dendl;
5263 if (lock->get_state() != LOCK_MIX) // i.e., the reqscatter didn't race with an actual mix/scatter
5264 scatter_mix(lock);
5265 } else {
// Not stable: remember the request; it is acted on later once the
// lock stabilizes.
5266 dout(7) << "handle_file_lock got scatter request, !stable, marking scatter_wanted on " << *lock
5267 << " on " << *lock->get_parent() << dendl;
5268 lock->set_scatter_wanted();
5269 }
5270 break;
5271
5272 case LOCK_AC_REQUNSCATTER:
5273 if (lock->is_stable()) {
5274 /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
5275 * because the replica should be holding an auth_pin if they're
5276 * doing this (and thus, we are freezing, not frozen, and indefinite
5277 * starvation isn't an issue).
5278 */
5279 dout(7) << "handle_file_lock got unscatter request on " << *lock
5280 << " on " << *lock->get_parent() << dendl;
5281 if (lock->get_state() == LOCK_MIX) // i.e., the reqscatter didn't race with an actual mix/scatter
5282 simple_lock(lock); // FIXME tempsync?
5283 } else {
// Not stable: remember the request via the unscatter_wanted flag.
5284 dout(7) << "handle_file_lock ignoring unscatter request on " << *lock
5285 << " on " << *lock->get_parent() << dendl;
5286 lock->set_unscatter_wanted();
5287 }
5288 break;
5289
5290 case LOCK_AC_REQRDLOCK:
5291 handle_reqrdlock(lock, m);
5292 break;
5293
5294 case LOCK_AC_NUDGE:
// Nudges only make sense on the auth copy of a replicated inode.
5295 if (!lock->get_parent()->is_auth()) {
5296 dout(7) << "handle_file_lock IGNORING nudge on non-auth " << *lock
5297 << " on " << *lock->get_parent() << dendl;
5298 } else if (!lock->get_parent()->is_replicated()) {
5299 dout(7) << "handle_file_lock IGNORING nudge on non-replicated " << *lock
5300 << " on " << *lock->get_parent() << dendl;
5301 } else {
5302 dout(7) << "handle_file_lock trying nudge on " << *lock
5303 << " on " << *lock->get_parent() << dendl;
5304 scatter_nudge(lock, 0, true);
5305 mds->mdlog->flush();
5306 }
5307 break;
5308
5309 default:
5310 ceph_abort();
5311 }
5312
// Every path drops the message reference here (or returned after put()
// in the rejoin short-circuit above).
5313 m->put();
5314 }
5315
5316
5317
5318
5319
5320