Skip to content

Commit

Permalink
Functional Kysely adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
Papooch committed Jan 29, 2024
1 parent ddf3a9b commit 94da0fd
Show file tree
Hide file tree
Showing 5 changed files with 337 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,21 @@
},
"peerDependencies": {
"@nestjs-cls/transactional": "workspace:^2.0.0",
"nestjs-cls": "workspace:^4.0.1",
"kysely": "^0.27"
"kysely": "^0.27",
"nestjs-cls": "workspace:^4.0.1"
},
"devDependencies": {
"@nestjs/cli": "^10.0.2",
"@nestjs/common": "^10.0.0",
"@nestjs/core": "^10.0.0",
"@nestjs/testing": "^10.0.0",
"@types/better-sqlite3": "^7.6.9",
"@types/jest": "^28.1.2",
"@types/node": "^18.0.0",
"better-sqlite3": "^9.3.0",
"@types/pg": "^8",
"jest": "^28.1.1",
"kysely": "^0.27.2",
"pg": "^8.11.3",
"reflect-metadata": "^0.1.13",
"rimraf": "^3.0.2",
"rxjs": "^7.5.5",
Expand All @@ -69,4 +71,4 @@
"tsconfig-paths": "^4.0.0",
"typescript": "~4.8.0"
}
}
}
Original file line number Diff line number Diff line change
@@ -1,33 +1,43 @@
import { TransactionalAdapter } from '@nestjs-cls/transactional';
import { Knex } from 'knex';
import { Kysely, TransactionBuilder } from 'kysely';

export interface KnexTransactionalAdapterOptions {
export interface KyselyTransactionalAdapterOptions {
/**
* The injection token for the Knex instance.
* The injection token for the Kysely instance.
*/
knexInstanceToken: any;
kyselyInstanceToken: any;
}

export class TransactionalAdapterKnex
implements TransactionalAdapter<Knex, Knex, Knex.TransactionConfig>
export interface KyselyTransactionOptions {
isolationLevel?: Parameters<
TransactionBuilder<any>['setIsolationLevel']
>[0];
}

export class TransactionalAdapterKysely<DB = any>
implements TransactionalAdapter<Kysely<DB>, Kysely<DB>, any>
{
connectionToken: any;

constructor(options: KnexTransactionalAdapterOptions) {
this.connectionToken = options.knexInstanceToken;
constructor(options: KyselyTransactionalAdapterOptions) {
this.connectionToken = options.kyselyInstanceToken;
}

optionsFactory = (knexInstance: Knex) => ({
optionsFactory = (kyselyDb: Kysely<DB>) => ({
wrapWithTransaction: async (
options: Knex.TransactionConfig,
options: KyselyTransactionOptions,
fn: (...args: any[]) => Promise<any>,
setClient: (client?: Knex) => void,
setClient: (client?: Kysely<DB>) => void,
) => {
return knexInstance.transaction((trx) => {
const transaction = kyselyDb.transaction();
if (options?.isolationLevel) {
transaction.setIsolationLevel(options.isolationLevel);
}
return transaction.execute(async (trx) => {
setClient(trx);
return fn();
}, options);
});
},
getFallbackInstance: () => knexInstance,
getFallbackInstance: () => kyselyDb,
});
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
services:
db:
kysely-test-db:
image: postgres:15
ports:
- 5432:5432
- 5445:5432
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: postgres

healthcheck:
test: ['CMD-SHELL', 'pg_isready -U postgres']
interval: 1s
timeout: 1s
retries: 5
Original file line number Diff line number Diff line change
Expand Up @@ -5,38 +5,62 @@ import {
} from '@nestjs-cls/transactional';
import { Inject, Injectable, Module } from '@nestjs/common';
import { Test, TestingModule } from '@nestjs/testing';
import { execSync } from 'child_process';
import { Generated, Kysely, PostgresDialect } from 'kysely';
import { ClsModule } from 'nestjs-cls';
import Knex from 'knex';
import { TransactionalAdapterKnex } from '../src';
import { Pool } from 'pg';
import { TransactionalAdapterKysely } from '../src';

const KNEX = 'KNEX';
const KYSELY = 'KYSELY';

interface Database {
user: User;
}

interface User {
id: Generated<number>;
name: string;
email: string;
}

@Injectable()
class UserRepository {
constructor(
private readonly txHost: TransactionHost<TransactionalAdapterKnex>,
private readonly txHost: TransactionHost<
TransactionalAdapterKysely<Database>
>,
) {}

async getUserById(id: number) {
return this.txHost.tx('user').where({ id }).first();
return this.txHost.tx
.selectFrom('user')
.where('id', '=', id)
.selectAll()
.executeTakeFirst();
}

async createUser(name: string) {
const created = await this.txHost
.tx('user')
.insert({ name: name, email: `${name}@email.com` })
.returning('*');
return created[0] ?? null;
const created = await this.txHost.tx
.insertInto('user')
.values({
name: name,
email: `${name}@email.com`,
})
.returningAll()
.executeTakeFirstOrThrow();
return created;
}
}

@Injectable()
class UserService {
constructor(
private readonly userRepository: UserRepository,
private readonly txHost: TransactionHost<TransactionalAdapterKnex>,
@Inject(KNEX)
private readonly knex: Knex.Knex,
private readonly txHost: TransactionHost<
TransactionalAdapterKysely<Database>
>,
@Inject(KYSELY)
private readonly kysely: Kysely<Database>,
) {}

@Transactional()
Expand All @@ -46,13 +70,16 @@ class UserService {
return { r1, r2 };
}

@Transactional<TransactionalAdapterKnex>({
@Transactional<TransactionalAdapterKysely>({
isolationLevel: 'serializable',
})
async transactionWithDecoratorWithOptions() {
const r1 = await this.userRepository.createUser('James');
const r2 =
(await this.knex('user').where({ id: r1.id }).first()) ?? null;
const r2 = await this.kysely
.selectFrom('user')
.where('id', '=', r1.id)
.selectAll()
.executeTakeFirst();
const r3 = await this.userRepository.getUserById(r1.id);
return { r1, r2, r3 };
}
Expand All @@ -64,9 +91,11 @@ class UserService {
},
async () => {
const r1 = await this.userRepository.createUser('Joe');
const r2 =
(await this.knex('user').where({ id: r1.id }).first()) ??
null;
const r2 = await this.kysely
.selectFrom('user')
.where('id', '=', r1.id)
.selectAll()
.executeTakeFirst();
const r3 = await this.userRepository.getUserById(r1.id);
return { r1, r2, r3 };
},
Expand All @@ -80,23 +109,23 @@ class UserService {
}
}

const knex = Knex({
client: 'sqlite',
connection: {
filename: 'test.db',
},
useNullAsDefault: true,
pool: { min: 1, max: 2 },
const kyselyDb = new Kysely<Database>({
dialect: new PostgresDialect({
pool: new Pool({
connectionString: 'postgres://postgres:postgres@localhost:5445',
max: 2,
}),
}),
});

@Module({
providers: [
{
provide: KNEX,
useValue: knex,
provide: KYSELY,
useValue: kyselyDb,
},
],
exports: [KNEX],
exports: [KYSELY],
})
class KnexModule {}

Expand All @@ -107,8 +136,8 @@ class KnexModule {}
plugins: [
new ClsPluginTransactional({
imports: [KnexModule],
adapter: new TransactionalAdapterKnex({
knexInstanceToken: KNEX,
adapter: new TransactionalAdapterKysely({
kyselyInstanceToken: KYSELY,
}),
}),
],
Expand All @@ -123,12 +152,17 @@ describe('Transactional', () => {
let callingService: UserService;

beforeAll(async () => {
await knex.schema.dropTableIfExists('user');
await knex.schema.createTable('user', (table) => {
table.increments('id');
table.string('name');
table.string('email');
execSync('docker compose -f test/docker-compose.yml up -d --wait', {
stdio: 'inherit',
cwd: process.cwd(),
});
await kyselyDb.schema.dropTable('user').ifExists().execute();
await kyselyDb.schema
.createTable('user')
.addColumn('id', 'serial', (column) => column.primaryKey())
.addColumn('name', 'varchar', (column) => column.notNull())
.addColumn('email', 'varchar', (column) => column.notNull())
.execute();
});

beforeEach(async () => {
Expand All @@ -140,39 +174,55 @@ describe('Transactional', () => {
});

afterAll(async () => {
await knex.destroy();
await kyselyDb.destroy();
execSync('docker compose -f test/docker-compose.yml down', {
stdio: 'inherit',
cwd: process.cwd(),
});
});

describe('TransactionalAdapterKnex', () => {
describe('TransactionalAdapterKysely', () => {
it('should run a transaction with the default options with a decorator', async () => {
const { r1, r2 } = await callingService.transactionWithDecorator();
expect(r1).toEqual(r2);
const users = await knex('user');
const users = await kyselyDb
.selectFrom('user')
.selectAll()
.execute();
expect(users).toEqual(expect.arrayContaining([r1]));
});

it('should run a transaction with the specified options with a decorator', async () => {
const { r1, r2, r3 } =
await callingService.transactionWithDecoratorWithOptions();
expect(r1).toEqual(r3);
expect(r2).toBeNull();
const users = await knex('user');
expect(r2).toBeUndefined();
const users = await kyselyDb
.selectFrom('user')
.selectAll()
.execute();
expect(users).toEqual(expect.arrayContaining([r1]));
});
it('should run a transaction with the specified options with a function wrapper', async () => {
const { r1, r2, r3 } =
await callingService.transactionWithFunctionWrapper();
expect(r1).toEqual(r3);
expect(r2).toBeNull();
const users = await knex('user');
expect(r2).toBeUndefined();
const users = await kyselyDb
.selectFrom('user')
.selectAll()
.execute();
expect(users).toEqual(expect.arrayContaining([r1]));
});

it('should rollback a transaction on error', async () => {
await expect(
callingService.transactionWithDecoratorError(),
).rejects.toThrow(new Error('Rollback'));
const users = await knex('user');
const users = await kyselyDb
.selectFrom('user')
.selectAll()
.execute();
expect(users).toEqual(
expect.not.arrayContaining([{ name: 'Nobody' }]),
);
Expand Down
Loading

0 comments on commit 94da0fd

Please sign in to comment.