diff --git a/api/models/post/createPost.js b/api/models/post/createPost.js index 9bfc35713..75de7985e 100644 --- a/api/models/post/createPost.js +++ b/api/models/post/createPost.js @@ -11,7 +11,12 @@ export default function createPost (userId, params) { .tap(post => afterCreatingPost(post, merge( pick(params, 'community_ids', 'imageUrl', 'videoUrl', 'docs', 'topicNames', 'memberIds', 'eventInviteeIds', 'imageUrls', 'fileUrls', 'announcement', 'location', 'location_id'), {children: params.requests, transacting} - ))))) + )))).then(function(inserts) { + return inserts + }).catch(function(error) { + throw error + }) + ) } export function afterCreatingPost (post, opts) { diff --git a/lib/uploader/index.js b/lib/uploader/index.js index e97ccc4f1..366825683 100644 --- a/lib/uploader/index.js +++ b/lib/uploader/index.js @@ -18,21 +18,28 @@ export function upload (args) { if (!filename) filename = url function setupStreams (data, resolve, reject) { - const fileType = guessFileType(data, filename) - const mimetype = fileType && fileType.mime - didSetup = true - // this is used so we can get the file type from the first chunk of - // data and still use `.pipe` -- you can't pipe a stream after getting - // data from it - passthrough = new PassThrough() - if (type === 'importPosts') { - const csvImporter = createPostImporter(userId, id) - csvImporter.on('finish', () => resolve({ numPostsCreated: csvImporter.numPostsCreated, errors: csvImporter.errors })) - passthrough.pipe(csvImporter) + passthrough = createPostImporter(userId, id) + passthrough.on('end', (e) => { + // This returns to the front-end after the CSV has been read but before posts have been created + const uploaderResult = { + type, + id, + mimetype: "text/csv" + } + return resolve(uploaderResult) + }) } else { + // this is used so we can get the file type from the first chunk of + // data and still use `.pipe` -- you can't pipe a stream after getting + // data from it + passthrough = new PassThrough() + + const fileType = guessFileType(data, filename) + const mimetype = fileType && fileType.mime + converter = createConverterStream(type, id, {fileType}) converter.on('error', err => reject(err)) @@ -57,6 +64,7 @@ export function upload (args) { return new Promise((resolve, reject) => { source.on('data', data => { if (sourceHasError) return + if (!didSetup) { try { setupStreams(data, resolve, reject) diff --git a/lib/uploader/postImporter.js b/lib/uploader/postImporter.js index 5787c4119..238b6f21e 100644 --- a/lib/uploader/postImporter.js +++ b/lib/uploader/postImporter.js @@ -4,50 +4,75 @@ import { PassThrough } from 'stream' import createPost from '../../api/models/post/createPost' import { findOrCreateLocation } from '../../api/graphql/mutations/location' +function createObjectFrom(record, userId, communityId) { + return new Promise(async (resolve, reject) => { + let location + try { + const locationData = await geocode(record.location) + location = await findOrCreateLocation(locationData) + } catch (e) { + sails.log.error("Error finding post location: " + e) + reject(e) + return + } + + const postParams = { + community_ids: [communityId], + description: record.description || '', + endTime: record.end_date ? new Date(record.end_date) : null, + location: record.location, + imageUrls: record.image_urls ? record.image_urls.split(/,?\s+/) : [], + isPublic: record.is_public ? ['true', 'yes'].includes(record.is_public.toLowerCase()) : false, + location_id: location ? location.id : null, + name: record.title || '', + startTime: record.start_date ? new Date(record.start_date) : null, + topicNames: record.topics ? record.topics.split(/,?\s+/) : [], + type: record.type ? record.type.toLowerCase() : 'discussion' + } + + try { + const post = await createPost(userId, postParams) + sails.log.info("Finished creating post", postParams) + resolve(post) + } catch(e) { + sails.log.error("Error importing post: " + e.message) + reject(e) + } + }) +} + export function createPostImporter (userId, communityId) { const parser = parse({ columns: header => header.map(column => column.toLowerCase()) }) - parser.errors = [] parser.numPostsCreated = 0 + const promiseFactories = [] - parser.on('readable', async function() { - let record, location - while (record = parser.read()) { - try { - const locationData = await geocode(record.location) - location = await findOrCreateLocation(locationData) - } catch (e) { - sails.log.error("Error finding post location: " + e) - parser.errors.push(e) - } - const postParams = { - community_ids: [communityId], - description: record.description || '', - endTime: record.end_date ? new Date(record.end_date) : null, - location: record.location, - imageUrls: record.image_urls ? record.image_urls.split(/,?\s+/) : [], - isPublic: record.is_public ? ['true', 'yes'].includes(record.is_public.toLowerCase()) : false, - location_id: location ? location.id : null, - name: record.title || '', - startTime: record.start_date ? new Date(record.start_date) : null, - topicNames: record.topics ? record.topics.split(/,?\s+/) : [], - type: record.type ? record.type.toLowerCase() : 'discussion' - } - try { - await createPost(userId, postParams) - parser.numPostsCreated = parser.numPostsCreated + 1 - } catch(e) { - sails.log.error("Error importing post: " + e.message) - parser.errors.push(e.message) - } + parser.on('readable', () => { + let record = parser.read(); + + if (record === null) { + return } + const promiseFactory = () => createObjectFrom(record, userId, communityId); + promiseFactories.push( promiseFactory ); }) - // Catch any error - parser.on('error', function(err){ - sails.log.error("Error importing CSV: " + err.message) - parser.errors.push(err.message) + parser.on('error', (err) => { sails.log.error("Weird parser error, check out " + err)}) + + parser.on('end', () => { + var sequence = Promise.resolve(); + + // Loop over each promise factory and add on a promise to the end of the 'sequence' promise. + promiseFactories.forEach(promiseFactory => { + sequence = sequence + .then(promiseFactory) + .then(result => { parser.numPostsCreated = parser.numPostsCreated + 1 }) + .catch(error => { parser.errors.push(error.message ? error.message : error) }) + }) + + // This will resolve after the entire chain is resolved + sequence.then(() => { sails.log.info("Succesfully imported " + parser.numPostsCreated + " posts.\n Errors: " + parser.errors.join("\n"))}) }) return parser