1 /*
2 Bacula(R) - The Network Backup Solution
3
4 Copyright (C) 2000-2020 Kern Sibbald
5
6 The original author of Bacula is Kern Sibbald, with contributions
7 from many others, a complete list can be found in the file AUTHORS.
8
9 You may use this file and others of this release according to the
10 license defined in the LICENSE file, which includes the Affero General
11 Public License, v3.0 ("AGPLv3") and some additional permissions and
12 terms pursuant to its AGPLv3 Section 7.
13
14 This notice must be preserved when any source code is
15 conveyed and/or propagated.
16
17 Bacula(R) is a registered trademark of Kern Sibbald.
18 */
19
20 #include "bacula.h"
21 #include "lib/unittests.h"
22
23 /* Function that reproduce what the director is supposed to do
24 * - Accept the connection from "filedaemon"
25 * - Accept the command "setip"
26 * - Store the socket in the BsockMeeting structure in a global list
27 *
28 * - Accept the connection from "bconsole"
29 * - Do a command that connects to the client
30 * - Get the socket from BsockMeeting list
31 * - do some discussion
32 */
33
34 void *start_heap;
35 int port=2000;
36 int nb_job=10;
37 int done=nb_job;
38 int started=0;
39 int connected=0;
40 pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
41 int nb_send=10;
42 char *remote = (char *)"localhost";
43 bool quit=false;
44 ilist clients(1000, not_owned_by_alist);
45
get_client(const char * name)46 BsockMeeting *get_client(const char *name)
47 {
48 lock_guard m(mutex);
49 BsockMeeting *b;
50 int id=0;
51 if (sscanf(name, "client-%d", &id) != 1) {
52 return NULL;
53 }
54 b = (BsockMeeting *)clients.get(id);
55 if (!b) {
56 b = New(BsockMeeting());
57 clients.put(id, b);
58 }
59 return b;
60 }
61
set_client(const char * name,BsockMeeting * b)62 void set_client(const char *name, BsockMeeting *b)
63 {
64 lock_guard m(mutex);
65 int id=0;
66 if (sscanf(name, "client-%d", &id) != 1) {
67 return;
68 }
69 clients.put(id, b);
70 }
71
client_close()72 void client_close()
73 {
74 BsockMeeting *b;
75 int last = clients.last_index();
76 for (int i = 0; i <= last ; i++) {
77 b = (BsockMeeting *)clients.get(i);
78 if (b) {
79 delete b;
80 clients.put(i, NULL);
81 }
82 }
83 clients.destroy();
84 }
85
86 /* When a client connects */
handle_client_request(void * fdp)87 static void *handle_client_request(void *fdp)
88 {
89 BsockMeeting *proxy;
90 BSOCK *fd = (BSOCK *)fdp;
91 if (fd->recv() > 0) {
92 char *name = fd->msg;
93 Pmsg1(0, "Got connection from %s\n", name);
94 proxy = get_client(name);
95 fd->fsend("OK\n");
96 proxy->set(fd);
97 } else {
98 free_bsock(fd);
99 return NULL;
100 }
101
102 P(mutex);
103 connected++;
104 V(mutex);
105 return NULL;
106 }
107
th_server(void *)108 static void *th_server(void *)
109 {
110 static workq_t dir_workq; /* queue of work from Director */
111 IPADDR *tmp = 0;
112 dlist *lst;
113 lst = New(dlist(tmp, &tmp->link));
114
115 init_default_addresses(&lst, port);
116 bnet_thread_server(lst, 100, &dir_workq, handle_client_request);
117 delete lst;
118 return NULL;
119 }
120
121
122 #ifdef HAVE_WIN32
123 #define WD "c:\\program files\\bacula\\working\\"
124 #else
125 #define WD "/tmp/"
126 #endif
127
128 /* Simulate a console and do some action */
th_console(void * arg)129 void *th_console(void *arg)
130 {
131 bool quit;
132 int64_t total=0;
133 char ed1[50], ed2[50];
134 int32_t sig;
135 btime_t timer, end_timer, elapsed;
136
137 BSOCK *dir=NULL;
138 char *name = (char *) arg;
139 BsockMeeting *proxy = get_client(name);
140 if (proxy == NULL) {
141 Pmsg1(0, "Unable to find %s\n", name);
142 goto bail_out;
143 }
144 Pmsg0(0, "Can go in sleep mode. Remove " WD "pause to continue\n");
145 {
146 struct stat sp;
147 fclose(fopen(WD "pause", "w"));
148 while (stat(WD "pause", &sp) == 0) {
149 bmicrosleep(1, 0);
150 }
151 }
152 dir = proxy->get(30);
153 if (!dir) {
154 Pmsg1(0, "Unable to get socket %s\n", name);
155 goto bail_out;
156 }
157
158 Pmsg0(0, "send command\n");
159 dir->fsend("command\n");
160 timer = get_current_btime();
161
162 for (quit=false; dir && !quit;) {
163 /* Read command */
164 sig = dir->recv();
165 if (sig < 0) {
166 Pmsg0(0, "Connection terminated\n");
167 break; /* connection terminated */
168 } else if (!strncmp(dir->msg, "quit", 4)) {
169 Pmsg0(0, "got quit...\n");
170 dir->fsend("quit\n");
171 break;
172 } else {
173 total += dir->msglen;
174 }
175 }
176 end_timer = get_current_btime();
177 elapsed = (end_timer - timer)/1000; /* 0.001s */
178
179 if (elapsed > 0) {
180 printf("got bytes=%sB in %.2fs speed=%sB/s\n", edit_uint64_with_suffix(total, ed1),
181 (float) elapsed/1000, edit_uint64_with_suffix(total/elapsed*1000, ed2));
182 }
183
184 bail_out:
185 free_bsock(dir);
186 free(name);
187 return NULL;
188 }
189
190 /* Simulate a filedaemon */
th_filedaemon(void * arg)191 void *th_filedaemon(void *arg)
192 {
193 BSOCK *sock = NULL;
194 char name[512];
195 BsockMeeting proxy;
196 bstrncpy(name, (char *)arg, sizeof(name));
197 free(arg);
198 P(mutex);
199 started++;
200 V(mutex);
201
202 /* The FD will use a loop to connect */
203 connect_again:
204 free_bsock(sock);
205 sock = new_bsock();
206 if (!sock->connect(NULL, 5, 10, 2000, (char *)"*name*", remote, (char *)"*service*", port, 0)) {
207 bmicrosleep(1, 0);
208 goto connect_again;
209 }
210 Pmsg0(0, ">Connected!\n");
211
212 /* Do "authentication" */
213 sock->fsend("%s", name);
214 if (sock->recv() <= 0 || strcmp(sock->msg, "OK\n") != 0) {
215 free_bsock(sock);
216 return NULL;
217 }
218
219 /* Read command and wait to be used */
220 proxy.wait_request(sock);
221
222 if (sock->is_closed()) {
223 goto connect_again;
224 }
225
226 /* get a command */
227 if (sock->recv() <= 0) {
228 Pmsg0(0, "got incorrect message. Expecting command\n");
229 goto connect_again;
230 }
231
232 /* Do something useful or not */
233 sock->msg = check_pool_memory_size(sock->msg, 4100);
234
235 Pmsg1(0, ">Ready to send %u buffers of 4KB\n", nb_send);
236 for (int i = 0; i < nb_send ; i++) {
237 memset(sock->msg, i, 4096);
238 sock->msglen = 4096;
239 sock->msg[sock->msglen] = 0;
240 sock->send();
241 }
242 Pmsg1(0, ">Send quit command\n", nb_send);
243 sock->fsend("quit\n");
244 sock->recv();
245 sock->close();
246 P(mutex);
247 done--;
248 V(mutex);
249 // goto connect_again;
250
251 free_bsock(sock);
252 Pmsg4(0, ">done=%u started=%u connected=%u name=%s\n", done, started, connected, name);
253 return NULL;
254 }
255
main(int argc,char * argv[])256 int main (int argc, char *argv[])
257 {
258 char ch;
259 int olddone=0;
260 bool server=false;
261 pthread_t server_id, client_id[1000], console_id[1000];
262 Unittests t("BsockMeeting", true, true);
263 InitWinAPIWrapper();
264 WSA_Init();
265 start_heap = sbrk(0);
266 bindtextdomain("bacula", LOCALEDIR);
267 textdomain("bacula");
268 init_stack_dump();
269 init_msg(NULL, NULL);
270 daemon_start_time = time(NULL);
271 set_working_directory((char *)WD);
272 set_thread_concurrency(150);
273 set_trace(0);
274
275 while ((ch = getopt(argc, argv, "?n:j:r:p:sd:")) != -1) {
276 switch (ch) {
277 case 'j':
278 done = nb_job = MIN(atoi(optarg), 1000);
279 break;
280
281 case 'n':
282 nb_send = atoi(optarg);
283 break;
284
285 case 'r':
286 remote = optarg;
287 break;
288
289 case 'p':
290 port = atoi(optarg);
291 break;
292
293 case 's':
294 server = true;
295 break;
296
297 case 'd':
298 debug_level = atoi(optarg);
299 break;
300
301 case '?':
302 default:
303 Pmsg0(0, "Usage: bsock_meeting_test [-r remote] [-s] [-p port] [-n nb_send] [-j nb_job]\n");
304 return 0;
305 }
306 }
307 argc -= optind;
308 argv += optind;
309
310 if (strcmp(remote, "localhost") == 0) {
311 Pmsg1(0, "Start server on port %d\n", port);
312 pthread_create(&server_id, NULL, th_server, NULL);
313
314 Pmsg0(0, ">starting fake console\n");
315 for (int i = nb_job - 1; i >= 0 ; i--) {
316 char *tmp = (char *) malloc (32);
317 snprintf(tmp, 32, "client-%d", i);
318 pthread_create(&console_id[i], NULL, th_console, tmp);
319 }
320 }
321
322 if (!server) {
323 Pmsg0(0, ">starting fake clients\n");
324 for (int i = 0; i < nb_job ; i++) {
325 char *tmp = (char *) malloc (32);
326 snprintf(tmp, 32, "client-%d", i);
327 pthread_create(&client_id[i], NULL, th_filedaemon, tmp);
328 }
329
330 while (done>=1) {
331 if (done != olddone) {
332 Pmsg3(0, ">done=%u started=%u connected=%u\n", done, started, connected);
333 olddone = done;
334 }
335 sleep(1);
336 }
337 quit=true;
338 sleep(1);
339 }
340
341 if (strcmp(remote, "localhost") == 0) {
342 Pmsg0(0, "Stop bnet server...\n");
343 bnet_stop_thread_server(server_id);
344 pthread_join(server_id, NULL);
345 Pmsg0(0, "done.\n");
346 Pmsg0(0, "Join console threads...\n");
347 for (int i = 0; i < nb_job ; i++) {
348 pthread_join(console_id[i], NULL);
349 }
350 Pmsg0(0, "done.\n");
351 }
352
353 if (!server) {
354 #ifdef pthread_kill
355 #undef pthread_kill
356 #endif
357 for (int i = 0; i < nb_job ; i++) {
358 pthread_kill(client_id[i], SIGUSR2);
359 }
360 Pmsg0(0, "Join client threads...\n");
361 for (int i = 0; i < nb_job ; i++) {
362 pthread_join(client_id[i], NULL);
363 }
364 Pmsg0(0, "done.\n");
365 }
366 client_close();
367 term_last_jobs_list();
368 dequeue_daemon_messages(NULL);
369 dequeue_messages(NULL);
370 term_msg();
371 return 0;
372 }
373