]> git.proxmox.com Git - ceph.git/blob - ceph/src/mds/Locker.cc
b2b4c21dc45cd936c539cf5c0cb099122e70e6f1
[ceph.git] / ceph / src / mds / Locker.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include <boost/utility/string_view.hpp>
16
17 #include "MDSRank.h"
18 #include "MDCache.h"
19 #include "Locker.h"
20 #include "MDBalancer.h"
21 #include "Migrator.h"
22 #include "CInode.h"
23 #include "CDir.h"
24 #include "CDentry.h"
25 #include "Mutation.h"
26 #include "MDSContext.h"
27
28 #include "MDLog.h"
29 #include "MDSMap.h"
30
31 #include "events/EUpdate.h"
32 #include "events/EOpen.h"
33
34 #include "msg/Messenger.h"
35 #include "osdc/Objecter.h"
36
37 #include "messages/MInodeFileCaps.h"
38 #include "messages/MLock.h"
39 #include "messages/MClientLease.h"
40 #include "messages/MClientReply.h"
41 #include "messages/MClientCaps.h"
42 #include "messages/MClientCapRelease.h"
43
44 #include "messages/MMDSSlaveRequest.h"
45
46 #include <errno.h>
47
48 #include "common/config.h"
49
50
51 #define dout_subsys ceph_subsys_mds
52 #undef dout_prefix
53 #define dout_context g_ceph_context
54 #define dout_prefix _prefix(_dout, mds)
55 static ostream& _prefix(std::ostream *_dout, MDSRank *mds) {
56 return *_dout << "mds." << mds->get_nodeid() << ".locker ";
57 }
58
59
60 class LockerContext : public MDSInternalContextBase {
61 protected:
62 Locker *locker;
63 MDSRank *get_mds() override
64 {
65 return locker->mds;
66 }
67
68 public:
69 explicit LockerContext(Locker *locker_) : locker(locker_) {
70 assert(locker != NULL);
71 }
72 };
73
74 class LockerLogContext : public MDSLogContextBase {
75 protected:
76 Locker *locker;
77 MDSRank *get_mds() override
78 {
79 return locker->mds;
80 }
81
82 public:
83 explicit LockerLogContext(Locker *locker_) : locker(locker_) {
84 assert(locker != NULL);
85 }
86 };
87
88 /* This function DOES put the passed message before returning */
89 void Locker::dispatch(Message *m)
90 {
91
92 switch (m->get_type()) {
93
94 // inter-mds locking
95 case MSG_MDS_LOCK:
96 handle_lock(static_cast<MLock*>(m));
97 break;
98 // inter-mds caps
99 case MSG_MDS_INODEFILECAPS:
100 handle_inode_file_caps(static_cast<MInodeFileCaps*>(m));
101 break;
102
103 // client sync
104 case CEPH_MSG_CLIENT_CAPS:
105 handle_client_caps(static_cast<MClientCaps*>(m));
106
107 break;
108 case CEPH_MSG_CLIENT_CAPRELEASE:
109 handle_client_cap_release(static_cast<MClientCapRelease*>(m));
110 break;
111 case CEPH_MSG_CLIENT_LEASE:
112 handle_client_lease(static_cast<MClientLease*>(m));
113 break;
114
115 default:
116 derr << "locker unknown message " << m->get_type() << dendl;
117 assert(0 == "locker unknown message");
118 }
119 }
120
121 void Locker::tick()
122 {
123 scatter_tick();
124 caps_tick();
125 }
126
127 /*
128 * locks vs rejoin
129 *
130 *
131 *
132 */
133
134 void Locker::send_lock_message(SimpleLock *lock, int msg)
135 {
136 for (const auto &it : lock->get_parent()->get_replicas()) {
137 if (mds->is_cluster_degraded() &&
138 mds->mdsmap->get_state(it.first) < MDSMap::STATE_REJOIN)
139 continue;
140 MLock *m = new MLock(lock, msg, mds->get_nodeid());
141 mds->send_message_mds(m, it.first);
142 }
143 }
144
145 void Locker::send_lock_message(SimpleLock *lock, int msg, const bufferlist &data)
146 {
147 for (const auto &it : lock->get_parent()->get_replicas()) {
148 if (mds->is_cluster_degraded() &&
149 mds->mdsmap->get_state(it.first) < MDSMap::STATE_REJOIN)
150 continue;
151 MLock *m = new MLock(lock, msg, mds->get_nodeid());
152 m->set_data(data);
153 mds->send_message_mds(m, it.first);
154 }
155 }
156
157
158
159
160 void Locker::include_snap_rdlocks(set<SimpleLock*>& rdlocks, CInode *in)
161 {
162 // rdlock ancestor snaps
163 CInode *t = in;
164 rdlocks.insert(&in->snaplock);
165 while (t->get_projected_parent_dn()) {
166 t = t->get_projected_parent_dn()->get_dir()->get_inode();
167 rdlocks.insert(&t->snaplock);
168 }
169 }
170
171 void Locker::include_snap_rdlocks_wlayout(set<SimpleLock*>& rdlocks, CInode *in,
172 file_layout_t **layout)
173 {
174 //rdlock ancestor snaps
175 CInode *t = in;
176 rdlocks.insert(&in->snaplock);
177 rdlocks.insert(&in->policylock);
178 bool found_layout = false;
179 while (t) {
180 rdlocks.insert(&t->snaplock);
181 if (!found_layout) {
182 rdlocks.insert(&t->policylock);
183 if (t->get_projected_inode()->has_layout()) {
184 *layout = &t->get_projected_inode()->layout;
185 found_layout = true;
186 }
187 }
188 if (t->get_projected_parent_dn() &&
189 t->get_projected_parent_dn()->get_dir())
190 t = t->get_projected_parent_dn()->get_dir()->get_inode();
191 else t = NULL;
192 }
193 }
194
195 struct MarkEventOnDestruct {
196 MDRequestRef& mdr;
197 const char* message;
198 bool mark_event;
199 MarkEventOnDestruct(MDRequestRef& _mdr,
200 const char *_message) : mdr(_mdr),
201 message(_message),
202 mark_event(true) {}
203 ~MarkEventOnDestruct() {
204 if (mark_event)
205 mdr->mark_event(message);
206 }
207 };
208
/* If this function returns false, the mdr has been placed
 * on the appropriate wait list */
/*
 * Acquire (or re-acquire) the full set of rdlocks/wrlocks/xlocks for a
 * request in a deadlock-free global order (SimpleLock::ptr_lt), auth-
 * pinning every object involved first.  Remote auth pins and remote
 * wrlocks are requested from peer MDSs as needed; whenever we must
 * wait, all progress so far is unwound and false is returned after a
 * retry callback has been queued.
 */
bool Locker::acquire_locks(MDRequestRef& mdr,
                           set<SimpleLock*> &rdlocks,
                           set<SimpleLock*> &wrlocks,
                           set<SimpleLock*> &xlocks,
                           map<SimpleLock*,mds_rank_t> *remote_wrlocks,
                           CInode *auth_pin_freeze,
                           bool auth_pin_nonblock)
{
  if (mdr->done_locking &&
      !mdr->is_slave()) { // not on slaves! master requests locks piecemeal.
    dout(10) << "acquire_locks " << *mdr << " - done locking" << dendl;
    return true; // at least we had better be!
  }
  dout(10) << "acquire_locks " << *mdr << dendl;

  // If we return false without reaching the end, the guard records the
  // most recent marker.message on the request's event trace.
  MarkEventOnDestruct marker(mdr, "failed to acquire_locks");

  client_t client = mdr->get_client();

  set<SimpleLock*, SimpleLock::ptr_lt> sorted; // sort everything we will lock
  set<MDSCacheObject*> mustpin; // items to authpin

  // xlocks
  for (set<SimpleLock*>::iterator p = xlocks.begin(); p != xlocks.end(); ++p) {
    SimpleLock *lock = *p;

    if ((lock->get_type() == CEPH_LOCK_ISNAP ||
         lock->get_type() == CEPH_LOCK_IPOLICY) &&
        mds->is_cluster_degraded() &&
        mdr->is_master() &&
        !mdr->is_queued_for_replay()) {
      // waiting for recovering mds, to guarantee replayed requests and mksnap/setlayout
      // get processed in proper order.
      bool wait = false;
      if (lock->get_parent()->is_auth()) {
        if (!mdr->locks.count(lock)) {
          set<mds_rank_t> ls;
          lock->get_parent()->list_replicas(ls);
          for (auto m : ls) {
            if (mds->mdsmap->get_state(m) < MDSMap::STATE_ACTIVE) {
              wait = true;
              break;
            }
          }
        }
      } else {
        // if the lock is the latest locked one, it's possible that slave mds got the lock
        // while there are recovering mds.
        if (!mdr->locks.count(lock) || lock == *mdr->locks.rbegin())
          wait = true;
      }
      if (wait) {
        dout(10) << " must xlock " << *lock << " " << *lock->get_parent()
                 << ", waiting for cluster recovered" << dendl;
        // unwind everything and retry once the cluster has recovered
        mds->locker->drop_locks(mdr.get(), NULL);
        mdr->drop_local_auth_pins();
        mds->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache, mdr));
        return false;
      }
    }

    dout(20) << " must xlock " << *lock << " " << *lock->get_parent() << dendl;

    sorted.insert(lock);
    mustpin.insert(lock->get_parent());

    // augment xlock with a versionlock?
    if ((*p)->get_type() == CEPH_LOCK_DN) {
      CDentry *dn = (CDentry*)lock->get_parent();
      if (!dn->is_auth())
        continue;

      if (xlocks.count(&dn->versionlock))
        continue; // we're xlocking the versionlock too; don't wrlock it!

      if (mdr->is_master()) {
        // master. wrlock versionlock so we can pipeline dentry updates to journal.
        wrlocks.insert(&dn->versionlock);
      } else {
        // slave. exclusively lock the dentry version (i.e. block other journal updates).
        // this makes rollback safe.
        xlocks.insert(&dn->versionlock);
        sorted.insert(&dn->versionlock);
      }
    }
    if (lock->get_type() > CEPH_LOCK_IVERSION) {
      // inode version lock?
      CInode *in = (CInode*)lock->get_parent();
      if (!in->is_auth())
        continue;
      if (mdr->is_master()) {
        // master. wrlock versionlock so we can pipeline inode updates to journal.
        wrlocks.insert(&in->versionlock);
      } else {
        // slave. exclusively lock the inode version (i.e. block other journal updates).
        // this makes rollback safe.
        xlocks.insert(&in->versionlock);
        sorted.insert(&in->versionlock);
      }
    }
  }

  // wrlocks
  for (set<SimpleLock*>::iterator p = wrlocks.begin(); p != wrlocks.end(); ++p) {
    MDSCacheObject *object = (*p)->get_parent();
    dout(20) << " must wrlock " << **p << " " << *object << dendl;
    sorted.insert(*p);
    if (object->is_auth())
      mustpin.insert(object);
    else if (!object->is_auth() &&
             !(*p)->can_wrlock(client) && // we might have to request a scatter
             !mdr->is_slave()) { // if we are slave (remote_wrlock), the master already authpinned
      dout(15) << " will also auth_pin " << *object
               << " in case we need to request a scatter" << dendl;
      mustpin.insert(object);
    }
  }

  // remote_wrlocks
  if (remote_wrlocks) {
    for (map<SimpleLock*,mds_rank_t>::iterator p = remote_wrlocks->begin(); p != remote_wrlocks->end(); ++p) {
      MDSCacheObject *object = p->first->get_parent();
      dout(20) << " must remote_wrlock on mds." << p->second << " "
               << *p->first << " " << *object << dendl;
      sorted.insert(p->first);
      mustpin.insert(object);
    }
  }

  // rdlocks
  for (set<SimpleLock*>::iterator p = rdlocks.begin();
       p != rdlocks.end();
       ++p) {
    MDSCacheObject *object = (*p)->get_parent();
    dout(20) << " must rdlock " << **p << " " << *object << dendl;
    sorted.insert(*p);
    if (object->is_auth())
      mustpin.insert(object);
    else if (!object->is_auth() &&
             !(*p)->can_rdlock(client)) { // we might have to request an rdlock
      dout(15) << " will also auth_pin " << *object
               << " in case we need to request a rdlock" << dendl;
      mustpin.insert(object);
    }
  }


  // AUTH PINS
  map<mds_rank_t, set<MDSCacheObject*> > mustpin_remote; // mds -> (object set)

  // can i auth pin them all now?
  marker.message = "failed to authpin local pins";
  for (set<MDSCacheObject*>::iterator p = mustpin.begin();
       p != mustpin.end();
       ++p) {
    MDSCacheObject *object = *p;

    dout(10) << " must authpin " << *object << dendl;

    if (mdr->is_auth_pinned(object)) {
      if (object != (MDSCacheObject*)auth_pin_freeze)
        continue;
      if (mdr->more()->is_remote_frozen_authpin) {
        if (mdr->more()->rename_inode == auth_pin_freeze)
          continue;
        // unfreeze auth pin for the wrong inode
        // NOTE: the operator[]/.size() call is only for its side effect:
        // it creates an (empty) entry so a slave request is still sent
        // to that mds below, unfreezing the stale pin.
        mustpin_remote[mdr->more()->rename_inode->authority().first].size();
      }
    }

    if (!object->is_auth()) {
      if (!mdr->locks.empty())
        drop_locks(mdr.get());
      if (object->is_ambiguous_auth()) {
        // wait
        marker.message = "waiting for single auth, object is being migrated";
        dout(10) << " ambiguous auth, waiting to authpin " << *object << dendl;
        object->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_MDS_RetryRequest(mdcache, mdr));
        mdr->drop_local_auth_pins();
        return false;
      }
      mustpin_remote[object->authority().first].insert(object);
      continue;
    }
    int err = 0;
    if (!object->can_auth_pin(&err)) {
      // wait
      drop_locks(mdr.get());
      mdr->drop_local_auth_pins();
      if (auth_pin_nonblock) {
        dout(10) << " can't auth_pin (freezing?) " << *object << ", nonblocking" << dendl;
        mdr->aborted = true;
        return false;
      }
      if (err == MDSCacheObject::ERR_EXPORTING_TREE) {
        marker.message = "failed to authpin, subtree is being exported";
      } else if (err == MDSCacheObject::ERR_FRAGMENTING_DIR) {
        marker.message = "failed to authpin, dir is being fragmented";
      } else if (err == MDSCacheObject::ERR_EXPORTING_INODE) {
        marker.message = "failed to authpin, inode is being exported";
      }
      dout(10) << " can't auth_pin (freezing?), waiting to authpin " << *object << dendl;
      object->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_MDS_RetryRequest(mdcache, mdr));

      if (!mdr->remote_auth_pins.empty())
        notify_freeze_waiter(object);

      return false;
    }
  }

  // ok, grab local auth pins
  for (set<MDSCacheObject*>::iterator p = mustpin.begin();
       p != mustpin.end();
       ++p) {
    MDSCacheObject *object = *p;
    if (mdr->is_auth_pinned(object)) {
      dout(10) << " already auth_pinned " << *object << dendl;
    } else if (object->is_auth()) {
      dout(10) << " auth_pinning " << *object << dendl;
      mdr->auth_pin(object);
    }
  }

  // request remote auth_pins
  if (!mustpin_remote.empty()) {
    marker.message = "requesting remote authpins";
    // fold objects we already hold remote pins on into the per-mds sets
    for (map<MDSCacheObject*,mds_rank_t>::iterator p = mdr->remote_auth_pins.begin();
         p != mdr->remote_auth_pins.end();
         ++p) {
      if (mustpin.count(p->first)) {
        assert(p->second == p->first->authority().first);
        map<mds_rank_t, set<MDSCacheObject*> >::iterator q = mustpin_remote.find(p->second);
        if (q != mustpin_remote.end())
          q->second.insert(p->first);
      }
    }
    for (map<mds_rank_t, set<MDSCacheObject*> >::iterator p = mustpin_remote.begin();
         p != mustpin_remote.end();
         ++p) {
      dout(10) << "requesting remote auth_pins from mds." << p->first << dendl;

      // wait for active auth
      if (mds->is_cluster_degraded() &&
          !mds->mdsmap->is_clientreplay_or_active_or_stopping(p->first)) {
        dout(10) << " mds." << p->first << " is not active" << dendl;
        if (mdr->more()->waiting_on_slave.empty())
          mds->wait_for_active_peer(p->first, new C_MDS_RetryRequest(mdcache, mdr));
        return false;
      }

      MMDSSlaveRequest *req = new MMDSSlaveRequest(mdr->reqid, mdr->attempt,
                                                   MMDSSlaveRequest::OP_AUTHPIN);
      for (set<MDSCacheObject*>::iterator q = p->second.begin();
           q != p->second.end();
           ++q) {
        dout(10) << " req remote auth_pin of " << **q << dendl;
        MDSCacheObjectInfo info;
        (*q)->set_object_info(info);
        req->get_authpins().push_back(info);
        if (*q == auth_pin_freeze)
          (*q)->set_object_info(req->get_authpin_freeze());
        mdr->pin(*q);
      }
      if (auth_pin_nonblock)
        req->mark_nonblock();
      mds->send_message_mds(req, p->first);

      // put in waiting list
      assert(mdr->more()->waiting_on_slave.count(p->first) == 0);
      mdr->more()->waiting_on_slave.insert(p->first);
    }
    return false;
  }

  // caps i'll need to issue
  set<CInode*> issue_set;
  bool result = false;

  // acquire locks.
  // make sure they match currently acquired locks.
  // `existing` walks the locks we already hold (same ptr_lt order as
  // `sorted`), so mismatches can be released as they are encountered.
  set<SimpleLock*, SimpleLock::ptr_lt>::iterator existing = mdr->locks.begin();
  for (set<SimpleLock*, SimpleLock::ptr_lt>::iterator p = sorted.begin();
       p != sorted.end();
       ++p) {
    bool need_wrlock = !!wrlocks.count(*p);
    bool need_remote_wrlock = !!(remote_wrlocks && remote_wrlocks->count(*p));

    // already locked?
    if (existing != mdr->locks.end() && *existing == *p) {
      // right kind?
      SimpleLock *have = *existing;
      ++existing;
      if (xlocks.count(have) && mdr->xlocks.count(have)) {
        dout(10) << " already xlocked " << *have << " " << *have->get_parent() << dendl;
        continue;
      }
      if (mdr->remote_wrlocks.count(have)) {
        if (!need_remote_wrlock ||
            mdr->remote_wrlocks[have] != (*remote_wrlocks)[have]) {
          dout(10) << " unlocking remote_wrlock on wrong mds." << mdr->remote_wrlocks[have]
                   << " " << *have << " " << *have->get_parent() << dendl;
          remote_wrlock_finish(have, mdr->remote_wrlocks[have], mdr.get());
        }
      }
      if (need_wrlock || need_remote_wrlock) {
        if (need_wrlock == !!mdr->wrlocks.count(have) &&
            need_remote_wrlock == !!mdr->remote_wrlocks.count(have)) {
          if (need_wrlock)
            dout(10) << " already wrlocked " << *have << " " << *have->get_parent() << dendl;
          if (need_remote_wrlock)
            dout(10) << " already remote_wrlocked " << *have << " " << *have->get_parent() << dendl;
          continue;
        }
      }
      if (rdlocks.count(have) && mdr->rdlocks.count(have)) {
        dout(10) << " already rdlocked " << *have << " " << *have->get_parent() << dendl;
        continue;
      }
    }

    // hose any stray locks
    // NOTE(review): since `existing` was advanced inside the block above
    // under the identical condition, this condition appears to never be
    // true here — confirm against upstream history before relying on it.
    if (existing != mdr->locks.end() && *existing == *p) {
      assert(need_wrlock || need_remote_wrlock);
      SimpleLock *lock = *existing;
      if (mdr->wrlocks.count(lock)) {
        if (!need_wrlock)
          dout(10) << " unlocking extra " << *lock << " " << *lock->get_parent() << dendl;
        else if (need_remote_wrlock) // acquire remote_wrlock first
          dout(10) << " unlocking out-of-order " << *lock << " " << *lock->get_parent() << dendl;
        bool need_issue = false;
        wrlock_finish(lock, mdr.get(), &need_issue);
        if (need_issue)
          issue_set.insert(static_cast<CInode*>(lock->get_parent()));
      }
      ++existing;
    }
    // release every remaining previously-held lock; any that are still
    // wanted will be re-acquired on later iterations of this loop
    while (existing != mdr->locks.end()) {
      SimpleLock *stray = *existing;
      ++existing;
      dout(10) << " unlocking out-of-order " << *stray << " " << *stray->get_parent() << dendl;
      bool need_issue = false;
      if (mdr->xlocks.count(stray)) {
        xlock_finish(stray, mdr.get(), &need_issue);
      } else if (mdr->rdlocks.count(stray)) {
        rdlock_finish(stray, mdr.get(), &need_issue);
      } else {
        // may have acquired both wrlock and remore wrlock
        if (mdr->wrlocks.count(stray))
          wrlock_finish(stray, mdr.get(), &need_issue);
        if (mdr->remote_wrlocks.count(stray))
          remote_wrlock_finish(stray, mdr->remote_wrlocks[stray], mdr.get());
      }
      if (need_issue)
        issue_set.insert(static_cast<CInode*>(stray->get_parent()));
    }

    // lock
    if (mdr->locking && *p != mdr->locking) {
      cancel_locking(mdr.get(), &issue_set);
    }
    if (xlocks.count(*p)) {
      marker.message = "failed to xlock, waiting";
      if (!xlock_start(*p, mdr))
        goto out;
      dout(10) << " got xlock on " << **p << " " << *(*p)->get_parent() << dendl;
    } else if (need_wrlock || need_remote_wrlock) {
      if (need_remote_wrlock && !mdr->remote_wrlocks.count(*p)) {
        marker.message = "waiting for remote wrlocks";
        remote_wrlock_start(*p, (*remote_wrlocks)[*p], mdr);
        goto out;
      }
      if (need_wrlock && !mdr->wrlocks.count(*p)) {
        marker.message = "failed to wrlock, waiting";
        if (need_remote_wrlock && !(*p)->can_wrlock(mdr->get_client())) {
          marker.message = "failed to wrlock, dropping remote wrlock and waiting";
          // can't take the wrlock because the scatter lock is gathering. need to
          // release the remote wrlock, so that the gathering process can finish.
          remote_wrlock_finish(*p, mdr->remote_wrlocks[*p], mdr.get());
          remote_wrlock_start(*p, (*remote_wrlocks)[*p], mdr);
          goto out;
        }
        // nowait if we have already gotten remote wrlock
        if (!wrlock_start(*p, mdr, need_remote_wrlock))
          goto out;
        dout(10) << " got wrlock on " << **p << " " << *(*p)->get_parent() << dendl;
      }
    } else {
      assert(mdr->is_master());
      if ((*p)->needs_recover()) {
        if (mds->is_cluster_degraded()) {
          if (!mdr->is_queued_for_replay()) {
            // see comments in SimpleLock::set_state_rejoin() and
            // ScatterLock::encode_state_for_rejoin()
            drop_locks(mdr.get());
            mds->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache, mdr));
            dout(10) << " rejoin recovering " << **p << " " << *(*p)->get_parent()
                     << ", waiting for cluster recovered" << dendl;
            marker.message = "rejoin recovering lock, waiting for cluster recovered";
            return false;
          }
        } else {
          (*p)->clear_need_recover();
        }
      }

      marker.message = "failed to rdlock, waiting";
      if (!rdlock_start(*p, mdr))
        goto out;
      dout(10) << " got rdlock on " << **p << " " << *(*p)->get_parent() << dendl;
    }
  }

  // any extra unneeded locks?
  while (existing != mdr->locks.end()) {
    SimpleLock *stray = *existing;
    ++existing;
    dout(10) << " unlocking extra " << *stray << " " << *stray->get_parent() << dendl;
    bool need_issue = false;
    if (mdr->xlocks.count(stray)) {
      xlock_finish(stray, mdr.get(), &need_issue);
    } else if (mdr->rdlocks.count(stray)) {
      rdlock_finish(stray, mdr.get(), &need_issue);
    } else {
      // may have acquired both wrlock and remore wrlock
      if (mdr->wrlocks.count(stray))
        wrlock_finish(stray, mdr.get(), &need_issue);
      if (mdr->remote_wrlocks.count(stray))
        remote_wrlock_finish(stray, mdr->remote_wrlocks[stray], mdr.get());
    }
    if (need_issue)
      issue_set.insert(static_cast<CInode*>(stray->get_parent()));
  }

  mdr->done_locking = true;
  mdr->set_mds_stamp(ceph_clock_now());
  result = true;
  marker.message = "acquired locks";

 out:
  // reissue caps for any inode whose lock releases allowed more
  issue_caps_set(issue_set);
  return result;
}
654
655 void Locker::notify_freeze_waiter(MDSCacheObject *o)
656 {
657 CDir *dir = NULL;
658 if (CInode *in = dynamic_cast<CInode*>(o)) {
659 if (!in->is_root())
660 dir = in->get_parent_dir();
661 } else if (CDentry *dn = dynamic_cast<CDentry*>(o)) {
662 dir = dn->get_dir();
663 } else {
664 dir = dynamic_cast<CDir*>(o);
665 assert(dir);
666 }
667 if (dir) {
668 if (dir->is_freezing_dir())
669 mdcache->fragment_freeze_inc_num_waiters(dir);
670 if (dir->is_freezing_tree()) {
671 while (!dir->is_freezing_tree_root())
672 dir = dir->get_parent_dir();
673 mdcache->migrator->export_freeze_inc_num_waiters(dir);
674 }
675 }
676 }
677
678 void Locker::set_xlocks_done(MutationImpl *mut, bool skip_dentry)
679 {
680 for (set<SimpleLock*>::iterator p = mut->xlocks.begin();
681 p != mut->xlocks.end();
682 ++p) {
683 MDSCacheObject *object = (*p)->get_parent();
684 assert(object->is_auth());
685 if (skip_dentry &&
686 ((*p)->get_type() == CEPH_LOCK_DN || (*p)->get_type() == CEPH_LOCK_DVERSION))
687 continue;
688 dout(10) << "set_xlocks_done on " << **p << " " << *object << dendl;
689 (*p)->set_xlock_done();
690 }
691 }
692
693 void Locker::_drop_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
694 {
695 while (!mut->rdlocks.empty()) {
696 bool ni = false;
697 MDSCacheObject *p = (*mut->rdlocks.begin())->get_parent();
698 rdlock_finish(*mut->rdlocks.begin(), mut, &ni);
699 if (ni)
700 pneed_issue->insert(static_cast<CInode*>(p));
701 }
702 }
703
/*
 * Release all xlocks, remote wrlocks and local wrlocks held by `mut`
 * (rdlocks are left alone).  Remote locks are dropped by sending a
 * single OP_DROPLOCKS to each involved peer at the end.  Inodes that
 * may need a cap reissue are collected into *pneed_issue.
 */
void Locker::_drop_non_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
{
  set<mds_rank_t> slaves;   // peers we must tell to drop their side

  // each *_finish() call erases the lock from mut's sets, so drain
  // from the front rather than iterating
  while (!mut->xlocks.empty()) {
    SimpleLock *lock = *mut->xlocks.begin();
    MDSCacheObject *p = lock->get_parent();
    if (!p->is_auth()) {
      // remotely-held xlock: release our local bookkeeping and let the
      // auth mds drop the real lock via OP_DROPLOCKS below
      assert(lock->get_sm()->can_remote_xlock);
      slaves.insert(p->authority().first);
      lock->put_xlock();
      mut->locks.erase(lock);
      mut->xlocks.erase(lock);
      continue;
    }
    bool ni = false;
    xlock_finish(lock, mut, &ni);
    if (ni)
      pneed_issue->insert(static_cast<CInode*>(p));
  }

  while (!mut->remote_wrlocks.empty()) {
    map<SimpleLock*,mds_rank_t>::iterator p = mut->remote_wrlocks.begin();
    slaves.insert(p->second);
    // only drop the lock from mut->locks if we don't also hold a local
    // wrlock on it (the wrlock loop below will handle that case)
    if (mut->wrlocks.count(p->first) == 0)
      mut->locks.erase(p->first);
    mut->remote_wrlocks.erase(p);
  }

  while (!mut->wrlocks.empty()) {
    bool ni = false;
    MDSCacheObject *p = (*mut->wrlocks.begin())->get_parent();
    wrlock_finish(*mut->wrlocks.begin(), mut, &ni);
    if (ni)
      pneed_issue->insert(static_cast<CInode*>(p));
  }

  // tell each peer to drop its side, unless it hasn't rejoined yet (in
  // which case it will have discarded the state anyway)
  for (set<mds_rank_t>::iterator p = slaves.begin(); p != slaves.end(); ++p) {
    if (!mds->is_cluster_degraded() ||
        mds->mdsmap->get_state(*p) >= MDSMap::STATE_REJOIN) {
      dout(10) << "_drop_non_rdlocks dropping remote locks on mds." << *p << dendl;
      MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt,
                                                        MMDSSlaveRequest::OP_DROPLOCKS);
      mds->send_message_mds(slavereq, *p);
    }
  }
}
751
/*
 * Abort the lock acquisition `mut` is currently blocked on
 * (mut->locking).  On the auth side, back the lock's state machine out
 * of the partially-entered xlock states; then clear the in-progress
 * marker on the mutation.
 */
void Locker::cancel_locking(MutationImpl *mut, set<CInode*> *pneed_issue)
{
  SimpleLock *lock = mut->locking;
  assert(lock);
  dout(10) << "cancel_locking " << *lock << " on " << *mut << dendl;

  if (lock->get_parent()->is_auth()) {
    bool need_issue = false;
    if (lock->get_state() == LOCK_PREXLOCK) {
      // client -1: the aborted xlock had no client attached
      _finish_xlock(lock, -1, &need_issue);
    } else if (lock->get_state() == LOCK_LOCK_XLOCK) {
      lock->set_state(LOCK_XLOCKDONE);
      eval_gather(lock, true, &need_issue);
    }
    if (need_issue)
      pneed_issue->insert(static_cast<CInode *>(lock->get_parent()));
  }
  // always clear mut->locking, auth or not
  mut->finish_locking(lock);
}
771
772 void Locker::drop_locks(MutationImpl *mut, set<CInode*> *pneed_issue)
773 {
774 // leftover locks
775 set<CInode*> my_need_issue;
776 if (!pneed_issue)
777 pneed_issue = &my_need_issue;
778
779 if (mut->locking)
780 cancel_locking(mut, pneed_issue);
781 _drop_non_rdlocks(mut, pneed_issue);
782 _drop_rdlocks(mut, pneed_issue);
783
784 if (pneed_issue == &my_need_issue)
785 issue_caps_set(*pneed_issue);
786 mut->done_locking = false;
787 }
788
789 void Locker::drop_non_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
790 {
791 set<CInode*> my_need_issue;
792 if (!pneed_issue)
793 pneed_issue = &my_need_issue;
794
795 _drop_non_rdlocks(mut, pneed_issue);
796
797 if (pneed_issue == &my_need_issue)
798 issue_caps_set(*pneed_issue);
799 }
800
801 void Locker::drop_rdlocks_for_early_reply(MutationImpl *mut)
802 {
803 set<CInode*> need_issue;
804
805 for (auto p = mut->rdlocks.begin(); p != mut->rdlocks.end(); ) {
806 SimpleLock *lock = *p;
807 ++p;
808 // make later mksnap/setlayout (at other mds) wait for this unsafe request
809 if (lock->get_type() == CEPH_LOCK_ISNAP ||
810 lock->get_type() == CEPH_LOCK_IPOLICY)
811 continue;
812 bool ni = false;
813 rdlock_finish(lock, mut, &ni);
814 if (ni)
815 need_issue.insert(static_cast<CInode*>(lock->get_parent()));
816 }
817
818 issue_caps_set(need_issue);
819 }
820
/*
 * Release the locks held by a fragment operation so the directory can
 * unfreeze, keeping only the dirfragtree (IDFT) locks.
 * NOTE(review): wrlock_finish() is applied to every non-IDFT lock in
 * mut->locks — presumably only wrlocks remain at this point; confirm
 * against the fragment code paths.
 */
void Locker::drop_locks_for_fragment_unfreeze(MutationImpl *mut)
{
  set<CInode*> need_issue;

  for (auto it = mut->locks.begin(); it != mut->locks.end(); ) {
    SimpleLock *lock = *it;
    // advance first: wrlock_finish() erases the lock from mut->locks
    ++it;
    if (lock->get_type() == CEPH_LOCK_IDFT) {
      continue;
    }
    bool ni = false;
    wrlock_finish(lock, mut, &ni);
    if (ni)
      need_issue.insert(static_cast<CInode*>(lock->get_parent()));
  }
  issue_caps_set(need_issue);
}
838
839 // generics
840
/*
 * Drive an unstable lock towards its next state.  Called whenever a
 * condition blocking the transition may have cleared (rdlock/wrlock/
 * xlock released, caps revoked, ack received, flush completed).  If
 * everything the transition was gathering on is now satisfied, move to
 * the next state, wake waiters (into *pfinishers if provided) and —
 * depending on *pneed_issue — reissue caps.
 */
void Locker::eval_gather(SimpleLock *lock, bool first, bool *pneed_issue, list<MDSInternalContextBase*> *pfinishers)
{
  dout(10) << "eval_gather " << *lock << " on " << *lock->get_parent() << dendl;
  assert(!lock->is_stable());

  int next = lock->get_next_state();

  // all lock types except dentry locks hang off an inode
  CInode *in = 0;
  bool caps = lock->get_cap_shift();
  if (lock->get_type() != CEPH_LOCK_DN)
    in = static_cast<CInode *>(lock->get_parent());

  bool need_issue = false;

  int loner_issued = 0, other_issued = 0, xlocker_issued = 0;
  assert(!caps || in != NULL);
  if (caps && in->is_head()) {
    // snapshot which caps are currently issued in this lock's cap band
    in->get_caps_issued(&loner_issued, &other_issued, &xlocker_issued,
                        lock->get_cap_shift(), lock->get_cap_mask());
    dout(10) << " next state is " << lock->get_state_name(next)
             << " issued/allows loner " << gcap_string(loner_issued)
             << "/" << gcap_string(lock->gcaps_allowed(CAP_LONER, next))
             << " xlocker " << gcap_string(xlocker_issued)
             << "/" << gcap_string(lock->gcaps_allowed(CAP_XLOCKER, next))
             << " other " << gcap_string(other_issued)
             << "/" << gcap_string(lock->gcaps_allowed(CAP_ANY, next))
             << dendl;

    // on the first pass, start revoking caps the next state won't allow
    if (first && ((~lock->gcaps_allowed(CAP_ANY, next) & other_issued) ||
                  (~lock->gcaps_allowed(CAP_LONER, next) & loner_issued) ||
                  (~lock->gcaps_allowed(CAP_XLOCKER, next) & xlocker_issued)))
      need_issue = true;
  }

  // NOTE(review): this macro is not #undef'd after use, so it leaks into
  // the rest of the translation unit.
#define IS_TRUE_AND_LT_AUTH(x, auth) (x && ((auth && x <= AUTH) || (!auth && x < AUTH)))
  bool auth = lock->get_parent()->is_auth();
  // the gather is complete only if no holder/lease/cap/flush that the
  // next state cannot tolerate is still outstanding
  if (!lock->is_gathering() &&
      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_rdlock, auth) || !lock->is_rdlocked()) &&
      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_wrlock, auth) || !lock->is_wrlocked()) &&
      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_xlock, auth) || !lock->is_xlocked()) &&
      (IS_TRUE_AND_LT_AUTH(lock->get_sm()->states[next].can_lease, auth) || !lock->is_leased()) &&
      !(lock->get_parent()->is_auth() && lock->is_flushing()) &&  // i.e. wait for scatter_writebehind!
      (!caps || ((~lock->gcaps_allowed(CAP_ANY, next) & other_issued) == 0 &&
                 (~lock->gcaps_allowed(CAP_LONER, next) & loner_issued) == 0 &&
                 (~lock->gcaps_allowed(CAP_XLOCKER, next) & xlocker_issued) == 0)) &&
      lock->get_state() != LOCK_SYNC_MIX2 &&  // these states need an explicit trigger from the auth mds
      lock->get_state() != LOCK_MIX_SYNC2
      ) {
    dout(7) << "eval_gather finished gather on " << *lock
            << " on " << *lock->get_parent() << dendl;

    if (lock->get_sm() == &sm_filelock) {
      assert(in);
      if (in->state_test(CInode::STATE_RECOVERING)) {
        dout(7) << "eval_gather finished gather, but still recovering" << dendl;
        return;
      } else if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
        dout(7) << "eval_gather finished gather, but need to recover" << dendl;
        mds->mdcache->queue_file_recover(in);
        mds->mdcache->do_file_recover();
        return;
      }
    }

    if (!lock->get_parent()->is_auth()) {
      // replica: tell auth
      mds_rank_t auth = lock->get_parent()->authority().first;

      if (lock->get_parent()->is_rejoining() &&
          mds->mdsmap->get_state(auth) == MDSMap::STATE_REJOIN) {
        dout(7) << "eval_gather finished gather, but still rejoining "
                << *lock->get_parent() << dendl;
        return;
      }

      if (!mds->is_cluster_degraded() ||
          mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) {
        switch (lock->get_state()) {
        case LOCK_SYNC_LOCK:
          mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid()),
                                auth);
          break;

        case LOCK_MIX_SYNC:
          {
            MLock *reply = new MLock(lock, LOCK_AC_SYNCACK, mds->get_nodeid());
            lock->encode_locked_state(reply->get_data());
            mds->send_message_mds(reply, auth);
            next = LOCK_MIX_SYNC2;
            (static_cast<ScatterLock *>(lock))->start_flush();
          }
          break;

        case LOCK_MIX_SYNC2:
          (static_cast<ScatterLock *>(lock))->finish_flush();
          (static_cast<ScatterLock *>(lock))->clear_flushed();
          // deliberate fall-through: nothing more to send, we already acked

        case LOCK_SYNC_MIX2:
          // do nothing, we already acked
          break;

        case LOCK_SYNC_MIX:
          {
            MLock *reply = new MLock(lock, LOCK_AC_MIXACK, mds->get_nodeid());
            mds->send_message_mds(reply, auth);
            next = LOCK_SYNC_MIX2;
          }
          break;

        case LOCK_MIX_LOCK:
          {
            bufferlist data;
            lock->encode_locked_state(data);
            mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid(), data), auth);
            (static_cast<ScatterLock *>(lock))->start_flush();
            // we'll get an AC_LOCKFLUSHED to complete
          }
          break;

        default:
          ceph_abort();
        }
      }
    } else {
      // auth

      // once the first (local) stage of mix->lock gather complete we can
      // gather from replicas
      if (lock->get_state() == LOCK_MIX_LOCK &&
          lock->get_parent()->is_replicated()) {
        dout(10) << " finished (local) gather for mix->lock, now gathering from replicas" << dendl;
        send_lock_message(lock, LOCK_AC_LOCK);
        lock->init_gather();
        lock->set_state(LOCK_MIX_LOCK2);
        return;
      }

      if (lock->is_dirty() && !lock->is_flushed()) {
        // journal the scattered data before completing the transition
        scatter_writebehind(static_cast<ScatterLock *>(lock));
        mds->mdlog->flush();
        return;
      }
      lock->clear_flushed();

      switch (lock->get_state()) {
        // to mixed
      case LOCK_TSYN_MIX:
      case LOCK_SYNC_MIX:
      case LOCK_EXCL_MIX:
      case LOCK_XSYN_MIX:
        in->start_scatter(static_cast<ScatterLock *>(lock));
        if (lock->get_parent()->is_replicated()) {
          bufferlist softdata;
          lock->encode_locked_state(softdata);
          send_lock_message(lock, LOCK_AC_MIX, softdata);
        }
        (static_cast<ScatterLock *>(lock))->clear_scatter_wanted();
        break;

      case LOCK_XLOCK:
      case LOCK_XLOCKDONE:
        if (next != LOCK_SYNC)
          break;
        // fall-thru

        // to sync
      case LOCK_EXCL_SYNC:
      case LOCK_LOCK_SYNC:
      case LOCK_MIX_SYNC:
      case LOCK_XSYN_SYNC:
        if (lock->get_parent()->is_replicated()) {
          bufferlist softdata;
          lock->encode_locked_state(softdata);
          send_lock_message(lock, LOCK_AC_SYNC, softdata);
        }
        break;
      }

    }

    lock->set_state(next);

    if (lock->get_parent()->is_auth() &&
        lock->is_stable())
      lock->get_parent()->auth_unpin(lock);

    // drop loner before doing waiters
    if (caps &&
        in->is_head() &&
        in->is_auth() &&
        in->get_wanted_loner() != in->get_loner()) {
      dout(10) << " trying to drop loner" << dendl;
      if (in->try_drop_loner()) {
        dout(10) << " dropped loner" << dendl;
        need_issue = true;
      }
    }

    if (pfinishers)
      lock->take_waiting(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD|SimpleLock::WAIT_XLOCK,
                         *pfinishers);
    else
      lock->finish_waiters(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD|SimpleLock::WAIT_XLOCK);

    if (caps && in->is_head())
      need_issue = true;

    if (lock->get_parent()->is_auth() &&
        lock->is_stable())
      try_eval(lock, &need_issue);
  }

  if (need_issue) {
    if (pneed_issue)
      *pneed_issue = true;
    else if (in->is_head())
      issue_caps(in);
  }

}
1061
/**
 * Evaluate the inode locks selected by @p mask, (re)choosing the loner
 * client as needed.  If the loner choice changes mid-evaluation, the mask
 * is widened to -1 (all locks) and evaluation restarts.
 *
 * @param in inode whose locks are evaluated
 * @param mask bitmask of CEPH_LOCK_I* lock types to evaluate (-1 = all)
 * @param caps_imported if true, start with need_issue set so caps are
 *        (re)issued even if no lock transition demands it
 * @return true if caps were issued (or flagged as needing issue)
 */
bool Locker::eval(CInode *in, int mask, bool caps_imported)
{
  bool need_issue = caps_imported;
  list<MDSInternalContextBase*> finishers;

  dout(10) << "eval " << mask << " " << *in << dendl;

  // choose loner?  only meaningful on the auth MDS for the head inode.
  if (in->is_auth() && in->is_head()) {
    client_t orig_loner = in->get_loner();
    if (in->choose_ideal_loner()) {
      dout(10) << "eval set loner: client." << orig_loner << " -> client." << in->get_loner() << dendl;
      need_issue = true;
      mask = -1;  // loner changed: re-evaluate every lock
    } else if (in->get_wanted_loner() != in->get_loner()) {
      dout(10) << "eval want loner: client." << in->get_wanted_loner() << " but failed to set it" << dendl;
      mask = -1;  // wanted loner couldn't be set: re-evaluate every lock
    }
  }

 retry:
  if (mask & CEPH_LOCK_IFILE)
    eval_any(&in->filelock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IAUTH)
    eval_any(&in->authlock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_ILINK)
    eval_any(&in->linklock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IXATTR)
    eval_any(&in->xattrlock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_INEST)
    eval_any(&in->nestlock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IFLOCK)
    eval_any(&in->flocklock, &need_issue, &finishers, caps_imported);
  if (mask & CEPH_LOCK_IPOLICY)
    eval_any(&in->policylock, &need_issue, &finishers, caps_imported);

  // drop loner?  if the wanted loner changed while evaluating, drop the
  // current loner, possibly promote the wanted one, and evaluate again.
  if (in->is_auth() && in->is_head() && in->get_wanted_loner() != in->get_loner()) {
    if (in->try_drop_loner()) {
      need_issue = true;
      if (in->get_wanted_loner() >= 0) {
	dout(10) << "eval end set loner to client." << in->get_loner() << dendl;
	bool ok = in->try_set_loner();
	assert(ok);
	mask = -1;
	goto retry;
      }
    }
  }

  finish_contexts(g_ceph_context, finishers);

  if (need_issue && in->is_head())
    issue_caps(in);

  dout(10) << "eval done" << dendl;
  return need_issue;
}
1120
/**
 * Waiter context that re-runs Locker::try_eval() on a cache object once
 * the condition it was queued behind (single auth, unfreeze, ...) clears.
 * The object is pinned in the constructor so the pointer stays valid
 * until finish() runs.
 */
class C_Locker_Eval : public LockerContext {
  MDSCacheObject *p;  // pinned target object
  int mask;           // lock-type mask forwarded to try_eval()
public:
  C_Locker_Eval(Locker *l, MDSCacheObject *pp, int m) : LockerContext(l), p(pp), mask(m) {
    // We are used as an MDSCacheObject waiter, so should
    // only be invoked by someone already holding the big lock.
    assert(locker->mds->mds_lock.is_locked_by_me());
    p->get(MDSCacheObject::PIN_PTRWAITER);
  }
  void finish(int r) override {
    locker->try_eval(p, mask);
    p->put(MDSCacheObject::PIN_PTRWAITER);  // drop the pin taken in the ctor
  }
};
1136
/**
 * Evaluate the lock(s) selected by @p mask on cache object @p p,
 * deferring via a C_Locker_Eval waiter while the object has ambiguous
 * auth or is frozen.
 *
 * @param p a dentry (mask == CEPH_LOCK_DN) or an inode
 * @param mask lock-type mask; CEPH_LOCK_DN must appear alone
 */
void Locker::try_eval(MDSCacheObject *p, int mask)
{
  // unstable and ambiguous auth?  wait until auth is resolved.
  if (p->is_ambiguous_auth()) {
    dout(7) << "try_eval ambiguous auth, waiting on " << *p << dendl;
    p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_Eval(this, p, mask));
    return;
  }

  // frozen on the auth: defer until unfrozen.
  if (p->is_auth() && p->is_frozen()) {
    dout(7) << "try_eval frozen, waiting on " << *p << dendl;
    p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, mask));
    return;
  }

  if (mask & CEPH_LOCK_DN) {
    assert(mask == CEPH_LOCK_DN);
    bool need_issue = false; // ignore this, no caps on dentries
    CDentry *dn = static_cast<CDentry *>(p);
    eval_any(&dn->lock, &need_issue);
  } else {
    CInode *in = static_cast<CInode *>(p);
    eval(in, mask);
  }
}
1162
/**
 * Evaluate a single lock, deferring while its parent object has
 * ambiguous auth, is non-auth, or is frozen.  Honors pending
 * scatter/unscatter requests on scatterlocks before the freezing check
 * (see the deadlock-avoidance comment below).
 *
 * @param lock the lock to evaluate
 * @param pneed_issue set to true by callees if caps need (re)issue
 */
void Locker::try_eval(SimpleLock *lock, bool *pneed_issue)
{
  MDSCacheObject *p = lock->get_parent();

  // unstable and ambiguous auth?
  if (p->is_ambiguous_auth()) {
    dout(7) << "try_eval " << *lock << " ambiguousauth, waiting on " << *p << dendl;
    p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_Eval(this, p, lock->get_type()));
    return;
  }

  // only the auth evaluates locks.
  if (!p->is_auth()) {
    dout(7) << "try_eval " << *lock << " not auth for " << *p << dendl;
    return;
  }

  if (p->is_frozen()) {
    dout(7) << "try_eval " << *lock << " frozen, waiting on " << *p << dendl;
    p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, lock->get_type()));
    return;
  }

  /*
   * We could have a situation like:
   *
   * - mds A authpins item on mds B
   * - mds B starts to freeze tree containing item
   * - mds A tries wrlock_start on A, sends REQSCATTER to B
   * - mds B lock is unstable, sets scatter_wanted
   * - mds B lock stabilizes, calls try_eval.
   *
   * We can defer while freezing without causing a deadlock.  Honor
   * scatter_wanted flag here.  This will never get deferred by the
   * checks above due to the auth_pin held by the master.
   */
  if (lock->is_scatterlock()) {
    ScatterLock *slock = static_cast<ScatterLock *>(lock);
    if (slock->get_scatter_wanted() &&
	slock->get_state() != LOCK_MIX) {
      // a peer asked for MIX; start the transition now.
      scatter_mix(slock, pneed_issue);
      if (!lock->is_stable())
	return;
    } else if (slock->get_unscatter_wanted() &&
	       slock->get_state() != LOCK_LOCK) {
      // a peer asked for LOCK; start the transition now.
      simple_lock(slock, pneed_issue);
      if (!lock->is_stable()) {
	return;
      }
    }
  }

  // freezing (but not yet frozen): defer non-dentry locks.
  if (lock->get_type() != CEPH_LOCK_DN && p->is_freezing()) {
    dout(7) << "try_eval " << *lock << " freezing, waiting on " << *p << dendl;
    p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, lock->get_type()));
    return;
  }

  eval(lock, pneed_issue);
}
1222
/**
 * Kick any unstable cap-related locks (file/auth/link/xattr) on @p in so
 * a pending gather can make progress, then issue caps if needed.
 *
 * @param issue_set if non-null, inodes needing cap issue are collected
 *        here (for a later batched issue) instead of issued immediately
 */
void Locker::eval_cap_gather(CInode *in, set<CInode*> *issue_set)
{
  bool need_issue = false;
  list<MDSInternalContextBase*> finishers;

  // kick locks now
  if (!in->filelock.is_stable())
    eval_gather(&in->filelock, false, &need_issue, &finishers);
  if (!in->authlock.is_stable())
    eval_gather(&in->authlock, false, &need_issue, &finishers);
  if (!in->linklock.is_stable())
    eval_gather(&in->linklock, false, &need_issue, &finishers);
  if (!in->xattrlock.is_stable())
    eval_gather(&in->xattrlock, false, &need_issue, &finishers);

  if (need_issue && in->is_head()) {
    if (issue_set)
      issue_set->insert(in);
    else
      issue_caps(in);
  }

  finish_contexts(g_ceph_context, finishers);
}
1247
/**
 * Kick any unstable scatter-type locks (file/nest/dirfragtree) on @p in
 * so a pending gather can complete, then issue caps if needed.
 */
void Locker::eval_scatter_gathers(CInode *in)
{
  bool need_issue = false;
  list<MDSInternalContextBase*> finishers;

  dout(10) << "eval_scatter_gathers " << *in << dendl;

  // kick locks now
  if (!in->filelock.is_stable())
    eval_gather(&in->filelock, false, &need_issue, &finishers);
  if (!in->nestlock.is_stable())
    eval_gather(&in->nestlock, false, &need_issue, &finishers);
  if (!in->dirfragtreelock.is_stable())
    eval_gather(&in->dirfragtreelock, false, &need_issue, &finishers);

  if (need_issue && in->is_head())
    issue_caps(in);

  finish_contexts(g_ceph_context, finishers);
}
1268
1269 void Locker::eval(SimpleLock *lock, bool *need_issue)
1270 {
1271 switch (lock->get_type()) {
1272 case CEPH_LOCK_IFILE:
1273 return file_eval(static_cast<ScatterLock*>(lock), need_issue);
1274 case CEPH_LOCK_IDFT:
1275 case CEPH_LOCK_INEST:
1276 return scatter_eval(static_cast<ScatterLock*>(lock), need_issue);
1277 default:
1278 return simple_eval(lock, need_issue);
1279 }
1280 }
1281
1282
1283 // ------------------
1284 // rdlock
1285
/**
 * Try to move @p lock toward a state that permits read locks.
 *
 * On the auth: start a sync (or, for an EXCL file lock with a target
 * loner on a non-dir, an xsyn unless @p as_anon asks for plain SYNC).
 * On a replica: send LOCK_AC_REQRDLOCK to the auth.  For file locks on
 * a recovering inode, also prioritize its recovery.
 *
 * @param as_anon caller wants SYNC, not XSYN
 * @return true if the lock state was changed locally (caller re-checks)
 */
bool Locker::_rdlock_kick(SimpleLock *lock, bool as_anon)
{
  // kick the lock
  if (lock->is_stable()) {
    if (lock->get_parent()->is_auth()) {
      if (lock->get_sm() == &sm_scatterlock) {
	// not until tempsync is fully implemented
	//if (lock->get_parent()->is_replicated())
	//scatter_tempsync((ScatterLock*)lock);
	//else
	simple_sync(lock);
      } else if (lock->get_sm() == &sm_filelock) {
	CInode *in = static_cast<CInode*>(lock->get_parent());
	if (lock->get_state() == LOCK_EXCL &&
	    in->get_target_loner() >= 0 &&
	    !in->is_dir() && !as_anon) // as_anon => caller wants SYNC, not XSYN
	  file_xsyn(lock);
	else
	  simple_sync(lock);
      } else
	simple_sync(lock);
      return true;
    } else {
      // request rdlock state change from auth
      mds_rank_t auth = lock->get_parent()->authority().first;
      if (!mds->is_cluster_degraded() ||
	  mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
	dout(10) << "requesting rdlock from auth on "
		 << *lock << " on " << *lock->get_parent() << dendl;
	mds->send_message_mds(new MLock(lock, LOCK_AC_REQRDLOCK, mds->get_nodeid()), auth);
      }
      return false;
    }
  }
  // unstable file lock on a recovering inode: get recovery done sooner.
  if (lock->get_type() == CEPH_LOCK_IFILE) {
    CInode *in = static_cast<CInode *>(lock->get_parent());
    if (in->state_test(CInode::STATE_RECOVERING)) {
      mds->mdcache->recovery_queue.prioritize(in);
    }
  }

  return false;
}
1329
1330 bool Locker::rdlock_try(SimpleLock *lock, client_t client, MDSInternalContextBase *con)
1331 {
1332 dout(7) << "rdlock_try on " << *lock << " on " << *lock->get_parent() << dendl;
1333
1334 // can read? grab ref.
1335 if (lock->can_rdlock(client))
1336 return true;
1337
1338 _rdlock_kick(lock, false);
1339
1340 if (lock->can_rdlock(client))
1341 return true;
1342
1343 // wait!
1344 if (con) {
1345 dout(7) << "rdlock_try waiting on " << *lock << " on " << *lock->get_parent() << dendl;
1346 lock->add_waiter(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_RD, con);
1347 }
1348 return false;
1349 }
1350
/**
 * Acquire a read lock for request @p mut, looping between can_rdlock
 * checks and _rdlock_kick state nudges.  On failure, queue a retry of
 * the request and return false.
 *
 * @param as_anon take the rdlock anonymously (client -1): forced for
 *        snapped reads, and used to avoid the xlock-holder fast path
 * @return true if the rdlock was taken and recorded in @p mut
 */
bool Locker::rdlock_start(SimpleLock *lock, MDRequestRef& mut, bool as_anon)
{
  dout(7) << "rdlock_start on " << *lock << " on " << *lock->get_parent() << dendl;

  // client may be allowed to rdlock the same item it has xlocked.
  // UNLESS someone passes in as_anon, or we're reading snapped version here.
  if (mut->snapid != CEPH_NOSNAP)
    as_anon = true;
  client_t client = as_anon ? -1 : mut->get_client();

  CInode *in = 0;
  if (lock->get_type() != CEPH_LOCK_DN)
    in = static_cast<CInode *>(lock->get_parent());

  /*
  if (!lock->get_parent()->is_auth() &&
      lock->fw_rdlock_to_auth()) {
    mdcache->request_forward(mut, lock->get_parent()->authority().first);
    return false;
  }
  */

  while (1) {
    // can read?  grab ref.
    if (lock->can_rdlock(client)) {
      lock->get_rdlock();
      mut->rdlocks.insert(lock);
      mut->locks.insert(lock);
      return true;
    }

    // hmm, wait a second.
    if (in && !in->is_head() && in->is_auth() &&
	lock->get_state() == LOCK_SNAP_SYNC) {
      // okay, we actually need to kick the head's lock to get ourselves synced up.
      CInode *head = mdcache->get_inode(in->ino());
      assert(head);
      SimpleLock *hlock = head->get_lock(CEPH_LOCK_IFILE);
      if (hlock->get_state() == LOCK_SYNC)
	hlock = head->get_lock(lock->get_type());

      if (hlock->get_state() != LOCK_SYNC) {
	dout(10) << "rdlock_start trying head inode " << *head << dendl;
	if (!rdlock_start(hlock, mut, true)) // ** as_anon, no rdlock on EXCL **
	  return false;
	// oh, check our lock again then
      }
    }

    if (!_rdlock_kick(lock, as_anon))
      break;
  }

  // wait!
  int wait_on;
  if (lock->get_parent()->is_auth() && lock->is_stable())
    wait_on = SimpleLock::WAIT_RD;
  else
    wait_on = SimpleLock::WAIT_STABLE;  // REQRDLOCK is ignored if lock is unstable, so we need to retry.
  dout(7) << "rdlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl;
  lock->add_waiter(wait_on, new C_MDS_RetryRequest(mdcache, mut));
  nudge_log(lock);
  return false;
}
1415
1416 void Locker::nudge_log(SimpleLock *lock)
1417 {
1418 dout(10) << "nudge_log " << *lock << " on " << *lock->get_parent() << dendl;
1419 if (lock->get_parent()->is_auth() && lock->is_unstable_and_locked()) // as with xlockdone, or cap flush
1420 mds->mdlog->flush();
1421 }
1422
/**
 * Release one read reference on @p lock (removing it from @p mut's lock
 * sets if given).  If that was the last rdlock, re-evaluate the lock:
 * continue a pending gather if unstable, or try_eval on the auth.
 *
 * @param pneed_issue set to true by callees if caps need (re)issue
 */
void Locker::rdlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue)
{
  // drop ref
  lock->put_rdlock();
  if (mut) {
    mut->rdlocks.erase(lock);
    mut->locks.erase(lock);
  }

  dout(7) << "rdlock_finish on " << *lock << " on " << *lock->get_parent() << dendl;
  
  // last one?
  if (!lock->is_rdlocked()) {
    if (!lock->is_stable())
      eval_gather(lock, false, pneed_issue);
    else if (lock->get_parent()->is_auth())
      try_eval(lock, pneed_issue);
  }
}
1442
1443
1444 bool Locker::can_rdlock_set(set<SimpleLock*>& locks)
1445 {
1446 dout(10) << "can_rdlock_set " << locks << dendl;
1447 for (set<SimpleLock*>::iterator p = locks.begin(); p != locks.end(); ++p)
1448 if (!(*p)->can_rdlock(-1)) {
1449 dout(10) << "can_rdlock_set can't rdlock " << *p << " on " << *(*p)->get_parent() << dendl;
1450 return false;
1451 }
1452 return true;
1453 }
1454
1455 bool Locker::rdlock_try_set(set<SimpleLock*>& locks)
1456 {
1457 dout(10) << "rdlock_try_set " << locks << dendl;
1458 for (set<SimpleLock*>::iterator p = locks.begin(); p != locks.end(); ++p)
1459 if (!rdlock_try(*p, -1, NULL)) {
1460 dout(10) << "rdlock_try_set can't rdlock " << *p << " on " << *(*p)->get_parent() << dendl;
1461 return false;
1462 }
1463 return true;
1464 }
1465
1466 void Locker::rdlock_take_set(set<SimpleLock*>& locks, MutationRef& mut)
1467 {
1468 dout(10) << "rdlock_take_set " << locks << dendl;
1469 for (set<SimpleLock*>::iterator p = locks.begin(); p != locks.end(); ++p) {
1470 (*p)->get_rdlock();
1471 mut->rdlocks.insert(*p);
1472 mut->locks.insert(*p);
1473 }
1474 }
1475
1476 // ------------------
1477 // wrlock
1478
1479 void Locker::wrlock_force(SimpleLock *lock, MutationRef& mut)
1480 {
1481 if (lock->get_type() == CEPH_LOCK_IVERSION ||
1482 lock->get_type() == CEPH_LOCK_DVERSION)
1483 return local_wrlock_grab(static_cast<LocalLock*>(lock), mut);
1484
1485 dout(7) << "wrlock_force on " << *lock
1486 << " on " << *lock->get_parent() << dendl;
1487 lock->get_wrlock(true);
1488 mut->wrlocks.insert(lock);
1489 mut->locks.insert(lock);
1490 }
1491
/**
 * Acquire a write lock for request @p mut.  On the auth this may drive
 * the lock to MIX (scatter) or LOCK; on a replica it asks the auth to
 * scatter.  Version locks are diverted to the local-lock path.
 *
 * @param nowait if true, never block or open nested log events: fail
 *        immediately instead of queueing a retry
 * @return true if the wrlock was taken and recorded in @p mut
 */
bool Locker::wrlock_start(SimpleLock *lock, MDRequestRef& mut, bool nowait)
{
  if (lock->get_type() == CEPH_LOCK_IVERSION ||
      lock->get_type() == CEPH_LOCK_DVERSION)
    return local_wrlock_start(static_cast<LocalLock*>(lock), mut);

  dout(10) << "wrlock_start " << *lock << " on " << *lock->get_parent() << dendl;

  CInode *in = static_cast<CInode *>(lock->get_parent());
  client_t client = mut->get_client();
  // prefer the MIX (scattered) state when this inode has subtree bounds
  // or a peer has requested a scatter.
  bool want_scatter = !nowait && lock->get_parent()->is_auth() &&
		      (in->has_subtree_or_exporting_dirfrag() ||
		       static_cast<ScatterLock*>(lock)->get_scatter_wanted());

  while (1) {
    // wrlock?
    if (lock->can_wrlock(client) &&
	(!want_scatter || lock->get_state() == LOCK_MIX)) {
      lock->get_wrlock();
      mut->wrlocks.insert(lock);
      mut->locks.insert(lock);
      return true;
    }

    // recovering inode with a file lock: get recovery done sooner.
    if (lock->get_type() == CEPH_LOCK_IFILE &&
	in->state_test(CInode::STATE_RECOVERING)) {
      mds->mdcache->recovery_queue.prioritize(in);
    }

    if (!lock->is_stable())
      break;

    if (in->is_auth()) {
      // don't do nested lock state change if we have dirty scatterdata and
      // may scatter_writebehind or start_scatter, because nowait==true implies
      // that the caller already has a log entry open!
      if (nowait && lock->is_dirty())
	return false;

      if (want_scatter)
	scatter_mix(static_cast<ScatterLock*>(lock));
      else
	simple_lock(lock);

      if (nowait && !lock->can_wrlock(client))
	return false;

    } else {
      // replica.
      // auth should be auth_pinned (see acquire_locks wrlock weird mustpin case).
      mds_rank_t auth = lock->get_parent()->authority().first;
      if (!mds->is_cluster_degraded() ||
	  mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
	dout(10) << "requesting scatter from auth on "
		 << *lock << " on " << *lock->get_parent() << dendl;
	mds->send_message_mds(new MLock(lock, LOCK_AC_REQSCATTER, mds->get_nodeid()), auth);
      }
      break;
    }
  }

  if (!nowait) {
    dout(7) << "wrlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl;
    lock->add_waiter(SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
    nudge_log(lock);
  }

  return false;
}
1561
/**
 * Release one write reference on @p lock.  The lock is only removed
 * from the mutation's generic lock set if no remote wrlock on it
 * remains.  If that was the last wrlock, re-evaluate the lock.
 *
 * @param pneed_issue set to true by callees if caps need (re)issue
 */
void Locker::wrlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue)
{
  if (lock->get_type() == CEPH_LOCK_IVERSION ||
      lock->get_type() == CEPH_LOCK_DVERSION)
    return local_wrlock_finish(static_cast<LocalLock*>(lock), mut);

  dout(7) << "wrlock_finish on " << *lock << " on " << *lock->get_parent() << dendl;
  lock->put_wrlock();
  if (mut) {
    mut->wrlocks.erase(lock);
    // keep the lock in mut->locks while a remote wrlock is still held.
    if (mut->remote_wrlocks.count(lock) == 0)
      mut->locks.erase(lock);
  }

  if (!lock->is_wrlocked()) {
    if (!lock->is_stable())
      eval_gather(lock, false, pneed_issue);
    else if (lock->get_parent()->is_auth())
      try_eval(lock, pneed_issue);
  }
}
1583
1584
1585 // remote wrlock
1586
/**
 * Ask MDS @p target to take a wrlock on our behalf by sending an
 * OP_WRLOCK slave request.  If the target is not active, the request is
 * re-queued to retry once the peer comes up.  Completion arrives as a
 * slave reply (handled elsewhere); @p mut tracks the outstanding slave.
 */
void Locker::remote_wrlock_start(SimpleLock *lock, mds_rank_t target, MDRequestRef& mut)
{
  dout(7) << "remote_wrlock_start mds." << target << " on " << *lock << " on " << *lock->get_parent() << dendl;

  // wait for active target
  if (mds->is_cluster_degraded() &&
      !mds->mdsmap->is_clientreplay_or_active_or_stopping(target)) {
    dout(7) << " mds." << target << " is not active" << dendl;
    // only queue one retry waiter; skip if we already wait on a slave.
    if (mut->more()->waiting_on_slave.empty())
      mds->wait_for_active_peer(target, new C_MDS_RetryRequest(mdcache, mut));
    return;
  }

  // send lock request
  mut->start_locking(lock, target);
  mut->more()->slaves.insert(target);
  MMDSSlaveRequest *r = new MMDSSlaveRequest(mut->reqid, mut->attempt,
					     MMDSSlaveRequest::OP_WRLOCK);
  r->set_lock_type(lock->get_type());
  lock->get_parent()->set_object_info(r->get_object_info());
  mds->send_message_mds(r, target);

  assert(mut->more()->waiting_on_slave.count(target) == 0);
  mut->more()->waiting_on_slave.insert(target);
}
1612
/**
 * Drop a wrlock held on our behalf by MDS @p target: remove the
 * bookkeeping from @p mut (keeping mut->locks if a local wrlock on the
 * same lock remains) and, if the target is reachable, send OP_UNWRLOCK.
 */
void Locker::remote_wrlock_finish(SimpleLock *lock, mds_rank_t target,
                                  MutationImpl *mut)
{
  // drop ref
  mut->remote_wrlocks.erase(lock);
  // keep the lock in mut->locks while a local wrlock is still held.
  if (mut->wrlocks.count(lock) == 0)
    mut->locks.erase(lock);
  
  dout(7) << "remote_wrlock_finish releasing remote wrlock on mds." << target
	  << " " << *lock->get_parent()  << dendl;
  if (!mds->is_cluster_degraded() ||
      mds->mdsmap->get_state(target) >= MDSMap::STATE_REJOIN) {
    MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt,
						      MMDSSlaveRequest::OP_UNWRLOCK);
    slavereq->set_lock_type(lock->get_type());
    lock->get_parent()->set_object_info(slavereq->get_object_info());
    mds->send_message_mds(slavereq, target);
  }
}
1632
1633
1634 // ------------------
1635 // xlock
1636
/**
 * Acquire an exclusive lock for request @p mut.
 *
 * On the auth: loop driving the lock toward XLOCK (via simple_xlock /
 * simple_lock) until it can be taken or progress stalls, then queue a
 * retry.  On a replica: forward the request to the auth as an OP_XLOCK
 * slave request (remote xlock).  Version locks use the local-lock path.
 *
 * @return true if the xlock was taken and recorded in @p mut
 */
bool Locker::xlock_start(SimpleLock *lock, MDRequestRef& mut)
{
  if (lock->get_type() == CEPH_LOCK_IVERSION ||
      lock->get_type() == CEPH_LOCK_DVERSION)
    return local_xlock_start(static_cast<LocalLock*>(lock), mut);

  dout(7) << "xlock_start on " << *lock << " on " << *lock->get_parent() << dendl;
  client_t client = mut->get_client();

  // only cap-bearing locks need the inode for the caps-gather check below.
  CInode *in = nullptr;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  // auth?
  if (lock->get_parent()->is_auth()) {
    // auth
    while (1) {
      if (lock->can_xlock(client) &&
	  !(lock->get_state() == LOCK_LOCK_XLOCK &&	// client is not xlocker or
	    in && in->issued_caps_need_gather(lock))) { // xlocker does not hold shared cap
	lock->set_state(LOCK_XLOCK);
	lock->get_xlock(mut, client);
	mut->xlocks.insert(lock);
	mut->locks.insert(lock);
	mut->finish_locking(lock);
	return true;
      }

      // recovering inode with a file lock: get recovery done sooner.
      if (lock->get_type() == CEPH_LOCK_IFILE &&
	  in->state_test(CInode::STATE_RECOVERING)) {
	mds->mdcache->recovery_queue.prioritize(in);
      }

      // stop looping once the lock is unstable, unless this client holds
      // XLOCKDONE with no WAIT_STABLE waiter (then we can keep driving).
      if (!lock->is_stable() && (lock->get_state() != LOCK_XLOCKDONE ||
				 lock->get_xlock_by_client() != client ||
				 lock->is_waiter_for(SimpleLock::WAIT_STABLE)))
	break;

      if (lock->get_state() == LOCK_LOCK || lock->get_state() == LOCK_XLOCKDONE) {
	mut->start_locking(lock);
	simple_xlock(lock);
      } else {
	simple_lock(lock);
      }
    }

    lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
    nudge_log(lock);
    return false;
  } else {
    // replica
    assert(lock->get_sm()->can_remote_xlock);
    assert(!mut->slave_request);

    // wait for single auth
    if (lock->get_parent()->is_ambiguous_auth()) {
      lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH,
				     new C_MDS_RetryRequest(mdcache, mut));
      return false;
    }

    // wait for active auth
    mds_rank_t auth = lock->get_parent()->authority().first;
    if (mds->is_cluster_degraded() &&
	!mds->mdsmap->is_clientreplay_or_active_or_stopping(auth)) {
      dout(7) << " mds." << auth << " is not active" << dendl;
      if (mut->more()->waiting_on_slave.empty())
	mds->wait_for_active_peer(auth, new C_MDS_RetryRequest(mdcache, mut));
      return false;
    }

    // send lock request
    mut->more()->slaves.insert(auth);
    mut->start_locking(lock, auth);
    MMDSSlaveRequest *r = new MMDSSlaveRequest(mut->reqid, mut->attempt,
					       MMDSSlaveRequest::OP_XLOCK);
    r->set_lock_type(lock->get_type());
    lock->get_parent()->set_object_info(r->get_object_info());
    mds->send_message_mds(r, auth);

    assert(mut->more()->waiting_on_slave.count(auth) == 0);
    mut->more()->waiting_on_slave.insert(auth);

    return false;
  }
}
1723
/**
 * Finish an xlock release on the auth.  Fast path: if the lock is now
 * completely unreferenced (no rd/wr locks, no lease), not a dentry or
 * snap xlock, and the inode has a target loner matching the xlocker,
 * jump straight to LOCK_EXCL and wake waiters.  Otherwise fall back to
 * eval_gather to run the normal state transition.
 *
 * @param xlocker client that held the xlock (-1 if none/unknown)
 * @param pneed_issue set to true if caps need (re)issue
 */
void Locker::_finish_xlock(SimpleLock *lock, client_t xlocker, bool *pneed_issue)
{
  assert(!lock->is_stable());
  if (lock->get_num_rdlocks() == 0 &&
      lock->get_num_wrlocks() == 0 &&
      !lock->is_leased() &&
      lock->get_state() != LOCK_XLOCKSNAP &&
      lock->get_type() != CEPH_LOCK_DN) {
    CInode *in = static_cast<CInode*>(lock->get_parent());
    client_t loner = in->get_target_loner();
    if (loner >= 0 && (xlocker < 0 || xlocker == loner)) {
      lock->set_state(LOCK_EXCL);
      lock->get_parent()->auth_unpin(lock);
      lock->finish_waiters(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_WR|SimpleLock::WAIT_RD);
      if (lock->get_cap_shift())
	*pneed_issue = true;
      if (lock->get_parent()->is_auth() &&
	  lock->is_stable())
	try_eval(lock, pneed_issue);
      return;
    }
  }
  // the xlocker may have CEPH_CAP_GSHARED, need to revoke it if next state is LOCK_LOCK
  eval_gather(lock, lock->get_state() != LOCK_XLOCKSNAP, pneed_issue);
}
1749
/**
 * Release an exclusive lock held by request @p mut.  For a remote xlock
 * (non-auth parent) tell the auth via OP_UNXLOCK and wake local waiters;
 * on the auth, run _finish_xlock once no xlocks remain.  Issues caps (or
 * sets *pneed_issue) if the release allows it.
 */
void Locker::xlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue)
{
  if (lock->get_type() == CEPH_LOCK_IVERSION ||
      lock->get_type() == CEPH_LOCK_DVERSION)
    return local_xlock_finish(static_cast<LocalLock*>(lock), mut);

  dout(10) << "xlock_finish on " << *lock << " " << *lock->get_parent() << dendl;

  // remember who held it before we drop the reference below.
  client_t xlocker = lock->get_xlock_by_client();

  // drop ref
  lock->put_xlock();
  assert(mut);
  mut->xlocks.erase(lock);
  mut->locks.erase(lock);

  bool do_issue = false;

  // remote xlock?
  if (!lock->get_parent()->is_auth()) {
    assert(lock->get_sm()->can_remote_xlock);

    // tell auth
    dout(7) << "xlock_finish releasing remote xlock on " << *lock->get_parent()  << dendl;
    mds_rank_t auth = lock->get_parent()->authority().first;
    if (!mds->is_cluster_degraded() ||
	mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) {
      MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt,
							MMDSSlaveRequest::OP_UNXLOCK);
      slavereq->set_lock_type(lock->get_type());
      lock->get_parent()->set_object_info(slavereq->get_object_info());
      mds->send_message_mds(slavereq, auth);
    }
    // others waiting?
    lock->finish_waiters(SimpleLock::WAIT_STABLE |
			 SimpleLock::WAIT_WR |
			 SimpleLock::WAIT_RD, 0);
  } else {
    if (lock->get_num_xlocks() == 0 &&
	lock->get_state() != LOCK_LOCK_XLOCK) { // no one is taking xlock
      _finish_xlock(lock, xlocker, &do_issue);
    }
  }

  if (do_issue) {
    CInode *in = static_cast<CInode*>(lock->get_parent());
    if (in->is_head()) {
      if (pneed_issue)
	*pneed_issue = true;
      else
	issue_caps(in);
    }
  }
}
1804
/**
 * Hand an xlock over during inode export: drop our reference and
 * mutation bookkeeping, release the auth_pin if the lock was unstable,
 * and leave the lock in LOCK_LOCK for the importer.
 */
void Locker::xlock_export(SimpleLock *lock, MutationImpl *mut)
{
  dout(10) << "xlock_export on " << *lock << " " << *lock->get_parent() << dendl;

  lock->put_xlock();
  mut->xlocks.erase(lock);
  mut->locks.erase(lock);

  MDSCacheObject *p = lock->get_parent();
  assert(p->state_test(CInode::STATE_AMBIGUOUSAUTH));  // we are exporting this (inode)

  if (!lock->is_stable())
    lock->get_parent()->auth_unpin(lock);

  lock->set_state(LOCK_LOCK);
}
1821
/**
 * Counterpart of xlock_export on the importing MDS: take the auth_pin
 * for the imported (unstable) xlock.
 */
void Locker::xlock_import(SimpleLock *lock)
{
  dout(10) << "xlock_import on " << *lock << " " << *lock->get_parent() << dendl;
  lock->get_parent()->auth_pin(lock);
}
1827
1828
1829
1830 // file i/o -----------------------------------------
1831
/**
 * Return the inode's current file_data_version (read-only; no state is
 * changed despite the "issue" name).
 */
version_t Locker::issue_file_data_version(CInode *in)
{
  dout(7) << "issue_file_data_version on " << *in << dendl;
  return in->inode.file_data_version;
}
1837
/**
 * Log-completion context for a file metadata update: once the journal
 * entry commits, calls Locker::file_update_finish() with the recorded
 * arguments.  Pins the inode so the pointer stays valid until finish().
 */
class C_Locker_FileUpdate_finish : public LockerLogContext {
  CInode *in;        // pinned inode being updated
  MutationRef mut;   // mutation to apply/clean up on completion
  bool share_max;    // share new max_size with clients afterwards
  bool need_issue;   // re-issue client caps afterwards
  client_t client;   // client to ack (if ack != 0)
  MClientCaps *ack;  // cap-flush ack to send, or 0
public:
  C_Locker_FileUpdate_finish(Locker *l, CInode *i, MutationRef& m,
				bool sm=false, bool ni=false, client_t c=-1,
				MClientCaps *ac = 0)
    : LockerLogContext(l), in(i), mut(m), share_max(sm), need_issue(ni),
      client(c), ack(ac) {
    in->get(CInode::PIN_PTRWAITER);
  }
  void finish(int r) override {
    locker->file_update_finish(in, mut, share_max, need_issue, client, ack);
    in->put(CInode::PIN_PTRWAITER);  // drop the pin taken in the ctor
  }
};
1858
/**
 * Complete a journaled file metadata update: apply the projected inode
 * and mutation, ack the client's cap flush (if any), drop the held
 * locks, finish snap-writeback accounting on non-head inodes, and
 * (re)issue caps / share max_size as requested.
 *
 * @param issue_client_cap re-issue this client's cap if it wants more
 * @param ack cap-flush ack message to send to @p client, or 0
 */
void Locker::file_update_finish(CInode *in, MutationRef& mut, bool share_max, bool issue_client_cap,
				client_t client, MClientCaps *ack)
{
  dout(10) << "file_update_finish on " << *in << dendl;
  in->pop_and_dirty_projected_inode(mut->ls);

  mut->apply();

  if (ack) {
    Session *session = mds->get_session(client);
    if (session) {
      // "oldest flush tid" > 0 means client uses unique TID for each flush
      if (ack->get_oldest_flush_tid() > 0)
	session->add_completed_flush(ack->get_client_tid());
      mds->send_message_client_counted(ack, session);
    } else {
      // session is gone; free the unsent ack.
      dout(10) << " no session for client." << client << " " << *ack << dendl;
      ack->put();
    }
  }

  set<CInode*> need_issue;
  drop_locks(mut.get(), &need_issue);

  if (!in->is_head() && !in->client_snap_caps.empty()) {
    dout(10) << " client_snap_caps " << in->client_snap_caps << dendl;
    // check for snap writeback completion
    bool gather = false;
    auto p = in->client_snap_caps.begin();
    while (p != in->client_snap_caps.end()) {
      SimpleLock *lock = in->get_lock(p->first);
      assert(lock);
      dout(10) << " completing client_snap_caps for " << ccap_string(p->first)
	       << " lock " << *lock << " on " << *in << dendl;
      lock->put_wrlock();

      p->second.erase(client);
      if (p->second.empty()) {
	gather = true;
	in->client_snap_caps.erase(p++);
      } else
	++p;
    }
    if (gather) {
      if (in->client_snap_caps.empty())
	in->item_open_file.remove_myself();
      eval_cap_gather(in, &need_issue);
    }
  } else {
    // only issue directly if drop_locks didn't already flag this inode.
    if (issue_client_cap && need_issue.count(in) == 0) {
      Capability *cap = in->get_client_cap(client);
      if (cap && (cap->wanted() & ~cap->pending()))
	issue_caps(in, cap);
    }

    if (share_max && in->is_auth() &&
	(in->filelock.gcaps_allowed(CAP_LONER) & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)))
      share_inode_max_size(in);
  }
  issue_caps_set(need_issue);

  utime_t now = ceph_clock_now();
  mds->balancer->hit_inode(now, in, META_POP_IWR);

  // auth unpin after issuing caps
  mut->cleanup();
}
1926
/**
 * Register (or augment) a client capability on @p in for an open with
 * the given @p mode.  During replay this only tries to reconnect an
 * existing cap.  On the auth, evaluates the cap-related locks (which may
 * issue caps); on a replica, reports new wanted caps to the auth.
 *
 * @return the client's Capability (possibly newly created)
 */
Capability* Locker::issue_new_caps(CInode *in,
				   int mode,
				   Session *session,
				   SnapRealm *realm,
				   bool is_replay)
{
  dout(7) << "issue_new_caps for mode " << mode << " on " << *in << dendl;
  bool is_new;

  // if replay, try to reconnect cap, and otherwise do nothing.
  if (is_replay)
    return mds->mdcache->try_reconnect_cap(in, session);


  // my needs
  assert(session->info.inst.name.is_client());
  client_t my_client = session->info.inst.name.num();
  int my_want = ceph_caps_for_mode(mode);

  // register a capability
  Capability *cap = in->get_client_cap(my_client);
  if (!cap) {
    // new cap
    cap = in->add_client_cap(my_client, session, realm);
    cap->set_wanted(my_want);
    cap->mark_new();
    cap->inc_suppress(); // suppress file cap messages for new cap (we'll bundle with the open() reply)
    is_new = true;
  } else {
    is_new = false;
    // make sure it wants sufficient caps
    if (my_want & ~cap->wanted()) {
      // augment wanted caps for this client
      cap->set_wanted(cap->wanted() | my_want);
    }
  }

  if (in->is_auth()) {
    // [auth] twiddle mode?
    eval(in, CEPH_CAP_LOCKS);

    if (_need_flush_mdlog(in, my_want))
      mds->mdlog->flush();

  } else {
    // [replica] tell auth about any new caps wanted
    request_inode_file_caps(in);
  }

  // issue caps (pot. incl new one)
  //issue_caps(in);  // note: _eval above may have done this already...

  // re-issue whatever we can
  //cap->issue(cap->pending());

  if (is_new)
    cap->dec_suppress();

  return cap;
}
1987
1988
1989 void Locker::issue_caps_set(set<CInode*>& inset)
1990 {
1991 for (set<CInode*>::iterator p = inset.begin(); p != inset.end(); ++p)
1992 issue_caps(*p);
1993 }
1994
/**
 * Grant or revoke client caps on @in according to the current lock states.
 *
 * The allowed cap sets are derived from the lock modes (per cap type:
 * everyone / loner / xlocker).  For each non-stale client cap (or just
 * @only_cap if given) we grant wanted+allowed caps that aren't yet pending,
 * or revoke pending caps that are no longer allowed, sending MClientCaps
 * as needed.
 *
 * @param in       inode to (re)issue caps on; must be a head inode
 * @param only_cap if non-null, restrict processing to this single cap
 * @return true if nothing was (re)issued, i.e. no client callbacks pending
 */
bool Locker::issue_caps(CInode *in, Capability *only_cap)
{
  // allowed caps are determined by the lock mode.
  int all_allowed = in->get_caps_allowed_by_type(CAP_ANY);
  int loner_allowed = in->get_caps_allowed_by_type(CAP_LONER);
  int xlocker_allowed = in->get_caps_allowed_by_type(CAP_XLOCKER);

  client_t loner = in->get_loner();
  if (loner >= 0) {
    dout(7) << "issue_caps loner client." << loner
	    << " allowed=" << ccap_string(loner_allowed) 
	    << ", xlocker allowed=" << ccap_string(xlocker_allowed)
	    << ", others allowed=" << ccap_string(all_allowed)
	    << " on " << *in << dendl;
  } else {
    dout(7) << "issue_caps allowed=" << ccap_string(all_allowed) 
	    << ", xlocker allowed=" << ccap_string(xlocker_allowed)
	    << " on " << *in << dendl;
  }

  assert(in->is_head());

  // count how many caps we (re)issue below
  int nissued = 0;        

  // client caps
  map<client_t, Capability*>::iterator it;
  if (only_cap)
    it = in->client_caps.find(only_cap->get_client());
  else
    it = in->client_caps.begin();
  for (; it != in->client_caps.end(); ++it) {
    Capability *cap = it->second;
    if (cap->is_stale())
      continue;

    // do not issue _new_ bits when size|mtime is projected
    int allowed;
    if (loner == it->first)
      allowed = loner_allowed;
    else
      allowed = all_allowed;

    // add in any xlocker-only caps (for locks this client is the xlocker for)
    allowed |= xlocker_allowed & in->get_xlocker_mask(it->first);

    // clients that can't decode inline data must not get FILE_RD/WR while
    // inline data is present.
    Session *session = mds->get_session(it->first);
    if (in->inode.inline_data.version != CEPH_INLINE_NONE &&
	!(session && session->connection &&
	  session->connection->has_feature(CEPH_FEATURE_MDS_INLINE_DATA)))
      allowed &= ~(CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR);

    int pending = cap->pending();
    int wanted = cap->wanted();

    dout(20) << " client." << it->first
	     << " pending " << ccap_string(pending) 
	     << " allowed " << ccap_string(allowed) 
	     << " wanted " << ccap_string(wanted)
	     << dendl;

    if (!(pending & ~allowed)) {
      // skip if suppress or new, and not revocation
      if (cap->is_new() || cap->is_suppress()) {
	dout(20) << " !revoke and new|suppressed, skipping client." << it->first << dendl;
	continue;
      }
    }

    // notify clients about deleted inode, to make sure they release caps ASAP.
    if (in->inode.nlink == 0)
      wanted |= CEPH_CAP_LINK_SHARED;

    // are there caps that the client _wants_ and can have, but aren't pending?
    // or do we need to revoke?
    if (((wanted & allowed) & ~pending) ||  // missing wanted+allowed caps
	(pending & ~allowed)) {             // need to revoke ~allowed caps.
      // issue
      nissued++;

      // include caps that clients generally like, while we're at it.
      int likes = in->get_caps_liked();      
      int before = pending;
      long seq;
      if (pending & ~allowed)
	seq = cap->issue((wanted|likes) & allowed & pending);  // if revoking, don't issue anything new.
      else
	seq = cap->issue((wanted|likes) & allowed);
      int after = cap->pending();

      if (cap->is_new()) {
	// haven't sent caps to client yet; just record the confirmation
	// locally rather than messaging.
	if (before & ~after)
	  cap->confirm_receipt(seq, after);
      } else {
        dout(7) << "   sending MClientCaps to client." << it->first
		<< " seq " << cap->get_last_seq()
		<< " new pending " << ccap_string(after) << " was " << ccap_string(before) 
		<< dendl;

	int op = (before & ~after) ? CEPH_CAP_OP_REVOKE : CEPH_CAP_OP_GRANT;
	if (op == CEPH_CAP_OP_REVOKE) {
	  // track in-flight revocations so we can warn about unresponsive
	  // clients later.
	  revoking_caps.push_back(&cap->item_revoking_caps);
	  revoking_caps_by_client[cap->get_client()].push_back(&cap->item_client_revoking_caps);
	  cap->set_last_revoke_stamp(ceph_clock_now());
	  cap->reset_num_revoke_warnings();
	}

	MClientCaps *m = new MClientCaps(op, in->ino(),
					 in->find_snaprealm()->inode->ino(),
					 cap->get_cap_id(), cap->get_last_seq(),
					 after, wanted, 0,
					 cap->get_mseq(),
                                         mds->get_osd_epoch_barrier());
	in->encode_cap_message(m, cap);

	mds->send_message_client_counted(m, it->first);
      }
    }

    if (only_cap)
      break;
  }

  return (nissued == 0);  // true if no re-issued, no callbacks
}
2121
/**
 * Notify every cap-holding client that @in was truncated, by sending a
 * CEPH_CAP_OP_TRUNC MClientCaps to each of them; then reconsider max_size
 * on the auth MDS.
 */
void Locker::issue_truncate(CInode *in)
{
  dout(7) << "issue_truncate on " << *in << dendl;
  
  for (map<client_t, Capability*>::iterator it = in->client_caps.begin();
       it != in->client_caps.end();
       ++it) {
    Capability *cap = it->second;
    MClientCaps *m = new MClientCaps(CEPH_CAP_OP_TRUNC,
				     in->ino(),
				     in->find_snaprealm()->inode->ino(),
				     cap->get_cap_id(), cap->get_last_seq(),
				     cap->pending(), cap->wanted(), 0,
				     cap->get_mseq(),
				     mds->get_osd_epoch_barrier());
    in->encode_cap_message(m, cap);			     
    mds->send_message_client_counted(m, it->first);
  }

  // should we increase max_size?  (only meaningful for auth regular files)
  if (in->is_auth() && in->is_file())
    check_inode_max_size(in);
}
2145
/**
 * Revoke issued caps from a session that has gone stale (missed renewals).
 *
 * Only "notable" caps (being revoked, holding a writeable range, or wanting
 * exclusive caps / file rd-wr) need explicit revocation and lock
 * re-evaluation; the remaining caps are invalidated wholesale by bumping
 * the session cap generation.
 */
void Locker::revoke_stale_caps(Session *session)
{
  dout(10) << "revoke_stale_caps for " << session->info.inst.name << dendl;

  std::vector<CInode*> to_eval;

  for (auto p = session->caps.begin(); !p.end(); ) {
    Capability *cap = *p;
    ++p;
    // notable caps sort to the front of session->caps; once we hit a
    // non-notable cap we can stop.
    if (!cap->is_notable()) {
      // the rest ones are not being revoked and don't have writeable range
      // and don't want exclusive caps or want file read/write. They don't
      // need recover, they don't affect eval_gather()/try_eval()
      break;
    }

    int issued = cap->issued();
    if (!(issued & ~CEPH_CAP_PIN))
      continue;

    CInode *in = cap->get_inode();
    dout(10) << " revoking " << ccap_string(issued) << " on " << *in << dendl;
    cap->revoke();

    // if the client held a writeable range, file data may need recovery.
    if (in->is_auth() &&
	in->inode.client_ranges.count(cap->get_client()))
      in->state_set(CInode::STATE_NEEDSRECOVER);

    // eval lock/inode may finish contexts, which may modify other cap's position
    // in the session->caps. so defer evaluation until after the iteration.
    to_eval.push_back(in);
  }

  // invalidate the rest (non-notable caps) via the session cap generation.
  session->inc_cap_gen();

  for (auto in : to_eval) {
    if (in->state_test(CInode::STATE_EXPORTINGCAPS))
      continue;

    if (!in->filelock.is_stable())
      eval_gather(&in->filelock);
    if (!in->linklock.is_stable())
      eval_gather(&in->linklock);
    if (!in->authlock.is_stable())
      eval_gather(&in->authlock);
    if (!in->xattrlock.is_stable())
      eval_gather(&in->xattrlock);

    if (in->is_auth())
      try_eval(in, CEPH_CAP_LOCKS);
    else
      request_inode_file_caps(in);
  }
}
2201
/**
 * A previously-stale session has renewed; re-validate and re-issue its
 * notable caps.  Inodes mid cap-export are flagged for later evaluation
 * instead (the export may remove the cap, or fail and require re-issue).
 */
void Locker::resume_stale_caps(Session *session)
{
  dout(10) << "resume_stale_caps for " << session->info.inst.name << dendl;

  for (xlist<Capability*>::iterator p = session->caps.begin(); !p.end(); ) {
    Capability *cap = *p;
    ++p;
    if (!cap->is_notable())
      break; // see revoke_stale_caps()

    CInode *in = cap->get_inode();
    ceph_assert(in->is_head());
    dout(10) << " clearing stale flag on " << *in << dendl;

    if (in->state_test(CInode::STATE_EXPORTINGCAPS)) {
      // if export succeeds, the cap will be removed. if export fails,
      // we need to re-issue the cap if it's not stale.
      in->state_set(CInode::STATE_EVALSTALECAPS);
      continue;
    }

    // eval() may already issue caps; if it doesn't (or we're a replica),
    // re-issue this cap directly.
    if (!in->is_auth() || !eval(in, CEPH_CAP_LOCKS))
      issue_caps(in, cap);
  }
}
2227
2228 void Locker::remove_stale_leases(Session *session)
2229 {
2230 dout(10) << "remove_stale_leases for " << session->info.inst.name << dendl;
2231 xlist<ClientLease*>::iterator p = session->leases.begin();
2232 while (!p.end()) {
2233 ClientLease *l = *p;
2234 ++p;
2235 CDentry *parent = static_cast<CDentry*>(l->parent);
2236 dout(15) << " removing lease on " << *parent << dendl;
2237 parent->remove_client_lease(l, this);
2238 }
2239 }
2240
2241
/**
 * Deferred retry of request_inode_file_caps(), used while the inode has
 * ambiguous authority or the auth MDS is still rejoining.  Pins the inode
 * for the lifetime of the waiter so the pointer stays valid.
 */
class C_MDL_RequestInodeFileCaps : public LockerContext {
  CInode *in;
public:
  C_MDL_RequestInodeFileCaps(Locker *l, CInode *i) : LockerContext(l), in(i) {
    in->get(CInode::PIN_PTRWAITER);
  }
  void finish(int r) override {
    // only retry if we are still a replica; if we became auth there is
    // nobody to ask.
    if (!in->is_auth())
      locker->request_inode_file_caps(in);
    in->put(CInode::PIN_PTRWAITER);
  }
};
2254
/**
 * [replica] Tell the auth MDS which caps our local clients want on @in
 * (via MInodeFileCaps), if that differs from what we last reported.
 * Defers (re-queues itself) while authority is ambiguous or the auth MDS
 * is still in rejoin.
 */
void Locker::request_inode_file_caps(CInode *in)
{
  assert(!in->is_auth());

  int wanted = in->get_caps_wanted() & in->get_caps_allowed_ever() & ~CEPH_CAP_PIN;
  if (wanted != in->replica_caps_wanted) {
    // wait for single auth
    if (in->is_ambiguous_auth()) {
      in->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, 
                     new C_MDL_RequestInodeFileCaps(this, in));
      return;
    }

    mds_rank_t auth = in->authority().first;
    if (mds->is_cluster_degraded() &&
	mds->mdsmap->get_state(auth) == MDSMap::STATE_REJOIN) {
      mds->wait_for_active_peer(auth, new C_MDL_RequestInodeFileCaps(this, in));
      return;
    }

    dout(7) << "request_inode_file_caps " << ccap_string(wanted)
            << " was " << ccap_string(in->replica_caps_wanted) 
            << " on " << *in << " to mds." << auth << dendl;

    // record what we are reporting, even if we can't send right now; the
    // auth will learn it during rejoin in that case.
    in->replica_caps_wanted = wanted;

    if (!mds->is_cluster_degraded() ||
	mds->mdsmap->is_clientreplay_or_active_or_stopping(auth))
      mds->send_message_mds(new MInodeFileCaps(in->ino(), in->replica_caps_wanted),
			    auth);
  }
}
2287
2288 /* This function DOES put the passed message before returning */
/**
 * [auth] Handle an MInodeFileCaps from a replica MDS reporting what caps
 * its clients want on one of our inodes; record it in mds_caps_wanted and
 * re-evaluate the locks.
 *
 * This function DOES put the passed message before returning.
 */
void Locker::handle_inode_file_caps(MInodeFileCaps *m)
{
  // nobody should be talking to us during recovery.
  if (mds->get_state() < MDSMap::STATE_CLIENTREPLAY) {
    if (mds->get_want_state() >= MDSMap::STATE_CLIENTREPLAY) {
      mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
      return;
    }
    assert(!"got unexpected message during recovery");
  }

  // ok
  CInode *in = mdcache->get_inode(m->get_ino());
  mds_rank_t from = mds_rank_t(m->get_source().num());

  assert(in);
  assert(in->is_auth());

  dout(7) << "handle_inode_file_caps replica mds." << from << " wants caps " << ccap_string(m->get_caps()) << " on " << *in << dendl;

  // zero caps means the replica no longer wants anything; drop the entry.
  if (m->get_caps())
    in->mds_caps_wanted[from] = m->get_caps();
  else
    in->mds_caps_wanted.erase(from);

  try_eval(in, CEPH_CAP_LOCKS);
  m->put();
}
2317
2318
/**
 * Deferred retry of check_inode_max_size(), queued when the inode is
 * frozen or its filelock can't be wrlocked yet.  Captures the requested
 * max_size/size/mtime and pins the inode until the waiter fires.
 */
class C_MDL_CheckMaxSize : public LockerContext {
  CInode *in;
  uint64_t new_max_size;
  uint64_t newsize;
  utime_t mtime;

public:
  C_MDL_CheckMaxSize(Locker *l, CInode *i, uint64_t _new_max_size,
                     uint64_t _newsize, utime_t _mtime) :
    LockerContext(l), in(i),
    new_max_size(_new_max_size), newsize(_newsize), mtime(_mtime)
  {
    in->get(CInode::PIN_PTRWAITER);
  }
  void finish(int r) override {
    // only the auth MDS manages client writeable ranges.
    if (in->is_auth())
      locker->check_inode_max_size(in, false, new_max_size, newsize, mtime);
    in->put(CInode::PIN_PTRWAITER);
  }
};
2339
/**
 * Compute a new client-writeable-range max_size for a file of @size bytes:
 * roughly double the size, clamp the growth to
 * mds_client_writeable_range_max_inc_objs layout objects (if configured),
 * and round up to the layout's size increment.
 */
uint64_t Locker::calc_new_max_size(CInode::mempool_inode *pi, uint64_t size)
{
  uint64_t new_max = (size + 1) << 1;
  uint64_t max_inc = g_conf->mds_client_writeable_range_max_inc_objs;
  if (max_inc > 0) {
    max_inc *= pi->layout.object_size;
    new_max = MIN(new_max, size + max_inc);
  }
  return ROUND_UP_TO(new_max, pi->get_layout_size_increment());
}
2350
/**
 * Compute the new per-client writeable ranges for @in.
 *
 * Clients holding or wanting any file-write cap get a range of
 * [0, max(new max_size, old range end)]; all other clients are omitted
 * (their range shrinks to nothing).
 *
 * @param in            inode whose ranges to compute
 * @param size          size to base the new max_size on
 * @param update        if true, also toggle each cap's clientwriteable flag
 * @param new_ranges    out: the computed range map
 * @param max_increased out: set to true if any client's max grew
 */
void Locker::calc_new_client_ranges(CInode *in, uint64_t size, bool update,
				    CInode::mempool_inode::client_range_map *new_ranges,
				    bool *max_increased)
{
  auto latest = in->get_projected_inode();
  uint64_t ms;
  if (latest->has_layout()) {
    ms = calc_new_max_size(latest, size);
  } else {
    // Layout-less directories like ~mds0/, have zero size
    ms = 0;
  }

  // increase ranges as appropriate.
  // shrink to 0 if no WR|BUFFER caps issued.
  for (map<client_t,Capability*>::iterator p = in->client_caps.begin();
       p != in->client_caps.end();
       ++p) {
    if ((p->second->issued() | p->second->wanted()) & (CEPH_CAP_ANY_FILE_WR)) {
      client_writeable_range_t& nr = (*new_ranges)[p->first];
      nr.range.first = 0;
      if (latest->client_ranges.count(p->first)) {
	client_writeable_range_t& oldr = latest->client_ranges[p->first];
	if (ms > oldr.range.last)
	  *max_increased = true;
	// never shrink an existing range here; shrinking happens by omission.
	nr.range.last = MAX(ms, oldr.range.last);
	nr.follows = oldr.follows;
      } else {
	*max_increased = true;
	nr.range.last = ms;
	nr.follows = in->first - 1;
      }
      if (update)
	p->second->mark_clientwriteable();
    } else {
      if (update)
	p->second->clear_clientwriteable();
    }
  }
}
2391
/**
 * [auth] Update @in's size/mtime and/or its per-client writeable ranges
 * (max_size), journaling the change with an EOpen or EUpdate event.
 *
 * @param force_wrlock  proceed even if the filelock can't be wrlocked
 * @param new_max_size  requested max_size floor (0 = just recompute)
 * @param new_size      new file size if > 0
 * @param new_mtime     new mtime to apply along with the size
 * @return true if an update was journaled, false if it was a no-op or had
 *         to be deferred (frozen inode / unlockable filelock)
 */
bool Locker::check_inode_max_size(CInode *in, bool force_wrlock,
				  uint64_t new_max_size, uint64_t new_size,
				  utime_t new_mtime)
{
  assert(in->is_auth());
  assert(in->is_file());

  CInode::mempool_inode *latest = in->get_projected_inode();
  CInode::mempool_inode::client_range_map new_ranges;
  uint64_t size = latest->size;
  bool update_size = new_size > 0;
  bool update_max = false;
  bool max_increased = false;

  if (update_size) {
    // size and mtime only ever move forward here.
    new_size = size = MAX(size, new_size);
    new_mtime = MAX(new_mtime, latest->mtime);
    if (latest->size == new_size && latest->mtime == new_mtime)
      update_size = false;
  }

  // can_update: 1 = ok, -1 = frozen, -2 = filelock not wrlockable
  int can_update = 1;
  if (in->is_frozen()) {
    can_update = -1;
  } else if (!force_wrlock && !in->filelock.can_wrlock(in->get_loner())) {
    // lock?  try to move the filelock toward a wrlockable state.
    if (in->filelock.is_stable()) {
      if (in->get_target_loner() >= 0)
	file_excl(&in->filelock);
      else
	simple_lock(&in->filelock);
    }
    if (!in->filelock.can_wrlock(in->get_loner()))
      can_update = -2;
  }

  calc_new_client_ranges(in, std::max(new_max_size, size), can_update > 0,
			 &new_ranges, &max_increased);

  if (max_increased || latest->client_ranges != new_ranges)
    update_max = true;

  if (!update_size && !update_max) {
    dout(20) << "check_inode_max_size no-op on " << *in << dendl;
    return false;
  }

  dout(10) << "check_inode_max_size new_ranges " << new_ranges
	   << " update_size " << update_size
	   << " on " << *in << dendl;

  if (can_update < 0) {
    // can't update now; queue a retry for when the blocker clears.
    auto cms = new C_MDL_CheckMaxSize(this, in, new_max_size, new_size, new_mtime);
    if (can_update == -1) {
      dout(10) << "check_inode_max_size frozen, waiting on " << *in << dendl;
      in->add_waiter(CInode::WAIT_UNFREEZE, cms);
    } else {
      in->filelock.add_waiter(SimpleLock::WAIT_STABLE, cms);
      dout(10) << "check_inode_max_size can't wrlock, waiting on " << *in << dendl;
    }
    return false;
  }

  MutationRef mut(new MutationImpl());
  mut->ls = mds->mdlog->get_current_segment();
    
  auto &pi = in->project_inode();
  pi.inode.version = in->pre_dirty();

  if (update_max) {
    dout(10) << "check_inode_max_size client_ranges " << pi.inode.client_ranges << " -> " << new_ranges << dendl;
    pi.inode.client_ranges = new_ranges;
  }

  if (update_size) {
    dout(10) << "check_inode_max_size size " << pi.inode.size << " -> " << new_size << dendl;
    pi.inode.size = new_size;
    pi.inode.rstat.rbytes = new_size;
    dout(10) << "check_inode_max_size mtime " << pi.inode.mtime << " -> " << new_mtime << dendl;
    pi.inode.mtime = new_mtime;
    if (new_mtime > pi.inode.ctime) {
      pi.inode.ctime = new_mtime;
      if (new_mtime > pi.inode.rstat.rctime)
	pi.inode.rstat.rctime = new_mtime;
    }
  }

  // use EOpen if the file is still open; otherwise, use EUpdate.
  // this is just an optimization to push open files forward into
  // newer log segments.
  LogEvent *le;
  EMetaBlob *metablob;
  if (in->is_any_caps_wanted() && in->last == CEPH_NOSNAP) {
    EOpen *eo = new EOpen(mds->mdlog);
    eo->add_ino(in->ino());
    metablob = &eo->metablob;
    le = eo;
    mut->ls->open_files.push_back(&in->item_open_file);
  } else {
    EUpdate *eu = new EUpdate(mds->mdlog, "check_inode_max_size");
    metablob = &eu->metablob;
    le = eu;
  }
  mds->mdlog->start_entry(le);
  if (update_size) {  // FIXME if/when we do max_size nested accounting
    mdcache->predirty_journal_parents(mut, metablob, in, 0, PREDIRTY_PRIMARY);
    // no cow, here!
    CDentry *parent = in->get_projected_parent_dn();
    metablob->add_primary_dentry(parent, in, true);
  } else {
    metablob->add_dir_context(in->get_projected_parent_dn()->get_dir());
    mdcache->journal_dirty_inode(mut.get(), metablob, in);
  }
  mds->mdlog->submit_entry(le,
          new C_Locker_FileUpdate_finish(this, in, mut, true));
  wrlock_force(&in->filelock, mut);  // wrlock for duration of journal
  mut->auth_pin(in);

  // make max_size _increase_ timely
  if (max_increased)
    mds->mdlog->flush();

  return true;
}
2516
2517
/**
 * Push the current max_size to clients holding file write/buffer caps on
 * @in (or just @only_cap's client if given), via a CEPH_CAP_OP_GRANT.
 */
void Locker::share_inode_max_size(CInode *in, Capability *only_cap)
{
  /*
   * only share if currently issued a WR cap.  if client doesn't have it,
   * file_max doesn't matter, and the client will get it if/when they get
   * the cap later.
   */
  dout(10) << "share_inode_max_size on " << *in << dendl;
  map<client_t, Capability*>::iterator it;
  if (only_cap)
    it = in->client_caps.find(only_cap->get_client());
  else
    it = in->client_caps.begin();
  for (; it != in->client_caps.end(); ++it) {
    const client_t client = it->first;
    Capability *cap = it->second;
    if (cap->is_suppress())
      continue;
    if (cap->pending() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER)) {
      dout(10) << "share_inode_max_size with client." << client << dendl;
      // bump the seq so the client treats this grant as new information.
      cap->inc_last_seq();
      MClientCaps *m = new MClientCaps(CEPH_CAP_OP_GRANT,
				       in->ino(),
				       in->find_snaprealm()->inode->ino(),
				       cap->get_cap_id(), cap->get_last_seq(),
				       cap->pending(), cap->wanted(), 0,
				       cap->get_mseq(),
				       mds->get_osd_epoch_barrier());
      in->encode_cap_message(m, cap);
      mds->send_message_client_counted(m, client);
    }
    if (only_cap)
      break;
  }
}
2553
2554 bool Locker::_need_flush_mdlog(CInode *in, int wanted)
2555 {
2556 /* flush log if caps are wanted by client but corresponding lock is unstable and locked by
2557 * pending mutations. */
2558 if (((wanted & (CEPH_CAP_FILE_RD|CEPH_CAP_FILE_WR|CEPH_CAP_FILE_SHARED|CEPH_CAP_FILE_EXCL)) &&
2559 in->filelock.is_unstable_and_locked()) ||
2560 ((wanted & (CEPH_CAP_AUTH_SHARED|CEPH_CAP_AUTH_EXCL)) &&
2561 in->authlock.is_unstable_and_locked()) ||
2562 ((wanted & (CEPH_CAP_LINK_SHARED|CEPH_CAP_LINK_EXCL)) &&
2563 in->linklock.is_unstable_and_locked()) ||
2564 ((wanted & (CEPH_CAP_XATTR_SHARED|CEPH_CAP_XATTR_EXCL)) &&
2565 in->xattrlock.is_unstable_and_locked()))
2566 return true;
2567 return false;
2568 }
2569
/**
 * Apply a client's reported "wanted" caps to @cap, guarding against stale
 * reports via issue_seq, then update bookkeeping that depends on wanted
 * caps: the replica wanted-caps report, the open-file list, and the
 * recovery queue priority.
 *
 * @param cap       the capability to adjust
 * @param wanted    caps the client says it wants
 * @param issue_seq the issue seq the client's report is based on; if it
 *                  doesn't match the cap's last issue, we only ever ADD
 *                  wanted bits, never drop them
 */
void Locker::adjust_cap_wanted(Capability *cap, int wanted, int issue_seq)
{
  if (ceph_seq_cmp(issue_seq, cap->get_last_issue()) == 0) {
    dout(10) << " wanted " << ccap_string(cap->wanted())
	     << " -> " << ccap_string(wanted) << dendl;
    cap->set_wanted(wanted);
  } else if (wanted & ~cap->wanted()) {
    dout(10) << " wanted " << ccap_string(cap->wanted())
	     << " -> " << ccap_string(wanted)
	     << " (added caps even though we had seq mismatch!)" << dendl;
    cap->set_wanted(wanted | cap->wanted());
  } else {
    dout(10) << " NOT changing wanted " << ccap_string(cap->wanted())
	     << " -> " << ccap_string(wanted)
	     << " (issue_seq " << issue_seq << " != last_issue "
	     << cap->get_last_issue() << ")" << dendl;
    return;
  }

  CInode *cur = cap->get_inode();
  if (!cur->is_auth()) {
    // replica: forward the new wanted set to the auth MDS.
    request_inode_file_caps(cur);
    return;
  }

  if (cap->wanted() == 0) {
    if (cur->item_open_file.is_on_list() &&
	!cur->is_any_caps_wanted()) {
      dout(10) << " removing unwanted file from open file list " << *cur << dendl;
      cur->item_open_file.remove_myself();
    }
  } else {
    // a client actively wanting RD/WR should jump the recovery queue.
    if (cur->state_test(CInode::STATE_RECOVERING) &&
	(cap->wanted() & (CEPH_CAP_FILE_RD |
			  CEPH_CAP_FILE_WR))) {
      mds->mdcache->recovery_queue.prioritize(cur);
    }

    if (!cur->item_open_file.is_on_list()) {
      dout(10) << " adding to open file list " << *cur << dendl;
      assert(cur->last == CEPH_NOSNAP);
      // journal an EOpen so replay knows this inode is open.
      LogSegment *ls = mds->mdlog->get_current_segment();
      EOpen *le = new EOpen(mds->mdlog);
      mds->mdlog->start_entry(le);
      le->add_clean_inode(cur);
      ls->open_files.push_back(&cur->item_open_file);
      mds->mdlog->submit_entry(le);
    }
  }
}
2620
2621
2622
/**
 * Perform empty (NULL) snapflushes on behalf of @client for every snapid
 * (< @last) that head_in is still expecting a snapflush for from that
 * client.  Used when we can infer the client will never send a real
 * FLUSHSNAP (e.g. it no longer holds any file write caps).
 */
void Locker::_do_null_snapflush(CInode *head_in, client_t client, snapid_t last)
{
  dout(10) << "_do_null_snapflush client." << client << " on " << *head_in << dendl;
  for (auto p = head_in->client_need_snapflush.begin();
       p != head_in->client_need_snapflush.end() && p->first < last; ) {
    snapid_t snapid = p->first;
    auto &clients = p->second;
    ++p;  // be careful, q loop below depends on this

    if (clients.count(client)) {
      dout(10) << " doing async NULL snapflush on " << snapid << " from client." << client << dendl;
      CInode *sin = mdcache->pick_inode_snap(head_in, snapid - 1);
      assert(sin);
      assert(sin->first <= snapid);
      // dirty=0, ack=NULL: nothing to journal or acknowledge, just update
      // the snapped inode and clear the need-snapflush record.
      _do_snap_update(sin, snapid, 0, sin->first - 1, client, NULL, NULL);
      head_in->remove_need_snapflush(sin, snapid, client);
    }
  }
}
2642
2643
2644 bool Locker::should_defer_client_cap_frozen(CInode *in)
2645 {
2646 /*
2647 * This policy needs to be AT LEAST as permissive as allowing a client request
2648 * to go forward, or else a client request can release something, the release
2649 * gets deferred, but the request gets processed and deadlocks because when the
2650 * caps can't get revoked.
2651 *
2652 * Currently, a request wait if anything locked is freezing (can't
2653 * auth_pin), which would avoid any deadlock with cap release. Thus @in
2654 * _MUST_ be in the lock/auth_pin set.
2655 *
2656 * auth_pins==0 implies no unstable lock and not auth pinnned by
2657 * client request, otherwise continue even it's freezing.
2658 */
2659 return (in->is_freezing() && in->get_num_auth_pins() == 0) || in->is_frozen();
2660 }
2661
2662 /*
2663 * This function DOES put the passed message before returning
2664 */
/*
 * Main entry point for cap messages from clients: UPDATE, FLUSH, FLUSHSNAP,
 * releases, etc.  Handles dedup of already-completed flushes, snapflush
 * routing to snapped inodes, wanted-caps adjustment, and dirty-metadata
 * flush journaling.
 *
 * This function DOES put the passed message before returning
 */
void Locker::handle_client_caps(MClientCaps *m)
{
  client_t client = m->get_source().num();
  snapid_t follows = m->get_snap_follows();
  dout(7) << "handle_client_caps "
	  << ((m->flags & CLIENT_CAPS_SYNC) ? "sync" : "async")
	  << " on " << m->get_ino()
	  << " tid " << m->get_client_tid() << " follows " << follows
	  << " op " << ceph_cap_op_name(m->get_op()) << dendl;

  Session *session = mds->get_session(m);
  if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) {
    // not ready to process yet: drop if the session is gone/dying,
    // otherwise remember reconnect-time dirty caps and retry after replay.
    if (!session) {
      dout(5) << " no session, dropping " << *m << dendl;
      m->put();
      return;
    }
    if (session->is_closed() ||
	session->is_closing() ||
	session->is_killing()) {
      dout(7) << " session closed|closing|killing, dropping " << *m << dendl;
      m->put();
      return;
    }
    if (mds->is_reconnect() &&
	m->get_dirty() && m->get_client_tid() > 0 &&
	!session->have_completed_flush(m->get_client_tid())) {
      mdcache->set_reconnected_dirty_caps(client, m->get_ino(), m->get_dirty());
    }
    mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
    return;
  }

  // dedup: if this flush tid already completed, just re-send the ack.
  if (m->get_client_tid() > 0 && session &&
      session->have_completed_flush(m->get_client_tid())) {
    dout(7) << "handle_client_caps already flushed tid " << m->get_client_tid()
	    << " for client." << client << dendl;
    MClientCaps *ack;
    if (m->get_op() == CEPH_CAP_OP_FLUSHSNAP) {
      ack = new MClientCaps(CEPH_CAP_OP_FLUSHSNAP_ACK, m->get_ino(), 0, 0, 0, 0, 0,
			    m->get_dirty(), 0, mds->get_osd_epoch_barrier());
    } else {
      ack = new MClientCaps(CEPH_CAP_OP_FLUSH_ACK, m->get_ino(), 0, m->get_cap_id(),
			    m->get_seq(), m->get_caps(), 0, m->get_dirty(), 0,
			    mds->get_osd_epoch_barrier());
    }
    ack->set_snap_follows(follows);
    ack->set_client_tid(m->get_client_tid());
    mds->send_message_client_counted(ack, m->get_connection());
    if (m->get_op() == CEPH_CAP_OP_FLUSHSNAP) {
      m->put();
      return;
    } else {
      // fall-thru because the message may release some caps
      m->clear_dirty();
      m->set_op(CEPH_CAP_OP_UPDATE);
    }
  }

  // "oldest flush tid" > 0 means client uses unique TID for each flush
  if (m->get_oldest_flush_tid() > 0 && session) {
    if (session->trim_completed_flushes(m->get_oldest_flush_tid())) {
      mds->mdlog->get_current_segment()->touched_sessions.insert(session->info.inst.name);

      if (session->get_num_trim_flushes_warnings() > 0 &&
	  session->get_num_completed_flushes() * 2 < g_conf->mds_max_completed_flushes)
	session->reset_num_trim_flushes_warnings();
    } else {
      // client isn't advancing its oldest_flush_tid; warn with exponential
      // backoff on the threshold.
      if (session->get_num_completed_flushes() >=
	  (g_conf->mds_max_completed_flushes << session->get_num_trim_flushes_warnings())) {
	session->inc_num_trim_flushes_warnings();
	stringstream ss;
	ss << "client." << session->get_client() << " does not advance its oldest_flush_tid ("
	   << m->get_oldest_flush_tid() << "), "
	   << session->get_num_completed_flushes()
	   << " completed flushes recorded in session";
	mds->clog->warn() << ss.str();
	dout(20) << __func__ << " " << ss.str() << dendl;
      }
    }
  }

  CInode *head_in = mdcache->get_inode(m->get_ino());
  if (!head_in) {
    if (mds->is_clientreplay()) {
      dout(7) << "handle_client_caps on unknown ino " << m->get_ino()
	<< ", will try again after replayed client requests" << dendl;
      mdcache->wait_replay_cap_reconnect(m->get_ino(), new C_MDS_RetryMessage(mds, m));
      return;
    }

    /*
     * "handle_client_caps on unknown ino xxx” is normal after migrating a subtree
     * Sequence of events that cause this are:
     *   - client sends caps message to mds.a
     *   - mds finishes subtree migration, send cap export to client
     *   - mds trim its cache
     *   - mds receives cap messages from client
     */
    dout(7) << "handle_client_caps on unknown ino " << m->get_ino() << ", dropping" << dendl;
    m->put();
    return;
  }

  if (m->osd_epoch_barrier && !mds->objecter->have_map(m->osd_epoch_barrier)) {
    // Pause RADOS operations until we see the required epoch
    mds->objecter->set_epoch_barrier(m->osd_epoch_barrier);
  }

  if (mds->get_osd_epoch_barrier() < m->osd_epoch_barrier) {
    // Record the barrier so that we will retransmit it to clients
    mds->set_osd_epoch_barrier(m->osd_epoch_barrier);
  }

  dout(10) << " head inode " << *head_in << dendl;

  Capability *cap = 0;
  cap = head_in->get_client_cap(client);
  if (!cap) {
    dout(7) << "handle_client_caps no cap for client." << client << " on " << *head_in << dendl;
    m->put();
    return;
  }  
  assert(cap);

  // freezing|frozen?
  if (should_defer_client_cap_frozen(head_in)) {
    dout(7) << "handle_client_caps freezing|frozen on " << *head_in << dendl;
    head_in->add_waiter(CInode::WAIT_UNFREEZE, new C_MDS_RetryMessage(mds, m));
    return;
  }
  // stale migration seq (cap was exported/imported since the client sent this)
  if (ceph_seq_cmp(m->get_mseq(), cap->get_mseq()) < 0) {
    dout(7) << "handle_client_caps mseq " << m->get_mseq() << " < " << cap->get_mseq()
	    << ", dropping" << dendl;
    m->put();
    return;
  }

  int op = m->get_op();

  // flushsnap?
  if (op == CEPH_CAP_OP_FLUSHSNAP) {
    if (!head_in->is_auth()) {
      dout(7) << " not auth, ignoring flushsnap on " << *head_in << dendl;
      goto out;
    }

    SnapRealm *realm = head_in->find_snaprealm();
    snapid_t snap = realm->get_snap_following(follows);
    dout(10) << "  flushsnap follows " << follows << " -> snap " << snap << dendl;

    CInode *in = head_in;
    if (snap != CEPH_NOSNAP) {
      in = mdcache->pick_inode_snap(head_in, snap - 1);
      if (in != head_in)
	dout(10) << " snapped inode " << *in << dendl;
    }

    // we can prepare the ack now, since this FLUSHEDSNAP is independent of any
    // other cap ops.  (except possibly duplicate FLUSHSNAP requests, but worst
    // case we get a dup response, so whatever.)
    MClientCaps *ack = 0;
    if (m->get_dirty()) {
      ack = new MClientCaps(CEPH_CAP_OP_FLUSHSNAP_ACK, in->ino(), 0, 0, 0, 0, 0, m->get_dirty(), 0, mds->get_osd_epoch_barrier());
      ack->set_snap_follows(follows);
      ack->set_client_tid(m->get_client_tid());
      ack->set_oldest_flush_tid(m->get_oldest_flush_tid());
    }

    if (in == head_in ||
	(head_in->client_need_snapflush.count(snap) &&
	 head_in->client_need_snapflush[snap].count(client))) {
      dout(7) << " flushsnap snap " << snap
	      << " client." << client << " on " << *in << dendl;

      // this cap now follows a later snap (i.e. the one initiating this flush, or later)
      if (in == head_in)
	cap->client_follows = snap < CEPH_NOSNAP ? snap : realm->get_newest_seq();
      else if (head_in->client_need_snapflush.begin()->first < snap)
	_do_null_snapflush(head_in, client, snap);
   
      _do_snap_update(in, snap, m->get_dirty(), follows, client, m, ack);

      if (in != head_in)
	head_in->remove_need_snapflush(in, snap, client);
    } else {
      dout(7) << " not expecting flushsnap " << snap << " from client." << client << " on " << *in << dendl;
      if (ack)
	mds->send_message_client_counted(ack, m->get_connection());
    }
    goto out;
  }

  if (cap->get_cap_id() != m->get_cap_id()) {
    dout(7) << " ignoring client capid " << m->get_cap_id() << " != my " << cap->get_cap_id() << dendl;
  } else {
    CInode *in = head_in;
    if (follows > 0) {
      in = mdcache->pick_inode_snap(head_in, follows);
      // intermediate snap inodes: apply the dirty metadata to each snapped
      // inode between follows and the head.
      while (in != head_in) {
	assert(in->last != CEPH_NOSNAP);
	if (in->is_auth() && m->get_dirty()) {
	  dout(10) << " updating intermediate snapped inode " << *in << dendl;
	  _do_cap_update(in, NULL, m->get_dirty(), follows, m);
	}
	in = mdcache->pick_inode_snap(head_in, in->last);
      }
    }
 
    // head inode, and cap
    MClientCaps *ack = 0;

    int caps = m->get_caps();
    if (caps & ~cap->issued()) {
      dout(10) << " confirming not issued caps " << ccap_string(caps & ~cap->issued()) << dendl;
      caps &= cap->issued();
    }
    
    cap->confirm_receipt(m->get_seq(), caps);
    dout(10) << " follows " << follows
	     << " retains " << ccap_string(m->get_caps())
	     << " dirty " << ccap_string(m->get_dirty())
	     << " on " << *in << dendl;


    // missing/skipped snapflush?
    //  The client MAY send a snapflush if it is issued WR/EXCL caps, but
    //  presently only does so when it has actual dirty metadata.  But, we
    //  set up the need_snapflush stuff based on the issued caps.
    //  We can infer that the client WONT send a FLUSHSNAP once they have
    //  released all WR/EXCL caps (the FLUSHSNAP always comes before the cap
    //  update/release).
    if (!head_in->client_need_snapflush.empty()) {
      if ((cap->issued() & CEPH_CAP_ANY_FILE_WR) == 0) {
	_do_null_snapflush(head_in, client);
      } else {
	dout(10) << " revocation in progress, not making any conclusions about null snapflushes" << dendl;
      }
    }
    
    if (m->get_dirty() && in->is_auth()) {
      dout(7) << " flush client." << client << " dirty " << ccap_string(m->get_dirty()) 
	      << " seq " << m->get_seq() << " on " << *in << dendl;
      ack = new MClientCaps(CEPH_CAP_OP_FLUSH_ACK, in->ino(), 0, cap->get_cap_id(), m->get_seq(),
			    m->get_caps(), 0, m->get_dirty(), 0, mds->get_osd_epoch_barrier());
      ack->set_client_tid(m->get_client_tid());
      ack->set_oldest_flush_tid(m->get_oldest_flush_tid());
    }

    // filter wanted based on what we could ever give out (given auth/replica status)
    bool need_flush = m->flags & CLIENT_CAPS_SYNC;
    int new_wanted = m->get_wanted();
    if (new_wanted != cap->wanted()) {
      if (!need_flush && in->is_auth() && (new_wanted & ~cap->pending())) {
	// expanding caps.  make sure we aren't waiting for a log flush
	need_flush = _need_flush_mdlog(head_in, new_wanted & ~cap->pending());
      }

      adjust_cap_wanted(cap, new_wanted, m->get_issue_seq());
    }
      
    if (in->is_auth() &&
	_do_cap_update(in, cap, m->get_dirty(), follows, m, ack, &need_flush)) {
      // updated
      eval(in, CEPH_CAP_LOCKS);

      if (!need_flush && (cap->wanted() & ~cap->pending()))
	need_flush = _need_flush_mdlog(in, cap->wanted() & ~cap->pending());
    } else {
      // no update, ack now.
      if (ack)
	mds->send_message_client_counted(ack, m->get_connection());
      
      bool did_issue = eval(in, CEPH_CAP_LOCKS);
      if (!did_issue && (cap->wanted() & ~cap->pending()))
	issue_caps(in, cap);

      if (cap->get_last_seq() == 0 &&
	  (cap->pending() & (CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER))) {
	// first issue on a writeable cap: share the current max_size.
	cap->issue_norevoke(cap->issued());
	share_inode_max_size(in, cap);
      }
    }

    if (need_flush)
      mds->mdlog->flush();
  }

 out:
  m->put();
}
2957
2958
2959 class C_Locker_RetryRequestCapRelease : public LockerContext {
2960 client_t client;
2961 ceph_mds_request_release item;
2962 public:
2963 C_Locker_RetryRequestCapRelease(Locker *l, client_t c, const ceph_mds_request_release& it) :
2964 LockerContext(l), client(c), item(it) { }
2965 void finish(int r) override {
2966 string dname;
2967 MDRequestRef null_ref;
2968 locker->process_request_cap_release(null_ref, client, item, dname);
2969 }
2970 };
2971
/**
 * Apply a cap release that was embedded in a client request.
 *
 * Handles the optional dentry-lease release (when @dname is non-empty)
 * and then the cap confirmation / wanted adjustment for the inode.  May
 * be re-invoked later with a null @mdr (via
 * C_Locker_RetryRequestCapRelease) if the inode is frozen on the first
 * attempt.
 */
void Locker::process_request_cap_release(MDRequestRef& mdr, client_t client, const ceph_mds_request_release& item,
					 boost::string_view dname)
{
  inodeno_t ino = (uint64_t)item.ino;
  uint64_t cap_id = item.cap_id;
  int caps = item.caps;
  int wanted = item.wanted;
  int seq = item.seq;
  int issue_seq = item.issue_seq;
  int mseq = item.mseq;

  CInode *in = mdcache->get_inode(ino);
  if (!in)
    return;

  if (dname.length()) {
    // the release also names a dentry: drop the client's lease on it, if any
    frag_t fg = in->pick_dirfrag(dname);
    CDir *dir = in->get_dirfrag(fg);
    if (dir) {
      CDentry *dn = dir->lookup(dname);
      if (dn) {
	ClientLease *l = dn->get_client_lease(client);
	if (l) {
	  dout(10) << "process_cap_release removing lease on " << *dn << dendl;
	  dn->remove_client_lease(l, this);
	} else {
	  dout(7) << "process_cap_release client." << client
		  << " doesn't have lease on " << *dn << dendl;
	}
      } else {
	dout(7) << "process_cap_release client." << client << " released lease on dn "
		<< dir->dirfrag() << "/" << dname << " which dne" << dendl;
      }
    }
  }

  Capability *cap = in->get_client_cap(client);
  if (!cap)
    return;

  dout(10) << "process_cap_release client." << client << " " << ccap_string(caps) << " on " << *in
	   << (mdr ? "" : " (DEFERRED, no mdr)")
	   << dendl;

  // stale migrate seq: a newer cap import/export superseded this release
  if (ceph_seq_cmp(mseq, cap->get_mseq()) < 0) {
    dout(7) << " mseq " << mseq << " < " << cap->get_mseq() << ", dropping" << dendl;
    return;
  }

  // the cap was reissued under a different id; this release targets an
  // older incarnation of the cap
  if (cap->get_cap_id() != cap_id) {
    dout(7) << " cap_id " << cap_id << " != " << cap->get_cap_id() << ", dropping" << dendl;
    return;
  }

  if (should_defer_client_cap_frozen(in)) {
    dout(7) << " frozen, deferring" << dendl;
    in->add_waiter(CInode::WAIT_UNFREEZE, new C_Locker_RetryRequestCapRelease(this, client, item));
    return;
  }

  if (caps & ~cap->issued()) {
    dout(10) << " confirming not issued caps " << ccap_string(caps & ~cap->issued()) << dendl;
    caps &= cap->issued();
  }
  cap->confirm_receipt(seq, caps);

  // once the client holds no WR/EXCL file caps it will never send the
  // snapflushes we are waiting for; synthesize null ones now
  if (!in->client_need_snapflush.empty() &&
      (cap->issued() & CEPH_CAP_ANY_FILE_WR) == 0) {
    _do_null_snapflush(in, client);
  }

  adjust_cap_wanted(cap, wanted, issue_seq);

  // suppress cap re-issue during lock eval when this is part of a request
  if (mdr)
    cap->inc_suppress();
  eval(in, CEPH_CAP_LOCKS);
  if (mdr)
    cap->dec_suppress();

  // take note; we may need to reissue on this cap later
  if (mdr)
    mdr->cap_releases[in->vino()] = cap->get_last_seq();
}
3055
3056 class C_Locker_RetryKickIssueCaps : public LockerContext {
3057 CInode *in;
3058 client_t client;
3059 ceph_seq_t seq;
3060 public:
3061 C_Locker_RetryKickIssueCaps(Locker *l, CInode *i, client_t c, ceph_seq_t s) :
3062 LockerContext(l), in(i), client(c), seq(s) {
3063 in->get(CInode::PIN_PTRWAITER);
3064 }
3065 void finish(int r) override {
3066 locker->kick_issue_caps(in, client, seq);
3067 in->put(CInode::PIN_PTRWAITER);
3068 }
3069 };
3070
3071 void Locker::kick_issue_caps(CInode *in, client_t client, ceph_seq_t seq)
3072 {
3073 Capability *cap = in->get_client_cap(client);
3074 if (!cap || cap->get_last_seq() != seq)
3075 return;
3076 if (in->is_frozen()) {
3077 dout(10) << "kick_issue_caps waiting for unfreeze on " << *in << dendl;
3078 in->add_waiter(CInode::WAIT_UNFREEZE,
3079 new C_Locker_RetryKickIssueCaps(this, in, client, seq));
3080 return;
3081 }
3082 dout(10) << "kick_issue_caps released at current seq " << seq
3083 << ", reissuing" << dendl;
3084 issue_caps(in, cap);
3085 }
3086
3087 void Locker::kick_cap_releases(MDRequestRef& mdr)
3088 {
3089 client_t client = mdr->get_client();
3090 for (map<vinodeno_t,ceph_seq_t>::iterator p = mdr->cap_releases.begin();
3091 p != mdr->cap_releases.end();
3092 ++p) {
3093 CInode *in = mdcache->get_inode(p->first);
3094 if (!in)
3095 continue;
3096 kick_issue_caps(in, client, p->second);
3097 }
3098 }
3099
/**
 * Apply a client's FLUSHSNAP metadata to the appropriate (old or
 * projected) inode and journal the update; the ack, if any, is sent by
 * the journal-completion context.
 *
 * m and ack might be NULL, so don't dereference them unless dirty != 0
 */
void Locker::_do_snap_update(CInode *in, snapid_t snap, int dirty, snapid_t follows, client_t client, MClientCaps *m, MClientCaps *ack)
{
  dout(10) << "_do_snap_update dirty " << ccap_string(dirty)
	   << " follows " << follows << " snap " << snap
	   << " on " << *in << dendl;

  if (snap == CEPH_NOSNAP) {
    // hmm, i guess snap was already deleted? just ack!
    dout(10) << " wow, the snap following " << follows
	     << " was already deleted. nothing to record, just ack." << dendl;
    if (ack)
      mds->send_message_client_counted(ack, m->get_connection());
    return;
  }

  EUpdate *le = new EUpdate(mds->mdlog, "snap flush");
  mds->mdlog->start_entry(le);
  MutationRef mut = new MutationImpl();
  mut->ls = mds->mdlog->get_current_segment();

  // normal metadata updates that we can apply to the head as well.

  // update xattrs?
  CInode::mempool_xattr_map *px = nullptr;
  bool xattrs = (dirty & CEPH_CAP_XATTR_EXCL) &&
                m->xattrbl.length() &&
                m->head.xattr_version > in->get_projected_inode()->xattr_version;

  // multiversion inodes keep per-snap old_inode copies; pick the one
  // covering this snap, if any
  CInode::mempool_old_inode *oi = 0;
  if (in->is_multiversion()) {
    oi = in->pick_old_inode(snap);
  }

  // i points at whichever inode copy (old or projected) receives the
  // client's flushed fields
  CInode::mempool_inode *i;
  if (oi) {
    dout(10) << " writing into old inode" << dendl;
    auto &pi = in->project_inode();
    pi.inode.version = in->pre_dirty();
    if (snap > oi->first)
      in->split_old_inode(snap);
    i = &oi->inode;
    if (xattrs)
      px = &oi->xattrs;
  } else {
    auto &pi = in->project_inode(xattrs);
    pi.inode.version = in->pre_dirty();
    i = &pi.inode;
    if (xattrs)
      px = pi.xattrs.get();
  }

  _update_cap_fields(in, dirty, m, i);

  // xattr
  if (xattrs) {
    dout(7) << " xattrs v" << i->xattr_version << " -> " << m->head.xattr_version
	    << " len " << m->xattrbl.length() << dendl;
    i->xattr_version = m->head.xattr_version;
    bufferlist::iterator p = m->xattrbl.begin();
    ::decode(*px, p);
  }

  {
    // trim or terminate the client's writable byte range at this snap
    auto it = i->client_ranges.find(client);
    if (it != i->client_ranges.end()) {
      if (in->last == snap) {
	dout(10) << " removing client_range entirely" << dendl;
	i->client_ranges.erase(it);
      } else {
	dout(10) << " client_range now follows " << snap << dendl;
	it->second.follows = snap;
      }
    }
  }

  mut->auth_pin(in);
  mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows);
  mdcache->journal_dirty_inode(mut.get(), &le->metablob, in, follows);

  // "oldest flush tid" > 0 means client uses unique TID for each flush
  if (ack && ack->get_oldest_flush_tid() > 0)
    le->metablob.add_client_flush(metareqid_t(m->get_source(), ack->get_client_tid()),
				  ack->get_oldest_flush_tid());

  mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut, false, false,
							      client, ack));
}
3190
/**
 * Copy client-supplied metadata from a cap flush message into an inode.
 *
 * @param in    inode being updated (used for logging)
 * @param dirty mask of cap bits the client flushed; gates each field below
 * @param m     the cap message; must be non-null whenever dirty != 0
 * @param pi    the inode copy (projected or old_inode) to write into
 *
 * Each field is taken only when the relevant cap bit is dirty AND the
 * client's value passes that field's comparison check (monotonic for
 * ctime/size/change_attr, inequality for EXCL-owned fields).
 */
void Locker::_update_cap_fields(CInode *in, int dirty, MClientCaps *m, CInode::mempool_inode *pi)
{
  if (dirty == 0)
    return;

  /* m must be valid if there are dirty caps */
  assert(m);
  uint64_t features = m->get_connection()->get_features();

  // ctime only moves forward
  if (m->get_ctime() > pi->ctime) {
    dout(7) << " ctime " << pi->ctime << " -> " << m->get_ctime()
	    << " for " << *in << dendl;
    pi->ctime = m->get_ctime();
    if (m->get_ctime() > pi->rstat.rctime)
      pi->rstat.rctime = m->get_ctime();
  }

  if ((features & CEPH_FEATURE_FS_CHANGE_ATTR) &&
      m->get_change_attr() > pi->change_attr) {
    dout(7) << " change_attr " << pi->change_attr << " -> " << m->get_change_attr()
	    << " for " << *in << dendl;
    pi->change_attr = m->get_change_attr();
  }

  // file
  if (dirty & (CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)) {
    utime_t atime = m->get_atime();
    utime_t mtime = m->get_mtime();
    uint64_t size = m->get_size();
    version_t inline_version = m->inline_version;

    // WR: mtime may only advance; EXCL: client owns mtime, so any
    // change (including backwards, e.g. utimes()) is accepted
    if (((dirty & CEPH_CAP_FILE_WR) && mtime > pi->mtime) ||
	((dirty & CEPH_CAP_FILE_EXCL) && mtime != pi->mtime)) {
      dout(7) << " mtime " << pi->mtime << " -> " << mtime
	      << " for " << *in << dendl;
      pi->mtime = mtime;
      if (mtime > pi->rstat.rctime)
	pi->rstat.rctime = mtime;
    }
    if (in->inode.is_file() && // ONLY if regular file
	size > pi->size) {
      dout(7) << " size " << pi->size << " -> " << size
	      << " for " << *in << dendl;
      pi->size = size;
      pi->rstat.rbytes = size;
    }
    if (in->inode.is_file() &&
        (dirty & CEPH_CAP_FILE_WR) &&
        inline_version > pi->inline_data.version) {
      pi->inline_data.version = inline_version;
      if (inline_version != CEPH_INLINE_NONE && m->inline_data.length() > 0)
	pi->inline_data.get_data() = m->inline_data;
      else
	pi->inline_data.free_data();
    }
    if ((dirty & CEPH_CAP_FILE_EXCL) && atime != pi->atime) {
      dout(7) << " atime " << pi->atime << " -> " << atime
	      << " for " << *in << dendl;
      pi->atime = atime;
    }
    if ((dirty & CEPH_CAP_FILE_EXCL) &&
	ceph_seq_cmp(pi->time_warp_seq, m->get_time_warp_seq()) < 0) {
      dout(7) << " time_warp_seq " << pi->time_warp_seq << " -> " << m->get_time_warp_seq()
	      << " for " << *in << dendl;
      pi->time_warp_seq = m->get_time_warp_seq();
    }
  }
  // auth
  if (dirty & CEPH_CAP_AUTH_EXCL) {
    if (m->head.uid != pi->uid) {
      dout(7) << " uid " << pi->uid
	      << " -> " << m->head.uid
	      << " for " << *in << dendl;
      pi->uid = m->head.uid;
    }
    if (m->head.gid != pi->gid) {
      dout(7) << " gid " << pi->gid
	      << " -> " << m->head.gid
	      << " for " << *in << dendl;
      pi->gid = m->head.gid;
    }
    if (m->head.mode != pi->mode) {
      dout(7) << " mode " << oct << pi->mode
	      << " -> " << m->head.mode << dec
	      << " for " << *in << dendl;
      pi->mode = m->head.mode;
    }
    if ((features & CEPH_FEATURE_FS_BTIME) && m->get_btime() != pi->btime) {
      dout(7) << " btime " << oct << pi->btime
	      << " -> " << m->get_btime() << dec
	      << " for " << *in << dendl;
      pi->btime = m->get_btime();
    }
  }
}
3286
/*
 * update inode based on cap flush|flushsnap|wanted.
 *  adjust max_size, if needed.
 * if we update, return true; otherwise, false (no update needed).
 */
bool Locker::_do_cap_update(CInode *in, Capability *cap,
			    int dirty, snapid_t follows,
			    MClientCaps *m, MClientCaps *ack,
			    bool *need_flush)
{
  dout(10) << "_do_cap_update dirty " << ccap_string(dirty)
	   << " issued " << ccap_string(cap ? cap->issued() : 0)
	   << " wanted " << ccap_string(cap ? cap->wanted() : 0)
	   << " on " << *in << dendl;
  assert(in->is_auth());
  client_t client = m->get_source().num();
  CInode::mempool_inode *latest = in->get_projected_inode();

  // increase or zero max_size?
  uint64_t size = m->get_size();
  bool change_max = false;
  uint64_t old_max = latest->client_ranges.count(client) ? latest->client_ranges[client].range.last : 0;
  uint64_t new_max = old_max;

  if (in->is_file()) {
    bool forced_change_max = false;
    dout(20) << "inode is file" << dendl;
    if (cap && ((cap->issued() | cap->wanted()) & CEPH_CAP_ANY_FILE_WR)) {
      dout(20) << "client has write caps; m->get_max_size="
	       << m->get_max_size() << "; old_max=" << old_max << dendl;
      if (m->get_max_size() > new_max) {
	// client explicitly asked for a larger max_size
	dout(10) << "client requests file_max " << m->get_max_size()
		 << " > max " << old_max << dendl;
	change_max = true;
	forced_change_max = true;
	new_max = calc_new_max_size(latest, m->get_max_size());
      } else {
	// otherwise grow max_size as the file size approaches it
	new_max = calc_new_max_size(latest, size);

	if (new_max > old_max)
	  change_max = true;
	else
	  new_max = old_max;
      }
    } else {
      // client holds/wants no write caps: retract its byte range entirely
      if (old_max) {
	change_max = true;
	new_max = 0;
      }
    }

    if (in->last == CEPH_NOSNAP &&
	change_max &&
	!in->filelock.can_wrlock(client) &&
	!in->filelock.can_force_wrlock(client)) {
      dout(10) << " i want to change file_max, but lock won't allow it (yet)" << dendl;
      if (in->filelock.is_stable()) {
	// try to move the filelock to a state that permits the wrlock
	bool need_issue = false;
	if (cap)
	  cap->inc_suppress();
	if (in->mds_caps_wanted.empty() &&
	    (in->get_loner() >= 0 || (in->get_wanted_loner() >= 0 && in->try_set_loner()))) {
	  if (in->filelock.get_state() != LOCK_EXCL)
	    file_excl(&in->filelock, &need_issue);
	} else
	  simple_lock(&in->filelock, &need_issue);
	if (need_issue)
	  issue_caps(in);
	if (cap)
	  cap->dec_suppress();
      }
      if (!in->filelock.can_wrlock(client) &&
	  !in->filelock.can_force_wrlock(client)) {
	// still not wrlockable: retry the max_size change once the lock
	// stabilizes, and skip it for this update
	C_MDL_CheckMaxSize *cms = new C_MDL_CheckMaxSize(this, in,
	                                                 forced_change_max ? new_max : 0,
	                                                 0, utime_t());

	in->filelock.add_waiter(SimpleLock::WAIT_STABLE, cms);
	change_max = false;
      }
    }
  }

  // merge in any file locks the client reported (fcntl locks first,
  // then flock locks; the message encodes two counted lists)
  if (m->flockbl.length()) {
    int32_t num_locks;
    bufferlist::iterator bli = m->flockbl.begin();
    ::decode(num_locks, bli);
    for ( int i=0; i < num_locks; ++i) {
      ceph_filelock decoded_lock;
      ::decode(decoded_lock, bli);
      in->get_fcntl_lock_state()->held_locks.
	insert(pair<uint64_t, ceph_filelock>(decoded_lock.start, decoded_lock));
      ++in->get_fcntl_lock_state()->client_held_lock_counts[(client_t)(decoded_lock.client)];
    }
    ::decode(num_locks, bli);
    for ( int i=0; i < num_locks; ++i) {
      ceph_filelock decoded_lock;
      ::decode(decoded_lock, bli);
      in->get_flock_lock_state()->held_locks.
	insert(pair<uint64_t, ceph_filelock>(decoded_lock.start, decoded_lock));
      ++in->get_flock_lock_state()->client_held_lock_counts[(client_t)(decoded_lock.client)];
    }
  }

  if (!dirty && !change_max)
    return false;

  Session *session = mds->get_session(m);
  if (session->check_access(in, MAY_WRITE,
			    m->caller_uid, m->caller_gid, NULL, 0, 0) < 0) {
    dout(10) << "check_access failed, dropping cap update on " << *in << dendl;
    return false;
  }

  // do the update.
  EUpdate *le = new EUpdate(mds->mdlog, "cap update");
  mds->mdlog->start_entry(le);

  // only take the client's xattrs if they are newer than ours
  bool xattr = (dirty & CEPH_CAP_XATTR_EXCL) &&
               m->xattrbl.length() &&
               m->head.xattr_version > in->get_projected_inode()->xattr_version;

  auto &pi = in->project_inode(xattr);
  pi.inode.version = in->pre_dirty();

  MutationRef mut(new MutationImpl());
  mut->ls = mds->mdlog->get_current_segment();

  _update_cap_fields(in, dirty, m, &pi.inode);

  if (change_max) {
    dout(7) << " max_size " << old_max << " -> " << new_max
	    << " for " << *in << dendl;
    if (new_max) {
      auto &cr = pi.inode.client_ranges[client];
      cr.range.first = 0;
      cr.range.last = new_max;
      cr.follows = in->first - 1;
      if (cap)
	cap->mark_clientwriteable();
    } else {
      pi.inode.client_ranges.erase(client);
      if (cap)
	cap->clear_clientwriteable();
    }
  }

  if (change_max || (dirty & (CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)))
    wrlock_force(&in->filelock, mut); // wrlock for duration of journal

  // auth
  if (dirty & CEPH_CAP_AUTH_EXCL)
    wrlock_force(&in->authlock, mut);

  // xattrs update?
  if (xattr) {
    dout(7) << " xattrs v" << pi.inode.xattr_version << " -> " << m->head.xattr_version << dendl;
    pi.inode.xattr_version = m->head.xattr_version;
    bufferlist::iterator p = m->xattrbl.begin();
    ::decode(*pi.xattrs, p);
    wrlock_force(&in->xattrlock, mut);
  }

  mut->auth_pin(in);
  mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows);
  mdcache->journal_dirty_inode(mut.get(), &le->metablob, in, follows);

  // "oldest flush tid" > 0 means client uses unique TID for each flush
  if (ack && ack->get_oldest_flush_tid() > 0)
    le->metablob.add_client_flush(metareqid_t(m->get_source(), ack->get_client_tid()),
				  ack->get_oldest_flush_tid());

  mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut,
							      change_max, !!cap,
							      client, ack));
  if (need_flush && !*need_flush &&
      ((change_max && new_max) || // max INCREASE
       _need_flush_mdlog(in, dirty)))
    *need_flush = true;

  return true;
}
3469
3470 /* This function DOES put the passed message before returning */
3471 void Locker::handle_client_cap_release(MClientCapRelease *m)
3472 {
3473 client_t client = m->get_source().num();
3474 dout(10) << "handle_client_cap_release " << *m << dendl;
3475
3476 if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) {
3477 mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
3478 return;
3479 }
3480
3481 if (m->osd_epoch_barrier && !mds->objecter->have_map(m->osd_epoch_barrier)) {
3482 // Pause RADOS operations until we see the required epoch
3483 mds->objecter->set_epoch_barrier(m->osd_epoch_barrier);
3484 }
3485
3486 if (mds->get_osd_epoch_barrier() < m->osd_epoch_barrier) {
3487 // Record the barrier so that we will retransmit it to clients
3488 mds->set_osd_epoch_barrier(m->osd_epoch_barrier);
3489 }
3490
3491 Session *session = mds->get_session(m);
3492
3493 for (vector<ceph_mds_cap_item>::iterator p = m->caps.begin(); p != m->caps.end(); ++p) {
3494 _do_cap_release(client, inodeno_t((uint64_t)p->ino) , p->cap_id, p->migrate_seq, p->seq);
3495 }
3496
3497 if (session) {
3498 session->notify_cap_release(m->caps.size());
3499 }
3500
3501 m->put();
3502 }
3503
3504 class C_Locker_RetryCapRelease : public LockerContext {
3505 client_t client;
3506 inodeno_t ino;
3507 uint64_t cap_id;
3508 ceph_seq_t migrate_seq;
3509 ceph_seq_t issue_seq;
3510 public:
3511 C_Locker_RetryCapRelease(Locker *l, client_t c, inodeno_t i, uint64_t id,
3512 ceph_seq_t mseq, ceph_seq_t seq) :
3513 LockerContext(l), client(c), ino(i), cap_id(id), migrate_seq(mseq), issue_seq(seq) {}
3514 void finish(int r) override {
3515 locker->_do_cap_release(client, ino, cap_id, migrate_seq, issue_seq);
3516 }
3517 };
3518
/**
 * Process one cap release (from an MClientCapRelease batch).
 *
 * The release is dropped if it is stale: missing inode/cap, mismatched
 * cap_id, or an older migrate seq.  If the issue seq doesn't match, the
 * client was issued something newer after it sent the release, so we
 * only clean revoke history; otherwise the cap is removed outright.
 */
void Locker::_do_cap_release(client_t client, inodeno_t ino, uint64_t cap_id,
			     ceph_seq_t mseq, ceph_seq_t seq)
{
  CInode *in = mdcache->get_inode(ino);
  if (!in) {
    dout(7) << "_do_cap_release missing ino " << ino << dendl;
    return;
  }
  Capability *cap = in->get_client_cap(client);
  if (!cap) {
    dout(7) << "_do_cap_release no cap for client" << client << " on "<< *in << dendl;
    return;
  }

  dout(7) << "_do_cap_release for client." << client << " on "<< *in << dendl;
  if (cap->get_cap_id() != cap_id) {
    // release targets an older incarnation of the cap
    dout(7) << " capid " << cap_id << " != " << cap->get_cap_id() << ", ignore" << dendl;
    return;
  }
  if (ceph_seq_cmp(mseq, cap->get_mseq()) < 0) {
    // superseded by a newer cap import/export
    dout(7) << " mseq " << mseq << " < " << cap->get_mseq() << ", ignore" << dendl;
    return;
  }
  if (should_defer_client_cap_frozen(in)) {
    dout(7) << " freezing|frozen, deferring" << dendl;
    in->add_waiter(CInode::WAIT_UNFREEZE,
                  new C_Locker_RetryCapRelease(this, client, ino, cap_id, mseq, seq));
    return;
  }
  if (seq != cap->get_last_issue()) {
    dout(7) << " issue_seq " << seq << " != " << cap->get_last_issue() << dendl;
    // clean out any old revoke history
    cap->clean_revoke_from(seq);
    eval_cap_gather(in);
    return;
  }
  remove_client_cap(in, cap);
}
3557
/**
 * Remove a client's capability on an inode and clean up dependent
 * state: pending snapflushes, the client's writable byte range, and
 * lock state.
 */
void Locker::remove_client_cap(CInode *in, Capability *cap)
{
  client_t client = cap->get_client();
  // clean out any pending snapflush state
  if (!in->client_need_snapflush.empty())
    _do_null_snapflush(in, client);

  bool notable = cap->is_notable();
  in->remove_client_cap(client);
  if (!notable)
    return;  // cap carried no state needing further cleanup

  if (in->is_auth()) {
    // make sure we clear out the client byte range
    if (in->get_projected_inode()->client_ranges.count(client) &&
	!(in->inode.nlink == 0 && !in->is_any_caps())) // unless it's unlink + stray
      check_inode_max_size(in);
  } else {
    // non-auth: update the auth about our file cap needs
    request_inode_file_caps(in);
  }

  try_eval(in, CEPH_CAP_LOCKS);
}
3581
3582
3583 /**
3584 * Return true if any currently revoking caps exceed the
3585 * session_timeout threshold.
3586 */
3587 bool Locker::any_late_revoking_caps(xlist<Capability*> const &revoking,
3588 double timeout) const
3589 {
3590 xlist<Capability*>::const_iterator p = revoking.begin();
3591 if (p.end()) {
3592 // No revoking caps at the moment
3593 return false;
3594 } else {
3595 utime_t now = ceph_clock_now();
3596 utime_t age = now - (*p)->get_last_revoke_stamp();
3597 if (age <= timeout) {
3598 return false;
3599 } else {
3600 return true;
3601 }
3602 }
3603 }
3604
3605 void Locker::get_late_revoking_clients(std::list<client_t> *result,
3606 double timeout) const
3607 {
3608 if (!any_late_revoking_caps(revoking_caps, timeout)) {
3609 // Fast path: no misbehaving clients, execute in O(1)
3610 return;
3611 }
3612
3613 // Slow path: execute in O(N_clients)
3614 for (auto &p : revoking_caps_by_client) {
3615 if (any_late_revoking_caps(p.second, timeout)) {
3616 result->push_back(p.first);
3617 }
3618 }
3619 }
3620
3621 // Hard-code instead of surfacing a config settings because this is
3622 // really a hack that should go away at some point when we have better
3623 // inspection tools for getting at detailed cap state (#7316)
3624 #define MAX_WARN_CAPS 100
3625
3626 void Locker::caps_tick()
3627 {
3628 utime_t now = ceph_clock_now();
3629
3630 dout(20) << __func__ << " " << revoking_caps.size() << " revoking caps" << dendl;
3631
3632 int i = 0;
3633 for (xlist<Capability*>::iterator p = revoking_caps.begin(); !p.end(); ++p) {
3634 Capability *cap = *p;
3635
3636 utime_t age = now - cap->get_last_revoke_stamp();
3637 dout(20) << __func__ << " age = " << age << cap->get_client() << "." << cap->get_inode()->ino() << dendl;
3638 if (age <= mds->mdsmap->get_session_timeout()) {
3639 dout(20) << __func__ << " age below timeout " << mds->mdsmap->get_session_timeout() << dendl;
3640 break;
3641 } else {
3642 ++i;
3643 if (i > MAX_WARN_CAPS) {
3644 dout(1) << __func__ << " more than " << MAX_WARN_CAPS << " caps are late"
3645 << "revoking, ignoring subsequent caps" << dendl;
3646 break;
3647 }
3648 }
3649 // exponential backoff of warning intervals
3650 if (age > mds->mdsmap->get_session_timeout() * (1 << cap->get_num_revoke_warnings())) {
3651 cap->inc_num_revoke_warnings();
3652 stringstream ss;
3653 ss << "client." << cap->get_client() << " isn't responding to mclientcaps(revoke), ino "
3654 << cap->get_inode()->ino() << " pending " << ccap_string(cap->pending())
3655 << " issued " << ccap_string(cap->issued()) << ", sent " << age << " seconds ago";
3656 mds->clog->warn() << ss.str();
3657 dout(20) << __func__ << " " << ss.str() << dendl;
3658 } else {
3659 dout(20) << __func__ << " silencing log message (backoff) for " << cap->get_client() << "." << cap->get_inode()->ino() << dendl;
3660 }
3661 }
3662 }
3663
3664
3665 void Locker::handle_client_lease(MClientLease *m)
3666 {
3667 dout(10) << "handle_client_lease " << *m << dendl;
3668
3669 assert(m->get_source().is_client());
3670 client_t client = m->get_source().num();
3671
3672 CInode *in = mdcache->get_inode(m->get_ino(), m->get_last());
3673 if (!in) {
3674 dout(7) << "handle_client_lease don't have ino " << m->get_ino() << "." << m->get_last() << dendl;
3675 m->put();
3676 return;
3677 }
3678 CDentry *dn = 0;
3679
3680 frag_t fg = in->pick_dirfrag(m->dname);
3681 CDir *dir = in->get_dirfrag(fg);
3682 if (dir)
3683 dn = dir->lookup(m->dname);
3684 if (!dn) {
3685 dout(7) << "handle_client_lease don't have dn " << m->get_ino() << " " << m->dname << dendl;
3686 m->put();
3687 return;
3688 }
3689 dout(10) << " on " << *dn << dendl;
3690
3691 // replica and lock
3692 ClientLease *l = dn->get_client_lease(client);
3693 if (!l) {
3694 dout(7) << "handle_client_lease didn't have lease for client." << client << " of " << *dn << dendl;
3695 m->put();
3696 return;
3697 }
3698
3699 switch (m->get_action()) {
3700 case CEPH_MDS_LEASE_REVOKE_ACK:
3701 case CEPH_MDS_LEASE_RELEASE:
3702 if (l->seq != m->get_seq()) {
3703 dout(7) << "handle_client_lease release - seq " << l->seq << " != provided " << m->get_seq() << dendl;
3704 } else {
3705 dout(7) << "handle_client_lease client." << client
3706 << " on " << *dn << dendl;
3707 dn->remove_client_lease(l, this);
3708 }
3709 m->put();
3710 break;
3711
3712 case CEPH_MDS_LEASE_RENEW:
3713 {
3714 dout(7) << "handle_client_lease client." << client << " renew on " << *dn
3715 << (!dn->lock.can_lease(client)?", revoking lease":"") << dendl;
3716 if (dn->lock.can_lease(client)) {
3717 int pool = 1; // fixme.. do something smart!
3718 m->h.duration_ms = (int)(1000 * mdcache->client_lease_durations[pool]);
3719 m->h.seq = ++l->seq;
3720 m->clear_payload();
3721
3722 utime_t now = ceph_clock_now();
3723 now += mdcache->client_lease_durations[pool];
3724 mdcache->touch_client_lease(l, pool, now);
3725
3726 mds->send_message_client_counted(m, m->get_connection());
3727 }
3728 }
3729 break;
3730
3731 default:
3732 ceph_abort(); // implement me
3733 break;
3734 }
3735 }
3736
3737
/**
 * Encode a LeaseStat for @dn into @bl, issuing (or renewing) a dentry
 * lease to the client when possible; otherwise a null lease is encoded.
 *
 * A lease is only issued when the dentry is not in a stray dir, the
 * client cannot already rely on the parent dir (no filelock lease and
 * no pending FILE_SHARED/EXCL caps on the dir inode), and the dentry
 * lock itself permits a lease.
 */
void Locker::issue_client_lease(CDentry *dn, client_t client,
			       bufferlist &bl, utime_t now, Session *session)
{
  CInode *diri = dn->get_dir()->get_inode();
  if (!diri->is_stray() &&  // do not issue dn leases in stray dir!
      ((!diri->filelock.can_lease(client) &&
	(diri->get_client_cap_pending(client) & (CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_EXCL)) == 0)) &&
      dn->lock.can_lease(client)) {
    int pool = 1;   // fixme.. do something smart!
    // issue a dentry lease
    ClientLease *l = dn->add_client_lease(client, session);
    session->touch_lease(l);

    now += mdcache->client_lease_durations[pool];
    mdcache->touch_client_lease(l, pool, now);

    LeaseStat e;
    e.mask = 1 | CEPH_LOCK_DN;  // old and new bit values
    e.seq = ++l->seq;
    e.duration_ms = (int)(1000 * mdcache->client_lease_durations[pool]);
    ::encode(e, bl);
    dout(20) << "issue_client_lease seq " << e.seq << " dur " << e.duration_ms << "ms "
	     << " on " << *dn << dendl;
  } else {
    // null lease
    LeaseStat e;
    e.mask = 0;
    e.seq = 0;
    e.duration_ms = 0;
    ::encode(e, bl);
    dout(20) << "issue_client_lease no/null lease on " << *dn << dendl;
  }
}
3771
3772
3773 void Locker::revoke_client_leases(SimpleLock *lock)
3774 {
3775 int n = 0;
3776 CDentry *dn = static_cast<CDentry*>(lock->get_parent());
3777 for (map<client_t, ClientLease*>::iterator p = dn->client_lease_map.begin();
3778 p != dn->client_lease_map.end();
3779 ++p) {
3780 ClientLease *l = p->second;
3781
3782 n++;
3783 assert(lock->get_type() == CEPH_LOCK_DN);
3784
3785 CDentry *dn = static_cast<CDentry*>(lock->get_parent());
3786 int mask = 1 | CEPH_LOCK_DN; // old and new bits
3787
3788 // i should also revoke the dir ICONTENT lease, if they have it!
3789 CInode *diri = dn->get_dir()->get_inode();
3790 mds->send_message_client_counted(new MClientLease(CEPH_MDS_LEASE_REVOKE, l->seq,
3791 mask,
3792 diri->ino(),
3793 diri->first, CEPH_NOSNAP,
3794 dn->get_name()),
3795 l->client);
3796 }
3797 }
3798
3799
3800
3801 // locks ----------------------------------------------------------------
3802
/**
 * Map a (lock_type, object info) pair from a peer MDS's MLock message
 * back to the corresponding SimpleLock in our cache.
 *
 * @return the lock, or 0 if the referenced object is not in cache
 *         (e.g. it was trimmed).
 */
SimpleLock *Locker::get_lock(int lock_type, MDSCacheObjectInfo &info)
{
  switch (lock_type) {
  case CEPH_LOCK_DN:
    {
      // be careful; info.dirfrag may have incorrect frag; recalculate based on dname.
      CInode *diri = mdcache->get_inode(info.dirfrag.ino);
      frag_t fg;
      CDir *dir = 0;
      CDentry *dn = 0;
      if (diri) {
	fg = diri->pick_dirfrag(info.dname);
	dir = diri->get_dirfrag(fg);
	if (dir)
	  dn = dir->lookup(info.dname, info.snapid);
      }
      if (!dn) {
	dout(7) << "get_lock don't have dn " << info.dirfrag.ino << " " << info.dname << dendl;
	return 0;
      }
      return &dn->lock;
    }

  // all inode lock types share a single inode lookup, then map the
  // type to the matching lock member
  case CEPH_LOCK_IAUTH:
  case CEPH_LOCK_ILINK:
  case CEPH_LOCK_IDFT:
  case CEPH_LOCK_IFILE:
  case CEPH_LOCK_INEST:
  case CEPH_LOCK_IXATTR:
  case CEPH_LOCK_ISNAP:
  case CEPH_LOCK_IFLOCK:
  case CEPH_LOCK_IPOLICY:
    {
      CInode *in = mdcache->get_inode(info.ino, info.snapid);
      if (!in) {
	dout(7) << "get_lock don't have ino " << info.ino << dendl;
	return 0;
      }
      switch (lock_type) {
      case CEPH_LOCK_IAUTH: return &in->authlock;
      case CEPH_LOCK_ILINK: return &in->linklock;
      case CEPH_LOCK_IDFT: return &in->dirfragtreelock;
      case CEPH_LOCK_IFILE: return &in->filelock;
      case CEPH_LOCK_INEST: return &in->nestlock;
      case CEPH_LOCK_IXATTR: return &in->xattrlock;
      case CEPH_LOCK_ISNAP: return &in->snaplock;
      case CEPH_LOCK_IFLOCK: return &in->flocklock;
      case CEPH_LOCK_IPOLICY: return &in->policylock;
      }
    }

  default:
    dout(7) << "get_lock don't know lock_type " << lock_type << dendl;
    ceph_abort();
    break;
  }

  return 0;
}
3862
/* This function DOES put the passed message before returning */
void Locker::handle_lock(MLock *m)
{
  // nobody should be talking to us during recovery.
  assert(mds->is_rejoin() || mds->is_clientreplay() || mds->is_active() || mds->is_stopping());

  SimpleLock *lock = get_lock(m->get_lock_type(), m->get_object_info());
  if (!lock) {
    dout(10) << "don't have object " << m->get_object_info() << ", must have trimmed, dropping" << dendl;
    m->put();
    return;
  }

  // dispatch on lock type: simple locks vs scatter-type locks
  switch (lock->get_type()) {
  case CEPH_LOCK_DN:
  case CEPH_LOCK_IAUTH:
  case CEPH_LOCK_ILINK:
  case CEPH_LOCK_ISNAP:
  case CEPH_LOCK_IXATTR:
  case CEPH_LOCK_IFLOCK:
  case CEPH_LOCK_IPOLICY:
    handle_simple_lock(lock, m);
    break;

  case CEPH_LOCK_IDFT:
  case CEPH_LOCK_INEST:
    //handle_scatter_lock((ScatterLock*)lock, m);
    //break;
    // NOTE: IDFT/INEST deliberately fall through to the file-lock
    // (scatter) handler below.

  case CEPH_LOCK_IFILE:
    handle_file_lock(static_cast<ScatterLock*>(lock), m);
    break;

  default:
    dout(7) << "handle_lock got otype " << m->get_lock_type() << dendl;
    ceph_abort();
    break;
  }
}
3902
3903
3904
3905
3906
3907 // ==========================================================================
3908 // simple lock
3909
3910 /** This function may take a reference to m if it needs one, but does
3911 * not put references. */
3912 void Locker::handle_reqrdlock(SimpleLock *lock, MLock *m)
3913 {
3914 MDSCacheObject *parent = lock->get_parent();
3915 if (parent->is_auth() &&
3916 lock->get_state() != LOCK_SYNC &&
3917 !parent->is_frozen()) {
3918 dout(7) << "handle_reqrdlock got rdlock request on " << *lock
3919 << " on " << *parent << dendl;
3920 assert(parent->is_auth()); // replica auth pinned if they're doing this!
3921 if (lock->is_stable()) {
3922 simple_sync(lock);
3923 } else {
3924 dout(7) << "handle_reqrdlock delaying request until lock is stable" << dendl;
3925 lock->add_waiter(SimpleLock::WAIT_STABLE | MDSCacheObject::WAIT_UNFREEZE,
3926 new C_MDS_RetryMessage(mds, m->get()));
3927 }
3928 } else {
3929 dout(7) << "handle_reqrdlock dropping rdlock request on " << *lock
3930 << " on " << *parent << dendl;
3931 // replica should retry
3932 }
3933 }
3934
3935 /* This function DOES put the passed message before returning */
3936 void Locker::handle_simple_lock(SimpleLock *lock, MLock *m)
3937 {
3938 int from = m->get_asker();
3939
3940 dout(10) << "handle_simple_lock " << *m
3941 << " on " << *lock << " " << *lock->get_parent() << dendl;
3942
3943 if (mds->is_rejoin()) {
3944 if (lock->get_parent()->is_rejoining()) {
3945 dout(7) << "handle_simple_lock still rejoining " << *lock->get_parent()
3946 << ", dropping " << *m << dendl;
3947 m->put();
3948 return;
3949 }
3950 }
3951
3952 switch (m->get_action()) {
3953 // -- replica --
3954 case LOCK_AC_SYNC:
3955 assert(lock->get_state() == LOCK_LOCK);
3956 lock->decode_locked_state(m->get_data());
3957 lock->set_state(LOCK_SYNC);
3958 lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
3959 break;
3960
3961 case LOCK_AC_LOCK:
3962 assert(lock->get_state() == LOCK_SYNC);
3963 lock->set_state(LOCK_SYNC_LOCK);
3964 if (lock->is_leased())
3965 revoke_client_leases(lock);
3966 eval_gather(lock, true);
3967 if (lock->is_unstable_and_locked())
3968 mds->mdlog->flush();
3969 break;
3970
3971
3972 // -- auth --
3973 case LOCK_AC_LOCKACK:
3974 assert(lock->get_state() == LOCK_SYNC_LOCK ||
3975 lock->get_state() == LOCK_SYNC_EXCL);
3976 assert(lock->is_gathering(from));
3977 lock->remove_gather(from);
3978
3979 if (lock->is_gathering()) {
3980 dout(7) << "handle_simple_lock " << *lock << " on " << *lock->get_parent() << " from " << from
3981 << ", still gathering " << lock->get_gather_set() << dendl;
3982 } else {
3983 dout(7) << "handle_simple_lock " << *lock << " on " << *lock->get_parent() << " from " << from
3984 << ", last one" << dendl;
3985 eval_gather(lock);
3986 }
3987 break;
3988
3989 case LOCK_AC_REQRDLOCK:
3990 handle_reqrdlock(lock, m);
3991 break;
3992
3993 }
3994
3995 m->put();
3996 }
3997
3998 /* unused, currently.
3999
4000 class C_Locker_SimpleEval : public Context {
4001 Locker *locker;
4002 SimpleLock *lock;
4003 public:
4004 C_Locker_SimpleEval(Locker *l, SimpleLock *lk) : locker(l), lock(lk) {}
4005 void finish(int r) {
4006 locker->try_simple_eval(lock);
4007 }
4008 };
4009
4010 void Locker::try_simple_eval(SimpleLock *lock)
4011 {
4012 // unstable and ambiguous auth?
4013 if (!lock->is_stable() &&
4014 lock->get_parent()->is_ambiguous_auth()) {
4015 dout(7) << "simple_eval not stable and ambiguous auth, waiting on " << *lock->get_parent() << dendl;
4016 //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
4017 lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_SimpleEval(this, lock));
4018 return;
4019 }
4020
4021 if (!lock->get_parent()->is_auth()) {
4022 dout(7) << "try_simple_eval not auth for " << *lock->get_parent() << dendl;
4023 return;
4024 }
4025
4026 if (!lock->get_parent()->can_auth_pin()) {
4027 dout(7) << "try_simple_eval can't auth_pin, waiting on " << *lock->get_parent() << dendl;
4028 //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
4029 lock->get_parent()->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_SimpleEval(this, lock));
4030 return;
4031 }
4032
4033 if (lock->is_stable())
4034 simple_eval(lock);
4035 }
4036 */
4037
4038
4039 void Locker::simple_eval(SimpleLock *lock, bool *need_issue)
4040 {
4041 dout(10) << "simple_eval " << *lock << " on " << *lock->get_parent() << dendl;
4042
4043 assert(lock->get_parent()->is_auth());
4044 assert(lock->is_stable());
4045
4046 if (lock->get_parent()->is_freezing_or_frozen()) {
4047 // dentry lock in unreadable state can block path traverse
4048 if ((lock->get_type() != CEPH_LOCK_DN ||
4049 lock->get_state() == LOCK_SYNC ||
4050 lock->get_parent()->is_frozen()))
4051 return;
4052 }
4053
4054 if (mdcache->is_readonly()) {
4055 if (lock->get_state() != LOCK_SYNC) {
4056 dout(10) << "simple_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
4057 simple_sync(lock, need_issue);
4058 }
4059 return;
4060 }
4061
4062 CInode *in = 0;
4063 int wanted = 0;
4064 if (lock->get_type() != CEPH_LOCK_DN) {
4065 in = static_cast<CInode*>(lock->get_parent());
4066 in->get_caps_wanted(&wanted, NULL, lock->get_cap_shift());
4067 }
4068
4069 // -> excl?
4070 if (lock->get_state() != LOCK_EXCL &&
4071 in && in->get_target_loner() >= 0 &&
4072 (wanted & CEPH_CAP_GEXCL)) {
4073 dout(7) << "simple_eval stable, going to excl " << *lock
4074 << " on " << *lock->get_parent() << dendl;
4075 simple_excl(lock, need_issue);
4076 }
4077
4078 // stable -> sync?
4079 else if (lock->get_state() != LOCK_SYNC &&
4080 !lock->is_wrlocked() &&
4081 ((!(wanted & CEPH_CAP_GEXCL) && !lock->is_waiter_for(SimpleLock::WAIT_WR)) ||
4082 (lock->get_state() == LOCK_EXCL && in && in->get_target_loner() < 0))) {
4083 dout(7) << "simple_eval stable, syncing " << *lock
4084 << " on " << *lock->get_parent() << dendl;
4085 simple_sync(lock, need_issue);
4086 }
4087 }
4088
4089
4090 // mid
4091
// Move an auth lock toward LOCK_SYNC.
//
// Returns true if the lock reached LOCK_SYNC synchronously; returns
// false if a gather was started (wrlocks draining, replica acks, cap
// revocation, or file recovery) or a scatterlock writebehind flush was
// kicked off, in which case the transition completes later via
// eval_gather().
//
// If need_issue is non-null, a required cap (re)issue is signalled via
// *need_issue instead of calling issue_caps() directly, so the caller
// can batch it.
bool Locker::simple_sync(SimpleLock *lock, bool *need_issue)
{
  dout(7) << "simple_sync on " << *lock << " on " << *lock->get_parent() << dendl;
  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  // locks with a cap shift live on an inode; we may need to touch caps
  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  int old_state = lock->get_state();

  if (old_state != LOCK_TSYN) {

    // enter the matching intermediate *_SYNC state
    switch (lock->get_state()) {
    case LOCK_MIX: lock->set_state(LOCK_MIX_SYNC); break;
    case LOCK_LOCK: lock->set_state(LOCK_LOCK_SYNC); break;
    case LOCK_XSYN: lock->set_state(LOCK_XSYN_SYNC); break;
    case LOCK_EXCL: lock->set_state(LOCK_EXCL_SYNC); break;
    default: ceph_abort();
    }

    // count outstanding things we must wait for before completing
    int gather = 0;
    if (lock->is_wrlocked())
      gather++;

    // leaving MIX: replicas hold scattered state; gather it back
    if (lock->get_parent()->is_replicated() && old_state == LOCK_MIX) {
      send_lock_message(lock, LOCK_AC_SYNC);
      lock->init_gather();
      gather++;
    }

    // revoke client caps that conflict with SYNC
    if (in && in->is_head()) {
      if (in->issued_caps_need_gather(lock)) {
	if (need_issue)
	  *need_issue = true;
	else
	  issue_caps(in);
	gather++;
      }
    }

    // a file needing recovery must be recovered before going readable
    bool need_recover = false;
    if (lock->get_type() == CEPH_LOCK_IFILE) {
      assert(in);
      if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
	mds->mdcache->queue_file_recover(in);
	need_recover = true;
	gather++;
      }
    }

    // nothing to gather but dirty scattered data: flush it back to the
    // inode first (writebehind); eval_gather will re-drive us after.
    if (!gather && lock->is_dirty()) {
      lock->get_parent()->auth_pin(lock);
      scatter_writebehind(static_cast<ScatterLock*>(lock));
      mds->mdlog->flush();
      return false;
    }

    if (gather) {
      // pin while in the unstable intermediate state
      lock->get_parent()->auth_pin(lock);
      if (need_recover)
	mds->mdcache->do_file_recover();
      return false;
    }
  }

  // done gathering (or came from TSYN): complete the transition
  if (lock->get_parent()->is_replicated()) { // FIXME
    bufferlist data;
    lock->encode_locked_state(data);
    send_lock_message(lock, LOCK_AC_SYNC, data);
  }
  lock->set_state(LOCK_SYNC);
  lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
  if (in && in->is_head()) {
    if (need_issue)
      *need_issue = true;
    else
      issue_caps(in);
  }
  return true;
}
4174
// Move an auth lock toward LOCK_EXCL (loner-exclusive).
//
// If anything must drain first (rdlocks/wrlocks, replica acks, cap
// revocations) an intermediate *_EXCL state is entered and the parent
// is auth-pinned until eval_gather() completes the transition;
// otherwise the lock goes straight to LOCK_EXCL and waiters are woken.
// need_issue (if non-null) defers cap issuing to the caller.
void Locker::simple_excl(SimpleLock *lock, bool *need_issue)
{
  dout(7) << "simple_excl on " << *lock << " on " << *lock->get_parent() << dendl;
  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  // locks with a cap shift live on an inode; we may need to touch caps
  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  // enter the matching intermediate *_EXCL state
  switch (lock->get_state()) {
  case LOCK_LOCK: lock->set_state(LOCK_LOCK_EXCL); break;
  case LOCK_SYNC: lock->set_state(LOCK_SYNC_EXCL); break;
  case LOCK_XSYN: lock->set_state(LOCK_XSYN_EXCL); break;
  default: ceph_abort();
  }

  // count outstanding things we must wait for
  int gather = 0;
  if (lock->is_rdlocked())
    gather++;
  if (lock->is_wrlocked())
    gather++;

  // from LOCK/XSYN the replicas are already in LOCK state and need no
  // revocation; otherwise tell them to lock and gather their acks.
  if (lock->get_parent()->is_replicated() &&
      lock->get_state() != LOCK_LOCK_EXCL &&
      lock->get_state() != LOCK_XSYN_EXCL) {
    send_lock_message(lock, LOCK_AC_LOCK);
    lock->init_gather();
    gather++;
  }

  // revoke client caps that conflict with EXCL
  if (in && in->is_head()) {
    if (in->issued_caps_need_gather(lock)) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
      gather++;
    }
  }

  if (gather) {
    // pin while unstable; eval_gather() finishes the transition
    lock->get_parent()->auth_pin(lock);
  } else {
    lock->set_state(LOCK_EXCL);
    lock->finish_waiters(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE);
    if (in) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
    }
  }
}
4229
// Move an auth lock toward LOCK_LOCK (exclusively held by the MDS,
// e.g. in preparation for an xlock or a scatter-gather).
//
// Leases, rdlocks, conflicting client caps, replica state and pending
// file recovery are all gathered first via an intermediate *_LOCK
// state.  The MIX->LOCK path is two-stage when replicated: the local
// gather (MIX_LOCK) runs before replicas are contacted (MIX_LOCK2), so
// the LOCK_AC_LOCK message is deferred until the local stage is done.
// need_issue (if non-null) defers cap issuing to the caller.
void Locker::simple_lock(SimpleLock *lock, bool *need_issue)
{
  dout(7) << "simple_lock on " << *lock << " on " << *lock->get_parent() << dendl;
  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());
  assert(lock->get_state() != LOCK_LOCK);

  // locks with a cap shift live on an inode; we may need to touch caps
  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  int old_state = lock->get_state();

  // enter the matching intermediate *_LOCK state
  switch (lock->get_state()) {
  case LOCK_SYNC: lock->set_state(LOCK_SYNC_LOCK); break;
  case LOCK_XSYN: lock->set_state(LOCK_XSYN_LOCK); break;
  case LOCK_EXCL: lock->set_state(LOCK_EXCL_LOCK); break;
  case LOCK_MIX: lock->set_state(LOCK_MIX_LOCK);
    // we're unscattering now; drop any stale unscatter request
    (static_cast<ScatterLock *>(lock))->clear_unscatter_wanted();
    break;
  case LOCK_TSYN: lock->set_state(LOCK_TSYN_LOCK); break;
  default: ceph_abort();
  }

  // count outstanding things we must wait for
  int gather = 0;
  if (lock->is_leased()) {
    gather++;
    revoke_client_leases(lock);
  }
  if (lock->is_rdlocked())
    gather++;
  if (in && in->is_head()) {
    if (in->issued_caps_need_gather(lock)) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
      gather++;
    }
  }

  // a file needing recovery must be recovered before we can lock
  bool need_recover = false;
  if (lock->get_type() == CEPH_LOCK_IFILE) {
    assert(in);
    if(in->state_test(CInode::STATE_NEEDSRECOVER)) {
      mds->mdcache->queue_file_recover(in);
      need_recover = true;
      gather++;
    }
  }

  if (lock->get_parent()->is_replicated() &&
      lock->get_state() == LOCK_MIX_LOCK &&
      gather) {
    // stage one: finish the local gather before asking replicas
    dout(10) << " doing local stage of mix->lock gather before gathering from replicas" << dendl;
  } else {
    // move to second stage of gather now, so we don't send the lock action later.
    if (lock->get_state() == LOCK_MIX_LOCK)
      lock->set_state(LOCK_MIX_LOCK2);

    if (lock->get_parent()->is_replicated() &&
	lock->get_sm()->states[old_state].replica_state != LOCK_LOCK) { // replica may already be LOCK
      gather++;
      send_lock_message(lock, LOCK_AC_LOCK);
      lock->init_gather();
    }
  }

  // nothing to gather but dirty scattered data: flush it back to the
  // inode first; eval_gather re-drives the transition afterwards.
  if (!gather && lock->is_dirty()) {
    lock->get_parent()->auth_pin(lock);
    scatter_writebehind(static_cast<ScatterLock*>(lock));
    mds->mdlog->flush();
    return;
  }

  if (gather) {
    // pin while unstable; eval_gather() finishes the transition
    lock->get_parent()->auth_pin(lock);
    if (need_recover)
      mds->mdcache->do_file_recover();
  } else {
    lock->set_state(LOCK_LOCK);
    lock->finish_waiters(ScatterLock::WAIT_XLOCK|ScatterLock::WAIT_WR|ScatterLock::WAIT_STABLE);
  }
}
4314
4315
// Move an auth lock from LOCK/XLOCKDONE toward PREXLOCK so an xlock can
// be taken.  Remaining rdlocks/wrlocks and conflicting client caps are
// drained via the LOCK_LOCK_XLOCK intermediate state; with nothing to
// gather the lock goes straight to LOCK_PREXLOCK.
void Locker::simple_xlock(SimpleLock *lock)
{
  dout(7) << "simple_xlock on " << *lock << " on " << *lock->get_parent() << dendl;
  assert(lock->get_parent()->is_auth());
  //assert(lock->is_stable());
  assert(lock->get_state() != LOCK_XLOCK);

  // locks with a cap shift live on an inode; we may need to touch caps
  CInode *in = 0;
  if (lock->get_cap_shift())
    in = static_cast<CInode *>(lock->get_parent());

  // pin only when leaving a stable state (unstable states are already pinned)
  if (lock->is_stable())
    lock->get_parent()->auth_pin(lock);

  switch (lock->get_state()) {
  case LOCK_LOCK:
  case LOCK_XLOCKDONE: lock->set_state(LOCK_LOCK_XLOCK); break;
  default: ceph_abort();
  }

  // count outstanding things we must wait for
  int gather = 0;
  if (lock->is_rdlocked())
    gather++;
  if (lock->is_wrlocked())
    gather++;

  // revoke client caps that conflict with the xlock
  if (in && in->is_head()) {
    if (in->issued_caps_need_gather(lock)) {
      issue_caps(in);
      gather++;
    }
  }

  if (!gather) {
    lock->set_state(LOCK_PREXLOCK);
    //assert("shouldn't be called if we are already xlockable" == 0);
  }
}
4354
4355
4356
4357
4358
4359 // ==========================================================================
4360 // scatter lock
4361
4362 /*
4363
4364 Some notes on scatterlocks.
4365
4366 - The scatter/gather is driven by the inode lock. The scatter always
4367 brings in the latest metadata from the fragments.
4368
4369 - When in a scattered/MIX state, fragments are only allowed to
4370 update/be written to if the accounted stat matches the inode's
4371 current version.
4372
4373 - That means, on gather, we _only_ assimilate diffs for frag metadata
4374 that match the current version, because those are the only ones
4375 written during this scatter/gather cycle. (Others didn't permit
4376 it.) We increment the version and journal this to disk.
4377
4378 - When possible, we also simultaneously update our local frag
4379 accounted stats to match.
4380
4381 - On scatter, the new inode info is broadcast to frags, both local
4382 and remote. If possible (auth and !frozen), the dirfrag auth
4383 should update the accounted state (if it isn't already up to date).
4384 Note that this may occur on both the local inode auth node and
4385 inode replicas, so there are two potential paths. If it is NOT
4386 possible, they need to mark_stale to prevent any possible writes.
4387
4388 - A scatter can be to MIX (potentially writeable) or to SYNC (read
4389 only). Both are opportunities to update the frag accounted stats,
4390 even though only the MIX case is affected by a stale dirfrag.
4391
4392 - Because many scatter/gather cycles can potentially go by without a
4393 frag being able to update its accounted stats (due to being frozen
4394 by exports/refragments in progress), the frag may have (even very)
4395 old stat versions. That's fine. If when we do want to update it,
4396 we can update accounted_* and the version first.
4397
4398 */
4399
4400 class C_Locker_ScatterWB : public LockerLogContext {
4401 ScatterLock *lock;
4402 MutationRef mut;
4403 public:
4404 C_Locker_ScatterWB(Locker *l, ScatterLock *sl, MutationRef& m) :
4405 LockerLogContext(l), lock(sl), mut(m) {}
4406 void finish(int r) override {
4407 locker->scatter_writebehind_finish(lock, mut);
4408 }
4409 };
4410
// Flush dirty scattered state back into the auth inode via the journal.
// Projects a new inode version, performs the scatter-gather update, and
// journals an EUpdate; C_Locker_ScatterWB completes the flush (and
// releases the forced wrlock) when the entry commits.
void Locker::scatter_writebehind(ScatterLock *lock)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  dout(10) << "scatter_writebehind " << in->inode.mtime << " on " << *lock << " on " << *in << dendl;

  // journal
  MutationRef mut(new MutationImpl());
  mut->ls = mds->mdlog->get_current_segment();

  // forcefully take a wrlock
  lock->get_wrlock(true);
  mut->wrlocks.insert(lock);
  mut->locks.insert(lock);

  in->pre_cow_old_inode(); // avoid cow mayhem

  auto &pi = in->project_inode();
  pi.inode.version = in->pre_dirty();

  // fold the gathered dirfrag stats into the projected inode and mark
  // the lock as flushing
  in->finish_scatter_gather_update(lock->get_type());
  lock->start_flush();

  EUpdate *le = new EUpdate(mds->mdlog, "scatter_writebehind");
  mds->mdlog->start_entry(le);

  mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY);
  mdcache->journal_dirty_inode(mut.get(), &le->metablob, in);
  
  in->finish_scatter_gather_update_accounted(lock->get_type(), mut, &le->metablob);

  // completion runs scatter_writebehind_finish() after the log commit
  mds->mdlog->submit_entry(le, new C_Locker_ScatterWB(this, lock, mut));
}
4443
// Journal-commit completion for scatter_writebehind(): make the
// projected inode update durable in cache, finish the lock's flush
// state, notify replicas where needed, and drop the forced wrlock.
void Locker::scatter_writebehind_finish(ScatterLock *lock, MutationRef& mut)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  dout(10) << "scatter_writebehind_finish on " << *lock << " on " << *in << dendl;
  in->pop_and_dirty_projected_inode(mut->ls);

  lock->finish_flush();

  // if replicas may have flushed in a mix->lock state, send another
  // message so they can finish_flush().
  if (in->is_replicated()) {
    switch (lock->get_state()) {
    case LOCK_MIX_LOCK:
    case LOCK_MIX_LOCK2:
    case LOCK_MIX_EXCL:
    case LOCK_MIX_TSYN:
      send_lock_message(lock, LOCK_AC_LOCKFLUSHED);
      // other states: no notification needed
    }
  }

  // apply and release the mutation (including the forced wrlock)
  mut->apply();
  drop_locks(mut.get());
  mut->cleanup();

  if (lock->is_stable())
    lock->finish_waiters(ScatterLock::WAIT_STABLE);

  //scatter_eval_gather(lock);
}
4473
4474 void Locker::scatter_eval(ScatterLock *lock, bool *need_issue)
4475 {
4476 dout(10) << "scatter_eval " << *lock << " on " << *lock->get_parent() << dendl;
4477
4478 assert(lock->get_parent()->is_auth());
4479 assert(lock->is_stable());
4480
4481 if (lock->get_parent()->is_freezing_or_frozen()) {
4482 dout(20) << " freezing|frozen" << dendl;
4483 return;
4484 }
4485
4486 if (mdcache->is_readonly()) {
4487 if (lock->get_state() != LOCK_SYNC) {
4488 dout(10) << "scatter_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
4489 simple_sync(lock, need_issue);
4490 }
4491 return;
4492 }
4493
4494 if (!lock->is_rdlocked() &&
4495 lock->get_state() != LOCK_MIX &&
4496 lock->get_scatter_wanted()) {
4497 dout(10) << "scatter_eval scatter_wanted, bump to mix " << *lock
4498 << " on " << *lock->get_parent() << dendl;
4499 scatter_mix(lock, need_issue);
4500 return;
4501 }
4502
4503 if (lock->get_type() == CEPH_LOCK_INEST) {
4504 // in general, we want to keep INEST writable at all times.
4505 if (!lock->is_rdlocked()) {
4506 if (lock->get_parent()->is_replicated()) {
4507 if (lock->get_state() != LOCK_MIX)
4508 scatter_mix(lock, need_issue);
4509 } else {
4510 if (lock->get_state() != LOCK_LOCK)
4511 simple_lock(lock, need_issue);
4512 }
4513 }
4514 return;
4515 }
4516
4517 CInode *in = static_cast<CInode*>(lock->get_parent());
4518 if (!in->has_subtree_or_exporting_dirfrag() || in->is_base()) {
4519 // i _should_ be sync.
4520 if (!lock->is_wrlocked() &&
4521 lock->get_state() != LOCK_SYNC) {
4522 dout(10) << "scatter_eval no wrlocks|xlocks, not subtree root inode, syncing" << dendl;
4523 simple_sync(lock, need_issue);
4524 }
4525 }
4526 }
4527
4528
4529 /*
4530 * mark a scatterlock to indicate that the dir fnode has some dirty data
4531 */
4532 void Locker::mark_updated_scatterlock(ScatterLock *lock)
4533 {
4534 lock->mark_dirty();
4535 if (lock->get_updated_item()->is_on_list()) {
4536 dout(10) << "mark_updated_scatterlock " << *lock
4537 << " - already on list since " << lock->get_update_stamp() << dendl;
4538 } else {
4539 updated_scatterlocks.push_back(lock->get_updated_item());
4540 utime_t now = ceph_clock_now();
4541 lock->set_update_stamp(now);
4542 dout(10) << "mark_updated_scatterlock " << *lock
4543 << " - added at " << now << dendl;
4544 }
4545 }
4546
4547 /*
4548 * this is called by scatter_tick and LogSegment::try_to_trim() when
4549 * trying to flush dirty scattered data (i.e. updated fnode) back to
4550 * the inode.
4551 *
4552 * we need to lock|scatter in order to push fnode changes into the
4553 * inode.dirstat.
4554 */
// Nudge a scatterlock's state so its dirty scattered data can be
// flushed back to the inode (see comment above).  On the auth we cycle
// the lock between MIX/LOCK/SYNC until the flush happens; on a replica
// we ask the auth to do it.  c (if non-null) is queued to run once the
// lock is stable; without c, dirty locks are simply requeued on the
// updated_scatterlocks list for a later tick.
void Locker::scatter_nudge(ScatterLock *lock, MDSInternalContextBase *c, bool forcelockchange)
{
  CInode *p = static_cast<CInode *>(lock->get_parent());

  // can't change lock state while freezing/frozen; wait or requeue
  if (p->is_frozen() || p->is_freezing()) {
    dout(10) << "scatter_nudge waiting for unfreeze on " << *p << dendl;
    if (c)
      p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, c);
    else if (lock->is_dirty())
      // just requeue. not ideal.. starvation prone..
      updated_scatterlocks.push_back(lock->get_updated_item());
    return;
  }

  // same while auth is ambiguous (mid-migration)
  if (p->is_ambiguous_auth()) {
    dout(10) << "scatter_nudge waiting for single auth on " << *p << dendl;
    if (c)
      p->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, c);
    else if (lock->is_dirty())
      // just requeue. not ideal.. starvation prone..
      updated_scatterlocks.push_back(lock->get_updated_item());
    return;
  }

  if (p->is_auth()) {
    int count = 0;
    while (true) {
      if (lock->is_stable()) {
	// can we do it now?
	//  (only if we're not replicated.. if we are, we really do need
	//   to nudge the lock state!)
	/*
	  actually, even if we're not replicated, we can't stay in MIX, because another mds
	  could discover and replicate us at any time.  if that happens while we're flushing,
	  they end up in MIX but their inode has the old scatterstat version.

	if (!forcelockchange && !lock->get_parent()->is_replicated() && lock->can_wrlock(-1)) {
	  dout(10) << "scatter_nudge auth, propagating " << *lock << " on " << *p << dendl;
	  scatter_writebehind(lock);
	  if (c)
	    lock->add_waiter(SimpleLock::WAIT_STABLE, c);
	  return;
	}
	*/

	// read-only fs: only a sync is permitted
	if (mdcache->is_readonly()) {
	  if (lock->get_state() != LOCK_SYNC) {
	    dout(10) << "scatter_nudge auth, read-only FS, syncing " << *lock << " on " << *p << dendl;
	    simple_sync(static_cast<ScatterLock*>(lock));
	  }
	  break;
	}

	// adjust lock state; any transition out of the current state
	// (via simple_lock/simple_sync/scatter_mix) triggers the flush
	// of dirty data as a side effect
	dout(10) << "scatter_nudge auth, scatter/unscattering " << *lock << " on " << *p << dendl;
	switch (lock->get_type()) {
	case CEPH_LOCK_IFILE:
	  if (p->is_replicated() && lock->get_state() != LOCK_MIX)
	    scatter_mix(static_cast<ScatterLock*>(lock));
	  else if (lock->get_state() != LOCK_LOCK)
	    simple_lock(static_cast<ScatterLock*>(lock));
	  else
	    simple_sync(static_cast<ScatterLock*>(lock));
	  break;
	  
	case CEPH_LOCK_IDFT:
	case CEPH_LOCK_INEST:
	  if (p->is_replicated() && lock->get_state() != LOCK_MIX)
	    scatter_mix(lock);
	  else if (lock->get_state() != LOCK_LOCK)
	    simple_lock(lock);
	  else
	    simple_sync(lock);
	  break;
	default:
	  ceph_abort();
	}
	++count;
	if (lock->is_stable() && count == 2) {
	  dout(10) << "scatter_nudge oh, stable after two cycles." << dendl;
	  // this should only really happen when called via
	  // handle_file_lock due to AC_NUDGE, because the rest of the
	  // time we are replicated or have dirty data and won't get
	  // called.  bailing here avoids an infinite loop.
	  assert(!c); 
	  break;
	}
      } else {
	dout(10) << "scatter_nudge auth, waiting for stable " << *lock << " on " << *p << dendl;
	if (c) 
	  lock->add_waiter(SimpleLock::WAIT_STABLE, c);
	return;
      }
    }
  } else {
    dout(10) << "scatter_nudge replica, requesting scatter/unscatter of " 
	     << *lock << " on " << *p << dendl;
    // request unscatter?
    mds_rank_t auth = lock->get_parent()->authority().first;
    if (!mds->is_cluster_degraded() ||
	mds->mdsmap->is_clientreplay_or_active_or_stopping(auth))
      mds->send_message_mds(new MLock(lock, LOCK_AC_NUDGE, mds->get_nodeid()), auth);

    // wait...
    if (c)
      lock->add_waiter(SimpleLock::WAIT_STABLE, c);

    // also, requeue, in case we had wrong auth or something
    if (lock->is_dirty())
      updated_scatterlocks.push_back(lock->get_updated_item());
  }
}
4667
4668 void Locker::scatter_tick()
4669 {
4670 dout(10) << "scatter_tick" << dendl;
4671
4672 // updated
4673 utime_t now = ceph_clock_now();
4674 int n = updated_scatterlocks.size();
4675 while (!updated_scatterlocks.empty()) {
4676 ScatterLock *lock = updated_scatterlocks.front();
4677
4678 if (n-- == 0) break; // scatter_nudge() may requeue; avoid looping
4679
4680 if (!lock->is_dirty()) {
4681 updated_scatterlocks.pop_front();
4682 dout(10) << " removing from updated_scatterlocks "
4683 << *lock << " " << *lock->get_parent() << dendl;
4684 continue;
4685 }
4686 if (now - lock->get_update_stamp() < g_conf->mds_scatter_nudge_interval)
4687 break;
4688 updated_scatterlocks.pop_front();
4689 scatter_nudge(lock, 0);
4690 }
4691 mds->mdlog->flush();
4692 }
4693
4694
// Move a scatterlock toward LOCK_TSYN (temporarily readable on the
// auth only).
//
// NOTE: this path is disabled -- the assert below fires unconditionally
// ("not fully implemented, at least not for filelock"), so the code
// after it is effectively unreachable and kept for reference.
void Locker::scatter_tempsync(ScatterLock *lock, bool *need_issue)
{
  dout(10) << "scatter_tempsync " << *lock
	   << " on " << *lock->get_parent() << dendl;
  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  assert(0 == "not fully implemented, at least not for filelock");

  CInode *in = static_cast<CInode *>(lock->get_parent());

  // enter the matching intermediate *_TSYN state
  switch (lock->get_state()) {
  case LOCK_SYNC: ceph_abort();   // this shouldn't happen
  case LOCK_LOCK: lock->set_state(LOCK_LOCK_TSYN); break;
  case LOCK_MIX: lock->set_state(LOCK_MIX_TSYN); break;
  default: ceph_abort();
  }

  // count outstanding things we must wait for
  int gather = 0;
  if (lock->is_wrlocked())
    gather++;

  // revoke client caps that conflict with TSYN
  if (lock->get_cap_shift() &&
      in->is_head() &&
      in->issued_caps_need_gather(lock)) {
    if (need_issue)
      *need_issue = true;
    else
      issue_caps(in);
    gather++;
  }

  // leaving MIX: gather scattered state back from replicas
  if (lock->get_state() == LOCK_MIX_TSYN &&
      in->is_replicated()) {
    lock->init_gather();
    send_lock_message(lock, LOCK_AC_LOCK);
    gather++;
  }

  if (gather) {
    // pin while unstable; eval_gather() finishes the transition
    in->auth_pin(lock);
  } else {
    // do tempsync
    lock->set_state(LOCK_TSYN);
    lock->finish_waiters(ScatterLock::WAIT_RD|ScatterLock::WAIT_STABLE);
    if (lock->get_cap_shift()) {
      if (need_issue)
	*need_issue = true;
      else
	issue_caps(in);
    }
  }
}
4748
4749
4750
4751 // ==========================================================================
4752 // local lock
4753
// Take a wrlock on a local lock that is already known to be wrlockable
// (asserted); unlike local_wrlock_start() this can neither fail nor
// queue a waiter.  The lock is recorded in the mutation's lock sets.
void Locker::local_wrlock_grab(LocalLock *lock, MutationRef& mut)
{
  dout(7) << "local_wrlock_grab  on " << *lock
	  << " on " << *lock->get_parent() << dendl;  
  
  assert(lock->get_parent()->is_auth());
  assert(lock->can_wrlock());
  assert(!mut->wrlocks.count(lock));
  lock->get_wrlock(mut->get_client());
  mut->wrlocks.insert(lock);
  mut->locks.insert(lock);
}
4766
4767 bool Locker::local_wrlock_start(LocalLock *lock, MDRequestRef& mut)
4768 {
4769 dout(7) << "local_wrlock_start on " << *lock
4770 << " on " << *lock->get_parent() << dendl;
4771
4772 assert(lock->get_parent()->is_auth());
4773 if (lock->can_wrlock()) {
4774 assert(!mut->wrlocks.count(lock));
4775 lock->get_wrlock(mut->get_client());
4776 mut->wrlocks.insert(lock);
4777 mut->locks.insert(lock);
4778 return true;
4779 } else {
4780 lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
4781 return false;
4782 }
4783 }
4784
// Release a wrlock on a local lock and remove it from the mutation's
// lock sets; when the last wrlock drops, wake anything waiting to
// rd/wr/xlock it.
void Locker::local_wrlock_finish(LocalLock *lock, MutationImpl *mut)
{
  dout(7) << "local_wrlock_finish  on " << *lock
	  << " on " << *lock->get_parent() << dendl;  
  lock->put_wrlock();
  mut->wrlocks.erase(lock);
  mut->locks.erase(lock);
  if (lock->get_num_wrlocks() == 0) {
    lock->finish_waiters(SimpleLock::WAIT_STABLE |
                         SimpleLock::WAIT_WR |
                         SimpleLock::WAIT_RD);
  }
}
4798
4799 bool Locker::local_xlock_start(LocalLock *lock, MDRequestRef& mut)
4800 {
4801 dout(7) << "local_xlock_start on " << *lock
4802 << " on " << *lock->get_parent() << dendl;
4803
4804 assert(lock->get_parent()->is_auth());
4805 if (!lock->can_xlock_local()) {
4806 lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
4807 return false;
4808 }
4809
4810 lock->get_xlock(mut, mut->get_client());
4811 mut->xlocks.insert(lock);
4812 mut->locks.insert(lock);
4813 return true;
4814 }
4815
// Release an xlock on a local lock, remove it from the mutation's lock
// sets, and wake any waiters.
void Locker::local_xlock_finish(LocalLock *lock, MutationImpl *mut)
{
  dout(7) << "local_xlock_finish  on " << *lock
	  << " on " << *lock->get_parent() << dendl;  
  lock->put_xlock();
  mut->xlocks.erase(lock);
  mut->locks.erase(lock);

  lock->finish_waiters(SimpleLock::WAIT_STABLE | 
		       SimpleLock::WAIT_WR | 
		       SimpleLock::WAIT_RD);
}
4828
4829
4830
4831 // ==========================================================================
4832 // file lock
4833
4834
// Re-evaluate a stable filelock on the auth MDS based on the caps
// clients hold/want: EXCL for a single loner writer, MIX when multiple
// clients (or no loner) want to write, SYNC when only readers remain.
// The branch order below encodes that priority; need_issue (if
// non-null) defers cap issuing to the caller.
void Locker::file_eval(ScatterLock *lock, bool *need_issue)
{
  CInode *in = static_cast<CInode*>(lock->get_parent());
  int loner_wanted, other_wanted;
  int wanted = in->get_caps_wanted(&loner_wanted, &other_wanted, CEPH_CAP_SFILE);
  dout(7) << "file_eval wanted=" << gcap_string(wanted)
	  << " loner_wanted=" << gcap_string(loner_wanted)
	  << " other_wanted=" << gcap_string(other_wanted)
	  << "  filelock=" << *lock << " on " << *lock->get_parent()
	  << dendl;

  assert(lock->get_parent()->is_auth());
  assert(lock->is_stable());

  if (lock->get_parent()->is_freezing_or_frozen())
    return;

  // a read-only fs keeps everything pinned at SYNC
  if (mdcache->is_readonly()) {
    if (lock->get_state() != LOCK_SYNC) {
      dout(10) << "file_eval read-only FS, syncing " << *lock << " on " << *lock->get_parent() << dendl;
      simple_sync(lock, need_issue);
    }
    return;
  }

  // excl -> *?
  if (lock->get_state() == LOCK_EXCL) {
    dout(20) << " is excl" << dendl;
    int loner_issued, other_issued, xlocker_issued;
    in->get_caps_issued(&loner_issued, &other_issued, &xlocker_issued, CEPH_CAP_SFILE);
    dout(7) << "file_eval loner_issued=" << gcap_string(loner_issued)
            << " other_issued=" << gcap_string(other_issued)
	    << " xlocker_issued=" << gcap_string(xlocker_issued)
	    << dendl;
    // leave EXCL if the loner no longer justifies it: it neither wants
    // nor holds write-ish caps, someone else wants caps, or (for dirs)
    // multiple clients hold caps.
    if (!((loner_wanted|loner_issued) & (CEPH_CAP_GEXCL|CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) ||
	(other_wanted & (CEPH_CAP_GEXCL|CEPH_CAP_GWR|CEPH_CAP_GRD)) ||
	(in->inode.is_dir() && in->multiple_nonstale_caps())) {  // FIXME.. :/
      dout(20) << " should lose it" << dendl;
      // we should lose it.
      //  loner  other   want
      //  R      R       SYNC
      //  R      R|W     MIX
      //  R      W       MIX
      //  R|W    R       MIX
      //  R|W    R|W     MIX
      //  R|W    W       MIX
      //  W      R       MIX
      //  W      R|W     MIX
      //  W      W       MIX
      // -> any writer means MIX; RD doesn't matter.
      if (((other_wanted|loner_wanted) & CEPH_CAP_GWR) ||
	  lock->is_waiter_for(SimpleLock::WAIT_WR))
	scatter_mix(lock, need_issue);
      else if (!lock->is_wrlocked())   // let excl wrlocks drain first
	simple_sync(lock, need_issue);
      else
	dout(10) << " waiting for wrlock to drain" << dendl;
    }    
  }

  // * -> excl?
  else if (lock->get_state() != LOCK_EXCL &&
	   !lock->is_rdlocked() &&
	   //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
	   ((wanted & (CEPH_CAP_GWR|CEPH_CAP_GBUFFER)) ||
	    (in->inode.is_dir() && !in->has_subtree_or_exporting_dirfrag())) &&
	   in->get_target_loner() >= 0) {
    dout(7) << "file_eval stable, bump to loner " << *lock
	    << " on " << *lock->get_parent() << dendl;
    file_excl(lock, need_issue);
  }

  // * -> mixed?
  else if (lock->get_state() != LOCK_MIX &&
	   !lock->is_rdlocked() &&
	   //!lock->is_waiter_for(SimpleLock::WAIT_WR) &&
	   (lock->get_scatter_wanted() ||
	    (in->get_target_loner() < 0 && (wanted & CEPH_CAP_GWR)))) {
    dout(7) << "file_eval stable, bump to mixed " << *lock
	    << " on " << *lock->get_parent() << dendl;
    scatter_mix(lock, need_issue);
  }
  
  // * -> sync?
  else if (lock->get_state() != LOCK_SYNC &&
	   !lock->is_wrlocked() &&   // drain wrlocks first!
	   !lock->is_waiter_for(SimpleLock::WAIT_WR) &&
	   !(wanted & CEPH_CAP_GWR) &&
	   !((lock->get_state() == LOCK_MIX) &&
	     in->is_dir() && in->has_subtree_or_exporting_dirfrag())  // if we are a delegation point, stay where we are
	   //((wanted & CEPH_CAP_RD) || 
	   //in->is_replicated() || 
	   //lock->is_leased() ||
	   //(!loner && lock->get_state() == LOCK_EXCL)) &&
	   ) {
    dout(7) << "file_eval stable, bump to sync " << *lock 
	    << " on " << *lock->get_parent() << dendl;
    simple_sync(lock, need_issue);
  }
}
4935
4936
4937
// Move an auth-side, stable scatter lock toward the MIX (scattered) state.
//
// @param lock        inode scatter lock to transition; its parent must be
//                    auth and the lock stable (both asserted below)
// @param need_issue  if non-null, set *need_issue=true instead of calling
//                    issue_caps() inline, letting the caller batch cap
//                    issuance; if null, caps are issued immediately here
//
// From LOCK_LOCK the transition is immediate (replicas are notified with
// LOCK_AC_MIX carrying the encoded lock state).  From any other stable
// state we enter the matching *_MIX intermediate state and count a
// 'gather' of things that must drain first: rdlocks, replica acks,
// client leases, caps that need revoking, and pending file recovery.
// If gather > 0 the parent is auth-pinned and eval_gather() finishes the
// transition later; otherwise we complete it right here.
4938 void Locker::scatter_mix(ScatterLock *lock, bool *need_issue)
4939 {
4940 dout(7) << "scatter_mix " << *lock << " on " << *lock->get_parent() << dendl;
4941
4942 CInode *in = static_cast<CInode*>(lock->get_parent());
4943 assert(in->is_auth());
4944 assert(lock->is_stable());
4945
4946 if (lock->get_state() == LOCK_LOCK) {
// Fast path: LOCK -> MIX needs no gather; replicas are already LOCK.
4947 in->start_scatter(lock);
4948 if (in->is_replicated()) {
4949 // data
4950 bufferlist softdata;
4951 lock->encode_locked_state(softdata);
4952
4953 // bcast to replicas
4954 send_lock_message(lock, LOCK_AC_MIX, softdata);
4955 }
4956
4957 // change lock
4958 lock->set_state(LOCK_MIX);
4959 lock->clear_scatter_wanted();
// Only locks that cover caps (cap_shift != 0) require (re)issuing caps.
4960 if (lock->get_cap_shift()) {
4961 if (need_issue)
4962 *need_issue = true;
4963 else
4964 issue_caps(in);
4965 }
4966 } else {
4967 // gather?
// Enter the intermediate state for the current stable state; any other
// state reaching here is a bug.
4968 switch (lock->get_state()) {
4969 case LOCK_SYNC: lock->set_state(LOCK_SYNC_MIX); break;
4970 case LOCK_EXCL: lock->set_state(LOCK_EXCL_MIX); break;
4971 case LOCK_XSYN: lock->set_state(LOCK_XSYN_MIX); break;
4972 case LOCK_TSYN: lock->set_state(LOCK_TSYN_MIX); break;
4973 default: ceph_abort();
4974 }
4975
// Count everything that must drain before we can reach LOCK_MIX.
4976 int gather = 0;
4977 if (lock->is_rdlocked())
4978 gather++;
4979 if (in->is_replicated()) {
4980 if (lock->get_state() == LOCK_SYNC_MIX) { // for the rest states, replicas are already LOCK
4981 send_lock_message(lock, LOCK_AC_MIX);
4982 lock->init_gather();
4983 gather++;
4984 }
4985 }
4986 if (lock->is_leased()) {
// Client leases conflict with MIX; revoke and wait for them to expire.
4987 revoke_client_leases(lock);
4988 gather++;
4989 }
4990 if (lock->get_cap_shift() &&
4991 in->is_head() &&
4992 in->issued_caps_need_gather(lock)) {
// Some issued caps must be revoked first; issue_caps() starts the revoke.
4993 if (need_issue)
4994 *need_issue = true;
4995 else
4996 issue_caps(in);
4997 gather++;
4998 }
4999 bool need_recover = false;
5000 if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
5001 mds->mdcache->queue_file_recover(in);
5002 need_recover = true;
5003 gather++;
5004 }
5005
5006 if (gather) {
// Pin so the subtree can't be exported/frozen while mid-transition.
5007 lock->get_parent()->auth_pin(lock);
5008 if (need_recover)
5009 mds->mdcache->do_file_recover();
5010 } else {
// Nothing to wait for: complete the transition now (same steps as the
// LOCK fast path above, but state data is broadcast to replicas).
5011 in->start_scatter(lock);
5012 lock->set_state(LOCK_MIX);
5013 lock->clear_scatter_wanted();
5014 if (in->is_replicated()) {
5015 bufferlist softdata;
5016 lock->encode_locked_state(softdata);
5017 send_lock_message(lock, LOCK_AC_MIX, softdata);
5018 }
5019 if (lock->get_cap_shift()) {
5020 if (need_issue)
5021 *need_issue = true;
5022 else
5023 issue_caps(in);
5024 }
5025 }
5026 }
5027 }
5028
5029
// Move an auth-side, stable file lock toward LOCK_EXCL (exclusive caps for
// a single loner client).
//
// @param lock        file scatter lock to transition
// @param need_issue  if non-null, set *need_issue=true instead of calling
//                    issue_caps() inline; if null, caps are issued here
//
// Precondition (asserted): either the inode has a loner client and no
// other MDS wants caps, or we are coming from LOCK_XSYN (which must pass
// through EXCL before going anywhere else).  As with the other
// transitions, we enter an intermediate *_EXCL state and gather
// outstanding rdlocks/wrlocks, replica acks, leases, caps and recovery
// work; eval_gather() completes the transition when the gather drains.
5030 void Locker::file_excl(ScatterLock *lock, bool *need_issue)
5031 {
5032 CInode *in = static_cast<CInode*>(lock->get_parent());
5033 dout(7) << "file_excl " << *lock << " on " << *lock->get_parent() << dendl;
5034
5035 assert(in->is_auth());
5036 assert(lock->is_stable());
5037
5038 assert((in->get_loner() >= 0 && in->mds_caps_wanted.empty()) ||
5039 (lock->get_state() == LOCK_XSYN)); // must do xsyn -> excl -> <anything else>
5040
// Intermediate state for each legal starting state.
5041 switch (lock->get_state()) {
5042 case LOCK_SYNC: lock->set_state(LOCK_SYNC_EXCL); break;
5043 case LOCK_MIX: lock->set_state(LOCK_MIX_EXCL); break;
5044 case LOCK_LOCK: lock->set_state(LOCK_LOCK_EXCL); break;
5045 case LOCK_XSYN: lock->set_state(LOCK_XSYN_EXCL); break;
5046 default: ceph_abort();
5047 }
// Count of outstanding things that must drain before reaching LOCK_EXCL.
5048 int gather = 0;
5049
5050 if (lock->is_rdlocked())
5051 gather++;
5052 if (lock->is_wrlocked())
5053 gather++;
5054
5055 if (in->is_replicated() &&
5056 lock->get_state() != LOCK_LOCK_EXCL &&
5057 lock->get_state() != LOCK_XSYN_EXCL) { // if we were lock, replicas are already lock.
5058 send_lock_message(lock, LOCK_AC_LOCK);
5059 lock->init_gather();
5060 gather++;
5061 }
5062 if (lock->is_leased()) {
5063 revoke_client_leases(lock);
5064 gather++;
5065 }
5066 if (in->is_head() &&
5067 in->issued_caps_need_gather(lock)) {
// Revoke conflicting client caps; issue_caps() starts the recall.
5068 if (need_issue)
5069 *need_issue = true;
5070 else
5071 issue_caps(in);
5072 gather++;
5073 }
5074 bool need_recover = false;
5075 if (in->state_test(CInode::STATE_NEEDSRECOVER)) {
5076 mds->mdcache->queue_file_recover(in);
5077 need_recover = true;
5078 gather++;
5079 }
5080
5081 if (gather) {
// Pin while unstable; eval_gather() will finish the EXCL transition.
5082 lock->get_parent()->auth_pin(lock);
5083 if (need_recover)
5084 mds->mdcache->do_file_recover();
5085 } else {
5086 lock->set_state(LOCK_EXCL);
5087 if (need_issue)
5088 *need_issue = true;
5089 else
5090 issue_caps(in);
5091 }
5092 }
5093
// Move an auth-side file lock from LOCK_EXCL to LOCK_XSYN.
//
// @param lock        the file lock; only LOCK_EXCL is a legal starting
//                    state (anything else aborts)
// @param need_issue  if non-null, set *need_issue=true instead of calling
//                    issue_caps() inline; if null, caps are issued here
//
// Precondition (asserted): the inode has a loner client and no other MDS
// wants caps.  Gathers outstanding wrlocks and any caps that must be
// revoked; if nothing is outstanding the transition completes immediately
// and RD/STABLE waiters are woken.
5094 void Locker::file_xsyn(SimpleLock *lock, bool *need_issue)
5095 {
5096 dout(7) << "file_xsyn on " << *lock << " on " << *lock->get_parent() << dendl;
5097 CInode *in = static_cast<CInode *>(lock->get_parent());
5098 assert(in->is_auth());
5099 assert(in->get_loner() >= 0 && in->mds_caps_wanted.empty());
5100
5101 switch (lock->get_state()) {
5102 case LOCK_EXCL: lock->set_state(LOCK_EXCL_XSYN); break;
5103 default: ceph_abort();
5104 }
5105
// Count of outstanding things that must drain before reaching LOCK_XSYN.
5106 int gather = 0;
5107 if (lock->is_wrlocked())
5108 gather++;
5109
5110 if (in->is_head() &&
5111 in->issued_caps_need_gather(lock)) {
// Revoke conflicting client caps; issue_caps() starts the recall.
5112 if (need_issue)
5113 *need_issue = true;
5114 else
5115 issue_caps(in);
5116 gather++;
5117 }
5118
5119 if (gather) {
// Stay in EXCL_XSYN; eval_gather() completes the transition later.
5120 lock->get_parent()->auth_pin(lock);
5121 } else {
5122 lock->set_state(LOCK_XSYN);
5123 lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
5124 if (need_issue)
5125 *need_issue = true;
5126 else
5127 issue_caps(in);
5128 }
5129 }
5130
// Kick off file recovery for an inode: move its file lock from
// LOCK_PRE_SCAN to LOCK_SCAN.
//
// @param lock  the file scatter lock; must be auth and in LOCK_PRE_SCAN
//              (only called from MDCache::start_files_to_recover(), as the
//              assert notes)
//
// If client caps must first be revoked (issue_caps() starts the recall),
// the inode is flagged STATE_NEEDSRECOVER so recovery is queued once the
// caps are returned; otherwise recovery is queued immediately.
5131 void Locker::file_recover(ScatterLock *lock)
5132 {
5133 CInode *in = static_cast<CInode *>(lock->get_parent());
5134 dout(7) << "file_recover " << *lock << " on " << *in << dendl;
5135
5136 assert(in->is_auth());
5137 //assert(lock->is_stable());
5138 assert(lock->get_state() == LOCK_PRE_SCAN); // only called from MDCache::start_files_to_recover()
5139
5140 int gather = 0;
5141
// NOTE(review): replica notification is intentionally disabled here
// (commented out by the original author); left untouched.
5142 /*
5143 if (in->is_replicated()
5144 lock->get_sm()->states[oldstate].replica_state != LOCK_LOCK) {
5145 send_lock_message(lock, LOCK_AC_LOCK);
5146 lock->init_gather();
5147 gather++;
5148 }
5149 */
5150 if (in->is_head() &&
5151 in->issued_caps_need_gather(lock)) {
5152 issue_caps(in);
5153 gather++;
5154 }
5155
5156 lock->set_state(LOCK_SCAN);
5157 if (gather)
5158 in->state_set(CInode::STATE_NEEDSRECOVER);
5159 else
5160 mds->mdcache->queue_file_recover(in);
5161 }
5162
5163
5164 // messenger
5165 /* This function DOES put the passed message before returning */
// Handle an inter-MDS MLock message for a file/scatter lock.
//
// @param lock  the local lock the message refers to
// @param m     the lock message; this function consumes it (m->put() at
//              the end, or early when dropped during rejoin)
//
// The switch dispatches on the message action:
//  - replica-side actions (SYNC/LOCK/LOCKFLUSHED/MIX): the auth MDS is
//    driving our local replica state machine;
//  - auth-side acks (LOCKACK/SYNCACK/MIXACK): a replica has finished its
//    part of a gather we started; remove it from the gather set and run
//    eval_gather() when the set drains;
//  - requests (REQSCATTER/REQUNSCATTER/REQRDLOCK/NUDGE): a replica asks
//    the auth (us) to change state on its behalf.
5166 void Locker::handle_file_lock(ScatterLock *lock, MLock *m)
5167 {
5168 CInode *in = static_cast<CInode*>(lock->get_parent());
5169 int from = m->get_asker();
5170
// During rejoin, lock state for a still-rejoining inode is not yet
// authoritative; drop the message (sender will resolve via rejoin).
5171 if (mds->is_rejoin()) {
5172 if (in->is_rejoining()) {
5173 dout(7) << "handle_file_lock still rejoining " << *in
5174 << ", dropping " << *m << dendl;
5175 m->put();
5176 return;
5177 }
5178 }
5179
5180 dout(7) << "handle_file_lock a=" << get_lock_action_name(m->get_action())
5181 << " on " << *lock
5182 << " from mds." << from << " "
5183 << *in << dendl;
5184
// Non-zero cap shift means this lock covers client caps and transitions
// must (re)issue them.
5185 bool caps = lock->get_cap_shift();
5186
5187 switch (m->get_action()) {
5188 // -- replica --
5189 case LOCK_AC_SYNC:
5190 assert(lock->get_state() == LOCK_LOCK ||
5191 lock->get_state() == LOCK_MIX ||
5192 lock->get_state() == LOCK_MIX_SYNC2);
5193
// From MIX we must first flush our scattered state; enter MIX_SYNC and
// let eval_gather()/the journal flush drive us the rest of the way.
5194 if (lock->get_state() == LOCK_MIX) {
5195 lock->set_state(LOCK_MIX_SYNC);
5196 eval_gather(lock, true);
5197 if (lock->is_unstable_and_locked())
5198 mds->mdlog->flush();
5199 break;
5200 }
5201
// NOTE(review): 'lock' is already a ScatterLock*, so these casts are
// redundant (but harmless).
5202 (static_cast<ScatterLock *>(lock))->finish_flush();
5203 (static_cast<ScatterLock *>(lock))->clear_flushed();
5204
5205 // ok
5206 lock->decode_locked_state(m->get_data());
5207 lock->set_state(LOCK_SYNC);
5208
// Hold a temporary rdlock so issuing caps / waking waiters cannot race
// the lock out of SYNC underneath us.
5209 lock->get_rdlock();
5210 if (caps)
5211 issue_caps(in);
5212 lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
5213 lock->put_rdlock();
5214 break;
5215
5216 case LOCK_AC_LOCK:
5217 switch (lock->get_state()) {
5218 case LOCK_SYNC: lock->set_state(LOCK_SYNC_LOCK); break;
5219 case LOCK_MIX: lock->set_state(LOCK_MIX_LOCK); break;
5220 default: ceph_abort();
5221 }
5222
5223 eval_gather(lock, true);
5224 if (lock->is_unstable_and_locked())
5225 mds->mdlog->flush();
5226
5227 break;
5228
5229 case LOCK_AC_LOCKFLUSHED:
5230 (static_cast<ScatterLock *>(lock))->finish_flush();
5231 (static_cast<ScatterLock *>(lock))->clear_flushed();
5232 // wake up scatter_nudge waiters
5233 if (lock->is_stable())
5234 lock->finish_waiters(SimpleLock::WAIT_STABLE);
5235 break;
5236
5237 case LOCK_AC_MIX:
5238 assert(lock->get_state() == LOCK_SYNC ||
5239 lock->get_state() == LOCK_LOCK ||
5240 lock->get_state() == LOCK_SYNC_MIX2);
5241
// From SYNC we must drain local readers first: enter SYNC_MIX and let
// eval_gather() finish the transition.
5242 if (lock->get_state() == LOCK_SYNC) {
5243 // MIXED
5244 lock->set_state(LOCK_SYNC_MIX);
5245 eval_gather(lock, true);
5246 if (lock->is_unstable_and_locked())
5247 mds->mdlog->flush();
5248 break;
5249 }
5250
5251 // ok
5252 lock->set_state(LOCK_MIX);
5253 lock->decode_locked_state(m->get_data());
5254
5255 if (caps)
5256 issue_caps(in);
5257
5258 lock->finish_waiters(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE);
5259 break;
5260
5261
5262 // -- auth --
5263 case LOCK_AC_LOCKACK:
5264 assert(lock->get_state() == LOCK_SYNC_LOCK ||
5265 lock->get_state() == LOCK_MIX_LOCK ||
5266 lock->get_state() == LOCK_MIX_LOCK2 ||
5267 lock->get_state() == LOCK_MIX_EXCL ||
5268 lock->get_state() == LOCK_SYNC_EXCL ||
5269 lock->get_state() == LOCK_SYNC_MIX ||
5270 lock->get_state() == LOCK_MIX_TSYN);
5271 assert(lock->is_gathering(from));
5272 lock->remove_gather(from);
5273
// Acks for MIX_* transitions carry the replica's scattered state; merge
// it into ours.
5274 if (lock->get_state() == LOCK_MIX_LOCK ||
5275 lock->get_state() == LOCK_MIX_LOCK2 ||
5276 lock->get_state() == LOCK_MIX_EXCL ||
5277 lock->get_state() == LOCK_MIX_TSYN) {
5278 lock->decode_locked_state(m->get_data());
5279 // replica is waiting for AC_LOCKFLUSHED, eval_gather() should not
5280 // delay calling scatter_writebehind().
5281 lock->clear_flushed();
5282 }
5283
5284 if (lock->is_gathering()) {
5285 dout(7) << "handle_file_lock " << *in << " from " << from
5286 << ", still gathering " << lock->get_gather_set() << dendl;
5287 } else {
5288 dout(7) << "handle_file_lock " << *in << " from " << from
5289 << ", last one" << dendl;
5290 eval_gather(lock);
5291 }
5292 break;
5293
5294 case LOCK_AC_SYNCACK:
5295 assert(lock->get_state() == LOCK_MIX_SYNC);
5296 assert(lock->is_gathering(from));
5297 lock->remove_gather(from);
5298
5299 lock->decode_locked_state(m->get_data());
5300
5301 if (lock->is_gathering()) {
5302 dout(7) << "handle_file_lock " << *in << " from " << from
5303 << ", still gathering " << lock->get_gather_set() << dendl;
5304 } else {
5305 dout(7) << "handle_file_lock " << *in << " from " << from
5306 << ", last one" << dendl;
5307 eval_gather(lock);
5308 }
5309 break;
5310
5311 case LOCK_AC_MIXACK:
5312 assert(lock->get_state() == LOCK_SYNC_MIX);
5313 assert(lock->is_gathering(from));
5314 lock->remove_gather(from);
5315
5316 if (lock->is_gathering()) {
5317 dout(7) << "handle_file_lock " << *in << " from " << from
5318 << ", still gathering " << lock->get_gather_set() << dendl;
5319 } else {
5320 dout(7) << "handle_file_lock " << *in << " from " << from
5321 << ", last one" << dendl;
5322 eval_gather(lock);
5323 }
5324 break;
5325
5326
5327 // requests....
5328 case LOCK_AC_REQSCATTER:
5329 if (lock->is_stable()) {
5330 /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
5331 * because the replica should be holding an auth_pin if they're
5332 * doing this (and thus, we are freezing, not frozen, and indefinite
5333 * starvation isn't an issue).
5334 */
5335 dout(7) << "handle_file_lock got scatter request on " << *lock
5336 << " on " << *lock->get_parent() << dendl;
5337 if (lock->get_state() != LOCK_MIX) // i.e., the reqscatter didn't race with an actual mix/scatter
5338 scatter_mix(lock);
5339 } else {
// Unstable: remember the request; eval will act on it once stable.
5340 dout(7) << "handle_file_lock got scatter request, !stable, marking scatter_wanted on " << *lock
5341 << " on " << *lock->get_parent() << dendl;
5342 lock->set_scatter_wanted();
5343 }
5344 break;
5345
5346 case LOCK_AC_REQUNSCATTER:
5347 if (lock->is_stable()) {
5348 /* NOTE: we can do this _even_ if !can_auth_pin (i.e. freezing)
5349 * because the replica should be holding an auth_pin if they're
5350 * doing this (and thus, we are freezing, not frozen, and indefinite
5351 * starvation isn't an issue).
5352 */
5353 dout(7) << "handle_file_lock got unscatter request on " << *lock
5354 << " on " << *lock->get_parent() << dendl;
5355 if (lock->get_state() == LOCK_MIX) // i.e., the reqscatter didn't race with an actual mix/scatter
5356 simple_lock(lock); // FIXME tempsync?
5357 } else {
// Unstable: remember the request for when the lock stabilizes.
5358 dout(7) << "handle_file_lock ignoring unscatter request on " << *lock
5359 << " on " << *lock->get_parent() << dendl;
5360 lock->set_unscatter_wanted();
5361 }
5362 break;
5363
5364 case LOCK_AC_REQRDLOCK:
5365 handle_reqrdlock(lock, m);
5366 break;
5367
5368 case LOCK_AC_NUDGE:
// Nudge is only meaningful on the auth copy of a replicated lock.
5369 if (!lock->get_parent()->is_auth()) {
5370 dout(7) << "handle_file_lock IGNORING nudge on non-auth " << *lock
5371 << " on " << *lock->get_parent() << dendl;
5372 } else if (!lock->get_parent()->is_replicated()) {
5373 dout(7) << "handle_file_lock IGNORING nudge on non-replicated " << *lock
5374 << " on " << *lock->get_parent() << dendl;
5375 } else {
5376 dout(7) << "handle_file_lock trying nudge on " << *lock
5377 << " on " << *lock->get_parent() << dendl;
5378 scatter_nudge(lock, 0, true);
5379 mds->mdlog->flush();
5380 }
5381 break;
5382
5383 default:
5384 ceph_abort();
5385 }
5386
// Consume the message (documented contract of this handler).
5387 m->put();
5388 }
5389
5390
5391
5392
5393
5394