diff --git a/backend/src/database/migrations/V1782128525__segment-merge-suggestion-counts.sql b/backend/src/database/migrations/V1782128525__segment-merge-suggestion-counts.sql new file mode 100644 index 0000000000..98ff0f2b80 --- /dev/null +++ b/backend/src/database/migrations/V1782128525__segment-merge-suggestion-counts.sql @@ -0,0 +1,6 @@ +create table "segmentMergeSuggestionCounts" ( + "segmentId" uuid primary key references "segments" ("id") on delete cascade, + "memberMergeSuggestionsCount" integer not null default 0, + "organizationMergeSuggestionsCount" integer not null default 0, + "updatedAt" timestamp with time zone default now() not null +); \ No newline at end of file diff --git a/backend/src/database/repositories/memberRepository.ts b/backend/src/database/repositories/memberRepository.ts index 5a6f668d81..38ae2e67dd 100644 --- a/backend/src/database/repositories/memberRepository.ts +++ b/backend/src/database/repositories/memberRepository.ts @@ -51,7 +51,11 @@ import { } from '@crowd/data-access-layer/src/members/segments' import { IDbMemberData } from '@crowd/data-access-layer/src/members/types' import { optionsQx } from '@crowd/data-access-layer/src/queryExecutor' -import { fetchManySegments, getSegmentSubprojectIds } from '@crowd/data-access-layer/src/segments' +import { + fetchManySegments, + getSegmentMergeSuggestionCounts, + getSegmentSubprojectIds, +} from '@crowd/data-access-layer/src/segments' import { ActivityDisplayService } from '@crowd/integrations' import { ALL_PLATFORM_TYPES, @@ -63,6 +67,8 @@ import { MemberIdentityType, MemberSegmentAffiliation, MemberSegmentAffiliationJoined, + MergeActionState, + MergeActionType, PlatformType, SegmentType, TemporalWorkflowId, @@ -277,13 +283,27 @@ class MemberRepository { AND EXISTS ( SELECT 1 FROM "memberSegmentsAgg" ms2 WHERE ms2."memberId" = mtm."toMergeId" AND ms2."segmentId" IN (:segmentIds) + ) + AND NOT EXISTS ( + SELECT 1 + FROM "mergeActions" ma + WHERE ma.type = :mergeActionType + AND ma.state <> :mergeActionState + AND ( + (ma."primaryId" = mtm."memberId" AND ma."secondaryId" = mtm."toMergeId") + OR (ma."primaryId" = mtm."toMergeId" AND ma."secondaryId" = mtm."memberId") + ) ) ${memberFilter} ${similarityFilter} ${displayNameFilter} `, { - replacements, + replacements: { + ...replacements, + mergeActionType: MergeActionType.MEMBER, + mergeActionState: MergeActionState.ERROR, + }, type: QueryTypes.SELECT, }, ) @@ -299,18 +319,16 @@ class MemberRepository { const MEDIUM_CONFIDENCE_LOWER_BOUND = 0.7 // Member segments are aggregated at each hierarchy level (group -> project -> subproject). - // Match the selected segment ID directly; do not expand to leaf subprojects. - const segmentIds = SequelizeRepository.getSegmentIds(options) - - if (segmentIds.length === 0) { - return args.countOnly - ? { count: '0' } - : { - rows: [{ members: [], similarity: 0 }], - count: 0, - limit: args.limit, - offset: args.offset, - } + const projectGroupSegment = SequelizeRepository.getStrictlySingleProjectGroupSegment(options) + + let segmentIds: string[] + + if (args.filter?.projectIds?.length) { + segmentIds = args.filter.projectIds + } else if (args.filter?.subprojectIds?.length) { + segmentIds = args.filter.subprojectIds + } else { + segmentIds = [projectGroupSegment.id] } let similarityFilter = '' @@ -357,8 +375,25 @@ class MemberRepository { order += 'mtm."memberId", mtm."toMergeId"' } - if (args.countOnly) { - const totalCount = await this.countMemberMergeSuggestions( + const hasProjectFilter = Boolean( + args.filter?.projectIds?.length || args.filter?.subprojectIds?.length, + ) + + const hasCountFilters = Boolean( + args.filter?.memberId || args.filter?.displayName || args.filter?.similarity?.length, + ) + + const getTotalCount = async (): Promise => { + if (!hasCountFilters && !hasProjectFilter) { + const counts = await getSegmentMergeSuggestionCounts( + SequelizeRepository.getQueryExecutor(options), + projectGroupSegment.id, + ) + + return counts?.memberMergeSuggestionsCount ?? 0 + } + + return this.countMemberMergeSuggestions( memberFilter, similarityFilter, displayNameFilter, @@ -369,8 +404,10 @@ class MemberRepository { }, options, ) + } - return { count: totalCount } + if (args.countOnly) { + return { count: await getTotalCount() } } const mems = await options.database.sequelize.query( @@ -395,7 +432,16 @@ class MemberRepository { SELECT 1 FROM "memberSegmentsAgg" ms2 WHERE ms2."memberId" = mtm."toMergeId" AND ms2."segmentId" IN (:segmentIds) ) - AND mtm.similarity IS NOT NULL + AND NOT EXISTS ( + SELECT 1 + FROM "mergeActions" ma + WHERE ma.type = :mergeActionType + AND ma.state <> :mergeActionState + AND ( + (ma."primaryId" = mtm."memberId" AND ma."secondaryId" = mtm."toMergeId") + OR (ma."primaryId" = mtm."toMergeId" AND ma."secondaryId" = mtm."memberId") + ) + ) ${memberFilter} ${similarityFilter} ${displayNameFilter} @@ -410,6 +456,8 @@ class MemberRepository { offset: args.offset, displayName: args?.filter?.displayName ? `${args.filter.displayName}%` : undefined, memberId: args?.filter?.memberId, + mergeActionType: MergeActionType.MEMBER, + mergeActionState: MergeActionState.ERROR, }, type: QueryTypes.SELECT, }, @@ -506,24 +554,12 @@ class MemberRepository { })) } - const totalCount = await this.countMemberMergeSuggestions( - memberFilter, - similarityFilter, - displayNameFilter, - { - segmentIds, - memberId: args?.filter?.memberId, - displayName: args?.filter?.displayName ? `${args.filter.displayName}%` : undefined, - }, - options, - ) - - return { rows: result, count: totalCount, limit: args.limit, offset: args.offset } + return { rows: result, count: await getTotalCount(), limit: args.limit, offset: args.offset } } return { rows: [{ members: [], similarity: 0 }], - count: 0, + count: await getTotalCount(), limit: args.limit, offset: args.offset, } diff --git a/backend/src/database/repositories/organizationRepository.ts b/backend/src/database/repositories/organizationRepository.ts index e64d83eb8d..916701ee22 100644 --- a/backend/src/database/repositories/organizationRepository.ts +++ b/backend/src/database/repositories/organizationRepository.ts @@ -34,7 +34,11 @@ import { } from '@crowd/data-access-layer/src/organizations' import { findAttribute } from '@crowd/data-access-layer/src/organizations/attributesConfig' import { optionsQx } from '@crowd/data-access-layer/src/queryExecutor' -import { findSegmentById, getSegmentSubprojectIds } from '@crowd/data-access-layer/src/segments' +import { + findSegmentById, + getSegmentMergeSuggestionCounts, + getSegmentSubprojectIds, +} from '@crowd/data-access-layer/src/segments' import { IMemberRenderFriendlyRole, IMemberRoleWithOrganization, @@ -796,8 +800,6 @@ class OrganizationRepository { segmentIds: string[] organizationId?: string displayName?: string - mergeActionType: MergeActionType - mergeActionStatus: MergeActionState }, options: IRepositoryOptions, ): Promise { @@ -808,18 +810,9 @@ class OrganizationRepository { const result = await options.database.sequelize.query( ` - SELECT COUNT(DISTINCT Greatest( - Hashtext(Concat(otm."organizationId", otm."toMergeId")), - Hashtext(Concat(otm."toMergeId", otm."organizationId")) - )) AS total_count + SELECT COUNT(*) AS total_count FROM "organizationToMerge" otm ${organizationsJoin} - LEFT JOIN "mergeActions" ma - ON ma.type = :mergeActionType - AND ( - (ma."primaryId" = otm."organizationId" AND ma."secondaryId" = otm."toMergeId") - OR (ma."primaryId" = otm."toMergeId" AND ma."secondaryId" = otm."organizationId") - ) WHERE EXISTS ( SELECT 1 FROM "organizationSegmentsAgg" os1 WHERE os1."organizationId" = otm."organizationId" AND os1."segmentId" IN (:segmentIds) @@ -828,13 +821,26 @@ class OrganizationRepository { SELECT 1 FROM "organizationSegmentsAgg" os2 WHERE os2."organizationId" = otm."toMergeId" AND os2."segmentId" IN (:segmentIds) ) - AND (ma.id IS NULL OR ma.state = :mergeActionStatus) + AND NOT EXISTS ( + SELECT 1 + FROM "mergeActions" ma + WHERE ma.type = :mergeActionType + AND ma.state <> :mergeActionState + AND ( + (ma."primaryId" = otm."organizationId" AND ma."secondaryId" = otm."toMergeId") + OR (ma."primaryId" = otm."toMergeId" AND ma."secondaryId" = otm."organizationId") + ) + ) ${organizationFilter} ${similarityFilter} ${displayNameFilter} `, { - replacements, + replacements: { + ...replacements, + mergeActionType: MergeActionType.ORG, + mergeActionState: MergeActionState.ERROR, + }, type: QueryTypes.SELECT, }, ) @@ -850,8 +856,17 @@ class OrganizationRepository { const MEDIUM_CONFIDENCE_LOWER_BOUND = 0.7 // Organization segments are aggregated at each hierarchy level (group -> project -> subproject). - // Match the selected segment ID(s) directly; do not expand to leaf subprojects. - const segmentIds = SequelizeRepository.getSegmentIds(options) + const projectGroupSegment = SequelizeRepository.getStrictlySingleProjectGroupSegment(options) + + let segmentIds: string[] + + if (args.filter?.projectIds?.length) { + segmentIds = args.filter.projectIds + } else if (args.filter?.subprojectIds?.length) { + segmentIds = args.filter.subprojectIds + } else { + segmentIds = [projectGroupSegment.id] + } let similarityFilter = '' const similarityConditions = [] @@ -880,23 +895,39 @@ class OrganizationRepository { ? ` and (o1."displayName" ilike :displayName OR o2."displayName" ilike :displayName)` : '' - let order = - '"organizationsToMerge".similarity desc, "organizationsToMerge"."id", "organizationsToMerge"."toMergeId"' + let order = 'otm.similarity desc, otm."organizationId", otm."toMergeId"' if (args.orderBy?.length > 0) { order = '' for (const orderBy of args.orderBy) { const [field, direction] = orderBy.split('_') if (['similarity'].includes(field) && ['asc', 'desc'].includes(direction.toLowerCase())) { - order += `"organizationsToMerge".${field} ${direction}, ` + order += `otm.${field} ${direction}, ` } } - order += '"organizationsToMerge"."id", "organizationsToMerge"."toMergeId"' + order += 'otm."organizationId", otm."toMergeId"' } - if (args.countOnly) { - const totalCount = await this.countOrganizationMergeSuggestions( + const hasProjectFilter = Boolean( + args.filter?.projectIds?.length || args.filter?.subprojectIds?.length, + ) + + const hasCountFilters = Boolean( + args.filter?.organizationId || args.filter?.displayName || args.filter?.similarity?.length, + ) + + const getTotalCount = async (): Promise => { + if (!hasCountFilters && !hasProjectFilter) { + const counts = await getSegmentMergeSuggestionCounts( + SequelizeRepository.getQueryExecutor(options), + projectGroupSegment.id, + ) + + return counts?.organizationMergeSuggestionsCount ?? 0 + } + + return this.countOrganizationMergeSuggestions( organizationFilter, similarityFilter, displayNameFilter, @@ -904,24 +935,21 @@ class OrganizationRepository { segmentIds, displayName: args?.filter?.displayName ? `${args.filter.displayName}%` : undefined, organizationId: args?.filter?.organizationId, - mergeActionType: MergeActionType.ORG, - mergeActionStatus: MergeActionState.ERROR, }, options, ) + } - return { count: totalCount } + if (args.countOnly) { + return { count: await getTotalCount() } } const orgs = await options.database.sequelize.query( - `WITH - cte AS ( + ` SELECT - Greatest(Hashtext(Concat(otm."organizationId", otm."toMergeId")), Hashtext(Concat(otm."toMergeId", otm."organizationId"))) as hash, - otm."organizationId" as id, + otm."organizationId" AS id, otm."toMergeId", - o1."createdAt", - otm."similarity", + otm.similarity, o1."displayName" as "primaryDisplayName", o1.logo as "primaryLogo", o2."displayName" as "secondaryDisplayName", @@ -935,12 +963,6 @@ class OrganizationRepository { FROM "organizationToMerge" otm JOIN organizations o1 ON o1.id = otm."organizationId" JOIN organizations o2 ON o2.id = otm."toMergeId" - LEFT JOIN "mergeActions" ma - ON ma.type = :mergeActionType - AND ( - (ma."primaryId" = otm."organizationId" AND ma."secondaryId" = otm."toMergeId") - OR (ma."primaryId" = otm."toMergeId" AND ma."secondaryId" = otm."organizationId") - ) WHERE EXISTS ( SELECT 1 FROM "organizationSegmentsAgg" os1 WHERE os1."organizationId" = otm."organizationId" AND os1."segmentId" IN (:segmentIds) @@ -949,51 +971,22 @@ class OrganizationRepository { SELECT 1 FROM "organizationSegmentsAgg" os2 WHERE os2."organizationId" = otm."toMergeId" AND os2."segmentId" IN (:segmentIds) ) - AND (ma.id IS NULL OR ma.state = :mergeActionStatus) + AND NOT EXISTS ( + SELECT 1 + FROM "mergeActions" ma + WHERE ma.type = :mergeActionType + AND ma.state <> :mergeActionState + AND ( + (ma."primaryId" = otm."organizationId" AND ma."secondaryId" = otm."toMergeId") + OR (ma."primaryId" = otm."toMergeId" AND ma."secondaryId" = otm."organizationId") + ) + ) ${organizationFilter} ${similarityFilter} ${displayNameFilter} - ), - - count_cte AS ( - SELECT COUNT(DISTINCT hash) AS total_count - FROM cte - ), - - final_select AS ( - SELECT DISTINCT ON (hash) - id, - "toMergeId", - "primaryDisplayName", - "primaryLogo", - "secondaryDisplayName", - "secondaryLogo", - "createdAt", - "similarity", - "primarySegmentId", - "secondarySegmentId" - FROM cte - ORDER BY hash, id - ) - - SELECT - "organizationsToMerge".id, - "organizationsToMerge"."toMergeId", - "organizationsToMerge"."primaryDisplayName", - "organizationsToMerge"."primaryLogo", - "organizationsToMerge"."secondaryDisplayName", - "organizationsToMerge"."secondaryLogo", - "organizationsToMerge"."primarySegmentId", - "organizationsToMerge"."secondarySegmentId", - count_cte."total_count", - "organizationsToMerge"."similarity" - FROM - final_select AS "organizationsToMerge", - count_cte - ORDER BY - ${order} - LIMIT :limit OFFSET :offset - `, + ORDER BY ${order} + LIMIT :limit OFFSET :offset + `, { replacements: { segmentIds, @@ -1001,7 +994,7 @@ class OrganizationRepository { offset: args.offset, displayName: args?.filter?.displayName ? `${args.filter.displayName}%` : undefined, mergeActionType: MergeActionType.ORG, - mergeActionStatus: MergeActionState.ERROR, + mergeActionState: MergeActionState.ERROR, organizationId: args?.filter?.organizationId, }, type: QueryTypes.SELECT, @@ -1060,12 +1053,17 @@ class OrganizationRepository { }) }) - return { rows: result, count: orgs[0].total_count, limit: args.limit, offset: args.offset } + return { + rows: result, + count: await getTotalCount(), + limit: args.limit, + offset: args.offset, + } } return { rows: [{ organizations: [], similarity: 0 }], - count: 0, + count: await getTotalCount(), limit: args.limit, offset: args.offset, } diff --git a/backend/src/database/repositories/sequelizeRepository.ts b/backend/src/database/repositories/sequelizeRepository.ts index f8cbe2d122..7c8b09dcf0 100644 --- a/backend/src/database/repositories/sequelizeRepository.ts +++ b/backend/src/database/repositories/sequelizeRepository.ts @@ -103,6 +103,20 @@ export default class SequelizeRepository { return options.currentSegments[0] } + static getStrictlySingleProjectGroupSegment( + options: IRepositoryOptions | IServiceOptions, + ): SegmentData { + const segment = this.getStrictlySingleActiveSegment(options) + + if (segment.parentId != null || segment.grandparentId != null) { + throw new Error400( + `This operation requires a project group segment. Segment ${segment.id} is not a project group.`, + ) + } + + return segment + } + /** * Returns the transaction if it exists on the options. */ diff --git a/backend/src/services/memberService.ts b/backend/src/services/memberService.ts index 1189c5dce4..f069ed6b63 100644 --- a/backend/src/services/memberService.ts +++ b/backend/src/services/memberService.ts @@ -22,7 +22,11 @@ import { queryMembersAdvanced, } from '@crowd/data-access-layer/src/members' import { QueryExecutor, optionsQx } from '@crowd/data-access-layer/src/queryExecutor' -import { fetchManySegments } from '@crowd/data-access-layer/src/segments' +import { + decrementMemberMergeSuggestionCounts, + fetchManySegments, + getMembersCommonProjectGroupSegmentIds, +} from '@crowd/data-access-layer/src/segments' import { LoggerBase } from '@crowd/logging' import { IMemberIdentity, @@ -757,33 +761,32 @@ export default class MemberService extends LoggerBase { */ async addToNoMerge(memberOneId, memberTwoId) { const transaction = await SequelizeRepository.createTransaction(this.options) + const txOptions = { ...this.options, transaction } try { - await MemberRepository.addNoMerge(memberOneId, memberTwoId, { - ...this.options, - transaction, - }) - await MemberRepository.addNoMerge(memberTwoId, memberOneId, { - ...this.options, - transaction, - }) - await MemberRepository.removeToMerge(memberOneId, memberTwoId, { - ...this.options, - transaction, - }) - await MemberRepository.removeToMerge(memberTwoId, memberOneId, { - ...this.options, - transaction, - }) + await MemberRepository.addNoMerge(memberOneId, memberTwoId, txOptions) + await MemberRepository.addNoMerge(memberTwoId, memberOneId, txOptions) + await MemberRepository.removeToMerge(memberOneId, memberTwoId, txOptions) + await MemberRepository.removeToMerge(memberTwoId, memberOneId, txOptions) await SequelizeRepository.commitTransaction(transaction) - - return { status: 200 } } catch (error) { await SequelizeRepository.rollbackTransaction(transaction) throw error } + + const qx = SequelizeRepository.getQueryExecutor(this.options) + const projectGroupSegmentIds = await getMembersCommonProjectGroupSegmentIds(qx, [ + memberOneId, + memberTwoId, + ]) + + // Precomputed per-project-group counts are only refreshed by cron every few hours. + // Decrement here so no-merge from the UI is reflected immediately. + await decrementMemberMergeSuggestionCounts(qx, projectGroupSegmentIds) + + return { status: 200 } } async update( diff --git a/backend/src/services/organizationService.ts b/backend/src/services/organizationService.ts index 1cc0fa5adc..3fe7e38cf7 100644 --- a/backend/src/services/organizationService.ts +++ b/backend/src/services/organizationService.ts @@ -29,7 +29,11 @@ import { findOrgById, upsertOrgIdentities, } from '@crowd/data-access-layer/src/organizations' -import { findLfSegmentByName } from '@crowd/data-access-layer/src/segments' +import { + decrementOrganizationMergeSuggestionCounts, + findLfSegmentByName, + getOrganizationsCommonProjectGroupSegmentIds, +} from '@crowd/data-access-layer/src/segments' import { LoggerBase } from '@crowd/logging' import { WorkflowIdReusePolicy } from '@crowd/temporal' import { @@ -738,6 +742,15 @@ export default class OrganizationService extends LoggerBase { }), ) + const projectGroupSegmentIds = await getOrganizationsCommonProjectGroupSegmentIds(qx, [ + originalId, + toMergeId, + ]) + + // Precomputed per-project-group counts are only refreshed by cron every few hours. + // Decrement here so merges from the UI are reflected immediately. + await decrementOrganizationMergeSuggestionCounts(qx, projectGroupSegmentIds) + await this.options.temporal.workflow.start('finishOrganizationMerging', { taskQueue: 'entity-merging', workflowId: `finishOrganizationMerging/${originalId}/${toMergeId}`, @@ -812,24 +825,13 @@ export default class OrganizationService extends LoggerBase { async addToNoMerge(organizationId: string, noMergeId: string): Promise { const transaction = await SequelizeRepository.createTransaction(this.options) + const txOptions = { ...this.options, transaction } try { - await OrganizationRepository.addNoMerge(organizationId, noMergeId, { - ...this.options, - transaction, - }) - await OrganizationRepository.addNoMerge(noMergeId, organizationId, { - ...this.options, - transaction, - }) - await OrganizationRepository.removeToMerge(organizationId, noMergeId, { - ...this.options, - transaction, - }) - await OrganizationRepository.removeToMerge(noMergeId, organizationId, { - ...this.options, - transaction, - }) + await OrganizationRepository.addNoMerge(organizationId, noMergeId, txOptions) + await OrganizationRepository.addNoMerge(noMergeId, organizationId, txOptions) + await OrganizationRepository.removeToMerge(organizationId, noMergeId, txOptions) + await OrganizationRepository.removeToMerge(noMergeId, organizationId, txOptions) await SequelizeRepository.commitTransaction(transaction) } catch (error) { @@ -837,6 +839,16 @@ export default class OrganizationService extends LoggerBase { throw error } + + const qx = SequelizeRepository.getQueryExecutor(this.options) + const projectGroupSegmentIds = await getOrganizationsCommonProjectGroupSegmentIds(qx, [ + organizationId, + noMergeId, + ]) + + // Precomputed per-project-group counts are only refreshed by cron every few hours. + // Decrement here so no-merge from the UI is reflected immediately. + await decrementOrganizationMergeSuggestionCounts(qx, projectGroupSegmentIds) } async createOrUpdate( diff --git a/frontend/src/modules/contributor/services/contributor.api.service.ts b/frontend/src/modules/contributor/services/contributor.api.service.ts index 983cec9f34..431794ca5b 100644 --- a/frontend/src/modules/contributor/services/contributor.api.service.ts +++ b/frontend/src/modules/contributor/services/contributor.api.service.ts @@ -1,6 +1,6 @@ +import { storeToRefs } from 'pinia'; import authAxios from '@/shared/axios/auth-axios'; import { Contributor } from '@/modules/contributor/types/Contributor'; -import { storeToRefs } from 'pinia'; import { useLfSegmentsStore } from '@/modules/lf/segments/store'; const getSegments = () => { @@ -25,10 +25,14 @@ export class ContributorApiService { } static async mergeSuggestions(limit: number, offset: number, query: any, segments: string[]) { + const resolvedSegments = segments.length + ? segments + : (getSegments() ?? []); + const data = { limit, offset, - segments, + segments: resolvedSegments, detail: true, ...query, }; diff --git a/services/apps/cron_service/src/jobs/refreshSegmentMergeSuggestionCounts.job.ts b/services/apps/cron_service/src/jobs/refreshSegmentMergeSuggestionCounts.job.ts new file mode 100644 index 0000000000..328157227e --- /dev/null +++ b/services/apps/cron_service/src/jobs/refreshSegmentMergeSuggestionCounts.job.ts @@ -0,0 +1,81 @@ +import CronTime from 'cron-time-generator' + +import { + READ_DB_CONFIG, + WRITE_DB_CONFIG, + getDbConnection, +} from '@crowd/data-access-layer/src/database' +import { chunkArray } from '@crowd/data-access-layer/src/old/apps/merge_suggestions_worker/utils' +import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor' +import { + calculateSegmentMemberMergeSuggestionsCount, + calculateSegmentOrganizationMergeSuggestionsCount, + fetchProjectGroupSegmentIds, + getSegmentMergeSuggestionCounts, + upsertSegmentMergeSuggestionCounts, +} from '@crowd/data-access-layer/src/segments' + +import { IJobDefinition } from '../types' + +const job: IJobDefinition = { + name: 'refresh-segment-merge-suggestion-counts', + cronTime: CronTime.every(4).hours(), + timeout: 2 * 60 * 60, + process: async (ctx) => { + const readDb = await getDbConnection(READ_DB_CONFIG(), 3, 0) + const writeDb = await getDbConnection(WRITE_DB_CONFIG(), 1, 0) + + const readQx = pgpQx(readDb) + const writeQx = pgpQx(writeDb) + + const segmentIds = await fetchProjectGroupSegmentIds(readQx) + const BATCH_SIZE = 5 + + const failedSegmentIds: string[] = [] + + ctx.log.info({ segmentCount: segmentIds.length }, 'Refreshing segment merge suggestion counts') + + for (const batch of chunkArray(segmentIds, BATCH_SIZE)) { + await Promise.all( + batch.map(async (segmentId) => { + try { + const [memberMergeSuggestionsCount, organizationMergeSuggestionsCount] = + await Promise.all([ + calculateSegmentMemberMergeSuggestionsCount(readQx, segmentId), + calculateSegmentOrganizationMergeSuggestionsCount(readQx, segmentId), + ]) + + const smsc = await getSegmentMergeSuggestionCounts(readQx, segmentId) + + if ( + smsc?.memberMergeSuggestionsCount !== memberMergeSuggestionsCount || + smsc?.organizationMergeSuggestionsCount !== organizationMergeSuggestionsCount + ) { + await upsertSegmentMergeSuggestionCounts(writeQx, segmentId, { + memberMergeSuggestionsCount, + organizationMergeSuggestionsCount, + }) + } + + ctx.log.debug({ segmentId }, 'Refreshed segment merge suggestion counts') + } catch (err) { + failedSegmentIds.push(segmentId) + ctx.log.error(err, { segmentId }, 'Segment merge suggestion count refresh failed') + } + }), + ) + } + + const failedCount = failedSegmentIds.length + + if (failedCount > 0) { + throw new Error( + `Segment merge suggestion counts refresh failed for ${failedCount}/${segmentIds.length} segments (${failedSegmentIds.join(', ')})`, + ) + } + + ctx.log.info('Segment merge suggestion counts refresh finished') + }, +} + +export default job diff --git a/services/libs/common_services/src/services/common.member.service.ts b/services/libs/common_services/src/services/common.member.service.ts index ee4a6da890..087f079ad6 100644 --- a/services/libs/common_services/src/services/common.member.service.ts +++ b/services/libs/common_services/src/services/common.member.service.ts @@ -56,6 +56,10 @@ import { } from '@crowd/data-access-layer/src/mergeActions/repo' import { IWorkExperienceData } from '@crowd/data-access-layer/src/old/apps/data_sink_worker/repo/memberAffiliation.data' import { addOrgsToSegments } from '@crowd/data-access-layer/src/organizations' +import { + decrementMemberMergeSuggestionCounts, + getMembersCommonProjectGroupSegmentIds, +} from '@crowd/data-access-layer/src/segments' import { Logger, LoggerBase } from '@crowd/logging' import { Client as TemporalClient } from '@crowd/temporal' import { MergeActionState, MergeActionStep, MergeActionType } from '@crowd/types' @@ -450,6 +454,15 @@ export class CommonMemberService extends LoggerBase { }), ) + const projectGroupSegmentIds = await getMembersCommonProjectGroupSegmentIds(this.qx, [ + originalId, + toMergeId, + ]) + + // Precomputed per-project-group counts are only refreshed by cron every few hours. + // Decrement here so merges from the UI are reflected immediately. + await decrementMemberMergeSuggestionCounts(this.qx, projectGroupSegmentIds) + await this.temporal.workflow.start('finishMemberMerging', { taskQueue: 'entity-merging', workflowId: `finishMemberMerging/${originalId}/${toMergeId}`, diff --git a/services/libs/data-access-layer/src/segments/index.ts b/services/libs/data-access-layer/src/segments/index.ts index a3cef8a2e5..19498a39a5 100644 --- a/services/libs/data-access-layer/src/segments/index.ts +++ b/services/libs/data-access-layer/src/segments/index.ts @@ -3,7 +3,14 @@ import cloneDeep from 'lodash.clonedeep' import { DEFAULT_TENANT_ID } from '@crowd/common' import { DEFAULT_ACTIVITY_TYPE_SETTINGS } from '@crowd/integrations' -import { ActivityTypeSettings, PlatformType, SegmentData, SegmentRawData } from '@crowd/types' +import { + ActivityTypeSettings, + MergeActionState, + MergeActionType, + PlatformType, + SegmentData, + SegmentRawData, +} from '@crowd/types' import { QueryExecutor } from '../queryExecutor' @@ -558,3 +565,285 @@ export async function getSubProjectsCount( projectsLast30Days: parseInt(result.projectsLast30Days) || 0, } } + +export async function fetchProjectGroupSegmentIds(qx: QueryExecutor): Promise { + const rows: { id: string }[] = await qx.select( + ` + SELECT id + FROM segments + WHERE "parentId" IS NULL + AND "grandparentId" IS NULL + AND "tenantId" = $(tenantId) + AND status = 'active' + ORDER BY id + `, + { + tenantId: DEFAULT_TENANT_ID, + }, + ) + + return rows.map((row) => row.id) +} + +export async function calculateSegmentMemberMergeSuggestionsCount( + qx: QueryExecutor, + segmentId: string, +): Promise { + const result = await qx.selectOne( + ` + SELECT COUNT(*) AS count + FROM "memberToMerge" mtm + WHERE EXISTS ( + SELECT 1 + FROM "memberSegmentsAgg" ms + WHERE ms."memberId" = mtm."memberId" + AND ms."segmentId" = $(segmentId) + ) + AND EXISTS ( + SELECT 1 + FROM "memberSegmentsAgg" ms2 + WHERE ms2."memberId" = mtm."toMergeId" + AND ms2."segmentId" = $(segmentId) + ) + AND NOT EXISTS ( + SELECT 1 + FROM "mergeActions" ma + WHERE ma.type = $(mergeActionType) + AND ma.state <> $(mergeActionState) + AND ( + (ma."primaryId" = mtm."memberId" AND ma."secondaryId" = mtm."toMergeId") + OR (ma."primaryId" = mtm."toMergeId" AND ma."secondaryId" = mtm."memberId") + ) + ) + `, + { + segmentId, + mergeActionType: MergeActionType.MEMBER, + mergeActionState: MergeActionState.ERROR, + }, + ) + + return Number(result.count) +} + +export async function calculateSegmentOrganizationMergeSuggestionsCount( + qx: QueryExecutor, + segmentId: string, +): Promise { + const result = await qx.selectOne( + ` + SELECT COUNT(*) AS count + FROM "organizationToMerge" otm + WHERE EXISTS ( + SELECT 1 + FROM "organizationSegmentsAgg" os1 + WHERE os1."organizationId" = otm."organizationId" + AND os1."segmentId" = $(segmentId) + ) + AND EXISTS ( + SELECT 1 + FROM "organizationSegmentsAgg" os2 + WHERE os2."organizationId" = otm."toMergeId" + AND os2."segmentId" = $(segmentId) + ) + AND NOT EXISTS ( + SELECT 1 + FROM "mergeActions" ma + WHERE ma.type = $(mergeActionType) + AND ma.state <> $(mergeActionState) + AND ( + (ma."primaryId" = otm."organizationId" AND ma."secondaryId" = otm."toMergeId") + OR (ma."primaryId" = otm."toMergeId" AND ma."secondaryId" = otm."organizationId") + ) + ) + `, + { + segmentId, + mergeActionType: MergeActionType.ORG, + mergeActionState: MergeActionState.ERROR, + }, + ) + + return Number(result.count) +} + +interface SegmentMergeSuggestionCounts { + memberMergeSuggestionsCount: number + organizationMergeSuggestionsCount: number +} + +export async function upsertSegmentMergeSuggestionCounts( + qx: QueryExecutor, + segmentId: string, + data: SegmentMergeSuggestionCounts, +): Promise { + const { memberMergeSuggestionsCount, organizationMergeSuggestionsCount } = data + + await qx.result( + ` + INSERT INTO "segmentMergeSuggestionCounts" ( + "segmentId", + "memberMergeSuggestionsCount", + "organizationMergeSuggestionsCount", + "updatedAt" + ) + VALUES ( + $(segmentId), + $(memberMergeSuggestionsCount), + $(organizationMergeSuggestionsCount), + now() + ) + ON CONFLICT ("segmentId") + DO UPDATE SET + "memberMergeSuggestionsCount" = EXCLUDED."memberMergeSuggestionsCount", + "organizationMergeSuggestionsCount" = EXCLUDED."organizationMergeSuggestionsCount", + "updatedAt" = EXCLUDED."updatedAt" + `, + { + segmentId, + memberMergeSuggestionsCount, + organizationMergeSuggestionsCount, + }, + ) +} + +export async function getSegmentMergeSuggestionCounts( + qx: QueryExecutor, + segmentId: string, +): Promise { + const result = await qx.selectOneOrNone( + ` + SELECT + "memberMergeSuggestionsCount", + "organizationMergeSuggestionsCount" + FROM "segmentMergeSuggestionCounts" + WHERE "segmentId" = $(segmentId) + `, + { + segmentId, + }, + ) + + if (!result) { + return null + } + + return { + memberMergeSuggestionsCount: Number(result.memberMergeSuggestionsCount), + organizationMergeSuggestionsCount: Number(result.organizationMergeSuggestionsCount), + } +} + +export async function getMembersCommonProjectGroupSegmentIds( + qx: QueryExecutor, + memberIds: string[], +): Promise { + if (!memberIds || memberIds.length !== 2) { + throw new Error('Exactly two memberIds are required') + } + + const [memberId, otherMemberId] = memberIds + + const rows: { segmentId: string }[] = await qx.select( + ` + SELECT DISTINCT ms."segmentId" + FROM "memberSegmentsAgg" ms + INNER JOIN "memberSegmentsAgg" ms2 + ON ms."segmentId" = ms2."segmentId" + INNER JOIN segments s + ON s.id = ms."segmentId" + WHERE ms."memberId" = $(memberId) + AND ms2."memberId" = $(otherMemberId) + AND s."parentId" IS NULL + AND s."grandparentId" IS NULL + AND s."tenantId" = $(tenantId) + AND s.status = 'active' + `, + { + memberId, + otherMemberId, + tenantId: DEFAULT_TENANT_ID, + }, + ) + + return rows.map((row) => row.segmentId) +} + +export async function getOrganizationsCommonProjectGroupSegmentIds( + qx: QueryExecutor, + organizationIds: string[], +): Promise { + if (!organizationIds || organizationIds.length !== 2) { + throw new Error('Exactly two organizationIds are required') + } + + const [organizationId, otherOrganizationId] = organizationIds + + const rows: { segmentId: string }[] = await qx.select( + ` + SELECT DISTINCT os1."segmentId" + FROM "organizationSegmentsAgg" os1 + INNER JOIN "organizationSegmentsAgg" os2 + ON os1."segmentId" = os2."segmentId" + INNER JOIN segments s + ON s.id = os1."segmentId" + WHERE os1."organizationId" = $(organizationId) + AND os2."organizationId" = $(otherOrganizationId) + AND s."parentId" IS NULL + AND s."grandparentId" IS NULL + AND s."tenantId" = $(tenantId) + AND s.status = 'active' + `, + { + organizationId, + otherOrganizationId, + tenantId: DEFAULT_TENANT_ID, + }, + ) + + return rows.map((row) => row.segmentId) +} + +export async function decrementMemberMergeSuggestionCounts( + qx: QueryExecutor, + segmentIds: string[], +): Promise { + if (segmentIds.length === 0) { + return + } + + await qx.result( + ` + UPDATE "segmentMergeSuggestionCounts" + SET + "memberMergeSuggestionsCount" = GREATEST(0, "memberMergeSuggestionsCount" - 1), + "updatedAt" = now() + WHERE "segmentId" IN ($(segmentIds:csv)) + `, + { + segmentIds, + }, + ) +} + +export async function decrementOrganizationMergeSuggestionCounts( + qx: QueryExecutor, + segmentIds: string[], +): Promise { + if (segmentIds.length === 0) { + return + } + + await qx.result( + ` + UPDATE "segmentMergeSuggestionCounts" + SET + "organizationMergeSuggestionsCount" = GREATEST(0, "organizationMergeSuggestionsCount" - 1), + "updatedAt" = now() + WHERE "segmentId" IN ($(segmentIds:csv)) + `, + { + segmentIds, + }, + ) +}