Skip to content

Commit

Permalink
Fix compatibility with Kafka >= 3.8
Browse files Browse the repository at this point in the history
Stop using io.aiven.kafka.auth.audit.Session from Kafka that is no longer present in Kafka >= 3.8.

Add io.aiven.kafka.auth.audit.Session to replace that. In the new class, add properly named getter methods and make the fields private.
  • Loading branch information
juha-aiven committed Aug 5, 2024
1 parent 4cc1a75 commit 10707fd
Show file tree
Hide file tree
Showing 12 changed files with 69 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import io.aiven.kafka.auth.audit.Session;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
Expand All @@ -59,7 +60,6 @@
import io.aiven.kafka.auth.nameformatters.LegacyResourceTypeNameFormatter;
import io.aiven.kafka.auth.nativeacls.AclAivenToNativeConverter;

import kafka.network.RequestChannel.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
1 change: 0 additions & 1 deletion src/main/java/io/aiven/kafka/auth/audit/Auditor.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.security.auth.KafkaPrincipal;

import kafka.network.RequestChannel.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
4 changes: 1 addition & 3 deletions src/main/java/io/aiven/kafka/auth/audit/AuditorAPI.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.resource.ResourcePattern;

import kafka.network.RequestChannel;

public interface AuditorAPI extends Configurable {
void addActivity(final RequestChannel.Session session,
void addActivity(final Session session,
final AclOperation operation,
final ResourcePattern resource,
final boolean hasAccess);
Expand Down
4 changes: 1 addition & 3 deletions src/main/java/io/aiven/kafka/auth/audit/NoAuditor.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.resource.ResourcePattern;

import kafka.network.RequestChannel;

/**
* A no-op {@link AuditorAPI}.
*/
Expand All @@ -33,7 +31,7 @@ public NoAuditor() {
}

@Override
public void addActivity(final RequestChannel.Session session,
public void addActivity(final Session session,
final AclOperation operation,
final ResourcePattern resource,
final boolean hasAccess) {
Expand Down
40 changes: 40 additions & 0 deletions src/main/java/io/aiven/kafka/auth/audit/Session.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2024 Aiven Oy https://aiven.io
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.auth.audit;

import java.net.InetAddress;

import org.apache.kafka.common.security.auth.KafkaPrincipal;


public class Session {
private final KafkaPrincipal principal;
private final InetAddress clientAddress;

public Session(final KafkaPrincipal principal, final InetAddress clientAddress) {
this.principal = principal;
this.clientAddress = clientAddress;
}

public KafkaPrincipal getPrincipal() {
return principal;
}

public InetAddress getClientAddress() {
return clientAddress;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.resource.ResourcePattern;

import kafka.network.RequestChannel;
import org.slf4j.Logger;

public class UserActivityAuditor extends Auditor {
Expand All @@ -46,11 +45,11 @@ protected UserActivityAuditor(final Logger logger) {
}

@Override
protected void addActivity0(final RequestChannel.Session session,
protected void addActivity0(final Session session,
final AclOperation operation,
final ResourcePattern resource,
final boolean hasAccess) {
final AuditKey auditKey = new AuditKey(session.principal(), session.clientAddress());
final AuditKey auditKey = new AuditKey(session.getPrincipal(), session.getClientAddress());

auditStorage.compute(auditKey, (key, userActivity) -> Objects.isNull(userActivity)
? new UserActivity.UserActivityOperations()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.resource.ResourcePattern;

import kafka.network.RequestChannel;
import org.slf4j.Logger;

public class UserOperationsActivityAuditor extends Auditor {
Expand All @@ -35,7 +34,7 @@ protected UserOperationsActivityAuditor(final Logger logger) {
}

@Override
protected void addActivity0(final RequestChannel.Session session,
protected void addActivity0(final Session session,
final AclOperation operation,
final ResourcePattern resource,
final boolean hasAccess) {
Expand All @@ -46,18 +45,18 @@ protected void addActivity0(final RequestChannel.Session session,
} else {
ua = userActivity;
}
ua.addOperation(new UserOperation(session.clientAddress(), operation, resource, hasAccess));
ua.addOperation(new UserOperation(session.getClientAddress(), operation, resource, hasAccess));
return ua;
});
}

private AuditKey createAuditKey(final RequestChannel.Session session) {
final var grouping = auditorConfig.getAggregationGrouping();
private AuditKey createAuditKey(final Session session) {
final var grouping = auditorConfig.getAggregationGrouping();
switch (grouping) {
case USER:
return new AuditKey(session.principal(), null);
return new AuditKey(session.getPrincipal(), null);
case USER_AND_IP:
return new AuditKey(session.principal(), session.clientAddress());
return new AuditKey(session.getPrincipal(), session.getClientAddress());
default:
throw new IllegalArgumentException("Unknown aggregation grouping type: " + grouping);
}
Expand Down
20 changes: 9 additions & 11 deletions src/test/java/io/aiven/kafka/auth/audit/FormatterTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,11 @@
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.KafkaPrincipal;

import kafka.network.RequestChannel;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class FormatterTestBase {

protected RequestChannel.Session session;
protected Session session;

protected AclOperation operation;

Expand All @@ -48,7 +46,7 @@ public class FormatterTestBase {

protected ResourcePattern anotherResource;

protected RequestChannel.Session anotherSession;
protected Session anotherSession;

protected InetAddress anotherInetAddress;

Expand All @@ -60,9 +58,9 @@ protected FormatterTestBase(final AuditorConfig.AggregationGrouping aggregationG

void setUp() throws Exception {
final KafkaPrincipal principal = new KafkaPrincipal("PRINCIPAL_TYPE", "PRINCIPAL_NAME");
session = new RequestChannel.Session(principal, InetAddress.getLocalHost());
session = new Session(principal, InetAddress.getLocalHost());
anotherInetAddress = InetAddress.getByName("192.168.0.1");
anotherSession = new RequestChannel.Session(principal, anotherInetAddress);
anotherSession = new Session(principal, anotherInetAddress);
resource =
new ResourcePattern(
ResourceType.CLUSTER,
Expand All @@ -88,20 +86,20 @@ protected void zeroOperations(final ZonedDateTime now, final String expected) {
protected void twoOperations(final ZonedDateTime now, final String expected) {
final Map<Auditor.AuditKey, UserActivity> dump = new HashMap<>();
final UserActivity userActivity = createUserActivity(now);
userActivity.addOperation(new UserOperation(session.clientAddress(), operation, resource, false));
userActivity.addOperation(new UserOperation(session.getClientAddress(), operation, resource, false));
userActivity.addOperation(
new UserOperation(session.clientAddress(), anotherOperation, anotherResource, true));
new UserOperation(session.getClientAddress(), anotherOperation, anotherResource, true));
dump.put(createAuditKey(session), userActivity);

formatAndAssert(dump, expected);
}

protected Auditor.AuditKey createAuditKey(final RequestChannel.Session session) {
protected Auditor.AuditKey createAuditKey(final Session session) {
switch (aggregationGrouping) {
case USER:
return new Auditor.AuditKey(session.principal(), null);
return new Auditor.AuditKey(session.getPrincipal(), null);
case USER_AND_IP:
return new Auditor.AuditKey(session.principal(), session.clientAddress());
return new Auditor.AuditKey(session.getPrincipal(), session.getClientAddress());
default:
throw new IllegalArgumentException("Unknown aggregation grouping: " + aggregationGrouping);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,12 @@ protected void twoOperationsTwoIpAddresses(final ZonedDateTime now, final String
final Map<Auditor.AuditKey, UserActivity> dump = new HashMap<>();

final UserActivity userActivity = createUserActivity(now);
userActivity.addOperation(new UserOperation(session.clientAddress(), operation, resource, false));
userActivity.addOperation(new UserOperation(session.getClientAddress(), operation, resource, false));
dump.put(createAuditKey(session), userActivity);

final UserActivity anotherUserActivity = createUserActivity(now);
anotherUserActivity.addOperation(
new UserOperation(anotherSession.clientAddress(), anotherOperation, anotherResource, true));
new UserOperation(anotherSession.getClientAddress(), anotherOperation, anotherResource, true));
dump.put(createAuditKey(anotherSession), anotherUserActivity);

formatAndAssert(dump, expected);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ protected void twoOperationsTwoIpAddresses(final ZonedDateTime now, final String

final UserActivity userActivity = createUserActivity(now);
userActivity.addOperation(
new UserOperation(session.clientAddress(), operation, resource, false));
new UserOperation(session.getClientAddress(), operation, resource, false));
userActivity.addOperation(
new UserOperation(anotherSession.clientAddress(), anotherOperation, anotherResource, true));
new UserOperation(anotherSession.getClientAddress(), anotherOperation, anotherResource, true));
dump.put(createAuditKey(session), userActivity);

formatAndAssert(dump, expected);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.KafkaPrincipal;

import kafka.network.RequestChannel.Session;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.KafkaPrincipal;

import kafka.network.RequestChannel.Session;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand Down Expand Up @@ -115,16 +114,16 @@ void shouldAggregateOperationsForSameUser() throws Exception {
2,
cast(auditor.auditStorage.get(
new Auditor.AuditKey(
session.principal(),
session.clientAddress())
session.getPrincipal(),
session.getClientAddress())
), UserActivity.UserActivityOperations.class).operations.size()
);
assertEquals(
1,
cast(auditor.auditStorage.get(
new Auditor.AuditKey(
anotherSession.principal(),
anotherSession.clientAddress())
anotherSession.getPrincipal(),
anotherSession.getClientAddress())
), UserActivity.UserActivityOperations.class).operations.size()
);
auditor.dump();
Expand Down Expand Up @@ -152,7 +151,7 @@ void shouldAggregateOperationsForSameUserAndPrincipalGrouping() throws Exception
2,
cast(auditor.auditStorage.get(
new Auditor.AuditKey(
session.principal(),
session.getPrincipal(),
null)
), UserActivity.UserActivityOperationsGropedByIP.class).operations.size()
);
Expand Down

0 comments on commit 10707fd

Please sign in to comment.