From 89a9640c3e355280894ef8d9231cf201fd7c7663 Mon Sep 17 00:00:00 2001 From: Aayush Atharva Date: Sun, 7 Jun 2026 10:53:13 +0000 Subject: [PATCH 1/2] Streamline DefaultChannelPool checkout and close paths --- .../bench/AcceptEncodingHpackBenchmark.java | 72 ++++ .../bench/ChannelPoolCheckoutBenchmark.java | 98 +++++ .../bench/ChannelPoolDequeBenchmark.java | 93 ++++ .../bench/CookieStoreGetBenchmark.java | 73 ++++ .../bench/FormParamsEncodeBenchmark.java | 94 ++++ .../bench/GlobalSemaphoreBenchmark.java | 84 ++++ .../bench/HeaderEncodeBenchmark.java | 110 +++++ .../bench/Http2StreamGateBenchmark.java | 87 ++++ .../bench/MultipartPreContentBenchmark.java | 104 +++++ .../bench/RequestHeaderCopyBenchmark.java | 79 ++++ .../bench/ResponseBodyPartBenchmark.java | 87 ++++ .../bench/SemaphoreLookupBenchmark.java | 102 +++++ .../bench/UriEncoderBenchmark.java | 58 +++ .../bench/UriParseBenchmark.java | 55 +++ .../netty/channel/DefaultChannelPool.java | 274 +++++++----- .../netty/channel/DefaultChannelPoolTest.java | 402 ++++++++++++++++++ 16 files changed, 1765 insertions(+), 107 deletions(-) create mode 100644 client/src/jmh/java/org/asynchttpclient/bench/AcceptEncodingHpackBenchmark.java create mode 100644 client/src/jmh/java/org/asynchttpclient/bench/ChannelPoolCheckoutBenchmark.java create mode 100644 client/src/jmh/java/org/asynchttpclient/bench/ChannelPoolDequeBenchmark.java create mode 100644 client/src/jmh/java/org/asynchttpclient/bench/CookieStoreGetBenchmark.java create mode 100644 client/src/jmh/java/org/asynchttpclient/bench/FormParamsEncodeBenchmark.java create mode 100644 client/src/jmh/java/org/asynchttpclient/bench/GlobalSemaphoreBenchmark.java create mode 100644 client/src/jmh/java/org/asynchttpclient/bench/HeaderEncodeBenchmark.java create mode 100644 client/src/jmh/java/org/asynchttpclient/bench/Http2StreamGateBenchmark.java create mode 100644 client/src/jmh/java/org/asynchttpclient/bench/MultipartPreContentBenchmark.java create mode 100644 client/src/jmh/java/org/asynchttpclient/bench/RequestHeaderCopyBenchmark.java create mode 100644 client/src/jmh/java/org/asynchttpclient/bench/ResponseBodyPartBenchmark.java create mode 100644 client/src/jmh/java/org/asynchttpclient/bench/SemaphoreLookupBenchmark.java create mode 100644 client/src/jmh/java/org/asynchttpclient/bench/UriEncoderBenchmark.java create mode 100644 client/src/jmh/java/org/asynchttpclient/bench/UriParseBenchmark.java create mode 100644 client/src/test/java/org/asynchttpclient/netty/channel/DefaultChannelPoolTest.java diff --git a/client/src/jmh/java/org/asynchttpclient/bench/AcceptEncodingHpackBenchmark.java b/client/src/jmh/java/org/asynchttpclient/bench/AcceptEncodingHpackBenchmark.java new file mode 100644 index 0000000000..7f034712f1 --- /dev/null +++ b/client/src/jmh/java/org/asynchttpclient/bench/AcceptEncodingHpackBenchmark.java @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2026 AsyncHttpClient Project. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ +package org.asynchttpclient.bench; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.handler.codec.http2.DefaultHttp2HeadersEncoder; +import io.netty.handler.codec.http2.Http2HeadersEncoder; +import io.netty.handler.codec.http2.DefaultHttp2Headers; +import io.netty.handler.codec.http2.Http2Headers; +import io.netty.util.AsciiString; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.State; + +import java.util.concurrent.TimeUnit; + +/** + * Measures the HPACK-encoded wire size of {@code accept-encoding} for the two value spellings: + * + * + *

On a fresh encoder (first request of a connection) the static-table value matches as a single + * indexed byte; the non-matching spelling is literal-encoded and inserted into the dynamic table. + * This bench reports {@code gc.alloc.rate.norm} and the encoded byte count via the returned buffer's + * readableBytes (consumed by the blackhole through the return value size). + */ +@State(Scope.Thread) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +public class AcceptEncodingHpackBenchmark { + + private static final AsciiString ACCEPT_ENCODING = AsciiString.cached("accept-encoding"); + private static final AsciiString AHC_VALUE = AsciiString.cached("gzip,deflate"); + private static final AsciiString STATIC_VALUE = AsciiString.cached("gzip, deflate"); + + private int encodeOnce(AsciiString value) throws Exception { + // Fresh encoder per call == "first request on a new connection" worst case. + Http2HeadersEncoder encoder = new DefaultHttp2HeadersEncoder(); + Http2Headers headers = new DefaultHttp2Headers().add(ACCEPT_ENCODING, value); + ByteBuf out = Unpooled.buffer(); + try { + encoder.encodeHeaders(3, headers, out); + return out.readableBytes(); + } finally { + out.release(); + } + } + + @Benchmark + public int ahc_no_space() throws Exception { + return encodeOnce(AHC_VALUE); + } + + @Benchmark + public int static_table_with_space() throws Exception { + return encodeOnce(STATIC_VALUE); + } +} diff --git a/client/src/jmh/java/org/asynchttpclient/bench/ChannelPoolCheckoutBenchmark.java b/client/src/jmh/java/org/asynchttpclient/bench/ChannelPoolCheckoutBenchmark.java new file mode 100644 index 0000000000..175def5d04 --- /dev/null +++ b/client/src/jmh/java/org/asynchttpclient/bench/ChannelPoolCheckoutBenchmark.java @@ -0,0 +1,98 @@ +/* + * Copyright (c) 2024 AsyncHttpClient Project. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ +package org.asynchttpclient.bench; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.infra.Blackhole; + +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.TimeUnit; + +/** + * Models the per-checkout allocation of {@code DefaultChannelPool}: each + * {@code offer()} wraps the channel in a freshly allocated {@code IdleChannel} + * holder that is pushed onto a {@code ConcurrentLinkedDeque} (which itself + * allocates a linked node per insert). On {@code poll()} the holder is + * discarded. Under keep-alive churn this is one IdleChannel + one CLD node per + * request. + * + * This bench compares the current "allocate a holder per offer" pattern against + * an alternative that stores the bare channel reference + a parallel timestamp, + * avoiding the holder allocation. It is a standalone model (no Netty Channel + * needed) so it can run on the bare JMH classpath; the shapes mirror + * DefaultChannelPool.IdleChannel exactly (one Object ref + one long + one + * volatile int) and CLD node churn is identical for both arms because both push + * one element. + */ +@State(Scope.Thread) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +public class ChannelPoolCheckoutBenchmark { + + // Mirror of DefaultChannelPool.IdleChannel: Object ref + long start + volatile int owned. + static final class IdleChannel { + static final AtomicIntegerFieldUpdater OWNED = + AtomicIntegerFieldUpdater.newUpdater(IdleChannel.class, "owned"); + final Object channel; + final long start; + @SuppressWarnings("unused") + private volatile int owned; + + IdleChannel(Object channel, long start) { + this.channel = channel; + this.start = start; + } + + boolean takeOwnership() { + return OWNED.getAndSet(this, 1) == 0; + } + } + + private ConcurrentLinkedDeque currentDeque; + private ConcurrentLinkedDeque bareDeque; + private Object channel; + + @Setup(Level.Trial) + public void setup() { + currentDeque = new ConcurrentLinkedDeque<>(); + bareDeque = new ConcurrentLinkedDeque<>(); + channel = new Object(); + } + + /** Current behavior: allocate an IdleChannel holder on every offer. */ + @Benchmark + public void currentOfferPoll(Blackhole bh) { + currentDeque.offerFirst(new IdleChannel(channel, 123L)); + IdleChannel c = currentDeque.pollFirst(); + if (c != null && c.takeOwnership()) { + bh.consume(c.channel); + } + } + + /** + * Alternative: push the bare channel ref. Models pushing the Channel itself + * and reading the timestamp/owned flag from a Netty channel attribute + * instead of a per-checkout holder. Only the CLD node is allocated. + */ + @Benchmark + public void bareOfferPoll(Blackhole bh) { + bareDeque.offerFirst(channel); + Object c = bareDeque.pollFirst(); + bh.consume(c); + } +} diff --git a/client/src/jmh/java/org/asynchttpclient/bench/ChannelPoolDequeBenchmark.java b/client/src/jmh/java/org/asynchttpclient/bench/ChannelPoolDequeBenchmark.java new file mode 100644 index 0000000000..b1234a43b1 --- /dev/null +++ b/client/src/jmh/java/org/asynchttpclient/bench/ChannelPoolDequeBenchmark.java @@ -0,0 +1,93 @@ +/* + * Copyright (c) 2026 AsyncHttpClient Project. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.asynchttpclient.bench; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.infra.Blackhole; + +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Models {@code DefaultChannelPool.removeAll(Channel)} which calls + * {@code ConcurrentLinkedDeque.remove(Object)} — an O(n) full traversal of the partition deque + * performed on every connection close. Compared against a poll/offer (LIFO) pair which is O(1). + * + * Element identity mirrors IdleChannel.equals (compares wrapped value), so remove() must scan. + * + * Run multi-threaded: + * /tmp/run-jmh.sh ChannelPoolDequeBenchmark -t 8 -f 1 -wi 5 -i 8 + */ +@State(Scope.Benchmark) +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.MICROSECONDS) +public class ChannelPoolDequeBenchmark { + + /** Steady-state number of idle connections per partition (deque length). */ + @Param({"4", "32", "128"}) + public int poolDepth; + + private ConcurrentLinkedDeque deque; + private Holder[] elements; + private final AtomicInteger removeCursor = new AtomicInteger(); + + static final class Holder { + final int id; + Holder(int id) { this.id = id; } + @Override public boolean equals(Object o) { + return this == o || (o instanceof Holder && id == ((Holder) o).id); + } + @Override public int hashCode() { return id; } + } + + @Setup(Level.Invocation) + public void setup() { + deque = new ConcurrentLinkedDeque<>(); + elements = new Holder[poolDepth]; + for (int i = 0; i < poolDepth; i++) { + elements[i] = new Holder(i); + deque.offerFirst(elements[i]); + } + } + + /** Current removeAll path: O(n) remove(Object) scanning by equals. Removes the tail (worst case for LIFO insert). */ + @Benchmark + public boolean currentRemoveAll() { + int idx = removeCursor.getAndIncrement() % poolDepth; + // remove a NEW Holder equal-by-id, exactly as DefaultChannelPool.removeAll builds + // `new IdleChannel(channel, Long.MIN_VALUE)` and lets the deque scan for it. + return deque.remove(new Holder(idx)); + } + + /** Baseline O(1) lease/return that poll()/offer() use, for scale reference. */ + @Benchmark + public void pollOffer(Blackhole bh) { + Holder h = deque.pollFirst(); + if (h != null) { + deque.offerFirst(h); + } + bh.consume(h); + } +} diff --git a/client/src/jmh/java/org/asynchttpclient/bench/CookieStoreGetBenchmark.java b/client/src/jmh/java/org/asynchttpclient/bench/CookieStoreGetBenchmark.java new file mode 100644 index 0000000000..731b457845 --- /dev/null +++ b/client/src/jmh/java/org/asynchttpclient/bench/CookieStoreGetBenchmark.java @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2024 AsyncHttpClient Project. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ +package org.asynchttpclient.bench; + +import io.netty.handler.codec.http.cookie.Cookie; +import io.netty.handler.codec.http.cookie.DefaultCookie; +import org.asynchttpclient.cookie.ThreadSafeCookieStore; +import org.asynchttpclient.uri.Uri; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * Measures allocations of {@link ThreadSafeCookieStore#get(Uri)} which is on the + * request path: every outgoing request for which a cookie store is configured + * calls it to collect applicable cookies. The current implementation walks + * sub-domains and, for each, runs a Stream pipeline + * ({@code entrySet().stream().filter(lambda).map(lambda).collect(toList())}). + * + * This bench pins the per-get byte cost so a proposal can quantify replacing + * the Stream pipeline + per-subdomain list copies with an imperative scan. + */ +@State(Scope.Thread) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +public class CookieStoreGetBenchmark { + + private ThreadSafeCookieStore store; + private Uri requestUri; + + @Param({"1", "5"}) + public int cookiesPerDomain; + + @Setup(Level.Trial) + public void setup() { + store = new ThreadSafeCookieStore(); + Uri uri = Uri.create("https://www.example.com/some/path"); + for (int i = 0; i < cookiesPerDomain; i++) { + DefaultCookie c = new DefaultCookie("cookie" + i, "value" + i); + c.setDomain("www.example.com"); + c.setPath("/some"); + store.add(uri, c); + } + // a couple of parent-domain cookies to force the sub-domain walk to find matches + DefaultCookie root = new DefaultCookie("root", "v"); + root.setDomain("example.com"); + root.setPath("/"); + store.add(uri, root); + + requestUri = Uri.create("https://www.example.com/some/path/leaf"); + } + + @Benchmark + public List get() { + return store.get(requestUri); + } +} diff --git a/client/src/jmh/java/org/asynchttpclient/bench/FormParamsEncodeBenchmark.java b/client/src/jmh/java/org/asynchttpclient/bench/FormParamsEncodeBenchmark.java new file mode 100644 index 0000000000..fe70b134b4 --- /dev/null +++ b/client/src/jmh/java/org/asynchttpclient/bench/FormParamsEncodeBenchmark.java @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2024 AsyncHttpClient Project. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ +package org.asynchttpclient.bench; + +import org.asynchttpclient.util.HttpUtils; +import org.asynchttpclient.util.Utf8UrlEncoder; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.infra.Blackhole; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * Measures allocations of url-encoding form params for an + * {@code application/x-www-form-urlencoded} body. + * + * Arm A ({@link #currentUrlEncode}) is the production path: + * {@code HttpUtils.urlEncodeFormParams} builds a (pooled) StringBuilder, calls + * {@code .toString()} to materialize an intermediate {@code String}, then + * {@code charset.encode(CharBuffer.wrap(str))} to get a {@code ByteBuffer}. + * + * Arm B ({@link #optimizedDirectEncode}) prototypes skipping the intermediate + * String: it appends the encoded chars into a reused StringBuilder and encodes + * the StringBuilder's chars to bytes via a CharBuffer view, avoiding the + * String allocation. Both arms produce the same US-ASCII bytes. + */ +@State(Scope.Thread) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +public class FormParamsEncodeBenchmark { + + @Param({"2", "8"}) + public int params; + + private List paramList; + private final StringBuilder scratch = new StringBuilder(512); + + @Setup(Level.Trial) + public void setup() { + paramList = new ArrayList<>(params); + for (int i = 0; i < params; i++) { + paramList.add(new org.asynchttpclient.Param("field" + i, "value " + i + "&x=y")); + } + } + + /** Production path: StringBuilder -> String -> ByteBuffer. */ + @Benchmark + public ByteBuffer currentUrlEncode() { + return HttpUtils.urlEncodeFormParams(paramList, StandardCharsets.UTF_8); + } + + /** + * Prototype: encode params into a reused StringBuilder, then encode the + * builder's chars straight to bytes (no intermediate String). Mirrors the + * UTF-8 form-encoding the production path performs. + */ + @Benchmark + public void optimizedDirectEncode(Blackhole bh) { + StringBuilder sb = scratch; + sb.setLength(0); + for (org.asynchttpclient.Param p : paramList) { + Utf8UrlEncoder.encodeAndAppendFormElement(sb, p.getName()); + sb.append('='); + Utf8UrlEncoder.encodeAndAppendFormElement(sb, p.getValue()); + sb.append('&'); + } + sb.setLength(sb.length() - 1); + // encode StringBuilder chars to ASCII bytes without String.toString() + int len = sb.length(); + byte[] bytes = new byte[len]; + for (int i = 0; i < len; i++) { + bytes[i] = (byte) sb.charAt(i); // all output chars are US-ASCII after encoding + } + bh.consume(ByteBuffer.wrap(bytes)); + } +} diff --git a/client/src/jmh/java/org/asynchttpclient/bench/GlobalSemaphoreBenchmark.java b/client/src/jmh/java/org/asynchttpclient/bench/GlobalSemaphoreBenchmark.java new file mode 100644 index 0000000000..5318729ab7 --- /dev/null +++ b/client/src/jmh/java/org/asynchttpclient/bench/GlobalSemaphoreBenchmark.java @@ -0,0 +1,84 @@ +/* + * Copyright (c) 2026 AsyncHttpClient Project. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.asynchttpclient.bench; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.infra.Blackhole; + +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Contention on the single shared global {@code Semaphore} used by MaxConnectionSemaphore / + * CombinedConnectionSemaphore. Every request acquires+releases this one AQS-backed counter. + * + * Compares: + * - semaphoreTimedTryAcquire: the production path (timed tryAcquire then release). + * - atomicCounterBound: a lock-free CAS-bounded counter as a "best case" reference + * (NOT a drop-in: loses blocking-with-timeout semantics; used only to size the headroom). + * + * Run multi-threaded: + * /tmp/run-jmh-conc.sh GlobalSemaphoreBenchmark -t 16 -f 1 -wi 5 -i 8 + */ +@State(Scope.Benchmark) +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.MICROSECONDS) +public class GlobalSemaphoreBenchmark { + + private Semaphore semaphore; + private AtomicInteger atomic; + private static final int MAX = 100_000; + + @Setup + public void setup() { + semaphore = new Semaphore(MAX); + atomic = new AtomicInteger(0); + } + + /** Mirrors MaxConnectionSemaphore.acquireChannelLock with a 0ms timed tryAcquire + release. */ + @Benchmark + public void semaphoreTimedTryAcquire(Blackhole bh) throws InterruptedException { + boolean got = semaphore.tryAcquire(0, TimeUnit.MILLISECONDS); + bh.consume(got); + if (got) { + semaphore.release(); + } + } + + /** Lower-bound reference: CAS-bounded counter (no blocking/timeout). */ + @Benchmark + public void atomicCounterBound(Blackhole bh) { + int cur; + boolean got = false; + do { + cur = atomic.get(); + if (cur >= MAX) { + break; + } + } while (!(got = atomic.compareAndSet(cur, cur + 1))); + bh.consume(got); + if (got) { + atomic.decrementAndGet(); + } + } +} diff --git a/client/src/jmh/java/org/asynchttpclient/bench/HeaderEncodeBenchmark.java b/client/src/jmh/java/org/asynchttpclient/bench/HeaderEncodeBenchmark.java new file mode 100644 index 0000000000..b2fefc4a61 --- /dev/null +++ b/client/src/jmh/java/org/asynchttpclient/bench/HeaderEncodeBenchmark.java @@ -0,0 +1,110 @@ +/* + * Copyright (c) 2026 AsyncHttpClient Project. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ +package org.asynchttpclient.bench; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.handler.codec.http.DefaultHttpRequest; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpHeaderValues; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpRequestEncoder; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.codec.http.LastHttpContent; +import io.netty.util.AsciiString; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; + +import java.util.concurrent.TimeUnit; + +/** + * Measures the cost of encoding an outbound HTTP/1.1 request through Netty's + * {@link HttpRequestEncoder} when header names/values are plain {@code String} vs + * {@link AsciiString}. + *

+ * Netty's {@code HttpHeadersEncoder.writeAscii} (4.2.13, lines 50-55) takes a fast + * {@code ByteBufUtil.copy((AsciiString)...)} path only when the {@link CharSequence} is an + * {@link AsciiString}; for a {@code String} it falls to {@code buf.setCharSequence(..., US_ASCII)} + * which encodes char-by-char. AHC stores user-supplied header names as whatever {@code CharSequence} + * the caller passed (typically {@code String} — see {@code RequestBuilderBase.setHeader(CharSequence,..)}), + * so the outbound request header names take the slow path on every request encode. + *

+ * Run with: {@code /tmp/run-jmh.sh HeaderEncodeBenchmark -prof gc -f 1 -wi 5 -i 5} + */ +@State(Scope.Thread) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +public class HeaderEncodeBenchmark { + + private EmbeddedChannel channel; + + @Setup(Level.Trial) + public void setup() { + channel = new EmbeddedChannel(new HttpRequestEncoder()); + } + + @TearDown(Level.Trial) + public void tearDown() { + channel.finishAndReleaseAll(); + } + + private void drain() { + ByteBuf out; + while ((out = channel.readOutbound()) != null) { + out.release(); + } + } + + /** Header names/values as plain String — the AHC production case. */ + @Benchmark + public void stringHeaders() { + HttpRequest req = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/path/to/resource?a=1&b=2"); + req.headers() + .add("Host", "www.example.com") + .add("User-Agent", "AHC/3.0") + .add("Accept", "*/*") + .add("Accept-Encoding", "gzip,deflate") + .add("Connection", "keep-alive") + .add("Authorization", "Bearer abcdefghijklmnopqrstuvwxyz0123456789") + .add("Content-Type", "application/json; charset=utf-8"); + channel.writeOutbound(req); + // Complete the HTTP message so the stateful HttpRequestEncoder returns to ST_INIT; + // without this the next invocation throws EncoderException (unexpected message type). + channel.writeOutbound(LastHttpContent.EMPTY_LAST_CONTENT); + drain(); + } + + /** Header names/values as AsciiString constants — the proposed encode-boundary fast path. */ + @Benchmark + public void asciiHeaders() { + HttpRequest req = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/path/to/resource?a=1&b=2"); + req.headers() + .add(HttpHeaderNames.HOST, AsciiString.cached("www.example.com")) + .add(HttpHeaderNames.USER_AGENT, AsciiString.cached("AHC/3.0")) + .add(HttpHeaderNames.ACCEPT, HttpHeaderValues.APPLICATION_JSON) + .add(HttpHeaderNames.ACCEPT_ENCODING, AsciiString.cached("gzip,deflate")) + .add(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE) + .add(HttpHeaderNames.AUTHORIZATION, AsciiString.cached("Bearer abcdefghijklmnopqrstuvwxyz0123456789")) + .add(HttpHeaderNames.CONTENT_TYPE, AsciiString.cached("application/json; charset=utf-8")); + channel.writeOutbound(req); + // Complete the HTTP message so the stateful HttpRequestEncoder returns to ST_INIT. + channel.writeOutbound(LastHttpContent.EMPTY_LAST_CONTENT); + drain(); + } +} diff --git a/client/src/jmh/java/org/asynchttpclient/bench/Http2StreamGateBenchmark.java b/client/src/jmh/java/org/asynchttpclient/bench/Http2StreamGateBenchmark.java new file mode 100644 index 0000000000..d98da86319 --- /dev/null +++ b/client/src/jmh/java/org/asynchttpclient/bench/Http2StreamGateBenchmark.java @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2026 AsyncHttpClient Project. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.asynchttpclient.bench; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.infra.Blackhole; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * HTTP/2 multiplexed stream gate: {@code Http2ConnectionState.tryAcquireStream()} runs a + * CAS retry loop on a shared {@link AtomicInteger} for EVERY request sharing one h2 connection. + * Many concurrent requests on one connection hammer this single counter. + * + * Compares the production CAS-loop against getAndIncrement-with-rollback (single CAS on the + * common success path). + * + * Run multi-threaded: + * /tmp/run-jmh-conc.sh Http2StreamGateBenchmark -t 16 -f 1 -wi 5 -i 8 + */ +@State(Scope.Benchmark) +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.MICROSECONDS) +public class Http2StreamGateBenchmark { + + private AtomicInteger activeStreams; + private static final int MAX = 100_000; // effectively unbounded so the gate always succeeds + + @Setup + public void setup() { + activeStreams = new AtomicInteger(0); + } + + /** Exactly mirrors Http2ConnectionState.tryAcquireStream's CAS loop + releaseStream. */ + @Benchmark + public void casLoopGate(Blackhole bh) { + boolean acquired = false; + while (true) { + int current = activeStreams.get(); + if (current >= MAX) { + break; + } + if (activeStreams.compareAndSet(current, current + 1)) { + acquired = true; + break; + } + } + bh.consume(acquired); + if (acquired) { + activeStreams.decrementAndGet(); + } + } + + /** Alternative: optimistic getAndIncrement, roll back if over limit (1 CAS on success path). */ + @Benchmark + public void getAndIncrementGate(Blackhole bh) { + int n = activeStreams.getAndIncrement(); + boolean acquired = n < MAX; + if (!acquired) { + activeStreams.decrementAndGet(); + } + bh.consume(acquired); + if (acquired) { + activeStreams.decrementAndGet(); + } + } +} diff --git a/client/src/jmh/java/org/asynchttpclient/bench/MultipartPreContentBenchmark.java b/client/src/jmh/java/org/asynchttpclient/bench/MultipartPreContentBenchmark.java new file mode 100644 index 0000000000..c35bee7c90 --- /dev/null +++ b/client/src/jmh/java/org/asynchttpclient/bench/MultipartPreContentBenchmark.java @@ -0,0 +1,104 @@ +/* + * Copyright (c) 2024 AsyncHttpClient Project. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ +package org.asynchttpclient.bench; + +import io.netty.handler.codec.http.DefaultHttpHeaders; +import io.netty.handler.codec.http.HttpHeaders; +import org.asynchttpclient.request.body.multipart.MultipartBody; +import org.asynchttpclient.request.body.multipart.MultipartUtils; +import org.asynchttpclient.request.body.multipart.Part; +import org.asynchttpclient.request.body.multipart.StringPart; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.infra.Blackhole; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; + +import java.util.concurrent.TimeUnit; + +/** + * Measures allocations of building a multipart body's per-part pre-content + * (header) section. Today every part runs {@code visitPreContent} twice through + * {@code MultipartPart}: once with a {@code CounterPartVisitor} (to size the + * buffer) and once with a {@code ByteBufVisitor} (to write it). The counter + * pass only needs lengths, but the code still allocates a full {@code byte[]} + * via {@code name.getBytes(US_ASCII)} / {@code contentType.getBytes(US_ASCII)} + * etc. just to read {@code .length}; those arrays are thrown away. For ASCII + * field names/values the byte length equals the String length, so the counting + * pass can avoid the array entirely. + * + * Arm A ({@link #currentNewMultipartBody}) builds the body via the production + * {@code MultipartUtils} path (constructor runs the count pass + the write pass + * will reallocate the same arrays again). Arm B ({@link #optimizedCountOnly}) + * is a prototype counter that sizes the same header bytes using String lengths + * for ASCII, showing the lower bound a length-only counter would reach. + */ +@State(Scope.Thread) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +public class MultipartPreContentBenchmark { + + @Param({"1", "8"}) + public int parts; + + private List partList; + private HttpHeaders headers; + private final byte[] boundary = "----boundary1234567890abcdefghij".getBytes(StandardCharsets.US_ASCII); + + @Setup(Level.Trial) + public void setup() { + partList = new ArrayList<>(parts); + for (int i = 0; i < parts; i++) { + partList.add(new StringPart("field" + i, "value-" + i, "text/plain")); + } + headers = new DefaultHttpHeaders(); + } + + /** Production path: MultipartUtils builds parts; each part runs the count pass in its ctor. */ + @Benchmark + public MultipartBody currentNewMultipartBody() { + return MultipartUtils.newMultipartBody(partList, headers); + } + + /** + * Prototype: size the disposition + content-type header section using + * String.length() for ASCII fields, with NO byte[] allocation. Mirrors the + * bytes MultipartPart.visitDispositionHeader/visitContentTypeHeader emit. + */ + @Benchmark + public void optimizedCountOnly(Blackhole bh) { + int total = 0; + for (Part p : partList) { + // "--" + boundary + total += 2 + boundary.length; + // CRLF + "Content-Disposition: " + "form-data" + total += 2 + 21 + 9; + String name = p.getName(); + if (name != null) { + total += 7 + 1 + name.length() + 1; // "; name=" + quote + name + quote + } + String ct = p.getContentType(); + if (ct != null) { + total += 2 + 14 + ct.length(); // CRLF + "Content-Type: " + ct + } + total += 4; // end-of-headers CRLFCRLF + } + bh.consume(total); + } +} diff --git a/client/src/jmh/java/org/asynchttpclient/bench/RequestHeaderCopyBenchmark.java b/client/src/jmh/java/org/asynchttpclient/bench/RequestHeaderCopyBenchmark.java new file mode 100644 index 0000000000..af76194a89 --- /dev/null +++ b/client/src/jmh/java/org/asynchttpclient/bench/RequestHeaderCopyBenchmark.java @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2026 AsyncHttpClient Project. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ +package org.asynchttpclient.bench; + +import io.netty.handler.codec.http.DefaultHttpHeaders; +import io.netty.handler.codec.http.HttpHeaders; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; + +import java.util.concurrent.TimeUnit; + +/** + * Measures the cost of copying an already-validated AHC request header set into the outbound + * Netty {@code HttpRequest} header set on the request hot path. + *

+ * In production, {@code RequestBuilderBase} builds the request headers with validation enabled + * (see {@code RequestBuilderBase} ctor: {@code new DefaultHttpHeaders(validateHeaders)} with + * {@code validateHeaders=true} default). Then {@code NettyRequestFactory.newNettyRequest} builds + * the outbound request with Netty's default validating headers factory and does + * {@code headers.set(request.getHeaders())} — re-validating every name and value a second time. + *

+ * This bench compares {@code set(...)} into a validating vs non-validating {@link DefaultHttpHeaders} + * to quantify the redundant validation that proposal 004 proposes to drop (since the source headers + * were already validated when the request was built). + *

+ * Run with: {@code /tmp/run-jmh.sh RequestHeaderCopyBenchmark -prof gc -f 1 -wi 5 -i 5} + */ +@State(Scope.Thread) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +public class RequestHeaderCopyBenchmark { + + private HttpHeaders source; + + @Setup(Level.Trial) + public void setup() { + // Representative request header set already built (and validated) by RequestBuilderBase. + source = new DefaultHttpHeaders(true); + source.add("Host", "www.example.com"); + source.add("User-Agent", "AHC/3.0"); + source.add("Accept", "*/*"); + source.add("Accept-Encoding", "gzip,deflate"); + source.add("Connection", "keep-alive"); + source.add("Authorization", "Bearer abcdefghijklmnopqrstuvwxyz0123456789"); + source.add("Cookie", "session=0123456789abcdef; theme=dark; lang=en-US"); + source.add("Content-Type", "application/json; charset=utf-8"); + source.add("X-Request-Id", "550e8400-e29b-41d4-a716-446655440000"); + source.add("X-Forwarded-For", "203.0.113.7"); + } + + /** Production behavior: outbound headers re-validate every name+value. */ + @Benchmark + public HttpHeaders validating() { + HttpHeaders out = new DefaultHttpHeaders(true); + out.set(source); + return out; + } + + /** Proposed: source already validated, so skip re-validation on the outbound copy. */ + @Benchmark + public HttpHeaders nonValidating() { + HttpHeaders out = new DefaultHttpHeaders(false); + out.set(source); + return out; + } +} diff --git a/client/src/jmh/java/org/asynchttpclient/bench/ResponseBodyPartBenchmark.java b/client/src/jmh/java/org/asynchttpclient/bench/ResponseBodyPartBenchmark.java new file mode 100644 index 0000000000..c2cedd1f59 --- /dev/null +++ b/client/src/jmh/java/org/asynchttpclient/bench/ResponseBodyPartBenchmark.java @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2026 AsyncHttpClient Project. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ +package org.asynchttpclient.bench; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import org.asynchttpclient.netty.EagerResponseBodyPart; +import org.asynchttpclient.netty.LazyResponseBodyPart; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.infra.Blackhole; + +import java.util.concurrent.TimeUnit; + +/** + * Measures the per-chunk allocation cost of the two {@code ResponseBodyPartFactory} + * implementations on the response read hot path. + *

+ * {@link EagerResponseBodyPart} (the production default) eagerly copies every received + * {@link ByteBuf} chunk into a freshly allocated {@code byte[]} via + * {@code ByteBufUtil.getBytes(buf)} (see {@code EagerResponseBodyPart} ctor). For a direct + * (non-array-backed) pooled buffer — the common case under the pooled allocator — that copy + * always allocates {@code length} bytes per chunk. + *

+ * {@link LazyResponseBodyPart} keeps a reference to the Netty {@link ByteBuf} and copies + * nothing until the caller actually requests bytes. This bench quantifies the eager-copy + * cost that proposal 001 proposes to avoid for handlers that consume {@code getBodyByteBuf()}. + *

+ * Run with: {@code /tmp/run-jmh.sh ResponseBodyPartBenchmark -prof gc -f 1 -wi 5 -i 5} + */ +@State(Scope.Thread) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +public class ResponseBodyPartBenchmark { + + @Param({"512", "8192", "65536"}) + public int chunkSize; + + private PooledByteBufAllocator allocator; + private ByteBuf direct; + + @Setup(Level.Trial) + public void setup() { + allocator = PooledByteBufAllocator.DEFAULT; + // Direct pooled buffer mimics what Netty hands the inbound pipeline under the default allocator. + direct = allocator.directBuffer(chunkSize); + for (int i = 0; i < chunkSize; i++) { + direct.writeByte(i & 0x7f); + } + } + + @TearDown(Level.Trial) + public void tearDown() { + direct.release(); + } + + /** Production default: eagerly copies the chunk into a new byte[]. */ + @Benchmark + public byte[] eager() { + // duplicate() so the reader index is independent and the buffer is reusable across iterations + return new EagerResponseBodyPart(direct.duplicate(), true).getBodyPartBytes(); + } + + /** Proposed alternative: no copy, just wrap the existing buffer. */ + @Benchmark + public Object lazy(Blackhole bh) { + LazyResponseBodyPart part = new LazyResponseBodyPart(direct.duplicate(), true); + // Simulate a handler that streams the ByteBuf out without ever materializing a byte[] + bh.consume(part.length()); + return part.getBodyByteBuf(); + } +} diff --git a/client/src/jmh/java/org/asynchttpclient/bench/SemaphoreLookupBenchmark.java b/client/src/jmh/java/org/asynchttpclient/bench/SemaphoreLookupBenchmark.java new file mode 100644 index 0000000000..af17fcb467 --- /dev/null +++ b/client/src/jmh/java/org/asynchttpclient/bench/SemaphoreLookupBenchmark.java @@ -0,0 +1,102 @@ +/* + * Copyright (c) 2026 AsyncHttpClient Project. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.asynchttpclient.bench; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Group; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.infra.Blackhole; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; + +/** + * Models the hot path of {@code PerHostConnectionSemaphore.getFreeConnectionsForHost}: + * a {@link ConcurrentHashMap#get} (current) vs the {@code get-then-computeIfAbsent} pattern + * the production code actually uses, executed once per acquire AND once per release. + * + * The "current" benchmark mirrors PerHostConnectionSemaphore exactly. The "tryAcquireRelease" + * benchmark adds the Semaphore acquire/release to show the lookup cost relative to the lock op. + * + * Run multi-threaded with -t N to surface CHM bucket / Semaphore contention: + * /tmp/run-jmh.sh SemaphoreLookupBenchmark -t 8 -f 1 -wi 5 -i 8 + */ +@State(Scope.Benchmark) +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.MICROSECONDS) +public class SemaphoreLookupBenchmark { + + /** Number of distinct partition keys (hosts) requests are spread across. */ + @Param({"1", "8"}) + public int hosts; + + private ConcurrentHashMap freeChannelsPerHost; + private Object[] keys; + + @State(Scope.Thread) + public static class Cursor { + int i; + } + + @Setup(Level.Trial) + public void setup() { + freeChannelsPerHost = new ConcurrentHashMap<>(); + keys = new Object[hosts]; + for (int i = 0; i < hosts; i++) { + keys[i] = "host-" + i; + // Pre-populate so we exercise the steady-state (get-hit) path. + freeChannelsPerHost.put(keys[i], new Semaphore(Integer.MAX_VALUE)); + } + } + + /** Exactly mirrors getFreeConnectionsForHost: get(), fall back to computeIfAbsent. */ + private Semaphore getFreeConnectionsForHost(Object key) { + Semaphore s = freeChannelsPerHost.get(key); + if (s == null) { + s = freeChannelsPerHost.computeIfAbsent(key, k -> new Semaphore(Integer.MAX_VALUE)); + } + return s; + } + + @Benchmark + @Group("lookup") + public void currentLookup(Cursor c, Blackhole bh) { + Object key = keys[(c.i++ & Integer.MAX_VALUE) % keys.length]; + bh.consume(getFreeConnectionsForHost(key)); + } + + /** Full acquire+release as the request hot path does it (two lookups per request). */ + @Benchmark + @Group("acquireRelease") + public void acquireReleaseCurrent(Cursor c, Blackhole bh) throws InterruptedException { + Object key = keys[(c.i++ & Integer.MAX_VALUE) % keys.length]; + Semaphore acq = getFreeConnectionsForHost(key); + boolean got = acq.tryAcquire(0, TimeUnit.MILLISECONDS); + if (got) { + Semaphore rel = getFreeConnectionsForHost(key); // second lookup on release + rel.release(); + bh.consume(rel); + } + } +} diff --git a/client/src/jmh/java/org/asynchttpclient/bench/UriEncoderBenchmark.java b/client/src/jmh/java/org/asynchttpclient/bench/UriEncoderBenchmark.java new file mode 100644 index 0000000000..a2a8153ce6 --- /dev/null +++ b/client/src/jmh/java/org/asynchttpclient/bench/UriEncoderBenchmark.java @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2024 AsyncHttpClient Project. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ +package org.asynchttpclient.bench; + +import org.asynchttpclient.uri.Uri; +import org.asynchttpclient.util.UriEncoder; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; + +import java.util.concurrent.TimeUnit; + +/** + * Measures allocations of {@code UriEncoder.encode(uri, queryParams)}, which is + * invoked once per request build via {@code RequestBuilderBase.computeUri()}. + * + * When the path needs no percent-encoding and there are no extra query params + * (the overwhelmingly common case for already-well-formed URLs), both + * {@code encodePath} and {@code encodeQuery} return the *same* String + * instances, yet {@code UriEncoder.encode} still builds a brand-new {@code Uri} + * with identical field values — a wasted ~64 B allocation per request build. + * + * This bench measures the production `encode` (FIXING) on a clean URL with no + * query params, pinning the per-build Uri allocation a fast-path + * "return the same Uri when nothing changed" check would remove. + */ +@State(Scope.Thread) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +public class UriEncoderBenchmark { + + private UriEncoder encoder; + private Uri cleanUri; + + @Setup(Level.Trial) + public void setup() { + encoder = UriEncoder.uriEncoder(false); // FIXING + cleanUri = Uri.create("http://www.example.com/path/to/resource?a=1&b=2"); + } + + /** Production path: builds a new Uri even when path/query are unchanged. */ + @Benchmark + public Uri encodeNoChange() { + return encoder.encode(cleanUri, null); + } +} diff --git a/client/src/jmh/java/org/asynchttpclient/bench/UriParseBenchmark.java b/client/src/jmh/java/org/asynchttpclient/bench/UriParseBenchmark.java new file mode 100644 index 0000000000..33f034dca1 --- /dev/null +++ b/client/src/jmh/java/org/asynchttpclient/bench/UriParseBenchmark.java @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2024 AsyncHttpClient Project. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ +package org.asynchttpclient.bench; + +import org.asynchttpclient.uri.Uri; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.infra.Blackhole; + +import java.util.concurrent.TimeUnit; + +/** + * Measures allocations of {@link Uri#create(String)} (the current production + * parser) on a few representative URLs. The parser allocates a transient + * {@code UriParser} object per call plus the resulting {@code Uri}; this bench + * pins down the per-call byte cost so a proposal can quantify removing the + * scratch object / scheme lower-casing substring churn. + */ +@State(Scope.Thread) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +public class UriParseBenchmark { + + @Param({ + "http://www.example.com/path/to/resource?a=1&b=2", + "https://user:pass@host.example.com:8443/a/b/c", + "http://localhost/" + }) + public String url; + + @Benchmark + public Uri create() { + return Uri.create(url); + } + + /** Re-parse with a context (relative resolution path). */ + @Benchmark + public void createWithContext(Blackhole bh) { + Uri base = Uri.create(url); + bh.consume(base); + bh.consume(Uri.create(base, "/other/path?x=9")); + } +} diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/DefaultChannelPool.java b/client/src/main/java/org/asynchttpclient/netty/channel/DefaultChannelPool.java index c4042fdfc7..9ba9f1b375 100755 --- a/client/src/main/java/org/asynchttpclient/netty/channel/DefaultChannelPool.java +++ b/client/src/main/java/org/asynchttpclient/netty/channel/DefaultChannelPool.java @@ -29,7 +29,6 @@ import java.net.InetSocketAddress; import java.time.Duration; import java.util.ArrayList; -import java.util.Collections; import java.util.Deque; import java.util.List; import java.util.Map; @@ -42,7 +41,6 @@ import java.util.function.Predicate; import java.util.stream.Collectors; -import static java.util.Objects.requireNonNull; import static org.asynchttpclient.util.DateUtils.unpreciseMillisTime; /** @@ -52,8 +50,12 @@ public final class DefaultChannelPool implements ChannelPool { private static final Logger LOGGER = LoggerFactory.getLogger(DefaultChannelPool.class); private static final AttributeKey CHANNEL_CREATION_ATTRIBUTE_KEY = AttributeKey.valueOf("channelCreation"); + private static final AttributeKey IDLE_STATE_ATTRIBUTE_KEY = AttributeKey.valueOf("channelIdleState"); - private final ConcurrentHashMap> partitions = new ConcurrentHashMap<>(); + // The partition deques hold the bare Channel; per-checkout idle state (start timestamp + the + // owned/tombstone CAS flag) lives on the channel's IDLE_STATE_ATTRIBUTE_KEY attribute, which is + // allocated once per physical connection and reused across every pool cycle (no per-offer holder). + private final ConcurrentHashMap> partitions = new ConcurrentHashMap<>(); private final AtomicBoolean isClosed = new AtomicBoolean(false); private final Timer nettyTimer; private final long connectionTtl; @@ -127,11 +129,21 @@ public boolean offer(Channel channel, Object partitionKey) { } private boolean offer0(Channel channel, Object partitionKey, long now) { - ConcurrentLinkedDeque partition = partitions.get(partitionKey); + ConcurrentLinkedDeque partition = partitions.get(partitionKey); if (partition == null) { partition = partitions.computeIfAbsent(partitionKey, pk -> new ConcurrentLinkedDeque<>()); } - return partition.offerFirst(new IdleChannel(channel, now)); + // Reuse the channel's IdleState instead of allocating a holder per offer; reset() stamps the + // idle start and clears the owned flag (must happen-before offerFirst publishes the channel, + // so any thread that observes it in the deque also observes owned == 0). + Attribute idleStateAttribute = channel.attr(IDLE_STATE_ATTRIBUTE_KEY); + IdleState idleState = idleStateAttribute.get(); + if (idleState == null) { + idleState = new IdleState(); + idleStateAttribute.set(idleState); + } + idleState.reset(now); + return partition.offerFirst(channel); } private static void registerChannelCreation(Channel channel, Object partitionKey, long now) { @@ -143,32 +155,47 @@ private static void registerChannelCreation(Channel channel, Object partitionKey @Override public Channel poll(Object partitionKey) { - IdleChannel idleChannel = null; - ConcurrentLinkedDeque partition = partitions.get(partitionKey); - if (partition != null) { - while (idleChannel == null) { - idleChannel = poolLeaseStrategy.lease(partition); + ConcurrentLinkedDeque partition = partitions.get(partitionKey); + if (partition == null) { + return null; + } - if (idleChannel == null) + for (; ; ) { + Channel channel = poolLeaseStrategy.lease(partition); + if (channel == null) { // pool is empty - { - break; - } else if (!Channels.isChannelActive(idleChannel.channel)) { - idleChannel = null; - LOGGER.trace("Channel is inactive, probably remotely closed!"); - } else if (!idleChannel.takeOwnership()) { - idleChannel = null; - LOGGER.trace("Couldn't take ownership of channel, probably in the process of being expired!"); - } + return null; + } + + if (!Channels.isChannelActive(channel)) { + LOGGER.trace("Channel is inactive, probably remotely closed!"); + continue; } + + IdleState idleState = channel.attr(IDLE_STATE_ATTRIBUTE_KEY).get(); + if (idleState == null || !idleState.takeOwnership()) { + LOGGER.trace("Couldn't take ownership of channel, probably in the process of being expired!"); + continue; + } + + return channel; } - return idleChannel != null ? idleChannel.channel : null; } @Override public boolean removeAll(Channel channel) { - ChannelCreation creation = connectionTtlEnabled ? channel.attr(CHANNEL_CREATION_ATTRIBUTE_KEY).get() : null; - return !isClosed.get() && creation != null && partitions.get(creation.partitionKey).remove(new IdleChannel(channel, Long.MIN_VALUE)); + if (isClosed.get() || !connectionTtlEnabled) { + return false; + } + + // O(1) tombstone instead of an O(n) ConcurrentLinkedDeque value scan: claim the channel's + // IdleState. A claimed channel is skipped by poll() (its takeOwnership fails) and physically + // unlinked by the idle cleaner on its next tick. removeAll only acts when connectionTtlEnabled, + // which guarantees the cleaner is scheduled (see constructor), so a tombstone is never orphaned. + // Returns true only when this call transitions an idle, leasable channel to claimed — matching + // the old "the channel was present in the pool" contract. + IdleState idleState = channel.attr(IDLE_STATE_ATTRIBUTE_KEY).get(); + return idleState != null && idleState.takeOwnership(); } @Override @@ -191,18 +218,18 @@ private static void close(Channel channel) { Channels.silentlyCloseChannel(channel); } - private void flushPartition(Object partitionKey, ConcurrentLinkedDeque partition) { + private void flushPartition(Object partitionKey, ConcurrentLinkedDeque partition) { if (partition != null) { partitions.remove(partitionKey); - for (IdleChannel idleChannel : partition) { - close(idleChannel.channel); + for (Channel channel : partition) { + close(channel); } } } @Override public void flushPartitions(Predicate predicate) { - for (Map.Entry> partitionsEntry : partitions.entrySet()) { + for (Map.Entry> partitionsEntry : partitions.entrySet()) { Object partitionKey = partitionsEntry.getKey(); if (predicate.test(partitionKey)) { flushPartition(partitionKey, partitionsEntry.getValue()); @@ -216,13 +243,21 @@ public Map getIdleChannelCountPerHost() { .values() .stream() .flatMap(ConcurrentLinkedDeque::stream) - .map(idle -> idle.getChannel().remoteAddress()) + // Skip channels that have been claimed (removeAll tombstone, or a node a concurrent + // poll already leased) but not yet unlinked, so the count reflects leasable channels. + .filter(DefaultChannelPool::isLeasable) + .map(Channel::remoteAddress) .filter(a -> a.getClass() == InetSocketAddress.class) .map(a -> (InetSocketAddress) a) .map(InetSocketAddress::getHostString) .collect(Collectors.groupingBy(Function.identity(), Collectors.counting())); } + private static boolean isLeasable(Channel channel) { + IdleState idleState = channel.attr(IDLE_STATE_ATTRIBUTE_KEY).get(); + return idleState != null && !idleState.isOwned(); + } + public enum PoolLeaseStrategy { LIFO { @Override @@ -250,93 +285,57 @@ private static final class ChannelCreation { } } - private static final class IdleChannel { - - private static final AtomicIntegerFieldUpdater ownedField = AtomicIntegerFieldUpdater.newUpdater(IdleChannel.class, "owned"); - - final Channel channel; - final long start; + /** + * Per-channel idle bookkeeping. Allocated once and stashed on the channel's + * {@link #IDLE_STATE_ATTRIBUTE_KEY} attribute, then reused across every pool checkout so no holder + * is allocated per offer. + * + *

{@code owned} is a single CAS flag with two roles, both meaning "this idle entry is claimed, + * do not lease it": a successful {@code poll()} lease, or a {@code removeAll()} tombstone. The pool + * upholds the invariant that a channel sitting in a partition deque has {@code owned == 0} unless it + * was tombstoned, because {@link #reset(long)} clears the flag before {@code offerFirst} publishes + * the channel and {@code poll()} unlinks a channel from the deque before claiming it. {@code start} + * doubles as a generation token: it changes on every offer, letting the cleaner detect a channel + * that was leased and re-offered between its expiry check and its claim. + */ + static final class IdleState { + + private static final AtomicIntegerFieldUpdater OWNED_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(IdleState.class, "owned"); + + private volatile long start; @SuppressWarnings("unused") private volatile int owned; - IdleChannel(Channel channel, long start) { - this.channel = requireNonNull(channel, "channel"); - this.start = start; + long start() { + return start; } - public boolean takeOwnership() { - return ownedField.getAndSet(this, 1) == 0; + boolean isOwned() { + return owned != 0; } - public Channel getChannel() { - return channel; + /** Atomically claim this entry; returns true only for the caller that transitions 0 -> 1. */ + boolean takeOwnership() { + return OWNED_UPDATER.getAndSet(this, 1) == 0; } - @Override - // only depends on channel - public boolean equals(Object o) { - return this == o || o instanceof IdleChannel && channel.equals(((IdleChannel) o).channel); + /** Undo a claim taken via {@link #takeOwnership()} (used only on the cleaner re-offer race). */ + void releaseOwnership() { + owned = 0; } - @Override - public int hashCode() { - return channel.hashCode(); + /** Stamp the idle start and mark the channel leasable again. Called on every offer. */ + void reset(long now) { + start = now; + owned = 0; } } private final class IdleChannelDetector implements TimerTask { - private boolean isIdleTimeoutExpired(IdleChannel idleChannel, long now) { - return maxIdleTimeEnabled && now - idleChannel.start >= maxIdleTime; - } - - private List expiredChannels(ConcurrentLinkedDeque partition, long now) { - // lazy create - List idleTimeoutChannels = null; - for (IdleChannel idleChannel : partition) { - boolean isIdleTimeoutExpired = isIdleTimeoutExpired(idleChannel, now); - boolean isRemotelyClosed = !Channels.isChannelActive(idleChannel.channel); - boolean isTtlExpired = isTtlExpired(idleChannel.channel, now); - if (isIdleTimeoutExpired || isRemotelyClosed || isTtlExpired) { - - LOGGER.debug("Adding Candidate expired Channel {} isIdleTimeoutExpired={} isRemotelyClosed={} isTtlExpired={}", - idleChannel.channel, isIdleTimeoutExpired, isRemotelyClosed, isTtlExpired); - - if (idleTimeoutChannels == null) { - idleTimeoutChannels = new ArrayList<>(1); - } - idleTimeoutChannels.add(idleChannel); - } - } - - return idleTimeoutChannels != null ? idleTimeoutChannels : Collections.emptyList(); - } - - private List closeChannels(List candidates) { - // lazy create, only if we hit a non-closeable channel - List closedChannels = null; - for (int i = 0; i < candidates.size(); i++) { - // We call takeOwnership here to avoid closing a channel that has just been taken out - // of the pool, otherwise we risk closing an active connection. - IdleChannel idleChannel = candidates.get(i); - if (idleChannel.takeOwnership()) { - LOGGER.debug("Closing Idle Channel {}", idleChannel.channel); - close(idleChannel.channel); - if (closedChannels != null) { - closedChannels.add(idleChannel); - } - - } else if (closedChannels == null) { - // first non-closeable to be skipped, copy all - // previously skipped closeable channels - closedChannels = new ArrayList<>(candidates.size()); - for (int j = 0; j < i; j++) { - closedChannels.add(candidates.get(j)); - } - } - } - - return closedChannels != null ? closedChannels : candidates; + private boolean isIdleTimeoutExpired(IdleState idleState, long now) { + return maxIdleTimeEnabled && now - idleState.start() >= maxIdleTime; } @Override @@ -347,7 +346,7 @@ public void run(Timeout timeout) { } if (LOGGER.isDebugEnabled()) { - for (Map.Entry> entry : partitions.entrySet()) { + for (Map.Entry> entry : partitions.entrySet()) { int size = entry.getValue().size(); if (size > 0) { LOGGER.debug("Entry count for : {} : {}", entry.getKey(), size); @@ -359,7 +358,7 @@ public void run(Timeout timeout) { int closedCount = 0; int totalCount = 0; - for (ConcurrentLinkedDeque partition : partitions.values()) { + for (ConcurrentLinkedDeque partition : partitions.values()) { // store in intermediate unsynchronized lists to minimize // the impact on the ConcurrentLinkedDeque @@ -367,12 +366,7 @@ public void run(Timeout timeout) { totalCount += partition.size(); } - List closedChannels = closeChannels(expiredChannels(partition, start)); - - if (!closedChannels.isEmpty()) { - partition.removeAll(closedChannels); - closedCount += closedChannels.size(); - } + closedCount += reapPartition(partition, start); } if (LOGGER.isDebugEnabled()) { @@ -384,5 +378,71 @@ public void run(Timeout timeout) { scheduleNewIdleChannelDetector(timeout.task()); } + + /** + * One pass over a partition. A channel is dropped from the deque when it is a removeAll + * tombstone, remotely closed, idle-timeout expired or TTL expired. Tombstoned/concurrently + * leased channels are only unlinked (their owner closes them); expired channels are closed + * here, but only after this cleaner exclusively claims them, so a channel that {@code poll()} + * is leasing concurrently is never closed. Returns the number of channels closed by this tick. + */ + private int reapPartition(ConcurrentLinkedDeque partition, long now) { + List toRemove = null; + int closed = 0; + + for (Channel channel : partition) { + IdleState idleState = channel.attr(IDLE_STATE_ATTRIBUTE_KEY).get(); + if (idleState == null) { + continue; + } + + if (idleState.isOwned()) { + // In-deque + owned ==> a removeAll() tombstone, or a node a concurrent poll() has + // already leased and unlinked. Either way: unlink, never close — the owner of the + // claim is responsible for closing it. removeAll() on an already-unlinked node is a + // harmless no-op. + toRemove = lazyAdd(toRemove, channel); + continue; + } + + boolean isIdleTimeoutExpired = isIdleTimeoutExpired(idleState, now); + boolean isRemotelyClosed = !Channels.isChannelActive(channel); + boolean isTtlExpired = isTtlExpired(channel, now); + if (!isIdleTimeoutExpired && !isRemotelyClosed && !isTtlExpired) { + continue; // healthy idle channel, leave it for poll() + } + + long startSnapshot = idleState.start(); + // Claim before closing so we never close a channel poll() is leasing concurrently. + if (!idleState.takeOwnership()) { + continue; // poll() (or removeAll) won the claim; that owner now handles the channel + } + if (idleState.start() != startSnapshot) { + // The channel was leased and re-offered (fresh start) between the expiry check and + // the claim, so it is leasable again — release it instead of closing it. + idleState.releaseOwnership(); + continue; + } + + LOGGER.debug("Closing Idle Channel {} isIdleTimeoutExpired={} isRemotelyClosed={} isTtlExpired={}", + channel, isIdleTimeoutExpired, isRemotelyClosed, isTtlExpired); + close(channel); + closed++; + toRemove = lazyAdd(toRemove, channel); + } + + if (toRemove != null) { + partition.removeAll(toRemove); + } + return closed; + } + + private List lazyAdd(List list, Channel channel) { + if (list == null) { + list = new ArrayList<>(1); + } + list.add(channel); + return list; + } } } diff --git a/client/src/test/java/org/asynchttpclient/netty/channel/DefaultChannelPoolTest.java b/client/src/test/java/org/asynchttpclient/netty/channel/DefaultChannelPoolTest.java new file mode 100644 index 0000000000..1d30ca2abd --- /dev/null +++ b/client/src/test/java/org/asynchttpclient/netty/channel/DefaultChannelPoolTest.java @@ -0,0 +1,402 @@ +/* + * Copyright (c) 2026 AsyncHttpClient Project. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.asynchttpclient.netty.channel; + +import io.netty.channel.Channel; +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.util.HashedWheelTimer; +import io.netty.util.Timeout; +import io.netty.util.Timer; +import io.netty.util.TimerTask; +import org.asynchttpclient.netty.channel.DefaultChannelPool.PoolLeaseStrategy; +import org.junit.jupiter.api.Test; + +import java.lang.reflect.Field; +import java.time.Duration; +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +/** + * White-box unit tests for {@link DefaultChannelPool} covering the bare-channel storage + reused + * {@code IdleState} attribute (plan 009) and the O(1) tombstone {@code removeAll} + tombstone-aware + * idle cleaner (plan 013). The cleaner is driven deterministically through a capturing {@link Timer}. + */ +public class DefaultChannelPoolTest { + + private static final Object KEY = "partition-key"; + + private static DefaultChannelPool noReaperPool() { + // No TTL, no idle timeout => no cleaner scheduled; removeAll is a no-op (unchanged behavior). + return new DefaultChannelPool(Duration.ZERO, Duration.ZERO, PoolLeaseStrategy.LIFO, + new CapturingTimer(), Duration.ofMillis(1)); + } + + private static DefaultChannelPool ttlPool(CapturingTimer timer) { + // TTL enabled (long, so it never trips) => cleaner scheduled, removeAll tombstoning active. + return new DefaultChannelPool(Duration.ZERO, Duration.ofHours(1), PoolLeaseStrategy.LIFO, + timer, Duration.ofMillis(1)); + } + + private static DefaultChannelPool idlePool(CapturingTimer timer, Duration maxIdle) { + return new DefaultChannelPool(maxIdle, Duration.ZERO, PoolLeaseStrategy.LIFO, + timer, Duration.ofMillis(1)); + } + + // ---- plan 009: bare-channel offer/poll ---- + + @Test + public void offerThenPollReturnsSameChannelThenEmpty() { + DefaultChannelPool pool = noReaperPool(); + Channel channel = new EmbeddedChannel(); + + assertTrue(pool.offer(channel, KEY)); + assertSame(channel, pool.poll(KEY), "poll must return the offered channel"); + assertNull(pool.poll(KEY), "a polled channel is removed from the pool"); + + pool.destroy(); + } + + @Test + public void reofferingReusesTheSameIdleStateInstance() throws Exception { + DefaultChannelPool pool = noReaperPool(); + Channel channel = new EmbeddedChannel(); + + pool.offer(channel, KEY); + Object firstState = idleState(channel); + assertSame(channel, pool.poll(KEY)); + + pool.offer(channel, KEY); + Object secondState = idleState(channel); + // 009: the per-channel IdleState holder is allocated once and reused across checkouts. + assertSame(firstState, secondState, "IdleState must be reused, not reallocated per offer"); + assertSame(channel, pool.poll(KEY)); + + pool.destroy(); + } + + // ---- plan 013: O(1) tombstone removeAll ---- + + @Test + public void removeAllTombstonesSoChannelIsNoLongerLeasable() { + CapturingTimer timer = new CapturingTimer(); + DefaultChannelPool pool = ttlPool(timer); + Channel channel = new EmbeddedChannel(); + + pool.offer(channel, KEY); + assertTrue(pool.removeAll(channel), "removeAll returns true for a pooled, leasable channel"); + assertNull(pool.poll(KEY), "a tombstoned channel must not be leased by poll"); + assertTrue(channel.isActive(), "removeAll must not close the channel; the caller owns the close"); + + pool.destroy(); + } + + @Test + public void removeAllReturnsFalseTheSecondTime() { + CapturingTimer timer = new CapturingTimer(); + DefaultChannelPool pool = ttlPool(timer); + Channel channel = new EmbeddedChannel(); + + pool.offer(channel, KEY); + assertTrue(pool.removeAll(channel)); + assertFalse(pool.removeAll(channel), "a channel can only be tombstoned once"); + + pool.destroy(); + } + + @Test + public void removeAllIsNoOpWhenTtlDisabled() { + DefaultChannelPool pool = noReaperPool(); + Channel channel = new EmbeddedChannel(); + + pool.offer(channel, KEY); + assertFalse(pool.removeAll(channel), "removeAll only acts when connectionTtl is enabled"); + // The channel is still leasable (it was never tombstoned). + assertSame(channel, pool.poll(KEY)); + + pool.destroy(); + } + + // ---- plan 013: tombstone-aware cleaner ---- + + @Test + public void cleanerUnlinksTombstoneWithoutClosingIt() throws Exception { + CapturingTimer timer = new CapturingTimer(); + DefaultChannelPool pool = ttlPool(timer); + Channel channel = new EmbeddedChannel(); + + pool.offer(channel, KEY); + pool.removeAll(channel); + assertEquals(1, partitionSize(pool, KEY), "tombstone lingers until the cleaner ticks"); + + timer.fire(); + + assertEquals(0, partitionSize(pool, KEY), "cleaner must physically unlink the tombstone"); + assertTrue(channel.isActive(), "cleaner must not close a tombstoned channel"); + + pool.destroy(); + } + + @Test + public void cleanerClosesRemotelyClosedChannel() throws Exception { + CapturingTimer timer = new CapturingTimer(); + DefaultChannelPool pool = idlePool(timer, Duration.ofHours(1)); // only the remote-close path trips + EmbeddedChannel channel = new EmbeddedChannel(); + + pool.offer(channel, KEY); + channel.close().await(5, TimeUnit.SECONDS); + assertFalse(channel.isActive()); + + timer.fire(); + + assertEquals(0, partitionSize(pool, KEY), "remotely closed channel must be unlinked"); + assertNull(pool.poll(KEY)); + + pool.destroy(); + } + + @Test + public void cleanerClosesIdleTimeoutExpiredChannel() throws Exception { + CapturingTimer timer = new CapturingTimer(); + DefaultChannelPool pool = idlePool(timer, Duration.ofMillis(1)); + Channel channel = new EmbeddedChannel(); + + pool.offer(channel, KEY); + Thread.sleep(40); // make now - start >= 1ms + + timer.fire(); + + assertEquals(0, partitionSize(pool, KEY), "idle-expired channel must be unlinked"); + assertFalse(channel.isActive(), "cleaner must close an idle-expired channel"); + assertNull(pool.poll(KEY)); + + pool.destroy(); + } + + @Test + public void cleanerLeavesHealthyChannelLeasable() throws Exception { + CapturingTimer timer = new CapturingTimer(); + DefaultChannelPool pool = idlePool(timer, Duration.ofHours(1)); + Channel channel = new EmbeddedChannel(); + + pool.offer(channel, KEY); + timer.fire(); + + assertEquals(1, partitionSize(pool, KEY), "a healthy idle channel must survive the cleaner"); + assertTrue(channel.isActive()); + assertSame(channel, pool.poll(KEY), "a healthy channel stays leasable"); + + pool.destroy(); + } + + @Test + public void channelReofferedAfterExpiryIsNotReaped() throws Exception { + CapturingTimer timer = new CapturingTimer(); + DefaultChannelPool pool = idlePool(timer, Duration.ofMillis(1)); + Channel channel = new EmbeddedChannel(); + + pool.offer(channel, KEY); + Thread.sleep(40); // channel is now idle-expired + + // Lease and re-offer it: reset() stamps a fresh start, so the next cleaner pass must spare it. + assertSame(channel, pool.poll(KEY)); + pool.offer(channel, KEY); + + timer.fire(); + + assertTrue(channel.isActive(), "a re-offered (fresh) channel must not be closed by the cleaner"); + assertSame(channel, pool.poll(KEY)); + + pool.destroy(); + } + + // ---- concurrency: no leaked tombstones, never leases a claimed channel ---- + + @Test + public void concurrentOfferPollRemoveAllIsConsistent() throws Exception { + // Real timer so the cleaner reaps tombstones concurrently with offer/poll/removeAll. + // TTL only (idle disabled) so the cleaner never closes our shared EmbeddedChannels cross-thread. + HashedWheelTimer timer = new HashedWheelTimer(10, TimeUnit.MILLISECONDS); + DefaultChannelPool pool = new DefaultChannelPool(Duration.ZERO, Duration.ofHours(1), + PoolLeaseStrategy.LIFO, timer, Duration.ofMillis(10)); + + final int channelCount = 16; + Channel[] channels = new Channel[channelCount]; + for (int i = 0; i < channelCount; i++) { + channels[i] = new EmbeddedChannel(); + } + + final int threads = 4; + final AtomicBoolean stop = new AtomicBoolean(false); + final AtomicReference failure = new AtomicReference<>(); + final ConcurrentLinkedQueue leasedInactive = new ConcurrentLinkedQueue<>(); + final CountDownLatch done = new CountDownLatch(threads); + + for (int t = 0; t < threads; t++) { + final int seed = t; + Thread worker = new Thread(() -> { + try { + long x = seed + 1; + while (!stop.get()) { + x = x * 6364136223846793005L + 1442695040888963407L; // xorshift-ish LCG + int idx = (int) ((x >>> 33) % channelCount); + Channel c = channels[idx]; + switch ((int) ((x >>> 17) & 3)) { + case 0: + pool.offer(c, KEY); + break; + case 1: + Channel leased = pool.poll(KEY); + if (leased != null && !leased.isActive()) { + leasedInactive.add(leased); // poll must never hand out a dead channel + } + break; + default: + pool.removeAll(c); + break; + } + } + } catch (Throwable th) { + failure.compareAndSet(null, th); + } finally { + done.countDown(); + } + }, "pool-soak-" + t); + worker.start(); + } + + Thread.sleep(1500); + stop.set(true); + assertTrue(done.await(10, TimeUnit.SECONDS), "workers must finish"); + + if (failure.get() != null) { + fail("worker threw: " + failure.get(), failure.get()); + } + assertTrue(leasedInactive.isEmpty(), "poll must never lease an inactive channel"); + + // Drain leases, then let the cleaner run a couple of ticks and confirm no tombstone leak: + // every partition deque must collapse to at most the number of distinct channels. + while (pool.poll(KEY) != null) { + // drain + } + Thread.sleep(60); // a few cleaner ticks + int size = partitionSize(pool, KEY); + assertTrue(size <= channelCount, "tombstones must not accumulate unbounded, was " + size); + + pool.destroy(); + timer.stop(); + } + + // ---- helpers ---- + + private static Object idleState(Channel channel) throws Exception { + Field keyField = DefaultChannelPool.class.getDeclaredField("IDLE_STATE_ATTRIBUTE_KEY"); + keyField.setAccessible(true); + @SuppressWarnings("unchecked") + io.netty.util.AttributeKey key = (io.netty.util.AttributeKey) keyField.get(null); + return channel.attr(key).get(); + } + + @SuppressWarnings("unchecked") + private static int partitionSize(DefaultChannelPool pool, Object key) throws Exception { + Field partitionsField = DefaultChannelPool.class.getDeclaredField("partitions"); + partitionsField.setAccessible(true); + ConcurrentHashMap> partitions = + (ConcurrentHashMap>) partitionsField.get(pool); + ConcurrentLinkedDeque partition = partitions.get(key); + return partition == null ? 0 : partition.size(); + } + + /** + * A {@link Timer} that captures the last-scheduled {@link TimerTask} (the pool's idle cleaner) so a + * test can fire it synchronously instead of waiting on wall-clock time. + */ + private static final class CapturingTimer implements Timer { + + private volatile TimerTask task; + + @Override + public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) { + this.task = task; + return new CapturingTimeout(this, task); + } + + @Override + public Set stop() { + return Collections.emptySet(); + } + + void fire() { + TimerTask current = task; + if (current != null) { + try { + current.run(new CapturingTimeout(this, current)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + } + + private static final class CapturingTimeout implements Timeout { + + private final Timer timer; + private final TimerTask task; + + CapturingTimeout(Timer timer, TimerTask task) { + this.timer = timer; + this.task = task; + } + + @Override + public Timer timer() { + return timer; + } + + @Override + public TimerTask task() { + return task; + } + + @Override + public boolean isExpired() { + return false; + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public boolean cancel() { + return false; + } + } +} From 5a2ca9f733aa24e2d61d4426a0b9dcb8ecbcdfa4 Mon Sep 17 00:00:00 2001 From: Aayush Atharva Date: Sun, 7 Jun 2026 12:22:08 +0000 Subject: [PATCH 2/2] Fix test case --- .../bench/AcceptEncodingHpackBenchmark.java | 6 ++++++ .../bench/ChannelPoolCheckoutBenchmark.java | 8 +++++++- .../bench/CookieStoreGetBenchmark.java | 8 +++++++- .../bench/FormParamsEncodeBenchmark.java | 8 +++++++- .../asynchttpclient/bench/HeaderEncodeBenchmark.java | 6 ++++++ .../bench/MultipartPreContentBenchmark.java | 8 +++++++- .../bench/RequestHeaderCopyBenchmark.java | 6 ++++++ .../bench/ResponseBodyPartBenchmark.java | 6 ++++++ .../asynchttpclient/bench/UriEncoderBenchmark.java | 8 +++++++- .../org/asynchttpclient/bench/UriParseBenchmark.java | 8 +++++++- .../netty/channel/DefaultChannelPoolTest.java | 12 +++++++++--- 11 files changed, 75 insertions(+), 9 deletions(-) diff --git a/client/src/jmh/java/org/asynchttpclient/bench/AcceptEncodingHpackBenchmark.java b/client/src/jmh/java/org/asynchttpclient/bench/AcceptEncodingHpackBenchmark.java index 7f034712f1..0e69aae031 100644 --- a/client/src/jmh/java/org/asynchttpclient/bench/AcceptEncodingHpackBenchmark.java +++ b/client/src/jmh/java/org/asynchttpclient/bench/AcceptEncodingHpackBenchmark.java @@ -6,6 +6,12 @@ * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.asynchttpclient.bench; diff --git a/client/src/jmh/java/org/asynchttpclient/bench/ChannelPoolCheckoutBenchmark.java b/client/src/jmh/java/org/asynchttpclient/bench/ChannelPoolCheckoutBenchmark.java index 175def5d04..0bc2dc1583 100644 --- a/client/src/jmh/java/org/asynchttpclient/bench/ChannelPoolCheckoutBenchmark.java +++ b/client/src/jmh/java/org/asynchttpclient/bench/ChannelPoolCheckoutBenchmark.java @@ -1,11 +1,17 @@ /* - * Copyright (c) 2024 AsyncHttpClient Project. All rights reserved. + * Copyright (c) 2026 AsyncHttpClient Project. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.asynchttpclient.bench; diff --git a/client/src/jmh/java/org/asynchttpclient/bench/CookieStoreGetBenchmark.java b/client/src/jmh/java/org/asynchttpclient/bench/CookieStoreGetBenchmark.java index 731b457845..be77d4564e 100644 --- a/client/src/jmh/java/org/asynchttpclient/bench/CookieStoreGetBenchmark.java +++ b/client/src/jmh/java/org/asynchttpclient/bench/CookieStoreGetBenchmark.java @@ -1,11 +1,17 @@ /* - * Copyright (c) 2024 AsyncHttpClient Project. All rights reserved. + * Copyright (c) 2026 AsyncHttpClient Project. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.asynchttpclient.bench; diff --git a/client/src/jmh/java/org/asynchttpclient/bench/FormParamsEncodeBenchmark.java b/client/src/jmh/java/org/asynchttpclient/bench/FormParamsEncodeBenchmark.java index fe70b134b4..5898b213f3 100644 --- a/client/src/jmh/java/org/asynchttpclient/bench/FormParamsEncodeBenchmark.java +++ b/client/src/jmh/java/org/asynchttpclient/bench/FormParamsEncodeBenchmark.java @@ -1,11 +1,17 @@ /* - * Copyright (c) 2024 AsyncHttpClient Project. All rights reserved. + * Copyright (c) 2026 AsyncHttpClient Project. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.asynchttpclient.bench; diff --git a/client/src/jmh/java/org/asynchttpclient/bench/HeaderEncodeBenchmark.java b/client/src/jmh/java/org/asynchttpclient/bench/HeaderEncodeBenchmark.java index b2fefc4a61..32792c60af 100644 --- a/client/src/jmh/java/org/asynchttpclient/bench/HeaderEncodeBenchmark.java +++ b/client/src/jmh/java/org/asynchttpclient/bench/HeaderEncodeBenchmark.java @@ -6,6 +6,12 @@ * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.asynchttpclient.bench; diff --git a/client/src/jmh/java/org/asynchttpclient/bench/MultipartPreContentBenchmark.java b/client/src/jmh/java/org/asynchttpclient/bench/MultipartPreContentBenchmark.java index c35bee7c90..11ab79ab99 100644 --- a/client/src/jmh/java/org/asynchttpclient/bench/MultipartPreContentBenchmark.java +++ b/client/src/jmh/java/org/asynchttpclient/bench/MultipartPreContentBenchmark.java @@ -1,11 +1,17 @@ /* - * Copyright (c) 2024 AsyncHttpClient Project. All rights reserved. + * Copyright (c) 2026 AsyncHttpClient Project. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.asynchttpclient.bench; diff --git a/client/src/jmh/java/org/asynchttpclient/bench/RequestHeaderCopyBenchmark.java b/client/src/jmh/java/org/asynchttpclient/bench/RequestHeaderCopyBenchmark.java index af76194a89..bd402253b4 100644 --- a/client/src/jmh/java/org/asynchttpclient/bench/RequestHeaderCopyBenchmark.java +++ b/client/src/jmh/java/org/asynchttpclient/bench/RequestHeaderCopyBenchmark.java @@ -6,6 +6,12 @@ * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.asynchttpclient.bench; diff --git a/client/src/jmh/java/org/asynchttpclient/bench/ResponseBodyPartBenchmark.java b/client/src/jmh/java/org/asynchttpclient/bench/ResponseBodyPartBenchmark.java index c2cedd1f59..184356844c 100644 --- a/client/src/jmh/java/org/asynchttpclient/bench/ResponseBodyPartBenchmark.java +++ b/client/src/jmh/java/org/asynchttpclient/bench/ResponseBodyPartBenchmark.java @@ -6,6 +6,12 @@ * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.asynchttpclient.bench; diff --git a/client/src/jmh/java/org/asynchttpclient/bench/UriEncoderBenchmark.java b/client/src/jmh/java/org/asynchttpclient/bench/UriEncoderBenchmark.java index a2a8153ce6..8330f36c18 100644 --- a/client/src/jmh/java/org/asynchttpclient/bench/UriEncoderBenchmark.java +++ b/client/src/jmh/java/org/asynchttpclient/bench/UriEncoderBenchmark.java @@ -1,11 +1,17 @@ /* - * Copyright (c) 2024 AsyncHttpClient Project. All rights reserved. + * Copyright (c) 2026 AsyncHttpClient Project. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.asynchttpclient.bench; diff --git a/client/src/jmh/java/org/asynchttpclient/bench/UriParseBenchmark.java b/client/src/jmh/java/org/asynchttpclient/bench/UriParseBenchmark.java index 33f034dca1..7c8d4dcc06 100644 --- a/client/src/jmh/java/org/asynchttpclient/bench/UriParseBenchmark.java +++ b/client/src/jmh/java/org/asynchttpclient/bench/UriParseBenchmark.java @@ -1,11 +1,17 @@ /* - * Copyright (c) 2024 AsyncHttpClient Project. All rights reserved. + * Copyright (c) 2026 AsyncHttpClient Project. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.asynchttpclient.bench; diff --git a/client/src/test/java/org/asynchttpclient/netty/channel/DefaultChannelPoolTest.java b/client/src/test/java/org/asynchttpclient/netty/channel/DefaultChannelPoolTest.java index 1d30ca2abd..8203d621d5 100644 --- a/client/src/test/java/org/asynchttpclient/netty/channel/DefaultChannelPoolTest.java +++ b/client/src/test/java/org/asynchttpclient/netty/channel/DefaultChannelPoolTest.java @@ -217,14 +217,20 @@ public void cleanerLeavesHealthyChannelLeasable() throws Exception { @Test public void channelReofferedAfterExpiryIsNotReaped() throws Exception { + // maxIdleTime must comfortably exceed the re-offer -> cleaner-fire gap below: reset() stamps a + // fresh start, and the channel must read as fresh when the cleaner runs. A tiny timeout (e.g. + // 1ms) is shorter than millisecond clock granularity, so the re-offered channel would re-expire + // before fire() and be reaped — a test artifact, not a pool bug. + final long maxIdle = 1000; CapturingTimer timer = new CapturingTimer(); - DefaultChannelPool pool = idlePool(timer, Duration.ofMillis(1)); + DefaultChannelPool pool = idlePool(timer, Duration.ofMillis(maxIdle)); Channel channel = new EmbeddedChannel(); pool.offer(channel, KEY); - Thread.sleep(40); // channel is now idle-expired + Thread.sleep(maxIdle + 100); // first lifetime exceeds maxIdleTime: this channel was reapable - // Lease and re-offer it: reset() stamps a fresh start, so the next cleaner pass must spare it. + // Lease and re-offer it: reset() stamps a fresh start, so the cleaner (firing immediately, + // far inside maxIdleTime) must spare it. assertSame(channel, pool.poll(KEY)); pool.offer(channel, KEY);