How to Build a Tenant-Scoped AI Agent Memory Architecture Using Vector Databases and TTL-Based Expiration Policies to Prevent Cross-Tenant Context Bleed in Multi-Tenant Backend Systems

As AI agents become first-class citizens inside SaaS platforms, the engineering teams building them are running headfirst into a problem that traditional multi-tenant architectures never had to solve: memory that thinks. Unlike a relational database row that sits inertly behind a foreign key, an AI agent's memory is dynamic, fuzzy, and context-sensitive. It bleeds.

Cross-tenant context bleed, where semantic fragments from one tenant's conversation history surface in another tenant's AI responses, is not just a data privacy bug. It is a compliance catastrophe waiting to happen. In regulated industries like healthcare, finance, and legal tech, it is also a liability that can end a company.

This tutorial walks you through a production-grade architecture for tenant-scoped AI agent memory using vector databases, namespace partitioning, metadata filtering, and TTL-based expiration policies. By the end, you will have a concrete blueprint you can implement in any modern backend stack.

Understanding the Problem: Why AI Agent Memory Is Different

Traditional multi-tenancy is solved with a tenant ID column, a row-level security policy, or a separate database schema. The isolation model is discrete and deterministic. Vector databases break this assumption in several critical ways:

  • Approximate nearest-neighbor (ANN) search is probabilistic. A similarity query does not look up a row by ID. It finds the closest embedding vectors in a high-dimensional space. Without hard partitioning, a query from Tenant A can retrieve semantically similar vectors that belong to Tenant B.
  • Embeddings encode meaning, not ownership. Two tenants in the same industry (say, two competing law firms) may produce embeddings that cluster very close together in vector space. Cosine similarity does not respect business boundaries.
  • Memory accumulates over time. Unlike a stateless API call, agent memory grows. Stale, expired, or superseded context from old sessions can pollute future retrievals if there is no expiration strategy.
  • LLM context windows amplify leakage. Even a single foreign memory chunk injected into a prompt can cause the model to hallucinate tenant-specific facts, pricing, or policy details that belong to a different customer entirely.

The solution requires a layered approach: hard namespace isolation at the storage layer, metadata-enforced filtering at query time, and TTL-based expiration to prevent stale context from accumulating indefinitely.

Architecture Overview

Before diving into implementation, here is the high-level architecture you are building:

  • Memory Ingestion Pipeline: Converts agent conversation turns and retrieved documents into embeddings, tags them with tenant metadata, and writes them to the vector store.
  • Tenant-Scoped Namespace Layer: Enforces hard partitioning at the collection or namespace level inside the vector database.
  • Metadata Filter Layer: Adds a secondary, query-time enforcement layer using tenant ID and session ID as mandatory filter predicates.
  • TTL Manager: A background service that evaluates expiration timestamps on stored memory vectors and deletes or archives them on schedule.
  • Memory Retrieval API: A thin service layer that the AI agent calls to fetch relevant context, enforcing both namespace and metadata isolation before returning results.

The examples in this guide use Python, Qdrant as the vector database (though the patterns apply equally to Pinecone, Weaviate, and pgvector), and OpenAI embeddings. The backend framework is FastAPI.

Step 1: Design Your Tenant Memory Schema

Every memory vector you store must carry a metadata payload that enables both isolation and expiration. Define a strict schema before you write a single vector.


# memory_schema.py
from pydantic import BaseModel, Field
from typing import Optional
from datetime import datetime
import uuid

class MemoryPayload(BaseModel):
    tenant_id: str                        # Hard partition key; NEVER optional
    session_id: str                       # Logical session within the tenant
    user_id: Optional[str] = None         # End-user within the tenant (if applicable)
    memory_type: str                      # "episodic" | "semantic" | "procedural"
    content: str                          # Original text chunk
    source: Optional[str] = None          # Document ID, conversation turn ID, etc.
    created_at: datetime = Field(default_factory=datetime.utcnow)
    expires_at: Optional[datetime] = None # TTL anchor; None means no expiration
    memory_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    version: int = 1                      # For future schema migrations

A few design decisions worth calling out explicitly:

  • tenant_id is never optional. Any code path that attempts to write a memory vector without a tenant ID should raise an exception at the application layer, not silently write an unscoped vector.
  • expires_at is a first-class field, not an afterthought. Setting it at write time, rather than calculating it at delete time, makes your TTL logic stateless and easier to audit.
  • memory_type enables tiered retention policies. Episodic memories (specific conversation turns) may expire in 7 days. Semantic memories (distilled facts about a tenant's business domain) may be retained for 90 days or indefinitely.

Step 2: Set Up Tenant-Scoped Collections in Qdrant

Qdrant gives you isolation primitives at two levels: separate collections and payload filtering. That yields two isolation strategies, and the right one depends on your tenant scale:

Strategy A: One Collection Per Tenant (Strong Isolation)

Best for platforms with a small number of high-value enterprise tenants where data residency requirements demand hard storage separation.


# collection_manager.py
from qdrant_client import QdrantClient
from qdrant_client.models import VectorParams, Distance
import re

client = QdrantClient(host="localhost", port=6333)

def get_collection_name(tenant_id: str) -> str:
    # Sanitize tenant_id to prevent injection into collection names
    safe_id = re.sub(r'[^a-zA-Z0-9_-]', '_', tenant_id)
    return f"agent_memory_{safe_id}"

def ensure_tenant_collection(tenant_id: str, vector_size: int = 1536):
    collection_name = get_collection_name(tenant_id)
    existing = [c.name for c in client.get_collections().collections]

    if collection_name not in existing:
        client.create_collection(
            collection_name=collection_name,
            vectors_config=VectorParams(
                size=vector_size,
                distance=Distance.COSINE
            )
        )
        # Create a payload index on expires_at for efficient TTL scans
        client.create_payload_index(
            collection_name=collection_name,
            field_name="expires_at",
            field_schema="datetime"
        )
        # Index tenant_id and session_id for fast filter queries
        client.create_payload_index(
            collection_name=collection_name,
            field_name="tenant_id",
            field_schema="keyword"
        )
        client.create_payload_index(
            collection_name=collection_name,
            field_name="session_id",
            field_schema="keyword"
        )
        print(f"Created collection: {collection_name}")

    return collection_name

Strategy B: Shared Collection with Mandatory Metadata Filtering (High-Scale)

Best for platforms with thousands of small tenants (SMB SaaS). You use a single collection but enforce tenant isolation through mandatory payload filters at every query.


# For shared collection strategy, create a single collection
# with indexed payload fields for high-cardinality filtering

def setup_shared_collection(vector_size: int = 1536):
    collection_name = "agent_memory_shared"
    existing = [c.name for c in client.get_collections().collections]

    if collection_name not in existing:
        client.create_collection(
            collection_name=collection_name,
            vectors_config=VectorParams(size=vector_size, distance=Distance.COSINE)
        )
        # CRITICAL: index tenant_id so every tenant filter hits the payload
        # index instead of scanning the full collection
        client.create_payload_index(
            collection_name=collection_name,
            field_name="tenant_id",
            field_schema="keyword"
        )
        client.create_payload_index(
            collection_name=collection_name,
            field_name="session_id",
            field_schema="keyword"
        )
        client.create_payload_index(
            collection_name=collection_name,
            field_name="expires_at",
            field_schema="datetime"
        )
    return collection_name

Important: With Strategy B, a missing or misconfigured filter is a security incident. You must treat the tenant filter as a mandatory query parameter enforced at the service layer, not something callers can opt out of.
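One way to make that enforcement concrete is to centralize filter construction in a single service-layer helper that callers cannot bypass. A minimal sketch, using plain dicts in Qdrant's REST filter format (`build_tenant_filter` is an illustrative name, not part of the client library):

```python
from typing import Any

def build_tenant_filter(tenant_id: str, session_id: str) -> dict[str, Any]:
    """Construct the mandatory payload filter for the shared collection.

    Raises instead of returning an unscoped filter, so no code path
    can query the shared collection without tenant scope.
    """
    if not tenant_id:
        raise ValueError("tenant_id is mandatory for every shared-collection query")
    return {
        "must": [
            {"key": "tenant_id", "match": {"value": tenant_id}},
            {"key": "session_id", "match": {"value": session_id}},
        ]
    }

# Every search against the shared collection goes through this helper:
f = build_tenant_filter("tenant_alpha", "session_001")
```

Because the helper raises on a missing tenant ID, "forgot the filter" becomes an exception in testing rather than a silent cross-tenant query in production.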

Step 3: Build the Memory Ingestion Service

The ingestion service converts raw text into embeddings, attaches the tenant metadata payload, and calculates TTL expiration timestamps based on memory type.


# memory_ingestion.py
from openai import OpenAI
from qdrant_client.models import PointStruct
from datetime import datetime, timedelta
from memory_schema import MemoryPayload
from collection_manager import client, get_collection_name
import uuid

openai_client = OpenAI()

# TTL policy per memory type (in days). None = no expiration.
TTL_POLICY = {
    "episodic":    7,
    "semantic":    90,
    "procedural":  None
}

def get_embedding(text: str) -> list[float]:
    response = openai_client.embeddings.create(
        input=text,
        model="text-embedding-3-large"
    )
    return response.data[0].embedding

def calculate_expiry(memory_type: str) -> datetime | None:
    ttl_days = TTL_POLICY.get(memory_type)
    if ttl_days is None:
        return None
    return datetime.utcnow() + timedelta(days=ttl_days)

def ingest_memory(
    tenant_id: str,
    session_id: str,
    content: str,
    memory_type: str = "episodic",
    user_id: str | None = None,
    source: str | None = None
) -> str:
    if not tenant_id:
        raise ValueError("tenant_id is required. Memory cannot be written without tenant scope.")

    expires_at = calculate_expiry(memory_type)

    payload = MemoryPayload(
        tenant_id=tenant_id,
        session_id=session_id,
        user_id=user_id,
        memory_type=memory_type,
        content=content,
        source=source,
        expires_at=expires_at
    )

    embedding = get_embedding(content)
    collection_name = get_collection_name(tenant_id)  # For Strategy A

    point = PointStruct(
        id=payload.memory_id,
        vector=embedding,
        payload=payload.model_dump(mode="json")
    )

    client.upsert(collection_name=collection_name, points=[point])
    return payload.memory_id

Step 4: Build the Tenant-Scoped Memory Retrieval Service

This is where context bleed is most likely to occur if you are not careful. The retrieval service must enforce tenant isolation at two levels: the collection level (Strategy A) and the filter level (both strategies).


# memory_retrieval.py
from qdrant_client.models import (
    Filter, FieldCondition, MatchValue, MatchAny,
    DatetimeRange, IsEmptyCondition, PayloadField
)
from datetime import datetime
from collection_manager import client, get_collection_name
from memory_ingestion import get_embedding

def retrieve_memories(
    tenant_id: str,
    session_id: str,
    query_text: str,
    top_k: int = 5,
    memory_types: list[str] | None = None,
    include_expired: bool = False   # Should almost always be False in production
) -> list[dict]:

    if not tenant_id:
        raise ValueError("tenant_id is required for memory retrieval.")

    query_vector = get_embedding(query_text)
    collection_name = get_collection_name(tenant_id)

    # Build mandatory filter conditions
    must_conditions = [
        FieldCondition(
            key="tenant_id",
            match=MatchValue(value=tenant_id)
        ),
        FieldCondition(
            key="session_id",
            match=MatchValue(value=session_id)
        )
    ]

    # Exclude expired memories at query time (defense-in-depth).
    # Memories with no expiry (expires_at null or absent) must still be
    # returned, so accept either an empty expires_at or one in the future.
    if not include_expired:
        must_conditions.append(
            Filter(
                should=[
                    IsEmptyCondition(is_empty=PayloadField(key="expires_at")),
                    FieldCondition(
                        key="expires_at",
                        range=DatetimeRange(gt=datetime.utcnow())
                    )
                ]
            )
        )

    # Optional: filter by one or more memory types
    if memory_types:
        must_conditions.append(
            FieldCondition(
                key="memory_type",
                match=MatchAny(any=memory_types)
            )
        )

    search_filter = Filter(must=must_conditions)

    results = client.search(
        collection_name=collection_name,
        query_vector=query_vector,
        query_filter=search_filter,
        limit=top_k,
        with_payload=True
    )

    return [
        {
            "memory_id": hit.id,
            "content": hit.payload["content"],
            "score": hit.score,
            "memory_type": hit.payload["memory_type"],
            "created_at": hit.payload["created_at"],
            "expires_at": hit.payload.get("expires_at")
        }
        for hit in results
    ]

Notice the double enforcement pattern: the collection name itself is scoped to the tenant (Strategy A), AND the filter explicitly matches tenant_id. This means that even if a bug in your collection routing logic caused a cross-collection query, the metadata filter would prevent foreign vectors from being returned. Defense in depth is not optional here.

Step 5: Implement the TTL Expiration Manager

Relying solely on query-time filtering to exclude expired memories is not enough. Expired vectors still consume storage, inflate index size, and slow down ANN search. You need an active expiration manager that runs as a background job.


# ttl_manager.py
import asyncio
from datetime import datetime
from qdrant_client.models import Filter, FieldCondition, DatetimeRange, PointIdsList
from collection_manager import client
import logging

logger = logging.getLogger(__name__)

async def expire_memories_for_collection(collection_name: str, batch_size: int = 100):
    """
    Scroll through all vectors in a collection and delete those
    whose expires_at timestamp is in the past. Records with a null
    or missing expires_at never match the range condition, so
    non-expiring memories are never swept.
    """
    now = datetime.utcnow()
    expired_ids = []
    offset = None

    while True:
        # Scroll with a filter for expired records
        results, next_offset = client.scroll(
            collection_name=collection_name,
            scroll_filter=Filter(
                must=[
                    FieldCondition(
                        key="expires_at",
                        range=DatetimeRange(lt=now)   # expires_at < now
                    )
                ]
            ),
            limit=batch_size,
            offset=offset,
            with_payload=False,
            with_vectors=False
        )

        expired_ids.extend([point.id for point in results])

        if next_offset is None:
            break
        offset = next_offset

    if expired_ids:
        client.delete(
            collection_name=collection_name,
            points_selector=PointIdsList(points=expired_ids)
        )
        logger.info(
            f"TTL sweep: deleted {len(expired_ids)} expired vectors "
            f"from collection '{collection_name}'"
        )

    return len(expired_ids)

async def run_ttl_sweep(tenant_ids: list[str]):
    """
    Run TTL expiration sweep across all tenant collections.
    Schedule this via APScheduler, Celery Beat, or a cron job.
    """
    from collection_manager import get_collection_name
    total_deleted = 0

    for tenant_id in tenant_ids:
        collection_name = get_collection_name(tenant_id)
        try:
            deleted = await expire_memories_for_collection(collection_name)
            total_deleted += deleted
        except Exception as e:
            logger.error(f"TTL sweep failed for tenant {tenant_id}: {e}")

    logger.info(f"TTL sweep complete. Total vectors deleted: {total_deleted}")
    return total_deleted

Schedule this sweep using a tool like APScheduler or Celery Beat. A sweep interval of every 15 to 60 minutes is appropriate for most production workloads. For extremely latency-sensitive compliance requirements, consider a real-time expiration approach using a Redis sorted set to track upcoming expirations and trigger deletes proactively.
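The sorted-set approach can be sketched in pure Python, with a min-heap standing in for the Redis structure (in production the same pattern maps to ZADD on write and ZRANGEBYSCORE plus ZREM on sweep; the class and method names here are illustrative):

```python
import heapq

class ExpirationSchedule:
    """Tracks (expiry_timestamp, memory_id) pairs, mirroring a Redis
    sorted set scored by expiry time."""

    def __init__(self) -> None:
        self._heap: list[tuple[float, str]] = []

    def schedule(self, memory_id: str, expires_at_ts: float) -> None:
        # Redis equivalent: ZADD expirations expires_at_ts memory_id
        heapq.heappush(self._heap, (expires_at_ts, memory_id))

    def pop_due(self, now_ts: float) -> list[str]:
        # Redis equivalent: ZRANGEBYSCORE -inf now_ts, then ZREM each hit
        due = []
        while self._heap and self._heap[0][0] <= now_ts:
            _, memory_id = heapq.heappop(self._heap)
            due.append(memory_id)
        return due

sched = ExpirationSchedule()
sched.schedule("mem-a", 100.0)
sched.schedule("mem-b", 200.0)
print(sched.pop_due(150.0))  # ['mem-a']
```

A worker that calls `pop_due` every few seconds and issues targeted vector deletes gets near-real-time expiry without scanning the whole collection.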

Step 6: Wire It Into Your FastAPI Agent Backend

Now assemble the pieces into a clean API that your AI agent can call for memory read/write operations.


# main.py
from fastapi import FastAPI, HTTPException, Depends, Header
from pydantic import BaseModel
from typing import Optional
from memory_ingestion import ingest_memory
from memory_retrieval import retrieve_memories

app = FastAPI(title="Tenant-Scoped Agent Memory API")

# --- Request/Response Models ---

class IngestRequest(BaseModel):
    session_id: str
    content: str
    memory_type: str = "episodic"
    user_id: Optional[str] = None
    source: Optional[str] = None

class RetrieveRequest(BaseModel):
    session_id: str
    query: str
    top_k: int = 5
    memory_types: Optional[list[str]] = None

# --- Tenant Resolution (simplified) ---

def get_tenant_id(x_tenant_id: str = Header(...)) -> str:
    """
    In production, resolve tenant_id from a verified JWT claim,
    not a raw header. This is simplified for illustration.
    """
    if not x_tenant_id:
        raise HTTPException(status_code=401, detail="Missing tenant context")
    return x_tenant_id

# --- Endpoints ---

@app.post("/memory/ingest")
async def ingest_endpoint(
    request: IngestRequest,
    tenant_id: str = Depends(get_tenant_id)
):
    memory_id = ingest_memory(
        tenant_id=tenant_id,
        session_id=request.session_id,
        content=request.content,
        memory_type=request.memory_type,
        user_id=request.user_id,
        source=request.source
    )
    return {"memory_id": memory_id, "status": "ingested"}

@app.post("/memory/retrieve")
async def retrieve_endpoint(
    request: RetrieveRequest,
    tenant_id: str = Depends(get_tenant_id)
):
    memories = retrieve_memories(
        tenant_id=tenant_id,
        session_id=request.session_id,
        query_text=request.query,
        top_k=request.top_k,
        memory_types=request.memory_types
    )
    return {"memories": memories, "count": len(memories)}

Step 7: Harden Against Edge Cases and Attack Vectors

A working implementation is not a secure one. Here are the hardening steps you must apply before shipping to production:

1. Resolve Tenant ID from Verified JWT Claims, Not Headers

The simplified example above reads tenant ID from a raw HTTP header. In production, your get_tenant_id dependency must decode and verify a signed JWT, extract the tenant_id claim, and validate it against your identity provider. A caller who can forge a tenant ID header can trivially read any tenant's memory.
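To make the verification step concrete, here is a minimal stdlib-only sketch of HS256 JWT validation. In production you would reach for a vetted library such as PyJWT and also check exp, iss, and aud; the function names here are illustrative:

```python
import base64
import hashlib
import hmac
import json

def _b64url_encode(raw: bytes) -> str:
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode()

def _b64url_decode(segment: str) -> bytes:
    # Restore the padding that JWTs strip
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))

def tenant_id_from_jwt(token: str, secret: bytes) -> str:
    """Verify an HS256 signature and return the tenant_id claim.
    Raises on any mismatch; never fall back to an unverified header."""
    header_b64, payload_b64, sig_b64 = token.split(".")
    expected = hmac.new(
        secret, f"{header_b64}.{payload_b64}".encode(), hashlib.sha256
    ).digest()
    if not hmac.compare_digest(expected, _b64url_decode(sig_b64)):
        raise PermissionError("Invalid JWT signature")
    claims = json.loads(_b64url_decode(payload_b64))
    if "tenant_id" not in claims:
        raise PermissionError("Token carries no tenant_id claim")
    return claims["tenant_id"]

# Minting a token with the right secret, purely to exercise the verifier:
header = _b64url_encode(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
payload = _b64url_encode(json.dumps({"tenant_id": "tenant_alpha"}).encode())
sig = _b64url_encode(
    hmac.new(b"test-secret", f"{header}.{payload}".encode(), hashlib.sha256).digest()
)
print(tenant_id_from_jwt(f"{header}.{payload}.{sig}", b"test-secret"))  # tenant_alpha
```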

2. Enforce Collection Name Sanitization

The get_collection_name function already strips non-alphanumeric characters. Extend this with an allowlist of known tenant IDs validated against your tenant registry before any collection operation is attempted.
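A sketch of that registry check, assuming an in-memory allowlist (in production the set would be loaded from your tenant database or control plane; `validated_collection_name` is an illustrative wrapper that applies the same naming scheme as get_collection_name):

```python
# Illustrative in-memory registry; production code would load this from
# the tenant database or control plane, not a hard-coded set.
KNOWN_TENANTS = {"tenant_alpha", "tenant_beta"}

def validated_collection_name(tenant_id: str) -> str:
    """Allowlist check first, then derive the collection name."""
    if tenant_id not in KNOWN_TENANTS:
        raise PermissionError(f"Unknown tenant: {tenant_id!r}")
    return f"agent_memory_{tenant_id}"

print(validated_collection_name("tenant_alpha"))  # agent_memory_tenant_alpha
```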

3. Add Observability and Anomaly Detection

Log every memory read and write with the tenant ID, session ID, and result count. Set up alerts for any retrieval that returns results with a tenant_id in the payload that does not match the requesting tenant. This should never happen, but if it does, you want to know immediately.
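The invariant itself is small enough to enforce in code on every retrieval, between the vector store and the agent. A minimal sketch (function name illustrative):

```python
import logging

logger = logging.getLogger("memory.audit")

def assert_tenant_invariant(requesting_tenant: str, hits: list[dict]) -> list[dict]:
    """Drop and loudly report any hit whose payload tenant_id does not
    match the requesting tenant. In a healthy system this is a no-op."""
    clean = [h for h in hits if h.get("tenant_id") == requesting_tenant]
    foreign = len(hits) - len(clean)
    if foreign:
        logger.critical(
            "Cross-tenant bleed: %d foreign hits returned for tenant %s",
            foreign, requesting_tenant,
        )
    return clean
```

Wiring the `logger.critical` line to a paging alert turns a silent leak into an immediate incident.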

4. Test Cross-Tenant Isolation Explicitly

Write automated integration tests that ingest memories for Tenant A, then query the retrieval API as Tenant B using the same semantic content. Assert that zero results are returned. This test should be part of your CI/CD pipeline, not just a one-time check.


# test_isolation.py
def test_cross_tenant_isolation():
    # Ingest a memory for tenant_alpha
    ingest_memory(
        tenant_id="tenant_alpha",
        session_id="session_001",
        content="Our enterprise pricing is $50,000 per year.",
        memory_type="semantic"
    )

    # Attempt to retrieve the same content as tenant_beta
    results = retrieve_memories(
        tenant_id="tenant_beta",
        session_id="session_001",
        query_text="What is the enterprise pricing?",
        top_k=5
    )

    # MUST return zero results; any other outcome is a critical security failure
    assert len(results) == 0, (
        f"CRITICAL: Cross-tenant context bleed detected! "
        f"{len(results)} foreign memories returned."
    )

5. Handle the "No Expiry" Case Carefully

Memories with expires_at = None (procedural memories, in the schema above) are excluded from TTL sweeps by design. Make sure your expire_memories_for_collection function only matches records where expires_at is both present and in the past. A null/missing field should never be treated as "expired immediately."

Choosing the Right TTL Policy for Your Use Case

There is no universal TTL policy. Here is a practical reference table based on common AI agent use cases:

  • Customer support agent (episodic turns): 24 to 72 hours. Session context is irrelevant after a ticket closes.
  • Sales assistant (deal context): 30 to 90 days. Tied to the typical sales cycle length.
  • Legal research agent (case documents): Match the case lifecycle, or use procedural (no expiry) with manual deletion on case close.
  • Healthcare agent (patient interactions): Governed by HIPAA minimum necessary standards. Consult your compliance team. Do not set TTL policy without legal review.
  • Internal knowledge base agent (semantic facts): 6 to 12 months, with a refresh mechanism that re-ingests updated source documents and deletes superseded vectors.

A Note on pgvector as an Alternative

If your team is already running PostgreSQL, pgvector is a compelling alternative to a standalone vector database for this architecture. You get native row-level security (RLS), which can enforce tenant isolation at the database engine level, and you can use PostgreSQL's built-in pg_cron extension or a simple scheduled query to handle TTL expiration. The tradeoff is query performance at very high vector counts (above 10 million vectors per tenant), where dedicated ANN indexes in Qdrant or Pinecone will outperform pgvector's HNSW implementation under concurrent load.

Conclusion

Building a multi-tenant AI agent memory system is not just a performance engineering problem. It is a security and compliance engineering problem that requires deliberate, layered defenses. The architecture covered in this guide gives you four independent layers of protection against cross-tenant context bleed:

  1. Physical namespace isolation via per-tenant collections.
  2. Mandatory metadata filtering enforced at every query, regardless of routing.
  3. TTL-based expiration to prevent stale context from accumulating and leaking across session boundaries.
  4. Automated isolation testing in CI/CD to catch regressions before they reach production.

As AI agents take on more sensitive roles inside enterprise SaaS platforms in 2026 and beyond, tenant-scoped memory hygiene will become a baseline expectation, not a differentiator. The teams that build it correctly from day one will spend their time shipping features. The ones that bolt it on later will spend their time in incident reviews.

Start with the schema. Get the isolation right. Then worry about performance.