1 /*
2  * Copyright (c) 2002-2012 Balabit
3  * Copyright (c) 1998-2012 Balázs Scheidler
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
18  *
19  * As an additional exemption you are allowed to compile & link against the
20  * OpenSSL libraries as published by the OpenSSL project. See the file
21  * COPYING for details.
22  *
23  */
24 
25 #include "logproto-server.h"
26 #include "messages.h"
27 #include "cfg.h"
28 #include "plugin.h"
29 #include "plugin-types.h"
30 #include "ack-tracker/ack_tracker_factory.h"
31 
32 /**
33  * Find the character terminating the buffer.
34  *
35  * NOTE: when looking for the end-of-message here, it either needs to be
36  * terminated via NUL or via NL, when terminating via NL we have to make
37  * sure that there's no NUL left in the message. This function iterates over
38  * the input data and returns a pointer to the first occurrence of NL or NUL.
39  *
40  * It uses an algorithm similar to what there's in libc memchr/strchr.
41  *
42  * NOTE: find_eom is not static as it is used by a unit test program.
43  **/
44 const guchar *
find_eom(const guchar * s,gsize n)45 find_eom(const guchar *s, gsize n)
46 {
47   const guchar *char_ptr;
48   const gulong *longword_ptr;
49   gulong longword, magic_bits, charmask;
50   gchar c;
51 
52   c = '\n';
53 
54   /* align input to long boundary */
55   for (char_ptr = s; n > 0 && ((gulong) char_ptr & (sizeof(longword) - 1)) != 0; ++char_ptr, n--)
56     {
57       if (*char_ptr == c || *char_ptr == '\0')
58         return char_ptr;
59     }
60 
61   longword_ptr = (gulong *) char_ptr;
62 
63 #if GLIB_SIZEOF_LONG == 8
64   magic_bits = 0x7efefefefefefeffL;
65 #elif GLIB_SIZEOF_LONG == 4
66   magic_bits = 0x7efefeffL;
67 #else
68 #error "unknown architecture"
69 #endif
70   memset(&charmask, c, sizeof(charmask));
71 
72   while (n > sizeof(longword))
73     {
74       longword = *longword_ptr++;
75       if ((((longword + magic_bits) ^ ~longword) & ~magic_bits) != 0 ||
76           ((((longword ^ charmask) + magic_bits) ^ ~(longword ^ charmask)) & ~magic_bits) != 0)
77         {
78           gint i;
79 
80           char_ptr = (const guchar *) (longword_ptr - 1);
81 
82           for (i = 0; i < sizeof(longword); i++)
83             {
84               if (*char_ptr == c || *char_ptr == '\0')
85                 return char_ptr;
86               char_ptr++;
87             }
88         }
89       n -= sizeof(longword);
90     }
91 
92   char_ptr = (const guchar *) longword_ptr;
93 
94   while (n-- > 0)
95     {
96       if (*char_ptr == c || *char_ptr == '\0')
97         return char_ptr;
98       ++char_ptr;
99     }
100 
101   return NULL;
102 }
103 
104 AckTrackerFactory *
log_proto_server_get_ack_tracker_factory(LogProtoServer * s)105 log_proto_server_get_ack_tracker_factory(LogProtoServer *s)
106 {
107   return s->options->ack_tracker_factory;
108 }
109 
110 gboolean
log_proto_server_is_position_tracked(LogProtoServer * s)111 log_proto_server_is_position_tracked(LogProtoServer *s)
112 {
113   AckTrackerType type = ack_tracker_factory_get_type(log_proto_server_get_ack_tracker_factory(s));
114 
115   return ack_tracker_type_is_position_tracked(type);
116 }
117 
118 gboolean
log_proto_server_validate_options_method(LogProtoServer * s)119 log_proto_server_validate_options_method(LogProtoServer *s)
120 {
121   return TRUE;
122 }
123 
124 void
log_proto_server_free_method(LogProtoServer * s)125 log_proto_server_free_method(LogProtoServer *s)
126 {
127   log_transport_free(s->transport);
128 }
129 
130 void
log_proto_server_free(LogProtoServer * s)131 log_proto_server_free(LogProtoServer *s)
132 {
133   if (s->free_fn)
134     s->free_fn(s);
135   g_free(s);
136 }
137 
138 void
log_proto_server_init(LogProtoServer * self,LogTransport * transport,const LogProtoServerOptions * options)139 log_proto_server_init(LogProtoServer *self, LogTransport *transport, const LogProtoServerOptions *options)
140 {
141   self->validate_options = log_proto_server_validate_options_method;
142   self->free_fn = log_proto_server_free_method;
143   self->options = options;
144   self->transport = transport;
145 }
146 
147 gboolean
log_proto_server_options_set_encoding(LogProtoServerOptions * self,const gchar * encoding)148 log_proto_server_options_set_encoding(LogProtoServerOptions *self, const gchar *encoding)
149 {
150   GIConv convert;
151 
152   g_free(self->encoding);
153   self->encoding = g_strdup(encoding);
154 
155   /* validate encoding */
156   convert = g_iconv_open("utf-8", encoding);
157   if (convert == (GIConv) -1)
158     return FALSE;
159   g_iconv_close(convert);
160   return TRUE;
161 }
162 
163 void
log_proto_server_options_set_ack_tracker_factory(LogProtoServerOptions * self,AckTrackerFactory * factory)164 log_proto_server_options_set_ack_tracker_factory(LogProtoServerOptions *self, AckTrackerFactory *factory)
165 {
166   ack_tracker_factory_unref(self->ack_tracker_factory);
167   self->ack_tracker_factory = factory;
168 }
169 
170 void
log_proto_server_options_defaults(LogProtoServerOptions * options)171 log_proto_server_options_defaults(LogProtoServerOptions *options)
172 {
173   memset(options, 0, sizeof(*options));
174   options->max_msg_size = -1;
175   options->trim_large_messages = -1;
176   options->init_buffer_size = -1;
177   options->max_buffer_size = -1;
178   options->ack_tracker_factory = instant_ack_tracker_bookmarkless_factory_new();
179 }
180 
181 void
log_proto_server_options_init(LogProtoServerOptions * options,GlobalConfig * cfg)182 log_proto_server_options_init(LogProtoServerOptions *options, GlobalConfig *cfg)
183 {
184   if (options->initialized)
185     return;
186 
187   if (options->max_msg_size == -1)
188     {
189       options->max_msg_size = cfg->log_msg_size;
190     }
191   if (options->trim_large_messages == -1)
192     {
193       options->trim_large_messages = cfg->trim_large_messages;
194     }
195   if (options->max_buffer_size == -1)
196     {
197       if (options->encoding)
198         {
199           /* Based on the implementation of LogProtoTextServer, the buffer is yielded as
200              a complete message when max_msg_size is reached and there is no EOM in the buffer.
201              In worst case, the buffer contains max_msg_size - 1 bytes before the next fetch,
202              which can be 6 times max_msg_size due to the utf8 conversion.
203              And additional space is required because of the possible leftover bytes.
204           */
205           options->max_buffer_size = 8 * options->max_msg_size;
206         }
207       else
208         options->max_buffer_size = options->max_msg_size;
209     }
210   if (options->init_buffer_size == -1)
211     options->init_buffer_size = MIN(options->max_msg_size, options->max_buffer_size);
212   options->initialized = TRUE;
213 }
214 
215 void
log_proto_server_options_destroy(LogProtoServerOptions * options)216 log_proto_server_options_destroy(LogProtoServerOptions *options)
217 {
218   g_free(options->encoding);
219   ack_tracker_factory_unref(options->ack_tracker_factory);
220   if (options->destroy)
221     options->destroy(options);
222   options->initialized = FALSE;
223 }
224 
225 LogProtoServerFactory *
log_proto_server_get_factory(PluginContext * context,const gchar * name)226 log_proto_server_get_factory(PluginContext *context, const gchar *name)
227 {
228   Plugin *plugin;
229 
230   plugin = plugin_find(context, LL_CONTEXT_SERVER_PROTO, name);
231   if (plugin && plugin->construct)
232     return plugin->construct(plugin);
233   return NULL;
234 }
235