/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2015, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * Implementation of cl_object for LOV layer.
 *
 * Author: Nikita Danilov <nikita.danilov@sun.com>
 * Author: Jinshan Xiong <jinshan.xiong@whamcloud.com>
 */

#define DEBUG_SUBSYSTEM S_LOV

#include "lov_cl_internal.h"

static inline struct lov_device *lov_object_dev(struct lov_object *obj)
{
	return lu2lov_dev(obj->lo_cl.co_lu.lo_dev);
}

/** \addtogroup lov
 * @{
 */

/*****************************************************************************
 *
 * Layout operations.
 *
 */

struct lov_layout_operations {
	int (*llo_init)(const struct lu_env *env, struct lov_device *dev,
			struct lov_object *lov, struct lov_stripe_md *lsm,
			const struct cl_object_conf *conf,
			union lov_layout_state *state);
	int (*llo_delete)(const struct lu_env *env, struct lov_object *lov,
			  union lov_layout_state *state);
	void (*llo_fini)(const struct lu_env *env, struct lov_object *lov,
			 union lov_layout_state *state);
	void (*llo_install)(const struct lu_env *env, struct lov_object *lov,
			    union lov_layout_state *state);
	int (*llo_print)(const struct lu_env *env, void *cookie,
			 lu_printer_t p, const struct lu_object *o);
	int (*llo_page_init)(const struct lu_env *env, struct cl_object *obj,
			     struct cl_page *page, pgoff_t index);
	int (*llo_lock_init)(const struct lu_env *env,
			     struct cl_object *obj, struct cl_lock *lock,
			     const struct cl_io *io);
	int (*llo_io_init)(const struct lu_env *env,
			   struct cl_object *obj, struct cl_io *io);
	int (*llo_getattr)(const struct lu_env *env, struct cl_object *obj,
			   struct cl_attr *attr);
};

static int lov_layout_wait(const struct lu_env *env, struct lov_object *lov);

static void lov_lsm_put(struct lov_stripe_md *lsm)
{
	if (lsm)
		lov_free_memmd(&lsm);
}

/*****************************************************************************
 *
 * Lov object layout operations.
 *
 */

static void lov_install_empty(const struct lu_env *env,
			      struct lov_object *lov,
			      union lov_layout_state *state)
{
	/*
	 * File without objects.
	 */
}

static int lov_init_empty(const struct lu_env *env, struct lov_device *dev,
			  struct lov_object *lov, struct lov_stripe_md *lsm,
			  const struct cl_object_conf *conf,
			  union lov_layout_state *state)
{
	return 0;
}

static void lov_install_raid0(const struct lu_env *env,
			      struct lov_object *lov,
			      union lov_layout_state *state)
{
}

static struct cl_object *lov_sub_find(const struct lu_env *env,
				      struct cl_device *dev,
				      const struct lu_fid *fid,
				      const struct cl_object_conf *conf)
{
	struct lu_object *o;

	o = lu_object_find_at(env, cl2lu_dev(dev), fid, &conf->coc_lu);
	LASSERT(ergo(!IS_ERR(o), o->lo_dev->ld_type == &lovsub_device_type));
	return lu2cl(o);
}

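/*
 * Bind one freshly looked-up stripe object to its parent LOV object: record
 * the parent in the stripe's cl_object_header and fill r0->lo_sub[idx].
 * If the stripe already belongs to another (now invalid) LOV object the
 * function returns -EAGAIN so the caller can retry, and -EIO if the other
 * owner is still valid.
 */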
static int lov_init_sub(const struct lu_env *env, struct lov_object *lov,
			struct cl_object *stripe, struct lov_layout_raid0 *r0,
			int idx)
{
	struct cl_object_header *hdr;
	struct cl_object_header *subhdr;
	struct cl_object_header *parent;
	struct lov_oinfo *oinfo;
	int result;

	if (OBD_FAIL_CHECK(OBD_FAIL_LOV_INIT)) {
		/* For sanity:test_206.
		 * Do not leave the object in cache to avoid accessing
		 * freed memory. This is because osc_object is referring to
		 * lov_oinfo of lsm_stripe_data which will be freed due to
		 * this failure.
		 */
		cl_object_kill(env, stripe);
		cl_object_put(env, stripe);
		return -EIO;
	}

	hdr = cl_object_header(lov2cl(lov));
	subhdr = cl_object_header(stripe);

	oinfo = lov->lo_lsm->lsm_oinfo[idx];
	CDEBUG(D_INODE, DFID"@%p[%d] -> "DFID"@%p: ostid: "DOSTID
	       " idx: %d gen: %d\n",
	       PFID(&subhdr->coh_lu.loh_fid), subhdr, idx,
	       PFID(&hdr->coh_lu.loh_fid), hdr, POSTID(&oinfo->loi_oi),
	       oinfo->loi_ost_idx, oinfo->loi_ost_gen);

	/* reuse ->coh_attr_guard to protect coh_parent change */
	spin_lock(&subhdr->coh_attr_guard);
	parent = subhdr->coh_parent;
	if (!parent) {
		subhdr->coh_parent = hdr;
		spin_unlock(&subhdr->coh_attr_guard);
		subhdr->coh_nesting = hdr->coh_nesting + 1;
		lu_object_ref_add(&stripe->co_lu, "lov-parent", lov);
		r0->lo_sub[idx] = cl2lovsub(stripe);
		r0->lo_sub[idx]->lso_super = lov;
		r0->lo_sub[idx]->lso_index = idx;
		result = 0;
	} else {
		struct lu_object *old_obj;
		struct lov_object *old_lov;
		unsigned int mask = D_INODE;

		spin_unlock(&subhdr->coh_attr_guard);
		old_obj = lu_object_locate(&parent->coh_lu, &lov_device_type);
		LASSERT(old_obj);
		old_lov = cl2lov(lu2cl(old_obj));
		if (old_lov->lo_layout_invalid) {
			/* the object's layout has already changed but isn't
			 * refreshed
			 */
			lu_object_unhash(env, &stripe->co_lu);
			result = -EAGAIN;
		} else {
			mask = D_ERROR;
			result = -EIO;
		}

		LU_OBJECT_DEBUG(mask, env, &stripe->co_lu,
				"stripe %d is already owned.", idx);
		LU_OBJECT_DEBUG(mask, env, old_obj, "owned.");
		LU_OBJECT_HEADER(mask, env, lov2lu(lov), "try to own.\n");
		cl_object_put(env, stripe);
	}
	return result;
}

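/*
 * Account for the page buffer space used by one stripe: shift the slice
 * offsets of the stripe's layers past the current top-object buffer and
 * return the size the stripe's stack contributes. For a NULL stripe the
 * space currently reserved beyond the lov_page slice is returned instead.
 */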
static int lov_page_slice_fixup(struct lov_object *lov,
				struct cl_object *stripe)
{
	struct cl_object_header *hdr = cl_object_header(&lov->lo_cl);
	struct cl_object *o;

	if (!stripe)
		return hdr->coh_page_bufsize - lov->lo_cl.co_slice_off -
		       cfs_size_round(sizeof(struct lov_page));

	cl_object_for_each(o, stripe)
		o->co_slice_off += hdr->coh_page_bufsize;

	return cl_object_header(stripe)->coh_page_bufsize;
}

static int lov_init_raid0(const struct lu_env *env, struct lov_device *dev,
			  struct lov_object *lov, struct lov_stripe_md *lsm,
			  const struct cl_object_conf *conf,
			  union lov_layout_state *state)
{
	int result;
	int i;

	struct cl_object *stripe;
	struct lov_thread_info *lti = lov_env_info(env);
	struct cl_object_conf *subconf = &lti->lti_stripe_conf;
	struct lu_fid *ofid = &lti->lti_fid;
	struct lov_layout_raid0 *r0 = &state->raid0;

	if (lsm->lsm_magic != LOV_MAGIC_V1 && lsm->lsm_magic != LOV_MAGIC_V3) {
		dump_lsm(D_ERROR, lsm);
		LASSERTF(0, "magic mismatch, expected %d/%d, actual %d.\n",
			 LOV_MAGIC_V1, LOV_MAGIC_V3, lsm->lsm_magic);
	}

	LASSERT(!lov->lo_lsm);
	lov->lo_lsm = lsm_addref(lsm);
	lov->lo_layout_invalid = true;
	r0->lo_nr = lsm->lsm_stripe_count;
	LASSERT(r0->lo_nr <= lov_targets_nr(dev));

	r0->lo_sub = libcfs_kvzalloc(r0->lo_nr * sizeof(r0->lo_sub[0]),
				     GFP_NOFS);
	if (r0->lo_sub) {
		int psz = 0;

		result = 0;
		subconf->coc_inode = conf->coc_inode;
		spin_lock_init(&r0->lo_sub_lock);
		/*
		 * Create stripe cl_objects.
		 */
		for (i = 0; i < r0->lo_nr && result == 0; ++i) {
			struct cl_device *subdev;
			struct lov_oinfo *oinfo = lsm->lsm_oinfo[i];
			int ost_idx = oinfo->loi_ost_idx;

			if (lov_oinfo_is_dummy(oinfo))
				continue;

			result = ostid_to_fid(ofid, &oinfo->loi_oi,
					      oinfo->loi_ost_idx);
			if (result != 0)
				goto out;

			if (!dev->ld_target[ost_idx]) {
				CERROR("%s: OST %04x is not initialized\n",
				       lov2obd(dev->ld_lov)->obd_name, ost_idx);
				result = -EIO;
				goto out;
			}

			subdev = lovsub2cl_dev(dev->ld_target[ost_idx]);
			subconf->u.coc_oinfo = oinfo;
			LASSERTF(subdev, "not init ost %d\n", ost_idx);
			/* In the function below, .hs_keycmp resolves to
			 * lu_obj_hop_keycmp()
			 */
			/* coverity[overrun-buffer-val] */
			stripe = lov_sub_find(env, subdev, ofid, subconf);
			if (!IS_ERR(stripe)) {
				result = lov_init_sub(env, lov, stripe, r0, i);
				if (result == -EAGAIN) { /* try again */
					--i;
					result = 0;
					continue;
				}
			} else {
				result = PTR_ERR(stripe);
			}

			if (result == 0) {
				int sz = lov_page_slice_fixup(lov, stripe);

				LASSERT(ergo(psz > 0, psz == sz));
				psz = sz;
			}
		}
		if (result == 0)
			cl_object_header(&lov->lo_cl)->coh_page_bufsize += psz;
	} else {
		result = -ENOMEM;
	}
out:
	return result;
}

static int lov_init_released(const struct lu_env *env, struct lov_device *dev,
			     struct lov_object *lov, struct lov_stripe_md *lsm,
			     const struct cl_object_conf *conf,
			     union lov_layout_state *state)
{
	LASSERT(lsm);
	LASSERT(lsm_is_released(lsm));
	LASSERT(!lov->lo_lsm);

	lov->lo_lsm = lsm_addref(lsm);
	return 0;
}

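/*
 * Look up the cl_object backing a single stripe of a RAID0 file by stripe
 * index. Returns ERR_PTR(-EINVAL) if the layout is not RAID0 or the stripe
 * FID cannot be constructed.
 */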
static struct cl_object *lov_find_subobj(const struct lu_env *env,
					 struct lov_object *lov,
					 struct lov_stripe_md *lsm,
					 int stripe_idx)
{
	struct lov_device *dev = lu2lov_dev(lov2lu(lov)->lo_dev);
	struct lov_oinfo *oinfo = lsm->lsm_oinfo[stripe_idx];
	struct lov_thread_info *lti = lov_env_info(env);
	struct lu_fid *ofid = &lti->lti_fid;
	struct cl_device *subdev;
	struct cl_object *result;
	int ost_idx;
	int rc;

	if (lov->lo_type != LLT_RAID0) {
		result = NULL;
		goto out;
	}

	ost_idx = oinfo->loi_ost_idx;
	rc = ostid_to_fid(ofid, &oinfo->loi_oi, ost_idx);
	if (rc) {
		result = NULL;
		goto out;
	}

	subdev = lovsub2cl_dev(dev->ld_target[ost_idx]);
	result = lov_sub_find(env, subdev, ofid, NULL);
out:
	if (!result)
		result = ERR_PTR(-EINVAL);
	return result;
}

static int lov_delete_empty(const struct lu_env *env, struct lov_object *lov,
			    union lov_layout_state *state)
{
	LASSERT(lov->lo_type == LLT_EMPTY || lov->lo_type == LLT_RELEASED);

	lov_layout_wait(env, lov);
	return 0;
}

static void lov_subobject_kill(const struct lu_env *env, struct lov_object *lov,
			       struct lovsub_object *los, int idx)
{
	struct cl_object *sub;
	struct lov_layout_raid0 *r0;
	struct lu_site *site;
	struct lu_site_bkt_data *bkt;
	wait_queue_entry_t *waiter;

	r0 = &lov->u.raid0;
	LASSERT(r0->lo_sub[idx] == los);

	sub = lovsub2cl(los);
	site = sub->co_lu.lo_dev->ld_site;
	bkt = lu_site_bkt_from_fid(site, &sub->co_lu.lo_header->loh_fid);

	cl_object_kill(env, sub);
	/* release a reference to the sub-object and ... */
	lu_object_ref_del(&sub->co_lu, "lov-parent", lov);
	cl_object_put(env, sub);

	/* ... wait until it is actually destroyed---sub-object clears its
	 * ->lo_sub[] slot in lovsub_object_fini()
	 */
	if (r0->lo_sub[idx] == los) {
		waiter = &lov_env_info(env)->lti_waiter;
		init_waitqueue_entry(waiter, current);
		add_wait_queue(&bkt->lsb_marche_funebre, waiter);
		set_current_state(TASK_UNINTERRUPTIBLE);
		while (1) {
			/* this wait-queue is signaled at the end of
			 * lu_object_free().
			 */
			set_current_state(TASK_UNINTERRUPTIBLE);
			spin_lock(&r0->lo_sub_lock);
			if (r0->lo_sub[idx] == los) {
				spin_unlock(&r0->lo_sub_lock);
				schedule();
			} else {
				spin_unlock(&r0->lo_sub_lock);
				set_current_state(TASK_RUNNING);
				break;
			}
		}
		remove_wait_queue(&bkt->lsb_marche_funebre, waiter);
	}
	LASSERT(!r0->lo_sub[idx]);
}

static int lov_delete_raid0(const struct lu_env *env, struct lov_object *lov,
			    union lov_layout_state *state)
{
	struct lov_layout_raid0 *r0 = &state->raid0;
	struct lov_stripe_md *lsm = lov->lo_lsm;
	int i;

	dump_lsm(D_INODE, lsm);

	lov_layout_wait(env, lov);
	if (r0->lo_sub) {
		for (i = 0; i < r0->lo_nr; ++i) {
			struct lovsub_object *los = r0->lo_sub[i];

			if (los) {
				cl_object_prune(env, &los->lso_cl);
				/*
				 * If top-level object is to be evicted from
				 * the cache, so are its sub-objects.
				 */
				lov_subobject_kill(env, lov, los, i);
			}
		}
	}
	return 0;
}

static void lov_fini_empty(const struct lu_env *env, struct lov_object *lov,
			   union lov_layout_state *state)
{
	LASSERT(lov->lo_type == LLT_EMPTY || lov->lo_type == LLT_RELEASED);
}

static void lov_fini_raid0(const struct lu_env *env, struct lov_object *lov,
			   union lov_layout_state *state)
{
	struct lov_layout_raid0 *r0 = &state->raid0;

	if (r0->lo_sub) {
		kvfree(r0->lo_sub);
		r0->lo_sub = NULL;
	}

	dump_lsm(D_INODE, lov->lo_lsm);
	lov_free_memmd(&lov->lo_lsm);
}

static void lov_fini_released(const struct lu_env *env, struct lov_object *lov,
			      union lov_layout_state *state)
{
	dump_lsm(D_INODE, lov->lo_lsm);
	lov_free_memmd(&lov->lo_lsm);
}

static int lov_print_empty(const struct lu_env *env, void *cookie,
			   lu_printer_t p, const struct lu_object *o)
{
	(*p)(env, cookie, "empty %d\n", lu2lov(o)->lo_layout_invalid);
	return 0;
}

static int lov_print_raid0(const struct lu_env *env, void *cookie,
			   lu_printer_t p, const struct lu_object *o)
{
	struct lov_object *lov = lu2lov(o);
	struct lov_layout_raid0 *r0 = lov_r0(lov);
	struct lov_stripe_md *lsm = lov->lo_lsm;
	int i;

	(*p)(env, cookie, "stripes: %d, %s, lsm{%p 0x%08X %d %u %u}:\n",
	     r0->lo_nr, lov->lo_layout_invalid ? "invalid" : "valid", lsm,
	     lsm->lsm_magic, atomic_read(&lsm->lsm_refc),
	     lsm->lsm_stripe_count, lsm->lsm_layout_gen);
	for (i = 0; i < r0->lo_nr; ++i) {
		struct lu_object *sub;

		if (r0->lo_sub[i]) {
			sub = lovsub2lu(r0->lo_sub[i]);
			lu_object_print(env, cookie, p, sub);
		} else {
			(*p)(env, cookie, "sub %d absent\n", i);
		}
	}
	return 0;
}

static int lov_print_released(const struct lu_env *env, void *cookie,
			      lu_printer_t p, const struct lu_object *o)
{
	struct lov_object *lov = lu2lov(o);
	struct lov_stripe_md *lsm = lov->lo_lsm;

	(*p)(env, cookie,
	     "released: %s, lsm{%p 0x%08X %d %u %u}:\n",
	     lov->lo_layout_invalid ? "invalid" : "valid", lsm,
	     lsm->lsm_magic, atomic_read(&lsm->lsm_refc),
	     lsm->lsm_stripe_count, lsm->lsm_layout_gen);
	return 0;
}

/**
 * Implements cl_object_operations::coo_attr_get() method for an object
 * without stripes (LLT_EMPTY layout type).
 *
 * The only attribute this layer is authoritative for in this case is
 * cl_attr::cat_blocks---it's 0.
 */
static int lov_attr_get_empty(const struct lu_env *env, struct cl_object *obj,
			      struct cl_attr *attr)
{
	attr->cat_blocks = 0;
	return 0;
}

static int lov_attr_get_raid0(const struct lu_env *env, struct cl_object *obj,
			      struct cl_attr *attr)
{
	struct lov_object *lov = cl2lov(obj);
	struct lov_layout_raid0 *r0 = lov_r0(lov);
	struct cl_attr *lov_attr = &r0->lo_attr;
	int result = 0;

	/* this is called w/o holding type guard mutex, so it must be inside
	 * an ongoing IO, otherwise lsm may be replaced.
	 * LU-2117: it turns out there exists one exception. For mmaped files,
	 * the lock of those files may be requested in the other file's IO
	 * context, and this function is called in ccc_lock_state(), it will
	 * hit this assertion.
	 * Anyway, it's still okay to call attr_get w/o type guard as layout
	 * can't go if locks exist.
	 */
	/* LASSERT(atomic_read(&lsm->lsm_refc) > 1); */

	if (!r0->lo_attr_valid) {
		struct lov_stripe_md *lsm = lov->lo_lsm;
		struct ost_lvb *lvb = &lov_env_info(env)->lti_lvb;
		__u64 kms = 0;

		memset(lvb, 0, sizeof(*lvb));
		/* XXX: timestamps can be negative by sanity:test_39m,
		 * how can it be?
		 */
		lvb->lvb_atime = LLONG_MIN;
		lvb->lvb_ctime = LLONG_MIN;
		lvb->lvb_mtime = LLONG_MIN;

		/*
		 * XXX that should be replaced with a loop over sub-objects,
		 * doing cl_object_attr_get() on them. But for now, let's
		 * reuse old lov code.
		 */

		/*
		 * XXX take lsm spin-lock to keep lov_merge_lvb_kms()
		 * happy. It's not needed, because new code uses
		 * ->coh_attr_guard spin-lock to protect consistency of
		 * sub-object attributes.
		 */
		lov_stripe_lock(lsm);
		result = lov_merge_lvb_kms(lsm, lvb, &kms);
		lov_stripe_unlock(lsm);
		if (result == 0) {
			cl_lvb2attr(lov_attr, lvb);
			lov_attr->cat_kms = kms;
			r0->lo_attr_valid = 1;
		}
	}
	if (result == 0) { /* merge results */
		attr->cat_blocks = lov_attr->cat_blocks;
		attr->cat_size = lov_attr->cat_size;
		attr->cat_kms = lov_attr->cat_kms;
		if (attr->cat_atime < lov_attr->cat_atime)
			attr->cat_atime = lov_attr->cat_atime;
		if (attr->cat_ctime < lov_attr->cat_ctime)
			attr->cat_ctime = lov_attr->cat_ctime;
		if (attr->cat_mtime < lov_attr->cat_mtime)
			attr->cat_mtime = lov_attr->cat_mtime;
	}
	return result;
}

static const struct lov_layout_operations lov_dispatch[] = {
	[LLT_EMPTY] = {
		.llo_init      = lov_init_empty,
		.llo_delete    = lov_delete_empty,
		.llo_fini      = lov_fini_empty,
		.llo_install   = lov_install_empty,
		.llo_print     = lov_print_empty,
		.llo_page_init = lov_page_init_empty,
		.llo_lock_init = lov_lock_init_empty,
		.llo_io_init   = lov_io_init_empty,
		.llo_getattr   = lov_attr_get_empty
	},
	[LLT_RAID0] = {
		.llo_init      = lov_init_raid0,
		.llo_delete    = lov_delete_raid0,
		.llo_fini      = lov_fini_raid0,
		.llo_install   = lov_install_raid0,
		.llo_print     = lov_print_raid0,
		.llo_page_init = lov_page_init_raid0,
		.llo_lock_init = lov_lock_init_raid0,
		.llo_io_init   = lov_io_init_raid0,
		.llo_getattr   = lov_attr_get_raid0
	},
	[LLT_RELEASED] = {
		.llo_init      = lov_init_released,
		.llo_delete    = lov_delete_empty,
		.llo_fini      = lov_fini_released,
		.llo_install   = lov_install_empty,
		.llo_print     = lov_print_released,
		.llo_page_init = lov_page_init_empty,
		.llo_lock_init = lov_lock_init_empty,
		.llo_io_init   = lov_io_init_released,
		.llo_getattr   = lov_attr_get_empty
	}
};

/**
 * Performs a double-dispatch based on the layout type of an object.
 */
#define LOV_2DISPATCH_NOLOCK(obj, op, ...)				\
({									\
	struct lov_object *__obj = (obj);				\
	enum lov_layout_type __llt;					\
									\
	__llt = __obj->lo_type;						\
	LASSERT(0 <= __llt && __llt < ARRAY_SIZE(lov_dispatch));	\
	lov_dispatch[__llt].op(__VA_ARGS__);				\
})

/**
 * Return lov_layout_type associated with a given lsm
 */
static enum lov_layout_type lov_type(struct lov_stripe_md *lsm)
{
	if (!lsm)
		return LLT_EMPTY;
	if (lsm_is_released(lsm))
		return LLT_RELEASED;
	return LLT_RAID0;
}

static inline void lov_conf_freeze(struct lov_object *lov)
{
	CDEBUG(D_INODE, "To take share lov(%p) owner %p/%p\n",
	       lov, lov->lo_owner, current);
	if (lov->lo_owner != current)
		down_read(&lov->lo_type_guard);
}

static inline void lov_conf_thaw(struct lov_object *lov)
{
	CDEBUG(D_INODE, "To release share lov(%p) owner %p/%p\n",
	       lov, lov->lo_owner, current);
	if (lov->lo_owner != current)
		up_read(&lov->lo_type_guard);
}

#define LOV_2DISPATCH_MAYLOCK(obj, op, lock, ...)			\
({									\
	struct lov_object *__obj = (obj);				\
	int __lock = !!(lock);						\
	typeof(lov_dispatch[0].op(__VA_ARGS__)) __result;		\
									\
	if (__lock)							\
		lov_conf_freeze(__obj);					\
	__result = LOV_2DISPATCH_NOLOCK(obj, op, __VA_ARGS__);		\
	if (__lock)							\
		lov_conf_thaw(__obj);					\
	__result;							\
})

/**
 * Performs a locked double-dispatch based on the layout type of an object.
 */
#define LOV_2DISPATCH(obj, op, ...)					\
	LOV_2DISPATCH_MAYLOCK(obj, op, 1, __VA_ARGS__)

#define LOV_2DISPATCH_VOID(obj, op, ...)				\
do {									\
	struct lov_object *__obj = (obj);				\
	enum lov_layout_type __llt;					\
									\
	lov_conf_freeze(__obj);						\
	__llt = __obj->lo_type;						\
	LASSERT(0 <= __llt && __llt < ARRAY_SIZE(lov_dispatch));	\
	lov_dispatch[__llt].op(__VA_ARGS__);				\
	lov_conf_thaw(__obj);						\
} while (0)

static void lov_conf_lock(struct lov_object *lov)
{
	LASSERT(lov->lo_owner != current);
	down_write(&lov->lo_type_guard);
	LASSERT(!lov->lo_owner);
	lov->lo_owner = current;
	CDEBUG(D_INODE, "Took exclusive lov(%p) owner %p\n",
	       lov, lov->lo_owner);
}

static void lov_conf_unlock(struct lov_object *lov)
{
	CDEBUG(D_INODE, "To release exclusive lov(%p) owner %p\n",
	       lov, lov->lo_owner);
	lov->lo_owner = NULL;
	up_write(&lov->lo_type_guard);
}

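/*
 * Wait until all IO against the object has drained, i.e. lo_active_ios
 * drops to zero; called before a layout is deleted or replaced.
 */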
static int lov_layout_wait(const struct lu_env *env, struct lov_object *lov)
{
	struct l_wait_info lwi = { 0 };

	while (atomic_read(&lov->lo_active_ios) > 0) {
		CDEBUG(D_INODE, "file:" DFID " wait for active IO, now: %d.\n",
		       PFID(lu_object_fid(lov2lu(lov))),
		       atomic_read(&lov->lo_active_ios));

		l_wait_event(lov->lo_waitq,
			     atomic_read(&lov->lo_active_ios) == 0, &lwi);
	}
	return 0;
}

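/*
 * Switch the object to the layout type implied by the new lsm: prune cached
 * pages, delete and finalize the old layout state, then initialize and
 * install the new one. If installing the new layout fails the object is
 * left as an LLT_EMPTY file.
 */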
static int lov_layout_change(const struct lu_env *unused,
			     struct lov_object *lov, struct lov_stripe_md *lsm,
			     const struct cl_object_conf *conf)
{
	struct lov_device *lov_dev = lov_object_dev(lov);
	enum lov_layout_type llt = lov_type(lsm);
	union lov_layout_state *state = &lov->u;
	const struct lov_layout_operations *old_ops;
	const struct lov_layout_operations *new_ops;
	struct lu_env *env;
	u16 refcheck;
	int rc;

	LASSERT(0 <= lov->lo_type && lov->lo_type < ARRAY_SIZE(lov_dispatch));

	env = cl_env_get(&refcheck);
	if (IS_ERR(env))
		return PTR_ERR(env);

	LASSERT(0 <= llt && llt < ARRAY_SIZE(lov_dispatch));

	CDEBUG(D_INODE, DFID" from %s to %s\n",
	       PFID(lu_object_fid(lov2lu(lov))),
	       llt2str(lov->lo_type), llt2str(llt));

	old_ops = &lov_dispatch[lov->lo_type];
	new_ops = &lov_dispatch[llt];

	rc = cl_object_prune(env, &lov->lo_cl);
	if (rc)
		goto out;

	rc = old_ops->llo_delete(env, lov, &lov->u);
	if (rc)
		goto out;

	old_ops->llo_fini(env, lov, &lov->u);

	LASSERT(!atomic_read(&lov->lo_active_ios));

	CDEBUG(D_INODE, DFID "Apply new layout lov %p, type %d\n",
	       PFID(lu_object_fid(lov2lu(lov))), lov, llt);

	lov->lo_type = LLT_EMPTY;

	/* page bufsize fixup */
	cl_object_header(&lov->lo_cl)->coh_page_bufsize -=
		lov_page_slice_fixup(lov, NULL);

	rc = new_ops->llo_init(env, lov_dev, lov, lsm, conf, state);
	if (rc) {
		struct obd_device *obd = lov2obd(lov_dev->ld_lov);

		CERROR("%s: cannot apply new layout on " DFID " : rc = %d\n",
		       obd->obd_name, PFID(lu_object_fid(lov2lu(lov))), rc);
		new_ops->llo_delete(env, lov, state);
		new_ops->llo_fini(env, lov, state);
		/* this file becomes an EMPTY file. */
		goto out;
	}

	new_ops->llo_install(env, lov, state);
	lov->lo_type = llt;
out:
	cl_env_put(env, &refcheck);
	return rc;
}

/*****************************************************************************
 *
 * Lov object operations.
 *
 */
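/*
 * Implements lu_object_operations::loo_object_init(): unpack the layout
 * buffer supplied in the configuration (if any), pick the matching layout
 * type and run its llo_init()/llo_install() methods.
 */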
int lov_object_init(const struct lu_env *env, struct lu_object *obj,
		    const struct lu_object_conf *conf)
{
	struct lov_object *lov = lu2lov(obj);
	struct lov_device *dev = lov_object_dev(lov);
	const struct cl_object_conf *cconf = lu2cl_conf(conf);
	union lov_layout_state *set = &lov->u;
	const struct lov_layout_operations *ops;
	struct lov_stripe_md *lsm = NULL;
	int rc;

	init_rwsem(&lov->lo_type_guard);
	atomic_set(&lov->lo_active_ios, 0);
	init_waitqueue_head(&lov->lo_waitq);
	cl_object_page_init(lu2cl(obj), sizeof(struct lov_page));

	lov->lo_type = LLT_EMPTY;
	if (cconf->u.coc_layout.lb_buf) {
		lsm = lov_unpackmd(dev->ld_lov,
				   cconf->u.coc_layout.lb_buf,
				   cconf->u.coc_layout.lb_len);
		if (IS_ERR(lsm))
			return PTR_ERR(lsm);
	}

	/* no locking is necessary, as object is being created */
	lov->lo_type = lov_type(lsm);
	ops = &lov_dispatch[lov->lo_type];
	rc = ops->llo_init(env, dev, lov, lsm, cconf, set);
	if (!rc)
		ops->llo_install(env, lov, set);

	lov_lsm_put(lsm);

	return rc;
}

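/*
 * Implements cl_object_operations::coo_conf_set(): handles layout
 * invalidation (OBJECT_CONF_INVALIDATE), waiting for in-flight IO to drain
 * (OBJECT_CONF_WAIT) and layout replacement (OBJECT_CONF_SET). A layout can
 * only be replaced while no IO is active; otherwise -EBUSY is returned.
 */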
static int lov_conf_set(const struct lu_env *env, struct cl_object *obj,
			const struct cl_object_conf *conf)
{
	struct lov_stripe_md *lsm = NULL;
	struct lov_object *lov = cl2lov(obj);
	int result = 0;

	if (conf->coc_opc == OBJECT_CONF_SET &&
	    conf->u.coc_layout.lb_buf) {
		lsm = lov_unpackmd(lov_object_dev(lov)->ld_lov,
				   conf->u.coc_layout.lb_buf,
				   conf->u.coc_layout.lb_len);
		if (IS_ERR(lsm))
			return PTR_ERR(lsm);
	}

	lov_conf_lock(lov);
	if (conf->coc_opc == OBJECT_CONF_INVALIDATE) {
		lov->lo_layout_invalid = true;
		result = 0;
		goto out;
	}

	if (conf->coc_opc == OBJECT_CONF_WAIT) {
		if (lov->lo_layout_invalid &&
		    atomic_read(&lov->lo_active_ios) > 0) {
			lov_conf_unlock(lov);
			result = lov_layout_wait(env, lov);
			lov_conf_lock(lov);
		}
		goto out;
	}

	LASSERT(conf->coc_opc == OBJECT_CONF_SET);

	if ((!lsm && !lov->lo_lsm) ||
	    ((lsm && lov->lo_lsm) &&
	     (lov->lo_lsm->lsm_layout_gen == lsm->lsm_layout_gen) &&
	     (lov->lo_lsm->lsm_pattern == lsm->lsm_pattern))) {
		/* same version of layout */
		lov->lo_layout_invalid = false;
		result = 0;
		goto out;
	}

	/* will change layout - check if there still exists active IO. */
	if (atomic_read(&lov->lo_active_ios) > 0) {
		lov->lo_layout_invalid = true;
		result = -EBUSY;
		goto out;
	}

	result = lov_layout_change(env, lov, lsm, conf);
	lov->lo_layout_invalid = result != 0;

out:
	lov_conf_unlock(lov);
	lov_lsm_put(lsm);
	CDEBUG(D_INODE, DFID" lo_layout_invalid=%d\n",
	       PFID(lu_object_fid(lov2lu(lov))), lov->lo_layout_invalid);
	return result;
}

static void lov_object_delete(const struct lu_env *env, struct lu_object *obj)
{
	struct lov_object *lov = lu2lov(obj);

	LOV_2DISPATCH_VOID(lov, llo_delete, env, lov, &lov->u);
}

static void lov_object_free(const struct lu_env *env, struct lu_object *obj)
{
	struct lov_object *lov = lu2lov(obj);

	LOV_2DISPATCH_VOID(lov, llo_fini, env, lov, &lov->u);
	lu_object_fini(obj);
	kmem_cache_free(lov_object_kmem, lov);
}

static int lov_object_print(const struct lu_env *env, void *cookie,
			    lu_printer_t p, const struct lu_object *o)
{
	return LOV_2DISPATCH_NOLOCK(lu2lov(o), llo_print, env, cookie, p, o);
}

int lov_page_init(const struct lu_env *env, struct cl_object *obj,
		  struct cl_page *page, pgoff_t index)
{
	return LOV_2DISPATCH_NOLOCK(cl2lov(obj), llo_page_init, env, obj, page,
				    index);
}

/**
 * Implements cl_object_operations::clo_io_init() method for lov
 * layer. Dispatches to the appropriate layout io initialization method.
 */
int lov_io_init(const struct lu_env *env, struct cl_object *obj,
		struct cl_io *io)
{
	CL_IO_SLICE_CLEAN(lov_env_io(env), lis_cl);

	CDEBUG(D_INODE, DFID "io %p type %d ignore/verify layout %d/%d\n",
	       PFID(lu_object_fid(&obj->co_lu)), io, io->ci_type,
	       io->ci_ignore_layout, io->ci_verify_layout);

	return LOV_2DISPATCH_MAYLOCK(cl2lov(obj), llo_io_init,
				     !io->ci_ignore_layout, env, obj, io);
}

/**
 * An implementation of cl_object_operations::clo_attr_get() method for lov
 * layer. For raid0 layout this collects and merges attributes of all
 * sub-objects.
 */
static int lov_attr_get(const struct lu_env *env, struct cl_object *obj,
			struct cl_attr *attr)
{
	/* do not take lock, as this function is called under a
	 * spin-lock. Layout is protected from changing by ongoing IO.
	 */
	return LOV_2DISPATCH_NOLOCK(cl2lov(obj), llo_getattr, env, obj, attr);
}

static int lov_attr_update(const struct lu_env *env, struct cl_object *obj,
			   const struct cl_attr *attr, unsigned int valid)
{
	/*
	 * No dispatch is required here, as no layout implements this.
	 */
	return 0;
}

int lov_lock_init(const struct lu_env *env, struct cl_object *obj,
		  struct cl_lock *lock, const struct cl_io *io)
{
	/* No need to lock because we've taken one refcount of layout. */
	return LOV_2DISPATCH_NOLOCK(cl2lov(obj), llo_lock_init, env, obj, lock,
				    io);
}

/**
 * We calculate on which OST the mapping will end. If the length of mapping
 * is greater than (stripe_size * stripe_count) then the last_stripe will
 * be one just before start_stripe. Else we check if the mapping
 * intersects each OST and find last_stripe.
 * This function returns the last_stripe and also sets the stripe_count
 * over which the mapping is spread.
 *
 * \param lsm [in]		striping information for the file
 * \param fm_start [in]		logical start of mapping
 * \param fm_end [in]		logical end of mapping
 * \param start_stripe [in]	starting stripe of the mapping
 * \param stripe_count [out]	the number of stripes across which to map is
 *				returned
 *
 * \retval last_stripe		return the last stripe of the mapping
 */
static int fiemap_calc_last_stripe(struct lov_stripe_md *lsm,
				   loff_t fm_start, loff_t fm_end,
				   int start_stripe, int *stripe_count)
{
	int last_stripe;
	loff_t obd_start;
	loff_t obd_end;
	int i, j;

	if (fm_end - fm_start > lsm->lsm_stripe_size * lsm->lsm_stripe_count) {
		last_stripe = (start_stripe < 1 ? lsm->lsm_stripe_count - 1 :
			       start_stripe - 1);
		*stripe_count = lsm->lsm_stripe_count;
	} else {
		for (j = 0, i = start_stripe; j < lsm->lsm_stripe_count;
		     i = (i + 1) % lsm->lsm_stripe_count, j++) {
			if (!(lov_stripe_intersects(lsm, i, fm_start, fm_end,
						    &obd_start, &obd_end)))
				break;
		}
		*stripe_count = j;
		last_stripe = (start_stripe + j - 1) % lsm->lsm_stripe_count;
	}

	return last_stripe;
}

/**
 * Set fe_device and copy extents from local buffer into main return buffer.
 *
 * \param fiemap [out]		fiemap to hold all extents
 * \param lcl_fm_ext [in]	array of fiemap extents obtained from the OSC
 *				layer
 * \param ost_index [in]	OST index to be written into the fm_device
 *				field for each extent
 * \param ext_count [in]	number of extents to be copied
 * \param current_extent [in]	where to start copying in the extent array
 */
static void fiemap_prepare_and_copy_exts(struct fiemap *fiemap,
					 struct fiemap_extent *lcl_fm_ext,
					 int ost_index, unsigned int ext_count,
					 int current_extent)
{
	unsigned int ext;
	char *to;

	for (ext = 0; ext < ext_count; ext++) {
		lcl_fm_ext[ext].fe_device = ost_index;
		lcl_fm_ext[ext].fe_flags |= FIEMAP_EXTENT_NET;
	}

	/* Copy fm_extent's from fm_local to return buffer */
	to = (char *)fiemap + fiemap_count_to_size(current_extent);
	memcpy(to, lcl_fm_ext, ext_count * sizeof(struct fiemap_extent));
}

#define FIEMAP_BUFFER_SIZE 4096

/**
 * Non-zero fe_logical indicates that this is a continuation FIEMAP
 * call. The local end offset and the device are sent in the first
 * fm_extent. This function calculates the stripe number from the index.
 * This function returns a stripe_no on which mapping is to be restarted.
 *
 * This function returns fm_end_offset which is the in-OST offset at which
 * mapping should be restarted. If fm_end_offset=0 is returned then caller
 * will re-calculate proper offset in next stripe.
 * Note that the first extent is passed to lov_get_info via the value field.
 *
 * \param fiemap [in]		fiemap request header
 * \param lsm [in]		striping information for the file
 * \param fm_start [in]		logical start of mapping
 * \param fm_end [in]		logical end of mapping
 * \param start_stripe [out]	starting stripe will be returned in this
 */
static loff_t fiemap_calc_fm_end_offset(struct fiemap *fiemap,
					struct lov_stripe_md *lsm,
					loff_t fm_start, loff_t fm_end,
					int *start_stripe)
{
	loff_t local_end = fiemap->fm_extents[0].fe_logical;
	loff_t lun_start, lun_end;
	loff_t fm_end_offset;
	int stripe_no = -1;
	int i;

	if (!fiemap->fm_extent_count || !fiemap->fm_extents[0].fe_logical)
		return 0;

	/* Find out stripe_no from ost_index saved in the fe_device */
	for (i = 0; i < lsm->lsm_stripe_count; i++) {
		struct lov_oinfo *oinfo = lsm->lsm_oinfo[i];

		if (lov_oinfo_is_dummy(oinfo))
			continue;

		if (oinfo->loi_ost_idx == fiemap->fm_extents[0].fe_device) {
			stripe_no = i;
			break;
		}
	}

	if (stripe_no == -1)
		return -EINVAL;

	/*
	 * If we have finished mapping on previous device, shift logical
	 * offset to start of next device
	 */
	if (lov_stripe_intersects(lsm, stripe_no, fm_start, fm_end,
				  &lun_start, &lun_end) &&
	    local_end < lun_end) {
		fm_end_offset = local_end;
		*start_stripe = stripe_no;
	} else {
		/* This is a special value to indicate that caller should
		 * calculate offset in next stripe.
		 */
		fm_end_offset = 0;
		*start_stripe = (stripe_no + 1) % lsm->lsm_stripe_count;
	}

	return fm_end_offset;
}

/**
 * Break down the FIEMAP request and send appropriate calls to individual OSTs.
 * This also handles the restarting of FIEMAP calls in case mapping overflows
 * the available number of extents in single call.
 *
 * \param env [in]		lustre environment
 * \param obj [in]		file object
 * \param fmkey [in]		fiemap request header and other info
 * \param fiemap [out]		fiemap buffer holding retrieved map extents
 * \param buflen [in/out]	max buffer length of @fiemap; when iterating
 *				each OST, it is used to limit max map needed
 * \retval 0	success
 * \retval < 0	error
 */
static int lov_object_fiemap(const struct lu_env *env, struct cl_object *obj,
			     struct ll_fiemap_info_key *fmkey,
			     struct fiemap *fiemap, size_t *buflen)
{
	struct lov_obd *lov = lu2lov_dev(obj->co_lu.lo_dev)->ld_lov;
	unsigned int buffer_size = FIEMAP_BUFFER_SIZE;
	struct fiemap_extent *lcl_fm_ext;
	struct cl_object *subobj = NULL;
	struct fiemap *fm_local = NULL;
	struct lov_stripe_md *lsm;
	loff_t fm_start;
	loff_t fm_end;
	loff_t fm_length;
	loff_t fm_end_offset;
	int count_local;
	int ost_index = 0;
	int start_stripe;
	int current_extent = 0;
	int rc = 0;
	int last_stripe;
	int cur_stripe = 0;
	int cur_stripe_wrap = 0;
	int stripe_count;
	/* Whether have we collected enough extents */
	bool enough = false;
	/* EOF for object */
	bool ost_eof = false;
	/* done with required mapping for this OST? */
	bool ost_done = false;

	lsm = lov_lsm_addref(cl2lov(obj));
	if (!lsm)
		return -ENODATA;

	/**
	 * If the stripe_count > 1 and the application does not understand
	 * DEVICE_ORDER flag, it cannot interpret the extents correctly.
	 */
	if (lsm->lsm_stripe_count > 1 &&
	    !(fiemap->fm_flags & FIEMAP_FLAG_DEVICE_ORDER)) {
		rc = -ENOTSUPP;
		goto out;
	}

	if (lsm_is_released(lsm)) {
		if (fiemap->fm_start < fmkey->lfik_oa.o_size) {
			/**
			 * released file, return a minimal FIEMAP if
			 * request fits in file-size.
			 */
			fiemap->fm_mapped_extents = 1;
			fiemap->fm_extents[0].fe_logical = fiemap->fm_start;
			if (fiemap->fm_start + fiemap->fm_length <
			    fmkey->lfik_oa.o_size)
				fiemap->fm_extents[0].fe_length =
					fiemap->fm_length;
			else
				fiemap->fm_extents[0].fe_length =
					fmkey->lfik_oa.o_size -
					fiemap->fm_start;
			fiemap->fm_extents[0].fe_flags |=
				FIEMAP_EXTENT_UNKNOWN | FIEMAP_EXTENT_LAST;
		}
		rc = 0;
		goto out;
	}

	if (fiemap_count_to_size(fiemap->fm_extent_count) < buffer_size)
		buffer_size = fiemap_count_to_size(fiemap->fm_extent_count);

	fm_local = libcfs_kvzalloc(buffer_size, GFP_NOFS);
	if (!fm_local) {
		rc = -ENOMEM;
		goto out;
	}
	lcl_fm_ext = &fm_local->fm_extents[0];
	count_local = fiemap_size_to_count(buffer_size);

	fm_start = fiemap->fm_start;
	fm_length = fiemap->fm_length;
	/* Calculate start stripe, last stripe and length of mapping */
	start_stripe = lov_stripe_number(lsm, fm_start);
	fm_end = (fm_length == ~0ULL) ? fmkey->lfik_oa.o_size :
		 fm_start + fm_length - 1;
	/* If fm_length != ~0ULL but fm_start + fm_length - 1 exceeds file size */
	if (fm_end > fmkey->lfik_oa.o_size)
		fm_end = fmkey->lfik_oa.o_size;

	last_stripe = fiemap_calc_last_stripe(lsm, fm_start, fm_end,
					      start_stripe, &stripe_count);
	fm_end_offset = fiemap_calc_fm_end_offset(fiemap, lsm, fm_start, fm_end,
						  &start_stripe);
	if (fm_end_offset == -EINVAL) {
		rc = -EINVAL;
		goto out;
	}

	/**
	 * Requested extent count exceeds the fiemap buffer size, shrink our
	 * ambition.
	 */
	if (fiemap_count_to_size(fiemap->fm_extent_count) > *buflen)
		fiemap->fm_extent_count = fiemap_size_to_count(*buflen);
	if (!fiemap->fm_extent_count)
		count_local = 0;

	/* Check each stripe */
	for (cur_stripe = start_stripe; stripe_count > 0;
	     --stripe_count,
	     cur_stripe = (cur_stripe + 1) % lsm->lsm_stripe_count) {
		loff_t req_fm_len; /* Stores length of required mapping */
		loff_t len_mapped_single_call;
		loff_t lun_start;
		loff_t lun_end;
		loff_t obd_object_end;
		unsigned int ext_count;

		cur_stripe_wrap = cur_stripe;

		/* Find out range of mapping on this stripe */
		if (!(lov_stripe_intersects(lsm, cur_stripe, fm_start, fm_end,
					    &lun_start, &obd_object_end)))
			continue;

		if (lov_oinfo_is_dummy(lsm->lsm_oinfo[cur_stripe])) {
			rc = -EIO;
			goto out;
		}

		/*
		 * If this is a continuation FIEMAP call and we are on
		 * starting stripe then lun_start needs to be set to
		 * fm_end_offset
		 */
		if (fm_end_offset && cur_stripe == start_stripe)
			lun_start = fm_end_offset;

		if (fm_length != ~0ULL) {
			/* Handle fm_start + fm_length overflow */
			if (fm_start + fm_length < fm_start)
				fm_length = ~0ULL - fm_start;
			lun_end = lov_size_to_stripe(lsm, fm_start + fm_length,
						     cur_stripe);
		} else {
			lun_end = ~0ULL;
		}

		if (lun_start == lun_end)
			continue;

		req_fm_len = obd_object_end - lun_start;
		fm_local->fm_length = 0;
		len_mapped_single_call = 0;

		/* find lovsub object */
		subobj = lov_find_subobj(env, cl2lov(obj), lsm,
					 cur_stripe);
		if (IS_ERR(subobj)) {
			rc = PTR_ERR(subobj);
			goto out;
		}
		/*
		 * If the output buffer is very large and the objects have many
		 * extents we may need to loop on a single OST repeatedly
		 */
		ost_eof = false;
		ost_done = false;
		do {
			if (fiemap->fm_extent_count > 0) {
				/* Don't get too many extents. */
				if (current_extent + count_local >
				    fiemap->fm_extent_count)
					count_local = fiemap->fm_extent_count -
						      current_extent;
			}

			lun_start += len_mapped_single_call;
			fm_local->fm_length = req_fm_len -
					      len_mapped_single_call;
			req_fm_len = fm_local->fm_length;
			fm_local->fm_extent_count = enough ? 1 : count_local;
			fm_local->fm_mapped_extents = 0;
			fm_local->fm_flags = fiemap->fm_flags;

			ost_index = lsm->lsm_oinfo[cur_stripe]->loi_ost_idx;

			if (ost_index < 0 ||
			    ost_index >= lov->desc.ld_tgt_count) {
				rc = -EINVAL;
				goto obj_put;
			}
			/*
			 * If OST is inactive, return extent with UNKNOWN
			 * flag.
			 */
			if (!lov->lov_tgts[ost_index]->ltd_active) {
				fm_local->fm_flags |= FIEMAP_EXTENT_LAST;
				fm_local->fm_mapped_extents = 1;

				lcl_fm_ext[0].fe_logical = lun_start;
				lcl_fm_ext[0].fe_length = obd_object_end -
							  lun_start;
				lcl_fm_ext[0].fe_flags |= FIEMAP_EXTENT_UNKNOWN;

				goto inactive_tgt;
			}

			fm_local->fm_start = lun_start;
			fm_local->fm_flags &= ~FIEMAP_FLAG_DEVICE_ORDER;
			memcpy(&fmkey->lfik_fiemap, fm_local, sizeof(*fm_local));
			*buflen = fiemap_count_to_size(fm_local->fm_extent_count);

			rc = cl_object_fiemap(env, subobj, fmkey, fm_local,
					      buflen);
			if (rc)
				goto obj_put;
inactive_tgt:
			ext_count = fm_local->fm_mapped_extents;
			if (!ext_count) {
				ost_done = true;
				/*
				 * If the last stripe has a hole at the end,
				 * we need to return
				 */
				if (cur_stripe_wrap == last_stripe) {
					fiemap->fm_mapped_extents = 0;
					goto finish;
				}
				break;
			} else if (enough) {
				/*
				 * We've collected enough extents and there are
				 * more extents after it.
				 */
				goto finish;
			}

			/* If we just need num of extents, go to next device */
			if (!fiemap->fm_extent_count) {
				current_extent += ext_count;
				break;
			}

			/* prepare to copy retrieved map extents */
			len_mapped_single_call =
				lcl_fm_ext[ext_count - 1].fe_logical -
				lun_start + lcl_fm_ext[ext_count - 1].fe_length;

			/* Have we finished mapping on this device? */
			if (req_fm_len <= len_mapped_single_call)
				ost_done = true;

			/*
			 * Clear the EXTENT_LAST flag which can be present on
			 * the last extent
			 */
			if (lcl_fm_ext[ext_count - 1].fe_flags &
			    FIEMAP_EXTENT_LAST)
				lcl_fm_ext[ext_count - 1].fe_flags &=
					~FIEMAP_EXTENT_LAST;

			if (lov_stripe_size(lsm,
					    lcl_fm_ext[ext_count - 1].fe_logical +
					    lcl_fm_ext[ext_count - 1].fe_length,
					    cur_stripe) >= fmkey->lfik_oa.o_size)
				ost_eof = true;

			fiemap_prepare_and_copy_exts(fiemap, lcl_fm_ext,
						     ost_index, ext_count,
						     current_extent);
			current_extent += ext_count;

			/* Ran out of available extents? */
			if (current_extent >= fiemap->fm_extent_count)
				enough = true;
		} while (!ost_done && !ost_eof);

		cl_object_put(env, subobj);
		subobj = NULL;

		if (cur_stripe_wrap == last_stripe)
			goto finish;
	} /* for each stripe */
finish:
	/*
	 * Indicate that we are returning device offsets unless file just has
	 * single stripe
	 */
	if (lsm->lsm_stripe_count > 1)
		fiemap->fm_flags |= FIEMAP_FLAG_DEVICE_ORDER;

	if (!fiemap->fm_extent_count)
		goto skip_last_device_calc;

	/*
	 * Check if we have reached the last stripe and whether mapping for that
	 * stripe is done.
	 */
	if ((cur_stripe_wrap == last_stripe) && (ost_done || ost_eof))
		fiemap->fm_extents[current_extent - 1].fe_flags |=
			FIEMAP_EXTENT_LAST;
skip_last_device_calc:
	fiemap->fm_mapped_extents = current_extent;
obj_put:
	if (subobj)
		cl_object_put(env, subobj);
out:
	kvfree(fm_local);
	lov_lsm_put(lsm);
	return rc;
}

static int lov_object_getstripe(const struct lu_env *env, struct cl_object *obj,
				struct lov_user_md __user *lum)
{
	struct lov_object *lov = cl2lov(obj);
	struct lov_stripe_md *lsm;
	int rc = 0;

	lsm = lov_lsm_addref(lov);
	if (!lsm)
		return -ENODATA;

	rc = lov_getstripe(cl2lov(obj), lsm, lum);
	lov_lsm_put(lsm);
	return rc;
}

static int lov_object_layout_get(const struct lu_env *env,
				 struct cl_object *obj,
				 struct cl_layout *cl)
{
	struct lov_object *lov = cl2lov(obj);
	struct lov_stripe_md *lsm = lov_lsm_addref(lov);
	struct lu_buf *buf = &cl->cl_buf;
	ssize_t rc;

	if (!lsm) {
		cl->cl_size = 0;
		cl->cl_layout_gen = CL_LAYOUT_GEN_EMPTY;
		return 0;
	}

	cl->cl_size = lov_mds_md_size(lsm->lsm_stripe_count, lsm->lsm_magic);
	cl->cl_layout_gen = lsm->lsm_layout_gen;

	rc = lov_lsm_pack(lsm, buf->lb_buf, buf->lb_len);
	lov_lsm_put(lsm);

	return rc < 0 ? rc : 0;
}

static loff_t lov_object_maxbytes(struct cl_object *obj)
{
	struct lov_object *lov = cl2lov(obj);
	struct lov_stripe_md *lsm = lov_lsm_addref(lov);
	loff_t maxbytes;

	if (!lsm)
		return LLONG_MAX;

	maxbytes = lsm->lsm_maxbytes;

	lov_lsm_put(lsm);

	return maxbytes;
}

static const struct cl_object_operations lov_ops = {
	.coo_page_init    = lov_page_init,
	.coo_lock_init    = lov_lock_init,
	.coo_io_init      = lov_io_init,
	.coo_attr_get     = lov_attr_get,
	.coo_attr_update  = lov_attr_update,
	.coo_conf_set     = lov_conf_set,
	.coo_getstripe    = lov_object_getstripe,
	.coo_layout_get   = lov_object_layout_get,
	.coo_maxbytes     = lov_object_maxbytes,
	.coo_fiemap       = lov_object_fiemap,
};

static const struct lu_object_operations lov_lu_obj_ops = {
	.loo_object_init      = lov_object_init,
	.loo_object_delete    = lov_object_delete,
	.loo_object_release   = NULL,
	.loo_object_free      = lov_object_free,
	.loo_object_print     = lov_object_print,
	.loo_object_invariant = NULL
};

struct lu_object *lov_object_alloc(const struct lu_env *env,
				   const struct lu_object_header *unused,
				   struct lu_device *dev)
{
	struct lov_object *lov;
	struct lu_object *obj;

	lov = kmem_cache_zalloc(lov_object_kmem, GFP_NOFS);
	if (lov) {
		obj = lov2lu(lov);
		lu_object_init(obj, NULL, dev);
		lov->lo_cl.co_ops = &lov_ops;
		lov->lo_type = -1; /* invalid, to catch uninitialized type */
		/*
		 * object io operation vector (cl_object::co_iop) is installed
		 * later in lov_object_init(), as different vectors are used
		 * for object with different layouts.
		 */
		obj->lo_ops = &lov_lu_obj_ops;
	} else {
		obj = NULL;
	}
	return obj;
}

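/*
 * Take an additional reference on the object's lov_stripe_md under the
 * layout guard; returns NULL if the object currently has no striping
 * attached.
 */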
struct lov_stripe_md *lov_lsm_addref(struct lov_object *lov)
{
	struct lov_stripe_md *lsm = NULL;

	lov_conf_freeze(lov);
	if (lov->lo_lsm) {
		lsm = lsm_addref(lov->lo_lsm);
		CDEBUG(D_INODE, "lsm %p addref %d/%d by %p.\n",
		       lsm, atomic_read(&lsm->lsm_refc),
		       lov->lo_layout_invalid, current);
	}
	lov_conf_thaw(lov);
	return lsm;
}

int lov_read_and_clear_async_rc(struct cl_object *clob)
{
	struct lu_object *luobj;
	int rc = 0;

	luobj = lu_object_locate(&cl_object_header(clob)->coh_lu,
				 &lov_device_type);
	if (luobj) {
		struct lov_object *lov = lu2lov(luobj);

		lov_conf_freeze(lov);
		switch (lov->lo_type) {
		case LLT_RAID0: {
			struct lov_stripe_md *lsm;
			int i;

			lsm = lov->lo_lsm;
			for (i = 0; i < lsm->lsm_stripe_count; i++) {
				struct lov_oinfo *loi = lsm->lsm_oinfo[i];

				if (lov_oinfo_is_dummy(loi))
					continue;

				if (loi->loi_ar.ar_rc && !rc)
					rc = loi->loi_ar.ar_rc;
				loi->loi_ar.ar_rc = 0;
			}
		}
		case LLT_RELEASED:
		case LLT_EMPTY:
			break;
		default:
			LBUG();
		}
		lov_conf_thaw(lov);
	}
	return rc;
}
EXPORT_SYMBOL(lov_read_and_clear_async_rc);

/** @} lov */