Skip to content

Commit

Permalink
📡 Add basic innoPatients importer, bump
Browse files Browse the repository at this point in the history
  • Loading branch information
albertzak committed Dec 13, 2020
1 parent 21a48f9 commit 9ee1b39
Show file tree
Hide file tree
Showing 14 changed files with 321 additions and 8,318 deletions.
6 changes: 3 additions & 3 deletions app/electron/main/dbfToJSON.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ const dbfToJSON = async ({ path, encoding }) => {
const dbf = new Parser(path, { encoding: encoding || 'latin1' })
let records = []

dbf.on('header', (header) => {
logger.info(`[dbfToJSON] Opened ${path} with header ${JSON.stringify(header)}`)
dbf.on('header', ({ fields, ...header}) => {
logger.info(`[dbfToJSON] Opened ${path} with header ${JSON.stringify(header)} and ${fields.length} fields`)
})

dbf.on('record', (record) => {
Expand All @@ -18,7 +18,7 @@ const dbfToJSON = async ({ path, encoding }) => {

dbf.on('end', () => {
const recordsJSON = JSON.stringify(records)
logger.info(`[dbfToJSON] Finished parsing ${records.length} records in ${(new Date() - startAt) / 1000} seconds`)
logger.info(`[dbfToJSON] Finished parsing ${records.length} records into ${recordsJSON.length} JSON bytes in ${(new Date() - startAt) / 1000} seconds`)
resolve(recordsJSON)
})

Expand Down
174 changes: 108 additions & 66 deletions app/electron/main/watch.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,28 @@
const fs = require('fs')
const path = require('path')
const { ipcMain } = require('electron')
const iconv = require('iconv-lite')
const chokidar = require('chokidar')
const temp = require('temp')
const { dbfToJSON } = require('./dbfToJSON')
const { getSettings, onNewSettings } = require('./settings')
const logger = require('./logger')


// This queue keeps thunks of actions that start/stop all watchers when settings change
let actionQueue = []

const performNextAction = async () => {
if (actionQueue[0]) {
await actionQueue[0]()
actionQueue = actionQueue.slice(1)
}

setTimeout(performNextAction, 2000)
}

performNextAction()

let watchers = []

// Only honor delete requests for files that were actually transmitted, for security
Expand All @@ -27,8 +44,9 @@ const onAdd = async ({ ipcReceiver, watch, path, importer, remove, focus }) => {

switch (importer) {
case 'innoPatients':
const patientsJSON = dbfToJSON({ path })
ipcReceiver.send('dataTransfer', { path, watch, content: patientsJSON, importer, remove, focus })
const patientsJSON = await dbfToJSON({ path })
logger.info(`Attmepting to send ${(patientsJSON.length / 1024 / 1024).toFixed(2)} MiB over IPC`)
return ipcReceiver.send('dataTransfer', { path, watch, content: patientsJSON, importer, remove, focus })
default:
fs.readFile(path, (err, buffer) => {
if (err) { return logger.error('[Watch] Error reading file to buffer', err) }
Expand All @@ -48,98 +66,111 @@ const onAdd = async ({ ipcReceiver, watch, path, importer, remove, focus }) => {
}

const start = ({ ipcReceiver, handleFocus }) => {
onNewSettings(() => {
stop()
onNewSettings(actionQueue.push(async () => {
await stop()

setTimeout(() => {
startWatchers({ ipcReceiver, handleFocus })
actionQueue.push(() =>
startWatchers({ ipcReceiver, handleFocus })
)
}, 5000)
})
}))

startWatchers({ ipcReceiver, handleFocus })
actionQueue.push(async () => {
await startWatchers({ ipcReceiver, handleFocus })
// Only bind remove thing once
bindRemoveAfterIngest()
})
}

const startWatchers = async ({ ipcReceiver, handleFocus }) => {
const settings = getSettings()
if (settings.watch) {
logger.info('[Watch] Watching paths', settings.watch)

watchers = await Promise.all(settings.watch.map((watch) => {
if (!watch.enabled) { return }
await Promise.all(settings.watch.map(async (watch) => {
if (!watch.enabled) {
logger.info(`[watch] Skip setting up watcher because it is not enabled: ${JSON.stringify(watch)}`)
return
}

let { importer, remove } = watch

if (isCritical(importer)) {
remove = false // just to make sure, the temp file is deleted anyways
}

return new Promise((resolve, reject) => {
fs.mkdir(watch.path, { recursive: true }, (err) => {
if (err) {
reject(err)
return logger.error(`[Watch] Failed to create direcotry to watch ${watch.path} for importer ${importer}`)
}

let watcher = chokidar.watch(watch.path, {
persistent: true,
ignored: /[/\\]\./,
depth: 0,
usePolling: true,
disableGlobbing: true,
interval: 50,
binaryInterval: 50,
awaitWriteFinish: {
stabilityThreshold: 101,
pollInterval: 30
}
})

watcher.on('add', (path) => onAdd({ ipcReceiver, watch, path, importer, remove }))
watcher.on('change', (path) => onAdd({ ipcReceiver, watch, path, importer, remove }))

resolve(watcher)
})
})
}))

// Optionally remove files after successful ingestion
const { ipcMain } = require('electron')
ipcMain.on('dataTransferSuccess', (e, { remove, path, focus }) => {
if (pendingTransferPaths.indexOf(path) === -1) {
logger.error('[Watch] Received success event for file that was not transferred in the current session, discarding event')
return
if (!watch.singleFile) {
await ensureDirectoryExists(watch.path)
}

// Always remove temp files, especially temp copies of critical sources
if (path.indexOf('rosalind') !== -1 && path.indexOf('.tmp') !== -1) {
logger.info('[Watch] Data transfer success, forcing removal of temp file', { path, remove })
remove = true
} else {
logger.info('[Watch] Data transfer success', { path, remove })
}
let watcher = chokidar.watch(watch.path, {
persistent: true,
ignored: /(^|[\/\\])\../, // ignore dotfiles
depth: 1,
usePolling: true,
disableGlobbing: true,
interval: 100,
binaryInterval: 300,
awaitWriteFinish: {
stabilityThreshold: 300,
pollInterval: 50
}
})

if (remove) {
logger.info('[Watch] Removing file', path)
fs.unlink(path, (err) => {
if (err) {
logger.error(`[Watch] Failed to remove file ${err.message} ${err.stack}`)
}
})
}
watcher.on('add', (path) => onAdd({ ipcReceiver, watch, path, importer, remove }))
watcher.on('change', (path) => onAdd({ ipcReceiver, watch, path, importer, remove }))

if (focus) {
handleFocus()
}
})
watchers.push(watcher)
}))
}
}

const stop = () => {
const stop = async () => {
logger.info('[Watch] Stop')

watchers.forEach((watcher) => {
if (watcher) {
watcher.close()
await Promise.all(watchers.map(async w => {
if (w) {
await Promise.all(Object.keys(w.getWatched()).map(async path => {
await w.unwatch(path)
}))
await w.close()
watchers = watchers.filter(wx => wx != w)
}
}))
}

// Optionally remove files after successful ingestion
const bindRemoveAfterIngest = () => {
ipcMain.on('dataTransferSuccess', (e, { remove, path, focus }) => {
if (pendingTransferPaths.indexOf(path) === -1) {
logger.error('[Watch] Received success event for file that was not transferred in the current session, discarding event')
return
}

// Always remove temp files, especially temp copies of critical sources
if (path.indexOf('rosalind') !== -1 && path.indexOf('.tmp') !== -1) {
logger.info('[Watch] Data transfer success, forcing removal of temp file', { path, remove })
remove = true
} else {
logger.info('[Watch] Data transfer success', { path, remove })
}

if (remove) {
logger.info('[Watch] Removing file', path)
fs.unlink(path, (err) => {
if (err) {
logger.error(`[Watch] Failed to remove file ${err.message} ${err.stack}`)
}

// Remove deleted path from queue
pendingTransferPaths = pendingTransferPaths.filter(p => p !== path)
})
}

if (focus) {
handleFocus()
}
})
}
Expand All @@ -156,4 +187,15 @@ const copyToTemp = (originalPath) => new Promise((resolve, reject) => {
})
})

const ensureDirectoryExists = watchPath => new Promise((resolve, reject) => {
fs.mkdir(watchPath, { recursive: true }, (err) => {
if (err) {
reject(err)
return logger.error(`[Watch] Failed to create direcotry to watch ${watchPath}`)
}

resolve(watchPath)
})
})

module.exports = { start, stop }
2 changes: 1 addition & 1 deletion app/electron/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "rosalind",
"productName": "Rosalind",
"version": "0.0.252",
"version": "0.0.253",
"private": true,
"description": "Rosalind",
"author": {
Expand Down
12 changes: 7 additions & 5 deletions app/electron/renderer/preload.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,13 @@ const logger = {
})
},
info: message => {
console.log(message)
ipcRenderer.send('log', {
level: 'info',
message: message
})
if (DEBUG) {
console.log(message)
ipcRenderer.send('log', {
level: 'info',
message: message
})
}
}
}

Expand Down
3 changes: 2 additions & 1 deletion app/meteor/imports/api/importers/allowedImporters.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ export const allowedImporters = [
'eoswinJournalReports',
'xdt',
'genericJson',
'mediaDocument'
'mediaDocument',
'innoPatients'
]

export const isAllowedImporter = (slug) => {
Expand Down
Loading

0 comments on commit 9ee1b39

Please sign in to comment.