Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Notifications in a distinct topic when starting/ending each file #197

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ ELFTesting.properties
.checkstyle
.factorypath
.idea/
.DS_Store
2 changes: 1 addition & 1 deletion config/CSVExample.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
"input.file.pattern": "^.*\\.csv$",
"halt.on.error": "false",
"topic": "testing"
"topic": "testing",
"csv.first.row.as.header": "true",
"csv.null.field.indicator": "EMPTY_SEPARATORS",
"input.path": "/tmp/spooldir/input",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,17 @@ public abstract class AbstractSourceConnectorConfig extends AbstractConfig {
"`Name` is name of the file. `Length` is the length of the file preferring larger files first. `LastModified` is " +
"the LastModified attribute of the file preferring older files first.";

static final String FILES_NOTIFICATIONS_CONF = "files.notifications.enabled";
static final String FILES_NOTIFICATIONS_DOC = "Should the task produce a message to notify when a file is started or finished.";
static final boolean FILES_NOTIFICATIONS_DEFAULT = false;
static final String FILES_NOTIFICATIONS_TOPIC_CONF = "files.notifications.topic";
static final String FILES_NOTIFICATIONS_TOPIC_DOC = "Topic name for files notifications";
static final String FILES_NOTIFICATIONS_TOPIC_DEFAULT = "files_notifications";
static final String FILES_NOTIFICATIONS_RECORD_CREATOR_CLASS_CONF = "files.notifications.record.creator.class";
static final String FILES_NOTIFICATIONS_RECORD_CREATOR_CLASS_DOC = "The implementation to create notification messages";
static final String FILES_NOTIFICATIONS_RECORD_CREATOR_CLASS_DEFAULT =
FileNotifierRecordCreator.DefaultFileNotifierRecordCreator.class.getTypeName();

public static final String TASK_INDEX_CONF = "task.index";
static final String TASK_INDEX_DOC = "Internal setting to the connector used to instruct a " +
"task on which files to select. The connector will override this setting.";
Expand Down Expand Up @@ -130,6 +141,9 @@ public abstract class AbstractSourceConnectorConfig extends AbstractConfig {
public final TaskPartitioner taskPartitioner;
public final boolean bufferedInputStream;
public final int fileBufferSizeBytes;
public final boolean isFilesNotificationsEnabled;
public final String filesNotificationsTopic;
public final String fileNotifierRecordCreatorClass;
public final boolean inputPathWalkRecursively;
public final boolean inputPathWalkRecursivelyRetainSubDirs;

Expand Down Expand Up @@ -176,6 +190,9 @@ public AbstractSourceConnectorConfig(ConfigDef definition, Map<?, ?> originals,
this.taskIndex = getInt(TASK_INDEX_CONF);
this.taskCount = getInt(TASK_COUNT_CONF);
this.taskPartitioner = ConfigUtils.getEnum(TaskPartitioner.class, this, TASK_PARTITIONER_CONF);
this.isFilesNotificationsEnabled = this.getBoolean(FILES_NOTIFICATIONS_CONF);
this.filesNotificationsTopic = this.getString(FILES_NOTIFICATIONS_TOPIC_CONF);
this.fileNotifierRecordCreatorClass = this.getString(FILES_NOTIFICATIONS_RECORD_CREATOR_CLASS_CONF);
this.inputPathWalkRecursively = this.getBoolean(INPUT_PATH_WALK_RECURSIVELY);
this.inputPathWalkRecursivelyRetainSubDirs = this.getBoolean(CLEANUP_POLICY_MAINTAIN_RELATIVE_PATH);

Expand Down Expand Up @@ -314,6 +331,27 @@ protected static ConfigDef config(boolean bufferedInputStream) {
.defaultValue(TaskPartitioner.ByName.toString())
.group(GROUP_FILESYSTEM)
.build()
).define(
ConfigKeyBuilder.of(FILES_NOTIFICATIONS_CONF, ConfigDef.Type.BOOLEAN)
.documentation(FILES_NOTIFICATIONS_DOC)
.importance(ConfigDef.Importance.LOW)
.defaultValue(FILES_NOTIFICATIONS_DEFAULT)
.group(GROUP_GENERAL)
.build()
).define(
ConfigKeyBuilder.of(FILES_NOTIFICATIONS_TOPIC_CONF, ConfigDef.Type.STRING)
.documentation(FILES_NOTIFICATIONS_TOPIC_DOC)
.importance(ConfigDef.Importance.LOW)
.defaultValue(FILES_NOTIFICATIONS_TOPIC_DEFAULT)
.group(GROUP_GENERAL)
.build()
).define(
ConfigKeyBuilder.of(FILES_NOTIFICATIONS_RECORD_CREATOR_CLASS_CONF, ConfigDef.Type.STRING)
.documentation(FILES_NOTIFICATIONS_RECORD_CREATOR_CLASS_DOC)
.importance(ConfigDef.Importance.LOW)
.defaultValue(FILES_NOTIFICATIONS_RECORD_CREATOR_CLASS_DEFAULT)
.group(GROUP_GENERAL)
.build()
).define(
ConfigKeyBuilder.of(INPUT_PATH_WALK_RECURSIVELY, ConfigDef.Type.BOOLEAN)
.documentation(INPUT_PATH_WALK_RECURSIVELY_DOC)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,15 +207,68 @@ private void recordProcessingTime() {

AbstractCleanUpPolicy cleanUpPolicy;

static class ReadResult {
private final List<SourceRecord> records = new ArrayList<>();
private final FileNotifierRecordCreator payloadCreator;

ReadResult(FileNotifierRecordCreator payloadCreator) {
this.payloadCreator = payloadCreator;
}


static ReadResult fromConfig(AbstractSourceConnectorConfig config) {
FileNotifierRecordCreator payloadCreator =
FileNotifierRecordCreator.newInstanceForClassName(config.fileNotifierRecordCreatorClass);
payloadCreator.setTopic(config.filesNotificationsTopic);
return config.isFilesNotificationsEnabled ? new ReadResult(payloadCreator) : new NoNotifications();
}

private static class NoNotifications extends ReadResult {
private NoNotifications() {
super(null);
}

@Override
void startFile(InputFile file) {
return;
}

@Override
public void endFile(InputFile file) {
return;
}
}

void startFile(InputFile file) {
records.add(
payloadCreator.startFilePayload(file)
);
}


public void addRecords(List<SourceRecord> newRecords) {
records.addAll(newRecords);
}

public void endFile(InputFile file) {
records.add(
payloadCreator.endFilePayload(file)
);

}
}


public List<SourceRecord> read() {
ReadResult result = ReadResult.fromConfig(this.config);
try {
if (!hasRecords) {

if (null != this.inputFile) {
recordProcessingTime();
this.inputFile.close();
this.cleanUpPolicy.success();
result.endFile(this.inputFile);
this.inputFile = null;
}

Expand All @@ -225,8 +278,9 @@ public List<SourceRecord> read() {
log.trace("read() - nextFile = '{}'", nextFile);
if (null == nextFile) {
log.trace("read() - No next file found.");
return new ArrayList<>();
return result.records;
}
result.startFile(nextFile);
this.inputFile = nextFile;
try {
this.sourcePartition = ImmutableMap.of(
Expand All @@ -253,7 +307,8 @@ public List<SourceRecord> read() {
}
List<SourceRecord> records = process();
this.hasRecords = !records.isEmpty();
return records;
result.addRecords(records);
return result.records;
} catch (Exception ex) {
long recordOffset;
try {
Expand All @@ -271,7 +326,7 @@ public List<SourceRecord> read() {
if (this.config.haltOnError) {
throw new ConnectException(ex);
} else {
return new ArrayList<>();
return result.records;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/**
* Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com)
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.jcustenborder.kafka.connect.spooldir;

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;

import static java.lang.System.currentTimeMillis;

public abstract class FileNotifierRecordCreator {
public static final Logger log = LoggerFactory.getLogger(AbstractSourceTask.class);

public abstract SourceRecord startFilePayload(InputFile file);

public abstract SourceRecord endFilePayload(InputFile file);

protected String topic;

public void setTopic(String topic) {
this.topic = topic;
}

/**
* Simple implementation to notify basic details in JSON.
*/
public static class DefaultFileNotifierRecordCreator extends FileNotifierRecordCreator {
@Override
public SourceRecord startFilePayload(InputFile file) {
return new SourceRecord(
new HashMap<>(),
new HashMap<>(),
this.topic,
Schema.BYTES_SCHEMA,
String.format(
"{\"time\": %d, \"event\": \"file_start\", \"filepath\": \"%s\"}",
currentTimeMillis(),
file.getPath()
).getBytes()
);

}

@Override
public SourceRecord endFilePayload(InputFile file) {
return new SourceRecord(
new HashMap<>(),
new HashMap<>(),
this.topic,
Schema.BYTES_SCHEMA,
String.format(
"{\"time\":%d, \"event\": \"file_end\", \"filepath\": \"%s\"}",
currentTimeMillis(),
file.getPath()
).getBytes()
);
}
}

public static FileNotifierRecordCreator newInstanceForClassName(String className) {
try {
Class<?> clazz = Class.forName(className);
Object o = Utils.newInstance(clazz);
if (!(o instanceof FileNotifierRecordCreator)) {
String error = className + " is not an implementation of FileNotifierRecordCreator";
log.error(error);
throw new KafkaException(error);
}
return (FileNotifierRecordCreator) o;

} catch (ClassNotFoundException e) {
log.error("Unable to find class {} as a FileNotifierRecordCreator", className);
throw new KafkaException(e);
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package com.github.jcustenborder.kafka.connect.spooldir;

import com.github.jcustenborder.kafka.connect.spooldir.FileNotifierRecordCreator.DefaultFileNotifierRecordCreator;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.*;

class AbstractSourceConnectorConfigTest {

private final Map<String, String> baseConf =
Map.of(
AbstractSourceConnectorConfig.TOPIC_CONF,
"/tmp",
AbstractSourceConnectorConfig.ERROR_PATH_CONFIG,
"/tmp",
AbstractSourceConnectorConfig.INPUT_FILE_PATTERN_CONF,
"/tmp",
AbstractSourceConnectorConfig.FINISHED_PATH_CONFIG,
"/tmp",
AbstractSourceConnectorConfig.INPUT_PATH_CONFIG,
"/tmp"
);

@Test
void should_parse_config() {
String clazz = "java.lang.Integer";
String topic = "testme";
Map<String, String> conf = new HashMap<>(
Map.of(
AbstractSourceConnectorConfig.FILES_NOTIFICATIONS_CONF,
"true",
AbstractSourceConnectorConfig.FILES_NOTIFICATIONS_RECORD_CREATOR_CLASS_CONF,
clazz,
AbstractSourceConnectorConfig.FILES_NOTIFICATIONS_TOPIC_CONF,
topic
)
);

conf.putAll(
baseConf
);
AbstractSourceConnectorConfig config = new AbstractSourceConnectorConfig(
AbstractSourceConnectorConfig.config(false),
conf,
false
) {};

assertTrue(
config.isFilesNotificationsEnabled
);

assertEquals(
config.fileNotifierRecordCreatorClass,
clazz
);

assertEquals(
config.filesNotificationsTopic,
topic
);

}

@Test
void should_populate_default() {
AbstractSourceConnectorConfig config = new AbstractSourceConnectorConfig(
AbstractSourceConnectorConfig.config(false),
baseConf,
false
) {
};

assertFalse(
config.isFilesNotificationsEnabled
);
assertEquals(
config.fileNotifierRecordCreatorClass,
DefaultFileNotifierRecordCreator.class.getTypeName()
);
assertEquals(
config.filesNotificationsTopic,
AbstractSourceConnectorConfig.FILES_NOTIFICATIONS_TOPIC_DEFAULT
);
}
}
Loading