1/*
2 * GPL HEADER START
3 *
4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19 *
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
22 * have any questions.
23 *
24 * GPL HEADER END
25 */
26/*
27 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
29 *
30 * Copyright (c) 2011, 2012, Intel Corporation.
31 */
32/*
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
35 *
36 * lustre/obdclass/lu_object.c
37 *
38 * Lustre Object.
39 * These are the only exported functions; they provide some generic
40 * infrastructure for managing object devices.
41 *
42 * Author: Nikita Danilov <nikita.danilov@sun.com>
43 */
44
45#define DEBUG_SUBSYSTEM S_CLASS
46
47#include <linux/libcfs/libcfs.h>
48
49# include <linux/module.h>
50
51/* hash_long() */
52#include <linux/libcfs/libcfs_hash.h>
53#include <obd_class.h>
54#include <obd_support.h>
55#include <lustre_disk.h>
56#include <lustre_fid.h>
57#include <lu_object.h>
58#include <lu_ref.h>
59#include <linux/list.h>
60
61static void lu_object_free(const struct lu_env *env, struct lu_object *o);
62
63/**
64 * Decrease reference counter on object. If last reference is freed, return
65 * object to the cache, unless lu_object_is_dying(o) holds. In the latter
66 * case, free object immediately.
67 */
68void lu_object_put(const struct lu_env *env, struct lu_object *o)
69{
70 struct lu_site_bkt_data *bkt;
71 struct lu_object_header *top;
72 struct lu_site *site;
73 struct lu_object *orig;
74 cfs_hash_bd_t bd;
75 const struct lu_fid *fid;
76
77 top = o->lo_header;
78 site = o->lo_dev->ld_site;
79 orig = o;
80
81 /*
82 * Till we have full fids-on-OST implemented, anonymous objects
83 * are possible in OSP. Such an object isn't listed in the site,
84 * so we should not remove it from the site.
85 */
86 fid = lu_object_fid(o);
87 if (fid_is_zero(fid)) {
88 LASSERT(top->loh_hash.next == NULL
89 && top->loh_hash.pprev == NULL);
90 LASSERT(list_empty(&top->loh_lru));
91 if (!atomic_dec_and_test(&top->loh_ref))
92 return;
93 list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) {
94 if (o->lo_ops->loo_object_release != NULL)
95 o->lo_ops->loo_object_release(env, o);
96 }
97 lu_object_free(env, orig);
98 return;
99 }
100
101 cfs_hash_bd_get(site->ls_obj_hash, &top->loh_fid, &bd);
102 bkt = cfs_hash_bd_extra_get(site->ls_obj_hash, &bd);
103
104 if (!cfs_hash_bd_dec_and_lock(site->ls_obj_hash, &bd, &top->loh_ref)) {
105 if (lu_object_is_dying(top)) {
106
107 /*
108 * somebody may be waiting for this, currently only
109 * used for cl_object, see cl_object_put_last().
110 */
111 wake_up_all(&bkt->lsb_marche_funebre);
112 }
113 return;
114 }
115
116 LASSERT(bkt->lsb_busy > 0);
117 bkt->lsb_busy--;
118 /*
119 * When last reference is released, iterate over object
120 * layers, and notify them that object is no longer busy.
121 */
122 list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) {
123 if (o->lo_ops->loo_object_release != NULL)
124 o->lo_ops->loo_object_release(env, o);
125 }
126
127 if (!lu_object_is_dying(top)) {
128 LASSERT(list_empty(&top->loh_lru));
129 list_add_tail(&top->loh_lru, &bkt->lsb_lru);
130 cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
131 return;
132 }
133
134 /*
135 * If object is dying (will not be cached), remove it
136 * from hash table and LRU.
137 *
138 * This is done with hash table and LRU lists locked. As the only
139 * way to acquire first reference to previously unreferenced
140 * object is through hash-table lookup (lu_object_find()),
141 * or LRU scanning (lu_site_purge()), that are done under hash-table
142 * and LRU lock, no race with concurrent object lookup is possible
143 * and we can safely destroy object below.
144 */
145 if (!test_and_set_bit(LU_OBJECT_UNHASHED, &top->loh_flags))
146 cfs_hash_bd_del_locked(site->ls_obj_hash, &bd, &top->loh_hash);
147 cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
148 /*
149 * Object was already removed from hash and lru above, can
150 * kill it.
151 */
152 lu_object_free(env, orig);
153}
154EXPORT_SYMBOL(lu_object_put);
155
156/**
157 * Put object and don't keep it in cache. This is a temporary solution for
158 * multi-site objects whose layering is not constant.
159 */
160void lu_object_put_nocache(const struct lu_env *env, struct lu_object *o)
161{
162 set_bit(LU_OBJECT_HEARD_BANSHEE, &o->lo_header->loh_flags);
163 return lu_object_put(env, o);
164}
165EXPORT_SYMBOL(lu_object_put_nocache);
166
167/**
168 * Kill the object and take it out of LRU cache.
169 * Currently used by client code for layout change.
170 */
171void lu_object_unhash(const struct lu_env *env, struct lu_object *o)
172{
173 struct lu_object_header *top;
174
175 top = o->lo_header;
176 set_bit(LU_OBJECT_HEARD_BANSHEE, &top->loh_flags);
177 if (!test_and_set_bit(LU_OBJECT_UNHASHED, &top->loh_flags)) {
178 cfs_hash_t *obj_hash = o->lo_dev->ld_site->ls_obj_hash;
179 cfs_hash_bd_t bd;
180
181 cfs_hash_bd_get_and_lock(obj_hash, &top->loh_fid, &bd, 1);
182 list_del_init(&top->loh_lru);
183 cfs_hash_bd_del_locked(obj_hash, &bd, &top->loh_hash);
184 cfs_hash_bd_unlock(obj_hash, &bd, 1);
185 }
186}
187EXPORT_SYMBOL(lu_object_unhash);
188
189/**
190 * Allocate new object.
191 *
192 * This follows object creation protocol, described in the comment within
193 * struct lu_device_operations definition.
194 */
195static struct lu_object *lu_object_alloc(const struct lu_env *env,
196 struct lu_device *dev,
197 const struct lu_fid *f,
198 const struct lu_object_conf *conf)
199{
200 struct lu_object *scan;
201 struct lu_object *top;
202 struct list_head *layers;
203 int clean;
204 int result;
205
206 /*
207 * Create top-level object slice. This will also create
208 * lu_object_header.
209 */
210 top = dev->ld_ops->ldo_object_alloc(env, NULL, dev);
211 if (top == NULL)
212 return ERR_PTR(-ENOMEM);
213 if (IS_ERR(top))
214 return top;
215 /*
216 * This is the only place where object fid is assigned. It's constant
217 * after this point.
218 */
219 top->lo_header->loh_fid = *f;
220 layers = &top->lo_header->loh_layers;
221 do {
222 /*
223 * Call ->loo_object_init() repeatedly, until no more new
224 * object slices are created.
225 */
226 clean = 1;
227 list_for_each_entry(scan, layers, lo_linkage) {
228 if (scan->lo_flags & LU_OBJECT_ALLOCATED)
229 continue;
230 clean = 0;
231 scan->lo_header = top->lo_header;
232 result = scan->lo_ops->loo_object_init(env, scan, conf);
233 if (result != 0) {
234 lu_object_free(env, top);
235 return ERR_PTR(result);
236 }
237 scan->lo_flags |= LU_OBJECT_ALLOCATED;
238 }
239 } while (!clean);
240
241 list_for_each_entry_reverse(scan, layers, lo_linkage) {
242 if (scan->lo_ops->loo_object_start != NULL) {
243 result = scan->lo_ops->loo_object_start(env, scan);
244 if (result != 0) {
245 lu_object_free(env, top);
246 return ERR_PTR(result);
247 }
248 }
249 }
250
251 lprocfs_counter_incr(dev->ld_site->ls_stats, LU_SS_CREATED);
252 return top;
253}
254
255/**
256 * Free an object.
257 */
258static void lu_object_free(const struct lu_env *env, struct lu_object *o)
259{
260 struct lu_site_bkt_data *bkt;
261 struct lu_site *site;
262 struct lu_object *scan;
263 struct list_head *layers;
264 struct list_head splice;
265
266 site = o->lo_dev->ld_site;
267 layers = &o->lo_header->loh_layers;
268 bkt = lu_site_bkt_from_fid(site, &o->lo_header->loh_fid);
269 /*
270 * First call ->loo_object_delete() method to release all resources.
271 */
272 list_for_each_entry_reverse(scan, layers, lo_linkage) {
273 if (scan->lo_ops->loo_object_delete != NULL)
274 scan->lo_ops->loo_object_delete(env, scan);
275 }
276
277 /*
278 * Then, splice object layers into stand-alone list, and call
279 * ->loo_object_free() on all layers to free memory. Splice is
280 * necessary, because lu_object_header is freed together with the
281 * top-level slice.
282 */
283 INIT_LIST_HEAD(&splice);
284 list_splice_init(layers, &splice);
285 while (!list_empty(&splice)) {
286 /*
287 * Free layers in bottom-to-top order, so that object header
288 * lives as long as possible and ->loo_object_free() methods
289 * can look at its contents.
290 */
291 o = container_of0(splice.prev, struct lu_object, lo_linkage);
292 list_del_init(&o->lo_linkage);
293 LASSERT(o->lo_ops->loo_object_free != NULL);
294 o->lo_ops->loo_object_free(env, o);
295 }
296
297 if (waitqueue_active(&bkt->lsb_marche_funebre))
298 wake_up_all(&bkt->lsb_marche_funebre);
299}
300
301/**
302 * Free \a nr objects from the cold end of the site LRU list.
303 */
304int lu_site_purge(const struct lu_env *env, struct lu_site *s, int nr)
305{
306 struct lu_object_header *h;
307 struct lu_object_header *temp;
308 struct lu_site_bkt_data *bkt;
309 cfs_hash_bd_t bd;
310 cfs_hash_bd_t bd2;
311 struct list_head dispose;
312 int did_sth;
313 int start;
314 int count;
315 int bnr;
316 int i;
317
318 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_NO_LRU))
319 return 0;
320
321 INIT_LIST_HEAD(&dispose);
322 /*
323 * Under LRU list lock, scan LRU list and move unreferenced objects to
324 * the dispose list, removing them from LRU and hash table.
325 */
326 start = s->ls_purge_start;
327 bnr = (nr == ~0) ? -1 : nr / CFS_HASH_NBKT(s->ls_obj_hash) + 1;
328 again:
329 did_sth = 0;
330 cfs_hash_for_each_bucket(s->ls_obj_hash, &bd, i) {
331 if (i < start)
332 continue;
333 count = bnr;
334 cfs_hash_bd_lock(s->ls_obj_hash, &bd, 1);
335 bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd);
336
337 list_for_each_entry_safe(h, temp, &bkt->lsb_lru, loh_lru) {
338 LASSERT(atomic_read(&h->loh_ref) == 0);
339
340 cfs_hash_bd_get(s->ls_obj_hash, &h->loh_fid, &bd2);
341 LASSERT(bd.bd_bucket == bd2.bd_bucket);
342
343 cfs_hash_bd_del_locked(s->ls_obj_hash,
344 &bd2, &h->loh_hash);
345 list_move(&h->loh_lru, &dispose);
346 if (did_sth == 0)
347 did_sth = 1;
348
349 if (nr != ~0 && --nr == 0)
350 break;
351
352 if (count > 0 && --count == 0)
353 break;
354
355 }
356 cfs_hash_bd_unlock(s->ls_obj_hash, &bd, 1);
357 cond_resched();
358 /*
359 * Free everything on the dispose list. This is safe against
360 * races due to the reasons described in lu_object_put().
361 */
362 while (!list_empty(&dispose)) {
363 h = container_of0(dispose.next,
364 struct lu_object_header, loh_lru);
365 list_del_init(&h->loh_lru);
366 lu_object_free(env, lu_object_top(h));
367 lprocfs_counter_incr(s->ls_stats, LU_SS_LRU_PURGED);
368 }
369
370 if (nr == 0)
371 break;
372 }
373
374 if (nr != 0 && did_sth && start != 0) {
375 start = 0; /* restart from the first bucket */
376 goto again;
377 }
378 /* race on s->ls_purge_start, but nobody cares */
379 s->ls_purge_start = i % CFS_HASH_NBKT(s->ls_obj_hash);
380
381 return nr;
382}
383EXPORT_SYMBOL(lu_site_purge);
384
385/*
386 * Object printing.
387 *
388 * Code below has to jump through certain hoops to output object description
389 * into libcfs_debug_msg-based log. The problem is that lu_object_print()
390 * composes object description from strings that are parts of _lines_ of
391 * output (i.e., strings that are not terminated by newline). This doesn't fit
392 * very well into libcfs_debug_msg() interface that assumes that each message
393 * supplied to it is a self-contained output line.
394 *
395 * To work around this, strings are collected in a temporary buffer
396 * (implemented as a value of lu_cdebug_key key), until terminating newline
397 * character is detected.
398 *
399 */
400
401enum {
402 /**
403 * Maximal line size.
404 *
405 * XXX overflow is not handled correctly.
406 */
407 LU_CDEBUG_LINE = 512
408};
409
410struct lu_cdebug_data {
411 /**
412 * Temporary buffer.
413 */
414 char lck_area[LU_CDEBUG_LINE];
415};
416
417/* context key constructor/destructor: lu_global_key_init, lu_global_key_fini */
418LU_KEY_INIT_FINI(lu_global, struct lu_cdebug_data);
419
420/**
421 * Key, holding temporary buffer. This key is registered very early by
422 * lu_global_init().
423 */
424struct lu_context_key lu_global_key = {
425 .lct_tags = LCT_MD_THREAD | LCT_DT_THREAD |
426 LCT_MG_THREAD | LCT_CL_THREAD,
427 .lct_init = lu_global_key_init,
428 .lct_fini = lu_global_key_fini
429};
430
431/**
432 * Printer function emitting messages through libcfs_debug_msg().
433 */
434int lu_cdebug_printer(const struct lu_env *env,
435 void *cookie, const char *format, ...)
436{
437 struct libcfs_debug_msg_data *msgdata = cookie;
438 struct lu_cdebug_data *key;
439 int used;
440 int complete;
441 va_list args;
442
443 va_start(args, format);
444
445 key = lu_context_key_get(&env->le_ctx, &lu_global_key);
446 LASSERT(key != NULL);
447
448 used = strlen(key->lck_area);
449 complete = format[strlen(format) - 1] == '\n';
450 /*
451 * Append new chunk to the buffer.
452 */
453 vsnprintf(key->lck_area + used,
454 ARRAY_SIZE(key->lck_area) - used, format, args);
455 if (complete) {
456 if (cfs_cdebug_show(msgdata->msg_mask, msgdata->msg_subsys))
457 libcfs_debug_msg(msgdata, "%s", key->lck_area);
458 key->lck_area[0] = 0;
459 }
460 va_end(args);
461 return 0;
462}
463EXPORT_SYMBOL(lu_cdebug_printer);
464
465/**
466 * Print object header.
467 */
468void lu_object_header_print(const struct lu_env *env, void *cookie,
469 lu_printer_t printer,
470 const struct lu_object_header *hdr)
471{
472 (*printer)(env, cookie, "header@%p[%#lx, %d, "DFID"%s%s%s]",
473 hdr, hdr->loh_flags, atomic_read(&hdr->loh_ref),
474 PFID(&hdr->loh_fid),
475 hlist_unhashed(&hdr->loh_hash) ? "" : " hash",
476 list_empty((struct list_head *)&hdr->loh_lru) ? \
477 "" : " lru",
478 hdr->loh_attr & LOHA_EXISTS ? " exist":"");
479}
480EXPORT_SYMBOL(lu_object_header_print);
481
482/**
483 * Print human readable representation of the \a o to the \a printer.
484 */
485void lu_object_print(const struct lu_env *env, void *cookie,
486 lu_printer_t printer, const struct lu_object *o)
487{
488 static const char ruler[] = "........................................";
489 struct lu_object_header *top;
490 int depth;
491
492 top = o->lo_header;
493 lu_object_header_print(env, cookie, printer, top);
494 (*printer)(env, cookie, "{ \n");
495 list_for_each_entry(o, &top->loh_layers, lo_linkage) {
496 depth = o->lo_depth + 4;
497
498 /*
499 * print `.' \a depth times followed by type name and address
500 */
501 (*printer)(env, cookie, "%*.*s%s@%p", depth, depth, ruler,
502 o->lo_dev->ld_type->ldt_name, o);
503 if (o->lo_ops->loo_object_print != NULL)
504 o->lo_ops->loo_object_print(env, cookie, printer, o);
505 (*printer)(env, cookie, "\n");
506 }
507 (*printer)(env, cookie, "} header@%p\n", top);
508}
509EXPORT_SYMBOL(lu_object_print);
510
511/**
512 * Check object consistency.
513 */
514int lu_object_invariant(const struct lu_object *o)
515{
516 struct lu_object_header *top;
517
518 top = o->lo_header;
519 list_for_each_entry(o, &top->loh_layers, lo_linkage) {
520 if (o->lo_ops->loo_object_invariant != NULL &&
521 !o->lo_ops->loo_object_invariant(o))
522 return 0;
523 }
524 return 1;
525}
526EXPORT_SYMBOL(lu_object_invariant);
527
528static struct lu_object *htable_lookup(struct lu_site *s,
529 cfs_hash_bd_t *bd,
530 const struct lu_fid *f,
531 wait_queue_t *waiter,
532 __u64 *version)
533{
534 struct lu_site_bkt_data *bkt;
535 struct lu_object_header *h;
536 struct hlist_node *hnode;
537 __u64 ver = cfs_hash_bd_version_get(bd);
538
539 if (*version == ver)
540 return ERR_PTR(-ENOENT);
541
542 *version = ver;
543 bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, bd);
544 /* cfs_hash_bd_peek_locked is a somewhat "internal" function
545 * of cfs_hash; it doesn't add a refcount on the object. */
546 hnode = cfs_hash_bd_peek_locked(s->ls_obj_hash, bd, (void *)f);
547 if (hnode == NULL) {
548 lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_MISS);
549 return ERR_PTR(-ENOENT);
550 }
551
552 h = container_of0(hnode, struct lu_object_header, loh_hash);
553 if (likely(!lu_object_is_dying(h))) {
554 cfs_hash_get(s->ls_obj_hash, hnode);
555 lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_HIT);
556 list_del_init(&h->loh_lru);
557 return lu_object_top(h);
558 }
559
560 /*
561 * Lookup found an object being destroyed; this object cannot be
562 * returned (to assure that references to dying objects are eventually
563 * drained), and moreover, lookup has to wait until the object is freed.
564 */
565
566 init_waitqueue_entry_current(waiter);
567 add_wait_queue(&bkt->lsb_marche_funebre, waiter);
568 set_current_state(TASK_UNINTERRUPTIBLE);
569 lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_DEATH_RACE);
570 return ERR_PTR(-EAGAIN);
571}
572
573/**
574 * Search cache for an object with the fid \a f. If such object is found,
575 * return it. Otherwise, create new object, insert it into cache and return
576 * it. In any case, additional reference is acquired on the returned object.
577 */
578struct lu_object *lu_object_find(const struct lu_env *env,
579 struct lu_device *dev, const struct lu_fid *f,
580 const struct lu_object_conf *conf)
581{
582 return lu_object_find_at(env, dev->ld_site->ls_top_dev, f, conf);
583}
584EXPORT_SYMBOL(lu_object_find);
585
586static struct lu_object *lu_object_new(const struct lu_env *env,
587 struct lu_device *dev,
588 const struct lu_fid *f,
589 const struct lu_object_conf *conf)
590{
591 struct lu_object *o;
592 cfs_hash_t *hs;
593 cfs_hash_bd_t bd;
594 struct lu_site_bkt_data *bkt;
595
596 o = lu_object_alloc(env, dev, f, conf);
597 if (unlikely(IS_ERR(o)))
598 return o;
599
600 hs = dev->ld_site->ls_obj_hash;
601 cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1);
602 bkt = cfs_hash_bd_extra_get(hs, &bd);
603 cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
604 bkt->lsb_busy++;
605 cfs_hash_bd_unlock(hs, &bd, 1);
606 return o;
607}
608
609/**
610 * Core logic of lu_object_find*() functions.
611 */
612static struct lu_object *lu_object_find_try(const struct lu_env *env,
613 struct lu_device *dev,
614 const struct lu_fid *f,
615 const struct lu_object_conf *conf,
616 wait_queue_t *waiter)
617{
618 struct lu_object *o;
619 struct lu_object *shadow;
620 struct lu_site *s;
621 cfs_hash_t *hs;
622 cfs_hash_bd_t bd;
623 __u64 version = 0;
624
625 /*
626 * This uses standard index maintenance protocol:
627 *
628 * - search index under lock, and return object if found;
629 * - otherwise, unlock index, allocate new object;
630 * - lock index and search again;
631 * - if nothing is found (usual case), insert newly created
632 * object into index;
633 * - otherwise (race: other thread inserted object), free
634 * object just allocated.
635 * - unlock index;
636 * - return object.
637 *
638 * For the "LOC_F_NEW" case, we are sure the object is newly created.
639 * It is unnecessary to perform lookup-alloc-lookup-insert; instead,
640 * just alloc and insert directly.
641 *
642 * If dying object is found during index search, add @waiter to the
643 * site wait-queue and return ERR_PTR(-EAGAIN).
644 */
645 if (conf != NULL && conf->loc_flags & LOC_F_NEW)
646 return lu_object_new(env, dev, f, conf);
647
648 s = dev->ld_site;
649 hs = s->ls_obj_hash;
650 cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1);
651 o = htable_lookup(s, &bd, f, waiter, &version);
652 cfs_hash_bd_unlock(hs, &bd, 1);
653 if (!IS_ERR(o) || PTR_ERR(o) != -ENOENT)
654 return o;
655
656 /*
657 * Allocate new object. This may result in rather complicated
658 * operations, including fld queries, inode loading, etc.
659 */
660 o = lu_object_alloc(env, dev, f, conf);
661 if (unlikely(IS_ERR(o)))
662 return o;
663
664 LASSERT(lu_fid_eq(lu_object_fid(o), f));
665
666 cfs_hash_bd_lock(hs, &bd, 1);
667
668 shadow = htable_lookup(s, &bd, f, waiter, &version);
669 if (likely(IS_ERR(shadow) && PTR_ERR(shadow) == -ENOENT)) {
670 struct lu_site_bkt_data *bkt;
671
672 bkt = cfs_hash_bd_extra_get(hs, &bd);
673 cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
674 bkt->lsb_busy++;
675 cfs_hash_bd_unlock(hs, &bd, 1);
676 return o;
677 }
678
679 lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_RACE);
680 cfs_hash_bd_unlock(hs, &bd, 1);
681 lu_object_free(env, o);
682 return shadow;
683}
684
685/**
686 * Much like lu_object_find(), but top level device of object is specifically
687 * \a dev rather than top level device of the site. This interface allows
688 * objects of different "stacking" to be created within the same site.
689 */
690struct lu_object *lu_object_find_at(const struct lu_env *env,
691 struct lu_device *dev,
692 const struct lu_fid *f,
693 const struct lu_object_conf *conf)
694{
695 struct lu_site_bkt_data *bkt;
696 struct lu_object *obj;
697 wait_queue_t wait;
698
699 while (1) {
700 obj = lu_object_find_try(env, dev, f, conf, &wait);
701 if (obj != ERR_PTR(-EAGAIN))
702 return obj;
703 /*
704 * lu_object_find_try() already added waiter into the
705 * wait queue.
706 */
707 waitq_wait(&wait, TASK_UNINTERRUPTIBLE);
708 bkt = lu_site_bkt_from_fid(dev->ld_site, (void *)f);
709 remove_wait_queue(&bkt->lsb_marche_funebre, &wait);
710 }
711}
712EXPORT_SYMBOL(lu_object_find_at);
713
714/**
715 * Find object with given fid, and return its slice belonging to given device.
716 */
717struct lu_object *lu_object_find_slice(const struct lu_env *env,
718 struct lu_device *dev,
719 const struct lu_fid *f,
720 const struct lu_object_conf *conf)
721{
722 struct lu_object *top;
723 struct lu_object *obj;
724
725 top = lu_object_find(env, dev, f, conf);
726 if (!IS_ERR(top)) {
727 obj = lu_object_locate(top->lo_header, dev->ld_type);
728 if (obj == NULL)
729 lu_object_put(env, top);
730 } else
731 obj = top;
732 return obj;
733}
734EXPORT_SYMBOL(lu_object_find_slice);
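/*
 * Illustrative sketch, added for clarity and not part of the original file:
 * the typical lookup/release cycle built on the exported functions above.
 * The device pointer, the fid value and the LCT_DT_THREAD tag choice are
 * assumptions made for illustration only; the block is kept under "#if 0"
 * because it is a usage sketch, not code belonging to this module.
 */
#if 0
static int example_lookup_and_release(struct lu_device *dev,
				      const struct lu_fid *fid)
{
	struct lu_env env;
	struct lu_object *obj;
	int rc;

	/* set up a private environment; lu_env_init() also enters the context */
	rc = lu_env_init(&env, LCT_DT_THREAD);
	if (rc != 0)
		return rc;

	/* take a reference on the slice belonging to \a dev (cache hit or miss) */
	obj = lu_object_find_slice(&env, dev, fid, NULL);
	if (IS_ERR(obj)) {
		rc = PTR_ERR(obj);
	} else if (obj == NULL) {
		rc = -ENOENT;	/* no slice of this device type in the stack */
	} else {
		/* ... use the object here ... */
		lu_object_put(&env, obj);	/* drop the reference taken above */
		rc = 0;
	}

	lu_env_fini(&env);
	return rc;
}
#endif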
735
736/**
737 * Global list of all device types.
738 */
739static LIST_HEAD(lu_device_types);
740
741int lu_device_type_init(struct lu_device_type *ldt)
742{
743 int result = 0;
744
745 INIT_LIST_HEAD(&ldt->ldt_linkage);
746 if (ldt->ldt_ops->ldto_init)
747 result = ldt->ldt_ops->ldto_init(ldt);
748 if (result == 0)
749 list_add(&ldt->ldt_linkage, &lu_device_types);
750 return result;
751}
752EXPORT_SYMBOL(lu_device_type_init);
753
754void lu_device_type_fini(struct lu_device_type *ldt)
755{
756 list_del_init(&ldt->ldt_linkage);
757 if (ldt->ldt_ops->ldto_fini)
758 ldt->ldt_ops->ldto_fini(ldt);
759}
760EXPORT_SYMBOL(lu_device_type_fini);
761
762void lu_types_stop(void)
763{
764 struct lu_device_type *ldt;
765
766 list_for_each_entry(ldt, &lu_device_types, ldt_linkage) {
767 if (ldt->ldt_device_nr == 0 && ldt->ldt_ops->ldto_stop)
768 ldt->ldt_ops->ldto_stop(ldt);
769 }
770}
771EXPORT_SYMBOL(lu_types_stop);
772
773/**
774 * Global list of all sites on this node
775 */
776static LIST_HEAD(lu_sites);
777static DEFINE_MUTEX(lu_sites_guard);
778
779/**
780 * Global environment used by site shrinker.
781 */
782static struct lu_env lu_shrink_env;
783
784struct lu_site_print_arg {
785 struct lu_env *lsp_env;
786 void *lsp_cookie;
787 lu_printer_t lsp_printer;
788};
789
790static int
791lu_site_obj_print(cfs_hash_t *hs, cfs_hash_bd_t *bd,
792 struct hlist_node *hnode, void *data)
793{
794 struct lu_site_print_arg *arg = (struct lu_site_print_arg *)data;
795 struct lu_object_header *h;
796
797 h = hlist_entry(hnode, struct lu_object_header, loh_hash);
798 if (!list_empty(&h->loh_layers)) {
799 const struct lu_object *o;
800
801 o = lu_object_top(h);
802 lu_object_print(arg->lsp_env, arg->lsp_cookie,
803 arg->lsp_printer, o);
804 } else {
805 lu_object_header_print(arg->lsp_env, arg->lsp_cookie,
806 arg->lsp_printer, h);
807 }
808 return 0;
809}
810
811/**
812 * Print all objects in \a s.
813 */
814void lu_site_print(const struct lu_env *env, struct lu_site *s, void *cookie,
815 lu_printer_t printer)
816{
817 struct lu_site_print_arg arg = {
818 .lsp_env = (struct lu_env *)env,
819 .lsp_cookie = cookie,
820 .lsp_printer = printer,
821 };
822
823 cfs_hash_for_each(s->ls_obj_hash, lu_site_obj_print, &arg);
824}
825EXPORT_SYMBOL(lu_site_print);
826
827enum {
828 LU_CACHE_PERCENT_MAX = 50,
829 LU_CACHE_PERCENT_DEFAULT = 20
830};
831
832static unsigned int lu_cache_percent = LU_CACHE_PERCENT_DEFAULT;
833CFS_MODULE_PARM(lu_cache_percent, "i", int, 0644,
834 "Percentage of memory to be used as lu_object cache");
835
836/**
837 * Return desired hash table order.
838 */
839static int lu_htable_order(void)
840{
841 unsigned long cache_size;
842 int bits;
843
844 /*
845 * Calculate hash table size, assuming that we want reasonable
846 * performance when 20% of total memory is occupied by cache of
847 * lu_objects.
848 *
849 * Size of lu_object is (arbitrarily) taken as 1K (together with inode).
850 */
851 cache_size = totalram_pages;
852
853#if BITS_PER_LONG == 32
854 /* limit hashtable size for lowmem systems to low RAM */
855 if (cache_size > 1 << (30 - PAGE_CACHE_SHIFT))
856 cache_size = 1 << (30 - PAGE_CACHE_SHIFT) * 3 / 4;
857#endif
858
859 /* clear off unreasonable cache setting. */
860 if (lu_cache_percent == 0 || lu_cache_percent > LU_CACHE_PERCENT_MAX) {
861 CWARN("obdclass: invalid lu_cache_percent: %u, it must be in"
862 " the range of (0, %u]. Will use default value: %u.\n",
863 lu_cache_percent, LU_CACHE_PERCENT_MAX,
864 LU_CACHE_PERCENT_DEFAULT);
865
866 lu_cache_percent = LU_CACHE_PERCENT_DEFAULT;
867 }
868 cache_size = cache_size / 100 * lu_cache_percent *
869 (PAGE_CACHE_SIZE / 1024);
870
871 for (bits = 1; (1 << bits) < cache_size; ++bits) {
872 ;
873 }
874 return bits;
875}
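/*
 * Worked example, added for clarity and not part of the original file,
 * assuming a 64-bit machine with 4 KiB pages and the default
 * lu_cache_percent of 20:
 *
 *   totalram_pages = 1048576                             (about 4 GiB of RAM)
 *   cache_size     = 1048576 / 100 * 20 * (4096 / 1024) = 838800
 *
 * The loop then picks the smallest bits with (1 << bits) >= 838800,
 * i.e. bits = 20, so the site hash table is sized for roughly 2^20 entries.
 */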
876
877static unsigned lu_obj_hop_hash(cfs_hash_t *hs,
878 const void *key, unsigned mask)
879{
880 struct lu_fid *fid = (struct lu_fid *)key;
881 __u32 hash;
882
883 hash = fid_flatten32(fid);
884 hash += (hash >> 4) + (hash << 12); /* mixing oid and seq */
885 hash = cfs_hash_long(hash, hs->hs_bkt_bits);
886
887 /* give me another random factor */
888 hash -= cfs_hash_long((unsigned long)hs, fid_oid(fid) % 11 + 3);
889
890 hash <<= hs->hs_cur_bits - hs->hs_bkt_bits;
891 hash |= (fid_seq(fid) + fid_oid(fid)) & (CFS_HASH_NBKT(hs) - 1);
892
893 return hash & mask;
894}
895
896static void *lu_obj_hop_object(struct hlist_node *hnode)
897{
898 return hlist_entry(hnode, struct lu_object_header, loh_hash);
899}
900
901static void *lu_obj_hop_key(struct hlist_node *hnode)
902{
903 struct lu_object_header *h;
904
905 h = hlist_entry(hnode, struct lu_object_header, loh_hash);
906 return &h->loh_fid;
907}
908
909static int lu_obj_hop_keycmp(const void *key, struct hlist_node *hnode)
910{
911 struct lu_object_header *h;
912
913 h = hlist_entry(hnode, struct lu_object_header, loh_hash);
914 return lu_fid_eq(&h->loh_fid, (struct lu_fid *)key);
915}
916
917static void lu_obj_hop_get(cfs_hash_t *hs, struct hlist_node *hnode)
918{
919 struct lu_object_header *h;
920
921 h = hlist_entry(hnode, struct lu_object_header, loh_hash);
922 if (atomic_add_return(1, &h->loh_ref) == 1) {
923 struct lu_site_bkt_data *bkt;
924 cfs_hash_bd_t bd;
925
926 cfs_hash_bd_get(hs, &h->loh_fid, &bd);
927 bkt = cfs_hash_bd_extra_get(hs, &bd);
928 bkt->lsb_busy++;
929 }
930}
931
932static void lu_obj_hop_put_locked(cfs_hash_t *hs, struct hlist_node *hnode)
933{
934 LBUG(); /* we should never call it */
935}
936
937cfs_hash_ops_t lu_site_hash_ops = {
938 .hs_hash = lu_obj_hop_hash,
939 .hs_key = lu_obj_hop_key,
940 .hs_keycmp = lu_obj_hop_keycmp,
941 .hs_object = lu_obj_hop_object,
942 .hs_get = lu_obj_hop_get,
943 .hs_put_locked = lu_obj_hop_put_locked,
944};
945
946void lu_dev_add_linkage(struct lu_site *s, struct lu_device *d)
947{
948 spin_lock(&s->ls_ld_lock);
949 if (list_empty(&d->ld_linkage))
950 list_add(&d->ld_linkage, &s->ls_ld_linkage);
951 spin_unlock(&s->ls_ld_lock);
952}
953EXPORT_SYMBOL(lu_dev_add_linkage);
954
955void lu_dev_del_linkage(struct lu_site *s, struct lu_device *d)
956{
957 spin_lock(&s->ls_ld_lock);
958 list_del_init(&d->ld_linkage);
959 spin_unlock(&s->ls_ld_lock);
960}
961EXPORT_SYMBOL(lu_dev_del_linkage);
962
963/**
964 * Initialize site \a s, with \a d as the top level device.
965 */
966#define LU_SITE_BITS_MIN 12
967#define LU_SITE_BITS_MAX 24
968/**
969 * 256 buckets in total; we don't want too many buckets because they:
970 * - consume too much memory
971 * - lead to an unbalanced LRU list
972 */
973#define LU_SITE_BKT_BITS 8
974
975int lu_site_init(struct lu_site *s, struct lu_device *top)
976{
977 struct lu_site_bkt_data *bkt;
978 cfs_hash_bd_t bd;
979 char name[16];
980 int bits;
981 int i;
982
983 memset(s, 0, sizeof *s);
984 bits = lu_htable_order();
985 snprintf(name, 16, "lu_site_%s", top->ld_type->ldt_name);
986 for (bits = min(max(LU_SITE_BITS_MIN, bits), LU_SITE_BITS_MAX);
987 bits >= LU_SITE_BITS_MIN; bits--) {
988 s->ls_obj_hash = cfs_hash_create(name, bits, bits,
989 bits - LU_SITE_BKT_BITS,
990 sizeof(*bkt), 0, 0,
991 &lu_site_hash_ops,
992 CFS_HASH_SPIN_BKTLOCK |
993 CFS_HASH_NO_ITEMREF |
994 CFS_HASH_DEPTH |
995 CFS_HASH_ASSERT_EMPTY);
996 if (s->ls_obj_hash != NULL)
997 break;
998 }
999
1000 if (s->ls_obj_hash == NULL) {
1001 CERROR("failed to create lu_site hash with bits: %d\n", bits);
1002 return -ENOMEM;
1003 }
1004
1005 cfs_hash_for_each_bucket(s->ls_obj_hash, &bd, i) {
1006 bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd);
1007 INIT_LIST_HEAD(&bkt->lsb_lru);
1008 init_waitqueue_head(&bkt->lsb_marche_funebre);
1009 }
1010
1011 s->ls_stats = lprocfs_alloc_stats(LU_SS_LAST_STAT, 0);
1012 if (s->ls_stats == NULL) {
1013 cfs_hash_putref(s->ls_obj_hash);
1014 s->ls_obj_hash = NULL;
1015 return -ENOMEM;
1016 }
1017
1018 lprocfs_counter_init(s->ls_stats, LU_SS_CREATED,
1019 0, "created", "created");
1020 lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_HIT,
1021 0, "cache_hit", "cache_hit");
1022 lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_MISS,
1023 0, "cache_miss", "cache_miss");
1024 lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_RACE,
1025 0, "cache_race", "cache_race");
1026 lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_DEATH_RACE,
1027 0, "cache_death_race", "cache_death_race");
1028 lprocfs_counter_init(s->ls_stats, LU_SS_LRU_PURGED,
1029 0, "lru_purged", "lru_purged");
1030
1031 INIT_LIST_HEAD(&s->ls_linkage);
1032 s->ls_top_dev = top;
1033 top->ld_site = s;
1034 lu_device_get(top);
1035 lu_ref_add(&top->ld_reference, "site-top", s);
1036
1037 INIT_LIST_HEAD(&s->ls_ld_linkage);
1038 spin_lock_init(&s->ls_ld_lock);
1039
1040 lu_dev_add_linkage(s, top);
1041
1042 return 0;
1043}
1044EXPORT_SYMBOL(lu_site_init);
1045
1046/**
1047 * Finalize \a s and release its resources.
1048 */
1049void lu_site_fini(struct lu_site *s)
1050{
1051 mutex_lock(&lu_sites_guard);
1052 list_del_init(&s->ls_linkage);
1053 mutex_unlock(&lu_sites_guard);
1054
1055 if (s->ls_obj_hash != NULL) {
1056 cfs_hash_putref(s->ls_obj_hash);
1057 s->ls_obj_hash = NULL;
1058 }
1059
1060 if (s->ls_top_dev != NULL) {
1061 s->ls_top_dev->ld_site = NULL;
1062 lu_ref_del(&s->ls_top_dev->ld_reference, "site-top", s);
1063 lu_device_put(s->ls_top_dev);
1064 s->ls_top_dev = NULL;
1065 }
1066
1067 if (s->ls_stats != NULL)
1068 lprocfs_free_stats(&s->ls_stats);
1069}
1070EXPORT_SYMBOL(lu_site_fini);
1071
1072/**
1073 * Called when initialization of stack for this site is completed.
1074 */
1075int lu_site_init_finish(struct lu_site *s)
1076{
1077 int result;
1078 mutex_lock(&lu_sites_guard);
1079 result = lu_context_refill(&lu_shrink_env.le_ctx);
1080 if (result == 0)
1081 list_add(&s->ls_linkage, &lu_sites);
1082 mutex_unlock(&lu_sites_guard);
1083 return result;
1084}
1085EXPORT_SYMBOL(lu_site_init_finish);
1086
1087/**
1088 * Acquire additional reference on device \a d
1089 */
1090void lu_device_get(struct lu_device *d)
1091{
1092 atomic_inc(&d->ld_ref);
1093}
1094EXPORT_SYMBOL(lu_device_get);
1095
1096/**
1097 * Release reference on device \a d.
1098 */
1099void lu_device_put(struct lu_device *d)
1100{
1101 LASSERT(atomic_read(&d->ld_ref) > 0);
1102 atomic_dec(&d->ld_ref);
1103}
1104EXPORT_SYMBOL(lu_device_put);
1105
1106/**
1107 * Initialize device \a d of type \a t.
1108 */
1109int lu_device_init(struct lu_device *d, struct lu_device_type *t)
1110{
1111 if (t->ldt_device_nr++ == 0 && t->ldt_ops->ldto_start != NULL)
1112 t->ldt_ops->ldto_start(t);
1113 memset(d, 0, sizeof *d);
1114 atomic_set(&d->ld_ref, 0);
1115 d->ld_type = t;
1116 lu_ref_init(&d->ld_reference);
1117 INIT_LIST_HEAD(&d->ld_linkage);
1118 return 0;
1119}
1120EXPORT_SYMBOL(lu_device_init);
1121
1122/**
1123 * Finalize device \a d.
1124 */
1125void lu_device_fini(struct lu_device *d)
1126{
1127 struct lu_device_type *t;
1128
1129 t = d->ld_type;
1130 if (d->ld_obd != NULL) {
1131 d->ld_obd->obd_lu_dev = NULL;
1132 d->ld_obd = NULL;
1133 }
1134
1135 lu_ref_fini(&d->ld_reference);
1136 LASSERTF(atomic_read(&d->ld_ref) == 0,
1137 "Refcount is %u\n", atomic_read(&d->ld_ref));
1138 LASSERT(t->ldt_device_nr > 0);
1139 if (--t->ldt_device_nr == 0 && t->ldt_ops->ldto_stop != NULL)
1140 t->ldt_ops->ldto_stop(t);
1141}
1142EXPORT_SYMBOL(lu_device_fini);
1143
1144/**
1145 * Initialize object \a o that is part of compound object \a h and was created
1146 * by device \a d.
1147 */
1148int lu_object_init(struct lu_object *o, struct lu_object_header *h,
1149 struct lu_device *d)
1150{
1151 memset(o, 0, sizeof(*o));
1152 o->lo_header = h;
1153 o->lo_dev = d;
1154 lu_device_get(d);
1155 lu_ref_add_at(&d->ld_reference, &o->lo_dev_ref, "lu_object", o);
1156 INIT_LIST_HEAD(&o->lo_linkage);
1157
1158 return 0;
1159}
1160EXPORT_SYMBOL(lu_object_init);
1161
1162/**
1163 * Finalize object and release its resources.
1164 */
1165void lu_object_fini(struct lu_object *o)
1166{
1167 struct lu_device *dev = o->lo_dev;
1168
1169 LASSERT(list_empty(&o->lo_linkage));
1170
1171 if (dev != NULL) {
1172 lu_ref_del_at(&dev->ld_reference, &o->lo_dev_ref,
1173 "lu_object", o);
1174 lu_device_put(dev);
1175 o->lo_dev = NULL;
1176 }
1177}
1178EXPORT_SYMBOL(lu_object_fini);
1179
1180/**
1181 * Add object \a o as first layer of compound object \a h
1182 *
1183 * This is typically called by the ->ldo_object_alloc() method of top-level
1184 * device.
1185 */
1186void lu_object_add_top(struct lu_object_header *h, struct lu_object *o)
1187{
1188 list_move(&o->lo_linkage, &h->loh_layers);
1189}
1190EXPORT_SYMBOL(lu_object_add_top);
1191
1192/**
1193 * Add object \a o as a layer of compound object, going after \a before.
1194 *
1195 * This is typically called by the ->ldo_object_alloc() method of \a
1196 * before->lo_dev.
1197 */
1198void lu_object_add(struct lu_object *before, struct lu_object *o)
1199{
1200 list_move(&o->lo_linkage, &before->lo_linkage);
1201}
1202EXPORT_SYMBOL(lu_object_add);
1203
1204/**
1205 * Initialize compound object.
1206 */
1207int lu_object_header_init(struct lu_object_header *h)
1208{
1209 memset(h, 0, sizeof *h);
1210 atomic_set(&h->loh_ref, 1);
1211 INIT_HLIST_NODE(&h->loh_hash);
1212 INIT_LIST_HEAD(&h->loh_lru);
1213 INIT_LIST_HEAD(&h->loh_layers);
1214 lu_ref_init(&h->loh_reference);
1215 return 0;
1216}
1217EXPORT_SYMBOL(lu_object_header_init);
1218
1219/**
1220 * Finalize compound object.
1221 */
1222void lu_object_header_fini(struct lu_object_header *h)
1223{
1224 LASSERT(list_empty(&h->loh_layers));
1225 LASSERT(list_empty(&h->loh_lru));
1226 LASSERT(hlist_unhashed(&h->loh_hash));
1227 lu_ref_fini(&h->loh_reference);
1228}
1229EXPORT_SYMBOL(lu_object_header_fini);
1230
1231/**
1232 * Given a compound object, find its slice, corresponding to the device type
1233 * \a dtype.
1234 */
1235struct lu_object *lu_object_locate(struct lu_object_header *h,
1236 const struct lu_device_type *dtype)
1237{
1238 struct lu_object *o;
1239
1240 list_for_each_entry(o, &h->loh_layers, lo_linkage) {
1241 if (o->lo_dev->ld_type == dtype)
1242 return o;
1243 }
1244 return NULL;
1245}
1246EXPORT_SYMBOL(lu_object_locate);
1247
1248
1249
1250/**
1251 * Finalize and free devices in the device stack.
1252 *
1253 * Finalize device stack by purging object cache, and calling
1254 * lu_device_type_operations::ldto_device_fini() and
1255 * lu_device_type_operations::ldto_device_free() on all devices in the stack.
1256 */
1257void lu_stack_fini(const struct lu_env *env, struct lu_device *top)
1258{
1259 struct lu_site *site = top->ld_site;
1260 struct lu_device *scan;
1261 struct lu_device *next;
1262
1263 lu_site_purge(env, site, ~0);
1264 for (scan = top; scan != NULL; scan = next) {
1265 next = scan->ld_type->ldt_ops->ldto_device_fini(env, scan);
1266 lu_ref_del(&scan->ld_reference, "lu-stack", &lu_site_init);
1267 lu_device_put(scan);
1268 }
1269
1270 /* purge again. */
1271 lu_site_purge(env, site, ~0);
1272
1273 for (scan = top; scan != NULL; scan = next) {
1274 const struct lu_device_type *ldt = scan->ld_type;
1275 struct obd_type *type;
1276
1277 next = ldt->ldt_ops->ldto_device_free(env, scan);
1278 type = ldt->ldt_obd_type;
1279 if (type != NULL) {
1280 type->typ_refcnt--;
1281 class_put_type(type);
1282 }
1283 }
1284}
1285EXPORT_SYMBOL(lu_stack_fini);
1286
1287enum {
1288 /**
1289 * Maximal number of tld slots.
1290 */
1291 LU_CONTEXT_KEY_NR = 40
1292};
1293
1294static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, };
1295
1296static DEFINE_SPINLOCK(lu_keys_guard);
1297
1298/**
1299 * Global counter incremented whenever key is registered, unregistered,
1300 * revived or quiesced. This is used to avoid unnecessary calls to
1301 * lu_context_refill(). No locking is provided, as initialization and shutdown
1302 * are supposed to be externally serialized.
1303 */
1304static unsigned key_set_version = 0;
1305
1306/**
1307 * Register new key.
1308 */
1309int lu_context_key_register(struct lu_context_key *key)
1310{
1311 int result;
1312 int i;
1313
1314 LASSERT(key->lct_init != NULL);
1315 LASSERT(key->lct_fini != NULL);
1316 LASSERT(key->lct_tags != 0);
1317
1318 result = -ENFILE;
1319 spin_lock(&lu_keys_guard);
1320 for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
1321 if (lu_keys[i] == NULL) {
1322 key->lct_index = i;
1323 atomic_set(&key->lct_used, 1);
1324 lu_keys[i] = key;
1325 lu_ref_init(&key->lct_reference);
1326 result = 0;
1327 ++key_set_version;
1328 break;
1329 }
1330 }
1331 spin_unlock(&lu_keys_guard);
1332 return result;
1333}
1334EXPORT_SYMBOL(lu_context_key_register);
1335
1336static void key_fini(struct lu_context *ctx, int index)
1337{
1338 if (ctx->lc_value != NULL && ctx->lc_value[index] != NULL) {
1339 struct lu_context_key *key;
1340
1341 key = lu_keys[index];
1342 LASSERT(key != NULL);
1343 LASSERT(key->lct_fini != NULL);
1344 LASSERT(atomic_read(&key->lct_used) > 1);
1345
1346 key->lct_fini(ctx, key, ctx->lc_value[index]);
1347 lu_ref_del(&key->lct_reference, "ctx", ctx);
1348 atomic_dec(&key->lct_used);
1349
1350 if ((ctx->lc_tags & LCT_NOREF) == 0) {
1351#ifdef CONFIG_MODULE_UNLOAD
1352 LINVRNT(module_refcount(key->lct_owner) > 0);
1353#endif
1354 module_put(key->lct_owner);
1355 }
1356 ctx->lc_value[index] = NULL;
1357 }
1358}
1359
1360/**
1361 * Deregister key.
1362 */
1363void lu_context_key_degister(struct lu_context_key *key)
1364{
1365 LASSERT(atomic_read(&key->lct_used) >= 1);
1366 LINVRNT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
1367
1368 lu_context_key_quiesce(key);
1369
1370 ++key_set_version;
1371 spin_lock(&lu_keys_guard);
1372 key_fini(&lu_shrink_env.le_ctx, key->lct_index);
1373 if (lu_keys[key->lct_index]) {
1374 lu_keys[key->lct_index] = NULL;
1375 lu_ref_fini(&key->lct_reference);
1376 }
1377 spin_unlock(&lu_keys_guard);
1378
1379 LASSERTF(atomic_read(&key->lct_used) == 1,
1380 "key has instances: %d\n",
1381 atomic_read(&key->lct_used));
1382}
1383EXPORT_SYMBOL(lu_context_key_degister);
1384
1385/**
1386 * Register a number of keys. This has to be called after all keys have been
1387 * initialized by a call to LU_CONTEXT_KEY_INIT().
1388 */
1389int lu_context_key_register_many(struct lu_context_key *k, ...)
1390{
1391 struct lu_context_key *key = k;
1392 va_list args;
1393 int result;
1394
1395 va_start(args, k);
1396 do {
1397 result = lu_context_key_register(key);
1398 if (result)
1399 break;
1400 key = va_arg(args, struct lu_context_key *);
1401 } while (key != NULL);
1402 va_end(args);
1403
1404 if (result != 0) {
1405 va_start(args, k);
1406 while (k != key) {
1407 lu_context_key_degister(k);
1408 k = va_arg(args, struct lu_context_key *);
1409 }
1410 va_end(args);
1411 }
1412
1413 return result;
1414}
1415EXPORT_SYMBOL(lu_context_key_register_many);
1416
1417/**
1418 * De-register a number of keys. This is a dual to
1419 * lu_context_key_register_many().
1420 */
1421void lu_context_key_degister_many(struct lu_context_key *k, ...)
1422{
1423 va_list args;
1424
1425 va_start(args, k);
1426 do {
1427 lu_context_key_degister(k);
1428 k = va_arg(args, struct lu_context_key*);
1429 } while (k != NULL);
1430 va_end(args);
1431}
1432EXPORT_SYMBOL(lu_context_key_degister_many);
1433
1434/**
1435 * Revive a number of keys.
1436 */
1437void lu_context_key_revive_many(struct lu_context_key *k, ...)
1438{
1439 va_list args;
1440
1441 va_start(args, k);
1442 do {
1443 lu_context_key_revive(k);
1444 k = va_arg(args, struct lu_context_key*);
1445 } while (k != NULL);
1446 va_end(args);
1447}
1448EXPORT_SYMBOL(lu_context_key_revive_many);
1449
1450/**
1451 * Quiesce a number of keys.
1452 */
1453void lu_context_key_quiesce_many(struct lu_context_key *k, ...)
1454{
1455 va_list args;
1456
1457 va_start(args, k);
1458 do {
1459 lu_context_key_quiesce(k);
1460 k = va_arg(args, struct lu_context_key*);
1461 } while (k != NULL);
1462 va_end(args);
1463}
1464EXPORT_SYMBOL(lu_context_key_quiesce_many);
1465
1466/**
1467 * Return value associated with key \a key in context \a ctx.
1468 */
1469void *lu_context_key_get(const struct lu_context *ctx,
1470 const struct lu_context_key *key)
1471{
1472 LINVRNT(ctx->lc_state == LCS_ENTERED);
1473 LINVRNT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
1474 LASSERT(lu_keys[key->lct_index] == key);
1475 return ctx->lc_value[key->lct_index];
1476}
1477EXPORT_SYMBOL(lu_context_key_get);
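/*
 * Illustrative sketch, added for clarity and not part of the original file:
 * how a module would typically define, register and use its own context key,
 * mirroring the lu_global_key pattern above. "example_thread_info",
 * "example_key" and the generated example_key_init()/example_key_fini()
 * helpers are hypothetical names used only for this sketch.
 */
#if 0
struct example_thread_info {
	char eti_scratch[128];	/* per-thread scratch area */
};

/* generates example_key_init()/example_key_fini() for the value above */
LU_KEY_INIT_FINI(example, struct example_thread_info);

static struct lu_context_key example_key = {
	.lct_tags = LCT_MD_THREAD | LCT_DT_THREAD,
	.lct_init = example_key_init,
	.lct_fini = example_key_fini
};

static int example_module_init(void)
{
	LU_CONTEXT_KEY_INIT(&example_key);
	return lu_context_key_register(&example_key);
}

static struct example_thread_info *example_info(const struct lu_env *env)
{
	/* valid only while the context is entered, see lu_context_key_get() */
	return lu_context_key_get(&env->le_ctx, &example_key);
}

static void example_module_fini(void)
{
	lu_context_key_degister(&example_key);
}
#endif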
1478
1479/**
1480 * List of remembered contexts. XXX document me.
1481 */
1482static LIST_HEAD(lu_context_remembered);
1483
1484/**
1485 * Destroy \a key in all remembered contexts. This is used to destroy key
1486 * values in "shared" contexts (like service threads), when a module owning
1487 * the key is about to be unloaded.
1488 */
1489void lu_context_key_quiesce(struct lu_context_key *key)
1490{
1491 struct lu_context *ctx;
1492
1493 if (!(key->lct_tags & LCT_QUIESCENT)) {
1494 /*
1495 * XXX layering violation.
1496 */
1497 key->lct_tags |= LCT_QUIESCENT;
1498 /*
1499 * XXX memory barrier has to go here.
1500 */
1501 spin_lock(&lu_keys_guard);
1502 list_for_each_entry(ctx, &lu_context_remembered,
1503 lc_remember)
1504 key_fini(ctx, key->lct_index);
1505 spin_unlock(&lu_keys_guard);
1506 ++key_set_version;
1507 }
1508}
1509EXPORT_SYMBOL(lu_context_key_quiesce);
1510
1511void lu_context_key_revive(struct lu_context_key *key)
1512{
1513 key->lct_tags &= ~LCT_QUIESCENT;
1514 ++key_set_version;
1515}
1516EXPORT_SYMBOL(lu_context_key_revive);
1517
1518static void keys_fini(struct lu_context *ctx)
1519{
1520 int i;
1521
1522 if (ctx->lc_value == NULL)
1523 return;
1524
1525 for (i = 0; i < ARRAY_SIZE(lu_keys); ++i)
1526 key_fini(ctx, i);
1527
1528 OBD_FREE(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
1529 ctx->lc_value = NULL;
1530}
1531
1532static int keys_fill(struct lu_context *ctx)
1533{
1534 int i;
1535
1536 LINVRNT(ctx->lc_value != NULL);
1537 for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
1538 struct lu_context_key *key;
1539
1540 key = lu_keys[i];
1541 if (ctx->lc_value[i] == NULL && key != NULL &&
1542 (key->lct_tags & ctx->lc_tags) &&
1543 /*
1544 * Don't create values for a LCT_QUIESCENT key, as this
1545 * will pin module owning a key.
1546 */
1547 !(key->lct_tags & LCT_QUIESCENT)) {
1548 void *value;
1549
1550 LINVRNT(key->lct_init != NULL);
1551 LINVRNT(key->lct_index == i);
1552
1553 value = key->lct_init(ctx, key);
1554 if (unlikely(IS_ERR(value)))
1555 return PTR_ERR(value);
1556
1557 if (!(ctx->lc_tags & LCT_NOREF))
1558 try_module_get(key->lct_owner);
1559 lu_ref_add_atomic(&key->lct_reference, "ctx", ctx);
1560 atomic_inc(&key->lct_used);
1561 /*
1562 * This is the only place in the code, where an
1563 * element of ctx->lc_value[] array is set to non-NULL
1564 * value.
1565 */
1566 ctx->lc_value[i] = value;
1567 if (key->lct_exit != NULL)
1568 ctx->lc_tags |= LCT_HAS_EXIT;
1569 }
1570 ctx->lc_version = key_set_version;
1571 }
1572 return 0;
1573}
1574
1575static int keys_init(struct lu_context *ctx)
1576{
1577 OBD_ALLOC(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
1578 if (likely(ctx->lc_value != NULL))
1579 return keys_fill(ctx);
1580
1581 return -ENOMEM;
1582}
1583
1584/**
1585 * Initialize context data-structure. Create values for all keys.
1586 */
1587int lu_context_init(struct lu_context *ctx, __u32 tags)
1588{
1589 int rc;
1590
1591 memset(ctx, 0, sizeof *ctx);
1592 ctx->lc_state = LCS_INITIALIZED;
1593 ctx->lc_tags = tags;
1594 if (tags & LCT_REMEMBER) {
1595 spin_lock(&lu_keys_guard);
1596 list_add(&ctx->lc_remember, &lu_context_remembered);
1597 spin_unlock(&lu_keys_guard);
1598 } else {
1599 INIT_LIST_HEAD(&ctx->lc_remember);
1600 }
1601
1602 rc = keys_init(ctx);
1603 if (rc != 0)
1604 lu_context_fini(ctx);
1605
1606 return rc;
1607}
1608EXPORT_SYMBOL(lu_context_init);
1609
1610/**
1611 * Finalize context data-structure. Destroy key values.
1612 */
1613void lu_context_fini(struct lu_context *ctx)
1614{
1615 LINVRNT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
1616 ctx->lc_state = LCS_FINALIZED;
1617
1618 if ((ctx->lc_tags & LCT_REMEMBER) == 0) {
1619 LASSERT(list_empty(&ctx->lc_remember));
1620 keys_fini(ctx);
1621
1622 } else { /* could race with key degister */
1623 spin_lock(&lu_keys_guard);
1624 keys_fini(ctx);
1625 list_del_init(&ctx->lc_remember);
1626 spin_unlock(&lu_keys_guard);
1627 }
1628}
1629EXPORT_SYMBOL(lu_context_fini);
1630
1631/**
1632 * Called before entering context.
1633 */
1634void lu_context_enter(struct lu_context *ctx)
1635{
1636 LINVRNT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
1637 ctx->lc_state = LCS_ENTERED;
1638}
1639EXPORT_SYMBOL(lu_context_enter);
1640
1641/**
1642 * Called after exiting from \a ctx
1643 */
1644void lu_context_exit(struct lu_context *ctx)
1645{
1646 int i;
1647
1648 LINVRNT(ctx->lc_state == LCS_ENTERED);
1649 ctx->lc_state = LCS_LEFT;
1650 if (ctx->lc_tags & LCT_HAS_EXIT && ctx->lc_value != NULL) {
1651 for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
1652 if (ctx->lc_value[i] != NULL) {
1653 struct lu_context_key *key;
1654
1655 key = lu_keys[i];
1656 LASSERT(key != NULL);
1657 if (key->lct_exit != NULL)
1658 key->lct_exit(ctx,
1659 key, ctx->lc_value[i]);
1660 }
1661 }
1662 }
1663}
1664EXPORT_SYMBOL(lu_context_exit);
1665
1666/**
1667 * Allocate for context all missing keys that were registered after context
1668 * creation. key_set_version is only changed in rare cases when modules
1669 * are loaded and removed.
1670 */
1671int lu_context_refill(struct lu_context *ctx)
1672{
1673 return likely(ctx->lc_version == key_set_version) ? 0 : keys_fill(ctx);
1674}
1675EXPORT_SYMBOL(lu_context_refill);
1676
1677/**
1678 * lu_ctx_tags/lu_ses_tags will be updated if there are new types of
1679 * obd being added. Currently, this is only used on the client side, specifically
1680 * for the echo device client. For other stacks (like ptlrpc threads), contexts are
1681 * predefined when the lu_device type is registered, during the module probe
1682 * phase.
1683 */
1684__u32 lu_context_tags_default = 0;
1685__u32 lu_session_tags_default = 0;
1686
1687void lu_context_tags_update(__u32 tags)
1688{
1689 spin_lock(&lu_keys_guard);
1690 lu_context_tags_default |= tags;
1691 key_set_version++;
1692 spin_unlock(&lu_keys_guard);
1693}
1694EXPORT_SYMBOL(lu_context_tags_update);
1695
1696void lu_context_tags_clear(__u32 tags)
1697{
1698 spin_lock(&lu_keys_guard);
1699 lu_context_tags_default &= ~tags;
1700 key_set_version++;
1701 spin_unlock(&lu_keys_guard);
1702}
1703EXPORT_SYMBOL(lu_context_tags_clear);
1704
1705void lu_session_tags_update(__u32 tags)
1706{
1707 spin_lock(&lu_keys_guard);
1708 lu_session_tags_default |= tags;
1709 key_set_version++;
1710 spin_unlock(&lu_keys_guard);
1711}
1712EXPORT_SYMBOL(lu_session_tags_update);
1713
1714void lu_session_tags_clear(__u32 tags)
1715{
1716 spin_lock(&lu_keys_guard);
1717 lu_session_tags_default &= ~tags;
1718 key_set_version++;
1719 spin_unlock(&lu_keys_guard);
1720}
1721EXPORT_SYMBOL(lu_session_tags_clear);
1722
1723int lu_env_init(struct lu_env *env, __u32 tags)
1724{
1725 int result;
1726
1727 env->le_ses = NULL;
1728 result = lu_context_init(&env->le_ctx, tags);
1729 if (likely(result == 0))
1730 lu_context_enter(&env->le_ctx);
1731 return result;
1732}
1733EXPORT_SYMBOL(lu_env_init);
1734
1735void lu_env_fini(struct lu_env *env)
1736{
1737 lu_context_exit(&env->le_ctx);
1738 lu_context_fini(&env->le_ctx);
1739 env->le_ses = NULL;
1740}
1741EXPORT_SYMBOL(lu_env_fini);
1742
1743int lu_env_refill(struct lu_env *env)
1744{
1745 int result;
1746
1747 result = lu_context_refill(&env->le_ctx);
1748 if (result == 0 && env->le_ses != NULL)
1749 result = lu_context_refill(env->le_ses);
1750 return result;
1751}
1752EXPORT_SYMBOL(lu_env_refill);
1753
1754/**
1755 * Currently, this API is only used by the echo client,
1756 * because the echo client and the normal lustre client share the
1757 * same cl_env cache, so the echo client needs to refresh
1758 * the env context after it gets one from the cache, especially
1759 * when a normal client and an echo client co-exist on the same node.
1760 */
1761int lu_env_refill_by_tags(struct lu_env *env, __u32 ctags,
1762 __u32 stags)
1763{
1764 int result;
1765
1766 if ((env->le_ctx.lc_tags & ctags) != ctags) {
1767 env->le_ctx.lc_version = 0;
1768 env->le_ctx.lc_tags |= ctags;
1769 }
1770
1771 if (env->le_ses && (env->le_ses->lc_tags & stags) != stags) {
1772 env->le_ses->lc_version = 0;
1773 env->le_ses->lc_tags |= stags;
1774 }
1775
1776 result = lu_env_refill(env);
1777
1778 return result;
1779}
1780EXPORT_SYMBOL(lu_env_refill_by_tags);
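/*
 * Illustrative sketch, added for clarity and not part of the original file:
 * how a caller such as the echo client could widen the default context tags
 * and then refresh an already-initialized environment so that values for the
 * newly required keys get created. LCT_DT_THREAD is just an example tag and
 * "example_widen_env" is a hypothetical helper.
 */
#if 0
static int example_widen_env(struct lu_env *env)
{
	/* make LCT_DT_THREAD keys part of the default client context tags ... */
	lu_context_tags_update(LCT_DT_THREAD);

	/* ... and refill this environment accordingly (session tags unchanged) */
	return lu_env_refill_by_tags(env, LCT_DT_THREAD, 0);
}
#endif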
1781
1782
1783typedef struct lu_site_stats{
1784 unsigned lss_populated;
1785 unsigned lss_max_search;
1786 unsigned lss_total;
1787 unsigned lss_busy;
1788} lu_site_stats_t;
1789
1790static void lu_site_stats_get(cfs_hash_t *hs,
1791 lu_site_stats_t *stats, int populated)
1792{
1793 cfs_hash_bd_t bd;
1794 int i;
1795
1796 cfs_hash_for_each_bucket(hs, &bd, i) {
1797 struct lu_site_bkt_data *bkt = cfs_hash_bd_extra_get(hs, &bd);
1798 struct hlist_head *hhead;
1799
1800 cfs_hash_bd_lock(hs, &bd, 1);
1801 stats->lss_busy += bkt->lsb_busy;
1802 stats->lss_total += cfs_hash_bd_count_get(&bd);
1803 stats->lss_max_search = max((int)stats->lss_max_search,
1804 cfs_hash_bd_depmax_get(&bd));
1805 if (!populated) {
1806 cfs_hash_bd_unlock(hs, &bd, 1);
1807 continue;
1808 }
1809
1810 cfs_hash_bd_for_each_hlist(hs, &bd, hhead) {
1811 if (!hlist_empty(hhead))
1812 stats->lss_populated++;
1813 }
1814 cfs_hash_bd_unlock(hs, &bd, 1);
1815 }
1816}
1817
1818
1819/*
1820 * There exists a potential lock inversion deadlock scenario when using
1821 * Lustre on top of ZFS. This occurs between one of ZFS's
1822 * buf_hash_table.ht_lock's, and Lustre's lu_sites_guard lock. Essentially,
1823 * thread A will take the lu_sites_guard lock and sleep on the ht_lock,
1824 * while thread B will take the ht_lock and sleep on the lu_sites_guard
1825 * lock. Obviously neither thread will wake and drop their respective hold
1826 * on their lock.
1827 *
1828 * To prevent this from happening we must ensure the lu_sites_guard lock is
1829 * not taken while down this code path. ZFS reliably does not set the
1830 * __GFP_FS bit in its code paths, so this can be used to determine if it
1831 * is safe to take the lu_sites_guard lock.
1832 *
1833 * Ideally we should accurately return the remaining number of cached
1834 * objects without taking the lu_sites_guard lock, but this is not
1835 * possible in the current implementation.
1836 */
1837static unsigned long lu_cache_shrink_count(struct shrinker *sk,
1838 struct shrink_control *sc)
1839{
1840 lu_site_stats_t stats;
1841 struct lu_site *s;
1842 struct lu_site *tmp;
1843 unsigned long cached = 0;
1844
1845 if (!(sc->gfp_mask & __GFP_FS))
1846 return 0;
1847
1848 mutex_lock(&lu_sites_guard);
1849 list_for_each_entry_safe(s, tmp, &lu_sites, ls_linkage) {
1850 memset(&stats, 0, sizeof(stats));
1851 lu_site_stats_get(s->ls_obj_hash, &stats, 0);
1852 cached += stats.lss_total - stats.lss_busy;
1853 }
1854 mutex_unlock(&lu_sites_guard);
1855
1856 cached = (cached / 100) * sysctl_vfs_cache_pressure;
1857 CDEBUG(D_INODE, "%ld objects cached\n", cached);
1858 return cached;
1859}
1860
1861static unsigned long lu_cache_shrink_scan(struct shrinker *sk,
1862 struct shrink_control *sc)
1863{
1864 struct lu_site *s;
1865 struct lu_site *tmp;
1866 unsigned long remain = sc->nr_to_scan, freed = 0;
1867 LIST_HEAD(splice);
1868
1869 if (!(sc->gfp_mask & __GFP_FS))
1870 /* We must not take the lu_sites_guard lock when
1871 * __GFP_FS is *not* set because of the deadlock
1872 * possibility detailed above. Additionally,
1873 * since we cannot determine the number of
1874 * objects in the cache without taking this
1875 * lock, we're in a particularly tough spot. As
1876 * a result, we'll just lie and say our cache is
1877 * empty. This _should_ be ok, as we can't
1878 * reclaim objects when __GFP_FS is *not* set
1879 * anyways.
1880 */
1881 return SHRINK_STOP;
1882
1883 mutex_lock(&lu_sites_guard);
1884 list_for_each_entry_safe(s, tmp, &lu_sites, ls_linkage) {
1885 freed = lu_site_purge(&lu_shrink_env, s, remain);
1886 remain -= freed;
1887 /*
1888 * Move just shrunk site to the tail of site list to
1889 * assure shrinking fairness.
1890 */
1891 list_move_tail(&s->ls_linkage, &splice);
1892 }
1893 list_splice(&splice, lu_sites.prev);
1894 mutex_unlock(&lu_sites_guard);
1895
1896 return sc->nr_to_scan - remain;
1897}
1898
1899/*
1900 * Debugging stuff.
1901 */
1902
1903/**
1904 * Environment to be used in debugger, contains all tags.
1905 */
1906struct lu_env lu_debugging_env;
1907
1908/**
1909 * Debugging printer function using printk().
1910 */
1911int lu_printk_printer(const struct lu_env *env,
1912 void *unused, const char *format, ...)
1913{
1914 va_list args;
1915
1916 va_start(args, format);
1917 vprintk(format, args);
1918 va_end(args);
1919 return 0;
1920}
1921
1922static struct shrinker lu_site_shrinker = {
1923 .count_objects = lu_cache_shrink_count,
1924 .scan_objects = lu_cache_shrink_scan,
1925 .seeks = DEFAULT_SEEKS,
1926};
1927
1928/**
1929 * Initialization of global lu_* data.
1930 */
1931int lu_global_init(void)
1932{
1933 int result;
1934
1935 CDEBUG(D_INFO, "Lustre LU module (%p).\n", &lu_keys);
1936
1937 result = lu_ref_global_init();
1938 if (result != 0)
1939 return result;
1940
1941 LU_CONTEXT_KEY_INIT(&lu_global_key);
1942 result = lu_context_key_register(&lu_global_key);
1943 if (result != 0)
1944 return result;
1945
1946 /*
1947 * At this level, we don't know what tags are needed, so allocate them
1948 * conservatively. This should not be too bad, because this
1949 * environment is global.
1950 */
1951 mutex_lock(&lu_sites_guard);
1952 result = lu_env_init(&lu_shrink_env, LCT_SHRINKER);
1953 mutex_unlock(&lu_sites_guard);
1954 if (result != 0)
1955 return result;
1956
1957 /*
1958 * seeks estimation: 3 seeks to read a record from oi, one to read
1959 * inode, one for ea. Unfortunately setting this high value results in
1960 * lu_object/inode cache consuming all the memory.
1961 */
1962 register_shrinker(&lu_site_shrinker);
1963
1964 return result;
1965}
1966
1967/**
1968 * Dual to lu_global_init().
1969 */
1970void lu_global_fini(void)
1971{
1972 unregister_shrinker(&lu_site_shrinker);
1973 lu_context_key_degister(&lu_global_key);
1974
1975 /*
1976 * Tear shrinker environment down _after_ de-registering
1977 * lu_global_key, because the latter has a value in the former.
1978 */
1979 mutex_lock(&lu_sites_guard);
1980 lu_env_fini(&lu_shrink_env);
1981 mutex_unlock(&lu_sites_guard);
1982
1983 lu_ref_global_fini();
1984}
1985
1986static __u32 ls_stats_read(struct lprocfs_stats *stats, int idx)
1987{
1988#ifdef LPROCFS
1989 struct lprocfs_counter ret;
1990
1991 lprocfs_stats_collect(stats, idx, &ret);
1992 return (__u32)ret.lc_count;
1993#else
1994 return 0;
1995#endif
1996}
1997
1998/**
1999 * Output site statistical counters into a buffer. Suitable for
2000 * lprocfs_rd_*()-style functions.
2001 */
2002int lu_site_stats_print(const struct lu_site *s, struct seq_file *m)
2003{
2004 lu_site_stats_t stats;
2005
2006 memset(&stats, 0, sizeof(stats));
2007 lu_site_stats_get(s->ls_obj_hash, &stats, 1);
2008
2009 return seq_printf(m, "%d/%d %d/%d %d %d %d %d %d %d %d\n",
2010 stats.lss_busy,
2011 stats.lss_total,
2012 stats.lss_populated,
2013 CFS_HASH_NHLIST(s->ls_obj_hash),
2014 stats.lss_max_search,
2015 ls_stats_read(s->ls_stats, LU_SS_CREATED),
2016 ls_stats_read(s->ls_stats, LU_SS_CACHE_HIT),
2017 ls_stats_read(s->ls_stats, LU_SS_CACHE_MISS),
2018 ls_stats_read(s->ls_stats, LU_SS_CACHE_RACE),
2019 ls_stats_read(s->ls_stats, LU_SS_CACHE_DEATH_RACE),
2020 ls_stats_read(s->ls_stats, LU_SS_LRU_PURGED));
2021}
2022EXPORT_SYMBOL(lu_site_stats_print);
2023
2024/**
2025 * Helper function to initialize a number of kmem slab caches at once.
2026 */
2027int lu_kmem_init(struct lu_kmem_descr *caches)
2028{
2029 int result;
2030 struct lu_kmem_descr *iter = caches;
2031
2032 for (result = 0; iter->ckd_cache != NULL; ++iter) {
2033 *iter->ckd_cache = kmem_cache_create(iter->ckd_name,
2034 iter->ckd_size,
2035 0, 0, NULL);
2036 if (*iter->ckd_cache == NULL) {
2037 result = -ENOMEM;
2038 /* free all previously allocated caches */
2039 lu_kmem_fini(caches);
2040 break;
2041 }
2042 }
2043 return result;
2044}
2045EXPORT_SYMBOL(lu_kmem_init);
2046
2047/**
2048 * Helper function to finalize a number of kmem slab caches at once. Dual to
2049 * lu_kmem_init().
2050 */
2051void lu_kmem_fini(struct lu_kmem_descr *caches)
2052{
2053 for (; caches->ckd_cache != NULL; ++caches) {
2054 if (*caches->ckd_cache != NULL) {
2055 kmem_cache_destroy(*caches->ckd_cache);
2056 *caches->ckd_cache = NULL;
2057 }
2058 }
2059}
2060EXPORT_SYMBOL(lu_kmem_fini);
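/*
 * Illustrative sketch, added for clarity and not part of the original file:
 * the usual pattern for declaring a NULL-terminated lu_kmem_descr table and
 * creating/destroying all caches in one call. "example_object",
 * "example_object_kmem" and "example_caches" are hypothetical names.
 */
#if 0
struct example_object {
	struct lu_object eo_obj;
};

static struct kmem_cache *example_object_kmem;

static struct lu_kmem_descr example_caches[] = {
	{
		.ckd_cache = &example_object_kmem,
		.ckd_name  = "example_object_kmem",
		.ckd_size  = sizeof(struct example_object)
	},
	{
		.ckd_cache = NULL	/* terminator checked by lu_kmem_init/fini */
	}
};

static int __init example_init(void)
{
	/* on failure lu_kmem_init() frees the caches created so far */
	return lu_kmem_init(example_caches);
}

static void __exit example_exit(void)
{
	lu_kmem_fini(example_caches);
}
#endif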
2061
2062/**
2063 * Temporary solution to be able to assign a fid in ->do_create()
2064 * till we have fully-functional OST fids.
2065 */
2066void lu_object_assign_fid(const struct lu_env *env, struct lu_object *o,
2067 const struct lu_fid *fid)
2068{
2069 struct lu_site *s = o->lo_dev->ld_site;
2070 struct lu_fid *old = &o->lo_header->loh_fid;
2071 struct lu_site_bkt_data *bkt;
2072 struct lu_object *shadow;
2073 wait_queue_t waiter;
2074 cfs_hash_t *hs;
2075 cfs_hash_bd_t bd;
2076 __u64 version = 0;
2077
2078 LASSERT(fid_is_zero(old));
2079
2080 hs = s->ls_obj_hash;
2081 cfs_hash_bd_get_and_lock(hs, (void *)fid, &bd, 1);
2082 shadow = htable_lookup(s, &bd, fid, &waiter, &version);
2083 /* supposed to be unique */
2084 LASSERT(IS_ERR(shadow) && PTR_ERR(shadow) == -ENOENT);
2085 *old = *fid;
2086 bkt = cfs_hash_bd_extra_get(hs, &bd);
2087 cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
2088 bkt->lsb_busy++;
2089 cfs_hash_bd_unlock(hs, &bd, 1);
2090}
2091EXPORT_SYMBOL(lu_object_assign_fid);
2092
2093/**
2094 * Allocates an object with a 0 (non-assigned) fid.
2095 * XXX: temporary solution to be able to assign fid in ->do_create()
2096 * till we have fully-functional OST fids
2097 */
2098struct lu_object *lu_object_anon(const struct lu_env *env,
2099 struct lu_device *dev,
2100 const struct lu_object_conf *conf)
2101{
2102 struct lu_fid fid;
2103 struct lu_object *o;
2104
2105 fid_zero(&fid);
2106 o = lu_object_alloc(env, dev, &fid, conf);
2107
2108 return o;
2109}
2110EXPORT_SYMBOL(lu_object_anon);
2111
2112struct lu_buf LU_BUF_NULL = {
2113 .lb_buf = NULL,
2114 .lb_len = 0
2115};
2116EXPORT_SYMBOL(LU_BUF_NULL);
2117
2118void lu_buf_free(struct lu_buf *buf)
2119{
2120 LASSERT(buf);
2121 if (buf->lb_buf) {
2122 LASSERT(buf->lb_len > 0);
2123 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
2124 buf->lb_buf = NULL;
2125 buf->lb_len = 0;
2126 }
2127}
2128EXPORT_SYMBOL(lu_buf_free);
2129
2130void lu_buf_alloc(struct lu_buf *buf, int size)
2131{
2132 LASSERT(buf);
2133 LASSERT(buf->lb_buf == NULL);
2134 LASSERT(buf->lb_len == 0);
2135 OBD_ALLOC_LARGE(buf->lb_buf, size);
2136 if (likely(buf->lb_buf))
2137 buf->lb_len = size;
2138}
2139EXPORT_SYMBOL(lu_buf_alloc);
2140
2141void lu_buf_realloc(struct lu_buf *buf, int size)
2142{
2143 lu_buf_free(buf);
2144 lu_buf_alloc(buf, size);
2145}
2146EXPORT_SYMBOL(lu_buf_realloc);
2147
2148struct lu_buf *lu_buf_check_and_alloc(struct lu_buf *buf, int len)
2149{
2150 if (buf->lb_buf == NULL && buf->lb_len == 0)
2151 lu_buf_alloc(buf, len);
2152
2153 if ((len > buf->lb_len) && (buf->lb_buf != NULL))
2154 lu_buf_realloc(buf, len);
2155
2156 return buf;
2157}
2158EXPORT_SYMBOL(lu_buf_check_and_alloc);
2159
2160/**
2161 * Increase the size of the \a buf.
2162 * Preserves old data in the buffer.
2163 * The old buffer remains unchanged on error.
2164 * \retval 0 or -ENOMEM
2165 */
2166int lu_buf_check_and_grow(struct lu_buf *buf, int len)
2167{
2168 char *ptr;
2169
2170 if (len <= buf->lb_len)
2171 return 0;
2172
2173 OBD_ALLOC_LARGE(ptr, len);
2174 if (ptr == NULL)
2175 return -ENOMEM;
2176
2177 /* Free the old buf */
2178 if (buf->lb_buf != NULL) {
2179 memcpy(ptr, buf->lb_buf, buf->lb_len);
2180 OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
2181 }
2182
2183 buf->lb_buf = ptr;
2184 buf->lb_len = len;
2185 return 0;
2186}
2187EXPORT_SYMBOL(lu_buf_check_and_grow);
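/*
 * Illustrative sketch, added for clarity and not part of the original file:
 * typical lifetime of a lu_buf used as a resizable scratch buffer. The sizes
 * chosen and the "example_use_lu_buf" helper are arbitrary.
 */
#if 0
static int example_use_lu_buf(void)
{
	struct lu_buf buf = LU_BUF_NULL;
	int rc;

	lu_buf_alloc(&buf, 512);		/* initial allocation */
	if (buf.lb_buf == NULL)
		return -ENOMEM;

	/* grow to 4096 bytes, preserving the first 512 bytes already written */
	rc = lu_buf_check_and_grow(&buf, 4096);
	if (rc == 0) {
		/* ... fill buf.lb_buf with up to buf.lb_len bytes ... */
	}

	lu_buf_free(&buf);			/* resets lb_buf/lb_len */
	return rc;
}
#endif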