1#!/usr/bin/env python
2#
3# Copyright 2016 Confluent Inc.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18
19#
20# derived from https://github.com/verisign/python-confluent-schemaregistry.git
21#
22import os
23
24from confluent_kafka import avro
25
26from requests.exceptions import ConnectionError
27
28import unittest
29from confluent_kafka.avro import AvroProducer
30from confluent_kafka.avro.serializer import (KeySerializerError,
31                                             ValueSerializerError)
32
33from tests.avro.mock_schema_registry_client import MockSchemaRegistryClient
34
35
# Absolute, symlink-resolved directory holding this test module; the tests
# join schema fixture file names (*.avsc) onto this path.
avsc_dir = os.path.split(os.path.realpath(__file__))[0]
37
38
39class TestAvroProducer(unittest.TestCase):
40
41    def test_instantiation(self):
42        obj = AvroProducer({'schema.registry.url': 'http://127.0.0.1:0'})
43        self.assertTrue(isinstance(obj, AvroProducer))
44        self.assertNotEqual(obj, None)
45
46    def test_produce_no_key(self):
47        value_schema = avro.load(os.path.join(avsc_dir, "basic_schema.avsc"))
48        producer = AvroProducer({'schema.registry.url': 'http://127.0.0.1:9001'}, default_value_schema=value_schema)
49        with self.assertRaises(ConnectionError):  # Unexistent schema-registry
50            producer.produce(topic='test', value={"name": 'abc"'})
51
52    def test_produce_no_value(self):
53        key_schema = avro.load(os.path.join(avsc_dir, "basic_schema.avsc"))
54        producer = AvroProducer({'schema.registry.url': 'http://127.0.0.1:9001'}, default_key_schema=key_schema)
55        with self.assertRaises(ConnectionError):  # Unexistent schema-registry
56            producer.produce(topic='test', key={"name": 'abc"'})
57
58    def test_produce_no_value_schema(self):
59        producer = AvroProducer({'schema.registry.url': 'http://127.0.0.1:9001'})
60        with self.assertRaises(ValueSerializerError):
61            # Producer should not accept a value with no schema
62            producer.produce(topic='test', value={"name": 'abc"'})
63
64    def test_produce_no_key_schema(self):
65        producer = AvroProducer({'schema.registry.url': 'http://127.0.0.1:9001'})
66        with self.assertRaises(KeySerializerError):
67            # If the key is provided as a dict an avro schema must also be provided
68            producer.produce(topic='test', key={"name": 'abc"'})
69
70    def test_produce_value_and_key_schemas(self):
71        value_schema = avro.load(os.path.join(avsc_dir, "basic_schema.avsc"))
72        producer = AvroProducer({'schema.registry.url': 'http://127.0.0.1:9001'}, default_value_schema=value_schema,
73                                default_key_schema=value_schema)
74        with self.assertRaises(ConnectionError):  # Unexistent schema-registry
75            producer.produce(topic='test', value={"name": 'abc"'}, key={"name": 'abc"'})
76
77    def test_produce_primitive_string_key(self):
78        value_schema = avro.load(os.path.join(avsc_dir, "basic_schema.avsc"))
79        key_schema = avro.load(os.path.join(avsc_dir, "primitive_string.avsc"))
80        producer = AvroProducer({'schema.registry.url': 'http://127.0.0.1:9001'})
81        with self.assertRaises(ConnectionError):  # Unexistent schema-registry
82            producer.produce(topic='test', value={"name": 'abc"'}, value_schema=value_schema, key='mykey',
83                             key_schema=key_schema)
84
85    def test_produce_primitive_key_and_value(self):
86        value_schema = avro.load(os.path.join(avsc_dir, "primitive_float.avsc"))
87        key_schema = avro.load(os.path.join(avsc_dir, "primitive_string.avsc"))
88        producer = AvroProducer({'schema.registry.url': 'http://127.0.0.1:9001'})
89        with self.assertRaises(ConnectionError):  # Unexistent schema-registry
90            producer.produce(topic='test', value=32., value_schema=value_schema, key='mykey', key_schema=key_schema)
91
92    def test_produce_with_custom_registry(self):
93        schema_registry = MockSchemaRegistryClient()
94        value_schema = avro.load(os.path.join(avsc_dir, "basic_schema.avsc"))
95        key_schema = avro.load(os.path.join(avsc_dir, "primitive_string.avsc"))
96        producer = AvroProducer({}, schema_registry=schema_registry)
97        producer.produce(topic='test', value={"name": 'abc"'}, value_schema=value_schema, key='mykey',
98                         key_schema=key_schema)
99
100    def test_produce_with_custom_registry_and_registry_url(self):
101        schema_registry = MockSchemaRegistryClient()
102        with self.assertRaises(ValueError):
103            AvroProducer({'schema.registry.url': 'http://127.0.0.1:9001'}, schema_registry=schema_registry)
104
105    def test_produce_with_empty_value_no_schema(self):
106        schema_registry = MockSchemaRegistryClient()
107        producer = AvroProducer({}, schema_registry=schema_registry)
108        with self.assertRaises(ValueSerializerError):
109            producer.produce(topic='test', value='', key='not empty')
110
111    def test_produce_with_empty_key_no_schema(self):
112        value_schema = avro.load(os.path.join(avsc_dir, "primitive_float.avsc"))
113        schema_registry = MockSchemaRegistryClient()
114        producer = AvroProducer({}, schema_registry=schema_registry,
115                                default_value_schema=value_schema)
116        with self.assertRaises(KeySerializerError):
117            producer.produce(topic='test', value=0.0, key='')
118
119    def test_produce_with_empty_key_value_with_schema(self):
120        key_schema = avro.load(os.path.join(avsc_dir, "primitive_string.avsc"))
121        value_schema = avro.load(os.path.join(avsc_dir, "primitive_float.avsc"))
122        schema_registry = MockSchemaRegistryClient()
123        producer = AvroProducer({}, schema_registry=schema_registry,
124                                default_key_schema=key_schema,
125                                default_value_schema=value_schema)
126        producer.produce(topic='test', value=0.0, key='')
127