#!/usr/bin/env python
#
# Copyright 2016 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


#
# derived from https://github.com/verisign/python-confluent-schemaregistry.git
#
import os

from confluent_kafka import avro

from requests.exceptions import ConnectionError

import unittest
from confluent_kafka.avro import AvroProducer
from confluent_kafka.avro.serializer import (KeySerializerError,
                                             ValueSerializerError)

from tests.avro.mock_schema_registry_client import MockSchemaRegistryClient


# Directory holding this test module; the .avsc schema fixtures are loaded
# relative to it.
avsc_dir = os.path.dirname(os.path.realpath(__file__))


class TestAvroProducer(unittest.TestCase):
    """Tests for AvroProducer construction and produce() argument handling.

    Tests that configure 'schema.registry.url' point it at a local address
    where no schema registry is expected to be running, and assert on the
    exception raised by produce(). Tests that need serialization to go
    through use MockSchemaRegistryClient instead.
    """

    def test_instantiation(self):
        """An AvroProducer can be constructed from a registry URL alone."""
        obj = AvroProducer({'schema.registry.url': 'http://127.0.0.1:0'})
        # assertIsInstance / assertIsNotNone report the offending object on
        # failure, unlike assertTrue(isinstance(...)) / assertNotEqual(..., None).
        self.assertIsInstance(obj, AvroProducer)
        self.assertIsNotNone(obj)

    def test_produce_no_key(self):
        """Producing a value only (default value schema) reaches the registry."""
        value_schema = avro.load(os.path.join(avsc_dir, "basic_schema.avsc"))
        producer = AvroProducer({'schema.registry.url': 'http://127.0.0.1:9001'},
                                default_value_schema=value_schema)
        with self.assertRaises(ConnectionError):  # Nonexistent schema-registry
            producer.produce(topic='test', value={"name": 'abc"'})

    def test_produce_no_value(self):
        """Producing a key only (default key schema) reaches the registry."""
        key_schema = avro.load(os.path.join(avsc_dir, "basic_schema.avsc"))
        producer = AvroProducer({'schema.registry.url': 'http://127.0.0.1:9001'},
                                default_key_schema=key_schema)
        with self.assertRaises(ConnectionError):  # Nonexistent schema-registry
            producer.produce(topic='test', key={"name": 'abc"'})

    def test_produce_no_value_schema(self):
        """A value without any value schema raises ValueSerializerError."""
        producer = AvroProducer({'schema.registry.url': 'http://127.0.0.1:9001'})
        with self.assertRaises(ValueSerializerError):
            # Producer should not accept a value with no schema
            producer.produce(topic='test', value={"name": 'abc"'})

    def test_produce_no_key_schema(self):
        """A dict key without any key schema raises KeySerializerError."""
        producer = AvroProducer({'schema.registry.url': 'http://127.0.0.1:9001'})
        with self.assertRaises(KeySerializerError):
            # If the key is provided as a dict an avro schema must also be provided
            producer.produce(topic='test', key={"name": 'abc"'})

    def test_produce_value_and_key_schemas(self):
        """Key and value with default schemas both set reach the registry."""
        value_schema = avro.load(os.path.join(avsc_dir, "basic_schema.avsc"))
        producer = AvroProducer({'schema.registry.url': 'http://127.0.0.1:9001'},
                                default_value_schema=value_schema,
                                default_key_schema=value_schema)
        with self.assertRaises(ConnectionError):  # Nonexistent schema-registry
            producer.produce(topic='test', value={"name": 'abc"'}, key={"name": 'abc"'})

    def test_produce_primitive_string_key(self):
        """A primitive string key with per-call schemas reaches the registry."""
        value_schema = avro.load(os.path.join(avsc_dir, "basic_schema.avsc"))
        key_schema = avro.load(os.path.join(avsc_dir, "primitive_string.avsc"))
        producer = AvroProducer({'schema.registry.url': 'http://127.0.0.1:9001'})
        with self.assertRaises(ConnectionError):  # Nonexistent schema-registry
            producer.produce(topic='test', value={"name": 'abc"'}, value_schema=value_schema,
                             key='mykey', key_schema=key_schema)

    def test_produce_primitive_key_and_value(self):
        """Primitive key and value with per-call schemas reach the registry."""
        value_schema = avro.load(os.path.join(avsc_dir, "primitive_float.avsc"))
        key_schema = avro.load(os.path.join(avsc_dir, "primitive_string.avsc"))
        producer = AvroProducer({'schema.registry.url': 'http://127.0.0.1:9001'})
        with self.assertRaises(ConnectionError):  # Nonexistent schema-registry
            producer.produce(topic='test', value=32., value_schema=value_schema,
                             key='mykey', key_schema=key_schema)

    def test_produce_with_custom_registry(self):
        """produce() completes without raising when given a mock registry client."""
        schema_registry = MockSchemaRegistryClient()
        value_schema = avro.load(os.path.join(avsc_dir, "basic_schema.avsc"))
        key_schema = avro.load(os.path.join(avsc_dir, "primitive_string.avsc"))
        producer = AvroProducer({}, schema_registry=schema_registry)
        producer.produce(topic='test', value={"name": 'abc"'}, value_schema=value_schema,
                         key='mykey', key_schema=key_schema)

    def test_produce_with_custom_registry_and_registry_url(self):
        """Passing both a registry URL and a registry client raises ValueError."""
        schema_registry = MockSchemaRegistryClient()
        with self.assertRaises(ValueError):
            AvroProducer({'schema.registry.url': 'http://127.0.0.1:9001'},
                         schema_registry=schema_registry)

    def test_produce_with_empty_value_no_schema(self):
        """An empty-string value with no value schema raises ValueSerializerError."""
        schema_registry = MockSchemaRegistryClient()
        producer = AvroProducer({}, schema_registry=schema_registry)
        with self.assertRaises(ValueSerializerError):
            producer.produce(topic='test', value='', key='not empty')

    def test_produce_with_empty_key_no_schema(self):
        """An empty-string key with no key schema raises KeySerializerError."""
        value_schema = avro.load(os.path.join(avsc_dir, "primitive_float.avsc"))
        schema_registry = MockSchemaRegistryClient()
        producer = AvroProducer({}, schema_registry=schema_registry,
                                default_value_schema=value_schema)
        with self.assertRaises(KeySerializerError):
            producer.produce(topic='test', value=0.0, key='')

    def test_produce_with_empty_key_value_with_schema(self):
        """Falsy key ('') and value (0.0) are accepted when schemas are set."""
        key_schema = avro.load(os.path.join(avsc_dir, "primitive_string.avsc"))
        value_schema = avro.load(os.path.join(avsc_dir, "primitive_float.avsc"))
        schema_registry = MockSchemaRegistryClient()
        producer = AvroProducer({}, schema_registry=schema_registry,
                                default_key_schema=key_schema,
                                default_value_schema=value_schema)
        producer.produce(topic='test', value=0.0, key='')