1 /** 2 * Copyright 2015 Confluent Inc. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 #pragma once 17 18 /** 19 * Internal implementation of libserdes C++ interface 20 */ 21 22 #include <cstdarg> 23 24 extern "C" { 25 #include "serdes.h" 26 }; 27 28 29 namespace Serdes { 30 31 err2str(ErrorCode err)32 std::string err2str (ErrorCode err) { 33 return std::string(serdes_err2str(static_cast<serdes_err_t>(err))); 34 } 35 36 37 38 class ConfImpl : public Conf { 39 public: ~ConfImpl()40 ~ConfImpl () { 41 if (conf_) 42 serdes_conf_destroy(conf_); 43 } 44 45 static Conf *create (); 46 ConfImpl()47 ConfImpl (): conf_(NULL), log_cb_(NULL) { } 48 49 50 set(const std::string & name,const std::string & value,std::string & errstr)51 ErrorCode set (const std::string &name, 52 const std::string &value, 53 std::string &errstr) { 54 char c_errstr[256]; 55 serdes_err_t err; 56 err = serdes_conf_set(conf_, name.c_str(), value.c_str(), 57 c_errstr, sizeof(c_errstr)); 58 if (err != SERDES_ERR_OK) 59 errstr = c_errstr; 60 return static_cast<ErrorCode>(err); 61 } 62 set(LogCb * log_cb)63 void set (LogCb *log_cb) { 64 log_cb_ = log_cb; 65 } 66 67 serdes_conf_t *conf_; 68 LogCb *log_cb_; 69 70 }; 71 72 73 class HandleImpl : public Handle { 74 public: ~HandleImpl()75 ~HandleImpl () { 76 if (sd_) 77 serdes_destroy(sd_); 78 } 79 80 static Handle *create (const Conf *conf, std::string &errstr); 81 HandleImpl()82 HandleImpl (): log_cb_(NULL), sd_(NULL) {} 83 schemas_purge(int max_age)84 int schemas_purge (int max_age) { 85 return serdes_schemas_purge(sd_, max_age); 86 } 87 serializer_framing_size()88 ssize_t serializer_framing_size () const { 89 return serdes_serializer_framing_size(sd_); 90 } 91 deserializer_framing_size()92 ssize_t deserializer_framing_size () const { 93 return serdes_deserializer_framing_size(sd_); 94 } 95 96 97 98 LogCb *log_cb_; 99 serdes_t *sd_; 100 }; 101 102 103 class SchemaImpl : public Schema { 104 public: ~SchemaImpl()105 ~SchemaImpl () { 106 if (schema_) { 107 serdes_schema_set_opaque(schema_, NULL); 108 serdes_schema_destroy(schema_); 109 } 110 } 111 SchemaImpl()112 SchemaImpl (): schema_(NULL) {} SchemaImpl(serdes_schema_t * ss)113 SchemaImpl (serdes_schema_t *ss): schema_(ss) {} 114 115 static Schema *get (Handle *handle, int id, std::string &errstr); 116 static Schema *get (Handle *handle, std::string &name, std::string &errstr); 117 118 static Schema *add (Handle *handle, int id, std::string &definition, 119 std::string &errstr); 120 static Schema *add (Handle *handle, std::string name, std::string &definition, 121 std::string &errstr); 122 static Schema *add (Handle *handle, int id, std::string name, 123 std::string &definition, std::string &errstr); 124 125 id()126 int id () { 127 return serdes_schema_id(schema_); 128 } 129 name()130 const std::string name () { 131 const char *name = serdes_schema_name(schema_); 132 return std::string(name ? name : ""); 133 } 134 definition()135 const std::string definition () { 136 const char *def = serdes_schema_definition(schema_); 137 return std::string(def ? def : ""); 138 } 139 object()140 avro::ValidSchema *object () { 141 return static_cast<avro::ValidSchema*>(serdes_schema_object(schema_)); 142 } 143 144 framing_write(std::vector<char> & out)145 ssize_t framing_write (std::vector<char> &out) const { 146 ssize_t framing_size = serdes_serializer_framing_size(serdes_schema_handle(schema_)); 147 if (framing_size == 0) 148 return 0; 149 150 /* Make room for framing */ 151 int pos = out.size(); 152 out.resize(out.size() + framing_size); 153 154 /* Write framing */ 155 return serdes_framing_write(schema_, &out[pos], framing_size); 156 } 157 158 serdes_schema_t *schema_; 159 }; 160 161 162 class AvroImpl : virtual public Avro, virtual public HandleImpl { 163 public: ~AvroImpl()164 ~AvroImpl () { } 165 166 static Avro *create (const Conf *conf, std::string &errstr); 167 168 ssize_t serialize (Schema *schema, const avro::GenericDatum *datum, 169 std::vector<char> &out, std::string &errstr); 170 171 ssize_t deserialize (Schema **schemap, avro::GenericDatum **datump, 172 const void *payload, size_t size, std::string &errstr); 173 serializer_framing_size()174 ssize_t serializer_framing_size () const { 175 return dynamic_cast<const HandleImpl*>(this)->serializer_framing_size(); 176 } 177 deserializer_framing_size()178 ssize_t deserializer_framing_size () const { 179 return dynamic_cast<const HandleImpl*>(this)->deserializer_framing_size(); 180 } 181 schemas_purge(int max_age)182 int schemas_purge (int max_age) { 183 return dynamic_cast<HandleImpl*>(this)->schemas_purge(max_age); 184 } 185 186 }; 187 188 } 189