Performance Bottlenecks
TL;DR
PKI operations frequently hit performance bottlenecks when scaling from dozens to thousands (or millions) of certificates. Common bottlenecks include synchronous CA interactions, unindexed database queries, serial certificate operations, OCSP responder overload, and cryptographic computation limits. Optimization requires parallelization, caching, async operations, and architectural evolution.
Key insight: Most performance problems stem from serial operations, blocking I/O, and failure to leverage modern async patterns.
Overview
Certificate management performance issues typically remain hidden until reaching critical scale. A system handling 100 certificates works fine with manual processes and synchronous operations. The same architecture collapses at 10,000 certificates and becomes completely unusable at 100,000+.
This page covers identifying, measuring, and resolving performance bottlenecks across the certificate lifecycle.
Common Performance Anti-Patterns
Anti-Pattern 1: Serial Certificate Operations
The Problem:
# SLOW: Serial certificate issuance
def renew_all_certificates(certificates: List[Certificate]) -> List[Result]:
"""
Serial renewal - processes one certificate at a time
Time complexity: O(n * request_time)
"""
results = []
for cert in certificates:
# Each operation blocks waiting for CA response
try:
new_cert = ca_client.issue_certificate(cert.csr)
results.append(Success(new_cert))
except Exception as e:
results.append(Failure(cert, e))
return results
# With 1000 certificates and 200ms per request:
# Total time: 1000 * 0.2s = 200 seconds (3+ minutes)
The Solution:
import asyncio
from typing import List
import aiohttp
async def renew_certificates_parallel(
    certificates: List[Certificate],
    max_concurrency: int = 50
) -> List[Result]:
    """
    Parallel certificate renewal with concurrency limit
    Time complexity: O(n / concurrency * request_time)
    """
    semaphore = asyncio.Semaphore(max_concurrency)
    # Share one session (and its connection pool) across all renewals;
    # creating a session per request defeats connection reuse
    async with aiohttp.ClientSession() as session:
        async def renew_one(cert: Certificate) -> Result:
            async with semaphore:  # Limit concurrent operations
                try:
                    new_cert = await ca_client.issue_certificate_async(
                        session, cert.csr
                    )
                    return Success(new_cert)
                except Exception as e:
                    return Failure(cert, e)
        # Execute all renewals concurrently
        tasks = [renew_one(cert) for cert in certificates]
        return await asyncio.gather(*tasks)
# With 1000 certificates, 200ms per request, 50 concurrent:
# Total time: (1000 / 50) * 0.2s = 4 seconds
# 50x improvement
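A minimal driver for the parallel version, assuming the Success/Failure result types above and a pre-loaded certificate list:
# Hypothetical driver for the parallel renewal above
results = asyncio.run(renew_certificates_parallel(certificates, max_concurrency=50))
failures = [r for r in results if isinstance(r, Failure)]
print(f"{len(results) - len(failures)} renewed, {len(failures)} failed")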
Performance Comparison:
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from typing import Callable, Coroutine, List

def _call(f: Callable):
    """Module-level helper so ProcessPoolExecutor can pickle work items"""
    return f()
class PerformanceComparison:
"""Compare different parallelization strategies"""
def benchmark_serial(self, operations: List[Callable]) -> float:
"""Serial execution baseline"""
start = time.time()
results = []
for op in operations:
results.append(op())
return time.time() - start
def benchmark_threading(
self,
operations: List[Callable],
workers: int = 50
) -> float:
"""Thread-based parallelization"""
start = time.time()
with ThreadPoolExecutor(max_workers=workers) as executor:
results = list(executor.map(lambda f: f(), operations))
return time.time() - start
    def benchmark_multiprocessing(
        self,
        operations: List[Callable],
        workers: int = 8
    ) -> float:
        """
        Process-based parallelization
        Work items are pickled, so operations must be module-level
        functions (lambdas and closures will not pickle)
        """
        start = time.time()
        with ProcessPoolExecutor(max_workers=workers) as executor:
            results = list(executor.map(_call, operations))
        return time.time() - start
async def benchmark_asyncio(
self,
async_operations: List[Coroutine],
concurrency: int = 100
) -> float:
"""Async/await parallelization"""
start = time.time()
semaphore = asyncio.Semaphore(concurrency)
async def limited_op(op):
async with semaphore:
return await op
results = await asyncio.gather(
*[limited_op(op) for op in async_operations]
)
return time.time() - start
# Example benchmarks for 1000 operations at 100ms each:
# Serial: 100+ seconds
# Threading (50): ~2 seconds
# Asyncio (100): ~1 second
# Multiprocess (8): ~12 seconds (worse: only 8 workers plus process/IPC overhead; processes suit CPU-bound work, not I/O-bound waits)
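To reproduce numbers like these locally, a small harness can feed the comparison simulated 100ms operations; time.sleep/asyncio.sleep stand in for the CA round trip (multiprocessing is omitted here because these closures will not pickle):
def make_blocking_op(delay: float = 0.1) -> Callable:
    """Simulated blocking CA call; sleep stands in for network I/O"""
    def op():
        time.sleep(delay)
        return True
    return op

async def make_async_op(delay: float = 0.1):
    """Simulated non-blocking CA call"""
    await asyncio.sleep(delay)
    return True

comparison = PerformanceComparison()
ops = [make_blocking_op() for _ in range(1000)]
# Serial is limited to 20 ops here; 1000 would take ~100 seconds
print(f"Serial (20 ops): {comparison.benchmark_serial(ops[:20]):.1f}s")
print(f"Threading (50):  {comparison.benchmark_threading(ops):.1f}s")
async_ops = [make_async_op() for _ in range(1000)]
print(f"Asyncio (100):   {asyncio.run(comparison.benchmark_asyncio(async_ops)):.1f}s")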
Anti-Pattern 2: Database Queries Without Indexes
The Problem:
-- SLOW: Full table scan on every expiry check
-- Query time: O(n) where n = total certificates
SELECT * FROM certificates
WHERE not_after < NOW() + INTERVAL '30 days'
ORDER BY not_after;
-- With 1 million certificates:
-- Query time: 5+ seconds (full table scan)
-- Executed every minute = massive CPU waste
The Solution:
-- Create index on expiry date
CREATE INDEX idx_certificates_expiry ON certificates(not_after);
-- Same query now uses index
-- Query time: < 10ms (index seek)
-- 500x improvement
-- Additional useful indexes
CREATE INDEX idx_certificates_fingerprint ON certificates(fingerprint_sha256);
CREATE INDEX idx_certificates_hostname ON certificates USING gin(hostnames);
CREATE INDEX idx_certificates_owner ON certificates(owner_team);
CREATE INDEX idx_certificates_status ON certificates(status) WHERE status != 'revoked';
-- Composite indexes for common queries
CREATE INDEX idx_certificates_expiry_status ON certificates(not_after, status);
CREATE INDEX idx_certificates_owner_env ON certificates(owner_team, environment);
Query Optimization:
from datetime import datetime, timedelta
from sqlalchemy.orm import load_only
class OptimizedCertificateQueries:
    """Optimized database queries for certificate operations"""

    def __init__(self, session):
        self.session = session
def get_expiring_certificates(
self,
days: int = 30,
limit: int = 1000
) -> List[Certificate]:
"""
Efficiently retrieve expiring certificates
Uses index on not_after column
"""
cutoff = datetime.now() + timedelta(days=days)
return (
self.session.query(Certificate)
.filter(Certificate.not_after < cutoff)
.filter(Certificate.status == 'active')
.order_by(Certificate.not_after)
.limit(limit)
.all()
)
def bulk_update_status(
self,
certificate_ids: List[str],
new_status: str
):
"""
Bulk update avoiding N+1 queries
"""
# SLOW: N individual updates
# for cert_id in certificate_ids:
# cert = session.query(Certificate).get(cert_id)
# cert.status = new_status
# FAST: Single bulk update
self.session.query(Certificate).filter(
Certificate.id.in_(certificate_ids)
).update(
{Certificate.status: new_status},
synchronize_session=False
)
self.session.commit()
def get_certificates_by_owner_efficient(
self,
owner: str
) -> List[Certificate]:
"""
Efficient query with selective loading
"""
return (
self.session.query(Certificate)
.filter(Certificate.owner_team == owner)
.options(
# Only load needed columns
load_only(
Certificate.id,
Certificate.subject_cn,
Certificate.not_after
)
)
.all()
)
Monitoring Query Performance:
-- PostgreSQL: Enable query logging for slow queries
ALTER DATABASE certificates SET log_min_duration_statement = 1000; -- Log queries > 1s
-- Identify slow queries (requires the pg_stat_statements extension;
-- on PostgreSQL 13+ these columns are total_exec_time / mean_exec_time / max_exec_time)
SELECT
query,
calls,
total_time,
mean_time,
max_time
FROM pg_stat_statements
WHERE mean_time > 100 -- Queries averaging > 100ms
ORDER BY mean_time DESC
LIMIT 20;
-- Analyze query plan
EXPLAIN ANALYZE
SELECT * FROM certificates
WHERE not_after < NOW() + INTERVAL '30 days'
ORDER BY not_after;
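Slow queries can also be surfaced from the application side. The sketch below uses SQLAlchemy's cursor-execute events to log anything over 100ms; the DSN and threshold are placeholders:
import logging
import time
from sqlalchemy import create_engine, event

logger = logging.getLogger("slow_queries")
engine = create_engine("postgresql://user:pass@localhost/certs")  # placeholder DSN

@event.listens_for(engine, "before_cursor_execute")
def _start_timer(conn, cursor, statement, parameters, context, executemany):
    conn.info.setdefault("query_start", []).append(time.time())

@event.listens_for(engine, "after_cursor_execute")
def _log_slow(conn, cursor, statement, parameters, context, executemany):
    elapsed = time.time() - conn.info["query_start"].pop()
    if elapsed > 0.1:  # log anything slower than 100ms
        logger.warning("Slow query (%.0fms): %s", elapsed * 1000, statement[:200])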
Anti-Pattern 3: Synchronous CA Interactions
The Problem:
import requests

class SynchronousCAClient:
"""Blocking CA operations kill throughput"""
def issue_certificate(self, csr: str) -> Certificate:
"""
Synchronous certificate issuance
Blocks thread waiting for CA response (200-2000ms)
"""
response = requests.post(
f"{self.ca_url}/issue",
json={'csr': csr},
timeout=30 # Block for up to 30 seconds
)
if response.status_code == 200:
return Certificate(response.json())
else:
raise CAError(response.text)
def process_issuance_queue(self, queue: List[CSR]):
"""
Process queue serially - disaster at scale
100 requests * 500ms avg = 50 seconds minimum
"""
for csr in queue:
try:
cert = self.issue_certificate(csr)
self.store_certificate(cert)
except CAError as e:
self.handle_error(csr, e)
The Solution:
import aiohttp
import asyncio
from typing import List, AsyncGenerator
class AsyncCAClient:
"""Non-blocking CA operations for high throughput"""
def __init__(self, ca_url: str, max_concurrency: int = 100):
self.ca_url = ca_url
self.semaphore = asyncio.Semaphore(max_concurrency)
self.session = None
async def __aenter__(self):
"""Create persistent connection pool"""
self.session = aiohttp.ClientSession(
connector=aiohttp.TCPConnector(
limit=100, # Connection pool size
ttl_dns_cache=300
),
timeout=aiohttp.ClientTimeout(total=30)
)
return self
async def __aexit__(self, *args):
await self.session.close()
async def issue_certificate_async(self, csr: str) -> Certificate:
"""
Async certificate issuance
Non-blocking - can handle 100s concurrently
"""
async with self.semaphore:
async with self.session.post(
f"{self.ca_url}/issue",
json={'csr': csr}
) as response:
if response.status == 200:
data = await response.json()
return Certificate(data)
else:
text = await response.text()
raise CAError(text)
async def process_issuance_queue(
self,
queue: List[CSR]
) -> AsyncGenerator[Result, None]:
"""
Process entire queue concurrently with backpressure
100 requests at 500ms avg = ~1-2 seconds total
"""
async def issue_with_retry(csr: CSR) -> Result:
for attempt in range(3):
try:
cert = await self.issue_certificate_async(csr)
await self.store_certificate_async(cert)
return Success(cert)
except CAError as e:
if attempt == 2: # Last attempt
return Failure(csr, e)
await asyncio.sleep(2 ** attempt) # Exponential backoff
# Process all CSRs concurrently
tasks = [issue_with_retry(csr) for csr in queue]
# Yield results as they complete
for coro in asyncio.as_completed(tasks):
yield await coro
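A sketch of the client in use, with a hypothetical load_pending_csrs() supplying the queue and results consumed as they complete:
import asyncio
import logging

logger = logging.getLogger(__name__)

async def drain_queue():
    pending = load_pending_csrs()  # hypothetical loader
    async with AsyncCAClient("https://ca.internal.example", max_concurrency=100) as client:
        # Results arrive in completion order, not submission order
        async for result in client.process_issuance_queue(pending):
            if isinstance(result, Failure):
                logger.warning("Issuance failed: %s", result)

asyncio.run(drain_queue())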
Anti-Pattern 4: Crypto Operations Without Hardware Acceleration
The Problem:
# SLOW: CPU-bound RSA key generation, one key at a time
from typing import Tuple

def generate_keypair_slow(key_size: int = 2048) -> Tuple[bytes, bytes]:
    """
    Software RSA key generation on a general-purpose CPU
    2048-bit: ~100-200ms
    4096-bit: ~2-5 seconds
    """
    from cryptography.hazmat.primitives.asymmetric import rsa
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=key_size
)
return serialize_keypair(private_key)
# At scale: 10,000 keys = 30-50 minutes
The Solution:
# FAST: Use hardware acceleration when available
import os
def generate_keypair_fast(key_size: int = 2048) -> Tuple[bytes, bytes]:
"""
Use hardware acceleration (AES-NI, AVX2, etc.)
2048-bit: ~10-20ms with hardware
10-20x faster than software-only
"""
# cryptography library automatically uses hardware when available
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.backends import default_backend
# Ensure we're using backend with hardware support
backend = default_backend()
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=key_size,
backend=backend
)
return serialize_keypair(private_key)
# Consider ECDSA for even better performance
def generate_ecdsa_keypair() -> Tuple[bytes, bytes]:
"""
ECDSA P-256 key generation
~5ms per key (20x faster than RSA-2048)
Smaller keys, faster operations
"""
from cryptography.hazmat.primitives.asymmetric import ec
private_key = ec.generate_private_key(
ec.SECP256R1() # P-256 curve
)
return serialize_keypair(private_key)
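The gap is easy to verify locally; a quick micro-benchmark (results vary with CPU and OpenSSL build):
import time
from cryptography.hazmat.primitives.asymmetric import ec, rsa

def time_keygen(fn, n: int = 20) -> float:
    """Average key generation time in milliseconds over n runs"""
    start = time.time()
    for _ in range(n):
        fn()
    return (time.time() - start) / n * 1000

rsa_ms = time_keygen(lambda: rsa.generate_private_key(public_exponent=65537, key_size=2048))
ec_ms = time_keygen(lambda: ec.generate_private_key(ec.SECP256R1()))
print(f"RSA-2048: {rsa_ms:.1f}ms/key  ECDSA P-256: {ec_ms:.2f}ms/key")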
Offload to Hardware Security Module (HSM):
import os

import pkcs11
from pkcs11 import KeyType, Mechanism

class HSMKeyGeneration:
    """
    Offload key generation to HSM
    Throughput: 100-1000+ keys/second depending on hardware
    """
    def __init__(self, pkcs11_lib: str, token_label: str, pin: str):
        self.lib = pkcs11.lib(pkcs11_lib)
        self.token = self.lib.get_token(token_label=token_label)
        self.pin = pin

    def generate_keypair_hsm(self, key_size: int = 2048) -> str:
        """
        Generate RSA keypair in HSM
        Returns key handle (not actual key material)
        """
        # Read/write session so the keypair can be stored on the token
        with self.token.open(rw=True, user_pin=self.pin) as session:
            # Generate keypair in hardware; store=True persists it
            key_id = os.urandom(16)
            public_key, private_key = session.generate_keypair(
                KeyType.RSA, key_size, id=key_id, store=True
            )
            # Return handle for later use
            return key_id.hex()
def sign_csr_hsm(self, csr_data: bytes, key_handle: str) -> bytes:
"""
Sign CSR using HSM-stored key
Hardware performs signature operation
"""
with self.token.open(user_pin=self.pin) as session:
# Find key by handle
private_key = session.get_key(
object_class=pkcs11.ObjectClass.PRIVATE_KEY,
id=bytes.fromhex(key_handle)
)
# Sign in hardware
signature = private_key.sign(
csr_data,
mechanism=Mechanism.SHA256_RSA_PKCS
)
return signature
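Example wiring, here against SoftHSM2 for local testing; the library path, token label, and PIN are installation-specific assumptions:
hsm = HSMKeyGeneration(
    pkcs11_lib="/usr/lib/softhsm/libsofthsm2.so",  # SoftHSM2, for local testing
    token_label="ca-signing",
    pin="1234",
)
handle = hsm.generate_keypair_hsm(key_size=2048)
signature = hsm.sign_csr_hsm(csr_der_bytes, handle)  # csr_der_bytes assumed prepared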
Anti-Pattern 5: OCSP Responder Overload
The Problem:
class OCSPResponder:
"""Naive OCSP responder - dies under load"""
def handle_request(self, ocsp_request: bytes) -> bytes:
"""
Check revocation status for each request
No caching, synchronous database query
"""
# Parse request
req = ocsp.load_der_ocsp_request(ocsp_request)
cert_id = req.certificate_id
# Query database (SLOW - no caching)
revocation_status = self.db.query(
"SELECT status FROM revoked_certs WHERE serial = ?",
cert_id.serial_number
)
# Build response
response = build_ocsp_response(cert_id, revocation_status)
return response.public_bytes(serialization.Encoding.DER)
# Problem: Every OCSP check = database query
# At 10,000 req/s: 10,000 DB queries/second
# Database melts
The Solution:
import redis
from cryptography.hazmat.primitives import serialization
from cryptography.x509 import ocsp
class CachedOCSPResponder:
"""High-performance OCSP responder with caching"""
    def __init__(self, redis_client: redis.Redis, db_client, cdn_client=None):
        self.redis = redis_client
        self.db = db_client
        self.cdn = cdn_client  # optional CDN purge client
        self.cache_ttl = 300  # 5 minutes
        # Pre-build OCSP responses for all certificates
        self.prebuild_responses()
def prebuild_responses(self):
"""
Pre-build and cache OCSP responses
Update on revocation events, not per request
"""
all_certs = self.db.get_all_active_certificates()
for cert in all_certs:
# Build response once
response = self.build_ocsp_response(
cert.serial_number,
status='good'
)
# Cache in Redis
cache_key = f"ocsp:{cert.serial_number}"
self.redis.setex(
cache_key,
self.cache_ttl,
response.public_bytes(serialization.Encoding.DER)
)
    async def handle_request_cached(self, ocsp_request: bytes) -> bytes:
        """
        Handle OCSP request with caching
        Cache hit: <1ms response time
        Cache miss: ~10ms (build + cache)
        Note: the Redis calls here are synchronous and briefly block the
        event loop; swap in redis.asyncio for a fully async path
        """
# Parse request
req = ocsp.load_der_ocsp_request(ocsp_request)
cert_id = req.certificate_id
serial = cert_id.serial_number
# Check cache first
cache_key = f"ocsp:{serial}"
cached_response = self.redis.get(cache_key)
if cached_response:
return cached_response
# Cache miss - build response
revocation_status = await self.db.get_status_async(serial)
response = self.build_ocsp_response(serial, revocation_status)
response_bytes = response.public_bytes(serialization.Encoding.DER)
# Cache for future requests
self.redis.setex(cache_key, self.cache_ttl, response_bytes)
return response_bytes
def on_revocation_event(self, serial_number: int):
"""
Update cached response when certificate revoked
Push-based updates instead of pull-based
"""
# Build revoked response
response = self.build_ocsp_response(serial_number, status='revoked')
response_bytes = response.public_bytes(serialization.Encoding.DER)
# Update cache immediately
cache_key = f"ocsp:{serial_number}"
self.redis.setex(cache_key, self.cache_ttl, response_bytes)
        # Optionally: Invalidate CDN cache
        if self.cdn:
            self.cdn.purge(f"/ocsp/{serial_number}")
# Performance improvement:
# Before: 10,000 req/s = 10,000 DB queries/s (fails)
# After: 10,000 req/s = ~50 DB queries/s (cache misses only)
# 200x reduction in database load
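Because the pre-built responses are static between revocation events, they also cache well at the HTTP layer. RFC 5019 allows OCSP over GET, which is what lets a CDN absorb repeat lookups; a sketch using aiohttp, where responder is the CachedOCSPResponder above:
import base64
from urllib.parse import unquote

from aiohttp import web

async def ocsp_get_handler(request: web.Request) -> web.Response:
    # RFC 5019: GET with the base64 OCSP request url-encoded into the path,
    # which is what makes responses cacheable by CDNs and HTTP caches
    der = base64.b64decode(unquote(request.match_info["b64_request"]))
    response_bytes = await responder.handle_request_cached(der)
    return web.Response(
        body=response_bytes,
        content_type="application/ocsp-response",
        headers={"Cache-Control": "max-age=300"},  # align with the Redis TTL
    )

app = web.Application()
app.router.add_get("/ocsp/{b64_request:.+}", ocsp_get_handler)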
Measuring Performance
Key Metrics
import asyncio
import time
from functools import wraps
from prometheus_client import Histogram, Counter, Gauge
# Define metrics
certificate_operation_duration = Histogram(
'certificate_operation_duration_seconds',
'Time spent in certificate operations',
['operation', 'status']
)
certificate_operations_total = Counter(
'certificate_operations_total',
'Total certificate operations',
['operation', 'status']
)
certificate_queue_size = Gauge(
'certificate_queue_size',
'Number of certificates waiting for processing',
['queue_type']
)
database_query_duration = Histogram(
'database_query_duration_seconds',
'Database query duration',
['query_type']
)
ca_request_duration = Histogram(
'ca_request_duration_seconds',
'CA request duration',
['operation', 'status']
)
def measure_operation(operation_name: str):
"""Decorator to measure operation performance"""
def decorator(func):
@wraps(func)
async def async_wrapper(*args, **kwargs):
start = time.time()
try:
result = await func(*args, **kwargs)
status = 'success'
return result
except Exception as e:
status = 'failure'
raise
finally:
duration = time.time() - start
certificate_operation_duration.labels(
operation=operation_name,
status=status
).observe(duration)
certificate_operations_total.labels(
operation=operation_name,
status=status
).inc()
@wraps(func)
def sync_wrapper(*args, **kwargs):
start = time.time()
try:
result = func(*args, **kwargs)
status = 'success'
return result
except Exception as e:
status = 'failure'
raise
finally:
duration = time.time() - start
certificate_operation_duration.labels(
operation=operation_name,
status=status
).observe(duration)
certificate_operations_total.labels(
operation=operation_name,
status=status
).inc()
# Return appropriate wrapper based on function type
if asyncio.iscoroutinefunction(func):
return async_wrapper
else:
return sync_wrapper
return decorator
# Usage
@measure_operation('certificate_issuance')
async def issue_certificate(csr: str) -> Certificate:
"""Measured certificate issuance"""
return await ca_client.issue(csr)
Performance Benchmarking
import asyncio
import statistics
import time
from typing import Callable, Dict, List
class PerformanceBenchmark:
"""Comprehensive performance testing"""
async def benchmark_throughput(
self,
operation: Callable,
num_operations: int = 1000,
concurrency: int = 10
) -> Dict[str, float]:
"""
Measure throughput of concurrent operations
"""
semaphore = asyncio.Semaphore(concurrency)
start_time = time.time()
async def measured_operation():
async with semaphore:
start = time.time()
await operation()
return time.time() - start
# Execute operations
tasks = [measured_operation() for _ in range(num_operations)]
durations = await asyncio.gather(*tasks, return_exceptions=True)
# Filter out exceptions
successful_durations = [
d for d in durations if not isinstance(d, Exception)
]
total_time = time.time() - start_time
return {
'total_operations': num_operations,
'successful': len(successful_durations),
'failed': num_operations - len(successful_durations),
'total_time': total_time,
'throughput': len(successful_durations) / total_time,
'avg_latency': statistics.mean(successful_durations),
'p50_latency': statistics.median(successful_durations),
'p95_latency': statistics.quantiles(successful_durations, n=20)[18],
'p99_latency': statistics.quantiles(successful_durations, n=100)[98],
'max_latency': max(successful_durations),
}
async def benchmark_scaling(
self,
operation: Callable,
concurrency_levels: List[int] = [1, 10, 50, 100, 200]
) -> Dict[int, Dict]:
"""
Test performance at different concurrency levels
"""
results = {}
for concurrency in concurrency_levels:
print(f"Testing concurrency: {concurrency}")
results[concurrency] = await self.benchmark_throughput(
operation,
num_operations=1000,
concurrency=concurrency
)
return results
def print_scaling_report(self, results: Dict[int, Dict]):
"""Print formatted scaling report"""
print("\n" + "="*80)
print("PERFORMANCE SCALING REPORT")
print("="*80)
print(f"{'Concurrency':<12} {'Throughput':<15} {'P50':<10} {'P95':<10} {'P99':<10}")
print("-"*80)
for concurrency, metrics in sorted(results.items()):
print(
f"{concurrency:<12} "
f"{metrics['throughput']:<15.2f} "
f"{metrics['p50_latency']*1000:<10.1f} "
f"{metrics['p95_latency']*1000:<10.1f} "
f"{metrics['p99_latency']*1000:<10.1f}"
)
print("="*80)
# Example usage
async def main():
benchmark = PerformanceBenchmark()
# Benchmark certificate issuance
results = await benchmark.benchmark_scaling(
operation=lambda: ca_client.issue_certificate_async(test_csr),
concurrency_levels=[1, 10, 50, 100, 200]
)
benchmark.print_scaling_report(results)
# Output example:
# ================================================================================
# PERFORMANCE SCALING REPORT
# ================================================================================
# Concurrency Throughput P50 P95 P99
# --------------------------------------------------------------------------------
# 1 4.85 206.1 215.3 220.5
# 10 48.23 207.3 225.1 245.7
# 50 195.12 255.2 301.5 350.2
# 100 285.43 349.7 425.1 480.3
# 200 310.25 642.3 850.2 920.5
# ================================================================================
Architectural Patterns for Scale
Pattern 1: Queue-Based Processing
import asyncio
import logging
from asyncio import Queue

logger = logging.getLogger(__name__)
class CertificateProcessor:
"""
Queue-based certificate processing for high throughput
Decouples request acceptance from processing
"""
def __init__(
self,
num_workers: int = 20,
queue_size: int = 10000
):
self.request_queue = Queue(maxsize=queue_size)
self.num_workers = num_workers
self.workers = []
async def start(self):
"""Start worker pool"""
self.workers = [
asyncio.create_task(self.worker(i))
for i in range(self.num_workers)
]
async def stop(self):
"""Graceful shutdown"""
# Send stop signal to all workers
for _ in range(self.num_workers):
await self.request_queue.put(None)
# Wait for workers to finish
await asyncio.gather(*self.workers)
async def worker(self, worker_id: int):
"""
Worker coroutine - processes requests from queue
"""
while True:
# Get request from queue
request = await self.request_queue.get()
# Stop signal
if request is None:
break
# Process request
try:
result = await self.process_certificate_request(request)
await self.store_result(result)
except Exception as e:
logger.error(f"Worker {worker_id} error: {e}")
finally:
self.request_queue.task_done()
async def submit_request(
self,
request: CertificateRequest
) -> str:
"""
Submit certificate request for processing
Returns immediately with request ID
"""
request_id = generate_request_id()
request.id = request_id
try:
# Non-blocking put with timeout
await asyncio.wait_for(
self.request_queue.put(request),
timeout=5.0
)
return request_id
except asyncio.TimeoutError:
raise QueueFullError("Processing queue at capacity")
async def process_certificate_request(
self,
request: CertificateRequest
) -> CertificateResult:
"""Process individual certificate request"""
# Issue certificate
cert = await self.ca_client.issue(request.csr)
# Deploy certificate
if request.auto_deploy:
await self.deploy_cert(cert, request.targets)
return CertificateResult(
request_id=request.id,
certificate=cert,
status='completed'
)
# Usage
processor = CertificateProcessor(num_workers=50)
await processor.start()
# Submit thousands of requests - they queue for processing
for csr in csrs:
request_id = await processor.submit_request(
CertificateRequest(csr=csr, auto_deploy=True)
)
print(f"Request {request_id} queued")
Pattern 2: Batch Processing
import asyncio

class BatchProcessor:
"""
Batch certificate operations for efficiency
Trade latency for throughput
"""
def __init__(
self,
batch_size: int = 100,
batch_timeout: float = 1.0
):
self.batch_size = batch_size
self.batch_timeout = batch_timeout
self.pending_requests = []
self.batch_lock = asyncio.Lock()
async def add_to_batch(
self,
request: CertificateRequest
) -> Certificate:
"""
Add request to batch, process when batch full or timeout
"""
future = asyncio.Future()
async with self.batch_lock:
self.pending_requests.append((request, future))
            # Process if batch full
            if len(self.pending_requests) >= self.batch_size:
                asyncio.create_task(self.process_batch())
            # Also schedule timeout-based processing; stray timers are
            # harmless because process_batch() returns early on an empty batch
            asyncio.create_task(self.process_batch_after_timeout())
# Wait for result
return await future
async def process_batch_after_timeout(self):
"""Process batch after timeout"""
await asyncio.sleep(self.batch_timeout)
await self.process_batch()
async def process_batch(self):
"""Process pending batch"""
async with self.batch_lock:
if not self.pending_requests:
return
batch = self.pending_requests[:]
self.pending_requests.clear()
# Process entire batch in parallel
tasks = []
for request, future in batch:
task = self.process_single_request(request, future)
tasks.append(task)
await asyncio.gather(*tasks, return_exceptions=True)
async def process_single_request(
self,
request: CertificateRequest,
future: asyncio.Future
):
"""Process single request and set future result"""
try:
cert = await self.ca_client.issue(request.csr)
future.set_result(cert)
except Exception as e:
future.set_exception(e)
# Usage pattern:
# Individual requests block until batch processed
# But processing happens in large parallel batches
cert1 = await batch_processor.add_to_batch(request1) # May wait for batch
cert2 = await batch_processor.add_to_batch(request2) # Added to same batch
# ... when batch full or timeout, all process in parallel
Pattern 3: Caching Layer
import hashlib
import json
from datetime import datetime, timedelta
from typing import List, Optional
class CertificateCachingLayer:
"""
Multi-level caching for certificate operations
"""
    def __init__(self, redis_client, db_client):
        self.redis = redis_client
        self.db = db_client
        self.local_cache = {}  # In-memory L1 cache (unbounded here; a bounded variant is sketched below)
        self.local_cache_ttl = 60  # 1 minute
        self.redis_ttl = 3600  # 1 hour
async def get_certificate(
self,
fingerprint: str
) -> Optional[Certificate]:
"""
Get certificate with multi-level caching
L1: In-memory (1ms)
L2: Redis (5-10ms)
L3: Database (50-100ms)
"""
# Check L1 cache (in-memory)
if fingerprint in self.local_cache:
cached_entry = self.local_cache[fingerprint]
if cached_entry['expires_at'] > datetime.now():
return cached_entry['certificate']
# Check L2 cache (Redis)
redis_key = f"cert:{fingerprint}"
cached_data = await self.redis.get(redis_key)
if cached_data:
cert = deserialize_certificate(cached_data)
# Populate L1 cache
self.local_cache[fingerprint] = {
'certificate': cert,
'expires_at': datetime.now() + timedelta(seconds=self.local_cache_ttl)
}
return cert
# Cache miss - query database (L3)
cert = await self.db.get_certificate(fingerprint)
if cert:
# Populate both caches
cert_data = serialize_certificate(cert)
# L2 (Redis)
await self.redis.setex(
redis_key,
self.redis_ttl,
cert_data
)
# L1 (Memory)
self.local_cache[fingerprint] = {
'certificate': cert,
'expires_at': datetime.now() + timedelta(seconds=self.local_cache_ttl)
}
return cert
def cache_key_for_query(self, **kwargs) -> str:
"""Generate cache key for query parameters"""
# Sort params for consistent key
params = sorted(kwargs.items())
params_str = json.dumps(params, sort_keys=True)
return hashlib.sha256(params_str.encode()).hexdigest()
async def get_certificates_cached(self, **query_params) -> List[Certificate]:
"""Cache query results"""
cache_key = f"query:{self.cache_key_for_query(**query_params)}"
# Check cache
cached = await self.redis.get(cache_key)
if cached:
return deserialize_certificate_list(cached)
# Execute query
results = await self.db.query_certificates(**query_params)
# Cache results (shorter TTL for queries)
await self.redis.setex(
cache_key,
300, # 5 minutes
serialize_certificate_list(results)
)
return results
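One caveat: the plain dict used as the L1 cache never evicts, which matters on large fleets. A bounded LRU with per-entry expiry is a drop-in alternative; a sketch:
import time
from collections import OrderedDict
from typing import Any, Optional

class BoundedTTLCache:
    """Size-capped LRU with per-entry TTL, suitable as the L1 layer above"""
    def __init__(self, max_entries: int = 10000, ttl: float = 60.0):
        self.max_entries = max_entries
        self.ttl = ttl
        self._data = OrderedDict()

    def get(self, key: str) -> Optional[Any]:
        entry = self._data.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if expires_at < time.time():
            del self._data[key]      # expired
            return None
        self._data.move_to_end(key)  # mark as recently used
        return value

    def put(self, key: str, value: Any):
        self._data[key] = (value, time.time() + self.ttl)
        self._data.move_to_end(key)
        while len(self._data) > self.max_entries:
            self._data.popitem(last=False)  # evict least recently used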
Scaling Infrastructure
Horizontal Scaling
# Kubernetes deployment for scalable certificate management
apiVersion: apps/v1
kind: Deployment
metadata:
name: certificate-processor
spec:
replicas: 10 # Scale based on queue depth
selector:
matchLabels:
app: certificate-processor
template:
metadata:
labels:
app: certificate-processor
spec:
containers:
- name: processor
image: certificate-processor:v1.2.3
resources:
requests:
cpu: "1000m"
memory: "2Gi"
limits:
cpu: "2000m"
memory: "4Gi"
env:
- name: WORKER_CONCURRENCY
value: "50"
- name: REDIS_URL
valueFrom:
secretKeyRef:
name: redis-credentials
key: url
---
# Horizontal Pod Autoscaler
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: certificate-processor-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: certificate-processor
minReplicas: 5
maxReplicas: 50
metrics:
- type: Pods
pods:
metric:
name: certificate_queue_size
target:
type: AverageValue
averageValue: "100" # Scale up if queue > 100 per pod
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
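The certificate_queue_size metric driving that HPA must be exported by the application and exposed through the custom metrics API (for example via prometheus-adapter). Assuming the Gauge defined under Key Metrics and the Pattern 1 processor, a periodic publisher might look like:
import asyncio

async def publish_queue_depth(processor: CertificateProcessor, interval: float = 10.0):
    """Periodically export queue depth for autoscaling decisions"""
    while True:
        certificate_queue_size.labels(queue_type='issuance').set(
            processor.request_queue.qsize()
        )
        await asyncio.sleep(interval)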
Database Optimization
# Connection pooling for database efficiency
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
engine = create_engine(
'postgresql://user:pass@localhost/certs',
poolclass=QueuePool,
pool_size=20, # Maintain 20 connections
max_overflow=10, # Allow 10 more if needed
pool_pre_ping=True, # Verify connections before use
pool_recycle=3600, # Recycle connections after 1 hour
)
# Read replicas for query distribution
class DatabaseRouter:
"""Route queries to appropriate database"""
def __init__(self):
self.writer = create_engine(WRITE_DB_URL, pool_size=10)
self.readers = [
create_engine(url, pool_size=20)
for url in READ_REPLICA_URLS
]
self.reader_index = 0
def get_engine(self, write: bool = False):
"""Get appropriate database engine"""
if write:
return self.writer
else:
# Round-robin read replicas
engine = self.readers[self.reader_index]
self.reader_index = (self.reader_index + 1) % len(self.readers)
return engine
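Usage sketch: bind a session per unit of work, sending reads to replicas and writes to the primary:
from sqlalchemy.orm import sessionmaker

Session = sessionmaker()
router = DatabaseRouter()

# Reads round-robin across the replicas
with Session(bind=router.get_engine(write=False)) as session:
    expiring = session.query(Certificate).filter(
        Certificate.status == 'active'
    ).all()

# Writes always hit the primary
with Session(bind=router.get_engine(write=True)) as session:
    session.add(new_certificate)  # new_certificate assumed built elsewhere
    session.commit()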
Conclusion
PKI performance bottlenecks stem from three core issues:
- Serial operations - Solved with async/await and parallelization
- Blocking I/O - Solved with async operations and connection pooling
- Lack of caching - Solved with multi-level caching strategies
Scaling from thousands to millions of certificates requires architectural evolution:
- Queue-based processing for decoupling
- Batch operations for efficiency
- Aggressive caching at multiple levels
- Horizontal scaling with proper monitoring
The key: measure first, optimize second. Use metrics to identify actual bottlenecks rather than optimizing prematurely.