1 # SPDX-License-Identifier: GPL-2.0 1 # SPDX-License-Identifier: GPL-2.0 2 # Copyright (c) 2020 SUSE LLC. 2 # Copyright (c) 2020 SUSE LLC. 3 3 4 import collections 4 import collections 5 import functools 5 import functools 6 import json 6 import json 7 import os 7 import os 8 import socket 8 import socket 9 import subprocess 9 import subprocess 10 import unittest 10 import unittest 11 11 12 12 13 # Add the source tree of bpftool and /usr/loca 13 # Add the source tree of bpftool and /usr/local/sbin to PATH 14 cur_dir = os.path.dirname(os.path.realpath(__f 14 cur_dir = os.path.dirname(os.path.realpath(__file__)) 15 bpftool_dir = os.path.abspath(os.path.join(cur 15 bpftool_dir = os.path.abspath(os.path.join(cur_dir, "..", "..", "..", "..", 16 "to 16 "tools", "bpf", "bpftool")) 17 os.environ["PATH"] = bpftool_dir + ":/usr/loca 17 os.environ["PATH"] = bpftool_dir + ":/usr/local/sbin:" + os.environ["PATH"] 18 18 19 19 20 class IfaceNotFoundError(Exception): 20 class IfaceNotFoundError(Exception): 21 pass 21 pass 22 22 23 23 24 class UnprivilegedUserError(Exception): 24 class UnprivilegedUserError(Exception): 25 pass 25 pass 26 26 27 27 28 def _bpftool(args, json=True): 28 def _bpftool(args, json=True): 29 _args = ["bpftool"] 29 _args = ["bpftool"] 30 if json: 30 if json: 31 _args.append("-j") 31 _args.append("-j") 32 _args.extend(args) 32 _args.extend(args) 33 33 34 return subprocess.check_output(_args) 34 return subprocess.check_output(_args) 35 35 36 36 37 def bpftool(args): 37 def bpftool(args): 38 return _bpftool(args, json=False).decode(" 38 return _bpftool(args, json=False).decode("utf-8") 39 39 40 40 41 def bpftool_json(args): 41 def bpftool_json(args): 42 res = _bpftool(args) 42 res = _bpftool(args) 43 return json.loads(res) 43 return json.loads(res) 44 44 45 45 46 def get_default_iface(): 46 def get_default_iface(): 47 for iface in socket.if_nameindex(): 47 for iface in socket.if_nameindex(): 48 if iface[1] != "lo": 48 if iface[1] != "lo": 49 return iface[1] 49 return iface[1] 50 raise IfaceNotFoundError("Could not find a 50 raise IfaceNotFoundError("Could not find any network interface to probe") 51 51 52 52 53 def default_iface(f): 53 def default_iface(f): 54 @functools.wraps(f) 54 @functools.wraps(f) 55 def wrapper(*args, **kwargs): 55 def wrapper(*args, **kwargs): 56 iface = get_default_iface() 56 iface = get_default_iface() 57 return f(*args, iface, **kwargs) 57 return f(*args, iface, **kwargs) 58 return wrapper 58 return wrapper 59 59 60 DMESG_EMITTING_HELPERS = [ << 61 "bpf_probe_write_user", << 62 "bpf_trace_printk", << 63 "bpf_trace_vprintk", << 64 ] << 65 60 66 class TestBpftool(unittest.TestCase): 61 class TestBpftool(unittest.TestCase): 67 @classmethod 62 @classmethod 68 def setUpClass(cls): 63 def setUpClass(cls): 69 if os.getuid() != 0: 64 if os.getuid() != 0: 70 raise UnprivilegedUserError( 65 raise UnprivilegedUserError( 71 "This test suite needs root pr 66 "This test suite needs root privileges") 72 67 73 @default_iface 68 @default_iface 74 def test_feature_dev_json(self, iface): 69 def test_feature_dev_json(self, iface): 75 unexpected_helpers = DMESG_EMITTING_HE !! 70 unexpected_helpers = [ >> 71 "bpf_probe_write_user", >> 72 "bpf_trace_printk", >> 73 ] 76 expected_keys = [ 74 expected_keys = [ 77 "syscall_config", 75 "syscall_config", 78 "program_types", 76 "program_types", 79 "map_types", 77 "map_types", 80 "helpers", 78 "helpers", 81 "misc", 79 "misc", 82 ] 80 ] 83 81 84 res = bpftool_json(["feature", "probe" 82 res = bpftool_json(["feature", "probe", "dev", iface]) 85 # Check if the result has all expected 83 # Check if the result has all expected keys. 86 self.assertCountEqual(res.keys(), expe 84 self.assertCountEqual(res.keys(), expected_keys) 87 # Check if unexpected helpers are not 85 # Check if unexpected helpers are not included in helpers probes 88 # result. 86 # result. 89 for helpers in res["helpers"].values() 87 for helpers in res["helpers"].values(): 90 for unexpected_helper in unexpecte 88 for unexpected_helper in unexpected_helpers: 91 self.assertNotIn(unexpected_he 89 self.assertNotIn(unexpected_helper, helpers) 92 90 93 def test_feature_kernel(self): 91 def test_feature_kernel(self): 94 test_cases = [ 92 test_cases = [ 95 bpftool_json(["feature", "probe", 93 bpftool_json(["feature", "probe", "kernel"]), 96 bpftool_json(["feature", "probe"]) 94 bpftool_json(["feature", "probe"]), 97 bpftool_json(["feature"]), 95 bpftool_json(["feature"]), 98 ] 96 ] 99 unexpected_helpers = DMESG_EMITTING_HE !! 97 unexpected_helpers = [ >> 98 "bpf_probe_write_user", >> 99 "bpf_trace_printk", >> 100 ] 100 expected_keys = [ 101 expected_keys = [ 101 "syscall_config", 102 "syscall_config", 102 "system_config", 103 "system_config", 103 "program_types", 104 "program_types", 104 "map_types", 105 "map_types", 105 "helpers", 106 "helpers", 106 "misc", 107 "misc", 107 ] 108 ] 108 109 109 for tc in test_cases: 110 for tc in test_cases: 110 # Check if the result has all expe 111 # Check if the result has all expected keys. 111 self.assertCountEqual(tc.keys(), e 112 self.assertCountEqual(tc.keys(), expected_keys) 112 # Check if unexpected helpers are 113 # Check if unexpected helpers are not included in helpers probes 113 # result. 114 # result. 114 for helpers in tc["helpers"].value 115 for helpers in tc["helpers"].values(): 115 for unexpected_helper in unexp 116 for unexpected_helper in unexpected_helpers: 116 self.assertNotIn(unexpecte 117 self.assertNotIn(unexpected_helper, helpers) 117 118 118 def test_feature_kernel_full(self): 119 def test_feature_kernel_full(self): 119 test_cases = [ 120 test_cases = [ 120 bpftool_json(["feature", "probe", 121 bpftool_json(["feature", "probe", "kernel", "full"]), 121 bpftool_json(["feature", "probe", 122 bpftool_json(["feature", "probe", "full"]), 122 ] 123 ] 123 expected_helpers = DMESG_EMITTING_HELP !! 124 expected_helpers = [ >> 125 "bpf_probe_write_user", >> 126 "bpf_trace_printk", >> 127 ] 124 128 125 for tc in test_cases: 129 for tc in test_cases: 126 # Check if expected helpers are in 130 # Check if expected helpers are included at least once in any 127 # helpers list for any program typ 131 # helpers list for any program type. Unfortunately we cannot assume 128 # that they will be included in al 132 # that they will be included in all program types or a specific 129 # subset of programs. It depends o 133 # subset of programs. It depends on the kernel version and 130 # configuration. 134 # configuration. 131 found_helpers = False 135 found_helpers = False 132 136 133 for helpers in tc["helpers"].value 137 for helpers in tc["helpers"].values(): 134 if all(expected_helper in help 138 if all(expected_helper in helpers 135 for expected_helper in 139 for expected_helper in expected_helpers): 136 found_helpers = True 140 found_helpers = True 137 break 141 break 138 142 139 self.assertTrue(found_helpers) 143 self.assertTrue(found_helpers) 140 144 141 def test_feature_kernel_full_vs_not_full(s 145 def test_feature_kernel_full_vs_not_full(self): 142 full_res = bpftool_json(["feature", "p 146 full_res = bpftool_json(["feature", "probe", "full"]) 143 not_full_res = bpftool_json(["feature" 147 not_full_res = bpftool_json(["feature", "probe"]) 144 not_full_set = set() 148 not_full_set = set() 145 full_set = set() 149 full_set = set() 146 150 147 for helpers in full_res["helpers"].val 151 for helpers in full_res["helpers"].values(): 148 for helper in helpers: 152 for helper in helpers: 149 full_set.add(helper) 153 full_set.add(helper) 150 154 151 for helpers in not_full_res["helpers"] 155 for helpers in not_full_res["helpers"].values(): 152 for helper in helpers: 156 for helper in helpers: 153 not_full_set.add(helper) 157 not_full_set.add(helper) 154 158 155 self.assertCountEqual(full_set - not_f 159 self.assertCountEqual(full_set - not_full_set, 156 set(DMESG_EMITTI !! 160 {"bpf_probe_write_user", "bpf_trace_printk"}) 157 self.assertCountEqual(not_full_set - f 161 self.assertCountEqual(not_full_set - full_set, set()) 158 162 159 def test_feature_macros(self): 163 def test_feature_macros(self): 160 expected_patterns = [ 164 expected_patterns = [ 161 r"/\*\*\* System call availability 165 r"/\*\*\* System call availability \*\*\*/", 162 r"#define HAVE_BPF_SYSCALL", 166 r"#define HAVE_BPF_SYSCALL", 163 r"/\*\*\* eBPF program types \*\*\ 167 r"/\*\*\* eBPF program types \*\*\*/", 164 r"#define HAVE.*PROG_TYPE", 168 r"#define HAVE.*PROG_TYPE", 165 r"/\*\*\* eBPF map types \*\*\*/", 169 r"/\*\*\* eBPF map types \*\*\*/", 166 r"#define HAVE.*MAP_TYPE", 170 r"#define HAVE.*MAP_TYPE", 167 r"/\*\*\* eBPF helper functions \* 171 r"/\*\*\* eBPF helper functions \*\*\*/", 168 r"#define HAVE.*HELPER", 172 r"#define HAVE.*HELPER", 169 r"/\*\*\* eBPF misc features \*\*\ 173 r"/\*\*\* eBPF misc features \*\*\*/", 170 ] 174 ] 171 175 172 res = bpftool(["feature", "probe", "ma 176 res = bpftool(["feature", "probe", "macros"]) 173 for pattern in expected_patterns: 177 for pattern in expected_patterns: 174 self.assertRegex(res, pattern) 178 self.assertRegex(res, pattern)
Linux® is a registered trademark of Linus Torvalds in the United States and other countries.
TOMOYO® is a registered trademark of NTT DATA CORPORATION.