1 /* This is a scalability test for testing million(s) of pinging connections */
2
3 #include <libusockets.h>
4 int SSL;
5
6 #include <stdio.h>
7 #include <stdlib.h>
8 #include <string.h>
9 #include <time.h>
10 #include <stdint.h>
11
12 unsigned char web_socket_request[26] = {130, 128 | 20, 1, 2, 3, 4};
13
14 char request[] = "GET / HTTP/1.1\r\n"
15 "Upgrade: websocket\r\n"
16 "Connection: Upgrade\r\n"
17 "Sec-WebSocket-Key: x3JJHMbDL1EzLkh9GBhXDw==\r\n"
18 "Host: server.example.com\r\n"
19 "Sec-WebSocket-Version: 13\r\n\r\n";
20 char *host;
21 int port;
22 int connections;
23
24 /* All the ips we as client can use */
25 char **ips;
26 int num_ips;
27
28 /* Send ping every 16 seconds */
29 int WEBSOCKET_PING_INTERVAL = 8;
30
31 /* We only establish 20k connections per address */
32 int CONNECTIONS_PER_ADDRESS = 20000;
33
34 /* How many connections a time */
35 int BATCH_CONNECT = 1;
36
37 /* Currently open and alive connections */
38 int opened_connections;
39 /* Dead connections */
40 int closed_connections;
41
42 struct http_socket {
43 /* How far we have streamed our websocket request */
44 int offset;
45
46 /* How far we have streamed our upgrade request */
47 int upgrade_offset;
48 };
49
50 struct us_socket_t *next_connection_failed = 0;
51
52 /* We don't need any of these */
on_pre(struct us_loop_t * loop)53 void on_pre(struct us_loop_t *loop) {
54
55 }
56
57 /* This is not HTTP POST, it is merely an event emitted post loop iteration */
on_post(struct us_loop_t * loop)58 void on_post(struct us_loop_t *loop) {
59
60 }
61
next_connection(struct us_socket_t * s)62 void next_connection(struct us_socket_t *s) {
63 /* We could wait with this until properly upgraded */
64 if (--connections/* > BATCH_CONNECT*/) {
65 /* Swap address */
66 int address = opened_connections / CONNECTIONS_PER_ADDRESS;
67
68 if (us_socket_context_connect(SSL, us_socket_context(SSL, s), host, port, ips[address], 0, sizeof(struct http_socket)) == 0) {
69 printf("Next connection failed immediately\n");
70
71 /* Try agsin next event loop iteration */
72 next_connection_failed = s;
73 us_wakeup_loop(us_socket_context_loop(0, us_socket_context(0, s)));
74
75 }
76 }
77 }
78
on_wakeup(struct us_loop_t * loop)79 void on_wakeup(struct us_loop_t *loop) {
80 if (next_connection_failed) {
81 struct us_socket_t *s = next_connection_failed;
82 next_connection_failed = 0;
83 next_connection(s);
84 }
85 }
86
on_http_socket_writable(struct us_socket_t * s)87 struct us_socket_t *on_http_socket_writable(struct us_socket_t *s) {
88 struct http_socket *http_socket = (struct http_socket *) us_socket_ext(SSL, s);
89
90 /* Are we still not upgraded yet? */
91 if (http_socket->upgrade_offset < sizeof(request) - 1) {
92 http_socket->upgrade_offset += us_socket_write(SSL, s, request + http_socket->upgrade_offset, sizeof(request) - 1 - http_socket->upgrade_offset, 0);
93
94 /* Now we should be */
95 if (http_socket->upgrade_offset == sizeof(request) - 1) {
96 next_connection(s);
97
98 /* Make sure to send ping */
99 us_socket_timeout(SSL, s, WEBSOCKET_PING_INTERVAL);
100 }
101 } else {
102 /* Stream whatever is remaining of the request */
103 http_socket->offset += us_socket_write(SSL, s, (char *) web_socket_request + http_socket->offset, sizeof(web_socket_request) - http_socket->offset, 0);
104 if (http_socket->offset == sizeof(web_socket_request)) {
105 /* Reset timeout if we managed to */
106 us_socket_timeout(SSL, s, WEBSOCKET_PING_INTERVAL);
107 }
108 }
109
110 return s;
111 }
112
on_http_socket_close(struct us_socket_t * s,int code,void * reason)113 struct us_socket_t *on_http_socket_close(struct us_socket_t *s, int code, void *reason) {
114
115 closed_connections++;
116 if (closed_connections % 1000 == 0) {
117 printf("Alive: %d, dead: %d\n", opened_connections, closed_connections);
118 }
119
120 return s;
121 }
122
on_http_socket_end(struct us_socket_t * s)123 struct us_socket_t *on_http_socket_end(struct us_socket_t *s) {
124 return us_socket_close(SSL, s, 0, NULL);
125 }
126
127 // should never get a response!
on_http_socket_data(struct us_socket_t * s,char * data,int length)128 struct us_socket_t *on_http_socket_data(struct us_socket_t *s, char *data, int length) {
129
130 // is this a broadcasted unix time in millis?
131 if (length % 10 == 0) {
132
133 // data sent first will come first, so it is oldest
134
135 struct timespec ts;
136 timespec_get(&ts, TIME_UTC);
137
138 int64_t millis = ts.tv_sec * 1000 + ts.tv_nsec / 1000000;
139
140
141 int64_t received_millis;
142 memcpy(&received_millis, data + 2, 8);
143
144 static int counter;
145 static int max_latency;
146 static long average_latency;
147
148 int latency = millis - received_millis;
149
150 average_latency += latency;
151
152 if (max_latency < latency) {
153 max_latency = latency;
154 }
155
156 if (++counter % 10000 == 0) {
157 printf("Alive: %d, dead: %d\n", opened_connections, closed_connections);
158 printf("Max latency: %d ms\n", max_latency);
159 printf("Average latency: %ld ms\n\n", average_latency / 10000);
160 max_latency = 0;
161 average_latency = 0;
162 }
163
164
165
166 }
167
168
169
170 return s;
171 }
172
on_http_socket_open(struct us_socket_t * s,int is_client,char * ip,int ip_length)173 struct us_socket_t *on_http_socket_open(struct us_socket_t *s, int is_client, char *ip, int ip_length) {
174 struct http_socket *http_socket = (struct http_socket *) us_socket_ext(SSL, s);
175
176 /* Display number of opened connections */
177 opened_connections++;
178 if (opened_connections % 1000 == 0) {
179 printf("Alive: %d, dead: %d\n", opened_connections, closed_connections);
180 }
181
182 /* Send an upgrade request */
183 http_socket->upgrade_offset = us_socket_write(SSL, s, request, sizeof(request) - 1, 0);
184 if (http_socket->upgrade_offset == sizeof(request) - 1) {
185 next_connection(s);
186
187 /* Make sure to send ping */
188 us_socket_timeout(SSL, s, WEBSOCKET_PING_INTERVAL);
189 }
190
191 return s;
192 }
193
194 // here we should send a message as ping (part of the test)
on_http_socket_timeout(struct us_socket_t * s)195 struct us_socket_t *on_http_socket_timeout(struct us_socket_t *s) {
196 struct http_socket *http_socket = (struct http_socket *) us_socket_ext(SSL, s);
197
198 /* Send ping here */
199 http_socket->offset = us_socket_write(SSL, s, (char *) web_socket_request, sizeof(web_socket_request), 0);
200 if (http_socket->offset == sizeof(web_socket_request)) {
201 /* Reset timeout if we managed to */
202 us_socket_timeout(SSL, s, WEBSOCKET_PING_INTERVAL);
203 }
204
205 return s;
206 }
207
on_http_socket_connect_error(struct us_socket_t * s,int code)208 struct us_socket_t *on_http_socket_connect_error(struct us_socket_t *s, int code) {
209
210 printf("Connection failed\n");
211
212 next_connection(s);
213
214 return s;
215 }
216
main(int argc,char ** argv)217 int main(int argc, char **argv) {
218
219 /* Parse host and port */
220 if (argc < 5) {
221 printf("Usage: connections host port ssl [ip ...]\n");
222 return 0;
223 }
224
225 port = atoi(argv[3]);
226 host = malloc(strlen(argv[2]) + 1);
227 memcpy(host, argv[2], strlen(argv[2]) + 1);
228 connections = atoi(argv[1]);
229 SSL = atoi(argv[4]);
230
231 /* Do we have ip addresses? */
232 if (argc > 5) {
233 ips = &argv[5];
234 num_ips = argc - 5;
235
236 for (int i = 0; i < num_ips; i++) {
237 printf("%s\n", ips[i]);
238 }
239 } else {
240 static char *default_ips[] = {""};
241 ips = default_ips;
242 num_ips = 1;
243 }
244
245 /* Check so that we have enough ip addresses */
246 if (num_ips <= connections / CONNECTIONS_PER_ADDRESS) {
247 printf("You'll need more IP addresses for this run\n");
248 return 0;
249 }
250
251 /* Create the event loop */
252 struct us_loop_t *loop = us_create_loop(0, on_wakeup, on_pre, on_post, 0);
253
254 /* Create a socket context for HTTP */
255 struct us_socket_context_options_t options = {};
256 struct us_socket_context_t *http_context = us_create_socket_context(SSL, loop, 0, options);
257
258 /* Set up event handlers */
259 us_socket_context_on_open(SSL, http_context, on_http_socket_open);
260 us_socket_context_on_data(SSL, http_context, on_http_socket_data);
261 us_socket_context_on_writable(SSL, http_context, on_http_socket_writable);
262 us_socket_context_on_close(SSL, http_context, on_http_socket_close);
263 us_socket_context_on_timeout(SSL, http_context, on_http_socket_timeout);
264 us_socket_context_on_end(SSL, http_context, on_http_socket_end);
265 us_socket_context_on_connect_error(SSL, http_context, on_http_socket_connect_error);
266
267 /* Start making HTTP connections */
268 for (int i = 0; i < BATCH_CONNECT; i++) {
269 if (us_socket_context_connect(SSL, http_context, host, port, ips[0], 0, sizeof(struct http_socket)) == 0) {
270 printf("Connection failed immediately\n");
271 return 0;
272 }
273 }
274
275 us_loop_run(loop);
276 }
277