MEDS sample data code tutorial (#134)
* MEDS sample data from ESGPT sample

* Duplicate tutorial and repoint to mes

* Add sex as static variable into sample data

* Default to MEDS predicates for shipping with repo

* Cast to microsecond precision + remove extra sample shard

* Fix file structure for meds sample data

* Working MEDS tutorial
justin13601 authored Sep 26, 2024
1 parent 4472d9f commit e826a7b
Showing 15 changed files with 271 additions and 16 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -178,7 +178,7 @@ windows:
...
```

Sample task configuration files for 6 common tasks are provided in `sample_configs/`. All task configurations can be directly extracted using `'direct'` mode on `sample_data/sample_data.csv` as this predicates dataframe was designed specifically to capture concepts needed for all tasks. However, only `inhospital_mortality.yaml` and `imminent-mortality.yaml` would be able to be extracted on `sample_data/esgpt_sample` and `sample_data/meds_sample` due to a lack of required concepts in the datasets.
Sample task configuration files for 6 common tasks are provided in `sample_configs/`. All task configurations can be directly extracted using `'direct'` mode on `sample_data/sample_data.csv`, as this predicates dataframe was designed specifically to capture the concepts needed for all tasks. However, only `inhospital_mortality.yaml` and `imminent_mortality.yaml` can be extracted from `sample_data/esgpt_sample` and `sample_data/meds_sample`, due to a lack of the other required concepts in those datasets (predicates are defined as per the MEDS sample data by default; modifications will be needed for ESGPT).

### Predicates

2 changes: 1 addition & 1 deletion docs/source/index.md
@@ -15,7 +15,7 @@ README <readme>
Usage Guide <usage>
Task Examples <notebooks/examples>
Predicates DataFrame <notebooks/predicates>
Sample Data Tutorial <notebooks/tutorial>
MEDS Data Tutorial <notebooks/tutorial_meds>
Technical Details <technical>
Computational Profile <profiling>
Module API Reference <api/modules>
@@ -4,7 +4,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"# Code Example with Synthetic Data"
"# Code Example with Synthetic ESGPT Data"
]
},
{
252 changes: 252 additions & 0 deletions docs/source/notebooks/tutorial_meds.ipynb
@@ -0,0 +1,252 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Code Tutorial with Synthetic MEDS Data"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Set-up"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Imports\n",
"\n",
"First, let's import ACES! Three modules (`config`, `predicates`, and `query`) are required to execute an end-to-end cohort extraction. `omegaconf` is also required to express our data config parameters so that we can load our `MEDS` dataset. Other imports are only needed for visualization!"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import json\n",
"from pathlib import Path\n",
"\n",
"import pandas as pd\n",
"import yaml\n",
"from bigtree import print_tree\n",
"from IPython.display import display\n",
"from omegaconf import DictConfig\n",
"\n",
"from aces import config, predicates, query"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Directories\n",
"\n",
"Next, let's specify our paths and directories. In this tutorial, we will extract a cohort for a typical in-hospital mortality prediction task from the MEDS synthetic sample dataset. The task configuration file and sample data are both shipped with the repository in [sample_configs/](https://github.com/justin13601/ACES/tree/main/sample_configs) and [sample_data/](https://github.com/justin13601/ACES/tree/main/sample_data) folders in the project root, respectively."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"config_path = \"../../../sample_configs/inhospital_mortality.yaml\"\n",
"data_path = \"../../../sample_data/meds_sample/\""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Configuration File"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The task configuration file is the core of the configuration language that ACES uses to extract cohorts. Details about this configuration language are available in [Configuration Language](https://eventstreamaces.readthedocs.io/en/latest/configuration.html). In brief, the configuration file contains `predicates`, `patient_demographics`, `trigger`, and `windows` sections.\n",
"\n",
"The `predicates` section is used to define dataset-specific concepts that are needed for the task. In our case of binary mortality prediction, we are interested in extracting a cohort of patients who have been admitted into the hospital and who were subsequently discharged or died. As such, `admission`, `discharge`, `death`, and `discharge_or_death` would be handy predicates.\n",
"\n",
"The `patient_demographics` section is used to define static concepts that remain constant for subjects over time. For instance, sex is a common static variable. Should we want to filter our cohort to patients with a specific sex, we can do so here in the same way as we define predicates. For more information on predicates, please refer to this [guide](https://eventstreamaces.readthedocs.io/en/latest/technical.html#predicates-plainpredicateconfig-and-derivedpredicateconfig). In this example, let's say we are only interested in male patients.\n",
"\n",
"We'd also like to make a prediction of mortality for each admission. Hence, a reasonable `trigger` event would be an `admission` predicate.\n",
"\n",
"Suppose in our task, we'd like to set a constraint that the admission must have been more than 48 hours long. Additionally, for our prediction inputs, we'd like to use all information in the patient record up until 24 hours after admission, which must contain at least 5 event records (as we'd want to ensure there is sufficient input data). These clauses are captured in the `windows` section where each window is defined relative to another."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"with open(config_path, \"r\") as stream:\n",
" data_loaded = yaml.safe_load(stream)\n",
" print(json.dumps(data_loaded, indent=4))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We can see that the `input` window begins at `null` (the start of the patient record) and ends 24 hours after `trigger` (`admission`). A `gap` window is defined for 24 hours after the end of the `input` window, constraining the admission to be at least 48 hours long. Finally, a `target` window is specified from the end of the `gap` window to either the next `discharge` or `death` event (i.e., `discharge_or_death`). This allows us to extract a binary label for each patient in our cohort to be used in the prediction task (i.e., the `label` field in the `target` window, which will extract `0`: discharged, `1`: died). Additionally, an `index_timestamp` field is set as the `end` of the `input` window to denote when a prediction is made (i.e., at the end of the `input` window, when all input data is fed into the model), and can be used to index extraction results."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We now load our configuration file by passing its path (`str`) into `config.TaskExtractorConfig.load()`. This parses the configuration file for each of the key sections indicated above and prepares ACES for extraction based on our defined constraints (inclusion/exclusion criteria for each window)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"cfg = config.TaskExtractorConfig.load(config_path=config_path)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Task Tree\n",
"\n",
"With the configuration file loaded and parsed, we can access a visualization of a tree structure that is representative of our task of interest. As seen, the tree nodes are `start` and `end` time points of the windows that were defined in the configuration file, and the tree edges express the relationships between these windows. ACES will traverse this tree and recursively compute aggregated predicate counts for each subtree. This would allow us to filter our dataset to valid realizations of this task tree, which would make up our task cohort."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"tree = cfg.window_tree\n",
"print_tree(tree)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Data"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This tutorial uses synthetic data for 100 patients stored in the MEDS standard. For more information about this data, please refer to its generation in the [ESGPT Documentation](https://eventstreamml.readthedocs.io/en/latest/_collections/local_tutorial_notebook.html) (it was separately converted to MEDS). Here is what the data looks like:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"pd.read_parquet(f\"{data_path}/train/0.parquet\").head()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Predicate Columns\n",
"\n",
"The next step in our cohort extraction is the generation of predicate columns. Our defined dataset-agnostic windows (i.e., complex task logic) are linked to dataset-specific predicates (i.e., dataset observations and concepts), which facilitates the sharing of tasks across datasets. As such, the predicates dataframe is the foundational unit upon which ACES acts.\n",
"\n",
"A predicate column is simply a column containing numerical counts (often just `0`'s and `1`'s), representing the number of times a given predicate (concept) occurs at a given timestamp for a given patient.\n",
"\n",
"In the case of MEDS (and ESGPT), ACES supports the automatic generation of these predicate columns from the configuration file. However, some fields need to be provided via a `DictConfig` object. These include the path to the directory of the MEDS dataset (`str`) and the data standard (which is `meds` in this case).\n",
"\n",
"Given this data configuration, we then call `predicates.get_predicates_df()` to generate the relevant predicate columns for our task. Due to the nature of the specified predicates, the resulting dataframe simply contains the unique (`subject_id`, `timestamp`) pairs and binary columns for each predicate. An additional predicate `_ANY_EVENT` is also generated; this will be used to enforce our constraint on the number of events in the `input` window."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"data_config = DictConfig({\"path\": data_path, \"standard\": \"meds\"})\n",
"\n",
"predicates_df = predicates.get_predicates_df(cfg=cfg, data_config=data_config)\n",
"display(predicates_df)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## End-to-End Query"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Finally, with our task configuration object and the computed predicates dataframe, we can call `query.query()` to execute the extraction of our cohort.\n",
"\n",
"Each row of the resulting dataframe is a valid realization of our task tree. Hence, each instance can be included in our cohort used for the prediction of in-hospital mortality as defined in our task configuration file. The output contains:\n",
"\n",
"- `subject_id`: subject IDs of our cohort (since we'd like to treat individual admissions as separate samples, there will be duplicate subject IDs)\n",
"- `index_timestamp`: timestamp of when a prediction is made, which coincides with the `end` timestamp of the `input` window (as specified in our task configuration)\n",
"- `label`: binary label of mortality, which is derived from the `death` predicate of the `target` window (as specified in our task configuration)\n",
"- `trigger`: timestamp of the `trigger` event, which is the `admission` predicate (as specified in our task configuration)\n",
"\n",
"Additionally, it also includes a column for each node of our task tree in a pre-order traversal order. Each column contains a `pl.Struct` object containing the name of the node, the start and end times of the window it represents, and the counts of all defined predicates in that window."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"df_result = query.query(cfg=cfg, predicates_df=predicates_df)\n",
"display(df_result)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"... and that's a wrap! We have used ACES to perform an end-to-end extraction on a MEDS dataset for a cohort that can be used to predict in-hospital mortality. Similar pipelines can be built for other tasks, as well as for data in the ESGPT standard. You may also pre-compute predicate columns and use the `direct` flag when loading `.csv` or `.parquet` data files. More information is available in [Predicates DataFrame](https://eventstreamaces.readthedocs.io/en/latest/notebooks/predicates.html).\n",
"\n",
"As always, please don't hesitate to reach out should you have any questions about ACES!\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "esgpt",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.13"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
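The notebook above describes a predicate column as just a per-(`subject_id`, `timestamp`) count of how often a concept occurs. As a rough, self-contained sketch of that idea, here is plain pandas on made-up MEDS-style data; this is illustrative only, not ACES's actual implementation, and all data values are invented:

```python
import pandas as pd

# Made-up MEDS-style event table: one row per (subject_id, time, code).
events = pd.DataFrame(
    {
        "subject_id": [1, 1, 1, 2, 2],
        "time": pd.to_datetime(
            ["2024-01-01", "2024-01-01", "2024-01-03", "2024-02-01", "2024-02-02"]
        ),
        "code": [
            "ADMISSION//MEDICAL",
            "LAB//HR",
            "DISCHARGE//HOME",
            "ADMISSION//SURGICAL",
            "DEATH",
        ],
    }
)

# One count column per plain predicate, matching codes the way the shipped
# sample configs do: a regex for ADMISSION//.* and DISCHARGE//.*, plain
# equality for DEATH.
plain = {
    "admission": events["code"].str.match(r"ADMISSION//.*"),
    "discharge": events["code"].str.match(r"DISCHARGE//.*"),
    "death": events["code"].eq("DEATH"),
}
for name, mask in plain.items():
    events[name] = mask.astype(int)

# Collapse to unique (subject_id, time) pairs, summing counts per timestamp.
predicates_df = events.groupby(["subject_id", "time"], as_index=False)[
    list(plain)
].sum()

# Derived predicate from the task config: or(discharge, death).
predicates_df["discharge_or_death"] = (
    predicates_df["discharge"] + predicates_df["death"]
).clip(upper=1)

# Each remaining row is one event, so _ANY_EVENT is simply 1 everywhere.
predicates_df["_ANY_EVENT"] = 1

print(predicates_df)
```

The real `predicates.get_predicates_df()` derives all of this from the task configuration itself (regex vs. exact code matching, derived predicates, and so on); this sketch only mirrors the shape of its output.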
9 changes: 6 additions & 3 deletions docs/source/usage.md
@@ -75,10 +75,13 @@ ACES/
│ │ ├── events_df.parquet
│ │ └── dynamic_measurements_df.parquet
│ ├── meds_sample/
│ │ ├── shards/
│ │ ├── held_out/
│ │ │ └── 0.parquet
│ │ ├── train/
│ │ │ ├── 0.parquet
│ │ │ └── 1.parquet
│ │ └── sample_shard.parquet
│ │ └── tuning/
│ │ └── 0.parquet
│ └── sample_data.csv
├── sample_configs/
│ └── inhospital_mortality.yaml
@@ -88,7 +88,7 @@
**To query from a single MEDS shard**:

```bash
aces-cli cohort_name="inhospital_mortality" cohort_dir="sample_configs/" data.standard=meds data.path="sample_data/meds_sample/sample_shard.parquet"
aces-cli cohort_name="inhospital_mortality" cohort_dir="sample_configs/" data.standard=meds data.path="sample_data/meds_sample/train/0.parquet"
```

**To query from multiple MEDS shards**:
2 changes: 1 addition & 1 deletion sample_configs/imminent_mortality.yaml
@@ -1,7 +1,7 @@
# Task: 24-hour Imminent Mortality Prediction
predicates:
death:
code: event_type//DEATH
code: DEATH

trigger: _ANY_EVENT

6 changes: 3 additions & 3 deletions sample_configs/inhospital_mortality.yaml
@@ -1,11 +1,11 @@
# Task: 24-hour In-hospital Mortality Prediction
predicates:
admission:
code: event_type//ADMISSION
code: { regex: "ADMISSION//.*" }
discharge:
code: event_type//DISCHARGE
code: { regex: "DISCHARGE//.*" }
death:
code: event_type//DEATH
code: DEATH
discharge_or_death:
expr: or(discharge, death)

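The hunk above covers only the `predicates` section of `inhospital_mortality.yaml`. For orientation, here is a hedged sketch of what the `windows` section described in the tutorial notebook plausibly looks like; field names follow ACES's configuration language, but the shipped `sample_configs/inhospital_mortality.yaml` is authoritative:

```yaml
# Illustrative sketch only -- see sample_configs/inhospital_mortality.yaml
# for the real file.
trigger: admission

windows:
  input:
    start: NULL
    end: trigger + 24h
    start_inclusive: True
    end_inclusive: True
    has:
      _ANY_EVENT: (5, None)   # at least 5 events of input data
    index_timestamp: end      # prediction time
  gap:
    start: input.end
    end: start + 24h
    start_inclusive: False
    end_inclusive: True
    has:
      admission: (None, 0)    # no new admission during the gap
      discharge: (None, 0)    # admission must last past the gap
      death: (None, 0)
  target:
    start: gap.end
    end: start -> discharge_or_death
    start_inclusive: False
    end_inclusive: True
    label: death              # 0 = discharged, 1 = died
```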
4 changes: 2 additions & 2 deletions sample_configs/intervention_weaning.yaml
@@ -1,9 +1,9 @@
# Task: Ventilation Weaning Prediction
predicates:
procedure_start:
code: event_type//PROCEDURE_START
code: PROCEDURE_START
procedure_end:
code: event_type//PROCEDURE_END
code: PROCEDURE_END
ventilation:
code: procedure//Invasive Ventilation
ventilation_start:
4 changes: 2 additions & 2 deletions sample_configs/long_term_recurrence.yaml
@@ -1,9 +1,9 @@
# Task: Long-term (3 Years) MI Recurrence Prediction
predicates:
admission:
code: event_type//ADMISSION
code: { regex: "ADMISSION//.*" }
discharge:
code: event_type//DISCHARGE
code: { regex: "DISCHARGE//.*" }
diagnosis_ICD9CM_41071:
code: diagnosis//ICD9CM_41071
diagnosis_ICD10CM_I214:
4 changes: 2 additions & 2 deletions sample_configs/readmission_risk.yaml
@@ -1,9 +1,9 @@
# Task: 30-day Readmission Risk Prediction
predicates:
admission:
code: event_type//ADMISSION
code: { regex: "ADMISSION//.*" }
discharge:
code: event_type//DISCHARGE
code: { regex: "DISCHARGE//.*" }

trigger: admission

Binary file modified sample_data/meds_sample/held_out/0.parquet
Binary file removed sample_data/meds_sample/sample_shard.parquet
Binary file modified sample_data/meds_sample/train/0.parquet
Binary file modified sample_data/meds_sample/train/1.parquet
Binary file added sample_data/meds_sample/tuning/0.parquet
