Commit

Refactored because of deprecation of TopicPartitionCounter. Moved functionality to flush. Fixes #43. (#44)
jcustenborder authored Apr 14, 2022
1 parent 139ea9f commit a30c1ac
Showing 4 changed files with 95 additions and 23 deletions.
4 changes: 4 additions & 0 deletions pom.xml
@@ -63,6 +63,10 @@
<artifactId>lettuce-core</artifactId>
<version>5.2.1.RELEASE</version>
</dependency>
<dependency>
<groupId>com.github.jcustenborder.kafka.connect</groupId>
<artifactId>connect-utils-jackson</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
RedisSinkTask.java
@@ -17,12 +17,11 @@

import com.fasterxml.jackson.core.JsonProcessingException;
import com.github.jcustenborder.kafka.connect.utils.VersionUtil;
import com.github.jcustenborder.kafka.connect.utils.data.SinkOffsetState;
import com.github.jcustenborder.kafka.connect.utils.data.TopicPartitionCounter;
import com.github.jcustenborder.kafka.connect.utils.jackson.ObjectMapperFactory;
import com.google.common.base.Charsets;
import io.lettuce.core.KeyValue;
import io.lettuce.core.RedisFuture;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.errors.RetriableException;
@@ -146,8 +145,6 @@ public void put(Collection<SinkRecord> records) {

SinkOperation operation = SinkOperation.NONE;

TopicPartitionCounter counter = new TopicPartitionCounter();

for (SinkRecord record : records) {
log.trace("put() - Processing record " + formatLocation(record));
if (null == record.key()) {
@@ -182,7 +179,6 @@ public void put(Collection<SinkRecord> records) {
operations.add(operation);
}
operation.add(key, value);
counter.increment(record.topic(), record.kafkaPartition(), record.kafkaOffset());
}

log.debug(
@@ -191,33 +187,50 @@
records.size()
);

final List<SinkOffsetState> offsetData = counter.offsetStates();
if (!offsetData.isEmpty()) {
operation = SinkOperation.create(SinkOperation.Type.SET, this.config, offsetData.size());
operations.add(operation);
for (SinkOffsetState e : offsetData) {
final byte[] key = String.format("__kafka.offset.%s.%s", e.topic(), e.partition()).getBytes(Charsets.UTF_8);
final byte[] value;
try {
value = ObjectMapperFactory.INSTANCE.writeValueAsBytes(e);
} catch (JsonProcessingException e1) {
throw new DataException(e1);
}
operation.add(key, value);
log.trace("put() - Setting offset: {}", e);
}
}

for (SinkOperation op : operations) {
log.debug("put() - Executing {} operation with {} values", op.type, op.size());
try {
op.execute(this.session.asyncCommands());
} catch (InterruptedException e) {
log.warn("Exception thrown while executing operation", e);
throw new RetriableException(e);
}
}
}

@Override
public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
SinkOperation operation = SinkOperation.create(SinkOperation.Type.SET, this.config, currentOffsets.size());

List<SinkOffsetState> states = currentOffsets
.entrySet().stream()
.map(e -> ImmutableSinkOffsetState.builder()
.topic(e.getKey().topic())
.partition(e.getKey().partition())
.offset(e.getValue().offset())
.build()
).collect(Collectors.toList());

for (SinkOffsetState e : states) {
final byte[] key = String.format("__kafka.offset.%s.%s", e.topic(), e.partition()).getBytes(Charsets.UTF_8);
final byte[] value;
try {
value = ObjectMapperFactory.INSTANCE.writeValueAsBytes(e);
} catch (JsonProcessingException e1) {
throw new DataException(e1);
}
operation.add(key, value);
log.trace("put() - Setting offset: {}", e);
}

try {
operation.execute(this.session.asyncCommands());
} catch (InterruptedException e) {
log.warn("Exception thrown while executing operation", e);
throw new RetriableException(e);
}
}

private static String redisOffsetKey(TopicPartition topicPartition) {
return String.format("__kafka.offset.%s.%s", topicPartition.topic(), topicPartition.partition());
}
SinkOffsetState.java
@@ -0,0 +1,49 @@
/**
* Copyright © 2017 Jeremy Custenborder (jcustenborder@gmail.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.jcustenborder.kafka.connect.redis;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import org.apache.kafka.common.TopicPartition;
import org.immutables.value.Value;

@Value.Immutable
@JsonDeserialize(as = ImmutableSinkOffsetState.class)
@JsonAutoDetect(
fieldVisibility = Visibility.NONE,
getterVisibility = Visibility.NONE,
setterVisibility = Visibility.NONE,
isGetterVisibility = Visibility.NONE,
creatorVisibility = Visibility.NONE)
public interface SinkOffsetState {
@JsonProperty("topic")
String topic();

@JsonProperty("partition")
Integer partition();

@JsonProperty("offset")
Long offset();

@JsonIgnore
@Value.Derived
default TopicPartition topicPartition() {
return new TopicPartition(topic(), partition());
}
}
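
For context, a minimal sketch of what one of these offset states serializes to, using the same ImmutableSinkOffsetState builder and connect-utils-jackson ObjectMapperFactory that flush() uses above. The class name OffsetStateExample and the sample topic/partition/offset values are illustrative only, not part of this commit.

import com.fasterxml.jackson.core.JsonProcessingException;
import com.github.jcustenborder.kafka.connect.redis.ImmutableSinkOffsetState;
import com.github.jcustenborder.kafka.connect.redis.SinkOffsetState;
import com.github.jcustenborder.kafka.connect.utils.jackson.ObjectMapperFactory;

import java.nio.charset.StandardCharsets;

public class OffsetStateExample {
  public static void main(String[] args) throws JsonProcessingException {
    // Build an offset state the same way flush() does in RedisSinkTask.
    SinkOffsetState state = ImmutableSinkOffsetState.builder()
        .topic("topic")
        .partition(1)
        .offset(1L)
        .build();
    // Serialize with the shared ObjectMapper; the payload comes out as JSON
    // along the lines of {"topic":"topic","partition":1,"offset":1}.
    byte[] value = ObjectMapperFactory.INSTANCE.writeValueAsBytes(state);
    System.out.println(new String(value, StandardCharsets.UTF_8));
  }
}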
RedisSinkTaskTest.java
@@ -21,6 +21,8 @@
import io.lettuce.core.RedisFuture;
import io.lettuce.core.cluster.api.async.RedisAdvancedClusterAsyncCommands;
import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.DataException;
@@ -48,6 +50,8 @@
public class RedisSinkTaskTest {
long offset = 1;

SinkRecord lastRecord;

SinkRecord record(String k, String v) {
final byte[] key = k.getBytes(Charsets.UTF_8);
final Schema keySchema = Schema.BYTES_SCHEMA;
@@ -62,7 +66,7 @@ SinkRecord record(String k, String v) {
valueSchema = Schema.BYTES_SCHEMA;
}

return new SinkRecord(
return lastRecord = new SinkRecord(
"topic",
1,
keySchema,
@@ -147,6 +151,8 @@ public void put() throws InterruptedException {
InOrder inOrder = Mockito.inOrder(asyncCommands);
inOrder.verify(asyncCommands).mset(anyMap());
inOrder.verify(asyncCommands).del(any(byte[].class));

task.flush(ImmutableMap.of(new TopicPartition(lastRecord.topic(), lastRecord.kafkaPartition()), new OffsetAndMetadata(lastRecord.kafkaOffset())));
inOrder.verify(asyncCommands, times(2)).mset(anyMap());
}

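
To spot-check what flush() wrote, a small lettuce snippet along these lines could read an offset key back. The redis://localhost:6379 URI is an assumption for illustration, the key mirrors the topic/partition used in the test above, and the connector itself writes through a byte[] codec while this sketch uses the default string codec.

import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;

public class OffsetKeyCheck {
  public static void main(String[] args) {
    // Assumed local Redis; adjust the URI for your environment.
    RedisClient client = RedisClient.create("redis://localhost:6379");
    try (StatefulRedisConnection<String, String> connection = client.connect()) {
      RedisCommands<String, String> commands = connection.sync();
      // flush() stores offsets under __kafka.offset.<topic>.<partition>.
      String json = commands.get("__kafka.offset.topic.1");
      System.out.println(json); // roughly {"topic":"topic","partition":1,"offset":1}
    } finally {
      client.shutdown();
    }
  }
}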
