Bulk FHIR import support #37

Merged: 8 commits, Sep 10, 2024
2 changes: 2 additions & 0 deletions config/config.exs
@@ -50,6 +50,8 @@ config :dart_sass,
cd: Path.expand("../assets", __DIR__)
]

config :kindling, root_resources: ["Patient", "Observation", "Organization"]

# Import environment specific config. This must remain at the bottom
# of this file so it overrides the configuration defined above.
import_config "#{Mix.env()}.exs"
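The root_resources entry presumably tells Kindling which FHIR R4 resource schemas to generate under the EpiViewpoint.R4 namespace; that reading is an assumption based on how the parser below consumes the generated structs. A minimal sketch of the conversion call the parser relies on (the JSON literal is illustrative):

# Hedged sketch: per the parser below, Kindling.Converter.convert/2 appears to return
# the generated struct directly when given a decoded FHIR resource map.
{:ok, map} = Jason.decode(~s({"resourceType": "Patient", "id": "pat-1"}))
%EpiViewpoint.R4.Patient{} = Kindling.Converter.convert("EpiViewpoint.R4", map)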
229 changes: 229 additions & 0 deletions lib/epiviewpoint/bulk_fhir_parser.ex
@@ -0,0 +1,229 @@
defmodule EpiViewpoint.BulkFhirParser do
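@moduledoc """
Parses a bulk FHIR export (newline-delimited JSON files of Patient, Observation,
and Organization resources) into the row maps consumed by the lab result import.
"""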
def parse_bulk_fhir(file_list) do
with {:ok, contents} <- combine_contents(file_list),
{:ok, resources} <- load_resources(file_list),
{:ok, extracted} <- extract_resources(resources),
{:ok, joined} <- join_resources(extracted) do
{:ok, to_map(joined, contents)}
end
end

defp combine_contents(file_list) do
contents = file_list |> Enum.map(& &1.contents) |> List.to_string()
{:ok, contents}
end

defp load_resources(file_list) do
result =
Enum.reduce_while(file_list, {:ok, %{}}, fn file, {:ok, acc} ->
case load_resource_file(file.contents) do
{:ok, resources} -> {:cont, {:ok, group_resources(resources, acc)}}
{:error, reason} -> {:halt, {:error, reason}}
end
end)

case result do
{:ok, resources} -> {:ok, resources}
{:error, reason} -> {:error, reason}
end
end

defp group_resources(resources, acc) do
Enum.reduce(resources, acc, fn resource, acc ->
Map.update(acc, resource.resource_type, [resource], &[resource | &1])
end)
end

defp load_resource_file(file_content) do
result =
file_content
|> String.split("\n")
|> Stream.map(&String.trim/1)
|> Stream.filter(&(&1 != ""))
|> Stream.map(&json_to_kindle_schema/1)
|> Enum.to_list()

{:ok, result}
rescue
e -> {:error, "Failed to load resource file: #{inspect(e)}"}
end

defp json_to_kindle_schema(json) do
case Jason.decode(json) do
{:ok, map} -> Kindling.Converter.convert("EpiViewpoint.R4", map)
{:error, reason} -> {:error, "Failed to decode JSON: #{inspect(reason)}"}
end
end

defp extract_resources(resources) do
result =
Enum.reduce_while(resources, {:ok, %{}}, fn {resource_type, resource_list}, {:ok, acc} ->
case extract_resource(resource_type, resource_list) do
{:ok, extracted} -> {:cont, {:ok, Map.put(acc, resource_type, extracted)}}
{:error, reason} -> {:halt, {:error, reason}}
end
end)

case result do
{:ok, extracted} -> {:ok, extracted}
{:error, reason} -> {:error, reason}
end
end

defp extract_resource("Patient", resource_list) do
{:ok, Enum.map(resource_list, &extract_patient/1)}
end

defp extract_resource("Observation", resource_list) do
{:ok, Enum.map(resource_list, &extract_observation/1)}
end

defp extract_resource("Organization", resource_list) do
{:ok, Enum.map(resource_list, &extract_organization/1)}
end

defp extract_resource(unknown_type, _) do
{:error, "Unknown resource type: #{unknown_type}"}
end

defp extract_patient(%EpiViewpoint.R4.Patient{} = patient) do
%EpiViewpoint.R4.Patient{
id: caseid,
identifier: [%EpiViewpoint.R4.Identifier{value: person_tid}],
name: [%EpiViewpoint.R4.HumanName{given: [search_firstname], family: search_lastname}],
birth_date: dateofbirth,
gender: sex,
address: [
%EpiViewpoint.R4.Address{
line: [diagaddress_street1],
city: diagaddress_city,
state: diagaddress_state,
postal_code: diagaddress_zip
}
],
telecom: [%EpiViewpoint.R4.ContactPoint{value: phonenumber}],
extension: extensions
} = patient

%{
caseid: caseid,
person_tid: person_tid,
search_firstname: search_firstname,
search_lastname: search_lastname,
dateofbirth: format_date(dateofbirth),
sex: String.capitalize(to_string(sex)),
diagaddress_street1: diagaddress_street1,
diagaddress_city: diagaddress_city,
diagaddress_state: diagaddress_state,
diagaddress_zip: diagaddress_zip,
phonenumber: phonenumber,
ethnicity: find_extension(extensions, :ethnicity),
occupation: find_extension(extensions, :occupation),
race: find_extension(extensions, :race)
}
end

defp extract_observation(%EpiViewpoint.R4.Observation{} = observation) do
%EpiViewpoint.R4.Observation{
id: lab_result_tid,
subject: %EpiViewpoint.R4.Reference{reference: "Patient/" <> pat_id},
effective_date_time: datecollected,
issued: resultdate,
code: %EpiViewpoint.R4.CodeableConcept{text: testname},
interpretation: [%EpiViewpoint.R4.CodeableConcept{coding: [%EpiViewpoint.R4.Coding{display: result}]}],
performer: [%EpiViewpoint.R4.Reference{reference: "Organization/" <> org_id}],
extension: extensions
} = observation

%{
lab_result_tid: lab_result_tid,
pat_id: pat_id,
datecollected: datecollected,
resultdate: format_date(resultdate),
testname: testname,
result: result,
org_id: org_id,
datereportedtolhd: find_extension(extensions, :datereportedtolhd)
}
end

defp extract_organization(%EpiViewpoint.R4.Organization{} = organization) do
%{
organization_id: organization.id,
ordering_facility_name: organization.name
}
end

defp find_extension(extensions, :race) do
Enum.find_value(extensions, nil, fn
%EpiViewpoint.R4.Extension{
url: "http://hl7.org/fhir/us/core/StructureDefinition/us-core-race",
extension: [%EpiViewpoint.R4.Extension{url: "ombCategory", value_coding: %EpiViewpoint.R4.Coding{display: value}}, _]
} ->
value

_ ->
nil
end)
end

defp find_extension(extensions, :ethnicity) do
Enum.find_value(extensions, nil, fn
%EpiViewpoint.R4.Extension{
url: "http://hl7.org/fhir/us/core/StructureDefinition/us-core-ethnicity",
extension: [%EpiViewpoint.R4.Extension{url: "ombCategory", value_coding: %EpiViewpoint.R4.Coding{display: value}}, _]
} ->
value

_ ->
nil
end)
end

defp find_extension(extensions, :occupation) do
Enum.find_value(extensions, nil, fn
%EpiViewpoint.R4.Extension{url: "http://hl7.org/fhir/StructureDefinition/patient-occupation", value_string: value} ->
value

_ ->
nil
end)
end

defp find_extension(extensions, :datereportedtolhd) do
Enum.find_value(extensions, nil, fn
%EpiViewpoint.R4.Extension{url: "http://hl7.org/fhir/StructureDefinition/datereportedtolhd", value_date: value} ->
value

_ ->
nil
end)
end

defp join_resources(resources) do
patients = Map.get(resources, "Patient", [])
observations = Map.get(resources, "Observation", [])
organizations = Map.get(resources, "Organization", [])

joined = Enum.map(observations, &merge_resources(&1, patients, organizations))

{:ok, joined}
end

defp merge_resources(observation, patients, organizations) do
patient = Enum.find(patients, %{}, &(&1.caseid == observation.pat_id))
organization = Enum.find(organizations, %{}, &(&1.organization_id == observation.org_id))

observation
|> Map.merge(patient)
|> Map.merge(organization)
|> Map.drop([:pat_id, :org_id, :organization_id])
end

defp to_map(resources, contents) do
%{file_name: "load.bulk_fhir", contents: contents, list: resources}
end

defp format_date(date) when is_struct(date, Date), do: Calendar.strftime(date, "%m/%d/%Y")
defp format_date(datetime) when is_struct(datetime, DateTime), do: DateTime.to_date(datetime) |> format_date()
end
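To make the parser's contract concrete, a hedged usage sketch read from the code above follows; the per-file names and ndjson binaries are illustrative, and each entry only needs a contents binary of newline-delimited FHIR JSON:

# Hedged usage sketch (file names and ndjson binaries are illustrative):
files = [
  %{file_name: "Patient.ndjson", contents: patient_ndjson},
  %{file_name: "Observation.ndjson", contents: observation_ndjson},
  %{file_name: "Organization.ndjson", contents: organization_ndjson}
]

# join_resources/1 and merge_resources/3 yield one merged row per Observation,
# joined to its Patient (via caseid) and Organization (via organization_id).
{:ok, %{file_name: "load.bulk_fhir", contents: _raw, list: rows}} =
  EpiViewpoint.BulkFhirParser.parse_bulk_fhir(files)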
4 changes: 4 additions & 0 deletions lib/epiviewpoint/cases.ex
@@ -34,6 +34,10 @@ defmodule EpiViewpoint.Cases do
def count_lab_results(), do: LabResult |> Repo.aggregate(:count)
def create_lab_result!({attrs, audit_meta}), do: %LabResult{} |> change_lab_result(attrs) |> AuditingRepo.insert!(audit_meta)
def import_lab_results(lab_result_data_file_string, originator), do: Import.import_data_file(lab_result_data_file_string, originator)

def import_bulk_fhir_lab_results(lab_result_data_file_list, originator),
do: Import.import_bulk_fhir_data_file(lab_result_data_file_list, originator)

def list_lab_results(), do: LabResult.Query.all() |> Repo.all()
def preload_initiating_lab_result(case_investigations_or_nil), do: case_investigations_or_nil |> Repo.preload(:initiating_lab_result)
def preload_lab_results(person_or_people_or_nil), do: person_or_people_or_nil |> Repo.preload(lab_results: LabResult.Query.display_order())
7 changes: 7 additions & 0 deletions lib/epiviewpoint/cases/import.ex
@@ -1,6 +1,7 @@
defmodule EpiViewpoint.Cases.Import do
alias EpiViewpoint.Accounts
alias EpiViewpoint.AuditLog
alias EpiViewpoint.BulkFhirParser
alias EpiViewpoint.Cases
alias EpiViewpoint.Cases.Import
alias EpiViewpoint.Cases.LabResult
@@ -59,6 +60,11 @@ defmodule EpiViewpoint.Cases.Import do
]
end

def import_bulk_fhir_data_file(lab_result_data_file_list, originator) do
{:ok, bulk_fhir_data} = BulkFhirParser.parse_bulk_fhir(lab_result_data_file_list)
import_data_file(bulk_fhir_data, originator)
end

def import_data_file(_file, %{admin: false}), do: {:error, "Originator must be an admin"}

def import_data_file(file, %Accounts.User{} = originator) do
@@ -103,6 +109,7 @@
case Path.extname(file.file_name) do
".csv" -> DataFile.read(file.contents, :csv, &rename_headers/1, @fields)
".ndjson" -> DataFile.read(file.contents, :ndjson, &rename_headers/1, @fields)
".bulk_fhir" -> DataFile.read(file.list, :bulk_fhir, &rename_headers/1, @fields)
_ -> {:error, "Unsupported file type: #{file.extension}"}
end
end
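One detail worth spelling out: the extension dispatch above keys off the synthetic file name assigned by BulkFhirParser.to_map/2, so the pre-built row list (file.list) flows into DataFile.read/4 instead of raw file contents. A small check of that routing:

# "load.bulk_fhir" is the file_name set in BulkFhirParser.to_map/2, so Path.extname/1
# resolves to the new ".bulk_fhir" clause above.
".bulk_fhir" = Path.extname("load.bulk_fhir")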
8 changes: 8 additions & 0 deletions lib/epiviewpoint/data_file.ex
@@ -21,6 +21,10 @@ defmodule EpiViewpoint.DataFile do
read(string, header_transformer, headers, &parse_ndjson/1)
end

def read(string, :bulk_fhir, header_transformer, headers) when is_list(string) do
read(string, header_transformer, headers, &parse_bulk_fhir/1)
end

def read(input, header_transformer, [required: required_headers, optional: optional_headers], parser) do
with {:ok, df} <- parser.(input),
{:ok, provided_headers} <- extract_provided_headers(df, header_transformer),
@@ -84,6 +88,10 @@
DataFrame.load_ndjson(input)
end

defp parse_bulk_fhir(input) do
{:ok, DataFrame.new(input)}
end

defp validate_csv(input) do
EpiViewpoint.DataFile.Parser.parse_string(input, headers: true)
input
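In the :bulk_fhir branch, the parser's row maps go straight into DataFrame.new/1. DataFrame here is presumably Explorer.DataFrame (load_ndjson/1 above is part of Explorer's API), which accepts a list of row maps; a hedged sketch with illustrative values:

# Hedged sketch: each merged observation row becomes one dataframe row, with a column
# per key for the header validation that follows in read/4.
rows = [%{caseid: "pat-1", testname: "SARS-CoV-2 PCR", result: "Detected"}]
df = Explorer.DataFrame.new(rows)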
68 changes: 68 additions & 0 deletions lib/epiviewpoint/r4/address.ex
@@ -0,0 +1,68 @@
defmodule EpiViewpoint.R4.Address do
use Ecto.Schema
import Ecto.Changeset

@fields [
:city,
:country,
:district,
:id,
:line,
:postal_code,
:state,
:text,
:type,
:use
]
@required_fields []

embedded_schema do
# Fields
field(:city, :string)
field(:country, :string)
field(:district, :string)
field(:postal_code, :string)
field(:state, :string)
field(:text, :string)

field(:line, {:array, :string})

# Enum
field(:type, Ecto.Enum,
values: [
:postal,
:physical,
:both
]
)

field(:use, Ecto.Enum,
values: [
:home,
:work,
:temp,
:old,
:billing
]
)

# Embed One
embeds_one(:period, EpiViewpoint.R4.Period)

# Embed Many
embeds_many(:extension, EpiViewpoint.R4.Extension)
end

def choices(_), do: nil

def version_namespace, do: EpiViewpoint.R4
def version, do: "R4"

def changeset(data \\ %__MODULE__{}, attrs) do
data
|> cast(attrs, @fields)
|> cast_embed(:period)
|> cast_embed(:extension)
|> validate_required(@required_fields)
end
end
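A hedged sketch of casting a FHIR Address map through the generated changeset; the field values are illustrative, and because @required_fields is empty, any subset of the cast fields validates:

# Hedged sketch: string keys cast into the embedded schema, including the Ecto.Enum :use field.
attrs = %{"city" => "Springfield", "line" => ["742 Evergreen Terrace"], "use" => "home"}
changeset = EpiViewpoint.R4.Address.changeset(attrs)
true = changeset.valid?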