/* * ==================================================================== * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * ==================================================================== * * This software consists of voluntary contributions made by many * individuals on behalf of the Apache Software Foundation. For more * information on the Apache Software Foundation, please see * . * */ package ch.boye.httpclientandroidlib.pool; import java.io.IOException; import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; import java.util.Map; import java.util.Set; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import ch.boye.httpclientandroidlib.annotation.ThreadSafe; import ch.boye.httpclientandroidlib.concurrent.FutureCallback; import ch.boye.httpclientandroidlib.util.Args; import ch.boye.httpclientandroidlib.util.Asserts; /** * Abstract synchronous (blocking) pool of connections. *

* Please note that this class does not maintain its own pool of execution {@link Thread}s. * Therefore, one must call {@link Future#get()} or {@link Future#get(long, TimeUnit)} * method on the {@link Future} object returned by the * {@link #lease(Object, Object, FutureCallback)} method in order for the lease operation * to complete. * * @param the route type that represents the opposite endpoint of a pooled * connection. * @param the connection type. * @param the type of the pool entry containing a pooled connection. * @since 4.2 */ @ThreadSafe public abstract class AbstractConnPool> implements ConnPool, ConnPoolControl { private final Lock lock; private final ConnFactory connFactory; private final Map> routeToPool; private final Set leased; private final LinkedList available; private final LinkedList> pending; private final Map maxPerRoute; private volatile boolean isShutDown; private volatile int defaultMaxPerRoute; private volatile int maxTotal; public AbstractConnPool( final ConnFactory connFactory, final int defaultMaxPerRoute, final int maxTotal) { super(); this.connFactory = Args.notNull(connFactory, "Connection factory"); this.defaultMaxPerRoute = Args.notNegative(defaultMaxPerRoute, "Max per route value"); this.maxTotal = Args.notNegative(maxTotal, "Max total value"); this.lock = new ReentrantLock(); this.routeToPool = new HashMap>(); this.leased = new HashSet(); this.available = new LinkedList(); this.pending = new LinkedList>(); this.maxPerRoute = new HashMap(); } /** * Creates a new entry for the given connection with the given route. */ protected abstract E createEntry(T route, C conn); /** * @since 4.3 */ protected void onLease(final E entry) { } /** * @since 4.3 */ protected void onRelease(final E entry) { } public boolean isShutdown() { return this.isShutDown; } /** * Shuts down the pool. */ public void shutdown() throws IOException { if (this.isShutDown) { return ; } this.isShutDown = true; this.lock.lock(); try { for (final E entry: this.available) { entry.close(); } for (final E entry: this.leased) { entry.close(); } for (final RouteSpecificPool pool: this.routeToPool.values()) { pool.shutdown(); } this.routeToPool.clear(); this.leased.clear(); this.available.clear(); } finally { this.lock.unlock(); } } private RouteSpecificPool getPool(final T route) { RouteSpecificPool pool = this.routeToPool.get(route); if (pool == null) { pool = new RouteSpecificPool(route) { @Override protected E createEntry(final C conn) { return AbstractConnPool.this.createEntry(route, conn); } }; this.routeToPool.put(route, pool); } return pool; } /** * {@inheritDoc} *

* Please note that this class does not maintain its own pool of execution * {@link Thread}s. Therefore, one must call {@link Future#get()} * or {@link Future#get(long, TimeUnit)} method on the {@link Future} * returned by this method in order for the lease operation to complete. */ public Future lease(final T route, final Object state, final FutureCallback callback) { Args.notNull(route, "Route"); Asserts.check(!this.isShutDown, "Connection pool shut down"); return new PoolEntryFuture(this.lock, callback) { @Override public E getPoolEntry( final long timeout, final TimeUnit tunit) throws InterruptedException, TimeoutException, IOException { final E entry = getPoolEntryBlocking(route, state, timeout, tunit, this); onLease(entry); return entry; } }; } /** * Attempts to lease a connection for the given route and with the given * state from the pool. *

* Please note that this class does not maintain its own pool of execution * {@link Thread}s. Therefore, one must call {@link Future#get()} * or {@link Future#get(long, TimeUnit)} method on the {@link Future} * returned by this method in order for the lease operation to complete. * * @param route route of the connection. * @param state arbitrary object that represents a particular state * (usually a security principal or a unique token identifying * the user whose credentials have been used while establishing the connection). * May be null. * @return future for a leased pool entry. */ public Future lease(final T route, final Object state) { return lease(route, state, null); } private E getPoolEntryBlocking( final T route, final Object state, final long timeout, final TimeUnit tunit, final PoolEntryFuture future) throws IOException, InterruptedException, TimeoutException { Date deadline = null; if (timeout > 0) { deadline = new Date (System.currentTimeMillis() + tunit.toMillis(timeout)); } this.lock.lock(); try { final RouteSpecificPool pool = getPool(route); E entry = null; while (entry == null) { Asserts.check(!this.isShutDown, "Connection pool shut down"); for (;;) { entry = pool.getFree(state); if (entry == null) { break; } if (entry.isClosed() || entry.isExpired(System.currentTimeMillis())) { entry.close(); this.available.remove(entry); pool.free(entry, false); } else { break; } } if (entry != null) { this.available.remove(entry); this.leased.add(entry); return entry; } // New connection is needed final int maxPerRoute = getMax(route); // Shrink the pool prior to allocating a new connection final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute); if (excess > 0) { for (int i = 0; i < excess; i++) { final E lastUsed = pool.getLastUsed(); if (lastUsed == null) { break; } lastUsed.close(); this.available.remove(lastUsed); pool.remove(lastUsed); } } if (pool.getAllocatedCount() < maxPerRoute) { final int totalUsed = this.leased.size(); final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0); if (freeCapacity > 0) { final int totalAvailable = this.available.size(); if (totalAvailable > freeCapacity - 1) { if (!this.available.isEmpty()) { final E lastUsed = this.available.removeLast(); lastUsed.close(); final RouteSpecificPool otherpool = getPool(lastUsed.getRoute()); otherpool.remove(lastUsed); } } final C conn = this.connFactory.create(route); entry = pool.add(conn); this.leased.add(entry); return entry; } } boolean success = false; try { pool.queue(future); this.pending.add(future); success = future.await(deadline); } finally { // In case of 'success', we were woken up by the // connection pool and should now have a connection // waiting for us, or else we're shutting down. // Just continue in the loop, both cases are checked. pool.unqueue(future); this.pending.remove(future); } // check for spurious wakeup vs. timeout if (!success && (deadline != null) && (deadline.getTime() <= System.currentTimeMillis())) { break; } } throw new TimeoutException("Timeout waiting for connection"); } finally { this.lock.unlock(); } } public void release(final E entry, final boolean reusable) { this.lock.lock(); try { if (this.leased.remove(entry)) { final RouteSpecificPool pool = getPool(entry.getRoute()); pool.free(entry, reusable); if (reusable && !this.isShutDown) { this.available.addFirst(entry); onRelease(entry); } else { entry.close(); } PoolEntryFuture future = pool.nextPending(); if (future != null) { this.pending.remove(future); } else { future = this.pending.poll(); } if (future != null) { future.wakeup(); } } } finally { this.lock.unlock(); } } private int getMax(final T route) { final Integer v = this.maxPerRoute.get(route); if (v != null) { return v.intValue(); } else { return this.defaultMaxPerRoute; } } public void setMaxTotal(final int max) { Args.notNegative(max, "Max value"); this.lock.lock(); try { this.maxTotal = max; } finally { this.lock.unlock(); } } public int getMaxTotal() { this.lock.lock(); try { return this.maxTotal; } finally { this.lock.unlock(); } } public void setDefaultMaxPerRoute(final int max) { Args.notNegative(max, "Max per route value"); this.lock.lock(); try { this.defaultMaxPerRoute = max; } finally { this.lock.unlock(); } } public int getDefaultMaxPerRoute() { this.lock.lock(); try { return this.defaultMaxPerRoute; } finally { this.lock.unlock(); } } public void setMaxPerRoute(final T route, final int max) { Args.notNull(route, "Route"); Args.notNegative(max, "Max per route value"); this.lock.lock(); try { this.maxPerRoute.put(route, Integer.valueOf(max)); } finally { this.lock.unlock(); } } public int getMaxPerRoute(final T route) { Args.notNull(route, "Route"); this.lock.lock(); try { return getMax(route); } finally { this.lock.unlock(); } } public PoolStats getTotalStats() { this.lock.lock(); try { return new PoolStats( this.leased.size(), this.pending.size(), this.available.size(), this.maxTotal); } finally { this.lock.unlock(); } } public PoolStats getStats(final T route) { Args.notNull(route, "Route"); this.lock.lock(); try { final RouteSpecificPool pool = getPool(route); return new PoolStats( pool.getLeasedCount(), pool.getPendingCount(), pool.getAvailableCount(), getMax(route)); } finally { this.lock.unlock(); } } /** * Enumerates all available connections. * * @since 4.3 */ protected void enumAvailable(final PoolEntryCallback callback) { this.lock.lock(); try { final Iterator it = this.available.iterator(); while (it.hasNext()) { final E entry = it.next(); callback.process(entry); if (entry.isClosed()) { final RouteSpecificPool pool = getPool(entry.getRoute()); pool.remove(entry); it.remove(); } } purgePoolMap(); } finally { this.lock.unlock(); } } /** * Enumerates all leased connections. * * @since 4.3 */ protected void enumLeased(final PoolEntryCallback callback) { this.lock.lock(); try { final Iterator it = this.leased.iterator(); while (it.hasNext()) { final E entry = it.next(); callback.process(entry); } } finally { this.lock.unlock(); } } private void purgePoolMap() { final Iterator>> it = this.routeToPool.entrySet().iterator(); while (it.hasNext()) { final Map.Entry> entry = it.next(); final RouteSpecificPool pool = entry.getValue(); if (pool.getPendingCount() + pool.getAllocatedCount() == 0) { it.remove(); } } } /** * Closes connections that have been idle longer than the given period * of time and evicts them from the pool. * * @param idletime maximum idle time. * @param tunit time unit. */ public void closeIdle(final long idletime, final TimeUnit tunit) { Args.notNull(tunit, "Time unit"); long time = tunit.toMillis(idletime); if (time < 0) { time = 0; } final long deadline = System.currentTimeMillis() - time; enumAvailable(new PoolEntryCallback() { public void process(final PoolEntry entry) { if (entry.getUpdated() <= deadline) { entry.close(); } } }); } /** * Closes expired connections and evicts them from the pool. */ public void closeExpired() { final long now = System.currentTimeMillis(); enumAvailable(new PoolEntryCallback() { public void process(final PoolEntry entry) { if (entry.isExpired(now)) { entry.close(); } } }); } @Override public String toString() { final StringBuilder buffer = new StringBuilder(); buffer.append("[leased: "); buffer.append(this.leased); buffer.append("][available: "); buffer.append(this.available); buffer.append("][pending: "); buffer.append(this.pending); buffer.append("]"); return buffer.toString(); } }