1 /*
2    Bacula(R) - The Network Backup Solution
3 
4    Copyright (C) 2000-2020 Kern Sibbald
5 
6    The original author of Bacula is Kern Sibbald, with contributions
7    from many others, a complete list can be found in the file AUTHORS.
8 
9    You may use this file and others of this release according to the
10    license defined in the LICENSE file, which includes the Affero General
11    Public License, v3.0 ("AGPLv3") and some additional permissions and
12    terms pursuant to its AGPLv3 Section 7.
13 
14    This notice must be preserved when any source code is
15    conveyed and/or propagated.
16 
17    Bacula(R) is a registered trademark of Kern Sibbald.
18 */
19 
20 #include "bacula.h"
21 #include "lib/unittests.h"
22 
/* Functions that reproduce what the director is supposed to do:
 *  - Accept the connection from "filedaemon"
 *  - Accept the command "setip"
 *  - Store the socket in the BsockMeeting structure in a global list
 *
 *  - Accept the connection from "bconsole"
 *  - Run a command that connects to the client
 *  - Get the socket from the BsockMeeting list
 *  - Exchange some data over that socket
 */
33 
34 void *start_heap;
35 int port=2000;
36 int nb_job=10;
37 int done=nb_job;
38 int started=0;
39 int connected=0;
40 pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
41 int nb_send=10;
42 char *remote = (char *)"localhost";
43 bool quit=false;
44 ilist clients(1000, not_owned_by_alist);
45 
get_client(const char * name)46 BsockMeeting *get_client(const char *name)
47 {
48    lock_guard m(mutex);
49    BsockMeeting *b;
50    int id=0;
51    if (sscanf(name, "client-%d", &id) != 1) {
52       return NULL;
53    }
54    b = (BsockMeeting *)clients.get(id);
55    if (!b) {
56       b = New(BsockMeeting());
57       clients.put(id, b);
58    }
59    return b;
60 }
61 
set_client(const char * name,BsockMeeting * b)62 void set_client(const char *name, BsockMeeting *b)
63 {
64    lock_guard m(mutex);
65    int id=0;
66    if (sscanf(name, "client-%d", &id) != 1) {
67       return;
68    }
69    clients.put(id, b);
70 }
71 
client_close()72 void client_close()
73 {
74    BsockMeeting *b;
75    int last = clients.last_index();
76    for (int i = 0; i <= last ; i++) {
77       b = (BsockMeeting *)clients.get(i);
78       if (b) {
79          delete b;
80          clients.put(i, NULL);
81       }
82    }
83    clients.destroy();
84 }
85 
86 /* When a client connects */
handle_client_request(void * fdp)87 static void *handle_client_request(void *fdp)
88 {
89    BsockMeeting *proxy;
90    BSOCK *fd = (BSOCK *)fdp;
91    if (fd->recv() > 0) {
92       char *name = fd->msg;
93       Pmsg1(0, "Got connection from %s\n", name);
94       proxy = get_client(name);
95       fd->fsend("OK\n");
96       proxy->set(fd);
97    } else {
98       free_bsock(fd);
99       return NULL;
100    }
101 
102    P(mutex);
103    connected++;
104    V(mutex);
105    return NULL;
106 }
107 
th_server(void *)108 static void *th_server(void *)
109 {
110    static workq_t dir_workq;             /* queue of work from Director */
111    IPADDR *tmp = 0;
112    dlist *lst;
113    lst = New(dlist(tmp, &tmp->link));
114 
115    init_default_addresses(&lst, port);
116    bnet_thread_server(lst, 100, &dir_workq, handle_client_request);
117    delete lst;
118    return NULL;
119 }
120 
121 
/* Working directory; the "pause" file is created in it as well */
#if defined(HAVE_WIN32)
#define WD "c:\\program files\\bacula\\working\\"
#else
#define WD "/tmp/"
#endif
127 
128 /* Simulate a console and do some action */
th_console(void * arg)129 void *th_console(void *arg)
130 {
131    bool quit;
132    int64_t total=0;
133    char ed1[50], ed2[50];
134    int32_t sig;
135    btime_t timer, end_timer, elapsed;
136 
137    BSOCK *dir=NULL;
138    char *name = (char *) arg;
139    BsockMeeting *proxy = get_client(name);
140    if (proxy == NULL) {
141       Pmsg1(0, "Unable to find %s\n", name);
142       goto bail_out;
143    }
144    Pmsg0(0, "Can go in sleep mode. Remove " WD "pause to continue\n");
145    {
146       struct stat sp;
147       fclose(fopen(WD "pause", "w"));
148       while (stat(WD "pause", &sp) == 0) {
149          bmicrosleep(1, 0);
150       }
151    }
152    dir = proxy->get(30);
153    if (!dir) {
154       Pmsg1(0, "Unable to get socket %s\n", name);
155       goto bail_out;
156    }
157 
158    Pmsg0(0, "send command\n");
159    dir->fsend("command\n");
160    timer = get_current_btime();
161 
162    for (quit=false; dir && !quit;) {
163       /* Read command */
164       sig = dir->recv();
165       if (sig < 0) {
166          Pmsg0(0, "Connection terminated\n");
167          break;               /* connection terminated */
168       } else if (!strncmp(dir->msg, "quit", 4)) {
169          Pmsg0(0, "got quit...\n");
170          dir->fsend("quit\n");
171          break;
172       } else {
173          total += dir->msglen;
174       }
175    }
176    end_timer = get_current_btime();
177    elapsed = (end_timer - timer)/1000; /* 0.001s */
178 
179    if (elapsed > 0) {
180       printf("got bytes=%sB in %.2fs speed=%sB/s\n", edit_uint64_with_suffix(total, ed1),
181              (float) elapsed/1000, edit_uint64_with_suffix(total/elapsed*1000, ed2));
182    }
183 
184 bail_out:
185    free_bsock(dir);
186    free(name);
187    return NULL;
188 }
189 
190 /* Simulate a filedaemon */
th_filedaemon(void * arg)191 void *th_filedaemon(void *arg)
192 {
193    BSOCK *sock = NULL;
194    char name[512];
195    BsockMeeting proxy;
196    bstrncpy(name, (char *)arg, sizeof(name));
197    free(arg);
198    P(mutex);
199    started++;
200    V(mutex);
201 
202    /* The FD will use a loop to connect */
203 connect_again:
204    free_bsock(sock);
205    sock = new_bsock();
206    if (!sock->connect(NULL, 5, 10, 2000, (char *)"*name*", remote, (char *)"*service*", port, 0)) {
207       bmicrosleep(1, 0);
208       goto connect_again;
209    }
210    Pmsg0(0, ">Connected!\n");
211 
212    /* Do "authentication" */
213    sock->fsend("%s", name);
214    if (sock->recv() <= 0 || strcmp(sock->msg, "OK\n") != 0) {
215       free_bsock(sock);
216       return NULL;
217    }
218 
219    /* Read command and wait to be used */
220    proxy.wait_request(sock);
221 
222    if (sock->is_closed()) {
223       goto connect_again;
224    }
225 
226    /* get a command */
227    if (sock->recv() <= 0) {
228       Pmsg0(0, "got incorrect message. Expecting command\n");
229       goto connect_again;
230    }
231 
232    /* Do something useful or not */
233    sock->msg = check_pool_memory_size(sock->msg, 4100);
234 
235    Pmsg1(0, ">Ready to send %u buffers of 4KB\n", nb_send);
236    for (int i = 0; i < nb_send ; i++) {
237       memset(sock->msg, i, 4096);
238       sock->msglen = 4096;
239       sock->msg[sock->msglen] = 0;
240       sock->send();
241    }
242    Pmsg1(0, ">Send quit command\n", nb_send);
243    sock->fsend("quit\n");
244    sock->recv();
245    sock->close();
246    P(mutex);
247    done--;
248    V(mutex);
249 //   goto connect_again;
250 
251    free_bsock(sock);
252    Pmsg4(0, ">done=%u started=%u connected=%u name=%s\n", done, started, connected, name);
253    return NULL;
254 }
255 
main(int argc,char * argv[])256 int main (int argc, char *argv[])
257 {
258    char ch;
259    int olddone=0;
260    bool server=false;
261    pthread_t server_id, client_id[1000], console_id[1000];
262    Unittests t("BsockMeeting", true, true);
263    InitWinAPIWrapper();
264    WSA_Init();
265    start_heap = sbrk(0);
266    bindtextdomain("bacula", LOCALEDIR);
267    textdomain("bacula");
268    init_stack_dump();
269    init_msg(NULL, NULL);
270    daemon_start_time = time(NULL);
271    set_working_directory((char *)WD);
272    set_thread_concurrency(150);
273    set_trace(0);
274 
275    while ((ch = getopt(argc, argv, "?n:j:r:p:sd:")) != -1) {
276       switch (ch) {
277       case 'j':
278          done = nb_job = MIN(atoi(optarg), 1000);
279          break;
280 
281       case 'n':
282          nb_send = atoi(optarg);
283          break;
284 
285       case 'r':
286          remote = optarg;
287          break;
288 
289       case 'p':
290          port = atoi(optarg);
291          break;
292 
293       case 's':
294          server = true;
295          break;
296 
297       case 'd':
298          debug_level = atoi(optarg);
299          break;
300 
301       case '?':
302       default:
303          Pmsg0(0, "Usage: bsock_meeting_test [-r remote] [-s] [-p port] [-n nb_send] [-j nb_job]\n");
304          return 0;
305       }
306    }
307    argc -= optind;
308    argv += optind;
309 
310    if (strcmp(remote, "localhost") == 0) {
311       Pmsg1(0, "Start server on port %d\n", port);
312       pthread_create(&server_id, NULL, th_server, NULL);
313 
314       Pmsg0(0, ">starting fake console\n");
315       for (int i = nb_job - 1; i >= 0 ; i--) {
316          char *tmp = (char *) malloc (32);
317          snprintf(tmp, 32, "client-%d", i);
318          pthread_create(&console_id[i], NULL, th_console, tmp);
319       }
320    }
321 
322    if (!server) {
323       Pmsg0(0, ">starting fake clients\n");
324       for (int i = 0; i < nb_job ; i++) {
325          char *tmp = (char *) malloc (32);
326          snprintf(tmp, 32, "client-%d", i);
327          pthread_create(&client_id[i], NULL, th_filedaemon, tmp);
328       }
329 
330       while (done>=1) {
331          if (done != olddone) {
332             Pmsg3(0, ">done=%u started=%u connected=%u\n", done, started, connected);
333             olddone = done;
334          }
335          sleep(1);
336       }
337       quit=true;
338       sleep(1);
339    }
340 
341    if (strcmp(remote, "localhost") == 0) {
342       Pmsg0(0, "Stop bnet server...\n");
343       bnet_stop_thread_server(server_id);
344       pthread_join(server_id, NULL);
345       Pmsg0(0, "done.\n");
346       Pmsg0(0, "Join console threads...\n");
347       for (int i = 0; i < nb_job ; i++) {
348          pthread_join(console_id[i], NULL);
349       }
350       Pmsg0(0, "done.\n");
351    }
352 
353    if (!server) {
354 #ifdef pthread_kill
355 #undef pthread_kill
356 #endif
357       for (int i = 0; i < nb_job ; i++) {
358          pthread_kill(client_id[i], SIGUSR2);
359       }
360       Pmsg0(0, "Join client threads...\n");
361       for (int i = 0; i < nb_job ; i++) {
362          pthread_join(client_id[i], NULL);
363       }
364       Pmsg0(0, "done.\n");
365    }
366    client_close();
367    term_last_jobs_list();
368    dequeue_daemon_messages(NULL);
369    dequeue_messages(NULL);
370    term_msg();
371    return 0;
372 }
373