1 /*
2 * Copyright (c) 2002-2012 Balabit
3 * Copyright (c) 1998-2012 Balázs Scheidler
4 *
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Lesser General Public
7 * License as published by the Free Software Foundation; either
8 * version 2.1 of the License, or (at your option) any later version.
9 *
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Lesser General Public License for more details.
14 *
15 * You should have received a copy of the GNU Lesser General Public
16 * License along with this library; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 * As an additional exemption you are allowed to compile & link against the
20 * OpenSSL libraries as published by the OpenSSL project. See the file
21 * COPYING for details.
22 *
23 */
24
25 #include "logproto-server.h"
26 #include "messages.h"
27 #include "cfg.h"
28 #include "plugin.h"
29 #include "plugin-types.h"
30 #include "ack-tracker/ack_tracker_factory.h"
31
32 /**
33 * Find the character terminating the buffer.
34 *
35 * NOTE: when looking for the end-of-message here, it either needs to be
36 * terminated via NUL or via NL, when terminating via NL we have to make
37 * sure that there's no NUL left in the message. This function iterates over
38 * the input data and returns a pointer to the first occurrence of NL or NUL.
39 *
40 * It uses an algorithm similar to what there's in libc memchr/strchr.
41 *
42 * NOTE: find_eom is not static as it is used by a unit test program.
43 **/
44 const guchar *
find_eom(const guchar * s,gsize n)45 find_eom(const guchar *s, gsize n)
46 {
47 const guchar *char_ptr;
48 const gulong *longword_ptr;
49 gulong longword, magic_bits, charmask;
50 gchar c;
51
52 c = '\n';
53
54 /* align input to long boundary */
55 for (char_ptr = s; n > 0 && ((gulong) char_ptr & (sizeof(longword) - 1)) != 0; ++char_ptr, n--)
56 {
57 if (*char_ptr == c || *char_ptr == '\0')
58 return char_ptr;
59 }
60
61 longword_ptr = (gulong *) char_ptr;
62
63 #if GLIB_SIZEOF_LONG == 8
64 magic_bits = 0x7efefefefefefeffL;
65 #elif GLIB_SIZEOF_LONG == 4
66 magic_bits = 0x7efefeffL;
67 #else
68 #error "unknown architecture"
69 #endif
70 memset(&charmask, c, sizeof(charmask));
71
72 while (n > sizeof(longword))
73 {
74 longword = *longword_ptr++;
75 if ((((longword + magic_bits) ^ ~longword) & ~magic_bits) != 0 ||
76 ((((longword ^ charmask) + magic_bits) ^ ~(longword ^ charmask)) & ~magic_bits) != 0)
77 {
78 gint i;
79
80 char_ptr = (const guchar *) (longword_ptr - 1);
81
82 for (i = 0; i < sizeof(longword); i++)
83 {
84 if (*char_ptr == c || *char_ptr == '\0')
85 return char_ptr;
86 char_ptr++;
87 }
88 }
89 n -= sizeof(longword);
90 }
91
92 char_ptr = (const guchar *) longword_ptr;
93
94 while (n-- > 0)
95 {
96 if (*char_ptr == c || *char_ptr == '\0')
97 return char_ptr;
98 ++char_ptr;
99 }
100
101 return NULL;
102 }
103
104 AckTrackerFactory *
log_proto_server_get_ack_tracker_factory(LogProtoServer * s)105 log_proto_server_get_ack_tracker_factory(LogProtoServer *s)
106 {
107 return s->options->ack_tracker_factory;
108 }
109
110 gboolean
log_proto_server_is_position_tracked(LogProtoServer * s)111 log_proto_server_is_position_tracked(LogProtoServer *s)
112 {
113 AckTrackerType type = ack_tracker_factory_get_type(log_proto_server_get_ack_tracker_factory(s));
114
115 return ack_tracker_type_is_position_tracked(type);
116 }
117
118 gboolean
log_proto_server_validate_options_method(LogProtoServer * s)119 log_proto_server_validate_options_method(LogProtoServer *s)
120 {
121 return TRUE;
122 }
123
124 void
log_proto_server_free_method(LogProtoServer * s)125 log_proto_server_free_method(LogProtoServer *s)
126 {
127 log_transport_free(s->transport);
128 }
129
130 void
log_proto_server_free(LogProtoServer * s)131 log_proto_server_free(LogProtoServer *s)
132 {
133 if (s->free_fn)
134 s->free_fn(s);
135 g_free(s);
136 }
137
138 void
log_proto_server_init(LogProtoServer * self,LogTransport * transport,const LogProtoServerOptions * options)139 log_proto_server_init(LogProtoServer *self, LogTransport *transport, const LogProtoServerOptions *options)
140 {
141 self->validate_options = log_proto_server_validate_options_method;
142 self->free_fn = log_proto_server_free_method;
143 self->options = options;
144 self->transport = transport;
145 }
146
147 gboolean
log_proto_server_options_set_encoding(LogProtoServerOptions * self,const gchar * encoding)148 log_proto_server_options_set_encoding(LogProtoServerOptions *self, const gchar *encoding)
149 {
150 GIConv convert;
151
152 g_free(self->encoding);
153 self->encoding = g_strdup(encoding);
154
155 /* validate encoding */
156 convert = g_iconv_open("utf-8", encoding);
157 if (convert == (GIConv) -1)
158 return FALSE;
159 g_iconv_close(convert);
160 return TRUE;
161 }
162
163 void
log_proto_server_options_set_ack_tracker_factory(LogProtoServerOptions * self,AckTrackerFactory * factory)164 log_proto_server_options_set_ack_tracker_factory(LogProtoServerOptions *self, AckTrackerFactory *factory)
165 {
166 ack_tracker_factory_unref(self->ack_tracker_factory);
167 self->ack_tracker_factory = factory;
168 }
169
170 void
log_proto_server_options_defaults(LogProtoServerOptions * options)171 log_proto_server_options_defaults(LogProtoServerOptions *options)
172 {
173 memset(options, 0, sizeof(*options));
174 options->max_msg_size = -1;
175 options->trim_large_messages = -1;
176 options->init_buffer_size = -1;
177 options->max_buffer_size = -1;
178 options->ack_tracker_factory = instant_ack_tracker_bookmarkless_factory_new();
179 }
180
181 void
log_proto_server_options_init(LogProtoServerOptions * options,GlobalConfig * cfg)182 log_proto_server_options_init(LogProtoServerOptions *options, GlobalConfig *cfg)
183 {
184 if (options->initialized)
185 return;
186
187 if (options->max_msg_size == -1)
188 {
189 options->max_msg_size = cfg->log_msg_size;
190 }
191 if (options->trim_large_messages == -1)
192 {
193 options->trim_large_messages = cfg->trim_large_messages;
194 }
195 if (options->max_buffer_size == -1)
196 {
197 if (options->encoding)
198 {
199 /* Based on the implementation of LogProtoTextServer, the buffer is yielded as
200 a complete message when max_msg_size is reached and there is no EOM in the buffer.
201 In worst case, the buffer contains max_msg_size - 1 bytes before the next fetch,
202 which can be 6 times max_msg_size due to the utf8 conversion.
203 And additional space is required because of the possible leftover bytes.
204 */
205 options->max_buffer_size = 8 * options->max_msg_size;
206 }
207 else
208 options->max_buffer_size = options->max_msg_size;
209 }
210 if (options->init_buffer_size == -1)
211 options->init_buffer_size = MIN(options->max_msg_size, options->max_buffer_size);
212 options->initialized = TRUE;
213 }
214
215 void
log_proto_server_options_destroy(LogProtoServerOptions * options)216 log_proto_server_options_destroy(LogProtoServerOptions *options)
217 {
218 g_free(options->encoding);
219 ack_tracker_factory_unref(options->ack_tracker_factory);
220 if (options->destroy)
221 options->destroy(options);
222 options->initialized = FALSE;
223 }
224
225 LogProtoServerFactory *
log_proto_server_get_factory(PluginContext * context,const gchar * name)226 log_proto_server_get_factory(PluginContext *context, const gchar *name)
227 {
228 Plugin *plugin;
229
230 plugin = plugin_find(context, LL_CONTEXT_SERVER_PROTO, name);
231 if (plugin && plugin->construct)
232 return plugin->construct(plugin);
233 return NULL;
234 }
235