]> git.proxmox.com Git - mirror_frr.git/blob - tests/topotests/lib/lutil.py
Merge pull request #12634 from anlancs/fix/lib-seq-adjust-return-value
[mirror_frr.git] / tests / topotests / lib / lutil.py
1 #!/usr/bin/env python
2
3 # Copyright 2017, LabN Consulting, L.L.C.
4 #
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; either version 2
8 # of the License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License along
16 # with this program; see the file COPYING; if not, write to the Free Software
17 # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18
19 import os
20 import re
21 import sys
22 import time
23 import json
24 import math
25 import time
26 from lib.topolog import logger
27 from lib.topotest import json_cmp
28
29
30 # L utility functions
31 #
32 # These functions are inteneted to provide support for CI testing within MiniNet
33 # environments.
34
35
36 class lUtil:
37 # to be made configurable in the future
38 base_script_dir = "."
39 base_log_dir = "."
40 fout_name = "output.log"
41 fsum_name = "summary.txt"
42 l_level = 6
43 CallOnFail = False
44
45 l_total = 0
46 l_pass = 0
47 l_fail = 0
48 l_filename = ""
49 l_last = None
50 l_line = 0
51 l_dotall_experiment = False
52 l_last_nl = None
53 l_wait_strict = 1
54
55 fout = ""
56 fsum = ""
57 net = ""
58
59 def log(self, str, level=6):
60 if self.l_level > 0:
61 if self.fout == "":
62 self.fout = open(self.fout_name, "w")
63 self.fout.write(str + "\n")
64 if level <= self.l_level:
65 print(str)
66
67 def summary(self, str):
68 if self.fsum == "":
69 self.fsum = open(self.fsum_name, "w")
70 self.fsum.write(
71 "\
72 ******************************************************************************\n"
73 )
74 self.fsum.write(
75 "\
76 Test Target Summary Pass Fail\n"
77 )
78 self.fsum.write(
79 "\
80 ******************************************************************************\n"
81 )
82 self.fsum.write(str + "\n")
83
84 def result(self, target, success, str, logstr=None):
85 if success:
86 p = 1
87 f = 0
88 self.l_pass += 1
89 sstr = "PASS"
90 else:
91 f = 1
92 p = 0
93 self.l_fail += 1
94 sstr = "FAIL"
95 self.l_total += 1
96 if logstr != None:
97 self.log("R:%d %s: %s" % (self.l_total, sstr, logstr))
98 res = "%-4d %-6s %-56s %-4d %d" % (self.l_total, target, str, p, f)
99 self.log("R:" + res)
100 self.summary(res)
101 if f == 1 and self.CallOnFail != False:
102 self.CallOnFail()
103
104 def closeFiles(self):
105 ret = (
106 "\
107 ******************************************************************************\n\
108 Total %-4d %-4d %d\n\
109 ******************************************************************************"
110 % (self.l_total, self.l_pass, self.l_fail)
111 )
112 if self.fsum != "":
113 self.fsum.write(ret + "\n")
114 self.fsum.close()
115 self.fsum = ""
116 if self.fout != "":
117 if os.path.isfile(self.fsum_name):
118 r = open(self.fsum_name, "r")
119 self.fout.write(r.read())
120 r.close()
121 self.fout.close()
122 self.fout = ""
123 return ret
124
125 def setFilename(self, name):
126 str = "FILE: " + name
127 self.log(str)
128 self.summary(str)
129 self.l_filename = name
130 self.line = 0
131
132 def getCallOnFail(self):
133 return self.CallOnFail
134
135 def setCallOnFail(self, CallOnFail):
136 self.CallOnFail = CallOnFail
137
138 def strToArray(self, string):
139 a = []
140 c = 0
141 end = ""
142 words = string.split()
143 if len(words) < 1 or words[0].startswith("#"):
144 return a
145 words = string.split()
146 for word in words:
147 if len(end) == 0:
148 a.append(word)
149 else:
150 a[c] += str(" " + word)
151 if end == "\\":
152 end = ""
153 if not word.endswith("\\"):
154 if end != '"':
155 if word.startswith('"'):
156 end = '"'
157 else:
158 c += 1
159 else:
160 if word.endswith('"'):
161 end = ""
162 c += 1
163 else:
164 c += 1
165 else:
166 end = "\\"
167 # if len(end) == 0:
168 # print('%d:%s:' % (c, a[c-1]))
169
170 return a
171
172 def execTestFile(self, tstFile):
173 if os.path.isfile(tstFile):
174 f = open(tstFile)
175 for line in f:
176 if len(line) > 1:
177 a = self.strToArray(line)
178 if len(a) >= 6:
179 luCommand(a[1], a[2], a[3], a[4], a[5])
180 else:
181 self.l_line += 1
182 self.log("%s:%s %s" % (self.l_filename, self.l_line, line))
183 if len(a) >= 2:
184 if a[0] == "sleep":
185 time.sleep(int(a[1]))
186 elif a[0] == "include":
187 self.execTestFile(a[1])
188 f.close()
189 else:
190 self.log("unable to read: " + tstFile)
191 sys.exit(1)
192
193 def command(self, target, command, regexp, op, result, returnJson, startt=None, force_result=False):
194 global net
195 if op == "jsoncmp_pass" or op == "jsoncmp_fail":
196 returnJson = True
197
198 self.log(
199 "%s (#%d) %s:%s COMMAND:%s:%s:%s:%s:%s:"
200 % (
201 time.asctime(),
202 self.l_total + 1,
203 self.l_filename,
204 self.l_line,
205 target,
206 command,
207 regexp,
208 op,
209 result,
210 )
211 )
212 if self.net == "":
213 return False
214 # self.log("Running %s %s" % (target, command))
215 js = None
216 out = self.net[target].cmd(command).rstrip()
217 if len(out) == 0:
218 report = "<no output>"
219 else:
220 report = out
221 if returnJson == True:
222 try:
223 js = json.loads(out)
224 except:
225 js = None
226 self.log(
227 "WARNING: JSON load failed -- confirm command output is in JSON format."
228 )
229 self.log("COMMAND OUTPUT:%s:" % report)
230
231 # JSON comparison
232 if op == "jsoncmp_pass" or op == "jsoncmp_fail":
233 try:
234 expect = json.loads(regexp)
235 except:
236 expect = None
237 self.log(
238 "WARNING: JSON load failed -- confirm regex input is in JSON format."
239 )
240 json_diff = json_cmp(js, expect)
241 if json_diff != None:
242 if op == "jsoncmp_fail":
243 success = True
244 else:
245 success = False
246 self.log("JSON DIFF:%s:" % json_diff)
247 ret = success
248 else:
249 if op == "jsoncmp_fail":
250 success = False
251 else:
252 success = True
253 self.result(target, success, result)
254 if js != None:
255 return js
256 return ret
257
258 # Experiment: can we achieve the same match behavior via DOTALL
259 # without converting newlines to spaces?
260 out_nl = out
261 search_nl = re.search(regexp, out_nl, re.DOTALL)
262 self.l_last_nl = search_nl
263 # Set up for comparison
264 if search_nl != None:
265 group_nl = search_nl.group()
266 group_nl_converted = " ".join(group_nl.splitlines())
267 else:
268 group_nl_converted = None
269
270 out = " ".join(out.splitlines())
271 search = re.search(regexp, out)
272 self.l_last = search
273 if search == None:
274 if op == "fail":
275 success = True
276 else:
277 success = False
278 ret = success
279 else:
280 ret = search.group()
281 if op != "fail":
282 success = True
283 level = 7
284 else:
285 success = False
286 level = 5
287 self.log("found:%s:" % ret, level)
288 # Experiment: compare matched strings obtained each way
289 if self.l_dotall_experiment and (group_nl_converted != ret):
290 self.log(
291 "DOTALL experiment: strings differ dotall=[%s] orig=[%s]"
292 % (group_nl_converted, ret),
293 9,
294 )
295 if startt != None:
296 if js != None or ret is not False or force_result is not False:
297 delta = time.time() - startt
298 self.result(target, success, "%s +%4.2f secs" % (result, delta))
299 elif op == "pass" or op == "fail":
300 self.result(target, success, result)
301 if js != None:
302 return js
303 return ret
304
305 def wait(
306 self, target, command, regexp, op, result, wait, returnJson, wait_time=0.5
307 ):
308 self.log(
309 "%s:%s WAIT:%s:%s:%s:%s:%s:%s:%s:"
310 % (
311 self.l_filename,
312 self.l_line,
313 target,
314 command,
315 regexp,
316 op,
317 result,
318 wait,
319 wait_time,
320 )
321 )
322 found = False
323 n = 0
324 startt = time.time()
325
326 if (op == "wait-strict") or ((op == "wait") and self.l_wait_strict):
327 strict = True
328 else:
329 strict = False
330
331 # Calculate the amount of `sleep`s we are going to peform.
332 wait_count = int(math.ceil(wait / wait_time)) + 1
333
334 force_result = False
335 while wait_count > 0:
336 n += 1
337
338 # log a failure on last iteration if we don't get desired regexp
339 if strict and (wait_count == 1):
340 force_result = True
341
342 found = self.command(target, command, regexp, op, result, returnJson, startt, force_result)
343 if found is not False:
344 break
345
346 wait_count -= 1
347 if wait_count > 0:
348 time.sleep(wait_time)
349
350 delta = time.time() - startt
351 self.log("Done after %d loops, time=%s, Found=%s" % (n, delta, found))
352 return found
353
354
355 # initialized by luStart
356 LUtil = None
357
358 # entry calls
359 def luStart(
360 baseScriptDir=".",
361 baseLogDir=".",
362 net="",
363 fout="output.log",
364 fsum="summary.txt",
365 level=None,
366 ):
367 global LUtil
368 # init class
369 LUtil = lUtil()
370 LUtil.base_script_dir = baseScriptDir
371 LUtil.base_log_dir = baseLogDir
372 LUtil.net = net
373 if fout != "":
374 LUtil.fout_name = baseLogDir + "/" + fout
375 if fsum != None:
376 LUtil.fsum_name = baseLogDir + "/" + fsum
377 if level != None:
378 LUtil.l_level = level
379 LUtil.l_dotall_experiment = False
380 LUtil.l_dotall_experiment = True
381
382
383 def luCommand(
384 target,
385 command,
386 regexp=".",
387 op="none",
388 result="",
389 time=10,
390 returnJson=False,
391 wait_time=0.5,
392 ):
393 waitops = ["wait", "wait-strict", "wait-nostrict"]
394
395 if op in waitops:
396 return LUtil.wait(
397 target, command, regexp, op, result, time, returnJson, wait_time
398 )
399 else:
400 return LUtil.command(target, command, regexp, op, result, returnJson)
401
402
403 def luLast(usenl=False):
404 if usenl:
405 if LUtil.l_last_nl != None:
406 LUtil.log("luLast:%s:" % LUtil.l_last_nl.group(), 7)
407 return LUtil.l_last_nl
408 else:
409 if LUtil.l_last != None:
410 LUtil.log("luLast:%s:" % LUtil.l_last.group(), 7)
411 return LUtil.l_last
412
413
414 def luInclude(filename, CallOnFail=None):
415 tstFile = LUtil.base_script_dir + "/" + filename
416 LUtil.setFilename(filename)
417 if CallOnFail != None:
418 oldCallOnFail = LUtil.getCallOnFail()
419 LUtil.setCallOnFail(CallOnFail)
420 if filename.endswith(".py"):
421 LUtil.log("luInclude: execfile " + tstFile)
422 with open(tstFile) as infile:
423 exec(infile.read())
424 else:
425 LUtil.log("luInclude: execTestFile " + tstFile)
426 LUtil.execTestFile(tstFile)
427 if CallOnFail != None:
428 LUtil.setCallOnFail(oldCallOnFail)
429
430
431 def luFinish():
432 global LUtil
433 ret = LUtil.closeFiles()
434 # done
435 LUtil = None
436 return ret
437
438
439 def luNumFail():
440 return LUtil.l_fail
441
442
443 def luNumPass():
444 return LUtil.l_pass
445
446
447 def luResult(target, success, str, logstr=None):
448 return LUtil.result(target, success, str, logstr)
449
450
451 def luShowResults(prFunction):
452 printed = 0
453 sf = open(LUtil.fsum_name, "r")
454 for line in sf:
455 printed += 1
456 prFunction(line.rstrip())
457 sf.close()
458
459
460 def luShowFail():
461 printed = 0
462 sf = open(LUtil.fsum_name, "r")
463 for line in sf:
464 if line[-2] != "0":
465 printed += 1
466 logger.error(line.rstrip())
467 sf.close()
468 if printed > 0:
469 logger.error("See %s for details of errors" % LUtil.fout_name)
470
471 #
472 # Sets default wait type for luCommand(op="wait) (may be overridden by
473 # specifying luCommand(op="wait-strict") or luCommand(op="wait-nostrict")).
474 #
475 # "nostrict" is the historical default behavior, which is to ignore
476 # failures to match the specified regexp in the specified time.
477 #
478 # "strict" means that failure to match the specified regexp in the
479 # specified time yields an explicit, logged failure result
480 #
481 def luSetWaitType(waittype):
482 if waittype == "strict":
483 LUtil.l_wait_strict = 1
484 else:
485 if waittype == "nostrict":
486 LUtil.l_wait_strict = 0
487 else:
488 raise ValueError('waittype must be one of "strict" or "nostrict"')
489
490
491 # for testing
492 if __name__ == "__main__":
493 print(os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + "/lib")
494 luStart()
495 for arg in sys.argv[1:]:
496 luInclude(arg)
497 luFinish()
498 sys.exit(0)