1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied.  See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 import com.sun.net.httpserver.HttpExchange;
21 import com.sun.net.httpserver.HttpHandler;
22 import com.sun.net.httpserver.HttpServer;
23 import org.apache.logging.log4j.LogManager;
24 import org.apache.logging.log4j.Logger;
25 
26 import java.io.BufferedReader;
27 import java.io.IOException;
28 import java.io.InputStreamReader;
29 import java.io.OutputStream;
30 import java.net.InetSocketAddress;
31 import java.util.LinkedList;
32 
33 public class IntegerBufferServer {
34   private static final Logger logger = LogManager.getLogger(IntegerBufferServer.class.getCanonicalName());
35   private static final Logger perfLogger = LogManager.getLogger("perf." + IntegerBufferServer.class.getCanonicalName());
36 
37   private static class FixedSizeBuffer {
38     private LinkedList<Integer> buffer;
39     private int capacity;
40     private int size;
41 
FixedSizeBuffer(int capacity)42     public FixedSizeBuffer(int capacity) {
43       this.buffer = new LinkedList<Integer>();
44       this.capacity = capacity;
45       this.size = 0;
46     }
47 
put(Integer i)48     public synchronized boolean put(Integer i) {
49       if (size >= capacity) {
50         return false;
51       }
52 
53       buffer.add(i);
54       size++;
55       return true;
56     }
57 
take()58     public synchronized Integer take() {
59       if (size <= 0) {
60         return null;
61       }
62 
63       size--;
64       return buffer.remove();
65     }
66   }
67 
68   private static class IntegerHandler implements HttpHandler {
69     private static final int HTTP_OK = 200;
70     private static final int HTTP_UNAVAILABLE = 503;
71 
put(HttpExchange httpExchange)72     private static void put(HttpExchange httpExchange) throws IOException {
73       logger.info("Entering put request method...");
74       BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(httpExchange.getRequestBody()));
75       String body = bufferedReader.readLine();
76       bufferedReader.close();
77       int num = Integer.parseInt(body);
78       if (fixedSizeBuffer.put(num)) {
79         httpExchange.sendResponseHeaders(HTTP_OK, 0);
80         logger.trace("Received: " + body);
81       }
82       else {
83         httpExchange.sendResponseHeaders(HTTP_UNAVAILABLE, 0);
84         logger.trace("Full queue");
85       }
86       logger.info("Exiting put request method...");
87     }
88 
take(HttpExchange httpExchange)89     private static void take(HttpExchange httpExchange) throws IOException {
90       logger.info("Entering take command method...");
91       Integer i = fixedSizeBuffer.take();
92       if (i == null) {
93         httpExchange.sendResponseHeaders(HTTP_UNAVAILABLE, 0);
94         logger.trace("Empty queue");
95       }
96       else {
97         String response = i.toString();
98         httpExchange.sendResponseHeaders(HTTP_OK, response.length());
99         OutputStream responseBody = httpExchange.getResponseBody();
100         responseBody.write(response.getBytes());
101         responseBody.flush();
102         responseBody.close();
103         logger.trace("Sent: " + response);
104       }
105       logger.info("Exiting take request method...");
106     }
107 
handle(HttpExchange httpExchange)108     public void handle(HttpExchange httpExchange) throws IOException {
109       logger.info("[ENTER] http request handler...");
110       long startTime = System.nanoTime();
111 
112       String requestMethod = httpExchange.getRequestMethod();
113       if (requestMethod.equalsIgnoreCase("POST")) {
114         put(httpExchange);
115       }
116       else if (requestMethod.equalsIgnoreCase("GET")) {
117         take(httpExchange);
118       }
119       else {
120         httpExchange.sendResponseHeaders(HTTP_OK, 0);
121       }
122 
123       float executionTime = (float)(System.nanoTime() - startTime) / 1000000;
124       perfLogger.info(String.format("%.3f", executionTime));
125       logger.info("[EXIT] http request handler...");
126     }
127   }
128 
129   private static FixedSizeBuffer fixedSizeBuffer;
130 
main(String[] args)131   public static void main(String[] args) throws IOException {
132     fixedSizeBuffer = new FixedSizeBuffer(Integer.parseInt(args[2]));
133 
134     InetSocketAddress socketAddress = new InetSocketAddress(args[0], Integer.parseInt(args[1]));
135     HttpServer httpServer = HttpServer.create(socketAddress, 0);
136     httpServer.createContext("/", new IntegerHandler());
137     httpServer.setExecutor(null);
138     httpServer.start();
139   }
140 }
141