1 /*
2 * %CopyrightBegin%
3 *
4 * Copyright Ericsson AB 2001-2020. All Rights Reserved.
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 * %CopyrightEnd%
19 */
20
21 /* to test multiple threads in ei */
22
23 #include <stdlib.h>
24 #include <stdio.h>
25 #include <string.h>
26
27 #ifdef __WIN32__
28 #include <winsock2.h>
29 #include <windows.h>
30 #include <process.h>
31 #else
32 #include <pthread.h>
33 #include <sys/types.h>
34 #include <sys/socket.h>
35 #include <netinet/in.h>
36 #endif
37
38 #include "ei.h"
39 #include "my_ussi.h"
40
41 #define MAIN main
42
43 /*
44 A small einode.
45 To be called from the test case ei_accept_SUITE:multi_thread
46 usage: eiaccnode <cookie> <n> <default|ussi>
47
48 - start threads 0..n-1
49 - in each thread
50 - listen on "ei0" .. "ei<n-1>"
51 - wait for connection
52 - receive a pid
53 - send {i, <pid>} back
54 - shutdown gracefully
55 */
56
57 static const char* cookie;
58 static int use_ussi;
59
60 #ifndef SD_SEND
61 #ifdef SHUTWR
62 #define SD_SEND SHUT_WR
63 #else
64 #define SD_SEND 1
65 #endif
66 #endif
67
68 #ifdef __WIN32__
69 static DWORD WINAPI
70 #else
71 static void*
72 #endif
einode_thread(void * num)73 einode_thread(void* num)
74 {
75 int n = (int)(long)num;
76 int port;
77 ei_cnode ec;
78 char myname[100], destname[100], filename[100];
79 int r, fd, listen;
80 ErlConnect conn;
81 erlang_msg msg;
82 FILE* file;
83
84 sprintf(filename, "eiacc%d_trace.txt", n);
85 file = fopen(filename, "a");
86
87 sprintf(myname, "eiacc%d", n); fflush(file);
88 fprintf(file, "---- use_ussi = %d ----\n", use_ussi); fflush(file);
89 if (use_ussi)
90 r = ei_connect_init_ussi(&ec, myname, cookie, 0,
91 &my_ussi, sizeof(my_ussi), NULL);
92 else
93 r = ei_connect_init(&ec, myname, cookie, 0);
94 fprintf(file, "r=%d\n", r); fflush(file);
95 port = 0;
96 listen = ei_listen(&ec, &port, 5);
97 if (listen <= 0) {
98 fprintf(file, "listen err\n"); fflush(file);
99 exit(7);
100 }
101 fprintf(file, "thread %d (%s:%s) listening on port %d\n", n, myname, destname, port); fflush(file);
102 r = ei_publish(&ec, port);
103 fprintf(file, "r=%d\n", r); fflush(file);
104 if (r == -1) {
105 fprintf(file, "ei_publish port %d\n", port+n); fflush(file);
106 exit(8);
107 }
108 fd = ei_accept(&ec, listen, &conn);
109 fprintf(file, "ei_accept %d\n", fd); fflush(file);
110 if (fd >= 0) {
111 ei_x_buff x, xs;
112 int index, version;
113 erlang_pid pid;
114
115 ei_x_new(&x);
116 for (;;) {
117 int got = ei_xreceive_msg(fd, &msg, &x);
118 if (got == ERL_TICK)
119 continue;
120 if (got == ERL_ERROR) {
121 fprintf(file, "receive error %d\n", n); fflush(file);
122 return 0;
123 }
124 fprintf(file, "received %d\n", got); fflush(file);
125 break;
126 }
127 index = 0;
128 if (ei_decode_version(x.buff, &index, &version) != 0) {
129 fprintf(file, "ei_decode_version %d\n", n); fflush(file);
130 return 0;
131 }
132 if (ei_decode_pid(x.buff, &index, &pid) != 0) {
133 fprintf(file, "ei_decode_pid %d\n", n); fflush(file);
134 return 0;
135 }
136 fprintf(file, "got pid from %s \n", pid.node); fflush(file);
137 ei_x_new_with_version(&xs);
138 ei_x_encode_tuple_header(&xs, 2);
139 ei_x_encode_long(&xs, n);
140 ei_x_encode_pid(&xs, &pid);
141 r = ei_send(fd, &pid, xs.buff, xs.index);
142 fprintf(file, "sent %d bytes %d\n", xs.index, r); fflush(file);
143 shutdown(fd, SD_SEND);
144 ei_close_connection(fd);
145 ei_x_free(&x);
146 ei_x_free(&xs);
147 } else {
148 fprintf(file, "coudn't connect fd %d r %d\n", fd, r); fflush(file);
149 }
150 ei_close_connection(listen);
151 fprintf(file, "done thread %d\n", n);
152 fclose(file);
153 return 0;
154 }
155
156 int
MAIN(int argc,char * argv[])157 MAIN(int argc, char *argv[])
158 {
159 int i, n, no_threads;
160 #ifdef __WIN32__
161 HANDLE threads[100];
162 #else
163 pthread_t threads[100];
164 #endif
165
166 if (argc < 4)
167 exit(1);
168
169 cookie = argv[1];
170 n = atoi(argv[2]);
171 if (n > 100)
172 exit(2);
173
174 if (strcmp(argv[3], "default") == 0)
175 use_ussi = 0;
176 else if (strcmp(argv[3], "ussi") == 0)
177 use_ussi = 1;
178 else
179 printf("bad argv[3] '%s'", argv[3]);
180
181 if (argc == 4)
182 no_threads = 0;
183 else
184 no_threads = argv[4] != NULL && strcmp(argv[4], "nothreads") == 0;
185
186 ei_init();
187
188 for (i = 0; i < n; ++i) {
189 if (!no_threads) {
190 #ifdef __WIN32__
191 unsigned tid;
192 threads[i] = (HANDLE)_beginthreadex(NULL, 0, einode_thread,
193 (void*)(size_t)i, 0, &tid);
194 #else
195 pthread_create(&threads[i], NULL, einode_thread, (void*)(size_t)i);
196 #endif
197 } else
198 einode_thread((void*)(size_t)i);
199 }
200
201 if (!no_threads)
202 for (i = 0; i < n; ++i) {
203 #ifdef __WIN32__
204 if (WaitForSingleObject(threads[i], INFINITE) != WAIT_OBJECT_0)
205 #else
206 if (pthread_join(threads[i], NULL) != 0)
207 #endif
208 printf("bad wait thread %d\n", i);
209 }
210 printf("ok\n");
211 return 0;
212 }
213