From 78e6ea1b9ad648243754b83c03a781a01d1ccf57 Mon Sep 17 00:00:00 2001 From: anilb Date: Wed, 13 Dec 2023 16:35:40 +0100 Subject: [PATCH] Better organization existence checks and organizations without activities now synced to opensearch (#1939) Co-authored-by: Gasper Grom --- backend/package.json | 4 +- .../generate-merge-suggestions-synchronous.ts | 102 ++++++++++ .../scripts/merge-similar-organizations.ts | 192 ++++++++++++++++++ .../repositories/organizationRepository.ts | 185 +++++++++++------ backend/src/services/organizationService.ts | 2 +- .../enrichment/memberEnrichmentService.ts | 24 ++- .../organization/config/saved-views/main.ts | 2 + .../settings/hasActivities/config.ts | 16 ++ .../saved-views/views/all-organizations.ts | 1 + .../opensearch/src/repo/organization.repo.ts | 13 +- 10 files changed, 464 insertions(+), 77 deletions(-) create mode 100644 backend/src/bin/scripts/generate-merge-suggestions-synchronous.ts create mode 100644 backend/src/bin/scripts/merge-similar-organizations.ts create mode 100644 frontend/src/modules/organization/config/saved-views/settings/hasActivities/config.ts diff --git a/backend/package.json b/backend/package.json index 49bb3280f5..451ef73296 100644 --- a/backend/package.json +++ b/backend/package.json @@ -39,11 +39,13 @@ "script:enrich-members-organizations": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/scripts/enrich-members-and-organizations.ts", "script:enrich-organizations": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/scripts/enrich-organizations-synchronous.ts", "script:generate-merge-suggestions": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/scripts/generate-merge-suggestions.ts", + "script:generate-merge-suggestions-synchronous": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register 
src/bin/scripts/generate-merge-suggestions-synchronous.ts", "script:merge-organizations": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/scripts/merge-organizations.ts", "script:get-member-enrichment-data": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/scripts/get-member-enrichment-data.ts", "script:get-organization-enrichment-data": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/scripts/get-organization-enrichment-data.ts", "script:refresh-materialized-views": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/scripts/refresh-materialized-views.ts", - "script:unmerge-members": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/scripts/unmerge-members.ts" + "script:unmerge-members": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/scripts/unmerge-members.ts", + "script:merge-similar-organizations": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/scripts/merge-similar-organizations.ts" }, "dependencies": { "@aws-sdk/client-comprehend": "^3.159.0", diff --git a/backend/src/bin/scripts/generate-merge-suggestions-synchronous.ts b/backend/src/bin/scripts/generate-merge-suggestions-synchronous.ts new file mode 100644 index 0000000000..3b5d4e614f --- /dev/null +++ b/backend/src/bin/scripts/generate-merge-suggestions-synchronous.ts @@ -0,0 +1,102 @@ +import commandLineArgs from 'command-line-args' +import commandLineUsage from 'command-line-usage' +import { getOpensearchClient } from '@crowd/opensearch' +import { OrganizationMergeSuggestionType } from '@crowd/types' +import * as fs from 'fs' +import path from 'path' +import { IRepositoryOptions } from 
'@/database/repositories/IRepositoryOptions' +import getUserContext from '@/database/utils/getUserContext' +import SegmentService from '@/services/segmentService' +import { OPENSEARCH_CONFIG } from '@/conf' +import OrganizationService from '@/services/organizationService' +import TenantService from '@/services/tenantService' + +/* eslint-disable no-console */ + +const banner = fs.readFileSync(path.join(__dirname, 'banner.txt'), 'utf8') + +const options = [ + { + name: 'tenant', + alias: 't', + type: String, + description: + 'The unique ID of that tenant that you would like to generate merge suggestions for.', + }, + { + name: 'plan', + alias: 'p', + type: String, + description: + 'Comma separated plans - works with allTenants flag. Only generate suggestions for tenants with specific plans. Available plans: Growth, Scale, Enterprise', + }, + { + name: 'allTenants', + alias: 'a', + type: Boolean, + defaultValue: false, + description: 'Set this flag to generate merge suggestions for all tenants.', + }, + { + name: 'help', + alias: 'h', + type: Boolean, + description: 'Print this usage guide.', + }, +] +const sections = [ + { + content: banner, + raw: true, + }, + { + header: 'Generate merge suggestions for a tenant', + content: 'Generate merge suggestions for a tenant', + }, + { + header: 'Options', + optionList: options, + }, +] + +const usage = commandLineUsage(sections) +const parameters = commandLineArgs(options) + +if (parameters.help || (!parameters.tenant && !parameters.allTenants)) { + console.log(usage) +} else { + setImmediate(async () => { + let tenantIds + + if (parameters.allTenants) { + tenantIds = (await TenantService._findAndCountAllForEveryUser({})).rows + if (parameters.plan) { + tenantIds = tenantIds.filter((tenant) => parameters.plan.split(',').includes(tenant.plan)) + } + tenantIds = tenantIds.map((t) => t.id) + } else if (parameters.tenant) { + tenantIds = parameters.tenant.split(',') + } else { + tenantIds = [] + } + + for (const tenantId of 
tenantIds) { + const userContext: IRepositoryOptions = await getUserContext(tenantId) + const segmentService = new SegmentService(userContext) + const { rows: segments } = await segmentService.querySubprojects({}) + userContext.currentSegments = segments + userContext.opensearch = getOpensearchClient(OPENSEARCH_CONFIG) + + console.log(`Generating organization merge suggestions for tenant ${tenantId}!`) + + const organizationService = new OrganizationService(userContext) + await organizationService.generateMergeSuggestions( + OrganizationMergeSuggestionType.BY_IDENTITY, + ) + + console.log(`Done generating organization merge suggestions for tenant ${tenantId}!`) + } + + process.exit(0) + }) +} diff --git a/backend/src/bin/scripts/merge-similar-organizations.ts b/backend/src/bin/scripts/merge-similar-organizations.ts new file mode 100644 index 0000000000..5199f4c4ab --- /dev/null +++ b/backend/src/bin/scripts/merge-similar-organizations.ts @@ -0,0 +1,192 @@ +import commandLineArgs from 'command-line-args' +import commandLineUsage from 'command-line-usage' +import { QueryTypes } from 'sequelize' +import * as fs from 'fs' +import path from 'path' +import SequelizeRepository from '../../database/repositories/sequelizeRepository' +import TenantService from '@/services/tenantService' +import OrganizationService from '@/services/organizationService' +import getUserContext from '@/database/utils/getUserContext' +import { IRepositoryOptions } from '@/database/repositories/IRepositoryOptions' +import { + MergeActionState, + MergeActionType, + MergeActionsRepository, +} from '@/database/repositories/mergeActionsRepository' + +/* eslint-disable no-console */ + +const banner = fs.readFileSync(path.join(__dirname, 'banner.txt'), 'utf8') + +const options = [ + { + name: 'tenant', + alias: 't', + type: String, + description: 'The unique ID of tenant', + }, + { + name: 'allTenants', + alias: 'a', + type: Boolean, + defaultValue: false, + description: 'Set this flag to merge similar 
organizations for all tenants.', + }, + { + name: 'similarityThreshold', + alias: 's', + type: String, + defaultValue: false, + description: + 'Similarity threshold of organization merge suggestions. Suggestions lower than this value will not be merged. Defaults to 0.95', + }, + { + name: 'hardLimit', + alias: 'l', + type: String, + defaultValue: false, + description: `Hard limit for # of organizations that'll be merged. Mostly a flag for testing purposes.`, + }, + { + name: 'help', + alias: 'h', + type: Boolean, + description: 'Print this usage guide.', + }, +] +const sections = [ + { + content: banner, + raw: true, + }, + { + header: 'Merge organizations with similarity higher than given threshold.', + content: 'Merge organizations with similarity higher than given threshold.', + }, + { + header: 'Options', + optionList: options, + }, +] + +const usage = commandLineUsage(sections) +const parameters = commandLineArgs(options) + +if (parameters.help || (!parameters.tenant && !parameters.allTenants)) { + console.log(usage) +} else { + setImmediate(async () => { + const options = await SequelizeRepository.getDefaultIRepositoryOptions() + // CLI values arrive as strings; a missing or non-numeric hard limit disables the check + const hardLimit = Number(parameters.hardLimit) || 0 + + let tenantIds + + if (parameters.allTenants) { + tenantIds = (await TenantService._findAndCountAllForEveryUser({})).rows.map((t) => t.id) + } else if (parameters.tenant) { + tenantIds = parameters.tenant.split(',') + } else { + tenantIds = [] + } + + for (const tenantId of tenantIds) { + const userContext: IRepositoryOptions = await getUserContext(tenantId) + const orgService = new OrganizationService(userContext) + + let hasMoreData = true + let counter = 0 + // number of leading rows to skip; only advanced past pages in which no merge succeeded + let offset = 0 + + while (hasMoreData) { + // find organization merge suggestions of tenant + const result = await 
options.database.sequelize.query( + ` + SELECT + "ot"."organizationId", + "ot"."toMergeId", + "ot".similarity, + "ot".status, + "org1"."displayName" AS "orgDisplayName", + "org2"."displayName" AS "mergeDisplayName" + FROM + "organizationToMerge" "ot" + LEFT JOIN + "organizations" 
"org1" + ON + "ot"."organizationId" = "org1"."id" + LEFT JOIN + "organizations" "org2" + ON + "ot"."toMergeId" = "org2"."id" + WHERE + ("ot".similarity > :similarityThreshold) AND + ("org1"."displayName" ilike "org2"."displayName") AND + ("org1"."tenantId" = :tenantId) AND + ("org2"."tenantId" = :tenantId) + ORDER BY + "ot".similarity DESC + LIMIT 100 + OFFSET :offset;`, + { + replacements: { + similarityThreshold: parameters.similarityThreshold || 0.95, + offset, + tenantId, + }, + type: QueryTypes.SELECT, + }, + ) + + if (result.length === 0) { + hasMoreData = false + } else { + let mergedInPage = 0 + + for (const row of result) { + try { + console.log( + `Merging [${row.organizationId}] "${row.orgDisplayName}" into ${row.toMergeId} "${row.mergeDisplayName}"...`, + ) + await MergeActionsRepository.add( + MergeActionType.ORG, + row.organizationId, + row.toMergeId, + userContext, + ) + await orgService.mergeSync(row.organizationId, row.toMergeId) + mergedInPage += 1 + } catch (err) { + console.log('Error merging organizations - continuing with the rest', err) + await MergeActionsRepository.setState( + MergeActionType.ORG, + row.organizationId, + row.toMergeId, + MergeActionState.ERROR, + userContext, + ) + } + + counter += 1 + + // counting the attempt first makes the limit exact instead of one merge too many + if (hardLimit > 0 && counter >= hardLimit) { + console.log(`Hard limit of ${parameters.hardLimit} reached. 
Exiting...`) + process.exit(0) + } + } + + // NOTE(review): assumes successfully merged pairs no longer match the query above, so the + // same offset is queried again. A page in which no merge succeeded would otherwise be + // returned forever - step past it. + if (mergedInPage === 0) { + offset += result.length + } + } + } + } + + process.exit(0) + }) +} diff --git a/backend/src/database/repositories/organizationRepository.ts b/backend/src/database/repositories/organizationRepository.ts index 089d983b40..7c5a000a7d 100644 --- a/backend/src/database/repositories/organizationRepository.ts +++ b/backend/src/database/repositories/organizationRepository.ts @@ -40,6 +40,7 @@ interface IOrganizationPartialAggregatesOpensearch { string_name: string }[] uuid_arr_noMergeIds: string[] + keyword_displayName: string } } @@ -49,6 +50,7 @@ interface ISimilarOrganization { uuid_organizationId: string nested_identities: IOrganizationIdentityOpensearch[] nested_weakIdentities: IOrganizationIdentityOpensearch[] + keyword_displayName: string } } @@ -1189,6 +1191,7 @@ class OrganizationRepository { for (const primaryIdentity of primaryOrganization._source.nested_identities) { // similar organization has a weakIdentity as one of primary organization's strong identity, return score 95 if ( + similarOrganization._source.nested_weakIdentities && similarOrganization._source.nested_weakIdentities.length > 0 && similarOrganization._source.nested_weakIdentities.some( (weakIdentity) => @@ -1198,6 +1201,15 @@ class OrganizationRepository { ) { return 0.95 } + + // check displayName match + if ( + similarOrganization._source.keyword_displayName === + primaryOrganization._source.keyword_displayName + ) { + return 0.98 + } + for (const secondaryIdentity of similarOrganization._source.nested_identities) { const currentLevenstheinDistance = getLevenshteinDistance( primaryIdentity.string_name, @@ -1233,7 +1245,12 @@ class OrganizationRepository { collapse: { field: 'uuid_organizationId', }, - _source: 
['uuid_organizationId', 'nested_identities', 'uuid_arr_noMergeIds'], + _source: [ + 'uuid_organizationId', + 'nested_identities', + 'uuid_arr_noMergeIds', + 'keyword_displayName', + ], } let organizations: IOrganizationPartialAggregatesOpensearch[] = [] @@ 
-1244,25 +1261,6 @@ class OrganizationRepository { queryBody.query = { bool: { filter: [ - { - bool: { - should: [ - { - range: { - int_activityCount: { - gt: 0, - }, - }, - }, - { - term: { - bool_manuallyCreated: true, - }, - }, - ], - minimum_should_match: 1, - }, - }, { term: { uuid_tenantId: tenant.id, @@ -1282,25 +1280,6 @@ class OrganizationRepository { queryBody.query = { bool: { filter: [ - { - bool: { - should: [ - { - range: { - int_activityCount: { - gt: 0, - }, - }, - }, - { - term: { - bool_manuallyCreated: true, - }, - }, - ], - minimum_should_match: 1, - }, - }, { term: { uuid_tenantId: tenant.id, @@ -1330,6 +1309,11 @@ class OrganizationRepository { ) { const identitiesPartialQuery = { should: [ + { + term: { + [`keyword_displayName`]: organization._source.keyword_displayName, + }, + }, { nested: { path: 'nested_weakIdentities', @@ -1369,25 +1353,6 @@ class OrganizationRepository { uuid_tenantId: tenant.id, }, }, - { - bool: { - should: [ - { - range: { - int_activityCount: { - gt: 0, - }, - }, - }, - { - term: { - bool_manuallyCreated: true, - }, - }, - ], - minimum_should_match: 1, - }, - }, ], } @@ -1396,7 +1361,7 @@ class OrganizationRepository { for (const identity of organization._source.nested_identities) { if (identity.string_name.length > 0) { // weak identity search - identitiesPartialQuery.should[0].nested.query.bool.should.push({ + identitiesPartialQuery.should[1].nested.query.bool.should.push({ bool: { must: [ { match: { [`nested_weakIdentities.keyword_name`]: identity.string_name } }, @@ -1417,7 +1382,7 @@ class OrganizationRepository { if (Number.isNaN(Number(identity.string_name))) { hasFuzzySearch = true // fuzzy search for identities - identitiesPartialQuery.should[1].nested.query.bool.should.push({ + identitiesPartialQuery.should[2].nested.query.bool.should.push({ match: { [`nested_identities.keyword_name`]: { query: cleanedIdentityName, @@ -1429,7 +1394,7 @@ class OrganizationRepository { // also check for prefix for identities 
that has more than 5 characters and no whitespace if (identity.string_name.length > 5 && identity.string_name.indexOf(' ') === -1) { - identitiesPartialQuery.should[1].nested.query.bool.should.push({ + identitiesPartialQuery.should[2].nested.query.bool.should.push({ prefix: { [`nested_identities.keyword_name`]: { value: cleanedIdentityName.slice(0, prefixLength(cleanedIdentityName)), @@ -1468,7 +1433,12 @@ class OrganizationRepository { collapse: { field: 'uuid_organizationId', }, - _source: ['uuid_organizationId', 'nested_identities', 'nested_weakIdentities'], + _source: [ + 'uuid_organizationId', + 'nested_identities', + 'nested_weakIdentities', + 'keyword_displayName', + ], } const organizationsToMerge: ISimilarOrganization[] = @@ -1624,6 +1594,97 @@ class OrganizationRepository { return segments } + /** + * Finds the organization of the current tenant that owns at least one of the given + * platform/name identities. When several match, the one with the most identities is + * returned; resolves to null when nothing matches or no identities are given. + */ + static async findByIdentities( + identities: IOrganizationIdentity[], + options: IRepositoryOptions, + ): Promise<IOrganization | null> { + // without identities the WHERE clause below would be empty and the query invalid + if (!identities || identities.length === 0) { + return null + } + + const transaction = SequelizeRepository.getTransaction(options) + const sequelize = SequelizeRepository.getSequelize(options) + const currentTenant = SequelizeRepository.getCurrentTenant(options) + + const identityConditions = identities + .map( + (identity, index) => ` + (oi.platform = :platform${index} and oi.name = :name${index}) + `, + ) + .join(' or ') + + const results = await sequelize.query( + ` + with + "organizationsWithIdentity" as ( + select oi."organizationId" + from "organizationIdentities" oi + where ${identityConditions} + ), + "organizationsWithCounts" as ( + select o.id, count(oi."organizationId") as total_counts + 
from organizations o + join "organizationIdentities" oi on o.id = oi."organizationId" + where o.id in (select "organizationId" from "organizationsWithIdentity") + group by o.id + ) + select o.id, + o.description, + o.emails, + o.logo, + o.tags, + o.github, + o.twitter, + o.linkedin, + o.crunchbase, + o.employees, + o.location, + o.website, + o.type, + o.size, + o.headline, + o.industry, + o.founded, + o.attributes + from 
organizations o + inner join "organizationsWithCounts" oc on o.id = oc.id + where o."tenantId" = :tenantId + order by oc.total_counts desc + limit 1; + `, + { + replacements: { + tenantId: currentTenant.id, + ...identities.reduce( + (acc, identity, index) => ({ + ...acc, + [`platform${index}`]: identity.platform, + [`name${index}`]: identity.name, + }), + {}, + ), + }, + type: QueryTypes.SELECT, + transaction, + }, + ) + + if (results.length === 0) { + return null + } + + const result = results[0] as IOrganization + + return result + } + static async findByIdentity( identity: IOrganizationIdentity, options: IRepositoryOptions, diff --git a/backend/src/services/organizationService.ts b/backend/src/services/organizationService.ts index 01871d70d1..a1f750a54a 100644 --- a/backend/src/services/organizationService.ts +++ b/backend/src/services/organizationService.ts @@ -498,7 +498,7 @@ export default class OrganizationService extends LoggerBase { } if (!existing) { - existing = await OrganizationRepository.findByIdentity(primaryIdentity, this.options) + existing = await OrganizationRepository.findByIdentities(data.identities, this.options) } if (existing) { diff --git a/backend/src/services/premium/enrichment/memberEnrichmentService.ts b/backend/src/services/premium/enrichment/memberEnrichmentService.ts index ae47c87587..ba42cd974f 100644 --- a/backend/src/services/premium/enrichment/memberEnrichmentService.ts +++ b/backend/src/services/premium/enrichment/memberEnrichmentService.ts @@ -13,6 +13,7 @@ import { PlatformType, OrganizationSource, SyncMode, + IOrganizationIdentity, } from '@crowd/types' import { ENRICHMENT_CONFIG, REDIS_CONFIG } from '../../../conf' import { AttributeData } from '../../../database/attributes/attribute' @@ -352,14 +353,25 @@ export default class MemberEnrichmentService extends LoggerBase { const organizationService = new OrganizationService(options) if (enrichmentData.work_experiences) { for (const workExperience of 
enrichmentData.work_experiences) { + const organizationIdentities: IOrganizationIdentity[] = [ + { + name: workExperience.company, + platform: PlatformType.ENRICHMENT, + }, + ] + + if (workExperience.companyLinkedInUrl) { + organizationIdentities.push({ + // strip trailing slashes first: ".../company/acme/".split('/').pop() would yield '' + name: workExperience.companyLinkedInUrl.replace(/\/+$/, '').split('/').pop(), + platform: PlatformType.LINKEDIN, + url: workExperience.companyLinkedInUrl, + }) + } + const org = await organizationService.createOrUpdate( { - identities: [ - { - name: workExperience.company, - platform: PlatformType.ENRICHMENT, - }, - ], + identities: organizationIdentities, }, { doSync: true, diff --git a/frontend/src/modules/organization/config/saved-views/main.ts b/frontend/src/modules/organization/config/saved-views/main.ts index b3b23e6657..78d0c8b528 100644 --- a/frontend/src/modules/organization/config/saved-views/main.ts +++ b/frontend/src/modules/organization/config/saved-views/main.ts @@ -2,11 +2,13 @@ import { SavedViewsConfig } from '@/shared/modules/saved-views/types/SavedViewsC import allOrganizations from './views/all-organizations'; import teamOrganization from './settings/teamOrganization/config'; +import hasActivities from './settings/hasActivities/config'; export const organizationSavedViews: SavedViewsConfig = { defaultView: allOrganizations, settings: { teamOrganization, + hasActivities, }, sorting: { displayName: 'Organization', diff --git a/frontend/src/modules/organization/config/saved-views/settings/hasActivities/config.ts b/frontend/src/modules/organization/config/saved-views/settings/hasActivities/config.ts new file mode 100644 index 0000000000..847f5f6a3c --- /dev/null +++ b/frontend/src/modules/organization/config/saved-views/settings/hasActivities/config.ts @@ -0,0 +1,16 @@ +import { SavedViewsSetting } from '@/shared/modules/saved-views/types/SavedViewsConfig'; + +const hasActivities: SavedViewsSetting = { + inSettings: 
false, + defaultValue: true, + queryUrlParser(value: string): boolean { + return value === 'true'; + }, 
+ apiFilterRenderer(): any[] { + return [ + { activityCount: { gt: 0 } }, + ]; + }, +}; + +export default hasActivities; diff --git a/frontend/src/modules/organization/config/saved-views/views/all-organizations.ts b/frontend/src/modules/organization/config/saved-views/views/all-organizations.ts index b9b87a403e..2ebc9ff5c3 100644 --- a/frontend/src/modules/organization/config/saved-views/views/all-organizations.ts +++ b/frontend/src/modules/organization/config/saved-views/views/all-organizations.ts @@ -14,6 +14,7 @@ const allOrganizations: SavedView = { }, settings: { teamOrganization: 'exclude', + hasActivities: 'true', }, }, }; diff --git a/services/libs/opensearch/src/repo/organization.repo.ts b/services/libs/opensearch/src/repo/organization.repo.ts index 0130eeccf8..29095f2197 100644 --- a/services/libs/opensearch/src/repo/organization.repo.ts +++ b/services/libs/opensearch/src/repo/organization.repo.ts @@ -250,7 +250,7 @@ export class OrganizationRepository extends RepositoryBase { const results = await this.db().any( ` - select distinct a."segmentId", a."organizationId" - from activities a - where a."organizationId" in ($(ids:csv)); + select distinct mo."organizationId", a."segmentId" + from "memberOrganizations" mo + inner join activities a on mo."memberId" = a."memberId" + where mo."organizationId" in ($(ids:csv)); `, { ids,