1 /* -*- c-basic-offset: 2 -*- */
2 /*
3   Copyright(C) 2009-2016 Brazil
4 
5   This library is free software; you can redistribute it and/or
6   modify it under the terms of the GNU Lesser General Public
7   License version 2.1 as published by the Free Software Foundation.
8 
9   This library is distributed in the hope that it will be useful,
10   but WITHOUT ANY WARRANTY; without even the implied warranty of
11   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12   Lesser General Public License for more details.
13 
14   You should have received a copy of the GNU Lesser General Public
15   License along with this library; if not, write to the Free Software
16   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1335  USA
17 */
18 
19 #pragma once
20 
21 #include "grn.h"
22 #include "grn_str.h"
23 #include "grn_hash.h"
24 
25 #ifdef HAVE_NETDB_H
26 #include <netdb.h>
27 #endif /* HAVE_NETDB_H */
28 
29 #ifdef __cplusplus
30 extern "C" {
31 #endif
32 
33 /******* grn_com_queue ********/
34 
35 typedef struct _grn_com_queue grn_com_queue;
36 typedef struct _grn_com_queue_entry grn_com_queue_entry;
37 
38 #define GRN_COM_QUEUE_BINSIZE (0x100)
39 
40 struct _grn_com_queue_entry {
41   grn_obj obj;
42   struct _grn_com_queue_entry *next;
43 };
44 
45 struct _grn_com_queue {
46   grn_com_queue_entry *bins[GRN_COM_QUEUE_BINSIZE];
47   grn_com_queue_entry *next;
48   grn_com_queue_entry **tail;
49   uint8_t first;
50   uint8_t last;
51   grn_critical_section cs;
52 };
53 
54 #define GRN_COM_QUEUE_INIT(q) do {\
55   (q)->next = NULL;\
56   (q)->tail = &(q)->next;\
57   (q)->first = 0;\
58   (q)->last = 0;\
59   CRITICAL_SECTION_INIT((q)->cs);\
60 } while (0)
61 
62 #define GRN_COM_QUEUE_EMPTYP(q) (((q)->first == (q)->last) && !(q)->next)
63 
64 GRN_API grn_rc grn_com_queue_enque(grn_ctx *ctx, grn_com_queue *q, grn_com_queue_entry *e);
65 GRN_API grn_com_queue_entry *grn_com_queue_deque(grn_ctx *ctx, grn_com_queue *q);
66 
67 /******* grn_com ********/
68 
69 #ifdef USE_SELECT
70 # ifdef HAVE_SYS_SELECT_H
71 #  include <sys/select.h>
72 # endif /* HAVE_SYS_SELECT_H */
73 # define GRN_COM_POLLIN  1
74 # define GRN_COM_POLLOUT 2
75 #else /* USE_SELECT */
76 # ifdef USE_EPOLL
77 #  include <sys/epoll.h>
78 #  define GRN_COM_POLLIN  EPOLLIN
79 #  define GRN_COM_POLLOUT EPOLLOUT
80 # else /* USE_EPOLL */
81 #  ifdef USE_KQUEUE
82 #   include <sys/event.h>
83 #   define GRN_COM_POLLIN  EVFILT_READ
84 #   define GRN_COM_POLLOUT EVFILT_WRITE
85 #  else /* USE_KQUEUE */
86 #   include <poll.h>
87 #   define GRN_COM_POLLIN  POLLIN
88 #   define GRN_COM_POLLOUT POLLOUT
89 #  endif /* USE_KQUEUE */
90 # endif /* USE_EPOLL */
91 #endif /* USE_SELECT */
92 
93 typedef struct _grn_com grn_com;
94 typedef struct _grn_com_event grn_com_event;
95 typedef struct _grn_com_addr grn_com_addr;
96 typedef void grn_com_callback(grn_ctx *ctx, grn_com_event *, grn_com *);
97 typedef void grn_msg_handler(grn_ctx *ctx, grn_obj *msg);
98 
99 enum {
100   grn_com_ok = 0,
101   grn_com_emem,
102   grn_com_erecv_head,
103   grn_com_erecv_body,
104   grn_com_eproto,
105 };
106 
107 struct _grn_com_addr {
108   uint32_t addr;
109   uint16_t port;
110   uint16_t sid;
111 };
112 
113 struct _grn_com {
114   grn_sock fd;
115   int events;
116   uint16_t sid;
117   uint8_t has_sid;
118   uint8_t closed;
119   grn_com_queue new_;
120   grn_com_event *ev;
121   void *opaque;
122   grn_bool accepting;
123 };
124 
125 struct _grn_com_event {
126   struct _grn_hash *hash;
127   int max_nevents;
128   grn_ctx *ctx;
129   grn_mutex mutex;
130   grn_cond cond;
131   grn_com_queue recv_old;
132   grn_msg_handler *msg_handler;
133   grn_com_addr curr_edge_id;
134   grn_com *acceptor;
135   void *opaque;
136 #ifndef USE_SELECT
137 #ifdef USE_EPOLL
138   int epfd;
139   struct epoll_event *events;
140 #else /* USE_EPOLL */
141 #ifdef USE_KQUEUE
142   int kqfd;
143   struct kevent *events;
144 #else /* USE_KQUEUE */
145   int dummy; /* dummy */
146   struct pollfd *events;
147 #endif /* USE_KQUEUE */
148 #endif /* USE_EPOLL */
149 #endif /* USE_SELECT */
150 };
151 
152 grn_rc grn_com_init(void);
153 void grn_com_fin(void);
154 GRN_API grn_rc grn_com_event_init(grn_ctx *ctx, grn_com_event *ev, int max_nevents, int data_size);
155 GRN_API grn_rc grn_com_event_fin(grn_ctx *ctx, grn_com_event *ev);
156 GRN_API grn_rc grn_com_event_start_accept(grn_ctx *ctx, grn_com_event *ev);
157 grn_rc grn_com_event_stop_accept(grn_ctx *ctx, grn_com_event *ev);
158 grn_rc grn_com_event_add(grn_ctx *ctx, grn_com_event *ev, grn_sock fd, int events, grn_com **com);
159 grn_rc grn_com_event_mod(grn_ctx *ctx, grn_com_event *ev, grn_sock fd, int events, grn_com **com);
160 GRN_API grn_rc grn_com_event_del(grn_ctx *ctx, grn_com_event *ev, grn_sock fd);
161 GRN_API grn_rc grn_com_event_poll(grn_ctx *ctx, grn_com_event *ev, int timeout);
162 grn_rc grn_com_event_each(grn_ctx *ctx, grn_com_event *ev, grn_com_callback *func);
163 
164 /******* grn_com_gqtp ********/
165 
166 #define GRN_COM_PROTO_HTTP   0x47
167 #define GRN_COM_PROTO_GQTP   0xc7
168 #define GRN_COM_PROTO_MBREQ  0x80
169 #define GRN_COM_PROTO_MBRES  0x81
170 
171 typedef struct _grn_com_header grn_com_header;
172 
173 struct _grn_com_header {
174   uint8_t proto;
175   uint8_t qtype;
176   uint16_t keylen;
177   uint8_t level;
178   uint8_t flags;
179   uint16_t status;
180   uint32_t size;
181   uint32_t opaque;
182   uint64_t cas;
183 };
184 
185 GRN_API grn_com *grn_com_copen(grn_ctx *ctx, grn_com_event *ev, const char *dest, int port);
186 GRN_API grn_rc grn_com_sopen(grn_ctx *ctx, grn_com_event *ev,
187                              const char *bind_address, int port,
188                              grn_msg_handler *func, struct hostent *he);
189 
190 GRN_API void grn_com_close_(grn_ctx *ctx, grn_com *com);
191 GRN_API grn_rc grn_com_close(grn_ctx *ctx, grn_com *com);
192 
193 GRN_API grn_rc grn_com_send(grn_ctx *ctx, grn_com *cs,
194                             grn_com_header *header, const char *body, uint32_t size, int flags);
195 grn_rc grn_com_recv(grn_ctx *ctx, grn_com *cs, grn_com_header *header, grn_obj *buf);
196 GRN_API grn_rc grn_com_send_http(grn_ctx *ctx, grn_com *cs, const char *path, uint32_t path_len, int flags);
197 
198 /******* grn_msg ********/
199 
200 typedef struct _grn_msg grn_msg;
201 
202 struct _grn_msg {
203   grn_com_queue_entry qe;
204   union {
205     grn_com *peer;
206     grn_sock fd;
207   } u;
208   grn_ctx *ctx;
209   grn_com_queue *old;
210   grn_com_header header;
211   grn_com_addr edge_id;
212   grn_com *acceptor;
213 };
214 
215 GRN_API grn_rc grn_msg_send(grn_ctx *ctx, grn_obj *msg, int flags);
216 GRN_API grn_obj *grn_msg_open_for_reply(grn_ctx *ctx, grn_obj *query, grn_com_queue *old);
217 GRN_API grn_obj *grn_msg_open(grn_ctx *ctx, grn_com *com, grn_com_queue *old);
218 GRN_API grn_rc grn_msg_set_property(grn_ctx *ctx, grn_obj *obj,
219                                     uint16_t status, uint32_t key_size, uint8_t extra_size);
220 GRN_API grn_rc grn_msg_close(grn_ctx *ctx, grn_obj *msg);
221 
222 /******* grn_edge ********/
223 
224 #define GRN_EDGE_WORKER       0
225 #define GRN_EDGE_COMMUNICATOR 1
226 
227 typedef struct {
228   grn_com_queue_entry eq;
229   grn_ctx ctx;
230   grn_com_queue recv_new;
231   grn_com_queue send_old;
232   grn_com *com;
233   grn_com_addr *addr;
234   grn_msg *msg;
235   uint8_t stat;
236   uint8_t flags;
237   grn_id id;
238 } grn_edge;
239 
240 GRN_VAR grn_hash *grn_edges;
241 GRN_API void grn_edges_init(grn_ctx *ctx, void (*dispatcher)(grn_ctx *ctx, grn_edge *edge));
242 GRN_API void grn_edges_fin(grn_ctx *ctx);
243 GRN_API grn_edge *grn_edges_add(grn_ctx *ctx, grn_com_addr *addr, int *added);
244 grn_edge *grn_edges_add_communicator(grn_ctx *ctx, grn_com_addr *addr);
245 GRN_API void grn_edges_delete(grn_ctx *ctx, grn_edge *edge);
246 void grn_edge_dispatch(grn_ctx *ctx, grn_edge *edge, grn_obj *msg);
247 
248 #ifdef __cplusplus
249 }
250 #endif
251