1 // SPDX-License-Identifier: GPL-2.0
2 /*
3 * GPL HEADER START
4 *
5 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 *
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License version 2 only,
9 * as published by the Free Software Foundation.
10 *
11 * This program is distributed in the hope that it will be useful, but
12 * WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * General Public License version 2 for more details (a copy is included
15 * in the LICENSE file that accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License
18 * version 2 along with this program; If not, see
19 * http://www.gnu.org/licenses/gpl-2.0.html
20 *
21 * GPL HEADER END
22 */
23 /*
24 * Copyright (c) 2014, Intel Corporation.
25 */
26 /*
27 * This file is part of Lustre, http://www.lustre.org/
28 * Lustre is a trademark of Seagate, Inc.
29 *
30 * lnet/lnet/net_fault.c
31 *
32 * Lustre network fault simulation
33 *
34 * Author: liang.zhen@intel.com
35 */
36
37 #define DEBUG_SUBSYSTEM S_LNET
38
39 #include <linux/lnet/lib-lnet.h>
40 #include <uapi/linux/lnet/lnetctl.h>
41
42 #define LNET_MSG_MASK (LNET_PUT_BIT | LNET_ACK_BIT | \
43 LNET_GET_BIT | LNET_REPLY_BIT)
44
45 struct lnet_drop_rule {
46 /** link chain on the_lnet.ln_drop_rules */
47 struct list_head dr_link;
48 /** attributes of this rule */
49 struct lnet_fault_attr dr_attr;
50 /** lock to protect \a dr_drop_at and \a dr_stat */
51 spinlock_t dr_lock;
52 /**
53 * the message sequence to drop: the message is dropped when
54 * dr_stat.fs_count == dr_drop_at
55 */
56 unsigned long dr_drop_at;
57 /**
58 * time (in jiffies) to drop the next message; mutually exclusive with dr_drop_at
59 */
60 unsigned long dr_drop_time;
61 /** baseline used to calculate dr_drop_time */
62 unsigned long dr_time_base;
63 /** statistic of dropped messages */
64 struct lnet_fault_stat dr_stat;
65 };
66
67 static bool
68 lnet_fault_nid_match(lnet_nid_t nid, lnet_nid_t msg_nid)
69 {
70 if (nid == msg_nid || nid == LNET_NID_ANY)
71 return true;
72
73 if (LNET_NIDNET(nid) != LNET_NIDNET(msg_nid))
74 return false;
75
76 /* 255.255.255.255@net is wildcard for all addresses in a network */
77 return LNET_NIDADDR(nid) == LNET_NIDADDR(LNET_NID_ANY);
78 }
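/*
 * Matching examples (illustrative NIDs, not taken from this file):
 *   rule NID LNET_NID_ANY        matches any message NID
 *   rule NID 192.168.0.1@tcp     matches only that exact NID
 *   rule NID 255.255.255.255@tcp matches every NID on the tcp network
 */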
79
80 static bool
81 lnet_fault_attr_match(struct lnet_fault_attr *attr, lnet_nid_t src,
82 lnet_nid_t dst, unsigned int type, unsigned int portal)
83 {
84 if (!lnet_fault_nid_match(attr->fa_src, src) ||
85 !lnet_fault_nid_match(attr->fa_dst, dst))
86 return false;
87
88 if (!(attr->fa_msg_mask & (1 << type)))
89 return false;
90
91 /**
92 * NB: ACK and REPLY have no portal, but they should have been
93 * rejected by message mask
94 */
95 if (attr->fa_ptl_mask && /* has portal filter */
96 !(attr->fa_ptl_mask & (1ULL << portal)))
97 return false;
98
99 return true;
100 }
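/*
 * For example (illustrative values): fa_msg_mask = LNET_PUT_BIT | LNET_GET_BIT
 * restricts a rule to PUT and GET messages, and fa_ptl_mask = 1ULL << 12
 * further restricts it to traffic aimed at portal 12.
 */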
101
102 static int
103 lnet_fault_attr_validate(struct lnet_fault_attr *attr)
104 {
105 if (!attr->fa_msg_mask)
106 attr->fa_msg_mask = LNET_MSG_MASK; /* all message types */
107
108 if (!attr->fa_ptl_mask) /* no portal filter */
109 return 0;
110
111 /* NB: only PUT and GET can be filtered if portal filter has been set */
112 attr->fa_msg_mask &= LNET_GET_BIT | LNET_PUT_BIT;
113 if (!attr->fa_msg_mask) {
114 CDEBUG(D_NET, "can't find valid message type bits %x\n",
115 attr->fa_msg_mask);
116 return -EINVAL;
117 }
118 return 0;
119 }
120
121 static void
122 lnet_fault_stat_inc(struct lnet_fault_stat *stat, unsigned int type)
123 {
124 /* NB: fs_count is NOT updated by this function */
125 switch (type) {
126 case LNET_MSG_PUT:
127 stat->fs_put++;
128 return;
129 case LNET_MSG_ACK:
130 stat->fs_ack++;
131 return;
132 case LNET_MSG_GET:
133 stat->fs_get++;
134 return;
135 case LNET_MSG_REPLY:
136 stat->fs_reply++;
137 return;
138 }
139 }
140
141 /**
142 * LNet message drop simulation
143 */
144
145 /**
146 * Add a new drop rule to LNet
147 * There is no check for duplicate rules; every rule is checked against
148 * each incoming message.
149 */
150 static int
151 lnet_drop_rule_add(struct lnet_fault_attr *attr)
152 {
153 struct lnet_drop_rule *rule;
154
155 if (attr->u.drop.da_rate && attr->u.drop.da_interval) {
156 CDEBUG(D_NET, "please provide either drop rate or drop interval, but not both at the same time %d/%d\n",
157 attr->u.drop.da_rate, attr->u.drop.da_interval);
158 return -EINVAL;
159 }
160
161 if (lnet_fault_attr_validate(attr))
162 return -EINVAL;
163
164 CFS_ALLOC_PTR(rule);
165 if (!rule)
166 return -ENOMEM;
167
168 spin_lock_init(&rule->dr_lock);
169
170 rule->dr_attr = *attr;
171 if (attr->u.drop.da_interval) {
172 rule->dr_time_base = cfs_time_shift(attr->u.drop.da_interval);
173 rule->dr_drop_time = cfs_time_shift(cfs_rand() %
174 attr->u.drop.da_interval);
175 } else {
176 rule->dr_drop_at = cfs_rand() % attr->u.drop.da_rate;
177 }
178
179 lnet_net_lock(LNET_LOCK_EX);
180 list_add(&rule->dr_link, &the_lnet.ln_drop_rules);
181 lnet_net_unlock(LNET_LOCK_EX);
182
183 CDEBUG(D_NET, "Added drop rule: src %s, dst %s, rate %d, interval %d\n",
184 libcfs_nid2str(attr->fa_src), libcfs_nid2str(attr->fa_dst),
185 attr->u.drop.da_rate, attr->u.drop.da_interval);
186 return 0;
187 }
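/*
 * Illustrative use only (this file never builds an attr itself; it arrives
 * from user space via lnet_fault_ctl()).  A rule dropping roughly 1 of every
 * 10 PUTs between any pair of peers could be described as:
 *
 *	struct lnet_fault_attr attr = {
 *		.fa_src		= LNET_NID_ANY,
 *		.fa_dst		= LNET_NID_ANY,
 *		.fa_msg_mask	= LNET_PUT_BIT,
 *		.u.drop.da_rate	= 10,
 *	};
 *
 *	rc = lnet_drop_rule_add(&attr);
 */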
188
189 /**
190 * Remove matched drop rules from LNet; all rules that match \a src and
191 * \a dst will be removed.
192 * If \a src is zero, all rules with \a dst as destination will be removed.
193 * If \a dst is zero, all rules with \a src as source will be removed.
194 * If both are zero, all rules will be removed.
195 */
196 static int
197 lnet_drop_rule_del(lnet_nid_t src, lnet_nid_t dst)
198 {
199 struct lnet_drop_rule *rule;
200 struct lnet_drop_rule *tmp;
201 struct list_head zombies;
202 int n = 0;
203
204 INIT_LIST_HEAD(&zombies);
205
206 lnet_net_lock(LNET_LOCK_EX);
207 list_for_each_entry_safe(rule, tmp, &the_lnet.ln_drop_rules, dr_link) {
208 if (rule->dr_attr.fa_src != src && src)
209 continue;
210
211 if (rule->dr_attr.fa_dst != dst && dst)
212 continue;
213
214 list_move(&rule->dr_link, &zombies);
215 }
216 lnet_net_unlock(LNET_LOCK_EX);
217
218 list_for_each_entry_safe(rule, tmp, &zombies, dr_link) {
219 CDEBUG(D_NET, "Remove drop rule: src %s->dst: %s (1/%d, %d)\n",
220 libcfs_nid2str(rule->dr_attr.fa_src),
221 libcfs_nid2str(rule->dr_attr.fa_dst),
222 rule->dr_attr.u.drop.da_rate,
223 rule->dr_attr.u.drop.da_interval);
224
225 list_del(&rule->dr_link);
226 CFS_FREE_PTR(rule);
227 n++;
228 }
229
230 return n;
231 }
232
233 /**
234 * List the drop rule at position \a pos
235 */
236 static int
237 lnet_drop_rule_list(int pos, struct lnet_fault_attr *attr,
238 struct lnet_fault_stat *stat)
239 {
240 struct lnet_drop_rule *rule;
241 int cpt;
242 int i = 0;
243 int rc = -ENOENT;
244
245 cpt = lnet_net_lock_current();
246 list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
247 if (i++ < pos)
248 continue;
249
250 spin_lock(&rule->dr_lock);
251 *attr = rule->dr_attr;
252 *stat = rule->dr_stat;
253 spin_unlock(&rule->dr_lock);
254 rc = 0;
255 break;
256 }
257
258 lnet_net_unlock(cpt);
259 return rc;
260 }
261
262 /**
263 * reset counters for all drop rules
264 */
265 static void
266 lnet_drop_rule_reset(void)
267 {
268 struct lnet_drop_rule *rule;
269 int cpt;
270
271 cpt = lnet_net_lock_current();
272
273 list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
274 struct lnet_fault_attr *attr = &rule->dr_attr;
275
276 spin_lock(&rule->dr_lock);
277
278 memset(&rule->dr_stat, 0, sizeof(rule->dr_stat));
279 if (attr->u.drop.da_rate) {
280 rule->dr_drop_at = cfs_rand() % attr->u.drop.da_rate;
281 } else {
282 rule->dr_drop_time = cfs_time_shift(cfs_rand() %
283 attr->u.drop.da_interval);
284 rule->dr_time_base = cfs_time_shift(attr->u.drop.da_interval);
285 }
286 spin_unlock(&rule->dr_lock);
287 }
288
289 lnet_net_unlock(cpt);
290 }
291
292 /**
293 * Check source/destination NID, portal, message type and drop rate, and
294 * decide whether this message should be dropped.
295 */
296 static bool
297 drop_rule_match(struct lnet_drop_rule *rule, lnet_nid_t src,
298 lnet_nid_t dst, unsigned int type, unsigned int portal)
299 {
300 struct lnet_fault_attr *attr = &rule->dr_attr;
301 bool drop;
302
303 if (!lnet_fault_attr_match(attr, src, dst, type, portal))
304 return false;
305
306 /* match this rule, check drop rate now */
307 spin_lock(&rule->dr_lock);
308 if (rule->dr_drop_time) { /* time based drop */
309 unsigned long now = cfs_time_current();
310
311 rule->dr_stat.fs_count++;
312 drop = cfs_time_aftereq(now, rule->dr_drop_time);
313 if (drop) {
314 if (cfs_time_after(now, rule->dr_time_base))
315 rule->dr_time_base = now;
316
317 rule->dr_drop_time = rule->dr_time_base +
318 cfs_time_seconds(cfs_rand() %
319 attr->u.drop.da_interval);
320 rule->dr_time_base += cfs_time_seconds(attr->u.drop.da_interval);
321
322 CDEBUG(D_NET, "Drop Rule %s->%s: next drop : %lu\n",
323 libcfs_nid2str(attr->fa_src),
324 libcfs_nid2str(attr->fa_dst),
325 rule->dr_drop_time);
326 }
327
328 } else { /* rate based drop */
329 __u64 count; /* do_div() mangles its dividend, so work on a copy */
330 drop = rule->dr_stat.fs_count++ == rule->dr_drop_at;
331 count = rule->dr_stat.fs_count;
332 if (!do_div(count, attr->u.drop.da_rate)) {
333 rule->dr_drop_at = rule->dr_stat.fs_count + cfs_rand() % attr->u.drop.da_rate;
334 CDEBUG(D_NET, "Drop Rule %s->%s: next drop: %lu\n",
335 libcfs_nid2str(attr->fa_src),
336 libcfs_nid2str(attr->fa_dst), rule->dr_drop_at);
337 }
338 }
339
340 if (drop) { /* drop this message, update counters */
341 lnet_fault_stat_inc(&rule->dr_stat, type);
342 rule->dr_stat.u.drop.ds_dropped++;
343 }
344
345 spin_unlock(&rule->dr_lock);
346 return drop;
347 }
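/*
 * Put differently (illustrative numbers): with da_rate = 8 one message is
 * dropped at a random position within every window of 8 matching messages,
 * while with da_interval = 8 the first matching message seen after a random
 * point inside each 8-second window is dropped.
 */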
348
349 /**
350 * Check if a message from \a src to \a dst matches any existing drop rule
351 */
352 bool
353 lnet_drop_rule_match(struct lnet_hdr *hdr)
354 {
355 struct lnet_drop_rule *rule;
356 lnet_nid_t src = le64_to_cpu(hdr->src_nid);
357 lnet_nid_t dst = le64_to_cpu(hdr->dest_nid);
358 unsigned int typ = le32_to_cpu(hdr->type);
359 unsigned int ptl = -1;
360 bool drop = false;
361 int cpt;
362
363 /**
364 * NB: if Portal is specified, then only PUT and GET will be
365 * filtered by drop rule
366 */
367 if (typ == LNET_MSG_PUT)
368 ptl = le32_to_cpu(hdr->msg.put.ptl_index);
369 else if (typ == LNET_MSG_GET)
370 ptl = le32_to_cpu(hdr->msg.get.ptl_index);
371
372 cpt = lnet_net_lock_current();
373 list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
374 drop = drop_rule_match(rule, src, dst, typ, ptl);
375 if (drop)
376 break;
377 }
378
379 lnet_net_unlock(cpt);
380 return drop;
381 }
382
383 /**
384 * LNet Delay Simulation
385 */
386 /** timestamp (rounded to a whole second) at which to send the delayed message */
387 #define msg_delay_send msg_ev.hdr_data
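/*
 * NB: the delayed-send deadline is stashed in msg_ev.hdr_data (see the macro
 * above) while the message waits on a rule's dl_msg_list, so struct lnet_msg
 * does not need an extra field for it.
 */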
388
389 struct lnet_delay_rule {
390 /** link chain on the_lnet.ln_delay_rules */
391 struct list_head dl_link;
392 /** link chain on delay_dd.dd_sched_rules */
393 struct list_head dl_sched_link;
394 /** attributes of this rule */
395 struct lnet_fault_attr dl_attr;
396 /** lock to protect the members below */
397 spinlock_t dl_lock;
398 /** refcount of delay rule */
399 atomic_t dl_refcount;
400 /**
401 * the message sequence to delay: the message is delayed when
402 * dl_stat.fs_count == dl_delay_at
403 */
404 unsigned long dl_delay_at;
405 /**
406 * time (in jiffies) to delay the next message; mutually exclusive with dl_delay_at
407 */
408 unsigned long dl_delay_time;
409 /** baseline used to calculate dl_delay_time */
410 unsigned long dl_time_base;
411 /** jiffies to send the next delayed message */
412 unsigned long dl_msg_send;
413 /** delayed message list */
414 struct list_head dl_msg_list;
415 /** statistic of delayed messages */
416 struct lnet_fault_stat dl_stat;
417 /** timer to wakeup delay_daemon */
418 struct timer_list dl_timer;
419 };
420
421 struct delay_daemon_data {
422 /** serialise rule add/remove */
423 struct mutex dd_mutex;
424 /** protect rules on \a dd_sched_rules */
425 spinlock_t dd_lock;
426 /** scheduled delay rules (by timer) */
427 struct list_head dd_sched_rules;
428 /** the daemon thread sleeps here */
429 wait_queue_head_t dd_waitq;
430 /** the controller (lctl command) waits here */
431 wait_queue_head_t dd_ctl_waitq;
432 /** daemon is running */
433 unsigned int dd_running;
434 /** daemon stopped */
435 unsigned int dd_stopped;
436 };
437
438 static struct delay_daemon_data delay_dd;
439
440 static unsigned long
441 round_timeout(unsigned long timeout)
442 {
443 return cfs_time_seconds((unsigned int)
444 cfs_duration_sec(cfs_time_sub(timeout, 0)) + 1);
445 }
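/*
 * round_timeout() rounds a jiffies timestamp up to the next whole-second
 * boundary (still expressed in jiffies), e.g. with HZ = 1000 a timeout of
 * 2300 becomes 3000, so delayed sends only need one-second resolution.
 */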
446
447 static void
448 delay_rule_decref(struct lnet_delay_rule *rule)
449 {
450 if (atomic_dec_and_test(&rule->dl_refcount)) {
451 LASSERT(list_empty(&rule->dl_sched_link));
452 LASSERT(list_empty(&rule->dl_msg_list));
453 LASSERT(list_empty(&rule->dl_link));
454
455 CFS_FREE_PTR(rule);
456 }
457 }
458
459 /**
460 * Check source/destination NID, portal, message type and delay rate, and
461 * decide whether this message should be delayed.
462 */
463 static bool
464 delay_rule_match(struct lnet_delay_rule *rule, lnet_nid_t src,
465 lnet_nid_t dst, unsigned int type, unsigned int portal,
466 struct lnet_msg *msg)
467 {
468 struct lnet_fault_attr *attr = &rule->dl_attr;
469 bool delay;
470
471 if (!lnet_fault_attr_match(attr, src, dst, type, portal))
472 return false;
473
474 /* match this rule, check delay rate now */
475 spin_lock(&rule->dl_lock);
476 if (rule->dl_delay_time) { /* time based delay */
477 unsigned long now = cfs_time_current();
478
479 rule->dl_stat.fs_count++;
480 delay = cfs_time_aftereq(now, rule->dl_delay_time);
481 if (delay) {
482 if (cfs_time_after(now, rule->dl_time_base))
483 rule->dl_time_base = now;
484
485 rule->dl_delay_time = rule->dl_time_base +
486 cfs_time_seconds(cfs_rand() %
487 attr->u.delay.la_interval);
488 rule->dl_time_base += cfs_time_seconds(attr->u.delay.la_interval);
489
490 CDEBUG(D_NET, "Delay Rule %s->%s: next delay : %lu\n",
491 libcfs_nid2str(attr->fa_src),
492 libcfs_nid2str(attr->fa_dst),
493 rule->dl_delay_time);
494 }
495
496 } else { /* rate based delay */
497 __u64 count; /* do_div() mangles its dividend, so work on a copy */
498 delay = rule->dl_stat.fs_count++ == rule->dl_delay_at;
499 count = rule->dl_stat.fs_count;
500 if (!do_div(count, attr->u.delay.la_rate)) { /* generate the next random rate sequence */
501 rule->dl_delay_at = rule->dl_stat.fs_count + cfs_rand() % attr->u.delay.la_rate;
502 CDEBUG(D_NET, "Delay Rule %s->%s: next delay: %lu\n",
503 libcfs_nid2str(attr->fa_src),
504 libcfs_nid2str(attr->fa_dst), rule->dl_delay_at);
505 }
506 }
507
508 if (!delay) {
509 spin_unlock(&rule->dl_lock);
510 return false;
511 }
512
513 /* delay this message, update counters */
514 lnet_fault_stat_inc(&rule->dl_stat, type);
515 rule->dl_stat.u.delay.ls_delayed++;
516
517 list_add_tail(&msg->msg_list, &rule->dl_msg_list);
518 msg->msg_delay_send = round_timeout(
519 cfs_time_shift(attr->u.delay.la_latency));
520 if (rule->dl_msg_send == -1) {
521 rule->dl_msg_send = msg->msg_delay_send;
522 mod_timer(&rule->dl_timer, rule->dl_msg_send);
523 }
524
525 spin_unlock(&rule->dl_lock);
526 return true;
527 }
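/*
 * dl_msg_send == -1 means the rule's timer is idle: the first message queued
 * on an empty dl_msg_list arms dl_timer with its own deadline, and
 * delayed_msg_check() re-arms or stops the timer as the list drains.
 */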
528
529 /**
530 * Check if \a msg matches any delay rule; if it does, reception of the
531 * message will be delayed.
532 */
533 bool
534 lnet_delay_rule_match_locked(struct lnet_hdr *hdr, struct lnet_msg *msg)
535 {
536 struct lnet_delay_rule *rule;
537 lnet_nid_t src = le64_to_cpu(hdr->src_nid);
538 lnet_nid_t dst = le64_to_cpu(hdr->dest_nid);
539 unsigned int typ = le32_to_cpu(hdr->type);
540 unsigned int ptl = -1;
541
542 /* NB: called with lnet_net_lock held */
543
544 /**
545 * NB: if Portal is specified, then only PUT and GET will be
546 * filtered by delay rule
547 */
548 if (typ == LNET_MSG_PUT)
549 ptl = le32_to_cpu(hdr->msg.put.ptl_index);
550 else if (typ == LNET_MSG_GET)
551 ptl = le32_to_cpu(hdr->msg.get.ptl_index);
552
553 list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
554 if (delay_rule_match(rule, src, dst, typ, ptl, msg))
555 return true;
556 }
557
558 return false;
559 }
560
561 /** move delayed messages that are due (or all of them) onto \a msg_list */
562 static void
563 delayed_msg_check(struct lnet_delay_rule *rule, bool all,
564 struct list_head *msg_list)
565 {
566 struct lnet_msg *msg;
567 struct lnet_msg *tmp;
568 unsigned long now = cfs_time_current();
569
570 if (!all && rule->dl_msg_send > now)
571 return;
572
573 spin_lock(&rule->dl_lock);
574 list_for_each_entry_safe(msg, tmp, &rule->dl_msg_list, msg_list) {
575 if (!all && msg->msg_delay_send > now)
576 break;
577
578 msg->msg_delay_send = 0;
579 list_move_tail(&msg->msg_list, msg_list);
580 }
581
582 if (list_empty(&rule->dl_msg_list)) {
583 del_timer(&rule->dl_timer);
584 rule->dl_msg_send = -1;
585
586 } else if (!list_empty(msg_list)) {
587 /*
588 * dequeued some timed-out messages; update the timer for
589 * the next delayed message on this rule
590 */
591 msg = list_entry(rule->dl_msg_list.next,
592 struct lnet_msg, msg_list);
593 rule->dl_msg_send = msg->msg_delay_send;
594 mod_timer(&rule->dl_timer, rule->dl_msg_send);
595 }
596 spin_unlock(&rule->dl_lock);
597 }
598
599 static void
600 delayed_msg_process(struct list_head *msg_list, bool drop)
601 {
602 struct lnet_msg *msg;
603
604 while (!list_empty(msg_list)) {
605 struct lnet_ni *ni;
606 int cpt;
607 int rc;
608
609 msg = list_entry(msg_list->next, struct lnet_msg, msg_list);
610 LASSERT(msg->msg_rxpeer);
611
612 ni = msg->msg_rxpeer->lp_ni;
613 cpt = msg->msg_rx_cpt;
614
615 list_del_init(&msg->msg_list);
616 if (drop) {
617 rc = -ECANCELED;
618
619 } else if (!msg->msg_routing) {
620 rc = lnet_parse_local(ni, msg);
621 if (!rc)
622 continue;
623
624 } else {
625 lnet_net_lock(cpt);
626 rc = lnet_parse_forward_locked(ni, msg);
627 lnet_net_unlock(cpt);
628
629 switch (rc) {
630 case LNET_CREDIT_OK:
631 lnet_ni_recv(ni, msg->msg_private, msg, 0,
632 0, msg->msg_len, msg->msg_len);
633 /* fall through */
634 case LNET_CREDIT_WAIT:
635 continue;
636 default: /* failures */
637 break;
638 }
639 }
640
641 lnet_drop_message(ni, cpt, msg->msg_private, msg->msg_len);
642 lnet_finalize(ni, msg, rc);
643 }
644 }
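/*
 * Released messages are re-injected as if they had just arrived: locally
 * terminated messages go back through lnet_parse_local(), routed ones through
 * lnet_parse_forward_locked(); anything that fails (or every message, when
 * called with drop == true during rule removal) is dropped and finalized.
 */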
645
646 /**
647 * Process delayed messages for scheduled rules.
648 * This function can be called either by the delay rule daemon or by lnet_finalize().
649 */
650 void
651 lnet_delay_rule_check(void)
652 {
653 struct lnet_delay_rule *rule;
654 struct list_head msgs;
655
656 INIT_LIST_HEAD(&msgs);
657 while (1) {
658 if (list_empty(&delay_dd.dd_sched_rules))
659 break;
660
661 spin_lock_bh(&delay_dd.dd_lock);
662 if (list_empty(&delay_dd.dd_sched_rules)) {
663 spin_unlock_bh(&delay_dd.dd_lock);
664 break;
665 }
666
667 rule = list_entry(delay_dd.dd_sched_rules.next,
668 struct lnet_delay_rule, dl_sched_link);
669 list_del_init(&rule->dl_sched_link);
670 spin_unlock_bh(&delay_dd.dd_lock);
671
672 delayed_msg_check(rule, false, &msgs);
673 delay_rule_decref(rule); /* -1 for delay_dd.dd_sched_rules */
674 }
675
676 if (!list_empty(&msgs))
677 delayed_msg_process(&msgs, false);
678 }
679
680 /** daemon thread to handle delayed messages */
681 static int
682 lnet_delay_rule_daemon(void *arg)
683 {
684 delay_dd.dd_running = 1;
685 wake_up(&delay_dd.dd_ctl_waitq);
686
687 while (delay_dd.dd_running) {
688 wait_event_interruptible(delay_dd.dd_waitq,
689 !delay_dd.dd_running ||
690 !list_empty(&delay_dd.dd_sched_rules));
691 lnet_delay_rule_check();
692 }
693
694 /* in case more rules have been enqueued after my last check */
695 lnet_delay_rule_check();
696 delay_dd.dd_stopped = 1;
697 wake_up(&delay_dd.dd_ctl_waitq);
698
699 return 0;
700 }
701
702 static void
703 delay_timer_cb(struct timer_list *t)
704 {
705 struct lnet_delay_rule *rule = from_timer(rule, t, dl_timer);
706
707 spin_lock_bh(&delay_dd.dd_lock);
708 if (list_empty(&rule->dl_sched_link) && delay_dd.dd_running) {
709 atomic_inc(&rule->dl_refcount);
710 list_add_tail(&rule->dl_sched_link, &delay_dd.dd_sched_rules);
711 wake_up(&delay_dd.dd_waitq);
712 }
713 spin_unlock_bh(&delay_dd.dd_lock);
714 }
715
716 /**
717 * Add a new delay rule to LNet
718 * There is no check for duplicate rules; every rule is checked against
719 * each incoming message.
720 */
721 int
722 lnet_delay_rule_add(struct lnet_fault_attr *attr)
723 {
724 struct lnet_delay_rule *rule;
725 int rc = 0;
726
727 if (attr->u.delay.la_rate && attr->u.delay.la_interval) {
728 CDEBUG(D_NET, "please provide either delay rate or delay interval, but not both at the same time %d/%d\n",
729 attr->u.delay.la_rate, attr->u.delay.la_interval);
730 return -EINVAL;
731 }
732
733 if (!attr->u.delay.la_latency) {
734 CDEBUG(D_NET, "delay latency cannot be zero\n");
735 return -EINVAL;
736 }
737
738 if (lnet_fault_attr_validate(attr))
739 return -EINVAL;
740
741 CFS_ALLOC_PTR(rule);
742 if (!rule)
743 return -ENOMEM;
744
745 mutex_lock(&delay_dd.dd_mutex);
746 if (!delay_dd.dd_running) {
747 struct task_struct *task;
748
749 /**
750 * NB: although LND threads will process delayed messages
751 * in lnet_finalize(), there is no guarantee that they
752 * will be woken up if no other message needs to be
753 * handled.
754 * A single daemon thread is enough; performance is not
755 * a concern for this simulation module.
756 */
757 task = kthread_run(lnet_delay_rule_daemon, NULL, "lnet_dd");
758 if (IS_ERR(task)) {
759 rc = PTR_ERR(task);
760 goto failed;
761 }
762 wait_event(delay_dd.dd_ctl_waitq, delay_dd.dd_running);
763 }
764
765 timer_setup(&rule->dl_timer, delay_timer_cb, 0);
766
767 spin_lock_init(&rule->dl_lock);
768 INIT_LIST_HEAD(&rule->dl_msg_list);
769 INIT_LIST_HEAD(&rule->dl_sched_link);
770
771 rule->dl_attr = *attr;
772 if (attr->u.delay.la_interval) {
773 rule->dl_time_base = cfs_time_shift(attr->u.delay.la_interval);
774 rule->dl_delay_time = cfs_time_shift(cfs_rand() %
775 attr->u.delay.la_interval);
776 } else {
777 rule->dl_delay_at = cfs_rand() % attr->u.delay.la_rate;
778 }
779
780 rule->dl_msg_send = -1;
781
782 lnet_net_lock(LNET_LOCK_EX);
783 atomic_set(&rule->dl_refcount, 1);
784 list_add(&rule->dl_link, &the_lnet.ln_delay_rules);
785 lnet_net_unlock(LNET_LOCK_EX);
786
787 CDEBUG(D_NET, "Added delay rule: src %s, dst %s, rate %d\n",
788 libcfs_nid2str(attr->fa_src), libcfs_nid2str(attr->fa_dst),
789 attr->u.delay.la_rate);
790
791 mutex_unlock(&delay_dd.dd_mutex);
792 return 0;
793 failed:
794 mutex_unlock(&delay_dd.dd_mutex);
795 CFS_FREE_PTR(rule);
796 return rc;
797 }
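/*
 * Illustrative use only: a rule adding ~5 seconds of latency to every message
 * could set .fa_src/.fa_dst = LNET_NID_ANY, .u.delay.la_rate = 1 and
 * .u.delay.la_latency = 5 before calling lnet_delay_rule_add(); as with drop
 * rules, real requests arrive from user space through lnet_fault_ctl().
 */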
798
799 /**
800 * Remove matched delay rules from LNet. If \a shutdown is true or both \a src
801 * and \a dst are zero, all rules will be removed; otherwise only matched rules
802 * will be removed.
803 * If \a src is zero, all rules with \a dst as destination will be removed.
804 * If \a dst is zero, all rules with \a src as source will be removed.
805 *
806 * When a delay rule is removed, all delayed messages of this rule will be
807 * processed immediately.
808 */
809 int
810 lnet_delay_rule_del(lnet_nid_t src, lnet_nid_t dst, bool shutdown)
811 {
812 struct lnet_delay_rule *rule;
813 struct lnet_delay_rule *tmp;
814 struct list_head rule_list;
815 struct list_head msg_list;
816 int n = 0;
817 bool cleanup;
818
819 INIT_LIST_HEAD(&rule_list);
820 INIT_LIST_HEAD(&msg_list);
821
822 if (shutdown) {
823 src = 0;
824 dst = 0;
825 }
826
827 mutex_lock(&delay_dd.dd_mutex);
828 lnet_net_lock(LNET_LOCK_EX);
829
830 list_for_each_entry_safe(rule, tmp, &the_lnet.ln_delay_rules, dl_link) {
831 if (rule->dl_attr.fa_src != src && src)
832 continue;
833
834 if (rule->dl_attr.fa_dst != dst && dst)
835 continue;
836
837 CDEBUG(D_NET, "Remove delay rule: src %s->dst: %s (1/%d, %d)\n",
838 libcfs_nid2str(rule->dl_attr.fa_src),
839 libcfs_nid2str(rule->dl_attr.fa_dst),
840 rule->dl_attr.u.delay.la_rate,
841 rule->dl_attr.u.delay.la_interval);
842 /* refcount is taken over by rule_list */
843 list_move(&rule->dl_link, &rule_list);
844 }
845
846 /* check if we need to shutdown delay_daemon */
847 cleanup = list_empty(&the_lnet.ln_delay_rules) &&
848 !list_empty(&rule_list);
849 lnet_net_unlock(LNET_LOCK_EX);
850
851 list_for_each_entry_safe(rule, tmp, &rule_list, dl_link) {
852 list_del_init(&rule->dl_link);
853
854 del_timer_sync(&rule->dl_timer);
855 delayed_msg_check(rule, true, &msg_list);
856 delay_rule_decref(rule); /* -1 for the_lnet.ln_delay_rules */
857 n++;
858 }
859
860 if (cleanup) { /* no more delay rule, shutdown delay_daemon */
861 LASSERT(delay_dd.dd_running);
862 delay_dd.dd_running = 0;
863 wake_up(&delay_dd.dd_waitq);
864
865 while (!delay_dd.dd_stopped)
866 wait_event(delay_dd.dd_ctl_waitq, delay_dd.dd_stopped);
867 }
868 mutex_unlock(&delay_dd.dd_mutex);
869
870 if (!list_empty(&msg_list))
871 delayed_msg_process(&msg_list, shutdown);
872
873 return n;
874 }
875
876 /**
877 * List the delay rule at position \a pos
878 */
879 int
880 lnet_delay_rule_list(int pos, struct lnet_fault_attr *attr,
881 struct lnet_fault_stat *stat)
882 {
883 struct lnet_delay_rule *rule;
884 int cpt;
885 int i = 0;
886 int rc = -ENOENT;
887
888 cpt = lnet_net_lock_current();
889 list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
890 if (i++ < pos)
891 continue;
892
893 spin_lock(&rule->dl_lock);
894 *attr = rule->dl_attr;
895 *stat = rule->dl_stat;
896 spin_unlock(&rule->dl_lock);
897 rc = 0;
898 break;
899 }
900
901 lnet_net_unlock(cpt);
902 return rc;
903 }
904
905 /**
906 * reset counters for all Delay Rules
907 */
908 void
909 lnet_delay_rule_reset(void)
910 {
911 struct lnet_delay_rule *rule;
912 int cpt;
913
914 cpt = lnet_net_lock_current();
915
916 list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
917 struct lnet_fault_attr *attr = &rule->dl_attr;
918
919 spin_lock(&rule->dl_lock);
920
921 memset(&rule->dl_stat, 0, sizeof(rule->dl_stat));
922 if (attr->u.delay.la_rate) {
923 rule->dl_delay_at = cfs_rand() % attr->u.delay.la_rate;
924 } else {
925 rule->dl_delay_time = cfs_time_shift(cfs_rand() %
926 attr->u.delay.la_interval);
927 rule->dl_time_base = cfs_time_shift(attr->u.delay.la_interval);
928 }
929 spin_unlock(&rule->dl_lock);
930 }
931
932 lnet_net_unlock(cpt);
933 }
934
935 int
936 lnet_fault_ctl(int opc, struct libcfs_ioctl_data *data)
937 {
938 struct lnet_fault_attr *attr;
939 struct lnet_fault_stat *stat;
940
941 attr = (struct lnet_fault_attr *)data->ioc_inlbuf1;
942
943 switch (opc) {
944 default:
945 return -EINVAL;
946
947 case LNET_CTL_DROP_ADD:
948 if (!attr)
949 return -EINVAL;
950
951 return lnet_drop_rule_add(attr);
952
953 case LNET_CTL_DROP_DEL:
954 if (!attr)
955 return -EINVAL;
956
957 data->ioc_count = lnet_drop_rule_del(attr->fa_src,
958 attr->fa_dst);
959 return 0;
960
961 case LNET_CTL_DROP_RESET:
962 lnet_drop_rule_reset();
963 return 0;
964
965 case LNET_CTL_DROP_LIST:
966 stat = (struct lnet_fault_stat *)data->ioc_inlbuf2;
967 if (!attr || !stat)
968 return -EINVAL;
969
970 return lnet_drop_rule_list(data->ioc_count, attr, stat);
971
972 case LNET_CTL_DELAY_ADD:
973 if (!attr)
974 return -EINVAL;
975
976 return lnet_delay_rule_add(attr);
977
978 case LNET_CTL_DELAY_DEL:
979 if (!attr)
980 return -EINVAL;
981
982 data->ioc_count = lnet_delay_rule_del(attr->fa_src,
983 attr->fa_dst, false);
984 return 0;
985
986 case LNET_CTL_DELAY_RESET:
987 lnet_delay_rule_reset();
988 return 0;
989
990 case LNET_CTL_DELAY_LIST:
991 stat = (struct lnet_fault_stat *)data->ioc_inlbuf2;
992 if (!attr || !stat)
993 return -EINVAL;
994
995 return lnet_delay_rule_list(data->ioc_count, attr, stat);
996 }
997 }
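/*
 * lnet_fault_ctl() is the single entry point for the fault-injection ioctls:
 * user-space tools (typically lctl fault-injection commands) pass a
 * struct lnet_fault_attr in ioc_inlbuf1 and, for the *_LIST operations, get
 * the per-rule statistics back through ioc_inlbuf2.
 */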
998
999 int
1000 lnet_fault_init(void)
1001 {
1002 BUILD_BUG_ON(LNET_PUT_BIT != 1 << LNET_MSG_PUT);
1003 BUILD_BUG_ON(LNET_ACK_BIT != 1 << LNET_MSG_ACK);
1004 BUILD_BUG_ON(LNET_GET_BIT != 1 << LNET_MSG_GET);
1005 BUILD_BUG_ON(LNET_REPLY_BIT != 1 << LNET_MSG_REPLY);
1006
1007 mutex_init(&delay_dd.dd_mutex);
1008 spin_lock_init(&delay_dd.dd_lock);
1009 init_waitqueue_head(&delay_dd.dd_waitq);
1010 init_waitqueue_head(&delay_dd.dd_ctl_waitq);
1011 INIT_LIST_HEAD(&delay_dd.dd_sched_rules);
1012
1013 return 0;
1014 }
1015
1016 void
1017 lnet_fault_fini(void)
1018 {
1019 lnet_drop_rule_del(0, 0);
1020 lnet_delay_rule_del(0, 0, true);
1021
1022 LASSERT(list_empty(&the_lnet.ln_drop_rules));
1023 LASSERT(list_empty(&the_lnet.ln_delay_rules));
1024 LASSERT(list_empty(&delay_dd.dd_sched_rules));
1025 }