1 # SPDX-License-Identifier: (GPL-2.0 OR BSD-3-C 1 # SPDX-License-Identifier: (GPL-2.0 OR BSD-3-Clause) 2 2 3 from argparse import ArgumentParser 3 from argparse import ArgumentParser 4 from argparse import FileType 4 from argparse import FileType 5 import os 5 import os 6 import sys 6 import sys 7 import tpm2 7 import tpm2 8 from tpm2 import ProtocolError 8 from tpm2 import ProtocolError 9 import unittest 9 import unittest 10 import logging 10 import logging 11 import struct 11 import struct 12 12 13 class SmokeTest(unittest.TestCase): 13 class SmokeTest(unittest.TestCase): 14 def setUp(self): 14 def setUp(self): 15 self.client = tpm2.Client() 15 self.client = tpm2.Client() 16 self.root_key = self.client.create_roo 16 self.root_key = self.client.create_root_key() 17 17 18 def tearDown(self): 18 def tearDown(self): 19 self.client.flush_context(self.root_ke 19 self.client.flush_context(self.root_key) 20 self.client.close() 20 self.client.close() 21 21 22 def test_seal_with_auth(self): 22 def test_seal_with_auth(self): 23 data = ('X' * 64).encode() !! 23 data = 'X' * 64 24 auth = ('A' * 15).encode() !! 24 auth = 'A' * 15 25 25 26 blob = self.client.seal(self.root_key, 26 blob = self.client.seal(self.root_key, data, auth, None) 27 result = self.client.unseal(self.root_ 27 result = self.client.unseal(self.root_key, blob, auth, None) 28 self.assertEqual(data, result) 28 self.assertEqual(data, result) 29 29 30 def determine_bank_alg(self, mask): << 31 pcr_banks = self.client.get_cap_pcrs() << 32 for bank_alg, pcrSelection in pcr_bank << 33 if pcrSelection & mask == mask: << 34 return bank_alg << 35 return None << 36 << 37 def test_seal_with_policy(self): 30 def test_seal_with_policy(self): 38 bank_alg = self.determine_bank_alg(1 < << 39 self.assertIsNotNone(bank_alg) << 40 << 41 handle = self.client.start_auth_sessio 31 handle = self.client.start_auth_session(tpm2.TPM2_SE_TRIAL) 42 32 43 data = ('X' * 64).encode() !! 33 data = 'X' * 64 44 auth = ('A' * 15).encode() !! 34 auth = 'A' * 15 45 pcrs = [16] 35 pcrs = [16] 46 36 47 try: 37 try: 48 self.client.policy_pcr(handle, pcr !! 38 self.client.policy_pcr(handle, pcrs) 49 self.client.policy_password(handle 39 self.client.policy_password(handle) 50 40 51 policy_dig = self.client.get_polic 41 policy_dig = self.client.get_policy_digest(handle) 52 finally: 42 finally: 53 self.client.flush_context(handle) 43 self.client.flush_context(handle) 54 44 55 blob = self.client.seal(self.root_key, 45 blob = self.client.seal(self.root_key, data, auth, policy_dig) 56 46 57 handle = self.client.start_auth_sessio 47 handle = self.client.start_auth_session(tpm2.TPM2_SE_POLICY) 58 48 59 try: 49 try: 60 self.client.policy_pcr(handle, pcr !! 50 self.client.policy_pcr(handle, pcrs) 61 self.client.policy_password(handle 51 self.client.policy_password(handle) 62 52 63 result = self.client.unseal(self.r 53 result = self.client.unseal(self.root_key, blob, auth, handle) 64 except: 54 except: 65 self.client.flush_context(handle) 55 self.client.flush_context(handle) 66 raise 56 raise 67 57 68 self.assertEqual(data, result) 58 self.assertEqual(data, result) 69 59 70 def test_unseal_with_wrong_auth(self): 60 def test_unseal_with_wrong_auth(self): 71 data = ('X' * 64).encode() !! 61 data = 'X' * 64 72 auth = ('A' * 20).encode() !! 62 auth = 'A' * 20 73 rc = 0 63 rc = 0 74 64 75 blob = self.client.seal(self.root_key, 65 blob = self.client.seal(self.root_key, data, auth, None) 76 try: 66 try: 77 result = self.client.unseal(self.r !! 67 result = self.client.unseal(self.root_key, blob, auth[:-1] + 'B', None) 78 auth[:-1] + 'B'.encode !! 68 except ProtocolError, e: 79 except ProtocolError as e: << 80 rc = e.rc 69 rc = e.rc 81 70 82 self.assertEqual(rc, tpm2.TPM2_RC_AUTH 71 self.assertEqual(rc, tpm2.TPM2_RC_AUTH_FAIL) 83 72 84 def test_unseal_with_wrong_policy(self): 73 def test_unseal_with_wrong_policy(self): 85 bank_alg = self.determine_bank_alg(1 < << 86 self.assertIsNotNone(bank_alg) << 87 << 88 handle = self.client.start_auth_sessio 74 handle = self.client.start_auth_session(tpm2.TPM2_SE_TRIAL) 89 75 90 data = ('X' * 64).encode() !! 76 data = 'X' * 64 91 auth = ('A' * 17).encode() !! 77 auth = 'A' * 17 92 pcrs = [16] 78 pcrs = [16] 93 79 94 try: 80 try: 95 self.client.policy_pcr(handle, pcr !! 81 self.client.policy_pcr(handle, pcrs) 96 self.client.policy_password(handle 82 self.client.policy_password(handle) 97 83 98 policy_dig = self.client.get_polic 84 policy_dig = self.client.get_policy_digest(handle) 99 finally: 85 finally: 100 self.client.flush_context(handle) 86 self.client.flush_context(handle) 101 87 102 blob = self.client.seal(self.root_key, 88 blob = self.client.seal(self.root_key, data, auth, policy_dig) 103 89 104 # Extend first a PCR that is not part 90 # Extend first a PCR that is not part of the policy and try to unseal. 105 # This should succeed. 91 # This should succeed. 106 92 107 ds = tpm2.get_digest_size(bank_alg) !! 93 ds = tpm2.get_digest_size(tpm2.TPM2_ALG_SHA1) 108 self.client.extend_pcr(1, ('X' * ds).e !! 94 self.client.extend_pcr(1, 'X' * ds) 109 95 110 handle = self.client.start_auth_sessio 96 handle = self.client.start_auth_session(tpm2.TPM2_SE_POLICY) 111 97 112 try: 98 try: 113 self.client.policy_pcr(handle, pcr !! 99 self.client.policy_pcr(handle, pcrs) 114 self.client.policy_password(handle 100 self.client.policy_password(handle) 115 101 116 result = self.client.unseal(self.r 102 result = self.client.unseal(self.root_key, blob, auth, handle) 117 except: 103 except: 118 self.client.flush_context(handle) 104 self.client.flush_context(handle) 119 raise 105 raise 120 106 121 self.assertEqual(data, result) 107 self.assertEqual(data, result) 122 108 123 # Then, extend a PCR that is part of t 109 # Then, extend a PCR that is part of the policy and try to unseal. 124 # This should fail. 110 # This should fail. 125 self.client.extend_pcr(16, ('X' * ds). !! 111 self.client.extend_pcr(16, 'X' * ds) 126 112 127 handle = self.client.start_auth_sessio 113 handle = self.client.start_auth_session(tpm2.TPM2_SE_POLICY) 128 114 129 rc = 0 115 rc = 0 130 116 131 try: 117 try: 132 self.client.policy_pcr(handle, pcr !! 118 self.client.policy_pcr(handle, pcrs) 133 self.client.policy_password(handle 119 self.client.policy_password(handle) 134 120 135 result = self.client.unseal(self.r 121 result = self.client.unseal(self.root_key, blob, auth, handle) 136 except ProtocolError as e: !! 122 except ProtocolError, e: 137 rc = e.rc 123 rc = e.rc 138 self.client.flush_context(handle) 124 self.client.flush_context(handle) 139 except: 125 except: 140 self.client.flush_context(handle) 126 self.client.flush_context(handle) 141 raise 127 raise 142 128 143 self.assertEqual(rc, tpm2.TPM2_RC_POLI 129 self.assertEqual(rc, tpm2.TPM2_RC_POLICY_FAIL) 144 130 145 def test_seal_with_too_long_auth(self): 131 def test_seal_with_too_long_auth(self): 146 ds = tpm2.get_digest_size(tpm2.TPM2_AL 132 ds = tpm2.get_digest_size(tpm2.TPM2_ALG_SHA1) 147 data = ('X' * 64).encode() !! 133 data = 'X' * 64 148 auth = ('A' * (ds + 1)).encode() !! 134 auth = 'A' * (ds + 1) 149 135 150 rc = 0 136 rc = 0 151 try: 137 try: 152 blob = self.client.seal(self.root_ 138 blob = self.client.seal(self.root_key, data, auth, None) 153 except ProtocolError as e: !! 139 except ProtocolError, e: 154 rc = e.rc 140 rc = e.rc 155 141 156 self.assertEqual(rc, tpm2.TPM2_RC_SIZE 142 self.assertEqual(rc, tpm2.TPM2_RC_SIZE) 157 143 158 def test_too_short_cmd(self): 144 def test_too_short_cmd(self): 159 rejected = False 145 rejected = False 160 try: 146 try: 161 fmt = '>HIII' 147 fmt = '>HIII' 162 cmd = struct.pack(fmt, 148 cmd = struct.pack(fmt, 163 tpm2.TPM2_ST_NO_ 149 tpm2.TPM2_ST_NO_SESSIONS, 164 struct.calcsize( 150 struct.calcsize(fmt) + 1, 165 tpm2.TPM2_CC_FLU 151 tpm2.TPM2_CC_FLUSH_CONTEXT, 166 0xDEADBEEF) 152 0xDEADBEEF) 167 153 168 self.client.send_cmd(cmd) 154 self.client.send_cmd(cmd) 169 except IOError as e: !! 155 except IOError, e: 170 rejected = True 156 rejected = True 171 except: 157 except: 172 pass 158 pass 173 self.assertEqual(rejected, True) 159 self.assertEqual(rejected, True) 174 160 175 def test_read_partial_resp(self): 161 def test_read_partial_resp(self): 176 try: 162 try: 177 fmt = '>HIIH' 163 fmt = '>HIIH' 178 cmd = struct.pack(fmt, 164 cmd = struct.pack(fmt, 179 tpm2.TPM2_ST_NO_ 165 tpm2.TPM2_ST_NO_SESSIONS, 180 struct.calcsize( 166 struct.calcsize(fmt), 181 tpm2.TPM2_CC_GET 167 tpm2.TPM2_CC_GET_RANDOM, 182 0x20) 168 0x20) 183 self.client.tpm.write(cmd) 169 self.client.tpm.write(cmd) 184 hdr = self.client.tpm.read(10) 170 hdr = self.client.tpm.read(10) 185 sz = struct.unpack('>I', hdr[2:6]) 171 sz = struct.unpack('>I', hdr[2:6])[0] 186 rsp = self.client.tpm.read() 172 rsp = self.client.tpm.read() 187 except: 173 except: 188 pass 174 pass 189 self.assertEqual(sz, 10 + 2 + 32) 175 self.assertEqual(sz, 10 + 2 + 32) 190 self.assertEqual(len(rsp), 2 + 32) 176 self.assertEqual(len(rsp), 2 + 32) 191 177 192 def test_read_partial_overwrite(self): 178 def test_read_partial_overwrite(self): 193 try: 179 try: 194 fmt = '>HIIH' 180 fmt = '>HIIH' 195 cmd = struct.pack(fmt, 181 cmd = struct.pack(fmt, 196 tpm2.TPM2_ST_NO_ 182 tpm2.TPM2_ST_NO_SESSIONS, 197 struct.calcsize( 183 struct.calcsize(fmt), 198 tpm2.TPM2_CC_GET 184 tpm2.TPM2_CC_GET_RANDOM, 199 0x20) 185 0x20) 200 self.client.tpm.write(cmd) 186 self.client.tpm.write(cmd) 201 # Read part of the respone 187 # Read part of the respone 202 rsp1 = self.client.tpm.read(15) 188 rsp1 = self.client.tpm.read(15) 203 189 204 # Send a new cmd 190 # Send a new cmd 205 self.client.tpm.write(cmd) 191 self.client.tpm.write(cmd) 206 192 207 # Read the whole respone 193 # Read the whole respone 208 rsp2 = self.client.tpm.read() 194 rsp2 = self.client.tpm.read() 209 except: 195 except: 210 pass 196 pass 211 self.assertEqual(len(rsp1), 15) 197 self.assertEqual(len(rsp1), 15) 212 self.assertEqual(len(rsp2), 10 + 2 + 3 198 self.assertEqual(len(rsp2), 10 + 2 + 32) 213 199 214 def test_send_two_cmds(self): 200 def test_send_two_cmds(self): 215 rejected = False 201 rejected = False 216 try: 202 try: 217 fmt = '>HIIH' 203 fmt = '>HIIH' 218 cmd = struct.pack(fmt, 204 cmd = struct.pack(fmt, 219 tpm2.TPM2_ST_NO_ 205 tpm2.TPM2_ST_NO_SESSIONS, 220 struct.calcsize( 206 struct.calcsize(fmt), 221 tpm2.TPM2_CC_GET 207 tpm2.TPM2_CC_GET_RANDOM, 222 0x20) 208 0x20) 223 self.client.tpm.write(cmd) 209 self.client.tpm.write(cmd) 224 210 225 # expect the second one to raise - 211 # expect the second one to raise -EBUSY error 226 self.client.tpm.write(cmd) 212 self.client.tpm.write(cmd) 227 rsp = self.client.tpm.read() 213 rsp = self.client.tpm.read() 228 214 229 except IOError as e: !! 215 except IOError, e: 230 # read the response 216 # read the response 231 rsp = self.client.tpm.read() 217 rsp = self.client.tpm.read() 232 rejected = True 218 rejected = True 233 pass 219 pass 234 except: 220 except: 235 pass 221 pass 236 self.assertEqual(rejected, True) 222 self.assertEqual(rejected, True) 237 223 238 class SpaceTest(unittest.TestCase): 224 class SpaceTest(unittest.TestCase): 239 def setUp(self): 225 def setUp(self): 240 logging.basicConfig(filename='SpaceTes 226 logging.basicConfig(filename='SpaceTest.log', level=logging.DEBUG) 241 227 242 def test_make_two_spaces(self): 228 def test_make_two_spaces(self): 243 log = logging.getLogger(__name__) 229 log = logging.getLogger(__name__) 244 log.debug("test_make_two_spaces") 230 log.debug("test_make_two_spaces") 245 231 246 space1 = tpm2.Client(tpm2.Client.FLAG_ 232 space1 = tpm2.Client(tpm2.Client.FLAG_SPACE) 247 root1 = space1.create_root_key() 233 root1 = space1.create_root_key() 248 space2 = tpm2.Client(tpm2.Client.FLAG_ 234 space2 = tpm2.Client(tpm2.Client.FLAG_SPACE) 249 root2 = space2.create_root_key() 235 root2 = space2.create_root_key() 250 root3 = space2.create_root_key() 236 root3 = space2.create_root_key() 251 237 252 log.debug("%08x" % (root1)) 238 log.debug("%08x" % (root1)) 253 log.debug("%08x" % (root2)) 239 log.debug("%08x" % (root2)) 254 log.debug("%08x" % (root3)) 240 log.debug("%08x" % (root3)) 255 241 256 def test_flush_context(self): 242 def test_flush_context(self): 257 log = logging.getLogger(__name__) 243 log = logging.getLogger(__name__) 258 log.debug("test_flush_context") 244 log.debug("test_flush_context") 259 245 260 space1 = tpm2.Client(tpm2.Client.FLAG_ 246 space1 = tpm2.Client(tpm2.Client.FLAG_SPACE) 261 root1 = space1.create_root_key() 247 root1 = space1.create_root_key() 262 log.debug("%08x" % (root1)) 248 log.debug("%08x" % (root1)) 263 249 264 space1.flush_context(root1) 250 space1.flush_context(root1) 265 251 266 def test_get_handles(self): 252 def test_get_handles(self): 267 log = logging.getLogger(__name__) 253 log = logging.getLogger(__name__) 268 log.debug("test_get_handles") 254 log.debug("test_get_handles") 269 255 270 space1 = tpm2.Client(tpm2.Client.FLAG_ 256 space1 = tpm2.Client(tpm2.Client.FLAG_SPACE) 271 space1.create_root_key() 257 space1.create_root_key() 272 space2 = tpm2.Client(tpm2.Client.FLAG_ 258 space2 = tpm2.Client(tpm2.Client.FLAG_SPACE) 273 space2.create_root_key() 259 space2.create_root_key() 274 space2.create_root_key() 260 space2.create_root_key() 275 261 276 handles = space2.get_cap(tpm2.TPM2_CAP 262 handles = space2.get_cap(tpm2.TPM2_CAP_HANDLES, tpm2.HR_TRANSIENT) 277 263 278 self.assertEqual(len(handles), 2) 264 self.assertEqual(len(handles), 2) 279 265 280 log.debug("%08x" % (handles[0])) 266 log.debug("%08x" % (handles[0])) 281 log.debug("%08x" % (handles[1])) 267 log.debug("%08x" % (handles[1])) 282 268 283 def test_invalid_cc(self): 269 def test_invalid_cc(self): 284 log = logging.getLogger(__name__) 270 log = logging.getLogger(__name__) 285 log.debug(sys._getframe().f_code.co_na 271 log.debug(sys._getframe().f_code.co_name) 286 272 287 TPM2_CC_INVALID = tpm2.TPM2_CC_FIRST - 273 TPM2_CC_INVALID = tpm2.TPM2_CC_FIRST - 1 288 274 289 space1 = tpm2.Client(tpm2.Client.FLAG_ 275 space1 = tpm2.Client(tpm2.Client.FLAG_SPACE) 290 root1 = space1.create_root_key() 276 root1 = space1.create_root_key() 291 log.debug("%08x" % (root1)) 277 log.debug("%08x" % (root1)) 292 278 293 fmt = '>HII' 279 fmt = '>HII' 294 cmd = struct.pack(fmt, tpm2.TPM2_ST_NO 280 cmd = struct.pack(fmt, tpm2.TPM2_ST_NO_SESSIONS, struct.calcsize(fmt), 295 TPM2_CC_INVALID) 281 TPM2_CC_INVALID) 296 282 297 rc = 0 283 rc = 0 298 try: 284 try: 299 space1.send_cmd(cmd) 285 space1.send_cmd(cmd) 300 except ProtocolError as e: !! 286 except ProtocolError, e: 301 rc = e.rc 287 rc = e.rc 302 288 303 self.assertEqual(rc, tpm2.TPM2_RC_COMM 289 self.assertEqual(rc, tpm2.TPM2_RC_COMMAND_CODE | 304 tpm2.TSS2_RESMGR_TPM_ 290 tpm2.TSS2_RESMGR_TPM_RC_LAYER) 305 291 306 class AsyncTest(unittest.TestCase): 292 class AsyncTest(unittest.TestCase): 307 def setUp(self): 293 def setUp(self): 308 logging.basicConfig(filename='AsyncTes 294 logging.basicConfig(filename='AsyncTest.log', level=logging.DEBUG) 309 295 310 def test_async(self): 296 def test_async(self): 311 log = logging.getLogger(__name__) 297 log = logging.getLogger(__name__) 312 log.debug(sys._getframe().f_code.co_na 298 log.debug(sys._getframe().f_code.co_name) 313 299 314 async_client = tpm2.Client(tpm2.Client 300 async_client = tpm2.Client(tpm2.Client.FLAG_NONBLOCK) 315 log.debug("Calling get_cap in a NON_BL 301 log.debug("Calling get_cap in a NON_BLOCKING mode") 316 async_client.get_cap(tpm2.TPM2_CAP_HAN 302 async_client.get_cap(tpm2.TPM2_CAP_HANDLES, tpm2.HR_LOADED_SESSION) 317 async_client.close() << 318 << 319 def test_flush_invalid_context(self): << 320 log = logging.getLogger(__name__) << 321 log.debug(sys._getframe().f_code.co_na << 322 << 323 async_client = tpm2.Client(tpm2.Client << 324 log.debug("Calling flush_context passi << 325 handle = 0x80123456 << 326 rc = 0 << 327 try: << 328 async_client.flush_context(handle) << 329 except OSError as e: << 330 rc = e.errno << 331 << 332 self.assertEqual(rc, 22) << 333 async_client.close() 303 async_client.close()
Linux® is a registered trademark of Linus Torvalds in the United States and other countries.
TOMOYO® is a registered trademark of NTT DATA CORPORATION.