1 # SPDX-License-Identifier: (GPL-2.0 OR BSD-3-C 1 # SPDX-License-Identifier: (GPL-2.0 OR BSD-3-Clause) 2 2 3 from argparse import ArgumentParser 3 from argparse import ArgumentParser 4 from argparse import FileType 4 from argparse import FileType 5 import os 5 import os 6 import sys 6 import sys 7 import tpm2 7 import tpm2 8 from tpm2 import ProtocolError 8 from tpm2 import ProtocolError 9 import unittest 9 import unittest 10 import logging 10 import logging 11 import struct 11 import struct 12 12 13 class SmokeTest(unittest.TestCase): 13 class SmokeTest(unittest.TestCase): 14 def setUp(self): 14 def setUp(self): 15 self.client = tpm2.Client() 15 self.client = tpm2.Client() 16 self.root_key = self.client.create_roo 16 self.root_key = self.client.create_root_key() 17 17 18 def tearDown(self): 18 def tearDown(self): 19 self.client.flush_context(self.root_ke 19 self.client.flush_context(self.root_key) 20 self.client.close() 20 self.client.close() 21 21 22 def test_seal_with_auth(self): 22 def test_seal_with_auth(self): 23 data = ('X' * 64).encode() 23 data = ('X' * 64).encode() 24 auth = ('A' * 15).encode() 24 auth = ('A' * 15).encode() 25 25 26 blob = self.client.seal(self.root_key, 26 blob = self.client.seal(self.root_key, data, auth, None) 27 result = self.client.unseal(self.root_ 27 result = self.client.unseal(self.root_key, blob, auth, None) 28 self.assertEqual(data, result) 28 self.assertEqual(data, result) 29 29 30 def determine_bank_alg(self, mask): << 31 pcr_banks = self.client.get_cap_pcrs() << 32 for bank_alg, pcrSelection in pcr_bank << 33 if pcrSelection & mask == mask: << 34 return bank_alg << 35 return None << 36 << 37 def test_seal_with_policy(self): 30 def test_seal_with_policy(self): 38 bank_alg = self.determine_bank_alg(1 < << 39 self.assertIsNotNone(bank_alg) << 40 << 41 handle = self.client.start_auth_sessio 31 handle = self.client.start_auth_session(tpm2.TPM2_SE_TRIAL) 42 32 43 data = ('X' * 64).encode() 33 data = ('X' * 64).encode() 44 auth = ('A' * 15).encode() 34 auth = ('A' * 15).encode() 45 pcrs = [16] 35 pcrs = [16] 46 36 47 try: 37 try: 48 self.client.policy_pcr(handle, pcr !! 38 self.client.policy_pcr(handle, pcrs) 49 self.client.policy_password(handle 39 self.client.policy_password(handle) 50 40 51 policy_dig = self.client.get_polic 41 policy_dig = self.client.get_policy_digest(handle) 52 finally: 42 finally: 53 self.client.flush_context(handle) 43 self.client.flush_context(handle) 54 44 55 blob = self.client.seal(self.root_key, 45 blob = self.client.seal(self.root_key, data, auth, policy_dig) 56 46 57 handle = self.client.start_auth_sessio 47 handle = self.client.start_auth_session(tpm2.TPM2_SE_POLICY) 58 48 59 try: 49 try: 60 self.client.policy_pcr(handle, pcr !! 50 self.client.policy_pcr(handle, pcrs) 61 self.client.policy_password(handle 51 self.client.policy_password(handle) 62 52 63 result = self.client.unseal(self.r 53 result = self.client.unseal(self.root_key, blob, auth, handle) 64 except: 54 except: 65 self.client.flush_context(handle) 55 self.client.flush_context(handle) 66 raise 56 raise 67 57 68 self.assertEqual(data, result) 58 self.assertEqual(data, result) 69 59 70 def test_unseal_with_wrong_auth(self): 60 def test_unseal_with_wrong_auth(self): 71 data = ('X' * 64).encode() 61 data = ('X' * 64).encode() 72 auth = ('A' * 20).encode() 62 auth = ('A' * 20).encode() 73 rc = 0 63 rc = 0 74 64 75 blob = self.client.seal(self.root_key, 65 blob = self.client.seal(self.root_key, data, auth, None) 76 try: 66 try: 77 result = self.client.unseal(self.r 67 result = self.client.unseal(self.root_key, blob, 78 auth[:-1] + 'B'.encode 68 auth[:-1] + 'B'.encode(), None) 79 except ProtocolError as e: 69 except ProtocolError as e: 80 rc = e.rc 70 rc = e.rc 81 71 82 self.assertEqual(rc, tpm2.TPM2_RC_AUTH 72 self.assertEqual(rc, tpm2.TPM2_RC_AUTH_FAIL) 83 73 84 def test_unseal_with_wrong_policy(self): 74 def test_unseal_with_wrong_policy(self): 85 bank_alg = self.determine_bank_alg(1 < << 86 self.assertIsNotNone(bank_alg) << 87 << 88 handle = self.client.start_auth_sessio 75 handle = self.client.start_auth_session(tpm2.TPM2_SE_TRIAL) 89 76 90 data = ('X' * 64).encode() 77 data = ('X' * 64).encode() 91 auth = ('A' * 17).encode() 78 auth = ('A' * 17).encode() 92 pcrs = [16] 79 pcrs = [16] 93 80 94 try: 81 try: 95 self.client.policy_pcr(handle, pcr !! 82 self.client.policy_pcr(handle, pcrs) 96 self.client.policy_password(handle 83 self.client.policy_password(handle) 97 84 98 policy_dig = self.client.get_polic 85 policy_dig = self.client.get_policy_digest(handle) 99 finally: 86 finally: 100 self.client.flush_context(handle) 87 self.client.flush_context(handle) 101 88 102 blob = self.client.seal(self.root_key, 89 blob = self.client.seal(self.root_key, data, auth, policy_dig) 103 90 104 # Extend first a PCR that is not part 91 # Extend first a PCR that is not part of the policy and try to unseal. 105 # This should succeed. 92 # This should succeed. 106 93 107 ds = tpm2.get_digest_size(bank_alg) !! 94 ds = tpm2.get_digest_size(tpm2.TPM2_ALG_SHA1) 108 self.client.extend_pcr(1, ('X' * ds).e !! 95 self.client.extend_pcr(1, ('X' * ds).encode()) 109 96 110 handle = self.client.start_auth_sessio 97 handle = self.client.start_auth_session(tpm2.TPM2_SE_POLICY) 111 98 112 try: 99 try: 113 self.client.policy_pcr(handle, pcr !! 100 self.client.policy_pcr(handle, pcrs) 114 self.client.policy_password(handle 101 self.client.policy_password(handle) 115 102 116 result = self.client.unseal(self.r 103 result = self.client.unseal(self.root_key, blob, auth, handle) 117 except: 104 except: 118 self.client.flush_context(handle) 105 self.client.flush_context(handle) 119 raise 106 raise 120 107 121 self.assertEqual(data, result) 108 self.assertEqual(data, result) 122 109 123 # Then, extend a PCR that is part of t 110 # Then, extend a PCR that is part of the policy and try to unseal. 124 # This should fail. 111 # This should fail. 125 self.client.extend_pcr(16, ('X' * ds). !! 112 self.client.extend_pcr(16, ('X' * ds).encode()) 126 113 127 handle = self.client.start_auth_sessio 114 handle = self.client.start_auth_session(tpm2.TPM2_SE_POLICY) 128 115 129 rc = 0 116 rc = 0 130 117 131 try: 118 try: 132 self.client.policy_pcr(handle, pcr !! 119 self.client.policy_pcr(handle, pcrs) 133 self.client.policy_password(handle 120 self.client.policy_password(handle) 134 121 135 result = self.client.unseal(self.r 122 result = self.client.unseal(self.root_key, blob, auth, handle) 136 except ProtocolError as e: 123 except ProtocolError as e: 137 rc = e.rc 124 rc = e.rc 138 self.client.flush_context(handle) 125 self.client.flush_context(handle) 139 except: 126 except: 140 self.client.flush_context(handle) 127 self.client.flush_context(handle) 141 raise 128 raise 142 129 143 self.assertEqual(rc, tpm2.TPM2_RC_POLI 130 self.assertEqual(rc, tpm2.TPM2_RC_POLICY_FAIL) 144 131 145 def test_seal_with_too_long_auth(self): 132 def test_seal_with_too_long_auth(self): 146 ds = tpm2.get_digest_size(tpm2.TPM2_AL 133 ds = tpm2.get_digest_size(tpm2.TPM2_ALG_SHA1) 147 data = ('X' * 64).encode() 134 data = ('X' * 64).encode() 148 auth = ('A' * (ds + 1)).encode() 135 auth = ('A' * (ds + 1)).encode() 149 136 150 rc = 0 137 rc = 0 151 try: 138 try: 152 blob = self.client.seal(self.root_ 139 blob = self.client.seal(self.root_key, data, auth, None) 153 except ProtocolError as e: 140 except ProtocolError as e: 154 rc = e.rc 141 rc = e.rc 155 142 156 self.assertEqual(rc, tpm2.TPM2_RC_SIZE 143 self.assertEqual(rc, tpm2.TPM2_RC_SIZE) 157 144 158 def test_too_short_cmd(self): 145 def test_too_short_cmd(self): 159 rejected = False 146 rejected = False 160 try: 147 try: 161 fmt = '>HIII' 148 fmt = '>HIII' 162 cmd = struct.pack(fmt, 149 cmd = struct.pack(fmt, 163 tpm2.TPM2_ST_NO_ 150 tpm2.TPM2_ST_NO_SESSIONS, 164 struct.calcsize( 151 struct.calcsize(fmt) + 1, 165 tpm2.TPM2_CC_FLU 152 tpm2.TPM2_CC_FLUSH_CONTEXT, 166 0xDEADBEEF) 153 0xDEADBEEF) 167 154 168 self.client.send_cmd(cmd) 155 self.client.send_cmd(cmd) 169 except IOError as e: 156 except IOError as e: 170 rejected = True 157 rejected = True 171 except: 158 except: 172 pass 159 pass 173 self.assertEqual(rejected, True) 160 self.assertEqual(rejected, True) 174 161 175 def test_read_partial_resp(self): 162 def test_read_partial_resp(self): 176 try: 163 try: 177 fmt = '>HIIH' 164 fmt = '>HIIH' 178 cmd = struct.pack(fmt, 165 cmd = struct.pack(fmt, 179 tpm2.TPM2_ST_NO_ 166 tpm2.TPM2_ST_NO_SESSIONS, 180 struct.calcsize( 167 struct.calcsize(fmt), 181 tpm2.TPM2_CC_GET 168 tpm2.TPM2_CC_GET_RANDOM, 182 0x20) 169 0x20) 183 self.client.tpm.write(cmd) 170 self.client.tpm.write(cmd) 184 hdr = self.client.tpm.read(10) 171 hdr = self.client.tpm.read(10) 185 sz = struct.unpack('>I', hdr[2:6]) 172 sz = struct.unpack('>I', hdr[2:6])[0] 186 rsp = self.client.tpm.read() 173 rsp = self.client.tpm.read() 187 except: 174 except: 188 pass 175 pass 189 self.assertEqual(sz, 10 + 2 + 32) 176 self.assertEqual(sz, 10 + 2 + 32) 190 self.assertEqual(len(rsp), 2 + 32) 177 self.assertEqual(len(rsp), 2 + 32) 191 178 192 def test_read_partial_overwrite(self): 179 def test_read_partial_overwrite(self): 193 try: 180 try: 194 fmt = '>HIIH' 181 fmt = '>HIIH' 195 cmd = struct.pack(fmt, 182 cmd = struct.pack(fmt, 196 tpm2.TPM2_ST_NO_ 183 tpm2.TPM2_ST_NO_SESSIONS, 197 struct.calcsize( 184 struct.calcsize(fmt), 198 tpm2.TPM2_CC_GET 185 tpm2.TPM2_CC_GET_RANDOM, 199 0x20) 186 0x20) 200 self.client.tpm.write(cmd) 187 self.client.tpm.write(cmd) 201 # Read part of the respone 188 # Read part of the respone 202 rsp1 = self.client.tpm.read(15) 189 rsp1 = self.client.tpm.read(15) 203 190 204 # Send a new cmd 191 # Send a new cmd 205 self.client.tpm.write(cmd) 192 self.client.tpm.write(cmd) 206 193 207 # Read the whole respone 194 # Read the whole respone 208 rsp2 = self.client.tpm.read() 195 rsp2 = self.client.tpm.read() 209 except: 196 except: 210 pass 197 pass 211 self.assertEqual(len(rsp1), 15) 198 self.assertEqual(len(rsp1), 15) 212 self.assertEqual(len(rsp2), 10 + 2 + 3 199 self.assertEqual(len(rsp2), 10 + 2 + 32) 213 200 214 def test_send_two_cmds(self): 201 def test_send_two_cmds(self): 215 rejected = False 202 rejected = False 216 try: 203 try: 217 fmt = '>HIIH' 204 fmt = '>HIIH' 218 cmd = struct.pack(fmt, 205 cmd = struct.pack(fmt, 219 tpm2.TPM2_ST_NO_ 206 tpm2.TPM2_ST_NO_SESSIONS, 220 struct.calcsize( 207 struct.calcsize(fmt), 221 tpm2.TPM2_CC_GET 208 tpm2.TPM2_CC_GET_RANDOM, 222 0x20) 209 0x20) 223 self.client.tpm.write(cmd) 210 self.client.tpm.write(cmd) 224 211 225 # expect the second one to raise - 212 # expect the second one to raise -EBUSY error 226 self.client.tpm.write(cmd) 213 self.client.tpm.write(cmd) 227 rsp = self.client.tpm.read() 214 rsp = self.client.tpm.read() 228 215 229 except IOError as e: 216 except IOError as e: 230 # read the response 217 # read the response 231 rsp = self.client.tpm.read() 218 rsp = self.client.tpm.read() 232 rejected = True 219 rejected = True 233 pass 220 pass 234 except: 221 except: 235 pass 222 pass 236 self.assertEqual(rejected, True) 223 self.assertEqual(rejected, True) 237 224 238 class SpaceTest(unittest.TestCase): 225 class SpaceTest(unittest.TestCase): 239 def setUp(self): 226 def setUp(self): 240 logging.basicConfig(filename='SpaceTes 227 logging.basicConfig(filename='SpaceTest.log', level=logging.DEBUG) 241 228 242 def test_make_two_spaces(self): 229 def test_make_two_spaces(self): 243 log = logging.getLogger(__name__) 230 log = logging.getLogger(__name__) 244 log.debug("test_make_two_spaces") 231 log.debug("test_make_two_spaces") 245 232 246 space1 = tpm2.Client(tpm2.Client.FLAG_ 233 space1 = tpm2.Client(tpm2.Client.FLAG_SPACE) 247 root1 = space1.create_root_key() 234 root1 = space1.create_root_key() 248 space2 = tpm2.Client(tpm2.Client.FLAG_ 235 space2 = tpm2.Client(tpm2.Client.FLAG_SPACE) 249 root2 = space2.create_root_key() 236 root2 = space2.create_root_key() 250 root3 = space2.create_root_key() 237 root3 = space2.create_root_key() 251 238 252 log.debug("%08x" % (root1)) 239 log.debug("%08x" % (root1)) 253 log.debug("%08x" % (root2)) 240 log.debug("%08x" % (root2)) 254 log.debug("%08x" % (root3)) 241 log.debug("%08x" % (root3)) 255 242 256 def test_flush_context(self): 243 def test_flush_context(self): 257 log = logging.getLogger(__name__) 244 log = logging.getLogger(__name__) 258 log.debug("test_flush_context") 245 log.debug("test_flush_context") 259 246 260 space1 = tpm2.Client(tpm2.Client.FLAG_ 247 space1 = tpm2.Client(tpm2.Client.FLAG_SPACE) 261 root1 = space1.create_root_key() 248 root1 = space1.create_root_key() 262 log.debug("%08x" % (root1)) 249 log.debug("%08x" % (root1)) 263 250 264 space1.flush_context(root1) 251 space1.flush_context(root1) 265 252 266 def test_get_handles(self): 253 def test_get_handles(self): 267 log = logging.getLogger(__name__) 254 log = logging.getLogger(__name__) 268 log.debug("test_get_handles") 255 log.debug("test_get_handles") 269 256 270 space1 = tpm2.Client(tpm2.Client.FLAG_ 257 space1 = tpm2.Client(tpm2.Client.FLAG_SPACE) 271 space1.create_root_key() 258 space1.create_root_key() 272 space2 = tpm2.Client(tpm2.Client.FLAG_ 259 space2 = tpm2.Client(tpm2.Client.FLAG_SPACE) 273 space2.create_root_key() 260 space2.create_root_key() 274 space2.create_root_key() 261 space2.create_root_key() 275 262 276 handles = space2.get_cap(tpm2.TPM2_CAP 263 handles = space2.get_cap(tpm2.TPM2_CAP_HANDLES, tpm2.HR_TRANSIENT) 277 264 278 self.assertEqual(len(handles), 2) 265 self.assertEqual(len(handles), 2) 279 266 280 log.debug("%08x" % (handles[0])) 267 log.debug("%08x" % (handles[0])) 281 log.debug("%08x" % (handles[1])) 268 log.debug("%08x" % (handles[1])) 282 269 283 def test_invalid_cc(self): 270 def test_invalid_cc(self): 284 log = logging.getLogger(__name__) 271 log = logging.getLogger(__name__) 285 log.debug(sys._getframe().f_code.co_na 272 log.debug(sys._getframe().f_code.co_name) 286 273 287 TPM2_CC_INVALID = tpm2.TPM2_CC_FIRST - 274 TPM2_CC_INVALID = tpm2.TPM2_CC_FIRST - 1 288 275 289 space1 = tpm2.Client(tpm2.Client.FLAG_ 276 space1 = tpm2.Client(tpm2.Client.FLAG_SPACE) 290 root1 = space1.create_root_key() 277 root1 = space1.create_root_key() 291 log.debug("%08x" % (root1)) 278 log.debug("%08x" % (root1)) 292 279 293 fmt = '>HII' 280 fmt = '>HII' 294 cmd = struct.pack(fmt, tpm2.TPM2_ST_NO 281 cmd = struct.pack(fmt, tpm2.TPM2_ST_NO_SESSIONS, struct.calcsize(fmt), 295 TPM2_CC_INVALID) 282 TPM2_CC_INVALID) 296 283 297 rc = 0 284 rc = 0 298 try: 285 try: 299 space1.send_cmd(cmd) 286 space1.send_cmd(cmd) 300 except ProtocolError as e: 287 except ProtocolError as e: 301 rc = e.rc 288 rc = e.rc 302 289 303 self.assertEqual(rc, tpm2.TPM2_RC_COMM 290 self.assertEqual(rc, tpm2.TPM2_RC_COMMAND_CODE | 304 tpm2.TSS2_RESMGR_TPM_ 291 tpm2.TSS2_RESMGR_TPM_RC_LAYER) 305 292 306 class AsyncTest(unittest.TestCase): 293 class AsyncTest(unittest.TestCase): 307 def setUp(self): 294 def setUp(self): 308 logging.basicConfig(filename='AsyncTes 295 logging.basicConfig(filename='AsyncTest.log', level=logging.DEBUG) 309 296 310 def test_async(self): 297 def test_async(self): 311 log = logging.getLogger(__name__) 298 log = logging.getLogger(__name__) 312 log.debug(sys._getframe().f_code.co_na 299 log.debug(sys._getframe().f_code.co_name) 313 300 314 async_client = tpm2.Client(tpm2.Client 301 async_client = tpm2.Client(tpm2.Client.FLAG_NONBLOCK) 315 log.debug("Calling get_cap in a NON_BL 302 log.debug("Calling get_cap in a NON_BLOCKING mode") 316 async_client.get_cap(tpm2.TPM2_CAP_HAN 303 async_client.get_cap(tpm2.TPM2_CAP_HANDLES, tpm2.HR_LOADED_SESSION) 317 async_client.close() << 318 << 319 def test_flush_invalid_context(self): << 320 log = logging.getLogger(__name__) << 321 log.debug(sys._getframe().f_code.co_na << 322 << 323 async_client = tpm2.Client(tpm2.Client << 324 log.debug("Calling flush_context passi << 325 handle = 0x80123456 << 326 rc = 0 << 327 try: << 328 async_client.flush_context(handle) << 329 except OSError as e: << 330 rc = e.errno << 331 << 332 self.assertEqual(rc, 22) << 333 async_client.close() 304 async_client.close()
Linux® is a registered trademark of Linus Torvalds in the United States and other countries.
TOMOYO® is a registered trademark of NTT DATA CORPORATION.