1 #!/usr/bin/env python3 !! 1 #!/usr/bin/python3 2 # SPDX-License-Identifier: GPL-2.0 2 # SPDX-License-Identifier: GPL-2.0 3 # 3 # 4 # A thin wrapper on top of the KUnit Kernel 4 # A thin wrapper on top of the KUnit Kernel 5 # 5 # 6 # Copyright (C) 2019, Google LLC. 6 # Copyright (C) 2019, Google LLC. 7 # Author: Felix Guo <felixguoxiuping@gmail.com> 7 # Author: Felix Guo <felixguoxiuping@gmail.com> 8 # Author: Brendan Higgins <brendanhiggins@googl 8 # Author: Brendan Higgins <brendanhiggins@google.com> 9 9 10 import argparse 10 import argparse 11 import os << 12 import re << 13 import shlex << 14 import sys 11 import sys >> 12 import os 15 import time 13 import time >> 14 import shutil 16 15 17 assert sys.version_info >= (3, 7), "Python ver !! 16 from collections import namedtuple 18 << 19 from dataclasses import dataclass << 20 from enum import Enum, auto 17 from enum import Enum, auto 21 from typing import Iterable, List, Optional, S << 22 18 23 import kunit_json !! 19 import kunit_config 24 import kunit_kernel 20 import kunit_kernel 25 import kunit_parser 21 import kunit_parser 26 from kunit_printer import stdout !! 22 >> 23 KunitResult = namedtuple('KunitResult', ['status','result']) >> 24 >> 25 KunitRequest = namedtuple('KunitRequest', ['raw_output','timeout', 'jobs', 'build_dir', 'defconfig']) >> 26 >> 27 KernelDirectoryPath = sys.argv[0].split('tools/testing/kunit/')[0] 27 28 28 class KunitStatus(Enum): 29 class KunitStatus(Enum): 29 SUCCESS = auto() 30 SUCCESS = auto() 30 CONFIG_FAILURE = auto() 31 CONFIG_FAILURE = auto() 31 BUILD_FAILURE = auto() 32 BUILD_FAILURE = auto() 32 TEST_FAILURE = auto() 33 TEST_FAILURE = auto() 33 34 34 @dataclass !! 35 def create_default_kunitconfig(): 35 class KunitResult: !! 36 if not os.path.exists(kunit_kernel.kunitconfig_path): 36 status: KunitStatus !! 37 shutil.copyfile('arch/um/configs/kunit_defconfig', 37 elapsed_time: float !! 38 kunit_kernel.kunitconfig_path) 38 !! 39 39 @dataclass !! 40 def get_kernel_root_path(): 40 class KunitConfigRequest: !! 41 parts = sys.argv[0] if not __file__ else __file__ 41 build_dir: str !! 42 parts = os.path.realpath(parts).split('tools/testing/kunit') 42 make_options: Optional[List[str]] << 43 << 44 @dataclass << 45 class KunitBuildRequest(KunitConfigRequest): << 46 jobs: int << 47 << 48 @dataclass << 49 class KunitParseRequest: << 50 raw_output: Optional[str] << 51 json: Optional[str] << 52 << 53 @dataclass << 54 class KunitExecRequest(KunitParseRequest): << 55 build_dir: str << 56 timeout: int << 57 filter_glob: str << 58 filter: str << 59 filter_action: Optional[str] << 60 kernel_args: Optional[List[str]] << 61 run_isolated: Optional[str] << 62 list_tests: bool << 63 list_tests_attr: bool << 64 << 65 @dataclass << 66 class KunitRequest(KunitExecRequest, KunitBuil << 67 pass << 68 << 69 << 70 def get_kernel_root_path() -> str: << 71 path = sys.argv[0] if not __file__ els << 72 parts = os.path.realpath(path).split(' << 73 if len(parts) != 2: 43 if len(parts) != 2: 74 sys.exit(1) 44 sys.exit(1) 75 return parts[0] 45 return parts[0] 76 46 77 def config_tests(linux: kunit_kernel.LinuxSour !! 47 def run_tests(linux: kunit_kernel.LinuxSourceTree, 78 request: KunitConfigRequest) !! 48 request: KunitRequest) -> KunitResult: 79 stdout.print_with_timestamp('Configuri << 80 << 81 config_start = time.time() 49 config_start = time.time() 82 success = linux.build_reconfig(request !! 50 success = linux.build_reconfig(request.build_dir) 83 config_end = time.time() 51 config_end = time.time() 84 status = KunitStatus.SUCCESS if succes !! 52 if not success: 85 return KunitResult(status, config_end !! 53 return KunitResult(KunitStatus.CONFIG_FAILURE, 'could not configure kernel') 86 54 87 def build_tests(linux: kunit_kernel.LinuxSourc !! 55 kunit_parser.print_with_timestamp('Building KUnit Kernel ...') 88 request: KunitBuildRequest) -> << 89 stdout.print_with_timestamp('Building << 90 56 91 build_start = time.time() 57 build_start = time.time() 92 success = linux.build_kernel(request.j !! 58 success = linux.build_um_kernel(request.jobs, request.build_dir) 93 request.b << 94 request.m << 95 build_end = time.time() 59 build_end = time.time() 96 status = KunitStatus.SUCCESS if succes !! 60 if not success: 97 return KunitResult(status, build_end - !! 61 return KunitResult(KunitStatus.BUILD_FAILURE, 'could not build kernel') 98 62 99 def config_and_build_tests(linux: kunit_kernel !! 63 kunit_parser.print_with_timestamp('Starting KUnit Kernel ...') 100 request: KunitBuild !! 64 test_start = time.time() 101 config_result = config_tests(linux, re << 102 if config_result.status != KunitStatus << 103 return config_result << 104 << 105 return build_tests(linux, request) << 106 << 107 def _list_tests(linux: kunit_kernel.LinuxSourc << 108 args = ['kunit.action=list'] << 109 << 110 if request.kernel_args: << 111 args.extend(request.kernel_arg << 112 << 113 output = linux.run_kernel(args=args, << 114 timeout=request.tim << 115 filter_glob=request << 116 filter=request.filt << 117 filter_action=reque << 118 build_dir=request.b << 119 lines = kunit_parser.extract_tap_lines << 120 # Hack! Drop the dummy TAP version hea << 121 lines.pop() << 122 << 123 # Filter out any extraneous non-test o << 124 return [l for l in output if re.match( << 125 << 126 def _list_tests_attr(linux: kunit_kernel.Linux << 127 args = ['kunit.action=list_attr'] << 128 << 129 if request.kernel_args: << 130 args.extend(request.kernel_arg << 131 << 132 output = linux.run_kernel(args=args, << 133 timeout=request.tim << 134 filter_glob=request << 135 filter=request.filt << 136 filter_action=reque << 137 build_dir=request.b << 138 lines = kunit_parser.extract_tap_lines << 139 # Hack! Drop the dummy TAP version hea << 140 lines.pop() << 141 << 142 # Filter out any extraneous non-test o << 143 return lines << 144 << 145 def _suites_from_test_list(tests: List[str]) - << 146 """Extracts all the suites from an ord << 147 suites = [] # type: List[str] << 148 for t in tests: << 149 parts = t.split('.', maxsplit= << 150 if len(parts) != 2: << 151 raise ValueError(f'int << 152 suite, _ = parts << 153 if not suites or suites[-1] != << 154 suites.append(suite) << 155 return suites << 156 << 157 def exec_tests(linux: kunit_kernel.LinuxSource << 158 filter_globs = [request.filter_glob] << 159 if request.list_tests: << 160 output = _list_tests(linux, re << 161 for line in output: << 162 print(line.rstrip()) << 163 return KunitResult(status=Kuni << 164 if request.list_tests_attr: << 165 attr_output = _list_tests_attr << 166 for line in attr_output: << 167 print(line.rstrip()) << 168 return KunitResult(status=Kuni << 169 if request.run_isolated: << 170 tests = _list_tests(linux, req << 171 if request.run_isolated == 'te << 172 filter_globs = tests << 173 elif request.run_isolated == ' << 174 filter_globs = _suites << 175 # Apply the test-part << 176 if '.' in request.filt << 177 test_glob = re << 178 filter_globs = << 179 << 180 metadata = kunit_json.Metadata(arch=li << 181 << 182 test_counts = kunit_parser.TestCounts( << 183 exec_time = 0.0 << 184 for i, filter_glob in enumerate(filter << 185 stdout.print_with_timestamp('S << 186 << 187 test_start = time.time() << 188 run_result = linux.run_kernel( << 189 args=request.kernel_ar << 190 timeout=request.timeou << 191 filter_glob=filter_glo << 192 filter=request.filter, << 193 filter_action=request. << 194 build_dir=request.buil << 195 << 196 _, test_result = parse_tests(r << 197 # run_kernel() doesn't block o << 198 # That only happens after we g << 199 # So exec_time here actually c << 200 test_end = time.time() << 201 exec_time += test_end - test_s << 202 << 203 test_counts.add_subtest_counts << 204 << 205 if len(filter_globs) == 1 and test_cou << 206 bd = request.build_dir << 207 print('The kernel seems to hav << 208 print('$ scripts/decode_stackt << 209 bd, bd, kunit_ << 210 << 211 kunit_status = _map_to_overall_status( << 212 return KunitResult(status=kunit_status << 213 << 214 def _map_to_overall_status(test_status: kunit_ << 215 if test_status in (kunit_parser.TestSt << 216 return KunitStatus.SUCCESS << 217 return KunitStatus.TEST_FAILURE << 218 << 219 def parse_tests(request: KunitParseRequest, me << 220 parse_start = time.time() << 221 65 >> 66 test_result = kunit_parser.TestResult(kunit_parser.TestStatus.SUCCESS, >> 67 [], >> 68 'Tests not Parsed.') 222 if request.raw_output: 69 if request.raw_output: 223 # Treat unparsed results as on !! 70 kunit_parser.raw_output( 224 fake_test = kunit_parser.Test( !! 71 linux.run_kernel(timeout=request.timeout, 225 fake_test.status = kunit_parse !! 72 build_dir=request.build_dir)) 226 fake_test.counts.passed = 1 !! 73 else: 227 !! 74 kunit_output = linux.run_kernel(timeout=request.timeout, 228 output: Iterable[str] = input_ !! 75 build_dir=request.build_dir) 229 if request.raw_output == 'all' !! 76 test_result = kunit_parser.parse_run_tests(kunit_output) 230 pass !! 77 test_end = time.time() 231 elif request.raw_output == 'ku << 232 output = kunit_parser. << 233 for line in output: << 234 print(line.rstrip()) << 235 parse_time = time.time() - par << 236 return KunitResult(KunitStatus << 237 << 238 << 239 # Actually parse the test results. << 240 test = kunit_parser.parse_run_tests(in << 241 parse_time = time.time() - parse_start << 242 << 243 if request.json: << 244 json_str = kunit_json.get_json << 245 test=t << 246 metada << 247 if request.json == 'stdout': << 248 print(json_str) << 249 else: << 250 with open(request.json << 251 f.write(json_s << 252 stdout.print_with_time << 253 os.path.abspat << 254 << 255 if test.status != kunit_parser.TestSta << 256 return KunitResult(KunitStatus << 257 << 258 return KunitResult(KunitStatus.SUCCESS << 259 << 260 def run_tests(linux: kunit_kernel.LinuxSourceT << 261 request: KunitRequest) -> KunitR << 262 run_start = time.time() << 263 << 264 config_result = config_tests(linux, re << 265 if config_result.status != KunitStatus << 266 return config_result << 267 << 268 build_result = build_tests(linux, requ << 269 if build_result.status != KunitStatus. << 270 return build_result << 271 << 272 exec_result = exec_tests(linux, reques << 273 << 274 run_end = time.time() << 275 78 276 stdout.print_with_timestamp(( !! 79 kunit_parser.print_with_timestamp(( 277 'Elapsed time: %.3fs total, %. 80 'Elapsed time: %.3fs total, %.3fs configuring, %.3fs ' + 278 'building, %.3fs running\n') % 81 'building, %.3fs running\n') % ( 279 run_end - run_ !! 82 test_end - config_start, 280 config_result. !! 83 config_end - config_start, 281 build_result.e !! 84 build_end - build_start, 282 exec_result.el !! 85 test_end - test_start)) 283 return exec_result << 284 << 285 # Problem: << 286 # $ kunit.py run --json << 287 # works as one would expect and prints the par << 288 # $ kunit.py run --json suite_name << 289 # would *not* pass suite_name as the filter_gl << 290 # argparse will consider it to be another way << 291 # $ kunit.py run --json=suite_name << 292 # i.e. it would run all tests, and dump the js << 293 # So we hackily automatically rewrite --json = << 294 pseudo_bool_flag_defaults = { << 295 '--json': 'stdout', << 296 '--raw_output': 'kunit', << 297 } << 298 def massage_argv(argv: Sequence[str]) -> Seque << 299 def massage_arg(arg: str) -> str: << 300 if arg not in pseudo_bool_flag << 301 return arg << 302 return f'{arg}={pseudo_bool_f << 303 return list(map(massage_arg, argv)) << 304 << 305 def get_default_jobs() -> int: << 306 return len(os.sched_getaffinity(0)) << 307 << 308 def add_common_opts(parser: argparse.ArgumentP << 309 parser.add_argument('--build_dir', << 310 help='As in the ma << 311 'directory.', << 312 type=str, default= << 313 parser.add_argument('--make_options', << 314 help='X=Y make opt << 315 action='append', m << 316 parser.add_argument('--alltests', << 317 help='Run all KUni << 318 action='store_true << 319 parser.add_argument('--kunitconfig', << 320 help='Path to Kco << 321 ' If given a dire << 322 'will get automa << 323 'blindly concaten << 324 action='append', << 325 parser.add_argument('--kconfig_add', << 326 help='Additional << 327 '.kunitconfig, e. << 328 action='append', m << 329 << 330 parser.add_argument('--arch', << 331 help=('Specifies t << 332 'The archite << 333 'string pass << 334 'e.g. i386, << 335 'architectur << 336 type=str, default= << 337 << 338 parser.add_argument('--cross_compile', << 339 help=('Sets make\' << 340 'be set to a << 341 'of gcc and << 342 'example `sp << 343 'sparc toolc << 344 '`$HOME/tool << 345 'if you have << 346 'from the 0- << 347 'home direct << 348 metavar='PREFIX') << 349 << 350 parser.add_argument('--qemu_config', << 351 help=('Takes a pat << 352 'a QemuArchP << 353 type=str, metavar= << 354 << 355 parser.add_argument('--qemu_args', << 356 help='Additional Q << 357 action='append', m << 358 << 359 def add_build_opts(parser: argparse.ArgumentPa << 360 parser.add_argument('--jobs', << 361 help='As in the ma << 362 'jobs (commands) t << 363 type=int, default= << 364 << 365 def add_exec_opts(parser: argparse.ArgumentPar << 366 parser.add_argument('--timeout', << 367 help='maximum numb << 368 'to run. This does << 369 'tests.', << 370 type=int, << 371 default=300, << 372 metavar='SECONDS') << 373 parser.add_argument('filter_glob', << 374 help='Filter which << 375 'boot-time, e.g. l << 376 type=str, << 377 nargs='?', << 378 default='', << 379 metavar='filter_gl << 380 parser.add_argument('--filter', << 381 help='Filter KUnit << 382 'e.g. module=examp << 383 type=str, << 384 default='') << 385 parser.add_argument('--filter_action', << 386 help='If set to sk << 387 'e.g. --filter << 388 type=str, << 389 choices=['skip << 390 parser.add_argument('--kernel_args', << 391 help='Kernel comma << 392 action='append', << 393 parser.add_argument('--run_isolated', << 394 'individual suite/ << 395 'a non-hermetic te << 396 'what ran before i << 397 type=str, << 398 choices=['suite', << 399 parser.add_argument('--list_tests', he << 400 'run.', << 401 action='store_true << 402 parser.add_argument('--list_tests_attr << 403 'attributes.', << 404 action='store_true << 405 << 406 def add_parse_opts(parser: argparse.ArgumentPa << 407 parser.add_argument('--raw_output', he << 408 'By default, filte << 409 '--raw_output=all << 410 type=str, nargs=' << 411 parser.add_argument('--json', << 412 nargs='?', << 413 help='Prints parse << 414 'a filename is spe << 415 type=str, const='s << 416 << 417 << 418 def tree_from_args(cli_args: argparse.Namespac << 419 """Returns a LinuxSourceTree based on << 420 # Allow users to specify multiple argu << 421 qemu_args: List[str] = [] << 422 if cli_args.qemu_args: << 423 for arg in cli_args.qemu_args: << 424 qemu_args.extend(shlex << 425 << 426 kunitconfigs = cli_args.kunitconfig if << 427 if cli_args.alltests: << 428 # Prepend so user-specified op << 429 # --kunitconfig options to hav << 430 kunitconfigs = [kunit_kernel.A << 431 << 432 return kunit_kernel.LinuxSourceTree(cl << 433 kunitconfig_paths=kuni << 434 kconfig_add=cli_args.k << 435 arch=cli_args.arch, << 436 cross_compile=cli_args << 437 qemu_config_path=cli_a << 438 extra_qemu_args=qemu_a << 439 << 440 << 441 def run_handler(cli_args: argparse.Namespace) << 442 if not os.path.exists(cli_args.build_d << 443 os.mkdir(cli_args.build_dir) << 444 << 445 linux = tree_from_args(cli_args) << 446 request = KunitRequest(build_dir=cli_a << 447 make_o << 448 jobs=c << 449 raw_ou << 450 json=c << 451 timeou << 452 filter << 453 filter << 454 filter << 455 kernel << 456 run_is << 457 list_t << 458 list_t << 459 result = run_tests(linux, request) << 460 if result.status != KunitStatus.SUCCES << 461 sys.exit(1) << 462 86 463 !! 87 if test_result.status != kunit_parser.TestStatus.SUCCESS: 464 def config_handler(cli_args: argparse.Namespac !! 88 return KunitResult(KunitStatus.TEST_FAILURE, test_result) 465 if cli_args.build_dir and ( << 466 not os.path.exists(cli << 467 os.mkdir(cli_args.build_dir) << 468 << 469 linux = tree_from_args(cli_args) << 470 request = KunitConfigRequest(build_dir << 471 << 472 result = config_tests(linux, request) << 473 stdout.print_with_timestamp(( << 474 'Elapsed time: %.3fs\n') % ( << 475 result.elapsed_time)) << 476 if result.status != KunitStatus.SUCCES << 477 sys.exit(1) << 478 << 479 << 480 def build_handler(cli_args: argparse.Namespace << 481 linux = tree_from_args(cli_args) << 482 request = KunitBuildRequest(build_dir= << 483 make_o << 484 jobs=c << 485 result = config_and_build_tests(linux, << 486 stdout.print_with_timestamp(( << 487 'Elapsed time: %.3fs\n') % ( << 488 result.elapsed_time)) << 489 if result.status != KunitStatus.SUCCES << 490 sys.exit(1) << 491 << 492 << 493 def exec_handler(cli_args: argparse.Namespace) << 494 linux = tree_from_args(cli_args) << 495 exec_request = KunitExecRequest(raw_ou << 496 build_ << 497 json=c << 498 timeou << 499 filter << 500 filter << 501 filter << 502 kernel << 503 run_is << 504 list_t << 505 list_t << 506 result = exec_tests(linux, exec_reques << 507 stdout.print_with_timestamp(( << 508 'Elapsed time: %.3fs\n') % (re << 509 if result.status != KunitStatus.SUCCES << 510 sys.exit(1) << 511 << 512 << 513 def parse_handler(cli_args: argparse.Namespace << 514 if cli_args.file is None: << 515 sys.stdin.reconfigure(errors=' << 516 kunit_output = sys.stdin # ty << 517 else: 89 else: 518 with open(cli_args.file, 'r', !! 90 return KunitResult(KunitStatus.SUCCESS, test_result) 519 kunit_output = f.read( << 520 # We know nothing about how the result << 521 metadata = kunit_json.Metadata() << 522 request = KunitParseRequest(raw_output << 523 json=c << 524 result, _ = parse_tests(request, metad << 525 if result.status != KunitStatus.SUCCES << 526 sys.exit(1) << 527 << 528 << 529 subcommand_handlers_map = { << 530 'run': run_handler, << 531 'config': config_handler, << 532 'build': build_handler, << 533 'exec': exec_handler, << 534 'parse': parse_handler << 535 } << 536 91 537 !! 92 def main(argv, linux=None): 538 def main(argv: Sequence[str]) -> None: << 539 parser = argparse.ArgumentParser( 93 parser = argparse.ArgumentParser( 540 description='Helps wri 94 description='Helps writing and running KUnit tests.') 541 subparser = parser.add_subparsers(dest 95 subparser = parser.add_subparsers(dest='subcommand') 542 96 543 # The 'run' command will config, build << 544 run_parser = subparser.add_parser('run 97 run_parser = subparser.add_parser('run', help='Runs KUnit tests.') 545 add_common_opts(run_parser) !! 98 run_parser.add_argument('--raw_output', help='don\'t format output from kernel', 546 add_build_opts(run_parser) !! 99 action='store_true') 547 add_exec_opts(run_parser) << 548 add_parse_opts(run_parser) << 549 << 550 config_parser = subparser.add_parser(' << 551 << 552 << 553 add_common_opts(config_parser) << 554 << 555 build_parser = subparser.add_parser('b << 556 add_common_opts(build_parser) << 557 add_build_opts(build_parser) << 558 << 559 exec_parser = subparser.add_parser('ex << 560 add_common_opts(exec_parser) << 561 add_exec_opts(exec_parser) << 562 add_parse_opts(exec_parser) << 563 << 564 # The 'parse' option is special, as it << 565 # (therefore there is no need for a bu << 566 # and the '--file' argument is not rel << 567 # add_parse_opts() << 568 parse_parser = subparser.add_parser('p << 569 he << 570 'a << 571 add_parse_opts(parse_parser) << 572 parse_parser.add_argument('file', << 573 help='Specif << 574 type=str, na << 575 << 576 cli_args = parser.parse_args(massage_a << 577 100 578 if get_kernel_root_path(): !! 101 run_parser.add_argument('--timeout', 579 os.chdir(get_kernel_root_path( !! 102 help='maximum number of seconds to allow for all tests ' 580 !! 103 'to run. This does not include time taken to build the ' 581 subcomand_handler = subcommand_handler !! 104 'tests.', 582 !! 105 type=int, 583 if subcomand_handler is None: !! 106 default=300, >> 107 metavar='timeout') >> 108 >> 109 run_parser.add_argument('--jobs', >> 110 help='As in the make command, "Specifies the number of ' >> 111 'jobs (commands) to run simultaneously."', >> 112 type=int, default=8, metavar='jobs') >> 113 >> 114 run_parser.add_argument('--build_dir', >> 115 help='As in the make command, it specifies the build ' >> 116 'directory.', >> 117 type=str, default='', metavar='build_dir') >> 118 >> 119 run_parser.add_argument('--defconfig', >> 120 help='Uses a default .kunitconfig.', >> 121 action='store_true') >> 122 >> 123 cli_args = parser.parse_args(argv) >> 124 >> 125 if cli_args.subcommand == 'run': >> 126 if get_kernel_root_path(): >> 127 os.chdir(get_kernel_root_path()) >> 128 >> 129 if cli_args.build_dir: >> 130 if not os.path.exists(cli_args.build_dir): >> 131 os.mkdir(cli_args.build_dir) >> 132 kunit_kernel.kunitconfig_path = os.path.join( >> 133 cli_args.build_dir, >> 134 kunit_kernel.kunitconfig_path) >> 135 >> 136 if cli_args.defconfig: >> 137 create_default_kunitconfig() >> 138 >> 139 if not linux: >> 140 linux = kunit_kernel.LinuxSourceTree() >> 141 >> 142 request = KunitRequest(cli_args.raw_output, >> 143 cli_args.timeout, >> 144 cli_args.jobs, >> 145 cli_args.build_dir, >> 146 cli_args.defconfig) >> 147 result = run_tests(linux, request) >> 148 if result.status != KunitStatus.SUCCESS: >> 149 sys.exit(1) >> 150 else: 584 parser.print_help() 151 parser.print_help() 585 return << 586 << 587 subcomand_handler(cli_args) << 588 << 589 152 590 if __name__ == '__main__': 153 if __name__ == '__main__': 591 main(sys.argv[1:]) 154 main(sys.argv[1:])
Linux® is a registered trademark of Linus Torvalds in the United States and other countries.
TOMOYO® is a registered trademark of NTT DATA CORPORATION.