Advanced Integration

Production patterns, custom implementations, and advanced configurations

Multi-Connection Strategies

A/B Testing AI Providers

Create one Telepath connection per provider and route a percentage of calls to each. Compare latency, cost, and quality scores in the dashboard to determine the best fit for your use case.

python
import random

providers = [
    {"name": "openai",     "connection_id": "conn_primary",  "weight": 0.7},
    {"name": "elevenlabs", "connection_id": "conn_fallback", "weight": 0.3},
]

def select_provider(failed_providers=[]):
    available = [p for p in providers if p["name"] not in failed_providers]
    total = sum(p["weight"] for p in available)
    weights = [p["weight"] / total for p in available]
    return random.choices(available, weights=weights)[0]

Geo-Distributed Architecture

Route calls based on caller geography to minimise latency and meet data-residency requirements:

text
Caller in US  →  US Region  →  Telepath  →  OpenAI
Caller in EU  →  EU Region  →  Telepath  →  ElevenLabs

Set up regional carrier connections, route by caller location, and monitor per-region latency in the dashboard.

Fallback Chains

The select_provider pattern above also serves as a graceful fallback: if a provider fails, exclude it and re-select from the remaining pool. This avoids service interruptions during transient provider outages.

Custom WebSocket Implementation

Advanced Audio Processing

Process audio before forwarding to your AI agent for noise suppression, normalisation, or filtering:

python
import numpy as np
from scipy import signal

class AudioProcessor:
    def __init__(self, sample_rate=16000):
        self.sample_rate = sample_rate
        self.noise_profile = None

    async def process_inbound(self, audio_chunk):
        audio = np.frombuffer(audio_chunk, dtype=np.int16).astype(np.float32)

        # Estimate noise on first chunk
        if self.noise_profile is None:
            self.noise_profile = np.std(audio[:len(audio) // 10])

        # Normalise amplitude
        peak = np.max(np.abs(audio))
        if peak > 0:
            audio = audio / peak

        # High-pass filter: remove rumble below 300 Hz
        sos = signal.butter(4, 300, "hp", fs=self.sample_rate, output="sos")
        audio = signal.sosfilt(sos, audio)

        return (audio * 32767).astype(np.int16).tobytes()

Stateful Agent Implementation

Maintain conversation context across calls using per-caller profiles:

python
class StatefulAgent:
    def __init__(self):
        self.profiles = {}  # phone_number → profile cache

    async def handle_call(self, call_id, caller_number):
        profile = await self.load_profile(caller_number)

        system_prompt = f"""You are a helpful support agent.
User: {profile.get('name', 'Guest')} ({profile.get('account_type', 'free')})
Previous issues: {', '.join(profile.get('issues', [])) or 'None'}"""

        return await self.forward_to_ai(call_id, system_prompt)

    async def load_profile(self, phone_number):
        if phone_number not in self.profiles:
            self.profiles[phone_number] = await self.db.get_user_by_phone(phone_number)
        return self.profiles[phone_number]

High-Volume Scaling

Connection Pooling

python
import queue

class ConnectionPool:
    def __init__(self, size=10):
        self.pool = queue.Queue(maxsize=size)
        for _ in range(size):
            self.pool.put(self._create_connection())

    def _create_connection(self):
        return TelephonyBridge(api_key=os.environ["TELEPATH_API_KEY"])

    def acquire(self, timeout=5):
        return self.pool.get(timeout=timeout)

    def release(self, conn):
        self.pool.put(conn)

Rate Limiting

python
import asyncio, time

class RateLimiter:
    def __init__(self, calls_per_minute=5000):
        self.limit = calls_per_minute
        self.times = []

    async def wait_if_needed(self):
        now = time.time()
        self.times = [t for t in self.times if t > now - 60]
        if len(self.times) >= self.limit:
            await asyncio.sleep(self.times[0] + 60 - now)
        self.times.append(now)

Monitoring & Observability

Custom Metrics

python
from prometheus_client import Counter, Histogram, Gauge

calls_total = Counter("calls_total", "Total calls", ["provider", "status"])
call_latency = Histogram("call_latency_seconds", "Call latency", buckets=[.1,.5,1,2,5])
active_calls = Gauge("active_calls", "Currently active calls")

def on_call_start(provider):
    active_calls.inc()

def on_call_end(provider, status, duration_s):
    calls_total.labels(provider=provider, status=status).inc()
    call_latency.observe(duration_s)
    active_calls.dec()

Distributed Tracing

python
from opentelemetry import trace

tracer = trace.get_tracer(__name__)

def handle_call(call_id, caller_number, provider):
    with tracer.start_as_current_span("handle_call") as span:
        span.set_attribute("call.id", call_id)
        span.set_attribute("call.from", caller_number)
        span.set_attribute("call.provider", provider)

        with tracer.start_as_current_span("route_to_ai"):
            route_to_ai(call_id, provider)

Error Recovery Patterns

Retry with Exponential Backoff

python
import asyncio

def retry(max_retries=3, base_delay=1):
    def decorator(func):
        async def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_retries - 1:
                        raise
                    delay = base_delay * (2 ** attempt)
                    await asyncio.sleep(delay)
        return wrapper
    return decorator

@retry(max_retries=3, base_delay=1)
async def call_ai_provider(prompt):
    return await openai_client.chat(prompt)

Circuit Breaker

Prevent cascading failures by stopping calls to a failing provider until it recovers:

python
from datetime import datetime
from enum import Enum

class State(Enum):
    CLOSED = "closed"       # Normal
    OPEN = "open"           # Failing — reject calls
    HALF_OPEN = "half_open" # Testing recovery

class CircuitBreaker:
    def __init__(self, threshold=5, timeout_s=60):
        self.threshold = threshold
        self.timeout_s = timeout_s
        self.state = State.CLOSED
        self.failures = 0
        self.last_failure = None

    async def call(self, func, *args, **kwargs):
        if self.state == State.OPEN:
            elapsed = (datetime.now() - self.last_failure).seconds
            if elapsed > self.timeout_s:
                self.state = State.HALF_OPEN
            else:
                raise RuntimeError("Circuit breaker OPEN")
        try:
            result = await func(*args, **kwargs)
            self.failures = 0
            self.state = State.CLOSED
            return result
        except Exception:
            self.failures += 1
            self.last_failure = datetime.now()
            if self.failures >= self.threshold:
                self.state = State.OPEN
            raise

Security Best Practices

Encrypted Configuration Storage

python
from cryptography.fernet import Fernet
import json

class ConfigManager:
    def __init__(self, encryption_key: bytes):
        self.cipher = Fernet(encryption_key)

    def save(self, data: dict, path: str):
        encrypted = self.cipher.encrypt(json.dumps(data).encode())
        with open(path, "wb") as f:
            f.write(encrypted)

    def load(self, path: str) -> dict:
        with open(path, "rb") as f:
            return json.loads(self.cipher.decrypt(f.read()))

See Also

  • Security — credential management, encryption, and compliance
  • Webhooks — real-time call lifecycle notifications
  • API Reference — authentication and available endpoints