]>
git.proxmox.com Git - ceph.git/blob - ceph/qa/tasks/netem.py
"""
Task to run tests with network delay between two remotes using tc and netem.
Reference: https://wiki.linuxfoundation.org/networking/netem.
"""
9 from paramiko
import SSHException
# Module-level logger shared by every helper in this task.
log = logging.getLogger(__name__)
def set_priority(interface):
    """Return the tc command that creates a priority queueing discipline.

    :param interface: name of the network device placed after ``dev``
    :returns: argument list adding a ``prio`` qdisc with handle ``1:`` at
              the root of ``interface``
    """
    return ['sudo', 'tc', 'qdisc', 'add', 'dev', interface,
            'root', 'handle', '1:', 'prio']
def show_tc(interface):
    """Return the tc command that shows the qdisc present on ``interface``.

    :param interface: name of the network device placed after ``dev``
    :returns: argument list for ``tc qdisc show dev <interface>``
    """
    return ['sudo', 'tc', 'qdisc', 'show', 'dev', interface]
def del_tc(interface):
    """Return the tc command that deletes the root qdisc of ``interface``.

    :param interface: name of the network device placed after ``dev``
    :returns: argument list for ``tc qdisc del dev <interface> root``
    """
    return ['sudo', 'tc', 'qdisc', 'del', 'dev', interface, 'root']
def cmd_prefix(interface):
    """Build the three tc command prefixes used by the delay/drop helpers.

    Each prefix is deliberately incomplete: callers append the trailing
    arguments (delay values, destination ip, action) with ``extend`` before
    running it. New lists are created on every call, so extending one never
    leaks into a later call.

    :param interface: name of the network device placed after ``dev``
    :returns: tuple ``(set_delay, change_delay, set_filter)`` of argument
              lists
    """
    # prepare command to set delay
    cmd1 = ['sudo', 'tc', 'qdisc', 'add', 'dev', interface, 'parent',
            '1:1', 'handle', '2:', 'netem', 'delay']

    # prepare command to change delay
    cmd2 = ['sudo', 'tc', 'qdisc', 'replace', 'dev', interface,
            'root', 'netem', 'delay']

    # prepare command to apply filter to the matched ip/host
    cmd3 = ['sudo', 'tc', 'filter', 'add', 'dev', interface,
            'parent', '1:0', 'protocol', 'ip', 'pref', '55',
            'handle', '::55', 'u32', 'match', 'ip', 'dst']

    return cmd1, cmd2, cmd3
def static_delay(remote, host, interface, delay):
    """Set a constant delay between two hosts using tc qdisc and netem.

    If no qdisc carrying a ``refcnt`` is reported for ``interface``, the
    priority queue, the netem delay and the destination filter are created.
    Otherwise only the delay of the existing device is replaced.

    :param remote: node the tc commands are executed on; must provide
                   ``sh`` (returns command output) and ``run(args=...)``
    :param host: destination node; its ``hostname`` is resolved to an ip
    :param interface: network device the qdisc is attached to
    :param delay: delay value handed to netem unchanged
    """
    set_delay, change_delay, set_ip = cmd_prefix(interface)

    ip = socket.gethostbyname(host.hostname)

    # static delay, with +/- 5ms jitter with normal distribution as default
    netem_args = ['%s' % delay, '5ms', 'distribution', 'normal']

    tc = remote.sh(show_tc(interface))
    if 'refcnt' not in tc:
        # no priority queue yet: create it before attaching netem to it
        log.info('Create priority queue')
        remote.run(args=set_priority(interface))

        log.info('Setting delay to %s' % delay)
        set_delay.extend(netem_args)
        remote.run(args=set_delay)

        # set delay to a particular remote node via ip
        log.info('Delay set on %s' % remote)
        set_ip.extend(['%s' % ip, 'flowid', '2:1'])
        remote.run(args=set_ip)
    else:
        # if the device is already created, only change the delay
        log.info('Setting delay to %s' % delay)
        change_delay.extend(netem_args)
        remote.run(args=change_delay)
def variable_delay(remote, host, interface, delay_range=()):
    """Vary delay between two values.

    If no qdisc carrying a ``refcnt`` is reported for ``interface``, the
    priority queue, the netem delay and the destination filter are created.
    Otherwise only the delay of the existing device is replaced.

    :param remote: node the tc commands are executed on; must provide
                   ``sh`` (returns command output) and ``run(args=...)``
    :param host: destination node; its ``hostname`` is resolved to an ip
    :param interface: network device the qdisc is attached to
    :param delay_range: sequence whose first two items are the delay values
                        handed to netem; the first has to be lower than the
                        second
    :raises IndexError: if ``delay_range`` holds fewer than two items
    """
    set_delay, change_delay, set_ip = cmd_prefix(interface)

    ip = socket.gethostbyname(host.hostname)

    # delay1 has to be lower than delay2
    delay1 = delay_range[0]
    delay2 = delay_range[1]
    netem_args = ['%s' % delay1, '%s' % delay2]

    tc = remote.sh(show_tc(interface))
    if 'refcnt' not in tc:
        # no priority queue yet: create it before attaching netem to it
        remote.run(args=set_priority(interface))

        log.info('Setting varying delay')
        set_delay.extend(netem_args)
        remote.run(args=set_delay)

        # set delay to a particular remote node via ip
        log.info('Delay set on %s' % remote)
        set_ip.extend(['%s' % ip, 'flowid', '2:1'])
        remote.run(args=set_ip)
    else:
        # if the device is already created, only change the delay
        log.info('Setting varying delay')
        change_delay.extend(netem_args)
        remote.run(args=change_delay)
def delete_dev(remote, interface):
    """Delete the qdisc if present.

    :param remote: node the tc commands are executed on; must provide
                   ``sh`` (returns command output) and ``run(args=...)``
    :param interface: network device whose root qdisc is removed
    """
    log.info('Delete tc')
    tc = remote.sh(show_tc(interface))
    # only issue the delete when a qdisc carrying a refcnt is reported
    if 'refcnt' in tc:
        remote.run(args=del_tc(interface))
130 stop_event
= gevent
.event
.Event()
132 def __init__(self
, ctx
, remote
, host
, interface
, interval
):
136 self
.interval
= interval
137 self
.interface
= interface
138 self
.ip
= socket
.gethostbyname(self
.host
.hostname
)
def packet_drop(self):
    """Drop packets to the remote ip specified.

    Reads ``self.interface``, ``self.remote``, ``self.host`` and
    ``self.ip``; creates the priority queue first when the interface
    reports no qdisc carrying a ``refcnt``.
    """
    # only the filter prefix is needed here
    _, _, set_ip = cmd_prefix(self.interface)

    tc = self.remote.sh(show_tc(self.interface))
    if 'refcnt' not in tc:
        self.remote.run(args=set_priority(self.interface))

    # packet drop to specific ip
    log.info('Drop all packets to %s' % self.host)
    set_ip.extend(['%s' % self.ip, 'action', 'drop'])
    self.remote.run(args=set_ip)
154 def link_toggle(self
):
157 For toggling packet drop and recovery in regular interval.
158 If interval is 5s, link is up for 5s and link is down for 5s
161 while not self
.stop_event
.is_set():
162 self
.stop_event
.wait(timeout
=self
.interval
)
166 log
.info('link down')
168 log
.debug('Failed to run command')
170 self
.stop_event
.wait(timeout
=self
.interval
)
171 # if qdisc exist,delete it.
173 delete_dev(self
.remote
, self
.interface
)
176 log
.debug('Failed to run command')
def begin(self, gname):
    """Spawn ``self.link_toggle`` with gevent and register it under ``gname``.

    :param gname: key under which the spawned greenlet is stored in
                  ``self.ctx.netem.names``
    """
    self.thread = gevent.spawn(self.link_toggle)
    self.ctx.netem.names[gname] = self.thread
def end(self, gname):
    """Set the stop event and call ``get()`` on the entry stored as ``gname``.

    :param gname: key previously stored in ``self.ctx.netem.names``
    :raises KeyError: if ``gname`` is not present in
                      ``self.ctx.netem.names``
    """
    self.stop_event.set()
    toggler = self.ctx.netem.names[gname]
    log.info('gname is {}'.format(toggler))
    toggler.get()
189 Invoked during unwinding if the test fails or exits before executing task 'link_recover'
192 self
.stop_event
.set()
196 @contextlib.contextmanager
197 def task(ctx
, config
):
203 dst_client: [c2.rgw.1]
209 dst_client: [c2.rgw.1]
210 delay_range: [10ms, 20ms] # (min, max)
213 clients: [rgw.1, mon.0]
216 dst_client: [c2.rgw.1]
217 link_toggle_interval: 10 # no unit mentioned. By default takes seconds.
220 clients: [rgw.1, mon.0]
222 link_recover: [t1, t2]
227 log
.info('config %s' % config
)
229 assert isinstance(config
, dict), \
230 "please list clients to run on"
231 if not hasattr(ctx
, 'netem'):
232 ctx
.netem
= argparse
.Namespace()
235 if config
.get('dst_client') is not None:
236 dst
= config
.get('dst_client')
237 (host
,) = ctx
.cluster
.only(dst
).remotes
.keys()
239 for role
in config
.get('clients', None):
240 (remote
,) = ctx
.cluster
.only(role
).remotes
.keys()
241 ctx
.netem
.remote
= remote
242 if config
.get('delay', False):
243 static_delay(remote
, host
, config
.get('iface'), config
.get('delay'))
244 if config
.get('delay_range', False):
245 variable_delay(remote
, host
, config
.get('iface'), config
.get('delay_range'))
246 if config
.get('link_toggle_interval', False):
247 log
.info('Toggling link for %s' % config
.get('link_toggle_interval'))
249 toggle
= Toggle(ctx
, remote
, host
, config
.get('iface'), config
.get('link_toggle_interval'))
250 toggle
.begin(config
.get('gname'))
251 if config
.get('link_recover', False):
252 log
.info('Recovering link')
253 for gname
in config
.get('link_recover'):
256 time
.sleep(config
.get('link_toggle_interval'))
257 delete_dev(ctx
.netem
.remote
, config
.get('iface'))
258 del ctx
.netem
.names
[gname
]
265 for role
in config
.get('clients'):
266 (remote
,) = ctx
.cluster
.only(role
).remotes
.keys()
267 delete_dev(remote
, config
.get('iface'))