/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_OSC

#include "../../include/linux/libcfs/libcfs.h"

#include "../include/lustre_dlm.h"
#include "../include/lustre_net.h"
#include "../include/lustre/lustre_user.h"
#include "../include/obd_cksum.h"

#include "../include/lustre_ha.h"
#include "../include/lprocfs_status.h"
#include "../include/lustre_debug.h"
#include "../include/lustre_param.h"
#include "../include/lustre_fid.h"
#include "../include/obd_class.h"
#include "osc_internal.h"
#include "osc_cl_internal.h"

struct osc_brw_async_args {
        struct obdo *aa_oa;
        int aa_requested_nob;
        int aa_nio_count;
        u32 aa_page_count;
        int aa_resends;
        struct brw_page **aa_ppga;
        struct client_obd *aa_cli;
        struct list_head aa_oaps;
        struct list_head aa_exts;
        struct obd_capa *aa_ocapa;
        struct cl_req *aa_clerq;
};

struct osc_async_args {
        struct obd_info *aa_oi;
};

struct osc_setattr_args {
        struct obdo *sa_oa;
        obd_enqueue_update_f sa_upcall;
        void *sa_cookie;
};

struct osc_fsync_args {
        struct obd_info *fa_oi;
        obd_enqueue_update_f fa_upcall;
        void *fa_cookie;
};

struct osc_enqueue_args {
        struct obd_export *oa_exp;
        __u64 *oa_flags;
        obd_enqueue_update_f oa_upcall;
        void *oa_cookie;
        struct ost_lvb *oa_lvb;
        struct lustre_handle *oa_lockh;
        struct ldlm_enqueue_info *oa_ei;
        unsigned int oa_agl:1;
};

static void osc_release_ppga(struct brw_page **ppga, u32 count);
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc);
int osc_cleanup(struct obd_device *obd);

/* Pack OSC object metadata for disk storage (LE byte order). */
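/*
 * Calling convention, as implemented below: when @lmmp is NULL only the
 * packed size is returned; when *@lmmp is set but @lsm is NULL the old
 * buffer is freed; otherwise a buffer is allocated if needed and, when
 * @lsm is given, its object id is packed into it in little-endian order.
 */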
static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
                      struct lov_stripe_md *lsm)
{
        int lmm_size;

        lmm_size = sizeof(**lmmp);
        if (lmmp == NULL)
                return lmm_size;

        if (*lmmp != NULL && lsm == NULL) {
                kfree(*lmmp);
                *lmmp = NULL;
                return 0;
        } else if (unlikely(lsm != NULL && ostid_id(&lsm->lsm_oi) == 0)) {
                return -EBADF;
        }

        if (*lmmp == NULL) {
                *lmmp = kzalloc(lmm_size, GFP_NOFS);
                if (!*lmmp)
                        return -ENOMEM;
        }

        if (lsm)
                ostid_cpu_to_le(&lsm->lsm_oi, &(*lmmp)->lmm_oi);

        return lmm_size;
}

/* Unpack OSC object metadata from disk storage (LE byte order). */
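/*
 * Mirror of osc_packmd() above: @lsmp == NULL returns just the in-memory
 * stripe md size, *@lsmp set with @lmm == NULL frees the stripe md, and
 * otherwise a single-stripe md is allocated if needed and refreshed from
 * the little-endian @lmm.
 */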
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        struct obd_import *imp = class_exp2cliimp(exp);

        if (lmm != NULL) {
                if (lmm_bytes < sizeof(*lmm)) {
                        CERROR("%s: lov_mds_md too small: %d, need %d\n",
                               exp->exp_obd->obd_name, lmm_bytes,
                               (int)sizeof(*lmm));
                        return -EINVAL;
                }
                /* XXX LOV_MAGIC etc check? */

                if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
                        CERROR("%s: zero lmm_object_id: rc = %d\n",
                               exp->exp_obd->obd_name, -EINVAL);
                        return -EINVAL;
                }
        }

        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                return lsm_size;

        if (*lsmp != NULL && lmm == NULL) {
                kfree((*lsmp)->lsm_oinfo[0]);
                kfree(*lsmp);
                *lsmp = NULL;
                return 0;
        }

        if (*lsmp == NULL) {
                *lsmp = kzalloc(lsm_size, GFP_NOFS);
                if (unlikely(*lsmp == NULL))
                        return -ENOMEM;
                (*lsmp)->lsm_oinfo[0] = kzalloc(sizeof(struct lov_oinfo),
                                                GFP_NOFS);
                if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
                        kfree(*lsmp);
                        return -ENOMEM;
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        } else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
                return -EBADF;
        }

        if (lmm != NULL)
                /* XXX zero *lsmp? */
                ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);

        if (imp != NULL &&
            (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
                (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
        else
                (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

        return lsm_size;
}

static inline void osc_pack_capa(struct ptlrpc_request *req,
                                 struct ost_body *body, void *capa)
{
        struct obd_capa *oc = (struct obd_capa *)capa;
        struct lustre_capa *c;

        if (!capa)
                return;

        c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
        LASSERT(c);
        capa_cpy(c, oc);
        body->oa.o_valid |= OBD_MD_FLOSSCAPA;
        DEBUG_CAPA(D_SEC, c, "pack");
}

static inline void osc_pack_req_body(struct ptlrpc_request *req,
                                     struct obd_info *oinfo)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);
}

static inline void osc_set_capa_size(struct ptlrpc_request *req,
                                     const struct req_msg_field *field,
                                     struct obd_capa *oc)
{
        if (oc == NULL)
                req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
        /* otherwise the size is already set to sizeof(struct obd_capa) */
}

static int osc_getattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;

        if (rc != 0)
                goto out;

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body) {
                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
                                     aa->aa_oi->oi_oa, &body->oa);

                /* This should really be sent by the OST */
                aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        } else {
                CDEBUG(D_INFO, "can't unpack ost_body\n");
                rc = -EPROTO;
                aa->aa_oi->oi_oa->o_valid = 0;
        }
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        return rc;
}

static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int rc;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                return -ENOMEM;

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                return rc;
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
        return 0;
}

static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        int rc;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                return -ENOMEM;

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                return rc;
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                goto out;

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                rc = -EPROTO;
                goto out;
        }

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
                             &body->oa);

        oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

out:
        ptlrpc_req_finished(req);
        return rc;
}

static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obd_info *oinfo, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        int rc;

        LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                return -ENOMEM;

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                return rc;
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                goto out;

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                rc = -EPROTO;
                goto out;
        }

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
                             &body->oa);

out:
        ptlrpc_req_finished(req);
        return rc;
}

static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_setattr_args *sa, int rc)
{
        struct ost_body *body;

        if (rc != 0)
                goto out;

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                rc = -EPROTO;
                goto out;
        }

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        return rc;
}

int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
                           struct obd_trans_info *oti,
                           obd_enqueue_update_f upcall, void *cookie,
                           struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        int rc;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                return -ENOMEM;

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                return rc;
        }

        if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
                oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oinfo->oi_oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        return 0;
}

static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct obd_trans_info *oti,
                             struct ptlrpc_request_set *rqset)
{
        return osc_setattr_async_base(exp, oinfo, oti,
                                      oinfo->oi_cb_up, oinfo, rqset);
}

int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        struct lov_stripe_md *lsm;
        int rc;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        return rc;
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL) {
                rc = -ENOMEM;
                goto out;
        }

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                goto out;
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                goto out_req;

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                rc = -EPROTO;
                goto out_req;
        }

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_oi = oa->o_oi;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = oa->o_lcookie;
                }
        }

        CDEBUG(D_HA, "transno: %lld\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        return rc;
}

int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        struct ost_body *body;
        int rc;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                return -ENOMEM;

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                return rc;
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa = oinfo->oi_oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        else
                ptlrpc_set_add_req(rqset, req);

        return 0;
}

static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req,
                              void *arg, int rc)
{
        struct osc_fsync_args *fa = arg;
        struct ost_body *body;

        if (rc)
                goto out;

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                rc = -EPROTO;
                goto out;
        }

        *fa->fa_oi->oi_oa = body->oa;
out:
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        return rc;
}

int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        struct osc_fsync_args *fa;
        int rc;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                return -ENOMEM;

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                return rc;
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
        fa = ptlrpc_req_async_args(req);
        fa->fa_oi = oinfo;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        else
                ptlrpc_set_add_req(rqset, req);

        return 0;
}

/* Find and cancel locally the locks matched by @mode in the resource
 * named after @oa's object id. Found locks are added to the @cancels
 * list. Returns the number of locks added to that list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   ldlm_mode_t mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This distinguishes from a case when ELC is not supported originally,
         * when we still want to cancel locks in advance and just cancel them
         * locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                return 0;

        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (res == NULL)
                return 0;

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        return count;
}
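
/* See osc_destroy() below for the typical caller: the locks collected
 * here are handed to ldlm_prep_elc_req() so that their cancellation
 * piggy-backs on the OST_DESTROY RPC instead of costing extra RPCs. */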

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *data,
                                 int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        wake_up(&cli->cl_destroy_waitq);
        return 0;
}

static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}
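
/* A note on the pattern above: the in-flight slot is claimed
 * optimistically with atomic_inc_return() and given back on failure;
 * the second check plus wake_up() closes the race where another destroy
 * completed between the two atomic operations, so no waiter is left
 * sleeping while a slot is actually free. */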

int osc_create(const struct lu_env *env, struct obd_export *exp,
               struct obdo *oa, struct lov_stripe_md **ea,
               struct obd_trans_info *oti)
{
        int rc = 0;

        LASSERT(oa);
        LASSERT(ea);
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);

        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_RECREATE_OBJS) {
                return osc_real_create(exp, oa, ea, oti);
        }

        if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
                return osc_real_create(exp, oa, ea, oti);

        /* we should not get here anymore */
        LBUG();

        return rc;
}

/* Destroy requests can always be async on the client, and we don't even
 * really care about the return code, since the client cannot do anything
 * at all about a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST
 * reports they were destroyed and sync'd to disk (i.e. transaction
 * committed). If the client dies, or the OST is down when the object
 * should be destroyed, the records are not cancelled, and when the OST
 * reconnects to the MDS next, it will retrieve the llog unlink logs and
 * then send the log cancellation cookies to the MDS after committing
 * destroy transactions. */
static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa, struct lov_stripe_md *ea,
                       struct obd_trans_info *oti, struct obd_export *md_export,
                       void *capa)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body *body;
        LIST_HEAD(cancels);
        int rc, count;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                return -EINVAL;
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                return -ENOMEM;
        }

        osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                return rc;
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                oa->o_lcookie = *oti->oti_logcookies;
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        osc_pack_capa(req, body, (struct obd_capa *)capa);
        ptlrpc_request_set_replen(req);

        /* If osc_destroy is for destroying the unlink orphan,
         * sent from MDT to OST, it should not be blocked here,
         * because the process might be triggered by ptlrpcd, and
         * it is not good to block a ptlrpcd thread (b=16006) */
        if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
                req->rq_interpret_reply = osc_destroy_interpret;
                if (!osc_can_send_destroy(cli)) {
                        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
                                                          NULL);

                        /*
                         * Wait until the number of on-going destroy RPCs drops
                         * below max_rpcs_in_flight.
                         */
                        l_wait_event_exclusive(cli->cl_destroy_waitq,
                                               osc_can_send_destroy(cli), &lwi);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        return 0;
}

static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        u32 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
                     cli->cl_dirty_max)) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_read(&obd_dirty_pages) -
                            atomic_read(&obd_dirty_transit_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The atomic_read()s allowing the atomic_inc()s are
                 * not covered by a lock, thus they may safely race and trip
                 * this CERROR() unless we add in a small fudge factor (+1). */
                CERROR("dirty %d - %d > system dirty_max %d\n",
                       atomic_read(&obd_dirty_pages),
                       atomic_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                long max_in_flight = (cli->cl_max_pages_per_rpc <<
                                      PAGE_CACHE_SHIFT) *
                                     (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}
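
/* Illustrative sizing (values assumed, not from the source): with 1 MiB
 * RPCs (cl_max_pages_per_rpc = 256 on 4 KiB pages) and
 * cl_max_rpcs_in_flight = 8, max_in_flight above is 9 MiB, so o_undirty
 * asks the server for at least that much grant headroom, or for
 * cl_dirty_max when the per-OSC dirty limit is even larger. */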

void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant =
                cfs_time_shift(cli->cl_grant_shrink_interval);
        CDEBUG(D_CACHE, "next time %ld to shrink grant\n",
               cli->cl_next_shrink_grant);
}

static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
                              u32 keylen, void *key, u32 vallen,
                              void *val, struct ptlrpc_request_set *set);

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_brw_async_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, oa->o_grant);
                goto out;
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBDO_FREE(oa);
        return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC. This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
                             (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);

        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target_bytes)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target_bytes);
}
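
/* Illustrative numbers (assumed, not from the source): with 1 MiB RPCs
 * and cl_max_rpcs_in_flight = 8, the first shrink targets
 * (8 + 1) * 1 MiB = 9 MiB of grant; once avail_grant is at or below
 * that, the next shrink targets a single 1 MiB RPC's worth. */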

int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int rc = 0;
        struct ost_body *body;

        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit.
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                return 0;
        }
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        body = kzalloc(sizeof(*body), GFP_NOFS);
        if (!body)
                return -ENOMEM;

        osc_announce_cached(cli, &body->oa, 0);

        client_obd_list_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
        kfree(body);
        return rc;
}

static int osc_should_shrink_grant(struct client_obd *client)
{
        unsigned long time = cfs_time_current();
        unsigned long next_shrink = client->cl_next_shrink_grant;

        if ((client->cl_import->imp_connect_data.ocd_connect_flags &
             OBD_CONNECT_GRANT_SHRINK) == 0)
                return 0;

        if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
                /* Get the current RPC size directly, instead of going via:
                 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
                 * Keep comment here so that it can be found by searching. */
                int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > brw_size)
                        return 1;

                osc_update_next_shrink(client);
        }
        return 0;
}

static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
{
        struct client_obd *client;

        list_for_each_entry(client, &item->ti_obd_list,
                            cl_grant_shrink_list) {
                if (osc_should_shrink_grant(client))
                        osc_shrink_grant(client);
        }
        return 0;
}

static int osc_add_shrink_grant(struct client_obd *client)
{
        int rc;

        rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
                                       TIMEOUT_GRANT,
                                       osc_grant_shrink_grant_cb, NULL,
                                       &client->cl_grant_shrink_list);
        if (rc) {
                CERROR("add grant client %s error %d\n",
                       client->cl_import->imp_obd->obd_name, rc);
                return rc;
        }
        CDEBUG(D_CACHE, "add grant client %s\n",
               client->cl_import->imp_obd->obd_name);
        osc_update_next_shrink(client);
        return 0;
}

static int osc_del_shrink_grant(struct client_obd *client)
{
        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
                                         TIMEOUT_GRANT);
}

static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we expect to hold: if we've
         * been evicted, it's the new avail_grant amount, and cl_dirty will
         * drop to 0 as in-flight RPCs fail out; otherwise, it's
         * avail_grant + dirty.
         *
         * The race is tolerable here: if we're evicted, but imp_state has
         * already left EVICTED state, then cl_dirty must be 0 already.
         */
        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
                cli->cl_avail_grant = ocd->ocd_grant;
        else
                cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;

        if (cli->cl_avail_grant < 0) {
                CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
                      cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
                      ocd->ocd_grant, cli->cl_dirty);
                /* workaround for servers which do not have the patch from
                 * LU-2679 */
                cli->cl_avail_grant = ocd->ocd_grant;
        }

        /* determine the appropriate chunk size used by osc_extent. */
        cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld chunk bits: %d\n",
               cli->cl_import->imp_obd->obd_name,
               cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);

        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}

/* We assume that the reason this OSC got a short read is that it read
 * beyond the end of a stripe file; i.e. Lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file; it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, u32 page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = kmap(pga[i]->pg) +
                              (pga[i]->off & ~CFS_PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                kunmap(pga[i]->pg);
                i++;
        }
}
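
/* Worked example (illustrative values only): for a three-page read where
 * just nob_read = 5000 bytes arrived, page 0 (4096 bytes) is left intact,
 * page 1 is zeroed from offset 904 onward, and page 2 is zeroed whole. */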

static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           u32 page_count, struct brw_page **pga)
{
        int i;
        __u32 *remote_rcs;

        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
                                                  niocount);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return -EPROTO;
        }

        /* return error if any niobuf was in error */
        for (i = 0; i < niocount; i++) {
                if ((int)remote_rcs[i] < 0)
                        return remote_rcs[i];

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                               i, remote_rcs[i], req);
                        return -EPROTO;
                }
        }

        if (req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return -EPROTO;
        }

        return 0;
}

static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
                                  OBD_BRW_SYNC | OBD_BRW_ASYNC | OBD_BRW_NOQUOTA);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
                        CWARN("Saw flags 0x%x and 0x%x in the same brw, please report this at http://bugs.whamcloud.com/\n",
                              p1->flag, p2->flag);
                }
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}

static u32 osc_checksum_bulk(int nob, u32 pg_count,
                             struct brw_page **pga, int opc,
                             cksum_type_t cksum_type)
{
        __u32 cksum;
        int i = 0;
        struct cfs_crypto_hash_desc *hdesc;
        unsigned int bufsize;
        int err;
        unsigned char cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(hdesc)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(hdesc);
        }

        while (nob > 0 && pg_count > 0) {
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~CFS_PAGE_MASK;

                        memcpy(ptr + off, "bad1", min(4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
                                            pga[i]->off & ~CFS_PAGE_MASK,
                                            count);
                CDEBUG(D_PAGE,
                       "page %p map %p index %lu flags %lx count %u priv %0lx: off %d\n",
                       pga[i]->pg, pga[i]->pg->mapping, pga[i]->pg->index,
                       (long)pga[i]->pg->flags, page_count(pga[i]->pg),
                       page_private(pga[i]->pg),
                       (int)(pga[i]->off & ~CFS_PAGE_MASK));

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = 4;
        err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);

        if (err)
                cfs_crypto_hash_final(hdesc, NULL, NULL);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data, so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}

static int osc_brw_prep_request(int cmd, struct client_obd *cli,
                                struct obdo *oa,
                                struct lov_stripe_md *lsm, u32 page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa, int reserve,
                                int resend)
{
        struct ptlrpc_request *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body *body;
        struct obd_ioobj *ioobj;
        struct niobuf_remote *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule *pill;
        struct brw_page *pg_prev;

        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                return -ENOMEM; /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                return -EINVAL; /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                cli->cl_import->imp_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                return -ENOMEM;

        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                return rc;
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;

        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
                OST_BULK_PORTAL);

        if (desc == NULL) {
                rc = -ENOMEM;
                goto out;
        }
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        /* The high bits of ioo_max_brw tell the server the _maximum_ number
         * of bulks that might be sent for this request. The actual number is
         * decided when the RPC is finally sent in ptlrpc_register_bulk(). It
         * sends "max - 1" for old client compatibility sending "0", and also
         * so that the actual maximum is a power-of-two number, not one less.
         * LU-1431 */
        ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
        osc_pack_capa(req, body, ocapa);
        LASSERT(page_count > 0);
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                int poff = pg->off & ~CFS_PAGE_MASK;

                LASSERT(pg->count > 0);
                /* make sure there is no gap in the middle of page array */
                LASSERTF(page_count == 1 ||
                         (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
                          ergo(i > 0 && i < page_count - 1,
                               poff == 0 && pg->count == PAGE_CACHE_SIZE) &&
                          ergo(i == page_count - 1, poff == 0)),
                         "i: %d/%d pg: %p off: %llu, count: %u\n",
                         i, page_count, pg, pg->off, pg->count);
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off %llu prev_pg %p [pri %lu ind %lu] off %llu\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len = pg->count;
                        niobuf->flags = pg->flag;
                }
                pg_prev = pg;
        }

        LASSERTF((void *)(niobuf - niocount) ==
                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob : 0);
        if (resend) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
        }

        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                                oa->o_flags &= OBD_FL_LOCAL_MASK;
                                body->oa.o_flags = 0;
                        }
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
        }
        ptlrpc_request_set_replen(req);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        INIT_LIST_HEAD(&aa->aa_oaps);
        if (ocapa && reserve)
                aa->aa_ocapa = capa_get(ocapa);

        *reqp = req;
        return 0;

out:
        ptlrpc_req_finished(req);
        return rc;
}

static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum, int nob,
                                u32 page_count, struct brw_page **pga,
                                cksum_type_t client_cksum_type)
{
        __u32 new_cksum;
        char *msg;
        cksum_type_t cksum_type;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
                                       oa->o_flags : 0);
        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
                                      cksum_type);

        if (cksum_type != client_cksum_type)
                msg = "the server did not use the checksum type specified in the original request - likely a protocol problem";
        else if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
                           " object "DOSTID" extent [%llu-%llu]\n",
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
                           POSTID(&oa->o_oi), pga[0]->off,
                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x (type %x), server csum %x (type %x), client csum now %x\n",
               client_cksum, client_cksum_type,
               server_cksum, cksum_type, new_cksum);
        return 1;
}
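
/* Note that a non-zero return from check_write_checksum() makes
 * osc_brw_fini_request() below return -EAGAIN, i.e. a write checksum
 * mismatch is handled by resending the whole BRW rather than being
 * reported to the application. */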

/* Note rc enters this function as number of bytes transferred */
static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
{
        struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
        const lnet_process_id_t *peer =
                &req->rq_import->imp_connection->c_peer;
        struct client_obd *cli = aa->aa_cli;
        struct ost_body *body;
        __u32 client_cksum = 0;

        if (rc < 0 && rc != -EDQUOT) {
                DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
                return rc;
        }

        LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
                return -EPROTO;
        }

        /* set/clear over quota flag for a uid/gid */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
            body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
                unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };

                CDEBUG(D_QUOTA, "setdq for [%u %u] with valid %#llx, flags %x\n",
                       body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
                       body->oa.o_flags);
                osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
        }

        osc_update_grant(cli, body);

        if (rc < 0)
                return rc;

        if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
                client_cksum = aa->aa_oa->o_cksum; /* save for later */

        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
                if (rc > 0) {
                        CERROR("Unexpected +ve rc %d\n", rc);
                        return -EPROTO;
                }
                LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);

                if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
                        return -EAGAIN;

                if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
                    check_write_checksum(&body->oa, peer, client_cksum,
                                         body->oa.o_cksum, aa->aa_requested_nob,
                                         aa->aa_page_count, aa->aa_ppga,
                                         cksum_type_unpack(aa->aa_oa->o_flags)))
                        return -EAGAIN;

                rc = check_write_rcs(req, aa->aa_requested_nob,
                                     aa->aa_nio_count,
                                     aa->aa_page_count, aa->aa_ppga);
                goto out;
        }

        /* The rest of this function executes only for OST_READs */

        /* if unwrap_bulk failed, return -EAGAIN to retry */
        rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
        if (rc < 0) {
                rc = -EAGAIN;
                goto out;
        }

        if (rc > aa->aa_requested_nob) {
                CERROR("Unexpected rc %d (%d requested)\n", rc,
                       aa->aa_requested_nob);
                return -EPROTO;
        }

        if (rc != req->rq_bulk->bd_nob_transferred) {
                CERROR("Unexpected rc %d (%d transferred)\n",
                       rc, req->rq_bulk->bd_nob_transferred);
                return -EPROTO;
        }

        if (rc < aa->aa_requested_nob)
                handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);

        if (body->oa.o_valid & OBD_MD_FLCKSUM) {
                static int cksum_counter;
                __u32 server_cksum = body->oa.o_cksum;
                char *via;
                char *router;
                cksum_type_t cksum_type;

                cksum_type = cksum_type_unpack(body->oa.o_valid & OBD_MD_FLFLAGS ?
                                               body->oa.o_flags : 0);
                client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
                                                 aa->aa_ppga, OST_READ,
                                                 cksum_type);

                if (peer->nid == req->rq_bulk->bd_sender) {
                        via = router = "";
                } else {
                        via = " via ";
                        router = libcfs_nid2str(req->rq_bulk->bd_sender);
                }

                if (server_cksum != client_cksum) {
                        LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from %s%s%s inode " DFID " object " DOSTID " extent [%llu-%llu]\n",
                                           req->rq_import->imp_obd->obd_name,
                                           libcfs_nid2str(peer->nid),
                                           via, router,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                           body->oa.o_parent_seq : (__u64)0,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                           body->oa.o_parent_oid : 0,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                           body->oa.o_parent_ver : 0,
                                           POSTID(&body->oa.o_oi),
                                           aa->aa_ppga[0]->off,
                                           aa->aa_ppga[aa->aa_page_count-1]->off +
                                           aa->aa_ppga[aa->aa_page_count-1]->count -
                                           1);
                        CERROR("client %x, server %x, cksum_type %x\n",
                               client_cksum, server_cksum, cksum_type);
                        cksum_counter = 0;
                        aa->aa_oa->o_cksum = client_cksum;
                        rc = -EAGAIN;
                } else {
                        cksum_counter++;
                        CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                        rc = 0;
                }
        } else if (unlikely(client_cksum)) {
                static int cksum_missed;

                cksum_missed++;
                if ((cksum_missed & (-cksum_missed)) == cksum_missed)
                        CERROR("Checksum %u requested from %s but not sent\n",
                               cksum_missed, libcfs_nid2str(peer->nid));
        } else {
                rc = 0;
        }
out:
        if (rc >= 0)
                lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
                                     aa->aa_oa, &body->oa);

        return rc;
}

static int osc_brw_redo_request(struct ptlrpc_request *request,
                                struct osc_brw_async_args *aa, int rc)
{
        struct ptlrpc_request *new_req;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;

        DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
                  "redo for recoverable error %d", rc);

        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                  OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa,
                                  NULL /* lsm unused by osc currently */,
                                  aa->aa_page_count, aa->aa_ppga,
                                  &new_req, aa->aa_ocapa, 0, 1);
        if (rc)
                return rc;

        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                ptlrpc_req_finished(new_req);
                                return -EINTR;
                        }
                }
        }
        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        aa->aa_resends++;
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        /* cap resend delay to the current request timeout, this is similar to
         * what ptlrpc does (see after_reply()) */
        if (aa->aa_resends > new_req->rq_timeout)
                new_req->rq_sent = get_seconds() + new_req->rq_timeout;
        else
                new_req->rq_sent = get_seconds() + aa->aa_resends;
        new_req->rq_generation_set = 1;
        new_req->rq_import_generation = request->rq_import_generation;

        new_aa = ptlrpc_req_async_args(new_req);

        INIT_LIST_HEAD(&new_aa->aa_oaps);
        list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
        INIT_LIST_HEAD(&new_aa->aa_exts);
        list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
        new_aa->aa_resends = aa->aa_resends;

        list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);
                }
        }

        new_aa->aa_ocapa = aa->aa_ocapa;
        aa->aa_ocapa = NULL;

        /* XXX: This code will run into problems if we're going to support
         * adding a series of BRW RPCs into a self-defined ptlrpc_request_set
         * and waiting for all of them to finish. We should inherit the
         * request set from the old request. */
        ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);

        DEBUG_REQ(D_INFO, new_req, "new request");
        return 0;
}

/*
 * Ugh, we want disk allocation on the target to happen in offset order. We'll
 * follow Sedgewick's advice and stick to the dead-simple shellsort -- it'll do
 * fine for our small page arrays and doesn't require allocation. It's an
 * insertion sort that swaps elements that are strides apart, shrinking the
 * stride down until it's 1 and the array is sorted.
 */
static void sort_brw_pages(struct brw_page **array, int num)
{
        int stride, i, j;
        struct brw_page *tmp;

        if (num == 1)
                return;
        for (stride = 1; stride < num; stride = (stride * 3) + 1)
                ;

        do {
                stride /= 3;
                for (i = stride; i < num; i++) {
                        tmp = array[i];
                        j = i;
                        while (j >= stride && array[j - stride]->off > tmp->off) {
                                array[j] = array[j - stride];
                                j -= stride;
                        }
                        array[j] = tmp;
                }
        } while (stride > 1);
}
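
/* Example (illustrative, not from the source): for num = 100 the loop
 * above builds the stride sequence 1, 4, 13, 40, 121 and stops at 121,
 * so after the first "stride /= 3" the passes run with gaps 40, 13, 4
 * and finally 1 -- Knuth's 3h+1 gap sequence for shellsort. */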

static void osc_release_ppga(struct brw_page **ppga, u32 count)
{
        LASSERT(ppga != NULL);
        kfree(ppga);
}

static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc)
{
        struct osc_brw_async_args *aa = data;
        struct osc_extent *ext;
        struct osc_extent *tmp;
        struct cl_object *obj = NULL;
        struct client_obd *cli = aa->aa_cli;

        rc = osc_brw_fini_request(req, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
        /* When the server returns -EINPROGRESS, the client should always
         * retry regardless of the number of times the bulk was resent
         * already. */
        if (osc_recoverable_error(rc)) {
                if (req->rq_import_generation !=
                    req->rq_import->imp_generation) {
                        CDEBUG(D_HA, "%s: resend cross eviction for object: " DOSTID ", rc = %d.\n",
                               req->rq_import->imp_obd->obd_name,
                               POSTID(&aa->aa_oa->o_oi), rc);
                } else if (rc == -EINPROGRESS ||
                           client_should_resend(aa->aa_resends, aa->aa_cli)) {
                        rc = osc_brw_redo_request(req, aa, rc);
                } else {
                        CERROR("%s: too many resend retries for object: %llu:%llu, rc = %d.\n",
                               req->rq_import->imp_obd->obd_name,
                               POSTID(&aa->aa_oa->o_oi), rc);
                }

                if (rc == 0)
                        return 0;
                else if (rc == -EAGAIN || rc == -EINPROGRESS)
                        rc = -EIO;
        }

        if (aa->aa_ocapa) {
                capa_put(aa->aa_ocapa);
                aa->aa_ocapa = NULL;
        }

        list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
                if (obj == NULL && rc == 0) {
                        obj = osc2cl(ext->oe_obj);
                        cl_object_get(obj);
                }

                list_del_init(&ext->oe_link);
                osc_extent_finish(env, ext, 1, rc);
        }
        LASSERT(list_empty(&aa->aa_exts));
        LASSERT(list_empty(&aa->aa_oaps));

        if (obj != NULL) {
                struct obdo *oa = aa->aa_oa;
                struct cl_attr *attr = &osc_env_info(env)->oti_attr;
                unsigned long valid = 0;

                LASSERT(rc == 0);
                if (oa->o_valid & OBD_MD_FLBLOCKS) {
                        attr->cat_blocks = oa->o_blocks;
                        valid |= CAT_BLOCKS;
                }
                if (oa->o_valid & OBD_MD_FLMTIME) {
                        attr->cat_mtime = oa->o_mtime;
                        valid |= CAT_MTIME;
                }
                if (oa->o_valid & OBD_MD_FLATIME) {
                        attr->cat_atime = oa->o_atime;
                        valid |= CAT_ATIME;
                }
                if (oa->o_valid & OBD_MD_FLCTIME) {
                        attr->cat_ctime = oa->o_ctime;
                        valid |= CAT_CTIME;
                }
                if (valid != 0) {
                        cl_object_attr_lock(obj);
                        cl_object_attr_set(env, obj, attr, valid);
                        cl_object_attr_unlock(obj);
                }
                cl_object_put(env, obj);
        }
        OBDO_FREE(aa->aa_oa);

        cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
                          req->rq_bulk->bd_nob_transferred);
        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
        ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);

        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;
        osc_wake_cache_waiters(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
        return rc;
}
1856
1857 /**
1858 * Build an RPC from the list of extents @ext_list. The caller must ensure
1859 * that the total number of pages in this list does not exceed the maximum
1860 * pages per RPC. Extents in the list must be in OES_RPC state.
1861 */
1862 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1863 struct list_head *ext_list, int cmd, pdl_policy_t pol)
1864 {
1865 struct ptlrpc_request *req = NULL;
1866 struct osc_extent *ext;
1867 struct brw_page **pga = NULL;
1868 struct osc_brw_async_args *aa = NULL;
1869 struct obdo *oa = NULL;
1870 struct osc_async_page *oap;
1871 struct osc_async_page *tmp;
1872 struct cl_req *clerq = NULL;
1873 enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
1874 struct ldlm_lock *lock = NULL;
1875 struct cl_req_attr *crattr = NULL;
1876 u64 starting_offset = OBD_OBJECT_EOF;
1877 u64 ending_offset = 0;
1878 int mpflag = 0;
1879 int mem_tight = 0;
1880 int page_count = 0;
1881 int i;
1882 int rc;
1883 struct ost_body *body;
1884 LIST_HEAD(rpc_list);
1885
1886 LASSERT(!list_empty(ext_list));
1887
1888 /* add pages into rpc_list to build BRW rpc */
1889 list_for_each_entry(ext, ext_list, oe_link) {
1890 LASSERT(ext->oe_state == OES_RPC);
1891 mem_tight |= ext->oe_memalloc;
1892 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1893 ++page_count;
1894 list_add_tail(&oap->oap_rpc_item, &rpc_list);
1895 if (starting_offset > oap->oap_obj_off)
1896 starting_offset = oap->oap_obj_off;
1897 else
1898 LASSERT(oap->oap_page_off == 0);
1899 if (ending_offset < oap->oap_obj_off + oap->oap_count)
1900 ending_offset = oap->oap_obj_off +
1901 oap->oap_count;
1902 else
1903 LASSERT(oap->oap_page_off + oap->oap_count ==
1904 PAGE_CACHE_SIZE);
1905 }
1906 }
1907
1908 if (mem_tight)
1909 mpflag = cfs_memory_pressure_get_and_set();
1910
1911 crattr = kzalloc(sizeof(*crattr), GFP_NOFS);
1912 if (!crattr) {
1913 rc = -ENOMEM;
1914 goto out;
1915 }
1916
1917 pga = kcalloc(page_count, sizeof(*pga), GFP_NOFS);
1918 if (pga == NULL) {
1919 rc = -ENOMEM;
1920 goto out;
1921 }
1922
1923 OBDO_ALLOC(oa);
1924 if (oa == NULL) {
1925 rc = -ENOMEM;
1926 goto out;
1927 }
1928
1929 i = 0;
1930 list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
1931 struct cl_page *page = oap2cl_page(oap);
1932 if (clerq == NULL) {
1933 clerq = cl_req_alloc(env, page, crt,
1934 1 /* only 1-object rpcs for now */);
1935 if (IS_ERR(clerq)) {
1936 rc = PTR_ERR(clerq);
1937 goto out;
1938 }
1939 lock = oap->oap_ldlm_lock;
1940 }
1941 if (mem_tight)
1942 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1943 pga[i] = &oap->oap_brw_page;
1944 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1945 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1946 pga[i]->pg, page_index(oap->oap_page), oap,
1947 pga[i]->flag);
1948 i++;
1949 cl_req_page_add(env, clerq, page);
1950 }
1951
1952 /* always get the data for the obdo for the rpc */
1953 LASSERT(clerq != NULL);
1954 crattr->cra_oa = oa;
1955 cl_req_attr_set(env, clerq, crattr, ~0ULL);
1956 if (lock) {
1957 oa->o_handle = lock->l_remote_handle;
1958 oa->o_valid |= OBD_MD_FLHANDLE;
1959 }
1960
1961 rc = cl_req_prep(env, clerq);
1962 if (rc != 0) {
1963 CERROR("cl_req_prep failed: %d\n", rc);
1964 goto out;
1965 }
1966
1967 sort_brw_pages(pga, page_count);
1968 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
1969 pga, &req, crattr->cra_capa, 1, 0);
1970 if (rc != 0) {
1971 CERROR("prep_req failed: %d\n", rc);
1972 goto out;
1973 }
1974
1975 req->rq_interpret_reply = brw_interpret;
1976
1977 if (mem_tight != 0)
1978 req->rq_memalloc = 1;
1979
1980 /* Need to update the timestamps after the request is built in case
1981 * we race with setattr (locally or in queue at OST). If OST gets
1982 * later setattr before earlier BRW (as determined by the request xid),
1983 * the OST will not use BRW timestamps. Sadly, there is no obvious
1984 * way to do this in a single call. bug 10150 */
1985 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1986 crattr->cra_oa = &body->oa;
1987 cl_req_attr_set(env, clerq, crattr,
1988 OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
1989
1990 lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
1991
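/* Compile-time check that our cookie fits into the fixed-size
 * rq_async_args scratch area embedded in every ptlrpc_request. */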
1992 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1993 aa = ptlrpc_req_async_args(req);
1994 INIT_LIST_HEAD(&aa->aa_oaps);
1995 list_splice_init(&rpc_list, &aa->aa_oaps);
1996 INIT_LIST_HEAD(&aa->aa_exts);
1997 list_splice_init(ext_list, &aa->aa_exts);
1998 aa->aa_clerq = clerq;
1999
2000 /* Queued sync pages can be torn down while the pages
2001 * are between the pending list and the RPC. */
2002 tmp = NULL;
2003 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2004 /* only one oap gets a request reference */
2005 if (tmp == NULL)
2006 tmp = oap;
2007 if (oap->oap_interrupted && !req->rq_intr) {
2008 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2009 oap, req);
2010 ptlrpc_mark_interrupted(req);
2011 }
2012 }
2013 if (tmp != NULL)
2014 tmp->oap_request = ptlrpc_request_addref(req);
2015
2016 client_obd_list_lock(&cli->cl_loi_list_lock);
2017 starting_offset >>= PAGE_CACHE_SHIFT;
2018 if (cmd == OBD_BRW_READ) {
2019 cli->cl_r_in_flight++;
2020 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2021 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2022 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2023 starting_offset + 1);
2024 } else {
2025 cli->cl_w_in_flight++;
2026 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2027 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2028 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2029 starting_offset + 1);
2030 }
2031 client_obd_list_unlock(&cli->cl_loi_list_lock);
2032
2033 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2034 page_count, aa, cli->cl_r_in_flight,
2035 cli->cl_w_in_flight);
2036
2037 /* XXX: Maybe the caller can check the RPC bulk descriptor to
2038 * see which CPU/NUMA node the majority of pages were allocated
2039 * on, and try to assign the async RPC to the CPU core
2040 * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
2041 *
2042 * But on the other hand, we expect that multiple ptlrpcd
2043 * threads and the initial write sponsor can run in parallel,
2044 * especially when data checksumming is enabled -- a CPU-bound
2045 * operation that a single ptlrpcd thread cannot process in time.
2046 * So having more ptlrpcd threads share the BRW load
2047 * (with PDL_POLICY_ROUND) seems better.
2048 */
2049 ptlrpcd_add_req(req, pol, -1);
2050 rc = 0;
2051
2052 out:
2053 if (mem_tight != 0)
2054 cfs_memory_pressure_restore(mpflag);
2055
2056 if (crattr != NULL) {
2057 capa_put(crattr->cra_capa);
2058 kfree(crattr);
2059 }
2060
2061 if (rc != 0) {
2062 LASSERT(req == NULL);
2063
2064 if (oa)
2065 OBDO_FREE(oa);
2066 kfree(pga);
2067 /* this should happen rarely and is pretty bad; it makes the
2068 * pending list not follow the dirty order */
2069 while (!list_empty(ext_list)) {
2070 ext = list_entry(ext_list->next, struct osc_extent,
2071 oe_link);
2072 list_del_init(&ext->oe_link);
2073 osc_extent_finish(env, ext, 0, rc);
2074 }
2075 if (clerq && !IS_ERR(clerq))
2076 cl_req_completion(env, clerq, rc);
2077 }
2078 return rc;
2079 }
2080
2081 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2082 struct ldlm_enqueue_info *einfo)
2083 {
2084 void *data = einfo->ei_cbdata;
2085 int set = 0;
2086
2087 LASSERT(lock != NULL);
2088 LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2089 LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2090 LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2091 LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2092
2093 lock_res_and_lock(lock);
2094 spin_lock(&osc_ast_guard);
2095
2096 if (lock->l_ast_data == NULL)
2097 lock->l_ast_data = data;
2098 if (lock->l_ast_data == data)
2099 set = 1;
2100
2101 spin_unlock(&osc_ast_guard);
2102 unlock_res_and_lock(lock);
2103
2104 return set;
2105 }
2106
2107 static int osc_set_data_with_check(struct lustre_handle *lockh,
2108 struct ldlm_enqueue_info *einfo)
2109 {
2110 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2111 int set = 0;
2112
2113 if (lock != NULL) {
2114 set = osc_set_lock_data_with_check(lock, einfo);
2115 LDLM_LOCK_PUT(lock);
2116 } else
2117 CERROR("lockh %p, data %p - client evicted?\n",
2118 lockh, einfo->ei_cbdata);
2119 return set;
2120 }
2121
2122 /* Find any LDLM lock of the inode in OSC.
2123 * Returns: 0 if not found,
2124 * 1 if one is found,
2125 * < 0 on error. */
2126 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2127 ldlm_iterator_t replace, void *data)
2128 {
2129 struct ldlm_res_id res_id;
2130 struct obd_device *obd = class_exp2obd(exp);
2131 int rc = 0;
2132
2133 ostid_build_res_name(&lsm->lsm_oi, &res_id);
2134 rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2135 if (rc == LDLM_ITER_STOP)
2136 return 1;
2137 if (rc == LDLM_ITER_CONTINUE)
2138 return 0;
2139 return rc;
2140 }
2141
2142 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2143 obd_enqueue_update_f upcall, void *cookie,
2144 __u64 *flags, int agl, int rc)
2145 {
2146 int intent = *flags & LDLM_FL_HAS_INTENT;
2147
2148 if (intent) {
2149 /* The request was created before the ldlm_cli_enqueue() call. */
2150 if (rc == ELDLM_LOCK_ABORTED) {
2151 struct ldlm_reply *rep;
2152 rep = req_capsule_server_get(&req->rq_pill,
2153 &RMF_DLM_REP);
2154
2155 LASSERT(rep != NULL);
2156 rep->lock_policy_res1 =
2157 ptlrpc_status_ntoh(rep->lock_policy_res1);
2158 if (rep->lock_policy_res1)
2159 rc = rep->lock_policy_res1;
2160 }
2161 }
2162
2163 if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2164 (rc == 0)) {
2165 *flags |= LDLM_FL_LVB_READY;
2166 CDEBUG(D_INODE, "got kms %llu blocks %llu mtime %llu\n",
2167 lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2168 }
2169
2170 /* Call the update callback. */
2171 rc = (*upcall)(cookie, rc);
2172 return rc;
2173 }
2174
2175 static int osc_enqueue_interpret(const struct lu_env *env,
2176 struct ptlrpc_request *req,
2177 struct osc_enqueue_args *aa, int rc)
2178 {
2179 struct ldlm_lock *lock;
2180 struct lustre_handle handle;
2181 __u32 mode;
2182 struct ost_lvb *lvb;
2183 __u32 lvb_len;
2184 __u64 *flags = aa->oa_flags;
2185
2186 /* Make a local copy of a lock handle and a mode, because aa->oa_*
2187 * might be freed anytime after lock upcall has been called. */
2188 lustre_handle_copy(&handle, aa->oa_lockh);
2189 mode = aa->oa_ei->ei_mode;
2190
2191 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2192 * be valid. */
2193 lock = ldlm_handle2lock(&handle);
2194
2195 /* Take an additional reference so that a blocking AST that
2196 * ldlm_cli_enqueue_fini() might post for a failed lock is guaranteed
2197 * to arrive after an upcall has been executed by
2198 * osc_enqueue_fini(). */
2199 ldlm_lock_addref(&handle, mode);
2200
2201 /* Let the CP AST grant the lock first. */
2202 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2203
2204 if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
2205 lvb = NULL;
2206 lvb_len = 0;
2207 } else {
2208 lvb = aa->oa_lvb;
2209 lvb_len = sizeof(*aa->oa_lvb);
2210 }
2211
2212 /* Complete the lock-obtaining procedure. */
2213 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2214 mode, flags, lvb, lvb_len, &handle, rc);
2215 /* Complete osc stuff. */
2216 rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
2217 flags, aa->oa_agl, rc);
2218
2219 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2220
2221 /* Release the lock for async request. */
2222 if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2223 /*
2224 * Releases a reference taken by ldlm_cli_enqueue(), if it is
2225 * not already released by
2226 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2227 */
2228 ldlm_lock_decref(&handle, mode);
2229
2230 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2231 aa->oa_lockh, req, aa);
2232 ldlm_lock_decref(&handle, mode);
2233 LDLM_LOCK_PUT(lock);
2234 return rc;
2235 }
2236
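/* Magic sentinel: callers pass PTLRPCD_SET instead of a real request set
 * to ask that the request be queued to a ptlrpcd daemon thread rather
 * than added to a caller-managed set (see the rqset checks below). */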
2237 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2238
2239 /* When enqueuing asynchronously, locks are not ordered: we can obtain a lock
2240 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2241 * other synchronous requests; however, holding some locks while trying to
2242 * obtain others may take a considerable amount of time in the case of OST
2243 * failure, and when other sync requests cannot get a lock released by a
2244 * client, that client is excluded from the cluster -- such scenarios make
2245 * life difficult, so release locks just after they are obtained. */
2246 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2247 __u64 *flags, ldlm_policy_data_t *policy,
2248 struct ost_lvb *lvb, int kms_valid,
2249 obd_enqueue_update_f upcall, void *cookie,
2250 struct ldlm_enqueue_info *einfo,
2251 struct lustre_handle *lockh,
2252 struct ptlrpc_request_set *rqset, int async, int agl)
2253 {
2254 struct obd_device *obd = exp->exp_obd;
2255 struct ptlrpc_request *req = NULL;
2256 int intent = *flags & LDLM_FL_HAS_INTENT;
2257 __u64 match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
2258 ldlm_mode_t mode;
2259 int rc;
2260
2261 /* Filesystem lock extents are extended to page boundaries so that
2262 * dealing with the page cache is a little smoother. */
2263 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2264 policy->l_extent.end |= ~CFS_PAGE_MASK;
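/* Worked example, assuming 4 KiB pages (CFS_PAGE_MASK == ~0xFFFULL):
 * start 0x1234 -> 0x1234 - (0x1234 & 0xFFF) = 0x1000 (page-aligned down)
 * end   0x5678 -> 0x5678 | 0xFFF            = 0x5FFF (last byte of page) */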
2265
2266 /*
2267 * kms is not valid when either object is completely fresh (so that no
2268 * locks are cached), or object was evicted. In the latter case cached
2269 * lock cannot be used, because it would prime inode state with
2270 * potentially stale LVB.
2271 */
2272 if (!kms_valid)
2273 goto no_match;
2274
2275 /* Next, search for already existing extent locks that will cover us */
2276 /* If we're trying to read, we also search for an existing PW lock. The
2277 * VFS and page cache already protect us locally, so lots of readers/
2278 * writers can share a single PW lock.
2279 *
2280 * There are problems with conversion deadlocks, so instead of
2281 * converting a read lock to a write lock, we'll just enqueue a new
2282 * one.
2283 *
2284 * At some point we should cancel the read lock instead of making them
2285 * send us a blocking callback, but there are problems with canceling
2286 * locks out from other users right now, too. */
2287 mode = einfo->ei_mode;
2288 if (einfo->ei_mode == LCK_PR)
2289 mode |= LCK_PW;
2290 mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2291 einfo->ei_type, policy, mode, lockh, 0);
2292 if (mode) {
2293 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
2294
2295 if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
2296 /* For AGL, if the enqueue RPC was sent but the lock was not
2297 * granted, skip processing this stripe.
2298 * Return -ECANCELED to tell the caller. */
2299 ldlm_lock_decref(lockh, mode);
2300 LDLM_LOCK_PUT(matched);
2301 return -ECANCELED;
2302 }
2303
2304 if (osc_set_lock_data_with_check(matched, einfo)) {
2305 *flags |= LDLM_FL_LVB_READY;
2306 /* addref the lock only for non-async requests and when a
2307 * PW lock is matched whereas we asked for PR. */
2308 if (!rqset && einfo->ei_mode != mode)
2309 ldlm_lock_addref(lockh, LCK_PR);
2310 if (intent) {
2311 /* I would like to be able to ASSERT here that
2312 * rss <= kms, but I can't, for reasons which
2313 * are explained in lov_enqueue() */
2314 }
2315
2316 /* We already have a lock, and it's referenced.
2317 *
2318 * At this point, the cl_lock::cll_state is CLS_QUEUING;
2319 * the AGL upcall may change it to CLS_HELD directly. */
2320 (*upcall)(cookie, ELDLM_OK);
2321
2322 if (einfo->ei_mode != mode)
2323 ldlm_lock_decref(lockh, LCK_PW);
2324 else if (rqset)
2325 /* For async requests, decref the lock. */
2326 ldlm_lock_decref(lockh, einfo->ei_mode);
2327 LDLM_LOCK_PUT(matched);
2328 return ELDLM_OK;
2329 }
2330
2331 ldlm_lock_decref(lockh, mode);
2332 LDLM_LOCK_PUT(matched);
2333 }
2334
2335 no_match:
2336 if (intent) {
2337 LIST_HEAD(cancels);
2338 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2339 &RQF_LDLM_ENQUEUE_LVB);
2340 if (req == NULL)
2341 return -ENOMEM;
2342
2343 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
2344 if (rc) {
2345 ptlrpc_request_free(req);
2346 return rc;
2347 }
2348
2349 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2350 sizeof(*lvb));
2351 ptlrpc_request_set_replen(req);
2352 }
2353
2354 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2355 *flags &= ~LDLM_FL_BLOCK_GRANTED;
2356
2357 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2358 sizeof(*lvb), LVB_T_OST, lockh, async);
2359 if (rqset) {
2360 if (!rc) {
2361 struct osc_enqueue_args *aa;
2362 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2363 aa = ptlrpc_req_async_args(req);
2364 aa->oa_ei = einfo;
2365 aa->oa_exp = exp;
2366 aa->oa_flags = flags;
2367 aa->oa_upcall = upcall;
2368 aa->oa_cookie = cookie;
2369 aa->oa_lvb = lvb;
2370 aa->oa_lockh = lockh;
2371 aa->oa_agl = !!agl;
2372
2373 req->rq_interpret_reply =
2374 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2375 if (rqset == PTLRPCD_SET)
2376 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2377 else
2378 ptlrpc_set_add_req(rqset, req);
2379 } else if (intent) {
2380 ptlrpc_req_finished(req);
2381 }
2382 return rc;
2383 }
2384
2385 rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
2386 if (intent)
2387 ptlrpc_req_finished(req);
2388
2389 return rc;
2390 }
2391
2392 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2393 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2394 __u64 *flags, void *data, struct lustre_handle *lockh,
2395 int unref)
2396 {
2397 struct obd_device *obd = exp->exp_obd;
2398 __u64 lflags = *flags;
2399 ldlm_mode_t rc;
2400
2401 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2402 return -EIO;
2403
2404 /* Filesystem lock extents are extended to page boundaries so that
2405 * dealing with the page cache is a little smoother */
2406 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2407 policy->l_extent.end |= ~CFS_PAGE_MASK;
2408
2409 /* Next, search for already existing extent locks that will cover us */
2410 /* If we're trying to read, we also search for an existing PW lock. The
2411 * VFS and page cache already protect us locally, so lots of readers/
2412 * writers can share a single PW lock. */
2413 rc = mode;
2414 if (mode == LCK_PR)
2415 rc |= LCK_PW;
2416 rc = ldlm_lock_match(obd->obd_namespace, lflags,
2417 res_id, type, policy, rc, lockh, unref);
2418 if (rc) {
2419 if (data != NULL) {
2420 if (!osc_set_data_with_check(lockh, data)) {
2421 if (!(lflags & LDLM_FL_TEST_LOCK))
2422 ldlm_lock_decref(lockh, rc);
2423 return 0;
2424 }
2425 }
2426 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2427 ldlm_lock_addref(lockh, LCK_PR);
2428 ldlm_lock_decref(lockh, LCK_PW);
2429 }
2430 return rc;
2431 }
2432 return rc;
2433 }
2434
2435 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2436 {
2437 if (unlikely(mode == LCK_GROUP))
2438 ldlm_lock_decref_and_cancel(lockh, mode);
2439 else
2440 ldlm_lock_decref(lockh, mode);
2441
2442 return 0;
2443 }
2444
2445 static int osc_statfs_interpret(const struct lu_env *env,
2446 struct ptlrpc_request *req,
2447 struct osc_async_args *aa, int rc)
2448 {
2449 struct obd_statfs *msfs;
2450
2451 if (rc == -EBADR)
2452 /* The request has in fact never been sent
2453 * due to issues at a higher level (LOV).
2454 * Exit immediately since the caller is
2455 * aware of the problem and takes care
2456 * of the cleanup. */
2457 return rc;
2458
2459 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2460 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY)) {
2461 rc = 0;
2462 goto out;
2463 }
2464
2465 if (rc != 0)
2466 goto out;
2467
2468 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2469 if (msfs == NULL) {
2470 rc = -EPROTO;
2471 goto out;
2472 }
2473
2474 *aa->aa_oi->oi_osfs = *msfs;
2475 out:
2476 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2477 return rc;
2478 }
2479
2480 static int osc_statfs_async(struct obd_export *exp,
2481 struct obd_info *oinfo, __u64 max_age,
2482 struct ptlrpc_request_set *rqset)
2483 {
2484 struct obd_device *obd = class_exp2obd(exp);
2485 struct ptlrpc_request *req;
2486 struct osc_async_args *aa;
2487 int rc;
2488
2489 /* We could possibly pass max_age in the request (as an absolute
2490 * timestamp or a "seconds.usec ago") so the target can avoid doing
2491 * extra calls into the filesystem if that isn't necessary (e.g.
2492 * during mount that would help a bit). Having relative timestamps
2493 * is not so great if request processing is slow, while absolute
2494 * timestamps are not ideal because they need time synchronization. */
2495 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2496 if (req == NULL)
2497 return -ENOMEM;
2498
2499 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2500 if (rc) {
2501 ptlrpc_request_free(req);
2502 return rc;
2503 }
2504 ptlrpc_request_set_replen(req);
2505 req->rq_request_portal = OST_CREATE_PORTAL;
2506 ptlrpc_at_set_req_timeout(req);
2507
2508 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2509 /* procfs requests should not wait for stats, to avoid a deadlock */
2510 req->rq_no_resend = 1;
2511 req->rq_no_delay = 1;
2512 }
2513
2514 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2515 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2516 aa = ptlrpc_req_async_args(req);
2517 aa->aa_oi = oinfo;
2518
2519 ptlrpc_set_add_req(rqset, req);
2520 return 0;
2521 }
2522
2523 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2524 struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2525 {
2526 struct obd_device *obd = class_exp2obd(exp);
2527 struct obd_statfs *msfs;
2528 struct ptlrpc_request *req;
2529 struct obd_import *imp = NULL;
2530 int rc;
2531
2532 /* Since the request might also come from lprocfs, we need to
2533 * sync this with client_disconnect_export(). Bug 15684. */
2534 down_read(&obd->u.cli.cl_sem);
2535 if (obd->u.cli.cl_import)
2536 imp = class_import_get(obd->u.cli.cl_import);
2537 up_read(&obd->u.cli.cl_sem);
2538 if (!imp)
2539 return -ENODEV;
2540
2541 /* We could possibly pass max_age in the request (as an absolute
2542 * timestamp or a "seconds.usec ago") so the target can avoid doing
2543 * extra calls into the filesystem if that isn't necessary (e.g.
2544 * during mount that would help a bit). Having relative timestamps
2545 * is not so great if request processing is slow, while absolute
2546 * timestamps are not ideal because they need time synchronization. */
2547 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2548
2549 class_import_put(imp);
2550
2551 if (req == NULL)
2552 return -ENOMEM;
2553
2554 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2555 if (rc) {
2556 ptlrpc_request_free(req);
2557 return rc;
2558 }
2559 ptlrpc_request_set_replen(req);
2560 req->rq_request_portal = OST_CREATE_PORTAL;
2561 ptlrpc_at_set_req_timeout(req);
2562
2563 if (flags & OBD_STATFS_NODELAY) {
2564 /* procfs requests should not wait for stats, to avoid a deadlock */
2565 req->rq_no_resend = 1;
2566 req->rq_no_delay = 1;
2567 }
2568
2569 rc = ptlrpc_queue_wait(req);
2570 if (rc)
2571 goto out;
2572
2573 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2574 if (msfs == NULL) {
2575 rc = -EPROTO;
2576 goto out;
2577 }
2578
2579 *osfs = *msfs;
2580
2581 out:
2582 ptlrpc_req_finished(req);
2583 return rc;
2584 }
2585
2586 /* Retrieve object striping information.
2587 *
2588 * @lump is a pointer to an in-core struct with lmm_ost_count indicating
2589 * the maximum number of OST indices which will fit in the user buffer.
2590 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
2591 */
2592 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
2593 {
2594 /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
2595 struct lov_user_md_v3 lum, *lumk;
2596 struct lov_user_ost_data_v1 *lmm_objects;
2597 int rc = 0, lum_size;
2598
2599 if (!lsm)
2600 return -ENODATA;
2601
2602 /* we only need the header part from user space to get lmm_magic and
2603 * lmm_stripe_count (the header part is common to v1 and v3) */
2604 lum_size = sizeof(struct lov_user_md_v1);
2605 if (copy_from_user(&lum, lump, lum_size))
2606 return -EFAULT;
2607
2608 if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
2609 (lum.lmm_magic != LOV_USER_MAGIC_V3))
2610 return -EINVAL;
2611
2612 /* lov_user_md_vX and lov_mds_md_vX must have the same size */
2613 LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
2614 LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
2615 LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
2616
2617 /* we can use lov_mds_md_size() to compute lum_size
2618 * because lov_user_md_vX and lov_mds_md_vX have the same size */
2619 if (lum.lmm_stripe_count > 0) {
2620 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
2621 lumk = kzalloc(lum_size, GFP_NOFS);
2622 if (!lumk)
2623 return -ENOMEM;
2624
2625 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
2626 lmm_objects =
2627 &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
2628 else
2629 lmm_objects = &(lumk->lmm_objects[0]);
2630 lmm_objects->l_ost_oi = lsm->lsm_oi;
2631 } else {
2632 lum_size = lov_mds_md_size(0, lum.lmm_magic);
2633 lumk = &lum;
2634 }
2635
2636 lumk->lmm_oi = lsm->lsm_oi;
2637 lumk->lmm_stripe_count = 1;
2638
2639 if (copy_to_user(lump, lumk, lum_size))
2640 rc = -EFAULT;
2641
2642 if (lumk != &lum)
2643 kfree(lumk);
2644
2645 return rc;
2646 }
2647
2648
2649 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2650 void *karg, void *uarg)
2651 {
2652 struct obd_device *obd = exp->exp_obd;
2653 struct obd_ioctl_data *data = karg;
2654 int err = 0;
2655
2656 if (!try_module_get(THIS_MODULE)) {
2657 CERROR("Can't get module. Is it alive?\n");
2658 return -EINVAL;
2659 }
2660 switch (cmd) {
2661 case OBD_IOC_LOV_GET_CONFIG: {
2662 char *buf;
2663 struct lov_desc *desc;
2664 struct obd_uuid uuid;
2665
2666 buf = NULL;
2667 len = 0;
2668 if (obd_ioctl_getdata(&buf, &len, uarg)) {
2669 err = -EINVAL;
2670 goto out;
2671 }
2672
2673 data = (struct obd_ioctl_data *)buf;
2674
2675 if (sizeof(*desc) > data->ioc_inllen1) {
2676 obd_ioctl_freedata(buf, len);
2677 err = -EINVAL;
2678 goto out;
2679 }
2680
2681 if (data->ioc_inllen2 < sizeof(uuid)) {
2682 obd_ioctl_freedata(buf, len);
2683 err = -EINVAL;
2684 goto out;
2685 }
2686
2687 desc = (struct lov_desc *)data->ioc_inlbuf1;
2688 desc->ld_tgt_count = 1;
2689 desc->ld_active_tgt_count = 1;
2690 desc->ld_default_stripe_count = 1;
2691 desc->ld_default_stripe_size = 0;
2692 desc->ld_default_stripe_offset = 0;
2693 desc->ld_pattern = 0;
2694 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
2695
2696 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
2697
2698 err = copy_to_user(uarg, buf, len);
2699 if (err)
2700 err = -EFAULT;
2701 obd_ioctl_freedata(buf, len);
2702 goto out;
2703 }
2704 case LL_IOC_LOV_SETSTRIPE:
2705 err = obd_alloc_memmd(exp, karg);
2706 if (err > 0)
2707 err = 0;
2708 goto out;
2709 case LL_IOC_LOV_GETSTRIPE:
2710 err = osc_getstripe(karg, uarg);
2711 goto out;
2712 case OBD_IOC_CLIENT_RECOVER:
2713 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2714 data->ioc_inlbuf1, 0);
2715 if (err > 0)
2716 err = 0;
2717 goto out;
2718 case IOC_OSC_SET_ACTIVE:
2719 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2720 data->ioc_offset);
2721 goto out;
2722 case OBD_IOC_POLL_QUOTACHECK:
2723 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2724 goto out;
2725 case OBD_IOC_PING_TARGET:
2726 err = ptlrpc_obd_ping(obd);
2727 goto out;
2728 default:
2729 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2730 cmd, current_comm());
2731 err = -ENOTTY;
2732 goto out;
2733 }
2734 out:
2735 module_put(THIS_MODULE);
2736 return err;
2737 }
2738
2739 static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
2740 u32 keylen, void *key, __u32 *vallen, void *val,
2741 struct lov_stripe_md *lsm)
2742 {
2743 if (!vallen || !val)
2744 return -EFAULT;
2745
2746 if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
2747 __u32 *stripe = val;
2748 *vallen = sizeof(*stripe);
2749 *stripe = 0;
2750 return 0;
2751 } else if (KEY_IS(KEY_LAST_ID)) {
2752 struct ptlrpc_request *req;
2753 u64 *reply;
2754 char *tmp;
2755 int rc;
2756
2757 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2758 &RQF_OST_GET_INFO_LAST_ID);
2759 if (req == NULL)
2760 return -ENOMEM;
2761
2762 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2763 RCL_CLIENT, keylen);
2764 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
2765 if (rc) {
2766 ptlrpc_request_free(req);
2767 return rc;
2768 }
2769
2770 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2771 memcpy(tmp, key, keylen);
2772
2773 req->rq_no_delay = req->rq_no_resend = 1;
2774 ptlrpc_request_set_replen(req);
2775 rc = ptlrpc_queue_wait(req);
2776 if (rc)
2777 goto out;
2778
2779 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
2780 if (reply == NULL) {
2781 rc = -EPROTO;
2782 goto out;
2783 }
2784
2785 *((u64 *)val) = *reply;
2786 out:
2787 ptlrpc_req_finished(req);
2788 return rc;
2789 } else if (KEY_IS(KEY_FIEMAP)) {
2790 struct ll_fiemap_info_key *fm_key =
2791 (struct ll_fiemap_info_key *)key;
2792 struct ldlm_res_id res_id;
2793 ldlm_policy_data_t policy;
2794 struct lustre_handle lockh;
2795 ldlm_mode_t mode = 0;
2796 struct ptlrpc_request *req;
2797 struct ll_user_fiemap *reply;
2798 char *tmp;
2799 int rc;
2800
2801 if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
2802 goto skip_locking;
2803
2804 policy.l_extent.start = fm_key->fiemap.fm_start &
2805 CFS_PAGE_MASK;
2806
2807 if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
2808 fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
2809 policy.l_extent.end = OBD_OBJECT_EOF;
2810 else
2811 policy.l_extent.end = (fm_key->fiemap.fm_start +
2812 fm_key->fiemap.fm_length +
2813 PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK;
2814
2815 ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
2816 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
2817 LDLM_FL_BLOCK_GRANTED |
2818 LDLM_FL_LVB_READY,
2819 &res_id, LDLM_EXTENT, &policy,
2820 LCK_PR | LCK_PW, &lockh, 0);
2821 if (mode) { /* lock is cached on client */
2822 if (mode != LCK_PR) {
2823 ldlm_lock_addref(&lockh, LCK_PR);
2824 ldlm_lock_decref(&lockh, LCK_PW);
2825 }
2826 } else { /* no cached lock, need to acquire the lock on the server side */
2827 fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
2828 fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
2829 }
2830
2831 skip_locking:
2832 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2833 &RQF_OST_GET_INFO_FIEMAP);
2834 if (req == NULL) {
2835 rc = -ENOMEM;
2836 goto drop_lock;
2837 }
2838
2839 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
2840 RCL_CLIENT, keylen);
2841 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
2842 RCL_CLIENT, *vallen);
2843 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
2844 RCL_SERVER, *vallen);
2845
2846 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
2847 if (rc) {
2848 ptlrpc_request_free(req);
2849 goto drop_lock;
2850 }
2851
2852 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
2853 memcpy(tmp, key, keylen);
2854 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
2855 memcpy(tmp, val, *vallen);
2856
2857 ptlrpc_request_set_replen(req);
2858 rc = ptlrpc_queue_wait(req);
2859 if (rc)
2860 goto fini_req;
2861
2862 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
2863 if (reply == NULL) {
2864 rc = -EPROTO;
2865 goto fini_req;
2866 }
2867
2868 memcpy(val, reply, *vallen);
2869 fini_req:
2870 ptlrpc_req_finished(req);
2871 drop_lock:
2872 if (mode)
2873 ldlm_lock_decref(&lockh, LCK_PR);
2874 return rc;
2875 }
2876
2877 return -EINVAL;
2878 }
2879
2880 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2881 u32 keylen, void *key, u32 vallen,
2882 void *val, struct ptlrpc_request_set *set)
2883 {
2884 struct ptlrpc_request *req;
2885 struct obd_device *obd = exp->exp_obd;
2886 struct obd_import *imp = class_exp2cliimp(exp);
2887 char *tmp;
2888 int rc;
2889
2890 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2891
2892 if (KEY_IS(KEY_CHECKSUM)) {
2893 if (vallen != sizeof(int))
2894 return -EINVAL;
2895 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2896 return 0;
2897 }
2898
2899 if (KEY_IS(KEY_SPTLRPC_CONF)) {
2900 sptlrpc_conf_client_adapt(obd);
2901 return 0;
2902 }
2903
2904 if (KEY_IS(KEY_FLUSH_CTX)) {
2905 sptlrpc_import_flush_my_ctx(imp);
2906 return 0;
2907 }
2908
2909 if (KEY_IS(KEY_CACHE_SET)) {
2910 struct client_obd *cli = &obd->u.cli;
2911
2912 LASSERT(cli->cl_cache == NULL); /* only once */
2913 cli->cl_cache = (struct cl_client_cache *)val;
2914 atomic_inc(&cli->cl_cache->ccc_users);
2915 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2916
2917 /* add this osc into entity list */
2918 LASSERT(list_empty(&cli->cl_lru_osc));
2919 spin_lock(&cli->cl_cache->ccc_lru_lock);
2920 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2921 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2922
2923 return 0;
2924 }
2925
2926 if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2927 struct client_obd *cli = &obd->u.cli;
2928 int nr = atomic_read(&cli->cl_lru_in_list) >> 1;
2929 int target = *(int *)val;
2930
2931 nr = osc_lru_shrink(cli, min(nr, target));
2932 *(int *)val -= nr;
2933 return 0;
2934 }
2935
2936 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2937 return -EINVAL;
2938
2939 /* We pass all other commands directly to OST. Since nobody calls OSC
2940 * methods directly and everybody is supposed to go through LOV, we
2941 * assume LOV checked invalid values for us.
2942 * The only recognised values so far are evict_by_nid and mds_conn.
2943 * Even if something bad goes through, we'd get a -EINVAL from OST
2944 * anyway. */
2945
2946 req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2947 &RQF_OST_SET_GRANT_INFO :
2948 &RQF_OBD_SET_INFO);
2949 if (req == NULL)
2950 return -ENOMEM;
2951
2952 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2953 RCL_CLIENT, keylen);
2954 if (!KEY_IS(KEY_GRANT_SHRINK))
2955 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2956 RCL_CLIENT, vallen);
2957 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2958 if (rc) {
2959 ptlrpc_request_free(req);
2960 return rc;
2961 }
2962
2963 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2964 memcpy(tmp, key, keylen);
2965 tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2966 &RMF_OST_BODY :
2967 &RMF_SETINFO_VAL);
2968 memcpy(tmp, val, vallen);
2969
2970 if (KEY_IS(KEY_GRANT_SHRINK)) {
2971 struct osc_brw_async_args *aa;
2972 struct obdo *oa;
2973
2974 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2975 aa = ptlrpc_req_async_args(req);
2976 OBDO_ALLOC(oa);
2977 if (!oa) {
2978 ptlrpc_req_finished(req);
2979 return -ENOMEM;
2980 }
2981 *oa = ((struct ost_body *)val)->oa;
2982 aa->aa_oa = oa;
2983 req->rq_interpret_reply = osc_shrink_grant_interpret;
2984 }
2985
2986 ptlrpc_request_set_replen(req);
2987 if (!KEY_IS(KEY_GRANT_SHRINK)) {
2988 LASSERT(set != NULL);
2989 ptlrpc_set_add_req(set, req);
2990 ptlrpc_check_set(NULL, set);
2991 } else
2992 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2993
2994 return 0;
2995 }
2996
2997 static int osc_reconnect(const struct lu_env *env,
2998 struct obd_export *exp, struct obd_device *obd,
2999 struct obd_uuid *cluuid,
3000 struct obd_connect_data *data,
3001 void *localdata)
3002 {
3003 struct client_obd *cli = &obd->u.cli;
3004
3005 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3006 long lost_grant;
3007
3008 client_obd_list_lock(&cli->cl_loi_list_lock);
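/* GNU "?:" shorthand: re-request our previous grant (avail + dirty)
 * if it is non-zero, otherwise ask for two full BRW RPCs worth. */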
3009 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
3010 2 * cli_brw_size(obd);
3011 lost_grant = cli->cl_lost_grant;
3012 cli->cl_lost_grant = 0;
3013 client_obd_list_unlock(&cli->cl_loi_list_lock);
3014
3015 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d ocd_grant: %d, lost: %ld.\n",
3016 data->ocd_connect_flags,
3017 data->ocd_version, data->ocd_grant, lost_grant);
3018 }
3019
3020 return 0;
3021 }
3022
3023 static int osc_disconnect(struct obd_export *exp)
3024 {
3025 struct obd_device *obd = class_exp2obd(exp);
3026 int rc;
3027
3028 rc = client_disconnect_export(exp);
3029 /**
3030 * Initially we put del_shrink_grant before disconnect_export, but it
3031 * causes the following problem if setup (connect) and cleanup
3032 * (disconnect) are tangled together.
3033 * connect p1 disconnect p2
3034 * ptlrpc_connect_import
3035 * ............... class_manual_cleanup
3036 * osc_disconnect
3037 * del_shrink_grant
3038 * ptlrpc_connect_interrupt
3039 * init_grant_shrink
3040 * add this client to shrink list
3041 * cleanup_osc
3042 * Bang! The pinger triggers the shrink.
3043 * So the OSC should be disconnected from the shrink list after we
3044 * are sure the import has been destroyed. BUG18662
3045 */
3046 if (obd->u.cli.cl_import == NULL)
3047 osc_del_shrink_grant(&obd->u.cli);
3048 return rc;
3049 }
3050
3051 static int osc_import_event(struct obd_device *obd,
3052 struct obd_import *imp,
3053 enum obd_import_event event)
3054 {
3055 struct client_obd *cli;
3056 int rc = 0;
3057
3058 LASSERT(imp->imp_obd == obd);
3059
3060 switch (event) {
3061 case IMP_EVENT_DISCON: {
3062 cli = &obd->u.cli;
3063 client_obd_list_lock(&cli->cl_loi_list_lock);
3064 cli->cl_avail_grant = 0;
3065 cli->cl_lost_grant = 0;
3066 client_obd_list_unlock(&cli->cl_loi_list_lock);
3067 break;
3068 }
3069 case IMP_EVENT_INACTIVE: {
3070 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3071 break;
3072 }
3073 case IMP_EVENT_INVALIDATE: {
3074 struct ldlm_namespace *ns = obd->obd_namespace;
3075 struct lu_env *env;
3076 int refcheck;
3077
3078 env = cl_env_get(&refcheck);
3079 if (!IS_ERR(env)) {
3080 /* Reset grants */
3081 cli = &obd->u.cli;
3082 /* all pages go to failing rpcs due to the invalid
3083 * import */
3084 osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
3085
3086 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3087 cl_env_put(env, &refcheck);
3088 } else
3089 rc = PTR_ERR(env);
3090 break;
3091 }
3092 case IMP_EVENT_ACTIVE: {
3093 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3094 break;
3095 }
3096 case IMP_EVENT_OCD: {
3097 struct obd_connect_data *ocd = &imp->imp_connect_data;
3098
3099 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3100 osc_init_grant(&obd->u.cli, ocd);
3101
3102 /* See bug 7198 */
3103 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3104 imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
3105
3106 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3107 break;
3108 }
3109 case IMP_EVENT_DEACTIVATE: {
3110 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
3111 break;
3112 }
3113 case IMP_EVENT_ACTIVATE: {
3114 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
3115 break;
3116 }
3117 default:
3118 CERROR("Unknown import event %d\n", event);
3119 LBUG();
3120 }
3121 return rc;
3122 }
3123
3124 /**
3125 * Determine whether the lock can be canceled before replaying the lock
3126 * during recovery, see bug16774 for detailed information.
3127 *
3128 * \retval zero the lock can't be canceled
3129 * \retval other ok to cancel
3130 */
3131 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
3132 {
3133 check_res_locked(lock->l_resource);
3134
3135 /*
3136 * Cancel any unused extent lock granted in mode LCK_PR or LCK_CR.
3137 *
3138 * XXX as a future improvement, we could also cancel unused write locks
3139 * if they have no dirty data and no active mmaps.
3140 */
3141 if (lock->l_resource->lr_type == LDLM_EXTENT &&
3142 (lock->l_granted_mode == LCK_PR ||
3143 lock->l_granted_mode == LCK_CR) &&
3144 (osc_dlm_lock_pageref(lock) == 0))
3145 return 1;
3146
3147 return 0;
3148 }
3149
3150 static int brw_queue_work(const struct lu_env *env, void *data)
3151 {
3152 struct client_obd *cli = data;
3153
3154 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3155
3156 osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
3157 return 0;
3158 }
3159
3160 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3161 {
3162 struct lprocfs_static_vars lvars = { NULL };
3163 struct client_obd *cli = &obd->u.cli;
3164 void *handler;
3165 int rc;
3166
3167 rc = ptlrpcd_addref();
3168 if (rc)
3169 return rc;
3170
3171 rc = client_obd_setup(obd, lcfg);
3172 if (rc)
3173 goto out_ptlrpcd;
3174
3175 handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3176 if (IS_ERR(handler)) {
3177 rc = PTR_ERR(handler);
3178 goto out_client_setup;
3179 }
3180 cli->cl_writeback_work = handler;
3181
3182 rc = osc_quota_setup(obd);
3183 if (rc)
3184 goto out_ptlrpcd_work;
3185
3186 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3187 lprocfs_osc_init_vars(&lvars);
3188 if (lprocfs_obd_setup(obd, lvars.obd_vars, lvars.sysfs_vars) == 0) {
3189 lproc_osc_attach_seqstat(obd);
3190 sptlrpc_lprocfs_cliobd_attach(obd);
3191 ptlrpc_lprocfs_register_obd(obd);
3192 }
3193
3194 /* We need to allocate a few more requests, because
3195 * brw_interpret tries to create new requests before freeing
3196 * previous ones. Ideally we would want 2x max_rpcs_in_flight
3197 * reserved, but I'm afraid that might waste too much RAM
3198 * in fact, so 2 is just my guess and should still work. */
3199 cli->cl_import->imp_rq_pool =
3200 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3201 OST_MAXREQSIZE,
3202 ptlrpc_add_rqs_to_pool);
3203
3204 INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
3205 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
3206 return rc;
3207
3208 out_ptlrpcd_work:
3209 ptlrpcd_destroy_work(handler);
3210 out_client_setup:
3211 client_obd_cleanup(obd);
3212 out_ptlrpcd:
3213 ptlrpcd_decref();
3214 return rc;
3215 }
3216
3217 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3218 {
3219 switch (stage) {
3220 case OBD_CLEANUP_EARLY: {
3221 struct obd_import *imp;
3222 imp = obd->u.cli.cl_import;
3223 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3224 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3225 ptlrpc_deactivate_import(imp);
3226 spin_lock(&imp->imp_lock);
3227 imp->imp_pingable = 0;
3228 spin_unlock(&imp->imp_lock);
3229 break;
3230 }
3231 case OBD_CLEANUP_EXPORTS: {
3232 struct client_obd *cli = &obd->u.cli;
3233 /* LU-464
3234 * for echo client, export may be on zombie list, wait for
3235 * zombie thread to cull it, because cli.cl_import will be
3236 * cleared in client_disconnect_export():
3237 * class_export_destroy() -> obd_cleanup() ->
3238 * echo_device_free() -> echo_client_cleanup() ->
3239 * obd_disconnect() -> osc_disconnect() ->
3240 * client_disconnect_export()
3241 */
3242 obd_zombie_barrier();
3243 if (cli->cl_writeback_work) {
3244 ptlrpcd_destroy_work(cli->cl_writeback_work);
3245 cli->cl_writeback_work = NULL;
3246 }
3247 obd_cleanup_client_import(obd);
3248 ptlrpc_lprocfs_unregister_obd(obd);
3249 lprocfs_obd_cleanup(obd);
3250 break;
3251 }
3252 }
3253 return 0;
3254 }
3255
3256 int osc_cleanup(struct obd_device *obd)
3257 {
3258 struct client_obd *cli = &obd->u.cli;
3259 int rc;
3260
3261 /* lru cleanup */
3262 if (cli->cl_cache != NULL) {
3263 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3264 spin_lock(&cli->cl_cache->ccc_lru_lock);
3265 list_del_init(&cli->cl_lru_osc);
3266 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3267 cli->cl_lru_left = NULL;
3268 atomic_dec(&cli->cl_cache->ccc_users);
3269 cli->cl_cache = NULL;
3270 }
3271
3272 /* free memory of osc quota cache */
3273 osc_quota_cleanup(obd);
3274
3275 rc = client_obd_cleanup(obd);
3276
3277 ptlrpcd_decref();
3278 return rc;
3279 }
3280
3281 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3282 {
3283 struct lprocfs_static_vars lvars = { NULL };
3284 int rc = 0;
3285
3286 lprocfs_osc_init_vars(&lvars);
3287
3288 switch (lcfg->lcfg_command) {
3289 default:
3290 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
3291 lcfg, obd);
3292 if (rc > 0)
3293 rc = 0;
3294 break;
3295 }
3296
3297 return rc;
3298 }
3299
3300 static int osc_process_config(struct obd_device *obd, u32 len, void *buf)
3301 {
3302 return osc_process_config_base(obd, buf);
3303 }
3304
3305 struct obd_ops osc_obd_ops = {
3306 .o_owner = THIS_MODULE,
3307 .o_setup = osc_setup,
3308 .o_precleanup = osc_precleanup,
3309 .o_cleanup = osc_cleanup,
3310 .o_add_conn = client_import_add_conn,
3311 .o_del_conn = client_import_del_conn,
3312 .o_connect = client_connect_import,
3313 .o_reconnect = osc_reconnect,
3314 .o_disconnect = osc_disconnect,
3315 .o_statfs = osc_statfs,
3316 .o_statfs_async = osc_statfs_async,
3317 .o_packmd = osc_packmd,
3318 .o_unpackmd = osc_unpackmd,
3319 .o_create = osc_create,
3320 .o_destroy = osc_destroy,
3321 .o_getattr = osc_getattr,
3322 .o_getattr_async = osc_getattr_async,
3323 .o_setattr = osc_setattr,
3324 .o_setattr_async = osc_setattr_async,
3325 .o_find_cbdata = osc_find_cbdata,
3326 .o_iocontrol = osc_iocontrol,
3327 .o_get_info = osc_get_info,
3328 .o_set_info_async = osc_set_info_async,
3329 .o_import_event = osc_import_event,
3330 .o_process_config = osc_process_config,
3331 .o_quotactl = osc_quotactl,
3332 .o_quotacheck = osc_quotacheck,
3333 };
3334
3335 extern struct lu_kmem_descr osc_caches[];
3336 extern spinlock_t osc_ast_guard;
3337 extern struct lock_class_key osc_ast_guard_class;
3338
3339 static int __init osc_init(void)
3340 {
3341 struct lprocfs_static_vars lvars = { NULL };
3342 int rc;
3343
3344 /* Print the address of _any_ initialized kernel symbol from this
3345 * module, to allow debugging with a gdb that doesn't support data
3346 * symbols from modules. */
3347 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3348
3349 rc = lu_kmem_init(osc_caches);
3350 if (rc)
3351 return rc;
3352
3353 lprocfs_osc_init_vars(&lvars);
3354
3355 rc = class_register_type(&osc_obd_ops, NULL,
3356 LUSTRE_OSC_NAME, &osc_device_type);
3357 if (rc) {
3358 lu_kmem_fini(osc_caches);
3359 return rc;
3360 }
3361
3362 spin_lock_init(&osc_ast_guard);
3363 lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
3364
3365 return rc;
3366 }
3367
3368 static void /*__exit*/ osc_exit(void)
3369 {
3370 class_unregister_type(LUSTRE_OSC_NAME);
3371 lu_kmem_fini(osc_caches);
3372 }
3373
3374 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
3375 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3376 MODULE_LICENSE("GPL");
3377 MODULE_VERSION(LUSTRE_VERSION_STRING);
3378
3379 module_init(osc_init);
3380 module_exit(osc_exit);