1 /* -*- c-basic-offset: 2 -*- */ 2 /* 3 Copyright(C) 2009-2016 Brazil 4 5 This library is free software; you can redistribute it and/or 6 modify it under the terms of the GNU Lesser General Public 7 License version 2.1 as published by the Free Software Foundation. 8 9 This library is distributed in the hope that it will be useful, 10 but WITHOUT ANY WARRANTY; without even the implied warranty of 11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 12 Lesser General Public License for more details. 13 14 You should have received a copy of the GNU Lesser General Public 15 License along with this library; if not, write to the Free Software 16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA 17 */ 18 19 #pragma once 20 21 #include "grn.h" 22 #include "grn_str.h" 23 #include "grn_hash.h" 24 25 #ifdef HAVE_NETDB_H 26 #include <netdb.h> 27 #endif /* HAVE_NETDB_H */ 28 29 #ifdef __cplusplus 30 extern "C" { 31 #endif 32 33 /******* grn_com_queue ********/ 34 35 typedef struct _grn_com_queue grn_com_queue; 36 typedef struct _grn_com_queue_entry grn_com_queue_entry; 37 38 #define GRN_COM_QUEUE_BINSIZE (0x100) 39 40 struct _grn_com_queue_entry { 41 grn_obj obj; 42 struct _grn_com_queue_entry *next; 43 }; 44 45 struct _grn_com_queue { 46 grn_com_queue_entry *bins[GRN_COM_QUEUE_BINSIZE]; 47 grn_com_queue_entry *next; 48 grn_com_queue_entry **tail; 49 uint8_t first; 50 uint8_t last; 51 grn_critical_section cs; 52 }; 53 54 #define GRN_COM_QUEUE_INIT(q) do {\ 55 (q)->next = NULL;\ 56 (q)->tail = &(q)->next;\ 57 (q)->first = 0;\ 58 (q)->last = 0;\ 59 CRITICAL_SECTION_INIT((q)->cs);\ 60 } while (0) 61 62 #define GRN_COM_QUEUE_EMPTYP(q) (((q)->first == (q)->last) && !(q)->next) 63 64 GRN_API grn_rc grn_com_queue_enque(grn_ctx *ctx, grn_com_queue *q, grn_com_queue_entry *e); 65 GRN_API grn_com_queue_entry *grn_com_queue_deque(grn_ctx *ctx, grn_com_queue *q); 66 67 /******* grn_com ********/ 68 69 #ifdef USE_SELECT 70 # ifdef HAVE_SYS_SELECT_H 71 # include <sys/select.h> 72 # endif /* HAVE_SYS_SELECT_H */ 73 # define GRN_COM_POLLIN 1 74 # define GRN_COM_POLLOUT 2 75 #else /* USE_SELECT */ 76 # ifdef USE_EPOLL 77 # include <sys/epoll.h> 78 # define GRN_COM_POLLIN EPOLLIN 79 # define GRN_COM_POLLOUT EPOLLOUT 80 # else /* USE_EPOLL */ 81 # ifdef USE_KQUEUE 82 # include <sys/event.h> 83 # define GRN_COM_POLLIN EVFILT_READ 84 # define GRN_COM_POLLOUT EVFILT_WRITE 85 # else /* USE_KQUEUE */ 86 # include <poll.h> 87 # define GRN_COM_POLLIN POLLIN 88 # define GRN_COM_POLLOUT POLLOUT 89 # endif /* USE_KQUEUE */ 90 # endif /* USE_EPOLL */ 91 #endif /* USE_SELECT */ 92 93 typedef struct _grn_com grn_com; 94 typedef struct _grn_com_event grn_com_event; 95 typedef struct _grn_com_addr grn_com_addr; 96 typedef void grn_com_callback(grn_ctx *ctx, grn_com_event *, grn_com *); 97 typedef void grn_msg_handler(grn_ctx *ctx, grn_obj *msg); 98 99 enum { 100 grn_com_ok = 0, 101 grn_com_emem, 102 grn_com_erecv_head, 103 grn_com_erecv_body, 104 grn_com_eproto, 105 }; 106 107 struct _grn_com_addr { 108 uint32_t addr; 109 uint16_t port; 110 uint16_t sid; 111 }; 112 113 struct _grn_com { 114 grn_sock fd; 115 int events; 116 uint16_t sid; 117 uint8_t has_sid; 118 uint8_t closed; 119 grn_com_queue new_; 120 grn_com_event *ev; 121 void *opaque; 122 grn_bool accepting; 123 }; 124 125 struct _grn_com_event { 126 struct _grn_hash *hash; 127 int max_nevents; 128 grn_ctx *ctx; 129 grn_mutex mutex; 130 grn_cond cond; 131 grn_com_queue recv_old; 132 grn_msg_handler *msg_handler; 133 grn_com_addr curr_edge_id; 134 grn_com *acceptor; 135 void *opaque; 136 #ifndef USE_SELECT 137 #ifdef USE_EPOLL 138 int epfd; 139 struct epoll_event *events; 140 #else /* USE_EPOLL */ 141 #ifdef USE_KQUEUE 142 int kqfd; 143 struct kevent *events; 144 #else /* USE_KQUEUE */ 145 int dummy; /* dummy */ 146 struct pollfd *events; 147 #endif /* USE_KQUEUE */ 148 #endif /* USE_EPOLL */ 149 #endif /* USE_SELECT */ 150 }; 151 152 grn_rc grn_com_init(void); 153 void grn_com_fin(void); 154 GRN_API grn_rc grn_com_event_init(grn_ctx *ctx, grn_com_event *ev, int max_nevents, int data_size); 155 GRN_API grn_rc grn_com_event_fin(grn_ctx *ctx, grn_com_event *ev); 156 GRN_API grn_rc grn_com_event_start_accept(grn_ctx *ctx, grn_com_event *ev); 157 grn_rc grn_com_event_stop_accept(grn_ctx *ctx, grn_com_event *ev); 158 grn_rc grn_com_event_add(grn_ctx *ctx, grn_com_event *ev, grn_sock fd, int events, grn_com **com); 159 grn_rc grn_com_event_mod(grn_ctx *ctx, grn_com_event *ev, grn_sock fd, int events, grn_com **com); 160 GRN_API grn_rc grn_com_event_del(grn_ctx *ctx, grn_com_event *ev, grn_sock fd); 161 GRN_API grn_rc grn_com_event_poll(grn_ctx *ctx, grn_com_event *ev, int timeout); 162 grn_rc grn_com_event_each(grn_ctx *ctx, grn_com_event *ev, grn_com_callback *func); 163 164 /******* grn_com_gqtp ********/ 165 166 #define GRN_COM_PROTO_HTTP 0x47 167 #define GRN_COM_PROTO_GQTP 0xc7 168 #define GRN_COM_PROTO_MBREQ 0x80 169 #define GRN_COM_PROTO_MBRES 0x81 170 171 typedef struct _grn_com_header grn_com_header; 172 173 struct _grn_com_header { 174 uint8_t proto; 175 uint8_t qtype; 176 uint16_t keylen; 177 uint8_t level; 178 uint8_t flags; 179 uint16_t status; 180 uint32_t size; 181 uint32_t opaque; 182 uint64_t cas; 183 }; 184 185 GRN_API grn_com *grn_com_copen(grn_ctx *ctx, grn_com_event *ev, const char *dest, int port); 186 GRN_API grn_rc grn_com_sopen(grn_ctx *ctx, grn_com_event *ev, 187 const char *bind_address, int port, 188 grn_msg_handler *func, struct hostent *he); 189 190 GRN_API void grn_com_close_(grn_ctx *ctx, grn_com *com); 191 GRN_API grn_rc grn_com_close(grn_ctx *ctx, grn_com *com); 192 193 GRN_API grn_rc grn_com_send(grn_ctx *ctx, grn_com *cs, 194 grn_com_header *header, const char *body, uint32_t size, int flags); 195 grn_rc grn_com_recv(grn_ctx *ctx, grn_com *cs, grn_com_header *header, grn_obj *buf); 196 GRN_API grn_rc grn_com_send_http(grn_ctx *ctx, grn_com *cs, const char *path, uint32_t path_len, int flags); 197 198 /******* grn_msg ********/ 199 200 typedef struct _grn_msg grn_msg; 201 202 struct _grn_msg { 203 grn_com_queue_entry qe; 204 union { 205 grn_com *peer; 206 grn_sock fd; 207 } u; 208 grn_ctx *ctx; 209 grn_com_queue *old; 210 grn_com_header header; 211 grn_com_addr edge_id; 212 grn_com *acceptor; 213 }; 214 215 GRN_API grn_rc grn_msg_send(grn_ctx *ctx, grn_obj *msg, int flags); 216 GRN_API grn_obj *grn_msg_open_for_reply(grn_ctx *ctx, grn_obj *query, grn_com_queue *old); 217 GRN_API grn_obj *grn_msg_open(grn_ctx *ctx, grn_com *com, grn_com_queue *old); 218 GRN_API grn_rc grn_msg_set_property(grn_ctx *ctx, grn_obj *obj, 219 uint16_t status, uint32_t key_size, uint8_t extra_size); 220 GRN_API grn_rc grn_msg_close(grn_ctx *ctx, grn_obj *msg); 221 222 /******* grn_edge ********/ 223 224 #define GRN_EDGE_WORKER 0 225 #define GRN_EDGE_COMMUNICATOR 1 226 227 typedef struct { 228 grn_com_queue_entry eq; 229 grn_ctx ctx; 230 grn_com_queue recv_new; 231 grn_com_queue send_old; 232 grn_com *com; 233 grn_com_addr *addr; 234 grn_msg *msg; 235 uint8_t stat; 236 uint8_t flags; 237 grn_id id; 238 } grn_edge; 239 240 GRN_VAR grn_hash *grn_edges; 241 GRN_API void grn_edges_init(grn_ctx *ctx, void (*dispatcher)(grn_ctx *ctx, grn_edge *edge)); 242 GRN_API void grn_edges_fin(grn_ctx *ctx); 243 GRN_API grn_edge *grn_edges_add(grn_ctx *ctx, grn_com_addr *addr, int *added); 244 grn_edge *grn_edges_add_communicator(grn_ctx *ctx, grn_com_addr *addr); 245 GRN_API void grn_edges_delete(grn_ctx *ctx, grn_edge *edge); 246 void grn_edge_dispatch(grn_ctx *ctx, grn_edge *edge, grn_obj *msg); 247 248 #ifdef __cplusplus 249 } 250 #endif 251