Component API

Component

Bases: Protocol

Note this is only used by type checking tools.

In order to implement the Component protocol, custom components need to have a run method. The signature of the method and its return value won't be checked, i.e. classes with the following methods:

def run(self, param: str) -> Dict[str, Any]:
    ...

and

def run(self, **kwargs):
    ...

will both be considered as respecting the protocol. This makes the type checking much weaker, but other parts of the code ensure that it is dealing with actual Components.

The protocol is runtime checkable so it'll be possible to assert:

isinstance(MyComponent, Component)
Source code in canals/component/component.py
@runtime_checkable
class Component(Protocol):
    """
    Note this is only used by type checking tools.

    In order to implement the `Component` protocol, custom components need to
    have a `run` method. The signature of the method and its return value
    won't be checked, i.e. classes with the following methods:

        def run(self, param: str) -> Dict[str, Any]:
            ...

    and

        def run(self, **kwargs):
            ...

    will be both considered as respecting the protocol. This makes the type
    checking much weaker, but we have other places where we ensure code is
    dealing with actual Components.

    The protocol is runtime checkable so it'll be possible to assert:

        isinstance(MyComponent, Component)
    """

    def run(self, *args: Any, **kwargs: Any):  # pylint: disable=missing-function-docstring
        ...
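
To make the weak runtime check concrete, here is a minimal sketch. It redefines a stand-in `Component` protocol locally so that it runs without Canals installed; `Greeter`, `LooseRunner` and `NotAComponent` are hypothetical classes invented for illustration. The check only looks for the presence of a `run` attribute and never compares signatures.

```python
from typing import Any, Dict, Protocol, runtime_checkable


@runtime_checkable
class Component(Protocol):
    # Stand-in mirroring the protocol shown above; not imported from canals.
    def run(self, *args: Any, **kwargs: Any):
        ...


class Greeter:
    # Hypothetical component with a typed `run` signature.
    def run(self, name: str) -> Dict[str, Any]:
        return {"greeting": f"Hello, {name}!"}


class LooseRunner:
    # Hypothetical component: a different signature still satisfies the protocol.
    def run(self, **kwargs):
        return dict(kwargs)


class NotAComponent:
    # No `run` method, so the runtime check fails.
    def execute(self):
        return None


checks = {
    "Greeter": isinstance(Greeter(), Component),
    "LooseRunner": isinstance(LooseRunner(), Component),
    "NotAComponent": isinstance(NotAComponent(), Component),
}
print(checks)
```

Because only the attribute name is inspected, a passing `isinstance` check says nothing about whether `run` accepts the inputs a pipeline will send it.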

Attributes:

component: Marks a class as a component. Any class decorated with `@component` can be used by a Pipeline.

All components must follow the contract below. This docstring is the source of truth for the component contract.


@component decorator

All component classes must be decorated with the @component decorator. This allows Canals to discover them.


__init__(self, **kwargs)

Optional method.

Components may have an __init__ method where they define:

  • self.init_parameters = {same parameters that the __init__ method received}: In this dictionary you can store any state the component needs to persist when it is saved. These values are passed to the __init__ method of a new instance when the pipeline is loaded. By default, the @component decorator saves the arguments automatically; however, if a component sets its own init_parameters manually in __init__(), those values are used instead. Note: all of the values contained here must be JSON serializable. Serialize them manually if needed.

Components should take only "basic" Python types as parameters of their __init__ function, or iterables and dictionaries containing only such values. Anything else (objects, functions, etc.) will raise an exception at init time. If such values are needed, consider serializing them to a string.
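
The save and load cycle described above can be sketched as follows. This is an illustration under assumptions, not the library's implementation: `Repeater` is a hypothetical class, the `@component` decorator is omitted so the snippet runs without Canals, and `json` stands in for whatever serialization the pipeline actually uses.

```python
import json


class Repeater:
    # Hypothetical component; the @component decorator is omitted on purpose.
    def __init__(self, times: int = 2, separator: str = " "):
        # Record exactly what __init__ received, using only JSON-serializable values.
        self.init_parameters = {"times": times, "separator": separator}
        self.times = times
        self.separator = separator


original = Repeater(times=3, separator="-")

# Saving: the parameters survive a JSON round trip because they are basic types.
saved = json.dumps(original.init_parameters)

# Loading: a new instance is built by passing the stored values back to __init__.
restored = Repeater(**json.loads(saved))
print(restored.init_parameters)
```

Keeping `init_parameters` limited to basic types is what makes the round trip lossless; a function or arbitrary object in that dictionary would fail at `json.dumps`.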

(TODO explain how to use classes and functions in init. In the meantime see test/components/test_accumulate.py)

The __init__ method must be extremely lightweight, because it is called frequently during the construction and validation of the pipeline. If a component has some heavy state to initialize (models, backends, etc.), refer to the warm_up() method.


warm_up(self)

Optional method.

This method is called by Pipeline before the graph execution. Make sure to avoid double-initializations, because Pipeline will not keep track of which components it called warm_up() on.
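
One way to guard against double initialization is to make warm_up() idempotent. The sketch below assumes a hypothetical `Embedder` class, with the decorator omitted and a dictionary standing in for a heavy model. The `load_count` attribute exists only to show that the expensive step runs once.

```python
class Embedder:
    # Hypothetical component; the @component decorator is omitted on purpose.
    def __init__(self, model_name: str = "tiny-model"):
        # Keep __init__ cheap: store configuration only, load nothing here.
        self.model_name = model_name
        self.model = None
        self.load_count = 0

    def warm_up(self):
        # Return early if the heavy state is already in place.
        if self.model is not None:
            return
        self.model = self._load_model()

    def _load_model(self):
        # Stand-in for an expensive load (model weights, backend connection, ...).
        self.load_count += 1
        return {"name": self.model_name}


embedder = Embedder()
embedder.warm_up()
embedder.warm_up()  # Second call is a no-op.
print(embedder.load_count)
```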


run(self, data)

Mandatory method.

This is the method where the main functionality of the component should be carried out. It's called by Pipeline.run().

When the component should run, Pipeline will call this method with an instance of the dataclass returned by the method decorated with @component.input. This dataclass contains:

  • all the input values coming from other components connected to it,
  • for any input that is missing, the corresponding value defined in self.defaults, if one exists.

run() must return a single instance of the dataclass declared through the method decorated with @component.output.
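
The shape of this contract can be sketched with plain dataclasses. Everything here is an assumption made for illustration: `AddFixedValue`, `AddInput`, `AddOutput` and the `build_input` helper are hypothetical, the `@component`, `@component.input` and `@component.output` decorators are omitted so the snippet runs without Canals, and `build_input` only imitates the fallback to `self.defaults` that the text attributes to Pipeline.

```python
from dataclasses import dataclass


@dataclass
class AddInput:
    # Hypothetical input dataclass: one field per input value.
    value: int
    add: int


@dataclass
class AddOutput:
    # Hypothetical output dataclass returned by run().
    value: int


class AddFixedValue:
    # Hypothetical component; decorators are omitted on purpose.
    def __init__(self, add: int = 1):
        self.defaults = {"add": add}

    def run(self, data: AddInput) -> AddOutput:
        # run() receives one input dataclass and returns one output dataclass.
        return AddOutput(value=data.value + data.add)


def build_input(component: AddFixedValue, **received: int) -> AddInput:
    # Stand-in for the pipeline: received values win, defaults fill the gaps.
    return AddInput(**{**component.defaults, **received})


adder = AddFixedValue(add=10)
from_default = adder.run(build_input(adder, value=5))
overridden = adder.run(build_input(adder, value=5, add=1))
print(from_default, overridden)
```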

ComponentMeta

Bases: type

Source code in canals/component/component.py
class ComponentMeta(type):
    def __call__(cls, *args, **kwargs):
        """
        This method is called when clients instantiate a Component and
        runs before __new__ and __init__.
        """
        # This will call __new__ then __init__, giving us back the Component instance
        instance = super().__call__(*args, **kwargs)

        # Before returning, we have the chance to modify the newly created
        # Component instance, so we take the chance and set up the I/O sockets

        # If `component.set_output_types()` was called in the component constructor,
        # `__canals_output__` is already populated, no need to do anything.
        if not hasattr(instance, "__canals_output__"):
            # If that's not the case, we need to populate `__canals_output__`
            #
            # If the `run` method was decorated, it has a `_output_types_cache` field assigned
            # that stores the output specification.
            # We deepcopy the content of the cache to transfer ownership from the class method
            # to the actual instance, so that different instances of the same class won't share this data.
            instance.__canals_output__ = deepcopy(getattr(instance.run, "_output_types_cache", {}))

        # Create the sockets if set_input_types() wasn't called in the constructor.
        # If it was called and there are some parameters also in the `run()` method, these take precedence.
        if not hasattr(instance, "__canals_input__"):
            instance.__canals_input__ = {}
        run_signature = inspect.signature(getattr(cls, "run"))
        for param in list(run_signature.parameters)[1:]:  # First is 'self' and it doesn't matter.
            if run_signature.parameters[param].kind == inspect.Parameter.POSITIONAL_OR_KEYWORD:  # ignore `**kwargs`
                instance.__canals_input__[param] = InputSocket(
                    name=param,
                    type=run_signature.parameters[param].annotation,
                    is_mandatory=run_signature.parameters[param].default == inspect.Parameter.empty,
                )
        return instance
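
The input-socket half of this logic can be tried in isolation. The following is a simplified sketch, not the library's code: `SocketMeta` and `Threshold` are hypothetical names, and `InputSocket` is a local stand-in that carries only the three fields used above. It shows how a metaclass `__call__` can inspect the `run` signature after the instance is built.

```python
import inspect
from dataclasses import dataclass
from typing import Any


@dataclass
class InputSocket:
    # Simplified stand-in for the real InputSocket.
    name: str
    type: Any
    is_mandatory: bool


class SocketMeta(type):
    # Hypothetical metaclass mirroring the input-socket part of ComponentMeta.
    def __call__(cls, *args, **kwargs):
        # type.__call__ runs __new__ and then __init__, returning the instance.
        instance = super().__call__(*args, **kwargs)
        if not hasattr(instance, "__canals_input__"):
            instance.__canals_input__ = {}
        signature = inspect.signature(cls.run)
        # Skip the first parameter, which is `self`.
        for name, param in list(signature.parameters.items())[1:]:
            # Only plain parameters become sockets; *args and **kwargs are ignored.
            if param.kind == inspect.Parameter.POSITIONAL_OR_KEYWORD:
                instance.__canals_input__[name] = InputSocket(
                    name=name,
                    type=param.annotation,
                    is_mandatory=param.default is inspect.Parameter.empty,
                )
        return instance


class Threshold(metaclass=SocketMeta):
    # Hypothetical component with one mandatory and one optional input.
    def run(self, value: int, limit: int = 10, **kwargs):
        return {"above": value > limit}


sockets = Threshold().__canals_input__
print(sockets)
```

A parameter without a default becomes a mandatory socket, so the `run` signature alone is enough to tell which inputs must be connected.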

__call__(*args, **kwargs)

This method is called when clients instantiate a Component and runs before __new__ and __init__.
