1 /*
2 +----------------------------------------------------------------------+
3 | Yar - Light, concurrent RPC framework |
4 +----------------------------------------------------------------------+
5 | Copyright (c) 2012-2013 The PHP Group |
6 +----------------------------------------------------------------------+
7 | This source file is subject to version 3.01 of the PHP license, |
8 | that is bundled with this package in the file LICENSE, and is |
9 | available through the world-wide-web at the following url: |
10 | http://www.php.net/license/3_01.txt |
11 | If you did not receive a copy of the PHP license and are unable to |
12 | obtain it through the world-wide-web, please send a note to |
13 | license@php.net so we can mail you a copy immediately. |
14 +----------------------------------------------------------------------+
15 | Author: Xinchen Hui <laruence@php.net> |
16 +----------------------------------------------------------------------+
17 */
18
19 /* $Id$ */
20
21 #ifdef HAVE_CONFIG_H
22 #include "config.h"
23 #endif
24
25 #include "php.h"
26 #include "php_network.h"
27
28 #ifndef PHP_WIN32
29 #define php_select(m, r, w, e, t) select(m, r, w, e, t)
30 #else
31 #include "win32/select.h"
32 #include "win32/time.h"
33 #endif
34
35 #include "php_yar.h"
36 #include "yar_protocol.h"
37 #include "yar_request.h"
38 #include "yar_response.h"
39 #include "yar_transport.h"
40 #include "yar_packager.h"
41 #include "yar_exception.h"
42 #include "ext/standard/php_var.h" /* for serialize */
43
44 #ifdef ENABLE_EPOLL
45 #include <sys/epoll.h>
46 #define YAR_EPOLL_MAX_SIZE 128
47 #endif
48
/* Upper bound accepted for a response body: 10 * 1024 * 1024 bytes.
 * Parenthesized so the expansion stays one operand in any expression. */
#define MAX_BODY_LEN (1024 * 1024 * 10) /* 10 M */
50
51 typedef struct _yar_socket_data_t {
52 char persistent;
53 php_stream *stream;
54 } yar_socket_data_t;
55
/* Connect the socket transport to `address`.
 *
 * self    - transport whose data is a yar_socket_data_t
 * address - stream transport address handed to php_stream_xport_create()
 * options - when YAR_PROTOCOL_PERSISTENT is set, the stream is created under
 *           the persistent key "yar_<address>"
 * err_msg - receives an spprintf()-allocated message on failure
 *
 * Returns 1 on success (the stream is stored in self->data and switched to
 * non-blocking mode), 0 on failure.
 */
int php_yar_socket_open(yar_transport_interface_t *self, zend_string *address, long options, char **err_msg) /* {{{ */ {
	yar_socket_data_t *data = (yar_socket_data_t *)self->data;
	struct timeval tv;
	php_stream *stream = NULL;
	zend_string *errstr = NULL;
	char *persistent_key = NULL;
	int err;

	/* connect_timeout is split as milliseconds: whole seconds, then the
	 * remainder converted to microseconds (same conversion exec/send use) */
	tv.tv_sec = (zend_ulong)(YAR_G(connect_timeout) / 1000);
	tv.tv_usec = (zend_ulong)((YAR_G(connect_timeout) % 1000) * 1000);

	if (options & YAR_PROTOCOL_PERSISTENT) {
		data->persistent = 1;
		spprintf(&persistent_key, 0, "yar_%s", ZSTR_VAL(address));
	} else {
		data->persistent = 0;
	}

	stream = php_stream_xport_create(ZSTR_VAL(address), ZSTR_LEN(address), 0, STREAM_XPORT_CLIENT|STREAM_XPORT_CONNECT, persistent_key, &tv, NULL, &errstr, &err);

	if (persistent_key) {
		efree(persistent_key);
	}

	if (stream == NULL) {
		/* prefer the stream layer's own message; errno may be unrelated by now */
		spprintf(err_msg, 0, "Unable to connect to %s (%s)", ZSTR_VAL(address),
				errstr ? ZSTR_VAL(errstr) : strerror(errno));
		if (errstr) {
			zend_string_release(errstr);
		}
		return 0;
	}

	if (errstr) {
		/* not expected on success, but never leak it */
		zend_string_release(errstr);
	}

	php_stream_set_option(stream, PHP_STREAM_OPTION_BLOCKING, 0, NULL);

#if ZEND_DEBUG
	stream->__exposed++;
#endif

	data->stream = stream;

	return 1;
} /* }}} */
96
/* Tear down a socket transport: close the stream unless it is persistent,
 * then free the private data and the transport itself. Does nothing when
 * the transport carries no data. */
void php_yar_socket_close(yar_transport_interface_t* self) /* {{{ */ {
	yar_socket_data_t *sock = (yar_socket_data_t *)self->data;

	if (sock != NULL) {
		/* persistent streams are deliberately left open */
		if (sock->stream && !sock->persistent) {
			php_stream_close(sock->stream);
		}

		efree(sock);
		efree(self);
	}
}
/* }}} */
114
php_yar_socket_exec(yar_transport_interface_t * self,yar_request_t * request)115 yar_response_t * php_yar_socket_exec(yar_transport_interface_t* self, yar_request_t *request) /* {{{ */ {
116 fd_set rfds;
117 struct timeval tv;
118 yar_header_t *header;
119 yar_response_t *response;
120 int fd, retval, recvd;
121 size_t len = 0, total_recvd = 0;
122 char *msg, buf[RECV_BUF_SIZE], *payload = NULL;
123 yar_socket_data_t *data = (yar_socket_data_t *)self->data;
124
125 response = ecalloc(1, sizeof(yar_response_t));
126
127 FD_ZERO(&rfds);
128 if (SUCCESS == php_stream_cast(data->stream, PHP_STREAM_AS_FD_FOR_SELECT | PHP_STREAM_CAST_INTERNAL, (void*)&fd, 1) && fd >= 0) {
129 PHP_SAFE_FD_SET(fd, &rfds);
130 } else {
131 len = snprintf(buf, sizeof(buf), "Unable cast socket fd form stream (%s)", strerror(errno));
132 php_yar_response_set_error(response, YAR_ERR_TRANSPORT, buf, len);
133 return response;
134 }
135
136 tv.tv_sec = (zend_ulong)(YAR_G(timeout) / 1000);
137 tv.tv_usec = (zend_ulong)((YAR_G(timeout) % 1000) * 1000);
138
139 wait_io:
140 retval = php_select(fd+1, &rfds, NULL, NULL, &tv);
141
142 if (retval == -1) {
143 len = snprintf(buf, sizeof(buf), "Unable to select %d '%s'", fd, strerror(errno));
144 php_yar_response_set_error(response, YAR_ERR_TRANSPORT, buf, len);
145 return response;
146 } else if (retval == 0) {
147 len = snprintf(buf, sizeof(buf), "select timeout %ldms reached", YAR_G(timeout));
148 php_yar_response_set_error(response, YAR_ERR_TRANSPORT, buf, len);
149 return response;
150 }
151
152 if (PHP_SAFE_FD_ISSET(fd, &rfds)) {
153 zval *retval, ret;
154 if (!payload) {
155 if ((recvd = (php_stream_xport_recvfrom(data->stream, buf, sizeof(buf), 0, NULL, NULL, NULL))) > 0) {
156 if (recvd < sizeof(yar_header_t)) {
157 php_yar_error(response, YAR_ERR_PROTOCOL, "malformed response header, insufficient bytes recieved");
158 return response;
159 }
160 if (!(header = php_yar_protocol_parse(buf))) {
161 php_yar_error(response, YAR_ERR_PROTOCOL, "malformed response header '%.32s'", payload);
162 return response;
163 }
164
165 if (header->body_len > MAX_BODY_LEN) {
166 php_yar_error(response, YAR_ERR_PROTOCOL, "response body too large %u", header->body_len);
167 return response;
168 }
169
170 payload = emalloc(header->body_len);
171 len = header->body_len;
172 total_recvd = recvd - sizeof(yar_header_t);
173
174 memcpy(payload, buf + sizeof(yar_header_t), total_recvd);
175
176 if (recvd < (sizeof(yar_header_t) + len)) {
177 goto wait_io;
178 }
179 } else if (recvd == 0) {
180 php_yar_response_set_error(response, YAR_ERR_TRANSPORT, ZEND_STRL("server closed connection prematurely"));
181 return response;
182 } else {
183 /* this should never happen */
184 goto wait_io;
185 }
186 } else {
187 if ((recvd = php_stream_xport_recvfrom(data->stream, payload + total_recvd, len - total_recvd, 0, NULL, NULL, NULL)) > 0) {
188 total_recvd += recvd;
189 } else if (recvd == 0) {
190 php_yar_response_set_error(response, YAR_ERR_TRANSPORT, ZEND_STRL("server closed connection prematurely"));
191 efree(payload);
192 return response;
193 }
194 if (total_recvd < len) {
195 goto wait_io;
196 }
197 }
198
199 if (len) {
200 if (!(retval = php_yar_packager_unpack(payload, len, &msg, &ret))) {
201 php_yar_response_set_error(response, YAR_ERR_PACKAGER, msg, strlen(msg));
202 efree(payload);
203 efree(msg);
204 return response;
205 }
206
207 php_yar_response_map_retval(response, retval);
208
209 DEBUG_C(ZEND_ULONG_FMT": server response content packaged by '%.*s', len '%ld', content '%.32s'",
210 response->id, 7, payload, header->body_len, payload + 8);
211
212 efree(payload);
213 zval_ptr_dtor(retval);
214 } else {
215 php_yar_response_set_error(response, YAR_ERR_EMPTY_RESPONSE, ZEND_STRL("empty response"));
216 }
217 return response;
218 } else {
219 PHP_SAFE_FD_SET(fd, &rfds);
220 goto wait_io;
221 }
222 } /* }}} */
223
/* Pack `request` and write header + payload to the transport's stream.
 *
 * The header and the leading part of the payload are staged in a
 * SEND_BUF_SIZE buffer; anything that does not fit is sent directly from the
 * packed payload. Every write is preceded by a select() for writability
 * (timeout taken from YAR_G(timeout), split as milliseconds). Short writes,
 * including one that stops inside the staged buffer, resume where they
 * stopped. For non-persistent connections the write side is shut down once
 * everything has been sent.
 *
 * Returns 1 on success, 0 on failure with *msg set to an allocated message
 * (when packing fails, *msg is whatever php_yar_request_pack() stored).
 */
int php_yar_socket_send(yar_transport_interface_t* self, yar_request_t *request, char **msg) /* {{{ */ {
	fd_set wfds;
	zend_string *payload;
	struct timeval tv;
	int ret, fd, retval;
	char buf[SEND_BUF_SIZE];
	yar_header_t header = {0};
	size_t staged_len, total_len, sent = 0;
	yar_socket_data_t *data = (yar_socket_data_t *)self->data;

	if (SUCCESS != php_stream_cast(data->stream, PHP_STREAM_AS_FD_FOR_SELECT | PHP_STREAM_CAST_INTERNAL, (void*)&fd, 1) || fd < 0) {
		spprintf(msg, 0, "Unable cast socket fd form stream (%s)", strerror(errno));
		return 0;
	}

	if (!(payload = php_yar_request_pack(request, msg))) {
		return 0;
	}

	DEBUG_C(ZEND_ULONG_FMT": pack request by '%.*s', result len '%ld', content: '%.32s'",
			request->id, 7, ZSTR_VAL(payload), ZSTR_LEN(payload), ZSTR_VAL(payload) + 8);

	/* for tcp/unix RPC, we need another way to support auth */
	php_yar_protocol_render(&header, request->id, "Yar TCP Client", NULL, ZSTR_LEN(payload), data->persistent? YAR_PROTOCOL_PERSISTENT : 0);

	/* stage the header followed by as much payload as the buffer can hold */
	staged_len = ZSTR_LEN(payload);
	if (staged_len > sizeof(buf) - sizeof(yar_header_t)) {
		staged_len = sizeof(buf) - sizeof(yar_header_t);
	}
	memcpy(buf, (char *)&header, sizeof(yar_header_t));
	memcpy(buf + sizeof(yar_header_t), ZSTR_VAL(payload), staged_len);
	staged_len += sizeof(yar_header_t);
	total_len = sizeof(yar_header_t) + ZSTR_LEN(payload);

	tv.tv_sec = (zend_ulong)(YAR_G(timeout) / 1000);
	tv.tv_usec = (zend_ulong)((YAR_G(timeout) % 1000) * 1000);

	while (sent < total_len) {
		const char *chunk;
		size_t chunk_len;

		/* select() rewrites the set, so arm it before every wait */
		FD_ZERO(&wfds);
		PHP_SAFE_FD_SET(fd, &wfds);

		retval = php_select(fd+1, NULL, &wfds, NULL, &tv);

		if (retval == -1) {
			zend_string_release(payload);
			spprintf(msg, 0, "select error '%s'", strerror(errno));
			return 0;
		} else if (retval == 0) {
			zend_string_release(payload);
			spprintf(msg, 0, "select timeout %ldms reached", YAR_G(timeout));
			return 0;
		}

		if (!PHP_SAFE_FD_ISSET(fd, &wfds)) {
			continue;
		}

		if (sent < staged_len) {
			/* still inside the staged header + leading payload */
			chunk = buf + sent;
			chunk_len = staged_len - sent;
		} else {
			/* remainder goes out straight from the packed payload */
			chunk = ZSTR_VAL(payload) + (sent - sizeof(yar_header_t));
			chunk_len = total_len - sent;
		}

		ret = php_stream_xport_sendto(data->stream, chunk, chunk_len, 0, NULL, 0);
		if (ret < 0) {
			/* a failed write after select() reported writability is treated as fatal */
			zend_string_release(payload);
			spprintf(msg, 0, "unable to send data");
			return 0;
		}

		sent += (size_t)ret;
	}

	if (!data->persistent) {
		php_stream_xport_shutdown(data->stream, SHUT_WR);
	}

	zend_string_release(payload);

	return 1;
} /* }}} */
325
/* Option hook of the socket transport: no argument is inspected and 1 is
 * returned unconditionally. */
int php_yar_socket_setopt(yar_transport_interface_t* self, long type, void *value, void *addtional) /* {{{ */ {
	(void)self;
	(void)type;
	(void)value;
	(void)addtional;

	return 1;
} /* }}} */
329
php_yar_socket_init()330 yar_transport_interface_t * php_yar_socket_init() /* {{{ */ {
331 yar_socket_data_t *data;
332 yar_transport_interface_t *self;
333
334 self = emalloc(sizeof(yar_transport_interface_t));
335 self->data = data = ecalloc(1, sizeof(yar_socket_data_t));
336
337 self->open = php_yar_socket_open;
338 self->send = php_yar_socket_send;
339 self->exec = php_yar_socket_exec;
340 self->setopt = php_yar_socket_setopt;
341 self->calldata = NULL;
342 self->close = php_yar_socket_close;
343
344 return self;
345 } /* }}} */
346
/* Destroy hook of the socket transport. Intentionally empty: the transport
 * and its data are freed by php_yar_socket_close(). */
void php_yar_socket_destroy(yar_transport_interface_t *self) /* {{{ */ {
	(void)self;
} /* }}} */
349
/* {{{ yar_transport_t yar_transport_socket
 * Descriptor of the socket transport: its name followed by the init and
 * destroy entry points defined in this file.
 */
const yar_transport_t yar_transport_socket = {
	"sock",                 /* transport name */
	php_yar_socket_init,    /* creates a transport instance */
	php_yar_socket_destroy, /* destroy hook (empty for sockets) */
	NULL                    /* NOTE(review): fourth slot left unset; its meaning is declared in yar_transport.h - confirm */
}; /* }}} */
358
359 /*
360 * Local variables:
361 * tab-width: 4
362 * c-basic-offset: 4
363 * End:
364 * vim600: noet sw=4 ts=4 fdm=marker
365 * vim<600: noet sw=4 ts=4
366 */
367