]>
git.proxmox.com Git - mirror_frr.git/blob - tests/topotests/lib/lutil.py
2 # SPDX-License-Identifier: GPL-2.0-or-later
4 # Copyright 2017, LabN Consulting, L.L.C.
13 from lib
.topolog
import logger
14 from lib
.topotest
import json_cmp
19 # These functions are intended to provide support for CI testing within MiniNet
24 # to be made configurable in the future
27 fout_name
= "output.log"
28 fsum_name
= "summary.txt"
38 l_dotall_experiment
= False
46 def log(self
, str, level
=6):
49 self
.fout
= open(self
.fout_name
, "w")
50 self
.fout
.write(str + "\n")
51 if level
<= self
.l_level
:
54 def summary(self
, str):
56 self
.fsum
= open(self
.fsum_name
, "w")
59 ******************************************************************************\n"
63 Test Target Summary Pass Fail\n"
67 ******************************************************************************\n"
69 self
.fsum
.write(str + "\n")
71 def result(self
, target
, success
, str, logstr
=None):
84 self
.log("R:%d %s: %s" % (self
.l_total
, sstr
, logstr
))
85 res
= "%-4d %-6s %-56s %-4d %d" % (self
.l_total
, target
, str, p
, f
)
88 if f
== 1 and self
.CallOnFail
!= False:
94 ******************************************************************************\n\
96 ******************************************************************************"
97 % (self
.l_total
, self
.l_pass
, self
.l_fail
)
100 self
.fsum
.write(ret
+ "\n")
104 if os
.path
.isfile(self
.fsum_name
):
105 r
= open(self
.fsum_name
, "r")
106 self
.fout
.write(r
.read())
112 def setFilename(self
, name
):
113 str = "FILE: " + name
116 self
.l_filename
= name
def getCallOnFail(self):
    """Return the value currently stored in ``self.CallOnFail``.

    The attribute is returned as-is, with no copying or conversion.
    Elsewhere in this file ``result()`` compares it against ``False``
    when a test fails, and ``luInclude()`` uses this getter to save the
    value before temporarily overriding it.
    """
    return self.CallOnFail
def setCallOnFail(self, CallOnFail):
    """Store ``CallOnFail`` in ``self.CallOnFail``.

    The value is assigned unchanged; no validation is performed here.
    Elsewhere in this file ``luInclude()`` calls this to install a
    per-include value and to restore the previous one afterwards.

    Returns None.
    """
    self.CallOnFail = CallOnFail
125 def strToArray(self
, string
):
129 words
= string
.split()
130 if len(words
) < 1 or words
[0].startswith("#"):
132 words
= string
.split()
137 a
[c
] += str(" " + word
)
140 if not word
.endswith("\\"):
142 if word
.startswith('"'):
147 if word
.endswith('"'):
155 # print('%d:%s:' % (c, a[c-1]))
159 def execTestFile(self
, tstFile
):
160 if os
.path
.isfile(tstFile
):
164 a
= self
.strToArray(line
)
166 luCommand(a
[1], a
[2], a
[3], a
[4], a
[5])
169 self
.log("%s:%s %s" % (self
.l_filename
, self
.l_line
, line
))
172 time
.sleep(int(a
[1]))
173 elif a
[0] == "include":
174 self
.execTestFile(a
[1])
177 self
.log("unable to read: " + tstFile
)
180 def command(self
, target
, command
, regexp
, op
, result
, returnJson
, startt
=None, force_result
=False):
182 if op
== "jsoncmp_pass" or op
== "jsoncmp_fail":
186 "%s (#%d) %s:%s COMMAND:%s:%s:%s:%s:%s:"
201 # self.log("Running %s %s" % (target, command))
203 out
= self
.net
[target
].cmd(command
).rstrip()
205 report
= "<no output>"
208 if returnJson
== True:
214 "WARNING: JSON load failed -- confirm command output is in JSON format."
216 self
.log("COMMAND OUTPUT:%s:" % report
)
219 if op
== "jsoncmp_pass" or op
== "jsoncmp_fail":
221 expect
= json
.loads(regexp
)
225 "WARNING: JSON load failed -- confirm regex input is in JSON format."
227 json_diff
= json_cmp(js
, expect
)
228 if json_diff
!= None:
229 if op
== "jsoncmp_fail":
233 self
.log("JSON DIFF:%s:" % json_diff
)
236 if op
== "jsoncmp_fail":
240 self
.result(target
, success
, result
)
245 # Experiment: can we achieve the same match behavior via DOTALL
246 # without converting newlines to spaces?
248 search_nl
= re
.search(regexp
, out_nl
, re
.DOTALL
)
249 self
.l_last_nl
= search_nl
250 # Set up for comparison
251 if search_nl
!= None:
252 group_nl
= search_nl
.group()
253 group_nl_converted
= " ".join(group_nl
.splitlines())
255 group_nl_converted
= None
257 out
= " ".join(out
.splitlines())
258 search
= re
.search(regexp
, out
)
274 self
.log("found:%s:" % ret
, level
)
275 # Experiment: compare matched strings obtained each way
276 if self
.l_dotall_experiment
and (group_nl_converted
!= ret
):
278 "DOTALL experiment: strings differ dotall=[%s] orig=[%s]"
279 % (group_nl_converted
, ret
),
283 if js
!= None or ret
is not False or force_result
is not False:
284 delta
= time
.time() - startt
285 self
.result(target
, success
, "%s +%4.2f secs" % (result
, delta
))
286 elif op
== "pass" or op
== "fail":
287 self
.result(target
, success
, result
)
293 self
, target
, command
, regexp
, op
, result
, wait
, returnJson
, wait_time
=0.5
296 "%s:%s WAIT:%s:%s:%s:%s:%s:%s:%s:"
313 if (op
== "wait-strict") or ((op
== "wait") and self
.l_wait_strict
):
318 # Calculate the amount of `sleep`s we are going to perform.
319 wait_count
= int(math
.ceil(wait
/ wait_time
)) + 1
322 while wait_count
> 0:
325 # log a failure on last iteration if we don't get desired regexp
326 if strict
and (wait_count
== 1):
329 found
= self
.command(target
, command
, regexp
, op
, result
, returnJson
, startt
, force_result
)
330 if found
is not False:
335 time
.sleep(wait_time
)
337 delta
= time
.time() - startt
338 self
.log("Done after %d loops, time=%s, Found=%s" % (n
, delta
, found
))
342 # initialized by luStart
357 LUtil
.base_script_dir
= baseScriptDir
358 LUtil
.base_log_dir
= baseLogDir
361 LUtil
.fout_name
= baseLogDir
+ "/" + fout
363 LUtil
.fsum_name
= baseLogDir
+ "/" + fsum
365 LUtil
.l_level
= level
366 LUtil
.l_dotall_experiment
= False
367 LUtil
.l_dotall_experiment
= True
380 waitops
= ["wait", "wait-strict", "wait-nostrict"]
384 target
, command
, regexp
, op
, result
, time
, returnJson
, wait_time
387 return LUtil
.command(target
, command
, regexp
, op
, result
, returnJson
)
390 def luLast(usenl
=False):
392 if LUtil
.l_last_nl
!= None:
393 LUtil
.log("luLast:%s:" % LUtil
.l_last_nl
.group(), 7)
394 return LUtil
.l_last_nl
396 if LUtil
.l_last
!= None:
397 LUtil
.log("luLast:%s:" % LUtil
.l_last
.group(), 7)
401 def luInclude(filename
, CallOnFail
=None):
402 tstFile
= LUtil
.base_script_dir
+ "/" + filename
403 LUtil
.setFilename(filename
)
404 if CallOnFail
!= None:
405 oldCallOnFail
= LUtil
.getCallOnFail()
406 LUtil
.setCallOnFail(CallOnFail
)
407 if filename
.endswith(".py"):
408 LUtil
.log("luInclude: execfile " + tstFile
)
409 with
open(tstFile
) as infile
:
412 LUtil
.log("luInclude: execTestFile " + tstFile
)
413 LUtil
.execTestFile(tstFile
)
414 if CallOnFail
!= None:
415 LUtil
.setCallOnFail(oldCallOnFail
)
420 ret
= LUtil
.closeFiles()
def luResult(target, success, str, logstr=None):
    """Forward a test result to the module-level ``LUtil`` object.

    All four arguments are passed positionally, unchanged, to
    ``LUtil.result(target, success, str, logstr)`` and whatever that
    call returns is returned to the caller.

    NOTE(review): the parameter name ``str`` shadows the builtin; it is
    kept because renaming it would break callers that pass it by keyword.
    """
    return LUtil.result(target, success, str, logstr)
438 def luShowResults(prFunction
):
440 sf
= open(LUtil
.fsum_name
, "r")
443 prFunction(line
.rstrip())
449 sf
= open(LUtil
.fsum_name
, "r")
453 logger
.error(line
.rstrip())
456 logger
.error("See %s for details of errors" % LUtil
.fout_name
)
459 # Sets default wait type for luCommand(op="wait") (may be overridden by
460 # specifying luCommand(op="wait-strict") or luCommand(op="wait-nostrict")).
462 # "nostrict" is the historical default behavior, which is to ignore
463 # failures to match the specified regexp in the specified time.
465 # "strict" means that failure to match the specified regexp in the
466 # specified time yields an explicit, logged failure result
468 def luSetWaitType(waittype
):
469 if waittype
== "strict":
470 LUtil
.l_wait_strict
= 1
472 if waittype
== "nostrict":
473 LUtil
.l_wait_strict
= 0
475 raise ValueError('waittype must be one of "strict" or "nostrict"')
479 if __name__
== "__main__":
480 print(os
.path
.dirname(os
.path
.dirname(os
.path
.abspath(__file__
))) + "/lib")
482 for arg
in sys
.argv
[1:]: