# SPDX-License-Identifier: GPL-2.0

import builtins
import inspect
import sys
import time
import traceback
from .consts import KSFT_MAIN_NAME
from .utils import global_defer_queue

# Result of the test case currently executing. ksft_run() resets it to True
# before each case; a failed check or an uncaught exception sets it to False.
KSFT_RESULT = None
# Logical AND of every result passed to ktap_result(); decides the exit code
# used by ksft_exit().
KSFT_RESULT_ALL = True


class KsftFailEx(Exception):
    """Exception type for test failures (not handled specially in this module)."""


class KsftSkipEx(Exception):
    """Raised by a test case to have ksft_run() report it as SKIP."""


class KsftXfailEx(Exception):
    """Raised by a test case to have ksft_run() report it as XFAIL."""


def ksft_pr(*objs, **kwargs):
    """Print *objs* prefixed with "#"; kwargs are forwarded to print()."""
    print("#", *objs, **kwargs)


def _fail(*args):
    """Mark the current case as failed and print where the check was made.

    The reported location is the frame two levels up the stack, i.e. the
    caller of the ksft_*() helper that invoked _fail().
    """
    global KSFT_RESULT
    KSFT_RESULT = False

    # [0] is _fail itself, [1] the ksft_* helper, [2] the helper's caller.
    frame = inspect.stack()[2]
    ksft_pr("At " + frame.filename + " line " + str(frame.lineno) + ":")
    ksft_pr(*args)


def ksft_eq(a, b, comment=""):
    """Record a failure unless ``a == b``."""
    if a != b:
        _fail("Check failed", a, "!=", b, comment)


def ksft_true(a, comment=""):
    """Record a failure unless *a* is truthy."""
    if not a:
        _fail("Check failed", a, "does not eval to True", comment)


def ksft_in(a, b, comment=""):
    """Record a failure unless ``a in b``."""
    if a not in b:
        _fail("Check failed", a, "not in", b, comment)


def ksft_ge(a, b, comment=""):
    """Record a failure if ``a < b``."""
    if a < b:
        _fail("Check failed", a, "<", b, comment)


def ksft_lt(a, b, comment=""):
    """Record a failure if ``a >= b``."""
    if a >= b:
        _fail("Check failed", a, ">=", b, comment)


class ksft_raises:
    """Context manager that checks the body raises *expected_type*.

    The match is on the exact exception class (subclasses do not match).
    A matching exception is suppressed; any other exception is recorded as
    a failure and then propagates. After the block exits, the exception
    value (or None) is available as ``self.exception``.
    """

    def __init__(self, expected_type):
        self.exception = None
        self.expected_type = expected_type

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        expected_name = self.expected_type.__name__
        if exc_type is None:
            _fail(f"Expected exception {expected_name}, none raised")
        elif self.expected_type != exc_type:
            _fail(f"Expected exception {expected_name}, raised {exc_type.__name__}")
        self.exception = exc_val
        # Suppress the exception only if it is the expected one
        return self.expected_type == exc_type


def ksft_busy_wait(cond, sleep=0.005, deadline=1, comment=""):
    """Poll ``cond()`` until it returns a truthy value or time runs out.

    Args:
        cond: zero-argument callable; polling stops once it returns truthy.
        sleep: seconds to sleep between polls.
        deadline: seconds (measured with time.monotonic()) after which a
            failure is recorded and the function returns.
        comment: extra text included in the failure message.
    """
    end = time.monotonic() + deadline
    while True:
        # The condition is evaluated before the deadline check, so it is
        # always tried at least once.
        if cond():
            return
        if time.monotonic() > end:
            _fail("Waiting for condition timed out", comment)
            return
        time.sleep(sleep)


def ktap_result(ok, cnt=1, case="", comment=""):
    """Print one KTAP result line and fold *ok* into KSFT_RESULT_ALL.

    Args:
        ok: truthy for "ok", falsy for "not ok".
        cnt: test number printed after the status.
        case: object whose ``__name__`` is appended after KSFT_MAIN_NAME;
            omitted when falsy.
        comment: text appended after " # "; omitted when empty.
    """
    global KSFT_RESULT_ALL
    KSFT_RESULT_ALL = KSFT_RESULT_ALL and ok

    res = ""
    if not ok:
        res += "not "
    res += "ok " + str(cnt) + " " + KSFT_MAIN_NAME
    if case:
        res += "." + str(case.__name__)
    if comment:
        res += " # " + comment
    print(res)


def ksft_flush_defer():
    """Run and drain every entry in global_defer_queue.

    Entries are taken with ``pop()`` (last queued runs first, presumably a
    list -- confirm in .utils) and executed via ``entry.exec_only()``. An
    exception in one entry is printed, marks the current case as failed, and
    does not stop the remaining entries from running.
    """
    global KSFT_RESULT

    i = 0
    qlen_start = len(global_defer_queue)
    while global_defer_queue:
        i += 1
        entry = global_defer_queue.pop()
        try:
            entry.exec_only()
        except BaseException:
            # Deliberately as broad as a bare "except:": cleanup is
            # best-effort and the remaining callbacks must still run.
            ksft_pr(f"Exception while handling defer / cleanup (callback {i} of {qlen_start})!")
            tb = traceback.format_exc()
            for line in tb.strip().split('\n'):
                ksft_pr("Defer Exception|", line)
            KSFT_RESULT = False


def ksft_run(cases=None, globs=None, case_pfx=None, args=()):
    """Run test cases and print KTAP output (header, results, totals).

    Args:
        cases: optional sequence of callables to run. It is copied, so the
            caller's sequence is never modified.
        globs: optional mapping of names to objects (e.g. the caller's
            ``globals()``) searched for additional cases.
        case_pfx: iterable of name prefixes; every callable in *globs* whose
            name starts with one of them is appended to the case list.
            Only used when *globs* is also given.
        args: positional arguments passed to every case.

    Each case is called as ``case(*args)``. KsftSkipEx / KsftXfailEx are
    reported as SKIP / XFAIL; any other exception is printed and counted as
    a failure. Deferred cleanups are flushed after every case. A
    KeyboardInterrupt stops the run after the current case is reported.
    """
    global KSFT_RESULT

    # Copy so that discovered cases are not appended to the caller's list.
    cases = list(cases) if cases else []

    if globs and case_pfx:
        prefixes = tuple(case_pfx)
        for key, value in globs.items():
            if callable(value) and key.startswith(prefixes):
                cases.append(value)

    totals = {"pass": 0, "fail": 0, "skip": 0, "xfail": 0}

    print("KTAP version 1")
    print("1.." + str(len(cases)))

    stop = False
    for cnt, case in enumerate(cases, start=1):
        KSFT_RESULT = True
        comment = ""
        cnt_key = ""

        try:
            case(*args)
        except KsftSkipEx as e:
            comment = "SKIP " + str(e)
            cnt_key = 'skip'
        except KsftXfailEx as e:
            comment = "XFAIL " + str(e)
            cnt_key = 'xfail'
        except BaseException as e:
            # BaseException so that KeyboardInterrupt / SystemExit raised by
            # a case are still reported as a failure of that case.
            stop = isinstance(e, KeyboardInterrupt)
            tb = traceback.format_exc()
            for line in tb.strip().split('\n'):
                ksft_pr("Exception|", line)
            if stop:
                ksft_pr("Stopping tests due to KeyboardInterrupt.")
            KSFT_RESULT = False
            cnt_key = 'fail'

        # Cleanups may themselves fail and flip KSFT_RESULT to False.
        ksft_flush_defer()

        if not cnt_key:
            cnt_key = 'pass' if KSFT_RESULT else 'fail'

        ktap_result(KSFT_RESULT, cnt, case, comment=comment)
        totals[cnt_key] += 1

        if stop:
            break

    print(
        f"# Totals: pass:{totals['pass']} fail:{totals['fail']} xfail:{totals['xfail']} xpass:0 skip:{totals['skip']} error:0"
    )


def ksft_exit():
    """Exit the process: status 0 if every reported result was ok, else 1."""
    sys.exit(0 if KSFT_RESULT_ALL else 1)
Linux® is a registered trademark of Linus Torvalds in the United States and other countries.
TOMOYO® is a registered trademark of NTT DATA CORPORATION.