# Kafka Authorization

[Apache Kafka](https://kafka.apache.org/) is a high-performance distributed
streaming platform deployed by thousands of companies. In many deployments,
administrators require fine-grained access control over Kafka topics to
enforce important requirements around confidentiality and integrity.

## Goals

This tutorial shows how to enforce fine-grained access control over Kafka
topics. In this tutorial you will use OPA to define and enforce an
authorization policy stating:

* Consumers of topics containing Personally Identifiable Information (PII) must be whitelisted.
* Producers to topics with _high fanout_ must be whitelisted.

In addition, this tutorial shows how to break up a policy with small helper
rules to reuse logic and improve overall readability.

## Prerequisites

This tutorial requires [Docker Compose](https://docs.docker.com/compose/install/) to run Kafka, ZooKeeper, and OPA.

## Steps

### 1. Bootstrap the tutorial environment using Docker Compose.

First, create an OPA policy that allows all requests. You will update this policy later in the tutorial.

```bash
mkdir -p policies
```

**policies/tutorial.rego**:

```ruby
package kafka.authz

allow = true
```

Next, create a `docker-compose.yaml` file that runs OPA, ZooKeeper, and Kafka.

**docker-compose.yaml**:

```yaml
version: "2"
services:
  opa:
    hostname: opa
    image: openpolicyagent/opa:0.8.2
    ports:
      - 8181:8181
    command: "run --server --watch /policies"
    volumes:
      - ./policies:/policies
  zookeeper:
    image: confluentinc/cp-zookeeper:4.0.0-3
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      zk_id: "1"
  kafka:
    hostname: kafka
    image: openpolicyagent/demo-kafka:1.0
    links:
      - zookeeper
      - opa
    ports:
      - "9092:9092"
    environment:
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_ADVERTISED_LISTENERS: "SSL://:9093"
      KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SSL
      KAFKA_SSL_CLIENT_AUTH: required
      KAFKA_SSL_KEYSTORE_FILENAME: kafka.broker.keystore.jks
      KAFKA_SSL_KEYSTORE_CREDENTIALS: broker_keystore_creds
      KAFKA_SSL_KEY_CREDENTIALS: broker_sslkey_creds
      KAFKA_SSL_TRUSTSTORE_FILENAME: kafka.broker.truststore.jks
      KAFKA_SSL_TRUSTSTORE_CREDENTIALS: broker_truststore_creds
      KAFKA_AUTHORIZER_CLASS_NAME: com.lbg.kafka.opa.OpaAuthorizer
      KAFKA_OPA_AUTHORIZER_URL: "http://opa:8181/v1/data/kafka/authz/allow"
      KAFKA_OPA_AUTHORIZER_ALLOW_ON_ERROR: "false"
      KAFKA_OPA_AUTHORIZER_CACHE_INITIAL_CAPACITY: 100
      KAFKA_OPA_AUTHORIZER_CACHE_MAXIMUM_SIZE: 100
      KAFKA_OPA_AUTHORIZER_CACHE_EXPIRE_AFTER_MS: 600000
```

For more information on how to configure the OPA plugin for Kafka, see the [github.com/open-policy-agent/contrib](https://github.com/open-policy-agent/contrib)
repository.

Once you have created the file, launch the containers for this tutorial.

```bash
docker-compose --project-name opa-kafka-tutorial up
```

Now that the tutorial environment is running, we can define an authorization policy using OPA and test it.
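
To sanity-check that OPA is up and serving the allow-all policy, you can query the decision directly over OPA's REST API. This is a minimal sketch that assumes port 8181 is published on localhost, as in the Docker Compose file above:

```bash
# Query the same decision the Kafka plugin is configured to ask for.
# With the allow-all policy loaded, OPA should respond with {"result": true}.
curl -s http://localhost:8181/v1/data/kafka/authz/allow
```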

#### Authentication

The Docker Compose file defined above requires **SSL client authentication**
for clients that connect to the broker. Enabling SSL client authentication
allows for service identities to be provided as input to your policy. The
example below shows the input structure.

```json
{
  "operation": {
    "name": "Write"
  },
  "resource": {
    "resourceType": {
      "name": "Topic"
    },
    "name": "credit-scores"
  },
  "session": {
    "principal": {
      "principalType": "User"
    },
    "clientAddress": "172.21.0.5",
    "sanitizedUser": "CN%3Danon_producer.tutorial.openpolicyagent.org%2COU%3DTUTORIAL%2CO%3DOPA%2CL%3DSF%2CST%3DCA%2CC%3DUS"
  }
}
```

The client identity is extracted from the SSL certificates that clients
present when they connect to the broker. This identity information is
encoded in the `input.session.sanitizedUser` field, which can be decoded
inside the policy.
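
For example, the `sanitizedUser` value shown above percent-decodes to an ordinary distinguished name. As a convenience (this one-liner is not part of the tutorial setup), you can decode it in a bash shell:

```bash
# Percent-decode the sanitizedUser value from the sample input above.
# Replacing each '%' with '\x' lets printf's %b format expand the hex escapes.
user='CN%3Danon_producer.tutorial.openpolicyagent.org%2COU%3DTUTORIAL%2CO%3DOPA%2CL%3DSF%2CST%3DCA%2CC%3DUS'
printf '%b\n' "${user//%/\\x}"
# => CN=anon_producer.tutorial.openpolicyagent.org,OU=TUTORIAL,O=OPA,L=SF,ST=CA,C=US
```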

Generating SSL certificates and JKS files required for SSL client
authentication is outside the scope of this tutorial. To simplify the steps
below, the Docker Compose file uses an extended version of the
[confluentinc/cp-kafka](https://hub.docker.com/r/confluentinc/cp-kafka/)
image from Docker Hub. The extended image includes **pre-generated SSL
certificates** that the broker and clients use to identify themselves.

Do not rely on these pre-generated SSL certificates in real-world scenarios.
They are only provided for convenience/test purposes.

#### Kafka Authorizer JAR File

The Kafka image used in this tutorial includes a pre-installed JAR file that implements the [Kafka Authorizer](https://kafka.apache.org/documentation/#security_authz) interface. For more information on the authorizer see [open-policy-agent/contrib/kafka_authorizer](https://github.com/open-policy-agent/contrib/tree/master/kafka_authorizer).

### 2. Define a policy to restrict consumer access to topics containing Personally Identifiable Information (PII).

Update `policies/tutorial.rego` with the following content.

```ruby
#-----------------------------------------------------------------------------
# High level policy for controlling access to Kafka.
#
# * Deny operations by default.
# * Allow operations if no explicit denial.
#
# The kafka-authorizer-opa plugin will query OPA for decisions at
# /kafka/authz/allow. If the policy decision is _true_ the request is allowed.
# If the policy decision is _false_ the request is denied.
#-----------------------------------------------------------------------------
package kafka.authz

default allow = false

allow {
	not deny
}

deny {
	is_read_operation
	topic_contains_pii
	not consumer_is_whitelisted_for_pii
}

#-----------------------------------------------------------------------------
# Data structures for controlling access to topics. In real-world deployments,
# these data structures could be loaded into OPA as raw JSON data. The JSON
# data could be pulled from external sources like AD, Git, etc.
#-----------------------------------------------------------------------------

consumer_whitelist = {"pii": {"pii_consumer"}}

topic_metadata = {"credit-scores": {"tags": ["pii"]}}

#-----------------------------------
# Helpers for checking topic access.
#-----------------------------------

topic_contains_pii {
	topic_metadata[topic_name].tags[_] == "pii"
}

consumer_is_whitelisted_for_pii {
	consumer_whitelist.pii[_] == principal.name
}

#-----------------------------------------------------------------------------
# Helpers for processing Kafka operation input. This logic could be split out
# into a separate file and shared. For conciseness, we have kept it all in one
# place.
#-----------------------------------------------------------------------------

is_write_operation {
	input.operation.name == "Write"
}

is_read_operation {
	input.operation.name == "Read"
}

is_topic_resource {
	input.resource.resourceType.name == "Topic"
}

topic_name = input.resource.name {
	is_topic_resource
}

principal = {"fqn": parsed.CN, "name": cn_parts[0]} {
	parsed := parse_user(urlquery.decode(input.session.sanitizedUser))
	cn_parts := split(parsed.CN, ".")
}

parse_user(user) = {key: value |
	parts := split(user, ",")
	[key, value] := split(parts[_], "=")
}
```

The `./policies` directory is mounted into the Docker container running OPA.
When the files under this directory change, OPA is notified and the policies
are automatically reloaded.
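
To confirm that OPA has picked up the change, you can list the loaded policy modules over the REST API (again assuming port 8181 is published on localhost):

```bash
# List the policy modules OPA currently has loaded. The tutorial policy
# should appear with an ID derived from its file path under /policies.
curl -s http://localhost:8181/v1/policies
```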

At this point, you can exercise the policy.
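
One way to do that without any Kafka clients is to POST a sample input to OPA's REST API. The sketch below reuses the input document from the Authentication section, with the operation changed to `Read` to simulate a consumer:

```bash
# Ask OPA whether the anonymous client may read the credit-scores topic.
# anon_producer is not on the PII whitelist, so OPA returns {"result": false}.
curl -s -X POST http://localhost:8181/v1/data/kafka/authz/allow \
    -H 'Content-Type: application/json' \
    -d '{
      "input": {
        "operation": {"name": "Read"},
        "resource": {"resourceType": {"name": "Topic"}, "name": "credit-scores"},
        "session": {
          "principal": {"principalType": "User"},
          "clientAddress": "172.21.0.5",
          "sanitizedUser": "CN%3Danon_producer.tutorial.openpolicyagent.org%2COU%3DTUTORIAL%2CO%3DOPA%2CL%3DSF%2CST%3DCA%2CC%3DUS"
        }
      }
    }'
```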

### 3. Exercise the policy that restricts consumer access to topics containing PII.

This step shows how you can grant fine-grained access to services using
Kafka. In this scenario, some services are allowed to read PII data while
others are not.

First, run `kafka-console-producer` to generate some data on the
`credit-scores` topic.

> This tutorial uses the `kafka-console-producer` and `kafka-console-consumer` scripts to generate and display Kafka messages. These scripts read from STDIN and write to STDOUT and are frequently used to send and receive data via Kafka over the command line. If you are not familiar with these scripts you can learn more in Kafka's [Quick Start](https://kafka.apache.org/documentation/#quickstart) documentation.

```bash
docker run --rm --network opakafkatutorial_default \
    openpolicyagent/demo-kafka:1.0 \
    bash -c 'for i in {1..10}; do echo "{\"user\": \"bob\", \"score\": $i}"; done | kafka-console-producer --topic credit-scores --broker-list kafka:9093 --producer.config /etc/kafka/secrets/anon_producer.ssl.config'
```

This command will send 10 messages to the `credit-scores` topic. Bob's credit
score seems to be improving.

Next, run `kafka-console-consumer` and try to read data off the topic. Use
the `pii_consumer` credentials to simulate a service that is allowed to read
PII data.

```bash
docker run --rm --network opakafkatutorial_default \
    openpolicyagent/demo-kafka:1.0 \
    kafka-console-consumer --bootstrap-server kafka:9093 --topic credit-scores --from-beginning --consumer.config /etc/kafka/secrets/pii_consumer.ssl.config
```

This command will output the 10 messages sent to the topic in the first part
of this step. Once the 10 messages have been printed, exit out of the script
(^C).

Finally, run `kafka-console-consumer` again but this time try to use the
`anon_consumer` credentials. The `anon_consumer` credentials simulate a
service that has **not** been explicitly granted access to PII data.

```bash
docker run --rm --network opakafkatutorial_default \
    openpolicyagent/demo-kafka:1.0 \
    kafka-console-consumer --bootstrap-server kafka:9093 --topic credit-scores --from-beginning --consumer.config /etc/kafka/secrets/anon_consumer.ssl.config
```

Because the `anon_consumer` is not allowed to read PII data, the request will
be denied and the consumer will output an error message.

```
Not authorized to read from topic credit-scores.
...
Processed a total of 0 messages
```

### 4. Extend the policy to prevent services from accidentally writing to topics with large fanout.

First, add the following content to the policy file (`./policies/tutorial.rego`):

```ruby
deny {
	is_write_operation
	topic_has_large_fanout
	not producer_is_whitelisted_for_large_fanout
}

producer_whitelist = {
	"large-fanout": {
		"fanout_producer",
	}
}

topic_has_large_fanout {
	topic_metadata[topic_name].tags[_] == "large-fanout"
}

producer_is_whitelisted_for_large_fanout {
	producer_whitelist["large-fanout"][_] == principal.name
}
```

Next, update the `topic_metadata` data structure in the same file to indicate
that the `click-stream` topic has a high fanout.

```ruby
topic_metadata = {
	"click-stream": {
		"tags": ["large-fanout"],
	},
	"credit-scores": {
		"tags": ["pii"],
	}
}
```
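
As before, you can sanity-check the new rule by querying OPA directly before touching Kafka. This sketch assumes the `anon_producer` identity carries the same distinguished-name shape shown earlier; swapping `anon_producer` for `fanout_producer` in the CN should flip the decision to `true`:

```bash
# Ask OPA whether anon_producer may write to the click-stream topic.
# anon_producer is not on the large-fanout whitelist, so OPA returns {"result": false}.
curl -s -X POST http://localhost:8181/v1/data/kafka/authz/allow \
    -H 'Content-Type: application/json' \
    -d '{
      "input": {
        "operation": {"name": "Write"},
        "resource": {"resourceType": {"name": "Topic"}, "name": "click-stream"},
        "session": {
          "principal": {"principalType": "User"},
          "sanitizedUser": "CN%3Danon_producer.tutorial.openpolicyagent.org%2COU%3DTUTORIAL%2CO%3DOPA%2CL%3DSF%2CST%3DCA%2CC%3DUS"
        }
      }
    }'
```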

### 5. Exercise the policy that restricts producer access to topics with high fanout.

First, run `kafka-console-producer` and simulate a service with access to the
`click-stream` topic.

```bash
docker run --rm --network opakafkatutorial_default \
    openpolicyagent/demo-kafka:1.0 \
    bash -c 'for i in {1..10}; do echo "{\"user\": \"alice\", \"button\": $i}"; done | kafka-console-producer --topic click-stream --broker-list kafka:9093 --producer.config /etc/kafka/secrets/fanout_producer.ssl.config'
```

Next, run the `kafka-console-consumer` to confirm that the messages were published.

```bash
docker run --rm --network opakafkatutorial_default \
    openpolicyagent/demo-kafka:1.0 \
    kafka-console-consumer --bootstrap-server kafka:9093 --topic click-stream --from-beginning --consumer.config /etc/kafka/secrets/anon_consumer.ssl.config
```

Once you see the 10 messages produced by the first part of this step, exit the console consumer (^C).

Lastly, run `kafka-console-producer` to simulate a service that should **not**
have access to _high fanout_ topics.

```bash
docker run --rm --network opakafkatutorial_default \
    openpolicyagent/demo-kafka:1.0 \
    bash -c 'echo "{\"user\": \"alice\", \"button\": \"bogus\"}" | kafka-console-producer --topic click-stream --broker-list kafka:9093 --producer.config /etc/kafka/secrets/anon_producer.ssl.config'
```

Because `anon_producer` is not authorized to write to high fanout topics, the
request will be denied and the producer will output an error message.

```
Not authorized to access topics: [click-stream]
```

## Wrap Up

Congratulations on finishing the tutorial!

At this point you have learned how to enforce fine-grained access control
over Kafka topics. In addition, you have seen how to break policies down
into smaller rules that can be reused and that improve the overall
readability of the policy.

If you want to use the Kafka Authorizer plugin that integrates Kafka with
OPA, see the build and install instructions in the
[github.com/open-policy-agent/contrib](https://github.com/open-policy-agent/contrib)
repository.