Updated on 2025-03-08

A detailed look at how httpclient defines CPool

Preface

This article examines CPool, the connection pool class used by httpclient, together with the interfaces and abstract class it builds on.

ConnPool

org/apache/http/pool/

public interface ConnPool<T, E> {
    /**
     * Attempts to lease a connection for the given route and with the given
     * state from the pool.
     *
     * @param route route of the connection.
     * @param state arbitrary object that represents a particular state
     *  (usually a security principal or a unique token identifying
     *  the user whose credentials have been used while establishing the connection).
     *  May be {@code null}.
     * @param callback operation completion callback.
     *
     * @return future for a leased pool entry.
     */
    Future<E> lease(final T route, final Object state, final FutureCallback<E> callback);
    /**
     * Releases the pool entry back to the pool.
     *
     * @param entry pool entry leased from the pool
     * @param reusable flag indicating whether or not the released connection
     *   is in a consistent state and is safe for further use.
     */
    void release(E entry, boolean reusable);
}
ConnPool declares the lease and release methods. It has two type parameters: T is the route type and E is the pool entry type.
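The lease/release contract can be illustrated with a toy pool. This is only a minimal sketch, not the HttpCore code: SimplePool and StringPool are hypothetical names, entries are plain strings, the callback parameter is omitted, and the lease completes immediately instead of blocking.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Hypothetical, simplified stand-in for ConnPool<T, E> (no callback parameter).
interface SimplePool<T, E> {
    Future<E> lease(T route, Object state);
    void release(E entry, boolean reusable);
}

// Toy implementation: one free list per route, entries named "<route>#<n>".
class StringPool implements SimplePool<String, String> {
    private final Map<String, Deque<String>> free = new HashMap<>();
    private int created = 0;

    @Override
    public synchronized CompletableFuture<String> lease(String route, Object state) {
        Deque<String> queue = free.computeIfAbsent(route, r -> new ArrayDeque<>());
        // Reuse a free entry if there is one, otherwise create a new one.
        String entry = queue.isEmpty() ? route + "#" + (created++) : queue.pop();
        return CompletableFuture.completedFuture(entry);
    }

    @Override
    public synchronized void release(String entry, boolean reusable) {
        if (reusable) {
            String route = entry.substring(0, entry.lastIndexOf('#'));
            free.computeIfAbsent(route, r -> new ArrayDeque<>()).push(entry);
        }
        // A non-reusable entry is simply dropped in this sketch.
    }
}
```

A caller leases an entry, uses it, and hands it back with reusable set to true only when the connection is still in a consistent state.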

ConnPoolControl

public interface ConnPoolControl<T> {
    void setMaxTotal(int max);
    int getMaxTotal();
    void setDefaultMaxPerRoute(int max);
    int getDefaultMaxPerRoute();
    void setMaxPerRoute(final T route, int max);
    int getMaxPerRoute(final T route);
    PoolStats getTotalStats();
    PoolStats getStats(final T route);
}
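The ConnPoolControl interface declares the setters and getters for maxTotal, defaultMaxPerRoute, and the per-route maximum, plus getTotalStats and getStats, which return PoolStats for the whole pool or for a single route.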
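How a per-route limit relates to the default can be sketched as follows. RouteLimits is a hypothetical class written for illustration; it assumes that a route without an explicit setting falls back to defaultMaxPerRoute, and it leaves out PoolStats entirely.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical holder for the limits that ConnPoolControl exposes.
class RouteLimits<T> {
    private final Map<T, Integer> maxPerRoute = new HashMap<>();
    private int defaultMaxPerRoute;
    private int maxTotal;

    RouteLimits(int defaultMaxPerRoute, int maxTotal) {
        this.defaultMaxPerRoute = defaultMaxPerRoute;
        this.maxTotal = maxTotal;
    }

    synchronized void setMaxTotal(int max) { this.maxTotal = max; }
    synchronized int getMaxTotal() { return maxTotal; }

    synchronized void setDefaultMaxPerRoute(int max) { this.defaultMaxPerRoute = max; }
    synchronized int getDefaultMaxPerRoute() { return defaultMaxPerRoute; }

    synchronized void setMaxPerRoute(T route, int max) { maxPerRoute.put(route, max); }

    // A route without an explicit limit uses the default (assumption of this sketch).
    synchronized int getMaxPerRoute(T route) {
        Integer value = maxPerRoute.get(route);
        return value != null ? value : defaultMaxPerRoute;
    }
}
```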

AbstractConnPool

org/apache/http/pool/

@Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
public abstract class AbstractConnPool<T, C, E extends PoolEntry<T, C>>
                                               implements ConnPool<T, E>, ConnPoolControl<T> {
    private final Lock lock;
    private final Condition condition;
    private final ConnFactory<T, C> connFactory;
    private final Map<T, RouteSpecificPool<T, C, E>> routeToPool;
    private final Set<E> leased;
    private final LinkedList<E> available;
    private final LinkedList<Future<E>> pending;
    private final Map<T, Integer> maxPerRoute;
    private volatile boolean isShutDown;
    private volatile int defaultMaxPerRoute;
    private volatile int maxTotal;
    private volatile int validateAfterInactivity;
    public AbstractConnPool(
            final ConnFactory<T, C> connFactory,
            final int defaultMaxPerRoute,
            final int maxTotal) {
        super();
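        this.connFactory = Args.notNull(connFactory, "Connection factory");
        this.defaultMaxPerRoute = Args.positive(defaultMaxPerRoute, "Max per route value");
        this.maxTotal = Args.positive(maxTotal, "Max total value");
        this.lock = new ReentrantLock();
        this.condition = this.lock.newCondition();
        this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>();
        this.leased = new HashSet<E>();
        this.available = new LinkedList<E>();
        this.pending = new LinkedList<Future<E>>();
        this.maxPerRoute = new HashMap<T, Integer>();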
    }
    /**
     * Creates a new entry for the given connection with the given route.
     */
    protected abstract E createEntry(T route, C conn);
    //......
 }
AbstractConnPool implements both ConnPool and ConnPoolControl. It constrains E to extend PoolEntry<T, C> and adds a third type parameter, C, for the connection type.

shutdown

public void shutdown() throws IOException {
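        if (this.isShutDown) {
            return ;
        }
        this.isShutDown = true;
        this.lock.lock();
        try {
            for (final E entry: this.available) {
                entry.close();
            }
            for (final E entry: this.leased) {
                entry.close();
            }
            for (final RouteSpecificPool<T, C, E> pool: this.routeToPool.values()) {
                pool.shutdown();
            }
            this.routeToPool.clear();
            this.leased.clear();
            this.available.clear();
        } finally {
            this.lock.unlock();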
        }
    }
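The shutdown method returns immediately if the pool is already shut down. Otherwise it sets isShutDown, takes the lock, closes every entry in available and in leased, calls shutdown on each RouteSpecificPool in routeToPool, and finally clears the three collections.

lease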

public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
        Args.notNull(route, "Route");
        Asserts.check(!this.isShutDown, "Connection pool shut down");
        return new Future<E>() {
            private final AtomicBoolean cancelled = new AtomicBoolean(false);
            private final AtomicBoolean done = new AtomicBoolean(false);
            private final AtomicReference<E> entryRef = new AtomicReference<E>(null);
            @Override
            public boolean cancel(final boolean mayInterruptIfRunning) {
                if (done.compareAndSet(false, true)) {
                    cancelled.set(true);
                    lock.lock();
                    try {
                        condition.signalAll();
                    } finally {
                        lock.unlock();
                    }
                    if (callback != null) {
                        callback.cancelled();
                    }
                    return true;
                }
                return false;
            }
            @Override
            public boolean isCancelled() {
                return cancelled.get();
            }
            @Override
            public boolean isDone() {
                return done.get();
            }
            @Override
            public E get() throws InterruptedException, ExecutionException {
                try {
                    return get(0L, TimeUnit.MILLISECONDS);
                } catch (final TimeoutException ex) {
                    throw new ExecutionException(ex);
                }
            }
            @Override
            public E get(final long timeout, final TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
                for (;;) {
                    synchronized (this) {
                        try {
                            final E entry = entryRef.get();
                            if (entry != null) {
                                return entry;
                            }
                            if (done.get()) {
                                throw new ExecutionException(operationAborted());
                            }
                            final E leasedEntry = getPoolEntryBlocking(route, state, timeout, timeUnit, this);
                            if (validateAfterInactivity > 0)  {
                                if (leasedEntry.getUpdated() + validateAfterInactivity <= System.currentTimeMillis()) {
                                    if (!validate(leasedEntry)) {
                                        leasedEntry.close();
                                        release(leasedEntry, false);
                                        continue;
                                    }
                                }
                            }
                            if (done.compareAndSet(false, true)) {
                                entryRef.set(leasedEntry);
                                done.set(true);
                                onLease(leasedEntry);
                                if (callback != null) {
                                    callback.completed(leasedEntry);
                                }
                                return leasedEntry;
                            } else {
                                release(leasedEntry, true);
                                throw new ExecutionException(operationAborted());
                            }
                        } catch (final IOException ex) {
                            if (done.compareAndSet(false, true)) {
                                if (callback != null) {
                                    callback.failed(ex);
                                }
                            }
                            throw new ExecutionException(ex);
                        }
                    }
                }
            }
        };
    }
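The Future returned by lease centers on get and cancel. The get method calls getPoolEntryBlocking to obtain an entry. If validateAfterInactivity is greater than 0 and the entry has not been updated for at least that many milliseconds, get calls validate; when validation fails it closes the entry, calls release(leasedEntry, false), and loops to try again. The cancel method marks the future as done and cancelled, signals the waiting threads, and invokes the callback's cancelled method.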
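The condition that decides whether a leased entry is validated can be isolated into a small function. This is a sketch only: InactivityCheck and needsValidation are hypothetical names, and the timestamps are passed in as parameters so the check can be exercised without a clock.

```java
// Hypothetical helper mirroring the two nested checks in get():
// validation is enabled (> 0) and the entry has been idle long enough.
class InactivityCheck {
    static boolean needsValidation(long lastUpdatedMillis,
                                   int validateAfterInactivityMillis,
                                   long nowMillis) {
        if (validateAfterInactivityMillis <= 0) {
            return false; // validation disabled
        }
        return lastUpdatedMillis + validateAfterInactivityMillis <= nowMillis;
    }
}
```

With an entry last updated at t = 1000 and a threshold of 2000, validation starts to apply at t = 3000 and later.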

getPoolEntryBlocking

org/apache/http/pool/

private E getPoolEntryBlocking(
            final T route, final Object state,
            final long timeout, final TimeUnit timeUnit,
            final Future<E> future) throws IOException, InterruptedException, ExecutionException, TimeoutException {
        Date deadline = null;
        if (timeout > 0) {
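            deadline = new Date (System.currentTimeMillis() + timeUnit.toMillis(timeout));
        }
        this.lock.lock();
        try {
            final RouteSpecificPool<T, C, E> pool = getPool(route);
            E entry;
            for (;;) {
                Asserts.check(!this.isShutDown, "Connection pool shut down");
                if (future.isCancelled()) {
                    throw new ExecutionException(operationAborted());
                }
                for (;;) {
                    entry = pool.getFree(state);
                    if (entry == null) {
                        break;
                    }
                    if (entry.isExpired(System.currentTimeMillis())) {
                        entry.close();
                    }
                    if (entry.isClosed()) {
                        this.available.remove(entry);
                        pool.free(entry, false);
                    } else {
                        break;
                    }
                }
                if (entry != null) {
                    this.available.remove(entry);
                    this.leased.add(entry);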
                    onReuse(entry);
                    return entry;
                }
                // New connection is needed
                final int maxPerRoute = getMax(route);
                // Shrink the pool prior to allocating a new connection
                final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
                if (excess > 0) {
                    for (int i = 0; i < excess; i++) {
                        final E lastUsed = pool.getLastUsed();
                        if (lastUsed == null) {
                            break;
                        }
                        lastUsed.close();
                        this.available.remove(lastUsed);
                        pool.remove(lastUsed);
                    }
                }
                if (pool.getAllocatedCount() < maxPerRoute) {
                    final int totalUsed = this.leased.size();
                    final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
                    if (freeCapacity > 0) {
                        final int totalAvailable = this.available.size();
                        if (totalAvailable > freeCapacity - 1) {
                            if (!this.available.isEmpty()) {
                                final E lastUsed = this.available.removeLast();
                                lastUsed.close();
                                final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
                                otherpool.remove(lastUsed);
                            }
                        }
                        final C conn = this.connFactory.create(route);
                        entry = pool.add(conn);
                        this.leased.add(entry);
                        return entry;
                    }
                }
                boolean success = false;
                try {
                    pool.queue(future);
                    this.pending.add(future);
                    if (deadline != null) {
                        success = this.condition.awaitUntil(deadline);
                    } else {
                        this.condition.await();
                        success = true;
                    }
                    if (future.isCancelled()) {
                        throw new ExecutionException(operationAborted());
                    }
                } finally {
                    // In case of 'success', we were woken up by the
                    // connection pool and should now have a connection
                    // waiting for us, or else we're shutting down.
                    // Just continue in the loop, both cases are checked.
                    pool.unqueue(future);
                    this.pending.remove(future);
                }
                // check for spurious wakeup vs. timeout
                if (!success && (deadline != null && deadline.getTime() <= System.currentTimeMillis())) {
                    break;
                }
            }
            throw new TimeoutException("Timeout waiting for connection");
        } finally {
            this.lock.unlock();
        }
    }
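getPoolEntryBlocking first looks up the RouteSpecificPool for the route via getPool(route) and asks it for a free entry matching state. An entry that has expired is closed, and a closed entry is removed from available and freed from the route pool before the next candidate is tried. A usable entry is removed from available, added to leased, passed to the onReuse callback, and returned. If no entry is free, the method trims the route pool down to its per-route maximum and, when both the per-route and total limits allow, creates a connection through the connection factory, adds the new entry to leased, and returns it. Otherwise it queues the future and waits on the condition until it is signalled or the deadline passes; on timeout it throws TimeoutException.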
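The waiting part of the method above follows a common lock/condition pattern: loop, check for capacity, otherwise await until a deadline, and distinguish a timeout from a spurious wakeup. The following is a reduced sketch of that pattern only. PermitPool is a hypothetical class that counts slots rather than managing connections, and it returns false on timeout instead of throwing TimeoutException.

```java
import java.util.Date;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical slot counter that blocks callers until a slot frees up.
class PermitPool {
    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
    private final int max;
    private int leased = 0;

    PermitPool(int max) { this.max = max; }

    // Returns true once a slot is leased, false if the deadline passes first.
    // A timeout of 0 or less means "wait without a deadline".
    boolean acquire(long timeout, TimeUnit unit) {
        Date deadline = null;
        if (timeout > 0) {
            deadline = new Date(System.currentTimeMillis() + unit.toMillis(timeout));
        }
        lock.lock();
        try {
            for (;;) {
                if (leased < max) {
                    leased++;
                    return true;
                }
                boolean success;
                if (deadline != null) {
                    success = condition.awaitUntil(deadline);
                } else {
                    condition.await();
                    success = true;
                }
                // Leave only on a real timeout; otherwise re-check capacity.
                if (!success && deadline != null
                        && deadline.getTime() <= System.currentTimeMillis()) {
                    return false;
                }
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            lock.unlock();
        }
    }

    // Frees one slot and wakes every waiter so each can re-check capacity.
    void release() {
        lock.lock();
        try {
            if (leased > 0) {
                leased--;
                condition.signalAll();
            }
        } finally {
            lock.unlock();
        }
    }
}
```

Waking all waiters with signalAll is simple but means each waiter must re-check the state in a loop, which is why the capacity test sits at the top of the for loop.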

release

@Override
    public void release(final E entry, final boolean reusable) {
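        this.lock.lock();
        try {
            if (this.leased.remove(entry)) {
                final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
                pool.free(entry, reusable);
                if (reusable && !this.isShutDown) {
                    this.available.addFirst(entry);
                } else {
                    entry.close();
                }
                onRelease(entry);
                Future<E> future = pool.nextPending();
                if (future != null) {
                    this.pending.remove(future);
                } else {
                    future = this.pending.poll();
                }
                if (future != null) {
                    this.condition.signalAll();
                }
            }
        } finally {
            this.lock.unlock();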
        }
    }
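The release method does nothing unless the entry is currently in leased. It then gets the RouteSpecificPool for the entry's route and frees the entry there. A reusable entry goes to the head of available, unless the pool is shut down; any other entry is closed. Finally it calls onRelease and, if a pending future exists, signals the waiting threads.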
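Because a released entry is put at the head of the available list while capacity-driven eviction in getPoolEntryBlocking takes from the tail, the most recently released entry is the first candidate for reuse and the least recently released one is the first to be evicted. A minimal sketch of that ordering, using a hypothetical AvailableList class with strings in place of pool entries and ignoring routes and state:

```java
import java.util.LinkedList;

// Hypothetical model of the ordering in the available list.
class AvailableList {
    private final LinkedList<String> available = new LinkedList<>();

    // Released entries go to the head of the list.
    void release(String entry) { available.addFirst(entry); }

    // Reuse takes from the head: the most recently released entry.
    String reuse() { return available.pollFirst(); }

    // Eviction takes from the tail: the least recently released entry.
    String evictOldest() { return available.pollLast(); }
}
```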

CPool

org/apache/http/impl/conn/

@Contract(threading = ThreadingBehavior.SAFE)
class CPool extends AbstractConnPool<HttpRoute, ManagedHttpClientConnection, CPoolEntry> {
    private static final AtomicLong COUNTER = new AtomicLong();
    private final Log log = LogFactory.getLog(CPool.class);
    private final long timeToLive;
    private final TimeUnit timeUnit;
    public CPool(
            final ConnFactory<HttpRoute, ManagedHttpClientConnection> connFactory,
            final int defaultMaxPerRoute, final int maxTotal,
            final long timeToLive, final TimeUnit timeUnit) {
        super(connFactory, defaultMaxPerRoute, maxTotal);
        this.timeToLive = timeToLive;
        this.timeUnit = timeUnit;
    }
    @Override
    protected CPoolEntry createEntry(final HttpRoute route, final ManagedHttpClientConnection conn) {
        final String id = Long.toString(COUNTER.getAndIncrement());
        return new CPoolEntry(this.log, id, route, conn, this.timeToLive, this.timeUnit);
    }
    @Override
    protected boolean validate(final CPoolEntry entry) {
        return !entry.getConnection().isStale();
    }
    @Override
    protected void enumAvailable(final PoolEntryCallback<HttpRoute, ManagedHttpClientConnection> callback) {
        super.enumAvailable(callback);
    }
    @Override
    protected void enumLeased(final PoolEntryCallback<HttpRoute, ManagedHttpClientConnection> callback) {
        super.enumLeased(callback);
    }
}
CPool extends AbstractConnPool with T bound to HttpRoute, C to ManagedHttpClientConnection, and E to CPoolEntry. Its createEntry method builds a CPoolEntry with an id taken from a shared counter, and validate returns true only when the connection is not stale.

Summary
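The relationship between the abstract pool and its concrete subclass is a template-method arrangement: the base class declares hooks and the subclass binds the type parameters and fills them in. The sketch below shows only that shape. DemoConn, DemoEntry, DemoAbstractPool, and DemoPool are hypothetical stand-ins, and the stale flag is a plain field rather than a real socket check.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical connection type with a settable "stale" flag.
class DemoConn {
    final boolean stale;
    DemoConn(boolean stale) { this.stale = stale; }
}

// Hypothetical entry: an id plus the route and connection it wraps.
class DemoEntry {
    final String id;
    final String route;
    final DemoConn conn;
    DemoEntry(String id, String route, DemoConn conn) {
        this.id = id;
        this.route = route;
        this.conn = conn;
    }
}

// Base class declares the hooks; T = route, C = connection, E = entry.
abstract class DemoAbstractPool<T, C, E> {
    protected abstract E createEntry(T route, C conn);

    // Default hook: every entry is considered valid.
    protected boolean validate(E entry) { return true; }
}

// Subclass binds the type parameters and implements the hooks.
class DemoPool extends DemoAbstractPool<String, DemoConn, DemoEntry> {
    private static final AtomicLong COUNTER = new AtomicLong();

    @Override
    protected DemoEntry createEntry(String route, DemoConn conn) {
        // Each entry gets a unique id from a shared counter.
        String id = Long.toString(COUNTER.getAndIncrement());
        return new DemoEntry(id, route, conn);
    }

    @Override
    protected boolean validate(DemoEntry entry) {
        return !entry.conn.stale;
    }
}
```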

ConnPool declares the lease and release methods and has two type parameters: T is the route type and E is the pool entry type.

AbstractConnPool implements the ConnPool and ConnPoolControl interfaces. It constrains E to extend PoolEntry and adds the type parameter C for the connection type. CPool extends AbstractConnPool with T bound to HttpRoute, C to ManagedHttpClientConnection, and E to CPoolEntry.

The Future returned by AbstractConnPool's lease method centers on get and cancel. The get method calls getPoolEntryBlocking; when validateAfterInactivity is greater than 0 and the entry has been idle for at least that long, it validates the entry, and if validation fails it closes the entry, releases it as non-reusable, and tries again. The release method first obtains the RouteSpecificPool for the entry's route, frees the entry there, and then either returns it to available or closes it.
