diff --git a/pom.xml b/pom.xml
index 9b73928..c7b986b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -47,22 +47,22 @@
8
8
true
-
+
true
4
1
false
package
package
- 2.10.4.3
- 1.9.4
+ 2.10.6.1
+ 2.4.1
1.11.4
- 3.8.0
+ 3.12.4
0.6.1
3.0.0
4.0.rc2
- 0.3.1
+ 1.0
3.0.0-M3
@@ -70,15 +70,15 @@
3.0.0-M3
3.3.0
3.1.2
- 3.2.4
+ 3.4.0
3.0.0
1.0.0
- 1.2.0
+ 1.5.0
3.1.2
4.0.2
3.4.3
1.4.1.Final
- 0.8.6
+ 0.8.7
4.2.2
4.2.2
2.5.1
@@ -86,7 +86,7 @@
0.1.4
1.3
0.4
- 6.1.6
+ 8.0.1
@@ -128,7 +128,7 @@
org.apache.maven.plugins
maven-shade-plugin
- 3.2.4
+ 3.6.0
maven-compiler-plugin
@@ -356,7 +356,7 @@ limitations under the License.]]>
org.apache.maven.plugins
maven-javadoc-plugin
- 3.2.0
+ 3.10.1
attach-javadocs
@@ -369,7 +369,7 @@ limitations under the License.]]>
org.apache.maven.plugins
maven-source-plugin
- 3.2.1
+ 3.3.1
attach-sources
diff --git a/pulsar-snowflake-connector/src/test/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorMock.java b/pulsar-snowflake-connector/src/test/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorMock.java
index d21dbb1..173ef73 100644
--- a/pulsar-snowflake-connector/src/test/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorMock.java
+++ b/pulsar-snowflake-connector/src/test/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorMock.java
@@ -13,22 +13,26 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package com.snowflake.kafka.connector;
import static org.mockito.Mockito.when;
import com.snowflake.kafka.connector.internal.KCLogger;
import com.snowflake.kafka.connector.internal.SnowflakeConnectionService;
+import com.snowflake.kafka.connector.internal.streaming.DefaultStreamingConfigValidator;
import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.mockito.Mockito;
+@Slf4j
public class SnowflakeSinkConnectorMock extends SnowflakeSinkConnector {
private KCLogger DYNAMIC_LOGGER;
@@ -72,22 +76,35 @@ public ConfigDef config() {
*/
@SneakyThrows
@Override
- public void start(Map parsedConfig) {
+ public void start(final Map parsedConfig) {
+ log.info("SnowflakeSinkConnector:starting...");
+
Utils.checkConnectorVersion();
- this.DYNAMIC_LOGGER.info("SnowflakeSinkConnector:start");
FieldUtils.writeDeclaredField(wrapped, "setupComplete", false, true);
FieldUtils.writeDeclaredField(wrapped, "connectorStartTime", System.currentTimeMillis(), true);
- FieldUtils.writeDeclaredField(wrapped, "setupComplete", false, true);
-
Map config = new HashMap<>(parsedConfig);
SnowflakeSinkConnectorConfig.setDefaultValues(config);
FieldUtils.writeDeclaredField(wrapped, "config", config, true);
- Utils.validateConfig(config);
+ SnowflakeSinkConnectorConfig.setDefaultValues(config);
+
// modify invalid connector name
Utils.convertAppName(config);
+
+ ConnectorConfigValidator connectorConfigValidator =
+ new DefaultConnectorConfigValidator(new DefaultStreamingConfigValidator());
+
+ connectorConfigValidator.validateConfig(config);
+
+ // enable mdc logging if needed
+ KCLogger.toggleGlobalMdcLoggingContext(
+ Boolean.parseBoolean(
+ config.getOrDefault(
+ SnowflakeSinkConnectorConfig.ENABLE_MDC_LOGGING_CONFIG,
+ SnowflakeSinkConnectorConfig.ENABLE_MDC_LOGGING_DEFAULT)));
+
// enable proxy
Utils.enableJVMProxy(config);
@@ -97,6 +114,8 @@ public void start(Map parsedConfig) {
FieldUtils.writeDeclaredField(wrapped, "telemetryClient", telemetryMock, true);
FieldUtils.writeDeclaredField(wrapped, "setupComplete", true, true);
+
+ log.info("SnowflakeSinkConnector:started");
}
@Override
diff --git a/pulsar-snowflake-connector/src/test/java/com/snowflake/kafka/connector/SnowflakeSinkTaskMock.java b/pulsar-snowflake-connector/src/test/java/com/snowflake/kafka/connector/SnowflakeSinkTaskMock.java
index fda40b4..8014906 100644
--- a/pulsar-snowflake-connector/src/test/java/com/snowflake/kafka/connector/SnowflakeSinkTaskMock.java
+++ b/pulsar-snowflake-connector/src/test/java/com/snowflake/kafka/connector/SnowflakeSinkTaskMock.java
@@ -19,6 +19,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
+import com.snowflake.kafka.connector.dlq.KafkaRecordErrorReporter;
import com.snowflake.kafka.connector.internal.InternalUtilsAccessor;
import com.snowflake.kafka.connector.internal.KCLogger;
import com.snowflake.kafka.connector.internal.SnowflakeConnectionService;
@@ -58,6 +59,7 @@ public SnowflakeSinkTaskMock() {
ingestionMock = Mockito.mock(SnowflakeIngestionServiceV1.class);
when(ingestionMock.getStageName()).thenReturn("stage");
+
when(ingestionMock.readIngestReport(any())).thenReturn(InternalUtilsAccessor.getEmptyIFSMap());
connMock = Mockito.mock(SnowflakeConnectionServiceV1.class);
@@ -94,17 +96,24 @@ public Map preCommit(
@SneakyThrows
@Override
- public void start(Map parsedConfig) {
+ public void start(final Map parsedConfig) {
+ this.DYNAMIC_LOGGER.info("starting task...");
+
long startTime = System.currentTimeMillis();
String id = parsedConfig.getOrDefault(Utils.TASK_ID, "-1");
FieldUtils.writeDeclaredField(wrapped, "taskConfigId", id, true);
this.DYNAMIC_LOGGER.info("SnowflakeSinkTask[TaskConfigID:{}]:start", id);
+ FieldUtils.writeDeclaredField(wrapped, "taskStartTime", System.currentTimeMillis(), true);
+
// generate topic to table map
Map topic2table = new HashMap<>();
FieldUtils.writeDeclaredField(wrapped, "topic2table", topic2table, true);
+ // generate metadataConfig table
+ SnowflakeMetadataConfig metadataConfig = new SnowflakeMetadataConfig(parsedConfig);
+
// enable jvm proxy
Utils.enableJVMProxy(parsedConfig);
@@ -130,25 +139,34 @@ public void start(Map parsedConfig) {
parsedConfig.get(SnowflakeSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG));
}
+ // we would have already validated the config inside SFConnector start()
+ boolean enableCustomJMXMonitoring = SnowflakeSinkConnectorConfig.JMX_OPT_DEFAULT;
+ if (parsedConfig.containsKey(SnowflakeSinkConnectorConfig.JMX_OPT)) {
+ enableCustomJMXMonitoring =
+ Boolean.parseBoolean(parsedConfig.get(SnowflakeSinkConnectorConfig.JMX_OPT));
+ }
+
+ KafkaRecordErrorReporter kafkaRecordErrorReporter = noOpKafkaRecordErrorReporter();
+
FieldUtils.writeDeclaredField(wrapped, "conn", connMock, true);
FieldUtils.writeDeclaredField(wrapped, "ingestionMethodConfig", SNOWPIPE, true);
- SnowflakeMetadataConfig metadataConfig = new SnowflakeMetadataConfig(parsedConfig);
-
SnowflakeSinkService sink =
- SnowflakeSinkServiceFactory.builder(connMock)
+ SnowflakeSinkServiceFactory.builder(connMock, SNOWPIPE, parsedConfig)
.setFileSize(bufferSizeBytes)
.setRecordNumber(bufferCountRecords)
.setFlushTime(bufferFlushTime)
.setTopic2TableMap(topic2table)
.setMetadataConfig(metadataConfig)
.setBehaviorOnNullValuesConfig(behavior)
+ .setCustomJMXMetrics(enableCustomJMXMonitoring)
+ .setErrorReporter(kafkaRecordErrorReporter)
+ .setSinkTaskContext(this.context)
.build();
-
FieldUtils.writeDeclaredField(wrapped, "sink", sink, true);
- this.DYNAMIC_LOGGER.info(
- "SnowflakeSinkTask[TaskConfigID:{}]:start. Time: {} seconds",
+ DYNAMIC_LOGGER.info(
+ "task started, execution time: {} milliseconds",
id,
(System.currentTimeMillis() - startTime) / 1000);
}
diff --git a/snowflake-kafka-connector-shaded/pom.xml b/snowflake-kafka-connector-shaded/pom.xml
index e8fa320..7607177 100644
--- a/snowflake-kafka-connector-shaded/pom.xml
+++ b/snowflake-kafka-connector-shaded/pom.xml
@@ -66,14 +66,18 @@
com.fasterxml
com.datastax.oss.shaded.com.fasterxml
+
+ com.hubspot
+ com.datastax.oss.shaded.com.hubspot
+
com.google
com.datastax.oss.shaded.com.google
-
-
+
+