]> git.proxmox.com Git - mirror_ubuntu-bionic-kernel.git/blame - drivers/staging/lustre/lustre/mdc/mdc_locks.c
staging: lustre: lmv: separate master object with master stripe
[mirror_ubuntu-bionic-kernel.git] / drivers / staging / lustre / lustre / mdc / mdc_locks.c
CommitLineData
d7e09d03
PT
1/*
2 * GPL HEADER START
3 *
4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
6a5b99a4 18 * http://www.gnu.org/licenses/gpl-2.0.html
d7e09d03 19 *
d7e09d03
PT
20 * GPL HEADER END
21 */
22/*
23 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
25 *
1dc563a6 26 * Copyright (c) 2011, 2015, Intel Corporation.
d7e09d03
PT
27 */
28/*
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
31 */
32
33#define DEBUG_SUBSYSTEM S_MDC
34
35# include <linux/module.h>
d7e09d03 36
00d65ec8 37#include "../include/lustre_intent.h"
05932307
GKH
38#include "../include/obd.h"
39#include "../include/obd_class.h"
40#include "../include/lustre_dlm.h"
41#include "../include/lustre_fid.h" /* fid_res_name_eq() */
42#include "../include/lustre_mdc.h"
43#include "../include/lustre_net.h"
44#include "../include/lustre_req_layout.h"
d7e09d03
PT
45#include "mdc_internal.h"
46
/*
 * Argument bundle carrying an export together with the metadata-enqueue
 * and LDLM-enqueue descriptors.
 * NOTE(review): no user of this structure is visible in this part of the
 * file - confirm its consumer before changing the layout.
 */
struct mdc_getattr_args {
	struct obd_export		*ga_exp;	/* export */
	struct md_enqueue_info		*ga_minfo;	/* md enqueue info */
	struct ldlm_enqueue_info	*ga_einfo;	/* ldlm enqueue info */
};
52
d7e09d03
PT
53int it_open_error(int phase, struct lookup_intent *it)
54{
d3a8a4e2
JX
55 if (it_disposition(it, DISP_OPEN_LEASE)) {
56 if (phase >= DISP_OPEN_LEASE)
e476f2e5 57 return it->it_status;
d3a8a4e2
JX
58 else
59 return 0;
60 }
d7e09d03
PT
61 if (it_disposition(it, DISP_OPEN_OPEN)) {
62 if (phase >= DISP_OPEN_OPEN)
e476f2e5 63 return it->it_status;
d7e09d03
PT
64 else
65 return 0;
66 }
67
68 if (it_disposition(it, DISP_OPEN_CREATE)) {
69 if (phase >= DISP_OPEN_CREATE)
e476f2e5 70 return it->it_status;
d7e09d03
PT
71 else
72 return 0;
73 }
74
75 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
76 if (phase >= DISP_LOOKUP_EXECD)
e476f2e5 77 return it->it_status;
d7e09d03
PT
78 else
79 return 0;
80 }
81
82 if (it_disposition(it, DISP_IT_EXECD)) {
83 if (phase >= DISP_IT_EXECD)
e476f2e5 84 return it->it_status;
d7e09d03
PT
85 else
86 return 0;
87 }
e476f2e5
JH
88 CERROR("it disp: %X, status: %d\n", it->it_disposition,
89 it->it_status);
d7e09d03
PT
90 LBUG();
91 return 0;
92}
93EXPORT_SYMBOL(it_open_error);
94
95/* this must be called on a lockh that is known to have a referenced lock */
96int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
97 __u64 *bits)
98{
99 struct ldlm_lock *lock;
100 struct inode *new_inode = data;
d7e09d03 101
88005c5f 102 if (bits)
d7e09d03
PT
103 *bits = 0;
104
105 if (!*lockh)
0a3bdb00 106 return 0;
d7e09d03
PT
107
108 lock = ldlm_handle2lock((struct lustre_handle *)lockh);
109
34e3ff96 110 LASSERT(lock);
d7e09d03
PT
111 lock_res_and_lock(lock);
112 if (lock->l_resource->lr_lvb_inode &&
113 lock->l_resource->lr_lvb_inode != data) {
114 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
7436d070 115
d7e09d03 116 LASSERTF(old_inode->i_state & I_FREEING,
ee990b33
SM
117 "Found existing inode %p/%lu/%u state %lu in lock: setting data to %p/%lu/%u\n",
118 old_inode, old_inode->i_ino, old_inode->i_generation,
119 old_inode->i_state, new_inode, new_inode->i_ino,
120 new_inode->i_generation);
d7e09d03
PT
121 }
122 lock->l_resource->lr_lvb_inode = new_inode;
123 if (bits)
124 *bits = lock->l_policy_data.l_inodebits.bits;
125
126 unlock_res_and_lock(lock);
127 LDLM_LOCK_PUT(lock);
128
0a3bdb00 129 return 0;
d7e09d03
PT
130}
131
52ee0d20
OD
132enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
133 const struct lu_fid *fid, enum ldlm_type type,
134 ldlm_policy_data_t *policy, enum ldlm_mode mode,
135 struct lustre_handle *lockh)
d7e09d03
PT
136{
137 struct ldlm_res_id res_id;
52ee0d20 138 enum ldlm_mode rc;
d7e09d03
PT
139
140 fid_build_reg_res_name(fid, &res_id);
6caea2f9
AL
141 /* LU-4405: Clear bits not supported by server */
142 policy->l_inodebits.bits &= exp_connect_ibits(exp);
d7e09d03
PT
143 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
144 &res_id, type, policy, mode, lockh, 0);
0a3bdb00 145 return rc;
d7e09d03
PT
146}
147
148int mdc_cancel_unused(struct obd_export *exp,
149 const struct lu_fid *fid,
150 ldlm_policy_data_t *policy,
52ee0d20 151 enum ldlm_mode mode,
f833ee42 152 enum ldlm_cancel_flags flags,
d7e09d03
PT
153 void *opaque)
154{
155 struct ldlm_res_id res_id;
156 struct obd_device *obd = class_exp2obd(exp);
157 int rc;
158
d7e09d03
PT
159 fid_build_reg_res_name(fid, &res_id);
160 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
161 policy, mode, flags, opaque);
0a3bdb00 162 return rc;
d7e09d03
PT
163}
164
165int mdc_null_inode(struct obd_export *exp,
166 const struct lu_fid *fid)
167{
168 struct ldlm_res_id res_id;
169 struct ldlm_resource *res;
170 struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
d7e09d03 171
34e3ff96 172 LASSERTF(ns, "no namespace passed\n");
d7e09d03
PT
173
174 fid_build_reg_res_name(fid, &res_id);
175
176 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
34e3ff96 177 if (!res)
0a3bdb00 178 return 0;
d7e09d03
PT
179
180 lock_res(res);
181 res->lr_lvb_inode = NULL;
182 unlock_res(res);
183
184 ldlm_resource_putref(res);
0a3bdb00 185 return 0;
d7e09d03
PT
186}
187
188/* find any ldlm lock of the inode in mdc
189 * return 0 not find
190 * 1 find one
1df232ee
OD
191 * < 0 error
192 */
d7e09d03
PT
193int mdc_find_cbdata(struct obd_export *exp,
194 const struct lu_fid *fid,
195 ldlm_iterator_t it, void *data)
196{
197 struct ldlm_res_id res_id;
198 int rc = 0;
d7e09d03 199
c35e01ff 200 fid_build_reg_res_name((struct lu_fid *)fid, &res_id);
d7e09d03
PT
201 rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
202 it, data);
203 if (rc == LDLM_ITER_STOP)
0a3bdb00 204 return 1;
d7e09d03 205 else if (rc == LDLM_ITER_CONTINUE)
0a3bdb00
GKH
206 return 0;
207 return rc;
d7e09d03
PT
208}
209
210static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
211{
212 /* Don't hold error requests for replay. */
213 if (req->rq_replay) {
214 spin_lock(&req->rq_lock);
215 req->rq_replay = 0;
216 spin_unlock(&req->rq_lock);
217 }
218 if (rc && req->rq_transno != 0) {
219 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
220 LBUG();
221 }
222}
223
224/* Save a large LOV EA into the request buffer so that it is available
225 * for replay. We don't do this in the initial request because the
226 * original request doesn't need this buffer (at most it sends just the
227 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
228 * buffer and may also be difficult to allocate and save a very large
229 * request buffer for each open. (bug 5707)
230 *
231 * OOM here may cause recovery failure if lmm is needed (only for the
232 * original open if the MDS crashed just when this client also OOM'd)
233 * but this is incredibly unlikely, and questionable whether the client
1df232ee
OD
234 * could do MDS recovery under OOM anyways...
235 */
d7e09d03
PT
236static void mdc_realloc_openmsg(struct ptlrpc_request *req,
237 struct mdt_body *body)
238{
239 int rc;
240
241 /* FIXME: remove this explicit offset. */
242 rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
243 body->eadatasize);
244 if (rc) {
245 CERROR("Can't enlarge segment %d size to %d\n",
246 DLM_INTENT_REC_OFF + 4, body->eadatasize);
247 body->valid &= ~OBD_MD_FLEASIZE;
248 body->eadatasize = 0;
249 }
250}
251
252static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
253 struct lookup_intent *it,
254 struct md_op_data *op_data,
255 void *lmm, int lmmsize,
256 void *cb_data)
257{
258 struct ptlrpc_request *req;
259 struct obd_device *obddev = class_exp2obd(exp);
260 struct ldlm_intent *lit;
261 LIST_HEAD(cancels);
262 int count = 0;
263 int mode;
264 int rc;
d7e09d03
PT
265
266 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
267
268 /* XXX: openlock is not cancelled for cross-refs. */
269 /* If inode is known, cancel conflicting OPEN locks. */
270 if (fid_is_sane(&op_data->op_fid2)) {
d3a8a4e2
JX
271 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
272 if (it->it_flags & FMODE_WRITE)
273 mode = LCK_EX;
274 else
275 mode = LCK_PR;
276 } else {
277 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
278 mode = LCK_CW;
962dbfd6 279 else if (it->it_flags & __FMODE_EXEC)
d3a8a4e2 280 mode = LCK_PR;
d3a8a4e2
JX
281 else
282 mode = LCK_CR;
283 }
d7e09d03
PT
284 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
285 &cancels, mode,
286 MDS_INODELOCK_OPEN);
287 }
288
289 /* If CREATE, cancel parent's UPDATE lock. */
290 if (it->it_op & IT_CREAT)
291 mode = LCK_EX;
292 else
293 mode = LCK_CR;
294 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
295 &cancels, mode,
296 MDS_INODELOCK_UPDATE);
297
298 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
299 &RQF_LDLM_INTENT_OPEN);
34e3ff96 300 if (!req) {
d7e09d03 301 ldlm_lock_list_put(&cancels, l_bl_ast, count);
0a3bdb00 302 return ERR_PTR(-ENOMEM);
d7e09d03
PT
303 }
304
d7e09d03
PT
305 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
306 op_data->op_namelen + 1);
307 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
308 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
309
310 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
add882a8 311 if (rc < 0) {
d7e09d03 312 ptlrpc_request_free(req);
add882a8 313 return ERR_PTR(rc);
d7e09d03
PT
314 }
315
316 spin_lock(&req->rq_lock);
317 req->rq_replay = req->rq_import->imp_replayable;
318 spin_unlock(&req->rq_lock);
319
320 /* pack the intent */
321 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
322 lit->opc = (__u64)it->it_op;
323
324 /* pack the intended request */
325 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
326 lmmsize);
327
2de35386 328 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
329 obddev->u.cli.cl_max_mds_easize);
330
d7e09d03
PT
331 ptlrpc_request_set_replen(req);
332 return req;
333}
334
7fc1f831
AP
335static struct ptlrpc_request *
336mdc_intent_getxattr_pack(struct obd_export *exp,
337 struct lookup_intent *it,
338 struct md_op_data *op_data)
339{
340 struct ptlrpc_request *req;
341 struct ldlm_intent *lit;
342 int rc, count = 0, maxdata;
343 LIST_HEAD(cancels);
344
7fc1f831 345 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
22e0bc6a 346 &RQF_LDLM_INTENT_GETXATTR);
34e3ff96 347 if (!req)
7fc1f831
AP
348 return ERR_PTR(-ENOMEM);
349
7fc1f831
AP
350 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
351 if (rc) {
352 ptlrpc_request_free(req);
353 return ERR_PTR(rc);
354 }
355
356 /* pack the intent */
357 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
358 lit->opc = IT_GETXATTR;
359
360 maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
361
362 /* pack the intended request */
ef2e0f55
OD
363 mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid, maxdata, -1,
364 0);
7fc1f831 365
22e0bc6a 366 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER, maxdata);
7fc1f831 367
22e0bc6a 368 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, RCL_SERVER, maxdata);
7fc1f831
AP
369
370 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
22e0bc6a 371 RCL_SERVER, maxdata);
7fc1f831
AP
372
373 ptlrpc_request_set_replen(req);
374
375 return req;
376}
377
d7e09d03
PT
378static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
379 struct lookup_intent *it,
380 struct md_op_data *op_data)
381{
382 struct ptlrpc_request *req;
383 struct obd_device *obddev = class_exp2obd(exp);
384 struct ldlm_intent *lit;
385 int rc;
d7e09d03
PT
386
387 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
388 &RQF_LDLM_INTENT_UNLINK);
34e3ff96 389 if (!req)
0a3bdb00 390 return ERR_PTR(-ENOMEM);
d7e09d03 391
d7e09d03
PT
392 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
393 op_data->op_namelen + 1);
394
395 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
396 if (rc) {
397 ptlrpc_request_free(req);
0a3bdb00 398 return ERR_PTR(rc);
d7e09d03
PT
399 }
400
401 /* pack the intent */
402 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
403 lit->opc = (__u64)it->it_op;
404
405 /* pack the intended request */
406 mdc_unlink_pack(req, op_data);
407
408 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
44779340 409 obddev->u.cli.cl_default_mds_easize);
d7e09d03 410 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
44779340 411 obddev->u.cli.cl_default_mds_cookiesize);
d7e09d03 412 ptlrpc_request_set_replen(req);
0a3bdb00 413 return req;
d7e09d03
PT
414}
415
416static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
22e0bc6a
OD
417 struct lookup_intent *it,
418 struct md_op_data *op_data)
d7e09d03
PT
419{
420 struct ptlrpc_request *req;
421 struct obd_device *obddev = class_exp2obd(exp);
21aef7d9 422 u64 valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
d7e09d03 423 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
341f1f0a 424 OBD_MD_MEA | OBD_MD_FLACL;
d7e09d03
PT
425 struct ldlm_intent *lit;
426 int rc;
2c580836 427 int easize;
d7e09d03
PT
428
429 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
430 &RQF_LDLM_INTENT_GETATTR);
34e3ff96 431 if (!req)
0a3bdb00 432 return ERR_PTR(-ENOMEM);
d7e09d03 433
d7e09d03
PT
434 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
435 op_data->op_namelen + 1);
436
437 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
438 if (rc) {
439 ptlrpc_request_free(req);
0a3bdb00 440 return ERR_PTR(rc);
d7e09d03
PT
441 }
442
443 /* pack the intent */
444 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
445 lit->opc = (__u64)it->it_op;
446
2c580836 447 if (obddev->u.cli.cl_default_mds_easize > 0)
448 easize = obddev->u.cli.cl_default_mds_easize;
449 else
450 easize = obddev->u.cli.cl_max_mds_easize;
451
d7e09d03 452 /* pack the intended request */
2c580836 453 mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
d7e09d03 454
2c580836 455 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
d7e09d03 456 ptlrpc_request_set_replen(req);
0a3bdb00 457 return req;
d7e09d03
PT
458}
459
460static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
461 struct lookup_intent *it,
462 struct md_op_data *unused)
463{
464 struct obd_device *obd = class_exp2obd(exp);
465 struct ptlrpc_request *req;
466 struct ldlm_intent *lit;
467 struct layout_intent *layout;
468 int rc;
d7e09d03
PT
469
470 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
22e0bc6a 471 &RQF_LDLM_INTENT_LAYOUT);
34e3ff96 472 if (!req)
0a3bdb00 473 return ERR_PTR(-ENOMEM);
d7e09d03
PT
474
475 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
476 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
477 if (rc) {
478 ptlrpc_request_free(req);
0a3bdb00 479 return ERR_PTR(rc);
d7e09d03
PT
480 }
481
482 /* pack the intent */
483 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
484 lit->opc = (__u64)it->it_op;
485
486 /* pack the layout intent request */
487 layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
488 /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
1df232ee
OD
489 * set for replication
490 */
d7e09d03
PT
491 layout->li_opc = LAYOUT_INTENT_ACCESS;
492
493 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
44779340 494 obd->u.cli.cl_default_mds_easize);
d7e09d03 495 ptlrpc_request_set_replen(req);
0a3bdb00 496 return req;
d7e09d03
PT
497}
498
499static struct ptlrpc_request *
500mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
501{
502 struct ptlrpc_request *req;
503 int rc;
d7e09d03
PT
504
505 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
34e3ff96 506 if (!req)
0a3bdb00 507 return ERR_PTR(-ENOMEM);
d7e09d03
PT
508
509 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
510 if (rc) {
511 ptlrpc_request_free(req);
0a3bdb00 512 return ERR_PTR(rc);
d7e09d03
PT
513 }
514
515 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
516 ptlrpc_request_set_replen(req);
0a3bdb00 517 return req;
d7e09d03
PT
518}
519
520static int mdc_finish_enqueue(struct obd_export *exp,
521 struct ptlrpc_request *req,
522 struct ldlm_enqueue_info *einfo,
523 struct lookup_intent *it,
524 struct lustre_handle *lockh,
525 int rc)
526{
527 struct req_capsule *pill = &req->rq_pill;
528 struct ldlm_request *lockreq;
529 struct ldlm_reply *lockrep;
d7e09d03
PT
530 struct ldlm_lock *lock;
531 void *lvb_data = NULL;
532 int lvb_len = 0;
d7e09d03
PT
533
534 LASSERT(rc >= 0);
535 /* Similarly, if we're going to replay this request, we don't want to
1df232ee
OD
536 * actually get a lock, just perform the intent.
537 */
d7e09d03
PT
538 if (req->rq_transno || req->rq_replay) {
539 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
540 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
541 }
542
543 if (rc == ELDLM_LOCK_ABORTED) {
544 einfo->ei_mode = 0;
545 memset(lockh, 0, sizeof(*lockh));
546 rc = 0;
547 } else { /* rc = 0 */
548 lock = ldlm_handle2lock(lockh);
d7e09d03
PT
549
550 /* If the server gave us back a different lock mode, we should
1df232ee
OD
551 * fix up our variables.
552 */
d7e09d03
PT
553 if (lock->l_req_mode != einfo->ei_mode) {
554 ldlm_lock_addref(lockh, lock->l_req_mode);
555 ldlm_lock_decref(lockh, einfo->ei_mode);
556 einfo->ei_mode = lock->l_req_mode;
557 }
558 LDLM_LOCK_PUT(lock);
559 }
560
561 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
d7e09d03 562
e476f2e5
JH
563 it->it_disposition = (int)lockrep->lock_policy_res1;
564 it->it_status = (int)lockrep->lock_policy_res2;
565 it->it_lock_mode = einfo->ei_mode;
566 it->it_lock_handle = lockh->cookie;
8bf86fd9 567 it->it_request = req;
d7e09d03
PT
568
569 /* Technically speaking rq_transno must already be zero if
1df232ee
OD
570 * it_status is in error, so the check is a bit redundant
571 */
e476f2e5
JH
572 if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
573 mdc_clear_replay_flag(req, it->it_status);
d7e09d03
PT
574
575 /* If we're doing an IT_OPEN which did not result in an actual
576 * successful open, then we need to remove the bit which saves
577 * this request for unconditional replay.
578 *
579 * It's important that we do this first! Otherwise we might exit the
580 * function without doing so, and try to replay a failed create
1df232ee
OD
581 * (bug 3440)
582 */
d7e09d03 583 if (it->it_op & IT_OPEN && req->rq_replay &&
e476f2e5
JH
584 (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
585 mdc_clear_replay_flag(req, it->it_status);
d7e09d03
PT
586
587 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
e476f2e5 588 it->it_op, it->it_disposition, it->it_status);
d7e09d03
PT
589
590 /* We know what to expect, so we do any byte flipping required here */
591 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
592 struct mdt_body *body;
593
594 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
34e3ff96 595 if (!body) {
ffdac6ce 596 CERROR("Can't swab mdt_body\n");
0a3bdb00 597 return -EPROTO;
d7e09d03
PT
598 }
599
600 if (it_disposition(it, DISP_OPEN_OPEN) &&
601 !it_open_error(DISP_OPEN_OPEN, it)) {
602 /*
603 * If this is a successful OPEN request, we need to set
604 * replay handler and data early, so that if replay
605 * happens immediately after swabbing below, new reply
606 * is swabbed by that handler correctly.
607 */
63d42578 608 mdc_set_open_replay_data(NULL, NULL, it);
d7e09d03
PT
609 }
610
611 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
612 void *eadata;
613
614 mdc_update_max_ea_from_body(exp, body);
615
616 /*
617 * The eadata is opaque; just check that it is there.
618 * Eventually, obd_unpackmd() will check the contents.
619 */
620 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
621 body->eadatasize);
34e3ff96 622 if (!eadata)
0a3bdb00 623 return -EPROTO;
d7e09d03
PT
624
625 /* save lvb data and length in case this is for layout
1df232ee
OD
626 * lock
627 */
d7e09d03
PT
628 lvb_data = eadata;
629 lvb_len = body->eadatasize;
630
631 /*
632 * We save the reply LOV EA in case we have to replay a
633 * create for recovery. If we didn't allocate a large
634 * enough request buffer above we need to reallocate it
635 * here to hold the actual LOV EA.
636 *
637 * To not save LOV EA if request is not going to replay
638 * (for example error one).
639 */
640 if ((it->it_op & IT_OPEN) && req->rq_replay) {
641 void *lmm;
7436d070 642
d7e09d03
PT
643 if (req_capsule_get_size(pill, &RMF_EADATA,
644 RCL_CLIENT) <
645 body->eadatasize)
646 mdc_realloc_openmsg(req, body);
647 else
648 req_capsule_shrink(pill, &RMF_EADATA,
649 body->eadatasize,
650 RCL_CLIENT);
651
652 req_capsule_set_size(pill, &RMF_EADATA,
653 RCL_CLIENT,
654 body->eadatasize);
655
656 lmm = req_capsule_client_get(pill, &RMF_EADATA);
657 if (lmm)
658 memcpy(lmm, eadata, body->eadatasize);
659 }
660 }
d7e09d03
PT
661 } else if (it->it_op & IT_LAYOUT) {
662 /* maybe the lock was granted right away and layout
1df232ee
OD
663 * is packed into RMF_DLM_LVB of req
664 */
d7e09d03
PT
665 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
666 if (lvb_len > 0) {
667 lvb_data = req_capsule_server_sized_get(pill,
668 &RMF_DLM_LVB, lvb_len);
34e3ff96 669 if (!lvb_data)
0a3bdb00 670 return -EPROTO;
d7e09d03
PT
671 }
672 }
673
674 /* fill in stripe data for layout lock */
675 lock = ldlm_handle2lock(lockh);
34e3ff96 676 if (lock && ldlm_has_layout(lock) && lvb_data) {
d7e09d03
PT
677 void *lmm;
678
e93876dd 679 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
22e0bc6a 680 ldlm_it2str(it->it_op), lvb_len);
d7e09d03 681
33784467 682 lmm = libcfs_kvzalloc(lvb_len, GFP_NOFS);
34e3ff96 683 if (!lmm) {
d7e09d03 684 LDLM_LOCK_PUT(lock);
0a3bdb00 685 return -ENOMEM;
d7e09d03
PT
686 }
687 memcpy(lmm, lvb_data, lvb_len);
688
689 /* install lvb_data */
690 lock_res_and_lock(lock);
34e3ff96 691 if (!lock->l_lvb_data) {
04aa5d15 692 lock->l_lvb_type = LVB_T_LAYOUT;
d7e09d03
PT
693 lock->l_lvb_data = lmm;
694 lock->l_lvb_len = lvb_len;
695 lmm = NULL;
696 }
697 unlock_res_and_lock(lock);
34e3ff96 698 if (lmm)
33784467 699 kvfree(lmm);
d7e09d03 700 }
34e3ff96 701 if (lock)
d7e09d03
PT
702 LDLM_LOCK_PUT(lock);
703
0a3bdb00 704 return rc;
d7e09d03
PT
705}
706
707/* We always reserve enough space in the reply packet for a stripe MD, because
1df232ee
OD
708 * we don't know in advance the file type.
709 */
d7e09d03
PT
710int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
711 struct lookup_intent *it, struct md_op_data *op_data,
712 struct lustre_handle *lockh, void *lmm, int lmmsize,
ab909585 713 struct ptlrpc_request **reqp, u64 extra_lock_flags)
d7e09d03 714{
1a4cd3e9
SM
715 static const ldlm_policy_data_t lookup_policy = {
716 .l_inodebits = { MDS_INODELOCK_LOOKUP }
717 };
718 static const ldlm_policy_data_t update_policy = {
719 .l_inodebits = { MDS_INODELOCK_UPDATE }
720 };
721 static const ldlm_policy_data_t layout_policy = {
722 .l_inodebits = { MDS_INODELOCK_LAYOUT }
723 };
7fc1f831 724 static const ldlm_policy_data_t getxattr_policy = {
1a4cd3e9
SM
725 .l_inodebits = { MDS_INODELOCK_XATTR }
726 };
d7e09d03 727 ldlm_policy_data_t const *policy = &lookup_policy;
ab909585
SM
728 struct obd_device *obddev = class_exp2obd(exp);
729 struct ptlrpc_request *req;
730 u64 flags, saved_flags = extra_lock_flags;
731 struct ldlm_res_id res_id;
732 int generation, resends = 0;
733 struct ldlm_reply *lockrep;
734 enum lvb_type lvb_type = LVB_T_NONE;
735 int rc;
d7e09d03
PT
736
737 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
738 einfo->ei_type);
739
740 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
741
742 if (it) {
743 saved_flags |= LDLM_FL_HAS_INTENT;
744 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
745 policy = &update_policy;
746 else if (it->it_op & IT_LAYOUT)
747 policy = &layout_policy;
7fc1f831
AP
748 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
749 policy = &getxattr_policy;
d7e09d03
PT
750 }
751
34e3ff96 752 LASSERT(!reqp);
d7e09d03
PT
753
754 generation = obddev->u.cli.cl_import->imp_generation;
755resend:
756 flags = saved_flags;
757 if (!it) {
758 /* The only way right now is FLOCK, in this case we hide flock
1df232ee
OD
759 * policy as lmm, but lmmsize is 0
760 */
d7e09d03
PT
761 LASSERT(lmm && lmmsize == 0);
762 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
763 einfo->ei_type);
3cf8e32d 764 policy = lmm;
d7e09d03 765 res_id.name[3] = LDLM_FLOCK;
3a09f36e 766 req = NULL;
d7e09d03
PT
767 } else if (it->it_op & IT_OPEN) {
768 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
769 einfo->ei_cbdata);
770 policy = &update_policy;
771 einfo->ei_cbdata = NULL;
772 lmm = NULL;
773 } else if (it->it_op & IT_UNLINK) {
774 req = mdc_intent_unlink_pack(exp, it, op_data);
775 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
776 req = mdc_intent_getattr_pack(exp, it, op_data);
777 } else if (it->it_op & IT_READDIR) {
778 req = mdc_enqueue_pack(exp, 0);
779 } else if (it->it_op & IT_LAYOUT) {
780 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
0a3bdb00 781 return -EOPNOTSUPP;
d7e09d03
PT
782 req = mdc_intent_layout_pack(exp, it, op_data);
783 lvb_type = LVB_T_LAYOUT;
e93a3082 784 } else if (it->it_op & IT_GETXATTR) {
7fc1f831 785 req = mdc_intent_getxattr_pack(exp, it, op_data);
d7e09d03
PT
786 } else {
787 LBUG();
0a3bdb00 788 return -EINVAL;
d7e09d03
PT
789 }
790
791 if (IS_ERR(req))
0a3bdb00 792 return PTR_ERR(req);
d7e09d03 793
34e3ff96 794 if (req && it && it->it_op & IT_CREAT)
d7e09d03 795 /* ask ptlrpc not to resend on EINPROGRESS since we have our own
1df232ee
OD
796 * retry logic
797 */
d7e09d03
PT
798 req->rq_no_retry_einprogress = 1;
799
800 if (resends) {
801 req->rq_generation_set = 1;
802 req->rq_import_generation = generation;
219e6de6 803 req->rq_sent = ktime_get_real_seconds() + resends;
d7e09d03
PT
804 }
805
806 /* It is important to obtain rpc_lock first (if applicable), so that
807 * threads that are serialised with rpc_lock are not polluting our
1df232ee
OD
808 * rpcs in flight counter. We do not do flock request limiting, though
809 */
d7e09d03
PT
810 if (it) {
811 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
1d5d5ec1 812 rc = obd_get_request_slot(&obddev->u.cli);
d7e09d03
PT
813 if (rc != 0) {
814 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
815 mdc_clear_replay_flag(req, 0);
816 ptlrpc_req_finished(req);
0a3bdb00 817 return rc;
d7e09d03
PT
818 }
819 }
820
821 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
822 0, lvb_type, lockh, 0);
823 if (!it) {
34ca8748 824 /* For flock requests we immediately return without further
1df232ee
OD
825 * delay and let caller deal with the rest, since rest of
826 * this function metadata processing makes no sense for flock
827 * requests anyway. But in case of problem during comms with
828 * Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
829 * can not rely on caller and this mainly for F_UNLCKs
830 * (explicits or automatically generated by Kernel to clean
831 * current FLocks upon exit) that can't be trashed
832 */
e9ada6fa
BF
833 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
834 (einfo->ei_type == LDLM_FLOCK) &&
835 (einfo->ei_mode == LCK_NL))
cd6b328c 836 goto resend;
0a3bdb00 837 return rc;
d7e09d03
PT
838 }
839
1d5d5ec1 840 obd_put_request_slot(&obddev->u.cli);
d7e09d03
PT
841 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
842
843 if (rc < 0) {
e49634bb
AD
844 CDEBUG_LIMIT((rc == -EACCES || rc == -EIDRM) ? D_INFO : D_ERROR,
845 "%s: ldlm_cli_enqueue failed: rc = %d\n",
846 obddev->obd_name, rc);
847
d7e09d03
PT
848 mdc_clear_replay_flag(req, rc);
849 ptlrpc_req_finished(req);
0a3bdb00 850 return rc;
d7e09d03
PT
851 }
852
853 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
d7e09d03 854
2d58de78
LW
855 lockrep->lock_policy_res2 =
856 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
857
d7e09d03 858 /* Retry the create infinitely when we get -EINPROGRESS from
1df232ee
OD
859 * server. This is required by the new quota design.
860 */
a3aa95f8 861 if (it->it_op & IT_CREAT &&
d7e09d03
PT
862 (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
863 mdc_clear_replay_flag(req, rc);
864 ptlrpc_req_finished(req);
865 resends++;
866
867 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
868 obddev->obd_name, resends, it->it_op,
869 PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
870
871 if (generation == obddev->u.cli.cl_import->imp_generation) {
872 goto resend;
873 } else {
874 CDEBUG(D_HA, "resend cross eviction\n");
0a3bdb00 875 return -EIO;
d7e09d03
PT
876 }
877 }
878
879 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
880 if (rc < 0) {
881 if (lustre_handle_is_used(lockh)) {
882 ldlm_lock_decref(lockh, einfo->ei_mode);
883 memset(lockh, 0, sizeof(*lockh));
884 }
885 ptlrpc_req_finished(req);
7591805a 886
e476f2e5
JH
887 it->it_lock_handle = 0;
888 it->it_lock_mode = 0;
8bf86fd9 889 it->it_request = NULL;
d7e09d03 890 }
7591805a 891
0a3bdb00 892 return rc;
d7e09d03
PT
893}
894
895static int mdc_finish_intent_lock(struct obd_export *exp,
896 struct ptlrpc_request *request,
897 struct md_op_data *op_data,
898 struct lookup_intent *it,
899 struct lustre_handle *lockh)
900{
901 struct lustre_handle old_lock;
902 struct mdt_body *mdt_body;
903 struct ldlm_lock *lock;
904 int rc;
d7e09d03 905
d7e09d03
PT
906 LASSERT(request != LP_POISON);
907 LASSERT(request->rq_repmsg != LP_POISON);
908
34a60457 909 if (it->it_op & IT_READDIR)
910 return 0;
911
d7e09d03
PT
912 if (!it_disposition(it, DISP_IT_EXECD)) {
913 /* The server failed before it even started executing the
1df232ee
OD
914 * intent, i.e. because it couldn't unpack the request.
915 */
e476f2e5
JH
916 LASSERT(it->it_status != 0);
917 return it->it_status;
d7e09d03
PT
918 }
919 rc = it_open_error(DISP_IT_EXECD, it);
920 if (rc)
0a3bdb00 921 return rc;
d7e09d03
PT
922
923 mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
34e3ff96 924 LASSERT(mdt_body); /* mdc_enqueue checked */
d7e09d03
PT
925
926 /* If we were revalidating a fid/name pair, mark the intent in
1df232ee
OD
927 * case we fail and get called again from lookup
928 */
d7e09d03
PT
929 if (fid_is_sane(&op_data->op_fid2) &&
930 it->it_create_mode & M_CHECK_STALE &&
931 it->it_op != IT_GETATTR) {
d7e09d03
PT
932 /* Also: did we find the same inode? */
933 /* server can return one of two fids:
934 * op_fid2 - new allocated fid - if file is created.
935 * op_fid3 - existent fid - if file only open.
1df232ee
OD
936 * op_fid3 is saved in lmv_intent_open
937 */
d7e09d03
PT
938 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
939 (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
940 CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
941 "\n", PFID(&op_data->op_fid2),
942 PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
0a3bdb00 943 return -ESTALE;
d7e09d03
PT
944 }
945 }
946
947 rc = it_open_error(DISP_LOOKUP_EXECD, it);
948 if (rc)
0a3bdb00 949 return rc;
d7e09d03
PT
950
951 /* keep requests around for the multiple phases of the call
952 * this shows the DISP_XX must guarantee we make it into the call
953 */
954 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
955 it_disposition(it, DISP_OPEN_CREATE) &&
956 !it_open_error(DISP_OPEN_CREATE, it)) {
957 it_set_disposition(it, DISP_ENQ_CREATE_REF);
958 ptlrpc_request_addref(request); /* balanced in ll_create_node */
959 }
960 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
961 it_disposition(it, DISP_OPEN_OPEN) &&
962 !it_open_error(DISP_OPEN_OPEN, it)) {
963 it_set_disposition(it, DISP_ENQ_OPEN_REF);
964 ptlrpc_request_addref(request); /* balanced in ll_file_open */
965 /* BUG 11546 - eviction in the middle of open rpc processing */
966 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
967 }
968
969 if (it->it_op & IT_CREAT) {
970 /* XXX this belongs in ll_create_it */
971 } else if (it->it_op == IT_OPEN) {
972 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
973 } else {
974 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
975 }
976
977 /* If we already have a matching lock, then cancel the new
978 * one. We have to set the data here instead of in
979 * mdc_enqueue, because we need to use the child's inode as
980 * the l_ast_data to match, and that's not available until
1df232ee
OD
981 * intent_finish has performed the iget().)
982 */
d7e09d03
PT
983 lock = ldlm_handle2lock(lockh);
984 if (lock) {
985 ldlm_policy_data_t policy = lock->l_policy_data;
7436d070 986
d7e09d03
PT
987 LDLM_DEBUG(lock, "matching against this");
988
989 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
990 &lock->l_resource->lr_name),
6d95e048
AD
991 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
992 PLDLMRES(lock->l_resource), PFID(&mdt_body->fid1));
d7e09d03
PT
993 LDLM_LOCK_PUT(lock);
994
995 memcpy(&old_lock, lockh, sizeof(*lockh));
996 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
e5e663ae
SM
997 LDLM_IBITS, &policy, LCK_NL,
998 &old_lock, 0)) {
d7e09d03 999 ldlm_lock_decref_and_cancel(lockh,
e476f2e5 1000 it->it_lock_mode);
d7e09d03 1001 memcpy(lockh, &old_lock, sizeof(old_lock));
e476f2e5 1002 it->it_lock_handle = lockh->cookie;
d7e09d03
PT
1003 }
1004 }
301af906
SM
1005 CDEBUG(D_DENTRY,
1006 "D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
d7e09d03 1007 op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
e476f2e5 1008 it->it_status, it->it_disposition, rc);
0a3bdb00 1009 return rc;
d7e09d03
PT
1010}
1011
1012int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1013 struct lu_fid *fid, __u64 *bits)
1014{
1015 /* We could just return 1 immediately, but since we should only
1016 * be called in revalidate_it if we already have a lock, let's
1df232ee
OD
1017 * verify that.
1018 */
d7e09d03
PT
1019 struct ldlm_res_id res_id;
1020 struct lustre_handle lockh;
1021 ldlm_policy_data_t policy;
52ee0d20 1022 enum ldlm_mode mode;
d7e09d03 1023
e476f2e5
JH
1024 if (it->it_lock_handle) {
1025 lockh.cookie = it->it_lock_handle;
d7e09d03
PT
1026 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1027 } else {
1028 fid_build_reg_res_name(fid, &res_id);
1029 switch (it->it_op) {
1030 case IT_GETATTR:
bf08ee0d
OD
1031 /* File attributes are held under multiple bits:
1032 * nlink is under lookup lock, size and times are
1033 * under UPDATE lock and recently we've also got
1034 * a separate permissions lock for owner/group/acl that
1035 * were protected by lookup lock before.
1036 * Getattr must provide all of that information,
1037 * so we need to ensure we have all of those locks.
1038 * Unfortunately, if the bits are split across multiple
1039 * locks, there's no easy way to match all of them here,
1040 * so an extra RPC would be performed to fetch all
1df232ee
OD
1041 * of those bits at once for now.
1042 */
fe4c58af 1043 /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1044 * but for old MDTs (< 2.4), permission is covered
1df232ee
OD
1045 * by LOOKUP lock, so it needs to match all bits here.
1046 */
bf08ee0d
OD
1047 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1048 MDS_INODELOCK_LOOKUP |
1049 MDS_INODELOCK_PERM;
d7e09d03 1050 break;
34a60457 1051 case IT_READDIR:
1052 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1053 break;
d7e09d03
PT
1054 case IT_LAYOUT:
1055 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1056 break;
1057 default:
1058 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1059 break;
1060 }
bf08ee0d 1061
6caea2f9 1062 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
22e0bc6a 1063 LDLM_IBITS, &policy,
6caea2f9
AL
1064 LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1065 &lockh);
d7e09d03
PT
1066 }
1067
1068 if (mode) {
e476f2e5
JH
1069 it->it_lock_handle = lockh.cookie;
1070 it->it_lock_mode = mode;
d7e09d03 1071 } else {
e476f2e5
JH
1072 it->it_lock_handle = 0;
1073 it->it_lock_mode = 0;
d7e09d03
PT
1074 }
1075
0a3bdb00 1076 return !!mode;
d7e09d03
PT
1077}
1078
1079/*
1080 * This long block is all about fixing up the lock and request state
1081 * so that it is correct as of the moment _before_ the operation was
1082 * applied; that way, the VFS will think that everything is normal and
1083 * call Lustre's regular VFS methods.
1084 *
1085 * If we're performing a creation, that means that unless the creation
1086 * failed with EEXIST, we should fake up a negative dentry.
1087 *
1088 * For everything else, we want to lookup to succeed.
1089 *
1090 * One additional note: if CREATE or OPEN succeeded, we add an extra
1091 * reference to the request because we need to keep it around until
1092 * ll_create/ll_open gets called.
1093 *
1094 * The server will return to us, in it_disposition, an indication of
e476f2e5 1095 * exactly what it_status refers to.
d7e09d03 1096 *
e476f2e5 1097 * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
d7e09d03
PT
1098 * otherwise if DISP_OPEN_CREATE is set, then it status is the
1099 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
1100 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1101 * was successful.
1102 *
e476f2e5 1103 * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
d7e09d03
PT
1104 * child lookup.
1105 */
1106int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1107 void *lmm, int lmmsize, struct lookup_intent *it,
1108 int lookup_flags, struct ptlrpc_request **reqp,
1109 ldlm_blocking_callback cb_blocking,
1110 __u64 extra_lock_flags)
1111{
f236f69b
LS
1112 struct ldlm_enqueue_info einfo = {
1113 .ei_type = LDLM_IBITS,
1114 .ei_mode = it_to_lock_mode(it),
1115 .ei_cb_bl = cb_blocking,
1116 .ei_cb_cp = ldlm_completion_ast,
1117 };
d7e09d03
PT
1118 struct lustre_handle lockh;
1119 int rc = 0;
29aaf496 1120
d7e09d03
PT
1121 LASSERT(it);
1122
1123 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
d3a8a4e2
JX
1124 ", intent: %s flags %#Lo\n", op_data->op_namelen,
1125 op_data->op_name, PFID(&op_data->op_fid2),
1126 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1127 it->it_flags);
d7e09d03
PT
1128
1129 lockh.cookie = 0;
1130 if (fid_is_sane(&op_data->op_fid2) &&
34a60457 1131 (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
d7e09d03
PT
1132 /* We could just return 1 immediately, but since we should only
1133 * be called in revalidate_it if we already have a lock, let's
1df232ee
OD
1134 * verify that.
1135 */
e476f2e5 1136 it->it_lock_handle = 0;
d7e09d03
PT
1137 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1138 /* Only return failure if it was not GETATTR by cfid
1df232ee
OD
1139 * (from inode_revalidate)
1140 */
d7e09d03 1141 if (rc || op_data->op_namelen != 0)
0a3bdb00 1142 return rc;
d7e09d03
PT
1143 }
1144
f236f69b
LS
1145 /* For case if upper layer did not alloc fid, do it now. */
1146 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
8f18c8a4 1147 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
f236f69b
LS
1148 if (rc < 0) {
1149 CERROR("Can't alloc new fid, rc %d\n", rc);
0a3bdb00 1150 return rc;
f236f69b 1151 }
d7e09d03 1152 }
f236f69b
LS
1153 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh, lmm, lmmsize, NULL,
1154 extra_lock_flags);
1155 if (rc < 0)
1156 return rc;
1157
8bf86fd9 1158 *reqp = it->it_request;
d7e09d03 1159 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
0a3bdb00 1160 return rc;
d7e09d03
PT
1161}
1162
1163static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1164 struct ptlrpc_request *req,
1165 void *args, int rc)
1166{
1167 struct mdc_getattr_args *ga = args;
1168 struct obd_export *exp = ga->ga_exp;
1169 struct md_enqueue_info *minfo = ga->ga_minfo;
1170 struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1171 struct lookup_intent *it;
1172 struct lustre_handle *lockh;
1173 struct obd_device *obddev;
2d58de78 1174 struct ldlm_reply *lockrep;
d7e09d03 1175 __u64 flags = LDLM_FL_HAS_INTENT;
d7e09d03
PT
1176
1177 it = &minfo->mi_it;
1178 lockh = &minfo->mi_lockh;
1179
1180 obddev = class_exp2obd(exp);
1181
1d5d5ec1 1182 obd_put_request_slot(&obddev->u.cli);
d7e09d03
PT
1183 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1184 rc = -ETIMEDOUT;
1185
1186 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1187 &flags, NULL, 0, lockh, rc);
1188 if (rc < 0) {
1189 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1190 mdc_clear_replay_flag(req, rc);
d5fdc207 1191 goto out;
d7e09d03
PT
1192 }
1193
2d58de78 1194 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2d58de78
LW
1195
1196 lockrep->lock_policy_res2 =
1197 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1198
d7e09d03
PT
1199 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1200 if (rc)
d5fdc207 1201 goto out;
d7e09d03
PT
1202
1203 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
d7e09d03
PT
1204
1205out:
7b81779d 1206 kfree(einfo);
d7e09d03
PT
1207 minfo->mi_cb(req, minfo, rc);
1208 return 0;
1209}
1210
1211int mdc_intent_getattr_async(struct obd_export *exp,
1212 struct md_enqueue_info *minfo,
1213 struct ldlm_enqueue_info *einfo)
1214{
1215 struct md_op_data *op_data = &minfo->mi_data;
1216 struct lookup_intent *it = &minfo->mi_it;
1217 struct ptlrpc_request *req;
1218 struct mdc_getattr_args *ga;
1219 struct obd_device *obddev = class_exp2obd(exp);
1220 struct ldlm_res_id res_id;
1221 /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1222 * for statahead currently. Consider CMD in future, such two bits
1df232ee
OD
1223 * maybe managed by different MDS, should be adjusted then.
1224 */
d7e09d03
PT
1225 ldlm_policy_data_t policy = {
1226 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1227 MDS_INODELOCK_UPDATE }
1228 };
1229 int rc = 0;
1230 __u64 flags = LDLM_FL_HAS_INTENT;
d7e09d03 1231
d3a8a4e2 1232 CDEBUG(D_DLMTRACE,
22e0bc6a
OD
1233 "name: %.*s in inode " DFID ", intent: %s flags %#Lo\n",
1234 op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1235 ldlm_it2str(it->it_op), it->it_flags);
d7e09d03
PT
1236
1237 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1238 req = mdc_intent_getattr_pack(exp, it, op_data);
add882a8
JH
1239 if (IS_ERR(req))
1240 return PTR_ERR(req);
d7e09d03 1241
1d5d5ec1 1242 rc = obd_get_request_slot(&obddev->u.cli);
d7e09d03
PT
1243 if (rc != 0) {
1244 ptlrpc_req_finished(req);
0a3bdb00 1245 return rc;
d7e09d03
PT
1246 }
1247
1248 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1249 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1250 if (rc < 0) {
1d5d5ec1 1251 obd_put_request_slot(&obddev->u.cli);
d7e09d03 1252 ptlrpc_req_finished(req);
0a3bdb00 1253 return rc;
d7e09d03
PT
1254 }
1255
1256 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1257 ga = ptlrpc_req_async_args(req);
1258 ga->ga_exp = exp;
1259 ga->ga_minfo = minfo;
1260 ga->ga_einfo = einfo;
1261
1262 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
c5c4c6fa 1263 ptlrpcd_add_req(req);
d7e09d03 1264
0a3bdb00 1265 return 0;
d7e09d03 1266}