4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2015, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
33 #define DEBUG_SUBSYSTEM S_MDC
35 # include <linux/module.h>
37 #include "../include/lustre_intent.h"
38 #include "../include/obd.h"
39 #include "../include/obd_class.h"
40 #include "../include/lustre_dlm.h"
41 #include "../include/lustre_fid.h"
42 #include "../include/lustre_mdc.h"
43 #include "../include/lustre_net.h"
44 #include "../include/lustre_req_layout.h"
45 #include "../include/lustre_swab.h"
47 #include "mdc_internal.h"
/*
 * Per-request context for an asynchronous intent-getattr RPC; stored in
 * the request's async-args area and consumed by the interpret callback.
 * NOTE(review): terminators/braces lost in extraction were restored.
 */
struct mdc_getattr_args {
	struct obd_export	*ga_exp;	/* export the RPC was sent on */
	struct md_enqueue_info	*ga_minfo;	/* caller's enqueue info + callback */
	struct ldlm_enqueue_info *ga_einfo;	/* DLM enqueue parameters (freed by interpret) */
};
55 int it_open_error(int phase
, struct lookup_intent
*it
)
57 if (it_disposition(it
, DISP_OPEN_LEASE
)) {
58 if (phase
>= DISP_OPEN_LEASE
)
63 if (it_disposition(it
, DISP_OPEN_OPEN
)) {
64 if (phase
>= DISP_OPEN_OPEN
)
70 if (it_disposition(it
, DISP_OPEN_CREATE
)) {
71 if (phase
>= DISP_OPEN_CREATE
)
77 if (it_disposition(it
, DISP_LOOKUP_EXECD
)) {
78 if (phase
>= DISP_LOOKUP_EXECD
)
84 if (it_disposition(it
, DISP_IT_EXECD
)) {
85 if (phase
>= DISP_IT_EXECD
)
90 CERROR("it disp: %X, status: %d\n", it
->it_disposition
,
95 EXPORT_SYMBOL(it_open_error
);
97 /* this must be called on a lockh that is known to have a referenced lock */
98 int mdc_set_lock_data(struct obd_export
*exp
, const struct lustre_handle
*lockh
,
99 void *data
, __u64
*bits
)
101 struct ldlm_lock
*lock
;
102 struct inode
*new_inode
= data
;
107 if (!lustre_handle_is_used(lockh
))
110 lock
= ldlm_handle2lock(lockh
);
113 lock_res_and_lock(lock
);
114 if (lock
->l_resource
->lr_lvb_inode
&&
115 lock
->l_resource
->lr_lvb_inode
!= data
) {
116 struct inode
*old_inode
= lock
->l_resource
->lr_lvb_inode
;
118 LASSERTF(old_inode
->i_state
& I_FREEING
,
119 "Found existing inode %p/%lu/%u state %lu in lock: setting data to %p/%lu/%u\n",
120 old_inode
, old_inode
->i_ino
, old_inode
->i_generation
,
121 old_inode
->i_state
, new_inode
, new_inode
->i_ino
,
122 new_inode
->i_generation
);
124 lock
->l_resource
->lr_lvb_inode
= new_inode
;
126 *bits
= lock
->l_policy_data
.l_inodebits
.bits
;
128 unlock_res_and_lock(lock
);
134 enum ldlm_mode
mdc_lock_match(struct obd_export
*exp
, __u64 flags
,
135 const struct lu_fid
*fid
, enum ldlm_type type
,
136 union ldlm_policy_data
*policy
,
138 struct lustre_handle
*lockh
)
140 struct ldlm_res_id res_id
;
143 fid_build_reg_res_name(fid
, &res_id
);
144 /* LU-4405: Clear bits not supported by server */
145 policy
->l_inodebits
.bits
&= exp_connect_ibits(exp
);
146 rc
= ldlm_lock_match(class_exp2obd(exp
)->obd_namespace
, flags
,
147 &res_id
, type
, policy
, mode
, lockh
, 0);
151 int mdc_cancel_unused(struct obd_export
*exp
,
152 const struct lu_fid
*fid
,
153 union ldlm_policy_data
*policy
,
155 enum ldlm_cancel_flags flags
,
158 struct ldlm_res_id res_id
;
159 struct obd_device
*obd
= class_exp2obd(exp
);
162 fid_build_reg_res_name(fid
, &res_id
);
163 rc
= ldlm_cli_cancel_unused_resource(obd
->obd_namespace
, &res_id
,
164 policy
, mode
, flags
, opaque
);
168 int mdc_null_inode(struct obd_export
*exp
,
169 const struct lu_fid
*fid
)
171 struct ldlm_res_id res_id
;
172 struct ldlm_resource
*res
;
173 struct ldlm_namespace
*ns
= class_exp2obd(exp
)->obd_namespace
;
175 LASSERTF(ns
, "no namespace passed\n");
177 fid_build_reg_res_name(fid
, &res_id
);
179 res
= ldlm_resource_get(ns
, NULL
, &res_id
, 0, 0);
184 res
->lr_lvb_inode
= NULL
;
187 ldlm_resource_putref(res
);
191 static inline void mdc_clear_replay_flag(struct ptlrpc_request
*req
, int rc
)
193 /* Don't hold error requests for replay. */
194 if (req
->rq_replay
) {
195 spin_lock(&req
->rq_lock
);
197 spin_unlock(&req
->rq_lock
);
199 if (rc
&& req
->rq_transno
!= 0) {
200 DEBUG_REQ(D_ERROR
, req
, "transno returned on error rc %d", rc
);
205 /* Save a large LOV EA into the request buffer so that it is available
206 * for replay. We don't do this in the initial request because the
207 * original request doesn't need this buffer (at most it sends just the
208 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
209 * buffer and may also be difficult to allocate and save a very large
210 * request buffer for each open. (bug 5707)
212 * OOM here may cause recovery failure if lmm is needed (only for the
213 * original open if the MDS crashed just when this client also OOM'd)
214 * but this is incredibly unlikely, and questionable whether the client
215 * could do MDS recovery under OOM anyways...
217 static void mdc_realloc_openmsg(struct ptlrpc_request
*req
,
218 struct mdt_body
*body
)
222 /* FIXME: remove this explicit offset. */
223 rc
= sptlrpc_cli_enlarge_reqbuf(req
, DLM_INTENT_REC_OFF
+ 4,
224 body
->mbo_eadatasize
);
226 CERROR("Can't enlarge segment %d size to %d\n",
227 DLM_INTENT_REC_OFF
+ 4, body
->mbo_eadatasize
);
228 body
->mbo_valid
&= ~OBD_MD_FLEASIZE
;
229 body
->mbo_eadatasize
= 0;
233 static struct ptlrpc_request
*
234 mdc_intent_open_pack(struct obd_export
*exp
, struct lookup_intent
*it
,
235 struct md_op_data
*op_data
)
237 struct ptlrpc_request
*req
;
238 struct obd_device
*obddev
= class_exp2obd(exp
);
239 struct ldlm_intent
*lit
;
240 const void *lmm
= op_data
->op_data
;
241 u32 lmmsize
= op_data
->op_data_size
;
247 it
->it_create_mode
= (it
->it_create_mode
& ~S_IFMT
) | S_IFREG
;
249 /* XXX: openlock is not cancelled for cross-refs. */
250 /* If inode is known, cancel conflicting OPEN locks. */
251 if (fid_is_sane(&op_data
->op_fid2
)) {
252 if (it
->it_flags
& MDS_OPEN_LEASE
) { /* try to get lease */
253 if (it
->it_flags
& FMODE_WRITE
)
258 if (it
->it_flags
& (FMODE_WRITE
| MDS_OPEN_TRUNC
))
260 else if (it
->it_flags
& __FMODE_EXEC
)
265 count
= mdc_resource_get_unused(exp
, &op_data
->op_fid2
,
270 /* If CREATE, cancel parent's UPDATE lock. */
271 if (it
->it_op
& IT_CREAT
)
275 count
+= mdc_resource_get_unused(exp
, &op_data
->op_fid1
,
277 MDS_INODELOCK_UPDATE
);
279 req
= ptlrpc_request_alloc(class_exp2cliimp(exp
),
280 &RQF_LDLM_INTENT_OPEN
);
282 ldlm_lock_list_put(&cancels
, l_bl_ast
, count
);
283 return ERR_PTR(-ENOMEM
);
286 req_capsule_set_size(&req
->rq_pill
, &RMF_NAME
, RCL_CLIENT
,
287 op_data
->op_namelen
+ 1);
288 req_capsule_set_size(&req
->rq_pill
, &RMF_EADATA
, RCL_CLIENT
,
289 max(lmmsize
, obddev
->u
.cli
.cl_default_mds_easize
));
291 rc
= ldlm_prep_enqueue_req(exp
, req
, &cancels
, count
);
293 ptlrpc_request_free(req
);
297 spin_lock(&req
->rq_lock
);
298 req
->rq_replay
= req
->rq_import
->imp_replayable
;
299 spin_unlock(&req
->rq_lock
);
301 /* pack the intent */
302 lit
= req_capsule_client_get(&req
->rq_pill
, &RMF_LDLM_INTENT
);
303 lit
->opc
= (__u64
)it
->it_op
;
305 /* pack the intended request */
306 mdc_open_pack(req
, op_data
, it
->it_create_mode
, 0, it
->it_flags
, lmm
,
309 req_capsule_set_size(&req
->rq_pill
, &RMF_MDT_MD
, RCL_SERVER
,
310 obddev
->u
.cli
.cl_max_mds_easize
);
312 ptlrpc_request_set_replen(req
);
316 static struct ptlrpc_request
*
317 mdc_intent_getxattr_pack(struct obd_export
*exp
,
318 struct lookup_intent
*it
,
319 struct md_op_data
*op_data
)
321 struct ptlrpc_request
*req
;
322 struct ldlm_intent
*lit
;
327 req
= ptlrpc_request_alloc(class_exp2cliimp(exp
),
328 &RQF_LDLM_INTENT_GETXATTR
);
330 return ERR_PTR(-ENOMEM
);
332 rc
= ldlm_prep_enqueue_req(exp
, req
, &cancels
, count
);
334 ptlrpc_request_free(req
);
338 /* pack the intent */
339 lit
= req_capsule_client_get(&req
->rq_pill
, &RMF_LDLM_INTENT
);
340 lit
->opc
= IT_GETXATTR
;
342 maxdata
= class_exp2cliimp(exp
)->imp_connect_data
.ocd_max_easize
;
344 /* pack the intended request */
345 mdc_pack_body(req
, &op_data
->op_fid1
, op_data
->op_valid
, maxdata
, -1,
348 req_capsule_set_size(&req
->rq_pill
, &RMF_EADATA
, RCL_SERVER
, maxdata
);
350 req_capsule_set_size(&req
->rq_pill
, &RMF_EAVALS
, RCL_SERVER
, maxdata
);
352 req_capsule_set_size(&req
->rq_pill
, &RMF_EAVALS_LENS
,
353 RCL_SERVER
, maxdata
);
355 ptlrpc_request_set_replen(req
);
360 static struct ptlrpc_request
*mdc_intent_unlink_pack(struct obd_export
*exp
,
361 struct lookup_intent
*it
,
362 struct md_op_data
*op_data
)
364 struct ptlrpc_request
*req
;
365 struct obd_device
*obddev
= class_exp2obd(exp
);
366 struct ldlm_intent
*lit
;
369 req
= ptlrpc_request_alloc(class_exp2cliimp(exp
),
370 &RQF_LDLM_INTENT_UNLINK
);
372 return ERR_PTR(-ENOMEM
);
374 req_capsule_set_size(&req
->rq_pill
, &RMF_NAME
, RCL_CLIENT
,
375 op_data
->op_namelen
+ 1);
377 rc
= ldlm_prep_enqueue_req(exp
, req
, NULL
, 0);
379 ptlrpc_request_free(req
);
383 /* pack the intent */
384 lit
= req_capsule_client_get(&req
->rq_pill
, &RMF_LDLM_INTENT
);
385 lit
->opc
= (__u64
)it
->it_op
;
387 /* pack the intended request */
388 mdc_unlink_pack(req
, op_data
);
390 req_capsule_set_size(&req
->rq_pill
, &RMF_MDT_MD
, RCL_SERVER
,
391 obddev
->u
.cli
.cl_default_mds_easize
);
392 ptlrpc_request_set_replen(req
);
396 static struct ptlrpc_request
*mdc_intent_getattr_pack(struct obd_export
*exp
,
397 struct lookup_intent
*it
,
398 struct md_op_data
*op_data
)
400 struct ptlrpc_request
*req
;
401 struct obd_device
*obddev
= class_exp2obd(exp
);
402 u64 valid
= OBD_MD_FLGETATTR
| OBD_MD_FLEASIZE
|
403 OBD_MD_FLMODEASIZE
| OBD_MD_FLDIREA
|
404 OBD_MD_MEA
| OBD_MD_FLACL
;
405 struct ldlm_intent
*lit
;
409 req
= ptlrpc_request_alloc(class_exp2cliimp(exp
),
410 &RQF_LDLM_INTENT_GETATTR
);
412 return ERR_PTR(-ENOMEM
);
414 req_capsule_set_size(&req
->rq_pill
, &RMF_NAME
, RCL_CLIENT
,
415 op_data
->op_namelen
+ 1);
417 rc
= ldlm_prep_enqueue_req(exp
, req
, NULL
, 0);
419 ptlrpc_request_free(req
);
423 /* pack the intent */
424 lit
= req_capsule_client_get(&req
->rq_pill
, &RMF_LDLM_INTENT
);
425 lit
->opc
= (__u64
)it
->it_op
;
427 if (obddev
->u
.cli
.cl_default_mds_easize
> 0)
428 easize
= obddev
->u
.cli
.cl_default_mds_easize
;
430 easize
= obddev
->u
.cli
.cl_max_mds_easize
;
432 /* pack the intended request */
433 mdc_getattr_pack(req
, valid
, it
->it_flags
, op_data
, easize
);
435 req_capsule_set_size(&req
->rq_pill
, &RMF_MDT_MD
, RCL_SERVER
, easize
);
436 ptlrpc_request_set_replen(req
);
440 static struct ptlrpc_request
*mdc_intent_layout_pack(struct obd_export
*exp
,
441 struct lookup_intent
*it
,
442 struct md_op_data
*unused
)
444 struct obd_device
*obd
= class_exp2obd(exp
);
445 struct ptlrpc_request
*req
;
446 struct ldlm_intent
*lit
;
447 struct layout_intent
*layout
;
450 req
= ptlrpc_request_alloc(class_exp2cliimp(exp
),
451 &RQF_LDLM_INTENT_LAYOUT
);
453 return ERR_PTR(-ENOMEM
);
455 req_capsule_set_size(&req
->rq_pill
, &RMF_EADATA
, RCL_CLIENT
, 0);
456 rc
= ldlm_prep_enqueue_req(exp
, req
, NULL
, 0);
458 ptlrpc_request_free(req
);
462 /* pack the intent */
463 lit
= req_capsule_client_get(&req
->rq_pill
, &RMF_LDLM_INTENT
);
464 lit
->opc
= (__u64
)it
->it_op
;
466 /* pack the layout intent request */
467 layout
= req_capsule_client_get(&req
->rq_pill
, &RMF_LAYOUT_INTENT
);
468 /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
469 * set for replication
471 layout
->li_opc
= LAYOUT_INTENT_ACCESS
;
473 req_capsule_set_size(&req
->rq_pill
, &RMF_DLM_LVB
, RCL_SERVER
,
474 obd
->u
.cli
.cl_default_mds_easize
);
475 ptlrpc_request_set_replen(req
);
479 static struct ptlrpc_request
*
480 mdc_enqueue_pack(struct obd_export
*exp
, int lvb_len
)
482 struct ptlrpc_request
*req
;
485 req
= ptlrpc_request_alloc(class_exp2cliimp(exp
), &RQF_LDLM_ENQUEUE
);
487 return ERR_PTR(-ENOMEM
);
489 rc
= ldlm_prep_enqueue_req(exp
, req
, NULL
, 0);
491 ptlrpc_request_free(req
);
495 req_capsule_set_size(&req
->rq_pill
, &RMF_DLM_LVB
, RCL_SERVER
, lvb_len
);
496 ptlrpc_request_set_replen(req
);
500 static int mdc_finish_enqueue(struct obd_export
*exp
,
501 struct ptlrpc_request
*req
,
502 struct ldlm_enqueue_info
*einfo
,
503 struct lookup_intent
*it
,
504 struct lustre_handle
*lockh
,
507 struct req_capsule
*pill
= &req
->rq_pill
;
508 struct ldlm_request
*lockreq
;
509 struct ldlm_reply
*lockrep
;
510 struct ldlm_lock
*lock
;
511 void *lvb_data
= NULL
;
515 /* Similarly, if we're going to replay this request, we don't want to
516 * actually get a lock, just perform the intent.
518 if (req
->rq_transno
|| req
->rq_replay
) {
519 lockreq
= req_capsule_client_get(pill
, &RMF_DLM_REQ
);
520 lockreq
->lock_flags
|= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY
);
523 if (rc
== ELDLM_LOCK_ABORTED
) {
525 memset(lockh
, 0, sizeof(*lockh
));
527 } else { /* rc = 0 */
528 lock
= ldlm_handle2lock(lockh
);
530 /* If the server gave us back a different lock mode, we should
531 * fix up our variables.
533 if (lock
->l_req_mode
!= einfo
->ei_mode
) {
534 ldlm_lock_addref(lockh
, lock
->l_req_mode
);
535 ldlm_lock_decref(lockh
, einfo
->ei_mode
);
536 einfo
->ei_mode
= lock
->l_req_mode
;
541 lockrep
= req_capsule_server_get(pill
, &RMF_DLM_REP
);
543 it
->it_disposition
= (int)lockrep
->lock_policy_res1
;
544 it
->it_status
= (int)lockrep
->lock_policy_res2
;
545 it
->it_lock_mode
= einfo
->ei_mode
;
546 it
->it_lock_handle
= lockh
->cookie
;
547 it
->it_request
= req
;
549 /* Technically speaking rq_transno must already be zero if
550 * it_status is in error, so the check is a bit redundant
552 if ((!req
->rq_transno
|| it
->it_status
< 0) && req
->rq_replay
)
553 mdc_clear_replay_flag(req
, it
->it_status
);
555 /* If we're doing an IT_OPEN which did not result in an actual
556 * successful open, then we need to remove the bit which saves
557 * this request for unconditional replay.
559 * It's important that we do this first! Otherwise we might exit the
560 * function without doing so, and try to replay a failed create
563 if (it
->it_op
& IT_OPEN
&& req
->rq_replay
&&
564 (!it_disposition(it
, DISP_OPEN_OPEN
) || it
->it_status
!= 0))
565 mdc_clear_replay_flag(req
, it
->it_status
);
567 DEBUG_REQ(D_RPCTRACE
, req
, "op: %d disposition: %x, status: %d",
568 it
->it_op
, it
->it_disposition
, it
->it_status
);
570 /* We know what to expect, so we do any byte flipping required here */
571 if (it
->it_op
& (IT_OPEN
| IT_UNLINK
| IT_LOOKUP
| IT_GETATTR
)) {
572 struct mdt_body
*body
;
574 body
= req_capsule_server_get(pill
, &RMF_MDT_BODY
);
576 CERROR("Can't swab mdt_body\n");
580 if (it_disposition(it
, DISP_OPEN_OPEN
) &&
581 !it_open_error(DISP_OPEN_OPEN
, it
)) {
583 * If this is a successful OPEN request, we need to set
584 * replay handler and data early, so that if replay
585 * happens immediately after swabbing below, new reply
586 * is swabbed by that handler correctly.
588 mdc_set_open_replay_data(NULL
, NULL
, it
);
591 if ((body
->mbo_valid
& (OBD_MD_FLDIREA
| OBD_MD_FLEASIZE
)) != 0) {
594 mdc_update_max_ea_from_body(exp
, body
);
597 * The eadata is opaque; just check that it is there.
598 * Eventually, obd_unpackmd() will check the contents.
600 eadata
= req_capsule_server_sized_get(pill
, &RMF_MDT_MD
,
601 body
->mbo_eadatasize
);
605 /* save lvb data and length in case this is for layout
609 lvb_len
= body
->mbo_eadatasize
;
612 * We save the reply LOV EA in case we have to replay a
613 * create for recovery. If we didn't allocate a large
614 * enough request buffer above we need to reallocate it
615 * here to hold the actual LOV EA.
617 * To not save LOV EA if request is not going to replay
618 * (for example error one).
620 if ((it
->it_op
& IT_OPEN
) && req
->rq_replay
) {
623 if (req_capsule_get_size(pill
, &RMF_EADATA
,
625 body
->mbo_eadatasize
)
626 mdc_realloc_openmsg(req
, body
);
628 req_capsule_shrink(pill
, &RMF_EADATA
,
629 body
->mbo_eadatasize
,
632 req_capsule_set_size(pill
, &RMF_EADATA
,
634 body
->mbo_eadatasize
);
636 lmm
= req_capsule_client_get(pill
, &RMF_EADATA
);
638 memcpy(lmm
, eadata
, body
->mbo_eadatasize
);
641 } else if (it
->it_op
& IT_LAYOUT
) {
642 /* maybe the lock was granted right away and layout
643 * is packed into RMF_DLM_LVB of req
645 lvb_len
= req_capsule_get_size(pill
, &RMF_DLM_LVB
, RCL_SERVER
);
647 lvb_data
= req_capsule_server_sized_get(pill
,
655 /* fill in stripe data for layout lock */
656 lock
= ldlm_handle2lock(lockh
);
657 if (lock
&& ldlm_has_layout(lock
) && lvb_data
) {
660 LDLM_DEBUG(lock
, "layout lock returned by: %s, lvb_len: %d",
661 ldlm_it2str(it
->it_op
), lvb_len
);
663 lmm
= libcfs_kvzalloc(lvb_len
, GFP_NOFS
);
668 memcpy(lmm
, lvb_data
, lvb_len
);
670 /* install lvb_data */
671 lock_res_and_lock(lock
);
672 if (!lock
->l_lvb_data
) {
673 lock
->l_lvb_type
= LVB_T_LAYOUT
;
674 lock
->l_lvb_data
= lmm
;
675 lock
->l_lvb_len
= lvb_len
;
678 unlock_res_and_lock(lock
);
688 /* We always reserve enough space in the reply packet for a stripe MD, because
689 * we don't know in advance the file type.
691 int mdc_enqueue(struct obd_export
*exp
, struct ldlm_enqueue_info
*einfo
,
692 const union ldlm_policy_data
*policy
,
693 struct lookup_intent
*it
, struct md_op_data
*op_data
,
694 struct lustre_handle
*lockh
, u64 extra_lock_flags
)
696 static const union ldlm_policy_data lookup_policy
= {
697 .l_inodebits
= { MDS_INODELOCK_LOOKUP
}
699 static const union ldlm_policy_data update_policy
= {
700 .l_inodebits
= { MDS_INODELOCK_UPDATE
}
702 static const union ldlm_policy_data layout_policy
= {
703 .l_inodebits
= { MDS_INODELOCK_LAYOUT
}
705 static const union ldlm_policy_data getxattr_policy
= {
706 .l_inodebits
= { MDS_INODELOCK_XATTR
}
708 struct obd_device
*obddev
= class_exp2obd(exp
);
709 struct ptlrpc_request
*req
= NULL
;
710 u64 flags
, saved_flags
= extra_lock_flags
;
711 struct ldlm_res_id res_id
;
712 int generation
, resends
= 0;
713 struct ldlm_reply
*lockrep
;
714 enum lvb_type lvb_type
= LVB_T_NONE
;
717 LASSERTF(!it
|| einfo
->ei_type
== LDLM_IBITS
, "lock type %d\n",
719 fid_build_reg_res_name(&op_data
->op_fid1
, &res_id
);
724 saved_flags
|= LDLM_FL_HAS_INTENT
;
725 if (it
->it_op
& (IT_OPEN
| IT_UNLINK
| IT_GETATTR
| IT_READDIR
))
726 policy
= &update_policy
;
727 else if (it
->it_op
& IT_LAYOUT
)
728 policy
= &layout_policy
;
729 else if (it
->it_op
& (IT_GETXATTR
| IT_SETXATTR
))
730 policy
= &getxattr_policy
;
732 policy
= &lookup_policy
;
735 generation
= obddev
->u
.cli
.cl_import
->imp_generation
;
739 /* The only way right now is FLOCK. */
740 LASSERTF(einfo
->ei_type
== LDLM_FLOCK
, "lock type %d\n",
742 res_id
.name
[3] = LDLM_FLOCK
;
743 } else if (it
->it_op
& IT_OPEN
) {
744 req
= mdc_intent_open_pack(exp
, it
, op_data
);
745 } else if (it
->it_op
& IT_UNLINK
) {
746 req
= mdc_intent_unlink_pack(exp
, it
, op_data
);
747 } else if (it
->it_op
& (IT_GETATTR
| IT_LOOKUP
)) {
748 req
= mdc_intent_getattr_pack(exp
, it
, op_data
);
749 } else if (it
->it_op
& IT_READDIR
) {
750 req
= mdc_enqueue_pack(exp
, 0);
751 } else if (it
->it_op
& IT_LAYOUT
) {
752 if (!imp_connect_lvb_type(class_exp2cliimp(exp
)))
754 req
= mdc_intent_layout_pack(exp
, it
, op_data
);
755 lvb_type
= LVB_T_LAYOUT
;
756 } else if (it
->it_op
& IT_GETXATTR
) {
757 req
= mdc_intent_getxattr_pack(exp
, it
, op_data
);
767 req
->rq_generation_set
= 1;
768 req
->rq_import_generation
= generation
;
769 req
->rq_sent
= ktime_get_real_seconds() + resends
;
772 /* It is important to obtain modify RPC slot first (if applicable), so
773 * that threads that are waiting for a modify RPC slot are not polluting
774 * our rpcs in flight counter.
775 * We do not do flock request limiting, though
778 mdc_get_mod_rpc_slot(req
, it
);
779 rc
= obd_get_request_slot(&obddev
->u
.cli
);
781 mdc_put_mod_rpc_slot(req
, it
);
782 mdc_clear_replay_flag(req
, 0);
783 ptlrpc_req_finished(req
);
788 rc
= ldlm_cli_enqueue(exp
, &req
, einfo
, &res_id
, policy
, &flags
, NULL
,
789 0, lvb_type
, lockh
, 0);
791 /* For flock requests we immediately return without further
792 * delay and let caller deal with the rest, since rest of
793 * this function metadata processing makes no sense for flock
794 * requests anyway. But in case of problem during comms with
795 * Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
796 * can not rely on caller and this mainly for F_UNLCKs
797 * (explicits or automatically generated by Kernel to clean
798 * current FLocks upon exit) that can't be trashed
800 if (((rc
== -EINTR
) || (rc
== -ETIMEDOUT
)) &&
801 (einfo
->ei_type
== LDLM_FLOCK
) &&
802 (einfo
->ei_mode
== LCK_NL
))
807 obd_put_request_slot(&obddev
->u
.cli
);
808 mdc_put_mod_rpc_slot(req
, it
);
811 CDEBUG(D_INFO
, "%s: ldlm_cli_enqueue failed: rc = %d\n",
812 obddev
->obd_name
, rc
);
814 mdc_clear_replay_flag(req
, rc
);
815 ptlrpc_req_finished(req
);
819 lockrep
= req_capsule_server_get(&req
->rq_pill
, &RMF_DLM_REP
);
821 lockrep
->lock_policy_res2
=
822 ptlrpc_status_ntoh(lockrep
->lock_policy_res2
);
825 * Retry infinitely when the server returns -EINPROGRESS for the
826 * intent operation, when server returns -EINPROGRESS for acquiring
827 * intent lock, we'll retry in after_reply().
829 if (it
->it_op
&& (int)lockrep
->lock_policy_res2
== -EINPROGRESS
) {
830 mdc_clear_replay_flag(req
, rc
);
831 ptlrpc_req_finished(req
);
834 CDEBUG(D_HA
, "%s: resend:%d op:%d "DFID
"/"DFID
"\n",
835 obddev
->obd_name
, resends
, it
->it_op
,
836 PFID(&op_data
->op_fid1
), PFID(&op_data
->op_fid2
));
838 if (generation
== obddev
->u
.cli
.cl_import
->imp_generation
) {
841 CDEBUG(D_HA
, "resend cross eviction\n");
846 rc
= mdc_finish_enqueue(exp
, req
, einfo
, it
, lockh
, rc
);
848 if (lustre_handle_is_used(lockh
)) {
849 ldlm_lock_decref(lockh
, einfo
->ei_mode
);
850 memset(lockh
, 0, sizeof(*lockh
));
852 ptlrpc_req_finished(req
);
854 it
->it_lock_handle
= 0;
855 it
->it_lock_mode
= 0;
856 it
->it_request
= NULL
;
862 static int mdc_finish_intent_lock(struct obd_export
*exp
,
863 struct ptlrpc_request
*request
,
864 struct md_op_data
*op_data
,
865 struct lookup_intent
*it
,
866 struct lustre_handle
*lockh
)
868 struct lustre_handle old_lock
;
869 struct mdt_body
*mdt_body
;
870 struct ldlm_lock
*lock
;
873 LASSERT(request
!= LP_POISON
);
874 LASSERT(request
->rq_repmsg
!= LP_POISON
);
876 if (it
->it_op
& IT_READDIR
)
879 if (!it_disposition(it
, DISP_IT_EXECD
)) {
880 /* The server failed before it even started executing the
881 * intent, i.e. because it couldn't unpack the request.
883 LASSERT(it
->it_status
!= 0);
884 return it
->it_status
;
886 rc
= it_open_error(DISP_IT_EXECD
, it
);
890 mdt_body
= req_capsule_server_get(&request
->rq_pill
, &RMF_MDT_BODY
);
891 LASSERT(mdt_body
); /* mdc_enqueue checked */
893 rc
= it_open_error(DISP_LOOKUP_EXECD
, it
);
897 /* keep requests around for the multiple phases of the call
898 * this shows the DISP_XX must guarantee we make it into the call
900 if (!it_disposition(it
, DISP_ENQ_CREATE_REF
) &&
901 it_disposition(it
, DISP_OPEN_CREATE
) &&
902 !it_open_error(DISP_OPEN_CREATE
, it
)) {
903 it_set_disposition(it
, DISP_ENQ_CREATE_REF
);
904 ptlrpc_request_addref(request
); /* balanced in ll_create_node */
906 if (!it_disposition(it
, DISP_ENQ_OPEN_REF
) &&
907 it_disposition(it
, DISP_OPEN_OPEN
) &&
908 !it_open_error(DISP_OPEN_OPEN
, it
)) {
909 it_set_disposition(it
, DISP_ENQ_OPEN_REF
);
910 ptlrpc_request_addref(request
); /* balanced in ll_file_open */
911 /* BUG 11546 - eviction in the middle of open rpc processing */
912 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE
, obd_timeout
);
915 if (it
->it_op
& IT_CREAT
) {
916 /* XXX this belongs in ll_create_it */
917 } else if (it
->it_op
== IT_OPEN
) {
918 LASSERT(!it_disposition(it
, DISP_OPEN_CREATE
));
920 LASSERT(it
->it_op
& (IT_GETATTR
| IT_LOOKUP
| IT_LAYOUT
));
923 /* If we already have a matching lock, then cancel the new
924 * one. We have to set the data here instead of in
925 * mdc_enqueue, because we need to use the child's inode as
926 * the l_ast_data to match, and that's not available until
927 * intent_finish has performed the iget().)
929 lock
= ldlm_handle2lock(lockh
);
931 union ldlm_policy_data policy
= lock
->l_policy_data
;
933 LDLM_DEBUG(lock
, "matching against this");
935 LASSERTF(fid_res_name_eq(&mdt_body
->mbo_fid1
,
936 &lock
->l_resource
->lr_name
),
937 "Lock res_id: "DLDLMRES
", fid: "DFID
"\n",
938 PLDLMRES(lock
->l_resource
), PFID(&mdt_body
->mbo_fid1
));
941 memcpy(&old_lock
, lockh
, sizeof(*lockh
));
942 if (ldlm_lock_match(NULL
, LDLM_FL_BLOCK_GRANTED
, NULL
,
943 LDLM_IBITS
, &policy
, LCK_NL
,
945 ldlm_lock_decref_and_cancel(lockh
,
947 memcpy(lockh
, &old_lock
, sizeof(old_lock
));
948 it
->it_lock_handle
= lockh
->cookie
;
952 "D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
953 (int)op_data
->op_namelen
, op_data
->op_name
,
954 ldlm_it2str(it
->it_op
), it
->it_status
, it
->it_disposition
, rc
);
958 int mdc_revalidate_lock(struct obd_export
*exp
, struct lookup_intent
*it
,
959 struct lu_fid
*fid
, __u64
*bits
)
961 /* We could just return 1 immediately, but since we should only
962 * be called in revalidate_it if we already have a lock, let's
965 struct ldlm_res_id res_id
;
966 struct lustre_handle lockh
;
967 union ldlm_policy_data policy
;
970 if (it
->it_lock_handle
) {
971 lockh
.cookie
= it
->it_lock_handle
;
972 mode
= ldlm_revalidate_lock_handle(&lockh
, bits
);
974 fid_build_reg_res_name(fid
, &res_id
);
977 /* File attributes are held under multiple bits:
978 * nlink is under lookup lock, size and times are
979 * under UPDATE lock and recently we've also got
980 * a separate permissions lock for owner/group/acl that
981 * were protected by lookup lock before.
982 * Getattr must provide all of that information,
983 * so we need to ensure we have all of those locks.
984 * Unfortunately, if the bits are split across multiple
985 * locks, there's no easy way to match all of them here,
986 * so an extra RPC would be performed to fetch all
987 * of those bits at once for now.
989 /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
990 * but for old MDTs (< 2.4), permission is covered
991 * by LOOKUP lock, so it needs to match all bits here.
993 policy
.l_inodebits
.bits
= MDS_INODELOCK_UPDATE
|
994 MDS_INODELOCK_LOOKUP
|
998 policy
.l_inodebits
.bits
= MDS_INODELOCK_UPDATE
;
1001 policy
.l_inodebits
.bits
= MDS_INODELOCK_LAYOUT
;
1004 policy
.l_inodebits
.bits
= MDS_INODELOCK_LOOKUP
;
1008 mode
= mdc_lock_match(exp
, LDLM_FL_BLOCK_GRANTED
, fid
,
1009 LDLM_IBITS
, &policy
,
1010 LCK_CR
| LCK_CW
| LCK_PR
| LCK_PW
,
1015 it
->it_lock_handle
= lockh
.cookie
;
1016 it
->it_lock_mode
= mode
;
1018 it
->it_lock_handle
= 0;
1019 it
->it_lock_mode
= 0;
1026 * This long block is all about fixing up the lock and request state
1027 * so that it is correct as of the moment _before_ the operation was
1028 * applied; that way, the VFS will think that everything is normal and
1029 * call Lustre's regular VFS methods.
1031 * If we're performing a creation, that means that unless the creation
1032 * failed with EEXIST, we should fake up a negative dentry.
1034 * For everything else, we want to lookup to succeed.
1036 * One additional note: if CREATE or OPEN succeeded, we add an extra
1037 * reference to the request because we need to keep it around until
1038 * ll_create/ll_open gets called.
1040 * The server will return to us, in it_disposition, an indication of
1041 * exactly what it_status refers to.
1043 * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
1044 * otherwise if DISP_OPEN_CREATE is set, then it status is the
1045 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
1046 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1049 * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
1052 int mdc_intent_lock(struct obd_export
*exp
, struct md_op_data
*op_data
,
1053 struct lookup_intent
*it
, struct ptlrpc_request
**reqp
,
1054 ldlm_blocking_callback cb_blocking
, __u64 extra_lock_flags
)
1056 struct ldlm_enqueue_info einfo
= {
1057 .ei_type
= LDLM_IBITS
,
1058 .ei_mode
= it_to_lock_mode(it
),
1059 .ei_cb_bl
= cb_blocking
,
1060 .ei_cb_cp
= ldlm_completion_ast
,
1062 struct lustre_handle lockh
;
1067 CDEBUG(D_DLMTRACE
, "(name: %.*s,"DFID
") in obj "DFID
1068 ", intent: %s flags %#Lo\n", (int)op_data
->op_namelen
,
1069 op_data
->op_name
, PFID(&op_data
->op_fid2
),
1070 PFID(&op_data
->op_fid1
), ldlm_it2str(it
->it_op
),
1074 if (fid_is_sane(&op_data
->op_fid2
) &&
1075 (it
->it_op
& (IT_LOOKUP
| IT_GETATTR
| IT_READDIR
))) {
1076 /* We could just return 1 immediately, but since we should only
1077 * be called in revalidate_it if we already have a lock, let's
1080 it
->it_lock_handle
= 0;
1081 rc
= mdc_revalidate_lock(exp
, it
, &op_data
->op_fid2
, NULL
);
1082 /* Only return failure if it was not GETATTR by cfid
1083 * (from inode_revalidate)
1085 if (rc
|| op_data
->op_namelen
!= 0)
1089 /* For case if upper layer did not alloc fid, do it now. */
1090 if (!fid_is_sane(&op_data
->op_fid2
) && it
->it_op
& IT_CREAT
) {
1091 rc
= mdc_fid_alloc(NULL
, exp
, &op_data
->op_fid2
, op_data
);
1093 CERROR("Can't alloc new fid, rc %d\n", rc
);
1097 rc
= mdc_enqueue(exp
, &einfo
, NULL
, it
, op_data
, &lockh
,
1102 *reqp
= it
->it_request
;
1103 rc
= mdc_finish_intent_lock(exp
, *reqp
, op_data
, it
, &lockh
);
1107 static int mdc_intent_getattr_async_interpret(const struct lu_env
*env
,
1108 struct ptlrpc_request
*req
,
1111 struct mdc_getattr_args
*ga
= args
;
1112 struct obd_export
*exp
= ga
->ga_exp
;
1113 struct md_enqueue_info
*minfo
= ga
->ga_minfo
;
1114 struct ldlm_enqueue_info
*einfo
= ga
->ga_einfo
;
1115 struct lookup_intent
*it
;
1116 struct lustre_handle
*lockh
;
1117 struct obd_device
*obddev
;
1118 struct ldlm_reply
*lockrep
;
1119 __u64 flags
= LDLM_FL_HAS_INTENT
;
1122 lockh
= &minfo
->mi_lockh
;
1124 obddev
= class_exp2obd(exp
);
1126 obd_put_request_slot(&obddev
->u
.cli
);
1127 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE
))
1130 rc
= ldlm_cli_enqueue_fini(exp
, req
, einfo
->ei_type
, 1, einfo
->ei_mode
,
1131 &flags
, NULL
, 0, lockh
, rc
);
1133 CERROR("ldlm_cli_enqueue_fini: %d\n", rc
);
1134 mdc_clear_replay_flag(req
, rc
);
1138 lockrep
= req_capsule_server_get(&req
->rq_pill
, &RMF_DLM_REP
);
1140 lockrep
->lock_policy_res2
=
1141 ptlrpc_status_ntoh(lockrep
->lock_policy_res2
);
1143 rc
= mdc_finish_enqueue(exp
, req
, einfo
, it
, lockh
, rc
);
1147 rc
= mdc_finish_intent_lock(exp
, req
, &minfo
->mi_data
, it
, lockh
);
1151 minfo
->mi_cb(req
, minfo
, rc
);
1155 int mdc_intent_getattr_async(struct obd_export
*exp
,
1156 struct md_enqueue_info
*minfo
,
1157 struct ldlm_enqueue_info
*einfo
)
1159 struct md_op_data
*op_data
= &minfo
->mi_data
;
1160 struct lookup_intent
*it
= &minfo
->mi_it
;
1161 struct ptlrpc_request
*req
;
1162 struct mdc_getattr_args
*ga
;
1163 struct obd_device
*obddev
= class_exp2obd(exp
);
1164 struct ldlm_res_id res_id
;
1165 /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1166 * for statahead currently. Consider CMD in future, such two bits
1167 * maybe managed by different MDS, should be adjusted then.
1169 union ldlm_policy_data policy
= {
1170 .l_inodebits
= { MDS_INODELOCK_LOOKUP
| MDS_INODELOCK_UPDATE
}
1173 __u64 flags
= LDLM_FL_HAS_INTENT
;
1176 "name: %.*s in inode " DFID
", intent: %s flags %#Lo\n",
1177 (int)op_data
->op_namelen
, op_data
->op_name
,
1178 PFID(&op_data
->op_fid1
), ldlm_it2str(it
->it_op
), it
->it_flags
);
1180 fid_build_reg_res_name(&op_data
->op_fid1
, &res_id
);
1181 req
= mdc_intent_getattr_pack(exp
, it
, op_data
);
1183 return PTR_ERR(req
);
1185 rc
= obd_get_request_slot(&obddev
->u
.cli
);
1187 ptlrpc_req_finished(req
);
1191 rc
= ldlm_cli_enqueue(exp
, &req
, einfo
, &res_id
, &policy
, &flags
, NULL
,
1192 0, LVB_T_NONE
, &minfo
->mi_lockh
, 1);
1194 obd_put_request_slot(&obddev
->u
.cli
);
1195 ptlrpc_req_finished(req
);
1199 CLASSERT(sizeof(*ga
) <= sizeof(req
->rq_async_args
));
1200 ga
= ptlrpc_req_async_args(req
);
1202 ga
->ga_minfo
= minfo
;
1203 ga
->ga_einfo
= einfo
;
1205 req
->rq_interpret_reply
= mdc_intent_getattr_async_interpret
;
1206 ptlrpcd_add_req(req
);