From c48f2af60ed8e19c8a15c3733f6367ac86ecae53 Mon Sep 17 00:00:00 2001 From: Arturo Bernal Date: Tue, 3 Mar 2026 06:36:37 +0100 Subject: [PATCH] HTTPCLIENT-2416 - Fix pool entry leak on late lease completion --- .../sync/TestConnectionManagement.java | 101 +++++++++++++++ .../PoolingHttpClientConnectionManager.java | 54 +++++++- ...estPoolingHttpClientConnectionManager.java | 121 +++++++++++++----- 3 files changed, 241 insertions(+), 35 deletions(-) diff --git a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/sync/TestConnectionManagement.java b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/sync/TestConnectionManagement.java index d58e3595d9..fe439d677b 100644 --- a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/sync/TestConnectionManagement.java +++ b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/sync/TestConnectionManagement.java @@ -29,7 +29,13 @@ import java.io.IOException; import java.net.InetSocketAddress; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; import org.apache.hc.client5.http.HttpRoute; @@ -63,6 +69,7 @@ import org.apache.hc.core5.io.CloseMode; import org.apache.hc.core5.pool.PoolConcurrencyPolicy; import org.apache.hc.core5.pool.PoolReusePolicy; +import org.apache.hc.core5.pool.PoolStats; import org.apache.hc.core5.util.TimeValue; import org.apache.hc.core5.util.Timeout; import org.junit.jupiter.api.AfterEach; @@ -391,4 +398,98 @@ void testConnectionTimeoutSetting() throws Exception { connManager.close(); } + @Test + void testConnectionRequestTimeout() throws Exception { + configureServer(bootstrap -> bootstrap + .register("/random/*", new RandomHandler())); + final HttpHost target = startServer(); + + connManager.setMaxTotal(1); + + final HttpRoute route = new HttpRoute(target, null, false); + final Timeout connRequestTimeout = Timeout.ofMicroseconds(1); + + final int concurrentThreads = 10; + final CountDownLatch countDownLatch = new CountDownLatch(concurrentThreads); + final AtomicLong n = new AtomicLong(concurrentThreads * 100); + + final ExecutorService executorService = Executors.newFixedThreadPool(concurrentThreads); + for (int i = 0; i < concurrentThreads; i++) { + executorService.execute(() -> { + try { + while (n.decrementAndGet() > 0) { + try { + final LeaseRequest request = connManager.lease("id1", route, connRequestTimeout, null); + final ConnectionEndpoint connectionEndpoint = request.get(connRequestTimeout); + connManager.release(connectionEndpoint, null, null); + } catch (final InterruptedException ex) { + Thread.currentThread().interrupt(); + Assertions.fail("Unexpected exception", ex); + } catch (final TimeoutException | ExecutionException ignored) { + } + } + } finally { + countDownLatch.countDown(); + } + }); + } + + Assertions.assertTrue(countDownLatch.await(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit())); + Assertions.assertTrue(n.get() <= 0); + + final PoolStats stats = connManager.getStats(route); + Assertions.assertEquals(0, stats.getLeased()); + + connManager.close(); + } + + @Test + void testConnectionRequestCancelLateLeaseReleased() throws Exception { + configureServer(bootstrap -> bootstrap + .register("/random/*", new RandomHandler())); + final HttpHost target = startServer(); + + connManager.setMaxTotal(1); + + final HttpRoute route = new HttpRoute(target, null, false); + final Timeout t = Timeout.ofSeconds(5); + + final LeaseRequest holdRequest = connManager.lease("hold", route, t, null); + final ConnectionEndpoint heldEndpoint = holdRequest.get(t); + + final LeaseRequest pendingRequest = connManager.lease("pending", route, t, null); + + connManager.release(heldEndpoint, null, null); + + PoolStats stats; + final long deadline1 = System.nanoTime() + TimeUnit.SECONDS.toNanos(5); + for (;;) { + stats = connManager.getStats(route); + if (stats.getLeased() == 1) { + break; + } + if (System.nanoTime() > deadline1) { + break; + } + Thread.yield(); + } + Assertions.assertEquals(1, stats.getLeased(), "Expected pending lease to complete and become leased"); + Assertions.assertFalse(pendingRequest.cancel(), "Expected cancel() to lose the race once lease is completed"); + + final long deadline2 = System.nanoTime() + TimeUnit.SECONDS.toNanos(5); + for (;;) { + stats = connManager.getStats(route); + if (stats.getLeased() == 0) { + break; + } + if (System.nanoTime() > deadline2) { + break; + } + Thread.yield(); + } + Assertions.assertEquals(0, stats.getLeased(), "Late-completed lease must not remain stranded after cancel()"); + + connManager.close(); + } + } diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/PoolingHttpClientConnectionManager.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/PoolingHttpClientConnectionManager.java index 1e464d7386..ab5e3acb2a 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/PoolingHttpClientConnectionManager.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/PoolingHttpClientConnectionManager.java @@ -56,6 +56,7 @@ import org.apache.hc.core5.annotation.Contract; import org.apache.hc.core5.annotation.Internal; import org.apache.hc.core5.annotation.ThreadingBehavior; +import org.apache.hc.core5.concurrent.FutureCallback; import org.apache.hc.core5.function.Resolver; import org.apache.hc.core5.http.ClassicHttpRequest; import org.apache.hc.core5.http.ClassicHttpResponse; @@ -369,7 +370,36 @@ public LeaseRequest lease( if (LOG.isDebugEnabled()) { LOG.debug("{} endpoint lease request ({}) {}", id, requestTimeout, ConnPoolSupport.formatStats(route, state, pool)); } - final Future> leaseFuture = this.pool.lease(route, state, requestTimeout, null); + final AtomicBoolean aborted = new AtomicBoolean(false); + final AtomicBoolean resultObtained = new AtomicBoolean(false); + final AtomicReference> lateEntryRef = new AtomicReference<>(); + final Future> leaseFuture = this.pool.lease( + route, + state, + requestTimeout, + new FutureCallback>() { + + @Override + public void completed(final PoolEntry poolEntry) { + lateEntryRef.set(poolEntry); + if (aborted.get()) { + if (lateEntryRef.compareAndSet(poolEntry, null)) { + pool.release(poolEntry, false); + } + } else if (resultObtained.get()) { + lateEntryRef.compareAndSet(poolEntry, null); + } + } + + @Override + public void failed(final Exception ex) { + } + + @Override + public void cancelled() { + } + + }); return new LeaseRequest() { // Using a ReentrantLock specific to each LeaseRequest instance to maintain the original // synchronization semantics. This ensures that each LeaseRequest has its own unique lock. @@ -388,8 +418,15 @@ public ConnectionEndpoint get( final PoolEntry poolEntry; try { poolEntry = leaseFuture.get(timeout.getDuration(), timeout.getTimeUnit()); - } catch (final TimeoutException ex) { + resultObtained.set(true); + lateEntryRef.compareAndSet(poolEntry, null); + } catch (final TimeoutException | InterruptedException ex) { + aborted.set(true); leaseFuture.cancel(true); + final PoolEntry latePoolEntry = lateEntryRef.getAndSet(null); + if (latePoolEntry != null) { + pool.release(latePoolEntry, false); + } throw ex; } if (LOG.isDebugEnabled()) { @@ -467,7 +504,18 @@ public ConnectionEndpoint get( @Override public boolean cancel() { - return leaseFuture.cancel(true); + lock.lock(); + try { + aborted.set(true); + final boolean cancelled = leaseFuture.cancel(true); + final PoolEntry latePoolEntry = lateEntryRef.getAndSet(null); + if (latePoolEntry != null) { + pool.release(latePoolEntry, false); + } + return cancelled; + } finally { + lock.unlock(); + } } }; diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/io/TestPoolingHttpClientConnectionManager.java b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/io/TestPoolingHttpClientConnectionManager.java index c21c9fb82c..552f09838e 100644 --- a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/io/TestPoolingHttpClientConnectionManager.java +++ b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/io/TestPoolingHttpClientConnectionManager.java @@ -50,6 +50,7 @@ import org.apache.hc.client5.http.io.ManagedHttpClientConnection; import org.apache.hc.client5.http.protocol.HttpClientContext; import org.apache.hc.client5.http.ssl.TlsSocketStrategy; +import org.apache.hc.core5.concurrent.FutureCallback; import org.apache.hc.core5.http.HttpHost; import org.apache.hc.core5.http.config.Lookup; import org.apache.hc.core5.http.io.SocketConfig; @@ -60,6 +61,7 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; @@ -112,10 +114,10 @@ void testLeaseRelease() throws Exception { Mockito.when(conn.isConsistent()).thenReturn(true); Mockito.when(future.get(1, TimeUnit.SECONDS)).thenReturn(entry); Mockito.when(pool.lease( - Mockito.eq(route), - Mockito.eq(null), - Mockito.any(), - Mockito.eq(null))) + Mockito.eq(route), + Mockito.eq(null), + Mockito.any(), + Mockito.any(FutureCallback.class))) .thenReturn(future); final LeaseRequest connRequest1 = mgr.lease("some-id", route, null); @@ -137,10 +139,10 @@ void testReleaseRouteIncomplete() throws Exception { Mockito.when(future.get(1, TimeUnit.SECONDS)).thenReturn(entry); Mockito.when(pool.lease( - Mockito.eq(route), - Mockito.eq(null), - Mockito.any(), - Mockito.eq(null))) + Mockito.eq(route), + Mockito.eq(null), + Mockito.any(), + Mockito.any(FutureCallback.class))) .thenReturn(future); final LeaseRequest connRequest1 = mgr.lease("some-id", route, null); @@ -160,10 +162,10 @@ void testLeaseFutureTimeout() throws Exception { Mockito.when(future.get(1, TimeUnit.SECONDS)).thenThrow(new TimeoutException()); Mockito.when(pool.lease( - Mockito.eq(route), - Mockito.eq(null), - Mockito.any(), - Mockito.eq(null))) + Mockito.eq(route), + Mockito.eq(null), + Mockito.any(), + Mockito.any(FutureCallback.class))) .thenReturn(future); final LeaseRequest connRequest1 = mgr.lease("some-id", route, null); @@ -171,6 +173,64 @@ void testLeaseFutureTimeout() throws Exception { connRequest1.get(Timeout.ofSeconds(1))); } + @Test + void testLeaseFutureTimeoutLateLeaseReleased() throws Exception { + final HttpHost target = new HttpHost("localhost", 80); + final HttpRoute route = new HttpRoute(target); + final PoolEntry entry = new PoolEntry<>(route, TimeValue.NEG_ONE_MILLISECOND); + + final ArgumentCaptor callbackCaptor = ArgumentCaptor.forClass(FutureCallback.class); + + Mockito.when(pool.lease( + Mockito.eq(route), + Mockito.eq(null), + Mockito.any(), + callbackCaptor.capture())) + .thenReturn(future); + Mockito.when(future.cancel(true)).thenReturn(false); + Mockito.when(future.get(1, TimeUnit.SECONDS)).thenAnswer(invocation -> { + callbackCaptor.getValue().completed(entry); + throw new TimeoutException(); + }); + + final LeaseRequest connRequest1 = mgr.lease("some-id", route, null); + Assertions.assertThrows(TimeoutException.class, () -> + connRequest1.get(Timeout.ofSeconds(1))); + + Mockito.verify(future).cancel(true); + Mockito.verify(pool).release(entry, false); + Mockito.verify(future, Mockito.never()).get(0L, TimeUnit.MILLISECONDS); + } + + @Test + void testLeaseFutureInterruptedLateLeaseReleased() throws Exception { + final HttpHost target = new HttpHost("localhost", 80); + final HttpRoute route = new HttpRoute(target); + final PoolEntry entry = new PoolEntry<>(route, TimeValue.NEG_ONE_MILLISECOND); + + final ArgumentCaptor callbackCaptor = ArgumentCaptor.forClass(FutureCallback.class); + + Mockito.when(pool.lease( + Mockito.eq(route), + Mockito.eq(null), + Mockito.any(), + callbackCaptor.capture())) + .thenReturn(future); + Mockito.when(future.cancel(true)).thenReturn(false); + Mockito.when(future.get(1, TimeUnit.SECONDS)).thenAnswer(invocation -> { + callbackCaptor.getValue().completed(entry); + throw new InterruptedException(); + }); + + final LeaseRequest connRequest1 = mgr.lease("some-id", route, null); + Assertions.assertThrows(InterruptedException.class, () -> + connRequest1.get(Timeout.ofSeconds(1))); + + Mockito.verify(future).cancel(true); + Mockito.verify(pool).release(entry, false); + Mockito.verify(future, Mockito.never()).get(0L, TimeUnit.MILLISECONDS); + } + @Test void testReleaseReusable() throws Exception { final HttpHost target = new HttpHost("localhost", 80); @@ -181,10 +241,10 @@ void testReleaseReusable() throws Exception { Mockito.when(future.get(1, TimeUnit.SECONDS)).thenReturn(entry); Mockito.when(pool.lease( - Mockito.eq(route), - Mockito.eq(null), - Mockito.any(), - Mockito.eq(null))) + Mockito.eq(route), + Mockito.eq(null), + Mockito.any(), + Mockito.any(FutureCallback.class))) .thenReturn(future); Mockito.when(conn.isOpen()).thenReturn(true); Mockito.when(conn.isConsistent()).thenReturn(true); @@ -210,10 +270,10 @@ void testReleaseNonReusable() throws Exception { Mockito.when(future.get(1, TimeUnit.SECONDS)).thenReturn(entry); Mockito.when(pool.lease( - Mockito.eq(route), - Mockito.eq(null), - Mockito.any(), - Mockito.eq(null))) + Mockito.eq(route), + Mockito.eq(null), + Mockito.any(), + Mockito.any(FutureCallback.class))) .thenReturn(future); Mockito.when(conn.isOpen()).thenReturn(Boolean.FALSE); @@ -241,10 +301,10 @@ void testTargetConnect() throws Exception { Mockito.when(conn.isOpen()).thenReturn(false); Mockito.when(future.get(1, TimeUnit.SECONDS)).thenReturn(entry); Mockito.when(pool.lease( - Mockito.eq(route), - Mockito.eq(null), - Mockito.any(), - Mockito.eq(null))) + Mockito.eq(route), + Mockito.eq(null), + Mockito.any(), + Mockito.any(FutureCallback.class))) .thenReturn(future); final LeaseRequest connRequest1 = mgr.lease("some-id", route, null); @@ -308,10 +368,10 @@ void testProxyConnectAndUpgrade() throws Exception { Mockito.when(conn.isOpen()).thenReturn(false); Mockito.when(future.get(1, TimeUnit.SECONDS)).thenReturn(entry); Mockito.when(pool.lease( - Mockito.eq(route), - Mockito.eq(null), - Mockito.any(), - Mockito.eq(null))) + Mockito.eq(route), + Mockito.eq(null), + Mockito.any(), + Mockito.any(FutureCallback.class))) .thenReturn(future); final LeaseRequest connRequest1 = mgr.lease("some-id", route, null); @@ -376,7 +436,6 @@ void testLeaseAfterShutdown() { }, "Attempting to lease a connection after shutdown should throw an exception."); } - @Test void testIsShutdown() { // Setup phase @@ -392,7 +451,6 @@ void testIsShutdown() { Assertions.assertTrue(mgr.isClosed(), "Connection manager should be shutdown after close() is called."); } - @Test void testConcurrentShutdown() throws InterruptedException { final ExecutorService executor = Executors.newFixedThreadPool(2); @@ -405,5 +463,4 @@ void testConcurrentShutdown() throws InterruptedException { Assertions.assertTrue(mgr.isClosed(), "Connection manager should be shutdown after concurrent calls to shutdown."); } - -} +} \ No newline at end of file