Engine Processor Implementations

Runtime processor classes and processor registry helpers.

Plugin processors inherit from Processor and override one or more callback methods: process_before_batch, process_after_batch, or process_after_generation.

For user-facing processor config objects, see processor configurations.

Base Contract

Processor

Bases: ConfigurableTask[TaskConfigT], ABC

Base class for dataset processors.

Processors transform data at different stages of the generation pipeline. Override the callback methods for the stages you want to handle.
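As a rough illustration of the override pattern described above, the sketch below uses a stand-in `Processor` class that mirrors only the three documented callbacks. It does not import the real base class. `DeduplicateProcessor`, the `Rows` alias, and the list-of-dicts data shape are hypothetical; the concrete type behind `DataT` is not shown on this page.

```python
from __future__ import annotations

from typing import Any

# Hypothetical stand-in for DataT; the real data type is an assumption here.
Rows = list[dict[str, Any]]


class Processor:
    """Stand-in mirroring the documented callbacks; not the real base class."""

    def process_before_batch(self, data: Rows) -> Rows:
        return data

    def process_after_batch(self, data: Rows, *, current_batch_number: int | None) -> Rows:
        return data

    def process_after_generation(self, data: Rows) -> Rows:
        return data


class DeduplicateProcessor(Processor):
    """Hypothetical plugin that only handles the AFTER_GENERATION stage."""

    def process_after_generation(self, data: Rows) -> Rows:
        seen: set[tuple] = set()
        unique: Rows = []
        for row in data:
            # Build a hashable key from the row's items (values must be hashable).
            key = tuple(sorted(row.items()))
            if key not in seen:
                seen.add(key)
                unique.append(row)
        return unique


processor = DeduplicateProcessor()
batch = [{"id": 1}, {"id": 1}, {"id": 2}]

# Stages that are not overridden fall back to the pass-through default.
untouched = processor.process_after_batch(batch, current_batch_number=0)
final = processor.process_after_generation(batch)
```

Only the stage the subclass cares about is overridden; the other two callbacks return their input unchanged.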

Methods:

Name                      Description
implements                Check if subclass overrides a callback method.
process_after_batch       Called at POST_BATCH stage after each batch is generated.
process_after_generation  Called at AFTER_GENERATION stage on the final combined dataset.
process_before_batch      Called at PRE_BATCH stage before each batch is generated.

Source code in packages/data-designer-engine/src/data_designer/engine/configurable_task.py
def __init__(self, config: TaskConfigT, resource_provider: ResourceProvider):
    self._config = self.get_config_type().model_validate(config)
    self._resource_provider = resource_provider
    self._validate()
    self._initialize()

implements(method_name)

Check if subclass overrides a callback method.

Source code in packages/data-designer-engine/src/data_designer/engine/processing/processors/base.py
def implements(self, method_name: str) -> bool:
    """Check if subclass overrides a callback method."""
    return getattr(type(self), method_name) is not getattr(Processor, method_name)
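The identity check above can be tried in isolation. The sketch below copies the `implements` body into a stand-in `Processor` (not the real class) and adds a hypothetical subclass, `AfterBatchOnly`, to show which callbacks are reported as overridden. Comparing attributes looked up on the classes, rather than on the instance, matters because each attribute access on an instance creates a fresh bound-method object, so an identity test there would not be reliable.

```python
class Processor:
    """Stand-in base class with the documented callbacks as pass-throughs."""

    def implements(self, method_name: str) -> bool:
        # A subclass that does not override the method inherits the very same
        # function object, so the identity comparison reports False for it.
        return getattr(type(self), method_name) is not getattr(Processor, method_name)

    def process_before_batch(self, data):
        return data

    def process_after_batch(self, data, *, current_batch_number):
        return data

    def process_after_generation(self, data):
        return data


class AfterBatchOnly(Processor):
    """Hypothetical subclass that overrides a single callback."""

    def process_after_batch(self, data, *, current_batch_number):
        return data


CALLBACKS = ("process_before_batch", "process_after_batch", "process_after_generation")

processor = AfterBatchOnly()
overridden = {name: processor.implements(name) for name in CALLBACKS}
base_overridden = {name: Processor().implements(name) for name in CALLBACKS}
```

Here `overridden` maps only `process_after_batch` to `True`, and every entry in `base_overridden` is `False`.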

process_after_batch(data, *, current_batch_number)

Called at POST_BATCH stage after each batch is generated.

Override to process each batch of generated data.

Parameters:

Name                  Type        Description                                                     Default
data                  DataT       The generated batch data.                                       required
current_batch_number  int | None  The current batch number (0-indexed), or None in preview mode.  required

Returns:

Type   Description
DataT  Transformed batch data.

Source code in packages/data-designer-engine/src/data_designer/engine/processing/processors/base.py
def process_after_batch(self, data: DataT, *, current_batch_number: int | None) -> DataT:
    """Called at POST_BATCH stage after each batch is generated.

    Override to process each batch of generated data.

    Args:
        data: The generated batch data.
        current_batch_number: The current batch number (0-indexed), or None in preview mode.

    Returns:
        Transformed batch data.
    """
    return data
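Because `current_batch_number` may be `None` in preview mode, an override should handle that case explicitly. The following sketch assumes a list-of-dicts data shape and uses a stand-in base class; `TagBatchProcessor` and the `source` column are hypothetical names chosen for illustration.

```python
from __future__ import annotations

from typing import Any

# Hypothetical stand-in for DataT.
Rows = list[dict[str, Any]]


class Processor:
    """Stand-in for the documented base class (POST_BATCH callback only)."""

    def process_after_batch(self, data: Rows, *, current_batch_number: int | None) -> Rows:
        return data


class TagBatchProcessor(Processor):
    """Hypothetical processor that records which batch produced each row."""

    def process_after_batch(self, data: Rows, *, current_batch_number: int | None) -> Rows:
        # None signals preview mode, so avoid formatting it as a batch index.
        if current_batch_number is None:
            label = "preview"
        else:
            label = f"batch-{current_batch_number}"
        # Return new rows instead of mutating the input in place.
        return [{**row, "source": label} for row in data]


processor = TagBatchProcessor()
rows = [{"id": 1}, {"id": 2}]

tagged = processor.process_after_batch(rows, current_batch_number=0)
previewed = processor.process_after_batch(rows, current_batch_number=None)
```

Note that the argument is keyword-only, matching the `*` in the documented signature.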

process_after_generation(data)

Called at AFTER_GENERATION stage on the final combined dataset.

Override to transform the complete generated dataset.

Parameters:

Name  Type   Description                  Default
data  DataT  The final combined dataset.  required

Returns:

Type   Description
DataT  Transformed final dataset.

Source code in packages/data-designer-engine/src/data_designer/engine/processing/processors/base.py
def process_after_generation(self, data: DataT) -> DataT:
    """Called at AFTER_GENERATION stage on the final combined dataset.

    Override to transform the complete generated dataset.

    Args:
        data: The final combined dataset.

    Returns:
        Transformed final dataset.
    """
    return data

process_before_batch(data)

Called at PRE_BATCH stage before each batch is generated.

Override to transform batch data before generation begins.

Parameters:

Name  Type   Description                        Default
data  DataT  The batch data before generation.  required

Returns:

Type   Description
DataT  Transformed batch data.

Source code in packages/data-designer-engine/src/data_designer/engine/processing/processors/base.py
def process_before_batch(self, data: DataT) -> DataT:
    """Called at PRE_BATCH stage before each batch is generated.

    Override to transform batch data before generation begins.

    Args:
        data: The batch data before generation.

    Returns:
        Transformed batch data.
    """
    return data

Built-In Implementations

DropColumnsProcessor

Bases: Processor[DropColumnsProcessorConfig]

Drops specified columns from the dataset after each batch.

Source code in packages/data-designer-engine/src/data_designer/engine/configurable_task.py
def __init__(self, config: TaskConfigT, resource_provider: ResourceProvider):
    self._config = self.get_config_type().model_validate(config)
    self._resource_provider = resource_provider
    self._validate()
    self._initialize()

SchemaTransformProcessor

Bases: WithJinja2UserTemplateRendering, Processor[SchemaTransformProcessorConfig]

Transforms dataset schema using Jinja2 templates after each batch.

Source code in packages/data-designer-engine/src/data_designer/engine/configurable_task.py
def __init__(self, config: TaskConfigT, resource_provider: ResourceProvider):
    self._config = self.get_config_type().model_validate(config)
    self._resource_provider = resource_provider
    self._validate()
    self._initialize()

Registry

ProcessorRegistry

Bases: TaskRegistry[str, Processor, ConfigBase]

create_default_processor_registry

Source code in packages/data-designer-engine/src/data_designer/engine/processing/processors/registry.py
def create_default_processor_registry() -> ProcessorRegistry:
    registry = ProcessorRegistry()
    registry.register(ProcessorType.SCHEMA_TRANSFORM, SchemaTransformProcessor, SchemaTransformProcessorConfig, False)
    registry.register(ProcessorType.DROP_COLUMNS, DropColumnsProcessor, DropColumnsProcessorConfig, False)

    for plugin in PluginRegistry().get_plugins(PluginType.PROCESSOR):
        registry.register(plugin.name, plugin.impl_cls, plugin.config_cls, False)

    return registry
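The registration pattern above, built-ins first and then one entry per discovered plugin, can be sketched with a small stand-in registry. Everything below is hypothetical: `MiniRegistry`, `Entry`, `PluginInfo`, and the placeholder classes do not come from the library, and the fourth positional argument passed to `register` in the real code is left out because this page does not show what it means.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class Entry:
    impl_cls: type
    config_cls: type


@dataclass(frozen=True)
class PluginInfo:
    """Hypothetical plugin record with the three attributes used above."""

    name: str
    impl_cls: type
    config_cls: type


class MiniRegistry:
    """Stand-in registry mapping a processor type name to its classes."""

    def __init__(self) -> None:
        self._entries: dict[str, Entry] = {}

    def register(self, name: str, impl_cls: type, config_cls: type) -> None:
        self._entries[name] = Entry(impl_cls, config_cls)

    def get(self, name: str) -> Entry:
        return self._entries[name]

    def names(self) -> list[str]:
        return sorted(self._entries)


# Placeholder classes standing in for real processors and their configs.
class DropColumns: ...
class DropColumnsConfig: ...
class MyPlugin: ...
class MyPluginConfig: ...


def create_registry(plugins: list[PluginInfo]) -> MiniRegistry:
    registry = MiniRegistry()
    # Built-in entry first, then one entry per discovered plugin.
    registry.register("drop_columns", DropColumns, DropColumnsConfig)
    for plugin in plugins:
        registry.register(plugin.name, plugin.impl_cls, plugin.config_cls)
    return registry


registry = create_registry([PluginInfo("my_plugin", MyPlugin, MyPluginConfig)])
entry = registry.get("my_plugin")
```

A lookup by name then returns the implementation and config classes together, which is the pairing each `register` call establishes.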