1 /*
2 * Copyright (c) 2006-2009 Red Hat, Inc.
3 *
4 * All rights reserved.
5 *
6 * Author: Steven Dake <sdake@redhat.com>
7 *
8 * libqb is free software: you can redistribute it and/or modify
9 * it under the terms of the GNU Lesser General Public License as published by
10 * the Free Software Foundation, either version 2.1 of the License, or
11 * (at your option) any later version.
12 *
13 * libqb is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU Lesser General Public License for more details.
17 *
18 * You should have received a copy of the GNU Lesser General Public License
19 * along with libqb. If not, see <http://www.gnu.org/licenses/>.
20 */
#include "os_base.h"
#include <signal.h>
#include <stdio.h>

#include <qb/qbarray.h>
#include <qb/qbdefs.h>
#include <qb/qblog.h>
#include <qb/qbutil.h>
#include <qb/qbloop.h>
#include <qb/qbipcs.h>
#ifdef HAVE_GLIB
#include <glib.h>
#endif
33
34 int32_t blocking = QB_TRUE;
35 int32_t events = QB_FALSE;
36 int32_t use_glib = QB_FALSE;
37 int32_t verbose = 0;
38
39 static qb_loop_t *bms_loop;
40 #ifdef HAVE_GLIB
41 static GMainLoop *glib_loop;
42 static qb_array_t *gio_map;
43 #endif
44 static qb_ipcs_service_t* s1;
45
/*
 * Connection-accept callback for the service.
 *
 * The peer's credentials are not inspected: the function returns 0 for
 * every connection.
 * NOTE(review): libqb documents 0 as "accept" and a negative errno as
 * "reject" for this callback -- confirm against the qbipcs.h in use.
 *
 * @param c   the connection being set up (unused)
 * @param uid user id of the connecting peer (unused)
 * @param gid group id of the connecting peer (unused)
 * @return always 0
 */
static int32_t s1_connection_accept_fn(qb_ipcs_connection_t *c, uid_t uid, gid_t gid)
{
	(void)c;
	(void)uid;
	(void)gid;

	return 0;
}
62
63
/*
 * Connection-created callback: read the statistics of service s1 and
 * log its active and closed connection counters.
 *
 * @param c the new connection (unused)
 */
static void s1_connection_created_fn(qb_ipcs_connection_t *c)
{
	struct qb_ipcs_stats totals;

	qb_ipcs_stats_get(s1, &totals, QB_FALSE);

	qb_log(LOG_NOTICE,
	       "Connection created > active:%d > closed:%d",
	       totals.active_connections, totals.closed_connections);
}
73
/*
 * Connection-destroyed callback: only emits a log line.
 *
 * @param c the connection being released (unused)
 */
static void s1_connection_destroyed_fn(qb_ipcs_connection_t *c)
{
	(void)c;

	qb_log(LOG_INFO, "connection about to be freed\n");
}
78
/*
 * Connection-closed callback: log the service-wide connection counters
 * followed by the per-connection statistics of the closing connection.
 *
 * @param c the connection that was closed
 * @return always 0
 */
static int32_t s1_connection_closed_fn(qb_ipcs_connection_t *c)
{
	struct qb_ipcs_stats svc;
	struct qb_ipcs_connection_stats conn;

	qb_ipcs_stats_get(s1, &svc, QB_FALSE);
	qb_ipcs_connection_stats_get(c, &conn, QB_FALSE);

	/* Summary line: which client went away and the service totals. */
	qb_log(LOG_INFO, "Connection to pid:%d destroyed > active:%d > closed:%d",
	       conn.client_pid, svc.active_connections, svc.closed_connections);

	/* One line per counter of the closed connection. */
	qb_log(LOG_INFO, " Requests %"PRIu64"\n", conn.requests);
	qb_log(LOG_INFO, " Responses %"PRIu64"\n", conn.responses);
	qb_log(LOG_INFO, " Events %"PRIu64"\n", conn.events);
	qb_log(LOG_INFO, " Send retries %"PRIu64"\n", conn.send_retries);
	qb_log(LOG_INFO, " Recv retries %"PRIu64"\n", conn.recv_retries);
	qb_log(LOG_INFO, " FC state %d\n", conn.flow_control_state);
	qb_log(LOG_INFO, " FC count %"PRIu64"\n\n", conn.flow_control_count);

	return 0;
}
102
/*
 * Message-processing callback: answer every request with a fixed,
 * header-only reply.
 *
 * The reply is sent as a response when the global `blocking` flag is
 * set, and as an event when the global `events` flag is set; both may
 * happen for the same request.
 *
 * @param c    connection the request arrived on
 * @param data request buffer, read as a struct qb_ipc_request_header
 * @param size size of the request buffer (unused)
 * @return 0 on success, or the negative result of the failing send
 */
static int32_t s1_msg_process_fn(qb_ipcs_connection_t *c,
				 void *data, size_t size)
{
	struct qb_ipc_request_header *hdr = data;
	struct qb_ipc_response_header reply;
	ssize_t rc;

	qb_log(LOG_TRACE, "msg:%d, size:%d", hdr->id, hdr->size);

	/* Fixed reply header; 13 is the id this server always answers with. */
	reply.id = 13;
	reply.error = 0;
	reply.size = sizeof(struct qb_ipc_response_header);

	if (blocking) {
		rc = qb_ipcs_response_send(c, &reply, sizeof(reply));
		if (rc < 0) {
			qb_perror(LOG_ERR, "qb_ipcs_response_send");
			return rc;
		}
	}

	if (events) {
		rc = qb_ipcs_event_send(c, &reply, sizeof(reply));
		if (rc < 0) {
			qb_perror(LOG_ERR, "qb_ipcs_event_send");
			return rc;
		}
	}

	return 0;
}
133
sigusr1_handler(int32_t num)134 static void sigusr1_handler(int32_t num)
135 {
136 qb_log(LOG_INFO, "%s(%d)\n", __func__, num);
137 qb_ipcs_destroy(s1);
138 exit(0);
139 }
140
/*
 * Print the command-line help text to standard output.
 *
 * stdio is used instead of qb_log() because main() calls this function
 * while it is still parsing options, i.e. before the logging subsystem
 * has been initialised and before any log target has been enabled.
 *
 * Only options that main() actually handles are listed.
 *
 * @param name program name to show in the synopsis (normally argv[0])
 */
static void show_usage(const char *name)
{
	printf("usage: \n");
	printf("%s <options>\n", name);
	printf("\n");
	printf(" options:\n");
	printf("\n");
	printf(" -n non-blocking ipc (default blocking)\n");
	/* -e does not suppress the response; only -n does that. */
	printf(" -e also send an event back for each request\n");
	printf(" -v verbose\n");
	printf(" -h show this help text\n");
	printf(" -m use shared memory\n");
	printf(" -u use unix sockets\n");
	printf(" -g use glib mainloop\n");
	printf("\n");
}
159
160 #ifdef HAVE_GLIB
161
162 struct gio_to_qb_poll {
163 gboolean is_used;
164 GIOChannel *channel;
165 int32_t events;
166 void * data;
167 qb_ipcs_dispatch_fn_t fn;
168 enum qb_loop_priority p;
169 };
170
171
172 static gboolean
gio_read_socket(GIOChannel * gio,GIOCondition condition,gpointer data)173 gio_read_socket (GIOChannel *gio, GIOCondition condition, gpointer data)
174 {
175 struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data;
176 gint fd = g_io_channel_unix_get_fd(gio);
177
178 return (adaptor->fn(fd, condition, adaptor->data) == 0);
179 }
180
/*
 * dispatch_add handler for the glib main loop: register `fd` so that
 * `fn` is called with `data` when one of `evts` occurs.
 *
 * The adaptor slot for the fd lives in gio_map, which is grown on
 * demand so that index `fd` exists.
 *
 * @param p    priority requested by the caller (recorded in the slot)
 * @param fd   file descriptor to watch
 * @param evts event mask to watch for
 * @param data opaque argument for fn
 * @param fn   callback to run when the fd is ready
 * @return 0 on success, a negative value from the qb_array calls,
 *         -EEXIST if the fd is already registered, -ENOMEM if no glib
 *         channel could be created
 */
static int32_t my_g_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t evts,
				 void *data, qb_ipcs_dispatch_fn_t fn)
{
	struct gio_to_qb_poll *slot;
	GIOChannel *chan;
	int32_t rc;

	/* Make sure the table is large enough, then fetch the fd's slot. */
	rc = qb_array_grow(gio_map, fd + 1);
	if (rc >= 0) {
		rc = qb_array_index(gio_map, fd, (void **)&slot);
	}
	if (rc < 0) {
		return rc;
	}

	if (slot->is_used) {
		return -EEXIST;
	}

	chan = g_io_channel_unix_new(fd);
	if (chan == NULL) {
		return -ENOMEM;
	}

	slot->is_used = TRUE;
	slot->p = p;
	slot->events = evts;
	slot->fn = fn;
	slot->data = data;
	slot->channel = chan;

	g_io_add_watch(chan, evts, gio_read_socket, slot);

	return 0;
}
215
/*
 * dispatch_mod handler for the glib main loop.
 *
 * Not implemented: every argument is ignored and success is reported.
 *
 * @return always 0
 */
static int32_t my_g_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t evts,
				 void *data, qb_ipcs_dispatch_fn_t fn)
{
	(void)p;
	(void)fd;
	(void)evts;
	(void)data;
	(void)fn;

	return 0;
}
221
my_g_dispatch_del(int32_t fd)222 static int32_t my_g_dispatch_del(int32_t fd)
223 {
224 struct gio_to_qb_poll *adaptor;
225 if (qb_array_index(gio_map, fd, (void**)&adaptor) == 0) {
226 g_io_channel_unref(adaptor->channel);
227 adaptor->is_used = FALSE;
228 }
229 return 0;
230 }
231
232 #endif /* HAVE_GLIB */
233
/*
 * job_add handler for the qb main loop: queue `fn(data)` on bms_loop
 * at priority `p`.
 *
 * @return the result of qb_loop_job_add()
 */
static int32_t my_job_add(enum qb_loop_priority p, void *data, qb_loop_job_dispatch_fn fn)
{
	int32_t rc = qb_loop_job_add(bms_loop, p, data, fn);

	return rc;
}
238
/*
 * dispatch_add handler for the qb main loop: register `fd` with
 * bms_loop so that `fn` is called with `data` for the events in `evts`.
 *
 * @return the result of qb_loop_poll_add()
 */
static int32_t my_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t evts,
			       void *data, qb_ipcs_dispatch_fn_t fn)
{
	int32_t rc = qb_loop_poll_add(bms_loop, p, fd, evts, data, fn);

	return rc;
}
244
/*
 * dispatch_mod handler for the qb main loop: update the registration
 * of `fd` on bms_loop with the given priority, events, data and
 * callback.
 *
 * @return the result of qb_loop_poll_mod()
 */
static int32_t my_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t evts,
			       void *data, qb_ipcs_dispatch_fn_t fn)
{
	int32_t rc = qb_loop_poll_mod(bms_loop, p, fd, evts, data, fn);

	return rc;
}
250
my_dispatch_del(int32_t fd)251 static int32_t my_dispatch_del(int32_t fd)
252 {
253 return qb_loop_poll_del(bms_loop, fd);
254 }
255
256
main(int32_t argc,char * argv[])257 int32_t main(int32_t argc, char *argv[])
258 {
259 const char *options = "nevhmpsug";
260 int32_t opt;
261 int32_t rc;
262 enum qb_ipc_type ipc_type = QB_IPC_SHM;
263 struct qb_ipcs_service_handlers sh = {
264 .connection_accept = s1_connection_accept_fn,
265 .connection_created = s1_connection_created_fn,
266 .msg_process = s1_msg_process_fn,
267 .connection_destroyed = s1_connection_destroyed_fn,
268 .connection_closed = s1_connection_closed_fn,
269 };
270 struct qb_ipcs_poll_handlers ph = {
271 .job_add = my_job_add,
272 .dispatch_add = my_dispatch_add,
273 .dispatch_mod = my_dispatch_mod,
274 .dispatch_del = my_dispatch_del,
275 };
276 #ifdef HAVE_GLIB
277 struct qb_ipcs_poll_handlers glib_ph = {
278 .job_add = NULL, /* FIXME */
279 .dispatch_add = my_g_dispatch_add,
280 .dispatch_mod = my_g_dispatch_mod,
281 .dispatch_del = my_g_dispatch_del,
282 };
283 #endif /* HAVE_GLIB */
284
285 while ((opt = getopt(argc, argv, options)) != -1) {
286 switch (opt) {
287 case 'm':
288 ipc_type = QB_IPC_SHM;
289 break;
290 case 'u':
291 ipc_type = QB_IPC_SOCKET;
292 break;
293 case 'n': /* non-blocking */
294 blocking = QB_FALSE;
295 break;
296 case 'e': /* events */
297 events = QB_TRUE;
298 break;
299 case 'g':
300 use_glib = QB_TRUE;
301 break;
302 case 'v':
303 verbose++;
304 break;
305 case 'h':
306 default:
307 show_usage(argv[0]);
308 exit(0);
309 break;
310 }
311 }
312 signal(SIGINT, sigusr1_handler);
313 signal(SIGILL, sigusr1_handler);
314 signal(SIGTERM, sigusr1_handler);
315
316 qb_log_init("bms", LOG_USER, LOG_EMERG);
317 qb_log_ctl(QB_LOG_SYSLOG, QB_LOG_CONF_ENABLED, QB_FALSE);
318 qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD,
319 QB_LOG_FILTER_FILE, "*", LOG_INFO + verbose);
320 qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, QB_TRUE);
321
322 if (!use_glib) {
323 bms_loop = qb_loop_create();
324 s1 = qb_ipcs_create("bm1", 0, ipc_type, &sh);
325 if (s1 == 0) {
326 qb_perror(LOG_ERR, "qb_ipcs_create");
327 exit(1);
328 }
329 qb_ipcs_poll_handlers_set(s1, &ph);
330 rc = qb_ipcs_run(s1);
331 if (rc != 0) {
332 errno = -rc;
333 qb_perror(LOG_ERR, "qb_ipcs_run");
334 exit(1);
335 }
336 qb_loop_run(bms_loop);
337 } else {
338 #ifdef HAVE_GLIB
339 glib_loop = g_main_loop_new(NULL, FALSE);
340
341 gio_map = qb_array_create(64, sizeof(struct gio_to_qb_poll));
342
343 s1 = qb_ipcs_create("bm1", 0, ipc_type, &sh);
344 if (s1 == 0) {
345 qb_perror(LOG_ERR, "qb_ipcs_create");
346 exit(1);
347 }
348 qb_ipcs_poll_handlers_set(s1, &glib_ph);
349 rc = qb_ipcs_run(s1);
350 if (rc != 0) {
351 errno = -rc;
352 qb_perror(LOG_ERR, "qb_ipcs_run");
353 exit(1);
354 }
355
356 g_main_loop_run(glib_loop);
357 #else
358 qb_log(LOG_ERR, "You don't seem to have glib-devel installed.\n");
359 #endif
360 }
361 return EXIT_SUCCESS;
362 }
363