Skip to content

Commit

Permalink
Merge branch 'main' into bugfix/editTags
Browse files Browse the repository at this point in the history
  • Loading branch information
skwowet authored Jul 19, 2023
2 parents d14db1f + 337f230 commit 2ba65b1
Show file tree
Hide file tree
Showing 8 changed files with 137 additions and 3 deletions.
3 changes: 3 additions & 0 deletions backend/src/services/memberService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,9 @@ export default class MemberService extends LoggerBase {

// Collect unique domains
for (const email of data.emails) {
if (!email) {
continue
}
const domain = email.split('@')[1]
emailDomains.add(domain)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@

<script setup>
import {
computed, defineProps, defineEmits, reactive, ref, watch,
computed, defineProps, defineEmits, reactive, ref, watch, onMounted,
} from 'vue';
import AppDrawer from '@/shared/drawer/drawer.vue';
import { required } from '@vuelidate/validators';
Expand All @@ -154,6 +154,7 @@ import { useAutomationStore } from '@/modules/automation/store';
import Message from '@/shared/message/message';
import { i18n } from '@/i18n';
import formChangeDetector from '@/shared/form/form-change';
import { useStore } from 'vuex';
const props = defineProps({
modelValue: {
Expand All @@ -177,6 +178,9 @@ const emit = defineEmits(['update:modelValue', 'update:automation']);
const { createAutomation, updateAutomation, getAutomations } = useAutomationStore();
const store = useStore();
const fetchIntegrations = () => store.dispatch('integration/doFetch');
const isDrawerOpen = computed({
get() {
return props.modelValue;
Expand Down Expand Up @@ -287,6 +291,9 @@ const doSubmit = () => {
}
};
onMounted(() => {
fetchIntegrations();
});
</script>
<script>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@

<!-- Add/Edit Webhook form drawer -->
<app-automation-form
v-if="openAutomationForm"
v-model="openAutomationForm"
v-model:automation="editAutomation"
:type="automationFormType"
Expand Down
4 changes: 3 additions & 1 deletion services/apps/integration_data_worker/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
"lint": "./node_modules/.bin/eslint --ext .ts src --max-warnings=0",
"format": "./node_modules/.bin/prettier --write \"src/**/*.ts\"",
"format-check": "./node_modules/.bin/prettier --check .",
"tsc-check": "./node_modules/.bin/tsc --noEmit"
"tsc-check": "./node_modules/.bin/tsc --noEmit",
"script:process-data": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/process-data.ts",
"script:process-data-for-tenant": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/process-data-for-tenant.ts"
},
"dependencies": {
"@crowd/common": "file:../../libs/common",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import { DB_CONFIG, SQS_CONFIG } from '@/conf'
import IntegrationDataRepository from '@/repo/integrationData.repo'
import { DbStore, getDbConnection } from '@crowd/database'
import { getServiceLogger } from '@crowd/logging'
import { IntegrationDataWorkerEmitter, getSqsClient } from '@crowd/sqs'
import { IntegrationStreamDataState } from '@crowd/types'

const log = getServiceLogger()

const processArguments = process.argv.slice(2)

if (processArguments.length !== 1) {
log.error('Expected 1 argument: tenantId')
process.exit(1)
}

const tenantId = processArguments[0]

setImmediate(async () => {
const sqsClient = getSqsClient(SQS_CONFIG())
const emitter = new IntegrationDataWorkerEmitter(sqsClient, log)
await emitter.init()

const dbConnection = getDbConnection(DB_CONFIG(), 1)
const store = new DbStore(log, dbConnection)
const repo = new IntegrationDataRepository(store, log)

const dataIds = await repo.getDataForTenant(tenantId)

for (const dataId of dataIds) {
const info = await repo.getDataInfo(dataId)

if (info) {
if (info.state !== IntegrationStreamDataState.PENDING) {
await repo.resetStream(dataId)
}

await emitter.triggerDataProcessing(info.tenantId, info.integrationType, dataId)
} else {
log.error({ dataId }, 'Data stream not found!')
process.exit(1)
}
}
})
42 changes: 42 additions & 0 deletions services/apps/integration_data_worker/src/bin/process-data.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import { DB_CONFIG, SQS_CONFIG } from '@/conf'
import IntegrationDataRepository from '@/repo/integrationData.repo'
import { DbStore, getDbConnection } from '@crowd/database'
import { getServiceLogger } from '@crowd/logging'
import { IntegrationDataWorkerEmitter, getSqsClient } from '@crowd/sqs'
import { IntegrationStreamDataState } from '@crowd/types'

const log = getServiceLogger()

const processArguments = process.argv.slice(2)

if (processArguments.length !== 1) {
log.error('Expected 1 argument: dataId')
process.exit(1)
}

const dataIds = processArguments[0].split(',')

setImmediate(async () => {
const sqsClient = getSqsClient(SQS_CONFIG())
const emitter = new IntegrationDataWorkerEmitter(sqsClient, log)
await emitter.init()

const dbConnection = getDbConnection(DB_CONFIG(), 1)
const store = new DbStore(log, dbConnection)
const repo = new IntegrationDataRepository(store, log)

for (const dataId of dataIds) {
const info = await repo.getDataInfo(dataId)

if (info) {
if (info.state !== IntegrationStreamDataState.PENDING) {
await repo.resetStream(dataId)
}

await emitter.triggerDataProcessing(info.tenantId, info.integrationType, dataId)
} else {
log.error({ dataId }, 'Data stream not found!')
process.exit(1)
}
}
})
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,15 @@ export default class IntegrationDataRepository extends RepositoryBase<Integratio
where d.id = $(dataId);
`

private readonly getDataForTenantQuery = `
select
d.id
from
integration."apiData" d
where
d."tenantId" = $(tenantId)
`

public async getDataInfo(dataId: string): Promise<IApiDataInfo | null> {
const results = await this.db().oneOrNone(this.getDataInfoQuery, {
dataId,
Expand Down Expand Up @@ -236,4 +245,30 @@ export default class IntegrationDataRepository extends RepositoryBase<Integratio

this.checkUpdateRowCount(result.rowCount, 1)
}

public async resetStream(dataId: string): Promise<void> {
const result = await this.db().result(
`update integration."apiData"
set state = $(state),
error = null,
"delayedUntil" = null,
"processedAt" = null,
"updatedAt" = now()
where id = $(dataId)`,
{
dataId,
state: IntegrationStreamDataState.PENDING,
},
)

this.checkUpdateRowCount(result.rowCount, 1)
}

public async getDataForTenant(tenantId: string): Promise<string[]> {
const results = await this.db().manyOrNone(this.getDataForTenantQuery, {
tenantId,
})

return results.map((r) => r.id)
}
}
2 changes: 1 addition & 1 deletion services/libs/conversations/src/repo/conversation.repo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ export class ConversationRepository extends RepositoryBase<ConversationRepositor
const now = new Date()
const result = await this.db().result(
`insert into "conversationSettings"(id, "tenantId", "createdAt", "updatedAt", "createdById")
values ($(id), $(tenantId), $(now), $(now), (select "createdById" from tenants where id = $(tenantId) limit 1));`,
values ($(id), $(tenantId), $(now), $(now), (select coalesce("createdById", "updatedById") as "createdById" from tenants where id = $(tenantId) limit 1));`,
{
id,
tenantId,
Expand Down

0 comments on commit 2ba65b1

Please sign in to comment.