Engine MCP

Execution-time MCP registries, facades, session handling, schema discovery, and tool calls.

For user-facing provider and tool config objects, see MCP configuration.

Parallel Structure

| Model layer | MCP layer | Purpose |
| --- | --- | --- |
| ModelProviderRegistry | MCPProviderRegistry | Holds provider configurations. |
| ModelRegistry | MCPRegistry | Manages configs by alias and lazily creates facades. |
| ModelFacade | MCPFacade | Provides a lightweight runtime facade scoped to one config. |
| ModelConfig.alias | ToolConfig.tool_alias | Alias referenced by column configs. |

Registry

MCPToolDefinition

Definition of an MCP tool with its schema.

Methods:

| Name | Description |
| --- | --- |
| to_openai_tool_schema | Convert this tool definition to OpenAI function calling format. |

to_openai_tool_schema()

Convert this tool definition to OpenAI function calling format.

Returns:

| Type | Description |
| --- | --- |
| dict[str, Any] | A dictionary in OpenAI's tool schema format with 'type' set to 'function' and nested 'function' containing name, description, and parameters. |

Source code in packages/data-designer-engine/src/data_designer/engine/mcp/registry.py
def to_openai_tool_schema(self) -> dict[str, Any]:
    """Convert this tool definition to OpenAI function calling format.

    Returns:
        A dictionary in OpenAI's tool schema format with 'type' set to
        'function' and nested 'function' containing name, description,
        and parameters.
    """
    schema = self.input_schema or {"type": "object", "properties": {}}
    return {
        "type": "function",
        "function": {
            "name": self.name,
            "description": self.description or "",
            "parameters": schema,
        },
    }
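
To make the output shape concrete, the sketch below re-creates the conversion on a hypothetical stand-in class, `ToolDefinitionSketch`. The field names are inferred from the method body above, and the sample tools (`search_docs`, `ping`) are invented for illustration; this is not the library's class.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class ToolDefinitionSketch:
    """Hypothetical stand-in for MCPToolDefinition (fields assumed from the method body)."""

    name: str
    description: Optional[str] = None
    input_schema: Optional[dict] = None

    def to_openai_tool_schema(self) -> dict:
        # Same fallback as the method above: an empty object schema
        # when the tool declares no input schema.
        schema: Any = self.input_schema or {"type": "object", "properties": {}}
        return {
            "type": "function",
            "function": {
                "name": self.name,
                "description": self.description or "",
                "parameters": schema,
            },
        }


# A tool with a full schema and a tool with neither description nor schema.
search_tool = ToolDefinitionSketch(
    name="search_docs",
    description="Search documentation.",
    input_schema={
        "type": "object",
        "properties": {"query": {"type": "string"}},
        "required": ["query"],
    },
)
bare_tool = ToolDefinitionSketch(name="ping")

search_schema = search_tool.to_openai_tool_schema()
bare_schema = bare_tool.to_openai_tool_schema()
```

A tool without a description or input schema still yields a valid entry: the description becomes an empty string and the parameters become an empty object schema.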

MCPToolResult

Result from executing an MCP tool call.

MCPRegistry

Registry for MCP tool configurations and facades.

MCPRegistry manages ToolConfig instances by tool_alias and lazily creates MCPFacade instances when requested. It is a config-only registry: all actual MCP operations are delegated to MCPFacade and the io module.

This mirrors the ModelRegistry pattern for consistency across the codebase.

Initialize the MCPRegistry.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| secret_resolver | SecretResolver | Resolver for secrets referenced in provider configs. | required |
| mcp_provider_registry | MCPProviderRegistry | Registry of MCP provider configurations. | required |
| mcp_facade_factory | Callable[[ToolConfig, SecretResolver, MCPProviderRegistry], MCPFacade] | Factory for creating MCPFacade instances. | required |
| tool_configs | list[ToolConfig] \| None | Optional list of tool configurations to register. | None |

Methods:

| Name | Description |
| --- | --- |
| get_mcp | Get or lazily create an MCPFacade for the given tool alias. |
| get_tool_config | Get a tool configuration by alias. |
| register_tool_configs | Register tool configurations at runtime. |
| validate_no_duplicate_tool_names | Validate that no ToolConfig has duplicate tool names across its providers. |

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| facades | dict[str, MCPFacade] | Get all instantiated facades. |
| mcp_provider_registry | MCPProviderRegistry | Get the MCP provider registry. |
| tool_configs | dict[str, ToolConfig] | Get all registered tool configurations. |

Source code in packages/data-designer-engine/src/data_designer/engine/mcp/registry.py
def __init__(
    self,
    *,
    secret_resolver: SecretResolver,
    mcp_provider_registry: MCPProviderRegistry,
    mcp_facade_factory: Callable[[ToolConfig, SecretResolver, MCPProviderRegistry], MCPFacade],
    tool_configs: list[ToolConfig] | None = None,
) -> None:
    """Initialize the MCPRegistry.

    Args:
        secret_resolver: Resolver for secrets referenced in provider configs.
        mcp_provider_registry: Registry of MCP provider configurations.
        mcp_facade_factory: Factory for creating MCPFacade instances.
        tool_configs: Optional list of tool configurations to register.
    """
    self._secret_resolver = secret_resolver
    self._mcp_provider_registry = mcp_provider_registry
    self._mcp_facade_factory = mcp_facade_factory
    self._tool_configs: dict[str, ToolConfig] = {}
    self._facades: dict[str, MCPFacade] = {}
    self._validated_tool_aliases: set[str] = set()

    self._set_tool_configs(tool_configs)

facades property

Get all instantiated facades.

mcp_provider_registry property

Get the MCP provider registry.

tool_configs property

Get all registered tool configurations.

get_mcp(*, tool_alias)

Get or lazily create an MCPFacade for the given tool alias.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| tool_alias | str | The alias of the tool configuration. | required |

Returns:

| Type | Description |
| --- | --- |
| MCPFacade | An MCPFacade configured for the specified tool alias. |

Raises:

| Type | Description |
| --- | --- |
| ValueError | If no tool config with the given alias is found. |

Source code in packages/data-designer-engine/src/data_designer/engine/mcp/registry.py
def get_mcp(self, *, tool_alias: str) -> MCPFacade:
    """Get or lazily create an MCPFacade for the given tool alias.

    Args:
        tool_alias: The alias of the tool configuration.

    Returns:
        An MCPFacade configured for the specified tool alias.

    Raises:
        ValueError: If no tool config with the given alias is found.
    """
    if tool_alias not in self._tool_configs:
        raise ValueError(f"No tool config with alias {tool_alias!r} found!")

    if tool_alias not in self._facades:
        self._facades[tool_alias] = self._create_facade(self._tool_configs[tool_alias])

    return self._facades[tool_alias]
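
The get-or-create behaviour can be tried in isolation with a reduced model. In the sketch below, `LazyFacadeRegistrySketch` and `make_facade` are hypothetical names, configs are plain dicts, and the factory takes a single argument, so this only approximates the real registry, which delegates to a `_create_facade` helper not shown here.

```python
from typing import Callable, Dict, List


class LazyFacadeRegistrySketch:
    """Hypothetical, simplified model of the get-or-create logic in get_mcp."""

    def __init__(self, configs: Dict[str, dict], facade_factory: Callable[[dict], object]) -> None:
        self._configs = configs
        self._facade_factory = facade_factory
        self._facades: Dict[str, object] = {}

    def get_mcp(self, *, tool_alias: str) -> object:
        if tool_alias not in self._configs:
            raise ValueError(f"No tool config with alias {tool_alias!r} found!")
        if tool_alias not in self._facades:
            # The facade is built on first access only and then reused.
            self._facades[tool_alias] = self._facade_factory(self._configs[tool_alias])
        return self._facades[tool_alias]


created: List[str] = []


def make_facade(config: dict) -> object:
    # Record each construction so the laziness is observable.
    created.append(config["tool_alias"])
    return object()


registry = LazyFacadeRegistrySketch({"search": {"tool_alias": "search"}}, make_facade)
first = registry.get_mcp(tool_alias="search")
second = registry.get_mcp(tool_alias="search")

try:
    registry.get_mcp(tool_alias="missing")
    unknown_alias_error = ""
except ValueError as exc:
    unknown_alias_error = str(exc)
```

Both lookups return the same object and the factory runs once, while an unregistered alias fails before any facade is built.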

get_tool_config(*, tool_alias)

Get a tool configuration by alias.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| tool_alias | str | The alias of the tool configuration. | required |

Returns:

| Type | Description |
| --- | --- |
| ToolConfig | The tool configuration. |

Raises:

| Type | Description |
| --- | --- |
| ValueError | If no tool config with the given alias is found. |

Source code in packages/data-designer-engine/src/data_designer/engine/mcp/registry.py
def get_tool_config(self, *, tool_alias: str) -> ToolConfig:
    """Get a tool configuration by alias.

    Args:
        tool_alias: The alias of the tool configuration.

    Returns:
        The tool configuration.

    Raises:
        ValueError: If no tool config with the given alias is found.
    """
    if tool_alias not in self._tool_configs:
        raise ValueError(f"No tool config with alias {tool_alias!r} found!")
    return self._tool_configs[tool_alias]

register_tool_configs(tool_configs)

Register tool configurations at runtime.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| tool_configs | list[ToolConfig] | List of tool configurations to register. If a configuration with the same alias already exists, it will be overwritten. | required |

Source code in packages/data-designer-engine/src/data_designer/engine/mcp/registry.py
def register_tool_configs(self, tool_configs: list[ToolConfig]) -> None:
    """Register tool configurations at runtime.

    Args:
        tool_configs: List of tool configurations to register. If a configuration
            with the same alias already exists, it will be overwritten.
    """
    self._set_tool_configs(list(self._tool_configs.values()) + tool_configs)
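
The overwrite rule follows from the order of concatenation: existing configs come first, new ones last. The sketch below assumes the private `_set_tool_configs` helper (not shown here) keys configs by alias with the last entry winning. `merge_by_alias` is a hypothetical function and the configs are plain dicts.

```python
from typing import Dict, List


def merge_by_alias(existing: Dict[str, dict], incoming: List[dict]) -> Dict[str, dict]:
    """Hypothetical stand-in for the merge performed through _set_tool_configs."""
    merged: Dict[str, dict] = {}
    # Existing configs are listed first, so an incoming config with the
    # same alias replaces the earlier entry.
    for config in list(existing.values()) + incoming:
        merged[config["tool_alias"]] = config
    return merged


existing = {
    "search": {"tool_alias": "search", "providers": ["docs"]},
    "math": {"tool_alias": "math", "providers": ["calc"]},
}
incoming = [{"tool_alias": "search", "providers": ["docs-v2"]}]
merged = merge_by_alias(existing, incoming)
```

Whether the real registry also discards an already-created facade for an overwritten alias is not visible in the code shown here.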

validate_no_duplicate_tool_names()

Validate that no ToolConfig has duplicate tool names across its providers.

This method eagerly fetches tool schemas for all registered ToolConfigs, which triggers duplicate tool name detection. This catches cases where multiple providers in the same ToolConfig expose a tool with the same name.

Raises:

| Type | Description |
| --- | --- |
| DuplicateToolNameError | If any ToolConfig has duplicate tool names across providers. |

Source code in packages/data-designer-engine/src/data_designer/engine/mcp/registry.py
def validate_no_duplicate_tool_names(self) -> None:
    """Validate that no ToolConfig has duplicate tool names across its providers.

    This method eagerly fetches tool schemas for all registered ToolConfigs,
    which triggers duplicate tool name detection. This catches cases where
    multiple providers in the same ToolConfig expose a tool with the same name.

    Raises:
        DuplicateToolNameError: If any ToolConfig has duplicate tool names across providers.
    """
    for tool_alias in self._tool_configs:
        self._validate_tool_alias(tool_alias)

create_mcp_registry

Factory function for creating an MCPRegistry instance.

This factory function creates an MCPRegistry with a facade factory that creates MCPFacade instances on demand. It follows the same pattern as create_model_registry for consistency.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| tool_configs | list[ToolConfig] \| None | Optional list of tool configurations to register. | None |
| secret_resolver | SecretResolver | Resolver for secrets referenced in provider configs. | required |
| mcp_provider_registry | MCPProviderRegistry | Registry of MCP provider configurations. | required |

Returns:

| Type | Description |
| --- | --- |
| MCPRegistry | A configured MCPRegistry instance. |

Source code in packages/data-designer-engine/src/data_designer/engine/mcp/factory.py
def create_mcp_registry(
    *,
    tool_configs: list[ToolConfig] | None = None,
    secret_resolver: SecretResolver,
    mcp_provider_registry: MCPProviderRegistry,
) -> MCPRegistry:
    """Factory function for creating an MCPRegistry instance.

    This factory function creates an MCPRegistry with a facade factory that
    creates MCPFacade instances on demand. It follows the same pattern as
    create_model_registry for consistency.

    Args:
        tool_configs: Optional list of tool configurations to register.
        secret_resolver: Resolver for secrets referenced in provider configs.
        mcp_provider_registry: Registry of MCP provider configurations.

    Returns:
        A configured MCPRegistry instance.
    """

    def mcp_facade_factory(
        tool_config: ToolConfig, secret_resolver: SecretResolver, provider_registry: MCPProviderRegistry
    ) -> MCPFacade:
        return MCPFacade(
            tool_config=tool_config, secret_resolver=secret_resolver, mcp_provider_registry=provider_registry
        )

    return MCPRegistry(
        secret_resolver=secret_resolver,
        mcp_provider_registry=mcp_provider_registry,
        mcp_facade_factory=mcp_facade_factory,
        tool_configs=tool_configs,
    )

Facade

ModelFacade.generate() accepts a tool_alias parameter. When it is provided, ModelFacade looks up the matching MCPFacade from MCPRegistry, fetches tool schemas, passes them to the model, processes tool calls after each completion, tracks tool-call turns, and returns messages that include tool results for trace capture.

MCPFacade

Lightweight facade scoped to a specific ToolConfig.

MCPFacade provides a clean interface for MCP tool operations within the context of a specific tool configuration. It handles tool call extraction, validation, and execution using the mcp.io module for communication.

This mirrors the ModelFacade pattern where each facade is scoped to a specific configuration while sharing underlying resources through caching in the io module.

Methods:

| Name | Description |
| --- | --- |
| get_tool_call_count | Count the number of tool calls in a completion response. |
| get_tool_schemas | Get OpenAI-compatible tool schemas for this configuration. |
| has_tool_calls | Returns True if tool calls are present in the completion response. |
| process_completion_response | Process an LLM completion response and execute any tool calls. |
| refuse_completion_response | Refuse tool calls without executing them. |

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| allow_tools | list[str] \| None | Optional allowlist of permitted tool names. |
| max_tool_call_turns | int | Maximum number of tool-calling turns permitted in a single generation. |
| providers | list[str] | List of MCP provider names for this configuration. |
| timeout_sec | float \| None | Timeout in seconds for MCP tool calls. |
| tool_alias | str | The alias for this tool configuration. |

Source code in packages/data-designer-engine/src/data_designer/engine/mcp/facade.py
def __init__(
    self,
    tool_config: ToolConfig,
    secret_resolver: SecretResolver,
    mcp_provider_registry: MCPProviderRegistry,
) -> None:
    self._tool_config = tool_config
    self._secret_resolver = secret_resolver
    self._mcp_provider_registry = mcp_provider_registry

allow_tools property

Optional allowlist of permitted tool names.

max_tool_call_turns property

Maximum number of tool-calling turns permitted in a single generation.

A turn is one iteration where the LLM requests tool calls. With parallel tool calling, a single turn may execute multiple tools simultaneously.

providers property

List of MCP provider names for this configuration.

timeout_sec property

Timeout in seconds for MCP tool calls.

tool_alias property

The alias for this tool configuration.

get_tool_call_count(completion_response) staticmethod

Count the number of tool calls in a completion response.

Source code in packages/data-designer-engine/src/data_designer/engine/mcp/facade.py
@staticmethod
def get_tool_call_count(completion_response: ChatCompletionResponse) -> int:
    """Count the number of tool calls in a completion response."""
    return len(completion_response.message.tool_calls)

get_tool_schemas()

Get OpenAI-compatible tool schemas for this configuration.

Fetches tools from all providers in the configuration and applies allow_tools filtering if specified. Uses cached results from mcp_io.

Returns:

| Type | Description |
| --- | --- |
| list[dict[str, Any]] | List of tool schemas in OpenAI function calling format. |

Raises:

| Type | Description |
| --- | --- |
| MCPConfigurationError | If allowed tools are not found on any provider. |
| DuplicateToolNameError | If the same tool name appears in multiple providers. |

Source code in packages/data-designer-engine/src/data_designer/engine/mcp/facade.py
def get_tool_schemas(self) -> list[dict[str, Any]]:
    """Get OpenAI-compatible tool schemas for this configuration.

    Fetches tools from all providers in the configuration and applies
    allow_tools filtering if specified. Uses cached results from mcp_io.

    Returns:
        List of tool schemas in OpenAI function calling format.

    Raises:
        MCPConfigurationError: If allowed tools are not found on any provider.
        DuplicateToolNameError: If the same tool name appears in multiple providers.
    """
    all_tools: list[MCPToolDefinition] = []
    tool_to_providers: dict[str, list[str]] = {}

    for provider_name in self._tool_config.providers:
        provider = self._mcp_provider_registry.get_provider(provider_name)
        resolved_provider = self._resolve_provider(provider)
        tools = mcp_io.list_tools(
            resolved_provider, timeout_sec=self._tool_config.timeout_sec
        )  # Cached in io module
        for tool in tools:
            tool_to_providers.setdefault(tool.name, []).append(provider_name)
        all_tools.extend(tools)

    # Check for duplicate tool names across providers
    duplicates = {name: providers for name, providers in tool_to_providers.items() if len(providers) > 1}
    if duplicates:
        dup_details = [f"'{name}' (in: {', '.join(providers)})" for name, providers in sorted(duplicates.items())]
        raise DuplicateToolNameError(
            f"Duplicate tool names found across MCP providers: {'; '.join(dup_details)}. "
            "Each tool name must be unique across all providers in a ToolConfig."
        )

    all_available_names = set(tool_to_providers.keys())
    allowed_names = set(self._tool_config.allow_tools) if self._tool_config.allow_tools else None
    if allowed_names is not None:
        missing = allowed_names.difference(all_available_names)
        if missing:
            provider_list = ", ".join(repr(p) for p in self._tool_config.providers)
            raise MCPConfigurationError(
                f"Tool(s) {sorted(missing)!r} not found on any of the MCP providers: {provider_list}."
            )
        all_tools = [tool for tool in all_tools if tool.name in allowed_names]

    return [tool.to_openai_tool_schema() for tool in all_tools]
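
The two checks above can be exercised on tool names alone. The sketch below uses a hypothetical helper, `select_tool_names`, with plain `ValueError` and `LookupError` standing in for `DuplicateToolNameError` and `MCPConfigurationError`; the provider and tool names are invented. As in the method above, an empty allowlist is treated like no allowlist.

```python
from typing import Dict, List, Optional


def select_tool_names(
    tools_by_provider: Dict[str, List[str]],
    allow_tools: Optional[List[str]] = None,
) -> List[str]:
    """Hypothetical helper mirroring the duplicate and allowlist checks on names only."""
    tool_to_providers: Dict[str, List[str]] = {}
    all_names: List[str] = []
    for provider_name, tool_names in tools_by_provider.items():
        for tool_name in tool_names:
            tool_to_providers.setdefault(tool_name, []).append(provider_name)
        all_names.extend(tool_names)

    # A name exposed by more than one provider is ambiguous, so reject it.
    duplicates = {name: provs for name, provs in tool_to_providers.items() if len(provs) > 1}
    if duplicates:
        raise ValueError(f"Duplicate tool names: {sorted(duplicates)}")

    # Every allowlisted name must exist somewhere; then keep only those names.
    if allow_tools:
        allowed = set(allow_tools)
        missing = allowed.difference(tool_to_providers)
        if missing:
            raise LookupError(f"Tool(s) {sorted(missing)!r} not found")
        all_names = [name for name in all_names if name in allowed]
    return all_names


selected = select_tool_names({"docs": ["search", "fetch"], "calc": ["add"]}, allow_tools=["search", "add"])
unfiltered = select_tool_names({"docs": ["search", "fetch"]}, allow_tools=[])

try:
    select_tool_names({"docs": ["search"], "web": ["search"]})
    duplicate_error = ""
except ValueError as exc:
    duplicate_error = str(exc)

try:
    select_tool_names({"docs": ["search"]}, allow_tools=["translate"])
    missing_error = ""
except LookupError as exc:
    missing_error = str(exc)
```

The duplicate check runs before the allowlist is applied, so a name clash is reported even when the clashing tool would have been filtered out.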

has_tool_calls(completion_response) staticmethod

Returns True if tool calls are present in the completion response.

Source code in packages/data-designer-engine/src/data_designer/engine/mcp/facade.py
@staticmethod
def has_tool_calls(completion_response: ChatCompletionResponse) -> bool:
    """Returns True if tool calls are present in the completion response."""
    return len(completion_response.message.tool_calls) > 0

process_completion_response(completion_response)

Process an LLM completion response and execute any tool calls.

This is the primary method for handling tool calls from an LLM response. It extracts the response content, reasoning content, and all tool calls from the completion response, executes each tool call (including parallel tool calls), and returns the messages for continuing the conversation.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| completion_response | ChatCompletionResponse | The canonical ChatCompletionResponse from the model client. | required |

Returns:

| Type | Description |
| --- | --- |
| list[ChatMessage] | A list of ChatMessages to append to the conversation history. If tool calls were present: [assistant_message_with_tool_calls, *tool_response_messages]. If no tool calls: [assistant_message]. |

Raises:

| Type | Description |
| --- | --- |
| MCPToolError | If a requested tool is not in the allowed tools list. |
| MCPToolError | If tool execution fails or times out. |
| MCPConfigurationError | If a requested tool is not found on any configured provider. |

Source code in packages/data-designer-engine/src/data_designer/engine/mcp/facade.py
def process_completion_response(
    self,
    completion_response: ChatCompletionResponse,
) -> list[ChatMessage]:
    """Process an LLM completion response and execute any tool calls.

    This is the primary method for handling tool calls from an LLM response.
    It extracts the response content, reasoning content, and all tool calls
    from the completion response, executes each tool call (including parallel
    tool calls), and returns the messages for continuing the conversation.

    Args:
        completion_response: The canonical ChatCompletionResponse from the model client.

    Returns:
        A list of ChatMessages to append to the conversation history:
        - If tool calls were present: [assistant_message_with_tool_calls, *tool_response_messages]
        - If no tool calls: [assistant_message]

    Raises:
        MCPToolError: If a requested tool is not in the allowed tools list.
        MCPToolError: If tool execution fails or times out.
        MCPConfigurationError: If a requested tool is not found on any configured provider.
    """
    message = completion_response.message

    response_content = message.content or ""
    reasoning_content = message.reasoning_content

    # Strip whitespace if reasoning is present (models often add extra newlines)
    if reasoning_content:
        response_content = response_content.strip()
        reasoning_content = reasoning_content.strip()

    tool_calls = message.tool_calls

    if not tool_calls:
        return [
            ChatMessage.as_assistant(
                content=response_content,
                reasoning_content=reasoning_content or None,
            )
        ]

    # Has tool calls - execute and return all messages
    tool_call_dicts = _convert_canonical_tool_calls_to_dicts(tool_calls)
    assistant_message = self._build_assistant_tool_message(response_content, tool_call_dicts, reasoning_content)
    tool_messages = self._execute_tool_calls_from_canonical(tool_calls)

    return [assistant_message, *tool_messages]

refuse_completion_response(completion_response, refusal_message=None)

Refuse tool calls without executing them.

Used when the tool call turn budget is exhausted. Returns messages that include the assistant's tool call request but with refusal responses instead of actual tool results.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| completion_response | ChatCompletionResponse | The canonical ChatCompletionResponse containing tool calls. | required |
| refusal_message | str \| None | Optional custom refusal message. | None |

Returns:

| Type | Description |
| --- | --- |
| list[ChatMessage] | A list of ChatMessages to append to the conversation history. |

Source code in packages/data-designer-engine/src/data_designer/engine/mcp/facade.py
def refuse_completion_response(
    self,
    completion_response: ChatCompletionResponse,
    refusal_message: str | None = None,
) -> list[ChatMessage]:
    """Refuse tool calls without executing them.

    Used when the tool call turn budget is exhausted. Returns messages
    that include the assistant's tool call request but with refusal
    responses instead of actual tool results.

    Args:
        completion_response: The canonical ChatCompletionResponse containing tool calls.
        refusal_message: Optional custom refusal message.

    Returns:
        A list of ChatMessages to append to the conversation history.
    """
    message = completion_response.message

    response_content = message.content or ""
    reasoning_content = message.reasoning_content

    # Strip whitespace if reasoning is present (models often add extra newlines)
    if reasoning_content:
        response_content = response_content.strip()
        reasoning_content = reasoning_content.strip()

    tool_calls = message.tool_calls

    if not tool_calls:
        return [
            ChatMessage.as_assistant(
                content=response_content,
                reasoning_content=reasoning_content or None,
            )
        ]

    # Build assistant message with tool calls (same as normal)
    tool_call_dicts = _convert_canonical_tool_calls_to_dicts(tool_calls)
    assistant_message = self._build_assistant_tool_message(response_content, tool_call_dicts, reasoning_content)

    # Build refusal messages instead of executing tools
    refusal = refusal_message or DEFAULT_TOOL_REFUSAL_MESSAGE
    tool_messages = [ChatMessage.as_tool(content=refusal, tool_call_id=tc.id) for tc in tool_calls]

    return [assistant_message, *tool_messages]
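
One way a caller could combine process_completion_response, refuse_completion_response, and max_tool_call_turns is sketched below. Everything in it is hypothetical: `FakeFacade` mimics only the method names, messages are plain dicts, `scripted_model` replaces a real LLM, and `REFUSAL` is not the library's default text. The actual loop in ModelFacade.generate() is not shown on this page and may differ.

```python
from typing import Callable, Dict, List

REFUSAL = "Tool call refused: turn budget exhausted."  # Hypothetical text.


class FakeFacade:
    """Hypothetical stand-in that mimics the MCPFacade methods used below."""

    max_tool_call_turns = 2

    @staticmethod
    def has_tool_calls(response: Dict) -> bool:
        return len(response["tool_calls"]) > 0

    @staticmethod
    def _assistant(response: Dict) -> Dict:
        return {"role": "assistant", "content": response["content"], "tool_calls": response["tool_calls"]}

    def _reply(self, response: Dict, content: str) -> List[Dict]:
        # One assistant message followed by one tool message per tool call.
        tool_messages = [
            {"role": "tool", "tool_call_id": call["id"], "content": content}
            for call in response["tool_calls"]
        ]
        return [self._assistant(response), *tool_messages]

    def process_completion_response(self, response: Dict) -> List[Dict]:
        return self._reply(response, "lookup result")  # Pretend every call succeeds.

    def refuse_completion_response(self, response: Dict) -> List[Dict]:
        return self._reply(response, REFUSAL)  # Answer each call with a refusal.


def run_with_turn_budget(
    facade: FakeFacade,
    complete: Callable[[List[Dict]], Dict],
    messages: List[Dict],
    max_completions: int = 10,
) -> List[Dict]:
    turns_used = 0
    for _ in range(max_completions):  # Hard stop so the sketch always terminates.
        response = complete(messages)
        if not facade.has_tool_calls(response):
            messages.extend(facade.process_completion_response(response))
            break
        if turns_used < facade.max_tool_call_turns:
            messages.extend(facade.process_completion_response(response))
            turns_used += 1
        else:
            messages.extend(facade.refuse_completion_response(response))
    return messages


def scripted_model(messages: List[Dict]) -> Dict:
    # Keeps requesting a tool until it sees a refusal, then answers in text.
    if any(m["role"] == "tool" and m["content"] == REFUSAL for m in messages):
        return {"content": "final answer", "tool_calls": []}
    return {"content": "", "tool_calls": [{"id": f"call_{len(messages)}", "name": "lookup"}]}


history = run_with_turn_budget(FakeFacade(), scripted_model, [{"role": "user", "content": "question"}])
tool_contents = [m["content"] for m in history if m["role"] == "tool"]
```

With a budget of two turns, the first two tool requests are executed, the third is refused, and the refusal in the history lets the model finish with a plain answer.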

I/O Service

The I/O service owns a background event loop, pools MCP sessions by provider config, coalesces concurrent tool schema lookups, and executes parallel tool calls.

MCPIOService

Actor-style MCP I/O service owning all async state.

Methods:

| Name | Description |
| --- | --- |
| call_tools | Call multiple tools in parallel. |
| clear_provider_caches | Clear caches and session pool entries for specific providers. |
| clear_session_pool | Clear all pooled MCP sessions (best effort). |
| clear_tools_cache | Clear the list_tools cache (best effort). |
| get_cache_info | Get cache statistics for list_tools. |
| get_session_pool_info | Get information about the session pool. |
| list_tools | List tools from an MCP provider (cached with request coalescing). |
| shutdown | Shutdown the MCP event loop and close all sessions. |

Source code in packages/data-designer-engine/src/data_designer/engine/mcp/io.py
def __init__(self) -> None:
    self._loop: asyncio.AbstractEventLoop | None = None
    self._thread: threading.Thread | None = None
    self._loop_lock = threading.Lock()

    self._sessions: dict[str, ClientSession] = {}
    self._session_contexts: dict[str, Any] = {}
    self._session_inflight: dict[str, asyncio.Task[ClientSession]] = {}

    self._tools_cache: dict[str, tuple[MCPToolDefinition, ...]] = {}
    self._tools_cache_epoch: dict[str, int] = {}
    self._inflight_tools: dict[str, asyncio.Task[tuple[MCPToolDefinition, ...]]] = {}

call_tools(calls, *, timeout_sec=None)

Call multiple tools in parallel.

Source code in packages/data-designer-engine/src/data_designer/engine/mcp/io.py
def call_tools(
    self,
    calls: list[tuple[MCPProviderT, str, dict[str, Any]]],
    *,
    timeout_sec: float | None = None,
) -> list[MCPToolResult]:
    """Call multiple tools in parallel."""
    if not calls:
        return []
    try:
        return self._run_on_loop(self._call_tools_async(calls), timeout_sec)
    except TimeoutError as exc:
        timeout_label = f"{timeout_sec:.1f}" if timeout_sec is not None else "unknown"
        raise MCPToolError(f"Timed out after {timeout_label}s while calling tools in parallel.") from exc
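
Each entry in `calls` is a (provider, tool name, arguments) tuple. The private `_call_tools_async` coroutine is not shown here; the sketch below assumes a fan-out with `asyncio.gather`, which is one common way to run such calls concurrently. The provider is reduced to a plain string, and `call_one` and `call_all` are hypothetical names.

```python
import asyncio
from typing import Any, Dict, List, Tuple

Call = Tuple[str, str, Dict[str, Any]]  # (provider name, tool name, arguments)


async def call_one(provider_name: str, tool_name: str, arguments: Dict[str, Any]) -> str:
    # Simulate tools that take different amounts of time to respond.
    await asyncio.sleep(arguments.get("delay", 0.0))
    return f"{provider_name}/{tool_name}"


async def call_all(calls: List[Call]) -> List[str]:
    # gather() runs the coroutines concurrently and returns results in input order.
    return list(await asyncio.gather(*(call_one(p, t, a) for p, t, a in calls)))


calls: List[Call] = [
    ("docs", "search", {"delay": 0.02}),
    ("calc", "add", {"delay": 0.0}),
]
results = asyncio.run(call_all(calls))
```

The second call finishes first, yet the results keep the order of the input list, which lets a caller pair each result with the tool call that produced it.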

clear_provider_caches(providers)

Clear caches and session pool entries for specific providers.

Source code in packages/data-designer-engine/src/data_designer/engine/mcp/io.py
def clear_provider_caches(self, providers: list[MCPProviderT]) -> int:
    """Clear caches and session pool entries for specific providers."""
    if not providers:
        return 0
    if self._loop is not None and self._loop.is_running():
        try:
            return self._run_on_loop(self._clear_provider_caches_async(providers), timeout_sec=5)
        except Exception:
            logger.debug("Failed to clear provider caches on MCP IO service.", exc_info=True)
            return 0
    return self._clear_provider_caches_sync(providers)

clear_session_pool()

Clear all pooled MCP sessions (best effort).

Source code in packages/data-designer-engine/src/data_designer/engine/mcp/io.py
def clear_session_pool(self) -> None:
    """Clear all pooled MCP sessions (best effort)."""
    if self._loop is not None and self._loop.is_running():
        try:
            self._run_on_loop(self._close_all_sessions_async(), timeout_sec=5)
            return
        except Exception:
            logger.debug("Failed to clear session pool on MCP IO service.", exc_info=True)
            # Fall through to sync cleanup
    self._clear_session_pool_sync()

clear_tools_cache()

Clear the list_tools cache (best effort).

Source code in packages/data-designer-engine/src/data_designer/engine/mcp/io.py
def clear_tools_cache(self) -> None:
    """Clear the list_tools cache (best effort)."""
    if self._loop is not None and self._loop.is_running():
        try:
            self._run_on_loop(self._clear_tools_cache_async(), timeout_sec=5)
            return
        except Exception:
            logger.debug("Failed to clear tools cache on MCP IO service.", exc_info=True)
            return
    self._clear_tools_cache_sync()

get_cache_info()

Get cache statistics for list_tools.

Source code in packages/data-designer-engine/src/data_designer/engine/mcp/io.py
def get_cache_info(self) -> dict[str, Any]:
    """Get cache statistics for list_tools."""
    if self._loop is not None and self._loop.is_running():
        try:
            return self._run_on_loop(self._get_cache_info_async(), timeout_sec=5)
        except Exception:
            logger.debug("Failed to read tools cache info on MCP IO service.", exc_info=True)
    return {"currsize": len(self._tools_cache), "providers": list(self._tools_cache.keys())}

get_session_pool_info()

Get information about the session pool.

Source code in packages/data-designer-engine/src/data_designer/engine/mcp/io.py
def get_session_pool_info(self) -> dict[str, Any]:
    """Get information about the session pool."""
    if self._loop is not None and self._loop.is_running():
        try:
            return self._run_on_loop(self._get_session_pool_info_async(), timeout_sec=5)
        except Exception:
            logger.debug("Failed to read session pool info on MCP IO service.", exc_info=True)
    return {"active_sessions": len(self._sessions), "provider_keys": list(self._sessions.keys())}

list_tools(provider, timeout_sec=None)

List tools from an MCP provider (cached with request coalescing).

Source code in packages/data-designer-engine/src/data_designer/engine/mcp/io.py
def list_tools(self, provider: MCPProviderT, timeout_sec: float | None = None) -> tuple[MCPToolDefinition, ...]:
    """List tools from an MCP provider (cached with request coalescing)."""
    try:
        return self._run_on_loop(self._list_tools_async(provider), timeout_sec)
    except TimeoutError as exc:
        timeout_label = f"{timeout_sec:.1f}" if timeout_sec is not None else "unknown"
        raise MCPToolError(f"Timed out after {timeout_label}s while listing tools on {provider.name!r}.") from exc
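
Request coalescing means that concurrent lookups for the same provider share one in-flight fetch instead of each contacting the server. The sketch below illustrates the idea with a cache dict plus an in-flight task dict. `CoalescingCacheSketch` is hypothetical, providers are reduced to string keys, and the epoch bookkeeping hinted at by `_tools_cache_epoch` is omitted.

```python
import asyncio
from typing import Dict, Tuple


class CoalescingCacheSketch:
    """Hypothetical cache where concurrent misses for one key share a single fetch."""

    def __init__(self) -> None:
        self._cache: Dict[str, Tuple[str, ...]] = {}
        self._inflight: Dict[str, "asyncio.Task"] = {}
        self.fetch_count = 0

    async def _fetch(self, key: str) -> Tuple[str, ...]:
        self.fetch_count += 1
        await asyncio.sleep(0.01)  # Simulated network round trip.
        return (f"{key}-tool-a", f"{key}-tool-b")

    async def list_tools(self, key: str) -> Tuple[str, ...]:
        if key in self._cache:
            return self._cache[key]
        task = self._inflight.get(key)
        if task is None:
            # First caller starts the fetch; later callers await the same task.
            task = asyncio.create_task(self._fetch(key))
            self._inflight[key] = task
        try:
            result = await task
        finally:
            self._inflight.pop(key, None)
        self._cache[key] = result
        return result


async def demo():
    cache = CoalescingCacheSketch()
    concurrent = await asyncio.gather(*(cache.list_tools("provider-1") for _ in range(3)))
    cached = await cache.list_tools("provider-1")
    return cache.fetch_count, concurrent, cached


fetch_count, concurrent, cached = asyncio.run(demo())
```

Three concurrent lookups and one later lookup trigger a single fetch: the first three share the in-flight task and the fourth is served from the cache.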

shutdown()

Shutdown the MCP event loop and close all sessions.

Source code in packages/data-designer-engine/src/data_designer/engine/mcp/io.py
def shutdown(self) -> None:
    """Shutdown the MCP event loop and close all sessions."""
    if self._loop is None:
        self._reset_state()
        return
    try:
        future = asyncio.run_coroutine_threadsafe(self._close_all_sessions_async(), self._loop)
        try:
            future.result(timeout=5)
        except Exception:
            pass
        self._loop.call_soon_threadsafe(self._loop.stop)
        if self._thread is not None:
            self._thread.join(timeout=5)
    finally:
        self._loop = None
        self._thread = None
        self._reset_state()

Runtime Helpers

list_tools(provider, timeout_sec=None)

List tools from an MCP provider (cached with request coalescing).

Source code in packages/data-designer-engine/src/data_designer/engine/mcp/io.py
def list_tools(provider: MCPProviderT, timeout_sec: float | None = None) -> tuple[MCPToolDefinition, ...]:
    """List tools from an MCP provider (cached with request coalescing)."""
    return _MCP_IO_SERVICE.list_tools(provider, timeout_sec=timeout_sec)

list_tool_names(provider, timeout_sec)

Return the names of all tools available on an MCP provider.

Source code in packages/data-designer-engine/src/data_designer/engine/mcp/io.py
def list_tool_names(provider: MCPProviderT, timeout_sec: float) -> list[str]:
    """Return the names of all tools available on an MCP provider."""
    return [t.name for t in _MCP_IO_SERVICE.list_tools(provider, timeout_sec=timeout_sec)]

call_tools(calls, *, timeout_sec=None)

Call multiple tools in parallel.

Source code in packages/data-designer-engine/src/data_designer/engine/mcp/io.py
def call_tools(
    calls: list[tuple[MCPProviderT, str, dict[str, Any]]],
    *,
    timeout_sec: float | None = None,
) -> list[MCPToolResult]:
    """Call multiple tools in parallel."""
    return _MCP_IO_SERVICE.call_tools(calls, timeout_sec=timeout_sec)

clear_provider_caches(providers)

Clear all caches for specific MCP providers.

Source code in packages/data-designer-engine/src/data_designer/engine/mcp/io.py
def clear_provider_caches(providers: list[MCPProviderT]) -> int:
    """Clear all caches for specific MCP providers."""
    return _MCP_IO_SERVICE.clear_provider_caches(providers)

clear_tools_cache()

Clear the list_tools cache.

Source code in packages/data-designer-engine/src/data_designer/engine/mcp/io.py
def clear_tools_cache() -> None:
    """Clear the list_tools cache."""
    _MCP_IO_SERVICE.clear_tools_cache()

get_cache_info()

Get cache statistics for list_tools.

Source code in packages/data-designer-engine/src/data_designer/engine/mcp/io.py
def get_cache_info() -> dict[str, Any]:
    """Get cache statistics for list_tools."""
    return _MCP_IO_SERVICE.get_cache_info()

clear_session_pool()

Clear all pooled MCP sessions.

Source code in packages/data-designer-engine/src/data_designer/engine/mcp/io.py
def clear_session_pool() -> None:
    """Clear all pooled MCP sessions."""
    _MCP_IO_SERVICE.clear_session_pool()

get_session_pool_info()

Get information about the session pool.

Source code in packages/data-designer-engine/src/data_designer/engine/mcp/io.py
def get_session_pool_info() -> dict[str, Any]:
    """Get information about the session pool."""
    return _MCP_IO_SERVICE.get_session_pool_info()