How to Build a Per-Tenant AI Agent Memory Eviction and Context Pruning Pipeline for Multi-Tenant LLM Platforms

How to Build a Per-Tenant AI Agent Memory Eviction and Context Pruning Pipeline for Multi-Tenant LLM Platforms

Long-running AI agent sessions are quietly bankrupting token budgets across multi-tenant LLM platforms. If you are operating a shared infrastructure where dozens or hundreds of tenants run concurrent agentic workflows, you have almost certainly hit the wall: a session that started as a focused task assistant has ballooned into a 200,000-token context window monster, competing for vector store I/O with every other tenant on the cluster. Left unmanaged, this kills latency, inflates costs, and can cause one tenant's runaway session to degrade quality for everyone else.

This guide walks you through building a per-tenant AI agent memory eviction and context pruning pipeline: a production-grade system that enforces token budget limits at the tenant level, intelligently prunes agent context, and manages eviction from shared vector store infrastructure without dropping critical memories or breaking agent coherence. We will cover the architecture, the data structures, the eviction algorithms, and the code you need to ship this today.

Understanding the Problem Space

Before writing a single line of code, it helps to be precise about what we are solving. In a multi-tenant LLM platform, each tenant typically runs one or more agent sessions. A session accumulates context across three layers:

  • In-context memory: The raw token window passed directly to the LLM on each inference call. This is the most expensive and the most constrained resource.
  • Working memory (short-term store): A fast key-value or document store (Redis, DynamoDB, etc.) that holds recent turns, tool call results, and scratchpad state for the active session.
  • Long-term memory (vector store): A shared vector database (Pinecone, Weaviate, Qdrant, pgvector, etc.) where semantically meaningful memories are embedded and persisted across sessions.

The crisis point arrives when a long-running session continuously appends to all three layers without eviction. The in-context window fills up, forcing either a hard truncation (which breaks coherence) or an expensive re-summarization. Meanwhile, the vector store accumulates stale, redundant, or low-value embeddings that consume storage quota and degrade retrieval precision for every tenant sharing that namespace or collection.

The key insight is that eviction and pruning are not the same thing, and conflating them is the most common architectural mistake:

  • Pruning is the act of reducing what is actively in the token context window for the current inference call. It is ephemeral and session-local.
  • Eviction is the act of removing or archiving records from the working memory or vector store. It has durable, cross-session consequences and must be tenant-aware.

Designing the Per-Tenant Budget Model

The foundation of the entire pipeline is a tenant budget manifest: a configuration object that defines the resource envelope for each tenant. Store this in a fast config store (etcd, Consul, or a simple Redis hash) so agents can read it at session initialization and on every turn.

The Tenant Budget Schema


{
  "tenant_id": "acme-corp",
  "context_window_budget_tokens": 32000,
  "working_memory_max_turns": 50,
  "vector_store_namespace": "acme-corp-memories",
  "vector_store_max_records": 10000,
  "vector_store_eviction_policy": "lru_scored",
  "pruning_strategy": "hierarchical_summarization",
  "summarization_model": "gpt-4o-mini",
  "priority_tier": "standard",
  "session_ttl_seconds": 86400,
  "eviction_watermark_pct": 0.85
}

A few fields here deserve explanation. The eviction_watermark_pct (set to 0.85 above) is the threshold at which eviction kicks in proactively, before the hard limit is hit. This is critical: you never want to evict reactively under load, because that adds latency to a live inference call. The priority_tier field allows you to implement QoS differentiation, giving enterprise tenants larger budgets and softer eviction curves than free-tier tenants.

Tracking Budget Consumption in Real Time

Each agent turn must update a lightweight budget tracker. Use an atomic counter in Redis with a TTL matching the session TTL:


# Python pseudocode using redis-py
import redis
import tiktoken

r = redis.Redis(host="localhost", port=6379, decode_responses=True)
enc = tiktoken.get_encoding("cl100k_base")

def track_turn_tokens(tenant_id: str, session_id: str, text: str) -> int:
    token_count = len(enc.encode(text))
    key = f"budget:{tenant_id}:{session_id}:tokens_used"
    new_total = r.incrby(key, token_count)
    r.expire(key, 86400)  # align with session TTL
    return new_total

def get_budget_utilization(tenant_id: str, session_id: str, budget: int) -> float:
    key = f"budget:{tenant_id}:{session_id}:tokens_used"
    used = int(r.get(key) or 0)
    return used / budget

This gives you a real-time utilization ratio on every turn. When this ratio crosses the eviction_watermark_pct, you enqueue an asynchronous eviction job rather than blocking the inference path.

Building the Context Pruning Layer

Pruning operates on the in-context window before each LLM call. The goal is to fit the most relevant and coherent subset of the session history into the available token budget. There are three viable strategies, and the right choice depends on your tenant's workload profile.

Strategy 1: Sliding Window Truncation (Fast, Lossy)

The simplest approach: keep the last N turns and discard everything older. This is appropriate for conversational agents where recency dominates relevance. It is fast (O(1) lookup) and introduces no additional LLM calls. The downside is that it loses all long-range context, which is catastrophic for agents running multi-step research or coding workflows.


def sliding_window_prune(turns: list[dict], max_tokens: int, encoder) -> list[dict]:
    pruned = []
    token_count = 0
    # Always preserve the system prompt (index 0)
    system_prompt = turns[0]
    system_tokens = len(encoder.encode(system_prompt["content"]))
    budget = max_tokens - system_tokens

    for turn in reversed(turns[1:]):
        turn_tokens = len(encoder.encode(turn["content"]))
        if token_count + turn_tokens > budget:
            break
        pruned.insert(0, turn)
        token_count += turn_tokens

    return [system_prompt] + pruned

Strategy 2: Relevance-Scored Pruning (Balanced)

Rather than keeping the most recent turns, score every turn in the context window against the current user query using a fast embedding similarity check. Keep the system prompt, the most recent two turns (for coherence), and then fill the remaining budget with the highest-scoring historical turns. This works well for task-oriented agents where the user may reference earlier decisions.


from sentence_transformers import SentenceTransformer
import numpy as np

model = SentenceTransformer("all-MiniLM-L6-v2")  # fast, lightweight scorer

def relevance_scored_prune(
    turns: list[dict],
    current_query: str,
    max_tokens: int,
    encoder,
    recency_window: int = 2
) -> list[dict]:
    if len(turns) <= recency_window + 1:
        return turns

    system_prompt = turns[0]
    recent_turns = turns[-recency_window:]
    candidate_turns = turns[1:-recency_window]

    if not candidate_turns:
        return turns

    query_embedding = model.encode(current_query)
    candidate_texts = [t["content"] for t in candidate_turns]
    candidate_embeddings = model.encode(candidate_texts)

    scores = np.dot(candidate_embeddings, query_embedding) / (
        np.linalg.norm(candidate_embeddings, axis=1) * np.linalg.norm(query_embedding) + 1e-8
    )

    system_tokens = len(encoder.encode(system_prompt["content"]))
    recent_tokens = sum(len(encoder.encode(t["content"])) for t in recent_turns)
    budget = max_tokens - system_tokens - recent_tokens

    ranked = sorted(zip(scores, candidate_turns), key=lambda x: x[0], reverse=True)
    selected = []
    used = 0
    for score, turn in ranked:
        t = len(encoder.encode(turn["content"]))
        if used + t > budget:
            continue
        selected.append(turn)
        used += t

    # Re-sort selected turns by original order to maintain narrative coherence
    original_indices = {id(t): i for i, t in enumerate(candidate_turns)}
    selected.sort(key=lambda t: original_indices[id(t)])

    return [system_prompt] + selected + recent_turns

Strategy 3: Hierarchical Summarization (High Fidelity, Higher Cost)

When neither truncation nor relevance scoring is acceptable (for example, in legal, medical, or financial agent workflows where every prior decision matters), you need hierarchical summarization. The idea is to compress older context segments into progressively denser summaries rather than dropping them.

The pipeline works as follows:

  1. Segment the turn history into chunks of N turns each.
  2. Summarize each chunk using a smaller, cheaper model (GPT-4o mini, Gemini Flash, or a self-hosted Llama variant).
  3. Replace the original chunk with its summary in the context window.
  4. Store the original chunk in the working memory store for potential retrieval.
  5. Repeat until the total token count is within budget.

async def hierarchical_summarize(
    turns: list[dict],
    max_tokens: int,
    encoder,
    summarizer_client,  # async LLM client
    chunk_size: int = 10
) -> list[dict]:
    system_prompt = turns[0]
    history = turns[1:]

    while True:
        total = sum(len(encoder.encode(t["content"])) for t in [system_prompt] + history)
        if total <= max_tokens:
            break

        # Summarize the oldest chunk
        chunk = history[:chunk_size]
        chunk_text = "\n".join(f"{t['role']}: {t['content']}" for t in chunk)
        summary_prompt = (
            f"Summarize the following conversation segment concisely, "
            f"preserving all decisions, facts, and action items:\n\n{chunk_text}"
        )
        summary_response = await summarizer_client.complete(summary_prompt)
        summary_turn = {
            "role": "system",
            "content": f"[SUMMARY OF EARLIER CONTEXT]: {summary_response}",
            "is_summary": True
        }
        history = [summary_turn] + history[chunk_size:]

    return [system_prompt] + history

The cost of this strategy is latency and additional LLM API spend. Mitigate this by running summarization asynchronously in a background worker, caching summaries, and only triggering it when utilization crosses the watermark threshold, not on every turn.

Building the Vector Store Eviction Pipeline

The vector store eviction problem is harder than context pruning because it is shared infrastructure. Evicting a record from a shared Pinecone index or Qdrant collection affects retrieval quality for that tenant's future sessions. You need a principled eviction policy that balances storage efficiency against memory fidelity.

The Eviction Metadata Schema

Every record you write to the vector store must carry eviction-relevant metadata. Without this, you are flying blind. At minimum, store the following alongside each embedding:


{
  "id": "mem_abc123",
  "tenant_id": "acme-corp",
  "session_id": "sess_xyz789",
  "created_at": 1740000000,
  "last_accessed_at": 1740050000,
  "access_count": 7,
  "importance_score": 0.82,
  "memory_type": "fact",  // "fact" | "decision" | "tool_result" | "conversation"
  "token_length": 312,
  "is_pinned": false
}

The importance_score is the most consequential field. Compute it at write time using a lightweight classifier or a simple heuristic: tool call results that changed agent behavior score higher, user-confirmed facts score higher, raw conversation turns score lower. The is_pinned flag lets agents explicitly protect critical memories from eviction, which is useful for things like user preferences or security constraints.

The LRU-Scored Eviction Algorithm

Pure LRU eviction is dangerous in agent systems because a memory that was accessed once early in a long session and never retrieved again looks "old" but may be critically important. Instead, use a composite eviction score that blends recency, access frequency, importance, and token cost:


import math
import time

def compute_eviction_score(record: dict, now: float = None) -> float:
    """
    Lower score = higher eviction priority.
    Returns a float in [0, 1].
    """
    if now is None:
        now = time.time()

    if record.get("is_pinned"):
        return 1.0  # Never evict pinned memories

    # Recency: decay over 7 days
    age_seconds = now - record["last_accessed_at"]
    recency = math.exp(-age_seconds / (7 * 86400))

    # Frequency: log-scaled access count, normalized to [0,1] assuming max ~100 accesses
    frequency = math.log1p(record["access_count"]) / math.log1p(100)

    # Importance: direct from metadata
    importance = record.get("importance_score", 0.5)

    # Token cost penalty: prefer evicting large records when storage is tight
    token_penalty = 1.0 - min(record["token_length"] / 2000, 1.0) * 0.2

    # Weighted composite
    score = (
        0.35 * recency +
        0.25 * frequency +
        0.30 * importance +
        0.10 * token_penalty
    )
    return score

def select_eviction_candidates(
    records: list[dict],
    target_free_count: int
) -> list[str]:
    scored = [(compute_eviction_score(r), r["id"]) for r in records]
    scored.sort(key=lambda x: x[0])  # ascending: lowest score evicted first
    return [record_id for _, record_id in scored[:target_free_count]]

The Eviction Worker

Never run eviction synchronously on the inference path. Instead, use a background worker that polls tenant utilization and triggers eviction when the watermark is crossed. Here is a minimal Celery-based implementation pattern:


from celery import Celery
from typing import Any

app = Celery("eviction_worker", broker="redis://localhost:6379/1")

@app.task(bind=True, max_retries=3)
def run_tenant_eviction(self, tenant_id: str, vector_client: Any, config: dict):
    try:
        namespace = config["vector_store_namespace"]
        max_records = config["vector_store_max_records"]
        watermark = config["eviction_watermark_pct"]

        # Fetch all records for this tenant (use metadata filtering, not full scan)
        records = vector_client.list_records(
            namespace=namespace,
            filter={"tenant_id": tenant_id},
            include_metadata=True
        )

        current_count = len(records)
        watermark_count = int(max_records * watermark)

        if current_count < watermark_count:
            return {"status": "no_eviction_needed", "count": current_count}

        target_free = current_count - int(max_records * 0.70)  # evict down to 70%
        candidates = select_eviction_candidates(records, target_free)

        # Archive before deletion (write to cold storage: S3, GCS, etc.)
        archive_memories(tenant_id, candidates)

        # Delete from vector store
        vector_client.delete(ids=candidates, namespace=namespace)

        return {
            "status": "evicted",
            "tenant_id": tenant_id,
            "evicted_count": len(candidates),
            "remaining_count": current_count - len(candidates)
        }

    except Exception as exc:
        raise self.retry(exc=exc, countdown=30)

The archive_memories call is important. Do not simply delete evicted memories. Write them to a cold storage tier (S3 or GCS as compressed JSON-L files, keyed by tenant and date) so you can rehydrate them if needed. This also satisfies data retention requirements for regulated industries.

Handling Cross-Tenant Isolation in Shared Vector Stores

One of the most dangerous failure modes in shared vector store infrastructure is namespace bleed: a misconfigured filter or a missing tenant tag that causes one tenant's eviction job to delete or retrieve another tenant's records. This is both a correctness problem and a compliance catastrophe.

Enforce isolation at multiple layers:

  1. Namespace-level isolation (preferred): Give each tenant their own namespace or collection. This provides hard isolation at the storage layer. The tradeoff is higher operational overhead and potentially worse index utilization at small tenant scales.
  2. Metadata filter isolation: If you use a shared namespace for cost efficiency, every read and write operation must include a tenant_id filter. Enforce this at the client wrapper level, not at the application level, so individual teams cannot accidentally omit it.
  3. Eviction job scoping: The eviction worker must receive the tenant_id as a mandatory parameter and validate it against the job's auth token before executing any deletions. Log every eviction operation with the tenant ID, record count, and operator identity for audit purposes.

class TenantScopedVectorClient:
    """A wrapper that enforces tenant isolation on all vector store operations."""

    def __init__(self, base_client, tenant_id: str):
        self._client = base_client
        self._tenant_id = tenant_id

    def upsert(self, records: list[dict], namespace: str):
        for r in records:
            if r.get("metadata", {}).get("tenant_id") != self._tenant_id:
                raise ValueError(
                    f"Record tenant_id mismatch. Expected {self._tenant_id}."
                )
        return self._client.upsert(records, namespace=namespace)

    def query(self, vector, top_k: int, namespace: str, **kwargs):
        return self._client.query(
            vector=vector,
            top_k=top_k,
            namespace=namespace,
            filter={"tenant_id": {"$eq": self._tenant_id}},
            **kwargs
        )

    def delete(self, ids: list[str], namespace: str):
        # Verify ownership before deletion
        records = self._client.fetch(ids=ids, namespace=namespace)
        for record_id, record in records.items():
            if record["metadata"].get("tenant_id") != self._tenant_id:
                raise PermissionError(
                    f"Tenant {self._tenant_id} cannot delete record {record_id}."
                )
        return self._client.delete(ids=ids, namespace=namespace)

Wiring It All Together: The Pipeline Orchestrator

With all the components built, you need an orchestrator that runs on every agent turn and coordinates pruning, budget tracking, and eviction triggering. Here is the complete flow:


async def agent_turn_pipeline(
    tenant_id: str,
    session_id: str,
    user_message: str,
    context_turns: list[dict],
    tenant_config: dict,
    vector_client: TenantScopedVectorClient,
    llm_client,
    encoder
) -> dict:

    # 1. Track token budget
    utilization = get_budget_utilization(
        tenant_id, session_id, tenant_config["context_window_budget_tokens"]
    )

    # 2. Select and apply pruning strategy
    strategy = tenant_config["pruning_strategy"]
    max_tokens = tenant_config["context_window_budget_tokens"]

    if strategy == "sliding_window":
        pruned_context = sliding_window_prune(context_turns, max_tokens, encoder)
    elif strategy == "relevance_scored":
        pruned_context = relevance_scored_prune(
            context_turns, user_message, max_tokens, encoder
        )
    elif strategy == "hierarchical_summarization":
        pruned_context = await hierarchical_summarize(
            context_turns, max_tokens, encoder, llm_client
        )
    else:
        pruned_context = context_turns  # fallback: no pruning

    # 3. Retrieve relevant long-term memories from vector store
    query_embedding = embed(user_message)
    memories = vector_client.query(
        vector=query_embedding,
        top_k=5,
        namespace=tenant_config["vector_store_namespace"]
    )
    memory_context = format_memories_for_context(memories)

    # 4. Build final prompt and call LLM
    final_messages = inject_memories(pruned_context, memory_context)
    response = await llm_client.chat(messages=final_messages)

    # 5. Store new memory in vector store
    new_memory = {
        "id": generate_memory_id(),
        "values": embed(response.content),
        "metadata": {
            "tenant_id": tenant_id,
            "session_id": session_id,
            "content": response.content,
            "created_at": time.time(),
            "last_accessed_at": time.time(),
            "access_count": 0,
            "importance_score": score_importance(response),
            "memory_type": classify_memory_type(response),
            "token_length": len(encoder.encode(response.content)),
            "is_pinned": False
        }
    }
    vector_client.upsert([new_memory], namespace=tenant_config["vector_store_namespace"])
    track_turn_tokens(tenant_id, session_id, user_message + response.content)

    # 6. Trigger async eviction if watermark crossed
    new_utilization = get_budget_utilization(
        tenant_id, session_id, tenant_config["context_window_budget_tokens"]
    )
    if new_utilization >= tenant_config["eviction_watermark_pct"]:
        run_tenant_eviction.delay(tenant_id, vector_client, tenant_config)

    return {"response": response.content, "utilization": new_utilization}

Observability: What You Must Monitor

A pipeline this complex will fail silently if you do not instrument it properly. At minimum, emit the following metrics to your observability stack (Prometheus, Datadog, Grafana, etc.):

  • Token utilization per tenant per session: A gauge that lets you see which tenants are approaching their limits and how fast.
  • Pruning strategy invocation rate: Track how often each strategy fires. A spike in hierarchical summarization calls is a cost signal.
  • Eviction job latency and success rate: Eviction failures mean your vector store fills up silently. Alert on retry exhaustion.
  • Vector store record count per tenant: Track this as a gauge against the vector_store_max_records limit. Alert at 90% of limit.
  • Memory retrieval relevance scores: Log the similarity scores returned from vector queries. A declining average score indicates that eviction is removing too many high-value memories.
  • Context coherence proxy: Track LLM self-evaluation scores or user satisfaction signals segmented by sessions that triggered pruning versus those that did not. This tells you whether your pruning strategy is hurting answer quality.

Common Pitfalls and How to Avoid Them

Pitfall 1: Evicting During Peak Load

If your eviction worker shares the same Redis or vector store connection pool as your inference path, a heavy eviction job will starve inference requests of connections. Use separate connection pools, or better yet, a dedicated eviction service with its own infrastructure allocation. Rate-limit eviction jobs by tenant priority tier during peak hours.

Pitfall 2: Summarizing Summaries Recursively

In hierarchical summarization, if you are not careful, you will end up summarizing a summary of a summary, compounding information loss exponentially. Always track is_summary: true in your turn metadata and apply a different, more conservative compression ratio when summarizing summaries. Set a maximum recursion depth of two levels.

Pitfall 3: Ignoring Token Counting Discrepancies Across Models

Tiktoken's cl100k_base encoding is accurate for OpenAI models but will give you wrong counts for Anthropic Claude, Google Gemini, or open-source models. Maintain a model-specific token counter registry in your pipeline and select the right encoder based on the tenant's configured model. A 15% miscounting error on a 32,000-token budget means you will consistently over-provision or under-provision by nearly 5,000 tokens.

Pitfall 4: No Graceful Degradation for Eviction Failures

If your eviction worker is down and a tenant's vector store hits the hard limit, write operations will fail. This should not crash the agent. Implement a graceful degradation path: when vector store writes fail due to quota exhaustion, fall back to working memory only, log the event, and alert the on-call team. Never propagate a storage quota error directly to the end user.

Conclusion

Building a per-tenant AI agent memory eviction and context pruning pipeline is one of the most operationally demanding challenges in multi-tenant LLM platform engineering. But it is also one of the highest-leverage investments you can make. Done right, it gives you predictable cost curves, consistent latency under load, strong tenant isolation guarantees, and agent sessions that remain coherent and useful over days or weeks rather than burning out after a few hundred turns.

The architecture described here, combining a tenant budget manifest, a tiered pruning strategy selection, a composite-scored eviction algorithm, a tenant-scoped vector client, and an async eviction worker, is designed to be incrementally adoptable. You do not need to ship all of it at once. Start with the budget tracker and sliding window pruning. Add relevance scoring when you see quality complaints. Layer in hierarchical summarization for your highest-value tenants. Build the eviction pipeline once your vector store costs start to climb.

The agents your tenants run in 2026 are longer-lived, more capable, and more memory-hungry than anything we were managing two years ago. The infrastructure that supports them needs to grow up to match. This pipeline is how you make that happen without letting one tenant's ambitious workflow become everyone else's problem.