Workflow State Decoupling via Direct Store Queries
In multi-phase supervision workflows, earlier phases discover and store papers while later phases process them. The naive approach passes the full corpus through state, creating bloat, tight coupling, and stale data problems.
This pattern keeps workflow state small by having loops query persistent stores directly for the data they need, rather than receiving large corpus dictionaries through state.
The Problem
Passing corpus data through workflow state creates issues:
# Antipattern: State bloated with corpus data
class Loop5State(TypedDict):
    current_review: str
    paper_summaries: dict[str, Any]  # full DOI -> summary mapping
    zotero_keys: dict[str, str]  # DOI -> key mapping
    # ... 10+ more fields

Problems with this approach:
- Tight coupling: Loops depend on orchestration’s data structures.
- State bloat: Large dicts serialized through every node transition.
- Stale data: Corpus reflects discovery-time state, not current store contents.
- Testing difficulty: Loops require realistic corpus fixtures.
The Solution
Have loops query persistent stores directly:
graph LR
    subgraph "BEFORE: Corpus data flows through state"
        L1[Loop 1] -->|corpus<br/>summaries<br/>zotero_keys| L2[Loop 2]
        L2 -->|corpus<br/>summaries<br/>zotero_keys| L3[Loop 3]
    end
    subgraph "AFTER: Loops query stores directly"
        A1[Loop 1] -->|review<br/>topic<br/>quality| A2[Loop 2]
        A2 -->|review<br/>topic<br/>quality| A3[Loop 3]
        A1 --> SQS[StoreQueryService]
        A2 --> SQS
        A3 --> SQS
        SQS --> ES[Elasticsearch]
        SQS --> Z[Zotero]
        SQS --> C[ChromaDB]
    end
Store Query Service
Centralize store access with lazy initialization and fallback strategies:
from typing import Optional

class StoreQueryService:
    """Query service for loops to access stored data directly."""

    def __init__(self, store_manager=None):
        self._store_manager = store_manager
        self._owned_manager = False

    @property
    def store_manager(self):
        """Lazy initialization of store manager."""
        if self._store_manager is None:
            from langchain_tools.base import get_store_manager
            self._store_manager = get_store_manager()
            self._owned_manager = True
        return self._store_manager

    async def get_paper_content(
        self,
        zotero_key: str,
        compression_level: int = 2,
    ) -> tuple[Optional[str], Optional[dict]]:
        """Fetch paper content with compression fallback."""
        store = self.store_manager.es_stores.store
        query = {"term": {"zotero_key": zotero_key}}
        records = await store.search(
            query=query,
            size=1,
            compression_level=compression_level,
        )
        if records:
            return records[0].content, records[0].metadata

        # Fall back to other compression levels
        for alt_level in [1, 2, 0]:
            if alt_level == compression_level:
                continue
            records = await store.search(
                query=query, size=1, compression_level=alt_level
            )
            if records:
                return records[0].content, records[0].metadata
        return None, None
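
    # Note: __aexit__ below calls self.close(), which this listing elides. A minimal
    # sketch, assuming the owned store manager exposes an async close():
    async def close(self):
        """Release store clients if this service created the manager."""
        if self._owned_manager and self._store_manager is not None:
            await self._store_manager.close()
            self._store_manager = None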
    async def __aenter__(self):
        return self

    async def __aexit__(self, *args):
        await self.close()

Minimal Loop State
Remove corpus data from state, keep only iteration control:
from typing import Annotated
from typing_extensions import TypedDict
def add_lists(a: list, b: list) -> list:
    return a + b
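
# Assuming these loops are LangGraph graphs (the code below uses TypedDict state and
# graph.ainvoke), annotating a field with this reducer tells the framework to merge
# node updates via add_lists(old, new), so fields such as explored_bases and errors
# accumulate across iterations instead of being overwritten.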
class Loop2State(TypedDict, total=False):
    """State for Loop 2: no corpus data passed through."""

    # Core inputs (minimal)
    current_review: str
    topic: str
    research_questions: list[str]
    quality_settings: dict

    # Iteration tracking
    iteration: int
    max_iterations: int
    explored_bases: Annotated[list[str], add_lists]
    is_complete: bool

    # Error tracking
    errors: Annotated[list[dict], add_lists]

Use typed result dataclasses instead of dicts:
from dataclasses import dataclass

@dataclass
class Loop2Result:
    """Typed result from Loop 2."""

    current_review: str
    changes_summary: str
    explored_bases: list[str]

Decoupled Loop Signatures
Loops accept only essential parameters:
@traceable(run_type="chain", name="Loop2_LiteratureExpansion")
async def run_loop2_standalone(
    review: str,
    topic: str,
    research_questions: list[str],
    quality_settings: dict,
    config: dict | None = None,
) -> Loop2Result:
    """Run Loop 2 literature base expansion.

    Papers discovered are sent to stores via nested workflow.
    No corpus data passed back to orchestration.
    """
    max_iterations = quality_settings.get("max_stages", 1)
    initial_state: Loop2State = {
        "current_review": review,
        "topic": topic,
        "research_questions": research_questions,
        "quality_settings": quality_settings,
        "iteration": 0,
        "max_iterations": max_iterations,
        "explored_bases": [],
        "is_complete": False,
    }
    graph = create_loop2_graph()
    result = await graph.ainvoke(initial_state, config=config)
    return Loop2Result(
        current_review=result["current_review"],
        changes_summary=f"Expanded {len(result['explored_bases'])} bases",
        explored_bases=result.get("explored_bases", []),
    )

Orchestration Integration
Orchestration passes minimal params and extracts typed results:
async def run_loop2_node(state: dict) -> dict:
    """Pass minimal params, extract typed result."""
    input_data = state.get("input", {})
    result = await run_loop2_standalone(
        review=state["current_review"],
        topic=input_data.get("topic", ""),
        research_questions=input_data.get("research_questions", []),
        quality_settings=state["quality_settings"],
        config={"run_name": f"loop2:{input_data.get('topic', '')[:20]}"},
    )
    # No corpus merging; papers already in stores
    return {
        "current_review": result.current_review,
        "loop2_result": {
            "explored_bases": result.explored_bases,
            "changes_summary": result.changes_summary,
        },
    }

Using Store Queries in Loops
Example of citation verification using store queries:
async def resolve_invalid_citations(
    document: str,
    invalid_keys: set[str],
    topic: str,
) -> str:
    """Resolve invalid citations by querying stores directly."""
    async with StoreQueryService() as store_query:
        for key in invalid_keys:
            # Query store directly instead of filtering corpus dict
            content, metadata = await store_query.get_paper_content(key)
            if content and metadata:
                # Key exists in store; citation is valid
                continue
            # Key not in store; resolve with LLM
            document = await _llm_resolve_citation(
                document=document,
                invalid_key=key,
                topic=topic,
            )
    return document

When to Use This Pattern
Use when:
- Workflows have multiple sequential phases processing stored documents.
- Data is persisted to external stores (Elasticsearch, Zotero, ChromaDB).
- Later phases only need to query subsets of stored data.
- Loops should be independently testable without full pipeline setup.
Do not use when:
- All data exists only in workflow state (no persistent stores).
- Phases genuinely need the full corpus in memory for cross-referencing.
- Query latency would significantly impact hot loops.
Trade-offs
Benefits:
- Decoupling: Loops don’t depend on orchestration’s data structures.
- Scalability: State size independent of corpus size.
- Fresh data: Queries return current store contents, not stale snapshots.
- Testability: Loops runnable standalone with mock store queries (see the test sketch below).
- Type safety: Typed result dataclasses instead of untyped dicts.
Costs:
- Query latency: Each store query adds network overhead.
- Connection management: Must properly close store clients.
- Debugging complexity: Data flow less visible in state snapshots.
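
To make the testability benefit concrete, here is a minimal sketch of a standalone test, assuming pytest with pytest-asyncio and that StoreQueryService is importable from the project. FakeStore and the SimpleNamespace-based manager are illustrative stand-ins for the real store API; injecting them through the constructor lets the query path run without Elasticsearch, Zotero, or ChromaDB:

import pytest
from types import SimpleNamespace

class FakeStore:
    """Returns a canned record for any search, mimicking the ES-backed store."""

    async def search(self, query, size, compression_level):
        record = SimpleNamespace(content="paper text", metadata={"title": "Example"})
        return [record]

@pytest.mark.asyncio
async def test_get_paper_content_uses_injected_manager():
    # Inject a fake store manager so no Elasticsearch connection is needed.
    fake_manager = SimpleNamespace(es_stores=SimpleNamespace(store=FakeStore()))
    service = StoreQueryService(store_manager=fake_manager)

    content, metadata = await service.get_paper_content("ABC123")

    assert content == "paper text"
    assert metadata == {"title": "Example"}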