]>
git.proxmox.com Git - mirror_frr.git/blob - tests/topotests/lib/lutil.py
3 # Copyright 2017, LabN Consulting, L.L.C.
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; either version 2
8 # of the License, or (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License along
16 # with this program; see the file COPYING; if not, write to the Free Software
17 # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
26 from lib
.topolog
import logger
27 from lib
.topotest
import json_cmp
32 # These functions are inteneted to provide support for CI testing within MiniNet
37 # to be made configurable in the future
40 fout_name
= "output.log"
41 fsum_name
= "summary.txt"
51 l_dotall_experiment
= False
58 def log(self
, str, level
=6):
61 self
.fout
= open(self
.fout_name
, "w")
62 self
.fout
.write(str + "\n")
63 if level
<= self
.l_level
:
66 def summary(self
, str):
68 self
.fsum
= open(self
.fsum_name
, "w")
71 ******************************************************************************\n"
75 Test Target Summary Pass Fail\n"
79 ******************************************************************************\n"
81 self
.fsum
.write(str + "\n")
83 def result(self
, target
, success
, str, logstr
=None):
96 self
.log("R:%d %s: %s" % (self
.l_total
, sstr
, logstr
))
97 res
= "%-4d %-6s %-56s %-4d %d" % (self
.l_total
, target
, str, p
, f
)
100 if f
== 1 and self
.CallOnFail
!= False:
103 def closeFiles(self
):
106 ******************************************************************************\n\
107 Total %-4d %-4d %d\n\
108 ******************************************************************************"
109 % (self
.l_total
, self
.l_pass
, self
.l_fail
)
112 self
.fsum
.write(ret
+ "\n")
116 if os
.path
.isfile(self
.fsum_name
):
117 r
= open(self
.fsum_name
, "r")
118 self
.fout
.write(r
.read())
124 def setFilename(self
, name
):
125 str = "FILE: " + name
128 self
.l_filename
= name
131 def getCallOnFail(self
):
132 return self
.CallOnFail
134 def setCallOnFail(self
, CallOnFail
):
135 self
.CallOnFail
= CallOnFail
137 def strToArray(self
, string
):
141 words
= string
.split()
142 if len(words
) < 1 or words
[0].startswith("#"):
144 words
= string
.split()
149 a
[c
] += str(" " + word
)
152 if not word
.endswith("\\"):
154 if word
.startswith('"'):
159 if word
.endswith('"'):
167 # print('%d:%s:' % (c, a[c-1]))
171 def execTestFile(self
, tstFile
):
172 if os
.path
.isfile(tstFile
):
176 a
= self
.strToArray(line
)
178 luCommand(a
[1], a
[2], a
[3], a
[4], a
[5])
181 self
.log("%s:%s %s" % (self
.l_filename
, self
.l_line
, line
))
184 time
.sleep(int(a
[1]))
185 elif a
[0] == "include":
186 self
.execTestFile(a
[1])
189 self
.log("unable to read: " + tstFile
)
192 def command(self
, target
, command
, regexp
, op
, result
, returnJson
, startt
=None):
194 if op
== "jsoncmp_pass" or op
== "jsoncmp_fail":
198 "%s (#%d) %s:%s COMMAND:%s:%s:%s:%s:%s:"
213 # self.log("Running %s %s" % (target, command))
215 out
= self
.net
[target
].cmd(command
).rstrip()
217 report
= "<no output>"
220 if returnJson
== True:
226 "WARNING: JSON load failed -- confirm command output is in JSON format."
228 self
.log("COMMAND OUTPUT:%s:" % report
)
231 if op
== "jsoncmp_pass" or op
== "jsoncmp_fail":
233 expect
= json
.loads(regexp
)
237 "WARNING: JSON load failed -- confirm regex input is in JSON format."
239 json_diff
= json_cmp(js
, expect
)
240 if json_diff
!= None:
241 if op
== "jsoncmp_fail":
245 self
.log("JSON DIFF:%s:" % json_diff
)
248 if op
== "jsoncmp_fail":
252 self
.result(target
, success
, result
)
257 # Experiment: can we achieve the same match behavior via DOTALL
258 # without converting newlines to spaces?
260 search_nl
= re
.search(regexp
, out_nl
, re
.DOTALL
)
261 self
.l_last_nl
= search_nl
262 # Set up for comparison
263 if search_nl
!= None:
264 group_nl
= search_nl
.group()
265 group_nl_converted
= " ".join(group_nl
.splitlines())
267 group_nl_converted
= None
269 out
= " ".join(out
.splitlines())
270 search
= re
.search(regexp
, out
)
286 self
.log("found:%s:" % ret
, level
)
287 # Experiment: compare matched strings obtained each way
288 if self
.l_dotall_experiment
and (group_nl_converted
!= ret
):
290 "DOTALL experiment: strings differ dotall=[%s] orig=[%s]"
291 % (group_nl_converted
, ret
),
295 if js
!= None or ret
is not False:
296 delta
= time
.time() - startt
297 self
.result(target
, success
, "%s +%4.2f secs" % (result
, delta
))
298 elif op
== "pass" or op
== "fail":
299 self
.result(target
, success
, result
)
305 self
, target
, command
, regexp
, op
, result
, wait
, returnJson
, wait_time
=0.5
308 "%s:%s WAIT:%s:%s:%s:%s:%s:%s:%s:"
325 # Calculate the amount of `sleep`s we are going to peform.
326 wait_count
= int(math
.ceil(wait
/ wait_time
)) + 1
328 while wait_count
> 0:
330 found
= self
.command(target
, command
, regexp
, op
, result
, returnJson
, startt
)
331 if found
is not False:
336 time
.sleep(wait_time
)
338 delta
= time
.time() - startt
339 self
.log("Done after %d loops, time=%s, Found=%s" % (n
, delta
, found
))
343 # initialized by luStart
358 LUtil
.base_script_dir
= baseScriptDir
359 LUtil
.base_log_dir
= baseLogDir
362 LUtil
.fout_name
= baseLogDir
+ "/" + fout
364 LUtil
.fsum_name
= baseLogDir
+ "/" + fsum
366 LUtil
.l_level
= level
367 LUtil
.l_dotall_experiment
= False
368 LUtil
.l_dotall_experiment
= True
382 return LUtil
.command(target
, command
, regexp
, op
, result
, returnJson
)
385 target
, command
, regexp
, op
, result
, time
, returnJson
, wait_time
389 def luLast(usenl
=False):
391 if LUtil
.l_last_nl
!= None:
392 LUtil
.log("luLast:%s:" % LUtil
.l_last_nl
.group(), 7)
393 return LUtil
.l_last_nl
395 if LUtil
.l_last
!= None:
396 LUtil
.log("luLast:%s:" % LUtil
.l_last
.group(), 7)
400 def luInclude(filename
, CallOnFail
=None):
401 tstFile
= LUtil
.base_script_dir
+ "/" + filename
402 LUtil
.setFilename(filename
)
403 if CallOnFail
!= None:
404 oldCallOnFail
= LUtil
.getCallOnFail()
405 LUtil
.setCallOnFail(CallOnFail
)
406 if filename
.endswith(".py"):
407 LUtil
.log("luInclude: execfile " + tstFile
)
408 with
open(tstFile
) as infile
:
411 LUtil
.log("luInclude: execTestFile " + tstFile
)
412 LUtil
.execTestFile(tstFile
)
413 if CallOnFail
!= None:
414 LUtil
.setCallOnFail(oldCallOnFail
)
419 ret
= LUtil
.closeFiles()
433 def luResult(target
, success
, str, logstr
=None):
434 return LUtil
.result(target
, success
, str, logstr
)
437 def luShowResults(prFunction
):
439 sf
= open(LUtil
.fsum_name
, "r")
442 prFunction(line
.rstrip())
448 sf
= open(LUtil
.fsum_name
, "r")
452 logger
.error(line
.rstrip())
455 logger
.error("See %s for details of errors" % LUtil
.fout_name
)
459 if __name__
== "__main__":
460 print(os
.path
.dirname(os
.path
.dirname(os
.path
.abspath(__file__
))) + "/lib")
462 for arg
in sys
.argv
[1:]: