# SPDX-License-Identifier: GPL-2.0
#
# Parses KTAP test results from a kernel dmesg log and incrementally prints
# results with reader-friendly format. Stores and returns test results in a
# Test object.
#
# Copyright (C) 2019, Google LLC.
# Author: Felix Guo <felixguoxiuping@gmail.com>
# Author: Brendan Higgins <brendanhiggins@google.com>
# Author: Rae Moar <rmoar@google.com>

from __future__ import annotations
from dataclasses import dataclass
import re
import textwrap

from enum import Enum, auto
from typing import Iterable, Iterator, List, Optional, Tuple

from kunit_printer import stdout

class Test:
	"""
	Represents one test parsed from KTAP results. The KTAP output of a
	whole run is stored as a single top-level Test object; every nested
	result hangs off it as a subtest.

	Attributes:
	status : TestStatus - status of the test
	name : str - name of the test
	expected_count : int - expected number of subtests (0 if single
		test case and None if unknown expected number of subtests)
	subtests : List[Test] - list of subtests
	log : List[str] - log of KTAP lines that correspond to the test
	counts : TestCounts - counts of the test statuses and errors of
		subtests or of the test itself if the test is a single
		test case.
	"""
	def __init__(self) -> None:
		"""Initializes every attribute to its default. The status
		starts out as TEST_CRASHED so that a run which aborts before
		reporting is not mistaken for a pass."""
		self.status = TestStatus.TEST_CRASHED
		self.name = ''
		self.expected_count = 0  # type: Optional[int]
		self.subtests = []  # type: List[Test]
		self.log = []  # type: List[str]
		self.counts = TestCounts()

	def __str__(self) -> str:
		"""Returns string representation of a Test class object."""
		fields = (self.status, self.name, self.expected_count,
			  self.subtests, self.log, self.counts)
		return 'Test(' + ', '.join(str(field) for field in fields) + ')'

	def __repr__(self) -> str:
		"""Returns the same representation as __str__."""
		return str(self)

	def add_error(self, error_message: str) -> None:
		"""Records an error that occurred while parsing this test."""
		self.counts.errors += 1
		stdout.print_with_timestamp(stdout.red('[ERROR]') + f' Test: {self.name}: {error_message}')

	def ok_status(self) -> bool:
		"""Returns true if the status was ok, i.e. passed or skipped."""
		return self.status in (TestStatus.SUCCESS, TestStatus.SKIPPED)

class TestStatus(Enum):
	"""An enumeration class to represent the status of a test."""
	SUCCESS = auto()
	FAILURE = auto()
	SKIPPED = auto()
	TEST_CRASHED = auto()
	NO_TESTS = auto()
	FAILURE_TO_PARSE_TESTS = auto()
80 """ 80 """ 81 passed: int = 0 81 passed: int = 0 82 failed: int = 0 82 failed: int = 0 83 crashed: int = 0 83 crashed: int = 0 84 skipped: int = 0 84 skipped: int = 0 85 errors: int = 0 85 errors: int = 0 86 86 87 def __str__(self) -> str: 87 def __str__(self) -> str: 88 """Returns the string represen 88 """Returns the string representation of a TestCounts object.""" 89 statuses = [('passed', self.pa 89 statuses = [('passed', self.passed), ('failed', self.failed), 90 ('crashed', self.crash 90 ('crashed', self.crashed), ('skipped', self.skipped), 91 ('errors', self.errors 91 ('errors', self.errors)] 92 return f'Ran {self.total()} te 92 return f'Ran {self.total()} tests: ' + \ 93 ', '.join(f'{s}: {n}' 93 ', '.join(f'{s}: {n}' for s, n in statuses if n > 0) 94 94 95 def total(self) -> int: 95 def total(self) -> int: 96 """Returns the total number of 96 """Returns the total number of test cases within a test 97 object, where a test case is a 97 object, where a test case is a test with no subtests. 98 """ 98 """ 99 return (self.passed + self.fai 99 return (self.passed + self.failed + self.crashed + 100 self.skipped) 100 self.skipped) 101 101 102 def add_subtest_counts(self, counts: T 102 def add_subtest_counts(self, counts: TestCounts) -> None: 103 """ 103 """ 104 Adds the counts of another Tes 104 Adds the counts of another TestCounts object to the current 105 TestCounts object. Used to add 105 TestCounts object. Used to add the counts of a subtest to the 106 parent test. 106 parent test. 
107 107 108 Parameters: 108 Parameters: 109 counts - a different TestCount 109 counts - a different TestCounts object whose counts 110 will be added to the c 110 will be added to the counts of the TestCounts object 111 """ 111 """ 112 self.passed += counts.passed 112 self.passed += counts.passed 113 self.failed += counts.failed 113 self.failed += counts.failed 114 self.crashed += counts.crashed 114 self.crashed += counts.crashed 115 self.skipped += counts.skipped 115 self.skipped += counts.skipped 116 self.errors += counts.errors 116 self.errors += counts.errors 117 117 118 def get_status(self) -> TestStatus: 118 def get_status(self) -> TestStatus: 119 """Returns the aggregated stat 119 """Returns the aggregated status of a Test using test 120 counts. 120 counts. 121 """ 121 """ 122 if self.total() == 0: 122 if self.total() == 0: 123 return TestStatus.NO_T 123 return TestStatus.NO_TESTS 124 if self.crashed: 124 if self.crashed: 125 # Crashes should take 125 # Crashes should take priority. 126 return TestStatus.TEST 126 return TestStatus.TEST_CRASHED 127 if self.failed: 127 if self.failed: 128 return TestStatus.FAIL 128 return TestStatus.FAILURE 129 if self.passed: 129 if self.passed: 130 # No failures or crash 130 # No failures or crashes, looks good! 131 return TestStatus.SUCC 131 return TestStatus.SUCCESS 132 # We have only skipped tests. 132 # We have only skipped tests. 
133 return TestStatus.SKIPPED 133 return TestStatus.SKIPPED 134 134 135 def add_status(self, status: TestStatu 135 def add_status(self, status: TestStatus) -> None: 136 """Increments the count for `s 136 """Increments the count for `status`.""" 137 if status == TestStatus.SUCCES 137 if status == TestStatus.SUCCESS: 138 self.passed += 1 138 self.passed += 1 139 elif status == TestStatus.FAIL 139 elif status == TestStatus.FAILURE: 140 self.failed += 1 140 self.failed += 1 141 elif status == TestStatus.SKIP 141 elif status == TestStatus.SKIPPED: 142 self.skipped += 1 142 self.skipped += 1 143 elif status != TestStatus.NO_T 143 elif status != TestStatus.NO_TESTS: 144 self.crashed += 1 144 self.crashed += 1 145 145 146 class LineStream: 146 class LineStream: 147 """ 147 """ 148 A class to represent the lines of kern 148 A class to represent the lines of kernel output. 149 Provides a lazy peek()/pop() interface 149 Provides a lazy peek()/pop() interface over an iterator of 150 (line#, text). 150 (line#, text). 
151 """ 151 """ 152 _lines: Iterator[Tuple[int, str]] 152 _lines: Iterator[Tuple[int, str]] 153 _next: Tuple[int, str] 153 _next: Tuple[int, str] 154 _need_next: bool 154 _need_next: bool 155 _done: bool 155 _done: bool 156 156 157 def __init__(self, lines: Iterator[Tup 157 def __init__(self, lines: Iterator[Tuple[int, str]]): 158 """Creates a new LineStream th 158 """Creates a new LineStream that wraps the given iterator.""" 159 self._lines = lines 159 self._lines = lines 160 self._done = False 160 self._done = False 161 self._need_next = True 161 self._need_next = True 162 self._next = (0, '') 162 self._next = (0, '') 163 163 164 def _get_next(self) -> None: 164 def _get_next(self) -> None: 165 """Advances the LineSteam to t 165 """Advances the LineSteam to the next line, if necessary.""" 166 if not self._need_next: 166 if not self._need_next: 167 return 167 return 168 try: 168 try: 169 self._next = next(self 169 self._next = next(self._lines) 170 except StopIteration: 170 except StopIteration: 171 self._done = True 171 self._done = True 172 finally: 172 finally: 173 self._need_next = Fals 173 self._need_next = False 174 174 175 def peek(self) -> str: 175 def peek(self) -> str: 176 """Returns the current line, w 176 """Returns the current line, without advancing the LineStream. 177 """ 177 """ 178 self._get_next() 178 self._get_next() 179 return self._next[1] 179 return self._next[1] 180 180 181 def pop(self) -> str: 181 def pop(self) -> str: 182 """Returns the current line an 182 """Returns the current line and advances the LineStream to 183 the next line. 183 the next line. 
184 """ 184 """ 185 s = self.peek() 185 s = self.peek() 186 if self._done: 186 if self._done: 187 raise ValueError(f'Lin 187 raise ValueError(f'LineStream: going past EOF, last line was {s}') 188 self._need_next = True 188 self._need_next = True 189 return s 189 return s 190 190 191 def __bool__(self) -> bool: 191 def __bool__(self) -> bool: 192 """Returns True if stream has 192 """Returns True if stream has more lines.""" 193 self._get_next() 193 self._get_next() 194 return not self._done 194 return not self._done 195 195 196 # Only used by kunit_tool_test.py. 196 # Only used by kunit_tool_test.py. 197 def __iter__(self) -> Iterator[str]: 197 def __iter__(self) -> Iterator[str]: 198 """Empties all lines stored in 198 """Empties all lines stored in LineStream object into 199 Iterator object and returns th 199 Iterator object and returns the Iterator object. 200 """ 200 """ 201 while bool(self): 201 while bool(self): 202 yield self.pop() 202 yield self.pop() 203 203 204 def line_number(self) -> int: 204 def line_number(self) -> int: 205 """Returns the line number of 205 """Returns the line number of the current line.""" 206 self._get_next() 206 self._get_next() 207 return self._next[0] 207 return self._next[0] 208 208 209 # Parsing helper methods: 209 # Parsing helper methods: 210 210 211 KTAP_START = re.compile(r'\s*KTAP version ([0- 211 KTAP_START = re.compile(r'\s*KTAP version ([0-9]+)$') 212 TAP_START = re.compile(r'\s*TAP version ([0-9] 212 TAP_START = re.compile(r'\s*TAP version ([0-9]+)$') 213 KTAP_END = re.compile(r'\s*(List of all partit 213 KTAP_END = re.compile(r'\s*(List of all partitions:|' 214 'Kernel panic - not syncing: VFS:|rebo 214 'Kernel panic - not syncing: VFS:|reboot: System halted)') 215 EXECUTOR_ERROR = re.compile(r'\s*kunit executo 215 EXECUTOR_ERROR = re.compile(r'\s*kunit executor: (.*)$') 216 216 217 def extract_tap_lines(kernel_output: Iterable[ 217 def extract_tap_lines(kernel_output: Iterable[str]) -> LineStream: 218 """Extracts KTAP 
def extract_tap_lines(kernel_output: Iterable[str]) -> LineStream:
	"""Extracts KTAP lines from the kernel output."""
	def isolate_ktap_output(kernel_output: Iterable[str]) \
			-> Iterator[Tuple[int, str]]:
		line_num = 0
		started = False
		for line in kernel_output:
			line_num += 1
			line = line.rstrip()  # remove trailing \n
			if not started and KTAP_START.search(line):
				# start extracting KTAP lines and set prefix
				# to number of characters before version line
				prefix_len = len(
					line.split('KTAP version')[0])
				started = True
				yield line_num, line[prefix_len:]
			elif not started and TAP_START.search(line):
				# start extracting KTAP lines and set prefix
				# to number of characters before version line
				prefix_len = len(line.split('TAP version')[0])
				started = True
				yield line_num, line[prefix_len:]
			elif started and KTAP_END.search(line):
				# stop extracting KTAP lines
				break
			elif started:
				# remove the prefix, if any.
				line = line[prefix_len:]
				yield line_num, line
			elif EXECUTOR_ERROR.search(line):
				yield line_num, line
	return LineStream(lines=isolate_ktap_output(kernel_output))

KTAP_VERSIONS = [1]
TAP_VERSIONS = [13, 14]

def check_version(version_num: int, accepted_versions: List[int],
		version_type: str, test: Test) -> None:
	"""
	Adds error to test object if version number is too high or too
	low.

	Parameters:
	version_num - The inputted version number from the parsed KTAP or TAP
		header line
	accepted_versions - List of accepted KTAP or TAP versions
	version_type - 'KTAP' or 'TAP' depending on the type of
		version line.
	test - Test object for current test being parsed
	"""
	if version_num < min(accepted_versions):
		test.add_error(f'{version_type} version lower than expected!')
	elif version_num > max(accepted_versions):
		# Fixed typo in the reported message: 'higer' -> 'higher'.
		test.add_error(f'{version_type} version higher than expected!')
def parse_ktap_header(lines: LineStream, test: Test) -> bool:
	"""
	Parses KTAP/TAP header line and checks version number.
	Returns False if fails to parse KTAP/TAP header line.

	Accepted formats:
	- 'KTAP version [version number]'
	- 'TAP version [version number]'

	Parameters:
	lines - LineStream of KTAP output to parse
	test - Test object for current test being parsed

	Return:
	True if successfully parsed KTAP/TAP header line
	"""
	line = lines.peek()
	ktap_match = KTAP_START.match(line)
	tap_match = TAP_START.match(line)
	if ktap_match:
		check_version(int(ktap_match.group(1)), KTAP_VERSIONS,
			'KTAP', test)
	elif tap_match:
		check_version(int(tap_match.group(1)), TAP_VERSIONS,
			'TAP', test)
	else:
		return False
	# Only consume the line once we know it is a header.
	lines.pop()
	return True

TEST_HEADER = re.compile(r'^\s*# Subtest: (.*)$')

def parse_test_header(lines: LineStream, test: Test) -> bool:
	"""
	Parses test header and stores test name in test object.
	Returns False if fails to parse test header line.

	Accepted format:
	- '# Subtest: [test name]'

	Parameters:
	lines - LineStream of KTAP output to parse
	test - Test object for current test being parsed

	Return:
	True if successfully parsed test header line
	"""
	match = TEST_HEADER.match(lines.peek())
	if match is None:
		return False
	test.name = match.group(1)
	lines.pop()
	return True

TEST_PLAN = re.compile(r'^\s*1\.\.([0-9]+)')
def parse_test_plan(lines: LineStream, test: Test) -> bool:
	"""
	Parses test plan line and stores the expected number of subtests in
	test object.
	Returns False and sets expected_count to None if there is no valid
	test plan.

	Accepted format:
	- '1..[number of subtests]'

	Parameters:
	lines - LineStream of KTAP output to parse
	test - Test object for current test being parsed

	Return:
	True if successfully parsed test plan line
	"""
	match = TEST_PLAN.match(lines.peek())
	if match is None:
		# No plan line: the number of subtests is unknown.
		test.expected_count = None
		return False
	test.expected_count = int(match.group(1))
	lines.pop()
	return True

TEST_RESULT = re.compile(r'^\s*(ok|not ok) ([0-9]+) (- )?([^#]*)( # .*)?$')

TEST_RESULT_SKIP = re.compile(r'^\s*(ok|not ok) ([0-9]+) (- )?(.*) # SKIP(.*)$')

def peek_test_name_match(lines: LineStream, test: Test) -> bool:
	"""
	Matches current line with the format of a test result line and checks
	if the name matches the name of the current test.
	Returns False if fails to match format or name.

	Accepted format:
	- '[ok|not ok] [test number] [-] [test name] [optional skip
		directive]'

	Parameters:
	lines - LineStream of KTAP output to parse
	test - Test object for current test being parsed

	Return:
	True if matched a test result line and the name matching the
	expected test name
	"""
	match = TEST_RESULT.match(lines.peek())
	if match is None:
		return False
	return match.group(4) == test.name
def parse_test_result(lines: LineStream, test: Test,
		expected_num: int) -> bool:
	"""
	Parses test result line and stores the status and name in the test
	object. Reports an error if the test number does not match expected
	test number.
	Returns False if fails to parse test result line.

	Note that the SKIP directive is the only directive that causes a
	change in status.

	Accepted format:
	- '[ok|not ok] [test number] [-] [test name] [optional skip
		directive]'

	Parameters:
	lines - LineStream of KTAP output to parse
	test - Test object for current test being parsed
	expected_num - expected test number for current test

	Return:
	True if successfully parsed a test result line.
	"""
	line = lines.peek()
	match = TEST_RESULT.match(line)
	skip_match = TEST_RESULT_SKIP.match(line)

	# Check if line matches test result line format
	if not match:
		return False
	lines.pop()

	# Set name of test object. The SKIP pattern captures the name
	# without the trailing '# SKIP' directive.
	if skip_match:
		test.name = skip_match.group(4)
	else:
		test.name = match.group(4)

	# Check test num
	num = int(match.group(2))
	if num != expected_num:
		test.add_error(f'Expected test number {expected_num} but found {num}')

	# Set status of test object
	status = match.group(1)
	if skip_match:
		test.status = TestStatus.SKIPPED
	elif status == 'ok':
		test.status = TestStatus.SUCCESS
	else:
		test.status = TestStatus.FAILURE
	return True

def parse_diagnostic(lines: LineStream) -> List[str]:
	"""
	Parse lines that do not match the format of a test result line or
	test header line and returns them in list.

	Line formats that are not parsed:
	- '# Subtest: [test name]'
	- '[ok|not ok] [test number] [-] [test name] [optional skip
		directive]'
	- 'KTAP version [version number]'
	- '1..[number of subtests]'

	Parameters:
	lines - LineStream of KTAP output to parse

	Return:
	Log of diagnostic lines
	"""
	log = []  # type: List[str]
	non_diagnostic_lines = [TEST_RESULT, TEST_HEADER, KTAP_START, TAP_START, TEST_PLAN]
	# Loop variable renamed from 're' to 'regex' so it no longer
	# shadows the 're' module.
	while lines and not any(regex.match(lines.peek())
			for regex in non_diagnostic_lines):
		log.append(lines.pop())
	return log


# Printing helper methods:

DIVIDER = '=' * 60

def format_test_divider(message: str, len_message: int) -> str:
	"""
	Returns string with message centered in fixed width divider.

	Example:
	'===================== message example ====================='

	Parameters:
	message - message to be centered in divider line
	len_message - length of the message to be printed such that
		any characters of the color codes are not counted

	Return:
	String containing message centered in fixed width divider
	"""
	default_count = 3  # default number of dashes
	len_1 = default_count
	len_2 = default_count
	difference = len(DIVIDER) - len_message - 2  # 2 spaces added
	if difference > 0:
		# calculate number of dashes for each side of the divider
		len_1 = int(difference / 2)
		len_2 = difference - len_1
	return ('=' * len_1) + f' {message} ' + ('=' * len_2)
def print_test_header(test: Test) -> None:
	"""
	Prints test header with test name and optionally the expected number
	of subtests.

	Example:
	'=================== example (2 subtests) ==================='

	Parameters:
	test - Test object representing current test being printed
	"""
	message = test.name
	if message:
		# Add a leading space before the subtest counts only if a test name
		# is provided using a "# Subtest" header line.
		message += " "
	if test.expected_count:
		message += ('(1 subtest)' if test.expected_count == 1
			else f'({test.expected_count} subtests)')
	stdout.print_with_timestamp(format_test_divider(message, len(message)))

def print_log(log: Iterable[str]) -> None:
	"""Prints all strings in saved log for test in yellow."""
	for line in textwrap.dedent('\n'.join(log)).splitlines():
		stdout.print_with_timestamp(stdout.yellow(line))
def format_test_result(test: Test) -> str:
	"""
	Returns string with formatted test result with colored status and test
	name.

	Example:
	'[PASSED] example'

	Parameters:
	test - Test object representing current test being printed

	Return:
	String containing formatted test result
	"""
	if test.status == TestStatus.SUCCESS:
		return stdout.green('[PASSED] ') + test.name
	if test.status == TestStatus.SKIPPED:
		return stdout.yellow('[SKIPPED] ') + test.name
	if test.status == TestStatus.NO_TESTS:
		return stdout.yellow('[NO TESTS RUN] ') + test.name
	# Crashes and failures also dump the test log to aid debugging.
	print_log(test.log)
	if test.status == TestStatus.TEST_CRASHED:
		return stdout.red('[CRASHED] ') + test.name
	return stdout.red('[FAILED] ') + test.name

def print_test_result(test: Test) -> None:
	"""
	Prints result line with status of test.

	Example:
	'[PASSED] example'

	Parameters:
	test - Test object representing current test being printed
	"""
	stdout.print_with_timestamp(format_test_result(test))
559 559 560 Example: 560 Example: 561 '===================== [PASSED] exampl 561 '===================== [PASSED] example =====================' 562 562 563 Parameters: 563 Parameters: 564 test - Test object representing curren 564 test - Test object representing current test being printed 565 """ 565 """ 566 message = format_test_result(test) 566 message = format_test_result(test) 567 stdout.print_with_timestamp(format_tes 567 stdout.print_with_timestamp(format_test_divider(message, 568 len(message) - stdout.color_le 568 len(message) - stdout.color_len())) 569 569 570 570 571 571 572 def _summarize_failed_tests(test: Test) -> str 572 def _summarize_failed_tests(test: Test) -> str: 573 """Tries to summarize all the failing 573 """Tries to summarize all the failing subtests in `test`.""" 574 574 575 def failed_names(test: Test, parent_na 575 def failed_names(test: Test, parent_name: str) -> List[str]: 576 # Note: we use 'main' internal 576 # Note: we use 'main' internally for the top-level test. 577 if not parent_name or parent_n 577 if not parent_name or parent_name == 'main': 578 full_name = test.name 578 full_name = test.name 579 else: 579 else: 580 full_name = parent_nam 580 full_name = parent_name + '.' + test.name 581 581 582 if not test.subtests: # this 582 if not test.subtests: # this is a leaf node 583 return [full_name] 583 return [full_name] 584 584 585 # If all the children failed, 585 # If all the children failed, just say this subtest failed. 586 # Don't summarize it down "the 586 # Don't summarize it down "the top-level test failed", though. 
def print_summary_line(test: Test) -> None:
    """
    Print the summary line for *test*, colored by overall status:
    green when the test passes, yellow when it is skipped or ran no
    tests, and red when it fails or crashes. The line reports the
    counts of the statuses of the test's subtests (or of the test
    itself when it has no subtests).

    Example:
    "Testing complete. Passed: 2, Failed: 0, Crashed: 0, Skipped: 0,
    Errors: 0"

    Parameters:
    test - Test object representing current test being printed
    """
    if test.status == TestStatus.SUCCESS:
        color = stdout.green
    elif test.status in (TestStatus.SKIPPED, TestStatus.NO_TESTS):
        color = stdout.yellow
    else:
        color = stdout.red
    stdout.print_with_timestamp(color(f'Testing complete. {test.counts}'))

    # With many tests (arbitrarily defined as >= 100), individual
    # failures may have scrolled off-screen, so repeat a condensed
    # failure list here.
    if test.ok_status() or test.counts.total() < 100:
        return
    failure_summary = _summarize_failed_tests(test)
    if failure_summary:
        stdout.print_with_timestamp(color(failure_summary))

# Other methods:

def bubble_up_test_results(test: Test) -> None:
    """
    Aggregate child results into *test*: add every subtest's counts to
    the test's own counts and, if those counts report a crash, mark the
    test itself as crashed. A test with no counted subtest results
    instead records its own status in its counts.

    Parameters:
    test - Test object for current test being parsed
    """
    for subtest in test.subtests:
        test.counts.add_subtest_counts(subtest.counts)
    if test.counts.total() == 0:
        # Single test case: the only status to count is its own.
        test.counts.add_status(test.status)
    elif test.counts.get_status() == TestStatus.TEST_CRASHED:
        test.status = TestStatus.TEST_CRASHED
def parse_test(lines: LineStream, expected_num: int, log: List[str], is_subtest: bool) -> Test:
    """
    Finds next test to parse in LineStream, creates new Test object,
    parses any subtests of the test, populates Test object with all
    information (status, name) about the test and the Test objects for
    any subtests, and then returns the Test object. The method accepts
    three formats of tests:

    Accepted test formats:

    - Main KTAP/TAP header

    Example:

    KTAP version 1
    1..4
    [subtests]

    - Subtest header (must include either the KTAP version line or
    "# Subtest" header line)

    Example (preferred format with both KTAP version line and
    "# Subtest" line):

    KTAP version 1
    # Subtest: name
    1..3
    [subtests]
    ok 1 name

    Example (only "# Subtest" line):

    # Subtest: name
    1..3
    [subtests]
    ok 1 name

    Example (only KTAP version line, compliant with KTAP v1 spec):

    KTAP version 1
    1..3
    [subtests]
    ok 1 name

    - Test result line

    Example:

    ok 1 - test

    Parameters:
    lines - LineStream of KTAP output to parse
    expected_num - expected test number for test to be parsed
    log - list of strings containing any preceding diagnostic lines
            corresponding to the current test
    is_subtest - boolean indicating whether test is a subtest

    Return:
    Test object populated with characteristics and any subtests
    """
    test = Test()
    test.log.extend(log)

    # Parse any errors prior to parsing tests
    err_log = parse_diagnostic(lines)
    test.log.extend(err_log)

    if not is_subtest:
        # If parsing the main/top-level test, parse KTAP version line and
        # test plan
        test.name = "main"
        ktap_line = parse_ktap_header(lines, test)
        test.log.extend(parse_diagnostic(lines))
        parse_test_plan(lines, test)
        parent_test = True
    else:
        # If not the main test, attempt to parse a test header containing
        # the KTAP version line and/or subtest header line
        ktap_line = parse_ktap_header(lines, test)
        subtest_line = parse_test_header(lines, test)
        # A subtest only owns nested subtests when it announced itself
        # with a KTAP version line and/or "# Subtest" header.
        parent_test = (ktap_line or subtest_line)
        if parent_test:
            # If KTAP version line and/or subtest header is found, attempt
            # to parse test plan and print test header
            test.log.extend(parse_diagnostic(lines))
            parse_test_plan(lines, test)
            print_test_header(test)
    # expected_count: None means the number of subtests is unknown;
    # 0 means this is a single test case (see Test.expected_count).
    expected_count = test.expected_count
    subtests = []  # type: List[Test]
    test_num = 1
    while parent_test and (expected_count is None or test_num <= expected_count):
        # Loop to parse any subtests.
        # Break after parsing expected number of tests or
        # if expected number of tests is unknown break when test
        # result line with matching name to subtest header is found
        # or no more lines in stream.
        sub_log = parse_diagnostic(lines)
        sub_test = Test()
        if not lines or (peek_test_name_match(lines, test) and
                is_subtest):
            if expected_count and test_num <= expected_count:
                # If parser reaches end of test before
                # parsing expected number of subtests, print
                # crashed subtest and record error
                # (sub_test stays a default Test, whose status is
                # TEST_CRASHED from Test.__init__())
                test.add_error('missing expected subtest!')
                sub_test.log.extend(sub_log)
                test.counts.add_status(
                    TestStatus.TEST_CRASHED)
                print_test_result(sub_test)
            else:
                test.log.extend(sub_log)
                break
        else:
            sub_test = parse_test(lines, test_num, sub_log, True)
        subtests.append(sub_test)
        test_num += 1
    test.subtests = subtests
    if is_subtest:
        # If not main test, look for test result line
        test.log.extend(parse_diagnostic(lines))
        if test.name != "" and not peek_test_name_match(lines, test):
            test.add_error('missing subtest result line!')
        else:
            parse_test_result(lines, test, expected_num)

    # Check for there being no subtests within parent test
    if parent_test and len(subtests) == 0:
        # Don't override a bad status if this test had one reported.
        # Assumption: no subtests means CRASHED is from Test.__init__()
        if test.status in (TestStatus.TEST_CRASHED, TestStatus.SUCCESS):
            print_log(test.log)
            test.status = TestStatus.NO_TESTS
            test.add_error('0 tests run!')

    # Add statuses to TestCounts attribute in Test object
    bubble_up_test_results(test)
    if parent_test and is_subtest:
        # If test has subtests and is not the main test object, print
        # footer.
        print_test_footer(test)
    elif is_subtest:
        print_test_result(test)
    return test

def parse_run_tests(kernel_output: Iterable[str]) -> Test:
    """
    Using kernel output, extract KTAP lines, parse the lines for test
    results and print condensed test results and summary line.

    Parameters:
    kernel_output - Iterable object contains lines of kernel output

    Return:
    Test - the main test object with all subtests.
    """
    stdout.print_with_timestamp(DIVIDER)
    lines = extract_tap_lines(kernel_output)
    test = Test()
    if not lines:
        # No KTAP at all in the kernel log: synthesize a failed
        # placeholder test so callers still get a Test object back.
        test.name = '<missing>'
        test.add_error('Could not find any KTAP output. Did any KUnit tests run?')
        test.status = TestStatus.FAILURE_TO_PARSE_TESTS
    else:
        test = parse_test(lines, 0, [], False)
        if test.status != TestStatus.NO_TESTS:
            test.status = test.counts.get_status()
    stdout.print_with_timestamp(DIVIDER)
    print_summary_line(test)
    return test
Linux® is a registered trademark of Linus Torvalds in the United States and other countries.
TOMOYO® is a registered trademark of NTT DATA CORPORATION.