1 /* 2 * Copyright (c) 1999, 2011, Oracle and/or its affiliates. All rights reserved. 3 * 4 * Redistribution and use in source and binary forms, with or without 5 * modification, are permitted provided that the following conditions 6 * are met: 7 * 8 * - Redistributions of source code must retain the above copyright 9 * notice, this list of conditions and the following disclaimer. 10 * 11 * - Redistributions in binary form must reproduce the above copyright 12 * notice, this list of conditions and the following disclaimer in the 13 * documentation and/or other materials provided with the distribution. 14 * 15 * - Neither the name of Oracle nor the names of its 16 * contributors may be used to endorse or promote products derived 17 * from this software without specific prior written permission. 18 * 19 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 20 * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, 21 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 22 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR 23 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 24 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 25 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 26 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 27 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 28 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 29 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 30 */ 31 32 /* 33 * This source code is provided to illustrate the usage of a given feature 34 * or technique and has been deliberately simplified. Additional steps 35 * required for a production-quality application, such as security checks, 36 * input validation and proper error handling, might not be present in 37 * this sample code. 38 */ 39 40 41 import java.io.*; 42 import java.net.*; 43 import java.lang.Byte; 44 45 /** 46 * Simple Java "server" using the Poller class 47 * to multiplex on incoming connections. Note 48 * that handoff of events, via linked Q is not 49 * actually be a performance booster here, since 50 * the processing of events is cheaper than 51 * the overhead in scheduling/executing them. 52 * Although this demo does allow for concurrency 53 * in handling connections, it uses a rather 54 * primitive "gang scheduling" policy to keep 55 * the code simpler. 56 */ 57 58 public class PollingServer 59 { 60 public final static int MAXCONN = 10000; 61 public final static int PORTNUM = 4444; 62 public final static int BYTESPEROP = 10; 63 64 /** 65 * This synchronization object protects access to certain 66 * data (bytesRead,eventsToProcess) by concurrent Consumer threads. 67 */ 68 private final static Object eventSync = new Object(); 69 70 private static InputStream[] instr = new InputStream[MAXCONN]; 71 private static int[] mapping = new int[65535]; 72 private static LinkedQueue linkedQ = new LinkedQueue(); 73 private static int bytesRead = 0; 74 private static int bytesToRead; 75 private static int eventsToProcess=0; 76 PollingServer(int concurrency)77 public PollingServer(int concurrency) { 78 Socket[] sockArr = new Socket[MAXCONN]; 79 long timestart, timestop; 80 short[] revents = new short[MAXCONN]; 81 int[] fds = new int[MAXCONN]; 82 int bytes; 83 Poller Mux; 84 int serverFd; 85 int totalConn=0; 86 int connects=0; 87 88 System.out.println ("Serv: Initializing port " + PORTNUM); 89 try { 90 91 ServerSocket skMain = new ServerSocket (PORTNUM); 92 /* 93 * Create the Poller object Mux, allow for up to MAXCONN 94 * sockets/filedescriptors to be polled. 95 */ 96 Mux = new Poller(MAXCONN); 97 serverFd = Mux.add(skMain, Poller.POLLIN); 98 99 Socket ctrlSock = skMain.accept(); 100 101 BufferedReader ctrlReader = 102 new BufferedReader(new InputStreamReader(ctrlSock.getInputStream())); 103 String ctrlString = ctrlReader.readLine(); 104 bytesToRead = Integer.valueOf(ctrlString).intValue(); 105 ctrlString = ctrlReader.readLine(); 106 totalConn = Integer.valueOf(ctrlString).intValue(); 107 108 System.out.println("Receiving " + bytesToRead + " bytes from " + 109 totalConn + " client connections"); 110 111 timestart = System.currentTimeMillis(); 112 113 /* 114 * Start the consumer threads to read data. 115 */ 116 for (int consumerThread = 0; 117 consumerThread < concurrency; consumerThread++ ) { 118 new Consumer(consumerThread).start(); 119 } 120 121 /* 122 * Take connections, read Data 123 */ 124 int numEvents=0; 125 126 while ( bytesRead < bytesToRead ) { 127 128 int loopWaits=0; 129 while (eventsToProcess > 0) { 130 synchronized (eventSync) { 131 loopWaits++; 132 if (eventsToProcess <= 0) break; 133 try { eventSync.wait(); } catch (Exception e) {e.printStackTrace();}; 134 } 135 } 136 if (loopWaits > 1) 137 System.out.println("Done waiting...loops = " + loopWaits + 138 " events " + numEvents + 139 " bytes read : " + bytesRead ); 140 141 if (bytesRead >= bytesToRead) break; // may be done! 142 143 /* 144 * Wait for events 145 */ 146 numEvents = Mux.waitMultiple(100, fds, revents); 147 synchronized (eventSync) { 148 eventsToProcess = numEvents; 149 } 150 /* 151 * Process all the events we got from Mux.waitMultiple 152 */ 153 int cnt = 0; 154 while ( (cnt < numEvents) && (bytesRead < bytesToRead) ) { 155 int fd = fds[cnt]; 156 157 if (revents[cnt] == Poller.POLLIN) { 158 if (fd == serverFd) { 159 /* 160 * New connection coming in on the ServerSocket 161 * Add the socket to the Mux, keep track of mapping 162 * the fdval returned by Mux.add to the connection. 163 */ 164 sockArr[connects] = skMain.accept(); 165 instr[connects] = sockArr[connects].getInputStream(); 166 int fdval = Mux.add(sockArr[connects], Poller.POLLIN); 167 mapping[fdval] = connects; 168 synchronized(eventSync) { 169 eventsToProcess--; // just processed this one! 170 } 171 connects++; 172 } else { 173 /* 174 * We've got data from this client connection. 175 * Put it on the queue for the consumer threads to process. 176 */ 177 linkedQ.put(new Integer(fd)); 178 } 179 } else { 180 System.out.println("Got revents[" + cnt + "] == " + revents[cnt]); 181 } 182 cnt++; 183 } 184 } 185 timestop = System.currentTimeMillis(); 186 System.out.println("Time for all reads (" + totalConn + 187 " sockets) : " + (timestop-timestart)); 188 189 // Tell the client it can now go away 190 byte[] buff = new byte[BYTESPEROP]; 191 ctrlSock.getOutputStream().write(buff,0,BYTESPEROP); 192 193 // Tell the cunsumer threads they can exit. 194 for (int cThread = 0; cThread < concurrency; cThread++ ) { 195 linkedQ.put(new Integer(-1)); 196 } 197 } catch (Exception exc) { exc.printStackTrace(); } 198 } 199 200 /* 201 * main ... just check if a concurrency was specified 202 */ main(String args[])203 public static void main (String args[]) 204 { 205 int concurrency; 206 207 if (args.length == 1) 208 concurrency = java.lang.Integer.valueOf(args[0]).intValue(); 209 else 210 concurrency = Poller.getNumCPUs() + 1; 211 PollingServer server = new PollingServer(concurrency); 212 } 213 214 /* 215 * This class is for handling the Client data. 216 * The PollingServer spawns off a number of these based upon 217 * the number of CPUs (or concurrency argument). 218 * Each just loops grabbing events off the queue and 219 * processing them. 220 */ 221 class Consumer extends Thread { 222 private int threadNumber; Consumer(int i)223 public Consumer(int i) { threadNumber = i; } 224 run()225 public void run() { 226 byte[] buff = new byte[BYTESPEROP]; 227 int bytes = 0; 228 229 InputStream instream; 230 while (bytesRead < bytesToRead) { 231 try { 232 Integer Fd = (Integer) linkedQ.take(); 233 int fd = Fd.intValue(); 234 if (fd == -1) break; /* got told we could exit */ 235 236 /* 237 * We have to map the fd value returned from waitMultiple 238 * to the actual input stream associated with that fd. 239 * Take a look at how the Mux.add() was done to see how 240 * we stored that. 241 */ 242 int map = mapping[fd]; 243 instream = instr[map]; 244 bytes = instream.read(buff,0,BYTESPEROP); 245 } catch (Exception e) { System.out.println(e.toString()); } 246 247 if (bytes > 0) { 248 /* 249 * Any real server would do some synchronized and some 250 * unsynchronized work on behalf of the client, and 251 * most likely send some data back...but this is a 252 * gross oversimplification. 253 */ 254 synchronized(eventSync) { 255 bytesRead += bytes; 256 eventsToProcess--; 257 if (eventsToProcess <= 0) { 258 eventSync.notify(); 259 } 260 } 261 } 262 } 263 } 264 } 265 } 266