Kafka security playbook

This repository contains a set of Docker images that demonstrate the security configuration of Kafka and the Confluent Platform. It is NOT intended to provide production-ready images. It is designed to serve as an example and to help people configure the security module of Apache Kafka.

All images have been created from scratch, without reusing existing images, to emphasize code and configuration readability over best practices. For official images, I recommend relying on the Docker Images for the Confluent Platform.

Plain authentication (challenge response)

Plain authentication is a simple mechanism based on a username and password. Because the credentials are sent unencrypted, it should be combined with TLS to make authentication secure. This playbook contains a simple configuration where SASL/PLAIN authentication is used for Kafka.
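To see why TLS matters here, the following minimal Python sketch builds the single message a SASL/PLAIN client sends, following the layout in RFC 4616 (authorization id, username and password separated by NUL bytes). The function names are illustrative and not part of this repository; the `kafka`/`kafka` credentials are the ones shown in the playbook's client configuration.

```python
from typing import Tuple


def sasl_plain_initial_response(username: str, password: str, authzid: str = "") -> bytes:
    """Build the client message of SASL/PLAIN: authzid NUL username NUL password."""
    return b"\0".join(part.encode("utf-8") for part in (authzid, username, password))


def parse_sasl_plain(message: bytes) -> Tuple[str, str, str]:
    """Recover (authzid, username, password) from a captured message.

    No key is needed: the message is only framed, not encrypted.
    """
    authzid, username, password = message.decode("utf-8").split("\0")
    return authzid, username, password


message = sasl_plain_initial_response("kafka", "kafka")
# Anyone who can read the traffic can read the password back out.
print(parse_sasl_plain(message))
```

Over a SASL_PLAINTEXT listener this message travels as-is, which is why the mechanism is normally paired with TLS (SASL_SSL) outside of a demo.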

Usage

cd plain
./up
kafka-console-producer --broker-list kafka:9093 --producer.config /etc/kafka/consumer.properties --topic test
kafka-console-consumer --bootstrap-server kafka:9093 --consumer.config /etc/kafka/consumer.properties --topic test --from-beginning

Important configuration files

<details>
<summary><a href="plain/kafka/server.properties">kafka server.properties</a></summary>
<pre>
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
allow.everyone.if.no.acl.found=false
super.users=User:kafka
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
</pre>
</details>
<details>
<summary><a href="plain/kafka/consumer.properties">kafka consumer and producer configuration</a></summary>
<pre>
sasl.mechanism=PLAIN
security.protocol=SASL_PLAINTEXT
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="kafka" \
  password="kafka";
</pre>
</details>
<details>
<summary><a href="plain/kafka/kafka.jaas.config">kafka server jaas configuration</a></summary>
<pre>
KafkaServer {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="kafka"
   password="kafka"
   user_kafka="kafka"
   user_producer="producer-secret"
   user_consumer="consumer-secret";
};
</pre>
</details>

For further information

Scram authentication (challenge response)

SCRAM is an authentication mechanism that performs username/password authentication in a secure way. This playbook contains a simple configuration where SASL/SCRAM authentication is used for ZooKeeper and Kafka.
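As a rough illustration of what "in a secure way" means, the sketch below follows the key derivation and proof check described in RFC 5802 for a SHA-256 based SCRAM exchange. It is a simplified model, not Kafka's implementation: the function names, the salt and the `auth_message` value are assumptions made for the example, and password normalization (SASLprep) and the actual message exchange are omitted.

```python
import hashlib
import hmac


def _xor(left: bytes, right: bytes) -> bytes:
    return bytes(a ^ b for a, b in zip(left, right))


def _client_key(password: str, salt: bytes, iterations: int) -> bytes:
    # SaltedPassword := PBKDF2-HMAC-SHA256(password, salt, iterations)
    salted = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, iterations)
    return hmac.new(salted, b"Client Key", hashlib.sha256).digest()


def derive_stored_credentials(password: str, salt: bytes, iterations: int = 4096) -> dict:
    """What the server keeps: salt, iteration count and derived keys, never the password."""
    salted = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, iterations)
    return {
        "salt": salt,
        "iterations": iterations,
        "stored_key": hashlib.sha256(_client_key(password, salt, iterations)).digest(),
        "server_key": hmac.new(salted, b"Server Key", hashlib.sha256).digest(),
    }


def client_proof(password: str, salt: bytes, iterations: int, auth_message: bytes) -> bytes:
    """ClientProof := ClientKey XOR HMAC(StoredKey, AuthMessage)."""
    key = _client_key(password, salt, iterations)
    signature = hmac.new(hashlib.sha256(key).digest(), auth_message, hashlib.sha256).digest()
    return _xor(key, signature)


def server_accepts(credentials: dict, proof: bytes, auth_message: bytes) -> bool:
    """Recover ClientKey from the proof and compare its hash with StoredKey."""
    signature = hmac.new(credentials["stored_key"], auth_message, hashlib.sha256).digest()
    recovered_key = _xor(proof, signature)
    return hmac.compare_digest(hashlib.sha256(recovered_key).digest(), credentials["stored_key"])


salt = b"example-salt-16b"                        # hypothetical salt
auth_message = b"client-first,server-first,client-final"  # placeholder for the exchanged messages
credentials = derive_stored_credentials("kafka", salt)
proof = client_proof("kafka", salt, credentials["iterations"], auth_message)
print(server_accepts(credentials, proof, auth_message))
```

The password itself never crosses the network; only a proof bound to the nonces of one exchange does, which is the main difference from the PLAIN mechanism.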

Usage

cd scram
# Scripts starting the docker services and generating the kafka user
./up
docker-compose exec kafka kafka-console-producer --broker-list kafka:9093 --producer.config /etc/kafka/consumer.properties --topic test
docker-compose exec kafka kafka-console-consumer --bootstrap-server kafka:9093 --consumer.config /etc/kafka/consumer.properties --topic test --from-beginning

Important configuration files

<details>
<summary><a href="scram/kafka/server.properties">kafka server.properties</a></summary>
<pre>
sasl.enabled.mechanisms=SCRAM-SHA-256
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
security.inter.broker.protocol=SASL_PLAINTEXT
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
</pre>
</details>
<details>
<summary><a href="scram/kafka/consumer.properties">kafka consumer and producer configuration</a></summary>
<pre>
sasl.mechanism=SCRAM-SHA-256
security.protocol=SASL_PLAINTEXT
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="kafka" \
  password="kafka";
</pre>
</details>
<details>
<summary><a href="scram/kafka/kafka.sasl.jaas.config">kafka server jaas configuration</a></summary>
<pre>
KafkaServer {
   org.apache.kafka.common.security.scram.ScramLoginModule required
   username="kafka"
   password="kafka";
};
</pre>
</details>

For further information

TLS with x509 authentication

TLS, previously known as SSL, is a cryptographic protocol providing network encryption via asymmetric certificates and keys. This playbook contains a basic configuration to enforce TLS between the broker and a client. Be aware that, at the time of writing, ZooKeeper had not yet released TLS as an official feature, so only the broker is configured for TLS. In this playbook, TLS is used for encryption, authentication and authorization. The up script generates the following files before starting the docker-compose services:

  1. certs/ca.key, certs/ca.crt - private key and certificate of the certificate authority
  2. certs/server.keystore.jks - keystore containing the signed certificate of the kafka broker
  3. certs/client.keystore.jks - keystore containing the signed certificate of a kafka client. It has been granted super user permission

Usage

cd tls
# Scripts generating the required certificate and starting docker-compose services
./up
docker-compose exec kafka kafka-console-producer --broker-list kafka.confluent.local:9093 --topic test --producer.config /etc/kafka/consumer.properties
docker-compose exec kafka kafka-console-consumer --bootstrap-server kafka.confluent.local:9093 --topic test --consumer.config /etc/kafka/consumer.properties --from-beginning

#Avro consumer/producer using schema registry
docker-compose exec kafka kafka-avro-console-producer --broker-list kafka.confluent.local:9093 --topic avro_test --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}' --property schema.registry.url=https://schema-registry.confluent.local:8443 --producer.config /etc/kafka/consumer.properties
#example message: {"f1": "value1"}
kafka-avro-console-consumer --topic avro_test --from-beginning --property schema.registry.url=https://schema-registry.confluent.local:8443 --consumer.config /etc/kafka/consumer.properties --bootstrap-server kafka.confluent.local:9093

To connect from a producer/consumer running on your local machine:

docker-compose exec kafka kafka-acls --authorizer-properties zookeeper.connect=zookeeper.confluent.local:2181 --add --allow-principal "User:CN=<YOUR LOCAL HOSTNAME>,L=London,O=Confluent,C=UK" --operation All --topic '*' --cluster

Set the following JVM parameters:

-Djavax.net.ssl.keyStore=<kafka-security-playbook DIR>/tls/certs/local-client.keystore.jks
-Djavax.net.ssl.trustStore=<kafka-security-playbook DIR>/tls/certs/truststore.jks
-Djavax.net.ssl.keyStorePassword=test1234
-Djavax.net.ssl.trustStorePassword=test1234

Important configuration files

<details>
<summary><a href="tls/kafka/server.properties">kafka server.properties</a></summary>
<pre>
listeners=SSL://kafka.confluent.local:9093
advertised.listeners=SSL://kafka.confluent.local:9093
security.inter.broker.protocol=SSL
ssl.truststore.location=/var/lib/secret/truststore.jks
ssl.truststore.password=test1234
ssl.keystore.location=/var/lib/secret/server.keystore.jks
ssl.keystore.password=test1234
ssl.client.auth=required
# To use TLS based authorization
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:CN=kafka.confluent.local,L=London,O=Confluent,C=UK
</pre>
</details>
<details>
<summary><a href="tls/kafka/consumer.properties">kafka consumer and producer configuration</a></summary>
<pre>
bootstrap.servers=kafka.confluent.local:9093
security.protocol=SSL
ssl.truststore.location=/var/lib/secret/truststore.jks
ssl.truststore.password=test1234
ssl.keystore.location=/var/lib/secret/client.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
</pre>
</details>

For further information

Kerberos (GSSAPI) authentication without TLS

This example contains a basic KDC server and configures both ZooKeeper and Kafka with Kerberos authentication and authorization. Credentials are created without a password; a keytab containing the credentials is available in a Docker volume named "secret". The following credentials are automatically created in the KDC database:

  1. kafka/admin - to access zookeeper
  2. kafka_producer/producer - to access kafka as a producer
  3. kafka_consumer/consumer - to access kafka as a consumer
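The principals above have the form primary/instance, optionally followed by @REALM. The sketch below splits such a name into its parts and shows the short name Kafka derives under its default principal mapping, where names of the form {username}/{hostname}@{REALM} map to {username}. The helper names are illustrative, the realm TEST.CONFLUENT.IO is taken from the JAAS configuration of this playbook and is assumed to apply to the client principals too, and custom `sasl.kerberos.principal.to.local.rules` are not modelled.

```python
from typing import NamedTuple


class KerberosPrincipal(NamedTuple):
    primary: str
    instance: str
    realm: str


def parse_principal(name: str) -> KerberosPrincipal:
    """Split 'primary/instance@REALM'; instance and realm may be empty."""
    rest, _, realm = name.partition("@")
    primary, _, instance = rest.partition("/")
    return KerberosPrincipal(primary, instance, realm)


def kafka_user(name: str) -> str:
    """Principal string used in ACLs under the default mapping (primary only)."""
    return "User:" + parse_principal(name).primary


for principal in (
    "kafka/admin@TEST.CONFLUENT.IO",
    "kafka_producer/producer@TEST.CONFLUENT.IO",
    "kafka_consumer/consumer@TEST.CONFLUENT.IO",
):
    print(principal, "->", kafka_user(principal))
```

Under that mapping, ACLs and the `super.users` setting refer to the short name, for example `User:kafka`, rather than to the full Kerberos principal.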

Usage

cd kerberos
# Scripts orchestrating the docker-compose services
./up
# Using kinit with a keytab for authentication then invoking kafka interfaces
docker-compose exec kafka bash -c 'kinit -k -t /var/lib/secret/kafka.key kafka_producer/producer && kafka-console-producer --broker-list kafka:9093 --topic test --producer.config /etc/kafka/consumer.properties'
docker-compose exec kafka bash -c 'kinit -k -t /var/lib/secret/kafka.key kafka_consumer/consumer && kafka-console-consumer --bootstrap-server kafka:9093 --topic test --consumer.config /etc/kafka/consumer.properties --from-beginning'

Important configuration files

<details>
<summary><a href="kerberos/zookeeper/zookeeper.properties">zookeeper properties</a></summary>
<pre>
authProvider.1 = org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
</pre>
</details>
<details>
<summary><a href="kerberos/zookeeper/zookeeper.sasl.jaas.config">zookeeper server and client jaas configuration</a></summary>
<pre>
Server {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    useTicketCache=false
    keyTab="/var/lib/secret/kafka.key"
    principal="zookeeper/zookeeper.kerberos_default@TEST.CONFLUENT.IO";
};
</pre>
</details>
<details>
<summary><a href="kerberos/kafka/server.properties">kafka server.properties</a></summary>
<pre>
listeners=SASL_PLAINTEXT://kafka:9093
advertised.listeners=SASL_PLAINTEXT://kafka:9093
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=GSSAPI
sasl.mechanism.inter.broker.protocol=GSSAPI
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
allow.everyone.if.no.acl.found=false
super.users=User:admin;User:kafka
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
</pre>
</details>
<details>
<summary><a href="kerberos/kafka/kafka.sasl.jaas.config">kafka server and client jaas configuration</a></summary>
<pre>
/*
 * Cluster kerberos services
 */
KafkaServer {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/var/lib/secret/kafka.key"
    principal="kafka/kafka.kerberos_default@TEST.CONFLUENT.IO";
};

/*

/*

</pre>
</details>
<details>
<summary><a href="kerberos/kafka/consumer.properties">kafka consumer and producer configuration</a></summary>
<pre>
bootstrap.servers=kafka:9093
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
  useTicketCache=true
</pre>
</details>

For further information

Oauth authentication via TLS encryption

Kafka supports SASL authentication via OAuth bearer tokens. A sample playbook for secured OAuth token authentication is contained in the oauth subfolder of this repository.

Usage

Prerequisites: jdk8, maven, docker-compose, openssl.

cd oauth
./up

In this sample playbook both the identity of brokers (sasl.mechanism.inter.broker.protocol=OAUTHBEARER within server.properties) and the identity of clients (sasl.mechanism=OAUTHBEARER within consumer.properties) are verified by the brokers using oauth bearer tokens.

Within this sample playbook, OAuth bearer tokens are generated and validated using the jjwt library without communication with an authorization server. In a real deployment, tokens would normally be issued by an authorization server instead.
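The idea of issuing and validating a token locally can be sketched with the Python standard library. This is not the playbook's Java code: it assumes an HMAC-SHA256 (HS256) signed JWT, a hypothetical secret, and illustrative `sub`, `iat` and `exp` claims; the claims and algorithm the playbook's handlers actually use are not shown in this document.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional


def _b64url(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def sign_jwt(claims: dict, secret: bytes) -> str:
    """Login side: serialize header and claims, then sign them with the shared secret."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, claims)
    )
    signature = hmac.new(secret, signing_input.encode("ascii"), hashlib.sha256).digest()
    return signing_input + "." + _b64url(signature)


def verify_jwt(token: str, secret: bytes, now: Optional[float] = None) -> dict:
    """Validator side: recompute the signature, then check expiry.

    Simplification: the header's "alg" field is not inspected; HS256 is assumed.
    """
    signing_input, _, signature = token.rpartition(".")
    expected = hmac.new(secret, signing_input.encode("ascii"), hashlib.sha256).digest()
    if not hmac.compare_digest(expected, _b64url_decode(signature)):
        raise ValueError("invalid signature")
    claims = json.loads(_b64url_decode(signing_input.split(".")[1]))
    current = time.time() if now is None else now
    if claims.get("exp", float("inf")) <= current:
        raise ValueError("token expired")
    return claims


secret = b"hypothetical-shared-secret"
issued_at = int(time.time())
token = sign_jwt({"sub": "kafka", "iat": issued_at, "exp": issued_at + 3600}, secret)
print(verify_jwt(token, secret)["sub"])
```

Because both sides hold the same secret, any party able to validate a token can also mint one, which is one reason a dedicated authorization server is preferred outside of a demo.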

The class OauthBearerLoginCallbackHandler is used by the clients and by brokers to generate a JWT token using a shared secret. This class is configured within the client.properties file:

Note that the client does not need a keystore, since client authentication is achieved using bearer tokens. It still needs a truststore containing the certificate authorities that signed the brokers' certificates.

<details>
<summary><a href="oauth/kafka/client.properties">kafka consumer and producer configuration</a></summary>
<pre>
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=io.confluent.examples.authentication.oauth.OauthBearerLoginCallbackHandler
ssl.truststore.location=/etc/kafka/kafka.client.truststore.jks
ssl.truststore.password=secret
</pre>
</details>

The OauthBearerLoginCallbackHandler class is also configured for broker clients within the server.properties file (see below). The server.properties file must also include a reference to the token validator class (OauthBearerValidatorCallbackHandler):

<details>
<summary><a href="oauth/kafka/server.properties">kafka broker configuration</a></summary>
<pre>
listeners=SASL_SSL://kafka.confluent.local:9093
advertised.listeners=SASL_SSL://kafka.confluent.local:9093
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=OAUTHBEARER
sasl.enabled.mechanisms=OAUTHBEARER
listener.name.sasl_ssl.oauthbearer.sasl.server.callback.handler.class=io.confluent.examples.authentication.oauth.OauthBearerValidatorCallbackHandler
listener.name.sasl_ssl.oauthbearer.sasl.login.callback.handler.class=io.confluent.examples.authentication.oauth.OauthBearerLoginCallbackHandler
ssl.truststore.location=/etc/kafka/kafka.server.truststore.jks
ssl.truststore.password=secret
ssl.keystore.location=/etc/kafka/kafka.server.keystore.jks
ssl.keystore.password=secret
ssl.key.password=secret
</pre>
</details>

Each Kafka broker needs a keystore holding its private key and certificate, as well as a truststore to verify the identity of other brokers.

Further information

Schema registry basic security

According to the documentation, the schema registry plugin only supports SSL principals, but there is a separate, undocumented authentication option via Jetty authentication.

cd schema-registry-basic-auth
./up

Now you can access the schema registry REST interface on http://localhost:8089

Note that in order to test the schema registry properly, you need to either curl into it or use the kafka-avro-console-producer and kafka-avro-console-consumer. The latter require special considerations.

First, access via curl:

curl -X GET http://localhost:8089 -u admin:admin
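For reference, `-u admin:admin` makes curl send an HTTP Basic `Authorization` header. The short Python sketch below builds the same header and attaches it to a request object without sending it; the helper names are illustrative, and the URL and credentials are the ones from the curl call above.

```python
import base64
import urllib.request


def basic_auth_header(user: str, password: str) -> str:
    """Return the value of the Authorization header for HTTP Basic auth."""
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    return f"Basic {token}"


def build_request(url: str, user: str, password: str) -> urllib.request.Request:
    """Prepare (but do not send) a GET request carrying the credentials."""
    request = urllib.request.Request(url, method="GET")
    request.add_header("Authorization", basic_auth_header(user, password))
    return request


request = build_request("http://localhost:8089", "admin", "admin")
print(request.get_header("Authorization"))
# With the playbook running, urllib.request.urlopen(request) would perform the call.
```

Note that the header is only base64-encoded, not encrypted, so over plain http the credentials are readable by anyone on the network path.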

If you want to try out the console producer, you need to exec into the schema-registry docker container and then run the producer:

docker-compose exec schema-registry bash
kafka-avro-console-producer --broker-list kafka:9092 --topic avro-test --property \
   value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}' \
   --property basic.auth.credentials.source=USER_INFO \
   --property schema.registry.basic.auth.user.info=write:write

> {"f1": "value1"}
> {"f1": "value2"}
> ^D

Note that the official documentation is wrong on two counts. First, to define the source, you need to use basic.auth.credentials.source without the schema.registry prefix.

Second, user authentication via a property file is ignored; you need to pass the credentials via --property.

Schema registry semi-open security

This playbook is an example configuration where Schema Registry accepts requests on both http and https. Requests on the http endpoint are identified as the ANONYMOUS user. This is possible thanks to the confluent.schema.registry.anonymous.principal=true option.

The following ACLs are configured:

With this configuration, curl -X GET http://localhost:8089/subjects/ succeeds, but the ANONYMOUS user does not have the privileges to write new schemas. Only a client presenting the TLS client certificate C=UK,O=Confluent,L=London,CN=schema-registry can write new schemas; this could be, for example, your CI tool or an admin user.