]> git.proxmox.com Git - mirror_ubuntu-artful-kernel.git/blob - drivers/staging/lustre/lustre/mdc/mdc_locks.c
Merge remote-tracking branches 'asoc/topic/rt5514', 'asoc/topic/rt5640', 'asoc/topic...
[mirror_ubuntu-artful-kernel.git] / drivers / staging / lustre / lustre / mdc / mdc_locks.c
1 /*
2 * GPL HEADER START
3 *
4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
19 *
20 * GPL HEADER END
21 */
22 /*
23 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
25 *
26 * Copyright (c) 2011, 2015, Intel Corporation.
27 */
28 /*
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
31 */
32
33 #define DEBUG_SUBSYSTEM S_MDC
34
35 # include <linux/module.h>
36
37 #include "../include/lustre_intent.h"
38 #include "../include/obd.h"
39 #include "../include/obd_class.h"
40 #include "../include/lustre_dlm.h"
41 #include "../include/lustre_fid.h"
42 #include "../include/lustre_mdc.h"
43 #include "../include/lustre_net.h"
44 #include "../include/lustre_req_layout.h"
45 #include "../include/lustre_swab.h"
46
47 #include "mdc_internal.h"
48
/*
 * Context carried through an asynchronous getattr intent enqueue so the
 * completion path can find its way back to the caller's state.
 * NOTE(review): the consumers of this struct are outside this chunk;
 * member roles below are inferred from the types — confirm against the
 * async getattr interpret callback.
 */
struct mdc_getattr_args {
	struct obd_export       *ga_exp;	/* export the enqueue was issued on */
	struct md_enqueue_info  *ga_minfo;	/* caller-supplied enqueue info/callback */
	struct ldlm_enqueue_info *ga_einfo;	/* DLM enqueue parameters for this request */
};
54
55 int it_open_error(int phase, struct lookup_intent *it)
56 {
57 if (it_disposition(it, DISP_OPEN_LEASE)) {
58 if (phase >= DISP_OPEN_LEASE)
59 return it->it_status;
60 else
61 return 0;
62 }
63 if (it_disposition(it, DISP_OPEN_OPEN)) {
64 if (phase >= DISP_OPEN_OPEN)
65 return it->it_status;
66 else
67 return 0;
68 }
69
70 if (it_disposition(it, DISP_OPEN_CREATE)) {
71 if (phase >= DISP_OPEN_CREATE)
72 return it->it_status;
73 else
74 return 0;
75 }
76
77 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
78 if (phase >= DISP_LOOKUP_EXECD)
79 return it->it_status;
80 else
81 return 0;
82 }
83
84 if (it_disposition(it, DISP_IT_EXECD)) {
85 if (phase >= DISP_IT_EXECD)
86 return it->it_status;
87 else
88 return 0;
89 }
90 CERROR("it disp: %X, status: %d\n", it->it_disposition,
91 it->it_status);
92 LBUG();
93 return 0;
94 }
95 EXPORT_SYMBOL(it_open_error);
96
/* this must be called on a lockh that is known to have a referenced lock */
int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
		      void *data, __u64 *bits)
{
	struct ldlm_lock *lock;
	struct inode *new_inode = data;

	/* Initialize the out-parameter so callers see 0 bits on the
	 * no-lock path below. */
	if (bits)
		*bits = 0;

	/* An unused handle means there is no lock to tag; treat it as a
	 * successful no-op rather than an error. */
	if (!lustre_handle_is_used(lockh))
		return 0;

	lock = ldlm_handle2lock(lockh);

	LASSERT(lock);
	lock_res_and_lock(lock);
	/* Replacing a different cached inode is only legitimate when the
	 * old one is already being torn down (I_FREEING); two live inodes
	 * claiming the same resource would be a bug, hence the LASSERTF. */
	if (lock->l_resource->lr_lvb_inode &&
	    lock->l_resource->lr_lvb_inode != data) {
		struct inode *old_inode = lock->l_resource->lr_lvb_inode;

		LASSERTF(old_inode->i_state & I_FREEING,
			 "Found existing inode %p/%lu/%u state %lu in lock: setting data to %p/%lu/%u\n",
			 old_inode, old_inode->i_ino, old_inode->i_generation,
			 old_inode->i_state, new_inode, new_inode->i_ino,
			 new_inode->i_generation);
	}
	lock->l_resource->lr_lvb_inode = new_inode;
	/* Snapshot the inodebits while still under the resource lock so
	 * they are consistent with the inode we just installed. */
	if (bits)
		*bits = lock->l_policy_data.l_inodebits.bits;

	unlock_res_and_lock(lock);
	LDLM_LOCK_PUT(lock);

	return 0;
}
133
134 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
135 const struct lu_fid *fid, enum ldlm_type type,
136 union ldlm_policy_data *policy,
137 enum ldlm_mode mode,
138 struct lustre_handle *lockh)
139 {
140 struct ldlm_res_id res_id;
141 enum ldlm_mode rc;
142
143 fid_build_reg_res_name(fid, &res_id);
144 /* LU-4405: Clear bits not supported by server */
145 policy->l_inodebits.bits &= exp_connect_ibits(exp);
146 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
147 &res_id, type, policy, mode, lockh, 0);
148 return rc;
149 }
150
151 int mdc_cancel_unused(struct obd_export *exp,
152 const struct lu_fid *fid,
153 union ldlm_policy_data *policy,
154 enum ldlm_mode mode,
155 enum ldlm_cancel_flags flags,
156 void *opaque)
157 {
158 struct ldlm_res_id res_id;
159 struct obd_device *obd = class_exp2obd(exp);
160 int rc;
161
162 fid_build_reg_res_name(fid, &res_id);
163 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
164 policy, mode, flags, opaque);
165 return rc;
166 }
167
168 int mdc_null_inode(struct obd_export *exp,
169 const struct lu_fid *fid)
170 {
171 struct ldlm_res_id res_id;
172 struct ldlm_resource *res;
173 struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
174
175 LASSERTF(ns, "no namespace passed\n");
176
177 fid_build_reg_res_name(fid, &res_id);
178
179 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
180 if (IS_ERR(res))
181 return 0;
182
183 lock_res(res);
184 res->lr_lvb_inode = NULL;
185 unlock_res(res);
186
187 ldlm_resource_putref(res);
188 return 0;
189 }
190
191 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
192 {
193 /* Don't hold error requests for replay. */
194 if (req->rq_replay) {
195 spin_lock(&req->rq_lock);
196 req->rq_replay = 0;
197 spin_unlock(&req->rq_lock);
198 }
199 if (rc && req->rq_transno != 0) {
200 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
201 LBUG();
202 }
203 }
204
205 /* Save a large LOV EA into the request buffer so that it is available
206 * for replay. We don't do this in the initial request because the
207 * original request doesn't need this buffer (at most it sends just the
208 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
209 * buffer and may also be difficult to allocate and save a very large
210 * request buffer for each open. (bug 5707)
211 *
212 * OOM here may cause recovery failure if lmm is needed (only for the
213 * original open if the MDS crashed just when this client also OOM'd)
214 * but this is incredibly unlikely, and questionable whether the client
215 * could do MDS recovery under OOM anyways...
216 */
217 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
218 struct mdt_body *body)
219 {
220 int rc;
221
222 /* FIXME: remove this explicit offset. */
223 rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
224 body->mbo_eadatasize);
225 if (rc) {
226 CERROR("Can't enlarge segment %d size to %d\n",
227 DLM_INTENT_REC_OFF + 4, body->mbo_eadatasize);
228 body->mbo_valid &= ~OBD_MD_FLEASIZE;
229 body->mbo_eadatasize = 0;
230 }
231 }
232
/*
 * Build an LDLM_INTENT_OPEN request for @it/@op_data: queue cancels for
 * locks the open will conflict with, allocate the RPC, and pack the
 * intent plus the embedded open (create mode, flags, optional LOV EA).
 * Returns the prepared request or an ERR_PTR.
 */
static struct ptlrpc_request *
mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
		     struct md_op_data *op_data)
{
	struct ptlrpc_request *req;
	struct obd_device *obddev = class_exp2obd(exp);
	struct ldlm_intent *lit;
	const void *lmm = op_data->op_data;
	u32 lmmsize = op_data->op_data_size;
	LIST_HEAD(cancels);
	int count = 0;
	int mode;
	int rc;

	/* Force a regular-file type into the create mode. */
	it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;

	/* XXX: openlock is not cancelled for cross-refs. */
	/* If inode is known, cancel conflicting OPEN locks. */
	if (fid_is_sane(&op_data->op_fid2)) {
		if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
			if (it->it_flags & FMODE_WRITE)
				mode = LCK_EX;
			else
				mode = LCK_PR;
		} else {
			if (it->it_flags & (FMODE_WRITE | MDS_OPEN_TRUNC))
				mode = LCK_CW;
			else if (it->it_flags & __FMODE_EXEC)
				mode = LCK_PR;
			else
				mode = LCK_CR;
		}
		count = mdc_resource_get_unused(exp, &op_data->op_fid2,
						&cancels, mode,
						MDS_INODELOCK_OPEN);
	}

	/* If CREATE, cancel parent's UPDATE lock. */
	if (it->it_op & IT_CREAT)
		mode = LCK_EX;
	else
		mode = LCK_CR;
	count += mdc_resource_get_unused(exp, &op_data->op_fid1,
					 &cancels, mode,
					 MDS_INODELOCK_UPDATE);

	req = ptlrpc_request_alloc(class_exp2cliimp(exp),
				   &RQF_LDLM_INTENT_OPEN);
	if (!req) {
		/* Release the cancel list we collected above. */
		ldlm_lock_list_put(&cancels, l_bl_ast, count);
		return ERR_PTR(-ENOMEM);
	}

	/* Size client-side buffers before packing: name, and an EA buffer
	 * big enough for either the caller's LOV EA or the default. */
	req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
			     op_data->op_namelen + 1);
	req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
			     max(lmmsize, obddev->u.cli.cl_default_mds_easize));

	rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
	if (rc < 0) {
		ptlrpc_request_free(req);
		return ERR_PTR(rc);
	}

	/* Opens are replayed unconditionally on a replayable import; the
	 * flag may be cleared later if the open fails. */
	spin_lock(&req->rq_lock);
	req->rq_replay = req->rq_import->imp_replayable;
	spin_unlock(&req->rq_lock);

	/* pack the intent */
	lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
	lit->opc = (__u64)it->it_op;

	/* pack the intended request */
	mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
		      lmmsize);

	/* Reserve reply space for the largest possible striping EA. */
	req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
			     obddev->u.cli.cl_max_mds_easize);

	ptlrpc_request_set_replen(req);
	return req;
}
315
316 static struct ptlrpc_request *
317 mdc_intent_getxattr_pack(struct obd_export *exp,
318 struct lookup_intent *it,
319 struct md_op_data *op_data)
320 {
321 struct ptlrpc_request *req;
322 struct ldlm_intent *lit;
323 int rc, count = 0;
324 u32 maxdata;
325 LIST_HEAD(cancels);
326
327 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
328 &RQF_LDLM_INTENT_GETXATTR);
329 if (!req)
330 return ERR_PTR(-ENOMEM);
331
332 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
333 if (rc) {
334 ptlrpc_request_free(req);
335 return ERR_PTR(rc);
336 }
337
338 /* pack the intent */
339 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
340 lit->opc = IT_GETXATTR;
341
342 maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
343
344 /* pack the intended request */
345 mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid, maxdata, -1,
346 0);
347
348 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER, maxdata);
349
350 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, RCL_SERVER, maxdata);
351
352 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
353 RCL_SERVER, maxdata);
354
355 ptlrpc_request_set_replen(req);
356
357 return req;
358 }
359
360 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
361 struct lookup_intent *it,
362 struct md_op_data *op_data)
363 {
364 struct ptlrpc_request *req;
365 struct obd_device *obddev = class_exp2obd(exp);
366 struct ldlm_intent *lit;
367 int rc;
368
369 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
370 &RQF_LDLM_INTENT_UNLINK);
371 if (!req)
372 return ERR_PTR(-ENOMEM);
373
374 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
375 op_data->op_namelen + 1);
376
377 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
378 if (rc) {
379 ptlrpc_request_free(req);
380 return ERR_PTR(rc);
381 }
382
383 /* pack the intent */
384 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
385 lit->opc = (__u64)it->it_op;
386
387 /* pack the intended request */
388 mdc_unlink_pack(req, op_data);
389
390 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
391 obddev->u.cli.cl_default_mds_easize);
392 ptlrpc_request_set_replen(req);
393 return req;
394 }
395
396 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
397 struct lookup_intent *it,
398 struct md_op_data *op_data)
399 {
400 struct ptlrpc_request *req;
401 struct obd_device *obddev = class_exp2obd(exp);
402 u64 valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
403 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
404 OBD_MD_MEA | OBD_MD_FLACL;
405 struct ldlm_intent *lit;
406 int rc;
407 u32 easize;
408
409 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
410 &RQF_LDLM_INTENT_GETATTR);
411 if (!req)
412 return ERR_PTR(-ENOMEM);
413
414 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
415 op_data->op_namelen + 1);
416
417 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
418 if (rc) {
419 ptlrpc_request_free(req);
420 return ERR_PTR(rc);
421 }
422
423 /* pack the intent */
424 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
425 lit->opc = (__u64)it->it_op;
426
427 if (obddev->u.cli.cl_default_mds_easize > 0)
428 easize = obddev->u.cli.cl_default_mds_easize;
429 else
430 easize = obddev->u.cli.cl_max_mds_easize;
431
432 /* pack the intended request */
433 mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
434
435 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
436 ptlrpc_request_set_replen(req);
437 return req;
438 }
439
/*
 * Build an LDLM_INTENT_LAYOUT request: an intent-only enqueue asking the
 * server for the file's layout, with reply space for the layout LVB.
 * @unused: md_op_data is not needed for a plain layout fetch.
 */
static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
						     struct lookup_intent *it,
						     struct md_op_data *unused)
{
	struct obd_device *obd = class_exp2obd(exp);
	struct ptlrpc_request *req;
	struct ldlm_intent *lit;
	struct layout_intent *layout;
	int rc;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp),
				   &RQF_LDLM_INTENT_LAYOUT);
	if (!req)
		return ERR_PTR(-ENOMEM);

	/* No client-side EA is sent with a layout intent. */
	req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
	rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
	if (rc) {
		ptlrpc_request_free(req);
		return ERR_PTR(rc);
	}

	/* pack the intent */
	lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
	lit->opc = (__u64)it->it_op;

	/* pack the layout intent request */
	layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
	/* LAYOUT_INTENT_ACCESS is generic, specific operation will be
	 * set for replication
	 */
	layout->li_opc = LAYOUT_INTENT_ACCESS;

	/* Reply LVB large enough for the default striping descriptor. */
	req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
			     obd->u.cli.cl_default_mds_easize);
	ptlrpc_request_set_replen(req);
	return req;
}
478
/*
 * Build a plain (intent-less) LDLM enqueue request with room for an
 * @lvb_len-byte LVB in the reply.  Returns the request or an ERR_PTR.
 */
static struct ptlrpc_request *
mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
{
	struct ptlrpc_request *req;
	int rc;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
	if (!req)
		return ERR_PTR(-ENOMEM);

	rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
	if (rc) {
		ptlrpc_request_free(req);
		return ERR_PTR(rc);
	}

	req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
	ptlrpc_request_set_replen(req);
	return req;
}
499
500 static int mdc_finish_enqueue(struct obd_export *exp,
501 struct ptlrpc_request *req,
502 struct ldlm_enqueue_info *einfo,
503 struct lookup_intent *it,
504 struct lustre_handle *lockh,
505 int rc)
506 {
507 struct req_capsule *pill = &req->rq_pill;
508 struct ldlm_request *lockreq;
509 struct ldlm_reply *lockrep;
510 struct ldlm_lock *lock;
511 void *lvb_data = NULL;
512 u32 lvb_len = 0;
513
514 LASSERT(rc >= 0);
515 /* Similarly, if we're going to replay this request, we don't want to
516 * actually get a lock, just perform the intent.
517 */
518 if (req->rq_transno || req->rq_replay) {
519 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
520 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
521 }
522
523 if (rc == ELDLM_LOCK_ABORTED) {
524 einfo->ei_mode = 0;
525 memset(lockh, 0, sizeof(*lockh));
526 rc = 0;
527 } else { /* rc = 0 */
528 lock = ldlm_handle2lock(lockh);
529
530 /* If the server gave us back a different lock mode, we should
531 * fix up our variables.
532 */
533 if (lock->l_req_mode != einfo->ei_mode) {
534 ldlm_lock_addref(lockh, lock->l_req_mode);
535 ldlm_lock_decref(lockh, einfo->ei_mode);
536 einfo->ei_mode = lock->l_req_mode;
537 }
538 LDLM_LOCK_PUT(lock);
539 }
540
541 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
542
543 it->it_disposition = (int)lockrep->lock_policy_res1;
544 it->it_status = (int)lockrep->lock_policy_res2;
545 it->it_lock_mode = einfo->ei_mode;
546 it->it_lock_handle = lockh->cookie;
547 it->it_request = req;
548
549 /* Technically speaking rq_transno must already be zero if
550 * it_status is in error, so the check is a bit redundant
551 */
552 if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
553 mdc_clear_replay_flag(req, it->it_status);
554
555 /* If we're doing an IT_OPEN which did not result in an actual
556 * successful open, then we need to remove the bit which saves
557 * this request for unconditional replay.
558 *
559 * It's important that we do this first! Otherwise we might exit the
560 * function without doing so, and try to replay a failed create
561 * (bug 3440)
562 */
563 if (it->it_op & IT_OPEN && req->rq_replay &&
564 (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
565 mdc_clear_replay_flag(req, it->it_status);
566
567 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
568 it->it_op, it->it_disposition, it->it_status);
569
570 /* We know what to expect, so we do any byte flipping required here */
571 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
572 struct mdt_body *body;
573
574 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
575 if (!body) {
576 CERROR("Can't swab mdt_body\n");
577 return -EPROTO;
578 }
579
580 if (it_disposition(it, DISP_OPEN_OPEN) &&
581 !it_open_error(DISP_OPEN_OPEN, it)) {
582 /*
583 * If this is a successful OPEN request, we need to set
584 * replay handler and data early, so that if replay
585 * happens immediately after swabbing below, new reply
586 * is swabbed by that handler correctly.
587 */
588 mdc_set_open_replay_data(NULL, NULL, it);
589 }
590
591 if ((body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
592 void *eadata;
593
594 mdc_update_max_ea_from_body(exp, body);
595
596 /*
597 * The eadata is opaque; just check that it is there.
598 * Eventually, obd_unpackmd() will check the contents.
599 */
600 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
601 body->mbo_eadatasize);
602 if (!eadata)
603 return -EPROTO;
604
605 /* save lvb data and length in case this is for layout
606 * lock
607 */
608 lvb_data = eadata;
609 lvb_len = body->mbo_eadatasize;
610
611 /*
612 * We save the reply LOV EA in case we have to replay a
613 * create for recovery. If we didn't allocate a large
614 * enough request buffer above we need to reallocate it
615 * here to hold the actual LOV EA.
616 *
617 * To not save LOV EA if request is not going to replay
618 * (for example error one).
619 */
620 if ((it->it_op & IT_OPEN) && req->rq_replay) {
621 void *lmm;
622
623 if (req_capsule_get_size(pill, &RMF_EADATA,
624 RCL_CLIENT) <
625 body->mbo_eadatasize)
626 mdc_realloc_openmsg(req, body);
627 else
628 req_capsule_shrink(pill, &RMF_EADATA,
629 body->mbo_eadatasize,
630 RCL_CLIENT);
631
632 req_capsule_set_size(pill, &RMF_EADATA,
633 RCL_CLIENT,
634 body->mbo_eadatasize);
635
636 lmm = req_capsule_client_get(pill, &RMF_EADATA);
637 if (lmm)
638 memcpy(lmm, eadata, body->mbo_eadatasize);
639 }
640 }
641 } else if (it->it_op & IT_LAYOUT) {
642 /* maybe the lock was granted right away and layout
643 * is packed into RMF_DLM_LVB of req
644 */
645 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
646 if (lvb_len > 0) {
647 lvb_data = req_capsule_server_sized_get(pill,
648 &RMF_DLM_LVB,
649 lvb_len);
650 if (!lvb_data)
651 return -EPROTO;
652 }
653 }
654
655 /* fill in stripe data for layout lock */
656 lock = ldlm_handle2lock(lockh);
657 if (lock && ldlm_has_layout(lock) && lvb_data) {
658 void *lmm;
659
660 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
661 ldlm_it2str(it->it_op), lvb_len);
662
663 lmm = libcfs_kvzalloc(lvb_len, GFP_NOFS);
664 if (!lmm) {
665 LDLM_LOCK_PUT(lock);
666 return -ENOMEM;
667 }
668 memcpy(lmm, lvb_data, lvb_len);
669
670 /* install lvb_data */
671 lock_res_and_lock(lock);
672 if (!lock->l_lvb_data) {
673 lock->l_lvb_type = LVB_T_LAYOUT;
674 lock->l_lvb_data = lmm;
675 lock->l_lvb_len = lvb_len;
676 lmm = NULL;
677 }
678 unlock_res_and_lock(lock);
679 if (lmm)
680 kvfree(lmm);
681 }
682 if (lock)
683 LDLM_LOCK_PUT(lock);
684
685 return rc;
686 }
687
/* We always reserve enough space in the reply packet for a stripe MD, because
 * we don't know in advance the file type.
 */
/*
 * Enqueue a metadata lock, optionally with an intent (@it).  With an
 * intent the inodebits policy is chosen from it_op and the matching
 * intent RPC is packed; without one this is a flock enqueue and the
 * caller's @policy is used.  Handles modify-RPC/request-slot accounting,
 * -EINPROGRESS resends within one import generation, and cleanup of the
 * lock/request on failure.  Returns 0 or a negative errno.
 */
int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
		const union ldlm_policy_data *policy,
		struct lookup_intent *it, struct md_op_data *op_data,
		struct lustre_handle *lockh, u64 extra_lock_flags)
{
	static const union ldlm_policy_data lookup_policy = {
		.l_inodebits = { MDS_INODELOCK_LOOKUP }
	};
	static const union ldlm_policy_data update_policy = {
		.l_inodebits = { MDS_INODELOCK_UPDATE }
	};
	static const union ldlm_policy_data layout_policy = {
		.l_inodebits = { MDS_INODELOCK_LAYOUT }
	};
	static const union ldlm_policy_data getxattr_policy = {
		.l_inodebits = { MDS_INODELOCK_XATTR }
	};
	struct obd_device *obddev = class_exp2obd(exp);
	struct ptlrpc_request *req = NULL;
	u64 flags, saved_flags = extra_lock_flags;
	struct ldlm_res_id res_id;
	int generation, resends = 0;
	struct ldlm_reply *lockrep;
	enum lvb_type lvb_type = LVB_T_NONE;
	int rc;

	LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
		 einfo->ei_type);
	fid_build_reg_res_name(&op_data->op_fid1, &res_id);

	if (it) {
		/* With an intent the policy is implied by it_op; callers
		 * must not pass one explicitly. */
		LASSERT(!policy);

		saved_flags |= LDLM_FL_HAS_INTENT;
		if (it->it_op & (IT_OPEN | IT_UNLINK | IT_GETATTR | IT_READDIR))
			policy = &update_policy;
		else if (it->it_op & IT_LAYOUT)
			policy = &layout_policy;
		else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
			policy = &getxattr_policy;
		else
			policy = &lookup_policy;
	}

	/* Remember the import generation so resends can detect eviction. */
	generation = obddev->u.cli.cl_import->imp_generation;
resend:
	flags = saved_flags;
	if (!it) {
		/* The only way right now is FLOCK. */
		LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
			 einfo->ei_type);
		res_id.name[3] = LDLM_FLOCK;
	} else if (it->it_op & IT_OPEN) {
		req = mdc_intent_open_pack(exp, it, op_data);
	} else if (it->it_op & IT_UNLINK) {
		req = mdc_intent_unlink_pack(exp, it, op_data);
	} else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
		req = mdc_intent_getattr_pack(exp, it, op_data);
	} else if (it->it_op & IT_READDIR) {
		req = mdc_enqueue_pack(exp, 0);
	} else if (it->it_op & IT_LAYOUT) {
		if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
			return -EOPNOTSUPP;
		req = mdc_intent_layout_pack(exp, it, op_data);
		lvb_type = LVB_T_LAYOUT;
	} else if (it->it_op & IT_GETXATTR) {
		req = mdc_intent_getxattr_pack(exp, it, op_data);
	} else {
		LBUG();
		return -EINVAL;
	}

	if (IS_ERR(req))
		return PTR_ERR(req);

	if (resends) {
		/* Pin the generation and delay the send by one second per
		 * resend so repeated -EINPROGRESS replies back off. */
		req->rq_generation_set = 1;
		req->rq_import_generation = generation;
		req->rq_sent = ktime_get_real_seconds() + resends;
	}

	/* It is important to obtain modify RPC slot first (if applicable), so
	 * that threads that are waiting for a modify RPC slot are not polluting
	 * our rpcs in flight counter.
	 * We do not do flock request limiting, though
	 */
	if (it) {
		mdc_get_mod_rpc_slot(req, it);
		rc = obd_get_request_slot(&obddev->u.cli);
		if (rc != 0) {
			mdc_put_mod_rpc_slot(req, it);
			mdc_clear_replay_flag(req, 0);
			ptlrpc_req_finished(req);
			return rc;
		}
	}

	rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
			      0, lvb_type, lockh, 0);
	if (!it) {
		/* For flock requests we immediately return without further
		 * delay and let caller deal with the rest, since rest of
		 * this function metadata processing makes no sense for flock
		 * requests anyway. But in case of problem during comms with
		 * Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
		 * can not rely on caller and this mainly for F_UNLCKs
		 * (explicits or automatically generated by Kernel to clean
		 * current FLocks upon exit) that can't be trashed
		 */
		if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
		    (einfo->ei_type == LDLM_FLOCK) &&
		    (einfo->ei_mode == LCK_NL))
			goto resend;
		return rc;
	}

	/* Release the slots taken above before examining the reply. */
	obd_put_request_slot(&obddev->u.cli);
	mdc_put_mod_rpc_slot(req, it);

	if (rc < 0) {
		CDEBUG(D_INFO, "%s: ldlm_cli_enqueue failed: rc = %d\n",
		       obddev->obd_name, rc);

		mdc_clear_replay_flag(req, rc);
		ptlrpc_req_finished(req);
		return rc;
	}

	lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);

	/* Intent status travels in lock_policy_res2 in network byte order. */
	lockrep->lock_policy_res2 =
		ptlrpc_status_ntoh(lockrep->lock_policy_res2);

	/*
	 * Retry infinitely when the server returns -EINPROGRESS for the
	 * intent operation, when server returns -EINPROGRESS for acquiring
	 * intent lock, we'll retry in after_reply().
	 */
	if (it->it_op && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
		mdc_clear_replay_flag(req, rc);
		ptlrpc_req_finished(req);
		resends++;

		CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
		       obddev->obd_name, resends, it->it_op,
		       PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));

		if (generation == obddev->u.cli.cl_import->imp_generation) {
			goto resend;
		} else {
			/* Import was evicted meanwhile; give up. */
			CDEBUG(D_HA, "resend cross eviction\n");
			return -EIO;
		}
	}

	rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
	if (rc < 0) {
		/* Undo everything mdc_finish_enqueue set up in @it and
		 * drop the lock reference taken by the enqueue. */
		if (lustre_handle_is_used(lockh)) {
			ldlm_lock_decref(lockh, einfo->ei_mode);
			memset(lockh, 0, sizeof(*lockh));
		}
		ptlrpc_req_finished(req);

		it->it_lock_handle = 0;
		it->it_lock_mode = 0;
		it->it_request = NULL;
	}

	return rc;
}
861
/*
 * Post-process a completed intent enqueue so the VFS sees normal lookup
 * semantics: validate the intent dispositions, take extra request
 * references for successful CREATE/OPEN (balanced by the llite open
 * paths), and collapse onto an already-matching local lock if one
 * exists.  Returns 0 or the intent's error status.
 */
static int mdc_finish_intent_lock(struct obd_export *exp,
				  struct ptlrpc_request *request,
				  struct md_op_data *op_data,
				  struct lookup_intent *it,
				  struct lustre_handle *lockh)
{
	struct lustre_handle old_lock;
	struct mdt_body *mdt_body;
	struct ldlm_lock *lock;
	int rc;

	LASSERT(request != LP_POISON);
	LASSERT(request->rq_repmsg != LP_POISON);

	/* Readdir enqueues carry no intent result to examine. */
	if (it->it_op & IT_READDIR)
		return 0;

	if (!it_disposition(it, DISP_IT_EXECD)) {
		/* The server failed before it even started executing the
		 * intent, i.e. because it couldn't unpack the request.
		 */
		LASSERT(it->it_status != 0);
		return it->it_status;
	}
	rc = it_open_error(DISP_IT_EXECD, it);
	if (rc)
		return rc;

	mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
	LASSERT(mdt_body); /* mdc_enqueue checked */

	rc = it_open_error(DISP_LOOKUP_EXECD, it);
	if (rc)
		return rc;

	/* keep requests around for the multiple phases of the call
	 * this shows the DISP_XX must guarantee we make it into the call
	 */
	if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
	    it_disposition(it, DISP_OPEN_CREATE) &&
	    !it_open_error(DISP_OPEN_CREATE, it)) {
		it_set_disposition(it, DISP_ENQ_CREATE_REF);
		ptlrpc_request_addref(request); /* balanced in ll_create_node */
	}
	if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
	    it_disposition(it, DISP_OPEN_OPEN) &&
	    !it_open_error(DISP_OPEN_OPEN, it)) {
		it_set_disposition(it, DISP_ENQ_OPEN_REF);
		ptlrpc_request_addref(request); /* balanced in ll_file_open */
		/* BUG 11546 - eviction in the middle of open rpc processing */
		OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
	}

	/* Sanity checks on the disposition per operation type. */
	if (it->it_op & IT_CREAT) {
		/* XXX this belongs in ll_create_it */
	} else if (it->it_op == IT_OPEN) {
		LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
	} else {
		LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
	}

	/* If we already have a matching lock, then cancel the new
	 * one. We have to set the data here instead of in
	 * mdc_enqueue, because we need to use the child's inode as
	 * the l_ast_data to match, and that's not available until
	 * intent_finish has performed the iget().)
	 */
	lock = ldlm_handle2lock(lockh);
	if (lock) {
		union ldlm_policy_data policy = lock->l_policy_data;

		LDLM_DEBUG(lock, "matching against this");

		/* The reply's FID must name the same resource the lock
		 * was granted on. */
		LASSERTF(fid_res_name_eq(&mdt_body->mbo_fid1,
					 &lock->l_resource->lr_name),
			 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
			 PLDLMRES(lock->l_resource), PFID(&mdt_body->mbo_fid1));
		LDLM_LOCK_PUT(lock);

		memcpy(&old_lock, lockh, sizeof(*lockh));
		if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
				    LDLM_IBITS, &policy, LCK_NL,
				    &old_lock, 0)) {
			/* Keep the pre-existing lock, drop the new one. */
			ldlm_lock_decref_and_cancel(lockh,
						    it->it_lock_mode);
			memcpy(lockh, &old_lock, sizeof(old_lock));
			it->it_lock_handle = lockh->cookie;
		}
	}
	CDEBUG(D_DENTRY,
	       "D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
	       (int)op_data->op_namelen, op_data->op_name,
	       ldlm_it2str(it->it_op), it->it_status, it->it_disposition, rc);
	return rc;
}
957
958 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
959 struct lu_fid *fid, __u64 *bits)
960 {
961 /* We could just return 1 immediately, but since we should only
962 * be called in revalidate_it if we already have a lock, let's
963 * verify that.
964 */
965 struct ldlm_res_id res_id;
966 struct lustre_handle lockh;
967 union ldlm_policy_data policy;
968 enum ldlm_mode mode;
969
970 if (it->it_lock_handle) {
971 lockh.cookie = it->it_lock_handle;
972 mode = ldlm_revalidate_lock_handle(&lockh, bits);
973 } else {
974 fid_build_reg_res_name(fid, &res_id);
975 switch (it->it_op) {
976 case IT_GETATTR:
977 /* File attributes are held under multiple bits:
978 * nlink is under lookup lock, size and times are
979 * under UPDATE lock and recently we've also got
980 * a separate permissions lock for owner/group/acl that
981 * were protected by lookup lock before.
982 * Getattr must provide all of that information,
983 * so we need to ensure we have all of those locks.
984 * Unfortunately, if the bits are split across multiple
985 * locks, there's no easy way to match all of them here,
986 * so an extra RPC would be performed to fetch all
987 * of those bits at once for now.
988 */
989 /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
990 * but for old MDTs (< 2.4), permission is covered
991 * by LOOKUP lock, so it needs to match all bits here.
992 */
993 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
994 MDS_INODELOCK_LOOKUP |
995 MDS_INODELOCK_PERM;
996 break;
997 case IT_READDIR:
998 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
999 break;
1000 case IT_LAYOUT:
1001 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1002 break;
1003 default:
1004 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1005 break;
1006 }
1007
1008 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1009 LDLM_IBITS, &policy,
1010 LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1011 &lockh);
1012 }
1013
1014 if (mode) {
1015 it->it_lock_handle = lockh.cookie;
1016 it->it_lock_mode = mode;
1017 } else {
1018 it->it_lock_handle = 0;
1019 it->it_lock_mode = 0;
1020 }
1021
1022 return !!mode;
1023 }
1024
1025 /*
1026 * This long block is all about fixing up the lock and request state
1027 * so that it is correct as of the moment _before_ the operation was
1028 * applied; that way, the VFS will think that everything is normal and
1029 * call Lustre's regular VFS methods.
1030 *
1031 * If we're performing a creation, that means that unless the creation
1032 * failed with EEXIST, we should fake up a negative dentry.
1033 *
1034 * For everything else, we want to lookup to succeed.
1035 *
1036 * One additional note: if CREATE or OPEN succeeded, we add an extra
1037 * reference to the request because we need to keep it around until
1038 * ll_create/ll_open gets called.
1039 *
1040 * The server will return to us, in it_disposition, an indication of
1041 * exactly what it_status refers to.
1042 *
1043 * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
 * otherwise if DISP_OPEN_CREATE is set, then it_status is the
1045 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
1046 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1047 * was successful.
1048 *
1049 * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
1050 * child lookup.
1051 */
/*
 * Take (or revalidate) the DLM lock needed for intent @it on the
 * object(s) described by @op_data, then fix up the intent/request
 * state (see the disposition discussion in the comment above).
 *
 * On a successful enqueue, *reqp is set to the intent request.
 * Returns a negative errno on failure; non-negative values come from
 * mdc_revalidate_lock() (0/1 on the revalidate-only path) or from
 * mdc_finish_intent_lock().
 */
int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
		    struct lookup_intent *it, struct ptlrpc_request **reqp,
		    ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
{
	/* Inodebits lock with the caller-supplied blocking AST and the
	 * generic ldlm completion AST; mode derived from the intent op.
	 */
	struct ldlm_enqueue_info einfo = {
		.ei_type = LDLM_IBITS,
		.ei_mode = it_to_lock_mode(it),
		.ei_cb_bl = cb_blocking,
		.ei_cb_cp = ldlm_completion_ast,
	};
	struct lustre_handle lockh;
	int rc = 0;

	LASSERT(it);

	CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
	       ", intent: %s flags %#Lo\n", (int)op_data->op_namelen,
	       op_data->op_name, PFID(&op_data->op_fid2),
	       PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
	       it->it_flags);

	lockh.cookie = 0;
	/* Revalidate path: op_fid2 (the child) is already known, so try
	 * to satisfy the intent from an existing local lock first.
	 */
	if (fid_is_sane(&op_data->op_fid2) &&
	    (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
		/* We could just return 1 immediately, but since we should only
		 * be called in revalidate_it if we already have a lock, let's
		 * verify that.
		 */
		it->it_lock_handle = 0;
		rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
		/* Only return failure if it was not GETATTR by cfid
		 * (from inode_revalidate)
		 */
		if (rc || op_data->op_namelen != 0)
			return rc;
	}

	/* For case if upper layer did not alloc fid, do it now. */
	if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
		rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
		if (rc < 0) {
			CERROR("Can't alloc new fid, rc %d\n", rc);
			return rc;
		}
	}
	/* No (usable) cached lock: send the intent enqueue to the MDT. */
	rc = mdc_enqueue(exp, &einfo, NULL, it, op_data, &lockh,
			 extra_lock_flags);
	if (rc < 0)
		return rc;

	/* Hand the intent request to the caller and translate the server
	 * reply into the lock/dentry state the VFS expects (see the
	 * disposition rules in the comment above this function).
	 */
	*reqp = it->it_request;
	rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
	return rc;
}
1106
1107 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1108 struct ptlrpc_request *req,
1109 void *args, int rc)
1110 {
1111 struct mdc_getattr_args *ga = args;
1112 struct obd_export *exp = ga->ga_exp;
1113 struct md_enqueue_info *minfo = ga->ga_minfo;
1114 struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1115 struct lookup_intent *it;
1116 struct lustre_handle *lockh;
1117 struct obd_device *obddev;
1118 struct ldlm_reply *lockrep;
1119 __u64 flags = LDLM_FL_HAS_INTENT;
1120
1121 it = &minfo->mi_it;
1122 lockh = &minfo->mi_lockh;
1123
1124 obddev = class_exp2obd(exp);
1125
1126 obd_put_request_slot(&obddev->u.cli);
1127 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1128 rc = -ETIMEDOUT;
1129
1130 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1131 &flags, NULL, 0, lockh, rc);
1132 if (rc < 0) {
1133 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1134 mdc_clear_replay_flag(req, rc);
1135 goto out;
1136 }
1137
1138 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1139
1140 lockrep->lock_policy_res2 =
1141 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1142
1143 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1144 if (rc)
1145 goto out;
1146
1147 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1148
1149 out:
1150 kfree(einfo);
1151 minfo->mi_cb(req, minfo, rc);
1152 return 0;
1153 }
1154
/*
 * Asynchronously enqueue an intent-getattr lock request.  The RPC is
 * handed to ptlrpcd; on completion mdc_intent_getattr_async_interpret()
 * runs, frees @einfo and invokes minfo->mi_cb.
 *
 * Returns 0 if the request was queued, negative errno otherwise.
 * NOTE(review): on an error return @einfo is not freed here —
 * presumably the caller retains ownership in that case; confirm
 * against the statahead caller.
 */
int mdc_intent_getattr_async(struct obd_export *exp,
			     struct md_enqueue_info *minfo,
			     struct ldlm_enqueue_info *einfo)
{
	struct md_op_data *op_data = &minfo->mi_data;
	struct lookup_intent *it = &minfo->mi_it;
	struct ptlrpc_request *req;
	struct mdc_getattr_args *ga;
	struct obd_device *obddev = class_exp2obd(exp);
	struct ldlm_res_id res_id;
	/*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
	 * for statahead currently. Consider CMD in future, such two bits
	 * maybe managed by different MDS, should be adjusted then.
	 */
	union ldlm_policy_data policy = {
		.l_inodebits = { MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE }
	};
	int rc = 0;
	__u64 flags = LDLM_FL_HAS_INTENT;

	CDEBUG(D_DLMTRACE,
	       "name: %.*s in inode " DFID ", intent: %s flags %#Lo\n",
	       (int)op_data->op_namelen, op_data->op_name,
	       PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);

	fid_build_reg_res_name(&op_data->op_fid1, &res_id);
	req = mdc_intent_getattr_pack(exp, it, op_data);
	if (IS_ERR(req))
		return PTR_ERR(req);

	/* Throttle concurrent in-flight requests; the slot is released
	 * by the interpret callback (or on the error paths below).
	 */
	rc = obd_get_request_slot(&obddev->u.cli);
	if (rc != 0) {
		ptlrpc_req_finished(req);
		return rc;
	}

	rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
			      0, LVB_T_NONE, &minfo->mi_lockh, 1);
	if (rc < 0) {
		obd_put_request_slot(&obddev->u.cli);
		ptlrpc_req_finished(req);
		return rc;
	}

	/* Stash the callback context in the request's async-args area;
	 * the CLASSERT guards against it outgrowing rq_async_args.
	 */
	CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
	ga = ptlrpc_req_async_args(req);
	ga->ga_exp = exp;
	ga->ga_minfo = minfo;
	ga->ga_einfo = einfo;

	/* Hand off to ptlrpcd; completion runs in the interpret callback. */
	req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
	ptlrpcd_add_req(req);

	return 0;
}