Skip to content

Commit

Permalink
Get importer to work on large amounts of data without killing the dat…
Browse files Browse the repository at this point in the history
…abase
  • Loading branch information
tibetsprague committed Aug 21, 2020
1 parent 6ed2eb2 commit bc315f3
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 47 deletions.
7 changes: 6 additions & 1 deletion api/models/post/createPost.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,12 @@ export default function createPost (userId, params) {
.tap(post => afterCreatingPost(post, merge(
pick(params, 'community_ids', 'imageUrl', 'videoUrl', 'docs', 'topicNames', 'memberIds', 'eventInviteeIds', 'imageUrls', 'fileUrls', 'announcement', 'location', 'location_id'),
{children: params.requests, transacting}
)))))
)))).then(function(inserts) {
return inserts
}).catch(function(error) {
throw error
})
)
}

export function afterCreatingPost (post, opts) {
Expand Down
30 changes: 19 additions & 11 deletions lib/uploader/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,28 @@ export function upload (args) {
if (!filename) filename = url

function setupStreams (data, resolve, reject) {
const fileType = guessFileType(data, filename)
const mimetype = fileType && fileType.mime

didSetup = true

// this is used so we can get the file type from the first chunk of
// data and still use `.pipe` -- you can't pipe a stream after getting
// data from it
passthrough = new PassThrough()

if (type === 'importPosts') {
const csvImporter = createPostImporter(userId, id)
csvImporter.on('finish', () => resolve({ numPostsCreated: csvImporter.numPostsCreated, errors: csvImporter.errors }))
passthrough.pipe(csvImporter)
passthrough = createPostImporter(userId, id)
passthrough.on('end', (e) => {
// This returns to the front-end after the CSV has been read but before posts have been created
const uploaderResult = {
type,
id,
mimetype: "text/csv"
}
return resolve(uploaderResult)
})
} else {
// this is used so we can get the file type from the first chunk of
// data and still use `.pipe` -- you can't pipe a stream after getting
// data from it
passthrough = new PassThrough()

const fileType = guessFileType(data, filename)
const mimetype = fileType && fileType.mime

converter = createConverterStream(type, id, {fileType})
converter.on('error', err => reject(err))

Expand All @@ -57,6 +64,7 @@ export function upload (args) {
return new Promise((resolve, reject) => {
source.on('data', data => {
if (sourceHasError) return

if (!didSetup) {
try {
setupStreams(data, resolve, reject)
Expand Down
95 changes: 60 additions & 35 deletions lib/uploader/postImporter.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,50 +4,75 @@ import { PassThrough } from 'stream'
import createPost from '../../api/models/post/createPost'
import { findOrCreateLocation } from '../../api/graphql/mutations/location'

function createObjectFrom(record, userId, communityId) {
return new Promise(async (resolve, reject) => {
let location
try {
const locationData = await geocode(record.location)
location = await findOrCreateLocation(locationData)
} catch (e) {
sails.log.error("Error finding post location: " + e)
reject(e)
return
}

const postParams = {
community_ids: [communityId],
description: record.description || '',
endTime: record.end_date ? new Date(record.end_date) : null,
location: record.location,
imageUrls: record.image_urls ? record.image_urls.split(/,?\s+/) : [],
isPublic: record.is_public ? ['true', 'yes'].includes(record.is_public.toLowerCase()) : false,
location_id: location ? location.id : null,
name: record.title || '',
startTime: record.start_date ? new Date(record.start_date) : null,
topicNames: record.topics ? record.topics.split(/,?\s+/) : [],
type: record.type ? record.type.toLowerCase() : 'discussion'
}

try {
const post = await createPost(userId, postParams)
sails.log.info("Finished creating post", postParams)
resolve(post)
} catch(e) {
sails.log.error("Error importing post: " + e.message)
reject(e)
}
})
}

export function createPostImporter (userId, communityId) {
const parser = parse({ columns: header => header.map(column => column.toLowerCase()) })


parser.errors = []
parser.numPostsCreated = 0
const promiseFactories = []

parser.on('readable', async function() {
let record, location
while (record = parser.read()) {
try {
const locationData = await geocode(record.location)
location = await findOrCreateLocation(locationData)
} catch (e) {
sails.log.error("Error finding post location: " + e)
parser.errors.push(e)
}
const postParams = {
community_ids: [communityId],
description: record.description || '',
endTime: record.end_date ? new Date(record.end_date) : null,
location: record.location,
imageUrls: record.image_urls ? record.image_urls.split(/,?\s+/) : [],
isPublic: record.is_public ? ['true', 'yes'].includes(record.is_public.toLowerCase()) : false,
location_id: location ? location.id : null,
name: record.title || '',
startTime: record.start_date ? new Date(record.start_date) : null,
topicNames: record.topics ? record.topics.split(/,?\s+/) : [],
type: record.type ? record.type.toLowerCase() : 'discussion'
}
try {
await createPost(userId, postParams)
parser.numPostsCreated = parser.numPostsCreated + 1
} catch(e) {
sails.log.error("Error importing post: " + e.message)
parser.errors.push(e.message)
}
parser.on('readable', () => {
let record = parser.read();

if (record === null) {
return
}
const promiseFactory = () => createObjectFrom(record, userId, communityId);
promiseFactories.push( promiseFactory );
})

// Catch any error
parser.on('error', function(err){
sails.log.error("Error importing CSV: " + err.message)
parser.errors.push(err.message)
parser.on('error', (err) => { sails.log.error("Weird parser error, check out " + err)})

parser.on('end', () => {
var sequence = Promise.resolve();

// Loop over each promise factory and add on a promise to the end of the 'sequence' promise.
promiseFactories.forEach(promiseFactory => {
sequence = sequence
.then(promiseFactory)
.then(result => { parser.numPostsCreated = parser.numPostsCreated + 1 })
.catch(error => { parser.errors.push(error.message ? error.message : error) })
})

// This will resolve after the entire chain is resolved
sequence.then(() => { sails.log.info("Succesfully imported " + parser.numPostsCreated + " posts.\n Errors: " + parser.errors.join("\n"))})
})

return parser
Expand Down

0 comments on commit bc315f3

Please sign in to comment.