Note: This pipeline is in maintenance mode and not actively developed. There are known issues with duplicated resources in Parquet files.
The streaming directory contains code to continuously listen for changes to an underlying OpenMRS database using Debezium.
The Debezium-based streaming mode provides real-time downstream consumption of incremental updates, even for operations performed outside the OpenMRS API, e.g., data cleaning, module operations, and data syncing or migration. It captures incremental updates from the MySQL binlog and then streams both FHIR and non-FHIR data for downstream consumption. Because it is not based on the Hibernate Interceptor or the Event module, all events are captured from day 0, and the pipeline can be used independently, without a batch pipeline. It also tolerates failures such as application restarts or crashes, since the pipeline resumes from the last processed offset.
Streaming mode works only with OpenMRS; support for other data sources is not planned.
- An OpenMRS instance with the latest version of the FHIR2 Module installed.
  - There is an OpenMRS Reference Application image with these prerequisites and demo data you can use to try things out.
- A target output for the data. Supported options are Apache Parquet files or a FHIR server such as HAPI FHIR or Google Cloud Platform FHIR stores.
  - You can use our HAPI FHIR server image for testing FHIR API targets.
  - Learn how to create a compatible GCP FHIR store, if you want to use this option.
1. Clone the FHIR Data Pipes project to your machine.
2. Set the `utils` directory to world-readable: `chmod -R 755 ./utils`.
3. Edit `./utils/dbz_event_to_fhir_config.json`. Find the `debeziumConfigurations` section at the top of the file and edit the values to match your environment. See the documentation on Debezium MySQL Connector properties for more information.
4. Build binaries from the repo root with `mvn clean package`.
5. Run the pipeline to a FHIR server and Parquet files:
```shell
$ java -cp ./pipelines/streaming/target/streaming-bundled.jar \
    com.google.fhir.analytics.Runner \
    --fhirServerUrl=http://localhost:8099/openmrs/ws/fhir2/R4 \
    --fhirServerUserName=admin --fhirServerPassword=Admin123 \
    --fhirSinkPath=http://localhost:8098/fhir \
    --sinkUserName=hapi --sinkPassword=hapi \
    --outputParquetPath=/tmp/TEST/ \
    --fhirDatabaseConfigPath=./utils/dbz_event_to_fhir_config.json
```
Or to a GCP FHIR store:
```shell
$ java -cp ./pipelines/streaming/target/streaming-bundled.jar \
    com.google.fhir.analytics.Runner \
    --fhirServerUrl=http://localhost:8099/openmrs/ws/fhir2/R4 \
    --fhirServerUserName=admin --fhirServerPassword=Admin123 \
    --fhirSinkPath=projects/PROJECT/locations/LOCATION/datasets/DATASET/fhirStores/FHIRSTORENAME \
    --fhirDatabaseConfigPath=./utils/dbz_event_to_fhir_config.json
```
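The `debeziumConfigurations` section of `dbz_event_to_fhir_config.json` tells Debezium how to reach the source MySQL database. The sketch below is illustrative only: the key names for host, port, user, password, and database name are stand-ins, and the authoritative keys are those already present in your copy of the file; only `debeziumConfigurations` and `snapshotMode` are named in this document.

```json
{
  "debeziumConfigurations": {
    "databaseHostName": "localhost",
    "databasePort": "3306",
    "databaseUser": "root",
    "databasePassword": "debezium",
    "databaseName": "openmrs",
    "snapshotMode": "initial"
  }
}
```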
- `fhirServerUrl` - The URL of the source FHIR server instance.
- `fhirServerUserName` - The HTTP Basic Auth username to access the FHIR server APIs.
- `fhirServerPassword` - The HTTP Basic Auth password to access the FHIR server APIs.
- `fhirDatabaseConfigPath` - The path to the configuration file containing MySQL parameters and FHIR mappings. This generally should not be changed; instead, edit `dbz_event_to_fhir_config.json` to match your configuration. Default: `../utils/dbz_event_to_fhir_config.json`
Parquet files are output when `outputParquetPath` is set.

- `outputParquetPath` - The file path to write Parquet files to, e.g., `./tmp/parquet/`. Default: empty string, which does not output Parquet files.
- `secondsToFlushParquetFiles` - The number of seconds to wait before flushing all Parquet writers with non-empty content to files. Use `0` to disable. Default: `3600`.
- `rowGroupSizeForParquetFiles` - The approximate size in bytes of the row groups in Parquet files. When this size is reached, the content is flushed to disk. Not used if there are fewer than 100 records. Use `0` for the default Parquet row-group size. Default: `0`.
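Because Parquet writers only flush periodically (controlled by `secondsToFlushParquetFiles`), downstream consumers typically poll the output directory for files flushed so far. A minimal stdlib sketch; the directory layout and file names below are synthetic stand-ins for real pipeline output:

```python
import os
import tempfile

def list_parquet_files(output_dir: str) -> list[str]:
    """Return sorted paths of all Parquet files flushed under output_dir so far."""
    return sorted(
        os.path.join(root, name)
        for root, _dirs, names in os.walk(output_dir)
        for name in names
        if name.endswith(".parquet")
    )

# Demo with a throwaway directory standing in for --outputParquetPath:
with tempfile.TemporaryDirectory() as d:
    os.makedirs(os.path.join(d, "Patient"))
    open(os.path.join(d, "Patient", "part-0.parquet"), "w").close()
    print(list_parquet_files(d))  # one path ending in part-0.parquet
```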
Resources will be copied to the FHIR server specified in `fhirSinkPath` if that field is set.

- `fhirSinkPath` - A base URL to a target FHIR server, or the relative path of a GCP FHIR store, e.g., `http://localhost:8091/fhir` for a FHIR server or `projects/PROJECT/locations/LOCATION/datasets/DATASET/fhirStores/FHIR-STORE-NAME` for a GCP FHIR store. If using a GCP FHIR store, see here for setup information. Default: none; resources are not copied.
- `sinkUserName` - The HTTP Basic Auth username to access the FHIR sink. Not used for GCP FHIR stores.
- `sinkPassword` - The HTTP Basic Auth password to access the FHIR sink. Not used for GCP FHIR stores.
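The sink credentials are sent as standard HTTP Basic Auth. As an illustration of what the `sinkUserName`/`sinkPassword` pair becomes on the wire (using the example credentials from the command above), the header value is the base64 encoding of `user:password`:

```python
import base64

def basic_auth_header(user: str, password: str) -> str:
    """Build the HTTP Authorization header value for Basic Auth credentials."""
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    return f"Basic {token}"

print(basic_auth_header("hapi", "hapi"))  # → Basic aGFwaTpoYXBp
```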
- Will this pipeline include historical data recorded before the MySQL binlog was enabled? Yes. By default, the pipeline takes a snapshot of the entire database and will include all historical data.
- How do I stop Debezium from taking a snapshot of the entire database? Set `snapshotMode` in the config file to `schema_only`, i.e., `"snapshotMode": "schema_only"`. Other options include `when_needed`, `initial` (the default), and `never`. See the Debezium documentation for more details.