/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * Implementation of cl_lock for OSC layer.
 *
 *   Author: Nikita Danilov <nikita.danilov@sun.com>
 */
#define DEBUG_SUBSYSTEM S_OSC

#include "../../include/linux/libcfs/libcfs.h"
/* fid_build_reg_res_name() */
#include "../include/lustre_fid.h"

#include "osc_cl_internal.h"

#define _PAGEREF_MAGIC  (-10000000)
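
/*
 * ols_pageref counts the pages attached to an osc_lock.
 * osc_dlm_lock_pageref() (at the bottom of this file) adds this large
 * negative bias with atomic_add_return(): a result equal to _PAGEREF_MAGIC
 * means the count was zero, i.e. no page references the lock. For example,
 * a pageref of 0 yields exactly _PAGEREF_MAGIC, while a pageref of 3 yields
 * _PAGEREF_MAGIC + 3, after which the bias is subtracted back out.
 */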
/*****************************************************************************
 *
 *  Type conversions.
 *
 */

static const struct cl_lock_operations osc_lock_ops;
static const struct cl_lock_operations osc_lock_lockless_ops;
static void osc_lock_to_lockless(const struct lu_env *env,
				 struct osc_lock *ols, int force);
static int osc_lock_has_pages(struct osc_lock *olck);

int osc_lock_is_lockless(const struct osc_lock *olck)
{
	return (olck->ols_cl.cls_ops == &osc_lock_lockless_ops);
}
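
/*
 * Note that "lockless" is encoded entirely in the ops vector: a lock is
 * lockless iff its slice operations are osc_lock_lockless_ops, and
 * osc_lock_to_lockless() below is what performs the switch.
 */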
/**
 * Returns a weak pointer to the ldlm lock identified by a handle. Returned
 * pointer cannot be dereferenced, as lock is not protected from concurrent
 * reclaim. This function is a helper for osc_lock_invariant().
 */
static struct ldlm_lock *osc_handle_ptr(struct lustre_handle *handle)
{
	struct ldlm_lock *lock;

	lock = ldlm_handle2lock(handle);
	if (lock != NULL)
		LDLM_LOCK_PUT(lock);
	return lock;
}
/**
 * Invariant that has to be true all of the time.
 */
static int osc_lock_invariant(struct osc_lock *ols)
{
	struct ldlm_lock *lock	      = osc_handle_ptr(&ols->ols_handle);
	struct ldlm_lock *olock	      = ols->ols_lock;
	int		  handle_used = lustre_handle_is_used(&ols->ols_handle);

	if (ergo(osc_lock_is_lockless(ols),
		 ols->ols_locklessable && ols->ols_lock == NULL))
		return 1;

	/*
	 * If all the following "ergo"s are true, return 1, otherwise 0
	 */
	if (!ergo(olock != NULL, handle_used))
		return 0;

	if (!ergo(olock != NULL,
		  olock->l_handle.h_cookie == ols->ols_handle.cookie))
		return 0;

	if (!ergo(handle_used,
		  ergo(lock != NULL && olock != NULL, lock == olock) &&
		  ergo(lock == NULL, olock == NULL)))
		return 0;

	/*
	 * Check that ->ols_handle and ->ols_lock are consistent, but
	 * take into account that they are set at the different time.
	 */
	if (!ergo(ols->ols_state == OLS_CANCELLED,
		  olock == NULL && !handle_used))
		return 0;

	/*
	 * DLM lock is destroyed only after we have seen cancellation
	 * ast.
	 */
	if (!ergo(olock != NULL && ols->ols_state < OLS_CANCELLED,
		  ((olock->l_flags & LDLM_FL_DESTROYED) == 0)))
		return 0;

	if (!ergo(ols->ols_state == OLS_GRANTED,
		  olock != NULL &&
		  olock->l_req_mode == olock->l_granted_mode &&
		  ols->ols_hold))
		return 0;
	return 1;
}
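
/*
 * In short: outside of the lockless case, ->ols_handle and ->ols_lock must
 * refer to the same ldlm lock (or both be unset), a cancelled osc_lock
 * holds neither, and a granted one is bound to a granted, still-live dlm
 * lock on which a hold is kept.
 */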
/*****************************************************************************
 *
 *  Lock operations.
 *
 */

/**
 * Breaks a link between osc_lock and dlm_lock.
 */
static void osc_lock_detach(const struct lu_env *env, struct osc_lock *olck)
{
	struct ldlm_lock *dlmlock;

	spin_lock(&osc_ast_guard);
	dlmlock = olck->ols_lock;
	if (dlmlock == NULL) {
		spin_unlock(&osc_ast_guard);
		return;
	}

	olck->ols_lock = NULL;
	/* wb(); --- for all who checks (ols->ols_lock != NULL) before
	 * call to osc_lock_detach() */
	dlmlock->l_ast_data = NULL;
	olck->ols_handle.cookie = 0ULL;
	spin_unlock(&osc_ast_guard);

	lock_res_and_lock(dlmlock);
	if (dlmlock->l_granted_mode == dlmlock->l_req_mode) {
		struct cl_object *obj = olck->ols_cl.cls_obj;
		struct cl_attr *attr = &osc_env_info(env)->oti_attr;
		__u64 old_kms;

		cl_object_attr_lock(obj);
		/* Must get the value under the lock to avoid possible races. */
		old_kms = cl2osc(obj)->oo_oinfo->loi_kms;
		/* Update the kms. Need to loop all granted locks.
		 * Not a problem for the client */
		attr->cat_kms = ldlm_extent_shift_kms(dlmlock, old_kms);

		cl_object_attr_set(env, obj, attr, CAT_KMS);
		cl_object_attr_unlock(obj);
	}
	unlock_res_and_lock(dlmlock);

	/* release a reference taken in osc_lock_upcall0(). */
	LASSERT(olck->ols_has_ref);
	lu_ref_del(&dlmlock->l_reference, "osc_lock", olck);
	LDLM_LOCK_RELEASE(dlmlock);
	olck->ols_has_ref = 0;
}
static int osc_lock_unhold(struct osc_lock *ols)
{
	int result = 0;

	if (ols->ols_hold) {
		ols->ols_hold = 0;
		result = osc_cancel_base(&ols->ols_handle,
					 ols->ols_einfo.ei_mode);
	}
	return result;
}
static int osc_lock_unuse(const struct lu_env *env,
			  const struct cl_lock_slice *slice)
{
	struct osc_lock *ols = cl2osc_lock(slice);

	LINVRNT(osc_lock_invariant(ols));

	switch (ols->ols_state) {
	case OLS_NEW:
		LASSERT(!ols->ols_hold);
		LASSERT(ols->ols_agl);
		return 0;
	case OLS_UPCALL_RECEIVED:
		osc_lock_unhold(ols);
	case OLS_ENQUEUED:
		LASSERT(!ols->ols_hold);
		osc_lock_detach(env, ols);
		ols->ols_state = OLS_NEW;
		return 0;
	case OLS_GRANTED:
		LASSERT(!ols->ols_glimpse);
		LASSERT(ols->ols_hold);
		/*
		 * Move lock into OLS_RELEASED state before calling
		 * osc_cancel_base() so that possible synchronous cancellation
		 * (that always happens e.g., for liblustre) sees that lock is
		 * released.
		 */
		ols->ols_state = OLS_RELEASED;
		return osc_lock_unhold(ols);
	default:
		CERROR("Impossible state: %d\n", ols->ols_state);
		LBUG();
	}
}
static void osc_lock_fini(const struct lu_env *env,
			  struct cl_lock_slice *slice)
{
	struct osc_lock *ols = cl2osc_lock(slice);

	LINVRNT(osc_lock_invariant(ols));
	/*
	 * ->ols_hold can still be true at this point if, for example, a
	 * thread that requested a lock was killed (and released a reference
	 * to the lock), before reply from a server was received. In this case
	 * lock is destroyed immediately after upcall.
	 */
	osc_lock_unhold(ols);
	LASSERT(ols->ols_lock == NULL);
	LASSERT(atomic_read(&ols->ols_pageref) == 0 ||
		atomic_read(&ols->ols_pageref) == _PAGEREF_MAGIC);

	kmem_cache_free(osc_lock_kmem, ols);
}
static void osc_lock_build_policy(const struct lu_env *env,
				  const struct cl_lock *lock,
				  ldlm_policy_data_t *policy)
{
	const struct cl_lock_descr *d = &lock->cll_descr;

	osc_index2policy(policy, d->cld_obj, d->cld_start, d->cld_end);
	policy->l_extent.gid = d->cld_gid;
}
static __u64 osc_enq2ldlm_flags(__u32 enqflags)
{
	__u64 result = 0;

	LASSERT((enqflags & ~CEF_MASK) == 0);

	if (enqflags & CEF_NONBLOCK)
		result |= LDLM_FL_BLOCK_NOWAIT;
	if (enqflags & CEF_ASYNC)
		result |= LDLM_FL_HAS_INTENT;
	if (enqflags & CEF_DISCARD_DATA)
		result |= LDLM_FL_AST_DISCARD_DATA;
	return result;
}
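
/*
 * The remaining CEF_* bits (CEF_MUST, CEF_NEVER, CEF_AGL, ...) have no
 * ldlm flag counterpart here; they steer lock matching and the lockless
 * decision locally, and osc_lock_init() handles CEF_AGL separately.
 */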
/**
 * Global spin-lock protecting consistency of ldlm_lock::l_ast_data
 * pointers. Initialized in osc_init().
 */
spinlock_t osc_ast_guard;
static struct osc_lock *osc_ast_data_get(struct ldlm_lock *dlm_lock)
{
	struct osc_lock *olck;

	lock_res_and_lock(dlm_lock);
	spin_lock(&osc_ast_guard);
	olck = dlm_lock->l_ast_data;
	if (olck != NULL) {
		struct cl_lock *lock = olck->ols_cl.cls_lock;
		/*
		 * If osc_lock holds a reference on ldlm lock, return it even
		 * when cl_lock is in CLS_FREEING state. This way
		 *
		 *	 osc_ast_data_get(dlmlock) == NULL
		 *
		 * guarantees that all osc references on dlmlock were
		 * released. osc_dlm_blocking_ast0() relies on that.
		 */
		if (lock->cll_state < CLS_FREEING || olck->ols_has_ref) {
			cl_lock_get_trust(lock);
			lu_ref_add_atomic(&lock->cll_reference,
					  "ast", current);
		} else
			olck = NULL;
	}
	spin_unlock(&osc_ast_guard);
	unlock_res_and_lock(dlm_lock);
	return olck;
}
static void osc_ast_data_put(const struct lu_env *env, struct osc_lock *olck)
{
	struct cl_lock *lock;

	lock = olck->ols_cl.cls_lock;
	lu_ref_del(&lock->cll_reference, "ast", current);
	cl_lock_put(env, lock);
}
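
/*
 * osc_ast_data_get()/osc_ast_data_put() bracket the AST handlers below:
 * "get" pins the cl_lock (a trusted reference plus a lu_ref trace) for the
 * duration of the handler, and "put" drops that pin again.
 */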
/**
 * Updates object attributes from a lock value block (lvb) received together
 * with the DLM lock reply from the server. Copy of osc_update_enqueue()
 * logic.
 *
 * This can be optimized to not update attributes when lock is a result of a
 * local match.
 *
 * Called under lock and resource spin-locks.
 */
static void osc_lock_lvb_update(const struct lu_env *env, struct osc_lock *olck,
				int rc)
{
	struct cl_object *obj;
	struct lov_oinfo *oinfo;
	struct cl_attr *attr;
	struct ost_lvb *lvb;
	unsigned valid;

	if (!(olck->ols_flags & LDLM_FL_LVB_READY))
		return;

	lvb = &olck->ols_lvb;
	obj = olck->ols_cl.cls_obj;
	oinfo = cl2osc(obj)->oo_oinfo;
	attr = &osc_env_info(env)->oti_attr;
	valid = CAT_BLOCKS | CAT_ATIME | CAT_CTIME | CAT_MTIME | CAT_SIZE;
	cl_lvb2attr(attr, lvb);

	cl_object_attr_lock(obj);
	if (rc == 0) {
		struct ldlm_lock *dlmlock;
		__u64 size;

		dlmlock = olck->ols_lock;
		LASSERT(dlmlock != NULL);

		/* re-grab LVB from a dlm lock under DLM spin-locks. */
		*lvb = *(struct ost_lvb *)dlmlock->l_lvb_data;
		size = lvb->lvb_size;
		/* Extend KMS up to the end of this lock and no further
		 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
		if (size > dlmlock->l_policy_data.l_extent.end)
			size = dlmlock->l_policy_data.l_extent.end + 1;
		if (size >= oinfo->loi_kms) {
			LDLM_DEBUG(dlmlock, "lock acquired, setting rss=%llu, kms=%llu",
				   lvb->lvb_size, size);
			valid |= CAT_KMS;
			attr->cat_kms = size;
		} else {
			LDLM_DEBUG(dlmlock, "lock acquired, setting rss=%llu; leaving kms=%llu, end=%llu",
				   lvb->lvb_size, oinfo->loi_kms,
				   dlmlock->l_policy_data.l_extent.end);
		}
		ldlm_lock_allow_match_locked(dlmlock);
	} else if (rc == -ENAVAIL && olck->ols_glimpse) {
		CDEBUG(D_INODE, "glimpsed, setting rss=%llu; leaving kms=%llu\n",
		       lvb->lvb_size, oinfo->loi_kms);
	} else
		valid = 0;

	if (valid != 0)
		cl_object_attr_set(env, obj, attr, valid);

	cl_object_attr_unlock(obj);
}
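
/*
 * A worked example of the KMS clamp above: with an extent lock covering
 * bytes [0, 4095] and a server-reported lvb_size of 10000, size is first
 * clamped to 4095 + 1 = 4096; KMS is then raised to 4096 only if that is
 * not below the currently known loi_kms.
 */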
/**
 * Called when a lock is granted, from an upcall (when server returned a
 * granted lock), or from completion AST, when server returned a blocked lock.
 *
 * Called under lock and resource spin-locks, that are released temporarily
 * here.
 */
static void osc_lock_granted(const struct lu_env *env, struct osc_lock *olck,
			     struct ldlm_lock *dlmlock, int rc)
{
	struct ldlm_extent *ext;
	struct cl_lock *lock;
	struct cl_lock_descr *descr;

	LASSERT(dlmlock->l_granted_mode == dlmlock->l_req_mode);

	if (olck->ols_state < OLS_GRANTED) {
		lock = olck->ols_cl.cls_lock;
		ext = &dlmlock->l_policy_data.l_extent;
		descr = &osc_env_info(env)->oti_descr;
		descr->cld_obj = lock->cll_descr.cld_obj;

		/* XXX check that ->l_granted_mode is valid. */
		descr->cld_mode = osc_ldlm2cl_lock(dlmlock->l_granted_mode);
		descr->cld_start = cl_index(descr->cld_obj, ext->start);
		descr->cld_end = cl_index(descr->cld_obj, ext->end);
		descr->cld_gid = ext->gid;
		/*
		 * tell upper layers the extent of the lock that was actually
		 * granted
		 */
		olck->ols_state = OLS_GRANTED;
		osc_lock_lvb_update(env, olck, rc);

		/* release DLM spin-locks to allow cl_lock_{modify,signal}()
		 * to take a semaphore on a parent lock. This is safe, because
		 * spin-locks are needed to protect consistency of
		 * dlmlock->l_*_mode and LVB, and we have finished processing
		 * them. */
		unlock_res_and_lock(dlmlock);
		cl_lock_modify(env, lock, descr);
		cl_lock_signal(env, lock);
		LINVRNT(osc_lock_invariant(olck));
		lock_res_and_lock(dlmlock);
	}
}
static void osc_lock_upcall0(const struct lu_env *env, struct osc_lock *olck)
{
	struct ldlm_lock *dlmlock;

	dlmlock = ldlm_handle2lock_long(&olck->ols_handle, 0);
	LASSERT(dlmlock != NULL);

	lock_res_and_lock(dlmlock);
	spin_lock(&osc_ast_guard);
	LASSERT(dlmlock->l_ast_data == olck);
	LASSERT(olck->ols_lock == NULL);
	olck->ols_lock = dlmlock;
	spin_unlock(&osc_ast_guard);

	/*
	 * Lock might be not yet granted. In this case, completion ast
	 * (osc_ldlm_completion_ast()) comes later and finishes lock
	 * granting.
	 */
	if (dlmlock->l_granted_mode == dlmlock->l_req_mode)
		osc_lock_granted(env, olck, dlmlock, 0);
	unlock_res_and_lock(dlmlock);

	/*
	 * osc_enqueue_interpret() decrefs asynchronous locks, counter
	 * this.
	 */
	ldlm_lock_addref(&olck->ols_handle, olck->ols_einfo.ei_mode);
	olck->ols_hold = 1;

	/* lock reference taken by ldlm_handle2lock_long() is owned by
	 * osc_lock and released in osc_lock_detach() */
	lu_ref_add(&dlmlock->l_reference, "osc_lock", olck);
	olck->ols_has_ref = 1;
}
/**
 * Lock upcall function that is executed either when a reply to ENQUEUE rpc is
 * received from a server, or after osc_enqueue_base() matched a local DLM
 * lock.
 */
static int osc_lock_upcall(void *cookie, int errcode)
{
	struct osc_lock *olck = cookie;
	struct cl_lock_slice *slice = &olck->ols_cl;
	struct cl_lock *lock = slice->cls_lock;
	struct lu_env *env;
	struct cl_env_nest nest;

	env = cl_env_nested_get(&nest);
	if (!IS_ERR(env)) {
		int rc;

		cl_lock_mutex_get(env, lock);

		LASSERT(lock->cll_state >= CLS_QUEUING);
		if (olck->ols_state == OLS_ENQUEUED) {
			olck->ols_state = OLS_UPCALL_RECEIVED;
			rc = ldlm_error2errno(errcode);
		} else if (olck->ols_state == OLS_CANCELLED) {
			rc = -EIO;
		} else {
			CERROR("Impossible state: %d\n", olck->ols_state);
			LBUG();
		}
		if (rc) {
			struct ldlm_lock *dlmlock;

			dlmlock = ldlm_handle2lock(&olck->ols_handle);
			if (dlmlock != NULL) {
				lock_res_and_lock(dlmlock);
				spin_lock(&osc_ast_guard);
				LASSERT(olck->ols_lock == NULL);
				dlmlock->l_ast_data = NULL;
				olck->ols_handle.cookie = 0ULL;
				spin_unlock(&osc_ast_guard);
				ldlm_lock_fail_match_locked(dlmlock);
				unlock_res_and_lock(dlmlock);
				LDLM_LOCK_PUT(dlmlock);
			}
		} else {
			if (olck->ols_glimpse)
				olck->ols_glimpse = 0;
			osc_lock_upcall0(env, olck);
		}

		/* Error handling, some errors are tolerable. */
		if (olck->ols_locklessable && rc == -EUSERS) {
			/* This is a tolerable error, turn this lock into
			 * lockless lock.
			 */
			osc_object_set_contended(cl2osc(slice->cls_obj));
			LASSERT(slice->cls_ops == &osc_lock_ops);

			/* Change this lock to ldlmlock-less lock. */
			osc_lock_to_lockless(env, olck, 1);
			olck->ols_state = OLS_GRANTED;
			rc = 0;
		} else if (olck->ols_glimpse && rc == -ENAVAIL) {
			osc_lock_lvb_update(env, olck, rc);
			cl_lock_delete(env, lock);
			/* Hide the error. */
			rc = 0;
		}

		if (rc == 0) {
			/* For AGL case, the RPC sponsor may exits the cl_lock
			 * processing without wait() called before related OSC
			 * lock upcall(). So update the lock status according
			 * to the enqueue result inside AGL upcall(). */
			if (olck->ols_agl) {
				lock->cll_flags |= CLF_FROM_UPCALL;
				cl_wait_try(env, lock);
				lock->cll_flags &= ~CLF_FROM_UPCALL;
				if (!olck->ols_glimpse)
					olck->ols_agl = 0;
			}
			cl_lock_signal(env, lock);
			/* del user for lock upcall cookie */
			cl_unuse_try(env, lock);
		} else {
			/* del user for lock upcall cookie */
			cl_lock_user_del(env, lock);
			cl_lock_error(env, lock, rc);
		}

		/* release cookie reference, acquired by osc_lock_enqueue() */
		cl_lock_hold_release(env, lock, "upcall", lock);
		cl_lock_mutex_put(env, lock);

		lu_ref_del(&lock->cll_reference, "upcall", lock);
		/* This maybe the last reference, so must be called after
		 * cl_lock_mutex_put(). */
		cl_lock_put(env, lock);

		cl_env_nested_put(&nest, env);
	} else {
		/* should never happen, similar to osc_ldlm_blocking_ast(). */
		LBUG();
	}
	return errcode;
}
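
/*
 * Upcall outcomes, in brief: on success the dlm lock is bound to the
 * osc_lock (osc_lock_upcall0()); -EUSERS on a locklessable lock converts
 * it to a lockless lock and grants it; -ENAVAIL on a glimpse updates the
 * LVB and deletes the lock, hiding the error; any other error is reported
 * through cl_lock_error().
 */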
/**
 * Core of osc_dlm_blocking_ast() logic.
 */
static void osc_lock_blocking(const struct lu_env *env,
			      struct ldlm_lock *dlmlock,
			      struct osc_lock *olck, int blocking)
{
	struct cl_lock *lock = olck->ols_cl.cls_lock;

	LASSERT(olck->ols_lock == dlmlock);
	CLASSERT(OLS_BLOCKED < OLS_CANCELLED);
	LASSERT(!osc_lock_is_lockless(olck));

	/*
	 * Lock might be still addref-ed here, if e.g., blocking ast
	 * is sent for a failed lock.
	 */
	osc_lock_unhold(olck);

	if (blocking && olck->ols_state < OLS_BLOCKED)
		/*
		 * Move osc_lock into OLS_BLOCKED before canceling the lock,
		 * because it recursively re-enters osc_lock_blocking(), with
		 * the state set to OLS_CANCELLED.
		 */
		olck->ols_state = OLS_BLOCKED;
	/*
	 * cancel and destroy lock at least once no matter how blocking ast is
	 * entered (see comment above osc_ldlm_blocking_ast() for use
	 * cases). cl_lock_cancel() and cl_lock_delete() are idempotent.
	 */
	cl_lock_cancel(env, lock);
	cl_lock_delete(env, lock);
}
/**
 * Helper for osc_dlm_blocking_ast() handling discrepancies between cl_lock
 * and ldlm_lock caches.
 */
static int osc_dlm_blocking_ast0(const struct lu_env *env,
				 struct ldlm_lock *dlmlock,
				 void *data, int flag)
{
	struct osc_lock *olck;
	struct cl_lock *lock;
	int result;
	int cancel;

	LASSERT(flag == LDLM_CB_BLOCKING || flag == LDLM_CB_CANCELING);

	cancel = 0;
	olck = osc_ast_data_get(dlmlock);
	if (olck != NULL) {
		lock = olck->ols_cl.cls_lock;
		cl_lock_mutex_get(env, lock);
		LINVRNT(osc_lock_invariant(olck));
		if (olck->ols_ast_wait) {
			/* wake up osc_lock_use() */
			cl_lock_signal(env, lock);
			olck->ols_ast_wait = 0;
		}
		/*
		 * Lock might have been canceled while this thread was
		 * sleeping for lock mutex, but olck is pinned in memory.
		 */
		if (olck == dlmlock->l_ast_data) {
			/*
			 * NOTE: DLM sends blocking AST's for failed locks
			 *       (that are still in pre-OLS_GRANTED state)
			 *       too, and they have to be canceled otherwise
			 *       DLM lock is never destroyed and stuck in
			 *       the memory.
			 *
			 *       Alternatively, ldlm_cli_cancel() can be
			 *       called here directly for osc_locks with
			 *       ols_state < OLS_GRANTED to maintain an
			 *       invariant that ->clo_cancel() is only called
			 *       for locks that were granted.
			 */
			LASSERT(data == olck);
			osc_lock_blocking(env, dlmlock,
					  olck, flag == LDLM_CB_BLOCKING);
		} else
			cancel = 1;
		cl_lock_mutex_put(env, lock);
		osc_ast_data_put(env, olck);
	} else
		/*
		 * DLM lock exists, but there is no cl_lock attached to it.
		 * This is a `normal' race. cl_object and its cl_lock's can be
		 * removed by memory pressure, together with all pages.
		 */
		cancel = (flag == LDLM_CB_BLOCKING);

	if (cancel) {
		struct lustre_handle *lockh;

		lockh = &osc_env_info(env)->oti_handle;
		ldlm_lock2handle(dlmlock, lockh);
		result = ldlm_cli_cancel(lockh, LCF_ASYNC);
	} else
		result = 0;
	return result;
}
/**
 * Blocking ast invoked by ldlm when dlm lock is either blocking progress of
 * some other lock, or is canceled. This function is installed as a
 * ldlm_lock::l_blocking_ast() for client extent locks.
 *
 * Control flow is tricky, because ldlm uses the same call-back
 * (ldlm_lock::l_blocking_ast()) for both blocking and cancellation ast's.
 *
 * \param dlmlock lock for which ast occurred.
 *
 * \param new description of a conflicting lock in case of blocking ast.
 *
 * \param data value of dlmlock->l_ast_data
 *
 * \param flag LDLM_CB_BLOCKING or LDLM_CB_CANCELING. Used to distinguish
 *	       cancellation and blocking ast's.
 *
 * Possible use cases:
 *
 *     - ldlm calls dlmlock->l_blocking_ast(..., LDLM_CB_CANCELING) to cancel
 *       lock due to lock lru pressure, or explicit user request to purge
 *       locks.
 *
 *     - ldlm calls dlmlock->l_blocking_ast(..., LDLM_CB_BLOCKING) to notify
 *       us that dlmlock conflicts with another lock that some client is
 *       enqueing. Lock is canceled.
 *
 *	   - cl_lock_cancel() is called. osc_lock_cancel() calls
 *	     ldlm_cli_cancel() that calls
 *
 *		  dlmlock->l_blocking_ast(..., LDLM_CB_CANCELING)
 *
 *	     recursively entering osc_ldlm_blocking_ast().
 *
 *     - client cancels lock voluntary (e.g., as a part of early cancellation):
 *
 *	       cl_lock_cancel()->
 *		 osc_lock_cancel()->
 *		   ldlm_cli_cancel()->
 *		     dlmlock->l_blocking_ast(..., LDLM_CB_CANCELING)
 */
static int osc_ldlm_blocking_ast(struct ldlm_lock *dlmlock,
				 struct ldlm_lock_desc *new, void *data,
				 int flag)
{
	struct lu_env *env;
	struct cl_env_nest nest;
	int result;

	/*
	 * This can be called in the context of outer IO, e.g.,
	 *
	 *     cl_enqueue()->...
	 *       ->osc_enqueue_base()->...
	 *	 ->ldlm_prep_elc_req()->...
	 *	   ->ldlm_cancel_callback()->...
	 *	     ->osc_ldlm_blocking_ast()
	 *
	 * new environment has to be created to not corrupt outer context.
	 */
	env = cl_env_nested_get(&nest);
	if (!IS_ERR(env)) {
		result = osc_dlm_blocking_ast0(env, dlmlock, data, flag);
		cl_env_nested_put(&nest, env);
	} else {
		result = PTR_ERR(env);
		/*
		 * XXX This should never happen, as cl_lock is
		 * stuck. Pre-allocated environment a la vvp_inode_fini_env
		 * should be used.
		 */
		LBUG();
	}
	if (result != 0) {
		if (result == -ENODATA)
			result = 0;
		else
			CERROR("BAST failed: %d\n", result);
	}
	return result;
}
static int osc_ldlm_completion_ast(struct ldlm_lock *dlmlock,
				   __u64 flags, void *data)
{
	struct cl_env_nest nest;
	struct lu_env *env;
	struct osc_lock *olck;
	struct cl_lock *lock;
	int dlmrc;
	int result;

	/* first, do dlm part of the work */
	dlmrc = ldlm_completion_ast_async(dlmlock, flags, data);
	/* then, notify cl_lock */
	env = cl_env_nested_get(&nest);
	if (!IS_ERR(env)) {
		olck = osc_ast_data_get(dlmlock);
		if (olck != NULL) {
			lock = olck->ols_cl.cls_lock;
			cl_lock_mutex_get(env, lock);
			/*
			 * ldlm_handle_cp_callback() copied LVB from request
			 * to lock->l_lvb_data, store it in osc_lock.
			 */
			LASSERT(dlmlock->l_lvb_data != NULL);
			lock_res_and_lock(dlmlock);
			olck->ols_lvb = *(struct ost_lvb *)dlmlock->l_lvb_data;
			if (olck->ols_lock == NULL) {
				/*
				 * upcall (osc_lock_upcall()) hasn't yet been
				 * called. Do nothing now, upcall will bind
				 * olck to dlmlock and signal the waiters.
				 *
				 * This maintains an invariant that osc_lock
				 * and ldlm_lock are always bound when
				 * osc_lock is in OLS_GRANTED state.
				 */
			} else if (dlmlock->l_granted_mode ==
				   dlmlock->l_req_mode) {
				osc_lock_granted(env, olck, dlmlock, dlmrc);
			}
			unlock_res_and_lock(dlmlock);

			if (dlmrc != 0) {
				CL_LOCK_DEBUG(D_ERROR, env, lock,
					      "dlmlock returned %d\n", dlmrc);
				cl_lock_error(env, lock, dlmrc);
			}
			cl_lock_mutex_put(env, lock);
			osc_ast_data_put(env, olck);
			result = 0;
		} else
			result = -ELDLM_NO_LOCK_DATA;
		cl_env_nested_put(&nest, env);
	} else
		result = PTR_ERR(env);
	return dlmrc ?: result;
}
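
/*
 * Note the "dlmrc ?: result" above: a failure from the dlm part of the
 * work takes precedence even when the cl_lock notification succeeded.
 */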
static int osc_ldlm_glimpse_ast(struct ldlm_lock *dlmlock, void *data)
{
	struct ptlrpc_request *req = data;
	struct osc_lock *olck;
	struct cl_lock *lock;
	struct cl_object *obj;
	struct cl_env_nest nest;
	struct lu_env *env;
	struct ost_lvb *lvb;
	struct req_capsule *cap;
	int result;

	LASSERT(lustre_msg_get_opc(req->rq_reqmsg) == LDLM_GL_CALLBACK);

	env = cl_env_nested_get(&nest);
	if (!IS_ERR(env)) {
		/* osc_ast_data_get() has to go after environment is
		 * allocated, because osc_ast_data() acquires a
		 * reference to a lock, and it can only be released in
		 * environment.
		 */
		olck = osc_ast_data_get(dlmlock);
		if (olck != NULL) {
			lock = olck->ols_cl.cls_lock;
			/* Do not grab the mutex of cl_lock for glimpse.
			 * See LU-1274 for details.
			 * BTW, it's okay for cl_lock to be cancelled during
			 * this period because server can handle this race.
			 * See ldlm_server_glimpse_ast() for details.
			 * cl_lock_mutex_get(env, lock); */
			cap = &req->rq_pill;
			req_capsule_extend(cap, &RQF_LDLM_GL_CALLBACK);
			req_capsule_set_size(cap, &RMF_DLM_LVB, RCL_SERVER,
					     sizeof(*lvb));
			result = req_capsule_server_pack(cap);
			if (result == 0) {
				lvb = req_capsule_server_get(cap, &RMF_DLM_LVB);
				obj = lock->cll_descr.cld_obj;
				result = cl_object_glimpse(env, obj, lvb);
			}
			if (!exp_connect_lvb_type(req->rq_export))
				req_capsule_shrink(&req->rq_pill,
						   &RMF_DLM_LVB,
						   sizeof(struct ost_lvb_v1),
						   RCL_SERVER);
			osc_ast_data_put(env, olck);
		} else {
			/*
			 * These errors are normal races, so we don't want to
			 * fill the console with messages by calling
			 * ptlrpc_error()
			 */
			lustre_pack_reply(req, 1, NULL, NULL);
			result = -ELDLM_NO_LOCK_DATA;
		}
		cl_env_nested_put(&nest, env);
	} else
		result = PTR_ERR(env);
	req->rq_status = result;
	return result;
}
static unsigned long osc_lock_weigh(const struct lu_env *env,
				    const struct cl_lock_slice *slice)
{
	/*
	 * don't need to grab coh_page_guard since we don't care the exact #
	 * of pages..
	 */
	return cl_object_header(slice->cls_obj)->coh_pages;
}
static void osc_lock_build_einfo(const struct lu_env *env,
				 const struct cl_lock *clock,
				 struct osc_lock *lock,
				 struct ldlm_enqueue_info *einfo)
{
	enum cl_lock_mode mode;

	mode = clock->cll_descr.cld_mode;
	if (mode == CLM_PHANTOM)
		/*
		 * For now, enqueue all glimpse locks in read mode. In the
		 * future, client might choose to enqueue LCK_PW lock for
		 * glimpse on a file opened for write.
		 */
		mode = CLM_READ;

	einfo->ei_type = LDLM_EXTENT;
	einfo->ei_mode = osc_cl_lock2ldlm(mode);
	einfo->ei_cb_bl = osc_ldlm_blocking_ast;
	einfo->ei_cb_cp = osc_ldlm_completion_ast;
	einfo->ei_cb_gl = osc_ldlm_glimpse_ast;
	einfo->ei_cbdata = lock; /* value to be put into ->l_ast_data */
}
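
/*
 * The ei_cbdata set here is what the ldlm layer stores in ->l_ast_data,
 * and is therefore what osc_ast_data_get() recovers in the AST handlers
 * above.
 */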
/**
 * Determine if the lock should be converted into a lockless lock.
 *
 * Steps to check:
 * - if the lock has an explicit requirement for a non-lockless lock;
 * - if the io lock request type ci_lockreq;
 * - send the enqueue rpc to ost to make the further decision;
 * - special treat to truncate lockless lock
 *
 * Additional policy can be implemented here, e.g., never do lockless-io
 * for large extents.
 */
static void osc_lock_to_lockless(const struct lu_env *env,
				 struct osc_lock *ols, int force)
{
	struct cl_lock_slice *slice = &ols->ols_cl;

	LASSERT(ols->ols_state == OLS_NEW ||
		ols->ols_state == OLS_UPCALL_RECEIVED);

	if (force) {
		ols->ols_locklessable = 1;
		slice->cls_ops = &osc_lock_lockless_ops;
	} else {
		struct osc_io *oio = osc_env_io(env);
		struct cl_io *io = oio->oi_cl.cis_io;
		struct cl_object *obj = slice->cls_obj;
		struct osc_object *oob = cl2osc(obj);
		const struct osc_device *osd = lu2osc_dev(obj->co_lu.lo_dev);
		struct obd_connect_data *ocd;

		LASSERT(io->ci_lockreq == CILR_MANDATORY ||
			io->ci_lockreq == CILR_MAYBE ||
			io->ci_lockreq == CILR_NEVER);

		ocd = &class_exp2cliimp(osc_export(oob))->imp_connect_data;
		ols->ols_locklessable = (io->ci_type != CIT_SETATTR) &&
				(io->ci_lockreq == CILR_MAYBE) &&
				(ocd->ocd_connect_flags & OBD_CONNECT_SRVLOCK);
		if (io->ci_lockreq == CILR_NEVER ||
		    /* lockless IO */
		    (ols->ols_locklessable && osc_object_is_contended(oob)) ||
		    /* lockless truncate */
		    (cl_io_is_trunc(io) &&
		     (ocd->ocd_connect_flags & OBD_CONNECT_TRUNCLOCK) &&
		     osd->od_lockless_truncate)) {
			ols->ols_locklessable = 1;
			slice->cls_ops = &osc_lock_lockless_ops;
		}
	}
	LASSERT(ergo(ols->ols_glimpse, !osc_lock_is_lockless(ols)));
}
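
/*
 * To summarize the branch above: the ops vector is switched to the
 * lockless variant when the conversion is forced, when the io asked for
 * CILR_NEVER, when a locklessable lock sits on a contended object, or for
 * a truncate when the OST was connected with OBD_CONNECT_TRUNCLOCK and
 * lockless truncate is enabled on the device.
 */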
static int osc_lock_compatible(const struct osc_lock *qing,
			       const struct osc_lock *qed)
{
	enum cl_lock_mode qing_mode;
	enum cl_lock_mode qed_mode;

	qing_mode = qing->ols_cl.cls_lock->cll_descr.cld_mode;
	if (qed->ols_glimpse &&
	    (qed->ols_state >= OLS_UPCALL_RECEIVED || qing_mode == CLM_READ))
		return 1;

	qed_mode = qed->ols_cl.cls_lock->cll_descr.cld_mode;
	return ((qing_mode == CLM_READ) && (qed_mode == CLM_READ));
}
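
/*
 * That is, two osc locks are treated as compatible only when both are
 * CLM_READ, with an extra allowance for an already-enqueued glimpse lock
 * (or a glimpse lock checked against a read enqueue).
 */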
/**
 * Cancel all conflicting locks and wait for them to be destroyed.
 *
 * This function is used for two purposes:
 *
 *     - early cancel all conflicting locks before starting IO, and
 *
 *     - guarantee that pages added to the page cache by lockless IO are never
 *       covered by locks other than lockless IO lock, and, hence, are not
 *       visible to other threads.
 */
static int osc_lock_enqueue_wait(const struct lu_env *env,
				 const struct osc_lock *olck)
{
	struct cl_lock *lock = olck->ols_cl.cls_lock;
	struct cl_lock_descr *descr = &lock->cll_descr;
	struct cl_object_header *hdr = cl_object_header(descr->cld_obj);
	struct cl_lock *scan;
	struct cl_lock *conflict = NULL;
	int lockless = osc_lock_is_lockless(olck);
	int rc = 0;

	LASSERT(cl_lock_is_mutexed(lock));

	/* make it enqueue anyway for glimpse lock, because we actually
	 * don't need to cancel any conflicting locks. */
	if (olck->ols_glimpse)
		return 0;

	spin_lock(&hdr->coh_lock_guard);
	list_for_each_entry(scan, &hdr->coh_locks, cll_linkage) {
		struct cl_lock_descr *cld = &scan->cll_descr;
		const struct osc_lock *scan_ols;

		if (scan == lock)
			break;

		if (scan->cll_state < CLS_QUEUING ||
		    scan->cll_state == CLS_FREEING ||
		    cld->cld_start > descr->cld_end ||
		    cld->cld_end < descr->cld_start)
			continue;

		/* overlapped and living locks. */

		/* We're not supposed to give up group lock. */
		if (scan->cll_descr.cld_mode == CLM_GROUP) {
			LASSERT(descr->cld_mode != CLM_GROUP ||
				descr->cld_gid != scan->cll_descr.cld_gid);
			continue;
		}

		scan_ols = osc_lock_at(scan);

		/* We need to cancel the compatible locks if we're enqueuing
		 * a lockless lock, for example:
		 * imagine that client has PR lock on [0, 1000], and thread T0
		 * is doing lockless IO in [500, 1500] region. Concurrent
		 * thread T1 can see lockless data in [500, 1000], which is
		 * wrong, because these data are possibly stale. */
		if (!lockless && osc_lock_compatible(olck, scan_ols))
			continue;

		cl_lock_get_trust(scan);
		conflict = scan;
		break;
	}
	spin_unlock(&hdr->coh_lock_guard);

	if (conflict) {
		if (lock->cll_descr.cld_mode == CLM_GROUP) {
			/* we want a group lock but a previous lock request
			 * conflicts, we do not wait but return 0 so the
			 * request is send to the server
			 */
			CDEBUG(D_DLMTRACE, "group lock %p is conflicted with %p, no wait, send to server\n",
			       lock, conflict);
			cl_lock_put(env, conflict);
		} else {
			CDEBUG(D_DLMTRACE, "lock %p is conflicted with %p, will wait\n",
			       lock, conflict);
			LASSERT(lock->cll_conflict == NULL);
			lu_ref_add(&conflict->cll_reference, "cancel-wait",
				   lock);
			lock->cll_conflict = conflict;
			rc = CLO_WAIT;
		}
	}
	return rc;
}
/**
 * Implementation of cl_lock_operations::clo_enqueue() method for osc
 * layer. This initiates ldlm enqueue:
 *
 *     - cancels conflicting locks early (osc_lock_enqueue_wait());
 *
 *     - calls osc_enqueue_base() to do actual enqueue.
 *
 * osc_enqueue_base() is supplied with an upcall function that is executed
 * when lock is received either after a local cached ldlm lock is matched, or
 * when a reply from the server is received.
 *
 * This function does not wait for the network communication to complete.
 */
static int osc_lock_enqueue(const struct lu_env *env,
			    const struct cl_lock_slice *slice,
			    struct cl_io *unused, __u32 enqflags)
{
	struct osc_lock *ols = cl2osc_lock(slice);
	struct cl_lock *lock = ols->ols_cl.cls_lock;
	int result;

	LASSERT(cl_lock_is_mutexed(lock));
	LASSERTF(ols->ols_state == OLS_NEW,
		 "Impossible state: %d\n", ols->ols_state);

	LASSERTF(ergo(ols->ols_glimpse, lock->cll_descr.cld_mode <= CLM_READ),
		 "lock = %p, ols = %p\n", lock, ols);

	result = osc_lock_enqueue_wait(env, ols);
	if (result == 0) {
		if (!osc_lock_is_lockless(ols)) {
			struct osc_object *obj = cl2osc(slice->cls_obj);
			struct osc_thread_info *info = osc_env_info(env);
			struct ldlm_res_id *resname = &info->oti_resname;
			ldlm_policy_data_t *policy = &info->oti_policy;
			struct ldlm_enqueue_info *einfo = &ols->ols_einfo;

			/* lock will be passed as upcall cookie,
			 * hold ref to prevent to be released. */
			cl_lock_hold_add(env, lock, "upcall", lock);
			/* a user for lock also */
			cl_lock_user_add(env, lock);
			ols->ols_state = OLS_ENQUEUED;

			/*
			 * XXX: this is possible blocking point as
			 * ldlm_lock_match(LDLM_FL_LVB_READY) waits for
			 * LDLM_FL_LVB_READY to be set.
			 */
			ostid_build_res_name(&obj->oo_oinfo->loi_oi, resname);
			osc_lock_build_policy(env, lock, policy);
			result = osc_enqueue_base(osc_export(obj), resname,
						  &ols->ols_flags, policy,
						  &ols->ols_lvb,
						  obj->oo_oinfo->loi_kms_valid,
						  osc_lock_upcall,
						  ols, einfo, &ols->ols_handle,
						  PTLRPCD_SET, 1, ols->ols_agl);
			if (result != 0) {
				cl_lock_user_del(env, lock);
				cl_lock_unhold(env, lock, "upcall", lock);
				if (unlikely(result == -ECANCELED)) {
					ols->ols_state = OLS_NEW;
					result = 0;
				}
			}
		} else {
			ols->ols_state = OLS_GRANTED;
			ols->ols_owner = osc_env_io(env);
		}
	}
	LASSERT(ergo(ols->ols_glimpse, !osc_lock_is_lockless(ols)));
	return result;
}
static int osc_lock_wait(const struct lu_env *env,
			 const struct cl_lock_slice *slice)
{
	struct osc_lock *olck = cl2osc_lock(slice);
	struct cl_lock *lock = olck->ols_cl.cls_lock;

	LINVRNT(osc_lock_invariant(olck));

	if (olck->ols_glimpse && olck->ols_state >= OLS_UPCALL_RECEIVED) {
		if (olck->ols_flags & LDLM_FL_LVB_READY) {
			return 0;
		} else if (olck->ols_agl) {
			if (lock->cll_flags & CLF_FROM_UPCALL)
				/* It is from enqueue RPC reply upcall for
				 * updating state. Do not re-enqueue. */
				return -ENAVAIL;
			olck->ols_state = OLS_NEW;
		} else {
			LASSERT(lock->cll_error);
			return lock->cll_error;
		}
	}

	if (olck->ols_state == OLS_NEW) {
		int rc;

		LASSERT(olck->ols_agl);
		olck->ols_agl = 0;
		olck->ols_flags &= ~LDLM_FL_BLOCK_NOWAIT;
		rc = osc_lock_enqueue(env, slice, NULL, CEF_ASYNC | CEF_MUST);
		if (rc != 0)
			return rc;
		else
			return CLO_REENQUEUED;
	}

	LASSERT(equi(olck->ols_state >= OLS_UPCALL_RECEIVED &&
		     lock->cll_error == 0, olck->ols_lock != NULL));

	return lock->cll_error ?: olck->ols_state >= OLS_GRANTED ? 0 : CLO_WAIT;
}
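
/*
 * Return protocol of clo_wait here: 0 once the lock is granted, CLO_WAIT
 * to keep waiting, CLO_REENQUEUED after an AGL lock was re-enqueued as a
 * regular one, or a negative errno recorded in cll_error.
 */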
/**
 * An implementation of cl_lock_operations::clo_use() method that pins cached
 * lock.
 */
static int osc_lock_use(const struct lu_env *env,
			const struct cl_lock_slice *slice)
{
	struct osc_lock *olck = cl2osc_lock(slice);
	int rc;

	LASSERT(!olck->ols_hold);

	/*
	 * Atomically check for LDLM_FL_CBPENDING and addref a lock if this
	 * flag is not set. This protects us from a concurrent blocking ast.
	 */
	rc = ldlm_lock_addref_try(&olck->ols_handle, olck->ols_einfo.ei_mode);
	if (rc == 0) {
		olck->ols_hold = 1;
		olck->ols_state = OLS_GRANTED;
	} else {
		struct cl_lock *lock;

		/*
		 * Lock is being cancelled somewhere within
		 * ldlm_handle_bl_callback(): LDLM_FL_CBPENDING is already
		 * set, but osc_ldlm_blocking_ast() hasn't yet acquired
		 * cl_lock mutex.
		 */
		lock = slice->cls_lock;
		LASSERT(lock->cll_state == CLS_INTRANSIT);
		LASSERT(lock->cll_users > 0);
		/* set a flag for osc_dlm_blocking_ast0() to signal the
		 * lock. */
		olck->ols_ast_wait = 1;
		rc = CLO_WAIT;
	}
	return rc;
}
static int osc_lock_flush(struct osc_lock *ols, int discard)
{
	struct cl_lock *lock = ols->ols_cl.cls_lock;
	struct cl_env_nest nest;
	struct lu_env *env;
	int result = 0;

	env = cl_env_nested_get(&nest);
	if (!IS_ERR(env)) {
		struct osc_object *obj = cl2osc(ols->ols_cl.cls_obj);
		struct cl_lock_descr *descr = &lock->cll_descr;
		int rc;

		if (descr->cld_mode >= CLM_WRITE) {
			result = osc_cache_writeback_range(env, obj,
					descr->cld_start, descr->cld_end,
					1, discard);
			LDLM_DEBUG(ols->ols_lock,
				   "lock %p: %d pages were %s.\n", lock, result,
				   discard ? "discarded" : "written");
			if (result > 0)
				result = 0;
		}

		rc = cl_lock_discard_pages(env, lock);
		if (result == 0 && rc < 0)
			result = rc;

		cl_env_nested_put(&nest, env);
	} else
		result = PTR_ERR(env);

	if (result == 0) {
		ols->ols_flush = 1;
		LINVRNT(!osc_lock_has_pages(ols));
	}
	return result;
}
/**
 * Implements cl_lock_operations::clo_cancel() method for osc layer. This is
 * called (as part of cl_lock_cancel()) when lock is canceled either voluntary
 * (LRU pressure, early cancellation, umount, etc.) or due to the conflict
 * with some other lock some where in the cluster. This function does the
 * following:
 *
 *     - invalidates all pages protected by this lock (after sending dirty
 *       ones to the server, as necessary);
 *
 *     - decref's underlying ldlm lock;
 *
 *     - cancels ldlm lock (ldlm_cli_cancel()).
 */
static void osc_lock_cancel(const struct lu_env *env,
			    const struct cl_lock_slice *slice)
{
	struct cl_lock *lock = slice->cls_lock;
	struct osc_lock *olck = cl2osc_lock(slice);
	struct ldlm_lock *dlmlock = olck->ols_lock;
	int result = 0;
	int discard;

	LASSERT(cl_lock_is_mutexed(lock));
	LINVRNT(osc_lock_invariant(olck));

	if (dlmlock != NULL) {
		int do_cancel;

		discard = !!(dlmlock->l_flags & LDLM_FL_DISCARD_DATA);
		if (olck->ols_state >= OLS_GRANTED)
			result = osc_lock_flush(olck, discard);
		osc_lock_unhold(olck);

		lock_res_and_lock(dlmlock);
		/* Now that we're the only user of dlm read/write reference,
		 * mostly the ->l_readers + ->l_writers should be zero.
		 * However, there is a corner case.
		 * See bug 18829 for details.*/
		do_cancel = (dlmlock->l_readers == 0 &&
			     dlmlock->l_writers == 0);
		dlmlock->l_flags |= LDLM_FL_CBPENDING;
		unlock_res_and_lock(dlmlock);
		if (do_cancel)
			result = ldlm_cli_cancel(&olck->ols_handle, LCF_ASYNC);
		if (result < 0)
			CL_LOCK_DEBUG(D_ERROR, env, lock,
				      "lock %p cancel failure with error(%d)\n",
				      lock, result);
	}
	olck->ols_state = OLS_CANCELLED;
	olck->ols_flags &= ~LDLM_FL_LVB_READY;
	osc_lock_detach(env, olck);
}
static int osc_lock_has_pages(struct osc_lock *olck)
{
	return 0;
}
static void osc_lock_delete(const struct lu_env *env,
			    const struct cl_lock_slice *slice)
{
	struct osc_lock *olck;

	olck = cl2osc_lock(slice);
	if (olck->ols_glimpse) {
		LASSERT(!olck->ols_hold);
		LASSERT(!olck->ols_lock);
		return;
	}

	LINVRNT(osc_lock_invariant(olck));
	LINVRNT(!osc_lock_has_pages(olck));

	osc_lock_unhold(olck);
	osc_lock_detach(env, olck);
}
/**
 * Implements cl_lock_operations::clo_state() method for osc layer.
 *
 * Maintains osc_lock::ols_owner field.
 *
 * This assumes that lock always enters CLS_HELD (from some other state) in
 * the same IO context as one that requested the lock. This should not be a
 * problem, because context is by definition shared by all activity pertaining
 * to the same high-level IO.
 */
static void osc_lock_state(const struct lu_env *env,
			   const struct cl_lock_slice *slice,
			   enum cl_lock_state state)
{
	struct osc_lock *lock = cl2osc_lock(slice);

	/*
	 * XXX multiple io contexts can use the lock at the same time.
	 */
	LINVRNT(osc_lock_invariant(lock));
	if (state == CLS_HELD && slice->cls_lock->cll_state != CLS_HELD) {
		struct osc_io *oio = osc_env_io(env);

		LASSERT(lock->ols_owner == NULL);
		lock->ols_owner = oio;
	} else if (state != CLS_HELD)
		lock->ols_owner = NULL;
}
static int osc_lock_print(const struct lu_env *env, void *cookie,
			  lu_printer_t p, const struct cl_lock_slice *slice)
{
	struct osc_lock *lock = cl2osc_lock(slice);

	/*
	 * XXX print ldlm lock and einfo properly.
	 */
	(*p)(env, cookie, "%p %#16llx %#llx %d %p ",
	     lock->ols_lock, lock->ols_flags, lock->ols_handle.cookie,
	     lock->ols_state, lock->ols_owner);
	osc_lvb_print(env, cookie, p, &lock->ols_lvb);
	return 0;
}
static int osc_lock_fits_into(const struct lu_env *env,
			      const struct cl_lock_slice *slice,
			      const struct cl_lock_descr *need,
			      const struct cl_io *io)
{
	struct osc_lock *ols = cl2osc_lock(slice);

	if (need->cld_enq_flags & CEF_NEVER)
		return 0;

	if (ols->ols_state >= OLS_CANCELLED)
		return 0;

	if (need->cld_mode == CLM_PHANTOM) {
		if (ols->ols_agl)
			return !(ols->ols_state > OLS_RELEASED);

		/*
		 * Note: the QUEUED lock can't be matched here, otherwise
		 * it might cause the deadlocks.
		 * For example:
		 *  P1: enqueued read lock, create sublock1
		 *  P2: enqueued write lock, create sublock2(conflicted
		 *      with sublock1).
		 *  P1: Grant read lock.
		 *  P1: enqueued glimpse lock(with holding sublock1_read),
		 *      matched with sublock2, waiting sublock2 to be granted.
		 *      But sublock2 can not be granted, because P1
		 *      will not release sublock1. Bang!
		 */
		if (ols->ols_state < OLS_GRANTED ||
		    ols->ols_state > OLS_RELEASED)
			return 0;
	} else if (need->cld_enq_flags & CEF_MUST) {
		/*
		 * If the lock hasn't ever enqueued, it can't be matched
		 * because enqueue process brings in many information
		 * which can be used to determine things such as lockless,
		 * CEF_MUST, etc.
		 */
		if (ols->ols_state < OLS_UPCALL_RECEIVED &&
		    ols->ols_locklessable)
			return 0;
	}
	return 1;
}
static const struct cl_lock_operations osc_lock_ops = {
	.clo_fini      = osc_lock_fini,
	.clo_enqueue   = osc_lock_enqueue,
	.clo_wait      = osc_lock_wait,
	.clo_unuse     = osc_lock_unuse,
	.clo_use       = osc_lock_use,
	.clo_delete    = osc_lock_delete,
	.clo_state     = osc_lock_state,
	.clo_cancel    = osc_lock_cancel,
	.clo_weigh     = osc_lock_weigh,
	.clo_print     = osc_lock_print,
	.clo_fits_into = osc_lock_fits_into,
};
static int osc_lock_lockless_unuse(const struct lu_env *env,
				   const struct cl_lock_slice *slice)
{
	struct osc_lock *ols = cl2osc_lock(slice);
	struct cl_lock *lock = slice->cls_lock;

	LASSERT(ols->ols_state == OLS_GRANTED);
	LINVRNT(osc_lock_invariant(ols));

	cl_lock_cancel(env, lock);
	cl_lock_delete(env, lock);
	return 0;
}
static void osc_lock_lockless_cancel(const struct lu_env *env,
				     const struct cl_lock_slice *slice)
{
	struct osc_lock *ols = cl2osc_lock(slice);
	int result;

	result = osc_lock_flush(ols, 0);
	if (result)
		CERROR("Pages for lockless lock %p were not purged(%d)\n",
		       slice->cls_lock, result);
	ols->ols_state = OLS_CANCELLED;
}
static int osc_lock_lockless_wait(const struct lu_env *env,
				  const struct cl_lock_slice *slice)
{
	struct osc_lock *olck = cl2osc_lock(slice);
	struct cl_lock *lock = olck->ols_cl.cls_lock;

	LINVRNT(osc_lock_invariant(olck));
	LASSERT(olck->ols_state >= OLS_UPCALL_RECEIVED);

	return lock->cll_error;
}
static void osc_lock_lockless_state(const struct lu_env *env,
				    const struct cl_lock_slice *slice,
				    enum cl_lock_state state)
{
	struct osc_lock *lock = cl2osc_lock(slice);

	LINVRNT(osc_lock_invariant(lock));
	if (state == CLS_HELD) {
		struct osc_io *oio = osc_env_io(env);

		LASSERT(ergo(lock->ols_owner, lock->ols_owner == oio));
		lock->ols_owner = oio;

		/* set the io to be lockless if this lock is for io's
		 * host object */
		if (cl_object_same(oio->oi_cl.cis_obj, slice->cls_obj))
			oio->oi_lockless = 1;
	}
}
static int osc_lock_lockless_fits_into(const struct lu_env *env,
				       const struct cl_lock_slice *slice,
				       const struct cl_lock_descr *need,
				       const struct cl_io *io)
{
	struct osc_lock *lock = cl2osc_lock(slice);

	if (!(need->cld_enq_flags & CEF_NEVER))
		return 0;

	/* lockless lock should only be used by its owning io. b22147 */
	return (lock->ols_owner == osc_env_io(env));
}
static const struct cl_lock_operations osc_lock_lockless_ops = {
	.clo_fini      = osc_lock_fini,
	.clo_enqueue   = osc_lock_enqueue,
	.clo_wait      = osc_lock_lockless_wait,
	.clo_unuse     = osc_lock_lockless_unuse,
	.clo_state     = osc_lock_lockless_state,
	.clo_fits_into = osc_lock_lockless_fits_into,
	.clo_cancel    = osc_lock_lockless_cancel,
	.clo_print     = osc_lock_print
};
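
/*
 * Unlike osc_lock_ops above, this table has no clo_use, clo_delete or
 * clo_weigh: a lockless lock is private to its owning io (see
 * osc_lock_lockless_fits_into()) and is never cached for reuse.
 */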
int osc_lock_init(const struct lu_env *env,
		  struct cl_object *obj, struct cl_lock *lock,
		  const struct cl_io *unused)
{
	struct osc_lock *clk;
	int result;

	clk = kmem_cache_alloc(osc_lock_kmem, GFP_NOFS | __GFP_ZERO);
	if (clk != NULL) {
		__u32 enqflags = lock->cll_descr.cld_enq_flags;

		osc_lock_build_einfo(env, lock, clk, &clk->ols_einfo);
		atomic_set(&clk->ols_pageref, 0);
		clk->ols_state = OLS_NEW;

		clk->ols_flags = osc_enq2ldlm_flags(enqflags);
		clk->ols_agl = !!(enqflags & CEF_AGL);
		if (clk->ols_agl)
			clk->ols_flags |= LDLM_FL_BLOCK_NOWAIT;
		if (clk->ols_flags & LDLM_FL_HAS_INTENT)
			clk->ols_glimpse = 1;

		cl_lock_slice_add(lock, &clk->ols_cl, obj, &osc_lock_ops);

		if (!(enqflags & CEF_MUST))
			/* try to convert this lock to a lockless lock */
			osc_lock_to_lockless(env, clk, (enqflags & CEF_NEVER));
		if (clk->ols_locklessable && !(enqflags & CEF_DISCARD_DATA))
			clk->ols_flags |= LDLM_FL_DENY_ON_CONTENTION;

		LDLM_DEBUG_NOLOCK("lock %p, osc lock %p, flags %llx\n",
				  lock, clk, clk->ols_flags);

		result = 0;
	} else
		result = -ENOMEM;
	return result;
}
int osc_dlm_lock_pageref(struct ldlm_lock *dlm)
{
	struct osc_lock *olock;
	int rc = 0;

	spin_lock(&osc_ast_guard);
	olock = dlm->l_ast_data;
	/*
	 * there's a very rare race with osc_page_addref_lock(), but that
	 * doesn't matter because in the worst case we don't cancel a lock
	 * which we actually can, that's no harm.
	 */
	if (olock != NULL &&
	    atomic_add_return(_PAGEREF_MAGIC,
			      &olock->ols_pageref) != _PAGEREF_MAGIC) {
		atomic_sub(_PAGEREF_MAGIC, &olock->ols_pageref);
		rc = 1;
	}
	spin_unlock(&osc_ast_guard);
	return rc;
}
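
/*
 * A non-zero return means pages still reference this dlm lock, telling
 * the caller that the lock is not a good cancellation candidate yet.
 */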