4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2015, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
33 #define DEBUG_SUBSYSTEM S_MDC
35 # include <linux/module.h>
37 #include "../include/lustre_intent.h"
38 #include "../include/obd.h"
39 #include "../include/obd_class.h"
40 #include "../include/lustre_dlm.h"
41 #include "../include/lustre_fid.h"
42 #include "../include/lustre_mdc.h"
43 #include "../include/lustre_net.h"
44 #include "../include/lustre_req_layout.h"
45 #include "../include/lustre_swab.h"
47 #include "mdc_internal.h"
/* Context carried across an async getattr enqueue into its interpret
 * callback (see mdc_intent_getattr_async{,_interpret}).  Must fit in
 * ptlrpc_request::rq_async_args.
 */
struct mdc_getattr_args {
	struct obd_export	*ga_exp;	/* export the RPC was sent on */
	struct md_enqueue_info	*ga_minfo;	/* caller's enqueue info + callback */
};
54 int it_open_error(int phase
, struct lookup_intent
*it
)
56 if (it_disposition(it
, DISP_OPEN_LEASE
)) {
57 if (phase
>= DISP_OPEN_LEASE
)
62 if (it_disposition(it
, DISP_OPEN_OPEN
)) {
63 if (phase
>= DISP_OPEN_OPEN
)
69 if (it_disposition(it
, DISP_OPEN_CREATE
)) {
70 if (phase
>= DISP_OPEN_CREATE
)
76 if (it_disposition(it
, DISP_LOOKUP_EXECD
)) {
77 if (phase
>= DISP_LOOKUP_EXECD
)
83 if (it_disposition(it
, DISP_IT_EXECD
)) {
84 if (phase
>= DISP_IT_EXECD
)
89 CERROR("it disp: %X, status: %d\n", it
->it_disposition
,
94 EXPORT_SYMBOL(it_open_error
);
96 /* this must be called on a lockh that is known to have a referenced lock */
97 int mdc_set_lock_data(struct obd_export
*exp
, const struct lustre_handle
*lockh
,
98 void *data
, __u64
*bits
)
100 struct ldlm_lock
*lock
;
101 struct inode
*new_inode
= data
;
106 if (!lustre_handle_is_used(lockh
))
109 lock
= ldlm_handle2lock(lockh
);
112 lock_res_and_lock(lock
);
113 if (lock
->l_resource
->lr_lvb_inode
&&
114 lock
->l_resource
->lr_lvb_inode
!= data
) {
115 struct inode
*old_inode
= lock
->l_resource
->lr_lvb_inode
;
117 LASSERTF(old_inode
->i_state
& I_FREEING
,
118 "Found existing inode %p/%lu/%u state %lu in lock: setting data to %p/%lu/%u\n",
119 old_inode
, old_inode
->i_ino
, old_inode
->i_generation
,
120 old_inode
->i_state
, new_inode
, new_inode
->i_ino
,
121 new_inode
->i_generation
);
123 lock
->l_resource
->lr_lvb_inode
= new_inode
;
125 *bits
= lock
->l_policy_data
.l_inodebits
.bits
;
127 unlock_res_and_lock(lock
);
133 enum ldlm_mode
mdc_lock_match(struct obd_export
*exp
, __u64 flags
,
134 const struct lu_fid
*fid
, enum ldlm_type type
,
135 union ldlm_policy_data
*policy
,
137 struct lustre_handle
*lockh
)
139 struct ldlm_res_id res_id
;
142 fid_build_reg_res_name(fid
, &res_id
);
143 /* LU-4405: Clear bits not supported by server */
144 policy
->l_inodebits
.bits
&= exp_connect_ibits(exp
);
145 rc
= ldlm_lock_match(class_exp2obd(exp
)->obd_namespace
, flags
,
146 &res_id
, type
, policy
, mode
, lockh
, 0);
150 int mdc_cancel_unused(struct obd_export
*exp
,
151 const struct lu_fid
*fid
,
152 union ldlm_policy_data
*policy
,
154 enum ldlm_cancel_flags flags
,
157 struct ldlm_res_id res_id
;
158 struct obd_device
*obd
= class_exp2obd(exp
);
161 fid_build_reg_res_name(fid
, &res_id
);
162 rc
= ldlm_cli_cancel_unused_resource(obd
->obd_namespace
, &res_id
,
163 policy
, mode
, flags
, opaque
);
167 int mdc_null_inode(struct obd_export
*exp
,
168 const struct lu_fid
*fid
)
170 struct ldlm_res_id res_id
;
171 struct ldlm_resource
*res
;
172 struct ldlm_namespace
*ns
= class_exp2obd(exp
)->obd_namespace
;
174 LASSERTF(ns
, "no namespace passed\n");
176 fid_build_reg_res_name(fid
, &res_id
);
178 res
= ldlm_resource_get(ns
, NULL
, &res_id
, 0, 0);
183 res
->lr_lvb_inode
= NULL
;
186 ldlm_resource_putref(res
);
190 static inline void mdc_clear_replay_flag(struct ptlrpc_request
*req
, int rc
)
192 /* Don't hold error requests for replay. */
193 if (req
->rq_replay
) {
194 spin_lock(&req
->rq_lock
);
196 spin_unlock(&req
->rq_lock
);
198 if (rc
&& req
->rq_transno
!= 0) {
199 DEBUG_REQ(D_ERROR
, req
, "transno returned on error rc %d", rc
);
204 /* Save a large LOV EA into the request buffer so that it is available
205 * for replay. We don't do this in the initial request because the
206 * original request doesn't need this buffer (at most it sends just the
207 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
208 * buffer and may also be difficult to allocate and save a very large
209 * request buffer for each open. (bug 5707)
211 * OOM here may cause recovery failure if lmm is needed (only for the
212 * original open if the MDS crashed just when this client also OOM'd)
213 * but this is incredibly unlikely, and questionable whether the client
214 * could do MDS recovery under OOM anyways...
216 static void mdc_realloc_openmsg(struct ptlrpc_request
*req
,
217 struct mdt_body
*body
)
221 /* FIXME: remove this explicit offset. */
222 rc
= sptlrpc_cli_enlarge_reqbuf(req
, DLM_INTENT_REC_OFF
+ 4,
223 body
->mbo_eadatasize
);
225 CERROR("Can't enlarge segment %d size to %d\n",
226 DLM_INTENT_REC_OFF
+ 4, body
->mbo_eadatasize
);
227 body
->mbo_valid
&= ~OBD_MD_FLEASIZE
;
228 body
->mbo_eadatasize
= 0;
232 static struct ptlrpc_request
*
233 mdc_intent_open_pack(struct obd_export
*exp
, struct lookup_intent
*it
,
234 struct md_op_data
*op_data
)
236 struct ptlrpc_request
*req
;
237 struct obd_device
*obddev
= class_exp2obd(exp
);
238 struct ldlm_intent
*lit
;
239 const void *lmm
= op_data
->op_data
;
240 u32 lmmsize
= op_data
->op_data_size
;
246 it
->it_create_mode
= (it
->it_create_mode
& ~S_IFMT
) | S_IFREG
;
248 /* XXX: openlock is not cancelled for cross-refs. */
249 /* If inode is known, cancel conflicting OPEN locks. */
250 if (fid_is_sane(&op_data
->op_fid2
)) {
251 if (it
->it_flags
& MDS_OPEN_LEASE
) { /* try to get lease */
252 if (it
->it_flags
& FMODE_WRITE
)
257 if (it
->it_flags
& (FMODE_WRITE
| MDS_OPEN_TRUNC
))
259 else if (it
->it_flags
& __FMODE_EXEC
)
264 count
= mdc_resource_get_unused(exp
, &op_data
->op_fid2
,
269 /* If CREATE, cancel parent's UPDATE lock. */
270 if (it
->it_op
& IT_CREAT
)
274 count
+= mdc_resource_get_unused(exp
, &op_data
->op_fid1
,
276 MDS_INODELOCK_UPDATE
);
278 req
= ptlrpc_request_alloc(class_exp2cliimp(exp
),
279 &RQF_LDLM_INTENT_OPEN
);
281 ldlm_lock_list_put(&cancels
, l_bl_ast
, count
);
282 return ERR_PTR(-ENOMEM
);
285 req_capsule_set_size(&req
->rq_pill
, &RMF_NAME
, RCL_CLIENT
,
286 op_data
->op_namelen
+ 1);
287 req_capsule_set_size(&req
->rq_pill
, &RMF_EADATA
, RCL_CLIENT
,
288 max(lmmsize
, obddev
->u
.cli
.cl_default_mds_easize
));
290 rc
= ldlm_prep_enqueue_req(exp
, req
, &cancels
, count
);
292 ptlrpc_request_free(req
);
296 spin_lock(&req
->rq_lock
);
297 req
->rq_replay
= req
->rq_import
->imp_replayable
;
298 spin_unlock(&req
->rq_lock
);
300 /* pack the intent */
301 lit
= req_capsule_client_get(&req
->rq_pill
, &RMF_LDLM_INTENT
);
302 lit
->opc
= (__u64
)it
->it_op
;
304 /* pack the intended request */
305 mdc_open_pack(req
, op_data
, it
->it_create_mode
, 0, it
->it_flags
, lmm
,
308 req_capsule_set_size(&req
->rq_pill
, &RMF_MDT_MD
, RCL_SERVER
,
309 obddev
->u
.cli
.cl_max_mds_easize
);
311 ptlrpc_request_set_replen(req
);
315 static struct ptlrpc_request
*
316 mdc_intent_getxattr_pack(struct obd_export
*exp
,
317 struct lookup_intent
*it
,
318 struct md_op_data
*op_data
)
320 struct ptlrpc_request
*req
;
321 struct ldlm_intent
*lit
;
326 req
= ptlrpc_request_alloc(class_exp2cliimp(exp
),
327 &RQF_LDLM_INTENT_GETXATTR
);
329 return ERR_PTR(-ENOMEM
);
331 rc
= ldlm_prep_enqueue_req(exp
, req
, &cancels
, count
);
333 ptlrpc_request_free(req
);
337 /* pack the intent */
338 lit
= req_capsule_client_get(&req
->rq_pill
, &RMF_LDLM_INTENT
);
339 lit
->opc
= IT_GETXATTR
;
341 maxdata
= class_exp2cliimp(exp
)->imp_connect_data
.ocd_max_easize
;
343 /* pack the intended request */
344 mdc_pack_body(req
, &op_data
->op_fid1
, op_data
->op_valid
, maxdata
, -1,
347 req_capsule_set_size(&req
->rq_pill
, &RMF_EADATA
, RCL_SERVER
, maxdata
);
349 req_capsule_set_size(&req
->rq_pill
, &RMF_EAVALS
, RCL_SERVER
, maxdata
);
351 req_capsule_set_size(&req
->rq_pill
, &RMF_EAVALS_LENS
,
352 RCL_SERVER
, maxdata
);
354 ptlrpc_request_set_replen(req
);
359 static struct ptlrpc_request
*mdc_intent_unlink_pack(struct obd_export
*exp
,
360 struct lookup_intent
*it
,
361 struct md_op_data
*op_data
)
363 struct ptlrpc_request
*req
;
364 struct obd_device
*obddev
= class_exp2obd(exp
);
365 struct ldlm_intent
*lit
;
368 req
= ptlrpc_request_alloc(class_exp2cliimp(exp
),
369 &RQF_LDLM_INTENT_UNLINK
);
371 return ERR_PTR(-ENOMEM
);
373 req_capsule_set_size(&req
->rq_pill
, &RMF_NAME
, RCL_CLIENT
,
374 op_data
->op_namelen
+ 1);
376 rc
= ldlm_prep_enqueue_req(exp
, req
, NULL
, 0);
378 ptlrpc_request_free(req
);
382 /* pack the intent */
383 lit
= req_capsule_client_get(&req
->rq_pill
, &RMF_LDLM_INTENT
);
384 lit
->opc
= (__u64
)it
->it_op
;
386 /* pack the intended request */
387 mdc_unlink_pack(req
, op_data
);
389 req_capsule_set_size(&req
->rq_pill
, &RMF_MDT_MD
, RCL_SERVER
,
390 obddev
->u
.cli
.cl_default_mds_easize
);
391 ptlrpc_request_set_replen(req
);
395 static struct ptlrpc_request
*mdc_intent_getattr_pack(struct obd_export
*exp
,
396 struct lookup_intent
*it
,
397 struct md_op_data
*op_data
)
399 struct ptlrpc_request
*req
;
400 struct obd_device
*obddev
= class_exp2obd(exp
);
401 u64 valid
= OBD_MD_FLGETATTR
| OBD_MD_FLEASIZE
|
402 OBD_MD_FLMODEASIZE
| OBD_MD_FLDIREA
|
403 OBD_MD_MEA
| OBD_MD_FLACL
;
404 struct ldlm_intent
*lit
;
408 req
= ptlrpc_request_alloc(class_exp2cliimp(exp
),
409 &RQF_LDLM_INTENT_GETATTR
);
411 return ERR_PTR(-ENOMEM
);
413 req_capsule_set_size(&req
->rq_pill
, &RMF_NAME
, RCL_CLIENT
,
414 op_data
->op_namelen
+ 1);
416 rc
= ldlm_prep_enqueue_req(exp
, req
, NULL
, 0);
418 ptlrpc_request_free(req
);
422 /* pack the intent */
423 lit
= req_capsule_client_get(&req
->rq_pill
, &RMF_LDLM_INTENT
);
424 lit
->opc
= (__u64
)it
->it_op
;
426 if (obddev
->u
.cli
.cl_default_mds_easize
> 0)
427 easize
= obddev
->u
.cli
.cl_default_mds_easize
;
429 easize
= obddev
->u
.cli
.cl_max_mds_easize
;
431 /* pack the intended request */
432 mdc_getattr_pack(req
, valid
, it
->it_flags
, op_data
, easize
);
434 req_capsule_set_size(&req
->rq_pill
, &RMF_MDT_MD
, RCL_SERVER
, easize
);
435 ptlrpc_request_set_replen(req
);
439 static struct ptlrpc_request
*mdc_intent_layout_pack(struct obd_export
*exp
,
440 struct lookup_intent
*it
,
441 struct md_op_data
*unused
)
443 struct obd_device
*obd
= class_exp2obd(exp
);
444 struct ptlrpc_request
*req
;
445 struct ldlm_intent
*lit
;
446 struct layout_intent
*layout
;
449 req
= ptlrpc_request_alloc(class_exp2cliimp(exp
),
450 &RQF_LDLM_INTENT_LAYOUT
);
452 return ERR_PTR(-ENOMEM
);
454 req_capsule_set_size(&req
->rq_pill
, &RMF_EADATA
, RCL_CLIENT
, 0);
455 rc
= ldlm_prep_enqueue_req(exp
, req
, NULL
, 0);
457 ptlrpc_request_free(req
);
461 /* pack the intent */
462 lit
= req_capsule_client_get(&req
->rq_pill
, &RMF_LDLM_INTENT
);
463 lit
->opc
= (__u64
)it
->it_op
;
465 /* pack the layout intent request */
466 layout
= req_capsule_client_get(&req
->rq_pill
, &RMF_LAYOUT_INTENT
);
467 /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
468 * set for replication
470 layout
->li_opc
= LAYOUT_INTENT_ACCESS
;
472 req_capsule_set_size(&req
->rq_pill
, &RMF_DLM_LVB
, RCL_SERVER
,
473 obd
->u
.cli
.cl_default_mds_easize
);
474 ptlrpc_request_set_replen(req
);
478 static struct ptlrpc_request
*
479 mdc_enqueue_pack(struct obd_export
*exp
, int lvb_len
)
481 struct ptlrpc_request
*req
;
484 req
= ptlrpc_request_alloc(class_exp2cliimp(exp
), &RQF_LDLM_ENQUEUE
);
486 return ERR_PTR(-ENOMEM
);
488 rc
= ldlm_prep_enqueue_req(exp
, req
, NULL
, 0);
490 ptlrpc_request_free(req
);
494 req_capsule_set_size(&req
->rq_pill
, &RMF_DLM_LVB
, RCL_SERVER
, lvb_len
);
495 ptlrpc_request_set_replen(req
);
499 static int mdc_finish_enqueue(struct obd_export
*exp
,
500 struct ptlrpc_request
*req
,
501 struct ldlm_enqueue_info
*einfo
,
502 struct lookup_intent
*it
,
503 struct lustre_handle
*lockh
,
506 struct req_capsule
*pill
= &req
->rq_pill
;
507 struct ldlm_request
*lockreq
;
508 struct ldlm_reply
*lockrep
;
509 struct ldlm_lock
*lock
;
510 void *lvb_data
= NULL
;
514 /* Similarly, if we're going to replay this request, we don't want to
515 * actually get a lock, just perform the intent.
517 if (req
->rq_transno
|| req
->rq_replay
) {
518 lockreq
= req_capsule_client_get(pill
, &RMF_DLM_REQ
);
519 lockreq
->lock_flags
|= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY
);
522 if (rc
== ELDLM_LOCK_ABORTED
) {
524 memset(lockh
, 0, sizeof(*lockh
));
526 } else { /* rc = 0 */
527 lock
= ldlm_handle2lock(lockh
);
529 /* If the server gave us back a different lock mode, we should
530 * fix up our variables.
532 if (lock
->l_req_mode
!= einfo
->ei_mode
) {
533 ldlm_lock_addref(lockh
, lock
->l_req_mode
);
534 ldlm_lock_decref(lockh
, einfo
->ei_mode
);
535 einfo
->ei_mode
= lock
->l_req_mode
;
540 lockrep
= req_capsule_server_get(pill
, &RMF_DLM_REP
);
542 it
->it_disposition
= (int)lockrep
->lock_policy_res1
;
543 it
->it_status
= (int)lockrep
->lock_policy_res2
;
544 it
->it_lock_mode
= einfo
->ei_mode
;
545 it
->it_lock_handle
= lockh
->cookie
;
546 it
->it_request
= req
;
548 /* Technically speaking rq_transno must already be zero if
549 * it_status is in error, so the check is a bit redundant
551 if ((!req
->rq_transno
|| it
->it_status
< 0) && req
->rq_replay
)
552 mdc_clear_replay_flag(req
, it
->it_status
);
554 /* If we're doing an IT_OPEN which did not result in an actual
555 * successful open, then we need to remove the bit which saves
556 * this request for unconditional replay.
558 * It's important that we do this first! Otherwise we might exit the
559 * function without doing so, and try to replay a failed create
562 if (it
->it_op
& IT_OPEN
&& req
->rq_replay
&&
563 (!it_disposition(it
, DISP_OPEN_OPEN
) || it
->it_status
!= 0))
564 mdc_clear_replay_flag(req
, it
->it_status
);
566 DEBUG_REQ(D_RPCTRACE
, req
, "op: %d disposition: %x, status: %d",
567 it
->it_op
, it
->it_disposition
, it
->it_status
);
569 /* We know what to expect, so we do any byte flipping required here */
570 if (it
->it_op
& (IT_OPEN
| IT_UNLINK
| IT_LOOKUP
| IT_GETATTR
)) {
571 struct mdt_body
*body
;
573 body
= req_capsule_server_get(pill
, &RMF_MDT_BODY
);
575 CERROR("Can't swab mdt_body\n");
579 if (it_disposition(it
, DISP_OPEN_OPEN
) &&
580 !it_open_error(DISP_OPEN_OPEN
, it
)) {
582 * If this is a successful OPEN request, we need to set
583 * replay handler and data early, so that if replay
584 * happens immediately after swabbing below, new reply
585 * is swabbed by that handler correctly.
587 mdc_set_open_replay_data(NULL
, NULL
, it
);
590 if ((body
->mbo_valid
& (OBD_MD_FLDIREA
| OBD_MD_FLEASIZE
)) != 0) {
593 mdc_update_max_ea_from_body(exp
, body
);
596 * The eadata is opaque; just check that it is there.
597 * Eventually, obd_unpackmd() will check the contents.
599 eadata
= req_capsule_server_sized_get(pill
, &RMF_MDT_MD
,
600 body
->mbo_eadatasize
);
604 /* save lvb data and length in case this is for layout
608 lvb_len
= body
->mbo_eadatasize
;
611 * We save the reply LOV EA in case we have to replay a
612 * create for recovery. If we didn't allocate a large
613 * enough request buffer above we need to reallocate it
614 * here to hold the actual LOV EA.
616 * To not save LOV EA if request is not going to replay
617 * (for example error one).
619 if ((it
->it_op
& IT_OPEN
) && req
->rq_replay
) {
622 if (req_capsule_get_size(pill
, &RMF_EADATA
,
624 body
->mbo_eadatasize
)
625 mdc_realloc_openmsg(req
, body
);
627 req_capsule_shrink(pill
, &RMF_EADATA
,
628 body
->mbo_eadatasize
,
631 req_capsule_set_size(pill
, &RMF_EADATA
,
633 body
->mbo_eadatasize
);
635 lmm
= req_capsule_client_get(pill
, &RMF_EADATA
);
637 memcpy(lmm
, eadata
, body
->mbo_eadatasize
);
640 } else if (it
->it_op
& IT_LAYOUT
) {
641 /* maybe the lock was granted right away and layout
642 * is packed into RMF_DLM_LVB of req
644 lvb_len
= req_capsule_get_size(pill
, &RMF_DLM_LVB
, RCL_SERVER
);
646 lvb_data
= req_capsule_server_sized_get(pill
,
654 /* fill in stripe data for layout lock */
655 lock
= ldlm_handle2lock(lockh
);
656 if (lock
&& ldlm_has_layout(lock
) && lvb_data
) {
659 LDLM_DEBUG(lock
, "layout lock returned by: %s, lvb_len: %d",
660 ldlm_it2str(it
->it_op
), lvb_len
);
662 lmm
= libcfs_kvzalloc(lvb_len
, GFP_NOFS
);
667 memcpy(lmm
, lvb_data
, lvb_len
);
669 /* install lvb_data */
670 lock_res_and_lock(lock
);
671 if (!lock
->l_lvb_data
) {
672 lock
->l_lvb_type
= LVB_T_LAYOUT
;
673 lock
->l_lvb_data
= lmm
;
674 lock
->l_lvb_len
= lvb_len
;
677 unlock_res_and_lock(lock
);
687 /* We always reserve enough space in the reply packet for a stripe MD, because
688 * we don't know in advance the file type.
690 int mdc_enqueue(struct obd_export
*exp
, struct ldlm_enqueue_info
*einfo
,
691 const union ldlm_policy_data
*policy
,
692 struct lookup_intent
*it
, struct md_op_data
*op_data
,
693 struct lustre_handle
*lockh
, u64 extra_lock_flags
)
695 static const union ldlm_policy_data lookup_policy
= {
696 .l_inodebits
= { MDS_INODELOCK_LOOKUP
}
698 static const union ldlm_policy_data update_policy
= {
699 .l_inodebits
= { MDS_INODELOCK_UPDATE
}
701 static const union ldlm_policy_data layout_policy
= {
702 .l_inodebits
= { MDS_INODELOCK_LAYOUT
}
704 static const union ldlm_policy_data getxattr_policy
= {
705 .l_inodebits
= { MDS_INODELOCK_XATTR
}
707 struct obd_device
*obddev
= class_exp2obd(exp
);
708 struct ptlrpc_request
*req
= NULL
;
709 u64 flags
, saved_flags
= extra_lock_flags
;
710 struct ldlm_res_id res_id
;
711 int generation
, resends
= 0;
712 struct ldlm_reply
*lockrep
;
713 enum lvb_type lvb_type
= LVB_T_NONE
;
716 LASSERTF(!it
|| einfo
->ei_type
== LDLM_IBITS
, "lock type %d\n",
718 fid_build_reg_res_name(&op_data
->op_fid1
, &res_id
);
723 saved_flags
|= LDLM_FL_HAS_INTENT
;
724 if (it
->it_op
& (IT_UNLINK
| IT_GETATTR
| IT_READDIR
))
725 policy
= &update_policy
;
726 else if (it
->it_op
& IT_LAYOUT
)
727 policy
= &layout_policy
;
728 else if (it
->it_op
& (IT_GETXATTR
| IT_SETXATTR
))
729 policy
= &getxattr_policy
;
731 policy
= &lookup_policy
;
734 generation
= obddev
->u
.cli
.cl_import
->imp_generation
;
738 /* The only way right now is FLOCK. */
739 LASSERTF(einfo
->ei_type
== LDLM_FLOCK
, "lock type %d\n",
741 res_id
.name
[3] = LDLM_FLOCK
;
742 } else if (it
->it_op
& IT_OPEN
) {
743 req
= mdc_intent_open_pack(exp
, it
, op_data
);
744 } else if (it
->it_op
& IT_UNLINK
) {
745 req
= mdc_intent_unlink_pack(exp
, it
, op_data
);
746 } else if (it
->it_op
& (IT_GETATTR
| IT_LOOKUP
)) {
747 req
= mdc_intent_getattr_pack(exp
, it
, op_data
);
748 } else if (it
->it_op
& IT_READDIR
) {
749 req
= mdc_enqueue_pack(exp
, 0);
750 } else if (it
->it_op
& IT_LAYOUT
) {
751 if (!imp_connect_lvb_type(class_exp2cliimp(exp
)))
753 req
= mdc_intent_layout_pack(exp
, it
, op_data
);
754 lvb_type
= LVB_T_LAYOUT
;
755 } else if (it
->it_op
& IT_GETXATTR
) {
756 req
= mdc_intent_getxattr_pack(exp
, it
, op_data
);
766 req
->rq_generation_set
= 1;
767 req
->rq_import_generation
= generation
;
768 req
->rq_sent
= ktime_get_real_seconds() + resends
;
771 /* It is important to obtain modify RPC slot first (if applicable), so
772 * that threads that are waiting for a modify RPC slot are not polluting
773 * our rpcs in flight counter.
774 * We do not do flock request limiting, though
777 mdc_get_mod_rpc_slot(req
, it
);
778 rc
= obd_get_request_slot(&obddev
->u
.cli
);
780 mdc_put_mod_rpc_slot(req
, it
);
781 mdc_clear_replay_flag(req
, 0);
782 ptlrpc_req_finished(req
);
787 rc
= ldlm_cli_enqueue(exp
, &req
, einfo
, &res_id
, policy
, &flags
, NULL
,
788 0, lvb_type
, lockh
, 0);
790 /* For flock requests we immediately return without further
791 * delay and let caller deal with the rest, since rest of
792 * this function metadata processing makes no sense for flock
793 * requests anyway. But in case of problem during comms with
794 * Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
795 * can not rely on caller and this mainly for F_UNLCKs
796 * (explicits or automatically generated by Kernel to clean
797 * current FLocks upon exit) that can't be trashed
799 if (((rc
== -EINTR
) || (rc
== -ETIMEDOUT
)) &&
800 (einfo
->ei_type
== LDLM_FLOCK
) &&
801 (einfo
->ei_mode
== LCK_NL
))
806 obd_put_request_slot(&obddev
->u
.cli
);
807 mdc_put_mod_rpc_slot(req
, it
);
810 CDEBUG(D_INFO
, "%s: ldlm_cli_enqueue failed: rc = %d\n",
811 obddev
->obd_name
, rc
);
813 mdc_clear_replay_flag(req
, rc
);
814 ptlrpc_req_finished(req
);
818 lockrep
= req_capsule_server_get(&req
->rq_pill
, &RMF_DLM_REP
);
820 lockrep
->lock_policy_res2
=
821 ptlrpc_status_ntoh(lockrep
->lock_policy_res2
);
824 * Retry infinitely when the server returns -EINPROGRESS for the
825 * intent operation, when server returns -EINPROGRESS for acquiring
826 * intent lock, we'll retry in after_reply().
828 if (it
->it_op
&& (int)lockrep
->lock_policy_res2
== -EINPROGRESS
) {
829 mdc_clear_replay_flag(req
, rc
);
830 ptlrpc_req_finished(req
);
833 CDEBUG(D_HA
, "%s: resend:%d op:%d "DFID
"/"DFID
"\n",
834 obddev
->obd_name
, resends
, it
->it_op
,
835 PFID(&op_data
->op_fid1
), PFID(&op_data
->op_fid2
));
837 if (generation
== obddev
->u
.cli
.cl_import
->imp_generation
) {
840 CDEBUG(D_HA
, "resend cross eviction\n");
845 rc
= mdc_finish_enqueue(exp
, req
, einfo
, it
, lockh
, rc
);
847 if (lustre_handle_is_used(lockh
)) {
848 ldlm_lock_decref(lockh
, einfo
->ei_mode
);
849 memset(lockh
, 0, sizeof(*lockh
));
851 ptlrpc_req_finished(req
);
853 it
->it_lock_handle
= 0;
854 it
->it_lock_mode
= 0;
855 it
->it_request
= NULL
;
861 static int mdc_finish_intent_lock(struct obd_export
*exp
,
862 struct ptlrpc_request
*request
,
863 struct md_op_data
*op_data
,
864 struct lookup_intent
*it
,
865 struct lustre_handle
*lockh
)
867 struct lustre_handle old_lock
;
868 struct mdt_body
*mdt_body
;
869 struct ldlm_lock
*lock
;
872 LASSERT(request
!= LP_POISON
);
873 LASSERT(request
->rq_repmsg
!= LP_POISON
);
875 if (it
->it_op
& IT_READDIR
)
878 if (!it_disposition(it
, DISP_IT_EXECD
)) {
879 /* The server failed before it even started executing the
880 * intent, i.e. because it couldn't unpack the request.
882 LASSERT(it
->it_status
!= 0);
883 return it
->it_status
;
885 rc
= it_open_error(DISP_IT_EXECD
, it
);
889 mdt_body
= req_capsule_server_get(&request
->rq_pill
, &RMF_MDT_BODY
);
890 LASSERT(mdt_body
); /* mdc_enqueue checked */
892 rc
= it_open_error(DISP_LOOKUP_EXECD
, it
);
896 /* keep requests around for the multiple phases of the call
897 * this shows the DISP_XX must guarantee we make it into the call
899 if (!it_disposition(it
, DISP_ENQ_CREATE_REF
) &&
900 it_disposition(it
, DISP_OPEN_CREATE
) &&
901 !it_open_error(DISP_OPEN_CREATE
, it
)) {
902 it_set_disposition(it
, DISP_ENQ_CREATE_REF
);
903 ptlrpc_request_addref(request
); /* balanced in ll_create_node */
905 if (!it_disposition(it
, DISP_ENQ_OPEN_REF
) &&
906 it_disposition(it
, DISP_OPEN_OPEN
) &&
907 !it_open_error(DISP_OPEN_OPEN
, it
)) {
908 it_set_disposition(it
, DISP_ENQ_OPEN_REF
);
909 ptlrpc_request_addref(request
); /* balanced in ll_file_open */
910 /* BUG 11546 - eviction in the middle of open rpc processing */
911 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE
, obd_timeout
);
914 if (it
->it_op
& IT_CREAT
) {
915 /* XXX this belongs in ll_create_it */
916 } else if (it
->it_op
== IT_OPEN
) {
917 LASSERT(!it_disposition(it
, DISP_OPEN_CREATE
));
919 LASSERT(it
->it_op
& (IT_GETATTR
| IT_LOOKUP
| IT_LAYOUT
));
922 /* If we already have a matching lock, then cancel the new
923 * one. We have to set the data here instead of in
924 * mdc_enqueue, because we need to use the child's inode as
925 * the l_ast_data to match, and that's not available until
926 * intent_finish has performed the iget().)
928 lock
= ldlm_handle2lock(lockh
);
930 union ldlm_policy_data policy
= lock
->l_policy_data
;
932 LDLM_DEBUG(lock
, "matching against this");
934 LASSERTF(fid_res_name_eq(&mdt_body
->mbo_fid1
,
935 &lock
->l_resource
->lr_name
),
936 "Lock res_id: "DLDLMRES
", fid: "DFID
"\n",
937 PLDLMRES(lock
->l_resource
), PFID(&mdt_body
->mbo_fid1
));
940 memcpy(&old_lock
, lockh
, sizeof(*lockh
));
941 if (ldlm_lock_match(NULL
, LDLM_FL_BLOCK_GRANTED
, NULL
,
942 LDLM_IBITS
, &policy
, LCK_NL
,
944 ldlm_lock_decref_and_cancel(lockh
,
946 memcpy(lockh
, &old_lock
, sizeof(old_lock
));
947 it
->it_lock_handle
= lockh
->cookie
;
951 "D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
952 (int)op_data
->op_namelen
, op_data
->op_name
,
953 ldlm_it2str(it
->it_op
), it
->it_status
, it
->it_disposition
, rc
);
957 int mdc_revalidate_lock(struct obd_export
*exp
, struct lookup_intent
*it
,
958 struct lu_fid
*fid
, __u64
*bits
)
960 /* We could just return 1 immediately, but since we should only
961 * be called in revalidate_it if we already have a lock, let's
964 struct ldlm_res_id res_id
;
965 struct lustre_handle lockh
;
966 union ldlm_policy_data policy
;
969 if (it
->it_lock_handle
) {
970 lockh
.cookie
= it
->it_lock_handle
;
971 mode
= ldlm_revalidate_lock_handle(&lockh
, bits
);
973 fid_build_reg_res_name(fid
, &res_id
);
976 /* File attributes are held under multiple bits:
977 * nlink is under lookup lock, size and times are
978 * under UPDATE lock and recently we've also got
979 * a separate permissions lock for owner/group/acl that
980 * were protected by lookup lock before.
981 * Getattr must provide all of that information,
982 * so we need to ensure we have all of those locks.
983 * Unfortunately, if the bits are split across multiple
984 * locks, there's no easy way to match all of them here,
985 * so an extra RPC would be performed to fetch all
986 * of those bits at once for now.
988 /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
989 * but for old MDTs (< 2.4), permission is covered
990 * by LOOKUP lock, so it needs to match all bits here.
992 policy
.l_inodebits
.bits
= MDS_INODELOCK_UPDATE
|
993 MDS_INODELOCK_LOOKUP
|
997 policy
.l_inodebits
.bits
= MDS_INODELOCK_UPDATE
;
1000 policy
.l_inodebits
.bits
= MDS_INODELOCK_LAYOUT
;
1003 policy
.l_inodebits
.bits
= MDS_INODELOCK_LOOKUP
;
1007 mode
= mdc_lock_match(exp
, LDLM_FL_BLOCK_GRANTED
, fid
,
1008 LDLM_IBITS
, &policy
,
1009 LCK_CR
| LCK_CW
| LCK_PR
| LCK_PW
,
1014 it
->it_lock_handle
= lockh
.cookie
;
1015 it
->it_lock_mode
= mode
;
1017 it
->it_lock_handle
= 0;
1018 it
->it_lock_mode
= 0;
1025 * This long block is all about fixing up the lock and request state
1026 * so that it is correct as of the moment _before_ the operation was
1027 * applied; that way, the VFS will think that everything is normal and
1028 * call Lustre's regular VFS methods.
1030 * If we're performing a creation, that means that unless the creation
1031 * failed with EEXIST, we should fake up a negative dentry.
1033 * For everything else, we want to lookup to succeed.
1035 * One additional note: if CREATE or OPEN succeeded, we add an extra
1036 * reference to the request because we need to keep it around until
1037 * ll_create/ll_open gets called.
1039 * The server will return to us, in it_disposition, an indication of
1040 * exactly what it_status refers to.
1042 * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
1043 * otherwise if DISP_OPEN_CREATE is set, then it status is the
1044 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
1045 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1048 * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
1051 int mdc_intent_lock(struct obd_export
*exp
, struct md_op_data
*op_data
,
1052 struct lookup_intent
*it
, struct ptlrpc_request
**reqp
,
1053 ldlm_blocking_callback cb_blocking
, __u64 extra_lock_flags
)
1055 struct ldlm_enqueue_info einfo
= {
1056 .ei_type
= LDLM_IBITS
,
1057 .ei_mode
= it_to_lock_mode(it
),
1058 .ei_cb_bl
= cb_blocking
,
1059 .ei_cb_cp
= ldlm_completion_ast
,
1061 struct lustre_handle lockh
;
1066 CDEBUG(D_DLMTRACE
, "(name: %.*s,"DFID
") in obj "DFID
1067 ", intent: %s flags %#Lo\n", (int)op_data
->op_namelen
,
1068 op_data
->op_name
, PFID(&op_data
->op_fid2
),
1069 PFID(&op_data
->op_fid1
), ldlm_it2str(it
->it_op
),
1073 if (fid_is_sane(&op_data
->op_fid2
) &&
1074 (it
->it_op
& (IT_LOOKUP
| IT_GETATTR
| IT_READDIR
))) {
1075 /* We could just return 1 immediately, but since we should only
1076 * be called in revalidate_it if we already have a lock, let's
1079 it
->it_lock_handle
= 0;
1080 rc
= mdc_revalidate_lock(exp
, it
, &op_data
->op_fid2
, NULL
);
1081 /* Only return failure if it was not GETATTR by cfid
1082 * (from inode_revalidate)
1084 if (rc
|| op_data
->op_namelen
!= 0)
1088 /* For case if upper layer did not alloc fid, do it now. */
1089 if (!fid_is_sane(&op_data
->op_fid2
) && it
->it_op
& IT_CREAT
) {
1090 rc
= mdc_fid_alloc(NULL
, exp
, &op_data
->op_fid2
, op_data
);
1092 CERROR("Can't alloc new fid, rc %d\n", rc
);
1096 rc
= mdc_enqueue(exp
, &einfo
, NULL
, it
, op_data
, &lockh
,
1101 *reqp
= it
->it_request
;
1102 rc
= mdc_finish_intent_lock(exp
, *reqp
, op_data
, it
, &lockh
);
1106 static int mdc_intent_getattr_async_interpret(const struct lu_env
*env
,
1107 struct ptlrpc_request
*req
,
1110 struct mdc_getattr_args
*ga
= args
;
1111 struct obd_export
*exp
= ga
->ga_exp
;
1112 struct md_enqueue_info
*minfo
= ga
->ga_minfo
;
1113 struct ldlm_enqueue_info
*einfo
= &minfo
->mi_einfo
;
1114 struct lookup_intent
*it
;
1115 struct lustre_handle
*lockh
;
1116 struct obd_device
*obddev
;
1117 struct ldlm_reply
*lockrep
;
1118 __u64 flags
= LDLM_FL_HAS_INTENT
;
1121 lockh
= &minfo
->mi_lockh
;
1123 obddev
= class_exp2obd(exp
);
1125 obd_put_request_slot(&obddev
->u
.cli
);
1126 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE
))
1129 rc
= ldlm_cli_enqueue_fini(exp
, req
, einfo
->ei_type
, 1, einfo
->ei_mode
,
1130 &flags
, NULL
, 0, lockh
, rc
);
1132 CERROR("ldlm_cli_enqueue_fini: %d\n", rc
);
1133 mdc_clear_replay_flag(req
, rc
);
1137 lockrep
= req_capsule_server_get(&req
->rq_pill
, &RMF_DLM_REP
);
1139 lockrep
->lock_policy_res2
=
1140 ptlrpc_status_ntoh(lockrep
->lock_policy_res2
);
1142 rc
= mdc_finish_enqueue(exp
, req
, einfo
, it
, lockh
, rc
);
1146 rc
= mdc_finish_intent_lock(exp
, req
, &minfo
->mi_data
, it
, lockh
);
1149 minfo
->mi_cb(req
, minfo
, rc
);
1153 int mdc_intent_getattr_async(struct obd_export
*exp
,
1154 struct md_enqueue_info
*minfo
)
1156 struct md_op_data
*op_data
= &minfo
->mi_data
;
1157 struct lookup_intent
*it
= &minfo
->mi_it
;
1158 struct ptlrpc_request
*req
;
1159 struct mdc_getattr_args
*ga
;
1160 struct obd_device
*obddev
= class_exp2obd(exp
);
1161 struct ldlm_res_id res_id
;
1162 union ldlm_policy_data policy
= {
1163 .l_inodebits
= { MDS_INODELOCK_LOOKUP
| MDS_INODELOCK_UPDATE
}
1166 __u64 flags
= LDLM_FL_HAS_INTENT
;
1169 "name: %.*s in inode " DFID
", intent: %s flags %#Lo\n",
1170 (int)op_data
->op_namelen
, op_data
->op_name
,
1171 PFID(&op_data
->op_fid1
), ldlm_it2str(it
->it_op
), it
->it_flags
);
1173 fid_build_reg_res_name(&op_data
->op_fid1
, &res_id
);
1174 req
= mdc_intent_getattr_pack(exp
, it
, op_data
);
1176 return PTR_ERR(req
);
1178 rc
= obd_get_request_slot(&obddev
->u
.cli
);
1180 ptlrpc_req_finished(req
);
1184 rc
= ldlm_cli_enqueue(exp
, &req
, &minfo
->mi_einfo
, &res_id
, &policy
,
1185 &flags
, NULL
, 0, LVB_T_NONE
, &minfo
->mi_lockh
, 1);
1187 obd_put_request_slot(&obddev
->u
.cli
);
1188 ptlrpc_req_finished(req
);
1192 BUILD_BUG_ON(sizeof(*ga
) > sizeof(req
->rq_async_args
));
1193 ga
= ptlrpc_req_async_args(req
);
1195 ga
->ga_minfo
= minfo
;
1197 req
->rq_interpret_reply
= mdc_intent_getattr_async_interpret
;
1198 ptlrpcd_add_req(req
);