Building AI Agent Workflows with LangGraph and Django
Let's be honest: a single LLM call is impressive. But a production AI workflow — one that remembers context, retries failed steps, persists intermediate state, and exposes a clean API — is a whole different beast.
This is the capstone to two articles I've written previously:
- AI Agents: The Comprehensive Developer Guide — foundational concepts, architecture, and mental models for autonomous agents.
- Django Background Tasks: Escaping the Setup Nightmare — production-grade queue isolation and memory-safe long-running tasks.
In this guide, we marry those two worlds: LangGraph handles the agent brain (state, routing, tool execution) and Django handles the spine (persistence, API layer, background job dispatch).
What you'll learn:
- How LangGraph's StateGraph models an agent as a directed graph of nodes and edges.
- How to persist LangGraph checkpoint state in a Django ORM model.
- How to trigger agent runs safely as background tasks using Django Q.
- How to expose streaming or polling endpoints for agent results.
1. Why Orchestration Matters Beyond a Single LLM Call
A vanilla LLM call is stateless. You send a prompt, you receive a response, and everything evaporates. For a content generator or a chatbot, this is fine.
But real-world agent workflows have multi-step, conditional logic:
- Step A must complete before Step B can start.
- If Step B fails, retry or route to a fallback.
- Steps C and D can run in parallel.
- The whole workflow might run for 5 minutes — long after the HTTP request has timed out.
This is exactly the problem that LangGraph solves at the orchestration layer, and exactly why you need Django's background task infrastructure (not just a single endpoint) to back it up.
2. What is LangGraph?
LangGraph is a graph-based orchestration library built on top of LangChain. Instead of a linear chain of calls, you model your agent as a directed graph where:
- Nodes are units of work: an LLM call, a tool execution, a validation check.
- Edges are transitions: unconditional (always go to node B after A) or conditional (route to B or C based on the output of A).
- State is a shared Python TypedDict (or Pydantic model) that flows through every node and accumulates results.
The key insight is that LangGraph's state is serialisable — it's just a Python dict (the message objects inside it convert to plain dicts with a one-line langchain_core helper). This makes it straightforward to persist in a Django model.
# agents/state.py
from typing import TypedDict, Annotated, List
import operator

from langchain_core.messages import BaseMessage


class AgentState(TypedDict):
    messages: Annotated[List[BaseMessage], operator.add]
    current_step: str
    result: str | None
    error: str | None
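The Annotated reducer is what makes messages accumulate: each node's returned messages list is appended to the existing one, while plain keys like current_step are simply overwritten. Here is a dependency-free sketch of that merge behaviour — merge_state is our own illustrative helper mimicking what LangGraph does internally, not a LangGraph API:

```python
import operator

# Reducers per key: "messages" accumulates via operator.add,
# every other key is overwritten by the node's return value.
REDUCERS = {"messages": operator.add}

def merge_state(state: dict, update: dict) -> dict:
    """Apply a node's partial state update, honouring per-key reducers."""
    merged = dict(state)
    for key, value in update.items():
        reducer = REDUCERS.get(key)
        merged[key] = reducer(merged[key], value) if reducer else value
    return merged

state = {"messages": ["user: find X"], "current_step": "research"}
state = merge_state(state, {"messages": ["ai: results"], "current_step": "summarise"})
print(state["messages"])      # ['user: find X', 'ai: results']
print(state["current_step"])  # 'summarise'
```

This is why the nodes below can return only the keys they changed: LangGraph merges the partial update into the shared state for them.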
3. The Architecture
Here is the high-level separation of concerns:
┌─────────────────────────────────────────────┐
│ Django (Spine) │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ DRF API │ │ Django Q │ │ ORM │ │
│ │ Endpoint │→ │ Worker │→ │Checkpoint│ │
│ └──────────┘ └────┬─────┘ └──────────┘ │
│ │ │
│ ┌──────────▼──────────┐ │
│ │ LangGraph (Brain) │ │
│ │ StateGraph + Tools │ │
│ └─────────────────────┘ │
└─────────────────────────────────────────────┘
Django doesn't know about LangGraph internals — it simply stores the serialised checkpoint blob and surfaces the result via API. LangGraph doesn't know about Django — it receives a configuration dict and writes state to whatever CheckpointSaver you plug in.
4. Building the Graph
Here's a minimal two-node graph: a research node followed by a summarise node with a conditional guard.
# agents/graph.py
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from langchain_core.messages import AIMessage

from .state import AgentState

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)


def research_node(state: AgentState) -> AgentState:
    """Calls a search tool and appends results to messages."""
    query = state["messages"][-1].content
    # TODO: replace with your real tool call
    results = f"[Search results for: {query}]"
    return {
        "messages": [AIMessage(content=results)],
        "current_step": "summarise",
    }


def summarise_node(state: AgentState) -> AgentState:
    """Summarises the accumulated messages into a final result."""
    response = llm.invoke(state["messages"])
    return {
        "messages": [response],
        "result": response.content,
        "current_step": "done",
    }


def should_summarise(state: AgentState) -> str:
    """Conditional edge: only summarise if we have research results."""
    if state.get("error"):
        return END
    return "summarise"


def build_graph():
    # Note: compile() returns a runnable compiled graph, not a StateGraph.
    graph = StateGraph(AgentState)
    graph.add_node("research", research_node)
    graph.add_node("summarise", summarise_node)
    graph.set_entry_point("research")
    graph.add_conditional_edges("research", should_summarise, {
        "summarise": "summarise",
        END: END,
    })
    graph.add_edge("summarise", END)
    return graph.compile()
5. Persisting State in Django
LangGraph ships a pluggable checkpointer interface (BaseCheckpointSaver). A fully LangGraph-native, Postgres-backed saver is left for the follow-up post (see the closing note); for now we persist state snapshots in a plain Django ORM model, which gives us durable results and a queryable run history with no extra moving parts.
# agents/models.py
import uuid

from django.db import models


class AgentRun(models.Model):
    """Tracks a single LangGraph workflow execution."""

    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    thread_id = models.CharField(max_length=255, unique=True, db_index=True)
    status = models.CharField(
        max_length=20,
        choices=[
            ('pending', 'Pending'),
            ('running', 'Running'),
            ('completed', 'Completed'),
            ('failed', 'Failed'),
        ],
        default='pending',
    )
    checkpoint = models.JSONField(null=True, blank=True)  # serialised AgentState
    result = models.TextField(null=True, blank=True)
    error = models.TextField(null=True, blank=True)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        ordering = ['-created_at']
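The status field encodes a small lifecycle: pending → running → completed/failed, with the last two terminal. If you want to guard against illegal jumps (say, a duplicate task delivery resurrecting a completed run), a tiny validator is enough — can_transition here is a hypothetical helper, not part of the model above:

```python
# Allowed status transitions for an AgentRun; terminal states have no exits.
# can_transition is an illustrative guard, not part of the Django model.
TRANSITIONS = {
    "pending": {"running"},
    "running": {"completed", "failed"},
    "completed": set(),
    "failed": set(),
}

def can_transition(current: str, target: str) -> bool:
    """Return True if moving from `current` to `target` is a legal step."""
    return target in TRANSITIONS.get(current, set())

print(can_transition("pending", "running"))    # True
print(can_transition("completed", "running"))  # False
```

Calling this check before each status write makes duplicate or out-of-order task deliveries a no-op instead of a corrupted run record.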
The background task compiles the graph, runs it to completion, and persists the final state:
# agents/tasks.py
from langchain_core.messages import HumanMessage, messages_to_dict

from .models import AgentRun
from .graph import build_graph


def run_agent_workflow(run_id: str, user_query: str):
    """Background task: execute the LangGraph workflow and persist state."""
    run = AgentRun.objects.get(id=run_id)
    run.status = 'running'
    run.save(update_fields=['status', 'updated_at'])
    try:
        graph = build_graph()
        final_state = graph.invoke({
            # Use a real message object: the nodes read .content, and the
            # operator.add reducer concatenates lists without coercing dicts.
            "messages": [HumanMessage(content=user_query)],
            "current_step": "research",
            "result": None,
            "error": None,
        })
        # Message objects aren't JSON-serialisable; convert them before
        # writing to the JSONField.
        run.checkpoint = {
            **final_state,
            "messages": messages_to_dict(final_state["messages"]),
        }
        run.result = final_state.get("result") or ""
        run.status = 'completed'
    except Exception as exc:
        run.status = 'failed'
        run.error = str(exc)
    finally:
        run.save(update_fields=['status', 'checkpoint', 'result', 'error', 'updated_at'])
6. Triggering Workflows as Background Tasks
As covered in the Django Background Tasks guide, you must use transaction.on_commit to avoid the race condition where the worker picks up the task before the DB row is committed.
# agents/views.py
import uuid

from django.db import transaction
from django_q.tasks import async_task
from rest_framework import status
from rest_framework.response import Response
from rest_framework.views import APIView

from .models import AgentRun


class AgentRunView(APIView):
    def post(self, request):
        query = request.data.get("query")
        if not query:
            return Response({"error": "query is required"}, status=status.HTTP_400_BAD_REQUEST)
        # A count-based suffix is racy under concurrency and repeats once
        # rows are deleted; a UUID keeps thread_id unique with no extra query.
        run = AgentRun.objects.create(
            thread_id=f"user-{request.user.id}-{uuid.uuid4().hex}",
        )
        # Safe: fires ONLY after the DB transaction commits (see Background Tasks post)
        transaction.on_commit(
            lambda: async_task(
                'agents.tasks.run_agent_workflow',
                str(run.id),
                query,
                q_options={'queue': 'ai_agents'},
            )
        )
        return Response({"run_id": str(run.id), "status": "pending"}, status=status.HTTP_202_ACCEPTED)
Recommended queue configuration for agent runs, following the isolated-cluster pattern from the Background Tasks post (add to settings.py):
# settings.py
Q_CLUSTER_AGENTS = {
    'name': 'ai_agents',
    'workers': 4,
    'timeout': 300,  # 5-minute ceiling for a single agent workflow
    'retry': 360,    # must exceed timeout so unfinished runs aren't re-queued
    'poll': 1.0,
    'orm': 'default',
}
7. Exposing Results via a Polling API
Since agent workflows are long-running (seconds to minutes), a simple polling endpoint is the most robust starting point; SSE streaming is left for the follow-up post (see the closing note).
# agents/views.py (continued)
from django.shortcuts import get_object_or_404


class AgentRunStatusView(APIView):
    def get(self, request, run_id):
        run = get_object_or_404(AgentRun, id=run_id)
        return Response({
            "run_id": str(run.id),
            "status": run.status,
            "result": run.result if run.status == 'completed' else None,
            "error": run.error if run.status == 'failed' else None,
        })
The frontend polls GET /api/agents/runs/{run_id}/ every 2 seconds until status is completed or failed.
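A minimal client-side polling loop can be sketched as follows — fetch_status here is a stand-in for whatever HTTP client you use (e.g. a lambda wrapping requests.get(...).json()):

```python
import time

def poll_run(fetch_status, run_id: str, interval: float = 2.0, max_wait: float = 300.0) -> dict:
    """Poll until the run reaches a terminal status or max_wait elapses.

    fetch_status is a caller-supplied callable standing in for the HTTP
    request, e.g. lambda rid: requests.get(f"/api/agents/runs/{rid}/").json()
    """
    deadline = time.monotonic() + max_wait
    while time.monotonic() < deadline:
        payload = fetch_status(run_id)
        if payload["status"] in ("completed", "failed"):
            return payload
        time.sleep(interval)
    raise TimeoutError(f"run {run_id} did not finish within {max_wait}s")
```

Capping the wait at the worker's own timeout (300 s above) keeps the client from polling a run the queue has already killed.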
8. Error Handling & Retries
There are two distinct layers of retry semantics in this architecture:
| Layer | Mechanism | Scope |
|---|---|---|
| Agent-level | LangGraph conditional edges + fallback node | Handles LLM errors, tool failures inside the graph |
| Task-level | Django Q retry + max_attempts | Handles worker crashes, network timeouts, OOM kills |
The key principle: agent-level retries are cheap (just re-invoke a node); task-level retries are expensive (rerun the whole workflow). Design your agent graph to handle partial failures internally using conditional edges before relying on task-level retries.
def error_handler_node(state: AgentState) -> AgentState:
    """Catches errors from any node and marks the run as recoverable."""
    return {
        "error": f"Handled gracefully: {state.get('error')}",
        "current_step": "done",
    }
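To make the "cheap agent-level retry" concrete: a conditional router can give a flaky node a bounded number of in-graph attempts before handing off to the error handler. This sketch assumes an extra attempts counter in the state, incremented by the node itself — it is not part of the AgentState defined earlier:

```python
MAX_ATTEMPTS = 3  # assumed in-graph retry budget for the research node

def route_after_research(state: dict) -> str:
    """Conditional edge: retry on error up to MAX_ATTEMPTS, then fall back.

    Returns the name of the next node, LangGraph-style. Assumes the state
    carries an 'attempts' counter maintained by the node being retried.
    """
    if state.get("error") is None:
        return "summarise"
    if state.get("attempts", 0) < MAX_ATTEMPTS:
        return "research"        # cheap: re-invoke just this node
    return "error_handler"       # budget exhausted: route to the fallback

print(route_after_research({"error": None}))                      # summarise
print(route_after_research({"error": "timeout", "attempts": 1}))  # research
print(route_after_research({"error": "timeout", "attempts": 3}))  # error_handler
```

Because the decision lives inside the graph, a transient tool failure costs one extra node invocation instead of a full task-level rerun.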
9. Related Reading
- AI Agents: The Comprehensive Developer Guide — foundational concepts, the Perceive→Plan→Act loop, and multi-agent architectures.
- Django Background Tasks: Escaping the Setup Nightmare — queue isolation, idempotency, transaction.on_commit, and ORM vs Redis brokers.
- LangGraph documentation — official reference for StateGraph, ToolNode, and checkpoint savers.
Coming soon: full implementation with streaming SSE responses and a LangGraph-native Django checkpoint saver backed by Postgres.
