Skip to content

Commit

Permalink
fix transaction usage
Browse files Browse the repository at this point in the history
  • Loading branch information
rassokhin-s committed Aug 31, 2023
1 parent 58ac00f commit 28df24b
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 34 deletions.
6 changes: 4 additions & 2 deletions common/users/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ function findOrCreateUser (defaults) {
})
}

function getUserByParams (params, ignoreMissing) {
function getUserByParams (params, ignoreMissing, options = {}) {
const transaction = options.transaction
return User
.findOne({
where: params,
include: [{ all: true }]
include: [{ all: true }],
transaction
})
.then((user) => {
if (!user && !ignoreMissing) { throw new EmptyResultError('User not found') }
Expand Down
22 changes: 11 additions & 11 deletions core/classifier-jobs/bl/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,18 @@ const ALLOWED_SOURCE_STATUSES = [CANCELLED, WAITING, ERROR]
* @throws ValidationError when the project does not exist
*/
async function create (data, options = {}) {
if (options.creatableBy && !(await hasPermission(CREATE, options.creatableBy, data.projectId, PROJECT))) {
throw new ForbiddenError()
}
const namesOrIds = data.queryStreams ? data.queryStreams.split(',') : undefined
const streamIds = (await streamsDao.query({ projects: [data.projectId], namesOrIds }, { fields: ['id'] })).results.map(r => r.id)
if (!streamIds.length) {
throw new EmptyResultError('No streams found for the query')
}
if (namesOrIds && (streamIds.length < namesOrIds.length)) {
throw new EmptyResultError('Some streams not found for the query')
}
return await sequelize.transaction(async (transaction) => {
if (options.creatableBy && !(await hasPermission(CREATE, options.creatableBy, data.projectId, PROJECT, { transaction }))) {
throw new ForbiddenError()
}
const namesOrIds = data.queryStreams ? data.queryStreams.split(',') : undefined
const streamIds = (await streamsDao.query({ projects: [data.projectId], namesOrIds }, { fields: ['id'], transaction })).results.map(r => r.id)
if (!streamIds.length) {
throw new EmptyResultError('No streams found for the query')
}
if (namesOrIds && (streamIds.length < namesOrIds.length)) {
throw new EmptyResultError('Some streams not found for the query')
}
options.transaction = transaction
const job = await dao.create(data, options)
await dao.createJobStreams(job.id, streamIds, { transaction })
Expand Down
6 changes: 2 additions & 4 deletions core/classifier-jobs/dao/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,8 @@ async function query (filters, options = {}) {
* @throws ValidationError when the project does not exist
*/
async function create (job, options = {}) {
if (options.creatableBy && !(await hasPermission(CREATE, options.creatableBy, job.projectId, PROJECT).catch(() => { throw new ValidationError('project does not exist') }))) {
throw new ForbiddenError()
}
return await ClassifierJob.create(job)
const transaction = options.transaction
return await ClassifierJob.create(job, { transaction })
.catch((e) => {
console.error('error', e)
throw new ValidationError('Cannot create classifier job with provided data')
Expand Down
10 changes: 5 additions & 5 deletions core/classifiers/dao/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -147,24 +147,24 @@ function create (attrs) {
createdById: attrs.createdById,
parameters: attrs.parameters
}
return models.sequelize.transaction(async (t) => {
return models.sequelize.transaction(async (transaction) => {
// Create the classifier
const classifier = await models.Classifier.create(classifierData, { transaction: t })
const classifier = await models.Classifier.create(classifierData, { transaction })

// Create the outputs
const outputsData = attrs.outputs.map(output => ({
classifierId: classifier.id,
classificationId: output.id,
outputClassName: output.className
}))
await Promise.all(outputsData.map(output => models.ClassifierOutput.create(output, { transaction: t })))
await Promise.all(outputsData.map(output => models.ClassifierOutput.create(output, { transaction })))

// Create the active projects and streams
if (attrs.activeProjects) {
await updateActiveProjects({ id: classifier.id, activeProjects: attrs.activeProjects }, t)
await updateActiveProjects({ id: classifier.id, activeProjects: attrs.activeProjects }, { transaction })
}
if (attrs.activeStreams) {
await updateActiveStreams({ id: classifier.id, activeStreams: attrs.activeStreams }, t)
await updateActiveStreams({ id: classifier.id, activeStreams: attrs.activeStreams }, { transaction })
}

return classifier
Expand Down
27 changes: 16 additions & 11 deletions core/roles/dao/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ function getByName (name, opts = {}) {
* @param {string} itemOrId item id or item
* @param {string} itemName `STREAM` or `PROJECT` or `ORGANIZATION`
*/
async function hasPermission (type, userId, itemOrId, itemName) {
const permissions = await getPermissions(userId, itemOrId, itemName)
async function hasPermission (type, userId, itemOrId, itemName, options = {}) {
const permissions = await getPermissions(userId, itemOrId, itemName, options)
return permissions.includes(type)
}

Expand All @@ -93,7 +93,8 @@ async function hasPermission (type, userId, itemOrId, itemName) {
* @param {string | object} itemOrId item id or item
* @param {string} itemName `STREAM` or `PROJECT` or `ORGANIZATION`
*/
async function getPermissions (userOrId, itemOrId, itemName) {
async function getPermissions (userOrId, itemOrId, itemName, options = {}) {
const transaction = options.transaction
const isId = typeof itemOrId === 'string'
const userIsPrimitive = ['string', 'number'].includes(typeof userOrId)
const userId = userIsPrimitive ? userOrId : userOrId.id
Expand All @@ -106,13 +107,13 @@ async function getPermissions (userOrId, itemOrId, itemName) {
let item = itemOrId
if (isId) {
try {
item = (await hierarchy[itemName].model.findOne({ where: { id: itemOrId }, paranoid: false })).toJSON()
item = (await hierarchy[itemName].model.findOne({ where: { id: itemOrId }, paranoid: false, transaction })).toJSON()
} catch (e) {
throw new EmptyResultError(`${itemName} with given id doesn't exist.`)
}
}
const originalItem = { ...item }
const user = await (userIsPrimitive ? usersService.getUserByParams({ id: userId }) : Promise.resolve(userOrId))
const user = await (userIsPrimitive ? usersService.getUserByParams({ id: userId }, false, { transaction }) : Promise.resolve(userOrId))
if (user.is_super || item.created_by_id === userId) {
return [CREATE, READ, UPDATE, DELETE]
}
Expand All @@ -126,7 +127,8 @@ async function getPermissions (userOrId, itemOrId, itemName) {
user_id: userId,
[currentLevel.columnId]: item.id
},
include: userRoleBaseInclude
include: userRoleBaseInclude,
transaction
})
if (itemRole && itemRole.role) {
// if role is found, check permissions of this role
Expand All @@ -141,7 +143,8 @@ async function getPermissions (userOrId, itemOrId, itemName) {
where: {
id: item[parentColumnId] || item[parentModelName].id
},
paranoid: false
paranoid: false,
transaction
})
if (item) {
if (item.created_by_id === userId) {
Expand Down Expand Up @@ -202,7 +205,8 @@ async function getPermissionsForProjects (projectIds, userId) {
* @param {string[]} inIds Subset of object ids to select from
* @param {string} userId The user for which the projects are accessible
*/
async function getPermissionsForObjects (itemName, inIds, userId) {
async function getPermissionsForObjects (itemName, inIds, userId, options = {}) {
const transaction = options.transaction
if (!inIds.length) {
return {}
}
Expand All @@ -211,7 +215,7 @@ async function getPermissionsForObjects (itemName, inIds, userId) {
const where = `WHERE ${itemName}r.${itemName}_id IN (:inIds) AND ${itemName}r.user_id = ${userId}`
const sql = `${select} ${join} ${where}`

return models.sequelize.query(sql, { replacements: { inIds }, type: models.sequelize.QueryTypes.SELECT })
return models.sequelize.query(sql, { replacements: { inIds }, type: models.sequelize.QueryTypes.SELECT, transaction })
.then(data => {
return data.reduce((result, row) => {
result[row[`${itemName}_id`]] = (result[row[`${itemName}_id`]] || []).concat(row.permission)
Expand All @@ -229,7 +233,8 @@ async function getPermissionsForObjects (itemName, inIds, userId) {
* @param {string[]} inIds Subset of object ids to select from
* @param {string} permission Required permission "R" by default
*/
async function getAccessibleObjectsIDs (userId, itemName, inIds, permission = READ, includePublic = false) {
async function getAccessibleObjectsIDs (userId, itemName, inIds, permission = READ, includePublic = false, options = {}) {
const transaction = options.transaction
const select = `SELECT DISTINCT ${itemName}.id FROM ${itemName}s ${itemName}`
const joins = [
`LEFT JOIN user_${itemName}_roles ${itemName}r ON ${itemName}.id = ${itemName}r.${itemName}_id AND ${itemName}r.user_id = ${userId}`,
Expand Down Expand Up @@ -264,7 +269,7 @@ async function getAccessibleObjectsIDs (userId, itemName, inIds, permission = RE
if (inIds && inIds.length) {
sql += ` AND ${itemName}.id IN (:inIds)`
}
return models.sequelize.query(sql, { replacements: { userId, inIds }, type: models.sequelize.QueryTypes.SELECT })
return models.sequelize.query(sql, { replacements: { userId, inIds }, type: models.sequelize.QueryTypes.SELECT, transaction })
.then(data => data.map(x => x.id))
}

Expand Down
3 changes: 2 additions & 1 deletion core/streams/dao/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,8 @@ async function query (filters, options = {}) {
order,
limit: options.limit,
offset: options.offset,
paranoid: options.onlyDeleted !== true
paranoid: options.onlyDeleted !== true,
transaction: options.transaction
})

// TODO move country into the table and perform lookup once on create/update
Expand Down

0 comments on commit 28df24b

Please sign in to comment.