Skip to content

Commit

Permalink
Merge pull request #5 from grongierisc/master
Browse files Browse the repository at this point in the history
Add python experience on datapipe flows
  • Loading branch information
isc-afuentes authored Jul 20, 2023
2 parents 5a5ba8b + 8eaf6ab commit 8ae9d4b
Show file tree
Hide file tree
Showing 28 changed files with 827 additions and 49 deletions.
15 changes: 14 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,15 @@
*.log
data/input/*.hl7
data/input/*.hl7
data/output/
# Python stuff:
*.pyc
*.pyo
# virtualenv
.venv
# dist
dist
# build
build
# eggs
*.egg
*.egg-info
12 changes: 11 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
ARG IMAGE=intersystemsdc/irishealth-community:2022.2.0.368.0-zpm
ARG IMAGE=intersystemsdc/irishealth-community:latest
FROM $IMAGE

USER root
Expand All @@ -24,7 +24,17 @@ COPY --chown=irisowner:irisowner src src
COPY --chown=irisowner:irisowner module.xml module.xml
COPY --chown=irisowner:irisowner Installer.cls Installer.cls

ENV PATH "/home/irisowner/.local/bin:/usr/irissys/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/home/irisowner/bin"

RUN pip install iris-pex-embedded-python

## Python stuff
ENV IRISUSERNAME "SuperUser"
ENV IRISPASSWORD "SYS"
ENV IRISNAMESPACE "DPIPE"

# run iris.script
RUN iris start IRIS \
&& iris session IRIS < /opt/irisapp/iris.script \
&& iop -m /opt/irisapp/src/python/Demo/settings.py \
&& iris stop IRIS quietly
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ docker compose up -d

* Log-in to the system using `superuser` / `SYS`.

* Start an inteoperability production in *Interoperability > List > Productions*:
* `DataPipe.Test.Production` - regular objectscript DataPipe demo production
* `DataPipe.Python.Production` - Python implemented DataPiepe demo production

* Generate sample data using [WebTerminal](http://localhost:52773/terminal/)
```objectscript
do ##class(DataPipe.Test.HL7.Helper).GenerateFilesHL7ADT(100)
Expand Down
4 changes: 2 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ services:
container_name: datapipe
init: true
ports:
- "1972:1972"
- "52773:52773"
- 1972:1972
- 52773:52773
volumes:
- .:/app
networks:
Expand Down
7 changes: 6 additions & 1 deletion iris.script
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@
set $namespace = "%SYS"
do ##class(Security.Users).UnExpireUserPasswords("*")

//Service for embedded python
do ##class(Security.Services).Get("%Service_CallIn",.prop)
set prop("Enabled")=1
set prop("AutheEnabled")=48
do ##class(Security.Services).Modify("%Service_CallIn",.prop)

// create ns for dev environment
do $SYSTEM.OBJ.Load("/opt/irisapp/Installer.cls", "ck")

Expand All @@ -24,6 +30,5 @@
// auto start interop production
set production = "DataPipe.Test.Production"
set ^Ens.Configuration("csp","LastProduction") = production
do ##class(Ens.Director).SetAutoStart(production)

halt
9 changes: 8 additions & 1 deletion module.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<Document name="iris-datapipe.ZPM">
<Module>
<Name>iris-datapipe</Name>
<Version>0.0.1</Version>
<Version>0.0.2</Version>
<Description>DataPipe an interoperability framework to ingest data in InterSystems IRIS in a flexible way.</Description>
<Keywords>datapipe ingestion staging validation</Keywords>
<Author>
Expand All @@ -19,6 +19,13 @@
<Version>>=1.0.8</Version>
</ModuleReference>
</Dependencies>
<Dependencies>
<ModuleReference>
<Name>pex-embbeded-python</Name>
<Version>>=2.0.0</Version>
</ModuleReference>
</Dependencies>
<FileCopy Name="src/python/DataPipe" Target="${libdir}python/DataPipe/"/>
<SourcesRoot>src</SourcesRoot>
<Resource Name="DataPipe.PKG"/>
<CSPApplication
Expand Down
Empty file added pyproject.toml
Empty file.
59 changes: 59 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# Licensed under the MIT License
# https://github.com/grongierisc/iris_datapipe/blob/main/LICENSE

import os

from setuptools import setup

def package_files(directory):
paths = []
for (path, directories, filenames) in os.walk(directory):
for filename in filenames:
paths.append(os.path.join('.', path, filename))
return paths

extra_files = package_files('src/python/')

def main():
# Read the readme for use as the long description
with open(os.path.join(os.path.abspath(os.path.dirname(__file__)),
'README.md'), encoding='utf-8') as readme_file:
long_description = readme_file.read()

# Do the setup
setup(
name='iris_datapipe',
description='iris_datapipe',
long_description=long_description,
long_description_content_type='text/markdown',
version='0.0.2',
author='grongier',
author_email='guillaume.rongier@intersystems.com',
keywords='iris_datapipe',
url='https://github.com/grongierisc/iris-datapipe',
license='MIT',
classifiers=[
'Development Status :: 5 - Production/Stable',
'Intended Audience :: Developers',
'License :: OSI Approved :: MIT License',
'Operating System :: OS Independent',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Programming Language :: Python :: 3.11',
'Topic :: Utilities'
],
package_dir={'': 'src/python'},
packages=['DataPipe'],

python_requires='>=3.6',
install_requires=[
"iris-pex-embedded-python>=2.0.0"
]
)


if __name__ == '__main__':
main()
9 changes: 7 additions & 2 deletions src/DataPipe/Data/Inbox.cls
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,9 @@ ClassMethod queryFIND() As %String

/// Get an Inbox record, querying by key attributes
/// This method is used to determine if an Inbox record can be re-used
ClassMethod GetByKeyAttributes(att As InboxAttributes, Output obj As Inbox) As %Status
ClassMethod GetByKeyAttributes(
att As InboxAttributes,
Output obj As Inbox) As %Status
{
set ret = $$$OK
try {
Expand All @@ -115,7 +117,10 @@ ClassMethod GetByKeyAttributes(att As InboxAttributes, Output obj As Inbox) As %

/// Calculate a SQL field given an Inbox ID
/// This method is used by calculated SQL fields
ClassMethod CalcSqlFieldById(field As %String, default As %String, id As %String) As %String
ClassMethod CalcSqlFieldById(
field As %String,
default As %String,
id As %String) As %String
{
set ret = default
try {
Expand Down
20 changes: 19 additions & 1 deletion src/DataPipe/Data/Ingestion.cls
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,17 @@ Parameter FORMORDERBY As %String = "Id";
/// *Calculated* Id. %ID value projected to JSON
Property Id As %Integer(%JSONINCLUDE = "OUTPUTONLY") [ Calculated, SqlComputeCode = { set {*}={%%ID}}, SqlComputed ];

/// Model Is Python
Property ModelIsPython As %Boolean(%JSONINCLUDE = "NONE") [ InitialExpression = 0 ];

/// Model class name
Property ModelModule As %String(%JSONINCLUDE = "NONE", MAXLEN = "");

/// Model Module path
Property ModelClassPath As %String(%JSONINCLUDE = "NONE", MAXLEN = "");

/// Model class name
Property ModelName As %String(MAXLEN="");
Property ModelName As %String(MAXLEN = "");

/// Model serialized data
Property ModelData As %Stream.GlobalCharacter;
Expand Down Expand Up @@ -81,6 +90,15 @@ Storage Default
<Value name="12">
<Value>HeaderId</Value>
</Value>
<Value name="13">
<Value>ModelIsPython</Value>
</Value>
<Value name="14">
<Value>ModelModule</Value>
</Value>
<Value name="15">
<Value>ModelClassPath</Value>
</Value>
</Data>
<DataLocation>^DataPipe.Data.IngestionD</DataLocation>
<DefaultData>IngestionDefaultData</DefaultData>
Expand Down
5 changes: 3 additions & 2 deletions src/DataPipe/Data/Staging.cls
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,9 @@ Method SerializeValidationErrors() As %Status
try {
set count = ..ValidationErrors.Count()
for i=1:1:count {
set code = ..ValidationErrors.GetAt(i).Code
set desc = ..ValidationErrors.GetAt(i).Desc
set err = ..ValidationErrors.GetAt(i)
set code = err.Code
set desc = err.Desc
set jsonError = code_": "_desc
do jsonArr.%Push(jsonError)
}
Expand Down
21 changes: 21 additions & 0 deletions src/DataPipe/Helper.cls
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
Class DataPipe.Helper Extends %RegisteredObject
{

ClassMethod SetPythonPath(pClasspaths)
{
set sys = ##class(%SYS.Python).Import("sys")
do sys.path.append(pClasspaths)
}

ClassMethod GetPythonInstance(
pModule,
pRemoteClassname) As %SYS.Python
{
set importlib = ##class(%SYS.Python).Import("importlib")
set builtins = ##class(%SYS.Python).Import("builtins")
set module = importlib."import_module"(pModule)
set class = builtins.getattr(module, pRemoteClassname)
return class."__new__"(class)
}

}
9 changes: 7 additions & 2 deletions src/DataPipe/Model.cls
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ Method Serialize(Output stream) As %Status
Quit $$$ERROR($$$NotImplemented)
}

ClassMethod Deserialize(stream As %Stream.Object, Output obj) As %Status
ClassMethod Deserialize(
stream As %Stream.Object,
Output obj) As %Status
{
Quit $$$ERROR($$$NotImplemented)
}
Expand All @@ -26,7 +28,10 @@ Method GetOperation() As %Status
Quit $$$ERROR($$$NotImplemented)
}

Method RunOperation(Output errorList As %List, Output log As %Stream.Object, bOperation As Ens.BusinessOperation = "") As %Status
Method RunOperation(
Output errorList As %List,
Output log As %Stream.Object,
bOperation As Ens.BusinessOperation = "") As %Status
{
Quit $$$ERROR($$$NotImplemented)
}
Expand Down
48 changes: 41 additions & 7 deletions src/DataPipe/Oper/BO/OperationHandler.cls
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ Parameter ADAPTER;

Parameter INVOCATION = "Queue";

Method OperationHandler(pRequest As DataPipe.Msg.OperReq, Output pResponse As Ens.Response) As %Status
Method OperationHandler(
pRequest As DataPipe.Msg.OperReq,
Output pResponse As Ens.Response) As %Status
{
set ret = $$$OK
try {
Expand All @@ -19,10 +21,20 @@ Method OperationHandler(pRequest As DataPipe.Msg.OperReq, Output pResponse As En
$$$ThrowOnError(operObj.%Save())

// deserialize model
set modelData = pRequest.data.ModelNormData
set modelName = pRequest.data.Ingestion.ModelName
set sc = $classmethod(modelName, "Deserialize", modelData, .modelObj)
$$$ThrowOnError(sc)
if pRequest.data.Ingestion.ModelIsPython {
// Set python path
do ##class(DataPipe.Helper).SetPythonPath(pRequest.data.Ingestion.ModelClassPath)
set modelObj = ##class(DataPipe.Helper).GetPythonInstance(pRequest.data.Ingestion.ModelModule, pRequest.data.Ingestion.ModelName)
do modelObj.Deserialize(pRequest.data.ModelNormData)
set errorList = ""
set operLog = ""

} else {
set modelData = pRequest.data.ModelNormData
set modelName = pRequest.data.Ingestion.ModelName
set sc = $classmethod(modelName, "Deserialize", modelData, .modelObj)
$$$ThrowOnError(sc)
}

set operLog = ""
set operationSC = $$$OK
Expand All @@ -34,15 +46,37 @@ Method OperationHandler(pRequest As DataPipe.Msg.OperReq, Output pResponse As En

// not ignored - perfom operation
} else {
set operationSC = modelObj.RunOperation(.errorList, .operLog, ##this)
if pRequest.data.Ingestion.ModelIsPython {
// Python doesn't support byref parameters, so we return error list and oper log as strings
try {
set operationSC = $$$OK
set pythonList = modelObj.RunOperation(##this)
set errorList = ##class(%ListOfDataTypes).%New()
set operLog = ##class(%Stream.GlobalCharacter).%New()
for i=1:1:pythonList.Count() {
set err = pythonList.GetAt(i)
if err.Code = "OPERATION"
{
set operationSC = $$$ERROR($$$GeneralError, err.Desc)
} if err.Code = "OPERLOG" {
do operLog.WriteLine(err.Desc)
} else {
do errorList.Insert(err)
}
}
} catch ex {
set operationSC = ex.AsStatus()
}
} else {
set operationSC = modelObj.RunOperation(.errorList, .operLog, ##this)
}
set operObj.Retries = operObj.Retries + 1
if $$$ISOK(operationSC) {
set operObj.Status = "PROCESSED"
set inboxObj.Status = "DONE"
} else {
set operObj.Status = "ERROR"
set inboxObj.Status = "ERROR-OPERATING"

// serialize errors
set operObj.OperErrors = errorList
$$$ThrowOnError(operObj.SerializeOperErrors())
Expand Down
18 changes: 13 additions & 5 deletions src/DataPipe/Oper/BP/OperManagerContext.cls
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,19 @@ Method DeserializeModel() As %Status
{
set ret = $$$OK
try {
set modelData = ..Oper.Staging.ModelNormData
set modelName = ..Oper.Staging.Ingestion.ModelName
set sc = $classmethod(modelName, "Deserialize", modelData, .obj)
$$$ThrowOnError(sc)
set ..Model = obj
if ..Oper.Staging.Ingestion.ModelIsPython {
// Set python path
do ##class(DataPipe.Helper).SetPythonPath(..Oper.Staging.Ingestion.ModelClassPath)
set ..Model = ##class(DataPipe.Helper).GetPythonInstance(..Oper.Staging.Ingestion.ModelModule, ..Oper.Staging.Ingestion.ModelName)
do ..Model.Deserialize(..Oper.Staging.ModelNormData)

} else {
set modelData = ..Oper.Staging.ModelNormData
set modelName = ..Oper.Staging.Ingestion.ModelName
set sc = $classmethod(modelName, "Deserialize", modelData, .obj)
$$$ThrowOnError(sc)
set ..Model = obj
}

} catch ex {
set ret = ex.AsStatus()
Expand Down
4 changes: 3 additions & 1 deletion src/DataPipe/Staging/BP/StagingManager.cls
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ Property TargetConfigName As %String;

Parameter SETTINGS = "TargetConfigName:Basic:selector?context={Ens.ContextSearch/ProductionItems?targets=1&productionName=@productionId}";

ClassMethod OnGetConnections(ByRef pArray As %String, pItem As Ens.Config.Item)
ClassMethod OnGetConnections(
ByRef pArray As %String,
pItem As Ens.Config.Item)
{
Do pItem.PopulateModifiedSettings()
Set (tValue,tIndex)="" For {
Expand Down
Loading

0 comments on commit 8ae9d4b

Please sign in to comment.