1 /*
2  * File:   ms_conn.h
3  * Author: Mingqiang Zhuang
4  *
5  * Created on February 10, 2009
6  *
7  * (c) Copyright 2009, Schooner Information Technology, Inc.
8  * http://www.schoonerinfotech.com/
9  *
10  */
11 #ifndef MS_CONN_H
12 #define MS_CONN_H
13 
14 #include <sys/socket.h>
15 #include <netinet/in.h>
16 #include <event.h>
17 #include <netdb.h>
18 
19 #include "ms_task.h"
20 #include <libmemcached/memcached/protocol_binary.h>
21 
22 #ifdef __cplusplus
23 extern "C" {
24 #endif
25 
/* TCP read buffer: 1M + 2k, large enough for the maximum value size (1M). */
#define DATA_BUFFER_SIZE             ((1024 * 1024) + 2048)

/* Write buffer: 32k. */
#define WRITE_BUFFER_SIZE            (32 * 1024)

/* UDP read buffer: 1M. */
#define UDP_DATA_BUFFER_SIZE         (1024 * 1024)

/* UDP payload size limit imposed by the server. */
#define UDP_MAX_PAYLOAD_SIZE         1400

/* UDP payload size limit when sending (the MTU is 1500). */
#define UDP_MAX_SEND_PAYLOAD_SIZE    1400

/* Size in bytes of the UDP header. */
#define UDP_HEADER_SIZE              8

/* Upper bound for the socket buffer size: 256M. */
#define MAX_SENDBUF_SIZE             (256 * 1024 * 1024)

/* Longest time to wait on UDP, in seconds. */
#define SOCK_WAIT_TIMEOUT            30

/* Maximum number of UDP packets: 65536. */
#define MAX_UDP_PACKET               (1 << 16)

/* Number of entries the sendmsg() scatter/gather array starts with. */
#define IOV_LIST_INITIAL             400

/* Number of sendmsg() argument structures allocated up front. */
#define MSG_LIST_INITIAL             10

/* High water marks used when deciding whether to shrink buffers. */
#define READ_BUFFER_HIGHWAT          (2 * DATA_BUFFER_SIZE)
#define UDP_DATA_BUFFER_HIGHWAT      (4 * UDP_DATA_BUFFER_SIZE)
#define IOV_LIST_HIGHWAT             600
#define MSG_LIST_HIGHWAT             100
47 
/*
 * Parse the 16-bit fields of a UDP frame header.
 *
 * Each field is stored high byte first:
 *   bytes 0-1  request id       (HEADER_TO_REQID)
 *   bytes 2-3  sequence number  (HEADER_TO_SEQNUM)
 *   bytes 4-5  packet count     (HEADER_TO_PACKETS)
 *
 * `ptr` may be any pointer-to-byte expression (e.g. `buf + off`); it is
 * fully parenthesized inside the expansion so operator precedence at the
 * call site cannot change which bytes are read. Every byte is read as
 * uint8_t so a plain (possibly signed) `char *` buffer is not
 * sign-extended. The result is a non-negative int in the range 0..65535.
 *
 * NOTE: `ptr` is evaluated twice; do not pass an expression with side
 * effects (e.g. `p++`).
 */
#define HEADER_TO_REQID(ptr)      ((uint16_t)(uint8_t)(ptr)[0] * 256 \
                                   + (uint16_t)(uint8_t)(ptr)[1])
#define HEADER_TO_SEQNUM(ptr)     ((uint16_t)(uint8_t)(ptr)[2] * 256 \
                                   + (uint16_t)(uint8_t)(ptr)[3])
#define HEADER_TO_PACKETS(ptr)    ((uint16_t)(uint8_t)(ptr)[4] * 256 \
                                   + (uint16_t)(uint8_t)(ptr)[5])
57 
/* The states a connection can be in. */
enum conn_states
{
  conn_read    = 0,  /* a command line is being read in */
  conn_write   = 1,  /* a simple response is being written out */
  conn_closing = 2   /* the connection is being closed */
};
65 
/* Result states reported for a memcached command. */
enum mcd_ret
{
  MCD_SUCCESS              = 0,   /* the command succeeded */
  MCD_FAILURE              = 1,   /* the command failed */
  MCD_UNKNOWN_READ_FAILURE = 2,   /* a read failed for an unknown reason */
  MCD_PROTOCOL_ERROR       = 3,   /* protocol error */
  MCD_CLIENT_ERROR         = 4,   /* client error: wrong command */
  MCD_SERVER_ERROR         = 5,   /* server error: the server failed to run the command */
  MCD_DATA_EXISTS          = 6,   /* the object already exists on the server */
  MCD_NOTSTORED            = 7,   /* the server did not store the object */
  MCD_STORED               = 8,   /* the server stored the object */
  MCD_NOTFOUND             = 9,   /* the server did not find the object */
  MCD_END                  = 10,  /* end of the response to a get command */
  MCD_DELETED              = 11,  /* the server deleted the object */
  MCD_STAT                 = 12   /* response to a stats command */
};
83 
/* Records the state of a command, either the one running now or the previous one. */
struct cmdstat
{
  /* command name */
  int cmd;
  /* return state of this command */
  int retstat;
  /* whether all of the response data has been read */
  bool isfinish;
  /* key prefix */
  uint64_t key_prefix;
};
typedef struct cmdstat ms_cmdstat_t;
92 
/* Describes one UDP packet. */
struct udppkt
{
  /* udp header of the packet */
  uint8_t *header;
  /* udp data of the packet */
  char *data;
  /* amount of data in the packet */
  int rbytes;
  /* amount of the packet's data that has been copied */
  int copybytes;
};
typedef struct udppkt ms_udppkt_t;
101 
/* The two protocols declared here; numbering starts at 3. */
enum protocol
{
  ascii_prot  = 3,  /* ASCII protocol */
  binary_prot = 4   /* binary protocol */
};
108 
109 /**
110  *  concurrency structure
111  *
112  *  Each thread has a libevent to manage the events of network.
113  *  Each thread has one or more self-governed concurrencies;
114  *  each concurrency has one or more socket connections. This
115  *  concurrency structure includes all the private variables of
116  *  the concurrency.
117  */
118 typedef struct conn
119 {
120   uint32_t conn_idx;             /* connection index in the thread */
121   int sfd;                  /* current tcp sock handler of the connection structure */
122   int udpsfd;               /* current udp sock handler of the connection structure*/
123   int state;                /* state of the connection */
124   struct event event;       /* event for libevent */
125   short ev_flags;           /* event flag for libevent */
126   short which;              /* which events were just triggered */
127   bool change_sfd;          /* whether change sfd */
128 
129   int *tcpsfd;              /* TCP sock array */
130   uint32_t total_sfds;           /* how many socks in the tcpsfd array */
131   uint32_t alive_sfds;           /* alive socks */
132   uint32_t cur_idx;              /* current sock index in tcpsfd array */
133 
134   ms_cmdstat_t precmd;      /* previous command state */
135   ms_cmdstat_t currcmd;     /* current command state */
136 
137   char *rbuf;               /* buffer to read commands into */
138   char *rcurr;              /* but if we parsed some already, this is where we stopped */
139   int rsize;                /* total allocated size of rbuf */
140   int rbytes;               /* how much data, starting from rcur, do we have unparsed */
141 
142   bool readval;             /* read value state, read known data size */
143   int rvbytes;              /* total value size need to read */
144 
145   char *wbuf;               /* buffer to write commands out */
146   char *wcurr;              /* for multi-get, where we stopped */
147   int wsize;                /* total allocated size of wbuf */
148   bool ctnwrite;            /* continue to write */
149 
150   /* data for the mwrite state */
151   struct iovec *iov;
152   int iovsize;              /* number of elements allocated in iov[] */
153   int iovused;              /* number of elements used in iov[] */
154 
155   struct msghdr *msglist;
156   int msgsize;              /* number of elements allocated in msglist[] */
157   int msgused;              /* number of elements used in msglist[] */
158   int msgcurr;              /* element in msglist[] being transmitted now */
159   int msgbytes;             /* number of bytes in current msg */
160 
161   /* data for UDP clients */
162   bool udp;                          /* is this is a UDP "connection" */
163   uint32_t request_id;                   /* UDP request ID of current operation, if this is a UDP "connection" */
164   uint8_t *hdrbuf;                  /* udp packet headers */
165   int hdrsize;                      /* number of headers' worth of space is allocated */
166   struct  sockaddr srv_recv_addr;   /* Sent the most recent request to which server */
167   socklen_t srv_recv_addr_size;
168 
169   /* udp read buffer */
170   char *rudpbuf;                    /* buffer to read commands into for udp */
171   int rudpsize;                     /* total allocated size of rudpbuf */
172   int rudpbytes;                    /* how much data, starting from rudpbuf */
173 
174   /* order udp packet */
175   ms_udppkt_t *udppkt;              /* the offset of udp packet in rudpbuf */
176   int packets;                      /* number of total packets need to read */
177   int recvpkt;                      /* number of received packets */
178   int pktcurr;                      /* current packet in rudpbuf being ordered */
179   int ordcurr;                      /* current ordered packet */
180 
181   ms_task_item_t *item_win;         /* task sequence */
182   int win_size;                     /* current task window size */
183   uint64_t set_cursor;              /* current set item index in the item window */
184   ms_task_t curr_task;              /* current running task */
185   ms_mlget_task_t mlget_task;       /* multi-get task */
186 
187   int warmup_num;                   /* to run how many warm up operations*/
188   int remain_warmup_num;            /* left how many warm up operations to run */
189   int64_t exec_num;                 /* to run how many task operations */
190   int64_t remain_exec_num;          /* how many remained task operations to run */
191 
192   /* response time statistic and time out control */
193   struct timeval start_time;        /* start time of current operation(s) */
194   struct timeval end_time;          /* end time of current operation(s) */
195 
196   /* Binary protocol stuff */
197   protocol_binary_response_header binary_header;    /* local temporary binary header */
198   enum protocol protocol;                           /* which protocol this connection speaks */
199 } ms_conn_t;
200 
/**
 * Generate a key prefix.
 *
 * @return the 64-bit key prefix; how it is produced is defined by the
 *         implementation, which is not visible in this header.
 */
uint64_t ms_get_key_prefix(void);
203 
204 
205 /**
206  * setup a connection, each connection structure of each
207  * thread must call this function to initialize.
208  */
209 int ms_setup_conn(ms_conn_t *c);
210 
211 
212 /* after one operation completes, reset the connection */
213 void ms_reset_conn(ms_conn_t *c, bool timeout);
214 
215 
216 /**
217  *  reconnect several disconnected socks in the connection
218  *  structure, the ever-1-second timer of the thread will check
219  *  whether some socks in the connections disconnect. if
220  *  disconnect, reconnect the sock.
221  */
222 int ms_reconn_socks(ms_conn_t *c);
223 
224 
225 /* used to send set command to server */
226 int ms_mcd_set(ms_conn_t *c, ms_task_item_t *item);
227 
228 
229 /* used to send the get command to server */
230 int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item);
231 
232 
233 /* used to send the multi-get command to server */
234 int ms_mcd_mlget(ms_conn_t *c);
235 
236 
237 #ifdef __cplusplus
238 }
239 #endif
240 
241 #endif /* end of MS_CONN_H */
242