KoP supports authentication and SSL connection.
By default, KoP is installed with no encryption, authentication, and authorization. You can enable the authentication feature for KoP to improve security. Currently, this feature is only available in Java.
KoP authentication mechanism uses Kafka SASL mechanisms and achieves authentication with Pulsar token-based authentication mechanism. Consequently, if you want to enable the authentication feature for KoP, you need to enable authentication for the following components:
-
Pulsar brokers
-
KoP (some configurations of KoP rely on the configurations of Pulsar brokers)
-
Kafka clients
Currently, KoP supports the following SASL mechanisms:
If you want to enable the authentication feature for KoP using the PLAIN
mechanism, follow the steps below.
-
Enable authentication on Pulsar broker.
For the
PLAIN
mechanism, the Kafka authentication is forwarded to the JWT authentication of Pulsar, so you need to configure the JWT authentication and set the following properties in theconf/broker.conf
orconf/standalone.conf
file.(1) Enable authentication for the Pulsar broker.
authenticationEnabled=true authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderToken
(2) Enable authentication between Pulsar broker and KoP.
brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.AuthenticationToken brokerClientAuthenticationParameters=token:<token-of-super-user-role> superUserRoles=<super-user-roles>
(3) Specify the key.
For more information, see Enable Token Authentication on Brokers.
-
If you use a secret key, set the property as below.
tokenSecretKey=file:///path/to/secret.key
The key can also be passed inline.
tokenSecretKey=data:;base64,FLFyW0oLJ2Fi22KKCm21J18mbAdztfSHN/lAT5ucEKU=
-
If you use a public/private key, set the property as below.
tokenPublicKey=file:///path/to/public.key
-
-
Enable authentication on KoP.
Set the following property in the
conf/broker.conf
orconf/standalone.conf
file.saslAllowedMechanisms=PLAIN
-
Enable authentication on Kafka client.
To forward your credentials,
SASL-PLAIN
is used on the Kafka client side. To enableSASL-PLAIN
, you need to set the following properties through Kafka JAAS.Property Description Example value username
username
of Kafka JAAS is thetenant/namespace
, where Kafka’s topics are stored in Pulsar.
Note In KoP 2.9.0 or above, the username is only used to be compatible with version history, has‘t actual function.public/default
password
password
must be your token authentication parameters from Pulsar.
The token can be created by Pulsar token tools. The role is thesubject
for the token. It is embedded in the created token and the broker can getrole
by parsing this token.token:xxx
security.protocol=SASL_PLAINTEXT # or security.protocol=SASL_SSL if SSL connection is used sasl.mechanism=PLAIN sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \ required username="public/default" password="token:xxx";
If you want to enable the authentication feature for KoP using the OAUTHBEARER
mechanism, follow the steps below.
-
Enable authentication on Pulsar broker.
Set the following properties in the
conf/broker.conf
orconf/standalone.conf
file.The properties here are same to that of the
PLAIN
mechanism exceptbrokerClientAuthenticationPlugin
andbrokerClientAuthenticationParameters
.Property Description Required or optional Example value type
Oauth 2.0 authentication type
The default value isclient_credentials
Optional client_credentials
privateKey
URL to a JSON credential file
The following pattern formats are supported:
-file:///path/to/file
-file:/path/to/file
-data:application/json;base64,<base64-encoded value>
Required file:///path/to/credentials_file.json issuerUrl
URL of the authentication provider which allows the Pulsar client to obtain an access token Required https://accounts.google.com
audience
An OAuth 2.0 "resource server" identifier for the Pulsar cluster Required https://broker.example.com
authenticationEnabled=true authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderToken superUserRoles=<super-user-roles> brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2 brokerClientAuthenticationParameters={"type":"client_credentials","privateKey":"file:///path/to/credentials_file.json","issuerUrl":"<issuer-url>","audience":"<audience>"} tokenPublicKey=<token-public-key>
-
Enable authentication on KoP.
(1) Set the following property in the
conf/broker.conf
orconf/standalone.conf
file.saslAllowedMechanisms=OAUTHBEARER
(2) Specify the Kafka server callback handler.
For the
OAUTHBEARER
mechanism, you can useAuthenticationProviderToken
or custom your authentication provider to process the access tokens from OAuth 2.0 server.KoP provides a built-in
AuthenticateCallbackHandler
that uses the authentication provider of Pulsar for authentication. You need to configure the following properties in theconf/kop-handler.properties
file.# Use the KoP's built-in handler kopOauth2AuthenticateCallbackHandler=io.streamnative.pulsar.handlers.kop.security.oauth.OauthValidatorCallbackHandler # Java property configuration file of OauthValidatorCallbackHandler kopOauth2ConfigFile=conf/kop-handler.properties
(3) Specify the authentication method name of the provider (that is, oauth.validate.method
) in the conf/kop-handler.properties
file.
-
If you use
AuthenticationProviderToken
, sinceAuthenticationProviderToken#getAuthMethodName()
returnstoken
, set theoauth.validate.method
as the token. -
If you use other providers, set the
oauth.validate.method
as the result ofgetAuthMethodName()
.oauth.validate.method=token
-
Enable authentication on Kafka client.
(1) Install the KoP built-in callback handler to your local Maven repository.
To get an access token from an OAuth 2.0 server, you need to use the KoP built-in callback handler instead of the Kafka login callback handler.
mvn clean install -pl oauth-client -DskipTests
(2) Add the following dependencies to the
pom.xml
file.For stable releases, the
pulsar.version
is the same to thekop.version
.<dependency> <groupId>io.streamnative.pulsar.handlers</groupId> <artifactId>oauth-client</artifactId> <version>${kop.version}</version> <exclusions> <exclusion> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-slf4j-impl</artifactId> </exclusion> </exclusions> </dependency> <!-- KoP's login callback handler has a pulsar-client dependency --> <dependency> <groupId>org.apache.pulsar</groupId> <artifactId>pulsar-client</artifactId> <version>${pulsar.version}</version> </dependency>
(3) Configure the producer or consumer with the following required properties.
Property Description Example value Note sasl.login.callback.handler.class
Class of SASL login callback handler io.streamnative.pulsar.handlers.kop.security.oauth.OauthLoginCallbackHandler Set the value of this property as the same to the value in the example below. security.protocol
Security protocol SASL_PLAINTEXT sasl.mechanism
SASL mechanism OAUTHBEARER sasl.jaas.config
JAAS configuration org.apache.kafka.coPropertymmon.security.oauthbearer.OAuthBearerLoginModule oauth.issuer.url
URL of the authentication provider which allows the Pulsar client to obtain an access token. https://accounts.google.com This property is the same to the issuerUrl property in Pulsar client credentials oauth.credentials.url
URL to a JSON credentials file.
The following pattern formats are supported:
- file:///path/to/file
- file:/path/to/file
- data:application/json;base64,file:///path/to/credentials_file.json This property is the same to the privateKey property in Pulsar client credentials oauth.audience
OAuth 2.0 "resource server" identifier for the Pulsar cluster. https://broker.example.com This property is the same to the audience property in Pulsar client credentials
sasl.login.callback.handler.class=io.streamnative.pulsar.handlers.kop.security.oauth.OauthLoginCallbackHandler
security.protocol=SASL_PLAINTEXT # or security.protocol=SASL_SSL if SSL connection is used
sasl.mechanism=OAUTHBEARER
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule \
required oauth.issuer.url="https://accounts.google.com"\
oauth.credentials.url="file:///path/to/credentials_file.json"\
oauth.audience="https://broker.example.com";
To enable authorization on KoP, please make sure the authentication is enabled.
Note: For more information, see Authorization.
-
Enable authorization and assign superusers for the Pulsar broker.
authorizationEnabled=true
-
Generate JWT tokens.
A token is the credential associated with a user. The association is done through the "
principal
" or "role
". In the case of JWT tokens, this field is typically referred assubject
, though they are exactly the same concept. Then, you need to use this command to require the generated token to have asubject
field set.$ bin/pulsar tokens create --secret-key file:///path/to/secret.key \ --subject <user-role>
This command prints the token string on stdout.
-
Grant permission to specific role.
The token itself does not have any permission associated. The authorization engine determines whether the token should have permissions or not. Once you have created the token, you can grant permission for this token to do certain actions.
The following is an example.$ bin/pulsar-admin --auth-plugin "org.apache.pulsar.client.impl.auth.AuthenticationToken" --auth-params "token:<token-of-super-user-role>" \ namespaces grant-permission <tenant>/<namespace> \ --role <user-role> \ --actions produce,consume
KoP supports the following configuration types for Kafka listeners:
- PLAINTEXT
- SSL
Example
listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093
Tip For how to configure SSL keys, see Kafka SSL.
The following example shows how to connect KoP through SSL.
-
Create SSL related keys.
This example creates the related CA and JKS files.
# You need to input a password, for example "server-keystore". keytool -keystore server.keystore.jks -alias localhost -validity 365 -keyalg RSA -genkey # You need to input a password, for example "server". openssl req -new -x509 -keyout ca-key -out ca-cert -days 365 # You need to input a password, for example "server-truststore" keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert # You need to input a password, for example "client-truststore" keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert # You must input the password of server.keystore.jks: "server-keystore" keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file # NOTE: the password followed by `-passin pass:` is the password of ca-cert: "server" openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:server # You must input the password of server.keystore.jks: "server-keystore" keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert # You must input the password of server.keystore.jks: "server-keystore" keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed
In above example, we have input four passwords:
server-keystore
forserver.keystore.jks
fileserver
forca-cert
andca-key
filesserver-truststore
forserver.truststore.jks
fileclient-truststore
forclient.truststore.jks
file
-
Configure the KoP broker.
In the StreamNative Platform configuration file (
${PLATFORM_HOME}/etc/pulsar/broker.conf
or${PLATFORM_HOME}/etc/pulsar/standalone.conf
), add the related configurations that using the jks configurations created in Step 1:listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093 # You need to use the full path of server.keystore.jks kopSslKeystoreLocation=server.keystore.jks kopSslKeystorePassword=server-keystore kopSslKeyPassword=server-keystore # You need to use the full path of server.truststore.jks kopSslTruststoreLocation=server.truststore.jks kopSslTruststorePassword=server-truststore
-
Configure the Kafka client.
(1) Prepare a file named
client-ssl.properties
. The file contains the following information.security.protocol=SSL # You need to use the full path of client.truststore.jks ssl.truststore.location=client.truststore.jks ssl.truststore.password=client-truststore # The identification algorithm must be empty ssl.endpoint.identification.algorithm=
(2) Verify the console-producer and the console-consumer.
kafka-console-producer.sh --broker-list localhost:9093 --topic test --producer.config client-ssl.properties kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --consumer.config client-ssl.properties
Tip For more information, see Configure Kafka client.