1 /* 2 * File: ms_conn.h 3 * Author: Mingqiang Zhuang 4 * 5 * Created on February 10, 2009 6 * 7 * (c) Copyright 2009, Schooner Information Technology, Inc. 8 * http://www.schoonerinfotech.com/ 9 * 10 */ 11 #ifndef MS_CONN_H 12 #define MS_CONN_H 13 14 #include <sys/socket.h> 15 #include <netinet/in.h> 16 #include <event.h> 17 #include <netdb.h> 18 19 #include "ms_task.h" 20 #include <libmemcached/memcached/protocol_binary.h> 21 22 #ifdef __cplusplus 23 extern "C" { 24 #endif 25 26 #define DATA_BUFFER_SIZE (1024 * 1024 + 2048) /* read buffer, 1M + 2k, enough for the max value(1M) */ 27 #define WRITE_BUFFER_SIZE (32 * 1024) /* write buffer, 32k */ 28 #define UDP_DATA_BUFFER_SIZE (1 * 1024 * 1024) /* read buffer for UDP, 1M */ 29 #define UDP_MAX_PAYLOAD_SIZE 1400 /* server limit UDP payload size */ 30 #define UDP_MAX_SEND_PAYLOAD_SIZE 1400 /* mtu size is 1500 */ 31 #define UDP_HEADER_SIZE 8 /* UDP header size */ 32 #define MAX_SENDBUF_SIZE (256 * 1024 * 1024) /* Maximum socket buffer size */ 33 #define SOCK_WAIT_TIMEOUT 30 /* maximum waiting time of UDP, 30s */ 34 #define MAX_UDP_PACKET (1 << 16) /* maximum UDP packets, 65536 */ 35 36 /* Initial size of the sendmsg() scatter/gather array. */ 37 #define IOV_LIST_INITIAL 400 38 39 /* Initial number of sendmsg() argument structures to allocate. */ 40 #define MSG_LIST_INITIAL 10 41 42 /* High water marks for buffer shrinking */ 43 #define READ_BUFFER_HIGHWAT (2 * DATA_BUFFER_SIZE) 44 #define UDP_DATA_BUFFER_HIGHWAT (4 * UDP_DATA_BUFFER_SIZE) 45 #define IOV_LIST_HIGHWAT 600 46 #define MSG_LIST_HIGHWAT 100 47 48 /* parse udp header */ 49 #define HEADER_TO_REQID(ptr) ((uint16_t)*ptr * 256 \ 50 + (uint16_t)*(ptr + 1)) 51 #define HEADER_TO_SEQNUM(ptr) ((uint16_t)*(ptr \ 52 + 2) * 256 \ 53 + (uint16_t)*(ptr + 3)) 54 #define HEADER_TO_PACKETS(ptr) ((uint16_t)*(ptr \ 55 + 4) * 256 \ 56 + (uint16_t)*(ptr + 5)) 57 58 /* states of connection */ 59 enum conn_states 60 { 61 conn_read, /* reading in a command line */ 62 conn_write, /* writing out a simple response */ 63 conn_closing /* closing this connection */ 64 }; 65 66 /* returned states of memcached command */ 67 enum mcd_ret 68 { 69 MCD_SUCCESS, /* command success */ 70 MCD_FAILURE, /* command failure */ 71 MCD_UNKNOWN_READ_FAILURE, /* unknown read failure */ 72 MCD_PROTOCOL_ERROR, /* protocol error */ 73 MCD_CLIENT_ERROR, /* client error, wrong command */ 74 MCD_SERVER_ERROR, /* server error, server run command failed */ 75 MCD_DATA_EXISTS, /* object is existent in server */ 76 MCD_NOTSTORED, /* server doesn't set the object successfully */ 77 MCD_STORED, /* server set the object successfully */ 78 MCD_NOTFOUND, /* server not find the object */ 79 MCD_END, /* end of the response of get command */ 80 MCD_DELETED, /* server delete the object successfully */ 81 MCD_STAT /* response of stats command */ 82 }; 83 84 /* used to store the current or previous running command state */ 85 typedef struct cmdstat 86 { 87 int cmd; /* command name */ 88 int retstat; /* return state of this command */ 89 bool isfinish; /* if it read all the response data */ 90 uint64_t key_prefix; /* key prefix */ 91 } ms_cmdstat_t; 92 93 /* udp packet structure */ 94 typedef struct udppkt 95 { 96 uint8_t *header; /* udp header of the packet */ 97 char *data; /* udp data of the packet */ 98 int rbytes; /* number of data in the packet */ 99 int copybytes; /* number of copied data in the packet */ 100 } ms_udppkt_t; 101 102 /* three protocols supported */ 103 enum protocol 104 { 105 ascii_prot = 3, /* ASCII protocol */ 106 binary_prot /* binary protocol */ 107 }; 108 109 /** 110 * concurrency structure 111 * 112 * Each thread has a libevent to manage the events of network. 113 * Each thread has one or more self-governed concurrencies; 114 * each concurrency has one or more socket connections. This 115 * concurrency structure includes all the private variables of 116 * the concurrency. 117 */ 118 typedef struct conn 119 { 120 uint32_t conn_idx; /* connection index in the thread */ 121 int sfd; /* current tcp sock handler of the connection structure */ 122 int udpsfd; /* current udp sock handler of the connection structure*/ 123 int state; /* state of the connection */ 124 struct event event; /* event for libevent */ 125 short ev_flags; /* event flag for libevent */ 126 short which; /* which events were just triggered */ 127 bool change_sfd; /* whether change sfd */ 128 129 int *tcpsfd; /* TCP sock array */ 130 uint32_t total_sfds; /* how many socks in the tcpsfd array */ 131 uint32_t alive_sfds; /* alive socks */ 132 uint32_t cur_idx; /* current sock index in tcpsfd array */ 133 134 ms_cmdstat_t precmd; /* previous command state */ 135 ms_cmdstat_t currcmd; /* current command state */ 136 137 char *rbuf; /* buffer to read commands into */ 138 char *rcurr; /* but if we parsed some already, this is where we stopped */ 139 int rsize; /* total allocated size of rbuf */ 140 int rbytes; /* how much data, starting from rcur, do we have unparsed */ 141 142 bool readval; /* read value state, read known data size */ 143 int rvbytes; /* total value size need to read */ 144 145 char *wbuf; /* buffer to write commands out */ 146 char *wcurr; /* for multi-get, where we stopped */ 147 int wsize; /* total allocated size of wbuf */ 148 bool ctnwrite; /* continue to write */ 149 150 /* data for the mwrite state */ 151 struct iovec *iov; 152 int iovsize; /* number of elements allocated in iov[] */ 153 int iovused; /* number of elements used in iov[] */ 154 155 struct msghdr *msglist; 156 int msgsize; /* number of elements allocated in msglist[] */ 157 int msgused; /* number of elements used in msglist[] */ 158 int msgcurr; /* element in msglist[] being transmitted now */ 159 int msgbytes; /* number of bytes in current msg */ 160 161 /* data for UDP clients */ 162 bool udp; /* is this is a UDP "connection" */ 163 uint32_t request_id; /* UDP request ID of current operation, if this is a UDP "connection" */ 164 uint8_t *hdrbuf; /* udp packet headers */ 165 int hdrsize; /* number of headers' worth of space is allocated */ 166 struct sockaddr srv_recv_addr; /* Sent the most recent request to which server */ 167 socklen_t srv_recv_addr_size; 168 169 /* udp read buffer */ 170 char *rudpbuf; /* buffer to read commands into for udp */ 171 int rudpsize; /* total allocated size of rudpbuf */ 172 int rudpbytes; /* how much data, starting from rudpbuf */ 173 174 /* order udp packet */ 175 ms_udppkt_t *udppkt; /* the offset of udp packet in rudpbuf */ 176 int packets; /* number of total packets need to read */ 177 int recvpkt; /* number of received packets */ 178 int pktcurr; /* current packet in rudpbuf being ordered */ 179 int ordcurr; /* current ordered packet */ 180 181 ms_task_item_t *item_win; /* task sequence */ 182 int win_size; /* current task window size */ 183 uint64_t set_cursor; /* current set item index in the item window */ 184 ms_task_t curr_task; /* current running task */ 185 ms_mlget_task_t mlget_task; /* multi-get task */ 186 187 int warmup_num; /* to run how many warm up operations*/ 188 int remain_warmup_num; /* left how many warm up operations to run */ 189 int64_t exec_num; /* to run how many task operations */ 190 int64_t remain_exec_num; /* how many remained task operations to run */ 191 192 /* response time statistic and time out control */ 193 struct timeval start_time; /* start time of current operation(s) */ 194 struct timeval end_time; /* end time of current operation(s) */ 195 196 /* Binary protocol stuff */ 197 protocol_binary_response_header binary_header; /* local temporary binary header */ 198 enum protocol protocol; /* which protocol this connection speaks */ 199 } ms_conn_t; 200 201 /* used to generate the key prefix */ 202 uint64_t ms_get_key_prefix(void); 203 204 205 /** 206 * setup a connection, each connection structure of each 207 * thread must call this function to initialize. 208 */ 209 int ms_setup_conn(ms_conn_t *c); 210 211 212 /* after one operation completes, reset the connection */ 213 void ms_reset_conn(ms_conn_t *c, bool timeout); 214 215 216 /** 217 * reconnect several disconnected socks in the connection 218 * structure, the ever-1-second timer of the thread will check 219 * whether some socks in the connections disconnect. if 220 * disconnect, reconnect the sock. 221 */ 222 int ms_reconn_socks(ms_conn_t *c); 223 224 225 /* used to send set command to server */ 226 int ms_mcd_set(ms_conn_t *c, ms_task_item_t *item); 227 228 229 /* used to send the get command to server */ 230 int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item); 231 232 233 /* used to send the multi-get command to server */ 234 int ms_mcd_mlget(ms_conn_t *c); 235 236 237 #ifdef __cplusplus 238 } 239 #endif 240 241 #endif /* end of MS_CONN_H */ 242