Ingest
download_data()
Downloads the SRAG data CSV from the official source if it is not already present locally.
The file is downloaded as a stream and written to disk in chunks, so large files are handled without holding the whole response in memory.
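The streaming idea can be sketched with only the standard library. This is a minimal illustration, not the code in `ingest.py`: it assumes `urllib.request` as the HTTP client, and `stream_to_file` and `CHUNK_SIZE` are hypothetical names.

```python
import urllib.request
from pathlib import Path

CHUNK_SIZE = 1024 * 1024  # 1 MiB per read; an assumed value, tune as needed


def stream_to_file(url: str, dest: Path, chunk_size: int = CHUNK_SIZE) -> int:
    """Copy the response body to `dest` chunk by chunk and return the bytes written."""
    written = 0
    # urlopen raises urllib.error.HTTPError for 4xx/5xx responses.
    with urllib.request.urlopen(url) as response, open(dest, "wb") as out:
        while True:
            chunk = response.read(chunk_size)
            if not chunk:  # empty bytes signals end of stream
                break
            out.write(chunk)
            written += len(chunk)
    return written
```

Only one chunk is held in memory at a time, which is what keeps memory use flat regardless of file size.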
Checks:
- If `RAW_DATA_PATH` exists and `FORCE_UPDATE` is false, the download is skipped.
- Creates any missing parent directories.
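The two checks can be sketched as a small guard function. This assumes `RAW_DATA_PATH` and `FORCE_UPDATE` are passed in as arguments; `ensure_raw_data` and the `fetch` callable (standing in for the actual download) are hypothetical.

```python
from pathlib import Path
from typing import Callable


def ensure_raw_data(raw_path: Path, force_update: bool,
                    fetch: Callable[[Path], None]) -> bool:
    """Return True if a download ran, False if the local file was reused."""
    if raw_path.exists() and not force_update:
        return False  # cached CSV is present and no refresh was requested
    # Create any missing parent directories before writing the file.
    raw_path.parent.mkdir(parents=True, exist_ok=True)
    fetch(raw_path)  # hypothetical download step, e.g. a streaming copy
    return True
```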
Raises:

| Type | Description |
|---|---|
| `HTTPError` | If the remote server returns an error code (4xx/5xx). |
| `IOError` | If writing to the local disk fails. |
Source code in api/src/services/ingest.py
process_and_load()
Processes the raw CSV data and loads it into a DuckDB database.
This function performs the 'Transform' and 'Load' steps of the ETL pipeline:
- Read: Loads specific columns from the raw CSV using pandas.
- Clean:
    - Converts dates and handles parsing errors.
    - Maps categorical codes (e.g., 1, 2) to human-readable labels (e.g., 'Cure', 'Death_SRAG').
    - Normalizes binary fields (vaccination, comorbidities).
- Load: Persists the processed DataFrame into a DuckDB table (`srag_analytics`).
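The Read step and the date handling from the Clean step might look like the sketch below. The column names (`DT_NOTIFIC`, `EVOLUCAO`, `CLASSI_FIN`), the `;` separator, and the `%Y-%m-%d` date format are assumptions for illustration, not taken from `ingest.py`; `read_and_parse` is a hypothetical name, and the DuckDB Load step is not shown.

```python
import pandas as pd

# Hypothetical column subset; the real list lives in ingest.py.
USECOLS = ["DT_NOTIFIC", "EVOLUCAO", "CLASSI_FIN"]


def read_and_parse(source, usecols=USECOLS) -> pd.DataFrame:
    # usecols makes pandas parse only the listed columns, saving memory.
    df = pd.read_csv(source, sep=";", usecols=usecols)
    # errors="coerce" turns unparseable dates into NaT instead of raising.
    df["DT_NOTIFIC"] = pd.to_datetime(df["DT_NOTIFIC"], format="%Y-%m-%d",
                                      errors="coerce")
    return df
```

Coercing bad dates to `NaT` keeps a few malformed rows from aborting the whole load; they can be filtered or counted afterwards.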
Column Mappings Applied:
- `outcome_lbl`: 1 -> Cure, 2 -> Death_SRAG, 3 -> Death_Other.
- `diagnosis_lbl`: 1 -> Influenza, 5 -> Covid-19.
- `icu_lbl`, `vaccine_*`: 1 -> Yes, 2 -> No.
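The mappings listed above translate naturally into dictionaries applied with `Series.map`. This is a sketch: the code-to-label pairs come from the list above, but the raw column names (`EVOLUCAO`, `CLASSI_FIN`, `UTI`, `VACINA_COV`), the `vaccine_covid` output name, and `add_labels` itself are hypothetical.

```python
import pandas as pd

# Code -> label pairs as documented for process_and_load().
OUTCOME_MAP = {1: "Cure", 2: "Death_SRAG", 3: "Death_Other"}
DIAGNOSIS_MAP = {1: "Influenza", 5: "Covid-19"}
YES_NO_MAP = {1: "Yes", 2: "No"}

# Hypothetical raw-column -> output-column pairs for the Yes/No fields.
BINARY_COLUMNS = {"UTI": "icu_lbl", "VACINA_COV": "vaccine_covid"}


def add_labels(df: pd.DataFrame) -> pd.DataFrame:
    out = df.copy()
    # Series.map with a dict yields NaN for any code missing from the dict.
    out["outcome_lbl"] = out["EVOLUCAO"].map(OUTCOME_MAP)
    out["diagnosis_lbl"] = out["CLASSI_FIN"].map(DIAGNOSIS_MAP)
    for raw_col, label_col in BINARY_COLUMNS.items():
        out[label_col] = out[raw_col].map(YES_NO_MAP)
    return out
```

Because unmapped codes become `NaN` rather than raising, "unknown" or unexpected codes stay visible as missing values in the labeled columns.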
Raises:

| Type | Description |
|---|---|
| `Exception` | If data processing fails (e.g., memory issues, schema mismatch). |
Source code in api/src/services/ingest.py
run_pipeline()
Orchestrates the full ETL (Extract, Transform, Load) pipeline.
This is the main entry point for data ingestion. It ensures that the application has a valid local database to work with, and it skips work that has already been done so that restarting the application does not repeat the download and processing.
Logic:
- Checks whether the database (`DB_PATH`) already exists.
- If it exists and `FORCE_UPDATE` is False, skips execution (idempotency).
- Otherwise, calls `download_data()` followed by `process_and_load()`.
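The orchestration logic above can be sketched with the two steps passed in as callables, which keeps the idempotency check testable on its own. `run_pipeline_sketch` is a hypothetical name, and passing `DB_PATH` and `FORCE_UPDATE` as arguments is an assumption; the real function presumably reads them from configuration.

```python
from pathlib import Path
from typing import Callable


def run_pipeline_sketch(db_path: Path, force_update: bool,
                        download: Callable[[], None],
                        process: Callable[[], None]) -> bool:
    """Return True if the ETL ran, False if the existing database was reused."""
    if db_path.exists() and not force_update:
        return False  # idempotent: a usable database is already on disk
    download()  # Extract
    process()   # Transform + Load
    return True
```

Running it twice without forcing an update performs the download and processing only once.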
Source code in api/src/services/ingest.py