]> git.proxmox.com Git - mirror_frr.git/blob - tests/topotests/lib/lutil.py
Merge pull request #13649 from donaldsharp/unlock_the_node_or_else
[mirror_frr.git] / tests / topotests / lib / lutil.py
1 #!/usr/bin/env python
2 # SPDX-License-Identifier: GPL-2.0-or-later
3
4 # Copyright 2017, LabN Consulting, L.L.C.
5
6 import os
7 import re
8 import sys
9 import time
10 import json
11 import math
12 import time
13 from lib.topolog import logger
14 from lib.topotest import json_cmp
15
16
17 # L utility functions
18 #
19 # These functions are inteneted to provide support for CI testing within MiniNet
20 # environments.
21
22
23 class lUtil:
24 # to be made configurable in the future
25 base_script_dir = "."
26 base_log_dir = "."
27 fout_name = "output.log"
28 fsum_name = "summary.txt"
29 l_level = 6
30 CallOnFail = False
31
32 l_total = 0
33 l_pass = 0
34 l_fail = 0
35 l_filename = ""
36 l_last = None
37 l_line = 0
38 l_dotall_experiment = False
39 l_last_nl = None
40 l_wait_strict = 1
41
42 fout = ""
43 fsum = ""
44 net = ""
45
46 def log(self, str, level=6):
47 if self.l_level > 0:
48 if self.fout == "":
49 self.fout = open(self.fout_name, "w")
50 self.fout.write(str + "\n")
51 if level <= self.l_level:
52 print(str)
53
54 def summary(self, str):
55 if self.fsum == "":
56 self.fsum = open(self.fsum_name, "w")
57 self.fsum.write(
58 "\
59 ******************************************************************************\n"
60 )
61 self.fsum.write(
62 "\
63 Test Target Summary Pass Fail\n"
64 )
65 self.fsum.write(
66 "\
67 ******************************************************************************\n"
68 )
69 self.fsum.write(str + "\n")
70
71 def result(self, target, success, str, logstr=None):
72 if success:
73 p = 1
74 f = 0
75 self.l_pass += 1
76 sstr = "PASS"
77 else:
78 f = 1
79 p = 0
80 self.l_fail += 1
81 sstr = "FAIL"
82 self.l_total += 1
83 if logstr != None:
84 self.log("R:%d %s: %s" % (self.l_total, sstr, logstr))
85 res = "%-4d %-6s %-56s %-4d %d" % (self.l_total, target, str, p, f)
86 self.log("R:" + res)
87 self.summary(res)
88 if f == 1 and self.CallOnFail != False:
89 self.CallOnFail()
90
91 def closeFiles(self):
92 ret = (
93 "\
94 ******************************************************************************\n\
95 Total %-4d %-4d %d\n\
96 ******************************************************************************"
97 % (self.l_total, self.l_pass, self.l_fail)
98 )
99 if self.fsum != "":
100 self.fsum.write(ret + "\n")
101 self.fsum.close()
102 self.fsum = ""
103 if self.fout != "":
104 if os.path.isfile(self.fsum_name):
105 r = open(self.fsum_name, "r")
106 self.fout.write(r.read())
107 r.close()
108 self.fout.close()
109 self.fout = ""
110 return ret
111
112 def setFilename(self, name):
113 str = "FILE: " + name
114 self.log(str)
115 self.summary(str)
116 self.l_filename = name
117 self.line = 0
118
119 def getCallOnFail(self):
120 return self.CallOnFail
121
122 def setCallOnFail(self, CallOnFail):
123 self.CallOnFail = CallOnFail
124
125 def strToArray(self, string):
126 a = []
127 c = 0
128 end = ""
129 words = string.split()
130 if len(words) < 1 or words[0].startswith("#"):
131 return a
132 words = string.split()
133 for word in words:
134 if len(end) == 0:
135 a.append(word)
136 else:
137 a[c] += str(" " + word)
138 if end == "\\":
139 end = ""
140 if not word.endswith("\\"):
141 if end != '"':
142 if word.startswith('"'):
143 end = '"'
144 else:
145 c += 1
146 else:
147 if word.endswith('"'):
148 end = ""
149 c += 1
150 else:
151 c += 1
152 else:
153 end = "\\"
154 # if len(end) == 0:
155 # print('%d:%s:' % (c, a[c-1]))
156
157 return a
158
159 def execTestFile(self, tstFile):
160 if os.path.isfile(tstFile):
161 f = open(tstFile)
162 for line in f:
163 if len(line) > 1:
164 a = self.strToArray(line)
165 if len(a) >= 6:
166 luCommand(a[1], a[2], a[3], a[4], a[5])
167 else:
168 self.l_line += 1
169 self.log("%s:%s %s" % (self.l_filename, self.l_line, line))
170 if len(a) >= 2:
171 if a[0] == "sleep":
172 time.sleep(int(a[1]))
173 elif a[0] == "include":
174 self.execTestFile(a[1])
175 f.close()
176 else:
177 self.log("unable to read: " + tstFile)
178 sys.exit(1)
179
180 def command(
181 self,
182 target,
183 command,
184 regexp,
185 op,
186 result,
187 returnJson,
188 startt=None,
189 force_result=False,
190 ):
191 global net
192 if op == "jsoncmp_pass" or op == "jsoncmp_fail":
193 returnJson = True
194
195 self.log(
196 "%s (#%d) %s:%s COMMAND:%s:%s:%s:%s:%s:"
197 % (
198 time.asctime(),
199 self.l_total + 1,
200 self.l_filename,
201 self.l_line,
202 target,
203 command,
204 regexp,
205 op,
206 result,
207 )
208 )
209 if self.net == "":
210 return False
211 # self.log("Running %s %s" % (target, command))
212 js = None
213 out = self.net[target].cmd(command).rstrip()
214 if len(out) == 0:
215 report = "<no output>"
216 else:
217 report = out
218 if returnJson == True:
219 try:
220 js = json.loads(out)
221 except:
222 js = None
223 self.log(
224 "WARNING: JSON load failed -- confirm command output is in JSON format."
225 )
226 self.log("COMMAND OUTPUT:%s:" % report)
227
228 # JSON comparison
229 if op == "jsoncmp_pass" or op == "jsoncmp_fail":
230 try:
231 expect = json.loads(regexp)
232 except:
233 expect = None
234 self.log(
235 "WARNING: JSON load failed -- confirm regex input is in JSON format."
236 )
237 json_diff = json_cmp(js, expect)
238 if json_diff != None:
239 if op == "jsoncmp_fail":
240 success = True
241 else:
242 success = False
243 self.log("JSON DIFF:%s:" % json_diff)
244 ret = success
245 else:
246 if op == "jsoncmp_fail":
247 success = False
248 else:
249 success = True
250 self.result(target, success, result)
251 if js != None:
252 return js
253 return ret
254
255 # Experiment: can we achieve the same match behavior via DOTALL
256 # without converting newlines to spaces?
257 out_nl = out
258 search_nl = re.search(regexp, out_nl, re.DOTALL)
259 self.l_last_nl = search_nl
260 # Set up for comparison
261 if search_nl != None:
262 group_nl = search_nl.group()
263 group_nl_converted = " ".join(group_nl.splitlines())
264 else:
265 group_nl_converted = None
266
267 out = " ".join(out.splitlines())
268 search = re.search(regexp, out)
269 self.l_last = search
270 if search == None:
271 if op == "fail":
272 success = True
273 else:
274 success = False
275 ret = success
276 else:
277 ret = search.group()
278 if op != "fail":
279 success = True
280 level = 7
281 else:
282 success = False
283 level = 5
284 self.log("found:%s:" % ret, level)
285 # Experiment: compare matched strings obtained each way
286 if self.l_dotall_experiment and (group_nl_converted != ret):
287 self.log(
288 "DOTALL experiment: strings differ dotall=[%s] orig=[%s]"
289 % (group_nl_converted, ret),
290 9,
291 )
292 if startt != None:
293 if js != None or ret is not False or force_result is not False:
294 delta = time.time() - startt
295 self.result(target, success, "%s +%4.2f secs" % (result, delta))
296 elif op == "pass" or op == "fail":
297 self.result(target, success, result)
298 if js != None:
299 return js
300 return ret
301
302 def wait(
303 self, target, command, regexp, op, result, wait, returnJson, wait_time=0.5
304 ):
305 self.log(
306 "%s:%s WAIT:%s:%s:%s:%s:%s:%s:%s:"
307 % (
308 self.l_filename,
309 self.l_line,
310 target,
311 command,
312 regexp,
313 op,
314 result,
315 wait,
316 wait_time,
317 )
318 )
319 found = False
320 n = 0
321 startt = time.time()
322
323 if (op == "wait-strict") or ((op == "wait") and self.l_wait_strict):
324 strict = True
325 else:
326 strict = False
327
328 # Calculate the amount of `sleep`s we are going to peform.
329 wait_count = int(math.ceil(wait / wait_time)) + 1
330
331 force_result = False
332 while wait_count > 0:
333 n += 1
334
335 # log a failure on last iteration if we don't get desired regexp
336 if strict and (wait_count == 1):
337 force_result = True
338
339 found = self.command(
340 target, command, regexp, op, result, returnJson, startt, force_result
341 )
342 if found is not False:
343 break
344
345 wait_count -= 1
346 if wait_count > 0:
347 time.sleep(wait_time)
348
349 delta = time.time() - startt
350 self.log("Done after %d loops, time=%s, Found=%s" % (n, delta, found))
351 return found
352
353
354 # initialized by luStart
355 LUtil = None
356
357
358 # entry calls
359 def luStart(
360 baseScriptDir=".",
361 baseLogDir=".",
362 net="",
363 fout="output.log",
364 fsum="summary.txt",
365 level=None,
366 ):
367 global LUtil
368 # init class
369 LUtil = lUtil()
370 LUtil.base_script_dir = baseScriptDir
371 LUtil.base_log_dir = baseLogDir
372 LUtil.net = net
373 if fout != "":
374 LUtil.fout_name = baseLogDir + "/" + fout
375 if fsum != None:
376 LUtil.fsum_name = baseLogDir + "/" + fsum
377 if level != None:
378 LUtil.l_level = level
379 LUtil.l_dotall_experiment = False
380 LUtil.l_dotall_experiment = True
381
382
383 def luCommand(
384 target,
385 command,
386 regexp=".",
387 op="none",
388 result="",
389 time=10,
390 returnJson=False,
391 wait_time=0.5,
392 ):
393 waitops = ["wait", "wait-strict", "wait-nostrict"]
394
395 if op in waitops:
396 return LUtil.wait(
397 target, command, regexp, op, result, time, returnJson, wait_time
398 )
399 else:
400 return LUtil.command(target, command, regexp, op, result, returnJson)
401
402
403 def luLast(usenl=False):
404 if usenl:
405 if LUtil.l_last_nl != None:
406 LUtil.log("luLast:%s:" % LUtil.l_last_nl.group(), 7)
407 return LUtil.l_last_nl
408 else:
409 if LUtil.l_last != None:
410 LUtil.log("luLast:%s:" % LUtil.l_last.group(), 7)
411 return LUtil.l_last
412
413
414 def luInclude(filename, CallOnFail=None):
415 tstFile = LUtil.base_script_dir + "/" + filename
416 LUtil.setFilename(filename)
417 if CallOnFail != None:
418 oldCallOnFail = LUtil.getCallOnFail()
419 LUtil.setCallOnFail(CallOnFail)
420 if filename.endswith(".py"):
421 LUtil.log("luInclude: execfile " + tstFile)
422 with open(tstFile) as infile:
423 exec(infile.read())
424 else:
425 LUtil.log("luInclude: execTestFile " + tstFile)
426 LUtil.execTestFile(tstFile)
427 if CallOnFail != None:
428 LUtil.setCallOnFail(oldCallOnFail)
429
430
431 def luFinish():
432 global LUtil
433 ret = LUtil.closeFiles()
434 # done
435 LUtil = None
436 return ret
437
438
439 def luNumFail():
440 return LUtil.l_fail
441
442
443 def luNumPass():
444 return LUtil.l_pass
445
446
447 def luResult(target, success, str, logstr=None):
448 return LUtil.result(target, success, str, logstr)
449
450
451 def luShowResults(prFunction):
452 printed = 0
453 sf = open(LUtil.fsum_name, "r")
454 for line in sf:
455 printed += 1
456 prFunction(line.rstrip())
457 sf.close()
458
459
460 def luShowFail():
461 printed = 0
462 sf = open(LUtil.fsum_name, "r")
463 for line in sf:
464 if line[-2] != "0":
465 printed += 1
466 logger.error(line.rstrip())
467 sf.close()
468 if printed > 0:
469 logger.error("See %s for details of errors" % LUtil.fout_name)
470
471
472 #
473 # Sets default wait type for luCommand(op="wait) (may be overridden by
474 # specifying luCommand(op="wait-strict") or luCommand(op="wait-nostrict")).
475 #
476 # "nostrict" is the historical default behavior, which is to ignore
477 # failures to match the specified regexp in the specified time.
478 #
479 # "strict" means that failure to match the specified regexp in the
480 # specified time yields an explicit, logged failure result
481 #
482 def luSetWaitType(waittype):
483 if waittype == "strict":
484 LUtil.l_wait_strict = 1
485 else:
486 if waittype == "nostrict":
487 LUtil.l_wait_strict = 0
488 else:
489 raise ValueError('waittype must be one of "strict" or "nostrict"')
490
491
492 # for testing
493 if __name__ == "__main__":
494 print(os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + "/lib")
495 luStart()
496 for arg in sys.argv[1:]:
497 luInclude(arg)
498 luFinish()
499 sys.exit(0)