This comprehensive explanation has been generated from 4 GitHub source documents.
Last updated: September 21, 2025
This content is meant to be consumed by AI agents via MCP.
Note: In rare cases it may contain LLM hallucinations.
For authoritative documentation, please consult the official GLEIF vLEI trainings and the ToIP Glossary.
A stream property in CESR where data begins with a group code or field map, enabling parsers to immediately identify and process the stream format without prior context, solving the cold start problem in stream parsing.
A sniffable stream is a fundamental property in the CESR (Composable Event Streaming Representation) protocol ecosystem where a data stream begins with either a group code or field map that provides immediate format identification. This property enables parsers to determine the stream's structure and content type without requiring external context or pre-negotiated format agreements.
Formally, a stream S is sniffable if and only if:
S[0:n] ∈ {GroupCode ∪ FieldMap}
where n represents the length of the identifying prefix and the prefix belongs to the union of valid group codes and field maps defined in the CESR specification.
Sniffable streams are built upon CESR's self-framing primitive architecture, where each stream begins with specific identifying markers:
GroupCode := {'-' + TypeCode + CountCode}
TypeCode := [A-Z, a-z, 0-9, -, _]
CountCode := Base64Count(2-5 chars)
FieldMap := {ObjectCode + FieldDefinitions}
ObjectCode := [A-Z, a-z] (single character)
FieldDefinitions := SerializedFieldMap
The Parside parser utilizes unique three-bit combinations at the stream beginning to distinguish between formats:
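As a minimal sketch of that three-bit dispatch, the table below uses the starting-bit assignments commonly described for CESR cold start parsing. The exact values, and the names `Format`, `TRITET_TABLE`, and `sniff_tritet`, are assumptions for illustration and are not Parside's actual API.

```python
from enum import Enum


class Format(Enum):
    CESR_TEXT = "CESR text-domain count code"
    CESR_BINARY = "CESR binary-domain count code"
    JSON = "JSON"
    CBOR = "CBOR"
    MGPK = "MessagePack"


# Assumed mapping from the top three bits of the first byte to a format.
TRITET_TABLE = {
    0b001: Format.CESR_TEXT,    # '-' is 0x2D = 0b001_01101
    0b011: Format.JSON,         # '{' is 0x7B = 0b011_11011
    0b100: Format.MGPK,         # MessagePack fixmap, 0x80..0x8F
    0b101: Format.CBOR,         # CBOR map (major type 5), 0xA0..0xBF
    0b110: Format.MGPK,         # MessagePack map16/map32, 0xDE and 0xDF
    0b111: Format.CESR_BINARY,  # Base64 '-' has value 62 = 0b111110
}


def sniff_tritet(stream: bytes) -> Format:
    """Classify a stream by the top three bits of its first byte."""
    if not stream:
        raise ValueError("empty stream")
    tritet = stream[0] >> 5  # keep only the three most significant bits
    try:
        return TRITET_TABLE[tritet]
    except KeyError:
        raise ValueError(f"unsniffable leading bits {tritet:03b}") from None
```

Because only one byte is inspected, this is a coarse classification: a full parser would still validate the complete count code or version string afterwards.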
Thread Safety: Sniffable parsers must be thread-safe for concurrent stream processing:
import threading

class ThreadSafeSniffableParser:
    def __init__(self):
        # Re-entrant lock, so nested calls from the same thread do not deadlock
        self._lock = threading.RLock()
        self._format_cache = {}

    def sniff(self, stream: bytes) -> SniffResult:
        with self._lock:
            # Thread-safe format detection
            return self._internal_sniff(stream)
Format Detection Caching: Cache format detection results for repeated stream prefixes:
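from functools import lru_cache

@lru_cache(maxsize=1024)
def cached_format_detection(prefix: bytes) -> FormatType:
    # bytes objects are hashable, so the prefix itself serves as the cache key
    return detect_format_internal(prefix)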
Lazy Parsing: Only parse version strings when needed:
class LazySniffResult:
    def __init__(self, stream: bytes, format_type: FormatType):
        self._stream = stream
        self._format_type = format_type
        self._version_info = None  # Lazy loaded

    @property
    def version_info(self) -> VersionInfo:
        if self._version_info is None:
            self._version_info = self._extract_version()
        return self._version_info
Input Validation: Always validate stream length before accessing bytes:
def safe_sniff(stream: bytes) -> SniffResult:
    if len(stream) < MIN_SNIFFABLE_LENGTH:
        raise InsufficientDataError("Stream too short for sniffing")
    # Elements of a bytes object are always in 0..255, so a range check on
    # stream[:8] could never fail; check the prefix against the known
    # format prefixes instead.
    if not is_valid_format_prefix(stream[:8]):
        raise InvalidStreamError("Unrecognized stream prefix")
    return sniffer.sniff(stream)
Resource Limits: Prevent memory exhaustion from large version strings:
MAX_VERSION_STRING_LENGTH = 1024

def extract_version_safely(stream: bytes) -> str:
    # Limit search scope to prevent DoS
    search_window = stream[:MAX_VERSION_STRING_LENGTH]
    match = VERSION_PATTERN.search(search_window)
    if match:
        return match.group(1).decode('utf-8')
    raise VersionNotFoundError("Version string not found in safe window")
Property-Based Testing: Use hypothesis to generate edge cases:
from hypothesis import given, strategies as st

@given(st.binary(min_size=4, max_size=1024))
def test_sniffable_property(stream_data):
    # Test that sniffable detection is deterministic
    result1 = sniffer.is_sniffable(stream_data)
    result2 = sniffer.is_sniffable(stream_data)
    assert result1 == result2
The sniffing process follows a deterministic state machine:
State: INITIAL
├── Read first 3 bits
├── Match against format table
│ ├── CESR_BINARY → Parse binary group code
│ ├── CESR_TEXT → Parse text group code
│ ├── JSON → Extract version string via regex
│ ├── CBOR → Extract version string via CBOR decoder
│ └── MGPK → Extract version string via MessagePack decoder
└── Transition to format-specific parser
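The cold start problem occurs when a parser encounters a stream without knowing its structure or content type and without any external context or pre-negotiated format agreement.
Sniffable streams solve this by providing immediate format identification through standardized prefixes at the very start of the stream.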
Sniffable streams maintain cryptographic integrity through:
Potential attack vectors against sniffable streams:
Format Confusion Attacks: Malicious streams with ambiguous prefixes
Length Field Manipulation: Corrupted count codes causing buffer overflows
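A defensive check against length field manipulation might look like the sketch below. It assumes the simplified layout from the grammar above ('-', one type character, then a two-character Base64 count) and assumes the count is a number of four-character quadlets that follow the code. What a count actually measures depends on the specific code and CESR version, so `checked_group_length`, `b64_to_int`, and the `max_quadlets` policy limit are hypothetical names and values.

```python
B64_ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_"
B64_INDEX = {ch: i for i, ch in enumerate(B64_ALPHABET)}


def b64_to_int(text: str) -> int:
    """Decode URL-safe Base64 characters as a big-endian base-64 integer."""
    value = 0
    for ch in text:
        if ch not in B64_INDEX:
            raise ValueError(f"invalid Base64 character: {ch!r}")
        value = value * 64 + B64_INDEX[ch]
    return value


def checked_group_length(stream: bytes, max_quadlets: int = 1024) -> int:
    """Return the byte length of the leading group, or raise ValueError.

    The declared count is validated before anything is sliced or allocated.
    """
    if len(stream) < 4:
        raise ValueError("truncated count code")
    # Non-ASCII bytes raise UnicodeDecodeError, a subclass of ValueError.
    code = stream[:4].decode("ascii")
    if code[0] != "-":
        raise ValueError("not a text-domain count code")
    count = b64_to_int(code[2:4])
    if count > max_quadlets:
        raise ValueError("declared count exceeds the configured limit")
    total = 4 + 4 * count  # 4-character code plus `count` quadlets
    if total > len(stream):
        raise ValueError("count code claims more data than the stream holds")
    return total
```

The point of the design is ordering: the count is range-checked and compared with the bytes actually available before any buffer is sized from it.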
from dataclasses import dataclass
from typing import Optional

class SniffableParser:
    def sniff(self, stream: bytes) -> SniffResult:
        """Detect stream format and return parsing strategy"""

    def parse_sniffable(self, stream: bytes) -> ParsedStream:
        """Parse a sniffable stream with format auto-detection"""

    def is_sniffable(self, stream: bytes) -> bool:
        """Check if stream has sniffable property"""

@dataclass
class SniffResult:
    format_type: FormatType              # CESR_BINARY, CESR_TEXT, JSON, CBOR, MGPK
    group_code: Optional[str]            # For CESR formats
    version_info: Optional[VersionInfo]  # For non-CESR formats
    content_length: int                  # Extracted from version string or count code
    parser_strategy: ParserStrategy      # Next parsing step
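class FormatDetector:
    def __init__(self):
        # Text-based formats are dispatched on their first byte
        self.format_patterns = {
            b'-': self._detect_cesr_text,  # 0x2D, text-domain count code
            b'{': self._detect_json,
            b'[': self._detect_json_array,
        }
        # b'\x2d' is the same byte as b'-', so it cannot identify binary CESR.
        # A binary-domain count code is the Base64 decoding of a code starting
        # with '-' (value 62 = 0b111110), so its first byte is 0xF8..0xFB.
        self.cesr_binary_patterns = range(0xF8, 0xFC)
        # Non-overlapping ranges, so the MessagePack branch stays reachable
        self.cbor_patterns = range(0xA0, 0xC0)  # CBOR maps (major type 5)
        self.mgpk_patterns = range(0x80, 0x90)  # MessagePack fixmap

    def detect(self, stream: bytes) -> FormatType:
        if not stream:
            raise UnsniffableStreamError("Stream is empty")
        first_byte = stream[0:1]
        if first_byte in self.format_patterns:
            return self.format_patterns[first_byte](stream)
        elif stream[0] in self.cesr_binary_patterns:
            return self._detect_cesr_binary(stream)
        elif stream[0] in self.cbor_patterns:
            return self._detect_cbor(stream)
        elif stream[0] in self.mgpk_patterns:
            return self._detect_mgpk(stream)
        else:
            raise UnsniffableStreamError("Stream is not sniffable")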
For non-CESR formats, version strings are extracted using format-specific methods:
import re

import cbor2
import msgpack

def extract_version_string(self, format_type: FormatType, stream: bytes) -> Optional[VersionInfo]:
    if format_type == FormatType.JSON:
        # Regex pattern for JSON version extraction
        pattern = rb'"version"\s*:\s*"([^"]+)"'
        match = re.search(pattern, stream)
        if match:
            return self._parse_version_info(match.group(1))
    elif format_type == FormatType.CBOR:
        # CBOR decoder for version extraction
        decoded = cbor2.loads(stream)
        if 'version' in decoded:
            return self._parse_version_info(decoded['version'])
    elif format_type == FormatType.MGPK:
        # MessagePack decoder for version extraction. msgpack >= 1.0 decodes
        # map keys to str by default (raw=False), so the key is 'version'.
        decoded = msgpack.unpackb(stream)
        if 'version' in decoded:
            return self._parse_version_info(decoded['version'])
    return None  # No version field found
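For comparison, a regex-only extraction that works on the raw bytes of any of the three serializations could be sketched as follows. It assumes a KERI v1-style version string of the form `KERI10JSON00011c_` (four-letter protocol, one hex digit each for major and minor version, four-letter serialization kind, six hex digits of size, and a terminating underscore). That layout, the `MAX_SEARCH` window, and the names `VersionInfo`, `VERSION_RE`, and `parse_version` are assumptions for illustration, not a confirmed API.

```python
import re
from typing import NamedTuple, Optional


class VersionInfo(NamedTuple):
    proto: str  # e.g. "KERI" or "ACDC"
    major: int
    minor: int
    kind: str   # serialization kind, e.g. "JSON", "CBOR", "MGPK"
    size: int   # total serialized size in bytes


# Assumed v1-style layout: PPPPvvKKKKssssss_
VERSION_RE = re.compile(
    rb"(?P<proto>[A-Z]{4})(?P<major>[0-9a-f])(?P<minor>[0-9a-f])"
    rb"(?P<kind>[A-Z]{4})(?P<size>[0-9a-f]{6})_"
)

MAX_SEARCH = 64  # hypothetical bound: only look near the start of the stream


def parse_version(stream: bytes) -> Optional[VersionInfo]:
    """Find and decode the version string within the first MAX_SEARCH bytes."""
    match = VERSION_RE.search(stream[:MAX_SEARCH])
    if match is None:
        return None
    return VersionInfo(
        proto=match["proto"].decode(),
        major=int(match["major"], 16),
        minor=int(match["minor"], 16),
        kind=match["kind"].decode(),
        size=int(match["size"], 16),
    )
```

Searching raw bytes avoids fully decoding an untrusted CBOR or MessagePack body just to learn its size, and the bounded window keeps the cost constant.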
Sniffable streams must maintain compatibility with CESR's 24-bit alignment requirements:
class CESRStreamProcessor:
    def __init__(self):
        self.sniffer = SniffableParser()
        self.parsers = {
            FormatType.CESR_TEXT: CESRTextParser(),
            FormatType.CESR_BINARY: CESRBinaryParser(),
            FormatType.JSON: JSONParser(),
            FormatType.CBOR: CBORParser(),
            FormatType.MGPK: MessagePackParser(),
        }

    def process_stream(self, stream: bytes) -> ProcessedData:
        sniff_result = self.sniffer.sniff(stream)
        parser = self.parsers[sniff_result.format_type]
        return parser.parse(stream, sniff_result)
Sniffable streams are fundamental to CESR's composability properties:
In KERI, sniffable streams enable:
Authentic Chained Data Containers leverage sniffable streams for:
Sniffable Parser Memory Usage:
├── Format Pattern Table: 256 bytes (lookup table)
├── Regex Compilation Cache: ~2KB (compiled patterns)
├── Parser State Machine: ~1KB (state transitions)
└── Buffer Management: Variable (stream-dependent)
Total Base Memory: ~3.5KB + stream buffers
Sniffable prefixes add minimal overhead:
Format Detection Performance (1M operations):
├── CESR Text: 0.15ms average
├── CESR Binary: 0.12ms average
├── JSON: 0.25ms average (includes regex)
├── CBOR: 0.18ms average
└── MGPK: 0.16ms average
Throughput: ~4M streams/second on modern hardware
Truncated Streams: Insufficient bytes for format detection
if len(stream) < MIN_SNIFFABLE_LENGTH:
    raise TruncatedStreamError("Insufficient data for sniffing")
Ambiguous Prefixes: Multiple format matches
from typing import List

def resolve_ambiguity(self, candidates: List[FormatType]) -> FormatType:
    # Priority order: CESR > JSON > CBOR > MGPK (lower value wins)
    return min(candidates, key=lambda x: self.format_priority[x])
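The `format_priority` mapping is not defined in the snippet above. One way it could be laid out is sketched here as a standalone function; the numeric ranks and the `FORMAT_PRIORITY` name are hypothetical, and only the ordering (CESR, then JSON, then CBOR, then MGPK) comes from the comment in the original code.

```python
from enum import Enum, auto
from typing import Iterable


class FormatType(Enum):
    CESR_TEXT = auto()
    CESR_BINARY = auto()
    JSON = auto()
    CBOR = auto()
    MGPK = auto()


# Lower rank wins; both CESR domains share the top rank (an assumption).
FORMAT_PRIORITY = {
    FormatType.CESR_TEXT: 0,
    FormatType.CESR_BINARY: 0,
    FormatType.JSON: 1,
    FormatType.CBOR: 2,
    FormatType.MGPK: 3,
}


def resolve_ambiguity(candidates: Iterable[FormatType]) -> FormatType:
    """Pick the highest-priority format among the candidates."""
    candidates = list(candidates)
    if not candidates:
        # min() on an empty sequence would raise a less helpful error
        raise ValueError("no candidate formats to choose from")
    return min(candidates, key=FORMAT_PRIORITY.__getitem__)
```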
Corrupted Group Codes: Invalid CESR prefixes
def validate_group_code(self, code: str) -> bool:
    return (len(code) >= 4 and
            code[0] == '-' and
            all(c in BASE64_CHARS for c in code[1:]))
def validate_sniffable_stream(stream: bytes) -> ValidationResult:
    checks = [
        ('length', len(stream) >= MIN_LENGTH),
        ('format', is_valid_format_prefix(stream[:8])),
        ('alignment', check_24bit_alignment(stream)),
        ('integrity', verify_cryptographic_binding(stream))
    ]
    failures = [name for name, check in checks if not check]
    return ValidationResult(valid=len(failures) == 0, failures=failures)
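The `check_24bit_alignment` helper is not shown. A minimal sketch follows, assuming the check is a pure length test: four Base64 characters carry 24 bits in the text domain, and three bytes carry 24 bits in the binary domain. The `text_domain` parameter and the `text_to_binary` helper are hypothetical additions for illustration.

```python
import base64


def check_24bit_alignment(stream: bytes, text_domain: bool = True) -> bool:
    """Return True if the stream length sits on a 24-bit boundary.

    Text domain: 4 Base64 characters = 24 bits, so length % 4 == 0.
    Binary domain: 3 bytes = 24 bits, so length % 3 == 0.
    """
    unit = 4 if text_domain else 3
    return len(stream) > 0 and len(stream) % unit == 0


def text_to_binary(text: bytes) -> bytes:
    """Convert aligned text-domain data to the binary domain."""
    if not check_24bit_alignment(text, text_domain=True):
        raise ValueError("text-domain data is not 24-bit aligned")
    # Aligned input never needs '=' padding, so it decodes cleanly.
    return base64.urlsafe_b64decode(text)
```

Alignment is what lets the two domains convert back and forth without padding: the four characters `-AAB` become exactly three bytes, and encoding those three bytes yields `-AAB` again.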
Sniffable streams must comply with:
Sniffable Stream Evolution:
├── v0.1: Basic group code detection
├── v0.2: Added field map support
├── v0.3: Multi-format detection (JSON/CBOR/MGPK)
├── v1.0: Three-bit combination optimization
└── v1.1: Enhanced error handling and validation
| Feature | CESR v1.0 | KERI v1.1 | ACDC v1.0 |
|---|---|---|---|
| Group Code Detection | ✓ | ✓ | ✓ |
| Field Map Support | ✓ | ✓ | ✓ |
| Multi-format Sniffing | ✓ | ✓ | ✓ |
| 24-bit Alignment | ✓ | ✓ | ✓ |
| Composability | ✓ | ✓ | ✓ |
from collections import Counter, defaultdict

class SniffableMetrics:
    def __init__(self):
        self.format_counters = Counter()           # Format detection counts
        self.error_rates = defaultdict(float)      # Error rates by format
        self.processing_times = defaultdict(list)  # Performance metrics

    def record_sniff(self, format_type: FormatType, duration: float):
        self.format_counters[format_type] += 1
        self.processing_times[format_type].append(duration)
Format Fuzzing: Test parser robustness with malformed inputs:
import pytest

def test_malformed_group_codes():
    malformed_codes = [
        b'-',         # Too short
        b'-XYZ',      # Unrecognized code (X, Y, Z are valid Base64 characters)
        b'-GAB',      # Missing count
        b'-GABZZZZ',  # Invalid count format
    ]
    for code in malformed_codes:
        with pytest.raises(InvalidGroupCodeError):
            parser.parse_group_code(code)