
Analytics on a single machine using Docker


The repository includes a "Single Machine" Docker Compose configuration which brings up the FHIR Pipelines Controller plus a Spark Thrift server, letting you more easily run Spark SQL queries on the Parquet files output by the Pipelines Controller.

To learn how the Pipelines Controller works on its own, see Try out the FHIR Pipelines Controller.

Requirements

  * Docker Engine and Docker Compose installed
  * A local clone of the FHIR Data Pipes repository

Configure the FHIR Pipelines Controller

Note: All file paths are relative to the root of the FHIR Data Pipes repository.

  1. Open docker/config/application.yaml and edit the value of fhirServerUrl to match the FHIR server you are connecting to.
  2. Open docker/config/hapi-postgres-config_local.json and edit the values to match the database of the FHIR server you are connecting to. Illustrative snippets of both files follow below.
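For reference, the relevant part of docker/config/application.yaml looks roughly like this; the exact keys and defaults may differ between versions, so check the file in your checkout:

fhirdata:
  fhirServerUrl: "http://172.17.0.1:8091/fhir"

Likewise, docker/config/hapi-postgres-config_local.json holds the JDBC connection details for the FHIR server's database. The field names below follow the repository's sample file and the values are placeholders; replace them with your server's host, port, and credentials:

{
  "jdbcDriverClass": "org.postgresql.Driver",
  "databaseService": "postgresql",
  "databaseHostName": "hapi-fhir-db",
  "databasePort": "5432",
  "databaseUser": "admin",
  "databasePassword": "admin",
  "databaseName": "hapi"
}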

If you are using the local test servers, the default values should work. If your FHIR server runs on the host machine rather than inside Docker, use the IP address of the Docker default bridge network instead. To find it, run the following command and use the "Gateway" value:

docker network inspect bridge --format='{{json .IPAM.Config}}'
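On a default Docker installation this typically prints something like the following, in which case 172.17.0.1 is the address to use:

[{"Subnet":"172.17.0.0/16","Gateway":"172.17.0.1"}]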

The Single Machine Docker configuration uses two environment variables, DWH_ROOT and PIPELINE_CONFIG, whose default values are defined in the .env file. To override one, set the variable when running the docker-compose command. For example, to override the DWH_ROOT environment variable, run the following:

DWH_ROOT="$(pwd)/my-amazing-data" docker-compose -f docker/compose-controller-spark-sql-single.yaml up --force-recreate
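PIPELINE_CONFIG works the same way, and the two variables can be combined in a single command:

PIPELINE_CONFIG="$(pwd)/my-config" DWH_ROOT="$(pwd)/my-amazing-data" docker-compose -f docker/compose-controller-spark-sql-single.yaml up --force-recreate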

Run the Single Machine configuration

To bring up the configuration, run:

docker-compose -f docker/compose-controller-spark-sql-single.yaml up --force-recreate

If you have run this container in the past and want to include new changes pulled into the repo, add the --build flag to rebuild the binaries.
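For example:

docker-compose -f docker/compose-controller-spark-sql-single.yaml up --force-recreate --build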

Alternatively, the configuration docker/compose-controller-spark-sql.yaml shows a simplified example of how to integrate the Parquet output of the pipelines into a Spark cluster environment.

Once started, the Pipelines Controller is available at http://localhost:8090 and the Spark Thrift server listens on localhost:10001.

The first time you run the Pipelines Controller, you must manually start a Full Pipeline run. In a browser, go to http://localhost:8090 and click the Run Full button.

After running the Full Pipeline, use the Incremental Pipeline to keep the Parquet files and tables up to date. By default it is scheduled to run every hour; you can also trigger it manually from the same page.
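The hourly default comes from the controller configuration. Assuming your docker/config/application.yaml matches the repository's sample, the schedule is a Spring cron expression like the following (key name and value taken from that sample; verify against your copy):

fhirdata:
  incrementalSchedule: "0 0 * * * *"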

If the Incremental Pipeline fails, or you see errors like the following:

ERROR o.openmrs.analytics.PipelineManager o.openmrs.analytics.PipelineManager$PipelineThread.run:343 - exception while running pipeline: 
pipeline-controller    | java.sql.SQLException: org.apache.hive.service.cli.HiveSQLException: Error running query: org.apache.spark.sql.AnalysisException: Unable to infer schema for Parquet. It must be specified manually.

the Spark Thrift server may not have permission to read the Parquet files. Try making the Parquet file directory, located at docker/dwh by default, readable:
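sudo chmod -R 755 docker/dwh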

View and analyze the data using Spark Thrift server

Connect to the Spark Thrift server using a client that supports Apache Hive. For example, if using the JDBC driver, the URL should be jdbc:hive2://localhost:10001. The pipeline will automatically create Patient, Encounter, and Observation tables when run.
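For example, assuming the beeline CLI (bundled with Apache Hive and Spark distributions) is installed, you can connect with:

beeline -u jdbc:hive2://localhost:10001

Once connected, run SHOW TABLES; to confirm the tables were created.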

Best practices for querying exported Parquet files

Handle nested fields

One major challenge when querying exported data is that FHIR resources have many nested fields. One approach is to use LATERAL VIEW with EXPLODE to flatten repeated fields and then filter for specific values of interest.
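As a minimal sketch of the pattern (using the Observation table created above; fuller examples follow), this flattens each element of code.coding into a struct column OCC whose fields can be selected and filtered directly:

SELECT O.id, OCC.`system`, OCC.code
FROM Observation AS O LATERAL VIEW explode(O.code.coding) AS OCC
LIMIT 5;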

Example queries

The following queries explore the sample data loaded when using a local test server. They leverage LATERAL VIEW with EXPLODE to flatten the Observation.code.coding repeated field and filter for specific observation codes.

Note that the synthetic sample data simulates HIV patients. Observations for HIV viral load use the following code, which is not the actual LOINC code:

[
   {
      "id":null,
      "coding":[
         {
            "id":null,
            "system":"http://loinc.org",
            "version":null,
            "code":"856AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA",
            "display":"HIV viral load",
            "userSelected":null
         }
      ],
      "text":"HIV viral load"
   }
]
Find patients with an observed viral load higher than a threshold
SELECT P.id AS pid, P.name.family AS family, P.gender AS gender, O.id AS obs_id, OCC.`system`,
  OCC.code, O.status AS status, O.value.quantity.value AS value
FROM Patient AS P, Observation AS O LATERAL VIEW explode(code.coding) AS OCC 
WHERE P.id = O.subject.PatientId
  AND OCC.`system` = 'http://loinc.org'
  AND OCC.code LIKE '856A%'
  AND O.value.quantity.value > 10000
LIMIT 4;

Sample output:

pid  |family      |gender|obs_id|system          |code                                |status|value   |
-----+------------+------+------+----------------+------------------------------------+------+--------+
4765 |["Marks830"]|male  |6488  |http://loinc.org|856AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA|final |860500.0|
32085|["Ceja441"] |male  |33164 |http://loinc.org|856AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA|final |404230.0|
9357 |["Sauer652"]|male  |12338 |http://loinc.org|856AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA|final |749860.0|
8619 |["Rogahn59"]|male  |9638  |http://loinc.org|856AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA|final |390490.0|
Count all viral-load observations
SELECT COUNT(0)
FROM (
  SELECT P.id AS pid, P.name.family AS family, P.gender AS gender, O.id AS obs_id, OCC.`system`,
    OCC.code, O.status AS status, O.value.quantity.value
  FROM Patient AS P, Observation AS O LATERAL VIEW explode(code.coding) AS OCC 
  WHERE P.id = O.subject.PatientId
    AND OCC.`system` = 'http://loinc.org'
    AND OCC.code LIKE '856A%'
);

Sample output:

count(0)|
--------+
     265|

Use Views to reduce complexity

Once you have a query that filters to the data you're interested in, create a view with a simpler schema to work with in the future. This is a good way to create building blocks to combine with other data and create more complex queries.

Observations of patients starting an Anti-retroviral plan in 2010
SELECT
  O.id AS obs_id, OCC.`system`, OCC.code, O.status AS status,
  OVCC.code AS value_code, O.subject.PatientId AS patient_id
FROM Observation AS O LATERAL VIEW explode(code.coding) AS OCC
  LATERAL VIEW explode(O.value.codeableConcept.coding) AS OVCC
WHERE OCC.code LIKE '1255%'
  AND OVCC.code LIKE '1256%'
  AND YEAR(O.effective.dateTime) = 2010
LIMIT 1;

Sample output:

obs_id|system          |code                                |status|value_code                          |patient_id|
------+----------------+------------------------------------+------+------------------------------------+----------+
86837 |http://loinc.org|1255AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA|final |1256AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA|83691     |
Create a corresponding view
CREATE VIEW obs_arv_plan AS
SELECT
  O.id AS obs_id, OCC.`system`, OCC.code, O.status AS status,
  OVCC.code AS value_code, O.subject.PatientId AS patient_id
FROM Observation AS O LATERAL VIEW explode(code.coding) AS OCC
  LATERAL VIEW explode(O.value.codeableConcept.coding) AS OVCC
WHERE OCC.code LIKE '1255%'
  AND OVCC.code LIKE '1256%'
  AND YEAR(O.effective.dateTime) = 2010;
Count cases of Anti-retroviral plans started in 2010
SELECT COUNT(0) FROM obs_arv_plan;

Sample output:

count(0)|
--------+
       4|
Compare Patient data with the view based on observations
SELECT P.id AS pid, P.name.family AS family, P.gender AS gender, COUNT(0) AS num_start
FROM Patient P, obs_arv_plan
WHERE P.id = obs_arv_plan.patient_id
GROUP BY P.id, P.name.family, P.gender
ORDER BY num_start DESC
LIMIT 10;

Sample output:

pid  |family          |gender|num_start|
-----+----------------+------+---------+
83691|["Terry864"]    |male  |        1|
39417|["VonRueden376"]|male  |        1|
21485|["Terry864"]    |male  |        1|
47696|["VonRueden376"]|male  |        1|