Multi-Workflow Orchestration with Parallel Execution in LangGraph
Complex research topics benefit from multiple source types: web for current information, academic papers for peer-reviewed depth, and books for cross-domain connections. Running these workflows separately is inefficient. Running them together requires careful orchestration to handle different quality parameters, independent failures, and long execution times.
The Problem
When orchestrating multiple AI workflows:
- Different parameters: Each workflow has its own quality/depth settings.
- Independent failures: One workflow failing shouldn’t stop others.
- Long execution: Hours-long runs need checkpoint/resume support.
- Output combination: Results must be unified from heterogeneous sources.
The Solution: Wrapper Workflow Pattern
Create an orchestration layer that:
- Maps unified quality tiers to each workflow’s parameters.
- Runs workflows in parallel with asyncio.gather.
- Wraps each workflow in try/except for independent failure handling.
- Saves checkpoints after major phases for resumption.
```mermaid
flowchart LR
    START --> P[run_parallel_workflows]
    P --> |asyncio.gather| W[web_research]
    P --> |asyncio.gather| A[academic_research]
    W --> C[checkpoint]
    A --> C
    C --> S[synthesize]
    S --> END
```
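For reference, here is a minimal sketch of wiring that shape as a LangGraph StateGraph. The OrchestrationState fields and the checkpoint/synthesize node bodies are simplified placeholders, run_parallel_workflows is shown later in this section, and the web/academic fan-out happens inside it via asyncio.gather rather than as separate graph nodes:

```python
from typing import TypedDict

from langgraph.graph import END, START, StateGraph


class OrchestrationState(TypedDict, total=False):
    # Simplified placeholder state; the real state carries WorkflowResult objects.
    input: dict
    web_result: dict
    academic_result: dict
    final_report: str


async def run_parallel_workflows(state: OrchestrationState) -> dict:
    # Stub: the real node is defined under "Parallel Execution with asyncio.gather" below.
    return {}


async def checkpoint(state: OrchestrationState) -> dict:
    # Stub: persist state after the parallel phase (see save_checkpoint below).
    return {}


async def synthesize(state: OrchestrationState) -> dict:
    # Placeholder: combine whatever outputs completed into a unified report.
    return {"final_report": ""}


builder = StateGraph(OrchestrationState)
builder.add_node("run_parallel_workflows", run_parallel_workflows)
builder.add_node("checkpoint", checkpoint)
builder.add_node("synthesize", synthesize)
builder.add_edge(START, "run_parallel_workflows")
builder.add_edge("run_parallel_workflows", "checkpoint")
builder.add_edge("checkpoint", "synthesize")
builder.add_edge("synthesize", END)
graph = builder.compile()

# Usage: final_state = await graph.ainvoke({"input": {"query": "...", "quality": "standard"}})
```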
Implementation
Quality Tier Mapping
Map unified quality levels to each workflow’s specific parameters:
```python
from typing import Literal

QualityTier = Literal["quick", "standard", "comprehensive"]


def get_workflow_quality(quality: QualityTier, workflow: str) -> str:
    """Get workflow-specific quality parameter."""
    if quality == "comprehensive" and workflow == "academic":
        return "high_quality"  # Academic uses different naming
    return quality
```

This handles the common case where most workflows use the same names but some have exceptions.
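For example, given the mapping above:

```python
get_workflow_quality("comprehensive", "academic")  # -> "high_quality"
get_workflow_quality("comprehensive", "web")       # -> "comprehensive"
get_workflow_quality("quick", "academic")          # -> "quick"
```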
WorkflowResult for Structured Status
Define a result type that captures success/failure with context:
```python
from datetime import datetime
from typing import Literal, TypedDict

WorkflowStatus = Literal["pending", "running", "completed", "failed"]


class WorkflowResult(TypedDict):
    workflow_type: str
    final_output: str | None
    started_at: datetime
    completed_at: datetime | None
    status: WorkflowStatus
    error: str | None
```

Independent Failure Handling
The key pattern: wrap each workflow in try/except and return a result object rather than raising. This is better than asyncio.gather(return_exceptions=True) because you get structured error information.
```python
import asyncio
import logging
from datetime import UTC, datetime
from typing import Any

logger = logging.getLogger(__name__)


async def execute_workflow(
    workflow_type: str,
    coro: Any,
    timeout_seconds: int = 300,
) -> WorkflowResult:
    """Execute a workflow with standardized error handling."""
    started_at = datetime.now(tz=UTC)
    try:
        async with asyncio.timeout(timeout_seconds):
            result = await coro
        return WorkflowResult(
            workflow_type=workflow_type,
            final_output=result.get("final_report"),
            started_at=started_at,
            completed_at=datetime.now(tz=UTC),
            status="completed",
            error=None,
        )
    except asyncio.TimeoutError:
        return WorkflowResult(
            workflow_type=workflow_type,
            final_output=None,
            started_at=started_at,
            completed_at=datetime.now(tz=UTC),
            status="failed",
            error=f"Timed out after {timeout_seconds}s",
        )
    except asyncio.CancelledError:
        raise  # Always re-raise CancelledError
    except Exception:
        logger.exception("Workflow %s failed", workflow_type)
        return WorkflowResult(
            workflow_type=workflow_type,
            final_output=None,
            started_at=started_at,
            completed_at=datetime.now(tz=UTC),
            status="failed",
            error="See logs for details",
        )
```

Parallel Execution with asyncio.gather
Run workflows in parallel. Each returns a WorkflowResult regardless of success or failure:
```python
async def run_parallel_workflows(state: OrchestrationState) -> dict:
    query = state["input"]["query"]
    quality = state["input"]["quality"]

    web_result, academic_result = await asyncio.gather(
        execute_workflow(
            "web",
            deep_research(query=query, depth=get_workflow_quality(quality, "web")),
            timeout_seconds=600,
        ),
        execute_workflow(
            "academic",
            academic_lit_review(topic=query, quality=get_workflow_quality(quality, "academic")),
            timeout_seconds=900,
        ),
    )
    return {"web_result": web_result, "academic_result": academic_result}
```

File-Based Checkpointing
For long-running workflows, save state after each major phase:
```python
import json
from datetime import UTC, datetime
from pathlib import Path

CHECKPOINT_DIR = Path("checkpoints")  # example location


def save_checkpoint(state: dict, name: str) -> Path:
    """Save workflow state to a checkpoint file."""
    CHECKPOINT_DIR.mkdir(exist_ok=True)
    timestamp = datetime.now(tz=UTC).strftime("%Y%m%d_%H%M%S")
    path = CHECKPOINT_DIR / f"{name}_{timestamp}.json"
    path.write_text(json.dumps(state, indent=2, default=str))
    return path


def load_checkpoint(path: str | Path) -> dict | None:
    """Load workflow state from a checkpoint file."""
    path = Path(path)
    if not path.exists():
        return None
    return json.loads(path.read_text())
```

Why This Approach Works
- Independent failures: One workflow failing returns a result object instead of raising. The other workflow continues.
- Structured errors: WorkflowResult captures timing, status, and error details for debugging.
- Unified quality: A single parameter configures all workflows appropriately.
- Resumability: Checkpoints enable recovery from interruption (see the sketch after this list).
- Timeout protection: Prevents indefinite hangs on slow workflows.
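As a rough sketch of what resumption can look like (the run_with_resume driver and its phase bookkeeping are hypothetical, layered on the save_checkpoint/load_checkpoint helpers and run_parallel_workflows above):

```python
# Hypothetical driver: pick up from the last saved phase instead of starting over.
async def run_with_resume(query: str, checkpoint_path: str | None = None) -> dict:
    state = load_checkpoint(checkpoint_path) if checkpoint_path else None
    if state is None:
        state = {"input": {"query": query, "quality": "standard"}, "phase": "start"}

    if state.get("phase") != "parallel_done":
        # Re-run the parallel phase only if it never completed.
        state.update(await run_parallel_workflows(state))
        state["phase"] = "parallel_done"
        save_checkpoint(state, "after_parallel")

    # Synthesis would follow here, guarded by its own phase check.
    return state
```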
Why Not return_exceptions=True?
The alternative pattern:
```python
results = await asyncio.gather(run_web(), run_academic(), return_exceptions=True)

for result in results:
    if isinstance(result, Exception):
        ...  # Handle...
```

This is worse because:
- Mixed types in results require isinstance checks.
- Only the exception message is captured, not timing or context.
- Error handling is implicit rather than explicit.
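With WorkflowResult, by contrast, every element has the same shape, so downstream handling stays uniform. A minimal illustration (combine_outputs is a hypothetical synthesis step):

```python
for result in (web_result, academic_result):
    if result["status"] == "failed":
        logger.warning("%s failed: %s", result["workflow_type"], result["error"])
    else:
        combine_outputs(result["final_output"])  # hypothetical downstream step
```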
Trade-offs
- Complexity: More state management than a single workflow.
- Duration: Comprehensive runs can take hours.
- Resource usage: Multiple LLM calls and external APIs simultaneously.