diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/authentication/AuthenticationProviderBasic.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/authentication/AuthenticationProviderBasic.java index a11d7725..aeea0f14 100644 --- a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/authentication/AuthenticationProviderBasic.java +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/authentication/AuthenticationProviderBasic.java @@ -40,10 +40,28 @@ public class AuthenticationProviderBasic implements AuthenticationProvider { private static final String HTTP_HEADER_NAME = "Authorization"; private Map users; + AuthenticationMetrics metrics; + + private enum ErrorCode { + UNKNOWN, + EMPTY_AUTH_DATA, + INVALID_HEADER, + INVALID_AUTH_DATA, + INVALID_TOKEN, + } + @SneakyThrows @Override public void initialize(ServiceConfiguration config) { - String basicAuthConf = (String) config.getProperty("basicAuthConf"); + initialize(Context.builder().config(config).build()); + } + + @SneakyThrows + @Override + public void initialize(Context context) throws IOException { + metrics = new AuthenticationMetrics(context.getOpenTelemetry(), + getClass().getSimpleName(), getAuthMethodName()); + String basicAuthConf = (String) context.getConfig().getProperty("basicAuthConf"); byte[] data; boolean isFile = basicAuthConf.startsWith("file:"); @@ -79,8 +97,10 @@ public String authenticate(AuthenticationDataSource authData) throws Authenticat String password = authParams.getPassword(); String msg = "Unknown user or invalid password"; + ErrorCode errorCode = ErrorCode.UNKNOWN; try { if (users.get(userId) == null) { + errorCode = ErrorCode.EMPTY_AUTH_DATA; throw new AuthenticationException(msg); } @@ -92,19 +112,20 @@ public String authenticate(AuthenticationDataSource authData) throws Authenticat if (splitEncryptedPassword.size() != 4 || !encryptedPassword .equals(Md5Crypt.apr1Crypt(password.getBytes(StandardCharsets.UTF_8), splitEncryptedPassword.get(2)))) { + errorCode = ErrorCode.INVALID_TOKEN; throw new AuthenticationException(msg); } // For crypt algorithm } else if (!encryptedPassword.equals(Crypt.crypt(password.getBytes(StandardCharsets.UTF_8), encryptedPassword.substring(0, 2)))) { + errorCode = ErrorCode.INVALID_TOKEN; throw new AuthenticationException(msg); } } catch (AuthenticationException exception) { - AuthenticationMetrics.authenticateFailure(getClass().getSimpleName(), getAuthMethodName(), - exception.getMessage()); + incrementFailureMetric(errorCode); throw exception; } - AuthenticationMetrics.authenticateSuccess(getClass().getSimpleName(), getAuthMethodName()); + metrics.recordSuccess(); return userId; } @@ -113,7 +134,7 @@ public void close() throws IOException { // noop } - private static class AuthParams { + private class AuthParams { private final String userId; private final String password; @@ -125,10 +146,12 @@ public AuthParams(AuthenticationDataSource authData) throws AuthenticationExcept String rawAuthToken = authData.getHttpHeader(HTTP_HEADER_NAME); // parsing and validation if (StringUtils.isBlank(rawAuthToken) || !rawAuthToken.toUpperCase().startsWith("BASIC ")) { + incrementFailureMetric(ErrorCode.INVALID_HEADER); throw new AuthenticationException("Authentication token has to be started with \"Basic \""); } String[] splitRawAuthToken = rawAuthToken.split(" "); if (splitRawAuthToken.length != 2) { + incrementFailureMetric(ErrorCode.INVALID_HEADER); throw new AuthenticationException("Base64 encoded token is not found"); } @@ -136,14 +159,17 @@ public AuthParams(AuthenticationDataSource authData) throws AuthenticationExcept authParams = new String(java.util.Base64.getDecoder().decode(splitRawAuthToken[1]), StandardCharsets.UTF_8); } catch (Exception e) { + incrementFailureMetric(ErrorCode.INVALID_HEADER); throw new AuthenticationException("Base64 decoding is failure: " + e.getMessage()); } } else { + incrementFailureMetric(ErrorCode.EMPTY_AUTH_DATA); throw new AuthenticationException("Authentication data source does not have data"); } String[] parsedAuthParams = authParams.split(":"); if (parsedAuthParams.length != 2) { + incrementFailureMetric(ErrorCode.INVALID_AUTH_DATA); throw new AuthenticationException("Base64 decoded params are invalid"); } @@ -159,4 +185,10 @@ public String getPassword() { return password; } } + + @Override + public void incrementFailureMetric(Enum errorCode) { + metrics.recordFailure(errorCode); + } + } diff --git a/pom.xml b/pom.xml index a60853bd..0d9ce9b1 100644 --- a/pom.xml +++ b/pom.xml @@ -40,7 +40,7 @@ ${maven.compiler.target} - 3.4.0-SNAPSHOT + 4.0.0-ursa-4-SNAPSHOT 8.0.0 5.8.0 @@ -384,6 +384,10 @@ nexus-snapshot-repo https://s01.oss.sonatype.org/content/repositories/snapshots + + ossrh + https://s01.oss.sonatype.org/service/local/repositories/0/content +