]> git.proxmox.com Git - ceph.git/blob - ceph/src/mds/Locker.cc
update sources to 12.2.8
[ceph.git] / ceph / src / mds / Locker.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include <boost/utility/string_view.hpp>
16
17 #include "MDSRank.h"
18 #include "MDCache.h"
19 #include "Locker.h"
20 #include "MDBalancer.h"
21 #include "CInode.h"
22 #include "CDir.h"
23 #include "CDentry.h"
24 #include "Mutation.h"
25 #include "MDSContext.h"
26
27 #include "MDLog.h"
28 #include "MDSMap.h"
29
30 #include "events/EUpdate.h"
31 #include "events/EOpen.h"
32
33 #include "msg/Messenger.h"
34 #include "osdc/Objecter.h"
35
36 #include "messages/MInodeFileCaps.h"
37 #include "messages/MLock.h"
38 #include "messages/MClientLease.h"
39 #include "messages/MClientReply.h"
40 #include "messages/MClientCaps.h"
41 #include "messages/MClientCapRelease.h"
42
43 #include "messages/MMDSSlaveRequest.h"
44
45 #include <errno.h>
46
47 #include "common/config.h"
48
49
// Debug-output plumbing: route dout()/derr through the MDS debug subsystem
// and prefix every line with "mds.<rank>.locker " so log lines from this
// translation unit are attributable to the locker of a specific rank.
#define dout_subsys ceph_subsys_mds
#undef dout_prefix
#define dout_context g_ceph_context
#define dout_prefix _prefix(_dout, mds)
static ostream& _prefix(std::ostream *_dout, MDSRank *mds) {
  return *_dout << "mds." << mds->get_nodeid() << ".locker ";
}
57
58
/**
 * Completion context owned by a Locker.  get_mds() resolves through the
 * owning locker so queued callbacks are executed against the correct rank.
 */
class LockerContext : public MDSInternalContextBase {
protected:
  Locker *locker;  // owning locker; never NULL (asserted in ctor)
  MDSRank *get_mds() override
  {
    return locker->mds;
  }

public:
  explicit LockerContext(Locker *locker_) : locker(locker_) {
    assert(locker != NULL);
  }
};
72
/**
 * Journal-completion context owned by a Locker; like LockerContext but for
 * callbacks that fire when an MDLog entry is safely committed.
 */
class LockerLogContext : public MDSLogContextBase {
protected:
  Locker *locker;  // owning locker; never NULL (asserted in ctor)
  MDSRank *get_mds() override
  {
    return locker->mds;
  }

public:
  explicit LockerLogContext(Locker *locker_) : locker(locker_) {
    assert(locker != NULL);
  }
};
86
/* This function DOES put the passed message before returning */
/**
 * Entry point for all messages routed to the locker: inter-MDS lock
 * protocol traffic and client capability/lease messages.  Unknown message
 * types are a fatal programming error (assert).
 */
void Locker::dispatch(Message *m)
{

  switch (m->get_type()) {

    // inter-mds locking
  case MSG_MDS_LOCK:
    handle_lock(static_cast<MLock*>(m));
    break;
    // inter-mds caps
  case MSG_MDS_INODEFILECAPS:
    handle_inode_file_caps(static_cast<MInodeFileCaps*>(m));
    break;

    // client sync
  case CEPH_MSG_CLIENT_CAPS:
    handle_client_caps(static_cast<MClientCaps*>(m));

    break;
  case CEPH_MSG_CLIENT_CAPRELEASE:
    handle_client_cap_release(static_cast<MClientCapRelease*>(m));
    break;
  case CEPH_MSG_CLIENT_LEASE:
    handle_client_lease(static_cast<MClientLease*>(m));
    break;

  default:
    derr << "locker unknown message " << m->get_type() << dendl;
    assert(0 == "locker unknown message");
  }
}
119
/**
 * Periodic maintenance hook; delegates to the scatter-lock and capability
 * tick routines.
 */
void Locker::tick()
{
  scatter_tick();
  caps_tick();
}
125
126 /*
127 * locks vs rejoin
128 *
129 *
130 *
131 */
132
133 void Locker::send_lock_message(SimpleLock *lock, int msg)
134 {
135 for (const auto &it : lock->get_parent()->get_replicas()) {
136 if (mds->is_cluster_degraded() &&
137 mds->mdsmap->get_state(it.first) < MDSMap::STATE_REJOIN)
138 continue;
139 MLock *m = new MLock(lock, msg, mds->get_nodeid());
140 mds->send_message_mds(m, it.first);
141 }
142 }
143
144 void Locker::send_lock_message(SimpleLock *lock, int msg, const bufferlist &data)
145 {
146 for (const auto &it : lock->get_parent()->get_replicas()) {
147 if (mds->is_cluster_degraded() &&
148 mds->mdsmap->get_state(it.first) < MDSMap::STATE_REJOIN)
149 continue;
150 MLock *m = new MLock(lock, msg, mds->get_nodeid());
151 m->set_data(data);
152 mds->send_message_mds(m, it.first);
153 }
154 }
155
156
157
158
159 void Locker::include_snap_rdlocks(set<SimpleLock*>& rdlocks, CInode *in)
160 {
161 // rdlock ancestor snaps
162 CInode *t = in;
163 rdlocks.insert(&in->snaplock);
164 while (t->get_projected_parent_dn()) {
165 t = t->get_projected_parent_dn()->get_dir()->get_inode();
166 rdlocks.insert(&t->snaplock);
167 }
168 }
169
170 void Locker::include_snap_rdlocks_wlayout(set<SimpleLock*>& rdlocks, CInode *in,
171 file_layout_t **layout)
172 {
173 //rdlock ancestor snaps
174 CInode *t = in;
175 rdlocks.insert(&in->snaplock);
176 rdlocks.insert(&in->policylock);
177 bool found_layout = false;
178 while (t) {
179 rdlocks.insert(&t->snaplock);
180 if (!found_layout) {
181 rdlocks.insert(&t->policylock);
182 if (t->get_projected_inode()->has_layout()) {
183 *layout = &t->get_projected_inode()->layout;
184 found_layout = true;
185 }
186 }
187 if (t->get_projected_parent_dn() &&
188 t->get_projected_parent_dn()->get_dir())
189 t = t->get_projected_parent_dn()->get_dir()->get_inode();
190 else t = NULL;
191 }
192 }
193
/**
 * RAII helper: when destroyed, records 'message' as an event on the mdr
 * unless mark_event has been cleared.  acquire_locks() updates 'message'
 * as it progresses, so every early return automatically marks why the
 * request stopped.
 */
struct MarkEventOnDestruct {
  MDRequestRef& mdr;
  const char* message;   // event text recorded at destruction
  bool mark_event;       // clear to suppress marking
  MarkEventOnDestruct(MDRequestRef& _mdr,
                      const char *_message) : mdr(_mdr),
                                              message(_message),
                                              mark_event(true) {}
  ~MarkEventOnDestruct() {
    if (mark_event)
      mdr->mark_event(message);
  }
};
207
/* If this function returns false, the mdr has been placed
 * on the appropriate wait list */
/**
 * Acquire every lock a request needs, in canonical (sorted) order.
 *
 * Phases:
 *   1. expand xlocks with version locks and collect all locks into a
 *      single sorted set, noting which objects must be auth-pinned;
 *   2. check local auth-pinnability, bail to a wait list on any blocker;
 *   3. take local auth pins, then request remote ones (returns false
 *      while waiting for slave acks);
 *   4. walk the sorted lock list against mdr->locks, releasing stray or
 *      out-of-order locks and taking missing ones; any lock that cannot
 *      be taken immediately parks the request and jumps to 'out'.
 *
 * Returns true iff all locks are held; on false the request has been
 * queued for retry (or aborted, for auth_pin_nonblock).
 */
bool Locker::acquire_locks(MDRequestRef& mdr,
			   set<SimpleLock*> &rdlocks,
			   set<SimpleLock*> &wrlocks,
			   set<SimpleLock*> &xlocks,
			   map<SimpleLock*,mds_rank_t> *remote_wrlocks,
			   CInode *auth_pin_freeze,
			   bool auth_pin_nonblock)
{
  if (mdr->done_locking &&
      !mdr->is_slave()) { // not on slaves! master requests locks piecemeal.
    dout(10) << "acquire_locks " << *mdr << " - done locking" << dendl;
    return true;  // at least we had better be!
  }
  dout(10) << "acquire_locks " << *mdr << dendl;

  MarkEventOnDestruct marker(mdr, "failed to acquire_locks");

  client_t client = mdr->get_client();

  set<SimpleLock*, SimpleLock::ptr_lt> sorted;  // sort everything we will lock
  set<MDSCacheObject*> mustpin;  // items to authpin

  // xlocks
  for (set<SimpleLock*>::iterator p = xlocks.begin(); p != xlocks.end(); ++p) {
    SimpleLock *lock = *p;

    if ((lock->get_type() == CEPH_LOCK_ISNAP ||
	 lock->get_type() == CEPH_LOCK_IPOLICY) &&
	mds->is_cluster_degraded() &&
	mdr->is_master() &&
	!mdr->is_queued_for_replay()) {
      // waiting for recovering mds, to guarantee replayed requests and mksnap/setlayout
      // get processed in proper order.
      bool wait = false;
      if (lock->get_parent()->is_auth()) {
	if (!mdr->locks.count(lock)) {
	  set<mds_rank_t> ls;
	  lock->get_parent()->list_replicas(ls);
	  for (auto m : ls) {
	    if (mds->mdsmap->get_state(m) < MDSMap::STATE_ACTIVE) {
	      wait = true;
	      break;
	    }
	  }
	}
      } else {
	// if the lock is the latest locked one, it's possible that slave mds got the lock
	// while there are recovering mds.
	if (!mdr->locks.count(lock) || lock == *mdr->locks.rbegin())
	  wait = true;
      }
      if (wait) {
	dout(10) << " must xlock " << *lock << " " << *lock->get_parent()
		 << ", waiting for cluster recovered" << dendl;
	mds->locker->drop_locks(mdr.get(), NULL);
	mdr->drop_local_auth_pins();
	mds->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache, mdr));
	return false;
      }
    }

    dout(20) << " must xlock " << *lock << " " << *lock->get_parent() << dendl;

    sorted.insert(lock);
    mustpin.insert(lock->get_parent());

    // augment xlock with a versionlock?
    if ((*p)->get_type() == CEPH_LOCK_DN) {
      CDentry *dn = (CDentry*)lock->get_parent();
      if (!dn->is_auth())
	continue;

      if (xlocks.count(&dn->versionlock))
	continue;  // we're xlocking the versionlock too; don't wrlock it!

      if (mdr->is_master()) {
	// master.  wrlock versionlock so we can pipeline dentry updates to journal.
	wrlocks.insert(&dn->versionlock);
      } else {
	// slave.  exclusively lock the dentry version (i.e. block other journal updates).
	// this makes rollback safe.
	xlocks.insert(&dn->versionlock);
	sorted.insert(&dn->versionlock);
      }
    }
    if (lock->get_type() > CEPH_LOCK_IVERSION) {
      // inode version lock?
      CInode *in = (CInode*)lock->get_parent();
      if (!in->is_auth())
	continue;
      if (mdr->is_master()) {
	// master.  wrlock versionlock so we can pipeline inode updates to journal.
	wrlocks.insert(&in->versionlock);
      } else {
	// slave.  exclusively lock the inode version (i.e. block other journal updates).
	// this makes rollback safe.
	xlocks.insert(&in->versionlock);
	sorted.insert(&in->versionlock);
      }
    }
  }

  // wrlocks
  for (set<SimpleLock*>::iterator p = wrlocks.begin(); p != wrlocks.end(); ++p) {
    MDSCacheObject *object = (*p)->get_parent();
    dout(20) << " must wrlock " << **p << " " << *object << dendl;
    sorted.insert(*p);
    if (object->is_auth())
      mustpin.insert(object);
    else if (!object->is_auth() &&
	     !(*p)->can_wrlock(client) &&  // we might have to request a scatter
	     !mdr->is_slave()) {           // if we are slave (remote_wrlock), the master already authpinned
      dout(15) << " will also auth_pin " << *object
	       << " in case we need to request a scatter" << dendl;
      mustpin.insert(object);
    }
  }

  // remote_wrlocks
  if (remote_wrlocks) {
    for (map<SimpleLock*,mds_rank_t>::iterator p = remote_wrlocks->begin(); p != remote_wrlocks->end(); ++p) {
      MDSCacheObject *object = p->first->get_parent();
      dout(20) << " must remote_wrlock on mds." << p->second << " "
	       << *p->first << " " << *object << dendl;
      sorted.insert(p->first);
      mustpin.insert(object);
    }
  }

  // rdlocks
  for (set<SimpleLock*>::iterator p = rdlocks.begin();
	 p != rdlocks.end();
	 ++p) {
    MDSCacheObject *object = (*p)->get_parent();
    dout(20) << " must rdlock " << **p << " " << *object << dendl;
    sorted.insert(*p);
    if (object->is_auth())
      mustpin.insert(object);
    else if (!object->is_auth() &&
	     !(*p)->can_rdlock(client)) {  // we might have to request an rdlock
      dout(15) << " will also auth_pin " << *object
	       << " in case we need to request a rdlock" << dendl;
      mustpin.insert(object);
    }
  }


  // AUTH PINS
  map<mds_rank_t, set<MDSCacheObject*> > mustpin_remote;  // mds -> (object set)

  // can i auth pin them all now?
  marker.message = "failed to authpin local pins";
  for (set<MDSCacheObject*>::iterator p = mustpin.begin();
       p != mustpin.end();
       ++p) {
    MDSCacheObject *object = *p;

    dout(10) << " must authpin " << *object << dendl;

    if (mdr->is_auth_pinned(object)) {
      if (object != (MDSCacheObject*)auth_pin_freeze)
	continue;
      if (mdr->more()->is_remote_frozen_authpin) {
	if (mdr->more()->rename_inode == auth_pin_freeze)
	  continue;
	// unfreeze auth pin for the wrong inode
	// NOTE: operator[] deliberately creates an (empty) entry for that
	// rank so a slave request is sent to it below; .size() is only a
	// no-op read of the freshly created set.
	mustpin_remote[mdr->more()->rename_inode->authority().first].size();
      }
    }

    if (!object->is_auth()) {
      if (!mdr->locks.empty())
	drop_locks(mdr.get());
      if (object->is_ambiguous_auth()) {
	// wait
	dout(10) << " ambiguous auth, waiting to authpin " << *object << dendl;
	object->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_MDS_RetryRequest(mdcache, mdr));
	mdr->drop_local_auth_pins();
	return false;
      }
      mustpin_remote[object->authority().first].insert(object);
      continue;
    }
    if (!object->can_auth_pin()) {
      // wait
      drop_locks(mdr.get());
      mdr->drop_local_auth_pins();
      if (auth_pin_nonblock) {
	dout(10) << " can't auth_pin (freezing?) " << *object << ", nonblocking" << dendl;
	mdr->aborted = true;
	return false;
      }
      dout(10) << " can't auth_pin (freezing?), waiting to authpin " << *object << dendl;
      object->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_MDS_RetryRequest(mdcache, mdr));

      if (!mdr->remote_auth_pins.empty())
	notify_freeze_waiter(object);

      return false;
    }
  }

  // ok, grab local auth pins
  for (set<MDSCacheObject*>::iterator p = mustpin.begin();
       p != mustpin.end();
       ++p) {
    MDSCacheObject *object = *p;
    if (mdr->is_auth_pinned(object)) {
      dout(10) << " already auth_pinned " << *object << dendl;
    } else if (object->is_auth()) {
      dout(10) << " auth_pinning " << *object << dendl;
      mdr->auth_pin(object);
    }
  }

  // request remote auth_pins
  if (!mustpin_remote.empty()) {
    marker.message = "requesting remote authpins";
    // fold already-held remote pins for still-needed objects into the
    // per-rank request sets so the slave re-acks them
    for (map<MDSCacheObject*,mds_rank_t>::iterator p = mdr->remote_auth_pins.begin();
	 p != mdr->remote_auth_pins.end();
	 ++p) {
      if (mustpin.count(p->first)) {
	assert(p->second == p->first->authority().first);
	map<mds_rank_t, set<MDSCacheObject*> >::iterator q = mustpin_remote.find(p->second);
	if (q != mustpin_remote.end())
	  q->second.insert(p->first);
      }
    }
    for (map<mds_rank_t, set<MDSCacheObject*> >::iterator p = mustpin_remote.begin();
	 p != mustpin_remote.end();
	 ++p) {
      dout(10) << "requesting remote auth_pins from mds." << p->first << dendl;

      // wait for active auth
      if (mds->is_cluster_degraded() &&
	  !mds->mdsmap->is_clientreplay_or_active_or_stopping(p->first)) {
	dout(10) << " mds." << p->first << " is not active" << dendl;
	if (mdr->more()->waiting_on_slave.empty())
	  mds->wait_for_active_peer(p->first, new C_MDS_RetryRequest(mdcache, mdr));
	return false;
      }

      MMDSSlaveRequest *req = new MMDSSlaveRequest(mdr->reqid, mdr->attempt,
						   MMDSSlaveRequest::OP_AUTHPIN);
      for (set<MDSCacheObject*>::iterator q = p->second.begin();
	   q != p->second.end();
	   ++q) {
	dout(10) << " req remote auth_pin of " << **q << dendl;
	MDSCacheObjectInfo info;
	(*q)->set_object_info(info);
	req->get_authpins().push_back(info);
	if (*q == auth_pin_freeze)
	  (*q)->set_object_info(req->get_authpin_freeze());
	mdr->pin(*q);
      }
      if (auth_pin_nonblock)
	req->mark_nonblock();
      mds->send_message_mds(req, p->first);

      // put in waiting list
      assert(mdr->more()->waiting_on_slave.count(p->first) == 0);
      mdr->more()->waiting_on_slave.insert(p->first);
    }
    return false;
  }

  // caps i'll need to issue
  set<CInode*> issue_set;
  bool result = false;

  // acquire locks.
  // make sure they match currently acquired locks.
  set<SimpleLock*, SimpleLock::ptr_lt>::iterator existing = mdr->locks.begin();
  for (set<SimpleLock*, SimpleLock::ptr_lt>::iterator p = sorted.begin();
       p != sorted.end();
       ++p) {
    bool need_wrlock = !!wrlocks.count(*p);
    bool need_remote_wrlock = !!(remote_wrlocks && remote_wrlocks->count(*p));

    // already locked?
    if (existing != mdr->locks.end() && *existing == *p) {
      // right kind?
      SimpleLock *have = *existing;
      ++existing;
      if (xlocks.count(have) && mdr->xlocks.count(have)) {
	dout(10) << " already xlocked " << *have << " " << *have->get_parent() << dendl;
	continue;
      }
      if (mdr->remote_wrlocks.count(have)) {
	if (!need_remote_wrlock ||
	    mdr->remote_wrlocks[have] != (*remote_wrlocks)[have]) {
	  dout(10) << " unlocking remote_wrlock on wrong mds." << mdr->remote_wrlocks[have]
		   << " " << *have << " " << *have->get_parent() << dendl;
	  remote_wrlock_finish(have, mdr->remote_wrlocks[have], mdr.get());
	}
      }
      if (need_wrlock || need_remote_wrlock) {
	if (need_wrlock == !!mdr->wrlocks.count(have) &&
	    need_remote_wrlock == !!mdr->remote_wrlocks.count(have)) {
	  if (need_wrlock)
	    dout(10) << " already wrlocked " << *have << " " << *have->get_parent() << dendl;
	  if (need_remote_wrlock)
	    dout(10) << " already remote_wrlocked " << *have << " " << *have->get_parent() << dendl;
	  continue;
	}
      }
      if (rdlocks.count(have) && mdr->rdlocks.count(have)) {
	dout(10) << " already rdlocked " << *have << " " << *have->get_parent() << dendl;
	continue;
      }
    }

    // hose any stray locks
    if (existing != mdr->locks.end() && *existing == *p) {
      assert(need_wrlock || need_remote_wrlock);
      SimpleLock *lock = *existing;
      if (mdr->wrlocks.count(lock)) {
	if (!need_wrlock)
	  dout(10) << " unlocking extra " << *lock << " " << *lock->get_parent() << dendl;
	else if (need_remote_wrlock)  // acquire remote_wrlock first
	  dout(10) << " unlocking out-of-order " << *lock << " " << *lock->get_parent() << dendl;
	bool need_issue = false;
	wrlock_finish(lock, mdr.get(), &need_issue);
	if (need_issue)
	  issue_set.insert(static_cast<CInode*>(lock->get_parent()));
      }
      ++existing;
    }
    while (existing != mdr->locks.end()) {
      SimpleLock *stray = *existing;
      ++existing;
      dout(10) << " unlocking out-of-order " << *stray << " " << *stray->get_parent() << dendl;
      bool need_issue = false;
      if (mdr->xlocks.count(stray)) {
	xlock_finish(stray, mdr.get(), &need_issue);
      } else if (mdr->rdlocks.count(stray)) {
	rdlock_finish(stray, mdr.get(), &need_issue);
      } else {
	// may have acquired both wrlock and remore wrlock
	if (mdr->wrlocks.count(stray))
	  wrlock_finish(stray, mdr.get(), &need_issue);
	if (mdr->remote_wrlocks.count(stray))
	  remote_wrlock_finish(stray, mdr->remote_wrlocks[stray], mdr.get());
      }
      if (need_issue)
	issue_set.insert(static_cast<CInode*>(stray->get_parent()));
    }

    // lock
    if (mdr->locking && *p != mdr->locking) {
      cancel_locking(mdr.get(), &issue_set);
    }
    if (xlocks.count(*p)) {
      marker.message = "failed to xlock, waiting";
      if (!xlock_start(*p, mdr))
	goto out;
      dout(10) << " got xlock on " << **p << " " << *(*p)->get_parent() << dendl;
    } else if (need_wrlock || need_remote_wrlock) {
      if (need_remote_wrlock && !mdr->remote_wrlocks.count(*p)) {
	marker.message = "waiting for remote wrlocks";
	remote_wrlock_start(*p, (*remote_wrlocks)[*p], mdr);
	goto out;
      }
      if (need_wrlock && !mdr->wrlocks.count(*p)) {
	marker.message = "failed to wrlock, waiting";
	if (need_remote_wrlock && !(*p)->can_wrlock(mdr->get_client())) {
	  marker.message = "failed to wrlock, dropping remote wrlock and waiting";
	  // can't take the wrlock because the scatter lock is gathering. need to
	  // release the remote wrlock, so that the gathering process can finish.
	  remote_wrlock_finish(*p, mdr->remote_wrlocks[*p], mdr.get());
	  remote_wrlock_start(*p, (*remote_wrlocks)[*p], mdr);
	  goto out;
	}
	// nowait if we have already gotten remote wrlock
	if (!wrlock_start(*p, mdr, need_remote_wrlock))
	  goto out;
	dout(10) << " got wrlock on " << **p << " " << *(*p)->get_parent() << dendl;
      }
    } else {
      assert(mdr->is_master());
      if ((*p)->needs_recover()) {
	if (mds->is_cluster_degraded()) {
	  if (!mdr->is_queued_for_replay()) {
	    // see comments in SimpleLock::set_state_rejoin() and
	    // ScatterLock::encode_state_for_rejoin()
	    drop_locks(mdr.get());
	    mds->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache, mdr));
	    dout(10) << " rejoin recovering " << **p << " " << *(*p)->get_parent()
		     << ", waiting for cluster recovered" << dendl;
	    marker.message = "rejoin recovering lock, waiting for cluster recovered";
	    return false;
	  }
	} else {
	  (*p)->clear_need_recover();
	}
      }

      marker.message = "failed to rdlock, waiting";
      if (!rdlock_start(*p, mdr))
	goto out;
      dout(10) << " got rdlock on " << **p << " " << *(*p)->get_parent() << dendl;
    }
  }

  // any extra unneeded locks?
  while (existing != mdr->locks.end()) {
    SimpleLock *stray = *existing;
    ++existing;
    dout(10) << " unlocking extra " << *stray << " " << *stray->get_parent() << dendl;
    bool need_issue = false;
    if (mdr->xlocks.count(stray)) {
      xlock_finish(stray, mdr.get(), &need_issue);
    } else if (mdr->rdlocks.count(stray)) {
      rdlock_finish(stray, mdr.get(), &need_issue);
    } else {
      // may have acquired both wrlock and remore wrlock
      if (mdr->wrlocks.count(stray))
	wrlock_finish(stray, mdr.get(), &need_issue);
      if (mdr->remote_wrlocks.count(stray))
	remote_wrlock_finish(stray, mdr->remote_wrlocks[stray], mdr.get());
    }
    if (need_issue)
      issue_set.insert(static_cast<CInode*>(stray->get_parent()));
  }

  mdr->done_locking = true;
  mdr->set_mds_stamp(ceph_clock_now());
  result = true;
  marker.message = "acquired locks";

 out:
  issue_caps_set(issue_set);
  return result;
}
644
/**
 * An auth-pin attempt on @p o was blocked because a containing dir/subtree
 * is freezing.  Find the relevant CDir (the object itself, its dir, or its
 * parent dir) and bump the waiter count on the freezing fragment and/or
 * freezing subtree root so the freeze logic accounts for us.
 */
void Locker::notify_freeze_waiter(MDSCacheObject *o)
{
  CDir *dir = NULL;
  if (CInode *in = dynamic_cast<CInode*>(o)) {
    if (!in->is_root())
      dir = in->get_parent_dir();
  } else if (CDentry *dn = dynamic_cast<CDentry*>(o)) {
    dir = dn->get_dir();
  } else {
    // must be a CDir itself
    dir = dynamic_cast<CDir*>(o);
    assert(dir);
  }
  if (dir) {
    if (dir->is_freezing_dir())
      mdcache->fragment_freeze_inc_num_waiters(dir);
    if (dir->is_freezing_tree()) {
      // walk up to the root of the freezing subtree
      while (!dir->is_freezing_tree_root())
	dir = dir->get_parent_dir();
      mdcache->migrator->export_freeze_inc_num_waiters(dir);
    }
  }
}
667
668 void Locker::set_xlocks_done(MutationImpl *mut, bool skip_dentry)
669 {
670 for (set<SimpleLock*>::iterator p = mut->xlocks.begin();
671 p != mut->xlocks.end();
672 ++p) {
673 MDSCacheObject *object = (*p)->get_parent();
674 assert(object->is_auth());
675 if (skip_dentry &&
676 ((*p)->get_type() == CEPH_LOCK_DN || (*p)->get_type() == CEPH_LOCK_DVERSION))
677 continue;
678 dout(10) << "set_xlocks_done on " << **p << " " << *object << dendl;
679 (*p)->set_xlock_done();
680 }
681 }
682
/**
 * Release every rdlock held by @p mut.  We repeatedly pop the first entry:
 * rdlock_finish() must remove the lock from mut->rdlocks, otherwise this
 * loop would not terminate.  Inodes whose issued caps may change are
 * collected into *pneed_issue for the caller to reissue.
 */
void Locker::_drop_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
{
  while (!mut->rdlocks.empty()) {
    bool ni = false;
    // grab the parent before rdlock_finish invalidates the iterator
    MDSCacheObject *p = (*mut->rdlocks.begin())->get_parent();
    rdlock_finish(*mut->rdlocks.begin(), mut, &ni);
    if (ni)
      pneed_issue->insert(static_cast<CInode*>(p));
  }
}
693
/**
 * Release all xlocks, wrlocks and remote wrlocks held by @p mut.
 * Locally-authoritative locks go through the normal *_finish paths;
 * remotely-held xlocks/wrlocks are merely dropped here and the owning
 * slave ranks are told to release them via a single OP_DROPLOCKS message
 * each (skipping ranks that have not reached REJOIN while degraded).
 */
void Locker::_drop_non_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
{
  set<mds_rank_t> slaves;  // ranks that must be told to drop locks

  while (!mut->xlocks.empty()) {
    SimpleLock *lock = *mut->xlocks.begin();
    MDSCacheObject *p = lock->get_parent();
    if (!p->is_auth()) {
      // remote xlock: release our reference, remember the slave
      assert(lock->get_sm()->can_remote_xlock);
      slaves.insert(p->authority().first);
      lock->put_xlock();
      mut->locks.erase(lock);
      mut->xlocks.erase(lock);
      continue;
    }
    bool ni = false;
    xlock_finish(lock, mut, &ni);
    if (ni)
      pneed_issue->insert(static_cast<CInode*>(p));
  }

  while (!mut->remote_wrlocks.empty()) {
    map<SimpleLock*,mds_rank_t>::iterator p = mut->remote_wrlocks.begin();
    slaves.insert(p->second);
    // only drop the combined bookkeeping entry if no local wrlock remains
    if (mut->wrlocks.count(p->first) == 0)
      mut->locks.erase(p->first);
    mut->remote_wrlocks.erase(p);
  }

  while (!mut->wrlocks.empty()) {
    bool ni = false;
    MDSCacheObject *p = (*mut->wrlocks.begin())->get_parent();
    wrlock_finish(*mut->wrlocks.begin(), mut, &ni);
    if (ni)
      pneed_issue->insert(static_cast<CInode*>(p));
  }

  for (set<mds_rank_t>::iterator p = slaves.begin(); p != slaves.end(); ++p) {
    if (!mds->is_cluster_degraded() ||
	mds->mdsmap->get_state(*p) >= MDSMap::STATE_REJOIN) {
      dout(10) << "_drop_non_rdlocks dropping remote locks on mds." << *p << dendl;
      MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt,
							MMDSSlaveRequest::OP_DROPLOCKS);
      mds->send_message_mds(slavereq, *p);
    }
  }
}
741
/**
 * Abort the in-progress lock acquisition recorded in mut->locking.
 * For auth copies, unwind the half-taken state: a PREXLOCK is finished
 * with _finish_xlock(), and a LOCK->XLOCK transition with no xlockers
 * left is parked in XLOCKDONE and re-evaluated.  Affected inodes are
 * added to *pneed_issue so the caller can reissue caps.
 */
void Locker::cancel_locking(MutationImpl *mut, set<CInode*> *pneed_issue)
{
  SimpleLock *lock = mut->locking;
  assert(lock);
  dout(10) << "cancel_locking " << *lock << " on " << *mut << dendl;

  if (lock->get_parent()->is_auth()) {
    bool need_issue = false;
    if (lock->get_state() == LOCK_PREXLOCK) {
      _finish_xlock(lock, -1, &need_issue);
    } else if (lock->get_state() == LOCK_LOCK_XLOCK &&
	       lock->get_num_xlocks() == 0) {
      lock->set_state(LOCK_XLOCKDONE);
      eval_gather(lock, true, &need_issue);
    }
    if (need_issue)
      pneed_issue->insert(static_cast<CInode *>(lock->get_parent()));
  }
  mut->finish_locking(lock);
}
762
763 void Locker::drop_locks(MutationImpl *mut, set<CInode*> *pneed_issue)
764 {
765 // leftover locks
766 set<CInode*> my_need_issue;
767 if (!pneed_issue)
768 pneed_issue = &my_need_issue;
769
770 if (mut->locking)
771 cancel_locking(mut, pneed_issue);
772 _drop_non_rdlocks(mut, pneed_issue);
773 _drop_rdlocks(mut, pneed_issue);
774
775 if (pneed_issue == &my_need_issue)
776 issue_caps_set(*pneed_issue);
777 mut->done_locking = false;
778 }
779
780 void Locker::drop_non_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
781 {
782 set<CInode*> my_need_issue;
783 if (!pneed_issue)
784 pneed_issue = &my_need_issue;
785
786 _drop_non_rdlocks(mut, pneed_issue);
787
788 if (pneed_issue == &my_need_issue)
789 issue_caps_set(*pneed_issue);
790 }
791
/**
 * Drop rdlocks so an early (unsafe) reply can be sent, but keep snaplock
 * and policylock rdlocks so later mksnap/setlayout on other ranks are
 * serialized behind this still-unsafe request.
 */
void Locker::drop_rdlocks_for_early_reply(MutationImpl *mut)
{
  set<CInode*> need_issue;

  for (auto p = mut->rdlocks.begin(); p != mut->rdlocks.end(); ) {
    SimpleLock *lock = *p;
    // advance before rdlock_finish() removes 'lock' from the set
    ++p;
    // make later mksnap/setlayout (at other mds) wait for this unsafe request
    if (lock->get_type() == CEPH_LOCK_ISNAP ||
	lock->get_type() == CEPH_LOCK_IPOLICY)
      continue;
    bool ni = false;
    rdlock_finish(lock, mut, &ni);
    if (ni)
      need_issue.insert(static_cast<CInode*>(lock->get_parent()));
  }

  issue_caps_set(need_issue);
}
811
812 // generics
813
/**
 * Advance a non-stable lock toward its target state once the conditions
 * blocking the transition (rdlocks/wrlocks/xlocks/leases, outstanding
 * client caps, scatter flushes) have drained.
 *
 * If the gather is complete:
 *  - on a replica, ack the transition to the auth MDS (protocol messages
 *    vary per state; some transitions also start/finish a scatter flush);
 *  - on the auth, push the transition out to replicas and/or start the
 *    scatter, then move to the next state, wake waiters, and re-eval.
 *
 * @param first       first evaluation after the state change; may flag a
 *                    cap reissue to revoke caps the next state disallows
 * @param pneed_issue if non-null, set instead of issuing caps directly
 * @param pfinishers  if non-null, waiters are handed back instead of run
 */
void Locker::eval_gather(SimpleLock *lock, bool first, bool *pneed_issue, list<MDSInternalContextBase*> *pfinishers)
{
  dout(10) << "eval_gather " << *lock << " on " << *lock->get_parent() << dendl;
  assert(!lock->is_stable());

  int next = lock->get_next_state();

  CInode *in = 0;
  bool caps = lock->get_cap_shift();
  if (lock->get_type() != CEPH_LOCK_DN)
    in = static_cast<CInode *>(lock->get_parent());

  bool need_issue = false;

  int loner_issued = 0, other_issued = 0, xlocker_issued = 0;
  assert(!caps || in != NULL);
  if (caps && in->is_head()) {
    in->get_caps_issued(&loner_issued, &other_issued, &xlocker_issued,
			lock->get_cap_shift(), lock->get_cap_mask());
    dout(10) << " next state is " << lock->get_state_name(next)
	     << " issued/allows loner " << gcap_string(loner_issued)
	     << "/" << gcap_string(lock->gcaps_allowed(CAP_LONER, next))
	     << " xlocker " << gcap_string(xlocker_issued)
	     << "/" << gcap_string(lock->gcaps_allowed(CAP_XLOCKER, next))
	     << " other " << gcap_string(other_issued)
	     << "/" << gcap_string(lock->gcaps_allowed(CAP_ANY, next))
	     << dendl;

    // issued caps the next state won't allow must be revoked via reissue
    if (first && ((~lock->gcaps_allowed(CAP_ANY, next) & other_issued) ||
		  (~lock->gcaps_allowed(CAP_LONER, next) & loner_issued) ||
		  (~lock->gcaps_allowed(CAP_XLOCKER, next) & xlocker_issued)))
      need_issue = true;
  }

#define IS_TRUE_AND_LT_AUTH(x, auth) (x && ((auth && x <= AUTH) || (!auth && x < AUTH)))
  bool auth = lock->get_parent()->is_auth();
  // gather is finished when nothing that the next state forbids is still held
  if (!lock->is_gathering() &&
      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_rdlock, auth) || !lock->is_rdlocked()) &&
      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_wrlock, auth) || !lock->is_wrlocked()) &&
      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_xlock, auth) || !lock->is_xlocked()) &&
      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_lease, auth) || !lock->is_leased()) &&
      !(lock->get_parent()->is_auth() && lock->is_flushing()) &&  // i.e. wait for scatter_writebehind!
      (!caps || ((~lock->gcaps_allowed(CAP_ANY, next) & other_issued) == 0 &&
		 (~lock->gcaps_allowed(CAP_LONER, next) & loner_issued) == 0 &&
		 (~lock->gcaps_allowed(CAP_XLOCKER, next) & xlocker_issued) == 0)) &&
      lock->get_state() != LOCK_SYNC_MIX2 &&  // these states need an explicit trigger from the auth mds
      lock->get_state() != LOCK_MIX_SYNC2
      ) {
    dout(7) << "eval_gather finished gather on " << *lock
	    << " on " << *lock->get_parent() << dendl;

    if (lock->get_sm() == &sm_filelock) {
      assert(in);
      if (in->state_test(CInode::STATE_RECOVERING)) {
	dout(7) << "eval_gather finished gather, but still recovering" << dendl;
	return;
      } else if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
	dout(7) << "eval_gather finished gather, but need to recover" << dendl;
	mds->mdcache->queue_file_recover(in);
	mds->mdcache->do_file_recover();
	return;
      }
    }

    if (!lock->get_parent()->is_auth()) {
      // replica: tell auth
      mds_rank_t auth = lock->get_parent()->authority().first;

      if (lock->get_parent()->is_rejoining() &&
	  mds->mdsmap->get_state(auth) == MDSMap::STATE_REJOIN) {
	dout(7) << "eval_gather finished gather, but still rejoining "
		<< *lock->get_parent() << dendl;
	return;
      }

      if (!mds->is_cluster_degraded() ||
	  mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) {
	switch (lock->get_state()) {
	case LOCK_SYNC_LOCK:
	  mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid()),
				auth);
	  break;

	case LOCK_MIX_SYNC:
	  {
	    MLock *reply = new MLock(lock, LOCK_AC_SYNCACK, mds->get_nodeid());
	    lock->encode_locked_state(reply->get_data());
	    mds->send_message_mds(reply, auth);
	    next = LOCK_MIX_SYNC2;
	    (static_cast<ScatterLock *>(lock))->start_flush();
	  }
	  break;

	case LOCK_MIX_SYNC2:
	  (static_cast<ScatterLock *>(lock))->finish_flush();
	  (static_cast<ScatterLock *>(lock))->clear_flushed();
	  // no break: falls through to LOCK_SYNC_MIX2 (nothing more to send)

	case LOCK_SYNC_MIX2:
	  // do nothing, we already acked
	  break;

	case LOCK_SYNC_MIX:
	  {
	    MLock *reply = new MLock(lock, LOCK_AC_MIXACK, mds->get_nodeid());
	    mds->send_message_mds(reply, auth);
	    next = LOCK_SYNC_MIX2;
	  }
	  break;

	case LOCK_MIX_LOCK:
	  {
	    bufferlist data;
	    lock->encode_locked_state(data);
	    mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid(), data), auth);
	    (static_cast<ScatterLock *>(lock))->start_flush();
	    // we'll get an AC_LOCKFLUSHED to complete
	  }
	  break;

	default:
	  ceph_abort();
	}
      }
    } else {
      // auth

      // once the first (local) stage of mix->lock gather complete we can
      // gather from replicas
      if (lock->get_state() == LOCK_MIX_LOCK &&
	  lock->get_parent()->is_replicated()) {
	dout(10) << " finished (local) gather for mix->lock, now gathering from replicas" << dendl;
	send_lock_message(lock, LOCK_AC_LOCK);
	lock->init_gather();
	lock->set_state(LOCK_MIX_LOCK2);
	return;
      }

      if (lock->is_dirty() && !lock->is_flushed()) {
	// dirty scatter data must hit the journal before we move on
	scatter_writebehind(static_cast<ScatterLock *>(lock));
	mds->mdlog->flush();
	return;
      }
      lock->clear_flushed();

      switch (lock->get_state()) {
	// to mixed
      case LOCK_TSYN_MIX:
      case LOCK_SYNC_MIX:
      case LOCK_EXCL_MIX:
      case LOCK_XSYN_MIX:
	in->start_scatter(static_cast<ScatterLock *>(lock));
	if (lock->get_parent()->is_replicated()) {
	  bufferlist softdata;
	  lock->encode_locked_state(softdata);
	  send_lock_message(lock, LOCK_AC_MIX, softdata);
	}
	(static_cast<ScatterLock *>(lock))->clear_scatter_wanted();
	break;

      case LOCK_XLOCK:
      case LOCK_XLOCKDONE:
	if (next != LOCK_SYNC)
	  break;
	// fall-thru

	// to sync
      case LOCK_EXCL_SYNC:
      case LOCK_LOCK_SYNC:
      case LOCK_MIX_SYNC:
      case LOCK_XSYN_SYNC:
	if (lock->get_parent()->is_replicated()) {
	  bufferlist softdata;
	  lock->encode_locked_state(softdata);
	  send_lock_message(lock, LOCK_AC_SYNC, softdata);
	}
	break;
      }

    }

    lock->set_state(next);

    if (lock->get_parent()->is_auth() &&
	lock->is_stable())
      lock->get_parent()->auth_unpin(lock);

    // drop loner before doing waiters
    if (caps &&
	in->is_head() &&
	in->is_auth() &&
	in->get_wanted_loner() != in->get_loner()) {
      dout(10) << "  trying to drop loner" << dendl;
      if (in->try_drop_loner()) {
	dout(10) << "  dropped loner" << dendl;
	need_issue = true;
      }
    }

    if (pfinishers)
      lock->take_waiting(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD|SimpleLock::WAIT_XLOCK,
			 *pfinishers);
    else
      lock->finish_waiters(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD|SimpleLock::WAIT_XLOCK);

    if (caps && in->is_head())
      need_issue = true;

    if (lock->get_parent()->is_auth() &&
	lock->is_stable())
      try_eval(lock, &need_issue);
  }

  if (need_issue) {
    if (pneed_issue)
      *pneed_issue = true;
    else if (in->is_head())
      issue_caps(in);
  }

}
1034
/**
 * Re-evaluate some or all of an inode's locks, possibly transitioning
 * their states, and (re)negotiate the loner client along the way.
 *
 * @param in            inode whose locks to evaluate
 * @param mask          bitmask of CEPH_LOCK_I* flags selecting which locks
 *                      to evaluate; -1 means all of them
 * @param caps_imported if true, caps were just imported, so force a cap
 *                      (re)issue even if no lock changes state
 * @return true if client caps were issued (or still need issuing)
 */
bool Locker::eval(CInode *in, int mask, bool caps_imported)
{
  bool need_issue = caps_imported;
  list<MDSInternalContextBase*> finishers;

  dout(10) << "eval " << mask << " " << *in << dendl;

  // choose loner?  only meaningful on the auth MDS for the head inode.
  if (in->is_auth() && in->is_head()) {
    client_t orig_loner = in->get_loner();
    if (in->choose_ideal_loner()) {
      dout(10) << "eval set loner: client." << orig_loner << " -> client." << in->get_loner() << dendl;
      need_issue = true;
      mask = -1;   // loner changed: every lock needs a fresh look
    } else if (in->get_wanted_loner() != in->get_loner()) {
      dout(10) << "eval want loner: client." << in->get_wanted_loner() << " but failed to set it" << dendl;
      mask = -1;   // couldn't switch loner yet; re-evaluate everything
    }
  }

 retry:
  if (mask & CEPH_LOCK_IFILE)
    eval_any(&in->filelock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IAUTH)
    eval_any(&in->authlock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_ILINK)
    eval_any(&in->linklock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IXATTR)
    eval_any(&in->xattrlock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_INEST)
    eval_any(&in->nestlock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IFLOCK)
    eval_any(&in->flocklock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IPOLICY)
    eval_any(&in->policylock, &need_issue, &finishers, caps_imported);

  // drop loner?  the wanted loner may have changed while locks moved above.
  if (in->is_auth() && in->is_head() && in->get_wanted_loner() != in->get_loner()) {
    if (in->try_drop_loner()) {
      need_issue = true;
      if (in->get_wanted_loner() >= 0) {
	// someone else wants to be loner: install them, then re-evaluate
	// all locks under the new loner.
	dout(10) << "eval end set loner to client." << in->get_loner() << dendl;
	bool ok = in->try_set_loner();
	assert(ok);
	mask = -1;
	goto retry;
      }
    }
  }

  finish_contexts(g_ceph_context, finishers);

  if (need_issue && in->is_head())
    issue_caps(in);

  dout(10) << "eval done" << dendl;
  return need_issue;
}
1093
/**
 * Waiter context that re-runs Locker::try_eval() on a cache object once
 * a waited-for event (e.g. WAIT_SINGLEAUTH or WAIT_UNFREEZE) fires.
 * The constructor pins the object so it cannot be trimmed while we hold
 * a raw pointer to it; finish() drops that pin after the re-eval.
 */
class C_Locker_Eval : public LockerContext {
  MDSCacheObject *p;   // pinned target object
  int mask;            // lock mask to hand back to try_eval()
public:
  C_Locker_Eval(Locker *l, MDSCacheObject *pp, int m) : LockerContext(l), p(pp), mask(m) {
    // We are used as an MDSCacheObject waiter, so should
    // only be invoked by someone already holding the big lock.
    assert(locker->mds->mds_lock.is_locked_by_me());
    p->get(MDSCacheObject::PIN_PTRWAITER);
  }
  void finish(int r) override {
    locker->try_eval(p, mask);
    p->put(MDSCacheObject::PIN_PTRWAITER);  // release the pin taken in the ctor
  }
};
1109
1110 void Locker::try_eval(MDSCacheObject *p, int mask)
1111 {
1112 // unstable and ambiguous auth?
1113 if (p->is_ambiguous_auth()) {
1114 dout(7) << "try_eval ambiguous auth, waiting on " << *p << dendl;
1115 p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_Eval(this, p, mask));
1116 return;
1117 }
1118
1119 if (p->is_auth() && p->is_frozen()) {
1120 dout(7) << "try_eval frozen, waiting on " << *p << dendl;
1121 p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, mask));
1122 return;
1123 }
1124
1125 if (mask & CEPH_LOCK_DN) {
1126 assert(mask == CEPH_LOCK_DN);
1127 bool need_issue = false; // ignore this, no caps on dentries
1128 CDentry *dn = static_cast<CDentry *>(p);
1129 eval_any(&dn->lock, &need_issue);
1130 } else {
1131 CInode *in = static_cast<CInode *>(p);
1132 eval(in, mask);
1133 }
1134 }
1135
/**
 * Evaluate a single lock, deferring via waiters if its parent object is
 * in a transient state (ambiguous auth, frozen, freezing), and honoring
 * pending scatter/unscatter requests first.
 *
 * @param lock        the lock to evaluate (parent must be auth here to act)
 * @param pneed_issue out: set true if caps need to be issued afterwards
 */
void Locker::try_eval(SimpleLock *lock, bool *pneed_issue)
{
  MDSCacheObject *p = lock->get_parent();

  // unstable and ambiguous auth?
  if (p->is_ambiguous_auth()) {
    dout(7) << "try_eval " << *lock << " ambiguousauth, waiting on " << *p << dendl;
    p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_Eval(this, p, lock->get_type()));
    return;
  }

  if (!p->is_auth()) {
    // only the auth may drive lock state transitions
    dout(7) << "try_eval " << *lock << " not auth for " << *p << dendl;
    return;
  }

  if (p->is_frozen()) {
    dout(7) << "try_eval " << *lock << " frozen, waiting on " << *p << dendl;
    p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, lock->get_type()));
    return;
  }

  /*
   * We could have a situation like:
   *
   * - mds A authpins item on mds B
   * - mds B starts to freeze tree containing item
   * - mds A tries wrlock_start on A, sends REQSCATTER to B
   * - mds B lock is unstable, sets scatter_wanted
   * - mds B lock stabilizes, calls try_eval.
   *
   * We can defer while freezing without causing a deadlock.  Honor
   * scatter_wanted flag here.  This will never get deferred by the
   * checks above due to the auth_pin held by the master.
   */
  if (lock->is_scatterlock()) {
    ScatterLock *slock = static_cast<ScatterLock *>(lock);
    if (slock->get_scatter_wanted() &&
	slock->get_state() != LOCK_MIX) {
      // a peer asked us to scatter; do that before generic eval
      scatter_mix(slock, pneed_issue);
      if (!lock->is_stable())
	return;
    } else if (slock->get_unscatter_wanted() &&
        slock->get_state() != LOCK_LOCK) {
      // a peer asked us to unscatter; move toward LOCK first
      simple_lock(slock, pneed_issue);
      if (!lock->is_stable()) {
	return;
      }
    }
  }

  // NOTE: the freezing check is deliberately *after* the scatter handling
  // above (see comment block), and dentry locks are exempt.
  if (lock->get_type() != CEPH_LOCK_DN && p->is_freezing()) {
    dout(7) << "try_eval " << *lock << " freezing, waiting on " << *p << dendl;
    p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, lock->get_type()));
    return;
  }

  eval(lock, pneed_issue);
}
1195
1196 void Locker::eval_cap_gather(CInode *in, set<CInode*> *issue_set)
1197 {
1198 bool need_issue = false;
1199 list<MDSInternalContextBase*> finishers;
1200
1201 // kick locks now
1202 if (!in->filelock.is_stable())
1203 eval_gather(&in->filelock, false, &need_issue, &finishers);
1204 if (!in->authlock.is_stable())
1205 eval_gather(&in->authlock, false, &need_issue, &finishers);
1206 if (!in->linklock.is_stable())
1207 eval_gather(&in->linklock, false, &need_issue, &finishers);
1208 if (!in->xattrlock.is_stable())
1209 eval_gather(&in->xattrlock, false, &need_issue, &finishers);
1210
1211 if (need_issue && in->is_head()) {
1212 if (issue_set)
1213 issue_set->insert(in);
1214 else
1215 issue_caps(in);
1216 }
1217
1218 finish_contexts(g_ceph_context, finishers);
1219 }
1220
1221 void Locker::eval_scatter_gathers(CInode *in)
1222 {
1223 bool need_issue = false;
1224 list<MDSInternalContextBase*> finishers;
1225
1226 dout(10) << "eval_scatter_gathers " << *in << dendl;
1227
1228 // kick locks now
1229 if (!in->filelock.is_stable())
1230 eval_gather(&in->filelock, false, &need_issue, &finishers);
1231 if (!in->nestlock.is_stable())
1232 eval_gather(&in->nestlock, false, &need_issue, &finishers);
1233 if (!in->dirfragtreelock.is_stable())
1234 eval_gather(&in->dirfragtreelock, false, &need_issue, &finishers);
1235
1236 if (need_issue && in->is_head())
1237 issue_caps(in);
1238
1239 finish_contexts(g_ceph_context, finishers);
1240 }
1241
1242 void Locker::eval(SimpleLock *lock, bool *need_issue)
1243 {
1244 switch (lock->get_type()) {
1245 case CEPH_LOCK_IFILE:
1246 return file_eval(static_cast<ScatterLock*>(lock), need_issue);
1247 case CEPH_LOCK_IDFT:
1248 case CEPH_LOCK_INEST:
1249 return scatter_eval(static_cast<ScatterLock*>(lock), need_issue);
1250 default:
1251 return simple_eval(lock, need_issue);
1252 }
1253 }
1254
1255
1256 // ------------------
1257 // rdlock
1258
/**
 * Try to nudge a lock toward a rdlock-able state.
 *
 * @param lock    lock to kick
 * @param as_anon caller wants an anonymous read (SYNC), not XSYN
 * @return true if we changed the lock state locally (caller should
 *         re-check rdlock-ability); false if nothing changed here, or
 *         we could only ask the auth MDS and must wait.
 */
bool Locker::_rdlock_kick(SimpleLock *lock, bool as_anon)
{
  // kick the lock
  if (lock->is_stable()) {
    if (lock->get_parent()->is_auth()) {
      if (lock->get_sm() == &sm_scatterlock) {
	// not until tempsync is fully implemented
	//if (lock->get_parent()->is_replicated())
	//scatter_tempsync((ScatterLock*)lock);
	//else
	simple_sync(lock);
      } else if (lock->get_sm() == &sm_filelock) {
	CInode *in = static_cast<CInode*>(lock->get_parent());
	// prefer XSYN for a loner-held EXCL file lock on a regular file,
	// which lets the loner keep its caps while others read
	if (lock->get_state() == LOCK_EXCL &&
	    in->get_target_loner() >= 0 &&
	    !in->is_dir() && !as_anon) // as_anon => caller wants SYNC, not XSYN
	  file_xsyn(lock);
	else
	  simple_sync(lock);
      } else
	simple_sync(lock);
      return true;
    } else {
      // request rdlock state change from auth
      mds_rank_t auth = lock->get_parent()->authority().first;
      if (!mds->is_cluster_degraded() ||
	  mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
	dout(10) << "requesting rdlock from auth on "
		 << *lock << " on " << *lock->get_parent() << dendl;
	mds->send_message_mds(new MLock(lock, LOCK_AC_REQRDLOCK, mds->get_nodeid()), auth);
      }
      return false;
    }
  }
  // lock is unstable; if the inode is being recovered, raise its priority
  // so the file lock can make progress sooner
  if (lock->get_type() == CEPH_LOCK_IFILE) {
    CInode *in = static_cast<CInode *>(lock->get_parent());
    if (in->state_test(CInode::STATE_RECOVERING)) {
      mds->mdcache->recovery_queue.prioritize(in);
    }
  }

  return false;
}
1302
1303 bool Locker::rdlock_try(SimpleLock *lock, client_t client, MDSInternalContextBase *con)
1304 {
1305 dout(7) << "rdlock_try on " << *lock << " on " << *lock->get_parent() << dendl;
1306
1307 // can read? grab ref.
1308 if (lock->can_rdlock(client))
1309 return true;
1310
1311 _rdlock_kick(lock, false);
1312
1313 if (lock->can_rdlock(client))
1314 return true;
1315
1316 // wait!
1317 if (con) {
1318 dout(7) << "rdlock_try waiting on " << *lock << " on " << *lock->get_parent() << dendl;
1319 lock->add_waiter(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_RD, con);
1320 }
1321 return false;
1322 }
1323
/**
 * Acquire a rdlock for a request, retrying local state kicks until the
 * lock is readable or no further local progress is possible.
 *
 * @param lock    lock to rdlock
 * @param mut     the request acquiring the lock (retried via waiter on failure)
 * @param as_anon if true, rdlock anonymously (client -1), e.g. for snapped reads
 * @return true if the rdlock was taken and recorded on mut; false if the
 *         request was parked on a waiter and will be retried.
 */
bool Locker::rdlock_start(SimpleLock *lock, MDRequestRef& mut, bool as_anon)
{
  dout(7) << "rdlock_start on " << *lock << " on " << *lock->get_parent() << dendl;

  // client may be allowed to rdlock the same item it has xlocked.
  // UNLESS someone passes in as_anon, or we're reading snapped version here.
  if (mut->snapid != CEPH_NOSNAP)
    as_anon = true;
  client_t client = as_anon ? -1 : mut->get_client();

  // only inode locks have a head/snap distinction; dentry locks don't
  CInode *in = 0;
  if (lock->get_type() != CEPH_LOCK_DN)
    in = static_cast<CInode *>(lock->get_parent());

  /*
  if (!lock->get_parent()->is_auth() &&
      lock->fw_rdlock_to_auth()) {
    mdcache->request_forward(mut, lock->get_parent()->authority().first);
    return false;
  }
  */

  while (1) {
    // can read?  grab ref.
    if (lock->can_rdlock(client)) {
      lock->get_rdlock();
      mut->rdlocks.insert(lock);
      mut->locks.insert(lock);
      return true;
    }

    // hmm, wait a second.
    // snapped inode waiting for its head to sync: kick the head's lock.
    if (in && !in->is_head() && in->is_auth() &&
	lock->get_state() == LOCK_SNAP_SYNC) {
      // okay, we actually need to kick the head's lock to get ourselves synced up.
      CInode *head = mdcache->get_inode(in->ino());
      assert(head);
      SimpleLock *hlock = head->get_lock(CEPH_LOCK_IFILE);
      if (hlock->get_state() == LOCK_SYNC)
	hlock = head->get_lock(lock->get_type());

      if (hlock->get_state() != LOCK_SYNC) {
	dout(10) << "rdlock_start trying head inode " << *head << dendl;
	if (!rdlock_start(hlock, mut, true)) // ** as_anon, no rdlock on EXCL **
	  return false;
	// oh, check our lock again then
      }
    }

    // stop looping once a kick makes no further local progress
    if (!_rdlock_kick(lock, as_anon))
      break;
  }

  // wait!
  int wait_on;
  if (lock->get_parent()->is_auth() && lock->is_stable())
    wait_on = SimpleLock::WAIT_RD;
  else
    wait_on = SimpleLock::WAIT_STABLE;  // REQRDLOCK is ignored if lock is unstable, so we need to retry.
  dout(7) << "rdlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl;
  lock->add_waiter(wait_on, new C_MDS_RetryRequest(mdcache, mut));
  nudge_log(lock);
  return false;
}
1388
1389 void Locker::nudge_log(SimpleLock *lock)
1390 {
1391 dout(10) << "nudge_log " << *lock << " on " << *lock->get_parent() << dendl;
1392 if (lock->get_parent()->is_auth() && lock->is_unstable_and_locked()) // as with xlockdone, or cap flush
1393 mds->mdlog->flush();
1394 }
1395
1396 void Locker::rdlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue)
1397 {
1398 // drop ref
1399 lock->put_rdlock();
1400 if (mut) {
1401 mut->rdlocks.erase(lock);
1402 mut->locks.erase(lock);
1403 }
1404
1405 dout(7) << "rdlock_finish on " << *lock << " on " << *lock->get_parent() << dendl;
1406
1407 // last one?
1408 if (!lock->is_rdlocked()) {
1409 if (!lock->is_stable())
1410 eval_gather(lock, false, pneed_issue);
1411 else if (lock->get_parent()->is_auth())
1412 try_eval(lock, pneed_issue);
1413 }
1414 }
1415
1416
1417 bool Locker::can_rdlock_set(set<SimpleLock*>& locks)
1418 {
1419 dout(10) << "can_rdlock_set " << locks << dendl;
1420 for (set<SimpleLock*>::iterator p = locks.begin(); p != locks.end(); ++p)
1421 if (!(*p)->can_rdlock(-1)) {
1422 dout(10) << "can_rdlock_set can't rdlock " << *p << " on " << *(*p)->get_parent() << dendl;
1423 return false;
1424 }
1425 return true;
1426 }
1427
1428 bool Locker::rdlock_try_set(set<SimpleLock*>& locks)
1429 {
1430 dout(10) << "rdlock_try_set " << locks << dendl;
1431 for (set<SimpleLock*>::iterator p = locks.begin(); p != locks.end(); ++p)
1432 if (!rdlock_try(*p, -1, NULL)) {
1433 dout(10) << "rdlock_try_set can't rdlock " << *p << " on " << *(*p)->get_parent() << dendl;
1434 return false;
1435 }
1436 return true;
1437 }
1438
1439 void Locker::rdlock_take_set(set<SimpleLock*>& locks, MutationRef& mut)
1440 {
1441 dout(10) << "rdlock_take_set " << locks << dendl;
1442 for (set<SimpleLock*>::iterator p = locks.begin(); p != locks.end(); ++p) {
1443 (*p)->get_rdlock();
1444 mut->rdlocks.insert(*p);
1445 mut->locks.insert(*p);
1446 }
1447 }
1448
1449 // ------------------
1450 // wrlock
1451
1452 void Locker::wrlock_force(SimpleLock *lock, MutationRef& mut)
1453 {
1454 if (lock->get_type() == CEPH_LOCK_IVERSION ||
1455 lock->get_type() == CEPH_LOCK_DVERSION)
1456 return local_wrlock_grab(static_cast<LocalLock*>(lock), mut);
1457
1458 dout(7) << "wrlock_force on " << *lock
1459 << " on " << *lock->get_parent() << dendl;
1460 lock->get_wrlock(true);
1461 mut->wrlocks.insert(lock);
1462 mut->locks.insert(lock);
1463 }
1464
/**
 * Acquire a wrlock for a request.
 *
 * @param lock   lock to wrlock (version locks take the LocalLock path)
 * @param mut    the acquiring request
 * @param nowait if true, never block or open new log events; fail fast
 *               (caller already has a log entry open)
 * @return true if the wrlock was taken; false if the request must retry
 *         (a waiter is registered unless nowait was set).
 */
bool Locker::wrlock_start(SimpleLock *lock, MDRequestRef& mut, bool nowait)
{
  if (lock->get_type() == CEPH_LOCK_IVERSION ||
      lock->get_type() == CEPH_LOCK_DVERSION)
    return local_wrlock_start(static_cast<LocalLock*>(lock), mut);

  dout(10) << "wrlock_start " << *lock << " on " << *lock->get_parent() << dendl;

  CInode *in = static_cast<CInode *>(lock->get_parent());
  client_t client = mut->get_client();
  // prefer the MIX (scattered) state when the inode has subtree-bound or
  // exporting dirfrags, or a peer explicitly asked for scatter
  bool want_scatter = !nowait && lock->get_parent()->is_auth() &&
		      (in->has_subtree_or_exporting_dirfrag() ||
		       static_cast<ScatterLock*>(lock)->get_scatter_wanted());

  while (1) {
    // wrlock?
    if (lock->can_wrlock(client) &&
	(!want_scatter || lock->get_state() == LOCK_MIX)) {
      lock->get_wrlock();
      mut->wrlocks.insert(lock);
      mut->locks.insert(lock);
      return true;
    }

    // recovering file? raise its priority so the lock can progress
    if (lock->get_type() == CEPH_LOCK_IFILE &&
	in->state_test(CInode::STATE_RECOVERING)) {
      mds->mdcache->recovery_queue.prioritize(in);
    }

    if (!lock->is_stable())
      break;

    if (in->is_auth()) {
      // don't do nested lock state change if we have dirty scatterdata and
      // may scatter_writebehind or start_scatter, because nowait==true implies
      // that the caller already has a log entry open!
      if (nowait && lock->is_dirty())
	return false;

      if (want_scatter)
	scatter_mix(static_cast<ScatterLock*>(lock));
      else
	simple_lock(lock);

      if (nowait && !lock->can_wrlock(client))
	return false;

    } else {
      // replica.
      // auth should be auth_pinned (see acquire_locks wrlock weird mustpin case).
      mds_rank_t auth = lock->get_parent()->authority().first;
      if (!mds->is_cluster_degraded() ||
	  mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
	dout(10) << "requesting scatter from auth on "
		 << *lock << " on " << *lock->get_parent() << dendl;
	mds->send_message_mds(new MLock(lock, LOCK_AC_REQSCATTER, mds->get_nodeid()), auth);
      }
      break;
    }
  }

  if (!nowait) {
    dout(7) << "wrlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl;
    lock->add_waiter(SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
    nudge_log(lock);
  }

  return false;
}
1534
1535 void Locker::wrlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue)
1536 {
1537 if (lock->get_type() == CEPH_LOCK_IVERSION ||
1538 lock->get_type() == CEPH_LOCK_DVERSION)
1539 return local_wrlock_finish(static_cast<LocalLock*>(lock), mut);
1540
1541 dout(7) << "wrlock_finish on " << *lock << " on " << *lock->get_parent() << dendl;
1542 lock->put_wrlock();
1543 if (mut) {
1544 mut->wrlocks.erase(lock);
1545 if (mut->remote_wrlocks.count(lock) == 0)
1546 mut->locks.erase(lock);
1547 }
1548
1549 if (!lock->is_wrlocked()) {
1550 if (!lock->is_stable())
1551 eval_gather(lock, false, pneed_issue);
1552 else if (lock->get_parent()->is_auth())
1553 try_eval(lock, pneed_issue);
1554 }
1555 }
1556
1557
1558 // remote wrlock
1559
1560 void Locker::remote_wrlock_start(SimpleLock *lock, mds_rank_t target, MDRequestRef& mut)
1561 {
1562 dout(7) << "remote_wrlock_start mds." << target << " on " << *lock << " on " << *lock->get_parent() << dendl;
1563
1564 // wait for active target
1565 if (mds->is_cluster_degraded() &&
1566 !mds->mdsmap->is_clientreplay_or_active_or_stopping(target)) {
1567 dout(7) << " mds." << target << " is not active" << dendl;
1568 if (mut->more()->waiting_on_slave.empty())
1569 mds->wait_for_active_peer(target, new C_MDS_RetryRequest(mdcache, mut));
1570 return;
1571 }
1572
1573 // send lock request
1574 mut->start_locking(lock, target);
1575 mut->more()->slaves.insert(target);
1576 MMDSSlaveRequest *r = new MMDSSlaveRequest(mut->reqid, mut->attempt,
1577 MMDSSlaveRequest::OP_WRLOCK);
1578 r->set_lock_type(lock->get_type());
1579 lock->get_parent()->set_object_info(r->get_object_info());
1580 mds->send_message_mds(r, target);
1581
1582 assert(mut->more()->waiting_on_slave.count(target) == 0);
1583 mut->more()->waiting_on_slave.insert(target);
1584 }
1585
1586 void Locker::remote_wrlock_finish(SimpleLock *lock, mds_rank_t target,
1587 MutationImpl *mut)
1588 {
1589 // drop ref
1590 mut->remote_wrlocks.erase(lock);
1591 if (mut->wrlocks.count(lock) == 0)
1592 mut->locks.erase(lock);
1593
1594 dout(7) << "remote_wrlock_finish releasing remote wrlock on mds." << target
1595 << " " << *lock->get_parent() << dendl;
1596 if (!mds->is_cluster_degraded() ||
1597 mds->mdsmap->get_state(target) >= MDSMap::STATE_REJOIN) {
1598 MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt,
1599 MMDSSlaveRequest::OP_UNWRLOCK);
1600 slavereq->set_lock_type(lock->get_type());
1601 lock->get_parent()->set_object_info(slavereq->get_object_info());
1602 mds->send_message_mds(slavereq, target);
1603 }
1604 }
1605
1606
1607 // ------------------
1608 // xlock
1609
/**
 * Acquire an xlock for a request.
 *
 * On the auth, loops driving the lock toward an xlockable state; on a
 * replica, forwards an OP_XLOCK slave request to the auth MDS.
 *
 * @param lock lock to xlock (version locks take the LocalLock path)
 * @param mut  the acquiring request
 * @return true if the xlock was taken and recorded on mut; false if the
 *         request was parked on a waiter / slave reply and will retry.
 */
bool Locker::xlock_start(SimpleLock *lock, MDRequestRef& mut)
{
  if (lock->get_type() == CEPH_LOCK_IVERSION ||
      lock->get_type() == CEPH_LOCK_DVERSION)
    return local_xlock_start(static_cast<LocalLock*>(lock), mut);

  dout(7) << "xlock_start on " << *lock << " on " << *lock->get_parent() << dendl;
  client_t client = mut->get_client();

  // auth?
  if (lock->get_parent()->is_auth()) {
    // auth
    while (1) {
      if (lock->can_xlock(client)) {
	lock->set_state(LOCK_XLOCK);
	lock->get_xlock(mut, client);
	mut->xlocks.insert(lock);
	mut->locks.insert(lock);
	mut->finish_locking(lock);
	return true;
      }

      // recovering file? raise its priority so the lock can progress
      if (lock->get_type() == CEPH_LOCK_IFILE) {
	CInode *in = static_cast<CInode*>(lock->get_parent());
	if (in->state_test(CInode::STATE_RECOVERING)) {
	  mds->mdcache->recovery_queue.prioritize(in);
	}
      }

      // give up and wait unless the lock is XLOCKDONE with us as the
      // xlocker and nobody is queued waiting for stability
      if (!lock->is_stable() && (lock->get_state() != LOCK_XLOCKDONE ||
				 lock->get_xlock_by_client() != client ||
				 lock->is_waiter_for(SimpleLock::WAIT_STABLE)))
	break;

      if (lock->get_state() == LOCK_LOCK || lock->get_state() == LOCK_XLOCKDONE) {
	mut->start_locking(lock);
	simple_xlock(lock);
      } else {
	simple_lock(lock);
      }
    }

    lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
    nudge_log(lock);
    return false;
  } else {
    // replica
    assert(lock->get_sm()->can_remote_xlock);
    assert(!mut->slave_request);

    // wait for single auth
    if (lock->get_parent()->is_ambiguous_auth()) {
      lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH,
				     new C_MDS_RetryRequest(mdcache, mut));
      return false;
    }

    // wait for active auth
    mds_rank_t auth = lock->get_parent()->authority().first;
    if (mds->is_cluster_degraded() &&
	!mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
      dout(7) << " mds." << auth << " is not active" << dendl;
      if (mut->more()->waiting_on_slave.empty())
	mds->wait_for_active_peer(auth, new C_MDS_RetryRequest(mdcache, mut));
      return false;
    }

    // send lock request
    mut->more()->slaves.insert(auth);
    mut->start_locking(lock, auth);
    MMDSSlaveRequest *r = new MMDSSlaveRequest(mut->reqid, mut->attempt,
					       MMDSSlaveRequest::OP_XLOCK);
    r->set_lock_type(lock->get_type());
    lock->get_parent()->set_object_info(r->get_object_info());
    mds->send_message_mds(r, auth);

    // remember we're waiting on the auth's OP_XLOCK ack
    assert(mut->more()->waiting_on_slave.count(auth) == 0);
    mut->more()->waiting_on_slave.insert(auth);

    return false;
  }
}
1692
/**
 * Finish up after the last xlock ref on a lock is dropped.
 *
 * If no other refs/leases remain and the xlocker is (or can become) the
 * loner, short-circuit straight to LOCK_EXCL; otherwise fall back to a
 * full eval_gather.
 *
 * @param lock        lock whose last xlock just dropped (must be unstable)
 * @param xlocker     client that held the xlock (-1 if none/unknown)
 * @param pneed_issue out: set true if caps need issuing afterwards
 */
void Locker::_finish_xlock(SimpleLock *lock, client_t xlocker, bool *pneed_issue)
{
  assert(!lock->is_stable());
  // fast path: no other readers/writers/leases, not a snap xlock, and not
  // a dentry lock -> consider jumping directly to EXCL for the loner
  if (lock->get_num_rdlocks() == 0 &&
      lock->get_num_wrlocks() == 0 &&
      !lock->is_leased() &&
      lock->get_state() != LOCK_XLOCKSNAP &&
      lock->get_type() != CEPH_LOCK_DN) {
    CInode *in = static_cast<CInode*>(lock->get_parent());
    client_t loner = in->get_target_loner();
    if (loner >= 0 && (xlocker < 0 || xlocker == loner)) {
      lock->set_state(LOCK_EXCL);
      lock->get_parent()->auth_unpin(lock);
      lock->finish_waiters(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD);
      if (lock->get_cap_shift())
	*pneed_issue = true;   // this lock gates caps; reissue them
      if (lock->get_parent()->is_auth() &&
	  lock->is_stable())
	try_eval(lock, pneed_issue);
      return;
    }
  }
  // the xlocker may have CEPH_CAP_GSHARED, need to revoke it if next state is LOCK_LOCK
  eval_gather(lock, lock->get_state() != LOCK_XLOCKSNAP, pneed_issue);
}
1718
/**
 * Release an xlock held by a mutation.
 *
 * For a remote xlock (we are a replica), notify the auth with OP_UNXLOCK
 * and wake local waiters.  For a local xlock, when the last ref drops,
 * move through XLOCKDONE and let _finish_xlock decide the next state.
 *
 * @param lock        lock to release (version locks take the LocalLock path)
 * @param mut         owning mutation (required)
 * @param pneed_issue out (optional): set true instead of issuing caps inline
 */
void Locker::xlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue)
{
  if (lock->get_type() == CEPH_LOCK_IVERSION ||
      lock->get_type() == CEPH_LOCK_DVERSION)
    return local_xlock_finish(static_cast<LocalLock*>(lock), mut);

  dout(10) << "xlock_finish on " << *lock << " " << *lock->get_parent() << dendl;

  // remember who held the xlock before we drop it
  client_t xlocker = lock->get_xlock_by_client();

  // drop ref
  lock->put_xlock();
  assert(mut);
  mut->xlocks.erase(lock);
  mut->locks.erase(lock);

  bool do_issue = false;

  // remote xlock?
  if (!lock->get_parent()->is_auth()) {
    assert(lock->get_sm()->can_remote_xlock);

    // tell auth
    dout(7) << "xlock_finish releasing remote xlock on " << *lock->get_parent() << dendl;
    mds_rank_t auth = lock->get_parent()->authority().first;
    if (!mds->is_cluster_degraded() ||
	mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) {
      MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt,
							MMDSSlaveRequest::OP_UNXLOCK);
      slavereq->set_lock_type(lock->get_type());
      lock->get_parent()->set_object_info(slavereq->get_object_info());
      mds->send_message_mds(slavereq, auth);
    }
    // others waiting?
    lock->finish_waiters(SimpleLock::WAIT_STABLE |
			 SimpleLock::WAIT_WR |
			 SimpleLock::WAIT_RD, 0);
  } else {
    // local: only act when the last xlock ref is gone
    if (lock->get_num_xlocks() == 0) {
      if (lock->get_state() == LOCK_LOCK_XLOCK)
	lock->set_state(LOCK_XLOCKDONE);
      _finish_xlock(lock, xlocker, &do_issue);
    }
  }

  if (do_issue) {
    CInode *in = static_cast<CInode*>(lock->get_parent());
    if (in->is_head()) {
      if (pneed_issue)
	*pneed_issue = true;   // caller will issue in a batch
      else
	issue_caps(in);
    }
  }
}
1774
1775 void Locker::xlock_export(SimpleLock *lock, MutationImpl *mut)
1776 {
1777 dout(10) << "xlock_export on " << *lock << " " << *lock->get_parent() << dendl;
1778
1779 lock->put_xlock();
1780 mut->xlocks.erase(lock);
1781 mut->locks.erase(lock);
1782
1783 MDSCacheObject *p = lock->get_parent();
1784 assert(p->state_test(CInode::STATE_AMBIGUOUSAUTH)); // we are exporting this (inode)
1785
1786 if (!lock->is_stable())
1787 lock->get_parent()->auth_unpin(lock);
1788
1789 lock->set_state(LOCK_LOCK);
1790 }
1791
1792 void Locker::xlock_import(SimpleLock *lock)
1793 {
1794 dout(10) << "xlock_import on " << *lock << " " << *lock->get_parent() << dendl;
1795 lock->get_parent()->auth_pin(lock);
1796 }
1797
1798
1799
1800 // file i/o -----------------------------------------
1801
1802 version_t Locker::issue_file_data_version(CInode *in)
1803 {
1804 dout(7) << "issue_file_data_version on " << *in << dendl;
1805 return in->inode.file_data_version;
1806 }
1807
/**
 * Log-commit context for a file update: once the log event is safe,
 * calls Locker::file_update_finish() with the parameters captured here.
 * The inode is pinned in the constructor and unpinned after finish()
 * so it cannot be trimmed while the log event is in flight.
 */
class C_Locker_FileUpdate_finish : public LockerLogContext {
  CInode *in;        // pinned inode being updated
  MutationRef mut;   // the mutation whose update was journaled
  bool share_max;    // share new max_size with the client afterwards?
  bool need_issue;   // reissue this client's caps afterwards?
  client_t client;   // client to ack (and whose caps to reissue)
  MClientCaps *ack;  // cap ack message to send (may be null)
public:
  C_Locker_FileUpdate_finish(Locker *l, CInode *i, MutationRef& m,
				bool sm=false, bool ni=false, client_t c=-1,
				MClientCaps *ac = 0)
    : LockerLogContext(l), in(i), mut(m), share_max(sm), need_issue(ni),
      client(c), ack(ac) {
    in->get(CInode::PIN_PTRWAITER);
  }
  void finish(int r) override {
    locker->file_update_finish(in, mut, share_max, need_issue, client, ack);
    in->put(CInode::PIN_PTRWAITER);  // drop the pin taken in the ctor
  }
};
1828
/**
 * Complete a journaled file update: apply the projected inode, ack the
 * client's cap flush, drop the mutation's locks, and finish any snap
 * cap writeback this update completed.
 *
 * @param in               the updated inode
 * @param mut              the mutation being finished
 * @param share_max        if set, share new max_size with clients when allowed
 * @param issue_client_cap if set, reissue this client's caps if it wants more
 * @param client           client whose flush we are acking
 * @param ack              cap ack message to send (may be null)
 */
void Locker::file_update_finish(CInode *in, MutationRef& mut, bool share_max, bool issue_client_cap,
				client_t client, MClientCaps *ack)
{
  dout(10) << "file_update_finish on " << *in << dendl;
  in->pop_and_dirty_projected_inode(mut->ls);

  mut->apply();

  if (ack) {
    Session *session = mds->get_session(client);
    if (session) {
      // "oldest flush tid" > 0 means client uses unique TID for each flush
      if (ack->get_oldest_flush_tid() > 0)
	session->add_completed_flush(ack->get_client_tid());
      mds->send_message_client_counted(ack, session);
    } else {
      // session went away; drop the ack instead of leaking it
      dout(10) << " no session for client." << client << " " << *ack << dendl;
      ack->put();
    }
  }

  set<CInode*> need_issue;
  drop_locks(mut.get(), &need_issue);

  if (!in->is_head() && !in->client_snap_caps.empty()) {
    dout(10) << " client_snap_caps " << in->client_snap_caps << dendl;
    // check for snap writeback completion
    bool gather = false;
    auto p = in->client_snap_caps.begin();
    while (p != in->client_snap_caps.end()) {
      SimpleLock *lock = in->get_lock(p->first);
      assert(lock);
      dout(10) << " completing client_snap_caps for " << ccap_string(p->first)
	       << " lock " << *lock << " on " << *in << dendl;
      lock->put_wrlock();

      // this client's snap writeback for this cap bit is done
      p->second.erase(client);
      if (p->second.empty()) {
	gather = true;
	in->client_snap_caps.erase(p++);
      } else
	++p;
    }
    if (gather) {
      if (in->client_snap_caps.empty())
	in->item_open_file.remove_myself();
      eval_cap_gather(in, &need_issue);
    }
  } else {
    // reissue only if caps are wanted beyond what is pending, and the inode
    // isn't already queued for an issue by drop_locks above
    if (issue_client_cap && need_issue.count(in) == 0) {
      Capability *cap = in->get_client_cap(client);
      if (cap && (cap->wanted() & ~cap->pending()))
	issue_caps(in, cap);
    }

    if (share_max && in->is_auth() &&
	(in->filelock.gcaps_allowed(CAP_LONER) & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)))
      share_inode_max_size(in);
  }
  issue_caps_set(need_issue);

  utime_t now = ceph_clock_now();
  mds->balancer->hit_inode(now, in, META_POP_IWR);

  // auth unpin after issuing caps
  mut->cleanup();
}
1896
/**
 * Register (or augment) a client capability for an open.
 *
 * @param in        inode being opened
 * @param mode      open mode; mapped to wanted caps via ceph_caps_for_mode()
 * @param session   the opening client's session
 * @param realm     snap realm for a newly created cap
 * @param is_replay if true, only try to reconnect an existing cap (returns 0)
 * @return the client's Capability, or 0 on replay
 */
Capability* Locker::issue_new_caps(CInode *in,
				   int mode,
				   Session *session,
				   SnapRealm *realm,
				   bool is_replay)
{
  dout(7) << "issue_new_caps for mode " << mode << " on " << *in << dendl;
  bool is_new;

  // if replay, try to reconnect cap, and otherwise do nothing.
  if (is_replay) {
    mds->mdcache->try_reconnect_cap(in, session);
    return 0;
  }

  // my needs
  assert(session->info.inst.name.is_client());
  client_t my_client = session->info.inst.name.num();
  int my_want = ceph_caps_for_mode(mode);

  // register a capability
  Capability *cap = in->get_client_cap(my_client);
  if (!cap) {
    // new cap
    cap = in->add_client_cap(my_client, session, realm);
    cap->set_wanted(my_want);
    cap->mark_new();
    cap->inc_suppress(); // suppress file cap messages for new cap (we'll bundle with the open() reply)
    is_new = true;
  } else {
    is_new = false;
    // make sure it wants sufficient caps
    if (my_want & ~cap->wanted()) {
      // augment wanted caps for this client
      cap->set_wanted(cap->wanted() | my_want);
    }
  }

  if (in->is_auth()) {
    // [auth] twiddle mode?
    eval(in, CEPH_CAP_LOCKS);

    if (_need_flush_mdlog(in, my_want))
      mds->mdlog->flush();

  } else {
    // [replica] tell auth about any new caps wanted
    request_inode_file_caps(in);
  }

  // issue caps (pot. incl new one)
  //issue_caps(in);  // note: _eval above may have done this already...

  // re-issue whatever we can
  //cap->issue(cap->pending());

  // re-enable cap messages now that the open() reply will carry the cap
  if (is_new)
    cap->dec_suppress();

  return cap;
}
1958
1959
1960 void Locker::issue_caps_set(set<CInode*>& inset)
1961 {
1962 for (set<CInode*>::iterator p = inset.begin(); p != inset.end(); ++p)
1963 issue_caps(*p);
1964 }
1965
/**
 * Issue (grant or revoke) caps to clients holding capabilities on @in so that
 * each client's pending caps agree with what the current lock states allow.
 *
 * @param in        head inode whose caps are (re)issued; must be auth-visible head
 * @param only_cap  if non-NULL, restrict processing to this single client cap
 * @return true if nothing was (re)issued, i.e. no messages/callbacks generated
 */
bool Locker::issue_caps(CInode *in, Capability *only_cap)
{
  // allowed caps are determined by the lock mode.
  int all_allowed = in->get_caps_allowed_by_type(CAP_ANY);
  int loner_allowed = in->get_caps_allowed_by_type(CAP_LONER);
  int xlocker_allowed = in->get_caps_allowed_by_type(CAP_XLOCKER);

  client_t loner = in->get_loner();
  if (loner >= 0) {
    dout(7) << "issue_caps loner client." << loner
	    << " allowed=" << ccap_string(loner_allowed)
	    << ", xlocker allowed=" << ccap_string(xlocker_allowed)
	    << ", others allowed=" << ccap_string(all_allowed)
	    << " on " << *in << dendl;
  } else {
    dout(7) << "issue_caps allowed=" << ccap_string(all_allowed)
	    << ", xlocker allowed=" << ccap_string(xlocker_allowed)
	    << " on " << *in << dendl;
  }

  assert(in->is_head());

  // number of caps we actually (re)issued below
  int nissued = 0;

  // iterate either over the single requested cap or over all client caps
  map<client_t, Capability*>::iterator it;
  if (only_cap)
    it = in->client_caps.find(only_cap->get_client());
  else
    it = in->client_caps.begin();
  for (; it != in->client_caps.end(); ++it) {
    Capability *cap = it->second;
    if (cap->is_stale())
      continue;

    // pick the allowed mask for this client: loner gets the wider loner mask
    int allowed;
    if (loner == it->first)
      allowed = loner_allowed;
    else
      allowed = all_allowed;

    // add in any xlocker-only caps (for locks this client is the xlocker for)
    allowed |= xlocker_allowed & in->get_xlocker_mask(it->first);

    // clients that don't understand inline data must not get FILE_RD/WR
    // while the inode still carries inline data
    Session *session = mds->get_session(it->first);
    if (in->inode.inline_data.version != CEPH_INLINE_NONE &&
	!(session && session->connection &&
	  session->connection->has_feature(CEPH_FEATURE_MDS_INLINE_DATA)))
      allowed &= ~(CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR);

    int pending = cap->pending();
    int wanted = cap->wanted();

    dout(20) << " client." << it->first
	     << " pending " << ccap_string(pending)
	     << " allowed " << ccap_string(allowed)
	     << " wanted " << ccap_string(wanted)
	     << dendl;

    if (!(pending & ~allowed)) {
      // not a revocation: skip caps that are brand new (will be bundled with
      // the open reply) or explicitly suppressed
      if (cap->is_new() || cap->is_suppress()) {
	dout(20) << " !revoke and new|suppressed, skipping client." << it->first << dendl;
	continue;
      }
    }

    // notify clients about deleted inode, to make sure they release caps ASAP.
    if (in->inode.nlink == 0)
      wanted |= CEPH_CAP_LINK_SHARED;

    // are there caps that the client _wants_ and can have, but aren't pending?
    // or do we need to revoke?
    if (((wanted & allowed) & ~pending) || // missing wanted+allowed caps
	(pending & ~allowed)) { // need to revoke ~allowed caps.
      // issue
      nissued++;

      // include caps that clients generally like, while we're at it.
      int likes = in->get_caps_liked();
      int before = pending;
      long seq;
      if (pending & ~allowed)
	seq = cap->issue((wanted|likes) & allowed & pending); // if revoking, don't issue anything new.
      else
	seq = cap->issue((wanted|likes) & allowed);
      int after = cap->pending();

      if (cap->is_new()) {
	// haven't send caps to client yet
	if (before & ~after)
	  cap->confirm_receipt(seq, after);
      } else {
	dout(7) << " sending MClientCaps to client." << it->first
		<< " seq " << cap->get_last_seq()
		<< " new pending " << ccap_string(after) << " was " << ccap_string(before)
		<< dendl;

	// revoking: track the cap so stuck revocations can be detected/warned
	int op = (before & ~after) ? CEPH_CAP_OP_REVOKE : CEPH_CAP_OP_GRANT;
	if (op == CEPH_CAP_OP_REVOKE) {
		revoking_caps.push_back(&cap->item_revoking_caps);
		revoking_caps_by_client[cap->get_client()].push_back(&cap->item_client_revoking_caps);
		cap->set_last_revoke_stamp(ceph_clock_now());
		cap->reset_num_revoke_warnings();
	}

	MClientCaps *m = new MClientCaps(op, in->ino(),
					 in->find_snaprealm()->inode->ino(),
					 cap->get_cap_id(), cap->get_last_seq(),
					 after, wanted, 0,
					 cap->get_mseq(),
					 mds->get_osd_epoch_barrier());
	in->encode_cap_message(m, cap);

	mds->send_message_client_counted(m, it->first);
      }
    }

    if (only_cap)
      break;
  }

  return (nissued == 0); // true if no re-issued, no callbacks
}
2092
2093 void Locker::issue_truncate(CInode *in)
2094 {
2095 dout(7) << "issue_truncate on " << *in << dendl;
2096
2097 for (map<client_t, Capability*>::iterator it = in->client_caps.begin();
2098 it != in->client_caps.end();
2099 ++it) {
2100 Capability *cap = it->second;
2101 MClientCaps *m = new MClientCaps(CEPH_CAP_OP_TRUNC,
2102 in->ino(),
2103 in->find_snaprealm()->inode->ino(),
2104 cap->get_cap_id(), cap->get_last_seq(),
2105 cap->pending(), cap->wanted(), 0,
2106 cap->get_mseq(),
2107 mds->get_osd_epoch_barrier());
2108 in->encode_cap_message(m, cap);
2109 mds->send_message_client_counted(m, it->first);
2110 }
2111
2112 // should we increase max_size?
2113 if (in->is_auth() && in->is_file())
2114 check_inode_max_size(in);
2115 }
2116
2117
/**
 * Revoke all (non-PIN) issued caps on a single stale capability.
 *
 * If the inode is in the middle of exporting its caps we only mark it for
 * later re-evaluation: a successful export removes the cap anyway, and a
 * failed export re-runs this logic via STATE_EVALSTALECAPS.
 */
void Locker::revoke_stale_caps(Capability *cap)
{
  CInode *in = cap->get_inode();
  if (in->state_test(CInode::STATE_EXPORTINGCAPS)) {
    // if export succeeds, the cap will be removed. if export fails, we need to
    // revoke the cap if it's still stale.
    in->state_set(CInode::STATE_EVALSTALECAPS);
    return;
  }

  int issued = cap->issued();
  if (issued & ~CEPH_CAP_PIN) {
    dout(10) << " revoking " << ccap_string(issued) << " on " << *in << dendl;
    cap->revoke();

    // the client may have had a writeable range; data past the journaled
    // size may need recovery once the client is gone
    if (in->is_auth() &&
	in->inode.client_ranges.count(cap->get_client()))
      in->state_set(CInode::STATE_NEEDSRECOVER);

    // kick any lock transitions that were gathering on the revoked caps
    if (!in->filelock.is_stable()) eval_gather(&in->filelock);
    if (!in->linklock.is_stable()) eval_gather(&in->linklock);
    if (!in->authlock.is_stable()) eval_gather(&in->authlock);
    if (!in->xattrlock.is_stable()) eval_gather(&in->xattrlock);

    if (in->is_auth()) {
      try_eval(in, CEPH_CAP_LOCKS);
    } else {
      // replica: let the auth MDS know our wanted caps changed
      request_inode_file_caps(in);
    }
  }
}
2149
2150 void Locker::revoke_stale_caps(Session *session)
2151 {
2152 dout(10) << "revoke_stale_caps for " << session->info.inst.name << dendl;
2153
2154 for (xlist<Capability*>::iterator p = session->caps.begin(); !p.end(); ++p) {
2155 Capability *cap = *p;
2156 cap->mark_stale();
2157 revoke_stale_caps(cap);
2158 }
2159 }
2160
/**
 * A stale session has come back: clear the stale flag on each of its caps
 * and re-issue caps where possible.  Inodes mid-cap-export are only marked
 * for later evaluation (STATE_EVALSTALECAPS), mirroring revoke_stale_caps().
 */
void Locker::resume_stale_caps(Session *session)
{
  dout(10) << "resume_stale_caps for " << session->info.inst.name << dendl;

  for (xlist<Capability*>::iterator p = session->caps.begin(); !p.end(); ++p) {
    Capability *cap = *p;
    CInode *in = cap->get_inode();
    assert(in->is_head());
    if (cap->is_stale()) {
      dout(10) << " clearing stale flag on " << *in << dendl;
      cap->clear_stale();

      if (in->state_test(CInode::STATE_EXPORTINGCAPS)) {
	// if export succeeds, the cap will be removed. if export fails,
	// we need to re-issue the cap if it's not stale.
	in->state_set(CInode::STATE_EVALSTALECAPS);
	continue;
      }

      // issue directly unless eval() already re-issued for us on the auth
      if (!in->is_auth() || !eval(in, CEPH_CAP_LOCKS))
	issue_caps(in, cap);
    }
  }
}
2185
2186 void Locker::remove_stale_leases(Session *session)
2187 {
2188 dout(10) << "remove_stale_leases for " << session->info.inst.name << dendl;
2189 xlist<ClientLease*>::iterator p = session->leases.begin();
2190 while (!p.end()) {
2191 ClientLease *l = *p;
2192 ++p;
2193 CDentry *parent = static_cast<CDentry*>(l->parent);
2194 dout(15) << " removing lease on " << *parent << dendl;
2195 parent->remove_client_lease(l, this);
2196 }
2197 }
2198
2199
// Deferred retry of request_inode_file_caps(): used when the inode has
// ambiguous authority or the auth MDS is still rejoining.  Pins the inode
// for the lifetime of the waiter so the pointer stays valid.
class C_MDL_RequestInodeFileCaps : public LockerContext {
  CInode *in;
public:
  C_MDL_RequestInodeFileCaps(Locker *l, CInode *i) : LockerContext(l), in(i) {
    in->get(CInode::PIN_PTRWAITER);
  }
  void finish(int r) override {
    // only retry if we are still a replica; auth inodes don't request caps
    if (!in->is_auth())
      locker->request_inode_file_caps(in);
    in->put(CInode::PIN_PTRWAITER);
  }
};
2212
/**
 * [replica] Tell the auth MDS what file caps our local clients want on @in,
 * if that differs from what we last reported (replica_caps_wanted).
 * Defers via a waiter when authority is ambiguous or the auth is rejoining.
 */
void Locker::request_inode_file_caps(CInode *in)
{
  assert(!in->is_auth());

  int wanted = in->get_caps_wanted() & ~CEPH_CAP_PIN;
  if (wanted != in->replica_caps_wanted) {
    // wait for single auth
    if (in->is_ambiguous_auth()) {
      in->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH,
                     new C_MDL_RequestInodeFileCaps(this, in));
      return;
    }

    mds_rank_t auth = in->authority().first;
    // auth still rejoining: retry once it is active
    if (mds->is_cluster_degraded() &&
	mds->mdsmap->get_state(auth) == MDSMap::STATE_REJOIN) {
      mds->wait_for_active_peer(auth, new C_MDL_RequestInodeFileCaps(this, in));
      return;
    }

    dout(7) << "request_inode_file_caps " << ccap_string(wanted)
            << " was " << ccap_string(in->replica_caps_wanted)
            << " on " << *in << " to mds." << auth << dendl;

    // record what we are about to report, even if the send below is skipped
    in->replica_caps_wanted = wanted;

    if (!mds->is_cluster_degraded() ||
	mds->mdsmap->is_clientreplay_or_active_or_stopping(auth))
      mds->send_message_mds(new MInodeFileCaps(in->ino(), in->replica_caps_wanted),
			    auth);
  }
}
2245
2246 /* This function DOES put the passed message before returning */
/**
 * [auth] Handle a replica MDS reporting which file caps its clients want on
 * an inode (the counterpart of request_inode_file_caps()).
 * This function DOES put the passed message before returning.
 */
void Locker::handle_inode_file_caps(MInodeFileCaps *m)
{
  // nobody should be talking to us during recovery.
  assert(mds->is_clientreplay() || mds->is_active() || mds->is_stopping());

  // ok
  CInode *in = mdcache->get_inode(m->get_ino());
  mds_rank_t from = mds_rank_t(m->get_source().num());

  assert(in);
  assert(in->is_auth());

  dout(7) << "handle_inode_file_caps replica mds." << from << " wants caps " << ccap_string(m->get_caps()) << " on " << *in << dendl;

  // zero caps means the replica no longer wants anything; drop its entry
  if (m->get_caps())
    in->mds_caps_wanted[from] = m->get_caps();
  else
    in->mds_caps_wanted.erase(from);

  // wanted caps changed; see if lock states should move
  try_eval(in, CEPH_CAP_LOCKS);
  m->put();
}
2269
2270
// Deferred retry of check_inode_max_size(): queued when the inode is frozen
// or the filelock cannot be wrlocked yet.  Captures the requested sizes and
// mtime, and pins the inode so the pointer stays valid until the retry runs.
class C_MDL_CheckMaxSize : public LockerContext {
  CInode *in;
  uint64_t new_max_size;   // requested lower bound for max_size (0 = none)
  uint64_t newsize;        // requested new file size (0 = none)
  utime_t mtime;           // mtime to apply along with the size update

public:
  C_MDL_CheckMaxSize(Locker *l, CInode *i, uint64_t _new_max_size,
                     uint64_t _newsize, utime_t _mtime) :
    LockerContext(l), in(i),
    new_max_size(_new_max_size), newsize(_newsize), mtime(_mtime)
  {
    in->get(CInode::PIN_PTRWAITER);
  }
  void finish(int r) override {
    // only meaningful on the auth MDS; authority may have moved meanwhile
    if (in->is_auth())
      locker->check_inode_max_size(in, false, new_max_size, newsize, mtime);
    in->put(CInode::PIN_PTRWAITER);
  }
};
2291
2292 uint64_t Locker::calc_new_max_size(CInode::mempool_inode *pi, uint64_t size)
2293 {
2294 uint64_t new_max = (size + 1) << 1;
2295 uint64_t max_inc = g_conf->mds_client_writeable_range_max_inc_objs;
2296 if (max_inc > 0) {
2297 max_inc *= pi->layout.object_size;
2298 new_max = MIN(new_max, size + max_inc);
2299 }
2300 return ROUND_UP_TO(new_max, pi->get_layout_size_increment());
2301 }
2302
2303 void Locker::calc_new_client_ranges(CInode *in, uint64_t size,
2304 CInode::mempool_inode::client_range_map *new_ranges,
2305 bool *max_increased)
2306 {
2307 auto latest = in->get_projected_inode();
2308 uint64_t ms;
2309 if(latest->has_layout()) {
2310 ms = calc_new_max_size(latest, size);
2311 } else {
2312 // Layout-less directories like ~mds0/, have zero size
2313 ms = 0;
2314 }
2315
2316 // increase ranges as appropriate.
2317 // shrink to 0 if no WR|BUFFER caps issued.
2318 for (map<client_t,Capability*>::iterator p = in->client_caps.begin();
2319 p != in->client_caps.end();
2320 ++p) {
2321 if ((p->second->issued() | p->second->wanted()) & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER)) {
2322 client_writeable_range_t& nr = (*new_ranges)[p->first];
2323 nr.range.first = 0;
2324 if (latest->client_ranges.count(p->first)) {
2325 client_writeable_range_t& oldr = latest->client_ranges[p->first];
2326 if (ms > oldr.range.last)
2327 *max_increased = true;
2328 nr.range.last = MAX(ms, oldr.range.last);
2329 nr.follows = oldr.follows;
2330 } else {
2331 *max_increased = true;
2332 nr.range.last = ms;
2333 nr.follows = in->first - 1;
2334 }
2335 }
2336 }
2337 }
2338
/**
 * [auth] Check whether @in's size, mtime, or per-client writeable ranges
 * (max_size) need updating, and if so journal the change.
 *
 * @param force_wrlock  journal even if the filelock can't be wrlocked normally
 * @param new_max_size  requested minimum max_size (0 = just recompute)
 * @param new_size      new file size to record (0 = no size update)
 * @param new_mtime     mtime to record along with a size update
 * @return true if an update was journaled, false if it was a no-op or was
 *         deferred (frozen inode / unwritable filelock) via a waiter.
 */
bool Locker::check_inode_max_size(CInode *in, bool force_wrlock,
				  uint64_t new_max_size, uint64_t new_size,
				  utime_t new_mtime)
{
  assert(in->is_auth());
  assert(in->is_file());

  CInode::mempool_inode *latest = in->get_projected_inode();
  CInode::mempool_inode::client_range_map new_ranges;
  uint64_t size = latest->size;
  bool update_size = new_size > 0;
  bool update_max = false;
  bool max_increased = false;

  if (update_size) {
    // sizes and mtimes only ever move forward
    new_size = size = MAX(size, new_size);
    new_mtime = MAX(new_mtime, latest->mtime);
    if (latest->size == new_size && latest->mtime == new_mtime)
      update_size = false;
  }

  calc_new_client_ranges(in, max(new_max_size, size), &new_ranges, &max_increased);

  if (max_increased || latest->client_ranges != new_ranges)
    update_max = true;

  if (!update_size && !update_max) {
    dout(20) << "check_inode_max_size no-op on " << *in << dendl;
    return false;
  }

  dout(10) << "check_inode_max_size new_ranges " << new_ranges
	   << " update_size " << update_size
	   << " on " << *in << dendl;

  // frozen: retry after unfreeze
  if (in->is_frozen()) {
    dout(10) << "check_inode_max_size frozen, waiting on " << *in << dendl;
    C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in,
                                                     new_max_size,
                                                     new_size,
                                                     new_mtime);
    in->add_waiter(CInode::WAIT_UNFREEZE, cms);
    return false;
  }
  if (!force_wrlock && !in->filelock.can_wrlock(in->get_loner())) {
    // try to move the filelock into a wrlockable state
    if (in->filelock.is_stable()) {
      if (in->get_target_loner() >= 0)
	file_excl(&in->filelock);
      else
	simple_lock(&in->filelock);
    }
    if (!in->filelock.can_wrlock(in->get_loner())) {
      // try again later
      C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in,
                                                       new_max_size,
                                                       new_size,
                                                       new_mtime);

      in->filelock.add_waiter(SimpleLock::WAIT_STABLE, cms);
      dout(10) << "check_inode_max_size can't wrlock, waiting on " << *in << dendl;
      return false;
    }
  }

  MutationRef mut(new MutationImpl());
  mut->ls = mds->mdlog->get_current_segment();

  auto &pi = in->project_inode();
  pi.inode.version = in->pre_dirty();

  if (update_max) {
    dout(10) << "check_inode_max_size client_ranges " << pi.inode.client_ranges << " -> " << new_ranges << dendl;
    pi.inode.client_ranges = new_ranges;
  }

  if (update_size) {
    dout(10) << "check_inode_max_size size " << pi.inode.size << " -> " << new_size << dendl;
    pi.inode.size = new_size;
    pi.inode.rstat.rbytes = new_size;
    dout(10) << "check_inode_max_size mtime " << pi.inode.mtime << " -> " << new_mtime << dendl;
    pi.inode.mtime = new_mtime;
    // ctime/rctime follow mtime forward only
    if (new_mtime > pi.inode.ctime)
      pi.inode.ctime = pi.inode.rstat.rctime = new_mtime;
  }

  // use EOpen if the file is still open; otherwise, use EUpdate.
  // this is just an optimization to push open files forward into
  // newer log segments.
  LogEvent *le;
  EMetaBlob *metablob;
  if (in->is_any_caps_wanted() && in->last == CEPH_NOSNAP) {
    EOpen *eo = new EOpen(mds->mdlog);
    eo->add_ino(in->ino());
    metablob = &eo->metablob;
    le = eo;
    mut->ls->open_files.push_back(&in->item_open_file);
  } else {
    EUpdate *eu = new EUpdate(mds->mdlog, "check_inode_max_size");
    metablob = &eu->metablob;
    le = eu;
  }
  mds->mdlog->start_entry(le);
  if (update_size) { // FIXME if/when we do max_size nested accounting
    mdcache->predirty_journal_parents(mut, metablob, in, 0, PREDIRTY_PRIMARY);
    // no cow, here!
    CDentry *parent = in->get_projected_parent_dn();
    metablob->add_primary_dentry(parent, in, true);
  } else {
    metablob->add_dir_context(in->get_projected_parent_dn()->get_dir());
    mdcache->journal_dirty_inode(mut.get(), metablob, in);
  }
  mds->mdlog->submit_entry(le,
      new C_Locker_FileUpdate_finish(this, in, mut, true));
  wrlock_force(&in->filelock, mut); // wrlock for duration of journal
  mut->auth_pin(in);

  // make max_size _increase_ timely
  if (max_increased)
    mds->mdlog->flush();

  return true;
}
2462
2463
/**
 * Send an updated max_size to clients currently writing to @in
 * (or to just one client if @only_cap is given).
 */
void Locker::share_inode_max_size(CInode *in, Capability *only_cap)
{
  /*
   * only share if currently issued a WR cap. if client doesn't have it,
   * file_max doesn't matter, and the client will get it if/when they get
   * the cap later.
   */
  dout(10) << "share_inode_max_size on " << *in << dendl;
  // iterate either the single requested cap or all client caps
  map<client_t, Capability*>::iterator it;
  if (only_cap)
    it = in->client_caps.find(only_cap->get_client());
  else
    it = in->client_caps.begin();
  for (; it != in->client_caps.end(); ++it) {
    const client_t client = it->first;
    Capability *cap = it->second;
    if (cap->is_suppress())
      continue;
    if (cap->pending() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER)) {
      dout(10) << "share_inode_max_size with client." << client << dendl;
      // bump the seq so the client treats this GRANT as new information
      cap->inc_last_seq();
      MClientCaps *m = new MClientCaps(CEPH_CAP_OP_GRANT,
				       in->ino(),
				       in->find_snaprealm()->inode->ino(),
				       cap->get_cap_id(), cap->get_last_seq(),
				       cap->pending(), cap->wanted(), 0,
				       cap->get_mseq(),
				       mds->get_osd_epoch_barrier());
      in->encode_cap_message(m, cap);
      mds->send_message_client_counted(m, client);
    }
    if (only_cap)
      break;
  }
}
2499
2500 bool Locker::_need_flush_mdlog(CInode *in, int wanted)
2501 {
2502 /* flush log if caps are wanted by client but corresponding lock is unstable and locked by
2503 * pending mutations. */
2504 if (((wanted & (CEPH_CAP_FILE_RD|CEPH_CAP_FILE_WR|CEPH_CAP_FILE_SHARED|CEPH_CAP_FILE_EXCL)) &&
2505 in->filelock.is_unstable_and_locked()) ||
2506 ((wanted & (CEPH_CAP_AUTH_SHARED|CEPH_CAP_AUTH_EXCL)) &&
2507 in->authlock.is_unstable_and_locked()) ||
2508 ((wanted & (CEPH_CAP_LINK_SHARED|CEPH_CAP_LINK_EXCL)) &&
2509 in->linklock.is_unstable_and_locked()) ||
2510 ((wanted & (CEPH_CAP_XATTR_SHARED|CEPH_CAP_XATTR_EXCL)) &&
2511 in->xattrlock.is_unstable_and_locked()))
2512 return true;
2513 return false;
2514 }
2515
/**
 * Update the wanted-caps mask on @cap based on a client request, then keep
 * the open-file bookkeeping (journal EOpen records, recovery priority,
 * replica wanted-caps reporting) in sync with the new wanted set.
 *
 * @param issue_seq  client's view of the last issue; on mismatch we only
 *                   ever add wanted bits, never drop them.
 */
void Locker::adjust_cap_wanted(Capability *cap, int wanted, int issue_seq)
{
  if (ceph_seq_cmp(issue_seq, cap->get_last_issue()) == 0) {
    // seqs agree: take the client's wanted mask verbatim
    dout(10) << " wanted " << ccap_string(cap->wanted())
	     << " -> " << ccap_string(wanted) << dendl;
    cap->set_wanted(wanted);
  } else if (wanted & ~cap->wanted()) {
    // seq mismatch, but adding caps is always safe
    dout(10) << " wanted " << ccap_string(cap->wanted())
	     << " -> " << ccap_string(wanted)
	     << " (added caps even though we had seq mismatch!)" << dendl;
    cap->set_wanted(wanted | cap->wanted());
  } else {
    // seq mismatch and the request would only drop caps: ignore it
    dout(10) << " NOT changing wanted " << ccap_string(cap->wanted())
	     << " -> " << ccap_string(wanted)
	     << " (issue_seq " << issue_seq << " != last_issue "
	     << cap->get_last_issue() << ")" << dendl;
    return;
  }

  CInode *cur = cap->get_inode();
  if (!cur->is_auth()) {
    // replica: forward the new wanted set to the auth MDS
    request_inode_file_caps(cur);
    return;
  }

  if (cap->wanted() == 0) {
    if (cur->item_open_file.is_on_list() &&
	!cur->is_any_caps_wanted()) {
      dout(10) << " removing unwanted file from open file list " << *cur << dendl;
      cur->item_open_file.remove_myself();
    }
  } else {
    // file is actively wanted for I/O: recover it sooner if needed
    if (cur->state_test(CInode::STATE_RECOVERING) &&
	(cap->wanted() & (CEPH_CAP_FILE_RD |
			  CEPH_CAP_FILE_WR))) {
      mds->mdcache->recovery_queue.prioritize(cur);
    }

    if (!cur->item_open_file.is_on_list()) {
      // journal an EOpen so replay knows this file was open
      dout(10) << " adding to open file list " << *cur << dendl;
      assert(cur->last == CEPH_NOSNAP);
      LogSegment *ls = mds->mdlog->get_current_segment();
      EOpen *le = new EOpen(mds->mdlog);
      mds->mdlog->start_entry(le);
      le->add_clean_inode(cur);
      ls->open_files.push_back(&cur->item_open_file);
      mds->mdlog->submit_entry(le);
    }
  }
}
2566
2567
2568
/**
 * Perform empty (NULL) snapflushes on behalf of @client for every snap
 * earlier than @last that is still waiting on a flush from that client.
 * Used when we can infer the client will never send the real FLUSHSNAP
 * (e.g. it no longer holds any WR/EXCL caps).
 */
void Locker::_do_null_snapflush(CInode *head_in, client_t client, snapid_t last)
{
  dout(10) << "_do_null_snapflush client." << client << " on " << *head_in << dendl;
  for (auto p = head_in->client_need_snapflush.begin();
       p != head_in->client_need_snapflush.end() && p->first < last; ) {
    snapid_t snapid = p->first;
    auto &clients = p->second;
    // advance now: remove_need_snapflush() below may erase the current entry
    ++p; // be careful, q loop below depends on this

    if (clients.count(client)) {
      dout(10) << " doing async NULL snapflush on " << snapid << " from client." << client << dendl;
      CInode *sin = mdcache->pick_inode_snap(head_in, snapid - 1);
      assert(sin);
      assert(sin->first <= snapid);
      // dirty=0: this is a no-op flush just to satisfy the bookkeeping
      _do_snap_update(sin, snapid, 0, sin->first - 1, client, NULL, NULL);
      head_in->remove_need_snapflush(sin, snapid, client);
    }
  }
}
2588
2589
2590 bool Locker::should_defer_client_cap_frozen(CInode *in)
2591 {
2592 /*
2593 * This policy needs to be AT LEAST as permissive as allowing a client request
2594 * to go forward, or else a client request can release something, the release
2595 * gets deferred, but the request gets processed and deadlocks because when the
2596 * caps can't get revoked.
2597 *
2598 * Currently, a request wait if anything locked is freezing (can't
2599 * auth_pin), which would avoid any deadlock with cap release. Thus @in
2600 * _MUST_ be in the lock/auth_pin set.
2601 *
2602 * auth_pins==0 implies no unstable lock and not auth pinnned by
2603 * client request, otherwise continue even it's freezing.
2604 */
2605 return (in->is_freezing() && in->get_num_auth_pins() == 0) || in->is_frozen();
2606 }
2607
2608 /*
2609 * This function DOES put the passed message before returning
2610 */
/**
 * Main entry point for cap messages from clients (UPDATE, FLUSH, FLUSHSNAP,
 * releases, ...).  Validates session/MDS state, de-duplicates already
 * completed flushes, then applies the cap update to the head inode (and any
 * intermediate snapped inodes) and acks the client.
 * This function DOES put the passed message before returning.
 */
void Locker::handle_client_caps(MClientCaps *m)
{
  client_t client = m->get_source().num();
  snapid_t follows = m->get_snap_follows();
  dout(7) << "handle_client_caps "
	  << ((m->flags & CLIENT_CAPS_SYNC) ? "sync" : "async")
	  << " on " << m->get_ino()
	  << " tid " << m->get_client_tid() << " follows " << follows
	  << " op " << ceph_cap_op_name(m->get_op()) << dendl;

  Session *session = mds->get_session(m);
  if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) {
    // not ready to process caps yet: drop or defer depending on session state
    if (!session) {
      dout(5) << " no session, dropping " << *m << dendl;
      m->put();
      return;
    }
    if (session->is_closed() ||
	session->is_closing() ||
	session->is_killing()) {
      dout(7) << " session closed|closing|killing, dropping " << *m << dendl;
      m->put();
      return;
    }
    // during reconnect, remember dirty caps so replay can account for them
    if (mds->is_reconnect() &&
	m->get_dirty() && m->get_client_tid() > 0 &&
	!session->have_completed_flush(m->get_client_tid())) {
      mdcache->set_reconnected_dirty_caps(client, m->get_ino(), m->get_dirty());
    }
    mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
    return;
  }

  // duplicate of an already-completed flush: just re-send the ack
  if (m->get_client_tid() > 0 && session &&
      session->have_completed_flush(m->get_client_tid())) {
    dout(7) << "handle_client_caps already flushed tid " << m->get_client_tid()
	    << " for client." << client << dendl;
    MClientCaps *ack;
    if (m->get_op() == CEPH_CAP_OP_FLUSHSNAP) {
      ack = new MClientCaps(CEPH_CAP_OP_FLUSHSNAP_ACK, m->get_ino(), 0, 0, 0, 0, 0,
			    m->get_dirty(), 0, mds->get_osd_epoch_barrier());
    } else {
      ack = new MClientCaps(CEPH_CAP_OP_FLUSH_ACK, m->get_ino(), 0, m->get_cap_id(),
			    m->get_seq(), m->get_caps(), 0, m->get_dirty(), 0,
			    mds->get_osd_epoch_barrier());
    }
    ack->set_snap_follows(follows);
    ack->set_client_tid(m->get_client_tid());
    mds->send_message_client_counted(ack, m->get_connection());
    if (m->get_op() == CEPH_CAP_OP_FLUSHSNAP) {
      m->put();
      return;
    } else {
      // fall-thru because the message may release some caps
      m->clear_dirty();
      m->set_op(CEPH_CAP_OP_UPDATE);
    }
  }

  // "oldest flush tid" > 0 means client uses unique TID for each flush
  if (m->get_oldest_flush_tid() > 0 && session) {
    if (session->trim_completed_flushes(m->get_oldest_flush_tid())) {
      mds->mdlog->get_current_segment()->touched_sessions.insert(session->info.inst.name);

      if (session->get_num_trim_flushes_warnings() > 0 &&
	  session->get_num_completed_flushes() * 2 < g_conf->mds_max_completed_flushes)
	session->reset_num_trim_flushes_warnings();
    } else {
      // client isn't advancing oldest_flush_tid; warn with backoff
      if (session->get_num_completed_flushes() >=
	  (g_conf->mds_max_completed_flushes << session->get_num_trim_flushes_warnings())) {
	session->inc_num_trim_flushes_warnings();
	stringstream ss;
	ss << "client." << session->get_client() << " does not advance its oldest_flush_tid ("
	   << m->get_oldest_flush_tid() << "), "
	   << session->get_num_completed_flushes()
	   << " completed flushes recorded in session";
	mds->clog->warn() << ss.str();
	dout(20) << __func__ << " " << ss.str() << dendl;
      }
    }
  }

  CInode *head_in = mdcache->get_inode(m->get_ino());
  if (!head_in) {
    if (mds->is_clientreplay()) {
      dout(7) << "handle_client_caps on unknown ino " << m->get_ino()
	<< ", will try again after replayed client requests" << dendl;
      mdcache->wait_replay_cap_reconnect(m->get_ino(), new C_MDS_RetryMessage(mds, m));
      return;
    }

    /*
     * "handle_client_caps on unknown ino xxx” is normal after migrating a subtree
     * Sequence of events that cause this are:
     *   - client sends caps message to mds.a
     *   - mds finishes subtree migration, send cap export to client
     *   - mds trim its cache
     *   - mds receives cap messages from client
     */
    dout(7) << "handle_client_caps on unknown ino " << m->get_ino() << ", dropping" << dendl;
    m->put();
    return;
  }

  if (m->osd_epoch_barrier && !mds->objecter->have_map(m->osd_epoch_barrier)) {
    // Pause RADOS operations until we see the required epoch
    mds->objecter->set_epoch_barrier(m->osd_epoch_barrier);
  }

  if (mds->get_osd_epoch_barrier() < m->osd_epoch_barrier) {
    // Record the barrier so that we will retransmit it to clients
    mds->set_osd_epoch_barrier(m->osd_epoch_barrier);
  }

  dout(10) << " head inode " << *head_in << dendl;

  Capability *cap = 0;
  cap = head_in->get_client_cap(client);
  if (!cap) {
    dout(7) << "handle_client_caps no cap for client." << client << " on " << *head_in << dendl;
    m->put();
    return;
  }
  assert(cap);

  // freezing|frozen?
  if (should_defer_client_cap_frozen(head_in)) {
    dout(7) << "handle_client_caps freezing|frozen on " << *head_in << dendl;
    head_in->add_waiter(CInode::WAIT_UNFREEZE, new C_MDS_RetryMessage(mds, m));
    return;
  }
  // stale migration seq: the client's cap info predates a cap export/import
  if (ceph_seq_cmp(m->get_mseq(), cap->get_mseq()) < 0) {
    dout(7) << "handle_client_caps mseq " << m->get_mseq() << " < " << cap->get_mseq()
	    << ", dropping" << dendl;
    m->put();
    return;
  }

  int op = m->get_op();

  // flushsnap?
  if (op == CEPH_CAP_OP_FLUSHSNAP) {
    if (!head_in->is_auth()) {
      dout(7) << " not auth, ignoring flushsnap on " << *head_in << dendl;
      goto out;
    }

    SnapRealm *realm = head_in->find_snaprealm();
    snapid_t snap = realm->get_snap_following(follows);
    dout(10) << " flushsnap follows " << follows << " -> snap " << snap << dendl;

    CInode *in = head_in;
    if (snap != CEPH_NOSNAP) {
      in = mdcache->pick_inode_snap(head_in, snap - 1);
      if (in != head_in)
	dout(10) << " snapped inode " << *in << dendl;
    }

    // we can prepare the ack now, since this FLUSHEDSNAP is independent of any
    // other cap ops. (except possibly duplicate FLUSHSNAP requests, but worst
    // case we get a dup response, so whatever.)
    MClientCaps *ack = 0;
    if (m->get_dirty()) {
      ack = new MClientCaps(CEPH_CAP_OP_FLUSHSNAP_ACK, in->ino(), 0, 0, 0, 0, 0, m->get_dirty(), 0, mds->get_osd_epoch_barrier());
      ack->set_snap_follows(follows);
      ack->set_client_tid(m->get_client_tid());
      ack->set_oldest_flush_tid(m->get_oldest_flush_tid());
    }

    if (in == head_in ||
	(head_in->client_need_snapflush.count(snap) &&
	 head_in->client_need_snapflush[snap].count(client))) {
      dout(7) << " flushsnap snap " << snap
	      << " client." << client << " on " << *in << dendl;

      // this cap now follows a later snap (i.e. the one initiating this flush, or later)
      if (in == head_in)
	cap->client_follows = snap < CEPH_NOSNAP ? snap : realm->get_newest_seq();
      else if (head_in->client_need_snapflush.begin()->first < snap)
	_do_null_snapflush(head_in, client, snap);

      _do_snap_update(in, snap, m->get_dirty(), follows, client, m, ack);

      if (in != head_in)
	head_in->remove_need_snapflush(in, snap, client);
    } else {
      dout(7) << " not expecting flushsnap " << snap << " from client." << client << " on " << *in << dendl;
      if (ack)
	mds->send_message_client_counted(ack, m->get_connection());
    }
    goto out;
  }

  if (cap->get_cap_id() != m->get_cap_id()) {
    dout(7) << " ignoring client capid " << m->get_cap_id() << " != my " << cap->get_cap_id() << dendl;
  } else {
    CInode *in = head_in;
    if (follows > 0) {
      in = mdcache->pick_inode_snap(head_in, follows);
      // intermediate snap inodes: flush dirty metadata into each in turn
      while (in != head_in) {
	assert(in->last != CEPH_NOSNAP);
	if (in->is_auth() && m->get_dirty()) {
	  dout(10) << " updating intermediate snapped inode " << *in << dendl;
	  _do_cap_update(in, NULL, m->get_dirty(), follows, m);
	}
	in = mdcache->pick_inode_snap(head_in, in->last);
      }
    }

    // head inode, and cap
    MClientCaps *ack = 0;

    // the client can only retain caps we actually issued
    int caps = m->get_caps();
    if (caps & ~cap->issued()) {
      dout(10) << " confirming not issued caps " << ccap_string(caps & ~cap->issued()) << dendl;
      caps &= cap->issued();
    }

    cap->confirm_receipt(m->get_seq(), caps);
    dout(10) << " follows " << follows
	     << " retains " << ccap_string(m->get_caps())
	     << " dirty " << ccap_string(m->get_dirty())
	     << " on " << *in << dendl;


    // missing/skipped snapflush?
    //  The client MAY send a snapflush if it is issued WR/EXCL caps, but
    //  presently only does so when it has actual dirty metadata.  But, we
    //  set up the need_snapflush stuff based on the issued caps.
    //  We can infer that the client WONT send a FLUSHSNAP once they have
    //  released all WR/EXCL caps (the FLUSHSNAP always comes before the cap
    //  update/release).
    if (!head_in->client_need_snapflush.empty()) {
      if ((cap->issued() & CEPH_CAP_ANY_FILE_WR) == 0) {
	_do_null_snapflush(head_in, client);
      } else {
	dout(10) << " revocation in progress, not making any conclusions about null snapflushes" << dendl;
      }
    }

    if (m->get_dirty() && in->is_auth()) {
      dout(7) << " flush client." << client << " dirty " << ccap_string(m->get_dirty())
	      << " seq " << m->get_seq() << " on " << *in << dendl;
      ack = new MClientCaps(CEPH_CAP_OP_FLUSH_ACK, in->ino(), 0, cap->get_cap_id(), m->get_seq(),
			    m->get_caps(), 0, m->get_dirty(), 0, mds->get_osd_epoch_barrier());
      ack->set_client_tid(m->get_client_tid());
      ack->set_oldest_flush_tid(m->get_oldest_flush_tid());
    }

    // filter wanted based on what we could ever give out (given auth/replica status)
    bool need_flush = m->flags & CLIENT_CAPS_SYNC;
    int new_wanted = m->get_wanted() & head_in->get_caps_allowed_ever();
    if (new_wanted != cap->wanted()) {
      if (!need_flush && (new_wanted & ~cap->pending())) {
	// expanding caps.  make sure we aren't waiting for a log flush
	need_flush = _need_flush_mdlog(head_in, new_wanted & ~cap->pending());
      }

      adjust_cap_wanted(cap, new_wanted, m->get_issue_seq());
    }

    if (in->is_auth() &&
	_do_cap_update(in, cap, m->get_dirty(), follows, m, ack, &need_flush)) {
      // updated
      eval(in, CEPH_CAP_LOCKS);

      if (!need_flush && (cap->wanted() & ~cap->pending()))
	need_flush = _need_flush_mdlog(in, cap->wanted() & ~cap->pending());
    } else {
      // no update, ack now.
      if (ack)
	mds->send_message_client_counted(ack, m->get_connection());

      bool did_issue = eval(in, CEPH_CAP_LOCKS);
      if (!did_issue && (cap->wanted() & ~cap->pending()))
	issue_caps(in, cap);

      // first message on a new cap with WR|BUFFER pending: share max_size
      if (cap->get_last_seq() == 0 &&
	  (cap->pending() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER))) {
	cap->issue_norevoke(cap->issued());
	share_inode_max_size(in, cap);
      }
    }

    if (need_flush)
      mds->mdlog->flush();
  }

 out:
  m->put();
}
2903
2904
2905 class C_Locker_RetryRequestCapRelease : public LockerContext {
2906 client_t client;
2907 ceph_mds_request_release item;
2908 public:
2909 C_Locker_RetryRequestCapRelease(Locker *l, client_t c, const ceph_mds_request_release& it) :
2910 LockerContext(l), client(c), item(it) { }
2911 void finish(int r) override {
2912 string dname;
2913 MDRequestRef null_ref;
2914 locker->process_request_cap_release(null_ref, client, item, dname);
2915 }
2916 };
2917
/**
 * Apply a cap release carried inside a client request.
 *
 * If @p dname is non-empty, the matching dentry lease (if still held) is
 * dropped first.  The release is then validated against the live Capability:
 * a stale migration seq or mismatched cap_id drops it, and a frozen inode
 * defers it via C_Locker_RetryRequestCapRelease.  Otherwise the release is
 * confirmed, wanted caps adjusted, and cap-related locks re-evaluated.
 *
 * @param mdr    originating request; null when replayed after an unfreeze
 * @param client client performing the release
 * @param item   wire-format release record (ino, cap_id, caps, seqs)
 * @param dname  dentry name whose lease is also released; may be empty
 */
void Locker::process_request_cap_release(MDRequestRef& mdr, client_t client, const ceph_mds_request_release& item,
					 boost::string_view dname)
{
  // unpack the wire-format fields
  inodeno_t ino = (uint64_t)item.ino;
  uint64_t cap_id = item.cap_id;
  int caps = item.caps;
  int wanted = item.wanted;
  int seq = item.seq;
  int issue_seq = item.issue_seq;
  int mseq = item.mseq;

  CInode *in = mdcache->get_inode(ino);
  if (!in)
    return;

  // drop the dentry lease named by the release, if we still have it
  if (dname.length()) {
    frag_t fg = in->pick_dirfrag(dname);
    CDir *dir = in->get_dirfrag(fg);
    if (dir) {
      CDentry *dn = dir->lookup(dname);
      if (dn) {
	ClientLease *l = dn->get_client_lease(client);
	if (l) {
	  dout(10) << "process_cap_release removing lease on " << *dn << dendl;
	  dn->remove_client_lease(l, this);
	} else {
	  dout(7) << "process_cap_release client." << client
		  << " doesn't have lease on " << *dn << dendl;
	}
      } else {
	dout(7) << "process_cap_release client." << client << " released lease on dn "
		<< dir->dirfrag() << "/" << dname << " which dne" << dendl;
      }
    }
  }

  Capability *cap = in->get_client_cap(client);
  if (!cap)
    return;

  dout(10) << "process_cap_release client." << client << " " << ccap_string(caps) << " on " << *in
	   << (mdr ? "" : " (DEFERRED, no mdr)")
	   << dendl;

  // release predates a cap migration; it no longer applies
  if (ceph_seq_cmp(mseq, cap->get_mseq()) < 0) {
    dout(7) << " mseq " << mseq << " < " << cap->get_mseq() << ", dropping" << dendl;
    return;
  }

  // release is for a different incarnation of the cap
  if (cap->get_cap_id() != cap_id) {
    dout(7) << " cap_id " << cap_id << " != " << cap->get_cap_id() << ", dropping" << dendl;
    return;
  }

  // can't touch cap state while the inode is frozen; retry after unfreeze
  if (should_defer_client_cap_frozen(in)) {
    dout(7) << " frozen, deferring" << dendl;
    in->add_waiter(CInode::WAIT_UNFREEZE, new C_Locker_RetryRequestCapRelease(this, client, item));
    return;
  }

  // a client cannot release caps it was never issued; mask them out
  if (caps & ~cap->issued()) {
    dout(10) << " confirming not issued caps " << ccap_string(caps & ~cap->issued()) << dendl;
    caps &= cap->issued();
  }
  cap->confirm_receipt(seq, caps);

  // no write caps left: any pending snapflush from this client will never come
  if (!in->client_need_snapflush.empty() &&
      (cap->issued() & CEPH_CAP_ANY_FILE_WR) == 0) {
    _do_null_snapflush(in, client);
  }

  adjust_cap_wanted(cap, wanted, issue_seq);

  // suppress cap re-issue while evaluating on behalf of an active request
  if (mdr)
    cap->inc_suppress();
  eval(in, CEPH_CAP_LOCKS);
  if (mdr)
    cap->dec_suppress();

  // take note; we may need to reissue on this cap later
  if (mdr)
    mdr->cap_releases[in->vino()] = cap->get_last_seq();
}
3001
3002 class C_Locker_RetryKickIssueCaps : public LockerContext {
3003 CInode *in;
3004 client_t client;
3005 ceph_seq_t seq;
3006 public:
3007 C_Locker_RetryKickIssueCaps(Locker *l, CInode *i, client_t c, ceph_seq_t s) :
3008 LockerContext(l), in(i), client(c), seq(s) {
3009 in->get(CInode::PIN_PTRWAITER);
3010 }
3011 void finish(int r) override {
3012 locker->kick_issue_caps(in, client, seq);
3013 in->put(CInode::PIN_PTRWAITER);
3014 }
3015 };
3016
3017 void Locker::kick_issue_caps(CInode *in, client_t client, ceph_seq_t seq)
3018 {
3019 Capability *cap = in->get_client_cap(client);
3020 if (!cap || cap->get_last_sent() != seq)
3021 return;
3022 if (in->is_frozen()) {
3023 dout(10) << "kick_issue_caps waiting for unfreeze on " << *in << dendl;
3024 in->add_waiter(CInode::WAIT_UNFREEZE,
3025 new C_Locker_RetryKickIssueCaps(this, in, client, seq));
3026 return;
3027 }
3028 dout(10) << "kick_issue_caps released at current seq " << seq
3029 << ", reissuing" << dendl;
3030 issue_caps(in, cap);
3031 }
3032
3033 void Locker::kick_cap_releases(MDRequestRef& mdr)
3034 {
3035 client_t client = mdr->get_client();
3036 for (map<vinodeno_t,ceph_seq_t>::iterator p = mdr->cap_releases.begin();
3037 p != mdr->cap_releases.end();
3038 ++p) {
3039 CInode *in = mdcache->get_inode(p->first);
3040 if (!in)
3041 continue;
3042 kick_issue_caps(in, client, p->second);
3043 }
3044 }
3045
3046 /**
3047 * m and ack might be NULL, so don't dereference them unless dirty != 0
3048 */
/**
 * Journal a client snap flush: apply the dirty metadata carried in @p m to
 * the inode data covering @p snap (an old_inode for multiversion inodes,
 * otherwise a fresh projection) and submit an EUpdate.
 *
 * m and ack might be NULL, so don't dereference them unless dirty != 0.
 */
void Locker::_do_snap_update(CInode *in, snapid_t snap, int dirty, snapid_t follows, client_t client, MClientCaps *m, MClientCaps *ack)
{
  dout(10) << "_do_snap_update dirty " << ccap_string(dirty)
	   << " follows " << follows << " snap " << snap
	   << " on " << *in << dendl;

  if (snap == CEPH_NOSNAP) {
    // hmm, i guess snap was already deleted? just ack!
    dout(10) << " wow, the snap following " << follows
	     << " was already deleted. nothing to record, just ack." << dendl;
    if (ack)
      mds->send_message_client_counted(ack, m->get_connection());
    return;
  }

  EUpdate *le = new EUpdate(mds->mdlog, "snap flush");
  mds->mdlog->start_entry(le);
  MutationRef mut = new MutationImpl();
  mut->ls = mds->mdlog->get_current_segment();

  // normal metadata updates that we can apply to the head as well.

  // update xattrs?
  CInode::mempool_xattr_map *px = nullptr;
  bool xattrs = (dirty & CEPH_CAP_XATTR_EXCL) &&
		m->xattrbl.length() &&
		m->head.xattr_version > in->get_projected_inode()->xattr_version;

  // multiversion inodes keep per-snap copies; pick the one covering 'snap'
  CInode::mempool_old_inode *oi = 0;
  if (in->is_multiversion()) {
    oi = in->pick_old_inode(snap);
  }

  // 'i' points at the inode data the flush actually lands in
  CInode::mempool_inode *i;
  if (oi) {
    dout(10) << " writing into old inode" << dendl;
    auto &pi = in->project_inode();
    pi.inode.version = in->pre_dirty();
    if (snap > oi->first)
      in->split_old_inode(snap);
    i = &oi->inode;
    if (xattrs)
      px = &oi->xattrs;
  } else {
    auto &pi = in->project_inode(xattrs);
    pi.inode.version = in->pre_dirty();
    i = &pi.inode;
    if (xattrs)
      px = pi.xattrs.get();
  }

  _update_cap_fields(in, dirty, m, i);

  // xattr
  if (xattrs) {
    dout(7) << " xattrs v" << i->xattr_version << " -> " << m->head.xattr_version
	    << " len " << m->xattrbl.length() << dendl;
    i->xattr_version = m->head.xattr_version;
    bufferlist::iterator p = m->xattrbl.begin();
    ::decode(*px, p);
  }

  {
    // trim this client's byte range: the flushed snap no longer needs it
    auto it = i->client_ranges.find(client);
    if (it != i->client_ranges.end()) {
      if (in->last == snap) {
	dout(10) << " removing client_range entirely" << dendl;
	i->client_ranges.erase(it);
      } else {
	dout(10) << " client_range now follows " << snap << dendl;
	it->second.follows = snap;
      }
    }
  }

  mut->auth_pin(in);
  mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows);
  mdcache->journal_dirty_inode(mut.get(), &le->metablob, in, follows);

  // "oldest flush tid" > 0 means client uses unique TID for each flush
  if (ack && ack->get_oldest_flush_tid() > 0)
    le->metablob.add_client_flush(metareqid_t(m->get_source(), ack->get_client_tid()),
				  ack->get_oldest_flush_tid());

  mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut, false, false,
							      client, ack));
}
3136
/**
 * Copy dirty metadata fields from a client cap message into a (projected)
 * inode.  Only fields covered by the dirty cap bits are applied, and most
 * are guarded so values only move "forward" (newer ctime, larger size,
 * higher change_attr / time_warp_seq).
 *
 * @param in    inode being updated (used for logging only here)
 * @param dirty dirty cap bits reported by the client; no-op when 0
 * @param m     cap message supplying the new values; must be valid if dirty
 * @param pi    inode data to write into
 */
void Locker::_update_cap_fields(CInode *in, int dirty, MClientCaps *m, CInode::mempool_inode *pi)
{
  if (dirty == 0)
    return;

  /* m must be valid if there are dirty caps */
  assert(m);
  uint64_t features = m->get_connection()->get_features();

  // ctime only moves forward
  if (m->get_ctime() > pi->ctime) {
    dout(7) << " ctime " << pi->ctime << " -> " << m->get_ctime()
	    << " for " << *in << dendl;
    pi->ctime = pi->rstat.rctime = m->get_ctime();
  }

  if ((features & CEPH_FEATURE_FS_CHANGE_ATTR) &&
      m->get_change_attr() > pi->change_attr) {
    dout(7) << " change_attr " << pi->change_attr << " -> " << m->get_change_attr()
	    << " for " << *in << dendl;
    pi->change_attr = m->get_change_attr();
  }

  // file
  if (dirty & (CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)) {
    utime_t atime = m->get_atime();
    utime_t mtime = m->get_mtime();
    uint64_t size = m->get_size();
    version_t inline_version = m->inline_version;

    // WR may only advance mtime; EXCL may set it to any value
    if (((dirty & CEPH_CAP_FILE_WR) && mtime > pi->mtime) ||
	((dirty & CEPH_CAP_FILE_EXCL) && mtime != pi->mtime)) {
      dout(7) << " mtime " << pi->mtime << " -> " << mtime
	      << " for " << *in << dendl;
      pi->mtime = mtime;
    }
    if (in->inode.is_file() && // ONLY if regular file
	size > pi->size) {
      dout(7) << " size " << pi->size << " -> " << size
	      << " for " << *in << dendl;
      pi->size = size;
      pi->rstat.rbytes = size;
    }
    // inline data only replaced by a strictly newer version
    if (in->inode.is_file() &&
	(dirty & CEPH_CAP_FILE_WR) &&
	inline_version > pi->inline_data.version) {
      pi->inline_data.version = inline_version;
      if (inline_version != CEPH_INLINE_NONE && m->inline_data.length() > 0)
	pi->inline_data.get_data() = m->inline_data;
      else
	pi->inline_data.free_data();
    }
    if ((dirty & CEPH_CAP_FILE_EXCL) && atime != pi->atime) {
      dout(7) << " atime " << pi->atime << " -> " << atime
	      << " for " << *in << dendl;
      pi->atime = atime;
    }
    if ((dirty & CEPH_CAP_FILE_EXCL) &&
	ceph_seq_cmp(pi->time_warp_seq, m->get_time_warp_seq()) < 0) {
      dout(7) << " time_warp_seq " << pi->time_warp_seq << " -> " << m->get_time_warp_seq()
	      << " for " << *in << dendl;
      pi->time_warp_seq = m->get_time_warp_seq();
    }
  }
  // auth
  if (dirty & CEPH_CAP_AUTH_EXCL) {
    if (m->head.uid != pi->uid) {
      dout(7) << " uid " << pi->uid
	      << " -> " << m->head.uid
	      << " for " << *in << dendl;
      pi->uid = m->head.uid;
    }
    if (m->head.gid != pi->gid) {
      dout(7) << " gid " << pi->gid
	      << " -> " << m->head.gid
	      << " for " << *in << dendl;
      pi->gid = m->head.gid;
    }
    if (m->head.mode != pi->mode) {
      dout(7) << " mode " << oct << pi->mode
	      << " -> " << m->head.mode << dec
	      << " for " << *in << dendl;
      pi->mode = m->head.mode;
    }
    if ((features & CEPH_FEATURE_FS_BTIME) && m->get_btime() != pi->btime) {
      dout(7) << " btime " << oct << pi->btime
	      << " -> " << m->get_btime() << dec
	      << " for " << *in << dendl;
      pi->btime = m->get_btime();
    }
  }
}
3228
/*
 * update inode based on cap flush|flushsnap|wanted.
 * adjust max_size, if needed.
 * if we update, return true; otherwise, false (no update needed).
 */
/**
 * Apply a client cap flush to the head inode and/or adjust max_size.
 *
 * Decides whether the client's max_size should grow (client asked for more,
 * or wrote near the limit) or be zeroed (client no longer holds/wants write
 * caps).  A max_size change needs the filelock wrlockable; if it isn't, the
 * lock is nudged toward a wrlockable state and the change re-queued via
 * C_MDL_CheckMaxSize.  Dirty metadata and file-lock state from the message
 * are journaled in an EUpdate.
 *
 * @param need_flush if non-null, set to true when the caller should flush
 *                   the mdlog so the client sees the update promptly
 * @return true if an update was journaled, false if nothing to do (the
 *         caller then acks immediately)
 */
bool Locker::_do_cap_update(CInode *in, Capability *cap,
			    int dirty, snapid_t follows,
			    MClientCaps *m, MClientCaps *ack,
			    bool *need_flush)
{
  dout(10) << "_do_cap_update dirty " << ccap_string(dirty)
	   << " issued " << ccap_string(cap ? cap->issued() : 0)
	   << " wanted " << ccap_string(cap ? cap->wanted() : 0)
	   << " on " << *in << dendl;
  assert(in->is_auth());
  client_t client = m->get_source().num();
  CInode::mempool_inode *latest = in->get_projected_inode();

  // increase or zero max_size?
  uint64_t size = m->get_size();
  bool change_max = false;
  uint64_t old_max = latest->client_ranges.count(client) ? latest->client_ranges[client].range.last : 0;
  uint64_t new_max = old_max;

  if (in->is_file()) {
    bool forced_change_max = false;
    dout(20) << "inode is file" << dendl;
    if (cap && ((cap->issued() | cap->wanted()) & CEPH_CAP_ANY_FILE_WR)) {
      dout(20) << "client has write caps; m->get_max_size="
	       << m->get_max_size() << "; old_max=" << old_max << dendl;
      if (m->get_max_size() > new_max) {
	// explicit client request for a larger max_size
	dout(10) << "client requests file_max " << m->get_max_size()
		 << " > max " << old_max << dendl;
	change_max = true;
	forced_change_max = true;
	new_max = calc_new_max_size(latest, m->get_max_size());
      } else {
	// otherwise grow only if the recomputed window exceeds the old one
	new_max = calc_new_max_size(latest, size);

	if (new_max > old_max)
	  change_max = true;
	else
	  new_max = old_max;
      }
    } else {
      // no write caps issued or wanted: drop the client's byte range
      if (old_max) {
	change_max = true;
	new_max = 0;
      }
    }

    if (in->last == CEPH_NOSNAP &&
	change_max &&
	!in->filelock.can_wrlock(client) &&
	!in->filelock.can_force_wrlock(client)) {
      dout(10) << " i want to change file_max, but lock won't allow it (yet)" << dendl;
      if (in->filelock.is_stable()) {
	// try to move the filelock into a wrlockable state
	bool need_issue = false;
	if (cap)
	  cap->inc_suppress();
	if (in->mds_caps_wanted.empty() &&
	    (in->get_loner() >= 0 || (in->get_wanted_loner() >= 0 && in->try_set_loner()))) {
	  if (in->filelock.get_state() != LOCK_EXCL)
	    file_excl(&in->filelock, &need_issue);
	} else
	  simple_lock(&in->filelock, &need_issue);
	if (need_issue)
	  issue_caps(in);
	if (cap)
	  cap->dec_suppress();
      }
      if (!in->filelock.can_wrlock(client) &&
	  !in->filelock.can_force_wrlock(client)) {
	// still not wrlockable: retry the max_size change when stable
	C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in,
							 forced_change_max ? new_max : 0,
							 0, utime_t());

	in->filelock.add_waiter(SimpleLock::WAIT_STABLE, cms);
	change_max = false;
      }
    }
  }

  // import client-side fcntl/flock state carried in the message
  if (m->flockbl.length()) {
    int32_t num_locks;
    bufferlist::iterator bli = m->flockbl.begin();
    ::decode(num_locks, bli);
    for ( int i=0; i < num_locks; ++i) {
      ceph_filelock decoded_lock;
      ::decode(decoded_lock, bli);
      in->get_fcntl_lock_state()->held_locks.
	insert(pair<uint64_t, ceph_filelock>(decoded_lock.start, decoded_lock));
      ++in->get_fcntl_lock_state()->client_held_lock_counts[(client_t)(decoded_lock.client)];
    }
    ::decode(num_locks, bli);
    for ( int i=0; i < num_locks; ++i) {
      ceph_filelock decoded_lock;
      ::decode(decoded_lock, bli);
      in->get_flock_lock_state()->held_locks.
	insert(pair<uint64_t, ceph_filelock>(decoded_lock.start, decoded_lock));
      ++in->get_flock_lock_state()->client_held_lock_counts[(client_t)(decoded_lock.client)];
    }
  }

  if (!dirty && !change_max)
    return false;

  Session *session = mds->get_session(m);
  if (session->check_access(in, MAY_WRITE,
			    m->caller_uid, m->caller_gid, NULL, 0, 0) < 0) {
    dout(10) << "check_access failed, dropping cap update on " << *in << dendl;
    return false;
  }

  // do the update.
  EUpdate *le = new EUpdate(mds->mdlog, "cap update");
  mds->mdlog->start_entry(le);

  bool xattr = (dirty & CEPH_CAP_XATTR_EXCL) &&
	       m->xattrbl.length() &&
	       m->head.xattr_version > in->get_projected_inode()->xattr_version;

  auto &pi = in->project_inode(xattr);
  pi.inode.version = in->pre_dirty();

  MutationRef mut(new MutationImpl());
  mut->ls = mds->mdlog->get_current_segment();

  _update_cap_fields(in, dirty, m, &pi.inode);

  if (change_max) {
    dout(7) << " max_size " << old_max << " -> " << new_max
	    << " for " << *in << dendl;
    if (new_max) {
      auto &cr = pi.inode.client_ranges[client];
      cr.range.first = 0;
      cr.range.last = new_max;
      cr.follows = in->first - 1;
    } else
      pi.inode.client_ranges.erase(client);
  }

  if (change_max || (dirty & (CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)))
    wrlock_force(&in->filelock, mut); // wrlock for duration of journal

  // auth
  if (dirty & CEPH_CAP_AUTH_EXCL)
    wrlock_force(&in->authlock, mut);

  // xattrs update?
  if (xattr) {
    dout(7) << " xattrs v" << pi.inode.xattr_version << " -> " << m->head.xattr_version << dendl;
    pi.inode.xattr_version = m->head.xattr_version;
    bufferlist::iterator p = m->xattrbl.begin();
    ::decode(*pi.xattrs, p);
    wrlock_force(&in->xattrlock, mut);
  }

  mut->auth_pin(in);
  mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows);
  mdcache->journal_dirty_inode(mut.get(), &le->metablob, in, follows);

  // "oldest flush tid" > 0 means client uses unique TID for each flush
  if (ack && ack->get_oldest_flush_tid() > 0)
    le->metablob.add_client_flush(metareqid_t(m->get_source(), ack->get_client_tid()),
				  ack->get_oldest_flush_tid());

  mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut,
							      change_max, !!cap,
							      client, ack));
  if (need_flush && !*need_flush &&
      ((change_max && new_max) || // max INCREASE
       _need_flush_mdlog(in, dirty)))
    *need_flush = true;

  return true;
}
3406
3407 /* This function DOES put the passed message before returning */
3408 void Locker::handle_client_cap_release(MClientCapRelease *m)
3409 {
3410 client_t client = m->get_source().num();
3411 dout(10) << "handle_client_cap_release " << *m << dendl;
3412
3413 if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) {
3414 mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
3415 return;
3416 }
3417
3418 if (m->osd_epoch_barrier && !mds->objecter->have_map(m->osd_epoch_barrier)) {
3419 // Pause RADOS operations until we see the required epoch
3420 mds->objecter->set_epoch_barrier(m->osd_epoch_barrier);
3421 }
3422
3423 if (mds->get_osd_epoch_barrier() < m->osd_epoch_barrier) {
3424 // Record the barrier so that we will retransmit it to clients
3425 mds->set_osd_epoch_barrier(m->osd_epoch_barrier);
3426 }
3427
3428 Session *session = mds->get_session(m);
3429
3430 for (vector<ceph_mds_cap_item>::iterator p = m->caps.begin(); p != m->caps.end(); ++p) {
3431 _do_cap_release(client, inodeno_t((uint64_t)p->ino) , p->cap_id, p->migrate_seq, p->seq);
3432 }
3433
3434 if (session) {
3435 session->notify_cap_release(m->caps.size());
3436 }
3437
3438 m->put();
3439 }
3440
3441 class C_Locker_RetryCapRelease : public LockerContext {
3442 client_t client;
3443 inodeno_t ino;
3444 uint64_t cap_id;
3445 ceph_seq_t migrate_seq;
3446 ceph_seq_t issue_seq;
3447 public:
3448 C_Locker_RetryCapRelease(Locker *l, client_t c, inodeno_t i, uint64_t id,
3449 ceph_seq_t mseq, ceph_seq_t seq) :
3450 LockerContext(l), client(c), ino(i), cap_id(id), migrate_seq(mseq), issue_seq(seq) {}
3451 void finish(int r) override {
3452 locker->_do_cap_release(client, ino, cap_id, migrate_seq, issue_seq);
3453 }
3454 };
3455
/**
 * Apply one cap-release record from an MClientCapRelease batch.
 *
 * Validates the record against the live Capability (cap_id, mseq, issue
 * seq); mismatches are logged and ignored.  A frozen inode defers the
 * release via C_Locker_RetryCapRelease.  A fully matching release removes
 * the client's cap from the inode.
 */
void Locker::_do_cap_release(client_t client, inodeno_t ino, uint64_t cap_id,
			     ceph_seq_t mseq, ceph_seq_t seq)
{
  CInode *in = mdcache->get_inode(ino);
  if (!in) {
    dout(7) << "_do_cap_release missing ino " << ino << dendl;
    return;
  }
  Capability *cap = in->get_client_cap(client);
  if (!cap) {
    dout(7) << "_do_cap_release no cap for client" << client << " on "<< *in << dendl;
    return;
  }

  dout(7) << "_do_cap_release for client." << client << " on "<< *in << dendl;
  // release refers to a different incarnation of the cap
  if (cap->get_cap_id() != cap_id) {
    dout(7) << " capid " << cap_id << " != " << cap->get_cap_id() << ", ignore" << dendl;
    return;
  }
  // release predates a cap migration
  if (ceph_seq_cmp(mseq, cap->get_mseq()) < 0) {
    dout(7) << " mseq " << mseq << " < " << cap->get_mseq() << ", ignore" << dendl;
    return;
  }
  if (should_defer_client_cap_frozen(in)) {
    dout(7) << " freezing|frozen, deferring" << dendl;
    in->add_waiter(CInode::WAIT_UNFREEZE,
		   new C_Locker_RetryCapRelease(this, client, ino, cap_id, mseq, seq));
    return;
  }
  // stale issue seq: caps were re-issued after the client sent this release,
  // so don't drop the cap; just trim stale revoke history and re-evaluate
  if (seq != cap->get_last_issue()) {
    dout(7) << " issue_seq " << seq << " != " << cap->get_last_issue() << dendl;
    // clean out any old revoke history
    cap->clean_revoke_from(seq);
    eval_cap_gather(in);
    return;
  }
  remove_client_cap(in, client);
}
3494
/* NOTE: stale copy-paste — remove_client_cap() below takes no message to put. */
3496
/**
 * Remove a client's capability on an inode and clean up dependent state:
 * pending snapflushes from that client, the client's byte range (via a
 * max_size check on the auth MDS), and replica file-caps bookkeeping.
 */
void Locker::remove_client_cap(CInode *in, client_t client)
{
  // clean out any pending snapflush state
  if (!in->client_need_snapflush.empty())
    _do_null_snapflush(in, client);

  in->remove_client_cap(client);

  if (in->is_auth()) {
    // make sure we clear out the client byte range
    if (in->get_projected_inode()->client_ranges.count(client) &&
	!(in->inode.nlink == 0 && !in->is_any_caps())) // unless it's unlink + stray
      check_inode_max_size(in);
  } else {
    // replica: tell the auth MDS our aggregate caps-wanted may have changed
    request_inode_file_caps(in);
  }

  try_eval(in, CEPH_CAP_LOCKS);
}
3516
3517
3518 /**
3519 * Return true if any currently revoking caps exceed the
3520 * mds_session_timeout threshold.
3521 */
3522 bool Locker::any_late_revoking_caps(xlist<Capability*> const &revoking) const
3523 {
3524 xlist<Capability*>::const_iterator p = revoking.begin();
3525 if (p.end()) {
3526 // No revoking caps at the moment
3527 return false;
3528 } else {
3529 utime_t now = ceph_clock_now();
3530 utime_t age = now - (*p)->get_last_revoke_stamp();
3531 if (age <= g_conf->mds_session_timeout) {
3532 return false;
3533 } else {
3534 return true;
3535 }
3536 }
3537 }
3538
3539
3540 void Locker::get_late_revoking_clients(std::list<client_t> *result) const
3541 {
3542 if (!any_late_revoking_caps(revoking_caps)) {
3543 // Fast path: no misbehaving clients, execute in O(1)
3544 return;
3545 }
3546
3547 // Slow path: execute in O(N_clients)
3548 std::map<client_t, xlist<Capability*> >::const_iterator client_rc_iter;
3549 for (client_rc_iter = revoking_caps_by_client.begin();
3550 client_rc_iter != revoking_caps_by_client.end(); ++client_rc_iter) {
3551 xlist<Capability*> const &client_rc = client_rc_iter->second;
3552 bool any_late = any_late_revoking_caps(client_rc);
3553 if (any_late) {
3554 result->push_back(client_rc_iter->first);
3555 }
3556 }
3557 }
3558
3559 // Hard-code instead of surfacing a config settings because this is
3560 // really a hack that should go away at some point when we have better
3561 // inspection tools for getting at detailed cap state (#7316)
3562 #define MAX_WARN_CAPS 100
3563
3564 void Locker::caps_tick()
3565 {
3566 utime_t now = ceph_clock_now();
3567
3568 dout(20) << __func__ << " " << revoking_caps.size() << " revoking caps" << dendl;
3569
3570 int i = 0;
3571 for (xlist<Capability*>::iterator p = revoking_caps.begin(); !p.end(); ++p) {
3572 Capability *cap = *p;
3573
3574 utime_t age = now - cap->get_last_revoke_stamp();
3575 dout(20) << __func__ << " age = " << age << cap->get_client() << "." << cap->get_inode()->ino() << dendl;
3576 if (age <= g_conf->mds_session_timeout) {
3577 dout(20) << __func__ << " age below timeout " << g_conf->mds_session_timeout << dendl;
3578 break;
3579 } else {
3580 ++i;
3581 if (i > MAX_WARN_CAPS) {
3582 dout(1) << __func__ << " more than " << MAX_WARN_CAPS << " caps are late"
3583 << "revoking, ignoring subsequent caps" << dendl;
3584 break;
3585 }
3586 }
3587 // exponential backoff of warning intervals
3588 if (age > g_conf->mds_session_timeout * (1 << cap->get_num_revoke_warnings())) {
3589 cap->inc_num_revoke_warnings();
3590 stringstream ss;
3591 ss << "client." << cap->get_client() << " isn't responding to mclientcaps(revoke), ino "
3592 << cap->get_inode()->ino() << " pending " << ccap_string(cap->pending())
3593 << " issued " << ccap_string(cap->issued()) << ", sent " << age << " seconds ago";
3594 mds->clog->warn() << ss.str();
3595 dout(20) << __func__ << " " << ss.str() << dendl;
3596 } else {
3597 dout(20) << __func__ << " silencing log message (backoff) for " << cap->get_client() << "." << cap->get_inode()->ino() << dendl;
3598 }
3599 }
3600 }
3601
3602
/**
 * Handle a client dentry-lease message (release, revoke-ack, or renew).
 *
 * Message ownership: on RELEASE / REVOKE_ACK (and on all early-out error
 * paths) the message is put here.  On a granted RENEW the message object is
 * reused as the reply and handed to send_message_client_counted instead.
 */
void Locker::handle_client_lease(MClientLease *m)
{
  dout(10) << "handle_client_lease " << *m << dendl;

  assert(m->get_source().is_client());
  client_t client = m->get_source().num();

  CInode *in = mdcache->get_inode(m->get_ino(), m->get_last());
  if (!in) {
    dout(7) << "handle_client_lease don't have ino " << m->get_ino() << "." << m->get_last() << dendl;
    m->put();
    return;
  }
  CDentry *dn = 0;

  // locate the dentry the lease refers to
  frag_t fg = in->pick_dirfrag(m->dname);
  CDir *dir = in->get_dirfrag(fg);
  if (dir)
    dn = dir->lookup(m->dname);
  if (!dn) {
    dout(7) << "handle_client_lease don't have dn " << m->get_ino() << " " << m->dname << dendl;
    m->put();
    return;
  }
  dout(10) << " on " << *dn << dendl;

  // replica and lock
  ClientLease *l = dn->get_client_lease(client);
  if (!l) {
    dout(7) << "handle_client_lease didn't have lease for client." << client << " of " << *dn << dendl;
    m->put();
    return;
  }

  switch (m->get_action()) {
  case CEPH_MDS_LEASE_REVOKE_ACK:
  case CEPH_MDS_LEASE_RELEASE:
    // only drop the lease if the seq matches; otherwise it's a stale message
    if (l->seq != m->get_seq()) {
      dout(7) << "handle_client_lease release - seq " << l->seq << " != provided " << m->get_seq() << dendl;
    } else {
      dout(7) << "handle_client_lease client." << client
	      << " on " << *dn << dendl;
      dn->remove_client_lease(l, this);
    }
    m->put();
    break;

  case CEPH_MDS_LEASE_RENEW:
    {
      dout(7) << "handle_client_lease client." << client << " renew on " << *dn
	      << (!dn->lock.can_lease(client)?", revoking lease":"") << dendl;
      if (dn->lock.can_lease(client)) {
	int pool = 1; // fixme.. do something smart!
	// reuse the incoming message as the renewal reply
	m->h.duration_ms = (int)(1000 * mdcache->client_lease_durations[pool]);
	m->h.seq = ++l->seq;
	m->clear_payload();

	utime_t now = ceph_clock_now();
	now += mdcache->client_lease_durations[pool];
	mdcache->touch_client_lease(l, pool, now);

	mds->send_message_client_counted(m, m->get_connection());
      }
    }
    break;

  default:
    ceph_abort(); // implement me
    break;
  }
}
3674
3675
/**
 * Encode a dentry LeaseStat into @p bl for the given client: a real lease
 * when one can be issued, otherwise a null (mask=0) lease.
 *
 * NOTE(review): the condition below issues a lease only when the client can
 * NOT lease the parent dir's filelock and holds no FILE_SHARED/EXCL cap on
 * it — presumably because those already cover dentry stability; confirm the
 * intended grouping of the doubled parentheses.
 */
void Locker::issue_client_lease(CDentry *dn, client_t client,
			       bufferlist &bl, utime_t now, Session *session)
{
  CInode *diri = dn->get_dir()->get_inode();
  if (!diri->is_stray() && // do not issue dn leases in stray dir!
      ((!diri->filelock.can_lease(client) &&
	(diri->get_client_cap_pending(client) & (CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_EXCL)) == 0)) &&
      dn->lock.can_lease(client)) {
    int pool = 1; // fixme.. do something smart!
    // issue a dentry lease
    ClientLease *l = dn->add_client_lease(client, session);
    session->touch_lease(l);

    now += mdcache->client_lease_durations[pool];
    mdcache->touch_client_lease(l, pool, now);

    LeaseStat e;
    e.mask = 1 | CEPH_LOCK_DN; // old and new bit values
    e.seq = ++l->seq;
    e.duration_ms = (int)(1000 * mdcache->client_lease_durations[pool]);
    ::encode(e, bl);
    dout(20) << "issue_client_lease seq " << e.seq << " dur " << e.duration_ms << "ms "
	     << " on " << *dn << dendl;
  } else {
    // null lease
    LeaseStat e;
    e.mask = 0;
    e.seq = 0;
    e.duration_ms = 0;
    ::encode(e, bl);
    dout(20) << "issue_client_lease no/null lease on " << *dn << dendl;
  }
}
3709
3710
3711 void Locker::revoke_client_leases(SimpleLock *lock)
3712 {
3713 int n = 0;
3714 CDentry *dn = static_cast<CDentry*>(lock->get_parent());
3715 for (map<client_t, ClientLease*>::iterator p = dn->client_lease_map.begin();
3716 p != dn->client_lease_map.end();
3717 ++p) {
3718 ClientLease *l = p->second;
3719
3720 n++;
3721 assert(lock->get_type() == CEPH_LOCK_DN);
3722
3723 CDentry *dn = static_cast<CDentry*>(lock->get_parent());
3724 int mask = 1 | CEPH_LOCK_DN; // old and new bits
3725
3726 // i should also revoke the dir ICONTENT lease, if they have it!
3727 CInode *diri = dn->get_dir()->get_inode();
3728 mds->send_message_client_counted(new MClientLease(CEPH_MDS_LEASE_REVOKE, l->seq,
3729 mask,
3730 diri->ino(),
3731 diri->first, CEPH_NOSNAP,
3732 dn->get_name()),
3733 l->client);
3734 }
3735 }
3736
3737
3738
3739 // locks ----------------------------------------------------------------
3740
/**
 * Resolve a (lock_type, object info) pair from a peer MDS message into the
 * corresponding SimpleLock in our cache.
 *
 * @return the lock, or 0 if the object isn't in cache (caller drops the
 *         message); aborts on an unknown lock type.
 */
SimpleLock *Locker::get_lock(int lock_type, MDSCacheObjectInfo &info)
{
  switch (lock_type) {
  case CEPH_LOCK_DN:
    {
      // be careful; info.dirfrag may have incorrect frag; recalculate based on dname.
      CInode *diri = mdcache->get_inode(info.dirfrag.ino);
      frag_t fg;
      CDir *dir = 0;
      CDentry *dn = 0;
      if (diri) {
	fg = diri->pick_dirfrag(info.dname);
	dir = diri->get_dirfrag(fg);
	if (dir)
	  dn = dir->lookup(info.dname, info.snapid);
      }
      if (!dn) {
	dout(7) << "get_lock don't have dn " << info.dirfrag.ino << " " << info.dname << dendl;
	return 0;
      }
      return &dn->lock;
    }

  case CEPH_LOCK_IAUTH:
  case CEPH_LOCK_ILINK:
  case CEPH_LOCK_IDFT:
  case CEPH_LOCK_IFILE:
  case CEPH_LOCK_INEST:
  case CEPH_LOCK_IXATTR:
  case CEPH_LOCK_ISNAP:
  case CEPH_LOCK_IFLOCK:
  case CEPH_LOCK_IPOLICY:
    {
      // all inode locks: find the inode, then map type -> member lock
      CInode *in = mdcache->get_inode(info.ino, info.snapid);
      if (!in) {
	dout(7) << "get_lock don't have ino " << info.ino << dendl;
	return 0;
      }
      switch (lock_type) {
      case CEPH_LOCK_IAUTH: return &in->authlock;
      case CEPH_LOCK_ILINK: return &in->linklock;
      case CEPH_LOCK_IDFT: return &in->dirfragtreelock;
      case CEPH_LOCK_IFILE: return &in->filelock;
      case CEPH_LOCK_INEST: return &in->nestlock;
      case CEPH_LOCK_IXATTR: return &in->xattrlock;
      case CEPH_LOCK_ISNAP: return &in->snaplock;
      case CEPH_LOCK_IFLOCK: return &in->flocklock;
      case CEPH_LOCK_IPOLICY: return &in->policylock;
      }
    }

  default:
    dout(7) << "get_lock don't know lock_type " << lock_type << dendl;
    ceph_abort();
    break;
  }

  return 0;
}
3800
/* This function DOES put the passed message before returning */
/**
 * Dispatch an inter-MDS lock message to the handler for its lock type.
 * Unresolvable objects (trimmed from cache) cause the message to be dropped.
 */
void Locker::handle_lock(MLock *m)
{
  // nobody should be talking to us during recovery.
  assert(mds->is_rejoin() || mds->is_clientreplay() || mds->is_active() || mds->is_stopping());

  SimpleLock *lock = get_lock(m->get_lock_type(), m->get_object_info());
  if (!lock) {
    dout(10) << "don't have object " << m->get_object_info() << ", must have trimmed, dropping" << dendl;
    m->put();
    return;
  }

  switch (lock->get_type()) {
  case CEPH_LOCK_DN:
  case CEPH_LOCK_IAUTH:
  case CEPH_LOCK_ILINK:
  case CEPH_LOCK_ISNAP:
  case CEPH_LOCK_IXATTR:
  case CEPH_LOCK_IFLOCK:
  case CEPH_LOCK_IPOLICY:
    handle_simple_lock(lock, m);
    break;

  case CEPH_LOCK_IDFT:
  case CEPH_LOCK_INEST:
    // intentional fall-through: scatter locks share the file-lock handler
    //handle_scatter_lock((ScatterLock*)lock, m);
    //break;

  case CEPH_LOCK_IFILE:
    handle_file_lock(static_cast<ScatterLock*>(lock), m);
    break;

  default:
    dout(7) << "handle_lock got otype " << m->get_lock_type() << dendl;
    ceph_abort();
    break;
  }
}
3840
3841
3842
3843
3844
3845 // ==========================================================================
3846 // simple lock
3847
/** Handle a replica's LOCK_AC_REQRDLOCK request: the replica wants this lock
 * moved to SYNC so it can rdlock it locally.
 *
 * This function may take a reference to m if it needs one, but does
 * not put references.  The caller (handle_simple_lock) still owns m.
 */
void Locker::handle_reqrdlock(SimpleLock *lock, MLock *m)
{
  MDSCacheObject *parent = lock->get_parent();
  if (parent->is_auth() &&
      lock->get_state() != LOCK_SYNC &&
      !parent->is_frozen()) {
    dout(7) << "handle_reqrdlock got rdlock request on " << *lock
	    << " on " << *parent << dendl;
    assert(parent->is_auth()); // replica auth pinned if they're doing this!
    if (lock->is_stable()) {
      simple_sync(lock);
    } else {
      dout(7) << "handle_reqrdlock delaying request until lock is stable" << dendl;
      // take an extra ref (m->get()) so the retry context can re-deliver m later
      lock->add_waiter(SimpleLock::WAIT_STABLE | MDSCacheObject::WAIT_UNFREEZE,
                       new C_MDS_RetryMessage(mds, m->get()));
    }
  } else {
    // not auth, already SYNC, or frozen: silently drop the request
    dout(7) << "handle_reqrdlock dropping rdlock request on " << *lock
	    << " on " << *parent << dendl;
    // replica should retry
  }
}
3872
/* Handle an MLock message for a simple (non-scatter) lock.  Dispatches on
 * the message action: replica-side grants/recalls from the auth, and
 * auth-side acks/requests from replicas.
 * This function DOES put the passed message before returning. */
void Locker::handle_simple_lock(SimpleLock *lock, MLock *m)
{
  int from = m->get_asker();

  dout(10) << "handle_simple_lock " << *m
	   << " on " << *lock << " " << *lock->get_parent() << dendl;

  if (mds->is_rejoin()) {
    if (lock->get_parent()->is_rejoining()) {
      // object's lock state is still being resolved by rejoin; sender will retry
      dout(7) << "handle_simple_lock still rejoining " << *lock->get_parent()
	      << ", dropping " << *m << dendl;
      m->put();
      return;
    }
  }

  switch (m->get_action()) {
    // -- replica --
  case LOCK_AC_SYNC:
    // auth grants SYNC: adopt the shipped lock state and wake readers
    assert(lock->get_state() == LOCK_LOCK);
    lock->decode_locked_state(m->get_data());
    lock->set_state(LOCK_SYNC);
    lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
    break;

  case LOCK_AC_LOCK:
    // auth is gathering: enter SYNC->LOCK, revoke client leases, and let
    // eval_gather() send the LOCKACK once local readers drain
    assert(lock->get_state() == LOCK_SYNC);
    lock->set_state(LOCK_SYNC_LOCK);
    if (lock->is_leased())
      revoke_client_leases(lock);
    eval_gather(lock, true);
    if (lock->is_unstable_and_locked())
      mds->mdlog->flush();
    break;


    // -- auth --
  case LOCK_AC_LOCKACK:
    // one replica finished its local gather; finish the transition once
    // the gather set is empty
    assert(lock->get_state() == LOCK_SYNC_LOCK ||
	   lock->get_state() == LOCK_SYNC_EXCL);
    assert(lock->is_gathering(from));
    lock->remove_gather(from);

    if (lock->is_gathering()) {
      dout(7) << "handle_simple_lock " << *lock << " on " << *lock->get_parent() << " from " << from
	      << ", still gathering " << lock->get_gather_set() << dendl;
    } else {
      dout(7) << "handle_simple_lock " << *lock << " on " << *lock->get_parent() << " from " << from
	      << ", last one" << dendl;
      eval_gather(lock);
    }
    break;

  case LOCK_AC_REQRDLOCK:
    // replica asks us (auth) to go SYNC so it can rdlock
    handle_reqrdlock(lock, m);
    break;

  }

  m->put();
}
3935
3936 /* unused, currently.
3937
3938 class C_Locker_SimpleEval : public Context {
3939 Locker *locker;
3940 SimpleLock *lock;
3941 public:
3942 C_Locker_SimpleEval(Locker *l, SimpleLock *lk) : locker(l), lock(lk) {}
3943 void finish(int r) {
3944 locker->try_simple_eval(lock);
3945 }
3946 };
3947
3948 void Locker::try_simple_eval(SimpleLock *lock)
3949 {
3950 // unstable and ambiguous auth?
3951 if (!lock->is_stable() &&
3952 lock->get_parent()->is_ambiguous_auth()) {
3953 dout(7) << "simple_eval not stable and ambiguous auth, waiting on " << *lock->get_parent() << dendl;
3954 //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
3955 lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_SimpleEval(this, lock));
3956 return;
3957 }
3958
3959 if (!lock->get_parent()->is_auth()) {
3960 dout(7) << "try_simple_eval not auth for " << *lock->get_parent() << dendl;
3961 return;
3962 }
3963
3964 if (!lock->get_parent()->can_auth_pin()) {
3965 dout(7) << "try_simple_eval can't auth_pin, waiting on " << *lock->get_parent() << dendl;
3966 //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
3967 lock->get_parent()->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_SimpleEval(this, lock));
3968 return;
3969 }
3970
3971 if (lock->is_stable())
3972 simple_eval(lock);
3973 }
3974 */
3975
3976
/* Re-evaluate a stable simple lock on the auth node and move it toward the
 * most useful stable state (EXCL for a loner wanting exclusive caps, SYNC
 * otherwise).  If need_issue is non-null, cap issuing is deferred to the
 * caller (sets *need_issue) instead of being done inline. */
void Locker::simple_eval(SimpleLock *lock, bool *need_issue)
{
  dout(10) << "simple_eval " << *lock << " on " << *lock->get_parent() << dendl;

  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  if (lock->get_parent()->is_freezing_or_frozen()) {
    // dentry lock in unreadable state can block path traverse
    if ((lock->get_type() != CEPH_LOCK_DN ||
	 lock->get_state() == LOCK_SYNC ||
	 lock->get_parent()->is_frozen()))
      return;
  }

  if (mdcache->is_readonly()) {
    // read-only FS: everything should converge to SYNC (readable)
    if (lock->get_state() != LOCK_SYNC) {
      dout(10) << "simple_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
      simple_sync(lock, need_issue);
    }
    return;
  }

  // inode locks consult client cap wants; dentry locks have no caps
  CInode *in = 0;
  int wanted = 0;
  if (lock->get_type() != CEPH_LOCK_DN) {
    in = static_cast<CInode*>(lock->get_parent());
    in->get_caps_wanted(&wanted, NULL, lock->get_cap_shift());
  }

  // -> excl?
  if (lock->get_state() != LOCK_EXCL &&
      in && in->get_target_loner() >= 0 &&
      (wanted & CEPH_CAP_GEXCL)) {
    dout(7) << "simple_eval stable, going to excl " << *lock
	    << " on " << *lock->get_parent() << dendl;
    simple_excl(lock, need_issue);
  }

  // stable -> sync?
  else if (lock->get_state() != LOCK_SYNC &&
	   !lock->is_wrlocked() &&
	   ((!(wanted & CEPH_CAP_GEXCL) && !lock->is_waiter_for(SimpleLock::WAIT_WR)) ||
	    (lock->get_state() == LOCK_EXCL && in && in->get_target_loner() < 0))) {
    dout(7) << "simple_eval stable, syncing " << *lock
	    << " on " << *lock->get_parent() << dendl;
    simple_sync(lock, need_issue);
  }
}
4026
4027
4028 // mid
4029
// Move a stable auth lock toward SYNC.  Returns true if the lock reached
// SYNC synchronously; false if a gather (or a dirty-data writebehind) was
// started and the transition will complete later via eval_gather().
bool Locker::simple_sync(SimpleLock *lock, bool *need_issue)
{
  dout(7) << "simple_sync on " << *lock << " on " << *lock->get_parent() << dendl;
  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  int old_state = lock->get_state();

  if (old_state != LOCK_TSYN) {

    // enter the appropriate intermediate (gathering) state
    switch (lock->get_state()) {
    case LOCK_MIX: lock->set_state(LOCK_MIX_SYNC); break;
    case LOCK_LOCK: lock->set_state(LOCK_LOCK_SYNC); break;
    case LOCK_XSYN: lock->set_state(LOCK_XSYN_SYNC); break;
    case LOCK_EXCL: lock->set_state(LOCK_EXCL_SYNC); break;
    default: ceph_abort();
    }

    int gather = 0;
    if (lock->is_wrlocked())
      gather++;    // wait for local writers to drain

    // coming from MIX, replicas hold writable state: recall it and gather acks
    if (lock->get_parent()->is_replicated() && old_state == LOCK_MIX) {
      send_lock_message(lock, LOCK_AC_SYNC);
      lock->init_gather();
      gather++;
    }

    // revoke client caps that conflict with SYNC
    if (in && in->is_head()) {
      if (in->issued_caps_need_gather(lock)) {
	if (need_issue)
	  *need_issue = true;
	else
	  issue_caps(in);
	gather++;
      }
    }

    bool need_recover = false;
    if (lock->get_type() == CEPH_LOCK_IFILE) {
      assert(in);
      if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
	mds->mdcache->queue_file_recover(in);
	need_recover = true;
	gather++;
      }
    }

    // nothing to gather but there is dirty scattered data: flush it back first
    if (!gather && lock->is_dirty()) {
      lock->get_parent()->auth_pin(lock);
      scatter_writebehind(static_cast<ScatterLock*>(lock));
      mds->mdlog->flush();
      return false;
    }

    if (gather) {
      lock->get_parent()->auth_pin(lock);
      if (need_recover)
	mds->mdcache->do_file_recover();
      return false;
    }
  }

  // no gather needed: go straight to SYNC, shipping lock state to replicas
  if (lock->get_parent()->is_replicated()) { // FIXME
    bufferlist data;
    lock->encode_locked_state(data);
    send_lock_message(lock, LOCK_AC_SYNC, data);
  }
  lock->set_state(LOCK_SYNC);
  lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
  if (in && in->is_head()) {
    if (need_issue)
      *need_issue = true;
    else
      issue_caps(in);
  }
  return true;
}
4112
// Move a stable auth lock toward EXCL (a single loner client gets exclusive
// caps).  Gathers rdlocks/wrlocks, replica acks, and conflicting client caps.
void Locker::simple_excl(SimpleLock *lock, bool *need_issue)
{
  dout(7) << "simple_excl on " << *lock << " on " << *lock->get_parent() << dendl;
  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  switch (lock->get_state()) {
  case LOCK_LOCK: lock->set_state(LOCK_LOCK_EXCL); break;
  case LOCK_SYNC: lock->set_state(LOCK_SYNC_EXCL); break;
  case LOCK_XSYN: lock->set_state(LOCK_XSYN_EXCL); break;
  default: ceph_abort();
  }

  int gather = 0;
  if (lock->is_rdlocked())
    gather++;
  if (lock->is_wrlocked())
    gather++;

  // tell replicas to drop to LOCK and gather their acks; skipped from
  // LOCK/XSYN, where replicas presumably already hold LOCK (see sm tables)
  if (lock->get_parent()->is_replicated() &&
      lock->get_state() != LOCK_LOCK_EXCL &&
      lock->get_state() != LOCK_XSYN_EXCL) {
    send_lock_message(lock, LOCK_AC_LOCK);
    lock->init_gather();
    gather++;
  }

  if (in && in->is_head()) {
    if (in->issued_caps_need_gather(lock)) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
      gather++;
    }
  }

  if (gather) {
    lock->get_parent()->auth_pin(lock);
  } else {
    lock->set_state(LOCK_EXCL);
    lock->finish_waiters(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE);
    if (in) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
    }
  }
}
4167
// Move a stable auth lock toward LOCK (held by the MDS; no client read
// access or leases).  MIX->LOCK is a two-stage gather: local state first,
// then replicas (LOCK_MIX_LOCK -> LOCK_MIX_LOCK2).
void Locker::simple_lock(SimpleLock *lock, bool *need_issue)
{
  dout(7) << "simple_lock on " << *lock << " on " << *lock->get_parent() << dendl;
  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());
  assert(lock->get_state() != LOCK_LOCK);

  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  int old_state = lock->get_state();

  switch (lock->get_state()) {
  case LOCK_SYNC: lock->set_state(LOCK_SYNC_LOCK); break;
  case LOCK_XSYN: lock->set_state(LOCK_XSYN_LOCK); break;
  case LOCK_EXCL: lock->set_state(LOCK_EXCL_LOCK); break;
  case LOCK_MIX: lock->set_state(LOCK_MIX_LOCK);
    (static_cast<ScatterLock *>(lock))->clear_unscatter_wanted();
    break;
  case LOCK_TSYN: lock->set_state(LOCK_TSYN_LOCK); break;
  default: ceph_abort();
  }

  int gather = 0;
  if (lock->is_leased()) {
    gather++;
    revoke_client_leases(lock);
  }
  if (lock->is_rdlocked())
    gather++;
  if (in && in->is_head()) {
    if (in->issued_caps_need_gather(lock)) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
      gather++;
    }
  }

  bool need_recover = false;
  if (lock->get_type() == CEPH_LOCK_IFILE) {
    assert(in);
    if(in->state_test(CInode::STATE_NEEDSRECOVER)) {
      mds->mdcache->queue_file_recover(in);
      need_recover = true;
      gather++;
    }
  }

  if (lock->get_parent()->is_replicated() &&
      lock->get_state() == LOCK_MIX_LOCK &&
      gather) {
    // stage one of mix->lock: wait for the local gather before contacting replicas
    dout(10) << " doing local stage of mix->lock gather before gathering from replicas" << dendl;
  } else {
    // move to second stage of gather now, so we don't send the lock action later.
    if (lock->get_state() == LOCK_MIX_LOCK)
      lock->set_state(LOCK_MIX_LOCK2);

    if (lock->get_parent()->is_replicated() &&
	lock->get_sm()->states[old_state].replica_state != LOCK_LOCK) {  // replica may already be LOCK
      gather++;
      send_lock_message(lock, LOCK_AC_LOCK);
      lock->init_gather();
    }
  }

  // nothing to gather but dirty scattered data: flush it back to the inode first
  if (!gather && lock->is_dirty()) {
    lock->get_parent()->auth_pin(lock);
    scatter_writebehind(static_cast<ScatterLock*>(lock));
    mds->mdlog->flush();
    return;
  }

  if (gather) {
    lock->get_parent()->auth_pin(lock);
    if (need_recover)
      mds->mdcache->do_file_recover();
  } else {
    lock->set_state(LOCK_LOCK);
    lock->finish_waiters(ScatterLock::WAIT_XLOCK|ScatterLock::WAIT_WR|ScatterLock::WAIT_STABLE);
  }
}
4252
4253
// Begin moving a lock toward XLOCK (exclusive access for one mutation).
// Only LOCK/XLOCKDONE are valid starting states; when no gather is needed
// the lock lands in PREXLOCK.
void Locker::simple_xlock(SimpleLock *lock)
{
  dout(7) << "simple_xlock on " << *lock << " on " << *lock->get_parent() << dendl;
  assert(lock->get_parent()->is_auth());
  //assert(lock->is_stable());
  assert(lock->get_state() != LOCK_XLOCK);

  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  // auth_pin only on the stable->unstable edge; an unstable lock already holds one
  if (lock->is_stable())
    lock->get_parent()->auth_pin(lock);

  switch (lock->get_state()) {
  case LOCK_LOCK:
  case LOCK_XLOCKDONE: lock->set_state(LOCK_LOCK_XLOCK); break;
  default: ceph_abort();
  }

  int gather = 0;
  if (lock->is_rdlocked())
    gather++;
  if (lock->is_wrlocked())
    gather++;

  if (in && in->is_head()) {
    if (in->issued_caps_need_gather(lock)) {
      issue_caps(in);
      gather++;
    }
  }

  if (!gather) {
    lock->set_state(LOCK_PREXLOCK);
    //assert("shouldn't be called if we are already xlockable" == 0);
  }
}
4292
4293
4294
4295
4296
4297 // ==========================================================================
4298 // scatter lock
4299
4300 /*
4301
4302 Some notes on scatterlocks.
4303
4304 - The scatter/gather is driven by the inode lock. The scatter always
4305 brings in the latest metadata from the fragments.
4306
4307 - When in a scattered/MIX state, fragments are only allowed to
4308 update/be written to if the accounted stat matches the inode's
4309 current version.
4310
4311 - That means, on gather, we _only_ assimilate diffs for frag metadata
4312 that match the current version, because those are the only ones
4313 written during this scatter/gather cycle. (Others didn't permit
4314 it.) We increment the version and journal this to disk.
4315
4316 - When possible, we also simultaneously update our local frag
4317 accounted stats to match.
4318
4319 - On scatter, the new inode info is broadcast to frags, both local
4320 and remote. If possible (auth and !frozen), the dirfrag auth
4321 should update the accounted state (if it isn't already up to date).
4322 Note that this may occur on both the local inode auth node and
4323 inode replicas, so there are two potential paths. If it is NOT
4324 possible, they need to mark_stale to prevent any possible writes.
4325
4326 - A scatter can be to MIX (potentially writeable) or to SYNC (read
4327 only). Both are opportunities to update the frag accounted stats,
4328 even though only the MIX case is affected by a stale dirfrag.
4329
4330 - Because many scatter/gather cycles can potentially go by without a
4331 frag being able to update its accounted stats (due to being frozen
4332 by exports/refragments in progress), the frag may have (even very)
4333 old stat versions. That's fine: when we do want to update it,
4334 we can update accounted_* and the version first.
4335
4336 */
4337
// Journal-commit callback for scatter_writebehind(): once the EUpdate is
// durable, hand off to scatter_writebehind_finish() to apply the projected
// inode and release the forced wrlock.
class C_Locker_ScatterWB : public LockerLogContext {
  ScatterLock *lock;   // lock whose dirty scattered data was flushed
  MutationRef mut;     // mutation carrying the wrlock and projected state
public:
  C_Locker_ScatterWB(Locker *l, ScatterLock *sl, MutationRef& m) :
    LockerLogContext(l), lock(sl), mut(m) {}
  void finish(int r) override {
    locker->scatter_writebehind_finish(lock, mut);
  }
};
4348
// Flush dirty scattered (fnode-accumulated) data back into the inode:
// project the inode, journal an EUpdate while holding a forced wrlock, and
// complete via C_Locker_ScatterWB -> scatter_writebehind_finish().
void Locker::scatter_writebehind(ScatterLock *lock)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  dout(10) << "scatter_writebehind " << in->inode.mtime << " on " << *lock << " on " << *in << dendl;

  // journal
  MutationRef mut(new MutationImpl());
  mut->ls = mds->mdlog->get_current_segment();

  // forcefully take a wrlock
  lock->get_wrlock(true);
  mut->wrlocks.insert(lock);
  mut->locks.insert(lock);

  in->pre_cow_old_inode(); // avoid cow mayhem

  auto &pi = in->project_inode();
  pi.inode.version = in->pre_dirty();

  // fold the dirfrag stats into the (projected) inode and mark the flush started
  in->finish_scatter_gather_update(lock->get_type());
  lock->start_flush();

  EUpdate *le = new EUpdate(mds->mdlog, "scatter_writebehind");
  mds->mdlog->start_entry(le);

  mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY);
  mdcache->journal_dirty_inode(mut.get(), &le->metablob, in);

  in->finish_scatter_gather_update_accounted(lock->get_type(), mut, &le->metablob);

  mds->mdlog->submit_entry(le, new C_Locker_ScatterWB(this, lock, mut));
}
4381
// Journal-commit completion for scatter_writebehind(): apply the projected
// inode, finish the flush, notify replicas that might be waiting on it, and
// drop the forced wrlock.
void Locker::scatter_writebehind_finish(ScatterLock *lock, MutationRef& mut)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  dout(10) << "scatter_writebehind_finish on " << *lock << " on " << *in << dendl;
  in->pop_and_dirty_projected_inode(mut->ls);

  lock->finish_flush();

  // if replicas may have flushed in a mix->lock state, send another
  // message so they can finish_flush().
  if (in->is_replicated()) {
    switch (lock->get_state()) {
    case LOCK_MIX_LOCK:
    case LOCK_MIX_LOCK2:
    case LOCK_MIX_EXCL:
    case LOCK_MIX_TSYN:
      send_lock_message(lock, LOCK_AC_LOCKFLUSHED);
    }
  }

  mut->apply();
  drop_locks(mut.get());
  mut->cleanup();

  if (lock->is_stable())
    lock->finish_waiters(ScatterLock::WAIT_STABLE);

  //scatter_eval_gather(lock);
}
4411
// Re-evaluate a stable scatter lock on the auth node: honor pending scatter
// (MIX) requests, keep INEST writable, and otherwise prefer SYNC when the
// inode is not a subtree/export delegation point.
void Locker::scatter_eval(ScatterLock *lock, bool *need_issue)
{
  dout(10) << "scatter_eval " << *lock << " on " << *lock->get_parent() << dendl;

  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  if (lock->get_parent()->is_freezing_or_frozen()) {
    dout(20) << " freezing|frozen" << dendl;
    return;
  }

  if (mdcache->is_readonly()) {
    if (lock->get_state() != LOCK_SYNC) {
      dout(10) << "scatter_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
      simple_sync(lock, need_issue);
    }
    return;
  }

  // someone explicitly asked for a scatter (MIX)?
  if (!lock->is_rdlocked() &&
      lock->get_state() != LOCK_MIX &&
      lock->get_scatter_wanted()) {
    dout(10) << "scatter_eval scatter_wanted, bump to mix " << *lock
	     << " on " << *lock->get_parent() << dendl;
    scatter_mix(lock, need_issue);
    return;
  }

  if (lock->get_type() == CEPH_LOCK_INEST) {
    // in general, we want to keep INEST writable at all times.
    if (!lock->is_rdlocked()) {
      if (lock->get_parent()->is_replicated()) {
	if (lock->get_state() != LOCK_MIX)
	  scatter_mix(lock, need_issue);
      } else {
	if (lock->get_state() != LOCK_LOCK)
	  simple_lock(lock, need_issue);
      }
    }
    return;
  }

  CInode *in = static_cast<CInode*>(lock->get_parent());
  if (!in->has_subtree_or_exporting_dirfrag() || in->is_base()) {
    // i _should_ be sync.
    if (!lock->is_wrlocked() &&
	lock->get_state() != LOCK_SYNC) {
      dout(10) << "scatter_eval no wrlocks|xlocks, not subtree root inode, syncing" << dendl;
      simple_sync(lock, need_issue);
    }
  }
}
4465
4466
4467 /*
4468 * mark a scatterlock to indicate that the dir fnode has some dirty data
4469 */
4470 void Locker::mark_updated_scatterlock(ScatterLock *lock)
4471 {
4472 lock->mark_dirty();
4473 if (lock->get_updated_item()->is_on_list()) {
4474 dout(10) << "mark_updated_scatterlock " << *lock
4475 << " - already on list since " << lock->get_update_stamp() << dendl;
4476 } else {
4477 updated_scatterlocks.push_back(lock->get_updated_item());
4478 utime_t now = ceph_clock_now();
4479 lock->set_update_stamp(now);
4480 dout(10) << "mark_updated_scatterlock " << *lock
4481 << " - added at " << now << dendl;
4482 }
4483 }
4484
/*
 * this is called by scatter_tick and LogSegment::try_to_trim() when
 * trying to flush dirty scattered data (i.e. updated fnode) back to
 * the inode.
 *
 * we need to lock|scatter in order to push fnode changes into the
 * inode.dirstat.
 *
 * @param lock  dirty scatterlock to nudge toward a flushable state
 * @param c     optional continuation to run once the lock is stable;
 *              if null, the lock may simply be requeued on updated_scatterlocks
 * @param forcelockchange  only referenced by the commented-out fast path below
 */
void Locker::scatter_nudge(ScatterLock *lock, MDSInternalContextBase *c, bool forcelockchange)
{
  CInode *p = static_cast<CInode *>(lock->get_parent());

  if (p->is_frozen() || p->is_freezing()) {
    dout(10) << "scatter_nudge waiting for unfreeze on " << *p << dendl;
    if (c)
      p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, c);
    else if (lock->is_dirty())
      // just requeue. not ideal.. starvation prone..
      updated_scatterlocks.push_back(lock->get_updated_item());
    return;
  }

  if (p->is_ambiguous_auth()) {
    dout(10) << "scatter_nudge waiting for single auth on " << *p << dendl;
    if (c)
      p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, c);
    else if (lock->is_dirty())
      // just requeue. not ideal.. starvation prone..
      updated_scatterlocks.push_back(lock->get_updated_item());
    return;
  }

  if (p->is_auth()) {
    int count = 0;
    while (true) {
      if (lock->is_stable()) {
	// can we do it now?
	// (only if we're not replicated.. if we are, we really do need
	// to nudge the lock state!)
	/*
	  actually, even if we're not replicated, we can't stay in MIX, because another mds
	  could discover and replicate us at any time. if that happens while we're flushing,
	  they end up in MIX but their inode has the old scatterstat version.

	if (!forcelockchange && !lock->get_parent()->is_replicated() && lock->can_wrlock(-1)) {
	  dout(10) << "scatter_nudge auth, propagating " << *lock << " on " << *p << dendl;
	  scatter_writebehind(lock);
	  if (c)
	    lock->add_waiter(SimpleLock::WAIT_STABLE, c);
	  return;
	}
	*/

	if (mdcache->is_readonly()) {
	  if (lock->get_state() != LOCK_SYNC) {
	    dout(10) << "scatter_nudge auth, read-only FS, syncing " << *lock << " on " << *p << dendl;
	    simple_sync(static_cast<ScatterLock*>(lock));
	  }
	  break;
	}

	// adjust lock state: force a state change so the gather/scatter
	// machinery flushes the dirty data back into the inode
	dout(10) << "scatter_nudge auth, scatter/unscattering " << *lock << " on " << *p << dendl;
	switch (lock->get_type()) {
	case CEPH_LOCK_IFILE:
	  if (p->is_replicated() && lock->get_state() != LOCK_MIX)
	    scatter_mix(static_cast<ScatterLock*>(lock));
	  else if (lock->get_state() != LOCK_LOCK)
	    simple_lock(static_cast<ScatterLock*>(lock));
	  else
	    simple_sync(static_cast<ScatterLock*>(lock));
	  break;

	case CEPH_LOCK_IDFT:
	case CEPH_LOCK_INEST:
	  if (p->is_replicated() && lock->get_state() != LOCK_MIX)
	    scatter_mix(lock);
	  else if (lock->get_state() != LOCK_LOCK)
	    simple_lock(lock);
	  else
	    simple_sync(lock);
	  break;
	default:
	  ceph_abort();
	}
	++count;
	if (lock->is_stable() && count == 2) {
	  dout(10) << "scatter_nudge oh, stable after two cycles." << dendl;
	  // this should only really happen when called via
	  // handle_file_lock due to AC_NUDGE, because the rest of the
	  // time we are replicated or have dirty data and won't get
	  // called. bailing here avoids an infinite loop.
	  assert(!c);
	  break;
	}
      } else {
	dout(10) << "scatter_nudge auth, waiting for stable " << *lock << " on " << *p << dendl;
	if (c)
	  lock->add_waiter(SimpleLock::WAIT_STABLE, c);
	return;
      }
    }
  } else {
    dout(10) << "scatter_nudge replica, requesting scatter/unscatter of "
	     << *lock << " on " << *p << dendl;
    // request unscatter?
    mds_rank_t auth = lock->get_parent()->authority().first;
    if (!mds->is_cluster_degraded() ||
	mds->mdsmap->is_clientreplay_or_active_or_stopping(auth))
      mds->send_message_mds(new MLock(lock, LOCK_AC_NUDGE, mds->get_nodeid()), auth);

    // wait...
    if (c)
      lock->add_waiter(SimpleLock::WAIT_STABLE, c);

    // also, requeue, in case we had wrong auth or something
    if (lock->is_dirty())
      updated_scatterlocks.push_back(lock->get_updated_item());
  }
}
4605
// Periodic tick: nudge dirty scatterlocks that have been queued longer than
// mds_scatter_nudge_interval so their data gets flushed back to the inode.
void Locker::scatter_tick()
{
  dout(10) << "scatter_tick" << dendl;

  // updated
  utime_t now = ceph_clock_now();
  int n = updated_scatterlocks.size();
  while (!updated_scatterlocks.empty()) {
    ScatterLock *lock = updated_scatterlocks.front();

    if (n-- == 0) break;  // scatter_nudge() may requeue; avoid looping

    if (!lock->is_dirty()) {
      // cleaned since it was queued; just drop it from the list
      updated_scatterlocks.pop_front();
      dout(10) << " removing from updated_scatterlocks "
	       << *lock << " " << *lock->get_parent() << dendl;
      continue;
    }
    // list is in enqueue-stamp order (see mark_updated_scatterlock), so
    // once we hit one that's too fresh, the rest are too
    if (now - lock->get_update_stamp() < g_conf->mds_scatter_nudge_interval)
      break;
    updated_scatterlocks.pop_front();
    scatter_nudge(lock, 0);
  }
  mds->mdlog->flush();
}
4631
4632
// Move a stable auth scatter lock to TSYN (temporary sync).
// NOTE: deliberately disabled via the assert(0) below — not fully
// implemented, at least not for filelock.
void Locker::scatter_tempsync(ScatterLock *lock, bool *need_issue)
{
  dout(10) << "scatter_tempsync " << *lock
	   << " on " << *lock->get_parent() << dendl;
  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  assert(0 == "not fully implemented, at least not for filelock");

  CInode *in = static_cast<CInode *>(lock->get_parent());

  switch (lock->get_state()) {
  case LOCK_SYNC: ceph_abort(); // this shouldn't happen
  case LOCK_LOCK: lock->set_state(LOCK_LOCK_TSYN); break;
  case LOCK_MIX: lock->set_state(LOCK_MIX_TSYN); break;
  default: ceph_abort();
  }

  int gather = 0;
  if (lock->is_wrlocked())
    gather++;

  // revoke client caps that conflict with TSYN
  if (lock->get_cap_shift() &&
      in->is_head() &&
      in->issued_caps_need_gather(lock)) {
    if (need_issue)
      *need_issue = true;
    else
      issue_caps(in);
    gather++;
  }

  // coming from MIX, replicas hold writable state: recall it and gather acks
  if (lock->get_state() == LOCK_MIX_TSYN &&
      in->is_replicated()) {
    lock->init_gather();
    send_lock_message(lock, LOCK_AC_LOCK);
    gather++;
  }

  if (gather) {
    in->auth_pin(lock);
  } else {
    // do tempsync
    lock->set_state(LOCK_TSYN);
    lock->finish_waiters(ScatterLock::WAIT_RD|ScatterLock::WAIT_STABLE);
    if (lock->get_cap_shift()) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
    }
  }
}
4686
4687
4688
4689 // ==========================================================================
4690 // local lock
4691
4692 void Locker::local_wrlock_grab(LocalLock *lock, MutationRef& mut)
4693 {
4694 dout(7) << "local_wrlock_grab on " << *lock
4695 << " on " << *lock->get_parent() << dendl;
4696
4697 assert(lock->get_parent()->is_auth());
4698 assert(lock->can_wrlock());
4699 assert(!mut->wrlocks.count(lock));
4700 lock->get_wrlock(mut->get_client());
4701 mut->wrlocks.insert(lock);
4702 mut->locks.insert(lock);
4703 }
4704
4705 bool Locker::local_wrlock_start(LocalLock *lock, MDRequestRef& mut)
4706 {
4707 dout(7) << "local_wrlock_start on " << *lock
4708 << " on " << *lock->get_parent() << dendl;
4709
4710 assert(lock->get_parent()->is_auth());
4711 if (lock->can_wrlock()) {
4712 assert(!mut->wrlocks.count(lock));
4713 lock->get_wrlock(mut->get_client());
4714 mut->wrlocks.insert(lock);
4715 mut->locks.insert(lock);
4716 return true;
4717 } else {
4718 lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
4719 return false;
4720 }
4721 }
4722
4723 void Locker::local_wrlock_finish(LocalLock *lock, MutationImpl *mut)
4724 {
4725 dout(7) << "local_wrlock_finish on " << *lock
4726 << " on " << *lock->get_parent() << dendl;
4727 lock->put_wrlock();
4728 mut->wrlocks.erase(lock);
4729 mut->locks.erase(lock);
4730 if (lock->get_num_wrlocks() == 0) {
4731 lock->finish_waiters(SimpleLock::WAIT_STABLE |
4732 SimpleLock::WAIT_WR |
4733 SimpleLock::WAIT_RD);
4734 }
4735 }
4736
4737 bool Locker::local_xlock_start(LocalLock *lock, MDRequestRef& mut)
4738 {
4739 dout(7) << "local_xlock_start on " << *lock
4740 << " on " << *lock->get_parent() << dendl;
4741
4742 assert(lock->get_parent()->is_auth());
4743 if (!lock->can_xlock_local()) {
4744 lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
4745 return false;
4746 }
4747
4748 lock->get_xlock(mut, mut->get_client());
4749 mut->xlocks.insert(lock);
4750 mut->locks.insert(lock);
4751 return true;
4752 }
4753
4754 void Locker::local_xlock_finish(LocalLock *lock, MutationImpl *mut)
4755 {
4756 dout(7) << "local_xlock_finish on " << *lock
4757 << " on " << *lock->get_parent() << dendl;
4758 lock->put_xlock();
4759 mut->xlocks.erase(lock);
4760 mut->locks.erase(lock);
4761
4762 lock->finish_waiters(SimpleLock::WAIT_STABLE |
4763 SimpleLock::WAIT_WR |
4764 SimpleLock::WAIT_RD);
4765 }
4766
4767
4768
4769 // ==========================================================================
4770 // file lock
4771
4772
// Re-evaluate a stable auth filelock against current client cap wants and
// pick the best stable state: EXCL (single loner writer), MIX (multiple
// writers / scattered), or SYNC (readers only).
void Locker::file_eval(ScatterLock *lock, bool *need_issue)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  int loner_wanted, other_wanted;
  int wanted = in->get_caps_wanted(&loner_wanted, &other_wanted, CEPH_CAP_SFILE);
  dout(7) << "file_eval wanted=" << gcap_string(wanted)
	  << " loner_wanted=" << gcap_string(loner_wanted)
	  << " other_wanted=" << gcap_string(other_wanted)
	  << " filelock=" << *lock << " on " << *lock->get_parent()
	  << dendl;

  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  if (lock->get_parent()->is_freezing_or_frozen())
    return;

  if (mdcache->is_readonly()) {
    if (lock->get_state() != LOCK_SYNC) {
      dout(10) << "file_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
      simple_sync(lock, need_issue);
    }
    return;
  }

  // excl -> *?
  if (lock->get_state() == LOCK_EXCL) {
    dout(20) << " is excl" << dendl;
    int loner_issued, other_issued, xlocker_issued;
    in->get_caps_issued(&loner_issued, &other_issued, &xlocker_issued, CEPH_CAP_SFILE);
    dout(7) << "file_eval loner_issued=" << gcap_string(loner_issued)
            << " other_issued=" << gcap_string(other_issued)
	    << " xlocker_issued=" << gcap_string(xlocker_issued)
	    << dendl;
    // leave EXCL if the loner no longer needs write-ish caps, someone else
    // wants conflicting caps, or (for dirs) multiple clients hold caps
    if (!((loner_wanted|loner_issued) & (CEPH_CAP_GEXCL|CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) ||
	(other_wanted & (CEPH_CAP_GEXCL|CEPH_CAP_GWR|CEPH_CAP_GRD)) ||
	(in->inode.is_dir() && in->multiple_nonstale_caps())) { // FIXME.. :/
      dout(20) << " should lose it" << dendl;
      // we should lose it.
      // loner other want
      // R R SYNC
      // R R|W MIX
      // R W MIX
      // R|W R MIX
      // R|W R|W MIX
      // R|W W MIX
      // W R MIX
      // W R|W MIX
      // W W MIX
      // -> any writer means MIX; RD doesn't matter.
      if (((other_wanted|loner_wanted) & CEPH_CAP_GWR) ||
	  lock->is_waiter_for(SimpleLock::WAIT_WR))
	scatter_mix(lock, need_issue);
      else if (!lock->is_wrlocked()) // let excl wrlocks drain first
	simple_sync(lock, need_issue);
      else
	dout(10) << " waiting for wrlock to drain" << dendl;
    }
  }

  // * -> excl?
  else if (lock->get_state() != LOCK_EXCL &&
	   !lock->is_rdlocked() &&
	   //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
	   ((wanted & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) ||
	    (in->inode.is_dir() && !in->has_subtree_or_exporting_dirfrag())) &&
	   in->get_target_loner() >= 0) {
    dout(7) << "file_eval stable, bump to loner " << *lock
	    << " on " << *lock->get_parent() << dendl;
    file_excl(lock, need_issue);
  }

  // * -> mixed?
  else if (lock->get_state() != LOCK_MIX &&
	   !lock->is_rdlocked() &&
	   //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
	   (lock->get_scatter_wanted() ||
	    (in->get_target_loner() < 0 && (wanted & CEPH_CAP_GWR)))) {
    dout(7) << "file_eval stable, bump to mixed " << *lock
	    << " on " << *lock->get_parent() << dendl;
    scatter_mix(lock, need_issue);
  }

  // * -> sync?
  else if (lock->get_state() != LOCK_SYNC &&
	   !lock->is_wrlocked() && // drain wrlocks first!
	   !lock->is_waiter_for(SimpleLock::WAIT_WR) &&
	   !(wanted & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) &&
	   !((lock->get_state() == LOCK_MIX) &&
	     in->is_dir() && in->has_subtree_or_exporting_dirfrag()) // if we are a delegation point, stay where we are
	   //((wanted & CEPH_CAP_RD) ||
	   //in->is_replicated() ||
	   //lock->is_leased() ||
	   //(!loner && lock->get_state() == LOCK_EXCL)) &&
	   ) {
    dout(7) << "file_eval stable, bump to sync " << *lock
	    << " on " << *lock->get_parent() << dendl;
    simple_sync(lock, need_issue);
  }
}
4873
4874
4875
/**
 * Move a ScatterLock toward the MIX (scattered) state on the auth MDS.
 *
 * Must be called on a stable lock on an auth inode (asserted below).
 * Two paths:
 *  - From LOCK_LOCK the transition is immediate: replicas are notified
 *    with LOCK_AC_MIX (plus encoded lock data) and the state is set.
 *  - From SYNC/EXCL/XSYN/TSYN an intermediate *_MIX gather state is
 *    entered; we count outstanding obstacles (rdlocks, replica acks,
 *    client leases, client caps, pending file recovery) and only
 *    complete the transition here if there is nothing to wait for.
 *
 * @param lock        the scatter lock to transition
 * @param need_issue  if non-null, set to true instead of calling
 *                    issue_caps(in) directly (caller batches the issue)
 */
void Locker::scatter_mix(ScatterLock *lock, bool *need_issue)
{
  dout(7) << "scatter_mix " << *lock << " on " << *lock->get_parent() << dendl;

  CInode *in = static_cast<CInode*>(lock->get_parent());
  assert(in->is_auth());
  assert(lock->is_stable());

  if (lock->get_state() == LOCK_LOCK) {
    // From LOCK we can go straight to MIX: no readers/lease holders to gather.
    in->start_scatter(lock);
    if (in->is_replicated()) {
      // data
      bufferlist softdata;
      lock->encode_locked_state(softdata);

      // bcast to replicas
      send_lock_message(lock, LOCK_AC_MIX, softdata);
    }

    // change lock
    lock->set_state(LOCK_MIX);
    lock->clear_scatter_wanted();
    if (lock->get_cap_shift()) {
      // This lock governs client capability bits; (re)issue caps to reflect
      // the new state, or let the caller do it.
      if (need_issue)
        *need_issue = true;
      else
        issue_caps(in);
    }
  } else {
    // gather?  Enter the intermediate state for the current stable state.
    switch (lock->get_state()) {
    case LOCK_SYNC: lock->set_state(LOCK_SYNC_MIX); break;
    case LOCK_EXCL: lock->set_state(LOCK_EXCL_MIX); break;
    case LOCK_XSYN: lock->set_state(LOCK_XSYN_MIX); break;
    case LOCK_TSYN: lock->set_state(LOCK_TSYN_MIX); break;
    default: ceph_abort();
    }

    // Count everything we must wait for before MIX can be reached.
    int gather = 0;
    if (lock->is_rdlocked())
      gather++;
    if (in->is_replicated()) {
      if (lock->get_state() == LOCK_SYNC_MIX) { // for the rest states, replicas are already LOCK
        send_lock_message(lock, LOCK_AC_MIX);
        lock->init_gather();
        gather++;
      }
    }
    if (lock->is_leased()) {
      // Client leases must be revoked before scattering.
      revoke_client_leases(lock);
      gather++;
    }
    if (lock->get_cap_shift() &&
        in->is_head() &&
        in->issued_caps_need_gather(lock)) {
      // Clients hold caps incompatible with MIX; recall them via issue_caps.
      if (need_issue)
        *need_issue = true;
      else
        issue_caps(in);
      gather++;
    }
    bool need_recover = false;
    if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
      // File needs recovery first; queue it and wait for completion.
      mds->mdcache->queue_file_recover(in);
      need_recover = true;
      gather++;
    }

    if (gather) {
      // Something outstanding: auth_pin the inode so it can't be frozen
      // away while we wait, and kick recovery if we queued it.
      lock->get_parent()->auth_pin(lock);
      if (need_recover)
        mds->mdcache->do_file_recover();
    } else {
      // Nothing to wait for: complete the transition immediately
      // (same steps as the LOCK_LOCK fast path above).
      in->start_scatter(lock);
      lock->set_state(LOCK_MIX);
      lock->clear_scatter_wanted();
      if (in->is_replicated()) {
        bufferlist softdata;
        lock->encode_locked_state(softdata);
        send_lock_message(lock, LOCK_AC_MIX, softdata);
      }
      if (lock->get_cap_shift()) {
        if (need_issue)
          *need_issue = true;
        else
          issue_caps(in);
      }
    }
  }
}
4966
4967
/**
 * Move a file ScatterLock toward the EXCL state (a single "loner" client
 * gets exclusive capabilities).
 *
 * Requires a stable lock on an auth inode, and either a chosen loner with
 * no other MDSs wanting caps, or that we are coming from XSYN (asserted:
 * xsyn must go through excl before anything else).
 *
 * Enters the appropriate *_EXCL intermediate state, then counts the
 * obstacles to gather (rd/wrlocks, replica acks, leases, client caps,
 * pending recovery).  If nothing is outstanding the transition completes
 * immediately; otherwise the inode is auth-pinned until eval_gather()
 * finishes the job.
 *
 * @param lock        the scatter lock to transition
 * @param need_issue  if non-null, set to true instead of calling
 *                    issue_caps(in) directly (caller batches the issue)
 */
void Locker::file_excl(ScatterLock *lock, bool *need_issue)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  dout(7) << "file_excl " << *lock << " on " << *lock->get_parent() << dendl;

  assert(in->is_auth());
  assert(lock->is_stable());

  assert((in->get_loner() >= 0 && in->mds_caps_wanted.empty()) ||
         (lock->get_state() == LOCK_XSYN));  // must do xsyn -> excl -> <anything else>

  switch (lock->get_state()) {
  case LOCK_SYNC: lock->set_state(LOCK_SYNC_EXCL); break;
  case LOCK_MIX: lock->set_state(LOCK_MIX_EXCL); break;
  case LOCK_LOCK: lock->set_state(LOCK_LOCK_EXCL); break;
  case LOCK_XSYN: lock->set_state(LOCK_XSYN_EXCL); break;
  default: ceph_abort();
  }
  int gather = 0;

  // Existing rd/wr locks must drain before EXCL.
  if (lock->is_rdlocked())
    gather++;
  if (lock->is_wrlocked())
    gather++;

  if (in->is_replicated() &&
      lock->get_state() != LOCK_LOCK_EXCL &&
      lock->get_state() != LOCK_XSYN_EXCL) {  // if we were lock, replicas are already lock.
    send_lock_message(lock, LOCK_AC_LOCK);
    lock->init_gather();
    gather++;
  }
  if (lock->is_leased()) {
    // Client leases are incompatible with EXCL; revoke and wait.
    revoke_client_leases(lock);
    gather++;
  }
  if (in->is_head() &&
      in->issued_caps_need_gather(lock)) {
    // Recall client caps that conflict with the EXCL state.
    if (need_issue)
      *need_issue = true;
    else
      issue_caps(in);
    gather++;
  }
  bool need_recover = false;
  if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
    mds->mdcache->queue_file_recover(in);
    need_recover = true;
    gather++;
  }

  if (gather) {
    // Wait for acks/releases: pin the inode, and start recovery if queued.
    lock->get_parent()->auth_pin(lock);
    if (need_recover)
      mds->mdcache->do_file_recover();
  } else {
    // Nothing outstanding; complete the transition and issue caps now.
    lock->set_state(LOCK_EXCL);
    if (need_issue)
      *need_issue = true;
    else
      issue_caps(in);
  }
}
5031
/**
 * Move a file lock from EXCL to XSYN.
 *
 * Only valid from LOCK_EXCL (any other state aborts), on an auth inode
 * with a chosen loner and no other MDSs wanting caps (asserted).
 * Wrlocks and conflicting client caps are gathered first; if nothing is
 * outstanding the transition completes immediately, waking WAIT_RD /
 * WAIT_STABLE waiters and issuing caps.
 *
 * @param lock        the lock to transition (EXCL -> XSYN)
 * @param need_issue  if non-null, set to true instead of calling
 *                    issue_caps(in) directly (caller batches the issue)
 */
void Locker::file_xsyn(SimpleLock *lock, bool *need_issue)
{
  dout(7) << "file_xsyn on " << *lock << " on " << *lock->get_parent() << dendl;
  CInode *in = static_cast<CInode *>(lock->get_parent());
  assert(in->is_auth());
  assert(in->get_loner() >= 0 && in->mds_caps_wanted.empty());

  switch (lock->get_state()) {
  case LOCK_EXCL: lock->set_state(LOCK_EXCL_XSYN); break;
  default: ceph_abort();
  }

  int gather = 0;
  if (lock->is_wrlocked())
    gather++;

  if (in->is_head() &&
      in->issued_caps_need_gather(lock)) {
    // Recall client caps that conflict with XSYN before completing.
    if (need_issue)
      *need_issue = true;
    else
      issue_caps(in);
    gather++;
  }

  if (gather) {
    // Outstanding work: pin the inode until eval_gather() finishes it.
    lock->get_parent()->auth_pin(lock);
  } else {
    lock->set_state(LOCK_XSYN);
    lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
    if (need_issue)
      *need_issue = true;
    else
      issue_caps(in);
  }
}
5068
/**
 * Advance a file lock from LOCK_PRE_SCAN to LOCK_SCAN as part of file
 * recovery.  Only called from MDCache::start_files_to_recover() (per the
 * assert below).
 *
 * If clients hold caps that must be gathered first, the inode is marked
 * STATE_NEEDSRECOVER so recovery is retried later; otherwise the inode is
 * queued for file recovery immediately.
 *
 * @param lock  the file scatter lock, currently in LOCK_PRE_SCAN
 */
void Locker::file_recover(ScatterLock *lock)
{
  CInode *in = static_cast<CInode *>(lock->get_parent());
  dout(7) << "file_recover " << *lock << " on " << *in << dendl;

  assert(in->is_auth());
  //assert(lock->is_stable());
  assert(lock->get_state() == LOCK_PRE_SCAN); // only called from MDCache::start_files_to_recover()

  int gather = 0;

  /*
  if (in->is_replicated()
      lock->get_sm()->states[oldstate].replica_state != LOCK_LOCK) {
    send_lock_message(lock, LOCK_AC_LOCK);
    lock->init_gather();
    gather++;
  }
  */
  if (in->is_head() &&
      in->issued_caps_need_gather(lock)) {
    // Conflicting client caps must be recalled before scanning.
    issue_caps(in);
    gather++;
  }

  lock->set_state(LOCK_SCAN);
  if (gather)
    // Defer: recovery will be requeued once caps are gathered.
    in->state_set(CInode::STATE_NEEDSRECOVER);
  else
    mds->mdcache->queue_file_recover(in);
}
5100
5101
5102 // messenger
5103 /* This function DOES put the passed message before returning */
// messenger
/* This function DOES put the passed message before returning */
/**
 * Handle an inter-MDS MLock message for a file ScatterLock.
 *
 * Dispatches on m->get_action():
 *  - Replica-side actions (LOCK_AC_SYNC/LOCK/LOCKFLUSHED/MIX): the auth
 *    is instructing this replica to change state; intermediate states are
 *    entered and eval_gather() drives completion.
 *  - Auth-side acks (LOCK_AC_LOCKACK/SYNCACK/MIXACK): a replica has
 *    reached the requested state; remove it from the gather set and
 *    finish via eval_gather() once the set is empty.
 *  - Requests (LOCK_AC_REQSCATTER/REQUNSCATTER/REQRDLOCK/NUDGE): a
 *    replica asks the auth to change or nudge the lock state.
 *
 * While this MDS is rejoining and the inode itself is still rejoining,
 * the message is dropped.  The message is always put() before returning.
 *
 * @param lock  the file scatter lock the message refers to
 * @param m     the MLock message (consumed: m->put() on all paths)
 */
void Locker::handle_file_lock(ScatterLock *lock, MLock *m)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  int from = m->get_asker();

  if (mds->is_rejoin()) {
    if (in->is_rejoining()) {
      dout(7) << "handle_file_lock still rejoining " << *in
              << ", dropping " << *m << dendl;
      m->put();
      return;
    }
  }

  dout(7) << "handle_file_lock a=" << get_lock_action_name(m->get_action())
          << " on " << *lock
          << " from mds." << from << " "
          << *in << dendl;

  // Does this lock govern client capability bits?  If so, state changes
  // below must reissue caps.
  bool caps = lock->get_cap_shift();

  switch (m->get_action()) {
    // -- replica --
  case LOCK_AC_SYNC:
    assert(lock->get_state() == LOCK_LOCK ||
           lock->get_state() == LOCK_MIX ||
           lock->get_state() == LOCK_MIX_SYNC2);

    if (lock->get_state() == LOCK_MIX) {
      // Must flush our scattered dirty state before going SYNC.
      lock->set_state(LOCK_MIX_SYNC);
      eval_gather(lock, true);
      if (lock->is_unstable_and_locked())
        mds->mdlog->flush();
      break;
    }

    (static_cast<ScatterLock *>(lock))->finish_flush();
    (static_cast<ScatterLock *>(lock))->clear_flushed();

    // ok
    lock->decode_locked_state(m->get_data());
    lock->set_state(LOCK_SYNC);

    // Hold a rdlock across issue_caps/finish_waiters so the state can't
    // shift underneath them.
    lock->get_rdlock();
    if (caps)
      issue_caps(in);
    lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
    lock->put_rdlock();
    break;

  case LOCK_AC_LOCK:
    switch (lock->get_state()) {
    case LOCK_SYNC: lock->set_state(LOCK_SYNC_LOCK); break;
    case LOCK_MIX: lock->set_state(LOCK_MIX_LOCK); break;
    default: ceph_abort();
    }

    eval_gather(lock, true);
    if (lock->is_unstable_and_locked())
      mds->mdlog->flush();

    break;

  case LOCK_AC_LOCKFLUSHED:
    (static_cast<ScatterLock *>(lock))->finish_flush();
    (static_cast<ScatterLock *>(lock))->clear_flushed();
    // wake up scatter_nudge waiters
    if (lock->is_stable())
      lock->finish_waiters(SimpleLock::WAIT_STABLE);
    break;

  case LOCK_AC_MIX:
    assert(lock->get_state() == LOCK_SYNC ||
           lock->get_state() == LOCK_LOCK ||
           lock->get_state() == LOCK_SYNC_MIX2);

    if (lock->get_state() == LOCK_SYNC) {
      // MIXED
      lock->set_state(LOCK_SYNC_MIX);
      eval_gather(lock, true);
      if (lock->is_unstable_and_locked())
        mds->mdlog->flush();
      break;
    }

    // ok
    lock->set_state(LOCK_MIX);
    lock->decode_locked_state(m->get_data());

    if (caps)
      issue_caps(in);

    lock->finish_waiters(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE);
    break;


    // -- auth --
  case LOCK_AC_LOCKACK:
    assert(lock->get_state() == LOCK_SYNC_LOCK ||
           lock->get_state() == LOCK_MIX_LOCK ||
           lock->get_state() == LOCK_MIX_LOCK2 ||
           lock->get_state() == LOCK_MIX_EXCL ||
           lock->get_state() == LOCK_SYNC_EXCL ||
           lock->get_state() == LOCK_SYNC_MIX ||
           lock->get_state() == LOCK_MIX_TSYN);
    assert(lock->is_gathering(from));
    lock->remove_gather(from);

    if (lock->get_state() == LOCK_MIX_LOCK ||
        lock->get_state() == LOCK_MIX_LOCK2 ||
        lock->get_state() == LOCK_MIX_EXCL ||
        lock->get_state() == LOCK_MIX_TSYN) {
      // Coming from MIX, the ack carries the replica's scattered data.
      lock->decode_locked_state(m->get_data());
      // replica is waiting for AC_LOCKFLUSHED, eval_gather() should not
      // delay calling scatter_writebehind().
      lock->clear_flushed();
    }

    if (lock->is_gathering()) {
      dout(7) << "handle_file_lock " << *in << " from " << from
              << ", still gathering " << lock->get_gather_set() << dendl;
    } else {
      dout(7) << "handle_file_lock " << *in << " from " << from
              << ", last one" << dendl;
      eval_gather(lock);
    }
    break;

  case LOCK_AC_SYNCACK:
    assert(lock->get_state() == LOCK_MIX_SYNC);
    assert(lock->is_gathering(from));
    lock->remove_gather(from);

    lock->decode_locked_state(m->get_data());

    if (lock->is_gathering()) {
      dout(7) << "handle_file_lock " << *in << " from " << from
              << ", still gathering " << lock->get_gather_set() << dendl;
    } else {
      dout(7) << "handle_file_lock " << *in << " from " << from
              << ", last one" << dendl;
      eval_gather(lock);
    }
    break;

  case LOCK_AC_MIXACK:
    assert(lock->get_state() == LOCK_SYNC_MIX);
    assert(lock->is_gathering(from));
    lock->remove_gather(from);

    if (lock->is_gathering()) {
      dout(7) << "handle_file_lock " << *in << " from " << from
              << ", still gathering " << lock->get_gather_set() << dendl;
    } else {
      dout(7) << "handle_file_lock " << *in << " from " << from
              << ", last one" << dendl;
      eval_gather(lock);
    }
    break;


    // requests....
  case LOCK_AC_REQSCATTER:
    if (lock->is_stable()) {
      /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
       * because the replica should be holding an auth_pin if they're
       * doing this (and thus, we are freezing, not frozen, and indefinite
       * starvation isn't an issue).
       */
      dout(7) << "handle_file_lock got scatter request on " << *lock
              << " on " << *lock->get_parent() << dendl;
      if (lock->get_state() != LOCK_MIX)  // i.e., the reqscatter didn't race with an actual mix/scatter
        scatter_mix(lock);
    } else {
      // Remember the request; it will be honored when the lock stabilizes.
      dout(7) << "handle_file_lock got scatter request, !stable, marking scatter_wanted on " << *lock
              << " on " << *lock->get_parent() << dendl;
      lock->set_scatter_wanted();
    }
    break;

  case LOCK_AC_REQUNSCATTER:
    if (lock->is_stable()) {
      /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
       * because the replica should be holding an auth_pin if they're
       * doing this (and thus, we are freezing, not frozen, and indefinite
       * starvation isn't an issue).
       */
      dout(7) << "handle_file_lock got unscatter request on " << *lock
              << " on " << *lock->get_parent() << dendl;
      if (lock->get_state() == LOCK_MIX)  // i.e., the reqscatter didn't race with an actual mix/scatter
        simple_lock(lock);  // FIXME tempsync?
    } else {
      // Remember the request; it will be honored when the lock stabilizes.
      dout(7) << "handle_file_lock ignoring unscatter request on " << *lock
              << " on " << *lock->get_parent() << dendl;
      lock->set_unscatter_wanted();
    }
    break;

  case LOCK_AC_REQRDLOCK:
    handle_reqrdlock(lock, m);
    break;

  case LOCK_AC_NUDGE:
    if (!lock->get_parent()->is_auth()) {
      dout(7) << "handle_file_lock IGNORING nudge on non-auth " << *lock
              << " on " << *lock->get_parent() << dendl;
    } else if (!lock->get_parent()->is_replicated()) {
      dout(7) << "handle_file_lock IGNORING nudge on non-replicated " << *lock
              << " on " << *lock->get_parent() << dendl;
    } else {
      dout(7) << "handle_file_lock trying nudge on " << *lock
              << " on " << *lock->get_parent() << dendl;
      scatter_nudge(lock, 0, true);
      mds->mdlog->flush();
    }
    break;

  default:
    ceph_abort();
  }

  // Consume the message (see contract above).
  m->put();
}
5327
5328
5329
5330
5331
5332