// SPDX-License-Identifier: GPL-2.0
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2014, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Seagate, Inc.
 *
 * lnet/lnet/net_fault.c
 *
 * Lustre network fault simulation
 *
 * Author: liang.zhen@intel.com
 */
#define DEBUG_SUBSYSTEM S_LNET

#include <linux/lnet/lib-lnet.h>
#include <uapi/linux/lnet/lnetctl.h>

#define LNET_MSG_MASK		(LNET_PUT_BIT | LNET_ACK_BIT | \
				 LNET_GET_BIT | LNET_REPLY_BIT)
struct lnet_drop_rule {
	/** link chain on the_lnet.ln_drop_rules */
	struct list_head	dr_link;
	/** attributes of this rule */
	struct lnet_fault_attr	dr_attr;
	/** lock to protect \a dr_drop_at and \a dr_stat */
	spinlock_t		dr_lock;
	/**
	 * the message sequence to drop, which means the message is dropped
	 * when dr_stat.fs_count == dr_drop_at
	 */
	unsigned long		dr_drop_at;
	/**
	 * seconds to drop the next message, exclusive with dr_drop_at
	 */
	unsigned long		dr_drop_time;
	/** baseline to calculate dr_drop_time */
	unsigned long		dr_time_base;
	/** statistic of dropped messages */
	struct lnet_fault_stat	dr_stat;
};
static bool
lnet_fault_nid_match(lnet_nid_t nid, lnet_nid_t msg_nid)
{
	if (nid == msg_nid || nid == LNET_NID_ANY)
		return true;

	if (LNET_NIDNET(nid) != LNET_NIDNET(msg_nid))
		return false;

	/* 255.255.255.255@net is a wildcard for all addresses in a network */
	return LNET_NIDADDR(nid) == LNET_NIDADDR(LNET_NID_ANY);
}
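
/*
 * Example (illustrative only; the NIDs are hypothetical): given the wildcard
 * handling above, a rule NID of 255.255.255.255@tcp matches any peer on the
 * tcp network, e.g. 192.168.0.1@tcp and 192.168.0.7@tcp alike, while
 * 192.168.0.1@o2ib fails the LNET_NIDNET() comparison.
 */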

static bool
lnet_fault_attr_match(struct lnet_fault_attr *attr, lnet_nid_t src,
		      lnet_nid_t dst, unsigned int type, unsigned int portal)
{
	if (!lnet_fault_nid_match(attr->fa_src, src) ||
	    !lnet_fault_nid_match(attr->fa_dst, dst))
		return false;

	if (!(attr->fa_msg_mask & (1 << type)))
		return false;

	/**
	 * NB: ACK and REPLY have no portal, but they should already have
	 * been rejected by the message mask
	 */
	if (attr->fa_ptl_mask && /* has portal filter */
	    !(attr->fa_ptl_mask & (1ULL << portal)))
		return false;

	return true;
}

static int
lnet_fault_attr_validate(struct lnet_fault_attr *attr)
{
	if (!attr->fa_msg_mask)
		attr->fa_msg_mask = LNET_MSG_MASK; /* all message types */

	if (!attr->fa_ptl_mask) /* no portal filter */
		return 0;

	/* NB: only PUT and GET can be filtered if a portal filter has been set */
	attr->fa_msg_mask &= LNET_GET_BIT | LNET_PUT_BIT;
	if (!attr->fa_msg_mask) {
		CDEBUG(D_NET, "can't find valid message type bits %x\n",
		       attr->fa_msg_mask);
		return -EINVAL;
	}

	return 0;
}

static void
lnet_fault_stat_inc(struct lnet_fault_stat *stat, unsigned int type)
{
	/* NB: fs_count is NOT updated by this function */
	switch (type) {
	case LNET_MSG_PUT:
		stat->fs_put++;
		return;
	case LNET_MSG_ACK:
		stat->fs_ack++;
		return;
	case LNET_MSG_GET:
		stat->fs_get++;
		return;
	case LNET_MSG_REPLY:
		stat->fs_reply++;
		return;
	}
}

/**
 * LNet message drop simulation
 */

/**
 * Add a new drop rule to LNet
 * There is no check for duplicated drop rules; all rules will be checked
 * for each incoming message.
 */
static int
lnet_drop_rule_add(struct lnet_fault_attr *attr)
{
	struct lnet_drop_rule *rule;

	/* drop rate and drop interval are mutually exclusive */
	if (attr->u.drop.da_rate && attr->u.drop.da_interval) {
		CDEBUG(D_NET, "please provide either drop rate or drop interval, but not both at the same time %d/%d\n",
		       attr->u.drop.da_rate, attr->u.drop.da_interval);
		return -EINVAL;
	}

	if (lnet_fault_attr_validate(attr))
		return -EINVAL;

	rule = kzalloc(sizeof(*rule), GFP_NOFS);
	if (!rule)
		return -ENOMEM;

	spin_lock_init(&rule->dr_lock);

	rule->dr_attr = *attr;
	if (attr->u.drop.da_interval) {
		rule->dr_time_base = cfs_time_shift(attr->u.drop.da_interval);
		rule->dr_drop_time = cfs_time_shift(cfs_rand() %
						    attr->u.drop.da_interval);
	} else {
		rule->dr_drop_at = cfs_rand() % attr->u.drop.da_rate;
	}

	lnet_net_lock(LNET_LOCK_EX);
	list_add(&rule->dr_link, &the_lnet.ln_drop_rules);
	lnet_net_unlock(LNET_LOCK_EX);

	CDEBUG(D_NET, "Added drop rule: src %s, dst %s, rate %d, interval %d\n",
	       libcfs_nid2str(attr->fa_src), libcfs_nid2str(attr->fa_dst),
	       attr->u.drop.da_rate, attr->u.drop.da_interval);
	return 0;
}
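
/*
 * Usage sketch (illustrative only; the NID below is hypothetical): a rate
 * based rule that drops roughly 1 of every 8 messages sent to one peer.
 *
 *	struct lnet_fault_attr attr;
 *	int rc;
 *
 *	memset(&attr, 0, sizeof(attr));
 *	attr.fa_src = LNET_NID_ANY;			// any source NID
 *	attr.fa_dst = libcfs_str2nid("10.0.0.2@tcp");	// hypothetical peer
 *	attr.u.drop.da_rate = 8;	// no da_interval, so rate based
 *	rc = lnet_drop_rule_add(&attr);
 */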

/**
 * Remove matched drop rules from LNet; all rules that match \a src and
 * \a dst will be removed.
 * If \a src is zero, then all rules with \a dst as destination will be removed.
 * If \a dst is zero, then all rules with \a src as source will be removed.
 * If both are zero, all rules will be removed.
 */
static int
lnet_drop_rule_del(lnet_nid_t src, lnet_nid_t dst)
{
	struct lnet_drop_rule *rule;
	struct lnet_drop_rule *tmp;
	struct list_head zombies;
	int n = 0;

	INIT_LIST_HEAD(&zombies);

	lnet_net_lock(LNET_LOCK_EX);
	list_for_each_entry_safe(rule, tmp, &the_lnet.ln_drop_rules, dr_link) {
		if (rule->dr_attr.fa_src != src && src)
			continue;

		if (rule->dr_attr.fa_dst != dst && dst)
			continue;

		list_move(&rule->dr_link, &zombies);
	}
	lnet_net_unlock(LNET_LOCK_EX);

	list_for_each_entry_safe(rule, tmp, &zombies, dr_link) {
		CDEBUG(D_NET, "Remove drop rule: src %s->dst: %s (1/%d, %d)\n",
		       libcfs_nid2str(rule->dr_attr.fa_src),
		       libcfs_nid2str(rule->dr_attr.fa_dst),
		       rule->dr_attr.u.drop.da_rate,
		       rule->dr_attr.u.drop.da_interval);

		list_del(&rule->dr_link);
		kfree(rule);
		n++;
	}

	return n;
}

/**
 * List the drop rule at position \a pos
 */
static int
lnet_drop_rule_list(int pos, struct lnet_fault_attr *attr,
		    struct lnet_fault_stat *stat)
{
	struct lnet_drop_rule *rule;
	int rc = -ENOENT;
	int cpt;
	int i = 0;

	cpt = lnet_net_lock_current();
	list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
		if (i++ < pos)
			continue;

		spin_lock(&rule->dr_lock);
		*attr = rule->dr_attr;
		*stat = rule->dr_stat;
		spin_unlock(&rule->dr_lock);
		rc = 0;
		break;
	}

	lnet_net_unlock(cpt);
	return rc;
}

/**
 * reset counters for all drop rules
 */
static void
lnet_drop_rule_reset(void)
{
	struct lnet_drop_rule *rule;
	int cpt;

	cpt = lnet_net_lock_current();

	list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
		struct lnet_fault_attr *attr = &rule->dr_attr;

		spin_lock(&rule->dr_lock);

		memset(&rule->dr_stat, 0, sizeof(rule->dr_stat));
		if (attr->u.drop.da_rate) {
			rule->dr_drop_at = cfs_rand() % attr->u.drop.da_rate;
		} else {
			rule->dr_drop_time = cfs_time_shift(cfs_rand() %
						attr->u.drop.da_interval);
			rule->dr_time_base = cfs_time_shift(attr->u.drop.da_interval);
		}
		spin_unlock(&rule->dr_lock);
	}

	lnet_net_unlock(cpt);
}

/**
 * check source/destination NID, portal, message type and drop rate;
 * decide whether this message should be dropped
 */
static bool
drop_rule_match(struct lnet_drop_rule *rule, lnet_nid_t src,
		lnet_nid_t dst, unsigned int type, unsigned int portal)
{
	struct lnet_fault_attr *attr = &rule->dr_attr;
	bool drop;

	if (!lnet_fault_attr_match(attr, src, dst, type, portal))
		return false;

	/* match this rule, check drop rate now */
	spin_lock(&rule->dr_lock);
	if (rule->dr_drop_time) { /* time based drop */
		unsigned long now = cfs_time_current();

		rule->dr_stat.fs_count++;
		drop = cfs_time_aftereq(now, rule->dr_drop_time);
		if (drop) {
			if (cfs_time_after(now, rule->dr_time_base))
				rule->dr_time_base = now;

			rule->dr_drop_time = rule->dr_time_base +
					     cfs_time_seconds(cfs_rand() %
						attr->u.drop.da_interval);
			rule->dr_time_base += cfs_time_seconds(attr->u.drop.da_interval);

			CDEBUG(D_NET, "Drop Rule %s->%s: next drop: %lu\n",
			       libcfs_nid2str(attr->fa_src),
			       libcfs_nid2str(attr->fa_dst),
			       rule->dr_drop_time);
		}
	} else { /* rate based drop */
		__u64 count;

		drop = rule->dr_stat.fs_count++ == rule->dr_drop_at;
		/* NB: do_div() modifies its first argument, so divide a copy */
		count = rule->dr_stat.fs_count;
		if (!do_div(count, attr->u.drop.da_rate)) {
			rule->dr_drop_at = rule->dr_stat.fs_count +
					   cfs_rand() % attr->u.drop.da_rate;
			CDEBUG(D_NET, "Drop Rule %s->%s: next drop: %lu\n",
			       libcfs_nid2str(attr->fa_src),
			       libcfs_nid2str(attr->fa_dst),
			       rule->dr_drop_at);
		}
	}

	if (drop) { /* drop this message, update counters */
		lnet_fault_stat_inc(&rule->dr_stat, type);
		rule->dr_stat.u.drop.ds_dropped++;
	}

	spin_unlock(&rule->dr_lock);
	return drop;
}
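
/*
 * Clarifying note on the two modes above: they are mutually exclusive.
 * With da_rate == N, exactly one message is dropped in each window of N
 * matched messages, at a random position (dr_drop_at) inside the window.
 * With da_interval == N, one message is dropped somewhere in each N-second
 * window (dr_drop_time), with dr_time_base marking the start of the next
 * window.
 */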

/**
 * Check if a message from \a src to \a dst can match any existing drop rule
 */
bool
lnet_drop_rule_match(struct lnet_hdr *hdr)
{
	struct lnet_drop_rule *rule;
	lnet_nid_t src = le64_to_cpu(hdr->src_nid);
	lnet_nid_t dst = le64_to_cpu(hdr->dest_nid);
	unsigned int typ = le32_to_cpu(hdr->type);
	unsigned int ptl = -1;
	bool drop = false;
	int cpt;

	/**
	 * NB: if a portal is specified, then only PUT and GET will be
	 * filtered by the drop rule
	 */
	if (typ == LNET_MSG_PUT)
		ptl = le32_to_cpu(hdr->msg.put.ptl_index);
	else if (typ == LNET_MSG_GET)
		ptl = le32_to_cpu(hdr->msg.get.ptl_index);

	cpt = lnet_net_lock_current();
	list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
		drop = drop_rule_match(rule, src, dst, typ, ptl);
		if (drop)
			break;
	}

	lnet_net_unlock(cpt);
	return drop;
}

/**
 * LNet Delay Simulation
 */
/** timestamp (second granularity) to send the delayed message */
#define msg_delay_send		msg_ev.hdr_data
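
/*
 * Note (an assumption about intent, not stated in the original): while a
 * message is parked on a delay queue its event has not been posted yet, so
 * the otherwise unused hdr_data field can carry the send deadline; it is
 * zeroed again in delayed_msg_check() before the message re-enters LNet.
 */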

struct lnet_delay_rule {
	/** link chain on the_lnet.ln_delay_rules */
	struct list_head	dl_link;
	/** link chain on delay_dd.dd_sched_rules */
	struct list_head	dl_sched_link;
	/** attributes of this rule */
	struct lnet_fault_attr	dl_attr;
	/** lock to protect the members below */
	spinlock_t		dl_lock;
	/** refcount of delay rule */
	atomic_t		dl_refcount;
	/**
	 * the message sequence to delay, which means the message is delayed
	 * when dl_stat.fs_count == dl_delay_at
	 */
	unsigned long		dl_delay_at;
	/**
	 * seconds to delay the next message, exclusive with dl_delay_at
	 */
	unsigned long		dl_delay_time;
	/** baseline to calculate dl_delay_time */
	unsigned long		dl_time_base;
	/** jiffies to send the next delayed message */
	unsigned long		dl_msg_send;
	/** delayed message list */
	struct list_head	dl_msg_list;
	/** statistic of delayed messages */
	struct lnet_fault_stat	dl_stat;
	/** timer to wake up the delay_daemon */
	struct timer_list	dl_timer;
};

struct delay_daemon_data {
	/** serialise rule add/remove */
	struct mutex		dd_mutex;
	/** protect rules on \a dd_sched_rules */
	spinlock_t		dd_lock;
	/** scheduled delay rules (by timer) */
	struct list_head	dd_sched_rules;
	/** daemon thread sleeps here */
	wait_queue_head_t	dd_waitq;
	/** controller (lctl command) waits here */
	wait_queue_head_t	dd_ctl_waitq;
	/** daemon is running */
	unsigned int		dd_running;
	/** daemon stopped */
	unsigned int		dd_stopped;
};

static struct delay_daemon_data	delay_dd;

static unsigned long
round_timeout(unsigned long timeout)
{
	return cfs_time_seconds((unsigned int)
			cfs_duration_sec(cfs_time_sub(timeout, 0)) + 1);
}
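
/*
 * Worked example (illustrative): with HZ == 1000, a timeout of 2500 jiffies
 * is 2 whole seconds, so round_timeout() returns cfs_time_seconds(3) == 3000,
 * i.e. deadlines are rounded up to whole-second granularity.
 */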

static void
delay_rule_decref(struct lnet_delay_rule *rule)
{
	if (atomic_dec_and_test(&rule->dl_refcount)) {
		LASSERT(list_empty(&rule->dl_sched_link));
		LASSERT(list_empty(&rule->dl_msg_list));
		LASSERT(list_empty(&rule->dl_link));

		kfree(rule);
	}
}

/**
 * check source/destination NID, portal, message type and delay rate;
 * decide whether this message should be delayed
 */
static bool
delay_rule_match(struct lnet_delay_rule *rule, lnet_nid_t src,
		 lnet_nid_t dst, unsigned int type, unsigned int portal,
		 struct lnet_msg *msg)
{
	struct lnet_fault_attr *attr = &rule->dl_attr;
	bool delay;

	if (!lnet_fault_attr_match(attr, src, dst, type, portal))
		return false;

	/* match this rule, check delay rate now */
	spin_lock(&rule->dl_lock);
	if (rule->dl_delay_time) { /* time based delay */
		unsigned long now = cfs_time_current();

		rule->dl_stat.fs_count++;
		delay = cfs_time_aftereq(now, rule->dl_delay_time);
		if (delay) {
			if (cfs_time_after(now, rule->dl_time_base))
				rule->dl_time_base = now;

			rule->dl_delay_time = rule->dl_time_base +
					      cfs_time_seconds(cfs_rand() %
						attr->u.delay.la_interval);
			rule->dl_time_base += cfs_time_seconds(attr->u.delay.la_interval);

			CDEBUG(D_NET, "Delay Rule %s->%s: next delay: %lu\n",
			       libcfs_nid2str(attr->fa_src),
			       libcfs_nid2str(attr->fa_dst),
			       rule->dl_delay_time);
		}
	} else { /* rate based delay */
		__u64 count;

		delay = rule->dl_stat.fs_count++ == rule->dl_delay_at;
		/* generate the next random rate sequence */
		/* NB: do_div() modifies its first argument, so divide a copy */
		count = rule->dl_stat.fs_count;
		if (!do_div(count, attr->u.delay.la_rate)) {
			rule->dl_delay_at = rule->dl_stat.fs_count +
					    cfs_rand() % attr->u.delay.la_rate;
			CDEBUG(D_NET, "Delay Rule %s->%s: next delay: %lu\n",
			       libcfs_nid2str(attr->fa_src),
			       libcfs_nid2str(attr->fa_dst),
			       rule->dl_delay_at);
		}
	}

	if (!delay) {
		spin_unlock(&rule->dl_lock);
		return false;
	}

	/* delay this message, update counters */
	lnet_fault_stat_inc(&rule->dl_stat, type);
	rule->dl_stat.u.delay.ls_delayed++;

	list_add_tail(&msg->msg_list, &rule->dl_msg_list);
	msg->msg_delay_send = round_timeout(
			cfs_time_shift(attr->u.delay.la_latency));
	if (rule->dl_msg_send == -1) {
		rule->dl_msg_send = msg->msg_delay_send;
		mod_timer(&rule->dl_timer, rule->dl_msg_send);
	}

	spin_unlock(&rule->dl_lock);
	return true;
}
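
/*
 * Clarifying note: dl_msg_send == -1 means no timer is armed for this rule,
 * so only the first message queued on an idle rule arms dl_timer; later
 * messages just join dl_msg_list, since their deadlines (same la_latency,
 * later enqueue time) can never expire before the one already programmed.
 */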

/**
 * check if \a msg matches any delay rule; receiving of this message
 * will be delayed if there is a match.
 */
bool
lnet_delay_rule_match_locked(struct lnet_hdr *hdr, struct lnet_msg *msg)
{
	struct lnet_delay_rule *rule;
	lnet_nid_t src = le64_to_cpu(hdr->src_nid);
	lnet_nid_t dst = le64_to_cpu(hdr->dest_nid);
	unsigned int typ = le32_to_cpu(hdr->type);
	unsigned int ptl = -1;

	/* NB: called with lnet_net_lock held */

	/**
	 * NB: if a portal is specified, then only PUT and GET will be
	 * filtered by the delay rule
	 */
	if (typ == LNET_MSG_PUT)
		ptl = le32_to_cpu(hdr->msg.put.ptl_index);
	else if (typ == LNET_MSG_GET)
		ptl = le32_to_cpu(hdr->msg.get.ptl_index);

	list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
		if (delay_rule_match(rule, src, dst, typ, ptl, msg))
			return true;
	}

	return false;
}

/** dequeue delayed messages that are due for sending */
static void
delayed_msg_check(struct lnet_delay_rule *rule, bool all,
		  struct list_head *msg_list)
{
	struct lnet_msg *msg;
	struct lnet_msg *tmp;
	unsigned long now = cfs_time_current();

	if (!all && rule->dl_msg_send > now)
		return;

	spin_lock(&rule->dl_lock);
	list_for_each_entry_safe(msg, tmp, &rule->dl_msg_list, msg_list) {
		if (!all && msg->msg_delay_send > now)
			break;

		msg->msg_delay_send = 0;
		list_move_tail(&msg->msg_list, msg_list);
	}

	if (list_empty(&rule->dl_msg_list)) {
		del_timer(&rule->dl_timer);
		rule->dl_msg_send = -1;
	} else if (!list_empty(msg_list)) {
		/**
		 * dequeued some timed-out messages; update the timer for the
		 * next delayed message on the rule
		 */
		msg = list_entry(rule->dl_msg_list.next,
				 struct lnet_msg, msg_list);
		rule->dl_msg_send = msg->msg_delay_send;
		mod_timer(&rule->dl_timer, rule->dl_msg_send);
	}
	spin_unlock(&rule->dl_lock);
}

static void
delayed_msg_process(struct list_head *msg_list, bool drop)
{
	struct lnet_msg *msg;

	while (!list_empty(msg_list)) {
		struct lnet_ni *ni;
		int cpt;
		int rc;

		msg = list_entry(msg_list->next, struct lnet_msg, msg_list);
		LASSERT(msg->msg_rxpeer);

		ni = msg->msg_rxpeer->lp_ni;
		cpt = msg->msg_rx_cpt;

		list_del_init(&msg->msg_list);
		if (drop) {
			rc = -ECANCELED;
		} else if (!msg->msg_routing) {
			rc = lnet_parse_local(ni, msg);
			if (!rc)
				continue;
		} else {
			lnet_net_lock(cpt);
			rc = lnet_parse_forward_locked(ni, msg);
			lnet_net_unlock(cpt);

			switch (rc) {
			case LNET_CREDIT_OK:
				lnet_ni_recv(ni, msg->msg_private, msg, 0,
					     0, msg->msg_len, msg->msg_len);
				/* fall through */
			case LNET_CREDIT_WAIT:
				continue;
			default: /* failures */
				break;
			}
		}

		lnet_drop_message(ni, cpt, msg->msg_private, msg->msg_len);
		lnet_finalize(ni, msg, rc);
	}
}

/**
 * Process delayed messages for scheduled rules.
 * This function can be called by the delay_rule_daemon or by lnet_finalize().
 */
void
lnet_delay_rule_check(void)
{
	struct lnet_delay_rule *rule;
	struct list_head msgs;

	INIT_LIST_HEAD(&msgs);
	while (1) {
		if (list_empty(&delay_dd.dd_sched_rules))
			break;

		spin_lock_bh(&delay_dd.dd_lock);
		if (list_empty(&delay_dd.dd_sched_rules)) {
			spin_unlock_bh(&delay_dd.dd_lock);
			break;
		}

		rule = list_entry(delay_dd.dd_sched_rules.next,
				  struct lnet_delay_rule, dl_sched_link);
		list_del_init(&rule->dl_sched_link);
		spin_unlock_bh(&delay_dd.dd_lock);

		delayed_msg_check(rule, false, &msgs);
		delay_rule_decref(rule); /* -1 for delay_dd.dd_sched_rules */
	}

	if (!list_empty(&msgs))
		delayed_msg_process(&msgs, false);
}

/** daemon thread to handle delayed messages */
static int
lnet_delay_rule_daemon(void *arg)
{
	delay_dd.dd_running = 1;
	wake_up(&delay_dd.dd_ctl_waitq);

	while (delay_dd.dd_running) {
		wait_event_interruptible(delay_dd.dd_waitq,
					 !delay_dd.dd_running ||
					 !list_empty(&delay_dd.dd_sched_rules));
		lnet_delay_rule_check();
	}

	/* in case more rules have been enqueued after my last check */
	lnet_delay_rule_check();
	delay_dd.dd_stopped = 1;
	wake_up(&delay_dd.dd_ctl_waitq);

	return 0;
}

static void
delay_timer_cb(struct timer_list *t)
{
	struct lnet_delay_rule *rule = from_timer(rule, t, dl_timer);

	spin_lock_bh(&delay_dd.dd_lock);
	if (list_empty(&rule->dl_sched_link) && delay_dd.dd_running) {
		atomic_inc(&rule->dl_refcount);
		list_add_tail(&rule->dl_sched_link, &delay_dd.dd_sched_rules);
		wake_up(&delay_dd.dd_waitq);
	}
	spin_unlock_bh(&delay_dd.dd_lock);
}
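
/*
 * Clarifying note: the atomic_inc() above takes a reference owned by
 * delay_dd.dd_sched_rules; it is dropped by the delay_rule_decref() call in
 * lnet_delay_rule_check() once the rule has been dequeued and its expired
 * messages collected.
 */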

/**
 * Add a new delay rule to LNet
 * There is no check for duplicated delay rules; all rules will be checked
 * for each incoming message.
 */
int
lnet_delay_rule_add(struct lnet_fault_attr *attr)
{
	struct lnet_delay_rule *rule;
	int rc = 0;

	/* delay rate and delay interval are mutually exclusive */
	if (attr->u.delay.la_rate && attr->u.delay.la_interval) {
		CDEBUG(D_NET, "please provide either delay rate or delay interval, but not both at the same time %d/%d\n",
		       attr->u.delay.la_rate, attr->u.delay.la_interval);
		return -EINVAL;
	}

	if (!attr->u.delay.la_latency) {
		CDEBUG(D_NET, "delay latency cannot be zero\n");
		return -EINVAL;
	}

	if (lnet_fault_attr_validate(attr))
		return -EINVAL;

	rule = kzalloc(sizeof(*rule), GFP_NOFS);
	if (!rule)
		return -ENOMEM;

	mutex_lock(&delay_dd.dd_mutex);
	if (!delay_dd.dd_running) {
		struct task_struct *task;

		/**
		 * NB: although LND threads will process delayed messages
		 * in lnet_finalize, there is no guarantee that LND threads
		 * will be woken up if no other message needs to be finalized.
		 * Only one daemon thread is used; performance is not a
		 * concern for this simulation module.
		 */
		task = kthread_run(lnet_delay_rule_daemon, NULL, "lnet_dd");
		if (IS_ERR(task)) {
			rc = PTR_ERR(task);
			goto failed;
		}
		wait_event(delay_dd.dd_ctl_waitq, delay_dd.dd_running);
	}

	timer_setup(&rule->dl_timer, delay_timer_cb, 0);

	spin_lock_init(&rule->dl_lock);
	INIT_LIST_HEAD(&rule->dl_msg_list);
	INIT_LIST_HEAD(&rule->dl_sched_link);

	rule->dl_attr = *attr;
	if (attr->u.delay.la_interval) {
		rule->dl_time_base = cfs_time_shift(attr->u.delay.la_interval);
		rule->dl_delay_time = cfs_time_shift(cfs_rand() %
						     attr->u.delay.la_interval);
	} else {
		rule->dl_delay_at = cfs_rand() % attr->u.delay.la_rate;
	}

	rule->dl_msg_send = -1;

	lnet_net_lock(LNET_LOCK_EX);
	atomic_set(&rule->dl_refcount, 1);
	list_add(&rule->dl_link, &the_lnet.ln_delay_rules);
	lnet_net_unlock(LNET_LOCK_EX);

	CDEBUG(D_NET, "Added delay rule: src %s, dst %s, rate %d\n",
	       libcfs_nid2str(attr->fa_src), libcfs_nid2str(attr->fa_dst),
	       attr->u.delay.la_rate);

	mutex_unlock(&delay_dd.dd_mutex);
	return 0;

failed:
	mutex_unlock(&delay_dd.dd_mutex);
	kfree(rule);
	return rc;
}
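
/*
 * Usage sketch (illustrative only; the NID below is hypothetical): delay
 * roughly 1 of every 4 messages to one peer by 2 seconds.
 *
 *	struct lnet_fault_attr attr;
 *	int rc;
 *
 *	memset(&attr, 0, sizeof(attr));
 *	attr.fa_src = LNET_NID_ANY;			// any source NID
 *	attr.fa_dst = libcfs_str2nid("10.0.0.2@tcp");	// hypothetical peer
 *	attr.u.delay.la_rate = 4;	// no la_interval, so rate based
 *	attr.u.delay.la_latency = 2;	// seconds, must be non-zero
 *	rc = lnet_delay_rule_add(&attr);
 */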

/**
 * Remove matched delay rules from LNet. If \a shutdown is true or both \a src
 * and \a dst are zero, all rules will be removed; otherwise only matched
 * rules will be removed.
 * If \a src is zero, then all rules with \a dst as destination will be removed.
 * If \a dst is zero, then all rules with \a src as source will be removed.
 *
 * When a delay rule is removed, all delayed messages of this rule will be
 * processed immediately.
 */
int
lnet_delay_rule_del(lnet_nid_t src, lnet_nid_t dst, bool shutdown)
{
	struct lnet_delay_rule *rule;
	struct lnet_delay_rule *tmp;
	struct list_head rule_list;
	struct list_head msg_list;
	int n = 0;
	bool cleanup;

	INIT_LIST_HEAD(&rule_list);
	INIT_LIST_HEAD(&msg_list);

	if (shutdown) {
		src = 0;
		dst = 0;
	}

	mutex_lock(&delay_dd.dd_mutex);
	lnet_net_lock(LNET_LOCK_EX);

	list_for_each_entry_safe(rule, tmp, &the_lnet.ln_delay_rules, dl_link) {
		if (rule->dl_attr.fa_src != src && src)
			continue;

		if (rule->dl_attr.fa_dst != dst && dst)
			continue;

		CDEBUG(D_NET, "Remove delay rule: src %s->dst: %s (1/%d, %d)\n",
		       libcfs_nid2str(rule->dl_attr.fa_src),
		       libcfs_nid2str(rule->dl_attr.fa_dst),
		       rule->dl_attr.u.delay.la_rate,
		       rule->dl_attr.u.delay.la_interval);
		/* refcount is taken over by rule_list */
		list_move(&rule->dl_link, &rule_list);
	}

	/* check if we need to shutdown the delay_daemon */
	cleanup = list_empty(&the_lnet.ln_delay_rules) &&
		  !list_empty(&rule_list);
	lnet_net_unlock(LNET_LOCK_EX);

	list_for_each_entry_safe(rule, tmp, &rule_list, dl_link) {
		list_del_init(&rule->dl_link);

		del_timer_sync(&rule->dl_timer);
		delayed_msg_check(rule, true, &msg_list);
		delay_rule_decref(rule); /* -1 for the_lnet.ln_delay_rules */
		n++;
	}

	if (cleanup) { /* no more delay rules, shutdown the delay_daemon */
		LASSERT(delay_dd.dd_running);
		delay_dd.dd_running = 0;
		wake_up(&delay_dd.dd_waitq);

		while (!delay_dd.dd_stopped)
			wait_event(delay_dd.dd_ctl_waitq, delay_dd.dd_stopped);
	}
	mutex_unlock(&delay_dd.dd_mutex);

	if (!list_empty(&msg_list))
		delayed_msg_process(&msg_list, shutdown);

	return n;
}

/**
 * List the delay rule at position \a pos
 */
int
lnet_delay_rule_list(int pos, struct lnet_fault_attr *attr,
		     struct lnet_fault_stat *stat)
{
	struct lnet_delay_rule *rule;
	int rc = -ENOENT;
	int cpt;
	int i = 0;

	cpt = lnet_net_lock_current();
	list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
		if (i++ < pos)
			continue;

		spin_lock(&rule->dl_lock);
		*attr = rule->dl_attr;
		*stat = rule->dl_stat;
		spin_unlock(&rule->dl_lock);
		rc = 0;
		break;
	}

	lnet_net_unlock(cpt);
	return rc;
}

/**
 * reset counters for all delay rules
 */
void
lnet_delay_rule_reset(void)
{
	struct lnet_delay_rule *rule;
	int cpt;

	cpt = lnet_net_lock_current();

	list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
		struct lnet_fault_attr *attr = &rule->dl_attr;

		spin_lock(&rule->dl_lock);

		memset(&rule->dl_stat, 0, sizeof(rule->dl_stat));
		if (attr->u.delay.la_rate) {
			rule->dl_delay_at = cfs_rand() % attr->u.delay.la_rate;
		} else {
			rule->dl_delay_time = cfs_time_shift(cfs_rand() %
						attr->u.delay.la_interval);
			rule->dl_time_base = cfs_time_shift(attr->u.delay.la_interval);
		}
		spin_unlock(&rule->dl_lock);
	}

	lnet_net_unlock(cpt);
}

int
lnet_fault_ctl(int opc, struct libcfs_ioctl_data *data)
{
	struct lnet_fault_attr *attr;
	struct lnet_fault_stat *stat;

	attr = (struct lnet_fault_attr *)data->ioc_inlbuf1;

	switch (opc) {
	default:
		return -EINVAL;

	case LNET_CTL_DROP_ADD:
		if (!attr)
			return -EINVAL;

		return lnet_drop_rule_add(attr);

	case LNET_CTL_DROP_DEL:
		if (!attr)
			return -EINVAL;

		data->ioc_count = lnet_drop_rule_del(attr->fa_src,
						     attr->fa_dst);
		return 0;

	case LNET_CTL_DROP_RESET:
		lnet_drop_rule_reset();
		return 0;

	case LNET_CTL_DROP_LIST:
		stat = (struct lnet_fault_stat *)data->ioc_inlbuf2;
		if (!attr || !stat)
			return -EINVAL;

		return lnet_drop_rule_list(data->ioc_count, attr, stat);

	case LNET_CTL_DELAY_ADD:
		if (!attr)
			return -EINVAL;

		return lnet_delay_rule_add(attr);

	case LNET_CTL_DELAY_DEL:
		if (!attr)
			return -EINVAL;

		data->ioc_count = lnet_delay_rule_del(attr->fa_src,
						      attr->fa_dst, false);
		return 0;

	case LNET_CTL_DELAY_RESET:
		lnet_delay_rule_reset();
		return 0;

	case LNET_CTL_DELAY_LIST:
		stat = (struct lnet_fault_stat *)data->ioc_inlbuf2;
		if (!attr || !stat)
			return -EINVAL;

		return lnet_delay_rule_list(data->ioc_count, attr, stat);
	}
}
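
/*
 * Note (assumption about caller wiring, which lives outside this file):
 * this dispatcher is normally reached from the LNet ioctl path on behalf of
 * userspace fault-injection commands (e.g. lctl), with the LNET_CTL_*
 * opcode in \a opc and the attr/stat buffers supplied through \a data.
 */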

int
lnet_fault_init(void)
{
	BUILD_BUG_ON(LNET_PUT_BIT != 1 << LNET_MSG_PUT);
	BUILD_BUG_ON(LNET_ACK_BIT != 1 << LNET_MSG_ACK);
	BUILD_BUG_ON(LNET_GET_BIT != 1 << LNET_MSG_GET);
	BUILD_BUG_ON(LNET_REPLY_BIT != 1 << LNET_MSG_REPLY);

	mutex_init(&delay_dd.dd_mutex);
	spin_lock_init(&delay_dd.dd_lock);
	init_waitqueue_head(&delay_dd.dd_waitq);
	init_waitqueue_head(&delay_dd.dd_ctl_waitq);
	INIT_LIST_HEAD(&delay_dd.dd_sched_rules);

	return 0;
}
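
/*
 * Clarifying note: the BUILD_BUG_ON() checks above pin LNET_*_BIT to
 * 1 << LNET_MSG_*, which is exactly the layout lnet_fault_attr_match()
 * assumes when it tests fa_msg_mask against (1 << type).
 */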

void
lnet_fault_fini(void)
{
	lnet_drop_rule_del(0, 0);
	lnet_delay_rule_del(0, 0, true);

	LASSERT(list_empty(&the_lnet.ln_drop_rules));
	LASSERT(list_empty(&the_lnet.ln_delay_rules));
	LASSERT(list_empty(&delay_dd.dd_sched_rules));
}