4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2013, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/fld/fld_request.c
38 * FLD (Fids Location Database)
40 * Author: Yury Umanets <umka@clusterfs.com>
43 #define DEBUG_SUBSYSTEM S_FLD
45 # include <linux/libcfs/libcfs.h>
46 # include <linux/module.h>
47 # include <asm/div64.h>
50 #include <obd_class.h>
51 #include <lustre_ver.h>
52 #include <obd_support.h>
53 #include <lprocfs_status.h>
55 #include <dt_object.h>
56 #include <md_object.h>
57 #include <lustre_req_layout.h>
58 #include <lustre_fld.h>
59 #include <lustre_mdc.h>
60 #include "fld_internal.h"
62 /* TODO: these 3 functions are copies of the flow-control code in mdc_lib.c.
63 * They should be shared, common code. The same applies to the mdc RPC lock. */
/*
 * Wake-up condition used by fld_enter_request(): with
 * cli->cl_loi_list_lock held, store list_empty(&mcw->mcw_entry) in rc,
 * then drop the lock.
 *
 * NOTE(review): the listing jumps from embedded line 64 to 68 and stops at
 * 70, so the opening brace, the declaration of rc and the return statement
 * are not visible here -- presumably rc is the return value; confirm
 * against the full file.
 */
64 static int fld_req_avail(struct client_obd
*cli
, struct mdc_cache_waiter
*mcw
)
68 client_obd_list_lock(&cli
->cl_loi_list_lock
);
69 rc
= list_empty(&mcw
->mcw_entry
);
70 client_obd_list_unlock(&cli
->cl_loi_list_lock
);
/*
 * Acquire one in-flight RPC slot on @cli.
 *
 * Under cl_loi_list_lock: when cl_r_in_flight has already reached
 * cl_max_rpcs_in_flight, an on-stack mdc_cache_waiter is appended to
 * cl_cache_waiters, the lock is released and the caller sleeps in
 * l_wait_event() until fld_req_avail() reports that the waiter is no
 * longer on the list.
 *
 * NOTE(review): embedded line 85 is missing from this listing; the
 * increment/unlock pair at lines 86-87 is presumably the else branch of
 * the check at line 80 -- confirm against the full file.
 */
74 static void fld_enter_request(struct client_obd
*cli
)
76 struct mdc_cache_waiter mcw
;
77 struct l_wait_info lwi
= { 0 };
79 client_obd_list_lock(&cli
->cl_loi_list_lock
);
80 if (cli
->cl_r_in_flight
>= cli
->cl_max_rpcs_in_flight
) {
/* The waiter is queued before its wait queue is initialised; both steps
 * happen before cl_loi_list_lock is released at line 83. */
81 list_add_tail(&mcw
.mcw_entry
, &cli
->cl_cache_waiters
);
82 init_waitqueue_head(&mcw
.mcw_waitq
);
83 client_obd_list_unlock(&cli
->cl_loi_list_lock
);
/* This path does not touch cl_r_in_flight itself. */
84 l_wait_event(mcw
.mcw_waitq
, fld_req_avail(cli
, &mcw
), &lwi
);
86 cli
->cl_r_in_flight
++;
87 client_obd_list_unlock(&cli
->cl_loi_list_lock
);
/*
 * Release one in-flight RPC slot on @cli and pass free slots on to queued
 * waiters.
 *
 * Under cl_loi_list_lock: cl_r_in_flight is decremented, then
 * cl_cache_waiters is walked. Each waiter that is taken off the list has
 * cl_r_in_flight incremented on its behalf and its wait queue woken.
 *
 * NOTE(review): embedded lines 102-104 are missing from this listing; the
 * body of the "no free slots" check is presumably a break out of the
 * loop -- confirm against the full file.
 */
91 static void fld_exit_request(struct client_obd
*cli
)
93 struct list_head
*l
, *tmp
;
94 struct mdc_cache_waiter
*mcw
;
96 client_obd_list_lock(&cli
->cl_loi_list_lock
);
97 cli
->cl_r_in_flight
--;
/* The _safe iterator is needed because entries are unlinked in the loop. */
98 list_for_each_safe(l
, tmp
, &cli
->cl_cache_waiters
) {
100 if (cli
->cl_r_in_flight
>= cli
->cl_max_rpcs_in_flight
) {
101 /* No free request slots anymore */
/* Unlink the waiter and account for its slot before waking it. */
105 mcw
= list_entry(l
, struct mdc_cache_waiter
, mcw_entry
);
106 list_del_init(&mcw
->mcw_entry
);
107 cli
->cl_r_in_flight
++;
108 wake_up(&mcw
->mcw_waitq
);
110 client_obd_list_unlock(&cli
->cl_loi_list_lock
);
/*
 * Hash callback: asserts that fld->lcf_count is positive and returns the
 * value of do_div(seq, fld->lcf_count).
 *
 * NOTE(review): embedded lines 114-115 are missing from this listing, so
 * the declaration of the seq parameter is not visible. With the Linux
 * do_div() macro the dividend is divided in place and the macro evaluates
 * to the remainder, so this is presumably "seq modulo lcf_count" --
 * confirm against the do_div() implementation used by this build.
 */
113 static int fld_rrb_hash(struct lu_client_fld
*fld
,
116 LASSERT(fld
->lcf_count
> 0);
117 return do_div(seq
, fld
->lcf_count
);
/*
 * Scan callback: find the target responsible for @seq.
 *
 * For a normal sequence (fid_seq_is_norm()) the hash comes from
 * fld_rrb_hash(); fld->lcf_targets is then searched for an entry whose
 * ft_idx equals that hash. The remaining visible code is the failure
 * report: one CERROR for the lookup itself, then one CERROR per known
 * target showing its export pointer and name, server pointer and name,
 * and index.
 *
 * NOTE(review): this listing skips many embedded lines. Not visible here:
 * the declaration of hash, the value used for non-normal sequences, the
 * statement executed when a target matches, the last argument of the
 * first CERROR, the fallback value of exp_name, and the LBUG() that the
 * trailing comment fragment refers to. Confirm against the full file.
 */
120 static struct lu_fld_target
*
121 fld_rrb_scan(struct lu_client_fld
*fld
, seqno_t seq
)
123 struct lu_fld_target
*target
;
126 /* Almost all special sequences are located on MDT0, so they
127 * go to index 0 directly instead of being hashed. In addition,
128 * if the other MDTs are not connected yet, fld lookup requests
129 * (for sequences on MDT0) must not be blocked because of
130 * those other MDTs. */
131 if (fid_seq_is_norm(seq
))
132 hash
= fld_rrb_hash(fld
, seq
);
136 list_for_each_entry(target
, &fld
->lcf_targets
, ft_chain
) {
137 if (target
->ft_idx
== hash
)
/* Failure report: the lookup that failed, then a dump of every target. */
141 CERROR("%s: Can't find target by hash %d (seq "LPX64
"). "
142 "Targets (%d):\n", fld
->lcf_name
, hash
, seq
,
145 list_for_each_entry(target
, &fld
->lcf_targets
, ft_chain
) {
/* Either ft_srv or ft_exp may be NULL, hence the guarded name lookups. */
146 const char *srv_name
= target
->ft_srv
!= NULL
?
147 target
->ft_srv
->lsf_name
: "<null>";
148 const char *exp_name
= target
->ft_exp
!= NULL
?
149 (char *)target
->ft_exp
->exp_obd
->obd_uuid
.uuid
:
152 CERROR(" exp: 0x%p (%s), srv: 0x%p (%s), idx: "LPU64
"\n",
153 target
->ft_exp
, exp_name
, target
->ft_srv
,
154 srv_name
, target
->ft_idx
);
158 * If target is not found, there is logical error anyway, so here is
159 * LBUG() to catch this situation.
/*
 * Table of client-side FLD hash implementations. fld_client_init() selects
 * an entry by index and hash_is_sane() bounds that index with
 * ARRAY_SIZE(fld_hash).
 *
 * The entry visible here wires fld_rrb_hash() and fld_rrb_scan() into the
 * fh_hash_func / fh_scan_func callbacks.
 *
 * NOTE(review): embedded lines 166-167 and everything after 169 are
 * missing from this listing, so the entry's other fields, any further
 * entries and the closing of the initializer are not visible.
 */
165 struct lu_fld_hash fld_hash
[] = {
168 .fh_hash_func
= fld_rrb_hash
,
169 .fh_scan_func
= fld_rrb_scan
/*
 * Resolve the target for @seq through the client's hash callbacks.
 *
 * Asserts that fld->lcf_hash is set, calls fh_scan_func(fld, seq) with
 * fld->lcf_lock held, and logs the match at D_INFO when one is found.
 *
 * NOTE(review): the returned target is dereferenced for the debug message
 * after lcf_lock has been dropped. Embedded lines after 190 are missing
 * from this listing, so the return statement is not visible.
 */
176 static struct lu_fld_target
*
177 fld_client_get_target(struct lu_client_fld
*fld
, seqno_t seq
)
179 struct lu_fld_target
*target
;
181 LASSERT(fld
->lcf_hash
!= NULL
);
/* The target list is only walked under lcf_lock. */
183 spin_lock(&fld
->lcf_lock
);
184 target
= fld
->lcf_hash
->fh_scan_func(fld
, seq
);
185 spin_unlock(&fld
->lcf_lock
);
187 if (target
!= NULL
) {
188 CDEBUG(D_INFO
, "%s: Found target (idx "LPU64
189 ") by seq "LPX64
"\n", fld
->lcf_name
,
190 target
->ft_idx
, seq
);
197 * Add export to FLD. This is usually done by CMM and LMV, as they are the main users
/*
 * Register a copy of target description @tar with client @fld.
 *
 * Visible steps:
 *  - assert that @tar is set, that it has a name, and that at least one
 *    of ft_srv / ft_exp is non-NULL;
 *  - complain and take the "on fly" path unless fld->lcf_flags is exactly
 *    LUSTRE_FLD_INIT;
 *  - allocate a new lu_fld_target;
 *  - under fld->lcf_lock, reject a duplicate ft_idx (the new allocation is
 *    freed again), otherwise copy ft_exp / ft_srv / ft_idx, take an
 *    export reference when ft_exp is set, and append the entry.
 *
 * NOTE(review): many embedded lines are missing from this listing: the
 * declaration of name, all return statements and their values, the check
 * after OBD_ALLOC_PTR(), and the list head argument of list_add_tail().
 * Confirm against the full file.
 */
200 int fld_client_add_target(struct lu_client_fld
*fld
,
201 struct lu_fld_target
*tar
)
204 struct lu_fld_target
*target
, *tmp
;
206 LASSERT(tar
!= NULL
);
207 name
= fld_target_name(tar
);
208 LASSERT(name
!= NULL
);
209 LASSERT(tar
->ft_srv
!= NULL
|| tar
->ft_exp
!= NULL
);
/* Targets are only accepted while the client is still in its INIT state. */
211 if (fld
->lcf_flags
!= LUSTRE_FLD_INIT
) {
212 CERROR("%s: Attempt to add target %s (idx "LPU64
") "
213 "on fly - skip it\n", fld
->lcf_name
, name
,
217 CDEBUG(D_INFO
, "%s: Adding target %s (idx "
218 LPU64
")\n", fld
->lcf_name
, name
, tar
->ft_idx
);
/* Allocation happens before lcf_lock is taken. */
221 OBD_ALLOC_PTR(target
);
225 spin_lock(&fld
->lcf_lock
);
226 list_for_each_entry(tmp
, &fld
->lcf_targets
, ft_chain
) {
227 if (tmp
->ft_idx
== tar
->ft_idx
) {
/* NOTE(review): tmp is still read by the CERROR below after the lock
 * has been released. */
228 spin_unlock(&fld
->lcf_lock
);
229 OBD_FREE_PTR(target
);
230 CERROR("Target %s exists in FLD and known as %s:#"LPU64
"\n",
231 name
, fld_target_name(tmp
), tmp
->ft_idx
);
/* No duplicate: fill in the private copy and pin the export if any. */
236 target
->ft_exp
= tar
->ft_exp
;
237 if (target
->ft_exp
!= NULL
)
238 class_export_get(target
->ft_exp
);
239 target
->ft_srv
= tar
->ft_srv
;
240 target
->ft_idx
= tar
->ft_idx
;
242 list_add_tail(&target
->ft_chain
,
246 spin_unlock(&fld
->lcf_lock
);
250 EXPORT_SYMBOL(fld_client_add_target
);
/*
 * Remove the target whose ft_idx equals @idx from @fld.
 *
 * The list is searched under fld->lcf_lock. A matching entry is unlinked,
 * the lock is dropped, and only then is the export reference released
 * (when ft_exp is set) and the entry freed. When the loop finishes the
 * lock is dropped at embedded line 272.
 *
 * NOTE(review): embedded lines 261, 264, 267 and 269-274 are missing from
 * this listing, so the return statements and their values are not
 * visible.
 */
252 /* Remove export from FLD */
253 int fld_client_del_target(struct lu_client_fld
*fld
, __u64 idx
)
255 struct lu_fld_target
*target
, *tmp
;
257 spin_lock(&fld
->lcf_lock
);
258 list_for_each_entry_safe(target
, tmp
,
259 &fld
->lcf_targets
, ft_chain
) {
260 if (target
->ft_idx
== idx
) {
/* Unlink under the lock; put the export and free after unlocking. */
262 list_del(&target
->ft_chain
);
263 spin_unlock(&fld
->lcf_lock
);
265 if (target
->ft_exp
!= NULL
)
266 class_export_put(target
->ft_exp
);
268 OBD_FREE_PTR(target
);
272 spin_unlock(&fld
->lcf_lock
);
275 EXPORT_SYMBOL(fld_client_del_target
);
/*
 * Proc directory for the FLD module as a whole. It is assigned from
 * lprocfs_register() in fld_mod_init() and removed and reset to NULL in
 * fld_mod_exit().
 */
278 struct proc_dir_entry
*fld_type_proc_dir
= NULL
;
/*
 * Create the proc entries for client @fld.
 *
 * Registers a directory named fld->lcf_name via lprocfs_register() and
 * stores it in fld->lcf_proc_dir; on an error pointer, logs and takes
 * rc from PTR_ERR(). Then adds the fld_client_proc_list variables to the
 * directory with @fld as their data. The cleanup at embedded line 306
 * calls fld_client_proc_fini().
 *
 * NOTE(review): this listing skips many embedded lines: the declaration
 * of rc, the remaining lprocfs_register() arguments, the trailing CERROR
 * arguments, the condition guarding the second CERROR, the out_cleanup
 * label and every return. A second definition header with the same name
 * appears later in the listing, so the two are presumably alternatives
 * selected by a preprocessor conditional that is not visible -- confirm.
 */
280 static int fld_client_proc_init(struct lu_client_fld
*fld
)
284 fld
->lcf_proc_dir
= lprocfs_register(fld
->lcf_name
,
288 if (IS_ERR(fld
->lcf_proc_dir
)) {
289 CERROR("%s: LProcFS failed in fld-init\n",
291 rc
= PTR_ERR(fld
->lcf_proc_dir
);
295 rc
= lprocfs_add_vars(fld
->lcf_proc_dir
,
296 fld_client_proc_list
, fld
);
298 CERROR("%s: Can't init FLD proc, rc %d\n",
300 GOTO(out_cleanup
, rc
);
306 fld_client_proc_fini(fld
);
/*
 * Tear down the proc directory of client @fld.
 *
 * Does nothing when fld->lcf_proc_dir is NULL. Otherwise the directory is
 * removed unless the field holds an error pointer, and the field is reset
 * to NULL at embedded line 315.
 *
 * NOTE(review): the closing braces are missing from this listing, so
 * whether the reset also covers the error-pointer case cannot be read
 * directly from here; since the inner if has no brace, presumably it
 * does -- confirm.
 */
310 void fld_client_proc_fini(struct lu_client_fld
*fld
)
312 if (fld
->lcf_proc_dir
) {
313 if (!IS_ERR(fld
->lcf_proc_dir
))
314 lprocfs_remove(&fld
->lcf_proc_dir
);
315 fld
->lcf_proc_dir
= NULL
;
/*
 * Second pair of fld_client_proc_init() / fld_client_proc_fini()
 * definition headers, followed by the export of fld_client_proc_fini.
 *
 * NOTE(review): only the signatures are visible; the bodies (embedded
 * lines 320-323 and 325-329) are missing from this listing. Presumably
 * these are the alternative definitions used when proc support is
 * compiled out -- confirm against the full file.
 */
319 static int fld_client_proc_init(struct lu_client_fld
*fld
)
324 void fld_client_proc_fini(struct lu_client_fld
*fld
)
330 EXPORT_SYMBOL(fld_client_proc_fini
);
/*
 * Return non-zero when @hash is a valid index into fld_hash[], i.e.
 * 0 <= hash < ARRAY_SIZE(fld_hash).
 *
 * The lower bound is tested first, so a negative value is rejected before
 * it is compared with the array size.
 */
332 static inline int hash_is_sane(int hash
)
334 return (hash
>= 0 && hash
< ARRAY_SIZE(fld_hash
));
/*
 * Initialise client FLD state in @fld.
 *
 * Visible steps:
 *  - format fld->lcf_name with snprintf(), bounded by the field's size;
 *  - reject a @hash that is not a valid fld_hash[] index (logged);
 *  - initialise lcf_lock, select the hash implementation, set lcf_flags
 *    to LUSTRE_FLD_INIT and initialise the empty lcf_targets list;
 *  - size the cache: cache_size is FLD_CLIENT_CACHE_SIZE divided by the
 *    size of one fld_cache_entry, and cache_threshold is
 *    FLD_CLIENT_CACHE_THRESHOLD percent of cache_size;
 *  - create the cache, recording PTR_ERR() in rc and clearing lcf_cache
 *    on an error pointer;
 *  - set up the proc entries via fld_client_proc_init().
 *
 * NOTE(review): embedded lines are missing from this listing: the
 * declaration of rc, the snprintf() format and its arguments (so how
 * @prefix is used is not visible), every return/goto, and the condition
 * under which fld_client_fini() at line 379 and the final CDEBUG run.
 * Confirm against the full file.
 */
337 int fld_client_init(struct lu_client_fld
*fld
,
338 const char *prefix
, int hash
)
340 int cache_size
, cache_threshold
;
343 LASSERT(fld
!= NULL
);
345 snprintf(fld
->lcf_name
, sizeof(fld
->lcf_name
),
/* Validate the index before it is used to subscript fld_hash[] below. */
348 if (!hash_is_sane(hash
)) {
349 CERROR("%s: Wrong hash function %#x\n",
350 fld
->lcf_name
, hash
);
355 spin_lock_init(&fld
->lcf_lock
);
356 fld
->lcf_hash
= &fld_hash
[hash
];
357 fld
->lcf_flags
= LUSTRE_FLD_INIT
;
358 INIT_LIST_HEAD(&fld
->lcf_targets
);
/* Cache capacity expressed as a number of entries. */
360 cache_size
= FLD_CLIENT_CACHE_SIZE
/
361 sizeof(struct fld_cache_entry
);
/* Threshold as a percentage of that capacity. */
363 cache_threshold
= cache_size
*
364 FLD_CLIENT_CACHE_THRESHOLD
/ 100;
366 fld
->lcf_cache
= fld_cache_init(fld
->lcf_name
,
367 cache_size
, cache_threshold
);
368 if (IS_ERR(fld
->lcf_cache
)) {
/* Keep the error code, but do not leave an error pointer in the field. */
369 rc
= PTR_ERR(fld
->lcf_cache
);
370 fld
->lcf_cache
= NULL
;
374 rc
= fld_client_proc_init(fld
);
379 fld_client_fini(fld
);
381 CDEBUG(D_INFO
, "%s: Using \"%s\" hash\n",
382 fld
->lcf_name
, fld
->lcf_hash
->fh_name
);
385 EXPORT_SYMBOL(fld_client_init
);
/*
 * Release everything owned by client @fld.
 *
 * Under fld->lcf_lock every entry of lcf_targets is unlinked, its export
 * reference is dropped when ft_exp is set, and the entry is freed. After
 * the lock is released, the cache is finalised unless lcf_cache holds an
 * error pointer, and lcf_cache is reset to NULL at embedded line 405.
 *
 * NOTE(review): class_export_put() and OBD_FREE_PTR() run while lcf_lock
 * is held here. Embedded lines 394, 399, 401 and 406-407 are missing from
 * this listing, so any further per-target bookkeeping and the closing
 * braces are not visible.
 */
387 void fld_client_fini(struct lu_client_fld
*fld
)
389 struct lu_fld_target
*target
, *tmp
;
391 spin_lock(&fld
->lcf_lock
);
/* The _safe iterator is needed because every entry is freed in the loop. */
392 list_for_each_entry_safe(target
, tmp
,
393 &fld
->lcf_targets
, ft_chain
) {
395 list_del(&target
->ft_chain
);
396 if (target
->ft_exp
!= NULL
)
397 class_export_put(target
->ft_exp
);
398 OBD_FREE_PTR(target
);
400 spin_unlock(&fld
->lcf_lock
);
402 if (fld
->lcf_cache
!= NULL
) {
403 if (!IS_ERR(fld
->lcf_cache
))
404 fld_cache_fini(fld
->lcf_cache
);
405 fld
->lcf_cache
= NULL
;
408 EXPORT_SYMBOL(fld_client_fini
);
/*
 * Send one FLD query of type @fld_op over export @exp.
 *
 * Visible steps:
 *  - assert @exp is set and derive the client import from it;
 *  - allocate and pack an RQF_FLD_QUERY request;
 *  - fetch the client-side RMF_FLD_OPC and RMF_FLD_MDFLD buffers;
 *  - set the reply length, the FLD request portal and the timeout;
 *  - allow replay for FLD_LOOKUP on an MDS-MDS connection;
 *  - for anything other than FLD_LOOKUP, hold the mdc RPC lock around
 *    the send;
 *  - bracket ptlrpc_queue_wait() with fld_enter_request() /
 *    fld_exit_request() to respect the in-flight RPC limit;
 *  - fetch the server-side RMF_FLD_MDFLD buffer from the reply;
 *  - finish the request.
 *
 * NOTE(review): this listing skips embedded lines. Not visible here: the
 * declarations of rc and op, the last ptlrpc_request_alloc_pack()
 * argument and the check of its result, the stores through op and
 * prange, the check of rc after the wait, the condition guarding the
 * -EFAULT path, how the reply is copied into @range, the out_req label
 * and the return. Confirm against the full file.
 */
410 int fld_client_rpc(struct obd_export
*exp
,
411 struct lu_seq_range
*range
, __u32 fld_op
)
413 struct ptlrpc_request
*req
;
414 struct lu_seq_range
*prange
;
417 struct obd_import
*imp
;
419 LASSERT(exp
!= NULL
);
421 imp
= class_exp2cliimp(exp
);
422 req
= ptlrpc_request_alloc_pack(imp
, &RQF_FLD_QUERY
, LUSTRE_MDS_VERSION
,
/* Request buffers: operation code first, then the range. */
427 op
= req_capsule_client_get(&req
->rq_pill
, &RMF_FLD_OPC
);
430 prange
= req_capsule_client_get(&req
->rq_pill
, &RMF_FLD_MDFLD
);
433 ptlrpc_request_set_replen(req
);
434 req
->rq_request_portal
= FLD_REQUEST_PORTAL
;
435 ptlrpc_at_set_req_timeout(req
);
/* Only lookups on an MDS-MDS connection are marked as replayable. */
437 if (fld_op
== FLD_LOOKUP
&&
438 imp
->imp_connect_flags_orig
& OBD_CONNECT_MDS_MDS
)
439 req
->rq_allow_replay
= 1;
/* Non-lookup operations additionally take the mdc RPC lock. */
441 if (fld_op
!= FLD_LOOKUP
)
442 mdc_get_rpc_lock(exp
->exp_obd
->u
.cli
.cl_rpc_lock
, NULL
);
443 fld_enter_request(&exp
->exp_obd
->u
.cli
);
444 rc
= ptlrpc_queue_wait(req
);
445 fld_exit_request(&exp
->exp_obd
->u
.cli
);
446 if (fld_op
!= FLD_LOOKUP
)
447 mdc_put_rpc_lock(exp
->exp_obd
->u
.cli
.cl_rpc_lock
, NULL
);
/* prange is reused for the range carried in the server reply. */
451 prange
= req_capsule_server_get(&req
->rq_pill
, &RMF_FLD_MDFLD
);
453 GOTO(out_req
, rc
= -EFAULT
);
456 ptlrpc_req_finished(req
);
/*
 * Resolve sequence @seq to an MDS index, stored through @mds.
 *
 * Marks the client as running (LUSTRE_FLD_RUN), then consults the local
 * cache; the cached lsr_index is copied to *mds at embedded line 471.
 * After the "not in cache" comment, the target is picked with
 * fld_client_get_target(), the range type is set from @flags, a
 * FLD_LOOKUP RPC is sent over the target's export, the resulting
 * lsr_index is copied to *mds and the range is inserted into the cache.
 *
 * NOTE(review): this listing skips embedded lines. Not visible here: the
 * declaration of rc, the checks of rc after the cache lookup and after
 * the RPC, any initialisation of res besides the zero initialiser and the
 * range type, and the returns. @env is not used in any visible line.
 * target->ft_exp is passed to fld_client_rpc() without a NULL check in
 * the visible lines. Confirm against the full file.
 */
460 int fld_client_lookup(struct lu_client_fld
*fld
, seqno_t seq
, mdsno_t
*mds
,
461 __u32 flags
, const struct lu_env
*env
)
463 struct lu_seq_range res
= { 0 };
464 struct lu_fld_target
*target
;
/* NOTE(review): lcf_flags is updated here without taking lcf_lock. */
467 fld
->lcf_flags
|= LUSTRE_FLD_RUN
;
469 rc
= fld_cache_lookup(fld
->lcf_cache
, seq
, &res
);
471 *mds
= res
.lsr_index
;
475 /* Cannot find it in the cache */
476 target
= fld_client_get_target(fld
, seq
);
477 LASSERT(target
!= NULL
);
479 CDEBUG(D_INFO
, "%s: Lookup fld entry (seq: "LPX64
") on "
480 "target %s (idx "LPU64
")\n", fld
->lcf_name
, seq
,
481 fld_target_name(target
), target
->ft_idx
);
484 fld_range_set_type(&res
, flags
);
485 rc
= fld_client_rpc(target
->ft_exp
, &res
, FLD_LOOKUP
);
488 *mds
= res
.lsr_index
;
/* Remember the answer so later lookups can be served from the cache. */
490 fld_cache_insert(fld
->lcf_cache
, &res
);
494 EXPORT_SYMBOL(fld_client_lookup
);
/*
 * Flush the lookup cache of client @fld by handing fld->lcf_cache to
 * fld_cache_flush(). No NULL check on lcf_cache is visible here.
 */
496 void fld_client_flush(struct lu_client_fld
*fld
)
498 fld_cache_flush(fld
->lcf_cache
);
500 EXPORT_SYMBOL(fld_client_flush
);
/*
 * Module init: registers the LUSTRE_FLD_NAME proc directory, stores it in
 * fld_type_proc_dir and returns PTR_ERR_OR_ZERO() of that pointer.
 *
 * NOTE(review): embedded lines 505-506 are missing from this listing, so
 * the remaining lprocfs_register() arguments are not visible.
 */
502 static int __init
fld_mod_init(void)
504 fld_type_proc_dir
= lprocfs_register(LUSTRE_FLD_NAME
,
507 return PTR_ERR_OR_ZERO(fld_type_proc_dir
);
/*
 * Module exit: when fld_type_proc_dir is neither NULL nor an error
 * pointer, removes the proc directory and resets the pointer to NULL.
 */
510 static void __exit
fld_mod_exit(void)
512 if (fld_type_proc_dir
!= NULL
&& !IS_ERR(fld_type_proc_dir
)) {
513 lprocfs_remove(&fld_type_proc_dir
);
514 fld_type_proc_dir
= NULL
;
/*
 * Module metadata, followed by registration of fld_mod_init() and
 * fld_mod_exit() as the module's entry and exit points.
 */
518 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
519 MODULE_DESCRIPTION("Lustre FLD");
520 MODULE_LICENSE("GPL");
522 module_init(fld_mod_init
)
523 module_exit(fld_mod_exit
)