1 /**
2 * Copyright (C) 2008 Happy Fish / YuQing
3 *
4 * FastDFS may be copied only under the terms of the GNU General
5 * Public License V3, which may be found in the FastDFS source kit.
6 * Please visit the FastDFS Home Page http://www.fastken.com/ for more detail.
7 **/
8 
9 #include <stdio.h>
10 #include <stdlib.h>
11 #include <string.h>
12 #include <errno.h>
13 #include <unistd.h>
14 #include <string.h>
15 #include <sys/stat.h>
16 #include <fcntl.h>
17 #include <errno.h>
18 #include <signal.h>
19 #include <sys/types.h>
20 #include <sys/time.h>
21 #include <sys/socket.h>
22 #include <netinet/in.h>
23 #include <pthread.h>
24 #include "fastcommon/shared_func.h"
25 #include "fastcommon/sched_thread.h"
26 #include "fastcommon/logger.h"
27 #include "fastcommon/sockopt.h"
28 #include "fastcommon/fast_task_queue.h"
29 #include "tracker_types.h"
30 #include "tracker_proto.h"
31 #include "storage_global.h"
32 #include "storage_service.h"
33 #include "fastcommon/ioevent_loop.h"
34 #include "storage_dio.h"
35 #include "storage_nio.h"
36 
/* socket event callbacks installed as pTask->event.callback in this file */
static void client_sock_read(int sock, short event, void *arg);
static void client_sock_write(int sock, short event, void *arg);
/* handler for the FDFS_STORAGE_STAGE_NIO_INIT stage, see storage_recv_notify_read */
static int storage_nio_init(struct fast_task_info *pTask);
40 
add_to_deleted_list(struct fast_task_info * pTask)41 void add_to_deleted_list(struct fast_task_info *pTask)
42 {
43 	((StorageClientInfo *)pTask->arg)->canceled = true;
44 	pTask->next = pTask->thread_data->deleted_list;
45 	pTask->thread_data->deleted_list = pTask;
46 }
47 
task_finish_clean_up(struct fast_task_info * pTask)48 void task_finish_clean_up(struct fast_task_info *pTask)
49 {
50 	StorageClientInfo *pClientInfo;
51 
52 	pClientInfo = (StorageClientInfo *)pTask->arg;
53 	if (pClientInfo->clean_func != NULL)
54 	{
55 		pClientInfo->clean_func(pTask);
56 	}
57 
58 	ioevent_detach(&pTask->thread_data->ev_puller, pTask->event.fd);
59 	close(pTask->event.fd);
60 	pTask->event.fd = -1;
61 
62 	if (pTask->event.timer.expires > 0)
63 	{
64 		fast_timer_remove(&pTask->thread_data->timer,
65 			&pTask->event.timer);
66 		pTask->event.timer.expires = 0;
67 	}
68 
69 	memset(pTask->arg, 0, sizeof(StorageClientInfo));
70 	free_queue_push(pTask);
71 
72     __sync_fetch_and_sub(&g_storage_stat.connection.current_count, 1);
73     ++g_stat_change_count;
74 }
75 
set_recv_event(struct fast_task_info * pTask)76 static int set_recv_event(struct fast_task_info *pTask)
77 {
78 	int result;
79 
80 	if (pTask->event.callback == client_sock_read)
81 	{
82 		return 0;
83 	}
84 
85 	pTask->event.callback = client_sock_read;
86 	if (ioevent_modify(&pTask->thread_data->ev_puller,
87 		pTask->event.fd, IOEVENT_READ, pTask) != 0)
88 	{
89 		result = errno != 0 ? errno : ENOENT;
90 		add_to_deleted_list(pTask);
91 
92 		logError("file: "__FILE__", line: %d, "\
93 			"ioevent_modify fail, " \
94 			"errno: %d, error info: %s", \
95 			__LINE__, result, STRERROR(result));
96 		return result;
97 	}
98 	return 0;
99 }
100 
set_send_event(struct fast_task_info * pTask)101 static int set_send_event(struct fast_task_info *pTask)
102 {
103 	int result;
104 
105 	if (pTask->event.callback == client_sock_write)
106 	{
107 		return 0;
108 	}
109 
110 	pTask->event.callback = client_sock_write;
111 	if (ioevent_modify(&pTask->thread_data->ev_puller,
112 		pTask->event.fd, IOEVENT_WRITE, pTask) != 0)
113 	{
114 		result = errno != 0 ? errno : ENOENT;
115 		add_to_deleted_list(pTask);
116 
117 		logError("file: "__FILE__", line: %d, "\
118 			"ioevent_modify fail, " \
119 			"errno: %d, error info: %s", \
120 			__LINE__, result, STRERROR(result));
121 		return result;
122 	}
123 	return 0;
124 }
125 
storage_recv_notify_read(int sock,short event,void * arg)126 void storage_recv_notify_read(int sock, short event, void *arg)
127 {
128 	struct fast_task_info *pTask;
129 	StorageClientInfo *pClientInfo;
130 	long task_addr;
131 	int64_t remain_bytes;
132 	int bytes;
133 	int result;
134 
135 	while (1)
136 	{
137 		if ((bytes=read(sock, &task_addr, sizeof(task_addr))) < 0)
138 		{
139 			if (!(errno == EAGAIN || errno == EWOULDBLOCK))
140 			{
141 				logError("file: "__FILE__", line: %d, " \
142 					"call read failed, " \
143 					"errno: %d, error info: %s", \
144 					__LINE__, errno, STRERROR(errno));
145 			}
146 
147 			break;
148 		}
149 		else if (bytes == 0)
150 		{
151 			logError("file: "__FILE__", line: %d, " \
152 				"call read failed, end of file", __LINE__);
153 			break;
154 		}
155 
156 		pTask = (struct fast_task_info *)task_addr;
157 		pClientInfo = (StorageClientInfo *)pTask->arg;
158 
159 		if (pTask->event.fd < 0)  //quit flag
160 		{
161 			return;
162 		}
163 
164 		/* //logInfo("=====thread index: %d, pTask->event.fd=%d", \
165 			pClientInfo->nio_thread_index, pTask->event.fd);
166 		*/
167 
168 		if (pClientInfo->stage & FDFS_STORAGE_STAGE_DIO_THREAD)
169 		{
170 			pClientInfo->stage &= ~FDFS_STORAGE_STAGE_DIO_THREAD;
171 		}
172 		switch (pClientInfo->stage)
173 		{
174 			case FDFS_STORAGE_STAGE_NIO_INIT:
175 				result = storage_nio_init(pTask);
176 				break;
177 			case FDFS_STORAGE_STAGE_NIO_RECV:
178 				pTask->offset = 0;
179 				remain_bytes = pClientInfo->total_length - \
180 					       pClientInfo->total_offset;
181 				if (remain_bytes > pTask->size)
182 				{
183 					pTask->length = pTask->size;
184 				}
185 				else
186 				{
187 					pTask->length = remain_bytes;
188 				}
189 
190 				if (set_recv_event(pTask) == 0)
191 				{
192 					client_sock_read(pTask->event.fd,
193 						IOEVENT_READ, pTask);
194 				}
195 				result = 0;
196 				break;
197 			case FDFS_STORAGE_STAGE_NIO_SEND:
198 				result = storage_send_add_event(pTask);
199 				break;
200 			case FDFS_STORAGE_STAGE_NIO_CLOSE:
201 				result = EIO;   //close this socket
202 				break;
203 			default:
204 				logError("file: "__FILE__", line: %d, " \
205 					"invalid stage: %d", __LINE__, \
206 					pClientInfo->stage);
207 				result = EINVAL;
208 				break;
209 		}
210 
211 		if (result != 0)
212 		{
213 			add_to_deleted_list(pTask);
214 		}
215 	}
216 }
217 
storage_nio_init(struct fast_task_info * pTask)218 static int storage_nio_init(struct fast_task_info *pTask)
219 {
220 	StorageClientInfo *pClientInfo;
221 	struct storage_nio_thread_data *pThreadData;
222 
223 	pClientInfo = (StorageClientInfo *)pTask->arg;
224 	pThreadData = g_nio_thread_data + pClientInfo->nio_thread_index;
225 
226 	pClientInfo->stage = FDFS_STORAGE_STAGE_NIO_RECV;
227 	return ioevent_set(pTask, &pThreadData->thread_data,
228 			pTask->event.fd, IOEVENT_READ, client_sock_read,
229 			g_fdfs_network_timeout);
230 }
231 
storage_send_add_event(struct fast_task_info * pTask)232 int storage_send_add_event(struct fast_task_info *pTask)
233 {
234 	pTask->offset = 0;
235 
236 	/* direct send */
237 	client_sock_write(pTask->event.fd, IOEVENT_WRITE, pTask);
238 
239 	return 0;
240 }
241 
client_sock_read(int sock,short event,void * arg)242 static void client_sock_read(int sock, short event, void *arg)
243 {
244 	int bytes;
245 	int recv_bytes;
246 	struct fast_task_info *pTask;
247         StorageClientInfo *pClientInfo;
248 
249 	pTask = (struct fast_task_info *)arg;
250         pClientInfo = (StorageClientInfo *)pTask->arg;
251 	if (pClientInfo->canceled)
252 	{
253 		return;
254 	}
255 
256 	if (pClientInfo->stage != FDFS_STORAGE_STAGE_NIO_RECV)
257 	{
258 		if (event & IOEVENT_TIMEOUT) {
259 			pTask->event.timer.expires = g_current_time +
260 				g_fdfs_network_timeout;
261 			fast_timer_add(&pTask->thread_data->timer,
262 				&pTask->event.timer);
263 		}
264 
265 		return;
266 	}
267 
268 	if (event & IOEVENT_TIMEOUT)
269 	{
270 		if (pClientInfo->total_offset == 0)
271 		{
272             if (pTask->req_count > 0)
273             {
274                 pTask->event.timer.expires = g_current_time +
275                     g_fdfs_network_timeout;
276                 fast_timer_add(&pTask->thread_data->timer,
277                         &pTask->event.timer);
278             }
279             else
280             {
281                 logWarning("file: "__FILE__", line: %d, "
282                         "client ip: %s, recv timeout. "
283                         "after the connection is established, "
284                         "you must send a request before %ds timeout, "
285                         "maybe connections leak in you application.",
286                         __LINE__, pTask->client_ip, g_fdfs_network_timeout);
287                 task_finish_clean_up(pTask);
288             }
289 		}
290 		else
291         {
292             logError("file: "__FILE__", line: %d, "
293                     "client ip: %s, recv timeout, "
294                     "recv offset: %d, expect length: %d, "
295                     "req_count: %"PRId64, __LINE__, pTask->client_ip,
296                     pTask->offset, pTask->length, pTask->req_count);
297             task_finish_clean_up(pTask);
298         }
299 
300 		return;
301 	}
302 
303 	if (event & IOEVENT_ERROR)
304 	{
305 		logDebug("file: "__FILE__", line: %d, " \
306 			"client ip: %s, recv error event: %d, "
307 			"close connection", __LINE__, pTask->client_ip, event);
308 
309 		task_finish_clean_up(pTask);
310 		return;
311 	}
312 
313 	fast_timer_modify(&pTask->thread_data->timer,
314 		&pTask->event.timer, g_current_time +
315 		g_fdfs_network_timeout);
316 	while (1)
317 	{
318 		if (pClientInfo->total_length == 0) //recv header
319 		{
320 			recv_bytes = sizeof(TrackerHeader) - pTask->offset;
321 		}
322 		else
323 		{
324 			recv_bytes = pTask->length - pTask->offset;
325 		}
326 
327 		/*
328 		logInfo("total_length=%"PRId64", recv_bytes=%d, "
329 			"pTask->length=%d, pTask->offset=%d",
330 			pClientInfo->total_length, recv_bytes,
331 			pTask->length, pTask->offset);
332 		*/
333 
334 		bytes = recv(sock, pTask->data + pTask->offset, recv_bytes, 0);
335 		if (bytes < 0)
336 		{
337 			if (errno == EAGAIN || errno == EWOULDBLOCK)
338 			{
339 			}
340 			else if (errno == EINTR)
341 			{
342 				continue;
343 			}
344 			else
345 			{
346 				logError("file: "__FILE__", line: %d, " \
347 					"client ip: %s, recv failed, " \
348 					"errno: %d, error info: %s", \
349 					__LINE__, pTask->client_ip, \
350 					errno, STRERROR(errno));
351 
352 				task_finish_clean_up(pTask);
353 			}
354 
355 			return;
356 		}
357 		else if (bytes == 0)
358 		{
359 			logDebug("file: "__FILE__", line: %d, " \
360 				"client ip: %s, recv failed, " \
361 				"connection disconnected.", \
362 				__LINE__, pTask->client_ip);
363 
364 			task_finish_clean_up(pTask);
365 			return;
366 		}
367 
368 		if (pClientInfo->total_length == 0) //header
369 		{
370 			if (pTask->offset + bytes < sizeof(TrackerHeader))
371 			{
372 				pTask->offset += bytes;
373 				return;
374 			}
375 
376 			pClientInfo->total_length=buff2long(((TrackerHeader *) \
377 						pTask->data)->pkg_len);
378 			if (pClientInfo->total_length < 0)
379 			{
380 				logError("file: "__FILE__", line: %d, " \
381 					"client ip: %s, pkg length: " \
382 					"%"PRId64" < 0", \
383 					__LINE__, pTask->client_ip, \
384 					pClientInfo->total_length);
385 
386 				task_finish_clean_up(pTask);
387 				return;
388 			}
389 
390 			pClientInfo->total_length += sizeof(TrackerHeader);
391 			if (pClientInfo->total_length > pTask->size)
392 			{
393 				pTask->length = pTask->size;
394 			}
395 			else
396 			{
397 				pTask->length = pClientInfo->total_length;
398 			}
399 		}
400 
401 		pTask->offset += bytes;
402 		if (pTask->offset >= pTask->length) //recv current pkg done
403 		{
404 			if (pClientInfo->total_offset + pTask->length >= \
405 					pClientInfo->total_length)
406 			{
407 				/* current req recv done */
408 				pClientInfo->stage = FDFS_STORAGE_STAGE_NIO_SEND;
409 				pTask->req_count++;
410 			}
411 
412 			if (pClientInfo->total_offset == 0)
413 			{
414 				pClientInfo->total_offset = pTask->length;
415 				storage_deal_task(pTask);
416 			}
417 			else
418 			{
419 				pClientInfo->total_offset += pTask->length;
420 
421 				/* continue write to file */
422 				storage_dio_queue_push(pTask);
423 			}
424 
425 			return;
426 		}
427 	}
428 
429 	return;
430 }
431 
client_sock_write(int sock,short event,void * arg)432 static void client_sock_write(int sock, short event, void *arg)
433 {
434 	int bytes;
435 	struct fast_task_info *pTask;
436         StorageClientInfo *pClientInfo;
437 
438 	pTask = (struct fast_task_info *)arg;
439         pClientInfo = (StorageClientInfo *)pTask->arg;
440 	if (pClientInfo->canceled)
441 	{
442 		return;
443 	}
444 
445 	if (event & IOEVENT_TIMEOUT)
446 	{
447 		logError("file: "__FILE__", line: %d, "
448 			"client ip: %s, send timeout, offset: %d, "
449             "remain bytes: %d", __LINE__, pTask->client_ip,
450             pTask->offset, pTask->length - pTask->offset);
451 
452 		task_finish_clean_up(pTask);
453 		return;
454 	}
455 
456 	if (event & IOEVENT_ERROR)
457 	{
458 		logDebug("file: "__FILE__", line: %d, "
459 			"client ip: %s, recv error event: %d, "
460 			"close connection", __LINE__, pTask->client_ip, event);
461 
462 		task_finish_clean_up(pTask);
463 		return;
464 	}
465 
466 	while (1)
467 	{
468 		fast_timer_modify(&pTask->thread_data->timer,
469 			&pTask->event.timer, g_current_time +
470 			g_fdfs_network_timeout);
471 		bytes = send(sock, pTask->data + pTask->offset, \
472 				pTask->length - pTask->offset,  0);
473 		//printf("%08X sended %d bytes\n", (int)pTask, bytes);
474 		if (bytes < 0)
475 		{
476 			if (errno == EAGAIN || errno == EWOULDBLOCK)
477 			{
478 				set_send_event(pTask);
479 			}
480 			else if (errno == EINTR)
481 			{
482 				continue;
483 			}
484 			else
485 			{
486 				logError("file: "__FILE__", line: %d, " \
487 					"client ip: %s, recv failed, " \
488 					"errno: %d, error info: %s", \
489 					__LINE__, pTask->client_ip, \
490 					errno, STRERROR(errno));
491 
492 				task_finish_clean_up(pTask);
493 			}
494 
495 			return;
496 		}
497 		else if (bytes == 0)
498 		{
499 			logWarning("file: "__FILE__", line: %d, " \
500 				"send failed, connection disconnected.", \
501 				__LINE__);
502 
503 			task_finish_clean_up(pTask);
504 			return;
505 		}
506 
507 		pTask->offset += bytes;
508 		if (pTask->offset >= pTask->length)
509 		{
510 			if (set_recv_event(pTask) != 0)
511 			{
512 				return;
513 			}
514 
515 			pClientInfo->total_offset += pTask->length;
516 			if (pClientInfo->total_offset>=pClientInfo->total_length)
517 			{
518 				if (pClientInfo->total_length == sizeof(TrackerHeader)
519 					&& ((TrackerHeader *)pTask->data)->status == EINVAL)
520 				{
521 					logDebug("file: "__FILE__", line: %d, "\
522 						"close conn: #%d, client ip: %s", \
523 						__LINE__, pTask->event.fd,
524 						pTask->client_ip);
525 					task_finish_clean_up(pTask);
526 					return;
527 				}
528 
529 				/*  response done, try to recv again */
530 				pClientInfo->total_length = 0;
531 				pClientInfo->total_offset = 0;
532 				pTask->offset = 0;
533 				pTask->length = 0;
534 
535 				pClientInfo->stage = FDFS_STORAGE_STAGE_NIO_RECV;
536 			}
537 			else  //continue to send file content
538 			{
539 				pTask->length = 0;
540 
541 				/* continue read from file */
542 				storage_dio_queue_push(pTask);
543 			}
544 
545 			return;
546 		}
547 	}
548 }
549 
550