1 #!/usr/bin/env python3 2 # SPDX-License-Identifier: GPL-2.0 3 # 4 # Program to allow users to fuzz test Hyper-V 5 # by interfacing with Hyper-V debugfs attribut 6 # Current test methods available: 7 # 1. delay testing 8 # 9 # Current file/directory structure of hyper-V 10 # /sys/kernel/debug/hyperv/UUID 11 # /sys/kernel/debug/hyperv/UUID/<test-st 12 # /sys/kernel/debug/hyperv/UUID/<test-me 13 # 14 # author: Branden Bonaby <brandonbonaby94@gmail 15 16 import os 17 import cmd 18 import argparse 19 import glob 20 from argparse import RawDescriptionHelpFormatt 21 from argparse import RawTextHelpFormatter 22 from enum import Enum 23 24 # Do not change unless, you change the debugfs 25 # in /drivers/hv/debugfs.c. All fuzz testing 26 # attributes will start with "fuzz_test". 27 28 # debugfs path for hyperv must exist before pr 29 debugfs_hyperv_path = "/sys/kernel/debug/hyper 30 if not os.path.isdir(debugfs_hyperv_path): 31 print("{} doesn't exist/check permissi 32 exit(-1) 33 34 class dev_state(Enum): 35 off = 0 36 on = 1 37 38 # File names, that correspond to the files cre 39 # /drivers/hv/debugfs.c 40 class f_names(Enum): 41 state_f = "fuzz_test_state" 42 buff_f = "fuzz_test_buffer_interrupt_ 43 mess_f = "fuzz_test_message_delay" 44 45 # Both single_actions and all_actions are used 46 # for error checking and to allow for some sub 47 # names to be abbreviated. Do not abbreviate t 48 # test method names, as it will become less in 49 # as to what the user can do. If you do decide 50 # abbreviate the test method name, make sure t 51 # function reflects this change. 52 53 all_actions = [ 54 "disable_all", 55 "D", 56 "enable_all", 57 "view_all", 58 "V" 59 ] 60 61 single_actions = [ 62 "disable_single", 63 "d", 64 "enable_single", 65 "view_single", 66 "v" 67 ] 68 69 def main(): 70 71 file_map = recursive_file_lookup(debug 72 args = parse_args() 73 if (not args.action): 74 print ("Error, no options sele 75 exit(-1) 76 arg_set = { k for (k,v) in vars(args). 77 arg_set.add(args.action) 78 path = args.path if "path" in arg_set 79 if (path and path[-1] == "/"): 80 path = path[:-1] 81 validate_args_path(path, arg_set, file 82 if (path and "enable_single" in arg_se 83 state_path = locate_state(path, fi 84 set_test_state(state_path, dev_sta 85 86 # Use subparsers as the key for differ 87 if ("delay" in arg_set): 88 validate_delay_values(args.del 89 if (args.enable_all): 90 set_delay_all_devices( 91 92 else: 93 set_delay_values(path, 94 args. 95 elif ("disable_all" in arg_set or "D" 96 disable_all_testing(file_map) 97 elif ("disable_single" in arg_set or " 98 disable_testing_single_device( 99 elif ("view_all" in arg_set or "V" in 100 get_all_devices_test_status(fi 101 elif ("view_single" in arg_set or "v" 102 get_device_test_values(path, f 103 104 # Get the state location 105 def locate_state(device, file_map): 106 return file_map[device][f_names.state_ 107 108 # Validate delay values to make sure they are 109 # enable delays on a device 110 def validate_delay_values(delay): 111 112 if (delay[0] == -1 and delay[1] == -1 113 print("\nError, At least 1 val 114 exit(-1) 115 for i in delay: 116 if (i < -1 or i == 0 or i > 10 117 print("\nError, Values 118 "or be > 0 and < 119 exit(-1) 120 121 # Validate argument path 122 def validate_args_path(path, arg_set, file_map 123 124 if (not path and any(element in arg_se 125 print("Error, path (-p) REQUIR 126 "Use (-h) to check usage 127 exit(-1) 128 elif (path and any(item in arg_set for 129 print("Error, path (-p) NOT RE 130 "Use (-h) to check usage 131 exit(-1) 132 elif (path not in file_map and any(ite 133 for 134 print("Error, path '{}' not a 135 exit(-1) 136 137 # display Testing status of single device 138 def get_device_test_values(path, file_map): 139 140 for name in file_map[path]: 141 file_location = file_map[path] 142 print( name + " = " + str(read 143 144 # Create a map of the vmbus devices and their 145 # [key=device, value = [key = filename, value 146 def recursive_file_lookup(path, file_map): 147 148 for f_path in glob.iglob(path + '**/*' 149 if (os.path.isfile(f_path)): 150 if (f_path.rsplit("/", 151 directory = f_ 152 else: 153 directory = f_ 154 f_name = f_path.split( 155 if (file_map.get(direc 156 file_map[direc 157 else: 158 file_map[direc 159 elif (os.path.isdir(f_path)): 160 recursive_file_lookup( 161 return file_map 162 163 # display Testing state of devices 164 def get_all_devices_test_status(file_map): 165 166 for device in file_map: 167 if (get_test_state(locate_stat 168 print("Testing = ON fo 169 .format(device.s 170 else: 171 print("Testing = OFF f 172 .format(device.s 173 174 # read the vmbus device files, path must be ab 175 def read_test_files(path): 176 try: 177 with open(path,"r") as f: 178 file_value = f.readlin 179 return int(file_value) 180 181 except IOError as e: 182 errno, strerror = e.args 183 print("I/O error({0}): {1} on 184 .format(errno, strerror, 185 exit(-1) 186 except ValueError: 187 print ("Element to int convers 188 exit(-1) 189 190 # writing to vmbus device files, path must be 191 def write_test_files(path, value): 192 193 try: 194 with open(path,"w") as f: 195 f.write("{}".format(va 196 except IOError as e: 197 errno, strerror = e.args 198 print("I/O error({0}): {1} on 199 .format(errno, strerror, 200 exit(-1) 201 202 # set testing state of device 203 def set_test_state(state_path, state_value, qu 204 205 write_test_files(state_path, state_val 206 if (get_test_state(state_path) == 1): 207 if (not quiet): 208 print("Testing = ON fo 209 .format(state_pa 210 else: 211 if (not quiet): 212 print("Testing = OFF f 213 .format(state_pa 214 215 # get testing state of device 216 def get_test_state(state_path): 217 #state == 1 - test = ON 218 #state == 0 - test = OFF 219 return read_test_files(state_path) 220 221 # write 1 - 1000 microseconds, into a single d 222 # fuzz_test_buffer_interrupt_delay and fuzz_te 223 # debugfs attributes 224 def set_delay_values(device, file_map, delay_l 225 226 try: 227 interrupt = file_map[device][f 228 message = file_map[device][f_n 229 230 # delay[0]- buffer interrupt d 231 if (delay_length[0] >= 0 and d 232 write_test_files(inter 233 if (delay_length[1] >= 0 and d 234 write_test_files(messa 235 if (not quiet): 236 print("Buffer delay te 237 .format(read_tes 238 interrup 239 print("Message delay t 240 .format(read_tes 241 message. 242 except IOError as e: 243 errno, strerror = e.args 244 print("I/O error({0}): {1} on 245 .format(errno, strerror, 246 exit(-1) 247 248 # enabling delay testing on all devices 249 def set_delay_all_devices(file_map, delay, qui 250 251 for device in (file_map): 252 set_test_state(locate_state(de 253 dev_state.on.va 254 quiet) 255 set_delay_values(device, file_ 256 257 # disable all testing on a SINGLE device. 258 def disable_testing_single_device(device, file 259 260 for name in file_map[device]: 261 file_location = file_map[devic 262 write_test_files(file_location 263 print("ALL testing now OFF for {}".for 264 265 # disable all testing on ALL devices 266 def disable_all_testing(file_map): 267 268 for device in file_map: 269 disable_testing_single_device( 270 271 def parse_args(): 272 parser = argparse.ArgumentParser(prog 273 "%(prog)s [delay] [-h] [-e|- 274 "%(prog)s [view_all | V] 275 "%(prog)s [disable_all | D] 276 "%(prog)s [disable_single | d] 277 "%(prog)s [view_single | v] 278 "%(prog)s --version\n", 279 description = "\nUse lsvmbus t 280 "information.\n" "\nThe debugf 281 "/sys/kernel/debug/hyperv", 282 formatter_class = RawDescripti 283 subparsers = parser.add_subparsers(des 284 parser.add_argument("--version", actio 285 version = '%(prog)s 0.1.0') 286 parser.add_argument("-q","--quiet", ac 287 help = "silence none important 288 " This will only work w 289 " on a device.") 290 # Use the path parser to hold the --pa 291 # be shared between subparsers. Also d 292 # parser, as all testing methods will 293 # enable_single. 294 path_parser = argparse.ArgumentParser( 295 path_parser.add_argument("-p","--path" 296 help = "Debugfs path to a vmbu 297 "must be the absolute path to 298 state_parser = argparse.ArgumentParser 299 state_group = state_parser.add_mutuall 300 state_group.add_argument("-E", "--enab 301 const = "enab 302 help = "Enabl 303 "on ALL vmbus 304 state_group.add_argument("-e", "--enab 305 action = "sto 306 const = "enab 307 help = "Enabl 308 "SINGLE vmbus 309 parser_delay = subparsers.add_parser(" 310 parents = [state_parse 311 help = "Delay the ring 312 "ring buffer message r 313 prog = "vmbus_testing" 314 usage = "%(prog)s [-h] 315 "%(prog)s -E -t [value 316 "%(prog)s -e -t [value 317 description = "Delay t 318 "vmbus devices, or del 319 "reads for vmbus devic 320 "is only on the host t 321 parser_delay.add_argument("-t", "--del 322 type = check_range, de 323 help = "Set [buffer] & 324 "Value constraints: -1 325 "or 0 < value <= 1000. 326 "Use -1 to keep the pr 327 "type, or a value > 0 328 "time.") 329 parser_dis_all = subparsers.add_parser 330 aliases = ['D'], prog 331 usage = "%(prog)s [dis 332 "%(prog)s [disable_all 333 help = "Disable ALL te 334 description = "Disable 335 "devices.") 336 parser_dis_single = subparsers.add_par 337 aliases = ['d'], 338 parents = [path_parser 339 usage = "%(prog)s [dis 340 "%(prog)s [disable_sin 341 help = "Disable ALL te 342 description = "Disable 343 "device.") 344 parser_view_all = subparsers.add_parse 345 help = "View the test 346 prog = "vmbus_testing" 347 usage = "%(prog)s [vie 348 "%(prog)s [view_all | 349 description = "This sh 350 "vmbus devices.") 351 parser_view_single = subparsers.add_pa 352 aliases = ['v'],parent 353 help = "View the test 354 "device.", 355 description = "This sh 356 "vmbus device.", prog 357 usage = "%(prog)s [vie 358 "%(prog)s [view_single 359 360 return parser.parse_args() 361 362 # value checking for range checking input in p 363 def check_range(arg1): 364 365 try: 366 val = int(arg1) 367 except ValueError as err: 368 raise argparse.ArgumentTypeErr 369 if val < -1 or val > 1000: 370 message = ("\n\nvalue must be 371 "Value program rece 372 raise argparse.ArgumentTypeErr 373 return val 374 375 if __name__ == "__main__": 376 main()
Linux® is a registered trademark of Linus Torvalds in the United States and other countries.
TOMOYO® is a registered trademark of NTT DATA CORPORATION.