From baefb72e8590d8379bd4581d627ea12b72952c25 Mon Sep 17 00:00:00 2001 From: ABHAY PANDEY Date: Sun, 24 May 2026 08:34:54 +0530 Subject: [PATCH] refactor: simplify EventRepository query/purge complexity Signed-off-by: ABHAY PANDEY --- .changeset/vast-signs-melt.md | 5 + src/repositories/event-repository.ts | 327 +++++++++--------- .../repositories/event-repository.spec.ts | 47 +++ 3 files changed, 223 insertions(+), 156 deletions(-) create mode 100644 .changeset/vast-signs-melt.md diff --git a/.changeset/vast-signs-melt.md b/.changeset/vast-signs-melt.md new file mode 100644 index 00000000..4595cbae --- /dev/null +++ b/.changeset/vast-signs-melt.md @@ -0,0 +1,5 @@ +--- +"nostream": patch +--- + +Refactor EventRepository query construction to reduce method complexity. diff --git a/src/repositories/event-repository.ts b/src/repositories/event-repository.ts index 6c53fd0a..597e574c 100644 --- a/src/repositories/event-repository.ts +++ b/src/repositories/event-repository.ts @@ -1,31 +1,16 @@ import { - __, always, applySpec, - complement, - cond, - equals, - evolve, - filter, - forEach, - forEachObjIndexed, - groupBy, ifElse, - invoker, is, - isEmpty, isNil, map, - modulo, - nth, omit, path, paths, pipe, prop, propSatisfies, - T, - toPairs, } from 'ramda' import { @@ -43,20 +28,15 @@ import { createLogger } from '../factories/logger-factory' import { isGenericTagQuery, isGeohashPrefixCriterion, stripGeohashPrefixWildcard } from '../utils/filter' import { SubscriptionFilter } from '../@types/subscription' -const even = pipe(modulo(__, 2), equals(0)) - -const groupByLengthSpec = groupBy( - pipe( - prop('length'), - cond([ - [equals(64), always('exact')], - [even, always('even')], - [T, always('odd')], - ]), - ), -) - const logger = createLogger('event-repository') +const RETENTION_BATCH_SIZE = 1000 +const SECONDS_PER_DAY = 86400 + +type HexCriterionGroups = { + exact: string[] + even: string[] + odd: string[] +} export class EventRepository implements IEventRepository { public constructor( @@ -135,37 +115,7 @@ export class EventRepository implements IEventRepository { } private applyFilterConditions(builder: any, currentFilter: SubscriptionFilter): boolean { - forEachObjIndexed((tableFields: string[], filterName: string | number) => { - builder.andWhere((bd) => { - cond([ - [isEmpty, () => void bd.whereRaw('1 = 0')], - [ - complement(isNil), - pipe( - groupByLengthSpec, - evolve({ - exact: (pubkeys: string[]) => - tableFields.forEach((tableField) => bd.orWhereIn(tableField, pubkeys.map(toBuffer))), - even: forEach((prefix: string) => - tableFields.forEach((tableField) => - bd.orWhereRaw(`substring("${tableField}" from 1 for ?) = ?`, [prefix.length >> 1, toBuffer(prefix)]), - ), - ), - odd: forEach((prefix: string) => - tableFields.forEach((tableField) => - bd.orWhereRaw(`substring("${tableField}" from 1 for ?) BETWEEN ? AND ?`, [ - (prefix.length >> 1) + 1, - `\\x${prefix}0`, - `\\x${prefix}f`, - ]), - ), - ), - } as any), - ), - ], - ])(currentFilter[filterName] as string[]) - }) - })({ authors: ['event_pubkey'], ids: ['event_id'] }) + this.applyHexFilterConditions(builder, currentFilter) if (Array.isArray(currentFilter.kinds)) { builder.whereIn('event_kind', currentFilter.kinds) @@ -179,40 +129,7 @@ export class EventRepository implements IEventRepository { builder.where('event_created_at', '<=', currentFilter.until) } - const andWhereRaw = invoker(1, 'andWhereRaw') - const orWhereRaw = invoker(2, 'orWhereRaw') - - let isTagQuery = false - pipe( - toPairs, - filter(pipe(nth(0) as () => string, isGenericTagQuery)) as any, - forEach(([filterName, criteria]: [string, string[]]) => { - isTagQuery = true - builder.andWhere((bd) => { - ifElse( - isEmpty, - () => andWhereRaw('1 = 0', bd), - forEach( - (criterion: string) => { - if (isGeohashPrefixCriterion(filterName, criterion)) { - return void orWhereRaw( - 'event_tags.tag_name = ? AND event_tags.tag_value LIKE ?', - [filterName[1], `${stripGeohashPrefixWildcard(criterion)}%`], - bd, - ) - } - - return void orWhereRaw( - 'event_tags.tag_name = ? AND event_tags.tag_value = ?', - [filterName[1], criterion], - bd, - ) - }, - ), - )(criteria) - }) - }), - )(currentFilter as any) + const isTagQuery = this.applyGenericTagFilterConditions(builder, currentFilter) if (isTagQuery) { builder.leftJoin('event_tags', 'events.event_id', 'event_tags.event_id') @@ -221,6 +138,99 @@ export class EventRepository implements IEventRepository { return isTagQuery } + private applyHexFilterConditions(builder: any, currentFilter: SubscriptionFilter): void { + builder.andWhere((bd) => { + this.applyHexCriteria(bd, ['event_pubkey'], currentFilter.authors) + }) + + builder.andWhere((bd) => { + this.applyHexCriteria(bd, ['event_id'], currentFilter.ids) + }) + } + + private applyHexCriteria(builder: any, tableFields: string[], criteria?: string[]): void { + if (typeof criteria === 'undefined') { + return + } + + if (!criteria.length) { + builder.whereRaw('1 = 0') + return + } + + const groups = this.groupHexCriteria(criteria) + + tableFields.forEach((tableField) => { + if (groups.exact.length) { + builder.orWhereIn(tableField, groups.exact.map(toBuffer)) + } + + groups.even.forEach((prefix) => { + builder.orWhereRaw(`substring("${tableField}" from 1 for ?) = ?`, [prefix.length >> 1, toBuffer(prefix)]) + }) + + groups.odd.forEach((prefix) => { + builder.orWhereRaw(`substring("${tableField}" from 1 for ?) BETWEEN ? AND ?`, [ + (prefix.length >> 1) + 1, + `\\x${prefix}0`, + `\\x${prefix}f`, + ]) + }) + }) + } + + private groupHexCriteria(criteria: string[]): HexCriterionGroups { + return criteria.reduce( + (groups, criterion) => { + if (criterion.length === 64) { + groups.exact.push(criterion) + } else if (criterion.length % 2 === 0) { + groups.even.push(criterion) + } else { + groups.odd.push(criterion) + } + + return groups + }, + { + exact: [], + even: [], + odd: [], + }, + ) + } + + private applyGenericTagFilterConditions(builder: any, currentFilter: SubscriptionFilter): boolean { + const tagFilters = Object.entries(currentFilter).filter(([filterName]) => isGenericTagQuery(filterName)) + + tagFilters.forEach(([filterName, criteria]) => { + this.applyGenericTagCriteria(builder, filterName, criteria as string[]) + }) + + return tagFilters.length > 0 + } + + private applyGenericTagCriteria(builder: any, filterName: string, criteria: string[]): void { + builder.andWhere((bd) => { + if (!criteria.length) { + bd.andWhereRaw('1 = 0') + return + } + + criteria.forEach((criterion) => { + if (isGeohashPrefixCriterion(filterName, criterion)) { + bd.orWhereRaw('event_tags.tag_name = ? AND event_tags.tag_value LIKE ?', [ + filterName[1], + `${stripGeohashPrefixWildcard(criterion)}%`, + ]) + return + } + + bd.orWhereRaw('event_tags.tag_name = ? AND event_tags.tag_value = ?', [filterName[1], criterion]) + }) + }) + } + public async create(event: Event): Promise { return this.insert(event).then(prop('rowCount') as () => number, () => 0) } @@ -403,92 +413,97 @@ export class EventRepository implements IEventRepository { }) } - const retentionLimit = now - maxDays * 86400 - const batchSize = 1000 + const retentionLimit = now - maxDays * SECONDS_PER_DAY logger( 'deleting expired and retained events (retentionLimit: %d, now: %d, batchSize: %d)', retentionLimit, now, - batchSize, + RETENTION_BATCH_SIZE, ) - const kindWhitelist = [ - ...(Array.isArray(options?.kindWhitelist) ? options.kindWhitelist : []), - EventKinds.REQUEST_TO_VANISH, - ].reduce<(number | [number, number])[]>((result, item) => { - const key = Array.isArray(item) ? `range:${item[0]}-${item[1]}` : `kind:${item}` + const candidates = this.buildRetentionCandidateQuery(now, retentionLimit, options) - if ( - !result.some((existing) => { - const existingKey = Array.isArray(existing) ? `range:${existing[0]}-${existing[1]}` : `kind:${existing}` - return existingKey === key - }) - ) { - result.push(item) - } + const query = this.masterDbClient('events') + .whereIn('event_id', candidates) + .del(['deleted_at', 'expires_at', 'event_created_at']) - return result - }, []) + const getPromise = () => query.then((rows: any) => this.mapToPurgeCounts(rows, now, retentionLimit)) - const candidates = this.masterDbClient('events') + return { + then: ( + onfulfilled?: ((value: EventPurgeCounts) => T1 | PromiseLike) | null, + onrejected?: ((reason: any) => T2 | PromiseLike) | null, + ) => getPromise().then(onfulfilled as any, onrejected as any), + catch: (onrejected?: ((reason: any) => T | PromiseLike) | null) => getPromise().catch(onrejected as any), + finally: (onfinally?: (() => void) | null) => getPromise().finally(onfinally as any), + toString: (): string => query.toString(), + } as Promise & { toString(): string } + } + + private buildRetentionCandidateQuery( + now: number, + retentionLimit: number, + options?: EventRetentionOptions, + ): any { + return this.masterDbClient('events') .select('event_id') .where(function () { this.where('expires_at', '<', now).orWhereNotNull('deleted_at').orWhere('event_created_at', '<', retentionLimit) }) .modify((query) => { - query.whereNot((builder) => { - kindWhitelist.forEach((kindOrRange) => { - if (Array.isArray(kindOrRange)) { - builder.orWhereBetween('event_kind', kindOrRange) - } else { - builder.orWhere('event_kind', kindOrRange) - } - }) - }) + this.applyRetentionKindWhitelist(query, options?.kindWhitelist) if (Array.isArray(options?.pubkeyWhitelist) && options.pubkeyWhitelist.length > 0) { query.whereNotIn('event_pubkey', map(toBuffer)(options.pubkeyWhitelist)) } }) - .limit(batchSize) + .limit(RETENTION_BATCH_SIZE) + } - const query = this.masterDbClient('events') - .whereIn('event_id', candidates) - .del(['deleted_at', 'expires_at', 'event_created_at']) + private applyRetentionKindWhitelist(query: any, kindWhitelist?: EventRetentionOptions['kindWhitelist']): void { + const seen = new Set() + const configuredWhitelist = Array.isArray(kindWhitelist) ? kindWhitelist : [] + const dedupedWhitelist = [...configuredWhitelist, EventKinds.REQUEST_TO_VANISH].filter((item) => { + const key = Array.isArray(item) ? `range:${item[0]}-${item[1]}` : `kind:${item}` - const mapToCounts = ( - deletedRows: Pick[], - ): EventPurgeCounts => - deletedRows.reduce( - (counts, row) => { - if (row.deleted_at) { - counts.deleted += 1 - } else if (typeof row.expires_at === 'number' && row.expires_at < now) { - counts.expired += 1 - } else if (row.event_created_at < retentionLimit) { - counts.retained += 1 - } - - return counts - }, - { - deleted: 0, - expired: 0, - retained: 0, - }, - ) + if (seen.has(key)) { + return false + } - const getPromise = () => query.then((rows: any) => mapToCounts(rows)) + seen.add(key) + return true + }) + query.whereNot((builder) => { + dedupedWhitelist.forEach((kindOrRange) => { + if (Array.isArray(kindOrRange)) { + builder.orWhereBetween('event_kind', kindOrRange) + } else { + builder.orWhere('event_kind', kindOrRange) + } + }) + }) + } - return { - then: ( - onfulfilled?: ((value: EventPurgeCounts) => T1 | PromiseLike) | null, - onrejected?: ((reason: any) => T2 | PromiseLike) | null, - ) => getPromise().then(onfulfilled as any, onrejected as any), - catch: (onrejected?: ((reason: any) => T | PromiseLike) | null) => getPromise().catch(onrejected as any), - finally: (onfinally?: (() => void) | null) => getPromise().finally(onfinally as any), - toString: (): string => query.toString(), - } as Promise & { toString(): string } + private mapToPurgeCounts( + deletedRows: Pick[], + now: number, + retentionLimit: number, + ): EventPurgeCounts { + return deletedRows.reduce((counts, row) => { + if (row.deleted_at) { + counts.deleted += 1 + } else if (typeof row.expires_at === 'number' && row.expires_at < now) { + counts.expired += 1 + } else if (row.event_created_at < retentionLimit) { + counts.retained += 1 + } + + return counts + }, { + deleted: 0, + expired: 0, + retained: 0, + }) } } diff --git a/test/unit/repositories/event-repository.spec.ts b/test/unit/repositories/event-repository.spec.ts index f9a90cc8..04793d42 100644 --- a/test/unit/repositories/event-repository.spec.ts +++ b/test/unit/repositories/event-repository.spec.ts @@ -638,6 +638,53 @@ describe('EventRepository', () => { }) }) + describe('deleteExpiredAndRetained', () => { + it('returns zero counts when retention is disabled', async () => { + const result = await repository.deleteExpiredAndRetained({ maxDays: 0 }) + + expect(result).to.deep.equal({ + deleted: 0, + expired: 0, + retained: 0, + }) + }) + + it('builds a purge query with deduplicated kind and pubkey whitelists', () => { + sandbox.useFakeTimers(new Date('2023-11-14T22:13:20.000Z')) + + const query = repository + .deleteExpiredAndRetained({ + maxDays: 7, + kindWhitelist: [1, [10000, 19999], 1], + pubkeyWhitelist: ['001122'], + }) + .toString() + + expect(query).to.equal( + 'delete from "events" where "event_id" in (select "event_id" from "events" where ("expires_at" < 1700000000 or "deleted_at" is not null or "event_created_at" < 1699395200) and not ("event_kind" = 1 or "event_kind" between 10000 and 19999 or "event_kind" = 62) and "event_pubkey" not in (X\'001122\') limit 1000) returning "deleted_at", "expires_at", "event_created_at"', + ) + }) + + it('maps purged rows to deleted, expired, and retained counts', () => { + const result = (repository as any).mapToPurgeCounts( + [ + { deleted_at: new Date(), expires_at: null, event_created_at: 90 }, + { deleted_at: null, expires_at: 95, event_created_at: 90 }, + { deleted_at: null, expires_at: null, event_created_at: 40 }, + { deleted_at: null, expires_at: null, event_created_at: 80 }, + ], + 100, + 50, + ) + + expect(result).to.deep.equal({ + deleted: 1, + expired: 1, + retained: 1, + }) + }) + }) + describe('upsert', () => { it('replaces event based on event_pubkey and event_kind', () => { const event: Event = {