1 /**
2  * Copyright 2015 Confluent Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
18 /**
19  * Internal implementation of libserdes C++ interface
20  */
21 
22 #include <cstdarg>
23 
24 extern "C" {
25 #include "serdes.h"
26 };
27 
28 
29 namespace Serdes {
30 
31 
err2str(ErrorCode err)32   std::string err2str (ErrorCode err) {
33     return std::string(serdes_err2str(static_cast<serdes_err_t>(err)));
34   }
35 
36 
37 
38   class ConfImpl : public Conf {
39  public:
~ConfImpl()40     ~ConfImpl () {
41       if (conf_)
42         serdes_conf_destroy(conf_);
43     }
44 
45     static Conf *create ();
46 
ConfImpl()47     ConfImpl (): conf_(NULL), log_cb_(NULL) { }
48 
49 
50 
set(const std::string & name,const std::string & value,std::string & errstr)51     ErrorCode set (const std::string &name,
52                    const std::string &value,
53                    std::string &errstr) {
54       char c_errstr[256];
55       serdes_err_t err;
56       err = serdes_conf_set(conf_, name.c_str(), value.c_str(),
57                             c_errstr, sizeof(c_errstr));
58       if (err != SERDES_ERR_OK)
59         errstr = c_errstr;
60       return static_cast<ErrorCode>(err);
61     }
62 
set(LogCb * log_cb)63     void set (LogCb *log_cb) {
64       log_cb_ = log_cb;
65     }
66 
67     serdes_conf_t *conf_;
68     LogCb *log_cb_;
69 
70   };
71 
72 
73   class HandleImpl : public Handle {
74  public:
~HandleImpl()75     ~HandleImpl () {
76       if (sd_)
77         serdes_destroy(sd_);
78     }
79 
80     static Handle *create (const Conf *conf, std::string &errstr);
81 
HandleImpl()82     HandleImpl (): log_cb_(NULL), sd_(NULL) {}
83 
schemas_purge(int max_age)84     int schemas_purge (int max_age) {
85       return serdes_schemas_purge(sd_, max_age);
86     }
87 
serializer_framing_size()88     ssize_t serializer_framing_size () const {
89       return serdes_serializer_framing_size(sd_);
90     }
91 
deserializer_framing_size()92     ssize_t deserializer_framing_size () const {
93       return serdes_deserializer_framing_size(sd_);
94     }
95 
96 
97 
98     LogCb *log_cb_;
99     serdes_t *sd_;
100   };
101 
102 
103   class SchemaImpl : public Schema {
104  public:
~SchemaImpl()105     ~SchemaImpl () {
106       if (schema_) {
107         serdes_schema_set_opaque(schema_, NULL);
108         serdes_schema_destroy(schema_);
109       }
110     }
111 
SchemaImpl()112     SchemaImpl (): schema_(NULL) {}
SchemaImpl(serdes_schema_t * ss)113     SchemaImpl (serdes_schema_t *ss): schema_(ss) {}
114 
115     static Schema *get (Handle *handle, int id, std::string &errstr);
116     static Schema *get (Handle *handle, std::string &name, std::string &errstr);
117 
118     static Schema *add (Handle *handle, int id, std::string &definition,
119                         std::string &errstr);
120     static Schema *add (Handle *handle, std::string name, std::string &definition,
121                         std::string &errstr);
122     static Schema *add (Handle *handle, int id, std::string name,
123                         std::string &definition, std::string &errstr);
124 
125 
id()126     int id () {
127       return serdes_schema_id(schema_);
128     }
129 
name()130     const std::string name () {
131       const char *name = serdes_schema_name(schema_);
132       return std::string(name ? name : "");
133     }
134 
definition()135     const std::string definition () {
136       const char *def = serdes_schema_definition(schema_);
137       return std::string(def ? def : "");
138     }
139 
object()140     avro::ValidSchema *object () {
141       return static_cast<avro::ValidSchema*>(serdes_schema_object(schema_));
142     }
143 
144 
framing_write(std::vector<char> & out)145     ssize_t framing_write (std::vector<char> &out) const {
146       ssize_t framing_size = serdes_serializer_framing_size(serdes_schema_handle(schema_));
147       if (framing_size == 0)
148         return 0;
149 
150       /* Make room for framing */
151       int pos = out.size();
152       out.resize(out.size() + framing_size);
153 
154       /* Write framing */
155       return serdes_framing_write(schema_, &out[pos], framing_size);
156     }
157 
158     serdes_schema_t *schema_;
159   };
160 
161 
162   class AvroImpl : virtual public Avro, virtual public HandleImpl {
163  public:
~AvroImpl()164     ~AvroImpl () { }
165 
166     static Avro *create (const Conf *conf, std::string &errstr);
167 
168     ssize_t serialize (Schema *schema, const avro::GenericDatum *datum,
169                        std::vector<char> &out, std::string &errstr);
170 
171     ssize_t deserialize (Schema **schemap, avro::GenericDatum **datump,
172                          const void *payload, size_t size, std::string &errstr);
173 
serializer_framing_size()174     ssize_t serializer_framing_size () const {
175       return dynamic_cast<const HandleImpl*>(this)->serializer_framing_size();
176     }
177 
deserializer_framing_size()178     ssize_t deserializer_framing_size () const {
179       return dynamic_cast<const HandleImpl*>(this)->deserializer_framing_size();
180     }
181 
schemas_purge(int max_age)182     int schemas_purge (int max_age) {
183       return dynamic_cast<HandleImpl*>(this)->schemas_purge(max_age);
184     }
185 
186   };
187 
188 }
189