1 // Larbin
2 // Sebastien Ailleret
3 // 03-02-00 -> 23-11-01
4
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>

#include <cerrno>
#include <cstring>
#include <iostream>

#include "options.h"

#include "types.h"
#include "global.h"
#include "utils/text.h"
#include "utils/debug.h"
#include "interf/input.h"
18
/**
 * Sentinel values stored in Input::priority.
 * INIT: the connection has been accepted but its header line
 *       ("priority:.. depth:.. test:..") has not been parsed yet.
 * END:  the connection must be closed on the next pass over the
 *       open connections.
 * Typed file-local constants are used instead of macros so the names obey
 * normal scoping rules and have a real type.
 */
static const int INIT = -1;
static const int END = -2;
21
22 /* input connexion */
23 struct Input {
24 int fds;
25 uint pos;
26 uint end_pos;
27 uint end_posp;
28 int priority;
29 uint depth;
30 uint test;
31 char buffer[BUF_SIZE];
32 };
33
34 /** socket used for input */
35 static int inputFds;
36 /** number of opened input connections */
37 static int nbInput;
38 /** array for the opened input connections */
39 static Input *inputConns[maxInput];
40
41 // declaration of forward functions
42 static bool readMore (Input *in);
43 static char *readline (Input *in);
44
input()45 int input () {
46 if (nbInput < 0) {
47 // Input is disabled
48 return -1;
49 }
50 int n = -1;
51 if (nbInput < maxInput-1
52 && global::ansPoll[inputFds]) {
53 // test if there is a new connection
54 struct sockaddr_in addr;
55 int fdc;
56 socklen_t len = sizeof(addr);
57 fdc = accept(inputFds, (struct sockaddr *) &addr, &len);
58 if (fdc != -1) {
59 global::verifMax(fdc);
60 fcntl(fdc, F_SETFL, O_NONBLOCK);
61 inputConns[nbInput]->fds = fdc;
62 inputConns[nbInput]->pos = 0;
63 inputConns[nbInput]->end_pos = 0;
64 inputConns[nbInput]->end_posp = 0;
65 inputConns[nbInput]->priority = INIT;
66 nbInput++;
67 #ifdef URL_TAGS
68 ecrire(fdc, "Welcome to larbin input system !\nYour first line should look like \"priority:0 depth:5 test:1\"\nThe following should contain one id and one url separed by one space per line\n\"137 http://pauillac.inria.fr/~ailleret/prog/larbin/\" for instance\n\n");
69 #else
70 ecrire(fdc, "Welcome to larbin input system !\nYour first line should look like \"priority:0 depth:5 test:1\"\nThe following should contain one url per line (http://pauillac.inria.fr/~ailleret/prog/larbin/ for instance)\n\n");
71 #endif // URL_TAGS
72 }
73 }
74 if (nbInput < maxInput-1) {
75 n = inputFds;
76 global::setPoll(inputFds, POLLIN);
77 }
78 // read open sockets
79 int i=0;
80 while (i<nbInput) {
81 Input *in = inputConns[i];
82 if (global::ansPoll[in->fds] && readMore(in)) {
83 char *line = readline(in);
84 while (line != NULL) {
85 if (in->priority == INIT) {
86 // first line
87 if (sscanf(line, "priority:%d depth:%u test:%u",
88 &in->priority, &in->depth, &in->test) == 3) {
89 line = readline(in);
90 } else {
91 ecrire(in->fds, "Incorrect input\n");
92 line = NULL;
93 in->priority = END;
94 }
95 } else {
96 // this is an url
97 url *u = new url(line, in->depth);
98 if (u->isValid()) {
99 if (in->test) {
100 if (global::seen->testSet(u)) {
101 hashUrls(); // stats
102 if (in->priority) {
103 global::URLsPriority->put(u);
104 } else {
105 global::URLsDisk->put(u);
106 }
107 } else {
108 delete u;
109 }
110 } else {
111 hashUrls(); // stats
112 global::seen->set(u);
113 if (in->priority) {
114 global::URLsPriority->put(u);
115 } else {
116 global::URLsDisk->put(u);
117 }
118 }
119 } else {
120 delete u;
121 }
122 line = readline(in);
123 }
124 }
125 }
126 if (in->priority == END) {
127 // forget this connection
128 ecrire(in->fds, "Bye bye...\n");
129 close(in->fds);
130 nbInput--;
131 Input *tmp = inputConns[i];
132 inputConns[i] = inputConns[nbInput];
133 inputConns[nbInput] = tmp;
134 } else { // go to next connection
135 if (in->fds > n) n = in->fds;
136 global::setPoll(in->fds, POLLIN);
137 i++;
138 }
139 }
140 return n;
141 }
142
readMore(Input * in)143 static bool readMore (Input *in) {
144 assert (in->end_posp == in->end_pos);
145 if (in->end_posp - in->pos > maxUrlSize+100) {
146 // error -> stop connection
147 ecrire(in->fds, "Url submitted too long\n");
148 in->priority = END;
149 return false;
150 }
151 if (2 * in->pos > BUF_SIZE) {
152 in->end_pos -= in->pos;
153 in->end_posp = in->end_pos;
154 memmove(in->buffer, in->buffer+in->pos, in->end_pos);
155 in->pos = 0;
156 }
157 int nb = read(in->fds, in->buffer+in->end_pos, BUF_SIZE-in->end_pos);
158 if (nb == -1 && errno == EAGAIN) {
159 return false;
160 } else if (nb <= 0) {
161 in->priority = END;
162 return false;
163 } else {
164 in->end_pos += nb;
165 return true;
166 }
167 }
168
169 /* no allocation */
readline(Input * in)170 static char *readline (Input *in) {
171 while (in->end_posp < in->end_pos && in->buffer[in->end_posp] != '\n') {
172 in->end_posp++;
173 }
174 if (in->end_posp == in->end_pos) {
175 return NULL;
176 } else {
177 if (in->buffer[in->end_posp-1] == '\r') {
178 in->buffer[in->end_posp-1] = 0;
179 } else {
180 in->buffer[in->end_posp] = 0;
181 }
182 char *res = in->buffer+in->pos;
183 in->pos = ++in->end_posp;
184 return res;
185 }
186 }
187
188 ////////////////////////////////////////////////////
189 /** init everything */
initInput()190 void initInput () {
191 if (global::inputPort != 0) {
192 int allowReuse = 1;
193 struct sockaddr_in addr;
194 memset ((void *) &addr, 0, sizeof(addr));
195 addr.sin_addr.s_addr = INADDR_ANY;
196 addr.sin_family = AF_INET;
197 addr.sin_port = htons(global::inputPort);
198 if ((inputFds = socket(AF_INET, SOCK_STREAM, 0)) == -1
199 || setsockopt(inputFds, SOL_SOCKET, SO_REUSEADDR,
200 (char*)&allowReuse, sizeof(allowReuse))
201 || bind(inputFds, (struct sockaddr *) &addr, sizeof(addr)) != 0
202 || listen(inputFds, 4) != 0) {
203 std::cerr << "unable to get input socket (port " << global::inputPort
204 << ") : " << strerror(errno) << "\n";
205 exit(1);
206 }
207 fcntl(inputFds, F_SETFL, O_NONBLOCK);
208 for (int i=0; i<maxInput; i++) {
209 inputConns[i] = new Input;
210 }
211 nbInput = 0;
212 } else {
213 nbInput = -1;
214 }
215 }
216