1 // Larbin
2 // Sebastien Ailleret
3 // 03-02-00 -> 23-11-01
4 
5 #include <unistd.h>
6 #include <iostream>
7 #include <cstring>
8 #include <sys/types.h>
9 #include <sys/socket.h>
10 
11 #include "options.h"
12 
13 #include "types.h"
14 #include "global.h"
15 #include "utils/text.h"
16 #include "utils/debug.h"
17 #include "interf/input.h"
18 
// Sentinel values stored in Input::priority.
// INIT: the connection is open but its header line has not been parsed yet.
// END : the connection must be closed on the next pass of input().
// The replacement lists are parenthesized so the macros always expand to a
// single primary expression (an accidental "x INIT" no longer compiles as a
// subtraction).
#define INIT (-1)
#define END (-2)
21 
22 /* input connexion */
23 struct Input {
24   int fds;
25   uint pos;
26   uint end_pos;
27   uint end_posp;
28   int priority;
29   uint depth;
30   uint test;
31   char buffer[BUF_SIZE];
32 };
33 
34 /** socket used for input */
35 static int inputFds;
36 /** number of opened input connections */
37 static int nbInput;
38 /** array for the opened input connections */
39 static Input *inputConns[maxInput];
40 
41 // declaration of forward functions
42 static bool readMore (Input *in);
43 static char *readline (Input *in);
44 
input()45 int input () {
46   if (nbInput < 0) {
47     // Input is disabled
48     return -1;
49   }
50   int n = -1;
51   if (nbInput < maxInput-1
52       && global::ansPoll[inputFds]) {
53     // test if there is a new connection
54     struct sockaddr_in addr;
55     int fdc;
56     socklen_t len = sizeof(addr);
57     fdc = accept(inputFds, (struct sockaddr *) &addr, &len);
58     if (fdc != -1) {
59       global::verifMax(fdc);
60       fcntl(fdc, F_SETFL, O_NONBLOCK);
61       inputConns[nbInput]->fds = fdc;
62       inputConns[nbInput]->pos = 0;
63       inputConns[nbInput]->end_pos = 0;
64       inputConns[nbInput]->end_posp = 0;
65       inputConns[nbInput]->priority = INIT;
66       nbInput++;
67 #ifdef URL_TAGS
68       ecrire(fdc, "Welcome to larbin input system !\nYour first line should look like \"priority:0 depth:5 test:1\"\nThe following should contain one id and one url separed by one space per line\n\"137 http://pauillac.inria.fr/~ailleret/prog/larbin/\" for instance\n\n");
69 #else
70       ecrire(fdc, "Welcome to larbin input system !\nYour first line should look like \"priority:0 depth:5 test:1\"\nThe following should contain one url per line (http://pauillac.inria.fr/~ailleret/prog/larbin/ for instance)\n\n");
71 #endif // URL_TAGS
72     }
73   }
74   if (nbInput < maxInput-1) {
75     n = inputFds;
76     global::setPoll(inputFds, POLLIN);
77   }
78   // read open sockets
79   int i=0;
80   while (i<nbInput) {
81     Input *in = inputConns[i];
82     if (global::ansPoll[in->fds] && readMore(in)) {
83       char *line = readline(in);
84       while (line != NULL) {
85         if (in->priority == INIT) {
86           // first line
87           if (sscanf(line, "priority:%d depth:%u test:%u",
88                      &in->priority, &in->depth, &in->test) == 3) {
89             line = readline(in);
90           } else {
91             ecrire(in->fds, "Incorrect input\n");
92             line = NULL;
93             in->priority = END;
94           }
95         } else {
96           // this is an url
97           url *u = new url(line, in->depth);
98           if (u->isValid()) {
99             if (in->test) {
100               if (global::seen->testSet(u)) {
101                 hashUrls();   // stats
102                 if (in->priority) {
103                   global::URLsPriority->put(u);
104                 } else {
105                   global::URLsDisk->put(u);
106                 }
107               } else {
108                 delete u;
109               }
110             } else {
111               hashUrls();   // stats
112               global::seen->set(u);
113               if (in->priority) {
114                 global::URLsPriority->put(u);
115               } else {
116                 global::URLsDisk->put(u);
117               }
118             }
119           } else {
120             delete u;
121           }
122           line = readline(in);
123         }
124       }
125     }
126     if (in->priority == END) {
127       // forget this connection
128       ecrire(in->fds, "Bye bye...\n");
129       close(in->fds);
130       nbInput--;
131       Input *tmp = inputConns[i];
132       inputConns[i] = inputConns[nbInput];
133       inputConns[nbInput] = tmp;
134     } else { // go to next connection
135       if (in->fds > n) n = in->fds;
136       global::setPoll(in->fds, POLLIN);
137       i++;
138     }
139   }
140   return n;
141 }
142 
readMore(Input * in)143 static bool readMore (Input *in) {
144   assert (in->end_posp == in->end_pos);
145   if (in->end_posp - in->pos > maxUrlSize+100) {
146     // error -> stop connection
147     ecrire(in->fds, "Url submitted too long\n");
148     in->priority = END;
149     return false;
150   }
151   if (2 * in->pos > BUF_SIZE) {
152     in->end_pos -= in->pos;
153     in->end_posp = in->end_pos;
154     memmove(in->buffer, in->buffer+in->pos, in->end_pos);
155     in->pos = 0;
156   }
157   int nb = read(in->fds, in->buffer+in->end_pos, BUF_SIZE-in->end_pos);
158   if (nb == -1 && errno == EAGAIN) {
159     return false;
160   } else if (nb <= 0) {
161     in->priority = END;
162     return false;
163   } else {
164     in->end_pos += nb;
165     return true;
166   }
167 }
168 
169 /* no allocation */
readline(Input * in)170 static char *readline (Input *in) {
171   while (in->end_posp < in->end_pos && in->buffer[in->end_posp] != '\n') {
172     in->end_posp++;
173   }
174   if (in->end_posp == in->end_pos) {
175     return NULL;
176   } else {
177     if (in->buffer[in->end_posp-1] == '\r') {
178       in->buffer[in->end_posp-1] = 0;
179     } else {
180       in->buffer[in->end_posp] = 0;
181     }
182     char *res = in->buffer+in->pos;
183     in->pos = ++in->end_posp;
184     return res;
185   }
186 }
187 
188 ////////////////////////////////////////////////////
189 /** init everything */
initInput()190 void initInput () {
191   if (global::inputPort != 0) {
192     int allowReuse = 1;
193     struct sockaddr_in addr;
194     memset ((void *) &addr, 0, sizeof(addr));
195     addr.sin_addr.s_addr = INADDR_ANY;
196     addr.sin_family = AF_INET;
197     addr.sin_port = htons(global::inputPort);
198     if ((inputFds = socket(AF_INET, SOCK_STREAM, 0)) == -1
199         || setsockopt(inputFds, SOL_SOCKET, SO_REUSEADDR,
200                       (char*)&allowReuse, sizeof(allowReuse))
201         || bind(inputFds, (struct sockaddr *) &addr, sizeof(addr)) != 0
202         || listen(inputFds, 4) != 0) {
203       std::cerr << "unable to get input socket (port " << global::inputPort
204            << ") : " << strerror(errno) << "\n";
205       exit(1);
206     }
207     fcntl(inputFds, F_SETFL, O_NONBLOCK);
208     for (int i=0; i<maxInput; i++) {
209       inputConns[i] = new Input;
210     }
211     nbInput = 0;
212   } else {
213     nbInput = -1;
214   }
215 }
216