DuckDB connection

get_db_connection(read_only=True)

Establishes a connection to the local DuckDB database.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `read_only` | `bool` | If `True`, opens the database in read-only mode to prevent accidental writes during analysis. | `True` |

Returns:

| Type | Description |
| --- | --- |
| `DuckDBPyConnection` | The active database connection object. |

Raises:

| Type | Description |
| --- | --- |
| `FileNotFoundError` | If the database file (`srag_analytics.db`) does not exist at the configured path (usually indicating the ETL pipeline hasn't run). |

Source code in api/src/db/duckdb_connection.py

def get_db_connection(read_only: bool = True) -> DuckDBPyConnection:
    """
    Establishes a connection to the local DuckDB database.

    Args:
        read_only (bool): If True, opens the database in read-only mode to prevent
            accidental writes during analysis. Defaults to `True`.

    Returns:
        DuckDBPyConnection: The active database connection object.

    Raises:
        FileNotFoundError: If the database file (`srag_analytics.db`) does not exist
            at the configured path (usually indicating the ETL pipeline hasn't run).
    """
    if not settings.DB_PATH.exists():
        raise FileNotFoundError(
            f"Database not found at {settings.DB_PATH}. Run ETL pipeline first."
        )

    con = duckdb.connect(str(settings.DB_PATH), read_only=read_only)
    return con

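A minimal usage sketch (the import path `src.db.duckdb_connection` is an assumption based on the file location shown above; adjust it to your project layout):

```python
# Minimal usage sketch -- the import path is an assumption based on the
# documented file location (api/src/db/duckdb_connection.py).
from src.db.duckdb_connection import get_db_connection

try:
    con = get_db_connection(read_only=True)  # default; safe for read-only analysis
except FileNotFoundError as exc:
    # Raised when srag_analytics.db is missing, i.e. the ETL pipeline hasn't run yet.
    raise SystemExit(str(exc))

try:
    row_count = con.execute("SELECT COUNT(*) FROM srag_analytics").fetchone()[0]
    print(f"srag_analytics rows: {row_count}")
finally:
    con.close()  # the helper returns a raw connection; the caller manages its lifetime
```
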
get_schema_info()

Generates a rich, LLM-friendly textual representation of the database schema.

This function combines static metadata descriptions with dynamic data profiling to help the AI Agent understand the dataset's structure and content.

Process:

  1. Reflect Schema: Queries DuckDB to get column names and types.
  2. Match Metadata: Aligns columns with the COLUMN_METADATA dictionary.
  3. Data Profiling (Dynamic): For categorical columns (VARCHAR), it executes a GROUP BY query to fetch the top 5 most frequent values, so the Agent can see actual examples (e.g., 'Covid-19' vs 'SARS-CoV-2'); see the sketch after this list.
  4. Formatting: Compiles everything into a Markdown list string.
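
A standalone sketch of what step 3 boils down to for a single column; the column name used here is hypothetical, and the query mirrors the one in the source listing below:

```python
# Standalone sketch of the step-3 profiling query for one (hypothetical) VARCHAR column.
from src.db.duckdb_connection import get_db_connection  # assumed import path

col_name = "classificacao_final"  # hypothetical categorical column name
con = get_db_connection()
try:
    df_top = con.execute(
        f"""
        SELECT {col_name}, COUNT(*) AS freq
        FROM srag_analytics
        WHERE {col_name} IS NOT NULL
        GROUP BY {col_name}
        ORDER BY freq DESC
        LIMIT 5
        """
    ).df()
    sample_values = ", ".join(f"'{v}'" for v in df_top[col_name])
    print(f"Sample Values: [{sample_values}]")
finally:
    con.close()
```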

Returns:

| Type | Description |
| --- | --- |
| `str` | A formatted string describing columns, types, descriptions, and sample values. Returns an error message string if the schema cannot be read. |

Source code in api/src/db/duckdb_connection.py

def get_schema_info() -> str:
    """
    Generates a rich, LLM-friendly textual representation of the database schema.

    This function combines static metadata descriptions with dynamic data profiling
    to help the AI Agent understand the dataset's structure and content.

    **Process:**

    1.  **Reflect Schema:** Queries DuckDB to get column names and types.
    2.  **Match Metadata:** Aligns columns with the `COLUMN_METADATA` dictionary.
    3.  **Data Profiling (Dynamic):** For categorical columns (VARCHAR), it executes
        a `GROUP BY` query to fetch the top 5 most frequent values. This allows the
        Agent to see actual examples (e.g., seeing 'Covid-19' vs 'SARS-CoV-2').
    4.  **Formatting:** Compiles everything into a Markdown list string.

    Returns:
        str: A formatted string describing columns, types, descriptions, and sample values.
             Returns an error message string if the schema cannot be read.
    """
    con = get_db_connection()
    try:
        try:
            df_schema = con.execute("DESCRIBE srag_analytics").df()
        except Exception as e:
            return f"Error reading schema table: {e}"

        schema_lines = []
        schema_lines.append("Table: srag_analytics")
        schema_lines.append("=" * 30)

        for col_name, description in COLUMN_METADATA.items():
            schema_row = df_schema[df_schema["column_name"] == col_name]

            if schema_row.empty:
                logger.warning(
                    f"Column '{col_name}' defined in metadata but not found in DB table."
                )
                continue

            col_type = schema_row.iloc[0]["column_type"]

            values_str = ""
            if "VARCHAR" in col_type.upper():
                try:
                    query = f"""
                        SELECT {col_name}, COUNT(*) as freq
                        FROM srag_analytics
                        WHERE {col_name} IS NOT NULL
                        GROUP BY {col_name}
                        ORDER BY freq DESC
                        LIMIT 5
                    """
                    df_distinct = con.execute(query).df()

                    vals = [f"'{v}'" for v in df_distinct[col_name].tolist()]
                    values_str = f" | Sample Values: [{', '.join(vals)}]"
                except Exception:
                    pass

            line = f"- **{col_name}** ({col_type}) | Description: {description}{values_str}"
            schema_lines.append(line)

        return "\n".join(schema_lines)

    except Exception as e:
        logger.error(f"Failed to generate schema info: {e}")
        return "Error: Could not retrieve database schema."
    finally:
        con.close()
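
A usage sketch for inspecting the generated description (for example before wiring it into the Agent's prompt); the import path is assumed as above:

```python
# Sketch: render the schema description that get_schema_info would hand to the agent.
from src.db.duckdb_connection import get_schema_info  # assumed import path

schema_text = get_schema_info()
if schema_text.startswith("Error"):
    # The function returns an error string instead of raising.
    raise SystemExit(schema_text)

print(schema_text)
# Expected shape (column names and values illustrative):
# Table: srag_analytics
# ==============================
# - **some_column** (VARCHAR) | Description: ... | Sample Values: ['...', '...']
```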