1 /*
2  * Copyright (c) 2014, 2015  Machine Zone, Inc.
3  *
4  * Original author: Lev Walkin <lwalkin@machinezone.com>
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  * 1. Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  *    notice, this list of conditions and the following disclaimer in the
13  *    documentation and/or other materials provided with the distribution.
14 
15  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
16  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
19  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25  * SUCH DAMAGE.
26  */
27 #ifndef TCPKALI_ENGINE_H
28 #define TCPKALI_ENGINE_H
29 
30 #include <StreamBoyerMooreHorspool.h>
31 #include <hdr_histogram.h>
32 
33 #include "tcpkali_traffic_stats.h"
34 #include "tcpkali_transport.h"
35 #include "tcpkali_logging.h"
36 #include "tcpkali_atomic.h"
37 #include "tcpkali_rate.h"
38 #include "tcpkali_expr.h"
39 #include "tcpkali_dns.h"
40 
41 long number_of_cpus();
42 
43 struct engine;
44 
45 struct engine_params {
46     struct addresses remote_addresses;
47     struct addresses listen_addresses;
48     struct addresses source_addresses;
49     size_t requested_workers;             /* Number of threads to start */
50     rate_spec_t channel_send_rate;        /* --channel-upstream */
51     rate_spec_t channel_recv_rate;        /* --channel-downstream */
52     enum verbosity_level verbosity_level; /* Default verbosity level is 1 */
53     enum {
54         NSET_UNSET = -1,
55         NSET_NODELAY_OFF = 0, /* Enable Nagle */
56         NSET_NODELAY_ON = 1,  /* Disable Nagle */
57     } nagle_setting;
58     enum {
59         WRCOMB_OFF = 0, /* Disable write coalescing */
60         WRCOMB_ON = 1,  /* Enable write coalescing (default) */
61     } write_combine;
62     enum {
63         LMODE_DEFAULT = 0x00, /* Do not send data, ignore received data */
64         LMODE_ACTIVE = 0x01,  /* Actively send messages */
65         _LMODE_RCV_MASK = 0xf0,
66         _LMODE_SND_MASK = 0x0f,
67     } listen_mode;
68     uint32_t sock_rcvbuf_size; /* SO_RCVBUF setting */
69     uint32_t sock_sndbuf_size; /* SO_SNDBUF setting */
70     double connect_timeout;
71     double channel_lifetime;
72     double epoch;
73     int websocket_enable; /* Enable Websocket responder on (-l) */
74     /* Pre-computed message data template */
75     struct message_collection message_collection;  /* A descr. what to send */
76     struct transport_data_spec *data_templates[2]; /* client, server tmpls */
77     enum {
78         DS_DUMP_ONE_IN = 1,
79         DS_DUMP_ONE_OUT = 2,
80         DS_DUMP_ONE = 3, /* 2|1 */
81         DS_DUMP_ALL_IN = 4,
82         DS_DUMP_ALL_OUT = 8,
83         DS_DUMP_ALL = 12 /* 8|4 */
84     } dump_setting;
85     statsd_report_latency_types latency_setting;
86     int latency_marker_skip;        /* --latency-marker-skip <N> */
87     int message_marker;     /* \{message.marker} */
88     tk_expr_t *latency_marker_expr; /* --latency-marker */
89     tk_expr_t *message_stop_expr;   /* --message-stop */
90 
91     /* Streaming Boyer-Moore-Horspool */
92     struct StreamBMH_Occ sbmh_shared_marker_occ; /* --latency-marker */
93     struct StreamBMH_Occ sbmh_shared_stop_occ;   /* --message-stop */
94 };
95 
96 struct engine *engine_start(struct engine_params);
97 const struct engine_params *engine_params(struct engine *);
98 void engine_update_message_send_rate(struct engine *, double msg_rate);
99 
100 
101 /*
102  * Report the number of opened connections by categories.
103  */
104 void engine_get_connection_stats(struct engine *, size_t *connecting,
105                                  size_t *incoming, size_t *outgoing,
106                                  size_t *counter);
107 
108 /*
109  * Create snapshot of the current latency histogram.
110  */
111 void engine_prepare_latency_snapshot(struct engine *);
112 struct latency_snapshot *engine_collect_latency_snapshot(struct engine *);
113 struct latency_snapshot *engine_diff_latency_snapshot(struct latency_snapshot *base, struct latency_snapshot *update);
114 void engine_free_latency_snapshot(struct latency_snapshot *);
115 
116 size_t engine_initiate_new_connections(struct engine *, size_t n);
117 
118 non_atomic_traffic_stats engine_traffic(struct engine *);
119 
120 void engine_terminate(struct engine *, double epoch_start,
121                       /* Traffic observed during ramp-up phase */
122                       non_atomic_traffic_stats initial_traffic,
123                       /* Report latencies at specified %'iles */
124                       struct percentile_values *report_latency_percentiles);
125 
126 #endif /* TCPKALI_ENGINE_H */
127