4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2015, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
33 #define DEBUG_SUBSYSTEM S_ECHO
34 #include "../../include/linux/libcfs/libcfs.h"
36 #include "../include/obd.h"
37 #include "../include/obd_support.h"
38 #include "../include/obd_class.h"
39 #include "../include/lustre_debug.h"
40 #include "../include/lprocfs_status.h"
41 #include "../include/cl_object.h"
42 #include "../include/lustre_fid.h"
43 #include "../include/lustre_acl.h"
44 #include "../include/lustre/lustre_ioctl.h"
45 #include "../include/lustre_net.h"
47 #include "echo_internal.h"
49 /** \defgroup echo_client Echo Client
54 struct cl_device ed_cl
;
55 struct echo_client_obd
*ed_ec
;
57 struct cl_site ed_site_myself
;
58 struct lu_site
*ed_site
;
59 struct lu_device
*ed_next
;
63 struct cl_object eo_cl
;
64 struct cl_object_header eo_hdr
;
66 struct echo_device
*eo_dev
;
67 struct list_head eo_obj_chain
;
68 struct lov_oinfo
*eo_oinfo
;
73 struct echo_object_conf
{
74 struct cl_object_conf eoc_cl
;
75 struct lov_oinfo
**eoc_oinfo
;
79 struct cl_page_slice ep_cl
;
84 struct cl_lock_slice el_cl
;
85 struct list_head el_chain
;
86 struct echo_object
*el_object
;
/*
 * Forward declarations: both routines are defined near the end of this file
 * but are called earlier, from the device alloc/free paths.
 */
static int echo_client_setup(const struct lu_env *env,
			     struct obd_device *obddev,
			     struct lustre_cfg *lcfg);

static int echo_client_cleanup(struct obd_device *obddev);
96 /** \defgroup echo_helpers Helper functions
99 static inline struct echo_device
*cl2echo_dev(const struct cl_device
*dev
)
101 return container_of0(dev
, struct echo_device
, ed_cl
);
104 static inline struct cl_device
*echo_dev2cl(struct echo_device
*d
)
109 static inline struct echo_device
*obd2echo_dev(const struct obd_device
*obd
)
111 return cl2echo_dev(lu2cl_dev(obd
->obd_lu_dev
));
114 static inline struct cl_object
*echo_obj2cl(struct echo_object
*eco
)
119 static inline struct echo_object
*cl2echo_obj(const struct cl_object
*o
)
121 return container_of(o
, struct echo_object
, eo_cl
);
124 static inline struct echo_page
*cl2echo_page(const struct cl_page_slice
*s
)
126 return container_of(s
, struct echo_page
, ep_cl
);
129 static inline struct echo_lock
*cl2echo_lock(const struct cl_lock_slice
*s
)
131 return container_of(s
, struct echo_lock
, el_cl
);
134 static inline struct cl_lock
*echo_lock2cl(const struct echo_lock
*ecl
)
136 return ecl
->el_cl
.cls_lock
;
139 static struct lu_context_key echo_thread_key
;
140 static inline struct echo_thread_info
*echo_env_info(const struct lu_env
*env
)
142 struct echo_thread_info
*info
;
144 info
= lu_context_key_get(&env
->le_ctx
, &echo_thread_key
);
150 struct echo_object_conf
*cl2echo_conf(const struct cl_object_conf
*c
)
152 return container_of(c
, struct echo_object_conf
, eoc_cl
);
155 /** @} echo_helpers */
156 static int cl_echo_object_put(struct echo_object
*eco
);
157 static int cl_echo_object_brw(struct echo_object
*eco
, int rw
, u64 offset
,
158 struct page
**pages
, int npages
, int async
);
160 struct echo_thread_info
{
161 struct echo_object_conf eti_conf
;
162 struct lustre_md eti_md
;
164 struct cl_2queue eti_queue
;
166 struct cl_lock eti_lock
;
167 struct lu_fid eti_fid
;
168 struct lu_fid eti_fid2
;
171 /* No session used right now */
172 struct echo_session_info
{
/*
 * Slab caches used by the echo client.  Each pointer is referenced by the
 * matching entry of echo_caches[], which is what lu_kmem_init() and
 * lu_kmem_fini() are handed at module load/unload.
 */
static struct kmem_cache *echo_lock_kmem;	/* struct echo_lock */
static struct kmem_cache *echo_object_kmem;	/* struct echo_object */
static struct kmem_cache *echo_thread_kmem;	/* struct echo_thread_info */
static struct kmem_cache *echo_session_kmem;	/* struct echo_session_info */
181 static struct lu_kmem_descr echo_caches
[] = {
183 .ckd_cache
= &echo_lock_kmem
,
184 .ckd_name
= "echo_lock_kmem",
185 .ckd_size
= sizeof(struct echo_lock
)
188 .ckd_cache
= &echo_object_kmem
,
189 .ckd_name
= "echo_object_kmem",
190 .ckd_size
= sizeof(struct echo_object
)
193 .ckd_cache
= &echo_thread_kmem
,
194 .ckd_name
= "echo_thread_kmem",
195 .ckd_size
= sizeof(struct echo_thread_info
)
198 .ckd_cache
= &echo_session_kmem
,
199 .ckd_name
= "echo_session_kmem",
200 .ckd_size
= sizeof(struct echo_session_info
)
207 /** \defgroup echo_page Page operations
209 * Echo page operations.
213 static int echo_page_own(const struct lu_env
*env
,
214 const struct cl_page_slice
*slice
,
215 struct cl_io
*io
, int nonblock
)
217 struct echo_page
*ep
= cl2echo_page(slice
);
220 mutex_lock(&ep
->ep_lock
);
221 else if (!mutex_trylock(&ep
->ep_lock
))
226 static void echo_page_disown(const struct lu_env
*env
,
227 const struct cl_page_slice
*slice
,
230 struct echo_page
*ep
= cl2echo_page(slice
);
232 LASSERT(mutex_is_locked(&ep
->ep_lock
));
233 mutex_unlock(&ep
->ep_lock
);
236 static void echo_page_discard(const struct lu_env
*env
,
237 const struct cl_page_slice
*slice
,
238 struct cl_io
*unused
)
240 cl_page_delete(env
, slice
->cpl_page
);
243 static int echo_page_is_vmlocked(const struct lu_env
*env
,
244 const struct cl_page_slice
*slice
)
246 if (mutex_is_locked(&cl2echo_page(slice
)->ep_lock
))
251 static void echo_page_completion(const struct lu_env
*env
,
252 const struct cl_page_slice
*slice
,
255 LASSERT(slice
->cpl_page
->cp_sync_io
);
258 static void echo_page_fini(const struct lu_env
*env
,
259 struct cl_page_slice
*slice
)
261 struct echo_object
*eco
= cl2echo_obj(slice
->cpl_obj
);
263 atomic_dec(&eco
->eo_npages
);
264 put_page(slice
->cpl_page
->cp_vmpage
);
267 static int echo_page_prep(const struct lu_env
*env
,
268 const struct cl_page_slice
*slice
,
269 struct cl_io
*unused
)
274 static int echo_page_print(const struct lu_env
*env
,
275 const struct cl_page_slice
*slice
,
276 void *cookie
, lu_printer_t printer
)
278 struct echo_page
*ep
= cl2echo_page(slice
);
280 (*printer
)(env
, cookie
, LUSTRE_ECHO_CLIENT_NAME
"-page@%p %d vm@%p\n",
281 ep
, mutex_is_locked(&ep
->ep_lock
),
282 slice
->cpl_page
->cp_vmpage
);
286 static const struct cl_page_operations echo_page_ops
= {
287 .cpo_own
= echo_page_own
,
288 .cpo_disown
= echo_page_disown
,
289 .cpo_discard
= echo_page_discard
,
290 .cpo_fini
= echo_page_fini
,
291 .cpo_print
= echo_page_print
,
292 .cpo_is_vmlocked
= echo_page_is_vmlocked
,
295 .cpo_prep
= echo_page_prep
,
296 .cpo_completion
= echo_page_completion
,
299 .cpo_prep
= echo_page_prep
,
300 .cpo_completion
= echo_page_completion
,
307 /** \defgroup echo_lock Locking
309 * echo lock operations
313 static void echo_lock_fini(const struct lu_env
*env
,
314 struct cl_lock_slice
*slice
)
316 struct echo_lock
*ecl
= cl2echo_lock(slice
);
318 LASSERT(list_empty(&ecl
->el_chain
));
319 kmem_cache_free(echo_lock_kmem
, ecl
);
322 static struct cl_lock_operations echo_lock_ops
= {
323 .clo_fini
= echo_lock_fini
,
328 /** \defgroup echo_cl_ops cl_object operations
330 * operations for cl_object
334 static int echo_page_init(const struct lu_env
*env
, struct cl_object
*obj
,
335 struct cl_page
*page
, pgoff_t index
)
337 struct echo_page
*ep
= cl_object_page_slice(obj
, page
);
338 struct echo_object
*eco
= cl2echo_obj(obj
);
340 get_page(page
->cp_vmpage
);
341 mutex_init(&ep
->ep_lock
);
342 cl_page_slice_add(page
, &ep
->ep_cl
, obj
, index
, &echo_page_ops
);
343 atomic_inc(&eco
->eo_npages
);
347 static int echo_io_init(const struct lu_env
*env
, struct cl_object
*obj
,
353 static int echo_lock_init(const struct lu_env
*env
,
354 struct cl_object
*obj
, struct cl_lock
*lock
,
355 const struct cl_io
*unused
)
357 struct echo_lock
*el
;
359 el
= kmem_cache_zalloc(echo_lock_kmem
, GFP_NOFS
);
361 cl_lock_slice_add(lock
, &el
->el_cl
, obj
, &echo_lock_ops
);
362 el
->el_object
= cl2echo_obj(obj
);
363 INIT_LIST_HEAD(&el
->el_chain
);
364 atomic_set(&el
->el_refcount
, 0);
366 return !el
? -ENOMEM
: 0;
369 static int echo_conf_set(const struct lu_env
*env
, struct cl_object
*obj
,
370 const struct cl_object_conf
*conf
)
375 static const struct cl_object_operations echo_cl_obj_ops
= {
376 .coo_page_init
= echo_page_init
,
377 .coo_lock_init
= echo_lock_init
,
378 .coo_io_init
= echo_io_init
,
379 .coo_conf_set
= echo_conf_set
382 /** @} echo_cl_ops */
384 /** \defgroup echo_lu_ops lu_object operations
386 * operations for echo lu object.
390 static int echo_object_init(const struct lu_env
*env
, struct lu_object
*obj
,
391 const struct lu_object_conf
*conf
)
393 struct echo_device
*ed
= cl2echo_dev(lu2cl_dev(obj
->lo_dev
));
394 struct echo_client_obd
*ec
= ed
->ed_ec
;
395 struct echo_object
*eco
= cl2echo_obj(lu2cl(obj
));
396 const struct cl_object_conf
*cconf
;
397 struct echo_object_conf
*econf
;
400 struct lu_object
*below
;
401 struct lu_device
*under
;
404 below
= under
->ld_ops
->ldo_object_alloc(env
, obj
->lo_header
,
408 lu_object_add(obj
, below
);
411 cconf
= lu2cl_conf(conf
);
412 econf
= cl2echo_conf(cconf
);
414 LASSERT(econf
->eoc_oinfo
);
416 * Transfer the oinfo pointer to eco that it won't be
419 eco
->eo_oinfo
= *econf
->eoc_oinfo
;
420 *econf
->eoc_oinfo
= NULL
;
423 atomic_set(&eco
->eo_npages
, 0);
424 cl_object_page_init(lu2cl(obj
), sizeof(struct echo_page
));
426 spin_lock(&ec
->ec_lock
);
427 list_add_tail(&eco
->eo_obj_chain
, &ec
->ec_objects
);
428 spin_unlock(&ec
->ec_lock
);
433 static void echo_object_free(const struct lu_env
*env
, struct lu_object
*obj
)
435 struct echo_object
*eco
= cl2echo_obj(lu2cl(obj
));
436 struct echo_client_obd
*ec
= eco
->eo_dev
->ed_ec
;
438 LASSERT(atomic_read(&eco
->eo_npages
) == 0);
440 spin_lock(&ec
->ec_lock
);
441 list_del_init(&eco
->eo_obj_chain
);
442 spin_unlock(&ec
->ec_lock
);
445 lu_object_header_fini(obj
->lo_header
);
447 kfree(eco
->eo_oinfo
);
448 kmem_cache_free(echo_object_kmem
, eco
);
451 static int echo_object_print(const struct lu_env
*env
, void *cookie
,
452 lu_printer_t p
, const struct lu_object
*o
)
454 struct echo_object
*obj
= cl2echo_obj(lu2cl(o
));
456 return (*p
)(env
, cookie
, "echoclient-object@%p", obj
);
459 static const struct lu_object_operations echo_lu_obj_ops
= {
460 .loo_object_init
= echo_object_init
,
461 .loo_object_delete
= NULL
,
462 .loo_object_release
= NULL
,
463 .loo_object_free
= echo_object_free
,
464 .loo_object_print
= echo_object_print
,
465 .loo_object_invariant
= NULL
468 /** @} echo_lu_ops */
470 /** \defgroup echo_lu_dev_ops lu_device operations
472 * Operations for echo lu device.
476 static struct lu_object
*echo_object_alloc(const struct lu_env
*env
,
477 const struct lu_object_header
*hdr
,
478 struct lu_device
*dev
)
480 struct echo_object
*eco
;
481 struct lu_object
*obj
= NULL
;
483 /* we're the top dev. */
485 eco
= kmem_cache_zalloc(echo_object_kmem
, GFP_NOFS
);
487 struct cl_object_header
*hdr
= &eco
->eo_hdr
;
489 obj
= &echo_obj2cl(eco
)->co_lu
;
490 cl_object_header_init(hdr
);
491 hdr
->coh_page_bufsize
= cfs_size_round(sizeof(struct cl_page
));
493 lu_object_init(obj
, &hdr
->coh_lu
, dev
);
494 lu_object_add_top(&hdr
->coh_lu
, obj
);
496 eco
->eo_cl
.co_ops
= &echo_cl_obj_ops
;
497 obj
->lo_ops
= &echo_lu_obj_ops
;
502 static const struct lu_device_operations echo_device_lu_ops
= {
503 .ldo_object_alloc
= echo_object_alloc
,
506 /** @} echo_lu_dev_ops */
508 /** \defgroup echo_init Setup and teardown
510 * Init and fini functions for echo client.
514 static int echo_site_init(const struct lu_env
*env
, struct echo_device
*ed
)
516 struct cl_site
*site
= &ed
->ed_site_myself
;
519 /* initialize site */
520 rc
= cl_site_init(site
, &ed
->ed_cl
);
522 CERROR("Cannot initialize site for echo client(%d)\n", rc
);
526 rc
= lu_site_init_finish(&site
->cs_lu
);
532 ed
->ed_site
= &site
->cs_lu
;
536 static void echo_site_fini(const struct lu_env
*env
, struct echo_device
*ed
)
539 lu_site_fini(ed
->ed_site
);
544 static void *echo_thread_key_init(const struct lu_context
*ctx
,
545 struct lu_context_key
*key
)
547 struct echo_thread_info
*info
;
549 info
= kmem_cache_zalloc(echo_thread_kmem
, GFP_NOFS
);
551 info
= ERR_PTR(-ENOMEM
);
555 static void echo_thread_key_fini(const struct lu_context
*ctx
,
556 struct lu_context_key
*key
, void *data
)
558 struct echo_thread_info
*info
= data
;
560 kmem_cache_free(echo_thread_kmem
, info
);
563 static struct lu_context_key echo_thread_key
= {
564 .lct_tags
= LCT_CL_THREAD
,
565 .lct_init
= echo_thread_key_init
,
566 .lct_fini
= echo_thread_key_fini
,
569 static void *echo_session_key_init(const struct lu_context
*ctx
,
570 struct lu_context_key
*key
)
572 struct echo_session_info
*session
;
574 session
= kmem_cache_zalloc(echo_session_kmem
, GFP_NOFS
);
576 session
= ERR_PTR(-ENOMEM
);
580 static void echo_session_key_fini(const struct lu_context
*ctx
,
581 struct lu_context_key
*key
, void *data
)
583 struct echo_session_info
*session
= data
;
585 kmem_cache_free(echo_session_kmem
, session
);
588 static struct lu_context_key echo_session_key
= {
589 .lct_tags
= LCT_SESSION
,
590 .lct_init
= echo_session_key_init
,
591 .lct_fini
= echo_session_key_fini
,
594 LU_TYPE_INIT_FINI(echo
, &echo_thread_key
, &echo_session_key
);
596 static struct lu_device
*echo_device_alloc(const struct lu_env
*env
,
597 struct lu_device_type
*t
,
598 struct lustre_cfg
*cfg
)
600 struct lu_device
*next
;
601 struct echo_device
*ed
;
602 struct cl_device
*cd
;
603 struct obd_device
*obd
= NULL
; /* to keep compiler happy */
604 struct obd_device
*tgt
;
605 const char *tgt_type_name
;
608 ed
= kzalloc(sizeof(*ed
), GFP_NOFS
);
615 rc
= cl_device_init(cd
, t
);
619 cd
->cd_lu_dev
.ld_ops
= &echo_device_lu_ops
;
621 obd
= class_name2obd(lustre_cfg_string(cfg
, 0));
625 tgt
= class_name2obd(lustre_cfg_string(cfg
, 1));
627 CERROR("Can not find tgt device %s\n",
628 lustre_cfg_string(cfg
, 1));
630 goto out_device_fini
;
633 next
= tgt
->obd_lu_dev
;
634 if (!strcmp(tgt
->obd_type
->typ_name
, LUSTRE_MDT_NAME
)) {
635 CERROR("echo MDT client must be run on server\n");
637 goto out_device_fini
;
640 rc
= echo_site_init(env
, ed
);
642 goto out_device_fini
;
644 rc
= echo_client_setup(env
, obd
, cfg
);
648 ed
->ed_ec
= &obd
->u
.echo_client
;
650 /* if echo client is to be stacked upon ost device, the next is
651 * NULL since ost is not a clio device so far
653 if (next
&& !lu_device_is_cl(next
))
656 tgt_type_name
= tgt
->obd_type
->typ_name
;
663 next
->ld_site
= ed
->ed_site
;
664 rc
= next
->ld_type
->ldt_ops
->ldto_device_init(env
, next
,
665 next
->ld_type
->ldt_name
,
671 LASSERT(strcmp(tgt_type_name
, LUSTRE_OST_NAME
) == 0);
675 return &cd
->cd_lu_dev
;
678 err
= echo_client_cleanup(obd
);
680 CERROR("Cleanup obd device %s error(%d)\n",
683 echo_site_fini(env
, ed
);
685 cl_device_fini(&ed
->ed_cl
);
692 static int echo_device_init(const struct lu_env
*env
, struct lu_device
*d
,
693 const char *name
, struct lu_device
*next
)
699 static struct lu_device
*echo_device_fini(const struct lu_env
*env
,
702 struct echo_device
*ed
= cl2echo_dev(lu2cl_dev(d
));
703 struct lu_device
*next
= ed
->ed_next
;
706 next
= next
->ld_type
->ldt_ops
->ldto_device_fini(env
, next
);
710 static void echo_lock_release(const struct lu_env
*env
,
711 struct echo_lock
*ecl
,
714 struct cl_lock
*clk
= echo_lock2cl(ecl
);
716 cl_lock_release(env
, clk
);
719 static struct lu_device
*echo_device_free(const struct lu_env
*env
,
722 struct echo_device
*ed
= cl2echo_dev(lu2cl_dev(d
));
723 struct echo_client_obd
*ec
= ed
->ed_ec
;
724 struct echo_object
*eco
;
725 struct lu_device
*next
= ed
->ed_next
;
727 CDEBUG(D_INFO
, "echo device:%p is going to be freed, next = %p\n",
730 lu_site_purge(env
, ed
->ed_site
, -1);
732 /* check if there are objects still alive.
733 * It shouldn't have any object because lu_site_purge would cleanup
734 * all of cached objects. Anyway, probably the echo device is being
735 * parallelly accessed.
737 spin_lock(&ec
->ec_lock
);
738 list_for_each_entry(eco
, &ec
->ec_objects
, eo_obj_chain
)
740 spin_unlock(&ec
->ec_lock
);
743 lu_site_purge(env
, ed
->ed_site
, -1);
746 "Waiting for the reference of echo object to be dropped\n");
748 /* Wait for the last reference to be dropped. */
749 spin_lock(&ec
->ec_lock
);
750 while (!list_empty(&ec
->ec_objects
)) {
751 spin_unlock(&ec
->ec_lock
);
752 CERROR("echo_client still has objects at cleanup time, wait for 1 second\n");
753 set_current_state(TASK_UNINTERRUPTIBLE
);
754 schedule_timeout(cfs_time_seconds(1));
755 lu_site_purge(env
, ed
->ed_site
, -1);
756 spin_lock(&ec
->ec_lock
);
758 spin_unlock(&ec
->ec_lock
);
760 LASSERT(list_empty(&ec
->ec_locks
));
762 CDEBUG(D_INFO
, "No object exists, exiting...\n");
764 echo_client_cleanup(d
->ld_obd
);
767 next
= next
->ld_type
->ldt_ops
->ldto_device_free(env
, next
);
769 LASSERT(ed
->ed_site
== d
->ld_site
);
770 echo_site_fini(env
, ed
);
771 cl_device_fini(&ed
->ed_cl
);
774 cl_env_cache_purge(~0);
779 static const struct lu_device_type_operations echo_device_type_ops
= {
780 .ldto_init
= echo_type_init
,
781 .ldto_fini
= echo_type_fini
,
783 .ldto_start
= echo_type_start
,
784 .ldto_stop
= echo_type_stop
,
786 .ldto_device_alloc
= echo_device_alloc
,
787 .ldto_device_free
= echo_device_free
,
788 .ldto_device_init
= echo_device_init
,
789 .ldto_device_fini
= echo_device_fini
792 static struct lu_device_type echo_device_type
= {
793 .ldt_tags
= LU_DEVICE_CL
,
794 .ldt_name
= LUSTRE_ECHO_CLIENT_NAME
,
795 .ldt_ops
= &echo_device_type_ops
,
796 .ldt_ctx_tags
= LCT_CL_THREAD
,
801 /** \defgroup echo_exports Exported operations
803 * exporting functions to echo client
808 /* Interfaces to echo client obd device */
809 static struct echo_object
*
810 cl_echo_object_find(struct echo_device
*d
, const struct ost_id
*oi
)
813 struct echo_thread_info
*info
;
814 struct echo_object_conf
*conf
;
815 struct lov_oinfo
*oinfo
= NULL
;
816 struct echo_object
*eco
;
817 struct cl_object
*obj
;
822 LASSERTF(ostid_id(oi
), DOSTID
"\n", POSTID(oi
));
823 LASSERTF(ostid_seq(oi
) == FID_SEQ_ECHO
, DOSTID
"\n", POSTID(oi
));
825 /* Never return an object if the obd is to be freed. */
826 if (echo_dev2cl(d
)->cd_lu_dev
.ld_obd
->obd_stopping
)
827 return ERR_PTR(-ENODEV
);
829 env
= cl_env_get(&refcheck
);
833 info
= echo_env_info(env
);
834 conf
= &info
->eti_conf
;
836 oinfo
= kzalloc(sizeof(*oinfo
), GFP_NOFS
);
838 eco
= ERR_PTR(-ENOMEM
);
843 conf
->eoc_cl
.u
.coc_oinfo
= oinfo
;
847 * If echo_object_init() is successful then ownership of oinfo
848 * is transferred to the object.
850 conf
->eoc_oinfo
= &oinfo
;
852 fid
= &info
->eti_fid
;
853 rc
= ostid_to_fid(fid
, (struct ost_id
*)oi
, 0);
859 /* In the function below, .hs_keycmp resolves to
860 * lu_obj_hop_keycmp()
862 /* coverity[overrun-buffer-val] */
863 obj
= cl_object_find(env
, echo_dev2cl(d
), fid
, &conf
->eoc_cl
);
869 eco
= cl2echo_obj(obj
);
870 if (eco
->eo_deleted
) {
871 cl_object_put(env
, obj
);
872 eco
= ERR_PTR(-EAGAIN
);
877 cl_env_put(env
, &refcheck
);
881 static int cl_echo_object_put(struct echo_object
*eco
)
884 struct cl_object
*obj
= echo_obj2cl(eco
);
887 env
= cl_env_get(&refcheck
);
891 /* an external function to kill an object? */
892 if (eco
->eo_deleted
) {
893 struct lu_object_header
*loh
= obj
->co_lu
.lo_header
;
895 LASSERT(&eco
->eo_hdr
== luh2coh(loh
));
896 set_bit(LU_OBJECT_HEARD_BANSHEE
, &loh
->loh_flags
);
899 cl_object_put(env
, obj
);
900 cl_env_put(env
, &refcheck
);
904 static int cl_echo_enqueue0(struct lu_env
*env
, struct echo_object
*eco
,
905 u64 start
, u64 end
, int mode
,
906 __u64
*cookie
, __u32 enqflags
)
910 struct cl_object
*obj
;
911 struct cl_lock_descr
*descr
;
912 struct echo_thread_info
*info
;
915 info
= echo_env_info(env
);
917 lck
= &info
->eti_lock
;
918 obj
= echo_obj2cl(eco
);
920 memset(lck
, 0, sizeof(*lck
));
921 descr
= &lck
->cll_descr
;
922 descr
->cld_obj
= obj
;
923 descr
->cld_start
= cl_index(obj
, start
);
924 descr
->cld_end
= cl_index(obj
, end
);
925 descr
->cld_mode
= mode
== LCK_PW
? CLM_WRITE
: CLM_READ
;
926 descr
->cld_enq_flags
= enqflags
;
929 rc
= cl_lock_request(env
, io
, lck
);
931 struct echo_client_obd
*ec
= eco
->eo_dev
->ed_ec
;
932 struct echo_lock
*el
;
934 el
= cl2echo_lock(cl_lock_at(lck
, &echo_device_type
));
935 spin_lock(&ec
->ec_lock
);
936 if (list_empty(&el
->el_chain
)) {
937 list_add(&el
->el_chain
, &ec
->ec_locks
);
938 el
->el_cookie
= ++ec
->ec_unique
;
940 atomic_inc(&el
->el_refcount
);
941 *cookie
= el
->el_cookie
;
942 spin_unlock(&ec
->ec_lock
);
947 static int cl_echo_cancel0(struct lu_env
*env
, struct echo_device
*ed
,
950 struct echo_client_obd
*ec
= ed
->ed_ec
;
951 struct echo_lock
*ecl
= NULL
;
952 struct list_head
*el
;
953 int found
= 0, still_used
= 0;
955 spin_lock(&ec
->ec_lock
);
956 list_for_each(el
, &ec
->ec_locks
) {
957 ecl
= list_entry(el
, struct echo_lock
, el_chain
);
958 CDEBUG(D_INFO
, "ecl: %p, cookie: %#llx\n", ecl
, ecl
->el_cookie
);
959 found
= (ecl
->el_cookie
== cookie
);
961 if (atomic_dec_and_test(&ecl
->el_refcount
))
962 list_del_init(&ecl
->el_chain
);
968 spin_unlock(&ec
->ec_lock
);
973 echo_lock_release(env
, ecl
, still_used
);
977 static void echo_commit_callback(const struct lu_env
*env
, struct cl_io
*io
,
978 struct cl_page
*page
)
980 struct echo_thread_info
*info
;
981 struct cl_2queue
*queue
;
983 info
= echo_env_info(env
);
984 LASSERT(io
== &info
->eti_io
);
986 queue
= &info
->eti_queue
;
987 cl_page_list_add(&queue
->c2_qout
, page
);
990 static int cl_echo_object_brw(struct echo_object
*eco
, int rw
, u64 offset
,
991 struct page
**pages
, int npages
, int async
)
994 struct echo_thread_info
*info
;
995 struct cl_object
*obj
= echo_obj2cl(eco
);
996 struct echo_device
*ed
= eco
->eo_dev
;
997 struct cl_2queue
*queue
;
1000 struct lustre_handle lh
= { 0 };
1001 size_t page_size
= cl_page_size(obj
);
1006 LASSERT((offset
& ~PAGE_MASK
) == 0);
1007 LASSERT(ed
->ed_next
);
1008 env
= cl_env_get(&refcheck
);
1010 return PTR_ERR(env
);
1012 info
= echo_env_info(env
);
1014 queue
= &info
->eti_queue
;
1016 cl_2queue_init(queue
);
1018 io
->ci_ignore_layout
= 1;
1019 rc
= cl_io_init(env
, io
, CIT_MISC
, obj
);
1024 rc
= cl_echo_enqueue0(env
, eco
, offset
,
1025 offset
+ npages
* PAGE_SIZE
- 1,
1026 rw
== READ
? LCK_PR
: LCK_PW
, &lh
.cookie
,
1031 for (i
= 0; i
< npages
; i
++) {
1033 clp
= cl_page_find(env
, obj
, cl_index(obj
, offset
),
1034 pages
[i
], CPT_TRANSIENT
);
1039 LASSERT(clp
->cp_type
== CPT_TRANSIENT
);
1041 rc
= cl_page_own(env
, io
, clp
);
1043 LASSERT(clp
->cp_state
== CPS_FREEING
);
1044 cl_page_put(env
, clp
);
1048 * Add a page to the incoming page list of 2-queue.
1050 cl_page_list_add(&queue
->c2_qin
, clp
);
1052 /* drop the reference count for cl_page_find, so that the page
1053 * will be freed in cl_2queue_fini.
1055 cl_page_put(env
, clp
);
1056 cl_page_clip(env
, clp
, 0, page_size
);
1058 offset
+= page_size
;
1062 enum cl_req_type typ
= rw
== READ
? CRT_READ
: CRT_WRITE
;
1064 async
= async
&& (typ
== CRT_WRITE
);
1066 rc
= cl_io_commit_async(env
, io
, &queue
->c2_qin
,
1068 echo_commit_callback
);
1070 rc
= cl_io_submit_sync(env
, io
, typ
, queue
, 0);
1071 CDEBUG(D_INFO
, "echo_client %s write returns %d\n",
1072 async
? "async" : "sync", rc
);
1075 cl_echo_cancel0(env
, ed
, lh
.cookie
);
1077 cl_2queue_discard(env
, io
, queue
);
1078 cl_2queue_disown(env
, io
, queue
);
1079 cl_2queue_fini(env
, queue
);
1080 cl_io_fini(env
, io
);
1082 cl_env_put(env
, &refcheck
);
1086 /** @} echo_exports */
1088 static u64 last_object_id
;
1090 static int echo_create_object(const struct lu_env
*env
, struct echo_device
*ed
,
1093 struct echo_object
*eco
;
1094 struct echo_client_obd
*ec
= ed
->ed_ec
;
1098 if (!(oa
->o_valid
& OBD_MD_FLID
) ||
1099 !(oa
->o_valid
& OBD_MD_FLGROUP
) ||
1100 !fid_seq_is_echo(ostid_seq(&oa
->o_oi
))) {
1101 CERROR("invalid oid " DOSTID
"\n", POSTID(&oa
->o_oi
));
1105 if (!ostid_id(&oa
->o_oi
))
1106 ostid_set_id(&oa
->o_oi
, ++last_object_id
);
1108 rc
= obd_create(env
, ec
->ec_exp
, oa
);
1110 CERROR("Cannot create objects: rc = %d\n", rc
);
1115 oa
->o_valid
|= OBD_MD_FLID
;
1117 eco
= cl_echo_object_find(ed
, &oa
->o_oi
);
1122 cl_echo_object_put(eco
);
1124 CDEBUG(D_INFO
, "oa oid " DOSTID
"\n", POSTID(&oa
->o_oi
));
1128 obd_destroy(env
, ec
->ec_exp
, oa
);
1130 CERROR("create object failed with: rc = %d\n", rc
);
1134 static int echo_get_object(struct echo_object
**ecop
, struct echo_device
*ed
,
1137 struct echo_object
*eco
;
1140 if (!(oa
->o_valid
& OBD_MD_FLID
) || !(oa
->o_valid
& OBD_MD_FLGROUP
) ||
1141 !ostid_id(&oa
->o_oi
)) {
1142 CERROR("invalid oid " DOSTID
"\n", POSTID(&oa
->o_oi
));
1147 eco
= cl_echo_object_find(ed
, &oa
->o_oi
);
1155 static void echo_put_object(struct echo_object
*eco
)
1159 rc
= cl_echo_object_put(eco
);
1161 CERROR("%s: echo client drop an object failed: rc = %d\n",
1162 eco
->eo_dev
->ed_ec
->ec_exp
->exp_obd
->obd_name
, rc
);
1166 echo_client_page_debug_setup(struct page
*page
, int rw
, u64 id
,
1167 u64 offset
, u64 count
)
1174 /* no partial pages on the client */
1175 LASSERT(count
== PAGE_SIZE
);
1179 for (delta
= 0; delta
< PAGE_SIZE
; delta
+= OBD_ECHO_BLOCK_SIZE
) {
1180 if (rw
== OBD_BRW_WRITE
) {
1181 stripe_off
= offset
+ delta
;
1184 stripe_off
= 0xdeadbeef00c0ffeeULL
;
1185 stripe_id
= 0xdeadbeef00c0ffeeULL
;
1187 block_debug_setup(addr
+ delta
, OBD_ECHO_BLOCK_SIZE
,
1188 stripe_off
, stripe_id
);
1194 static int echo_client_page_debug_check(struct page
*page
, u64 id
,
1195 u64 offset
, u64 count
)
1204 /* no partial pages on the client */
1205 LASSERT(count
== PAGE_SIZE
);
1209 for (rc
= delta
= 0; delta
< PAGE_SIZE
; delta
+= OBD_ECHO_BLOCK_SIZE
) {
1210 stripe_off
= offset
+ delta
;
1213 rc2
= block_debug_check("test_brw",
1214 addr
+ delta
, OBD_ECHO_BLOCK_SIZE
,
1215 stripe_off
, stripe_id
);
1217 CERROR("Error in echo object %#llx\n", id
);
1226 static int echo_client_kbrw(struct echo_device
*ed
, int rw
, struct obdo
*oa
,
1227 struct echo_object
*eco
, u64 offset
,
1228 u64 count
, int async
)
1231 struct brw_page
*pga
;
1232 struct brw_page
*pgp
;
1233 struct page
**pages
;
1241 verify
= (ostid_id(&oa
->o_oi
) != ECHO_PERSISTENT_OBJID
&&
1242 (oa
->o_valid
& OBD_MD_FLFLAGS
) != 0 &&
1243 (oa
->o_flags
& OBD_FL_DEBUG_CHECK
) != 0);
1245 gfp_mask
= ((ostid_id(&oa
->o_oi
) & 2) == 0) ? GFP_KERNEL
: GFP_HIGHUSER
;
1247 LASSERT(rw
== OBD_BRW_WRITE
|| rw
== OBD_BRW_READ
);
1250 (count
& (~PAGE_MASK
)) != 0)
1253 /* XXX think again with misaligned I/O */
1254 npages
= count
>> PAGE_SHIFT
;
1256 if (rw
== OBD_BRW_WRITE
)
1257 brw_flags
= OBD_BRW_ASYNC
;
1259 pga
= kcalloc(npages
, sizeof(*pga
), GFP_NOFS
);
1263 pages
= kcalloc(npages
, sizeof(*pages
), GFP_NOFS
);
1269 for (i
= 0, pgp
= pga
, off
= offset
;
1271 i
++, pgp
++, off
+= PAGE_SIZE
) {
1272 LASSERT(!pgp
->pg
); /* for cleanup */
1275 pgp
->pg
= alloc_page(gfp_mask
);
1280 pgp
->count
= PAGE_SIZE
;
1282 pgp
->flag
= brw_flags
;
1285 echo_client_page_debug_setup(pgp
->pg
, rw
,
1286 ostid_id(&oa
->o_oi
), off
,
1290 /* brw mode can only be used at client */
1291 LASSERT(ed
->ed_next
);
1292 rc
= cl_echo_object_brw(eco
, rw
, offset
, pages
, npages
, async
);
1295 if (rc
!= 0 || rw
!= OBD_BRW_READ
)
1298 for (i
= 0, pgp
= pga
; i
< npages
; i
++, pgp
++) {
1305 vrc
= echo_client_page_debug_check(pgp
->pg
,
1306 ostid_id(&oa
->o_oi
),
1307 pgp
->off
, pgp
->count
);
1308 if (vrc
!= 0 && rc
== 0)
1311 __free_page(pgp
->pg
);
1318 static int echo_client_prep_commit(const struct lu_env
*env
,
1319 struct obd_export
*exp
, int rw
,
1320 struct obdo
*oa
, struct echo_object
*eco
,
1321 u64 offset
, u64 count
,
1322 u64 batch
, int async
)
1324 struct obd_ioobj ioo
;
1325 struct niobuf_local
*lnb
;
1326 struct niobuf_remote rnb
;
1328 u64 npages
, tot_pages
;
1329 int i
, ret
= 0, brw_flags
= 0;
1331 if (count
<= 0 || (count
& (~PAGE_MASK
)) != 0)
1334 npages
= batch
>> PAGE_SHIFT
;
1335 tot_pages
= count
>> PAGE_SHIFT
;
1337 lnb
= kcalloc(npages
, sizeof(struct niobuf_local
), GFP_NOFS
);
1343 if (rw
== OBD_BRW_WRITE
&& async
)
1344 brw_flags
|= OBD_BRW_ASYNC
;
1346 obdo_to_ioobj(oa
, &ioo
);
1350 for (; tot_pages
> 0; tot_pages
-= npages
) {
1353 if (tot_pages
< npages
)
1356 rnb
.rnb_offset
= off
;
1357 rnb
.rnb_len
= npages
* PAGE_SIZE
;
1358 rnb
.rnb_flags
= brw_flags
;
1360 off
+= npages
* PAGE_SIZE
;
1363 ret
= obd_preprw(env
, rw
, exp
, oa
, 1, &ioo
, &rnb
, &lpages
, lnb
);
1367 for (i
= 0; i
< lpages
; i
++) {
1368 struct page
*page
= lnb
[i
].lnb_page
;
1370 /* read past eof? */
1371 if (!page
&& lnb
[i
].lnb_rc
== 0)
1375 lnb
[i
].lnb_flags
|= OBD_BRW_ASYNC
;
1377 if (ostid_id(&oa
->o_oi
) == ECHO_PERSISTENT_OBJID
||
1378 (oa
->o_valid
& OBD_MD_FLFLAGS
) == 0 ||
1379 (oa
->o_flags
& OBD_FL_DEBUG_CHECK
) == 0)
1382 if (rw
== OBD_BRW_WRITE
)
1383 echo_client_page_debug_setup(page
, rw
,
1384 ostid_id(&oa
->o_oi
),
1385 lnb
[i
].lnb_file_offset
,
1388 echo_client_page_debug_check(page
,
1389 ostid_id(&oa
->o_oi
),
1390 lnb
[i
].lnb_file_offset
,
1394 ret
= obd_commitrw(env
, rw
, exp
, oa
, 1, &ioo
, &rnb
, npages
, lnb
,
1399 /* Reuse env context. */
1400 lu_context_exit((struct lu_context
*)&env
->le_ctx
);
1401 lu_context_enter((struct lu_context
*)&env
->le_ctx
);
1409 static int echo_client_brw_ioctl(const struct lu_env
*env
, int rw
,
1410 struct obd_export
*exp
,
1411 struct obd_ioctl_data
*data
)
1413 struct obd_device
*obd
= class_exp2obd(exp
);
1414 struct echo_device
*ed
= obd2echo_dev(obd
);
1415 struct echo_client_obd
*ec
= ed
->ed_ec
;
1416 struct obdo
*oa
= &data
->ioc_obdo1
;
1417 struct echo_object
*eco
;
1422 LASSERT(oa
->o_valid
& OBD_MD_FLGROUP
);
1424 rc
= echo_get_object(&eco
, ed
, oa
);
1428 oa
->o_valid
&= ~OBD_MD_FLHANDLE
;
1430 /* OFD/obdfilter works only via prep/commit */
1431 test_mode
= (long)data
->ioc_pbuf1
;
1435 if (!ed
->ed_next
&& test_mode
!= 3) {
1437 data
->ioc_plen1
= data
->ioc_count
;
1440 /* Truncate batch size to maximum */
1441 if (data
->ioc_plen1
> PTLRPC_MAX_BRW_SIZE
)
1442 data
->ioc_plen1
= PTLRPC_MAX_BRW_SIZE
;
1444 switch (test_mode
) {
1448 rc
= echo_client_kbrw(ed
, rw
, oa
, eco
, data
->ioc_offset
,
1449 data
->ioc_count
, async
);
1452 rc
= echo_client_prep_commit(env
, ec
->ec_exp
, rw
, oa
, eco
,
1453 data
->ioc_offset
, data
->ioc_count
,
1454 data
->ioc_plen1
, async
);
1459 echo_put_object(eco
);
1464 echo_client_iocontrol(unsigned int cmd
, struct obd_export
*exp
, int len
,
1465 void *karg
, void __user
*uarg
)
1467 struct obd_device
*obd
= exp
->exp_obd
;
1468 struct echo_device
*ed
= obd2echo_dev(obd
);
1469 struct echo_client_obd
*ec
= ed
->ed_ec
;
1470 struct echo_object
*eco
;
1471 struct obd_ioctl_data
*data
= karg
;
1475 int rw
= OBD_BRW_READ
;
1478 oa
= &data
->ioc_obdo1
;
1479 if (!(oa
->o_valid
& OBD_MD_FLGROUP
)) {
1480 oa
->o_valid
|= OBD_MD_FLGROUP
;
1481 ostid_set_seq_echo(&oa
->o_oi
);
1484 /* This FID is unpacked just for validation at this point */
1485 rc
= ostid_to_fid(&fid
, &oa
->o_oi
, 0);
1489 env
= kzalloc(sizeof(*env
), GFP_NOFS
);
1493 rc
= lu_env_init(env
, LCT_DT_THREAD
);
1500 case OBD_IOC_CREATE
: /* may create echo object */
1501 if (!capable(CFS_CAP_SYS_ADMIN
)) {
1506 rc
= echo_create_object(env
, ed
, oa
);
1509 case OBD_IOC_DESTROY
:
1510 if (!capable(CFS_CAP_SYS_ADMIN
)) {
1515 rc
= echo_get_object(&eco
, ed
, oa
);
1517 rc
= obd_destroy(env
, ec
->ec_exp
, oa
);
1519 eco
->eo_deleted
= 1;
1520 echo_put_object(eco
);
1524 case OBD_IOC_GETATTR
:
1525 rc
= echo_get_object(&eco
, ed
, oa
);
1527 rc
= obd_getattr(env
, ec
->ec_exp
, oa
);
1528 echo_put_object(eco
);
1532 case OBD_IOC_SETATTR
:
1533 if (!capable(CFS_CAP_SYS_ADMIN
)) {
1538 rc
= echo_get_object(&eco
, ed
, oa
);
1540 rc
= obd_setattr(env
, ec
->ec_exp
, oa
);
1541 echo_put_object(eco
);
1545 case OBD_IOC_BRW_WRITE
:
1546 if (!capable(CFS_CAP_SYS_ADMIN
)) {
1553 case OBD_IOC_BRW_READ
:
1554 rc
= echo_client_brw_ioctl(env
, rw
, exp
, data
);
1558 CERROR("echo_ioctl(): unrecognised ioctl %#x\n", cmd
);
1570 static int echo_client_setup(const struct lu_env
*env
,
1571 struct obd_device
*obddev
, struct lustre_cfg
*lcfg
)
1573 struct echo_client_obd
*ec
= &obddev
->u
.echo_client
;
1574 struct obd_device
*tgt
;
1575 struct obd_uuid echo_uuid
= { "ECHO_UUID" };
1576 struct obd_connect_data
*ocd
= NULL
;
1579 if (lcfg
->lcfg_bufcount
< 2 || LUSTRE_CFG_BUFLEN(lcfg
, 1) < 1) {
1580 CERROR("requires a TARGET OBD name\n");
1584 tgt
= class_name2obd(lustre_cfg_string(lcfg
, 1));
1585 if (!tgt
|| !tgt
->obd_attached
|| !tgt
->obd_set_up
) {
1586 CERROR("device not attached or not set up (%s)\n",
1587 lustre_cfg_string(lcfg
, 1));
1591 spin_lock_init(&ec
->ec_lock
);
1592 INIT_LIST_HEAD(&ec
->ec_objects
);
1593 INIT_LIST_HEAD(&ec
->ec_locks
);
1596 ocd
= kzalloc(sizeof(*ocd
), GFP_NOFS
);
1600 ocd
->ocd_connect_flags
= OBD_CONNECT_VERSION
| OBD_CONNECT_REQPORTAL
|
1601 OBD_CONNECT_BRW_SIZE
|
1602 OBD_CONNECT_GRANT
| OBD_CONNECT_FULL20
|
1603 OBD_CONNECT_64BITHASH
| OBD_CONNECT_LVB_TYPE
|
1605 ocd
->ocd_brw_size
= DT_MAX_BRW_SIZE
;
1606 ocd
->ocd_version
= LUSTRE_VERSION_CODE
;
1607 ocd
->ocd_group
= FID_SEQ_ECHO
;
1609 rc
= obd_connect(env
, &ec
->ec_exp
, tgt
, &echo_uuid
, ocd
, NULL
);
1614 CERROR("fail to connect to device %s\n",
1615 lustre_cfg_string(lcfg
, 1));
1622 static int echo_client_cleanup(struct obd_device
*obddev
)
1624 struct echo_client_obd
*ec
= &obddev
->u
.echo_client
;
1627 if (!list_empty(&obddev
->obd_exports
)) {
1628 CERROR("still has clients!\n");
1632 LASSERT(atomic_read(&ec
->ec_exp
->exp_refcount
) > 0);
1633 rc
= obd_disconnect(ec
->ec_exp
);
1635 CERROR("fail to disconnect device: %d\n", rc
);
1640 static int echo_client_connect(const struct lu_env
*env
,
1641 struct obd_export
**exp
,
1642 struct obd_device
*src
, struct obd_uuid
*cluuid
,
1643 struct obd_connect_data
*data
, void *localdata
)
1646 struct lustre_handle conn
= { 0 };
1648 rc
= class_connect(&conn
, src
, cluuid
);
1650 *exp
= class_conn2export(&conn
);
1655 static int echo_client_disconnect(struct obd_export
*exp
)
1664 rc
= class_disconnect(exp
);
1670 static struct obd_ops echo_client_obd_ops
= {
1671 .owner
= THIS_MODULE
,
1672 .iocontrol
= echo_client_iocontrol
,
1673 .connect
= echo_client_connect
,
1674 .disconnect
= echo_client_disconnect
1677 static int echo_client_init(void)
1681 rc
= lu_kmem_init(echo_caches
);
1683 rc
= class_register_type(&echo_client_obd_ops
, NULL
,
1684 LUSTRE_ECHO_CLIENT_NAME
,
1687 lu_kmem_fini(echo_caches
);
1692 static void echo_client_exit(void)
1694 class_unregister_type(LUSTRE_ECHO_CLIENT_NAME
);
1695 lu_kmem_fini(echo_caches
);
1698 static int __init
obdecho_init(void)
1700 LCONSOLE_INFO("Echo OBD driver; http://www.lustre.org/\n");
1702 LASSERT(PAGE_SIZE
% OBD_ECHO_BLOCK_SIZE
== 0);
1704 return echo_client_init();
1707 static void /*__exit*/ obdecho_exit(void)
1712 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
1713 MODULE_DESCRIPTION("Lustre Echo Client test driver");
1714 MODULE_VERSION(LUSTRE_VERSION_STRING
);
1715 MODULE_LICENSE("GPL");
1717 module_init(obdecho_init
);
1718 module_exit(obdecho_exit
);
1720 /** @} echo_client */