1 # SPDX-License-Identifier: GPL-2.0 1 # SPDX-License-Identifier: GPL-2.0 2 # Copyright (c) 2020 SUSE LLC. 2 # Copyright (c) 2020 SUSE LLC. 3 3 4 import collections 4 import collections 5 import functools 5 import functools 6 import json 6 import json 7 import os 7 import os 8 import socket 8 import socket 9 import subprocess 9 import subprocess 10 import unittest 10 import unittest 11 11 12 12 13 # Add the source tree of bpftool and /usr/loca 13 # Add the source tree of bpftool and /usr/local/sbin to PATH 14 cur_dir = os.path.dirname(os.path.realpath(__f 14 cur_dir = os.path.dirname(os.path.realpath(__file__)) 15 bpftool_dir = os.path.abspath(os.path.join(cur 15 bpftool_dir = os.path.abspath(os.path.join(cur_dir, "..", "..", "..", "..", 16 "to 16 "tools", "bpf", "bpftool")) 17 os.environ["PATH"] = bpftool_dir + ":/usr/loca 17 os.environ["PATH"] = bpftool_dir + ":/usr/local/sbin:" + os.environ["PATH"] 18 18 19 19 20 class IfaceNotFoundError(Exception): 20 class IfaceNotFoundError(Exception): 21 pass 21 pass 22 22 23 23 24 class UnprivilegedUserError(Exception): 24 class UnprivilegedUserError(Exception): 25 pass 25 pass 26 26 27 27 28 def _bpftool(args, json=True): 28 def _bpftool(args, json=True): 29 _args = ["bpftool"] 29 _args = ["bpftool"] 30 if json: 30 if json: 31 _args.append("-j") 31 _args.append("-j") 32 _args.extend(args) 32 _args.extend(args) 33 33 34 return subprocess.check_output(_args) 34 return subprocess.check_output(_args) 35 35 36 36 37 def bpftool(args): 37 def bpftool(args): 38 return _bpftool(args, json=False).decode(" 38 return _bpftool(args, json=False).decode("utf-8") 39 39 40 40 41 def bpftool_json(args): 41 def bpftool_json(args): 42 res = _bpftool(args) 42 res = _bpftool(args) 43 return json.loads(res) 43 return json.loads(res) 44 44 45 45 46 def get_default_iface(): 46 def get_default_iface(): 47 for iface in socket.if_nameindex(): 47 for iface in socket.if_nameindex(): 48 if iface[1] != "lo": 48 if iface[1] != "lo": 49 return iface[1] 49 return iface[1] 50 raise IfaceNotFoundError("Could not find a 50 raise IfaceNotFoundError("Could not find any network interface to probe") 51 51 52 52 53 def default_iface(f): 53 def default_iface(f): 54 @functools.wraps(f) 54 @functools.wraps(f) 55 def wrapper(*args, **kwargs): 55 def wrapper(*args, **kwargs): 56 iface = get_default_iface() 56 iface = get_default_iface() 57 return f(*args, iface, **kwargs) 57 return f(*args, iface, **kwargs) 58 return wrapper 58 return wrapper 59 59 60 DMESG_EMITTING_HELPERS = [ 60 DMESG_EMITTING_HELPERS = [ 61 "bpf_probe_write_user", 61 "bpf_probe_write_user", 62 "bpf_trace_printk", 62 "bpf_trace_printk", 63 "bpf_trace_vprintk", 63 "bpf_trace_vprintk", 64 ] 64 ] 65 65 66 class TestBpftool(unittest.TestCase): 66 class TestBpftool(unittest.TestCase): 67 @classmethod 67 @classmethod 68 def setUpClass(cls): 68 def setUpClass(cls): 69 if os.getuid() != 0: 69 if os.getuid() != 0: 70 raise UnprivilegedUserError( 70 raise UnprivilegedUserError( 71 "This test suite needs root pr 71 "This test suite needs root privileges") 72 72 73 @default_iface 73 @default_iface 74 def test_feature_dev_json(self, iface): 74 def test_feature_dev_json(self, iface): 75 unexpected_helpers = DMESG_EMITTING_HE 75 unexpected_helpers = DMESG_EMITTING_HELPERS 76 expected_keys = [ 76 expected_keys = [ 77 "syscall_config", 77 "syscall_config", 78 "program_types", 78 "program_types", 79 "map_types", 79 "map_types", 80 "helpers", 80 "helpers", 81 "misc", 81 "misc", 82 ] 82 ] 83 83 84 res = bpftool_json(["feature", "probe" 84 res = bpftool_json(["feature", "probe", "dev", iface]) 85 # Check if the result has all expected 85 # Check if the result has all expected keys. 86 self.assertCountEqual(res.keys(), expe 86 self.assertCountEqual(res.keys(), expected_keys) 87 # Check if unexpected helpers are not 87 # Check if unexpected helpers are not included in helpers probes 88 # result. 88 # result. 89 for helpers in res["helpers"].values() 89 for helpers in res["helpers"].values(): 90 for unexpected_helper in unexpecte 90 for unexpected_helper in unexpected_helpers: 91 self.assertNotIn(unexpected_he 91 self.assertNotIn(unexpected_helper, helpers) 92 92 93 def test_feature_kernel(self): 93 def test_feature_kernel(self): 94 test_cases = [ 94 test_cases = [ 95 bpftool_json(["feature", "probe", 95 bpftool_json(["feature", "probe", "kernel"]), 96 bpftool_json(["feature", "probe"]) 96 bpftool_json(["feature", "probe"]), 97 bpftool_json(["feature"]), 97 bpftool_json(["feature"]), 98 ] 98 ] 99 unexpected_helpers = DMESG_EMITTING_HE 99 unexpected_helpers = DMESG_EMITTING_HELPERS 100 expected_keys = [ 100 expected_keys = [ 101 "syscall_config", 101 "syscall_config", 102 "system_config", 102 "system_config", 103 "program_types", 103 "program_types", 104 "map_types", 104 "map_types", 105 "helpers", 105 "helpers", 106 "misc", 106 "misc", 107 ] 107 ] 108 108 109 for tc in test_cases: 109 for tc in test_cases: 110 # Check if the result has all expe 110 # Check if the result has all expected keys. 111 self.assertCountEqual(tc.keys(), e 111 self.assertCountEqual(tc.keys(), expected_keys) 112 # Check if unexpected helpers are 112 # Check if unexpected helpers are not included in helpers probes 113 # result. 113 # result. 114 for helpers in tc["helpers"].value 114 for helpers in tc["helpers"].values(): 115 for unexpected_helper in unexp 115 for unexpected_helper in unexpected_helpers: 116 self.assertNotIn(unexpecte 116 self.assertNotIn(unexpected_helper, helpers) 117 117 118 def test_feature_kernel_full(self): 118 def test_feature_kernel_full(self): 119 test_cases = [ 119 test_cases = [ 120 bpftool_json(["feature", "probe", 120 bpftool_json(["feature", "probe", "kernel", "full"]), 121 bpftool_json(["feature", "probe", 121 bpftool_json(["feature", "probe", "full"]), 122 ] 122 ] 123 expected_helpers = DMESG_EMITTING_HELP 123 expected_helpers = DMESG_EMITTING_HELPERS 124 124 125 for tc in test_cases: 125 for tc in test_cases: 126 # Check if expected helpers are in 126 # Check if expected helpers are included at least once in any 127 # helpers list for any program typ 127 # helpers list for any program type. Unfortunately we cannot assume 128 # that they will be included in al 128 # that they will be included in all program types or a specific 129 # subset of programs. It depends o 129 # subset of programs. It depends on the kernel version and 130 # configuration. 130 # configuration. 131 found_helpers = False 131 found_helpers = False 132 132 133 for helpers in tc["helpers"].value 133 for helpers in tc["helpers"].values(): 134 if all(expected_helper in help 134 if all(expected_helper in helpers 135 for expected_helper in 135 for expected_helper in expected_helpers): 136 found_helpers = True 136 found_helpers = True 137 break 137 break 138 138 139 self.assertTrue(found_helpers) 139 self.assertTrue(found_helpers) 140 140 141 def test_feature_kernel_full_vs_not_full(s 141 def test_feature_kernel_full_vs_not_full(self): 142 full_res = bpftool_json(["feature", "p 142 full_res = bpftool_json(["feature", "probe", "full"]) 143 not_full_res = bpftool_json(["feature" 143 not_full_res = bpftool_json(["feature", "probe"]) 144 not_full_set = set() 144 not_full_set = set() 145 full_set = set() 145 full_set = set() 146 146 147 for helpers in full_res["helpers"].val 147 for helpers in full_res["helpers"].values(): 148 for helper in helpers: 148 for helper in helpers: 149 full_set.add(helper) 149 full_set.add(helper) 150 150 151 for helpers in not_full_res["helpers"] 151 for helpers in not_full_res["helpers"].values(): 152 for helper in helpers: 152 for helper in helpers: 153 not_full_set.add(helper) 153 not_full_set.add(helper) 154 154 155 self.assertCountEqual(full_set - not_f 155 self.assertCountEqual(full_set - not_full_set, 156 set(DMESG_EMITTI 156 set(DMESG_EMITTING_HELPERS)) 157 self.assertCountEqual(not_full_set - f 157 self.assertCountEqual(not_full_set - full_set, set()) 158 158 159 def test_feature_macros(self): 159 def test_feature_macros(self): 160 expected_patterns = [ 160 expected_patterns = [ 161 r"/\*\*\* System call availability 161 r"/\*\*\* System call availability \*\*\*/", 162 r"#define HAVE_BPF_SYSCALL", 162 r"#define HAVE_BPF_SYSCALL", 163 r"/\*\*\* eBPF program types \*\*\ 163 r"/\*\*\* eBPF program types \*\*\*/", 164 r"#define HAVE.*PROG_TYPE", 164 r"#define HAVE.*PROG_TYPE", 165 r"/\*\*\* eBPF map types \*\*\*/", 165 r"/\*\*\* eBPF map types \*\*\*/", 166 r"#define HAVE.*MAP_TYPE", 166 r"#define HAVE.*MAP_TYPE", 167 r"/\*\*\* eBPF helper functions \* 167 r"/\*\*\* eBPF helper functions \*\*\*/", 168 r"#define HAVE.*HELPER", 168 r"#define HAVE.*HELPER", 169 r"/\*\*\* eBPF misc features \*\*\ 169 r"/\*\*\* eBPF misc features \*\*\*/", 170 ] 170 ] 171 171 172 res = bpftool(["feature", "probe", "ma 172 res = bpftool(["feature", "probe", "macros"]) 173 for pattern in expected_patterns: 173 for pattern in expected_patterns: 174 self.assertRegex(res, pattern) 174 self.assertRegex(res, pattern)
Linux® is a registered trademark of Linus Torvalds in the United States and other countries.
TOMOYO® is a registered trademark of NTT DATA CORPORATION.