How to Build a Per-Tenant AI Agent Rate Limit Negotiation Pipeline That Dynamically Reclassifies Tenant Priority Tiers During Upstream Foundation Model Provider Outages

Here is a scenario that will feel familiar to any platform engineer running a multi-tenant AI product in 2026: it is 2:17 AM, your on-call alert fires, and your upstream foundation model provider (OpenAI, Anthropic, Google Gemini, or a newer player such as xAI's Grok API) is experiencing a partial outage. Your rate-limit budget is suddenly cut by 60%. Your enterprise tenants are screaming. Your free-tier users are hammering the retry queue. And your static priority configuration, written six months ago during a calm sprint, is making everything worse.

The hard truth is that static rate-limit tiers are a liability in a world of dynamic AI infrastructure. What you actually need is a negotiation pipeline: a system that continuously senses upstream capacity, reclassifies tenant priority tiers in real time, and redistributes available token budgets intelligently so that your most critical workloads survive the storm.

This tutorial walks you through designing and implementing exactly that. We will cover the architecture, the data models, the negotiation algorithm, the reclassification logic, and the operational runbooks that tie it all together. Code examples are in Python, but the concepts apply to any stack.

Understanding the Problem Space

Before writing a single line of code, it is worth precisely defining what we are solving. A per-tenant rate limit negotiation pipeline has three distinct sub-problems:

  • Upstream capacity sensing: Detecting that a provider is degraded, throttled, or fully down, and quantifying the available headroom in real time.
  • Tenant priority reclassification: Dynamically adjusting which tenants get which share of the remaining capacity based on current business rules, SLA contracts, and live usage signals.
  • Negotiation and enforcement: Communicating new limits back to the agent layer, queuing or shedding excess requests, and recovering gracefully when capacity is restored.

Most platforms get the first part right (they have health checks), partially implement the third (they have a token bucket), and completely skip the second. That gap is what causes the 2 AM incident.

System Architecture Overview

The pipeline consists of five cooperating components. Think of them as a chain of responsibility:

  1. Upstream Health Probe (UHP): A lightweight poller that monitors provider health endpoints, latency percentiles, and error rates.
  2. Capacity Budget Ledger (CBL): A shared state store (Redis or a distributed KV store) that holds the current available token/request budget per provider and per model.
  3. Tenant Priority Engine (TPE): The brain. It reads tenant metadata, SLA contracts, real-time usage, and current budget to produce a dynamic priority score and a reclassified tier assignment.
  4. Rate Limit Negotiator (RLN): The component that translates tier assignments into concrete per-tenant rate limits and pushes them to the enforcement layer.
  5. Agent Gateway Enforcer (AGE): The API gateway middleware that enforces limits, manages queues, and returns appropriate responses (retry headers, degraded-mode flags, etc.) to tenant agents.

Here is a high-level diagram of how data flows through the system during a partial outage event:


  [Provider API] --> [UHP] --> [CBL]
                                 |
                            [TPE] (reads tenant DB + CBL)
                                 |
                            [RLN] (computes new limits)
                                 |
                            [AGE] (enforces + queues)
                                 |
                         [Tenant AI Agents]

Step 1: Build the Upstream Health Probe

The UHP needs to do more than just ping a health endpoint. It must produce a capacity confidence score between 0.0 and 1.0 that represents how much of the nominal provider capacity is actually available right now. Here is a production-grade implementation:


import asyncio
import time
import httpx
from dataclasses import dataclass, field
from collections import deque

@dataclass
class ProviderHealthSnapshot:
    provider_id: str
    timestamp: float
    http_error_rate: float       # 0.0 to 1.0
    p95_latency_ms: float
    rate_limit_header_remaining: int  # from x-ratelimit-remaining
    rate_limit_header_limit: int      # from x-ratelimit-limit
    capacity_confidence: float = 1.0  # computed

    def compute_confidence(self):
        # Weighted scoring model
        error_penalty   = self.http_error_rate * 0.5
        latency_penalty = min((self.p95_latency_ms / 5000.0), 1.0) * 0.2
        quota_ratio     = (
            self.rate_limit_header_remaining / max(self.rate_limit_header_limit, 1)
        )
        quota_penalty   = (1.0 - quota_ratio) * 0.3
        self.capacity_confidence = max(
            0.0,
            1.0 - error_penalty - latency_penalty - quota_penalty
        )
        return self.capacity_confidence


class UpstreamHealthProbe:
    def __init__(self, provider_id: str, probe_url: str, interval_seconds: int = 15):
        self.provider_id = provider_id
        self.probe_url = probe_url
        self.interval = interval_seconds
        self._history: deque = deque(maxlen=20)  # rolling 5-min window at 15s intervals
        self._latency_samples: deque = deque(maxlen=20)
        # 1 = failed probe, 0 = ok. Bounded like the latency samples so old
        # failures age out instead of depressing the error rate forever.
        self._error_samples: deque = deque(maxlen=20)

    async def probe_once(self) -> ProviderHealthSnapshot:
        start = time.monotonic()
        error_rate = 0.0
        remaining = 1000
        limit = 1000
        failed = False
        try:
            async with httpx.AsyncClient(timeout=5.0) as client:
                resp = await client.post(
                    self.probe_url,
                    json={"model": "probe", "messages": [{"role": "user", "content": "ping"}]},
                    headers={"Authorization": "Bearer "}
                )
                elapsed_ms = (time.monotonic() - start) * 1000
                self._latency_samples.append(elapsed_ms)
                # Throttling (429) and server errors both count as failed probes
                failed = resp.status_code in (429, 500, 502, 503)
                remaining = int(resp.headers.get("x-ratelimit-remaining-requests", 1000))
                limit     = int(resp.headers.get("x-ratelimit-limit-requests", 1000))
        except Exception:
            # Network errors and timeouts: record a failure at the timeout ceiling
            failed = True
            elapsed_ms = 5000.0
            self._latency_samples.append(elapsed_ms)
        finally:
            self._error_samples.append(1 if failed else 0)

        # Error rate over the same rolling window as the latency samples
        error_rate = sum(self._error_samples) / len(self._error_samples)
        sorted_latencies = sorted(self._latency_samples)
        p95_idx = int(len(sorted_latencies) * 0.95)
        p95_latency = sorted_latencies[min(p95_idx, len(sorted_latencies) - 1)]

        snapshot = ProviderHealthSnapshot(
            provider_id=self.provider_id,
            timestamp=time.time(),
            http_error_rate=error_rate,
            p95_latency_ms=p95_latency,
            rate_limit_header_remaining=remaining,
            rate_limit_header_limit=limit,
        )
        snapshot.compute_confidence()
        self._history.append(snapshot)
        return snapshot

    async def run_loop(self, on_snapshot_callback):
        while True:
            snapshot = await self.probe_once()
            await on_snapshot_callback(snapshot)
            await asyncio.sleep(self.interval)

The key insight here is the three-factor confidence score: error rate, latency degradation, and remaining quota ratio each signal a different failure mode. A provider that throttles you without returning 5xx errors is still caught, because the probe counts 429 responses toward the error rate and a shrinking x-ratelimit-remaining value lowers the quota ratio.
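To make the weighting concrete, here is a small worked example that restates the scoring formula from compute_confidence as a standalone function. The input readings are invented purely for illustration; they are not measurements from any real provider.

```python
def capacity_confidence(error_rate: float, p95_latency_ms: float,
                        remaining: int, limit: int) -> float:
    """Standalone restatement of ProviderHealthSnapshot.compute_confidence."""
    error_penalty = error_rate * 0.5
    latency_penalty = min(p95_latency_ms / 5000.0, 1.0) * 0.2
    quota_penalty = (1.0 - remaining / max(limit, 1)) * 0.3
    return max(0.0, 1.0 - error_penalty - latency_penalty - quota_penalty)


# Hypothetical readings, chosen only to show how each signal moves the score.
healthy = capacity_confidence(0.0, 400.0, 950, 1000)
throttled = capacity_confidence(0.0, 600.0, 100, 1000)   # quota nearly exhausted
failing = capacity_confidence(0.6, 5000.0, 500, 1000)    # errors plus timeouts

print(round(healthy, 3), round(throttled, 3), round(failing, 3))
```

With these made-up inputs the scores come out to roughly 0.97, 0.71, and 0.35. Note that a nearly exhausted quota alone costs at most 0.3, so it can pull a provider out of the "healthy" band but cannot by itself signal a severe outage.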

Step 2: Design the Capacity Budget Ledger

The CBL is a Redis-backed shared state store. Every component in the pipeline reads from and writes to it. The schema is simple but must be carefully designed to avoid race conditions:


import redis.asyncio as aioredis
import json
import time

class CapacityBudgetLedger:
    def __init__(self, redis_url: str):
        self.redis = aioredis.from_url(redis_url)
        self.TTL_SECONDS = 60  # snapshots expire after 60s

    async def write_snapshot(self, snapshot):
        key = f"uhp:snapshot:{snapshot.provider_id}"
        payload = {
            "confidence": snapshot.capacity_confidence,
            "p95_latency_ms": snapshot.p95_latency_ms,
            "remaining": snapshot.rate_limit_header_remaining,
            "limit": snapshot.rate_limit_header_limit,
            "ts": snapshot.timestamp,
        }
        await self.redis.setex(key, self.TTL_SECONDS, json.dumps(payload))

    async def get_confidence(self, provider_id: str) -> float:
        key = f"uhp:snapshot:{provider_id}"
        raw = await self.redis.get(key)
        if not raw:
            return 1.0  # assume healthy if no data
        data = json.loads(raw)
        # Penalize stale data
        age_seconds = time.time() - data["ts"]
        staleness_penalty = min(age_seconds / 120.0, 0.5)
        return max(0.0, data["confidence"] - staleness_penalty)

    async def set_tenant_allocation(self, tenant_id: str, provider_id: str, rps: float, tpm: int):
        key = f"alloc:{provider_id}:{tenant_id}"
        payload = {"rps": rps, "tpm": tpm, "ts": time.time()}
        await self.redis.setex(key, 300, json.dumps(payload))

    async def get_tenant_allocation(self, tenant_id: str, provider_id: str):
        key = f"alloc:{provider_id}:{tenant_id}"
        raw = await self.redis.get(key)
        if not raw:
            return None
        return json.loads(raw)

Step 3: Build the Tenant Priority Engine

This is the most critical and most underengineered component in most platforms. The TPE must answer one question: given the current upstream capacity confidence, what priority tier does each tenant belong to right now?

The reclassification logic should consider at least four inputs:

  • Static SLA tier: The contractual tier the tenant paid for (e.g., Enterprise, Business, Starter, Free).
  • Real-time usage pressure: How much of their current allocation is the tenant actively consuming? A tenant burning 95% of their quota is higher priority to protect than one at 10%.
  • Business value signal: Revenue contribution, contract ARR, or a manually configured importance weight.
  • Outage sensitivity: Does this tenant have a declared critical workload (e.g., a real-time customer-facing agent vs. a nightly batch job)?

from dataclasses import dataclass
from enum import Enum
from typing import List

class PriorityTier(Enum):
    CRITICAL   = 4   # Protected at all costs. Never shed.
    HIGH       = 3   # Shed only if confidence < 0.15
    MEDIUM     = 2   # Shed if confidence < 0.40
    LOW        = 1   # Shed if confidence < 0.65
    SUSPENDED  = 0   # No capacity allocated during outage

@dataclass
class TenantProfile:
    tenant_id: str
    static_tier: str          # "enterprise", "business", "starter", "free"
    arr_usd: float            # Annual recurring revenue
    is_realtime_workload: bool
    current_usage_ratio: float  # 0.0 to 1.0 (usage / allocation)
    importance_weight: float    # 0.0 to 1.0, manually configured

    def compute_priority_score(self) -> float:
        tier_base = {
            "enterprise": 0.7,
            "business":   0.5,
            "starter":    0.3,
            "free":       0.1,
        }.get(self.static_tier, 0.1)

        arr_score       = min(self.arr_usd / 500_000.0, 1.0) * 0.15
        realtime_bonus  = 0.10 if self.is_realtime_workload else 0.0
        usage_urgency   = self.current_usage_ratio * 0.05
        manual_weight   = self.importance_weight * 0.10

        return min(1.0, tier_base + arr_score + realtime_bonus + usage_urgency + manual_weight)


class TenantPriorityEngine:
    def __init__(self, ledger: CapacityBudgetLedger):
        self.ledger = ledger

    def reclassify(
        self,
        tenant: TenantProfile,
        capacity_confidence: float
    ) -> PriorityTier:
        score = tenant.compute_priority_score()

        # During full health (confidence >= 0.9), use static tiers
        if capacity_confidence >= 0.9:
            return self._static_to_tier(tenant.static_tier)

        # Dynamic reclassification during degraded capacity
        if capacity_confidence < 0.15:
            # Severe outage: only CRITICAL tenants get anything
            return PriorityTier.CRITICAL if score >= 0.85 else PriorityTier.SUSPENDED

        if capacity_confidence < 0.40:
            if score >= 0.85: return PriorityTier.CRITICAL
            if score >= 0.65: return PriorityTier.HIGH
            return PriorityTier.SUSPENDED

        if capacity_confidence < 0.65:
            if score >= 0.85: return PriorityTier.CRITICAL
            if score >= 0.65: return PriorityTier.HIGH
            if score >= 0.45: return PriorityTier.MEDIUM
            return PriorityTier.SUSPENDED

        # Moderate degradation (0.65 to 0.90)
        if score >= 0.85: return PriorityTier.CRITICAL
        if score >= 0.65: return PriorityTier.HIGH
        if score >= 0.45: return PriorityTier.MEDIUM
        if score >= 0.25: return PriorityTier.LOW
        return PriorityTier.SUSPENDED

    def _static_to_tier(self, static_tier: str) -> PriorityTier:
        return {
            "enterprise": PriorityTier.HIGH,
            "business":   PriorityTier.MEDIUM,
            "starter":    PriorityTier.LOW,
            "free":       PriorityTier.LOW,
        }.get(static_tier, PriorityTier.LOW)

    async def reclassify_all(
        self,
        tenants: List[TenantProfile],
        provider_id: str
    ) -> dict:
        confidence = await self.ledger.get_confidence(provider_id)
        return {
            t.tenant_id: self.reclassify(t, confidence)
            for t in tenants
        }

Notice the deliberate design decision: during normal operations, the engine defers to static SLA tiers. The dynamic reclassification only kicks in when the capacity confidence drops below 0.9. This prevents unnecessary churn in tenant allocations during healthy periods.
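As a rough illustration of how the score and the confidence bands interact, the sketch below restates the scoring formula and the degraded-capacity branches of reclassify as two standalone functions. The three tenants and their numbers are hypothetical, and tier names are plain strings here rather than PriorityTier members.

```python
def priority_score(static_tier, arr_usd, is_realtime, usage_ratio, importance):
    """Standalone restatement of TenantProfile.compute_priority_score."""
    tier_base = {"enterprise": 0.7, "business": 0.5,
                 "starter": 0.3, "free": 0.1}.get(static_tier, 0.1)
    score = (tier_base
             + min(arr_usd / 500_000.0, 1.0) * 0.15
             + (0.10 if is_realtime else 0.0)
             + usage_ratio * 0.05
             + importance * 0.10)
    return min(1.0, score)


def tier_for(score, confidence):
    """Mirror of the reclassify branches that apply when confidence < 0.9."""
    # (minimum score, tier name), highest tier first
    ladder = [(0.85, "CRITICAL"), (0.65, "HIGH"), (0.45, "MEDIUM"), (0.25, "LOW")]
    # The lower the confidence, the fewer rungs of the ladder stay open.
    if confidence < 0.15:
        open_rungs = 1
    elif confidence < 0.40:
        open_rungs = 2
    elif confidence < 0.65:
        open_rungs = 3
    else:
        open_rungs = 4
    for min_score, name in ladder[:open_rungs]:
        if score >= min_score:
            return name
    return "SUSPENDED"


# Hypothetical tenants
enterprise_rt = priority_score("enterprise", 250_000, True, 0.8, 0.5)
business_batch = priority_score("business", 60_000, False, 0.4, 0.2)
free_user = priority_score("free", 0, False, 0.9, 0.0)

for confidence in (0.7, 0.5, 0.3, 0.1):
    print(confidence, [tier_for(s, confidence)
                       for s in (enterprise_rt, business_batch, free_user)])
```

At a confidence of 0.5, the enterprise real-time tenant (score about 0.97) lands in CRITICAL, the business batch tenant (about 0.56) in MEDIUM, and the free tenant (about 0.15) is suspended. Once confidence falls to 0.3, the business tenant is suspended as well, because only the CRITICAL and HIGH rungs remain open.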

Step 4: Implement the Rate Limit Negotiator

The RLN translates priority tier assignments into concrete token-per-minute (TPM) and requests-per-second (RPS) budgets. It uses a proportional allocation algorithm weighted by tier, not a flat percentage cut:


from typing import Dict

# Tier weights for proportional allocation
TIER_WEIGHTS = {
    PriorityTier.CRITICAL:   1.00,
    PriorityTier.HIGH:       0.60,
    PriorityTier.MEDIUM:     0.30,
    PriorityTier.LOW:        0.10,
    PriorityTier.SUSPENDED:  0.00,
}

class RateLimitNegotiator:
    def __init__(self, ledger: CapacityBudgetLedger):
        self.ledger = ledger

    async def negotiate(
        self,
        tier_assignments: Dict[str, PriorityTier],
        provider_id: str,
        total_available_tpm: int,
        total_available_rps: float,
    ) -> Dict[str, dict]:
        # Compute weighted denominator
        total_weight = sum(
            TIER_WEIGHTS[tier]
            for tier in tier_assignments.values()
        )
        allocations = {}
        for tenant_id, tier in tier_assignments.items():
            # Share is zero for SUSPENDED tenants, and for everyone when no
            # tenant carries any weight.
            share = TIER_WEIGHTS[tier] / total_weight if total_weight > 0 else 0.0
            tenant_tpm = int(share * total_available_tpm)
            tenant_rps = round(share * total_available_rps, 2)

            allocations[tenant_id] = {
                "tpm": tenant_tpm,
                "rps": tenant_rps,
                "tier": tier.name
            }

            # Persist every allocation, zeros included, so the enforcement
            # layer sees a suspension instead of a stale or missing entry.
            await self.ledger.set_tenant_allocation(
                tenant_id, provider_id, tenant_rps, tenant_tpm
            )

        return allocations

One important nuance: the code above gives CRITICAL tenants the largest weight but no guaranteed floor. For production use, CRITICAL tenants should receive their full nominal allocation first: pre-reserve their budget from the total pool, then run the proportional algorithm on the remainder. This is a simple but important modification to the above code.
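A minimal sketch of that pre-reservation step might look like the following. It works on TPM only and uses plain strings for tier names to stay self-contained. The nominal_tpm input and the shortage policy (scaling CRITICAL tenants down evenly when even their reservations do not fit) are assumptions for illustration, not something the pipeline above defines.

```python
from typing import Dict


def allocate_with_reservation(
    tiers: Dict[str, str],         # tenant_id -> tier name
    nominal_tpm: Dict[str, int],   # tenant_id -> nominal TPM (assumed input)
    total_available_tpm: int,
    weights: Dict[str, float],     # tier name -> proportional weight
) -> Dict[str, int]:
    critical = [t for t, tier in tiers.items() if tier == "CRITICAL"]
    reserved_total = sum(nominal_tpm[t] for t in critical)

    # Assumed shortage policy: if the pool cannot cover every CRITICAL
    # reservation, scale those reservations evenly and give others nothing.
    if reserved_total > 0 and reserved_total >= total_available_tpm:
        scale = total_available_tpm / reserved_total
        return {
            t: int(nominal_tpm[t] * scale) if t in critical else 0
            for t in tiers
        }

    # Normal case: reserve first, then split the remainder by tier weight.
    allocations = {t: nominal_tpm[t] for t in critical}
    remainder = total_available_tpm - reserved_total
    others = {t: weights.get(tier, 0.0)
              for t, tier in tiers.items() if tier != "CRITICAL"}
    total_weight = sum(others.values())
    for t, w in others.items():
        allocations[t] = int(remainder * w / total_weight) if total_weight > 0 else 0
    return allocations


weights = {"HIGH": 0.60, "MEDIUM": 0.30, "LOW": 0.10, "SUSPENDED": 0.0}
tiers = {"a": "CRITICAL", "b": "HIGH", "c": "MEDIUM", "d": "SUSPENDED"}
nominal = {"a": 40_000, "b": 50_000, "c": 30_000, "d": 10_000}

normal = allocate_with_reservation(tiers, nominal, 100_000, weights)
shortage = allocate_with_reservation(tiers, nominal, 30_000, weights)
print(normal)
print(shortage)
```

In the first call, tenant a keeps its full 40,000 TPM and the remaining 60,000 is split about 2:1 between b and c. In the second call the pool is smaller than the CRITICAL reservation, so a receives the entire 30,000 and everyone else receives zero.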

Step 5: Build the Agent Gateway Enforcer

The AGE is the enforcement layer. It sits as middleware in your API gateway (FastAPI, Kong, Envoy, or similar) and applies the allocations stored in the CBL. Here is a FastAPI middleware implementation using a sliding-window counter backed by a Redis sorted set:


import math
import time
import uuid
from fastapi import Request
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse

class AgentGatewayEnforcer(BaseHTTPMiddleware):
    def __init__(self, app, ledger: CapacityBudgetLedger, provider_id: str):
        super().__init__(app)
        self.ledger = ledger
        self.provider_id = provider_id

    async def dispatch(self, request: Request, call_next):
        # Responses are returned rather than raised: an HTTPException raised
        # inside BaseHTTPMiddleware is not seen by FastAPI's exception
        # handlers and would surface to the client as a 500.
        tenant_id = request.headers.get("X-Tenant-ID")
        if not tenant_id:
            return JSONResponse(
                status_code=400,
                content={"detail": "Missing X-Tenant-ID header"},
            )

        alloc = await self.ledger.get_tenant_allocation(tenant_id, self.provider_id)

        if alloc is None:
            # No allocation means the negotiator hasn't run yet; allow with caution
            return await call_next(request)

        if alloc["rps"] == 0:
            # Tenant is SUSPENDED during outage
            return JSONResponse(
                status_code=503,
                content={"detail": "Service temporarily unavailable for your tier during provider degradation."},
                headers={
                    "Retry-After": "60",
                    "X-Outage-Mode": "true",
                    "X-Tier": alloc.get("tier", "SUSPENDED"),
                },
            )

        # Sliding window rate check using Redis
        now = time.time()
        window_key = f"ratelimit:{self.provider_id}:{tenant_id}:rps"
        allowed = await self._check_sliding_window(window_key, alloc["rps"], now)

        if not allowed:
            return JSONResponse(
                status_code=429,
                content={"detail": "Rate limit exceeded for your current priority tier."},
                headers={
                    # Whole seconds, never less than 1
                    "Retry-After": str(max(1, math.ceil(1.0 / max(alloc["rps"], 0.01)))),
                    "X-RateLimit-Limit": str(alloc["rps"]),
                    "X-Tier": alloc.get("tier", "UNKNOWN"),
                },
            )

        response = await call_next(request)
        return response

    async def _check_sliding_window(
        self, key: str, rps_limit: float, now: float
    ) -> bool:
        # Stretch the window for limits below 1 RPS so that, for example,
        # 0.5 RPS means one request per 2 seconds rather than none at all.
        window = max(1.0, 1.0 / rps_limit)
        max_requests = max(1, int(rps_limit * window))
        pipe = self.ledger.redis.pipeline()
        pipe.zremrangebyscore(key, 0, now - window)
        # Unique member so two requests with the same timestamp both count.
        # Rejected requests are recorded too, so hammering extends the block.
        pipe.zadd(key, {f"{now}:{uuid.uuid4().hex}": now})
        pipe.zcard(key)
        pipe.expire(key, int(window) + 10)
        results = await pipe.execute()
        request_count = results[2]
        return request_count <= max_requests

Step 6: Wire the Pipeline Together with an Orchestrator

All five components need to run in a coordinated loop. Here is the orchestrator that ties everything together:


import asyncio
from typing import List

class RateLimitNegotiationPipeline:
    def __init__(
        self,
        probes: List[UpstreamHealthProbe],
        ledger: CapacityBudgetLedger,
        tpe: TenantPriorityEngine,
        rln: RateLimitNegotiator,
        tenant_loader,        # async callable returning List[TenantProfile]
        provider_capacities,  # dict: provider_id -> {tpm, rps}
        reclassify_interval: int = 30,
    ):
        self.probes = probes
        self.ledger = ledger
        self.tpe = tpe
        self.rln = rln
        self.tenant_loader = tenant_loader
        self.provider_capacities = provider_capacities
        self.reclassify_interval = reclassify_interval

    async def on_health_snapshot(self, snapshot):
        await self.ledger.write_snapshot(snapshot)

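    async def reclassify_loop(self):
        while True:
            tenants = await self.tenant_loader()
            for provider_id, caps in self.provider_capacities.items():
                # provider_capacities holds nominal limits; scale them by the
                # current confidence so the budget shrinks during an outage.
                confidence = await self.ledger.get_confidence(provider_id)
                tier_assignments = await self.tpe.reclassify_all(tenants, provider_id)
                await self.rln.negotiate(
                    tier_assignments,
                    provider_id,
                    total_available_tpm=int(caps["tpm"] * confidence),
                    total_available_rps=caps["rps"] * confidence,
                )
            await asyncio.sleep(self.reclassify_interval)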

    async def run(self):
        probe_tasks = [
            asyncio.create_task(probe.run_loop(self.on_health_snapshot))
            for probe in self.probes
        ]
        reclassify_task = asyncio.create_task(self.reclassify_loop())
        await asyncio.gather(*probe_tasks, reclassify_task)

Set reclassify_interval to 30 seconds during normal operations. When you detect a confidence drop below 0.5, you should dynamically tighten this to 10 seconds for faster response. You can implement this as a simple adaptive interval inside reclassify_loop.
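One way to sketch that adaptive interval is a small pure function that the loop calls before sleeping. The function name and its parameters are hypothetical; the 30-second, 10-second, and 0.5 values come from the guidance above.

```python
from typing import Iterable


def next_reclassify_interval(
    confidences: Iterable[float],
    normal_s: int = 30,
    degraded_s: int = 10,
    threshold: float = 0.5,
) -> int:
    """Pick the sleep interval from the latest confidence of each provider.

    If any provider is below the threshold, reclassify faster.
    """
    if any(c < threshold for c in confidences):
        return degraded_s
    return normal_s


print(next_reclassify_interval([0.95, 0.82]))  # all providers healthy enough
print(next_reclassify_interval([0.95, 0.41]))  # one provider degraded
```

Inside reclassify_loop, you would collect the value returned by ledger.get_confidence for each provider during the cycle and pass the list to this function in place of the fixed self.reclassify_interval.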

Handling the Recovery Phase

The recovery phase is just as important as the degradation phase, and it is where many implementations fail. A naive implementation will immediately restore all tenants to their full allocations the moment confidence crosses back above 0.9, causing a thundering herd problem as every suspended tenant floods the provider simultaneously.

Instead, implement a staggered recovery ramp:

  • Phase 1 (confidence 0.5 to 0.7): Restore CRITICAL and HIGH tiers only. Keep MEDIUM and LOW suspended.
  • Phase 2 (confidence 0.7 to 0.85): Restore MEDIUM at 50% of nominal allocation. Restore LOW at 0%.
  • Phase 3 (confidence 0.85 to 0.95): Restore MEDIUM to 100%, restore LOW at 25%.
  • Phase 4 (confidence above 0.95, sustained for 2 minutes): Full restoration for all tiers.

The "sustained for 2 minutes" condition on Phase 4 is critical. It prevents flapping when a provider is oscillating between healthy and degraded states.
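The ramp can be expressed as a lookup from confidence to the fraction of nominal allocation each tier receives. The sketch below is one possible encoding. It assumes that CRITICAL and HIGH are restored to 100% in Phases 1 through 3, that a confidence above 0.95 which has not yet been sustained stays in Phase 3, and that the caller tracks how long confidence has been above 0.95. None of these details are fixed by the phases listed above.

```python
from typing import Dict, Optional


def recovery_fractions(
    confidence: float, seconds_above_095: float
) -> Optional[Dict[str, float]]:
    """Fraction of nominal allocation per tier during recovery.

    Returns None below 0.5, where the degradation logic still applies.
    """
    if confidence > 0.95 and seconds_above_095 >= 120:
        # Phase 4: full restoration, only after two sustained minutes
        return {"CRITICAL": 1.0, "HIGH": 1.0, "MEDIUM": 1.0, "LOW": 1.0}
    if confidence >= 0.85:
        # Phase 3 (also holds here while Phase 4 is not yet sustained)
        return {"CRITICAL": 1.0, "HIGH": 1.0, "MEDIUM": 1.0, "LOW": 0.25}
    if confidence >= 0.7:
        # Phase 2
        return {"CRITICAL": 1.0, "HIGH": 1.0, "MEDIUM": 0.5, "LOW": 0.0}
    if confidence >= 0.5:
        # Phase 1
        return {"CRITICAL": 1.0, "HIGH": 1.0, "MEDIUM": 0.0, "LOW": 0.0}
    return None


for conf, sustained in [(0.6, 0), (0.75, 0), (0.9, 0), (0.97, 30), (0.97, 120)]:
    print(conf, sustained, recovery_fractions(conf, sustained))
```

Keeping the ramp as a pure function makes it straightforward to replay historical confidence traces through it and check that no phase boundary releases more load than the provider could absorb.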

Operational Runbook and Observability

No pipeline is complete without observability. You should emit the following metrics from every component:

  • uhp.capacity_confidence{provider_id}: A gauge from 0.0 to 1.0. Alert when below 0.6 for more than 2 minutes.
  • tpe.tier_assignment_change{tenant_id, old_tier, new_tier}: A counter. Alert on-call when more than 10% of enterprise tenants are downgraded in a single reclassification cycle.
  • rln.allocation_tpm{tenant_id, provider_id}: A gauge. Track allocation drift over time.
  • age.requests_shed{tenant_id, reason}: A counter. This is your SLA breach signal.
  • pipeline.reclassify_duration_ms: A histogram. If this exceeds your reclassify interval, you have a scaling problem.

For your runbook, define explicit escalation thresholds. A confidence score below 0.3 for more than 5 minutes should trigger automatic failover to a secondary provider if one is configured. Do not wait for a human to make that call at 2 AM.
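The "below 0.3 for more than 5 minutes" rule needs a little state, since a single low reading should not trigger failover. A minimal sketch of such a detector follows. The class name is hypothetical, and what happens when it fires (the actual failover call) is left to your routing layer.

```python
from typing import Optional


class SustainedBelow:
    """Reports True once a value has stayed below a threshold long enough."""

    def __init__(self, threshold: float = 0.3, min_duration_s: float = 300.0):
        self.threshold = threshold
        self.min_duration_s = min_duration_s
        self._below_since: Optional[float] = None

    def update(self, value: float, now: float) -> bool:
        if value >= self.threshold:
            # Any reading at or above the threshold resets the clock
            self._below_since = None
            return False
        if self._below_since is None:
            self._below_since = now
        return (now - self._below_since) > self.min_duration_s


detector = SustainedBelow()
# (timestamp in seconds, confidence) readings, invented for illustration
readings = [(0, 0.2), (200, 0.25), (301, 0.1), (310, 0.5), (320, 0.2)]
fired = [detector.update(conf, ts) for ts, conf in readings]
print(fired)
```

The timestamp is passed in rather than read from the clock inside the class, which keeps the detector easy to unit test and lets you feed it the snapshot timestamps written by the UHP.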

Key Design Decisions and Trade-offs

Before you ship this to production, there are several important trade-offs to document for your team:

Reclassification frequency vs. stability

Reclassifying every 10 seconds gives you fast response but creates instability for tenants whose agents are mid-conversation. Consider adding a hysteresis buffer: a tenant's tier can only be downgraded if their new score has been below the threshold for at least two consecutive reclassification cycles. Upgrades, however, can be immediate.
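A hysteresis buffer of this kind can sit between the TPE and the RLN. The sketch below is one way to do it, under a few assumptions: tiers are represented by their integer rank (matching the PriorityTier values 0 to 4), state is kept in process memory, and when the proposed downgrade target changes between cycles, the most recent proposal is the one applied.

```python
from typing import Dict


class TierHysteresis:
    """Delays downgrades until they persist; applies upgrades immediately."""

    def __init__(self, required_cycles: int = 2):
        self.required_cycles = required_cycles
        self._current: Dict[str, int] = {}  # tenant_id -> tier rank in force
        self._pending: Dict[str, int] = {}  # tenant_id -> consecutive downgrade proposals

    def apply(self, tenant_id: str, proposed: int) -> int:
        current = self._current.get(tenant_id)
        if current is None or proposed >= current:
            # First sighting, upgrade, or no change: take effect immediately
            self._current[tenant_id] = proposed
            self._pending[tenant_id] = 0
            return proposed

        # Proposed tier is lower than the one in force: count the cycle
        self._pending[tenant_id] = self._pending.get(tenant_id, 0) + 1
        if self._pending[tenant_id] >= self.required_cycles:
            self._current[tenant_id] = proposed
            self._pending[tenant_id] = 0
            return proposed
        return current


h = TierHysteresis()
# Proposed ranks for one tenant over successive reclassification cycles
proposals = [3, 1, 1, 4, 2, 4, 2, 2]
effective = [h.apply("tenant-1", p) for p in proposals]
print(effective)
```

In this run the first downgrade proposal (3 to 1) is held for one cycle and applied on the second, the upgrade to 4 is immediate, and a single dip to 2 that is followed by a return to 4 never takes effect. In a multi-replica deployment this state would need to live in the CBL rather than in memory.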

Transparency with tenants

Enterprise tenants will notice when their allocations change. Build a status page or webhook notification system that fires when a tenant's tier is reclassified downward. The X-Tier and X-Outage-Mode headers in the AGE are a start, but proactive communication is far better than a confused support ticket.

Multi-provider fallback vs. tier reclassification

This pipeline assumes you are managing capacity on a single provider during an outage. In practice, you should combine it with a multi-provider router: CRITICAL tenants should be failed over to a secondary provider (e.g., from OpenAI to Anthropic Claude or Google Gemini) rather than just protected on a degraded primary. The TPE score is a great input signal for routing decisions as well.

Conclusion

Building a per-tenant AI agent rate limit negotiation pipeline is one of the highest-leverage infrastructure investments you can make in 2026. The days of static token buckets and manual priority lists are over. As foundation model providers continue to scale under enormous demand, partial outages and capacity constraints are not edge cases; they are routine operational events that your platform must handle gracefully and automatically.

The architecture described here gives you a principled, observable, and recoverable system that protects your most critical tenants, treats lower-tier tenants fairly, and recovers from outages without creating new problems. Start with the UHP and CBL, validate your confidence scoring against historical outage data, then layer in the TPE and RLN incrementally.

The most important thing is to design the reclassification logic before the outage happens, not during it. Your 2 AM self will thank you.