diff --git a/.github/workflows/java-ci.yaml b/.github/workflows/java-ci.yaml index eb4518b4..f3d450b4 100644 --- a/.github/workflows/java-ci.yaml +++ b/.github/workflows/java-ci.yaml @@ -446,6 +446,7 @@ jobs: - name: Run NATS tests env: + RQUEUE_TEST_BACKEND: nats NATS_RUNNING: "true" NATS_URL: nats://127.0.0.1:4222 run: ./gradlew :rqueue-nats:test :rqueue-spring-boot-starter:test :rqueue-spring:test -DincludeTags=nats diff --git a/build.gradle b/build.gradle index 4a0d8b21..b5791994 100644 --- a/build.gradle +++ b/build.gradle @@ -84,7 +84,7 @@ ext { subprojects { group = "com.github.sonus21" - version = "4.0.0-RC10" + version = "4.0.0-RC11" dependencies { // https://mvnrepository.com/artifact/org.springframework/spring-messaging diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index ff7dfdc3..b1324221 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -18,6 +18,52 @@ foundational Spring Boot 4 and Jackson 3 migration notes; RC3 for the Java 17 baseline change; RC4–RC6 below for the NATS backend, broker SPI, dashboard work, and middleware additions that build on top. +## Release [4.0.0.RC11] 2026-05-24 + +{: .highlight} +Release candidate. + +### Fixes +* **Delayed listener startup for Spring Boot web apps** — Rqueue listener + containers in servlet and reactive Spring Boot web applications now wait until + `ApplicationReadyEvent` before consuming work. Non-web worker applications keep + the existing `SmartLifecycle` startup behavior. +* **Idempotent listener container startup** — repeated `start()` calls no longer + re-run queue startup, and the container marks itself running only after + startup succeeds. +* **Global retry cap enforcement** — `rqueue.retry.max` now caps the remaining + retry budget even when `rqueue.retry.per.poll` is low or high. The retry logic + is centralized in `RetryPolicy`, preserving explicit message/listener retry + counts while preventing implicit retry-forever jobs from bypassing the global + max. +* **NATS listener polling wait** — NATS pollers now use the backend-configured + fetch wait via the broker SPI, reducing short-poll churn while keeping Redis + behavior unchanged. + +### Build +* **Shared backend contract E2E tests** — Redis and NATS now run the same backend + contract E2E coverage through environment-selected bootstrapping, replacing + duplicated NATS-only E2E classes. +* **Broker coverage** — added focused unit coverage for broker defaults and NATS + JetStream pop, in-flight, size, subscriber, and dashboard-label paths. + +## Release [4.0.0.RC10] 2026-05-21 + +{: .highlight} +Release candidate. + +### Fixes +* **Spring Boot 3.x to 4.x message compatibility** — restored Jackson 2.x + property ordering compatibility in `RqueueRedisSerializer` so messages written + by Rqueue 3.x can be acknowledged or parked for retry after upgrading to + Rqueue 4.x. This prevents stale processing-queue entries caused by byte-exact + Redis `ZSCORE` / `ZREM` lookups using a different serialized property order. + +### Docs +* **Migration guidance** — clarified the 3.x to 4.x upgrade notes around + `rqueue.serialization.property.order` so applications can choose the + compatibility mode intentionally during rolling upgrades. + ## Release [4.0.0.RC9] 2026-05-13 {: .highlight} diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/spi/MessageBroker.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/spi/MessageBroker.java index 1f368f5a..578d314e 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/spi/MessageBroker.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/spi/MessageBroker.java @@ -83,6 +83,19 @@ default Mono enqueueWithDelayReactive(QueueDetail q, RqueueMessage m, long return Mono.fromRunnable(() -> enqueueWithDelay(q, m, delayMs)); } + /** + * Resolve the wait duration that listener pollers should pass to {@link #pop}. The default uses + * the listener container's polling interval, preserving existing Redis behavior where that value + * also controls idle sleeps. Backends with native long-poll controls can override this so their + * fetch wait can be tuned independently. + * + * @param pollingInterval listener container polling interval + * @return wait duration for listener-driven pop calls + */ + default Duration getPollWait(Duration pollingInterval) { + return pollingInterval; + } + List pop(QueueDetail q, String consumerName, int batch, Duration wait); /** diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/PostProcessingHandler.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/PostProcessingHandler.java index 5090f524..824a1537 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/PostProcessingHandler.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/PostProcessingHandler.java @@ -225,18 +225,11 @@ private void handleRetryExceededMessage(JobImpl job, int failureCount, Throwable } } - private int getMaxRetryCount(RqueueMessage rqueueMessage, QueueDetail queueDetail) { - return rqueueMessage.getRetryCount() == null - ? queueDetail.getNumRetry() - : rqueueMessage.getRetryCount(); - } - private void handleFailure(JobImpl job, int failureCount, Throwable throwable) { if (job.getQueueDetail().isDoNotRetryError(throwable)) { handleRetryExceededMessage(job, failureCount, throwable); } else { - int maxRetryCount = getMaxRetryCount(job.getRqueueMessage(), job.getQueueDetail()); - if (failureCount < maxRetryCount) { + if (!RetryPolicy.isExhausted(job.getRqueueMessage(), job.getQueueDetail(), failureCount)) { long delay = taskExecutionBackoff.nextBackOff( job.getMessage(), job.getRqueueMessage(), failureCount, throwable); if (delay == TaskExecutionBackOff.STOP) { diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RetryPolicy.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RetryPolicy.java new file mode 100644 index 00000000..218e4f00 --- /dev/null +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RetryPolicy.java @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2026 Sonu Kumar + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + */ + +package com.github.sonus21.rqueue.listener; + +import com.github.sonus21.rqueue.config.RqueueConfig; +import com.github.sonus21.rqueue.core.RqueueMessage; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; + +@NoArgsConstructor(access = AccessLevel.PRIVATE) +final class RetryPolicy { + + static final int UNLIMITED_RETRY_LIMIT = 100_000; + + static int maxRetryCount(RqueueMessage rqueueMessage, QueueDetail queueDetail) { + int maxRetryCount = rqueueMessage.getRetryCount() == null + ? queueDetail.getNumRetry() + : rqueueMessage.getRetryCount(); + if (maxRetryCount == Integer.MAX_VALUE) { + return UNLIMITED_RETRY_LIMIT; + } + return maxRetryCount; + } + + static int remainingRetryCount( + RqueueMessage rqueueMessage, QueueDetail queueDetail, int failureCount) { + int maxRetryCount = maxRetryCount(rqueueMessage, queueDetail); + return Math.max(0, maxRetryCount - failureCount); + } + + static int retryCountForPoll( + RqueueConfig rqueueConfig, + RqueueMessage rqueueMessage, + QueueDetail queueDetail, + int failureCount) { + int remainingRetryCount = remainingRetryCount(rqueueMessage, queueDetail, failureCount); + if (rqueueConfig.getRetryPerPoll() == -1) { + return remainingRetryCount; + } + return Math.min(rqueueConfig.getRetryPerPoll(), remainingRetryCount); + } + + static boolean isExhausted( + RqueueMessage rqueueMessage, QueueDetail queueDetail, int failureCount) { + return remainingRetryCount(rqueueMessage, queueDetail, failureCount) == 0; + } +} diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueExecutor.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueExecutor.java index ad75f4d1..d72dfc0f 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueExecutor.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueExecutor.java @@ -111,12 +111,6 @@ private void init() { this.failureCount = job.getRqueueMessage().getFailureCount(); } - private int getMaxRetryCount() { - return Objects.isNull(job.getRqueueMessage().getRetryCount()) - ? job.getQueueDetail().getNumRetry() - : job.getRqueueMessage().getRetryCount(); - } - private void updateCounter(boolean fail) { RqueueMetricsCounter counter = beanProvider.getRqueueMetricsCounter(); if (Objects.isNull(counter)) { @@ -179,11 +173,8 @@ private boolean isOldMessage() { } private int getRetryCount() { - int maxRetry = getMaxRetryCount(); - if (beanProvider.getRqueueConfig().getRetryPerPoll() == -1) { - return maxRetry; - } - return Math.min(beanProvider.getRqueueConfig().getRetryPerPoll(), maxRetry); + return RetryPolicy.retryCountForPoll( + beanProvider.getRqueueConfig(), job.getRqueueMessage(), job.getQueueDetail(), failureCount); } private boolean queueInactive() { @@ -283,6 +274,7 @@ private void execute() { private boolean shouldRetry(long maxProcessingTime, int retryCount, int failureCount) { if (retryCount > 0 && ExecutionStatus.FAILED.equals(status) + && !RetryPolicy.isExhausted(rqueueMessage, queueDetail, failureCount) && System.currentTimeMillis() < maxProcessingTime) { boolean doNoRetry = queueDetail.isDoNotRetryError(error); // it should not be retried based on the exception list diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java index 190c7be4..a4b3ded8 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java @@ -517,10 +517,14 @@ private List getQueueDetail(String queue, MappingInformation mappin @Override public void start() { - log.info("Starting Rqueue Message container {}", RqueueConfig.getBrokerId()); synchronized (lifecycleMgr) { - running = true; + if (running) { + log.debug("Rqueue Message container {} is already running", RqueueConfig.getBrokerId()); + return; + } + log.info("Starting Rqueue Message container {}", RqueueConfig.getBrokerId()); doStart(); + running = true; rqueueBeanProvider .getApplicationEventPublisher() .publishEvent(new RqueueBootstrapEvent(EVENT_SOURCE, true)); diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessagePoller.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessagePoller.java index dfdddf92..8927c4da 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessagePoller.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessagePoller.java @@ -19,6 +19,7 @@ import com.github.sonus21.rqueue.core.RqueueBeanProvider; import com.github.sonus21.rqueue.core.RqueueMessage; import com.github.sonus21.rqueue.core.middleware.Middleware; +import com.github.sonus21.rqueue.core.spi.MessageBroker; import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer.QueueStateMgr; import com.github.sonus21.rqueue.utils.Constants; import com.github.sonus21.rqueue.utils.QueueThreadPool; @@ -59,13 +60,9 @@ abstract class RqueueMessagePoller extends MessageContainerBase { } private List getMessages(QueueDetail queueDetail, int count) { - return rqueueBeanProvider - .getMessageBroker() - .pop( - queueDetail, - queueDetail.resolvedConsumerName(), - count, - Duration.ofMillis(pollingInterval)); + MessageBroker broker = rqueueBeanProvider.getMessageBroker(); + Duration wait = broker.getPollWait(Duration.ofMillis(pollingInterval)); + return broker.pop(queueDetail, queueDetail.resolvedConsumerName(), count, wait); } private void execute( diff --git a/rqueue-core/src/test/java/com/github/sonus21/rqueue/core/spi/MessageBrokerDefaultMethodsTest.java b/rqueue-core/src/test/java/com/github/sonus21/rqueue/core/spi/MessageBrokerDefaultMethodsTest.java new file mode 100644 index 00000000..ccc15b9e --- /dev/null +++ b/rqueue-core/src/test/java/com/github/sonus21/rqueue/core/spi/MessageBrokerDefaultMethodsTest.java @@ -0,0 +1,189 @@ +/* + * Copyright (c) 2026 Sonu Kumar + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + */ +package com.github.sonus21.rqueue.core.spi; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.github.sonus21.rqueue.CoreUnitTest; +import com.github.sonus21.rqueue.core.RqueueMessage; +import com.github.sonus21.rqueue.listener.QueueDetail; +import com.github.sonus21.rqueue.utils.TestUtils; +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.function.Consumer; +import org.junit.jupiter.api.Test; +import reactor.test.StepVerifier; + +@CoreUnitTest +class MessageBrokerDefaultMethodsTest { + + private final QueueDetail queue = TestUtils.createQueueDetail("queue", 1, 30_000L, null); + private final RqueueMessage oldMessage = + RqueueMessage.builder().id("old").message("old").build(); + private final RqueueMessage updatedMessage = + RqueueMessage.builder().id("updated").message("updated").processAt(1L).build(); + + @Test + void priorityAndReactiveDefaultsDelegateToBlockingOperations() { + RecordingBroker broker = new RecordingBroker(); + + broker.enqueue(queue, "high", updatedMessage); + StepVerifier.create(broker.enqueueReactive(queue, oldMessage)).verifyComplete(); + StepVerifier.create(broker.enqueueWithDelayReactive(queue, updatedMessage, 17L)) + .verifyComplete(); + broker.pop(queue, "high", "consumer", 3, Duration.ofMillis(25L)); + + assertEquals(2, broker.enqueueCalls); + assertEquals(oldMessage, broker.lastEnqueued); + assertEquals(1, broker.delayCalls); + assertEquals(17L, broker.lastDelayMs); + assertEquals(1, broker.popCalls); + assertEquals("consumer", broker.lastConsumerName); + assertEquals(Duration.ofMillis(25L), broker.lastWait); + } + + @Test + void retryDlqAndScheduleDefaultsUseBackendPrimitives() { + RecordingBroker broker = new RecordingBroker(); + + broker.parkForRetry(queue, oldMessage, updatedMessage, 123L); + broker.moveToDlq(queue, "dlq", oldMessage, updatedMessage, 0L); + broker.moveToDlq(queue, "dlq", oldMessage, updatedMessage, 99L); + broker.scheduleNext(queue, "period-key", updatedMessage, 60L); + + assertEquals(1, broker.nackCalls); + assertEquals(updatedMessage, broker.lastNacked); + assertEquals(123L, broker.lastRetryDelayMs); + assertEquals(1, broker.enqueueCalls); + assertEquals(2, broker.delayCalls); + assertEquals(updatedMessage, broker.lastDelayed); + } + + @Test + void dashboardDefaultsExposeRedisLabelsAndSingleSubscriberFallback() { + RecordingBroker broker = new RecordingBroker(); + + assertEquals("Redis", broker.storageKicker()); + assertEquals( + "Underlying Redis structures for the queues visible on this page.", + broker.storageDescription()); + assertNull(broker.storageDisplayName(queue)); + assertNull(broker.dlqStorageDisplayName(queue)); + assertNull(broker.consumerPendingSizes(queue)); + assertNull(broker.dataTypeLabel(null, null)); + assertFalse(broker.isSizeApproximate(queue)); + assertNull(broker.getVisibilityTimeoutScore(queue, oldMessage)); + assertFalse(broker.extendVisibilityTimeout(queue, oldMessage, 1L)); + + List subscribers = broker.subscribers(queue); + + assertEquals(1, subscribers.size()); + assertEquals(queue.resolvedConsumerName(), subscribers.get(0).consumerName()); + assertEquals(42L, subscribers.get(0).pending()); + assertEquals(0L, subscribers.get(0).inFlight()); + assertTrue(subscribers.get(0).pendingShared()); + } + + @Test + void subscribersDefaultFallsBackToZeroWhenSizeFails() { + RecordingBroker broker = new RecordingBroker(); + broker.failSize = true; + + List subscribers = broker.subscribers(queue); + + assertEquals(1, subscribers.size()); + assertEquals(0L, subscribers.get(0).pending()); + } + + private static final class RecordingBroker implements MessageBroker { + + int enqueueCalls; + int delayCalls; + int popCalls; + int nackCalls; + boolean failSize; + String lastConsumerName; + Duration lastWait; + long lastDelayMs; + long lastRetryDelayMs; + RqueueMessage lastEnqueued; + RqueueMessage lastDelayed; + RqueueMessage lastNacked; + + @Override + public void enqueue(QueueDetail q, RqueueMessage m) { + enqueueCalls++; + lastEnqueued = m; + } + + @Override + public void enqueueWithDelay(QueueDetail q, RqueueMessage m, long delayMs) { + delayCalls++; + lastDelayed = m; + lastDelayMs = delayMs; + } + + @Override + public List pop(QueueDetail q, String consumerName, int batch, Duration wait) { + popCalls++; + lastConsumerName = consumerName; + lastWait = wait; + return Collections.emptyList(); + } + + @Override + public boolean ack(QueueDetail q, RqueueMessage m) { + return true; + } + + @Override + public boolean nack(QueueDetail q, RqueueMessage m, long retryDelayMs) { + nackCalls++; + lastNacked = m; + lastRetryDelayMs = retryDelayMs; + return true; + } + + @Override + public long moveExpired(QueueDetail q, long now, int batch) { + return 0; + } + + @Override + public List peek(QueueDetail q, long offset, long count) { + return Collections.emptyList(); + } + + @Override + public long size(QueueDetail q) { + if (failSize) { + throw new IllegalStateException("backend down"); + } + return 42L; + } + + @Override + public AutoCloseable subscribe(String channel, Consumer handler) { + return () -> {}; + } + + @Override + public void publish(String channel, String payload) {} + + @Override + public Capabilities capabilities() { + return Capabilities.REDIS_DEFAULTS; + } + } +} diff --git a/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RetryPolicyTest.java b/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RetryPolicyTest.java new file mode 100644 index 00000000..0eb47e6f --- /dev/null +++ b/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RetryPolicyTest.java @@ -0,0 +1,78 @@ +/* + * Copyright (c) 2026 Sonu Kumar + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + */ + +package com.github.sonus21.rqueue.listener; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.doReturn; + +import com.github.sonus21.rqueue.CoreUnitTest; +import com.github.sonus21.rqueue.config.RqueueConfig; +import com.github.sonus21.rqueue.core.RqueueMessage; +import com.github.sonus21.rqueue.utils.TestUtils; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +@CoreUnitTest +class RetryPolicyTest { + + private final QueueDetail queueDetail = TestUtils.createQueueDetail("queue", 2, 900000L, null); + + @Test + void retryCountForPollUsesRemainingRetryBudget() { + RqueueConfig rqueueConfig = Mockito.mock(RqueueConfig.class); + doReturn(100).when(rqueueConfig).getRetryPerPoll(); + RqueueMessage rqueueMessage = new RqueueMessage(); + + assertEquals(1, RetryPolicy.retryCountForPoll(rqueueConfig, rqueueMessage, queueDetail, 1)); + } + + @Test + void retryCountForPollKeepsExplicitMessageRetryCount() { + RqueueConfig rqueueConfig = Mockito.mock(RqueueConfig.class); + doReturn(-1).when(rqueueConfig).getRetryPerPoll(); + RqueueMessage rqueueMessage = RqueueMessage.builder().retryCount(1000).build(); + + assertEquals(999, RetryPolicy.retryCountForPoll(rqueueConfig, rqueueMessage, queueDetail, 1)); + } + + @Test + void isExhaustedUsesEffectiveRetryCount() { + RqueueMessage rqueueMessage = new RqueueMessage(); + + assertFalse(RetryPolicy.isExhausted(rqueueMessage, queueDetail, 1)); + assertTrue(RetryPolicy.isExhausted(rqueueMessage, queueDetail, 2)); + } + + @Test + void retryForeverSentinelUsesFiniteLimit() { + QueueDetail retryForeverQueue = + TestUtils.createQueueDetail("queue", Integer.MAX_VALUE, 900000L, null); + RqueueMessage rqueueMessage = new RqueueMessage(); + + assertEquals( + RetryPolicy.UNLIMITED_RETRY_LIMIT, + RetryPolicy.maxRetryCount(rqueueMessage, retryForeverQueue)); + assertEquals( + 1, + RetryPolicy.remainingRetryCount( + rqueueMessage, retryForeverQueue, RetryPolicy.UNLIMITED_RETRY_LIMIT - 1)); + assertTrue(RetryPolicy.isExhausted( + rqueueMessage, retryForeverQueue, RetryPolicy.UNLIMITED_RETRY_LIMIT)); + } +} diff --git a/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainerBrokerBranchTest.java b/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainerBrokerBranchTest.java index 3b4ebe34..1e0f1ea5 100644 --- a/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainerBrokerBranchTest.java +++ b/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainerBrokerBranchTest.java @@ -15,6 +15,7 @@ */ package com.github.sonus21.rqueue.listener; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -108,6 +109,7 @@ static class CountingBroker implements MessageBroker, AutoCloseable { final AtomicInteger popCalls = new AtomicInteger(); final AtomicBoolean closed = new AtomicBoolean(); volatile Duration lastWait; + volatile Duration pollWait; private final Capabilities caps; CountingBroker(Capabilities caps) { @@ -120,6 +122,11 @@ public void enqueue(QueueDetail q, RqueueMessage m) {} @Override public void enqueueWithDelay(QueueDetail q, RqueueMessage m, long delayMs) {} + @Override + public Duration getPollWait(Duration pollingInterval) { + return pollWait != null ? pollWait : MessageBroker.super.getPollWait(pollingInterval); + } + @Override public List pop(QueueDetail q, String consumerName, int batch, Duration wait) { popCalls.incrementAndGet(); @@ -221,6 +228,24 @@ void redisDefaultsBrokerAlsoUsesNormalStartQueuePath() throws Exception { } } + @Test + void startDoesNotStartQueuesAgainWhenContainerIsAlreadyRunning() throws Exception { + EndpointRegistry.delete(); + CountingBroker broker = new CountingBroker(Capabilities.REDIS_DEFAULTS); + TrackingContainer container = new TrackingContainer(messageHandler); + container.setMessageBroker(broker); + container.afterPropertiesSet(); + try { + container.start(); + container.start(); + + assertEquals(1, container.startQueueCalls.get() + container.startGroupCalls.get()); + } finally { + container.stop(); + container.destroy(); + } + } + @Test void pollerForwardsPollingIntervalAsBrokerFetchWait() throws Exception { EndpointRegistry.delete(); @@ -248,15 +273,41 @@ void pollerForwardsPollingIntervalAsBrokerFetchWait() throws Exception { Duration wait = broker.lastWait; assertNotNull(wait, "broker should have received a wait duration"); assertFalse(wait.isZero(), "wait must not be Duration.ZERO; should match pollingInterval"); - assertTrue( - wait.toMillis() == pollingInterval, - "wait should equal the configured pollingInterval (got " + wait + ")"); + assertEquals(Duration.ofMillis(pollingInterval), wait); + } + + @Test + void pollerUsesBrokerResolvedFetchWait() throws Exception { + EndpointRegistry.delete(); + CountingBroker broker = + new CountingBroker(new Capabilities(true, false, false, true, true, true)); + broker.pollWait = Duration.ofSeconds(2); + RqueueMessageListenerContainer container = + new RqueueMessageListenerContainer(messageHandler, rqueueMessageTemplate); + container.rqueueBeanProvider = beanProvider; + container.setMessageBroker(broker); + container.setPollingInterval(137L); + container.afterPropertiesSet(); + container.start(); + try { + long deadline = System.currentTimeMillis() + 2000; + while (broker.popCalls.get() == 0 && System.currentTimeMillis() < deadline) { + Thread.sleep(20); + } + } finally { + container.stop(); + container.destroy(); + } + assertTrue(broker.popCalls.get() > 0, "poller should have issued at least one pop call"); + assertEquals(broker.pollWait, broker.lastWait); } private class TrackingContainer extends RqueueMessageListenerContainer { final AtomicBoolean startBrokerPollersCalled = new AtomicBoolean(); final AtomicBoolean startQueueCalled = new AtomicBoolean(); final AtomicBoolean startGroupCalled = new AtomicBoolean(); + final AtomicInteger startQueueCalls = new AtomicInteger(); + final AtomicInteger startGroupCalls = new AtomicInteger(); TrackingContainer(RqueueMessageHandler handler) { super(handler, rqueueMessageTemplate); @@ -266,12 +317,14 @@ private class TrackingContainer extends RqueueMessageListenerContainer { @Override protected void startQueue(String pollerKey, QueueDetail queueDetail) { startQueueCalled.set(true); + startQueueCalls.incrementAndGet(); // Do not actually start the poller; it would need a real broker. } @Override protected void startGroup(String groupName, List queueDetails) { startGroupCalled.set(true); + startGroupCalls.incrementAndGet(); } } } diff --git a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/js/JetStreamMessageBroker.java b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/js/JetStreamMessageBroker.java index 688841f1..4e661ff2 100644 --- a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/js/JetStreamMessageBroker.java +++ b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/js/JetStreamMessageBroker.java @@ -100,10 +100,7 @@ public class JetStreamMessageBroker implements MessageBroker, AutoCloseable { /** * Lower bound for fetch wait when the caller passes a non-positive duration. JetStream rejects - * zero on a pull fetch, so any zero/negative wait is rounded up to this minimum. Callers that - * want long-poll semantics should pass the desired wait explicitly (e.g. the listener - * container's {@code pollingInterval}); this constant only guards against accidental zero waits - * from non-listener callers. + * zero on a pull fetch, so any zero/negative wait is rounded up to this minimum. */ private static final Duration MIN_FETCH_WAIT = Duration.ofMillis(50); @@ -463,6 +460,12 @@ public Mono enqueueWithDelayReactive(QueueDetail q, RqueueMessage m, long .then(); } + @Override + public Duration getPollWait(Duration pollingInterval) { + Duration fetchWait = config == null ? null : config.getDefaultFetchWait(); + return fetchWait != null ? fetchWait : MessageBroker.super.getPollWait(pollingInterval); + } + @Override public List pop(QueueDetail q, String consumerName, int batch, Duration wait) { return popInternal( @@ -533,10 +536,10 @@ private List popInternal( Duration wait, Duration ackWait, long maxDeliver) { - // Honour the caller-supplied wait — this is the listener container's pollingInterval for - // RqueueMessagePoller, and lets JetStream long-poll instead of the broker firing a steady - // stream of $JS.API.CONSUMER.MSG.NEXT requests. Only fall back when the caller didn't - // express a preference; zero/negative waits are rounded up to the JetStream minimum. + // Honour the caller-supplied wait. Listener pollers obtain this from getPollWait(), so NATS + // can use rqueue.nats.consumer.fetch-wait for long polling while keeping the listener + // pollingInterval as its idle sleep. Only fall back when the caller didn't express a + // preference; zero/negative waits are rounded up to the JetStream minimum. Duration fetchWait; if (wait == null) { fetchWait = config.getDefaultFetchWait(); diff --git a/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerUnitTest.java b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerUnitTest.java index 2aa9a912..5c1be1b3 100644 --- a/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerUnitTest.java +++ b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerUnitTest.java @@ -13,9 +13,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -25,7 +27,11 @@ import com.github.sonus21.rqueue.core.RqueueMessage; import com.github.sonus21.rqueue.core.spi.Capabilities; +import com.github.sonus21.rqueue.core.spi.SubscriberView; +import com.github.sonus21.rqueue.enums.QueueType; import com.github.sonus21.rqueue.listener.QueueDetail; +import com.github.sonus21.rqueue.models.enums.DataType; +import com.github.sonus21.rqueue.models.enums.NavTab; import com.github.sonus21.rqueue.nats.internal.NatsProvisioner; import com.github.sonus21.rqueue.nats.js.JetStreamMessageBroker; import com.github.sonus21.rqueue.serdes.RqJacksonSerDes; @@ -35,10 +41,21 @@ import io.nats.client.JetStream; import io.nats.client.JetStreamApiException; import io.nats.client.JetStreamManagement; +import io.nats.client.JetStreamSubscription; +import io.nats.client.Message; import io.nats.client.MessageHandler; +import io.nats.client.PullSubscribeOptions; +import io.nats.client.api.ConsumerInfo; import io.nats.client.api.PublishAck; +import io.nats.client.api.RetentionPolicy; +import io.nats.client.api.SequenceInfo; +import io.nats.client.api.StreamConfiguration; +import io.nats.client.api.StreamInfo; +import io.nats.client.api.StreamState; import io.nats.client.impl.Headers; import java.io.IOException; +import java.time.Duration; +import java.util.List; import java.util.concurrent.CompletableFuture; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; @@ -55,9 +72,52 @@ class JetStreamMessageBrokerUnitTest { private static QueueDetail queueNamed(String name) { QueueDetail q = mock(QueueDetail.class); when(q.getName()).thenReturn(name); + when(q.getType()).thenReturn(QueueType.QUEUE); + when(q.resolvedConsumerName()).thenReturn(name + "-consumer"); return q; } + private static QueueDetail queueNamed( + String name, long visibilityTimeout, int numRetry, String consumerName) { + QueueDetail q = queueNamed(name); + when(q.getVisibilityTimeout()).thenReturn(visibilityTimeout); + when(q.getNumRetry()).thenReturn(numRetry); + when(q.resolvedConsumerName()).thenReturn(consumerName); + return q; + } + + private static StreamInfo streamInfo(long firstSeq, long lastSeq, long msgCount) { + return streamInfo(firstSeq, lastSeq, msgCount, RetentionPolicy.WorkQueue); + } + + private static StreamInfo streamInfo( + long firstSeq, long lastSeq, long msgCount, RetentionPolicy retentionPolicy) { + StreamState state = mock(StreamState.class); + when(state.getFirstSequence()).thenReturn(firstSeq); + when(state.getLastSequence()).thenReturn(lastSeq); + when(state.getMsgCount()).thenReturn(msgCount); + StreamConfiguration configuration = mock(StreamConfiguration.class); + when(configuration.getRetentionPolicy()).thenReturn(retentionPolicy); + StreamInfo info = mock(StreamInfo.class); + when(info.getStreamState()).thenReturn(state); + when(info.getConfiguration()).thenReturn(configuration); + return info; + } + + private static ConsumerInfo consumerInfo( + long pending, long ackPending, long ackFloorSequence, long deliveredSequence) { + ConsumerInfo info = mock(ConsumerInfo.class); + when(info.getNumPending()).thenReturn(pending); + when(info.getNumAckPending()).thenReturn(ackPending); + SequenceInfo ackFloor = mock(SequenceInfo.class); + when(ackFloor.getStreamSequence()).thenReturn(ackFloorSequence); + when(info.getAckFloor()).thenReturn(ackFloor); + SequenceInfo delivered = mock(SequenceInfo.class); + when(delivered.getStreamSequence()).thenReturn(deliveredSequence); + when(info.getDelivered()).thenReturn(delivered); + return info; + } + /** Build a broker with all NATS primitives mocked and stream provisioning short-circuited. */ private static Fixture newFixture(RqueueNatsConfig config) { return newFixture(config, false); @@ -79,7 +139,7 @@ private static Fixture newFixture(RqueueNatsConfig config, boolean schedulingSup config, new RqJacksonSerDes(SerializationUtils.getObjectMapper()), provisioner); - return new Fixture(conn, js, jsm, broker); + return new Fixture(conn, js, jsm, provisioner, broker); } @Test @@ -92,6 +152,78 @@ void enqueue_publishesToPrefixedSubject() throws Exception { verify(f.js, times(1)).publish(eq("rqueue.js.orders"), any(Headers.class), any(byte[].class)); } + @Test + void getPollWait_usesConfiguredFetchWait() { + RqueueNatsConfig cfg = RqueueNatsConfig.defaults().setDefaultFetchWait(Duration.ofSeconds(9)); + Fixture f = newFixture(cfg); + + assertEquals(Duration.ofSeconds(9), f.broker.getPollWait(Duration.ofMillis(137))); + } + + @Test + void getPollWait_fallsBackToPollingIntervalWhenFetchWaitIsUnset() { + RqueueNatsConfig cfg = RqueueNatsConfig.defaults().setDefaultFetchWait(null); + Fixture f = newFixture(cfg); + Duration pollingInterval = Duration.ofMillis(137); + + assertEquals(pollingInterval, f.broker.getPollWait(pollingInterval)); + } + + @Test + void popPriorityBindsConsumerWithQueueRetrySettingsAndTracksInFlightMessage() throws Exception { + Fixture f = newFixture(RqueueNatsConfig.defaults()); + QueueDetail q = queueNamed("orders", 2_500L, 4, "worker-a"); + JetStreamSubscription subscription = mock(JetStreamSubscription.class); + when(f.provisioner.ensureConsumer( + eq("rqueue-js-orders_high"), + eq("worker-a"), + eq("rqueue.js.orders_high"), + eq(Duration.ofMillis(2_500L)), + eq(5L), + anyLong())) + .thenReturn("worker-a"); + when(f.js.subscribe(eq(null), any(PullSubscribeOptions.class))).thenReturn(subscription); + RqueueMessage message = RqueueMessage.builder().id("m1").message("payload").build(); + Message natsMessage = mock(Message.class); + when(natsMessage.getData()) + .thenReturn(new RqJacksonSerDes(SerializationUtils.getObjectMapper()).serialize(message)); + when(subscription.fetch(eq(1), eq(Duration.ofMillis(50L)))).thenReturn(List.of(natsMessage)); + + List messages = f.broker.pop(q, "high", "worker-a", 1, Duration.ZERO); + + assertEquals(1, messages.size()); + assertEquals("m1", messages.get(0).getId()); + assertTrue(f.broker.extendVisibilityTimeout(q, messages.get(0), 1_000L)); + verify(natsMessage, times(1)).inProgress(); + assertTrue(f.broker.nack(q, messages.get(0), -1L)); + verify(natsMessage, times(1)).nakWithDelay(Duration.ZERO); + assertFalse(f.broker.ack(q, messages.get(0))); + } + + @Test + void popNaksUndeserializablePayloadAndReturnsRemainingMessages() throws Exception { + Fixture f = newFixture(RqueueNatsConfig.defaults()); + JetStreamSubscription subscription = mock(JetStreamSubscription.class); + when(f.provisioner.ensureConsumer( + eq("rqueue-js-orders"), + eq("orders-consumer"), + eq("rqueue.js.orders"), + any(Duration.class), + anyLong(), + anyLong())) + .thenReturn("orders-consumer"); + when(f.js.subscribe(eq(null), any(PullSubscribeOptions.class))).thenReturn(subscription); + Message badMessage = mock(Message.class); + when(badMessage.getData()).thenReturn("not-json".getBytes(UTF_8)); + when(subscription.fetch(eq(1), eq(Duration.ofMillis(10L)))).thenReturn(List.of(badMessage)); + + List messages = + f.broker.pop(queueNamed("orders"), "orders-consumer", 1, Duration.ofMillis(10L)); + + assertTrue(messages.isEmpty()); + verify(badMessage, times(1)).nak(); + } + @Test void enqueueWithPriority_appendsPrioritySuffixToSubject() throws Exception { Fixture f = newFixture(RqueueNatsConfig.defaults()); @@ -172,6 +304,77 @@ void subscribe_createsDispatcherAndSubscribesChannel_closeReleasesIt() throws Ex verify(f.conn, times(1)).closeDispatcher(d); } + @Test + void sizeUsesStreamCountForWorkQueueAndApproximatesLimitsFromSlowestConsumer() throws Exception { + Fixture f = newFixture(RqueueNatsConfig.defaults()); + StreamInfo workInfo = streamInfo(1L, 9L, 7L); + when(f.jsm.getStreamInfo("rqueue-js-work")).thenReturn(workInfo); + assertEquals(7L, f.broker.size(queueNamed("work"))); + assertFalse(f.broker.isSizeApproximate(queueNamed("work"))); + + StreamInfo fanoutInfo = streamInfo(1L, 10L, 10L, RetentionPolicy.Limits); + ConsumerInfo fastConsumer = consumerInfo(1L, 0L, 8L, 9L); + ConsumerInfo slowConsumer = consumerInfo(5L, 2L, 3L, 4L); + when(f.jsm.getStreamInfo("rqueue-js-fanout")).thenReturn(fanoutInfo); + when(f.jsm.getConsumerNames("rqueue-js-fanout")).thenReturn(List.of("fast", "slow")); + when(f.jsm.getConsumerInfo("rqueue-js-fanout", "fast")).thenReturn(fastConsumer); + when(f.jsm.getConsumerInfo("rqueue-js-fanout", "slow")).thenReturn(slowConsumer); + + assertEquals(7L, f.broker.size(queueNamed("fanout"))); + assertTrue(f.broker.isSizeApproximate(queueNamed("fanout"))); + } + + @Test + void subscribersExposeSharedPendingForWorkQueueAndPerConsumerPendingForLimits() throws Exception { + Fixture f = newFixture(RqueueNatsConfig.defaults()); + StreamInfo workInfo = streamInfo(1L, 4L, 4L); + ConsumerInfo workConsumer = consumerInfo(12L, 3L, 1L, 1L); + when(f.jsm.getStreamInfo("rqueue-js-work")).thenReturn(workInfo); + when(f.jsm.getConsumerNames("rqueue-js-work")).thenReturn(List.of("a")); + when(f.jsm.getConsumerInfo("rqueue-js-work", "a")).thenReturn(workConsumer); + + List workSubscribers = f.broker.subscribers(queueNamed("work")); + + assertEquals(1, workSubscribers.size()); + assertEquals("a", workSubscribers.get(0).consumerName()); + assertEquals(4L, workSubscribers.get(0).pending()); + assertEquals(3L, workSubscribers.get(0).inFlight()); + assertTrue(workSubscribers.get(0).pendingShared()); + + StreamInfo fanoutInfo = streamInfo(1L, 6L, 6L, RetentionPolicy.Limits); + ConsumerInfo fanoutA = consumerInfo(2L, 1L, 3L, 4L); + ConsumerInfo fanoutB = consumerInfo(5L, 0L, 1L, 1L); + when(f.jsm.getStreamInfo("rqueue-js-fanout")).thenReturn(fanoutInfo); + when(f.jsm.getConsumerNames("rqueue-js-fanout")).thenReturn(List.of("a", "b")); + when(f.jsm.getConsumerInfo("rqueue-js-fanout", "a")).thenReturn(fanoutA); + when(f.jsm.getConsumerInfo("rqueue-js-fanout", "b")).thenReturn(fanoutB); + + List fanoutSubscribers = f.broker.subscribers(queueNamed("fanout")); + + assertEquals(2, fanoutSubscribers.size()); + assertEquals(2L, fanoutSubscribers.get(0).pending()); + assertEquals(1L, fanoutSubscribers.get(0).inFlight()); + assertFalse(fanoutSubscribers.get(0).pendingShared()); + assertEquals(5L, fanoutSubscribers.get(1).pending()); + } + + @Test + void storageDisplayAndDataTypeLabelsUseNatsTerminology() { + Fixture f = newFixture(RqueueNatsConfig.defaults()); + QueueDetail q = queueNamed("orders"); + + assertEquals("NATS", f.broker.storageKicker()); + assertTrue(f.broker.storageDescription().contains("NATS JetStream")); + assertEquals("rqueue-js-orders", f.broker.storageDisplayName(q)); + assertEquals("rqueue-js-orders-dlq", f.broker.dlqStorageDisplayName(q)); + assertEquals("Queue (Stream)", f.broker.dataTypeLabel(NavTab.PENDING, DataType.LIST)); + assertEquals("Dead Letter (Stream)", f.broker.dataTypeLabel(NavTab.DEAD, DataType.ZSET)); + assertEquals("Completed (KV)", f.broker.dataTypeLabel(NavTab.COMPLETED, DataType.SET)); + assertEquals("Stream", f.broker.dataTypeLabel(NavTab.RUNNING, DataType.ZSET)); + assertEquals("Stream", f.broker.dataTypeLabel(null, DataType.LIST)); + assertNull(f.broker.dataTypeLabel(null, null)); + } + @Test void enqueueReactive_completesWhenPublishFutureCompletes() { Fixture f = newFixture(RqueueNatsConfig.defaults()); @@ -344,12 +547,19 @@ private static final class Fixture { final Connection conn; final JetStream js; final JetStreamManagement jsm; + final NatsProvisioner provisioner; final JetStreamMessageBroker broker; - Fixture(Connection conn, JetStream js, JetStreamManagement jsm, JetStreamMessageBroker broker) { + Fixture( + Connection conn, + JetStream js, + JetStreamManagement jsm, + NatsProvisioner provisioner, + JetStreamMessageBroker broker) { this.conn = conn; this.js = js; this.jsm = jsm; + this.provisioner = provisioner; this.broker = broker; } } diff --git a/rqueue-spring-boot-starter/src/main/java/com/github/sonus21/rqueue/spring/boot/RqueueAutoStartupLifecycle.java b/rqueue-spring-boot-starter/src/main/java/com/github/sonus21/rqueue/spring/boot/RqueueAutoStartupLifecycle.java new file mode 100644 index 00000000..449aad45 --- /dev/null +++ b/rqueue-spring-boot-starter/src/main/java/com/github/sonus21/rqueue/spring/boot/RqueueAutoStartupLifecycle.java @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2026 Sonu Kumar + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + */ + +package com.github.sonus21.rqueue.spring.boot; + +import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.config.BeanPostProcessor; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.ApplicationListener; + +/** + * Delays Rqueue poller startup in Boot web applications until the servlet or reactive web server + * is ready and Spring Boot has published {@link ApplicationReadyEvent}. + */ +public class RqueueAutoStartupLifecycle + implements BeanPostProcessor, ApplicationListener { + + private final Set delayedContainers = + ConcurrentHashMap.newKeySet(); + + public void delayAutoStartup(RqueueMessageListenerContainer container) { + container.setAutoStartup(false); + delayedContainers.add(container); + } + + @Override + public Object postProcessBeforeInitialization(Object bean, String beanName) + throws BeansException { + if (bean instanceof RqueueMessageListenerContainer) { + RqueueMessageListenerContainer container = (RqueueMessageListenerContainer) bean; + if (container.isAutoStartup()) { + delayAutoStartup(container); + } + } + return bean; + } + + @Override + public void onApplicationEvent(ApplicationReadyEvent event) { + for (RqueueMessageListenerContainer container : delayedContainers) { + if (!container.isRunning()) { + container.start(); + } + } + } +} diff --git a/rqueue-spring-boot-starter/src/main/java/com/github/sonus21/rqueue/spring/boot/RqueueListenerAutoConfig.java b/rqueue-spring-boot-starter/src/main/java/com/github/sonus21/rqueue/spring/boot/RqueueListenerAutoConfig.java index 9ac5c4fc..863d9d5a 100644 --- a/rqueue-spring-boot-starter/src/main/java/com/github/sonus21/rqueue/spring/boot/RqueueListenerAutoConfig.java +++ b/rqueue-spring-boot-starter/src/main/java/com/github/sonus21/rqueue/spring/boot/RqueueListenerAutoConfig.java @@ -33,8 +33,11 @@ import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer; import com.github.sonus21.rqueue.utils.condition.ReactiveEnabled; import com.github.sonus21.rqueue.utils.condition.RqueueEnabled; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.config.BeanDefinition; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication; import org.springframework.boot.data.redis.autoconfigure.DataRedisAutoConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; @@ -42,6 +45,7 @@ import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.DependsOn; import org.springframework.context.annotation.Import; +import org.springframework.context.annotation.Role; @Configuration @AutoConfigureAfter(DataRedisAutoConfiguration.class) @@ -54,6 +58,16 @@ @Import(RqueueRedisConfigImportSelector.class) public class RqueueListenerAutoConfig extends RqueueListenerBaseConfig { + @Autowired(required = false) + private RqueueAutoStartupLifecycle rqueueAutoStartupLifecycle; + + @Bean + @Role(BeanDefinition.ROLE_INFRASTRUCTURE) + @ConditionalOnWebApplication + public static RqueueAutoStartupLifecycle rqueueAutoStartupLifecycle() { + return new RqueueAutoStartupLifecycle(); + } + @Bean @ConditionalOnMissingBean public RqueueMessageHandler rqueueMessageHandler(MessageBroker messageBroker) { @@ -73,7 +87,23 @@ public RqueueMessageListenerContainer rqueueMessageListenerContainer( if (simpleRqueueListenerContainerFactory.getMessageBroker() == null) { simpleRqueueListenerContainerFactory.setMessageBroker(messageBroker); } - return simpleRqueueListenerContainerFactory.createMessageListenerContainer(); + boolean delayAutoStartup = + rqueueAutoStartupLifecycle != null && simpleRqueueListenerContainerFactory.getAutoStartup(); + if (delayAutoStartup) { + simpleRqueueListenerContainerFactory.setAutoStartup(false); + } + RqueueMessageListenerContainer container; + try { + container = simpleRqueueListenerContainerFactory.createMessageListenerContainer(); + } finally { + if (delayAutoStartup) { + simpleRqueueListenerContainerFactory.setAutoStartup(true); + } + } + if (delayAutoStartup) { + rqueueAutoStartupLifecycle.delayAutoStartup(container); + } + return container; } @Bean diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsBackendEndToEndIT.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsBackendEndToEndIT.java deleted file mode 100644 index cadadf01..00000000 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsBackendEndToEndIT.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Copyright (c) 2026 Sonu Kumar - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * You may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and limitations under the License. - * - */ -package com.github.sonus21.rqueue.spring.boot.integration; - -import static org.assertj.core.api.Assertions.assertThat; - -import com.github.sonus21.rqueue.annotation.RqueueListener; -import com.github.sonus21.rqueue.core.RqueueMessageEnqueuer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.data.redis.autoconfigure.DataRedisAutoConfiguration; -import org.springframework.boot.data.redis.autoconfigure.DataRedisReactiveAutoConfiguration; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.context.annotation.Import; -import org.springframework.stereotype.Component; - -/** - * End-to-end integration test wiring a Spring Boot application against a NATS JetStream - * instance via {@code rqueue.backend=nats}, an {@link RqueueListener}, and the default - * {@link RqueueMessageEnqueuer}. It exercises the full intended path: - * - *
- *   Enqueue -> JetStreamMessageBroker.enqueue -> JetStream stream
- *           -> BrokerMessagePoller.pop -> @RqueueListener invocation -> broker.ack
- * 
- * - *

The NATS instance is supplied by {@link AbstractNatsBootIT}: when {@code NATS_RUNNING=true} - * (CI), the test connects to a locally running nats-server; otherwise it falls back to a - * Testcontainers-managed container, which itself skips gracefully without Docker. - * - *

Boots without any Redis at all: every Redis-shaped bean (config DAOs, dashboard controllers, - * pub/sub channel, schedulers) is gated by {@code @Conditional(RedisBackendCondition.class)} and - * stays out of the context when {@code rqueue.backend=nats}. {@code DataRedisAutoConfiguration} - * is excluded so Spring Boot doesn't try to wire a Lettuce client either. - */ -@SpringBootTest( - classes = NatsBackendEndToEndIT.TestApp.class, - properties = { - "rqueue.backend=nats", - "rqueue.nats.naming.stream-prefix=" + NatsBackendEndToEndIT.STREAM_PREFIX, - "rqueue.nats.naming.subject-prefix=" + NatsBackendEndToEndIT.SUBJECT_PREFIX - }) -@Tag("nats") -class NatsBackendEndToEndIT extends AbstractNatsBootIT { - - static final String STREAM_PREFIX = "rqueue-js-backendE2E-"; - static final String SUBJECT_PREFIX = "rqueue.js.backendE2E."; - - @BeforeAll - static void wipeOwnedStreams() { - deleteStreamsWithPrefix(STREAM_PREFIX); - } - - @Autowired - RqueueMessageEnqueuer enqueuer; - - @Autowired - TestListener listener; - - @Test - void enqueueIsReceivedByListener() throws Exception { - for (int i = 0; i < 5; i++) { - enqueuer.enqueue("e2e-test", "payload-" + i); - } - assertThat(listener.latch.await(20, TimeUnit.SECONDS)).isTrue(); - assertThat(listener.received) - .containsExactlyInAnyOrder("payload-0", "payload-1", "payload-2", "payload-3", "payload-4"); - } - - @SpringBootApplication( - exclude = {DataRedisAutoConfiguration.class, DataRedisReactiveAutoConfiguration.class}) - @Import(TestListener.class) - static class TestApp {} - - @Component - static class TestListener { - final CountDownLatch latch = new CountDownLatch(5); - final List received = Collections.synchronizedList(new ArrayList<>()); - - @RqueueListener(value = "e2e-test") - void onMessage(String payload) { - received.add(payload); - latch.countDown(); - } - } -} diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsConcurrencyE2EIT.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsConcurrencyE2EIT.java deleted file mode 100644 index 6076017d..00000000 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsConcurrencyE2EIT.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Copyright (c) 2026 Sonu Kumar - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * You may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and limitations under the License. - * - */ -package com.github.sonus21.rqueue.spring.boot.integration; - -import static org.assertj.core.api.Assertions.assertThat; - -import com.github.sonus21.rqueue.annotation.RqueueListener; -import com.github.sonus21.rqueue.core.RqueueMessageEnqueuer; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.data.redis.autoconfigure.DataRedisAutoConfiguration; -import org.springframework.boot.data.redis.autoconfigure.DataRedisReactiveAutoConfiguration; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.context.annotation.Import; -import org.springframework.stereotype.Component; - -/** - * End-to-end test confirming that {@code @RqueueListener(concurrency=...)} actually runs more - * than one handler invocation in parallel against the NATS backend. We don't assert an exact - * parallelism value because JetStream prefetch + thread scheduling makes that flaky; observing - * any parallelism > 1 is enough proof the concurrency knob is wired through to a pull - * subscription with multiple poller threads. - */ -@SpringBootTest( - classes = NatsConcurrencyE2EIT.TestApp.class, - properties = { - "rqueue.backend=nats", - "rqueue.nats.naming.stream-prefix=" + NatsConcurrencyE2EIT.STREAM_PREFIX, - "rqueue.nats.naming.subject-prefix=" + NatsConcurrencyE2EIT.SUBJECT_PREFIX - }) -@Tag("nats") -class NatsConcurrencyE2EIT extends AbstractNatsBootIT { - - static final String STREAM_PREFIX = "rqueue-js-concurrencyE2E-"; - static final String SUBJECT_PREFIX = "rqueue.js.concurrencyE2E."; - - @BeforeAll - static void wipeOwnedStreams() { - deleteStreamsWithPrefix(STREAM_PREFIX); - } - - @Autowired - RqueueMessageEnqueuer enqueuer; - - @Autowired - ConcurrencyListener listener; - - @Test - void parallelInvocationsAreObserved() throws Exception { - for (int i = 0; i < 30; i++) { - enqueuer.enqueue("conc-e2e", "msg-" + i); - } - assertThat(listener.latch.await(45, TimeUnit.SECONDS)).isTrue(); - assertThat(listener.maxParallel.get()) - .as("at least 2 concurrent invocations should have been observed") - .isGreaterThanOrEqualTo(2); - } - - @SpringBootApplication( - exclude = {DataRedisAutoConfiguration.class, DataRedisReactiveAutoConfiguration.class}) - @Import(ConcurrencyListener.class) - static class TestApp {} - - @Component - static class ConcurrencyListener { - final CountDownLatch latch = new CountDownLatch(30); - final AtomicInteger active = new AtomicInteger(); - final AtomicInteger maxParallel = new AtomicInteger(); - - @RqueueListener(value = "conc-e2e", concurrency = "3") - void onMessage(String payload) throws InterruptedException { - int now = active.incrementAndGet(); - maxParallel.updateAndGet(curr -> Math.max(curr, now)); - try { - Thread.sleep(200L); - } finally { - active.decrementAndGet(); - latch.countDown(); - } - } - } -} diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsGlobalRetryLimitE2EIT.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsGlobalRetryLimitE2EIT.java deleted file mode 100644 index 918cfa61..00000000 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsGlobalRetryLimitE2EIT.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Copyright (c) 2026 Sonu Kumar - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * You may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and limitations under the License. - * - */ -package com.github.sonus21.rqueue.spring.boot.integration; - -import static org.assertj.core.api.Assertions.assertThat; - -import com.github.sonus21.rqueue.annotation.RqueueListener; -import com.github.sonus21.rqueue.core.RqueueMessageEnqueuer; -import io.nats.client.JetStreamManagement; -import java.time.Duration; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import org.awaitility.Awaitility; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.data.redis.autoconfigure.DataRedisAutoConfiguration; -import org.springframework.boot.data.redis.autoconfigure.DataRedisReactiveAutoConfiguration; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.context.annotation.Import; -import org.springframework.stereotype.Component; - -@SpringBootTest( - classes = NatsGlobalRetryLimitE2EIT.TestApp.class, - properties = { - "rqueue.backend=nats", - "rqueue.retry.max=2", - "rqueue.retry.per.poll=1000", - "rqueue.nats.naming.stream-prefix=" + NatsGlobalRetryLimitE2EIT.STREAM_PREFIX, - "rqueue.nats.naming.subject-prefix=" + NatsGlobalRetryLimitE2EIT.SUBJECT_PREFIX - }) -@Tag("nats") -class NatsGlobalRetryLimitE2EIT extends AbstractNatsBootIT { - - static final String QUEUE = "global-retry-nats"; - static final String STREAM_PREFIX = "rqueue-js-globalRetryE2E-"; - static final String SUBJECT_PREFIX = "rqueue.js.globalRetryE2E."; - - @BeforeAll - static void wipeOwnedStreams() { - deleteStreamsWithPrefix(STREAM_PREFIX); - } - - @Autowired - RqueueMessageEnqueuer enqueuer; - - @Autowired - JetStreamManagement jsm; - - @Autowired - FailingListener listener; - - @Test - void globalRetryLimitCapsImplicitRetryForever() throws Exception { - enqueuer.enqueue(QUEUE, "payload"); - - assertThat(listener.twoAttempts.await(20, TimeUnit.SECONDS)).isTrue(); - Awaitility.await() - .during(Duration.ofSeconds(2)) - .atMost(Duration.ofSeconds(3)) - .untilAsserted(() -> assertThat(listener.attempts).hasValue(2)); - - assertThat(jsm.getConsumerInfo(STREAM_PREFIX + QUEUE, QUEUE + "-consumer") - .getConsumerConfiguration() - .getMaxDeliver()) - .isEqualTo(3L); - } - - @SpringBootApplication( - exclude = {DataRedisAutoConfiguration.class, DataRedisReactiveAutoConfiguration.class}) - @Import(FailingListener.class) - static class TestApp {} - - @Component - static class FailingListener { - final AtomicInteger attempts = new AtomicInteger(); - final CountDownLatch twoAttempts = new CountDownLatch(2); - - @RqueueListener(value = QUEUE) - void onMessage(String ignored) { - attempts.incrementAndGet(); - twoAttempts.countDown(); - throw new IllegalStateException("force retry"); - } - } -} diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsPriorityQueuesE2EIT.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsPriorityQueuesE2EIT.java deleted file mode 100644 index a366a7f1..00000000 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsPriorityQueuesE2EIT.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Copyright (c) 2026 Sonu Kumar - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * You may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and limitations under the License. - * - */ -package com.github.sonus21.rqueue.spring.boot.integration; - -import static org.assertj.core.api.Assertions.assertThat; - -import com.github.sonus21.rqueue.annotation.RqueueListener; -import com.github.sonus21.rqueue.core.RqueueMessageEnqueuer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.data.redis.autoconfigure.DataRedisAutoConfiguration; -import org.springframework.boot.data.redis.autoconfigure.DataRedisReactiveAutoConfiguration; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.context.annotation.Import; -import org.springframework.stereotype.Component; - -/** - * Verifies queue-level priority on the NATS backend: a single listener with - * {@code priority="high=10,low=1"} consumes from two internal sub-queues - * ({@code pq_high} and {@code pq_low}) and the producer sends to each via - * {@link RqueueMessageEnqueuer#enqueueWithPriority}. We assert all 10 messages are - * received and that 5 messages with payload prefix "high-" and 5 with "low-" arrive. - */ -@SpringBootTest( - classes = NatsPriorityQueuesE2EIT.TestApp.class, - properties = { - "rqueue.backend=nats", - "rqueue.nats.naming.stream-prefix=" + NatsPriorityQueuesE2EIT.STREAM_PREFIX, - "rqueue.nats.naming.subject-prefix=" + NatsPriorityQueuesE2EIT.SUBJECT_PREFIX - }) -@Tag("nats") -class NatsPriorityQueuesE2EIT extends AbstractNatsBootIT { - - static final String STREAM_PREFIX = "rqueue-js-priorityE2E-"; - static final String SUBJECT_PREFIX = "rqueue.js.priorityE2E."; - - @BeforeAll - static void wipeOwnedStreams() { - deleteStreamsWithPrefix(STREAM_PREFIX); - } - - @Autowired - RqueueMessageEnqueuer enqueuer; - - @Autowired - PriorityListener listener; - - @Test - void messagesEnqueuedAtBothPrioritiesAreReceived() throws Exception { - for (int i = 0; i < 5; i++) { - enqueuer.enqueueWithPriority("pq", "high", "high-" + i); - enqueuer.enqueueWithPriority("pq", "low", "low-" + i); - } - assertThat(listener.latch.await(30, TimeUnit.SECONDS)).isTrue(); - - long highCount = - listener.received.stream().filter(s -> s.startsWith("high-")).count(); - long lowCount = listener.received.stream().filter(s -> s.startsWith("low-")).count(); - assertThat(highCount).isEqualTo(5); - assertThat(lowCount).isEqualTo(5); - } - - @SpringBootApplication( - exclude = {DataRedisAutoConfiguration.class, DataRedisReactiveAutoConfiguration.class}) - @Import(PriorityListener.class) - static class TestApp {} - - @Component - static class PriorityListener { - final CountDownLatch latch = new CountDownLatch(10); - final List received = Collections.synchronizedList(new ArrayList<>()); - - @RqueueListener(value = "pq", priority = "high=10,low=1") - void onMessage(String payload) { - received.add(payload); - latch.countDown(); - } - } -} diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsReactiveEnqueueE2EIT.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsReactiveEnqueueE2EIT.java deleted file mode 100644 index bb92cca0..00000000 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsReactiveEnqueueE2EIT.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Copyright (c) 2026 Sonu Kumar - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * You may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and limitations under the License. - * - */ -package com.github.sonus21.rqueue.spring.boot.integration; - -import static org.assertj.core.api.Assertions.assertThat; - -import com.github.sonus21.rqueue.annotation.RqueueListener; -import com.github.sonus21.rqueue.core.ReactiveRqueueMessageEnqueuer; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.data.redis.autoconfigure.DataRedisAutoConfiguration; -import org.springframework.boot.data.redis.autoconfigure.DataRedisReactiveAutoConfiguration; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.context.annotation.Import; -import org.springframework.stereotype.Component; -import reactor.core.publisher.Flux; - -/** - * Verifies the reactive producer path on the NATS backend: enqueueing 5 messages via - * {@link ReactiveRqueueMessageEnqueuer} (subscribed via {@link Flux#merge}) and confirming a - * synchronous {@code @RqueueListener} on the same queue receives all 5. - */ -@SpringBootTest( - classes = NatsReactiveEnqueueE2EIT.TestApp.class, - properties = { - "rqueue.backend=nats", - "rqueue.reactive.enabled=true", - "rqueue.nats.naming.stream-prefix=" + NatsReactiveEnqueueE2EIT.STREAM_PREFIX, - "rqueue.nats.naming.subject-prefix=" + NatsReactiveEnqueueE2EIT.SUBJECT_PREFIX - }) -@Tag("nats") -class NatsReactiveEnqueueE2EIT extends AbstractNatsBootIT { - - static final String STREAM_PREFIX = "rqueue-js-reactiveEnqE2E-"; - static final String SUBJECT_PREFIX = "rqueue.js.reactiveEnqE2E."; - - @BeforeAll - static void wipeOwnedStreams() { - deleteStreamsWithPrefix(STREAM_PREFIX); - } - - @Autowired - ReactiveRqueueMessageEnqueuer reactiveEnqueuer; - - @Autowired - ReactiveListener listener; - - @Test - void reactivelyEnqueuedMessagesAreReceivedByListener() throws Exception { - List> publishers = new ArrayList<>(); - for (int i = 0; i < 5; i++) { - publishers.add(reactiveEnqueuer.enqueue("reactive-e2e", "rx-" + i)); - } - List ids = Flux.merge(publishers).collectList().block(Duration.ofSeconds(15)); - assertThat(ids).hasSize(5); - - assertThat(listener.latch.await(20, TimeUnit.SECONDS)).isTrue(); - assertThat(listener.received).containsExactlyInAnyOrder("rx-0", "rx-1", "rx-2", "rx-3", "rx-4"); - } - - @SpringBootApplication( - exclude = {DataRedisAutoConfiguration.class, DataRedisReactiveAutoConfiguration.class}) - @Import(ReactiveListener.class) - static class TestApp {} - - @Component - static class ReactiveListener { - final CountDownLatch latch = new CountDownLatch(5); - final List received = Collections.synchronizedList(new ArrayList<>()); - - @RqueueListener(value = "reactive-e2e") - void onMessage(String payload) { - received.add(payload); - latch.countDown(); - } - } -} diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/BackendContractE2EIT.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/BackendContractE2EIT.java new file mode 100644 index 00000000..d23da3b1 --- /dev/null +++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/BackendContractE2EIT.java @@ -0,0 +1,280 @@ +/* + * Copyright (c) 2026 Sonu Kumar + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + */ +package com.github.sonus21.rqueue.spring.boot.tests.integration; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.github.sonus21.rqueue.annotation.RqueueListener; +import com.github.sonus21.rqueue.core.ReactiveRqueueMessageEnqueuer; +import com.github.sonus21.rqueue.core.RqueueMessageEnqueuer; +import com.github.sonus21.rqueue.spring.boot.tests.SpringBootIntegrationTest; +import com.github.sonus21.rqueue.test.application.BaseApplication; +import io.nats.client.Connection; +import io.nats.client.JetStreamApiException; +import io.nats.client.JetStreamManagement; +import io.nats.client.Nats; +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Locale; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.junit.jupiter.api.Assumptions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; +import org.testcontainers.DockerClientFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.utility.DockerImageName; +import reactor.core.publisher.Flux; + +@SpringBootTest(classes = BackendContractE2EIT.TestApp.class) +@SpringBootIntegrationTest +@Tag("nats") +class BackendContractE2EIT { + + private static final Logger log = Logger.getLogger(BackendContractE2EIT.class.getName()); + private static final String BACKEND = System.getProperty( + "rqueue.test.backend", System.getenv().getOrDefault("RQUEUE_TEST_BACKEND", "redis")) + .toLowerCase(Locale.ROOT); + private static final String STREAM_PREFIX = "rqueue-js-backendContract-"; + private static final String SUBJECT_PREFIX = "rqueue.js.backendContract."; + private static final String EXTERNAL_NATS_URL = + System.getenv().getOrDefault("NATS_URL", "nats://127.0.0.1:4222"); + private static final boolean USE_EXTERNAL_NATS = System.getenv("NATS_RUNNING") != null; + + private static GenericContainer nats; + + @Autowired + RqueueMessageEnqueuer enqueuer; + + @Autowired + ReactiveRqueueMessageEnqueuer reactiveEnqueuer; + + @Autowired + ContractListener listener; + + @BeforeAll + static void bootstrapBackend() { + if (isNatsBackend()) { + startNats(); + deleteStreamsWithPrefix(STREAM_PREFIX); + } + } + + @DynamicPropertySource + static void properties(DynamicPropertyRegistry registry) { + registry.add("rqueue.backend", () -> BACKEND); + registry.add("rqueue.reactive.enabled", () -> "true"); + if (isNatsBackend()) { + registry.add("rqueue.nats.connection.url", BackendContractE2EIT::activeNatsUrl); + registry.add("rqueue.nats.naming.stream-prefix", () -> STREAM_PREFIX); + registry.add("rqueue.nats.naming.subject-prefix", () -> SUBJECT_PREFIX); + } else { + registry.add("spring.data.redis.port", () -> "8028"); + registry.add("mysql.db.name", () -> "BackendContractE2EIT"); + registry.add("use.system.redis", () -> "false"); + } + } + + @BeforeEach + void resetListener() { + listener.reset(); + } + + @Test + void enqueuedMessagesAreReceivedByListener() throws Exception { + for (int i = 0; i < 5; i++) { + enqueuer.enqueue("contract-basic", "payload-" + i); + } + + assertThat(listener.basicLatch.await(20, TimeUnit.SECONDS)).isTrue(); + assertThat(listener.basicReceived) + .containsExactlyInAnyOrder("payload-0", "payload-1", "payload-2", "payload-3", "payload-4"); + } + + @Test + void reactivelyEnqueuedMessagesAreReceivedByListener() throws Exception { + List> publishers = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + publishers.add(reactiveEnqueuer.enqueue("contract-reactive", "rx-" + i)); + } + + List ids = Flux.merge(publishers).collectList().block(Duration.ofSeconds(15)); + assertThat(ids).hasSize(5).doesNotContainNull(); + assertThat(listener.reactiveLatch.await(20, TimeUnit.SECONDS)).isTrue(); + assertThat(listener.reactiveReceived) + .containsExactlyInAnyOrder("rx-0", "rx-1", "rx-2", "rx-3", "rx-4"); + } + + @Test + void concurrentListenerInvocationsAreObserved() throws Exception { + for (int i = 0; i < 30; i++) { + enqueuer.enqueue("contract-concurrency", "msg-" + i); + } + + assertThat(listener.concurrencyLatch.await(45, TimeUnit.SECONDS)).isTrue(); + assertThat(listener.maxParallel.get()).isGreaterThanOrEqualTo(2); + } + + @Test + void messagesEnqueuedAtBothPrioritiesAreReceived() throws Exception { + for (int i = 0; i < 5; i++) { + enqueuer.enqueueWithPriority("contract-priority", "high", "high-" + i); + enqueuer.enqueueWithPriority("contract-priority", "low", "low-" + i); + } + + assertThat(listener.priorityLatch.await(30, TimeUnit.SECONDS)).isTrue(); + assertThat(listener.priorityReceived.stream() + .filter(s -> s.startsWith("high-")) + .count()) + .isEqualTo(5); + assertThat( + listener.priorityReceived.stream().filter(s -> s.startsWith("low-")).count()) + .isEqualTo(5); + } + + private static boolean isNatsBackend() { + return "nats".equalsIgnoreCase(BACKEND); + } + + private static void startNats() { + if (!isNatsBackend() || USE_EXTERNAL_NATS || nats != null) { + return; + } + Assumptions.assumeTrue( + DockerClientFactory.instance().isDockerAvailable(), + "Skipping: Docker is not available and NATS_RUNNING is not set"); + nats = new GenericContainer<>(DockerImageName.parse("nats:2.12-alpine")) + .withCommand("-js") + .withExposedPorts(4222) + .waitingFor(Wait.forLogMessage(".*Server is ready.*\\n", 1)); + nats.start(); + Runtime.getRuntime().addShutdownHook(new Thread(nats::stop)); + } + + private static String activeNatsUrl() { + if (USE_EXTERNAL_NATS) { + return EXTERNAL_NATS_URL; + } + startNats(); + return "nats://" + nats.getHost() + ":" + nats.getMappedPort(4222); + } + + private static void deleteStreamsWithPrefix(String prefix) { + try (Connection c = Nats.connect(activeNatsUrl())) { + JetStreamManagement management = c.jetStreamManagement(); + List names = management.getStreamNames(); + if (names == null) { + return; + } + for (String name : names) { + if (name.startsWith(prefix)) { + management.deleteStream(name); + } + } + } catch (IOException | JetStreamApiException e) { + log.log(Level.WARNING, "Failed to clean NATS streams: " + e.getMessage()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.log(Level.WARNING, "Failed to clean NATS streams: " + e.getMessage()); + } + } + + @SpringBootApplication + @Import({ContractListener.class, RedisTestConfig.class}) + static class TestApp {} + + @Configuration + @ConditionalOnProperty(name = "rqueue.backend", havingValue = "redis", matchIfMissing = true) + static class RedisTestConfig extends BaseApplication {} + + static class ContractListener { + CountDownLatch basicLatch = new CountDownLatch(5); + List basicReceived = Collections.synchronizedList(new ArrayList<>()); + + CountDownLatch reactiveLatch = new CountDownLatch(5); + List reactiveReceived = Collections.synchronizedList(new ArrayList<>()); + + CountDownLatch concurrencyLatch = new CountDownLatch(30); + AtomicInteger active = new AtomicInteger(); + AtomicInteger maxParallel = new AtomicInteger(); + + CountDownLatch priorityLatch = new CountDownLatch(10); + List priorityReceived = Collections.synchronizedList(new ArrayList<>()); + + void reset() { + basicLatch = new CountDownLatch(5); + basicReceived = Collections.synchronizedList(new ArrayList<>()); + reactiveLatch = new CountDownLatch(5); + reactiveReceived = Collections.synchronizedList(new ArrayList<>()); + concurrencyLatch = new CountDownLatch(30); + active = new AtomicInteger(); + maxParallel = new AtomicInteger(); + priorityLatch = new CountDownLatch(10); + priorityReceived = Collections.synchronizedList(new ArrayList<>()); + } + + @RqueueListener(value = "contract-basic") + void onBasic(String payload) { + basicReceived.add(payload); + basicLatch.countDown(); + } + + @RqueueListener(value = "contract-reactive") + void onReactive(String payload) { + reactiveReceived.add(payload); + reactiveLatch.countDown(); + } + + @RqueueListener(value = "contract-concurrency", concurrency = "3") + void onConcurrent(String payload) throws InterruptedException { + int now = active.incrementAndGet(); + maxParallel.updateAndGet(curr -> Math.max(curr, now)); + try { + Thread.sleep(200L); + } finally { + active.decrementAndGet(); + concurrencyLatch.countDown(); + } + } + + @RqueueListener( + value = "contract-priority", + priority = "high=10,low=1", + batchSize = "5", + concurrency = "5") + void onPriority(String payload) { + priorityReceived.add(payload); + priorityLatch.countDown(); + } + } +} diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/GlobalRetryLimitE2EIT.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/GlobalRetryLimitE2EIT.java new file mode 100644 index 00000000..ac86770a --- /dev/null +++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/GlobalRetryLimitE2EIT.java @@ -0,0 +1,246 @@ +/* + * Copyright (c) 2026 Sonu Kumar + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + */ +package com.github.sonus21.rqueue.spring.boot.tests.integration; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.github.sonus21.rqueue.annotation.RqueueListener; +import com.github.sonus21.rqueue.config.RqueueConfig; +import com.github.sonus21.rqueue.config.SimpleRqueueListenerContainerFactory; +import com.github.sonus21.rqueue.core.RqueueMessageEnqueuer; +import com.github.sonus21.rqueue.spring.boot.tests.SpringBootIntegrationTest; +import com.github.sonus21.rqueue.test.application.BaseApplication; +import com.github.sonus21.rqueue.utils.backoff.FixedTaskExecutionBackOff; +import io.nats.client.Connection; +import io.nats.client.JetStreamApiException; +import io.nats.client.JetStreamManagement; +import io.nats.client.Nats; +import java.io.IOException; +import java.time.Duration; +import java.util.List; +import java.util.Locale; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.Assumptions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; +import org.testcontainers.DockerClientFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.utility.DockerImageName; + +@SpringBootTest(classes = GlobalRetryLimitE2EIT.TestApp.class) +@SpringBootIntegrationTest +@Tag("nats") +class GlobalRetryLimitE2EIT { + + private static final Logger log = Logger.getLogger(GlobalRetryLimitE2EIT.class.getName()); + private static final String BACKEND = System.getProperty( + "rqueue.test.backend", System.getenv().getOrDefault("RQUEUE_TEST_BACKEND", "redis")) + .toLowerCase(Locale.ROOT); + private static final String QUEUE = "global-retry-" + BACKEND; + private static final String STREAM_PREFIX = "rqueue-js-globalRetryE2E-"; + private static final String SUBJECT_PREFIX = "rqueue.js.globalRetryE2E."; + private static final String EXTERNAL_NATS_URL = + System.getenv().getOrDefault("NATS_URL", "nats://127.0.0.1:4222"); + private static final boolean USE_EXTERNAL_NATS = System.getenv("NATS_RUNNING") != null; + + private static GenericContainer nats; + + @Autowired + RqueueMessageEnqueuer enqueuer; + + @Autowired + FailingListener listener; + + @Autowired + RqueueConfig rqueueConfig; + + @Autowired(required = false) + JetStreamManagement jsm; + + @BeforeAll + static void bootstrapBackend() { + if (isNatsBackend()) { + startNats(); + deleteStreamsWithPrefix(STREAM_PREFIX); + } + } + + @DynamicPropertySource + static void properties(DynamicPropertyRegistry registry) { + registry.add("rqueue.backend", () -> BACKEND); + registry.add("rqueue.retry.max", () -> "2"); + registry.add("rqueue.retry.per.poll", () -> "1"); + registry.add("global.retry.limit.queue", () -> QUEUE); + if (isNatsBackend()) { + registry.add("rqueue.nats.connection.url", GlobalRetryLimitE2EIT::activeNatsUrl); + registry.add("rqueue.nats.naming.stream-prefix", () -> STREAM_PREFIX); + registry.add("rqueue.nats.naming.subject-prefix", () -> SUBJECT_PREFIX); + } else { + registry.add("spring.data.redis.port", () -> "8027"); + registry.add("mysql.db.name", () -> "GlobalRetryLimitE2EIT"); + registry.add("use.system.redis", () -> "false"); + } + } + + @BeforeEach + void resetListener() { + listener.reset(); + rqueueConfig.setRetryPerPoll(1); + } + + @Test + void globalRetryLimitCapsSimpleEnqueueWhenRetryPerPollIsOne() throws Exception { + enqueuer.enqueue(QUEUE, "payload"); + + assertTwoAttemptsOnly(); + if (isNatsBackend()) { + assertThat(jsm).isNotNull(); + assertThat(jsm.getConsumerInfo(STREAM_PREFIX + QUEUE, QUEUE + "-consumer") + .getConsumerConfiguration() + .getMaxDeliver()) + .isEqualTo(3L); + } + } + + @Test + void globalRetryLimitCapsSimpleEnqueueWhenRetryPerPollIsHigh() throws Exception { + rqueueConfig.setRetryPerPoll(100); + + enqueuer.enqueue(QUEUE, "payload"); + + assertTwoAttemptsOnly(); + } + + @Test + void globalRetryLimitUsesRemainingRetriesWhenRetryPerPollIncreases() throws Exception { + enqueuer.enqueue(QUEUE, "payload"); + + assertThat(listener.firstAttempt.await(20, TimeUnit.SECONDS)).isTrue(); + rqueueConfig.setRetryPerPoll(100); + + assertTwoAttemptsOnly(); + } + + private void assertTwoAttemptsOnly() throws InterruptedException { + assertThat(listener.twoAttempts.await(20, TimeUnit.SECONDS)).isTrue(); + Awaitility.await() + .during(Duration.ofMillis(600)) + .atMost(Duration.ofSeconds(3)) + .untilAsserted(() -> assertThat(listener.attempts).hasValue(2)); + } + + private static boolean isNatsBackend() { + return "nats".equalsIgnoreCase(BACKEND); + } + + private static void startNats() { + if (!isNatsBackend() || USE_EXTERNAL_NATS || nats != null) { + return; + } + Assumptions.assumeTrue( + DockerClientFactory.instance().isDockerAvailable(), + "Skipping: Docker is not available and NATS_RUNNING is not set"); + nats = new GenericContainer<>(DockerImageName.parse("nats:2.12-alpine")) + .withCommand("-js") + .withExposedPorts(4222) + .waitingFor(Wait.forLogMessage(".*Server is ready.*\\n", 1)); + nats.start(); + Runtime.getRuntime().addShutdownHook(new Thread(nats::stop)); + } + + private static String activeNatsUrl() { + if (USE_EXTERNAL_NATS) { + return EXTERNAL_NATS_URL; + } + startNats(); + return "nats://" + nats.getHost() + ":" + nats.getMappedPort(4222); + } + + private static void deleteStreamsWithPrefix(String prefix) { + try (Connection c = Nats.connect(activeNatsUrl())) { + JetStreamManagement management = c.jetStreamManagement(); + List names = management.getStreamNames(); + if (names == null) { + return; + } + for (String name : names) { + if (name.startsWith(prefix)) { + management.deleteStream(name); + } + } + } catch (IOException | JetStreamApiException e) { + log.log(Level.WARNING, "Failed to clean NATS streams: " + e.getMessage()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.log(Level.WARNING, "Failed to clean NATS streams: " + e.getMessage()); + } + } + + @SpringBootApplication + @Import({FailingListener.class, RedisTestConfig.class}) + static class TestApp { + + @Bean + public SimpleRqueueListenerContainerFactory simpleRqueueListenerContainerFactory() { + FixedTaskExecutionBackOff backOff = new FixedTaskExecutionBackOff(); + backOff.setInterval(100); + SimpleRqueueListenerContainerFactory factory = new SimpleRqueueListenerContainerFactory(); + factory.setTaskExecutionBackOff(backOff); + return factory; + } + } + + @Configuration + @ConditionalOnProperty(name = "rqueue.backend", havingValue = "redis", matchIfMissing = true) + static class RedisTestConfig extends BaseApplication {} + + static class FailingListener { + final AtomicInteger attempts = new AtomicInteger(); + CountDownLatch firstAttempt = new CountDownLatch(1); + CountDownLatch twoAttempts = new CountDownLatch(2); + + void reset() { + attempts.set(0); + firstAttempt = new CountDownLatch(1); + twoAttempts = new CountDownLatch(2); + } + + @RqueueListener(value = "${global.retry.limit.queue}") + void onMessage(String ignored) { + attempts.incrementAndGet(); + firstAttempt.countDown(); + twoAttempts.countDown(); + throw new IllegalStateException("force retry"); + } + } +} diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/RedisGlobalRetryLimitE2EIT.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/RedisGlobalRetryLimitE2EIT.java deleted file mode 100644 index d0df932e..00000000 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/RedisGlobalRetryLimitE2EIT.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Copyright (c) 2026 Sonu Kumar - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * You may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and limitations under the License. - * - */ -package com.github.sonus21.rqueue.spring.boot.tests.integration; - -import static org.assertj.core.api.Assertions.assertThat; - -import com.github.sonus21.rqueue.annotation.RqueueListener; -import com.github.sonus21.rqueue.core.RqueueMessageEnqueuer; -import com.github.sonus21.rqueue.spring.boot.tests.SpringBootIntegrationTest; -import com.github.sonus21.rqueue.test.application.BaseApplication; -import java.time.Duration; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import org.awaitility.Awaitility; -import org.junit.jupiter.api.Test; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.context.annotation.Import; -import org.springframework.test.context.TestPropertySource; - -@SpringBootTest(classes = RedisGlobalRetryLimitE2EIT.TestApp.class) -@TestPropertySource( - properties = { - "rqueue.retry.max=2", - "rqueue.retry.per.poll=1000", - "spring.data.redis.port=8027", - "mysql.db.name=RedisGlobalRetryLimitE2EIT", - "use.system.redis=false" - }) -@SpringBootIntegrationTest -class RedisGlobalRetryLimitE2EIT { - - static final String QUEUE = "global-retry-redis"; - - @Autowired - RqueueMessageEnqueuer enqueuer; - - @Autowired - FailingListener listener; - - @Test - void globalRetryLimitCapsImplicitRetryForever() throws Exception { - enqueuer.enqueue(QUEUE, "payload"); - - assertThat(listener.twoAttempts.await(20, TimeUnit.SECONDS)).isTrue(); - Awaitility.await() - .during(Duration.ofSeconds(2)) - .atMost(Duration.ofSeconds(3)) - .untilAsserted(() -> assertThat(listener.attempts).hasValue(2)); - } - - @SpringBootApplication - @Import(FailingListener.class) - static class TestApp extends BaseApplication {} - - static class FailingListener { - final AtomicInteger attempts = new AtomicInteger(); - final CountDownLatch twoAttempts = new CountDownLatch(2); - - @RqueueListener(value = QUEUE) - void onMessage(String ignored) { - attempts.incrementAndGet(); - twoAttempts.countDown(); - throw new IllegalStateException("force retry"); - } - } -} diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/unit/RqueueListenerAutoConfigTest.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/unit/RqueueListenerAutoConfigTest.java index b7e2986e..5b8f4686 100644 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/unit/RqueueListenerAutoConfigTest.java +++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/unit/RqueueListenerAutoConfigTest.java @@ -17,8 +17,10 @@ package com.github.sonus21.rqueue.spring.boot.tests.unit; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.when; @@ -35,6 +37,8 @@ import com.github.sonus21.rqueue.core.spi.Capabilities; import com.github.sonus21.rqueue.core.spi.MessageBroker; import com.github.sonus21.rqueue.listener.RqueueMessageHandler; +import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer; +import com.github.sonus21.rqueue.spring.boot.RqueueAutoStartupLifecycle; import com.github.sonus21.rqueue.spring.boot.RqueueListenerAutoConfig; import com.github.sonus21.rqueue.spring.boot.tests.SpringBootUnitTest; import org.apache.commons.lang3.reflect.FieldUtils; @@ -119,10 +123,76 @@ void rqueueMessageListenerContainer() "com.github.sonus21.rqueue.converter.DefaultMessageConverterProvider", true); FieldUtils.writeField(messageAutoConfig, "simpleRqueueListenerContainerFactory", factory, true); - messageAutoConfig.rqueueMessageListenerContainer(rqueueMessageHandler, messageBroker); + FieldUtils.writeField( + messageAutoConfig, "rqueueAutoStartupLifecycle", new RqueueAutoStartupLifecycle(), true); + RqueueMessageListenerContainer container = + messageAutoConfig.rqueueMessageListenerContainer(rqueueMessageHandler, messageBroker); assertEquals(factory.getRqueueMessageHandler(null).hashCode(), rqueueMessageHandler.hashCode()); // The broker must be propagated onto the factory so the container picks it up. assertSame(messageBroker, factory.getMessageBroker()); + assertFalse(container.isAutoStartup()); + assertTrue(factory.getAutoStartup()); + } + + @Test + void rqueueMessageListenerContainerKeepsAutoStartupWhenLifecycleMissing() + throws IllegalAccessException { + SimpleRqueueListenerContainerFactory factory = new SimpleRqueueListenerContainerFactory(); + factory.setMessageConverterProvider(new DefaultMessageConverterProvider()); + factory.setMessageBroker(messageBroker); + RqueueListenerAutoConfig messageAutoConfig = new RqueueListenerAutoConfig(); + FieldUtils.writeField(messageAutoConfig, "simpleRqueueListenerContainerFactory", factory, true); + + RqueueMessageListenerContainer container = + messageAutoConfig.rqueueMessageListenerContainer(rqueueMessageHandler, messageBroker); + + assertTrue(container.isAutoStartup()); + assertTrue(factory.getAutoStartup()); + assertSame(messageBroker, factory.getMessageBroker()); + } + + @Test + void rqueueMessageListenerContainerRestoresFactoryAutoStartupWhenCreationFails() + throws IllegalAccessException { + FailingListenerContainerFactory factory = new FailingListenerContainerFactory(); + RqueueListenerAutoConfig messageAutoConfig = new RqueueListenerAutoConfig(); + FieldUtils.writeField(messageAutoConfig, "simpleRqueueListenerContainerFactory", factory, true); + FieldUtils.writeField( + messageAutoConfig, "rqueueAutoStartupLifecycle", new RqueueAutoStartupLifecycle(), true); + + IllegalStateException exception = assertThrows( + IllegalStateException.class, + () -> + messageAutoConfig.rqueueMessageListenerContainer(rqueueMessageHandler, messageBroker)); + + assertEquals("boom", exception.getMessage()); + assertTrue(factory.getAutoStartup()); + } + + @Test + void autoStartupLifecycleDelaysOnlyAutoStartupContainers() { + RqueueAutoStartupLifecycle lifecycle = new RqueueAutoStartupLifecycle(); + TestRqueueMessageListenerContainer autoStartupContainer = + new TestRqueueMessageListenerContainer(); + TestRqueueMessageListenerContainer disabledContainer = new TestRqueueMessageListenerContainer(); + TestRqueueMessageListenerContainer alreadyRunningContainer = + new TestRqueueMessageListenerContainer(); + disabledContainer.setAutoStartup(false); + + lifecycle.postProcessBeforeInitialization(autoStartupContainer, "autoStartupContainer"); + lifecycle.postProcessBeforeInitialization(disabledContainer, "disabledContainer"); + lifecycle.postProcessBeforeInitialization(alreadyRunningContainer, "alreadyRunningContainer"); + alreadyRunningContainer.running = true; + + assertFalse(autoStartupContainer.isAutoStartup()); + assertFalse(disabledContainer.isAutoStartup()); + assertFalse(alreadyRunningContainer.isAutoStartup()); + + lifecycle.onApplicationEvent(null); + + assertEquals(1, autoStartupContainer.startCount); + assertEquals(0, disabledContainer.startCount); + assertEquals(0, alreadyRunningContainer.startCount); } @Test @@ -158,4 +228,34 @@ void rqueueMessageSenderUsesConfiguredMessageConverter() throws IllegalAccessExc MessageConverter converter = messageSender.getMessageConverter(); assertTrue(converter.hashCode() == messageConverter.hashCode()); } + + private static class FailingListenerContainerFactory + extends SimpleRqueueListenerContainerFactory { + + @Override + public RqueueMessageListenerContainer createMessageListenerContainer() { + throw new IllegalStateException("boom"); + } + } + + private class TestRqueueMessageListenerContainer extends RqueueMessageListenerContainer { + + private int startCount; + private boolean running; + + private TestRqueueMessageListenerContainer() { + super(rqueueMessageHandler, messageTemplate); + } + + @Override + public void start() { + startCount++; + running = true; + } + + @Override + public boolean isRunning() { + return running; + } + } }