Skip to content

Commit

Permalink
Support Kafka-clients 3.8+ (#7626)
Browse files Browse the repository at this point in the history
  • Loading branch information
nayeem-kamal authored Oct 16, 2024
1 parent a80d13e commit b53b6dc
Show file tree
Hide file tree
Showing 43 changed files with 3,683 additions and 1 deletion.
81 changes: 81 additions & 0 deletions dd-java-agent/instrumentation/kafka-clients-3.8/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Build for the kafka-clients 3.8+ instrumentation. Library classes are
// compiled against Java 17 APIs (main_java17 source set) while the emitted
// bytecode stays Java 8 compatible for the tracer agent.
ext {
  // spring-kafka 3.x, used by the tests, has a Java 17+ baseline.
  minJavaVersionForTests = JavaVersion.VERSION_17
}
muzzle {
  pass {
    group = "org.apache.kafka"
    module = "kafka-clients"
    versions = "[3.8.0,)"
    // Versions below 3.8.0 are covered by the pre-3.8 kafka-clients module,
    // so the inverse assertion is intentionally disabled.
    assertInverse = false
  }
}

apply from: "$rootDir/gradle/java.gradle"

addTestSuite('latestDepTest')

// Compile with a Java 17 compiler but keep Java 8 source/target
// compatibility. The explicit list covers the known compile tasks; the
// withType(JavaCompile) loop below applies the same settings to any other
// JavaCompile task already registered at this point.
[compileMain_java17Java, compileTestJava, compileLatestDepTestJava].each {
  it.configure {
    setJavaVersion(it, 17)
    sourceCompatibility = JavaVersion.VERSION_1_8
    targetCompatibility = JavaVersion.VERSION_1_8
  }
}
tasks.withType(JavaCompile).each {
  it.configure {
    setJavaVersion(it, 17)
    sourceCompatibility = JavaVersion.VERSION_1_8
    targetCompatibility = JavaVersion.VERSION_1_8
  }
}
// Groovy (Spock) test sources must also be compiled with a Java 17 launcher.
tasks.withType(GroovyCompile) {
  javaLauncher = getJavaLauncherFor(17)
}

dependencies {
  // kafka-clients is compile-only: at runtime the instrumented application
  // supplies the real library.
  main_java17CompileOnly group: 'org.apache.kafka', name: 'kafka-clients', version: '3.8.0'
  implementation project(':dd-java-agent:instrumentation:kafka-common')
  main_java17Implementation project(':dd-java-agent:instrumentation:kafka-common')

  testImplementation group: 'org.apache.kafka', name: 'kafka-clients', version: '3.8.0'
  testImplementation group: 'org.springframework.kafka', name: 'spring-kafka', version: '3.1.0'
  testImplementation group: 'org.springframework.kafka', name: 'spring-kafka-test', version: '3.1.0'
  testImplementation group: 'org.testcontainers', name: 'kafka', version: '1.17.0'
  testImplementation group: 'javax.xml.bind', name: 'jaxb-api', version: '2.2.3'
  testImplementation group: 'org.assertj', name: 'assertj-core', version: '2.9.+'
  testImplementation group: 'org.mockito', name: 'mockito-core', version: '2.19.0'
  testRuntimeOnly project(':dd-java-agent:instrumentation:spring-scheduling-3.1')

  // Include latest version of kafka itself along with latest version of client libs.
  // This seems to help with jar compatibility hell.
  latestDepTestImplementation group: 'org.apache.kafka', name: 'kafka_2.13', version: '2.+'
  latestDepTestImplementation group: 'org.apache.kafka', name: 'kafka-clients', version: '3.+'
  latestDepTestImplementation group: 'org.springframework.kafka', name: 'spring-kafka', version: '3.+'
  latestDepTestImplementation group: 'org.springframework.kafka', name: 'spring-kafka-test', version: '3.+'
  latestDepTestImplementation group: 'org.assertj', name: 'assertj-core', version: '3.19.+'
  latestDepTestImplementation libs.guava
}

configurations.testRuntimeClasspath {
  // spock-core depends on assertj version that is not compatible with kafka-clients
  resolutionStrategy.force 'org.assertj:assertj-core:2.9.1'
}


Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package datadog.trace.instrumentation.kafka_clients38;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.api.Config;
import java.util.HashMap;
import java.util.Map;

/**
 * Instruments {@code ConsumerCoordinator.sendOffsetCommitRequest} so offset commits can be
 * correlated with the consumer metadata captured in {@link KafkaConsumerInfo}.
 */
@AutoService(InstrumenterModule.class)
public final class ConsumerCoordinatorInstrumentation extends InstrumenterModule.Tracing
    implements Instrumenter.ForSingleType {

  public ConsumerCoordinatorInstrumentation() {
    super("kafka");
  }

  @Override
  public boolean isEnabled() {
    // kafka-clients 3.8+ support is gated behind an experimental config flag.
    return super.isEnabled() && Config.get().isExperimentalKafkaEnabled();
  }

  @Override
  public Map<String, String> contextStore() {
    Map<String, String> contextStores = new HashMap<>(2);
    contextStores.put("org.apache.kafka.clients.Metadata", "java.lang.String");
    contextStores.put(
        "org.apache.kafka.clients.consumer.internals.ConsumerCoordinator",
        KafkaConsumerInfo.class.getName());
    return contextStores;
  }

  @Override
  public String instrumentedType() {
    return "org.apache.kafka.clients.consumer.internals.ConsumerCoordinator";
  }

  @Override
  public String[] helperClassNames() {
    return new String[] {packageName + ".KafkaConsumerInfo"};
  }

  @Override
  public void methodAdvice(MethodTransformer transformer) {
    // sendOffsetCommitRequest(Map<TopicPartition, OffsetAndMetadata>) — single-arg overload.
    transformer.applyAdvice(
        isMethod().and(named("sendOffsetCommitRequest")).and(takesArguments(1)),
        packageName + ".ConsumerCoordinatorAdvice");
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package datadog.trace.instrumentation.kafka_clients38;

import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.declaresField;
import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.implementsInterface;
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.returns;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.api.Config;
import java.util.HashMap;
import java.util.Map;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;

/**
 * Captures additional KafkaConsumer metadata, such as consumer group and cluster ID, and keeps it
 * in the context store so later advice can read it back.
 */
@AutoService(InstrumenterModule.class)
public final class KafkaConsumerInfoInstrumentation extends InstrumenterModule.Tracing
    implements Instrumenter.ForTypeHierarchy {

  public KafkaConsumerInfoInstrumentation() {
    super("kafka");
  }

  @Override
  public boolean isEnabled() {
    // Only active when the experimental kafka support is switched on.
    return super.isEnabled() && Config.get().isExperimentalKafkaEnabled();
  }

  @Override
  public String hierarchyMarkerType() {
    return "org.apache.kafka.clients.consumer.internals.ConsumerDelegate";
  }

  @Override
  public ElementMatcher<TypeDescription> hierarchyMatcher() {
    // Only ConsumerDelegate implementations that own an offset-commit callback invoker field.
    return implementsInterface(named(hierarchyMarkerType()))
        .and(declaresField(named("offsetCommitCallbackInvoker")));
  }

  @Override
  public Map<String, String> contextStore() {
    final String consumerInfoClass = KafkaConsumerInfo.class.getName();
    final Map<String, String> stores = new HashMap<>(4);
    stores.put("org.apache.kafka.clients.Metadata", "java.lang.String");
    stores.put("org.apache.kafka.clients.consumer.ConsumerRecords", consumerInfoClass);
    // In kafka-clients 3.8+ the consumer internals changed: the info is keyed off the
    // OffsetCommitCallbackInvoker and ConsumerDelegate rather than the old
    // ConsumerCoordinator / KafkaConsumer pair.
    stores.put(
        "org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker",
        consumerInfoClass);
    stores.put(
        "org.apache.kafka.clients.consumer.internals.ConsumerDelegate", consumerInfoClass);
    return stores;
  }

  @Override
  public String[] helperClassNames() {
    return new String[] {
      packageName + ".KafkaDecorator", packageName + ".KafkaConsumerInfo",
    };
  }

  @Override
  public void methodAdvice(MethodTransformer transformer) {
    // Capture group / cluster info when the delegate is constructed ...
    transformer.applyAdvice(
        isConstructor()
            .and(takesArgument(0, named("org.apache.kafka.clients.consumer.ConsumerConfig")))
            .and(takesArgument(1, named("org.apache.kafka.common.serialization.Deserializer")))
            .and(takesArgument(2, named("org.apache.kafka.common.serialization.Deserializer"))),
        packageName + ".ConstructorAdvice");
    // ... and attach it to the ConsumerRecords returned from poll(...).
    transformer.applyAdvice(
        isMethod()
            .and(isPublic())
            .and(named("poll"))
            .and(takesArguments(1))
            .and(returns(named("org.apache.kafka.clients.consumer.ConsumerRecords"))),
        packageName + ".RecordsAdvice");
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package datadog.trace.instrumentation.kafka_clients38;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.returns;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.api.Config;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * Instruments {@code ConsumerRecords} accessors ({@code records(String)}, {@code
 * records(TopicPartition)}, {@code iterator()}) to wrap the returned iterables/iterators with
 * tracing decorators.
 */
@AutoService(InstrumenterModule.class)
public final class KafkaConsumerInstrumentation extends InstrumenterModule.Tracing
    implements Instrumenter.ForSingleType {

  public KafkaConsumerInstrumentation() {
    super("kafka");
  }

  @Override
  public boolean isEnabled() {
    // kafka-clients 3.8+ support is gated behind an experimental config flag.
    return super.isEnabled() && Config.get().isExperimentalKafkaEnabled();
  }

  @Override
  public Map<String, String> contextStore() {
    Map<String, String> contextStores = new HashMap<>(2);
    contextStores.put("org.apache.kafka.clients.Metadata", "java.lang.String");
    // Use the class constant rather than a hardcoded FQCN string so the mapping
    // survives package renames, consistent with the sibling instrumentations.
    contextStores.put(
        "org.apache.kafka.clients.consumer.ConsumerRecords", KafkaConsumerInfo.class.getName());
    return Collections.unmodifiableMap(contextStores);
  }

  @Override
  public String instrumentedType() {
    return "org.apache.kafka.clients.consumer.ConsumerRecords";
  }

  @Override
  public String[] helperClassNames() {
    return new String[] {
      packageName + ".TextMapInjectAdapterInterface",
      packageName + ".KafkaConsumerInfo",
      packageName + ".KafkaConsumerInstrumentationHelper",
      packageName + ".KafkaDecorator",
      packageName + ".TextMapExtractAdapter",
      packageName + ".TracingIterableDelegator",
      packageName + ".TracingIterable",
      packageName + ".TracingIterator",
      packageName + ".TracingList",
      packageName + ".TracingListIterator",
      packageName + ".TextMapInjectAdapter",
      "datadog.trace.instrumentation.kafka_common.Utils",
      "datadog.trace.instrumentation.kafka_common.StreamingContext",
    };
  }

  @Override
  public void methodAdvice(MethodTransformer transformer) {
    // records(String topic) -> Iterable<ConsumerRecord>
    transformer.applyAdvice(
        isMethod()
            .and(isPublic())
            .and(named("records"))
            .and(takesArgument(0, String.class))
            .and(returns(Iterable.class)),
        packageName + ".IterableAdvice");
    // records(TopicPartition) -> List<ConsumerRecord>
    transformer.applyAdvice(
        isMethod()
            .and(isPublic())
            .and(named("records"))
            .and(takesArgument(0, named("org.apache.kafka.common.TopicPartition")))
            .and(returns(List.class)),
        packageName + ".ListAdvice");
    // iterator() -> Iterator<ConsumerRecord>
    transformer.applyAdvice(
        isMethod()
            .and(isPublic())
            .and(named("iterator"))
            .and(takesArguments(0))
            .and(returns(Iterator.class)),
        packageName + ".IteratorAdvice");
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package datadog.trace.instrumentation.kafka_clients38;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPrivate;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.api.Config;
import java.util.Map;

/**
 * Instruments {@code KafkaProducer}: traces {@code send(ProducerRecord, Callback)} and observes
 * the estimated record size through the private {@code ensureValidRecordSize} method.
 */
@AutoService(InstrumenterModule.class)
public final class KafkaProducerInstrumentation extends InstrumenterModule.Tracing
    implements Instrumenter.ForSingleType {

  public KafkaProducerInstrumentation() {
    super("kafka");
  }

  @Override
  public boolean isEnabled() {
    // Only active when the experimental kafka support is switched on.
    return super.isEnabled() && Config.get().isExperimentalKafkaEnabled();
  }

  @Override
  public String instrumentedType() {
    return "org.apache.kafka.clients.producer.KafkaProducer";
  }

  @Override
  public Map<String, String> contextStore() {
    return singletonMap("org.apache.kafka.clients.Metadata", "java.lang.String");
  }

  @Override
  public String[] helperClassNames() {
    return new String[] {
      packageName + ".KafkaDecorator",
      packageName + ".TextMapInjectAdapterInterface",
      packageName + ".TextMapInjectAdapter",
      packageName + ".NoopTextMapInjectAdapter",
      packageName + ".KafkaProducerCallback",
      "datadog.trace.instrumentation.kafka_common.StreamingContext",
      packageName + ".AvroSchemaExtractor",
    };
  }

  @Override
  public void methodAdvice(MethodTransformer transformer) {
    // send(ProducerRecord, Callback): starts the producer span and injects context.
    transformer.applyAdvice(
        isMethod()
            .and(isPublic())
            .and(named("send"))
            .and(takesArgument(0, named("org.apache.kafka.clients.producer.ProducerRecord")))
            .and(takesArgument(1, named("org.apache.kafka.clients.producer.Callback"))),
        packageName + ".ProducerAdvice");

    // Intercepting ensureValidRecordSize(int) lets us see the estimated message size.
    transformer.applyAdvice(
        isMethod()
            .and(isPrivate())
            .and(takesArgument(0, int.class))
            .and(named("ensureValidRecordSize")),
        packageName + ".PayloadSizeAdvice");
  }
}
Loading

0 comments on commit b53b6dc

Please sign in to comment.