]> git.proxmox.com Git - mirror_frr.git/blob - tests/topotests/lib/lutil.py
Merge pull request #9552 from mobash-rasool/ospfv2-bug-fixes-03
[mirror_frr.git] / tests / topotests / lib / lutil.py
1 #!/usr/bin/env python
2
3 # Copyright 2017, LabN Consulting, L.L.C.
4 #
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; either version 2
8 # of the License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License along
16 # with this program; see the file COPYING; if not, write to the Free Software
17 # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18
19 import os
20 import re
21 import sys
22 import time
23 import json
24 import math
25 import time
26 from lib.topolog import logger
27 from lib.topotest import json_cmp
28
29
30 # L utility functions
31 #
32 # These functions are intended to provide support for CI testing within MiniNet
33 # environments.
34
35
class lUtil:
    """Run commands on network nodes, check their output and tally results.

    Commands are executed through ``self.net[target].cmd(command)``.  Output
    is matched against a regular expression or compared as JSON, and every
    pass/fail verdict is written to a detailed log file (``fout_name``) and a
    summary file (``fsum_name``).

    The ``fout``/``fsum``/``net`` attributes use the empty string as a
    "not set / not opened yet" sentinel.
    """

    # to be made configurable in the future
    base_script_dir = "."
    base_log_dir = "."
    fout_name = "output.log"
    fsum_name = "summary.txt"
    l_level = 6
    CallOnFail = False

    # running counters and state of the most recent command
    l_total = 0
    l_pass = 0
    l_fail = 0
    l_filename = ""
    l_last = None
    l_line = 0
    l_dotall_experiment = False
    l_last_nl = None

    fout = ""
    fsum = ""
    net = ""

    def log(self, str, level=6):
        """Write ``str`` to the log file and print it if ``level`` is low enough.

        The log file is opened lazily on first use.  Nothing is written to
        the file when ``l_level`` is 0 or less; the message is printed only
        when ``level <= l_level``.
        """
        if self.l_level > 0:
            if self.fout == "":
                self.fout = open(self.fout_name, "w")
            self.fout.write(str + "\n")
        if level <= self.l_level:
            print(str)

    def summary(self, str):
        """Append ``str`` to the summary file, writing the header on first use."""
        if self.fsum == "":
            self.fsum = open(self.fsum_name, "w")
            # NOTE(review): the column padding inside the header text may have
            # been collapsed in the copy of the file this was reviewed from --
            # confirm it lines up with the row format used by result().
            self.fsum.write(
                "******************************************************************************\n"
            )
            self.fsum.write("Test Target Summary Pass Fail\n")
            self.fsum.write(
                "******************************************************************************\n"
            )
        self.fsum.write(str + "\n")

    def result(self, target, success, str, logstr=None):
        """Record one test verdict.

        Updates the pass/fail/total counters, logs the optional ``logstr``,
        writes the result row to the log and the summary, and finally invokes
        the ``CallOnFail`` hook if the test failed and a hook is set.
        """
        if success:
            p, f = 1, 0
            self.l_pass += 1
            sstr = "PASS"
        else:
            p, f = 0, 1
            self.l_fail += 1
            sstr = "FAIL"
        self.l_total += 1
        if logstr is not None:
            self.log("R:%d %s: %s" % (self.l_total, sstr, logstr))
        res = "%-4d %-6s %-56s %-4d %d" % (self.l_total, target, str, p, f)
        self.log("R:" + res)
        self.summary(res)
        # Truthiness (rather than "!= False") so that a handler cleared with
        # None is skipped instead of raising TypeError.
        if f == 1 and self.CallOnFail:
            self.CallOnFail()

    def closeFiles(self):
        """Write the totals, close both output files and return the totals text.

        If the log file is open, the contents of the summary file (when it
        exists on disk) are appended to it before it is closed.
        """
        # NOTE(review): as with the header in summary(), confirm the spacing
        # of the "Total" line against the original file.
        ret = (
            "******************************************************************************\n"
            "Total %-4d %-4d %d\n"
            "******************************************************************************"
        ) % (self.l_total, self.l_pass, self.l_fail)
        if self.fsum != "":
            self.fsum.write(ret + "\n")
            self.fsum.close()
            self.fsum = ""
        if self.fout != "":
            try:
                if os.path.isfile(self.fsum_name):
                    with open(self.fsum_name, "r") as r:
                        self.fout.write(r.read())
            finally:
                # always release the log file, even if the copy above fails
                self.fout.close()
                self.fout = ""
        return ret

    def setFilename(self, name):
        """Announce a new script file and restart its line counter."""
        str = "FILE: " + name
        self.log(str)
        self.summary(str)
        self.l_filename = name
        # Was "self.line = 0", which created an unused attribute and left the
        # real counter (l_line) running across files.
        self.l_line = 0

    def getCallOnFail(self):
        """Return the current failure hook."""
        return self.CallOnFail

    def setCallOnFail(self, CallOnFail):
        """Replace the failure hook invoked by result() on a failed test."""
        self.CallOnFail = CallOnFail

    def strToArray(self, string):
        """Split a script line into tokens.

        Tokens are whitespace separated.  Words between a word starting with
        a double quote and a word ending with one are kept together as a
        single token (quotes retained, words re-joined with one space).  A
        word ending in a backslash is joined with the following word.  Blank
        lines and lines whose first word starts with ``#`` yield ``[]``.
        """
        a = []
        words = string.split()
        if len(words) < 1 or words[0].startswith("#"):
            return a
        # "" = between tokens, '"' = inside a quoted token,
        # "\\" = previous word ended with a backslash
        end = ""
        for word in words:
            if len(end) == 0:
                a.append(word)
            else:
                a[-1] += " " + word
            if end == "\\":
                # a backslash only glues a single following word
                end = ""
            if word.endswith("\\"):
                end = "\\"
            elif end == '"':
                # Stay inside the quote until the closing word arrives.  The
                # old code advanced its token index here, which raised
                # IndexError on quoted strings of three or more words.
                if word.endswith('"'):
                    end = ""
            elif word.startswith('"'):
                # A one-word token such as "foo" opens and closes at once.
                if not (len(word) > 1 and word.endswith('"')):
                    end = '"'
        return a

    def execTestFile(self, tstFile):
        """Execute a plain-text test script line by line.

        Lines with six or more tokens are passed to ``luCommand`` (tokens
        1..5 become target, command, regexp, op and result).  Shorter lines
        are logged and may be ``sleep <seconds>`` or ``include <file>``
        directives.  Exits the process with status 1 if the file is missing.
        """
        if not os.path.isfile(tstFile):
            self.log("unable to read: " + tstFile)
            sys.exit(1)
        # "with" guarantees the script is closed even if a command raises
        with open(tstFile) as f:
            for line in f:
                if len(line) <= 1:
                    continue
                a = self.strToArray(line)
                if len(a) >= 6:
                    luCommand(a[1], a[2], a[3], a[4], a[5])
                    continue
                self.l_line += 1
                self.log("%s:%s %s" % (self.l_filename, self.l_line, line))
                if len(a) >= 2:
                    if a[0] == "sleep":
                        time.sleep(int(a[1]))
                    elif a[0] == "include":
                        self.execTestFile(a[1])

    def command(self, target, command, regexp, op, result, returnJson):
        """Run ``command`` on ``target`` and evaluate its output.

        ``op`` selects the check:

        * ``pass`` / ``fail``: regex search on the output with newlines
          replaced by spaces; a verdict is recorded via result().
        * ``jsoncmp_pass`` / ``jsoncmp_fail``: the output and ``regexp`` are
          both parsed as JSON and compared with ``json_cmp``; a verdict is
          recorded.
        * anything else (including ``wait``): regex search only, nothing is
          recorded.

        Returns the parsed JSON output when JSON was requested and parsed
        successfully; otherwise the matched text, or a boolean when there was
        no match (or for JSON comparisons).  Returns ``False`` immediately if
        no network has been set.
        """
        if op != "wait":
            self.l_line += 1

        json_compare = op == "jsoncmp_pass" or op == "jsoncmp_fail"
        if json_compare:
            returnJson = True

        self.log(
            "%s (#%d) %s:%s COMMAND:%s:%s:%s:%s:%s:"
            % (
                time.asctime(),
                self.l_total + 1,
                self.l_filename,
                self.l_line,
                target,
                command,
                regexp,
                op,
                result,
            )
        )
        if self.net == "":
            return False
        # self.log("Running %s %s" % (target, command))
        js = None
        out = self.net[target].cmd(command).rstrip()
        if len(out) == 0:
            report = "<no output>"
        else:
            report = out
            # explicit comparison kept on purpose: only True (or 1) enables
            # JSON parsing, exactly as before
            if returnJson == True:  # noqa: E712
                try:
                    js = json.loads(out)
                except (ValueError, TypeError):
                    js = None
                    self.log(
                        "WARNING: JSON load failed -- confirm command output is in JSON format."
                    )
        self.log("COMMAND OUTPUT:%s:" % report)

        # JSON comparison
        if json_compare:
            try:
                expect = json.loads(regexp)
            except (ValueError, TypeError):
                expect = None
                self.log(
                    "WARNING: JSON load failed -- confirm regex input is in JSON format."
                )
            json_diff = json_cmp(js, expect)
            differs = json_diff is not None
            if op == "jsoncmp_fail":
                success = differs
            else:
                success = not differs
                if differs:
                    self.log("JSON DIFF:%s:" % json_diff)
            self.result(target, success, result)
            if js is not None:
                return js
            # Previously the fallback value was only bound when a diff
            # existed, so "no diff and no parsed JSON" raised
            # UnboundLocalError here.
            return success

        # Experiment: can we achieve the same match behavior via DOTALL
        # without converting newlines to spaces?
        search_nl = re.search(regexp, out, re.DOTALL)
        self.l_last_nl = search_nl
        # Set up for comparison
        if search_nl is not None:
            group_nl_converted = " ".join(search_nl.group().splitlines())
        else:
            group_nl_converted = None

        out = " ".join(out.splitlines())
        search = re.search(regexp, out)
        self.l_last = search
        if search is None:
            # no match is a success only when a failure to match was expected
            success = op == "fail"
            ret = success
        else:
            ret = search.group()
            if op != "fail":
                success = True
                level = 7
            else:
                success = False
                level = 5
            self.log("found:%s:" % ret, level)
            # Experiment: compare matched strings obtained each way
            if self.l_dotall_experiment and (group_nl_converted != ret):
                self.log(
                    "DOTALL experiment: strings differ dotall=[%s] orig=[%s]"
                    % (group_nl_converted, ret),
                    9,
                )
        if op == "pass" or op == "fail":
            self.result(target, success, result)
        if js is not None:
            return js
        return ret

    def wait(
        self, target, command, regexp, op, result, wait, returnJson, wait_time=0.5
    ):
        """Poll ``command`` until ``regexp`` matches or ``wait`` seconds elapse.

        The command is retried every ``wait_time`` seconds.  Afterwards it is
        run once more with op ``pass`` so that a verdict (annotated with the
        elapsed time) is recorded; that final call's return value is
        returned.
        """
        self.log(
            "%s:%s WAIT:%s:%s:%s:%s:%s:%s:%s:"
            % (
                self.l_filename,
                self.l_line,
                target,
                command,
                regexp,
                op,
                result,
                wait,
                wait_time,
            )
        )
        found = False
        n = 0
        startt = time.time()

        # Calculate the amount of `sleep`s we are going to perform.  A
        # non-positive interval means a single attempt (and avoids dividing
        # by zero).
        if wait_time > 0:
            wait_count = int(math.ceil(wait / wait_time)) + 1
        else:
            wait_count = 1

        while wait_count > 0:
            n += 1
            found = self.command(target, command, regexp, op, result, returnJson)
            if found is not False:
                break

            wait_count -= 1
            if wait_count > 0:
                time.sleep(wait_time)

        delta = time.time() - startt
        self.log("Done after %d loops, time=%s, Found=%s" % (n, delta, found))
        found = self.command(
            target,
            command,
            regexp,
            "pass",
            "%s +%4.2f secs" % (result, delta),
            returnJson,
        )
        return found
348
349
# initialized by luStart
LUtil = None


# entry calls
def luStart(
    baseScriptDir=".",
    baseLogDir=".",
    net="",
    fout="output.log",
    fsum="summary.txt",
    level=None,
):
    """Create the module-wide lUtil instance and configure it.

    The log and summary file names are placed under ``baseLogDir``.  An empty
    ``fout`` or a ``fsum`` of None keeps the class default name; ``level``
    overrides the logging level only when given.
    """
    global LUtil
    # Publish the instance first, then configure it through a local alias.
    LUtil = helper = lUtil()
    helper.base_script_dir = baseScriptDir
    helper.base_log_dir = baseLogDir
    helper.net = net
    if fout != "":
        helper.fout_name = baseLogDir + "/" + fout
    if fsum is not None:
        helper.fsum_name = baseLogDir + "/" + fsum
    if level is not None:
        helper.l_level = level
    # the DOTALL comparison experiment is always switched on
    helper.l_dotall_experiment = True
376
377
def luCommand(
    target,
    command,
    regexp=".",
    op="none",
    result="",
    time=10,
    returnJson=False,
    wait_time=0.5,
):
    """Run ``command`` on ``target`` through the module-wide helper.

    With ``op == "wait"`` the command is polled via ``LUtil.wait`` for up to
    ``time`` seconds at ``wait_time`` intervals; any other ``op`` runs it
    once via ``LUtil.command``.  The helper's return value is passed through.
    """
    if op == "wait":
        return LUtil.wait(
            target, command, regexp, op, result, time, returnJson, wait_time
        )
    return LUtil.command(target, command, regexp, op, result, returnJson)
394
395
def luLast(usenl=False):
    """Return the regex match object saved by the most recent command.

    ``usenl`` selects the match made with ``re.DOTALL`` against the raw
    output instead of the one made against the newline-flattened output.
    The matched text is logged at level 7 when a match exists.
    """
    match = LUtil.l_last_nl if usenl else LUtil.l_last
    if match is not None:
        LUtil.log("luLast:%s:" % match.group(), 7)
    return match
405
406
def luInclude(filename, CallOnFail=None):
    """Execute a test script located under the helper's base script directory.

    Files ending in ``.py`` are executed as Python source; anything else is
    handed to ``LUtil.execTestFile``.  When ``CallOnFail`` is given it
    replaces the failure hook for the duration of the script, and the
    previous hook is restored afterwards -- including when the script raises
    or exits.
    """
    # NOTE: the local names in this function are deliberately left unchanged;
    # code run through exec() below executes in this function's scope and can
    # see them.
    tstFile = LUtil.base_script_dir + "/" + filename
    LUtil.setFilename(filename)
    if CallOnFail is not None:
        oldCallOnFail = LUtil.getCallOnFail()
        LUtil.setCallOnFail(CallOnFail)
    try:
        if filename.endswith(".py"):
            LUtil.log("luInclude: execfile " + tstFile)
            with open(tstFile) as infile:
                # SECURITY: runs arbitrary Python from the script file; only
                # include trusted test scripts.
                exec(infile.read())
        else:
            LUtil.log("luInclude: execTestFile " + tstFile)
            LUtil.execTestFile(tstFile)
    finally:
        # Restore the previous hook even on failure.  LUtil may have been
        # cleared if the script called luFinish().
        if CallOnFail is not None and LUtil is not None:
            LUtil.setCallOnFail(oldCallOnFail)
422
423
def luFinish():
    """Close the helper's output files, drop the helper and return the totals text."""
    global LUtil
    totals_text = LUtil.closeFiles()
    # done
    LUtil = None
    return totals_text
430
431
def luNumFail():
    """Return the number of failed tests recorded so far."""
    failed = LUtil.l_fail
    return failed
434
435
def luNumPass():
    """Return the number of passed tests recorded so far."""
    passed = LUtil.l_pass
    return passed
438
439
def luResult(target, success, str, logstr=None):
    """Record a test verdict through the module-wide helper's result()."""
    outcome = LUtil.result(target, success, str, logstr)
    return outcome
442
443
def luShowResults(prFunction):
    """Pass every line of the summary file to ``prFunction``.

    Each line is stripped of trailing whitespace first.  The file is closed
    even if ``prFunction`` raises.
    """
    with open(LUtil.fsum_name, "r") as sf:
        for line in sf:
            prFunction(line.rstrip())
451
452
def luShowFail():
    """Log (as errors) every summary line that does not end in a zero.

    The check looks at the character just before the final one (normally the
    character preceding the newline), so rows whose last column is ``0`` are
    skipped and everything else is reported.  If anything was reported, a
    pointer to the detailed log file is logged as well.
    """
    printed = 0
    with open(LUtil.fsum_name, "r") as sf:
        for line in sf:
            # Lines shorter than two characters (e.g. a blank line) have no
            # result column; skip them instead of raising IndexError.
            if len(line) < 2:
                continue
            if line[-2] != "0":
                printed += 1
                logger.error(line.rstrip())
    if printed > 0:
        logger.error("See %s for details of errors" % LUtil.fout_name)
463
464
# for testing
if __name__ == "__main__":
    # show where the sibling "lib" directory is expected to be
    lib_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + "/lib"
    print(lib_dir)
    luStart()
    # every command-line argument is a script to include
    for script in sys.argv[1:]:
        luInclude(script)
    luFinish()
    sys.exit(0)