diff --git a/core/_services/arbimon/index.js b/core/_services/arbimon/index.js index 757fe7f8f..88560daba 100644 --- a/core/_services/arbimon/index.js +++ b/core/_services/arbimon/index.js @@ -101,7 +101,7 @@ function matchSegmentToRecording (sfParams, segment) { datetime: moment.utc(segment.start).format('YYYY-MM-DD HH:mm:ss.SSS'), duration: (segment.end - segment.start) / 1000, samples: segment.sample_count, - file_size: segment.file_size, + file_size: 0, bit_rate: sfParams.bit_rate, sample_rate: sfParams.sample_rate, sample_encoding: sfParams.audio_codec, @@ -199,7 +199,9 @@ module.exports = { createSite, updateSite, deleteSite, + matchSegmentToRecording, createRecordingsFromSegments, deleteRecordingsFromSegments, + createRecordings, createUser } diff --git a/core/internal/ingest/post.js b/core/internal/ingest/post.js index 9219b92f7..014da1678 100644 --- a/core/internal/ingest/post.js +++ b/core/internal/ingest/post.js @@ -85,19 +85,43 @@ module.exports = function (req, res) { const fileExtensions = [...new Set(transformedArray.map(segment => segment.file_extension))] const fileExtensionObjects = await Promise.all(fileExtensions.map(ext => fileFormatDao.findOrCreate({ value: ext }, { transaction }))) - const segments = [] - for (const s of transformedArray) { - const fileExtensionId = fileExtensionObjects.find(obj => obj.value === s.file_extension).id - const where = { stream_id: streamId, start: s.start } - const defaults = { ...s, stream_source_file_id: streamSourceFile.id, file_extension_id: fileExtensionId } - const { segment, created } = await streamSegmentDao.findOrCreate(where, defaults, { transaction }) - if (!created) { - await streamSegmentDao.update(streamId, s.start, { availability: 1 }, { transaction }) - } - segments.push({ ...s, ...segment.toJSON(), created }) + const existingSegments = (await streamSegmentDao.findByStreamAndStarts(streamId, transformedArray.map(s => s.start), { + transaction, + fields: ['id', 'stream_id', 'start', 'sample_count'] + })).map(s => s.toJSON()) + if (existingSegments.length) { + await streamSegmentDao.updateByStreamAndStarts(streamId, existingSegments, { availability: 1 }, { transaction }) } - - const createdSegments = segments.filter(s => s.created) + const dataToCreate = transformedArray + .filter((s) => { return !existingSegments.map(e => e.start).includes(s.start) }) + .map((s) => { + const fileExtensionId = fileExtensionObjects.find(obj => obj.value === s.file_extension).id + return { + ...s, + stream_id: streamId, + stream_source_file_id: streamSourceFile.id, + file_extension_id: fileExtensionId + } + }) + let createdSegments = [] + if (dataToCreate.length) { + createdSegments = (await streamSegmentDao.bulkCreate(dataToCreate, { + transaction, + returning: ['id', 'stream_id', 'start', 'sample_count'] + })).map(s => { + const fileExtension = fileExtensionObjects.find(e => e.id === s.file_extension_id) + return { + ...s.toJSON(), + file_extension: fileExtension.value + } + }) + } + const segments = [ + ...existingSegments, + ...createdSegments + ].sort((a, b) => { + return a < b + }) if (arbimonService.isEnabled && createdSegments.length) { await arbimonService.createRecordingsFromSegments(sfParams, createdSegments, { transaction }) diff --git a/core/stream-segments/dao/index.js b/core/stream-segments/dao/index.js index d827df42d..3091cb23f 100644 --- a/core/stream-segments/dao/index.js +++ b/core/stream-segments/dao/index.js @@ -185,6 +185,47 @@ function findOrCreate (where, defaults, options = {}) { }) } +/** + * Find all segments belonging to a stream within specified start array + * @param {string} streamId Stream id + * @param {string[]} starts Stream segment attributes to use for creation + * @param {*} options + * @param {Transaction} options.transaction Perform within given transaction + */ +function findByStreamAndStarts (streamId, starts, options = {}) { + const transaction = options.transaction + const where = { + stream_id: streamId, + start: { + [Sequelize.Op.in]: starts + } + } + const attributes = options.fields && options.fields.length > 0 ? StreamSegment.attributes.full.filter(a => options.fields.includes(a)) : StreamSegment.attributes.lite + return StreamSegment.findAll({ where, attributes, transaction }) + .catch((e) => { + console.error('Stream segment service -> findByStreamAndStarts -> error', e) + throw e + }) +} + +/** + * Update all segments belonging to a stream within specified start array + * @param {string} streamId Stream id + * @param {string[]} starts Stream segment attributes to use for creation + * @param {*} options + * @param {Transaction} options.transaction Perform within given transaction + */ +function updateByStreamAndStarts (streamId, starts, data, options = {}) { + const transaction = options.transaction + const where = { + stream_id: streamId, + start: { + [Sequelize.Op.in]: starts + } + } + return StreamSegment.update(data, { where, transaction }) +} + /** * Bulk create stream segment * @param {*} segments Array of stream segment attributes @@ -193,7 +234,8 @@ function findOrCreate (where, defaults, options = {}) { */ function bulkCreate (segments, options = {}) { const transaction = options.transaction - return StreamSegment.bulkCreate(segments, { transaction }) + const returning = options.returning + return StreamSegment.bulkCreate(segments, { transaction, returning }) .catch((e) => { console.error('Stream segment service -> bulkCreate -> error', e) throw new ValidationError('Cannot bulkCreate stream segment with provided data') @@ -318,6 +360,8 @@ module.exports = { bulkCreate, create, findOrCreate, + findByStreamAndStarts, + updateByStreamAndStarts, update, notify, getStreamCoverage, diff --git a/core/streams/dao/index.js b/core/streams/dao/index.js index de3bedac3..7bae15c26 100644 --- a/core/streams/dao/index.js +++ b/core/streams/dao/index.js @@ -47,7 +47,13 @@ async function get (idOrWhere, options = {}) { const include = options.fields && options.fields.length > 0 ? availableIncludes.filter(i => options.fields.includes(i.as)) : availableIncludes const transaction = options.transaction || null - const stream = await Stream.findOne({ where, attributes, include, paranoid: false, transaction }) + const stream = await Stream.findOne({ + where, + attributes, + include, + paranoid: options.onlyDeleted !== true, + transaction + }) if (!stream) { throw new EmptyResultError('Stream not found') diff --git a/core/streams/get.int.test.js b/core/streams/get.int.test.js index 4cc0befb8..a046c9080 100644 --- a/core/streams/get.int.test.js +++ b/core/streams/get.int.test.js @@ -1,12 +1,18 @@ const routes = require('.') const models = require('../_models') -const { expressApp, seedValues, truncateNonBase } = require('../../common/testing/sequelize') +const { expressApp, seedValues, truncateNonBase, muteConsole } = require('../../common/testing/sequelize') const request = require('supertest') const app = expressApp() app.use('/', routes) +beforeAll(() => { + muteConsole('warn') +}) +afterEach(async () => { + await truncateNonBase(models) +}) afterAll(async () => { await truncateNonBase(models) await models.sequelize.close() @@ -14,12 +20,19 @@ afterAll(async () => { describe('GET /streams/:id', () => { test('not found', async () => { - console.warn = jest.fn() - const response = await request(app).get('/1234') expect(response.statusCode).toBe(404) - expect(console.warn).toHaveBeenCalled() + }) + + test('deleted stream not found', async () => { + const stream = { id: 'jagu1', createdById: seedValues.primaryUserId, name: 'Jaguar Station', latitude: 10.1, longitude: 101.1, altitude: 200, deletedAt: '2021-01-01T00:00:00.000Z' } + await models.Stream.create(stream) + + const response = await request(app).get(`/${stream.id}`) + + expect(response.statusCode).toBe(404) + expect(response.body.message).toBe('Stream not found') }) test('readable by creator', async () => {