4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_LOV
39 #include "../../include/linux/libcfs/libcfs.h"
41 #include "../include/obd_class.h"
42 #include "../include/obd_ost.h"
43 #include "../include/lustre/lustre_idl.h"
44 #include "lov_internal.h"
46 static void lov_init_set(struct lov_request_set
*set
)
49 atomic_set(&set
->set_completes
, 0);
50 atomic_set(&set
->set_success
, 0);
51 atomic_set(&set
->set_finish_checked
, 0);
52 set
->set_cookies
= NULL
;
53 INIT_LIST_HEAD(&set
->set_list
);
54 atomic_set(&set
->set_refcount
, 1);
55 init_waitqueue_head(&set
->set_waitq
);
56 spin_lock_init(&set
->set_lock
);
59 void lov_finish_set(struct lov_request_set
*set
)
61 struct list_head
*pos
, *n
;
64 list_for_each_safe(pos
, n
, &set
->set_list
) {
65 struct lov_request
*req
= list_entry(pos
,
68 list_del_init(&req
->rq_link
);
71 OBDO_FREE(req
->rq_oi
.oi_oa
);
73 OBD_FREE_LARGE(req
->rq_oi
.oi_md
, req
->rq_buflen
);
74 if (req
->rq_oi
.oi_osfs
)
75 OBD_FREE(req
->rq_oi
.oi_osfs
,
76 sizeof(*req
->rq_oi
.oi_osfs
));
77 OBD_FREE(req
, sizeof(*req
));
81 int len
= set
->set_oabufs
* sizeof(*set
->set_pga
);
82 OBD_FREE_LARGE(set
->set_pga
, len
);
85 lov_llh_put(set
->set_lockh
);
87 OBD_FREE(set
, sizeof(*set
));
90 int lov_set_finished(struct lov_request_set
*set
, int idempotent
)
92 int completes
= atomic_read(&set
->set_completes
);
94 CDEBUG(D_INFO
, "check set %d/%d\n", completes
, set
->set_count
);
96 if (completes
== set
->set_count
) {
99 if (atomic_inc_return(&set
->set_finish_checked
) == 1)
105 void lov_update_set(struct lov_request_set
*set
,
106 struct lov_request
*req
, int rc
)
108 req
->rq_complete
= 1;
111 atomic_inc(&set
->set_completes
);
113 atomic_inc(&set
->set_success
);
115 wake_up(&set
->set_waitq
);
118 int lov_update_common_set(struct lov_request_set
*set
,
119 struct lov_request
*req
, int rc
)
121 struct lov_obd
*lov
= &set
->set_exp
->exp_obd
->u
.lov
;
123 lov_update_set(set
, req
, rc
);
125 /* grace error on inactive ost */
126 if (rc
&& !(lov
->lov_tgts
[req
->rq_idx
] &&
127 lov
->lov_tgts
[req
->rq_idx
]->ltd_active
))
130 /* FIXME in raid1 regime, should return 0 */
134 void lov_set_add_req(struct lov_request
*req
, struct lov_request_set
*set
)
136 list_add_tail(&req
->rq_link
, &set
->set_list
);
141 static int lov_check_set(struct lov_obd
*lov
, int idx
)
144 struct lov_tgt_desc
*tgt
;
146 mutex_lock(&lov
->lov_lock
);
147 tgt
= lov
->lov_tgts
[idx
];
148 rc
= !tgt
|| tgt
->ltd_active
||
150 class_exp2cliimp(tgt
->ltd_exp
)->imp_connect_tried
);
151 mutex_unlock(&lov
->lov_lock
);
156 /* Check if the OSC connection exists and is active.
157 * If the OSC has not yet had a chance to connect to the OST the first time,
158 * wait once for it to connect instead of returning an error.
160 int lov_check_and_wait_active(struct lov_obd
*lov
, int ost_idx
)
162 wait_queue_head_t waitq
;
163 struct l_wait_info lwi
;
164 struct lov_tgt_desc
*tgt
;
167 mutex_lock(&lov
->lov_lock
);
169 tgt
= lov
->lov_tgts
[ost_idx
];
171 if (unlikely(tgt
== NULL
))
174 if (likely(tgt
->ltd_active
))
177 if (tgt
->ltd_exp
&& class_exp2cliimp(tgt
->ltd_exp
)->imp_connect_tried
)
180 mutex_unlock(&lov
->lov_lock
);
182 init_waitqueue_head(&waitq
);
183 lwi
= LWI_TIMEOUT_INTERVAL(cfs_time_seconds(obd_timeout
),
184 cfs_time_seconds(1), NULL
, NULL
);
186 rc
= l_wait_event(waitq
, lov_check_set(lov
, ost_idx
), &lwi
);
187 if (tgt
!= NULL
&& tgt
->ltd_active
)
193 mutex_unlock(&lov
->lov_lock
);
197 static int lov_update_enqueue_lov(struct obd_export
*exp
,
198 struct lustre_handle
*lov_lockhp
,
199 struct lov_oinfo
*loi
, __u64 flags
, int idx
,
200 struct ost_id
*oi
, int rc
)
202 struct lov_obd
*lov
= &exp
->exp_obd
->u
.lov
;
204 if (rc
!= ELDLM_OK
&&
205 !(rc
== ELDLM_LOCK_ABORTED
&& (flags
& LDLM_FL_HAS_INTENT
))) {
206 memset(lov_lockhp
, 0, sizeof(*lov_lockhp
));
207 if (lov
->lov_tgts
[idx
] && lov
->lov_tgts
[idx
]->ltd_active
) {
208 /* -EUSERS used by OST to report file contention */
209 if (rc
!= -EINTR
&& rc
!= -EUSERS
)
210 CERROR("%s: enqueue objid "DOSTID
" subobj"
211 DOSTID
" on OST idx %d: rc %d\n",
212 exp
->exp_obd
->obd_name
,
213 POSTID(oi
), POSTID(&loi
->loi_oi
),
214 loi
->loi_ost_idx
, rc
);
221 int lov_update_enqueue_set(struct lov_request
*req
, __u32 mode
, int rc
)
223 struct lov_request_set
*set
= req
->rq_rqset
;
224 struct lustre_handle
*lov_lockhp
;
225 struct obd_info
*oi
= set
->set_oi
;
226 struct lov_oinfo
*loi
;
230 lov_lockhp
= set
->set_lockh
->llh_handles
+ req
->rq_stripe
;
231 loi
= oi
->oi_md
->lsm_oinfo
[req
->rq_stripe
];
233 /* XXX LOV STACKING: OSC gets a copy, created in lov_prep_enqueue_set
234 * and that copy can be arbitrarily out of date.
236 * The LOV API is due for a serious rewriting anyways, and this
237 * can be addressed then. */
239 lov_stripe_lock(oi
->oi_md
);
240 osc_update_enqueue(lov_lockhp
, loi
, oi
->oi_flags
,
241 &req
->rq_oi
.oi_md
->lsm_oinfo
[0]->loi_lvb
, mode
, rc
);
242 if (rc
== ELDLM_LOCK_ABORTED
&& (oi
->oi_flags
& LDLM_FL_HAS_INTENT
))
243 memset(lov_lockhp
, 0, sizeof(*lov_lockhp
));
244 rc
= lov_update_enqueue_lov(set
->set_exp
, lov_lockhp
, loi
, oi
->oi_flags
,
245 req
->rq_idx
, &oi
->oi_md
->lsm_oi
, rc
);
246 lov_stripe_unlock(oi
->oi_md
);
247 lov_update_set(set
, req
, rc
);
251 /* The callback for osc_enqueue that updates lov info for every OSC request. */
252 static int cb_update_enqueue(void *cookie
, int rc
)
254 struct obd_info
*oinfo
= cookie
;
255 struct ldlm_enqueue_info
*einfo
;
256 struct lov_request
*lovreq
;
258 lovreq
= container_of(oinfo
, struct lov_request
, rq_oi
);
259 einfo
= lovreq
->rq_rqset
->set_ei
;
260 return lov_update_enqueue_set(lovreq
, einfo
->ei_mode
, rc
);
263 static int enqueue_done(struct lov_request_set
*set
, __u32 mode
)
265 struct lov_request
*req
;
266 struct lov_obd
*lov
= &set
->set_exp
->exp_obd
->u
.lov
;
267 int completes
= atomic_read(&set
->set_completes
);
270 /* enqueue/match success, just return */
271 if (completes
&& completes
== atomic_read(&set
->set_success
))
274 /* cancel enqueued/matched locks */
275 list_for_each_entry(req
, &set
->set_list
, rq_link
) {
276 struct lustre_handle
*lov_lockhp
;
278 if (!req
->rq_complete
|| req
->rq_rc
)
281 lov_lockhp
= set
->set_lockh
->llh_handles
+ req
->rq_stripe
;
283 if (!lustre_handle_is_used(lov_lockhp
))
286 rc
= obd_cancel(lov
->lov_tgts
[req
->rq_idx
]->ltd_exp
,
287 req
->rq_oi
.oi_md
, mode
, lov_lockhp
);
288 if (rc
&& lov
->lov_tgts
[req
->rq_idx
] &&
289 lov
->lov_tgts
[req
->rq_idx
]->ltd_active
)
290 CERROR("%s: cancelling obdjid "DOSTID
" on OST"
291 "idx %d error: rc = %d\n",
292 set
->set_exp
->exp_obd
->obd_name
,
293 POSTID(&req
->rq_oi
.oi_md
->lsm_oi
),
297 lov_llh_put(set
->set_lockh
);
301 int lov_fini_enqueue_set(struct lov_request_set
*set
, __u32 mode
, int rc
,
302 struct ptlrpc_request_set
*rqset
)
308 LASSERT(set
->set_exp
);
309 /* Do enqueue_done only for sync requests and if any request
313 atomic_set(&set
->set_completes
, 0);
314 ret
= enqueue_done(set
, mode
);
315 } else if (set
->set_lockh
)
316 lov_llh_put(set
->set_lockh
);
320 return rc
? rc
: ret
;
323 static void lov_llh_addref(void *llhp
)
325 struct lov_lock_handles
*llh
= llhp
;
327 atomic_inc(&llh
->llh_refcount
);
328 CDEBUG(D_INFO
, "GETting llh %p : new refcount %d\n", llh
,
329 atomic_read(&llh
->llh_refcount
));
332 static struct portals_handle_ops lov_handle_ops
= {
333 .hop_addref
= lov_llh_addref
,
337 static struct lov_lock_handles
*lov_llh_new(struct lov_stripe_md
*lsm
)
339 struct lov_lock_handles
*llh
;
341 OBD_ALLOC(llh
, sizeof(*llh
) +
342 sizeof(*llh
->llh_handles
) * lsm
->lsm_stripe_count
);
346 atomic_set(&llh
->llh_refcount
, 2);
347 llh
->llh_stripe_count
= lsm
->lsm_stripe_count
;
348 INIT_LIST_HEAD(&llh
->llh_handle
.h_link
);
349 class_handle_hash(&llh
->llh_handle
, &lov_handle_ops
);
354 int lov_prep_enqueue_set(struct obd_export
*exp
, struct obd_info
*oinfo
,
355 struct ldlm_enqueue_info
*einfo
,
356 struct lov_request_set
**reqset
)
358 struct lov_obd
*lov
= &exp
->exp_obd
->u
.lov
;
359 struct lov_request_set
*set
;
362 OBD_ALLOC(set
, sizeof(*set
));
370 set
->set_lockh
= lov_llh_new(oinfo
->oi_md
);
371 if (set
->set_lockh
== NULL
)
372 GOTO(out_set
, rc
= -ENOMEM
);
373 oinfo
->oi_lockh
->cookie
= set
->set_lockh
->llh_handle
.h_cookie
;
375 for (i
= 0; i
< oinfo
->oi_md
->lsm_stripe_count
; i
++) {
376 struct lov_oinfo
*loi
;
377 struct lov_request
*req
;
380 loi
= oinfo
->oi_md
->lsm_oinfo
[i
];
381 if (!lov_stripe_intersects(oinfo
->oi_md
, i
,
382 oinfo
->oi_policy
.l_extent
.start
,
383 oinfo
->oi_policy
.l_extent
.end
,
387 if (!lov_check_and_wait_active(lov
, loi
->loi_ost_idx
)) {
388 CDEBUG(D_HA
, "lov idx %d inactive\n", loi
->loi_ost_idx
);
392 OBD_ALLOC(req
, sizeof(*req
));
394 GOTO(out_set
, rc
= -ENOMEM
);
396 req
->rq_buflen
= sizeof(*req
->rq_oi
.oi_md
) +
397 sizeof(struct lov_oinfo
*) +
398 sizeof(struct lov_oinfo
);
399 OBD_ALLOC_LARGE(req
->rq_oi
.oi_md
, req
->rq_buflen
);
400 if (req
->rq_oi
.oi_md
== NULL
) {
401 OBD_FREE(req
, sizeof(*req
));
402 GOTO(out_set
, rc
= -ENOMEM
);
404 req
->rq_oi
.oi_md
->lsm_oinfo
[0] =
405 ((void *)req
->rq_oi
.oi_md
) + sizeof(*req
->rq_oi
.oi_md
) +
406 sizeof(struct lov_oinfo
*);
408 /* Set lov request specific parameters. */
409 req
->rq_oi
.oi_lockh
= set
->set_lockh
->llh_handles
+ i
;
410 req
->rq_oi
.oi_cb_up
= cb_update_enqueue
;
411 req
->rq_oi
.oi_flags
= oinfo
->oi_flags
;
413 LASSERT(req
->rq_oi
.oi_lockh
);
415 req
->rq_oi
.oi_policy
.l_extent
.gid
=
416 oinfo
->oi_policy
.l_extent
.gid
;
417 req
->rq_oi
.oi_policy
.l_extent
.start
= start
;
418 req
->rq_oi
.oi_policy
.l_extent
.end
= end
;
420 req
->rq_idx
= loi
->loi_ost_idx
;
423 /* XXX LOV STACKING: submd should be from the subobj */
424 req
->rq_oi
.oi_md
->lsm_oi
= loi
->loi_oi
;
425 req
->rq_oi
.oi_md
->lsm_stripe_count
= 0;
426 req
->rq_oi
.oi_md
->lsm_oinfo
[0]->loi_kms_valid
=
428 req
->rq_oi
.oi_md
->lsm_oinfo
[0]->loi_kms
= loi
->loi_kms
;
429 req
->rq_oi
.oi_md
->lsm_oinfo
[0]->loi_lvb
= loi
->loi_lvb
;
431 lov_set_add_req(req
, set
);
434 GOTO(out_set
, rc
= -EIO
);
438 lov_fini_enqueue_set(set
, einfo
->ei_mode
, rc
, NULL
);
442 int lov_fini_match_set(struct lov_request_set
*set
, __u32 mode
, __u64 flags
)
448 LASSERT(set
->set_exp
);
449 rc
= enqueue_done(set
, mode
);
450 if ((set
->set_count
== atomic_read(&set
->set_success
)) &&
451 (flags
& LDLM_FL_TEST_LOCK
))
452 lov_llh_put(set
->set_lockh
);
459 int lov_prep_match_set(struct obd_export
*exp
, struct obd_info
*oinfo
,
460 struct lov_stripe_md
*lsm
, ldlm_policy_data_t
*policy
,
461 __u32 mode
, struct lustre_handle
*lockh
,
462 struct lov_request_set
**reqset
)
464 struct lov_obd
*lov
= &exp
->exp_obd
->u
.lov
;
465 struct lov_request_set
*set
;
468 OBD_ALLOC(set
, sizeof(*set
));
475 set
->set_oi
->oi_md
= lsm
;
476 set
->set_lockh
= lov_llh_new(lsm
);
477 if (set
->set_lockh
== NULL
)
478 GOTO(out_set
, rc
= -ENOMEM
);
479 lockh
->cookie
= set
->set_lockh
->llh_handle
.h_cookie
;
481 for (i
= 0; i
< lsm
->lsm_stripe_count
; i
++) {
482 struct lov_oinfo
*loi
;
483 struct lov_request
*req
;
486 loi
= lsm
->lsm_oinfo
[i
];
487 if (!lov_stripe_intersects(lsm
, i
, policy
->l_extent
.start
,
488 policy
->l_extent
.end
, &start
, &end
))
491 /* FIXME raid1 should grace this error */
492 if (!lov_check_and_wait_active(lov
, loi
->loi_ost_idx
)) {
493 CDEBUG(D_HA
, "lov idx %d inactive\n", loi
->loi_ost_idx
);
494 GOTO(out_set
, rc
= -EIO
);
497 OBD_ALLOC(req
, sizeof(*req
));
499 GOTO(out_set
, rc
= -ENOMEM
);
501 req
->rq_buflen
= sizeof(*req
->rq_oi
.oi_md
);
502 OBD_ALLOC_LARGE(req
->rq_oi
.oi_md
, req
->rq_buflen
);
503 if (req
->rq_oi
.oi_md
== NULL
) {
504 OBD_FREE(req
, sizeof(*req
));
505 GOTO(out_set
, rc
= -ENOMEM
);
508 req
->rq_oi
.oi_policy
.l_extent
.start
= start
;
509 req
->rq_oi
.oi_policy
.l_extent
.end
= end
;
510 req
->rq_oi
.oi_policy
.l_extent
.gid
= policy
->l_extent
.gid
;
512 req
->rq_idx
= loi
->loi_ost_idx
;
515 /* XXX LOV STACKING: submd should be from the subobj */
516 req
->rq_oi
.oi_md
->lsm_oi
= loi
->loi_oi
;
517 req
->rq_oi
.oi_md
->lsm_stripe_count
= 0;
519 lov_set_add_req(req
, set
);
522 GOTO(out_set
, rc
= -EIO
);
526 lov_fini_match_set(set
, mode
, 0);
530 int lov_fini_cancel_set(struct lov_request_set
*set
)
537 LASSERT(set
->set_exp
);
539 lov_llh_put(set
->set_lockh
);
546 int lov_prep_cancel_set(struct obd_export
*exp
, struct obd_info
*oinfo
,
547 struct lov_stripe_md
*lsm
, __u32 mode
,
548 struct lustre_handle
*lockh
,
549 struct lov_request_set
**reqset
)
551 struct lov_request_set
*set
;
554 OBD_ALLOC(set
, sizeof(*set
));
561 set
->set_oi
->oi_md
= lsm
;
562 set
->set_lockh
= lov_handle2llh(lockh
);
563 if (set
->set_lockh
== NULL
) {
564 CERROR("LOV: invalid lov lock handle %p\n", lockh
);
565 GOTO(out_set
, rc
= -EINVAL
);
567 lockh
->cookie
= set
->set_lockh
->llh_handle
.h_cookie
;
569 for (i
= 0; i
< lsm
->lsm_stripe_count
; i
++) {
570 struct lov_request
*req
;
571 struct lustre_handle
*lov_lockhp
;
572 struct lov_oinfo
*loi
= lsm
->lsm_oinfo
[i
];
574 lov_lockhp
= set
->set_lockh
->llh_handles
+ i
;
575 if (!lustre_handle_is_used(lov_lockhp
)) {
576 CDEBUG(D_INFO
, "lov idx %d subobj "DOSTID
" no lock\n",
577 loi
->loi_ost_idx
, POSTID(&loi
->loi_oi
));
581 OBD_ALLOC(req
, sizeof(*req
));
583 GOTO(out_set
, rc
= -ENOMEM
);
585 req
->rq_buflen
= sizeof(*req
->rq_oi
.oi_md
);
586 OBD_ALLOC_LARGE(req
->rq_oi
.oi_md
, req
->rq_buflen
);
587 if (req
->rq_oi
.oi_md
== NULL
) {
588 OBD_FREE(req
, sizeof(*req
));
589 GOTO(out_set
, rc
= -ENOMEM
);
592 req
->rq_idx
= loi
->loi_ost_idx
;
595 /* XXX LOV STACKING: submd should be from the subobj */
596 req
->rq_oi
.oi_md
->lsm_oi
= loi
->loi_oi
;
597 req
->rq_oi
.oi_md
->lsm_stripe_count
= 0;
599 lov_set_add_req(req
, set
);
602 GOTO(out_set
, rc
= -EIO
);
606 lov_fini_cancel_set(set
);
609 static int common_attr_done(struct lov_request_set
*set
)
611 struct list_head
*pos
;
612 struct lov_request
*req
;
614 int rc
= 0, attrset
= 0;
616 LASSERT(set
->set_oi
!= NULL
);
618 if (set
->set_oi
->oi_oa
== NULL
)
621 if (!atomic_read(&set
->set_success
))
626 GOTO(out
, rc
= -ENOMEM
);
628 list_for_each(pos
, &set
->set_list
) {
629 req
= list_entry(pos
, struct lov_request
, rq_link
);
631 if (!req
->rq_complete
|| req
->rq_rc
)
633 if (req
->rq_oi
.oi_oa
->o_valid
== 0) /* inactive stripe */
635 lov_merge_attrs(tmp_oa
, req
->rq_oi
.oi_oa
,
636 req
->rq_oi
.oi_oa
->o_valid
,
637 set
->set_oi
->oi_md
, req
->rq_stripe
, &attrset
);
640 CERROR("No stripes had valid attrs\n");
643 if ((set
->set_oi
->oi_oa
->o_valid
& OBD_MD_FLEPOCH
) &&
644 (set
->set_oi
->oi_md
->lsm_stripe_count
!= attrset
)) {
645 /* When we take attributes of some epoch, we require all the
646 * ost to be active. */
647 CERROR("Not all the stripes had valid attrs\n");
648 GOTO(out
, rc
= -EIO
);
651 tmp_oa
->o_oi
= set
->set_oi
->oi_oa
->o_oi
;
652 memcpy(set
->set_oi
->oi_oa
, tmp_oa
, sizeof(*set
->set_oi
->oi_oa
));
660 static int brw_done(struct lov_request_set
*set
)
662 struct lov_stripe_md
*lsm
= set
->set_oi
->oi_md
;
663 struct lov_oinfo
*loi
= NULL
;
664 struct list_head
*pos
;
665 struct lov_request
*req
;
667 list_for_each(pos
, &set
->set_list
) {
668 req
= list_entry(pos
, struct lov_request
, rq_link
);
670 if (!req
->rq_complete
|| req
->rq_rc
)
673 loi
= lsm
->lsm_oinfo
[req
->rq_stripe
];
675 if (req
->rq_oi
.oi_oa
->o_valid
& OBD_MD_FLBLOCKS
)
676 loi
->loi_lvb
.lvb_blocks
= req
->rq_oi
.oi_oa
->o_blocks
;
682 int lov_fini_brw_set(struct lov_request_set
*set
)
688 LASSERT(set
->set_exp
);
689 if (atomic_read(&set
->set_completes
)) {
691 /* FIXME update qos data here */
698 int lov_prep_brw_set(struct obd_export
*exp
, struct obd_info
*oinfo
,
699 u32 oa_bufs
, struct brw_page
*pga
,
700 struct obd_trans_info
*oti
,
701 struct lov_request_set
**reqset
)
708 struct lov_request_set
*set
;
709 struct lov_obd
*lov
= &exp
->exp_obd
->u
.lov
;
710 int rc
= 0, i
, shift
;
712 OBD_ALLOC(set
, sizeof(*set
));
720 set
->set_oabufs
= oa_bufs
;
721 OBD_ALLOC_LARGE(set
->set_pga
, oa_bufs
* sizeof(*set
->set_pga
));
723 GOTO(out
, rc
= -ENOMEM
);
725 OBD_ALLOC_LARGE(info
, sizeof(*info
) * oinfo
->oi_md
->lsm_stripe_count
);
727 GOTO(out
, rc
= -ENOMEM
);
729 /* calculate the page count for each stripe */
730 for (i
= 0; i
< oa_bufs
; i
++) {
731 int stripe
= lov_stripe_number(oinfo
->oi_md
, pga
[i
].off
);
732 info
[stripe
].count
++;
735 /* alloc and initialize lov request */
737 for (i
= 0; i
< oinfo
->oi_md
->lsm_stripe_count
; i
++) {
738 struct lov_oinfo
*loi
= NULL
;
739 struct lov_request
*req
;
741 if (info
[i
].count
== 0)
744 loi
= oinfo
->oi_md
->lsm_oinfo
[i
];
745 if (!lov_check_and_wait_active(lov
, loi
->loi_ost_idx
)) {
746 CDEBUG(D_HA
, "lov idx %d inactive\n", loi
->loi_ost_idx
);
747 GOTO(out
, rc
= -EIO
);
750 OBD_ALLOC(req
, sizeof(*req
));
752 GOTO(out
, rc
= -ENOMEM
);
754 OBDO_ALLOC(req
->rq_oi
.oi_oa
);
755 if (req
->rq_oi
.oi_oa
== NULL
) {
756 OBD_FREE(req
, sizeof(*req
));
757 GOTO(out
, rc
= -ENOMEM
);
761 memcpy(req
->rq_oi
.oi_oa
, oinfo
->oi_oa
,
762 sizeof(*req
->rq_oi
.oi_oa
));
764 req
->rq_oi
.oi_oa
->o_oi
= loi
->loi_oi
;
765 req
->rq_oi
.oi_oa
->o_stripe_idx
= i
;
767 req
->rq_buflen
= sizeof(*req
->rq_oi
.oi_md
);
768 OBD_ALLOC_LARGE(req
->rq_oi
.oi_md
, req
->rq_buflen
);
769 if (req
->rq_oi
.oi_md
== NULL
) {
770 OBDO_FREE(req
->rq_oi
.oi_oa
);
771 OBD_FREE(req
, sizeof(*req
));
772 GOTO(out
, rc
= -ENOMEM
);
775 req
->rq_idx
= loi
->loi_ost_idx
;
778 /* XXX LOV STACKING */
779 req
->rq_oi
.oi_md
->lsm_oi
= loi
->loi_oi
;
780 req
->rq_oabufs
= info
[i
].count
;
781 req
->rq_pgaidx
= shift
;
782 shift
+= req
->rq_oabufs
;
784 /* remember the index for sort brw_page array */
785 info
[i
].index
= req
->rq_pgaidx
;
787 req
->rq_oi
.oi_capa
= oinfo
->oi_capa
;
789 lov_set_add_req(req
, set
);
792 GOTO(out
, rc
= -EIO
);
794 /* rotate & sort the brw_page array */
795 for (i
= 0; i
< oa_bufs
; i
++) {
796 int stripe
= lov_stripe_number(oinfo
->oi_md
, pga
[i
].off
);
798 shift
= info
[stripe
].index
+ info
[stripe
].off
;
799 LASSERT(shift
< oa_bufs
);
800 set
->set_pga
[shift
] = pga
[i
];
801 lov_stripe_offset(oinfo
->oi_md
, pga
[i
].off
, stripe
,
802 &set
->set_pga
[shift
].off
);
808 sizeof(*info
) * oinfo
->oi_md
->lsm_stripe_count
);
813 lov_fini_brw_set(set
);
818 int lov_fini_getattr_set(struct lov_request_set
*set
)
824 LASSERT(set
->set_exp
);
825 if (atomic_read(&set
->set_completes
))
826 rc
= common_attr_done(set
);
833 /* The callback for osc_getattr_async that finalizes a request info when a
834 * response is received. */
835 static int cb_getattr_update(void *cookie
, int rc
)
837 struct obd_info
*oinfo
= cookie
;
838 struct lov_request
*lovreq
;
840 lovreq
= container_of(oinfo
, struct lov_request
, rq_oi
);
841 return lov_update_common_set(lovreq
->rq_rqset
, lovreq
, rc
);
844 int lov_prep_getattr_set(struct obd_export
*exp
, struct obd_info
*oinfo
,
845 struct lov_request_set
**reqset
)
847 struct lov_request_set
*set
;
848 struct lov_obd
*lov
= &exp
->exp_obd
->u
.lov
;
851 OBD_ALLOC(set
, sizeof(*set
));
859 for (i
= 0; i
< oinfo
->oi_md
->lsm_stripe_count
; i
++) {
860 struct lov_oinfo
*loi
;
861 struct lov_request
*req
;
863 loi
= oinfo
->oi_md
->lsm_oinfo
[i
];
864 if (!lov_check_and_wait_active(lov
, loi
->loi_ost_idx
)) {
865 CDEBUG(D_HA
, "lov idx %d inactive\n", loi
->loi_ost_idx
);
866 if (oinfo
->oi_oa
->o_valid
& OBD_MD_FLEPOCH
)
867 /* SOM requires all the OSTs to be active. */
868 GOTO(out_set
, rc
= -EIO
);
872 OBD_ALLOC(req
, sizeof(*req
));
874 GOTO(out_set
, rc
= -ENOMEM
);
877 req
->rq_idx
= loi
->loi_ost_idx
;
879 OBDO_ALLOC(req
->rq_oi
.oi_oa
);
880 if (req
->rq_oi
.oi_oa
== NULL
) {
881 OBD_FREE(req
, sizeof(*req
));
882 GOTO(out_set
, rc
= -ENOMEM
);
884 memcpy(req
->rq_oi
.oi_oa
, oinfo
->oi_oa
,
885 sizeof(*req
->rq_oi
.oi_oa
));
886 req
->rq_oi
.oi_oa
->o_oi
= loi
->loi_oi
;
887 req
->rq_oi
.oi_cb_up
= cb_getattr_update
;
888 req
->rq_oi
.oi_capa
= oinfo
->oi_capa
;
890 lov_set_add_req(req
, set
);
893 GOTO(out_set
, rc
= -EIO
);
897 lov_fini_getattr_set(set
);
901 int lov_fini_destroy_set(struct lov_request_set
*set
)
905 LASSERT(set
->set_exp
);
906 if (atomic_read(&set
->set_completes
)) {
907 /* FIXME update qos data here */
915 int lov_prep_destroy_set(struct obd_export
*exp
, struct obd_info
*oinfo
,
916 struct obdo
*src_oa
, struct lov_stripe_md
*lsm
,
917 struct obd_trans_info
*oti
,
918 struct lov_request_set
**reqset
)
920 struct lov_request_set
*set
;
921 struct lov_obd
*lov
= &exp
->exp_obd
->u
.lov
;
924 OBD_ALLOC(set
, sizeof(*set
));
931 set
->set_oi
->oi_md
= lsm
;
932 set
->set_oi
->oi_oa
= src_oa
;
934 if (oti
!= NULL
&& src_oa
->o_valid
& OBD_MD_FLCOOKIE
)
935 set
->set_cookies
= oti
->oti_logcookies
;
937 for (i
= 0; i
< lsm
->lsm_stripe_count
; i
++) {
938 struct lov_oinfo
*loi
;
939 struct lov_request
*req
;
941 loi
= lsm
->lsm_oinfo
[i
];
942 if (!lov_check_and_wait_active(lov
, loi
->loi_ost_idx
)) {
943 CDEBUG(D_HA
, "lov idx %d inactive\n", loi
->loi_ost_idx
);
947 OBD_ALLOC(req
, sizeof(*req
));
949 GOTO(out_set
, rc
= -ENOMEM
);
952 req
->rq_idx
= loi
->loi_ost_idx
;
954 OBDO_ALLOC(req
->rq_oi
.oi_oa
);
955 if (req
->rq_oi
.oi_oa
== NULL
) {
956 OBD_FREE(req
, sizeof(*req
));
957 GOTO(out_set
, rc
= -ENOMEM
);
959 memcpy(req
->rq_oi
.oi_oa
, src_oa
, sizeof(*req
->rq_oi
.oi_oa
));
960 req
->rq_oi
.oi_oa
->o_oi
= loi
->loi_oi
;
961 lov_set_add_req(req
, set
);
964 GOTO(out_set
, rc
= -EIO
);
968 lov_fini_destroy_set(set
);
972 int lov_fini_setattr_set(struct lov_request_set
*set
)
978 LASSERT(set
->set_exp
);
979 if (atomic_read(&set
->set_completes
)) {
980 rc
= common_attr_done(set
);
981 /* FIXME update qos data here */
988 int lov_update_setattr_set(struct lov_request_set
*set
,
989 struct lov_request
*req
, int rc
)
991 struct lov_obd
*lov
= &req
->rq_rqset
->set_exp
->exp_obd
->u
.lov
;
992 struct lov_stripe_md
*lsm
= req
->rq_rqset
->set_oi
->oi_md
;
994 lov_update_set(set
, req
, rc
);
996 /* grace error on inactive ost */
997 if (rc
&& !(lov
->lov_tgts
[req
->rq_idx
] &&
998 lov
->lov_tgts
[req
->rq_idx
]->ltd_active
))
1002 if (req
->rq_oi
.oi_oa
->o_valid
& OBD_MD_FLCTIME
)
1003 lsm
->lsm_oinfo
[req
->rq_stripe
]->loi_lvb
.lvb_ctime
=
1004 req
->rq_oi
.oi_oa
->o_ctime
;
1005 if (req
->rq_oi
.oi_oa
->o_valid
& OBD_MD_FLMTIME
)
1006 lsm
->lsm_oinfo
[req
->rq_stripe
]->loi_lvb
.lvb_mtime
=
1007 req
->rq_oi
.oi_oa
->o_mtime
;
1008 if (req
->rq_oi
.oi_oa
->o_valid
& OBD_MD_FLATIME
)
1009 lsm
->lsm_oinfo
[req
->rq_stripe
]->loi_lvb
.lvb_atime
=
1010 req
->rq_oi
.oi_oa
->o_atime
;
1016 /* The callback for osc_setattr_async that finalizes a request info when a
1017 * response is received. */
1018 static int cb_setattr_update(void *cookie
, int rc
)
1020 struct obd_info
*oinfo
= cookie
;
1021 struct lov_request
*lovreq
;
1023 lovreq
= container_of(oinfo
, struct lov_request
, rq_oi
);
1024 return lov_update_setattr_set(lovreq
->rq_rqset
, lovreq
, rc
);
1027 int lov_prep_setattr_set(struct obd_export
*exp
, struct obd_info
*oinfo
,
1028 struct obd_trans_info
*oti
,
1029 struct lov_request_set
**reqset
)
1031 struct lov_request_set
*set
;
1032 struct lov_obd
*lov
= &exp
->exp_obd
->u
.lov
;
1035 OBD_ALLOC(set
, sizeof(*set
));
1042 set
->set_oi
= oinfo
;
1043 if (oti
!= NULL
&& oinfo
->oi_oa
->o_valid
& OBD_MD_FLCOOKIE
)
1044 set
->set_cookies
= oti
->oti_logcookies
;
1046 for (i
= 0; i
< oinfo
->oi_md
->lsm_stripe_count
; i
++) {
1047 struct lov_oinfo
*loi
= oinfo
->oi_md
->lsm_oinfo
[i
];
1048 struct lov_request
*req
;
1050 if (!lov_check_and_wait_active(lov
, loi
->loi_ost_idx
)) {
1051 CDEBUG(D_HA
, "lov idx %d inactive\n", loi
->loi_ost_idx
);
1055 OBD_ALLOC(req
, sizeof(*req
));
1057 GOTO(out_set
, rc
= -ENOMEM
);
1059 req
->rq_idx
= loi
->loi_ost_idx
;
1061 OBDO_ALLOC(req
->rq_oi
.oi_oa
);
1062 if (req
->rq_oi
.oi_oa
== NULL
) {
1063 OBD_FREE(req
, sizeof(*req
));
1064 GOTO(out_set
, rc
= -ENOMEM
);
1066 memcpy(req
->rq_oi
.oi_oa
, oinfo
->oi_oa
,
1067 sizeof(*req
->rq_oi
.oi_oa
));
1068 req
->rq_oi
.oi_oa
->o_oi
= loi
->loi_oi
;
1069 req
->rq_oi
.oi_oa
->o_stripe_idx
= i
;
1070 req
->rq_oi
.oi_cb_up
= cb_setattr_update
;
1071 req
->rq_oi
.oi_capa
= oinfo
->oi_capa
;
1073 if (oinfo
->oi_oa
->o_valid
& OBD_MD_FLSIZE
) {
1074 int off
= lov_stripe_offset(oinfo
->oi_md
,
1075 oinfo
->oi_oa
->o_size
, i
,
1076 &req
->rq_oi
.oi_oa
->o_size
);
1078 if (off
< 0 && req
->rq_oi
.oi_oa
->o_size
)
1079 req
->rq_oi
.oi_oa
->o_size
--;
1081 CDEBUG(D_INODE
, "stripe %d has size %llu/%llu\n",
1082 i
, req
->rq_oi
.oi_oa
->o_size
,
1083 oinfo
->oi_oa
->o_size
);
1085 lov_set_add_req(req
, set
);
1087 if (!set
->set_count
)
1088 GOTO(out_set
, rc
= -EIO
);
1092 lov_fini_setattr_set(set
);
1096 int lov_fini_punch_set(struct lov_request_set
*set
)
1102 LASSERT(set
->set_exp
);
1103 if (atomic_read(&set
->set_completes
)) {
1105 /* FIXME update qos data here */
1106 if (atomic_read(&set
->set_success
))
1107 rc
= common_attr_done(set
);
1110 lov_put_reqset(set
);
1115 int lov_update_punch_set(struct lov_request_set
*set
,
1116 struct lov_request
*req
, int rc
)
1118 struct lov_obd
*lov
= &req
->rq_rqset
->set_exp
->exp_obd
->u
.lov
;
1119 struct lov_stripe_md
*lsm
= req
->rq_rqset
->set_oi
->oi_md
;
1121 lov_update_set(set
, req
, rc
);
1123 /* grace error on inactive ost */
1124 if (rc
&& !lov
->lov_tgts
[req
->rq_idx
]->ltd_active
)
1128 lov_stripe_lock(lsm
);
1129 if (req
->rq_oi
.oi_oa
->o_valid
& OBD_MD_FLBLOCKS
) {
1130 lsm
->lsm_oinfo
[req
->rq_stripe
]->loi_lvb
.lvb_blocks
=
1131 req
->rq_oi
.oi_oa
->o_blocks
;
1134 lov_stripe_unlock(lsm
);
1140 /* The callback for osc_punch that finalizes a request info when a response
1142 static int cb_update_punch(void *cookie
, int rc
)
1144 struct obd_info
*oinfo
= cookie
;
1145 struct lov_request
*lovreq
;
1147 lovreq
= container_of(oinfo
, struct lov_request
, rq_oi
);
1148 return lov_update_punch_set(lovreq
->rq_rqset
, lovreq
, rc
);
1151 int lov_prep_punch_set(struct obd_export
*exp
, struct obd_info
*oinfo
,
1152 struct obd_trans_info
*oti
,
1153 struct lov_request_set
**reqset
)
1155 struct lov_request_set
*set
;
1156 struct lov_obd
*lov
= &exp
->exp_obd
->u
.lov
;
1159 OBD_ALLOC(set
, sizeof(*set
));
1164 set
->set_oi
= oinfo
;
1167 for (i
= 0; i
< oinfo
->oi_md
->lsm_stripe_count
; i
++) {
1168 struct lov_oinfo
*loi
= oinfo
->oi_md
->lsm_oinfo
[i
];
1169 struct lov_request
*req
;
1172 if (!lov_stripe_intersects(oinfo
->oi_md
, i
,
1173 oinfo
->oi_policy
.l_extent
.start
,
1174 oinfo
->oi_policy
.l_extent
.end
,
1178 if (!lov_check_and_wait_active(lov
, loi
->loi_ost_idx
)) {
1179 CDEBUG(D_HA
, "lov idx %d inactive\n", loi
->loi_ost_idx
);
1180 GOTO(out_set
, rc
= -EIO
);
1183 OBD_ALLOC(req
, sizeof(*req
));
1185 GOTO(out_set
, rc
= -ENOMEM
);
1187 req
->rq_idx
= loi
->loi_ost_idx
;
1189 OBDO_ALLOC(req
->rq_oi
.oi_oa
);
1190 if (req
->rq_oi
.oi_oa
== NULL
) {
1191 OBD_FREE(req
, sizeof(*req
));
1192 GOTO(out_set
, rc
= -ENOMEM
);
1194 memcpy(req
->rq_oi
.oi_oa
, oinfo
->oi_oa
,
1195 sizeof(*req
->rq_oi
.oi_oa
));
1196 req
->rq_oi
.oi_oa
->o_oi
= loi
->loi_oi
;
1197 req
->rq_oi
.oi_oa
->o_valid
|= OBD_MD_FLGROUP
;
1199 req
->rq_oi
.oi_oa
->o_stripe_idx
= i
;
1200 req
->rq_oi
.oi_cb_up
= cb_update_punch
;
1202 req
->rq_oi
.oi_policy
.l_extent
.start
= rs
;
1203 req
->rq_oi
.oi_policy
.l_extent
.end
= re
;
1204 req
->rq_oi
.oi_policy
.l_extent
.gid
= -1;
1206 req
->rq_oi
.oi_capa
= oinfo
->oi_capa
;
1208 lov_set_add_req(req
, set
);
1210 if (!set
->set_count
)
1211 GOTO(out_set
, rc
= -EIO
);
1215 lov_fini_punch_set(set
);
1219 int lov_fini_sync_set(struct lov_request_set
*set
)
1225 LASSERT(set
->set_exp
);
1226 if (atomic_read(&set
->set_completes
)) {
1227 if (!atomic_read(&set
->set_success
))
1229 /* FIXME update qos data here */
1232 lov_put_reqset(set
);
1237 /* The callback for osc_sync that finalizes a request info when a
1238 * response is received. */
1239 static int cb_sync_update(void *cookie
, int rc
)
1241 struct obd_info
*oinfo
= cookie
;
1242 struct lov_request
*lovreq
;
1244 lovreq
= container_of(oinfo
, struct lov_request
, rq_oi
);
1245 return lov_update_common_set(lovreq
->rq_rqset
, lovreq
, rc
);
1248 int lov_prep_sync_set(struct obd_export
*exp
, struct obd_info
*oinfo
,
1250 struct lov_request_set
**reqset
)
1252 struct lov_request_set
*set
;
1253 struct lov_obd
*lov
= &exp
->exp_obd
->u
.lov
;
1262 set
->set_oi
= oinfo
;
1264 for (i
= 0; i
< oinfo
->oi_md
->lsm_stripe_count
; i
++) {
1265 struct lov_oinfo
*loi
= oinfo
->oi_md
->lsm_oinfo
[i
];
1266 struct lov_request
*req
;
1269 if (!lov_check_and_wait_active(lov
, loi
->loi_ost_idx
)) {
1270 CDEBUG(D_HA
, "lov idx %d inactive\n", loi
->loi_ost_idx
);
1274 if (!lov_stripe_intersects(oinfo
->oi_md
, i
, start
, end
, &rs
,
1280 GOTO(out_set
, rc
= -ENOMEM
);
1282 req
->rq_idx
= loi
->loi_ost_idx
;
1284 OBDO_ALLOC(req
->rq_oi
.oi_oa
);
1285 if (req
->rq_oi
.oi_oa
== NULL
) {
1286 OBD_FREE(req
, sizeof(*req
));
1287 GOTO(out_set
, rc
= -ENOMEM
);
1289 *req
->rq_oi
.oi_oa
= *oinfo
->oi_oa
;
1290 req
->rq_oi
.oi_oa
->o_oi
= loi
->loi_oi
;
1291 req
->rq_oi
.oi_oa
->o_stripe_idx
= i
;
1293 req
->rq_oi
.oi_policy
.l_extent
.start
= rs
;
1294 req
->rq_oi
.oi_policy
.l_extent
.end
= re
;
1295 req
->rq_oi
.oi_policy
.l_extent
.gid
= -1;
1296 req
->rq_oi
.oi_cb_up
= cb_sync_update
;
1298 lov_set_add_req(req
, set
);
1300 if (!set
->set_count
)
1301 GOTO(out_set
, rc
= -EIO
);
1305 lov_fini_sync_set(set
);
1309 #define LOV_U64_MAX ((__u64)~0ULL)
1310 #define LOV_SUM_MAX(tot, add) \
1312 if ((tot) + (add) < (tot)) \
1313 (tot) = LOV_U64_MAX; \
1318 int lov_fini_statfs(struct obd_device
*obd
, struct obd_statfs
*osfs
,int success
)
1321 __u32 expected_stripes
= lov_get_stripecnt(&obd
->u
.lov
,
1323 if (osfs
->os_files
!= LOV_U64_MAX
)
1324 lov_do_div64(osfs
->os_files
, expected_stripes
);
1325 if (osfs
->os_ffree
!= LOV_U64_MAX
)
1326 lov_do_div64(osfs
->os_ffree
, expected_stripes
);
1328 spin_lock(&obd
->obd_osfs_lock
);
1329 memcpy(&obd
->obd_osfs
, osfs
, sizeof(*osfs
));
1330 obd
->obd_osfs_age
= cfs_time_current_64();
1331 spin_unlock(&obd
->obd_osfs_lock
);
1338 int lov_fini_statfs_set(struct lov_request_set
*set
)
1345 if (atomic_read(&set
->set_completes
)) {
1346 rc
= lov_fini_statfs(set
->set_obd
, set
->set_oi
->oi_osfs
,
1347 atomic_read(&set
->set_success
));
1349 lov_put_reqset(set
);
1353 void lov_update_statfs(struct obd_statfs
*osfs
, struct obd_statfs
*lov_sfs
,
1356 int shift
= 0, quit
= 0;
1360 memcpy(osfs
, lov_sfs
, sizeof(*lov_sfs
));
1362 if (osfs
->os_bsize
!= lov_sfs
->os_bsize
) {
1363 /* assume all block sizes are always powers of 2 */
1364 /* get the bits difference */
1365 tmp
= osfs
->os_bsize
| lov_sfs
->os_bsize
;
1366 for (shift
= 0; shift
<= 64; ++shift
) {
1378 if (osfs
->os_bsize
< lov_sfs
->os_bsize
) {
1379 osfs
->os_bsize
= lov_sfs
->os_bsize
;
1381 osfs
->os_bfree
>>= shift
;
1382 osfs
->os_bavail
>>= shift
;
1383 osfs
->os_blocks
>>= shift
;
1384 } else if (shift
!= 0) {
1385 lov_sfs
->os_bfree
>>= shift
;
1386 lov_sfs
->os_bavail
>>= shift
;
1387 lov_sfs
->os_blocks
>>= shift
;
1389 osfs
->os_bfree
+= lov_sfs
->os_bfree
;
1390 osfs
->os_bavail
+= lov_sfs
->os_bavail
;
1391 osfs
->os_blocks
+= lov_sfs
->os_blocks
;
1392 /* XXX not sure about this one - depends on policy.
1393 * - could be minimum if we always stripe on all OBDs
1394 * (but that would be wrong for any other policy,
1395 * if one of the OBDs has no more objects left)
1396 * - could be sum if we stripe whole objects
1397 * - could be average, just to give a nice number
1399 * To give a "reasonable" (if not wholly accurate)
1400 * number, we divide the total number of free objects
1401 * by expected stripe count (watch out for overflow).
1403 LOV_SUM_MAX(osfs
->os_files
, lov_sfs
->os_files
);
1404 LOV_SUM_MAX(osfs
->os_ffree
, lov_sfs
->os_ffree
);
1408 /* The callback for osc_statfs_async that finalizes a request info when a
1409 * response is received. */
1410 static int cb_statfs_update(void *cookie
, int rc
)
1412 struct obd_info
*oinfo
= cookie
;
1413 struct lov_request
*lovreq
;
1414 struct lov_request_set
*set
;
1415 struct obd_statfs
*osfs
, *lov_sfs
;
1416 struct lov_obd
*lov
;
1417 struct lov_tgt_desc
*tgt
;
1418 struct obd_device
*lovobd
, *tgtobd
;
1421 lovreq
= container_of(oinfo
, struct lov_request
, rq_oi
);
1422 set
= lovreq
->rq_rqset
;
1423 lovobd
= set
->set_obd
;
1424 lov
= &lovobd
->u
.lov
;
1425 osfs
= set
->set_oi
->oi_osfs
;
1426 lov_sfs
= oinfo
->oi_osfs
;
1427 success
= atomic_read(&set
->set_success
);
1428 /* XXX: the same is done in lov_update_common_set, however
1429 lovset->set_exp is not initialized. */
1430 lov_update_set(set
, lovreq
, rc
);
1435 tgt
= lov
->lov_tgts
[lovreq
->rq_idx
];
1436 if (!tgt
|| !tgt
->ltd_active
)
1437 GOTO(out_update
, rc
);
1439 tgtobd
= class_exp2obd(tgt
->ltd_exp
);
1440 spin_lock(&tgtobd
->obd_osfs_lock
);
1441 memcpy(&tgtobd
->obd_osfs
, lov_sfs
, sizeof(*lov_sfs
));
1442 if ((oinfo
->oi_flags
& OBD_STATFS_FROM_CACHE
) == 0)
1443 tgtobd
->obd_osfs_age
= cfs_time_current_64();
1444 spin_unlock(&tgtobd
->obd_osfs_lock
);
1447 lov_update_statfs(osfs
, lov_sfs
, success
);
1451 if (set
->set_oi
->oi_flags
& OBD_STATFS_PTLRPCD
&&
1452 lov_set_finished(set
, 0)) {
1453 lov_statfs_interpret(NULL
, set
, set
->set_count
!=
1454 atomic_read(&set
->set_success
));
1460 int lov_prep_statfs_set(struct obd_device
*obd
, struct obd_info
*oinfo
,
1461 struct lov_request_set
**reqset
)
1463 struct lov_request_set
*set
;
1464 struct lov_obd
*lov
= &obd
->u
.lov
;
1467 OBD_ALLOC(set
, sizeof(*set
));
1473 set
->set_oi
= oinfo
;
1475 /* We only get block data from the OBD */
1476 for (i
= 0; i
< lov
->desc
.ld_tgt_count
; i
++) {
1477 struct lov_request
*req
;
1479 if (lov
->lov_tgts
[i
] == NULL
||
1480 (!lov_check_and_wait_active(lov
, i
) &&
1481 (oinfo
->oi_flags
& OBD_STATFS_NODELAY
))) {
1482 CDEBUG(D_HA
, "lov idx %d inactive\n", i
);
1486 /* skip targets that have been explicitly disabled by the
1488 if (!lov
->lov_tgts
[i
]->ltd_exp
) {
1489 CDEBUG(D_HA
, "lov idx %d administratively disabled\n", i
);
1493 OBD_ALLOC(req
, sizeof(*req
));
1495 GOTO(out_set
, rc
= -ENOMEM
);
1497 OBD_ALLOC(req
->rq_oi
.oi_osfs
, sizeof(*req
->rq_oi
.oi_osfs
));
1498 if (req
->rq_oi
.oi_osfs
== NULL
) {
1499 OBD_FREE(req
, sizeof(*req
));
1500 GOTO(out_set
, rc
= -ENOMEM
);
1504 req
->rq_oi
.oi_cb_up
= cb_statfs_update
;
1505 req
->rq_oi
.oi_flags
= oinfo
->oi_flags
;
1507 lov_set_add_req(req
, set
);
1509 if (!set
->set_count
)
1510 GOTO(out_set
, rc
= -EIO
);
1514 lov_fini_statfs_set(set
);