4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_ECHO
38 #include "../../include/linux/libcfs/libcfs.h"
40 #include "../include/obd.h"
41 #include "../include/obd_support.h"
42 #include "../include/obd_class.h"
43 #include "../include/lustre_debug.h"
44 #include "../include/lprocfs_status.h"
45 #include "../include/cl_object.h"
46 #include "../include/lustre_fid.h"
47 #include "../include/lustre_acl.h"
48 #include "../include/lustre_net.h"
50 #include "echo_internal.h"
52 /** \defgroup echo_client Echo Client
57 struct cl_device ed_cl
;
58 struct echo_client_obd
*ed_ec
;
60 struct cl_site ed_site_myself
;
61 struct cl_site
*ed_site
;
62 struct lu_device
*ed_next
;
67 struct cl_object eo_cl
;
68 struct cl_object_header eo_hdr
;
70 struct echo_device
*eo_dev
;
71 struct list_head eo_obj_chain
;
72 struct lov_stripe_md
*eo_lsm
;
77 struct echo_object_conf
{
78 struct cl_object_conf eoc_cl
;
79 struct lov_stripe_md
**eoc_md
;
83 struct cl_page_slice ep_cl
;
85 struct page
*ep_vmpage
;
89 struct cl_lock_slice el_cl
;
90 struct list_head el_chain
;
91 struct echo_object
*el_object
;
96 static int echo_client_setup(const struct lu_env
*env
,
97 struct obd_device
*obddev
,
98 struct lustre_cfg
*lcfg
);
99 static int echo_client_cleanup(struct obd_device
*obddev
);
101 /** \defgroup echo_helpers Helper functions
104 static inline struct echo_device
*cl2echo_dev(const struct cl_device
*dev
)
106 return container_of0(dev
, struct echo_device
, ed_cl
);
109 static inline struct cl_device
*echo_dev2cl(struct echo_device
*d
)
114 static inline struct echo_device
*obd2echo_dev(const struct obd_device
*obd
)
116 return cl2echo_dev(lu2cl_dev(obd
->obd_lu_dev
));
119 static inline struct cl_object
*echo_obj2cl(struct echo_object
*eco
)
124 static inline struct echo_object
*cl2echo_obj(const struct cl_object
*o
)
126 return container_of(o
, struct echo_object
, eo_cl
);
129 static inline struct echo_page
*cl2echo_page(const struct cl_page_slice
*s
)
131 return container_of(s
, struct echo_page
, ep_cl
);
134 static inline struct echo_lock
*cl2echo_lock(const struct cl_lock_slice
*s
)
136 return container_of(s
, struct echo_lock
, el_cl
);
139 static inline struct cl_lock
*echo_lock2cl(const struct echo_lock
*ecl
)
141 return ecl
->el_cl
.cls_lock
;
144 static struct lu_context_key echo_thread_key
;
145 static inline struct echo_thread_info
*echo_env_info(const struct lu_env
*env
)
147 struct echo_thread_info
*info
;
149 info
= lu_context_key_get(&env
->le_ctx
, &echo_thread_key
);
150 LASSERT(info
!= NULL
);
155 struct echo_object_conf
*cl2echo_conf(const struct cl_object_conf
*c
)
157 return container_of(c
, struct echo_object_conf
, eoc_cl
);
160 /** @} echo_helpers */
162 static struct echo_object
*cl_echo_object_find(struct echo_device
*d
,
163 struct lov_stripe_md
**lsm
);
164 static int cl_echo_object_put(struct echo_object
*eco
);
165 static int cl_echo_enqueue(struct echo_object
*eco
, u64 start
,
166 u64 end
, int mode
, __u64
*cookie
);
167 static int cl_echo_cancel(struct echo_device
*d
, __u64 cookie
);
168 static int cl_echo_object_brw(struct echo_object
*eco
, int rw
, u64 offset
,
169 struct page
**pages
, int npages
, int async
);
171 static struct echo_thread_info
*echo_env_info(const struct lu_env
*env
);
173 struct echo_thread_info
{
174 struct echo_object_conf eti_conf
;
175 struct lustre_md eti_md
;
177 struct cl_2queue eti_queue
;
179 struct cl_lock_descr eti_descr
;
180 struct lu_fid eti_fid
;
181 struct lu_fid eti_fid2
;
184 /* No session used right now */
185 struct echo_session_info
{
189 static struct kmem_cache
*echo_lock_kmem
;
190 static struct kmem_cache
*echo_object_kmem
;
191 static struct kmem_cache
*echo_thread_kmem
;
192 static struct kmem_cache
*echo_session_kmem
;
194 static struct lu_kmem_descr echo_caches
[] = {
196 .ckd_cache
= &echo_lock_kmem
,
197 .ckd_name
= "echo_lock_kmem",
198 .ckd_size
= sizeof(struct echo_lock
)
201 .ckd_cache
= &echo_object_kmem
,
202 .ckd_name
= "echo_object_kmem",
203 .ckd_size
= sizeof(struct echo_object
)
206 .ckd_cache
= &echo_thread_kmem
,
207 .ckd_name
= "echo_thread_kmem",
208 .ckd_size
= sizeof(struct echo_thread_info
)
211 .ckd_cache
= &echo_session_kmem
,
212 .ckd_name
= "echo_session_kmem",
213 .ckd_size
= sizeof(struct echo_session_info
)
220 /** \defgroup echo_page Page operations
222 * Echo page operations.
226 static struct page
*echo_page_vmpage(const struct lu_env
*env
,
227 const struct cl_page_slice
*slice
)
229 return cl2echo_page(slice
)->ep_vmpage
;
232 static int echo_page_own(const struct lu_env
*env
,
233 const struct cl_page_slice
*slice
,
234 struct cl_io
*io
, int nonblock
)
236 struct echo_page
*ep
= cl2echo_page(slice
);
239 mutex_lock(&ep
->ep_lock
);
240 else if (!mutex_trylock(&ep
->ep_lock
))
245 static void echo_page_disown(const struct lu_env
*env
,
246 const struct cl_page_slice
*slice
,
249 struct echo_page
*ep
= cl2echo_page(slice
);
251 LASSERT(mutex_is_locked(&ep
->ep_lock
));
252 mutex_unlock(&ep
->ep_lock
);
255 static void echo_page_discard(const struct lu_env
*env
,
256 const struct cl_page_slice
*slice
,
257 struct cl_io
*unused
)
259 cl_page_delete(env
, slice
->cpl_page
);
262 static int echo_page_is_vmlocked(const struct lu_env
*env
,
263 const struct cl_page_slice
*slice
)
265 if (mutex_is_locked(&cl2echo_page(slice
)->ep_lock
))
270 static void echo_page_completion(const struct lu_env
*env
,
271 const struct cl_page_slice
*slice
,
274 LASSERT(slice
->cpl_page
->cp_sync_io
!= NULL
);
277 static void echo_page_fini(const struct lu_env
*env
,
278 struct cl_page_slice
*slice
)
280 struct echo_page
*ep
= cl2echo_page(slice
);
281 struct echo_object
*eco
= cl2echo_obj(slice
->cpl_obj
);
282 struct page
*vmpage
= ep
->ep_vmpage
;
284 atomic_dec(&eco
->eo_npages
);
285 page_cache_release(vmpage
);
288 static int echo_page_prep(const struct lu_env
*env
,
289 const struct cl_page_slice
*slice
,
290 struct cl_io
*unused
)
295 static int echo_page_print(const struct lu_env
*env
,
296 const struct cl_page_slice
*slice
,
297 void *cookie
, lu_printer_t printer
)
299 struct echo_page
*ep
= cl2echo_page(slice
);
301 (*printer
)(env
, cookie
, LUSTRE_ECHO_CLIENT_NAME
"-page@%p %d vm@%p\n",
302 ep
, mutex_is_locked(&ep
->ep_lock
), ep
->ep_vmpage
);
306 static const struct cl_page_operations echo_page_ops
= {
307 .cpo_own
= echo_page_own
,
308 .cpo_disown
= echo_page_disown
,
309 .cpo_discard
= echo_page_discard
,
310 .cpo_vmpage
= echo_page_vmpage
,
311 .cpo_fini
= echo_page_fini
,
312 .cpo_print
= echo_page_print
,
313 .cpo_is_vmlocked
= echo_page_is_vmlocked
,
316 .cpo_prep
= echo_page_prep
,
317 .cpo_completion
= echo_page_completion
,
320 .cpo_prep
= echo_page_prep
,
321 .cpo_completion
= echo_page_completion
,
328 /** \defgroup echo_lock Locking
330 * echo lock operations
334 static void echo_lock_fini(const struct lu_env
*env
,
335 struct cl_lock_slice
*slice
)
337 struct echo_lock
*ecl
= cl2echo_lock(slice
);
339 LASSERT(list_empty(&ecl
->el_chain
));
340 kmem_cache_free(echo_lock_kmem
, ecl
);
343 static void echo_lock_delete(const struct lu_env
*env
,
344 const struct cl_lock_slice
*slice
)
346 struct echo_lock
*ecl
= cl2echo_lock(slice
);
348 LASSERT(list_empty(&ecl
->el_chain
));
351 static int echo_lock_fits_into(const struct lu_env
*env
,
352 const struct cl_lock_slice
*slice
,
353 const struct cl_lock_descr
*need
,
354 const struct cl_io
*unused
)
359 static struct cl_lock_operations echo_lock_ops
= {
360 .clo_fini
= echo_lock_fini
,
361 .clo_delete
= echo_lock_delete
,
362 .clo_fits_into
= echo_lock_fits_into
367 /** \defgroup echo_cl_ops cl_object operations
369 * operations for cl_object
373 static int echo_page_init(const struct lu_env
*env
, struct cl_object
*obj
,
374 struct cl_page
*page
, struct page
*vmpage
)
376 struct echo_page
*ep
= cl_object_page_slice(obj
, page
);
377 struct echo_object
*eco
= cl2echo_obj(obj
);
379 ep
->ep_vmpage
= vmpage
;
380 page_cache_get(vmpage
);
381 mutex_init(&ep
->ep_lock
);
382 cl_page_slice_add(page
, &ep
->ep_cl
, obj
, &echo_page_ops
);
383 atomic_inc(&eco
->eo_npages
);
387 static int echo_io_init(const struct lu_env
*env
, struct cl_object
*obj
,
393 static int echo_lock_init(const struct lu_env
*env
,
394 struct cl_object
*obj
, struct cl_lock
*lock
,
395 const struct cl_io
*unused
)
397 struct echo_lock
*el
;
399 el
= kmem_cache_alloc(echo_lock_kmem
, GFP_NOFS
| __GFP_ZERO
);
401 cl_lock_slice_add(lock
, &el
->el_cl
, obj
, &echo_lock_ops
);
402 el
->el_object
= cl2echo_obj(obj
);
403 INIT_LIST_HEAD(&el
->el_chain
);
404 atomic_set(&el
->el_refcount
, 0);
406 return el
== NULL
? -ENOMEM
: 0;
409 static int echo_conf_set(const struct lu_env
*env
, struct cl_object
*obj
,
410 const struct cl_object_conf
*conf
)
415 static const struct cl_object_operations echo_cl_obj_ops
= {
416 .coo_page_init
= echo_page_init
,
417 .coo_lock_init
= echo_lock_init
,
418 .coo_io_init
= echo_io_init
,
419 .coo_conf_set
= echo_conf_set
422 /** @} echo_cl_ops */
424 /** \defgroup echo_lu_ops lu_object operations
426 * operations for echo lu object.
430 static int echo_object_init(const struct lu_env
*env
, struct lu_object
*obj
,
431 const struct lu_object_conf
*conf
)
433 struct echo_device
*ed
= cl2echo_dev(lu2cl_dev(obj
->lo_dev
));
434 struct echo_client_obd
*ec
= ed
->ed_ec
;
435 struct echo_object
*eco
= cl2echo_obj(lu2cl(obj
));
436 const struct cl_object_conf
*cconf
;
437 struct echo_object_conf
*econf
;
440 struct lu_object
*below
;
441 struct lu_device
*under
;
444 below
= under
->ld_ops
->ldo_object_alloc(env
, obj
->lo_header
,
448 lu_object_add(obj
, below
);
451 cconf
= lu2cl_conf(conf
);
452 econf
= cl2echo_conf(cconf
);
454 LASSERT(econf
->eoc_md
);
455 eco
->eo_lsm
= *econf
->eoc_md
;
456 /* clear the lsm pointer so that it won't get freed. */
457 *econf
->eoc_md
= NULL
;
460 atomic_set(&eco
->eo_npages
, 0);
461 cl_object_page_init(lu2cl(obj
), sizeof(struct echo_page
));
463 spin_lock(&ec
->ec_lock
);
464 list_add_tail(&eco
->eo_obj_chain
, &ec
->ec_objects
);
465 spin_unlock(&ec
->ec_lock
);
470 /* taken from osc_unpackmd() */
471 static int echo_alloc_memmd(struct echo_device
*ed
,
472 struct lov_stripe_md
**lsmp
)
476 /* If export is lov/osc then use their obd method */
477 if (ed
->ed_next
!= NULL
)
478 return obd_alloc_memmd(ed
->ed_ec
->ec_exp
, lsmp
);
479 /* OFD has no unpackmd method, do everything here */
480 lsm_size
= lov_stripe_md_size(1);
482 LASSERT(*lsmp
== NULL
);
483 *lsmp
= kzalloc(lsm_size
, GFP_NOFS
);
487 (*lsmp
)->lsm_oinfo
[0] = kzalloc(sizeof(struct lov_oinfo
), GFP_NOFS
);
488 if (!(*lsmp
)->lsm_oinfo
[0]) {
493 loi_init((*lsmp
)->lsm_oinfo
[0]);
494 (*lsmp
)->lsm_maxbytes
= LUSTRE_STRIPE_MAXBYTES
;
495 ostid_set_seq_echo(&(*lsmp
)->lsm_oi
);
500 static int echo_free_memmd(struct echo_device
*ed
, struct lov_stripe_md
**lsmp
)
504 /* If export is lov/osc then use their obd method */
505 if (ed
->ed_next
!= NULL
)
506 return obd_free_memmd(ed
->ed_ec
->ec_exp
, lsmp
);
507 /* OFD has no unpackmd method, do everything here */
508 lsm_size
= lov_stripe_md_size(1);
510 LASSERT(*lsmp
!= NULL
);
511 kfree((*lsmp
)->lsm_oinfo
[0]);
517 static void echo_object_free(const struct lu_env
*env
, struct lu_object
*obj
)
519 struct echo_object
*eco
= cl2echo_obj(lu2cl(obj
));
520 struct echo_client_obd
*ec
= eco
->eo_dev
->ed_ec
;
522 LASSERT(atomic_read(&eco
->eo_npages
) == 0);
524 spin_lock(&ec
->ec_lock
);
525 list_del_init(&eco
->eo_obj_chain
);
526 spin_unlock(&ec
->ec_lock
);
529 lu_object_header_fini(obj
->lo_header
);
532 echo_free_memmd(eco
->eo_dev
, &eco
->eo_lsm
);
533 kmem_cache_free(echo_object_kmem
, eco
);
536 static int echo_object_print(const struct lu_env
*env
, void *cookie
,
537 lu_printer_t p
, const struct lu_object
*o
)
539 struct echo_object
*obj
= cl2echo_obj(lu2cl(o
));
541 return (*p
)(env
, cookie
, "echoclient-object@%p", obj
);
544 static const struct lu_object_operations echo_lu_obj_ops
= {
545 .loo_object_init
= echo_object_init
,
546 .loo_object_delete
= NULL
,
547 .loo_object_release
= NULL
,
548 .loo_object_free
= echo_object_free
,
549 .loo_object_print
= echo_object_print
,
550 .loo_object_invariant
= NULL
553 /** @} echo_lu_ops */
555 /** \defgroup echo_lu_dev_ops lu_device operations
557 * Operations for echo lu device.
561 static struct lu_object
*echo_object_alloc(const struct lu_env
*env
,
562 const struct lu_object_header
*hdr
,
563 struct lu_device
*dev
)
565 struct echo_object
*eco
;
566 struct lu_object
*obj
= NULL
;
568 /* we're the top dev. */
569 LASSERT(hdr
== NULL
);
570 eco
= kmem_cache_alloc(echo_object_kmem
, GFP_NOFS
| __GFP_ZERO
);
572 struct cl_object_header
*hdr
= &eco
->eo_hdr
;
574 obj
= &echo_obj2cl(eco
)->co_lu
;
575 cl_object_header_init(hdr
);
576 lu_object_init(obj
, &hdr
->coh_lu
, dev
);
577 lu_object_add_top(&hdr
->coh_lu
, obj
);
579 eco
->eo_cl
.co_ops
= &echo_cl_obj_ops
;
580 obj
->lo_ops
= &echo_lu_obj_ops
;
585 static struct lu_device_operations echo_device_lu_ops
= {
586 .ldo_object_alloc
= echo_object_alloc
,
589 /** @} echo_lu_dev_ops */
591 static struct cl_device_operations echo_device_cl_ops
= {
594 /** \defgroup echo_init Setup and teardown
596 * Init and fini functions for echo client.
600 static int echo_site_init(const struct lu_env
*env
, struct echo_device
*ed
)
602 struct cl_site
*site
= &ed
->ed_site_myself
;
605 /* initialize site */
606 rc
= cl_site_init(site
, &ed
->ed_cl
);
608 CERROR("Cannot initialize site for echo client(%d)\n", rc
);
612 rc
= lu_site_init_finish(&site
->cs_lu
);
620 static void echo_site_fini(const struct lu_env
*env
, struct echo_device
*ed
)
623 cl_site_fini(ed
->ed_site
);
628 static void *echo_thread_key_init(const struct lu_context
*ctx
,
629 struct lu_context_key
*key
)
631 struct echo_thread_info
*info
;
633 info
= kmem_cache_alloc(echo_thread_kmem
, GFP_NOFS
| __GFP_ZERO
);
635 info
= ERR_PTR(-ENOMEM
);
639 static void echo_thread_key_fini(const struct lu_context
*ctx
,
640 struct lu_context_key
*key
, void *data
)
642 struct echo_thread_info
*info
= data
;
644 kmem_cache_free(echo_thread_kmem
, info
);
/**
 * lct_exit method of echo_thread_key: intentionally a no-op.
 */
static void echo_thread_key_exit(const struct lu_context *ctx,
				 struct lu_context_key *key, void *data)
{
	/* nothing to do for the thread key */
}
652 static struct lu_context_key echo_thread_key
= {
653 .lct_tags
= LCT_CL_THREAD
,
654 .lct_init
= echo_thread_key_init
,
655 .lct_fini
= echo_thread_key_fini
,
656 .lct_exit
= echo_thread_key_exit
659 static void *echo_session_key_init(const struct lu_context
*ctx
,
660 struct lu_context_key
*key
)
662 struct echo_session_info
*session
;
664 session
= kmem_cache_alloc(echo_session_kmem
, GFP_NOFS
| __GFP_ZERO
);
666 session
= ERR_PTR(-ENOMEM
);
670 static void echo_session_key_fini(const struct lu_context
*ctx
,
671 struct lu_context_key
*key
, void *data
)
673 struct echo_session_info
*session
= data
;
675 kmem_cache_free(echo_session_kmem
, session
);
/**
 * lct_exit method of echo_session_key: intentionally a no-op.
 */
static void echo_session_key_exit(const struct lu_context *ctx,
				  struct lu_context_key *key, void *data)
{
	/* nothing to do for the session key */
}
683 static struct lu_context_key echo_session_key
= {
684 .lct_tags
= LCT_SESSION
,
685 .lct_init
= echo_session_key_init
,
686 .lct_fini
= echo_session_key_fini
,
687 .lct_exit
= echo_session_key_exit
690 LU_TYPE_INIT_FINI(echo
, &echo_thread_key
, &echo_session_key
);
692 static struct lu_device
*echo_device_alloc(const struct lu_env
*env
,
693 struct lu_device_type
*t
,
694 struct lustre_cfg
*cfg
)
696 struct lu_device
*next
;
697 struct echo_device
*ed
;
698 struct cl_device
*cd
;
699 struct obd_device
*obd
= NULL
; /* to keep compiler happy */
700 struct obd_device
*tgt
;
701 const char *tgt_type_name
;
705 ed
= kzalloc(sizeof(*ed
), GFP_NOFS
);
713 rc
= cl_device_init(cd
, t
);
717 cd
->cd_lu_dev
.ld_ops
= &echo_device_lu_ops
;
718 cd
->cd_ops
= &echo_device_cl_ops
;
721 obd
= class_name2obd(lustre_cfg_string(cfg
, 0));
722 LASSERT(obd
!= NULL
);
723 LASSERT(env
!= NULL
);
725 tgt
= class_name2obd(lustre_cfg_string(cfg
, 1));
727 CERROR("Can not find tgt device %s\n",
728 lustre_cfg_string(cfg
, 1));
733 next
= tgt
->obd_lu_dev
;
734 if (!strcmp(tgt
->obd_type
->typ_name
, LUSTRE_MDT_NAME
)) {
735 CERROR("echo MDT client must be run on server\n");
740 rc
= echo_site_init(env
, ed
);
746 rc
= echo_client_setup(env
, obd
, cfg
);
750 ed
->ed_ec
= &obd
->u
.echo_client
;
753 /* if echo client is to be stacked upon ost device, the next is
754 * NULL since ost is not a clio device so far */
755 if (next
!= NULL
&& !lu_device_is_cl(next
))
758 tgt_type_name
= tgt
->obd_type
->typ_name
;
760 LASSERT(next
!= NULL
);
761 if (next
->ld_site
!= NULL
) {
766 next
->ld_site
= &ed
->ed_site
->cs_lu
;
767 rc
= next
->ld_type
->ldt_ops
->ldto_device_init(env
, next
,
768 next
->ld_type
->ldt_name
,
773 /* Tricky case, I have to determine the obd type since
774 * CLIO uses the different parameters to initialize
775 * objects for lov & osc. */
776 if (strcmp(tgt_type_name
, LUSTRE_LOV_NAME
) == 0)
777 ed
->ed_next_islov
= 1;
779 LASSERT(strcmp(tgt_type_name
,
780 LUSTRE_OSC_NAME
) == 0);
782 LASSERT(strcmp(tgt_type_name
, LUSTRE_OST_NAME
) == 0);
786 return &cd
->cd_lu_dev
;
792 rc2
= echo_client_cleanup(obd
);
794 CERROR("Cleanup obd device %s error(%d)\n",
799 echo_site_fini(env
, ed
);
801 cl_device_fini(&ed
->ed_cl
);
811 static int echo_device_init(const struct lu_env
*env
, struct lu_device
*d
,
812 const char *name
, struct lu_device
*next
)
818 static struct lu_device
*echo_device_fini(const struct lu_env
*env
,
821 struct echo_device
*ed
= cl2echo_dev(lu2cl_dev(d
));
822 struct lu_device
*next
= ed
->ed_next
;
825 next
= next
->ld_type
->ldt_ops
->ldto_device_fini(env
, next
);
829 static void echo_lock_release(const struct lu_env
*env
,
830 struct echo_lock
*ecl
,
833 struct cl_lock
*clk
= echo_lock2cl(ecl
);
837 cl_lock_release(env
, clk
, "ec enqueue", ecl
->el_object
);
839 cl_lock_mutex_get(env
, clk
);
840 cl_lock_cancel(env
, clk
);
841 cl_lock_delete(env
, clk
);
842 cl_lock_mutex_put(env
, clk
);
844 cl_lock_put(env
, clk
);
847 static struct lu_device
*echo_device_free(const struct lu_env
*env
,
850 struct echo_device
*ed
= cl2echo_dev(lu2cl_dev(d
));
851 struct echo_client_obd
*ec
= ed
->ed_ec
;
852 struct echo_object
*eco
;
853 struct lu_device
*next
= ed
->ed_next
;
855 CDEBUG(D_INFO
, "echo device:%p is going to be freed, next = %p\n",
858 lu_site_purge(env
, &ed
->ed_site
->cs_lu
, -1);
860 /* check if there are objects still alive.
861 * It shouldn't have any object because lu_site_purge would cleanup
862 * all of cached objects. Anyway, probably the echo device is being
863 * parallelly accessed.
865 spin_lock(&ec
->ec_lock
);
866 list_for_each_entry(eco
, &ec
->ec_objects
, eo_obj_chain
)
868 spin_unlock(&ec
->ec_lock
);
871 lu_site_purge(env
, &ed
->ed_site
->cs_lu
, -1);
874 "Waiting for the reference of echo object to be dropped\n");
876 /* Wait for the last reference to be dropped. */
877 spin_lock(&ec
->ec_lock
);
878 while (!list_empty(&ec
->ec_objects
)) {
879 spin_unlock(&ec
->ec_lock
);
880 CERROR("echo_client still has objects at cleanup time, wait for 1 second\n");
881 set_current_state(TASK_UNINTERRUPTIBLE
);
882 schedule_timeout(cfs_time_seconds(1));
883 lu_site_purge(env
, &ed
->ed_site
->cs_lu
, -1);
884 spin_lock(&ec
->ec_lock
);
886 spin_unlock(&ec
->ec_lock
);
888 LASSERT(list_empty(&ec
->ec_locks
));
890 CDEBUG(D_INFO
, "No object exists, exiting...\n");
892 echo_client_cleanup(d
->ld_obd
);
895 next
= next
->ld_type
->ldt_ops
->ldto_device_free(env
, next
);
897 LASSERT(ed
->ed_site
== lu2cl_site(d
->ld_site
));
898 echo_site_fini(env
, ed
);
899 cl_device_fini(&ed
->ed_cl
);
905 static const struct lu_device_type_operations echo_device_type_ops
= {
906 .ldto_init
= echo_type_init
,
907 .ldto_fini
= echo_type_fini
,
909 .ldto_start
= echo_type_start
,
910 .ldto_stop
= echo_type_stop
,
912 .ldto_device_alloc
= echo_device_alloc
,
913 .ldto_device_free
= echo_device_free
,
914 .ldto_device_init
= echo_device_init
,
915 .ldto_device_fini
= echo_device_fini
918 static struct lu_device_type echo_device_type
= {
919 .ldt_tags
= LU_DEVICE_CL
,
920 .ldt_name
= LUSTRE_ECHO_CLIENT_NAME
,
921 .ldt_ops
= &echo_device_type_ops
,
922 .ldt_ctx_tags
= LCT_CL_THREAD
,
927 /** \defgroup echo_exports Exported operations
929 * exporting functions to echo client
934 /* Interfaces to echo client obd device */
935 static struct echo_object
*cl_echo_object_find(struct echo_device
*d
,
936 struct lov_stripe_md
**lsmp
)
939 struct echo_thread_info
*info
;
940 struct echo_object_conf
*conf
;
941 struct lov_stripe_md
*lsm
;
942 struct echo_object
*eco
;
943 struct cl_object
*obj
;
951 LASSERTF(ostid_id(&lsm
->lsm_oi
) != 0, DOSTID
"\n", POSTID(&lsm
->lsm_oi
));
952 LASSERTF(ostid_seq(&lsm
->lsm_oi
) == FID_SEQ_ECHO
, DOSTID
"\n",
953 POSTID(&lsm
->lsm_oi
));
955 /* Never return an object if the obd is to be freed. */
956 if (echo_dev2cl(d
)->cd_lu_dev
.ld_obd
->obd_stopping
)
957 return ERR_PTR(-ENODEV
);
959 env
= cl_env_get(&refcheck
);
963 info
= echo_env_info(env
);
964 conf
= &info
->eti_conf
;
966 if (!d
->ed_next_islov
) {
967 struct lov_oinfo
*oinfo
= lsm
->lsm_oinfo
[0];
969 LASSERT(oinfo
!= NULL
);
970 oinfo
->loi_oi
= lsm
->lsm_oi
;
971 conf
->eoc_cl
.u
.coc_oinfo
= oinfo
;
973 struct lustre_md
*md
;
976 memset(md
, 0, sizeof(*md
));
978 conf
->eoc_cl
.u
.coc_md
= md
;
983 fid
= &info
->eti_fid
;
984 rc
= ostid_to_fid(fid
, &lsm
->lsm_oi
, 0);
990 /* In the function below, .hs_keycmp resolves to
991 * lu_obj_hop_keycmp() */
992 /* coverity[overrun-buffer-val] */
993 obj
= cl_object_find(env
, echo_dev2cl(d
), fid
, &conf
->eoc_cl
);
999 eco
= cl2echo_obj(obj
);
1000 if (eco
->eo_deleted
) {
1001 cl_object_put(env
, obj
);
1002 eco
= ERR_PTR(-EAGAIN
);
1006 cl_env_put(env
, &refcheck
);
1010 static int cl_echo_object_put(struct echo_object
*eco
)
1013 struct cl_object
*obj
= echo_obj2cl(eco
);
1016 env
= cl_env_get(&refcheck
);
1018 return PTR_ERR(env
);
1020 /* an external function to kill an object? */
1021 if (eco
->eo_deleted
) {
1022 struct lu_object_header
*loh
= obj
->co_lu
.lo_header
;
1024 LASSERT(&eco
->eo_hdr
== luh2coh(loh
));
1025 set_bit(LU_OBJECT_HEARD_BANSHEE
, &loh
->loh_flags
);
1028 cl_object_put(env
, obj
);
1029 cl_env_put(env
, &refcheck
);
1033 static int cl_echo_enqueue0(struct lu_env
*env
, struct echo_object
*eco
,
1034 u64 start
, u64 end
, int mode
,
1035 __u64
*cookie
, __u32 enqflags
)
1038 struct cl_lock
*lck
;
1039 struct cl_object
*obj
;
1040 struct cl_lock_descr
*descr
;
1041 struct echo_thread_info
*info
;
1044 info
= echo_env_info(env
);
1046 descr
= &info
->eti_descr
;
1047 obj
= echo_obj2cl(eco
);
1049 descr
->cld_obj
= obj
;
1050 descr
->cld_start
= cl_index(obj
, start
);
1051 descr
->cld_end
= cl_index(obj
, end
);
1052 descr
->cld_mode
= mode
== LCK_PW
? CLM_WRITE
: CLM_READ
;
1053 descr
->cld_enq_flags
= enqflags
;
1056 lck
= cl_lock_request(env
, io
, descr
, "ec enqueue", eco
);
1058 struct echo_client_obd
*ec
= eco
->eo_dev
->ed_ec
;
1059 struct echo_lock
*el
;
1061 rc
= cl_wait(env
, lck
);
1063 el
= cl2echo_lock(cl_lock_at(lck
, &echo_device_type
));
1064 spin_lock(&ec
->ec_lock
);
1065 if (list_empty(&el
->el_chain
)) {
1066 list_add(&el
->el_chain
, &ec
->ec_locks
);
1067 el
->el_cookie
= ++ec
->ec_unique
;
1069 atomic_inc(&el
->el_refcount
);
1070 *cookie
= el
->el_cookie
;
1071 spin_unlock(&ec
->ec_lock
);
1073 cl_lock_release(env
, lck
, "ec enqueue", current
);
1079 static int cl_echo_enqueue(struct echo_object
*eco
, u64 start
, u64 end
,
1080 int mode
, __u64
*cookie
)
1082 struct echo_thread_info
*info
;
1088 env
= cl_env_get(&refcheck
);
1090 return PTR_ERR(env
);
1092 info
= echo_env_info(env
);
1095 io
->ci_ignore_layout
= 1;
1096 result
= cl_io_init(env
, io
, CIT_MISC
, echo_obj2cl(eco
));
1099 LASSERT(result
== 0);
1101 result
= cl_echo_enqueue0(env
, eco
, start
, end
, mode
, cookie
, 0);
1102 cl_io_fini(env
, io
);
1105 cl_env_put(env
, &refcheck
);
1109 static int cl_echo_cancel0(struct lu_env
*env
, struct echo_device
*ed
,
1112 struct echo_client_obd
*ec
= ed
->ed_ec
;
1113 struct echo_lock
*ecl
= NULL
;
1114 struct list_head
*el
;
1115 int found
= 0, still_used
= 0;
1117 LASSERT(ec
!= NULL
);
1118 spin_lock(&ec
->ec_lock
);
1119 list_for_each(el
, &ec
->ec_locks
) {
1120 ecl
= list_entry(el
, struct echo_lock
, el_chain
);
1121 CDEBUG(D_INFO
, "ecl: %p, cookie: %#llx\n", ecl
, ecl
->el_cookie
);
1122 found
= (ecl
->el_cookie
== cookie
);
1124 if (atomic_dec_and_test(&ecl
->el_refcount
))
1125 list_del_init(&ecl
->el_chain
);
1131 spin_unlock(&ec
->ec_lock
);
1136 echo_lock_release(env
, ecl
, still_used
);
1140 static int cl_echo_cancel(struct echo_device
*ed
, __u64 cookie
)
1146 env
= cl_env_get(&refcheck
);
1148 return PTR_ERR(env
);
1150 rc
= cl_echo_cancel0(env
, ed
, cookie
);
1152 cl_env_put(env
, &refcheck
);
1156 static int cl_echo_async_brw(const struct lu_env
*env
, struct cl_io
*io
,
1157 enum cl_req_type unused
, struct cl_2queue
*queue
)
1159 struct cl_page
*clp
;
1160 struct cl_page
*temp
;
1163 cl_page_list_for_each_safe(clp
, temp
, &queue
->c2_qin
) {
1166 rc
= cl_page_cache_add(env
, io
, clp
, CRT_WRITE
);
1169 result
= result
?: rc
;
1174 static int cl_echo_object_brw(struct echo_object
*eco
, int rw
, u64 offset
,
1175 struct page
**pages
, int npages
, int async
)
1178 struct echo_thread_info
*info
;
1179 struct cl_object
*obj
= echo_obj2cl(eco
);
1180 struct echo_device
*ed
= eco
->eo_dev
;
1181 struct cl_2queue
*queue
;
1183 struct cl_page
*clp
;
1184 struct lustre_handle lh
= { 0 };
1185 int page_size
= cl_page_size(obj
);
1190 LASSERT((offset
& ~CFS_PAGE_MASK
) == 0);
1191 LASSERT(ed
->ed_next
!= NULL
);
1192 env
= cl_env_get(&refcheck
);
1194 return PTR_ERR(env
);
1196 info
= echo_env_info(env
);
1198 queue
= &info
->eti_queue
;
1200 cl_2queue_init(queue
);
1202 io
->ci_ignore_layout
= 1;
1203 rc
= cl_io_init(env
, io
, CIT_MISC
, obj
);
1208 rc
= cl_echo_enqueue0(env
, eco
, offset
,
1209 offset
+ npages
* PAGE_CACHE_SIZE
- 1,
1210 rw
== READ
? LCK_PR
: LCK_PW
, &lh
.cookie
,
1215 for (i
= 0; i
< npages
; i
++) {
1217 clp
= cl_page_find(env
, obj
, cl_index(obj
, offset
),
1218 pages
[i
], CPT_TRANSIENT
);
1223 LASSERT(clp
->cp_type
== CPT_TRANSIENT
);
1225 rc
= cl_page_own(env
, io
, clp
);
1227 LASSERT(clp
->cp_state
== CPS_FREEING
);
1228 cl_page_put(env
, clp
);
1232 cl_2queue_add(queue
, clp
);
1234 /* drop the reference count for cl_page_find, so that the page
1235 * will be freed in cl_2queue_fini. */
1236 cl_page_put(env
, clp
);
1237 cl_page_clip(env
, clp
, 0, page_size
);
1239 offset
+= page_size
;
1243 enum cl_req_type typ
= rw
== READ
? CRT_READ
: CRT_WRITE
;
1245 async
= async
&& (typ
== CRT_WRITE
);
1247 rc
= cl_echo_async_brw(env
, io
, typ
, queue
);
1249 rc
= cl_io_submit_sync(env
, io
, typ
, queue
, 0);
1250 CDEBUG(D_INFO
, "echo_client %s write returns %d\n",
1251 async
? "async" : "sync", rc
);
1254 cl_echo_cancel0(env
, ed
, lh
.cookie
);
1256 cl_2queue_discard(env
, io
, queue
);
1257 cl_2queue_disown(env
, io
, queue
);
1258 cl_2queue_fini(env
, queue
);
1259 cl_io_fini(env
, io
);
1261 cl_env_put(env
, &refcheck
);
1265 /** @} echo_exports */
1267 static u64 last_object_id
;
1270 echo_copyout_lsm(struct lov_stripe_md
*lsm
, void *_ulsm
, int ulsm_nob
)
1272 struct lov_stripe_md
*ulsm
= _ulsm
;
1275 nob
= offsetof(struct lov_stripe_md
, lsm_oinfo
[lsm
->lsm_stripe_count
]);
1279 if (copy_to_user(ulsm
, lsm
, sizeof(*ulsm
)))
1282 for (i
= 0; i
< lsm
->lsm_stripe_count
; i
++) {
1283 if (copy_to_user(ulsm
->lsm_oinfo
[i
], lsm
->lsm_oinfo
[i
],
1284 sizeof(lsm
->lsm_oinfo
[0])))
1291 echo_copyin_lsm(struct echo_device
*ed
, struct lov_stripe_md
*lsm
,
1292 void *ulsm
, int ulsm_nob
)
1294 struct echo_client_obd
*ec
= ed
->ed_ec
;
1297 if (ulsm_nob
< sizeof(*lsm
))
1300 if (copy_from_user(lsm
, ulsm
, sizeof(*lsm
)))
1303 if (lsm
->lsm_stripe_count
> ec
->ec_nstripes
||
1304 lsm
->lsm_magic
!= LOV_MAGIC
||
1305 (lsm
->lsm_stripe_size
& (~CFS_PAGE_MASK
)) != 0 ||
1306 ((__u64
)lsm
->lsm_stripe_size
* lsm
->lsm_stripe_count
> ~0UL))
1309 for (i
= 0; i
< lsm
->lsm_stripe_count
; i
++) {
1310 if (copy_from_user(lsm
->lsm_oinfo
[i
],
1311 ((struct lov_stripe_md
*)ulsm
)-> \
1313 sizeof(lsm
->lsm_oinfo
[0])))
1319 static int echo_create_object(const struct lu_env
*env
, struct echo_device
*ed
,
1320 int on_target
, struct obdo
*oa
, void *ulsm
,
1321 int ulsm_nob
, struct obd_trans_info
*oti
)
1323 struct echo_object
*eco
;
1324 struct echo_client_obd
*ec
= ed
->ed_ec
;
1325 struct lov_stripe_md
*lsm
= NULL
;
1329 if ((oa
->o_valid
& OBD_MD_FLID
) == 0 && /* no obj id */
1330 (on_target
|| /* set_stripe */
1331 ec
->ec_nstripes
!= 0)) { /* LOV */
1332 CERROR("No valid oid\n");
1336 rc
= echo_alloc_memmd(ed
, &lsm
);
1338 CERROR("Cannot allocate md: rc = %d\n", rc
);
1345 rc
= echo_copyin_lsm(ed
, lsm
, ulsm
, ulsm_nob
);
1349 if (lsm
->lsm_stripe_count
== 0)
1350 lsm
->lsm_stripe_count
= ec
->ec_nstripes
;
1352 if (lsm
->lsm_stripe_size
== 0)
1353 lsm
->lsm_stripe_size
= PAGE_CACHE_SIZE
;
1357 /* setup stripes: indices + default ids if required */
1358 for (i
= 0; i
< lsm
->lsm_stripe_count
; i
++) {
1359 if (ostid_id(&lsm
->lsm_oinfo
[i
]->loi_oi
) == 0)
1360 lsm
->lsm_oinfo
[i
]->loi_oi
= lsm
->lsm_oi
;
1362 lsm
->lsm_oinfo
[i
]->loi_ost_idx
=
1363 (idx
+ i
) % ec
->ec_nstripes
;
1367 /* setup object ID here for !on_target and LOV hint */
1368 if (oa
->o_valid
& OBD_MD_FLID
) {
1369 LASSERT(oa
->o_valid
& OBD_MD_FLGROUP
);
1370 lsm
->lsm_oi
= oa
->o_oi
;
1373 if (ostid_id(&lsm
->lsm_oi
) == 0)
1374 ostid_set_id(&lsm
->lsm_oi
, ++last_object_id
);
1378 /* Only echo objects are allowed to be created */
1379 LASSERT((oa
->o_valid
& OBD_MD_FLGROUP
) &&
1380 (ostid_seq(&oa
->o_oi
) == FID_SEQ_ECHO
));
1381 rc
= obd_create(env
, ec
->ec_exp
, oa
, &lsm
, oti
);
1383 CERROR("Cannot create objects: rc = %d\n", rc
);
1389 /* See what object ID we were given */
1390 oa
->o_oi
= lsm
->lsm_oi
;
1391 oa
->o_valid
|= OBD_MD_FLID
;
1393 eco
= cl_echo_object_find(ed
, &lsm
);
1398 cl_echo_object_put(eco
);
1400 CDEBUG(D_INFO
, "oa oid "DOSTID
"\n", POSTID(&oa
->o_oi
));
1404 obd_destroy(env
, ec
->ec_exp
, oa
, lsm
, oti
, NULL
);
1406 echo_free_memmd(ed
, &lsm
);
1408 CERROR("create object failed with: rc = %d\n", rc
);
1412 static int echo_get_object(struct echo_object
**ecop
, struct echo_device
*ed
,
1415 struct lov_stripe_md
*lsm
= NULL
;
1416 struct echo_object
*eco
;
1419 if ((oa
->o_valid
& OBD_MD_FLID
) == 0 || ostid_id(&oa
->o_oi
) == 0) {
1420 /* disallow use of object id 0 */
1421 CERROR("No valid oid\n");
1425 rc
= echo_alloc_memmd(ed
, &lsm
);
1429 lsm
->lsm_oi
= oa
->o_oi
;
1430 if (!(oa
->o_valid
& OBD_MD_FLGROUP
))
1431 ostid_set_seq_echo(&lsm
->lsm_oi
);
1434 eco
= cl_echo_object_find(ed
, &lsm
);
1440 echo_free_memmd(ed
, &lsm
);
/**
 * Drop a reference on an echo object obtained earlier.
 *
 * A failure of cl_echo_object_put() cannot be propagated (the function
 * returns void), so it is logged together with the error code.
 */
static void echo_put_object(struct echo_object *eco)
{
	int rc;

	rc = cl_echo_object_put(eco);
	if (rc)
		/* newline-terminate and report rc, like the file's other CERRORs */
		CERROR("echo client: drop an object failed: rc = %d\n", rc);
}
/*
 * Map a file offset onto a stripe object.
 *
 * On return *idp is the object id of the stripe that holds the offset
 * (lsm_oinfo[stripe_index]->loi_oi) and *offp is the offset inside that
 * stripe object.  offp appears to be an in/out parameter: callers pass the
 * file offset in it - TODO confirm, the load of `offset` is not visible here.
 *
 * With width = stripe_size * stripe_count, do_div() splits the offset into a
 * whole number of widths (left in `offset`) and a remainder `woffset`;
 * stripe_index = woffset / stripe_size, and the stripe-local offset is
 * offset * stripe_size + woffset % stripe_size.
 *
 * NOTE(review): do_div() takes a 32-bit divisor while `width` is an
 * unsigned long product - confirm it cannot exceed 32 bits.
 * NOTE(review): the body of the lsm_stripe_count <= 1 branch is not visible
 * here.
 */
1451 echo_get_stripe_off_id(struct lov_stripe_md
*lsm
, u64
*offp
, u64
*idp
)
1453 unsigned long stripe_count
;
1454 unsigned long stripe_size
;
1455 unsigned long width
;
1456 unsigned long woffset
;
1460 if (lsm
->lsm_stripe_count
<= 1)
1464 stripe_size
= lsm
->lsm_stripe_size
;
1465 stripe_count
= lsm
->lsm_stripe_count
;
1467 /* width = # bytes in all stripes */
1468 width
= stripe_size
* stripe_count
;
1470 /* woffset = offset within a width; offset = whole number of widths */
1471 woffset
= do_div(offset
, width
);
1473 stripe_index
= woffset
/ stripe_size
;
1475 *idp
= ostid_id(&lsm
->lsm_oinfo
[stripe_index
]->loi_oi
);
1476 *offp
= offset
* stripe_size
+ woffset
% stripe_size
;
/*
 * Stamp a page with debug data, one OBD_ECHO_BLOCK_SIZE block at a time,
 * using block_debug_setup().
 *
 * For rw == OBD_BRW_WRITE each block is stamped with the stripe offset and
 * stripe id that echo_get_stripe_off_id() derives from offset + delta.  The
 * other visible assignments set both values to 0xdeadbeef00c0ffeeULL,
 * presumably for the non-write case - the branch keyword is not visible here.
 *
 * count must equal PAGE_CACHE_SIZE (asserted): partial pages are not
 * supported.
 */
1480 echo_client_page_debug_setup(struct lov_stripe_md
*lsm
,
1481 struct page
*page
, int rw
, u64 id
,
1482 u64 offset
, u64 count
)
1489 /* no partial pages on the client */
1490 LASSERT(count
== PAGE_CACHE_SIZE
);
1494 for (delta
= 0; delta
< PAGE_CACHE_SIZE
; delta
+= OBD_ECHO_BLOCK_SIZE
) {
1495 if (rw
== OBD_BRW_WRITE
) {
1496 stripe_off
= offset
+ delta
;
1498 echo_get_stripe_off_id(lsm
, &stripe_off
, &stripe_id
);
1500 stripe_off
= 0xdeadbeef00c0ffeeULL
;
1501 stripe_id
= 0xdeadbeef00c0ffeeULL
;
1503 block_debug_setup(addr
+ delta
, OBD_ECHO_BLOCK_SIZE
,
1504 stripe_off
, stripe_id
);
/*
 * Verify the debug stamps of a page, one OBD_ECHO_BLOCK_SIZE block at a
 * time.
 *
 * For every block the expected stripe offset/id are recomputed with
 * echo_get_stripe_off_id() from offset + delta and compared by
 * block_debug_check(); a mismatch is logged with the echo object id.
 *
 * count must equal PAGE_CACHE_SIZE (asserted).
 *
 * NOTE(review): how rc2 is folded into the returned rc is not visible here.
 */
1510 static int echo_client_page_debug_check(struct lov_stripe_md
*lsm
,
1511 struct page
*page
, u64 id
,
1512 u64 offset
, u64 count
)
1521 /* no partial pages on the client */
1522 LASSERT(count
== PAGE_CACHE_SIZE
);
1526 for (rc
= delta
= 0; delta
< PAGE_CACHE_SIZE
; delta
+= OBD_ECHO_BLOCK_SIZE
) {
1527 stripe_off
= offset
+ delta
;
1529 echo_get_stripe_off_id(lsm
, &stripe_off
, &stripe_id
);
1531 rc2
= block_debug_check("test_brw",
1532 addr
+ delta
, OBD_ECHO_BLOCK_SIZE
,
1533 stripe_off
, stripe_id
);
1535 CERROR("Error in echo object %#llx\n", id
);
/*
 * Bulk read/write test path that allocates its own pages.
 *
 * Visible behaviour:
 *  - `verify` is true only when the object id is not ECHO_PERSISTENT_OBJID
 *    and oa carries OBD_MD_FLFLAGS with OBD_FL_DEBUG_CHECK set;
 *  - the page allocation mask is GFP_IOFS when bit 1 of the object id is
 *    clear and GFP_HIGHUSER otherwise;
 *  - rw must be OBD_BRW_WRITE or OBD_BRW_READ, lsm must be non-NULL and its
 *    id must match oa's (all asserted);
 *  - npages = count >> PAGE_CACHE_SHIFT, and two npages-long arrays (pga,
 *    pages) are allocated with kcalloc();
 *  - writes use brw_flags = OBD_BRW_ASYNC;
 *  - each page is allocated with alloc_page(), described in its brw_page
 *    (count = PAGE_CACHE_SIZE, flag = brw_flags) and passed to
 *    echo_client_page_debug_setup();
 *  - the transfer itself is cl_echo_object_brw(), which needs ed->ed_next
 *    (asserted);
 *  - the final loop visits every brw_page, calls
 *    echo_client_page_debug_check() and releases the page with
 *    __free_page().
 *
 * NOTE(review): the conditions guarding the debug setup/check calls, the
 * error returns and the release of pga/pages are not visible here - confirm
 * them before relying on this summary.
 */
1544 static int echo_client_kbrw(struct echo_device
*ed
, int rw
, struct obdo
*oa
,
1545 struct echo_object
*eco
, u64 offset
,
1546 u64 count
, int async
,
1547 struct obd_trans_info
*oti
)
1549 struct lov_stripe_md
*lsm
= eco
->eo_lsm
;
1551 struct brw_page
*pga
;
1552 struct brw_page
*pgp
;
1553 struct page
**pages
;
1561 verify
= (ostid_id(&oa
->o_oi
) != ECHO_PERSISTENT_OBJID
&&
1562 (oa
->o_valid
& OBD_MD_FLFLAGS
) != 0 &&
1563 (oa
->o_flags
& OBD_FL_DEBUG_CHECK
) != 0);
1565 gfp_mask
= ((ostid_id(&oa
->o_oi
) & 2) == 0) ? GFP_IOFS
: GFP_HIGHUSER
;
1567 LASSERT(rw
== OBD_BRW_WRITE
|| rw
== OBD_BRW_READ
);
1568 LASSERT(lsm
!= NULL
);
1569 LASSERT(ostid_id(&lsm
->lsm_oi
) == ostid_id(&oa
->o_oi
));
1572 (count
& (~CFS_PAGE_MASK
)) != 0)
1575 /* XXX think again with misaligned I/O */
1576 npages
= count
>> PAGE_CACHE_SHIFT
;
1578 if (rw
== OBD_BRW_WRITE
)
1579 brw_flags
= OBD_BRW_ASYNC
;
1581 pga
= kcalloc(npages
, sizeof(*pga
), GFP_NOFS
);
1585 pages
= kcalloc(npages
, sizeof(*pages
), GFP_NOFS
);
1586 if (pages
== NULL
) {
/* one brw_page per page of the request, advancing the file offset */
1591 for (i
= 0, pgp
= pga
, off
= offset
;
1593 i
++, pgp
++, off
+= PAGE_CACHE_SIZE
) {
1595 LASSERT(pgp
->pg
== NULL
); /* for cleanup */
1598 pgp
->pg
= alloc_page(gfp_mask
);
1599 if (pgp
->pg
== NULL
)
1603 pgp
->count
= PAGE_CACHE_SIZE
;
1605 pgp
->flag
= brw_flags
;
1608 echo_client_page_debug_setup(lsm
, pgp
->pg
, rw
,
1609 ostid_id(&oa
->o_oi
), off
,
1613 /* brw mode can only be used at client */
1614 LASSERT(ed
->ed_next
!= NULL
);
1615 rc
= cl_echo_object_brw(eco
, rw
, offset
, pages
, npages
, async
);
1618 if (rc
!= 0 || rw
!= OBD_BRW_READ
)
/* cleanup pass: entries with a NULL pg are tested for explicitly */
1621 for (i
= 0, pgp
= pga
; i
< npages
; i
++, pgp
++) {
1622 if (pgp
->pg
== NULL
)
1628 vrc
= echo_client_page_debug_check(lsm
, pgp
->pg
,
1629 ostid_id(&oa
->o_oi
),
1630 pgp
->off
, pgp
->count
);
1631 if (vrc
!= 0 && rc
== 0)
1634 __free_page(pgp
->pg
);
/*
 * Bulk I/O test path built on obd_preprw()/obd_commitrw().
 *
 * The request is rejected-tested when count is 0, not page aligned, or when
 * a non-NULL lsm carries a different object id than oa.  The transfer of
 * tot_pages = count >> PAGE_CACHE_SHIFT pages is then driven in batches of
 * npages = batch >> PAGE_CACHE_SHIFT pages.  For each batch:
 *  - rnb[] is filled with consecutive PAGE_CACHE_SIZE extents carrying
 *    brw_flags (OBD_BRW_ASYNC for async writes);
 *  - obd_preprw() is called and must hand back exactly npages local pages
 *    (asserted);
 *  - unless the object id is ECHO_PERSISTENT_OBJID or oa does not request
 *    OBD_FL_DEBUG_CHECK, each returned page is stamped with
 *    echo_client_page_debug_setup() for writes, with
 *    echo_client_page_debug_check() as the other visible call;
 *  - obd_commitrw() completes the batch;
 *  - oti is zeroed and the lu_env context is exited and re-entered before
 *    the next batch.
 *
 * NOTE(review): npages is 0 when batch is smaller than one page, and the
 * outer loop steps with `tot_pages -= npages` - confirm that a caller or a
 * check not visible here rules this out, otherwise the loop cannot advance.
 * NOTE(review): count is a u64, so `count <= 0` only catches 0.
 * NOTE(review): error returns and the release of lnb/rnb are not visible
 * here.
 */
1641 static int echo_client_prep_commit(const struct lu_env
*env
,
1642 struct obd_export
*exp
, int rw
,
1643 struct obdo
*oa
, struct echo_object
*eco
,
1644 u64 offset
, u64 count
,
1645 u64 batch
, struct obd_trans_info
*oti
,
1648 struct lov_stripe_md
*lsm
= eco
->eo_lsm
;
1649 struct obd_ioobj ioo
;
1650 struct niobuf_local
*lnb
;
1651 struct niobuf_remote
*rnb
;
1653 u64 npages
, tot_pages
;
1654 int i
, ret
= 0, brw_flags
= 0;
1656 if (count
<= 0 || (count
& (~CFS_PAGE_MASK
)) != 0 ||
1657 (lsm
!= NULL
&& ostid_id(&lsm
->lsm_oi
) != ostid_id(&oa
->o_oi
)))
1660 npages
= batch
>> PAGE_CACHE_SHIFT
;
1661 tot_pages
= count
>> PAGE_CACHE_SHIFT
;
1663 lnb
= kcalloc(npages
, sizeof(struct niobuf_local
), GFP_NOFS
);
1664 rnb
= kcalloc(npages
, sizeof(struct niobuf_remote
), GFP_NOFS
);
1666 if (lnb
== NULL
|| rnb
== NULL
) {
1671 if (rw
== OBD_BRW_WRITE
&& async
)
1672 brw_flags
|= OBD_BRW_ASYNC
;
1674 obdo_to_ioobj(oa
, &ioo
);
/* one iteration per batch of npages pages */
1678 for (; tot_pages
; tot_pages
-= npages
) {
1681 if (tot_pages
< npages
)
1684 for (i
= 0; i
< npages
; i
++, off
+= PAGE_CACHE_SIZE
) {
1685 rnb
[i
].offset
= off
;
1686 rnb
[i
].len
= PAGE_CACHE_SIZE
;
1687 rnb
[i
].flags
= brw_flags
;
1690 ioo
.ioo_bufcnt
= npages
;
1691 oti
->oti_transno
= 0;
1694 ret
= obd_preprw(env
, rw
, exp
, oa
, 1, &ioo
, rnb
, &lpages
,
1698 LASSERT(lpages
== npages
);
1700 for (i
= 0; i
< lpages
; i
++) {
1701 struct page
*page
= lnb
[i
].page
;
1703 /* read past eof? */
1704 if (page
== NULL
&& lnb
[i
].rc
== 0)
1708 lnb
[i
].flags
|= OBD_BRW_ASYNC
;
1710 if (ostid_id(&oa
->o_oi
) == ECHO_PERSISTENT_OBJID
||
1711 (oa
->o_valid
& OBD_MD_FLFLAGS
) == 0 ||
1712 (oa
->o_flags
& OBD_FL_DEBUG_CHECK
) == 0)
1715 if (rw
== OBD_BRW_WRITE
)
1716 echo_client_page_debug_setup(lsm
, page
, rw
,
1717 ostid_id(&oa
->o_oi
),
1721 echo_client_page_debug_check(lsm
, page
,
1722 ostid_id(&oa
->o_oi
),
1727 ret
= obd_commitrw(env
, rw
, exp
, oa
, 1, &ioo
,
1728 rnb
, npages
, lnb
, oti
, ret
);
1732 /* Reset oti otherwise it would confuse ldiskfs. */
1733 memset(oti
, 0, sizeof(*oti
));
1735 /* Reuse env context. */
1736 lu_context_exit((struct lu_context
*)&env
->le_ctx
);
1737 lu_context_enter((struct lu_context
*)&env
->le_ctx
);
/*
 * Serve the BRW ioctls: resolve the echo object named by data->ioc_obdo1,
 * pick a test mode and run the transfer.
 *
 * oa must already have OBD_MD_FLGROUP set (asserted); OBD_MD_FLHANDLE is
 * cleared from oa->o_valid.  test_mode is data->ioc_pbuf1 cast to long.
 * When the device has no ed_next and the mode is not 3, ioc_plen1 is
 * overwritten with ioc_count; ioc_plen1 is then capped at
 * PTLRPC_MAX_BRW_SIZE.  The visible dispatch targets are echo_client_kbrw()
 * and echo_client_prep_commit() (the latter receives ioc_plen1 as its batch
 * size); the object reference is dropped with echo_put_object().
 *
 * NOTE(review): the case labels of the switch and the error returns are not
 * visible here.
 */
1746 static int echo_client_brw_ioctl(const struct lu_env
*env
, int rw
,
1747 struct obd_export
*exp
,
1748 struct obd_ioctl_data
*data
,
1749 struct obd_trans_info
*dummy_oti
)
1751 struct obd_device
*obd
= class_exp2obd(exp
);
1752 struct echo_device
*ed
= obd2echo_dev(obd
);
1753 struct echo_client_obd
*ec
= ed
->ed_ec
;
1754 struct obdo
*oa
= &data
->ioc_obdo1
;
1755 struct echo_object
*eco
;
1760 LASSERT(oa
->o_valid
& OBD_MD_FLGROUP
);
1762 rc
= echo_get_object(&eco
, ed
, oa
);
1766 oa
->o_valid
&= ~OBD_MD_FLHANDLE
;
1768 /* OFD/obdfilter works only via prep/commit */
1769 test_mode
= (long)data
->ioc_pbuf1
;
1773 if (ed
->ed_next
== NULL
&& test_mode
!= 3) {
1775 data
->ioc_plen1
= data
->ioc_count
;
1778 /* Truncate batch size to maximum */
1779 if (data
->ioc_plen1
> PTLRPC_MAX_BRW_SIZE
)
1780 data
->ioc_plen1
= PTLRPC_MAX_BRW_SIZE
;
1782 switch (test_mode
) {
1786 rc
= echo_client_kbrw(ed
, rw
, oa
,
1787 eco
, data
->ioc_offset
,
1788 data
->ioc_count
, async
, dummy_oti
);
1791 rc
= echo_client_prep_commit(env
, ec
->ec_exp
, rw
, oa
,
1792 eco
, data
->ioc_offset
,
1793 data
->ioc_count
, data
->ioc_plen1
,
1799 echo_put_object(eco
);
/*
 * Take an extent lock on the echo object named by oa.
 *
 * Checks visible before the lock is requested: ed->ed_next is tested for
 * NULL, mode must be LCK_PR or LCK_PW, and both offset and nob must be page
 * aligned (no bits outside CFS_PAGE_MASK).  nob == 0 selects an extent that
 * ends at (u64)-1; otherwise the end is offset + nob - 1.  The cookie
 * produced by cl_echo_enqueue() is written to oa->o_handle.cookie and
 * OBD_MD_FLHANDLE is set in oa->o_valid (presumably only on success - TODO
 * confirm).  The object reference is dropped with echo_put_object().
 *
 * NOTE(review): the return values of the rejected cases are not visible
 * here.
 */
1804 echo_client_enqueue(struct obd_export
*exp
, struct obdo
*oa
,
1805 int mode
, u64 offset
, u64 nob
)
1807 struct echo_device
*ed
= obd2echo_dev(exp
->exp_obd
);
1808 struct lustre_handle
*ulh
= &oa
->o_handle
;
1809 struct echo_object
*eco
;
1813 if (ed
->ed_next
== NULL
)
1816 if (!(mode
== LCK_PR
|| mode
== LCK_PW
))
1819 if ((offset
& (~CFS_PAGE_MASK
)) != 0 ||
1820 (nob
& (~CFS_PAGE_MASK
)) != 0)
1823 rc
= echo_get_object(&eco
, ed
, oa
);
1827 end
= (nob
== 0) ? ((u64
) -1) : (offset
+ nob
- 1);
1828 rc
= cl_echo_enqueue(eco
, offset
, end
, mode
, &ulh
->cookie
);
1830 oa
->o_valid
|= OBD_MD_FLHANDLE
;
1831 CDEBUG(D_INFO
, "Cookie is %#llx\n", ulh
->cookie
);
1833 echo_put_object(eco
);
/*
 * Cancel the lock whose cookie is stored in oa->o_handle.cookie by passing
 * it to cl_echo_cancel(), whose result is returned.
 *
 * oa->o_valid is tested for OBD_MD_FLHANDLE first.
 * NOTE(review): the action taken when that flag is missing is not visible
 * here.
 */
1838 echo_client_cancel(struct obd_export
*exp
, struct obdo
*oa
)
1840 struct echo_device
*ed
= obd2echo_dev(exp
->exp_obd
);
1841 __u64 cookie
= oa
->o_handle
.cookie
;
1843 if ((oa
->o_valid
& OBD_MD_FLHANDLE
) == 0)
1846 CDEBUG(D_INFO
, "Cookie is %#llx\n", cookie
);
1847 return cl_echo_cancel(ed
, cookie
);
/*
 * ioctl entry point of the echo client (installed as .o_iocontrol in
 * echo_client_obd_ops).
 *
 * karg is used as a struct obd_ioctl_data.  If the embedded obdo lacks
 * OBD_MD_FLGROUP, the flag is set and ostid_set_seq_echo() is applied to
 * o_oi; the ostid is then passed through ostid_to_fid() purely as
 * validation.  A private lu_env is allocated with kzalloc() and initialised
 * with LCT_DT_THREAD before the command is dispatched:
 *
 *   OBD_IOC_CREATE       echo_create_object(env, ed, 1, ...)
 *   OBD_IOC_DESTROY      echo_get_object() + obd_destroy(); sets eo_deleted
 *   OBD_IOC_GETATTR      echo_get_object() + obd_getattr()
 *   OBD_IOC_SETATTR      echo_get_object() + obd_setattr()
 *   OBD_IOC_BRW_WRITE,
 *   OBD_IOC_BRW_READ     echo_client_brw_ioctl()
 *   ECHO_IOC_GET_STRIPE  echo_get_object() + echo_copyout_lsm()
 *   ECHO_IOC_SET_STRIPE  NULL ioc_pbuf1: set eo_deleted on the object;
 *                        otherwise echo_create_object(env, ed, 0, ...)
 *   ECHO_IOC_ENQUEUE     echo_client_enqueue()
 *   ECHO_IOC_CANCEL      echo_client_cancel()
 *
 * CREATE, DESTROY, SETATTR, BRW_WRITE, SET_STRIPE and ENQUEUE test
 * capable(CFS_CAP_SYS_ADMIN) first.  Unrecognised commands are logged with
 * CERROR.  At the end, entries of dummy_oti.oti_ack_locks (at most 4) that
 * have a mode set are released with ldlm_lock_decref().
 *
 * NOTE(review): error codes, the point where rw switches to write, and the
 * teardown/free of env are not visible here.
 */
1851 echo_client_iocontrol(unsigned int cmd
, struct obd_export
*exp
, int len
,
1852 void *karg
, void *uarg
)
1854 struct obd_device
*obd
= exp
->exp_obd
;
1855 struct echo_device
*ed
= obd2echo_dev(obd
);
1856 struct echo_client_obd
*ec
= ed
->ed_ec
;
1857 struct echo_object
*eco
;
1858 struct obd_ioctl_data
*data
= karg
;
1859 struct obd_trans_info dummy_oti
;
1861 struct oti_req_ack_lock
*ack_lock
;
1864 int rw
= OBD_BRW_READ
;
1868 memset(&dummy_oti
, 0, sizeof(dummy_oti
));
1870 oa
= &data
->ioc_obdo1
;
1871 if (!(oa
->o_valid
& OBD_MD_FLGROUP
)) {
1872 oa
->o_valid
|= OBD_MD_FLGROUP
;
1873 ostid_set_seq_echo(&oa
->o_oi
);
1876 /* This FID is unpacked just for validation at this point */
1877 rc
= ostid_to_fid(&fid
, &oa
->o_oi
, 0);
1881 env
= kzalloc(sizeof(*env
), GFP_NOFS
);
1885 rc
= lu_env_init(env
, LCT_DT_THREAD
);
1892 case OBD_IOC_CREATE
: /* may create echo object */
1893 if (!capable(CFS_CAP_SYS_ADMIN
)) {
1898 rc
= echo_create_object(env
, ed
, 1, oa
, data
->ioc_pbuf1
,
1899 data
->ioc_plen1
, &dummy_oti
);
1902 case OBD_IOC_DESTROY
:
1903 if (!capable(CFS_CAP_SYS_ADMIN
)) {
1908 rc
= echo_get_object(&eco
, ed
, oa
);
1910 rc
= obd_destroy(env
, ec
->ec_exp
, oa
, eco
->eo_lsm
,
1913 eco
->eo_deleted
= 1;
1914 echo_put_object(eco
);
1918 case OBD_IOC_GETATTR
:
1919 rc
= echo_get_object(&eco
, ed
, oa
);
1921 struct obd_info oinfo
= { };
1923 oinfo
.oi_md
= eco
->eo_lsm
;
1925 rc
= obd_getattr(env
, ec
->ec_exp
, &oinfo
);
1926 echo_put_object(eco
);
1930 case OBD_IOC_SETATTR
:
1931 if (!capable(CFS_CAP_SYS_ADMIN
)) {
1936 rc
= echo_get_object(&eco
, ed
, oa
);
1938 struct obd_info oinfo
= { };
1941 oinfo
.oi_md
= eco
->eo_lsm
;
1943 rc
= obd_setattr(env
, ec
->ec_exp
, &oinfo
, NULL
);
1944 echo_put_object(eco
);
1948 case OBD_IOC_BRW_WRITE
:
1949 if (!capable(CFS_CAP_SYS_ADMIN
)) {
1956 case OBD_IOC_BRW_READ
:
1957 rc
= echo_client_brw_ioctl(env
, rw
, exp
, data
, &dummy_oti
);
1960 case ECHO_IOC_GET_STRIPE
:
1961 rc
= echo_get_object(&eco
, ed
, oa
);
1963 rc
= echo_copyout_lsm(eco
->eo_lsm
, data
->ioc_pbuf1
,
1965 echo_put_object(eco
);
1969 case ECHO_IOC_SET_STRIPE
:
1970 if (!capable(CFS_CAP_SYS_ADMIN
)) {
1975 if (data
->ioc_pbuf1
== NULL
) { /* unset */
1976 rc
= echo_get_object(&eco
, ed
, oa
);
1978 eco
->eo_deleted
= 1;
1979 echo_put_object(eco
);
1982 rc
= echo_create_object(env
, ed
, 0, oa
,
1984 data
->ioc_plen1
, &dummy_oti
);
1988 case ECHO_IOC_ENQUEUE
:
1989 if (!capable(CFS_CAP_SYS_ADMIN
)) {
1994 rc
= echo_client_enqueue(exp
, oa
,
1995 data
->ioc_conn1
, /* lock mode */
1997 data
->ioc_count
);/*extent*/
2000 case ECHO_IOC_CANCEL
:
2001 rc
= echo_client_cancel(exp
, oa
);
2005 CERROR("echo_ioctl(): unrecognised ioctl %#x\n", cmd
);
2014 /* XXX this should be in a helper also called by target_send_reply */
2015 for (ack_lock
= dummy_oti
.oti_ack_locks
, i
= 0; i
< 4;
2017 if (!ack_lock
->mode
)
2019 ldlm_lock_decref(&ack_lock
->lock
, ack_lock
->mode
);
/*
 * Set up an echo client by connecting it to the target OBD named in lcfg
 * buffer 1.
 *
 * The configuration must carry at least two buffers with a non-empty
 * buffer 1, and the named device must exist, be attached and be set up;
 * both failures are logged.  The echo_client_obd lock and lists are
 * initialised and ec_nstripes is reset to 0.  An obd_connect_data is
 * allocated with kzalloc() and filled with the connect flags, brw size
 * DT_MAX_BRW_SIZE, LUSTRE_VERSION_CODE and group FID_SEQ_ECHO, then
 * obd_connect() is called with the fixed "ECHO_UUID" and stores the export
 * in ec->ec_exp.  A connect failure is logged.
 *
 * NOTE(review): the release of ocd, the remaining connect flags and the
 * return codes are not visible here.
 */
2025 static int echo_client_setup(const struct lu_env
*env
,
2026 struct obd_device
*obddev
, struct lustre_cfg
*lcfg
)
2028 struct echo_client_obd
*ec
= &obddev
->u
.echo_client
;
2029 struct obd_device
*tgt
;
2030 struct obd_uuid echo_uuid
= { "ECHO_UUID" };
2031 struct obd_connect_data
*ocd
= NULL
;
2034 if (lcfg
->lcfg_bufcount
< 2 || LUSTRE_CFG_BUFLEN(lcfg
, 1) < 1) {
2035 CERROR("requires a TARGET OBD name\n");
2039 tgt
= class_name2obd(lustre_cfg_string(lcfg
, 1));
2040 if (!tgt
|| !tgt
->obd_attached
|| !tgt
->obd_set_up
) {
2041 CERROR("device not attached or not set up (%s)\n",
2042 lustre_cfg_string(lcfg
, 1));
2046 spin_lock_init(&ec
->ec_lock
);
2047 INIT_LIST_HEAD(&ec
->ec_objects
);
2048 INIT_LIST_HEAD(&ec
->ec_locks
);
2050 ec
->ec_nstripes
= 0;
2052 ocd
= kzalloc(sizeof(*ocd
), GFP_NOFS
);
2054 CERROR("Can't alloc ocd connecting to %s\n",
2055 lustre_cfg_string(lcfg
, 1));
2059 ocd
->ocd_connect_flags
= OBD_CONNECT_VERSION
| OBD_CONNECT_REQPORTAL
|
2060 OBD_CONNECT_BRW_SIZE
|
2061 OBD_CONNECT_GRANT
| OBD_CONNECT_FULL20
|
2062 OBD_CONNECT_64BITHASH
| OBD_CONNECT_LVB_TYPE
|
2064 ocd
->ocd_brw_size
= DT_MAX_BRW_SIZE
;
2065 ocd
->ocd_version
= LUSTRE_VERSION_CODE
;
2066 ocd
->ocd_group
= FID_SEQ_ECHO
;
2068 rc
= obd_connect(env
, &ec
->ec_exp
, tgt
, &echo_uuid
, ocd
, NULL
);
2073 CERROR("fail to connect to device %s\n",
2074 lustre_cfg_string(lcfg
, 1));
/*
 * Tear down an echo client device.
 *
 * A non-empty obd_exports list is reported with "still has clients!".  The
 * export stored in ec->ec_exp must still hold a reference (asserted) and is
 * dropped with obd_disconnect(); a failure of that call is logged.
 *
 * NOTE(review): the return value used when clients remain is not visible
 * here.
 */
2081 static int echo_client_cleanup(struct obd_device
*obddev
)
2083 struct echo_client_obd
*ec
= &obddev
->u
.echo_client
;
2086 if (!list_empty(&obddev
->obd_exports
)) {
2087 CERROR("still has clients!\n");
2091 LASSERT(atomic_read(&ec
->ec_exp
->exp_refcount
) > 0);
2092 rc
= obd_disconnect(ec
->ec_exp
);
2094 CERROR("fail to disconnect device: %d\n", rc
);
/*
 * .o_connect method of the echo client: open a connection to src for cluuid
 * with class_connect() and hand the matching export back through *exp via
 * class_conn2export().
 *
 * env, data and localdata are not referenced in the visible statements.
 * NOTE(review): presumably *exp is only written when class_connect()
 * succeeds - the guard is not visible here.
 */
2099 static int echo_client_connect(const struct lu_env
*env
,
2100 struct obd_export
**exp
,
2101 struct obd_device
*src
, struct obd_uuid
*cluuid
,
2102 struct obd_connect_data
*data
, void *localdata
)
2105 struct lustre_handle conn
= { 0 };
2107 rc
= class_connect(&conn
, src
, cluuid
);
2109 *exp
= class_conn2export(&conn
);
/*
 * .o_disconnect method of the echo client; the visible body hands the export
 * to class_disconnect().
 * NOTE(review): any argument checks before that call are not visible here.
 */
2115 static int echo_client_disconnect(struct obd_export
*exp
)
2124 rc
= class_disconnect(exp
);
2130 static struct obd_ops echo_client_obd_ops
= {
2131 .o_owner
= THIS_MODULE
,
2132 .o_iocontrol
= echo_client_iocontrol
,
2133 .o_connect
= echo_client_connect
,
2134 .o_disconnect
= echo_client_disconnect
/*
 * Register the echo client OBD type.
 *
 * echo_caches is initialised with lu_kmem_init(), then
 * class_register_type() is called with echo_client_obd_ops under
 * LUSTRE_ECHO_CLIENT_NAME.  lu_kmem_fini() undoes the cache setup,
 * presumably only when registration fails - the guard is not visible here.
 */
2137 static int echo_client_init(void)
2141 rc
= lu_kmem_init(echo_caches
);
2143 rc
= class_register_type(&echo_client_obd_ops
, NULL
,
2144 LUSTRE_ECHO_CLIENT_NAME
,
2147 lu_kmem_fini(echo_caches
);
2152 static void echo_client_exit(void)
2154 class_unregister_type(LUSTRE_ECHO_CLIENT_NAME
);
2155 lu_kmem_fini(echo_caches
);
2158 static int __init
obdecho_init(void)
2160 LCONSOLE_INFO("Echo OBD driver; http://www.lustre.org/\n");
2162 LASSERT(PAGE_CACHE_SIZE
% OBD_ECHO_BLOCK_SIZE
== 0);
2164 return echo_client_init();
/*
 * Module exit hook, registered through module_exit().
 * NOTE(review): the body is not visible here - confirm it undoes
 * obdecho_init().
 */
2167 static void /*__exit*/ obdecho_exit(void)
/* Module metadata, followed by the init/exit hook registration. */
2173 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
2174 MODULE_DESCRIPTION("Lustre Testing Echo OBD driver");
2175 MODULE_LICENSE("GPL");
2176 MODULE_VERSION(LUSTRE_VERSION_STRING
);
2178 module_init(obdecho_init
);
2179 module_exit(obdecho_exit
);
2181 /** @} echo_client */