Loading vLEI.wiki
Fetching knowledge base...
Fetching knowledge base...
This comprehensive explanation has been generated from 4 GitHub source documents. All source documents are searchable here.
Last updated: September 21, 2025
This content is meant to be consumed by AI agents via MCP. Click here to get the MCP configuration.
Note: In rare cases it may contain LLM hallucinations.
For authoritative documentation, please consult the official GLEIF vLEI trainings and the ToIP Glossary.
The drt event type represents a delegated rotation event in the KERI protocol, enabling key rotation for delegated Autonomic Identifiers (AIDs) through a cooperative cryptographic process between delegator and delegate. The term "deltate" derives from the mathematical delta symbol (Δ), representing change or transformation in the delegated key state.
drt := {
v: version_string,
t: "drt",
d: SAID(event),
i: delegated_AID_prefix,
s: sequence_number,
p: prior_event_digest,
kt: current_key_threshold,
k: [current_signing_keys],
nt: next_key_threshold,
n: [next_key_digests],
bt: witness_threshold,
br: [witness_cuts],
ba: [witness_adds],
a: [anchoring_seals],
di: delegator_AID_prefix
}
v (Version): CESR-encoded version string (e.g., KERI10JSON00011c_)
t (Type): Fixed string "drt" identifying delegated rotation event
d (Digest/SAID): Self-Addressing Identifier computed from event content
i (Identifier): 44-character Base64 prefix of the delegated AID
s (Sequence): Monotonically increasing integer sequence number
p (Prior): 44-character digest of immediately prior event in KEL
kt (Key Threshold): String representation of M-of-N threshold (e.g., "2", "1/2")
Problem: Delegate creates rotation event before delegator has anchored it, causing validation failures.
Solution: Implement proper event ordering and escrow mechanisms:
class DelegationEscrow:
    """Hold delegated rotation (drt) events until a delegator anchor arrives.

    Escrowed events live in ``pending_rotations``, keyed by the event's
    ``'d'`` value. Each entry records the event, the time it was escrowed
    and a status string.
    """

    def __init__(self):
        self.pending_rotations = {}

    def escrow_rotation(self, drt_event):
        """Hold rotation until delegator anchoring confirmed"""
        self.pending_rotations[drt_event['d']] = {
            'event': drt_event,
            'timestamp': time.time(),
            'status': 'awaiting_anchor'
        }

    def release_from_escrow(self, said):
        """Remove the entry stored under ``said`` and return its event.

        Returns ``None`` when nothing is escrowed under ``said``.
        """
        entry = self.pending_rotations.pop(said, None)
        if entry is None:
            return None
        return entry['event']

    def process_delegator_anchor(self, anchor_event):
        """Process pending rotations when anchor received

        Every seal in the anchor event's ``'a'`` list whose ``'d'`` value
        matches an escrowed rotation releases that rotation.

        Returns:
            The list of events released by this anchor (possibly empty).
        """
        released = []
        for seal in anchor_event.get('a', []):
            # Not every seal carries a 'd' digest; skip those instead of
            # failing the whole anchor with a KeyError.
            said = seal.get('d') if isinstance(seal, dict) else None
            if said in self.pending_rotations:
                released.append(self.release_from_escrow(said))
        return released
Problem: Including the d field in SAID computation creates circular dependency.
Solution: Use proper SAID derivation with field replacement:
def compute_drt_said(event_dict):
    """Compute SAID for drt event with proper field handling

    The ``'d'`` field is replaced by a 44-character ``'#'`` placeholder so
    the digest does not depend on its own value, the event is serialized as
    compact JSON, and the Blake3 digest of that serialization is returned
    as a 44-character string with the ``'E'`` code.

    Args:
        event_dict: event fields, already in the order they must be
            serialized in. The input is not modified.

    Returns:
        The 44-character SAID string.
    """
    # Work on a shallow copy so the caller's dict keeps its own 'd' value.
    temp_event = dict(event_dict)
    temp_event['d'] = '#' * 44  # Dummy placeholder, same length as the SAID

    # Keep the dict's own field order: the digest covers the serialization
    # byte-for-byte, and sorting keys alphabetically would produce a
    # different byte string than the protocol-defined field order.
    serialized = json.dumps(temp_event, separators=(',', ':'))

    # Compute Blake3 digest (32 bytes by default)
    digest = blake3.blake3(serialized.encode('utf-8')).digest()

    # CESR text encoding of a 32-byte value: prepend one zero pad byte so
    # the 33 bytes map to exactly 44 Base64 characters with no '=' padding,
    # then overwrite the leading character (always 'A') with the code 'E'.
    padded = base64.urlsafe_b64encode(b'\x00' + digest).decode('ascii')
    return 'E' + padded[1:]
def batch_validate_drt_events(events, delegator_kel):
    """Validate several drt events against one delegator KEL.

    A seal index is built once from ``delegator_kel`` and reused for every
    event. Events are checked in ascending ``int(event['s'])`` order, and
    the returned list follows that sorted order, not the input order.

    Returns:
        One entry per event: ``ValidationResult.VALID`` or
        ``ValidationResult.INVALID(<error text>)``.
    """
    # Build the lookup once up front rather than per event
    seal_lookup = build_seal_index(delegator_kel)

    outcomes = []
    for drt in sorted(events, key=lambda item: int(item['s'])):
        try:
            validate_drt_with_index(drt, seal_lookup)
        except ValidationError as err:
            outcomes.append(ValidationResult.INVALID(str(err)))
        else:
            outcomes.append(ValidationResult.VALID)
    return outcomes
class StreamingKELValidator:
    """Validate KEL events one at a time as they are read from a stream."""

    def __init__(self, kel_stream):
        self.stream = kel_stream
        self.last_establishment = None
        self.sequence_tracker = 0

    def validate_next_drt(self):
        """Read the next event and, if it is a drt event, validate it.

        Returns:
            The event that was read, whatever its type.
        """
        event = self.stream.read_next_event()

        # Anything other than a delegated rotation passes through untouched.
        if event['t'] != 'drt':
            return event

        # NOTE(review): the source lost its indentation; the three statements
        # below are assumed to all belong to the drt branch - confirm.
        self.validate_against_prior(event, self.last_establishment)
        self.last_establishment = event
        self.sequence_tracker += 1
        return event
k (Keys): Ordered array of current signing key identifiers
nt (Next Threshold): Threshold for next establishment event
n (Next): Ordered array of cryptographic digests of pre-rotated keys
bt (Backer Threshold): Witness signature threshold requirement
br (Backer Removes): Array of witness AIDs to remove from witness pool
ba (Backer Adds): Array of witness AIDs to add to witness pool
a (Anchors): Array of seal objects for external data anchoring
di (Delegator Identifier): AID prefix of the delegating authority
Delegated rotation events utilize CESR's qualified cryptographic primitives:
Event Structure (CESR):
┌─────────────────┬──────────────────┬─────────────────┐
│ Count Code │ Event Data │ Signatures │
│ (4 chars) │ (variable) │ (variable) │
└─────────────────┴──────────────────┴─────────────────┘
Signature Attachment:
┌─────────────────┬──────────────────┬─────────────────┐
│ Counter Code │ Indexed Sigs │ Witness Receipts│
│ -AAB (indexed) │ (88 chars each) │ (optional) │
└─────────────────┴──────────────────┴─────────────────┘
# drt event with new key material
def validate_drt_transition(prior_event, drt_event, delegator_kel):
    """Check that ``drt_event`` is a valid successor of ``prior_event``.

    The checks run in order: sequence progression, prior-event digest,
    current key threshold, next key threshold, and presence of a delegation
    seal for the event in ``delegator_kel``.

    Returns:
        True when every check passes.

    Raises:
        AssertionError: on the first failed check. The checks are raised
            explicitly rather than written as ``assert`` statements so they
            still run when Python is started with ``-O``.
    """
    # Verify sequence progression
    if drt_event.s != prior_event.s + 1:
        raise AssertionError(
            f"sequence {drt_event.s!r} does not follow {prior_event.s!r}"
        )

    # Verify prior event digest
    if drt_event.p != compute_digest(prior_event):
        raise AssertionError("prior event digest mismatch")

    # Verify key threshold satisfaction
    if len(drt_event.k) < int(drt_event.kt):
        raise AssertionError("fewer current keys than the key threshold")

    # Verify pre-rotation commitment
    if len(drt_event.n) < int(drt_event.nt):
        raise AssertionError("fewer next key digests than the next threshold")

    # Verify delegator anchoring
    if find_delegation_seal(delegator_kel, drt_event.d) is None:
        raise AssertionError("no delegation seal found for event")

    return True
Delegated rotation events require multi-party cryptographic validation:
The n field contains Blake3-256 digests of pre-rotated key pairs:
for each next_key in rotation_set:
key_digest = Blake3_256(next_key.public_bytes)
n_array.append(CESR_encode(key_digest))
This provides post-quantum security by cryptographically committing to keys that remain unexposed until the subsequent rotation event.
KERI supports both numeric and fractional thresholds:
# "3" requires exactly 3 signatures
# "2/3" requires 2 signatures from a 3-key set
def create_drt_event(delegated_aid, new_keys, next_keys, delegator_aid,
                     witness_config=None, anchors=None):
    """
    Create delegated rotation event with proper field ordering

    The event is built on top of the latest establishment event of
    ``delegated_aid``. Optional witness fields and anchoring seals are
    inserted before ``"di"`` so that ``"di"`` stays the last field, matching
    the field order listed for drt events in this document.

    Args:
        delegated_aid: identifier stored under ``"i"``.
        new_keys: current signing keys; each must expose ``qb64``.
        next_keys: pre-rotated keys; each must expose ``qb64b``.
        delegator_aid: identifier stored under ``"di"``.
        witness_config: optional mapping merged into the event as-is.
        anchors: optional value stored under ``"a"``.

    Returns:
        The event dict with ``"d"`` set to the result of ``compute_said``.
    """
    prior_event = get_latest_establishment_event(delegated_aid)

    event = {
        "v": KERI_VERSION,
        "t": "drt",
        "d": "",  # Placeholder for SAID computation
        "i": delegated_aid,
        "s": prior_event["s"] + 1,
        "p": compute_digest(prior_event),
        "kt": str(len(new_keys)),  # Simplified threshold
        "k": [key.qb64 for key in new_keys],
        "nt": str(len(next_keys)),
        "n": [compute_digest(key.qb64b) for key in next_keys],
    }

    # Add witness configuration if provided
    if witness_config:
        event.update(witness_config)

    # Add anchoring seals if provided
    if anchors:
        event["a"] = anchors

    # Append "di" last. pop() keeps a "di" supplied through witness_config
    # taking precedence, while still moving the field to the end.
    event["di"] = event.pop("di", delegator_aid)

    # Compute SAID
    event["d"] = compute_said(event)

    return event
class DRTValidator:
    """Run the validation phases for a drt event, in a fixed order."""

    # Phase methods, listed in the order they are run.
    _PHASES = (
        "_validate_structure",             # Phase 1: Structural validation
        "_validate_sequence",              # Phase 2: Sequence validation
        "_validate_signatures",            # Phase 3: Cryptographic validation
        "_validate_delegation_authority",  # Phase 4: Delegation validation
        "_validate_witness_consensus",     # Phase 5: Witness validation
    )

    def __init__(self, delegated_kel, delegator_kel):
        self.delegated_kel = delegated_kel
        self.delegator_kel = delegator_kel

    def validate(self, drt_event):
        """Run every phase on ``drt_event`` and return ``ValidationResult.VALID``.

        Any exception raised by a phase propagates to the caller.
        """
        for phase_name in self._PHASES:
            # Look each phase up just before calling it, one after another.
            getattr(self, phase_name)(drt_event)
        return ValidationResult.VALID
graph TD
A[dip - Delegated Inception] --> B[drt - Delegated Rotation]
B --> C[drt - Subsequent Rotation]
B --> D[ixn - Interaction Event]
E[Delegator ixn] --> B
B --> F[Witness Receipts]
B --> G[Watcher Confirmations]
Delegated rotation events can anchor ACDC credential operations:
{
"a": [
{
"i": "credential_registry_AID",
"s": "0",
"d": "credential_event_digest"
}
]
}
DRT events integrate seamlessly into CESR streams for efficient processing:
CESR Stream Format:
{"v":"KERI10JSON00011c_","t":"drt",...}-AABAA{signatures}
class InsufficientDelegationAuthority(KERIError):
    """Raised when delegator hasn't properly anchored rotation"""

    def __init__(self, delegated_aid, delegator_aid):
        self.message = f"No delegation seal found in {delegator_aid} for {delegated_aid}"
        # Pass the message to the base class so str(exc), exc.args and
        # tracebacks show it instead of an empty string.
        super().__init__(self.message)
class ThresholdViolation(KERIError):
    """Raised when signature threshold not satisfied"""

    def __init__(self, required, provided):
        self.message = f"Required {required} signatures, got {provided}"
        # Pass the message to the base class so str(exc), exc.args and
        # tracebacks show it instead of an empty string.
        super().__init__(self.message)
def detect_sequence_gaps(kel):
    """Detect missing events in KEL sequence

    Collects the ``"s"`` value of every event and checks that each number
    from 0 up to the highest one seen is present.

    Args:
        kel: iterable of event mappings with an ``"s"`` field. Integer
            values are used as-is; string values are parsed as hexadecimal.

    Raises:
        SequenceGapError: if any sequence number below the highest one seen
            is missing.
    """
    seen = set()
    for event in kel:
        raw = event["s"]
        seen.add(int(raw, 16) if isinstance(raw, str) else int(raw))

    if not seen:
        return

    # Compare against 0..max rather than 0..len-1: with the event count as
    # the bound, a repeated sequence number is reported as a gap that does
    # not exist, and several gaps are only partially reported.
    gaps = set(range(max(seen) + 1)) - seen
    if gaps:
        raise SequenceGapError(f"Missing sequences: {gaps}")
When delegator is unavailable for anchoring:
def emergency_rotation(compromised_aid, recovery_keys, delegator_signature,
                       delegator_aid=None):
    """Emergency rotation for compromised delegated AID

    Builds a drt-typed event flagged with ``"emergency": True`` and passes
    it, together with ``delegator_signature``, to
    ``validate_delegator_signature`` before returning it.

    Args:
        compromised_aid: identifier stored under ``"i"``.
        recovery_keys: value stored under ``"k"``.
        delegator_signature: signature handed to the validation helper.
        delegator_aid: identifier stored under ``"di"``. The body previously
            read this name without it being a parameter or a local, so it
            is now an explicit keyword argument.

    Returns:
        The emergency event dict.

    Raises:
        ValueError: if ``delegator_aid`` is not supplied.
    """
    if delegator_aid is None:
        raise ValueError("delegator_aid is required to build an emergency drt event")

    # Create emergency rotation event
    emergency_drt = {
        "v": KERI_VERSION,
        "t": "drt",
        "i": compromised_aid,
        "emergency": True,  # Special flag
        "k": recovery_keys,
        "di": delegator_aid
    }

    # Require delegator signature for emergency rotation
    validate_delegator_signature(emergency_drt, delegator_signature)

    return emergency_drt
DRT events must comply with:
| KERI Version | DRT Support | Features |
|---|---|---|
| 1.0 | Full | Basic delegation |
| 1.1 | Enhanced | Partial rotation support |
| 2.0 | Extended | Weighted thresholds |
Delegated Identity Service:
components:
- delegate_controller: Manages delegated AIDs
- delegator_interface: Communicates with delegating authority
- witness_network: Provides consensus and availability
- watcher_service: Monitors for duplicity
scaling:
- horizontal: Multiple delegate controllers
- vertical: Increased witness pool size
- geographic: Distributed witness deployment
# Alert condition identifiers mapped to a human-readable description of each.
ALERT_CONDITIONS = {
"delegation_timeout": "No delegator anchoring within 24 hours",
"witness_unavailable": "Witness threshold not achievable",
"sequence_gap": "Missing events detected in KEL",
"duplicity_detected": "Conflicting rotation events found"
}
class SecureDRTSigner:
    """Sign delegated rotation events through an HSM interface."""

    def __init__(self, hsm_interface):
        self.hsm = hsm_interface
        self.signing_policy = SigningPolicy()

    def sign_drt_event(self, event, key_identifiers):
        """Return one signature per key identifier for ``event``.

        The signing policy is asked to authorize the rotation first. Each
        signature is then requested from the HSM over the UTF-8 bytes of
        ``event['d']`` with algorithm ``'Ed25519'``.

        Returns:
            Signatures in the same order as ``key_identifiers``.
        """
        # Authorization comes first; a refusal stops before any HSM call.
        self.signing_policy.authorize_rotation(event, key_identifiers)

        return [
            self.hsm.sign(
                key_id=identifier,
                data=event['d'].encode('utf-8'),
                algorithm='Ed25519',
            )
            for identifier in key_identifiers
        ]
def verify_delegator_authority(drt_event, delegator_kel):
    """Check that the delegator has authorized ``drt_event``.

    Four checks run in order and the first failure raises: format of the
    delegator AID in ``drt_event['di']``, integrity of ``delegator_kel``,
    existence of a delegation seal for ``drt_event['d']``, and the seal's
    signature against the delegator's current keys.

    Returns:
        True when all checks pass.

    Raises:
        SecurityError: with a message naming the failed check.
    """
    # 1. Delegator identifier must be well-formed
    delegator_prefix = drt_event['di']
    if not validate_aid_format(delegator_prefix):
        raise SecurityError("Invalid delegator AID format")

    # 2. Delegator KEL must be intact
    if not verify_kel_integrity(delegator_kel):
        raise SecurityError("Delegator KEL integrity compromised")

    # 3. A seal for this event must be present in the delegator KEL
    anchoring_seal = find_delegation_seal(delegator_kel, drt_event['d'])
    if not anchoring_seal:
        raise SecurityError("No delegation seal found")

    # 4. The seal must verify against the delegator's current keys
    if not verify_seal_signature(anchoring_seal, get_current_keys(delegator_kel)):
        raise SecurityError("Invalid delegation seal signature")

    return True
from hypothesis import given, strategies as st
@given(
    sequence=st.integers(min_value=1, max_value=1000),
    key_count=st.integers(min_value=1, max_value=10),
    threshold=st.integers(min_value=1, max_value=10),
)
def test_drt_event_properties(sequence, key_count, threshold):
    """Check invariants of generated drt events over random parameters."""
    # Clamp the drawn threshold so it never exceeds the number of keys
    threshold = min(threshold, key_count)

    event = generate_test_drt(
        sequence=sequence,
        key_count=key_count,
        threshold=threshold,
    )

    # Thresholds must be satisfiable by the keys / digests present
    assert int(event['kt']) <= len(event['k'])
    assert int(event['nt']) <= len(event['n'])
    # The requested sequence number and the event type must be preserved
    assert int(event['s']) == sequence
    assert event['t'] == 'drt'
class DRTIntegrationTest:
    """End-to-end testing for delegated rotation"""

    def test_full_delegation_lifecycle(self):
        """Test complete delegation from inception to rotation"""
        # Setup delegator and delegate
        delegator = create_test_aid()
        delegate = create_delegated_aid(delegator)

        # Perform rotation. create_drt_event takes
        # (delegated_aid, new_keys, next_keys, delegator_aid), so the
        # pre-rotated key set must be passed before the delegator.
        new_keys = generate_key_pairs(2)
        next_keys = generate_key_pairs(2)
        drt_event = create_drt_event(delegate, new_keys, next_keys, delegator)

        # Verify rotation success
        assert validate_drt_event(drt_event)
        assert get_current_keys(delegate) == new_keys

        # Verify KEL integrity
        assert verify_kel_integrity(delegate.kel)