Trust Block Verification

This guide explains how to verify the cryptographic signatures on Trust Blocks retrieved from the EA Consent Delta Share.

Table of Contents

  1. Trust Block Verification
    1. Why Verify Trust Blocks?
    2. Verification Workflow Overview
    3. Step 1: Retrieve the Trust Block
    4. Step 2: Extract Key ID from JWT Header
    5. Step 3: Query for the Verification Key
    6. Step 4: Decode the Key Material
    7. Step 5: Verify the JWT Signature
    8. Handling Key Rotation
      1. Key States
      2. Multiple Keys per Issuer
      3. Detecting New or Revoked Keys
    9. Handling Verification Failures
      1. Common Failure Scenarios
      2. Verification Failure Response
    10. Complete Verification Example
    11. Security Considerations
    12. Next Steps

Why Verify Trust Blocks?

Trust Blocks contain signed EA Consent Verifiable Credentials. Verification ensures:

  1. Integrity - The consent record has not been tampered with
  2. Authenticity - The consent was issued by a trusted issuer
  3. Non-repudiation - The issuer cannot deny issuing the consent

Important: Always verify Trust Block signatures before enforcing consent decisions.


Verification Workflow Overview

1. Retrieve Trust Block from ea-consent-tb table
         │
         ▼
2. Decode JWT header to extract 'kid' and 'iss' (issuer)
         │
         ▼
3. Query ea-consent-verification-keys for the active public key
         │
         ▼
4. Decode base64-encoded key material
         │
         ▼
5. Verify JWT signature using the public key
         │
         ▼
6. If valid → enforce consent
   If invalid → reject/alert

Step 1: Retrieve the Trust Block

First, retrieve the Trust Block from the ea-consent-tb table:

import delta_sharing

profile_file = "/path/to/profile.json"
vc_url = f"{profile_file}#ea-consent-share.ea-consent-schema.ea-consent-tb"

df_vc = delta_sharing.load_as_pandas(vc_url)

# Get a specific Trust Block
consent = df_vc[
    (df_vc['consent_issuer'] == 'https://issuer.example.org') &
    (df_vc['consent_id'] == '85389cfb-75c5-434e-b2e6-651fc75a6ae5')
].iloc[0]

trust_block = consent['trust_block']
format_type = consent['trust_block_format_type']

print(f"Format: {format_type}")
print(f"Trust Block: {trust_block[:80]}...")

The trust_block_format_type will typically be COMPACT_JWT, indicating a standard JWT format.


Step 2: Extract Key ID from JWT Header

Decode the JWT header (without verification) to extract the key identifier:

import base64
import json

def decode_jwt_header(jwt_string):
    """Decode JWT header to extract kid and issuer."""
    # Split JWT into parts
    parts = jwt_string.split('.')
    if len(parts) != 3:
        raise ValueError("Invalid JWT format")

    # Decode header (first part)
    header_b64 = parts[0]
    # Add padding if needed
    padding = 4 - len(header_b64) % 4
    if padding != 4:
        header_b64 += '=' * padding

    header_json = base64.urlsafe_b64decode(header_b64)
    header = json.loads(header_json)

    return header

# Extract header info
header = decode_jwt_header(trust_block)
kid = header.get('kid')  # Key ID
alg = header.get('alg')  # Algorithm (e.g., ES256, RS256)

print(f"Key ID (kid): {kid}")
print(f"Algorithm: {alg}")

Note: The issuer for key lookup comes from the JWT payload (iss claim), not the header. For Trust Blocks, use the consent_issuer field from the database which matches this value.


Step 3: Query for the Verification Key

Query ea-consent-verification-keys to get the active public key:

SELECT *
FROM ea_consent_verification_keys
WHERE issuer = '<issuer>'
  AND kid = '<kid>'
  AND revocation_ts IS NULL
ORDER BY ingestion_ts DESC
LIMIT 1;

Python:

keys_url = f"{profile_file}#ea-consent-share.ea-consent-schema.ea-consent-verification-keys"
df_keys = delta_sharing.load_as_pandas(keys_url)

# Find the active key for this issuer and kid
issuer = consent['consent_issuer']

active_key = df_keys[
    (df_keys['issuer'] == issuer) &
    (df_keys['kid'] == kid) &
    (df_keys['revocation_ts'].isna())
].sort_values('ingestion_ts', ascending=False).iloc[0]

print(f"Found key:")
print(f"  Issuer: {active_key['issuer']}")
print(f"  Key ID: {active_key['kid']}")
print(f"  Algorithm: {active_key['alg']}")
print(f"  Key Type: {active_key['kty']}")
print(f"  Format: {active_key['format_type']}")

Step 4: Decode the Key Material

Keys are stored as base64-encoded JWK JSON. Decode to get the public key:

import base64
import json

def decode_verification_key(key_row):
    """Decode base64-encoded key material to JWK dict."""
    format_type = key_row['format_type']
    value_b64 = key_row['value']

    if format_type == 'BASE64_JWK':
        # Decode base64 to get JWK JSON
        jwk_json = base64.b64decode(value_b64)
        jwk = json.loads(jwk_json)
        return jwk
    elif format_type == 'BASE64_PEM':
        # Decode base64 to get PEM string
        pem = base64.b64decode(value_b64).decode('utf-8')
        return pem
    else:
        raise ValueError(f"Unknown format type: {format_type}")

jwk = decode_verification_key(active_key)
print(f"JWK: {json.dumps(jwk, indent=2)}")

Step 5: Verify the JWT Signature

Use a JWT library to verify the signature. Example using PyJWT:

import jwt
from jwt import PyJWK

def verify_trust_block(trust_block, jwk_dict):
    """Verify Trust Block JWT signature using JWK."""
    # Convert JWK dict to PyJWT key object
    key = PyJWK.from_dict(jwk_dict).key

    try:
        # Verify and decode the JWT
        # Note: You may need to specify allowed algorithms
        payload = jwt.decode(
            trust_block,
            key,
            algorithms=['ES256', 'ES384', 'ES512', 'RS256', 'RS384', 'RS512'],
            options={'verify_aud': False}  # Adjust based on your requirements
        )
        return True, payload
    except jwt.InvalidSignatureError:
        return False, "Invalid signature"
    except jwt.ExpiredSignatureError:
        return False, "Token expired"
    except Exception as e:
        return False, str(e)

# Verify the Trust Block
is_valid, result = verify_trust_block(trust_block, jwk)

if is_valid:
    print("✓ Signature verified successfully")
    print(f"Payload: {json.dumps(result, indent=2, default=str)}")
else:
    print(f"✗ Verification failed: {result}")

Install PyJWT with cryptography support:

pip install PyJWT[crypto]

Handling Key Rotation

Issuers may rotate keys over time. The ea-consent-verification-keys table tracks all key versions:

Key States

State Condition Meaning
Active revocation_ts IS NULL Key is valid for verification
Revoked revocation_ts IS NOT NULL Key should not be used

Multiple Keys per Issuer

An issuer may have multiple active keys (during rotation):

# Get all active keys for an issuer
issuer_keys = df_keys[
    (df_keys['issuer'] == issuer) &
    (df_keys['revocation_ts'].isna())
]

print(f"Active keys for {issuer}:")
for _, key in issuer_keys.iterrows():
    print(f"  - {key['kid']} (alg: {key['alg']})")

Detecting New or Revoked Keys

Monitor for key changes using incremental queries:

SELECT *
FROM ea_consent_verification_keys
WHERE ingestion_ts > '<last_check_ts>'
ORDER BY ingestion_ts ASC;

Handling Verification Failures

Common Failure Scenarios

Failure Cause Action
Key not found Key rotated or wrong issuer Check issuer URL matches exactly
Key revoked Key was compromised or retired Do not accept consents signed with this key
Invalid signature Tampering or wrong key Reject the consent, investigate
Algorithm mismatch JWT alg doesn’t match key Ensure alg in JWT header matches key’s alg

Verification Failure Response

def handle_consent(trust_block, consent_id):
    """Process a consent with verification."""
    # Get key
    key = get_active_key(trust_block)
    if key is None:
        log_warning(f"No active key found for consent {consent_id}")
        return False, "key_not_found"

    # Check if key is revoked
    if key['revocation_ts'] is not None:
        log_warning(f"Key {key['kid']} is revoked")
        return False, "key_revoked"

    # Verify signature
    is_valid, result = verify_trust_block(trust_block, decode_key(key))

    if not is_valid:
        log_error(f"Verification failed for {consent_id}: {result}")
        return False, "invalid_signature"

    return True, result

Complete Verification Example

Putting it all together:

import delta_sharing
import base64
import json
import jwt
from jwt import PyJWK

def verify_consent(profile_file, consent_issuer, consent_id):
    """
    Complete workflow to retrieve and verify a consent.

    Returns:
        tuple: (is_valid: bool, payload_or_error: dict|str)
    """
    # 1. Load tables
    vc_url = f"{profile_file}#ea-consent-share.ea-consent-schema.ea-consent-tb"
    keys_url = f"{profile_file}#ea-consent-share.ea-consent-schema.ea-consent-verification-keys"

    df_vc = delta_sharing.load_as_pandas(vc_url)
    df_keys = delta_sharing.load_as_pandas(keys_url)

    # 2. Get the Trust Block
    consent_rows = df_vc[
        (df_vc['consent_issuer'] == consent_issuer) &
        (df_vc['consent_id'] == consent_id)
    ]

    if len(consent_rows) == 0:
        return False, "consent_not_found"

    trust_block = consent_rows.iloc[0]['trust_block']

    # 3. Extract kid from JWT header
    header_b64 = trust_block.split('.')[0]
    padding = 4 - len(header_b64) % 4
    if padding != 4:
        header_b64 += '=' * padding
    header = json.loads(base64.urlsafe_b64decode(header_b64))
    kid = header.get('kid')

    if not kid:
        return False, "missing_kid_in_jwt"

    # 4. Get the active verification key
    active_keys = df_keys[
        (df_keys['issuer'] == consent_issuer) &
        (df_keys['kid'] == kid) &
        (df_keys['revocation_ts'].isna())
    ].sort_values('ingestion_ts', ascending=False)

    if len(active_keys) == 0:
        return False, f"key_not_found: issuer={consent_issuer}, kid={kid}"

    key_row = active_keys.iloc[0]

    # 5. Decode the JWK
    jwk_json = base64.b64decode(key_row['value'])
    jwk = json.loads(jwk_json)

    # 6. Verify the signature
    try:
        key = PyJWK.from_dict(jwk).key
        payload = jwt.decode(
            trust_block,
            key,
            algorithms=[key_row['alg']],
            options={'verify_aud': False}
        )
        return True, payload
    except jwt.InvalidSignatureError:
        return False, "invalid_signature"
    except Exception as e:
        return False, f"verification_error: {str(e)}"


# Usage
profile = "/path/to/profile.json"
is_valid, result = verify_consent(
    profile,
    "https://issuer.example.org",
    "85389cfb-75c5-434e-b2e6-651fc75a6ae5"
)

if is_valid:
    print("✓ Consent verified")
    print(f"Issuer: {result.get('iss')}")
    print(f"Subject: {result.get('sub')}")
else:
    print(f"✗ Verification failed: {result}")

Security Considerations

  1. Always verify before enforcement - Never trust consent data without signature verification
  2. Cache keys carefully - If caching verification keys, implement cache invalidation on key rotation
  3. Check revocation - Always check revocation_ts is NULL before using a key
  4. Log verification failures - Track failed verifications for security monitoring
  5. Use latest key version - When multiple versions exist, use the most recent (ORDER BY ingestion_ts DESC)

Next Steps