
Open Policy Agent plugin for Kafka authorization



Installation

Download the latest OPA authorizer plugin jar from Releases (or Maven Central) and place the file (opa-authorizer-{$VERSION}.jar) somewhere Kafka can find it: either directly in Kafka's libs directory or in a separate plugin directory passed to Kafka at startup, e.g.:

CLASSPATH=/usr/local/share/kafka/plugins/*

To activate the opa-kafka-plugin, add the authorizer.class.name to server.properties:
authorizer.class.name=org.openpolicyagent.kafka.OpaAuthorizer

The plugin supports the following properties:

| Property Key | Example | Default | Description |
| --- | --- | --- | --- |
| `opa.authorizer.url` | `http://opa:8181/v1/data/kafka/authz/allow` | | URL of the OPA policy (rule) to query. [required] |
| `opa.authorizer.allow.on.error` | `false` | `false` | Whether to fail open (allow) or fail closed (deny) if the OPA call fails. |
| `opa.authorizer.cache.initial.capacity` | `5000` | `5000` | Initial decision cache size. |
| `opa.authorizer.cache.maximum.size` | `50000` | `50000` | Maximum decision cache size. |
| `opa.authorizer.cache.expire.after.seconds` | `3600` | `3600` | Decision cache expiry in seconds. |
| `opa.authorizer.metrics.enabled` | `true` | `false` | Whether to expose JMX metrics for monitoring. |
| `super.users` | `User:alice;User:bob` | | Super users that are always allowed. |
| `opa.authorizer.truststore.path` | `/path/to/mytruststore.p12` | | Path to the PKCS12 truststore for HTTPS requests to OPA. |
| `opa.authorizer.truststore.password` | `ichangedit` | `changeit` | Password for the truststore. |
| `opa.authorizer.truststore.type` | `PKCS12`, `JKS` or whatever your JVM supports | `PKCS12` | Type of the truststore. |
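Putting it together, a minimal `server.properties` fragment might look like the following (all values are illustrative):

```properties
# Use the OPA authorizer instead of Kafka's ACL authorizer
authorizer.class.name=org.openpolicyagent.kafka.OpaAuthorizer
opa.authorizer.url=http://opa:8181/v1/data/kafka/authz/allow
# Fail closed: deny requests if OPA cannot be reached
opa.authorizer.allow.on.error=false
opa.authorizer.cache.expire.after.seconds=3600
super.users=User:alice;User:bob
```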

Usage

Example structure of input data provided from opa-kafka-plugin to Open Policy Agent.

{
    "action": {
        "logIfAllowed": true,
        "logIfDenied": true,
        "operation": "DESCRIBE",
        "resourcePattern": {
            "name": "alice-topic",
            "patternType": "LITERAL",
            "resourceType": "TOPIC",
            "unknown": false
        },
        "resourceReferenceCount": 1
    },
    "requestContext": {
        "clientAddress": "192.168.64.1",
        "clientInformation": {
            "softwareName": "unknown",
            "softwareVersion": "unknown"
        },
        "connectionId": "192.168.64.4:9092-192.168.64.1:58864-0",
        "header": {
            "data": {
                "clientId": "rdkafka",
                "correlationId": 5,
                "requestApiKey": 3,
                "requestApiVersion": 2
            },
            "headerVersion": 1
        },
        "listenerName": "SASL_PLAINTEXT",
        "principal": {
            "name": "alice-consumer",
            "principalType": "User"
        },
        "securityProtocol": "SASL_PLAINTEXT"
    }
}
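As an illustration, a minimal Rego policy evaluating this input could look like the sketch below. The package and rule name must match the path in `opa.authorizer.url` (`kafka/authz/allow` in the example above); the owner naming convention used here is an assumption, not part of the plugin.

```rego
package kafka.authz

import rego.v1

default allow := false

# Sketch: allow consumers to read/describe topics prefixed with their owner name.
# Assumes principal names follow the "<owner>-<type>" convention, e.g. "alice-consumer".
allow if {
	input.action.resourcePattern.resourceType == "TOPIC"
	input.action.operation in {"DESCRIBE", "READ"}
	owner := split(input.requestContext.principal.name, "-")[0]
	startswith(input.action.resourcePattern.name, owner)
}
```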

The following table summarizes the supported resource types and operation names.

| `input.action.resourcePattern.resourceType` | `input.action.operation` |
| --- | --- |
| `CLUSTER` | `CLUSTER_ACTION` |
| `CLUSTER` | `CREATE` |
| `CLUSTER` | `DESCRIBE` |
| `GROUP` | `READ` |
| `GROUP` | `DESCRIBE` |
| `TOPIC` | `CREATE` |
| `TOPIC` | `ALTER` |
| `TOPIC` | `DELETE` |
| `TOPIC` | `DESCRIBE` |
| `TOPIC` | `READ` |
| `TOPIC` | `WRITE` |
| `TRANSACTIONAL_ID` | `DESCRIBE` |
| `TRANSACTIONAL_ID` | `WRITE` |

These are handled by the method `authorizeAction` and passed to OPA with an `action` that identifies the accessed resource and the performed operation. The `patternType` is always `LITERAL`.

Creating a topic first checks `CLUSTER` + `CREATE`. If that is denied, it then checks `TOPIC` (with the topic's name) + `CREATE`.

When performing an idempotent write to a topic, and the first request for `operation=IDEMPOTENT_WRITE` on `resourceType=CLUSTER` is denied, Kafka calls the method `authorizeByResourceType` to check whether the user has the right to write to any topic. If so, the idempotent write is granted by Kafka's ACL implementation. To allow for a similar check, this is mapped to OPA with `patternType=PREFIXED`, `resourceType=TOPIC`, and `name=""`.

{
  "action": {
    "logIfAllowed": true,
    "logIfDenied": true,
    "operation": "DESCRIBE",
    "resourcePattern": {
      "name": "",
      "patternType": "PREFIXED",
      "resourceType": "TOPIC",
      "unknown": false
    },
    "resourceReferenceCount": 1
  },
  ...
}
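A policy can answer this prefixed pre-check explicitly, for example with a rule along these lines (a sketch; the `"<owner>-producer"` naming convention is an assumption):

```rego
package kafka.authz

import rego.v1

# Sketch: grant the authorizeByResourceType pre-check (PREFIXED, empty name)
# whenever the principal could write to at least one topic under the
# assumed "<owner>-producer" naming convention.
allow if {
	input.action.resourcePattern.patternType == "PREFIXED"
	input.action.resourcePattern.resourceType == "TOPIC"
	input.action.resourcePattern.name == ""
	endswith(input.requestContext.principal.name, "-producer")
}
```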

It's likely possible to use all of the resource types and operations described in the Kafka API docs:

- https://kafka.apache.org/24/javadoc/org/apache/kafka/common/acl/AclOperation.html
- https://kafka.apache.org/24/javadoc/org/apache/kafka/common/resource/ResourceType.html

Security protocols:

| Protocol | Description |
| --- | --- |
| `PLAINTEXT` | Un-authenticated, non-encrypted channel |
| `SASL_PLAINTEXT` | SASL-authenticated, non-encrypted channel |
| `SASL_SSL` | SASL-authenticated, SSL channel |
| `SSL` | SSL channel |

More info:

https://kafka.apache.org/24/javadoc/org/apache/kafka/common/security/auth/SecurityProtocol.html

Policy sample

With the sample Rego policy you get, out of the box, a structure where an "owner" can have one user per type (consumer, producer, mgmt). The owner and user type are separated by `-`.


<b>Example:</b>
User alice-consumer will be...

See sample rego
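The owner/type convention could be expressed in Rego along these lines (a sketch of the idea, not the full sample policy):

```rego
package kafka.authz

import rego.v1

# Split "alice-consumer" into owner "alice" and user type "consumer".
parts := split(input.requestContext.principal.name, "-")

owner := parts[0]

user_type := parts[1]

is_consumer if user_type == "consumer"

is_producer if user_type == "producer"

is_mgmt if user_type == "mgmt"
```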

Build from source

Using the Gradle wrapper: `./gradlew clean test shadowJar`

The resulting jar (with dependencies embedded) will be named opa-authorizer-{$VERSION}-all.jar and stored in build/libs.

Logging

Set the log level with `log4j.logger.org.openpolicyagent=INFO` in `config/log4j.properties`. Use `DEBUG` or `TRACE` for debugging.
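For example, in `config/log4j.properties`:

```properties
# INFO for normal operation; DEBUG or TRACE when troubleshooting
log4j.logger.org.openpolicyagent=INFO
```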

In a busy Kafka cluster it may be worth tweaking the cache, since frequent authorization calls can produce a lot of log entries in Open Policy Agent, especially if decision logs are turned on. If the policy isn't updated very often, it's recommended to cache aggressively to improve performance and reduce the number of log entries.
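For example, a cluster whose policies rarely change might raise the cache limits (values are illustrative, not recommendations):

```properties
opa.authorizer.cache.maximum.size=500000
opa.authorizer.cache.expire.after.seconds=7200
```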

Monitoring

The plugin exposes some metrics that can be useful in operation.

Community

For questions, discussions and announcements related to Styra products, services and open source projects, please join the Styra community on Slack!