This comprehensive explanation has been generated from 4 GitHub source documents. All source documents are searchable here.
Last updated: September 21, 2025
Note: In rare cases it may contain LLM hallucinations.
For authoritative documentation, please consult the official GLEIF vLEI trainings and the ToIP Glossary.
A specialized KERIA class that pushes events occurring inside the cloud agent to external backend processes and web services, serving as an event distribution mechanism distinct from the general-purpose notifier class.
The interceptor is a specialized event distribution class within the KERIA (KERI Agent in the cloud) framework that implements an observer pattern for propagating internal cloud agent events to external backend systems. Unlike the general-purpose notifier class, the interceptor is specifically architected for inter-service communication and event streaming to external web services and backend processes.
class Interceptor:
    """Event interceptor for pushing cloud agent events to external backends"""
    def __init__(self, backends: List[BackendService]):
        self.backends = backends
        self.event_queue = asyncio.Queue()
        self.filters = EventFilterChain()
The interceptor operates on KERI event structures with the following core data format:
{
  "event_type": "string",        // KEL event type (icp, rot, ixn, dip, drt)
  "sequence_number": "integer",  // Event sequence in KEL
  "said": "string",              // Self-Addressing Identifier of event
  "timestamp": "ISO8601",        // Event occurrence timestamp
  "agent_id": "string",          // Source KERIA agent identifier
  "payload": "object",           // Event-specific data structure
  "signatures": ["array"],       // Cryptographic signatures
  "metadata": {
    "source": "string",          // Internal component source
    "priority": "integer",       // Event priority (0-255)
    "retry_count": "integer",    // Delivery attempt counter
    "correlation_id": "string"   // Request correlation identifier
  }
}
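A consumer can sanity-check this envelope before acting on it. The following sketch validates required fields, the event type, and the priority range; the helper name and sample values are illustrative, not part of KERIA:

```python
REQUIRED_FIELDS = {"event_type", "sequence_number", "said", "timestamp",
                   "agent_id", "payload", "signatures", "metadata"}
VALID_ILKS = {"icp", "rot", "ixn", "dip", "drt"}

def validate_envelope(event: dict) -> list:
    """Return a list of problems; an empty list means the envelope is well-formed."""
    problems = [f"missing field: {f}" for f in REQUIRED_FIELDS - event.keys()]
    if event.get("event_type") not in VALID_ILKS:
        problems.append(f"unknown event_type: {event.get('event_type')}")
    priority = event.get("metadata", {}).get("priority", 0)
    if not 0 <= priority <= 255:
        problems.append(f"priority out of range: {priority}")
    return problems

# Illustrative envelope; identifier values are placeholders
sample = {
    "event_type": "icp", "sequence_number": 0, "said": "E-placeholder",
    "timestamp": "2024-01-15T10:30:00Z", "agent_id": "E-placeholder",
    "payload": {}, "signatures": [],
    "metadata": {"source": "kel", "priority": 5,
                 "retry_count": 0, "correlation_id": "c-1"},
}
```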
Common Pitfalls:
- Memory Leaks in Event Queue: bound the queue with `asyncio.Queue(maxsize=10000)` and drop the oldest events on overflow.
- Race Conditions in Circuit Breaker: guard state transitions with an `asyncio.Lock()`.
- Event Ordering Violations:
Performance Optimizations:
Batch Event Delivery:
# Batch multiple events for efficient delivery
async def batch_deliver(self, backend: Backend, events: List[InterceptedEvent]):
    batch_payload = {
        'events': [event.serialize() for event in events],
        'batch_id': str(uuid.uuid4()),
        'batch_size': len(events)
    }
    # Single HTTP request for multiple events
    await self.deliver_batch(backend, batch_payload)
Connection Pooling: reuse a single `aiohttp.ClientSession` with a pooled connector, e.g. `connector=aiohttp.TCPConnector(limit=100)`.
Event Serialization Caching: memoize serialized payloads with `@lru_cache(maxsize=1000)`.
Authentication Token Management:
Inception Event Interception:
class InceptionEventData:
    prefix: str           # 44-character Base64 AID prefix
    sn: int               # Sequence number (always 0 for inception)
    ilk: str              # Event type ('icp')
    kt: Union[int, str]   # Key threshold (integer or fractional)
    k: List[str]          # Current key set (Base64 encoded)
    nt: Union[int, str]   # Next key threshold
    n: List[str]          # Next key commitment hashes
    bt: int               # Backer threshold for witnesses
    b: List[str]          # Backer (witness) identifiers
    c: List[str]          # Configuration traits
    a: List[str]          # Anchored data seals
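To make the field list concrete, here is a hypothetical inception payload with a minimal structural check. All values are placeholders and real KERI validation is far stricter:

```python
# Illustrative inception payload (field names as in InceptionEventData;
# identifier values are made-up placeholders of the right length)
icp_event = {
    "prefix": "E" + "A" * 43,  # 44-character AID prefix
    "sn": 0,                   # inception is always sequence number 0
    "ilk": "icp",
    "kt": 1,                   # single-signature threshold
    "k": ["D" + "B" * 43],     # one current signing key
    "nt": 1,
    "n": ["E" + "C" * 43],     # one next-key digest
    "bt": 2,                   # 2 of the listed witnesses must receipt
    "b": ["B" + "D" * 43, "B" + "E" * 43, "B" + "F" * 43],
    "c": [],
    "a": [],
}

def is_valid_inception(evt: dict) -> bool:
    """Minimal structural checks for an inception event."""
    return (evt["ilk"] == "icp"
            and evt["sn"] == 0
            and len(evt["prefix"]) == 44
            and 0 <= evt["bt"] <= len(evt["b"]))
```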
Rotation Event Interception:
class RotationEventData:
    prefix: str           # AID being rotated
    sn: int               # Incremented sequence number
    ilk: str              # Event type ('rot')
    p: str                # Prior event SAID
    kt: Union[int, str]   # New key threshold
    k: List[str]          # New current keys
    nt: Union[int, str]   # Next threshold
    n: List[str]          # Next key commitments
    bt: int               # Witness threshold
    br: List[str]         # Witness cuts (removals)
    ba: List[str]         # Witness adds
    a: List[str]          # Anchored seals
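Several of the rotation invariants above can be checked mechanically. This sketch (illustrative helper, made-up values) verifies the sequence increment, prior-event chaining, and that witness cuts and adds do not overlap:

```python
def is_valid_rotation(evt: dict, prior_sn: int, prior_said: str) -> bool:
    """Minimal structural checks for a rotation event (not full KERI validation)."""
    return (evt["ilk"] == "rot"
            and evt["sn"] == prior_sn + 1           # sequence number increments by one
            and evt["p"] == prior_said              # must chain to the prior event's SAID
            and not set(evt["br"]) & set(evt["ba"]))  # cuts and adds must be disjoint

# Illustrative rotation: removes witness Bw1, adds Bw4
rot = {"ilk": "rot", "sn": 3, "p": "Eprior", "br": ["Bw1"], "ba": ["Bw4"]}
```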
Event Flow Architecture:
Event Capture Phase:
Filtering and Transformation:
async def process_event(self, raw_event: KERIEvent) -> InterceptedEvent:
    # Apply event filters
    if not self.filters.should_intercept(raw_event):
        return None
    # Transform to external format
    external_event = self.transform_event(raw_event)
    # Add metadata
    external_event.metadata.update({
        'intercepted_at': datetime.utcnow().isoformat(),
        'agent_version': self.agent_version,
        'event_hash': self.compute_event_hash(raw_event)
    })
    return external_event
Distribution Phase:
Message Flow Sequence:
sequenceDiagram
participant KA as KERIA Agent
participant IC as Interceptor
participant EB as Event Bus
participant BE as Backend Service
KA->>EB: Publish KEL Event
EB->>IC: Event Notification
IC->>IC: Apply Filters
IC->>IC: Transform Event
IC->>BE: HTTP POST /events
BE-->>IC: 202 Accepted
IC->>IC: Update Delivery Status
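The same flow can be sketched as a runnable toy, with an in-process queue standing in for the event bus and a stub coroutine in place of the HTTP POST (all names are illustrative):

```python
import asyncio

async def run_flow():
    bus = asyncio.Queue()   # stands in for the KERIA event bus
    delivered = []

    async def backend(event):     # stands in for HTTP POST /events
        delivered.append(event)
        return 202                # backend acknowledges with 202 Accepted

    async def interceptor():
        event = await bus.get()
        if event["event_type"] in {"icp", "rot"}:    # apply filters
            event = {**event, "intercepted": True}   # transform
            status = await backend(event)
            event["delivery_status"] = status        # update delivery status
        return delivered

    await bus.put({"event_type": "rot", "sequence_number": 3})
    return await interceptor()

result = asyncio.run(run_flow())
```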
HTTP Delivery Specification:
POST /keri/events HTTP/1.1
Host: backend.example.com
Content-Type: application/json
X-KERIA-Agent-ID: EABkKWjtwKiWZ0xNPStaOX_2nCGKI8-Xd6dNhKGGG4Qc
X-Event-Type: rotation
X-Correlation-ID: 550e8400-e29b-41d4-a716-446655440000
Authorization: Bearer <JWT_TOKEN>
{
  "events": [
    {
      "event_type": "rot",
      "sequence_number": 3,
      "said": "ELvaU6Z-i0d8JJR2nmwyYAZAoTNZH3ULvYAfSVPzhzS6",
      "timestamp": "2024-01-15T10:30:00Z",
      "agent_id": "EABkKWjtwKiWZ0xNPStaOX_2nCGKI8-Xd6dNhKGGG4Qc",
      "payload": { /* rotation event data */ },
      "signatures": ["AABg7d..."]
    }
  ]
}
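Assembling such a request might look like the following sketch; the header names follow the example above, while the helper function and token value are assumptions:

```python
import json
import uuid

def build_delivery_request(events: list, agent_id: str, token: str):
    """Build (headers, body) for a POST to /keri/events; illustrative only."""
    headers = {
        "Content-Type": "application/json",
        "X-KERIA-Agent-ID": agent_id,
        "X-Event-Type": events[0]["event_type"] if events else "unknown",
        "X-Correlation-ID": str(uuid.uuid4()),
        "Authorization": f"Bearer {token}",
    }
    body = json.dumps({"events": events})
    return headers, body

headers, body = build_delivery_request(
    [{"event_type": "rot", "sequence_number": 3}],
    agent_id="agent-aid-placeholder",
    token="jwt-placeholder")
```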
WebSocket Streaming Protocol:
// WebSocket connection for real-time event streaming
const ws = new WebSocket('wss://keria.example.com/intercept');
ws.onmessage = function(event) {
  const interceptedEvent = JSON.parse(event.data);
  if (interceptedEvent.type === 'keri_event') {
    processKERIEvent(interceptedEvent.payload);
  }
};
Interceptor Configuration Endpoint:
@router.post("/interceptors")
async def create_interceptor(
    config: InterceptorConfig,
    agent: Agent = Depends(get_agent)
) -> InterceptorResponse:
    """
    Create new event interceptor

    Args:
        config: Interceptor configuration
        agent: KERIA agent instance

    Returns:
        InterceptorResponse with interceptor ID and status
    """
    interceptor = Interceptor(
        name=config.name,
        backends=config.backends,
        filters=config.event_filters,
        delivery_mode=config.delivery_mode
    )
    interceptor_id = agent.register_interceptor(interceptor)
    return InterceptorResponse(
        interceptor_id=interceptor_id,
        status="active",
        created_at=datetime.utcnow()
    )
Event Filter Configuration:
class EventFilter:
    event_types: List[str] = ["icp", "rot", "ixn", "dip", "drt"]
    aid_patterns: List[str] = ["*"]  # AID prefix patterns
    sequence_range: Optional[Tuple[int, int]] = None
    custom_predicates: List[str] = []  # Python expressions

    def matches(self, event: KERIEvent) -> bool:
        if event.ilk not in self.event_types:
            return False
        if not any(fnmatch(event.pre, pattern) for pattern in self.aid_patterns):
            return False
        if self.sequence_range:
            min_sn, max_sn = self.sequence_range
            if not (min_sn <= event.sn <= max_sn):
                return False
        # Caution: eval() on configured predicate strings executes arbitrary
        # code; only accept predicates from trusted operators.
        return all(eval(pred, {'event': event}) for pred in self.custom_predicates)
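The AID-pattern step above relies on fnmatch-style globbing; a small illustration (the AID value is taken from the examples in this page, the helper is hypothetical):

```python
from fnmatch import fnmatch

def matches_aid(prefix: str, patterns: list) -> bool:
    """True if the AID prefix matches any of the glob patterns."""
    return any(fnmatch(prefix, pattern) for pattern in patterns)

aid = "EABkKWjtwKiWZ0xNPStaOX_2nCGKI8-Xd6dNhKGGG4Qc"
```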
Interceptor Class Implementation:
class Interceptor:
    def __init__(self, name: str, backends: List[Backend], filters: EventFilter):
        self.name = name
        self.backends = backends
        self.filters = filters
        self.event_queue = asyncio.Queue(maxsize=10000)
        self.delivery_stats = DeliveryStatistics()
        self.circuit_breakers = {b.id: CircuitBreaker() for b in backends}
        self.running = False

    async def start(self):
        """Start interceptor event processing"""
        self.running = True
        await asyncio.gather(
            self.event_processor(),
            self.delivery_worker(),
            self.health_monitor()
        )

    async def event_processor(self):
        """Process incoming events from KERIA"""
        while self.running:
            try:
                raw_event = await self.agent.event_bus.get()
                if self.filters.matches(raw_event):
                    processed_event = await self.transform_event(raw_event)
                    await self.event_queue.put(processed_event)
            except Exception as e:
                logger.error(f"Event processing error: {e}")
                await asyncio.sleep(1)

    async def delivery_worker(self):
        """Deliver events to backends"""
        while self.running:
            try:
                event = await self.event_queue.get()
                # Parallel delivery to backends whose circuit breaker allows it
                eligible = [
                    backend for backend in self.backends
                    if self.circuit_breakers[backend.id].can_execute()
                ]
                delivery_tasks = [
                    self.deliver_to_backend(backend, event)
                    for backend in eligible
                ]
                results = await asyncio.gather(*delivery_tasks, return_exceptions=True)
                # Update circuit breaker states; zip over the eligible list so
                # results pair with the backends that actually executed
                for backend, result in zip(eligible, results):
                    if isinstance(result, Exception):
                        self.circuit_breakers[backend.id].record_failure()
                    else:
                        self.circuit_breakers[backend.id].record_success()
            except Exception as e:
                logger.error(f"Delivery error: {e}")
Circuit Breaker Implementation:
class CircuitBreaker:
    def __init__(self, failure_threshold=5, recovery_timeout=60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = "CLOSED"  # CLOSED, OPEN, HALF_OPEN

    def can_execute(self) -> bool:
        if self.state == "CLOSED":
            return True
        elif self.state == "OPEN":
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = "HALF_OPEN"
                return True
            return False
        else:  # HALF_OPEN
            return True

    def record_success(self):
        self.failure_count = 0
        self.state = "CLOSED"

    def record_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = "OPEN"
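The state machine can be exercised end to end. This snippet repeats a condensed copy of the breaker so it runs standalone, and uses a zero recovery timeout to force the OPEN-to-HALF_OPEN transition immediately:

```python
import time

class CircuitBreaker:  # condensed copy of the class above, for a standalone demo
    def __init__(self, failure_threshold=5, recovery_timeout=60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = "CLOSED"

    def can_execute(self):
        if self.state == "OPEN" and time.time() - self.last_failure_time > self.recovery_timeout:
            self.state = "HALF_OPEN"   # allow one trial request through
        return self.state != "OPEN"

    def record_success(self):
        self.failure_count, self.state = 0, "CLOSED"

    def record_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = "OPEN"

cb = CircuitBreaker(failure_threshold=2, recovery_timeout=0.0)
cb.record_failure()
cb.record_failure()          # threshold reached -> OPEN
opened = cb.state
time.sleep(0.01)             # let the (zero) recovery timeout elapse
allowed = cb.can_execute()   # OPEN -> HALF_OPEN, trial request allowed
cb.record_success()          # trial succeeded -> CLOSED
```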
KERIA Agent Integration:
# In KERIA agent initialization
class Agent:
    def __init__(self):
        self.interceptors: Dict[str, Interceptor] = {}
        self.event_bus = EventBus()

    def register_interceptor(self, interceptor: Interceptor) -> str:
        interceptor_id = generate_uuid()
        self.interceptors[interceptor_id] = interceptor
        # Connect interceptor to event bus
        self.event_bus.subscribe(interceptor.handle_event)
        return interceptor_id

    async def publish_event(self, event: KERIEvent):
        """Publish event to all registered interceptors"""
        await self.event_bus.publish(event)
Backend Service Interface:
class Backend:
    def __init__(self, url: str, auth_config: AuthConfig):
        self.url = url
        self.auth = auth_config
        self.session = aiohttp.ClientSession()

    async def deliver_event(self, event: InterceptedEvent) -> DeliveryResult:
        headers = {
            'Content-Type': 'application/json',
            'X-KERIA-Agent-ID': event.agent_id,
            'X-Event-Type': event.event_type,
            'Authorization': f'Bearer {await self.auth.get_token()}'
        }
        async with self.session.post(
            f"{self.url}/keri/events",
            json=event.to_dict(),
            headers=headers,
            timeout=aiohttp.ClientTimeout(total=30)
        ) as response:
            return DeliveryResult(
                status_code=response.status,
                success=response.status < 400,
                response_time=response.headers.get('X-Response-Time'),
                error=await response.text() if response.status >= 400 else None
            )
Event Bus Architecture:
# Interceptor subscribes to KERIA's internal event bus
class KERIAEventBus:
    def __init__(self):
        self.subscribers: List[Callable] = []

    def subscribe(self, handler: Callable):
        self.subscribers.append(handler)

    async def publish(self, event: KERIEvent):
        # Parallel notification to all subscribers including interceptors
        await asyncio.gather(*[
            handler(event) for handler in self.subscribers
        ], return_exceptions=True)
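A standalone version of this fan-out shows why `return_exceptions=True` matters: a failing subscriber does not prevent delivery to the others (class condensed for the demo):

```python
import asyncio

class EventBus:  # condensed copy of the bus above, for a standalone demo
    def __init__(self):
        self.subscribers = []

    def subscribe(self, handler):
        self.subscribers.append(handler)

    async def publish(self, event):
        # return_exceptions=True turns subscriber errors into return values
        # instead of cancelling the other deliveries
        return await asyncio.gather(
            *[handler(event) for handler in self.subscribers],
            return_exceptions=True)

received = []

async def good_handler(event):
    received.append(event)

async def bad_handler(event):
    raise RuntimeError("subscriber failure")

async def main():
    bus = EventBus()
    bus.subscribe(good_handler)
    bus.subscribe(bad_handler)
    return await bus.publish({"event_type": "ixn"})

results = asyncio.run(main())
```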
Relationship with Notifier Class:
| Aspect | Interceptor | Notifier |
|---|---|---|
| Purpose | External service integration | Internal/user notifications |
| Protocol | HTTP/WebSocket | Email/SMS/Push |
| Data Format | Full KERI event structure | Simplified notifications |
| Delivery | Synchronous/Asynchronous | Asynchronous only |
| Retry Logic | Circuit breaker pattern | Simple retry with backoff |
| Filtering | Complex event-based filters | User preference filters |
graph TD
A[KERI Event] --> B[Event Validation]
B --> C[KEL Storage]
C --> D[Event Bus]
D --> E[Interceptor]
D --> F[Notifier]
D --> G[Other Subscribers]
E --> H[Event Filtering]
H --> I[Event Transformation]
I --> J[Backend Delivery]
J --> K[External Services]
Event Processing: O(1) per event
Backend Delivery: O(b) where b = number of backends
Memory Footprint:
# Memory usage estimation
class MemoryProfile:
    event_queue_size = 10000              # configurable
    avg_event_size = 2048                 # bytes
    queue_memory = event_queue_size * avg_event_size  # ~20MB
    interceptor_overhead = 1024           # bytes per interceptor
    backend_connection_pool = 100 * 1024  # 100KB per backend
    total_memory = queue_memory + interceptor_overhead + backend_connection_pool
Throughput Characteristics:
Latency Analysis:
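As a sketch of how delivery latencies might be summarized for analysis (the helper and sample values are made up):

```python
import statistics

def latency_summary(samples_ms):
    """Summarize delivery latency samples (milliseconds) as p50/p95/max."""
    ordered = sorted(samples_ms)
    p95_index = int(0.95 * (len(ordered) - 1))  # nearest-rank, rounded down
    return {
        "p50_ms": statistics.median(ordered),
        "p95_ms": ordered[p95_index],
        "max_ms": ordered[-1],
    }

# One slow outlier among otherwise fast deliveries
samples = [12, 15, 11, 240, 14, 13, 16, 12, 15, 18]
summary = latency_summary(samples)
```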
Backend Unavailability:
async def handle_backend_failure(self, backend: Backend, error: Exception):
    """Handle backend delivery failures"""
    if isinstance(error, asyncio.TimeoutError):
        # Temporary failure - circuit breaker will handle
        # (aiohttp timeouts surface as asyncio.TimeoutError;
        # aiohttp.ClientTimeout is a configuration object, not an exception)
        self.circuit_breakers[backend.id].record_failure()
    elif isinstance(error, aiohttp.ClientConnectorError):
        # Network failure - mark backend as unhealthy
        backend.mark_unhealthy()
        await self.notify_admin(f"Backend {backend.url} unreachable")
    elif isinstance(error, aiohttp.ClientResponseError):
        if error.status >= 500:
            # Server error - retry with backoff
            await self.schedule_retry(backend, error.request_info)
        else:
            # Client error - log and drop
            logger.error(f"Client error {error.status}: {error.message}")
Queue Overflow:
async def handle_queue_overflow(self, event: InterceptedEvent):
    """Handle event queue overflow"""
    if self.event_queue.full():
        # Drop oldest events to make room
        dropped_count = 0
        while self.event_queue.full() and dropped_count < 100:
            try:
                dropped_event = self.event_queue.get_nowait()
                dropped_count += 1
                logger.warning(f"Dropped event {dropped_event.said}")
            except asyncio.QueueEmpty:
                break
        # Alert monitoring (only meaningful when events were dropped)
        self.metrics.increment('events_dropped', dropped_count)
    # Add current event
    await self.event_queue.put(event)
Event Corruption:
def validate_event_integrity(self, event: KERIEvent) -> bool:
    """Validate event integrity before interception"""
    try:
        # Verify SAID computation
        computed_said = self.compute_said(event)
        if computed_said != event.said:
            logger.error(f"SAID mismatch: expected {event.said}, got {computed_said}")
            return False
        # Verify signature if present
        if hasattr(event, 'signatures') and event.signatures:
            if not self.verify_signatures(event):
                logger.error(f"Signature verification failed for {event.said}")
                return False
        return True
    except Exception as e:
        logger.error(f"Event validation error: {e}")
        return False
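The SAID check above can be illustrated with a simplified stand-in. Real KERI SAIDs use Blake3-256 over a CESR-encoded event with length-preserving padding in the `said` field; the sketch below substitutes SHA-256 over canonical JSON purely to show the self-addressing pattern:

```python
import hashlib
import json

def compute_said(event: dict) -> str:
    """Digest the event with its own `said` field blanked (simplified stand-in)."""
    blanked = {**event, "said": ""}
    canonical = json.dumps(blanked, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode()).hexdigest()

# Fill in the self-addressing identifier, then any later mutation is detectable
event = {"event_type": "ixn", "sequence_number": 4, "said": ""}
event["said"] = compute_said(event)
```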
The interceptor must maintain KERI protocol compliance.
RFC 7231 Compliance:
RFC 6585 Additional Status Codes:
Authentication:
Transport Security:
High Availability Setup:
# Kubernetes deployment example
apiVersion: apps/v1
kind: Deployment
metadata:
  name: keria-interceptor
spec:
  replicas: 3
  selector:
    matchLabels:
      app: keria-interceptor
  template:
    metadata:
      labels:
        app: keria-interceptor   # must match the selector above
    spec:
      containers:
        - name: interceptor
          image: keria:latest
          env:
            - name: INTERCEPTOR_CONFIG
              valueFrom:
                configMapKeyRef:
                  name: interceptor-config
                  key: config.json
          resources:
            requests:
              memory: "64Mi"
              cpu: "100m"
            limits:
              memory: "256Mi"
              cpu: "500m"
Metrics Collection:
class InterceptorMetrics:
    def __init__(self):
        # prometheus_client metrics take a help string and declared label names
        self.events_processed = Counter(
            'interceptor_events_processed_total', 'Events processed', ['event_type'])
        self.events_dropped = Counter(
            'interceptor_events_dropped_total', 'Events dropped')
        self.delivery_latency = Histogram(
            'interceptor_delivery_duration_seconds', 'Delivery latency', ['backend'])
        self.backend_health = Gauge(
            'interceptor_backend_health', 'Backend health (1=up, 0=down)', ['backend'])

    def record_event_processed(self, event_type: str):
        self.events_processed.labels(event_type=event_type).inc()

    def record_delivery(self, backend_id: str, duration: float, success: bool):
        self.delivery_latency.labels(backend=backend_id).observe(duration)
        self.backend_health.labels(backend=backend_id).set(1 if success else 0)
Health Check Endpoint:
@router.get("/health")
async def health_check() -> HealthResponse:
    """Interceptor health check"""
    healthy_backends = sum(1 for cb in circuit_breakers.values() if cb.state == "CLOSED")
    total_backends = len(circuit_breakers)
    queue_utilization = event_queue.qsize() / event_queue.maxsize
    status = "healthy" if (
        healthy_backends > 0 and
        queue_utilization < 0.8
    ) else "degraded"
    return HealthResponse(
        status=status,
        backends_healthy=healthy_backends,
        backends_total=total_backends,
        queue_utilization=queue_utilization,
        uptime=time.time() - start_time
    )
Graceful Shutdown:
async def shutdown_interceptor(self):
    """Gracefully shutdown interceptor"""
    logger.info("Starting interceptor shutdown")
    # Stop accepting new events
    self.running = False
    # Process remaining events in queue
    while not self.event_queue.empty():
        try:
            event = await asyncio.wait_for(self.event_queue.get(), timeout=1.0)
            await self.process_event(event)
        except asyncio.TimeoutError:
            break
    # Close backend connections
    for backend in self.backends:
        await backend.close()
    logger.info("Interceptor shutdown complete")
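The drain step above generalizes to any bounded shutdown; a standalone sketch (names illustrative) of the timeout-guarded drain loop:

```python
import asyncio

async def drain(queue: asyncio.Queue) -> list:
    """Empty the queue with a per-item timeout so shutdown cannot hang."""
    drained = []
    while not queue.empty():
        try:
            drained.append(await asyncio.wait_for(queue.get(), timeout=1.0))
        except asyncio.TimeoutError:
            break
    return drained

async def main():
    queue = asyncio.Queue()
    for sn in range(3):
        queue.put_nowait({"sequence_number": sn})
    return await drain(queue)

remaining = asyncio.run(main())
```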
Input Validation:
def validate_interceptor_config(config: dict) -> bool:
    required_fields = ['name', 'backends', 'filters']
    if not all(field in config for field in required_fields):
        raise ValueError("Missing required configuration fields")
    # Validate backend URLs
    for backend in config['backends']:
        if not backend['url'].startswith(('https://', 'http://localhost')):
            raise ValueError("Backend URLs must use HTTPS in production")
    return True
Event Data Sanitization:
Unit Testing:
@pytest.mark.asyncio
async def test_event_filtering():
    filter_config = {'event_types': ['icp', 'rot']}
    interceptor = Interceptor(name='test', filters=filter_config)
    # Test positive case
    icp_event = create_test_event(ilk='icp')
    assert interceptor.should_intercept(icp_event)
    # Test negative case
    ixn_event = create_test_event(ilk='ixn')
    assert not interceptor.should_intercept(ixn_event)
Integration Testing:
Load Testing:
Monitoring Integration: