From 7f8f3377b58dd9ed829041e07a16f1c6bf2fae1e Mon Sep 17 00:00:00 2001 From: Sonu Kumar Date: Thu, 21 May 2026 16:49:11 +0530 Subject: [PATCH 01/11] Delay Rqueue startup for Boot web apps --- .../RqueueMessageListenerContainer.java | 8 ++- .../boot/RqueueAutoStartupLifecycle.java | 58 +++++++++++++++++++ .../spring/boot/RqueueListenerAutoConfig.java | 10 ++++ .../unit/RqueueListenerAutoConfigTest.java | 51 ++++++++++++++++ 4 files changed, 125 insertions(+), 2 deletions(-) create mode 100644 rqueue-spring-boot-starter/src/main/java/com/github/sonus21/rqueue/spring/boot/RqueueAutoStartupLifecycle.java diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java index 190c7be4..a4b3ded8 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java @@ -517,10 +517,14 @@ private List getQueueDetail(String queue, MappingInformation mappin @Override public void start() { - log.info("Starting Rqueue Message container {}", RqueueConfig.getBrokerId()); synchronized (lifecycleMgr) { - running = true; + if (running) { + log.debug("Rqueue Message container {} is already running", RqueueConfig.getBrokerId()); + return; + } + log.info("Starting Rqueue Message container {}", RqueueConfig.getBrokerId()); doStart(); + running = true; rqueueBeanProvider .getApplicationEventPublisher() .publishEvent(new RqueueBootstrapEvent(EVENT_SOURCE, true)); diff --git a/rqueue-spring-boot-starter/src/main/java/com/github/sonus21/rqueue/spring/boot/RqueueAutoStartupLifecycle.java b/rqueue-spring-boot-starter/src/main/java/com/github/sonus21/rqueue/spring/boot/RqueueAutoStartupLifecycle.java new file mode 100644 index 00000000..0f9013c1 --- /dev/null +++ b/rqueue-spring-boot-starter/src/main/java/com/github/sonus21/rqueue/spring/boot/RqueueAutoStartupLifecycle.java @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2026 Sonu Kumar + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + */ + +package com.github.sonus21.rqueue.spring.boot; + +import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.config.BeanPostProcessor; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.ApplicationListener; + +/** + * Delays Rqueue poller startup in Boot web applications until the servlet or reactive web server + * is ready and Spring Boot has published {@link ApplicationReadyEvent}. + */ +public class RqueueAutoStartupLifecycle + implements BeanPostProcessor, ApplicationListener { + + private final Set delayedContainers = + ConcurrentHashMap.newKeySet(); + + @Override + public Object postProcessBeforeInitialization(Object bean, String beanName) + throws BeansException { + if (bean instanceof RqueueMessageListenerContainer) { + RqueueMessageListenerContainer container = (RqueueMessageListenerContainer) bean; + if (container.isAutoStartup()) { + container.setAutoStartup(false); + delayedContainers.add(container); + } + } + return bean; + } + + @Override + public void onApplicationEvent(ApplicationReadyEvent event) { + for (RqueueMessageListenerContainer container : delayedContainers) { + if (!container.isRunning()) { + container.start(); + } + } + } +} diff --git a/rqueue-spring-boot-starter/src/main/java/com/github/sonus21/rqueue/spring/boot/RqueueListenerAutoConfig.java b/rqueue-spring-boot-starter/src/main/java/com/github/sonus21/rqueue/spring/boot/RqueueListenerAutoConfig.java index 9ac5c4fc..cfb166d4 100644 --- a/rqueue-spring-boot-starter/src/main/java/com/github/sonus21/rqueue/spring/boot/RqueueListenerAutoConfig.java +++ b/rqueue-spring-boot-starter/src/main/java/com/github/sonus21/rqueue/spring/boot/RqueueListenerAutoConfig.java @@ -33,8 +33,10 @@ import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer; import com.github.sonus21.rqueue.utils.condition.ReactiveEnabled; import com.github.sonus21.rqueue.utils.condition.RqueueEnabled; +import org.springframework.beans.factory.config.BeanDefinition; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication; import org.springframework.boot.data.redis.autoconfigure.DataRedisAutoConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; @@ -42,6 +44,7 @@ import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.DependsOn; import org.springframework.context.annotation.Import; +import org.springframework.context.annotation.Role; @Configuration @AutoConfigureAfter(DataRedisAutoConfiguration.class) @@ -54,6 +57,13 @@ @Import(RqueueRedisConfigImportSelector.class) public class RqueueListenerAutoConfig extends RqueueListenerBaseConfig { + @Bean + @Role(BeanDefinition.ROLE_INFRASTRUCTURE) + @ConditionalOnWebApplication + public static RqueueAutoStartupLifecycle rqueueAutoStartupLifecycle() { + return new RqueueAutoStartupLifecycle(); + } + @Bean @ConditionalOnMissingBean public RqueueMessageHandler rqueueMessageHandler(MessageBroker messageBroker) { diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/unit/RqueueListenerAutoConfigTest.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/unit/RqueueListenerAutoConfigTest.java index b7e2986e..8234dc93 100644 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/unit/RqueueListenerAutoConfigTest.java +++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/unit/RqueueListenerAutoConfigTest.java @@ -17,6 +17,7 @@ package com.github.sonus21.rqueue.spring.boot.tests.unit; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -35,6 +36,8 @@ import com.github.sonus21.rqueue.core.spi.Capabilities; import com.github.sonus21.rqueue.core.spi.MessageBroker; import com.github.sonus21.rqueue.listener.RqueueMessageHandler; +import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer; +import com.github.sonus21.rqueue.spring.boot.RqueueAutoStartupLifecycle; import com.github.sonus21.rqueue.spring.boot.RqueueListenerAutoConfig; import com.github.sonus21.rqueue.spring.boot.tests.SpringBootUnitTest; import org.apache.commons.lang3.reflect.FieldUtils; @@ -125,6 +128,33 @@ void rqueueMessageListenerContainer() assertSame(messageBroker, factory.getMessageBroker()); } + @Test + void autoStartupLifecycleDelaysOnlyAutoStartupContainers() { + RqueueAutoStartupLifecycle lifecycle = new RqueueAutoStartupLifecycle(); + TestRqueueMessageListenerContainer autoStartupContainer = + new TestRqueueMessageListenerContainer(); + TestRqueueMessageListenerContainer disabledContainer = + new TestRqueueMessageListenerContainer(); + TestRqueueMessageListenerContainer alreadyRunningContainer = + new TestRqueueMessageListenerContainer(); + disabledContainer.setAutoStartup(false); + + lifecycle.postProcessBeforeInitialization(autoStartupContainer, "autoStartupContainer"); + lifecycle.postProcessBeforeInitialization(disabledContainer, "disabledContainer"); + lifecycle.postProcessBeforeInitialization(alreadyRunningContainer, "alreadyRunningContainer"); + alreadyRunningContainer.running = true; + + assertFalse(autoStartupContainer.isAutoStartup()); + assertFalse(disabledContainer.isAutoStartup()); + assertFalse(alreadyRunningContainer.isAutoStartup()); + + lifecycle.onApplicationEvent(null); + + assertEquals(1, autoStartupContainer.startCount); + assertEquals(0, disabledContainer.startCount); + assertEquals(0, alreadyRunningContainer.startCount); + } + @Test void rqueueMessageEnqueuerWiresBroker() throws IllegalAccessException { SimpleRqueueListenerContainerFactory factory = new SimpleRqueueListenerContainerFactory(); @@ -158,4 +188,25 @@ void rqueueMessageSenderUsesConfiguredMessageConverter() throws IllegalAccessExc MessageConverter converter = messageSender.getMessageConverter(); assertTrue(converter.hashCode() == messageConverter.hashCode()); } + + private class TestRqueueMessageListenerContainer extends RqueueMessageListenerContainer { + + private int startCount; + private boolean running; + + private TestRqueueMessageListenerContainer() { + super(rqueueMessageHandler, messageTemplate); + } + + @Override + public void start() { + startCount++; + running = true; + } + + @Override + public boolean isRunning() { + return running; + } + } } From b612ab2d56309446e2562819c7f4229451bf7d03 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Thu, 21 May 2026 11:21:06 +0000 Subject: [PATCH 02/11] Apply Palantir Java Format --- .../spring/boot/tests/unit/RqueueListenerAutoConfigTest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/unit/RqueueListenerAutoConfigTest.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/unit/RqueueListenerAutoConfigTest.java index 8234dc93..1c6e33a6 100644 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/unit/RqueueListenerAutoConfigTest.java +++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/unit/RqueueListenerAutoConfigTest.java @@ -133,8 +133,7 @@ void autoStartupLifecycleDelaysOnlyAutoStartupContainers() { RqueueAutoStartupLifecycle lifecycle = new RqueueAutoStartupLifecycle(); TestRqueueMessageListenerContainer autoStartupContainer = new TestRqueueMessageListenerContainer(); - TestRqueueMessageListenerContainer disabledContainer = - new TestRqueueMessageListenerContainer(); + TestRqueueMessageListenerContainer disabledContainer = new TestRqueueMessageListenerContainer(); TestRqueueMessageListenerContainer alreadyRunningContainer = new TestRqueueMessageListenerContainer(); disabledContainer.setAutoStartup(false); From 2bb9ac0effe910ab5b3363157d2c0621ff99a296 Mon Sep 17 00:00:00 2001 From: Sonu Kumar Date: Thu, 21 May 2026 17:55:53 +0530 Subject: [PATCH 03/11] Make Boot Rqueue auto-start delay explicit --- .../boot/RqueueAutoStartupLifecycle.java | 8 +++++-- .../spring/boot/RqueueListenerAutoConfig.java | 22 ++++++++++++++++++- .../unit/RqueueListenerAutoConfigTest.java | 7 +++++- 3 files changed, 33 insertions(+), 4 deletions(-) diff --git a/rqueue-spring-boot-starter/src/main/java/com/github/sonus21/rqueue/spring/boot/RqueueAutoStartupLifecycle.java b/rqueue-spring-boot-starter/src/main/java/com/github/sonus21/rqueue/spring/boot/RqueueAutoStartupLifecycle.java index 0f9013c1..449aad45 100644 --- a/rqueue-spring-boot-starter/src/main/java/com/github/sonus21/rqueue/spring/boot/RqueueAutoStartupLifecycle.java +++ b/rqueue-spring-boot-starter/src/main/java/com/github/sonus21/rqueue/spring/boot/RqueueAutoStartupLifecycle.java @@ -34,14 +34,18 @@ public class RqueueAutoStartupLifecycle private final Set delayedContainers = ConcurrentHashMap.newKeySet(); + public void delayAutoStartup(RqueueMessageListenerContainer container) { + container.setAutoStartup(false); + delayedContainers.add(container); + } + @Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { if (bean instanceof RqueueMessageListenerContainer) { RqueueMessageListenerContainer container = (RqueueMessageListenerContainer) bean; if (container.isAutoStartup()) { - container.setAutoStartup(false); - delayedContainers.add(container); + delayAutoStartup(container); } } return bean; diff --git a/rqueue-spring-boot-starter/src/main/java/com/github/sonus21/rqueue/spring/boot/RqueueListenerAutoConfig.java b/rqueue-spring-boot-starter/src/main/java/com/github/sonus21/rqueue/spring/boot/RqueueListenerAutoConfig.java index cfb166d4..863d9d5a 100644 --- a/rqueue-spring-boot-starter/src/main/java/com/github/sonus21/rqueue/spring/boot/RqueueListenerAutoConfig.java +++ b/rqueue-spring-boot-starter/src/main/java/com/github/sonus21/rqueue/spring/boot/RqueueListenerAutoConfig.java @@ -33,6 +33,7 @@ import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer; import com.github.sonus21.rqueue.utils.condition.ReactiveEnabled; import com.github.sonus21.rqueue.utils.condition.RqueueEnabled; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.config.BeanDefinition; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; @@ -57,6 +58,9 @@ @Import(RqueueRedisConfigImportSelector.class) public class RqueueListenerAutoConfig extends RqueueListenerBaseConfig { + @Autowired(required = false) + private RqueueAutoStartupLifecycle rqueueAutoStartupLifecycle; + @Bean @Role(BeanDefinition.ROLE_INFRASTRUCTURE) @ConditionalOnWebApplication @@ -83,7 +87,23 @@ public RqueueMessageListenerContainer rqueueMessageListenerContainer( if (simpleRqueueListenerContainerFactory.getMessageBroker() == null) { simpleRqueueListenerContainerFactory.setMessageBroker(messageBroker); } - return simpleRqueueListenerContainerFactory.createMessageListenerContainer(); + boolean delayAutoStartup = + rqueueAutoStartupLifecycle != null && simpleRqueueListenerContainerFactory.getAutoStartup(); + if (delayAutoStartup) { + simpleRqueueListenerContainerFactory.setAutoStartup(false); + } + RqueueMessageListenerContainer container; + try { + container = simpleRqueueListenerContainerFactory.createMessageListenerContainer(); + } finally { + if (delayAutoStartup) { + simpleRqueueListenerContainerFactory.setAutoStartup(true); + } + } + if (delayAutoStartup) { + rqueueAutoStartupLifecycle.delayAutoStartup(container); + } + return container; } @Bean diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/unit/RqueueListenerAutoConfigTest.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/unit/RqueueListenerAutoConfigTest.java index 1c6e33a6..2f572c19 100644 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/unit/RqueueListenerAutoConfigTest.java +++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/unit/RqueueListenerAutoConfigTest.java @@ -122,10 +122,15 @@ void rqueueMessageListenerContainer() "com.github.sonus21.rqueue.converter.DefaultMessageConverterProvider", true); FieldUtils.writeField(messageAutoConfig, "simpleRqueueListenerContainerFactory", factory, true); - messageAutoConfig.rqueueMessageListenerContainer(rqueueMessageHandler, messageBroker); + FieldUtils.writeField( + messageAutoConfig, "rqueueAutoStartupLifecycle", new RqueueAutoStartupLifecycle(), true); + RqueueMessageListenerContainer container = + messageAutoConfig.rqueueMessageListenerContainer(rqueueMessageHandler, messageBroker); assertEquals(factory.getRqueueMessageHandler(null).hashCode(), rqueueMessageHandler.hashCode()); // The broker must be propagated onto the factory so the container picks it up. assertSame(messageBroker, factory.getMessageBroker()); + assertFalse(container.isAutoStartup()); + assertTrue(factory.getAutoStartup()); } @Test From d3e3e512ed6c29e129bb7d446040c0caea7051b3 Mon Sep 17 00:00:00 2001 From: Sonu Kumar Date: Thu, 21 May 2026 19:34:52 +0530 Subject: [PATCH 04/11] Use NATS fetch wait for listener long polling --- .../rqueue/core/spi/MessageBroker.java | 13 +++++++ .../rqueue/listener/RqueueMessagePoller.java | 11 ++---- ...sageListenerContainerBrokerBranchTest.java | 37 +++++++++++++++++-- .../nats/js/JetStreamMessageBroker.java | 19 ++++++---- .../nats/JetStreamMessageBrokerUnitTest.java | 9 +++++ 5 files changed, 71 insertions(+), 18 deletions(-) diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/spi/MessageBroker.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/spi/MessageBroker.java index 1f368f5a..578d314e 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/spi/MessageBroker.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/spi/MessageBroker.java @@ -83,6 +83,19 @@ default Mono enqueueWithDelayReactive(QueueDetail q, RqueueMessage m, long return Mono.fromRunnable(() -> enqueueWithDelay(q, m, delayMs)); } + /** + * Resolve the wait duration that listener pollers should pass to {@link #pop}. The default uses + * the listener container's polling interval, preserving existing Redis behavior where that value + * also controls idle sleeps. Backends with native long-poll controls can override this so their + * fetch wait can be tuned independently. + * + * @param pollingInterval listener container polling interval + * @return wait duration for listener-driven pop calls + */ + default Duration getPollWait(Duration pollingInterval) { + return pollingInterval; + } + List pop(QueueDetail q, String consumerName, int batch, Duration wait); /** diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessagePoller.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessagePoller.java index dfdddf92..8927c4da 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessagePoller.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessagePoller.java @@ -19,6 +19,7 @@ import com.github.sonus21.rqueue.core.RqueueBeanProvider; import com.github.sonus21.rqueue.core.RqueueMessage; import com.github.sonus21.rqueue.core.middleware.Middleware; +import com.github.sonus21.rqueue.core.spi.MessageBroker; import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer.QueueStateMgr; import com.github.sonus21.rqueue.utils.Constants; import com.github.sonus21.rqueue.utils.QueueThreadPool; @@ -59,13 +60,9 @@ abstract class RqueueMessagePoller extends MessageContainerBase { } private List getMessages(QueueDetail queueDetail, int count) { - return rqueueBeanProvider - .getMessageBroker() - .pop( - queueDetail, - queueDetail.resolvedConsumerName(), - count, - Duration.ofMillis(pollingInterval)); + MessageBroker broker = rqueueBeanProvider.getMessageBroker(); + Duration wait = broker.getPollWait(Duration.ofMillis(pollingInterval)); + return broker.pop(queueDetail, queueDetail.resolvedConsumerName(), count, wait); } private void execute( diff --git a/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainerBrokerBranchTest.java b/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainerBrokerBranchTest.java index 3b4ebe34..f407ed9e 100644 --- a/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainerBrokerBranchTest.java +++ b/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainerBrokerBranchTest.java @@ -15,6 +15,7 @@ */ package com.github.sonus21.rqueue.listener; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -108,6 +109,7 @@ static class CountingBroker implements MessageBroker, AutoCloseable { final AtomicInteger popCalls = new AtomicInteger(); final AtomicBoolean closed = new AtomicBoolean(); volatile Duration lastWait; + volatile Duration pollWait; private final Capabilities caps; CountingBroker(Capabilities caps) { @@ -120,6 +122,11 @@ public void enqueue(QueueDetail q, RqueueMessage m) {} @Override public void enqueueWithDelay(QueueDetail q, RqueueMessage m, long delayMs) {} + @Override + public Duration getPollWait(Duration pollingInterval) { + return pollWait != null ? pollWait : MessageBroker.super.getPollWait(pollingInterval); + } + @Override public List pop(QueueDetail q, String consumerName, int batch, Duration wait) { popCalls.incrementAndGet(); @@ -248,9 +255,33 @@ void pollerForwardsPollingIntervalAsBrokerFetchWait() throws Exception { Duration wait = broker.lastWait; assertNotNull(wait, "broker should have received a wait duration"); assertFalse(wait.isZero(), "wait must not be Duration.ZERO; should match pollingInterval"); - assertTrue( - wait.toMillis() == pollingInterval, - "wait should equal the configured pollingInterval (got " + wait + ")"); + assertEquals(Duration.ofMillis(pollingInterval), wait); + } + + @Test + void pollerUsesBrokerResolvedFetchWait() throws Exception { + EndpointRegistry.delete(); + CountingBroker broker = + new CountingBroker(new Capabilities(true, false, false, true, true, true)); + broker.pollWait = Duration.ofSeconds(2); + RqueueMessageListenerContainer container = + new RqueueMessageListenerContainer(messageHandler, rqueueMessageTemplate); + container.rqueueBeanProvider = beanProvider; + container.setMessageBroker(broker); + container.setPollingInterval(137L); + container.afterPropertiesSet(); + container.start(); + try { + long deadline = System.currentTimeMillis() + 2000; + while (broker.popCalls.get() == 0 && System.currentTimeMillis() < deadline) { + Thread.sleep(20); + } + } finally { + container.stop(); + container.destroy(); + } + assertTrue(broker.popCalls.get() > 0, "poller should have issued at least one pop call"); + assertEquals(broker.pollWait, broker.lastWait); } private class TrackingContainer extends RqueueMessageListenerContainer { diff --git a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/js/JetStreamMessageBroker.java b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/js/JetStreamMessageBroker.java index 688841f1..4e661ff2 100644 --- a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/js/JetStreamMessageBroker.java +++ b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/js/JetStreamMessageBroker.java @@ -100,10 +100,7 @@ public class JetStreamMessageBroker implements MessageBroker, AutoCloseable { /** * Lower bound for fetch wait when the caller passes a non-positive duration. JetStream rejects - * zero on a pull fetch, so any zero/negative wait is rounded up to this minimum. Callers that - * want long-poll semantics should pass the desired wait explicitly (e.g. the listener - * container's {@code pollingInterval}); this constant only guards against accidental zero waits - * from non-listener callers. + * zero on a pull fetch, so any zero/negative wait is rounded up to this minimum. */ private static final Duration MIN_FETCH_WAIT = Duration.ofMillis(50); @@ -463,6 +460,12 @@ public Mono enqueueWithDelayReactive(QueueDetail q, RqueueMessage m, long .then(); } + @Override + public Duration getPollWait(Duration pollingInterval) { + Duration fetchWait = config == null ? null : config.getDefaultFetchWait(); + return fetchWait != null ? fetchWait : MessageBroker.super.getPollWait(pollingInterval); + } + @Override public List pop(QueueDetail q, String consumerName, int batch, Duration wait) { return popInternal( @@ -533,10 +536,10 @@ private List popInternal( Duration wait, Duration ackWait, long maxDeliver) { - // Honour the caller-supplied wait — this is the listener container's pollingInterval for - // RqueueMessagePoller, and lets JetStream long-poll instead of the broker firing a steady - // stream of $JS.API.CONSUMER.MSG.NEXT requests. Only fall back when the caller didn't - // express a preference; zero/negative waits are rounded up to the JetStream minimum. + // Honour the caller-supplied wait. Listener pollers obtain this from getPollWait(), so NATS + // can use rqueue.nats.consumer.fetch-wait for long polling while keeping the listener + // pollingInterval as its idle sleep. Only fall back when the caller didn't express a + // preference; zero/negative waits are rounded up to the JetStream minimum. Duration fetchWait; if (wait == null) { fetchWait = config.getDefaultFetchWait(); diff --git a/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerUnitTest.java b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerUnitTest.java index 2aa9a912..62b1a80c 100644 --- a/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerUnitTest.java +++ b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerUnitTest.java @@ -39,6 +39,7 @@ import io.nats.client.api.PublishAck; import io.nats.client.impl.Headers; import java.io.IOException; +import java.time.Duration; import java.util.concurrent.CompletableFuture; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; @@ -92,6 +93,14 @@ void enqueue_publishesToPrefixedSubject() throws Exception { verify(f.js, times(1)).publish(eq("rqueue.js.orders"), any(Headers.class), any(byte[].class)); } + @Test + void getPollWait_usesConfiguredFetchWait() { + RqueueNatsConfig cfg = RqueueNatsConfig.defaults().setDefaultFetchWait(Duration.ofSeconds(9)); + Fixture f = newFixture(cfg); + + assertEquals(Duration.ofSeconds(9), f.broker.getPollWait(Duration.ofMillis(137))); + } + @Test void enqueueWithPriority_appendsPrioritySuffixToSubject() throws Exception { Fixture f = newFixture(RqueueNatsConfig.defaults()); From 3839668e2ce2eebba14262a234db03c6a853a18c Mon Sep 17 00:00:00 2001 From: Sonu Kumar Date: Thu, 21 May 2026 21:51:05 +0530 Subject: [PATCH 05/11] Add MsgPack listener E2E coverage --- .../MessagePackageListenerTest.java | 243 ++++++++++++++++++ .../MsgPackMessageConverterProvider.java | 166 ++++++++++++ 2 files changed, 409 insertions(+) create mode 100644 rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/MessagePackageListenerTest.java create mode 100644 rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/MsgPackMessageConverterProvider.java diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/MessagePackageListenerTest.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/MessagePackageListenerTest.java new file mode 100644 index 00000000..5b5adee2 --- /dev/null +++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/MessagePackageListenerTest.java @@ -0,0 +1,243 @@ +/* + * Copyright (c) 2026 Sonu Kumar + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + */ +package com.github.sonus21.rqueue.spring.boot.integration; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.github.sonus21.rqueue.annotation.RqueueListener; +import com.github.sonus21.rqueue.core.RqueueMessage; +import com.github.sonus21.rqueue.core.RqueueMessageEnqueuer; +import com.github.sonus21.rqueue.listener.RqueueMessageHeaders; +import com.github.sonus21.rqueue.test.application.BaseApplication; +import java.util.Objects; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.springframework.boot.WebApplicationType; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.builder.SpringApplicationBuilder; +import org.springframework.boot.data.redis.autoconfigure.DataRedisAutoConfiguration; +import org.springframework.boot.data.redis.autoconfigure.DataRedisReactiveAutoConfiguration; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.Import; +import org.springframework.messaging.Message; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.stereotype.Component; + +@Tag("springBootIntegration") +@Tag("integration") +@Tag("springBoot") +class MessagePackageListenerTest { + + private static final String MESSAGE_PACKAGE_QUEUE = "message-package-listener-package"; + private static final String MESSAGE_QUEUE = "message-package-listener-message"; + private static final String NATS_STREAM_PREFIX = "rqueue-js-messagePackageListener-"; + private static final String NATS_SUBJECT_PREFIX = "rqueue.js.messagePackageListener."; + private static final int REDIS_PORT = 8032; + + @ParameterizedTest(name = "{0}") + @EnumSource(BackendUnderTest.class) + void listenerReceivesMsgPackPayloadInsideSpringMessage(BackendUnderTest backend) + throws Exception { + try (ConfigurableApplicationContext context = startContext(backend)) { + TestListener listener = context.getBean(TestListener.class); + ListenerPayload payload = new ListenerPayload(backend.name(), "msgpack-message-package"); + + String messageId = + context.getBean(RqueueMessageEnqueuer.class).enqueue(MESSAGE_PACKAGE_QUEUE, payload); + + assertThat(listener.messagePackageLatch.await(20, TimeUnit.SECONDS)).isTrue(); + assertThat(listener.messagePackage.get()).isNotNull(); + assertThat(listener.messagePackage.get().getPayload()).isEqualTo(payload); + assertRqueueMessage( + listener.messagePackageRqueueMessage.get(), messageId, MESSAGE_PACKAGE_QUEUE, payload); + } + } + + @ParameterizedTest(name = "{0}") + @EnumSource(BackendUnderTest.class) + void listenerReceivesMsgPackPayload(BackendUnderTest backend) throws Exception { + try (ConfigurableApplicationContext context = startContext(backend)) { + TestListener listener = context.getBean(TestListener.class); + ListenerPayload payload = new ListenerPayload(backend.name(), "msgpack-message"); + + String messageId = + context.getBean(RqueueMessageEnqueuer.class).enqueue(MESSAGE_QUEUE, payload); + + assertThat(listener.messageLatch.await(20, TimeUnit.SECONDS)).isTrue(); + assertThat(listener.message.get()).isEqualTo(payload); + assertRqueueMessage(listener.messageRqueueMessage.get(), messageId, MESSAGE_QUEUE, payload); + } + } + + private ConfigurableApplicationContext startContext(BackendUnderTest backend) { + if (backend.isNats()) { + AbstractNatsBootIT.startNats(); + AbstractNatsBootIT.deleteStreamsWithPrefix(NATS_STREAM_PREFIX); + } + return new SpringApplicationBuilder(backend.applicationClass()) + .web(WebApplicationType.NONE) + .properties(backend.properties()) + .run(); + } + + private static void assertRqueueMessage( + RqueueMessage rqueueMessage, + String messageId, + String queueName, + ListenerPayload expectedPayload) { + assertThat(rqueueMessage).isNotNull(); + assertThat(rqueueMessage.getId()).isEqualTo(messageId); + assertThat(rqueueMessage.getQueueName()).isEqualTo(queueName); + assertThat(MsgPackMessageConverterProvider.isMsgPack(rqueueMessage.getMessage())) + .isTrue(); + assertThat(MsgPackMessageConverterProvider.decode(rqueueMessage.getMessage())) + .isEqualTo(expectedPayload); + } + + enum BackendUnderTest { + REDIS(RedisTestApp.class, new String[] { + "rqueue.backend=redis", + "spring.data.redis.host=127.0.0.1", + "spring.data.redis.port=" + REDIS_PORT, + "mysql.db.name=MessagePackageListenerTestRedis", + "use.system.redis=false" + }), + NATS(NatsTestApp.class, new String[] {}); + + private final Class applicationClass; + private final String[] properties; + + BackendUnderTest(Class applicationClass, String[] properties) { + this.applicationClass = applicationClass; + this.properties = properties; + } + + Class applicationClass() { + return applicationClass; + } + + String[] properties() { + String[] common = new String[] { + "rqueue.message.converter.provider.class=" + + MsgPackMessageConverterProvider.class.getName(), + }; + String[] backendProperties = isNats() + ? new String[] { + "rqueue.backend=nats", + "rqueue.nats.naming.stream-prefix=" + NATS_STREAM_PREFIX, + "rqueue.nats.naming.subject-prefix=" + NATS_SUBJECT_PREFIX, + "rqueue.nats.connection.url=" + AbstractNatsBootIT.activeNatsUrl() + } + : properties; + String[] merged = new String[common.length + backendProperties.length]; + System.arraycopy(common, 0, merged, 0, common.length); + System.arraycopy(backendProperties, 0, merged, common.length, backendProperties.length); + return merged; + } + + boolean isNats() { + return this == NATS; + } + } + + @SpringBootApplication + @Import(TestListener.class) + static class RedisTestApp extends BaseApplication {} + + @SpringBootApplication( + exclude = {DataRedisAutoConfiguration.class, DataRedisReactiveAutoConfiguration.class}) + @Import(TestListener.class) + static class NatsTestApp {} + + @Component + static class TestListener { + + final CountDownLatch messagePackageLatch = new CountDownLatch(1); + final CountDownLatch messageLatch = new CountDownLatch(1); + final AtomicReference> messagePackage = new AtomicReference<>(); + final AtomicReference messagePackageRqueueMessage = new AtomicReference<>(); + final AtomicReference message = new AtomicReference<>(); + final AtomicReference messageRqueueMessage = new AtomicReference<>(); + + @RqueueListener(value = MESSAGE_PACKAGE_QUEUE) + void onMessagePackage( + Message message, + @Header(RqueueMessageHeaders.MESSAGE) RqueueMessage rqueueMessage) { + messagePackage.set(message); + messagePackageRqueueMessage.set(rqueueMessage); + messagePackageLatch.countDown(); + } + + @RqueueListener(value = MESSAGE_QUEUE) + void onMessage( + ListenerPayload message, + @Header(RqueueMessageHeaders.MESSAGE) RqueueMessage rqueueMessage) { + this.message.set(message); + messageRqueueMessage.set(rqueueMessage); + messageLatch.countDown(); + } + } + + static class ListenerPayload { + + private String backend; + private String body; + + ListenerPayload() {} + + ListenerPayload(String backend, String body) { + this.backend = backend; + this.body = body; + } + + public String getBackend() { + return backend; + } + + public void setBackend(String backend) { + this.backend = backend; + } + + public String getBody() { + return body; + } + + public void setBody(String body) { + this.body = body; + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + if (!(other instanceof ListenerPayload)) { + return false; + } + ListenerPayload that = (ListenerPayload) other; + return Objects.equals(backend, that.backend) && Objects.equals(body, that.body); + } + + @Override + public int hashCode() { + return Objects.hash(backend, body); + } + } +} diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/MsgPackMessageConverterProvider.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/MsgPackMessageConverterProvider.java new file mode 100644 index 00000000..0ed73e15 --- /dev/null +++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/MsgPackMessageConverterProvider.java @@ -0,0 +1,166 @@ +/* + * Copyright (c) 2026 Sonu Kumar + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + */ +package com.github.sonus21.rqueue.spring.boot.integration; + +import com.github.sonus21.rqueue.converter.MessageConverterProvider; +import java.io.ByteArrayOutputStream; +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.converter.MessageConversionException; +import org.springframework.messaging.converter.MessageConverter; +import org.springframework.messaging.support.GenericMessage; + +public class MsgPackMessageConverterProvider implements MessageConverterProvider { + + private static final String PREFIX = "msgpack:"; + + @Override + public MessageConverter getConverter() { + return new MsgPackMessageConverter(); + } + + static boolean isMsgPack(String payload) { + return payload != null && payload.startsWith(PREFIX); + } + + static MessagePackageListenerTest.ListenerPayload decode(String payload) { + if (!isMsgPack(payload)) { + throw new MessageConversionException("Payload is not MsgPack encoded"); + } + return MsgPackCodec.decode(Base64.getDecoder().decode(payload.substring(PREFIX.length()))); + } + + private static class MsgPackMessageConverter implements MessageConverter { + + @Override + public Object fromMessage(Message message, Class targetClass) { + Object payload = message.getPayload(); + if (payload instanceof MessagePackageListenerTest.ListenerPayload) { + return payload; + } + if (!(payload instanceof String)) { + return null; + } + return decode((String) payload); + } + + @Override + public Message toMessage(Object payload, MessageHeaders headers) { + if (payload instanceof MessagePackageListenerTest.ListenerPayload) { + byte[] msgPackBytes = + MsgPackCodec.encode((MessagePackageListenerTest.ListenerPayload) payload); + return new GenericMessage<>(PREFIX + Base64.getEncoder().encodeToString(msgPackBytes)); + } + if (payload instanceof String) { + return new GenericMessage<>(payload); + } + return null; + } + } + + private static final class MsgPackCodec { + + private MsgPackCodec() {} + + static byte[] encode(MessagePackageListenerTest.ListenerPayload payload) { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + out.write(0x82); + writeString(out, "backend"); + writeString(out, payload.getBackend()); + writeString(out, "body"); + writeString(out, payload.getBody()); + return out.toByteArray(); + } + + static MessagePackageListenerTest.ListenerPayload decode(byte[] bytes) { + Cursor cursor = new Cursor(bytes); + int mapHeader = cursor.readUnsignedByte(); + int entries; + if ((mapHeader & 0xf0) == 0x80) { + entries = mapHeader & 0x0f; + } else { + throw new MessageConversionException("Expected MsgPack fixmap"); + } + String backend = null; + String body = null; + for (int i = 0; i < entries; i++) { + String key = readString(cursor); + String value = readString(cursor); + if ("backend".equals(key)) { + backend = value; + } else if ("body".equals(key)) { + body = value; + } + } + return new MessagePackageListenerTest.ListenerPayload(backend, body); + } + + private static void writeString(ByteArrayOutputStream out, String value) { + byte[] bytes = value.getBytes(StandardCharsets.UTF_8); + if (bytes.length <= 31) { + out.write(0xa0 | bytes.length); + } else if (bytes.length <= 255) { + out.write(0xd9); + out.write(bytes.length); + } else { + throw new MessageConversionException("Test MsgPack codec supports strings up to 255 bytes"); + } + out.writeBytes(bytes); + } + + private static String readString(Cursor cursor) { + int header = cursor.readUnsignedByte(); + int length; + if ((header & 0xe0) == 0xa0) { + length = header & 0x1f; + } else if (header == 0xd9) { + length = cursor.readUnsignedByte(); + } else { + throw new MessageConversionException("Expected MsgPack string"); + } + return new String(cursor.readBytes(length), StandardCharsets.UTF_8); + } + } + + private static final class Cursor { + + private final byte[] bytes; + private int index; + + Cursor(byte[] bytes) { + this.bytes = bytes; + } + + int readUnsignedByte() { + if (index >= bytes.length) { + throw new MessageConversionException("Unexpected end of MsgPack payload"); + } + return bytes[index++] & 0xff; + } + + byte[] readBytes(int length) { + if (index + length > bytes.length) { + throw new MessageConversionException("Unexpected end of MsgPack payload"); + } + byte[] value = new byte[length]; + System.arraycopy(bytes, index, value, 0, length); + index += length; + return value; + } + } +} From fb227d5f8728d63e4ba09c171b1f0b3faa75b2e8 Mon Sep 17 00:00:00 2001 From: Sonu Kumar Date: Thu, 21 May 2026 23:11:45 +0530 Subject: [PATCH 06/11] Stabilize NATS priority queue E2E test --- .../rqueue/spring/boot/integration/NatsPriorityQueuesE2EIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsPriorityQueuesE2EIT.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsPriorityQueuesE2EIT.java index a366a7f1..f4a62f5f 100644 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsPriorityQueuesE2EIT.java +++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsPriorityQueuesE2EIT.java @@ -91,7 +91,7 @@ static class PriorityListener { final CountDownLatch latch = new CountDownLatch(10); final List received = Collections.synchronizedList(new ArrayList<>()); - @RqueueListener(value = "pq", priority = "high=10,low=1") + @RqueueListener(value = "pq", priority = "high=10,low=1", batchSize = "5", concurrency = "5") void onMessage(String payload) { received.add(payload); latch.countDown(); From dd3b6e45908f0f1d48f00f151a72c222d5bb82ea Mon Sep 17 00:00:00 2001 From: Sonu Kumar Date: Thu, 21 May 2026 23:16:34 +0530 Subject: [PATCH 07/11] Use MessagePack dependency in listener E2E test --- rqueue-spring-boot-starter/build.gradle | 1 + .../MsgPackMessageConverterProvider.java | 109 +++++------------- 2 files changed, 30 insertions(+), 80 deletions(-) diff --git a/rqueue-spring-boot-starter/build.gradle b/rqueue-spring-boot-starter/build.gradle index ac5dc746..de02433c 100644 --- a/rqueue-spring-boot-starter/build.gradle +++ b/rqueue-spring-boot-starter/build.gradle @@ -68,6 +68,7 @@ dependencies { testImplementation "org.springframework.boot:spring-boot-starter-data-redis-reactive:${springBootVersion}" testImplementation "org.springframework.boot:spring-boot-starter-data-jpa:${springBootVersion}" testImplementation "org.springframework.boot:spring-boot-devtools:${springBootVersion}" + testImplementation "org.msgpack:msgpack-core:0.9.9" testImplementation "org.testcontainers:testcontainers:${testcontainersVersion}" testImplementation "org.testcontainers:junit-jupiter:${testcontainersVersion}" } diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/MsgPackMessageConverterProvider.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/MsgPackMessageConverterProvider.java index 0ed73e15..688971af 100644 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/MsgPackMessageConverterProvider.java +++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/MsgPackMessageConverterProvider.java @@ -16,9 +16,11 @@ package com.github.sonus21.rqueue.spring.boot.integration; import com.github.sonus21.rqueue.converter.MessageConverterProvider; -import java.io.ByteArrayOutputStream; -import java.nio.charset.StandardCharsets; +import java.io.IOException; import java.util.Base64; +import org.msgpack.core.MessageBufferPacker; +import org.msgpack.core.MessagePack; +import org.msgpack.core.MessageUnpacker; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.converter.MessageConversionException; @@ -78,89 +80,36 @@ private static final class MsgPackCodec { private MsgPackCodec() {} static byte[] encode(MessagePackageListenerTest.ListenerPayload payload) { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - out.write(0x82); - writeString(out, "backend"); - writeString(out, payload.getBackend()); - writeString(out, "body"); - writeString(out, payload.getBody()); - return out.toByteArray(); + try (MessageBufferPacker packer = MessagePack.newDefaultBufferPacker()) { + packer.packMapHeader(2); + packer.packString("backend"); + packer.packString(payload.getBackend()); + packer.packString("body"); + packer.packString(payload.getBody()); + return packer.toByteArray(); + } catch (IOException e) { + throw new MessageConversionException("MsgPack encoding failed", e); + } } static MessagePackageListenerTest.ListenerPayload decode(byte[] bytes) { - Cursor cursor = new Cursor(bytes); - int mapHeader = cursor.readUnsignedByte(); - int entries; - if ((mapHeader & 0xf0) == 0x80) { - entries = mapHeader & 0x0f; - } else { - throw new MessageConversionException("Expected MsgPack fixmap"); - } - String backend = null; - String body = null; - for (int i = 0; i < entries; i++) { - String key = readString(cursor); - String value = readString(cursor); - if ("backend".equals(key)) { - backend = value; - } else if ("body".equals(key)) { - body = value; + try (MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(bytes)) { + int entries = unpacker.unpackMapHeader(); + String backend = null; + String body = null; + for (int i = 0; i < entries; i++) { + String key = unpacker.unpackString(); + String value = unpacker.unpackString(); + if ("backend".equals(key)) { + backend = value; + } else if ("body".equals(key)) { + body = value; + } } + return new MessagePackageListenerTest.ListenerPayload(backend, body); + } catch (IOException e) { + throw new MessageConversionException("MsgPack decoding failed", e); } - return new MessagePackageListenerTest.ListenerPayload(backend, body); - } - - private static void writeString(ByteArrayOutputStream out, String value) { - byte[] bytes = value.getBytes(StandardCharsets.UTF_8); - if (bytes.length <= 31) { - out.write(0xa0 | bytes.length); - } else if (bytes.length <= 255) { - out.write(0xd9); - out.write(bytes.length); - } else { - throw new MessageConversionException("Test MsgPack codec supports strings up to 255 bytes"); - } - out.writeBytes(bytes); - } - - private static String readString(Cursor cursor) { - int header = cursor.readUnsignedByte(); - int length; - if ((header & 0xe0) == 0xa0) { - length = header & 0x1f; - } else if (header == 0xd9) { - length = cursor.readUnsignedByte(); - } else { - throw new MessageConversionException("Expected MsgPack string"); - } - return new String(cursor.readBytes(length), StandardCharsets.UTF_8); - } - } - - private static final class Cursor { - - private final byte[] bytes; - private int index; - - Cursor(byte[] bytes) { - this.bytes = bytes; - } - - int readUnsignedByte() { - if (index >= bytes.length) { - throw new MessageConversionException("Unexpected end of MsgPack payload"); - } - return bytes[index++] & 0xff; - } - - byte[] readBytes(int length) { - if (index + length > bytes.length) { - throw new MessageConversionException("Unexpected end of MsgPack payload"); - } - byte[] value = new byte[length]; - System.arraycopy(bytes, index, value, 0, length); - index += length; - return value; } } } From b5aeb980f377275dbafe3486240120524b037458 Mon Sep 17 00:00:00 2001 From: Sonu Kumar Date: Thu, 21 May 2026 23:19:41 +0530 Subject: [PATCH 08/11] Use MessagePack ObjectMapper in listener E2E test --- rqueue-spring-boot-starter/build.gradle | 2 +- .../MsgPackMessageConverterProvider.java | 35 ++++++------------- 2 files changed, 11 insertions(+), 26 deletions(-) diff --git a/rqueue-spring-boot-starter/build.gradle b/rqueue-spring-boot-starter/build.gradle index de02433c..f00e045f 100644 --- a/rqueue-spring-boot-starter/build.gradle +++ b/rqueue-spring-boot-starter/build.gradle @@ -68,7 +68,7 @@ dependencies { testImplementation "org.springframework.boot:spring-boot-starter-data-redis-reactive:${springBootVersion}" testImplementation "org.springframework.boot:spring-boot-starter-data-jpa:${springBootVersion}" testImplementation "org.springframework.boot:spring-boot-devtools:${springBootVersion}" - testImplementation "org.msgpack:msgpack-core:0.9.9" + testImplementation "org.msgpack:jackson-dataformat-msgpack:0.9.12" testImplementation "org.testcontainers:testcontainers:${testcontainersVersion}" testImplementation "org.testcontainers:junit-jupiter:${testcontainersVersion}" } diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/MsgPackMessageConverterProvider.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/MsgPackMessageConverterProvider.java index 688971af..94f380bf 100644 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/MsgPackMessageConverterProvider.java +++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/MsgPackMessageConverterProvider.java @@ -15,12 +15,12 @@ */ package com.github.sonus21.rqueue.spring.boot.integration; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import com.github.sonus21.rqueue.converter.MessageConverterProvider; import java.io.IOException; import java.util.Base64; -import org.msgpack.core.MessageBufferPacker; -import org.msgpack.core.MessagePack; -import org.msgpack.core.MessageUnpacker; +import org.msgpack.jackson.dataformat.MessagePackFactory; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.converter.MessageConversionException; @@ -77,36 +77,21 @@ public Message toMessage(Object payload, MessageHeaders headers) { private static final class MsgPackCodec { + private static final ObjectMapper MAPPER = new ObjectMapper(new MessagePackFactory()); + private MsgPackCodec() {} static byte[] encode(MessagePackageListenerTest.ListenerPayload payload) { - try (MessageBufferPacker packer = MessagePack.newDefaultBufferPacker()) { - packer.packMapHeader(2); - packer.packString("backend"); - packer.packString(payload.getBackend()); - packer.packString("body"); - packer.packString(payload.getBody()); - return packer.toByteArray(); - } catch (IOException e) { + try { + return MAPPER.writeValueAsBytes(payload); + } catch (JsonProcessingException e) { throw new MessageConversionException("MsgPack encoding failed", e); } } static MessagePackageListenerTest.ListenerPayload decode(byte[] bytes) { - try (MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(bytes)) { - int entries = unpacker.unpackMapHeader(); - String backend = null; - String body = null; - for (int i = 0; i < entries; i++) { - String key = unpacker.unpackString(); - String value = unpacker.unpackString(); - if ("backend".equals(key)) { - backend = value; - } else if ("body".equals(key)) { - body = value; - } - } - return new MessagePackageListenerTest.ListenerPayload(backend, body); + try { + return MAPPER.readValue(bytes, MessagePackageListenerTest.ListenerPayload.class); } catch (IOException e) { throw new MessageConversionException("MsgPack decoding failed", e); } From 2c7f5f723ba255dbb739413cc475e6e401ba5aab Mon Sep 17 00:00:00 2001 From: Sonu Kumar Date: Thu, 21 May 2026 23:21:16 +0530 Subject: [PATCH 09/11] Remove unsupported MessagePack listener E2E test --- rqueue-spring-boot-starter/build.gradle | 1 - .../MessagePackageListenerTest.java | 243 ------------------ .../MsgPackMessageConverterProvider.java | 100 ------- 3 files changed, 344 deletions(-) delete mode 100644 rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/MessagePackageListenerTest.java delete mode 100644 rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/MsgPackMessageConverterProvider.java diff --git a/rqueue-spring-boot-starter/build.gradle b/rqueue-spring-boot-starter/build.gradle index f00e045f..ac5dc746 100644 --- a/rqueue-spring-boot-starter/build.gradle +++ b/rqueue-spring-boot-starter/build.gradle @@ -68,7 +68,6 @@ dependencies { testImplementation "org.springframework.boot:spring-boot-starter-data-redis-reactive:${springBootVersion}" testImplementation "org.springframework.boot:spring-boot-starter-data-jpa:${springBootVersion}" testImplementation "org.springframework.boot:spring-boot-devtools:${springBootVersion}" - testImplementation "org.msgpack:jackson-dataformat-msgpack:0.9.12" testImplementation "org.testcontainers:testcontainers:${testcontainersVersion}" testImplementation "org.testcontainers:junit-jupiter:${testcontainersVersion}" } diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/MessagePackageListenerTest.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/MessagePackageListenerTest.java deleted file mode 100644 index 5b5adee2..00000000 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/MessagePackageListenerTest.java +++ /dev/null @@ -1,243 +0,0 @@ -/* - * Copyright (c) 2026 Sonu Kumar - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * You may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and limitations under the License. - * - */ -package com.github.sonus21.rqueue.spring.boot.integration; - -import static org.assertj.core.api.Assertions.assertThat; - -import com.github.sonus21.rqueue.annotation.RqueueListener; -import com.github.sonus21.rqueue.core.RqueueMessage; -import com.github.sonus21.rqueue.core.RqueueMessageEnqueuer; -import com.github.sonus21.rqueue.listener.RqueueMessageHeaders; -import com.github.sonus21.rqueue.test.application.BaseApplication; -import java.util.Objects; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import org.junit.jupiter.api.Tag; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.EnumSource; -import org.springframework.boot.WebApplicationType; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.builder.SpringApplicationBuilder; -import org.springframework.boot.data.redis.autoconfigure.DataRedisAutoConfiguration; -import org.springframework.boot.data.redis.autoconfigure.DataRedisReactiveAutoConfiguration; -import org.springframework.context.ConfigurableApplicationContext; -import org.springframework.context.annotation.Import; -import org.springframework.messaging.Message; -import org.springframework.messaging.handler.annotation.Header; -import org.springframework.stereotype.Component; - -@Tag("springBootIntegration") -@Tag("integration") -@Tag("springBoot") -class MessagePackageListenerTest { - - private static final String MESSAGE_PACKAGE_QUEUE = "message-package-listener-package"; - private static final String MESSAGE_QUEUE = "message-package-listener-message"; - private static final String NATS_STREAM_PREFIX = "rqueue-js-messagePackageListener-"; - private static final String NATS_SUBJECT_PREFIX = "rqueue.js.messagePackageListener."; - private static final int REDIS_PORT = 8032; - - @ParameterizedTest(name = "{0}") - @EnumSource(BackendUnderTest.class) - void listenerReceivesMsgPackPayloadInsideSpringMessage(BackendUnderTest backend) - throws Exception { - try (ConfigurableApplicationContext context = startContext(backend)) { - TestListener listener = context.getBean(TestListener.class); - ListenerPayload payload = new ListenerPayload(backend.name(), "msgpack-message-package"); - - String messageId = - context.getBean(RqueueMessageEnqueuer.class).enqueue(MESSAGE_PACKAGE_QUEUE, payload); - - assertThat(listener.messagePackageLatch.await(20, TimeUnit.SECONDS)).isTrue(); - assertThat(listener.messagePackage.get()).isNotNull(); - assertThat(listener.messagePackage.get().getPayload()).isEqualTo(payload); - assertRqueueMessage( - listener.messagePackageRqueueMessage.get(), messageId, MESSAGE_PACKAGE_QUEUE, payload); - } - } - - @ParameterizedTest(name = "{0}") - @EnumSource(BackendUnderTest.class) - void listenerReceivesMsgPackPayload(BackendUnderTest backend) throws Exception { - try (ConfigurableApplicationContext context = startContext(backend)) { - TestListener listener = context.getBean(TestListener.class); - ListenerPayload payload = new ListenerPayload(backend.name(), "msgpack-message"); - - String messageId = - context.getBean(RqueueMessageEnqueuer.class).enqueue(MESSAGE_QUEUE, payload); - - assertThat(listener.messageLatch.await(20, TimeUnit.SECONDS)).isTrue(); - assertThat(listener.message.get()).isEqualTo(payload); - assertRqueueMessage(listener.messageRqueueMessage.get(), messageId, MESSAGE_QUEUE, payload); - } - } - - private ConfigurableApplicationContext startContext(BackendUnderTest backend) { - if (backend.isNats()) { - AbstractNatsBootIT.startNats(); - AbstractNatsBootIT.deleteStreamsWithPrefix(NATS_STREAM_PREFIX); - } - return new SpringApplicationBuilder(backend.applicationClass()) - .web(WebApplicationType.NONE) - .properties(backend.properties()) - .run(); - } - - private static void assertRqueueMessage( - RqueueMessage rqueueMessage, - String messageId, - String queueName, - ListenerPayload expectedPayload) { - assertThat(rqueueMessage).isNotNull(); - assertThat(rqueueMessage.getId()).isEqualTo(messageId); - assertThat(rqueueMessage.getQueueName()).isEqualTo(queueName); - assertThat(MsgPackMessageConverterProvider.isMsgPack(rqueueMessage.getMessage())) - .isTrue(); - assertThat(MsgPackMessageConverterProvider.decode(rqueueMessage.getMessage())) - .isEqualTo(expectedPayload); - } - - enum BackendUnderTest { - REDIS(RedisTestApp.class, new String[] { - "rqueue.backend=redis", - "spring.data.redis.host=127.0.0.1", - "spring.data.redis.port=" + REDIS_PORT, - "mysql.db.name=MessagePackageListenerTestRedis", - "use.system.redis=false" - }), - NATS(NatsTestApp.class, new String[] {}); - - private final Class applicationClass; - private final String[] properties; - - BackendUnderTest(Class applicationClass, String[] properties) { - this.applicationClass = applicationClass; - this.properties = properties; - } - - Class applicationClass() { - return applicationClass; - } - - String[] properties() { - String[] common = new String[] { - "rqueue.message.converter.provider.class=" - + MsgPackMessageConverterProvider.class.getName(), - }; - String[] backendProperties = isNats() - ? new String[] { - "rqueue.backend=nats", - "rqueue.nats.naming.stream-prefix=" + NATS_STREAM_PREFIX, - "rqueue.nats.naming.subject-prefix=" + NATS_SUBJECT_PREFIX, - "rqueue.nats.connection.url=" + AbstractNatsBootIT.activeNatsUrl() - } - : properties; - String[] merged = new String[common.length + backendProperties.length]; - System.arraycopy(common, 0, merged, 0, common.length); - System.arraycopy(backendProperties, 0, merged, common.length, backendProperties.length); - return merged; - } - - boolean isNats() { - return this == NATS; - } - } - - @SpringBootApplication - @Import(TestListener.class) - static class RedisTestApp extends BaseApplication {} - - @SpringBootApplication( - exclude = {DataRedisAutoConfiguration.class, DataRedisReactiveAutoConfiguration.class}) - @Import(TestListener.class) - static class NatsTestApp {} - - @Component - static class TestListener { - - final CountDownLatch messagePackageLatch = new CountDownLatch(1); - final CountDownLatch messageLatch = new CountDownLatch(1); - final AtomicReference> messagePackage = new AtomicReference<>(); - final AtomicReference messagePackageRqueueMessage = new AtomicReference<>(); - final AtomicReference message = new AtomicReference<>(); - final AtomicReference messageRqueueMessage = new AtomicReference<>(); - - @RqueueListener(value = MESSAGE_PACKAGE_QUEUE) - void onMessagePackage( - Message message, - @Header(RqueueMessageHeaders.MESSAGE) RqueueMessage rqueueMessage) { - messagePackage.set(message); - messagePackageRqueueMessage.set(rqueueMessage); - messagePackageLatch.countDown(); - } - - @RqueueListener(value = MESSAGE_QUEUE) - void onMessage( - ListenerPayload message, - @Header(RqueueMessageHeaders.MESSAGE) RqueueMessage rqueueMessage) { - this.message.set(message); - messageRqueueMessage.set(rqueueMessage); - messageLatch.countDown(); - } - } - - static class ListenerPayload { - - private String backend; - private String body; - - ListenerPayload() {} - - ListenerPayload(String backend, String body) { - this.backend = backend; - this.body = body; - } - - public String getBackend() { - return backend; - } - - public void setBackend(String backend) { - this.backend = backend; - } - - public String getBody() { - return body; - } - - public void setBody(String body) { - this.body = body; - } - - @Override - public boolean equals(Object other) { - if (this == other) { - return true; - } - if (!(other instanceof ListenerPayload)) { - return false; - } - ListenerPayload that = (ListenerPayload) other; - return Objects.equals(backend, that.backend) && Objects.equals(body, that.body); - } - - @Override - public int hashCode() { - return Objects.hash(backend, body); - } - } -} diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/MsgPackMessageConverterProvider.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/MsgPackMessageConverterProvider.java deleted file mode 100644 index 94f380bf..00000000 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/MsgPackMessageConverterProvider.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Copyright (c) 2026 Sonu Kumar - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * You may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and limitations under the License. - * - */ -package com.github.sonus21.rqueue.spring.boot.integration; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.github.sonus21.rqueue.converter.MessageConverterProvider; -import java.io.IOException; -import java.util.Base64; -import org.msgpack.jackson.dataformat.MessagePackFactory; -import org.springframework.messaging.Message; -import org.springframework.messaging.MessageHeaders; -import org.springframework.messaging.converter.MessageConversionException; -import org.springframework.messaging.converter.MessageConverter; -import org.springframework.messaging.support.GenericMessage; - -public class MsgPackMessageConverterProvider implements MessageConverterProvider { - - private static final String PREFIX = "msgpack:"; - - @Override - public MessageConverter getConverter() { - return new MsgPackMessageConverter(); - } - - static boolean isMsgPack(String payload) { - return payload != null && payload.startsWith(PREFIX); - } - - static MessagePackageListenerTest.ListenerPayload decode(String payload) { - if (!isMsgPack(payload)) { - throw new MessageConversionException("Payload is not MsgPack encoded"); - } - return MsgPackCodec.decode(Base64.getDecoder().decode(payload.substring(PREFIX.length()))); - } - - private static class MsgPackMessageConverter implements MessageConverter { - - @Override - public Object fromMessage(Message message, Class targetClass) { - Object payload = message.getPayload(); - if (payload instanceof MessagePackageListenerTest.ListenerPayload) { - return payload; - } - if (!(payload instanceof String)) { - return null; - } - return decode((String) payload); - } - - @Override - public Message toMessage(Object payload, MessageHeaders headers) { - if (payload instanceof MessagePackageListenerTest.ListenerPayload) { - byte[] msgPackBytes = - MsgPackCodec.encode((MessagePackageListenerTest.ListenerPayload) payload); - return new GenericMessage<>(PREFIX + Base64.getEncoder().encodeToString(msgPackBytes)); - } - if (payload instanceof String) { - return new GenericMessage<>(payload); - } - return null; - } - } - - private static final class MsgPackCodec { - - private static final ObjectMapper MAPPER = new ObjectMapper(new MessagePackFactory()); - - private MsgPackCodec() {} - - static byte[] encode(MessagePackageListenerTest.ListenerPayload payload) { - try { - return MAPPER.writeValueAsBytes(payload); - } catch (JsonProcessingException e) { - throw new MessageConversionException("MsgPack encoding failed", e); - } - } - - static MessagePackageListenerTest.ListenerPayload decode(byte[] bytes) { - try { - return MAPPER.readValue(bytes, MessagePackageListenerTest.ListenerPayload.class); - } catch (IOException e) { - throw new MessageConversionException("MsgPack decoding failed", e); - } - } - } -} From 1a5af73bd19a10e95513bf8d3f6f2ec070188865 Mon Sep 17 00:00:00 2001 From: Sonu Kumar Date: Sat, 23 May 2026 12:12:10 +0530 Subject: [PATCH 10/11] fix: max retry --- .github/workflows/java-ci.yaml | 1 + .../listener/PostProcessingHandler.java | 9 +- .../sonus21/rqueue/listener/RetryPolicy.java | 61 +++++ .../rqueue/listener/RqueueExecutor.java | 14 +- .../rqueue/listener/RetryPolicyTest.java | 78 ++++++ ...sageListenerContainerBrokerBranchTest.java | 22 ++ .../nats/JetStreamMessageBrokerUnitTest.java | 9 + .../NatsGlobalRetryLimitE2EIT.java | 102 -------- .../integration/GlobalRetryLimitE2EIT.java | 247 ++++++++++++++++++ .../RedisGlobalRetryLimitE2EIT.java | 82 ------ .../unit/RqueueListenerAutoConfigTest.java | 45 ++++ 11 files changed, 467 insertions(+), 203 deletions(-) create mode 100644 rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RetryPolicy.java create mode 100644 rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RetryPolicyTest.java delete mode 100644 rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsGlobalRetryLimitE2EIT.java create mode 100644 rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/GlobalRetryLimitE2EIT.java delete mode 100644 rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/RedisGlobalRetryLimitE2EIT.java diff --git a/.github/workflows/java-ci.yaml b/.github/workflows/java-ci.yaml index eb4518b4..f3d450b4 100644 --- a/.github/workflows/java-ci.yaml +++ b/.github/workflows/java-ci.yaml @@ -446,6 +446,7 @@ jobs: - name: Run NATS tests env: + RQUEUE_TEST_BACKEND: nats NATS_RUNNING: "true" NATS_URL: nats://127.0.0.1:4222 run: ./gradlew :rqueue-nats:test :rqueue-spring-boot-starter:test :rqueue-spring:test -DincludeTags=nats diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/PostProcessingHandler.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/PostProcessingHandler.java index 5090f524..824a1537 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/PostProcessingHandler.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/PostProcessingHandler.java @@ -225,18 +225,11 @@ private void handleRetryExceededMessage(JobImpl job, int failureCount, Throwable } } - private int getMaxRetryCount(RqueueMessage rqueueMessage, QueueDetail queueDetail) { - return rqueueMessage.getRetryCount() == null - ? queueDetail.getNumRetry() - : rqueueMessage.getRetryCount(); - } - private void handleFailure(JobImpl job, int failureCount, Throwable throwable) { if (job.getQueueDetail().isDoNotRetryError(throwable)) { handleRetryExceededMessage(job, failureCount, throwable); } else { - int maxRetryCount = getMaxRetryCount(job.getRqueueMessage(), job.getQueueDetail()); - if (failureCount < maxRetryCount) { + if (!RetryPolicy.isExhausted(job.getRqueueMessage(), job.getQueueDetail(), failureCount)) { long delay = taskExecutionBackoff.nextBackOff( job.getMessage(), job.getRqueueMessage(), failureCount, throwable); if (delay == TaskExecutionBackOff.STOP) { diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RetryPolicy.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RetryPolicy.java new file mode 100644 index 00000000..218e4f00 --- /dev/null +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RetryPolicy.java @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2026 Sonu Kumar + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + */ + +package com.github.sonus21.rqueue.listener; + +import com.github.sonus21.rqueue.config.RqueueConfig; +import com.github.sonus21.rqueue.core.RqueueMessage; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; + +@NoArgsConstructor(access = AccessLevel.PRIVATE) +final class RetryPolicy { + + static final int UNLIMITED_RETRY_LIMIT = 100_000; + + static int maxRetryCount(RqueueMessage rqueueMessage, QueueDetail queueDetail) { + int maxRetryCount = rqueueMessage.getRetryCount() == null + ? queueDetail.getNumRetry() + : rqueueMessage.getRetryCount(); + if (maxRetryCount == Integer.MAX_VALUE) { + return UNLIMITED_RETRY_LIMIT; + } + return maxRetryCount; + } + + static int remainingRetryCount( + RqueueMessage rqueueMessage, QueueDetail queueDetail, int failureCount) { + int maxRetryCount = maxRetryCount(rqueueMessage, queueDetail); + return Math.max(0, maxRetryCount - failureCount); + } + + static int retryCountForPoll( + RqueueConfig rqueueConfig, + RqueueMessage rqueueMessage, + QueueDetail queueDetail, + int failureCount) { + int remainingRetryCount = remainingRetryCount(rqueueMessage, queueDetail, failureCount); + if (rqueueConfig.getRetryPerPoll() == -1) { + return remainingRetryCount; + } + return Math.min(rqueueConfig.getRetryPerPoll(), remainingRetryCount); + } + + static boolean isExhausted( + RqueueMessage rqueueMessage, QueueDetail queueDetail, int failureCount) { + return remainingRetryCount(rqueueMessage, queueDetail, failureCount) == 0; + } +} diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueExecutor.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueExecutor.java index ad75f4d1..d72dfc0f 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueExecutor.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueExecutor.java @@ -111,12 +111,6 @@ private void init() { this.failureCount = job.getRqueueMessage().getFailureCount(); } - private int getMaxRetryCount() { - return Objects.isNull(job.getRqueueMessage().getRetryCount()) - ? job.getQueueDetail().getNumRetry() - : job.getRqueueMessage().getRetryCount(); - } - private void updateCounter(boolean fail) { RqueueMetricsCounter counter = beanProvider.getRqueueMetricsCounter(); if (Objects.isNull(counter)) { @@ -179,11 +173,8 @@ private boolean isOldMessage() { } private int getRetryCount() { - int maxRetry = getMaxRetryCount(); - if (beanProvider.getRqueueConfig().getRetryPerPoll() == -1) { - return maxRetry; - } - return Math.min(beanProvider.getRqueueConfig().getRetryPerPoll(), maxRetry); + return RetryPolicy.retryCountForPoll( + beanProvider.getRqueueConfig(), job.getRqueueMessage(), job.getQueueDetail(), failureCount); } private boolean queueInactive() { @@ -283,6 +274,7 @@ private void execute() { private boolean shouldRetry(long maxProcessingTime, int retryCount, int failureCount) { if (retryCount > 0 && ExecutionStatus.FAILED.equals(status) + && !RetryPolicy.isExhausted(rqueueMessage, queueDetail, failureCount) && System.currentTimeMillis() < maxProcessingTime) { boolean doNoRetry = queueDetail.isDoNotRetryError(error); // it should not be retried based on the exception list diff --git a/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RetryPolicyTest.java b/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RetryPolicyTest.java new file mode 100644 index 00000000..4d163bdf --- /dev/null +++ b/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RetryPolicyTest.java @@ -0,0 +1,78 @@ +/* + * Copyright (c) 2026 Sonu Kumar + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + */ + +package com.github.sonus21.rqueue.listener; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.doReturn; + +import com.github.sonus21.rqueue.CoreUnitTest; +import com.github.sonus21.rqueue.config.RqueueConfig; +import com.github.sonus21.rqueue.core.RqueueMessage; +import com.github.sonus21.rqueue.utils.TestUtils; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +@CoreUnitTest +class RetryPolicyTest { + + private final QueueDetail queueDetail = TestUtils.createQueueDetail("queue", 2, 900000L, null); + + @Test + void retryCountForPollUsesRemainingRetryBudget() { + RqueueConfig rqueueConfig = Mockito.mock(RqueueConfig.class); + doReturn(100).when(rqueueConfig).getRetryPerPoll(); + RqueueMessage rqueueMessage = new RqueueMessage(); + + assertEquals(1, RetryPolicy.retryCountForPoll(rqueueConfig, rqueueMessage, queueDetail, 1)); + } + + @Test + void retryCountForPollKeepsExplicitMessageRetryCount() { + RqueueConfig rqueueConfig = Mockito.mock(RqueueConfig.class); + doReturn(-1).when(rqueueConfig).getRetryPerPoll(); + RqueueMessage rqueueMessage = RqueueMessage.builder().retryCount(1000).build(); + + assertEquals( + 999, RetryPolicy.retryCountForPoll(rqueueConfig, rqueueMessage, queueDetail, 1)); + } + + @Test + void isExhaustedUsesEffectiveRetryCount() { + RqueueMessage rqueueMessage = new RqueueMessage(); + + assertFalse(RetryPolicy.isExhausted(rqueueMessage, queueDetail, 1)); + assertTrue(RetryPolicy.isExhausted(rqueueMessage, queueDetail, 2)); + } + + @Test + void retryForeverSentinelUsesFiniteLimit() { + QueueDetail retryForeverQueue = + TestUtils.createQueueDetail("queue", Integer.MAX_VALUE, 900000L, null); + RqueueMessage rqueueMessage = new RqueueMessage(); + + assertEquals(RetryPolicy.UNLIMITED_RETRY_LIMIT, RetryPolicy.maxRetryCount( + rqueueMessage, retryForeverQueue)); + assertEquals( + 1, + RetryPolicy.remainingRetryCount( + rqueueMessage, retryForeverQueue, RetryPolicy.UNLIMITED_RETRY_LIMIT - 1)); + assertTrue(RetryPolicy.isExhausted( + rqueueMessage, retryForeverQueue, RetryPolicy.UNLIMITED_RETRY_LIMIT)); + } +} diff --git a/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainerBrokerBranchTest.java b/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainerBrokerBranchTest.java index f407ed9e..1e0f1ea5 100644 --- a/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainerBrokerBranchTest.java +++ b/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainerBrokerBranchTest.java @@ -228,6 +228,24 @@ void redisDefaultsBrokerAlsoUsesNormalStartQueuePath() throws Exception { } } + @Test + void startDoesNotStartQueuesAgainWhenContainerIsAlreadyRunning() throws Exception { + EndpointRegistry.delete(); + CountingBroker broker = new CountingBroker(Capabilities.REDIS_DEFAULTS); + TrackingContainer container = new TrackingContainer(messageHandler); + container.setMessageBroker(broker); + container.afterPropertiesSet(); + try { + container.start(); + container.start(); + + assertEquals(1, container.startQueueCalls.get() + container.startGroupCalls.get()); + } finally { + container.stop(); + container.destroy(); + } + } + @Test void pollerForwardsPollingIntervalAsBrokerFetchWait() throws Exception { EndpointRegistry.delete(); @@ -288,6 +306,8 @@ private class TrackingContainer extends RqueueMessageListenerContainer { final AtomicBoolean startBrokerPollersCalled = new AtomicBoolean(); final AtomicBoolean startQueueCalled = new AtomicBoolean(); final AtomicBoolean startGroupCalled = new AtomicBoolean(); + final AtomicInteger startQueueCalls = new AtomicInteger(); + final AtomicInteger startGroupCalls = new AtomicInteger(); TrackingContainer(RqueueMessageHandler handler) { super(handler, rqueueMessageTemplate); @@ -297,12 +317,14 @@ private class TrackingContainer extends RqueueMessageListenerContainer { @Override protected void startQueue(String pollerKey, QueueDetail queueDetail) { startQueueCalled.set(true); + startQueueCalls.incrementAndGet(); // Do not actually start the poller; it would need a real broker. } @Override protected void startGroup(String groupName, List queueDetails) { startGroupCalled.set(true); + startGroupCalls.incrementAndGet(); } } } diff --git a/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerUnitTest.java b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerUnitTest.java index 62b1a80c..5430ffa7 100644 --- a/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerUnitTest.java +++ b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerUnitTest.java @@ -101,6 +101,15 @@ void getPollWait_usesConfiguredFetchWait() { assertEquals(Duration.ofSeconds(9), f.broker.getPollWait(Duration.ofMillis(137))); } + @Test + void getPollWait_fallsBackToPollingIntervalWhenFetchWaitIsUnset() { + RqueueNatsConfig cfg = RqueueNatsConfig.defaults().setDefaultFetchWait(null); + Fixture f = newFixture(cfg); + Duration pollingInterval = Duration.ofMillis(137); + + assertEquals(pollingInterval, f.broker.getPollWait(pollingInterval)); + } + @Test void enqueueWithPriority_appendsPrioritySuffixToSubject() throws Exception { Fixture f = newFixture(RqueueNatsConfig.defaults()); diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsGlobalRetryLimitE2EIT.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsGlobalRetryLimitE2EIT.java deleted file mode 100644 index 918cfa61..00000000 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsGlobalRetryLimitE2EIT.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Copyright (c) 2026 Sonu Kumar - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * You may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and limitations under the License. - * - */ -package com.github.sonus21.rqueue.spring.boot.integration; - -import static org.assertj.core.api.Assertions.assertThat; - -import com.github.sonus21.rqueue.annotation.RqueueListener; -import com.github.sonus21.rqueue.core.RqueueMessageEnqueuer; -import io.nats.client.JetStreamManagement; -import java.time.Duration; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import org.awaitility.Awaitility; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.data.redis.autoconfigure.DataRedisAutoConfiguration; -import org.springframework.boot.data.redis.autoconfigure.DataRedisReactiveAutoConfiguration; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.context.annotation.Import; -import org.springframework.stereotype.Component; - -@SpringBootTest( - classes = NatsGlobalRetryLimitE2EIT.TestApp.class, - properties = { - "rqueue.backend=nats", - "rqueue.retry.max=2", - "rqueue.retry.per.poll=1000", - "rqueue.nats.naming.stream-prefix=" + NatsGlobalRetryLimitE2EIT.STREAM_PREFIX, - "rqueue.nats.naming.subject-prefix=" + NatsGlobalRetryLimitE2EIT.SUBJECT_PREFIX - }) -@Tag("nats") -class NatsGlobalRetryLimitE2EIT extends AbstractNatsBootIT { - - static final String QUEUE = "global-retry-nats"; - static final String STREAM_PREFIX = "rqueue-js-globalRetryE2E-"; - static final String SUBJECT_PREFIX = "rqueue.js.globalRetryE2E."; - - @BeforeAll - static void wipeOwnedStreams() { - deleteStreamsWithPrefix(STREAM_PREFIX); - } - - @Autowired - RqueueMessageEnqueuer enqueuer; - - @Autowired - JetStreamManagement jsm; - - @Autowired - FailingListener listener; - - @Test - void globalRetryLimitCapsImplicitRetryForever() throws Exception { - enqueuer.enqueue(QUEUE, "payload"); - - assertThat(listener.twoAttempts.await(20, TimeUnit.SECONDS)).isTrue(); - Awaitility.await() - .during(Duration.ofSeconds(2)) - .atMost(Duration.ofSeconds(3)) - .untilAsserted(() -> assertThat(listener.attempts).hasValue(2)); - - assertThat(jsm.getConsumerInfo(STREAM_PREFIX + QUEUE, QUEUE + "-consumer") - .getConsumerConfiguration() - .getMaxDeliver()) - .isEqualTo(3L); - } - - @SpringBootApplication( - exclude = {DataRedisAutoConfiguration.class, DataRedisReactiveAutoConfiguration.class}) - @Import(FailingListener.class) - static class TestApp {} - - @Component - static class FailingListener { - final AtomicInteger attempts = new AtomicInteger(); - final CountDownLatch twoAttempts = new CountDownLatch(2); - - @RqueueListener(value = QUEUE) - void onMessage(String ignored) { - attempts.incrementAndGet(); - twoAttempts.countDown(); - throw new IllegalStateException("force retry"); - } - } -} diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/GlobalRetryLimitE2EIT.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/GlobalRetryLimitE2EIT.java new file mode 100644 index 00000000..2144e785 --- /dev/null +++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/GlobalRetryLimitE2EIT.java @@ -0,0 +1,247 @@ +/* + * Copyright (c) 2026 Sonu Kumar + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + */ +package com.github.sonus21.rqueue.spring.boot.tests.integration; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.github.sonus21.rqueue.annotation.RqueueListener; +import com.github.sonus21.rqueue.config.RqueueConfig; +import com.github.sonus21.rqueue.config.SimpleRqueueListenerContainerFactory; +import com.github.sonus21.rqueue.core.RqueueMessageEnqueuer; +import com.github.sonus21.rqueue.spring.boot.tests.SpringBootIntegrationTest; +import com.github.sonus21.rqueue.test.application.BaseApplication; +import com.github.sonus21.rqueue.utils.backoff.FixedTaskExecutionBackOff; +import io.nats.client.Connection; +import io.nats.client.JetStreamApiException; +import io.nats.client.JetStreamManagement; +import io.nats.client.Nats; +import java.io.IOException; +import java.time.Duration; +import java.util.List; +import java.util.Locale; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.Assumptions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; +import org.testcontainers.DockerClientFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.utility.DockerImageName; + +@SpringBootTest(classes = GlobalRetryLimitE2EIT.TestApp.class) +@SpringBootIntegrationTest +@Tag("nats") +class GlobalRetryLimitE2EIT { + + private static final Logger log = Logger.getLogger(GlobalRetryLimitE2EIT.class.getName()); + private static final String BACKEND = + System.getProperty( + "rqueue.test.backend", System.getenv().getOrDefault("RQUEUE_TEST_BACKEND", "redis")) + .toLowerCase(Locale.ROOT); + private static final String QUEUE = "global-retry-" + BACKEND; + private static final String STREAM_PREFIX = "rqueue-js-globalRetryE2E-"; + private static final String SUBJECT_PREFIX = "rqueue.js.globalRetryE2E."; + private static final String EXTERNAL_NATS_URL = + System.getenv().getOrDefault("NATS_URL", "nats://127.0.0.1:4222"); + private static final boolean USE_EXTERNAL_NATS = System.getenv("NATS_RUNNING") != null; + + private static GenericContainer nats; + + @Autowired + RqueueMessageEnqueuer enqueuer; + + @Autowired + FailingListener listener; + + @Autowired + RqueueConfig rqueueConfig; + + @Autowired(required = false) + JetStreamManagement jsm; + + @BeforeAll + static void bootstrapBackend() { + if (isNatsBackend()) { + startNats(); + deleteStreamsWithPrefix(STREAM_PREFIX); + } + } + + @DynamicPropertySource + static void properties(DynamicPropertyRegistry registry) { + registry.add("rqueue.backend", () -> BACKEND); + registry.add("rqueue.retry.max", () -> "2"); + registry.add("rqueue.retry.per.poll", () -> "1"); + registry.add("global.retry.limit.queue", () -> QUEUE); + if (isNatsBackend()) { + registry.add("rqueue.nats.connection.url", GlobalRetryLimitE2EIT::activeNatsUrl); + registry.add("rqueue.nats.naming.stream-prefix", () -> STREAM_PREFIX); + registry.add("rqueue.nats.naming.subject-prefix", () -> SUBJECT_PREFIX); + } else { + registry.add("spring.data.redis.port", () -> "8027"); + registry.add("mysql.db.name", () -> "GlobalRetryLimitE2EIT"); + registry.add("use.system.redis", () -> "false"); + } + } + + @BeforeEach + void resetListener() { + listener.reset(); + rqueueConfig.setRetryPerPoll(1); + } + + @Test + void globalRetryLimitCapsSimpleEnqueueWhenRetryPerPollIsOne() throws Exception { + enqueuer.enqueue(QUEUE, "payload"); + + assertTwoAttemptsOnly(); + if (isNatsBackend()) { + assertThat(jsm).isNotNull(); + assertThat(jsm.getConsumerInfo(STREAM_PREFIX + QUEUE, QUEUE + "-consumer") + .getConsumerConfiguration() + .getMaxDeliver()) + .isEqualTo(3L); + } + } + + @Test + void globalRetryLimitCapsSimpleEnqueueWhenRetryPerPollIsHigh() throws Exception { + rqueueConfig.setRetryPerPoll(100); + + enqueuer.enqueue(QUEUE, "payload"); + + assertTwoAttemptsOnly(); + } + + @Test + void globalRetryLimitUsesRemainingRetriesWhenRetryPerPollIncreases() throws Exception { + enqueuer.enqueue(QUEUE, "payload"); + + assertThat(listener.firstAttempt.await(20, TimeUnit.SECONDS)).isTrue(); + rqueueConfig.setRetryPerPoll(100); + + assertTwoAttemptsOnly(); + } + + private void assertTwoAttemptsOnly() throws InterruptedException { + assertThat(listener.twoAttempts.await(20, TimeUnit.SECONDS)).isTrue(); + Awaitility.await() + .during(Duration.ofMillis(600)) + .atMost(Duration.ofSeconds(3)) + .untilAsserted(() -> assertThat(listener.attempts).hasValue(2)); + } + + private static boolean isNatsBackend() { + return "nats".equalsIgnoreCase(BACKEND); + } + + private static void startNats() { + if (!isNatsBackend() || USE_EXTERNAL_NATS || nats != null) { + return; + } + Assumptions.assumeTrue( + DockerClientFactory.instance().isDockerAvailable(), + "Skipping: Docker is not available and NATS_RUNNING is not set"); + nats = new GenericContainer<>(DockerImageName.parse("nats:2.12-alpine")) + .withCommand("-js") + .withExposedPorts(4222) + .waitingFor(Wait.forLogMessage(".*Server is ready.*\\n", 1)); + nats.start(); + Runtime.getRuntime().addShutdownHook(new Thread(nats::stop)); + } + + private static String activeNatsUrl() { + if (USE_EXTERNAL_NATS) { + return EXTERNAL_NATS_URL; + } + startNats(); + return "nats://" + nats.getHost() + ":" + nats.getMappedPort(4222); + } + + private static void deleteStreamsWithPrefix(String prefix) { + try (Connection c = Nats.connect(activeNatsUrl())) { + JetStreamManagement management = c.jetStreamManagement(); + List names = management.getStreamNames(); + if (names == null) { + return; + } + for (String name : names) { + if (name.startsWith(prefix)) { + management.deleteStream(name); + } + } + } catch (IOException | JetStreamApiException e) { + log.log(Level.WARNING, "Failed to clean NATS streams: " + e.getMessage()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.log(Level.WARNING, "Failed to clean NATS streams: " + e.getMessage()); + } + } + + @SpringBootApplication + @Import({FailingListener.class, RedisTestConfig.class}) + static class TestApp { + + @Bean + public SimpleRqueueListenerContainerFactory simpleRqueueListenerContainerFactory() { + FixedTaskExecutionBackOff backOff = new FixedTaskExecutionBackOff(); + backOff.setInterval(100); + SimpleRqueueListenerContainerFactory factory = new SimpleRqueueListenerContainerFactory(); + factory.setTaskExecutionBackOff(backOff); + return factory; + } + } + + @Configuration + @ConditionalOnProperty(name = "rqueue.backend", havingValue = "redis", matchIfMissing = true) + static class RedisTestConfig extends BaseApplication {} + + static class FailingListener { + final AtomicInteger attempts = new AtomicInteger(); + CountDownLatch firstAttempt = new CountDownLatch(1); + CountDownLatch twoAttempts = new CountDownLatch(2); + + void reset() { + attempts.set(0); + firstAttempt = new CountDownLatch(1); + twoAttempts = new CountDownLatch(2); + } + + @RqueueListener(value = "${global.retry.limit.queue}") + void onMessage(String ignored) { + attempts.incrementAndGet(); + firstAttempt.countDown(); + twoAttempts.countDown(); + throw new IllegalStateException("force retry"); + } + } +} diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/RedisGlobalRetryLimitE2EIT.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/RedisGlobalRetryLimitE2EIT.java deleted file mode 100644 index d0df932e..00000000 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/RedisGlobalRetryLimitE2EIT.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Copyright (c) 2026 Sonu Kumar - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * You may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and limitations under the License. - * - */ -package com.github.sonus21.rqueue.spring.boot.tests.integration; - -import static org.assertj.core.api.Assertions.assertThat; - -import com.github.sonus21.rqueue.annotation.RqueueListener; -import com.github.sonus21.rqueue.core.RqueueMessageEnqueuer; -import com.github.sonus21.rqueue.spring.boot.tests.SpringBootIntegrationTest; -import com.github.sonus21.rqueue.test.application.BaseApplication; -import java.time.Duration; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import org.awaitility.Awaitility; -import org.junit.jupiter.api.Test; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.context.annotation.Import; -import org.springframework.test.context.TestPropertySource; - -@SpringBootTest(classes = RedisGlobalRetryLimitE2EIT.TestApp.class) -@TestPropertySource( - properties = { - "rqueue.retry.max=2", - "rqueue.retry.per.poll=1000", - "spring.data.redis.port=8027", - "mysql.db.name=RedisGlobalRetryLimitE2EIT", - "use.system.redis=false" - }) -@SpringBootIntegrationTest -class RedisGlobalRetryLimitE2EIT { - - static final String QUEUE = "global-retry-redis"; - - @Autowired - RqueueMessageEnqueuer enqueuer; - - @Autowired - FailingListener listener; - - @Test - void globalRetryLimitCapsImplicitRetryForever() throws Exception { - enqueuer.enqueue(QUEUE, "payload"); - - assertThat(listener.twoAttempts.await(20, TimeUnit.SECONDS)).isTrue(); - Awaitility.await() - .during(Duration.ofSeconds(2)) - .atMost(Duration.ofSeconds(3)) - .untilAsserted(() -> assertThat(listener.attempts).hasValue(2)); - } - - @SpringBootApplication - @Import(FailingListener.class) - static class TestApp extends BaseApplication {} - - static class FailingListener { - final AtomicInteger attempts = new AtomicInteger(); - final CountDownLatch twoAttempts = new CountDownLatch(2); - - @RqueueListener(value = QUEUE) - void onMessage(String ignored) { - attempts.incrementAndGet(); - twoAttempts.countDown(); - throw new IllegalStateException("force retry"); - } - } -} diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/unit/RqueueListenerAutoConfigTest.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/unit/RqueueListenerAutoConfigTest.java index 2f572c19..5b8f4686 100644 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/unit/RqueueListenerAutoConfigTest.java +++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/unit/RqueueListenerAutoConfigTest.java @@ -20,6 +20,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.when; @@ -133,6 +134,41 @@ void rqueueMessageListenerContainer() assertTrue(factory.getAutoStartup()); } + @Test + void rqueueMessageListenerContainerKeepsAutoStartupWhenLifecycleMissing() + throws IllegalAccessException { + SimpleRqueueListenerContainerFactory factory = new SimpleRqueueListenerContainerFactory(); + factory.setMessageConverterProvider(new DefaultMessageConverterProvider()); + factory.setMessageBroker(messageBroker); + RqueueListenerAutoConfig messageAutoConfig = new RqueueListenerAutoConfig(); + FieldUtils.writeField(messageAutoConfig, "simpleRqueueListenerContainerFactory", factory, true); + + RqueueMessageListenerContainer container = + messageAutoConfig.rqueueMessageListenerContainer(rqueueMessageHandler, messageBroker); + + assertTrue(container.isAutoStartup()); + assertTrue(factory.getAutoStartup()); + assertSame(messageBroker, factory.getMessageBroker()); + } + + @Test + void rqueueMessageListenerContainerRestoresFactoryAutoStartupWhenCreationFails() + throws IllegalAccessException { + FailingListenerContainerFactory factory = new FailingListenerContainerFactory(); + RqueueListenerAutoConfig messageAutoConfig = new RqueueListenerAutoConfig(); + FieldUtils.writeField(messageAutoConfig, "simpleRqueueListenerContainerFactory", factory, true); + FieldUtils.writeField( + messageAutoConfig, "rqueueAutoStartupLifecycle", new RqueueAutoStartupLifecycle(), true); + + IllegalStateException exception = assertThrows( + IllegalStateException.class, + () -> + messageAutoConfig.rqueueMessageListenerContainer(rqueueMessageHandler, messageBroker)); + + assertEquals("boom", exception.getMessage()); + assertTrue(factory.getAutoStartup()); + } + @Test void autoStartupLifecycleDelaysOnlyAutoStartupContainers() { RqueueAutoStartupLifecycle lifecycle = new RqueueAutoStartupLifecycle(); @@ -193,6 +229,15 @@ void rqueueMessageSenderUsesConfiguredMessageConverter() throws IllegalAccessExc assertTrue(converter.hashCode() == messageConverter.hashCode()); } + private static class FailingListenerContainerFactory + extends SimpleRqueueListenerContainerFactory { + + @Override + public RqueueMessageListenerContainer createMessageListenerContainer() { + throw new IllegalStateException("boom"); + } + } + private class TestRqueueMessageListenerContainer extends RqueueMessageListenerContainer { private int startCount; From 83b235bd4f7495270c43a26bc1bd9ac9ee7d96c9 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Sat, 23 May 2026 06:43:15 +0000 Subject: [PATCH 11/11] Apply Palantir Java Format --- .../github/sonus21/rqueue/listener/RetryPolicyTest.java | 8 ++++---- .../boot/tests/integration/GlobalRetryLimitE2EIT.java | 7 +++---- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RetryPolicyTest.java b/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RetryPolicyTest.java index 4d163bdf..0eb47e6f 100644 --- a/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RetryPolicyTest.java +++ b/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RetryPolicyTest.java @@ -48,8 +48,7 @@ void retryCountForPollKeepsExplicitMessageRetryCount() { doReturn(-1).when(rqueueConfig).getRetryPerPoll(); RqueueMessage rqueueMessage = RqueueMessage.builder().retryCount(1000).build(); - assertEquals( - 999, RetryPolicy.retryCountForPoll(rqueueConfig, rqueueMessage, queueDetail, 1)); + assertEquals(999, RetryPolicy.retryCountForPoll(rqueueConfig, rqueueMessage, queueDetail, 1)); } @Test @@ -66,8 +65,9 @@ void retryForeverSentinelUsesFiniteLimit() { TestUtils.createQueueDetail("queue", Integer.MAX_VALUE, 900000L, null); RqueueMessage rqueueMessage = new RqueueMessage(); - assertEquals(RetryPolicy.UNLIMITED_RETRY_LIMIT, RetryPolicy.maxRetryCount( - rqueueMessage, retryForeverQueue)); + assertEquals( + RetryPolicy.UNLIMITED_RETRY_LIMIT, + RetryPolicy.maxRetryCount(rqueueMessage, retryForeverQueue)); assertEquals( 1, RetryPolicy.remainingRetryCount( diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/GlobalRetryLimitE2EIT.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/GlobalRetryLimitE2EIT.java index 2144e785..ac86770a 100644 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/GlobalRetryLimitE2EIT.java +++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/GlobalRetryLimitE2EIT.java @@ -63,10 +63,9 @@ class GlobalRetryLimitE2EIT { private static final Logger log = Logger.getLogger(GlobalRetryLimitE2EIT.class.getName()); - private static final String BACKEND = - System.getProperty( - "rqueue.test.backend", System.getenv().getOrDefault("RQUEUE_TEST_BACKEND", "redis")) - .toLowerCase(Locale.ROOT); + private static final String BACKEND = System.getProperty( + "rqueue.test.backend", System.getenv().getOrDefault("RQUEUE_TEST_BACKEND", "redis")) + .toLowerCase(Locale.ROOT); private static final String QUEUE = "global-retry-" + BACKEND; private static final String STREAM_PREFIX = "rqueue-js-globalRetryE2E-"; private static final String SUBJECT_PREFIX = "rqueue.js.globalRetryE2E.";