Multi-Workflow Orchestration with Parallel Execution in LangGraph

Complex research topics benefit from multiple source types: web for current information, academic papers for peer-reviewed depth, and books for cross-domain connections. Running these workflows separately is inefficient. Running them together requires careful orchestration to handle different quality parameters, independent failures, and long execution times.

The Problem

When orchestrating multiple AI workflows:

  • Different parameters: Each workflow has its own quality/depth settings.
  • Independent failures: One workflow failing shouldn’t stop others.
  • Long execution: Hours-long runs need checkpoint/resume support.
  • Output combination: Results must be unified from heterogeneous sources.

The Solution: Wrapper Workflow Pattern

Create an orchestration layer that:

  1. Maps unified quality tiers to each workflow’s parameters.
  2. Runs workflows in parallel with asyncio.gather.
  3. Wraps each workflow in try/except for independent failure handling.
  4. Saves checkpoints after major phases for resumption.

The four steps above correspond to this flow (Mermaid notation):

flowchart LR
    START --> P[run_parallel_workflows]
    P --> |asyncio.gather| W[web_research]
    P --> |asyncio.gather| A[academic_research]
    W --> C[checkpoint]
    A --> C
    C --> S[synthesize]
    S --> END

Implementation

Quality Tier Mapping

Map unified quality levels to each workflow’s specific parameters:

from typing import Literal

QualityTier = Literal["quick", "standard", "comprehensive"]
 
def get_workflow_quality(quality: QualityTier, workflow: str) -> str:
    """Get workflow-specific quality parameter."""
    if quality == "comprehensive" and workflow == "academic":
        return "high_quality"  # Academic uses different naming
    return quality

This handles the common case where most workflows use the same names but some have exceptions.
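A quick sanity check of the mapping (the function is restated so the snippet runs standalone):

```python
from typing import Literal

QualityTier = Literal["quick", "standard", "comprehensive"]

def get_workflow_quality(quality: QualityTier, workflow: str) -> str:
    """Get workflow-specific quality parameter."""
    if quality == "comprehensive" and workflow == "academic":
        return "high_quality"  # Academic uses different naming
    return quality

print(get_workflow_quality("comprehensive", "academic"))  # high_quality
print(get_workflow_quality("comprehensive", "web"))       # comprehensive
print(get_workflow_quality("quick", "academic"))          # quick
```

Only the exceptional (tier, workflow) pairs need a branch; everything else passes through unchanged.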

WorkflowResult for Structured Status

Define a result type that captures success/failure with context:

from datetime import datetime
from typing import Literal, TypedDict

WorkflowStatus = Literal["pending", "running", "completed", "failed"]
 
class WorkflowResult(TypedDict):
    workflow_type: str
    final_output: str | None
    started_at: datetime
    completed_at: datetime | None
    status: WorkflowStatus
    error: str | None

Independent Failure Handling

The key pattern: wrap each workflow in try/except and return a result object rather than raising. This is better than asyncio.gather(return_exceptions=True) because you get structured error information.

import asyncio
import logging
from datetime import UTC, datetime
from typing import Any

logger = logging.getLogger(__name__)

async def execute_workflow(
    workflow_type: str,
    coro: Any,
    timeout_seconds: int = 300,
) -> WorkflowResult:
    """Execute a workflow with standardized error handling."""
    started_at = datetime.now(tz=UTC)

    try:
        async with asyncio.timeout(timeout_seconds):
            result = await coro
            return WorkflowResult(
                workflow_type=workflow_type,
                final_output=result.get("final_report"),
                started_at=started_at,
                completed_at=datetime.now(tz=UTC),
                status="completed",
                error=None,
            )

    except asyncio.TimeoutError:
        return WorkflowResult(
            workflow_type=workflow_type,
            final_output=None,
            started_at=started_at,
            completed_at=datetime.now(tz=UTC),
            status="failed",
            error=f"Timed out after {timeout_seconds}s",
        )

    except asyncio.CancelledError:
        raise  # Always re-raise CancelledError

    except Exception:
        logger.exception("Workflow %s failed", workflow_type)
        return WorkflowResult(
            workflow_type=workflow_type,
            final_output=None,
            started_at=started_at,
            completed_at=datetime.now(tz=UTC),
            status="failed",
            error="See logs for details",
        )

Parallel Execution with asyncio.gather

Run workflows in parallel. Each returns a WorkflowResult regardless of success or failure:

async def run_parallel_workflows(state: OrchestrationState) -> dict:
    query = state["input"]["query"]
    quality = state["input"]["quality"]
 
    web_result, academic_result = await asyncio.gather(
        execute_workflow(
            "web",
            deep_research(query=query, depth=get_workflow_quality(quality, "web")),
            timeout_seconds=600,
        ),
        execute_workflow(
            "academic",
            academic_lit_review(topic=query, quality=get_workflow_quality(quality, "academic")),
            timeout_seconds=900,
        ),
    )
 
    return {"web_result": web_result, "academic_result": academic_result}
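The failure-isolation behavior is easiest to see with toy coroutines standing in for the real workflows. This is a minimal sketch: `succeed`, `fail`, and the plain dicts are illustrative stand-ins, not the real `WorkflowResult`:

```python
import asyncio

async def succeed() -> dict:
    return {"final_report": "ok"}

async def fail() -> dict:
    raise RuntimeError("boom")

async def safe(name: str, coro) -> dict:
    # The wrapper turns failures into result objects instead of exceptions,
    # so one workflow failing never cancels its sibling under gather.
    try:
        return {"workflow": name, "status": "completed", "output": await coro}
    except Exception as exc:
        return {"workflow": name, "status": "failed", "error": str(exc)}

async def main() -> list[dict]:
    return await asyncio.gather(safe("web", succeed()), safe("academic", fail()))

web, academic = asyncio.run(main())
print(web["status"], academic["status"])  # completed failed
```

Because every task resolves to a dict, `gather` never sees an exception and both results come back in order.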

File-Based Checkpointing

For long-running workflows, save state after each major phase:

import json
from datetime import UTC, datetime
from pathlib import Path

CHECKPOINT_DIR = Path("checkpoints")  # assumed location; adjust to your project layout

def save_checkpoint(state: dict, name: str) -> Path:
    """Save workflow state to a checkpoint file."""
    CHECKPOINT_DIR.mkdir(parents=True, exist_ok=True)
    timestamp = datetime.now(tz=UTC).strftime("%Y%m%d_%H%M%S")
    path = CHECKPOINT_DIR / f"{name}_{timestamp}.json"
    path.write_text(json.dumps(state, indent=2, default=str))
    return path
 
def load_checkpoint(path: str | Path) -> dict | None:
    """Load workflow state from a checkpoint file."""
    path = Path(path)
    if not path.exists():
        return None
    return json.loads(path.read_text())

Why This Approach Works

  1. Independent failures: One workflow failing returns a result object instead of raising. The other workflow continues.
  2. Structured errors: WorkflowResult captures timing, status, and error details for debugging.
  3. Unified quality: Single parameter configures all workflows appropriately.
  4. Resumability: Checkpoints enable recovery from interruption.
  5. Timeout protection: Prevents indefinite hangs on slow workflows.

Why Not return_exceptions=True?

The alternative pattern:

results = await asyncio.gather(run_web(), run_academic(), return_exceptions=True)
for result in results:
    if isinstance(result, Exception):
        ...  # handle the bare exception; timing and context are already gone

This is worse because:

  • Mixed types in results require isinstance checks.
  • Only the exception message is captured, not timing or context.
  • Error handling is implicit rather than explicit.
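The mixed-type issue is easy to demonstrate with toy coroutines standing in for the workflows:

```python
import asyncio

async def ok() -> str:
    return "report"

async def bad() -> str:
    raise ValueError("boom")

async def main() -> list:
    # Successes and raw exception objects arrive in the same list.
    return await asyncio.gather(ok(), bad(), return_exceptions=True)

results = asyncio.run(main())
print(results)  # ['report', ValueError('boom')]
```

The caller gets a `ValueError` instance sitting next to a string, with no record of which workflow produced it, when it started, or when it failed.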

Trade-offs

  • Complexity: More state management than a single workflow.
  • Duration: Comprehensive runs can take hours.
  • Resource usage: Multiple LLM calls and external APIs simultaneously.