]> git.proxmox.com Git - ceph.git/blob - ceph/qa/tasks/netem.py
bump version to 18.2.4-pve3
[ceph.git] / ceph / qa / tasks / netem.py
1 """
2 Task to run tests with network delay between two remotes using tc and netem.
3 Reference:https://wiki.linuxfoundation.org/networking/netem.
4
5 """
6
7 import logging
8 import contextlib
9 from paramiko import SSHException
10 import socket
11 import time
12 import gevent
13 import argparse
14
15 log = logging.getLogger(__name__)
16
17
def set_priority(interface):
    """Return the argv that adds a root ``prio`` qdisc (handle ``1:``) on *interface*."""
    leading = ['sudo', 'tc', 'qdisc', 'add', 'dev']
    trailing = ['root', 'handle', '1:', 'prio']
    return leading + [interface] + trailing
22
23
def show_tc(interface):
    """Return the argv that lists the qdiscs configured on *interface*."""
    argv = 'sudo tc qdisc show dev'.split()
    argv.append(interface)
    return argv
28
29
def del_tc(interface):
    """Return the argv that deletes the root qdisc of *interface*."""
    argv = ['sudo', 'tc', 'qdisc', 'del', 'dev']
    argv.extend((interface, 'root'))
    return argv
33
34
def cmd_prefix(interface):
    """
    Build the three ``tc`` command prefixes used by this module.

    Each prefix is an incomplete argv; callers append the trailing
    arguments (delay values, or a destination ip plus an action/flowid).

    Returns a tuple ``(add_netem, replace_netem, add_filter)``:
      * add_netem     - add a netem qdisc (handle 2:) under parent 1:1,
                        ending in ``delay``
      * replace_netem - replace the root qdisc with netem, ending in ``delay``
      * add_filter    - add a u32 filter on parent 1:0, ending in
                        ``match ip dst``
    """
    sudo_tc = ['sudo', 'tc']
    on_device = ['dev', interface]

    # command prefix that sets a delay
    add_netem = (sudo_tc + ['qdisc', 'add'] + on_device
                 + ['parent', '1:1', 'handle', '2:', 'netem', 'delay'])

    # command prefix that changes an existing delay
    replace_netem = (sudo_tc + ['qdisc', 'replace'] + on_device
                     + ['root', 'netem', 'delay'])

    # command prefix that applies a filter to the matched ip/host
    add_filter = (sudo_tc + ['filter', 'add'] + on_device
                  + ['parent', '1:0', 'protocol', 'ip', 'pref', '55',
                     'handle', '::55', 'u32', 'match', 'ip', 'dst'])

    return add_netem, replace_netem, add_filter
51
52
def static_delay(remote, host, interface, delay):
    """
    Apply a constant netem delay on ``interface`` of ``remote`` for traffic
    towards ``host``.

    The output of ``tc qdisc show`` on the interface decides the path:
      * it contains 'refcnt': only the delay is changed, by replacing the
        root qdisc with netem;
      * otherwise: a prio qdisc is created, a netem qdisc carrying the delay
        is added below it, and a u32 filter matching the resolved ip of
        ``host`` is attached with flowid 2:1.

    In both paths the delay is followed by a 5ms jitter with normal
    distribution.
    """
    add_netem, replace_netem, add_filter = cmd_prefix(interface)

    dst_ip = socket.gethostbyname(host.hostname)

    qdisc_listing = remote.sh(show_tc(interface))
    if 'refcnt' in qdisc_listing:
        # the device is already set up, only change the delay
        log.info('Setting delay to %s' % delay)
        remote.run(args=replace_netem + ['%s' % delay, '5ms', 'distribution', 'normal'])
        return

    # no priority queue yet, create it first
    log.info('Create priority queue')
    remote.run(args=set_priority(interface))

    # static delay, with +/- 5ms jitter with normal distribution as default
    log.info('Setting delay to %s' % delay)
    remote.run(args=add_netem + ['%s' % delay, '5ms', 'distribution', 'normal'])

    # restrict the delay to one remote node via its ip
    log.info('Delay set on %s' % remote)
    remote.run(args=add_filter + ['%s' % dst_ip, 'flowid', '2:1'])
82
83
def variable_delay(remote, host, interface, delay_range=None):
    """
    Apply a netem delay built from two values on ``interface`` of ``remote``
    for traffic towards ``host``.

    :param remote: remote on which the tc commands are run
    :param host: destination remote; its hostname is resolved to an ip
    :param interface: network interface name handed to tc
    :param delay_range: sequence whose first two items are passed to
                        ``netem delay`` in that order (e.g. ['10ms', '20ms'])
    :raises ValueError: if ``delay_range`` is missing or has fewer than two
                        items

    The output of ``tc qdisc show`` decides the path: if it contains 'refcnt'
    the root qdisc is replaced with netem using the new values; otherwise a
    prio qdisc, a netem qdisc and a u32 filter on the destination ip
    (flowid 2:1) are created.
    """
    # Validate before any DNS lookup or remote command is issued, so a bad
    # configuration fails early with a clear message.
    if delay_range is None or len(delay_range) < 2:
        raise ValueError(
            'delay_range must contain two values (min, max), got %r'
            % (delay_range,))

    set_delay, change_delay, set_ip = cmd_prefix(interface)

    ip = socket.gethostbyname(host.hostname)

    # delay1 has to be lower than delay2
    # NOTE(review): tc-netem documents the syntax as
    # ``delay TIME [JITTER [CORRELATION]]``, so the second value is presumably
    # applied as jitter around the first rather than as an upper bound --
    # confirm this matches the intended (min, max) semantics.
    delay_args = ['%s' % delay_range[0], '%s' % delay_range[1]]

    tc = remote.sh(show_tc(interface))
    if 'refcnt' not in tc:
        # create the priority queue since it is not present yet
        log.info('Create priority queue')
        remote.run(args=set_priority(interface))

        # set variable delay
        log.info('Setting varying delay')
        remote.run(args=set_delay + delay_args)

        # set delay to a particular remote node via ip
        log.info('Delay set on %s' % remote)
        remote.run(args=set_ip + ['%s' % ip, 'flowid', '2:1'])
    else:
        # if the device is already created, only change the delay
        log.info('Setting varying delay')
        remote.run(args=change_delay + delay_args)
116
117
def delete_dev(remote, interface):
    """
    Remove the root qdisc of ``interface`` on ``remote`` when the
    ``tc qdisc show`` output for that interface contains 'refcnt'.
    """
    log.info('Delete tc')
    qdisc_listing = remote.sh(show_tc(interface))
    if 'refcnt' not in qdisc_listing:
        return
    remote.run(args=del_tc(interface))
126
127
class Toggle:
    """
    Alternately drops and restores traffic from ``remote`` to ``host`` in a
    greenlet, switching state every ``interval`` seconds until stopped.
    """

    # Class attribute: one event object shared by every Toggle instance, so
    # setting it ends the loop of all toggles, not only the one it is set on.
    stop_event = gevent.event.Event()

    def __init__(self, ctx, remote, host, interface, interval):
        self.ctx = ctx
        self.remote = remote
        self.host = host
        self.interface = interface
        self.interval = interval
        # resolved once, at construction time
        self.ip = socket.gethostbyname(host.hostname)

    def packet_drop(self):
        """
        Drop packets to the remote ip specified.

        Nothing is changed when the ``tc qdisc show`` output for the
        interface already contains 'refcnt'.
        """
        drop_filter = cmd_prefix(self.interface)[2]

        qdisc_listing = self.remote.sh(show_tc(self.interface))
        if 'refcnt' in qdisc_listing:
            return

        self.remote.run(args=set_priority(self.interface))
        # packet drop to specific ip
        log.info('Drop all packets to %s' % self.host)
        drop_filter += ['%s' % self.ip, 'action', 'drop']
        self.remote.run(args=drop_filter)

    def link_toggle(self):
        """
        For toggling packet drop and recovery in regular interval.
        If interval is 5s, link is up for 5s and link is down for 5s
        """
        phases = (
            # simulate link down
            (self.packet_drop, 'link down'),
            # if qdisc exist, delete it
            (lambda: delete_dev(self.remote, self.interface), 'link up'),
        )

        while not self.stop_event.is_set():
            for action, outcome in phases:
                # returns early once the stop event is set
                self.stop_event.wait(timeout=self.interval)
                try:
                    action()
                    log.info(outcome)
                except SSHException:
                    log.debug('Failed to run command')

    def begin(self, gname):
        """Spawn the toggling greenlet and register it in ctx.netem.names under ``gname``."""
        greenlet = gevent.spawn(self.link_toggle)
        self.thread = greenlet
        self.ctx.netem.names[gname] = greenlet

    def end(self, gname):
        """Set the stop event and wait for the greenlet registered under ``gname``."""
        self.stop_event.set()
        greenlet = self.ctx.netem.names[gname]
        log.info('gname is {}'.format(greenlet))
        greenlet.get()

    def cleanup(self):
        """
        Invoked during unwinding if the test fails or exits before executing task 'link_recover'
        """
        log.info('Clean up')
        self.stop_event.set()
        self.thread.get()
194
195
@contextlib.contextmanager
def task(ctx, config):
    """
    Emulate network delay or link flapping between remotes with tc/netem.

    Depending on the keys present in ``config`` the task, for each role in
    ``clients``, sets a static delay (``delay``), a two-value delay
    (``delay_range``), starts toggling the link (``link_toggle_interval``,
    registered under ``gname``) or stops previously started toggles
    (``link_recover``). On exit the qdisc of ``iface`` is deleted on every
    client.

    - netem:
        clients: [c1.rgw.0]
        iface: eno1
        dst_client: [c2.rgw.1]
        delay: 10ms

    - netem:
        clients: [c1.rgw.0]
        iface: eno1
        dst_client: [c2.rgw.1]
        delay_range: [10ms, 20ms] # (min, max)

    - netem:
        clients: [rgw.1, mon.0]
        iface: eno1
        gname: t1
        dst_client: [c2.rgw.1]
        link_toggle_interval: 10 # no unit mentioned. By default takes seconds.

    - netem:
        clients: [rgw.1, mon.0]
        iface: eno1
        link_recover: [t1, t2]

    When ``link_recover`` is given without ``link_toggle_interval``, the
    pause before deleting the qdisc uses the interval of the current toggle.
    """
    # ``toggle`` lives at module level so that a later invocation of this
    # task (link_recover, or the unwinding below) can reach the Toggle
    # created by an earlier one.
    global toggle

    log.info('config %s' % config)

    assert isinstance(config, dict), \
        "please list clients to run on"
    if not hasattr(ctx, 'netem'):
        ctx.netem = argparse.Namespace()
        ctx.netem.names = {}

    # ``host`` is only bound when dst_client is configured; the delay and
    # toggle branches below need it.
    if config.get('dst_client') is not None:
        dst = config.get('dst_client')
        (host,) = ctx.cluster.only(dst).remotes.keys()

    for role in config.get('clients', None):
        (remote,) = ctx.cluster.only(role).remotes.keys()
        ctx.netem.remote = remote
        if config.get('delay', False):
            static_delay(remote, host, config.get('iface'), config.get('delay'))
        if config.get('delay_range', False):
            variable_delay(remote, host, config.get('iface'), config.get('delay_range'))
        if config.get('link_toggle_interval', False):
            log.info('Toggling link for %s' % config.get('link_toggle_interval'))
            toggle = Toggle(ctx, remote, host, config.get('iface'), config.get('link_toggle_interval'))
            toggle.begin(config.get('gname'))
        if config.get('link_recover', False):
            # NOTE(review): this branch runs once per client but removes the
            # names on the first pass; with several clients the second pass
            # would presumably fail on the missing name -- confirm.
            log.info('Recovering link')
            for gname in config.get('link_recover'):
                toggle.end(gname)
                log.info('sleeping')
                # A link_recover config normally carries no
                # link_toggle_interval; time.sleep(None) raises TypeError,
                # so fall back to the interval of the toggle being stopped.
                pause = config.get('link_toggle_interval')
                if pause is None:
                    pause = toggle.interval
                time.sleep(pause)
                delete_dev(ctx.netem.remote, config.get('iface'))
                del ctx.netem.names[gname]

    try:
        yield
    finally:
        # toggles still registered were never recovered: stop them now
        if ctx.netem.names:
            toggle.cleanup()
        for role in config.get('clients'):
            (remote,) = ctx.cluster.only(role).remotes.keys()
            delete_dev(remote, config.get('iface'))
268