1 /*
2  * Copyright (c) 2006-2009 Red Hat, Inc.
3  *
4  * All rights reserved.
5  *
6  * Author: Steven Dake <sdake@redhat.com>
7  *
8  * libqb is free software: you can redistribute it and/or modify
9  * it under the terms of the GNU Lesser General Public License as published by
10  * the Free Software Foundation, either version 2.1 of the License, or
11  * (at your option) any later version.
12  *
13  * libqb is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  * GNU Lesser General Public License for more details.
17  *
18  * You should have received a copy of the GNU Lesser General Public License
19  * along with libqb.  If not, see <http://www.gnu.org/licenses/>.
20  */
21 #include "os_base.h"
22 #include <signal.h>
23 
24 #include <qb/qbarray.h>
25 #include <qb/qbdefs.h>
26 #include <qb/qblog.h>
27 #include <qb/qbutil.h>
28 #include <qb/qbloop.h>
29 #include <qb/qbipcs.h>
30 #ifdef HAVE_GLIB
31 #include <glib.h>
32 #endif
33 
/* Non-zero: answer every request with a response (cleared by -n). */
int32_t blocking = QB_TRUE;
/* Non-zero: send an event for every request (set by -e). */
int32_t events = QB_FALSE;
/* Non-zero: drive the service from a glib main loop (set by -g). */
int32_t use_glib = QB_FALSE;
/* Incremented once per -v; added to LOG_INFO for the stderr log filter. */
int32_t verbose = 0;

/* libqb main loop; created in main() when glib is not used. */
static qb_loop_t *bms_loop;
#ifdef HAVE_GLIB
/* glib main loop; created in main() when -g is given. */
static GMainLoop *glib_loop;
/* Array of struct gio_to_qb_poll entries, indexed by file descriptor. */
static qb_array_t *gio_map;
#endif
/* The IPC service instance ("bm1") created in main(). */
static qb_ipcs_service_t* s1;
45 
/*
 * Connection-accept callback for service s1.
 *
 * Every client is accepted regardless of its uid/gid: the callback always
 * returns 0, which is the "accept" value for a libqb accept handler (a
 * negative errno would reject the client).
 *
 * A former "#if 0" root-only check was removed. It was never compiled and
 * used the opposite return convention (1 = accept, 0 = reject), so
 * re-enabling it as written would have accepted exactly the clients it
 * meant to refuse.
 *
 * c   - the connecting client (unused)
 * uid - user id of the client (unused)
 * gid - group id of the client (unused)
 */
static int32_t s1_connection_accept_fn(qb_ipcs_connection_t *c, uid_t uid, gid_t gid)
{
	(void)c;
	(void)uid;
	(void)gid;

	return 0;
}
62 
63 
/*
 * Connection-created callback: fetch the service-wide statistics of s1 and
 * log the active and closed connection counters at LOG_NOTICE.
 *
 * c - the new connection (unused; only service totals are reported)
 */
static void s1_connection_created_fn(qb_ipcs_connection_t *c)
{
	struct qb_ipcs_stats totals;

	(void)c;

	qb_ipcs_stats_get(s1, &totals, QB_FALSE);

	qb_log(LOG_NOTICE, "Connection created > active:%d > closed:%d",
	       totals.active_connections, totals.closed_connections);
}
73 
/*
 * Connection-destroyed callback: only logs that the connection is about to
 * be freed.
 *
 * c - the connection being destroyed (unused)
 */
static void s1_connection_destroyed_fn(qb_ipcs_connection_t *c)
{
	(void)c;

	qb_log(LOG_INFO, "connection about to be freed\n");
}
78 
/*
 * Connection-closed callback: log the service-wide connection counters of
 * s1 followed by the per-connection statistics of c.
 *
 * c - the connection that was closed
 *
 * Always returns 0.
 */
static int32_t s1_connection_closed_fn(qb_ipcs_connection_t *c)
{
	struct qb_ipcs_connection_stats conn;
	struct qb_ipcs_stats totals;

	/* service totals first, then the counters of this one connection */
	qb_ipcs_stats_get(s1, &totals, QB_FALSE);
	qb_ipcs_connection_stats_get(c, &conn, QB_FALSE);

	qb_log(LOG_INFO, "Connection to pid:%d destroyed > active:%d > closed:%d",
	       conn.client_pid,
	       totals.active_connections, totals.closed_connections);

	/* one log line per per-connection counter */
	qb_log(LOG_INFO, " Requests     %"PRIu64"\n", conn.requests);
	qb_log(LOG_INFO, " Responses    %"PRIu64"\n", conn.responses);
	qb_log(LOG_INFO, " Events       %"PRIu64"\n", conn.events);
	qb_log(LOG_INFO, " Send retries %"PRIu64"\n", conn.send_retries);
	qb_log(LOG_INFO, " Recv retries %"PRIu64"\n", conn.recv_retries);
	qb_log(LOG_INFO, " FC state     %d\n", conn.flow_control_state);
	qb_log(LOG_INFO, " FC count     %"PRIu64"\n\n", conn.flow_control_count);

	return 0;
}
102 
/*
 * Message-process callback: trace the incoming request header and answer
 * with a fixed reply (id 13, error 0).
 *
 * The reply is sent as a response when the global "blocking" is set and,
 * independently, as an event when the global "events" is set; both may
 * happen for the same request.
 *
 * c    - connection the request arrived on
 * data - request buffer; read as a struct qb_ipc_request_header
 * size - size of the request buffer (unused)
 *
 * Returns 0 on success, or the negative result of the failing send.
 */
static int32_t s1_msg_process_fn(qb_ipcs_connection_t *c,
		void *data, size_t size)
{
	struct qb_ipc_request_header *hdr = data;
	struct qb_ipc_response_header reply;
	ssize_t rc;

	(void)size;

	qb_log(LOG_TRACE, "msg:%d, size:%d", hdr->id, hdr->size);

	reply.error = 0;
	reply.id = 13;
	reply.size = sizeof(reply);

	if (blocking) {
		rc = qb_ipcs_response_send(c, &reply, sizeof(reply));
		if (rc < 0) {
			qb_perror(LOG_ERR, "qb_ipcs_response_send");
			return rc;
		}
	}

	if (events) {
		rc = qb_ipcs_event_send(c, &reply, sizeof(reply));
		if (rc < 0) {
			qb_perror(LOG_ERR, "qb_ipcs_event_send");
			return rc;
		}
	}

	return 0;
}
133 
sigusr1_handler(int32_t num)134 static void sigusr1_handler(int32_t num)
135 {
136 	qb_log(LOG_INFO, "%s(%d)\n", __func__, num);
137 	qb_ipcs_destroy(s1);
138 	exit(0);
139 }
140 
show_usage(const char * name)141 static void show_usage(const char *name)
142 {
143 	qb_log(LOG_INFO, "usage: \n");
144 	qb_log(LOG_INFO, "%s <options>\n", name);
145 	qb_log(LOG_INFO, "\n");
146 	qb_log(LOG_INFO, "  options:\n");
147 	qb_log(LOG_INFO, "\n");
148 	qb_log(LOG_INFO, "  -n             non-blocking ipc (default blocking)\n");
149 	qb_log(LOG_INFO, "  -e             send events back instead for responses\n");
150 	qb_log(LOG_INFO, "  -v             verbose\n");
151 	qb_log(LOG_INFO, "  -h             show this help text\n");
152 	qb_log(LOG_INFO, "  -m             use shared memory\n");
153 	qb_log(LOG_INFO, "  -p             use posix message queues\n");
154 	qb_log(LOG_INFO, "  -s             use sysv message queues\n");
155 	qb_log(LOG_INFO, "  -u             use unix sockets\n");
156 	qb_log(LOG_INFO, "  -g             use glib mainloop\n");
157 	qb_log(LOG_INFO, "\n");
158 }
159 
160 #ifdef HAVE_GLIB
161 
/*
 * Per-file-descriptor record stored in gio_map: ties the glib IO channel
 * created for a descriptor to the libqb dispatch callback registered for it.
 */
struct gio_to_qb_poll {
	/* TRUE while a dispatch is registered in this slot */
	gboolean is_used;
	/* channel created for the descriptor by my_g_dispatch_add() */
	GIOChannel *channel;
	/* event mask given at registration */
	int32_t events;
	/* opaque pointer handed back to fn on every dispatch */
	void * data;
	/* callback invoked from gio_read_socket() */
	qb_ipcs_dispatch_fn_t fn;
	/* priority given at registration; stored but not read in this file */
	enum qb_loop_priority p;
};
170 
171 
172 static gboolean
gio_read_socket(GIOChannel * gio,GIOCondition condition,gpointer data)173 gio_read_socket (GIOChannel *gio, GIOCondition condition, gpointer data)
174 {
175 	struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data;
176 	gint fd = g_io_channel_unix_get_fd(gio);
177 
178 	return (adaptor->fn(fd, condition, adaptor->data) == 0);
179 }
180 
/*
 * glib flavour of the "dispatch add" poll handler: register fn to be
 * called when evts occur on fd.
 *
 * gio_map is grown to hold index fd, the slot for fd is filled in and a
 * glib watch is added on a new unix IO channel for the descriptor.
 *
 * NOTE(review): the id returned by g_io_add_watch() is discarded, so the
 * watch cannot be removed explicitly later; confirm this is intended.
 *
 * p    - priority, stored in the slot
 * fd   - descriptor to watch; also the index into gio_map
 * evts - event mask passed to glib
 * data - opaque pointer handed back to fn
 * fn   - dispatch function to invoke
 *
 * Returns 0 on success, the negative result of qb_array_grow() or
 * qb_array_index() if either fails, -EEXIST if the slot is already in
 * use, or -ENOMEM if no channel could be created.
 */
static int32_t my_g_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t evts,
	void *data, qb_ipcs_dispatch_fn_t fn)
{
	struct gio_to_qb_poll *slot;
	GIOChannel *chan;
	int32_t rc;

	/* make sure index fd exists, then look the slot up */
	rc = qb_array_grow(gio_map, fd + 1);
	if (rc >= 0) {
		rc = qb_array_index(gio_map, fd, (void**)&slot);
	}
	if (rc < 0) {
		return rc;
	}

	if (slot->is_used) {
		return -EEXIST;
	}

	chan = g_io_channel_unix_new(fd);
	if (chan == NULL) {
		return -ENOMEM;
	}

	slot->is_used = TRUE;
	slot->p = p;
	slot->data = data;
	slot->events = evts;
	slot->fn = fn;
	slot->channel = chan;

	g_io_add_watch(chan, evts, gio_read_socket, slot);

	return 0;
}
215 
/*
 * glib flavour of the "dispatch modify" poll handler.
 *
 * This is a stub: nothing is changed, all arguments are ignored and 0 is
 * always returned.
 */
static int32_t my_g_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t evts,
	void *data, qb_ipcs_dispatch_fn_t fn)
{
	(void)p;
	(void)fd;
	(void)evts;
	(void)data;
	(void)fn;

	return 0;
}
221 
my_g_dispatch_del(int32_t fd)222 static int32_t my_g_dispatch_del(int32_t fd)
223 {
224 	struct gio_to_qb_poll *adaptor;
225 	if (qb_array_index(gio_map, fd, (void**)&adaptor) == 0) {
226 		g_io_channel_unref(adaptor->channel);
227 		adaptor->is_used = FALSE;
228 	}
229 	return 0;
230 }
231 
232 #endif /* HAVE_GLIB */
233 
/*
 * "job add" poll handler for the libqb main loop: queue fn(data) on
 * bms_loop at priority p and return the result of qb_loop_job_add().
 */
static int32_t my_job_add(enum qb_loop_priority p, void *data, qb_loop_job_dispatch_fn fn)
{
	int32_t rc;

	rc = qb_loop_job_add(bms_loop, p, data, fn);
	return rc;
}
238 
/*
 * "dispatch add" poll handler for the libqb main loop: register fd on
 * bms_loop and return the result of qb_loop_poll_add().
 */
static int32_t my_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t evts,
	void *data, qb_ipcs_dispatch_fn_t fn)
{
	int32_t rc;

	rc = qb_loop_poll_add(bms_loop, p, fd, evts, data, fn);
	return rc;
}
244 
/*
 * "dispatch modify" poll handler for the libqb main loop: update the
 * registration of fd on bms_loop and return the result of
 * qb_loop_poll_mod().
 */
static int32_t my_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t evts,
	void *data, qb_ipcs_dispatch_fn_t fn)
{
	int32_t rc;

	rc = qb_loop_poll_mod(bms_loop, p, fd, evts, data, fn);
	return rc;
}
250 
my_dispatch_del(int32_t fd)251 static int32_t my_dispatch_del(int32_t fd)
252 {
253 	return qb_loop_poll_del(bms_loop, fd);
254 }
255 
256 
main(int32_t argc,char * argv[])257 int32_t main(int32_t argc, char *argv[])
258 {
259 	const char *options = "nevhmpsug";
260 	int32_t opt;
261 	int32_t rc;
262 	enum qb_ipc_type ipc_type = QB_IPC_SHM;
263 	struct qb_ipcs_service_handlers sh = {
264 		.connection_accept = s1_connection_accept_fn,
265 		.connection_created = s1_connection_created_fn,
266 		.msg_process = s1_msg_process_fn,
267 		.connection_destroyed = s1_connection_destroyed_fn,
268 		.connection_closed = s1_connection_closed_fn,
269 	};
270 	struct qb_ipcs_poll_handlers ph = {
271 		.job_add = my_job_add,
272 		.dispatch_add = my_dispatch_add,
273 		.dispatch_mod = my_dispatch_mod,
274 		.dispatch_del = my_dispatch_del,
275 	};
276 #ifdef HAVE_GLIB
277 	struct qb_ipcs_poll_handlers glib_ph = {
278 		.job_add = NULL, /* FIXME */
279 		.dispatch_add = my_g_dispatch_add,
280 		.dispatch_mod = my_g_dispatch_mod,
281 		.dispatch_del = my_g_dispatch_del,
282 	};
283 #endif /* HAVE_GLIB */
284 
285 	while ((opt = getopt(argc, argv, options)) != -1) {
286 		switch (opt) {
287 		case 'm':
288 			ipc_type = QB_IPC_SHM;
289 			break;
290 		case 'u':
291 			ipc_type = QB_IPC_SOCKET;
292 			break;
293 		case 'n':	/* non-blocking */
294 			blocking = QB_FALSE;
295 			break;
296 		case 'e':	/* events */
297 			events = QB_TRUE;
298 			break;
299 		case 'g':
300 			use_glib = QB_TRUE;
301 			break;
302 		case 'v':
303 			verbose++;
304 			break;
305 		case 'h':
306 		default:
307 			show_usage(argv[0]);
308 			exit(0);
309 			break;
310 		}
311 	}
312 	signal(SIGINT, sigusr1_handler);
313 	signal(SIGILL, sigusr1_handler);
314 	signal(SIGTERM, sigusr1_handler);
315 
316 	qb_log_init("bms", LOG_USER, LOG_EMERG);
317 	qb_log_ctl(QB_LOG_SYSLOG, QB_LOG_CONF_ENABLED, QB_FALSE);
318 	qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD,
319 			  QB_LOG_FILTER_FILE, "*", LOG_INFO + verbose);
320 	qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, QB_TRUE);
321 
322 	if (!use_glib) {
323 		bms_loop = qb_loop_create();
324 		s1 = qb_ipcs_create("bm1", 0, ipc_type, &sh);
325 		if (s1 == 0) {
326 			qb_perror(LOG_ERR, "qb_ipcs_create");
327 			exit(1);
328 		}
329 		qb_ipcs_poll_handlers_set(s1, &ph);
330 		rc = qb_ipcs_run(s1);
331 		if (rc != 0) {
332 			errno = -rc;
333 			qb_perror(LOG_ERR, "qb_ipcs_run");
334 			exit(1);
335 		}
336 		qb_loop_run(bms_loop);
337 	} else {
338 #ifdef HAVE_GLIB
339 		glib_loop = g_main_loop_new(NULL, FALSE);
340 
341 		gio_map = qb_array_create(64, sizeof(struct gio_to_qb_poll));
342 
343 		s1 = qb_ipcs_create("bm1", 0, ipc_type, &sh);
344 		if (s1 == 0) {
345 			qb_perror(LOG_ERR, "qb_ipcs_create");
346 			exit(1);
347 		}
348 		qb_ipcs_poll_handlers_set(s1, &glib_ph);
349 		rc = qb_ipcs_run(s1);
350 		if (rc != 0) {
351 			errno = -rc;
352 			qb_perror(LOG_ERR, "qb_ipcs_run");
353 			exit(1);
354 		}
355 
356 		g_main_loop_run(glib_loop);
357 #else
358 		qb_log(LOG_ERR, "You don't seem to have glib-devel installed.\n");
359 #endif
360 	}
361 	return EXIT_SUCCESS;
362 }
363