1 /*
2   +----------------------------------------------------------------------+
3   | Yar - Light, concurrent RPC framework                                |
4   +----------------------------------------------------------------------+
5   | Copyright (c) 2012-2013 The PHP Group                                |
6   +----------------------------------------------------------------------+
7   | This source file is subject to version 3.01 of the PHP license,      |
8   | that is bundled with this package in the file LICENSE, and is        |
9   | available through the world-wide-web at the following url:           |
10   | http://www.php.net/license/3_01.txt                                  |
11   | If you did not receive a copy of the PHP license and are unable to   |
12   | obtain it through the world-wide-web, please send a note to          |
13   | license@php.net so we can mail you a copy immediately.               |
14   +----------------------------------------------------------------------+
15   | Author:  Xinchen Hui   <laruence@php.net>                            |
16   +----------------------------------------------------------------------+
17 */
18 
19 /* $Id$ */
20 
21 #ifdef HAVE_CONFIG_H
22 #include "config.h"
23 #endif
24 
25 #include "php.h"
26 #include "php_network.h"
27 
28 #ifndef PHP_WIN32
29 #define php_select(m, r, w, e, t)   select(m, r, w, e, t)
30 #else
31 #include "win32/select.h"
32 #include "win32/time.h"
33 #endif
34 
35 #include "php_yar.h"
36 #include "yar_protocol.h"
37 #include "yar_request.h"
38 #include "yar_response.h"
39 #include "yar_transport.h"
40 #include "yar_packager.h"
41 #include "yar_exception.h"
42 #include "ext/standard/php_var.h" /* for serialize */
43 
44 #ifdef ENABLE_EPOLL
45 #include <sys/epoll.h>
46 #define YAR_EPOLL_MAX_SIZE 128
47 #endif
48 
49 #define MAX_BODY_LEN 1024 * 1024 * 10 /* 10 M */
50 
51 typedef struct _yar_socket_data_t {
52 	char persistent;
53 	php_stream *stream;
54 } yar_socket_data_t;
55 
php_yar_socket_open(yar_transport_interface_t * self,zend_string * address,long options,char ** err_msg)56 int php_yar_socket_open(yar_transport_interface_t *self, zend_string *address, long options, char **err_msg) /* {{{ */ {
57 	yar_socket_data_t *data = (yar_socket_data_t *)self->data;
58 	struct timeval tv;
59 	php_stream *stream = NULL;
60 	zend_string *errstr = NULL;
61 	char *persistent_key = NULL;
62 	int err;
63 
64 	tv.tv_sec = (zend_ulong)(YAR_G(connect_timeout) / 1000);
65 	tv.tv_usec = (zend_ulong)(YAR_G(connect_timeout) % 1000);
66 
67 	if (options & YAR_PROTOCOL_PERSISTENT) {
68 		data->persistent = 1;
69 		spprintf(&persistent_key, 0, "yar_%s", ZSTR_VAL(address));
70 	} else {
71 		data->persistent = 0;
72 	}
73 
74 	stream = php_stream_xport_create(ZSTR_VAL(address), ZSTR_LEN(address), 0, STREAM_XPORT_CLIENT|STREAM_XPORT_CONNECT, persistent_key, &tv, NULL, &errstr, &err);
75 
76 	if (persistent_key) {
77 		efree(persistent_key);
78 	}
79 
80 	if (stream == NULL) {
81 		spprintf(err_msg, 0, "Unable to connect to %s (%s)", ZSTR_VAL(address), strerror(errno));
82 		efree(errstr);
83 		return 0;
84 	}
85 
86 	php_stream_set_option(stream, PHP_STREAM_OPTION_BLOCKING, 0, NULL);
87 
88 #if ZEND_DEBUG
89 	stream->__exposed++;
90 #endif
91 
92 	data->stream = stream;
93 
94 	return 1;
95 } /* }}} */
96 
php_yar_socket_close(yar_transport_interface_t * self)97 void php_yar_socket_close(yar_transport_interface_t* self) /* {{{ */ {
98 	yar_socket_data_t *data = (yar_socket_data_t *)self->data;
99 
100 	if (!data) {
101 		return;
102 	}
103 
104 	if (!data->persistent && data->stream) {
105 		php_stream_close(data->stream);
106 	}
107 
108 	efree(data);
109 	efree(self);
110 
111 	return;
112 }
113 /* }}} */
114 
php_yar_socket_exec(yar_transport_interface_t * self,yar_request_t * request)115 yar_response_t * php_yar_socket_exec(yar_transport_interface_t* self, yar_request_t *request) /* {{{ */ {
116 	fd_set rfds;
117 	struct timeval tv;
118 	yar_header_t *header;
119 	yar_response_t *response;
120 	int fd, retval, recvd;
121    	size_t len = 0, total_recvd = 0;
122 	char *msg, buf[RECV_BUF_SIZE], *payload = NULL;
123 	yar_socket_data_t *data = (yar_socket_data_t *)self->data;
124 
125 	response = ecalloc(1, sizeof(yar_response_t));
126 
127 	FD_ZERO(&rfds);
128 	if (SUCCESS == php_stream_cast(data->stream, PHP_STREAM_AS_FD_FOR_SELECT | PHP_STREAM_CAST_INTERNAL, (void*)&fd, 1) && fd >= 0) {
129 		PHP_SAFE_FD_SET(fd, &rfds);
130 	} else {
131 		len = snprintf(buf, sizeof(buf), "Unable cast socket fd form stream (%s)", strerror(errno));
132 		php_yar_response_set_error(response, YAR_ERR_TRANSPORT, buf, len);
133 		return response;
134 	}
135 
136 	tv.tv_sec = (zend_ulong)(YAR_G(timeout) / 1000);
137 	tv.tv_usec = (zend_ulong)((YAR_G(timeout) % 1000) * 1000);
138 
139 wait_io:
140 	retval = php_select(fd+1, &rfds, NULL, NULL, &tv);
141 
142 	if (retval == -1) {
143 		len = snprintf(buf, sizeof(buf), "Unable to select %d '%s'", fd, strerror(errno));
144 		php_yar_response_set_error(response, YAR_ERR_TRANSPORT, buf, len);
145 		return response;
146 	} else if (retval == 0) {
147 		len = snprintf(buf, sizeof(buf), "select timeout %ldms reached", YAR_G(timeout));
148 		php_yar_response_set_error(response, YAR_ERR_TRANSPORT, buf, len);
149 		return response;
150 	}
151 
152 	if (PHP_SAFE_FD_ISSET(fd, &rfds)) {
153 		zval *retval, ret;
154 		if (!payload) {
155 			if ((recvd = (php_stream_xport_recvfrom(data->stream, buf, sizeof(buf), 0, NULL, NULL, NULL))) > 0) {
156 				if (recvd < sizeof(yar_header_t)) {
157 					php_yar_error(response, YAR_ERR_PROTOCOL, "malformed response header, insufficient bytes recieved");
158 					return response;
159 				}
160 				if (!(header = php_yar_protocol_parse(buf))) {
161 					php_yar_error(response, YAR_ERR_PROTOCOL, "malformed response header '%.32s'", payload);
162 					return response;
163 				}
164 
165 				if (header->body_len > MAX_BODY_LEN) {
166 					php_yar_error(response, YAR_ERR_PROTOCOL, "response body too large %u", header->body_len);
167 					return response;
168 				}
169 
170 				payload = emalloc(header->body_len);
171 				len = header->body_len;
172 				total_recvd  = recvd - sizeof(yar_header_t);
173 
174 				memcpy(payload, buf + sizeof(yar_header_t), total_recvd);
175 
176 				if (recvd < (sizeof(yar_header_t) + len)) {
177 					goto wait_io;
178 				}
179 			} else if (recvd == 0) {
180 				php_yar_response_set_error(response, YAR_ERR_TRANSPORT, ZEND_STRL("server closed connection prematurely"));
181 				return response;
182 			} else {
183 				/* this should never happen */
184 				goto wait_io;
185 			}
186 		} else {
187 			if ((recvd = php_stream_xport_recvfrom(data->stream, payload + total_recvd, len - total_recvd, 0, NULL, NULL, NULL)) > 0) {
188 				total_recvd += recvd;
189 			} else if (recvd == 0) {
190 				php_yar_response_set_error(response, YAR_ERR_TRANSPORT, ZEND_STRL("server closed connection prematurely"));
191 				efree(payload);
192 				return response;
193 			}
194 			if (total_recvd < len) {
195 				goto wait_io;
196 			}
197 		}
198 
199 		if (len) {
200 			if (!(retval = php_yar_packager_unpack(payload, len, &msg, &ret))) {
201 				php_yar_response_set_error(response, YAR_ERR_PACKAGER, msg, strlen(msg));
202 				efree(payload);
203 				efree(msg);
204 				return response;
205 			}
206 
207 			php_yar_response_map_retval(response, retval);
208 
209 			DEBUG_C(ZEND_ULONG_FMT": server response content packaged by '%.*s', len '%ld', content '%.32s'",
210 					response->id, 7, payload, header->body_len, payload + 8);
211 
212 			efree(payload);
213 			zval_ptr_dtor(retval);
214 		} else {
215 			php_yar_response_set_error(response, YAR_ERR_EMPTY_RESPONSE, ZEND_STRL("empty response"));
216 		}
217 		return response;
218 	} else {
219 		PHP_SAFE_FD_SET(fd, &rfds);
220 		goto wait_io;
221 	}
222 } /* }}} */
223 
php_yar_socket_send(yar_transport_interface_t * self,yar_request_t * request,char ** msg)224 int php_yar_socket_send(yar_transport_interface_t* self, yar_request_t *request, char **msg) /* {{{ */ {
225 	fd_set rfds;
226 	zend_string *payload;
227 	struct timeval tv;
228 	int ret = -1, fd, retval;
229 	char buf[SEND_BUF_SIZE];
230 	yar_header_t header = {0};
231 	yar_socket_data_t *data = (yar_socket_data_t *)self->data;
232 
233 	FD_ZERO(&rfds);
234 	if (SUCCESS == php_stream_cast(data->stream, PHP_STREAM_AS_FD_FOR_SELECT | PHP_STREAM_CAST_INTERNAL, (void*)&fd, 1) && fd >= 0) {
235 		PHP_SAFE_FD_SET(fd, &rfds);
236 	} else {
237 		spprintf(msg, 0, "Unable cast socket fd form stream (%s)", strerror(errno));
238 		return 0;
239 	}
240 
241 	if (!(payload = php_yar_request_pack(request, msg))) {
242 		return 0;
243 	}
244 
245 	DEBUG_C(ZEND_ULONG_FMT": pack request by '%.*s', result len '%ld', content: '%.32s'",
246 			request->id, 7, ZSTR_VAL(payload), ZSTR_LEN(payload), ZSTR_VAL(payload) + 8);
247 
248 	/* for tcp/unix RPC, we need another way to supports auth */
249 	php_yar_protocol_render(&header, request->id, "Yar TCP Client", NULL, ZSTR_LEN(payload), data->persistent? YAR_PROTOCOL_PERSISTENT : 0);
250 
251 	memcpy(buf, (char *)&header, sizeof(yar_header_t));
252 
253 	tv.tv_sec = (zend_ulong)(YAR_G(timeout) / 1000);
254 	tv.tv_usec = (zend_ulong)((YAR_G(timeout) % 1000) * 1000);
255 
256 	retval = php_select(fd+1, NULL, &rfds, NULL, &tv);
257 
258 	if (retval == -1) {
259 		zend_string_release(payload);
260 		spprintf(msg, 0, "select error '%s'", strerror(errno));
261 		return 0;
262 	} else if (retval == 0) {
263 		zend_string_release(payload);
264 		spprintf(msg, 0, "select timeout %ldms reached", YAR_G(timeout));
265 		return 0;
266 	}
267 
268 	if (PHP_SAFE_FD_ISSET(fd, &rfds)) {
269 		size_t bytes_left = 0, bytes_sent = 0;
270 
271 		if (ZSTR_LEN(payload) > (sizeof(buf) - sizeof(yar_header_t))) {
272 			memcpy(buf + sizeof(yar_header_t), ZSTR_VAL(payload), sizeof(buf) - sizeof(yar_header_t));
273 			if ((ret = php_stream_xport_sendto(data->stream, buf, sizeof(buf), 0, NULL, 0)) < 0) {
274 				zend_string_release(payload);
275 				spprintf(msg, 0, "unable to send data");
276 				return 0;
277 			}
278 		} else {
279 			memcpy(buf + sizeof(yar_header_t), ZSTR_VAL(payload), ZSTR_LEN(payload));
280 			if ((ret = php_stream_xport_sendto(data->stream, buf, sizeof(yar_header_t) + ZSTR_LEN(payload), 0, NULL, 0)) < 0) {
281 				zend_string_release(payload);
282 				spprintf(msg, 0, "unable to send data");
283 				return 0;
284 			}
285 		}
286 
287 		bytes_sent = ret - sizeof(yar_header_t);
288 		bytes_left = ZSTR_LEN(payload) - bytes_sent;
289 
290 wait_io:
291 		if (bytes_left) {
292 			retval = php_select(fd+1, NULL, &rfds, NULL, &tv);
293 
294 			if (retval == -1) {
295 				zend_string_release(payload);
296 				spprintf(msg, 0, "select error '%s'", strerror(errno));
297 				return 0;
298 			} else if (retval == 0) {
299 				zend_string_release(payload);
300 				spprintf(msg, 0, "select timeout %ldms reached", YAR_G(timeout));
301 				return 0;
302 			}
303 
304 			if (PHP_SAFE_FD_ISSET(fd, &rfds)) {
305 				if ((ret = php_stream_xport_sendto(data->stream, ZSTR_VAL(payload) + bytes_sent, bytes_left, 0, NULL, 0)) > 0) {
306 					bytes_left -= ret;
307 					bytes_sent += ret;
308 				}
309 			}
310 			goto wait_io;
311 		}
312 	} else {
313 		PHP_SAFE_FD_SET(fd, &rfds);
314 		goto wait_io;
315 	}
316 
317 	if (!data->persistent) {
318 		php_stream_xport_shutdown(data->stream, SHUT_WR);
319 	}
320 
321 	zend_string_release(payload);
322 
323 	return ret < 0? 0 : 1;
324 } /* }}} */
325 
php_yar_socket_setopt(yar_transport_interface_t * self,long type,void * value,void * addtional)326 int php_yar_socket_setopt(yar_transport_interface_t* self, long type, void *value, void *addtional) /* {{{ */ {
327 	return 1;
328 } /* }}} */
329 
php_yar_socket_init()330 yar_transport_interface_t * php_yar_socket_init() /* {{{ */ {
331 	yar_socket_data_t *data;
332 	yar_transport_interface_t *self;
333 
334 	self = emalloc(sizeof(yar_transport_interface_t));
335 	self->data = data = ecalloc(1, sizeof(yar_socket_data_t));
336 
337 	self->open   	= php_yar_socket_open;
338 	self->send   	= php_yar_socket_send;
339 	self->exec   	= php_yar_socket_exec;
340 	self->setopt	= php_yar_socket_setopt;
341 	self->calldata 	= NULL;
342 	self->close  	= php_yar_socket_close;
343 
344 	return  self;
345 } /* }}} */
346 
php_yar_socket_destroy(yar_transport_interface_t * self)347 void php_yar_socket_destroy(yar_transport_interface_t *self) /* {{{ */ {
348 } /* }}} */
349 
350 /* {{{ yar_transport_t yar_transport_socket
351  */
352 const yar_transport_t yar_transport_socket = {
353 	"sock",
354 	php_yar_socket_init,
355 	php_yar_socket_destroy,
356 	NULL
357 }; /* }}} */
358 
359 /*
360  * Local variables:
361  * tab-width: 4
362  * c-basic-offset: 4
363  * End:
364  * vim600: noet sw=4 ts=4 fdm=marker
365  * vim<600: noet sw=4 ts=4
366  */
367