]> git.proxmox.com Git - mirror_frr.git/blob - tests/topotests/lib/lutil.py
Merge pull request #9368 from donaldsharp/ospf_ensure_lsa_length
[mirror_frr.git] / tests / topotests / lib / lutil.py
1 #!/usr/bin/env python
2
3 # Copyright 2017, LabN Consulting, L.L.C.
4 #
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; either version 2
8 # of the License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License along
16 # with this program; see the file COPYING; if not, write to the Free Software
17 # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18
19 import os
20 import re
21 import sys
22 import time
23 import json
24 import math
25 import time
26 from lib.topolog import logger
27 from lib.topotest import json_cmp
28
29
30 # L utility functions
31 #
32 # These functions are inteneted to provide support for CI testing within MiniNet
33 # environments.
34
35
36 class lUtil:
37 # to be made configurable in the future
38 base_script_dir = "."
39 base_log_dir = "."
40 fout_name = "output.log"
41 fsum_name = "summary.txt"
42 l_level = 6
43 CallOnFail = False
44
45 l_total = 0
46 l_pass = 0
47 l_fail = 0
48 l_filename = ""
49 l_last = None
50 l_line = 0
51 l_dotall_experiment = False
52 l_last_nl = None
53
54 fout = ""
55 fsum = ""
56 net = ""
57
58 def log(self, str, level=6):
59 if self.l_level > 0:
60 if self.fout == "":
61 self.fout = open(self.fout_name, "w")
62 self.fout.write(str + "\n")
63 if level <= self.l_level:
64 print(str)
65
66 def summary(self, str):
67 if self.fsum == "":
68 self.fsum = open(self.fsum_name, "w")
69 self.fsum.write(
70 "\
71 ******************************************************************************\n"
72 )
73 self.fsum.write(
74 "\
75 Test Target Summary Pass Fail\n"
76 )
77 self.fsum.write(
78 "\
79 ******************************************************************************\n"
80 )
81 self.fsum.write(str + "\n")
82
83 def result(self, target, success, str, logstr=None):
84 if success:
85 p = 1
86 f = 0
87 self.l_pass += 1
88 sstr = "PASS"
89 else:
90 f = 1
91 p = 0
92 self.l_fail += 1
93 sstr = "FAIL"
94 self.l_total += 1
95 if logstr != None:
96 self.log("R:%d %s: %s" % (self.l_total, sstr, logstr))
97 res = "%-4d %-6s %-56s %-4d %d" % (self.l_total, target, str, p, f)
98 self.log("R:" + res)
99 self.summary(res)
100 if f == 1 and self.CallOnFail != False:
101 self.CallOnFail()
102
103 def closeFiles(self):
104 ret = (
105 "\
106 ******************************************************************************\n\
107 Total %-4d %-4d %d\n\
108 ******************************************************************************"
109 % (self.l_total, self.l_pass, self.l_fail)
110 )
111 if self.fsum != "":
112 self.fsum.write(ret + "\n")
113 self.fsum.close()
114 self.fsum = ""
115 if self.fout != "":
116 if os.path.isfile(self.fsum_name):
117 r = open(self.fsum_name, "r")
118 self.fout.write(r.read())
119 r.close()
120 self.fout.close()
121 self.fout = ""
122 return ret
123
124 def setFilename(self, name):
125 str = "FILE: " + name
126 self.log(str)
127 self.summary(str)
128 self.l_filename = name
129 self.line = 0
130
131 def getCallOnFail(self):
132 return self.CallOnFail
133
134 def setCallOnFail(self, CallOnFail):
135 self.CallOnFail = CallOnFail
136
137 def strToArray(self, string):
138 a = []
139 c = 0
140 end = ""
141 words = string.split()
142 if len(words) < 1 or words[0].startswith("#"):
143 return a
144 words = string.split()
145 for word in words:
146 if len(end) == 0:
147 a.append(word)
148 else:
149 a[c] += str(" " + word)
150 if end == "\\":
151 end = ""
152 if not word.endswith("\\"):
153 if end != '"':
154 if word.startswith('"'):
155 end = '"'
156 else:
157 c += 1
158 else:
159 if word.endswith('"'):
160 end = ""
161 c += 1
162 else:
163 c += 1
164 else:
165 end = "\\"
166 # if len(end) == 0:
167 # print('%d:%s:' % (c, a[c-1]))
168
169 return a
170
171 def execTestFile(self, tstFile):
172 if os.path.isfile(tstFile):
173 f = open(tstFile)
174 for line in f:
175 if len(line) > 1:
176 a = self.strToArray(line)
177 if len(a) >= 6:
178 luCommand(a[1], a[2], a[3], a[4], a[5])
179 else:
180 self.l_line += 1
181 self.log("%s:%s %s" % (self.l_filename, self.l_line, line))
182 if len(a) >= 2:
183 if a[0] == "sleep":
184 time.sleep(int(a[1]))
185 elif a[0] == "include":
186 self.execTestFile(a[1])
187 f.close()
188 else:
189 self.log("unable to read: " + tstFile)
190 sys.exit(1)
191
192 def command(self, target, command, regexp, op, result, returnJson, startt=None):
193 global net
194 if op == "jsoncmp_pass" or op == "jsoncmp_fail":
195 returnJson = True
196
197 self.log(
198 "%s (#%d) %s:%s COMMAND:%s:%s:%s:%s:%s:"
199 % (
200 time.asctime(),
201 self.l_total + 1,
202 self.l_filename,
203 self.l_line,
204 target,
205 command,
206 regexp,
207 op,
208 result,
209 )
210 )
211 if self.net == "":
212 return False
213 # self.log("Running %s %s" % (target, command))
214 js = None
215 out = self.net[target].cmd(command).rstrip()
216 if len(out) == 0:
217 report = "<no output>"
218 else:
219 report = out
220 if returnJson == True:
221 try:
222 js = json.loads(out)
223 except:
224 js = None
225 self.log(
226 "WARNING: JSON load failed -- confirm command output is in JSON format."
227 )
228 self.log("COMMAND OUTPUT:%s:" % report)
229
230 # JSON comparison
231 if op == "jsoncmp_pass" or op == "jsoncmp_fail":
232 try:
233 expect = json.loads(regexp)
234 except:
235 expect = None
236 self.log(
237 "WARNING: JSON load failed -- confirm regex input is in JSON format."
238 )
239 json_diff = json_cmp(js, expect)
240 if json_diff != None:
241 if op == "jsoncmp_fail":
242 success = True
243 else:
244 success = False
245 self.log("JSON DIFF:%s:" % json_diff)
246 ret = success
247 else:
248 if op == "jsoncmp_fail":
249 success = False
250 else:
251 success = True
252 self.result(target, success, result)
253 if js != None:
254 return js
255 return ret
256
257 # Experiment: can we achieve the same match behavior via DOTALL
258 # without converting newlines to spaces?
259 out_nl = out
260 search_nl = re.search(regexp, out_nl, re.DOTALL)
261 self.l_last_nl = search_nl
262 # Set up for comparison
263 if search_nl != None:
264 group_nl = search_nl.group()
265 group_nl_converted = " ".join(group_nl.splitlines())
266 else:
267 group_nl_converted = None
268
269 out = " ".join(out.splitlines())
270 search = re.search(regexp, out)
271 self.l_last = search
272 if search == None:
273 if op == "fail":
274 success = True
275 else:
276 success = False
277 ret = success
278 else:
279 ret = search.group()
280 if op != "fail":
281 success = True
282 level = 7
283 else:
284 success = False
285 level = 5
286 self.log("found:%s:" % ret, level)
287 # Experiment: compare matched strings obtained each way
288 if self.l_dotall_experiment and (group_nl_converted != ret):
289 self.log(
290 "DOTALL experiment: strings differ dotall=[%s] orig=[%s]"
291 % (group_nl_converted, ret),
292 9,
293 )
294 if startt != None:
295 if js != None or ret is not False:
296 delta = time.time() - startt
297 self.result(target, success, "%s +%4.2f secs" % (result, delta))
298 elif op == "pass" or op == "fail":
299 self.result(target, success, result)
300 if js != None:
301 return js
302 return ret
303
304 def wait(
305 self, target, command, regexp, op, result, wait, returnJson, wait_time=0.5
306 ):
307 self.log(
308 "%s:%s WAIT:%s:%s:%s:%s:%s:%s:%s:"
309 % (
310 self.l_filename,
311 self.l_line,
312 target,
313 command,
314 regexp,
315 op,
316 result,
317 wait,
318 wait_time,
319 )
320 )
321 found = False
322 n = 0
323 startt = time.time()
324
325 # Calculate the amount of `sleep`s we are going to peform.
326 wait_count = int(math.ceil(wait / wait_time)) + 1
327
328 while wait_count > 0:
329 n += 1
330 found = self.command(target, command, regexp, op, result, returnJson, startt)
331 if found is not False:
332 break
333
334 wait_count -= 1
335 if wait_count > 0:
336 time.sleep(wait_time)
337
338 delta = time.time() - startt
339 self.log("Done after %d loops, time=%s, Found=%s" % (n, delta, found))
340 return found
341
342
343 # initialized by luStart
344 LUtil = None
345
346 # entry calls
347 def luStart(
348 baseScriptDir=".",
349 baseLogDir=".",
350 net="",
351 fout="output.log",
352 fsum="summary.txt",
353 level=None,
354 ):
355 global LUtil
356 # init class
357 LUtil = lUtil()
358 LUtil.base_script_dir = baseScriptDir
359 LUtil.base_log_dir = baseLogDir
360 LUtil.net = net
361 if fout != "":
362 LUtil.fout_name = baseLogDir + "/" + fout
363 if fsum != None:
364 LUtil.fsum_name = baseLogDir + "/" + fsum
365 if level != None:
366 LUtil.l_level = level
367 LUtil.l_dotall_experiment = False
368 LUtil.l_dotall_experiment = True
369
370
371 def luCommand(
372 target,
373 command,
374 regexp=".",
375 op="none",
376 result="",
377 time=10,
378 returnJson=False,
379 wait_time=0.5,
380 ):
381 if op != "wait":
382 return LUtil.command(target, command, regexp, op, result, returnJson)
383 else:
384 return LUtil.wait(
385 target, command, regexp, op, result, time, returnJson, wait_time
386 )
387
388
389 def luLast(usenl=False):
390 if usenl:
391 if LUtil.l_last_nl != None:
392 LUtil.log("luLast:%s:" % LUtil.l_last_nl.group(), 7)
393 return LUtil.l_last_nl
394 else:
395 if LUtil.l_last != None:
396 LUtil.log("luLast:%s:" % LUtil.l_last.group(), 7)
397 return LUtil.l_last
398
399
400 def luInclude(filename, CallOnFail=None):
401 tstFile = LUtil.base_script_dir + "/" + filename
402 LUtil.setFilename(filename)
403 if CallOnFail != None:
404 oldCallOnFail = LUtil.getCallOnFail()
405 LUtil.setCallOnFail(CallOnFail)
406 if filename.endswith(".py"):
407 LUtil.log("luInclude: execfile " + tstFile)
408 with open(tstFile) as infile:
409 exec(infile.read())
410 else:
411 LUtil.log("luInclude: execTestFile " + tstFile)
412 LUtil.execTestFile(tstFile)
413 if CallOnFail != None:
414 LUtil.setCallOnFail(oldCallOnFail)
415
416
417 def luFinish():
418 global LUtil
419 ret = LUtil.closeFiles()
420 # done
421 LUtil = None
422 return ret
423
424
425 def luNumFail():
426 return LUtil.l_fail
427
428
429 def luNumPass():
430 return LUtil.l_pass
431
432
433 def luResult(target, success, str, logstr=None):
434 return LUtil.result(target, success, str, logstr)
435
436
437 def luShowResults(prFunction):
438 printed = 0
439 sf = open(LUtil.fsum_name, "r")
440 for line in sf:
441 printed += 1
442 prFunction(line.rstrip())
443 sf.close()
444
445
446 def luShowFail():
447 printed = 0
448 sf = open(LUtil.fsum_name, "r")
449 for line in sf:
450 if line[-2] != "0":
451 printed += 1
452 logger.error(line.rstrip())
453 sf.close()
454 if printed > 0:
455 logger.error("See %s for details of errors" % LUtil.fout_name)
456
457
458 # for testing
459 if __name__ == "__main__":
460 print(os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + "/lib")
461 luStart()
462 for arg in sys.argv[1:]:
463 luInclude(arg)
464 luFinish()
465 sys.exit(0)