Skip to content

Commit

Permalink
Merge branch 'staging'
Browse files Browse the repository at this point in the history
  • Loading branch information
rassokhin-s committed Jul 26, 2023
2 parents b4f92b9 + bd2004b commit 4caeeeb
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 19 deletions.
4 changes: 3 additions & 1 deletion core/_services/arbimon/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ function matchSegmentToRecording (sfParams, segment) {
datetime: moment.utc(segment.start).format('YYYY-MM-DD HH:mm:ss.SSS'),
duration: (segment.end - segment.start) / 1000,
samples: segment.sample_count,
file_size: segment.file_size,
file_size: 0,
bit_rate: sfParams.bit_rate,
sample_rate: sfParams.sample_rate,
sample_encoding: sfParams.audio_codec,
Expand Down Expand Up @@ -199,7 +199,9 @@ module.exports = {
createSite,
updateSite,
deleteSite,
matchSegmentToRecording,
createRecordingsFromSegments,
deleteRecordingsFromSegments,
createRecordings,
createUser
}
48 changes: 36 additions & 12 deletions core/internal/ingest/post.js
Original file line number Diff line number Diff line change
Expand Up @@ -85,19 +85,43 @@ module.exports = function (req, res) {
const fileExtensions = [...new Set(transformedArray.map(segment => segment.file_extension))]
const fileExtensionObjects = await Promise.all(fileExtensions.map(ext => fileFormatDao.findOrCreate({ value: ext }, { transaction })))

const segments = []
for (const s of transformedArray) {
const fileExtensionId = fileExtensionObjects.find(obj => obj.value === s.file_extension).id
const where = { stream_id: streamId, start: s.start }
const defaults = { ...s, stream_source_file_id: streamSourceFile.id, file_extension_id: fileExtensionId }
const { segment, created } = await streamSegmentDao.findOrCreate(where, defaults, { transaction })
if (!created) {
await streamSegmentDao.update(streamId, s.start, { availability: 1 }, { transaction })
}
segments.push({ ...s, ...segment.toJSON(), created })
// Look up segments that already exist for this stream at the incoming start
// times, so we can mark them available instead of re-creating them.
const existingSegments = (await streamSegmentDao.findByStreamAndStarts(streamId, transformedArray.map(s => s.start), {
  transaction,
  fields: ['id', 'stream_id', 'start', 'sample_count']
})).map(s => s.toJSON())
if (existingSegments.length) {
  // Pass the start values, not the full segment objects: the DAO builds a
  // `start IN (...)` filter, which would not match against plain objects.
  await streamSegmentDao.updateByStreamAndStarts(streamId, existingSegments.map(e => e.start), { availability: 1 }, { transaction })
}

const createdSegments = segments.filter(s => s.created)
const dataToCreate = transformedArray
.filter((s) => { return !existingSegments.map(e => e.start).includes(s.start) })
.map((s) => {
const fileExtensionId = fileExtensionObjects.find(obj => obj.value === s.file_extension).id
return {
...s,
stream_id: streamId,
stream_source_file_id: streamSourceFile.id,
file_extension_id: fileExtensionId
}
})
let createdSegments = []
if (dataToCreate.length) {
createdSegments = (await streamSegmentDao.bulkCreate(dataToCreate, {
transaction,
returning: ['id', 'stream_id', 'start', 'sample_count']
})).map(s => {
const fileExtension = fileExtensionObjects.find(e => e.id === s.file_extension_id)
return {
...s.toJSON(),
file_extension: fileExtension.value
}
})
}
// Merge previously-existing and newly-created segments into one list ordered
// by segment start time. NOTE: Array#sort requires a numeric comparator; the
// original `return a < b` returned a boolean (coerced to 0/NaN-like), which
// left the array unsorted — and it compared the objects themselves rather
// than their `start` values.
const segments = [
  ...existingSegments,
  ...createdSegments
].sort((a, b) => {
  if (a.start < b.start) { return -1 }
  if (a.start > b.start) { return 1 }
  return 0
})

if (arbimonService.isEnabled && createdSegments.length) {
await arbimonService.createRecordingsFromSegments(sfParams, createdSegments, { transaction })
Expand Down
46 changes: 45 additions & 1 deletion core/stream-segments/dao/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,47 @@ function findOrCreate (where, defaults, options = {}) {
})
}

/**
 * Find all segments belonging to a stream whose `start` is in the given list
 * @param {string} streamId Stream id
 * @param {(string|number)[]} starts Segment start values to match against
 * @param {*} options Additional options
 * @param {string[]} [options.fields] Subset of segment attributes to return; defaults to the lite attribute set
 * @param {Transaction} [options.transaction] Perform within given transaction
 * @returns {Promise<StreamSegment[]>} Matching segment model instances
 */
function findByStreamAndStarts (streamId, starts, options = {}) {
  const transaction = options.transaction
  const where = {
    stream_id: streamId,
    start: {
      [Sequelize.Op.in]: starts
    }
  }
  // Narrow the selected attributes to the requested fields (intersected with
  // the full attribute list); fall back to the lite set when none are given.
  const attributes = options.fields && options.fields.length > 0 ? StreamSegment.attributes.full.filter(a => options.fields.includes(a)) : StreamSegment.attributes.lite
  return StreamSegment.findAll({ where, attributes, transaction })
    .catch((e) => {
      // Log-and-rethrow matches the error handling style of the sibling DAO functions.
      console.error('Stream segment service -> findByStreamAndStarts -> error', e)
      throw e
    })
}

/**
 * Update all segments belonging to a stream whose `start` is in the given list
 * @param {string} streamId Stream id
 * @param {(string|number|object)[]} starts Segment start values, or segment objects carrying a `start` property (e.g. results of findByStreamAndStarts)
 * @param {object} data Attribute values to set on every matched segment (e.g. `{ availability: 1 }`)
 * @param {*} options Additional options
 * @param {Transaction} [options.transaction] Perform within given transaction
 * @returns {Promise} Resolves with the Sequelize update result
 */
function updateByStreamAndStarts (streamId, starts, data, options = {}) {
  const transaction = options.transaction
  // Accept either raw start values or segment objects for robustness: the
  // `start IN (...)` filter below only matches scalar start values.
  const startValues = starts.map(s => (s !== null && typeof s === 'object') ? s.start : s)
  const where = {
    stream_id: streamId,
    start: {
      [Sequelize.Op.in]: startValues
    }
  }
  return StreamSegment.update(data, { where, transaction })
    .catch((e) => {
      // Log-and-rethrow for consistency with findByStreamAndStarts / bulkCreate.
      console.error('Stream segment service -> updateByStreamAndStarts -> error', e)
      throw e
    })
}

/**
* Bulk create stream segment
* @param {*} segments Array of stream segment attributes
Expand All @@ -193,7 +234,8 @@ function findOrCreate (where, defaults, options = {}) {
*/
function bulkCreate (segments, options = {}) {
const transaction = options.transaction
return StreamSegment.bulkCreate(segments, { transaction })
const returning = options.returning
return StreamSegment.bulkCreate(segments, { transaction, returning })
.catch((e) => {
console.error('Stream segment service -> bulkCreate -> error', e)
throw new ValidationError('Cannot bulkCreate stream segment with provided data')
Expand Down Expand Up @@ -318,6 +360,8 @@ module.exports = {
bulkCreate,
create,
findOrCreate,
findByStreamAndStarts,
updateByStreamAndStarts,
update,
notify,
getStreamCoverage,
Expand Down
8 changes: 7 additions & 1 deletion core/streams/dao/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,13 @@ async function get (idOrWhere, options = {}) {
const include = options.fields && options.fields.length > 0 ? availableIncludes.filter(i => options.fields.includes(i.as)) : availableIncludes
const transaction = options.transaction || null

const stream = await Stream.findOne({ where, attributes, include, paranoid: false, transaction })
const stream = await Stream.findOne({
where,
attributes,
include,
paranoid: options.onlyDeleted !== true,
transaction
})

if (!stream) {
throw new EmptyResultError('Stream not found')
Expand Down
21 changes: 17 additions & 4 deletions core/streams/get.int.test.js
Original file line number Diff line number Diff line change
@@ -1,25 +1,38 @@
const routes = require('.')
const models = require('../_models')
const { expressApp, seedValues, truncateNonBase } = require('../../common/testing/sequelize')
const { expressApp, seedValues, truncateNonBase, muteConsole } = require('../../common/testing/sequelize')
const request = require('supertest')

const app = expressApp()

app.use('/', routes)

beforeAll(() => {
muteConsole('warn')
})
afterEach(async () => {
await truncateNonBase(models)
})
afterAll(async () => {
await truncateNonBase(models)
await models.sequelize.close()
})

describe('GET /streams/:id', () => {
test('not found', async () => {
console.warn = jest.fn()

const response = await request(app).get('/1234')

expect(response.statusCode).toBe(404)
expect(console.warn).toHaveBeenCalled()
})

test('deleted stream not found', async () => {
const stream = { id: 'jagu1', createdById: seedValues.primaryUserId, name: 'Jaguar Station', latitude: 10.1, longitude: 101.1, altitude: 200, deletedAt: '2021-01-01T00:00:00.000Z' }
await models.Stream.create(stream)

const response = await request(app).get(`/${stream.id}`)

expect(response.statusCode).toBe(404)
expect(response.body.message).toBe('Stream not found')
})

test('readable by creator', async () => {
Expand Down

0 comments on commit 4caeeeb

Please sign in to comment.