Skip to content

Commit

Permalink
Couchbase query normalization (#6116)
Browse files Browse the repository at this point in the history
* added normalization

* simplified normalization check and added tests

* added a DDCache for previously normalized queries

* cleaned test

* addressed changes from PR

* refactor conditional logic

* fix deleted file
  • Loading branch information
nayeem-kamal authored Nov 10, 2023
1 parent 8f45302 commit a36099f
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,15 @@

import static datadog.trace.bootstrap.instrumentation.api.Tags.DB_TYPE;

import datadog.trace.api.cache.DDCache;
import datadog.trace.api.cache.DDCaches;
import datadog.trace.api.naming.SpanNaming;
import datadog.trace.api.normalize.SQLNormalizer;
import datadog.trace.bootstrap.instrumentation.api.InternalSpanTypes;
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
import datadog.trace.bootstrap.instrumentation.decorator.DBTypeProcessingDatabaseClientDecorator;
import java.util.function.Function;
import java.util.function.ToIntFunction;

class CouchbaseClientDecorator extends DBTypeProcessingDatabaseClientDecorator {
private static final String DB_TYPE = "couchbase";
Expand All @@ -16,6 +21,13 @@ class CouchbaseClientDecorator extends DBTypeProcessingDatabaseClientDecorator {
public static final CharSequence COUCHBASE_CLIENT = UTF8BytesString.create("couchbase-client");
public static final CouchbaseClientDecorator DECORATE = new CouchbaseClientDecorator();

private static final Function<String, UTF8BytesString> NORMALIZE = SQLNormalizer::normalize;
private static final int COMBINED_STATEMENT_LIMIT = 2 * 1024 * 1024; // characters

private static final ToIntFunction<UTF8BytesString> STATEMENT_WEIGHER = UTF8BytesString::length;
private static final DDCache<String, UTF8BytesString> CACHED_STATEMENTS =
DDCaches.newFixedSizeWeightedCache(512, STATEMENT_WEIGHER, COMBINED_STATEMENT_LIMIT);

@Override
protected String[] instrumentationNames() {
return new String[] {"couchbase"};
Expand Down Expand Up @@ -55,4 +67,8 @@ protected String dbInstance(final Object o) {
protected String dbHostname(Object o) {
return null;
}

protected static UTF8BytesString normalizedQuery(String sql) {
return CACHED_STATEMENTS.computeIfAbsent(sql, NORMALIZE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,11 @@ public void setAttribute(String key, String value) {
// TODO when `db.statement` is set here it will be intercepted by the TagInterceptor, so any
// sort of obfuscation should go in there, preferably as a lazy sort of Utf8String that does
// the actual work at the end
span.setTag(key, value);
if ("db.statement".equals(key)) {
span.setTag(key, CouchbaseClientDecorator.normalizedQuery(value));
} else {
span.setTag(key, value);
}
}

// This method shows up in later versions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ abstract class CouchbaseClient31Test extends VersionedNamingTestBase {
assertCouchbaseCall(it, "cb.query", [
'db.couchbase.retries' : { Long },
'db.couchbase.service' : 'query',
], 'select * from `test-bucket` limit 1')
], 'select * from `test-bucket` limit ?')
assertCouchbaseDispatchCall(it, span(0))
}
}
Expand All @@ -148,6 +148,7 @@ abstract class CouchbaseClient31Test extends VersionedNamingTestBase {
def "check query spans with parent"() {
setup:
def query = 'select * from `test-bucket` limit 1'
def normalizedQuery = 'select * from `test-bucket` limit ?'

when:
runUnderTrace('query.parent') {
Expand All @@ -163,14 +164,15 @@ abstract class CouchbaseClient31Test extends VersionedNamingTestBase {
assertCouchbaseCall(it, "cb.query", [
'db.couchbase.retries' : { Long },
'db.couchbase.service' : 'query',
], query, span(0), false)
], normalizedQuery, span(0), false)
assertCouchbaseDispatchCall(it, span(1))
}
}
}

def "check query spans with parent and adhoc #adhoc"() {
def query = 'select count(1) from `test-bucket` where (`something` = "else") limit 1'
def normalizedQuery = 'select count(?) from `test-bucket` where (`something` = "else") limit ?'
int count = 0

when:
Expand All @@ -192,12 +194,12 @@ abstract class CouchbaseClient31Test extends VersionedNamingTestBase {
assertCouchbaseCall(it, "cb.query", [
'db.couchbase.retries' : { Long },
'db.couchbase.service' : 'query',
], query, span(0), false)
], normalizedQuery, span(0), false)
if (!adhoc) {
assertCouchbaseCall(it, "prepare", [
'db.couchbase.retries' : { Long },
'db.couchbase.service' : 'query',
], "PREPARE $query", span(1), true)
], "PREPARE $normalizedQuery", span(1), true)
}
assertCouchbaseDispatchCall(it, span(adhoc ? 1 : 2))
}
Expand All @@ -209,6 +211,7 @@ abstract class CouchbaseClient31Test extends VersionedNamingTestBase {

def "check multiple query spans with parent and adhoc false"() {
def query = 'select count(1) from `test-bucket` where (`something` = "wonderful") limit 1'
def normalizedQuery = 'select count(?) from `test-bucket` where (`something` = "wonderful") limit ?'
int count1 = 0
int count2 = 0

Expand Down Expand Up @@ -237,20 +240,20 @@ abstract class CouchbaseClient31Test extends VersionedNamingTestBase {
assertCouchbaseCall(it, "cb.query", [
'db.couchbase.retries' : { Long },
'db.couchbase.service' : 'query',
], query, span(0), false)
], normalizedQuery, span(0), false)
assertCouchbaseCall(it, "prepare", [
'db.couchbase.retries' : { Long },
'db.couchbase.service' : 'query',
], "PREPARE $query", span(1), true)
], "PREPARE $normalizedQuery", span(1), true)
assertCouchbaseDispatchCall(it, span(2))
assertCouchbaseCall(it, "cb.query", [
'db.couchbase.retries' : { Long },
'db.couchbase.service' : 'query',
], query, span(0), false)
], normalizedQuery, span(0), false)
assertCouchbaseCall(it, "execute", [
'db.couchbase.retries' : { Long },
'db.couchbase.service' : 'query',
], query, span(4), true)
], normalizedQuery, span(4), true)
assertCouchbaseDispatchCall(it, span(5))
}
}
Expand All @@ -259,6 +262,7 @@ abstract class CouchbaseClient31Test extends VersionedNamingTestBase {
def "check error query spans with parent"() {
setup:
def query = 'select * from `test-bucket` limeit 1'
def normalizedQuery = 'select * from `test-bucket` limeit ?'
Throwable ex = null

when:
Expand All @@ -280,14 +284,15 @@ abstract class CouchbaseClient31Test extends VersionedNamingTestBase {
assertCouchbaseCall(it, "cb.query", [
'db.couchbase.retries' : { Long },
'db.couchbase.service' : 'query',
], query, span(0), false, ex)
], normalizedQuery, span(0), false, ex)
assertCouchbaseDispatchCall(it, span(1))
}
}
}

def "check multiple error query spans with parent and adhoc false"() {
def query = 'select count(1) from `test-bucket` where (`something` = "wonderful") limeit 1'
def normalizedQuery = 'select count(?) from `test-bucket` where (`something` = "wonderful") limeit ?'
int count1 = 0
int count2 = 0
Throwable ex1 = null
Expand Down Expand Up @@ -332,7 +337,7 @@ abstract class CouchbaseClient31Test extends VersionedNamingTestBase {
assertCouchbaseCall(it, "prepare", [
'db.couchbase.retries' : { Long },
'db.couchbase.service' : 'query',
], "PREPARE $query", span(1), true, ex1)
], "PREPARE $normalizedQuery", span(1), true, ex1)
assertCouchbaseDispatchCall(it, span(2))
assertCouchbaseCall(it, "cb.query", [
'db.couchbase.retries' : { Long },
Expand All @@ -341,7 +346,7 @@ abstract class CouchbaseClient31Test extends VersionedNamingTestBase {
assertCouchbaseCall(it, "prepare", [
'db.couchbase.retries' : { Long },
'db.couchbase.service' : 'query',
], "PREPARE $query", span(4), true, ex2)
], "PREPARE $normalizedQuery", span(4), true, ex2)
assertCouchbaseDispatchCall(it, span(5))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,15 @@

import static datadog.trace.bootstrap.instrumentation.api.Tags.DB_TYPE;

import datadog.trace.api.cache.DDCache;
import datadog.trace.api.cache.DDCaches;
import datadog.trace.api.naming.SpanNaming;
import datadog.trace.api.normalize.SQLNormalizer;
import datadog.trace.bootstrap.instrumentation.api.InternalSpanTypes;
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
import datadog.trace.bootstrap.instrumentation.decorator.DBTypeProcessingDatabaseClientDecorator;
import java.util.function.Function;
import java.util.function.ToIntFunction;

class CouchbaseClientDecorator extends DBTypeProcessingDatabaseClientDecorator {
private static final String DB_TYPE = "couchbase";
Expand All @@ -16,6 +21,13 @@ class CouchbaseClientDecorator extends DBTypeProcessingDatabaseClientDecorator {
public static final CharSequence COUCHBASE_CLIENT = UTF8BytesString.create("couchbase-client");
public static final CouchbaseClientDecorator DECORATE = new CouchbaseClientDecorator();

private static final Function<String, UTF8BytesString> NORMALIZE = SQLNormalizer::normalize;
private static final int COMBINED_STATEMENT_LIMIT = 2 * 1024 * 1024; // characters

private static final ToIntFunction<UTF8BytesString> STATEMENT_WEIGHER = UTF8BytesString::length;
private static final DDCache<String, UTF8BytesString> CACHED_STATEMENTS =
DDCaches.newFixedSizeWeightedCache(512, STATEMENT_WEIGHER, COMBINED_STATEMENT_LIMIT);

@Override
protected String[] instrumentationNames() {
return new String[] {"couchbase"};
Expand Down Expand Up @@ -55,4 +67,8 @@ protected String dbInstance(final Object o) {
protected String dbHostname(Object o) {
return null;
}

protected static UTF8BytesString normalizedQuery(String sql) {
return CACHED_STATEMENTS.computeIfAbsent(sql, NORMALIZE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,11 @@ public void attribute(String key, String value) {
// TODO when `db.statement` is set here it will be intercepted by the TagInterceptor, so any
// sort of obfuscation should go in there, preferably as a lazy sort of Utf8String that does
// the actual work at the end
span.setTag(key, value);
if ("db.statement".equals(key)) {
span.setTag(key, CouchbaseClientDecorator.normalizedQuery(value));
} else {
span.setTag(key, value);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ abstract class CouchbaseClient32Test extends VersionedNamingTestBase {
assertTraces(1) {
sortSpansByStart()
trace(2) {
assertCouchbaseCall(it, 'select * from `test-bucket` limit 1', [
assertCouchbaseCall(it, 'select * from `test-bucket` limit ?', [
'db.couchbase.retries' : { Long },
'db.couchbase.service' : 'query'
])
Expand All @@ -161,7 +161,7 @@ abstract class CouchbaseClient32Test extends VersionedNamingTestBase {
def "check query spans with parent"() {
setup:
def query = 'select * from `test-bucket` limit 1'

def normalizedQuery = 'select * from `test-bucket` limit ?'
when:
runUnderTrace('query.parent') {
cluster.query(query)
Expand All @@ -172,7 +172,7 @@ abstract class CouchbaseClient32Test extends VersionedNamingTestBase {
sortSpansByStart()
trace(3) {
basicSpan(it, 'query.parent')
assertCouchbaseCall(it, query, [
assertCouchbaseCall(it, normalizedQuery, [
'db.couchbase.retries' : { Long },
'db.couchbase.service' : 'query'
], span(0))
Expand All @@ -184,6 +184,7 @@ abstract class CouchbaseClient32Test extends VersionedNamingTestBase {
def "check async query spans with parent and adhoc #adhoc"() {
setup:
def query = 'select count(1) from `test-bucket` where (`something` = "else") limit 1'
def normalizedQuery = 'select count(?) from `test-bucket` where (`something` = "else") limit ?'
int count = 0

when:
Expand All @@ -202,12 +203,12 @@ abstract class CouchbaseClient32Test extends VersionedNamingTestBase {
sortSpansByStart()
trace(adhoc ? 3 : 4) {
basicSpan(it, 'async.parent')
assertCouchbaseCall(it, query, [
assertCouchbaseCall(it, normalizedQuery, [
'db.couchbase.retries' : { Long },
'db.couchbase.service' : 'query'
], span(0))
if (!adhoc) {
assertCouchbaseCall(it, "PREPARE $query", [
assertCouchbaseCall(it, "PREPARE $normalizedQuery", [
'db.couchbase.retries': { Long },
'db.couchbase.service': 'query'
], span(1), true)
Expand All @@ -223,6 +224,7 @@ abstract class CouchbaseClient32Test extends VersionedNamingTestBase {
def "check multiple async query spans with parent and adhoc false"() {
setup:
def query = 'select count(1) from `test-bucket` where (`something` = "wonderful") limit 1'
def normalizedQuery = 'select count(?) from `test-bucket` where (`something` = "wonderful") limit ?'
int count1 = 0
int count2 = 0
def extraPrepare = isLatestDepTest
Expand All @@ -249,26 +251,26 @@ abstract class CouchbaseClient32Test extends VersionedNamingTestBase {
sortSpansByStart()
trace(extraPrepare ? 8 : 7) {
basicSpan(it, 'async.multiple')
assertCouchbaseCall(it, query, [
assertCouchbaseCall(it, normalizedQuery, [
'db.couchbase.retries' : { Long },
'db.couchbase.service' : 'query'
], span(0))
assertCouchbaseCall(it, "PREPARE $query", [
assertCouchbaseCall(it, "PREPARE $normalizedQuery", [
'db.couchbase.retries': { Long },
'db.couchbase.service': 'query'
], span(1), true)
assertCouchbaseDispatchCall(it, span(2))
assertCouchbaseCall(it, query, [
assertCouchbaseCall(it, normalizedQuery, [
'db.couchbase.retries' : { Long },
'db.couchbase.service' : 'query'
], span(0))
if (extraPrepare) {
assertCouchbaseCall(it, "PREPARE $query", [
assertCouchbaseCall(it, "PREPARE $normalizedQuery", [
'db.couchbase.retries': { Long },
'db.couchbase.service': 'query'
], span(4), true)
}
assertCouchbaseCall(it, query, [
assertCouchbaseCall(it, normalizedQuery, [
'db.couchbase.retries': { Long },
'db.couchbase.service': 'query'
], span(4), true)
Expand All @@ -280,12 +282,13 @@ abstract class CouchbaseClient32Test extends VersionedNamingTestBase {
def "check error query spans with parent"() {
setup:
def query = 'select * from `test-bucket` limeit 1'
def normalizedQuery = "select * from `test-bucket` limeit ?"
Throwable ex = null

when:
runUnderTrace('query.failure') {
try {
cluster.query('select * from `test-bucket` limeit 1')
cluster.query(query)
} catch (ParsingFailureException expected) {
ex = expected
}
Expand All @@ -297,7 +300,7 @@ abstract class CouchbaseClient32Test extends VersionedNamingTestBase {
sortSpansByStart()
trace(3) {
basicSpan(it, 'query.failure')
assertCouchbaseCall(it, query, [
assertCouchbaseCall(it, normalizedQuery, [
'db.couchbase.retries' : { Long },
'db.couchbase.service' : 'query',
'db.system' : 'couchbase',
Expand All @@ -310,6 +313,7 @@ abstract class CouchbaseClient32Test extends VersionedNamingTestBase {
def "check multiple async error query spans with parent and adhoc false"() {
setup:
def query = 'select count(1) from `test-bucket` where (`something` = "wonderful") limeit 1'
def normalizedQuery = 'select count(?) from `test-bucket` where (`something` = "wonderful") limeit ?'
int count1 = 0
int count2 = 0
Throwable ex1 = null
Expand Down Expand Up @@ -347,20 +351,20 @@ abstract class CouchbaseClient32Test extends VersionedNamingTestBase {
sortSpansByStart()
trace(7) {
basicSpan(it, 'async.failure')
assertCouchbaseCall(it, query, [
assertCouchbaseCall(it, normalizedQuery, [
'db.couchbase.retries' : { Long },
'db.couchbase.service' : 'query'
], span(0), false, ex1)
assertCouchbaseCall(it, "PREPARE $query", [
assertCouchbaseCall(it, "PREPARE $normalizedQuery", [
'db.couchbase.retries': { Long },
'db.couchbase.service': 'query'
], span(1), true, ex1)
assertCouchbaseDispatchCall(it, span(2))
assertCouchbaseCall(it, query, [
assertCouchbaseCall(it, normalizedQuery, [
'db.couchbase.retries' : { Long },
'db.couchbase.service' : 'query'
], span(0), false, ex2)
assertCouchbaseCall(it, "PREPARE $query", [
assertCouchbaseCall(it, "PREPARE $normalizedQuery", [
'db.couchbase.retries': { Long },
'db.couchbase.service': 'query'
], span(4), true, ex2)
Expand Down

0 comments on commit a36099f

Please sign in to comment.