Missing indexes, script improvements, fixes (#1500)
Uros Marolt authored Sep 15, 2023
1 parent aa5cb7e commit f09901f
Showing 8 changed files with 121 additions and 43 deletions.
2 changes: 1 addition & 1 deletion backend/src/bin/jobs/memberScoreCoordinator.ts
@@ -5,7 +5,7 @@ import { PythonWorkerMessageType } from '../../serverless/types/workerTypes'

const job: CrowdJob = {
name: 'Member Score Coordinator',
cronTime: cronGenerator.every(20).minutes(),
cronTime: cronGenerator.every(90).minutes(),
onTrigger: async () => {
await sendPythonWorkerMessage('global', {
type: PythonWorkerMessageType.MEMBERS_SCORE,
24 changes: 21 additions & 3 deletions backend/src/bin/nodejs-worker.ts
@@ -1,4 +1,4 @@
import { Logger, getChildLogger, getServiceLogger } from '@crowd/logging'
import { Logger, getChildLogger, getServiceLogger, logExecutionTimeV2 } from '@crowd/logging'
import { DeleteMessageRequest, Message, ReceiveMessageRequest } from 'aws-sdk/clients/sqs'
import moment from 'moment'
import { timeout } from '@crowd/common'
@@ -119,7 +119,19 @@ async function handleMessages() {
})

try {
messageLogger.debug('Received a new queue message!')
if (
msg.type === NodeWorkerMessageType.DB_OPERATIONS &&
(msg as any).operation === 'update_members'
) {
messageLogger.warn('Skipping update_members message! TEMPORARY MEASURE!')
await removeFromQueue(message.ReceiptHandle)
return
}

messageLogger.info(
{ messageType: msg.type, messagePayload: JSON.stringify(msg) },
'Received a new queue message!',
)

let processFunction: (msg: NodeWorkerMessageBase, logger?: Logger) => Promise<void>
let keep = false
Expand Down Expand Up @@ -152,7 +164,13 @@ async function handleMessages() {
await removeFromQueue(message.ReceiptHandle)
messagesInProgress.set(message.MessageId, msg)
try {
await processFunction(msg, messageLogger)
await logExecutionTimeV2(
async () => {
await processFunction(msg, messageLogger)
},
messageLogger,
'queueMessageProcessingTime',
)
} catch (err) {
messageLogger.error(err, 'Error while processing queue message!')
} finally {
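logExecutionTimeV2 itself is not part of this diff (it is imported from @crowd/logging above); assuming it simply times the wrapped async call and logs the elapsed duration under the given label, a minimal sketch could look like this (hypothetical, shown only to clarify the call above):

import { Logger } from '@crowd/logging'

// Hypothetical sketch -- the real @crowd/logging helper may differ.
async function logExecutionTimeV2<T>(
  fn: () => Promise<T>,
  log: Logger,
  name: string,
): Promise<T> {
  const start = Date.now()
  try {
    return await fn()
  } finally {
    // log the duration even if the wrapped function throws
    log.info({ durationMs: Date.now() - start }, `Execution time for: ${name}`)
  }
}

In the handler above it wraps processFunction, so each queue message's processing time is logged under queueMessageProcessingTime.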
@@ -0,0 +1 @@
drop index if exists "ix_segmentActivityChannels_segmentId_platform";
@@ -0,0 +1 @@
create index if not exists "ix_segmentActivityChannels_segmentId_platform" on "segmentActivityChannels" ("segmentId", platform);
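For context, the composite index added here (the preceding drop statement is presumably the corresponding down migration) covers lookups that filter segmentActivityChannels by both segment and platform. A hypothetical query shape it would serve, not taken from this commit (the named parameters are placeholders):

// Hypothetical example only -- illustrates the ("segmentId", platform) access path.
const segmentChannelsQuery = `
  select *
    from "segmentActivityChannels"
   where "segmentId" = $(segmentId)
     and platform = $(platform)
`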
3 changes: 2 additions & 1 deletion services/apps/data_sink_worker/package.json
@@ -17,7 +17,8 @@
"script:restart-failed-results": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/restart-failed-results.ts",
"script:restart-all-failed-results": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/restart-all-failed-results.ts",
"script:restart-x-failed-results": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/restart-x-failed-results.ts",
"script:restart-result": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/restart-result.ts"
"script:restart-result": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/restart-result.ts",
"script:process-results": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/process-results.ts"
},
"dependencies": {
"@crowd/common": "file:../../libs/common",
52 changes: 52 additions & 0 deletions services/apps/data_sink_worker/src/bin/process-results.ts
@@ -0,0 +1,52 @@
import { DB_CONFIG, REDIS_CONFIG, SQS_CONFIG } from '@/conf'
import DataSinkRepository from '@/repo/dataSink.repo'
import DataSinkService from '@/service/dataSink.service'
import { DbStore, getDbConnection } from '@crowd/database'
import { getServiceLogger } from '@crowd/logging'
import { getRedisClient } from '@crowd/redis'
import { NodejsWorkerEmitter, SearchSyncWorkerEmitter, getSqsClient } from '@crowd/sqs'

const log = getServiceLogger()

const processArguments = process.argv.slice(2)

if (processArguments.length !== 1) {
log.error('Expected 1 argument: resultIds')
process.exit(1)
}

const resultIds = processArguments[0].split(',')

setImmediate(async () => {
const sqsClient = getSqsClient(SQS_CONFIG())
const redisClient = await getRedisClient(REDIS_CONFIG())

const nodejsWorkerEmitter = new NodejsWorkerEmitter(sqsClient, log)
const searchSyncWorkerEmitter = new SearchSyncWorkerEmitter(sqsClient, log)

const dbConnection = await getDbConnection(DB_CONFIG())
const store = new DbStore(log, dbConnection)

const service = new DataSinkService(
store,
nodejsWorkerEmitter,
searchSyncWorkerEmitter,
redisClient,
log,
)

const repo = new DataSinkRepository(store, log)
for (const resultId of resultIds) {
const result = await repo.getResultInfo(resultId)
if (!result) {
log.error(`Result ${resultId} not found!`)
continue
} else {
await repo.resetResults([resultId])

await service.processResult(resultId)
}
}

process.exit(0)
})
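With the new package.json entry above, the script presumably runs as npm run script:process-results -- "resultId1,resultId2", taking a single comma-separated argument. Each listed result is reset via resetResults and pushed back through DataSinkService.processResult; ids that cannot be found are logged and skipped.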
65 changes: 35 additions & 30 deletions services/apps/integration_run_worker/src/bin/onboard-integration.ts
@@ -9,7 +9,7 @@ const log = getServiceLogger()

const processArguments = process.argv.slice(2)

const integrationId = processArguments[0]
const parameter = processArguments[0]

setImmediate(async () => {
const sqsClient = getSqsClient(SQS_CONFIG())
@@ -21,35 +21,40 @@ setImmediate(async () => {

const repo = new IntegrationRunRepository(store, log)

const integration = await repo.getIntegrationData(integrationId)

if (integration) {
if (integration.state == IntegrationState.IN_PROGRESS) {
log.warn(`Integration already running!`)
process.exit(1)
}

if (integration.state == IntegrationState.INACTIVE) {
log.warn(`Integration is not active!`)
process.exit(1)
const integrationIds = parameter.split(',')

for (const integrationId of integrationIds) {
const integration = await repo.getIntegrationData(integrationId)

if (integration) {
if (integration.state == IntegrationState.IN_PROGRESS) {
log.warn(`Integration already running!`)
continue
}

if (integration.state == IntegrationState.INACTIVE) {
log.warn(`Integration is not active!`)
continue
}

if (integration.state == IntegrationState.WAITING_APPROVAL) {
log.warn(`Integration is waiting for approval!`)
continue
}

log.info(`Triggering integration run for ${integrationId}!`)

await emitter.triggerIntegrationRun(
integration.tenantId,
integration.type,
integration.id,
true,
)
} else {
log.error({ integrationId }, 'Integration not found!')
continue
}

if (integration.state == IntegrationState.WAITING_APPROVAL) {
log.warn(`Integration is waiting for approval!`)
process.exit(1)
}

log.info(`Triggering integration run for ${integrationId}!`)

await emitter.triggerIntegrationRun(
integration.tenantId,
integration.type,
integration.id,
true,
)
process.exit(0)
} else {
log.error({ integrationId }, 'Integration not found!')
process.exit(1)
}

process.exit(0)
})
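With this change the onboarding script takes either a single integration id or a comma-separated list as its one argument. Integrations that are already running, inactive, or waiting for approval are now skipped with a warning instead of aborting the whole run, missing ids are logged, and the process exits 0 once the loop completes.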
@@ -88,10 +88,10 @@ export class OrganizationSyncService extends LoggerBase {
},
}

const sort = [{ date_joinedAt: 'asc' }]
const include = ['date_joinedAt', 'uuid_organizationId']
const sort = [{ date_createdAt: 'asc' }]
const include = ['date_createdAt', 'uuid_organizationId']
const pageSize = 500
let lastJoinedAt: string
let lastCreatedAt: string

let results = (await this.openSearchService.search(
OpenSearchIndex.ORGANIZATIONS,
@@ -101,7 +101,7 @@ export class OrganizationSyncService extends LoggerBase {
sort,
undefined,
include,
)) as ISearchHit<{ date_joinedAt: string; uuid_organizationId: string }>[]
)) as ISearchHit<{ date_createdAt: string; uuid_organizationId: string }>[]

let processed = 0

@@ -126,17 +126,17 @@ export class OrganizationSyncService extends LoggerBase {
processed += results.length
this.log.warn({ tenantId }, `Processed ${processed} organizations while cleaning up tenant!`)

// use last joinedAt to get the next page
lastJoinedAt = results[results.length - 1]._source.date_joinedAt
// use last createdAt to get the next page
lastCreatedAt = results[results.length - 1]._source.date_createdAt
results = (await this.openSearchService.search(
OpenSearchIndex.ORGANIZATIONS,
query,
undefined,
pageSize,
sort,
lastJoinedAt,
lastCreatedAt,
include,
)) as ISearchHit<{ date_joinedAt: string; uuid_organizationId: string }>[]
)) as ISearchHit<{ date_createdAt: string; uuid_organizationId: string }>[]
}

this.log.warn(
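The cleanup loop above pages through the organizations index with a keyset-style cursor: hits are sorted ascending on a date field and the last hit's value is passed as the search-after value for the next page; this commit switches that cursor field from date_joinedAt to date_createdAt. A generic sketch of the pattern, with hypothetical helper signatures (the real OpenSearchService API differs):

// Hypothetical sketch of keyset pagination over a search index.
async function forEachPage<T>(
  search: (searchAfter?: string) => Promise<{ hits: T[]; nextCursor?: string }>,
  handle: (hits: T[]) => Promise<void>,
): Promise<void> {
  let page = await search()
  while (page.hits.length > 0) {
    await handle(page.hits)
    // the last sort value of this page becomes the search-after of the next request
    page = await search(page.nextCursor)
  }
}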
