]> git.proxmox.com Git - ceph.git/blob - ceph/src/mds/Locker.cc
update source to 12.2.11
[ceph.git] / ceph / src / mds / Locker.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include <boost/utility/string_view.hpp>
16
17 #include "MDSRank.h"
18 #include "MDCache.h"
19 #include "Locker.h"
20 #include "MDBalancer.h"
21 #include "Migrator.h"
22 #include "CInode.h"
23 #include "CDir.h"
24 #include "CDentry.h"
25 #include "Mutation.h"
26 #include "MDSContext.h"
27
28 #include "MDLog.h"
29 #include "MDSMap.h"
30
31 #include "events/EUpdate.h"
32 #include "events/EOpen.h"
33
34 #include "msg/Messenger.h"
35 #include "osdc/Objecter.h"
36
37 #include "messages/MInodeFileCaps.h"
38 #include "messages/MLock.h"
39 #include "messages/MClientLease.h"
40 #include "messages/MClientReply.h"
41 #include "messages/MClientCaps.h"
42 #include "messages/MClientCapRelease.h"
43
44 #include "messages/MMDSSlaveRequest.h"
45
46 #include <errno.h>
47
48 #include "common/config.h"
49
50
51 #define dout_subsys ceph_subsys_mds
52 #undef dout_prefix
53 #define dout_context g_ceph_context
54 #define dout_prefix _prefix(_dout, mds)
55 static ostream& _prefix(std::ostream *_dout, MDSRank *mds) {
56 return *_dout << "mds." << mds->get_nodeid() << ".locker ";
57 }
58
59
60 class LockerContext : public MDSInternalContextBase {
61 protected:
62 Locker *locker;
63 MDSRank *get_mds() override
64 {
65 return locker->mds;
66 }
67
68 public:
69 explicit LockerContext(Locker *locker_) : locker(locker_) {
70 assert(locker != NULL);
71 }
72 };
73
74 class LockerLogContext : public MDSLogContextBase {
75 protected:
76 Locker *locker;
77 MDSRank *get_mds() override
78 {
79 return locker->mds;
80 }
81
82 public:
83 explicit LockerLogContext(Locker *locker_) : locker(locker_) {
84 assert(locker != NULL);
85 }
86 };
87
88 /* This function DOES put the passed message before returning */
89 void Locker::dispatch(Message *m)
90 {
91
92 switch (m->get_type()) {
93
94 // inter-mds locking
95 case MSG_MDS_LOCK:
96 handle_lock(static_cast<MLock*>(m));
97 break;
98 // inter-mds caps
99 case MSG_MDS_INODEFILECAPS:
100 handle_inode_file_caps(static_cast<MInodeFileCaps*>(m));
101 break;
102
103 // client sync
104 case CEPH_MSG_CLIENT_CAPS:
105 handle_client_caps(static_cast<MClientCaps*>(m));
106
107 break;
108 case CEPH_MSG_CLIENT_CAPRELEASE:
109 handle_client_cap_release(static_cast<MClientCapRelease*>(m));
110 break;
111 case CEPH_MSG_CLIENT_LEASE:
112 handle_client_lease(static_cast<MClientLease*>(m));
113 break;
114
115 default:
116 derr << "locker unknown message " << m->get_type() << dendl;
117 assert(0 == "locker unknown message");
118 }
119 }
120
// Periodic maintenance entry point, driven by the MDS tick timer.
void Locker::tick()
{
  scatter_tick();  // scatter-lock housekeeping (see scatter_tick)
  caps_tick();     // per-client capability housekeeping (see caps_tick)
}
126
127 /*
128 * locks vs rejoin
129 *
130 *
131 *
132 */
133
134 void Locker::send_lock_message(SimpleLock *lock, int msg)
135 {
136 for (const auto &it : lock->get_parent()->get_replicas()) {
137 if (mds->is_cluster_degraded() &&
138 mds->mdsmap->get_state(it.first) < MDSMap::STATE_REJOIN)
139 continue;
140 MLock *m = new MLock(lock, msg, mds->get_nodeid());
141 mds->send_message_mds(m, it.first);
142 }
143 }
144
145 void Locker::send_lock_message(SimpleLock *lock, int msg, const bufferlist &data)
146 {
147 for (const auto &it : lock->get_parent()->get_replicas()) {
148 if (mds->is_cluster_degraded() &&
149 mds->mdsmap->get_state(it.first) < MDSMap::STATE_REJOIN)
150 continue;
151 MLock *m = new MLock(lock, msg, mds->get_nodeid());
152 m->set_data(data);
153 mds->send_message_mds(m, it.first);
154 }
155 }
156
157
158
159
160 void Locker::include_snap_rdlocks(set<SimpleLock*>& rdlocks, CInode *in)
161 {
162 // rdlock ancestor snaps
163 CInode *t = in;
164 rdlocks.insert(&in->snaplock);
165 while (t->get_projected_parent_dn()) {
166 t = t->get_projected_parent_dn()->get_dir()->get_inode();
167 rdlocks.insert(&t->snaplock);
168 }
169 }
170
171 void Locker::include_snap_rdlocks_wlayout(set<SimpleLock*>& rdlocks, CInode *in,
172 file_layout_t **layout)
173 {
174 //rdlock ancestor snaps
175 CInode *t = in;
176 rdlocks.insert(&in->snaplock);
177 rdlocks.insert(&in->policylock);
178 bool found_layout = false;
179 while (t) {
180 rdlocks.insert(&t->snaplock);
181 if (!found_layout) {
182 rdlocks.insert(&t->policylock);
183 if (t->get_projected_inode()->has_layout()) {
184 *layout = &t->get_projected_inode()->layout;
185 found_layout = true;
186 }
187 }
188 if (t->get_projected_parent_dn() &&
189 t->get_projected_parent_dn()->get_dir())
190 t = t->get_projected_parent_dn()->get_dir()->get_inode();
191 else t = NULL;
192 }
193 }
194
// RAII helper: unless disarmed (mark_event = false), records `message`
// on the request's event trail when the scope exits.  acquire_locks()
// uses it so every early-return path automatically logs why it blocked;
// the success path just overwrites `message` before returning.
struct MarkEventOnDestruct {
  MDRequestRef& mdr;
  const char* message;   // text recorded at destruction
  bool mark_event;       // set to false to disarm
  MarkEventOnDestruct(MDRequestRef& _mdr,
                      const char *_message) : mdr(_mdr),
                                              message(_message),
                                              mark_event(true) {}
  ~MarkEventOnDestruct() {
    if (mark_event)
      mdr->mark_event(message);
  }
};
208
/* If this function returns false, the mdr has been placed
 * on the appropriate wait list */
/*
 * Acquire the requested rd/wr/x/remote-wr locks for a request, in four
 * phases:
 *   1. collect every lock into a globally sorted set and every parent
 *      object into `mustpin` (versionlocks are implicitly added for
 *      dentry/inode xlocks);
 *   2. check that every object can be auth-pinned, bailing out (and
 *      queueing a retry waiter) if not;
 *   3. take local auth pins and request remote ones (returning false
 *      until the slaves ack);
 *   4. walk the sorted set in lock order, reusing locks the request
 *      already holds, dropping held locks that are out of order or no
 *      longer wanted, and acquiring the rest; any block jumps to `out`.
 */
bool Locker::acquire_locks(MDRequestRef& mdr,
			   set<SimpleLock*> &rdlocks,
			   set<SimpleLock*> &wrlocks,
			   set<SimpleLock*> &xlocks,
			   map<SimpleLock*,mds_rank_t> *remote_wrlocks,
			   CInode *auth_pin_freeze,
			   bool auth_pin_nonblock)
{
  if (mdr->done_locking &&
      !mdr->is_slave()) { // not on slaves!  master requests locks piecemeal.
    dout(10) << "acquire_locks " << *mdr << " - done locking" << dendl;
    return true;  // at least we had better be!
  }
  dout(10) << "acquire_locks " << *mdr << dendl;

  // logs the current failure reason on every early return below
  MarkEventOnDestruct marker(mdr, "failed to acquire_locks");

  client_t client = mdr->get_client();

  set<SimpleLock*, SimpleLock::ptr_lt> sorted;  // sort everything we will lock
  set<MDSCacheObject*> mustpin;                 // items to authpin

  // xlocks
  for (set<SimpleLock*>::iterator p = xlocks.begin(); p != xlocks.end(); ++p) {
    SimpleLock *lock = *p;

    if ((lock->get_type() == CEPH_LOCK_ISNAP ||
	 lock->get_type() == CEPH_LOCK_IPOLICY) &&
	mds->is_cluster_degraded() &&
	mdr->is_master() &&
	!mdr->is_queued_for_replay()) {
      // waiting for recovering mds, to guarantee replayed requests and mksnap/setlayout
      // get processed in proper order.
      bool wait = false;
      if (lock->get_parent()->is_auth()) {
	if (!mdr->locks.count(lock)) {
	  set<mds_rank_t> ls;
	  lock->get_parent()->list_replicas(ls);
	  for (auto m : ls) {
	    if (mds->mdsmap->get_state(m) < MDSMap::STATE_ACTIVE) {
	      wait = true;
	      break;
	    }
	  }
	}
      } else {
	// if the lock is the latest locked one, it's possible that slave mds got the lock
	// while there are recovering mds.
	if (!mdr->locks.count(lock) || lock == *mdr->locks.rbegin())
	  wait = true;
      }
      if (wait) {
	dout(10) << " must xlock " << *lock << " " << *lock->get_parent()
		 << ", waiting for cluster recovered" << dendl;
	mds->locker->drop_locks(mdr.get(), NULL);
	mdr->drop_local_auth_pins();
	mds->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache, mdr));
	return false;
      }
    }

    dout(20) << " must xlock " << *lock << " " << *lock->get_parent() << dendl;

    sorted.insert(lock);
    mustpin.insert(lock->get_parent());

    // augment xlock with a versionlock?
    if ((*p)->get_type() == CEPH_LOCK_DN) {
      CDentry *dn = (CDentry*)lock->get_parent();
      if (!dn->is_auth())
	continue;

      if (xlocks.count(&dn->versionlock))
	continue;  // we're xlocking the versionlock too; don't wrlock it!

      if (mdr->is_master()) {
	// master.  wrlock versionlock so we can pipeline dentry updates to journal.
	wrlocks.insert(&dn->versionlock);
      } else {
	// slave.  exclusively lock the dentry version (i.e. block other journal updates).
	// this makes rollback safe.
	xlocks.insert(&dn->versionlock);
	sorted.insert(&dn->versionlock);
      }
    }
    if (lock->get_type() > CEPH_LOCK_IVERSION) {
      // inode version lock?
      CInode *in = (CInode*)lock->get_parent();
      if (!in->is_auth())
	continue;
      if (mdr->is_master()) {
	// master.  wrlock versionlock so we can pipeline inode updates to journal.
	wrlocks.insert(&in->versionlock);
      } else {
	// slave.  exclusively lock the inode version (i.e. block other journal updates).
	// this makes rollback safe.
	xlocks.insert(&in->versionlock);
	sorted.insert(&in->versionlock);
      }
    }
  }

  // wrlocks
  for (set<SimpleLock*>::iterator p = wrlocks.begin(); p != wrlocks.end(); ++p) {
    MDSCacheObject *object = (*p)->get_parent();
    dout(20) << " must wrlock " << **p << " " << *object << dendl;
    sorted.insert(*p);
    if (object->is_auth())
      mustpin.insert(object);
    else if (!object->is_auth() &&
	     !(*p)->can_wrlock(client) &&  // we might have to request a scatter
	     !mdr->is_slave()) {           // if we are slave (remote_wrlock), the master already authpinned
      dout(15) << " will also auth_pin " << *object
	       << " in case we need to request a scatter" << dendl;
      mustpin.insert(object);
    }
  }

  // remote_wrlocks
  if (remote_wrlocks) {
    for (map<SimpleLock*,mds_rank_t>::iterator p = remote_wrlocks->begin(); p != remote_wrlocks->end(); ++p) {
      MDSCacheObject *object = p->first->get_parent();
      dout(20) << " must remote_wrlock on mds." << p->second << " "
	       << *p->first << " " << *object << dendl;
      sorted.insert(p->first);
      mustpin.insert(object);
    }
  }

  // rdlocks
  for (set<SimpleLock*>::iterator p = rdlocks.begin();
       p != rdlocks.end();
       ++p) {
    MDSCacheObject *object = (*p)->get_parent();
    dout(20) << " must rdlock " << **p << " " << *object << dendl;
    sorted.insert(*p);
    if (object->is_auth())
      mustpin.insert(object);
    else if (!object->is_auth() &&
	     !(*p)->can_rdlock(client)) {  // we might have to request an rdlock
      dout(15) << " will also auth_pin " << *object
	       << " in case we need to request a rdlock" << dendl;
      mustpin.insert(object);
    }
  }


  // AUTH PINS
  map<mds_rank_t, set<MDSCacheObject*> > mustpin_remote;  // mds -> (object set)

  // can i auth pin them all now?
  marker.message = "failed to authpin local pins";
  for (set<MDSCacheObject*>::iterator p = mustpin.begin();
       p != mustpin.end();
       ++p) {
    MDSCacheObject *object = *p;

    dout(10) << " must authpin " << *object << dendl;

    if (mdr->is_auth_pinned(object)) {
      if (object != (MDSCacheObject*)auth_pin_freeze)
	continue;
      if (mdr->more()->is_remote_frozen_authpin) {
	if (mdr->more()->rename_inode == auth_pin_freeze)
	  continue;
	// unfreeze auth pin for the wrong inode
	// NOTE: operator[] is used only to create an (empty) entry for the
	// remote rank, which forces an OP_AUTHPIN round-trip below; the
	// size() result is deliberately discarded.
	mustpin_remote[mdr->more()->rename_inode->authority().first].size();
      }
    }

    if (!object->is_auth()) {
      if (!mdr->locks.empty())
	drop_locks(mdr.get());
      if (object->is_ambiguous_auth()) {
	// wait
	marker.message = "waiting for single auth, object is being migrated";
	dout(10) << " ambiguous auth, waiting to authpin " << *object << dendl;
	object->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_MDS_RetryRequest(mdcache, mdr));
	mdr->drop_local_auth_pins();
	return false;
      }
      mustpin_remote[object->authority().first].insert(object);
      continue;
    }
    int err = 0;
    if (!object->can_auth_pin(&err)) {
      // wait
      drop_locks(mdr.get());
      mdr->drop_local_auth_pins();
      if (auth_pin_nonblock) {
	dout(10) << " can't auth_pin (freezing?) " << *object << ", nonblocking" << dendl;
	mdr->aborted = true;
	return false;
      }
      if (err == MDSCacheObject::ERR_EXPORTING_TREE) {
	marker.message = "failed to authpin, subtree is being exported";
      } else if (err == MDSCacheObject::ERR_FRAGMENTING_DIR) {
	marker.message = "failed to authpin, dir is being fragmented";
      } else if (err == MDSCacheObject::ERR_EXPORTING_INODE) {
	marker.message = "failed to authpin, inode is being exported";
      }
      dout(10) << " can't auth_pin (freezing?), waiting to authpin " << *object << dendl;
      object->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_MDS_RetryRequest(mdcache, mdr));

      // tell the freezer someone is blocked on it, so progress is visible
      if (!mdr->remote_auth_pins.empty())
	notify_freeze_waiter(object);

      return false;
    }
  }

  // ok, grab local auth pins
  for (set<MDSCacheObject*>::iterator p = mustpin.begin();
       p != mustpin.end();
       ++p) {
    MDSCacheObject *object = *p;
    if (mdr->is_auth_pinned(object)) {
      dout(10) << " already auth_pinned " << *object << dendl;
    } else if (object->is_auth()) {
      dout(10) << " auth_pinning " << *object << dendl;
      mdr->auth_pin(object);
    }
  }

  // request remote auth_pins
  if (!mustpin_remote.empty()) {
    marker.message = "requesting remote authpins";
    // fold already-held remote pins into the per-rank request sets so
    // the slave re-acknowledges them together with the new ones
    for (map<MDSCacheObject*,mds_rank_t>::iterator p = mdr->remote_auth_pins.begin();
	 p != mdr->remote_auth_pins.end();
	 ++p) {
      if (mustpin.count(p->first)) {
	assert(p->second == p->first->authority().first);
	map<mds_rank_t, set<MDSCacheObject*> >::iterator q = mustpin_remote.find(p->second);
	if (q != mustpin_remote.end())
	  q->second.insert(p->first);
      }
    }
    for (map<mds_rank_t, set<MDSCacheObject*> >::iterator p = mustpin_remote.begin();
	 p != mustpin_remote.end();
	 ++p) {
      dout(10) << "requesting remote auth_pins from mds." << p->first << dendl;

      // wait for active auth
      if (mds->is_cluster_degraded() &&
	  !mds->mdsmap->is_clientreplay_or_active_or_stopping(p->first)) {
	dout(10) << " mds." << p->first << " is not active" << dendl;
	if (mdr->more()->waiting_on_slave.empty())
	  mds->wait_for_active_peer(p->first, new C_MDS_RetryRequest(mdcache, mdr));
	return false;
      }

      MMDSSlaveRequest *req = new MMDSSlaveRequest(mdr->reqid, mdr->attempt,
						   MMDSSlaveRequest::OP_AUTHPIN);
      for (set<MDSCacheObject*>::iterator q = p->second.begin();
	   q != p->second.end();
	   ++q) {
	dout(10) << " req remote auth_pin of " << **q << dendl;
	MDSCacheObjectInfo info;
	(*q)->set_object_info(info);
	req->get_authpins().push_back(info);
	if (*q == auth_pin_freeze)
	  (*q)->set_object_info(req->get_authpin_freeze());
	mdr->pin(*q);
      }
      if (auth_pin_nonblock)
	req->mark_nonblock();
      mds->send_message_mds(req, p->first);

      // put in waiting list
      assert(mdr->more()->waiting_on_slave.count(p->first) == 0);
      mdr->more()->waiting_on_slave.insert(p->first);
    }
    return false;
  }

  // caps i'll need to issue
  set<CInode*> issue_set;
  bool result = false;

  // acquire locks.
  // make sure they match currently acquired locks.
  // `existing` walks mdr->locks in the same ptr_lt order as `sorted`,
  // so the two sequences can be merged in a single pass.
  set<SimpleLock*, SimpleLock::ptr_lt>::iterator existing = mdr->locks.begin();
  for (set<SimpleLock*, SimpleLock::ptr_lt>::iterator p = sorted.begin();
       p != sorted.end();
       ++p) {
    bool need_wrlock = !!wrlocks.count(*p);
    bool need_remote_wrlock = !!(remote_wrlocks && remote_wrlocks->count(*p));

    // already locked?
    if (existing != mdr->locks.end() && *existing == *p) {
      // right kind?
      SimpleLock *have = *existing;
      ++existing;
      if (xlocks.count(have) && mdr->xlocks.count(have)) {
	dout(10) << " already xlocked " << *have << " " << *have->get_parent() << dendl;
	continue;
      }
      if (mdr->remote_wrlocks.count(have)) {
	if (!need_remote_wrlock ||
	    mdr->remote_wrlocks[have] != (*remote_wrlocks)[have]) {
	  dout(10) << " unlocking remote_wrlock on wrong mds." << mdr->remote_wrlocks[have]
		   << " " << *have << " " << *have->get_parent() << dendl;
	  remote_wrlock_finish(have, mdr->remote_wrlocks[have], mdr.get());
	}
      }
      if (need_wrlock || need_remote_wrlock) {
	if (need_wrlock == !!mdr->wrlocks.count(have) &&
	    need_remote_wrlock == !!mdr->remote_wrlocks.count(have)) {
	  if (need_wrlock)
	    dout(10) << " already wrlocked " << *have << " " << *have->get_parent() << dendl;
	  if (need_remote_wrlock)
	    dout(10) << " already remote_wrlocked " << *have << " " << *have->get_parent() << dendl;
	  continue;
	}
      }
      if (rdlocks.count(have) && mdr->rdlocks.count(have)) {
	dout(10) << " already rdlocked " << *have << " " << *have->get_parent() << dendl;
	continue;
      }
    }

    // hose any stray locks
    if (existing != mdr->locks.end() && *existing == *p) {
      assert(need_wrlock || need_remote_wrlock);
      SimpleLock *lock = *existing;
      if (mdr->wrlocks.count(lock)) {
	if (!need_wrlock)
	  dout(10) << " unlocking extra " << *lock << " " << *lock->get_parent() << dendl;
	else if (need_remote_wrlock) // acquire remote_wrlock first
	  dout(10) << " unlocking out-of-order " << *lock << " " << *lock->get_parent() << dendl;
	bool need_issue = false;
	wrlock_finish(lock, mdr.get(), &need_issue);
	if (need_issue)
	  issue_set.insert(static_cast<CInode*>(lock->get_parent()));
      }
      ++existing;
    }
    // anything still held beyond this point sorts after *p: it must be
    // dropped so that (re)acquisition proceeds strictly in lock order
    while (existing != mdr->locks.end()) {
      SimpleLock *stray = *existing;
      ++existing;
      dout(10) << " unlocking out-of-order " << *stray << " " << *stray->get_parent() << dendl;
      bool need_issue = false;
      if (mdr->xlocks.count(stray)) {
	xlock_finish(stray, mdr.get(), &need_issue);
      } else if (mdr->rdlocks.count(stray)) {
	rdlock_finish(stray, mdr.get(), &need_issue);
      } else {
	// may have acquired both wrlock and remore wrlock
	if (mdr->wrlocks.count(stray))
	  wrlock_finish(stray, mdr.get(), &need_issue);
	if (mdr->remote_wrlocks.count(stray))
	  remote_wrlock_finish(stray, mdr->remote_wrlocks[stray], mdr.get());
      }
      if (need_issue)
	issue_set.insert(static_cast<CInode*>(stray->get_parent()));
    }

    // lock
    if (mdr->locking && *p != mdr->locking) {
      cancel_locking(mdr.get(), &issue_set);
    }
    if (xlocks.count(*p)) {
      marker.message = "failed to xlock, waiting";
      if (!xlock_start(*p, mdr))
	goto out;
      dout(10) << " got xlock on " << **p << " " << *(*p)->get_parent() << dendl;
    } else if (need_wrlock || need_remote_wrlock) {
      if (need_remote_wrlock && !mdr->remote_wrlocks.count(*p)) {
	marker.message = "waiting for remote wrlocks";
	remote_wrlock_start(*p, (*remote_wrlocks)[*p], mdr);
	goto out;
      }
      if (need_wrlock && !mdr->wrlocks.count(*p)) {
	marker.message = "failed to wrlock, waiting";
	if (need_remote_wrlock && !(*p)->can_wrlock(mdr->get_client())) {
	  marker.message = "failed to wrlock, dropping remote wrlock and waiting";
	  // can't take the wrlock because the scatter lock is gathering. need to
	  // release the remote wrlock, so that the gathering process can finish.
	  remote_wrlock_finish(*p, mdr->remote_wrlocks[*p], mdr.get());
	  remote_wrlock_start(*p, (*remote_wrlocks)[*p], mdr);
	  goto out;
	}
	// nowait if we have already gotten remote wrlock
	if (!wrlock_start(*p, mdr, need_remote_wrlock))
	  goto out;
	dout(10) << " got wrlock on " << **p << " " << *(*p)->get_parent() << dendl;
      }
    } else {
      assert(mdr->is_master());
      if ((*p)->needs_recover()) {
	if (mds->is_cluster_degraded()) {
	  if (!mdr->is_queued_for_replay()) {
	    // see comments in SimpleLock::set_state_rejoin() and
	    // ScatterLock::encode_state_for_rejoin()
	    drop_locks(mdr.get());
	    mds->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache, mdr));
	    dout(10) << " rejoin recovering " << **p << " " << *(*p)->get_parent()
		     << ", waiting for cluster recovered" << dendl;
	    marker.message = "rejoin recovering lock, waiting for cluster recovered";
	    return false;
	  }
	} else {
	  (*p)->clear_need_recover();
	}
      }

      marker.message = "failed to rdlock, waiting";
      if (!rdlock_start(*p, mdr))
	goto out;
      dout(10) << " got rdlock on " << **p << " " << *(*p)->get_parent() << dendl;
    }
  }

  // any extra unneeded locks?
  while (existing != mdr->locks.end()) {
    SimpleLock *stray = *existing;
    ++existing;
    dout(10) << " unlocking extra " << *stray << " " << *stray->get_parent() << dendl;
    bool need_issue = false;
    if (mdr->xlocks.count(stray)) {
      xlock_finish(stray, mdr.get(), &need_issue);
    } else if (mdr->rdlocks.count(stray)) {
      rdlock_finish(stray, mdr.get(), &need_issue);
    } else {
      // may have acquired both wrlock and remore wrlock
      if (mdr->wrlocks.count(stray))
	wrlock_finish(stray, mdr.get(), &need_issue);
      if (mdr->remote_wrlocks.count(stray))
	remote_wrlock_finish(stray, mdr->remote_wrlocks[stray], mdr.get());
    }
    if (need_issue)
      issue_set.insert(static_cast<CInode*>(stray->get_parent()));
  }

  mdr->done_locking = true;
  mdr->set_mds_stamp(ceph_clock_now());
  result = true;
  marker.message = "acquired locks";

 out:
  // issue any caps made issuable by locks dropped along the way,
  // whether we succeeded or blocked
  issue_caps_set(issue_set);
  return result;
}
654
655 void Locker::notify_freeze_waiter(MDSCacheObject *o)
656 {
657 CDir *dir = NULL;
658 if (CInode *in = dynamic_cast<CInode*>(o)) {
659 if (!in->is_root())
660 dir = in->get_parent_dir();
661 } else if (CDentry *dn = dynamic_cast<CDentry*>(o)) {
662 dir = dn->get_dir();
663 } else {
664 dir = dynamic_cast<CDir*>(o);
665 assert(dir);
666 }
667 if (dir) {
668 if (dir->is_freezing_dir())
669 mdcache->fragment_freeze_inc_num_waiters(dir);
670 if (dir->is_freezing_tree()) {
671 while (!dir->is_freezing_tree_root())
672 dir = dir->get_parent_dir();
673 mdcache->migrator->export_freeze_inc_num_waiters(dir);
674 }
675 }
676 }
677
678 void Locker::set_xlocks_done(MutationImpl *mut, bool skip_dentry)
679 {
680 for (set<SimpleLock*>::iterator p = mut->xlocks.begin();
681 p != mut->xlocks.end();
682 ++p) {
683 MDSCacheObject *object = (*p)->get_parent();
684 assert(object->is_auth());
685 if (skip_dentry &&
686 ((*p)->get_type() == CEPH_LOCK_DN || (*p)->get_type() == CEPH_LOCK_DVERSION))
687 continue;
688 dout(10) << "set_xlocks_done on " << **p << " " << *object << dendl;
689 (*p)->set_xlock_done();
690 }
691 }
692
693 void Locker::_drop_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
694 {
695 while (!mut->rdlocks.empty()) {
696 bool ni = false;
697 MDSCacheObject *p = (*mut->rdlocks.begin())->get_parent();
698 rdlock_finish(*mut->rdlocks.begin(), mut, &ni);
699 if (ni)
700 pneed_issue->insert(static_cast<CInode*>(p));
701 }
702 }
703
704 void Locker::_drop_non_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
705 {
706 set<mds_rank_t> slaves;
707
708 while (!mut->xlocks.empty()) {
709 SimpleLock *lock = *mut->xlocks.begin();
710 MDSCacheObject *p = lock->get_parent();
711 if (!p->is_auth()) {
712 assert(lock->get_sm()->can_remote_xlock);
713 slaves.insert(p->authority().first);
714 lock->put_xlock();
715 mut->locks.erase(lock);
716 mut->xlocks.erase(lock);
717 continue;
718 }
719 bool ni = false;
720 xlock_finish(lock, mut, &ni);
721 if (ni)
722 pneed_issue->insert(static_cast<CInode*>(p));
723 }
724
725 while (!mut->remote_wrlocks.empty()) {
726 map<SimpleLock*,mds_rank_t>::iterator p = mut->remote_wrlocks.begin();
727 slaves.insert(p->second);
728 if (mut->wrlocks.count(p->first) == 0)
729 mut->locks.erase(p->first);
730 mut->remote_wrlocks.erase(p);
731 }
732
733 while (!mut->wrlocks.empty()) {
734 bool ni = false;
735 MDSCacheObject *p = (*mut->wrlocks.begin())->get_parent();
736 wrlock_finish(*mut->wrlocks.begin(), mut, &ni);
737 if (ni)
738 pneed_issue->insert(static_cast<CInode*>(p));
739 }
740
741 for (set<mds_rank_t>::iterator p = slaves.begin(); p != slaves.end(); ++p) {
742 if (!mds->is_cluster_degraded() ||
743 mds->mdsmap->get_state(*p) >= MDSMap::STATE_REJOIN) {
744 dout(10) << "_drop_non_rdlocks dropping remote locks on mds." << *p << dendl;
745 MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt,
746 MMDSSlaveRequest::OP_DROPLOCKS);
747 mds->send_message_mds(slavereq, *p);
748 }
749 }
750 }
751
752 void Locker::cancel_locking(MutationImpl *mut, set<CInode*> *pneed_issue)
753 {
754 SimpleLock *lock = mut->locking;
755 assert(lock);
756 dout(10) << "cancel_locking " << *lock << " on " << *mut << dendl;
757
758 if (lock->get_parent()->is_auth()) {
759 bool need_issue = false;
760 if (lock->get_state() == LOCK_PREXLOCK) {
761 _finish_xlock(lock, -1, &need_issue);
762 } else if (lock->get_state() == LOCK_LOCK_XLOCK &&
763 lock->get_num_xlocks() == 0) {
764 lock->set_state(LOCK_XLOCKDONE);
765 eval_gather(lock, true, &need_issue);
766 }
767 if (need_issue)
768 pneed_issue->insert(static_cast<CInode *>(lock->get_parent()));
769 }
770 mut->finish_locking(lock);
771 }
772
773 void Locker::drop_locks(MutationImpl *mut, set<CInode*> *pneed_issue)
774 {
775 // leftover locks
776 set<CInode*> my_need_issue;
777 if (!pneed_issue)
778 pneed_issue = &my_need_issue;
779
780 if (mut->locking)
781 cancel_locking(mut, pneed_issue);
782 _drop_non_rdlocks(mut, pneed_issue);
783 _drop_rdlocks(mut, pneed_issue);
784
785 if (pneed_issue == &my_need_issue)
786 issue_caps_set(*pneed_issue);
787 mut->done_locking = false;
788 }
789
790 void Locker::drop_non_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
791 {
792 set<CInode*> my_need_issue;
793 if (!pneed_issue)
794 pneed_issue = &my_need_issue;
795
796 _drop_non_rdlocks(mut, pneed_issue);
797
798 if (pneed_issue == &my_need_issue)
799 issue_caps_set(*pneed_issue);
800 }
801
802 void Locker::drop_rdlocks_for_early_reply(MutationImpl *mut)
803 {
804 set<CInode*> need_issue;
805
806 for (auto p = mut->rdlocks.begin(); p != mut->rdlocks.end(); ) {
807 SimpleLock *lock = *p;
808 ++p;
809 // make later mksnap/setlayout (at other mds) wait for this unsafe request
810 if (lock->get_type() == CEPH_LOCK_ISNAP ||
811 lock->get_type() == CEPH_LOCK_IPOLICY)
812 continue;
813 bool ni = false;
814 rdlock_finish(lock, mut, &ni);
815 if (ni)
816 need_issue.insert(static_cast<CInode*>(lock->get_parent()));
817 }
818
819 issue_caps_set(need_issue);
820 }
821
822 // generics
823
/*
 * Advance an unstable lock toward its target state once its gather
 * conditions are satisfied (no conflicting rd/wr/x holders or leases,
 * no conflicting client caps still issued, no pending scatter flush).
 * Replicas notify the auth with the appropriate *ACK message; the auth
 * may first widen the gather to replicas or kick off a scatter
 * writeback.  On completion the lock moves to its next state, waiters
 * are run (or handed to *pfinishers), and caps are (re)issued as
 * needed.
 */
void Locker::eval_gather(SimpleLock *lock, bool first, bool *pneed_issue, list<MDSInternalContextBase*> *pfinishers)
{
  dout(10) << "eval_gather " << *lock << " on " << *lock->get_parent() << dendl;
  assert(!lock->is_stable());

  int next = lock->get_next_state();

  CInode *in = 0;
  bool caps = lock->get_cap_shift();
  if (lock->get_type() != CEPH_LOCK_DN)
    in = static_cast<CInode *>(lock->get_parent());

  bool need_issue = false;

  int loner_issued = 0, other_issued = 0, xlocker_issued = 0;
  assert(!caps || in != NULL);
  if (caps && in->is_head()) {
    // compare what each class of client (loner/xlocker/other) currently
    // holds against what the next state would allow
    in->get_caps_issued(&loner_issued, &other_issued, &xlocker_issued,
			lock->get_cap_shift(), lock->get_cap_mask());
    dout(10) << " next state is " << lock->get_state_name(next)
	     << " issued/allows loner " << gcap_string(loner_issued)
	     << "/" << gcap_string(lock->gcaps_allowed(CAP_LONER, next))
	     << " xlocker " << gcap_string(xlocker_issued)
	     << "/" << gcap_string(lock->gcaps_allowed(CAP_XLOCKER, next))
	     << " other " << gcap_string(other_issued)
	     << "/" << gcap_string(lock->gcaps_allowed(CAP_ANY, next))
	     << dendl;

    if (first && ((~lock->gcaps_allowed(CAP_ANY, next) & other_issued) ||
		  (~lock->gcaps_allowed(CAP_LONER, next) & loner_issued) ||
		  (~lock->gcaps_allowed(CAP_XLOCKER, next) & xlocker_issued)))
      need_issue = true;  // something issued must be revoked first
  }

#define IS_TRUE_AND_LT_AUTH(x, auth) (x && ((auth && x <= AUTH) || (!auth && x < AUTH)))
  bool auth = lock->get_parent()->is_auth();
  // the gather is complete only if every class of holder is either
  // still permitted by the next state or already gone
  if (!lock->is_gathering() &&
      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_rdlock, auth) || !lock->is_rdlocked()) &&
      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_wrlock, auth) || !lock->is_wrlocked()) &&
      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_xlock, auth) || !lock->is_xlocked()) &&
      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_lease, auth) || !lock->is_leased()) &&
      !(lock->get_parent()->is_auth() && lock->is_flushing()) &&  // i.e. wait for scatter_writebehind!
      (!caps || ((~lock->gcaps_allowed(CAP_ANY, next) & other_issued) == 0 &&
		 (~lock->gcaps_allowed(CAP_LONER, next) & loner_issued) == 0 &&
		 (~lock->gcaps_allowed(CAP_XLOCKER, next) & xlocker_issued) == 0)) &&
      lock->get_state() != LOCK_SYNC_MIX2 &&  // these states need an explicit trigger from the auth mds
      lock->get_state() != LOCK_MIX_SYNC2
      ) {
    dout(7) << "eval_gather finished gather on " << *lock
	    << " on " << *lock->get_parent() << dendl;

    if (lock->get_sm() == &sm_filelock) {
      assert(in);
      // file data must be consistent before the filelock can move on
      if (in->state_test(CInode::STATE_RECOVERING)) {
	dout(7) << "eval_gather finished gather, but still recovering" << dendl;
	return;
      } else if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
	dout(7) << "eval_gather finished gather, but need to recover" << dendl;
	mds->mdcache->queue_file_recover(in);
	mds->mdcache->do_file_recover();
	return;
      }
    }

    if (!lock->get_parent()->is_auth()) {
      // replica: tell auth
      mds_rank_t auth = lock->get_parent()->authority().first;

      if (lock->get_parent()->is_rejoining() &&
	  mds->mdsmap->get_state(auth) == MDSMap::STATE_REJOIN) {
	dout(7) << "eval_gather finished gather, but still rejoining "
		<< *lock->get_parent() << dendl;
	return;
      }

      if (!mds->is_cluster_degraded() ||
	  mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) {
	switch (lock->get_state()) {
	case LOCK_SYNC_LOCK:
	  mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid()),
				auth);
	  break;

	case LOCK_MIX_SYNC:
	  {
	    MLock *reply = new MLock(lock, LOCK_AC_SYNCACK, mds->get_nodeid());
	    lock->encode_locked_state(reply->get_data());
	    mds->send_message_mds(reply, auth);
	    next = LOCK_MIX_SYNC2;
	    (static_cast<ScatterLock *>(lock))->start_flush();
	  }
	  break;

	case LOCK_MIX_SYNC2:
	  (static_cast<ScatterLock *>(lock))->finish_flush();
	  (static_cast<ScatterLock *>(lock))->clear_flushed();
	  // fall-thru: like SYNC_MIX2, the ack already went out above

	case LOCK_SYNC_MIX2:
	  // do nothing, we already acked
	  break;

	case LOCK_SYNC_MIX:
	  {
	    MLock *reply = new MLock(lock, LOCK_AC_MIXACK, mds->get_nodeid());
	    mds->send_message_mds(reply, auth);
	    next = LOCK_SYNC_MIX2;
	  }
	  break;

	case LOCK_MIX_LOCK:
	  {
	    bufferlist data;
	    lock->encode_locked_state(data);
	    mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid(), data), auth);
	    (static_cast<ScatterLock *>(lock))->start_flush();
	    // we'll get an AC_LOCKFLUSHED to complete
	  }
	  break;

	default:
	  ceph_abort();
	}
      }
    } else {
      // auth

      // once the first (local) stage of mix->lock gather complete we can
      // gather from replicas
      if (lock->get_state() == LOCK_MIX_LOCK &&
	  lock->get_parent()->is_replicated()) {
	dout(10) << " finished (local) gather for mix->lock, now gathering from replicas" << dendl;
	send_lock_message(lock, LOCK_AC_LOCK);
	lock->init_gather();
	lock->set_state(LOCK_MIX_LOCK2);
	return;
      }

      if (lock->is_dirty() && !lock->is_flushed()) {
	// journal the scattered data before the state change completes
	scatter_writebehind(static_cast<ScatterLock *>(lock));
	mds->mdlog->flush();
	return;
      }
      lock->clear_flushed();

      switch (lock->get_state()) {
	// to mixed
      case LOCK_TSYN_MIX:
      case LOCK_SYNC_MIX:
      case LOCK_EXCL_MIX:
      case LOCK_XSYN_MIX:
	in->start_scatter(static_cast<ScatterLock *>(lock));
	if (lock->get_parent()->is_replicated()) {
	  bufferlist softdata;
	  lock->encode_locked_state(softdata);
	  send_lock_message(lock, LOCK_AC_MIX, softdata);
	}
	(static_cast<ScatterLock *>(lock))->clear_scatter_wanted();
	break;

      case LOCK_XLOCK:
      case LOCK_XLOCKDONE:
	if (next != LOCK_SYNC)
	  break;
	// fall-thru

	// to sync
      case LOCK_EXCL_SYNC:
      case LOCK_LOCK_SYNC:
      case LOCK_MIX_SYNC:
      case LOCK_XSYN_SYNC:
	if (lock->get_parent()->is_replicated()) {
	  bufferlist softdata;
	  lock->encode_locked_state(softdata);
	  send_lock_message(lock, LOCK_AC_SYNC, softdata);
	}
	break;
      }

    }

    lock->set_state(next);

    if (lock->get_parent()->is_auth() &&
	lock->is_stable())
      lock->get_parent()->auth_unpin(lock);

    // drop loner before doing waiters
    if (caps &&
	in->is_head() &&
	in->is_auth() &&
	in->get_wanted_loner() != in->get_loner()) {
      dout(10) << "  trying to drop loner" << dendl;
      if (in->try_drop_loner()) {
	dout(10) << "  dropped loner" << dendl;
	need_issue = true;
      }
    }

    if (pfinishers)
      lock->take_waiting(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD|SimpleLock::WAIT_XLOCK,
			 *pfinishers);
    else
      lock->finish_waiters(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD|SimpleLock::WAIT_XLOCK);

    if (caps && in->is_head())
      need_issue = true;

    // re-evaluate in case the new stable state enables further moves
    if (lock->get_parent()->is_auth() &&
	lock->is_stable())
      try_eval(lock, &need_issue);
  }

  if (need_issue) {
    if (pneed_issue)
      *pneed_issue = true;
    else if (in->is_head())
      issue_caps(in);
  }

}
1044
/**
 * Evaluate the state of an inode's locks (selected by @p mask), adjusting
 * loner status along the way and issuing client caps if anything changed.
 *
 * @param in            inode to evaluate
 * @param mask          bitmask of CEPH_LOCK_I* lock types to evaluate
 *                      (-1 means all); widened to -1 if loner state changes
 * @param caps_imported true if caps were just imported; forces a cap issue
 * @return true if caps were issued (or flagged as needing issue)
 */
bool Locker::eval(CInode *in, int mask, bool caps_imported)
{
  bool need_issue = caps_imported;
  list<MDSInternalContextBase*> finishers;

  dout(10) << "eval " << mask << " " << *in << dendl;

  // choose loner?
  if (in->is_auth() && in->is_head()) {
    client_t orig_loner = in->get_loner();
    if (in->choose_ideal_loner()) {
      dout(10) << "eval set loner: client." << orig_loner << " -> client." << in->get_loner() << dendl;
      need_issue = true;
      mask = -1;  // loner changed; every lock's allowed caps may have changed
    } else if (in->get_wanted_loner() != in->get_loner()) {
      dout(10) << "eval want loner: client." << in->get_wanted_loner() << " but failed to set it" << dendl;
      mask = -1;
    }
  }

 retry:
  // evaluate each lock type selected by the mask
  if (mask & CEPH_LOCK_IFILE)
    eval_any(&in->filelock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IAUTH)
    eval_any(&in->authlock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_ILINK)
    eval_any(&in->linklock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IXATTR)
    eval_any(&in->xattrlock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_INEST)
    eval_any(&in->nestlock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IFLOCK)
    eval_any(&in->flocklock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IPOLICY)
    eval_any(&in->policylock, &need_issue, &finishers, caps_imported);

  // drop loner?
  if (in->is_auth() && in->is_head() && in->get_wanted_loner() != in->get_loner()) {
    if (in->try_drop_loner()) {
      need_issue = true;
      if (in->get_wanted_loner() >= 0) {
        // a different client wants to become loner; set it and re-evaluate all locks
        dout(10) << "eval end set loner to client." << in->get_loner() << dendl;
        bool ok = in->try_set_loner();
        assert(ok);
        mask = -1;
        goto retry;
      }
    }
  }

  finish_contexts(g_ceph_context, finishers);

  if (need_issue && in->is_head())
    issue_caps(in);

  dout(10) << "eval done" << dendl;
  return need_issue;
}
1103
1104 class C_Locker_Eval : public LockerContext {
1105 MDSCacheObject *p;
1106 int mask;
1107 public:
1108 C_Locker_Eval(Locker *l, MDSCacheObject *pp, int m) : LockerContext(l), p(pp), mask(m) {
1109 // We are used as an MDSCacheObject waiter, so should
1110 // only be invoked by someone already holding the big lock.
1111 assert(locker->mds->mds_lock.is_locked_by_me());
1112 p->get(MDSCacheObject::PIN_PTRWAITER);
1113 }
1114 void finish(int r) override {
1115 locker->try_eval(p, mask);
1116 p->put(MDSCacheObject::PIN_PTRWAITER);
1117 }
1118 };
1119
1120 void Locker::try_eval(MDSCacheObject *p, int mask)
1121 {
1122 // unstable and ambiguous auth?
1123 if (p->is_ambiguous_auth()) {
1124 dout(7) << "try_eval ambiguous auth, waiting on " << *p << dendl;
1125 p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_Eval(this, p, mask));
1126 return;
1127 }
1128
1129 if (p->is_auth() && p->is_frozen()) {
1130 dout(7) << "try_eval frozen, waiting on " << *p << dendl;
1131 p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, mask));
1132 return;
1133 }
1134
1135 if (mask & CEPH_LOCK_DN) {
1136 assert(mask == CEPH_LOCK_DN);
1137 bool need_issue = false; // ignore this, no caps on dentries
1138 CDentry *dn = static_cast<CDentry *>(p);
1139 eval_any(&dn->lock, &need_issue);
1140 } else {
1141 CInode *in = static_cast<CInode *>(p);
1142 eval(in, mask);
1143 }
1144 }
1145
/**
 * Re-evaluate a single lock, honoring pending scatter/unscatter requests
 * first.  Defers (queues a waiter) if the parent object is ambiguous-auth,
 * frozen, or freezing; does nothing if we are not auth for the parent.
 */
void Locker::try_eval(SimpleLock *lock, bool *pneed_issue)
{
  MDSCacheObject *p = lock->get_parent();

  // unstable and ambiguous auth?
  if (p->is_ambiguous_auth()) {
    dout(7) << "try_eval " << *lock << " ambiguousauth, waiting on " << *p << dendl;
    p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_Eval(this, p, lock->get_type()));
    return;
  }

  if (!p->is_auth()) {
    dout(7) << "try_eval " << *lock << " not auth for " << *p << dendl;
    return;
  }

  if (p->is_frozen()) {
    dout(7) << "try_eval " << *lock << " frozen, waiting on " << *p << dendl;
    p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, lock->get_type()));
    return;
  }

  /*
   * We could have a situation like:
   *
   * - mds A authpins item on mds B
   * - mds B starts to freeze tree containing item
   * - mds A tries wrlock_start on A, sends REQSCATTER to B
   * - mds B lock is unstable, sets scatter_wanted
   * - mds B lock stabilizes, calls try_eval.
   *
   * We can defer while freezing without causing a deadlock.  Honor
   * scatter_wanted flag here.  This will never get deferred by the
   * checks above due to the auth_pin held by the master.
   */
  if (lock->is_scatterlock()) {
    ScatterLock *slock = static_cast<ScatterLock *>(lock);
    if (slock->get_scatter_wanted() &&
        slock->get_state() != LOCK_MIX) {
      // someone asked for MIX; move toward it before generic eval
      scatter_mix(slock, pneed_issue);
      if (!lock->is_stable())
        return;
    } else if (slock->get_unscatter_wanted() &&
        slock->get_state() != LOCK_LOCK) {
      simple_lock(slock, pneed_issue);
      if (!lock->is_stable()) {
        return;
      }
    }
  }

  // NOTE: scatter_wanted handling above intentionally happens even while
  // freezing (see comment); everything else waits for the freeze to finish.
  if (lock->get_type() != CEPH_LOCK_DN && p->is_freezing()) {
    dout(7) << "try_eval " << *lock << " freezing, waiting on " << *p << dendl;
    p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, lock->get_type()));
    return;
  }

  eval(lock, pneed_issue);
}
1205
/**
 * Kick any unstable cap-related locks on @p in (e.g. after a client cap
 * flush/release completes a gather).
 *
 * @param issue_set if non-null, inodes needing a cap issue are collected
 *                  here instead of being issued immediately.
 */
void Locker::eval_cap_gather(CInode *in, set<CInode*> *issue_set)
{
  bool need_issue = false;
  list<MDSInternalContextBase*> finishers;

  // kick locks now
  if (!in->filelock.is_stable())
    eval_gather(&in->filelock, false, &need_issue, &finishers);
  if (!in->authlock.is_stable())
    eval_gather(&in->authlock, false, &need_issue, &finishers);
  if (!in->linklock.is_stable())
    eval_gather(&in->linklock, false, &need_issue, &finishers);
  if (!in->xattrlock.is_stable())
    eval_gather(&in->xattrlock, false, &need_issue, &finishers);

  if (need_issue && in->is_head()) {
    if (issue_set)
      issue_set->insert(in);   // caller batches the issue
    else
      issue_caps(in);
  }

  finish_contexts(g_ceph_context, finishers);
}
1230
/**
 * Kick any unstable scatter locks (file, nest, dirfragtree) on @p in,
 * e.g. after a scatter-gather stage finished, then issue caps if needed.
 */
void Locker::eval_scatter_gathers(CInode *in)
{
  bool need_issue = false;
  list<MDSInternalContextBase*> finishers;

  dout(10) << "eval_scatter_gathers " << *in << dendl;

  // kick locks now
  if (!in->filelock.is_stable())
    eval_gather(&in->filelock, false, &need_issue, &finishers);
  if (!in->nestlock.is_stable())
    eval_gather(&in->nestlock, false, &need_issue, &finishers);
  if (!in->dirfragtreelock.is_stable())
    eval_gather(&in->dirfragtreelock, false, &need_issue, &finishers);

  if (need_issue && in->is_head())
    issue_caps(in);

  finish_contexts(g_ceph_context, finishers);
}
1251
1252 void Locker::eval(SimpleLock *lock, bool *need_issue)
1253 {
1254 switch (lock->get_type()) {
1255 case CEPH_LOCK_IFILE:
1256 return file_eval(static_cast<ScatterLock*>(lock), need_issue);
1257 case CEPH_LOCK_IDFT:
1258 case CEPH_LOCK_INEST:
1259 return scatter_eval(static_cast<ScatterLock*>(lock), need_issue);
1260 default:
1261 return simple_eval(lock, need_issue);
1262 }
1263 }
1264
1265
1266 // ------------------
1267 // rdlock
1268
/**
 * Try to nudge a lock toward a rdlockable state.
 *
 * On the auth this transitions a stable lock toward SYNC (or XSYN for a
 * loner-held file lock, unless @p as_anon demands true SYNC).  On a replica
 * it asks the auth for a rdlockable state.
 *
 * @return true if we changed lock state locally (caller should re-test
 *         rdlockability), false if nothing changed or we must wait.
 */
bool Locker::_rdlock_kick(SimpleLock *lock, bool as_anon)
{
  // kick the lock
  if (lock->is_stable()) {
    if (lock->get_parent()->is_auth()) {
      if (lock->get_sm() == &sm_scatterlock) {
        // not until tempsync is fully implemented
        //if (lock->get_parent()->is_replicated())
        //scatter_tempsync((ScatterLock*)lock);
        //else
        simple_sync(lock);
      } else if (lock->get_sm() == &sm_filelock) {
        CInode *in = static_cast<CInode*>(lock->get_parent());
        if (lock->get_state() == LOCK_EXCL &&
            in->get_target_loner() >= 0 &&
            !in->is_dir() && !as_anon) // as_anon => caller wants SYNC, not XSYN
          file_xsyn(lock);
        else
          simple_sync(lock);
      } else
        simple_sync(lock);
      return true;
    } else {
      // request rdlock state change from auth
      mds_rank_t auth = lock->get_parent()->authority().first;
      if (!mds->is_cluster_degraded() ||
          mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
        dout(10) << "requesting rdlock from auth on "
                 << *lock << " on " << *lock->get_parent() << dendl;
        mds->send_message_mds(new MLock(lock, LOCK_AC_REQRDLOCK, mds->get_nodeid()), auth);
      }
      return false;
    }
  }
  // unstable: if the file lock is blocked on recovery, bump its priority
  if (lock->get_type() == CEPH_LOCK_IFILE) {
    CInode *in = static_cast<CInode *>(lock->get_parent());
    if (in->state_test(CInode::STATE_RECOVERING)) {
      mds->mdcache->recovery_queue.prioritize(in);
    }
  }

  return false;
}
1312
1313 bool Locker::rdlock_try(SimpleLock *lock, client_t client, MDSInternalContextBase *con)
1314 {
1315 dout(7) << "rdlock_try on " << *lock << " on " << *lock->get_parent() << dendl;
1316
1317 // can read? grab ref.
1318 if (lock->can_rdlock(client))
1319 return true;
1320
1321 _rdlock_kick(lock, false);
1322
1323 if (lock->can_rdlock(client))
1324 return true;
1325
1326 // wait!
1327 if (con) {
1328 dout(7) << "rdlock_try waiting on " << *lock << " on " << *lock->get_parent() << dendl;
1329 lock->add_waiter(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_RD, con);
1330 }
1331 return false;
1332 }
1333
/**
 * Acquire a rdlock for request @p mut, kicking the lock state as needed.
 *
 * @param as_anon take the lock anonymously (client == -1), e.g. when the
 *                same client holds an xlock, or when reading a snapped
 *                version (forced below for snapids).
 * @return true if the rdlock was taken (recorded in mut), false if the
 *         request was queued as a waiter and must be retried.
 */
bool Locker::rdlock_start(SimpleLock *lock, MDRequestRef& mut, bool as_anon)
{
  dout(7) << "rdlock_start on " << *lock << " on " << *lock->get_parent() << dendl;

  // client may be allowed to rdlock the same item it has xlocked.
  // UNLESS someone passes in as_anon, or we're reading snapped version here.
  if (mut->snapid != CEPH_NOSNAP)
    as_anon = true;
  client_t client = as_anon ? -1 : mut->get_client();

  CInode *in = 0;
  if (lock->get_type() != CEPH_LOCK_DN)
    in = static_cast<CInode *>(lock->get_parent());

  /*
  if (!lock->get_parent()->is_auth() &&
      lock->fw_rdlock_to_auth()) {
    mdcache->request_forward(mut, lock->get_parent()->authority().first);
    return false;
  }
  */

  while (1) {
    // can read?  grab ref.
    if (lock->can_rdlock(client)) {
      lock->get_rdlock();
      mut->rdlocks.insert(lock);
      mut->locks.insert(lock);
      return true;
    }

    // hmm, wait a second.
    // snapped (non-head) inode whose lock follows the head: kick the head
    // inode's corresponding lock to SYNC first so ours can follow.
    if (in && !in->is_head() && in->is_auth() &&
        lock->get_state() == LOCK_SNAP_SYNC) {
      // okay, we actually need to kick the head's lock to get ourselves synced up.
      CInode *head = mdcache->get_inode(in->ino());
      assert(head);
      SimpleLock *hlock = head->get_lock(CEPH_LOCK_IFILE);
      if (hlock->get_state() == LOCK_SYNC)
        hlock = head->get_lock(lock->get_type());

      if (hlock->get_state() != LOCK_SYNC) {
        dout(10) << "rdlock_start trying head inode " << *head << dendl;
        if (!rdlock_start(hlock, mut, true)) // ** as_anon, no rdlock on EXCL **
          return false;
        // oh, check our lock again then
      }
    }

    if (!_rdlock_kick(lock, as_anon))
      break;  // no local state change; give up and wait
  }

  // wait!
  int wait_on;
  if (lock->get_parent()->is_auth() && lock->is_stable())
    wait_on = SimpleLock::WAIT_RD;
  else
    wait_on = SimpleLock::WAIT_STABLE;  // REQRDLOCK is ignored if lock is unstable, so we need to retry.
  dout(7) << "rdlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl;
  lock->add_waiter(wait_on, new C_MDS_RetryRequest(mdcache, mut));
  nudge_log(lock);
  return false;
}
1398
1399 void Locker::nudge_log(SimpleLock *lock)
1400 {
1401 dout(10) << "nudge_log " << *lock << " on " << *lock->get_parent() << dendl;
1402 if (lock->get_parent()->is_auth() && lock->is_unstable_and_locked()) // as with xlockdone, or cap flush
1403 mds->mdlog->flush();
1404 }
1405
/**
 * Release a rdlock held by @p mut (which may be null for anonymous holds)
 * and, if that was the last rdlock, re-evaluate the lock state.
 */
void Locker::rdlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue)
{
  // drop ref
  lock->put_rdlock();
  if (mut) {
    mut->rdlocks.erase(lock);
    mut->locks.erase(lock);
  }

  dout(7) << "rdlock_finish on " << *lock << " on " << *lock->get_parent() << dendl;

  // last one?
  if (!lock->is_rdlocked()) {
    if (!lock->is_stable())
      eval_gather(lock, false, pneed_issue);   // may complete a pending transition
    else if (lock->get_parent()->is_auth())
      try_eval(lock, pneed_issue);
  }
}
1425
1426
1427 bool Locker::can_rdlock_set(set<SimpleLock*>& locks)
1428 {
1429 dout(10) << "can_rdlock_set " << locks << dendl;
1430 for (set<SimpleLock*>::iterator p = locks.begin(); p != locks.end(); ++p)
1431 if (!(*p)->can_rdlock(-1)) {
1432 dout(10) << "can_rdlock_set can't rdlock " << *p << " on " << *(*p)->get_parent() << dendl;
1433 return false;
1434 }
1435 return true;
1436 }
1437
1438 bool Locker::rdlock_try_set(set<SimpleLock*>& locks)
1439 {
1440 dout(10) << "rdlock_try_set " << locks << dendl;
1441 for (set<SimpleLock*>::iterator p = locks.begin(); p != locks.end(); ++p)
1442 if (!rdlock_try(*p, -1, NULL)) {
1443 dout(10) << "rdlock_try_set can't rdlock " << *p << " on " << *(*p)->get_parent() << dendl;
1444 return false;
1445 }
1446 return true;
1447 }
1448
1449 void Locker::rdlock_take_set(set<SimpleLock*>& locks, MutationRef& mut)
1450 {
1451 dout(10) << "rdlock_take_set " << locks << dendl;
1452 for (set<SimpleLock*>::iterator p = locks.begin(); p != locks.end(); ++p) {
1453 (*p)->get_rdlock();
1454 mut->rdlocks.insert(*p);
1455 mut->locks.insert(*p);
1456 }
1457 }
1458
1459 // ------------------
1460 // wrlock
1461
/**
 * Take a wrlock unconditionally (force=true bypasses can_wrlock checks)
 * and record it in the mutation.  Version locks are routed to the
 * local-lock fast path instead.
 */
void Locker::wrlock_force(SimpleLock *lock, MutationRef& mut)
{
  if (lock->get_type() == CEPH_LOCK_IVERSION ||
      lock->get_type() == CEPH_LOCK_DVERSION)
    return local_wrlock_grab(static_cast<LocalLock*>(lock), mut);

  dout(7) << "wrlock_force  on " << *lock
          << " on " << *lock->get_parent() << dendl;
  lock->get_wrlock(true);  // true == force
  mut->wrlocks.insert(lock);
  mut->locks.insert(lock);
}
1474
/**
 * Acquire a wrlock for request @p mut.
 *
 * @param nowait if true, never block or open new log events: the caller
 *               already has a log entry open, so we must not trigger
 *               scatter_writebehind/start_scatter, and we return false
 *               instead of queuing a waiter.
 * @return true if the wrlock was taken, false otherwise (a retry waiter is
 *         queued unless nowait).
 */
bool Locker::wrlock_start(SimpleLock *lock, MDRequestRef& mut, bool nowait)
{
  if (lock->get_type() == CEPH_LOCK_IVERSION ||
      lock->get_type() == CEPH_LOCK_DVERSION)
    return local_wrlock_start(static_cast<LocalLock*>(lock), mut);

  dout(10) << "wrlock_start " << *lock << " on " << *lock->get_parent() << dendl;

  CInode *in = static_cast<CInode *>(lock->get_parent());
  client_t client = mut->get_client();
  // prefer the MIX (scattered) state when the inode spans subtrees or a
  // scatter was explicitly requested
  bool want_scatter = !nowait && lock->get_parent()->is_auth() &&
                      (in->has_subtree_or_exporting_dirfrag() ||
                       static_cast<ScatterLock*>(lock)->get_scatter_wanted());

  while (1) {
    // wrlock?
    if (lock->can_wrlock(client) &&
        (!want_scatter || lock->get_state() == LOCK_MIX)) {
      lock->get_wrlock();
      mut->wrlocks.insert(lock);
      mut->locks.insert(lock);
      return true;
    }

    if (lock->get_type() == CEPH_LOCK_IFILE &&
        in->state_test(CInode::STATE_RECOVERING)) {
      mds->mdcache->recovery_queue.prioritize(in);
    }

    if (!lock->is_stable())
      break;  // mid-transition; wait for it to settle

    if (in->is_auth()) {
      // don't do nested lock state change if we have dirty scatterdata and
      // may scatter_writebehind or start_scatter, because nowait==true implies
      // that the caller already has a log entry open!
      if (nowait && lock->is_dirty())
        return false;

      if (want_scatter)
        scatter_mix(static_cast<ScatterLock*>(lock));
      else
        simple_lock(lock);

      if (nowait && !lock->can_wrlock(client))
        return false;

    } else {
      // replica.
      // auth should be auth_pinned (see acquire_locks wrlock weird mustpin case).
      mds_rank_t auth = lock->get_parent()->authority().first;
      if (!mds->is_cluster_degraded() ||
          mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
        dout(10) << "requesting scatter from auth on "
                 << *lock << " on " << *lock->get_parent() << dendl;
        mds->send_message_mds(new MLock(lock, LOCK_AC_REQSCATTER, mds->get_nodeid()), auth);
      }
      break;
    }
  }

  if (!nowait) {
    dout(7) << "wrlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl;
    lock->add_waiter(SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
    nudge_log(lock);
  }

  return false;
}
1544
/**
 * Release a wrlock held by @p mut and re-evaluate the lock if that was the
 * last wrlock.  Keeps the lock in mut->locks while a remote wrlock on the
 * same lock is still held.
 */
void Locker::wrlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue)
{
  if (lock->get_type() == CEPH_LOCK_IVERSION ||
      lock->get_type() == CEPH_LOCK_DVERSION)
    return local_wrlock_finish(static_cast<LocalLock*>(lock), mut);

  dout(7) << "wrlock_finish on " << *lock << " on " << *lock->get_parent() << dendl;
  lock->put_wrlock();
  if (mut) {
    mut->wrlocks.erase(lock);
    // only drop from the combined lock set if no remote wrlock remains
    if (mut->remote_wrlocks.count(lock) == 0)
      mut->locks.erase(lock);
  }

  if (!lock->is_wrlocked()) {
    if (!lock->is_stable())
      eval_gather(lock, false, pneed_issue);
    else if (lock->get_parent()->is_auth())
      try_eval(lock, pneed_issue);
  }
}
1566
1567
1568 // remote wrlock
1569
/**
 * Ask mds @p target (a non-auth holder of the object) to take a wrlock on
 * our behalf for request @p mut.  Asynchronous: the slave's reply arrives
 * later; the request is registered in mut->more()->waiting_on_slave.
 */
void Locker::remote_wrlock_start(SimpleLock *lock, mds_rank_t target, MDRequestRef& mut)
{
  dout(7) << "remote_wrlock_start mds." << target << " on " << *lock << " on " << *lock->get_parent() << dendl;

  // wait for active target
  if (mds->is_cluster_degraded() &&
      !mds->mdsmap->is_clientreplay_or_active_or_stopping(target)) {
    dout(7) << " mds." << target << " is not active" << dendl;
    // only queue one retry; later slave replies would retry anyway
    if (mut->more()->waiting_on_slave.empty())
      mds->wait_for_active_peer(target, new C_MDS_RetryRequest(mdcache, mut));
    return;
  }

  // send lock request
  mut->start_locking(lock, target);
  mut->more()->slaves.insert(target);
  MMDSSlaveRequest *r = new MMDSSlaveRequest(mut->reqid, mut->attempt,
                                             MMDSSlaveRequest::OP_WRLOCK);
  r->set_lock_type(lock->get_type());
  lock->get_parent()->set_object_info(r->get_object_info());
  mds->send_message_mds(r, target);

  assert(mut->more()->waiting_on_slave.count(target) == 0);
  mut->more()->waiting_on_slave.insert(target);
}
1595
/**
 * Release a wrlock that mds @p target holds on our behalf, notifying it
 * with OP_UNWRLOCK (skipped if the target has not reached rejoin, since its
 * lock state will be rebuilt anyway).
 */
void Locker::remote_wrlock_finish(SimpleLock *lock, mds_rank_t target,
                                  MutationImpl *mut)
{
  // drop ref
  mut->remote_wrlocks.erase(lock);
  // keep the lock in the combined set if we also hold a local wrlock
  if (mut->wrlocks.count(lock) == 0)
    mut->locks.erase(lock);

  dout(7) << "remote_wrlock_finish releasing remote wrlock on mds." << target
          << " " << *lock->get_parent() << dendl;
  if (!mds->is_cluster_degraded() ||
      mds->mdsmap->get_state(target) >= MDSMap::STATE_REJOIN) {
    MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt,
                                                      MMDSSlaveRequest::OP_UNWRLOCK);
    slavereq->set_lock_type(lock->get_type());
    lock->get_parent()->set_object_info(slavereq->get_object_info());
    mds->send_message_mds(slavereq, target);
  }
}
1615
1616
1617 // ------------------
1618 // xlock
1619
/**
 * Acquire an exclusive (x) lock for request @p mut.
 *
 * Auth path: loops driving the lock toward an xlockable state, possibly
 * via LOCK_XLOCKDONE reuse by the same client.  Replica path: forwards an
 * OP_XLOCK slave request to the auth and waits for the reply.
 *
 * @return true if the xlock was taken locally; false if a waiter/slave
 *         request was queued and the request must be retried.
 */
bool Locker::xlock_start(SimpleLock *lock, MDRequestRef& mut)
{
  if (lock->get_type() == CEPH_LOCK_IVERSION ||
      lock->get_type() == CEPH_LOCK_DVERSION)
    return local_xlock_start(static_cast<LocalLock*>(lock), mut);

  dout(7) << "xlock_start on " << *lock << " on " << *lock->get_parent() << dendl;
  client_t client = mut->get_client();

  // auth?
  if (lock->get_parent()->is_auth()) {
    // auth
    while (1) {
      if (lock->can_xlock(client)) {
        lock->set_state(LOCK_XLOCK);
        lock->get_xlock(mut, client);
        mut->xlocks.insert(lock);
        mut->locks.insert(lock);
        mut->finish_locking(lock);
        return true;
      }

      if (lock->get_type() == CEPH_LOCK_IFILE) {
        CInode *in = static_cast<CInode*>(lock->get_parent());
        if (in->state_test(CInode::STATE_RECOVERING)) {
          mds->mdcache->recovery_queue.prioritize(in);
        }
      }

      // unstable: only push on if this client is re-xlocking its own
      // XLOCKDONE lock and nobody is waiting for stability
      if (!lock->is_stable() && (lock->get_state() != LOCK_XLOCKDONE ||
                                 lock->get_xlock_by_client() != client ||
                                 lock->is_waiter_for(SimpleLock::WAIT_STABLE)))
        break;

      if (lock->get_state() == LOCK_LOCK || lock->get_state() == LOCK_XLOCKDONE) {
        mut->start_locking(lock);
        simple_xlock(lock);
      } else {
        simple_lock(lock);
      }
    }

    lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
    nudge_log(lock);
    return false;
  } else {
    // replica
    assert(lock->get_sm()->can_remote_xlock);
    assert(!mut->slave_request);

    // wait for single auth
    if (lock->get_parent()->is_ambiguous_auth()) {
      lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH,
                                     new C_MDS_RetryRequest(mdcache, mut));
      return false;
    }

    // wait for active auth
    mds_rank_t auth = lock->get_parent()->authority().first;
    if (mds->is_cluster_degraded() &&
        !mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
      dout(7) << " mds." << auth << " is not active" << dendl;
      if (mut->more()->waiting_on_slave.empty())
        mds->wait_for_active_peer(auth, new C_MDS_RetryRequest(mdcache, mut));
      return false;
    }

    // send lock request
    mut->more()->slaves.insert(auth);
    mut->start_locking(lock, auth);
    MMDSSlaveRequest *r = new MMDSSlaveRequest(mut->reqid, mut->attempt,
                                               MMDSSlaveRequest::OP_XLOCK);
    r->set_lock_type(lock->get_type());
    lock->get_parent()->set_object_info(r->get_object_info());
    mds->send_message_mds(r, auth);

    assert(mut->more()->waiting_on_slave.count(auth) == 0);
    mut->more()->waiting_on_slave.insert(auth);

    return false;
  }
}
1702
/**
 * Complete the release of the last xlock on @p lock.
 *
 * Fast path: if nothing else holds the lock and a suitable loner exists
 * (matching the xlocker, if any), jump straight to LOCK_EXCL instead of
 * going through a full gather.  Otherwise fall back to eval_gather.
 */
void Locker::_finish_xlock(SimpleLock *lock, client_t xlocker, bool *pneed_issue)
{
  assert(!lock->is_stable());
  if (lock->get_num_rdlocks() == 0 &&
      lock->get_num_wrlocks() == 0 &&
      !lock->is_leased() &&
      lock->get_state() != LOCK_XLOCKSNAP &&
      lock->get_type() != CEPH_LOCK_DN) {
    CInode *in = static_cast<CInode*>(lock->get_parent());
    client_t loner = in->get_target_loner();
    if (loner >= 0 && (xlocker < 0 || xlocker == loner)) {
      lock->set_state(LOCK_EXCL);
      lock->get_parent()->auth_unpin(lock);  // pin taken when lock went unstable
      lock->finish_waiters(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD);
      if (lock->get_cap_shift())
        *pneed_issue = true;
      if (lock->get_parent()->is_auth() &&
          lock->is_stable())
        try_eval(lock, pneed_issue);
      return;
    }
  }
  // the xlocker may have CEPH_CAP_GSHARED, need to revoke it if next state is LOCK_LOCK
  eval_gather(lock, lock->get_state() != LOCK_XLOCKSNAP, pneed_issue);
}
1728
/**
 * Release an xlock held by @p mut.  For a remote xlock, notify the auth
 * with OP_UNXLOCK; for a local one, transition via _finish_xlock when the
 * last xlock drops.  Issues caps (or flags pneed_issue) as needed.
 */
void Locker::xlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue)
{
  if (lock->get_type() == CEPH_LOCK_IVERSION ||
      lock->get_type() == CEPH_LOCK_DVERSION)
    return local_xlock_finish(static_cast<LocalLock*>(lock), mut);

  dout(10) << "xlock_finish on " << *lock << " " << *lock->get_parent() << dendl;

  client_t xlocker = lock->get_xlock_by_client();

  // drop ref
  lock->put_xlock();
  assert(mut);
  mut->xlocks.erase(lock);
  mut->locks.erase(lock);

  bool do_issue = false;

  // remote xlock?
  if (!lock->get_parent()->is_auth()) {
    assert(lock->get_sm()->can_remote_xlock);

    // tell auth
    dout(7) << "xlock_finish releasing remote xlock on " << *lock->get_parent()  << dendl;
    mds_rank_t auth = lock->get_parent()->authority().first;
    // skip notification if the auth is pre-rejoin; it will rebuild lock state
    if (!mds->is_cluster_degraded() ||
        mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) {
      MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt,
                                                        MMDSSlaveRequest::OP_UNXLOCK);
      slavereq->set_lock_type(lock->get_type());
      lock->get_parent()->set_object_info(slavereq->get_object_info());
      mds->send_message_mds(slavereq, auth);
    }
    // others waiting?
    lock->finish_waiters(SimpleLock::WAIT_STABLE |
                         SimpleLock::WAIT_WR |
                         SimpleLock::WAIT_RD, 0);
  } else {
    if (lock->get_num_xlocks() == 0) {
      if (lock->get_state() == LOCK_LOCK_XLOCK)
        lock->set_state(LOCK_XLOCKDONE);
      _finish_xlock(lock, xlocker, &do_issue);
    }
  }

  if (do_issue) {
    CInode *in = static_cast<CInode*>(lock->get_parent());
    if (in->is_head()) {
      if (pneed_issue)
        *pneed_issue = true;   // caller will issue
      else
        issue_caps(in);
    }
  }
}
1784
/**
 * Drop our xlock state on a lock whose parent inode is being exported to
 * another MDS; the importer will reconstruct the xlock (see xlock_import).
 * Leaves the lock in LOCK_LOCK and releases the instability auth pin.
 */
void Locker::xlock_export(SimpleLock *lock, MutationImpl *mut)
{
  dout(10) << "xlock_export on " << *lock << " " << *lock->get_parent() << dendl;

  lock->put_xlock();
  mut->xlocks.erase(lock);
  mut->locks.erase(lock);

  MDSCacheObject *p = lock->get_parent();
  assert(p->state_test(CInode::STATE_AMBIGUOUSAUTH));  // we are exporting this (inode)

  if (!lock->is_stable())
    lock->get_parent()->auth_unpin(lock);

  lock->set_state(LOCK_LOCK);
}
1801
1802 void Locker::xlock_import(SimpleLock *lock)
1803 {
1804 dout(10) << "xlock_import on " << *lock << " " << *lock->get_parent() << dendl;
1805 lock->get_parent()->auth_pin(lock);
1806 }
1807
1808
1809
1810 // file i/o -----------------------------------------
1811
1812 version_t Locker::issue_file_data_version(CInode *in)
1813 {
1814 dout(7) << "issue_file_data_version on " << *in << dendl;
1815 return in->inode.file_data_version;
1816 }
1817
// Log-completion callback for a file metadata update: once the journal
// entry commits, calls Locker::file_update_finish() to apply the mutation,
// send the (optional) cap ack, and reissue/share caps.  Pins the inode so
// it survives until the callback fires.
class C_Locker_FileUpdate_finish : public LockerLogContext {
  CInode *in;
  MutationRef mut;
  bool share_max;    // share a new max_size with the client afterwards
  bool need_issue;   // reissue caps after applying
  client_t client;   // client to ack (if ack != 0)
  MClientCaps *ack;  // cap ack message to send on commit, or 0
public:
  C_Locker_FileUpdate_finish(Locker *l, CInode *i, MutationRef& m,
                             bool sm=false, bool ni=false, client_t c=-1,
                             MClientCaps *ac = 0)
    : LockerLogContext(l), in(i), mut(m), share_max(sm), need_issue(ni),
      client(c), ack(ac) {
    in->get(CInode::PIN_PTRWAITER);
  }
  void finish(int r) override {
    locker->file_update_finish(in, mut, share_max, need_issue, client, ack);
    in->put(CInode::PIN_PTRWAITER);
  }
};
1838
/**
 * Finish a journaled file update: apply the projected inode, send the cap
 * ack (if any), drop the mutation's locks, complete any snap cap-writeback
 * accounting, and reissue/share caps as appropriate.
 */
void Locker::file_update_finish(CInode *in, MutationRef& mut, bool share_max, bool issue_client_cap,
                                client_t client, MClientCaps *ack)
{
  dout(10) << "file_update_finish on " << *in << dendl;
  in->pop_and_dirty_projected_inode(mut->ls);

  mut->apply();

  if (ack) {
    Session *session = mds->get_session(client);
    if (session) {
      // "oldest flush tid" > 0 means client uses unique TID for each flush
      if (ack->get_oldest_flush_tid() > 0)
        session->add_completed_flush(ack->get_client_tid());
      mds->send_message_client_counted(ack, session);
    } else {
      dout(10) << " no session for client." << client << " " << *ack << dendl;
      ack->put();
    }
  }

  set<CInode*> need_issue;
  drop_locks(mut.get(), &need_issue);

  if (!in->is_head() && !in->client_snap_caps.empty()) {
    dout(10) << " client_snap_caps " << in->client_snap_caps << dendl;
    // check for snap writeback completion
    bool gather = false;
    auto p = in->client_snap_caps.begin();
    while (p != in->client_snap_caps.end()) {
      SimpleLock *lock = in->get_lock(p->first);
      assert(lock);
      dout(10) << " completing client_snap_caps for " << ccap_string(p->first)
               << " lock " << *lock << " on " << *in << dendl;
      lock->put_wrlock();

      p->second.erase(client);
      if (p->second.empty()) {
        gather = true;
        in->client_snap_caps.erase(p++);  // erase-then-advance; keeps iterator valid
      } else
        ++p;
    }
    if (gather) {
      if (in->client_snap_caps.empty())
        in->item_open_file.remove_myself();  // nothing left to flush for this snap inode
      eval_cap_gather(in, &need_issue);
    }
  } else {
    if (issue_client_cap && need_issue.count(in) == 0) {
      Capability *cap = in->get_client_cap(client);
      // only bother if the client still wants more than it has pending
      if (cap && (cap->wanted() & ~cap->pending()))
        issue_caps(in, cap);
    }

    if (share_max && in->is_auth() &&
        (in->filelock.gcaps_allowed(CAP_LONER) & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)))
      share_inode_max_size(in);
  }
  issue_caps_set(need_issue);

  utime_t now = ceph_clock_now();
  mds->balancer->hit_inode(now, in, META_POP_IWR);

  // auth unpin after issuing caps
  mut->cleanup();
}
1906
/**
 * Register (or augment) a client capability on @p in for an open with the
 * given mode.
 *
 * @param is_replay during client replay, just try to reconnect an existing
 *                  cap and return 0
 * @return the (possibly new) Capability, or 0 on replay
 */
Capability* Locker::issue_new_caps(CInode *in,
                                   int mode,
                                   Session *session,
                                   SnapRealm *realm,
                                   bool is_replay)
{
  dout(7) << "issue_new_caps for mode " << mode << " on " << *in << dendl;
  bool is_new;

  // if replay, try to reconnect cap, and otherwise do nothing.
  if (is_replay) {
    mds->mdcache->try_reconnect_cap(in, session);
    return 0;
  }

  // my needs
  assert(session->info.inst.name.is_client());
  client_t my_client = session->info.inst.name.num();
  int my_want = ceph_caps_for_mode(mode);

  // register a capability
  Capability *cap = in->get_client_cap(my_client);
  if (!cap) {
    // new cap
    cap = in->add_client_cap(my_client, session, realm);
    cap->set_wanted(my_want);
    cap->mark_new();
    cap->inc_suppress(); // suppress file cap messages for new cap (we'll bundle with the open() reply)
    is_new = true;
  } else {
    is_new = false;
    // make sure it wants sufficient caps
    if (my_want & ~cap->wanted()) {
      // augment wanted caps for this client
      cap->set_wanted(cap->wanted() | my_want);
    }
  }

  if (in->is_auth()) {
    // [auth] twiddle mode?
    eval(in, CEPH_CAP_LOCKS);

    // a journal flush may be needed before the wanted caps can be granted
    if (_need_flush_mdlog(in, my_want))
      mds->mdlog->flush();

  } else {
    // [replica] tell auth about any new caps wanted
    request_inode_file_caps(in);
  }

  // issue caps (pot. incl new one)
  //issue_caps(in);  // note: _eval above may have done this already...

  // re-issue whatever we can
  //cap->issue(cap->pending());

  if (is_new)
    cap->dec_suppress();

  return cap;
}
1968
1969
1970 void Locker::issue_caps_set(set<CInode*>& inset)
1971 {
1972 for (set<CInode*>::iterator p = inset.begin(); p != inset.end(); ++p)
1973 issue_caps(*p);
1974 }
1975
1976 bool Locker::issue_caps(CInode *in, Capability *only_cap)
1977 {
1978 // allowed caps are determined by the lock mode.
1979 int all_allowed = in->get_caps_allowed_by_type(CAP_ANY);
1980 int loner_allowed = in->get_caps_allowed_by_type(CAP_LONER);
1981 int xlocker_allowed = in->get_caps_allowed_by_type(CAP_XLOCKER);
1982
1983 client_t loner = in->get_loner();
1984 if (loner >= 0) {
1985 dout(7) << "issue_caps loner client." << loner
1986 << " allowed=" << ccap_string(loner_allowed)
1987 << ", xlocker allowed=" << ccap_string(xlocker_allowed)
1988 << ", others allowed=" << ccap_string(all_allowed)
1989 << " on " << *in << dendl;
1990 } else {
1991 dout(7) << "issue_caps allowed=" << ccap_string(all_allowed)
1992 << ", xlocker allowed=" << ccap_string(xlocker_allowed)
1993 << " on " << *in << dendl;
1994 }
1995
1996 assert(in->is_head());
1997
1998 // count conflicts with
1999 int nissued = 0;
2000
2001 // client caps
2002 map<client_t, Capability*>::iterator it;
2003 if (only_cap)
2004 it = in->client_caps.find(only_cap->get_client());
2005 else
2006 it = in->client_caps.begin();
2007 for (; it != in->client_caps.end(); ++it) {
2008 Capability *cap = it->second;
2009 if (cap->is_stale())
2010 continue;
2011
2012 // do not issue _new_ bits when size|mtime is projected
2013 int allowed;
2014 if (loner == it->first)
2015 allowed = loner_allowed;
2016 else
2017 allowed = all_allowed;
2018
2019 // add in any xlocker-only caps (for locks this client is the xlocker for)
2020 allowed |= xlocker_allowed & in->get_xlocker_mask(it->first);
2021
2022 Session *session = mds->get_session(it->first);
2023 if (in->inode.inline_data.version != CEPH_INLINE_NONE &&
2024 !(session && session->connection &&
2025 session->connection->has_feature(CEPH_FEATURE_MDS_INLINE_DATA)))
2026 allowed &= ~(CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR);
2027
2028 int pending = cap->pending();
2029 int wanted = cap->wanted();
2030
2031 dout(20) << " client." << it->first
2032 << " pending " << ccap_string(pending)
2033 << " allowed " << ccap_string(allowed)
2034 << " wanted " << ccap_string(wanted)
2035 << dendl;
2036
2037 if (!(pending & ~allowed)) {
2038 // skip if suppress or new, and not revocation
2039 if (cap->is_new() || cap->is_suppress()) {
2040 dout(20) << " !revoke and new|suppressed, skipping client." << it->first << dendl;
2041 continue;
2042 }
2043 }
2044
2045 // notify clients about deleted inode, to make sure they release caps ASAP.
2046 if (in->inode.nlink == 0)
2047 wanted |= CEPH_CAP_LINK_SHARED;
2048
2049 // are there caps that the client _wants_ and can have, but aren't pending?
2050 // or do we need to revoke?
2051 if (((wanted & allowed) & ~pending) || // missing wanted+allowed caps
2052 (pending & ~allowed)) { // need to revoke ~allowed caps.
2053 // issue
2054 nissued++;
2055
2056 // include caps that clients generally like, while we're at it.
2057 int likes = in->get_caps_liked();
2058 int before = pending;
2059 long seq;
2060 if (pending & ~allowed)
2061 seq = cap->issue((wanted|likes) & allowed & pending); // if revoking, don't issue anything new.
2062 else
2063 seq = cap->issue((wanted|likes) & allowed);
2064 int after = cap->pending();
2065
2066 if (cap->is_new()) {
2067 // haven't send caps to client yet
2068 if (before & ~after)
2069 cap->confirm_receipt(seq, after);
2070 } else {
2071 dout(7) << " sending MClientCaps to client." << it->first
2072 << " seq " << cap->get_last_seq()
2073 << " new pending " << ccap_string(after) << " was " << ccap_string(before)
2074 << dendl;
2075
2076 int op = (before & ~after) ? CEPH_CAP_OP_REVOKE : CEPH_CAP_OP_GRANT;
2077 if (op == CEPH_CAP_OP_REVOKE) {
2078 revoking_caps.push_back(&cap->item_revoking_caps);
2079 revoking_caps_by_client[cap->get_client()].push_back(&cap->item_client_revoking_caps);
2080 cap->set_last_revoke_stamp(ceph_clock_now());
2081 cap->reset_num_revoke_warnings();
2082 }
2083
2084 MClientCaps *m = new MClientCaps(op, in->ino(),
2085 in->find_snaprealm()->inode->ino(),
2086 cap->get_cap_id(), cap->get_last_seq(),
2087 after, wanted, 0,
2088 cap->get_mseq(),
2089 mds->get_osd_epoch_barrier());
2090 in->encode_cap_message(m, cap);
2091
2092 mds->send_message_client_counted(m, it->first);
2093 }
2094 }
2095
2096 if (only_cap)
2097 break;
2098 }
2099
2100 return (nissued == 0); // true if no re-issued, no callbacks
2101 }
2102
2103 void Locker::issue_truncate(CInode *in)
2104 {
2105 dout(7) << "issue_truncate on " << *in << dendl;
2106
2107 for (map<client_t, Capability*>::iterator it = in->client_caps.begin();
2108 it != in->client_caps.end();
2109 ++it) {
2110 Capability *cap = it->second;
2111 MClientCaps *m = new MClientCaps(CEPH_CAP_OP_TRUNC,
2112 in->ino(),
2113 in->find_snaprealm()->inode->ino(),
2114 cap->get_cap_id(), cap->get_last_seq(),
2115 cap->pending(), cap->wanted(), 0,
2116 cap->get_mseq(),
2117 mds->get_osd_epoch_barrier());
2118 in->encode_cap_message(m, cap);
2119 mds->send_message_client_counted(m, it->first);
2120 }
2121
2122 // should we increase max_size?
2123 if (in->is_auth() && in->is_file())
2124 check_inode_max_size(in);
2125 }
2126
2127
2128 void Locker::revoke_stale_caps(Capability *cap)
2129 {
2130 CInode *in = cap->get_inode();
2131 if (in->state_test(CInode::STATE_EXPORTINGCAPS)) {
2132 // if export succeeds, the cap will be removed. if export fails, we need to
2133 // revoke the cap if it's still stale.
2134 in->state_set(CInode::STATE_EVALSTALECAPS);
2135 return;
2136 }
2137
2138 int issued = cap->issued();
2139 if (issued & ~CEPH_CAP_PIN) {
2140 dout(10) << " revoking " << ccap_string(issued) << " on " << *in << dendl;
2141 cap->revoke();
2142
2143 if (in->is_auth() &&
2144 in->inode.client_ranges.count(cap->get_client()))
2145 in->state_set(CInode::STATE_NEEDSRECOVER);
2146
2147 if (!in->filelock.is_stable()) eval_gather(&in->filelock);
2148 if (!in->linklock.is_stable()) eval_gather(&in->linklock);
2149 if (!in->authlock.is_stable()) eval_gather(&in->authlock);
2150 if (!in->xattrlock.is_stable()) eval_gather(&in->xattrlock);
2151
2152 if (in->is_auth()) {
2153 try_eval(in, CEPH_CAP_LOCKS);
2154 } else {
2155 request_inode_file_caps(in);
2156 }
2157 }
2158 }
2159
2160 void Locker::revoke_stale_caps(Session *session)
2161 {
2162 dout(10) << "revoke_stale_caps for " << session->info.inst.name << dendl;
2163
2164 for (xlist<Capability*>::iterator p = session->caps.begin(); !p.end(); ++p) {
2165 Capability *cap = *p;
2166 cap->mark_stale();
2167 revoke_stale_caps(cap);
2168 }
2169 }
2170
2171 void Locker::resume_stale_caps(Session *session)
2172 {
2173 dout(10) << "resume_stale_caps for " << session->info.inst.name << dendl;
2174
2175 for (xlist<Capability*>::iterator p = session->caps.begin(); !p.end(); ++p) {
2176 Capability *cap = *p;
2177 CInode *in = cap->get_inode();
2178 assert(in->is_head());
2179 if (cap->is_stale()) {
2180 dout(10) << " clearing stale flag on " << *in << dendl;
2181 cap->clear_stale();
2182
2183 if (in->state_test(CInode::STATE_EXPORTINGCAPS)) {
2184 // if export succeeds, the cap will be removed. if export fails,
2185 // we need to re-issue the cap if it's not stale.
2186 in->state_set(CInode::STATE_EVALSTALECAPS);
2187 continue;
2188 }
2189
2190 if (!in->is_auth() || !eval(in, CEPH_CAP_LOCKS))
2191 issue_caps(in, cap);
2192 }
2193 }
2194 }
2195
2196 void Locker::remove_stale_leases(Session *session)
2197 {
2198 dout(10) << "remove_stale_leases for " << session->info.inst.name << dendl;
2199 xlist<ClientLease*>::iterator p = session->leases.begin();
2200 while (!p.end()) {
2201 ClientLease *l = *p;
2202 ++p;
2203 CDentry *parent = static_cast<CDentry*>(l->parent);
2204 dout(15) << " removing lease on " << *parent << dendl;
2205 parent->remove_client_lease(l, this);
2206 }
2207 }
2208
2209
// Waiter context: retry request_inode_file_caps() on an inode once whatever
// we were waiting for (single auth / active peer) resolves.
class C_MDL_RequestInodeFileCaps : public LockerContext {
  CInode *in;
public:
  C_MDL_RequestInodeFileCaps(Locker *l, CInode *i) : LockerContext(l), in(i) {
    in->get(CInode::PIN_PTRWAITER);  // keep the inode pinned while queued
  }
  void finish(int r) override {
    // only retry if we are still a replica; if we became auth in the
    // meantime there is no one to ask
    if (!in->is_auth())
      locker->request_inode_file_caps(in);
    in->put(CInode::PIN_PTRWAITER);
  }
};
2222
/**
 * (replica only) Tell the auth MDS which caps clients want on this inode.
 * No-op if the wanted mask matches what we last told the auth
 * (in->replica_caps_wanted).  Defers (via C_MDL_RequestInodeFileCaps) while
 * the inode's authority is ambiguous or the auth is still rejoining.
 */
void Locker::request_inode_file_caps(CInode *in)
{
  assert(!in->is_auth());

  // mask down to caps this inode could ever actually grant, minus PIN
  int wanted = in->get_caps_wanted() & in->get_caps_allowed_ever() & ~CEPH_CAP_PIN;
  if (wanted != in->replica_caps_wanted) {
    // wait for single auth
    if (in->is_ambiguous_auth()) {
      in->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH,
		     new C_MDL_RequestInodeFileCaps(this, in));
      return;
    }

    mds_rank_t auth = in->authority().first;
    // a rejoining auth can't handle cap-wanted updates yet; retry when active
    if (mds->is_cluster_degraded() &&
	mds->mdsmap->get_state(auth) == MDSMap::STATE_REJOIN) {
      mds->wait_for_active_peer(auth, new C_MDL_RequestInodeFileCaps(this, in));
      return;
    }

    dout(7) << "request_inode_file_caps " << ccap_string(wanted)
	    << " was " << ccap_string(in->replica_caps_wanted)
	    << " on " << *in << " to mds." << auth << dendl;

    // remember what we asked for so duplicate requests are suppressed
    in->replica_caps_wanted = wanted;

    if (!mds->is_cluster_degraded() ||
	mds->mdsmap->is_clientreplay_or_active_or_stopping(auth))
      mds->send_message_mds(new MInodeFileCaps(in->ino(), in->replica_caps_wanted),
			    auth);
  }
}
2255
2256 /* This function DOES put the passed message before returning */
2257 void Locker::handle_inode_file_caps(MInodeFileCaps *m)
2258 {
2259 // nobody should be talking to us during recovery.
2260 assert(mds->is_clientreplay() || mds->is_active() || mds->is_stopping());
2261
2262 // ok
2263 CInode *in = mdcache->get_inode(m->get_ino());
2264 mds_rank_t from = mds_rank_t(m->get_source().num());
2265
2266 assert(in);
2267 assert(in->is_auth());
2268
2269 dout(7) << "handle_inode_file_caps replica mds." << from << " wants caps " << ccap_string(m->get_caps()) << " on " << *in << dendl;
2270
2271 if (m->get_caps())
2272 in->mds_caps_wanted[from] = m->get_caps();
2273 else
2274 in->mds_caps_wanted.erase(from);
2275
2276 try_eval(in, CEPH_CAP_LOCKS);
2277 m->put();
2278 }
2279
2280
// Waiter context: re-run check_inode_max_size() with the captured target
// values once the inode unfreezes or its filelock becomes stable.
class C_MDL_CheckMaxSize : public LockerContext {
  CInode *in;
  uint64_t new_max_size;  // requested max_size floor (0 = let policy decide)
  uint64_t newsize;       // requested file size (0 = unchanged)
  utime_t mtime;          // mtime to apply with the size update

public:
  C_MDL_CheckMaxSize(Locker *l, CInode *i, uint64_t _new_max_size,
                     uint64_t _newsize, utime_t _mtime) :
    LockerContext(l), in(i),
    new_max_size(_new_max_size), newsize(_newsize), mtime(_mtime)
  {
    in->get(CInode::PIN_PTRWAITER);  // keep the inode alive while queued
  }
  void finish(int r) override {
    // authority may have moved while we waited; only the auth updates size
    if (in->is_auth())
      locker->check_inode_max_size(in, false, new_max_size, newsize, mtime);
    in->put(CInode::PIN_PTRWAITER);
  }
};
2301
/**
 * Compute a new writeable-range upper bound (max_size) for a file of @size
 * bytes: roughly double the size, optionally capped at
 * mds_client_writeable_range_max_inc_objs layout objects above the current
 * size, then round up to the layout's size increment.
 */
uint64_t Locker::calc_new_max_size(CInode::mempool_inode *pi, uint64_t size)
{
  uint64_t new_max = (size + 1) << 1;  // ~2x growth
  uint64_t max_inc = g_conf->mds_client_writeable_range_max_inc_objs;
  if (max_inc > 0) {
    max_inc *= pi->layout.object_size;  // convert objects -> bytes
    new_max = MIN(new_max, size + max_inc);
  }
  return ROUND_UP_TO(new_max, pi->get_layout_size_increment());
}
2312
/**
 * Recompute per-client writeable ranges for @in at file size @size.
 * Only clients that hold or want WR|BUFFER caps get a range; a client's
 * range max never shrinks here (clients that dropped writeable caps simply
 * get no entry in *new_ranges).  *max_increased is set to true whenever any
 * client's max grew (it is never reset to false here).
 */
void Locker::calc_new_client_ranges(CInode *in, uint64_t size,
				    CInode::mempool_inode::client_range_map *new_ranges,
				    bool *max_increased)
{
  auto latest = in->get_projected_inode();
  uint64_t ms;
  if (latest->has_layout()) {
    ms = calc_new_max_size(latest, size);
  } else {
    // Layout-less directories like ~mds0/, have zero size
    ms = 0;
  }

  // increase ranges as appropriate.
  // shrink to 0 if no WR|BUFFER caps issued.
  for (map<client_t,Capability*>::iterator p = in->client_caps.begin();
       p != in->client_caps.end();
       ++p) {
    if ((p->second->issued() | p->second->wanted()) & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER)) {
      client_writeable_range_t& nr = (*new_ranges)[p->first];
      nr.range.first = 0;
      if (latest->client_ranges.count(p->first)) {
	// existing writer: keep the larger of old and newly computed max
	client_writeable_range_t& oldr = latest->client_ranges[p->first];
	if (ms > oldr.range.last)
	  *max_increased = true;
	nr.range.last = MAX(ms, oldr.range.last);
	nr.follows = oldr.follows;
      } else {
	// brand-new writer
	*max_increased = true;
	nr.range.last = ms;
	nr.follows = in->first - 1;
      }
    }
  }
}
2348
/**
 * (auth only, files only) Apply forward-moving size/mtime updates and/or
 * recompute per-client max_size (writeable ranges), journaling the change
 * via EOpen (still-open file) or EUpdate.
 *
 * @param force_wrlock  journal even if the filelock can't normally be
 *                      wrlocked (caller guarantees safety)
 * @param new_max_size  explicit max_size floor (0 = policy decides)
 * @param new_size      new file size (0 = leave size/mtime unchanged)
 * @param new_mtime     mtime to apply alongside the size
 * @return true if an update was journaled; false if it was a no-op or we
 *         had to wait (a C_MDL_CheckMaxSize retry has been queued).
 */
bool Locker::check_inode_max_size(CInode *in, bool force_wrlock,
				  uint64_t new_max_size, uint64_t new_size,
				  utime_t new_mtime)
{
  assert(in->is_auth());
  assert(in->is_file());

  CInode::mempool_inode *latest = in->get_projected_inode();
  CInode::mempool_inode::client_range_map new_ranges;
  uint64_t size = latest->size;
  bool update_size = new_size > 0;
  bool update_max = false;
  bool max_increased = false;

  if (update_size) {
    // size and mtime only ever move forward here
    new_size = size = MAX(size, new_size);
    new_mtime = MAX(new_mtime, latest->mtime);
    if (latest->size == new_size && latest->mtime == new_mtime)
      update_size = false;
  }

  calc_new_client_ranges(in, max(new_max_size, size), &new_ranges, &max_increased);

  if (max_increased || latest->client_ranges != new_ranges)
    update_max = true;

  if (!update_size && !update_max) {
    dout(20) << "check_inode_max_size no-op on " << *in << dendl;
    return false;
  }

  dout(10) << "check_inode_max_size new_ranges " << new_ranges
	   << " update_size " << update_size
	   << " on " << *in << dendl;

  if (in->is_frozen()) {
    // cannot journal while frozen; retry after unfreeze
    dout(10) << "check_inode_max_size frozen, waiting on " << *in << dendl;
    C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in,
						     new_max_size,
						     new_size,
						     new_mtime);
    in->add_waiter(CInode::WAIT_UNFREEZE, cms);
    return false;
  }
  if (!force_wrlock && !in->filelock.can_wrlock(in->get_loner())) {
    // lock?
    if (in->filelock.is_stable()) {
      // nudge the filelock toward a wrlockable state
      if (in->get_target_loner() >= 0)
	file_excl(&in->filelock);
      else
	simple_lock(&in->filelock);
    }
    if (!in->filelock.can_wrlock(in->get_loner())) {
      // try again later
      C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in,
						       new_max_size,
						       new_size,
						       new_mtime);

      in->filelock.add_waiter(SimpleLock::WAIT_STABLE, cms);
      dout(10) << "check_inode_max_size can't wrlock, waiting on " << *in << dendl;
      return false;
    }
  }

  MutationRef mut(new MutationImpl());
  mut->ls = mds->mdlog->get_current_segment();

  auto &pi = in->project_inode();
  pi.inode.version = in->pre_dirty();

  if (update_max) {
    dout(10) << "check_inode_max_size client_ranges " << pi.inode.client_ranges << " -> " << new_ranges << dendl;
    pi.inode.client_ranges = new_ranges;
  }

  if (update_size) {
    dout(10) << "check_inode_max_size size " << pi.inode.size << " -> " << new_size << dendl;
    pi.inode.size = new_size;
    pi.inode.rstat.rbytes = new_size;
    dout(10) << "check_inode_max_size mtime " << pi.inode.mtime << " -> " << new_mtime << dendl;
    pi.inode.mtime = new_mtime;
    // keep ctime/rctime monotonically >= mtime
    if (new_mtime > pi.inode.ctime) {
      pi.inode.ctime = new_mtime;
      if (new_mtime > pi.inode.rstat.rctime)
	pi.inode.rstat.rctime = new_mtime;
    }
  }

  // use EOpen if the file is still open; otherwise, use EUpdate.
  // this is just an optimization to push open files forward into
  // newer log segments.
  LogEvent *le;
  EMetaBlob *metablob;
  if (in->is_any_caps_wanted() && in->last == CEPH_NOSNAP) {
    EOpen *eo = new EOpen(mds->mdlog);
    eo->add_ino(in->ino());
    metablob = &eo->metablob;
    le = eo;
    mut->ls->open_files.push_back(&in->item_open_file);
  } else {
    EUpdate *eu = new EUpdate(mds->mdlog, "check_inode_max_size");
    metablob = &eu->metablob;
    le = eu;
  }
  mds->mdlog->start_entry(le);
  if (update_size) { // FIXME if/when we do max_size nested accounting
    mdcache->predirty_journal_parents(mut, metablob, in, 0, PREDIRTY_PRIMARY);
    // no cow, here!
    CDentry *parent = in->get_projected_parent_dn();
    metablob->add_primary_dentry(parent, in, true);
  } else {
    metablob->add_dir_context(in->get_projected_parent_dn()->get_dir());
    mdcache->journal_dirty_inode(mut.get(), metablob, in);
  }
  mds->mdlog->submit_entry(le,
			   new C_Locker_FileUpdate_finish(this, in, mut, true));
  wrlock_force(&in->filelock, mut); // wrlock for duration of journal
  mut->auth_pin(in);

  // make max_size _increase_ timely
  if (max_increased)
    mds->mdlog->flush();

  return true;
}
2475
2476
2477 void Locker::share_inode_max_size(CInode *in, Capability *only_cap)
2478 {
2479 /*
2480 * only share if currently issued a WR cap. if client doesn't have it,
2481 * file_max doesn't matter, and the client will get it if/when they get
2482 * the cap later.
2483 */
2484 dout(10) << "share_inode_max_size on " << *in << dendl;
2485 map<client_t, Capability*>::iterator it;
2486 if (only_cap)
2487 it = in->client_caps.find(only_cap->get_client());
2488 else
2489 it = in->client_caps.begin();
2490 for (; it != in->client_caps.end(); ++it) {
2491 const client_t client = it->first;
2492 Capability *cap = it->second;
2493 if (cap->is_suppress())
2494 continue;
2495 if (cap->pending() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER)) {
2496 dout(10) << "share_inode_max_size with client." << client << dendl;
2497 cap->inc_last_seq();
2498 MClientCaps *m = new MClientCaps(CEPH_CAP_OP_GRANT,
2499 in->ino(),
2500 in->find_snaprealm()->inode->ino(),
2501 cap->get_cap_id(), cap->get_last_seq(),
2502 cap->pending(), cap->wanted(), 0,
2503 cap->get_mseq(),
2504 mds->get_osd_epoch_barrier());
2505 in->encode_cap_message(m, cap);
2506 mds->send_message_client_counted(m, client);
2507 }
2508 if (only_cap)
2509 break;
2510 }
2511 }
2512
2513 bool Locker::_need_flush_mdlog(CInode *in, int wanted)
2514 {
2515 /* flush log if caps are wanted by client but corresponding lock is unstable and locked by
2516 * pending mutations. */
2517 if (((wanted & (CEPH_CAP_FILE_RD|CEPH_CAP_FILE_WR|CEPH_CAP_FILE_SHARED|CEPH_CAP_FILE_EXCL)) &&
2518 in->filelock.is_unstable_and_locked()) ||
2519 ((wanted & (CEPH_CAP_AUTH_SHARED|CEPH_CAP_AUTH_EXCL)) &&
2520 in->authlock.is_unstable_and_locked()) ||
2521 ((wanted & (CEPH_CAP_LINK_SHARED|CEPH_CAP_LINK_EXCL)) &&
2522 in->linklock.is_unstable_and_locked()) ||
2523 ((wanted & (CEPH_CAP_XATTR_SHARED|CEPH_CAP_XATTR_EXCL)) &&
2524 in->xattrlock.is_unstable_and_locked()))
2525 return true;
2526 return false;
2527 }
2528
/**
 * Apply a client's new "wanted" cap mask, guarding against stale messages:
 * the full mask is trusted only if @issue_seq matches the cap's last issue
 * seq; on a mismatch only *additional* wanted bits are honored.  On the
 * auth, the inode is also added to / removed from the open-file list
 * (journaling an EOpen when added) and file recovery is prioritized when
 * RD/WR is wanted; on a replica the change is forwarded to the auth.
 */
void Locker::adjust_cap_wanted(Capability *cap, int wanted, int issue_seq)
{
  if (ceph_seq_cmp(issue_seq, cap->get_last_issue()) == 0) {
    // client saw our latest issue; take the mask verbatim
    dout(10) << " wanted " << ccap_string(cap->wanted())
	     << " -> " << ccap_string(wanted) << dendl;
    cap->set_wanted(wanted);
  } else if (wanted & ~cap->wanted()) {
    // stale seq, but growing "wanted" is always safe
    dout(10) << " wanted " << ccap_string(cap->wanted())
	     << " -> " << ccap_string(wanted)
	     << " (added caps even though we had seq mismatch!)" << dendl;
    cap->set_wanted(wanted | cap->wanted());
  } else {
    // stale seq and nothing new wanted: ignore
    dout(10) << " NOT changing wanted " << ccap_string(cap->wanted())
	     << " -> " << ccap_string(wanted)
	     << " (issue_seq " << issue_seq << " != last_issue "
	     << cap->get_last_issue() << ")" << dendl;
    return;
  }

  CInode *cur = cap->get_inode();
  if (!cur->is_auth()) {
    // not ours to account for; relay the wanted change to the auth
    request_inode_file_caps(cur);
    return;
  }

  if (cap->wanted() == 0) {
    if (cur->item_open_file.is_on_list() &&
	!cur->is_any_caps_wanted()) {
      dout(10) << " removing unwanted file from open file list " << *cur << dendl;
      cur->item_open_file.remove_myself();
    }
  } else {
    if (cur->state_test(CInode::STATE_RECOVERING) &&
	(cap->wanted() & (CEPH_CAP_FILE_RD |
			  CEPH_CAP_FILE_WR))) {
      mds->mdcache->recovery_queue.prioritize(cur);
    }

    if (!cur->item_open_file.is_on_list()) {
      dout(10) << " adding to open file list " << *cur << dendl;
      assert(cur->last == CEPH_NOSNAP);
      // journal an EOpen so the open-file state survives an MDS restart
      LogSegment *ls = mds->mdlog->get_current_segment();
      EOpen *le = new EOpen(mds->mdlog);
      mds->mdlog->start_entry(le);
      le->add_clean_inode(cur);
      ls->open_files.push_back(&cur->item_open_file);
      mds->mdlog->submit_entry(le);
    }
  }
}
2579
2580
2581
/**
 * Synthesize empty ("NULL") snapflushes on behalf of @client for every snap
 * earlier than @last that the client was expected to flush but never will
 * (it no longer holds dirty data / writeable caps).  This unblocks snapped
 * inodes waiting in head_in->client_need_snapflush.
 */
void Locker::_do_null_snapflush(CInode *head_in, client_t client, snapid_t last)
{
  dout(10) << "_do_null_snapflush client." << client << " on " << *head_in << dendl;
  for (auto p = head_in->client_need_snapflush.begin();
       p != head_in->client_need_snapflush.end() && p->first < last; ) {
    snapid_t snapid = p->first;
    auto &clients = p->second;
    // advance before the body: remove_need_snapflush below may erase this
    // entry from the map, invalidating p
    ++p;

    if (clients.count(client)) {
      dout(10) << " doing async NULL snapflush on " << snapid << " from client." << client << dendl;
      CInode *sin = mdcache->pick_inode_snap(head_in, snapid - 1);
      assert(sin);
      assert(sin->first <= snapid);
      // dirty==0, no message, no ack: a pure bookkeeping snap update
      _do_snap_update(sin, snapid, 0, sin->first - 1, client, NULL, NULL);
      head_in->remove_need_snapflush(sin, snapid, client);
    }
  }
}
2601
2602
2603 bool Locker::should_defer_client_cap_frozen(CInode *in)
2604 {
2605 /*
2606 * This policy needs to be AT LEAST as permissive as allowing a client request
2607 * to go forward, or else a client request can release something, the release
2608 * gets deferred, but the request gets processed and deadlocks because when the
2609 * caps can't get revoked.
2610 *
2611 * Currently, a request wait if anything locked is freezing (can't
2612 * auth_pin), which would avoid any deadlock with cap release. Thus @in
2613 * _MUST_ be in the lock/auth_pin set.
2614 *
2615 * auth_pins==0 implies no unstable lock and not auth pinnned by
2616 * client request, otherwise continue even it's freezing.
2617 */
2618 return (in->is_freezing() && in->get_num_auth_pins() == 0) || in->is_frozen();
2619 }
2620
/*
 * Main entry point for MClientCaps messages from clients: cap flushes,
 * snapflushes, releases, and wanted-mask updates.
 *
 * This function DOES put the passed message before returning.
 */
void Locker::handle_client_caps(MClientCaps *m)
{
  client_t client = m->get_source().num();
  snapid_t follows = m->get_snap_follows();
  dout(7) << "handle_client_caps "
	  << ((m->flags & CLIENT_CAPS_SYNC) ? "sync" : "async")
	  << " on " << m->get_ino()
	  << " tid " << m->get_client_tid() << " follows " << follows
	  << " op " << ceph_cap_op_name(m->get_op()) << dendl;

  Session *session = mds->get_session(m);
  if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) {
    // not ready to process cap messages yet: validate the session, record
    // dirty caps during reconnect, and defer until replay completes
    if (!session) {
      dout(5) << " no session, dropping " << *m << dendl;
      m->put();
      return;
    }
    if (session->is_closed() ||
	session->is_closing() ||
	session->is_killing()) {
      dout(7) << " session closed|closing|killing, dropping " << *m << dendl;
      m->put();
      return;
    }
    if (mds->is_reconnect() &&
	m->get_dirty() && m->get_client_tid() > 0 &&
	!session->have_completed_flush(m->get_client_tid())) {
      mdcache->set_reconnected_dirty_caps(client, m->get_ino(), m->get_dirty());
    }
    mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
    return;
  }

  // duplicate flush (e.g. resent after MDS restart)?  re-ack it.
  if (m->get_client_tid() > 0 && session &&
      session->have_completed_flush(m->get_client_tid())) {
    dout(7) << "handle_client_caps already flushed tid " << m->get_client_tid()
	    << " for client." << client << dendl;
    MClientCaps *ack;
    if (m->get_op() == CEPH_CAP_OP_FLUSHSNAP) {
      ack = new MClientCaps(CEPH_CAP_OP_FLUSHSNAP_ACK, m->get_ino(), 0, 0, 0, 0, 0,
			    m->get_dirty(), 0, mds->get_osd_epoch_barrier());
    } else {
      ack = new MClientCaps(CEPH_CAP_OP_FLUSH_ACK, m->get_ino(), 0, m->get_cap_id(),
			    m->get_seq(), m->get_caps(), 0, m->get_dirty(), 0,
			    mds->get_osd_epoch_barrier());
    }
    ack->set_snap_follows(follows);
    ack->set_client_tid(m->get_client_tid());
    mds->send_message_client_counted(ack, m->get_connection());
    if (m->get_op() == CEPH_CAP_OP_FLUSHSNAP) {
      m->put();
      return;
    } else {
      // fall-thru because the message may release some caps
      m->clear_dirty();
      m->set_op(CEPH_CAP_OP_UPDATE);
    }
  }

  // "oldest flush tid" > 0 means client uses unique TID for each flush
  if (m->get_oldest_flush_tid() > 0 && session) {
    if (session->trim_completed_flushes(m->get_oldest_flush_tid())) {
      mds->mdlog->get_current_segment()->touched_sessions.insert(session->info.inst.name);

      if (session->get_num_trim_flushes_warnings() > 0 &&
	  session->get_num_completed_flushes() * 2 < g_conf->mds_max_completed_flushes)
	session->reset_num_trim_flushes_warnings();
    } else {
      // client is not advancing oldest_flush_tid; warn with backoff
      if (session->get_num_completed_flushes() >=
	  (g_conf->mds_max_completed_flushes << session->get_num_trim_flushes_warnings())) {
	session->inc_num_trim_flushes_warnings();
	stringstream ss;
	ss << "client." << session->get_client() << " does not advance its oldest_flush_tid ("
	   << m->get_oldest_flush_tid() << "), "
	   << session->get_num_completed_flushes()
	   << " completed flushes recorded in session";
	mds->clog->warn() << ss.str();
	dout(20) << __func__ << " " << ss.str() << dendl;
      }
    }
  }

  CInode *head_in = mdcache->get_inode(m->get_ino());
  if (!head_in) {
    if (mds->is_clientreplay()) {
      dout(7) << "handle_client_caps on unknown ino " << m->get_ino()
	      << ", will try again after replayed client requests" << dendl;
      mdcache->wait_replay_cap_reconnect(m->get_ino(), new C_MDS_RetryMessage(mds, m));
      return;
    }

    /*
     * "handle_client_caps on unknown ino xxx" is normal after migrating a subtree
     * Sequence of events that cause this are:
     * - client sends caps message to mds.a
     * - mds finishes subtree migration, send cap export to client
     * - mds trim its cache
     * - mds receives cap messages from client
     */
    dout(7) << "handle_client_caps on unknown ino " << m->get_ino() << ", dropping" << dendl;
    m->put();
    return;
  }

  if (m->osd_epoch_barrier && !mds->objecter->have_map(m->osd_epoch_barrier)) {
    // Pause RADOS operations until we see the required epoch
    mds->objecter->set_epoch_barrier(m->osd_epoch_barrier);
  }

  if (mds->get_osd_epoch_barrier() < m->osd_epoch_barrier) {
    // Record the barrier so that we will retransmit it to clients
    mds->set_osd_epoch_barrier(m->osd_epoch_barrier);
  }

  dout(10) << " head inode " << *head_in << dendl;

  Capability *cap = 0;
  cap = head_in->get_client_cap(client);
  if (!cap) {
    dout(7) << "handle_client_caps no cap for client." << client << " on " << *head_in << dendl;
    m->put();
    return;
  }
  assert(cap);

  // freezing|frozen?
  if (should_defer_client_cap_frozen(head_in)) {
    dout(7) << "handle_client_caps freezing|frozen on " << *head_in << dendl;
    head_in->add_waiter(CInode::WAIT_UNFREEZE, new C_MDS_RetryMessage(mds, m));
    return;
  }
  // stale migration seq means the message predates a cap import/export
  if (ceph_seq_cmp(m->get_mseq(), cap->get_mseq()) < 0) {
    dout(7) << "handle_client_caps mseq " << m->get_mseq() << " < " << cap->get_mseq()
	    << ", dropping" << dendl;
    m->put();
    return;
  }

  int op = m->get_op();

  // flushsnap?
  if (op == CEPH_CAP_OP_FLUSHSNAP) {
    if (!head_in->is_auth()) {
      dout(7) << " not auth, ignoring flushsnap on " << *head_in << dendl;
      goto out;
    }

    SnapRealm *realm = head_in->find_snaprealm();
    snapid_t snap = realm->get_snap_following(follows);
    dout(10) << " flushsnap follows " << follows << " -> snap " << snap << dendl;

    CInode *in = head_in;
    if (snap != CEPH_NOSNAP) {
      in = mdcache->pick_inode_snap(head_in, snap - 1);
      if (in != head_in)
	dout(10) << " snapped inode " << *in << dendl;
    }

    // we can prepare the ack now, since this FLUSHEDSNAP is independent of any
    // other cap ops. (except possibly duplicate FLUSHSNAP requests, but worst
    // case we get a dup response, so whatever.)
    MClientCaps *ack = 0;
    if (m->get_dirty()) {
      ack = new MClientCaps(CEPH_CAP_OP_FLUSHSNAP_ACK, in->ino(), 0, 0, 0, 0, 0, m->get_dirty(), 0, mds->get_osd_epoch_barrier());
      ack->set_snap_follows(follows);
      ack->set_client_tid(m->get_client_tid());
      ack->set_oldest_flush_tid(m->get_oldest_flush_tid());
    }

    if (in == head_in ||
	(head_in->client_need_snapflush.count(snap) &&
	 head_in->client_need_snapflush[snap].count(client))) {
      dout(7) << " flushsnap snap " << snap
	      << " client." << client << " on " << *in << dendl;

      // this cap now follows a later snap (i.e. the one initiating this flush, or later)
      if (in == head_in)
	cap->client_follows = snap < CEPH_NOSNAP ? snap : realm->get_newest_seq();
      else if (head_in->client_need_snapflush.begin()->first < snap)
	_do_null_snapflush(head_in, client, snap);

      _do_snap_update(in, snap, m->get_dirty(), follows, client, m, ack);

      if (in != head_in)
	head_in->remove_need_snapflush(in, snap, client);
    } else {
      dout(7) << " not expecting flushsnap " << snap << " from client." << client << " on " << *in << dendl;
      if (ack)
	mds->send_message_client_counted(ack, m->get_connection());
    }
    goto out;
  }

  if (cap->get_cap_id() != m->get_cap_id()) {
    dout(7) << " ignoring client capid " << m->get_cap_id() << " != my " << cap->get_cap_id() << dendl;
  } else {
    CInode *in = head_in;
    if (follows > 0) {
      in = mdcache->pick_inode_snap(head_in, follows);
      // intermediate snap inodes
      while (in != head_in) {
	assert(in->last != CEPH_NOSNAP);
	if (in->is_auth() && m->get_dirty()) {
	  dout(10) << " updating intermediate snapped inode " << *in << dendl;
	  _do_cap_update(in, NULL, m->get_dirty(), follows, m);
	}
	in = mdcache->pick_inode_snap(head_in, in->last);
      }
    }

    // head inode, and cap
    MClientCaps *ack = 0;

    // only acknowledge caps we actually issued
    int caps = m->get_caps();
    if (caps & ~cap->issued()) {
      dout(10) << " confirming not issued caps " << ccap_string(caps & ~cap->issued()) << dendl;
      caps &= cap->issued();
    }

    cap->confirm_receipt(m->get_seq(), caps);
    dout(10) << " follows " << follows
	     << " retains " << ccap_string(m->get_caps())
	     << " dirty " << ccap_string(m->get_dirty())
	     << " on " << *in << dendl;


    // missing/skipped snapflush?
    // The client MAY send a snapflush if it is issued WR/EXCL caps, but
    // presently only does so when it has actual dirty metadata. But, we
    // set up the need_snapflush stuff based on the issued caps.
    // We can infer that the client WONT send a FLUSHSNAP once they have
    // released all WR/EXCL caps (the FLUSHSNAP always comes before the cap
    // update/release).
    if (!head_in->client_need_snapflush.empty()) {
      if ((cap->issued() & CEPH_CAP_ANY_FILE_WR) == 0) {
	_do_null_snapflush(head_in, client);
      } else {
	dout(10) << " revocation in progress, not making any conclusions about null snapflushes" << dendl;
      }
    }

    if (m->get_dirty() && in->is_auth()) {
      dout(7) << " flush client." << client << " dirty " << ccap_string(m->get_dirty())
	      << " seq " << m->get_seq() << " on " << *in << dendl;
      ack = new MClientCaps(CEPH_CAP_OP_FLUSH_ACK, in->ino(), 0, cap->get_cap_id(), m->get_seq(),
			    m->get_caps(), 0, m->get_dirty(), 0, mds->get_osd_epoch_barrier());
      ack->set_client_tid(m->get_client_tid());
      ack->set_oldest_flush_tid(m->get_oldest_flush_tid());
    }

    // filter wanted based on what we could ever give out (given auth/replica status)
    bool need_flush = m->flags & CLIENT_CAPS_SYNC;
    int new_wanted = m->get_wanted();
    if (new_wanted != cap->wanted()) {
      if (!need_flush && in->is_auth() && (new_wanted & ~cap->pending())) {
	// expanding caps. make sure we aren't waiting for a log flush
	need_flush = _need_flush_mdlog(head_in, new_wanted & ~cap->pending());
      }

      adjust_cap_wanted(cap, new_wanted, m->get_issue_seq());
    }

    if (in->is_auth() &&
	_do_cap_update(in, cap, m->get_dirty(), follows, m, ack, &need_flush)) {
      // updated
      eval(in, CEPH_CAP_LOCKS);

      if (!need_flush && (cap->wanted() & ~cap->pending()))
	need_flush = _need_flush_mdlog(in, cap->wanted() & ~cap->pending());
    } else {
      // no update, ack now.
      if (ack)
	mds->send_message_client_counted(ack, m->get_connection());

      bool did_issue = eval(in, CEPH_CAP_LOCKS);
      if (!did_issue && (cap->wanted() & ~cap->pending()))
	issue_caps(in, cap);

      // first cap message from a fresh writer: share max_size right away
      if (cap->get_last_seq() == 0 &&
	  (cap->pending() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER))) {
	cap->issue_norevoke(cap->issued());
	share_inode_max_size(in, cap);
      }
    }

    if (need_flush)
      mds->mdlog->flush();
  }

 out:
  m->put();
}
2916
2917
2918 class C_Locker_RetryRequestCapRelease : public LockerContext {
2919 client_t client;
2920 ceph_mds_request_release item;
2921 public:
2922 C_Locker_RetryRequestCapRelease(Locker *l, client_t c, const ceph_mds_request_release& it) :
2923 LockerContext(l), client(c), item(it) { }
2924 void finish(int r) override {
2925 string dname;
2926 MDRequestRef null_ref;
2927 locker->process_request_cap_release(null_ref, client, item, dname);
2928 }
2929 };
2930
/**
 * Apply a cap release embedded in a client request.
 *
 * @param mdr the request carrying the release, or a null ref when this is
 *        a deferred replay (see C_Locker_RetryRequestCapRelease)
 * @param client the releasing client
 * @param item wire-format release: ino, cap_id, retained caps, wanted,
 *        seq / issue_seq / mseq
 * @param dname if non-empty, also drop the client's lease on this dentry
 */
void Locker::process_request_cap_release(MDRequestRef& mdr, client_t client, const ceph_mds_request_release& item,
					 boost::string_view dname)
{
  inodeno_t ino = (uint64_t)item.ino;
  uint64_t cap_id = item.cap_id;
  int caps = item.caps;
  int wanted = item.wanted;
  int seq = item.seq;
  int issue_seq = item.issue_seq;
  int mseq = item.mseq;

  CInode *in = mdcache->get_inode(ino);
  if (!in)
    return;

  // drop the named dentry lease first, if the dentry is still in cache
  if (dname.length()) {
    frag_t fg = in->pick_dirfrag(dname);
    CDir *dir = in->get_dirfrag(fg);
    if (dir) {
      CDentry *dn = dir->lookup(dname);
      if (dn) {
	ClientLease *l = dn->get_client_lease(client);
	if (l) {
	  dout(10) << "process_cap_release removing lease on " << *dn << dendl;
	  dn->remove_client_lease(l, this);
	} else {
	  dout(7) << "process_cap_release client." << client
		  << " doesn't have lease on " << *dn << dendl;
	}
      } else {
	dout(7) << "process_cap_release client." << client << " released lease on dn "
		<< dir->dirfrag() << "/" << dname << " which dne" << dendl;
      }
    }
  }

  Capability *cap = in->get_client_cap(client);
  if (!cap)
    return;

  dout(10) << "process_cap_release client." << client << " " << ccap_string(caps) << " on " << *in
	   << (mdr ? "" : " (DEFERRED, no mdr)")
	   << dendl;

  // stale migration sequence: the cap has moved since this was sent
  if (ceph_seq_cmp(mseq, cap->get_mseq()) < 0) {
    dout(7) << " mseq " << mseq << " < " << cap->get_mseq() << ", dropping" << dendl;
    return;
  }

  // release refers to a different (older) cap instance
  if (cap->get_cap_id() != cap_id) {
    dout(7) << " cap_id " << cap_id << " != " << cap->get_cap_id() << ", dropping" << dendl;
    return;
  }

  // can't process while the inode is frozen; retry after unfreeze
  if (should_defer_client_cap_frozen(in)) {
    dout(7) << " frozen, deferring" << dendl;
    in->add_waiter(CInode::WAIT_UNFREEZE, new C_Locker_RetryRequestCapRelease(this, client, item));
    return;
  }

  // the client cannot retain caps we never issued
  if (caps & ~cap->issued()) {
    dout(10) << " confirming not issued caps " << ccap_string(caps & ~cap->issued()) << dendl;
    caps &= cap->issued();
  }
  cap->confirm_receipt(seq, caps);

  // once the client holds no WR/EXCL file caps it will never send a
  // FLUSHSNAP, so synthesize null snapflushes for anything outstanding
  if (!in->client_need_snapflush.empty() &&
      (cap->issued() & CEPH_CAP_ANY_FILE_WR) == 0) {
    _do_null_snapflush(in, client);
  }

  adjust_cap_wanted(cap, wanted, issue_seq);

  // suppress re-issue while evaluating on behalf of an active request
  if (mdr)
    cap->inc_suppress();
  eval(in, CEPH_CAP_LOCKS);
  if (mdr)
    cap->dec_suppress();

  // take note; we may need to reissue on this cap later
  if (mdr)
    mdr->cap_releases[in->vino()] = cap->get_last_seq();
}
3014
3015 class C_Locker_RetryKickIssueCaps : public LockerContext {
3016 CInode *in;
3017 client_t client;
3018 ceph_seq_t seq;
3019 public:
3020 C_Locker_RetryKickIssueCaps(Locker *l, CInode *i, client_t c, ceph_seq_t s) :
3021 LockerContext(l), in(i), client(c), seq(s) {
3022 in->get(CInode::PIN_PTRWAITER);
3023 }
3024 void finish(int r) override {
3025 locker->kick_issue_caps(in, client, seq);
3026 in->put(CInode::PIN_PTRWAITER);
3027 }
3028 };
3029
3030 void Locker::kick_issue_caps(CInode *in, client_t client, ceph_seq_t seq)
3031 {
3032 Capability *cap = in->get_client_cap(client);
3033 if (!cap || cap->get_last_sent() != seq)
3034 return;
3035 if (in->is_frozen()) {
3036 dout(10) << "kick_issue_caps waiting for unfreeze on " << *in << dendl;
3037 in->add_waiter(CInode::WAIT_UNFREEZE,
3038 new C_Locker_RetryKickIssueCaps(this, in, client, seq));
3039 return;
3040 }
3041 dout(10) << "kick_issue_caps released at current seq " << seq
3042 << ", reissuing" << dendl;
3043 issue_caps(in, cap);
3044 }
3045
3046 void Locker::kick_cap_releases(MDRequestRef& mdr)
3047 {
3048 client_t client = mdr->get_client();
3049 for (map<vinodeno_t,ceph_seq_t>::iterator p = mdr->cap_releases.begin();
3050 p != mdr->cap_releases.end();
3051 ++p) {
3052 CInode *in = mdcache->get_inode(p->first);
3053 if (!in)
3054 continue;
3055 kick_issue_caps(in, client, p->second);
3056 }
3057 }
3058
/**
 * Journal a client's FLUSHSNAP metadata into the appropriate inode
 * (an old_inode for multiversion inodes, otherwise the projected head).
 *
 * m and ack might be NULL, so don't dereference them unless dirty != 0.
 *
 * @param in inode being flushed to
 * @param snap snapid the flush applies to (CEPH_NOSNAP if already deleted)
 * @param dirty mask of dirtied cap bits
 * @param follows snapid the client's flush follows
 * @param client flushing client
 * @param m the flush message (may be NULL only when dirty == 0)
 * @param ack reply to send once the log entry commits (may be NULL)
 */
void Locker::_do_snap_update(CInode *in, snapid_t snap, int dirty, snapid_t follows, client_t client, MClientCaps *m, MClientCaps *ack)
{
  dout(10) << "_do_snap_update dirty " << ccap_string(dirty)
	   << " follows " << follows << " snap " << snap
	   << " on " << *in << dendl;

  if (snap == CEPH_NOSNAP) {
    // hmm, i guess snap was already deleted? just ack!
    dout(10) << " wow, the snap following " << follows
	     << " was already deleted. nothing to record, just ack." << dendl;
    if (ack)
      mds->send_message_client_counted(ack, m->get_connection());
    return;
  }

  EUpdate *le = new EUpdate(mds->mdlog, "snap flush");
  mds->mdlog->start_entry(le);
  MutationRef mut = new MutationImpl();
  mut->ls = mds->mdlog->get_current_segment();

  // normal metadata updates that we can apply to the head as well.

  // update xattrs?
  CInode::mempool_xattr_map *px = nullptr;
  bool xattrs = (dirty & CEPH_CAP_XATTR_EXCL) &&
		m->xattrbl.length() &&
		m->head.xattr_version > in->get_projected_inode()->xattr_version;

  CInode::mempool_old_inode *oi = 0;
  if (in->is_multiversion()) {
    oi = in->pick_old_inode(snap);
  }

  // `i` (and `px`) point at whichever inode/xattr map actually receives
  // the flushed fields: the matching old_inode if one exists, else the
  // freshly projected head inode.
  CInode::mempool_inode *i;
  if (oi) {
    dout(10) << " writing into old inode" << dendl;
    auto &pi = in->project_inode();
    pi.inode.version = in->pre_dirty();
    if (snap > oi->first)
      in->split_old_inode(snap);
    i = &oi->inode;
    if (xattrs)
      px = &oi->xattrs;
  } else {
    auto &pi = in->project_inode(xattrs);
    pi.inode.version = in->pre_dirty();
    i = &pi.inode;
    if (xattrs)
      px = pi.xattrs.get();
  }

  _update_cap_fields(in, dirty, m, i);

  // xattr
  if (xattrs) {
    dout(7) << " xattrs v" << i->xattr_version << " -> " << m->head.xattr_version
	    << " len " << m->xattrbl.length() << dendl;
    i->xattr_version = m->head.xattr_version;
    bufferlist::iterator p = m->xattrbl.begin();
    ::decode(*px, p);
  }

  {
    // trim the client's writable byte range: gone entirely if this was the
    // last snap of the inode, otherwise it now follows this snap
    auto it = i->client_ranges.find(client);
    if (it != i->client_ranges.end()) {
      if (in->last == snap) {
	dout(10) << " removing client_range entirely" << dendl;
	i->client_ranges.erase(it);
      } else {
	dout(10) << " client_range now follows " << snap << dendl;
	it->second.follows = snap;
      }
    }
  }

  mut->auth_pin(in);
  mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows);
  mdcache->journal_dirty_inode(mut.get(), &le->metablob, in, follows);

  // "oldest flush tid" > 0 means client uses unique TID for each flush
  if (ack && ack->get_oldest_flush_tid() > 0)
    le->metablob.add_client_flush(metareqid_t(m->get_source(), ack->get_client_tid()),
				  ack->get_oldest_flush_tid());

  mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut, false, false,
							      client, ack));
}
3149
/**
 * Copy dirtied metadata fields from a client cap-flush message into an
 * inode (either the projected head inode or an old_inode for snaps).
 * No-op when dirty == 0.
 *
 * @param in the inode (used for logging and is_file() checks)
 * @param dirty mask of dirty cap bits the client is flushing
 * @param m the cap message carrying the new field values; must be
 *          non-NULL whenever dirty != 0
 * @param pi the inode fields to update in place
 */
void Locker::_update_cap_fields(CInode *in, int dirty, MClientCaps *m, CInode::mempool_inode *pi)
{
  if (dirty == 0)
    return;

  /* m must be valid if there are dirty caps */
  assert(m);
  uint64_t features = m->get_connection()->get_features();

  // ctime only ever moves forward
  if (m->get_ctime() > pi->ctime) {
    dout(7) << " ctime " << pi->ctime << " -> " << m->get_ctime()
	    << " for " << *in << dendl;
    pi->ctime = m->get_ctime();
    if (m->get_ctime() > pi->rstat.rctime)
      pi->rstat.rctime = m->get_ctime();
  }

  if ((features & CEPH_FEATURE_FS_CHANGE_ATTR) &&
      m->get_change_attr() > pi->change_attr) {
    dout(7) << " change_attr " << pi->change_attr << " -> " << m->get_change_attr()
	    << " for " << *in << dendl;
    pi->change_attr = m->get_change_attr();
  }

  // file
  if (dirty & (CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)) {
    utime_t atime = m->get_atime();
    utime_t mtime = m->get_mtime();
    uint64_t size = m->get_size();
    version_t inline_version = m->inline_version;

    // WR caps may only move mtime forward; EXCL caps may set it arbitrarily
    if (((dirty & CEPH_CAP_FILE_WR) && mtime > pi->mtime) ||
	((dirty & CEPH_CAP_FILE_EXCL) && mtime != pi->mtime)) {
      dout(7) << " mtime " << pi->mtime << " -> " << mtime
	      << " for " << *in << dendl;
      pi->mtime = mtime;
      if (mtime > pi->rstat.rctime)
	pi->rstat.rctime = mtime;
    }
    // size may only grow here; truncation goes through other paths
    if (in->inode.is_file() && // ONLY if regular file
	size > pi->size) {
      dout(7) << " size " << pi->size << " -> " << size
	      << " for " << *in << dendl;
      pi->size = size;
      pi->rstat.rbytes = size;
    }
    if (in->inode.is_file() &&
	(dirty & CEPH_CAP_FILE_WR) &&
	inline_version > pi->inline_data.version) {
      pi->inline_data.version = inline_version;
      if (inline_version != CEPH_INLINE_NONE && m->inline_data.length() > 0)
	pi->inline_data.get_data() = m->inline_data;
      else
	pi->inline_data.free_data();
    }
    // atime/time_warp_seq only honored with EXCL (client is sole writer)
    if ((dirty & CEPH_CAP_FILE_EXCL) && atime != pi->atime) {
      dout(7) << " atime " << pi->atime << " -> " << atime
	      << " for " << *in << dendl;
      pi->atime = atime;
    }
    if ((dirty & CEPH_CAP_FILE_EXCL) &&
	ceph_seq_cmp(pi->time_warp_seq, m->get_time_warp_seq()) < 0) {
      dout(7) << " time_warp_seq " << pi->time_warp_seq << " -> " << m->get_time_warp_seq()
	      << " for " << *in << dendl;
      pi->time_warp_seq = m->get_time_warp_seq();
    }
  }
  // auth
  if (dirty & CEPH_CAP_AUTH_EXCL) {
    if (m->head.uid != pi->uid) {
      dout(7) << " uid " << pi->uid
	      << " -> " << m->head.uid
	      << " for " << *in << dendl;
      pi->uid = m->head.uid;
    }
    if (m->head.gid != pi->gid) {
      dout(7) << " gid " << pi->gid
	      << " -> " << m->head.gid
	      << " for " << *in << dendl;
      pi->gid = m->head.gid;
    }
    if (m->head.mode != pi->mode) {
      dout(7) << " mode " << oct << pi->mode
	      << " -> " << m->head.mode << dec
	      << " for " << *in << dendl;
      pi->mode = m->head.mode;
    }
    if ((features & CEPH_FEATURE_FS_BTIME) && m->get_btime() != pi->btime) {
      dout(7) << " btime " << oct << pi->btime
	      << " -> " << m->get_btime() << dec
	      << " for " << *in << dendl;
      pi->btime = m->get_btime();
    }
  }
}
3245
/*
 * update inode based on cap flush|flushsnap|wanted.
 *  adjust max_size, if needed.
 * if we update, return true; otherwise, false (no update needed).
 */
bool Locker::_do_cap_update(CInode *in, Capability *cap,
			    int dirty, snapid_t follows,
			    MClientCaps *m, MClientCaps *ack,
			    bool *need_flush)
{
  dout(10) << "_do_cap_update dirty " << ccap_string(dirty)
	   << " issued " << ccap_string(cap ? cap->issued() : 0)
	   << " wanted " << ccap_string(cap ? cap->wanted() : 0)
	   << " on " << *in << dendl;
  assert(in->is_auth());
  client_t client = m->get_source().num();
  CInode::mempool_inode *latest = in->get_projected_inode();

  // increase or zero max_size?
  uint64_t size = m->get_size();
  bool change_max = false;
  uint64_t old_max = latest->client_ranges.count(client) ? latest->client_ranges[client].range.last : 0;
  uint64_t new_max = old_max;

  if (in->is_file()) {
    bool forced_change_max = false;
    dout(20) << "inode is file" << dendl;
    if (cap && ((cap->issued() | cap->wanted()) & CEPH_CAP_ANY_FILE_WR)) {
      dout(20) << "client has write caps; m->get_max_size="
	       << m->get_max_size() << "; old_max=" << old_max << dendl;
      if (m->get_max_size() > new_max) {
	// client explicitly asked for a larger max_size
	dout(10) << "client requests file_max " << m->get_max_size()
		 << " > max " << old_max << dendl;
	change_max = true;
	forced_change_max = true;
	new_max = calc_new_max_size(latest, m->get_max_size());
      } else {
	// otherwise grow max_size ahead of the reported size as needed
	new_max = calc_new_max_size(latest, size);

	if (new_max > old_max)
	  change_max = true;
	else
	  new_max = old_max;
      }
    } else {
      // no write caps issued or wanted: drop the client's range entirely
      if (old_max) {
	change_max = true;
	new_max = 0;
      }
    }

    if (in->last == CEPH_NOSNAP &&
	change_max &&
	!in->filelock.can_wrlock(client) &&
	!in->filelock.can_force_wrlock(client)) {
      dout(10) << " i want to change file_max, but lock won't allow it (yet)" << dendl;
      // try to nudge the filelock into a state that allows the wrlock
      if (in->filelock.is_stable()) {
	bool need_issue = false;
	if (cap)
	  cap->inc_suppress();
	if (in->mds_caps_wanted.empty() &&
	    (in->get_loner() >= 0 || (in->get_wanted_loner() >= 0 && in->try_set_loner()))) {
	  if (in->filelock.get_state() != LOCK_EXCL)
	    file_excl(&in->filelock, &need_issue);
	} else
	  simple_lock(&in->filelock, &need_issue);
	if (need_issue)
	  issue_caps(in);
	if (cap)
	  cap->dec_suppress();
      }
      if (!in->filelock.can_wrlock(client) &&
	  !in->filelock.can_force_wrlock(client)) {
	// still blocked; retry the max_size change once the lock stabilizes
	C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in,
	                                                 forced_change_max ? new_max : 0,
	                                                 0, utime_t());

	in->filelock.add_waiter(SimpleLock::WAIT_STABLE, cms);
	change_max = false;
      }
    }
  }

  // import client-held fcntl/flock state sent with the flush
  if (m->flockbl.length()) {
    int32_t num_locks;
    bufferlist::iterator bli = m->flockbl.begin();
    ::decode(num_locks, bli);
    for ( int i=0; i < num_locks; ++i) {
      ceph_filelock decoded_lock;
      ::decode(decoded_lock, bli);
      in->get_fcntl_lock_state()->held_locks.
	insert(pair<uint64_t, ceph_filelock>(decoded_lock.start, decoded_lock));
      ++in->get_fcntl_lock_state()->client_held_lock_counts[(client_t)(decoded_lock.client)];
    }
    ::decode(num_locks, bli);
    for ( int i=0; i < num_locks; ++i) {
      ceph_filelock decoded_lock;
      ::decode(decoded_lock, bli);
      in->get_flock_lock_state()->held_locks.
	insert(pair<uint64_t, ceph_filelock>(decoded_lock.start, decoded_lock));
      ++in->get_flock_lock_state()->client_held_lock_counts[(client_t)(decoded_lock.client)];
    }
  }

  if (!dirty && !change_max)
    return false;

  Session *session = mds->get_session(m);
  if (session->check_access(in, MAY_WRITE,
			    m->caller_uid, m->caller_gid, NULL, 0, 0) < 0) {
    dout(10) << "check_access failed, dropping cap update on " << *in << dendl;
    return false;
  }

  // do the update.
  EUpdate *le = new EUpdate(mds->mdlog, "cap update");
  mds->mdlog->start_entry(le);

  bool xattr = (dirty & CEPH_CAP_XATTR_EXCL) &&
	       m->xattrbl.length() &&
	       m->head.xattr_version > in->get_projected_inode()->xattr_version;

  auto &pi = in->project_inode(xattr);
  pi.inode.version = in->pre_dirty();

  MutationRef mut(new MutationImpl());
  mut->ls = mds->mdlog->get_current_segment();

  _update_cap_fields(in, dirty, m, &pi.inode);

  if (change_max) {
    dout(7) << " max_size " << old_max << " -> " << new_max
	    << " for " << *in << dendl;
    if (new_max) {
      auto &cr = pi.inode.client_ranges[client];
      cr.range.first = 0;
      cr.range.last = new_max;
      cr.follows = in->first - 1;
    } else
      pi.inode.client_ranges.erase(client);
  }

  if (change_max || (dirty & (CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)))
    wrlock_force(&in->filelock, mut);  // wrlock for duration of journal

  // auth
  if (dirty & CEPH_CAP_AUTH_EXCL)
    wrlock_force(&in->authlock, mut);

  // xattrs update?
  if (xattr) {
    dout(7) << " xattrs v" << pi.inode.xattr_version << " -> " << m->head.xattr_version << dendl;
    pi.inode.xattr_version = m->head.xattr_version;
    bufferlist::iterator p = m->xattrbl.begin();
    ::decode(*pi.xattrs, p);
    wrlock_force(&in->xattrlock, mut);
  }

  mut->auth_pin(in);
  mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows);
  mdcache->journal_dirty_inode(mut.get(), &le->metablob, in, follows);

  // "oldest flush tid" > 0 means client uses unique TID for each flush
  if (ack && ack->get_oldest_flush_tid() > 0)
    le->metablob.add_client_flush(metareqid_t(m->get_source(), ack->get_client_tid()),
				  ack->get_oldest_flush_tid());

  mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut,
							      change_max, !!cap,
							      client, ack));
  // a max_size increase must reach the journal before the client can use
  // it, so request an mdlog flush in that case too
  if (need_flush && !*need_flush &&
      ((change_max && new_max) || // max INCREASE
       _need_flush_mdlog(in, dirty)))
    *need_flush = true;

  return true;
}
3423
3424 /* This function DOES put the passed message before returning */
3425 void Locker::handle_client_cap_release(MClientCapRelease *m)
3426 {
3427 client_t client = m->get_source().num();
3428 dout(10) << "handle_client_cap_release " << *m << dendl;
3429
3430 if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) {
3431 mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
3432 return;
3433 }
3434
3435 if (m->osd_epoch_barrier && !mds->objecter->have_map(m->osd_epoch_barrier)) {
3436 // Pause RADOS operations until we see the required epoch
3437 mds->objecter->set_epoch_barrier(m->osd_epoch_barrier);
3438 }
3439
3440 if (mds->get_osd_epoch_barrier() < m->osd_epoch_barrier) {
3441 // Record the barrier so that we will retransmit it to clients
3442 mds->set_osd_epoch_barrier(m->osd_epoch_barrier);
3443 }
3444
3445 Session *session = mds->get_session(m);
3446
3447 for (vector<ceph_mds_cap_item>::iterator p = m->caps.begin(); p != m->caps.end(); ++p) {
3448 _do_cap_release(client, inodeno_t((uint64_t)p->ino) , p->cap_id, p->migrate_seq, p->seq);
3449 }
3450
3451 if (session) {
3452 session->notify_cap_release(m->caps.size());
3453 }
3454
3455 m->put();
3456 }
3457
3458 class C_Locker_RetryCapRelease : public LockerContext {
3459 client_t client;
3460 inodeno_t ino;
3461 uint64_t cap_id;
3462 ceph_seq_t migrate_seq;
3463 ceph_seq_t issue_seq;
3464 public:
3465 C_Locker_RetryCapRelease(Locker *l, client_t c, inodeno_t i, uint64_t id,
3466 ceph_seq_t mseq, ceph_seq_t seq) :
3467 LockerContext(l), client(c), ino(i), cap_id(id), migrate_seq(mseq), issue_seq(seq) {}
3468 void finish(int r) override {
3469 locker->_do_cap_release(client, ino, cap_id, migrate_seq, issue_seq);
3470 }
3471 };
3472
/**
 * Process a single cap release from an MClientCapRelease message.
 * Validates that the release still applies (same cap instance, current
 * mseq and issue seq) before removing the client's cap from the inode.
 *
 * @param client releasing client
 * @param ino inode number the cap belongs to
 * @param cap_id identifies the specific cap instance being released
 * @param mseq client's view of the cap migration sequence
 * @param seq issue sequence the release refers to
 */
void Locker::_do_cap_release(client_t client, inodeno_t ino, uint64_t cap_id,
			     ceph_seq_t mseq, ceph_seq_t seq)
{
  CInode *in = mdcache->get_inode(ino);
  if (!in) {
    dout(7) << "_do_cap_release missing ino " << ino << dendl;
    return;
  }
  Capability *cap = in->get_client_cap(client);
  if (!cap) {
    dout(7) << "_do_cap_release no cap for client" << client << " on "<< *in << dendl;
    return;
  }

  dout(7) << "_do_cap_release for client." << client << " on "<< *in << dendl;
  // release refers to a different (stale) cap instance
  if (cap->get_cap_id() != cap_id) {
    dout(7) << " capid " << cap_id << " != " << cap->get_cap_id() << ", ignore" << dendl;
    return;
  }
  // stale migration sequence
  if (ceph_seq_cmp(mseq, cap->get_mseq()) < 0) {
    dout(7) << " mseq " << mseq << " < " << cap->get_mseq() << ", ignore" << dendl;
    return;
  }
  // can't touch caps while the inode is freezing/frozen; retry later
  if (should_defer_client_cap_frozen(in)) {
    dout(7) << " freezing|frozen, deferring" << dendl;
    in->add_waiter(CInode::WAIT_UNFREEZE,
		   new C_Locker_RetryCapRelease(this, client, ino, cap_id, mseq, seq));
    return;
  }
  // we issued something since this release was sent: don't drop the cap,
  // just clear stale revoke state and re-evaluate
  if (seq != cap->get_last_issue()) {
    dout(7) << " issue_seq " << seq << " != " << cap->get_last_issue() << dendl;
    // clean out any old revoke history
    cap->clean_revoke_from(seq);
    eval_cap_gather(in);
    return;
  }
  remove_client_cap(in, client);
}
3511
/* Remove a client's capability from an inode.  (Note: unlike the message
 * handlers above, this takes no message and puts nothing.) */
3513
// Drop a client's cap on an inode: flush out pending snapflush state,
// remove the cap, clean up the client's writable byte range (auth) or
// update the auth MDS on our wanted caps (replica), then re-eval locks.
void Locker::remove_client_cap(CInode *in, client_t client)
{
  // clean out any pending snapflush state
  if (!in->client_need_snapflush.empty())
    _do_null_snapflush(in, client);

  in->remove_client_cap(client);

  if (in->is_auth()) {
    // make sure we clear out the client byte range
    if (in->get_projected_inode()->client_ranges.count(client) &&
	!(in->inode.nlink == 0 && !in->is_any_caps())) // unless it's unlink + stray
      check_inode_max_size(in);
  } else {
    // replica: let the auth MDS know our file caps wanted changed
    request_inode_file_caps(in);
  }

  try_eval(in, CEPH_CAP_LOCKS);
}
3533
3534
3535 /**
3536 * Return true if any currently revoking caps exceed the
3537 * session_timeout threshold.
3538 */
3539 bool Locker::any_late_revoking_caps(xlist<Capability*> const &revoking,
3540 double timeout) const
3541 {
3542 xlist<Capability*>::const_iterator p = revoking.begin();
3543 if (p.end()) {
3544 // No revoking caps at the moment
3545 return false;
3546 } else {
3547 utime_t now = ceph_clock_now();
3548 utime_t age = now - (*p)->get_last_revoke_stamp();
3549 if (age <= timeout) {
3550 return false;
3551 } else {
3552 return true;
3553 }
3554 }
3555 }
3556
3557 void Locker::get_late_revoking_clients(std::list<client_t> *result,
3558 double timeout) const
3559 {
3560 if (!any_late_revoking_caps(revoking_caps, timeout)) {
3561 // Fast path: no misbehaving clients, execute in O(1)
3562 return;
3563 }
3564
3565 // Slow path: execute in O(N_clients)
3566 for (auto &p : revoking_caps_by_client) {
3567 if (any_late_revoking_caps(p.second, timeout)) {
3568 result->push_back(p.first);
3569 }
3570 }
3571 }
3572
3573 // Hard-code instead of surfacing a config settings because this is
3574 // really a hack that should go away at some point when we have better
3575 // inspection tools for getting at detailed cap state (#7316)
3576 #define MAX_WARN_CAPS 100
3577
3578 void Locker::caps_tick()
3579 {
3580 utime_t now = ceph_clock_now();
3581
3582 dout(20) << __func__ << " " << revoking_caps.size() << " revoking caps" << dendl;
3583
3584 int i = 0;
3585 for (xlist<Capability*>::iterator p = revoking_caps.begin(); !p.end(); ++p) {
3586 Capability *cap = *p;
3587
3588 utime_t age = now - cap->get_last_revoke_stamp();
3589 dout(20) << __func__ << " age = " << age << cap->get_client() << "." << cap->get_inode()->ino() << dendl;
3590 if (age <= mds->mdsmap->get_session_timeout()) {
3591 dout(20) << __func__ << " age below timeout " << mds->mdsmap->get_session_timeout() << dendl;
3592 break;
3593 } else {
3594 ++i;
3595 if (i > MAX_WARN_CAPS) {
3596 dout(1) << __func__ << " more than " << MAX_WARN_CAPS << " caps are late"
3597 << "revoking, ignoring subsequent caps" << dendl;
3598 break;
3599 }
3600 }
3601 // exponential backoff of warning intervals
3602 if (age > mds->mdsmap->get_session_timeout() * (1 << cap->get_num_revoke_warnings())) {
3603 cap->inc_num_revoke_warnings();
3604 stringstream ss;
3605 ss << "client." << cap->get_client() << " isn't responding to mclientcaps(revoke), ino "
3606 << cap->get_inode()->ino() << " pending " << ccap_string(cap->pending())
3607 << " issued " << ccap_string(cap->issued()) << ", sent " << age << " seconds ago";
3608 mds->clog->warn() << ss.str();
3609 dout(20) << __func__ << " " << ss.str() << dendl;
3610 } else {
3611 dout(20) << __func__ << " silencing log message (backoff) for " << cap->get_client() << "." << cap->get_inode()->ino() << dendl;
3612 }
3613 }
3614 }
3615
3616
// Handle a client lease message (release / revoke-ack / renew) for a
// dentry.  Note the message ownership: RELEASE and REVOKE_ACK put `m`;
// RENEW reuses `m` itself as the reply (re-sent to the client) when the
// renewal is granted, and puts nothing otherwise in that branch.
void Locker::handle_client_lease(MClientLease *m)
{
  dout(10) << "handle_client_lease " << *m << dendl;

  assert(m->get_source().is_client());
  client_t client = m->get_source().num();

  CInode *in = mdcache->get_inode(m->get_ino(), m->get_last());
  if (!in) {
    dout(7) << "handle_client_lease don't have ino " << m->get_ino() << "." << m->get_last() << dendl;
    m->put();
    return;
  }
  CDentry *dn = 0;

  frag_t fg = in->pick_dirfrag(m->dname);
  CDir *dir = in->get_dirfrag(fg);
  if (dir)
    dn = dir->lookup(m->dname);
  if (!dn) {
    dout(7) << "handle_client_lease don't have dn " << m->get_ino() << " " << m->dname << dendl;
    m->put();
    return;
  }
  dout(10) << " on " << *dn << dendl;

  // replica and lock
  ClientLease *l = dn->get_client_lease(client);
  if (!l) {
    dout(7) << "handle_client_lease didn't have lease for client." << client << " of " << *dn << dendl;
    m->put();
    return;
  }

  switch (m->get_action()) {
  case CEPH_MDS_LEASE_REVOKE_ACK:
  case CEPH_MDS_LEASE_RELEASE:
    // only drop the lease if the seq matches; otherwise it refers to an
    // older lease instance and is ignored
    if (l->seq != m->get_seq()) {
      dout(7) << "handle_client_lease release - seq " << l->seq << " != provided " << m->get_seq() << dendl;
    } else {
      dout(7) << "handle_client_lease client." << client
	      << " on " << *dn << dendl;
      dn->remove_client_lease(l, this);
    }
    m->put();
    break;

  case CEPH_MDS_LEASE_RENEW:
    {
      dout(7) << "handle_client_lease client." << client << " renew on " << *dn
	      << (!dn->lock.can_lease(client)?", revoking lease":"") << dendl;
      if (dn->lock.can_lease(client)) {
	int pool = 1;   // fixme.. do something smart!
	// reuse the incoming message as the renewal reply
	m->h.duration_ms = (int)(1000 * mdcache->client_lease_durations[pool]);
	m->h.seq = ++l->seq;
	m->clear_payload();

	utime_t now = ceph_clock_now();
	now += mdcache->client_lease_durations[pool];
	mdcache->touch_client_lease(l, pool, now);

	mds->send_message_client_counted(m, m->get_connection());
      }
    }
    break;

  default:
    ceph_abort(); // implement me
    break;
  }
}
3688
3689
// Issue (or decline) a dentry lease to a client, encoding the LeaseStat
// into `bl` (appended to a client reply).  A real lease is granted only
// when the dentry lock allows leasing, the dir is not a stray dir, and
// the client cannot already rely on the directory's filelock/Fs|Fx caps.
// NOTE(review): the extra parentheses around the middle && clause look
// like a leftover; confirm the intended grouping against upstream.
void Locker::issue_client_lease(CDentry *dn, client_t client,
			       bufferlist &bl, utime_t now, Session *session)
{
  CInode *diri = dn->get_dir()->get_inode();
  if (!diri->is_stray() &&  // do not issue dn leases in stray dir!
      ((!diri->filelock.can_lease(client) &&
	(diri->get_client_cap_pending(client) & (CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_EXCL)) == 0)) &&
      dn->lock.can_lease(client)) {
    int pool = 1;   // fixme.. do something smart!
    // issue a dentry lease
    ClientLease *l = dn->add_client_lease(client, session);
    session->touch_lease(l);

    now += mdcache->client_lease_durations[pool];
    mdcache->touch_client_lease(l, pool, now);

    LeaseStat e;
    e.mask = 1 | CEPH_LOCK_DN;  // old and new bit values
    e.seq = ++l->seq;
    e.duration_ms = (int)(1000 * mdcache->client_lease_durations[pool]);
    ::encode(e, bl);
    dout(20) << "issue_client_lease seq " << e.seq << " dur " << e.duration_ms << "ms "
	     << " on " << *dn << dendl;
  } else {
    // null lease
    LeaseStat e;
    e.mask = 0;
    e.seq = 0;
    e.duration_ms = 0;
    ::encode(e, bl);
    dout(20) << "issue_client_lease no/null lease on " << *dn << dendl;
  }
}
3723
3724
3725 void Locker::revoke_client_leases(SimpleLock *lock)
3726 {
3727 int n = 0;
3728 CDentry *dn = static_cast<CDentry*>(lock->get_parent());
3729 for (map<client_t, ClientLease*>::iterator p = dn->client_lease_map.begin();
3730 p != dn->client_lease_map.end();
3731 ++p) {
3732 ClientLease *l = p->second;
3733
3734 n++;
3735 assert(lock->get_type() == CEPH_LOCK_DN);
3736
3737 CDentry *dn = static_cast<CDentry*>(lock->get_parent());
3738 int mask = 1 | CEPH_LOCK_DN; // old and new bits
3739
3740 // i should also revoke the dir ICONTENT lease, if they have it!
3741 CInode *diri = dn->get_dir()->get_inode();
3742 mds->send_message_client_counted(new MClientLease(CEPH_MDS_LEASE_REVOKE, l->seq,
3743 mask,
3744 diri->ino(),
3745 diri->first, CEPH_NOSNAP,
3746 dn->get_name()),
3747 l->client);
3748 }
3749 }
3750
3751
3752
3753 // locks ----------------------------------------------------------------
3754
/**
 * Resolve a (lock_type, object info) pair from a peer's MLock message to
 * the corresponding SimpleLock in our cache.
 *
 * @return the lock, or 0 if the object is no longer in cache (trimmed)
 */
SimpleLock *Locker::get_lock(int lock_type, MDSCacheObjectInfo &info)
{
  switch (lock_type) {
  case CEPH_LOCK_DN:
    {
      // be careful; info.dirfrag may have incorrect frag; recalculate based on dname.
      CInode *diri = mdcache->get_inode(info.dirfrag.ino);
      frag_t fg;
      CDir *dir = 0;
      CDentry *dn = 0;
      if (diri) {
	fg = diri->pick_dirfrag(info.dname);
	dir = diri->get_dirfrag(fg);
	if (dir)
	  dn = dir->lookup(info.dname, info.snapid);
      }
      if (!dn) {
	dout(7) << "get_lock don't have dn " << info.dirfrag.ino << " " << info.dname << dendl;
	return 0;
      }
      return &dn->lock;
    }

  case CEPH_LOCK_IAUTH:
  case CEPH_LOCK_ILINK:
  case CEPH_LOCK_IDFT:
  case CEPH_LOCK_IFILE:
  case CEPH_LOCK_INEST:
  case CEPH_LOCK_IXATTR:
  case CEPH_LOCK_ISNAP:
  case CEPH_LOCK_IFLOCK:
  case CEPH_LOCK_IPOLICY:
    {
      // all inode lock types: find the inode, then pick the right member lock
      CInode *in = mdcache->get_inode(info.ino, info.snapid);
      if (!in) {
	dout(7) << "get_lock don't have ino " << info.ino << dendl;
	return 0;
      }
      switch (lock_type) {
      case CEPH_LOCK_IAUTH: return &in->authlock;
      case CEPH_LOCK_ILINK: return &in->linklock;
      case CEPH_LOCK_IDFT: return &in->dirfragtreelock;
      case CEPH_LOCK_IFILE: return &in->filelock;
      case CEPH_LOCK_INEST: return &in->nestlock;
      case CEPH_LOCK_IXATTR: return &in->xattrlock;
      case CEPH_LOCK_ISNAP: return &in->snaplock;
      case CEPH_LOCK_IFLOCK: return &in->flocklock;
      case CEPH_LOCK_IPOLICY: return &in->policylock;
      }
    }

  default:
    dout(7) << "get_lock don't know lock_type " << lock_type << dendl;
    ceph_abort();
    break;
  }

  return 0;
}
3814
/* This function DOES put the passed message before returning */
// Top-level dispatcher for inter-MDS lock messages: resolves the target
// lock and routes to the simple- or scatter/file-lock handler.
void Locker::handle_lock(MLock *m)
{
  // nobody should be talking to us during recovery.
  assert(mds->is_rejoin() || mds->is_clientreplay() || mds->is_active() || mds->is_stopping());

  SimpleLock *lock = get_lock(m->get_lock_type(), m->get_object_info());
  if (!lock) {
    dout(10) << "don't have object " << m->get_object_info() << ", must have trimmed, dropping" << dendl;
    m->put();
    return;
  }

  switch (lock->get_type()) {
  case CEPH_LOCK_DN:
  case CEPH_LOCK_IAUTH:
  case CEPH_LOCK_ILINK:
  case CEPH_LOCK_ISNAP:
  case CEPH_LOCK_IXATTR:
  case CEPH_LOCK_IFLOCK:
  case CEPH_LOCK_IPOLICY:
    handle_simple_lock(lock, m);
    break;

  case CEPH_LOCK_IDFT:
  case CEPH_LOCK_INEST:
    //handle_scatter_lock((ScatterLock*)lock, m);
    //break;
    // NOTE: intentional fall-through — scatter locks (dirfragtree, nest)
    // are currently handled by the file-lock handler below.

  case CEPH_LOCK_IFILE:
    handle_file_lock(static_cast<ScatterLock*>(lock), m);
    break;

  default:
    dout(7) << "handle_lock got otype " << m->get_lock_type() << dendl;
    ceph_abort();
    break;
  }
}
3854
3855
3856
3857
3858
3859 // ==========================================================================
3860 // simple lock
3861
/** This function may take a reference to m if it needs one, but does
 * not put references. */
// A replica asked us (the auth) to move this lock to SYNC so it can
// rdlock.  If we can't act now (not auth, already SYNC, or frozen) the
// request is simply dropped and the replica retries.
void Locker::handle_reqrdlock(SimpleLock *lock, MLock *m)
{
  MDSCacheObject *parent = lock->get_parent();
  if (parent->is_auth() &&
      lock->get_state() != LOCK_SYNC &&
      !parent->is_frozen()) {
    dout(7) << "handle_reqrdlock got rdlock request on " << *lock
	    << " on " << *parent << dendl;
    assert(parent->is_auth()); // replica auth pinned if they're doing this!
    if (lock->is_stable()) {
      simple_sync(lock);
    } else {
      // unstable: requeue ourselves once the lock settles or unfreezes
      dout(7) << "handle_reqrdlock delaying request until lock is stable" << dendl;
      lock->add_waiter(SimpleLock::WAIT_STABLE | MDSCacheObject::WAIT_UNFREEZE,
                       new C_MDS_RetryMessage(mds, m->get()));
    }
  } else {
    dout(7) << "handle_reqrdlock dropping rdlock request on " << *lock
	    << " on " << *parent << dendl;
    // replica should retry
  }
}
3886
/* This function DOES put the passed message before returning */
void Locker::handle_simple_lock(SimpleLock *lock, MLock *m)
{
  int from = m->get_asker();

  dout(10) << "handle_simple_lock " << *m
	   << " on " << *lock << " " << *lock->get_parent() << dendl;

  // During rejoin, drop lock traffic for objects still being reconciled
  // with recovering peers; the sender will resolve state separately.
  if (mds->is_rejoin()) {
    if (lock->get_parent()->is_rejoining()) {
      dout(7) << "handle_simple_lock still rejoining " << *lock->get_parent()
	      << ", dropping " << *m << dendl;
      m->put();
      return;
    }
  }

  switch (m->get_action()) {
    // -- replica --
  case LOCK_AC_SYNC:
    // Auth granted SYNC: adopt the (possibly updated) locked state it
    // sent along, then wake local readers.
    assert(lock->get_state() == LOCK_LOCK);
    lock->decode_locked_state(m->get_data());
    lock->set_state(LOCK_SYNC);
    lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
    break;

  case LOCK_AC_LOCK:
    // Auth wants LOCK: enter the SYNC->LOCK gather state, recall client
    // leases, and let eval_gather() send LOCKACK once local users drain.
    assert(lock->get_state() == LOCK_SYNC);
    lock->set_state(LOCK_SYNC_LOCK);
    if (lock->is_leased())
      revoke_client_leases(lock);
    eval_gather(lock, true);
    // flush the log so any journaling needed to finish the transition
    // isn't left waiting for the next periodic flush
    if (lock->is_unstable_and_locked())
      mds->mdlog->flush();
    break;


    // -- auth --
  case LOCK_AC_LOCKACK:
    // A replica finished locking; once the gather set drains we can
    // complete the pending state change.
    assert(lock->get_state() == LOCK_SYNC_LOCK ||
	   lock->get_state() == LOCK_SYNC_EXCL);
    assert(lock->is_gathering(from));
    lock->remove_gather(from);

    if (lock->is_gathering()) {
      dout(7) << "handle_simple_lock " << *lock << " on " << *lock->get_parent() << " from " << from
	      << ", still gathering " << lock->get_gather_set() << dendl;
    } else {
      dout(7) << "handle_simple_lock " << *lock << " on " << *lock->get_parent() << " from " << from
	      << ", last one" << dendl;
      eval_gather(lock);
    }
    break;

  case LOCK_AC_REQRDLOCK:
    // replica asking us to go SYNC; may requeue a ref to m internally
    handle_reqrdlock(lock, m);
    break;

  }

  m->put();
}
3949
3950 /* unused, currently.
3951
3952 class C_Locker_SimpleEval : public Context {
3953 Locker *locker;
3954 SimpleLock *lock;
3955 public:
3956 C_Locker_SimpleEval(Locker *l, SimpleLock *lk) : locker(l), lock(lk) {}
3957 void finish(int r) {
3958 locker->try_simple_eval(lock);
3959 }
3960 };
3961
3962 void Locker::try_simple_eval(SimpleLock *lock)
3963 {
3964 // unstable and ambiguous auth?
3965 if (!lock->is_stable() &&
3966 lock->get_parent()->is_ambiguous_auth()) {
3967 dout(7) << "simple_eval not stable and ambiguous auth, waiting on " << *lock->get_parent() << dendl;
3968 //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
3969 lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_SimpleEval(this, lock));
3970 return;
3971 }
3972
3973 if (!lock->get_parent()->is_auth()) {
3974 dout(7) << "try_simple_eval not auth for " << *lock->get_parent() << dendl;
3975 return;
3976 }
3977
3978 if (!lock->get_parent()->can_auth_pin()) {
3979 dout(7) << "try_simple_eval can't auth_pin, waiting on " << *lock->get_parent() << dendl;
3980 //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
3981 lock->get_parent()->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_SimpleEval(this, lock));
3982 return;
3983 }
3984
3985 if (lock->is_stable())
3986 simple_eval(lock);
3987 }
3988 */
3989
3990
// Re-evaluate a stable simple lock on the auth node and, when the current
// state no longer matches what clients want, transition it toward EXCL
// (a loner client wants exclusive caps) or back toward SYNC.
void Locker::simple_eval(SimpleLock *lock, bool *need_issue)
{
  dout(10) << "simple_eval " << *lock << " on " << *lock->get_parent() << dendl;

  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  if (lock->get_parent()->is_freezing_or_frozen()) {
    // dentry lock in unreadable state can block path traverse
    if ((lock->get_type() != CEPH_LOCK_DN ||
	 lock->get_state() == LOCK_SYNC ||
	 lock->get_parent()->is_frozen()))
      return;
  }

  // read-only FS: force everything toward SYNC
  if (mdcache->is_readonly()) {
    if (lock->get_state() != LOCK_SYNC) {
      dout(10) << "simple_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
      simple_sync(lock, need_issue);
    }
    return;
  }

  // for inode locks, find what caps clients want in this lock's cap range
  CInode *in = 0;
  int wanted = 0;
  if (lock->get_type() != CEPH_LOCK_DN) {
    in = static_cast<CInode*>(lock->get_parent());
    in->get_caps_wanted(&wanted, NULL, lock->get_cap_shift());
  }

  // -> excl?
  if (lock->get_state() != LOCK_EXCL &&
      in && in->get_target_loner() >= 0 &&
      (wanted & CEPH_CAP_GEXCL)) {
    dout(7) << "simple_eval stable, going to excl " << *lock
	    << " on " << *lock->get_parent() << dendl;
    simple_excl(lock, need_issue);
  }

  // stable -> sync?
  else if (lock->get_state() != LOCK_SYNC &&
	   !lock->is_wrlocked() &&
	   ((!(wanted & CEPH_CAP_GEXCL) && !lock->is_waiter_for(SimpleLock::WAIT_WR)) ||
	    (lock->get_state() == LOCK_EXCL && in && in->get_target_loner() < 0))) {
    dout(7) << "simple_eval stable, syncing " << *lock
	    << " on " << *lock->get_parent() << dendl;
    simple_sync(lock, need_issue);
  }
}
4040
4041
4042 // mid
4043
// Move a stable lock toward SYNC on the auth node.  Returns true if the
// lock reached SYNC immediately; returns false if a gather had to be
// started (replica recall, client cap revocation, wrlock drain, file
// recovery, or a dirty-scatterlock writeback) -- in that case the
// transition completes later via eval_gather().
bool Locker::simple_sync(SimpleLock *lock, bool *need_issue)
{
  dout(7) << "simple_sync on " << *lock << " on " << *lock->get_parent() << dendl;
  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  // cap-bearing locks live on inodes; we may need to (re)issue caps
  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  int old_state = lock->get_state();

  if (old_state != LOCK_TSYN) {

    // enter the appropriate transitional (gathering) state
    switch (lock->get_state()) {
    case LOCK_MIX: lock->set_state(LOCK_MIX_SYNC); break;
    case LOCK_LOCK: lock->set_state(LOCK_LOCK_SYNC); break;
    case LOCK_XSYN: lock->set_state(LOCK_XSYN_SYNC); break;
    case LOCK_EXCL: lock->set_state(LOCK_EXCL_SYNC); break;
    default: ceph_abort();
    }

    // count outstanding obstacles; any nonzero count defers the change
    int gather = 0;
    if (lock->is_wrlocked())
      gather++;

    // from MIX, replicas hold scattered state; recall it from all of them
    if (lock->get_parent()->is_replicated() && old_state == LOCK_MIX) {
      send_lock_message(lock, LOCK_AC_SYNC);
      lock->init_gather();
      gather++;
    }

    // revoke client caps that conflict with the transitional state
    if (in && in->is_head()) {
      if (in->issued_caps_need_gather(lock)) {
	if (need_issue)
	  *need_issue = true;
	else
	  issue_caps(in);
	gather++;
      }
    }

    // file data must be recovered before it can be served under SYNC
    bool need_recover = false;
    if (lock->get_type() == CEPH_LOCK_IFILE) {
      assert(in);
      if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
	mds->mdcache->queue_file_recover(in);
	need_recover = true;
	gather++;
      }
    }

    // nothing else to gather but dirty scatterlock data: flush it to the
    // journal first; the transition resumes when the flush completes.
    if (!gather && lock->is_dirty()) {
      lock->get_parent()->auth_pin(lock);
      scatter_writebehind(static_cast<ScatterLock*>(lock));
      mds->mdlog->flush();
      return false;
    }

    if (gather) {
      // pin while the lock sits in the unstable transitional state
      lock->get_parent()->auth_pin(lock);
      if (need_recover)
	mds->mdcache->do_file_recover();
      return false;
    }
  }

  // no gather needed: complete the transition now and push the locked
  // state out to replicas
  if (lock->get_parent()->is_replicated()) { // FIXME
    bufferlist data;
    lock->encode_locked_state(data);
    send_lock_message(lock, LOCK_AC_SYNC, data);
  }
  lock->set_state(LOCK_SYNC);
  lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
  if (in && in->is_head()) {
    if (need_issue)
      *need_issue = true;
    else
      issue_caps(in);
  }
  return true;
}
4126
// Move a stable lock toward EXCL on the auth node so a loner client can
// hold exclusive caps, gathering rdlocks, wrlocks, replica copies, and
// conflicting client caps first if needed.
void Locker::simple_excl(SimpleLock *lock, bool *need_issue)
{
  dout(7) << "simple_excl on " << *lock << " on " << *lock->get_parent() << dendl;
  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  // cap-bearing locks live on inodes; we may need to (re)issue caps
  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  // enter the appropriate transitional (gathering) state
  switch (lock->get_state()) {
  case LOCK_LOCK: lock->set_state(LOCK_LOCK_EXCL); break;
  case LOCK_SYNC: lock->set_state(LOCK_SYNC_EXCL); break;
  case LOCK_XSYN: lock->set_state(LOCK_XSYN_EXCL); break;
  default: ceph_abort();
  }

  int gather = 0;
  if (lock->is_rdlocked())
    gather++;
  if (lock->is_wrlocked())
    gather++;

  // from LOCK/XSYN the replicas are skipped (replica may already be
  // LOCK); otherwise recall them before granting EXCL
  if (lock->get_parent()->is_replicated() &&
      lock->get_state() != LOCK_LOCK_EXCL &&
      lock->get_state() != LOCK_XSYN_EXCL) {
    send_lock_message(lock, LOCK_AC_LOCK);
    lock->init_gather();
    gather++;
  }

  // revoke client caps that conflict with the transitional state
  if (in && in->is_head()) {
    if (in->issued_caps_need_gather(lock)) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
      gather++;
    }
  }

  if (gather) {
    // pin while the lock sits in the unstable transitional state
    lock->get_parent()->auth_pin(lock);
  } else {
    lock->set_state(LOCK_EXCL);
    lock->finish_waiters(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE);
    if (in) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
    }
  }
}
4181
// Move a stable lock toward LOCK on the auth node, gathering client
// leases, rdlocks, conflicting caps, replica copies, and any needed file
// recovery.  MIX->LOCK is a two-stage gather: local state drains first,
// then replicas are recalled (see the LOCK_MIX_LOCK2 step below).
void Locker::simple_lock(SimpleLock *lock, bool *need_issue)
{
  dout(7) << "simple_lock on " << *lock << " on " << *lock->get_parent() << dendl;
  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());
  assert(lock->get_state() != LOCK_LOCK);

  // cap-bearing locks live on inodes; we may need to (re)issue caps
  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  int old_state = lock->get_state();

  // enter the appropriate transitional (gathering) state
  switch (lock->get_state()) {
  case LOCK_SYNC: lock->set_state(LOCK_SYNC_LOCK); break;
  case LOCK_XSYN: lock->set_state(LOCK_XSYN_LOCK); break;
  case LOCK_EXCL: lock->set_state(LOCK_EXCL_LOCK); break;
  case LOCK_MIX: lock->set_state(LOCK_MIX_LOCK);
    (static_cast<ScatterLock *>(lock))->clear_unscatter_wanted();
    break;
  case LOCK_TSYN: lock->set_state(LOCK_TSYN_LOCK); break;
  default: ceph_abort();
  }

  // count outstanding obstacles; any nonzero count defers the change
  int gather = 0;
  if (lock->is_leased()) {
    gather++;
    revoke_client_leases(lock);
  }
  if (lock->is_rdlocked())
    gather++;
  if (in && in->is_head()) {
    if (in->issued_caps_need_gather(lock)) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
      gather++;
    }
  }

  // file data must be recovered before the transition can complete
  bool need_recover = false;
  if (lock->get_type() == CEPH_LOCK_IFILE) {
    assert(in);
    if(in->state_test(CInode::STATE_NEEDSRECOVER)) {
      mds->mdcache->queue_file_recover(in);
      need_recover = true;
      gather++;
    }
  }

  if (lock->get_parent()->is_replicated() &&
      lock->get_state() == LOCK_MIX_LOCK &&
      gather) {
    dout(10) << " doing local stage of mix->lock gather before gathering from replicas" << dendl;
  } else {
    // move to second stage of gather now, so we don't send the lock action later.
    if (lock->get_state() == LOCK_MIX_LOCK)
      lock->set_state(LOCK_MIX_LOCK2);

    if (lock->get_parent()->is_replicated() &&
	lock->get_sm()->states[old_state].replica_state != LOCK_LOCK) { // replica may already be LOCK
      gather++;
      send_lock_message(lock, LOCK_AC_LOCK);
      lock->init_gather();
    }
  }

  // nothing else to gather but dirty scatterlock data: flush it to the
  // journal; the transition resumes when the flush completes.
  if (!gather && lock->is_dirty()) {
    lock->get_parent()->auth_pin(lock);
    scatter_writebehind(static_cast<ScatterLock*>(lock));
    mds->mdlog->flush();
    return;
  }

  if (gather) {
    // pin while the lock sits in the unstable transitional state
    lock->get_parent()->auth_pin(lock);
    if (need_recover)
      mds->mdcache->do_file_recover();
  } else {
    lock->set_state(LOCK_LOCK);
    lock->finish_waiters(ScatterLock::WAIT_XLOCK|ScatterLock::WAIT_WR|ScatterLock::WAIT_STABLE);
  }
}
4266
4267
// Begin moving a lock toward XLOCK: enter LOCK_LOCK_XLOCK and gather any
// rdlocks, wrlocks, or conflicting client caps; if nothing needs
// gathering, go straight to PREXLOCK.
void Locker::simple_xlock(SimpleLock *lock)
{
  dout(7) << "simple_xlock on " << *lock << " on " << *lock->get_parent() << dendl;
  assert(lock->get_parent()->is_auth());
  //assert(lock->is_stable());
  assert(lock->get_state() != LOCK_XLOCK);

  // cap-bearing locks live on inodes; we may need to reissue caps
  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  // take an auth_pin only when leaving a stable state
  if (lock->is_stable())
    lock->get_parent()->auth_pin(lock);

  switch (lock->get_state()) {
  case LOCK_LOCK:
  case LOCK_XLOCKDONE: lock->set_state(LOCK_LOCK_XLOCK); break;
  default: ceph_abort();
  }

  // count outstanding obstacles to the xlock
  int gather = 0;
  if (lock->is_rdlocked())
    gather++;
  if (lock->is_wrlocked())
    gather++;

  if (in && in->is_head()) {
    if (in->issued_caps_need_gather(lock)) {
      issue_caps(in);
      gather++;
    }
  }

  if (!gather) {
    lock->set_state(LOCK_PREXLOCK);
    //assert("shouldn't be called if we are already xlockable" == 0);
  }
}
4306
4307
4308
4309
4310
4311 // ==========================================================================
4312 // scatter lock
4313
4314 /*
4315
4316 Some notes on scatterlocks.
4317
4318 - The scatter/gather is driven by the inode lock. The scatter always
4319 brings in the latest metadata from the fragments.
4320
4321 - When in a scattered/MIX state, fragments are only allowed to
4322 update/be written to if the accounted stat matches the inode's
4323 current version.
4324
4325 - That means, on gather, we _only_ assimilate diffs for frag metadata
4326 that match the current version, because those are the only ones
4327 written during this scatter/gather cycle. (Others didn't permit
4328 it.) We increment the version and journal this to disk.
4329
4330 - When possible, we also simultaneously update our local frag
4331 accounted stats to match.
4332
4333 - On scatter, the new inode info is broadcast to frags, both local
4334 and remote. If possible (auth and !frozen), the dirfrag auth
4335 should update the accounted state (if it isn't already up to date).
4336 Note that this may occur on both the local inode auth node and
4337 inode replicas, so there are two potential paths. If it is NOT
4338 possible, they need to mark_stale to prevent any possible writes.
4339
4340 - A scatter can be to MIX (potentially writeable) or to SYNC (read
4341 only). Both are opportunities to update the frag accounted stats,
4342 even though only the MIX case is affected by a stale dirfrag.
4343
4344 - Because many scatter/gather cycles can potentially go by without a
4345 frag being able to update its accounted stats (due to being frozen
4346 by exports/refragments in progress), the frag may have (even very)
old stat versions. That's fine; when we do want to update it,
we can update accounted_* and the version first.
4349
4350 */
4351
4352 class C_Locker_ScatterWB : public LockerLogContext {
4353 ScatterLock *lock;
4354 MutationRef mut;
4355 public:
4356 C_Locker_ScatterWB(Locker *l, ScatterLock *sl, MutationRef& m) :
4357 LockerLogContext(l), lock(sl), mut(m) {}
4358 void finish(int r) override {
4359 locker->scatter_writebehind_finish(lock, mut);
4360 }
4361 };
4362
// Flush a dirty scatterlock: fold the scattered (dirfrag) data back into
// the inode, journal the result as an EUpdate, and let C_Locker_ScatterWB
// complete the flush once the entry is durable.  Holds a forced wrlock on
// the lock for the duration of the flush.
void Locker::scatter_writebehind(ScatterLock *lock)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  dout(10) << "scatter_writebehind " << in->inode.mtime << " on " << *lock << " on " << *in << dendl;

  // journal
  MutationRef mut(new MutationImpl());
  mut->ls = mds->mdlog->get_current_segment();

  // forcefully take a wrlock
  lock->get_wrlock(true);
  mut->wrlocks.insert(lock);
  mut->locks.insert(lock);

  in->pre_cow_old_inode(); // avoid cow mayhem

  // project a new inode version for the assimilated data
  auto &pi = in->project_inode();
  pi.inode.version = in->pre_dirty();

  // fold dirfrag stats into the projected inode, then mark the lock as
  // flushing so further updates are staged for the next cycle
  in->finish_scatter_gather_update(lock->get_type());
  lock->start_flush();

  EUpdate *le = new EUpdate(mds->mdlog, "scatter_writebehind");
  mds->mdlog->start_entry(le);

  mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY);
  mdcache->journal_dirty_inode(mut.get(), &le->metablob, in);

  in->finish_scatter_gather_update_accounted(lock->get_type(), mut, &le->metablob);

  // completion runs scatter_writebehind_finish() when the entry is safe
  mds->mdlog->submit_entry(le, new C_Locker_ScatterWB(this, lock, mut));
}
4395
// Journal-safe callback for scatter_writebehind(): commit the projected
// inode, finish the lock's flush, notify replicas that may be waiting on
// the flush, and release the mutation's locks.
void Locker::scatter_writebehind_finish(ScatterLock *lock, MutationRef& mut)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  dout(10) << "scatter_writebehind_finish on " << *lock << " on " << *in << dendl;
  in->pop_and_dirty_projected_inode(mut->ls);

  lock->finish_flush();

  // if replicas may have flushed in a mix->lock state, send another
  // message so they can finish_flush().
  if (in->is_replicated()) {
    switch (lock->get_state()) {
    case LOCK_MIX_LOCK:
    case LOCK_MIX_LOCK2:
    case LOCK_MIX_EXCL:
    case LOCK_MIX_TSYN:
      send_lock_message(lock, LOCK_AC_LOCKFLUSHED);
    }
  }

  mut->apply();
  drop_locks(mut.get());
  mut->cleanup();

  // the flush may have been the last thing holding the lock unstable
  if (lock->is_stable())
    lock->finish_waiters(ScatterLock::WAIT_STABLE);

  //scatter_eval_gather(lock);
}
4425
// Re-evaluate a stable scatterlock on the auth node: honor a pending
// scatter request (-> MIX), keep INEST writable, and otherwise fall back
// to SYNC when the inode is not a subtree/export delegation point.
void Locker::scatter_eval(ScatterLock *lock, bool *need_issue)
{
  dout(10) << "scatter_eval " << *lock << " on " << *lock->get_parent() << dendl;

  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  if (lock->get_parent()->is_freezing_or_frozen()) {
    dout(20) << " freezing|frozen" << dendl;
    return;
  }

  // read-only FS: force everything toward SYNC
  if (mdcache->is_readonly()) {
    if (lock->get_state() != LOCK_SYNC) {
      dout(10) << "scatter_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
      simple_sync(lock, need_issue);
    }
    return;
  }

  // someone asked for a scatter (e.g. so a dirfrag auth can write)
  if (!lock->is_rdlocked() &&
      lock->get_state() != LOCK_MIX &&
      lock->get_scatter_wanted()) {
    dout(10) << "scatter_eval scatter_wanted, bump to mix " << *lock
	     << " on " << *lock->get_parent() << dendl;
    scatter_mix(lock, need_issue);
    return;
  }

  if (lock->get_type() == CEPH_LOCK_INEST) {
    // in general, we want to keep INEST writable at all times.
    if (!lock->is_rdlocked()) {
      if (lock->get_parent()->is_replicated()) {
	// replicated: MIX lets every replica write
	if (lock->get_state() != LOCK_MIX)
	  scatter_mix(lock, need_issue);
      } else {
	// unreplicated: plain LOCK suffices for local writes
	if (lock->get_state() != LOCK_LOCK)
	  simple_lock(lock, need_issue);
      }
    }
    return;
  }

  CInode *in = static_cast<CInode*>(lock->get_parent());
  if (!in->has_subtree_or_exporting_dirfrag() || in->is_base()) {
    // i _should_ be sync.
    if (!lock->is_wrlocked() &&
	lock->get_state() != LOCK_SYNC) {
      dout(10) << "scatter_eval no wrlocks|xlocks, not subtree root inode, syncing" << dendl;
      simple_sync(lock, need_issue);
    }
  }
}
4479
4480
4481 /*
4482 * mark a scatterlock to indicate that the dir fnode has some dirty data
4483 */
4484 void Locker::mark_updated_scatterlock(ScatterLock *lock)
4485 {
4486 lock->mark_dirty();
4487 if (lock->get_updated_item()->is_on_list()) {
4488 dout(10) << "mark_updated_scatterlock " << *lock
4489 << " - already on list since " << lock->get_update_stamp() << dendl;
4490 } else {
4491 updated_scatterlocks.push_back(lock->get_updated_item());
4492 utime_t now = ceph_clock_now();
4493 lock->set_update_stamp(now);
4494 dout(10) << "mark_updated_scatterlock " << *lock
4495 << " - added at " << now << dendl;
4496 }
4497 }
4498
/*
 * this is called by scatter_tick and LogSegment::try_to_trim() when
 * trying to flush dirty scattered data (i.e. updated fnode) back to
 * the inode.
 *
 * we need to lock|scatter in order to push fnode changes into the
 * inode.dirstat.
 *
 * If we are not auth, ask the auth to do the state change (AC_NUDGE)
 * instead.  c, if non-null, is woken once the lock is stable again.
 */
void Locker::scatter_nudge(ScatterLock *lock, MDSInternalContextBase *c, bool forcelockchange)
{
  CInode *p = static_cast<CInode *>(lock->get_parent());

  // frozen/freezing: can't change lock state now; wait (or requeue)
  if (p->is_frozen() || p->is_freezing()) {
    dout(10) << "scatter_nudge waiting for unfreeze on " << *p << dendl;
    if (c)
      p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, c);
    else if (lock->is_dirty())
      // just requeue. not ideal.. starvation prone..
      updated_scatterlocks.push_back(lock->get_updated_item());
    return;
  }

  // ambiguous auth (mid-migration): wait for it to settle (or requeue)
  if (p->is_ambiguous_auth()) {
    dout(10) << "scatter_nudge waiting for single auth on " << *p << dendl;
    if (c)
      p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, c);
    else if (lock->is_dirty())
      // just requeue. not ideal.. starvation prone..
      updated_scatterlocks.push_back(lock->get_updated_item());
    return;
  }

  if (p->is_auth()) {
    int count = 0;
    while (true) {
      if (lock->is_stable()) {
	// can we do it now?
	//  (only if we're not replicated.. if we are, we really do need
	//   to nudge the lock state!)
	/*
	  actually, even if we're not replicated, we can't stay in MIX, because another mds
	  could discover and replicate us at any time.  if that happens while we're flushing,
	  they end up in MIX but their inode has the old scatterstat version.

	if (!forcelockchange && !lock->get_parent()->is_replicated() && lock->can_wrlock(-1)) {
	  dout(10) << "scatter_nudge auth, propagating " << *lock << " on " << *p << dendl;
	  scatter_writebehind(lock);
	  if (c)
	    lock->add_waiter(SimpleLock::WAIT_STABLE, c);
	  return;
	}
	*/

	// read-only FS: only SYNC is allowed
	if (mdcache->is_readonly()) {
	  if (lock->get_state() != LOCK_SYNC) {
	    dout(10) << "scatter_nudge auth, read-only FS, syncing " << *lock << " on " << *p << dendl;
	    simple_sync(static_cast<ScatterLock*>(lock));
	  }
	  break;
	}

	// adjust lock state: cycle the lock through a state change so
	// the dirty fnode data gets folded back into the inode
	dout(10) << "scatter_nudge auth, scatter/unscattering " << *lock << " on " << *p << dendl;
	switch (lock->get_type()) {
	case CEPH_LOCK_IFILE:
	  if (p->is_replicated() && lock->get_state() != LOCK_MIX)
	    scatter_mix(static_cast<ScatterLock*>(lock));
	  else if (lock->get_state() != LOCK_LOCK)
	    simple_lock(static_cast<ScatterLock*>(lock));
	  else
	    simple_sync(static_cast<ScatterLock*>(lock));
	  break;

	case CEPH_LOCK_IDFT:
	case CEPH_LOCK_INEST:
	  if (p->is_replicated() && lock->get_state() != LOCK_MIX)
	    scatter_mix(lock);
	  else if (lock->get_state() != LOCK_LOCK)
	    simple_lock(lock);
	  else
	    simple_sync(lock);
	  break;
	default:
	  ceph_abort();
	}
	++count;
	if (lock->is_stable() && count == 2) {
	  dout(10) << "scatter_nudge oh, stable after two cycles." << dendl;
	  // this should only really happen when called via
	  // handle_file_lock due to AC_NUDGE, because the rest of the
	  // time we are replicated or have dirty data and won't get
	  // called.  bailing here avoids an infinite loop.
	  assert(!c);
	  break;
	}
      } else {
	// mid-transition: wake up once the lock settles
	dout(10) << "scatter_nudge auth, waiting for stable " << *lock << " on " << *p << dendl;
	if (c)
	  lock->add_waiter(SimpleLock::WAIT_STABLE, c);
	return;
      }
    }
  } else {
    dout(10) << "scatter_nudge replica, requesting scatter/unscatter of "
	     << *lock << " on " << *p << dendl;
    // request unscatter?
    mds_rank_t auth = lock->get_parent()->authority().first;
    if (!mds->is_cluster_degraded() ||
	mds->mdsmap->is_clientreplay_or_active_or_stopping(auth))
      mds->send_message_mds(new MLock(lock, LOCK_AC_NUDGE, mds->get_nodeid()), auth);

    // wait...
    if (c)
      lock->add_waiter(SimpleLock::WAIT_STABLE, c);

    // also, requeue, in case we had wrong auth or something
    if (lock->is_dirty())
      updated_scatterlocks.push_back(lock->get_updated_item());
  }
}
4619
// Periodic tick: walk updated_scatterlocks and nudge any lock whose dirty
// data has been pending longer than mds_scatter_nudge_interval.
void Locker::scatter_tick()
{
  dout(10) << "scatter_tick" << dendl;

  // updated
  utime_t now = ceph_clock_now();
  int n = updated_scatterlocks.size();
  while (!updated_scatterlocks.empty()) {
    ScatterLock *lock = updated_scatterlocks.front();

    if (n-- == 0) break;  // scatter_nudge() may requeue; avoid looping

    if (!lock->is_dirty()) {
      // already flushed by some other path; just drop it from the list
      updated_scatterlocks.pop_front();
      dout(10) << " removing from updated_scatterlocks "
	       << *lock << " " << *lock->get_parent() << dendl;
      continue;
    }
    // stop at the first entry that hasn't aged past the nudge interval
    if (now - lock->get_update_stamp() < g_conf->mds_scatter_nudge_interval)
      break;
    updated_scatterlocks.pop_front();
    scatter_nudge(lock, 0);
  }
  // push any journaling triggered by the nudges out to disk
  mds->mdlog->flush();
}
4645
4646
// Move a stable scatterlock toward TSYN (temporary sync).
// NOTE: unfinished -- the assert below fires unconditionally, so this
// path must not be reached (at least not for the file lock).
void Locker::scatter_tempsync(ScatterLock *lock, bool *need_issue)
{
  dout(10) << "scatter_tempsync " << *lock
	   << " on " << *lock->get_parent() << dendl;
  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  assert(0 == "not fully implemented, at least not for filelock");

  CInode *in = static_cast<CInode *>(lock->get_parent());

  // enter the appropriate transitional (gathering) state
  switch (lock->get_state()) {
  case LOCK_SYNC: ceph_abort();   // this shouldn't happen
  case LOCK_LOCK: lock->set_state(LOCK_LOCK_TSYN); break;
  case LOCK_MIX: lock->set_state(LOCK_MIX_TSYN); break;
  default: ceph_abort();
  }

  // count outstanding obstacles; any nonzero count defers the change
  int gather = 0;
  if (lock->is_wrlocked())
    gather++;

  // revoke client caps that conflict with the transitional state
  if (lock->get_cap_shift() &&
      in->is_head() &&
      in->issued_caps_need_gather(lock)) {
    if (need_issue)
      *need_issue = true;
    else
      issue_caps(in);
    gather++;
  }

  // from MIX, replicas hold scattered state; recall it from all of them
  if (lock->get_state() == LOCK_MIX_TSYN &&
      in->is_replicated()) {
    lock->init_gather();
    send_lock_message(lock, LOCK_AC_LOCK);
    gather++;
  }

  if (gather) {
    // pin while the lock sits in the unstable transitional state
    in->auth_pin(lock);
  } else {
    // do tempsync
    lock->set_state(LOCK_TSYN);
    lock->finish_waiters(ScatterLock::WAIT_RD|ScatterLock::WAIT_STABLE);
    if (lock->get_cap_shift()) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
    }
  }
}
4700
4701
4702
4703 // ==========================================================================
4704 // local lock
4705
4706 void Locker::local_wrlock_grab(LocalLock *lock, MutationRef& mut)
4707 {
4708 dout(7) << "local_wrlock_grab on " << *lock
4709 << " on " << *lock->get_parent() << dendl;
4710
4711 assert(lock->get_parent()->is_auth());
4712 assert(lock->can_wrlock());
4713 assert(!mut->wrlocks.count(lock));
4714 lock->get_wrlock(mut->get_client());
4715 mut->wrlocks.insert(lock);
4716 mut->locks.insert(lock);
4717 }
4718
4719 bool Locker::local_wrlock_start(LocalLock *lock, MDRequestRef& mut)
4720 {
4721 dout(7) << "local_wrlock_start on " << *lock
4722 << " on " << *lock->get_parent() << dendl;
4723
4724 assert(lock->get_parent()->is_auth());
4725 if (lock->can_wrlock()) {
4726 assert(!mut->wrlocks.count(lock));
4727 lock->get_wrlock(mut->get_client());
4728 mut->wrlocks.insert(lock);
4729 mut->locks.insert(lock);
4730 return true;
4731 } else {
4732 lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
4733 return false;
4734 }
4735 }
4736
4737 void Locker::local_wrlock_finish(LocalLock *lock, MutationImpl *mut)
4738 {
4739 dout(7) << "local_wrlock_finish on " << *lock
4740 << " on " << *lock->get_parent() << dendl;
4741 lock->put_wrlock();
4742 mut->wrlocks.erase(lock);
4743 mut->locks.erase(lock);
4744 if (lock->get_num_wrlocks() == 0) {
4745 lock->finish_waiters(SimpleLock::WAIT_STABLE |
4746 SimpleLock::WAIT_WR |
4747 SimpleLock::WAIT_RD);
4748 }
4749 }
4750
4751 bool Locker::local_xlock_start(LocalLock *lock, MDRequestRef& mut)
4752 {
4753 dout(7) << "local_xlock_start on " << *lock
4754 << " on " << *lock->get_parent() << dendl;
4755
4756 assert(lock->get_parent()->is_auth());
4757 if (!lock->can_xlock_local()) {
4758 lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
4759 return false;
4760 }
4761
4762 lock->get_xlock(mut, mut->get_client());
4763 mut->xlocks.insert(lock);
4764 mut->locks.insert(lock);
4765 return true;
4766 }
4767
4768 void Locker::local_xlock_finish(LocalLock *lock, MutationImpl *mut)
4769 {
4770 dout(7) << "local_xlock_finish on " << *lock
4771 << " on " << *lock->get_parent() << dendl;
4772 lock->put_xlock();
4773 mut->xlocks.erase(lock);
4774 mut->locks.erase(lock);
4775
4776 lock->finish_waiters(SimpleLock::WAIT_STABLE |
4777 SimpleLock::WAIT_WR |
4778 SimpleLock::WAIT_RD);
4779 }
4780
4781
4782
4783 // ==========================================================================
4784 // file lock
4785
4786
// Re-evaluate a stable file lock on the auth node, choosing among EXCL
// (single loner writer), MIX (multiple writers), and SYNC (readers)
// based on the caps clients want and whether the inode is a subtree
// delegation point.
void Locker::file_eval(ScatterLock *lock, bool *need_issue)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  int loner_wanted, other_wanted;
  int wanted = in->get_caps_wanted(&loner_wanted, &other_wanted, CEPH_CAP_SFILE);
  dout(7) << "file_eval wanted=" << gcap_string(wanted)
	  << " loner_wanted=" << gcap_string(loner_wanted)
	  << " other_wanted=" << gcap_string(other_wanted)
	  << "  filelock=" << *lock << " on " << *lock->get_parent()
	  << dendl;

  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  if (lock->get_parent()->is_freezing_or_frozen())
    return;

  // read-only FS: force everything toward SYNC
  if (mdcache->is_readonly()) {
    if (lock->get_state() != LOCK_SYNC) {
      dout(10) << "file_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
      simple_sync(lock, need_issue);
    }
    return;
  }

  // excl -> *?
  if (lock->get_state() == LOCK_EXCL) {
    dout(20) << " is excl" << dendl;
    int loner_issued, other_issued, xlocker_issued;
    in->get_caps_issued(&loner_issued, &other_issued, &xlocker_issued, CEPH_CAP_SFILE);
    dout(7) << "file_eval loner_issued=" << gcap_string(loner_issued)
            << " other_issued=" << gcap_string(other_issued)
	    << " xlocker_issued=" << gcap_string(xlocker_issued)
	    << dendl;
    // leave EXCL if the loner no longer wants/holds write-ish caps, if
    // anyone else wants to read/write, or (for dirs) caps are spread
    // over multiple clients
    if (!((loner_wanted|loner_issued) & (CEPH_CAP_GEXCL|CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) ||
	(other_wanted & (CEPH_CAP_GEXCL|CEPH_CAP_GWR|CEPH_CAP_GRD)) ||
	(in->inode.is_dir() && in->multiple_nonstale_caps())) {  // FIXME.. :/
      dout(20) << " should lose it" << dendl;
      // we should lose it.
      //  loner  other   want
      //  R      R       SYNC
      //  R      R|W     MIX
      //  R      W       MIX
      //  R|W    R       MIX
      //  R|W    R|W     MIX
      //  R|W    W       MIX
      //  W      R       MIX
      //  W      R|W     MIX
      //  W      W       MIX
      // -> any writer means MIX; RD doesn't matter.
      if (((other_wanted|loner_wanted) & CEPH_CAP_GWR) ||
	  lock->is_waiter_for(SimpleLock::WAIT_WR))
	scatter_mix(lock, need_issue);
      else if (!lock->is_wrlocked())   // let excl wrlocks drain first
	simple_sync(lock, need_issue);
      else
	dout(10) << " waiting for wrlock to drain" << dendl;
    }
  }

  // * -> excl?
  else if (lock->get_state() != LOCK_EXCL &&
	   !lock->is_rdlocked() &&
	   //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
	   ((wanted & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) ||
	    (in->inode.is_dir() && !in->has_subtree_or_exporting_dirfrag())) &&
	   in->get_target_loner() >= 0) {
    dout(7) << "file_eval stable, bump to loner " << *lock
	    << " on " << *lock->get_parent() << dendl;
    file_excl(lock, need_issue);
  }

  // * -> mixed?
  else if (lock->get_state() != LOCK_MIX &&
	   !lock->is_rdlocked() &&
	   //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
	   (lock->get_scatter_wanted() ||
	    (in->get_target_loner() < 0 && (wanted & CEPH_CAP_GWR)))) {
    dout(7) << "file_eval stable, bump to mixed " << *lock
	    << " on " << *lock->get_parent() << dendl;
    scatter_mix(lock, need_issue);
  }

  // * -> sync?
  else if (lock->get_state() != LOCK_SYNC &&
	   !lock->is_wrlocked() &&   // drain wrlocks first!
	   !lock->is_waiter_for(SimpleLock::WAIT_WR) &&
	   !(wanted & CEPH_CAP_GWR) &&
	   !((lock->get_state() == LOCK_MIX) &&
	     in->is_dir() && in->has_subtree_or_exporting_dirfrag())  // if we are a delegation point, stay where we are
	   //((wanted & CEPH_CAP_RD) ||
	   //in->is_replicated() ||
	   //lock->is_leased() ||
	   //(!loner && lock->get_state() == LOCK_EXCL)) &&
	   ) {
    dout(7) << "file_eval stable, bump to sync " << *lock
	    << " on " << *lock->get_parent() << dendl;
    simple_sync(lock, need_issue);
  }
}
4887
4888
4889
// Drive a scattered lock on the auth MDS toward LOCK_MIX.
// From LOCK_LOCK the transition completes immediately; from any other
// stable state we enter an intermediate *_MIX state and gather: drain
// rdlocks, bring replicas in line, revoke leases, recall conflicting
// client caps, and (if needed) wait for file recovery.
// need_issue: if non-null, set *need_issue = true instead of calling
// issue_caps() directly, so the caller can batch cap issuance.
// Preconditions (asserted): this MDS is auth for the inode and the
// lock is in a stable state.
4890 void Locker::scatter_mix(ScatterLock *lock, bool *need_issue)
4891 {
4892 dout(7) << "scatter_mix " << *lock << " on " << *lock->get_parent() << dendl;
4893 
4894 CInode *in = static_cast<CInode*>(lock->get_parent());
4895 assert(in->is_auth());
4896 assert(lock->is_stable());
4897 
// Fast path: from LOCK there is nothing to gather; go straight to MIX.
4898 if (lock->get_state() == LOCK_LOCK) {
4899 in->start_scatter(lock);
4900 if (in->is_replicated()) {
4901 // data
4902 bufferlist softdata;
4903 lock->encode_locked_state(softdata);
4904 
4905 // bcast to replicas
4906 send_lock_message(lock, LOCK_AC_MIX, softdata);
4907 }
4908 
4909 // change lock
4910 lock->set_state(LOCK_MIX);
4911 lock->clear_scatter_wanted();
// Only locks that shift client caps need a (re)issue after the change.
4912 if (lock->get_cap_shift()) {
4913 if (need_issue)
4914 *need_issue = true;
4915 else
4916 issue_caps(in);
4917 }
4918 } else {
4919 // gather?
// Pick the intermediate state matching where we came from; any other
// starting state is a caller bug.
4920 switch (lock->get_state()) {
4921 case LOCK_SYNC: lock->set_state(LOCK_SYNC_MIX); break;
4922 case LOCK_EXCL: lock->set_state(LOCK_EXCL_MIX); break;
4923 case LOCK_XSYN: lock->set_state(LOCK_XSYN_MIX); break;
4924 case LOCK_TSYN: lock->set_state(LOCK_TSYN_MIX); break;
4925 default: ceph_abort();
4926 }
4927 
// Count everything we must wait for before the MIX transition can
// complete; each contributor will eventually re-trigger eval_gather().
4928 int gather = 0;
4929 if (lock->is_rdlocked())
4930 gather++;
4931 if (in->is_replicated()) {
4932 if (lock->get_state() == LOCK_SYNC_MIX) { // for the rest states, replicas are already LOCK
4933 send_lock_message(lock, LOCK_AC_MIX);
4934 lock->init_gather();
4935 gather++;
4936 }
4937 }
4938 if (lock->is_leased()) {
4939 revoke_client_leases(lock);
4940 gather++;
4941 }
// Recall client caps that conflict with MIX (head inodes only).
4942 if (lock->get_cap_shift() &&
4943 in->is_head() &&
4944 in->issued_caps_need_gather(lock)) {
4945 if (need_issue)
4946 *need_issue = true;
4947 else
4948 issue_caps(in);
4949 gather++;
4950 }
4951 bool need_recover = false;
4952 if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
4953 mds->mdcache->queue_file_recover(in);
4954 need_recover = true;
4955 gather++;
4956 }
4957 
4958 if (gather) {
// Hold an auth_pin while the lock is unstable so the subtree cannot
// be frozen/exported out from under the in-flight transition.
4959 lock->get_parent()->auth_pin(lock);
4960 if (need_recover)
4961 mds->mdcache->do_file_recover();
4962 } else {
// Nothing to wait for after all: finish the transition inline,
// mirroring the LOCK_LOCK fast path above.
4963 in->start_scatter(lock);
4964 lock->set_state(LOCK_MIX);
4965 lock->clear_scatter_wanted();
4966 if (in->is_replicated()) {
4967 bufferlist softdata;
4968 lock->encode_locked_state(softdata);
4969 send_lock_message(lock, LOCK_AC_MIX, softdata);
4970 }
4971 if (lock->get_cap_shift()) {
4972 if (need_issue)
4973 *need_issue = true;
4974 else
4975 issue_caps(in);
4976 }
4977 }
4978 }
4979 }
4980
4981
// Drive a file lock on the auth MDS toward LOCK_EXCL (exclusive).
// Enters the intermediate *_EXCL state matching the current stable
// state, then gathers outstanding rdlocks/wrlocks, replica locks,
// leases, conflicting client caps, and pending file recovery before
// the transition can complete (via eval_gather when the last
// contributor drains).
// need_issue: if non-null, set *need_issue = true instead of calling
// issue_caps() directly.
4982 void Locker::file_excl(ScatterLock *lock, bool *need_issue)
4983 {
4984 CInode *in = static_cast<CInode*>(lock->get_parent());
4985 dout(7) << "file_excl " << *lock << " on " << *lock->get_parent() << dendl;
4986 
4987 assert(in->is_auth());
4988 assert(lock->is_stable());
4989 
// EXCL requires a loner client and no other MDSs wanting caps —
// unless we are backing out of XSYN, which must pass through EXCL.
4990 assert((in->get_loner() >= 0 && in->mds_caps_wanted.empty()) ||
4991 (lock->get_state() == LOCK_XSYN)); // must do xsyn -> excl -> <anything else>
4992 
4993 switch (lock->get_state()) {
4994 case LOCK_SYNC: lock->set_state(LOCK_SYNC_EXCL); break;
4995 case LOCK_MIX: lock->set_state(LOCK_MIX_EXCL); break;
4996 case LOCK_LOCK: lock->set_state(LOCK_LOCK_EXCL); break;
4997 case LOCK_XSYN: lock->set_state(LOCK_XSYN_EXCL); break;
4998 default: ceph_abort();
4999 }
5000 int gather = 0;
5001 
5002 if (lock->is_rdlocked())
5003 gather++;
5004 if (lock->is_wrlocked())
5005 gather++;
5006 
// Replicas must drop to LOCK before we can hold EXCL; skip the
// round-trip when they are already there.
5007 if (in->is_replicated() &&
5008 lock->get_state() != LOCK_LOCK_EXCL &&
5009 lock->get_state() != LOCK_XSYN_EXCL) { // if we were lock, replicas are already lock.
5010 send_lock_message(lock, LOCK_AC_LOCK);
5011 lock->init_gather();
5012 gather++;
5013 }
5014 if (lock->is_leased()) {
5015 revoke_client_leases(lock);
5016 gather++;
5017 }
// Recall client caps that conflict with EXCL (head inodes only).
5018 if (in->is_head() &&
5019 in->issued_caps_need_gather(lock)) {
5020 if (need_issue)
5021 *need_issue = true;
5022 else
5023 issue_caps(in);
5024 gather++;
5025 }
5026 bool need_recover = false;
5027 if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
5028 mds->mdcache->queue_file_recover(in);
5029 need_recover = true;
5030 gather++;
5031 }
5032 
5033 if (gather) {
// Pin while unstable so the transition cannot be frozen mid-flight.
5034 lock->get_parent()->auth_pin(lock);
5035 if (need_recover)
5036 mds->mdcache->do_file_recover();
5037 } else {
// Nothing outstanding: complete the transition immediately.
5038 lock->set_state(LOCK_EXCL);
5039 if (need_issue)
5040 *need_issue = true;
5041 else
5042 issue_caps(in);
5043 }
5044 }
5045
// Drive a file lock from LOCK_EXCL to LOCK_XSYN on the auth MDS.
// XSYN is only reachable from EXCL (any other state aborts) and, per
// the assert, requires a loner client with no other MDSs wanting caps.
// Gathers outstanding wrlocks and conflicting client caps first;
// completes immediately when nothing is outstanding.
// need_issue: if non-null, set *need_issue = true instead of calling
// issue_caps() directly.
5046 void Locker::file_xsyn(SimpleLock *lock, bool *need_issue)
5047 {
5048 dout(7) << "file_xsyn on " << *lock << " on " << *lock->get_parent() << dendl;
5049 CInode *in = static_cast<CInode *>(lock->get_parent());
5050 assert(in->is_auth());
5051 assert(in->get_loner() >= 0 && in->mds_caps_wanted.empty());
5052 
5053 switch (lock->get_state()) {
5054 case LOCK_EXCL: lock->set_state(LOCK_EXCL_XSYN); break;
5055 default: ceph_abort();
5056 }
5057 
// Count what must drain before the XSYN transition can complete.
5058 int gather = 0;
5059 if (lock->is_wrlocked())
5060 gather++;
5061 
5062 if (in->is_head() &&
5063 in->issued_caps_need_gather(lock)) {
5064 if (need_issue)
5065 *need_issue = true;
5066 else
5067 issue_caps(in);
5068 gather++;
5069 }
5070 
5071 if (gather) {
// Pin while unstable; eval_gather finishes the transition later.
5072 lock->get_parent()->auth_pin(lock);
5073 } else {
5074 lock->set_state(LOCK_XSYN);
// Readers blocked on the old state can proceed now.
5075 lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
5076 if (need_issue)
5077 *need_issue = true;
5078 else
5079 issue_caps(in);
5080 }
5081 }
5082
// Move a file lock from LOCK_PRE_SCAN to LOCK_SCAN to kick off file
// recovery for its inode. If conflicting client caps must first be
// recalled, mark the inode NEEDSRECOVER so recovery is retried once
// the caps are gathered; otherwise queue the recovery right away.
// Only called from MDCache::start_files_to_recover() (see assert).
5083 void Locker::file_recover(ScatterLock *lock)
5084 {
5085 CInode *in = static_cast<CInode *>(lock->get_parent());
5086 dout(7) << "file_recover " << *lock << " on " << *in << dendl;
5087 
5088 assert(in->is_auth());
5089 //assert(lock->is_stable());
5090 assert(lock->get_state() == LOCK_PRE_SCAN); // only called from MDCache::start_files_to_recover()
5091 
5092 int gather = 0;
5093 
// Historical (disabled) replica gathering kept for reference.
5094 /*
5095 if (in->is_replicated()
5096 lock->get_sm()->states[oldstate].replica_state != LOCK_LOCK) {
5097 send_lock_message(lock, LOCK_AC_LOCK);
5098 lock->init_gather();
5099 gather++;
5100 }
5101 */
// Recall client caps that conflict with the scan (head inodes only).
5102 if (in->is_head() &&
5103 in->issued_caps_need_gather(lock)) {
5104 issue_caps(in);
5105 gather++;
5106 }
5107 
5108 lock->set_state(LOCK_SCAN);
5109 if (gather)
5110 in->state_set(CInode::STATE_NEEDSRECOVER);
5111 else
5112 mds->mdcache->queue_file_recover(in);
5113 }
5114
5115
5116 // messenger
5117 /* This function DOES put the passed message before returning */
// Handle an inter-MDS MLock message for a scatter/file lock.
// The action determines our role:
//   - LOCK_AC_SYNC / LOCK / LOCKFLUSHED / MIX: we are a replica being
//     told to change state by the auth MDS;
//   - LOCK_AC_*ACK: we are the auth MDS collecting acks from replicas
//     we are gathering from;
//   - LOCK_AC_REQ* / NUDGE: a replica is asking the auth MDS to
//     change or re-evaluate the lock.
// Consumes (puts) the message on every path, including early return.
5118 void Locker::handle_file_lock(ScatterLock *lock, MLock *m)
5119 {
5120 CInode *in = static_cast<CInode*>(lock->get_parent());
5121 int from = m->get_asker();
5122 
// During rejoin, drop messages for inodes still being rejoined.
5123 if (mds->is_rejoin()) {
5124 if (in->is_rejoining()) {
5125 dout(7) << "handle_file_lock still rejoining " << *in
5126 << ", dropping " << *m << dendl;
5127 m->put();
5128 return;
5129 }
5130 }
5131 
5132 dout(7) << "handle_file_lock a=" << get_lock_action_name(m->get_action())
5133 << " on " << *lock
5134 << " from mds." << from << " "
5135 << *in << dendl;
5136 
// Nonzero cap shift means this lock type governs client caps and we
// must (re)issue caps after state changes.
5137 bool caps = lock->get_cap_shift();
5138 
5139 switch (m->get_action()) {
5140 // -- replica --
5141 case LOCK_AC_SYNC:
5142 assert(lock->get_state() == LOCK_LOCK ||
5143 lock->get_state() == LOCK_MIX ||
5144 lock->get_state() == LOCK_MIX_SYNC2);
5145 
// From MIX we must first flush our dirty scattered state; park in
// MIX_SYNC and let eval_gather drive the rest (the auth will see
// our SYNCACK later).
5146 if (lock->get_state() == LOCK_MIX) {
5147 lock->set_state(LOCK_MIX_SYNC);
5148 eval_gather(lock, true);
5149 if (lock->is_unstable_and_locked())
5150 mds->mdlog->flush();
5151 break;
5152 }
5153 
5154 (static_cast<ScatterLock *>(lock))->finish_flush();
5155 (static_cast<ScatterLock *>(lock))->clear_flushed();
5156 
5157 // ok
// Adopt the auth's lock data and become SYNC.
5158 lock->decode_locked_state(m->get_data());
5159 lock->set_state(LOCK_SYNC);
5160 
// Temporary rdlock keeps the state pinned while we issue caps and
// wake readers.
5161 lock->get_rdlock();
5162 if (caps)
5163 issue_caps(in);
5164 lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
5165 lock->put_rdlock();
5166 break;
5167 
5168 case LOCK_AC_LOCK:
// Auth wants us in LOCK; enter the matching intermediate state and
// let eval_gather drain local readers/writers before acking.
5169 switch (lock->get_state()) {
5170 case LOCK_SYNC: lock->set_state(LOCK_SYNC_LOCK); break;
5171 case LOCK_MIX: lock->set_state(LOCK_MIX_LOCK); break;
5172 default: ceph_abort();
5173 }
5174 
5175 eval_gather(lock, true);
5176 if (lock->is_unstable_and_locked())
5177 mds->mdlog->flush();
5178 
5179 break;
5180 
5181 case LOCK_AC_LOCKFLUSHED:
// Auth has persisted our flushed scatter data; clear flush state.
5182 (static_cast<ScatterLock *>(lock))->finish_flush();
5183 (static_cast<ScatterLock *>(lock))->clear_flushed();
5184 // wake up scatter_nudge waiters
5185 if (lock->is_stable())
5186 lock->finish_waiters(SimpleLock::WAIT_STABLE);
5187 break;
5188 
5189 case LOCK_AC_MIX:
5190 assert(lock->get_state() == LOCK_SYNC ||
5191 lock->get_state() == LOCK_LOCK ||
5192 lock->get_state() == LOCK_SYNC_MIX2);
5193 
// From SYNC we must drain local rdlocks first; park in SYNC_MIX.
5194 if (lock->get_state() == LOCK_SYNC) {
5195 // MIXED
5196 lock->set_state(LOCK_SYNC_MIX);
5197 eval_gather(lock, true);
5198 if (lock->is_unstable_and_locked())
5199 mds->mdlog->flush();
5200 break;
5201 }
5202 
5203 // ok
// Adopt the auth's data and become MIX; writers may proceed.
5204 lock->set_state(LOCK_MIX);
5205 lock->decode_locked_state(m->get_data());
5206 
5207 if (caps)
5208 issue_caps(in);
5209 
5210 lock->finish_waiters(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE);
5211 break;
5212 
5213 
5214 // -- auth --
5215 case LOCK_AC_LOCKACK:
// A replica acknowledges it has dropped to LOCK; we must be in one
// of the gathering states that sent LOCK_AC_LOCK.
5216 assert(lock->get_state() == LOCK_SYNC_LOCK ||
5217 lock->get_state() == LOCK_MIX_LOCK ||
5218 lock->get_state() == LOCK_MIX_LOCK2 ||
5219 lock->get_state() == LOCK_MIX_EXCL ||
5220 lock->get_state() == LOCK_SYNC_EXCL ||
5221 lock->get_state() == LOCK_SYNC_MIX ||
5222 lock->get_state() == LOCK_MIX_TSYN);
5223 assert(lock->is_gathering(from));
5224 lock->remove_gather(from);
5225 
// Only MIX_* sources carry scattered data we need to merge in.
5226 if (lock->get_state() == LOCK_MIX_LOCK ||
5227 lock->get_state() == LOCK_MIX_LOCK2 ||
5228 lock->get_state() == LOCK_MIX_EXCL ||
5229 lock->get_state() == LOCK_MIX_TSYN) {
5230 lock->decode_locked_state(m->get_data());
5231 // replica is waiting for AC_LOCKFLUSHED, eval_gather() should not
5232 // delay calling scatter_writebehind().
5233 lock->clear_flushed();
5234 }
5235 
// Re-evaluate only once the last gathered replica has acked.
5236 if (lock->is_gathering()) {
5237 dout(7) << "handle_file_lock " << *in << " from " << from
5238 << ", still gathering " << lock->get_gather_set() << dendl;
5239 } else {
5240 dout(7) << "handle_file_lock " << *in << " from " << from
5241 << ", last one" << dendl;
5242 eval_gather(lock);
5243 }
5244 break;
5245 
5246 case LOCK_AC_SYNCACK:
// Replica finished its MIX->SYNC flush; merge its data and continue
// gathering.
5247 assert(lock->get_state() == LOCK_MIX_SYNC);
5248 assert(lock->is_gathering(from));
5249 lock->remove_gather(from);
5250 
5251 lock->decode_locked_state(m->get_data());
5252 
5253 if (lock->is_gathering()) {
5254 dout(7) << "handle_file_lock " << *in << " from " << from
5255 << ", still gathering " << lock->get_gather_set() << dendl;
5256 } else {
5257 dout(7) << "handle_file_lock " << *in << " from " << from
5258 << ", last one" << dendl;
5259 eval_gather(lock);
5260 }
5261 break;
5262 
5263 case LOCK_AC_MIXACK:
// Replica acknowledges SYNC->MIX; no lock data travels with this ack.
5264 assert(lock->get_state() == LOCK_SYNC_MIX);
5265 assert(lock->is_gathering(from));
5266 lock->remove_gather(from);
5267 
5268 if (lock->is_gathering()) {
5269 dout(7) << "handle_file_lock " << *in << " from " << from
5270 << ", still gathering " << lock->get_gather_set() << dendl;
5271 } else {
5272 dout(7) << "handle_file_lock " << *in << " from " << from
5273 << ", last one" << dendl;
5274 eval_gather(lock);
5275 }
5276 break;
5277 
5278 
5279 // requests....
5280 case LOCK_AC_REQSCATTER:
// A replica asks the auth (us) to scatter the lock.
5281 if (lock->is_stable()) {
5282 /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
5283 * because the replica should be holding an auth_pin if they're
5284 * doing this (and thus, we are freezing, not frozen, and indefinite
5285 * starvation isn't an issue).
5286 */
5287 dout(7) << "handle_file_lock got scatter request on " << *lock
5288 << " on " << *lock->get_parent() << dendl;
5289 if (lock->get_state() != LOCK_MIX) // i.e., the reqscatter didn't race with an actual mix/scatter
5290 scatter_mix(lock);
5291 } else {
// Unstable: remember the request; it is honored once stable again.
5292 dout(7) << "handle_file_lock got scatter request, !stable, marking scatter_wanted on " << *lock
5293 << " on " << *lock->get_parent() << dendl;
5294 lock->set_scatter_wanted();
5295 }
5296 break;
5297 
5298 case LOCK_AC_REQUNSCATTER:
// A replica asks the auth (us) to unscatter (leave MIX).
5299 if (lock->is_stable()) {
5300 /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
5301 * because the replica should be holding an auth_pin if they're
5302 * doing this (and thus, we are freezing, not frozen, and indefinite
5303 * starvation isn't an issue).
5304 */
5305 dout(7) << "handle_file_lock got unscatter request on " << *lock
5306 << " on " << *lock->get_parent() << dendl;
5307 if (lock->get_state() == LOCK_MIX) // i.e., the reqscatter didn't race with an actual mix/scatter
5308 simple_lock(lock); // FIXME tempsync?
5309 } else {
5310 dout(7) << "handle_file_lock ignoring unscatter request on " << *lock
5311 << " on " << *lock->get_parent() << dendl;
5312 lock->set_unscatter_wanted();
5313 }
5314 break;
5315 
5316 case LOCK_AC_REQRDLOCK:
5317 handle_reqrdlock(lock, m);
5318 break;
5319 
5320 case LOCK_AC_NUDGE:
// Nudges are only meaningful on the replicated auth copy.
5321 if (!lock->get_parent()->is_auth()) {
5322 dout(7) << "handle_file_lock IGNORING nudge on non-auth " << *lock
5323 << " on " << *lock->get_parent() << dendl;
5324 } else if (!lock->get_parent()->is_replicated()) {
5325 dout(7) << "handle_file_lock IGNORING nudge on non-replicated " << *lock
5326 << " on " << *lock->get_parent() << dendl;
5327 } else {
5328 dout(7) << "handle_file_lock trying nudge on " << *lock
5329 << " on " << *lock->get_parent() << dendl;
5330 scatter_nudge(lock, 0, true);
5331 mds->mdlog->flush();
5332 }
5333 break;
5334 
5335 default:
5336 ceph_abort();
5337 }
5338 
// Consume the message (early-return path above puts it separately).
5339 m->put();
5340 }
5341
5342
5343
5344
5345
5346