1 /*
2  * Copyright (c) 1999, 2011, Oracle and/or its affiliates. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions
6  * are met:
7  *
8  *   - Redistributions of source code must retain the above copyright
9  *     notice, this list of conditions and the following disclaimer.
10  *
11  *   - Redistributions in binary form must reproduce the above copyright
12  *     notice, this list of conditions and the following disclaimer in the
13  *     documentation and/or other materials provided with the distribution.
14  *
15  *   - Neither the name of Oracle nor the names of its
16  *     contributors may be used to endorse or promote products derived
17  *     from this software without specific prior written permission.
18  *
19  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
20  * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
21  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22  * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT OWNER OR
23  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
24  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
25  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
26  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
27  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
28  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
29  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30  */
31 
32 /*
33  * This source code is provided to illustrate the usage of a given feature
34  * or technique and has been deliberately simplified. Additional steps
35  * required for a production-quality application, such as security checks,
36  * input validation and proper error handling, might not be present in
37  * this sample code.
38  */
39 
40 
41 import java.io.*;
42 import java.net.*;
43 import java.lang.Byte;
44 
45 /**
46  * Simple Java "server" using the Poller class
47  * to multiplex on incoming connections.  Note
 * that handoff of events via the linked queue is not
 * actually a performance booster here, since
50  * the processing of events is cheaper than
51  * the overhead in scheduling/executing them.
52  * Although this demo does allow for concurrency
53  * in handling connections, it uses a rather
54  * primitive "gang scheduling" policy to keep
55  * the code simpler.
56  */
57 
58 public class PollingServer
59 {
60   public final static int MAXCONN    = 10000;
61   public final static int PORTNUM    = 4444;
62   public final static int BYTESPEROP = 10;
63 
64   /**
65    * This synchronization object protects access to certain
66    * data (bytesRead,eventsToProcess) by concurrent Consumer threads.
67    */
68   private final static Object eventSync = new Object();
69 
70   private static InputStream[] instr = new InputStream[MAXCONN];
71   private static int[] mapping = new int[65535];
72   private static LinkedQueue linkedQ = new LinkedQueue();
73   private static int bytesRead = 0;
74   private static int bytesToRead;
75   private static int eventsToProcess=0;
76 
PollingServer(int concurrency)77   public PollingServer(int concurrency) {
78     Socket[] sockArr = new Socket[MAXCONN];
79     long timestart, timestop;
80     short[] revents = new short[MAXCONN];
81     int[] fds = new int[MAXCONN];
82     int bytes;
83     Poller Mux;
84     int serverFd;
85     int totalConn=0;
86     int connects=0;
87 
88     System.out.println ("Serv: Initializing port " + PORTNUM);
89     try {
90 
91       ServerSocket skMain = new ServerSocket (PORTNUM);
92       /*
93        * Create the Poller object Mux, allow for up to MAXCONN
94        * sockets/filedescriptors to be polled.
95        */
96       Mux = new Poller(MAXCONN);
97       serverFd = Mux.add(skMain, Poller.POLLIN);
98 
99       Socket ctrlSock = skMain.accept();
100 
101       BufferedReader ctrlReader =
102         new BufferedReader(new InputStreamReader(ctrlSock.getInputStream()));
103       String ctrlString = ctrlReader.readLine();
104       bytesToRead = Integer.valueOf(ctrlString).intValue();
105       ctrlString = ctrlReader.readLine();
106       totalConn = Integer.valueOf(ctrlString).intValue();
107 
108       System.out.println("Receiving " + bytesToRead + " bytes from " +
109                          totalConn + " client connections");
110 
111       timestart = System.currentTimeMillis();
112 
113       /*
114        * Start the consumer threads to read data.
115        */
116       for (int consumerThread = 0;
117            consumerThread < concurrency; consumerThread++ ) {
118         new Consumer(consumerThread).start();
119       }
120 
121       /*
122        * Take connections, read Data
123        */
124       int numEvents=0;
125 
126       while ( bytesRead < bytesToRead ) {
127 
128         int loopWaits=0;
129         while (eventsToProcess > 0) {
130           synchronized (eventSync) {
131             loopWaits++;
132             if (eventsToProcess <= 0) break;
133             try { eventSync.wait(); } catch (Exception e) {e.printStackTrace();};
134           }
135         }
136         if (loopWaits > 1)
137           System.out.println("Done waiting...loops = " + loopWaits +
138                              " events " + numEvents +
139                              " bytes read : " + bytesRead );
140 
141         if (bytesRead >= bytesToRead) break; // may be done!
142 
143         /*
144          * Wait for events
145          */
146         numEvents = Mux.waitMultiple(100, fds, revents);
147         synchronized (eventSync) {
148           eventsToProcess = numEvents;
149         }
150         /*
151          * Process all the events we got from Mux.waitMultiple
152          */
153         int cnt = 0;
154         while ( (cnt < numEvents) && (bytesRead < bytesToRead) ) {
155           int fd = fds[cnt];
156 
157           if (revents[cnt] == Poller.POLLIN) {
158             if (fd == serverFd) {
159               /*
160                * New connection coming in on the ServerSocket
161                * Add the socket to the Mux, keep track of mapping
162                * the fdval returned by Mux.add to the connection.
163                */
164               sockArr[connects] = skMain.accept();
165               instr[connects] = sockArr[connects].getInputStream();
166               int fdval = Mux.add(sockArr[connects], Poller.POLLIN);
167               mapping[fdval] = connects;
168               synchronized(eventSync) {
169                 eventsToProcess--; // just processed this one!
170               }
171               connects++;
172             } else {
173               /*
174                * We've got data from this client connection.
175                * Put it on the queue for the consumer threads to process.
176                */
177               linkedQ.put(new Integer(fd));
178             }
179           } else {
180             System.out.println("Got revents[" + cnt + "] == " + revents[cnt]);
181           }
182           cnt++;
183         }
184       }
185       timestop = System.currentTimeMillis();
186       System.out.println("Time for all reads (" + totalConn +
187                          " sockets) : " + (timestop-timestart));
188 
189       // Tell the client it can now go away
190       byte[] buff = new byte[BYTESPEROP];
191       ctrlSock.getOutputStream().write(buff,0,BYTESPEROP);
192 
193       // Tell the cunsumer threads they can exit.
194       for (int cThread = 0; cThread < concurrency; cThread++ ) {
195         linkedQ.put(new Integer(-1));
196       }
197     } catch (Exception exc) { exc.printStackTrace(); }
198   }
199 
200   /*
201    * main ... just check if a concurrency was specified
202    */
main(String args[])203   public static void main (String args[])
204   {
205     int concurrency;
206 
207     if (args.length == 1)
208       concurrency = java.lang.Integer.valueOf(args[0]).intValue();
209     else
210       concurrency = Poller.getNumCPUs() + 1;
211     PollingServer server = new PollingServer(concurrency);
212   }
213 
214   /*
215    * This class is for handling the Client data.
216    * The PollingServer spawns off a number of these based upon
217    * the number of CPUs (or concurrency argument).
218    * Each just loops grabbing events off the queue and
219    * processing them.
220    */
221   class Consumer extends Thread {
222     private int threadNumber;
Consumer(int i)223     public Consumer(int i) { threadNumber = i; }
224 
run()225     public void run() {
226       byte[] buff = new byte[BYTESPEROP];
227       int bytes = 0;
228 
229       InputStream instream;
230       while (bytesRead < bytesToRead) {
231         try {
232           Integer Fd = (Integer) linkedQ.take();
233           int fd = Fd.intValue();
234           if (fd == -1) break; /* got told we could exit */
235 
236           /*
237            * We have to map the fd value returned from waitMultiple
238            * to the actual input stream associated with that fd.
239            * Take a look at how the Mux.add() was done to see how
240            * we stored that.
241            */
242           int map = mapping[fd];
243           instream = instr[map];
244           bytes = instream.read(buff,0,BYTESPEROP);
245         } catch (Exception e) { System.out.println(e.toString()); }
246 
247         if (bytes > 0) {
248           /*
249            * Any real server would do some synchronized and some
250            * unsynchronized work on behalf of the client, and
251            * most likely send some data back...but this is a
252            * gross oversimplification.
253            */
254           synchronized(eventSync) {
255             bytesRead += bytes;
256             eventsToProcess--;
257             if (eventsToProcess <= 0) {
258               eventSync.notify();
259             }
260           }
261         }
262       }
263     }
264   }
265 }
266