1 
2 /*
3  * compress.c: Compression functions for Linux-HA
4  *
5  * Copyright (C) 2005 Guochun Shi <gshi@ncsa.uiuc.edu>
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with this library; if not, write to the Free Software
19  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
20  */
21 
22 /*
23  * Compression is designed to handle big messages, right now with 4 nodes
24  * cib message can go up to 64 KB or more. I expect much larger messages
25  * when the number of node increase. This makes message compression necessary.
26  *
27  *
28  * Compression is handled in field level. One can add a struct field using
29  * ha_msg_addstruct() -- the field will not get compressed, or using
30  * ha_msg_addstruct_compress(), and the field will get compressed when
31  * the message is converted to wire format, i.e. when msg2wirefmt() is called.
32  * The compressed field will stay compressed until it reached the desination.
33  * It will finally decompressed when the user start to get the field value.
34  * It is designed this way so that the compression/decompression only happens
35  * in end users so that heartbeat itself can save cpu cycle and memory.
36  * (more info about compression can be found in cl_msg_types.c about FT_COMPRESS
37  * FT_UNCOMPRESS types)
38  *
39  * compression has another legacy mode, which is there so it can be compatible
40  * to old ways of compression. In the old way, no field is compressed individually
41  * and the messages is compressed before it is sent out, and it will be decompressed
42  * in the receiver side immediately. So in each IPC channel, the message is compressed
43  * and decompressed once. This way will cost a lot of cpu time and memory and it is
44  * discouraged.
45  *
46  * If use_traditional_compression is true, then it is using the legacy mode, otherwise
47  * it is using the new compression. For back compatibility, the default is legacy mode.
48  *
49  * The real compression work is done by compression plugins. There are two plugins right
50  * now: zlib and bz2, they are in lib/plugins/compress
51  *
52  */
53 
54 #include <lha_internal.h>
55 #include <stdlib.h>
56 #include <stdio.h>
57 #include <errno.h>
58 #include <string.h>
59 #include <time.h>
60 #include <unistd.h>
61 #include <unistd.h>
62 #include <assert.h>
63 #include <glib.h>
64 #include <compress.h>
65 #include <ha_msg.h>
66 #include <clplumbing/netstring.h>
67 #include <pils/plugin.h>
68 #include <pils/generic.h>
69 #include <stonith/stonith.h>
70 #include <stonith/stonith_plugin.h>
71 
72 #define COMPRESSED_FIELD "_compressed_payload"
73 #define COMPRESS_NAME "_compression_algorithm"
74 #define HACOMPRESSNAME "HA_COMPRESSION"
75 #define DFLT_COMPRESS_PLUGIN "bz2"
76 
77 static struct hb_compress_fns* msg_compress_fns = NULL;
78 static char*  compress_name = NULL;
79 GHashTable*		CompressFuncs = NULL;
80 
81 static PILGenericIfMgmtRqst	Reqs[] =
82 	{
83 		{"compress", &CompressFuncs, NULL, NULL, NULL},
84 		{NULL, NULL, NULL, NULL, NULL}
85 	};
86 
87 static PILPluginUniv*		CompressPIsys = NULL;
88 
89 static int
init_pluginsys(void)90 init_pluginsys(void){
91 
92 	if (CompressPIsys) {
93 		return TRUE;
94 	}
95 
96 	CompressPIsys = NewPILPluginUniv(HA_PLUGIN_DIR);
97 
98 	if (CompressPIsys) {
99 		if (PILLoadPlugin(CompressPIsys, PI_IFMANAGER, "generic", Reqs)
100 		!=	PIL_OK){
101 			cl_log(LOG_ERR, "generic plugin load failed\n");
102 			DelPILPluginUniv(CompressPIsys);
103 			CompressPIsys = NULL;
104 		}
105 	}else{
106 		cl_log(LOG_ERR, "pi univ creation failed\n");
107 	}
108 	return CompressPIsys != NULL;
109 
110 }
111 
112 int
cl_compress_remove_plugin(const char * pluginname)113 cl_compress_remove_plugin(const char* pluginname)
114 {
115 	return HA_OK;
116 }
117 
118 int
cl_compress_load_plugin(const char * pluginname)119 cl_compress_load_plugin(const char* pluginname)
120 {
121 	struct hb_compress_fns*	funcs = NULL;
122 
123 	if (!init_pluginsys()){
124 		return HA_FAIL;
125 	}
126 
127 	if ((funcs = g_hash_table_lookup(CompressFuncs, pluginname))
128 	    == NULL){
129 		if (PILPluginExists(CompressPIsys, HB_COMPRESS_TYPE_S,
130 				    pluginname) == PIL_OK){
131 			PIL_rc rc;
132 			if ((rc = PILLoadPlugin(CompressPIsys,
133 						HB_COMPRESS_TYPE_S,
134 						pluginname,
135 						NULL))!= PIL_OK){
136 				cl_log(LOG_ERR,
137 				       "Cannot load compress plugin %s[%s]",
138 				       pluginname,
139 				       PIL_strerror(rc));
140 				return HA_FAIL;
141 			}
142 			funcs = g_hash_table_lookup(CompressFuncs,
143 						    pluginname);
144 		}
145 
146 	}
147 	if (funcs == NULL){
148 		cl_log(LOG_ERR, "Compression module(%s) not found", pluginname);
149 		return HA_FAIL;
150 	}
151 
152 	/* set the environment variable so that later programs can
153 	 * load the appropriate plugin
154 	 */
155 	setenv(HACOMPRESSNAME,pluginname,1);
156 	msg_compress_fns = funcs;
157 
158 	return HA_OK;
159 }
160 
161 int
cl_set_compress_fns(const char * pluginname)162 cl_set_compress_fns(const char* pluginname)
163 {
164 	/* this function was unnecessary duplication of the
165 	 * code in cl_compress_load_plugin
166 	 */
167 	return cl_compress_load_plugin(pluginname);
168 }
169 
170 struct hb_compress_fns*
cl_get_compress_fns(void)171 cl_get_compress_fns(void)
172 {
173 	static int try_dflt = 1;
174 
175 	if (try_dflt && !msg_compress_fns) {
176 		try_dflt = 0;
177 		cl_log(LOG_INFO, "%s: user didn't set compression type, "
178 		       "loading %s plugin",
179 		       __FUNCTION__, DFLT_COMPRESS_PLUGIN);
180 		cl_compress_load_plugin(DFLT_COMPRESS_PLUGIN);
181 	}
182 	return msg_compress_fns;
183 }
184 
185 static struct hb_compress_fns*
get_compress_fns(const char * pluginname)186 get_compress_fns(const char* pluginname)
187 {
188 	struct hb_compress_fns*	funcs = NULL;
189 
190 	if (cl_compress_load_plugin(pluginname) != HA_OK){
191 		cl_log(LOG_ERR, "%s: loading compression module"
192 		       "(%s) failed",
193 		       __FUNCTION__, pluginname);
194 		return NULL;
195 	}
196 
197 	funcs = g_hash_table_lookup(CompressFuncs, pluginname);
198 	return funcs;
199 }
200 
201 void cl_realtime_malloc_check(void);
202 
203 char*
cl_compressmsg(struct ha_msg * m,size_t * len)204 cl_compressmsg(struct ha_msg* m, size_t* len)
205 {
206 	char*	src;
207 	char*	dest;
208 	size_t	destlen;
209 	int rc;
210 	char* ret = NULL;
211 	struct ha_msg* tmpmsg;
212 	size_t datalen;
213 
214 	destlen = MAXMSG;
215 
216 	dest = malloc(destlen);
217 	if (!dest) {
218 		cl_log(LOG_ERR, "%s: failed to allocate destination buffer",
219 		       __FUNCTION__);
220 		return NULL;
221 	}
222 
223 	if (msg_compress_fns == NULL){
224 		cl_log(LOG_ERR, "%s: msg_compress_fns is NULL!",
225 		       __FUNCTION__);
226 		goto out;
227 	}
228 	if ( get_netstringlen(m) > MAXUNCOMPRESSED
229 	     || get_stringlen(m) > MAXUNCOMPRESSED){
230 		cl_log(LOG_ERR, "%s: msg too big(stringlen=%d,"
231 		       "netstringlen=%d)",
232 		       __FUNCTION__,
233 		       get_stringlen(m),
234 		       get_netstringlen(m));
235 		goto out;
236 	}
237 
238 
239 	if ((src = msg2wirefmt_noac(m, &datalen)) == NULL){
240 		cl_log(LOG_ERR,"%s: converting msg"
241 		       " to wirefmt failed", __FUNCTION__);
242 		goto out;
243 	}
244 
245 	rc = msg_compress_fns->compress(dest, &destlen,
246 					src, datalen);
247 	if (rc != HA_OK){
248 		cl_log(LOG_ERR, "%s: compression failed",
249 		       __FUNCTION__);
250 		goto out;
251 	}
252 
253 	free(src);
254 
255 	tmpmsg =ha_msg_new(0);
256 	rc = ha_msg_addbin(tmpmsg, COMPRESSED_FIELD, dest, destlen)/*discouraged function*/;
257 
258 	if (rc != HA_OK){
259 		cl_log(LOG_ERR, "%s: adding binary to msg failed",
260 		       __FUNCTION__);
261 		goto out;
262 	}
263 
264 	rc = ha_msg_add(tmpmsg, COMPRESS_NAME,
265 			msg_compress_fns->getname());
266 
267 	if (rc != HA_OK){
268 		cl_log(LOG_ERR, "%s: adding compress name to msg failed",
269 		       __FUNCTION__);
270 		goto out;
271 	}
272 
273 
274 	ret = msg2netstring(tmpmsg, len);
275 	ha_msg_del(tmpmsg);
276 
277 #if 0
278 	cl_log(LOG_INFO, "------original stringlen=%d, netstringlen=%d,"
279 	       "compressed_datalen=%d,current len=%d",
280 	       get_stringlen(m), get_netstringlen(m),(int)destlen,  (int)*len);
281 
282 #endif
283 
284 out:
285 	if (dest) {
286 		free(dest);
287 	}
288 
289 	return ret;
290 }
291 
292 
293 gboolean
is_compressed_msg(struct ha_msg * m)294 is_compressed_msg(struct ha_msg* m)
295 {
296 	if( cl_get_binary(m, COMPRESSED_FIELD, NULL) /*discouraged function*/
297 	    != NULL){
298 		return TRUE;
299 	}
300 
301 	return FALSE;
302 
303 }
304 
305 /* the decompressmsg function is not exactly the reverse
306  * operation of compressmsg, it starts when the prorgram
307  * detects there is compressed_field in a msg
308  */
309 
310 struct ha_msg*
cl_decompressmsg(struct ha_msg * m)311 cl_decompressmsg(struct ha_msg* m)
312 {
313 	const char* src;
314 	size_t srclen;
315 	char *dest = NULL;
316 	size_t destlen = MAXUNCOMPRESSED;
317 	int rc;
318 	struct ha_msg* ret = NULL;
319 	const char* decompress_name;
320 	struct hb_compress_fns* funcs = NULL;
321 
322 	dest = malloc(destlen);
323 
324 	if (!dest) {
325 		cl_log(LOG_ERR, "%s: Failed to allocate buffer.", __FUNCTION__);
326 		return NULL;
327 	}
328 
329 	if (m == NULL){
330 		cl_log(LOG_ERR, "%s: NULL message", __FUNCTION__);
331 		goto out;
332 	}
333 	src = cl_get_binary(m, COMPRESSED_FIELD, &srclen)/*discouraged function*/;
334 	if (src == NULL){
335 		cl_log(LOG_ERR, "%s: compressed-field is NULL",
336 		       __FUNCTION__);
337 		goto out;
338 	}
339 
340 	if (srclen > MAXMSG){
341 		cl_log(LOG_ERR, "%s: field too long(%d)",
342 		       __FUNCTION__, (int)srclen);
343 		goto out;
344 	}
345 
346 	decompress_name = ha_msg_value(m, COMPRESS_NAME);
347 	if (decompress_name == NULL){
348 		cl_log(LOG_ERR, "compress name not found");
349 		goto out;
350 	}
351 
352 
353 	funcs = get_compress_fns(decompress_name);
354 
355 	if (funcs == NULL){
356 		cl_log(LOG_ERR, "%s: compress method(%s) is not"
357 		       " supported in this machine",
358 		       __FUNCTION__, decompress_name);
359 		goto out;
360 	}
361 
362 	rc = funcs->decompress(dest, &destlen, src, srclen);
363 
364 	if (rc != HA_OK){
365 		cl_log(LOG_ERR, "%s: decompression failed",
366 		       __FUNCTION__);
367 		goto out;
368 	}
369 
370 	ret = wirefmt2msg(dest, destlen, 0);
371 
372 #if 0
373 	cl_log(LOG_INFO, "%s: srclen =%d, destlen=%d",
374 	       __FUNCTION__,
375 	       srclen, destlen);
376 #endif
377 
378 out:
379 	if (dest) {
380 		free(dest);
381 	}
382 
383 	return ret;
384 }
385 
386 
387 int
cl_decompress_field(struct ha_msg * msg,int index,char * buf,size_t * buflen)388 cl_decompress_field(struct ha_msg* msg, int index, char* buf, size_t* buflen)
389 {
390 	char*		value;
391 	int		vallen;
392 	int		rc;
393 	const char*	decompress_name;
394 	struct hb_compress_fns* funcs;
395 
396 	if ( msg == NULL|| index >= msg->nfields){
397 		cl_log(LOG_ERR, "%s: wrong argument",
398 		       __FUNCTION__);
399 		return HA_FAIL;
400 	}
401 
402 	value = msg->values[index];
403 	vallen = msg->vlens[index];
404 
405 	decompress_name = ha_msg_value(msg, COMPRESS_NAME);
406 	if (decompress_name == NULL){
407 		cl_log(LOG_ERR, "compress name not found");
408 		return HA_FAIL;
409 	}
410 
411 
412 	funcs = get_compress_fns(decompress_name);
413 
414 	if (funcs == NULL){
415 		cl_log(LOG_ERR, "%s: compress method(%s) is not"
416 		       " supported in this machine",
417 		       __FUNCTION__, decompress_name);
418 		return HA_FAIL;
419 	}
420 
421 	rc = funcs->decompress(buf, buflen, value, vallen);
422 	if (rc != HA_OK){
423 		cl_log(LOG_ERR, "%s: decompression failed",
424 		       __FUNCTION__);
425 		return HA_FAIL;
426 	}
427 
428 	return HA_OK;
429 }
430 
431 
432 int
cl_compress_field(struct ha_msg * msg,int index,char * buf,size_t * buflen)433 cl_compress_field(struct ha_msg* msg, int index, char* buf, size_t* buflen)
434 {
435 	char*   src;
436 	size_t	srclen;
437 	int	rc;
438 
439 	if ( msg == NULL|| index >= msg->nfields
440 	     || msg->types[index] != FT_UNCOMPRESS){
441 		cl_log(LOG_ERR, "%s: wrong argument",
442 		       __FUNCTION__);
443 		return HA_FAIL;
444 	}
445 
446 	if (msg_compress_fns == NULL){
447 		if (compress_name == NULL){
448 			compress_name = getenv(HACOMPRESSNAME);
449 		}
450 
451 		if (compress_name == NULL){
452 			cl_log(LOG_ERR, "%s: no compression module name found",
453 			       __FUNCTION__);
454 			return HA_FAIL;
455 		}
456 
457 		if(cl_set_compress_fns(compress_name) != HA_OK){
458 			cl_log(LOG_ERR, "%s: loading compression module failed",
459 			       __FUNCTION__);
460 			return HA_FAIL;
461 		}
462 	}
463 
464 	if (msg_compress_fns == NULL){
465 		cl_log(LOG_ERR, "%s: msg_compress_fns is NULL!",
466 		       __FUNCTION__);
467 		return HA_FAIL;
468 	}
469 
470 	src = msg2wirefmt_noac(msg->values[index], &srclen);
471 	if (src == NULL){
472 		 cl_log(LOG_ERR,"%s: converting msg"
473 			" to wirefmt failed", __FUNCTION__);
474 		 return HA_FAIL;
475 	}
476 
477 	rc = msg_compress_fns->compress(buf, buflen,
478 					src, srclen);
479 	if (rc != HA_OK){
480 		cl_log(LOG_ERR, "%s: compression failed",
481 		       __FUNCTION__);
482 		return HA_FAIL;
483 	}
484 
485 
486 	rc = ha_msg_mod(msg, COMPRESS_NAME,
487 			msg_compress_fns->getname());
488 
489 	if (rc != HA_OK){
490 		cl_log(LOG_ERR, "%s: adding compress name to msg failed",
491 		       __FUNCTION__);
492 		return HA_FAIL;;
493 	}
494 
495 	free(src);
496 	src = NULL;
497 
498 	return HA_OK;
499 
500 }
501