Skip to content

Commit

Permalink
Serve organization and contact profiles from OpenSearch (#2004)
Browse files Browse the repository at this point in the history
Co-authored-by: Joana Maia <joana@crowd.dev>
  • Loading branch information
skwowet and joanagmaia authored Jan 3, 2024
1 parent 25775d1 commit c51d11b
Show file tree
Hide file tree
Showing 23 changed files with 569 additions and 56 deletions.
9 changes: 7 additions & 2 deletions backend/src/api/member/memberFind.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { isFeatureEnabled } from '@crowd/feature-flags'
import { FeatureFlag } from '@crowd/types'
import Permissions from '../../security/permissions'
import MemberService from '../../services/memberService'
import PermissionChecker from '../../services/user/permissionChecker'
import isFeatureEnabled from '../../feature-flags/isFeatureEnabled'

/**
* GET /tenant/{tenantId}/member/{id}
Expand Down Expand Up @@ -34,7 +34,12 @@ export default async (req, res) => {
}
}

const payload = await new MemberService(req).findById(req.params.id, true, true, segmentId)
let payload
if (await isFeatureEnabled(FeatureFlag.SERVE_PROFILES_OPENSEARCH, req)) {
payload = await new MemberService(req).findByIdOpensearch(req.params.id, segmentId)
} else {
payload = await new MemberService(req).findById(req.params.id, true, true, segmentId)
}

await req.responseHandler.success(req, res, payload)
}
9 changes: 7 additions & 2 deletions backend/src/api/organization/organizationFind.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { FeatureFlag } from '@crowd/types'
import isFeatureEnabled from '@/feature-flags/isFeatureEnabled'
import Permissions from '../../security/permissions'
import OrganizationService from '../../services/organizationService'
import PermissionChecker from '../../services/user/permissionChecker'
import isFeatureEnabled from '../../feature-flags/isFeatureEnabled'

/**
* GET /tenant/{tenantId}/organization/{id}
Expand Down Expand Up @@ -34,7 +34,12 @@ export default async (req, res) => {
}
}

const payload = await new OrganizationService(req).findById(req.params.id, segmentId)
let payload
if (await isFeatureEnabled(FeatureFlag.SERVE_PROFILES_OPENSEARCH, req)) {
payload = await new OrganizationService(req).findByIdOpensearch(req.params.id, segmentId)
} else {
payload = await new OrganizationService(req).findById(req.params.id, segmentId)
}

await req.responseHandler.success(req, res, payload)
}
7 changes: 5 additions & 2 deletions backend/src/api/premium/enrichment/memberEnrich.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { RedisCache } from '@crowd/redis'
import { getServiceLogger } from '@crowd/logging'
import { FeatureFlagRedisKey } from '@crowd/types'
import { FeatureFlagRedisKey, SyncMode } from '@crowd/types'
import { getSecondsTillEndOfMonth } from '../../../utils/timing'
import Permissions from '../../../security/permissions'
import identifyTenant from '../../../segment/identifyTenant'
Expand Down Expand Up @@ -29,7 +29,10 @@ const log = getServiceLogger()
export default async (req, res) => {
new PermissionChecker(req).validateHas(Permissions.values.memberEdit)

const payload = await new MemberEnrichmentService(req).enrichOne(req.params.id)
const payload = await new MemberEnrichmentService(req).enrichOne(
req.params.id,
SyncMode.SYNCHRONOUS,
)

track('Single member enrichment', { memberId: req.params.id }, { ...req })

Expand Down
18 changes: 18 additions & 0 deletions backend/src/bin/scripts/unleash-init.ts
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,24 @@ const constaintConfiguration = {
},
],
],

[FeatureFlag.SERVE_PROFILES_OPENSEARCH]: [
[
{
values: [
Plans.values.scale,
Plans.values.eagleEye,
Plans.values.enterprise,
Plans.values.essential,
Plans.values.growth,
],
inverted: false,
operator: 'IN',
contextName: 'plan',
caseInsensitive: false,
},
],
],
}

let seq: any
Expand Down
72 changes: 72 additions & 0 deletions backend/src/database/repositories/memberRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import {
import OrganizationRepository from './organizationRepository'
import MemberSyncRemoteRepository from './memberSyncRemoteRepository'
import MemberAffiliationRepository from './memberAffiliationRepository'
import MemberAttributeSettingsRepository from './memberAttributeSettingsRepository'

const { Op } = Sequelize

Expand Down Expand Up @@ -1160,6 +1161,77 @@ class MemberRepository {
})
}

static async findByIdOpensearch(id, options: IRepositoryOptions, segmentId?: string) {
const segments = segmentId ? [segmentId] : SequelizeRepository.getSegmentIds(options)

const memberAttributeSettings = (
await MemberAttributeSettingsRepository.findAndCountAll({}, options)
).rows

const response = await this.findAndCountAllOpensearch(
{
filter: {
and: [
{
id: {
eq: id,
},
},
],
},
limit: 1,
offset: 0,
attributesSettings: memberAttributeSettings,
segments,
},
options,
)

if (response.count === 0) {
throw new Error404()
}

const result = response.rows[0]

// Get special attributes from memberAttributeSettings
const specialAttributes = memberAttributeSettings
.filter((setting) => setting.type === 'special')
.map((setting) => setting.name)

// Parse special attributes that are indexed as strings
if (result.attributes) {
specialAttributes.forEach((attr) => {
if (result.attributes[attr]) {
result.attributes[attr] = JSON.parse(result.attributes[attr])
}
})
}

// Sort the organizations based on dateStart
if (result.organizations) {
result.organizations.sort((a, b) => {
const dateStartA = a.memberOrganizations.dateStart
const dateStartB = b.memberOrganizations.dateStart

if (!dateStartA && !dateStartB) {
return 0
}

if (!dateStartA) {
return 1
}

if (!dateStartB) {
return -1
}

return new Date(dateStartB).getTime() - new Date(dateStartA).getTime()
})
}

return result
}

static async findAndCountActiveOpensearch(
filter: IActiveMemberFilter,
limit: number,
Expand Down
43 changes: 42 additions & 1 deletion backend/src/database/repositories/organizationRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2132,6 +2132,46 @@ class OrganizationRepository {
return results
}

static async findByIdOpensearch(
id: string,
options: IRepositoryOptions,
segmentId?: string,
): Promise<PageData<any>> {
const segments = segmentId ? [segmentId] : SequelizeRepository.getSegmentIds(options)

const response = await this.findAndCountAllOpensearch(
{
filter: {
and: [
{
id: {
eq: id,
},
},
],
},
isProfileQuery: true,
limit: 1,
offset: 0,
segments,
},
options,
)

if (response.count === 0) {
throw new Error404()
}

const result = response.rows[0]

// Parse attributes that are indexed as strings
if (result.attributes) {
result.attributes = JSON.parse(result.attributes)
}

return result
}

static async findAndCountAllOpensearch(
{
filter = {} as any,
Expand All @@ -2141,6 +2181,7 @@ class OrganizationRepository {
countOnly = false,
segments = [] as string[],
customSortFunction = undefined,
isProfileQuery = false,
},
options: IRepositoryOptions,
): Promise<PageData<any>> {
Expand All @@ -2156,7 +2197,7 @@ class OrganizationRepository {

const translator = FieldTranslatorFactory.getTranslator(OpenSearchIndex.ORGANIZATIONS)

if (filter.and) {
if (!isProfileQuery && filter.and) {
filter.and.push({
or: [
{
Expand Down
4 changes: 4 additions & 0 deletions backend/src/services/memberService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1131,6 +1131,10 @@ export default class MemberService extends LoggerBase {
)
}

async findByIdOpensearch(id: string, segmentId?: string) {
return MemberRepository.findByIdOpensearch(id, this.options, segmentId)
}

async queryV2(data) {
if (await isFeatureEnabled(FeatureFlag.SEGMENTS, this.options)) {
if (data.segments.length !== 1) {
Expand Down
4 changes: 4 additions & 0 deletions backend/src/services/organizationService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -816,6 +816,10 @@ export default class OrganizationService extends LoggerBase {
return OrganizationRepository.findOrCreateByDomain(domain, this.options)
}

async findByIdOpensearch(id: string, segmentId?: string) {
return OrganizationRepository.findByIdOpensearch(id, this.options, segmentId)
}

async query(data) {
const advancedFilter = data.filter
const orderBy = data.orderBy
Expand Down
32 changes: 12 additions & 20 deletions backend/src/services/premium/enrichment/memberEnrichmentService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,6 @@ export default class MemberEnrichmentService extends LoggerBase {

async bulkEnrich(memberIds: string[], notifyFrontend: boolean = true) {
const redis = await getRedisClient(REDIS_CONFIG, true)
const searchSyncService = new SearchSyncService(this.options, SyncMode.ASYNCHRONOUS)

const apiPubSubEmitter = new RedisPubSubEmitter(
'api-pubsub',
Expand All @@ -160,7 +159,6 @@ export default class MemberEnrichmentService extends LoggerBase {
try {
await this.enrichOne(memberId)
enrichedMembers++
await searchSyncService.triggerMemberSync(this.options.currentTenant.id, memberId)
this.log.info(`Enriched member ${memberId}`)
} catch (err) {
if (
Expand Down Expand Up @@ -224,23 +222,19 @@ export default class MemberEnrichmentService extends LoggerBase {
* @param memberId - the ID of the member to enrich
* @returns a promise that resolves to the enrichment data for the member
*/
async enrichOne(memberId) {
async enrichOne(memberId, syncMode = SyncMode.ASYNCHRONOUS) {
const transaction = await SequelizeRepository.createTransaction(this.options)
const options = {
...this.options,
transaction,
}

try {
// If the attributes have not been fetched yet, fetch them
if (!this.attributes) {
await this.getAttributes()
}

const searchSyncService = new SearchSyncService(this.options, SyncMode.ASYNCHRONOUS)
const searchSyncService = new SearchSyncService(this.options, syncMode)

// Create an instance of the MemberService and use it to look up the member
const memberService = new MemberService(options)
const memberService = new MemberService(this.options)
const member = await memberService.findById(memberId, false, false)

// If the member's GitHub handle or email address is not available, throw an error
Expand Down Expand Up @@ -290,7 +284,7 @@ export default class MemberEnrichmentService extends LoggerBase {
// add the member to merge suggestions
await MemberRepository.addToMerge(
[{ similarity: 0.9, members: [memberId, existingMember.id] }],
options,
this.options,
)

if (Array.isArray(normalized.username[platform])) {
Expand All @@ -311,8 +305,7 @@ export default class MemberEnrichmentService extends LoggerBase {
}

// save raw data to cache
await MemberEnrichmentCacheRepository.upsert(memberId, enrichmentData, options)

await MemberEnrichmentCacheRepository.upsert(memberId, enrichmentData, this.options)
// We are updating the displayName only if the existing one has one word only
// And we are using an update here instead of the upsert because
// upsert always takes the existing displayName
Expand All @@ -334,7 +327,7 @@ export default class MemberEnrichmentService extends LoggerBase {
memberId: member.id,
enrichedFrom,
},
options,
this.options,
)

let result = await memberService.upsert(
Expand All @@ -350,7 +343,7 @@ export default class MemberEnrichmentService extends LoggerBase {
// for every work experience in `enrichmentData`
// - upsert organization
// - upsert `memberOrganization` relation
const organizationService = new OrganizationService(options)
const organizationService = new OrganizationService(this.options)
if (enrichmentData.work_experiences) {
for (const workExperience of enrichmentData.work_experiences) {
const organizationIdentities: IOrganizationIdentity[] = [
Expand All @@ -374,7 +367,7 @@ export default class MemberEnrichmentService extends LoggerBase {
},
{
doSync: true,
mode: SyncMode.ASYNCHRONOUS,
mode: syncMode,
},
)

Expand All @@ -390,15 +383,15 @@ export default class MemberEnrichmentService extends LoggerBase {
dateEnd,
source: OrganizationSource.ENRICHMENT,
}
await MemberRepository.createOrUpdateWorkExperience(data, options)
await OrganizationRepository.includeOrganizationToSegments(org.id, options)
await MemberRepository.createOrUpdateWorkExperience(data, this.options)
await OrganizationRepository.includeOrganizationToSegments(org.id, this.options)
}
}

await SequelizeRepository.commitTransaction(transaction)
await searchSyncService.triggerMemberSync(this.options.currentTenant.id, result.id)

result = await memberService.findById(result.id, true, false)
await SequelizeRepository.commitTransaction(transaction)
result = await MemberRepository.findByIdOpensearch(result.id, this.options)
return result
} catch (error) {
this.log.error(error, 'Error while enriching a member!')
Expand Down Expand Up @@ -523,7 +516,6 @@ export default class MemberEnrichmentService extends LoggerBase {

// Assign 'value' to 'member.attributes[attributeName].enrichment'
member.attributes[attributeName].enrichment = value

await this.createAttributeAndUpdateOptions(attributeName, attribute, value)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,10 @@ export const customAttributesService = () => {
hideIncludeSwitch: true,
options: [
{
options: attribute.options.map((option) => ({
options: attribute.options?.map((option) => ({
value: option,
label: option,
})),
})) || [],
},
],
},
Expand Down
Loading

0 comments on commit c51d11b

Please sign in to comment.