Skip to content

Commit

Permalink
feat:KafkaSource supports KafkaVersion modification (numaproj#2191)
Browse files Browse the repository at this point in the history
Signed-off-by: majiantao <772369024@qq.com>
Signed-off-by: Vigith Maurice <vigith@gmail.com>
Co-authored-by: Vigith Maurice <vigith@gmail.com>
  • Loading branch information
qianbeibuzui and vigith authored Oct 28, 2024
1 parent 5b77782 commit eca3b0c
Show file tree
Hide file tree
Showing 14 changed files with 601 additions and 504 deletions.
3 changes: 3 additions & 0 deletions api/json-schema/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -20899,6 +20899,9 @@
"consumerGroup": {
"type": "string"
},
"kafkaVersion": {
"type": "string"
},
"sasl": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.SASL",
"description": "SASL user to configure SASL connection for kafka broker SASL.enable=true default for SASL."
Expand Down
3 changes: 3 additions & 0 deletions api/openapi-spec/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -20898,6 +20898,9 @@
"consumerGroup": {
"type": "string"
},
"kafkaVersion": {
"type": "string"
},
"sasl": {
"description": "SASL user to configure SASL connection for kafka broker SASL.enable=true default for SASL.",
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.SASL"
Expand Down
2 changes: 2 additions & 0 deletions config/base/crds/full/numaflow.numaproj.io_monovertices.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4651,6 +4651,8 @@ spec:
type: string
consumerGroup:
type: string
kafkaVersion:
type: string
sasl:
properties:
gssapi:
Expand Down
2 changes: 2 additions & 0 deletions config/base/crds/full/numaflow.numaproj.io_pipelines.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9331,6 +9331,8 @@ spec:
type: string
consumerGroup:
type: string
kafkaVersion:
type: string
sasl:
properties:
gssapi:
Expand Down
2 changes: 2 additions & 0 deletions config/base/crds/full/numaflow.numaproj.io_vertices.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4119,6 +4119,8 @@ spec:
type: string
consumerGroup:
type: string
kafkaVersion:
type: string
sasl:
properties:
gssapi:
Expand Down
6 changes: 6 additions & 0 deletions config/install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7837,6 +7837,8 @@ spec:
type: string
consumerGroup:
type: string
kafkaVersion:
type: string
sasl:
properties:
gssapi:
Expand Down Expand Up @@ -19066,6 +19068,8 @@ spec:
type: string
consumerGroup:
type: string
kafkaVersion:
type: string
sasl:
properties:
gssapi:
Expand Down Expand Up @@ -25474,6 +25478,8 @@ spec:
type: string
consumerGroup:
type: string
kafkaVersion:
type: string
sasl:
properties:
gssapi:
Expand Down
6 changes: 6 additions & 0 deletions config/namespace-install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7837,6 +7837,8 @@ spec:
type: string
consumerGroup:
type: string
kafkaVersion:
type: string
sasl:
properties:
gssapi:
Expand Down Expand Up @@ -19066,6 +19068,8 @@ spec:
type: string
consumerGroup:
type: string
kafkaVersion:
type: string
sasl:
properties:
gssapi:
Expand Down Expand Up @@ -25474,6 +25478,8 @@ spec:
type: string
consumerGroup:
type: string
kafkaVersion:
type: string
sasl:
properties:
gssapi:
Expand Down
13 changes: 13 additions & 0 deletions docs/APIs.md
Original file line number Diff line number Diff line change
Expand Up @@ -5439,6 +5439,19 @@ default for SASL.

</tr>

<tr>

<td>

<code>kafkaVersion</code></br> <em> string </em>
</td>

<td>

</td>

</tr>

</tbody>

</table>
Expand Down
1,047 changes: 544 additions & 503 deletions pkg/apis/numaflow/v1alpha1/generated.pb.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions pkg/apis/numaflow/v1alpha1/generated.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion pkg/apis/numaflow/v1alpha1/kafka_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,6 @@ type KafkaSource struct {
// SASL user to configure SASL connection for kafka broker
// SASL.enable=true default for SASL.
// +optional
SASL *SASL `json:"sasl" protobuf:"bytes,6,opt,name=sasl"`
SASL *SASL `json:"sasl" protobuf:"bytes,6,opt,name=sasl"`
KafkaVersion string `json:"kafkaVersion,omitempty" protobuf:"bytes,7,opt,name=kafkaVersion"`
}
6 changes: 6 additions & 0 deletions pkg/apis/numaflow/v1alpha1/zz_generated.openapi.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions pkg/sources/kafka/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,13 @@ func NewKafkaSource(ctx context.Context, vertexInstance *dfv1.VertexInstance, ha
config.Net.SASL = *sasl
}
}
if v := source.KafkaVersion; v != "" {
if version, err := sarama.ParseKafkaVersion(source.KafkaVersion); err != nil {
return nil, err
} else {
config.Version = version
}
}

sarama.Logger = zap.NewStdLog(ks.logger.Desugar())

Expand Down
3 changes: 3 additions & 0 deletions rust/numaflow-models/src/models/kafka_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ pub struct KafkaSource {
pub config: Option<String>,
#[serde(rename = "consumerGroup", skip_serializing_if = "Option::is_none")]
pub consumer_group: Option<String>,
#[serde(rename = "kafkaVersion", skip_serializing_if = "Option::is_none")]
pub kafka_version: Option<String>,
#[serde(rename = "sasl", skip_serializing_if = "Option::is_none")]
pub sasl: Option<Box<crate::models::Sasl>>,
#[serde(rename = "tls", skip_serializing_if = "Option::is_none")]
Expand All @@ -38,6 +40,7 @@ impl KafkaSource {
brokers: None,
config: None,
consumer_group: None,
kafka_version: None,
sasl: None,
tls: None,
topic,
Expand Down

0 comments on commit eca3b0c

Please sign in to comment.