You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
classCustomApp(StreamsApp):
definflate(self) ->list[PipelineComponent]:
return [self, *self.derive_connectors()]
defderive_connectors(self) ->list[PipelineComponent]:
connectors= []
fortopic_namein [...]:
connectors.append(self.to_connector(topic_name))
returnconnectorsdefto_connector(self, topic_name) ->KafkaSinkConnector:
name=f"{topic_name}"returnKafkaSinkConnector(
name=name,
config=self.config,
handlers=self.handlers,
app=KafkaConnectorConfig(
**{
"name": "${pipeline.name}-"+name, # using `self.full_name` results in an error"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", # Cannot be set in `defaults.yaml`
}
),
)
When initializing the connector in the above example, self.full_name == pipeline_name-connector_name, but ${pipeline.name}-connector_name is expected, i.e. self.prefix is already substituted.
It can be circumvented by setting the name in KafkaConnectorConfig to KafkaSinkConnector.model_fields["prefix"].default + name, but it is a workaround.
Example code:
When initializing the connector in the above example,
self.full_name == pipeline_name-connector_name
, but${pipeline.name}-connector_name
is expected, i.e.self.prefix
is already substituted.It can be circumvented by setting the name in
KafkaConnectorConfig
toKafkaSinkConnector.model_fields["prefix"].default + name
, but it is a workaround.Closely related to #412
The text was updated successfully, but these errors were encountered: