PipelineWrapper¶
The PipelineWrapper class is the core component for deploying Haystack pipelines with Hayhooks. It provides maximum flexibility for pipeline initialization and execution.
Why PipelineWrapper?¶
The pipeline wrapper provides a flexible foundation for deploying Haystack pipelines, agents or any other component by allowing users to:
- Choose their preferred initialization method (YAML files, Haystack templates, or inline code)
- Define custom execution logic with configurable inputs and outputs
- Optionally expose OpenAI-compatible chat endpoints with streaming support for integration with interfaces like open-webui
Basic Structure¶
from pathlib import Path
from typing import Generator, Union, AsyncGenerator
from haystack import Pipeline, AsyncPipeline
from hayhooks import BasePipelineWrapper, get_last_user_message, streaming_generator, async_streaming_generator
class PipelineWrapper(BasePipelineWrapper):
    def setup(self) -> None:
        pipeline_yaml = (Path(__file__).parent / "pipeline.yml").read_text()
        self.pipeline = Pipeline.loads(pipeline_yaml)

    def run_api(self, urls: list[str], question: str) -> str:
        result = self.pipeline.run({"fetcher": {"urls": urls}, "prompt": {"query": question}})
        return result["llm"]["replies"][0]
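Once deployed, this wrapper is exposed at the {pipeline_name}/run endpoint. As a rough sketch of calling it from Python (assuming Hayhooks runs on its default address and the wrapper was deployed under the illustrative name chat_with_website):

import requests

# Hypothetical deployment name and default Hayhooks address
response = requests.post(
    "http://localhost:1416/chat_with_website/run",
    json={"urls": ["https://haystack.deepset.ai"], "question": "What is Haystack?"},
)
print(response.json())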
Required Methods¶
setup()¶
The setup() method is called once when the pipeline is deployed. It should initialize the self.pipeline attribute as a Haystack pipeline.
def setup(self) -> None:
    # Initialize your pipeline here
    pass
Initialization patterns:
1. Programmatic Initialization (Recommended)¶
Define your pipeline directly in code for maximum flexibility and control:
def setup(self) -> None:
    from haystack import Pipeline
    from haystack.components.fetchers import LinkContentFetcher
    from haystack.components.converters import HTMLToDocument
    from haystack.components.builders import PromptBuilder
    from haystack.components.generators import OpenAIGenerator

    # Create components
    fetcher = LinkContentFetcher()
    converter = HTMLToDocument()
    prompt_builder = PromptBuilder(
        template="Based on: {{documents}}\nAnswer: {{query}}"
    )
    llm = OpenAIGenerator(model="gpt-4o-mini")

    # Build pipeline
    self.pipeline = Pipeline()
    self.pipeline.add_component("fetcher", fetcher)
    self.pipeline.add_component("converter", converter)
    self.pipeline.add_component("prompt", prompt_builder)
    self.pipeline.add_component("llm", llm)

    # Connect components
    self.pipeline.connect("fetcher.streams", "converter.sources")
    self.pipeline.connect("converter.documents", "prompt.documents")
    self.pipeline.connect("prompt.prompt", "llm.prompt")
Benefits of Programmatic Initialization
- Full IDE support with autocomplete and type checking
- Easier debugging and testing
- Better refactoring capabilities
- Dynamic component configuration based on runtime conditions
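For example, the last point might look like this in practice, a minimal sketch that picks the LLM model from an environment variable at deploy time (the LLM_MODEL variable name is purely illustrative):

import os

def setup(self) -> None:
    from haystack import Pipeline
    from haystack.components.generators import OpenAIGenerator

    # Illustrative: choose the model based on runtime conditions
    model_name = os.getenv("LLM_MODEL", "gpt-4o-mini")
    self.pipeline = Pipeline()
    self.pipeline.add_component("llm", OpenAIGenerator(model=model_name))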
2. Load from YAML¶
Load an existing YAML pipeline file:
def setup(self) -> None:
    from pathlib import Path
    from haystack import Pipeline

    pipeline_yaml = (Path(__file__).parent / "pipeline.yml").read_text()
    self.pipeline = Pipeline.loads(pipeline_yaml)
When to use:
- You already have a YAML pipeline definition
- You want to version control pipeline structure separately
- You need to share pipeline definitions across different deployments
Consider YAML-only deployment
If your pipeline is simple and doesn't need custom logic, consider using YAML Pipeline Deployment instead, which doesn't require a wrapper at all.
run_api()¶
The run_api() method is called for each API request to the {pipeline_name}/run endpoint.
def run_api(self, urls: list[str], question: str) -> str:
    result = self.pipeline.run({"fetcher": {"urls": urls}, "prompt": {"query": question}})
    return result["llm"]["replies"][0]
Key features:
- Flexible Input: You can define any input arguments you need
- Automatic Validation: Hayhooks creates Pydantic models for request validation
- Type Safety: Use proper type hints for better validation
- Error Handling: Implement proper error handling for production use
Input argument rules:
- Arguments must be JSON-serializable
- Use proper type hints (list[str], Optional[int], etc.)
- Default values are supported
- Complex types like dict[str, Any] are allowed
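For example, a minimal signature sketch combining these rules (the parameter names are illustrative, not a fixed API):

from typing import Any, Optional

def run_api(
    self,
    urls: list[str],
    question: str,
    top_k: int = 5,  # default values are supported
    filters: Optional[dict[str, Any]] = None,  # complex JSON-serializable types are allowed
) -> str:
    # Hayhooks builds a Pydantic request model from this signature, so a JSON body like
    # {"urls": ["https://example.com"], "question": "What is this page about?"}
    # is validated before run_api is called.
    ...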
Optional Methods¶
run_api_async()¶
The asynchronous version of run_api() for better performance under high load.
async def run_api_async(self, urls: list[str], question: str) -> str:
    result = await self.pipeline.run_async({"fetcher": {"urls": urls}, "prompt": {"query": question}})
    return result["llm"]["replies"][0]
When to use run_api_async:
- Working with AsyncPipeline instances
- Handling many concurrent requests
- Integrating with async-compatible components
- Better performance for I/O-bound operations
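If you pair run_api_async() with an AsyncPipeline, setup() stays almost identical to the programmatic example above; a minimal sketch:

def setup(self) -> None:
    from haystack import AsyncPipeline
    from haystack.components.fetchers import LinkContentFetcher
    from haystack.components.converters import HTMLToDocument
    from haystack.components.builders import PromptBuilder
    from haystack.components.generators import OpenAIGenerator

    # Same layout as before, but built as an AsyncPipeline so that
    # run_api_async() can await self.pipeline.run_async(...)
    self.pipeline = AsyncPipeline()
    self.pipeline.add_component("fetcher", LinkContentFetcher())
    self.pipeline.add_component("converter", HTMLToDocument())
    self.pipeline.add_component("prompt", PromptBuilder(template="Based on: {{documents}}\nAnswer: {{query}}"))
    self.pipeline.add_component("llm", OpenAIGenerator(model="gpt-4o-mini"))
    self.pipeline.connect("fetcher.streams", "converter.sources")
    self.pipeline.connect("converter.documents", "prompt.documents")
    self.pipeline.connect("prompt.prompt", "llm.prompt")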
run_chat_completion()¶
Enable OpenAI-compatible chat endpoints for integration with chat interfaces.
def run_chat_completion(self, model: str, messages: list[dict], body: dict) -> Union[str, Generator]:
    question = get_last_user_message(messages)
    result = self.pipeline.run({"prompt": {"query": question}})
    return result["llm"]["replies"][0]
Fixed signature:
- model: The pipeline name
- messages: OpenAI-format message list
- body: Full request body (for additional parameters)
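To stream tokens instead of returning a single string, the same method can return a generator; a minimal sketch using streaming_generator (assuming the prompt/llm pipeline from the examples above):

def run_chat_completion(self, model: str, messages: list[dict], body: dict) -> Union[str, Generator]:
    question = get_last_user_message(messages)

    # streaming_generator runs the pipeline and yields chunks as the LLM produces them;
    # Hayhooks forwards them as an OpenAI-compatible streaming response.
    return streaming_generator(
        pipeline=self.pipeline,
        pipeline_run_args={"prompt": {"query": question}},
    )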
run_chat_completion_async()¶
Async version of chat completion with streaming support.
async def run_chat_completion_async(self, model: str, messages: list[dict], body: dict) -> AsyncGenerator:
    question = get_last_user_message(messages)
    return async_streaming_generator(
        pipeline=self.pipeline,
        pipeline_run_args={"prompt": {"query": question}},
    )
Hybrid Streaming: Mixing Async and Sync Components¶
Compatibility for Legacy Components
When working with legacy pipelines or components that only support sync streaming callbacks (like OpenAIGenerator), use allow_sync_streaming_callbacks=True to enable hybrid mode. For new code, prefer async-compatible components and use the default strict mode.
Some Haystack components only support synchronous streaming callbacks and don't have async equivalents. Examples include:
- OpenAIGenerator - legacy OpenAI text generation (⚠️ Note: OpenAIChatGenerator IS async-compatible)
- Other components without run_async() support
The Problem¶
By default, async_streaming_generator requires all streaming components to support async callbacks:
async def run_chat_completion_async(self, model: str, messages: list[dict], body: dict) -> AsyncGenerator:
    question = get_last_user_message(messages)

    # This will FAIL if the pipeline contains OpenAIGenerator
    return async_streaming_generator(
        pipeline=self.pipeline,  # AsyncPipeline with OpenAIGenerator
        pipeline_run_args={"prompt": {"query": question}},
    )
Error:
ValueError: Component 'llm' of type 'OpenAIGenerator' seems to not support
async streaming callbacks...
The Solution: Hybrid Streaming Mode¶
Enable hybrid streaming mode to automatically handle both async and sync components:
async def run_chat_completion_async(self, model: str, messages: list[dict], body: dict) -> AsyncGenerator:
    question = get_last_user_message(messages)
    return async_streaming_generator(
        pipeline=self.pipeline,
        pipeline_run_args={"prompt": {"query": question}},
        allow_sync_streaming_callbacks=True,  # ✅ Auto-detect and enable hybrid mode
    )
What allow_sync_streaming_callbacks=True Does¶
When you set allow_sync_streaming_callbacks=True, the system enables intelligent auto-detection:
- Scans Components: Automatically inspects all streaming components in your pipeline
- Detects Capabilities: Checks if each component has run_async() support
- Enables Hybrid Mode Only If Needed:
- ✅ If all components support async → Uses pure async mode (no overhead)
- ✅ If any component is sync-only → Automatically enables hybrid mode
- Bridges Sync to Async: For sync-only components, wraps their callbacks to work seamlessly with the async event loop
- Zero Configuration: You don't need to know which components are sync/async - it figures it out automatically
Smart Behavior
Setting allow_sync_streaming_callbacks=True does NOT force hybrid mode. It only enables it when actually needed. If your pipeline is fully async-capable, you get pure async performance with no overhead!
Configuration Options¶
# Option 1: Strict mode (Default - Recommended)
allow_sync_streaming_callbacks=False
# → Raises error if sync-only components found
# → Best for: New code, ensuring proper async components, best performance
# Option 2: Auto-detection (Compatibility mode)
allow_sync_streaming_callbacks=True
# → Automatically detects and enables hybrid mode only when needed
# → Best for: Legacy pipelines, components without async support, gradual migration
Example: Legacy OpenAI Generator with Async Pipeline¶
from typing import AsyncGenerator
from haystack import AsyncPipeline
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator
from haystack.utils import Secret
from hayhooks import BasePipelineWrapper, get_last_user_message, async_streaming_generator
class LegacyOpenAIWrapper(BasePipelineWrapper):
    def setup(self) -> None:
        # OpenAIGenerator only supports sync streaming (legacy component)
        llm = OpenAIGenerator(
            api_key=Secret.from_env_var("OPENAI_API_KEY"),
            model="gpt-4o-mini"
        )
        prompt_builder = PromptBuilder(
            template="Answer this question: {{question}}"
        )

        self.pipeline = AsyncPipeline()
        self.pipeline.add_component("prompt", prompt_builder)
        self.pipeline.add_component("llm", llm)
        self.pipeline.connect("prompt.prompt", "llm.prompt")

    async def run_chat_completion_async(
        self, model: str, messages: list[dict], body: dict
    ) -> AsyncGenerator:
        question = get_last_user_message(messages)

        # Enable hybrid mode for OpenAIGenerator
        return async_streaming_generator(
            pipeline=self.pipeline,
            pipeline_run_args={"prompt": {"question": question}},
            allow_sync_streaming_callbacks=True,  # ✅ Handles sync component
        )
When to Use Each Mode¶
Use strict mode (default) when:
- Building new pipelines (recommended default)
- You want to ensure all components are async-compatible
- Performance is critical (pure async is ~1-2μs faster per chunk)
- You're building a production system with controlled dependencies
Use allow_sync_streaming_callbacks=True when:
- Working with legacy pipelines that use OpenAIGenerator or other sync-only components
- Deploying YAML pipelines with unknown/legacy component types
- Migrating old code that doesn't have async equivalents yet
- Third-party components without async support
Performance Considerations¶
- Pure async pipeline: No overhead
- Hybrid mode (auto-detected): Minimal overhead (~1-2 microseconds per streaming chunk for sync components)
- Network-bound operations: The overhead is negligible compared to LLM generation time
Best Practice
For new code: Use the default strict mode (allow_sync_streaming_callbacks=False) to ensure you're using proper async components.
For legacy/compatibility: Use allow_sync_streaming_callbacks=True when working with older pipelines or components that don't support async streaming yet.
Streaming from Multiple Components¶
Smart Streaming Behavior
By default, Hayhooks streams only the last streaming-capable component in your pipeline. This is usually what you want - the final output streaming to users.
For advanced use cases, you can control which components stream using the streaming_components parameter.
When your pipeline contains multiple components that support streaming (e.g., multiple LLMs), you can control which ones stream their outputs as the pipeline executes.
Default Behavior: Stream Only the Last Component¶
By default, only the last streaming-capable component will stream:
class MultiLLMWrapper(BasePipelineWrapper):
    def setup(self) -> None:
        from haystack.components.builders import ChatPromptBuilder
        from haystack.components.generators.chat import OpenAIChatGenerator
        from haystack.dataclasses import ChatMessage

        self.pipeline = Pipeline()

        # First LLM - initial answer
        self.pipeline.add_component(
            "prompt_1",
            ChatPromptBuilder(
                template=[
                    ChatMessage.from_system("You are a helpful assistant."),
                    ChatMessage.from_user("{{query}}")
                ]
            )
        )
        self.pipeline.add_component("llm_1", OpenAIChatGenerator(model="gpt-4o-mini"))

        # Second LLM - refines the answer using Jinja2 to access ChatMessage attributes
        self.pipeline.add_component(
            "prompt_2",
            ChatPromptBuilder(
                template=[
                    ChatMessage.from_system("You are a helpful assistant that refines responses."),
                    ChatMessage.from_user(
                        "Previous response: {{previous_response[0].text}}\n\nRefine this."
                    )
                ]
            )
        )
        self.pipeline.add_component("llm_2", OpenAIChatGenerator(model="gpt-4o-mini"))

        # Connect components - LLM 1's replies go directly to prompt_2
        self.pipeline.connect("prompt_1.prompt", "llm_1.messages")
        self.pipeline.connect("llm_1.replies", "prompt_2.previous_response")
        self.pipeline.connect("prompt_2.prompt", "llm_2.messages")

    def run_chat_completion(self, model: str, messages: list[dict], body: dict) -> Generator:
        question = get_last_user_message(messages)

        # By default, only llm_2 (the last streaming component) will stream
        return streaming_generator(
            pipeline=self.pipeline,
            pipeline_run_args={"prompt_1": {"query": question}}
        )
What happens: Only llm_2 (the last streaming-capable component) streams its responses token by token. The first LLM (llm_1) executes normally without streaming, and only the final refined output streams to the user.
Advanced: Stream Multiple Components with streaming_components¶
For advanced use cases where you want to see outputs from multiple components, use the streaming_components parameter:
def run_chat_completion(self, model: str, messages: list[dict], body: dict) -> Generator:
    question = get_last_user_message(messages)

    # Enable streaming for BOTH LLMs
    return streaming_generator(
        pipeline=self.pipeline,
        pipeline_run_args={"prompt_1": {"query": question}},
        streaming_components=["llm_1", "llm_2"]  # Stream both components
    )
What happens: Both LLMs stream their responses token by token. First you'll see the initial answer from llm_1 streaming, then the refined answer from llm_2 streaming.
You can also selectively enable streaming for specific components:
# Stream only the first LLM
streaming_components=["llm_1"]
# Stream only the second LLM (same as default)
streaming_components=["llm_2"]
# Stream ALL capable components (shorthand)
streaming_components="all"
# Stream ALL capable components (specific list)
streaming_components=["llm_1", "llm_2"]
Using the "all" Keyword¶
The "all" keyword is a convenient shorthand to enable streaming for all capable components:
return streaming_generator(
    pipeline=self.pipeline,
    pipeline_run_args={...},
    streaming_components="all"  # Enable all streaming components
)
This is equivalent to explicitly enabling every streaming-capable component in your pipeline.
Global Configuration via Environment Variable¶
You can set a global default using the HAYHOOKS_STREAMING_COMPONENTS environment variable. This applies to all pipelines unless overridden:
# Stream all components by default
export HAYHOOKS_STREAMING_COMPONENTS="all"
# Stream specific components (comma-separated)
export HAYHOOKS_STREAMING_COMPONENTS="llm_1,llm_2"
Priority order:
- Explicit streaming_components parameter (highest priority)
- HAYHOOKS_STREAMING_COMPONENTS environment variable
- Default behavior: stream only the last component (lowest priority)
When to Use Each Approach
- Default (last component only): Best for most use cases - users see only the final output
- "all" keyword: Useful for debugging, demos, or transparent multi-step workflows
- List of components: Enable multiple specific components by name
- Environment variable: For deployment-wide defaults without code changes
Async Streaming
All streaming_components options work identically with async_streaming_generator() for async pipelines.
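For instance, a minimal async sketch streaming both LLMs (assuming the two-LLM pipeline above, built as an AsyncPipeline):

async def run_chat_completion_async(self, model: str, messages: list[dict], body: dict) -> AsyncGenerator:
    question = get_last_user_message(messages)

    # Same streaming_components semantics as the sync version
    return async_streaming_generator(
        pipeline=self.pipeline,
        pipeline_run_args={"prompt_1": {"query": question}},
        streaming_components=["llm_1", "llm_2"],
    )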
YAML Pipeline Streaming Configuration¶
You can also specify streaming configuration in YAML pipeline definitions:
components:
  prompt_1:
    type: haystack.components.builders.PromptBuilder
    init_parameters:
      template: "Answer this question: {{query}}"
  llm_1:
    type: haystack.components.generators.OpenAIGenerator
  prompt_2:
    type: haystack.components.builders.PromptBuilder
    init_parameters:
      template: "Refine this response: {{previous_reply}}"
  llm_2:
    type: haystack.components.generators.OpenAIGenerator

connections:
  - sender: prompt_1.prompt
    receiver: llm_1.prompt
  - sender: llm_1.replies
    receiver: prompt_2.previous_reply
  - sender: prompt_2.prompt
    receiver: llm_2.prompt

inputs:
  query: prompt_1.query

outputs:
  replies: llm_2.replies

# Option 1: List specific components
streaming_components:
  - llm_1
  - llm_2

# Option 2: Stream all components
# streaming_components: all
YAML configuration follows the same priority rules: YAML setting > environment variable > default.
See the Multi-LLM Streaming Example for a complete working implementation.
Accessing Intermediate Outputs with include_outputs_from¶
Understanding Pipeline Outputs
By default, Haystack pipelines only return outputs from leaf components (final components with no downstream connections). Use include_outputs_from to also get outputs from intermediate components like retrievers, preprocessors, or parallel branches.
Streaming with on_pipeline_end Callback¶
For streaming responses, pass include_outputs_from to streaming_generator() or async_streaming_generator(), and use the on_pipeline_end callback to access intermediate outputs. For example:
def run_chat_completion(self, model: str, messages: list[dict], body: dict) -> Generator:
    question = get_last_user_message(messages)

    # Store retrieved documents for citations
    self.retrieved_docs = []

    def on_pipeline_end(result: dict[str, Any]) -> None:
        # Access intermediate outputs here
        if "retriever" in result:
            self.retrieved_docs = result["retriever"]["documents"]
            # Use for citations, logging, analytics, etc.

    return streaming_generator(
        pipeline=self.pipeline,
        pipeline_run_args={
            "retriever": {"query": question},
            "prompt_builder": {"query": question}
        },
        include_outputs_from={"retriever"},  # Make retriever outputs available
        on_pipeline_end=on_pipeline_end
    )
What happens: The on_pipeline_end callback receives both llm and retriever outputs in the result dict, allowing you to access retrieved documents alongside the generated response.
The same pattern works with async streaming:
async def run_chat_completion_async(self, model: str, messages: list[dict], body: dict) -> AsyncGenerator:
    question = get_last_user_message(messages)

    def on_pipeline_end(result: dict[str, Any]) -> None:
        if "retriever" in result:
            self.retrieved_docs = result["retriever"]["documents"]

    return async_streaming_generator(
        pipeline=self.async_pipeline,
        pipeline_run_args={
            "retriever": {"query": question},
            "prompt_builder": {"query": question}
        },
        include_outputs_from={"retriever"},
        on_pipeline_end=on_pipeline_end
    )
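Once the stream ends and the callback has fired, the stored documents can be used for whatever post-processing you need. A small illustrative sketch that logs the sources behind the streamed answer (assuming each document carries a url in its metadata):

from typing import Any
from hayhooks import log

def on_pipeline_end(result: dict[str, Any]) -> None:
    if "retriever" in result:
        self.retrieved_docs = result["retriever"]["documents"]
        # Illustrative: record which sources backed the streamed answer
        log.info(f"Answer grounded on {len(self.retrieved_docs)} documents")
        for doc in self.retrieved_docs:
            log.debug(f"Source: {doc.meta.get('url', 'unknown')}")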
Non-Streaming API¶
For non-streaming run_api or run_api_async endpoints, pass include_outputs_from directly to pipeline.run() or pipeline.run_async(). For example:
def run_api(self, query: str) -> dict:
    result = self.pipeline.run(
        data={"retriever": {"query": query}},
        include_outputs_from={"retriever"}
    )
    # Build custom response with both answer and sources
    return {"answer": result["llm"]["replies"][0], "sources": result["retriever"]["documents"]}
Same pattern for async:
async def run_api_async(self, query: str) -> dict:
    result = await self.async_pipeline.run_async(
        data={"retriever": {"query": query}},
        include_outputs_from={"retriever"}
    )
    return {"answer": result["llm"]["replies"][0], "sources": result["retriever"]["documents"]}
When to Use include_outputs_from
- Streaming: Pass include_outputs_from to streaming_generator() or async_streaming_generator() and use the on_pipeline_end callback to access the outputs
- Non-streaming: Pass include_outputs_from directly to pipeline.run() or pipeline.run_async()
- YAML Pipelines: Automatically handled - see YAML Pipeline Deployment
File Upload Support¶
Hayhooks can handle file uploads by adding a files parameter:
from fastapi import UploadFile
from typing import Optional
def run_api(self, files: Optional[list[UploadFile]] = None, query: str = "") -> str:
    if files:
        # Process uploaded files
        filenames = [f.filename for f in files if f.filename]
        file_contents = [f.file.read() for f in files]
        return f"Processed {len(files)} files: {', '.join(filenames)}"
    return "No files uploaded"
PipelineWrapper Development¶
During Development¶
Use the --overwrite flag for rapid development:
hayhooks pipeline deploy-files -n my_pipeline --overwrite ./path/to/pipeline
Development workflow:
- Make changes to your pipeline wrapper
- Redeploy with --overwrite
- Test the changes
- Repeat as needed
For even faster iterations¶
Combine --overwrite with --skip-saving-files:
hayhooks pipeline deploy-files -n my_pipeline --overwrite --skip-saving-files ./path/to/pipeline
This avoids writing files to disk and speeds up development.
Additional Dependencies¶
Your pipeline wrapper may require additional dependencies:
# pipeline_wrapper.py
import trafilatura  # Additional dependency

def run_api(self, urls: list[str], question: str) -> str:
    # Use the additional library
    content = trafilatura.fetch_url(urls[0])
    # ... rest of pipeline logic
Install dependencies:
pip install trafilatura
Debugging tip: Enable tracebacks to see full error messages:
HAYHOOKS_SHOW_TRACEBACKS=true hayhooks run
Error Handling¶
Implement proper error handling in production:
from hayhooks import log
from fastapi import HTTPException
class PipelineWrapper(BasePipelineWrapper):
    def setup(self) -> None:
        try:
            self.pipeline = self._create_pipeline()
        except Exception as e:
            log.error(f"Failed to initialize pipeline: {e}")
            raise

    def run_api(self, query: str) -> str:
        if not query or not query.strip():
            raise HTTPException(status_code=400, detail="Query cannot be empty")

        try:
            result = self.pipeline.run({"prompt": {"query": query}})
            return result["llm"]["replies"][0]
        except Exception as e:
            log.error(f"Pipeline execution failed: {e}")
            raise HTTPException(status_code=500, detail="Pipeline execution failed")
MCP Tool Configuration¶
Skip MCP Tool Listing¶
To skip MCP tool registration:
class PipelineWrapper(BasePipelineWrapper):
    skip_mcp = True  # This pipeline won't be listed as an MCP tool

    def setup(self) -> None:
        ...

    def run_api(self, ...) -> str:
        ...
MCP Tool Description¶
Use docstrings to provide MCP tool descriptions:
def run_api(self, urls: list[str], question: str) -> str:
    """
    Ask questions about website content.

    Args:
        urls: List of website URLs to analyze
        question: Question to ask about the content

    Returns:
        Answer to the question based on the website content
    """
    result = self.pipeline.run({"fetcher": {"urls": urls}, "prompt": {"query": question}})
    return result["llm"]["replies"][0]
Examples¶
For complete, working examples see:
- Chat with Website (Streaming) - Pipeline with streaming chat completion support
- Async Question Answer - Async pipeline patterns with streaming
- RAG Indexing & Query - Complete RAG system with file uploads and Elasticsearch
Next Steps¶
- YAML Pipeline Deployment - Alternative deployment method
- Agent Deployment - Deploy Haystack agents