When building an image generation pipeline, the natural instinct is to generate one image per location and retry on failure. This approach has problems: Retries are sequential, feedback loops don’t guarantee improvement, and state management for retry tracking is complex. A better approach is to generate two candidates per location from genuinely different creative briefs, then use a vision LLM to pick the winner.
This article describes how we built this pattern in LangGraph using the Send API for dynamic fan-out, add reducers for accumulating results across retry rounds, and cross-strategy fallback for failed locations.
The Problem With Retry Loops
A standard image generation workflow generates one image per location, reviews it with a vision model, and retries with feedback if it fails:
```python
# Sequential retry loop — what we replaced
async def generate_with_retries(brief, max_retries: int = 2):
    for attempt in range(max_retries):
        image = await generate_image(brief)
        review = await review_with_vision(image)
        if review.passes:
            return image
        brief = incorporate_feedback(brief, review.feedback)
    # May still fail after all retries
    return None
```
This has several issues:
- Each retry waits for the previous to fail, inflating wall-clock time
- Retries with feedback don’t guarantee improvement; they sometimes oscillate
- State management for tracking retry counts, pending retries, and retry briefs adds complexity
- Failed generations provide no value to the final output
The Over-Generation Pattern
Instead of retrying, generate two candidates per location upfront from different briefs, then select the best:
```mermaid
graph TD
    A[Plan Briefs] -->|"~12 Sends"| B[generate_candidate]
    B --> C[sync_after_generation]
    C -->|"~6 Sends"| D[select_per_location]
    D --> E[sync_after_selection]
    E -->|failed locations| F{retry?}
    F -->|"cross-strategy"| B
    F -->|"all done"| G[finalize]
```
For a document with six image locations, the planning phase writes two CandidateBrief objects per location—one literal, one metaphorical. The graph fans out 12 parallel generation tasks, syncs, fans out six parallel selection tasks (one per location), syncs again, and optionally retries failed locations with a different image source type.
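The brief schema isn't spelled out in full in this article. As a rough sketch, each CandidateBrief might carry fields like these; location_id, candidate_index, and image_type appear in the code below, while the remaining field names are assumptions:

```python
from typing import Literal
from typing_extensions import TypedDict


class CandidateBrief(TypedDict):
    location_id: str
    candidate_index: int                                  # 1 or 2
    image_type: Literal["generated", "public_domain", "diagram"]
    concept: Literal["literal", "metaphorical"]           # assumed field name
    prompt: str                                           # assumed: the creative brief text
```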
State Design
The state uses Annotated[list, add] reducers so parallel nodes can append results without conflicts:
```python
from operator import add
from typing import Annotated, Literal
from typing_extensions import TypedDict


def merge_dicts(left: dict, right: dict) -> dict:
    """Reducer that merges dicts from parallel nodes."""
    result = dict(left) if left else {}
    if right:
        result.update(right)
    return result


class ImageGenResult(TypedDict):
    location_id: str
    brief_id: str  # "{location_id}_{candidate_index}"
    success: bool
    image_bytes: bytes | None
    image_type: Literal["generated", "public_domain", "diagram"]
    prompt_used: str


class LocationSelection(TypedDict):
    location_id: str
    selected_brief_id: str | None
    quality_tier: Literal["excellent", "acceptable", "failed"]
    reasoning: str


class WorkflowState(TypedDict, total=False):
    generation_results: Annotated[list[ImageGenResult], add]
    selection_results: Annotated[list[LocationSelection], add]
    retry_count: Annotated[dict[str, int], merge_dicts]
```
The add reducer is critical. Each parallel generate_candidate node returns a one-element list, and the reducer concatenates them. The list grows across retry rounds, which is intentional: Accumulated results enable correct deduplication and retry count derivation.
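The generation node itself isn't shown in this article. A minimal sketch, assuming a stand-in generate_image_for_brief call and the brief fields sketched earlier, illustrates the one-element-list convention:

```python
async def generate_candidate_node(state: dict) -> dict:
    """Runs once per Send; its state is exactly the payload passed to Send."""
    brief = state["brief"]
    result: ImageGenResult = {
        "location_id": brief["location_id"],
        "brief_id": state["brief_id"],
        "success": False,
        "image_bytes": None,
        "image_type": brief["image_type"],
        "prompt_used": brief.get("prompt", ""),
    }
    try:
        # generate_image_for_brief is a placeholder for the real image backend call
        result["image_bytes"] = await generate_image_for_brief(brief)
        result["success"] = True
    except Exception:
        pass  # a failed candidate simply doesn't compete in selection
    # One-element list: the add reducer concatenates it into generation_results
    return {"generation_results": [result]}
```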
Fan-Out With Send
LangGraph’s Send API creates dynamic parallel tasks at runtime. The routing function returns one Send per candidate brief:
```python
from langgraph.types import Send


def route_to_generation(state: WorkflowState) -> list[Send] | str:
    """Fan out one Send per CandidateBrief."""
    briefs = state.get("candidate_briefs", [])
    if not briefs:
        return "finalize"
    sends = []
    for brief in briefs:
        brief_id = f"{brief['location_id']}_{brief['candidate_index']}"
        sends.append(Send("generate_candidate", {
            "brief": brief,
            "brief_id": brief_id,
        }))
    return sends or "finalize"
```
Each Send gets its own isolated state slice, executes in parallel, and writes back via the add reducer. For six locations with two candidates each, 12 generation tasks run simultaneously.
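The counterpart route_to_selection, wired into the graph below, isn't listed in the article. A plausible sketch groups successful results by location and skips locations whose latest selection already passed; the opportunities and editorial_notes state keys are assumptions, and _build_selection_criteria is defined under "Neutral Selection Criteria":

```python
from collections import defaultdict


def route_to_selection(state: WorkflowState) -> list[Send] | str:
    """Fan out one selection task per location that still needs a decision."""
    # Latest selection per location (last write wins across retry rounds)
    latest: dict[str, LocationSelection] = {}
    for sel in state.get("selection_results", []):
        latest[sel["location_id"]] = sel

    # Group successful generations by location
    by_location: dict[str, list[ImageGenResult]] = defaultdict(list)
    for gen in state.get("generation_results", []):
        if gen["success"]:
            by_location[gen["location_id"]].append(gen)

    sends = []
    for loc_id in sorted({b["location_id"] for b in state.get("candidate_briefs", [])}):
        prior = latest.get(loc_id)
        if prior is not None and prior["quality_tier"] != "failed":
            continue  # already has an acceptable or excellent selection
        sends.append(Send("select_per_location", {
            "location_id": loc_id,
            "candidates": by_location.get(loc_id, []),
            # Neutral, planning-derived criteria (see "Neutral Selection Criteria")
            "selection_criteria": _build_selection_criteria(
                state.get("opportunities", []), state.get("editorial_notes", ""), loc_id
            ),
        }))
    return sends or "finalize"
```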
Per-Location Pair Selection
After generation, results are grouped by location_id and each location gets its own selection task:
```python
async def select_per_location_node(state: dict) -> dict:
    """Select the best candidate using vision pair comparison.

    Three cases based on candidate count:
    - 0 → quality_tier="failed" (trigger retry)
    - 1 → auto-select, quality_tier="acceptable"
    - 2 → vision comparison, quality_tier="excellent"
    """
    location_id = state["location_id"]
    candidates = state["candidates"]
    if not candidates:
        return {"selection_results": [{
            "location_id": location_id,
            "selected_brief_id": None,
            "quality_tier": "failed",
            "reasoning": "No candidates succeeded",
        }]}
    if len(candidates) == 1:
        return {"selection_results": [{
            "location_id": location_id,
            "selected_brief_id": candidates[0]["brief_id"],
            "quality_tier": "acceptable",
            "reasoning": "Auto-selected: only one candidate",
        }]}
    # Vision pair comparison for two candidates
    best_idx = await vision_pair_select(
        [c["image_bytes"] for c in candidates],
        selection_criteria=state.get("selection_criteria", ""),
    )
    return {"selection_results": [{
        "location_id": location_id,
        "selected_brief_id": candidates[best_idx]["brief_id"],
        "quality_tier": "excellent",
        "reasoning": "Selected via vision pair comparison",
    }]}
```
The quality tier captures confidence. “Excellent” means the workflow had two options and the vision model picked the best. “Acceptable” means only one candidate succeeded. “Failed” means the workflow needs to retry. This is more informative than binary pass/fail and enables selective retry decisions.
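vision_pair_select is the underlying pair-comparison call (see Resources). As a rough sketch, assuming an OpenAI-style multimodal client, an illustrative model name, and PNG candidate bytes:

```python
import base64
import json
from openai import AsyncOpenAI

_client = AsyncOpenAI()


async def vision_pair_select(images: list[bytes], selection_criteria: str) -> int:
    """Return the index (0 or 1) of the stronger image, judged by a vision LLM."""
    content = [{
        "type": "text",
        "text": (
            "Compare image A and image B for the purpose below and answer with "
            f'JSON {{"winner": "A" | "B"}}.\nPurpose: {selection_criteria}'
        ),
    }]
    for img in images[:2]:
        b64 = base64.b64encode(img).decode()
        content.append({
            "type": "image_url",
            "image_url": {"url": f"data:image/png;base64,{b64}"},
        })
    response = await _client.chat.completions.create(
        model="gpt-4o",  # illustrative; any multimodal model works here
        messages=[{"role": "user", "content": content}],
        response_format={"type": "json_object"},
    )
    answer = json.loads(response.choices[0].message.content or "{}")
    return 0 if answer.get("winner", "A") == "A" else 1
```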
Neutral Selection Criteria
A subtle correctness issue: The comparison criteria passed to the vision LLM must not favor either candidate. If you use candidate one’s brief text as the criteria, the comparison is biased toward candidate one’s approach.
```python
def _build_selection_criteria(opportunities, editorial_notes, location_id):
    """Build neutral criteria from document-level purpose, not candidate briefs."""
    for opp in opportunities:
        if opp.location_id == location_id:
            return f"Purpose: {opp.purpose}. {opp.rationale}"
    return editorial_notes or ""
```
The criteria describe why the location needs an image—from the planning phase—not how any specific candidate addresses it.
Cross-Strategy Fallback
When both candidates fail at a location, retrying with the same image source type often fails the same way. Switch strategies instead:
```python
_FALLBACK_IMAGE_TYPE = {
    "public_domain": "generated",   # search found nothing → try AI generation
    "generated": "public_domain",   # AI generation failed → try search
    "diagram": "generated",         # diagram too complex → try AI generation
}


def route_after_selection(state: WorkflowState) -> list[Send] | str:
    """Retry failed locations with an alternate image source type."""
    retry_count = state.get("retry_count", {})
    max_retries = state.get("config", {}).get("max_retries", 1)

    # Judge each location by its latest selection only; earlier rounds' "failed"
    # entries remain in the accumulated list and must not trigger new retries.
    latest: dict[str, LocationSelection] = {}
    for s in state.get("selection_results", []):
        latest[s["location_id"]] = s

    failed = [
        loc_id
        for loc_id, sel in latest.items()
        if sel["quality_tier"] == "failed"
        and retry_count.get(loc_id, 0) <= max_retries
    ]
    if not failed:
        return "finalize"

    sends = []
    for loc_id in failed:
        for orig in briefs_for_location(loc_id)[:2]:
            fallback = _FALLBACK_IMAGE_TYPE.get(orig["image_type"], "generated")
            retry_brief = {**orig, "image_type": fallback}
            round_num = retry_count.get(loc_id, 0)
            brief_id = f"{loc_id}_{orig['candidate_index']}_retry{round_num}"
            sends.append(Send("generate_candidate", {
                "brief": retry_brief,
                "brief_id": brief_id,
            }))
    return sends if sends else "finalize"
```
Over-generation provides inherent redundancy, so max_retries drops from two to one. The cross-strategy fallback provides diversity that same-approach retries cannot.
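briefs_for_location isn't shown in the excerpt. One possible shape, assuming the round-one briefs are cached when the planner runs (reading them from state["candidate_briefs"] inside the routing function works just as well):

```python
# Hypothetical helper: the original briefs need to be reachable from the routing
# function. Here they are cached at planning time; the name and storage are assumptions.
_PLANNED_BRIEFS: list[dict] = []   # populated once by plan_briefs_node


def briefs_for_location(location_id: str) -> list[dict]:
    """Return the round-one candidate briefs (at most two) for one location."""
    return [b for b in _PLANNED_BRIEFS if b["location_id"] == location_id]
```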
Gotchas With Add Reducers and Retry Rounds
Building this pattern revealed several non-obvious bugs that stem from using Annotated[list, add] reducers with retry loops. The add reducer concatenates lists across all rounds, so selection_results accumulates entries from round one and retry rounds together. This is correct behavior, but requires careful handling.
Deduplication: Keep Last, Not First
When selecting winners in the finalize step, you must keep the last selection entry per location_id, not the first. Round one’s “failed” entry appears before the retry round’s “excellent” entry:
```python
# Wrong: first entry per location wins (round one failure claims slot)
seen = set()
for sel in selection_results:
    if sel["location_id"] not in seen:
        seen.add(sel["location_id"])
        # ... process — BUG: retry success ignored

# Right: last entry per location wins (retry overrides failure)
latest = {}
for sel in selection_results:
    latest[sel["location_id"]] = sel  # last write wins
```
Derive Retry Counts From Accumulated Results
Do not increment retry_count from stale state. With parallel selection nodes writing to a shared dict via a merge reducer, each node independently increments the count, causing over-inflation:
```python
# Wrong: incremental count from stale state
retry_count = dict(state.get("retry_count", {}))
for s in selection_results:
    if s["quality_tier"] == "failed":
        retry_count[s["location_id"]] += 1  # BUG: re-counts old failures

# Right: derive from accumulated list directly
retry_count = {}
for s in selection_results:
    if s["quality_tier"] == "failed":
        retry_count[s["location_id"]] = retry_count.get(s["location_id"], 0) + 1
```
Brief ID Collision Across Retry Rounds
If brief_id is "{location_id}_{candidate_index}", retry round candidates collide with originals. Include the round number:
```python
# Round one:  "section_1_1", "section_1_2"
# Retry one:  "section_1_1_retry1", "section_1_2_retry1"
# Retry two:  "section_1_1_retry2", "section_1_2_retry2"
```
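A small helper (hypothetical, not part of the original excerpt) keeps the ID scheme in one place so both routing functions build identical IDs:

```python
def make_brief_id(location_id: str, candidate_index: int, retry_round: int = 0) -> str:
    """Brief IDs are unique across rounds: the retry suffix prevents collisions."""
    base = f"{location_id}_{candidate_index}"
    return f"{base}_retry{retry_round}" if retry_round else base
```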
Memory Management
With 12 candidates generating two to five MB images each, memory adds up. After selection, clear non-winning candidates. Because generation_results uses an add reducer, you cannot replace the list; mutate entries in place:
```python
def sync_after_selection(state):
    # ... derive retry_count, identify winners ...
    for gen in state.get("generation_results", []):
        if gen["brief_id"] not in winning_brief_ids and gen.get("image_bytes"):
            gen["image_bytes"] = b""  # free memory in place
```
Graph Construction
Putting it together:
```python
from langgraph.graph import END, START, StateGraph

builder = StateGraph(WorkflowState)
builder.add_node("plan_briefs", plan_briefs_node)
builder.add_node("generate_candidate", generate_candidate_node)
builder.add_node("sync_after_generation", sync_after_generation)
builder.add_node("select_per_location", select_per_location_node)
builder.add_node("sync_after_selection", sync_after_selection)
builder.add_node("finalize", finalize_node)

builder.add_edge(START, "plan_briefs")
builder.add_conditional_edges(
    "plan_briefs", route_to_generation, ["generate_candidate", "finalize"]
)
builder.add_edge("generate_candidate", "sync_after_generation")
builder.add_conditional_edges(
    "sync_after_generation", route_to_selection, ["select_per_location", "finalize"]
)
builder.add_edge("select_per_location", "sync_after_selection")
builder.add_conditional_edges(
    "sync_after_selection", route_after_selection, ["generate_candidate", "finalize"]
)
builder.add_edge("finalize", END)

graph = builder.compile()
```
The retry loop flows: sync_after_selection → route_after_selection → generate_candidate → sync_after_generation → route_to_selection → select_per_location → sync_after_selection. With max_retries=1, the worst-case graph depth is about 16, well within LangGraph’s default recursion limit of 25.
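For completeness, invoking the compiled graph looks roughly like this; the initial state keys are assumptions about what plan_briefs_node reads:

```python
import asyncio


async def main() -> None:
    final_state = await graph.ainvoke(
        # "document" is an assumed input key holding the source text to illustrate
        {"document": "...markdown with image placeholders...", "config": {"max_retries": 1}},
        config={"recursion_limit": 25},
    )
    for sel in final_state.get("selection_results", []):
        print(sel["location_id"], sel["quality_tier"], sel["selected_brief_id"])


asyncio.run(main())
```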
Cost Analysis
For six locations with two candidates each:
| Phase | Calls | Notes |
|---|---|---|
| Generation | 12 parallel | Two per location, simultaneous |
| Selection | Six parallel | One vision call per location |
| Retry (if needed) | Two to four per failed location | Cross-strategy fallback |
Total for all locations succeeding first try: 18 API calls. A single-generation approach with review and retry uses six generation + six review + variable retries. The over-generation approach front-loads cost for higher quality and reliability.
Why Two Candidates?
Two candidates provide the best cost-quality tradeoff. One candidate gives no redundancy and nothing to compare. Three or more bring diminishing returns while requiring N-1 vision comparisons for tournament-style selection instead of a single pair comparison. Research on pair comparison (MLLM-as-a-Judge) shows about 80 percent selection accuracy, which is sufficient for image quality decisions.
Related Work
The individual components are well-established:
- Best-of-N sampling is thoroughly studied in diffusion model literature, but at the model internals level (noise selection, trajectory resampling), not at the workflow orchestration level
- MLLM-as-a-Judge pair comparison achieves 80.6 percent accuracy vs. 55.7 percent for scoring (Chen et al., ICML 2024)
- LangGraph Send API for fan-out/fan-in is covered in the official map-reduce documentation
- Over-generation and pruning is an established NLG pattern, but hasn’t crossed over to image generation pipelines
The novel contribution is the integration: Composing these techniques into a single LangGraph workflow with quality tiers, cross-strategy fallback, and the practical gotchas (deduplication, retry count derivation, brief ID collisions, memory management) that emerge from combining add reducers with retry loops.
Resources
- GitHub Gist with the core pattern code
- Parallel Candidate Vision Selection—the underlying vision pair comparison technique
- Two-Pass LLM Planning—how two candidate briefs per location are planned
- LangGraph Map-Reduce How-To—official Send API documentation
- MLLM-as-a-Judge (ICML 2024)—pair comparison accuracy research