Pipeline
Overview
The Pipeline is a key component of the EZStitcher architecture. For an overview of the complete architecture, see Architecture Overview.
A Pipeline is a sequence of processing steps that are executed in order. It provides:
Step management (adding, removing, reordering)
Context passing between steps
Input/output directory management
Automatic directory resolution between steps
Creating a Pipeline
The recommended way to create a pipeline is to provide all steps at once during initialization:
from ezstitcher.core.pipeline import Pipeline
from ezstitcher.core.steps import Step, PositionGenerationStep
from ezstitcher.core.image_processor import ImageProcessor as IP
# Create a pipeline with all steps at once (recommended approach)
pipeline = Pipeline(
    input_dir=orchestrator.workspace_path,  # Pipeline input directory
    output_dir=orchestrator.plate_path.parent / f"{orchestrator.plate_path.name}_stitched",  # Pipeline output directory
    steps=[
        Step(
            func=(IP.create_projection, {'method': 'max_projection'}),
            variable_components=['z_index'],
            input_dir=orchestrator.workspace_path
        ),
        Step(
            func=IP.stack_percentile_normalize
        ),
        PositionGenerationStep()
    ],
    name="My Processing Pipeline"
)
Alternatively, you can add steps one by one using the add_step() method:
# Create an empty pipeline
pipeline = Pipeline(name="My Processing Pipeline")
# Add steps one by one
pipeline.add_step(Step(name="Z-Stack Flattening",
                       func=(IP.create_projection, {'method': 'max_projection'}),
                       variable_components=['z_index'],
                       input_dir=orchestrator.workspace_path))
pipeline.add_step(Step(name="Image Enhancement",
                       func=IP.stack_percentile_normalize))
pipeline.add_step(PositionGenerationStep(name="Generate Positions"))
The first approach (providing all steps at once) is recommended for most use cases as it’s more concise and easier to understand. The second approach (adding steps one by one) is useful for dynamic scenarios where steps need to be added conditionally or configured based on the output of previous steps.
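The dynamic case can be sketched without EZStitcher installed. The example below uses a hypothetical stand-in, StepSpec, in place of the real Step classes, and the flags has_z_stack and normalize are illustrative assumptions rather than EZStitcher options. It only shows the pattern of assembling the step list conditionally before the pipeline is built.

```python
from dataclasses import dataclass, field


@dataclass
class StepSpec:
    """Hypothetical stand-in for a Step; not part of EZStitcher."""
    name: str
    options: dict = field(default_factory=dict)


def build_steps(has_z_stack: bool, normalize: bool) -> list:
    """Assemble the step list conditionally, in execution order."""
    steps = []
    if has_z_stack:
        # Only flatten when the plate actually contains Z-stacks.
        steps.append(StepSpec("Z-Stack Flattening", {"method": "max_projection"}))
    if normalize:
        steps.append(StepSpec("Image Enhancement"))
    # Position generation is always the final step in this sketch.
    steps.append(StepSpec("Generate Positions"))
    return steps


step_names = [s.name for s in build_steps(has_z_stack=False, normalize=True)]
print(step_names)
```

With the real classes, a list built this way could be passed as steps at initialization or fed to add_step() one item at a time.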
Pipeline Parameters
For detailed API documentation, see Pipeline.
A Pipeline accepts the following parameters:
name: A human-readable name for the pipeline (optional but recommended for logging)
steps: A list of Step objects to execute in sequence
input_dir: The directory containing input images (typically orchestrator.workspace_path)
output_dir: The directory where final output will be saved
well_filter: List of wells to process (optional, can be overridden by the orchestrator)
Each parameter plays an important role:
name helps identify the pipeline in logs and debugging output
steps defines the sequence of operations to perform
input_dir establishes the initial input directory for the pipeline
output_dir establishes the final output directory, typically used by the last step in the pipeline
well_filter allows for selective processing of specific wells
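The output_dir expression used in the examples on this page places the results in a sibling directory of the plate. A minimal sketch of how that expression evaluates, assuming plate_path behaves like a pathlib path (the plate location below is hypothetical):

```python
from pathlib import PurePosixPath

# Hypothetical plate location, used only for illustration.
plate_path = PurePosixPath("/data/plates/plate1")

# Same expression as in the Pipeline example: a sibling directory
# named after the plate with a "_stitched" suffix.
output_dir = plate_path.parent / f"{plate_path.name}_stitched"
print(output_dir)  # /data/plates/plate1_stitched
```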
Running a Pipeline
A pipeline can be run directly, but it’s typically run through the orchestrator:
# Run through the orchestrator (recommended)
success = orchestrator.run(pipelines=[pipeline])
if success:
    print("Pipeline completed successfully!")
else:
    print("Pipeline failed. Check logs for details.")
# Run directly (advanced usage)
results = pipeline.run(
    input_dir="path/to/input",
    output_dir="path/to/output",
    well_filter=["A01", "B02"],
    orchestrator=orchestrator  # Required for microscope handler access
)
Running through the orchestrator is recommended because it:
Handles multithreaded execution across wells
Provides plate-specific services to the pipeline
Manages error handling and logging
Ensures proper directory resolution
For detailed information on how the orchestrator runs pipelines, see Running Pipelines.
Pipeline Context
When a pipeline runs, it creates a ProcessingContext that is passed from step to step. This context holds:
Input/output directories
Well filter
Configuration
Results from previous steps
Reference to the orchestrator
This allows steps to communicate and build on each other’s results. The context is created at the beginning of pipeline execution and updated by each step as it runs.
The flow of context between steps in a pipeline.
The context serves as a communication mechanism between:
The orchestrator and the pipeline
The pipeline and its steps
Different steps within the pipeline
For example, steps like PositionGenerationStep use the orchestrator reference in the context to access plate-specific services. For more information on the relationship between the orchestrator and pipeline, see Orchestrator-Pipeline Relationship.
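The context-passing idea can be illustrated with a toy model. ToyContext below is a hypothetical stand-in for ProcessingContext that carries the fields listed above. The step functions and the run_steps helper are assumptions made for this sketch and do not reflect EZStitcher's internals.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class ToyContext:
    """Hypothetical stand-in for ProcessingContext."""
    input_dir: str
    output_dir: str
    well_filter: Optional[list] = None
    config: dict = field(default_factory=dict)
    results: dict = field(default_factory=dict)
    orchestrator: Any = None


def flatten(ctx: ToyContext) -> ToyContext:
    # Record a result that later steps can look up.
    ctx.results["flatten"] = f"projected images from {ctx.input_dir}"
    return ctx


def generate_positions(ctx: ToyContext) -> ToyContext:
    # A later step can read what an earlier step recorded.
    assert "flatten" in ctx.results
    ctx.results["positions"] = f"positions for wells {ctx.well_filter}"
    return ctx


def run_steps(steps: list, ctx: ToyContext) -> ToyContext:
    # The same context object is handed from one step to the next.
    for step in steps:
        ctx = step(ctx)
    return ctx


final_ctx = run_steps(
    [flatten, generate_positions],
    ToyContext(input_dir="in", output_dir="out", well_filter=["A01"]),
)
print(sorted(final_ctx.results))
```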
Multithreaded Processing
Pipelines can be run in a multithreaded environment through the orchestrator:
# Create configuration with a worker count and custom directory suffixes
config = PipelineConfig(
    num_workers=4,                   # Number of worker threads
    out_dir_suffix="_output",        # For regular steps
    positions_dir_suffix="_pos",     # For position generation
    stitched_dir_suffix="_stitched"  # For stitching
)
# Create orchestrator with multithreading
orchestrator = PipelineOrchestrator(
    config=config,
    plate_path=plate_path
)
# Run the pipeline with multithreading
# Each well is processed in its own worker thread, up to num_workers at a time
orchestrator.run(pipelines=[pipeline])
The number of worker threads (num_workers) determines how many wells can be processed concurrently. Raising it can shorten total run time when a plate contains many wells.
Important
Multithreading happens at the well level, not the step level. Each well is processed in a separate thread, but steps within a pipeline are executed sequentially for each well.
Key points about multithreaded processing:
The orchestrator creates a thread pool with num_workers threads
Each well is assigned to a thread from the pool
All pipelines for a well are executed in the same thread
Steps within a pipeline are executed sequentially
This approach provides good performance while avoiding race conditions and ensuring that steps have access to the results of previous steps.
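The well-level threading model described above can be sketched with the standard library. This document does not show how the orchestrator builds its pool, so the use of ThreadPoolExecutor and the helper names run_pipeline, process_well, and run_all are assumptions for illustration only.

```python
from concurrent.futures import ThreadPoolExecutor


def run_pipeline(pipeline_name: str, well: str) -> str:
    """Hypothetical placeholder for running one pipeline on one well."""
    return f"{pipeline_name}:{well}"


def process_well(well: str, pipeline_names: list) -> list:
    # All pipelines for a well run sequentially in the same thread.
    return [run_pipeline(name, well) for name in pipeline_names]


def run_all(wells: list, pipeline_names: list, num_workers: int) -> dict:
    # One task per well; at most num_workers wells run at once.
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        futures = {
            well: pool.submit(process_well, well, pipeline_names)
            for well in wells
        }
        return {well: future.result() for well, future in futures.items()}


results = run_all(["A01", "A02", "B01"], ["positions", "assembly"], num_workers=2)
print(results["A01"])
```

Because each well is a single task, the order of pipelines within a well is preserved regardless of how many workers are available.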
For more information on how the orchestrator manages multithreaded execution, see Running Pipelines.
Directory Resolution
EZStitcher automatically resolves the input and output directories of each step in a pipeline, minimizing the need for manual directory management. For detailed information about directory structure, see Directory Structure.
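To give a feel for what chained directory resolution can look like, here is a small sketch. The rules it applies are assumptions chosen for illustration and are listed in the docstring; the actual rules are described in Directory Structure. The function resolve_dirs and the "_out" suffix are hypothetical.

```python
from pathlib import PurePosixPath


def resolve_dirs(step_dirs, pipeline_input, pipeline_output, suffix="_out"):
    """Fill in missing (input_dir, output_dir) pairs for a chain of steps.

    Assumed rules for this sketch only:
    - the first step reads from the pipeline input directory;
    - every later step reads from the previous step's output;
    - the last step writes to the pipeline output directory;
    - other steps write to "<input name><suffix>" next to their input.
    """
    resolved = []
    previous_output = PurePosixPath(pipeline_input)
    last = len(step_dirs) - 1
    for index, (input_dir, output_dir) in enumerate(step_dirs):
        input_dir = PurePosixPath(input_dir) if input_dir else previous_output
        if output_dir:
            output_dir = PurePosixPath(output_dir)
        elif index == last:
            output_dir = PurePosixPath(pipeline_output)
        else:
            output_dir = input_dir.with_name(input_dir.name + suffix)
        resolved.append((str(input_dir), str(output_dir)))
        previous_output = output_dir
    return resolved


# Three steps with no explicit directories (None means "resolve for me").
dirs = resolve_dirs(
    [(None, None), (None, None), (None, None)],
    pipeline_input="/data/plate1_workspace",
    pipeline_output="/data/plate1_stitched",
)
for pair in dirs:
    print(pair)
```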
Saving and Loading Pipelines
While EZStitcher doesn’t have built-in functions for saving and loading pipelines, you can easily save your pipeline configurations as Python scripts:
# save_pipeline.py
from ezstitcher.core.config import PipelineConfig  # import path assumed; adjust to your installation
from ezstitcher.core.pipeline import Pipeline
from ezstitcher.core.pipeline_orchestrator import PipelineOrchestrator

def create_basic_pipeline(plate_path, num_workers=1):
    """Create a basic processing pipeline."""
    # Create configuration
    config = PipelineConfig(
        num_workers=num_workers
    )
    # Create orchestrator
    orchestrator = PipelineOrchestrator(
        config=config,
        plate_path=plate_path
    )
    # Create pipeline
    pipeline = Pipeline(
        input_dir=orchestrator.workspace_path,
        output_dir=orchestrator.plate_path.parent / f"{orchestrator.plate_path.name}_stitched",
        steps=[
            # Pipeline steps...
        ],
        name="Basic Processing Pipeline"
    )
    return orchestrator, pipeline
This approach allows you to:
Parameterize your pipelines
Reuse pipeline configurations across projects
Version control your pipeline configurations
Best Practices
For comprehensive best practices on using pipelines effectively, see Pipeline Best Practices in the Best Practices guide.
Pipeline Factory Integration
While you can create pipelines manually as shown in this document, EZStitcher also provides the Pipeline Factory for creating pre-configured pipelines for common workflows:
from ezstitcher.core import AutoPipelineFactory
from ezstitcher.core.pipeline_orchestrator import PipelineOrchestrator
# Create orchestrator
orchestrator = PipelineOrchestrator(plate_path=plate_path)
# Create a factory with default settings
factory = AutoPipelineFactory(
    input_dir=orchestrator.workspace_path,
    normalize=True  # Apply normalization (default)
)
# Create the pipelines
pipelines = factory.create_pipelines()
# Run the pipelines
orchestrator.run(pipelines=pipelines)
The AutoPipelineFactory creates two pipelines:
Position Generation Pipeline: Creates position files for stitching
Image Assembly Pipeline: Stitches images using the position files
For more information on pipeline factories, see Pipeline Factory.