1 /* This is a scalability test for testing million(s) of pinging connections */
2 
3 #include <libusockets.h>
/* Value of the `ssl` command-line argument (argv[4], parsed in main). It is
 * forwarded as the leading argument of the us_socket_* and
 * us_socket_context_* calls made by the handlers below. */
int SSL;
5 
6 #include <stdio.h>
7 #include <stdlib.h>
8 #include <string.h>
9 #include <time.h>
10 #include <stdint.h>
11 
/* Ping payload written on every socket timeout: 26 bytes, of which only the
 * first six are given explicitly (the remaining 20 are zero-initialized).
 * NOTE(review): presumably a masked WebSocket binary frame -- 130 = FIN plus
 * binary opcode, 128 | 20 = mask bit plus payload length 20, followed by the
 * 4-byte masking key 1,2,3,4 (2 + 4 + 20 = 26) -- confirm against RFC 6455. */
unsigned char web_socket_request[26] = {130, 128 | 20, 1, 2, 3, 4};

/* HTTP upgrade request written once per connection. The handlers send
 * sizeof(request) - 1 bytes, i.e. everything except the terminating NUL. */
char request[] = "GET / HTTP/1.1\r\n"
                 "Upgrade: websocket\r\n"
                 "Connection: Upgrade\r\n"
                 "Sec-WebSocket-Key: x3JJHMbDL1EzLkh9GBhXDw==\r\n"
                 "Host: server.example.com\r\n"
                 "Sec-WebSocket-Version: 13\r\n\r\n";
/* Target host (heap copy of argv[2]) and port (argv[3]); both set in main */
char *host;
int port;
/* Remaining connection budget (argv[1]); counted down by next_connection */
int connections;

/* All the IPs we as client can use; indexed by next_connection to pick the
 * address handed to us_socket_context_connect */
char **ips;
int num_ips;

/* Timeout value handed to us_socket_timeout before each ping (currently 8;
 * the unit is whatever us_socket_timeout expects, presumably seconds) */
int WEBSOCKET_PING_INTERVAL = 8;

/* We only establish 20k connections per address */
int CONNECTIONS_PER_ADDRESS = 20000;

/* How many connections main starts up front, before the event loop runs */
int BATCH_CONNECT = 1;

/* Number of sockets that reached on_http_socket_open. In the code visible here
 * it is only ever incremented, so it counts every connection opened so far
 * rather than the ones still alive. */
int opened_connections;
/* Number of sockets that reached on_http_socket_close */
int closed_connections;
41 
/* Per-socket state kept in the socket's extension area: the handlers obtain it
 * through us_socket_ext, and sizeof(struct http_socket) is the extension size
 * requested on every connect. */
struct http_socket {
    /* How many bytes of web_socket_request (the ping) have been written so far */
    int offset;

    /* How many bytes of the HTTP upgrade request have been written so far */
    int upgrade_offset;
};
49 
/* Socket parked by next_connection when a follow-up connect fails right away;
 * on_wakeup picks it up, clears the slot and retries. NULL means no retry is
 * pending. */
struct us_socket_t *next_connection_failed = NULL;
51 
/* Loop hook passed to us_create_loop as the "pre" callback. This test has no
 * work to do here, so the body is intentionally empty. */
void on_pre(struct us_loop_t *loop) {
    (void) loop;
}
56 
/* Loop hook passed to us_create_loop as the "post" callback. "Post" refers to
 * the end of a loop iteration, not to HTTP POST. Intentionally empty. */
void on_post(struct us_loop_t *loop) {
    (void) loop;
}
61 
next_connection(struct us_socket_t * s)62 void next_connection(struct us_socket_t *s) {
63     /* We could wait with this until properly upgraded */
64     if (--connections/* > BATCH_CONNECT*/) {
65         /* Swap address */
66         int address = opened_connections / CONNECTIONS_PER_ADDRESS;
67 
68         if (us_socket_context_connect(SSL, us_socket_context(SSL, s), host, port, ips[address], 0, sizeof(struct http_socket)) == 0) {
69             printf("Next connection failed immediately\n");
70 
71             /* Try agsin next event loop iteration */
72             next_connection_failed = s;
73             us_wakeup_loop(us_socket_context_loop(0, us_socket_context(0, s)));
74 
75         }
76     }
77 }
78 
on_wakeup(struct us_loop_t * loop)79 void on_wakeup(struct us_loop_t *loop) {
80     if (next_connection_failed) {
81         struct us_socket_t *s = next_connection_failed;
82         next_connection_failed = 0;
83         next_connection(s);
84     }
85 }
86 
on_http_socket_writable(struct us_socket_t * s)87 struct us_socket_t *on_http_socket_writable(struct us_socket_t *s) {
88     struct http_socket *http_socket = (struct http_socket *) us_socket_ext(SSL, s);
89 
90     /* Are we still not upgraded yet? */
91     if (http_socket->upgrade_offset < sizeof(request) - 1) {
92         http_socket->upgrade_offset += us_socket_write(SSL, s, request + http_socket->upgrade_offset, sizeof(request) - 1 - http_socket->upgrade_offset, 0);
93 
94         /* Now we should be */
95         if (http_socket->upgrade_offset == sizeof(request) - 1) {
96             next_connection(s);
97 
98             /* Make sure to send ping */
99             us_socket_timeout(SSL, s, WEBSOCKET_PING_INTERVAL);
100         }
101     } else {
102         /* Stream whatever is remaining of the request */
103         http_socket->offset += us_socket_write(SSL, s, (char *) web_socket_request + http_socket->offset, sizeof(web_socket_request) - http_socket->offset, 0);
104         if (http_socket->offset == sizeof(web_socket_request)) {
105             /* Reset timeout if we managed to */
106             us_socket_timeout(SSL, s, WEBSOCKET_PING_INTERVAL);
107         }
108     }
109 
110     return s;
111 }
112 
on_http_socket_close(struct us_socket_t * s,int code,void * reason)113 struct us_socket_t *on_http_socket_close(struct us_socket_t *s, int code, void *reason) {
114 
115     closed_connections++;
116     if (closed_connections % 1000 == 0) {
117         printf("Alive: %d, dead: %d\n", opened_connections, closed_connections);
118     }
119 
120     return s;
121 }
122 
on_http_socket_end(struct us_socket_t * s)123 struct us_socket_t *on_http_socket_end(struct us_socket_t *s) {
124     return us_socket_close(SSL, s, 0, NULL);
125 }
126 
127 // should never get a response!
on_http_socket_data(struct us_socket_t * s,char * data,int length)128 struct us_socket_t *on_http_socket_data(struct us_socket_t *s, char *data, int length) {
129 
130     // is this a broadcasted unix time in millis?
131     if (length % 10 == 0) {
132 
133         // data sent first will come first, so it is oldest
134 
135         struct timespec ts;
136         timespec_get(&ts, TIME_UTC);
137 
138         int64_t millis = ts.tv_sec * 1000 + ts.tv_nsec / 1000000;
139 
140 
141         int64_t received_millis;
142         memcpy(&received_millis, data + 2, 8);
143 
144         static int counter;
145         static int max_latency;
146         static long average_latency;
147 
148         int latency = millis - received_millis;
149 
150         average_latency += latency;
151 
152         if (max_latency < latency) {
153             max_latency = latency;
154         }
155 
156         if (++counter % 10000 == 0) {
157             printf("Alive: %d, dead: %d\n", opened_connections, closed_connections);
158             printf("Max latency: %d ms\n", max_latency);
159             printf("Average latency: %ld ms\n\n", average_latency / 10000);
160             max_latency = 0;
161             average_latency = 0;
162         }
163 
164 
165 
166     }
167 
168 
169 
170     return s;
171 }
172 
on_http_socket_open(struct us_socket_t * s,int is_client,char * ip,int ip_length)173 struct us_socket_t *on_http_socket_open(struct us_socket_t *s, int is_client, char *ip, int ip_length) {
174     struct http_socket *http_socket = (struct http_socket *) us_socket_ext(SSL, s);
175 
176     /* Display number of opened connections */
177     opened_connections++;
178     if (opened_connections % 1000 == 0) {
179         printf("Alive: %d, dead: %d\n", opened_connections, closed_connections);
180     }
181 
182     /* Send an upgrade request */
183     http_socket->upgrade_offset = us_socket_write(SSL, s, request, sizeof(request) - 1, 0);
184     if (http_socket->upgrade_offset == sizeof(request) - 1) {
185         next_connection(s);
186 
187         /* Make sure to send ping */
188         us_socket_timeout(SSL, s, WEBSOCKET_PING_INTERVAL);
189     }
190 
191     return s;
192 }
193 
194 // here we should send a message as ping (part of the test)
on_http_socket_timeout(struct us_socket_t * s)195 struct us_socket_t *on_http_socket_timeout(struct us_socket_t *s) {
196     struct http_socket *http_socket = (struct http_socket *) us_socket_ext(SSL, s);
197 
198     /* Send ping here */
199     http_socket->offset = us_socket_write(SSL, s, (char *) web_socket_request, sizeof(web_socket_request), 0);
200     if (http_socket->offset == sizeof(web_socket_request)) {
201         /* Reset timeout if we managed to */
202         us_socket_timeout(SSL, s, WEBSOCKET_PING_INTERVAL);
203     }
204 
205     return s;
206 }
207 
/* Connect-error handler: reports the failure and moves on to the next
 * connection, using `s` only to reach its context. `code` is not used.
 * Returns `s`. */
struct us_socket_t *on_http_socket_connect_error(struct us_socket_t *s, int code) {
    (void) code;

    fputs("Connection failed\n", stdout);
    next_connection(s);

    return s;
}
216 
main(int argc,char ** argv)217 int main(int argc, char **argv) {
218 
219     /* Parse host and port */
220     if (argc < 5) {
221         printf("Usage: connections host port ssl [ip ...]\n");
222         return 0;
223     }
224 
225     port = atoi(argv[3]);
226     host = malloc(strlen(argv[2]) + 1);
227     memcpy(host, argv[2], strlen(argv[2]) + 1);
228     connections = atoi(argv[1]);
229     SSL = atoi(argv[4]);
230 
231     /* Do we have ip addresses? */
232     if (argc > 5) {
233         ips = &argv[5];
234         num_ips = argc - 5;
235 
236         for (int i = 0; i < num_ips; i++) {
237             printf("%s\n", ips[i]);
238         }
239     } else {
240         static char *default_ips[] = {""};
241         ips = default_ips;
242         num_ips = 1;
243     }
244 
245     /* Check so that we have enough ip addresses */
246     if (num_ips <= connections / CONNECTIONS_PER_ADDRESS) {
247         printf("You'll need more IP addresses for this run\n");
248         return 0;
249     }
250 
251     /* Create the event loop */
252     struct us_loop_t *loop = us_create_loop(0, on_wakeup, on_pre, on_post, 0);
253 
254     /* Create a socket context for HTTP */
255     struct us_socket_context_options_t options = {};
256     struct us_socket_context_t *http_context = us_create_socket_context(SSL, loop, 0, options);
257 
258     /* Set up event handlers */
259     us_socket_context_on_open(SSL, http_context, on_http_socket_open);
260     us_socket_context_on_data(SSL, http_context, on_http_socket_data);
261     us_socket_context_on_writable(SSL, http_context, on_http_socket_writable);
262     us_socket_context_on_close(SSL, http_context, on_http_socket_close);
263     us_socket_context_on_timeout(SSL, http_context, on_http_socket_timeout);
264     us_socket_context_on_end(SSL, http_context, on_http_socket_end);
265     us_socket_context_on_connect_error(SSL, http_context, on_http_socket_connect_error);
266 
267     /* Start making HTTP connections */
268     for (int i = 0; i < BATCH_CONNECT; i++) {
269         if (us_socket_context_connect(SSL, http_context, host, port, ips[0], 0, sizeof(struct http_socket)) == 0) {
270             printf("Connection failed immediately\n");
271             return 0;
272         }
273     }
274 
275     us_loop_run(loop);
276 }
277