diff --git a/src/lib/features/unique-connection/fake-unique-connection-store.ts b/src/lib/features/unique-connection/fake-unique-connection-store.ts index babe3311dfd8..cad87bc22710 100644 --- a/src/lib/features/unique-connection/fake-unique-connection-store.ts +++ b/src/lib/features/unique-connection/fake-unique-connection-store.ts @@ -20,4 +20,8 @@ export class FakeUniqueConnectionStore implements IUniqueConnectionStore { ): Promise<(UniqueConnections & { updatedAt: Date }) | null> { return this.uniqueConnectionsRecord[id] || null; } + + async deleteAll(): Promise { + this.uniqueConnectionsRecord = {}; + } } diff --git a/src/lib/features/unique-connection/unique-connection-service.test.ts b/src/lib/features/unique-connection/unique-connection-service.test.ts new file mode 100644 index 000000000000..603fc9b76da8 --- /dev/null +++ b/src/lib/features/unique-connection/unique-connection-service.test.ts @@ -0,0 +1,142 @@ +import { UniqueConnectionService } from './unique-connection-service'; +import { FakeUniqueConnectionStore } from './fake-unique-connection-store'; +import getLogger from '../../../test/fixtures/no-logger'; +import type { IFlagResolver } from '../../types'; +import { SDK_CONNECTION_ID_RECEIVED } from '../../metric-events'; +import { addHours } from 'date-fns'; +import { EventEmitter } from 'events'; + +const alwaysOnFlagResolver = { + isEnabled() { + return true; + }, +} as unknown as IFlagResolver; + +test('sync first current bucket', async () => { + const eventBus = new EventEmitter(); + const config = { flagResolver: alwaysOnFlagResolver, getLogger, eventBus }; + const uniqueConnectionStore = new FakeUniqueConnectionStore(); + const uniqueConnectionService = new UniqueConnectionService( + { uniqueConnectionStore }, + config, + ); + uniqueConnectionService.listen(); + + eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection1'); + eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection1'); + eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection2'); + eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection2'); + eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection2'); + + await uniqueConnectionService.sync(); + + const stats = await uniqueConnectionService.getStats(); + expect(stats).toEqual({ previous: 0, current: 2 }); +}); + +test('sync first previous bucket', async () => { + const eventBus = new EventEmitter(); + const config = { flagResolver: alwaysOnFlagResolver, getLogger, eventBus }; + const uniqueConnectionStore = new FakeUniqueConnectionStore(); + const uniqueConnectionService = new UniqueConnectionService( + { uniqueConnectionStore }, + config, + ); + uniqueConnectionService.listen(); + + eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection1'); + eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection2'); + + await uniqueConnectionService.sync(); + + eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection3'); + + await uniqueConnectionService.sync(() => addHours(new Date(), 1)); + + const stats = await uniqueConnectionService.getStats(); + expect(stats).toEqual({ previous: 3, current: 0 }); +}); + +test('sync to existing current bucket from the same service', async () => { + const eventBus = new EventEmitter(); + const config = { flagResolver: alwaysOnFlagResolver, getLogger, eventBus }; + const uniqueConnectionStore = new FakeUniqueConnectionStore(); + const uniqueConnectionService = new UniqueConnectionService( + { uniqueConnectionStore }, + config, + ); + uniqueConnectionService.listen(); + + eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection1'); + eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection2'); + + await uniqueConnectionService.sync(); + + eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection1'); + eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection3'); + + const stats = await uniqueConnectionService.getStats(); + expect(stats).toEqual({ previous: 0, current: 3 }); +}); + +test('sync to existing current bucket from another service', async () => { + const eventBus = new EventEmitter(); + const config = { + flagResolver: alwaysOnFlagResolver, + getLogger, + eventBus: eventBus, + }; + const uniqueConnectionStore = new FakeUniqueConnectionStore(); + const uniqueConnectionService1 = new UniqueConnectionService( + { uniqueConnectionStore }, + config, + ); + const uniqueConnectionService2 = new UniqueConnectionService( + { uniqueConnectionStore }, + config, + ); + + uniqueConnectionService1.count('connection1'); + uniqueConnectionService1.count('connection2'); + await uniqueConnectionService1.sync(); + + uniqueConnectionService2.count('connection1'); + uniqueConnectionService2.count('connection3'); + await uniqueConnectionService2.sync(); + + const stats1 = await uniqueConnectionService1.getStats(); + expect(stats1).toEqual({ previous: 0, current: 3 }); + const stats2 = await uniqueConnectionService2.getStats(); + expect(stats2).toEqual({ previous: 0, current: 3 }); +}); + +test('sync to existing previous bucket from another service', async () => { + const eventBus = new EventEmitter(); + const config = { + flagResolver: alwaysOnFlagResolver, + getLogger, + eventBus: eventBus, + }; + const uniqueConnectionStore = new FakeUniqueConnectionStore(); + const uniqueConnectionService1 = new UniqueConnectionService( + { uniqueConnectionStore }, + config, + ); + const uniqueConnectionService2 = new UniqueConnectionService( + { uniqueConnectionStore }, + config, + ); + + uniqueConnectionService1.count('connection1'); + uniqueConnectionService1.count('connection2'); + await uniqueConnectionService1.sync(() => addHours(new Date(), 1)); + + uniqueConnectionService2.count('connection1'); + uniqueConnectionService2.count('connection3'); + await uniqueConnectionService2.sync(() => addHours(new Date(), 1)); + + const stats1 = await uniqueConnectionService1.getStats(); + expect(stats1).toEqual({ previous: 3, current: 0 }); + const stats2 = await uniqueConnectionService2.getStats(); + expect(stats1).toEqual({ previous: 3, current: 0 }); +}); diff --git a/src/lib/features/unique-connection/unique-connection-service.ts b/src/lib/features/unique-connection/unique-connection-service.ts index 38636ec7d0e5..cd1822e2c54f 100644 --- a/src/lib/features/unique-connection/unique-connection-service.ts +++ b/src/lib/features/unique-connection/unique-connection-service.ts @@ -6,6 +6,8 @@ import HyperLogLog from 'hyperloglog-lite'; import type EventEmitter from 'events'; import { SDK_CONNECTION_ID_RECEIVED } from '../../metric-events'; +const n = 12; + export class UniqueConnectionService { private logger: Logger; @@ -17,13 +19,13 @@ export class UniqueConnectionService { private activeHour: number; - private hll = HyperLogLog(12); + private hll = HyperLogLog(n); constructor( { uniqueConnectionStore, }: Pick, - config: IUnleashConfig, + config: Pick, ) { this.uniqueConnectionStore = uniqueConnectionStore; this.logger = config.getLogger('services/unique-connection-service.ts'); @@ -36,18 +38,34 @@ export class UniqueConnectionService { this.eventBus.on(SDK_CONNECTION_ID_RECEIVED, this.count.bind(this)); } - async count(connectionId: string) { + count(connectionId: string) { if (!this.flagResolver.isEnabled('uniqueSdkTracking')) return; this.hll.add(HyperLogLog.hash(connectionId)); } - async sync(): Promise { + async getStats() { + const [previous, current] = await Promise.all([ + this.uniqueConnectionStore.get('previous'), + this.uniqueConnectionStore.get('current'), + ]); + const previousHll = HyperLogLog(n); + if (previous) { + previousHll.merge({ n, buckets: previous.hll }); + } + const currentHll = HyperLogLog(n); + if (current) { + currentHll.merge({ n, buckets: current.hll }); + } + return { previous: previousHll.count(), current: currentHll.count() }; + } + + async sync(clock = () => new Date()): Promise { if (!this.flagResolver.isEnabled('uniqueSdkTracking')) return; - const currentHour = new Date().getHours(); + const currentHour = clock().getHours(); const currentBucket = await this.uniqueConnectionStore.get('current'); if (this.activeHour !== currentHour && currentBucket) { if (currentBucket.updatedAt.getHours() < currentHour) { - this.hll.merge({ n: 12, buckets: currentBucket.hll }); + this.hll.merge({ n, buckets: currentBucket.hll }); await this.uniqueConnectionStore.insert({ hll: this.hll.output().buckets, id: 'previous', @@ -55,7 +73,9 @@ export class UniqueConnectionService { } else { const previousBucket = await this.uniqueConnectionStore.get('previous'); - this.hll.merge({ n: 12, buckets: previousBucket }); + if (previousBucket) { + this.hll.merge({ n, buckets: previousBucket.hll }); + } await this.uniqueConnectionStore.insert({ hll: this.hll.output().buckets, id: 'previous', @@ -63,10 +83,14 @@ export class UniqueConnectionService { } this.activeHour = currentHour; - this.hll = HyperLogLog(12); + this.hll = HyperLogLog(n); + await this.uniqueConnectionStore.insert({ + hll: this.hll.output().buckets, + id: 'current', + }); } else { if (currentBucket) { - this.hll.merge({ n: 12, buckets: currentBucket }); + this.hll.merge({ n, buckets: currentBucket.hll }); } await this.uniqueConnectionStore.insert({ hll: this.hll.output().buckets, diff --git a/src/lib/features/unique-connection/unique-connection-store-type.ts b/src/lib/features/unique-connection/unique-connection-store-type.ts index 0fb6f0d5c459..79ee2d55e273 100644 --- a/src/lib/features/unique-connection/unique-connection-store-type.ts +++ b/src/lib/features/unique-connection/unique-connection-store-type.ts @@ -11,4 +11,5 @@ export type TimedUniqueConnections = UniqueConnections & { export interface IUniqueConnectionStore { insert(uniqueConnections: UniqueConnections): Promise; get(id: 'current' | 'previous'): Promise; + deleteAll(): Promise; } diff --git a/src/lib/features/unique-connection/unique-connection-store.e2e.test.ts b/src/lib/features/unique-connection/unique-connection-store.e2e.test.ts new file mode 100644 index 000000000000..29fca9e6914e --- /dev/null +++ b/src/lib/features/unique-connection/unique-connection-store.e2e.test.ts @@ -0,0 +1,58 @@ +import dbInit, { type ITestDb } from '../../../test/e2e/helpers/database-init'; +import getLogger from '../../../test/fixtures/no-logger'; +import type { + IUniqueConnectionStore, + IUnleashStores, +} from '../../../lib/types'; +import HyperLogLog from 'hyperloglog-lite'; + +let stores: IUnleashStores; +let db: ITestDb; +let uniqueConnectionStore: IUniqueConnectionStore; + +beforeAll(async () => { + db = await dbInit('unique_connections_store', getLogger); + stores = db.stores; + uniqueConnectionStore = stores.uniqueConnectionStore; +}); + +afterAll(async () => { + await db.destroy(); +}); + +beforeEach(async () => { + await uniqueConnectionStore.deleteAll(); +}); + +test('should store empty HyperLogLog buffer', async () => { + const hll = HyperLogLog(12); + await uniqueConnectionStore.insert({ + id: 'current', + hll: hll.output().buckets, + }); + + const fetchedHll = await uniqueConnectionStore.get('current'); + hll.merge({ n: 12, buckets: fetchedHll!.hll }); + expect(hll.count()).toBe(0); +}); + +test('should store non empty HyperLogLog buffer', async () => { + const hll = HyperLogLog(12); + hll.add(HyperLogLog.hash('connection-1')); + hll.add(HyperLogLog.hash('connection-2')); + await uniqueConnectionStore.insert({ + id: 'current', + hll: hll.output().buckets, + }); + + const fetchedHll = await uniqueConnectionStore.get('current'); + const emptyHll = HyperLogLog(12); + emptyHll.merge({ n: 12, buckets: fetchedHll!.hll }); + expect(hll.count()).toBe(2); +}); + +test('should indicate when no entry', async () => { + const fetchedHll = await uniqueConnectionStore.get('current'); + + expect(fetchedHll).toBeNull(); +}); diff --git a/src/lib/features/unique-connection/unique-connection-store.ts b/src/lib/features/unique-connection/unique-connection-store.ts index dfd502584344..aa3eab7c5c2d 100644 --- a/src/lib/features/unique-connection/unique-connection-store.ts +++ b/src/lib/features/unique-connection/unique-connection-store.ts @@ -27,4 +27,8 @@ export class UniqueConnectionStore implements IUniqueConnectionStore { ? { id: row.id, hll: row.hll, updatedAt: row.updated_at } : null; } + + async deleteAll(): Promise { + await this.db('unique_connections').delete(); + } }