Bundle verification example
The code below will showcase how to verify an attestation bundle in Python. It is split up to mirror the sections in Attestation bundle verification for readability. This implementation always raises an exception at the first error, but a viable alternative is to report all issues and allow for all the steps to complete. The steps for key validation and CSR linkage have been omitted but can be added once all relevant information about the key has been processed below.
Parsing and marshalling/format functions
While this example uses nShield libraries for cryptography and marshalling/formatting data, the relevant fields can be extracted from the byte strings directly.
The example also uses a simple parse_bytes
function to format the raw base64 strings into ByteBlocks.
def parse_bytes(value):
return nfkm.ByteBlock(base64.urlsafe_b64decode(value), fromraw=1)
Unpacking
The bundle is unpacked from the JSON file.
import json
bundle = open("path/to/bundle").read()
bundle = json.loads(bundle)
Warrant verification (WV1)
As outlined in nShield Warrants, the warrant will be a list containing the name of the root key followed by two or more certificates. The first certificate must be verified under the root key, and each subsequent certificate is verified under the key contained in the payload of the previous. The root key should always be KWARN-1 for nShield warrants. Its public key data is provided in nShield root key and is refered to as KWARN_1.
The code below is simplified for readability, but a more complete implementation might also include:
-
checks to ensure the key and mechanism types are as expected
-
optional support for
FieldUpgradeModuleInformation
, which will also have the fieldsKLF2pub
,KLF2mech
andElectronicSerialNumber
-
sanity checks for any assumptions about the structure of the warrant
import nfddds
import choosealg
import nfkm
warrant = parse_bytes(bundle["warrant"])
warrant = nfddds.decode(warrant)
if str(warrant[0]) != "KWARN-1":
raise Exception("Warrant error: Incorrect root name")
trusted_pubkey = KWARN_1
warrant_esn = None
for cert in warrant[1:]:
# Parse the certificate
payload = cert[nfddds.Symbol("Payload")]
sig = cert[nfddds.Symbol("Signature")]
# Convert to nCore format
rslen = len(sig) // 2
r_part = sig[:rslen]
s_part = sig[rslen:]
sig = nfkm.CipherText(
["ECDSAhSHA512", int.from_bytes(r_part, "big"), int.from_bytes(s_part, "big")]
)
# Verify the signature
plain_text = nfkm.PlainText(["Bytes", payload])
choosealg.verify(trusted_pubkey, sig.mech, plain_text, sig) # raises error if verify failed
# Parse the payload
payload = nfddds.decode(payload)
cert_type = payload[nfddds.Symbol("WarrantCertificateType")]
# Retrieve signing key and mechanism for next certificate or KLF2 in the last certificate
if cert_type == nfddds.Symbol("Delegation"):
trusted_pubkey = payload[nfddds.Symbol("DelegateKey")]
mech = payload[nfddds.Symbol("SigMech")]
elif cert_type == nfddds.Symbol("ModuleInformation"):
trusted_pubkey = payload[nfddds.Symbol("KLF2pub")]
mech = payload[nfddds.Symbol("KLF2mech")]
warrant_esn = payload[nfddds.Symbol("ElectronicSerialNumber")]
else:
raise Exception("Warrant error: unrecognized certificate type")
# Convert the key to nCore format
trusted_pubkey = nfkm.KeyData(
["ECDSAPublic", "NISTP521", 0, trusted_pubkey[3][0], trusted_pubkey[3][1]]
)
# If it was a module information certificate then stop; we have no further delegation authority
if warrant_esn is not None:
break
if warrant_esn is None:
raise Exception("Warrant error: No module information certificate found")
klf2 = trusted_pubkey
Module state certificate verification (MSCV1-5)
Verifying the module state certificate now uses the KLF2 obtained during the previous step.
modstatemsg = parse_bytes(bundle["modstatemsg"])
modstatesig = parse_bytes(bundle["modstatesig"])
modstatesig, _ = nfkm.unmarshal(modstatesig, nfkm.CipherText)
plain_text = nfkm.PlainText(["Bytes", modstatemsg])
if modstatesig.mech not in KLF2_MECHS:
raise Exception("Module state certificate error: unexpected KLF2 signature mechanism")
# MSCV1 - verify module state cert
choosealg.verify(klf2, modstatesig.mech, plain_text, modstatesig)
# MSCV2 - extract the attributes we need
modstatemsg, _ = nfkm.unmarshal(modstatemsg, nfkm.ModCertMsg)
esn = None
kml = None
hknso = None
hkms = []
for attrib in modstatemsg.data.state.attribs:
if attrib.tag == "ESN":
esn = attrib.value.esn
elif attrib.tag == "KML":
kml = attrib.value.kmlpub
elif "KMLEx" in nfkm.ModuleAttribTag.words and attrib.tag == "KMLEx":
kml = attrib.value.kmlpub
elif attrib.tag == "KNSO":
hknso = nfkm.KeyHashEx(["SHA1Hash", attrib.value.hknso])
elif "KNSOEx" in nfkm.ModuleAttribTag.words and attrib.tag == "KNSOEx":
hknso = attrib.value.hknso
elif attrib.tag == "KMList":
hkms = [nfkm.KeyHashEx(["SHA1Hash", km.hk]) for km in attrib.value.hkms]
elif "ModKeyInfoEx" in nfkm.ModuleAttribTag.words and attrib.tag == "ModKeyInfoEx":
hkms = [mki.hk for mki in attrib.value.kms]
# MSCV2 - ESN and KML must be present
if not esn:
raise Exception("Module state certificate error: No module serial number found")
if not kml:
raise Exception("Module state certificate error: no KML public key found")
# MSCV3 - ESN must match that from warrant
if esn != warrant_esn:
raise Exception("Module state certificate error: ESN does not match warrant")
# MSCV4 - If KNSO is present, verify it
if "knsopub" in bundle:
if not hknso:
raise Exception("Module state certificate error: no HKNSO found")
knsopub, _ = nfkm.unmarshal(parse_bytes(bundle["knsopub"]), nfkm.KeyData)
hknso_bundle = choosealg.calchash(knsopub, nfkm.KeyHashMech("SHA1Hash"))
if hknso_bundle != hknso:
raise Exception("Module state certificate error: KNSO inconsistent with HKNSO")
# MSCV5 - If KM is present check it matches the module state cert
if "hkm" in bundle:
hkm, _ = nfkm.unmarshal(parse_bytes(bundle["hkm"]), nfkm.KeyHashEx)
if not hkm in hkms:
raise Exception("Module state certificate error: KM not found")
World binding certificate verification (WBCV1-5)
As specified in Attestation bundle verification, the certificates must be constructed in a raw bytes format which given a known certname and ciphersuite, then verified against the signatures Cert*.
At the end of this process, the keys in hashes
can be trusted.
CERTIFICATES = {
"CertKMaKMCbKNSO": ["Module keys", "hknso", "hkm", "hkmc"],
"CertKMaKMCaKFIPSbKNSO": ["Module setup, FIPS3", "hknso", "hkm", "hkmc", "hkfips"],
"CertKREaKRAbKNSO": ["Card Recovery", "hknso", "hkre", "hkra"],
}
CERTTAIL = {"DLf1024s160mDES3": None, "DLf1024s160mRijndael": "KM type Rijndael"}
KEYS = ["hkre", "hkra", "hkfips", "hkmc", "hkm"]
hashes = {"hknso": hknso}
for key in KEYS:
if key in bundle:
hashes[key], _ = nfkm.unmarshal(parse_bytes(bundle[key]), nfkm.KeyHashEx)
# WBCV1-3 - Check any security world certificates supplied
for certname in CERTIFICATES:
if certname not in bundle:
continue
if "knsopub" not in bundle:
raise Exception("World binding certificate error: no KNSO found in bundle")
if "ciphersuite" not in bundle:
raise Exception("World binding certificate error: no ciphersuite found in bundle")
ciphersuite = bundle["ciphersuite"]
template = CERTIFICATES[certname]
cert = b""
for element in template:
if element == "Module keys" or element = "Module setup, FIPS3":
certtail = CERTTAIL.get(ciphersuite, f"suite = {ciphersuite}")
if certtail is None:
value = element + "\0"
else:
if element == "Module keys":
value = element + "; " + certtail + "\0"
else:
value = element + ": " + certtail + "\0"
value = bytes(value, "ASCII")
elif element[0] == "h":
value = hashes[element].data.hash
else:
value = bytes(element + "\0", "ASCII")
cert += value
plain_text = nfkm.PlainText(["Bytes", nfkm.ByteBlock(cert, fromraw=1)])
sig, _ = nfkm.unmarshal(certbytes, nfkm.CipherText)
choosealg.verify(pubkey, sig.mech, plain_text, sig)
# WBCV4-5 - Remove keys without a suitable certification chain
uncertified = set()
if "CertKREaKRAbKNSO" not in bundle:
uncertified.add("hkre")
uncertified.add("hkra")
if "CertKMaKMCaKFIPSbKNSO" not in bundle:
uncertified.add("hkfips")
if "CertKMaKMCaKFIPSbKNSO" not in bundle and "CertKMaKMCbKNSO" not in bundle:
uncertified.add("hkm")
uncertified.add("hkmc")
for key in uncertified:
if key in hashes:
del hashes[key]
Key generation certificate verification (KGCV1-2)
KML_MECHS = set(["ECDSAhSHA512", "DSAhSHA256"])
# KGCV1 - Verify the key generation certificate
kcmsg = parse_bytes(bundle["kcmsg"])
kcsig = parse_bytes(bundle["kcsig"])
kcsig, _ = nfkm.unmarshal(kcsig, nfkm.CipherText)
plain_text = nfkm.PlainText(["Bytes", kcmsg])
if kcsig.mech not in KML_MECHS:
raise Exception(f"Key generation certificate error: unexpected KML signature mechanism")
choosealg.verify(kml, kcsig.mech, plain_text, kcsig)
kcmsg, _ = nfkm.unmarshal(kcmsg, nfkm.ModCertMsg)
# KGCV2 - Verify that the key generation certificate references the right key
pubkeydata = parse_bytes(bundle["pubkeydata"])
pubkeydata, _ = nfkm.unmarshal(pubkeydata, nfkm.KeyData)
pubkeyhash = choosealg.calchash(pubkeydata, nfkm.KeyHashMech("SHA1Hash")).data.hash
kcmsghash = kcmsg.data.hka
if pubkeyhash != kcmsghash:
raise Exception("Key generation certificate error: key hash mismatch")
ACL validation (ACLV1,3-5)
The code below is split for readability.
acl = kcmsg.data.acl
protection = "unknown"
recovery = False
permissions = set()
for perm_group in acl:
# ACLV1 - If any of the certifier fields matches hknso, mark the key as recoverable
# and skip 'main use group' checks for this group
if is_trump_ops_group(perm_group):
recovery = True
continue
for action in perm_group.actions:
if action.type == "OpPermissions":
# ACLV3 - Require allowed permissions only
permissions = oppermissions(action, permissions)
elif action.type == "MakeBlob":
protection = makeblob(action)
elif action.type == "MakeArchiveBlob":
makearchiveblob(action)
recovery = True
elif action.type in ['DeriveKey', 'DeriveKeyEx'] and action.details.mech == 'PublicFromPrivate':
# DeriveMech_PublicFromPrivate is harmless
pass
else:
# ACLV4 - disallow any actions other than those above
raise Exception("ACL error: unacceptable permitted action")
Trump ops group
def is_trump_ops_group(perm_group):
if "hknso" in hashes:
if ((perm_group.flags & "certifier_present"
and hashes["hknso"].mech == "SHA1Hash"
and perm_group.certifier == hashes["hknso"].data.hash)
or
(perm_group.flags & "certmech_present"
and hashes["hknso"].mech == "SHA1Hash"
and perm_group.certmech.hash == hashes["hknso"].data.hash)
or
('certmechex_present' in nfkm.PermissionGroup_flags.words
and perm_group.flags & 'certmechex_present'
and hashes["hknso"] == perm_group.certmechex.hash)):
return True
return False
Main use actions
Act_OpPermissions
The permissions are saved as direct categories (sign, verify, encrypt, decrypt, export and unwrap) rather than nCore permission types.
OPPERMISSIONS_MAP = {
"UseAsCertificate": ["sign"],
"ExportAsPlain": ["export"],
"ExpandACL": ["export"],
"Encrypt": ["encrypt"],
"Decrypt": ["decrypt"],
"Verify": ["verify"],
"UseAsBlobKey": ["unwrap"],
"UseAsKM": ["unwrap", "sign"],
"Sign": ["sign"],
"UseAsLoaderKey": ["decrypt"],
"SignModuleCert": ["sign"],
}
# 'good' permissions can be limited further according to need
good_oppermissions = nfkm.Act_OpPermissions_Details_perms(
["DuplicateHandle", "GetAppData", "ReduceACL", "Sign", "GetACL",
"Decrypt", "UseAsCertificate", "UseAsBlobKey", "SignModuleCert"]
)
def oppermissions(action, permissions):
bad_oppermissions = action.details.perms & ~good_oppermissions
if bad_oppermissions != 0:
raise Exception("ACL error: unacceptable permissions: {bad_oppermissions}")
# Record the permissions we've seen
for perm in action.details.perms.words:
if action.details.perms & perm:
for permission in OPPERMISSIONS_MAP.get(perm, []):
permissions.add(permission)
return permissions
Act_MakeBlob (WB1-3,5-7)
This action determines the key’s protection type.
Module protected keys are expected to contain flags AllowNonKM0|AllowKmOnly|kmhash_present
.
Token protected keys are expected to contain flags AllowNonKM0|kmhash_present|kthash_present
def makeblob(action):
protection = {
"module": False,
"softcard": False,
"cardset": False
}
if (not (action.details.flags & "AllowKmOnly") and
not (action.details.flags & "kthash_present")):
# WB1 - key must be either module or token protected, not both or neither
raise Exception("ACL error: makeblob action permits incoherent protection")
elif not (action.details.flags & "kmhash_present"):
# WB2 - kmhash must be present
raise Exception("ACL error: makeblob action specifies no module key")
elif hashes["hkm"].mech != "SHA1Hash" or action.details.kmhash != hashes["hkm"].data.hash:
# WB2 - kmhash must match hkm
raise Exception("ACL error: makeblob action permits inappropriate module key")
elif action.details.flags & "AllowNullKmToken":
# WB3 - AllowNullKMToken is not allowed
raise Exception("ACL error: makeblob action permits null module key")
# Get the protection type
if action.details.flags & "AllowKmOnly":
# WB5 - AllowKmOnly indicates a module-protected key
protection["module"] = True
elif action.details.flags & "kthash_present":
# WB6 - If kthash is present, ktparams must be present
if action.details.flags & "ktparams_present":
# WB7 - AllowSoftSlots indicates a softcard-protected key
if action.details.ktparams.flags & "AllowSoftSlots":
protection["softcard"] = True
else:
protection["cardset"] = True
else:
raise Exception("ACL error: makeblob action permits any token type")
# ACLV5 - Get protection type, using weakest if more than one is present
if protection["module"]:
prot = "module"
elif protection["softcard"]:
prot = "softcard"
elif protection["cardset"]:
prot = "cardset"
return prot
Act_MakeArchiveBlob (RB1-3,5)
This allows for the creation of a recovery blob.
The flag kahash_present
is required and the mechanism must be correct for the ciphersuite.
RECOVERY_MECH = {
"DLf1024s160mDES3": "RSApPKCS1",
"DLf1024s160mRijndael": "BlobCryptv2kRSAeRijndaelCBC0hSHA512mSHA512HMAC",
"DLf3072s256mRijndael": "BlobCryptv2kRSAeRijndaelCBC0hSHA512mSHA512HMAC",
"DLf3072s256mAEScSP800131Ar1": "BlobCryptv3kRSAOAEPeAESCBC0dCTRCMACmSHA512HMAC",
}
def makearchiveblob(action):
if "hkre" not in hashes:
# RB1 - hkre must be present and trusted
raise Exception("ACL error: makearchiveblob but no hkre in bundle")
elif not (action.details.flags & "kahash_present"):
# RB2 - kahash must be present
raise Exception("ACL error: makearchiveblob action specifies no blobbing key")
elif hashes["hkre"].mech != "SHA1Hash" or action.details.kahash != hashes["hkre"].data.hash:
# RB2 - kahash must match hkre
raise Exception("ACL error: makearchiveblob action specifies unsuitable blobbing key")
elif action.details.mech != RECOVERY_MECH[ciphersuite]:
# RB3 - recovery mech must match ciphersuite
raise Exception("ACL error: makearchiveblob action specifies unsuitable blobbing mechanism")