Skip to content

Commit

Permalink
Upgrading Snowflake connector (#14)
Browse files Browse the repository at this point in the history
  • Loading branch information
dlg99 authored Oct 10, 2024
1 parent 2832fdb commit 3fe32f7
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 26 deletions.
24 changes: 12 additions & 12 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,46 +47,46 @@
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<redirectTestOutputToFile>true</redirectTestOutputToFile>
<test.additional.args />
<test.additional.args/>
<testReuseFork>true</testReuseFork>
<testForkCount>4</testForkCount>
<testRetryCount>1</testRetryCount>
<skipSourceReleaseAssembly>false</skipSourceReleaseAssembly>
<shadePluginPhase>package</shadePluginPhase>
<narPluginPhase>package</narPluginPhase>
<pulsar.version>2.10.4.3</pulsar.version>
<snowflakeconnector.version>1.9.4</snowflakeconnector.version>
<pulsar.version>2.10.6.1</pulsar.version>
<snowflakeconnector.version>2.4.1</snowflakeconnector.version>
<avro.version>1.11.4</avro.version>
<mockito.version>3.8.0</mockito.version>
<mockito.version>3.12.4</mockito.version>
<!-- Plugin dependencies -->
<protobuf-maven-plugin.version>0.6.1</protobuf-maven-plugin.version>
<exec-maven-plugin.version>3.0.0</exec-maven-plugin.version>
<license-maven-plugin.version>4.0.rc2</license-maven-plugin.version>
<directory-maven-plugin.version>0.3.1</directory-maven-plugin.version>
<directory-maven-plugin.version>1.0</directory-maven-plugin.version>
<maven-enforcer-plugin.version>3.0.0-M3</maven-enforcer-plugin.version>
<!-- surefire.version is defined in apache parent pom -->
<!-- it is used for surefire, failsafe and surefire-report plugins -->
<!-- do not upgrade surefire.version to 3.0.0-M5 since it runs slowly and breaks tests. -->
<surefire.version>3.0.0-M3</surefire.version>
<maven-assembly-plugin.version>3.3.0</maven-assembly-plugin.version>
<maven-dependency-plugin.version>3.1.2</maven-dependency-plugin.version>
<maven-shade-plugin>3.2.4</maven-shade-plugin>
<maven-shade-plugin>3.4.0</maven-shade-plugin>
<maven-antrun-plugin.version>3.0.0</maven-antrun-plugin.version>
<properties-maven-plugin.version>1.0.0</properties-maven-plugin.version>
<nifi-nar-maven-plugin.version>1.2.0</nifi-nar-maven-plugin.version>
<nifi-nar-maven-plugin.version>1.5.0</nifi-nar-maven-plugin.version>
<maven-checkstyle-plugin.version>3.1.2</maven-checkstyle-plugin.version>
<git-commit-id-plugin.version>4.0.2</git-commit-id-plugin.version>
<wagon-ssh-external.version>3.4.3</wagon-ssh-external.version>
<os-maven-plugin.version>1.4.1.Final</os-maven-plugin.version>
<jacoco-maven-plugin.version>0.8.6</jacoco-maven-plugin.version>
<jacoco-maven-plugin.version>0.8.7</jacoco-maven-plugin.version>
<spotbugs-maven-plugin.version>4.2.2</spotbugs-maven-plugin.version>
<spotbugs.version>4.2.2</spotbugs.version>
<errorprone.version>2.5.1</errorprone.version>
<errorprone.javac.version>9+181-r4173-1</errorprone.javac.version>
<errorprone-slf4j.version>0.1.4</errorprone-slf4j.version>
<j2objc-annotations.version>1.3</j2objc-annotations.version>
<lightproto-maven-plugin.version>0.4</lightproto-maven-plugin.version>
<dependency-check-maven.version>6.1.6</dependency-check-maven.version>
<dependency-check-maven.version>8.0.1</dependency-check-maven.version>
</properties>
<dependencyManagement>
<dependencies>
Expand Down Expand Up @@ -128,7 +128,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<version>3.6.0</version>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
Expand Down Expand Up @@ -356,7 +356,7 @@ limitations under the License.]]></inlineHeader>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>3.2.0</version>
<version>3.10.1</version>
<executions>
<execution>
<id>attach-javadocs</id>
Expand All @@ -369,7 +369,7 @@ limitations under the License.]]></inlineHeader>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>3.2.1</version>
<version>3.3.1</version>
<executions>
<execution>
<id>attach-sources</id>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,26 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.snowflake.kafka.connector;

import static org.mockito.Mockito.when;

import com.snowflake.kafka.connector.internal.KCLogger;
import com.snowflake.kafka.connector.internal.SnowflakeConnectionService;
import com.snowflake.kafka.connector.internal.streaming.DefaultStreamingConfigValidator;
import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.mockito.Mockito;

@Slf4j
public class SnowflakeSinkConnectorMock extends SnowflakeSinkConnector {

private KCLogger DYNAMIC_LOGGER;
Expand Down Expand Up @@ -72,22 +76,35 @@ public ConfigDef config() {
*/
@SneakyThrows
@Override
public void start(Map<String, String> parsedConfig) {
public void start(final Map<String, String> parsedConfig) {
log.info("SnowflakeSinkConnector:starting...");

Utils.checkConnectorVersion();
this.DYNAMIC_LOGGER.info("SnowflakeSinkConnector:start");

FieldUtils.writeDeclaredField(wrapped, "setupComplete", false, true);
FieldUtils.writeDeclaredField(wrapped, "connectorStartTime", System.currentTimeMillis(), true);

FieldUtils.writeDeclaredField(wrapped, "setupComplete", false, true);

Map<String, String> config = new HashMap<>(parsedConfig);
SnowflakeSinkConnectorConfig.setDefaultValues(config);
FieldUtils.writeDeclaredField(wrapped, "config", config, true);

Utils.validateConfig(config);
SnowflakeSinkConnectorConfig.setDefaultValues(config);

// modify invalid connector name
Utils.convertAppName(config);

ConnectorConfigValidator connectorConfigValidator =
new DefaultConnectorConfigValidator(new DefaultStreamingConfigValidator());

connectorConfigValidator.validateConfig(config);

// enable mdc logging if needed
KCLogger.toggleGlobalMdcLoggingContext(
Boolean.parseBoolean(
config.getOrDefault(
SnowflakeSinkConnectorConfig.ENABLE_MDC_LOGGING_CONFIG,
SnowflakeSinkConnectorConfig.ENABLE_MDC_LOGGING_DEFAULT)));

// enable proxy
Utils.enableJVMProxy(config);

Expand All @@ -97,6 +114,8 @@ public void start(Map<String, String> parsedConfig) {
FieldUtils.writeDeclaredField(wrapped, "telemetryClient", telemetryMock, true);

FieldUtils.writeDeclaredField(wrapped, "setupComplete", true, true);

log.info("SnowflakeSinkConnector:started");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;

import com.snowflake.kafka.connector.dlq.KafkaRecordErrorReporter;
import com.snowflake.kafka.connector.internal.InternalUtilsAccessor;
import com.snowflake.kafka.connector.internal.KCLogger;
import com.snowflake.kafka.connector.internal.SnowflakeConnectionService;
Expand Down Expand Up @@ -58,6 +59,7 @@ public SnowflakeSinkTaskMock() {

ingestionMock = Mockito.mock(SnowflakeIngestionServiceV1.class);
when(ingestionMock.getStageName()).thenReturn("stage");

when(ingestionMock.readIngestReport(any())).thenReturn(InternalUtilsAccessor.getEmptyIFSMap());

connMock = Mockito.mock(SnowflakeConnectionServiceV1.class);
Expand Down Expand Up @@ -94,17 +96,24 @@ public Map<TopicPartition, OffsetAndMetadata> preCommit(

@SneakyThrows
@Override
public void start(Map<String, String> parsedConfig) {
public void start(final Map<String, String> parsedConfig) {
this.DYNAMIC_LOGGER.info("starting task...");

long startTime = System.currentTimeMillis();

String id = parsedConfig.getOrDefault(Utils.TASK_ID, "-1");
FieldUtils.writeDeclaredField(wrapped, "taskConfigId", id, true);
this.DYNAMIC_LOGGER.info("SnowflakeSinkTask[TaskConfigID:{}]:start", id);

FieldUtils.writeDeclaredField(wrapped, "taskStartTime", System.currentTimeMillis(), true);

// generate topic to table map
Map<String, String> topic2table = new HashMap<>();
FieldUtils.writeDeclaredField(wrapped, "topic2table", topic2table, true);

// generate metadataConfig table
SnowflakeMetadataConfig metadataConfig = new SnowflakeMetadataConfig(parsedConfig);

// enable jvm proxy
Utils.enableJVMProxy(parsedConfig);

Expand All @@ -130,25 +139,34 @@ public void start(Map<String, String> parsedConfig) {
parsedConfig.get(SnowflakeSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG));
}

// we would have already validated the config inside SFConnector start()
boolean enableCustomJMXMonitoring = SnowflakeSinkConnectorConfig.JMX_OPT_DEFAULT;
if (parsedConfig.containsKey(SnowflakeSinkConnectorConfig.JMX_OPT)) {
enableCustomJMXMonitoring =
Boolean.parseBoolean(parsedConfig.get(SnowflakeSinkConnectorConfig.JMX_OPT));
}

KafkaRecordErrorReporter kafkaRecordErrorReporter = noOpKafkaRecordErrorReporter();

FieldUtils.writeDeclaredField(wrapped, "conn", connMock, true);
FieldUtils.writeDeclaredField(wrapped, "ingestionMethodConfig", SNOWPIPE, true);

SnowflakeMetadataConfig metadataConfig = new SnowflakeMetadataConfig(parsedConfig);

SnowflakeSinkService sink =
SnowflakeSinkServiceFactory.builder(connMock)
SnowflakeSinkServiceFactory.builder(connMock, SNOWPIPE, parsedConfig)
.setFileSize(bufferSizeBytes)
.setRecordNumber(bufferCountRecords)
.setFlushTime(bufferFlushTime)
.setTopic2TableMap(topic2table)
.setMetadataConfig(metadataConfig)
.setBehaviorOnNullValuesConfig(behavior)
.setCustomJMXMetrics(enableCustomJMXMonitoring)
.setErrorReporter(kafkaRecordErrorReporter)
.setSinkTaskContext(this.context)
.build();

FieldUtils.writeDeclaredField(wrapped, "sink", sink, true);

this.DYNAMIC_LOGGER.info(
"SnowflakeSinkTask[TaskConfigID:{}]:start. Time: {} seconds",
DYNAMIC_LOGGER.info(
"task started, execution time: {} milliseconds",
id,
(System.currentTimeMillis() - startTime) / 1000);
}
Expand Down
8 changes: 6 additions & 2 deletions snowflake-kafka-connector-shaded/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,18 @@
<pattern>com.fasterxml</pattern>
<shadedPattern>com.datastax.oss.shaded.com.fasterxml</shadedPattern>
</relocation>
<relocation>
<pattern>com.hubspot</pattern>
<shadedPattern>com.datastax.oss.shaded.com.hubspot</shadedPattern>
</relocation>
<relocation>
<pattern>com.google</pattern>
<shadedPattern>com.datastax.oss.shaded.com.google</shadedPattern>
</relocation>
</relocations>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer implementation="org.apache.maven.plugins.shade.resource.PluginXmlResourceTransformer" />
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.PluginXmlResourceTransformer"/>
</transformers>
</configuration>
</plugin>
Expand Down

0 comments on commit 3fe32f7

Please sign in to comment.