# Kafka Authorization

[Apache Kafka](https://kafka.apache.org/) is a high-performance distributed
streaming platform deployed by thousands of companies. In many deployments,
administrators require fine-grained access control over Kafka topics to
enforce important requirements around confidentiality and integrity.

## Goals

This tutorial shows how to enforce fine-grained access control over Kafka
topics. In this tutorial you will use OPA to define and enforce an
authorization policy stating:

* Consumers of topics containing Personally Identifiable Information (PII) must be whitelisted.
* Producers to topics with _high fanout_ must be whitelisted.

In addition, this tutorial shows how to break up a policy with small helper
rules to reuse logic and improve overall readability.

## Prerequisites

This tutorial requires [Docker Compose](https://docs.docker.com/compose/install/) to run Kafka, ZooKeeper, and OPA.

## Steps

### 1. Bootstrap the tutorial environment using Docker Compose.

First, create an OPA policy that allows all requests. You will update this policy later in the tutorial.

```bash
mkdir -p policies
```

**policies/tutorial.rego**:

```ruby
package kafka.authz

allow = true
```

Next, create a `docker-compose.yaml` file that runs OPA, ZooKeeper, and Kafka.
**docker-compose.yaml**:

```yaml
version: "2"
services:
  opa:
    hostname: opa
    image: openpolicyagent/opa:0.8.2
    ports:
      - 8181:8181
    command: "run --server --watch /policies"
    volumes:
      - ./policies:/policies
  zookeeper:
    image: confluentinc/cp-zookeeper:4.0.0-3
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      zk_id: "1"
  kafka:
    hostname: kafka
    image: openpolicyagent/demo-kafka:1.0
    links:
      - zookeeper
      - opa
    ports:
      - "9092:9092"
    environment:
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_ADVERTISED_LISTENERS: "SSL://:9093"
      KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SSL
      KAFKA_SSL_CLIENT_AUTH: required
      KAFKA_SSL_KEYSTORE_FILENAME: kafka.broker.keystore.jks
      KAFKA_SSL_KEYSTORE_CREDENTIALS: broker_keystore_creds
      KAFKA_SSL_KEY_CREDENTIALS: broker_sslkey_creds
      KAFKA_SSL_TRUSTSTORE_FILENAME: kafka.broker.truststore.jks
      KAFKA_SSL_TRUSTSTORE_CREDENTIALS: broker_truststore_creds
      KAFKA_AUTHORIZER_CLASS_NAME: com.lbg.kafka.opa.OpaAuthorizer
      KAFKA_OPA_AUTHORIZER_URL: "http://opa:8181/v1/data/kafka/authz/allow"
      KAFKA_OPA_AUTHORIZER_ALLOW_ON_ERROR: "false"
      KAFKA_OPA_AUTHORIZER_CACHE_INITIAL_CAPACITY: 100
      KAFKA_OPA_AUTHORIZER_CACHE_MAXIMUM_SIZE: 100
      KAFKA_OPA_AUTHORIZER_CACHE_EXPIRE_AFTER_MS: 600000
```

For more information on how to configure the OPA plugin for Kafka, see the [github.com/open-policy-agent/contrib](https://github.com/open-policy-agent/contrib)
repository.

Once you have created the file, launch the containers for this tutorial.

```bash
docker-compose --project-name opa-kafka-tutorial up
```

Now that the tutorial environment is running, we can define an authorization policy using OPA and test it.

#### Authentication

The Docker Compose file defined above requires **SSL client authentication**
for clients that connect to the broker.
Enabling SSL client authentication
allows service identities to be provided as input to your policy. The
example below shows the input structure.

```json
{
  "operation": {
    "name": "Write"
  },
  "resource": {
    "resourceType": {
      "name": "Topic"
    },
    "name": "credit-scores"
  },
  "session": {
    "principal": {
      "principalType": "User"
    },
    "clientAddress": "172.21.0.5",
    "sanitizedUser": "CN%3Danon_producer.tutorial.openpolicyagent.org%2COU%3DTUTORIAL%2CO%3DOPA%2CL%3DSF%2CST%3DCA%2CC%3DUS"
  }
}
```

The client identity is extracted from the SSL certificates that clients
present when they connect to the broker. The client identity information is
encoded in the `input.session.sanitizedUser` field. This field can be decoded
inside the policy.

Generating the SSL certificates and JKS files required for SSL client
authentication is outside the scope of this tutorial. To simplify the steps
below, the Docker Compose file uses an extended version of the
[confluentinc/cp-kafka](https://hub.docker.com/r/confluentinc/cp-kafka/)
image from Docker Hub. The extended image includes **pre-generated SSL
certificates** that the broker and clients use to identify themselves.

Do not rely on these pre-generated SSL certificates in real-world scenarios.
They are provided for convenience and testing only.

#### Kafka Authorizer JAR File

The Kafka image used in this tutorial includes a pre-installed JAR file that implements the [Kafka Authorizer](https://kafka.apache.org/documentation/#security_authz) interface. For more information on the authorizer see [open-policy-agent/contrib/kafka_authorizer](https://github.com/open-policy-agent/contrib/tree/master/kafka_authorizer).

### 2. Define a policy to restrict consumer access to topics containing Personally Identifiable Information (PII).

Update `policies/tutorial.rego` with the following content.
```ruby
#-----------------------------------------------------------------------------
# High level policy for controlling access to Kafka.
#
# * Deny operations by default.
# * Allow operations if no explicit denial.
#
# The kafka-authorizer-opa plugin will query OPA for decisions at
# /kafka/authz/allow. If the policy decision is _true_ the request is allowed.
# If the policy decision is _false_ the request is denied.
#-----------------------------------------------------------------------------
package kafka.authz

default allow = false

allow {
    not deny
}

deny {
    is_read_operation
    topic_contains_pii
    not consumer_is_whitelisted_for_pii
}

#-----------------------------------------------------------------------------
# Data structures for controlling access to topics. In real-world deployments,
# these data structures could be loaded into OPA as raw JSON data. The JSON
# data could be pulled from external sources like AD, Git, etc.
#-----------------------------------------------------------------------------

consumer_whitelist = {"pii": {"pii_consumer"}}

topic_metadata = {"credit-scores": {"tags": ["pii"]}}

#-----------------------------------
# Helpers for checking topic access.
#-----------------------------------

topic_contains_pii {
    topic_metadata[topic_name].tags[_] == "pii"
}

consumer_is_whitelisted_for_pii {
    consumer_whitelist.pii[_] == principal.name
}

#-----------------------------------------------------------------------------
# Helpers for processing Kafka operation input. This logic could be split out
# into a separate file and shared. For conciseness, we have kept it all in one
# place.
#-----------------------------------------------------------------------------

is_write_operation {
    input.operation.name == "Write"
}

is_read_operation {
    input.operation.name == "Read"
}

is_topic_resource {
    input.resource.resourceType.name == "Topic"
}

topic_name = input.resource.name {
    is_topic_resource
}

principal = {"fqn": parsed.CN, "name": cn_parts[0]} {
    parsed := parse_user(urlquery.decode(input.session.sanitizedUser))
    cn_parts := split(parsed.CN, ".")
}

parse_user(user) = {key: value |
    parts := split(user, ",")
    [key, value] := split(parts[_], "=")
}
```

The `./policies` directory is mounted into the Docker container running OPA.
When the files under this directory change, OPA is notified and the policies
are automatically reloaded.

At this point, you can exercise the policy.

### 3. Exercise the policy that restricts consumer access to topics containing PII.

This step shows how you can grant fine-grained access to services using
Kafka. In this scenario, some services are allowed to read PII data while
others are not.

First, run `kafka-console-producer` to generate some data on the
`credit-scores` topic.

> This tutorial uses the `kafka-console-producer` and `kafka-console-consumer` scripts to generate and display Kafka messages. These scripts read from STDIN and write to STDOUT and are frequently used to send and receive data via Kafka over the command line. If you are not familiar with these scripts you can learn more in Kafka's [Quick Start](https://kafka.apache.org/documentation/#quickstart) documentation.
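As a quick sanity check, the shell sketch below mimics what the `principal` and `parse_user` helpers in the policy compute for the pre-generated `anon_producer` certificate. It is illustrative only: the `sed` decode step handles just the `%3D` and `%2C` escapes that appear in this sample value, not general URL decoding.

```shell
# Sample sanitizedUser value from the example input earlier in the tutorial.
encoded='CN%3Danon_producer.tutorial.openpolicyagent.org%2COU%3DTUTORIAL%2CO%3DOPA%2CL%3DSF%2CST%3DCA%2CC%3DUS'

# Rough equivalent of urlquery.decode (only the escapes in this sample).
decoded=$(printf '%s' "$encoded" | sed 's/%3D/=/g; s/%2C/,/g')

# parse_user + principal: pull out the CN (the "fqn") and its first
# dot-separated component (the "name" that the whitelists match against).
cn=$(printf '%s' "$decoded" | tr ',' '\n' | sed -n 's/^CN=//p')
name=${cn%%.*}

echo "fqn:  $cn"
echo "name: $name"
```

Running this prints `anon_producer` as the principal name, which is the value the `consumer_is_whitelisted_for_pii` and `producer_is_whitelisted_for_large_fanout` rules compare against their whitelists.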
```bash
docker run --rm --network opakafkatutorial_default \
    openpolicyagent/demo-kafka:1.0 \
    bash -c 'for i in {1..10}; do echo "{\"user\": \"bob\", \"score\": $i}"; done | kafka-console-producer --topic credit-scores --broker-list kafka:9093 --producer.config /etc/kafka/secrets/anon_producer.ssl.config'
```

This command will send 10 messages to the `credit-scores` topic. Bob's credit
score seems to be improving.

Next, run `kafka-console-consumer` and try to read data off the topic. Use
the `pii_consumer` credentials to simulate a service that is allowed to read
PII data.

```bash
docker run --rm --network opakafkatutorial_default \
    openpolicyagent/demo-kafka:1.0 \
    kafka-console-consumer --bootstrap-server kafka:9093 --topic credit-scores --from-beginning --consumer.config /etc/kafka/secrets/pii_consumer.ssl.config
```

This command will output the 10 messages sent to the topic in the first part
of this step. Once the 10 messages have been printed, exit the script (^C).

Finally, run `kafka-console-consumer` again, but this time try to use the
`anon_consumer` credentials. The `anon_consumer` credentials simulate a
service that has **not** been explicitly granted access to PII data.

```bash
docker run --rm --network opakafkatutorial_default \
    openpolicyagent/demo-kafka:1.0 \
    kafka-console-consumer --bootstrap-server kafka:9093 --topic credit-scores --from-beginning --consumer.config /etc/kafka/secrets/anon_consumer.ssl.config
```

Because the `anon_consumer` is not allowed to read PII data, the request will
be denied and the consumer will output an error message.

```
Not authorized to read from topic credit-scores.
...
Processed a total of 0 messages
```

### 4. Extend the policy to prevent services from accidentally writing to topics with large fanout.
First, add the following content to the policy file (`./policies/tutorial.rego`):

```ruby
deny {
    is_write_operation
    topic_has_large_fanout
    not producer_is_whitelisted_for_large_fanout
}

producer_whitelist = {
    "large-fanout": {
        "fanout_producer",
    }
}

topic_has_large_fanout {
    topic_metadata[topic_name].tags[_] == "large-fanout"
}

producer_is_whitelisted_for_large_fanout {
    producer_whitelist["large-fanout"][_] == principal.name
}
```

Next, update the `topic_metadata` data structure in the same file to indicate
that the `click-stream` topic has a high fanout.

```ruby
topic_metadata = {
    "click-stream": {
        "tags": ["large-fanout"],
    },
    "credit-scores": {
        "tags": ["pii"],
    }
}
```

### 5. Exercise the policy that restricts producer access to topics with high fanout.

First, run `kafka-console-producer` and simulate a service with access to the
`click-stream` topic.

```bash
docker run --rm --network opakafkatutorial_default \
    openpolicyagent/demo-kafka:1.0 \
    bash -c 'for i in {1..10}; do echo "{\"user\": \"alice\", \"button\": $i}"; done | kafka-console-producer --topic click-stream --broker-list kafka:9093 --producer.config /etc/kafka/secrets/fanout_producer.ssl.config'
```

Next, run `kafka-console-consumer` to confirm that the messages were published.

```bash
docker run --rm --network opakafkatutorial_default \
    openpolicyagent/demo-kafka:1.0 \
    kafka-console-consumer --bootstrap-server kafka:9093 --topic click-stream --from-beginning --consumer.config /etc/kafka/secrets/anon_consumer.ssl.config
```

Once you see the 10 messages produced by the first part of this step, exit the console consumer (^C).

Lastly, run `kafka-console-producer` to simulate a service that should **not**
have access to _high fanout_ topics.
```bash
docker run --rm --network opakafkatutorial_default \
    openpolicyagent/demo-kafka:1.0 \
    bash -c 'echo "{\"user\": \"alice\", \"button\": \"bogus\"}" | kafka-console-producer --topic click-stream --broker-list kafka:9093 --producer.config /etc/kafka/secrets/anon_producer.ssl.config'
```

Because `anon_producer` is not authorized to write to high fanout topics, the
request will be denied and the producer will output an error message.

```
Not authorized to access topics: [click-stream]
```

## Wrap Up

Congratulations on finishing the tutorial!

At this point you have learned how to enforce fine-grained access control
over Kafka topics. In addition, you have seen how to break policies down into
smaller rules that can be reused and that improve the overall readability of
the policy.

If you want to use the Kafka Authorizer plugin that integrates Kafka with
OPA, see the build and install instructions in the
[github.com/open-policy-agent/contrib](https://github.com/open-policy-agent/contrib)
repository.
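When debugging a policy like this one, it can also help to query OPA's decision endpoint directly with the same kind of input document the authorizer plugin sends. A hedged sketch, assuming the Compose stack above is running and OPA is published on `localhost:8181`:

```shell
# Input document shaped like the authorizer example earlier in the tutorial,
# this time for anon_consumer reading the credit-scores (PII) topic.
input='{
  "input": {
    "operation": {"name": "Read"},
    "resource": {"resourceType": {"name": "Topic"}, "name": "credit-scores"},
    "session": {
      "principal": {"principalType": "User"},
      "clientAddress": "172.21.0.5",
      "sanitizedUser": "CN%3Danon_consumer.tutorial.openpolicyagent.org%2COU%3DTUTORIAL%2CO%3DOPA%2CL%3DSF%2CST%3DCA%2CC%3DUS"
    }
  }
}'

# POST to the same decision path the broker is configured with
# (/v1/data/kafka/authz/allow). With the PII policy loaded, this consumer
# should be denied, i.e. OPA should respond with {"result": false}.
curl -s -H 'Content-Type: application/json' \
    -d "$input" localhost:8181/v1/data/kafka/authz/allow \
    || echo 'OPA is not reachable on localhost:8181'
```

This is the same request/response cycle the `KAFKA_OPA_AUTHORIZER_URL` setting drives from inside the broker, so it is a convenient way to test policy changes without running producers and consumers.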