1
2 /*
3 * compress.c: Compression functions for Linux-HA
4 *
5 * Copyright (C) 2005 Guochun Shi <gshi@ncsa.uiuc.edu>
6 *
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
20 */
21
22 /*
 * Compression is designed to handle big messages. Right now, with 4 nodes,
 * a cib message can grow to 64 KB or more. I expect much larger messages
 * as the number of nodes increases. This makes message compression necessary.
26 *
27 *
28 * Compression is handled in field level. One can add a struct field using
29 * ha_msg_addstruct() -- the field will not get compressed, or using
30 * ha_msg_addstruct_compress(), and the field will get compressed when
31 * the message is converted to wire format, i.e. when msg2wirefmt() is called.
 * The compressed field will stay compressed until it reaches the destination.
 * It is finally decompressed when the user starts to get the field value.
34 * It is designed this way so that the compression/decompression only happens
35 * in end users so that heartbeat itself can save cpu cycle and memory.
36 * (more info about compression can be found in cl_msg_types.c about FT_COMPRESS
37 * FT_UNCOMPRESS types)
38 *
39 * compression has another legacy mode, which is there so it can be compatible
40 * to old ways of compression. In the old way, no field is compressed individually
41 * and the messages is compressed before it is sent out, and it will be decompressed
42 * in the receiver side immediately. So in each IPC channel, the message is compressed
43 * and decompressed once. This way will cost a lot of cpu time and memory and it is
44 * discouraged.
45 *
 * If use_traditional_compression is true, the legacy mode is used; otherwise
 * the new compression is used. For backward compatibility, the default is legacy mode.
48 *
49 * The real compression work is done by compression plugins. There are two plugins right
50 * now: zlib and bz2, they are in lib/plugins/compress
51 *
52 */
53
54 #include <lha_internal.h>
55 #include <stdlib.h>
56 #include <stdio.h>
57 #include <errno.h>
58 #include <string.h>
59 #include <time.h>
60 #include <unistd.h>
61 #include <unistd.h>
62 #include <assert.h>
63 #include <glib.h>
64 #include <compress.h>
65 #include <ha_msg.h>
66 #include <clplumbing/netstring.h>
67 #include <pils/plugin.h>
68 #include <pils/generic.h>
69 #include <stonith/stonith.h>
70 #include <stonith/stonith_plugin.h>
71
72 #define COMPRESSED_FIELD "_compressed_payload"
73 #define COMPRESS_NAME "_compression_algorithm"
74 #define HACOMPRESSNAME "HA_COMPRESSION"
75 #define DFLT_COMPRESS_PLUGIN "bz2"
76
77 static struct hb_compress_fns* msg_compress_fns = NULL;
78 static char* compress_name = NULL;
79 GHashTable* CompressFuncs = NULL;
80
81 static PILGenericIfMgmtRqst Reqs[] =
82 {
83 {"compress", &CompressFuncs, NULL, NULL, NULL},
84 {NULL, NULL, NULL, NULL, NULL}
85 };
86
87 static PILPluginUniv* CompressPIsys = NULL;
88
89 static int
init_pluginsys(void)90 init_pluginsys(void){
91
92 if (CompressPIsys) {
93 return TRUE;
94 }
95
96 CompressPIsys = NewPILPluginUniv(HA_PLUGIN_DIR);
97
98 if (CompressPIsys) {
99 if (PILLoadPlugin(CompressPIsys, PI_IFMANAGER, "generic", Reqs)
100 != PIL_OK){
101 cl_log(LOG_ERR, "generic plugin load failed\n");
102 DelPILPluginUniv(CompressPIsys);
103 CompressPIsys = NULL;
104 }
105 }else{
106 cl_log(LOG_ERR, "pi univ creation failed\n");
107 }
108 return CompressPIsys != NULL;
109
110 }
111
112 int
cl_compress_remove_plugin(const char * pluginname)113 cl_compress_remove_plugin(const char* pluginname)
114 {
115 return HA_OK;
116 }
117
118 int
cl_compress_load_plugin(const char * pluginname)119 cl_compress_load_plugin(const char* pluginname)
120 {
121 struct hb_compress_fns* funcs = NULL;
122
123 if (!init_pluginsys()){
124 return HA_FAIL;
125 }
126
127 if ((funcs = g_hash_table_lookup(CompressFuncs, pluginname))
128 == NULL){
129 if (PILPluginExists(CompressPIsys, HB_COMPRESS_TYPE_S,
130 pluginname) == PIL_OK){
131 PIL_rc rc;
132 if ((rc = PILLoadPlugin(CompressPIsys,
133 HB_COMPRESS_TYPE_S,
134 pluginname,
135 NULL))!= PIL_OK){
136 cl_log(LOG_ERR,
137 "Cannot load compress plugin %s[%s]",
138 pluginname,
139 PIL_strerror(rc));
140 return HA_FAIL;
141 }
142 funcs = g_hash_table_lookup(CompressFuncs,
143 pluginname);
144 }
145
146 }
147 if (funcs == NULL){
148 cl_log(LOG_ERR, "Compression module(%s) not found", pluginname);
149 return HA_FAIL;
150 }
151
152 /* set the environment variable so that later programs can
153 * load the appropriate plugin
154 */
155 setenv(HACOMPRESSNAME,pluginname,1);
156 msg_compress_fns = funcs;
157
158 return HA_OK;
159 }
160
/*
 * Select the compression plugin by name.
 *
 * Kept as a separate entry point; all the work is delegated to
 * cl_compress_load_plugin(), whose result is returned unchanged.
 */
int
cl_set_compress_fns(const char* pluginname)
{
	int	rc;

	rc = cl_compress_load_plugin(pluginname);
	return rc;
}
169
170 struct hb_compress_fns*
cl_get_compress_fns(void)171 cl_get_compress_fns(void)
172 {
173 static int try_dflt = 1;
174
175 if (try_dflt && !msg_compress_fns) {
176 try_dflt = 0;
177 cl_log(LOG_INFO, "%s: user didn't set compression type, "
178 "loading %s plugin",
179 __FUNCTION__, DFLT_COMPRESS_PLUGIN);
180 cl_compress_load_plugin(DFLT_COMPRESS_PLUGIN);
181 }
182 return msg_compress_fns;
183 }
184
185 static struct hb_compress_fns*
get_compress_fns(const char * pluginname)186 get_compress_fns(const char* pluginname)
187 {
188 struct hb_compress_fns* funcs = NULL;
189
190 if (cl_compress_load_plugin(pluginname) != HA_OK){
191 cl_log(LOG_ERR, "%s: loading compression module"
192 "(%s) failed",
193 __FUNCTION__, pluginname);
194 return NULL;
195 }
196
197 funcs = g_hash_table_lookup(CompressFuncs, pluginname);
198 return funcs;
199 }
200
201 void cl_realtime_malloc_check(void);
202
203 char*
cl_compressmsg(struct ha_msg * m,size_t * len)204 cl_compressmsg(struct ha_msg* m, size_t* len)
205 {
206 char* src;
207 char* dest;
208 size_t destlen;
209 int rc;
210 char* ret = NULL;
211 struct ha_msg* tmpmsg;
212 size_t datalen;
213
214 destlen = MAXMSG;
215
216 dest = malloc(destlen);
217 if (!dest) {
218 cl_log(LOG_ERR, "%s: failed to allocate destination buffer",
219 __FUNCTION__);
220 return NULL;
221 }
222
223 if (msg_compress_fns == NULL){
224 cl_log(LOG_ERR, "%s: msg_compress_fns is NULL!",
225 __FUNCTION__);
226 goto out;
227 }
228 if ( get_netstringlen(m) > MAXUNCOMPRESSED
229 || get_stringlen(m) > MAXUNCOMPRESSED){
230 cl_log(LOG_ERR, "%s: msg too big(stringlen=%d,"
231 "netstringlen=%d)",
232 __FUNCTION__,
233 get_stringlen(m),
234 get_netstringlen(m));
235 goto out;
236 }
237
238
239 if ((src = msg2wirefmt_noac(m, &datalen)) == NULL){
240 cl_log(LOG_ERR,"%s: converting msg"
241 " to wirefmt failed", __FUNCTION__);
242 goto out;
243 }
244
245 rc = msg_compress_fns->compress(dest, &destlen,
246 src, datalen);
247 if (rc != HA_OK){
248 cl_log(LOG_ERR, "%s: compression failed",
249 __FUNCTION__);
250 goto out;
251 }
252
253 free(src);
254
255 tmpmsg =ha_msg_new(0);
256 rc = ha_msg_addbin(tmpmsg, COMPRESSED_FIELD, dest, destlen)/*discouraged function*/;
257
258 if (rc != HA_OK){
259 cl_log(LOG_ERR, "%s: adding binary to msg failed",
260 __FUNCTION__);
261 goto out;
262 }
263
264 rc = ha_msg_add(tmpmsg, COMPRESS_NAME,
265 msg_compress_fns->getname());
266
267 if (rc != HA_OK){
268 cl_log(LOG_ERR, "%s: adding compress name to msg failed",
269 __FUNCTION__);
270 goto out;
271 }
272
273
274 ret = msg2netstring(tmpmsg, len);
275 ha_msg_del(tmpmsg);
276
277 #if 0
278 cl_log(LOG_INFO, "------original stringlen=%d, netstringlen=%d,"
279 "compressed_datalen=%d,current len=%d",
280 get_stringlen(m), get_netstringlen(m),(int)destlen, (int)*len);
281
282 #endif
283
284 out:
285 if (dest) {
286 free(dest);
287 }
288
289 return ret;
290 }
291
292
293 gboolean
is_compressed_msg(struct ha_msg * m)294 is_compressed_msg(struct ha_msg* m)
295 {
296 if( cl_get_binary(m, COMPRESSED_FIELD, NULL) /*discouraged function*/
297 != NULL){
298 return TRUE;
299 }
300
301 return FALSE;
302
303 }
304
/* the decompressmsg function is not exactly the reverse
 * operation of compressmsg; it starts when the program
 * detects there is a compressed_field in a msg
 */
309
310 struct ha_msg*
cl_decompressmsg(struct ha_msg * m)311 cl_decompressmsg(struct ha_msg* m)
312 {
313 const char* src;
314 size_t srclen;
315 char *dest = NULL;
316 size_t destlen = MAXUNCOMPRESSED;
317 int rc;
318 struct ha_msg* ret = NULL;
319 const char* decompress_name;
320 struct hb_compress_fns* funcs = NULL;
321
322 dest = malloc(destlen);
323
324 if (!dest) {
325 cl_log(LOG_ERR, "%s: Failed to allocate buffer.", __FUNCTION__);
326 return NULL;
327 }
328
329 if (m == NULL){
330 cl_log(LOG_ERR, "%s: NULL message", __FUNCTION__);
331 goto out;
332 }
333 src = cl_get_binary(m, COMPRESSED_FIELD, &srclen)/*discouraged function*/;
334 if (src == NULL){
335 cl_log(LOG_ERR, "%s: compressed-field is NULL",
336 __FUNCTION__);
337 goto out;
338 }
339
340 if (srclen > MAXMSG){
341 cl_log(LOG_ERR, "%s: field too long(%d)",
342 __FUNCTION__, (int)srclen);
343 goto out;
344 }
345
346 decompress_name = ha_msg_value(m, COMPRESS_NAME);
347 if (decompress_name == NULL){
348 cl_log(LOG_ERR, "compress name not found");
349 goto out;
350 }
351
352
353 funcs = get_compress_fns(decompress_name);
354
355 if (funcs == NULL){
356 cl_log(LOG_ERR, "%s: compress method(%s) is not"
357 " supported in this machine",
358 __FUNCTION__, decompress_name);
359 goto out;
360 }
361
362 rc = funcs->decompress(dest, &destlen, src, srclen);
363
364 if (rc != HA_OK){
365 cl_log(LOG_ERR, "%s: decompression failed",
366 __FUNCTION__);
367 goto out;
368 }
369
370 ret = wirefmt2msg(dest, destlen, 0);
371
372 #if 0
373 cl_log(LOG_INFO, "%s: srclen =%d, destlen=%d",
374 __FUNCTION__,
375 srclen, destlen);
376 #endif
377
378 out:
379 if (dest) {
380 free(dest);
381 }
382
383 return ret;
384 }
385
386
387 int
cl_decompress_field(struct ha_msg * msg,int index,char * buf,size_t * buflen)388 cl_decompress_field(struct ha_msg* msg, int index, char* buf, size_t* buflen)
389 {
390 char* value;
391 int vallen;
392 int rc;
393 const char* decompress_name;
394 struct hb_compress_fns* funcs;
395
396 if ( msg == NULL|| index >= msg->nfields){
397 cl_log(LOG_ERR, "%s: wrong argument",
398 __FUNCTION__);
399 return HA_FAIL;
400 }
401
402 value = msg->values[index];
403 vallen = msg->vlens[index];
404
405 decompress_name = ha_msg_value(msg, COMPRESS_NAME);
406 if (decompress_name == NULL){
407 cl_log(LOG_ERR, "compress name not found");
408 return HA_FAIL;
409 }
410
411
412 funcs = get_compress_fns(decompress_name);
413
414 if (funcs == NULL){
415 cl_log(LOG_ERR, "%s: compress method(%s) is not"
416 " supported in this machine",
417 __FUNCTION__, decompress_name);
418 return HA_FAIL;
419 }
420
421 rc = funcs->decompress(buf, buflen, value, vallen);
422 if (rc != HA_OK){
423 cl_log(LOG_ERR, "%s: decompression failed",
424 __FUNCTION__);
425 return HA_FAIL;
426 }
427
428 return HA_OK;
429 }
430
431
432 int
cl_compress_field(struct ha_msg * msg,int index,char * buf,size_t * buflen)433 cl_compress_field(struct ha_msg* msg, int index, char* buf, size_t* buflen)
434 {
435 char* src;
436 size_t srclen;
437 int rc;
438
439 if ( msg == NULL|| index >= msg->nfields
440 || msg->types[index] != FT_UNCOMPRESS){
441 cl_log(LOG_ERR, "%s: wrong argument",
442 __FUNCTION__);
443 return HA_FAIL;
444 }
445
446 if (msg_compress_fns == NULL){
447 if (compress_name == NULL){
448 compress_name = getenv(HACOMPRESSNAME);
449 }
450
451 if (compress_name == NULL){
452 cl_log(LOG_ERR, "%s: no compression module name found",
453 __FUNCTION__);
454 return HA_FAIL;
455 }
456
457 if(cl_set_compress_fns(compress_name) != HA_OK){
458 cl_log(LOG_ERR, "%s: loading compression module failed",
459 __FUNCTION__);
460 return HA_FAIL;
461 }
462 }
463
464 if (msg_compress_fns == NULL){
465 cl_log(LOG_ERR, "%s: msg_compress_fns is NULL!",
466 __FUNCTION__);
467 return HA_FAIL;
468 }
469
470 src = msg2wirefmt_noac(msg->values[index], &srclen);
471 if (src == NULL){
472 cl_log(LOG_ERR,"%s: converting msg"
473 " to wirefmt failed", __FUNCTION__);
474 return HA_FAIL;
475 }
476
477 rc = msg_compress_fns->compress(buf, buflen,
478 src, srclen);
479 if (rc != HA_OK){
480 cl_log(LOG_ERR, "%s: compression failed",
481 __FUNCTION__);
482 return HA_FAIL;
483 }
484
485
486 rc = ha_msg_mod(msg, COMPRESS_NAME,
487 msg_compress_fns->getname());
488
489 if (rc != HA_OK){
490 cl_log(LOG_ERR, "%s: adding compress name to msg failed",
491 __FUNCTION__);
492 return HA_FAIL;;
493 }
494
495 free(src);
496 src = NULL;
497
498 return HA_OK;
499
500 }
501