4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2012, 2015, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
38 * Author: Isaac Huang <isaac@clusterfs.com>
40 * 2012-05-13: Liang Zhen <liang@whamcloud.com>
41 * - percpt data for service to improve smp performance
45 #define DEBUG_SUBSYSTEM S_LNET
57 static struct smoketest_rpc
{
58 spinlock_t rpc_glock
; /* global lock */
59 srpc_service_t
*rpc_services
[SRPC_SERVICE_MAX_ID
+ 1];
60 lnet_handle_eq_t rpc_lnet_eq
; /* _the_ LNet event queue */
61 srpc_state_t rpc_state
;
62 srpc_counters_t rpc_counters
;
63 __u64 rpc_matchbits
; /* matchbits counter */
67 srpc_serv_portal(int svc_id
)
69 return svc_id
< SRPC_FRAMEWORK_SERVICE_MAX_ID
?
70 SRPC_FRAMEWORK_REQUEST_PORTAL
: SRPC_REQUEST_PORTAL
;
74 int srpc_handle_rpc(swi_workitem_t
*wi
);
76 void srpc_get_counters(srpc_counters_t
*cnt
)
78 spin_lock(&srpc_data
.rpc_glock
);
79 *cnt
= srpc_data
.rpc_counters
;
80 spin_unlock(&srpc_data
.rpc_glock
);
83 void srpc_set_counters(const srpc_counters_t
*cnt
)
85 spin_lock(&srpc_data
.rpc_glock
);
86 srpc_data
.rpc_counters
= *cnt
;
87 spin_unlock(&srpc_data
.rpc_glock
);
91 srpc_add_bulk_page(srpc_bulk_t
*bk
, struct page
*pg
, int i
, int nob
)
93 nob
= min_t(int, nob
, PAGE_SIZE
);
96 LASSERT(i
>= 0 && i
< bk
->bk_niov
);
98 bk
->bk_iovs
[i
].kiov_offset
= 0;
99 bk
->bk_iovs
[i
].kiov_page
= pg
;
100 bk
->bk_iovs
[i
].kiov_len
= nob
;
105 srpc_free_bulk(srpc_bulk_t
*bk
)
112 for (i
= 0; i
< bk
->bk_niov
; i
++) {
113 pg
= bk
->bk_iovs
[i
].kiov_page
;
120 LIBCFS_FREE(bk
, offsetof(srpc_bulk_t
, bk_iovs
[bk
->bk_niov
]));
124 srpc_alloc_bulk(int cpt
, unsigned bulk_npg
, unsigned bulk_len
, int sink
)
129 LASSERT(bulk_npg
> 0 && bulk_npg
<= LNET_MAX_IOV
);
131 LIBCFS_CPT_ALLOC(bk
, lnet_cpt_table(), cpt
,
132 offsetof(srpc_bulk_t
, bk_iovs
[bulk_npg
]));
134 CERROR("Can't allocate descriptor for %d pages\n", bulk_npg
);
138 memset(bk
, 0, offsetof(srpc_bulk_t
, bk_iovs
[bulk_npg
]));
140 bk
->bk_len
= bulk_len
;
141 bk
->bk_niov
= bulk_npg
;
143 for (i
= 0; i
< bulk_npg
; i
++) {
147 pg
= alloc_pages_node(cfs_cpt_spread_node(lnet_cpt_table(), cpt
),
150 CERROR("Can't allocate page %d of %d\n", i
, bulk_npg
);
155 nob
= srpc_add_bulk_page(bk
, pg
, i
, bulk_len
);
167 spin_lock(&srpc_data
.rpc_glock
);
168 id
= srpc_data
.rpc_matchbits
++;
169 spin_unlock(&srpc_data
.rpc_glock
);
174 srpc_init_server_rpc(struct srpc_server_rpc
*rpc
,
175 struct srpc_service_cd
*scd
,
176 struct srpc_buffer
*buffer
)
178 memset(rpc
, 0, sizeof(*rpc
));
179 swi_init_workitem(&rpc
->srpc_wi
, rpc
, srpc_handle_rpc
,
180 srpc_serv_is_framework(scd
->scd_svc
) ?
181 lst_sched_serial
: lst_sched_test
[scd
->scd_cpt
]);
183 rpc
->srpc_ev
.ev_fired
= 1; /* no event expected now */
186 rpc
->srpc_reqstbuf
= buffer
;
187 rpc
->srpc_peer
= buffer
->buf_peer
;
188 rpc
->srpc_self
= buffer
->buf_self
;
189 LNetInvalidateHandle(&rpc
->srpc_replymdh
);
193 srpc_service_fini(struct srpc_service
*svc
)
195 struct srpc_service_cd
*scd
;
196 struct srpc_server_rpc
*rpc
;
197 struct srpc_buffer
*buf
;
201 if (!svc
->sv_cpt_data
)
204 cfs_percpt_for_each(scd
, i
, svc
->sv_cpt_data
) {
206 if (!list_empty(&scd
->scd_buf_posted
))
207 q
= &scd
->scd_buf_posted
;
208 else if (!list_empty(&scd
->scd_buf_blocked
))
209 q
= &scd
->scd_buf_blocked
;
213 while (!list_empty(q
)) {
214 buf
= list_entry(q
->next
, struct srpc_buffer
,
216 list_del(&buf
->buf_list
);
217 LIBCFS_FREE(buf
, sizeof(*buf
));
221 LASSERT(list_empty(&scd
->scd_rpc_active
));
223 while (!list_empty(&scd
->scd_rpc_free
)) {
224 rpc
= list_entry(scd
->scd_rpc_free
.next
,
225 struct srpc_server_rpc
,
227 list_del(&rpc
->srpc_list
);
228 LIBCFS_FREE(rpc
, sizeof(*rpc
));
232 cfs_percpt_free(svc
->sv_cpt_data
);
233 svc
->sv_cpt_data
= NULL
;
237 srpc_service_nrpcs(struct srpc_service
*svc
)
239 int nrpcs
= svc
->sv_wi_total
/ svc
->sv_ncpts
;
241 return srpc_serv_is_framework(svc
) ?
242 max(nrpcs
, SFW_FRWK_WI_MIN
) : max(nrpcs
, SFW_TEST_WI_MIN
);
245 int srpc_add_buffer(struct swi_workitem
*wi
);
248 srpc_service_init(struct srpc_service
*svc
)
250 struct srpc_service_cd
*scd
;
251 struct srpc_server_rpc
*rpc
;
256 svc
->sv_shuttingdown
= 0;
258 svc
->sv_cpt_data
= cfs_percpt_alloc(lnet_cpt_table(),
259 sizeof(struct srpc_service_cd
));
260 if (!svc
->sv_cpt_data
)
263 svc
->sv_ncpts
= srpc_serv_is_framework(svc
) ?
264 1 : cfs_cpt_number(lnet_cpt_table());
265 nrpcs
= srpc_service_nrpcs(svc
);
267 cfs_percpt_for_each(scd
, i
, svc
->sv_cpt_data
) {
270 spin_lock_init(&scd
->scd_lock
);
271 INIT_LIST_HEAD(&scd
->scd_rpc_free
);
272 INIT_LIST_HEAD(&scd
->scd_rpc_active
);
273 INIT_LIST_HEAD(&scd
->scd_buf_posted
);
274 INIT_LIST_HEAD(&scd
->scd_buf_blocked
);
276 scd
->scd_ev
.ev_data
= scd
;
277 scd
->scd_ev
.ev_type
= SRPC_REQUEST_RCVD
;
280 * NB: don't use lst_sched_serial for adding buffer,
281 * see details in srpc_service_add_buffers()
283 swi_init_workitem(&scd
->scd_buf_wi
, scd
,
284 srpc_add_buffer
, lst_sched_test
[i
]);
286 if (i
&& srpc_serv_is_framework(svc
)) {
288 * NB: framework service only needs srpc_service_cd for
289 * one partition, but we allocate for all to make
290 * it easier to implement, it will waste a little
291 * memory but nobody should care about this
296 for (j
= 0; j
< nrpcs
; j
++) {
297 LIBCFS_CPT_ALLOC(rpc
, lnet_cpt_table(),
300 srpc_service_fini(svc
);
303 list_add(&rpc
->srpc_list
, &scd
->scd_rpc_free
);
311 srpc_add_service(struct srpc_service
*sv
)
315 LASSERT(0 <= id
&& id
<= SRPC_SERVICE_MAX_ID
);
317 if (srpc_service_init(sv
))
320 spin_lock(&srpc_data
.rpc_glock
);
322 LASSERT(srpc_data
.rpc_state
== SRPC_STATE_RUNNING
);
324 if (srpc_data
.rpc_services
[id
]) {
325 spin_unlock(&srpc_data
.rpc_glock
);
329 srpc_data
.rpc_services
[id
] = sv
;
330 spin_unlock(&srpc_data
.rpc_glock
);
332 CDEBUG(D_NET
, "Adding service: id %d, name %s\n", id
, sv
->sv_name
);
336 srpc_service_fini(sv
);
341 srpc_remove_service(srpc_service_t
*sv
)
345 spin_lock(&srpc_data
.rpc_glock
);
347 if (srpc_data
.rpc_services
[id
] != sv
) {
348 spin_unlock(&srpc_data
.rpc_glock
);
352 srpc_data
.rpc_services
[id
] = NULL
;
353 spin_unlock(&srpc_data
.rpc_glock
);
358 srpc_post_passive_rdma(int portal
, int local
, __u64 matchbits
, void *buf
,
359 int len
, int options
, lnet_process_id_t peer
,
360 lnet_handle_md_t
*mdh
, srpc_event_t
*ev
)
364 lnet_handle_me_t meh
;
366 rc
= LNetMEAttach(portal
, peer
, matchbits
, 0, LNET_UNLINK
,
367 local
? LNET_INS_LOCAL
: LNET_INS_AFTER
, &meh
);
369 CERROR("LNetMEAttach failed: %d\n", rc
);
370 LASSERT(rc
== -ENOMEM
);
378 md
.options
= options
;
379 md
.eq_handle
= srpc_data
.rpc_lnet_eq
;
381 rc
= LNetMDAttach(meh
, md
, LNET_UNLINK
, mdh
);
383 CERROR("LNetMDAttach failed: %d\n", rc
);
384 LASSERT(rc
== -ENOMEM
);
386 rc
= LNetMEUnlink(meh
);
391 CDEBUG(D_NET
, "Posted passive RDMA: peer %s, portal %d, matchbits %#llx\n",
392 libcfs_id2str(peer
), portal
, matchbits
);
397 srpc_post_active_rdma(int portal
, __u64 matchbits
, void *buf
, int len
,
398 int options
, lnet_process_id_t peer
, lnet_nid_t self
,
399 lnet_handle_md_t
*mdh
, srpc_event_t
*ev
)
407 md
.eq_handle
= srpc_data
.rpc_lnet_eq
;
408 md
.threshold
= options
& LNET_MD_OP_GET
? 2 : 1;
409 md
.options
= options
& ~(LNET_MD_OP_PUT
| LNET_MD_OP_GET
);
411 rc
= LNetMDBind(md
, LNET_UNLINK
, mdh
);
413 CERROR("LNetMDBind failed: %d\n", rc
);
414 LASSERT(rc
== -ENOMEM
);
419 * this is kind of an abuse of the LNET_MD_OP_{PUT,GET} options.
420 * they're only meaningful for MDs attached to an ME (i.e. passive
423 if (options
& LNET_MD_OP_PUT
) {
424 rc
= LNetPut(self
, *mdh
, LNET_NOACK_REQ
, peer
,
425 portal
, matchbits
, 0, 0);
427 LASSERT(options
& LNET_MD_OP_GET
);
429 rc
= LNetGet(self
, *mdh
, peer
, portal
, matchbits
, 0);
433 CERROR("LNet%s(%s, %d, %lld) failed: %d\n",
434 options
& LNET_MD_OP_PUT
? "Put" : "Get",
435 libcfs_id2str(peer
), portal
, matchbits
, rc
);
438 * The forthcoming unlink event will complete this operation
439 * with failure, so fall through and return success here.
441 rc
= LNetMDUnlink(*mdh
);
444 CDEBUG(D_NET
, "Posted active RDMA: peer %s, portal %u, matchbits %#llx\n",
445 libcfs_id2str(peer
), portal
, matchbits
);
451 srpc_post_passive_rqtbuf(int service
, int local
, void *buf
, int len
,
452 lnet_handle_md_t
*mdh
, srpc_event_t
*ev
)
454 lnet_process_id_t any
= { 0 };
456 any
.nid
= LNET_NID_ANY
;
457 any
.pid
= LNET_PID_ANY
;
459 return srpc_post_passive_rdma(srpc_serv_portal(service
),
460 local
, service
, buf
, len
,
461 LNET_MD_OP_PUT
, any
, mdh
, ev
);
465 srpc_service_post_buffer(struct srpc_service_cd
*scd
, struct srpc_buffer
*buf
)
466 __must_hold(&scd
->scd_lock
)
468 struct srpc_service
*sv
= scd
->scd_svc
;
469 struct srpc_msg
*msg
= &buf
->buf_msg
;
472 LNetInvalidateHandle(&buf
->buf_mdh
);
473 list_add(&buf
->buf_list
, &scd
->scd_buf_posted
);
474 scd
->scd_buf_nposted
++;
475 spin_unlock(&scd
->scd_lock
);
477 rc
= srpc_post_passive_rqtbuf(sv
->sv_id
,
478 !srpc_serv_is_framework(sv
),
479 msg
, sizeof(*msg
), &buf
->buf_mdh
,
483 * At this point, a RPC (new or delayed) may have arrived in
484 * msg and its event handler has been called. So we must add
485 * buf to scd_buf_posted _before_ dropping scd_lock
487 spin_lock(&scd
->scd_lock
);
490 if (!sv
->sv_shuttingdown
)
493 spin_unlock(&scd
->scd_lock
);
495 * srpc_shutdown_service might have tried to unlink me
496 * when my buf_mdh was still invalid
498 LNetMDUnlink(buf
->buf_mdh
);
499 spin_lock(&scd
->scd_lock
);
503 scd
->scd_buf_nposted
--;
504 if (sv
->sv_shuttingdown
)
505 return rc
; /* don't allow to change scd_buf_posted */
507 list_del(&buf
->buf_list
);
508 spin_unlock(&scd
->scd_lock
);
510 LIBCFS_FREE(buf
, sizeof(*buf
));
512 spin_lock(&scd
->scd_lock
);
517 srpc_add_buffer(struct swi_workitem
*wi
)
519 struct srpc_service_cd
*scd
= wi
->swi_workitem
.wi_data
;
520 struct srpc_buffer
*buf
;
524 * it's called by workitem scheduler threads, these threads
525 * should have been set CPT affinity, so buffers will be posted
526 * on CPT local list of Portal
528 spin_lock(&scd
->scd_lock
);
530 while (scd
->scd_buf_adjust
> 0 &&
531 !scd
->scd_svc
->sv_shuttingdown
) {
532 scd
->scd_buf_adjust
--; /* consume it */
533 scd
->scd_buf_posting
++;
535 spin_unlock(&scd
->scd_lock
);
537 LIBCFS_ALLOC(buf
, sizeof(*buf
));
539 CERROR("Failed to add new buf to service: %s\n",
540 scd
->scd_svc
->sv_name
);
541 spin_lock(&scd
->scd_lock
);
546 spin_lock(&scd
->scd_lock
);
547 if (scd
->scd_svc
->sv_shuttingdown
) {
548 spin_unlock(&scd
->scd_lock
);
549 LIBCFS_FREE(buf
, sizeof(*buf
));
551 spin_lock(&scd
->scd_lock
);
556 rc
= srpc_service_post_buffer(scd
, buf
);
558 break; /* buf has been freed inside */
560 LASSERT(scd
->scd_buf_posting
> 0);
561 scd
->scd_buf_posting
--;
562 scd
->scd_buf_total
++;
563 scd
->scd_buf_low
= max(2, scd
->scd_buf_total
/ 4);
567 scd
->scd_buf_err_stamp
= ktime_get_real_seconds();
568 scd
->scd_buf_err
= rc
;
570 LASSERT(scd
->scd_buf_posting
> 0);
571 scd
->scd_buf_posting
--;
574 spin_unlock(&scd
->scd_lock
);
579 srpc_service_add_buffers(struct srpc_service
*sv
, int nbuffer
)
581 struct srpc_service_cd
*scd
;
585 LASSERTF(nbuffer
> 0, "nbuffer must be positive: %d\n", nbuffer
);
587 cfs_percpt_for_each(scd
, i
, sv
->sv_cpt_data
) {
588 spin_lock(&scd
->scd_lock
);
590 scd
->scd_buf_err
= 0;
591 scd
->scd_buf_err_stamp
= 0;
592 scd
->scd_buf_posting
= 0;
593 scd
->scd_buf_adjust
= nbuffer
;
594 /* start to post buffers */
595 swi_schedule_workitem(&scd
->scd_buf_wi
);
596 spin_unlock(&scd
->scd_lock
);
598 /* framework service only post buffer for one partition */
599 if (srpc_serv_is_framework(sv
))
603 cfs_percpt_for_each(scd
, i
, sv
->sv_cpt_data
) {
604 spin_lock(&scd
->scd_lock
);
606 * NB: srpc_service_add_buffers() can be called inside
607 * thread context of lst_sched_serial, and we don't normally
608 * allow to sleep inside thread context of WI scheduler
609 * because it will block current scheduler thread from doing
610 * anything else, even worse, it could deadlock if it's
611 * waiting on result from another WI of the same scheduler.
612 * However, it's safe at here because scd_buf_wi is scheduled
613 * by thread in a different WI scheduler (lst_sched_test),
614 * so we don't have any risk of deadlock, though this could
615 * block all WIs pending on lst_sched_serial for a moment
616 * which is not good but not fatal.
618 lst_wait_until(scd
->scd_buf_err
||
619 (!scd
->scd_buf_adjust
&&
620 !scd
->scd_buf_posting
),
621 scd
->scd_lock
, "waiting for adding buffer\n");
623 if (scd
->scd_buf_err
&& !rc
)
624 rc
= scd
->scd_buf_err
;
626 spin_unlock(&scd
->scd_lock
);
633 srpc_service_remove_buffers(struct srpc_service
*sv
, int nbuffer
)
635 struct srpc_service_cd
*scd
;
639 LASSERT(!sv
->sv_shuttingdown
);
641 cfs_percpt_for_each(scd
, i
, sv
->sv_cpt_data
) {
642 spin_lock(&scd
->scd_lock
);
644 num
= scd
->scd_buf_total
+ scd
->scd_buf_posting
;
645 scd
->scd_buf_adjust
-= min(nbuffer
, num
);
647 spin_unlock(&scd
->scd_lock
);
651 /* returns 1 if sv has finished, otherwise 0 */
653 srpc_finish_service(struct srpc_service
*sv
)
655 struct srpc_service_cd
*scd
;
656 struct srpc_server_rpc
*rpc
;
659 LASSERT(sv
->sv_shuttingdown
); /* srpc_shutdown_service called */
661 cfs_percpt_for_each(scd
, i
, sv
->sv_cpt_data
) {
662 spin_lock(&scd
->scd_lock
);
663 if (!swi_deschedule_workitem(&scd
->scd_buf_wi
)) {
664 spin_unlock(&scd
->scd_lock
);
668 if (scd
->scd_buf_nposted
> 0) {
669 CDEBUG(D_NET
, "waiting for %d posted buffers to unlink\n",
670 scd
->scd_buf_nposted
);
671 spin_unlock(&scd
->scd_lock
);
675 if (list_empty(&scd
->scd_rpc_active
)) {
676 spin_unlock(&scd
->scd_lock
);
680 rpc
= list_entry(scd
->scd_rpc_active
.next
,
681 struct srpc_server_rpc
, srpc_list
);
682 CNETERR("Active RPC %p on shutdown: sv %s, peer %s, wi %s scheduled %d running %d, ev fired %d type %d status %d lnet %d\n",
683 rpc
, sv
->sv_name
, libcfs_id2str(rpc
->srpc_peer
),
684 swi_state2str(rpc
->srpc_wi
.swi_state
),
685 rpc
->srpc_wi
.swi_workitem
.wi_scheduled
,
686 rpc
->srpc_wi
.swi_workitem
.wi_running
,
687 rpc
->srpc_ev
.ev_fired
, rpc
->srpc_ev
.ev_type
,
688 rpc
->srpc_ev
.ev_status
, rpc
->srpc_ev
.ev_lnet
);
689 spin_unlock(&scd
->scd_lock
);
693 /* no lock needed from now on */
694 srpc_service_fini(sv
);
698 /* called with sv->sv_lock held */
700 srpc_service_recycle_buffer(struct srpc_service_cd
*scd
, srpc_buffer_t
*buf
)
701 __must_hold(&scd
->scd_lock
)
703 if (!scd
->scd_svc
->sv_shuttingdown
&& scd
->scd_buf_adjust
>= 0) {
704 if (srpc_service_post_buffer(scd
, buf
)) {
705 CWARN("Failed to post %s buffer\n",
706 scd
->scd_svc
->sv_name
);
711 /* service is shutting down, or we want to recycle some buffers */
712 scd
->scd_buf_total
--;
714 if (scd
->scd_buf_adjust
< 0) {
715 scd
->scd_buf_adjust
++;
716 if (scd
->scd_buf_adjust
< 0 &&
717 !scd
->scd_buf_total
&& !scd
->scd_buf_posting
) {
719 "Try to recycle %d buffers but nothing left\n",
720 scd
->scd_buf_adjust
);
721 scd
->scd_buf_adjust
= 0;
725 spin_unlock(&scd
->scd_lock
);
726 LIBCFS_FREE(buf
, sizeof(*buf
));
727 spin_lock(&scd
->scd_lock
);
731 srpc_abort_service(struct srpc_service
*sv
)
733 struct srpc_service_cd
*scd
;
734 struct srpc_server_rpc
*rpc
;
737 CDEBUG(D_NET
, "Aborting service: id %d, name %s\n",
738 sv
->sv_id
, sv
->sv_name
);
740 cfs_percpt_for_each(scd
, i
, sv
->sv_cpt_data
) {
741 spin_lock(&scd
->scd_lock
);
744 * schedule in-flight RPCs to notice the abort, NB:
745 * racing with incoming RPCs; complete fix should make test
746 * RPCs carry session ID in its headers
748 list_for_each_entry(rpc
, &scd
->scd_rpc_active
, srpc_list
) {
749 rpc
->srpc_aborted
= 1;
750 swi_schedule_workitem(&rpc
->srpc_wi
);
753 spin_unlock(&scd
->scd_lock
);
758 srpc_shutdown_service(srpc_service_t
*sv
)
760 struct srpc_service_cd
*scd
;
761 struct srpc_server_rpc
*rpc
;
765 CDEBUG(D_NET
, "Shutting down service: id %d, name %s\n",
766 sv
->sv_id
, sv
->sv_name
);
768 cfs_percpt_for_each(scd
, i
, sv
->sv_cpt_data
)
769 spin_lock(&scd
->scd_lock
);
771 sv
->sv_shuttingdown
= 1; /* i.e. no new active RPC */
773 cfs_percpt_for_each(scd
, i
, sv
->sv_cpt_data
)
774 spin_unlock(&scd
->scd_lock
);
776 cfs_percpt_for_each(scd
, i
, sv
->sv_cpt_data
) {
777 spin_lock(&scd
->scd_lock
);
779 /* schedule in-flight RPCs to notice the shutdown */
780 list_for_each_entry(rpc
, &scd
->scd_rpc_active
, srpc_list
)
781 swi_schedule_workitem(&rpc
->srpc_wi
);
783 spin_unlock(&scd
->scd_lock
);
786 * OK to traverse scd_buf_posted without lock, since no one
787 * touches scd_buf_posted now
789 list_for_each_entry(buf
, &scd
->scd_buf_posted
, buf_list
)
790 LNetMDUnlink(buf
->buf_mdh
);
795 srpc_send_request(srpc_client_rpc_t
*rpc
)
797 srpc_event_t
*ev
= &rpc
->crpc_reqstev
;
802 ev
->ev_type
= SRPC_REQUEST_SENT
;
804 rc
= srpc_post_active_rdma(srpc_serv_portal(rpc
->crpc_service
),
805 rpc
->crpc_service
, &rpc
->crpc_reqstmsg
,
806 sizeof(srpc_msg_t
), LNET_MD_OP_PUT
,
807 rpc
->crpc_dest
, LNET_NID_ANY
,
808 &rpc
->crpc_reqstmdh
, ev
);
810 LASSERT(rc
== -ENOMEM
);
811 ev
->ev_fired
= 1; /* no more event expected */
817 srpc_prepare_reply(srpc_client_rpc_t
*rpc
)
819 srpc_event_t
*ev
= &rpc
->crpc_replyev
;
820 __u64
*id
= &rpc
->crpc_reqstmsg
.msg_body
.reqst
.rpyid
;
825 ev
->ev_type
= SRPC_REPLY_RCVD
;
827 *id
= srpc_next_id();
829 rc
= srpc_post_passive_rdma(SRPC_RDMA_PORTAL
, 0, *id
,
830 &rpc
->crpc_replymsg
, sizeof(srpc_msg_t
),
831 LNET_MD_OP_PUT
, rpc
->crpc_dest
,
832 &rpc
->crpc_replymdh
, ev
);
834 LASSERT(rc
== -ENOMEM
);
835 ev
->ev_fired
= 1; /* no more event expected */
841 srpc_prepare_bulk(srpc_client_rpc_t
*rpc
)
843 srpc_bulk_t
*bk
= &rpc
->crpc_bulk
;
844 srpc_event_t
*ev
= &rpc
->crpc_bulkev
;
845 __u64
*id
= &rpc
->crpc_reqstmsg
.msg_body
.reqst
.bulkid
;
849 LASSERT(bk
->bk_niov
<= LNET_MAX_IOV
);
852 return 0; /* nothing to do */
854 opt
= bk
->bk_sink
? LNET_MD_OP_PUT
: LNET_MD_OP_GET
;
859 ev
->ev_type
= SRPC_BULK_REQ_RCVD
;
861 *id
= srpc_next_id();
863 rc
= srpc_post_passive_rdma(SRPC_RDMA_PORTAL
, 0, *id
,
864 &bk
->bk_iovs
[0], bk
->bk_niov
, opt
,
865 rpc
->crpc_dest
, &bk
->bk_mdh
, ev
);
867 LASSERT(rc
== -ENOMEM
);
868 ev
->ev_fired
= 1; /* no more event expected */
874 srpc_do_bulk(struct srpc_server_rpc
*rpc
)
876 srpc_event_t
*ev
= &rpc
->srpc_ev
;
877 srpc_bulk_t
*bk
= rpc
->srpc_bulk
;
878 __u64 id
= rpc
->srpc_reqstbuf
->buf_msg
.msg_body
.reqst
.bulkid
;
884 opt
= bk
->bk_sink
? LNET_MD_OP_GET
: LNET_MD_OP_PUT
;
889 ev
->ev_type
= bk
->bk_sink
? SRPC_BULK_GET_RPLD
: SRPC_BULK_PUT_SENT
;
891 rc
= srpc_post_active_rdma(SRPC_RDMA_PORTAL
, id
,
892 &bk
->bk_iovs
[0], bk
->bk_niov
, opt
,
893 rpc
->srpc_peer
, rpc
->srpc_self
,
896 ev
->ev_fired
= 1; /* no more event expected */
900 /* only called from srpc_handle_rpc */
902 srpc_server_rpc_done(struct srpc_server_rpc
*rpc
, int status
)
904 struct srpc_service_cd
*scd
= rpc
->srpc_scd
;
905 struct srpc_service
*sv
= scd
->scd_svc
;
906 srpc_buffer_t
*buffer
;
908 LASSERT(status
|| rpc
->srpc_wi
.swi_state
== SWI_STATE_DONE
);
910 rpc
->srpc_status
= status
;
912 CDEBUG_LIMIT(!status
? D_NET
: D_NETERROR
,
913 "Server RPC %p done: service %s, peer %s, status %s:%d\n",
914 rpc
, sv
->sv_name
, libcfs_id2str(rpc
->srpc_peer
),
915 swi_state2str(rpc
->srpc_wi
.swi_state
), status
);
918 spin_lock(&srpc_data
.rpc_glock
);
919 srpc_data
.rpc_counters
.rpcs_dropped
++;
920 spin_unlock(&srpc_data
.rpc_glock
);
924 (*rpc
->srpc_done
) (rpc
);
925 LASSERT(!rpc
->srpc_bulk
);
927 spin_lock(&scd
->scd_lock
);
929 if (rpc
->srpc_reqstbuf
) {
931 * NB might drop sv_lock in srpc_service_recycle_buffer, but
932 * sv won't go away for scd_rpc_active must not be empty
934 srpc_service_recycle_buffer(scd
, rpc
->srpc_reqstbuf
);
935 rpc
->srpc_reqstbuf
= NULL
;
938 list_del(&rpc
->srpc_list
); /* from scd->scd_rpc_active */
941 * No one can schedule me now since:
942 * - I'm not on scd_rpc_active.
943 * - all LNet events have been fired.
944 * Cancel pending schedules and prevent future schedule attempts:
946 LASSERT(rpc
->srpc_ev
.ev_fired
);
947 swi_exit_workitem(&rpc
->srpc_wi
);
949 if (!sv
->sv_shuttingdown
&& !list_empty(&scd
->scd_buf_blocked
)) {
950 buffer
= list_entry(scd
->scd_buf_blocked
.next
,
951 srpc_buffer_t
, buf_list
);
952 list_del(&buffer
->buf_list
);
954 srpc_init_server_rpc(rpc
, scd
, buffer
);
955 list_add_tail(&rpc
->srpc_list
, &scd
->scd_rpc_active
);
956 swi_schedule_workitem(&rpc
->srpc_wi
);
958 list_add(&rpc
->srpc_list
, &scd
->scd_rpc_free
);
961 spin_unlock(&scd
->scd_lock
);
964 /* handles an incoming RPC */
966 srpc_handle_rpc(swi_workitem_t
*wi
)
968 struct srpc_server_rpc
*rpc
= wi
->swi_workitem
.wi_data
;
969 struct srpc_service_cd
*scd
= rpc
->srpc_scd
;
970 struct srpc_service
*sv
= scd
->scd_svc
;
971 srpc_event_t
*ev
= &rpc
->srpc_ev
;
974 LASSERT(wi
== &rpc
->srpc_wi
);
976 spin_lock(&scd
->scd_lock
);
978 if (sv
->sv_shuttingdown
|| rpc
->srpc_aborted
) {
979 spin_unlock(&scd
->scd_lock
);
982 LNetMDUnlink(rpc
->srpc_bulk
->bk_mdh
);
983 LNetMDUnlink(rpc
->srpc_replymdh
);
985 if (ev
->ev_fired
) { /* no more event, OK to finish */
986 srpc_server_rpc_done(rpc
, -ESHUTDOWN
);
992 spin_unlock(&scd
->scd_lock
);
994 switch (wi
->swi_state
) {
997 case SWI_STATE_NEWBORN
: {
999 srpc_generic_reply_t
*reply
;
1001 msg
= &rpc
->srpc_reqstbuf
->buf_msg
;
1002 reply
= &rpc
->srpc_replymsg
.msg_body
.reply
;
1004 if (!msg
->msg_magic
) {
1005 /* moaned already in srpc_lnet_ev_handler */
1006 srpc_server_rpc_done(rpc
, EBADMSG
);
1010 srpc_unpack_msg_hdr(msg
);
1011 if (msg
->msg_version
!= SRPC_MSG_VERSION
) {
1012 CWARN("Version mismatch: %u, %u expected, from %s\n",
1013 msg
->msg_version
, SRPC_MSG_VERSION
,
1014 libcfs_id2str(rpc
->srpc_peer
));
1015 reply
->status
= EPROTO
;
1016 /* drop through and send reply */
1019 rc
= (*sv
->sv_handler
)(rpc
);
1020 LASSERT(!reply
->status
|| !rpc
->srpc_bulk
);
1022 srpc_server_rpc_done(rpc
, rc
);
1027 wi
->swi_state
= SWI_STATE_BULK_STARTED
;
1029 if (rpc
->srpc_bulk
) {
1030 rc
= srpc_do_bulk(rpc
);
1032 return 0; /* wait for bulk */
1034 LASSERT(ev
->ev_fired
);
1038 case SWI_STATE_BULK_STARTED
:
1039 LASSERT(!rpc
->srpc_bulk
|| ev
->ev_fired
);
1041 if (rpc
->srpc_bulk
) {
1044 if (sv
->sv_bulk_ready
)
1045 rc
= (*sv
->sv_bulk_ready
) (rpc
, rc
);
1048 srpc_server_rpc_done(rpc
, rc
);
1053 wi
->swi_state
= SWI_STATE_REPLY_SUBMITTED
;
1054 rc
= srpc_send_reply(rpc
);
1056 return 0; /* wait for reply */
1057 srpc_server_rpc_done(rpc
, rc
);
1060 case SWI_STATE_REPLY_SUBMITTED
:
1061 if (!ev
->ev_fired
) {
1062 CERROR("RPC %p: bulk %p, service %d\n",
1063 rpc
, rpc
->srpc_bulk
, sv
->sv_id
);
1064 CERROR("Event: status %d, type %d, lnet %d\n",
1065 ev
->ev_status
, ev
->ev_type
, ev
->ev_lnet
);
1066 LASSERT(ev
->ev_fired
);
1069 wi
->swi_state
= SWI_STATE_DONE
;
1070 srpc_server_rpc_done(rpc
, ev
->ev_status
);
1078 srpc_client_rpc_expired(void *data
)
1080 srpc_client_rpc_t
*rpc
= data
;
1082 CWARN("Client RPC expired: service %d, peer %s, timeout %d.\n",
1083 rpc
->crpc_service
, libcfs_id2str(rpc
->crpc_dest
),
1086 spin_lock(&rpc
->crpc_lock
);
1088 rpc
->crpc_timeout
= 0;
1089 srpc_abort_rpc(rpc
, -ETIMEDOUT
);
1091 spin_unlock(&rpc
->crpc_lock
);
1093 spin_lock(&srpc_data
.rpc_glock
);
1094 srpc_data
.rpc_counters
.rpcs_expired
++;
1095 spin_unlock(&srpc_data
.rpc_glock
);
1099 srpc_add_client_rpc_timer(srpc_client_rpc_t
*rpc
)
1101 struct stt_timer
*timer
= &rpc
->crpc_timer
;
1103 if (!rpc
->crpc_timeout
)
1106 INIT_LIST_HEAD(&timer
->stt_list
);
1107 timer
->stt_data
= rpc
;
1108 timer
->stt_func
= srpc_client_rpc_expired
;
1109 timer
->stt_expires
= ktime_get_real_seconds() + rpc
->crpc_timeout
;
1110 stt_add_timer(timer
);
1114 * Called with rpc->crpc_lock held.
1116 * Upon exit the RPC expiry timer is not queued and the handler is not
1117 * running on any CPU.
1120 srpc_del_client_rpc_timer(srpc_client_rpc_t
*rpc
)
1122 /* timer not planted or already exploded */
1123 if (!rpc
->crpc_timeout
)
1126 /* timer successfully defused */
1127 if (stt_del_timer(&rpc
->crpc_timer
))
1130 /* timer detonated, wait for it to explode */
1131 while (rpc
->crpc_timeout
) {
1132 spin_unlock(&rpc
->crpc_lock
);
1136 spin_lock(&rpc
->crpc_lock
);
1141 srpc_client_rpc_done(srpc_client_rpc_t
*rpc
, int status
)
1143 swi_workitem_t
*wi
= &rpc
->crpc_wi
;
1145 LASSERT(status
|| wi
->swi_state
== SWI_STATE_DONE
);
1147 spin_lock(&rpc
->crpc_lock
);
1149 rpc
->crpc_closed
= 1;
1150 if (!rpc
->crpc_status
)
1151 rpc
->crpc_status
= status
;
1153 srpc_del_client_rpc_timer(rpc
);
1155 CDEBUG_LIMIT(!status
? D_NET
: D_NETERROR
,
1156 "Client RPC done: service %d, peer %s, status %s:%d:%d\n",
1157 rpc
->crpc_service
, libcfs_id2str(rpc
->crpc_dest
),
1158 swi_state2str(wi
->swi_state
), rpc
->crpc_aborted
, status
);
1161 * No one can schedule me now since:
1162 * - RPC timer has been defused.
1163 * - all LNet events have been fired.
1164 * - crpc_closed has been set, preventing srpc_abort_rpc from
1166 * Cancel pending schedules and prevent future schedule attempts:
1168 LASSERT(!srpc_event_pending(rpc
));
1169 swi_exit_workitem(wi
);
1171 spin_unlock(&rpc
->crpc_lock
);
1173 (*rpc
->crpc_done
)(rpc
);
1176 /* sends an outgoing RPC */
1178 srpc_send_rpc(swi_workitem_t
*wi
)
1181 srpc_client_rpc_t
*rpc
;
1187 rpc
= wi
->swi_workitem
.wi_data
;
1190 LASSERT(wi
== &rpc
->crpc_wi
);
1192 reply
= &rpc
->crpc_replymsg
;
1193 do_bulk
= rpc
->crpc_bulk
.bk_niov
> 0;
1195 spin_lock(&rpc
->crpc_lock
);
1197 if (rpc
->crpc_aborted
) {
1198 spin_unlock(&rpc
->crpc_lock
);
1202 spin_unlock(&rpc
->crpc_lock
);
1204 switch (wi
->swi_state
) {
1207 case SWI_STATE_NEWBORN
:
1208 LASSERT(!srpc_event_pending(rpc
));
1210 rc
= srpc_prepare_reply(rpc
);
1212 srpc_client_rpc_done(rpc
, rc
);
1216 rc
= srpc_prepare_bulk(rpc
);
1220 wi
->swi_state
= SWI_STATE_REQUEST_SUBMITTED
;
1221 rc
= srpc_send_request(rpc
);
1224 case SWI_STATE_REQUEST_SUBMITTED
:
1226 * CAVEAT EMPTOR: rqtev, rpyev, and bulkev may come in any
1227 * order; however, they're processed in a strict order:
1228 * rqt, rpy, and bulk.
1230 if (!rpc
->crpc_reqstev
.ev_fired
)
1233 rc
= rpc
->crpc_reqstev
.ev_status
;
1237 wi
->swi_state
= SWI_STATE_REQUEST_SENT
;
1238 /* perhaps more events, fall thru */
1239 case SWI_STATE_REQUEST_SENT
: {
1240 srpc_msg_type_t type
= srpc_service2reply(rpc
->crpc_service
);
1242 if (!rpc
->crpc_replyev
.ev_fired
)
1245 rc
= rpc
->crpc_replyev
.ev_status
;
1249 srpc_unpack_msg_hdr(reply
);
1250 if (reply
->msg_type
!= type
||
1251 (reply
->msg_magic
!= SRPC_MSG_MAGIC
&&
1252 reply
->msg_magic
!= __swab32(SRPC_MSG_MAGIC
))) {
1253 CWARN("Bad message from %s: type %u (%d expected), magic %u (%d expected).\n",
1254 libcfs_id2str(rpc
->crpc_dest
),
1255 reply
->msg_type
, type
,
1256 reply
->msg_magic
, SRPC_MSG_MAGIC
);
1261 if (do_bulk
&& reply
->msg_body
.reply
.status
) {
1262 CWARN("Remote error %d at %s, unlink bulk buffer in case peer didn't initiate bulk transfer\n",
1263 reply
->msg_body
.reply
.status
,
1264 libcfs_id2str(rpc
->crpc_dest
));
1265 LNetMDUnlink(rpc
->crpc_bulk
.bk_mdh
);
1268 wi
->swi_state
= SWI_STATE_REPLY_RECEIVED
;
1270 case SWI_STATE_REPLY_RECEIVED
:
1271 if (do_bulk
&& !rpc
->crpc_bulkev
.ev_fired
)
1274 rc
= do_bulk
? rpc
->crpc_bulkev
.ev_status
: 0;
1277 * Bulk buffer was unlinked due to remote error. Clear error
1278 * since reply buffer still contains valid data.
1279 * NB rpc->crpc_done shouldn't look into bulk data in case of
1282 if (do_bulk
&& rpc
->crpc_bulkev
.ev_lnet
== LNET_EVENT_UNLINK
&&
1283 !rpc
->crpc_status
&& reply
->msg_body
.reply
.status
)
1286 wi
->swi_state
= SWI_STATE_DONE
;
1287 srpc_client_rpc_done(rpc
, rc
);
1292 spin_lock(&rpc
->crpc_lock
);
1293 srpc_abort_rpc(rpc
, rc
);
1294 spin_unlock(&rpc
->crpc_lock
);
1298 if (rpc
->crpc_aborted
) {
1299 LNetMDUnlink(rpc
->crpc_reqstmdh
);
1300 LNetMDUnlink(rpc
->crpc_replymdh
);
1301 LNetMDUnlink(rpc
->crpc_bulk
.bk_mdh
);
1303 if (!srpc_event_pending(rpc
)) {
1304 srpc_client_rpc_done(rpc
, -EINTR
);
1312 srpc_create_client_rpc(lnet_process_id_t peer
, int service
,
1313 int nbulkiov
, int bulklen
,
1314 void (*rpc_done
)(srpc_client_rpc_t
*),
1315 void (*rpc_fini
)(srpc_client_rpc_t
*), void *priv
)
1317 srpc_client_rpc_t
*rpc
;
1319 LIBCFS_ALLOC(rpc
, offsetof(srpc_client_rpc_t
,
1320 crpc_bulk
.bk_iovs
[nbulkiov
]));
1324 srpc_init_client_rpc(rpc
, peer
, service
, nbulkiov
,
1325 bulklen
, rpc_done
, rpc_fini
, priv
);
1329 /* called with rpc->crpc_lock held */
1331 srpc_abort_rpc(srpc_client_rpc_t
*rpc
, int why
)
1335 if (rpc
->crpc_aborted
|| /* already aborted */
1336 rpc
->crpc_closed
) /* callback imminent */
1339 CDEBUG(D_NET
, "Aborting RPC: service %d, peer %s, state %s, why %d\n",
1340 rpc
->crpc_service
, libcfs_id2str(rpc
->crpc_dest
),
1341 swi_state2str(rpc
->crpc_wi
.swi_state
), why
);
1343 rpc
->crpc_aborted
= 1;
1344 rpc
->crpc_status
= why
;
1345 swi_schedule_workitem(&rpc
->crpc_wi
);
1348 /* called with rpc->crpc_lock held */
1350 srpc_post_rpc(srpc_client_rpc_t
*rpc
)
1352 LASSERT(!rpc
->crpc_aborted
);
1353 LASSERT(srpc_data
.rpc_state
== SRPC_STATE_RUNNING
);
1355 CDEBUG(D_NET
, "Posting RPC: peer %s, service %d, timeout %d\n",
1356 libcfs_id2str(rpc
->crpc_dest
), rpc
->crpc_service
,
1359 srpc_add_client_rpc_timer(rpc
);
1360 swi_schedule_workitem(&rpc
->crpc_wi
);
1364 srpc_send_reply(struct srpc_server_rpc
*rpc
)
1366 srpc_event_t
*ev
= &rpc
->srpc_ev
;
1367 struct srpc_msg
*msg
= &rpc
->srpc_replymsg
;
1368 struct srpc_buffer
*buffer
= rpc
->srpc_reqstbuf
;
1369 struct srpc_service_cd
*scd
= rpc
->srpc_scd
;
1370 struct srpc_service
*sv
= scd
->scd_svc
;
1375 rpyid
= buffer
->buf_msg
.msg_body
.reqst
.rpyid
;
1377 spin_lock(&scd
->scd_lock
);
1379 if (!sv
->sv_shuttingdown
&& !srpc_serv_is_framework(sv
)) {
1381 * Repost buffer before replying since test client
1382 * might send me another RPC once it gets the reply
1384 if (srpc_service_post_buffer(scd
, buffer
))
1385 CWARN("Failed to repost %s buffer\n", sv
->sv_name
);
1386 rpc
->srpc_reqstbuf
= NULL
;
1389 spin_unlock(&scd
->scd_lock
);
1393 ev
->ev_type
= SRPC_REPLY_SENT
;
1395 msg
->msg_magic
= SRPC_MSG_MAGIC
;
1396 msg
->msg_version
= SRPC_MSG_VERSION
;
1397 msg
->msg_type
= srpc_service2reply(sv
->sv_id
);
1399 rc
= srpc_post_active_rdma(SRPC_RDMA_PORTAL
, rpyid
, msg
,
1400 sizeof(*msg
), LNET_MD_OP_PUT
,
1401 rpc
->srpc_peer
, rpc
->srpc_self
,
1402 &rpc
->srpc_replymdh
, ev
);
1404 ev
->ev_fired
= 1; /* no more event expected */
/*
 * srpc_lnet_ev_handler - LNet event callback for the selftest RPC layer
 * (it is the handler passed to LNetEQAlloc() elsewhere in this file).
 *
 * ev: the LNet event; ev->md.user_ptr carries the srpc_event_t that was
 *     attached when the operation was posted.
 *
 * Records the LNet event type in the RPC event and dispatches on
 * rpcev->ev_type:
 *   - client-side events (request sent, reply received, bulk request
 *     received) mark the event fired and schedule the client RPC work item;
 *   - SRPC_REQUEST_RCVD takes the posted buffer off its list and either hands
 *     it to a free server RPC or queues it on scd_buf_blocked;
 *   - bulk and reply-sent events update counters, mark the event fired and
 *     schedule the server RPC work item.
 *
 * NOTE(review): this listing omits many original lines (return type, braces,
 * break/return statements, the default label, some declarations and
 * assignments). Comments below describe only what is visible -- consult the
 * full file before editing.
 */
1408 /* when in kernel always called with LNET_LOCK() held, and in thread context */
1410 srpc_lnet_ev_handler(lnet_event_t
*ev
)
1412 struct srpc_service_cd
*scd
;
/* The RPC-level event was stored as the MD's user pointer. */
1413 srpc_event_t
*rpcev
= ev
->md
.user_ptr
;
1414 srpc_client_rpc_t
*crpc
;
1415 struct srpc_server_rpc
*srpc
;
1416 srpc_buffer_t
*buffer
;
1419 srpc_msg_type_t type
;
/* Must not run in interrupt context. */
1421 LASSERT(!in_interrupt());
/*
 * Error accounting under the global lock: anything but -ECANCELED bumps the
 * error counter; the current counter value is then logged.
 * NOTE(review): presumably guarded by a non-zero ev->status check that is
 * not visible here -- confirm.
 */
1426 spin_lock(&srpc_data
.rpc_glock
);
1427 if (ev
->status
!= -ECANCELED
) /* cancellation is not error */
1428 srpc_data
.rpc_counters
.errors
++;
1429 errors
= srpc_data
.rpc_counters
.errors
;
1430 spin_unlock(&srpc_data
.rpc_glock
);
1432 CNETERR("LNet event status %d type %d, RPC errors %u\n",
1433 ev
->status
, ev
->type
, errors
);
/* Remember which LNet event type produced this RPC event. */
1436 rpcev
->ev_lnet
= ev
->type
;
/* Dispatch on the RPC-level event type set by whoever posted the operation. */
1438 switch (rpcev
->ev_type
) {
/* Unrecognized event type (the case/default label is not visible here). */
1440 CERROR("Unknown event: status %d, type %d, lnet %d\n",
1441 rpcev
->ev_status
, rpcev
->ev_type
, rpcev
->ev_lnet
);
/* Request sent: count it when the event is a success and not an unlink. */
1443 case SRPC_REQUEST_SENT
:
1444 if (!ev
->status
&& ev
->type
!= LNET_EVENT_UNLINK
) {
1445 spin_lock(&srpc_data
.rpc_glock
);
1446 srpc_data
.rpc_counters
.rpcs_sent
++;
1447 spin_unlock(&srpc_data
.rpc_glock
);
/* Client-side events: ev_data is the owning client RPC. */
1449 case SRPC_REPLY_RCVD
:
1450 case SRPC_BULK_REQ_RCVD
:
1451 crpc
= rpcev
->ev_data
;
/* The event must be one of the three events embedded in the client RPC. */
1453 if (rpcev
!= &crpc
->crpc_reqstev
&&
1454 rpcev
!= &crpc
->crpc_replyev
&&
1455 rpcev
!= &crpc
->crpc_bulkev
) {
1456 CERROR("rpcev %p, crpc %p, reqstev %p, replyev %p, bulkev %p\n",
1457 rpcev
, crpc
, &crpc
->crpc_reqstev
,
1458 &crpc
->crpc_replyev
, &crpc
->crpc_bulkev
);
1459 CERROR("Bad event: status %d, type %d, lnet %d\n",
1460 rpcev
->ev_status
, rpcev
->ev_type
, rpcev
->ev_lnet
);
/* Mark the event fired and wake the client RPC, under the RPC's lock. */
1464 spin_lock(&crpc
->crpc_lock
);
/* An event is expected to fire only once. */
1466 LASSERT(!rpcev
->ev_fired
);
1467 rpcev
->ev_fired
= 1;
/* An unlink is reported to the RPC as -EINTR; otherwise pass the LNet status. */
1468 rpcev
->ev_status
= (ev
->type
== LNET_EVENT_UNLINK
) ?
1469 -EINTR
: ev
->status
;
1470 swi_schedule_workitem(&crpc
->crpc_wi
);
1472 spin_unlock(&crpc
->crpc_lock
);
/* Server side: a request arrived in (or an unlink hit) a posted buffer. */
1475 case SRPC_REQUEST_RCVD
:
/* ev_data is the per-CD service data that owns the posted buffers. */
1476 scd
= rpcev
->ev_data
;
1479 LASSERT(rpcev
== &scd
->scd_ev
);
1481 spin_lock(&scd
->scd_lock
);
/* Expect an unlinked MD and a PUT or UNLINK event; UNLINK only during shutdown. */
1483 LASSERT(ev
->unlinked
);
1484 LASSERT(ev
->type
== LNET_EVENT_PUT
||
1485 ev
->type
== LNET_EVENT_UNLINK
);
1486 LASSERT(ev
->type
!= LNET_EVENT_UNLINK
||
1487 sv
->sv_shuttingdown
);
/* Recover the buffer from the MD start address, which points at its buf_msg. */
1489 buffer
= container_of(ev
->md
.start
, srpc_buffer_t
, buf_msg
);
/* Record who sent the request and which local NID received it. */
1490 buffer
->buf_peer
= ev
->initiator
;
1491 buffer
->buf_self
= ev
->target
.nid
;
/* One fewer buffer is posted. */
1493 LASSERT(scd
->scd_buf_nposted
> 0);
1494 scd
->scd_buf_nposted
--;
1496 if (sv
->sv_shuttingdown
) {
/*
 * NOTE(review): the remark below names scd_buf_nposted, which is the counter
 * decremented just above; the list involved looks like scd_buf_posted (see
 * the list_del() remark further down) -- confirm and correct upstream.
 */
1498 * Leave buffer on scd->scd_buf_nposted since
1499 * srpc_finish_service needs to traverse it.
1501 spin_unlock(&scd
->scd_lock
);
/* Once the recorded error stamp lies in the past, clear the buffer error state. */
1505 if (scd
->scd_buf_err_stamp
&&
1506 scd
->scd_buf_err_stamp
< ktime_get_real_seconds()) {
1507 /* re-enable adding buffer */
1508 scd
->scd_buf_err_stamp
= 0;
1509 scd
->scd_buf_err
= 0;
/*
 * With no buffer error, no adjustment already pending and the posted count
 * below scd_buf_low: set scd_buf_adjust and schedule scd_buf_wi.
 * (The second max() operand is not visible in this listing.)
 */
1512 if (!scd
->scd_buf_err
&& /* adding buffer is enabled */
1513 !scd
->scd_buf_adjust
&&
1514 scd
->scd_buf_nposted
< scd
->scd_buf_low
) {
1515 scd
->scd_buf_adjust
= max(scd
->scd_buf_total
/ 2,
1517 swi_schedule_workitem(&scd
->scd_buf_wi
);
1520 list_del(&buffer
->buf_list
); /* from scd->scd_buf_posted */
1521 msg
= &buffer
->buf_msg
;
/* Message type expected for requests to this service. */
1522 type
= srpc_service2request(sv
->sv_id
);
/*
 * Validate the request: LNet status must be zero, the length must match
 * sizeof(*msg), and both msg_type and msg_magic must match the expected
 * value either directly or byte-swapped.
 */
1524 if (ev
->status
|| ev
->mlength
!= sizeof(*msg
) ||
1525 (msg
->msg_type
!= type
&&
1526 msg
->msg_type
!= __swab32(type
)) ||
1527 (msg
->msg_magic
!= SRPC_MSG_MAGIC
&&
1528 msg
->msg_magic
!= __swab32(SRPC_MSG_MAGIC
))) {
1529 CERROR("Dropping RPC (%s) from %s: status %d mlength %d type %u magic %u.\n",
1530 sv
->sv_name
, libcfs_id2str(ev
->initiator
),
1531 ev
->status
, ev
->mlength
,
1532 msg
->msg_type
, msg
->msg_magic
);
1535 * NB can't call srpc_service_recycle_buffer here since
1536 * it may call LNetM[DE]Attach. The invalid magic tells
1537 * srpc_handle_rpc to drop this RPC
/*
 * If a free server RPC is available, bind the buffer to it, move it to the
 * active list and schedule its work item.
 */
1542 if (!list_empty(&scd
->scd_rpc_free
)) {
1543 srpc
= list_entry(scd
->scd_rpc_free
.next
,
1544 struct srpc_server_rpc
,
1546 list_del(&srpc
->srpc_list
);
1548 srpc_init_server_rpc(srpc
, scd
, buffer
);
1549 list_add_tail(&srpc
->srpc_list
,
1550 &scd
->scd_rpc_active
);
1551 swi_schedule_workitem(&srpc
->srpc_wi
);
/* Queue the buffer on scd_buf_blocked (presumably the else branch; the else is not visible here). */
1553 list_add_tail(&buffer
->buf_list
,
1554 &scd
->scd_buf_blocked
);
1557 spin_unlock(&scd
->scd_lock
);
/* Count the received request under the global lock. */
1559 spin_lock(&srpc_data
.rpc_glock
);
1560 srpc_data
.rpc_counters
.rpcs_rcvd
++;
1561 spin_unlock(&srpc_data
.rpc_glock
);
/* Bulk GET: only SEND, REPLY or UNLINK events are expected. */
1564 case SRPC_BULK_GET_RPLD
:
1565 LASSERT(ev
->type
== LNET_EVENT_SEND
||
1566 ev
->type
== LNET_EVENT_REPLY
||
1567 ev
->type
== LNET_EVENT_UNLINK
);
1570 break; /* wait for final event */
/*
 * Bulk transfer completed successfully (not an unlink): add the transferred
 * length to the bulk_get or bulk_put counter according to the event type.
 */
1572 case SRPC_BULK_PUT_SENT
:
1573 if (!ev
->status
&& ev
->type
!= LNET_EVENT_UNLINK
) {
1574 spin_lock(&srpc_data
.rpc_glock
);
1576 if (rpcev
->ev_type
== SRPC_BULK_GET_RPLD
)
1577 srpc_data
.rpc_counters
.bulk_get
+= ev
->mlength
;
1579 srpc_data
.rpc_counters
.bulk_put
+= ev
->mlength
;
1581 spin_unlock(&srpc_data
.rpc_glock
);
/* Server-side completion: ev_data is the server RPC that owns this event. */
1583 case SRPC_REPLY_SENT
:
1584 srpc
= rpcev
->ev_data
;
1585 scd
= srpc
->srpc_scd
;
1587 LASSERT(rpcev
== &srpc
->srpc_ev
);
/* Mark the event fired and wake the server RPC, under the service-CD lock. */
1589 spin_lock(&scd
->scd_lock
);
1591 rpcev
->ev_fired
= 1;
/* An unlink is reported to the RPC as -EINTR; otherwise pass the LNet status. */
1592 rpcev
->ev_status
= (ev
->type
== LNET_EVENT_UNLINK
) ?
1593 -EINTR
: ev
->status
;
1594 swi_schedule_workitem(&srpc
->srpc_wi
);
1596 spin_unlock(&scd
->scd_lock
);
/*
 * Initialization sequence for the global srpc_data state.
 *
 * NOTE(review): the enclosing function header, its braces, the checks on rc
 * and the error-exit path are not visible in this listing; this is
 * presumably the RPC layer's startup routine -- confirm against the full file.
 *
 * Visible steps: zero srpc_data and initialize its lock, sleep one second and
 * seed the matchbits counter from the wall clock, then advance rpc_state
 * through NONE -> NI_INIT -> EQ_INIT -> RUNNING as LNet is initialized, the
 * event queue is allocated and the two request portals are made lazy.
 */
1606 memset(&srpc_data
, 0, sizeof(struct smoketest_rpc
));
1607 spin_lock_init(&srpc_data
.rpc_glock
);
1609 /* 1 second pause to avoid timestamp reuse */
1610 set_current_state(TASK_UNINTERRUPTIBLE
);
1611 schedule_timeout(cfs_time_seconds(1));
/* Seed the matchbits counter with the current wall-clock seconds shifted left by 48 bits. */
1612 srpc_data
.rpc_matchbits
= ((__u64
)ktime_get_real_seconds()) << 48;
1614 srpc_data
.rpc_state
= SRPC_STATE_NONE
;
/* Bring up the LNet network interface with the Lustre PID. */
1616 rc
= LNetNIInit(LNET_PID_LUSTRE
);
1618 CERROR("LNetNIInit() has failed: %d\n", rc
);
1622 srpc_data
.rpc_state
= SRPC_STATE_NI_INIT
;
/* Allocate the event queue, with srpc_lnet_ev_handler as its callback. */
1624 LNetInvalidateHandle(&srpc_data
.rpc_lnet_eq
);
1625 rc
= LNetEQAlloc(0, srpc_lnet_ev_handler
, &srpc_data
.rpc_lnet_eq
);
1627 CERROR("LNetEQAlloc() has failed: %d\n", rc
);
/* Mark both request portals (framework and test) as lazy. */
1631 rc
= LNetSetLazyPortal(SRPC_FRAMEWORK_REQUEST_PORTAL
);
1633 rc
= LNetSetLazyPortal(SRPC_REQUEST_PORTAL
);
1636 srpc_data
.rpc_state
= SRPC_STATE_EQ_INIT
;
/* Final state; srpc_post_rpc() asserts this state before posting. */
1644 srpc_data
.rpc_state
= SRPC_STATE_RUNNING
;
1656 state
= srpc_data
.rpc_state
;
1657 srpc_data
.rpc_state
= SRPC_STATE_STOPPING
;
1662 case SRPC_STATE_RUNNING
:
1663 spin_lock(&srpc_data
.rpc_glock
);
1665 for (i
= 0; i
<= SRPC_SERVICE_MAX_ID
; i
++) {
1666 srpc_service_t
*sv
= srpc_data
.rpc_services
[i
];
1668 LASSERTF(!sv
, "service not empty: id %d, name %s\n",
1672 spin_unlock(&srpc_data
.rpc_glock
);
1676 case SRPC_STATE_EQ_INIT
:
1677 rc
= LNetClearLazyPortal(SRPC_FRAMEWORK_REQUEST_PORTAL
);
1678 rc
= LNetClearLazyPortal(SRPC_REQUEST_PORTAL
);
1680 rc
= LNetEQFree(srpc_data
.rpc_lnet_eq
);
1681 LASSERT(!rc
); /* the EQ should have no user by now */
1683 case SRPC_STATE_NI_INIT
: