Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/java-ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,7 @@ jobs:

- name: Run NATS tests
env:
RQUEUE_TEST_BACKEND: nats
NATS_RUNNING: "true"
NATS_URL: nats://127.0.0.1:4222
run: ./gradlew :rqueue-nats:test :rqueue-spring-boot-starter:test :rqueue-spring:test -DincludeTags=nats
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,19 @@ default Mono<Void> enqueueWithDelayReactive(QueueDetail q, RqueueMessage m, long
return Mono.fromRunnable(() -> enqueueWithDelay(q, m, delayMs));
}

/**
* Resolve the wait duration that listener pollers should pass to {@link #pop}. The default uses
* the listener container's polling interval, preserving existing Redis behavior where that value
* also controls idle sleeps. Backends with native long-poll controls can override this so their
* fetch wait can be tuned independently.
*
* @param pollingInterval listener container polling interval
* @return wait duration for listener-driven pop calls
*/
default Duration getPollWait(Duration pollingInterval) {
return pollingInterval;
}

List<RqueueMessage> pop(QueueDetail q, String consumerName, int batch, Duration wait);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,18 +225,11 @@ private void handleRetryExceededMessage(JobImpl job, int failureCount, Throwable
}
}

private int getMaxRetryCount(RqueueMessage rqueueMessage, QueueDetail queueDetail) {
return rqueueMessage.getRetryCount() == null
? queueDetail.getNumRetry()
: rqueueMessage.getRetryCount();
}

private void handleFailure(JobImpl job, int failureCount, Throwable throwable) {
if (job.getQueueDetail().isDoNotRetryError(throwable)) {
handleRetryExceededMessage(job, failureCount, throwable);
} else {
int maxRetryCount = getMaxRetryCount(job.getRqueueMessage(), job.getQueueDetail());
if (failureCount < maxRetryCount) {
if (!RetryPolicy.isExhausted(job.getRqueueMessage(), job.getQueueDetail(), failureCount)) {
long delay = taskExecutionBackoff.nextBackOff(
job.getMessage(), job.getRqueueMessage(), failureCount, throwable);
if (delay == TaskExecutionBackOff.STOP) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright (c) 2026 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* You may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the License.
*
*/

package com.github.sonus21.rqueue.listener;

import com.github.sonus21.rqueue.config.RqueueConfig;
import com.github.sonus21.rqueue.core.RqueueMessage;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;

@NoArgsConstructor(access = AccessLevel.PRIVATE)
final class RetryPolicy {

static final int UNLIMITED_RETRY_LIMIT = 100_000;

static int maxRetryCount(RqueueMessage rqueueMessage, QueueDetail queueDetail) {
int maxRetryCount = rqueueMessage.getRetryCount() == null
? queueDetail.getNumRetry()
: rqueueMessage.getRetryCount();
if (maxRetryCount == Integer.MAX_VALUE) {
return UNLIMITED_RETRY_LIMIT;
}
return maxRetryCount;
}

static int remainingRetryCount(
RqueueMessage rqueueMessage, QueueDetail queueDetail, int failureCount) {
int maxRetryCount = maxRetryCount(rqueueMessage, queueDetail);
return Math.max(0, maxRetryCount - failureCount);
}

static int retryCountForPoll(
RqueueConfig rqueueConfig,
RqueueMessage rqueueMessage,
QueueDetail queueDetail,
int failureCount) {
int remainingRetryCount = remainingRetryCount(rqueueMessage, queueDetail, failureCount);
if (rqueueConfig.getRetryPerPoll() == -1) {
return remainingRetryCount;
}
return Math.min(rqueueConfig.getRetryPerPoll(), remainingRetryCount);
}

static boolean isExhausted(
RqueueMessage rqueueMessage, QueueDetail queueDetail, int failureCount) {
return remainingRetryCount(rqueueMessage, queueDetail, failureCount) == 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,6 @@ private void init() {
this.failureCount = job.getRqueueMessage().getFailureCount();
}

private int getMaxRetryCount() {
return Objects.isNull(job.getRqueueMessage().getRetryCount())
? job.getQueueDetail().getNumRetry()
: job.getRqueueMessage().getRetryCount();
}

private void updateCounter(boolean fail) {
RqueueMetricsCounter counter = beanProvider.getRqueueMetricsCounter();
if (Objects.isNull(counter)) {
Expand Down Expand Up @@ -179,11 +173,8 @@ private boolean isOldMessage() {
}

private int getRetryCount() {
int maxRetry = getMaxRetryCount();
if (beanProvider.getRqueueConfig().getRetryPerPoll() == -1) {
return maxRetry;
}
return Math.min(beanProvider.getRqueueConfig().getRetryPerPoll(), maxRetry);
return RetryPolicy.retryCountForPoll(
beanProvider.getRqueueConfig(), job.getRqueueMessage(), job.getQueueDetail(), failureCount);
}

private boolean queueInactive() {
Expand Down Expand Up @@ -283,6 +274,7 @@ private void execute() {
private boolean shouldRetry(long maxProcessingTime, int retryCount, int failureCount) {
if (retryCount > 0
&& ExecutionStatus.FAILED.equals(status)
&& !RetryPolicy.isExhausted(rqueueMessage, queueDetail, failureCount)
&& System.currentTimeMillis() < maxProcessingTime) {
boolean doNoRetry = queueDetail.isDoNotRetryError(error);
// it should not be retried based on the exception list
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -517,10 +517,14 @@ private List<QueueDetail> getQueueDetail(String queue, MappingInformation mappin

@Override
public void start() {
log.info("Starting Rqueue Message container {}", RqueueConfig.getBrokerId());
synchronized (lifecycleMgr) {
running = true;
if (running) {
log.debug("Rqueue Message container {} is already running", RqueueConfig.getBrokerId());
return;
}
log.info("Starting Rqueue Message container {}", RqueueConfig.getBrokerId());
doStart();
running = true;
rqueueBeanProvider
.getApplicationEventPublisher()
.publishEvent(new RqueueBootstrapEvent(EVENT_SOURCE, true));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.github.sonus21.rqueue.core.RqueueBeanProvider;
import com.github.sonus21.rqueue.core.RqueueMessage;
import com.github.sonus21.rqueue.core.middleware.Middleware;
import com.github.sonus21.rqueue.core.spi.MessageBroker;
import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer.QueueStateMgr;
import com.github.sonus21.rqueue.utils.Constants;
import com.github.sonus21.rqueue.utils.QueueThreadPool;
Expand Down Expand Up @@ -59,13 +60,9 @@ abstract class RqueueMessagePoller extends MessageContainerBase {
}

private List<RqueueMessage> getMessages(QueueDetail queueDetail, int count) {
return rqueueBeanProvider
.getMessageBroker()
.pop(
queueDetail,
queueDetail.resolvedConsumerName(),
count,
Duration.ofMillis(pollingInterval));
MessageBroker broker = rqueueBeanProvider.getMessageBroker();
Duration wait = broker.getPollWait(Duration.ofMillis(pollingInterval));
return broker.pop(queueDetail, queueDetail.resolvedConsumerName(), count, wait);
}

private void execute(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright (c) 2026 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* You may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the License.
*
*/

package com.github.sonus21.rqueue.listener;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.doReturn;

import com.github.sonus21.rqueue.CoreUnitTest;
import com.github.sonus21.rqueue.config.RqueueConfig;
import com.github.sonus21.rqueue.core.RqueueMessage;
import com.github.sonus21.rqueue.utils.TestUtils;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

@CoreUnitTest
class RetryPolicyTest {

private final QueueDetail queueDetail = TestUtils.createQueueDetail("queue", 2, 900000L, null);

@Test
void retryCountForPollUsesRemainingRetryBudget() {
RqueueConfig rqueueConfig = Mockito.mock(RqueueConfig.class);
doReturn(100).when(rqueueConfig).getRetryPerPoll();
RqueueMessage rqueueMessage = new RqueueMessage();

assertEquals(1, RetryPolicy.retryCountForPoll(rqueueConfig, rqueueMessage, queueDetail, 1));
}

@Test
void retryCountForPollKeepsExplicitMessageRetryCount() {
RqueueConfig rqueueConfig = Mockito.mock(RqueueConfig.class);
doReturn(-1).when(rqueueConfig).getRetryPerPoll();
RqueueMessage rqueueMessage = RqueueMessage.builder().retryCount(1000).build();

assertEquals(999, RetryPolicy.retryCountForPoll(rqueueConfig, rqueueMessage, queueDetail, 1));
}

@Test
void isExhaustedUsesEffectiveRetryCount() {
RqueueMessage rqueueMessage = new RqueueMessage();

assertFalse(RetryPolicy.isExhausted(rqueueMessage, queueDetail, 1));
assertTrue(RetryPolicy.isExhausted(rqueueMessage, queueDetail, 2));
}

@Test
void retryForeverSentinelUsesFiniteLimit() {
QueueDetail retryForeverQueue =
TestUtils.createQueueDetail("queue", Integer.MAX_VALUE, 900000L, null);
RqueueMessage rqueueMessage = new RqueueMessage();

assertEquals(
RetryPolicy.UNLIMITED_RETRY_LIMIT,
RetryPolicy.maxRetryCount(rqueueMessage, retryForeverQueue));
assertEquals(
1,
RetryPolicy.remainingRetryCount(
rqueueMessage, retryForeverQueue, RetryPolicy.UNLIMITED_RETRY_LIMIT - 1));
assertTrue(RetryPolicy.isExhausted(
rqueueMessage, retryForeverQueue, RetryPolicy.UNLIMITED_RETRY_LIMIT));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.github.sonus21.rqueue.listener;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -108,6 +109,7 @@ static class CountingBroker implements MessageBroker, AutoCloseable {
final AtomicInteger popCalls = new AtomicInteger();
final AtomicBoolean closed = new AtomicBoolean();
volatile Duration lastWait;
volatile Duration pollWait;
private final Capabilities caps;

CountingBroker(Capabilities caps) {
Expand All @@ -120,6 +122,11 @@ public void enqueue(QueueDetail q, RqueueMessage m) {}
@Override
public void enqueueWithDelay(QueueDetail q, RqueueMessage m, long delayMs) {}

@Override
public Duration getPollWait(Duration pollingInterval) {
return pollWait != null ? pollWait : MessageBroker.super.getPollWait(pollingInterval);
}

@Override
public List<RqueueMessage> pop(QueueDetail q, String consumerName, int batch, Duration wait) {
popCalls.incrementAndGet();
Expand Down Expand Up @@ -221,6 +228,24 @@ void redisDefaultsBrokerAlsoUsesNormalStartQueuePath() throws Exception {
}
}

@Test
void startDoesNotStartQueuesAgainWhenContainerIsAlreadyRunning() throws Exception {
EndpointRegistry.delete();
CountingBroker broker = new CountingBroker(Capabilities.REDIS_DEFAULTS);
TrackingContainer container = new TrackingContainer(messageHandler);
container.setMessageBroker(broker);
container.afterPropertiesSet();
try {
container.start();
container.start();

assertEquals(1, container.startQueueCalls.get() + container.startGroupCalls.get());
} finally {
container.stop();
container.destroy();
}
}

@Test
void pollerForwardsPollingIntervalAsBrokerFetchWait() throws Exception {
EndpointRegistry.delete();
Expand Down Expand Up @@ -248,15 +273,41 @@ void pollerForwardsPollingIntervalAsBrokerFetchWait() throws Exception {
Duration wait = broker.lastWait;
assertNotNull(wait, "broker should have received a wait duration");
assertFalse(wait.isZero(), "wait must not be Duration.ZERO; should match pollingInterval");
assertTrue(
wait.toMillis() == pollingInterval,
"wait should equal the configured pollingInterval (got " + wait + ")");
assertEquals(Duration.ofMillis(pollingInterval), wait);
}

@Test
void pollerUsesBrokerResolvedFetchWait() throws Exception {
EndpointRegistry.delete();
CountingBroker broker =
new CountingBroker(new Capabilities(true, false, false, true, true, true));
broker.pollWait = Duration.ofSeconds(2);
RqueueMessageListenerContainer container =
new RqueueMessageListenerContainer(messageHandler, rqueueMessageTemplate);
container.rqueueBeanProvider = beanProvider;
container.setMessageBroker(broker);
container.setPollingInterval(137L);
container.afterPropertiesSet();
container.start();
try {
long deadline = System.currentTimeMillis() + 2000;
while (broker.popCalls.get() == 0 && System.currentTimeMillis() < deadline) {
Thread.sleep(20);
}
} finally {
container.stop();
container.destroy();
}
assertTrue(broker.popCalls.get() > 0, "poller should have issued at least one pop call");
assertEquals(broker.pollWait, broker.lastWait);
}

private class TrackingContainer extends RqueueMessageListenerContainer {
final AtomicBoolean startBrokerPollersCalled = new AtomicBoolean();
final AtomicBoolean startQueueCalled = new AtomicBoolean();
final AtomicBoolean startGroupCalled = new AtomicBoolean();
final AtomicInteger startQueueCalls = new AtomicInteger();
final AtomicInteger startGroupCalls = new AtomicInteger();

TrackingContainer(RqueueMessageHandler handler) {
super(handler, rqueueMessageTemplate);
Expand All @@ -266,12 +317,14 @@ private class TrackingContainer extends RqueueMessageListenerContainer {
@Override
protected void startQueue(String pollerKey, QueueDetail queueDetail) {
startQueueCalled.set(true);
startQueueCalls.incrementAndGet();
// Do not actually start the poller; it would need a real broker.
}

@Override
protected void startGroup(String groupName, List<QueueDetail> queueDetails) {
startGroupCalled.set(true);
startGroupCalls.incrementAndGet();
}
}
}
Loading