Advanced Integration
Production patterns, custom implementations, and advanced configurations
Multi-Connection Strategies
A/B Testing AI Providers
Create one Telepath connection per provider and route a percentage of calls to each. Compare latency, cost, and quality scores in the dashboard to determine the best fit for your use case.
import random
# Provider routing table: weights control the share of traffic each
# provider receives (0.7 / 0.3 = a 70/30 A/B split).
providers = [
    {"name": "openai", "connection_id": "conn_primary", "weight": 0.7},
    {"name": "elevenlabs", "connection_id": "conn_fallback", "weight": 0.3},
]


def select_provider(failed_providers=None):
    """Pick one provider by weighted random choice, skipping failed ones.

    Args:
        failed_providers: optional iterable of provider names to exclude
            (e.g. providers that just errored). Defaults to excluding none.

    Returns:
        One provider dict from ``providers``.

    Raises:
        IndexError: if every provider has been excluded — callers should
            treat this as a hard outage.
    """
    # None default instead of a mutable [] default, which Python would
    # share across calls.
    excluded = set(failed_providers or ())
    available = [p for p in providers if p["name"] not in excluded]
    # Re-normalise the remaining weights so they sum to 1.
    total = sum(p["weight"] for p in available)
    weights = [p["weight"] / total for p in available]
    return random.choices(available, weights=weights)[0]
Geo-Distributed Architecture
Route calls based on caller geography to minimise latency and meet data-residency requirements:
Caller in US → US Region → Telepath → OpenAI
Caller in EU → EU Region → Telepath → ElevenLabs
Set up regional carrier connections, route by caller location, and monitor per-region latency in the dashboard.
Fallback Chains
The select_provider pattern above also serves as a graceful fallback: if a provider fails, exclude it and re-select from the remaining pool. This avoids service interruptions during transient provider outages.
Custom WebSocket Implementation
Advanced Audio Processing
Process audio before forwarding to your AI agent — the example below records a rough noise-level estimate, normalises amplitude, and applies a high-pass filter:
import numpy as np
from scipy import signal
class AudioProcessor:
    """Pre-processes raw PCM audio chunks before they reach the AI agent.

    Assumes 16-bit signed mono PCM input (int16 bytes) — confirm against
    your telephony media format.
    """

    def __init__(self, sample_rate=16000):
        self.sample_rate = sample_rate
        # Rough noise estimate taken from the first chunk. NOTE(review):
        # currently only recorded, never applied for suppression.
        self.noise_profile = None
        # Design the 300 Hz high-pass filter once — it depends only on the
        # fixed sample rate, so recomputing it per chunk was wasted work.
        self._hp_sos = signal.butter(4, 300, "hp", fs=sample_rate, output="sos")

    async def process_inbound(self, audio_chunk):
        """Normalise and high-pass filter one audio chunk.

        Args:
            audio_chunk: raw bytes of int16 PCM samples.

        Returns:
            Processed audio as int16 PCM bytes, same sample count.
        """
        audio = np.frombuffer(audio_chunk, dtype=np.int16).astype(np.float32)
        if audio.size == 0:
            # Guard: np.max on an empty array raises; pass silence through.
            return b""
        # Estimate noise level from (roughly) the first tenth of the first
        # chunk; max(1, ...) keeps the slice non-empty for tiny chunks.
        if self.noise_profile is None:
            self.noise_profile = np.std(audio[: max(1, len(audio) // 10)])
        # Peak-normalise so the loudest sample sits at full scale.
        peak = np.max(np.abs(audio))
        if peak > 0:
            audio = audio / peak
        # Remove low-frequency rumble below 300 Hz.
        audio = signal.sosfilt(self._hp_sos, audio)
        # Clip before scaling: filter ringing can overshoot ±1.0, and an
        # unclipped astype(int16) would wrap around instead of saturating.
        audio = np.clip(audio, -1.0, 1.0)
        return (audio * 32767).astype(np.int16).tobytes()
Stateful Agent Implementation
Maintain conversation context across calls using per-caller profiles:
class StatefulAgent:
    """Maintains per-caller profiles so repeat callers get personal context.

    The original snippet used ``self.db`` without ever initialising it;
    the constructor now takes the database handle explicitly (defaulting
    to None for backward compatibility).
    """

    def __init__(self, db=None):
        # db must expose ``await db.get_user_by_phone(number)``.
        self.db = db
        self.profiles = {}  # phone_number -> cached profile dict

    async def handle_call(self, call_id, caller_number):
        """Build a context-rich system prompt and hand the call to the AI.

        NOTE(review): ``forward_to_ai`` is assumed to be provided by a
        subclass or mixin — it is not defined here.
        """
        profile = await self.load_profile(caller_number)
        system_prompt = f"""You are a helpful support agent.
User: {profile.get('name', 'Guest')} ({profile.get('account_type', 'free')})
Previous issues: {', '.join(profile.get('issues', [])) or 'None'}"""
        return await self.forward_to_ai(call_id, system_prompt)

    async def load_profile(self, phone_number):
        """Fetch the caller's profile, memoising it for the process lifetime.

        NOTE(review): the cache is never invalidated, so profile edits made
        mid-process are not picked up — confirm that is acceptable.
        """
        if phone_number not in self.profiles:
            self.profiles[phone_number] = await self.db.get_user_by_phone(phone_number)
        return self.profiles[phone_number]
High-Volume Scaling
Connection Pooling
import queue
class ConnectionPool:
    """Fixed-size pool of reusable bridge connections.

    Creating a connection per call adds latency at high volume; pooling
    amortises that cost. Backed by a thread-safe queue.Queue — use from
    threads (swap in asyncio.Queue for event-loop code).
    """

    def __init__(self, size=10, factory=None):
        """Pre-create ``size`` connections.

        Args:
            size: number of pooled connections.
            factory: optional zero-argument connection constructor; lets
                tests or alternative transports replace the default
                Telepath bridge. Defaults to ``_create_connection``.
        """
        self._factory = factory or self._create_connection
        self.pool = queue.Queue(maxsize=size)
        for _ in range(size):
            self.pool.put(self._factory())

    def _create_connection(self):
        # Default constructor — requires TELEPATH_API_KEY in the environment.
        return TelephonyBridge(api_key=os.environ["TELEPATH_API_KEY"])

    def acquire(self, timeout=5):
        """Take a connection; raises queue.Empty if none frees up in time."""
        return self.pool.get(timeout=timeout)

    def release(self, conn):
        """Return a connection to the pool for reuse."""
        self.pool.put(conn)
Rate Limiting
import asyncio, time
class RateLimiter:
    """Sliding-window limiter: at most `calls_per_minute` calls in any 60 s."""

    def __init__(self, calls_per_minute=5000):
        self.limit = calls_per_minute
        self.times = []  # timestamps of recent calls, oldest first

    async def wait_if_needed(self):
        """Sleep until a call slot is free, then record this call."""
        now = time.time()
        # Drop timestamps that have fallen out of the 60-second window.
        self.times = [t for t in self.times if t > now - 60]
        if len(self.times) >= self.limit:
            # Sleep until the oldest recorded call exits the window.
            await asyncio.sleep(self.times[0] + 60 - now)
            # Bug fix: re-read the clock after sleeping. The original
            # recorded the stale pre-sleep timestamp, under-counting the
            # window for every call that had to wait.
            now = time.time()
            self.times = [t for t in self.times if t > now - 60]
        self.times.append(now)
Monitoring & Observability
Custom Metrics
from prometheus_client import Counter, Histogram, Gauge
# Per-provider call counter, labelled by terminal status (e.g. ok / error).
calls_total = Counter("calls_total", "Total calls", ["provider", "status"])
# Call latency distribution; bucket boundaries are in seconds.
call_latency = Histogram("call_latency_seconds", "Call latency", buckets=[.1,.5,1,2,5])
# Number of calls currently in flight.
active_calls = Gauge("active_calls", "Currently active calls")


def on_call_start(provider):
    # Mark one call as in flight. `provider` is accepted for symmetry with
    # on_call_end but is not currently recorded anywhere.
    active_calls.inc()


def on_call_end(provider, status, duration_s):
    # Record one finished call: bump the labelled counter, observe its
    # latency, and release the in-flight slot.
    calls_total.labels(provider=provider, status=status).inc()
    call_latency.observe(duration_s)
    active_calls.dec()
Distributed Tracing
from opentelemetry import trace
# Module-scoped tracer; spans are attributed to this module's name.
tracer = trace.get_tracer(__name__)


def handle_call(call_id, caller_number, provider):
    # One parent span per call, tagged with enough attributes to filter
    # traces by call, caller, and provider in the tracing backend.
    with tracer.start_as_current_span("handle_call") as span:
        span.set_attribute("call.id", call_id)
        span.set_attribute("call.from", caller_number)
        span.set_attribute("call.provider", provider)
        # Child span isolates time spent handing the call to the AI.
        with tracer.start_as_current_span("route_to_ai"):
            route_to_ai(call_id, provider)  # defined elsewhere — not shown here
Error Recovery Patterns
Retry with Exponential Backoff
import asyncio
import functools


def retry(max_retries=3, base_delay=1):
    """Decorator: retry an async function with exponential backoff.

    Args:
        max_retries: total number of attempts (>= 1); the final failure is
            re-raised to the caller.
        base_delay: seconds before the first retry; doubles each attempt
            (base_delay, 2*base_delay, 4*base_delay, ...).
    """
    def decorator(func):
        @functools.wraps(func)  # preserve the wrapped function's metadata
        async def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return await func(*args, **kwargs)
                except Exception:
                    # Last attempt: surface the error instead of sleeping.
                    if attempt == max_retries - 1:
                        raise
                    await asyncio.sleep(base_delay * (2 ** attempt))
        return wrapper
    return decorator
@retry(max_retries=3, base_delay=1)
async def call_ai_provider(prompt):
    # Up to 3 attempts with 1 s then 2 s backoff between them.
    # `openai_client` is assumed to be initialised elsewhere — not shown.
    return await openai_client.chat(prompt)
Circuit Breaker
Prevent cascading failures by stopping calls to a failing provider until it recovers:
from datetime import datetime
from enum import Enum
class State(Enum):
    """Circuit-breaker lifecycle states."""
    CLOSED = "closed"        # Normal — calls pass through
    OPEN = "open"            # Failing — reject calls immediately
    HALF_OPEN = "half_open"  # Cool-down elapsed — allow one trial call


class CircuitBreaker:
    """Stops calling a failing provider until it has had time to recover.

    After `threshold` consecutive failures the breaker opens and rejects
    calls with RuntimeError. Once `timeout_s` seconds have passed it
    half-opens: the next call is a trial — success closes the breaker,
    another failure re-opens it.
    """

    def __init__(self, threshold=5, timeout_s=60):
        self.threshold = threshold   # consecutive failures before opening
        self.timeout_s = timeout_s   # cool-down before a trial call
        self.state = State.CLOSED
        self.failures = 0            # current consecutive-failure streak
        self.last_failure = None     # datetime of most recent failure

    async def call(self, func, *args, **kwargs):
        """Invoke `func(*args, **kwargs)` through the breaker.

        Returns the function's result; re-raises its exceptions.

        Raises:
            RuntimeError: if the breaker is OPEN and the cool-down has not
                yet elapsed.
        """
        if self.state == State.OPEN:
            # Bug fix: timedelta.seconds is only the sub-day component
            # (it wraps every 24 h); total_seconds() is the true elapsed time.
            elapsed = (datetime.now() - self.last_failure).total_seconds()
            if elapsed > self.timeout_s:
                self.state = State.HALF_OPEN
            else:
                raise RuntimeError("Circuit breaker OPEN")
        try:
            result = await func(*args, **kwargs)
        except Exception:
            self.failures += 1
            self.last_failure = datetime.now()
            if self.failures >= self.threshold:
                self.state = State.OPEN
            raise
        else:
            # Any success clears the streak and closes the breaker.
            self.failures = 0
            self.state = State.CLOSED
            return result
Security Best Practices
Encrypted Configuration Storage
from cryptography.fernet import Fernet
import json
class ConfigManager:
    """Stores configuration encrypted at rest with Fernet symmetric encryption."""

    def __init__(self, encryption_key: bytes):
        # encryption_key must be a valid Fernet key (32 url-safe base64
        # bytes, e.g. from Fernet.generate_key()); an invalid key raises here.
        self.cipher = Fernet(encryption_key)

    def save(self, data: dict, path: str) -> None:
        """Serialise `data` to JSON, encrypt it, and write it to `path`."""
        encrypted = self.cipher.encrypt(json.dumps(data).encode())
        with open(path, "wb") as f:
            f.write(encrypted)

    def load(self, path: str) -> dict:
        """Read `path`, decrypt it, and parse the JSON payload back to a dict."""
        # Raises cryptography.fernet.InvalidToken if the key is wrong or
        # the file was tampered with.
        with open(path, "rb") as f:
            return json.loads(self.cipher.decrypt(f.read()))
See Also
- Security — credential management, encryption, and compliance
- Webhooks — real-time call lifecycle notifications
- API Reference — authentication and available endpoints