Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,13 @@

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

import org.apache.hc.client5.http.HttpRoute;
Expand Down Expand Up @@ -63,6 +69,7 @@
import org.apache.hc.core5.io.CloseMode;
import org.apache.hc.core5.pool.PoolConcurrencyPolicy;
import org.apache.hc.core5.pool.PoolReusePolicy;
import org.apache.hc.core5.pool.PoolStats;
import org.apache.hc.core5.util.TimeValue;
import org.apache.hc.core5.util.Timeout;
import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -391,4 +398,98 @@ void testConnectionTimeoutSetting() throws Exception {
connManager.close();
}

@Test
void testConnectionRequestTimeout() throws Exception {
configureServer(bootstrap -> bootstrap
.register("/random/*", new RandomHandler()));
final HttpHost target = startServer();

connManager.setMaxTotal(1);

final HttpRoute route = new HttpRoute(target, null, false);
final Timeout connRequestTimeout = Timeout.ofMicroseconds(1);

final int concurrentThreads = 10;
final CountDownLatch countDownLatch = new CountDownLatch(concurrentThreads);
final AtomicLong n = new AtomicLong(concurrentThreads * 100);

final ExecutorService executorService = Executors.newFixedThreadPool(concurrentThreads);
for (int i = 0; i < concurrentThreads; i++) {
executorService.execute(() -> {
try {
while (n.decrementAndGet() > 0) {
try {
final LeaseRequest request = connManager.lease("id1", route, connRequestTimeout, null);
final ConnectionEndpoint connectionEndpoint = request.get(connRequestTimeout);
connManager.release(connectionEndpoint, null, null);
} catch (final InterruptedException ex) {
Thread.currentThread().interrupt();
Assertions.fail("Unexpected exception", ex);
} catch (final TimeoutException | ExecutionException ignored) {
}
}
} finally {
countDownLatch.countDown();
}
});
}

Assertions.assertTrue(countDownLatch.await(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
Assertions.assertTrue(n.get() <= 0);

final PoolStats stats = connManager.getStats(route);
Assertions.assertEquals(0, stats.getLeased());

connManager.close();
}

@Test
void testConnectionRequestCancelLateLeaseReleased() throws Exception {
configureServer(bootstrap -> bootstrap
.register("/random/*", new RandomHandler()));
final HttpHost target = startServer();

connManager.setMaxTotal(1);

final HttpRoute route = new HttpRoute(target, null, false);
final Timeout t = Timeout.ofSeconds(5);

final LeaseRequest holdRequest = connManager.lease("hold", route, t, null);
final ConnectionEndpoint heldEndpoint = holdRequest.get(t);

final LeaseRequest pendingRequest = connManager.lease("pending", route, t, null);

connManager.release(heldEndpoint, null, null);

PoolStats stats;
final long deadline1 = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
for (;;) {
stats = connManager.getStats(route);
if (stats.getLeased() == 1) {
break;
}
if (System.nanoTime() > deadline1) {
break;
}
Thread.yield();
}
Assertions.assertEquals(1, stats.getLeased(), "Expected pending lease to complete and become leased");
Assertions.assertFalse(pendingRequest.cancel(), "Expected cancel() to lose the race once lease is completed");

final long deadline2 = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
for (;;) {
stats = connManager.getStats(route);
if (stats.getLeased() == 0) {
break;
}
if (System.nanoTime() > deadline2) {
break;
}
Thread.yield();
}
Assertions.assertEquals(0, stats.getLeased(), "Late-completed lease must not remain stranded after cancel()");

connManager.close();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.hc.core5.annotation.Contract;
import org.apache.hc.core5.annotation.Internal;
import org.apache.hc.core5.annotation.ThreadingBehavior;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.function.Resolver;
import org.apache.hc.core5.http.ClassicHttpRequest;
import org.apache.hc.core5.http.ClassicHttpResponse;
Expand Down Expand Up @@ -369,7 +370,36 @@ public LeaseRequest lease(
if (LOG.isDebugEnabled()) {
LOG.debug("{} endpoint lease request ({}) {}", id, requestTimeout, ConnPoolSupport.formatStats(route, state, pool));
}
final Future<PoolEntry<HttpRoute, ManagedHttpClientConnection>> leaseFuture = this.pool.lease(route, state, requestTimeout, null);
final AtomicBoolean aborted = new AtomicBoolean(false);
final AtomicBoolean resultObtained = new AtomicBoolean(false);
final AtomicReference<PoolEntry<HttpRoute, ManagedHttpClientConnection>> lateEntryRef = new AtomicReference<>();
final Future<PoolEntry<HttpRoute, ManagedHttpClientConnection>> leaseFuture = this.pool.lease(
route,
state,
requestTimeout,
new FutureCallback<PoolEntry<HttpRoute, ManagedHttpClientConnection>>() {

@Override
public void completed(final PoolEntry<HttpRoute, ManagedHttpClientConnection> poolEntry) {
lateEntryRef.set(poolEntry);
if (aborted.get()) {
if (lateEntryRef.compareAndSet(poolEntry, null)) {
pool.release(poolEntry, false);
}
} else if (resultObtained.get()) {
lateEntryRef.compareAndSet(poolEntry, null);
}
}

@Override
public void failed(final Exception ex) {
}

@Override
public void cancelled() {
}

});
return new LeaseRequest() {
// Using a ReentrantLock specific to each LeaseRequest instance to maintain the original
// synchronization semantics. This ensures that each LeaseRequest has its own unique lock.
Expand All @@ -388,8 +418,15 @@ public ConnectionEndpoint get(
final PoolEntry<HttpRoute, ManagedHttpClientConnection> poolEntry;
try {
poolEntry = leaseFuture.get(timeout.getDuration(), timeout.getTimeUnit());
} catch (final TimeoutException ex) {
resultObtained.set(true);
lateEntryRef.compareAndSet(poolEntry, null);
} catch (final TimeoutException | InterruptedException ex) {
aborted.set(true);
leaseFuture.cancel(true);
final PoolEntry<HttpRoute, ManagedHttpClientConnection> latePoolEntry = lateEntryRef.getAndSet(null);
if (latePoolEntry != null) {
pool.release(latePoolEntry, false);
}
throw ex;
}
if (LOG.isDebugEnabled()) {
Expand Down Expand Up @@ -467,7 +504,18 @@ public ConnectionEndpoint get(

@Override
public boolean cancel() {
return leaseFuture.cancel(true);
lock.lock();
try {
aborted.set(true);
final boolean cancelled = leaseFuture.cancel(true);
final PoolEntry<HttpRoute, ManagedHttpClientConnection> latePoolEntry = lateEntryRef.getAndSet(null);
if (latePoolEntry != null) {
pool.release(latePoolEntry, false);
}
return cancelled;
} finally {
lock.unlock();
}
}

};
Expand Down
Loading
Loading