1 # SPDX-License-Identifier: (GPL-2.0 OR BSD-3-C 1 # SPDX-License-Identifier: (GPL-2.0 OR BSD-3-Clause) 2 2 3 from argparse import ArgumentParser 3 from argparse import ArgumentParser 4 from argparse import FileType 4 from argparse import FileType 5 import os 5 import os 6 import sys 6 import sys 7 import tpm2 7 import tpm2 8 from tpm2 import ProtocolError 8 from tpm2 import ProtocolError 9 import unittest 9 import unittest 10 import logging 10 import logging 11 import struct 11 import struct 12 12 13 class SmokeTest(unittest.TestCase): 13 class SmokeTest(unittest.TestCase): 14 def setUp(self): 14 def setUp(self): 15 self.client = tpm2.Client() 15 self.client = tpm2.Client() 16 self.root_key = self.client.create_roo 16 self.root_key = self.client.create_root_key() 17 17 18 def tearDown(self): 18 def tearDown(self): 19 self.client.flush_context(self.root_ke 19 self.client.flush_context(self.root_key) 20 self.client.close() 20 self.client.close() 21 21 22 def test_seal_with_auth(self): 22 def test_seal_with_auth(self): 23 data = ('X' * 64).encode() 23 data = ('X' * 64).encode() 24 auth = ('A' * 15).encode() 24 auth = ('A' * 15).encode() 25 25 26 blob = self.client.seal(self.root_key, 26 blob = self.client.seal(self.root_key, data, auth, None) 27 result = self.client.unseal(self.root_ 27 result = self.client.unseal(self.root_key, blob, auth, None) 28 self.assertEqual(data, result) 28 self.assertEqual(data, result) 29 29 30 def determine_bank_alg(self, mask): 30 def determine_bank_alg(self, mask): 31 pcr_banks = self.client.get_cap_pcrs() 31 pcr_banks = self.client.get_cap_pcrs() 32 for bank_alg, pcrSelection in pcr_bank 32 for bank_alg, pcrSelection in pcr_banks.items(): 33 if pcrSelection & mask == mask: 33 if pcrSelection & mask == mask: 34 return bank_alg 34 return bank_alg 35 return None 35 return None 36 36 37 def test_seal_with_policy(self): 37 def test_seal_with_policy(self): 38 bank_alg = self.determine_bank_alg(1 < 38 bank_alg = self.determine_bank_alg(1 << 16) 39 self.assertIsNotNone(bank_alg) 39 self.assertIsNotNone(bank_alg) 40 40 41 handle = self.client.start_auth_sessio 41 handle = self.client.start_auth_session(tpm2.TPM2_SE_TRIAL) 42 42 43 data = ('X' * 64).encode() 43 data = ('X' * 64).encode() 44 auth = ('A' * 15).encode() 44 auth = ('A' * 15).encode() 45 pcrs = [16] 45 pcrs = [16] 46 46 47 try: 47 try: 48 self.client.policy_pcr(handle, pcr 48 self.client.policy_pcr(handle, pcrs, bank_alg=bank_alg) 49 self.client.policy_password(handle 49 self.client.policy_password(handle) 50 50 51 policy_dig = self.client.get_polic 51 policy_dig = self.client.get_policy_digest(handle) 52 finally: 52 finally: 53 self.client.flush_context(handle) 53 self.client.flush_context(handle) 54 54 55 blob = self.client.seal(self.root_key, 55 blob = self.client.seal(self.root_key, data, auth, policy_dig) 56 56 57 handle = self.client.start_auth_sessio 57 handle = self.client.start_auth_session(tpm2.TPM2_SE_POLICY) 58 58 59 try: 59 try: 60 self.client.policy_pcr(handle, pcr 60 self.client.policy_pcr(handle, pcrs, bank_alg=bank_alg) 61 self.client.policy_password(handle 61 self.client.policy_password(handle) 62 62 63 result = self.client.unseal(self.r 63 result = self.client.unseal(self.root_key, blob, auth, handle) 64 except: 64 except: 65 self.client.flush_context(handle) 65 self.client.flush_context(handle) 66 raise 66 raise 67 67 68 self.assertEqual(data, result) 68 self.assertEqual(data, result) 69 69 70 def test_unseal_with_wrong_auth(self): 70 def test_unseal_with_wrong_auth(self): 71 data = ('X' * 64).encode() 71 data = ('X' * 64).encode() 72 auth = ('A' * 20).encode() 72 auth = ('A' * 20).encode() 73 rc = 0 73 rc = 0 74 74 75 blob = self.client.seal(self.root_key, 75 blob = self.client.seal(self.root_key, data, auth, None) 76 try: 76 try: 77 result = self.client.unseal(self.r 77 result = self.client.unseal(self.root_key, blob, 78 auth[:-1] + 'B'.encode 78 auth[:-1] + 'B'.encode(), None) 79 except ProtocolError as e: 79 except ProtocolError as e: 80 rc = e.rc 80 rc = e.rc 81 81 82 self.assertEqual(rc, tpm2.TPM2_RC_AUTH 82 self.assertEqual(rc, tpm2.TPM2_RC_AUTH_FAIL) 83 83 84 def test_unseal_with_wrong_policy(self): 84 def test_unseal_with_wrong_policy(self): 85 bank_alg = self.determine_bank_alg(1 < 85 bank_alg = self.determine_bank_alg(1 << 16 | 1 << 1) 86 self.assertIsNotNone(bank_alg) 86 self.assertIsNotNone(bank_alg) 87 87 88 handle = self.client.start_auth_sessio 88 handle = self.client.start_auth_session(tpm2.TPM2_SE_TRIAL) 89 89 90 data = ('X' * 64).encode() 90 data = ('X' * 64).encode() 91 auth = ('A' * 17).encode() 91 auth = ('A' * 17).encode() 92 pcrs = [16] 92 pcrs = [16] 93 93 94 try: 94 try: 95 self.client.policy_pcr(handle, pcr 95 self.client.policy_pcr(handle, pcrs, bank_alg=bank_alg) 96 self.client.policy_password(handle 96 self.client.policy_password(handle) 97 97 98 policy_dig = self.client.get_polic 98 policy_dig = self.client.get_policy_digest(handle) 99 finally: 99 finally: 100 self.client.flush_context(handle) 100 self.client.flush_context(handle) 101 101 102 blob = self.client.seal(self.root_key, 102 blob = self.client.seal(self.root_key, data, auth, policy_dig) 103 103 104 # Extend first a PCR that is not part 104 # Extend first a PCR that is not part of the policy and try to unseal. 105 # This should succeed. 105 # This should succeed. 106 106 107 ds = tpm2.get_digest_size(bank_alg) 107 ds = tpm2.get_digest_size(bank_alg) 108 self.client.extend_pcr(1, ('X' * ds).e 108 self.client.extend_pcr(1, ('X' * ds).encode(), bank_alg=bank_alg) 109 109 110 handle = self.client.start_auth_sessio 110 handle = self.client.start_auth_session(tpm2.TPM2_SE_POLICY) 111 111 112 try: 112 try: 113 self.client.policy_pcr(handle, pcr 113 self.client.policy_pcr(handle, pcrs, bank_alg=bank_alg) 114 self.client.policy_password(handle 114 self.client.policy_password(handle) 115 115 116 result = self.client.unseal(self.r 116 result = self.client.unseal(self.root_key, blob, auth, handle) 117 except: 117 except: 118 self.client.flush_context(handle) 118 self.client.flush_context(handle) 119 raise 119 raise 120 120 121 self.assertEqual(data, result) 121 self.assertEqual(data, result) 122 122 123 # Then, extend a PCR that is part of t 123 # Then, extend a PCR that is part of the policy and try to unseal. 124 # This should fail. 124 # This should fail. 125 self.client.extend_pcr(16, ('X' * ds). 125 self.client.extend_pcr(16, ('X' * ds).encode(), bank_alg=bank_alg) 126 126 127 handle = self.client.start_auth_sessio 127 handle = self.client.start_auth_session(tpm2.TPM2_SE_POLICY) 128 128 129 rc = 0 129 rc = 0 130 130 131 try: 131 try: 132 self.client.policy_pcr(handle, pcr 132 self.client.policy_pcr(handle, pcrs, bank_alg=bank_alg) 133 self.client.policy_password(handle 133 self.client.policy_password(handle) 134 134 135 result = self.client.unseal(self.r 135 result = self.client.unseal(self.root_key, blob, auth, handle) 136 except ProtocolError as e: 136 except ProtocolError as e: 137 rc = e.rc 137 rc = e.rc 138 self.client.flush_context(handle) 138 self.client.flush_context(handle) 139 except: 139 except: 140 self.client.flush_context(handle) 140 self.client.flush_context(handle) 141 raise 141 raise 142 142 143 self.assertEqual(rc, tpm2.TPM2_RC_POLI 143 self.assertEqual(rc, tpm2.TPM2_RC_POLICY_FAIL) 144 144 145 def test_seal_with_too_long_auth(self): 145 def test_seal_with_too_long_auth(self): 146 ds = tpm2.get_digest_size(tpm2.TPM2_AL 146 ds = tpm2.get_digest_size(tpm2.TPM2_ALG_SHA1) 147 data = ('X' * 64).encode() 147 data = ('X' * 64).encode() 148 auth = ('A' * (ds + 1)).encode() 148 auth = ('A' * (ds + 1)).encode() 149 149 150 rc = 0 150 rc = 0 151 try: 151 try: 152 blob = self.client.seal(self.root_ 152 blob = self.client.seal(self.root_key, data, auth, None) 153 except ProtocolError as e: 153 except ProtocolError as e: 154 rc = e.rc 154 rc = e.rc 155 155 156 self.assertEqual(rc, tpm2.TPM2_RC_SIZE 156 self.assertEqual(rc, tpm2.TPM2_RC_SIZE) 157 157 158 def test_too_short_cmd(self): 158 def test_too_short_cmd(self): 159 rejected = False 159 rejected = False 160 try: 160 try: 161 fmt = '>HIII' 161 fmt = '>HIII' 162 cmd = struct.pack(fmt, 162 cmd = struct.pack(fmt, 163 tpm2.TPM2_ST_NO_ 163 tpm2.TPM2_ST_NO_SESSIONS, 164 struct.calcsize( 164 struct.calcsize(fmt) + 1, 165 tpm2.TPM2_CC_FLU 165 tpm2.TPM2_CC_FLUSH_CONTEXT, 166 0xDEADBEEF) 166 0xDEADBEEF) 167 167 168 self.client.send_cmd(cmd) 168 self.client.send_cmd(cmd) 169 except IOError as e: 169 except IOError as e: 170 rejected = True 170 rejected = True 171 except: 171 except: 172 pass 172 pass 173 self.assertEqual(rejected, True) 173 self.assertEqual(rejected, True) 174 174 175 def test_read_partial_resp(self): 175 def test_read_partial_resp(self): 176 try: 176 try: 177 fmt = '>HIIH' 177 fmt = '>HIIH' 178 cmd = struct.pack(fmt, 178 cmd = struct.pack(fmt, 179 tpm2.TPM2_ST_NO_ 179 tpm2.TPM2_ST_NO_SESSIONS, 180 struct.calcsize( 180 struct.calcsize(fmt), 181 tpm2.TPM2_CC_GET 181 tpm2.TPM2_CC_GET_RANDOM, 182 0x20) 182 0x20) 183 self.client.tpm.write(cmd) 183 self.client.tpm.write(cmd) 184 hdr = self.client.tpm.read(10) 184 hdr = self.client.tpm.read(10) 185 sz = struct.unpack('>I', hdr[2:6]) 185 sz = struct.unpack('>I', hdr[2:6])[0] 186 rsp = self.client.tpm.read() 186 rsp = self.client.tpm.read() 187 except: 187 except: 188 pass 188 pass 189 self.assertEqual(sz, 10 + 2 + 32) 189 self.assertEqual(sz, 10 + 2 + 32) 190 self.assertEqual(len(rsp), 2 + 32) 190 self.assertEqual(len(rsp), 2 + 32) 191 191 192 def test_read_partial_overwrite(self): 192 def test_read_partial_overwrite(self): 193 try: 193 try: 194 fmt = '>HIIH' 194 fmt = '>HIIH' 195 cmd = struct.pack(fmt, 195 cmd = struct.pack(fmt, 196 tpm2.TPM2_ST_NO_ 196 tpm2.TPM2_ST_NO_SESSIONS, 197 struct.calcsize( 197 struct.calcsize(fmt), 198 tpm2.TPM2_CC_GET 198 tpm2.TPM2_CC_GET_RANDOM, 199 0x20) 199 0x20) 200 self.client.tpm.write(cmd) 200 self.client.tpm.write(cmd) 201 # Read part of the respone 201 # Read part of the respone 202 rsp1 = self.client.tpm.read(15) 202 rsp1 = self.client.tpm.read(15) 203 203 204 # Send a new cmd 204 # Send a new cmd 205 self.client.tpm.write(cmd) 205 self.client.tpm.write(cmd) 206 206 207 # Read the whole respone 207 # Read the whole respone 208 rsp2 = self.client.tpm.read() 208 rsp2 = self.client.tpm.read() 209 except: 209 except: 210 pass 210 pass 211 self.assertEqual(len(rsp1), 15) 211 self.assertEqual(len(rsp1), 15) 212 self.assertEqual(len(rsp2), 10 + 2 + 3 212 self.assertEqual(len(rsp2), 10 + 2 + 32) 213 213 214 def test_send_two_cmds(self): 214 def test_send_two_cmds(self): 215 rejected = False 215 rejected = False 216 try: 216 try: 217 fmt = '>HIIH' 217 fmt = '>HIIH' 218 cmd = struct.pack(fmt, 218 cmd = struct.pack(fmt, 219 tpm2.TPM2_ST_NO_ 219 tpm2.TPM2_ST_NO_SESSIONS, 220 struct.calcsize( 220 struct.calcsize(fmt), 221 tpm2.TPM2_CC_GET 221 tpm2.TPM2_CC_GET_RANDOM, 222 0x20) 222 0x20) 223 self.client.tpm.write(cmd) 223 self.client.tpm.write(cmd) 224 224 225 # expect the second one to raise - 225 # expect the second one to raise -EBUSY error 226 self.client.tpm.write(cmd) 226 self.client.tpm.write(cmd) 227 rsp = self.client.tpm.read() 227 rsp = self.client.tpm.read() 228 228 229 except IOError as e: 229 except IOError as e: 230 # read the response 230 # read the response 231 rsp = self.client.tpm.read() 231 rsp = self.client.tpm.read() 232 rejected = True 232 rejected = True 233 pass 233 pass 234 except: 234 except: 235 pass 235 pass 236 self.assertEqual(rejected, True) 236 self.assertEqual(rejected, True) 237 237 238 class SpaceTest(unittest.TestCase): 238 class SpaceTest(unittest.TestCase): 239 def setUp(self): 239 def setUp(self): 240 logging.basicConfig(filename='SpaceTes 240 logging.basicConfig(filename='SpaceTest.log', level=logging.DEBUG) 241 241 242 def test_make_two_spaces(self): 242 def test_make_two_spaces(self): 243 log = logging.getLogger(__name__) 243 log = logging.getLogger(__name__) 244 log.debug("test_make_two_spaces") 244 log.debug("test_make_two_spaces") 245 245 246 space1 = tpm2.Client(tpm2.Client.FLAG_ 246 space1 = tpm2.Client(tpm2.Client.FLAG_SPACE) 247 root1 = space1.create_root_key() 247 root1 = space1.create_root_key() 248 space2 = tpm2.Client(tpm2.Client.FLAG_ 248 space2 = tpm2.Client(tpm2.Client.FLAG_SPACE) 249 root2 = space2.create_root_key() 249 root2 = space2.create_root_key() 250 root3 = space2.create_root_key() 250 root3 = space2.create_root_key() 251 251 252 log.debug("%08x" % (root1)) 252 log.debug("%08x" % (root1)) 253 log.debug("%08x" % (root2)) 253 log.debug("%08x" % (root2)) 254 log.debug("%08x" % (root3)) 254 log.debug("%08x" % (root3)) 255 255 256 def test_flush_context(self): 256 def test_flush_context(self): 257 log = logging.getLogger(__name__) 257 log = logging.getLogger(__name__) 258 log.debug("test_flush_context") 258 log.debug("test_flush_context") 259 259 260 space1 = tpm2.Client(tpm2.Client.FLAG_ 260 space1 = tpm2.Client(tpm2.Client.FLAG_SPACE) 261 root1 = space1.create_root_key() 261 root1 = space1.create_root_key() 262 log.debug("%08x" % (root1)) 262 log.debug("%08x" % (root1)) 263 263 264 space1.flush_context(root1) 264 space1.flush_context(root1) 265 265 266 def test_get_handles(self): 266 def test_get_handles(self): 267 log = logging.getLogger(__name__) 267 log = logging.getLogger(__name__) 268 log.debug("test_get_handles") 268 log.debug("test_get_handles") 269 269 270 space1 = tpm2.Client(tpm2.Client.FLAG_ 270 space1 = tpm2.Client(tpm2.Client.FLAG_SPACE) 271 space1.create_root_key() 271 space1.create_root_key() 272 space2 = tpm2.Client(tpm2.Client.FLAG_ 272 space2 = tpm2.Client(tpm2.Client.FLAG_SPACE) 273 space2.create_root_key() 273 space2.create_root_key() 274 space2.create_root_key() 274 space2.create_root_key() 275 275 276 handles = space2.get_cap(tpm2.TPM2_CAP 276 handles = space2.get_cap(tpm2.TPM2_CAP_HANDLES, tpm2.HR_TRANSIENT) 277 277 278 self.assertEqual(len(handles), 2) 278 self.assertEqual(len(handles), 2) 279 279 280 log.debug("%08x" % (handles[0])) 280 log.debug("%08x" % (handles[0])) 281 log.debug("%08x" % (handles[1])) 281 log.debug("%08x" % (handles[1])) 282 282 283 def test_invalid_cc(self): 283 def test_invalid_cc(self): 284 log = logging.getLogger(__name__) 284 log = logging.getLogger(__name__) 285 log.debug(sys._getframe().f_code.co_na 285 log.debug(sys._getframe().f_code.co_name) 286 286 287 TPM2_CC_INVALID = tpm2.TPM2_CC_FIRST - 287 TPM2_CC_INVALID = tpm2.TPM2_CC_FIRST - 1 288 288 289 space1 = tpm2.Client(tpm2.Client.FLAG_ 289 space1 = tpm2.Client(tpm2.Client.FLAG_SPACE) 290 root1 = space1.create_root_key() 290 root1 = space1.create_root_key() 291 log.debug("%08x" % (root1)) 291 log.debug("%08x" % (root1)) 292 292 293 fmt = '>HII' 293 fmt = '>HII' 294 cmd = struct.pack(fmt, tpm2.TPM2_ST_NO 294 cmd = struct.pack(fmt, tpm2.TPM2_ST_NO_SESSIONS, struct.calcsize(fmt), 295 TPM2_CC_INVALID) 295 TPM2_CC_INVALID) 296 296 297 rc = 0 297 rc = 0 298 try: 298 try: 299 space1.send_cmd(cmd) 299 space1.send_cmd(cmd) 300 except ProtocolError as e: 300 except ProtocolError as e: 301 rc = e.rc 301 rc = e.rc 302 302 303 self.assertEqual(rc, tpm2.TPM2_RC_COMM 303 self.assertEqual(rc, tpm2.TPM2_RC_COMMAND_CODE | 304 tpm2.TSS2_RESMGR_TPM_ 304 tpm2.TSS2_RESMGR_TPM_RC_LAYER) 305 305 306 class AsyncTest(unittest.TestCase): 306 class AsyncTest(unittest.TestCase): 307 def setUp(self): 307 def setUp(self): 308 logging.basicConfig(filename='AsyncTes 308 logging.basicConfig(filename='AsyncTest.log', level=logging.DEBUG) 309 309 310 def test_async(self): 310 def test_async(self): 311 log = logging.getLogger(__name__) 311 log = logging.getLogger(__name__) 312 log.debug(sys._getframe().f_code.co_na 312 log.debug(sys._getframe().f_code.co_name) 313 313 314 async_client = tpm2.Client(tpm2.Client 314 async_client = tpm2.Client(tpm2.Client.FLAG_NONBLOCK) 315 log.debug("Calling get_cap in a NON_BL 315 log.debug("Calling get_cap in a NON_BLOCKING mode") 316 async_client.get_cap(tpm2.TPM2_CAP_HAN 316 async_client.get_cap(tpm2.TPM2_CAP_HANDLES, tpm2.HR_LOADED_SESSION) 317 async_client.close() 317 async_client.close() 318 318 319 def test_flush_invalid_context(self): 319 def test_flush_invalid_context(self): 320 log = logging.getLogger(__name__) 320 log = logging.getLogger(__name__) 321 log.debug(sys._getframe().f_code.co_na 321 log.debug(sys._getframe().f_code.co_name) 322 322 323 async_client = tpm2.Client(tpm2.Client 323 async_client = tpm2.Client(tpm2.Client.FLAG_SPACE | tpm2.Client.FLAG_NONBLOCK) 324 log.debug("Calling flush_context passi 324 log.debug("Calling flush_context passing in an invalid handle ") 325 handle = 0x80123456 325 handle = 0x80123456 326 rc = 0 326 rc = 0 327 try: 327 try: 328 async_client.flush_context(handle) 328 async_client.flush_context(handle) 329 except OSError as e: 329 except OSError as e: 330 rc = e.errno 330 rc = e.errno 331 331 332 self.assertEqual(rc, 22) 332 self.assertEqual(rc, 22) 333 async_client.close() 333 async_client.close()
Linux® is a registered trademark of Linus Torvalds in the United States and other countries.
TOMOYO® is a registered trademark of NTT DATA CORPORATION.