Skip to content

Commit

Permalink
Add Scripted Upsert functionality
Browse files Browse the repository at this point in the history
Add Kafka Payload -> Injection as Parameters functionality

Signed-off-by: Nicholas Cole <nicholas.cole@walmart.com>
  • Loading branch information
NicholasDCole committed Mar 12, 2024
1 parent f0c057f commit b6c0645
Show file tree
Hide file tree
Showing 6 changed files with 340 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BehaviorOnNullValues;
import io.confluent.connect.elasticsearch.util.ScriptParser;
import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Decimal;
Expand All @@ -43,6 +44,7 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.script.Script;
import org.elasticsearch.xcontent.XContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -185,11 +187,44 @@ public DocWriteRequest<?> convertRecord(SinkRecord record, String index) {
new IndexRequest(index).id(id).source(payload, XContentType.JSON).opType(opType),
record
);
case SCRIPTED_UPSERT:
try {

if (config.getIsPayloadAsParams()) {
return buildUpdateRequestWithParams(index, payload, id);
}

Script script = ScriptParser.parseScript(config.getScript());

return new UpdateRequest(index, id)
.doc(payload, XContentType.JSON)
.upsert(payload, XContentType.JSON)
.retryOnConflict(Math.min(config.maxInFlightRequests(), 5))
.script(script)
.scriptedUpsert(true);

} catch (JsonProcessingException jsonProcessingException) {
throw new RuntimeException(jsonProcessingException);
}
default:
return null; // shouldn't happen
}
}

private UpdateRequest buildUpdateRequestWithParams(String index, String payload, String id)
throws JsonProcessingException {

Script script = ScriptParser.parseScriptWithParams(config.getScript(), payload);

UpdateRequest updateRequest =
new UpdateRequest(index, id)
.retryOnConflict(Math.min(config.maxInFlightRequests(), 5))
.script(script)
.scriptedUpsert(true);

return updateRequest;
}

private String getPayload(SinkRecord record) {
if (record.value() == null) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.util.Set;
import java.util.stream.Collectors;
import java.util.concurrent.TimeUnit;

import io.confluent.connect.elasticsearch.validator.ScriptValidator;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
Expand Down Expand Up @@ -277,6 +279,24 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
);
private static final String WRITE_METHOD_DISPLAY = "Write Method";
private static final String WRITE_METHOD_DEFAULT = WriteMethod.INSERT.name();

public static final String UPSERT_SCRIPT_CONFIG = "upsert.script";

private static final String UPSERT_SCRIPT_DOC = "Script used for"
+ " upserting data to Elasticsearch. This script allows for"
+ " customizable behavior upon upserting a document. Please refer to"
+ " Elasticsearch scripted upsert documentation";

private static final String UPSERT_SCRIPT_DISPLAY = "Upsert Script";

public static final String PAYLOAD_AS_PARAMS_CONFIG = "payload.as.params";

private static final String PAYLOAD_AS_PARAMS_DOC = "Defines Kafka payload will be injected"
+ " into upsert.script script component as params object";

private static final String PAYLOAD_AS_PARAMS_DISPLAY = "Payload as Params";


public static final String LOG_SENSITIVE_DATA_CONFIG = "log.sensitive.data";
private static final String LOG_SENSITIVE_DATA_DISPLAY = "Log Sensitive data";
private static final String LOG_SENSITIVE_DATA_DOC = "If true, logs sensitive data "
Expand Down Expand Up @@ -408,7 +428,8 @@ public enum SecurityProtocol {

public enum WriteMethod {
INSERT,
UPSERT
UPSERT,
SCRIPTED_UPSERT
}

protected static ConfigDef baseConfigDef() {
Expand Down Expand Up @@ -622,8 +643,8 @@ private static void addConversionConfigs(ConfigDef configDef) {
DATA_CONVERSION_GROUP,
++order,
Width.SHORT,
IGNORE_KEY_DISPLAY
).define(
IGNORE_KEY_DISPLAY)
.define(
IGNORE_SCHEMA_CONFIG,
Type.BOOLEAN,
IGNORE_SCHEMA_DEFAULT,
Expand All @@ -632,8 +653,8 @@ private static void addConversionConfigs(ConfigDef configDef) {
DATA_CONVERSION_GROUP,
++order,
Width.SHORT,
IGNORE_SCHEMA_DISPLAY
).define(
IGNORE_SCHEMA_DISPLAY)
.define(
COMPACT_MAP_ENTRIES_CONFIG,
Type.BOOLEAN,
COMPACT_MAP_ENTRIES_DEFAULT,
Expand All @@ -642,8 +663,8 @@ private static void addConversionConfigs(ConfigDef configDef) {
DATA_CONVERSION_GROUP,
++order,
Width.SHORT,
COMPACT_MAP_ENTRIES_DISPLAY
).define(
COMPACT_MAP_ENTRIES_DISPLAY)
.define(
IGNORE_KEY_TOPICS_CONFIG,
Type.LIST,
IGNORE_KEY_TOPICS_DEFAULT,
Expand All @@ -652,8 +673,8 @@ private static void addConversionConfigs(ConfigDef configDef) {
DATA_CONVERSION_GROUP,
++order,
Width.LONG,
IGNORE_KEY_TOPICS_DISPLAY
).define(
IGNORE_KEY_TOPICS_DISPLAY)
.define(
IGNORE_SCHEMA_TOPICS_CONFIG,
Type.LIST,
IGNORE_SCHEMA_TOPICS_DEFAULT,
Expand All @@ -662,8 +683,8 @@ private static void addConversionConfigs(ConfigDef configDef) {
DATA_CONVERSION_GROUP,
++order,
Width.LONG,
IGNORE_SCHEMA_TOPICS_DISPLAY
).define(
IGNORE_SCHEMA_TOPICS_DISPLAY)
.define(
DROP_INVALID_MESSAGE_CONFIG,
Type.BOOLEAN,
DROP_INVALID_MESSAGE_DEFAULT,
Expand All @@ -672,8 +693,8 @@ private static void addConversionConfigs(ConfigDef configDef) {
DATA_CONVERSION_GROUP,
++order,
Width.LONG,
DROP_INVALID_MESSAGE_DISPLAY
).define(
DROP_INVALID_MESSAGE_DISPLAY)
.define(
BEHAVIOR_ON_NULL_VALUES_CONFIG,
Type.STRING,
BEHAVIOR_ON_NULL_VALUES_DEFAULT.name(),
Expand All @@ -684,8 +705,8 @@ private static void addConversionConfigs(ConfigDef configDef) {
++order,
Width.SHORT,
BEHAVIOR_ON_NULL_VALUES_DISPLAY,
new EnumRecommender<>(BehaviorOnNullValues.class)
).define(
new EnumRecommender<>(BehaviorOnNullValues.class))
.define(
BEHAVIOR_ON_MALFORMED_DOCS_CONFIG,
Type.STRING,
BEHAVIOR_ON_MALFORMED_DOCS_DEFAULT.name(),
Expand All @@ -696,8 +717,8 @@ private static void addConversionConfigs(ConfigDef configDef) {
++order,
Width.SHORT,
BEHAVIOR_ON_MALFORMED_DOCS_DISPLAY,
new EnumRecommender<>(BehaviorOnMalformedDoc.class)
).define(
new EnumRecommender<>(BehaviorOnMalformedDoc.class))
.define(
EXTERNAL_VERSION_HEADER_CONFIG,
Type.STRING,
EXTERNAL_VERSION_HEADER_DEFAULT,
Expand All @@ -706,8 +727,8 @@ private static void addConversionConfigs(ConfigDef configDef) {
DATA_CONVERSION_GROUP,
++order,
Width.SHORT,
EXTERNAL_VERSION_HEADER_DISPLAY
).define(
EXTERNAL_VERSION_HEADER_DISPLAY)
.define(
WRITE_METHOD_CONFIG,
Type.STRING,
WRITE_METHOD_DEFAULT,
Expand All @@ -718,8 +739,30 @@ private static void addConversionConfigs(ConfigDef configDef) {
++order,
Width.SHORT,
WRITE_METHOD_DISPLAY,
new EnumRecommender<>(WriteMethod.class)
);
new EnumRecommender<>(WriteMethod.class))
.define(
UPSERT_SCRIPT_CONFIG,
Type.STRING,
null,
new ScriptValidator(),
Importance.LOW,
UPSERT_SCRIPT_DOC,
DATA_CONVERSION_GROUP,
++order,
Width.SHORT,
UPSERT_SCRIPT_DISPLAY,
new ScriptValidator())
.define(
PAYLOAD_AS_PARAMS_CONFIG,
Type.BOOLEAN,
false,
Importance.LOW,
PAYLOAD_AS_PARAMS_DOC,
DATA_CONVERSION_GROUP,
++order,
Width.SHORT,
PAYLOAD_AS_PARAMS_DISPLAY);
;
}

private static void addProxyConfigs(ConfigDef configDef) {
Expand Down Expand Up @@ -1078,6 +1121,14 @@ public WriteMethod writeMethod() {
return WriteMethod.valueOf(getString(WRITE_METHOD_CONFIG).toUpperCase());
}

public String getScript() {
return getString(UPSERT_SCRIPT_CONFIG);
}

public Boolean getIsPayloadAsParams() {
return getBoolean(PAYLOAD_AS_PARAMS_CONFIG);
}

private static class DataStreamDatasetValidator implements Validator {

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright 2018 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.connect.elasticsearch.util;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.elasticsearch.script.Script;

import java.util.Map;

public class ScriptParser {

private static final ObjectMapper objectMapper = new ObjectMapper();

public static Script parseScript(String scriptJson) throws JsonProcessingException {

Map<String, Object> map = ScriptParser.parseSchemaStringAsJson(scriptJson);

return Script.parse(map);
}

private static Map<String, Object> parseSchemaStringAsJson(String scriptJson)
throws JsonProcessingException {

ObjectMapper objectMapper = new ObjectMapper();

Map<String, Object> scriptConverted;

scriptConverted =
objectMapper.readValue(scriptJson, new TypeReference<Map<String, Object>>() {});

return scriptConverted;
}

public static Script parseScriptWithParams(String scriptJson, String jsonPayload)
throws JsonProcessingException {

Map<String, Object> map = ScriptParser.parseSchemaStringAsJson(scriptJson);

Map<String, Object> fields =
objectMapper.readValue(jsonPayload, new TypeReference<Map<String, Object>>() {});

map.put("params", fields);

return Script.parse(map);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright 2018 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.connect.elasticsearch.validator;

import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.WRITE_METHOD_CONFIG;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.WriteMethod.SCRIPTED_UPSERT;

import com.fasterxml.jackson.core.JsonProcessingException;
import io.confluent.connect.elasticsearch.util.ScriptParser;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.elasticsearch.script.Script;

public class ScriptValidator implements ConfigDef.Validator, ConfigDef.Recommender {

@Override
@SuppressWarnings("unchecked")
public void ensureValid(String name, Object value) {

if (value == null) {
return;
}

String script = (String) value;

try {
Script parsedScript = ScriptParser.parseScript(script);

if (parsedScript.getIdOrCode() == null) {
throw new ConfigException(name, script, "The specified script is missing code");
} else if (parsedScript.getLang() == null) {
throw new ConfigException(name, script, "The specified script is missing lang");
}

} catch (JsonProcessingException jsonProcessingException) {
throw new ConfigException(
name, script, "The specified script is not a valid Elasticsearch painless script");
}
}

@Override
public String toString() {
return "A valid script that is able to be parsed";
}

@Override
public List<Object> validValues(String name, Map<String, Object> parsedConfig) {
if (!parsedConfig.get(WRITE_METHOD_CONFIG).equals(SCRIPTED_UPSERT)) {
return new ArrayList<>();
}
return null;
}

@Override
public boolean visible(String name, Map<String, Object> parsedConfig) {
return parsedConfig.get(WRITE_METHOD_CONFIG).equals(SCRIPTED_UPSERT.name());
}
}
Loading

0 comments on commit b6c0645

Please sign in to comment.