PipelineWrapper¶
The `PipelineWrapper` class is the core component for deploying Haystack pipelines with Hayhooks. It provides maximum flexibility for pipeline initialization and execution.
Why PipelineWrapper?¶
The pipeline wrapper provides a flexible foundation for deploying Haystack pipelines, agents, or any other component by allowing users to:
- Choose their preferred initialization method (YAML files, Haystack templates, or inline code)
- Define custom execution logic with configurable inputs and outputs
- Optionally expose OpenAI-compatible chat endpoints with streaming support for integration with interfaces like open-webui
Basic Structure¶
```python
from pathlib import Path
from typing import List, Generator, Union, AsyncGenerator

from haystack import Pipeline, AsyncPipeline
from hayhooks import BasePipelineWrapper, get_last_user_message, streaming_generator, async_streaming_generator


class PipelineWrapper(BasePipelineWrapper):
    def setup(self) -> None:
        pipeline_yaml = (Path(__file__).parent / "pipeline.yml").read_text()
        self.pipeline = Pipeline.loads(pipeline_yaml)

    def run_api(self, urls: List[str], question: str) -> str:
        result = self.pipeline.run({"fetcher": {"urls": urls}, "prompt": {"query": question}})
        return result["llm"]["replies"][0]
```
Required Methods¶
setup()¶
The `setup()` method is called once when the pipeline is deployed. It should initialize the `self.pipeline` attribute as a Haystack pipeline.
```python
def setup(self) -> None:
    # Initialize your pipeline here
    pass
```
Initialization patterns:
1. Programmatic Initialization (Recommended)¶
Define your pipeline directly in code for maximum flexibility and control:
```python
def setup(self) -> None:
    from haystack import Pipeline
    from haystack.components.fetchers import LinkContentFetcher
    from haystack.components.converters import HTMLToDocument
    from haystack.components.builders import PromptBuilder
    from haystack.components.generators import OpenAIGenerator

    # Create components
    fetcher = LinkContentFetcher()
    converter = HTMLToDocument()
    prompt_builder = PromptBuilder(
        template="Based on: {{documents}}\nAnswer: {{query}}"
    )
    llm = OpenAIGenerator(model="gpt-4o-mini")

    # Build pipeline
    self.pipeline = Pipeline()
    self.pipeline.add_component("fetcher", fetcher)
    self.pipeline.add_component("converter", converter)
    self.pipeline.add_component("prompt", prompt_builder)
    self.pipeline.add_component("llm", llm)

    # Connect components
    self.pipeline.connect("fetcher.streams", "converter.sources")
    self.pipeline.connect("converter.documents", "prompt.documents")
    self.pipeline.connect("prompt.prompt", "llm.prompt")
```
Benefits of Programmatic Initialization
- Full IDE support with autocomplete and type checking
- Easier debugging and testing
- Better refactoring capabilities
- Dynamic component configuration based on runtime conditions (see the sketch below)
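As an example of the last point, the model can be chosen at deploy time from the environment. A minimal sketch (the `LLM_MODEL` variable name is purely illustrative):

```python
import os

from haystack import Pipeline
from haystack.components.generators import OpenAIGenerator

def setup(self) -> None:
    # Hypothetical env var: swap models per deployment without code changes
    model = os.environ.get("LLM_MODEL", "gpt-4o-mini")
    self.pipeline = Pipeline()
    self.pipeline.add_component("llm", OpenAIGenerator(model=model))
```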
2. Load from YAML¶
Load an existing YAML pipeline file:
```python
def setup(self) -> None:
    from pathlib import Path
    from haystack import Pipeline

    pipeline_yaml = (Path(__file__).parent / "pipeline.yml").read_text()
    self.pipeline = Pipeline.loads(pipeline_yaml)
```
When to use:
- You already have a YAML pipeline definition
- You want to version control pipeline structure separately
- You need to share pipeline definitions across different deployments
Consider YAML-only deployment
If your pipeline is simple and doesn't need custom logic, consider using YAML Pipeline Deployment instead, which doesn't require a wrapper at all.
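If you build pipelines in code, you can still generate the YAML file this pattern loads: Haystack pipelines serialize to YAML with `Pipeline.dumps()`. A minimal sketch (the component choice is illustrative):

```python
from pathlib import Path

from haystack import Pipeline
from haystack.components.builders import PromptBuilder

# Build the pipeline once, serialize it, and let setup() reload it later
pipeline = Pipeline()
pipeline.add_component("prompt", PromptBuilder(template="Answer: {{query}}"))
Path("pipeline.yml").write_text(pipeline.dumps())
```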
run_api()¶
The `run_api()` method is called for each API request to the `{pipeline_name}/run` endpoint.
```python
def run_api(self, urls: List[str], question: str) -> str:
    result = self.pipeline.run({"fetcher": {"urls": urls}, "prompt": {"query": question}})
    return result["llm"]["replies"][0]
```
Key features:
- Flexible Input: You can define any input arguments you need
- Automatic Validation: Hayhooks creates Pydantic models for request validation
- Type Safety: Use proper type hints for better validation
- Error Handling: Implement proper error handling for production use
Input argument rules:
- Arguments must be JSON-serializable
- Use proper type hints (`List[str]`, `Optional[int]`, etc.)
- Default values are supported
- Complex types like `Dict[str, Any]` are allowed
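Putting these rules together, a signature can mix required, defaulted, and optional arguments; Hayhooks derives the request validation model from the type hints. A sketch (the `top_k` and `filters` parameters are illustrative and unused in the body):

```python
from typing import Any, Dict, List, Optional

def run_api(
    self,
    urls: List[str],                            # required JSON array of strings
    question: str,                              # required string
    top_k: int = 5,                             # optional, defaults to 5
    filters: Optional[Dict[str, Any]] = None,   # optional nested object
) -> str:
    result = self.pipeline.run({"fetcher": {"urls": urls}, "prompt": {"query": question}})
    return result["llm"]["replies"][0]
```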
Optional Methods¶
run_api_async()¶
The asynchronous version of `run_api()`, for better performance under high load.
```python
async def run_api_async(self, urls: List[str], question: str) -> str:
    result = await self.pipeline.run_async({"fetcher": {"urls": urls}, "prompt": {"query": question}})
    return result["llm"]["replies"][0]
```
When to use `run_api_async()`:
- Working with `AsyncPipeline` instances (see the sketch below)
- Handling many concurrent requests
- Integrating with async-compatible components
- Better performance for I/O-bound operations
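When pairing `run_api_async()` with an `AsyncPipeline`, only the pipeline class changes; `setup()` itself stays synchronous. A minimal sketch:

```python
from pathlib import Path

from haystack import AsyncPipeline

def setup(self) -> None:
    # AsyncPipeline exposes run_async(), which run_api_async() awaits
    pipeline_yaml = (Path(__file__).parent / "pipeline.yml").read_text()
    self.pipeline = AsyncPipeline.loads(pipeline_yaml)
```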
run_chat_completion()¶
Enable OpenAI-compatible chat endpoints for integration with chat interfaces.
```python
def run_chat_completion(self, model: str, messages: List[dict], body: dict) -> Union[str, Generator]:
    question = get_last_user_message(messages)
    result = self.pipeline.run({"prompt": {"query": question}})
    return result["llm"]["replies"][0]
```
Fixed signature:
- `model`: The pipeline name
- `messages`: OpenAI-format message list
- `body`: Full request body (for additional parameters)
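To stream tokens instead of returning a single string, the method can return `streaming_generator(...)` (imported in the basic structure above), mirroring the async variant shown next:

```python
from typing import Generator, List, Union

from hayhooks import get_last_user_message, streaming_generator

def run_chat_completion(self, model: str, messages: List[dict], body: dict) -> Union[str, Generator]:
    question = get_last_user_message(messages)
    # Returning the generator lets Hayhooks stream OpenAI-compatible chunks
    return streaming_generator(
        pipeline=self.pipeline,
        pipeline_run_args={"prompt": {"query": question}},
    )
```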
run_chat_completion_async()¶
Async version of chat completion with streaming support.
```python
async def run_chat_completion_async(self, model: str, messages: List[dict], body: dict) -> AsyncGenerator:
    question = get_last_user_message(messages)
    return async_streaming_generator(
        pipeline=self.pipeline,
        pipeline_run_args={"prompt": {"query": question}},
    )
```
File Upload Support¶
Hayhooks can handle file uploads by adding a `files` parameter:
```python
from fastapi import UploadFile
from typing import Optional, List

def run_api(self, files: Optional[List[UploadFile]] = None, query: str = "") -> str:
    if files:
        # Process uploaded files
        filenames = [f.filename for f in files if f.filename]
        file_contents = [f.file.read() for f in files]
        return f"Processed {len(files)} files: {', '.join(filenames)}"
    return "No files uploaded"
```
PipelineWrapper Development¶
During Development¶
Use the `--overwrite` flag for rapid development:

```bash
hayhooks pipeline deploy-files -n my_pipeline --overwrite ./path/to/pipeline
```
Development workflow:
- Make changes to your pipeline wrapper
- Redeploy with `--overwrite`
- Test the changes
- Repeat as needed
For even faster iterations¶
Combine `--overwrite` with `--skip-saving-files`:

```bash
hayhooks pipeline deploy-files -n my_pipeline --overwrite --skip-saving-files ./path/to/pipeline
```
This avoids writing files to disk and speeds up development.
Additional Dependencies¶
Your pipeline wrapper may require additional dependencies:
```python
# pipeline_wrapper.py
from typing import List

import trafilatura  # Additional dependency

def run_api(self, urls: List[str], question: str) -> str:
    # Use the additional library to download the page content
    content = trafilatura.fetch_url(urls[0])
    # ... rest of pipeline logic
```
Install dependencies:

```bash
pip install trafilatura
```
Debugging tip: Enable tracebacks to see full error messages:
```bash
HAYHOOKS_SHOW_TRACEBACKS=true hayhooks run
```
Error Handling¶
Implement proper error handling in production:
```python
from fastapi import HTTPException
from hayhooks import BasePipelineWrapper, log

class PipelineWrapper(BasePipelineWrapper):
    def setup(self) -> None:
        try:
            self.pipeline = self._create_pipeline()
        except Exception as e:
            log.error(f"Failed to initialize pipeline: {e}")
            raise

    def run_api(self, query: str) -> str:
        if not query or not query.strip():
            raise HTTPException(status_code=400, detail="Query cannot be empty")

        try:
            result = self.pipeline.run({"prompt": {"query": query}})
            return result["llm"]["replies"][0]
        except Exception as e:
            log.error(f"Pipeline execution failed: {e}")
            raise HTTPException(status_code=500, detail="Pipeline execution failed")
```
MCP Tool Configuration¶
Skip MCP Tool Listing¶
To skip MCP tool registration:
```python
class PipelineWrapper(BasePipelineWrapper):
    skip_mcp = True  # This pipeline won't be listed as an MCP tool

    def setup(self) -> None:
        ...

    def run_api(self, ...) -> str:
        ...
```
MCP Tool Description¶
Use docstrings to provide MCP tool descriptions:
```python
def run_api(self, urls: List[str], question: str) -> str:
    """
    Ask questions about website content.

    Args:
        urls: List of website URLs to analyze
        question: Question to ask about the content

    Returns:
        Answer to the question based on the website content
    """
    result = self.pipeline.run({"fetcher": {"urls": urls}, "prompt": {"query": question}})
    return result["llm"]["replies"][0]
```
Examples¶
For complete, working examples see:
- Chat with Website (Streaming) - Pipeline with streaming chat completion support
- Async Question Answer - Async pipeline patterns with streaming
- RAG Indexing & Query - Complete RAG system with file uploads and Elasticsearch
Next Steps¶
- YAML Pipeline Deployment - Alternative deployment method
- Agent Deployment - Deploy Haystack agents