Building AI Agent Workflows with LangGraph and Django

Let's be honest: a single LLM call is impressive. But a production AI workflow — one that remembers context, retries failed steps, persists intermediate state, and exposes a clean API — is a whole different beast.

This guide is the capstone to two articles I've written previously.

In this guide, we marry those two worlds: LangGraph handles the agent brain (state, routing, tool execution) and Django handles the spine (persistence, API layer, background job dispatch).

What you'll learn:

  • How LangGraph's StateGraph models an agent as a directed graph of nodes and edges.
  • How to persist LangGraph checkpoint state in a Django ORM model.
  • How to trigger agent runs safely as background tasks using Django Q.
  • How to expose streaming or polling endpoints for agent results.

1. Why Orchestration Matters Beyond a Single LLM Call

A vanilla LLM call is stateless. You send a prompt, you receive a response, and everything evaporates. For a content generator or a chatbot, this is fine.

But real-world agent workflows have multi-step, conditional logic:

  • Step A must complete before Step B can start.
  • If Step B fails, retry or route to a fallback.
  • Steps C and D can run in parallel.
  • The whole workflow might run for 5 minutes — long after the HTTP request has timed out.

This is exactly the problem that LangGraph solves at the orchestration layer, and exactly why you need Django's background task infrastructure (not just a single endpoint) to back it up.


2. What is LangGraph?

LangGraph is a graph-based orchestration library built on top of LangChain. Instead of a linear chain of calls, you model your agent as a directed graph where:

  • Nodes are units of work: an LLM call, a tool execution, a validation check.
  • Edges are transitions: unconditional (always go to node B after A) or conditional (route to B or C based on the output of A).
  • State is a shared Python TypedDict (or Pydantic model) that flows through every node and accumulates results.

The key insight is that LangGraph's state is serialisable — it's just a Python dict. This makes it trivially persistable in a Django model.

# agents/state.py
from typing import TypedDict, Annotated, List
from langgraph.graph import StateGraph, END
from langchain_core.messages import BaseMessage
import operator

class AgentState(TypedDict):
    messages: Annotated[List[BaseMessage], operator.add]
    current_step: str
    result: str | None
    error: str | None
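The Annotated[..., operator.add] reducer is what makes the messages channel append-only: when a node returns {"messages": [new_msg]}, LangGraph combines that value with the existing list via the reducer instead of overwriting it. A minimal sketch of that merge rule, using plain strings to stand in for message objects (merge_channel is an illustrative helper, not a LangGraph API):

```python
import operator

def merge_channel(old, update, reducer=operator.add):
    """Mimic LangGraph's reducer semantics for a single state key:
    the new channel value is reducer(old_value, returned_value)."""
    return reducer(old, update)

history = ["user: find me papers on RAG"]
node_output = ["ai: [Search results for: find me papers on RAG]"]

history = merge_channel(history, node_output)
# the channel now holds both entries; nothing was overwritten
print(history)
```

Keys without a reducer (like current_step above) are simply replaced by the node's returned value.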

3. The Architecture

Here is the high-level separation of concerns:

┌─────────────────────────────────────────────┐
│              Django (Spine)                 │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  │
│  │  DRF API │  │ Django Q │  │   ORM    │  │
│  │ Endpoint │→ │ Worker   │→ │Checkpoint│  │
│  └──────────┘  └────┬─────┘  └──────────┘  │
│                     │                       │
│          ┌──────────▼──────────┐            │
│          │   LangGraph (Brain) │            │
│          │  StateGraph + Tools │            │
│          └─────────────────────┘            │
└─────────────────────────────────────────────┘

Django doesn't know about LangGraph internals — it simply stores the serialised checkpoint blob and surfaces the result via API. LangGraph doesn't know about Django — it receives a configuration dict and writes state to whatever CheckpointSaver you plug in.


4. Building the Graph

Here's a minimal two-node graph: a research node followed by a summarise node with a conditional guard.

# agents/graph.py
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage
from .state import AgentState

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

def research_node(state: AgentState) -> AgentState:
    """Calls a search tool and appends results to messages."""
    query = state["messages"][-1].content
    # TODO: replace with your real tool call
    results = f"[Search results for: {query}]"
    return {
        "messages": [AIMessage(content=results)],
        "current_step": "summarise",
    }

def summarise_node(state: AgentState) -> AgentState:
    """Summarises the accumulated messages into a final result."""
    response = llm.invoke(state["messages"])
    return {
        "messages": [response],
        "result": response.content,
        "current_step": "done",
    }

def should_summarise(state: AgentState) -> str:
    """Conditional edge: only summarise if we have research results."""
    if state.get("error"):
        return END
    return "summarise"

def build_graph():
    """Builds and compiles the graph (compile() returns a runnable graph, not a StateGraph)."""
    graph = StateGraph(AgentState)
    graph.add_node("research", research_node)
    graph.add_node("summarise", summarise_node)
    graph.set_entry_point("research")
    graph.add_conditional_edges("research", should_summarise, {
        "summarise": "summarise",
        END: END,
    })
    graph.add_edge("summarise", END)
    return graph.compile()

5. Persisting State in Django

LangGraph ships a pluggable checkpointer interface, and a fully LangGraph-native Django saver is on the roadmap (see the end of this post). For now we take a simpler route: store the serialised state on a Django model, written from the background task.

# agents/models.py
import uuid
from django.db import models

class AgentRun(models.Model):
    """Tracks a single LangGraph workflow execution."""
    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    thread_id = models.CharField(max_length=255, unique=True, db_index=True)
    status = models.CharField(
        max_length=20,
        choices=[
            ('pending', 'Pending'),
            ('running', 'Running'),
            ('completed', 'Completed'),
            ('failed', 'Failed'),
        ],
        default='pending',
    )
    checkpoint = models.JSONField(null=True, blank=True)  # serialised AgentState
    result = models.TextField(null=True, blank=True)
    error = models.TextField(null=True, blank=True)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        ordering = ['-created_at']

The background task executes the graph and persists the final state (and any error) when it finishes:

# agents/tasks.py
from langchain_core.messages import HumanMessage, messages_to_dict
from .models import AgentRun
from .graph import build_graph

def run_agent_workflow(run_id: str, user_query: str):
    """Background task: execute the LangGraph workflow and persist state."""
    run = AgentRun.objects.get(id=run_id)
    run.status = 'running'
    run.save(update_fields=['status'])

    try:
        graph = build_graph()
        final_state = graph.invoke({
            "messages": [HumanMessage(content=user_query)],
            "current_step": "research",
            "result": None,
            "error": None,
        })
        # BaseMessage objects aren't JSON-serialisable; convert them
        # before writing to the JSONField
        run.checkpoint = {
            **final_state,
            "messages": messages_to_dict(final_state["messages"]),
        }
        run.result = final_state.get("result") or ""
        run.status = 'completed'
    except Exception as exc:
        run.status = 'failed'
        run.error = str(exc)
    finally:
        run.save(update_fields=['status', 'checkpoint', 'result', 'error', 'updated_at'])

6. Triggering Workflows as Background Tasks

As covered in the Django Background Tasks guide, you must use transaction.on_commit to avoid the race condition where the worker picks up the task before the DB row is committed.

# agents/views.py
import uuid

from django.db import transaction
from django_q.tasks import async_task
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status
from .models import AgentRun

class AgentRunView(APIView):
    def post(self, request):
        query = request.data.get("query")
        if not query:
            return Response({"error": "query is required"}, status=status.HTTP_400_BAD_REQUEST)

        # count() is race-prone for a unique field; a UUID suffix guarantees uniqueness
        run = AgentRun.objects.create(
            thread_id=f"user-{request.user.id}-{uuid.uuid4().hex}",
        )

        # Safe: fires ONLY after the DB transaction commits (see Background Tasks post)
        transaction.on_commit(
            lambda: async_task(
                'agents.tasks.run_agent_workflow',
                str(run.id),
                query,
                q_options={'queue': 'ai_agents'},
            )
        )

        return Response({"run_id": str(run.id), "status": "pending"}, status=status.HTTP_202_ACCEPTED)

Recommended queue configuration for agent runs (add to settings.py):

# settings.py
Q_CLUSTER = {  # django-q2 reads a single Q_CLUSTER dict
    'name': 'ai_agents',
    'workers': 4,
    'timeout': 300,    # 5-minute ceiling for a single agent workflow
    'retry': 360,      # must exceed timeout, or running tasks get re-queued
    'poll': 1.0,
    'orm': 'default',  # use the Django ORM as the broker
}
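With that in place, the API process only enqueues work; a separate worker process, started with Django Q's management command, is what actually executes run_agent_workflow:

```shell
# Run alongside your web server; reads Q_CLUSTER from settings.py
python manage.py qcluster
```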

7. Exposing Results via a Polling API

Since agent workflows are long-running (seconds to minutes), a simple polling endpoint is the most robust approach. SSE streaming is explored in the next section.

# agents/views.py (continued)
from django.shortcuts import get_object_or_404

class AgentRunStatusView(APIView):
    def get(self, request, run_id):
        run = get_object_or_404(AgentRun, id=run_id)
        return Response({
            "run_id": str(run.id),
            "status": run.status,
            "result": run.result if run.status == 'completed' else None,
            "error": run.error if run.status == 'failed' else None,
        })

The frontend polls GET /api/agents/runs/{run_id}/ every 2 seconds until status is completed or failed.
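The same loop is easy to sketch server-to-server in Python. Here poll_until_done and fetch_status are illustrative names, not part of the codebase above — fetch_status stands in for whatever HTTP call you use (e.g. requests.get against the status endpoint), and injecting the sleep function keeps the helper testable:

```python
import time

def poll_until_done(fetch_status, interval=2.0, max_wait=300.0, sleep=time.sleep):
    """Poll fetch_status() until the run reaches a terminal state.

    fetch_status: callable returning a dict shaped like the API payload,
    e.g. {"status": "running", "result": None, "error": None}.
    Raises TimeoutError if the run doesn't finish within max_wait seconds.
    """
    waited = 0.0
    while waited <= max_wait:
        payload = fetch_status()
        if payload["status"] in ("completed", "failed"):
            return payload
        sleep(interval)
        waited += interval
    raise TimeoutError("agent run did not finish within max_wait")
```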


8. Error Handling & Retries

There are two distinct layers of retry semantics in this architecture:

Layer         Mechanism                                     Scope
Agent-level   LangGraph conditional edges + fallback node   LLM errors and tool failures inside the graph
Task-level    Django Q retry + max_attempts                 Worker crashes, network timeouts, OOM kills

The key principle: agent-level retries are cheap (just re-invoke a node); task-level retries are expensive (rerun the whole workflow). Design your agent graph to handle partial failures internally using conditional edges before relying on task-level retries.
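One cheap agent-level pattern is an attempt counter carried in the state and checked by the conditional edge. This is a sketch, not code from the graph above — attempts and MAX_ATTEMPTS are illustrative extensions of the earlier AgentState, and it assumes research_node increments the counter when it records an error:

```python
MAX_ATTEMPTS = 3  # illustrative cap on per-node retries

def route_after_research(state: dict) -> str:
    """Conditional edge: retry the research node on error, up to a cap."""
    if state.get("error") is None:
        return "summarise"        # happy path
    if state.get("attempts", 0) < MAX_ATTEMPTS:
        return "research"         # cheap retry: re-invoke a single node
    return "error_handler"        # give up and route to the fallback node
```

You would wire this in with add_conditional_edges("research", route_after_research, {...}) in place of should_summarise, mapping "error_handler" to a fallback node like the one below.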

def error_handler_node(state: AgentState) -> AgentState:
    """Catches errors from any node and marks the run as recoverable."""
    return {
        "error": f"Handled gracefully: {state.get('error')}",
        "current_step": "done",
    }

9. Related Reading


Coming soon: full implementation with streaming SSE responses and a LangGraph-native Django checkpoint saver backed by Postgres.