1 /*
2  * Copyright (c) 2015, 2020, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 
26 package jdk.internal.net.http;
27 
28 import java.io.IOException;
29 import java.lang.System.Logger.Level;
30 import java.net.InetSocketAddress;
31 import java.nio.ByteBuffer;
32 import java.time.Instant;
33 import java.time.temporal.ChronoUnit;
34 import java.util.ArrayList;
35 import java.util.Collections;
36 import java.util.HashMap;
37 import java.util.Iterator;
38 import java.util.LinkedList;
39 import java.util.List;
40 import java.util.ListIterator;
41 import java.util.Objects;
42 import java.util.Optional;
43 import java.util.concurrent.Flow;
44 import java.util.stream.Collectors;
45 import jdk.internal.net.http.common.FlowTube;
46 import jdk.internal.net.http.common.Logger;
47 import jdk.internal.net.http.common.Utils;
48 
49 /**
50  * Http 1.1 connection pool.
51  */
52 final class ConnectionPool {
53 
54     static final long KEEP_ALIVE = Utils.getIntegerNetProperty(
55             "jdk.httpclient.keepalive.timeout", 1200); // seconds
56     static final long MAX_POOL_SIZE = Utils.getIntegerNetProperty(
57             "jdk.httpclient.connectionPoolSize", 0); // unbounded
58     final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
59 
60     // Pools of idle connections
61 
62     private final HashMap<CacheKey,LinkedList<HttpConnection>> plainPool;
63     private final HashMap<CacheKey,LinkedList<HttpConnection>> sslPool;
64     private final ExpiryList expiryList;
65     private final String dbgTag; // used for debug
66     boolean stopped;
67 
68     /**
69      * Entries in connection pool are keyed by destination address and/or
70      * proxy address:
71      * case 1: plain TCP not via proxy (destination only)
72      * case 2: plain TCP via proxy (proxy only)
73      * case 3: SSL not via proxy (destination only)
74      * case 4: SSL over tunnel (destination and proxy)
75      */
76     static class CacheKey {
77         final InetSocketAddress proxy;
78         final InetSocketAddress destination;
79 
CacheKey(InetSocketAddress destination, InetSocketAddress proxy)80         CacheKey(InetSocketAddress destination, InetSocketAddress proxy) {
81             this.proxy = proxy;
82             this.destination = destination;
83         }
84 
85         @Override
equals(Object obj)86         public boolean equals(Object obj) {
87             if (obj == null) {
88                 return false;
89             }
90             if (getClass() != obj.getClass()) {
91                 return false;
92             }
93             final CacheKey other = (CacheKey) obj;
94             if (!Objects.equals(this.proxy, other.proxy)) {
95                 return false;
96             }
97             if (!Objects.equals(this.destination, other.destination)) {
98                 return false;
99             }
100             return true;
101         }
102 
103         @Override
hashCode()104         public int hashCode() {
105             return Objects.hash(proxy, destination);
106         }
107     }
108 
ConnectionPool(long clientId)109     ConnectionPool(long clientId) {
110         this("ConnectionPool("+clientId+")");
111     }
112 
113     /**
114      * There should be one of these per HttpClient.
115      */
ConnectionPool(String tag)116     private ConnectionPool(String tag) {
117         dbgTag = tag;
118         plainPool = new HashMap<>();
119         sslPool = new HashMap<>();
120         expiryList = new ExpiryList();
121     }
122 
dbgString()123     final String dbgString() {
124         return dbgTag;
125     }
126 
start()127     synchronized void start() {
128         assert !stopped : "Already stopped";
129     }
130 
cacheKey(InetSocketAddress destination, InetSocketAddress proxy)131     static CacheKey cacheKey(InetSocketAddress destination,
132                              InetSocketAddress proxy)
133     {
134         return new CacheKey(destination, proxy);
135     }
136 
getConnection(boolean secure, InetSocketAddress addr, InetSocketAddress proxy)137     synchronized HttpConnection getConnection(boolean secure,
138                                               InetSocketAddress addr,
139                                               InetSocketAddress proxy) {
140         if (stopped) return null;
141         CacheKey key = new CacheKey(addr, proxy);
142         HttpConnection c = secure ? findConnection(key, sslPool)
143                                   : findConnection(key, plainPool);
144         //System.out.println ("getConnection returning: " + c);
145         assert c == null || c.isSecure() == secure;
146         return c;
147     }
148 
149     /**
150      * Returns the connection to the pool.
151      */
returnToPool(HttpConnection conn)152     void returnToPool(HttpConnection conn) {
153         returnToPool(conn, Instant.now(), KEEP_ALIVE);
154     }
155 
156     // Called also by whitebox tests
returnToPool(HttpConnection conn, Instant now, long keepAlive)157     void returnToPool(HttpConnection conn, Instant now, long keepAlive) {
158 
159         assert (conn instanceof PlainHttpConnection) || conn.isSecure()
160             : "Attempting to return unsecure connection to SSL pool: "
161                 + conn.getClass();
162 
163         // Don't call registerCleanupTrigger while holding a lock,
164         // but register it before the connection is added to the pool,
165         // since we don't want to trigger the cleanup if the connection
166         // is not in the pool.
167         CleanupTrigger cleanup = registerCleanupTrigger(conn);
168 
169         // it's possible that cleanup may have been called.
170         HttpConnection toClose = null;
171         synchronized(this) {
172             if (cleanup.isDone()) {
173                 return;
174             } else if (stopped) {
175                 conn.close();
176                 return;
177             }
178             if (MAX_POOL_SIZE > 0 && expiryList.size() >= MAX_POOL_SIZE) {
179                 toClose = expiryList.removeOldest();
180                 if (toClose != null) removeFromPool(toClose);
181             }
182             if (conn instanceof PlainHttpConnection) {
183                 putConnection(conn, plainPool);
184             } else {
185                 assert conn.isSecure();
186                 putConnection(conn, sslPool);
187             }
188             expiryList.add(conn, now, keepAlive);
189         }
190         if (toClose != null) {
191             if (debug.on()) {
192                 debug.log("Maximum pool size reached: removing oldest connection %s",
193                           toClose.dbgString());
194             }
195             close(toClose);
196         }
197         //System.out.println("Return to pool: " + conn);
198     }
199 
registerCleanupTrigger(HttpConnection conn)200     private CleanupTrigger registerCleanupTrigger(HttpConnection conn) {
201         // Connect the connection flow to a pub/sub pair that will take the
202         // connection out of the pool and close it if anything happens
203         // while the connection is sitting in the pool.
204         CleanupTrigger cleanup = new CleanupTrigger(conn);
205         FlowTube flow = conn.getConnectionFlow();
206         if (debug.on()) debug.log("registering %s", cleanup);
207         flow.connectFlows(cleanup, cleanup);
208         return cleanup;
209     }
210 
211     private HttpConnection
findConnection(CacheKey key, HashMap<CacheKey,LinkedList<HttpConnection>> pool)212     findConnection(CacheKey key,
213                    HashMap<CacheKey,LinkedList<HttpConnection>> pool) {
214         LinkedList<HttpConnection> l = pool.get(key);
215         if (l == null || l.isEmpty()) {
216             return null;
217         } else {
218             HttpConnection c = l.removeFirst();
219             expiryList.remove(c);
220             return c;
221         }
222     }
223 
224     /* called from cache cleaner only  */
225     private boolean
removeFromPool(HttpConnection c, HashMap<CacheKey,LinkedList<HttpConnection>> pool)226     removeFromPool(HttpConnection c,
227                    HashMap<CacheKey,LinkedList<HttpConnection>> pool) {
228         //System.out.println("cacheCleaner removing: " + c);
229         assert Thread.holdsLock(this);
230         CacheKey k = c.cacheKey();
231         List<HttpConnection> l = pool.get(k);
232         if (l == null || l.isEmpty()) {
233             pool.remove(k);
234             return false;
235         }
236         return l.remove(c);
237     }
238 
239     private void
putConnection(HttpConnection c, HashMap<CacheKey,LinkedList<HttpConnection>> pool)240     putConnection(HttpConnection c,
241                   HashMap<CacheKey,LinkedList<HttpConnection>> pool) {
242         CacheKey key = c.cacheKey();
243         LinkedList<HttpConnection> l = pool.get(key);
244         if (l == null) {
245             l = new LinkedList<>();
246             pool.put(key, l);
247         }
248         l.add(c);
249     }
250 
251     /**
252      * Purge expired connection and return the number of milliseconds
253      * in which the next connection is scheduled to expire.
254      * If no connections are scheduled to be purged return 0.
255      * @return the delay in milliseconds in which the next connection will
256      *         expire.
257      */
purgeExpiredConnectionsAndReturnNextDeadline()258     long purgeExpiredConnectionsAndReturnNextDeadline() {
259         if (!expiryList.purgeMaybeRequired()) return 0;
260         return purgeExpiredConnectionsAndReturnNextDeadline(Instant.now());
261     }
262 
263     // Used for whitebox testing
purgeExpiredConnectionsAndReturnNextDeadline(Instant now)264     long purgeExpiredConnectionsAndReturnNextDeadline(Instant now) {
265         long nextPurge = 0;
266 
267         // We may be in the process of adding new elements
268         // to the expiry list - but those elements will not
269         // have outlast their keep alive timer yet since we're
270         // just adding them.
271         if (!expiryList.purgeMaybeRequired()) return nextPurge;
272 
273         List<HttpConnection> closelist;
274         synchronized (this) {
275             closelist = expiryList.purgeUntil(now);
276             for (HttpConnection c : closelist) {
277                 if (c instanceof PlainHttpConnection) {
278                     boolean wasPresent = removeFromPool(c, plainPool);
279                     assert wasPresent;
280                 } else {
281                     boolean wasPresent = removeFromPool(c, sslPool);
282                     assert wasPresent;
283                 }
284             }
285             nextPurge = now.until(
286                     expiryList.nextExpiryDeadline().orElse(now),
287                     ChronoUnit.MILLIS);
288         }
289         closelist.forEach(this::close);
290         return nextPurge;
291     }
292 
close(HttpConnection c)293     private void close(HttpConnection c) {
294         try {
295             c.close();
296         } catch (Throwable e) {} // ignore
297     }
298 
stop()299     void stop() {
300         List<HttpConnection> closelist = Collections.emptyList();
301         try {
302             synchronized (this) {
303                 stopped = true;
304                 closelist = expiryList.stream()
305                     .map(e -> e.connection)
306                     .collect(Collectors.toList());
307                 expiryList.clear();
308                 plainPool.clear();
309                 sslPool.clear();
310             }
311         } finally {
312             closelist.forEach(this::close);
313         }
314     }
315 
316     static final class ExpiryEntry {
317         final HttpConnection connection;
318         final Instant expiry; // absolute time in seconds of expiry time
ExpiryEntry(HttpConnection connection, Instant expiry)319         ExpiryEntry(HttpConnection connection, Instant expiry) {
320             this.connection = connection;
321             this.expiry = expiry;
322         }
323     }
324 
325     /**
326      * Manages a LinkedList of sorted ExpiryEntry. The entry with the closer
327      * deadline is at the tail of the list, and the entry with the farther
328      * deadline is at the head. In the most common situation, new elements
329      * will need to be added at the head (or close to it), and expired elements
330      * will need to be purged from the tail.
331      */
332     private static final class ExpiryList {
333         private final LinkedList<ExpiryEntry> list = new LinkedList<>();
334         private volatile boolean mayContainEntries;
335 
size()336         int size() { return list.size(); }
337 
338         // A loosely accurate boolean whose value is computed
339         // at the end of each operation performed on ExpiryList;
340         // Does not require synchronizing on the ConnectionPool.
purgeMaybeRequired()341         boolean purgeMaybeRequired() {
342             return mayContainEntries;
343         }
344 
345         // Returns the next expiry deadline
346         // should only be called while holding a synchronization
347         // lock on the ConnectionPool
nextExpiryDeadline()348         Optional<Instant> nextExpiryDeadline() {
349             if (list.isEmpty()) return Optional.empty();
350             else return Optional.of(list.getLast().expiry);
351         }
352 
353         // should only be called while holding a synchronization
354         // lock on the ConnectionPool
removeOldest()355         HttpConnection removeOldest() {
356             ExpiryEntry entry = list.pollLast();
357             return entry == null ? null : entry.connection;
358         }
359 
360         // should only be called while holding a synchronization
361         // lock on the ConnectionPool
add(HttpConnection conn)362         void add(HttpConnection conn) {
363             add(conn, Instant.now(), KEEP_ALIVE);
364         }
365 
366         // Used by whitebox test.
add(HttpConnection conn, Instant now, long keepAlive)367         void add(HttpConnection conn, Instant now, long keepAlive) {
368             Instant then = now.truncatedTo(ChronoUnit.SECONDS)
369                     .plus(keepAlive, ChronoUnit.SECONDS);
370 
371             // Elements with the farther deadline are at the head of
372             // the list. It's more likely that the new element will
373             // have the farthest deadline, and will need to be inserted
374             // at the head of the list, so we're using an ascending
375             // list iterator to find the right insertion point.
376             ListIterator<ExpiryEntry> li = list.listIterator();
377             while (li.hasNext()) {
378                 ExpiryEntry entry = li.next();
379 
380                 if (then.isAfter(entry.expiry)) {
381                     li.previous();
382                     // insert here
383                     li.add(new ExpiryEntry(conn, then));
384                     mayContainEntries = true;
385                     return;
386                 }
387             }
388             // last (or first) element of list (the last element is
389             // the first when the list is empty)
390             list.add(new ExpiryEntry(conn, then));
391             mayContainEntries = true;
392         }
393 
394         // should only be called while holding a synchronization
395         // lock on the ConnectionPool
remove(HttpConnection c)396         void remove(HttpConnection c) {
397             if (c == null || list.isEmpty()) return;
398             ListIterator<ExpiryEntry> li = list.listIterator();
399             while (li.hasNext()) {
400                 ExpiryEntry e = li.next();
401                 if (e.connection.equals(c)) {
402                     li.remove();
403                     mayContainEntries = !list.isEmpty();
404                     return;
405                 }
406             }
407         }
408 
409         // should only be called while holding a synchronization
410         // lock on the ConnectionPool.
411         // Purge all elements whose deadline is before now (now included).
purgeUntil(Instant now)412         List<HttpConnection> purgeUntil(Instant now) {
413             if (list.isEmpty()) return Collections.emptyList();
414 
415             List<HttpConnection> closelist = new ArrayList<>();
416 
417             // elements with the closest deadlines are at the tail
418             // of the queue, so we're going to use a descending iterator
419             // to remove them, and stop when we find the first element
420             // that has not expired yet.
421             Iterator<ExpiryEntry> li = list.descendingIterator();
422             while (li.hasNext()) {
423                 ExpiryEntry entry = li.next();
424                 // use !isAfter instead of isBefore in order to
425                 // remove the entry if its expiry == now
426                 if (!entry.expiry.isAfter(now)) {
427                     li.remove();
428                     HttpConnection c = entry.connection;
429                     closelist.add(c);
430                 } else break; // the list is sorted
431             }
432             mayContainEntries = !list.isEmpty();
433             return closelist;
434         }
435 
436         // should only be called while holding a synchronization
437         // lock on the ConnectionPool
stream()438         java.util.stream.Stream<ExpiryEntry> stream() {
439             return list.stream();
440         }
441 
442         // should only be called while holding a synchronization
443         // lock on the ConnectionPool
clear()444         void clear() {
445             list.clear();
446             mayContainEntries = false;
447         }
448     }
449 
450     // Remove a connection from the pool.
451     // should only be called while holding a synchronization
452     // lock on the ConnectionPool
removeFromPool(HttpConnection c)453     private void removeFromPool(HttpConnection c) {
454         assert Thread.holdsLock(this);
455         if (c instanceof PlainHttpConnection) {
456             removeFromPool(c, plainPool);
457         } else {
458             assert c.isSecure() : "connection " + c + " is not secure!";
459             removeFromPool(c, sslPool);
460         }
461     }
462 
463     // Used by tests
contains(HttpConnection c)464     synchronized boolean contains(HttpConnection c) {
465         final CacheKey key = c.cacheKey();
466         List<HttpConnection> list;
467         if ((list = plainPool.get(key)) != null) {
468             if (list.contains(c)) return true;
469         }
470         if ((list = sslPool.get(key)) != null) {
471             if (list.contains(c)) return true;
472         }
473         return false;
474     }
475 
cleanup(HttpConnection c, Throwable error)476     void cleanup(HttpConnection c, Throwable error) {
477         if (debug.on())
478             debug.log("%s : ConnectionPool.cleanup(%s)",
479                     String.valueOf(c.getConnectionFlow()), error);
480         synchronized(this) {
481             removeFromPool(c);
482             expiryList.remove(c);
483         }
484         c.close();
485     }
486 
487     /**
488      * An object that subscribes to the flow while the connection is in
489      * the pool. Anything that comes in will cause the connection to be closed
490      * and removed from the pool.
491      */
492     private final class CleanupTrigger implements
493             FlowTube.TubeSubscriber, FlowTube.TubePublisher,
494             Flow.Subscription {
495 
496         private final HttpConnection connection;
497         private volatile boolean done;
498 
CleanupTrigger(HttpConnection connection)499         public CleanupTrigger(HttpConnection connection) {
500             this.connection = connection;
501         }
502 
isDone()503         public boolean isDone() { return done;}
504 
triggerCleanup(Throwable error)505         private void triggerCleanup(Throwable error) {
506             done = true;
507             cleanup(connection, error);
508         }
509 
request(long n)510         @Override public void request(long n) {}
cancel()511         @Override public void cancel() {}
512 
513         @Override
onSubscribe(Flow.Subscription subscription)514         public void onSubscribe(Flow.Subscription subscription) {
515             subscription.request(1);
516         }
517         @Override
onError(Throwable error)518         public void onError(Throwable error) { triggerCleanup(error); }
519         @Override
onComplete()520         public void onComplete() { triggerCleanup(null); }
521         @Override
onNext(List<ByteBuffer> item)522         public void onNext(List<ByteBuffer> item) {
523             triggerCleanup(new IOException("Data received while in pool"));
524         }
525 
526         @Override
subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber)527         public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
528             subscriber.onSubscribe(this);
529         }
530 
531         @Override
toString()532         public String toString() {
533             return "CleanupTrigger(" + connection.getConnectionFlow() + ")";
534         }
535     }
536 }
537