
Orchestrator

SRAGAgentOrchestrator

Orchestrates the lifecycle, configuration, and security of the SRAG Reporting Agent.

This class acts as the central factory for the AI Agent. It assembles the System Prompt (with dynamic schema info), registers the Tools, and applies Input/Output Guardrails to ensure safe execution.

Key Responsibilities:

  1. Schema Injection: Loads the latest database schema metadata into the system prompt.
  2. Tool Registration: Binds stats_tool, plot_tool, and tavily_search to the LLM.
  3. Guardrails (Input):
    • length_limit: Prevents DoS attacks via huge payloads.
    • pii_detector: (Optional) Flags potential PII leaks.
    • prompt_injection: Scans for jailbreak attempts.
    • toxicity_detector: Filters offensive content.
  4. Guardrails (Output/Tool):
    • validate_tool_parameters: Enforces strict schema compliance and calls custom validators (e.g., validate_sql_safety to block DROP/DELETE queries); a hedged sketch of such a validator follows this list.
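
The tool-parameter guardrail is the last check before a generated query reaches the database. A minimal sketch of a SQL-safety validator in the spirit of validate_sql_safety (the parameter name `sql_query` and the rejection mechanism are assumptions, not the project's actual implementation):

```python
import re

# Hypothetical sketch: the project's real validate_sql_safety lives in its
# guardrails module; the parameter name "sql_query" is an assumption.
FORBIDDEN_SQL = re.compile(
    r"\b(DROP|DELETE|UPDATE|INSERT|ALTER|TRUNCATE|ATTACH|COPY|CREATE)\b",
    re.IGNORECASE,
)

def validate_sql_safety(params: dict) -> None:
    """Raise if the SQL a tool call wants to run is not read-only."""
    sql = str(params.get("sql_query", ""))  # assumed field name on StatsParams
    if FORBIDDEN_SQL.search(sql):
        raise ValueError(f"Unsafe SQL statement blocked: {sql!r}")
    if not sql.lstrip().upper().startswith(("SELECT", "WITH")):
        raise ValueError("Only read-only SELECT/CTE queries are allowed.")
```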

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| settings | Settings | Application configuration. |
| agent | GuardedAgent | The configured PydanticAI instance wrapped with security layers. |
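
The schema-injection step listed above (item 1) rebuilds the system prompt from live database metadata at construction time. A rough sketch of what such a prompt builder could look like (the project's actual build_system_prompt is defined elsewhere in the repository; the wording below is purely illustrative):

```python
# Illustrative only: the real build_system_prompt may differ in wording and structure.
def build_system_prompt(schema_info: str) -> str:
    return (
        "You are the SRAG reporting agent. Answer questions about SRAG "
        "surveillance data using the tools provided.\n\n"
        "Current database schema:\n"
        f"{schema_info}\n\n"
        "Use stats_tool for SQL aggregations, plot_tool for charts, and "
        "tavily_search for recent context. Never modify the database."
    )
```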

Source code in api/src/agents/orchestrator.py
class SRAGAgentOrchestrator:
    """
    Orchestrates the lifecycle, configuration, and security of the SRAG Reporting Agent.

    This class acts as the central factory for the AI Agent. It assembles the
    System Prompt (with dynamic schema info), registers the Tools, and applies
    Input/Output Guardrails to ensure safe execution.

    **Key Responsibilities:**

    1.  **Schema Injection:** Loads the latest database schema metadata into the system prompt.
    2.  **Tool Registration:** Binds `stats_tool`, `plot_tool`, and `tavily_search` to the LLM.
    3.  **Guardrails (Input):**
        - `length_limit`: Prevents DoS attacks via huge payloads.
        - `pii_detector`: (Optional) Flags potential PII leaks.
        - `prompt_injection`: Scans for jailbreak attempts.
        - `toxicity_detector`: Filters offensive content.
    4.  **Guardrails (Output/Tool):**
        - `validate_tool_parameters`: Enforces strict schema compliance and calls
          custom validators (e.g., `validate_sql_safety` to block DROP/DELETE queries).

    Attributes:
        settings (Settings): Application configuration.
        agent (GuardedAgent): The configured PydanticAI instance wrapped with security layers.
    """

    def __init__(self, settings: Settings):
        self.settings = settings
        logger.info("Initializing SRAG Agent Orchestrator...")

        try:
            self.schema_info = get_schema_info()
            logger.debug("Database schema loaded successfully.")
        except Exception as e:
            logger.warning(f"Could not load schema on init (ETL might be running): {e}")
            self.schema_info = "Schema not available yet."

        self.base_agent = Agent(
            model=self.settings.OPENAI_MODEL,
            system_prompt=build_system_prompt(self.schema_info),
            deps_type=AgentDeps,
            tools=[
                create_stats_tool(),
                create_search_tool(api_key=settings.TAVILY_API_KEY.get_secret_value()),
                create_plot_tool(output_dir=settings.PLOTS_DIR),
            ],
            model_settings=ModelSettings(
                temperature=self.settings.TEMPERATURE,
                max_tokens=self.settings.MAX_OUTPUT_TOKENS,
            ),
        )

        self.agent = GuardedAgent(
            self.base_agent,
            input_guardrails=[
                length_limit(max_tokens=self.settings.MAX_INPUT_TOKENS),
                pii_detector(),
                prompt_injection(),
                toxicity_detector(),
            ],
            output_guardrails=[
                # secret_redaction(),
                validate_tool_parameters(
                    schemas={
                        "stats_tool": StatsParams,
                        "plot_tool": PlotParams,
                        "tavily_search": SearchParams,
                    },
                    validators={"stats_tool": validate_sql_safety},
                    allow_undefined_tools=False,
                ),
            ],
        )
        logger.info("SRAG Agent initialized with Guardrails.")

    async def run(self, query: str) -> AgentRunResult:
        """
        Executes the Agent pipeline for a given user query.

        Args:
            query (str): The prompt/instruction for the agent.

        Returns:
            AgentRunResult: The result object containing the final text response,
            usage statistics (tokens), and message history.

        Raises:
            Exception: Propagates any runtime errors from the LLM or tools.
        """
        logger.info(f"Agent received query: {query}")

        deps = AgentDeps(
            db_path=str(self.settings.DB_PATH),
        )

        try:
            result = await self.agent.run(query, deps=deps)

            logger.info(f"Run completed. Usage: {result.usage()}")

            return result

        except Exception as e:
            logger.error(f"Agent execution failed: {e}", exc_info=True)
            raise e
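
A minimal usage sketch, assuming get_settings() returns a fully populated Settings object (OPENAI_MODEL, TAVILY_API_KEY, PLOTS_DIR, DB_PATH, token and temperature limits):

```python
import asyncio

# Hedged example: the query text is illustrative.
async def main() -> None:
    orchestrator = SRAGAgentOrchestrator(get_settings())
    result = await orchestrator.run(
        "Generate the weekly SRAG report with case counts and a trend plot."
    )
    print(result.usage())  # token accounting, as logged by the orchestrator

if __name__ == "__main__":
    asyncio.run(main())
```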

run(query) async

Executes the Agent pipeline for a given user query.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| query | str | The prompt/instruction for the agent. | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| AgentRunResult | AgentRunResult | The result object containing the final text response, usage statistics (tokens), and message history. |

Raises:

| Type | Description |
| --- | --- |
| Exception | Propagates any runtime errors from the LLM or tools. |

Source code in api/src/agents/orchestrator.py
async def run(self, query: str) -> AgentRunResult:
    """
    Executes the Agent pipeline for a given user query.

    Args:
        query (str): The prompt/instruction for the agent.

    Returns:
        AgentRunResult: The result object containing the final text response,
        usage statistics (tokens), and message history.

    Raises:
        Exception: Propagates any runtime errors from the LLM or tools.
    """
    logger.info(f"Agent received query: {query}")

    deps = AgentDeps(
        db_path=str(self.settings.DB_PATH),
    )

    try:
        result = await self.agent.run(query, deps=deps)

        logger.info(f"Run completed. Usage: {result.usage()}")

        return result

    except Exception as e:
        logger.error(f"Agent execution failed: {e}", exc_info=True)
        raise e
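
Because run() re-raises any LLM, tool, or guardrail error, callers are expected to handle failures themselves. A hedged sketch (the concrete exception types raised by GuardedAgent are not documented on this page, so failures are caught broadly):

```python
# Sketch only: replace the broad except with the guardrail-specific exceptions
# once they are known.
async def answer_or_fallback(orchestrator: SRAGAgentOrchestrator, query: str):
    try:
        result = await orchestrator.run(query)
    except Exception as exc:
        # Guardrail rejections and tool/LLM failures all surface here.
        return f"The report could not be generated: {exc}"
    # The final text and message history live on the result object; the exact
    # attribute names depend on the pydantic-ai version in use.
    return result
```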

get_orchestrator(settings=Depends(get_settings)) cached

Dependency Injection Provider for the Agent Orchestrator.

Uses lru_cache to implement the Singleton pattern, ensuring the Agent (and its heavy model initialization) is created only once per application lifecycle.

Source code in api/src/agents/orchestrator.py
@lru_cache
def get_orchestrator(
    settings: Settings = Depends(get_settings),
) -> SRAGAgentOrchestrator:
    """
    Dependency Injection Provider for the Agent Orchestrator.

    Uses `lru_cache` to implement the Singleton pattern, ensuring the Agent
    (and its heavy model initialization) is created only once per application lifecycle.
    """
    return SRAGAgentOrchestrator(settings)
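
How this provider is typically wired into a route (the path and payload shape below are illustrative, not the project's actual endpoint):

```python
from fastapi import APIRouter, Depends, HTTPException

router = APIRouter()

# Hypothetical route for illustration only.
@router.post("/report")
async def generate_report(
    query: str,
    orchestrator: SRAGAgentOrchestrator = Depends(get_orchestrator),
):
    try:
        result = await orchestrator.run(query)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc)) from exc
    return {"usage": str(result.usage())}
```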