1 /**
2 * Copyright (C) 2008 Happy Fish / YuQing
3 *
4 * FastDFS may be copied only under the terms of the GNU General
5 * Public License V3, which may be found in the FastDFS source kit.
6 * Please visit the FastDFS Home Page http://www.fastken.com/ for more detail.
7 **/
8
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <string.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/time.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <pthread.h>
#include "fastcommon/shared_func.h"
#include "fastcommon/sched_thread.h"
#include "fastcommon/logger.h"
#include "fastcommon/sockopt.h"
#include "fastcommon/fast_task_queue.h"
#include "tracker_types.h"
#include "tracker_proto.h"
#include "storage_global.h"
#include "storage_service.h"
#include "fastcommon/ioevent_loop.h"
#include "storage_dio.h"
#include "storage_nio.h"
36
/*
 * Forward declarations: the two socket event callbacks and the NIO init
 * helper are referenced by functions defined earlier in this file than
 * their own definitions.
 */
static void client_sock_read(int sock, short event, void *arg);
static void client_sock_write(int sock, short event, void *arg);
static int storage_nio_init(struct fast_task_info *pTask);
40
add_to_deleted_list(struct fast_task_info * pTask)41 void add_to_deleted_list(struct fast_task_info *pTask)
42 {
43 ((StorageClientInfo *)pTask->arg)->canceled = true;
44 pTask->next = pTask->thread_data->deleted_list;
45 pTask->thread_data->deleted_list = pTask;
46 }
47
task_finish_clean_up(struct fast_task_info * pTask)48 void task_finish_clean_up(struct fast_task_info *pTask)
49 {
50 StorageClientInfo *pClientInfo;
51
52 pClientInfo = (StorageClientInfo *)pTask->arg;
53 if (pClientInfo->clean_func != NULL)
54 {
55 pClientInfo->clean_func(pTask);
56 }
57
58 ioevent_detach(&pTask->thread_data->ev_puller, pTask->event.fd);
59 close(pTask->event.fd);
60 pTask->event.fd = -1;
61
62 if (pTask->event.timer.expires > 0)
63 {
64 fast_timer_remove(&pTask->thread_data->timer,
65 &pTask->event.timer);
66 pTask->event.timer.expires = 0;
67 }
68
69 memset(pTask->arg, 0, sizeof(StorageClientInfo));
70 free_queue_push(pTask);
71
72 __sync_fetch_and_sub(&g_storage_stat.connection.current_count, 1);
73 ++g_stat_change_count;
74 }
75
set_recv_event(struct fast_task_info * pTask)76 static int set_recv_event(struct fast_task_info *pTask)
77 {
78 int result;
79
80 if (pTask->event.callback == client_sock_read)
81 {
82 return 0;
83 }
84
85 pTask->event.callback = client_sock_read;
86 if (ioevent_modify(&pTask->thread_data->ev_puller,
87 pTask->event.fd, IOEVENT_READ, pTask) != 0)
88 {
89 result = errno != 0 ? errno : ENOENT;
90 add_to_deleted_list(pTask);
91
92 logError("file: "__FILE__", line: %d, "\
93 "ioevent_modify fail, " \
94 "errno: %d, error info: %s", \
95 __LINE__, result, STRERROR(result));
96 return result;
97 }
98 return 0;
99 }
100
set_send_event(struct fast_task_info * pTask)101 static int set_send_event(struct fast_task_info *pTask)
102 {
103 int result;
104
105 if (pTask->event.callback == client_sock_write)
106 {
107 return 0;
108 }
109
110 pTask->event.callback = client_sock_write;
111 if (ioevent_modify(&pTask->thread_data->ev_puller,
112 pTask->event.fd, IOEVENT_WRITE, pTask) != 0)
113 {
114 result = errno != 0 ? errno : ENOENT;
115 add_to_deleted_list(pTask);
116
117 logError("file: "__FILE__", line: %d, "\
118 "ioevent_modify fail, " \
119 "errno: %d, error info: %s", \
120 __LINE__, result, STRERROR(result));
121 return result;
122 }
123 return 0;
124 }
125
storage_recv_notify_read(int sock,short event,void * arg)126 void storage_recv_notify_read(int sock, short event, void *arg)
127 {
128 struct fast_task_info *pTask;
129 StorageClientInfo *pClientInfo;
130 long task_addr;
131 int64_t remain_bytes;
132 int bytes;
133 int result;
134
135 while (1)
136 {
137 if ((bytes=read(sock, &task_addr, sizeof(task_addr))) < 0)
138 {
139 if (!(errno == EAGAIN || errno == EWOULDBLOCK))
140 {
141 logError("file: "__FILE__", line: %d, " \
142 "call read failed, " \
143 "errno: %d, error info: %s", \
144 __LINE__, errno, STRERROR(errno));
145 }
146
147 break;
148 }
149 else if (bytes == 0)
150 {
151 logError("file: "__FILE__", line: %d, " \
152 "call read failed, end of file", __LINE__);
153 break;
154 }
155
156 pTask = (struct fast_task_info *)task_addr;
157 pClientInfo = (StorageClientInfo *)pTask->arg;
158
159 if (pTask->event.fd < 0) //quit flag
160 {
161 return;
162 }
163
164 /* //logInfo("=====thread index: %d, pTask->event.fd=%d", \
165 pClientInfo->nio_thread_index, pTask->event.fd);
166 */
167
168 if (pClientInfo->stage & FDFS_STORAGE_STAGE_DIO_THREAD)
169 {
170 pClientInfo->stage &= ~FDFS_STORAGE_STAGE_DIO_THREAD;
171 }
172 switch (pClientInfo->stage)
173 {
174 case FDFS_STORAGE_STAGE_NIO_INIT:
175 result = storage_nio_init(pTask);
176 break;
177 case FDFS_STORAGE_STAGE_NIO_RECV:
178 pTask->offset = 0;
179 remain_bytes = pClientInfo->total_length - \
180 pClientInfo->total_offset;
181 if (remain_bytes > pTask->size)
182 {
183 pTask->length = pTask->size;
184 }
185 else
186 {
187 pTask->length = remain_bytes;
188 }
189
190 if (set_recv_event(pTask) == 0)
191 {
192 client_sock_read(pTask->event.fd,
193 IOEVENT_READ, pTask);
194 }
195 result = 0;
196 break;
197 case FDFS_STORAGE_STAGE_NIO_SEND:
198 result = storage_send_add_event(pTask);
199 break;
200 case FDFS_STORAGE_STAGE_NIO_CLOSE:
201 result = EIO; //close this socket
202 break;
203 default:
204 logError("file: "__FILE__", line: %d, " \
205 "invalid stage: %d", __LINE__, \
206 pClientInfo->stage);
207 result = EINVAL;
208 break;
209 }
210
211 if (result != 0)
212 {
213 add_to_deleted_list(pTask);
214 }
215 }
216 }
217
storage_nio_init(struct fast_task_info * pTask)218 static int storage_nio_init(struct fast_task_info *pTask)
219 {
220 StorageClientInfo *pClientInfo;
221 struct storage_nio_thread_data *pThreadData;
222
223 pClientInfo = (StorageClientInfo *)pTask->arg;
224 pThreadData = g_nio_thread_data + pClientInfo->nio_thread_index;
225
226 pClientInfo->stage = FDFS_STORAGE_STAGE_NIO_RECV;
227 return ioevent_set(pTask, &pThreadData->thread_data,
228 pTask->event.fd, IOEVENT_READ, client_sock_read,
229 g_fdfs_network_timeout);
230 }
231
storage_send_add_event(struct fast_task_info * pTask)232 int storage_send_add_event(struct fast_task_info *pTask)
233 {
234 pTask->offset = 0;
235
236 /* direct send */
237 client_sock_write(pTask->event.fd, IOEVENT_WRITE, pTask);
238
239 return 0;
240 }
241
client_sock_read(int sock,short event,void * arg)242 static void client_sock_read(int sock, short event, void *arg)
243 {
244 int bytes;
245 int recv_bytes;
246 struct fast_task_info *pTask;
247 StorageClientInfo *pClientInfo;
248
249 pTask = (struct fast_task_info *)arg;
250 pClientInfo = (StorageClientInfo *)pTask->arg;
251 if (pClientInfo->canceled)
252 {
253 return;
254 }
255
256 if (pClientInfo->stage != FDFS_STORAGE_STAGE_NIO_RECV)
257 {
258 if (event & IOEVENT_TIMEOUT) {
259 pTask->event.timer.expires = g_current_time +
260 g_fdfs_network_timeout;
261 fast_timer_add(&pTask->thread_data->timer,
262 &pTask->event.timer);
263 }
264
265 return;
266 }
267
268 if (event & IOEVENT_TIMEOUT)
269 {
270 if (pClientInfo->total_offset == 0)
271 {
272 if (pTask->req_count > 0)
273 {
274 pTask->event.timer.expires = g_current_time +
275 g_fdfs_network_timeout;
276 fast_timer_add(&pTask->thread_data->timer,
277 &pTask->event.timer);
278 }
279 else
280 {
281 logWarning("file: "__FILE__", line: %d, "
282 "client ip: %s, recv timeout. "
283 "after the connection is established, "
284 "you must send a request before %ds timeout, "
285 "maybe connections leak in you application.",
286 __LINE__, pTask->client_ip, g_fdfs_network_timeout);
287 task_finish_clean_up(pTask);
288 }
289 }
290 else
291 {
292 logError("file: "__FILE__", line: %d, "
293 "client ip: %s, recv timeout, "
294 "recv offset: %d, expect length: %d, "
295 "req_count: %"PRId64, __LINE__, pTask->client_ip,
296 pTask->offset, pTask->length, pTask->req_count);
297 task_finish_clean_up(pTask);
298 }
299
300 return;
301 }
302
303 if (event & IOEVENT_ERROR)
304 {
305 logDebug("file: "__FILE__", line: %d, " \
306 "client ip: %s, recv error event: %d, "
307 "close connection", __LINE__, pTask->client_ip, event);
308
309 task_finish_clean_up(pTask);
310 return;
311 }
312
313 fast_timer_modify(&pTask->thread_data->timer,
314 &pTask->event.timer, g_current_time +
315 g_fdfs_network_timeout);
316 while (1)
317 {
318 if (pClientInfo->total_length == 0) //recv header
319 {
320 recv_bytes = sizeof(TrackerHeader) - pTask->offset;
321 }
322 else
323 {
324 recv_bytes = pTask->length - pTask->offset;
325 }
326
327 /*
328 logInfo("total_length=%"PRId64", recv_bytes=%d, "
329 "pTask->length=%d, pTask->offset=%d",
330 pClientInfo->total_length, recv_bytes,
331 pTask->length, pTask->offset);
332 */
333
334 bytes = recv(sock, pTask->data + pTask->offset, recv_bytes, 0);
335 if (bytes < 0)
336 {
337 if (errno == EAGAIN || errno == EWOULDBLOCK)
338 {
339 }
340 else if (errno == EINTR)
341 {
342 continue;
343 }
344 else
345 {
346 logError("file: "__FILE__", line: %d, " \
347 "client ip: %s, recv failed, " \
348 "errno: %d, error info: %s", \
349 __LINE__, pTask->client_ip, \
350 errno, STRERROR(errno));
351
352 task_finish_clean_up(pTask);
353 }
354
355 return;
356 }
357 else if (bytes == 0)
358 {
359 logDebug("file: "__FILE__", line: %d, " \
360 "client ip: %s, recv failed, " \
361 "connection disconnected.", \
362 __LINE__, pTask->client_ip);
363
364 task_finish_clean_up(pTask);
365 return;
366 }
367
368 if (pClientInfo->total_length == 0) //header
369 {
370 if (pTask->offset + bytes < sizeof(TrackerHeader))
371 {
372 pTask->offset += bytes;
373 return;
374 }
375
376 pClientInfo->total_length=buff2long(((TrackerHeader *) \
377 pTask->data)->pkg_len);
378 if (pClientInfo->total_length < 0)
379 {
380 logError("file: "__FILE__", line: %d, " \
381 "client ip: %s, pkg length: " \
382 "%"PRId64" < 0", \
383 __LINE__, pTask->client_ip, \
384 pClientInfo->total_length);
385
386 task_finish_clean_up(pTask);
387 return;
388 }
389
390 pClientInfo->total_length += sizeof(TrackerHeader);
391 if (pClientInfo->total_length > pTask->size)
392 {
393 pTask->length = pTask->size;
394 }
395 else
396 {
397 pTask->length = pClientInfo->total_length;
398 }
399 }
400
401 pTask->offset += bytes;
402 if (pTask->offset >= pTask->length) //recv current pkg done
403 {
404 if (pClientInfo->total_offset + pTask->length >= \
405 pClientInfo->total_length)
406 {
407 /* current req recv done */
408 pClientInfo->stage = FDFS_STORAGE_STAGE_NIO_SEND;
409 pTask->req_count++;
410 }
411
412 if (pClientInfo->total_offset == 0)
413 {
414 pClientInfo->total_offset = pTask->length;
415 storage_deal_task(pTask);
416 }
417 else
418 {
419 pClientInfo->total_offset += pTask->length;
420
421 /* continue write to file */
422 storage_dio_queue_push(pTask);
423 }
424
425 return;
426 }
427 }
428
429 return;
430 }
431
client_sock_write(int sock,short event,void * arg)432 static void client_sock_write(int sock, short event, void *arg)
433 {
434 int bytes;
435 struct fast_task_info *pTask;
436 StorageClientInfo *pClientInfo;
437
438 pTask = (struct fast_task_info *)arg;
439 pClientInfo = (StorageClientInfo *)pTask->arg;
440 if (pClientInfo->canceled)
441 {
442 return;
443 }
444
445 if (event & IOEVENT_TIMEOUT)
446 {
447 logError("file: "__FILE__", line: %d, "
448 "client ip: %s, send timeout, offset: %d, "
449 "remain bytes: %d", __LINE__, pTask->client_ip,
450 pTask->offset, pTask->length - pTask->offset);
451
452 task_finish_clean_up(pTask);
453 return;
454 }
455
456 if (event & IOEVENT_ERROR)
457 {
458 logDebug("file: "__FILE__", line: %d, "
459 "client ip: %s, recv error event: %d, "
460 "close connection", __LINE__, pTask->client_ip, event);
461
462 task_finish_clean_up(pTask);
463 return;
464 }
465
466 while (1)
467 {
468 fast_timer_modify(&pTask->thread_data->timer,
469 &pTask->event.timer, g_current_time +
470 g_fdfs_network_timeout);
471 bytes = send(sock, pTask->data + pTask->offset, \
472 pTask->length - pTask->offset, 0);
473 //printf("%08X sended %d bytes\n", (int)pTask, bytes);
474 if (bytes < 0)
475 {
476 if (errno == EAGAIN || errno == EWOULDBLOCK)
477 {
478 set_send_event(pTask);
479 }
480 else if (errno == EINTR)
481 {
482 continue;
483 }
484 else
485 {
486 logError("file: "__FILE__", line: %d, " \
487 "client ip: %s, recv failed, " \
488 "errno: %d, error info: %s", \
489 __LINE__, pTask->client_ip, \
490 errno, STRERROR(errno));
491
492 task_finish_clean_up(pTask);
493 }
494
495 return;
496 }
497 else if (bytes == 0)
498 {
499 logWarning("file: "__FILE__", line: %d, " \
500 "send failed, connection disconnected.", \
501 __LINE__);
502
503 task_finish_clean_up(pTask);
504 return;
505 }
506
507 pTask->offset += bytes;
508 if (pTask->offset >= pTask->length)
509 {
510 if (set_recv_event(pTask) != 0)
511 {
512 return;
513 }
514
515 pClientInfo->total_offset += pTask->length;
516 if (pClientInfo->total_offset>=pClientInfo->total_length)
517 {
518 if (pClientInfo->total_length == sizeof(TrackerHeader)
519 && ((TrackerHeader *)pTask->data)->status == EINVAL)
520 {
521 logDebug("file: "__FILE__", line: %d, "\
522 "close conn: #%d, client ip: %s", \
523 __LINE__, pTask->event.fd,
524 pTask->client_ip);
525 task_finish_clean_up(pTask);
526 return;
527 }
528
529 /* response done, try to recv again */
530 pClientInfo->total_length = 0;
531 pClientInfo->total_offset = 0;
532 pTask->offset = 0;
533 pTask->length = 0;
534
535 pClientInfo->stage = FDFS_STORAGE_STAGE_NIO_RECV;
536 }
537 else //continue to send file content
538 {
539 pTask->length = 0;
540
541 /* continue read from file */
542 storage_dio_queue_push(pTask);
543 }
544
545 return;
546 }
547 }
548 }
549
550