Workflow State Decoupling via Direct Store Queries

In multi-phase supervision workflows, earlier phases discover and store papers while later phases process them. The naive approach passes the full corpus through state, creating bloat, tight coupling, and stale data problems.

This pattern simplifies state by having loops query persistent stores directly for data rather than receiving large corpus dictionaries.

The Problem

Passing corpus data through workflow state creates issues:

# Antipattern: State bloated with corpus data
from typing import Any
from typing_extensions import TypedDict

class Loop5State(TypedDict):
    current_review: str
    paper_summaries: dict[str, Any]      # full DOI -> summary mapping
    zotero_keys: dict[str, str]          # DOI -> key mapping
    # ... 10+ more fields

Problems with this approach:

  • Tight coupling: Loops depend on orchestration’s data structures.
  • State bloat: Large dicts serialized through every node transition.
  • Stale data: Corpus reflects discovery-time state, not current store contents.
  • Testing difficulty: Loops require realistic corpus fixtures.

The Solution

Have loops query persistent stores directly:

graph LR
    subgraph "BEFORE: Corpus data flows through state"
        L1[Loop 1] -->|corpus<br/>summaries<br/>zotero_keys| L2[Loop 2]
        L2 -->|corpus<br/>summaries<br/>zotero_keys| L3[Loop 3]
    end

    subgraph "AFTER: Loops query stores directly"
        A1[Loop 1] -->|review<br/>topic<br/>quality| A2[Loop 2]
        A2 -->|review<br/>topic<br/>quality| A3[Loop 3]
        A1 --> SQS[StoreQueryService]
        A2 --> SQS
        A3 --> SQS
        SQS --> ES[Elasticsearch]
        SQS --> Z[Zotero]
        SQS --> C[ChromaDB]
    end

Store Query Service

Centralize store access with lazy initialization and fallback strategies:

from typing import Optional


class StoreQueryService:
    """Query service for loops to access stored data directly."""

    def __init__(self, store_manager=None):
        self._store_manager = store_manager
        self._owned_manager = False

    @property
    def store_manager(self):
        """Lazy initialization of store manager."""
        if self._store_manager is None:
            from langchain_tools.base import get_store_manager
            self._store_manager = get_store_manager()
            self._owned_manager = True
        return self._store_manager

    async def get_paper_content(
        self,
        zotero_key: str,
        compression_level: int = 2,
    ) -> tuple[Optional[str], Optional[dict]]:
        """Fetch paper content with compression fallback."""
        store = self.store_manager.es_stores.store
        query = {"term": {"zotero_key": zotero_key}}

        # Try the requested compression level first...
        records = await store.search(
            query=query,
            size=1,
            compression_level=compression_level,
        )
        if records:
            return records[0].content, records[0].metadata

        # ...then fall back to the remaining levels.
        for alt_level in [1, 2, 0]:
            if alt_level == compression_level:
                continue
            records = await store.search(
                query=query, size=1, compression_level=alt_level
            )
            if records:
                return records[0].content, records[0].metadata

        return None, None

    async def close(self):
        """Release the store manager only if this service created it.

        Assumes the underlying manager exposes an async close().
        """
        if self._owned_manager and self._store_manager is not None:
            await self._store_manager.close()
            self._store_manager = None

    async def __aenter__(self):
        return self

    async def __aexit__(self, *args):
        await self.close()
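The compression-level fallback is easy to exercise against an in-memory stub. In this sketch, FakeStore and fetch_with_fallback are illustrative stand-ins for the real Elasticsearch-backed store and the query logic above, not part of the actual service:

```python
import asyncio


class FakeStore:
    """In-memory stand-in for the Elasticsearch-backed store."""

    def __init__(self, records):
        # records: {(zotero_key, compression_level): (content, metadata)}
        self._records = records

    async def search(self, query, size, compression_level):
        key = query["term"]["zotero_key"]
        hit = self._records.get((key, compression_level))
        return [hit] if hit else []


async def fetch_with_fallback(store, zotero_key, preferred=2):
    """Try the preferred compression level, then fall back to the others."""
    for level in [preferred] + [l for l in (1, 2, 0) if l != preferred]:
        records = await store.search(
            query={"term": {"zotero_key": zotero_key}},
            size=1,
            compression_level=level,
        )
        if records:
            return records[0]
    return None


# Only a level-0 record exists, so the lookup falls back from 2 to 1 to 0.
store = FakeStore({("ABC123", 0): ("full text...", {"doi": "10.1/x"})})
result = asyncio.run(fetch_with_fallback(store, "ABC123"))
print(result)
```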

Minimal Loop State

Remove corpus data from state, keep only iteration control:

from typing import Annotated
from typing_extensions import TypedDict
 
 
def add_lists(a: list, b: list) -> list:
    return a + b
 
 
class Loop2State(TypedDict, total=False):
    """State for Loop 2—no corpus data passed through."""
 
    # Core inputs (minimal)
    current_review: str
    topic: str
    research_questions: list[str]
    quality_settings: dict
 
    # Iteration tracking
    iteration: int
    max_iterations: int
    explored_bases: Annotated[list[str], add_lists]
    is_complete: bool
 
    # Error tracking
    errors: Annotated[list[dict], add_lists]
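The Annotated reducers accumulate list fields across node updates instead of overwriting them. A rough sketch of that merge semantics, where apply_update is a simplified stand-in for LangGraph's channel merge, not its real implementation:

```python
def add_lists(a: list, b: list) -> list:
    return a + b


def apply_update(state: dict, update: dict, reducers: dict) -> dict:
    """Reduced keys accumulate; all other keys are overwritten."""
    merged = dict(state)
    for key, value in update.items():
        if key in reducers and key in merged:
            merged[key] = reducers[key](merged[key], value)
        else:
            merged[key] = value
    return merged


reducers = {"explored_bases": add_lists, "errors": add_lists}
state = {"iteration": 0, "explored_bases": []}
state = apply_update(state, {"iteration": 1, "explored_bases": ["arxiv"]}, reducers)
state = apply_update(state, {"iteration": 2, "explored_bases": ["pubmed"]}, reducers)
print(state["explored_bases"])  # ['arxiv', 'pubmed']
print(state["iteration"])       # 2
```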

Use typed result dataclasses instead of dicts:

from dataclasses import dataclass


@dataclass
class Loop2Result:
    """Typed result from Loop 2."""
    current_review: str
    changes_summary: str
    explored_bases: list[str]

Decoupled Loop Signatures

Loops accept only essential parameters:

from langsmith import traceable


@traceable(run_type="chain", name="Loop2_LiteratureExpansion")
async def run_loop2_standalone(
    review: str,
    topic: str,
    research_questions: list[str],
    quality_settings: dict,
    config: dict | None = None,
) -> Loop2Result:
    """Run Loop 2 literature base expansion.
 
    Papers discovered are sent to stores via nested workflow.
    No corpus data passed back to orchestration.
    """
    max_iterations = quality_settings.get("max_stages", 1)
 
    initial_state: Loop2State = {
        "current_review": review,
        "topic": topic,
        "research_questions": research_questions,
        "quality_settings": quality_settings,
        "iteration": 0,
        "max_iterations": max_iterations,
        "explored_bases": [],
        "is_complete": False,
    }
 
    graph = create_loop2_graph()
    result = await graph.ainvoke(initial_state, config=config)
 
    return Loop2Result(
        current_review=result["current_review"],
        changes_summary=f"Expanded {len(result['explored_bases'])} bases",
        explored_bases=result.get("explored_bases", []),
    )
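Because the signature carries only primitives, the loop can be driven in a unit test with a stubbed graph and no pipeline setup. StubGraph and run_loop2 below are simplified, hypothetical stand-ins for create_loop2_graph() and run_loop2_standalone():

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Loop2Result:
    current_review: str
    changes_summary: str
    explored_bases: list


class StubGraph:
    """Stands in for the compiled loop graph in a unit test."""

    async def ainvoke(self, state, config=None):
        # Pretend one literature base was explored.
        return {**state, "explored_bases": ["arxiv"], "is_complete": True}


async def run_loop2(review: str, topic: str, graph) -> Loop2Result:
    result = await graph.ainvoke(
        {"current_review": review, "topic": topic, "explored_bases": []}
    )
    return Loop2Result(
        current_review=result["current_review"],
        changes_summary=f"Expanded {len(result['explored_bases'])} bases",
        explored_bases=result.get("explored_bases", []),
    )


r = asyncio.run(run_loop2("draft", "ML", StubGraph()))
print(r.changes_summary)  # Expanded 1 bases
```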

Orchestration Integration

Orchestration passes minimal params and extracts typed results:

async def run_loop2_node(state: dict) -> dict:
    """Pass minimal params, extract typed result."""
    input_data = state.get("input", {})
 
    result = await run_loop2_standalone(
        review=state["current_review"],
        topic=input_data.get("topic", ""),
        research_questions=input_data.get("research_questions", []),
        quality_settings=state["quality_settings"],
        config={"run_name": f"loop2:{input_data.get('topic', '')[:20]}"},
    )
 
    # No corpus merging—papers already in stores
    return {
        "current_review": result.current_review,
        "loop2_result": {
            "explored_bases": result.explored_bases,
            "changes_summary": result.changes_summary,
        },
    }

Using Store Queries in Loops

Example of citation verification using store queries:

async def resolve_invalid_citations(
    document: str,
    invalid_keys: set[str],
    topic: str,
) -> str:
    """Resolve invalid citations by querying stores directly."""
    async with StoreQueryService() as store_query:
        for key in invalid_keys:
            # Query store directly instead of filtering corpus dict
            content, metadata = await store_query.get_paper_content(key)
 
            if content and metadata:
                # Key exists in store—citation is valid
                continue
 
            # Key not in store—resolve with LLM
            document = await _llm_resolve_citation(
                document=document,
                invalid_key=key,
                topic=topic,
            )
 
        return document
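This is where the testability benefit shows up concretely: a fake service can replace StoreQueryService without any corpus fixtures. FakeStoreQuery and check_keys are hypothetical test helpers sketching the same query-per-key flow:

```python
import asyncio


class FakeStoreQuery:
    """Stub standing in for StoreQueryService in tests."""

    def __init__(self, known_keys):
        self._known = known_keys

    async def get_paper_content(self, key, compression_level=2):
        if key in self._known:
            return "content", {"title": "stub"}
        return None, None

    async def __aenter__(self):
        return self

    async def __aexit__(self, *args):
        pass


async def check_keys(store_query, keys):
    """Partition citation keys into valid (in store) and unresolved."""
    valid, missing = [], []
    async with store_query:
        for key in keys:
            content, _metadata = await store_query.get_paper_content(key)
            (valid if content else missing).append(key)
    return valid, missing


valid, missing = asyncio.run(
    check_keys(FakeStoreQuery({"KEY1"}), ["KEY1", "KEY2"])
)
print(valid, missing)  # ['KEY1'] ['KEY2']
```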

When to Use This Pattern

Use when:

  • Workflows have multiple sequential phases processing stored documents.
  • Data is persisted to external stores (Elasticsearch, Zotero, ChromaDB).
  • Later phases only need to query subsets of stored data.
  • Loops should be independently testable without full pipeline setup.

Do not use when:

  • All data exists only in workflow state (no persistent stores).
  • Phases genuinely need the full corpus in memory for cross-referencing.
  • Query latency would significantly impact hot loops.

Trade-offs

Benefits:

  • Decoupling: Loops don’t depend on orchestration’s data structures.
  • Scalability: State size independent of corpus size.
  • Fresh data: Queries return current store contents, not stale snapshots.
  • Testability: Loops runnable standalone with mock store queries.
  • Type safety: Typed result dataclasses instead of untyped dicts.

Costs:

  • Query latency: Each store query adds network overhead.
  • Connection management: Must properly close store clients.
  • Debugging complexity: Data flow less visible in state snapshots.