Skip to content

Commit

Permalink
Merge pull request #49 from dfds/feature/2471-make-version-part-of-co…
Browse files Browse the repository at this point in the history
…nfluent-cloud-schema-registry-registering-request

added schema version part of request to confluent cloud
  • Loading branch information
kralle333 authored Jan 11, 2024
2 parents 1764a4d + c0b6517 commit 1d7fc60
Show file tree
Hide file tree
Showing 10 changed files with 20 additions and 6 deletions.
4 changes: 4 additions & 0 deletions db/migrations/20240110130102_add_schema_version_column.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- 2024-01-10 13:01:02 : add schema version

ALTER TABLE schema_process
ADD COLUMN schema_version int NOT NULL DEFAULT 1;
3 changes: 3 additions & 0 deletions src/functional_tests/create_schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ func setupCreateSchemaHttpMock(processInput schema.ProcessInput, topicName strin
type schemaPayload struct {
SchemaType string `json:"schemaType"`
Schema string `json:"schema"`
Version int32 `json:"version"`
}
payload, err := json.Marshal(schemaPayload{
SchemaType: "JSON",
Schema: processInput.Schema,
Version: processInput.SchemaVersion,
})
if err != nil {
panic(err)
Expand Down Expand Up @@ -100,6 +102,7 @@ func TestCreateSchemaProcess(t *testing.T) {
MessageType: "message-type-for-schema-registry",
Description: "schema-description",
Schema: "test-schema",
SchemaVersion: 1,
}

setupCreateSchemaHttpMock(input, createSchemaVariables.TopicName, testerApp.dbSeedVariables)
Expand Down
4 changes: 3 additions & 1 deletion src/internal/confluent/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,11 +461,12 @@ func (c *Client) delete(ctx context.Context, url string, apiKey models.ApiKey) (
}

type schemaPayload struct {
Version int32 `json:"version"`
SchemaType string `json:"schemaType"`
Schema string `json:"schema"`
}

func (c *Client) RegisterSchema(ctx context.Context, clusterId models.ClusterId, subject string, schema string) error {
func (c *Client) RegisterSchema(ctx context.Context, clusterId models.ClusterId, subject string, schema string, version int32) error {
cluster, err := c.clusters.Get(clusterId)

if err != nil {
Expand All @@ -479,6 +480,7 @@ func (c *Client) RegisterSchema(ctx context.Context, clusterId models.ClusterId,
url := fmt.Sprintf("%s/subjects/%s/versions", cluster.SchemaRegistryApiEndpoint, subject)

payload, err := json.Marshal(schemaPayload{
Version: version,
SchemaType: "JSON",
Schema: schema,
})
Expand Down
4 changes: 3 additions & 1 deletion src/internal/models/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ type SchemaProcess struct {
Schema string
CreatedAt time.Time
CompletedAt *time.Time
SchemaVersion int32
}

func NewSchemaProcess(clusterId ClusterId, messageContractId string, topicId string, messageType string, description string, subject string, schema string) *SchemaProcess {
func NewSchemaProcess(clusterId ClusterId, messageContractId string, topicId string, messageType string, description string, subject string, schema string, schemaVersion int32) *SchemaProcess {
return &SchemaProcess{
Id: uuid.NewV4(),
ClusterId: clusterId,
Expand All @@ -30,6 +31,7 @@ func NewSchemaProcess(clusterId ClusterId, messageContractId string, topicId str
Schema: schema,
CreatedAt: time.Now(),
CompletedAt: nil,
SchemaVersion: schemaVersion,
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/internal/schema/confluent.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ type Confluent interface {
CreateServiceAccountRoleBinding(ctx context.Context, serviceAccount models.ServiceAccountId, clusterId models.ClusterId) error
CountSchemaRegistryApiKeys(ctx context.Context, clusterAccess models.ServiceAccountId, clusterId models.ClusterId) (int, error)
DeleteSchemaRegistryApiKey(ctx context.Context, clusterId models.ClusterId, serviceAccountId models.ServiceAccountId) error
RegisterSchema(ctx context.Context, clusterId models.ClusterId, subject string, schema string) error
RegisterSchema(ctx context.Context, clusterId models.ClusterId, subject string, schema string, version int32) error
}
2 changes: 1 addition & 1 deletion src/internal/schema/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (c *StepContext) IsCompleted() bool {
}

func (c *StepContext) RegisterSchema() error {
return c.registry.RegisterSchema(c.ctx, c.state.ClusterId, c.state.Subject, c.state.Schema)
return c.registry.RegisterSchema(c.ctx, c.state.ClusterId, c.state.Subject, c.state.Schema, c.state.SchemaVersion)
}

func (c *StepContext) MarkAsCompleted() {
Expand Down
1 change: 1 addition & 0 deletions src/internal/schema/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func (h *handler) Handle(ctx context.Context, msgContext messaging.MessageContex
MessageType: message.MessageType,
Description: message.Description,
Schema: message.Schema,
SchemaVersion: message.SchemaVersion,
}
return h.process.Process(ctx, input)

Expand Down
1 change: 1 addition & 0 deletions src/internal/schema/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ type MessageContractRequested struct {
MessageType string `json:"messageType"`
Description string `json:"description"`
Schema string `json:"schema"`
SchemaVersion int32 `json:"schemaVersion"`
}

type SchemaRegistered struct {
Expand Down
3 changes: 2 additions & 1 deletion src/internal/schema/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type ProcessInput struct {
MessageType string
Description string
Schema string
SchemaVersion int32
}

func (p *process) Process(ctx context.Context, input ProcessInput) error {
Expand Down Expand Up @@ -123,7 +124,7 @@ func getOrCreateProcessState(repo schemaRepository, input ProcessInput, topic *m
}

subject := fmt.Sprintf("%s-%s", topic.Name, input.MessageType)
schema = models.NewSchemaProcess(topic.ClusterId, input.MessageContractId, input.TopicId, input.MessageType, input.Description, subject, input.Schema)
schema = models.NewSchemaProcess(topic.ClusterId, input.MessageContractId, input.TopicId, input.MessageType, input.Description, subject, input.Schema, input.SchemaVersion)

if err := repo.SaveSchemaProcessState(schema); err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion src/internal/schema/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ import (
)

type SchemaRegistry interface {
RegisterSchema(ctx context.Context, clusterId models.ClusterId, subject string, schema string) error
RegisterSchema(ctx context.Context, clusterId models.ClusterId, subject string, schema string, version int32) error
}

0 comments on commit 1d7fc60

Please sign in to comment.