]> git.proxmox.com Git - mirror_frr.git/blob - tests/topotests/lib/lutil.py
*: auto-convert to SPDX License IDs
[mirror_frr.git] / tests / topotests / lib / lutil.py
1 #!/usr/bin/env python
2 # SPDX-License-Identifier: GPL-2.0-or-later
3
4 # Copyright 2017, LabN Consulting, L.L.C.
5
6 import os
7 import re
8 import sys
9 import time
10 import json
11 import math
12 import time
13 from lib.topolog import logger
14 from lib.topotest import json_cmp
15
16
17 # L utility functions
18 #
# These functions are intended to provide support for CI testing within MiniNet
# environments.
21
22
class lUtil:
    """Run scripted commands against topology nodes and tally pass/fail results.

    Every message is appended to the log file named by ``fout_name`` and every
    recorded result becomes one row in the summary file named by ``fsum_name``.
    Both files are opened lazily on first write and closed by ``closeFiles``.
    """

    # to be made configurable in the future
    base_script_dir = "."
    base_log_dir = "."
    fout_name = "output.log"
    fsum_name = "summary.txt"
    # messages with level <= l_level are echoed to stdout; 0 disables the log file
    l_level = 6
    # False, or a callable invoked after every failed result
    CallOnFail = False

    # running counters and bookkeeping for the script currently being executed
    l_total = 0
    l_pass = 0
    l_fail = 0
    l_filename = ""
    # match object of the last regexp search on newline-flattened output
    l_last = None
    l_line = 0
    l_dotall_experiment = False
    # match object of the last regexp search on raw output (re.DOTALL)
    l_last_nl = None
    l_wait_strict = 1

    # "" means "not opened yet" for the two file handles and "no network" for net
    fout = ""
    fsum = ""
    net = ""

    def log(self, str, level=6):
        """Append ``str`` to the log file and echo it when ``level`` permits.

        The log file is only written while ``l_level`` is positive; the message
        is printed when ``level <= l_level``.
        """
        if self.l_level > 0:
            if self.fout == "":
                self.fout = open(self.fout_name, "w")
            self.fout.write(str + "\n")
        if level <= self.l_level:
            print(str)

    def summary(self, str):
        """Append ``str`` to the summary file, writing the table header first
        if the file has not been opened yet."""
        if self.fsum == "":
            self.fsum = open(self.fsum_name, "w")
            self.fsum.write(
                "******************************************************************************\n"
            )
            self.fsum.write("Test Target Summary Pass Fail\n")
            self.fsum.write(
                "******************************************************************************\n"
            )
        self.fsum.write(str + "\n")

    def result(self, target, success, str, logstr=None):
        """Record one test result.

        Updates the pass/fail/total counters, logs the outcome (plus ``logstr``
        when given), appends a row to the summary file and, on failure, calls
        the ``CallOnFail`` hook if one is set.
        """
        if success:
            p = 1
            f = 0
            self.l_pass += 1
            sstr = "PASS"
        else:
            f = 1
            p = 0
            self.l_fail += 1
            sstr = "FAIL"
        self.l_total += 1
        if logstr is not None:
            self.log("R:%d %s: %s" % (self.l_total, sstr, logstr))
        res = "%-4d %-6s %-56s %-4d %d" % (self.l_total, target, str, p, f)
        self.log("R:" + res)
        self.summary(res)
        if f == 1 and self.CallOnFail != False:
            self.CallOnFail()

    def closeFiles(self):
        """Write the totals footer, close both files and return the footer text.

        If the log file is open, the content of the summary file (when it
        exists on disk) is appended to it before it is closed.
        """
        ret = (
            "******************************************************************************\n"
            "Total %-4d %-4d %d\n"
            "******************************************************************************"
            % (self.l_total, self.l_pass, self.l_fail)
        )
        if self.fsum != "":
            self.fsum.write(ret + "\n")
            self.fsum.close()
            self.fsum = ""
        if self.fout != "":
            if os.path.isfile(self.fsum_name):
                with open(self.fsum_name, "r") as r:
                    self.fout.write(r.read())
            self.fout.close()
            self.fout = ""
        return ret

    def setFilename(self, name):
        """Announce a new script file in both outputs and restart line counting."""
        str = "FILE: " + name
        self.log(str)
        self.summary(str)
        self.l_filename = name
        # l_line is the counter reported by execTestFile/command/wait; the
        # previous code reset an unrelated attribute so it was never cleared.
        self.l_line = 0

    def getCallOnFail(self):
        """Return the current failure hook (False when none is set)."""
        return self.CallOnFail

    def setCallOnFail(self, CallOnFail):
        """Install ``CallOnFail`` as the hook invoked after each failed result."""
        self.CallOnFail = CallOnFail

    def strToArray(self, string):
        """Split one script line into fields.

        Fields are whitespace separated.  A field starting with a double quote
        extends to the next word ending with a double quote (the quotes are
        kept), and a word ending with a backslash is joined to the following
        word.  Joined words are separated by a single space.  Blank lines and
        lines whose first word starts with ``#`` yield an empty list.
        """
        words = string.split()
        if not words or words[0].startswith("#"):
            return []

        fields = []
        # "" -> between fields, '"' -> inside a quoted field,
        # "\\" -> previous word ended with a backslash
        pending = ""
        for word in words:
            if pending:
                fields[-1] += " " + word
            else:
                fields.append(word)
            if pending == "\\":
                pending = ""
            if word.endswith("\\"):
                pending = "\\"
            elif pending == '"':
                # stay inside the quoted field until a word closes it
                if word.endswith('"'):
                    pending = ""
            elif word.startswith('"') and not (len(word) > 1 and word.endswith('"')):
                # an opening quote that is not closed within the same word
                pending = '"'
        return fields

    def execTestFile(self, tstFile):
        """Execute a plain-text script file line by line.

        Lines that split into six or more fields are passed to ``luCommand``
        (fields 1-5).  Shorter lines are logged and may be ``sleep <secs>`` or
        ``include <file>`` directives.  Exits the process with status 1 when
        ``tstFile`` is not a regular file.
        """
        if not os.path.isfile(tstFile):
            self.log("unable to read: " + tstFile)
            sys.exit(1)

        with open(tstFile) as f:
            for line in f:
                if len(line) <= 1:
                    continue
                a = self.strToArray(line)
                if len(a) >= 6:
                    luCommand(a[1], a[2], a[3], a[4], a[5])
                    continue
                self.l_line += 1
                self.log("%s:%s %s" % (self.l_filename, self.l_line, line))
                if len(a) >= 2:
                    if a[0] == "sleep":
                        time.sleep(int(a[1]))
                    elif a[0] == "include":
                        self.execTestFile(a[1])

    def command(self, target, command, regexp, op, result, returnJson, startt=None, force_result=False):
        """Run ``command`` on node ``target`` and evaluate its output.

        For ``op`` "jsoncmp_pass"/"jsoncmp_fail" the output is parsed as JSON
        and compared with ``regexp`` (itself parsed as JSON) through
        ``json_cmp``; a result is always recorded.  For every other ``op`` the
        output is searched with ``regexp`` after its lines have been joined
        with spaces; a result is recorded for "pass"/"fail", or, when
        ``startt`` is given (a ``wait`` call), once something was found or
        ``force_result`` is set.

        Returns False when no network is configured.  Otherwise returns the
        parsed JSON when available, else the matched text, else the boolean
        success value.
        """
        json_compare = op == "jsoncmp_pass" or op == "jsoncmp_fail"
        if json_compare:
            returnJson = True

        self.log(
            "%s (#%d) %s:%s COMMAND:%s:%s:%s:%s:%s:"
            % (
                time.asctime(),
                self.l_total + 1,
                self.l_filename,
                self.l_line,
                target,
                command,
                regexp,
                op,
                result,
            )
        )
        if self.net == "":
            return False
        # self.log("Running %s %s" % (target, command))
        js = None
        out = self.net[target].cmd(command).rstrip()
        if len(out) == 0:
            report = "<no output>"
        else:
            report = out
            if returnJson == True:
                try:
                    js = json.loads(out)
                except (ValueError, TypeError):
                    js = None
                    self.log(
                        "WARNING: JSON load failed -- confirm command output is in JSON format."
                    )
        self.log("COMMAND OUTPUT:%s:" % report)

        # JSON comparison
        if json_compare:
            try:
                expect = json.loads(regexp)
            except (ValueError, TypeError):
                expect = None
                self.log(
                    "WARNING: JSON load failed -- confirm regex input is in JSON format."
                )
            json_diff = json_cmp(js, expect)
            if json_diff is not None:
                success = op == "jsoncmp_fail"
                if not success:
                    self.log("JSON DIFF:%s:" % json_diff)
            else:
                success = op != "jsoncmp_fail"
            # Always bind the fallback return value; it used to be left unset
            # when there was no diff and no parsed JSON to return.
            ret = success
            self.result(target, success, result)
            if js is not None:
                return js
            return ret

        # Experiment: can we achieve the same match behavior via DOTALL
        # without converting newlines to spaces?
        search_nl = re.search(regexp, out, re.DOTALL)
        self.l_last_nl = search_nl
        # Set up for comparison
        if search_nl is not None:
            group_nl_converted = " ".join(search_nl.group().splitlines())
        else:
            group_nl_converted = None

        out = " ".join(out.splitlines())
        search = re.search(regexp, out)
        self.l_last = search
        if search is None:
            success = op == "fail"
            ret = success
        else:
            ret = search.group()
            if op != "fail":
                success = True
                level = 7
            else:
                success = False
                level = 5
            self.log("found:%s:" % ret, level)
            # Experiment: compare matched strings obtained each way
            if self.l_dotall_experiment and (group_nl_converted != ret):
                self.log(
                    "DOTALL experiment: strings differ dotall=[%s] orig=[%s]"
                    % (group_nl_converted, ret),
                    9,
                )
        if startt is not None:
            # called from wait(): only record once there is something to
            # report, or when the caller forces a result on its last attempt
            if js is not None or ret is not False or force_result is not False:
                delta = time.time() - startt
                self.result(target, success, "%s +%4.2f secs" % (result, delta))
        elif op == "pass" or op == "fail":
            self.result(target, success, result)
        if js is not None:
            return js
        return ret

    def wait(
        self, target, command, regexp, op, result, wait, returnJson, wait_time=0.5
    ):
        """Repeat ``command`` until it yields something or ``wait`` seconds pass.

        The command is retried every ``wait_time`` seconds.  In strict mode
        (``op == "wait-strict"``, or ``op == "wait"`` while ``l_wait_strict``
        is set) the final attempt is told to record a result even when nothing
        was found.  Returns the value of the last ``command`` call.
        """
        self.log(
            "%s:%s WAIT:%s:%s:%s:%s:%s:%s:%s:"
            % (
                self.l_filename,
                self.l_line,
                target,
                command,
                regexp,
                op,
                result,
                wait,
                wait_time,
            )
        )
        found = False
        n = 0
        startt = time.time()

        if (op == "wait-strict") or ((op == "wait") and self.l_wait_strict):
            strict = True
        else:
            strict = False

        # Calculate the amount of `sleep`s we are going to perform.
        wait_count = int(math.ceil(wait / wait_time)) + 1

        force_result = False
        while wait_count > 0:
            n += 1

            # log a failure on last iteration if we don't get desired regexp
            if strict and (wait_count == 1):
                force_result = True

            found = self.command(target, command, regexp, op, result, returnJson, startt, force_result)
            if found is not False:
                break

            wait_count -= 1
            if wait_count > 0:
                time.sleep(wait_time)

        delta = time.time() - startt
        self.log("Done after %d loops, time=%s, Found=%s" % (n, delta, found))
        return found
340
341
342 # initialized by luStart
343 LUtil = None
344
345 # entry calls
def luStart(
    baseScriptDir=".",
    baseLogDir=".",
    net="",
    fout="output.log",
    fsum="summary.txt",
    level=None,
):
    """Create the module-wide ``LUtil`` runner and configure it.

    ``fout`` and ``fsum`` are file names placed under ``baseLogDir``; an empty
    ``fout`` or a ``None`` ``fsum`` keeps the class default.  ``level``
    overrides the log level when it is not ``None``.
    """
    global LUtil
    # init class; the global is bound first so it is set even if a later
    # assignment raises
    LUtil = runner = lUtil()
    runner.base_script_dir = baseScriptDir
    runner.base_log_dir = baseLogDir
    runner.net = net
    if fout != "":
        runner.fout_name = baseLogDir + "/" + fout
    if fsum is not None:
        runner.fsum_name = baseLogDir + "/" + fsum
    if level is not None:
        runner.l_level = level
    # the DOTALL comparison experiment is unconditionally switched on
    runner.l_dotall_experiment = True
368
369
def luCommand(
    target,
    command,
    regexp=".",
    op="none",
    result="",
    time=10,
    returnJson=False,
    wait_time=0.5,
):
    """Run ``command`` on ``target`` through the active ``LUtil`` runner.

    The "wait", "wait-strict" and "wait-nostrict" operations are retried via
    ``LUtil.wait`` for up to ``time`` seconds, polling every ``wait_time``
    seconds; every other operation is executed once via ``LUtil.command``.
    Returns whatever the selected method returns.
    """
    if op not in ("wait", "wait-strict", "wait-nostrict"):
        return LUtil.command(target, command, regexp, op, result, returnJson)
    return LUtil.wait(
        target, command, regexp, op, result, time, returnJson, wait_time
    )
388
389
def luLast(usenl=False):
    """Return the match object saved by the most recent regexp search.

    With ``usenl`` true the match against the raw (newline-preserving) output
    is returned, otherwise the match against the newline-flattened output.
    The matched text is logged at level 7 when a match exists.
    """
    match = LUtil.l_last_nl if usenl else LUtil.l_last
    if match is not None:
        LUtil.log("luLast:%s:" % match.group(), 7)
    return match
399
400
def luInclude(filename, CallOnFail=None):
    """Execute the script ``filename`` found under ``LUtil.base_script_dir``.

    Files ending in ``.py`` are executed as Python source in this function's
    scope; any other file is interpreted by ``LUtil.execTestFile``.  When
    ``CallOnFail`` is given it replaces the failure hook for the duration of
    the include, and the previous hook is restored afterwards even if the
    included script raises.
    """
    tstFile = LUtil.base_script_dir + "/" + filename
    LUtil.setFilename(filename)
    if CallOnFail is not None:
        oldCallOnFail = LUtil.getCallOnFail()
        LUtil.setCallOnFail(CallOnFail)
    try:
        if filename.endswith(".py"):
            LUtil.log("luInclude: execfile " + tstFile)
            with open(tstFile) as infile:
                # NOTE(review): this runs the file's content as arbitrary
                # Python with access to this module's globals and this
                # function's locals -- only include trusted scripts.
                exec(infile.read())
        else:
            LUtil.log("luInclude: execTestFile " + tstFile)
            LUtil.execTestFile(tstFile)
    finally:
        # put the caller's hook back no matter how the script ended
        if CallOnFail is not None:
            LUtil.setCallOnFail(oldCallOnFail)
416
417
def luFinish():
    """Close the runner's files, drop the global runner and return the
    totals footer produced by ``closeFiles``."""
    global LUtil
    footer = LUtil.closeFiles()
    # done: a new luStart() is required before further use
    LUtil = None
    return footer
424
425
def luNumFail():
    """Return the number of failed results recorded by the active runner."""
    failures = LUtil.l_fail
    return failures
428
429
def luNumPass():
    """Return the number of passed results recorded by the active runner."""
    passes = LUtil.l_pass
    return passes
432
433
def luResult(target, success, str, logstr=None):
    """Record a result on the active runner; see ``lUtil.result``.

    Returns the value of ``LUtil.result`` unchanged.
    """
    outcome = LUtil.result(target, success, str, logstr)
    return outcome
436
437
def luShowResults(prFunction):
    """Pass every line of the summary file to ``prFunction``.

    Lines are handed over with trailing whitespace removed.  The file is
    closed even if ``prFunction`` raises.
    """
    with open(LUtil.fsum_name, "r") as sf:
        for line in sf:
            prFunction(line.rstrip())
445
446
def luShowFail():
    """Log, as errors, the summary lines that do not end with ``0``.

    A result row ends with its fail count, so rows of passing tests end with
    ``0`` and are skipped; all other non-blank lines are reported.  When
    anything was reported, a pointer to the full log file is logged as well.
    """
    printed = 0
    with open(LUtil.fsum_name, "r") as sf:
        for line in sf:
            # Strip only the newline so a final line without one is judged by
            # its real last character; blank lines carry nothing to report
            # (indexing them used to raise IndexError).
            text = line.rstrip("\n")
            if not text:
                continue
            # NOTE(review): a count ending in 0 (e.g. a total of 10 failures)
            # is still treated as "no failure" by this check.
            if text[-1] != "0":
                printed += 1
                logger.error(line.rstrip())
    if printed > 0:
        logger.error("See %s for details of errors" % LUtil.fout_name)
457
458 #
# Sets default wait type for luCommand(op="wait") (may be overridden by
# specifying luCommand(op="wait-strict") or luCommand(op="wait-nostrict")).
461 #
462 # "nostrict" is the historical default behavior, which is to ignore
463 # failures to match the specified regexp in the specified time.
464 #
465 # "strict" means that failure to match the specified regexp in the
466 # specified time yields an explicit, logged failure result
467 #
def luSetWaitType(waittype):
    """Select the default behaviour of ``luCommand(op="wait")``.

    ``waittype`` must be "strict" or "nostrict"; anything else raises
    ``ValueError`` without touching the runner.
    """
    if waittype != "strict" and waittype != "nostrict":
        raise ValueError('waittype must be one of "strict" or "nostrict"')
    LUtil.l_wait_strict = 1 if waittype == "strict" else 0
476
477
478 # for testing
# for testing: run every script named on the command line, then exit 0
if __name__ == "__main__":
    parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    print(parent_dir + "/lib")
    luStart()
    for arg in sys.argv[1:]:
        luInclude(arg)
    luFinish()
    sys.exit(0)