How to Build a Per-Tenant AI Agent Checkpoint-and-Resume System for Multi-Tenant LLM Pipelines

Long-running agentic workflows are the new normal in 2026. Enterprises are deploying AI agents that browse the web, write and execute code, call third-party APIs, draft reports, and loop back on their own reasoning, all in a single uninterrupted task that can span minutes or even hours. That's exciting. It's also terrifying from a reliability engineering standpoint.

Here's the problem nobody talks about enough: when your infrastructure hiccups, your agent dies, and your tenant loses everything. No partial results. No context. No way to resume. Just a cold restart and a frustrated customer wondering why they're being billed for a task that never finished.

In a multi-tenant LLM platform, this failure mode is multiplied across every customer running a workflow simultaneously. A single pod eviction, a Redis timeout, or a spot-instance reclamation can wipe out dozens of in-flight agent sessions at once. The stakes are high, and the solution requires more than just "add a try/except block."

This guide walks you through designing and implementing a per-tenant checkpoint-and-resume system for multi-tenant LLM pipelines. You'll learn how to serialize mid-task agent state, isolate it safely per tenant, store it durably, and resume seamlessly after any infrastructure failure, without the agent (or your customer) ever knowing anything went wrong.

Why Standard Retry Logic Is Not Enough

Before diving into the architecture, let's be precise about why naive retry logic fails for agentic workflows specifically.

A typical microservice retry strategy assumes that requests are idempotent and stateless. You call an endpoint, it fails, you call it again. But an LLM agent is neither of those things. By the time a failure occurs, the agent may have already:

  • Made several tool calls (some of which had side effects, like sending an email or writing to a database)
  • Accumulated a long message history that forms the reasoning context
  • Branched through a decision tree and arrived at a sub-goal several layers deep
  • Consumed significant token budget that you cannot refund

Replaying from scratch doesn't just waste time and money. It can cause duplicate side effects, confuse downstream systems, and produce inconsistent results because the LLM's non-deterministic sampling may take a different reasoning path entirely. You need true mid-task state preservation, not retry theater.

Core Concepts: What "Agent State" Actually Means

To checkpoint an agent, you first need a precise definition of what constitutes its full state. For a typical ReAct-style or tool-calling LLM agent, state is composed of several layers:

1. The Conversation / Message History

This is the ordered list of messages exchanged between the system, user, assistant, and tool outputs. It is the agent's working memory. Without it, the agent has no context about what it has already done or decided.

2. The Tool Call Ledger

A record of every tool invocation made so far, including its inputs, outputs, and whether it produced side effects. This ledger is critical for idempotency: on resume, you must know which tool calls to skip (because they already happened) versus which to re-execute.

3. The Task Manifest

The original task description, any structured sub-goals derived from it, and the current position in the execution plan. Think of this as the agent's "to-do list with progress markers."

4. Ephemeral Scratchpad / Working Files

Any in-memory data the agent was working with: partially written documents, intermediate computation results, downloaded file contents, or parsed API responses that haven't been committed anywhere yet.

5. Execution Metadata

Token usage so far, wall-clock time elapsed, retry counts per step, and any tenant-specific configuration or policy flags (rate limits, allowed tools, data-residency constraints).

A complete checkpoint captures all five layers. Miss any one of them and your resume is a lie, not a recovery.

Architecture Overview

The system we're building has four main components working together:

  • The Agent Runtime: The execution engine that runs the LLM loop and calls tools.
  • The Checkpoint Manager: A service responsible for serializing, versioning, and storing agent state at defined intervals.
  • The Durable State Store: A storage backend that is tenant-isolated, fast enough for frequent writes, and durable enough to survive infrastructure failures.
  • The Resume Coordinator: A component that detects orphaned (interrupted) agent sessions and restores them to a healthy worker.

The key architectural principle is tenant isolation at every layer. Checkpoints for Tenant A must never be readable, writable, or deletable by Tenant B, even in a shared infrastructure. This is both a security requirement and a data-integrity requirement.

Step 1: Design Your Checkpoint Schema

Start with a well-typed checkpoint schema. Using a structured format like Pydantic (in Python) gives you validation, versioning, and easy serialization for free.


from pydantic import BaseModel, Field
from typing import Any, Optional
from enum import Enum
import uuid
from datetime import datetime, timezone

class MessageRole(str, Enum):
    SYSTEM = "system"
    USER = "user"
    ASSISTANT = "assistant"
    TOOL = "tool"

class Message(BaseModel):
    role: MessageRole
    content: str
    tool_call_id: Optional[str] = None
    tool_name: Optional[str] = None
    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

class ToolCallRecord(BaseModel):
    call_id: str
    tool_name: str
    inputs: dict[str, Any]
    output: Optional[Any] = None
    had_side_effects: bool = False
    completed: bool = False
    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

class TaskManifest(BaseModel):
    original_task: str
    sub_goals: list[str] = []
    current_goal_index: int = 0
    completed_goals: list[str] = []

class AgentCheckpoint(BaseModel):
    # Identity
    checkpoint_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    session_id: str
    tenant_id: str
    schema_version: str = "1.0"

    # Core state
    message_history: list[Message] = []
    tool_ledger: list[ToolCallRecord] = []
    task_manifest: TaskManifest
    scratchpad: dict[str, Any] = {}

    # Execution metadata
    total_tokens_used: int = 0
    step_count: int = 0
    created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    last_updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    status: str = "in_progress"  # in_progress | completed | failed | paused

Notice the schema_version field. As your agent evolves, the checkpoint format will change. Versioning from day one means you can migrate old checkpoints forward without corrupting them.

Step 2: Choose and Configure Your Durable State Store

Your checkpoint store needs to satisfy three competing requirements: low write latency (so frequent checkpointing doesn't slow down the agent), high durability (so data survives node failures), and tenant isolation (so data is logically and cryptographically separated).

In 2026, the most common pattern is a two-tier storage approach:

  • Hot tier (Redis with AOF persistence or Valkey): Stores the latest checkpoint for fast reads on resume. Use a per-tenant Redis keyspace prefix and ACL rules to enforce isolation.
  • Cold tier (object storage: S3, GCS, or Azure Blob): Stores a full checkpoint history for audit, debugging, and disaster recovery. Use per-tenant bucket prefixes or separate buckets depending on your compliance requirements.

Here's the storage key convention you should adopt. Predictability in key naming is critical for both operational debugging and access control policy enforcement:


# Redis hot-tier key pattern
# tenants:{tenant_id}:sessions:{session_id}:checkpoint:latest
# tenants:{tenant_id}:sessions:{session_id}:checkpoint:history (sorted set by timestamp)

# Object storage cold-tier path pattern
# checkpoints/{tenant_id}/{session_id}/{checkpoint_id}.json.gz

class CheckpointStorageConfig(BaseModel):
    redis_url: str
    redis_key_prefix: str = "tenants"
    object_storage_bucket: str
    object_storage_prefix: str = "checkpoints"
    compress: bool = True
    encrypt_at_rest: bool = True
    encryption_key_id: str  # Per-tenant KMS key ID for envelope encryption

A critical note on encryption: In a multi-tenant system, each tenant's checkpoints should be encrypted with a tenant-specific key, not a single shared platform key. Use envelope encryption via AWS KMS, Google Cloud KMS, or HashiCorp Vault. This means a compromised platform key does not expose all tenant data, and a tenant can request key deletion to cryptographically erase their data.

Step 3: Build the Checkpoint Manager

The Checkpoint Manager is the heart of the system. It wraps your agent runtime and intercepts execution at defined checkpointing moments.


import json
import gzip
import redis.asyncio as aioredis
import boto3
from datetime import datetime, timezone
from typing import Optional

class CheckpointManager:
    def __init__(self, config: CheckpointStorageConfig):
        self.config = config
        self.redis = aioredis.from_url(config.redis_url)
        self.s3 = boto3.client("s3")
        self.kms = boto3.client("kms")

    def _hot_key(self, tenant_id: str, session_id: str) -> str:
        return f"{self.config.redis_key_prefix}:{tenant_id}:sessions:{session_id}:checkpoint:latest"

    def _history_key(self, tenant_id: str, session_id: str) -> str:
        return f"{self.config.redis_key_prefix}:{tenant_id}:sessions:{session_id}:checkpoint:history"

    def _cold_path(self, tenant_id: str, session_id: str, checkpoint_id: str) -> str:
        return f"{self.config.object_storage_prefix}/{tenant_id}/{session_id}/{checkpoint_id}.json.gz"

    async def save(self, checkpoint: AgentCheckpoint) -> str:
        checkpoint.last_updated_at = datetime.now(timezone.utc)
        raw = checkpoint.model_dump_json().encode("utf-8")

        # Compress
        compressed = gzip.compress(raw) if self.config.compress else raw

        # Encrypt with the tenant-specific KMS key. Note: KMS Encrypt accepts
        # at most 4 KB of plaintext, so for real checkpoint payloads use
        # envelope encryption instead: call GenerateDataKey, encrypt locally
        # with AES-GCM, and store the wrapped data key alongside the payload.
        # The direct call is shown here for brevity.
        if self.config.encrypt_at_rest:
            encrypted_response = self.kms.encrypt(
                KeyId=self.config.encryption_key_id,
                Plaintext=compressed
            )
            payload = encrypted_response["CiphertextBlob"]
        else:
            payload = compressed

        # Write to hot tier (Redis) with 24-hour TTL
        hot_key = self._hot_key(checkpoint.tenant_id, checkpoint.session_id)
        await self.redis.set(hot_key, payload, ex=86400)

        # Append to history sorted set (score = Unix timestamp)
        history_key = self._history_key(checkpoint.tenant_id, checkpoint.session_id)
        score = checkpoint.last_updated_at.timestamp()
        await self.redis.zadd(history_key, {checkpoint.checkpoint_id: score})
        await self.redis.expire(history_key, 86400)

        # Write to cold tier (S3). boto3 is synchronous, so in production
        # offload this call with asyncio.to_thread or a background task so
        # it doesn't block the event loop
        cold_path = self._cold_path(
            checkpoint.tenant_id,
            checkpoint.session_id,
            checkpoint.checkpoint_id
        )
        self.s3.put_object(
            Bucket=self.config.object_storage_bucket,
            Key=cold_path,
            Body=payload,
            ContentType="application/octet-stream",
            Metadata={
                "tenant-id": checkpoint.tenant_id,
                "session-id": checkpoint.session_id,
                "schema-version": checkpoint.schema_version,
            }
        )

        return checkpoint.checkpoint_id

    async def load(self, tenant_id: str, session_id: str) -> Optional[AgentCheckpoint]:
        hot_key = self._hot_key(tenant_id, session_id)
        payload = await self.redis.get(hot_key)

        if payload is None:
            # Fall back to cold tier (synchronous boto3 call; consider
            # wrapping it in asyncio.to_thread in production)
            payload = self._load_from_cold(tenant_id, session_id)

        if payload is None:
            return None

        # Decrypt
        if self.config.encrypt_at_rest:
            decrypted = self.kms.decrypt(CiphertextBlob=payload)["Plaintext"]
        else:
            decrypted = payload

        # Decompress
        raw = gzip.decompress(decrypted) if self.config.compress else decrypted

        return AgentCheckpoint.model_validate_json(raw)

    def _load_from_cold(self, tenant_id: str, session_id: str) -> Optional[bytes]:
        # List objects with the session prefix and return the latest
        prefix = f"{self.config.object_storage_prefix}/{tenant_id}/{session_id}/"
        response = self.s3.list_objects_v2(
            Bucket=self.config.object_storage_bucket,
            Prefix=prefix
        )
        objects = response.get("Contents", [])
        if not objects:
            return None
        latest = max(objects, key=lambda o: o["LastModified"])
        obj = self.s3.get_object(
            Bucket=self.config.object_storage_bucket,
            Key=latest["Key"]
        )
        return obj["Body"].read()

Step 4: Instrument the Agent Runtime with Checkpoint Hooks

Now you need to weave checkpointing into the agent's execution loop. The key insight is: checkpoint after every completed step, not just at the end. For a long-running agent, "after every step" means after each LLM inference call and after each tool call resolves.


class CheckpointedAgentRunner:
    def __init__(
        self,
        llm_client,
        tools: dict,
        checkpoint_manager: CheckpointManager,
        checkpoint_interval_steps: int = 1,  # Checkpoint every N steps
    ):
        self.llm = llm_client
        self.tools = tools
        self.cp_manager = checkpoint_manager
        self.checkpoint_interval = checkpoint_interval_steps

    async def run(
        self,
        tenant_id: str,
        session_id: str,
        task: str,
        resume: bool = False,
    ) -> str:
        # Load existing checkpoint or create a fresh one
        if resume:
            checkpoint = await self.cp_manager.load(tenant_id, session_id)
            if checkpoint is None:
                raise ValueError(f"No checkpoint found for session {session_id}")
            print(f"Resuming session {session_id} from step {checkpoint.step_count}")
        else:
            checkpoint = AgentCheckpoint(
                session_id=session_id,
                tenant_id=tenant_id,
                task_manifest=TaskManifest(original_task=task),
                message_history=[
                    Message(role=MessageRole.SYSTEM, content=self._system_prompt()),
                    Message(role=MessageRole.USER, content=task),
                ]
            )

        # Main agent loop
        while True:
            # Call the LLM
            response = await self.llm.chat(
                messages=[m.model_dump() for m in checkpoint.message_history],
                tools=list(self.tools.keys()),
            )

            assistant_message = Message(
                role=MessageRole.ASSISTANT,
                content=response.content or "",
            )
            checkpoint.message_history.append(assistant_message)
            checkpoint.total_tokens_used += response.usage.total_tokens
            checkpoint.step_count += 1

            # Check for task completion
            if response.finish_reason == "stop" and not response.tool_calls:
                checkpoint.status = "completed"
                await self.cp_manager.save(checkpoint)
                return response.content

            # Process tool calls
            if response.tool_calls:
                for tool_call in response.tool_calls:
                    result = await self._execute_tool_with_ledger(
                        tool_call, checkpoint
                    )
                    checkpoint.message_history.append(
                        Message(
                            role=MessageRole.TOOL,
                            content=str(result),
                            tool_call_id=tool_call.id,
                            tool_name=tool_call.name,
                        )
                    )

            # Checkpoint at defined intervals
            if checkpoint.step_count % self.checkpoint_interval == 0:
                await self.cp_manager.save(checkpoint)

    async def _execute_tool_with_ledger(self, tool_call, checkpoint: AgentCheckpoint):
        # Check if this tool call already completed (idempotency on resume)
        existing = next(
            (r for r in checkpoint.tool_ledger if r.call_id == tool_call.id),
            None
        )
        if existing and existing.completed:
            print(f"Skipping already-completed tool call: {tool_call.id}")
            return existing.output

        # Record the call as in-progress, reusing an existing incomplete
        # record on resume so the ledger doesn't accumulate duplicates
        if existing:
            record = existing
        else:
            record = ToolCallRecord(
                call_id=tool_call.id,
                tool_name=tool_call.name,
                inputs=tool_call.arguments,
                had_side_effects=self.tools[tool_call.name].has_side_effects,
            )
            checkpoint.tool_ledger.append(record)

        # Execute the tool
        tool_fn = self.tools[tool_call.name].fn
        output = await tool_fn(**tool_call.arguments)

        # Mark as completed
        record.output = output
        record.completed = True

        return output

    def _system_prompt(self) -> str:
        return (
            "You are a helpful AI assistant. Complete the user's task step by step, "
            "using the available tools as needed. Be thorough and precise."
        )

The _execute_tool_with_ledger method is where the idempotency magic happens. On resume, if a tool call was already completed and recorded in the ledger, the agent skips re-execution and returns the cached output. This prevents duplicate side effects like sending the same email twice or inserting the same database row twice.

Step 5: Build the Resume Coordinator

Checkpoints are useless if nothing detects orphaned sessions and triggers a resume. The Resume Coordinator is a background service that watches for sessions that were marked in_progress but have stopped receiving heartbeats.


import asyncio
import json
from datetime import datetime, timezone

class ResumeCoordinator:
    def __init__(
        self,
        checkpoint_manager: CheckpointManager,
        agent_runner: CheckpointedAgentRunner,
        heartbeat_timeout_seconds: int = 60,
        poll_interval_seconds: int = 15,
    ):
        self.cp_manager = checkpoint_manager
        self.runner = agent_runner
        self.heartbeat_timeout = heartbeat_timeout_seconds
        self.poll_interval = poll_interval_seconds

    async def start(self):
        print("Resume Coordinator started.")
        while True:
            await self._scan_and_resume()
            await asyncio.sleep(self.poll_interval)

    async def _scan_and_resume(self):
        # In production, maintain an index of active sessions in Redis
        # Key: active_sessions (sorted set, score = last heartbeat timestamp)
        now = datetime.now(timezone.utc).timestamp()
        cutoff = now - self.heartbeat_timeout

        # Find sessions whose last heartbeat is older than the timeout
        orphaned = await self.cp_manager.redis.zrangebyscore(
            "active_sessions", 0, cutoff
        )

        for session_bytes in orphaned:
            session_info = json.loads(session_bytes)
            tenant_id = session_info["tenant_id"]
            session_id = session_info["session_id"]

            print(f"Detected orphaned session: {session_id} for tenant: {tenant_id}")

            # Claim the session atomically to prevent duplicate resumption
            claimed = await self._claim_session(session_id)
            if not claimed:
                continue

            # Resume in a background task
            asyncio.create_task(
                self._resume_session(tenant_id, session_id)
            )

    async def _claim_session(self, session_id: str) -> bool:
        # Use a Redis SET NX (set if not exists) lock to prevent race conditions
        # between multiple coordinator instances
        lock_key = f"resume_lock:{session_id}"
        result = await self.cp_manager.redis.set(
            lock_key, "1", nx=True, ex=120  # 2-minute lock
        )
        return result is not None

    async def _resume_session(self, tenant_id: str, session_id: str):
        try:
            checkpoint = await self.cp_manager.load(tenant_id, session_id)
            if checkpoint is None or checkpoint.status != "in_progress":
                return

            print(f"Resuming session {session_id} from step {checkpoint.step_count}")
            await self.runner.run(
                tenant_id=tenant_id,
                session_id=session_id,
                task=checkpoint.task_manifest.original_task,
                resume=True,
            )
        except Exception as e:
            print(f"Failed to resume session {session_id}: {e}")
            # Mark session as failed and notify tenant via webhook/event
            await self._mark_failed(tenant_id, session_id, str(e))

    async def _mark_failed(self, tenant_id: str, session_id: str, reason: str):
        checkpoint = await self.cp_manager.load(tenant_id, session_id)
        if checkpoint:
            checkpoint.status = "failed"
            checkpoint.scratchpad["failure_reason"] = reason
            await self.cp_manager.save(checkpoint)

The _claim_session method uses a Redis SET NX atomic operation as a distributed lock. In a horizontally scaled deployment where multiple Resume Coordinator instances run in parallel, this prevents two workers from simultaneously trying to resume the same orphaned session, which would cause chaos.
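The claim semantics can be illustrated with a stand-in sketch: here `dict.setdefault` plays the role of Redis SET NX, and the `claims` dict and `worker_id` names are illustrative, not part of the system above.

```python
claims: dict[str, str] = {}  # stand-in for Redis; key -> owning worker

def claim_session(session_id: str, worker_id: str) -> bool:
    # Atomic "set if not exists": only the first worker to claim wins.
    # In production this is SET resume_lock:{session_id} <id> NX EX <ttl>.
    return claims.setdefault(f"resume_lock:{session_id}", worker_id) == worker_id

first = claim_session("sess-1", "worker-a")   # claim succeeds
second = claim_session("sess-1", "worker-b")  # already claimed, fails
```

A naive "check, then set" sequence would let both workers pass the check before either writes, which is exactly the race the atomic operation closes.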

Step 6: Implement Per-Tenant Heartbeating

The Resume Coordinator needs to know which sessions are alive. Integrate a heartbeat signal directly into the agent runtime loop:


class HeartbeatEmitter:
    def __init__(self, redis_client, interval_seconds: int = 10):
        self.redis = redis_client
        self.interval = interval_seconds
        self._running = False

    async def start(self, tenant_id: str, session_id: str):
        self._running = True
        session_info = json.dumps({
            "tenant_id": tenant_id,
            "session_id": session_id,
        })
        while self._running:
            now = datetime.now(timezone.utc).timestamp()
            # Update the score in the active_sessions sorted set
            await self.redis.zadd("active_sessions", {session_info: now})
            await asyncio.sleep(self.interval)

    def stop(self):
        self._running = False

    async def deregister(self, tenant_id: str, session_id: str):
        session_info = json.dumps({
            "tenant_id": tenant_id,
            "session_id": session_id,
        })
        await self.redis.zrem("active_sessions", session_info)

Run the heartbeat emitter as a concurrent asyncio task alongside the agent loop. When the agent completes or explicitly fails, call deregister to remove it from the active sessions index so the Resume Coordinator doesn't try to revive a session that finished cleanly.
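The wiring can be sketched with a generic asyncio pattern. The in-memory `heartbeats` dict and the sleeps are illustrative stand-ins for the Redis sorted set and the real agent loop.

```python
import asyncio
import time

# Stand-in for the active_sessions sorted set: session key -> last beat
heartbeats: dict[str, float] = {}

async def emit_heartbeats(session_key: str, interval: float) -> None:
    # Runs until cancelled, refreshing the session's heartbeat timestamp
    while True:
        heartbeats[session_key] = time.monotonic()
        await asyncio.sleep(interval)

async def run_with_heartbeat(session_key: str) -> str:
    beat = asyncio.create_task(emit_heartbeats(session_key, interval=0.01))
    try:
        await asyncio.sleep(0.05)  # stand-in for the agent loop
        return "done"
    finally:
        # Stop the emitter and deregister so the Resume Coordinator
        # never tries to revive a session that finished cleanly
        beat.cancel()
        heartbeats.pop(session_key, None)

result = asyncio.run(run_with_heartbeat("tenant-a:session-1"))
```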

Step 7: Handle Schema Migration for Long-Lived Checkpoints

As your platform evolves, your checkpoint schema will change. A checkpoint written with schema version 1.0 might not be valid under version 1.2. Build a migration layer from the start:


class CheckpointMigrator:
    CURRENT_VERSION = "1.2"

    @classmethod
    def migrate(cls, data: dict) -> dict:
        version = data.get("schema_version", "1.0")

        if version == "1.0":
            data = cls._migrate_1_0_to_1_1(data)
            version = "1.1"

        if version == "1.1":
            data = cls._migrate_1_1_to_1_2(data)
            version = "1.2"

        data["schema_version"] = cls.CURRENT_VERSION
        return data

    @staticmethod
    def _migrate_1_0_to_1_1(data: dict) -> dict:
        # Example: 1.1 added 'had_side_effects' to ToolCallRecord
        for record in data.get("tool_ledger", []):
            if "had_side_effects" not in record:
                record["had_side_effects"] = False
        return data

    @staticmethod
    def _migrate_1_1_to_1_2(data: dict) -> dict:
        # Example: 1.2 added 'completed_goals' to TaskManifest
        manifest = data.get("task_manifest", {})
        if "completed_goals" not in manifest:
            manifest["completed_goals"] = []
        return data

Call CheckpointMigrator.migrate() on the raw dict before passing it to AgentCheckpoint.model_validate() in your load path. This ensures that no matter how old a checkpoint is, it can always be safely loaded and resumed.
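As a toy illustration of that ordering, migrate the raw dict first, then validate. The `ToyCheckpoint` dataclass and single-step `migrate` here are simplified stand-ins for the real classes above.

```python
from dataclasses import dataclass, field

@dataclass
class ToyCheckpoint:
    schema_version: str
    completed_goals: list = field(default_factory=list)

def migrate(data: dict) -> dict:
    # v1.0 checkpoints predate 'completed_goals'; backfill before validation
    if data.get("schema_version", "1.0") == "1.0":
        data.setdefault("completed_goals", [])
        data["schema_version"] = "1.1"
    return data

old = {"schema_version": "1.0"}           # as stored on disk
cp = ToyCheckpoint(**migrate(old))        # migrate, then construct/validate
```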

Step 8: Enforce Tenant Isolation and Access Control

Tenant isolation is not just a nice-to-have. In regulated industries (finance, healthcare, legal tech), it is a compliance requirement. Enforce it at every layer:

Redis ACL Rules

Create a dedicated Redis user per tenant with ACL rules that restrict access to only their keyspace prefix. In your Redis ACL configuration:


# Redis ACL entry for tenant "acme-corp"
ACL SETUSER tenant-acme-corp on >their-password ~tenants:acme-corp:* +get +set +del +zadd +zrangebyscore +expire

S3 Bucket Policies

Use IAM policies or S3 bucket policies with a condition on the object key prefix to ensure that the service role for Tenant A's agent workers can only access checkpoints/tenant-a/*:


{
  "Effect": "Allow",
  "Action": ["s3:GetObject", "s3:PutObject"],
  "Resource": "arn:aws:s3:::your-checkpoint-bucket/checkpoints/acme-corp/*"
}

Application-Layer Validation

Never trust the tenant_id embedded in a checkpoint blindly. Always validate it against the authenticated tenant context of the request before loading or saving:


async def load_checkpoint_for_tenant(
    tenant_id: str,
    session_id: str,
    authenticated_tenant_id: str,
    cp_manager: CheckpointManager,
) -> Optional[AgentCheckpoint]:
    if tenant_id != authenticated_tenant_id:
        raise PermissionError(
            f"Tenant {authenticated_tenant_id} cannot access session "
            f"belonging to tenant {tenant_id}"
        )
    return await cp_manager.load(tenant_id, session_id)

Step 9: Expose a Checkpoint Status API

Your tenants deserve visibility into their in-flight workflows. Build a simple status API that surfaces checkpoint data without exposing raw internals:


from datetime import datetime
from typing import Optional

from fastapi import FastAPI, Depends, HTTPException
from pydantic import BaseModel

app = FastAPI()

class SessionStatusResponse(BaseModel):
    session_id: str
    status: str
    step_count: int
    total_tokens_used: int
    current_goal: Optional[str]
    last_updated_at: datetime

# get_authenticated_tenant_id and get_checkpoint_manager are assumed to be
# application-provided FastAPI dependencies (auth context and a shared
# CheckpointManager instance)
@app.get("/sessions/{session_id}/status", response_model=SessionStatusResponse)
async def get_session_status(
    session_id: str,
    tenant_id: str = Depends(get_authenticated_tenant_id),
    cp_manager: CheckpointManager = Depends(get_checkpoint_manager),
):
    checkpoint = await load_checkpoint_for_tenant(
        tenant_id, session_id, tenant_id, cp_manager
    )
    if checkpoint is None:
        raise HTTPException(status_code=404, detail="Session not found")

    manifest = checkpoint.task_manifest
    current_goal = (
        manifest.sub_goals[manifest.current_goal_index]
        if manifest.sub_goals and manifest.current_goal_index < len(manifest.sub_goals)
        else manifest.original_task
    )

    return SessionStatusResponse(
        session_id=checkpoint.session_id,
        status=checkpoint.status,
        step_count=checkpoint.step_count,
        total_tokens_used=checkpoint.total_tokens_used,
        current_goal=current_goal,
        last_updated_at=checkpoint.last_updated_at,
    )

Operational Considerations and Pitfalls to Avoid

Building the system is one challenge. Running it reliably in production is another. Here are the lessons that usually come from painful experience:

Checkpoint Size Creep

Long-running agents accumulate enormous message histories. A 50-step agent with tool outputs can easily produce a checkpoint that is several megabytes. Compress aggressively (gzip is fine, zstd is better), and consider truncating the middle of the message history while always preserving the system prompt, the first user message, and the most recent N messages. This is called a "sliding window" context strategy and it keeps checkpoint sizes bounded.
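A minimal sketch of that truncation follows; the message shape and the keep counts are illustrative, not the schema above.

```python
def sliding_window(messages: list[dict], keep_recent: int = 20) -> list[dict]:
    # Always preserve the system prompt and the first user message;
    # keep only the most recent N messages from the rest
    if len(messages) <= keep_recent + 2:
        return messages
    head = messages[:2]             # system prompt + original task
    tail = messages[-keep_recent:]  # most recent context
    dropped = len(messages) - 2 - keep_recent
    marker = {"role": "system",
              "content": f"[{dropped} earlier messages truncated]"}
    return head + [marker] + tail

history = [{"role": "system", "content": "sys"},
           {"role": "user", "content": "task"}] + [
    {"role": "assistant", "content": f"step {i}"} for i in range(50)
]
window = sliding_window(history, keep_recent=10)
```

The truncation marker keeps the agent aware that context was elided, which tends to reduce confused re-planning on resume.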

The Thundering Herd on Resume

If your infrastructure has a major outage and hundreds of sessions become orphaned simultaneously, your Resume Coordinator will try to restart all of them at once. This can overwhelm your LLM provider's rate limits and your own backend. Add a jittered exponential backoff and a concurrency cap to the resume queue.
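One reasonable shape for this is "full jitter" backoff plus a semaphore cap; the sketch below uses stand-in delays and a `resumed` list in place of the real resume call.

```python
import asyncio
import random

def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0) -> float:
    # "Full jitter": uniform in [0, min(cap, base * 2^attempt)] so a wave
    # of orphaned sessions spreads out instead of retrying in lockstep
    return random.uniform(0, min(cap, base * (2 ** attempt)))

async def resume_all(session_ids: list[str], max_concurrent: int = 4) -> list[str]:
    # The semaphore caps simultaneous resume attempts so a mass outage
    # doesn't stampede the LLM provider or your own backend
    sem = asyncio.Semaphore(max_concurrent)
    resumed: list[str] = []

    async def resume_one(session_id: str) -> None:
        async with sem:
            await asyncio.sleep(backoff_delay(0, base=0.001, cap=0.01))
            resumed.append(session_id)  # stand-in for the real resume call

    await asyncio.gather(*(resume_one(s) for s in session_ids))
    return resumed

sessions = [f"session-{i}" for i in range(20)]
done = asyncio.run(resume_all(sessions))
```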

Tool Idempotency Is Your Responsibility

The tool ledger only prevents re-execution of already-completed calls. But what if a tool call was in-flight when the failure occurred? It may have partially executed. Tag tools as has_side_effects=True and build those tools to be idempotent by design (using request IDs, database upserts, or conditional API calls) so that re-executing an incomplete call is safe.
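A sketch of that design, with an in-memory dict standing in for a database table with a unique key on the request ID (the `send_email` tool and its names are illustrative):

```python
sent_emails: dict[str, dict] = {}  # stand-in for a table keyed on request ID

def send_email(request_id: str, to: str, body: str) -> dict:
    # Idempotent by design: the request ID makes re-execution a no-op,
    # so replaying a call that was in-flight during a crash is safe
    if request_id in sent_emails:
        return sent_emails[request_id]  # already sent; return prior result
    result = {"status": "sent", "to": to}  # stand-in for the real send
    sent_emails[request_id] = result
    return result

first = send_email("req-123", "ops@example.com", "report ready")
second = send_email("req-123", "ops@example.com", "report ready")  # replay
```

The same pattern applies to database writes (upserts) and third-party APIs that accept an idempotency key.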

Clock Skew in Distributed Environments

The heartbeat system relies on timestamps. In a distributed environment, different nodes can have clock skew of hundreds of milliseconds. Use logical clocks or monotonic sequence numbers instead of wall-clock timestamps for ordering checkpoints, and add a generous buffer to your heartbeat timeout to avoid false-positive orphan detection.
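Ordering by sequence number can be sketched as follows; the in-process counter is an illustrative stand-in for an atomic server-side counter such as Redis INCR.

```python
from collections import defaultdict
from itertools import count

# Stand-in for an atomic per-session counter (Redis INCR in production)
_counters: defaultdict = defaultdict(lambda: count(1))

def next_checkpoint_seq(session_id: str) -> int:
    # Strictly increasing per session, unaffected by clock skew between
    # the nodes that write checkpoints
    return next(_counters[session_id])

seqs = [next_checkpoint_seq("session-a") for _ in range(3)]
other = next_checkpoint_seq("session-b")  # independent per-session ordering
```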

Putting It All Together: The Deployment Checklist

Before going to production with this system, run through this checklist:

  • Schema versioning: Every checkpoint has a schema version field and a migration path.
  • Per-tenant encryption: Each tenant's data is encrypted with a unique KMS key.
  • Redis ACLs: Keyspace access is restricted per tenant at the Redis layer.
  • S3 bucket policies: Object storage access is restricted by tenant prefix.
  • Application-layer validation: Tenant ID is verified against the authenticated session on every load and save.
  • Tool ledger idempotency: All tools with side effects are marked and designed to be safely re-executable.
  • Distributed resume lock: The Resume Coordinator uses atomic Redis locks to prevent duplicate resumption.
  • Heartbeat deregistration: Completed and failed sessions are removed from the active sessions index.
  • Checkpoint size monitoring: Alert when any single checkpoint exceeds a size threshold (e.g., 2MB).
  • Resume concurrency cap: The coordinator limits simultaneous resume attempts to avoid thundering-herd scenarios.
  • Status API: Tenants can query session status without accessing raw checkpoint data.
  • Cold-tier fallback: If Redis is unavailable, the system can load from object storage.

Conclusion

Building a per-tenant checkpoint-and-resume system is not a small project, but it is an essential investment for any platform that runs long-lived agentic workflows in production. As AI agents take on increasingly complex, multi-hour tasks in 2026, the difference between a platform that survives infrastructure failures gracefully and one that doesn't is the difference between a product customers trust and one they churn from.

The architecture described here, combining structured state serialization, two-tier durable storage, idempotent tool execution, distributed resume coordination, and strict per-tenant isolation, gives you a foundation that is both resilient and secure. It treats the agent's mid-task state as a first-class artifact, not an afterthought.

Start with Step 1 and Step 2 even if you can't build everything at once. A checkpoint you can't resume is still better than no checkpoint at all. Instrument your agent loop, get data into durable storage, and build the Resume Coordinator as your second milestone. From there, the operational improvements are incremental.

Your tenants are trusting your platform with their most important automated workflows. Give their agents a memory worth keeping.