Bundle verification example

The code below shows how to verify an attestation bundle in Python. It is split into parts that mirror the sections in Attestation bundle verification. This implementation raises an exception at the first error; a viable alternative is to let every step run and report all issues at the end. The steps for key validation and CSR linkage are omitted, but they can be added once all relevant information about the key has been processed below.
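The report-all-issues alternative mentioned above could be sketched as follows. This is a minimal illustration, not part of the nShield libraries: run_all_checks and the two stand-in steps are hypothetical names, and each real step would wrap one of the verification stages shown later.

```python
def run_all_checks(steps):
  """Run every (name, callable) step and return the collected error messages."""
  errors = []
  for name, step in steps:
    try:
      step()
    except Exception as exc:
      # Record the failure and carry on instead of aborting at the first error
      errors.append(f"{name}: {exc}")
  return errors

# Hypothetical stand-ins for real verification steps
def step_that_passes():
  pass

def step_that_fails():
  raise Exception("ESN does not match warrant")

report = run_all_checks([("WV1", step_that_passes), ("MSCV3", step_that_fails)])
print(report)
```

Later steps depend on values produced by earlier ones (for example, KLF2 from the warrant), so a real implementation would probably need to skip any step whose inputs failed verification.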

Parsing and marshalling/format functions

While this example uses nShield libraries for cryptography and marshalling/formatting data, the relevant fields can be extracted from the byte strings directly.

The example also uses a simple parse_bytes function to decode the URL-safe base64 strings in the bundle into ByteBlocks.

import base64
import nfkm

def parse_bytes(value):
  return nfkm.ByteBlock(base64.urlsafe_b64decode(value), fromraw=1)
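If the nShield libraries are not available, the decoding half of parse_bytes can be reproduced with the standard library alone. The sketch below is an assumption-laden illustration: decode_field is a hypothetical helper, and the padding tolerance is added only in case a producer strips the trailing "=" characters, which this document does not state.

```python
import base64

def decode_field(value):
  """Decode a URL-safe base64 string, tolerating missing '=' padding."""
  padded = value + "=" * (-len(value) % 4)
  return base64.urlsafe_b64decode(padded)

# Unpadded input decodes to the same bytes as padded input
print(decode_field("aGVsbG8"))
```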

Unpacking

The bundle is unpacked from the JSON file.

import json

# Use a context manager so that the file is closed after reading
with open("path/to/bundle") as bundle_file:
  bundle = json.load(bundle_file)
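Before going further, it can help to confirm that the bundle contains the fields the rest of this example reads unconditionally. The sketch below is illustrative only: REQUIRED_FIELDS is inferred from the code in this example rather than from a specification, and missing_fields is a hypothetical helper.

```python
# Fields that the code in this example reads without checking for presence
REQUIRED_FIELDS = ["warrant", "modstatemsg", "modstatesig", "kcmsg", "kcsig", "pubkeydata"]

def missing_fields(bundle):
  """Return the required field names that are absent from the bundle."""
  return [name for name in REQUIRED_FIELDS if name not in bundle]

# Hypothetical, truncated bundle used only for illustration
example = {"warrant": "", "modstatemsg": "", "modstatesig": ""}
missing = missing_fields(example)
if missing:
  print("Bundle error: missing fields: " + ", ".join(missing))
```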

Warrant verification (WV1)

As outlined in nShield Warrants, the warrant is a list containing the name of the root key followed by two or more certificates. The first certificate must verify under the root key, and each subsequent certificate must verify under the key contained in the payload of the previous one. The root key should always be KWARN-1 for nShield warrants. Its public key data is provided in nShield root key and is referred to as KWARN_1 in the code below.

The code below is simplified for readability, but a more complete implementation might also include:

  • checks to ensure the key and mechanism types are as expected

  • optional support for FieldUpgradeModuleInformation, which will also have the fields KLF2pub, KLF2mech and ElectronicSerialNumber

  • sanity checks for any assumptions about the structure of the warrant

import nfddds
import choosealg
import nfkm

warrant = parse_bytes(bundle["warrant"])
warrant = nfddds.decode(warrant)

if str(warrant[0]) != "KWARN-1":
  raise Exception("Warrant error: Incorrect root name")

trusted_pubkey = KWARN_1
warrant_esn = None
for cert in warrant[1:]:
  # Parse the certificate
  payload = cert[nfddds.Symbol("Payload")]
  sig = cert[nfddds.Symbol("Signature")]

  # Convert to nCore format
  rslen = len(sig) // 2
  r_part = sig[:rslen]
  s_part = sig[rslen:]
  sig = nfkm.CipherText(
    ["ECDSAhSHA512", int.from_bytes(r_part, "big"), int.from_bytes(s_part, "big")]
  )

  # Verify the signature
  plain_text = nfkm.PlainText(["Bytes", payload])
  choosealg.verify(trusted_pubkey, sig.mech, plain_text, sig) # raises error if verify failed

  # Parse the payload
  payload = nfddds.decode(payload)
  cert_type = payload[nfddds.Symbol("WarrantCertificateType")]

  # Retrieve signing key and mechanism for next certificate or KLF2 in the last certificate
  if cert_type == nfddds.Symbol("Delegation"):
    trusted_pubkey = payload[nfddds.Symbol("DelegateKey")]
    mech = payload[nfddds.Symbol("SigMech")]
  elif cert_type == nfddds.Symbol("ModuleInformation"):
    trusted_pubkey = payload[nfddds.Symbol("KLF2pub")]
    mech = payload[nfddds.Symbol("KLF2mech")]
    warrant_esn = payload[nfddds.Symbol("ElectronicSerialNumber")]
  else:
    raise Exception("Warrant error: unrecognized certificate type")

  # Convert the key to nCore format
  trusted_pubkey = nfkm.KeyData(
    ["ECDSAPublic", "NISTP521", 0, trusted_pubkey[3][0], trusted_pubkey[3][1]]
  )

  # If it was a module information certificate then stop; we have no further delegation authority
  if warrant_esn is not None:
    break

if warrant_esn is None:
  raise Exception("Warrant error: No module information certificate found")

klf2 = trusted_pubkey
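The "Convert to nCore format" step in the loop above splits each raw signature into two equal halves and reads them as big-endian integers r and s. That conversion can be shown in isolation with the standard library. split_raw_signature is a hypothetical helper, and the length check is one example of the sanity checks suggested earlier.

```python
def split_raw_signature(sig):
  """Split a raw r||s signature into the integers (r, s)."""
  if len(sig) == 0 or len(sig) % 2 != 0:
    raise ValueError("raw signature must have a non-zero, even length")
  half = len(sig) // 2
  r = int.from_bytes(sig[:half], "big")
  s = int.from_bytes(sig[half:], "big")
  return r, s

# Toy 4-byte input; real signatures are much longer
r, s = split_raw_signature(bytes([0x00, 0x01, 0x00, 0x02]))
print(r, s)
```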

Module state certificate verification (MSCV1-5)

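The module state certificate is verified with the KLF2 public key obtained from the warrant in the previous step. KLF2_MECHS is not defined in this example; it is assumed to hold the set of KLF2 signature mechanisms you are prepared to accept, in the same way that KML_MECHS does for KML in the key generation certificate step.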

modstatemsg = parse_bytes(bundle["modstatemsg"])
modstatesig = parse_bytes(bundle["modstatesig"])
modstatesig, _ = nfkm.unmarshal(modstatesig, nfkm.CipherText)
plain_text = nfkm.PlainText(["Bytes", modstatemsg])
if modstatesig.mech not in KLF2_MECHS:
  raise Exception("Module state certificate error: unexpected KLF2 signature mechanism")

# MSCV1 - verify module state cert
choosealg.verify(klf2, modstatesig.mech, plain_text, modstatesig)

# MSCV2 - extract the attributes we need
modstatemsg, _ = nfkm.unmarshal(modstatemsg, nfkm.ModCertMsg)
esn = None
kml = None
hknso = None
hkms = []

for attrib in modstatemsg.data.state.attribs:
  if attrib.tag == "ESN":
    esn = attrib.value.esn
  elif attrib.tag == "KML":
    kml = attrib.value.kmlpub
  elif "KMLEx" in nfkm.ModuleAttribTag.words and attrib.tag == "KMLEx":
    kml = attrib.value.kmlpub
  elif attrib.tag == "KNSO":
    hknso = nfkm.KeyHashEx(["SHA1Hash", attrib.value.hknso])
  elif "KNSOEx" in nfkm.ModuleAttribTag.words and attrib.tag == "KNSOEx":
    hknso = attrib.value.hknso
  elif attrib.tag == "KMList":
    hkms = [nfkm.KeyHashEx(["SHA1Hash", km.hk]) for km in attrib.value.hkms]
  elif "ModKeyInfoEx" in nfkm.ModuleAttribTag.words and attrib.tag == "ModKeyInfoEx":
    hkms = [mki.hk for mki in attrib.value.kms]

# MSCV2 - ESN and KML must be present
if not esn:
  raise Exception("Module state certificate error: No module serial number found")
if not kml:
  raise Exception("Module state certificate error: no KML public key found")
# MSCV3 - ESN must match that from warrant
if esn != warrant_esn:
  raise Exception("Module state certificate error: ESN does not match warrant")

# MSCV4 - If KNSO is present, verify it
if "knsopub" in bundle:
  if not hknso:
    raise Exception("Module state certificate error: no HKNSO found")
  knsopub, _ = nfkm.unmarshal(parse_bytes(bundle["knsopub"]), nfkm.KeyData)
  hknso_bundle = choosealg.calchash(knsopub, nfkm.KeyHashMech("SHA1Hash"))
  if hknso_bundle != hknso:
    raise Exception("Module state certificate error: KNSO inconsistent with HKNSO")

# MSCV5 - If KM is present check it matches the module state cert
if "hkm" in bundle:
  hkm, _ = nfkm.unmarshal(parse_bytes(bundle["hkm"]), nfkm.KeyHashEx)
  if hkm not in hkms:
    raise Exception("Module state certificate error: KM not found")

World binding certificate verification (WBCV1-5)

As specified in Attestation bundle verification, each certificate message must be reconstructed as raw bytes from its known certname and the ciphersuite, then verified against the corresponding Cert* signature in the bundle. At the end of this process, the keys that remain in hashes can be trusted.

CERTIFICATES = {
  "CertKMaKMCbKNSO": ["Module keys", "hknso", "hkm", "hkmc"],
  "CertKMaKMCaKFIPSbKNSO": ["Module setup, FIPS3", "hknso", "hkm", "hkmc", "hkfips"],
  "CertKREaKRAbKNSO": ["Card Recovery", "hknso", "hkre", "hkra"],
}
CERTTAIL = {"DLf1024s160mDES3": None, "DLf1024s160mRijndael": "KM type Rijndael"}
KEYS = ["hkre", "hkra", "hkfips", "hkmc", "hkm"]

hashes = {"hknso": hknso}
for key in KEYS:
  if key in bundle:
    hashes[key], _ = nfkm.unmarshal(parse_bytes(bundle[key]), nfkm.KeyHashEx)

# WBCV1-3 - Check any security world certificates supplied
for certname in CERTIFICATES:
  if certname not in bundle:
    continue
  if "knsopub" not in bundle:
    raise Exception("World binding certificate error: no KNSO found in bundle")
  if "ciphersuite" not in bundle:
    raise Exception("World binding certificate error: no ciphersuite found in bundle")

  ciphersuite = bundle["ciphersuite"]
  template = CERTIFICATES[certname]
  cert = b""
  for element in template:
    if element == "Module keys" or element == "Module setup, FIPS3":
      certtail = CERTTAIL.get(ciphersuite, f"suite = {ciphersuite}")
      if certtail is None:
        value = element + "\0"
      else:
        if element == "Module keys":
          value = element + "; " + certtail + "\0"
        else:
          value = element + ": " + certtail + "\0"
      value = bytes(value, "ASCII")
    elif element[0] == "h":
      value = hashes[element].data.hash
    else:
      value = bytes(element + "\0", "ASCII")
    cert += value

  plain_text = nfkm.PlainText(["Bytes", nfkm.ByteBlock(cert, fromraw=1)])
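  # The signature is the bundle entry named after the certificate; verify it under KNSO
  sig, _ = nfkm.unmarshal(parse_bytes(bundle[certname]), nfkm.CipherText)
  choosealg.verify(knsopub, sig.mech, plain_text, sig)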

# WBCV4-5 - Remove keys without a suitable certification chain
uncertified = set()
if "CertKREaKRAbKNSO" not in bundle:
  uncertified.add("hkre")
  uncertified.add("hkra")
if "CertKMaKMCaKFIPSbKNSO" not in bundle:
  uncertified.add("hkfips")
if "CertKMaKMCaKFIPSbKNSO" not in bundle and "CertKMaKMCbKNSO" not in bundle:
  uncertified.add("hkm")
  uncertified.add("hkmc")

for key in uncertified:
  if key in hashes:
    del hashes[key]
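The message construction inside the WBCV1-3 loop above can be isolated into a function that uses only the standard library, which makes the byte layout easier to inspect. In this sketch build_cert_message is a hypothetical helper, and hashes maps each name directly to raw hash bytes (standing in for the .data.hash field of the nfkm objects). The separators mirror the loop above and have not been checked independently.

```python
CERTIFICATES = {
  "CertKMaKMCbKNSO": ["Module keys", "hknso", "hkm", "hkmc"],
  "CertKMaKMCaKFIPSbKNSO": ["Module setup, FIPS3", "hknso", "hkm", "hkmc", "hkfips"],
  "CertKREaKRAbKNSO": ["Card Recovery", "hknso", "hkre", "hkra"],
}
CERTTAIL = {"DLf1024s160mDES3": None, "DLf1024s160mRijndael": "KM type Rijndael"}

def build_cert_message(certname, ciphersuite, hashes):
  """Rebuild the raw bytes that a world binding certificate signs."""
  cert = b""
  for element in CERTIFICATES[certname]:
    if element in ("Module keys", "Module setup, FIPS3"):
      # Label, optionally followed by a ciphersuite-dependent tail
      certtail = CERTTAIL.get(ciphersuite, f"suite = {ciphersuite}")
      if certtail is None:
        text = element
      elif element == "Module keys":
        text = element + "; " + certtail
      else:
        text = element + ": " + certtail
      value = (text + "\0").encode("ascii")
    elif element.startswith("h"):
      # Key hash, appended as raw bytes
      value = hashes[element]
    else:
      # Any other label is NUL-terminated ASCII
      value = (element + "\0").encode("ascii")
    cert += value
  return cert

# Toy one-byte "hashes" keep the layout visible
toy = {"hknso": b"N", "hkm": b"M", "hkmc": b"C", "hkfips": b"F", "hkre": b"E", "hkra": b"A"}
print(build_cert_message("CertKREaKRAbKNSO", "DLf1024s160mDES3", toy))
```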

Key generation certificate verification (KGCV1-2)

KML_MECHS = set(["ECDSAhSHA512", "DSAhSHA256"])

# KGCV1 - Verify the key generation certificate
kcmsg = parse_bytes(bundle["kcmsg"])
kcsig = parse_bytes(bundle["kcsig"])
kcsig, _ = nfkm.unmarshal(kcsig, nfkm.CipherText)
plain_text = nfkm.PlainText(["Bytes", kcmsg])
if kcsig.mech not in KML_MECHS:
  raise Exception("Key generation certificate error: unexpected KML signature mechanism")
choosealg.verify(kml, kcsig.mech, plain_text, kcsig)
kcmsg, _ = nfkm.unmarshal(kcmsg, nfkm.ModCertMsg)

# KGCV2 - Verify that the key generation certificate references the right key
pubkeydata = parse_bytes(bundle["pubkeydata"])
pubkeydata, _ = nfkm.unmarshal(pubkeydata, nfkm.KeyData)
pubkeyhash = choosealg.calchash(pubkeydata, nfkm.KeyHashMech("SHA1Hash")).data.hash
kcmsghash = kcmsg.data.hka
if pubkeyhash != kcmsghash:
  raise Exception("Key generation certificate error: key hash mismatch")

ACL validation (ACLV1,3-5)

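The code below is split for readability: the main loop comes first, and the helper functions it calls (is_trump_ops_group, oppermissions, makeblob and makearchiveblob) are listed in the subsections that follow. In a runnable script, the helpers must be defined before the loop executes.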

acl = kcmsg.data.acl

protection = "unknown"
recovery = False
permissions = set()

for perm_group in acl:
  # ACLV1 - If any of the certifier fields matches hknso, mark the key as recoverable
  # and skip 'main use group' checks for this group
  if is_trump_ops_group(perm_group):
    recovery = True
    continue

  for action in perm_group.actions:
    if action.type == "OpPermissions":
      # ACLV3 - Require allowed permissions only
      permissions = oppermissions(action, permissions)
    elif action.type == "MakeBlob":
      protection = makeblob(action)
    elif action.type == "MakeArchiveBlob":
      makearchiveblob(action)
      recovery = True
    elif action.type in ["DeriveKey", "DeriveKeyEx"] and action.details.mech == "PublicFromPrivate":
      # DeriveMech_PublicFromPrivate is harmless
      pass
    else:
      # ACLV4 - disallow any actions other than those above
      raise Exception("ACL error: unacceptable permitted action")
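In the loop above, protection is overwritten each time a MakeBlob action is processed. If an ACL may contain more than one such action, one option is to collect every result and keep the weakest. The sketch below assumes the ordering used under ACLV5 in the makeblob helper (module, then softcard, then cardset); WEAKEST_FIRST and weakest are hypothetical names.

```python
# Assumed ordering, weakest first, taken from the ACLV5 branch order in makeblob
WEAKEST_FIRST = ["module", "softcard", "cardset"]

def weakest(protections):
  """Return the weakest protection type seen, or 'unknown' if none was seen."""
  seen = set(protections)
  for protection in WEAKEST_FIRST:
    if protection in seen:
      return protection
  return "unknown"

print(weakest(["cardset", "module"]))
```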

Trump ops group

def is_trump_ops_group(perm_group):
  if "hknso" in hashes:
    if ((perm_group.flags & "certifier_present"
        and hashes["hknso"].mech == "SHA1Hash"
        and perm_group.certifier == hashes["hknso"].data.hash)
        or
        (perm_group.flags & "certmech_present"
        and hashes["hknso"].mech == "SHA1Hash"
        and perm_group.certmech.hash == hashes["hknso"].data.hash)
        or
        ("certmechex_present" in nfkm.PermissionGroup_flags.words
        and perm_group.flags & "certmechex_present"
        and hashes["hknso"] == perm_group.certmechex.hash)):
      return True
  return False

Main use actions

Act_OpPermissions

The permissions are saved as direct categories (sign, verify, encrypt, decrypt, export and unwrap) rather than nCore permission types.

OPPERMISSIONS_MAP = {
  "UseAsCertificate": ["sign"],
  "ExportAsPlain": ["export"],
  "ExpandACL": ["export"],
  "Encrypt": ["encrypt"],
  "Decrypt": ["decrypt"],
  "Verify": ["verify"],
  "UseAsBlobKey": ["unwrap"],
  "UseAsKM": ["unwrap", "sign"],
  "Sign": ["sign"],
  "UseAsLoaderKey": ["decrypt"],
  "SignModuleCert": ["sign"],
}
# 'good' permissions can be limited further according to need
good_oppermissions = nfkm.Act_OpPermissions_Details_perms(
  ["DuplicateHandle", "GetAppData", "ReduceACL", "Sign", "GetACL",
   "Decrypt", "UseAsCertificate", "UseAsBlobKey", "SignModuleCert"]
)

def oppermissions(action, permissions):
  bad_oppermissions = action.details.perms & ~good_oppermissions
  if bad_oppermissions != 0:
    raise Exception(f"ACL error: unacceptable permissions: {bad_oppermissions}")

  # Record the permissions we've seen
  for perm in action.details.perms.words:
    if action.details.perms & perm:
      for permission in OPPERMISSIONS_MAP.get(perm, []):
        permissions.add(permission)

  return permissions
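The mask test at the start of oppermissions (perms & ~good_oppermissions) can be tried out without the nShield libraries by modelling the permission bits with enum.Flag. The Perm class and its members below are hypothetical stand-ins for nfkm.Act_OpPermissions_Details_perms and cover only a few permissions.

```python
from enum import Flag, auto

class Perm(Flag):
  SIGN = auto()
  DECRYPT = auto()
  GET_ACL = auto()
  EXPORT_AS_PLAIN = auto()

# Permissions this verifier is prepared to accept
GOOD = Perm.SIGN | Perm.DECRYPT | Perm.GET_ACL

def disallowed(perms):
  """Return the subset of perms that falls outside the allowed mask."""
  return perms & ~GOOD

bad = disallowed(Perm.SIGN | Perm.EXPORT_AS_PLAIN)
if bad:
  print(f"ACL error: unacceptable permissions: {bad}")
```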

Act_MakeBlob (WB1-3,5-7)

This action determines the key’s protection type. Module-protected keys are expected to carry the flags AllowNonKM0|AllowKmOnly|kmhash_present. Token-protected keys are expected to carry the flags AllowNonKM0|kmhash_present|kthash_present.

def makeblob(action):
  protection = {
    "module": False,
    "softcard": False,
    "cardset": False
  }

  if (bool(action.details.flags & "AllowKmOnly") ==
      bool(action.details.flags & "kthash_present")):
    # WB1 - key must be either module or token protected, not both or neither
    raise Exception("ACL error: makeblob action permits incoherent protection")
  elif not (action.details.flags & "kmhash_present"):
    # WB2 - kmhash must be present
    raise Exception("ACL error: makeblob action specifies no module key")
  elif hashes["hkm"].mech != "SHA1Hash" or action.details.kmhash != hashes["hkm"].data.hash:
    # WB2 - kmhash must match hkm
    raise Exception("ACL error: makeblob action permits inappropriate module key")
  elif action.details.flags & "AllowNullKmToken":
    # WB3 - AllowNullKmToken is not allowed
    raise Exception("ACL error: makeblob action permits null module key")

  # Get the protection type
  if action.details.flags & "AllowKmOnly":
    # WB5 - AllowKmOnly indicates a module-protected key
    protection["module"] = True
  elif action.details.flags & "kthash_present":
    # WB6 - If kthash is present, ktparams must be present
    if action.details.flags & "ktparams_present":
      # WB7 - AllowSoftSlots indicates a softcard-protected key
      if action.details.ktparams.flags & "AllowSoftSlots":
        protection["softcard"] = True
      else:
        protection["cardset"] = True
    else:
      raise Exception("ACL error: makeblob action permits any token type")

  # ACLV5 - Get protection type, using weakest if more than one is present
  if protection["module"]:
    prot = "module"
  elif protection["softcard"]:
    prot = "softcard"
  elif protection["cardset"]:
    prot = "cardset"

  return prot
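The flag logic in makeblob can be exercised on its own by representing the flags as sets of names. This is a simplified sketch: classify_protection is a hypothetical helper, it omits the comparison of kmhash against hkm, and it follows the WB1 comment by rejecting both "both" and "neither".

```python
def classify_protection(flags, kt_flags=frozenset()):
  """Return 'module', 'softcard' or 'cardset' for a set of MakeBlob flag names."""
  km_only = "AllowKmOnly" in flags
  kt_present = "kthash_present" in flags
  if km_only == kt_present:
    # WB1 - exactly one of module or token protection must be selected
    raise ValueError("makeblob action permits incoherent protection")
  if "kmhash_present" not in flags:
    # WB2 - a module key hash must be present
    raise ValueError("makeblob action specifies no module key")
  if "AllowNullKmToken" in flags:
    # WB3 - a null module key is not acceptable
    raise ValueError("makeblob action permits null module key")
  if km_only:
    # WB5 - module-protected key
    return "module"
  if "ktparams_present" not in flags:
    # WB6 - token protection requires ktparams
    raise ValueError("makeblob action permits any token type")
  # WB7 - AllowSoftSlots distinguishes softcards from card sets
  return "softcard" if "AllowSoftSlots" in kt_flags else "cardset"

print(classify_protection({"AllowNonKM0", "AllowKmOnly", "kmhash_present"}))
```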

Act_MakeArchiveBlob (RB1-3,5)

This allows for the creation of a recovery blob. The flag kahash_present is required and the mechanism must be correct for the ciphersuite.

RECOVERY_MECH = {
  "DLf1024s160mDES3": "RSApPKCS1",
  "DLf1024s160mRijndael": "BlobCryptv2kRSAeRijndaelCBC0hSHA512mSHA512HMAC",
  "DLf3072s256mRijndael": "BlobCryptv2kRSAeRijndaelCBC0hSHA512mSHA512HMAC",
  "DLf3072s256mAEScSP800131Ar1": "BlobCryptv3kRSAOAEPeAESCBC0dCTRCMACmSHA512HMAC",
}

def makearchiveblob(action):
  if "hkre" not in hashes:
    # RB1 - hkre must be present and trusted
    raise Exception("ACL error: makearchiveblob but no hkre in bundle")
  elif not (action.details.flags & "kahash_present"):
    # RB2 - kahash must be present
    raise Exception("ACL error: makearchiveblob action specifies no blobbing key")
  elif hashes["hkre"].mech != "SHA1Hash" or action.details.kahash != hashes["hkre"].data.hash:
    # RB2 - kahash must match hkre
    raise Exception("ACL error: makearchiveblob action specifies unsuitable blobbing key")
  elif action.details.mech != RECOVERY_MECH[ciphersuite]:
    # RB3 - recovery mech must match ciphersuite
    raise Exception("ACL error: makearchiveblob action specifies unsuitable blobbing mechanism")
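The lookup RECOVERY_MECH[ciphersuite] in the RB3 check raises a bare KeyError if the bundle names a ciphersuite that is not in the table. A small wrapper can turn that into a descriptive error. expected_recovery_mech is a hypothetical helper, and the table is copied from the listing above.

```python
RECOVERY_MECH = {
  "DLf1024s160mDES3": "RSApPKCS1",
  "DLf1024s160mRijndael": "BlobCryptv2kRSAeRijndaelCBC0hSHA512mSHA512HMAC",
  "DLf3072s256mRijndael": "BlobCryptv2kRSAeRijndaelCBC0hSHA512mSHA512HMAC",
  "DLf3072s256mAEScSP800131Ar1": "BlobCryptv3kRSAOAEPeAESCBC0dCTRCMACmSHA512HMAC",
}

def expected_recovery_mech(ciphersuite):
  """Return the recovery mechanism for a ciphersuite, or raise a clear error."""
  try:
    return RECOVERY_MECH[ciphersuite]
  except KeyError:
    raise ValueError(
      f"ACL error: no recovery mechanism known for ciphersuite {ciphersuite!r}"
    ) from None

print(expected_recovery_mech("DLf1024s160mDES3"))
```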