/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * cl code shared between vvp and liblustre (and other Lustre clients in the
 * future).
 *
 *   Author: Nikita Danilov <nikita.danilov@sun.com>
 */
#define DEBUG_SUBSYSTEM S_LLITE

#include "../../include/linux/libcfs/libcfs.h"
#include <linux/fs.h>
#include <linux/sched.h>
#include <linux/mm.h>
#include <linux/quotaops.h>
#include <linux/highmem.h>
#include <linux/pagemap.h>
#include <linux/rbtree.h>

#include <obd_support.h>
#include <lustre_fid.h>
#include <lustre_lite.h>
#include <lustre_dlm.h>
#include <lustre_ver.h>
#include <lustre_mdc.h>
#include <cl_object.h>

#include "../llite/llite_internal.h"
static const struct cl_req_operations ccc_req_ops;

/*
 * ccc_ prefix stands for "Common Client Code".
 */
static struct kmem_cache *ccc_lock_kmem;
static struct kmem_cache *ccc_object_kmem;
static struct kmem_cache *ccc_thread_kmem;
static struct kmem_cache *ccc_session_kmem;
static struct kmem_cache *ccc_req_kmem;
static struct lu_kmem_descr ccc_caches[] = {
	{
		.ckd_cache = &ccc_lock_kmem,
		.ckd_name  = "ccc_lock_kmem",
		.ckd_size  = sizeof(struct ccc_lock)
	},
	{
		.ckd_cache = &ccc_object_kmem,
		.ckd_name  = "ccc_object_kmem",
		.ckd_size  = sizeof(struct ccc_object)
	},
	{
		.ckd_cache = &ccc_thread_kmem,
		.ckd_name  = "ccc_thread_kmem",
		.ckd_size  = sizeof(struct ccc_thread_info),
	},
	{
		.ckd_cache = &ccc_session_kmem,
		.ckd_name  = "ccc_session_kmem",
		.ckd_size  = sizeof(struct ccc_session)
	},
	{
		.ckd_cache = &ccc_req_kmem,
		.ckd_name  = "ccc_req_kmem",
		.ckd_size  = sizeof(struct ccc_req)
	},
	{
		.ckd_cache = NULL
	}
};
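
/*
 * Note (added for clarity, not in the original source): lu_kmem_init()
 * walks this table entry by entry until it reaches the .ckd_cache == NULL
 * sentinel, creating one kmem cache per entry, so the terminating entry
 * must be preserved when new caches are added. See ccc_global_init()
 * below for the call site.
 */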
/*****************************************************************************
 *
 * Vvp device and device type functions.
 *
 */
void *ccc_key_init(const struct lu_context *ctx, struct lu_context_key *key)
{
	struct ccc_thread_info *info;

	OBD_SLAB_ALLOC_PTR_GFP(info, ccc_thread_kmem, GFP_NOFS);
	if (info == NULL)
		info = ERR_PTR(-ENOMEM);
	return info;
}

void ccc_key_fini(const struct lu_context *ctx,
		  struct lu_context_key *key, void *data)
{
	struct ccc_thread_info *info = data;

	OBD_SLAB_FREE_PTR(info, ccc_thread_kmem);
}

void *ccc_session_key_init(const struct lu_context *ctx,
			   struct lu_context_key *key)
{
	struct ccc_session *session;

	OBD_SLAB_ALLOC_PTR_GFP(session, ccc_session_kmem, GFP_NOFS);
	if (session == NULL)
		session = ERR_PTR(-ENOMEM);
	return session;
}

void ccc_session_key_fini(const struct lu_context *ctx,
			  struct lu_context_key *key, void *data)
{
	struct ccc_session *session = data;

	OBD_SLAB_FREE_PTR(session, ccc_session_kmem);
}
struct lu_context_key ccc_key = {
	.lct_tags = LCT_CL_THREAD,
	.lct_init = ccc_key_init,
	.lct_fini = ccc_key_fini
};

struct lu_context_key ccc_session_key = {
	.lct_tags = LCT_SESSION,
	.lct_init = ccc_session_key_init,
	.lct_fini = ccc_session_key_fini
};
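
/*
 * Illustrative note (not from the original source): values attached through
 * these keys are fetched back from an environment with lu_context_key_get(),
 * along the lines of:
 *
 *	struct ccc_thread_info *info =
 *		lu_context_key_get(&env->le_ctx, &ccc_key);
 *	struct ccc_session *ses =
 *		lu_context_key_get(env->le_ses, &ccc_session_key);
 *
 * The ccc_env_info()/ccc_env_session() helpers used throughout this file
 * wrap exactly this pattern.
 */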
/* type constructor/destructor: ccc_type_{init,fini,start,stop}(). */
/* LU_TYPE_INIT_FINI(ccc, &ccc_key, &ccc_session_key); */
int ccc_device_init(const struct lu_env *env, struct lu_device *d,
		    const char *name, struct lu_device *next)
{
	struct ccc_device *vdv;
	int rc;

	vdv = lu2ccc_dev(d);
	vdv->cdv_next = lu2cl_dev(next);

	LASSERT(d->ld_site != NULL && next->ld_type != NULL);
	next->ld_site = d->ld_site;
	rc = next->ld_type->ldt_ops->ldto_device_init(
			env, next, next->ld_type->ldt_name, NULL);
	if (rc == 0) {
		lu_device_get(next);
		lu_ref_add(&next->ld_reference, "lu-stack", &lu_site_init);
	}
	return rc;
}
struct lu_device *ccc_device_fini(const struct lu_env *env,
				  struct lu_device *d)
{
	return cl2lu_dev(lu2ccc_dev(d)->cdv_next);
}
struct lu_device *ccc_device_alloc(const struct lu_env *env,
				   struct lu_device_type *t,
				   struct lustre_cfg *cfg,
				   const struct lu_device_operations *luops,
				   const struct cl_device_operations *clops)
{
	struct ccc_device *vdv;
	struct lu_device  *lud;
	struct cl_site    *site;
	int rc;

	OBD_ALLOC_PTR(vdv);
	if (vdv == NULL)
		return ERR_PTR(-ENOMEM);

	lud = &vdv->cdv_cl.cd_lu_dev;
	cl_device_init(&vdv->cdv_cl, t);
	ccc2lu_dev(vdv)->ld_ops = luops;
	vdv->cdv_cl.cd_ops = clops;

	OBD_ALLOC_PTR(site);
	if (site != NULL) {
		rc = cl_site_init(site, &vdv->cdv_cl);
		if (rc == 0)
			rc = lu_site_init_finish(&site->cs_lu);
		else {
			LASSERT(lud->ld_site == NULL);
			CERROR("Cannot init lu_site, rc %d.\n", rc);
			OBD_FREE_PTR(site);
		}
	} else
		rc = -ENOMEM;
	if (rc != 0) {
		ccc_device_free(env, lud);
		lud = ERR_PTR(rc);
	}
	return lud;
}
struct lu_device *ccc_device_free(const struct lu_env *env,
				  struct lu_device *d)
{
	struct ccc_device *vdv  = lu2ccc_dev(d);
	struct cl_site    *site = lu2cl_site(d->ld_site);
	struct lu_device  *next = cl2lu_dev(vdv->cdv_next);

	if (d->ld_site != NULL) {
		cl_site_fini(site);
		OBD_FREE_PTR(site);
	}
	cl_device_fini(lu2cl_dev(d));
	OBD_FREE_PTR(vdv);
	return next;
}
int ccc_req_init(const struct lu_env *env, struct cl_device *dev,
		 struct cl_req *req)
{
	struct ccc_req *vrq;
	int result;

	OBD_SLAB_ALLOC_PTR_GFP(vrq, ccc_req_kmem, GFP_NOFS);
	if (vrq != NULL) {
		cl_req_slice_add(req, &vrq->crq_cl, dev, &ccc_req_ops);
		result = 0;
	} else
		result = -ENOMEM;
	return result;
}
/**
 * An `emergency' environment used by ccc_inode_fini() when cl_env_get()
 * fails. Access to this environment is serialized by ccc_inode_fini_guard
 * mutex.
 */
static struct lu_env *ccc_inode_fini_env;

/**
 * A mutex serializing calls to slp_inode_fini() under extreme memory
 * pressure, when environments cannot be allocated.
 */
static DEFINE_MUTEX(ccc_inode_fini_guard);
static int dummy_refcheck;
int ccc_global_init(struct lu_device_type *device_type)
{
	int result;

	result = lu_kmem_init(ccc_caches);
	if (result)
		return result;

	result = lu_device_type_init(device_type);
	if (result)
		goto out_kmem;

	ccc_inode_fini_env = cl_env_alloc(&dummy_refcheck,
					  LCT_REMEMBER | LCT_NOREF);
	if (IS_ERR(ccc_inode_fini_env)) {
		result = PTR_ERR(ccc_inode_fini_env);
		goto out_device;
	}

	ccc_inode_fini_env->le_ctx.lc_cookie = 0x4;
	return 0;
out_device:
	lu_device_type_fini(device_type);
out_kmem:
	lu_kmem_fini(ccc_caches);
	return result;
}
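
/*
 * Note (added for clarity, not in the original source): ccc_global_init()
 * unwinds in strict reverse order on failure (a failed environment
 * allocation tears down the device type, then the kmem caches), and
 * ccc_global_fini() below releases the same resources in the same order.
 */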
void ccc_global_fini(struct lu_device_type *device_type)
{
	if (ccc_inode_fini_env != NULL) {
		cl_env_put(ccc_inode_fini_env, &dummy_refcheck);
		ccc_inode_fini_env = NULL;
	}
	lu_device_type_fini(device_type);
	lu_kmem_fini(ccc_caches);
}
/*****************************************************************************
 *
 * Object operations.
 *
 */
struct lu_object *ccc_object_alloc(const struct lu_env *env,
				   const struct lu_object_header *unused,
				   struct lu_device *dev,
				   const struct cl_object_operations *clops,
				   const struct lu_object_operations *luops)
{
	struct ccc_object *vob;
	struct lu_object  *obj;

	OBD_SLAB_ALLOC_PTR_GFP(vob, ccc_object_kmem, GFP_NOFS);
	if (vob != NULL) {
		struct cl_object_header *hdr;

		obj = ccc2lu(vob);
		hdr = &vob->cob_header;
		cl_object_header_init(hdr);
		lu_object_init(obj, &hdr->coh_lu, dev);
		lu_object_add_top(&hdr->coh_lu, obj);

		vob->cob_cl.co_ops = clops;
		obj->lo_ops = luops;
	} else
		obj = NULL;
	return obj;
}
int ccc_object_init0(const struct lu_env *env,
		     struct ccc_object *vob,
		     const struct cl_object_conf *conf)
{
	vob->cob_inode = conf->coc_inode;
	vob->cob_transient_pages = 0;
	cl_object_page_init(&vob->cob_cl, sizeof(struct ccc_page));
	return 0;
}
int ccc_object_init(const struct lu_env *env, struct lu_object *obj,
		    const struct lu_object_conf *conf)
{
	struct ccc_device *dev = lu2ccc_dev(obj->lo_dev);
	struct ccc_object *vob = lu2ccc(obj);
	struct lu_object  *below;
	struct lu_device  *under;
	int result;

	under = &dev->cdv_next->cd_lu_dev;
	below = under->ld_ops->ldo_object_alloc(env, obj->lo_header, under);
	if (below != NULL) {
		const struct cl_object_conf *cconf;

		cconf = lu2cl_conf(conf);
		INIT_LIST_HEAD(&vob->cob_pending_list);
		lu_object_add(obj, below);
		result = ccc_object_init0(env, vob, cconf);
	} else
		result = -ENOMEM;
	return result;
}
void ccc_object_free(const struct lu_env *env, struct lu_object *obj)
{
	struct ccc_object *vob = lu2ccc(obj);

	lu_object_fini(obj);
	lu_object_header_fini(obj->lo_header);
	OBD_SLAB_FREE_PTR(vob, ccc_object_kmem);
}
int ccc_lock_init(const struct lu_env *env,
		  struct cl_object *obj, struct cl_lock *lock,
		  const struct cl_io *unused,
		  const struct cl_lock_operations *lkops)
{
	struct ccc_lock *clk;
	int result;

	CLOBINVRNT(env, obj, ccc_object_invariant(obj));

	OBD_SLAB_ALLOC_PTR_GFP(clk, ccc_lock_kmem, GFP_NOFS);
	if (clk != NULL) {
		cl_lock_slice_add(lock, &clk->clk_cl, obj, lkops);
		result = 0;
	} else
		result = -ENOMEM;
	return result;
}
int ccc_attr_set(const struct lu_env *env, struct cl_object *obj,
		 const struct cl_attr *attr, unsigned valid)
{
	return 0;
}
int ccc_object_glimpse(const struct lu_env *env,
		       const struct cl_object *obj, struct ost_lvb *lvb)
{
	struct inode *inode = ccc_object_inode(obj);

	lvb->lvb_mtime = cl_inode_mtime(inode);
	lvb->lvb_atime = cl_inode_atime(inode);
	lvb->lvb_ctime = cl_inode_ctime(inode);
	/*
	 * LU-417: Add dirty pages block count lest i_blocks reports 0, some
	 * "cp" or "tar" on remote node may think it's a completely sparse file
	 * and skip it.
	 */
	if (lvb->lvb_size > 0 && lvb->lvb_blocks == 0)
		lvb->lvb_blocks = dirty_cnt(inode);
	return 0;
}
int ccc_conf_set(const struct lu_env *env, struct cl_object *obj,
		 const struct cl_object_conf *conf)
{
	/* TODO: destroy all pages attached to this object. */
	return 0;
}
static void ccc_object_size_lock(struct cl_object *obj)
{
	struct inode *inode = ccc_object_inode(obj);

	cl_isize_lock(inode);
	cl_object_attr_lock(obj);
}

static void ccc_object_size_unlock(struct cl_object *obj)
{
	struct inode *inode = ccc_object_inode(obj);

	cl_object_attr_unlock(obj);
	cl_isize_unlock(inode);
}
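
/*
 * Note (added for clarity, not in the original source): the two helpers
 * above establish a fixed lock ordering: cl_isize_lock() is taken before
 * cl_object_attr_lock(), and the pair is released in the opposite order,
 * so inode size and cl_object attributes always change together.
 */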
/*****************************************************************************
 *
 * Page operations.
 *
 */
struct page *ccc_page_vmpage(const struct lu_env *env,
			     const struct cl_page_slice *slice)
{
	return cl2vm_page(slice);
}
int ccc_page_is_under_lock(const struct lu_env *env,
			   const struct cl_page_slice *slice,
			   struct cl_io *io)
{
	struct ccc_io        *cio  = ccc_env_io(env);
	struct cl_lock_descr *desc = &ccc_env_info(env)->cti_descr;
	struct cl_page       *page = slice->cpl_page;
	int result;

	if (io->ci_type == CIT_READ || io->ci_type == CIT_WRITE ||
	    io->ci_type == CIT_FAULT) {
		if (cio->cui_fd->fd_flags & LL_FILE_GROUP_LOCKED)
			result = -EBUSY;
		else {
			desc->cld_start = page->cp_index;
			desc->cld_end   = page->cp_index;
			desc->cld_obj   = page->cp_obj;
			desc->cld_mode  = CLM_READ;
			result = cl_queue_match(&io->ci_lockset.cls_done,
						desc) ? -EBUSY : 0;
		}
	} else
		result = 0;
	return result;
}
int ccc_fail(const struct lu_env *env, const struct cl_page_slice *slice)
{
	/*
	 * Cached read?
	 */
	LBUG();
	return 0;
}
void ccc_transient_page_verify(const struct cl_page *page)
{
}

int ccc_transient_page_own(const struct lu_env *env,
			   const struct cl_page_slice *slice,
			   struct cl_io *unused,
			   int nonblock)
{
	ccc_transient_page_verify(slice->cpl_page);
	return 0;
}

void ccc_transient_page_assume(const struct lu_env *env,
			       const struct cl_page_slice *slice,
			       struct cl_io *unused)
{
	ccc_transient_page_verify(slice->cpl_page);
}

void ccc_transient_page_unassume(const struct lu_env *env,
				 const struct cl_page_slice *slice,
				 struct cl_io *unused)
{
	ccc_transient_page_verify(slice->cpl_page);
}

void ccc_transient_page_disown(const struct lu_env *env,
			       const struct cl_page_slice *slice,
			       struct cl_io *unused)
{
	ccc_transient_page_verify(slice->cpl_page);
}

void ccc_transient_page_discard(const struct lu_env *env,
				const struct cl_page_slice *slice,
				struct cl_io *unused)
{
	struct cl_page *page = slice->cpl_page;

	ccc_transient_page_verify(slice->cpl_page);

	/*
	 * For transient pages, remove it from the radix tree.
	 */
	cl_page_delete(env, page);
}

int ccc_transient_page_prep(const struct lu_env *env,
			    const struct cl_page_slice *slice,
			    struct cl_io *unused)
{
	/* transient page should always be sent. */
	return 0;
}
/*****************************************************************************
 *
 * Lock operations.
 *
 */
void ccc_lock_delete(const struct lu_env *env,
		     const struct cl_lock_slice *slice)
{
	CLOBINVRNT(env, slice->cls_obj, ccc_object_invariant(slice->cls_obj));
}

void ccc_lock_fini(const struct lu_env *env, struct cl_lock_slice *slice)
{
	struct ccc_lock *clk = cl2ccc_lock(slice);

	CLOBINVRNT(env, slice->cls_obj, ccc_object_invariant(slice->cls_obj));
	OBD_SLAB_FREE_PTR(clk, ccc_lock_kmem);
}

int ccc_lock_enqueue(const struct lu_env *env,
		     const struct cl_lock_slice *slice,
		     struct cl_io *unused, __u32 enqflags)
{
	CLOBINVRNT(env, slice->cls_obj, ccc_object_invariant(slice->cls_obj));
	return 0;
}

int ccc_lock_unuse(const struct lu_env *env, const struct cl_lock_slice *slice)
{
	CLOBINVRNT(env, slice->cls_obj, ccc_object_invariant(slice->cls_obj));
	return 0;
}

int ccc_lock_wait(const struct lu_env *env, const struct cl_lock_slice *slice)
{
	CLOBINVRNT(env, slice->cls_obj, ccc_object_invariant(slice->cls_obj));
	return 0;
}
/**
 * Implementation of cl_lock_operations::clo_fits_into() methods for ccc
 * layer. This function is executed every time io finds an existing lock in
 * the lock cache while creating new lock. This function has to decide whether
 * cached lock "fits" into io.
 *
 * \param slice lock to be checked
 * \param io    IO that wants a lock.
 *
 * \see lov_lock_fits_into().
 */
int ccc_lock_fits_into(const struct lu_env *env,
		       const struct cl_lock_slice *slice,
		       const struct cl_lock_descr *need,
		       const struct cl_io *io)
{
	const struct cl_lock       *lock  = slice->cls_lock;
	const struct cl_lock_descr *descr = &lock->cll_descr;
	const struct ccc_io        *cio   = ccc_env_io(env);
	int result;

	/*
	 * Work around DLM peculiarity: it assumes that glimpse
	 * (LDLM_FL_HAS_INTENT) lock is always LCK_PR, and returns reads lock
	 * when asked for LCK_PW lock with LDLM_FL_HAS_INTENT flag set. Make
	 * sure that glimpse doesn't get CLM_WRITE top-lock, so that it
	 * doesn't enqueue CLM_WRITE sub-locks.
	 */
	if (cio->cui_glimpse)
		result = descr->cld_mode != CLM_WRITE;
	/*
	 * Also, don't match incomplete write locks for read, otherwise read
	 * would enqueue missing sub-locks in the write mode.
	 */
	else if (need->cld_mode != descr->cld_mode)
		result = lock->cll_state >= CLS_ENQUEUED;
	else
		result = 1;
	return result;
}
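
/*
 * Illustrative example (not from the original source): if the cached lock
 * is CLM_WRITE and still in CLS_QUEUING, a CLM_READ io falls into the
 * "need->cld_mode != descr->cld_mode" branch and gets
 * result = (CLS_QUEUING >= CLS_ENQUEUED) = 0, i.e. the incomplete write
 * lock is not matched and the read enqueues a lock of its own; a glimpse
 * io matches any cached lock except a CLM_WRITE one.
 */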
/**
 * Implements cl_lock_operations::clo_state() method for ccc layer, invoked
 * whenever lock state changes. Transfers object attributes, that might be
 * updated as a result of lock acquiring, into inode.
 */
void ccc_lock_state(const struct lu_env *env,
		    const struct cl_lock_slice *slice,
		    enum cl_lock_state state)
{
	struct cl_lock *lock = slice->cls_lock;

	/*
	 * Refresh inode attributes when the lock is moving into CLS_HELD
	 * state, and only when this is a result of real enqueue, rather than
	 * of finding lock in the cache.
	 */
	if (state == CLS_HELD && lock->cll_state < CLS_HELD) {
		struct cl_object *obj;
		struct inode     *inode;

		obj = slice->cls_obj;
		inode = ccc_object_inode(obj);

		/* vmtruncate() sets the i_size under both a DLM lock and the
		 * ll_inode_size_lock(). If we don't get the
		 * ll_inode_size_lock() here we can match the DLM lock and
		 * reset i_size. generic_file_write can then trust the
		 * stale i_size when doing appending writes and effectively
		 * cancel the result of the truncate. Getting the
		 * ll_inode_size_lock() after the enqueue maintains the DLM
		 * -> ll_inode_size_lock() acquiring order. */
		if (lock->cll_descr.cld_start == 0 &&
		    lock->cll_descr.cld_end == CL_PAGE_EOF)
			cl_merge_lvb(env, inode);
	}
}
/*****************************************************************************
 *
 * io operations.
 *
 */
void ccc_io_fini(const struct lu_env *env, const struct cl_io_slice *ios)
{
	struct cl_io *io = ios->cis_io;

	CLOBINVRNT(env, io->ci_obj, ccc_object_invariant(io->ci_obj));
}
int ccc_io_one_lock_index(const struct lu_env *env, struct cl_io *io,
			  __u32 enqflags, enum cl_lock_mode mode,
			  pgoff_t start, pgoff_t end)
{
	struct ccc_io        *cio   = ccc_env_io(env);
	struct cl_lock_descr *descr = &cio->cui_link.cill_descr;
	struct cl_object     *obj   = io->ci_obj;

	CLOBINVRNT(env, obj, ccc_object_invariant(obj));

	CDEBUG(D_VFSTRACE, "lock: %d [%lu, %lu]\n", mode, start, end);

	memset(&cio->cui_link, 0, sizeof(cio->cui_link));

	if (cio->cui_fd && (cio->cui_fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
		descr->cld_mode = CLM_GROUP;
		descr->cld_gid  = cio->cui_fd->fd_grouplock.cg_gid;
	} else {
		descr->cld_mode = mode;
	}
	descr->cld_obj   = obj;
	descr->cld_start = start;
	descr->cld_end   = end;
	descr->cld_enq_flags = enqflags;

	cl_io_lock_add(env, io, &cio->cui_link);
	return 0;
}
void ccc_io_update_iov(const struct lu_env *env,
		       struct ccc_io *cio, struct cl_io *io)
{
	size_t size = io->u.ci_rw.crw_count;

	if (!cl_is_normalio(env, io) || cio->cui_iter == NULL)
		return;

	iov_iter_truncate(cio->cui_iter, size);
}
int ccc_io_one_lock(const struct lu_env *env, struct cl_io *io,
		    __u32 enqflags, enum cl_lock_mode mode,
		    loff_t start, loff_t end)
{
	struct cl_object *obj = io->ci_obj;

	return ccc_io_one_lock_index(env, io, enqflags, mode,
				     cl_index(obj, start), cl_index(obj, end));
}
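
/*
 * Note (added for clarity, not in the original source): ccc_io_one_lock()
 * is the byte-oriented wrapper; it converts the [start, end] byte offsets
 * into page indices with cl_index() and delegates to the page-oriented
 * ccc_io_one_lock_index() above.
 */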
void ccc_io_end(const struct lu_env *env, const struct cl_io_slice *ios)
{
	CLOBINVRNT(env, ios->cis_io->ci_obj,
		   ccc_object_invariant(ios->cis_io->ci_obj));
}
void ccc_io_advance(const struct lu_env *env,
		    const struct cl_io_slice *ios,
		    size_t nob)
{
	struct ccc_io    *cio = cl2ccc_io(env, ios);
	struct cl_io     *io  = ios->cis_io;
	struct cl_object *obj = ios->cis_io->ci_obj;

	CLOBINVRNT(env, obj, ccc_object_invariant(obj));

	if (!cl_is_normalio(env, io))
		return;

	iov_iter_reexpand(cio->cui_iter, cio->cui_tot_count -= nob);
}
/**
 * Helper function that if necessary adjusts file size (inode->i_size), when
 * position at the offset \a pos is accessed. File size can be arbitrary stale
 * on a Lustre client, but client at least knows KMS. If accessed area is
 * inside [0, KMS], set file size to KMS, otherwise glimpse file size.
 *
 * Locking: cl_isize_lock is used to serialize changes to inode size and to
 * protect consistency between inode size and cl_object
 * attributes. cl_object_size_lock() protects consistency between cl_attr's of
 * top-object and sub-objects.
 */
int ccc_prep_size(const struct lu_env *env, struct cl_object *obj,
		  struct cl_io *io, loff_t start, size_t count, int *exceed)
{
	struct cl_attr *attr  = ccc_env_thread_attr(env);
	struct inode   *inode = ccc_object_inode(obj);
	loff_t pos = start + count - 1;
	loff_t kms;
	int result;

	/*
	 * Consistency guarantees: following possibilities exist for the
	 * relation between region being accessed and real file size at this
	 * moment:
	 *
	 *  (A): the region is completely inside of the file;
	 *
	 *  (B-x): x bytes of region are inside of the file, the rest is
	 *  outside;
	 *
	 *  (C): the region is completely outside of the file.
	 *
	 * This classification is stable under DLM lock already acquired by
	 * the caller, because to change the class, other client has to take
	 * DLM lock conflicting with our lock. Also, any updates to ->i_size
	 * by other threads on this client are serialized by
	 * ll_inode_size_lock(). This guarantees that short reads are handled
	 * correctly in the face of concurrent writes and truncates.
	 */
	ccc_object_size_lock(obj);
	result = cl_object_attr_get(env, obj, attr);
	if (result == 0) {
		kms = attr->cat_kms;
		if (pos > kms) {
			/*
			 * A glimpse is necessary to determine whether we
			 * return a short read (B) or some zeroes at the end
			 * of the buffer (C).
			 */
			ccc_object_size_unlock(obj);
			result = cl_glimpse_lock(env, io, inode, obj, 0);
			if (result == 0 && exceed != NULL) {
				/* If objective page index exceed end-of-file
				 * page index, return directly. Do not expect
				 * kernel will check such case correctly.
				 * linux-2.6.18-128.1.1 miss to do that. */
				loff_t size = cl_isize_read(inode);
				loff_t cur_index = start >> PAGE_CACHE_SHIFT;
				loff_t size_index = ((size - 1) >>
						     PAGE_CACHE_SHIFT);

				if ((size == 0 && cur_index != 0) ||
				    size_index < cur_index)
					*exceed = 1;
			}
			return result;
		}
		/*
		 * region is within kms and, hence, within real file
		 * size (A). We need to increase i_size to cover the
		 * read region so that generic_file_read() will do its
		 * job, but that doesn't mean the kms size is
		 * _correct_, it is only the _minimum_ size. If
		 * someone does a stat they will get the correct size
		 * which will always be >= the kms value here.
		 */
		if (cl_isize_read(inode) < kms) {
			cl_isize_write_nolock(inode, kms);
			CDEBUG(D_VFSTRACE,
			       DFID" updating i_size "LPU64"\n",
			       PFID(lu_object_fid(&obj->co_lu)),
			       (__u64)cl_isize_read(inode));
		}
	}
	ccc_object_size_unlock(obj);
	return result;
}
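
/*
 * Illustrative example (not from the original source): with kms == 8192, a
 * read of [4096, 12287] has pos == 12287 > kms, so the size lock is dropped
 * and the file is glimpsed to distinguish a short read (B) from a read
 * entirely beyond end of file (C). A read of [0, 4095] has pos <= kms, so
 * i_size is simply raised to kms (case A, if it was smaller) without any
 * server round-trip.
 */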
/*****************************************************************************
 *
 * Transfer operations.
 *
 */
void ccc_req_completion(const struct lu_env *env,
			const struct cl_req_slice *slice, int ioret)
{
	struct ccc_req *vrq;

	if (ioret > 0)
		cl_stats_tally(slice->crs_dev, slice->crs_req->crq_type, ioret);

	vrq = cl2ccc_req(slice);
	OBD_SLAB_FREE_PTR(vrq, ccc_req_kmem);
}
/**
 * Implementation of struct cl_req_operations::cro_attr_set() for ccc
 * layer. ccc is responsible for o_[mac]time, o_mode, o_parent_seq,
 * o_[ug]id, o_parent_oid, o_parent_ver, o_ioepoch, and capability.
 */
void ccc_req_attr_set(const struct lu_env *env,
		      const struct cl_req_slice *slice,
		      const struct cl_object *obj,
		      struct cl_req_attr *attr, obd_valid flags)
{
	struct inode *inode;
	struct obdo  *oa;
	obd_flag      valid_flags;

	oa = attr->cra_oa;
	inode = ccc_object_inode(obj);
	valid_flags = OBD_MD_FLTYPE;

	if ((flags & OBD_MD_FLOSSCAPA) != 0) {
		LASSERT(attr->cra_capa == NULL);
		attr->cra_capa = cl_capa_lookup(inode,
						slice->crs_req->crq_type);
	}

	if (slice->crs_req->crq_type == CRT_WRITE) {
		if (flags & OBD_MD_FLEPOCH) {
			oa->o_valid |= OBD_MD_FLEPOCH;
			oa->o_ioepoch = cl_i2info(inode)->lli_ioepoch;
			valid_flags |= OBD_MD_FLMTIME | OBD_MD_FLCTIME |
				       OBD_MD_FLUID | OBD_MD_FLGID;
		}
	}
	obdo_from_inode(oa, inode, valid_flags & flags);
	obdo_set_parent_fid(oa, &cl_i2info(inode)->lli_fid);
	memcpy(attr->cra_jobid, cl_i2info(inode)->lli_jobid,
	       JOBSTATS_JOBID_SIZE);
}
static const struct cl_req_operations ccc_req_ops = {
	.cro_attr_set   = ccc_req_attr_set,
	.cro_completion = ccc_req_completion
};
int cl_setattr_ost(struct inode *inode, const struct iattr *attr,
		   struct obd_capa *capa)
{
	struct lu_env *env;
	struct cl_io  *io;
	int result;
	int refcheck;

	env = cl_env_get(&refcheck);
	if (IS_ERR(env))
		return PTR_ERR(env);

	io = ccc_env_thread_io(env);
	io->ci_obj = cl_i2info(inode)->lli_clob;

	io->u.ci_setattr.sa_attr.lvb_atime = LTIME_S(attr->ia_atime);
	io->u.ci_setattr.sa_attr.lvb_mtime = LTIME_S(attr->ia_mtime);
	io->u.ci_setattr.sa_attr.lvb_ctime = LTIME_S(attr->ia_ctime);
	io->u.ci_setattr.sa_attr.lvb_size = attr->ia_size;
	io->u.ci_setattr.sa_valid = attr->ia_valid;
	io->u.ci_setattr.sa_capa = capa;

again:
	if (cl_io_init(env, io, CIT_SETATTR, io->ci_obj) == 0) {
		struct ccc_io *cio = ccc_env_io(env);

		if (attr->ia_valid & ATTR_FILE)
			/* populate the file descriptor for ftruncate to honor
			 * group lock - see LU-787 */
			cio->cui_fd = cl_iattr2fd(inode, attr);

		result = cl_io_loop(env, io);
	} else
		result = io->ci_result;
	cl_io_fini(env, io);
	if (unlikely(io->ci_need_restart))
		goto again;
	/* HSM import case: file is released, cannot be restored;
	 * no need to fail except if restore registration failed
	 * with -ENODATA */
	if (result == -ENODATA && io->ci_restore_needed &&
	    io->ci_result != -ENODATA)
		result = 0;
	cl_env_put(env, &refcheck);
	return result;
}
/*****************************************************************************
 *
 * Type conversions.
 *
 */
struct lu_device *ccc2lu_dev(struct ccc_device *vdv)
{
	return &vdv->cdv_cl.cd_lu_dev;
}

struct ccc_device *lu2ccc_dev(const struct lu_device *d)
{
	return container_of0(d, struct ccc_device, cdv_cl.cd_lu_dev);
}

struct ccc_device *cl2ccc_dev(const struct cl_device *d)
{
	return container_of0(d, struct ccc_device, cdv_cl);
}

struct lu_object *ccc2lu(struct ccc_object *vob)
{
	return &vob->cob_cl.co_lu;
}

struct ccc_object *lu2ccc(const struct lu_object *obj)
{
	return container_of0(obj, struct ccc_object, cob_cl.co_lu);
}

struct ccc_object *cl2ccc(const struct cl_object *obj)
{
	return container_of0(obj, struct ccc_object, cob_cl);
}

struct ccc_lock *cl2ccc_lock(const struct cl_lock_slice *slice)
{
	return container_of(slice, struct ccc_lock, clk_cl);
}

struct ccc_io *cl2ccc_io(const struct lu_env *env,
			 const struct cl_io_slice *slice)
{
	struct ccc_io *cio;

	cio = container_of(slice, struct ccc_io, cui_cl);
	LASSERT(cio == ccc_env_io(env));
	return cio;
}

struct ccc_req *cl2ccc_req(const struct cl_req_slice *slice)
{
	return container_of0(slice, struct ccc_req, crq_cl);
}

struct page *cl2vm_page(const struct cl_page_slice *slice)
{
	return cl2ccc_page(slice)->cpg_page;
}
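
/*
 * Note (added for clarity, not in the original source): the conversions
 * above use container_of0() where the input may legitimately be NULL or an
 * ERR_PTR() value; Lustre's container_of0() passes such pointers through
 * unchanged instead of computing a member offset from them. Plain
 * container_of() is used where the slice is known to be valid.
 */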
/*****************************************************************************
 *
 * Accessors.
 *
 */
int ccc_object_invariant(const struct cl_object *obj)
{
	struct inode         *inode = ccc_object_inode(obj);
	struct cl_inode_info *lli   = cl_i2info(inode);

	return (S_ISREG(cl_inode_mode(inode)) ||
		/* i_mode of unlinked inode is zeroed. */
		cl_inode_mode(inode) == 0) && lli->lli_clob == obj;
}
struct inode *ccc_object_inode(const struct cl_object *obj)
{
	return cl2ccc(obj)->cob_inode;
}
/**
 * Returns a pointer to cl_page associated with \a vmpage, without acquiring
 * additional reference to the resulting page. This is an unsafe version of
 * cl_vmpage_page() that can only be used under vmpage lock.
 */
struct cl_page *ccc_vmpage_page_transient(struct page *vmpage)
{
	KLASSERT(PageLocked(vmpage));
	return (struct cl_page *)vmpage->private;
}
/**
 * Initialize or update CLIO structures for regular files when new
 * meta-data arrives from the server.
 *
 * \param inode regular file inode
 * \param md    new file metadata from MDS
 * - allocates cl_object if necessary,
 * - updates layout, if object was already here.
 */
int cl_file_inode_init(struct inode *inode, struct lustre_md *md)
{
	struct lu_env        *env;
	struct cl_inode_info *lli;
	struct cl_object     *clob;
	struct lu_site       *site;
	struct lu_fid        *fid;
	struct cl_object_conf conf = {
		.coc_inode = inode,
		.u = {
			.coc_md = md
		}
	};
	int result = 0;
	int refcheck;

	LASSERT(md->body->valid & OBD_MD_FLID);
	LASSERT(S_ISREG(cl_inode_mode(inode)));

	env = cl_env_get(&refcheck);
	if (IS_ERR(env))
		return PTR_ERR(env);

	site = cl_i2sbi(inode)->ll_site;
	lli  = cl_i2info(inode);
	fid  = &lli->lli_fid;
	LASSERT(fid_is_sane(fid));

	if (lli->lli_clob == NULL) {
		/* clob is slave of inode, empty lli_clob means for new inode,
		 * there is no clob in cache with the given fid, so it is
		 * unnecessary to perform lookup-alloc-lookup-insert, just
		 * alloc and insert directly. */
		LASSERT(inode->i_state & I_NEW);
		conf.coc_lu.loc_flags = LOC_F_NEW;
		clob = cl_object_find(env, lu2cl_dev(site->ls_top_dev),
				      fid, &conf);
		if (!IS_ERR(clob)) {
			/*
			 * No locking is necessary, as new inode is
			 * locked by I_NEW bit.
			 */
			lli->lli_clob = clob;
			lli->lli_has_smd = lsm_has_objects(md->lsm);
			lu_object_ref_add(&clob->co_lu, "inode", inode);
		} else
			result = PTR_ERR(clob);
	} else
		result = cl_conf_set(env, lli->lli_clob, &conf);

	cl_env_put(env, &refcheck);

	if (result != 0)
		CERROR("Failure to initialize cl object "DFID": %d\n",
		       PFID(fid), result);
	return result;
}
/**
 * Wait for others to drop their references to the object first, then drop
 * the last one, which will lead to the object being destroyed immediately.
 * Must be called after cl_object_kill() against this object.
 *
 * The reason we want to do this is: destroying the top object will wait for
 * sub-objects being destroyed first, so we can't let the bottom layer (e.g.
 * from ASTs) initiate top object destruction, which may deadlock. See bz22520.
 */
static void cl_object_put_last(struct lu_env *env, struct cl_object *obj)
{
	struct lu_object_header *header = obj->co_lu.lo_header;
	wait_queue_t waiter;

	if (unlikely(atomic_read(&header->loh_ref) != 1)) {
		struct lu_site *site = obj->co_lu.lo_dev->ld_site;
		struct lu_site_bkt_data *bkt;

		bkt = lu_site_bkt_from_fid(site, &header->loh_fid);

		init_waitqueue_entry(&waiter, current);
		add_wait_queue(&bkt->lsb_marche_funebre, &waiter);

		while (1) {
			set_current_state(TASK_UNINTERRUPTIBLE);
			if (atomic_read(&header->loh_ref) == 1)
				break;
			schedule();
		}

		set_current_state(TASK_RUNNING);
		remove_wait_queue(&bkt->lsb_marche_funebre, &waiter);
	}

	cl_object_put(env, obj);
}
void cl_inode_fini(struct inode *inode)
{
	struct lu_env        *env;
	struct cl_inode_info *lli  = cl_i2info(inode);
	struct cl_object     *clob = lli->lli_clob;
	int refcheck;
	int emergency;

	if (clob != NULL) {
		void *cookie;

		cookie = cl_env_reenter();
		env = cl_env_get(&refcheck);
		emergency = IS_ERR(env);
		if (emergency) {
			mutex_lock(&ccc_inode_fini_guard);
			LASSERT(ccc_inode_fini_env != NULL);
			cl_env_implant(ccc_inode_fini_env, &refcheck);
			env = ccc_inode_fini_env;
		}
		/*
		 * cl_object cache is a slave to inode cache (which, in turn
		 * is a slave to dentry cache), don't keep cl_object in memory
		 * when its master is evicted.
		 */
		cl_object_kill(env, clob);
		lu_object_ref_del(&clob->co_lu, "inode", inode);
		cl_object_put_last(env, clob);
		lli->lli_clob = NULL;
		if (emergency) {
			cl_env_unplant(ccc_inode_fini_env, &refcheck);
			mutex_unlock(&ccc_inode_fini_guard);
		} else
			cl_env_put(env, &refcheck);
		cl_env_reexit(cookie);
	}
}
/**
 * return IF_* type for given lu_dirent entry.
 * IF_* flag should be converted to particular OS file type in
 * platform llite module.
 */
__u16 ll_dirent_type_get(struct lu_dirent *ent)
{
	__u16 type = 0;
	struct luda_type *lt;
	int len = 0;

	if (le32_to_cpu(ent->lde_attrs) & LUDA_TYPE) {
		const unsigned align = sizeof(struct luda_type) - 1;

		len = le16_to_cpu(ent->lde_namelen);
		len = (len + align) & ~align;
		lt = (void *)ent->lde_name + len;
		type = IFTODT(le16_to_cpu(lt->lt_type));
	}
	return type;
}
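
/*
 * Illustrative example (not from the original source): struct luda_type
 * holds a single __u16, so align == 1 and a 5-byte name gives
 * len = (5 + 1) & ~1 = 6, i.e. the name length rounded up to the next
 * 2-byte boundary, placing the luda_type record at an even offset right
 * after the name.
 */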
/**
 * build inode number from passed @fid
 */
__u64 cl_fid_build_ino(const struct lu_fid *fid, int api32)
{
	if (BITS_PER_LONG == 32 || api32)
		return fid_flatten32(fid);

	return fid_flatten(fid);
}
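
/*
 * Note (added for clarity, not in the original source): fid_flatten()
 * condenses the 128-bit FID into a 64-bit inode number, while
 * fid_flatten32() squeezes it into 32 bits for 32-bit clients or when the
 * 32-bit API (api32) was requested, at a correspondingly higher collision
 * risk; cl_fid_build_gen() below supplies a generation number to
 * disambiguate such collisions.
 */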
/**
 * build inode generation from passed @fid. If our FID overflows the 32-bit
 * inode number then return a non-zero generation to distinguish them.
 */
__u32 cl_fid_build_gen(const struct lu_fid *fid)
{
	__u32 gen;

	if (fid_is_igif(fid)) {
		gen = lu_igif_gen(fid);
		return gen;
	}

	gen = (fid_flatten(fid) >> 32);
	return gen;
}
/* lsm is unreliable after hsm implementation as layout can be changed at
 * any time. This is only to support old, non-clio-ized interfaces. It will
 * cause deadlock if clio operations are called with this extra layout refcount
 * because in case the layout changed during the IO, ll_layout_refresh() will
 * have to wait for the refcount to become zero to destroy the older layout.
 *
 * Notice that the lsm returned by this function may not be valid unless called
 * inside layout lock - MDS_INODELOCK_LAYOUT. */
struct lov_stripe_md *ccc_inode_lsm_get(struct inode *inode)
{
	return lov_lsm_get(cl_i2info(inode)->lli_clob);
}

inline void ccc_inode_lsm_put(struct inode *inode, struct lov_stripe_md *lsm)
{
	lov_lsm_put(cl_i2info(inode)->lli_clob, lsm);
}
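
/*
 * Illustrative usage sketch (not from the original source): callers are
 * expected to pair the get/put above, e.g.:
 *
 *	struct lov_stripe_md *lsm = ccc_inode_lsm_get(inode);
 *
 *	if (lsm != NULL) {
 *		... inspect the striping, under the layout lock ...
 *		ccc_inode_lsm_put(inode, lsm);
 *	}
 */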