Skip to content

Commit

Permalink
fix(ISchemaRegistryHttpClient): Make client return object that better…
Browse files Browse the repository at this point in the history
… reflects api return type
  • Loading branch information
axlj45 committed Dec 14, 2018
1 parent 0022123 commit 8a632b5
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 17 deletions.
18 changes: 8 additions & 10 deletions src/AvroSchemaRegistryClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,22 @@ export class AvroSchemaRegistryClient implements ISchemaRegistryClient {
) { }

public async encodeBySubject<Tin>(obj: Tin, subject: string): Promise<Buffer> {
const registrySchema = await this.client.getLatestSchemaBySubject(subject);
const schema = JSON.stringify(registrySchema);
const buffer = this.serializer.serialize(obj, schema);
return buffer;
const schemaInfo = await this.client.getLatestSchemaInfoBySubject(subject);
const buffer = this.serializer.serialize(obj, schemaInfo.schema);
const avroResult = this.encoder.encodeAvroBuffer({ buffer, schemaRegistryId: schemaInfo.id, versionByte: 0 });
return avroResult.buffer || Buffer.alloc(0);
}

public async encodeById<Tin>(obj: Tin, id: number): Promise<Buffer> {
const registrySchema = await this.client.getSchemaById(id);
const schema = JSON.stringify(registrySchema);
const buffer = this.serializer.serialize(obj, schema);
const schemaRequest = await this.client.getSchemaById(id);
const buffer = this.serializer.serialize(obj, JSON.parse(schemaRequest.schema));
return buffer;
}

public async decode<Tout>(buffer: Buffer): Promise<Tout> {
const encoding = this.encoder.decodeAvroBuffer(buffer);
const registrySchema = await this.client.getSchemaById(encoding.schemaRegistryId);
const schema = JSON.stringify(registrySchema);
const obj = this.serializer.deserialize<Tout>(buffer, schema);
const schemaRequest = await this.client.getSchemaById(encoding.schemaRegistryId);
const obj = this.serializer.deserialize<Tout>(encoding.buffer, JSON.parse(schemaRequest.schema));
return obj;
}
}
3 changes: 2 additions & 1 deletion src/schema-registry-http-client/ISchemaRegistryHttpClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ import { CompatibilityType, IConfigurationResult, ISchemaResult } from './models
import { ISchemaRequest } from './models/ISchemaRequest';

export interface ISchemaRegistryHttpClient {
getSchemaById(id: number): Promise<string>
getSchemaById(id: number): Promise<ISchemaRequest>
getSubjects(): Promise<string[]>;
getSubjectVersions(subjectName: string): Promise<number[]>; // List of subject versions
deleteSubject(subjectName: string): Promise<number[]>; // List of deleted subject versions
deleteSchemaBySubjectVersion(subjectName: string, versionIdentifier: number): Promise<number> // Version ID of deleted schema
getSchemaInfoBySubjectVersion(subjectName: string, versionIdentifier: number): Promise<ISchemaResult>;
getLatestSchemaInfoBySubject(subjectName: string): Promise<ISchemaResult>;
getSchemaBySubjectVersion(subjectName: string, versionIdentifier: number | string): Promise<object>;
getLatestSchemaBySubject(subjectName: string): Promise<object>;
createSchema(subjectName: string, schema: ISchemaRequest): Promise<ISchemaResult>;
Expand Down
12 changes: 8 additions & 4 deletions src/schema-registry-http-client/SchemaRegistryHttpClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { ISchemaRegistryHttpClient } from './ISchemaRegistryHttpClient';
import { CompatibilityType, IConfigurationResult, ISchemaRegistryError, ISchemaResult } from './models';
import { ISchemaRequest } from './models/ISchemaRequest';

export class SchemaRegistryClient implements ISchemaRegistryHttpClient {
export class SchemaRegistryHttpClient implements ISchemaRegistryHttpClient {
constructor(private httpClient: IHttpClient) {
httpClient.addHeader("Accept", "application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, application/json");
}
Expand All @@ -25,11 +25,11 @@ export class SchemaRegistryClient implements ISchemaRegistryHttpClient {
return this.isCompatible(targetSubjectName, 'latest', sourceSchema);
}

public async getSchemaById(id: number): Promise<string> {
public async getSchemaById(id: number): Promise<ISchemaRequest> {
const resource = `/schemas/ids/${id}`

try {
const result = await this.httpClient.get<string>(resource);
const result = await this.httpClient.get<ISchemaRequest>(resource);
return result.data;
}
catch (ex) {
Expand Down Expand Up @@ -84,7 +84,7 @@ export class SchemaRegistryClient implements ISchemaRegistryHttpClient {
}
}

public async getSchemaInfoBySubjectVersion(subjectName: string, versionIdentifier: number): Promise<ISchemaResult> {
public async getSchemaInfoBySubjectVersion(subjectName: string, versionIdentifier: number | string): Promise<ISchemaResult> {
const resource = `/subjects/${subjectName}/versions/${versionIdentifier}`;

try {
Expand All @@ -96,6 +96,10 @@ export class SchemaRegistryClient implements ISchemaRegistryHttpClient {
}
}

public async getLatestSchemaInfoBySubject(subjectName: string): Promise<ISchemaResult> {
return this.getSchemaInfoBySubjectVersion(subjectName, 'latest');
}

public async getSchemaBySubjectVersion(subjectName: string, versionIdentifier: number | string): Promise<object> {
const resource = `/subjects/${subjectName}/versions/${versionIdentifier}/schema`;

Expand Down
2 changes: 1 addition & 1 deletion src/serialization/AvroSerializer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { ISerializer } from './ISerializer';

export class AvroSerializer implements ISerializer {
public serialize<Tin>(obj: Tin, schema: string): Buffer {
const avro = Type.forSchema(schema);
const avro = Type.forSchema(JSON.parse(schema));
return avro.toBuffer(obj);
}

Expand Down
2 changes: 1 addition & 1 deletion src/serialization/ISchemaRegistryEncoding.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
export default interface ISchemaRegistryEncoding {
versionByte: number;
schemaRegistryId: number;
buffer?: Buffer;
buffer: Buffer;
}

0 comments on commit 8a632b5

Please sign in to comment.