Skip to content

Commit

Permalink
feat: improve logic for syncing counts
Browse files Browse the repository at this point in the history
  • Loading branch information
kwasniew committed Jan 10, 2025
1 parent 7f897b1 commit 87c1e09
Show file tree
Hide file tree
Showing 6 changed files with 242 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,8 @@ export class FakeUniqueConnectionStore implements IUniqueConnectionStore {
): Promise<(UniqueConnections & { updatedAt: Date }) | null> {
return this.uniqueConnectionsRecord[id] || null;
}

async deleteAll(): Promise<void> {
this.uniqueConnectionsRecord = {};
}
}
142 changes: 142 additions & 0 deletions src/lib/features/unique-connection/unique-connection-service.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
import { UniqueConnectionService } from './unique-connection-service';
import { FakeUniqueConnectionStore } from './fake-unique-connection-store';
import getLogger from '../../../test/fixtures/no-logger';
import type { IFlagResolver } from '../../types';
import { SDK_CONNECTION_ID_RECEIVED } from '../../metric-events';
import { addHours } from 'date-fns';
import { EventEmitter } from 'events';

const alwaysOnFlagResolver = {
isEnabled() {
return true;
},
} as unknown as IFlagResolver;

test('sync first current bucket', async () => {
const eventBus = new EventEmitter();
const config = { flagResolver: alwaysOnFlagResolver, getLogger, eventBus };
const uniqueConnectionStore = new FakeUniqueConnectionStore();
const uniqueConnectionService = new UniqueConnectionService(
{ uniqueConnectionStore },
config,
);
uniqueConnectionService.listen();

eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection1');
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection1');
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection2');
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection2');
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection2');

await uniqueConnectionService.sync();

const stats = await uniqueConnectionService.getStats();
expect(stats).toEqual({ previous: 0, current: 2 });
});

test('sync first previous bucket', async () => {
const eventBus = new EventEmitter();
const config = { flagResolver: alwaysOnFlagResolver, getLogger, eventBus };
const uniqueConnectionStore = new FakeUniqueConnectionStore();
const uniqueConnectionService = new UniqueConnectionService(
{ uniqueConnectionStore },
config,
);
uniqueConnectionService.listen();

eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection1');
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection2');

await uniqueConnectionService.sync();

eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection3');

await uniqueConnectionService.sync(() => addHours(new Date(), 1));

const stats = await uniqueConnectionService.getStats();
expect(stats).toEqual({ previous: 3, current: 0 });
});

test('sync to existing current bucket from the same service', async () => {
const eventBus = new EventEmitter();
const config = { flagResolver: alwaysOnFlagResolver, getLogger, eventBus };
const uniqueConnectionStore = new FakeUniqueConnectionStore();
const uniqueConnectionService = new UniqueConnectionService(
{ uniqueConnectionStore },
config,
);
uniqueConnectionService.listen();

eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection1');
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection2');

await uniqueConnectionService.sync();

eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection1');
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection3');

const stats = await uniqueConnectionService.getStats();
expect(stats).toEqual({ previous: 0, current: 3 });
});

test('sync to existing current bucket from another service', async () => {
const eventBus = new EventEmitter();
const config = {
flagResolver: alwaysOnFlagResolver,
getLogger,
eventBus: eventBus,
};
const uniqueConnectionStore = new FakeUniqueConnectionStore();
const uniqueConnectionService1 = new UniqueConnectionService(
{ uniqueConnectionStore },
config,
);
const uniqueConnectionService2 = new UniqueConnectionService(
{ uniqueConnectionStore },
config,
);

uniqueConnectionService1.count('connection1');
uniqueConnectionService1.count('connection2');
await uniqueConnectionService1.sync();

uniqueConnectionService2.count('connection1');
uniqueConnectionService2.count('connection3');
await uniqueConnectionService2.sync();

const stats1 = await uniqueConnectionService1.getStats();
expect(stats1).toEqual({ previous: 0, current: 3 });
const stats2 = await uniqueConnectionService2.getStats();
expect(stats2).toEqual({ previous: 0, current: 3 });
});

test('sync to existing previous bucket from another service', async () => {
const eventBus = new EventEmitter();
const config = {
flagResolver: alwaysOnFlagResolver,
getLogger,
eventBus: eventBus,
};
const uniqueConnectionStore = new FakeUniqueConnectionStore();
const uniqueConnectionService1 = new UniqueConnectionService(
{ uniqueConnectionStore },
config,
);
const uniqueConnectionService2 = new UniqueConnectionService(
{ uniqueConnectionStore },
config,
);

uniqueConnectionService1.count('connection1');
uniqueConnectionService1.count('connection2');
await uniqueConnectionService1.sync(() => addHours(new Date(), 1));

uniqueConnectionService2.count('connection1');
uniqueConnectionService2.count('connection3');
await uniqueConnectionService2.sync(() => addHours(new Date(), 1));

const stats1 = await uniqueConnectionService1.getStats();
expect(stats1).toEqual({ previous: 3, current: 0 });
const stats2 = await uniqueConnectionService2.getStats();
expect(stats1).toEqual({ previous: 3, current: 0 });
});
42 changes: 33 additions & 9 deletions src/lib/features/unique-connection/unique-connection-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import HyperLogLog from 'hyperloglog-lite';
import type EventEmitter from 'events';
import { SDK_CONNECTION_ID_RECEIVED } from '../../metric-events';

const n = 12;

export class UniqueConnectionService {
private logger: Logger;

Expand All @@ -17,13 +19,13 @@ export class UniqueConnectionService {

private activeHour: number;

private hll = HyperLogLog(12);
private hll = HyperLogLog(n);

constructor(
{
uniqueConnectionStore,
}: Pick<IUnleashStores, 'uniqueConnectionStore'>,
config: IUnleashConfig,
config: Pick<IUnleashConfig, 'getLogger' | 'flagResolver' | 'eventBus'>,
) {
this.uniqueConnectionStore = uniqueConnectionStore;
this.logger = config.getLogger('services/unique-connection-service.ts');
Expand All @@ -36,37 +38,59 @@ export class UniqueConnectionService {
this.eventBus.on(SDK_CONNECTION_ID_RECEIVED, this.count.bind(this));
}

async count(connectionId: string) {
count(connectionId: string) {
if (!this.flagResolver.isEnabled('uniqueSdkTracking')) return;
this.hll.add(HyperLogLog.hash(connectionId));
}

async sync(): Promise<void> {
async getStats() {
const [previous, current] = await Promise.all([
this.uniqueConnectionStore.get('previous'),
this.uniqueConnectionStore.get('current'),
]);
const previousHll = HyperLogLog(n);
if (previous) {
previousHll.merge({ n, buckets: previous.hll });
}
const currentHll = HyperLogLog(n);
if (current) {
currentHll.merge({ n, buckets: current.hll });
}
return { previous: previousHll.count(), current: currentHll.count() };
}

async sync(clock = () => new Date()): Promise<void> {
if (!this.flagResolver.isEnabled('uniqueSdkTracking')) return;
const currentHour = new Date().getHours();
const currentHour = clock().getHours();
const currentBucket = await this.uniqueConnectionStore.get('current');
if (this.activeHour !== currentHour && currentBucket) {
if (currentBucket.updatedAt.getHours() < currentHour) {
this.hll.merge({ n: 12, buckets: currentBucket.hll });
this.hll.merge({ n, buckets: currentBucket.hll });
await this.uniqueConnectionStore.insert({
hll: this.hll.output().buckets,
id: 'previous',
});
} else {
const previousBucket =
await this.uniqueConnectionStore.get('previous');
this.hll.merge({ n: 12, buckets: previousBucket });
if (previousBucket) {
this.hll.merge({ n, buckets: previousBucket.hll });
}
await this.uniqueConnectionStore.insert({
hll: this.hll.output().buckets,
id: 'previous',
});
}
this.activeHour = currentHour;

this.hll = HyperLogLog(12);
this.hll = HyperLogLog(n);
await this.uniqueConnectionStore.insert({
hll: this.hll.output().buckets,
id: 'current',
});
} else {
if (currentBucket) {
this.hll.merge({ n: 12, buckets: currentBucket });
this.hll.merge({ n, buckets: currentBucket.hll });
}
await this.uniqueConnectionStore.insert({
hll: this.hll.output().buckets,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ export type TimedUniqueConnections = UniqueConnections & {
export interface IUniqueConnectionStore {
insert(uniqueConnections: UniqueConnections): Promise<void>;
get(id: 'current' | 'previous'): Promise<TimedUniqueConnections | null>;
deleteAll(): Promise<void>;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import dbInit, { type ITestDb } from '../../../test/e2e/helpers/database-init';
import getLogger from '../../../test/fixtures/no-logger';
import type {
IUniqueConnectionStore,
IUnleashStores,
} from '../../../lib/types';
import HyperLogLog from 'hyperloglog-lite';

let stores: IUnleashStores;
let db: ITestDb;
let uniqueConnectionStore: IUniqueConnectionStore;

beforeAll(async () => {
db = await dbInit('unique_connections_store', getLogger);
stores = db.stores;
uniqueConnectionStore = stores.uniqueConnectionStore;
});

afterAll(async () => {
await db.destroy();
});

beforeEach(async () => {
await uniqueConnectionStore.deleteAll();
});

test('should store empty HyperLogLog buffer', async () => {
const hll = HyperLogLog(12);
await uniqueConnectionStore.insert({
id: 'current',
hll: hll.output().buckets,
});

const fetchedHll = await uniqueConnectionStore.get('current');
hll.merge({ n: 12, buckets: fetchedHll!.hll });
expect(hll.count()).toBe(0);
});

test('should store non empty HyperLogLog buffer', async () => {
const hll = HyperLogLog(12);
hll.add(HyperLogLog.hash('connection-1'));
hll.add(HyperLogLog.hash('connection-2'));
await uniqueConnectionStore.insert({
id: 'current',
hll: hll.output().buckets,
});

const fetchedHll = await uniqueConnectionStore.get('current');
const emptyHll = HyperLogLog(12);
emptyHll.merge({ n: 12, buckets: fetchedHll!.hll });
expect(hll.count()).toBe(2);
});

test('should indicate when no entry', async () => {
const fetchedHll = await uniqueConnectionStore.get('current');

expect(fetchedHll).toBeNull();
});
4 changes: 4 additions & 0 deletions src/lib/features/unique-connection/unique-connection-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,8 @@ export class UniqueConnectionStore implements IUniqueConnectionStore {
? { id: row.id, hll: row.hll, updatedAt: row.updated_at }
: null;
}

async deleteAll(): Promise<void> {
await this.db('unique_connections').delete();
}
}

0 comments on commit 87c1e09

Please sign in to comment.