4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_OSC
39 #include "../../include/linux/libcfs/libcfs.h"
42 #include "../include/lustre_dlm.h"
43 #include "../include/lustre_net.h"
44 #include "../include/lustre/lustre_user.h"
45 #include "../include/obd_cksum.h"
47 #include "../include/lustre_ha.h"
48 #include "../include/lprocfs_status.h"
49 #include "../include/lustre_debug.h"
50 #include "../include/lustre_param.h"
51 #include "../include/lustre_fid.h"
52 #include "../include/obd_class.h"
53 #include "osc_internal.h"
54 #include "osc_cl_internal.h"
/* Per-RPC context for bulk read/write (BRW) requests, stashed in
 * req->rq_async_args and consumed by brw_interpret().  Holds the page
 * array, owning client_obd, and the oap/extent lists tied to this RPC.
 * NOTE(review): fragmentary extract — some struct members and the closing
 * brace were lost by the extraction; code left byte-identical. */
56 struct osc_brw_async_args
{
62 struct brw_page
**aa_ppga
;
63 struct client_obd
*aa_cli
;
64 struct list_head aa_oaps
;
65 struct list_head aa_exts
;
66 struct obd_capa
*aa_ocapa
;
67 struct cl_req
*aa_clerq
;
/* Async-args payload for getattr-style RPCs: carries the obd_info so the
 * interpret callback can fill oi_oa and invoke oi_cb_up.
 * NOTE(review): fragmentary extract — closing brace lost; code unchanged. */
70 struct osc_async_args
{
71 struct obd_info
*aa_oi
;
/* Async-args payload for setattr/punch RPCs; sa_upcall is invoked from
 * osc_setattr_interpret() with sa_cookie and the RPC result.
 * NOTE(review): fragmentary extract — other members (sa_oa, sa_cookie are
 * referenced elsewhere in this file) and closing brace are missing here. */
74 struct osc_setattr_args
{
76 obd_enqueue_update_f sa_upcall
;
/* Async-args payload for OST_SYNC RPCs; fa_upcall(fa_cookie, rc) is called
 * from osc_sync_interpret() after the reply obdo is copied into fa_oi.
 * NOTE(review): fragmentary extract — fa_cookie and closing brace missing. */
80 struct osc_fsync_args
{
81 struct obd_info
*fa_oi
;
82 obd_enqueue_update_f fa_upcall
;
/* Async-args payload for DLM lock enqueue RPCs: export, upcall, reply LVB,
 * lock handle, enqueue info, and an AGL (asynchronous glimpse lock) flag bit.
 * NOTE(review): fragmentary extract — some members and the closing brace
 * were dropped; code left byte-identical. */
86 struct osc_enqueue_args
{
87 struct obd_export
*oa_exp
;
89 obd_enqueue_update_f oa_upcall
;
91 struct ost_lvb
*oa_lvb
;
92 struct lustre_handle
*oa_lockh
;
93 struct ldlm_enqueue_info
*oa_ei
;
94 unsigned int oa_agl
:1;
97 static void osc_release_ppga(struct brw_page
**ppga
, u32 count
);
98 static int brw_interpret(const struct lu_env
*env
,
99 struct ptlrpc_request
*req
, void *data
, int rc
);
100 int osc_cleanup(struct obd_device
*obd
);
102 /* Pack OSC object metadata for disk storage (LE byte order). */
103 static int osc_packmd(struct obd_export
*exp
, struct lov_mds_md
**lmmp
,
104 struct lov_stripe_md
*lsm
)
108 lmm_size
= sizeof(**lmmp
);
112 if (*lmmp
!= NULL
&& lsm
== NULL
) {
116 } else if (unlikely(lsm
!= NULL
&& ostid_id(&lsm
->lsm_oi
) == 0)) {
121 *lmmp
= kzalloc(lmm_size
, GFP_NOFS
);
127 ostid_cpu_to_le(&lsm
->lsm_oi
, &(*lmmp
)->lmm_oi
);
132 /* Unpack OSC object metadata from disk storage (LE byte order). */
133 static int osc_unpackmd(struct obd_export
*exp
, struct lov_stripe_md
**lsmp
,
134 struct lov_mds_md
*lmm
, int lmm_bytes
)
137 struct obd_import
*imp
= class_exp2cliimp(exp
);
140 if (lmm_bytes
< sizeof(*lmm
)) {
141 CERROR("%s: lov_mds_md too small: %d, need %d\n",
142 exp
->exp_obd
->obd_name
, lmm_bytes
,
146 /* XXX LOV_MAGIC etc check? */
148 if (unlikely(ostid_id(&lmm
->lmm_oi
) == 0)) {
149 CERROR("%s: zero lmm_object_id: rc = %d\n",
150 exp
->exp_obd
->obd_name
, -EINVAL
);
155 lsm_size
= lov_stripe_md_size(1);
159 if (*lsmp
!= NULL
&& lmm
== NULL
) {
160 kfree((*lsmp
)->lsm_oinfo
[0]);
167 *lsmp
= kzalloc(lsm_size
, GFP_NOFS
);
168 if (unlikely(*lsmp
== NULL
))
170 (*lsmp
)->lsm_oinfo
[0] = kzalloc(sizeof(struct lov_oinfo
),
172 if (unlikely((*lsmp
)->lsm_oinfo
[0] == NULL
)) {
176 loi_init((*lsmp
)->lsm_oinfo
[0]);
177 } else if (unlikely(ostid_id(&(*lsmp
)->lsm_oi
) == 0)) {
182 /* XXX zero *lsmp? */
183 ostid_le_to_cpu(&lmm
->lmm_oi
, &(*lsmp
)->lsm_oi
);
186 (imp
->imp_connect_data
.ocd_connect_flags
& OBD_CONNECT_MAXBYTES
))
187 (*lsmp
)->lsm_maxbytes
= imp
->imp_connect_data
.ocd_maxbytes
;
189 (*lsmp
)->lsm_maxbytes
= LUSTRE_STRIPE_MAXBYTES
;
/* Copy a capability into the RMF_CAPA1 slot of an outgoing request and set
 * OBD_MD_FLOSSCAPA in the body's o_valid so the server knows it is present.
 * NOTE(review): fragmentary extract — the NULL-capa early return and the
 * capa_cpy() call visible in upstream are missing from this view. */
194 static inline void osc_pack_capa(struct ptlrpc_request
*req
,
195 struct ost_body
*body
, void *capa
)
197 struct obd_capa
*oc
= (struct obd_capa
*)capa
;
198 struct lustre_capa
*c
;
203 c
= req_capsule_client_get(&req
->rq_pill
, &RMF_CAPA1
);
206 body
->oa
.o_valid
|= OBD_MD_FLOSSCAPA
;
207 DEBUG_CAPA(D_SEC
, c
, "pack");
/* Fill the OST_BODY slot of an outgoing request from @oinfo: convert the
 * in-memory obdo to wire format and append the capability via
 * osc_pack_capa().  NOTE(review): fragmentary extract — the oinfo->oi_oa
 * argument line of lustre_set_wire_obdo() is missing from this view. */
210 static inline void osc_pack_req_body(struct ptlrpc_request
*req
,
211 struct obd_info
*oinfo
)
213 struct ost_body
*body
;
215 body
= req_capsule_client_get(&req
->rq_pill
, &RMF_OST_BODY
);
218 lustre_set_wire_obdo(&req
->rq_import
->imp_connect_data
, &body
->oa
,
220 osc_pack_capa(req
, body
, oinfo
->oi_capa
);
/* Size the capability field of the request capsule: zero when no capa is
 * supplied, otherwise the default (sizeof struct obd_capa) is kept.
 * NOTE(review): fragmentary extract — the third parameter and the branch
 * around req_capsule_set_size() are missing from this view. */
223 static inline void osc_set_capa_size(struct ptlrpc_request
*req
,
224 const struct req_msg_field
*field
,
228 req_capsule_set_size(&req
->rq_pill
, field
, RCL_CLIENT
, 0);
230 /* it is already calculated as sizeof struct obd_capa */
/* Reply interpreter for async getattr: on success unpack the reply body
 * into aa->aa_oi->oi_oa, force o_blksize to DT_MAX_BRW_SIZE (server does
 * not send it), then fire the oi_cb_up completion callback with rc.
 * On unpack failure oi_oa->o_valid is cleared.
 * NOTE(review): fragmentary extract — the rc checks / braces separating
 * the success and failure paths are missing from this view. */
234 static int osc_getattr_interpret(const struct lu_env
*env
,
235 struct ptlrpc_request
*req
,
236 struct osc_async_args
*aa
, int rc
)
238 struct ost_body
*body
;
243 body
= req_capsule_server_get(&req
->rq_pill
, &RMF_OST_BODY
);
245 CDEBUG(D_INODE
, "mode: %o\n", body
->oa
.o_mode
);
246 lustre_get_wire_obdo(&req
->rq_import
->imp_connect_data
,
247 aa
->aa_oi
->oi_oa
, &body
->oa
);
249 /* This should really be sent by the OST */
250 aa
->aa_oi
->oi_oa
->o_blksize
= DT_MAX_BRW_SIZE
;
251 aa
->aa_oi
->oi_oa
->o_valid
|= OBD_MD_FLBLKSZ
;
253 CDEBUG(D_INFO
, "can't unpack ost_body\n");
255 aa
->aa_oi
->oi_oa
->o_valid
= 0;
258 rc
= aa
->aa_oi
->oi_cb_up(aa
->aa_oi
, rc
);
262 static int osc_getattr_async(struct obd_export
*exp
, struct obd_info
*oinfo
,
263 struct ptlrpc_request_set
*set
)
265 struct ptlrpc_request
*req
;
266 struct osc_async_args
*aa
;
269 req
= ptlrpc_request_alloc(class_exp2cliimp(exp
), &RQF_OST_GETATTR
);
273 osc_set_capa_size(req
, &RMF_CAPA1
, oinfo
->oi_capa
);
274 rc
= ptlrpc_request_pack(req
, LUSTRE_OST_VERSION
, OST_GETATTR
);
276 ptlrpc_request_free(req
);
280 osc_pack_req_body(req
, oinfo
);
282 ptlrpc_request_set_replen(req
);
283 req
->rq_interpret_reply
= (ptlrpc_interpterer_t
)osc_getattr_interpret
;
285 CLASSERT(sizeof(*aa
) <= sizeof(req
->rq_async_args
));
286 aa
= ptlrpc_req_async_args(req
);
289 ptlrpc_set_add_req(set
, req
);
293 static int osc_getattr(const struct lu_env
*env
, struct obd_export
*exp
,
294 struct obd_info
*oinfo
)
296 struct ptlrpc_request
*req
;
297 struct ost_body
*body
;
300 req
= ptlrpc_request_alloc(class_exp2cliimp(exp
), &RQF_OST_GETATTR
);
304 osc_set_capa_size(req
, &RMF_CAPA1
, oinfo
->oi_capa
);
305 rc
= ptlrpc_request_pack(req
, LUSTRE_OST_VERSION
, OST_GETATTR
);
307 ptlrpc_request_free(req
);
311 osc_pack_req_body(req
, oinfo
);
313 ptlrpc_request_set_replen(req
);
315 rc
= ptlrpc_queue_wait(req
);
319 body
= req_capsule_server_get(&req
->rq_pill
, &RMF_OST_BODY
);
325 CDEBUG(D_INODE
, "mode: %o\n", body
->oa
.o_mode
);
326 lustre_get_wire_obdo(&req
->rq_import
->imp_connect_data
, oinfo
->oi_oa
,
329 oinfo
->oi_oa
->o_blksize
= cli_brw_size(exp
->exp_obd
);
330 oinfo
->oi_oa
->o_valid
|= OBD_MD_FLBLKSZ
;
333 ptlrpc_req_finished(req
);
337 static int osc_setattr(const struct lu_env
*env
, struct obd_export
*exp
,
338 struct obd_info
*oinfo
, struct obd_trans_info
*oti
)
340 struct ptlrpc_request
*req
;
341 struct ost_body
*body
;
344 LASSERT(oinfo
->oi_oa
->o_valid
& OBD_MD_FLGROUP
);
346 req
= ptlrpc_request_alloc(class_exp2cliimp(exp
), &RQF_OST_SETATTR
);
350 osc_set_capa_size(req
, &RMF_CAPA1
, oinfo
->oi_capa
);
351 rc
= ptlrpc_request_pack(req
, LUSTRE_OST_VERSION
, OST_SETATTR
);
353 ptlrpc_request_free(req
);
357 osc_pack_req_body(req
, oinfo
);
359 ptlrpc_request_set_replen(req
);
361 rc
= ptlrpc_queue_wait(req
);
365 body
= req_capsule_server_get(&req
->rq_pill
, &RMF_OST_BODY
);
371 lustre_get_wire_obdo(&req
->rq_import
->imp_connect_data
, oinfo
->oi_oa
,
375 ptlrpc_req_finished(req
);
/* Reply interpreter shared by setattr and punch: unpack the reply obdo
 * into sa->sa_oa, then invoke sa_upcall(sa_cookie, rc) to complete the
 * caller's operation.  NOTE(review): fragmentary extract — rc checks and
 * the &body->oa argument line of lustre_get_wire_obdo() are missing. */
379 static int osc_setattr_interpret(const struct lu_env
*env
,
380 struct ptlrpc_request
*req
,
381 struct osc_setattr_args
*sa
, int rc
)
383 struct ost_body
*body
;
388 body
= req_capsule_server_get(&req
->rq_pill
, &RMF_OST_BODY
);
394 lustre_get_wire_obdo(&req
->rq_import
->imp_connect_data
, sa
->sa_oa
,
397 rc
= sa
->sa_upcall(sa
->sa_cookie
, rc
);
401 int osc_setattr_async_base(struct obd_export
*exp
, struct obd_info
*oinfo
,
402 struct obd_trans_info
*oti
,
403 obd_enqueue_update_f upcall
, void *cookie
,
404 struct ptlrpc_request_set
*rqset
)
406 struct ptlrpc_request
*req
;
407 struct osc_setattr_args
*sa
;
410 req
= ptlrpc_request_alloc(class_exp2cliimp(exp
), &RQF_OST_SETATTR
);
414 osc_set_capa_size(req
, &RMF_CAPA1
, oinfo
->oi_capa
);
415 rc
= ptlrpc_request_pack(req
, LUSTRE_OST_VERSION
, OST_SETATTR
);
417 ptlrpc_request_free(req
);
421 if (oti
&& oinfo
->oi_oa
->o_valid
& OBD_MD_FLCOOKIE
)
422 oinfo
->oi_oa
->o_lcookie
= *oti
->oti_logcookies
;
424 osc_pack_req_body(req
, oinfo
);
426 ptlrpc_request_set_replen(req
);
428 /* do mds to ost setattr asynchronously */
430 /* Do not wait for response. */
431 ptlrpcd_add_req(req
, PDL_POLICY_ROUND
, -1);
433 req
->rq_interpret_reply
=
434 (ptlrpc_interpterer_t
)osc_setattr_interpret
;
436 CLASSERT (sizeof(*sa
) <= sizeof(req
->rq_async_args
));
437 sa
= ptlrpc_req_async_args(req
);
438 sa
->sa_oa
= oinfo
->oi_oa
;
439 sa
->sa_upcall
= upcall
;
440 sa
->sa_cookie
= cookie
;
442 if (rqset
== PTLRPCD_SET
)
443 ptlrpcd_add_req(req
, PDL_POLICY_ROUND
, -1);
445 ptlrpc_set_add_req(rqset
, req
);
/* Thin wrapper: forward to osc_setattr_async_base() using the obd_info's
 * own oi_cb_up as the upcall and oinfo itself as its cookie. */
451 static int osc_setattr_async(struct obd_export
*exp
, struct obd_info
*oinfo
,
452 struct obd_trans_info
*oti
,
453 struct ptlrpc_request_set
*rqset
)
455 return osc_setattr_async_base(exp
, oinfo
, oti
,
456 oinfo
->oi_cb_up
, oinfo
, rqset
);
459 int osc_real_create(struct obd_export
*exp
, struct obdo
*oa
,
460 struct lov_stripe_md
**ea
, struct obd_trans_info
*oti
)
462 struct ptlrpc_request
*req
;
463 struct ost_body
*body
;
464 struct lov_stripe_md
*lsm
;
472 rc
= obd_alloc_memmd(exp
, &lsm
);
477 req
= ptlrpc_request_alloc(class_exp2cliimp(exp
), &RQF_OST_CREATE
);
483 rc
= ptlrpc_request_pack(req
, LUSTRE_OST_VERSION
, OST_CREATE
);
485 ptlrpc_request_free(req
);
489 body
= req_capsule_client_get(&req
->rq_pill
, &RMF_OST_BODY
);
492 lustre_set_wire_obdo(&req
->rq_import
->imp_connect_data
, &body
->oa
, oa
);
494 ptlrpc_request_set_replen(req
);
496 if ((oa
->o_valid
& OBD_MD_FLFLAGS
) &&
497 oa
->o_flags
== OBD_FL_DELORPHAN
) {
499 "delorphan from OST integration");
500 /* Don't resend the delorphan req */
501 req
->rq_no_resend
= req
->rq_no_delay
= 1;
504 rc
= ptlrpc_queue_wait(req
);
508 body
= req_capsule_server_get(&req
->rq_pill
, &RMF_OST_BODY
);
514 CDEBUG(D_INFO
, "oa flags %x\n", oa
->o_flags
);
515 lustre_get_wire_obdo(&req
->rq_import
->imp_connect_data
, oa
, &body
->oa
);
517 oa
->o_blksize
= cli_brw_size(exp
->exp_obd
);
518 oa
->o_valid
|= OBD_MD_FLBLKSZ
;
520 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
521 * have valid lsm_oinfo data structs, so don't go touching that.
522 * This needs to be fixed in a big way.
524 lsm
->lsm_oi
= oa
->o_oi
;
528 oti
->oti_transno
= lustre_msg_get_transno(req
->rq_repmsg
);
530 if (oa
->o_valid
& OBD_MD_FLCOOKIE
) {
531 if (!oti
->oti_logcookies
)
532 oti_alloc_cookies(oti
, 1);
533 *oti
->oti_logcookies
= oa
->o_lcookie
;
537 CDEBUG(D_HA
, "transno: %lld\n",
538 lustre_msg_get_transno(req
->rq_repmsg
));
540 ptlrpc_req_finished(req
);
543 obd_free_memmd(exp
, &lsm
);
547 int osc_punch_base(struct obd_export
*exp
, struct obd_info
*oinfo
,
548 obd_enqueue_update_f upcall
, void *cookie
,
549 struct ptlrpc_request_set
*rqset
)
551 struct ptlrpc_request
*req
;
552 struct osc_setattr_args
*sa
;
553 struct ost_body
*body
;
556 req
= ptlrpc_request_alloc(class_exp2cliimp(exp
), &RQF_OST_PUNCH
);
560 osc_set_capa_size(req
, &RMF_CAPA1
, oinfo
->oi_capa
);
561 rc
= ptlrpc_request_pack(req
, LUSTRE_OST_VERSION
, OST_PUNCH
);
563 ptlrpc_request_free(req
);
566 req
->rq_request_portal
= OST_IO_PORTAL
; /* bug 7198 */
567 ptlrpc_at_set_req_timeout(req
);
569 body
= req_capsule_client_get(&req
->rq_pill
, &RMF_OST_BODY
);
571 lustre_set_wire_obdo(&req
->rq_import
->imp_connect_data
, &body
->oa
,
573 osc_pack_capa(req
, body
, oinfo
->oi_capa
);
575 ptlrpc_request_set_replen(req
);
577 req
->rq_interpret_reply
= (ptlrpc_interpterer_t
)osc_setattr_interpret
;
578 CLASSERT (sizeof(*sa
) <= sizeof(req
->rq_async_args
));
579 sa
= ptlrpc_req_async_args(req
);
580 sa
->sa_oa
= oinfo
->oi_oa
;
581 sa
->sa_upcall
= upcall
;
582 sa
->sa_cookie
= cookie
;
583 if (rqset
== PTLRPCD_SET
)
584 ptlrpcd_add_req(req
, PDL_POLICY_ROUND
, -1);
586 ptlrpc_set_add_req(rqset
, req
);
/* Reply interpreter for OST_SYNC: copy the reply obdo into fa_oi->oi_oa
 * and complete via fa_upcall(fa_cookie, rc).  Logs an error when the reply
 * body cannot be unpacked.  NOTE(review): fragmentary extract — the arg
 * parameter declaration and rc-check branches are missing from this view. */
591 static int osc_sync_interpret(const struct lu_env
*env
,
592 struct ptlrpc_request
*req
,
595 struct osc_fsync_args
*fa
= arg
;
596 struct ost_body
*body
;
601 body
= req_capsule_server_get(&req
->rq_pill
, &RMF_OST_BODY
);
603 CERROR ("can't unpack ost_body\n");
608 *fa
->fa_oi
->oi_oa
= body
->oa
;
610 rc
= fa
->fa_upcall(fa
->fa_cookie
, rc
);
614 int osc_sync_base(struct obd_export
*exp
, struct obd_info
*oinfo
,
615 obd_enqueue_update_f upcall
, void *cookie
,
616 struct ptlrpc_request_set
*rqset
)
618 struct ptlrpc_request
*req
;
619 struct ost_body
*body
;
620 struct osc_fsync_args
*fa
;
623 req
= ptlrpc_request_alloc(class_exp2cliimp(exp
), &RQF_OST_SYNC
);
627 osc_set_capa_size(req
, &RMF_CAPA1
, oinfo
->oi_capa
);
628 rc
= ptlrpc_request_pack(req
, LUSTRE_OST_VERSION
, OST_SYNC
);
630 ptlrpc_request_free(req
);
634 /* overload the size and blocks fields in the oa with start/end */
635 body
= req_capsule_client_get(&req
->rq_pill
, &RMF_OST_BODY
);
637 lustre_set_wire_obdo(&req
->rq_import
->imp_connect_data
, &body
->oa
,
639 osc_pack_capa(req
, body
, oinfo
->oi_capa
);
641 ptlrpc_request_set_replen(req
);
642 req
->rq_interpret_reply
= osc_sync_interpret
;
644 CLASSERT(sizeof(*fa
) <= sizeof(req
->rq_async_args
));
645 fa
= ptlrpc_req_async_args(req
);
647 fa
->fa_upcall
= upcall
;
648 fa
->fa_cookie
= cookie
;
650 if (rqset
== PTLRPCD_SET
)
651 ptlrpcd_add_req(req
, PDL_POLICY_ROUND
, -1);
653 ptlrpc_set_add_req(rqset
, req
);
/* NOTE(review): fragmentary extract — variable declarations for count/ptr,
 * early returns and braces are missing; code left byte-identical. */
658 /* Find and cancel locally locks matched by @mode in the resource found by
659 * @objid. Found locks are added into @cancel list. Returns the amount of
660 * locks added to @cancels list. */
661 static int osc_resource_get_unused(struct obd_export
*exp
, struct obdo
*oa
,
662 struct list_head
*cancels
,
663 ldlm_mode_t mode
, __u64 lock_flags
)
665 struct ldlm_namespace
*ns
= exp
->exp_obd
->obd_namespace
;
666 struct ldlm_res_id res_id
;
667 struct ldlm_resource
*res
;
670 /* Return, i.e. cancel nothing, only if ELC is supported (flag in
671 * export) but disabled through procfs (flag in NS).
673 * This distinguishes from a case when ELC is not supported originally,
674 * when we still want to cancel locks in advance and just cancel them
675 * locally, without sending any RPC. */
676 if (exp_connect_cancelset(exp
) && !ns_connect_cancelset(ns
))
679 ostid_build_res_name(&oa
->o_oi
, &res_id
);
680 res
= ldlm_resource_get(ns
, NULL
, &res_id
, 0, 0);
684 LDLM_RESOURCE_ADDREF(res
);
685 count
= ldlm_cancel_resource_local(res
, cancels
, NULL
, mode
,
686 lock_flags
, 0, NULL
);
687 LDLM_RESOURCE_DELREF(res
);
688 ldlm_resource_putref(res
);
/* Reply interpreter for OST_DESTROY: drop the in-flight destroy counter
 * and wake any thread throttled in osc_destroy() waiting to send another
 * destroy RPC.  NOTE(review): fragmentary extract — the rc parameter and
 * return statement are missing from this view. */
692 static int osc_destroy_interpret(const struct lu_env
*env
,
693 struct ptlrpc_request
*req
, void *data
,
696 struct client_obd
*cli
= &req
->rq_import
->imp_obd
->u
.cli
;
698 atomic_dec(&cli
->cl_destroy_in_flight
);
699 wake_up(&cli
->cl_destroy_waitq
);
/* Throttle check for destroy RPCs: optimistically bump the in-flight
 * counter; if that stayed within cl_max_rpcs_in_flight the caller may
 * send.  Otherwise undo the increment — and because another thread may
 * have raced between the two atomics, re-wake the destroy waitqueue when
 * the decrement shows room appeared.  NOTE(review): fragmentary extract —
 * return statements and closing braces are missing from this view. */
703 static int osc_can_send_destroy(struct client_obd
*cli
)
705 if (atomic_inc_return(&cli
->cl_destroy_in_flight
) <=
706 cli
->cl_max_rpcs_in_flight
) {
707 /* The destroy request can be sent */
710 if (atomic_dec_return(&cli
->cl_destroy_in_flight
) <
711 cli
->cl_max_rpcs_in_flight
) {
713 * The counter has been modified between the two atomic
716 wake_up(&cli
->cl_destroy_waitq
);
/* Object create entry point.  Delegates to osc_real_create() for the
 * OBD_FL_RECREATE_OBJS case and for any object whose sequence is not an
 * MDT sequence; otherwise (MDT-originated precreate) control should not
 * reach here anymore.  NOTE(review): fragmentary extract — the oa/ea
 * NULL checks and final return are missing from this view. */
721 int osc_create(const struct lu_env
*env
, struct obd_export
*exp
,
722 struct obdo
*oa
, struct lov_stripe_md
**ea
,
723 struct obd_trans_info
*oti
)
729 LASSERT(oa
->o_valid
& OBD_MD_FLGROUP
);
731 if ((oa
->o_valid
& OBD_MD_FLFLAGS
) &&
732 oa
->o_flags
== OBD_FL_RECREATE_OBJS
) {
733 return osc_real_create(exp
, oa
, ea
, oti
);
736 if (!fid_seq_is_mdt(ostid_seq(&oa
->o_oi
)))
737 return osc_real_create(exp
, oa
, ea
, oti
);
739 /* we should not get here anymore */
745 /* Destroy requests can be async always on the client, and we don't even really
746 * care about the return code since the client cannot do anything at all about
748 * When the MDS is unlinking a filename, it saves the file objects into a
749 * recovery llog, and these object records are cancelled when the OST reports
750 * they were destroyed and sync'd to disk (i.e. transaction committed).
751 * If the client dies, or the OST is down when the object should be destroyed,
752 * the records are not cancelled, and when the OST reconnects to the MDS next,
753 * it will retrieve the llog unlink logs and then sends the log cancellation
754 * cookies to the MDS after committing destroy transactions. */
755 static int osc_destroy(const struct lu_env
*env
, struct obd_export
*exp
,
756 struct obdo
*oa
, struct lov_stripe_md
*ea
,
757 struct obd_trans_info
*oti
, struct obd_export
*md_export
,
760 struct client_obd
*cli
= &exp
->exp_obd
->u
.cli
;
761 struct ptlrpc_request
*req
;
762 struct ost_body
*body
;
767 CDEBUG(D_INFO
, "oa NULL\n");
771 count
= osc_resource_get_unused(exp
, oa
, &cancels
, LCK_PW
,
772 LDLM_FL_DISCARD_DATA
);
774 req
= ptlrpc_request_alloc(class_exp2cliimp(exp
), &RQF_OST_DESTROY
);
776 ldlm_lock_list_put(&cancels
, l_bl_ast
, count
);
780 osc_set_capa_size(req
, &RMF_CAPA1
, (struct obd_capa
*)capa
);
781 rc
= ldlm_prep_elc_req(exp
, req
, LUSTRE_OST_VERSION
, OST_DESTROY
,
784 ptlrpc_request_free(req
);
788 req
->rq_request_portal
= OST_IO_PORTAL
; /* bug 7198 */
789 ptlrpc_at_set_req_timeout(req
);
791 if (oti
!= NULL
&& oa
->o_valid
& OBD_MD_FLCOOKIE
)
792 oa
->o_lcookie
= *oti
->oti_logcookies
;
793 body
= req_capsule_client_get(&req
->rq_pill
, &RMF_OST_BODY
);
795 lustre_set_wire_obdo(&req
->rq_import
->imp_connect_data
, &body
->oa
, oa
);
797 osc_pack_capa(req
, body
, (struct obd_capa
*)capa
);
798 ptlrpc_request_set_replen(req
);
800 /* If osc_destroy is for destroying the unlink orphan,
801 * sent from MDT to OST, which should not be blocked here,
802 * because the process might be triggered by ptlrpcd, and
803 * it is not good to block ptlrpcd thread (b=16006)*/
804 if (!(oa
->o_flags
& OBD_FL_DELORPHAN
)) {
805 req
->rq_interpret_reply
= osc_destroy_interpret
;
806 if (!osc_can_send_destroy(cli
)) {
807 struct l_wait_info lwi
= LWI_INTR(LWI_ON_SIGNAL_NOOP
,
811 * Wait until the number of on-going destroy RPCs drops
812 * under max_rpc_in_flight
814 l_wait_event_exclusive(cli
->cl_destroy_waitq
,
815 osc_can_send_destroy(cli
), &lwi
);
819 /* Do not wait for response */
820 ptlrpcd_add_req(req
, PDL_POLICY_ROUND
, -1);
824 static void osc_announce_cached(struct client_obd
*cli
, struct obdo
*oa
,
827 u32 bits
= OBD_MD_FLBLOCKS
|OBD_MD_FLGRANT
;
829 LASSERT(!(oa
->o_valid
& bits
));
832 client_obd_list_lock(&cli
->cl_loi_list_lock
);
833 oa
->o_dirty
= cli
->cl_dirty
;
834 if (unlikely(cli
->cl_dirty
- cli
->cl_dirty_transit
>
835 cli
->cl_dirty_max
)) {
836 CERROR("dirty %lu - %lu > dirty_max %lu\n",
837 cli
->cl_dirty
, cli
->cl_dirty_transit
, cli
->cl_dirty_max
);
839 } else if (unlikely(atomic_read(&obd_dirty_pages
) -
840 atomic_read(&obd_dirty_transit_pages
) >
841 (long)(obd_max_dirty_pages
+ 1))) {
842 /* The atomic_read() allowing the atomic_inc() are
843 * not covered by a lock thus they may safely race and trip
844 * this CERROR() unless we add in a small fudge factor (+1). */
845 CERROR("dirty %d - %d > system dirty_max %d\n",
846 atomic_read(&obd_dirty_pages
),
847 atomic_read(&obd_dirty_transit_pages
),
848 obd_max_dirty_pages
);
850 } else if (unlikely(cli
->cl_dirty_max
- cli
->cl_dirty
> 0x7fffffff)) {
851 CERROR("dirty %lu - dirty_max %lu too big???\n",
852 cli
->cl_dirty
, cli
->cl_dirty_max
);
855 long max_in_flight
= (cli
->cl_max_pages_per_rpc
<<
857 (cli
->cl_max_rpcs_in_flight
+ 1);
858 oa
->o_undirty
= max(cli
->cl_dirty_max
, max_in_flight
);
860 oa
->o_grant
= cli
->cl_avail_grant
+ cli
->cl_reserved_grant
;
861 oa
->o_dropped
= cli
->cl_lost_grant
;
862 cli
->cl_lost_grant
= 0;
863 client_obd_list_unlock(&cli
->cl_loi_list_lock
);
864 CDEBUG(D_CACHE
, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
865 oa
->o_dirty
, oa
->o_undirty
, oa
->o_dropped
, oa
->o_grant
);
/* Reschedule the next grant-shrink attempt: set cl_next_shrink_grant to
 * now + cl_grant_shrink_interval (jiffies via cfs_time_shift). */
869 void osc_update_next_shrink(struct client_obd
*cli
)
871 cli
->cl_next_shrink_grant
=
872 cfs_time_shift(cli
->cl_grant_shrink_interval
);
873 CDEBUG(D_CACHE
, "next time %ld to shrink grant \n",
874 cli
->cl_next_shrink_grant
);
/* Add @grant bytes to the client's available grant, under the
 * cl_loi_list_lock which protects all grant accounting fields. */
877 static void __osc_update_grant(struct client_obd
*cli
, u64 grant
)
879 client_obd_list_lock(&cli
->cl_loi_list_lock
);
880 cli
->cl_avail_grant
+= grant
;
881 client_obd_list_unlock(&cli
->cl_loi_list_lock
);
/* If the reply body carries a grant (OBD_MD_FLGRANT set), credit it to
 * the client via __osc_update_grant(). */
884 static void osc_update_grant(struct client_obd
*cli
, struct ost_body
*body
)
886 if (body
->oa
.o_valid
& OBD_MD_FLGRANT
) {
887 CDEBUG(D_CACHE
, "got %llu extra grant\n", body
->oa
.o_grant
);
888 __osc_update_grant(cli
, body
->oa
.o_grant
);
892 static int osc_set_info_async(const struct lu_env
*env
, struct obd_export
*exp
,
893 u32 keylen
, void *key
, u32 vallen
,
894 void *val
, struct ptlrpc_request_set
*set
);
/* Reply interpreter for a grant-shrink set_info RPC.  On failure, give the
 * grant we tried to return (oa->o_grant) back to cl_avail_grant; on
 * success, credit whatever extra grant the server's reply body carries.
 * NOTE(review): fragmentary extract — the aa parameter declaration, rc
 * branches and the oa free are missing from this view. */
896 static int osc_shrink_grant_interpret(const struct lu_env
*env
,
897 struct ptlrpc_request
*req
,
900 struct client_obd
*cli
= &req
->rq_import
->imp_obd
->u
.cli
;
901 struct obdo
*oa
= ((struct osc_brw_async_args
*)aa
)->aa_oa
;
902 struct ost_body
*body
;
905 __osc_update_grant(cli
, oa
->o_grant
);
909 body
= req_capsule_server_get(&req
->rq_pill
, &RMF_OST_BODY
);
911 osc_update_grant(cli
, body
);
/* Shrink grant locally (no RPC): move a quarter of cl_avail_grant into
 * oa->o_grant for return to the server with the next request, mark the
 * obdo with OBD_FL_SHRINK_GRANT, and reschedule the next shrink.
 * NOTE(review): fragmentary extract — the o_flags initialization inside
 * the !OBD_MD_FLFLAGS branch and closing braces are missing. */
917 static void osc_shrink_grant_local(struct client_obd
*cli
, struct obdo
*oa
)
919 client_obd_list_lock(&cli
->cl_loi_list_lock
);
920 oa
->o_grant
= cli
->cl_avail_grant
/ 4;
921 cli
->cl_avail_grant
-= oa
->o_grant
;
922 client_obd_list_unlock(&cli
->cl_loi_list_lock
);
923 if (!(oa
->o_valid
& OBD_MD_FLFLAGS
)) {
924 oa
->o_valid
|= OBD_MD_FLFLAGS
;
927 oa
->o_flags
|= OBD_FL_SHRINK_GRANT
;
928 osc_update_next_shrink(cli
);
/* NOTE(review): fragmentary extract — opening/closing braces dropped by
 * the extraction; code otherwise byte-identical. */
931 /* Shrink the current grant, either from some large amount to enough for a
932 * full set of in-flight RPCs, or if we have already shrunk to that limit
933 * then to enough for a single RPC. This avoids keeping more grant than
934 * needed, and avoids shrinking the grant piecemeal. */
935 static int osc_shrink_grant(struct client_obd
*cli
)
937 __u64 target_bytes
= (cli
->cl_max_rpcs_in_flight
+ 1) *
938 (cli
->cl_max_pages_per_rpc
<< PAGE_CACHE_SHIFT
);
940 client_obd_list_lock(&cli
->cl_loi_list_lock
);
941 if (cli
->cl_avail_grant
<= target_bytes
)
942 target_bytes
= cli
->cl_max_pages_per_rpc
<< PAGE_CACHE_SHIFT
;
943 client_obd_list_unlock(&cli
->cl_loi_list_lock
);
945 return osc_shrink_grant_to_target(cli
, target_bytes
);
948 int osc_shrink_grant_to_target(struct client_obd
*cli
, __u64 target_bytes
)
951 struct ost_body
*body
;
953 client_obd_list_lock(&cli
->cl_loi_list_lock
);
954 /* Don't shrink if we are already above or below the desired limit
955 * We don't want to shrink below a single RPC, as that will negatively
956 * impact block allocation and long-term performance. */
957 if (target_bytes
< cli
->cl_max_pages_per_rpc
<< PAGE_CACHE_SHIFT
)
958 target_bytes
= cli
->cl_max_pages_per_rpc
<< PAGE_CACHE_SHIFT
;
960 if (target_bytes
>= cli
->cl_avail_grant
) {
961 client_obd_list_unlock(&cli
->cl_loi_list_lock
);
964 client_obd_list_unlock(&cli
->cl_loi_list_lock
);
966 body
= kzalloc(sizeof(*body
), GFP_NOFS
);
970 osc_announce_cached(cli
, &body
->oa
, 0);
972 client_obd_list_lock(&cli
->cl_loi_list_lock
);
973 body
->oa
.o_grant
= cli
->cl_avail_grant
- target_bytes
;
974 cli
->cl_avail_grant
= target_bytes
;
975 client_obd_list_unlock(&cli
->cl_loi_list_lock
);
976 if (!(body
->oa
.o_valid
& OBD_MD_FLFLAGS
)) {
977 body
->oa
.o_valid
|= OBD_MD_FLFLAGS
;
978 body
->oa
.o_flags
= 0;
980 body
->oa
.o_flags
|= OBD_FL_SHRINK_GRANT
;
981 osc_update_next_shrink(cli
);
983 rc
= osc_set_info_async(NULL
, cli
->cl_import
->imp_obd
->obd_self_export
,
984 sizeof(KEY_GRANT_SHRINK
), KEY_GRANT_SHRINK
,
985 sizeof(*body
), body
, NULL
);
987 __osc_update_grant(cli
, body
->oa
.o_grant
);
/* Decide whether it is time to shrink this client's grant: requires the
 * server to support OBD_CONNECT_GRANT_SHRINK, the scheduled shrink time
 * (minus a 5-tick slack) to have arrived, the import to be FULL, and the
 * available grant to exceed one RPC's worth; otherwise just reschedule.
 * NOTE(review): fragmentary extract — the return statements for each
 * branch are missing from this view. */
992 static int osc_should_shrink_grant(struct client_obd
*client
)
994 unsigned long time
= cfs_time_current();
995 unsigned long next_shrink
= client
->cl_next_shrink_grant
;
997 if ((client
->cl_import
->imp_connect_data
.ocd_connect_flags
&
998 OBD_CONNECT_GRANT_SHRINK
) == 0)
1001 if (cfs_time_aftereq(time
, next_shrink
- 5 * CFS_TICK
)) {
1002 /* Get the current RPC size directly, instead of going via:
1003 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
1004 * Keep comment here so that it can be found by searching. */
1005 int brw_size
= client
->cl_max_pages_per_rpc
<< PAGE_CACHE_SHIFT
;
1007 if (client
->cl_import
->imp_state
== LUSTRE_IMP_FULL
&&
1008 client
->cl_avail_grant
> brw_size
)
1011 osc_update_next_shrink(client
);
/* Periodic timeout callback: walk every client on the grant-shrink list
 * and shrink the grant of those osc_should_shrink_grant() approves.
 * NOTE(review): fragmentary extract — the return statement is missing. */
1016 static int osc_grant_shrink_grant_cb(struct timeout_item
*item
, void *data
)
1018 struct client_obd
*client
;
1020 list_for_each_entry(client
, &item
->ti_obd_list
,
1021 cl_grant_shrink_list
) {
1022 if (osc_should_shrink_grant(client
))
1023 osc_shrink_grant(client
);
/* Register this client with the ptlrpc timeout machinery so that
 * osc_grant_shrink_grant_cb() runs every cl_grant_shrink_interval, then
 * schedule the first shrink.  Logs and (per upstream) returns rc when
 * registration fails.  NOTE(review): fragmentary extract — the rc
 * declaration, error-return branch and final return are missing. */
1028 static int osc_add_shrink_grant(struct client_obd
*client
)
1032 rc
= ptlrpc_add_timeout_client(client
->cl_grant_shrink_interval
,
1034 osc_grant_shrink_grant_cb
, NULL
,
1035 &client
->cl_grant_shrink_list
);
1037 CERROR("add grant client %s error %d\n",
1038 client
->cl_import
->imp_obd
->obd_name
, rc
);
1041 CDEBUG(D_CACHE
, "add grant client %s \n",
1042 client
->cl_import
->imp_obd
->obd_name
);
1043 osc_update_next_shrink(client
);
/* Unregister this client from the grant-shrink timeout list; inverse of
 * osc_add_shrink_grant().  NOTE(review): fragmentary extract — the second
 * argument of ptlrpc_del_timeout_client() is missing from this view. */
1047 static int osc_del_shrink_grant(struct client_obd
*client
)
1049 return ptlrpc_del_timeout_client(&client
->cl_grant_shrink_list
,
1053 static void osc_init_grant(struct client_obd
*cli
, struct obd_connect_data
*ocd
)
1056 * ocd_grant is the total grant amount we're expect to hold: if we've
1057 * been evicted, it's the new avail_grant amount, cl_dirty will drop
1058 * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1060 * race is tolerable here: if we're evicted, but imp_state already
1061 * left EVICTED state, then cl_dirty must be 0 already.
1063 client_obd_list_lock(&cli
->cl_loi_list_lock
);
1064 if (cli
->cl_import
->imp_state
== LUSTRE_IMP_EVICTED
)
1065 cli
->cl_avail_grant
= ocd
->ocd_grant
;
1067 cli
->cl_avail_grant
= ocd
->ocd_grant
- cli
->cl_dirty
;
1069 if (cli
->cl_avail_grant
< 0) {
1070 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
1071 cli
->cl_import
->imp_obd
->obd_name
, cli
->cl_avail_grant
,
1072 ocd
->ocd_grant
, cli
->cl_dirty
);
1073 /* workaround for servers which do not have the patch from
1075 cli
->cl_avail_grant
= ocd
->ocd_grant
;
1078 /* determine the appropriate chunk size used by osc_extent. */
1079 cli
->cl_chunkbits
= max_t(int, PAGE_CACHE_SHIFT
, ocd
->ocd_blocksize
);
1080 client_obd_list_unlock(&cli
->cl_loi_list_lock
);
1082 CDEBUG(D_CACHE
, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld chunk bits: %d\n",
1083 cli
->cl_import
->imp_obd
->obd_name
,
1084 cli
->cl_avail_grant
, cli
->cl_lost_grant
, cli
->cl_chunkbits
);
1086 if (ocd
->ocd_connect_flags
& OBD_CONNECT_GRANT_SHRINK
&&
1087 list_empty(&cli
->cl_grant_shrink_list
))
1088 osc_add_shrink_grant(cli
);
/* NOTE(review): fragmentary extract — the index variable i, the kunmap()
 * calls and loop braces are missing; code left byte-identical. */
1091 /* We assume that the reason this OSC got a short read is because it read
1092 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1093 * via the LOV, and it _knows_ it's reading inside the file, it's just that
1094 * this stripe never got written at or beyond this stripe offset yet. */
1095 static void handle_short_read(int nob_read
, u32 page_count
,
1096 struct brw_page
**pga
)
1101 /* skip bytes read OK */
1102 while (nob_read
> 0) {
1103 LASSERT (page_count
> 0);
1105 if (pga
[i
]->count
> nob_read
) {
1106 /* EOF inside this page */
1107 ptr
= kmap(pga
[i
]->pg
) +
1108 (pga
[i
]->off
& ~CFS_PAGE_MASK
);
1109 memset(ptr
+ nob_read
, 0, pga
[i
]->count
- nob_read
);
1116 nob_read
-= pga
[i
]->count
;
1121 /* zero remaining pages */
1122 while (page_count
-- > 0) {
1123 ptr
= kmap(pga
[i
]->pg
) + (pga
[i
]->off
& ~CFS_PAGE_MASK
);
1124 memset(ptr
, 0, pga
[i
]->count
);
/* Validate the per-niobuf return codes (RMF_RCS) in a BRW_WRITE reply:
 * propagate the first negative rc, treat any nonzero rc or a transferred
 * byte count that differs from requested_nob as a protocol error.
 * NOTE(review): fragmentary extract — remote_rcs/i declarations, error
 * return values and the final return are missing from this view. */
1130 static int check_write_rcs(struct ptlrpc_request
*req
,
1131 int requested_nob
, int niocount
,
1132 u32 page_count
, struct brw_page
**pga
)
1137 remote_rcs
= req_capsule_server_sized_get(&req
->rq_pill
, &RMF_RCS
,
1138 sizeof(*remote_rcs
) *
1140 if (remote_rcs
== NULL
) {
1141 CDEBUG(D_INFO
, "Missing/short RC vector on BRW_WRITE reply\n");
1145 /* return error if any niobuf was in error */
1146 for (i
= 0; i
< niocount
; i
++) {
1147 if ((int)remote_rcs
[i
] < 0)
1148 return remote_rcs
[i
];
1150 if (remote_rcs
[i
] != 0) {
1151 CDEBUG(D_INFO
, "rc[%d] invalid (%d) req %p\n",
1152 i
, remote_rcs
[i
], req
);
1157 if (req
->rq_bulk
->bd_nob_transferred
!= requested_nob
) {
1158 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1159 req
->rq_bulk
->bd_nob_transferred
, requested_nob
);
/* Decide whether two brw_pages may be merged into one niobuf: they must
 * be file-contiguous (p1 ends where p2 starts) and, ignoring the known
 * combinable flags (FROM_GRANT/NOCACHE/SYNC/ASYNC/NOQUOTA), carry the
 * same flags — differing unknown flags trigger a warning.
 * NOTE(review): fragmentary extract — the "return 0" for the mismatched
 * case and closing braces are missing from this view. */
1166 static inline int can_merge_pages(struct brw_page
*p1
, struct brw_page
*p2
)
1168 if (p1
->flag
!= p2
->flag
) {
1169 unsigned mask
= ~(OBD_BRW_FROM_GRANT
| OBD_BRW_NOCACHE
|
1170 OBD_BRW_SYNC
| OBD_BRW_ASYNC
|OBD_BRW_NOQUOTA
);
1172 /* warn if we try to combine flags that we don't know to be
1173 * safe to combine */
1174 if (unlikely((p1
->flag
& mask
) != (p2
->flag
& mask
))) {
1175 CWARN("Saw flags 0x%x and 0x%x in the same brw, please report this at http://bugs.whamcloud.com/\n",
1176 p1
->flag
, p2
->flag
);
1181 return (p1
->off
+ p1
->count
== p2
->off
);
1184 static u32
osc_checksum_bulk(int nob
, u32 pg_count
,
1185 struct brw_page
**pga
, int opc
,
1186 cksum_type_t cksum_type
)
1190 struct cfs_crypto_hash_desc
*hdesc
;
1191 unsigned int bufsize
;
1193 unsigned char cfs_alg
= cksum_obd2cfs(cksum_type
);
1195 LASSERT(pg_count
> 0);
1197 hdesc
= cfs_crypto_hash_init(cfs_alg
, NULL
, 0);
1198 if (IS_ERR(hdesc
)) {
1199 CERROR("Unable to initialize checksum hash %s\n",
1200 cfs_crypto_hash_name(cfs_alg
));
1201 return PTR_ERR(hdesc
);
1204 while (nob
> 0 && pg_count
> 0) {
1205 int count
= pga
[i
]->count
> nob
? nob
: pga
[i
]->count
;
1207 /* corrupt the data before we compute the checksum, to
1208 * simulate an OST->client data error */
1209 if (i
== 0 && opc
== OST_READ
&&
1210 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE
)) {
1211 unsigned char *ptr
= kmap(pga
[i
]->pg
);
1212 int off
= pga
[i
]->off
& ~CFS_PAGE_MASK
;
1213 memcpy(ptr
+ off
, "bad1", min(4, nob
));
1216 cfs_crypto_hash_update_page(hdesc
, pga
[i
]->pg
,
1217 pga
[i
]->off
& ~CFS_PAGE_MASK
,
1220 "page %p map %p index %lu flags %lx count %u priv %0lx: off %d\n",
1221 pga
[i
]->pg
, pga
[i
]->pg
->mapping
, pga
[i
]->pg
->index
,
1222 (long)pga
[i
]->pg
->flags
, page_count(pga
[i
]->pg
),
1223 page_private(pga
[i
]->pg
),
1224 (int)(pga
[i
]->off
& ~CFS_PAGE_MASK
));
1226 nob
-= pga
[i
]->count
;
1232 err
= cfs_crypto_hash_final(hdesc
, (unsigned char *)&cksum
, &bufsize
);
1235 cfs_crypto_hash_final(hdesc
, NULL
, NULL
);
1237 /* For sending we only compute the wrong checksum instead
1238 * of corrupting the data so it is still correct on a redo */
1239 if (opc
== OST_WRITE
&& OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND
))
/*
 * osc_brw_prep_request(): allocate, pack and prepare a bulk read/write
 * (BRW) RPC for the page array @pga, including the niobuf descriptors,
 * capability, grant announcement and (optionally) the bulk checksum.
 * NOTE(review): this region of the file is a damaged extraction -- many
 * interior lines are missing (see the gaps in the embedded original line
 * numbers), so the text below is fragmentary; do not edit code here
 * without consulting the intact upstream file.
 */
1245 static int osc_brw_prep_request(int cmd
, struct client_obd
*cli
,
1247 struct lov_stripe_md
*lsm
, u32 page_count
,
1248 struct brw_page
**pga
,
1249 struct ptlrpc_request
**reqp
,
1250 struct obd_capa
*ocapa
, int reserve
,
1253 struct ptlrpc_request
*req
;
1254 struct ptlrpc_bulk_desc
*desc
;
1255 struct ost_body
*body
;
1256 struct obd_ioobj
*ioobj
;
1257 struct niobuf_remote
*niobuf
;
1258 int niocount
, i
, requested_nob
, opc
, rc
;
1259 struct osc_brw_async_args
*aa
;
1260 struct req_capsule
*pill
;
1261 struct brw_page
*pg_prev
;
/* fault-injection points: simulated allocation failure / fatal error */
1263 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ
))
1264 return -ENOMEM
; /* Recoverable */
1265 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2
))
1266 return -EINVAL
; /* Fatal */
/* writes come from the pre-allocated request pool; reads allocate fresh */
1268 if ((cmd
& OBD_BRW_WRITE
) != 0) {
1270 req
= ptlrpc_request_alloc_pool(cli
->cl_import
,
1271 cli
->cl_import
->imp_rq_pool
,
1272 &RQF_OST_BRW_WRITE
);
1275 req
= ptlrpc_request_alloc(cli
->cl_import
, &RQF_OST_BRW_READ
);
/* count niobufs: adjacent mergeable pages share one remote niobuf */
1280 for (niocount
= i
= 1; i
< page_count
; i
++) {
1281 if (!can_merge_pages(pga
[i
- 1], pga
[i
]))
1285 pill
= &req
->rq_pill
;
1286 req_capsule_set_size(pill
, &RMF_OBD_IOOBJ
, RCL_CLIENT
,
1288 req_capsule_set_size(pill
, &RMF_NIOBUF_REMOTE
, RCL_CLIENT
,
1289 niocount
* sizeof(*niobuf
));
1290 osc_set_capa_size(req
, &RMF_CAPA1
, ocapa
);
1292 rc
= ptlrpc_request_pack(req
, LUSTRE_OST_VERSION
, opc
);
1294 ptlrpc_request_free(req
);
1297 req
->rq_request_portal
= OST_IO_PORTAL
; /* bug 7198 */
1298 ptlrpc_at_set_req_timeout(req
);
1299 /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1301 req
->rq_no_retry_einprogress
= 1;
1303 desc
= ptlrpc_prep_bulk_imp(req
, page_count
,
1304 cli
->cl_import
->imp_connect_data
.ocd_brw_size
>> LNET_MTU_BITS
,
1305 opc
== OST_WRITE
? BULK_GET_SOURCE
: BULK_PUT_SINK
,
1312 /* NB request now owns desc and will free it when it gets freed */
1314 body
= req_capsule_client_get(pill
, &RMF_OST_BODY
);
1315 ioobj
= req_capsule_client_get(pill
, &RMF_OBD_IOOBJ
);
1316 niobuf
= req_capsule_client_get(pill
, &RMF_NIOBUF_REMOTE
);
1317 LASSERT(body
!= NULL
&& ioobj
!= NULL
&& niobuf
!= NULL
);
1319 lustre_set_wire_obdo(&req
->rq_import
->imp_connect_data
, &body
->oa
, oa
);
1321 obdo_to_ioobj(oa
, ioobj
);
1322 ioobj
->ioo_bufcnt
= niocount
;
1323 /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1324 * that might be send for this request. The actual number is decided
1325 * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1326 * "max - 1" for old client compatibility sending "0", and also so the
1327 * the actual maximum is a power-of-two number, not one less. LU-1431 */
1328 ioobj_max_brw_set(ioobj
, desc
->bd_md_max_brw
);
1329 osc_pack_capa(req
, body
, ocapa
);
1330 LASSERT(page_count
> 0);
/* fill the bulk descriptor and niobufs, merging contiguous pages */
1332 for (requested_nob
= i
= 0; i
< page_count
; i
++, niobuf
++) {
1333 struct brw_page
*pg
= pga
[i
];
1334 int poff
= pg
->off
& ~CFS_PAGE_MASK
;
1336 LASSERT(pg
->count
> 0);
1337 /* make sure there is no gap in the middle of page array */
1338 LASSERTF(page_count
== 1 ||
1339 (ergo(i
== 0, poff
+ pg
->count
== PAGE_CACHE_SIZE
) &&
1340 ergo(i
> 0 && i
< page_count
- 1,
1341 poff
== 0 && pg
->count
== PAGE_CACHE_SIZE
) &&
1342 ergo(i
== page_count
- 1, poff
== 0)),
1343 "i: %d/%d pg: %p off: %llu, count: %u\n",
1344 i
, page_count
, pg
, pg
->off
, pg
->count
);
1345 LASSERTF(i
== 0 || pg
->off
> pg_prev
->off
,
1346 "i %d p_c %u pg %p [pri %lu ind %lu] off %llu prev_pg %p [pri %lu ind %lu] off %llu\n",
1348 pg
->pg
, page_private(pg
->pg
), pg
->pg
->index
, pg
->off
,
1349 pg_prev
->pg
, page_private(pg_prev
->pg
),
1350 pg_prev
->pg
->index
, pg_prev
->off
);
1351 LASSERT((pga
[0]->flag
& OBD_BRW_SRVLOCK
) ==
1352 (pg
->flag
& OBD_BRW_SRVLOCK
));
1354 ptlrpc_prep_bulk_page_pin(desc
, pg
->pg
, poff
, pg
->count
);
1355 requested_nob
+= pg
->count
;
1357 if (i
> 0 && can_merge_pages(pg_prev
, pg
)) {
1359 niobuf
->len
+= pg
->count
;
1361 niobuf
->offset
= pg
->off
;
1362 niobuf
->len
= pg
->count
;
1363 niobuf
->flags
= pg
->flag
;
1368 LASSERTF((void *)(niobuf
- niocount
) ==
1369 req_capsule_client_get(&req
->rq_pill
, &RMF_NIOBUF_REMOTE
),
1370 "want %p - real %p\n", req_capsule_client_get(&req
->rq_pill
,
1371 &RMF_NIOBUF_REMOTE
), (void *)(niobuf
- niocount
));
1373 osc_announce_cached(cli
, &body
->oa
, opc
== OST_WRITE
? requested_nob
:0);
1375 if ((body
->oa
.o_valid
& OBD_MD_FLFLAGS
) == 0) {
1376 body
->oa
.o_valid
|= OBD_MD_FLFLAGS
;
1377 body
->oa
.o_flags
= 0;
1379 body
->oa
.o_flags
|= OBD_FL_RECOV_RESEND
;
1382 if (osc_should_shrink_grant(cli
))
1383 osc_shrink_grant_local(cli
, &body
->oa
);
1385 /* size[REQ_REC_OFF] still sizeof (*body) */
/* writes: optionally checksum the outgoing data and reserve per-niobuf RCs;
 * reads: just request a checksum from the server */
1386 if (opc
== OST_WRITE
) {
1387 if (cli
->cl_checksum
&&
1388 !sptlrpc_flavor_has_bulk(&req
->rq_flvr
)) {
1389 /* store cl_cksum_type in a local variable since
1390 * it can be changed via lprocfs */
1391 cksum_type_t cksum_type
= cli
->cl_cksum_type
;
1393 if ((body
->oa
.o_valid
& OBD_MD_FLFLAGS
) == 0) {
1394 oa
->o_flags
&= OBD_FL_LOCAL_MASK
;
1395 body
->oa
.o_flags
= 0;
1397 body
->oa
.o_flags
|= cksum_type_pack(cksum_type
);
1398 body
->oa
.o_valid
|= OBD_MD_FLCKSUM
| OBD_MD_FLFLAGS
;
1399 body
->oa
.o_cksum
= osc_checksum_bulk(requested_nob
,
1403 CDEBUG(D_PAGE
, "checksum at write origin: %x\n",
1405 /* save this in 'oa', too, for later checking */
1406 oa
->o_valid
|= OBD_MD_FLCKSUM
| OBD_MD_FLFLAGS
;
1407 oa
->o_flags
|= cksum_type_pack(cksum_type
);
1409 /* clear out the checksum flag, in case this is a
1410 * resend but cl_checksum is no longer set. b=11238 */
1411 oa
->o_valid
&= ~OBD_MD_FLCKSUM
;
1413 oa
->o_cksum
= body
->oa
.o_cksum
;
1414 /* 1 RC per niobuf */
1415 req_capsule_set_size(pill
, &RMF_RCS
, RCL_SERVER
,
1416 sizeof(__u32
) * niocount
);
1418 if (cli
->cl_checksum
&&
1419 !sptlrpc_flavor_has_bulk(&req
->rq_flvr
)) {
1420 if ((body
->oa
.o_valid
& OBD_MD_FLFLAGS
) == 0)
1421 body
->oa
.o_flags
= 0;
1422 body
->oa
.o_flags
|= cksum_type_pack(cli
->cl_cksum_type
);
1423 body
->oa
.o_valid
|= OBD_MD_FLCKSUM
| OBD_MD_FLFLAGS
;
1426 ptlrpc_request_set_replen(req
);
1428 CLASSERT(sizeof(*aa
) <= sizeof(req
->rq_async_args
));
1429 aa
= ptlrpc_req_async_args(req
);
1431 aa
->aa_requested_nob
= requested_nob
;
1432 aa
->aa_nio_count
= niocount
;
1433 aa
->aa_page_count
= page_count
;
1437 INIT_LIST_HEAD(&aa
->aa_oaps
);
1438 if (ocapa
&& reserve
)
1439 aa
->aa_ocapa
= capa_get(ocapa
);
/* error path: release the request (bulk desc is owned by it) */
1445 ptlrpc_req_finished(req
);
1449 static int check_write_checksum(struct obdo
*oa
, const lnet_process_id_t
*peer
,
1450 __u32 client_cksum
, __u32 server_cksum
, int nob
,
1451 u32 page_count
, struct brw_page
**pga
,
1452 cksum_type_t client_cksum_type
)
1456 cksum_type_t cksum_type
;
1458 if (server_cksum
== client_cksum
) {
1459 CDEBUG(D_PAGE
, "checksum %x confirmed\n", client_cksum
);
1463 cksum_type
= cksum_type_unpack(oa
->o_valid
& OBD_MD_FLFLAGS
?
1465 new_cksum
= osc_checksum_bulk(nob
, page_count
, pga
, OST_WRITE
,
1468 if (cksum_type
!= client_cksum_type
)
1469 msg
= "the server did not use the checksum type specified in the original request - likely a protocol problem"
1471 else if (new_cksum
== server_cksum
)
1472 msg
= "changed on the client after we checksummed it - likely false positive due to mmap IO (bug 11742)"
1474 else if (new_cksum
== client_cksum
)
1475 msg
= "changed in transit before arrival at OST";
1477 msg
= "changed in transit AND doesn't match the original - likely false positive due to mmap IO (bug 11742)"
1480 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1481 " object "DOSTID
" extent [%llu-%llu]\n",
1482 msg
, libcfs_nid2str(peer
->nid
),
1483 oa
->o_valid
& OBD_MD_FLFID
? oa
->o_parent_seq
: (__u64
)0,
1484 oa
->o_valid
& OBD_MD_FLFID
? oa
->o_parent_oid
: 0,
1485 oa
->o_valid
& OBD_MD_FLFID
? oa
->o_parent_ver
: 0,
1486 POSTID(&oa
->o_oi
), pga
[0]->off
,
1487 pga
[page_count
-1]->off
+ pga
[page_count
-1]->count
- 1);
1488 CERROR("original client csum %x (type %x), server csum %x (type %x), client csum now %x\n",
1489 client_cksum
, client_cksum_type
,
1490 server_cksum
, cksum_type
, new_cksum
);
/*
 * osc_brw_fini_request(): post-process a completed BRW RPC -- unpack the
 * reply body, update quota/grant state, verify write checksums, and for
 * reads validate the transferred byte count and (optionally) the read
 * checksum before copying the wire obdo back into aa->aa_oa.
 * NOTE(review): damaged extraction -- interior lines are missing (see the
 * gaps in the embedded original numbering); fragmentary text below.
 */
1494 /* Note rc enters this function as number of bytes transferred */
1495 static int osc_brw_fini_request(struct ptlrpc_request
*req
, int rc
)
1497 struct osc_brw_async_args
*aa
= (void *)&req
->rq_async_args
;
1498 const lnet_process_id_t
*peer
=
1499 &req
->rq_import
->imp_connection
->c_peer
;
1500 struct client_obd
*cli
= aa
->aa_cli
;
1501 struct ost_body
*body
;
1502 __u32 client_cksum
= 0;
1504 if (rc
< 0 && rc
!= -EDQUOT
) {
1505 DEBUG_REQ(D_INFO
, req
, "Failed request with rc = %d\n", rc
);
1509 LASSERTF(req
->rq_repmsg
!= NULL
, "rc = %d\n", rc
);
1510 body
= req_capsule_server_get(&req
->rq_pill
, &RMF_OST_BODY
);
1512 DEBUG_REQ(D_INFO
, req
, "Can't unpack body\n");
1516 /* set/clear over quota flag for a uid/gid */
1517 if (lustre_msg_get_opc(req
->rq_reqmsg
) == OST_WRITE
&&
1518 body
->oa
.o_valid
& (OBD_MD_FLUSRQUOTA
| OBD_MD_FLGRPQUOTA
)) {
1519 unsigned int qid
[MAXQUOTAS
] = { body
->oa
.o_uid
, body
->oa
.o_gid
};
1521 CDEBUG(D_QUOTA
, "setdq for [%u %u] with valid %#llx, flags %x\n",
1522 body
->oa
.o_uid
, body
->oa
.o_gid
, body
->oa
.o_valid
,
1524 osc_quota_setdq(cli
, qid
, body
->oa
.o_valid
, body
->oa
.o_flags
);
1527 osc_update_grant(cli
, body
);
1532 if (aa
->aa_oa
->o_valid
& OBD_MD_FLCKSUM
)
1533 client_cksum
= aa
->aa_oa
->o_cksum
; /* save for later */
/* write completion: compare client/server checksums, check per-niobuf RCs */
1535 if (lustre_msg_get_opc(req
->rq_reqmsg
) == OST_WRITE
) {
1537 CERROR("Unexpected +ve rc %d\n", rc
);
1540 LASSERT(req
->rq_bulk
->bd_nob
== aa
->aa_requested_nob
);
1542 if (sptlrpc_cli_unwrap_bulk_write(req
, req
->rq_bulk
))
1545 if ((aa
->aa_oa
->o_valid
& OBD_MD_FLCKSUM
) && client_cksum
&&
1546 check_write_checksum(&body
->oa
, peer
, client_cksum
,
1547 body
->oa
.o_cksum
, aa
->aa_requested_nob
,
1548 aa
->aa_page_count
, aa
->aa_ppga
,
1549 cksum_type_unpack(aa
->aa_oa
->o_flags
)))
1552 rc
= check_write_rcs(req
, aa
->aa_requested_nob
,
1554 aa
->aa_page_count
, aa
->aa_ppga
);
1558 /* The rest of this function executes only for OST_READs */
1560 /* if unwrap_bulk failed, return -EAGAIN to retry */
1561 rc
= sptlrpc_cli_unwrap_bulk_read(req
, req
->rq_bulk
, rc
);
1567 if (rc
> aa
->aa_requested_nob
) {
1568 CERROR("Unexpected rc %d (%d requested)\n", rc
,
1569 aa
->aa_requested_nob
);
1573 if (rc
!= req
->rq_bulk
->bd_nob_transferred
) {
1574 CERROR ("Unexpected rc %d (%d transferred)\n",
1575 rc
, req
->rq_bulk
->bd_nob_transferred
);
1579 if (rc
< aa
->aa_requested_nob
)
1580 handle_short_read(rc
, aa
->aa_page_count
, aa
->aa_ppga
);
1582 if (body
->oa
.o_valid
& OBD_MD_FLCKSUM
) {
1583 static int cksum_counter
;
1584 __u32 server_cksum
= body
->oa
.o_cksum
;
1587 cksum_type_t cksum_type
;
1589 cksum_type
= cksum_type_unpack(body
->oa
.o_valid
&OBD_MD_FLFLAGS
?
1590 body
->oa
.o_flags
: 0);
1591 client_cksum
= osc_checksum_bulk(rc
, aa
->aa_page_count
,
1592 aa
->aa_ppga
, OST_READ
,
1595 if (peer
->nid
== req
->rq_bulk
->bd_sender
) {
1599 router
= libcfs_nid2str(req
->rq_bulk
->bd_sender
);
1602 if (server_cksum
!= client_cksum
) {
1603 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from %s%s%s inode " DFID
" object " DOSTID
" extent [%llu-%llu]\n",
1604 req
->rq_import
->imp_obd
->obd_name
,
1605 libcfs_nid2str(peer
->nid
),
1607 body
->oa
.o_valid
& OBD_MD_FLFID
?
1608 body
->oa
.o_parent_seq
: (__u64
)0,
1609 body
->oa
.o_valid
& OBD_MD_FLFID
?
1610 body
->oa
.o_parent_oid
: 0,
1611 body
->oa
.o_valid
& OBD_MD_FLFID
?
1612 body
->oa
.o_parent_ver
: 0,
1613 POSTID(&body
->oa
.o_oi
),
1614 aa
->aa_ppga
[0]->off
,
1615 aa
->aa_ppga
[aa
->aa_page_count
-1]->off
+
1616 aa
->aa_ppga
[aa
->aa_page_count
-1]->count
-
1618 CERROR("client %x, server %x, cksum_type %x\n",
1619 client_cksum
, server_cksum
, cksum_type
);
1621 aa
->aa_oa
->o_cksum
= client_cksum
;
1625 CDEBUG(D_PAGE
, "checksum %x confirmed\n", client_cksum
);
1628 } else if (unlikely(client_cksum
)) {
1629 static int cksum_missed
;
/* power-of-two throttling of the "checksum not sent" error message */
1632 if ((cksum_missed
& (-cksum_missed
)) == cksum_missed
)
1633 CERROR("Checksum %u requested from %s but not sent\n",
1634 cksum_missed
, libcfs_nid2str(peer
->nid
));
1640 lustre_get_wire_obdo(&req
->rq_import
->imp_connect_data
,
1641 aa
->aa_oa
, &body
->oa
);
/*
 * osc_brw_redo_request(): rebuild and resend a BRW RPC after a recoverable
 * error.  The new request inherits the pga/oaps/extents from the old one
 * and is handed back to ptlrpcd with a resend delay.
 * NOTE(review): damaged extraction -- interior lines missing; fragmentary.
 */
1646 static int osc_brw_redo_request(struct ptlrpc_request
*request
,
1647 struct osc_brw_async_args
*aa
, int rc
)
1649 struct ptlrpc_request
*new_req
;
1650 struct osc_brw_async_args
*new_aa
;
1651 struct osc_async_page
*oap
;
1653 DEBUG_REQ(rc
== -EINPROGRESS
? D_RPCTRACE
: D_ERROR
, request
,
1654 "redo for recoverable error %d", rc
);
1656 rc
= osc_brw_prep_request(lustre_msg_get_opc(request
->rq_reqmsg
) ==
1657 OST_WRITE
? OBD_BRW_WRITE
:OBD_BRW_READ
,
1658 aa
->aa_cli
, aa
->aa_oa
,
1659 NULL
/* lsm unused by osc currently */,
1660 aa
->aa_page_count
, aa
->aa_ppga
,
1661 &new_req
, aa
->aa_ocapa
, 0, 1);
/* if any oap was interrupted, drop the new request instead of resending */
1665 list_for_each_entry(oap
, &aa
->aa_oaps
, oap_rpc_item
) {
1666 if (oap
->oap_request
!= NULL
) {
1667 LASSERTF(request
== oap
->oap_request
,
1668 "request %p != oap_request %p\n",
1669 request
, oap
->oap_request
);
1670 if (oap
->oap_interrupted
) {
1671 ptlrpc_req_finished(new_req
);
1676 /* New request takes over pga and oaps from old request.
1677 * Note that copying a list_head doesn't work, need to move it... */
1679 new_req
->rq_interpret_reply
= request
->rq_interpret_reply
;
1680 new_req
->rq_async_args
= request
->rq_async_args
;
1681 /* cap resend delay to the current request timeout, this is similar to
1682 * what ptlrpc does (see after_reply()) */
1683 if (aa
->aa_resends
> new_req
->rq_timeout
)
1684 new_req
->rq_sent
= get_seconds() + new_req
->rq_timeout
;
1686 new_req
->rq_sent
= get_seconds() + aa
->aa_resends
;
1687 new_req
->rq_generation_set
= 1;
1688 new_req
->rq_import_generation
= request
->rq_import_generation
;
1690 new_aa
= ptlrpc_req_async_args(new_req
);
1692 INIT_LIST_HEAD(&new_aa
->aa_oaps
);
1693 list_splice_init(&aa
->aa_oaps
, &new_aa
->aa_oaps
);
1694 INIT_LIST_HEAD(&new_aa
->aa_exts
);
1695 list_splice_init(&aa
->aa_exts
, &new_aa
->aa_exts
);
1696 new_aa
->aa_resends
= aa
->aa_resends
;
/* retarget every oap's request reference at the new request */
1698 list_for_each_entry(oap
, &new_aa
->aa_oaps
, oap_rpc_item
) {
1699 if (oap
->oap_request
) {
1700 ptlrpc_req_finished(oap
->oap_request
);
1701 oap
->oap_request
= ptlrpc_request_addref(new_req
);
1705 new_aa
->aa_ocapa
= aa
->aa_ocapa
;
1706 aa
->aa_ocapa
= NULL
;
1708 /* XXX: This code will run into problem if we're going to support
1709 * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1710 * and wait for all of them to be finished. We should inherit request
1711 * set from old request. */
1712 ptlrpcd_add_req(new_req
, PDL_POLICY_SAME
, -1);
1714 DEBUG_REQ(D_INFO
, new_req
, "new request");
1719 * ugh, we want disk allocation on the target to happen in offset order. we'll
1720 * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1721 * fine for our small page arrays and doesn't require allocation. its an
1722 * insertion sort that swaps elements that are strides apart, shrinking the
1723 * stride down until its '1' and the array is sorted.
1725 static void sort_brw_pages(struct brw_page
**array
, int num
)
1728 struct brw_page
*tmp
;
1732 for (stride
= 1; stride
< num
; stride
= (stride
* 3) + 1)
1737 for (i
= stride
; i
< num
; i
++) {
1740 while (j
>= stride
&& array
[j
- stride
]->off
> tmp
->off
) {
1741 array
[j
] = array
[j
- stride
];
1746 } while (stride
> 1);
/*
 * osc_release_ppga(): release the brw_page pointer array built for a BRW
 * RPC.  NOTE(review): the freeing statements are missing from this damaged
 * extraction -- presumably the array (and per-page state) is freed here;
 * confirm against the intact upstream file.
 */
1749 static void osc_release_ppga(struct brw_page
**ppga
, u32 count
)
1751 LASSERT(ppga
!= NULL
);
/*
 * brw_interpret(): ptlrpc reply-interpret callback for BRW RPCs.  Finishes
 * the request, retries recoverable errors, completes the osc extents,
 * propagates server attributes (blocks/mtime/atime/ctime) to the cl_object,
 * updates in-flight counters and re-plugs the IO queue.
 * NOTE(review): damaged extraction -- interior lines missing; fragmentary.
 */
1755 static int brw_interpret(const struct lu_env
*env
,
1756 struct ptlrpc_request
*req
, void *data
, int rc
)
1758 struct osc_brw_async_args
*aa
= data
;
1759 struct osc_extent
*ext
;
1760 struct osc_extent
*tmp
;
1761 struct cl_object
*obj
= NULL
;
1762 struct client_obd
*cli
= aa
->aa_cli
;
1764 rc
= osc_brw_fini_request(req
, rc
);
1765 CDEBUG(D_INODE
, "request %p aa %p rc %d\n", req
, aa
, rc
);
1766 /* When server return -EINPROGRESS, client should always retry
1767 * regardless of the number of times the bulk was resent already. */
1768 if (osc_recoverable_error(rc
)) {
1769 if (req
->rq_import_generation
!=
1770 req
->rq_import
->imp_generation
) {
1771 CDEBUG(D_HA
, "%s: resend cross eviction for object: " DOSTID
", rc = %d.\n",
1772 req
->rq_import
->imp_obd
->obd_name
,
1773 POSTID(&aa
->aa_oa
->o_oi
), rc
);
1774 } else if (rc
== -EINPROGRESS
||
1775 client_should_resend(aa
->aa_resends
, aa
->aa_cli
)) {
1776 rc
= osc_brw_redo_request(req
, aa
, rc
);
1778 CERROR("%s: too many resent retries for object: %llu:%llu, rc = %d.\n",
1779 req
->rq_import
->imp_obd
->obd_name
,
1780 POSTID(&aa
->aa_oa
->o_oi
), rc
);
1785 else if (rc
== -EAGAIN
|| rc
== -EINPROGRESS
)
1790 capa_put(aa
->aa_ocapa
);
1791 aa
->aa_ocapa
= NULL
;
/* finish every extent; remember the first object for attr update below */
1794 list_for_each_entry_safe(ext
, tmp
, &aa
->aa_exts
, oe_link
) {
1795 if (obj
== NULL
&& rc
== 0) {
1796 obj
= osc2cl(ext
->oe_obj
);
1800 list_del_init(&ext
->oe_link
);
1801 osc_extent_finish(env
, ext
, 1, rc
);
1803 LASSERT(list_empty(&aa
->aa_exts
));
1804 LASSERT(list_empty(&aa
->aa_oaps
));
1807 struct obdo
*oa
= aa
->aa_oa
;
1808 struct cl_attr
*attr
= &osc_env_info(env
)->oti_attr
;
1809 unsigned long valid
= 0;
1812 if (oa
->o_valid
& OBD_MD_FLBLOCKS
) {
1813 attr
->cat_blocks
= oa
->o_blocks
;
1814 valid
|= CAT_BLOCKS
;
1816 if (oa
->o_valid
& OBD_MD_FLMTIME
) {
1817 attr
->cat_mtime
= oa
->o_mtime
;
1820 if (oa
->o_valid
& OBD_MD_FLATIME
) {
1821 attr
->cat_atime
= oa
->o_atime
;
1824 if (oa
->o_valid
& OBD_MD_FLCTIME
) {
1825 attr
->cat_ctime
= oa
->o_ctime
;
1829 cl_object_attr_lock(obj
);
1830 cl_object_attr_set(env
, obj
, attr
, valid
);
1831 cl_object_attr_unlock(obj
);
1833 cl_object_put(env
, obj
);
1835 OBDO_FREE(aa
->aa_oa
);
1837 cl_req_completion(env
, aa
->aa_clerq
, rc
< 0 ? rc
:
1838 req
->rq_bulk
->bd_nob_transferred
);
1839 osc_release_ppga(aa
->aa_ppga
, aa
->aa_page_count
);
1840 ptlrpc_lprocfs_brw(req
, req
->rq_bulk
->bd_nob_transferred
);
1842 client_obd_list_lock(&cli
->cl_loi_list_lock
);
1843 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1844 * is called so we know whether to go to sync BRWs or wait for more
1845 * RPCs to complete */
1846 if (lustre_msg_get_opc(req
->rq_reqmsg
) == OST_WRITE
)
1847 cli
->cl_w_in_flight
--;
1849 cli
->cl_r_in_flight
--;
1850 osc_wake_cache_waiters(cli
);
1851 client_obd_list_unlock(&cli
->cl_loi_list_lock
);
1853 osc_io_unplug(env
, cli
, NULL
, PDL_POLICY_SAME
);
/*
 * osc_build_rpc(): assemble one BRW RPC from the extents on @ext_list --
 * gather the async pages, build the pga array, prep the request, splice
 * oaps/extents into the async args, bump in-flight stats, and queue the
 * request to ptlrpcd.  On failure, finish all extents with the error.
 * NOTE(review): damaged extraction -- interior lines missing; fragmentary.
 */
1858 * Build an RPC by the list of extent @ext_list. The caller must ensure
1859 * that the total pages in this list are NOT over max pages per RPC.
1860 * Extents in the list must be in OES_RPC state.
1862 int osc_build_rpc(const struct lu_env
*env
, struct client_obd
*cli
,
1863 struct list_head
*ext_list
, int cmd
, pdl_policy_t pol
)
1865 struct ptlrpc_request
*req
= NULL
;
1866 struct osc_extent
*ext
;
1867 struct brw_page
**pga
= NULL
;
1868 struct osc_brw_async_args
*aa
= NULL
;
1869 struct obdo
*oa
= NULL
;
1870 struct osc_async_page
*oap
;
1871 struct osc_async_page
*tmp
;
1872 struct cl_req
*clerq
= NULL
;
1873 enum cl_req_type crt
= (cmd
& OBD_BRW_WRITE
) ? CRT_WRITE
: CRT_READ
;
1874 struct ldlm_lock
*lock
= NULL
;
1875 struct cl_req_attr
*crattr
= NULL
;
1876 u64 starting_offset
= OBD_OBJECT_EOF
;
1877 u64 ending_offset
= 0;
1883 struct ost_body
*body
;
1884 LIST_HEAD(rpc_list
);
1886 LASSERT(!list_empty(ext_list
));
1888 /* add pages into rpc_list to build BRW rpc */
1889 list_for_each_entry(ext
, ext_list
, oe_link
) {
1890 LASSERT(ext
->oe_state
== OES_RPC
);
1891 mem_tight
|= ext
->oe_memalloc
;
1892 list_for_each_entry(oap
, &ext
->oe_pages
, oap_pending_item
) {
1894 list_add_tail(&oap
->oap_rpc_item
, &rpc_list
);
1895 if (starting_offset
> oap
->oap_obj_off
)
1896 starting_offset
= oap
->oap_obj_off
;
1898 LASSERT(oap
->oap_page_off
== 0);
1899 if (ending_offset
< oap
->oap_obj_off
+ oap
->oap_count
)
1900 ending_offset
= oap
->oap_obj_off
+
1903 LASSERT(oap
->oap_page_off
+ oap
->oap_count
==
1909 mpflag
= cfs_memory_pressure_get_and_set();
1911 crattr
= kzalloc(sizeof(*crattr
), GFP_NOFS
);
1917 pga
= kcalloc(page_count
, sizeof(*pga
), GFP_NOFS
);
/* fill pga[] from the gathered oaps; first oap supplies the cl_req */
1930 list_for_each_entry(oap
, &rpc_list
, oap_rpc_item
) {
1931 struct cl_page
*page
= oap2cl_page(oap
);
1932 if (clerq
== NULL
) {
1933 clerq
= cl_req_alloc(env
, page
, crt
,
1934 1 /* only 1-object rpcs for now */);
1935 if (IS_ERR(clerq
)) {
1936 rc
= PTR_ERR(clerq
);
1939 lock
= oap
->oap_ldlm_lock
;
1942 oap
->oap_brw_flags
|= OBD_BRW_MEMALLOC
;
1943 pga
[i
] = &oap
->oap_brw_page
;
1944 pga
[i
]->off
= oap
->oap_obj_off
+ oap
->oap_page_off
;
1945 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1946 pga
[i
]->pg
, page_index(oap
->oap_page
), oap
,
1949 cl_req_page_add(env
, clerq
, page
);
1952 /* always get the data for the obdo for the rpc */
1953 LASSERT(clerq
!= NULL
);
1954 crattr
->cra_oa
= oa
;
1955 cl_req_attr_set(env
, clerq
, crattr
, ~0ULL);
1957 oa
->o_handle
= lock
->l_remote_handle
;
1958 oa
->o_valid
|= OBD_MD_FLHANDLE
;
1961 rc
= cl_req_prep(env
, clerq
);
1963 CERROR("cl_req_prep failed: %d\n", rc
);
1967 sort_brw_pages(pga
, page_count
);
1968 rc
= osc_brw_prep_request(cmd
, cli
, oa
, NULL
, page_count
,
1969 pga
, &req
, crattr
->cra_capa
, 1, 0);
1971 CERROR("prep_req failed: %d\n", rc
);
1975 req
->rq_interpret_reply
= brw_interpret
;
1978 req
->rq_memalloc
= 1;
1980 /* Need to update the timestamps after the request is built in case
1981 * we race with setattr (locally or in queue at OST). If OST gets
1982 * later setattr before earlier BRW (as determined by the request xid),
1983 * the OST will not use BRW timestamps. Sadly, there is no obvious
1984 * way to do this in a single call. bug 10150 */
1985 body
= req_capsule_client_get(&req
->rq_pill
, &RMF_OST_BODY
);
1986 crattr
->cra_oa
= &body
->oa
;
1987 cl_req_attr_set(env
, clerq
, crattr
,
1988 OBD_MD_FLMTIME
|OBD_MD_FLCTIME
|OBD_MD_FLATIME
);
1990 lustre_msg_set_jobid(req
->rq_reqmsg
, crattr
->cra_jobid
);
1992 CLASSERT(sizeof(*aa
) <= sizeof(req
->rq_async_args
));
1993 aa
= ptlrpc_req_async_args(req
);
1994 INIT_LIST_HEAD(&aa
->aa_oaps
);
1995 list_splice_init(&rpc_list
, &aa
->aa_oaps
);
1996 INIT_LIST_HEAD(&aa
->aa_exts
);
1997 list_splice_init(ext_list
, &aa
->aa_exts
);
1998 aa
->aa_clerq
= clerq
;
2000 /* queued sync pages can be torn down while the pages
2001 * were between the pending list and the rpc */
2003 list_for_each_entry(oap
, &aa
->aa_oaps
, oap_rpc_item
) {
2004 /* only one oap gets a request reference */
2007 if (oap
->oap_interrupted
&& !req
->rq_intr
) {
2008 CDEBUG(D_INODE
, "oap %p in req %p interrupted\n",
2010 ptlrpc_mark_interrupted(req
);
2014 tmp
->oap_request
= ptlrpc_request_addref(req
);
2016 client_obd_list_lock(&cli
->cl_loi_list_lock
);
2017 starting_offset
>>= PAGE_CACHE_SHIFT
;
2018 if (cmd
== OBD_BRW_READ
) {
2019 cli
->cl_r_in_flight
++;
2020 lprocfs_oh_tally_log2(&cli
->cl_read_page_hist
, page_count
);
2021 lprocfs_oh_tally(&cli
->cl_read_rpc_hist
, cli
->cl_r_in_flight
);
2022 lprocfs_oh_tally_log2(&cli
->cl_read_offset_hist
,
2023 starting_offset
+ 1);
2025 cli
->cl_w_in_flight
++;
2026 lprocfs_oh_tally_log2(&cli
->cl_write_page_hist
, page_count
);
2027 lprocfs_oh_tally(&cli
->cl_write_rpc_hist
, cli
->cl_w_in_flight
);
2028 lprocfs_oh_tally_log2(&cli
->cl_write_offset_hist
,
2029 starting_offset
+ 1);
2031 client_obd_list_unlock(&cli
->cl_loi_list_lock
);
2033 DEBUG_REQ(D_INODE
, req
, "%d pages, aa %p. now %dr/%dw in flight",
2034 page_count
, aa
, cli
->cl_r_in_flight
,
2035 cli
->cl_w_in_flight
);
2037 /* XXX: Maybe the caller can check the RPC bulk descriptor to
2038 * see which CPU/NUMA node the majority of pages were allocated
2039 * on, and try to assign the async RPC to the CPU core
2040 * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
2042 * But on the other hand, we expect that multiple ptlrpcd
2043 * threads and the initial write sponsor can run in parallel,
2044 * especially when data checksum is enabled, which is CPU-bound
2045 * operation and single ptlrpcd thread cannot process in time.
2046 * So more ptlrpcd threads sharing BRW load
2047 * (with PDL_POLICY_ROUND) seems better.
2049 ptlrpcd_add_req(req
, pol
, -1);
2054 cfs_memory_pressure_restore(mpflag
);
2056 if (crattr
!= NULL
) {
2057 capa_put(crattr
->cra_capa
);
2062 LASSERT(req
== NULL
);
2067 /* this should happen rarely and is pretty bad, it makes the
2068 * pending list not follow the dirty order */
2069 while (!list_empty(ext_list
)) {
2070 ext
= list_entry(ext_list
->next
, struct osc_extent
,
2072 list_del_init(&ext
->oe_link
);
2073 osc_extent_finish(env
, ext
, 0, rc
);
2075 if (clerq
&& !IS_ERR(clerq
))
2076 cl_req_completion(env
, clerq
, rc
);
2081 static int osc_set_lock_data_with_check(struct ldlm_lock
*lock
,
2082 struct ldlm_enqueue_info
*einfo
)
2084 void *data
= einfo
->ei_cbdata
;
2087 LASSERT(lock
!= NULL
);
2088 LASSERT(lock
->l_blocking_ast
== einfo
->ei_cb_bl
);
2089 LASSERT(lock
->l_resource
->lr_type
== einfo
->ei_type
);
2090 LASSERT(lock
->l_completion_ast
== einfo
->ei_cb_cp
);
2091 LASSERT(lock
->l_glimpse_ast
== einfo
->ei_cb_gl
);
2093 lock_res_and_lock(lock
);
2094 spin_lock(&osc_ast_guard
);
2096 if (lock
->l_ast_data
== NULL
)
2097 lock
->l_ast_data
= data
;
2098 if (lock
->l_ast_data
== data
)
2101 spin_unlock(&osc_ast_guard
);
2102 unlock_res_and_lock(lock
);
2107 static int osc_set_data_with_check(struct lustre_handle
*lockh
,
2108 struct ldlm_enqueue_info
*einfo
)
2110 struct ldlm_lock
*lock
= ldlm_handle2lock(lockh
);
2114 set
= osc_set_lock_data_with_check(lock
, einfo
);
2115 LDLM_LOCK_PUT(lock
);
2117 CERROR("lockh %p, data %p - client evicted?\n",
2118 lockh
, einfo
->ei_cbdata
);
2122 /* find any ldlm lock of the inode in osc
2126 static int osc_find_cbdata(struct obd_export
*exp
, struct lov_stripe_md
*lsm
,
2127 ldlm_iterator_t replace
, void *data
)
2129 struct ldlm_res_id res_id
;
2130 struct obd_device
*obd
= class_exp2obd(exp
);
2133 ostid_build_res_name(&lsm
->lsm_oi
, &res_id
);
2134 rc
= ldlm_resource_iterate(obd
->obd_namespace
, &res_id
, replace
, data
);
2135 if (rc
== LDLM_ITER_STOP
)
2137 if (rc
== LDLM_ITER_CONTINUE
)
2142 static int osc_enqueue_fini(struct ptlrpc_request
*req
, struct ost_lvb
*lvb
,
2143 obd_enqueue_update_f upcall
, void *cookie
,
2144 __u64
*flags
, int agl
, int rc
)
2146 int intent
= *flags
& LDLM_FL_HAS_INTENT
;
2149 /* The request was created before ldlm_cli_enqueue call. */
2150 if (rc
== ELDLM_LOCK_ABORTED
) {
2151 struct ldlm_reply
*rep
;
2152 rep
= req_capsule_server_get(&req
->rq_pill
,
2155 LASSERT(rep
!= NULL
);
2156 rep
->lock_policy_res1
=
2157 ptlrpc_status_ntoh(rep
->lock_policy_res1
);
2158 if (rep
->lock_policy_res1
)
2159 rc
= rep
->lock_policy_res1
;
2163 if ((intent
!= 0 && rc
== ELDLM_LOCK_ABORTED
&& agl
== 0) ||
2165 *flags
|= LDLM_FL_LVB_READY
;
2166 CDEBUG(D_INODE
, "got kms %llu blocks %llu mtime %llu\n",
2167 lvb
->lvb_size
, lvb
->lvb_blocks
, lvb
->lvb_mtime
);
2170 /* Call the update callback. */
2171 rc
= (*upcall
)(cookie
, rc
);
/*
 * osc_enqueue_interpret(): reply-interpret callback for an async lock
 * enqueue.  Copies the handle/mode out of @aa before the upcall can free
 * them, finishes the enqueue, runs osc_enqueue_fini(), then releases the
 * extra lock references taken here.
 * NOTE(review): damaged extraction -- interior lines missing (mode/lvb
 * declarations, the agl lvb selection); fragmentary text below.
 */
2175 static int osc_enqueue_interpret(const struct lu_env
*env
,
2176 struct ptlrpc_request
*req
,
2177 struct osc_enqueue_args
*aa
, int rc
)
2179 struct ldlm_lock
*lock
;
2180 struct lustre_handle handle
;
2182 struct ost_lvb
*lvb
;
2184 __u64
*flags
= aa
->oa_flags
;
2186 /* Make a local copy of a lock handle and a mode, because aa->oa_*
2187 * might be freed anytime after lock upcall has been called. */
2188 lustre_handle_copy(&handle
, aa
->oa_lockh
);
2189 mode
= aa
->oa_ei
->ei_mode
;
2191 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2193 lock
= ldlm_handle2lock(&handle
);
2195 /* Take an additional reference so that a blocking AST that
2196 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2197 * to arrive after an upcall has been executed by
2198 * osc_enqueue_fini(). */
2199 ldlm_lock_addref(&handle
, mode
);
2201 /* Let CP AST to grant the lock first. */
2202 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE
, 1);
2204 if (aa
->oa_agl
&& rc
== ELDLM_LOCK_ABORTED
) {
2209 lvb_len
= sizeof(*aa
->oa_lvb
);
2212 /* Complete obtaining the lock procedure. */
2213 rc
= ldlm_cli_enqueue_fini(aa
->oa_exp
, req
, aa
->oa_ei
->ei_type
, 1,
2214 mode
, flags
, lvb
, lvb_len
, &handle
, rc
);
2215 /* Complete osc stuff. */
2216 rc
= osc_enqueue_fini(req
, aa
->oa_lvb
, aa
->oa_upcall
, aa
->oa_cookie
,
2217 flags
, aa
->oa_agl
, rc
);
2219 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE
, 10);
2221 /* Release the lock for async request. */
2222 if (lustre_handle_is_used(&handle
) && rc
== ELDLM_OK
)
2224 * Releases a reference taken by ldlm_cli_enqueue(), if it is
2225 * not already released by
2226 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2228 ldlm_lock_decref(&handle
, mode
);
2230 LASSERTF(lock
!= NULL
, "lockh %p, req %p, aa %p - client evicted?\n",
2231 aa
->oa_lockh
, req
, aa
);
2232 ldlm_lock_decref(&handle
, mode
);
2233 LDLM_LOCK_PUT(lock
);
/* Sentinel request-set pointer: callers pass this (never dereferenced)
 * to mean "queue directly to ptlrpcd instead of a real set". */
2237 struct ptlrpc_request_set
*PTLRPCD_SET
= (void *)1;
/*
 * osc_enqueue_base(): obtain an extent lock -- first try to match an
 * already-cached lock (reusing it when its LVB is ready), otherwise build
 * and send an LDLM enqueue RPC, either async via an interpret callback or
 * synchronously followed by osc_enqueue_fini().
 * NOTE(review): damaged extraction -- interior lines missing; fragmentary.
 */
2239 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2240 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2241 * other synchronous requests, however keeping some locks and trying to obtain
2242 * others may take a considerable amount of time in a case of ost failure; and
2243 * when other sync requests do not get released lock from a client, the client
2244 * is excluded from the cluster -- such scenarious make the life difficult, so
2245 * release locks just after they are obtained. */
2246 int osc_enqueue_base(struct obd_export
*exp
, struct ldlm_res_id
*res_id
,
2247 __u64
*flags
, ldlm_policy_data_t
*policy
,
2248 struct ost_lvb
*lvb
, int kms_valid
,
2249 obd_enqueue_update_f upcall
, void *cookie
,
2250 struct ldlm_enqueue_info
*einfo
,
2251 struct lustre_handle
*lockh
,
2252 struct ptlrpc_request_set
*rqset
, int async
, int agl
)
2254 struct obd_device
*obd
= exp
->exp_obd
;
2255 struct ptlrpc_request
*req
= NULL
;
2256 int intent
= *flags
& LDLM_FL_HAS_INTENT
;
2257 __u64 match_lvb
= (agl
!= 0 ? 0 : LDLM_FL_LVB_READY
);
2261 /* Filesystem lock extents are extended to page boundaries so that
2262 * dealing with the page cache is a little smoother. */
2263 policy
->l_extent
.start
-= policy
->l_extent
.start
& ~CFS_PAGE_MASK
;
2264 policy
->l_extent
.end
|= ~CFS_PAGE_MASK
;
2267 * kms is not valid when either object is completely fresh (so that no
2268 * locks are cached), or object was evicted. In the latter case cached
2269 * lock cannot be used, because it would prime inode state with
2270 * potentially stale LVB.
2275 /* Next, search for already existing extent locks that will cover us */
2276 /* If we're trying to read, we also search for an existing PW lock. The
2277 * VFS and page cache already protect us locally, so lots of readers/
2278 * writers can share a single PW lock.
2280 * There are problems with conversion deadlocks, so instead of
2281 * converting a read lock to a write lock, we'll just enqueue a new
2284 * At some point we should cancel the read lock instead of making them
2285 * send us a blocking callback, but there are problems with canceling
2286 * locks out from other users right now, too. */
2287 mode
= einfo
->ei_mode
;
2288 if (einfo
->ei_mode
== LCK_PR
)
2290 mode
= ldlm_lock_match(obd
->obd_namespace
, *flags
| match_lvb
, res_id
,
2291 einfo
->ei_type
, policy
, mode
, lockh
, 0);
2293 struct ldlm_lock
*matched
= ldlm_handle2lock(lockh
);
2295 if ((agl
!= 0) && !(matched
->l_flags
& LDLM_FL_LVB_READY
)) {
2296 /* For AGL, if enqueue RPC is sent but the lock is not
2297 * granted, then skip to process this strpe.
2298 * Return -ECANCELED to tell the caller. */
2299 ldlm_lock_decref(lockh
, mode
);
2300 LDLM_LOCK_PUT(matched
);
2304 if (osc_set_lock_data_with_check(matched
, einfo
)) {
2305 *flags
|= LDLM_FL_LVB_READY
;
2306 /* addref the lock only if not async requests and PW
2307 * lock is matched whereas we asked for PR. */
2308 if (!rqset
&& einfo
->ei_mode
!= mode
)
2309 ldlm_lock_addref(lockh
, LCK_PR
);
2311 /* I would like to be able to ASSERT here that
2312 * rss <= kms, but I can't, for reasons which
2313 * are explained in lov_enqueue() */
2316 /* We already have a lock, and it's referenced.
2318 * At this point, the cl_lock::cll_state is CLS_QUEUING,
2319 * AGL upcall may change it to CLS_HELD directly. */
2320 (*upcall
)(cookie
, ELDLM_OK
);
2322 if (einfo
->ei_mode
!= mode
)
2323 ldlm_lock_decref(lockh
, LCK_PW
);
2325 /* For async requests, decref the lock. */
2326 ldlm_lock_decref(lockh
, einfo
->ei_mode
);
2327 LDLM_LOCK_PUT(matched
);
2331 ldlm_lock_decref(lockh
, mode
);
2332 LDLM_LOCK_PUT(matched
);
/* no usable cached lock: build and send an enqueue RPC */
2338 req
= ptlrpc_request_alloc(class_exp2cliimp(exp
),
2339 &RQF_LDLM_ENQUEUE_LVB
);
2343 rc
= ldlm_prep_enqueue_req(exp
, req
, &cancels
, 0);
2345 ptlrpc_request_free(req
);
2349 req_capsule_set_size(&req
->rq_pill
, &RMF_DLM_LVB
, RCL_SERVER
,
2351 ptlrpc_request_set_replen(req
);
2354 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2355 *flags
&= ~LDLM_FL_BLOCK_GRANTED
;
2357 rc
= ldlm_cli_enqueue(exp
, &req
, einfo
, res_id
, policy
, flags
, lvb
,
2358 sizeof(*lvb
), LVB_T_OST
, lockh
, async
);
2361 struct osc_enqueue_args
*aa
;
2362 CLASSERT (sizeof(*aa
) <= sizeof(req
->rq_async_args
));
2363 aa
= ptlrpc_req_async_args(req
);
2366 aa
->oa_flags
= flags
;
2367 aa
->oa_upcall
= upcall
;
2368 aa
->oa_cookie
= cookie
;
2370 aa
->oa_lockh
= lockh
;
2373 req
->rq_interpret_reply
=
2374 (ptlrpc_interpterer_t
)osc_enqueue_interpret
;
2375 if (rqset
== PTLRPCD_SET
)
2376 ptlrpcd_add_req(req
, PDL_POLICY_ROUND
, -1);
2378 ptlrpc_set_add_req(rqset
, req
);
2379 } else if (intent
) {
2380 ptlrpc_req_finished(req
);
2385 rc
= osc_enqueue_fini(req
, lvb
, upcall
, cookie
, flags
, agl
, rc
);
2387 ptlrpc_req_finished(req
);
2392 int osc_match_base(struct obd_export
*exp
, struct ldlm_res_id
*res_id
,
2393 __u32 type
, ldlm_policy_data_t
*policy
, __u32 mode
,
2394 __u64
*flags
, void *data
, struct lustre_handle
*lockh
,
2397 struct obd_device
*obd
= exp
->exp_obd
;
2398 __u64 lflags
= *flags
;
2401 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH
))
2404 /* Filesystem lock extents are extended to page boundaries so that
2405 * dealing with the page cache is a little smoother */
2406 policy
->l_extent
.start
-= policy
->l_extent
.start
& ~CFS_PAGE_MASK
;
2407 policy
->l_extent
.end
|= ~CFS_PAGE_MASK
;
2409 /* Next, search for already existing extent locks that will cover us */
2410 /* If we're trying to read, we also search for an existing PW lock. The
2411 * VFS and page cache already protect us locally, so lots of readers/
2412 * writers can share a single PW lock. */
2416 rc
= ldlm_lock_match(obd
->obd_namespace
, lflags
,
2417 res_id
, type
, policy
, rc
, lockh
, unref
);
2420 if (!osc_set_data_with_check(lockh
, data
)) {
2421 if (!(lflags
& LDLM_FL_TEST_LOCK
))
2422 ldlm_lock_decref(lockh
, rc
);
2426 if (!(lflags
& LDLM_FL_TEST_LOCK
) && mode
!= rc
) {
2427 ldlm_lock_addref(lockh
, LCK_PR
);
2428 ldlm_lock_decref(lockh
, LCK_PW
);
/* Release a reference on a cached DLM extent lock.
 * NOTE(review): this extraction is fragmentary -- the body below is
 * missing its braces and intervening lines (orig. 2436, 2439, 2441+),
 * so the exact if/else structure must be confirmed against upstream. */
2435 int osc_cancel_base(struct lustre_handle
*lockh
, __u32 mode
)
/* Group locks are presumably cancelled immediately on release rather
 * than kept cached -- decref-and-cancel path. TODO confirm branch shape. */
2437 if (unlikely(mode
== LCK_GROUP
))
2438 ldlm_lock_decref_and_cancel(lockh
, mode
);
/* Otherwise just drop the reference; the lock may remain cached. */
2440 ldlm_lock_decref(lockh
, mode
);
2445 static int osc_statfs_interpret(const struct lu_env
*env
,
2446 struct ptlrpc_request
*req
,
2447 struct osc_async_args
*aa
, int rc
)
2449 struct obd_statfs
*msfs
;
2452 /* The request has in fact never been sent
2453 * due to issues at a higher level (LOV).
2454 * Exit immediately since the caller is
2455 * aware of the problem and takes care
2456 * of the clean up */
2459 if ((rc
== -ENOTCONN
|| rc
== -EAGAIN
) &&
2460 (aa
->aa_oi
->oi_flags
& OBD_STATFS_NODELAY
)) {
2468 msfs
= req_capsule_server_get(&req
->rq_pill
, &RMF_OBD_STATFS
);
2474 *aa
->aa_oi
->oi_osfs
= *msfs
;
2476 rc
= aa
->aa_oi
->oi_cb_up(aa
->aa_oi
, rc
);
2480 static int osc_statfs_async(struct obd_export
*exp
,
2481 struct obd_info
*oinfo
, __u64 max_age
,
2482 struct ptlrpc_request_set
*rqset
)
2484 struct obd_device
*obd
= class_exp2obd(exp
);
2485 struct ptlrpc_request
*req
;
2486 struct osc_async_args
*aa
;
2489 /* We could possibly pass max_age in the request (as an absolute
2490 * timestamp or a "seconds.usec ago") so the target can avoid doing
2491 * extra calls into the filesystem if that isn't necessary (e.g.
2492 * during mount that would help a bit). Having relative timestamps
2493 * is not so great if request processing is slow, while absolute
2494 * timestamps are not ideal because they need time synchronization. */
2495 req
= ptlrpc_request_alloc(obd
->u
.cli
.cl_import
, &RQF_OST_STATFS
);
2499 rc
= ptlrpc_request_pack(req
, LUSTRE_OST_VERSION
, OST_STATFS
);
2501 ptlrpc_request_free(req
);
2504 ptlrpc_request_set_replen(req
);
2505 req
->rq_request_portal
= OST_CREATE_PORTAL
;
2506 ptlrpc_at_set_req_timeout(req
);
2508 if (oinfo
->oi_flags
& OBD_STATFS_NODELAY
) {
2510 /* procfs requests should not wait for stat results, to avoid deadlock */
2510 req
->rq_no_resend
= 1;
2511 req
->rq_no_delay
= 1;
2514 req
->rq_interpret_reply
= (ptlrpc_interpterer_t
)osc_statfs_interpret
;
2515 CLASSERT (sizeof(*aa
) <= sizeof(req
->rq_async_args
));
2516 aa
= ptlrpc_req_async_args(req
);
2519 ptlrpc_set_add_req(rqset
, req
);
2523 static int osc_statfs(const struct lu_env
*env
, struct obd_export
*exp
,
2524 struct obd_statfs
*osfs
, __u64 max_age
, __u32 flags
)
2526 struct obd_device
*obd
= class_exp2obd(exp
);
2527 struct obd_statfs
*msfs
;
2528 struct ptlrpc_request
*req
;
2529 struct obd_import
*imp
= NULL
;
2532 /* Since the request might also come from lprocfs, we need to
2533 * sync this with client_disconnect_export() (bug 15684) */
2534 down_read(&obd
->u
.cli
.cl_sem
);
2535 if (obd
->u
.cli
.cl_import
)
2536 imp
= class_import_get(obd
->u
.cli
.cl_import
);
2537 up_read(&obd
->u
.cli
.cl_sem
);
2541 /* We could possibly pass max_age in the request (as an absolute
2542 * timestamp or a "seconds.usec ago") so the target can avoid doing
2543 * extra calls into the filesystem if that isn't necessary (e.g.
2544 * during mount that would help a bit). Having relative timestamps
2545 * is not so great if request processing is slow, while absolute
2546 * timestamps are not ideal because they need time synchronization. */
2547 req
= ptlrpc_request_alloc(imp
, &RQF_OST_STATFS
);
2549 class_import_put(imp
);
2554 rc
= ptlrpc_request_pack(req
, LUSTRE_OST_VERSION
, OST_STATFS
);
2556 ptlrpc_request_free(req
);
2559 ptlrpc_request_set_replen(req
);
2560 req
->rq_request_portal
= OST_CREATE_PORTAL
;
2561 ptlrpc_at_set_req_timeout(req
);
2563 if (flags
& OBD_STATFS_NODELAY
) {
2564 /* procfs requests should not wait for stat results, to avoid deadlock */
2565 req
->rq_no_resend
= 1;
2566 req
->rq_no_delay
= 1;
2569 rc
= ptlrpc_queue_wait(req
);
2573 msfs
= req_capsule_server_get(&req
->rq_pill
, &RMF_OBD_STATFS
);
2582 ptlrpc_req_finished(req
);
2586 /* Retrieve object striping information.
2588 * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
2589 * the maximum number of OST indices which will fit in the user buffer.
2590 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
2592 static int osc_getstripe(struct lov_stripe_md
*lsm
, struct lov_user_md
*lump
)
2594 /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
2595 struct lov_user_md_v3 lum
, *lumk
;
2596 struct lov_user_ost_data_v1
*lmm_objects
;
2597 int rc
= 0, lum_size
;
2602 /* we only need the header part from user space to get lmm_magic and
2603 * lmm_stripe_count, (the header part is common to v1 and v3) */
2604 lum_size
= sizeof(struct lov_user_md_v1
);
2605 if (copy_from_user(&lum
, lump
, lum_size
))
2608 if ((lum
.lmm_magic
!= LOV_USER_MAGIC_V1
) &&
2609 (lum
.lmm_magic
!= LOV_USER_MAGIC_V3
))
2612 /* lov_user_md_vX and lov_mds_md_vX must have the same size */
2613 LASSERT(sizeof(struct lov_user_md_v1
) == sizeof(struct lov_mds_md_v1
));
2614 LASSERT(sizeof(struct lov_user_md_v3
) == sizeof(struct lov_mds_md_v3
));
2615 LASSERT(sizeof(lum
.lmm_objects
[0]) == sizeof(lumk
->lmm_objects
[0]));
2617 /* we can use lov_mds_md_size() to compute lum_size
2618 * because lov_user_md_vX and lov_mds_md_vX have the same size */
2619 if (lum
.lmm_stripe_count
> 0) {
2620 lum_size
= lov_mds_md_size(lum
.lmm_stripe_count
, lum
.lmm_magic
);
2621 lumk
= kzalloc(lum_size
, GFP_NOFS
);
2625 if (lum
.lmm_magic
== LOV_USER_MAGIC_V1
)
2627 &(((struct lov_user_md_v1
*)lumk
)->lmm_objects
[0]);
2629 lmm_objects
= &(lumk
->lmm_objects
[0]);
2630 lmm_objects
->l_ost_oi
= lsm
->lsm_oi
;
2632 lum_size
= lov_mds_md_size(0, lum
.lmm_magic
);
2636 lumk
->lmm_oi
= lsm
->lsm_oi
;
2637 lumk
->lmm_stripe_count
= 1;
2639 if (copy_to_user(lump
, lumk
, lum_size
))
2649 static int osc_iocontrol(unsigned int cmd
, struct obd_export
*exp
, int len
,
2650 void *karg
, void *uarg
)
2652 struct obd_device
*obd
= exp
->exp_obd
;
2653 struct obd_ioctl_data
*data
= karg
;
2656 if (!try_module_get(THIS_MODULE
)) {
2657 CERROR("Can't get module. Is it alive?");
2661 case OBD_IOC_LOV_GET_CONFIG
: {
2663 struct lov_desc
*desc
;
2664 struct obd_uuid uuid
;
2668 if (obd_ioctl_getdata(&buf
, &len
, uarg
)) {
2673 data
= (struct obd_ioctl_data
*)buf
;
2675 if (sizeof(*desc
) > data
->ioc_inllen1
) {
2676 obd_ioctl_freedata(buf
, len
);
2681 if (data
->ioc_inllen2
< sizeof(uuid
)) {
2682 obd_ioctl_freedata(buf
, len
);
2687 desc
= (struct lov_desc
*)data
->ioc_inlbuf1
;
2688 desc
->ld_tgt_count
= 1;
2689 desc
->ld_active_tgt_count
= 1;
2690 desc
->ld_default_stripe_count
= 1;
2691 desc
->ld_default_stripe_size
= 0;
2692 desc
->ld_default_stripe_offset
= 0;
2693 desc
->ld_pattern
= 0;
2694 memcpy(&desc
->ld_uuid
, &obd
->obd_uuid
, sizeof(uuid
));
2696 memcpy(data
->ioc_inlbuf2
, &obd
->obd_uuid
, sizeof(uuid
));
2698 err
= copy_to_user(uarg
, buf
, len
);
2701 obd_ioctl_freedata(buf
, len
);
2704 case LL_IOC_LOV_SETSTRIPE
:
2705 err
= obd_alloc_memmd(exp
, karg
);
2709 case LL_IOC_LOV_GETSTRIPE
:
2710 err
= osc_getstripe(karg
, uarg
);
2712 case OBD_IOC_CLIENT_RECOVER
:
2713 err
= ptlrpc_recover_import(obd
->u
.cli
.cl_import
,
2714 data
->ioc_inlbuf1
, 0);
2718 case IOC_OSC_SET_ACTIVE
:
2719 err
= ptlrpc_set_import_active(obd
->u
.cli
.cl_import
,
2722 case OBD_IOC_POLL_QUOTACHECK
:
2723 err
= osc_quota_poll_check(exp
, (struct if_quotacheck
*)karg
);
2725 case OBD_IOC_PING_TARGET
:
2726 err
= ptlrpc_obd_ping(obd
);
2729 CDEBUG(D_INODE
, "unrecognised ioctl %#x by %s\n",
2730 cmd
, current_comm());
2735 module_put(THIS_MODULE
);
2739 static int osc_get_info(const struct lu_env
*env
, struct obd_export
*exp
,
2740 u32 keylen
, void *key
, __u32
*vallen
, void *val
,
2741 struct lov_stripe_md
*lsm
)
2743 if (!vallen
|| !val
)
2746 if (KEY_IS(KEY_LOCK_TO_STRIPE
)) {
2747 __u32
*stripe
= val
;
2748 *vallen
= sizeof(*stripe
);
2751 } else if (KEY_IS(KEY_LAST_ID
)) {
2752 struct ptlrpc_request
*req
;
2757 req
= ptlrpc_request_alloc(class_exp2cliimp(exp
),
2758 &RQF_OST_GET_INFO_LAST_ID
);
2762 req_capsule_set_size(&req
->rq_pill
, &RMF_SETINFO_KEY
,
2763 RCL_CLIENT
, keylen
);
2764 rc
= ptlrpc_request_pack(req
, LUSTRE_OST_VERSION
, OST_GET_INFO
);
2766 ptlrpc_request_free(req
);
2770 tmp
= req_capsule_client_get(&req
->rq_pill
, &RMF_SETINFO_KEY
);
2771 memcpy(tmp
, key
, keylen
);
2773 req
->rq_no_delay
= req
->rq_no_resend
= 1;
2774 ptlrpc_request_set_replen(req
);
2775 rc
= ptlrpc_queue_wait(req
);
2779 reply
= req_capsule_server_get(&req
->rq_pill
, &RMF_OBD_ID
);
2780 if (reply
== NULL
) {
2785 *((u64
*)val
) = *reply
;
2787 ptlrpc_req_finished(req
);
2789 } else if (KEY_IS(KEY_FIEMAP
)) {
2790 struct ll_fiemap_info_key
*fm_key
=
2791 (struct ll_fiemap_info_key
*)key
;
2792 struct ldlm_res_id res_id
;
2793 ldlm_policy_data_t policy
;
2794 struct lustre_handle lockh
;
2795 ldlm_mode_t mode
= 0;
2796 struct ptlrpc_request
*req
;
2797 struct ll_user_fiemap
*reply
;
2801 if (!(fm_key
->fiemap
.fm_flags
& FIEMAP_FLAG_SYNC
))
2804 policy
.l_extent
.start
= fm_key
->fiemap
.fm_start
&
2807 if (OBD_OBJECT_EOF
- fm_key
->fiemap
.fm_length
<=
2808 fm_key
->fiemap
.fm_start
+ PAGE_CACHE_SIZE
- 1)
2809 policy
.l_extent
.end
= OBD_OBJECT_EOF
;
2811 policy
.l_extent
.end
= (fm_key
->fiemap
.fm_start
+
2812 fm_key
->fiemap
.fm_length
+
2813 PAGE_CACHE_SIZE
- 1) & CFS_PAGE_MASK
;
2815 ostid_build_res_name(&fm_key
->oa
.o_oi
, &res_id
);
2816 mode
= ldlm_lock_match(exp
->exp_obd
->obd_namespace
,
2817 LDLM_FL_BLOCK_GRANTED
|
2819 &res_id
, LDLM_EXTENT
, &policy
,
2820 LCK_PR
| LCK_PW
, &lockh
, 0);
2821 if (mode
) { /* lock is cached on client */
2822 if (mode
!= LCK_PR
) {
2823 ldlm_lock_addref(&lockh
, LCK_PR
);
2824 ldlm_lock_decref(&lockh
, LCK_PW
);
2826 } else { /* no cached lock, needs acquire lock on server side */
2827 fm_key
->oa
.o_valid
|= OBD_MD_FLFLAGS
;
2828 fm_key
->oa
.o_flags
|= OBD_FL_SRVLOCK
;
2832 req
= ptlrpc_request_alloc(class_exp2cliimp(exp
),
2833 &RQF_OST_GET_INFO_FIEMAP
);
2839 req_capsule_set_size(&req
->rq_pill
, &RMF_FIEMAP_KEY
,
2840 RCL_CLIENT
, keylen
);
2841 req_capsule_set_size(&req
->rq_pill
, &RMF_FIEMAP_VAL
,
2842 RCL_CLIENT
, *vallen
);
2843 req_capsule_set_size(&req
->rq_pill
, &RMF_FIEMAP_VAL
,
2844 RCL_SERVER
, *vallen
);
2846 rc
= ptlrpc_request_pack(req
, LUSTRE_OST_VERSION
, OST_GET_INFO
);
2848 ptlrpc_request_free(req
);
2852 tmp
= req_capsule_client_get(&req
->rq_pill
, &RMF_FIEMAP_KEY
);
2853 memcpy(tmp
, key
, keylen
);
2854 tmp
= req_capsule_client_get(&req
->rq_pill
, &RMF_FIEMAP_VAL
);
2855 memcpy(tmp
, val
, *vallen
);
2857 ptlrpc_request_set_replen(req
);
2858 rc
= ptlrpc_queue_wait(req
);
2862 reply
= req_capsule_server_get(&req
->rq_pill
, &RMF_FIEMAP_VAL
);
2863 if (reply
== NULL
) {
2868 memcpy(val
, reply
, *vallen
);
2870 ptlrpc_req_finished(req
);
2873 ldlm_lock_decref(&lockh
, LCK_PR
);
2880 static int osc_set_info_async(const struct lu_env
*env
, struct obd_export
*exp
,
2881 u32 keylen
, void *key
, u32 vallen
,
2882 void *val
, struct ptlrpc_request_set
*set
)
2884 struct ptlrpc_request
*req
;
2885 struct obd_device
*obd
= exp
->exp_obd
;
2886 struct obd_import
*imp
= class_exp2cliimp(exp
);
2890 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN
, 10);
2892 if (KEY_IS(KEY_CHECKSUM
)) {
2893 if (vallen
!= sizeof(int))
2895 exp
->exp_obd
->u
.cli
.cl_checksum
= (*(int *)val
) ? 1 : 0;
2899 if (KEY_IS(KEY_SPTLRPC_CONF
)) {
2900 sptlrpc_conf_client_adapt(obd
);
2904 if (KEY_IS(KEY_FLUSH_CTX
)) {
2905 sptlrpc_import_flush_my_ctx(imp
);
2909 if (KEY_IS(KEY_CACHE_SET
)) {
2910 struct client_obd
*cli
= &obd
->u
.cli
;
2912 LASSERT(cli
->cl_cache
== NULL
); /* only once */
2913 cli
->cl_cache
= (struct cl_client_cache
*)val
;
2914 atomic_inc(&cli
->cl_cache
->ccc_users
);
2915 cli
->cl_lru_left
= &cli
->cl_cache
->ccc_lru_left
;
2917 /* add this osc into entity list */
2918 LASSERT(list_empty(&cli
->cl_lru_osc
));
2919 spin_lock(&cli
->cl_cache
->ccc_lru_lock
);
2920 list_add(&cli
->cl_lru_osc
, &cli
->cl_cache
->ccc_lru
);
2921 spin_unlock(&cli
->cl_cache
->ccc_lru_lock
);
2926 if (KEY_IS(KEY_CACHE_LRU_SHRINK
)) {
2927 struct client_obd
*cli
= &obd
->u
.cli
;
2928 int nr
= atomic_read(&cli
->cl_lru_in_list
) >> 1;
2929 int target
= *(int *)val
;
2931 nr
= osc_lru_shrink(cli
, min(nr
, target
));
2936 if (!set
&& !KEY_IS(KEY_GRANT_SHRINK
))
2939 /* We pass all other commands directly to OST. Since nobody calls osc
2940 methods directly and everybody is supposed to go through LOV, we
2941 assume lov checked invalid values for us.
2942 The only recognised values so far are evict_by_nid and mds_conn.
2943 Even if something bad goes through, we'd get a -EINVAL from OST
2946 req
= ptlrpc_request_alloc(imp
, KEY_IS(KEY_GRANT_SHRINK
) ?
2947 &RQF_OST_SET_GRANT_INFO
:
2952 req_capsule_set_size(&req
->rq_pill
, &RMF_SETINFO_KEY
,
2953 RCL_CLIENT
, keylen
);
2954 if (!KEY_IS(KEY_GRANT_SHRINK
))
2955 req_capsule_set_size(&req
->rq_pill
, &RMF_SETINFO_VAL
,
2956 RCL_CLIENT
, vallen
);
2957 rc
= ptlrpc_request_pack(req
, LUSTRE_OST_VERSION
, OST_SET_INFO
);
2959 ptlrpc_request_free(req
);
2963 tmp
= req_capsule_client_get(&req
->rq_pill
, &RMF_SETINFO_KEY
);
2964 memcpy(tmp
, key
, keylen
);
2965 tmp
= req_capsule_client_get(&req
->rq_pill
, KEY_IS(KEY_GRANT_SHRINK
) ?
2968 memcpy(tmp
, val
, vallen
);
2970 if (KEY_IS(KEY_GRANT_SHRINK
)) {
2971 struct osc_brw_async_args
*aa
;
2974 CLASSERT(sizeof(*aa
) <= sizeof(req
->rq_async_args
));
2975 aa
= ptlrpc_req_async_args(req
);
2978 ptlrpc_req_finished(req
);
2981 *oa
= ((struct ost_body
*)val
)->oa
;
2983 req
->rq_interpret_reply
= osc_shrink_grant_interpret
;
2986 ptlrpc_request_set_replen(req
);
2987 if (!KEY_IS(KEY_GRANT_SHRINK
)) {
2988 LASSERT(set
!= NULL
);
2989 ptlrpc_set_add_req(set
, req
);
2990 ptlrpc_check_set(NULL
, set
);
2992 ptlrpcd_add_req(req
, PDL_POLICY_ROUND
, -1);
2997 static int osc_reconnect(const struct lu_env
*env
,
2998 struct obd_export
*exp
, struct obd_device
*obd
,
2999 struct obd_uuid
*cluuid
,
3000 struct obd_connect_data
*data
,
3003 struct client_obd
*cli
= &obd
->u
.cli
;
3005 if (data
!= NULL
&& (data
->ocd_connect_flags
& OBD_CONNECT_GRANT
)) {
3008 client_obd_list_lock(&cli
->cl_loi_list_lock
);
3009 data
->ocd_grant
= (cli
->cl_avail_grant
+ cli
->cl_dirty
) ?:
3010 2 * cli_brw_size(obd
);
3011 lost_grant
= cli
->cl_lost_grant
;
3012 cli
->cl_lost_grant
= 0;
3013 client_obd_list_unlock(&cli
->cl_loi_list_lock
);
3015 CDEBUG(D_RPCTRACE
, "ocd_connect_flags: %#llx ocd_version: %d ocd_grant: %d, lost: %ld.\n",
3016 data
->ocd_connect_flags
,
3017 data
->ocd_version
, data
->ocd_grant
, lost_grant
);
3023 static int osc_disconnect(struct obd_export
*exp
)
3025 struct obd_device
*obd
= class_exp2obd(exp
);
3028 rc
= client_disconnect_export(exp
);
3030 * Initially we put del_shrink_grant before disconnect_export, but it
3031 * causes the following problem if setup (connect) and cleanup
3032 * (disconnect) are tangled together.
3033 * connect p1 disconnect p2
3034 * ptlrpc_connect_import
3035 * ............... class_manual_cleanup
3038 * ptlrpc_connect_interrupt
3040 * add this client to shrink list
3042 * Bang! pinger trigger the shrink.
3043 * So the osc should be disconnected from the shrink list, after we
3044 * are sure the import has been destroyed. BUG18662
3046 if (obd
->u
.cli
.cl_import
== NULL
)
3047 osc_del_shrink_grant(&obd
->u
.cli
);
3051 static int osc_import_event(struct obd_device
*obd
,
3052 struct obd_import
*imp
,
3053 enum obd_import_event event
)
3055 struct client_obd
*cli
;
3058 LASSERT(imp
->imp_obd
== obd
);
3061 case IMP_EVENT_DISCON
: {
3063 client_obd_list_lock(&cli
->cl_loi_list_lock
);
3064 cli
->cl_avail_grant
= 0;
3065 cli
->cl_lost_grant
= 0;
3066 client_obd_list_unlock(&cli
->cl_loi_list_lock
);
3069 case IMP_EVENT_INACTIVE
: {
3070 rc
= obd_notify_observer(obd
, obd
, OBD_NOTIFY_INACTIVE
, NULL
);
3073 case IMP_EVENT_INVALIDATE
: {
3074 struct ldlm_namespace
*ns
= obd
->obd_namespace
;
3078 env
= cl_env_get(&refcheck
);
3082 /* all pages go to failing rpcs due to the invalid
3084 osc_io_unplug(env
, cli
, NULL
, PDL_POLICY_ROUND
);
3086 ldlm_namespace_cleanup(ns
, LDLM_FL_LOCAL_ONLY
);
3087 cl_env_put(env
, &refcheck
);
3092 case IMP_EVENT_ACTIVE
: {
3093 rc
= obd_notify_observer(obd
, obd
, OBD_NOTIFY_ACTIVE
, NULL
);
3096 case IMP_EVENT_OCD
: {
3097 struct obd_connect_data
*ocd
= &imp
->imp_connect_data
;
3099 if (ocd
->ocd_connect_flags
& OBD_CONNECT_GRANT
)
3100 osc_init_grant(&obd
->u
.cli
, ocd
);
3103 if (ocd
->ocd_connect_flags
& OBD_CONNECT_REQPORTAL
)
3104 imp
->imp_client
->cli_request_portal
=OST_REQUEST_PORTAL
;
3106 rc
= obd_notify_observer(obd
, obd
, OBD_NOTIFY_OCD
, NULL
);
3109 case IMP_EVENT_DEACTIVATE
: {
3110 rc
= obd_notify_observer(obd
, obd
, OBD_NOTIFY_DEACTIVATE
, NULL
);
3113 case IMP_EVENT_ACTIVATE
: {
3114 rc
= obd_notify_observer(obd
, obd
, OBD_NOTIFY_ACTIVATE
, NULL
);
3118 CERROR("Unknown import event %d\n", event
);
3125 * Determine whether the lock can be canceled before replaying the lock
3126 * during recovery, see bug16774 for detailed information.
3128 * \retval zero the lock can't be canceled
3129 * \retval other ok to cancel
3131 static int osc_cancel_for_recovery(struct ldlm_lock
*lock
)
3133 check_res_locked(lock
->l_resource
);
3136 * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
3138 * XXX as a future improvement, we can also cancel unused write lock
3139 * if it doesn't have dirty data and active mmaps.
3141 if (lock
->l_resource
->lr_type
== LDLM_EXTENT
&&
3142 (lock
->l_granted_mode
== LCK_PR
||
3143 lock
->l_granted_mode
== LCK_CR
) &&
3144 (osc_dlm_lock_pageref(lock
) == 0))
/* ptlrpcd work callback: flush queued bulk I/O for one client obd.
 * @data is the struct client_obd registered with ptlrpcd_alloc_work()
 * in osc_setup().  NOTE(review): fragmentary extraction -- braces and
 * the trailing return (orig. 3151, 3157-3158) are missing here. */
3150 static int brw_queue_work(const struct lu_env
*env
, void *data
)
3152 struct client_obd
*cli
= data
;
3154 CDEBUG(D_CACHE
, "Run writeback work for client obd %p.\n", cli
);
/* Kick the OSC I/O engine; PDL_POLICY_SAME keeps work on this ptlrpcd. */
3156 osc_io_unplug(env
, cli
, NULL
, PDL_POLICY_SAME
);
3160 int osc_setup(struct obd_device
*obd
, struct lustre_cfg
*lcfg
)
3162 struct lprocfs_static_vars lvars
= { NULL
};
3163 struct client_obd
*cli
= &obd
->u
.cli
;
3167 rc
= ptlrpcd_addref();
3171 rc
= client_obd_setup(obd
, lcfg
);
3175 handler
= ptlrpcd_alloc_work(cli
->cl_import
, brw_queue_work
, cli
);
3176 if (IS_ERR(handler
)) {
3177 rc
= PTR_ERR(handler
);
3178 goto out_client_setup
;
3180 cli
->cl_writeback_work
= handler
;
3182 rc
= osc_quota_setup(obd
);
3184 goto out_ptlrpcd_work
;
3186 cli
->cl_grant_shrink_interval
= GRANT_SHRINK_INTERVAL
;
3187 lprocfs_osc_init_vars(&lvars
);
3188 if (lprocfs_obd_setup(obd
, lvars
.obd_vars
, lvars
.sysfs_vars
) == 0) {
3189 lproc_osc_attach_seqstat(obd
);
3190 sptlrpc_lprocfs_cliobd_attach(obd
);
3191 ptlrpc_lprocfs_register_obd(obd
);
3194 /* We need to allocate a few requests more, because
3195 * brw_interpret tries to create new requests before freeing
3196 * previous ones, Ideally we want to have 2x max_rpcs_in_flight
3197 * reserved, but I'm afraid that might be too much wasted RAM
3198 * in fact, so 2 is just my guess and still should work. */
3199 cli
->cl_import
->imp_rq_pool
=
3200 ptlrpc_init_rq_pool(cli
->cl_max_rpcs_in_flight
+ 2,
3202 ptlrpc_add_rqs_to_pool
);
3204 INIT_LIST_HEAD(&cli
->cl_grant_shrink_list
);
3205 ns_register_cancel(obd
->obd_namespace
, osc_cancel_for_recovery
);
3209 ptlrpcd_destroy_work(handler
);
3211 client_obd_cleanup(obd
);
3217 static int osc_precleanup(struct obd_device
*obd
, enum obd_cleanup_stage stage
)
3220 case OBD_CLEANUP_EARLY
: {
3221 struct obd_import
*imp
;
3222 imp
= obd
->u
.cli
.cl_import
;
3223 CDEBUG(D_HA
, "Deactivating import %s\n", obd
->obd_name
);
3224 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3225 ptlrpc_deactivate_import(imp
);
3226 spin_lock(&imp
->imp_lock
);
3227 imp
->imp_pingable
= 0;
3228 spin_unlock(&imp
->imp_lock
);
3231 case OBD_CLEANUP_EXPORTS
: {
3232 struct client_obd
*cli
= &obd
->u
.cli
;
3234 * for echo client, export may be on zombie list, wait for
3235 * zombie thread to cull it, because cli.cl_import will be
3236 * cleared in client_disconnect_export():
3237 * class_export_destroy() -> obd_cleanup() ->
3238 * echo_device_free() -> echo_client_cleanup() ->
3239 * obd_disconnect() -> osc_disconnect() ->
3240 * client_disconnect_export()
3242 obd_zombie_barrier();
3243 if (cli
->cl_writeback_work
) {
3244 ptlrpcd_destroy_work(cli
->cl_writeback_work
);
3245 cli
->cl_writeback_work
= NULL
;
3247 obd_cleanup_client_import(obd
);
3248 ptlrpc_lprocfs_unregister_obd(obd
);
3249 lprocfs_obd_cleanup(obd
);
3256 int osc_cleanup(struct obd_device
*obd
)
3258 struct client_obd
*cli
= &obd
->u
.cli
;
3262 if (cli
->cl_cache
!= NULL
) {
3263 LASSERT(atomic_read(&cli
->cl_cache
->ccc_users
) > 0);
3264 spin_lock(&cli
->cl_cache
->ccc_lru_lock
);
3265 list_del_init(&cli
->cl_lru_osc
);
3266 spin_unlock(&cli
->cl_cache
->ccc_lru_lock
);
3267 cli
->cl_lru_left
= NULL
;
3268 atomic_dec(&cli
->cl_cache
->ccc_users
);
3269 cli
->cl_cache
= NULL
;
3272 /* free memory of osc quota cache */
3273 osc_quota_cleanup(obd
);
3275 rc
= client_obd_cleanup(obd
);
3281 int osc_process_config_base(struct obd_device
*obd
, struct lustre_cfg
*lcfg
)
3283 struct lprocfs_static_vars lvars
= { NULL
};
3286 lprocfs_osc_init_vars(&lvars
);
3288 switch (lcfg
->lcfg_command
) {
3290 rc
= class_process_proc_param(PARAM_OSC
, lvars
.obd_vars
,
/* obd_ops.o_process_config entry point: thin wrapper that forwards the
 * lustre_cfg buffer to osc_process_config_base().  @len is unused here.
 * NOTE(review): surrounding braces are missing from this extraction. */
3300 static int osc_process_config(struct obd_device
*obd
, u32 len
, void *buf
)
3302 return osc_process_config_base(obd
, buf
);
3305 struct obd_ops osc_obd_ops
= {
3306 .o_owner
= THIS_MODULE
,
3307 .o_setup
= osc_setup
,
3308 .o_precleanup
= osc_precleanup
,
3309 .o_cleanup
= osc_cleanup
,
3310 .o_add_conn
= client_import_add_conn
,
3311 .o_del_conn
= client_import_del_conn
,
3312 .o_connect
= client_connect_import
,
3313 .o_reconnect
= osc_reconnect
,
3314 .o_disconnect
= osc_disconnect
,
3315 .o_statfs
= osc_statfs
,
3316 .o_statfs_async
= osc_statfs_async
,
3317 .o_packmd
= osc_packmd
,
3318 .o_unpackmd
= osc_unpackmd
,
3319 .o_create
= osc_create
,
3320 .o_destroy
= osc_destroy
,
3321 .o_getattr
= osc_getattr
,
3322 .o_getattr_async
= osc_getattr_async
,
3323 .o_setattr
= osc_setattr
,
3324 .o_setattr_async
= osc_setattr_async
,
3325 .o_find_cbdata
= osc_find_cbdata
,
3326 .o_iocontrol
= osc_iocontrol
,
3327 .o_get_info
= osc_get_info
,
3328 .o_set_info_async
= osc_set_info_async
,
3329 .o_import_event
= osc_import_event
,
3330 .o_process_config
= osc_process_config
,
3331 .o_quotactl
= osc_quotactl
,
3332 .o_quotacheck
= osc_quotacheck
,
3335 extern struct lu_kmem_descr osc_caches
[];
3336 extern spinlock_t osc_ast_guard
;
3337 extern struct lock_class_key osc_ast_guard_class
;
3339 static int __init
osc_init(void)
3341 struct lprocfs_static_vars lvars
= { NULL
};
3344 /* print an address of _any_ initialized kernel symbol from this
3345 * module, to allow debugging with gdb that doesn't support data
3346 * symbols from modules.*/
3347 CDEBUG(D_INFO
, "Lustre OSC module (%p).\n", &osc_caches
);
3349 rc
= lu_kmem_init(osc_caches
);
3353 lprocfs_osc_init_vars(&lvars
);
3355 rc
= class_register_type(&osc_obd_ops
, NULL
,
3356 LUSTRE_OSC_NAME
, &osc_device_type
);
3358 lu_kmem_fini(osc_caches
);
3362 spin_lock_init(&osc_ast_guard
);
3363 lockdep_set_class(&osc_ast_guard
, &osc_ast_guard_class
);
/* Module unload: unregister the OSC obd type, then release the slab
 * caches set up by lu_kmem_init() in osc_init() (reverse order of init). */
3368 static void /*__exit*/ osc_exit(void)
3370 class_unregister_type(LUSTRE_OSC_NAME
);
3371 lu_kmem_fini(osc_caches
);
3374 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
3375 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3376 MODULE_LICENSE("GPL");
3377 MODULE_VERSION(LUSTRE_VERSION_STRING
);
3379 module_init(osc_init
);
3380 module_exit(osc_exit
);