This explanation has been generated from 2 GitHub source documents.
Last updated: September 21, 2025
Note: In rare cases it may contain LLM hallucinations.
For authoritative documentation, please consult the official GLEIF vLEI trainings and the ToIP Glossary.
Weight-of-weights is a hierarchical multi-signature threshold architecture in the KERI protocol that implements exactly two levels of weighted signature aggregation. It enables complex multi-party control structures while preserving CESR encoding efficiency and implementation simplicity.
Weight-of-Weights Structure:
Level 1: Individual Weight = w₁ᵢ where i ∈ [1, n₁]
Level 2: Team Weight = w₂ⱼ where j ∈ [1, n₂]
Threshold Function:
T(W₁, W₂) = Σᵢ(w₁ᵢ × active₁ᵢ) + Σⱼ(w₂ⱼ × active₂ⱼ); the event is accepted when T(W₁, W₂) ≥ global threshold
Constraint: depth(hierarchy) ≤ 2
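As a worked illustration of the threshold function above, the following sketch sums the active weights at both levels and compares the total against the global threshold. The function name and the toy weights are illustrative, not part of the specification. The JSON template below shows how such a configuration is expressed.
from typing import List

def weighted_threshold_met(individual_weights: List[int], individual_active: List[bool],
                           team_weights: List[int], team_active: List[bool],
                           global_threshold: int) -> bool:
    """Evaluate T(W1, W2) >= global threshold for a two-level hierarchy."""
    level1 = sum(w for w, active in zip(individual_weights, individual_active) if active)
    level2 = sum(w for w, active in zip(team_weights, team_active) if active)
    return level1 + level2 >= global_threshold

# Example: two of three individuals (weights 1, 2, 3) and one of two teams (weights 5, 5)
# have signed; the collected weight is 2 + 3 + 5 = 10, which meets a global threshold of 10.
assert weighted_threshold_met([1, 2, 3], [False, True, True], [5, 5], [True, False], 10)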
{
  "weightConfig": {
    "version": "1.0",
    "levels": 2,
    "individual": {
      "weights": [w₁₁, w₁₂, ..., w₁ₙ],
      "threshold": t₁,
      "keys": ["key₁₁", "key₁₂", ..., "key₁ₙ"]
    },
    "team": {
      "weights": [w₂₁, w₂₂, ..., w₂ₘ],
      "threshold": t₂,
      "teamIds": ["team₂₁", "team₂₂", ..., "team₂ₘ"]
    },
    "globalThreshold": T_global
  }
}
Weight-of-Weights CESR Structure:
┌─────────────────────────────────────────────────────────────┐
│ Group Code │ Count │ Level1 Weights │ Level2 Weights │ Sigs │
├─────────────────────────────────────────────────────────────┤
│ -GAB │ ## │ w₁₁w₁₂... │ w₂₁w₂₂... │ ... │
└─────────────────────────────────────────────────────────────┘
Byte Layout:
- Group Code: 4 bytes (Base64)
- Count Code: 2 bytes (Base64)
- Weight Array: Variable length, 2 bytes per weight
- Signature Array: Variable length, 88 bytes per Ed25519 signature
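The byte layout above can be applied directly to estimate the size of an encoded group. The helper below is an illustrative sketch (the function name and the example counts are assumptions, not from the source documents):
def estimate_cesr_group_size(num_weights: int, num_signatures: int) -> int:
    """Estimate the text-domain size of a weight-of-weights group from the byte layout."""
    group_code = 4                    # e.g. "-GAB"
    count_code = 2
    weights = num_weights * 2         # 2 bytes per weight
    signatures = num_signatures * 88  # 88 bytes per attached Ed25519 signature
    return group_code + count_code + weights + signatures

# 15 weights (10 individual + 5 team) and 8 attached signatures:
# 4 + 2 + 30 + 704 = 740 bytes
print(estimate_cesr_group_size(15, 8))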
Pitfall: Recursive Weight Structures
Mitigation: enforce depth(hierarchy) ≤ 2 in configuration validation.
Pitfall: CESR Encoding Overflow
from typing import List

def normalize_weights(weights: List[int]) -> List[int]:
    """Scale weights down so every value fits in the 2-byte CESR weight field (max 65535)."""
    max_weight = max(weights)
    if max_weight > 65535:
        scale_factor = 65535 / max_weight
        return [int(w * scale_factor) for w in weights]
    return weights
Pitfall: Signature Replay Attacks
Mitigation: bind each signature to a unique digest, e.g. BLAKE3(event_bytes + nonce).
Signature Verification Parallelization
import asyncio
from typing import List

async def parallel_signature_verification(signatures: List[Signature]) -> List[bool]:
    """Verify signatures concurrently; exceptions are returned in place of results."""
    tasks = [verify_signature_async(sig) for sig in signatures]
    return await asyncio.gather(*tasks, return_exceptions=True)
Weight Calculation Caching
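A minimal caching sketch (the helper name and cache size are assumptions, not from the source documents): memoize the aggregate weight for a given set of verified signer indices so repeated threshold checks over the same partial signature set do not recompute the sum.
from functools import lru_cache
from typing import FrozenSet, Tuple

@lru_cache(maxsize=4096)
def cached_weight(weights: Tuple[int, ...], active_indices: FrozenSet[int]) -> int:
    """Sum the weights of the active signers; results are memoized per (weights, signer set)."""
    return sum(weights[i] for i in active_indices)

# cached_weight((1, 2, 3, 5), frozenset({1, 3})) -> 7 (computed once, then served from cache)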
Early Threshold Termination
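Early termination can be sketched as follows (names are illustrative): stop verifying as soon as the accumulated weight reaches the threshold, which gives the O(T) best case noted in the complexity analysis further below. Note that this trades away the constant-time property discussed next.
def verify_until_threshold(signatures, weights, threshold, verify) -> bool:
    """Verify signatures in order and stop as soon as the threshold is reached."""
    collected = 0
    for sig, weight in zip(signatures, weights):
        if verify(sig):
            collected += weight
            if collected >= threshold:
                return True  # early termination: remaining signatures are never verified
    return False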
Constant-Time Operations
def constant_time_weight_check(collected_weight: int, threshold: int) -> bool:
    """Reduce timing leakage from threshold validation.

    Note: pure Python cannot guarantee true constant-time behavior; the fixed-work
    loop below only masks gross timing differences. Use a lower-level primitive
    where strict constant-time comparison is required.
    """
    dummy_ops = 0
    for _ in range(100):  # fixed iteration count regardless of the inputs
        dummy_ops += (collected_weight * threshold) % 7
    return collected_weight >= threshold
def aggregate_weighted_signatures(level1_sigs, level2_sigs, config):
    """
    Aggregate signatures across two-level weight hierarchy
    """
    total_weight = 0
    verified_sigs = []
    # Level 1: Individual signatures
    for i, sig in enumerate(level1_sigs):
        if verify_signature(sig, config.individual.keys[i]):
            total_weight += config.individual.weights[i]
            verified_sigs.append((1, i, sig))
    # Level 2: Team signatures
    for j, team_sig in enumerate(level2_sigs):
        if verify_team_signature(team_sig, config.team.teamIds[j]):
            total_weight += config.team.weights[j]
            verified_sigs.append((2, j, team_sig))
    return total_weight >= config.globalThreshold, verified_sigs
Weight-of-Weights State Machine:
[INIT] → [COLLECTING_L1] → [COLLECTING_L2] → [VALIDATING] → [COMPLETE]
(each of INIT, COLLECTING_L1, COLLECTING_L2, and VALIDATING may also transition to [ERROR])
Transition Conditions:
- INIT → COLLECTING_L1: Receive first Level 1 signature
- COLLECTING_L1 → COLLECTING_L2: L1 threshold met OR timeout
- COLLECTING_L2 → VALIDATING: L2 signatures received OR timeout
- VALIDATING → COMPLETE: Total weight ≥ global threshold
- Any → ERROR: Invalid signature, timeout, or protocol violation
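The transitions above map directly onto code; the enum and transition table below are an illustrative sketch (the names CollectionState, TRANSITIONS, and transition are assumptions, not an API from the source documents):
from enum import Enum, auto

class CollectionState(Enum):
    INIT = auto()
    COLLECTING_L1 = auto()
    COLLECTING_L2 = auto()
    VALIDATING = auto()
    COMPLETE = auto()
    ERROR = auto()

# Allowed forward transitions; any state may additionally move to ERROR.
TRANSITIONS = {
    CollectionState.INIT: {CollectionState.COLLECTING_L1},
    CollectionState.COLLECTING_L1: {CollectionState.COLLECTING_L2},
    CollectionState.COLLECTING_L2: {CollectionState.VALIDATING},
    CollectionState.VALIDATING: {CollectionState.COMPLETE},
}

def transition(current: CollectionState, nxt: CollectionState) -> CollectionState:
    """Apply a transition, falling back to ERROR on any protocol violation."""
    if nxt is CollectionState.ERROR or nxt in TRANSITIONS.get(current, set()):
        return nxt
    return CollectionState.ERROR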
from typing import List
from blake3 import blake3

class WeightedThresholdVerifier:
    def __init__(self, config: WeightConfig):
        self.config = config
        self.ed25519_verifier = Ed25519Verifier()
        self.secp256k1_verifier = Secp256k1Verifier()

    def verify_weighted_threshold(self, event: KeyEvent,
                                  signatures: List[Signature]) -> bool:
        """
        Verify weighted threshold signatures against a key event
        """
        event_digest = self._compute_event_digest(event)
        total_weight = 0
        for sig in signatures:
            if self._verify_individual_signature(sig, event_digest):
                total_weight += self._get_signature_weight(sig)
        return total_weight >= self.config.globalThreshold

    def _compute_event_digest(self, event: KeyEvent) -> bytes:
        """
        Compute BLAKE3 digest of the serialized key event
        """
        serialized = self._serialize_event(event)
        return blake3(serialized).digest()
Threshold Security Model:
Attack Resistance:
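As one hedged illustration of attack resistance (the helper below is illustrative, not taken from the source documents): the smallest coalition of signers whose combined weight reaches the global threshold bounds how many keys an attacker must compromise to forge an event.
from typing import List

def minimum_colluding_signers(weights: List[int], global_threshold: int) -> int:
    """Return the smallest number of signers whose combined weight meets the threshold."""
    collected = 0
    for count, weight in enumerate(sorted(weights, reverse=True), start=1):
        collected += weight
        if collected >= global_threshold:
            return count
    return -1  # the threshold is unreachable even with every signer colluding

# Weights [5, 3, 2, 1, 1] with a threshold of 9 require at least 3 colluding signers (5 + 3 + 2).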
from dataclasses import dataclass
from typing import List

@dataclass
class WeightConfigRequest:
    controller: str
    individual_weights: List[int]
    team_weights: List[int]
    global_threshold: int

@dataclass
class WeightConfigResponse:
    config_id: str
    cesr_encoding: str
    validation_errors: List[str]

class WeightConfigAPI:
    def create_weight_config(self, request: WeightConfigRequest) -> WeightConfigResponse:
        """
        Create and validate a weight-of-weights configuration
        """
        config = self._build_config(request)
        errors = self._validate_config(config)
        if errors:
            return WeightConfigResponse(
                config_id="",
                cesr_encoding="",
                validation_errors=errors
            )
        config_id = self._store_config(config)
        cesr_encoding = self._encode_cesr(config)
        return WeightConfigResponse(
            config_id=config_id,
            cesr_encoding=cesr_encoding,
            validation_errors=[]
        )
from collections import defaultdict

class SignatureCollector:
    def __init__(self, weight_config: WeightConfig):
        self.config = weight_config
        self.collected_sigs = defaultdict(list)
        self.state = CollectionState.INIT

    async def collect_signature(self, level: int, signer_id: str,
                                signature: bytes) -> CollectionResult:
        """
        Collect a weighted signature with async validation
        """
        if not self._validate_signer(level, signer_id):
            return CollectionResult.INVALID_SIGNER
        if not await self._verify_signature_async(signature, signer_id):
            return CollectionResult.INVALID_SIGNATURE
        self.collected_sigs[level].append((signer_id, signature))
        if self._check_threshold_met():
            self.state = CollectionState.COMPLETE
            return CollectionResult.THRESHOLD_MET
        return CollectionResult.SIGNATURE_ACCEPTED
Strategy Pattern for Weight Calculation:
from abc import ABC, abstractmethod
from typing import List

class WeightCalculationStrategy(ABC):
    @abstractmethod
    def calculate_weight(self, signatures: List[Signature]) -> int:
        pass

class LinearWeightStrategy(WeightCalculationStrategy):
    def calculate_weight(self, signatures: List[Signature]) -> int:
        return sum(sig.weight for sig in signatures)

class ExponentialWeightStrategy(WeightCalculationStrategy):
    def calculate_weight(self, signatures: List[Signature]) -> int:
        return sum(2 ** sig.weight for sig in signatures)
Observer Pattern for Threshold Events:
class ThresholdObserver(ABC):
    @abstractmethod
    def on_threshold_met(self, event: ThresholdEvent) -> None:
        pass

class WeightedThresholdManager:
    def __init__(self):
        self.observers: List[ThresholdObserver] = []

    def add_observer(self, observer: ThresholdObserver) -> None:
        self.observers.append(observer)

    def _notify_threshold_met(self, event: ThresholdEvent) -> None:
        for observer in self.observers:
            observer.on_threshold_met(event)
Signature Cache with LRU Eviction:
from collections import OrderedDict
from typing import Optional

class SignatureCache:
    def __init__(self, max_size: int = 10000):
        self.cache: OrderedDict[str, CachedSignature] = OrderedDict()
        self.max_size = max_size

    def get(self, sig_hash: str) -> Optional[CachedSignature]:
        if sig_hash in self.cache:
            # Move to end (most recently used)
            self.cache.move_to_end(sig_hash)
            return self.cache[sig_hash]
        return None

    def put(self, sig_hash: str, signature: CachedSignature) -> None:
        if len(self.cache) >= self.max_size:
            # Remove least recently used
            self.cache.popitem(last=False)
        self.cache[sig_hash] = signature
class KERIWeightIntegration:
    def __init__(self, keri_core: KERICore):
        self.keri = keri_core
        self.weight_manager = WeightManager()

    def create_weighted_inception(self,
                                  weight_config: WeightConfig) -> InceptionEvent:
        """
        Create KERI inception event with weight-of-weights configuration
        """
        keys = self._extract_keys_from_config(weight_config)
        next_keys = self._generate_next_keys(weight_config)
        inception = self.keri.incept(
            keys=keys,
            next_keys=next_keys,
            witness_threshold=weight_config.globalThreshold,
            witnesses=self._get_witnesses(weight_config)
        )
        # Embed weight configuration in inception event
        inception.data['weightConfig'] = weight_config.to_dict()
        return inception
import base64

class CESRWeightEncoder:
    def encode_weight_config(self, config: WeightConfig) -> str:
        """
        Encode weight configuration in CESR format
        """
        # Group code for weight-of-weights: -GAB
        group_code = "-GAB"
        # Count individual and team weights
        count = len(config.individual.weights) + len(config.team.weights)
        count_code = self._encode_count(count)
        # Encode weights as 2-byte integers
        weight_bytes = b''
        for weight in config.individual.weights:
            weight_bytes += weight.to_bytes(2, 'big')
        for weight in config.team.weights:
            weight_bytes += weight.to_bytes(2, 'big')
        # Base64 encode weight bytes
        weight_b64 = base64.urlsafe_b64encode(weight_bytes).decode('ascii')
        return f"{group_code}{count_code}{weight_b64}"
Key Event Processing Pipeline:
Key Event → Weight Validation → Signature Collection → Threshold Check → Event Acceptance
- Key Event: parse the event
- Weight Validation: extract the weight configuration
- Signature Collection: collect weighted signatures
- Threshold Check: check the global threshold
- Event Acceptance: update the KEL
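A linear sketch of this pipeline is shown below; parse_event, extract_weight_config, and collect_weighted_signatures are placeholder names for the stages above (not an API from the source), while aggregate_weighted_signatures is the function defined earlier in this page.
def process_key_event(raw_event: bytes, kel) -> bool:
    """Run a key event through the weighted processing pipeline."""
    event = parse_event(raw_event)                           # Key Event
    config = extract_weight_config(event)                    # Weight Validation
    signatures = collect_weighted_signatures(event, config)  # Signature Collection
    accepted, _ = aggregate_weighted_signatures(             # Threshold Check
        signatures.level1, signatures.level2, config)
    if accepted:
        kel.append(event)                                    # Event Acceptance: update KEL
    return accepted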
Witness Agreement Integration:
def integrate_with_witness_agreement(weight_config: WeightConfig,
                                     witnesses: List[Witness]) -> bool:
    """
    Integrate weight-of-weights with KERI witness agreement algorithm
    """
    # Map weight configuration to witness thresholds
    witness_weights = {}
    for i, witness in enumerate(witnesses):
        if i < len(weight_config.individual.weights):
            witness_weights[witness.aid] = weight_config.individual.weights[i]
    # Apply KAACE algorithm with weighted thresholds
    return kaace_algorithm(witnesses, witness_weights, weight_config.globalThreshold)
Weighted Issuance Authority:
class WeightedCredentialIssuer:
    def __init__(self, weight_config: WeightConfig):
        self.weight_config = weight_config
        self.issuer_registry = IssuerRegistry()

    def issue_weighted_credential(self,
                                  credential_data: dict,
                                  issuer_signatures: List[Signature]) -> ACDC:
        """
        Issue ACDC with weight-of-weights authority validation
        """
        # Validate issuer authority using weight thresholds
        if not self._validate_issuer_authority(issuer_signatures):
            raise InsufficientAuthorityError("Issuer weight below threshold")
        # Create ACDC with weighted provenance chain
        acdc = ACDC(
            schema=credential_data['schema'],
            issuer=self._create_weighted_issuer_aid(),
            data=credential_data,
            authority_proof=self._create_authority_proof(issuer_signatures)
        )
        return acdc
Signature Verification Complexity:
Threshold Calculation:
def analyze_threshold_complexity():
    """
    Time complexity analysis for threshold calculation
    """
    # Best case: O(T) where T = threshold (early termination)
    # Average case: O(n/2) where n = total signers
    # Worst case: O(n) - must check all signatures
    return {
        'best_case': 'O(T)',
        'average_case': 'O(n/2)',
        'worst_case': 'O(n)',
        'space_complexity': 'O(n)'
    }
Weight Configuration Storage:
Memory Usage per Configuration:
- Individual weights: n₁ × 4 bytes (32-bit integers)
- Team weights: n₂ × 4 bytes
- Key references: (n₁ + n₂) × 44 bytes (CESR identifiers)
- Metadata: ~100 bytes
Total: (n₁ + n₂) × 48 + 100 bytes
Example (10 individuals, 5 teams):
15 × 48 + 100 = 820 bytes per configuration
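The formula above can be applied directly in code; the small helper below is illustrative (its name is an assumption, not from the source documents):
def config_memory_bytes(num_individuals: int, num_teams: int) -> int:
    """Estimate memory per weight configuration: (n1 + n2) * 48 + 100 bytes."""
    entries = num_individuals + num_teams
    return entries * (4 + 44) + 100  # 4-byte weight + 44-byte CESR key reference + ~100 bytes metadata

# config_memory_bytes(10, 5) == 820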
Signature Cache Memory:
class MemoryAnalyzer:
    def calculate_signature_memory(self, cache_size: int) -> dict:
        """
        Calculate memory usage for signature cache
        """
        signature_size = 64  # Ed25519 signature bytes
        metadata_size = 32   # Hash, timestamp, etc.
        total_per_signature = signature_size + metadata_size
        return {
            'cache_size': cache_size,
            'memory_per_signature': total_per_signature,
            'total_memory_mb': (cache_size * total_per_signature) / (1024 * 1024),
            'recommended_max_cache': 10000  # ~960 KB
        }
CESR Encoding Efficiency:
Weight-of-Weights CESR Overhead:
- Group code: 4 bytes
- Count code: 2 bytes
- Weight array: (n₁ + n₂) × 2 bytes
- Signature array: active_signers × 88 bytes
Compression ratio vs. JSON: ~60% size reduction
Compression ratio vs. raw binary: ~15% overhead (self-describing)
Insufficient Weight Scenarios:
class InsufficientWeightHandler:
    def handle_insufficient_weight(self,
                                   current_weight: int,
                                   required_threshold: int) -> RecoveryAction:
        """
        Handle cases where collected signatures don't meet threshold
        """
        weight_deficit = required_threshold - current_weight
        if weight_deficit <= self._get_max_single_weight():
            return RecoveryAction.WAIT_FOR_ADDITIONAL_SIGNATURE
        elif self._can_lower_threshold():
            return RecoveryAction.PROPOSE_THRESHOLD_REDUCTION
        else:
            return RecoveryAction.ABORT_OPERATION
Signature Timing Attacks:
def mitigate_timing_attacks(signatures: List[Signature]) -> bool:
    """
    Constant-time signature verification to prevent timing attacks
    """
    results = []
    # Verify all signatures regardless of early failures
    for sig in signatures:
        result = constant_time_verify(sig)
        results.append(result)
    # Aggregate results without short-circuiting
    return constant_time_aggregate(results)
Concurrent Signature Collection:
import asyncio

class ConcurrentSignatureCollector:
    def __init__(self, threshold: int):
        self.lock = asyncio.Lock()
        self.signature_set = set()
        self.weight_total = 0
        self.threshold = threshold

    async def add_signature(self, signature: Signature) -> CollectionResult:
        async with self.lock:
            sig_hash = self._hash_signature(signature)
            if sig_hash in self.signature_set:
                return CollectionResult.DUPLICATE_SIGNATURE
            if not self._verify_signature(signature):
                return CollectionResult.INVALID_SIGNATURE
            self.signature_set.add(sig_hash)
            self.weight_total += signature.weight
            if self.weight_total >= self.threshold:
                return CollectionResult.THRESHOLD_ACHIEVED
            return CollectionResult.SIGNATURE_ACCEPTED
KERI RFC Draft Alignment:
Version Compatibility Matrix:
KERI Core | Weight-of-Weights | CESR Version | Status
-------------|-------------------|--------------|--------
1.0.x | 1.0.0 | 1.0.x | Stable
1.1.x | 1.1.0 | 1.1.x | Current
2.0.x | 2.0.0 | 2.0.x | Future
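The matrix can be checked programmatically; the mapping and helper below simply mirror the table above and are illustrative, not a published API:
# Major.minor KERI core series mapped to the matching weight-of-weights release (from the table above).
COMPATIBILITY = {
    "1.0": {"weight_of_weights": "1.0.0", "cesr": "1.0", "status": "Stable"},
    "1.1": {"weight_of_weights": "1.1.0", "cesr": "1.1", "status": "Current"},
    "2.0": {"weight_of_weights": "2.0.0", "cesr": "2.0", "status": "Future"},
}

def compatible_weight_version(keri_core_version: str) -> str:
    """Look up the weight-of-weights release for a KERI core version such as '1.1.3'."""
    series = ".".join(keri_core_version.split(".")[:2])
    return COMPATIBILITY[series]["weight_of_weights"]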
Supported Signature Algorithms: Ed25519 and Secp256k1 (the verifiers instantiated by WeightedThresholdVerifier above).
Hash Functions: BLAKE3 (used for event digests and replay-resistant signing digests).
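A small dispatch sketch showing how a signature's algorithm selects one of the verifier instances held by WeightedThresholdVerifier; the algorithm-name strings are illustrative assumptions, not identifiers from the source documents:
def select_verifier(algorithm: str, verifier: "WeightedThresholdVerifier"):
    """Pick the verifier instance that matches a signature's algorithm name."""
    if algorithm == "Ed25519":
        return verifier.ed25519_verifier
    if algorithm == "Secp256k1":
        return verifier.secp256k1_verifier
    raise ValueError(f"Unsupported signature algorithm: {algorithm}")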
Microservice Integration:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: weight-of-weights-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: weight-service
  template:
    metadata:
      labels:
        app: weight-service
    spec:
      containers:
        - name: weight-service
          image: keri/weight-service:v1.1.0
          resources:
            requests:
              memory: "256Mi"
              cpu: "250m"
            limits:
              memory: "512Mi"
              cpu: "500m"
          env:
            - name: MAX_WEIGHT_CONFIGS
              value: "10000"
            - name: SIGNATURE_CACHE_SIZE
              value: "5000"
Key Metrics:
from prometheus_client import Counter, Histogram

class WeightMetrics:
    def __init__(self):
        self.signature_verification_time = Histogram(
            'weight_signature_verification_seconds',
            'Time spent verifying weighted signatures'
        )
        self.threshold_achievement_rate = Counter(
            'weight_threshold_achievements_total',
            'Number of successful threshold achievements'
        )
        self.weight_config_cache_hits = Counter(
            'weight_config_cache_hits_total',
            'Cache hits for weight configurations'
        )

    def record_verification_time(self, duration: float):
        self.signature_verification_time.observe(duration)
Health Check Endpoints:
@app.route('/health/weight-service')
def health_check():
    return {
        'status': 'healthy',
        'active_configs': len(weight_manager.active_configs),
        'cache_hit_rate': metrics.get_cache_hit_rate(),
        'avg_verification_time_ms': metrics.get_avg_verification_time()
    }
Signature Validation Order
Key Material Protection
Property-Based Testing
from hypothesis import given, strategies as st

@given(st.lists(st.integers(min_value=1, max_value=100), min_size=1, max_size=10))
def test_weight_threshold_properties(weights):
    """Property-based test for weight threshold invariants"""
    config = create_weight_config(weights)
    # Property: threshold should never exceed sum of all weights
    assert config.global_threshold <= sum(weights)
    # Property: CESR encoding should be reversible
    encoded = encode_weight_config_cesr(config)
    decoded = decode_weight_config_cesr(encoded)
    assert config == decoded
Load Testing Scenarios
Security Testing