// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "common/debug.h"
#include "mdstypes.h"
#include "mds/flock.h"

#define dout_subsys ceph_subsys_mds

// Process-wide table of blocked POSIX locks: each waiting ceph_filelock is
// mapped to the ceph_lock_state_t it is queued on, so deadlock detection can
// follow wait-for edges across files.
static multimap<ceph_filelock, ceph_lock_state_t*> global_waiting_locks;

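// Erase fl's entry for this particular lock state from global_waiting_locks:
// several states may wait on locks that compare equal, so walk the equal
// range and remove only the pair whose value matches lock_state.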
static void remove_global_waiting(ceph_filelock &fl, ceph_lock_state_t *lock_state)
{
  for (auto p = global_waiting_locks.find(fl);
       p != global_waiting_locks.end(); ) {
    if (p->first != fl)
      break;
    if (p->second == lock_state) {
      global_waiting_locks.erase(p);
      break;
    }
    ++p;
  }
}

ceph_lock_state_t::~ceph_lock_state_t()
{
  if (type == CEPH_LOCK_FCNTL) {
    for (auto p = waiting_locks.begin(); p != waiting_locks.end(); ++p) {
      remove_global_waiting(p->second, this);
    }
  }
}

bool ceph_lock_state_t::is_waiting(const ceph_filelock &fl) const
{
  multimap<uint64_t, ceph_filelock>::const_iterator p = waiting_locks.find(fl.start);
  while (p != waiting_locks.end()) {
    if (p->second.start > fl.start)
      break;
    if (p->second.length == fl.length &&
        ceph_filelock_owner_equal(p->second, fl))
      return true;
    ++p;
  }
  return false;
}

void ceph_lock_state_t::remove_waiting(const ceph_filelock& fl)
{
  for (auto p = waiting_locks.find(fl.start);
       p != waiting_locks.end(); ) {
    if (p->second.start > fl.start)
      break;
    if (p->second.length == fl.length &&
        ceph_filelock_owner_equal(p->second, fl)) {
      if (type == CEPH_LOCK_FCNTL) {
        remove_global_waiting(p->second, this);
      }
      waiting_locks.erase(p);
      --client_waiting_lock_counts[(client_t)fl.client];
      if (!client_waiting_lock_counts[(client_t)fl.client]) {
        client_waiting_lock_counts.erase((client_t)fl.client);
      }
      break;
    }
    ++p;
  }
}

#define MAX_DEADLK_DEPTH 5

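/*
 * Deadlock detection for POSIX locks: starting from the owners of the locks
 * that conflict with fl, follow global_waiting_locks to whatever those owners
 * are themselves blocked on, recursing up to MAX_DEADLK_DEPTH. If the chain
 * leads back to the owner that started the search (first_fl), we have a
 * wait-for cycle. A minimal two-owner example:
 *
 *   owner A holds [0,9]   and is waiting on [10,19]
 *   owner B holds [10,19] and is waiting on [0,9]
 *
 * When B's wait is tested, A owns the conflicting lock, A is waiting on a
 * range B owns, and the recursion meets first_fl's owner again -> deadlock.
 */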
bool ceph_lock_state_t::is_deadlock(const ceph_filelock& fl,
                                    list<multimap<uint64_t, ceph_filelock>::iterator>&
                                      overlapping_locks,
                                    const ceph_filelock *first_fl, unsigned depth) const
{
  ldout(cct,15) << "is_deadlock " << fl << dendl;

  // only for posix lock
  if (type != CEPH_LOCK_FCNTL)
    return false;

  // find conflict locks' owners
  set<ceph_filelock> lock_owners;
  for (auto p = overlapping_locks.begin();
       p != overlapping_locks.end();
       ++p) {
    // shared locks don't conflict with each other
    if (fl.type == CEPH_LOCK_SHARED &&
        (*p)->second.type == CEPH_LOCK_SHARED)
      continue;

    // circle detected
    if (first_fl && ceph_filelock_owner_equal(*first_fl, (*p)->second)) {
      ldout(cct,15) << " detect deadlock" << dendl;
      return true;
    }

    // only the owner fields matter for the wait-for walk below
    ceph_filelock tmp = (*p)->second;
    tmp.start = 0;
    tmp.length = 0;
    tmp.type = 0;
    lock_owners.insert(tmp);
  }

  if (depth >= MAX_DEADLK_DEPTH)
    return false;

  first_fl = first_fl ? first_fl : &fl;
  for (auto p = lock_owners.begin();
       p != lock_owners.end();
       ++p) {
    ldout(cct,15) << " conflict lock owner " << *p << dendl;
    // is the conflicting lock's owner itself waiting for another lock?
    for (auto q = global_waiting_locks.lower_bound(*p);
         q != global_waiting_locks.end();
         ++q) {
      if (!ceph_filelock_owner_equal(q->first, *p))
        break;

      list<multimap<uint64_t, ceph_filelock>::iterator>
        _overlapping_locks, _self_overlapping_locks;
      ceph_lock_state_t& state = *(q->second);
      if (state.get_overlapping_locks(q->first, _overlapping_locks)) {
        state.split_by_owner(q->first, _overlapping_locks, _self_overlapping_locks);
      }
      if (!_overlapping_locks.empty()) {
        if (is_deadlock(q->first, _overlapping_locks, first_fl, depth + 1))
          return true;
      }
    }
  }
  return false;
}

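// Queue fl as a waiter on this lock state; POSIX locks are also published in
// global_waiting_locks so is_deadlock() can see them across files.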
void ceph_lock_state_t::add_waiting(const ceph_filelock& fl)
{
  waiting_locks.insert(pair<uint64_t, ceph_filelock>(fl.start, fl));
  ++client_waiting_lock_counts[(client_t)fl.client];
  if (type == CEPH_LOCK_FCNTL) {
    global_waiting_locks.insert(pair<ceph_filelock,ceph_lock_state_t*>(fl, this));
  }
}

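/*
 * Try to acquire new_lock. On success the lock is merged into held_locks and
 * true is returned. On conflict, if wait_on_fail is set and this isn't a
 * replay, the lock is queued as a waiter -- unless waiting would close a
 * wait-for cycle, in which case *deadlock is flagged instead.
 */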
bool ceph_lock_state_t::add_lock(ceph_filelock& new_lock,
                                 bool wait_on_fail, bool replay,
                                 bool *deadlock)
{
  ldout(cct,15) << "add_lock " << new_lock << dendl;
  bool ret = false;
  list<multimap<uint64_t, ceph_filelock>::iterator>
    overlapping_locks, self_overlapping_locks, neighbor_locks;

  // first, get any overlapping locks and split them into owned-by-us and not
  if (get_overlapping_locks(new_lock, overlapping_locks, &neighbor_locks)) {
    ldout(cct,15) << "got overlapping lock, splitting by owner" << dendl;
    split_by_owner(new_lock, overlapping_locks, self_overlapping_locks);
  }
  if (!overlapping_locks.empty()) { //overlapping locks owned by others :(
    if (CEPH_LOCK_EXCL == new_lock.type) {
      //can't set, we want an exclusive
      ldout(cct,15) << "overlapping lock, and this lock is exclusive, can't set"
                    << dendl;
      if (wait_on_fail && !replay) {
        if (is_deadlock(new_lock, overlapping_locks))
          *deadlock = true;
        else
          add_waiting(new_lock);
      }
    } else { //shared lock, check for any exclusive locks blocking us
      if (contains_exclusive_lock(overlapping_locks)) { //blocked :(
        ldout(cct,15) << " blocked by exclusive lock in overlapping_locks" << dendl;
        if (wait_on_fail && !replay) {
          if (is_deadlock(new_lock, overlapping_locks))
            *deadlock = true;
          else
            add_waiting(new_lock);
        }
      } else {
        //yay, we can insert a shared lock
        ldout(cct,15) << "inserting shared lock" << dendl;
        remove_waiting(new_lock);
        adjust_locks(self_overlapping_locks, new_lock, neighbor_locks);
        held_locks.insert(pair<uint64_t, ceph_filelock>(new_lock.start, new_lock));
        ret = true;
      }
    }
  } else { //no overlapping locks except our own
    remove_waiting(new_lock);
    adjust_locks(self_overlapping_locks, new_lock, neighbor_locks);
    ldout(cct,15) << "no conflicts, inserting " << new_lock << dendl;
    held_locks.insert(pair<uint64_t, ceph_filelock>
                      (new_lock.start, new_lock));
    ret = true;
  }
  if (ret)
    ++client_held_lock_counts[(client_t)new_lock.client];
  return ret;
}

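// F_GETLK-style query: if another owner's lock would block testing_lock,
// overwrite it with the blocking lock; otherwise set its type to
// CEPH_LOCK_UNLOCK to report that the lock could be acquired.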
void ceph_lock_state_t::look_for_lock(ceph_filelock& testing_lock)
{
  list<multimap<uint64_t, ceph_filelock>::iterator> overlapping_locks,
    self_overlapping_locks;
  if (get_overlapping_locks(testing_lock, overlapping_locks)) {
    split_by_owner(testing_lock, overlapping_locks, self_overlapping_locks);
  }
  if (!overlapping_locks.empty()) { //somebody else owns overlapping lock
    if (CEPH_LOCK_EXCL == testing_lock.type) { //any lock blocks it
      testing_lock = (*overlapping_locks.begin())->second;
    } else {
      ceph_filelock *blocking_lock;
      if ((blocking_lock = contains_exclusive_lock(overlapping_locks))) {
        testing_lock = *blocking_lock;
      } else { //nothing blocking!
        testing_lock.type = CEPH_LOCK_UNLOCK;
      }
    }
    return;
  }
  //if we get here, only our own locks block
  testing_lock.type = CEPH_LOCK_UNLOCK;
}

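/*
 * Remove removal_lock's range from our own held locks. Each self-overlapping
 * lock is trimmed, erased, or split around the removed range. For example,
 * removing bytes [20,29] from a held lock covering [0,99] shrinks the old
 * lock to [0,19] and inserts a new lock for [30,99]. A length of zero means
 * "to the end of the file" on either side.
 */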
void ceph_lock_state_t::remove_lock(ceph_filelock removal_lock,
                                    list<ceph_filelock>& activated_locks)
{
  list<multimap<uint64_t, ceph_filelock>::iterator> overlapping_locks,
    self_overlapping_locks;
  if (get_overlapping_locks(removal_lock, overlapping_locks)) {
    ldout(cct,15) << "splitting by owner" << dendl;
    split_by_owner(removal_lock, overlapping_locks, self_overlapping_locks);
  } else ldout(cct,15) << "attempt to remove lock at " << removal_lock.start
                       << " but no locks there!" << dendl;
  bool remove_to_end = (0 == removal_lock.length);
  uint64_t removal_start = removal_lock.start;
  uint64_t removal_end = removal_start + removal_lock.length - 1;
  __s64 old_lock_client = 0;
  ceph_filelock *old_lock;

  ldout(cct,15) << "examining " << self_overlapping_locks.size()
                << " self-overlapping locks for removal" << dendl;
  for (list<multimap<uint64_t, ceph_filelock>::iterator>::iterator
         iter = self_overlapping_locks.begin();
       iter != self_overlapping_locks.end();
       ++iter) {
    ldout(cct,15) << "self overlapping lock " << (*iter)->second << dendl;
    old_lock = &(*iter)->second;
    bool old_lock_to_end = (0 == old_lock->length);
    uint64_t old_lock_end = old_lock->start + old_lock->length - 1;
    old_lock_client = old_lock->client;
    if (remove_to_end) {
      // removal extends to EOF: trim or erase the old lock
      if (old_lock->start < removal_start) {
        old_lock->length = removal_start - old_lock->start;
      } else {
        ldout(cct,15) << "erasing " << (*iter)->second << dendl;
        held_locks.erase(*iter);
        --client_held_lock_counts[old_lock_client];
      }
    } else if (old_lock_to_end) {
      // old lock extends to EOF: keep the tail past the removed range
      ceph_filelock append_lock = *old_lock;
      append_lock.start = removal_end + 1;
      held_locks.insert(pair<uint64_t, ceph_filelock>
                        (append_lock.start, append_lock));
      ++client_held_lock_counts[(client_t)old_lock->client];
      if (old_lock->start >= removal_start) {
        ldout(cct,15) << "erasing " << (*iter)->second << dendl;
        held_locks.erase(*iter);
        --client_held_lock_counts[old_lock_client];
      } else old_lock->length = removal_start - old_lock->start;
    } else {
      // both bounded: possibly split the old lock around the removed range
      if (old_lock_end > removal_end) {
        ceph_filelock append_lock = *old_lock;
        append_lock.start = removal_end + 1;
        append_lock.length = old_lock_end - append_lock.start + 1;
        held_locks.insert(pair<uint64_t, ceph_filelock>
                          (append_lock.start, append_lock));
        ++client_held_lock_counts[(client_t)old_lock->client];
      }
      if (old_lock->start < removal_start) {
        old_lock->length = removal_start - old_lock->start;
      } else {
        ldout(cct,15) << "erasing " << (*iter)->second << dendl;
        held_locks.erase(*iter);
        --client_held_lock_counts[old_lock_client];
      }
    }
    if (!client_held_lock_counts[old_lock_client]) {
      client_held_lock_counts.erase(old_lock_client);
    }
  }
}

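// Drop every held and waiting lock owned by the given client (e.g. on
// session teardown). Returns true if any held locks were cleared.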
bool ceph_lock_state_t::remove_all_from(client_t client)
{
  bool cleared_any = false;
  if (client_held_lock_counts.count(client)) {
    multimap<uint64_t, ceph_filelock>::iterator iter = held_locks.begin();
    while (iter != held_locks.end()) {
      if ((client_t)iter->second.client == client) {
        held_locks.erase(iter++);
      } else
        ++iter;
    }
    client_held_lock_counts.erase(client);
    cleared_any = true;
  }

  if (client_waiting_lock_counts.count(client)) {
    multimap<uint64_t, ceph_filelock>::iterator iter = waiting_locks.begin();
    while (iter != waiting_locks.end()) {
      if ((client_t)iter->second.client != client) {
        ++iter;
        continue;
      }
      if (type == CEPH_LOCK_FCNTL) {
        remove_global_waiting(iter->second, this);
      }
      waiting_locks.erase(iter++);
    }
    client_waiting_lock_counts.erase(client);
  }
  return cleared_any;
}

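/*
 * Fold new_lock into the set of locks we already own. Same-type overlaps are
 * unified into new_lock and the old entries erased; different-type overlaps
 * are shrunk or split so new_lock's range wins. Zero-length locks ("until
 * EOF") take a special path, and exactly-adjacent same-type neighbors are
 * coalesced at the end. Note new_lock itself is extended in place; the
 * caller inserts it into held_locks afterwards.
 */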
void ceph_lock_state_t::adjust_locks(list<multimap<uint64_t, ceph_filelock>::iterator> old_locks,
                                     ceph_filelock& new_lock,
                                     list<multimap<uint64_t, ceph_filelock>::iterator>
                                       neighbor_locks)
{
  ldout(cct,15) << "adjust_locks" << dendl;
  bool new_lock_to_end = (0 == new_lock.length);
  __s64 old_lock_client = 0;
  ceph_filelock *old_lock;
  for (list<multimap<uint64_t, ceph_filelock>::iterator>::iterator
         iter = old_locks.begin();
       iter != old_locks.end();
       ++iter) {
    old_lock = &(*iter)->second;
    ldout(cct,15) << "adjusting lock: " << *old_lock << dendl;
    bool old_lock_to_end = (0 == old_lock->length);
    uint64_t old_lock_start = old_lock->start;
    uint64_t old_lock_end = old_lock->start + old_lock->length - 1;
    uint64_t new_lock_start = new_lock.start;
    uint64_t new_lock_end = new_lock.start + new_lock.length - 1;
    old_lock_client = old_lock->client;
    if (new_lock_to_end || old_lock_to_end) {
      //special code path to deal with a length set at 0
      ldout(cct,15) << "one lock extends forever" << dendl;
      if (old_lock->type == new_lock.type) {
        //just unify them in new lock, remove old lock
        ldout(cct,15) << "same lock type, unifying" << dendl;
        new_lock.start = (new_lock_start < old_lock_start) ? new_lock_start :
          old_lock_start;
        new_lock.length = 0;
        held_locks.erase(*iter);
        --client_held_lock_counts[old_lock_client];
      } else { //not same type, have to keep any remains of old lock around
        ldout(cct,15) << "shrinking old lock" << dendl;
        if (new_lock_to_end) {
          if (old_lock_start < new_lock_start) {
            old_lock->length = new_lock_start - old_lock_start;
          } else {
            held_locks.erase(*iter);
            --client_held_lock_counts[old_lock_client];
          }
        } else { //old lock extends past end of new lock
          ceph_filelock appended_lock = *old_lock;
          appended_lock.start = new_lock_end + 1;
          held_locks.insert(pair<uint64_t, ceph_filelock>
                            (appended_lock.start, appended_lock));
          ++client_held_lock_counts[(client_t)old_lock->client];
          if (old_lock_start < new_lock_start) {
            old_lock->length = new_lock_start - old_lock_start;
          } else {
            held_locks.erase(*iter);
            --client_held_lock_counts[old_lock_client];
          }
        }
      }
    } else {
      if (old_lock->type == new_lock.type) { //just merge them!
        ldout(cct,15) << "merging locks, they're the same type" << dendl;
        new_lock.start = (old_lock_start < new_lock_start) ? old_lock_start :
          new_lock_start;
        uint64_t new_end = (new_lock_end > old_lock_end) ? new_lock_end :
          old_lock_end;
        new_lock.length = new_end - new_lock.start + 1;
        ldout(cct,15) << "erasing lock " << (*iter)->second << dendl;
        held_locks.erase(*iter);
        --client_held_lock_counts[old_lock_client];
      } else { //we'll have to update sizes and maybe make new locks
        ldout(cct,15) << "locks aren't same type, changing sizes" << dendl;
        if (old_lock_end > new_lock_end) { //add extra lock after new_lock
          ceph_filelock appended_lock = *old_lock;
          appended_lock.start = new_lock_end + 1;
          appended_lock.length = old_lock_end - appended_lock.start + 1;
          held_locks.insert(pair<uint64_t, ceph_filelock>
                            (appended_lock.start, appended_lock));
          ++client_held_lock_counts[(client_t)old_lock->client];
        }
        if (old_lock_start < new_lock_start) {
          old_lock->length = new_lock_start - old_lock_start;
        } else { //old_lock starts inside new_lock, so remove it
          //if it extended past new_lock_end it's been replaced
          held_locks.erase(*iter);
          --client_held_lock_counts[old_lock_client];
        }
      }
    }
    if (!client_held_lock_counts[old_lock_client]) {
      client_held_lock_counts.erase(old_lock_client);
    }
  }

  //make sure to coalesce neighboring locks
  for (list<multimap<uint64_t, ceph_filelock>::iterator>::iterator
         iter = neighbor_locks.begin();
       iter != neighbor_locks.end();
       ++iter) {
    old_lock = &(*iter)->second;
    old_lock_client = old_lock->client;
    ldout(cct,15) << "lock to coalesce: " << *old_lock << dendl;
    /* because if it's a neighboring lock there can't be any self-overlapping
       locks that covered it */
    if (old_lock->type == new_lock.type) { //merge them
      if (0 == new_lock.length) {
        if (old_lock->start + old_lock->length == new_lock.start) {
          new_lock.start = old_lock->start;
        } else ceph_abort(); /* if there's no end to new_lock, the neighbor
                                HAS TO be to the left side */
      } else if (0 == old_lock->length) {
        if (new_lock.start + new_lock.length == old_lock->start) {
          new_lock.length = 0;
        } else ceph_abort(); //same as before, but reversed
      } else {
        if (old_lock->start + old_lock->length == new_lock.start) {
          new_lock.start = old_lock->start;
          new_lock.length = old_lock->length + new_lock.length;
        } else if (new_lock.start + new_lock.length == old_lock->start) {
          new_lock.length = old_lock->length + new_lock.length;
        }
      }
      held_locks.erase(*iter);
      --client_held_lock_counts[old_lock_client];
    }
    if (!client_held_lock_counts[old_lock_client]) {
      client_held_lock_counts.erase(old_lock_client);
    }
  }
}

multimap<uint64_t, ceph_filelock>::iterator
ceph_lock_state_t::get_lower_bound(uint64_t start,
                                   multimap<uint64_t, ceph_filelock>& lock_map)
{
  multimap<uint64_t, ceph_filelock>::iterator lower_bound =
    lock_map.lower_bound(start);
  if ((lower_bound != lock_map.end())  // don't dereference end()
      && (lower_bound->first != start)
      && (start != 0)
      && (lower_bound != lock_map.begin())) --lower_bound;
  if (lock_map.end() == lower_bound)
    ldout(cct,15) << "get_lower_bound returning end()" << dendl;
  else ldout(cct,15) << "get_lower_bound returning iterator pointing to "
                     << lower_bound->second << dendl;
  return lower_bound;
}

multimap<uint64_t, ceph_filelock>::iterator
ceph_lock_state_t::get_last_before(uint64_t end,
                                   multimap<uint64_t, ceph_filelock>& lock_map)
{
  multimap<uint64_t, ceph_filelock>::iterator last =
    lock_map.upper_bound(end);
  if (last != lock_map.begin()) --last;
  if (lock_map.end() == last)
    ldout(cct,15) << "get_last_before returning end()" << dendl;
  else ldout(cct,15) << "get_last_before returning iterator pointing to "
                     << last->second << dendl;
  return last;
}

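/*
 * Does the lock at iter overlap [start, end]? True when the lock starts
 * inside the range, or starts before it and either reaches into it or has
 * length 0 (extends to EOF). For example, a lock at start=5, length=10
 * covers [5,14] and shares space with [10,20] but not with [15,20].
 */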
bool ceph_lock_state_t::share_space(
    multimap<uint64_t, ceph_filelock>::iterator& iter,
    uint64_t start, uint64_t end)
{
  bool ret = ((iter->first >= start && iter->first <= end) ||
              ((iter->first < start) &&
               (((iter->first + iter->second.length - 1) >= start) ||
                (0 == iter->second.length))));
  ldout(cct,15) << "share_space got start: " << start << ", end: " << end
                << ", lock: " << iter->second << ", returning " << ret << dendl;
  return ret;
}

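/*
 * Collect held locks overlapping `lock` into `overlaps`; the scan runs
 * backwards from the last candidate, so push_front leaves the list ordered
 * by start offset. If self_neighbors is non-null, same-owner locks that
 * merely touch the range (one byte on either side, via neighbor_check_lock)
 * are collected separately so adjust_locks() can coalesce them. Returns true
 * if any real overlaps were found.
 */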
bool ceph_lock_state_t::get_overlapping_locks(const ceph_filelock& lock,
                                              list<multimap<uint64_t,
                                                  ceph_filelock>::iterator>& overlaps,
                                              list<multimap<uint64_t,
                                                  ceph_filelock>::iterator> *self_neighbors)
{
  ldout(cct,15) << "get_overlapping_locks" << dendl;
  // create a lock starting one earlier and ending one later
  // to check for neighbors
  ceph_filelock neighbor_check_lock = lock;
  if (neighbor_check_lock.start != 0) {
    neighbor_check_lock.start = neighbor_check_lock.start - 1;
    if (neighbor_check_lock.length)
      neighbor_check_lock.length = neighbor_check_lock.length + 2;
  } else {
    if (neighbor_check_lock.length)
      neighbor_check_lock.length = neighbor_check_lock.length + 1;
  }
  //find the last held lock starting at the point after lock
  uint64_t endpoint = lock.start;
  if (lock.length) {
    endpoint += lock.length;
  } else {
    endpoint = uint64_t(-1); // max offset
  }
  multimap<uint64_t, ceph_filelock>::iterator iter =
    get_last_before(endpoint, held_locks);
  bool cont = iter != held_locks.end();
  while (cont) {
    if (share_space(iter, lock)) {
      overlaps.push_front(iter);
    } else if (self_neighbors &&
               ceph_filelock_owner_equal(neighbor_check_lock, iter->second) &&
               share_space(iter, neighbor_check_lock)) {
      self_neighbors->push_front(iter);
    }
    if ((iter->first < lock.start) && (CEPH_LOCK_EXCL == iter->second.type)) {
      //can't be any more overlapping locks or they'd interfere with this one
      cont = false;
    } else if (held_locks.begin() == iter) cont = false;
    else --iter;
  }
  return !overlaps.empty();
}

bool ceph_lock_state_t::get_waiting_overlaps(const ceph_filelock& lock,
                                             list<multimap<uint64_t,
                                                 ceph_filelock>::iterator>& overlaps)
{
  ldout(cct,15) << "get_waiting_overlaps" << dendl;
  multimap<uint64_t, ceph_filelock>::iterator iter =
    get_last_before(lock.start + lock.length - 1, waiting_locks);
  bool cont = iter != waiting_locks.end();
  while (cont) {
    if (share_space(iter, lock)) overlaps.push_front(iter);
    if (waiting_locks.begin() == iter) cont = false;
    else --iter;
  }
  return !overlaps.empty();
}

void ceph_lock_state_t::split_by_owner(const ceph_filelock& owner,
                                       list<multimap<uint64_t,
                                           ceph_filelock>::iterator>& locks,
                                       list<multimap<uint64_t,
                                           ceph_filelock>::iterator>& owned_locks)
{
  list<multimap<uint64_t, ceph_filelock>::iterator>::iterator
    iter = locks.begin();
  ldout(cct,15) << "owner lock: " << owner << dendl;
  while (iter != locks.end()) {
    ldout(cct,15) << "comparing to " << (*iter)->second << dendl;
    if (ceph_filelock_owner_equal((*iter)->second, owner)) {
      ldout(cct,15) << "success, pushing to owned_locks" << dendl;
      owned_locks.push_back(*iter);
      iter = locks.erase(iter);
    } else {
      ldout(cct,15) << "failure, something not equal in this group "
                    << (*iter)->second.client << ":" << owner.client << ","
                    << (*iter)->second.owner << ":" << owner.owner << ","
                    << (*iter)->second.pid << ":" << owner.pid << dendl;
      ++iter;
    }
  }
}

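// Return a pointer to the first exclusive lock in the list, or NULL if every
// lock in it is shared.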
ceph_filelock *
ceph_lock_state_t::contains_exclusive_lock(list<multimap<uint64_t,
                                               ceph_filelock>::iterator>& locks)
{
  for (list<multimap<uint64_t, ceph_filelock>::iterator>::iterator
         iter = locks.begin();
       iter != locks.end();
       ++iter) {
    if (CEPH_LOCK_EXCL == (*iter)->second.type) return &(*iter)->second;
  }
  return NULL;
}

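/*
 * Usage sketch (illustrative only -- the real callers are the MDS filelock
 * request paths, and every value below is made up):
 *
 *   ceph_lock_state_t state(cct, CEPH_LOCK_FCNTL);
 *
 *   ceph_filelock fl;
 *   memset(&fl, 0, sizeof(fl));
 *   fl.start = 0;
 *   fl.length = 100;              // lock bytes [0,99]
 *   fl.client = 1;
 *   fl.owner = 1;
 *   fl.pid = 42;
 *   fl.type = CEPH_LOCK_EXCL;
 *
 *   bool deadlock = false;
 *   if (state.add_lock(fl, true, false, &deadlock)) {
 *     // lock granted; later, release the same byte range:
 *     list<ceph_filelock> activated;
 *     state.remove_lock(fl, activated);
 *   }
 */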