Rqueue aka Redis Queue [Task Queue, Message Broker] for Spring framework

Overview

Rqueue: Redis Queue, Task Queue, Scheduled Queue for Spring and Spring Boot

Rqueue is an asynchronous task executor (worker) built for the Spring and Spring Boot frameworks. It is based on the Spring framework's messaging library and backed by Redis. It can also be used as a message broker when all service code is written in Spring.


Features

  • Instant delivery : Execute a message immediately
  • Message scheduling : A message can be scheduled for any arbitrary period
  • Unique message : Unique message processing for a queue based on the message id
  • Periodic message : Process the same message at a fixed interval
  • Priority tasks : Tasks can carry a priority such as high, medium or low
  • Message delivery : A message is guaranteed to be consumed at least once. (A message can be consumed by a worker more than once after a failure in the underlying worker, a process restart, etc.; otherwise it is delivered exactly once.)
  • Message retry : Messages are retried automatically on application crash/failure/restart etc.
  • Automatic message serialization and deserialization
  • Message Multicasting : Call multiple message listeners on every message
  • Batch Message Polling : Fetch multiple messages from Redis at once
  • Metrics : In-flight messages, messages waiting for consumption, and scheduled messages
  • Competing Consumers : Multiple messages can be consumed in parallel by different workers/listeners
  • Concurrency : Concurrency of any listener can be configured
  • Queue Priority :
    • Group level queue priority (weighted and strict)
    • Sub queue priority (weighted and strict)
  • Long execution job : Long running jobs can check in periodically
  • Execution Backoff : Exponential and fixed back off (default fixed back off)
  • Middleware : Add one or more middlewares; they are called before the listener method
  • Callbacks : Callbacks for dead letter queue, discard etc.
  • Events : 1. Bootstrap event 2. Task execution event
  • Redis connection : A different Redis setup can be used for Rqueue
  • Redis cluster : Redis cluster can be used with the Lettuce client
  • Redis Sentinel : Redis Sentinel can be used with Rqueue
  • Reactive Programming : Supports reactive Redis and Spring WebFlux
  • Web Dashboard : Web dashboard to manage a queue and view queue insights including latency
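
To make the difference between the fixed and exponential back off options concrete, here is a minimal sketch of the delay arithmetic. It is not Rqueue's implementation: the class name BackoffSketch, the method names and the parameters (initial delay, multiplier, maximum delay) are assumptions chosen for illustration only.

```java
/** Illustrative back off arithmetic; not Rqueue's own back off classes. */
final class BackoffSketch {

  private BackoffSketch() {}

  /** Fixed back off: every retry waits the same interval, whatever the failure count. */
  static long fixedDelayMillis(long intervalMillis, int failureCount) {
    requirePositive(failureCount);
    return intervalMillis;
  }

  /** Exponential back off: initial * multiplier^(failureCount - 1), capped at maxMillis. */
  static long exponentialDelayMillis(
      long initialMillis, double multiplier, long maxMillis, int failureCount) {
    requirePositive(failureCount);
    double raw = initialMillis * Math.pow(multiplier, failureCount - 1);
    // Math.min also covers overflow to infinity for very large failure counts.
    return (long) Math.min(raw, (double) maxMillis);
  }

  private static void requirePositive(int failureCount) {
    if (failureCount < 1) {
      throw new IllegalArgumentException("failureCount must be >= 1");
    }
  }
}
```

With a 1 second initial delay and a multiplier of 2, the fourth failure waits 8 seconds, while the fixed variant keeps returning the same interval on every failure.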

Requirements

  • Spring 5+
  • Spring boot 2+
  • Lettuce client for Redis cluster
  • Read master preference for Redis cluster

Getting Started

Dependency

Snapshot Version: https://s01.oss.sonatype.org/content/repositories/snapshots/com/github/sonus21/

Release Version: Maven central

Spring-boot

  • Get the latest one from Maven central

  • Add dependency

    • Gradle
          implementation 'com.github.sonus21:rqueue-spring-boot-starter:2.10.1-RELEASE'
    • Maven
       <dependency>
          <groupId>com.github.sonus21</groupId>
          <artifactId>rqueue-spring-boot-starter</artifactId>
          <version>2.10.1-RELEASE</version>
      </dependency>

    No additional configuration is required; adding the dependency is enough.

Spring framework

  • Get the latest one from Maven central
  • Add Dependency
    • Gradle
          implementation 'com.github.sonus21:rqueue-spring:2.10.1-RELEASE'
    • Maven
       <dependency>
         <groupId>com.github.sonus21</groupId>
         <artifactId>rqueue-spring</artifactId>
         <version>2.10.1-RELEASE</version>
       </dependency>
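  1. Add the EnableRqueue annotation to the application config class
  2. Provide a RedisConnectionFactory bean

Configuration

import com.github.sonus21.rqueue.spring.EnableRqueue;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;

@EnableRqueue
public class Application {

  @Bean
  public RedisConnectionFactory redisConnectionFactory() {
    // Return a Redis connection factory. As an example, the no-arg Lettuce factory
    // connects to localhost:6379; replace it with your own Redis setup.
    return new LettuceConnectionFactory();
  }
}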

Message publishing/Task submission

All messages need to be sent using the RqueueMessageEnqueuer bean's enqueueXXX, enqueueInXXX and enqueueAtXXX methods. It offers a handful of enqueue, enqueueIn and enqueueAt variants; pick the one that fits the use case.

import com.github.sonus21.rqueue.core.RqueueMessageEnqueuer;
import java.time.Instant;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

// Job, Notification, Invoice, Sms and Email are application-defined message classes.
@Component
public class MessageService {

  @Autowired
  private RqueueMessageEnqueuer rqueueMessageEnqueuer;

  public void doSomething() {
    rqueueMessageEnqueuer.enqueue("simple-queue", "Rqueue is configured");
  }

  public void createJOB(Job job) {
    rqueueMessageEnqueuer.enqueue("job-queue", job);
  }

  // send notification in 30 seconds
  public void sendNotification(Notification notification) {
    rqueueMessageEnqueuer.enqueueIn("notification-queue", notification, 30 * 1000L);
  }

  // enqueue At example
  public void createInvoice(Invoice invoice, Instant instant) {
    rqueueMessageEnqueuer.enqueueAt("invoice-queue", invoice, instant);
  }

  // enqueue with priority, when sub queues are used as explained in the queue priority section.
  enum SmsPriority {
    CRITICAL("critical"),
    HIGH("high"),
    MEDIUM("medium"),
    LOW("low");
    private final String value;

    SmsPriority(String value) {
      this.value = value;
    }

    String value() {
      return value;
    }
  }

  public void sendSms(Sms sms, SmsPriority priority) {
    rqueueMessageEnqueuer.enqueueWithPriority("sms-queue", priority.value(), sms);
  }

  // enqueue periodic job, email should be sent every 30 seconds
  public void sendPeriodicEmail(Email email) {
    rqueueMessageEnqueuer.enqueuePeriodic("email-queue", email, 30_000);
  }

}
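
The examples above pass a delay in milliseconds to enqueueIn and an Instant to enqueueAt. The two forms are related by simple arithmetic, sketched below. ScheduleMath and delayMillis are hypothetical helper names, not part of Rqueue, and clamping past instants to zero is a choice made for this sketch rather than documented library behaviour.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper relating "run at an instant" to "run after a delay". */
final class ScheduleMath {

  private ScheduleMath() {}

  /**
   * Returns the delay in milliseconds from now until target.
   * Instants in the past are clamped to 0, i.e. treated as "run immediately".
   */
  static long delayMillis(Instant now, Instant target) {
    long millis = Duration.between(now, target).toMillis();
    return Math.max(0L, millis);
  }
}
```

Passing the current time explicitly, rather than calling Instant.now() inside the helper, keeps the calculation deterministic and easy to test.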

Worker/Consumer/Task Executor/Listener

Any method of a Spring bean can be marked as a worker/message listener using the RqueueListener annotation.

import com.github.sonus21.rqueue.annotation.RqueueListener;
import com.github.sonus21.rqueue.listener.RqueueMessageHeaders;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

// Job, Notification, Sms and ChatIndexing are application-defined message classes.
@Component
@Slf4j
public class MessageListener {

  @RqueueListener(value = "simple-queue")
  public void simpleMessage(String message) {
    log.info("simple-queue: {}", message);
  }

  @RqueueListener(value = "job-queue", numRetries = "3",
      deadLetterQueue = "failed-job-queue", concurrency = "5-10")
  public void onMessage(Job job) {
    log.info("Job alert: {}", job);
  }

  // queue name matches the one used by MessageService.sendNotification
  @RqueueListener(value = "notification-queue", numRetries = "3",
      deadLetterQueue = "failed-notification-queue")
  public void onMessage(Notification notification) {
    log.info("Push notification: {}", notification);
  }

  // queue name matches the one used by MessageService.sendSms
  @RqueueListener(value = "sms-queue", priority = "critical=10,high=8,medium=4,low=1")
  public void onMessage(Sms sms) {
    log.info("Sms : {}", sms);
  }

  @RqueueListener(value = "chat-indexing", priority = "20", priorityGroup = "chat")
  public void onMessage(ChatIndexing chatIndexing) {
    log.info("ChatIndexing message: {}", chatIndexing);
  }

  // renamed so that it does not clash with the chat-indexing listener above
  @RqueueListener(value = "chat-indexing-daily", priority = "10", priorityGroup = "chat")
  public void onDailyMessage(ChatIndexing chatIndexing) {
    log.info("ChatIndexing message: {}", chatIndexing);
  }

  // checkin job example
  @RqueueListener(value = "chat-indexing-weekly", priority = "5", priorityGroup = "chat")
  public void onMessage(ChatIndexing chatIndexing,
      @Header(RqueueMessageHeaders.JOB) com.github.sonus21.rqueue.core.Job job) {
    log.info("ChatIndexing message: {}", chatIndexing);
    job.checkIn("Chat indexing...");
  }
}
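
The sms listener above declares its sub queue weights as the string "critical=10,high=8,medium=4,low=1". The sketch below shows one way such a string could be read and what the weights would mean under a simple proportional scheme. It is an assumption made for illustration: PrioritySpec, parse and share are hypothetical names, and Rqueue's actual weighted polling algorithm is not described here and may differ.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical reader for a "name=weight,name=weight" priority string. */
final class PrioritySpec {

  private PrioritySpec() {}

  /** Parses "critical=10,high=8" into an insertion-ordered name -> weight map. */
  static Map<String, Integer> parse(String spec) {
    Map<String, Integer> weights = new LinkedHashMap<>();
    for (String entry : spec.split(",")) {
      String[] parts = entry.trim().split("=");
      if (parts.length != 2) {
        throw new IllegalArgumentException("Expected name=weight but got: " + entry);
      }
      weights.put(parts[0].trim(), Integer.parseInt(parts[1].trim()));
    }
    return weights;
  }

  /** Fraction of polls a sub queue would receive if polls were split in proportion to weight. */
  static double share(Map<String, Integer> weights, String name) {
    Integer weight = weights.get(name);
    if (weight == null) {
      throw new IllegalArgumentException("Unknown priority: " + name);
    }
    int total = 0;
    for (int w : weights.values()) {
      total += w;
    }
    return weight / (double) total;
  }
}
```

Under this proportional reading the weights sum to 23, so the critical sub queue would receive 10 of every 23 polls and the low sub queue 1 of every 23.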

Queue Statistics

Micrometer based dashboard for queue

Grafana Dashboard

Web

Link: http://localhost:8080/rqueue

Dashboard

Message Waiting For Execution

Explore Queue

Recent jobs details

Jobs

Status

Rqueue is stable and production ready; it processes 100K+ messages daily in a production environment.

We would love to add your organization's name here. If you're an Rqueue user, please raise a PR/issue.

TuneYou    PokerStars    Bitbot   

Support

  • Please report bugs, questions and feature requests on the issue tracker.
  • Ask questions on StackOverflow using the rqueue tag.

Contribution

Pull requests for any feature, bug fix or enhancement are most welcome. You need Java 8 and Gradle to get started. In the root build.gradle file, comment out the Spring-related versions, or set environment variables for the Spring versions.

Please format your code with Google Java formatter.

// springBootVersion = '2.0.6.RELEASE'
// springVersion = '5.0.10.RELEASE'
// springDataVersion = '2.0.6.RELEASE'
// microMeterVersion = '1.1.0'

License

© Sonu Kumar 2019-Instant.now

The Rqueue is released under version 2.0 of the Apache License.

Comments
  • Unable to configure Rqueue in jhipster Spring boot microservice application

    Describe the bug

    I'm not able to configure Rqueue in a jHipster Spring Boot microservice application. I have followed all your resources on the internet as well as your tutorial projects, but without luck.

    this is my pom file:

    <spring-boot.version>2.2.7.RELEASE</spring-boot.version>
    ...............
    <dependency>
    	<groupId>com.github.sonus21</groupId>
    	<artifactId>rqueue-spring-boot-starter</artifactId>
    	<version>2.0.1-RELEASE</version>
    </dependency>
    ...............
    <!-- adding/removing this dependency didn't help -->
    <dependency>
    	<groupId>org.springframework.boot</groupId>
    	<artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    

    Here is my configuration file:

    import com.github.sonus21.rqueue.config.SimpleRqueueListenerContainerFactory;
    import com.github.sonus21.rqueue.listener.RqueueMessageHandler;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.redis.repository.configuration.EnableRedisRepositories;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    
    
    @EnableRedisRepositories
    @Configuration
    public class RQueueConfiguration {
    
    
        @Bean()
        public SimpleRqueueListenerContainerFactory simpleRqueueListenerContainerFactory(RqueueMessageHandler rqueueMessageHandler) {
            SimpleRqueueListenerContainerFactory factory = new SimpleRqueueListenerContainerFactory();
            factory.setRqueueMessageHandler(rqueueMessageHandler);
    
            // adding this ThreadPoolTaskExecutor didn't help either.
            ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
            threadPoolTaskExecutor.setThreadNamePrefix("taskExecutor");
            threadPoolTaskExecutor.setCorePoolSize(10);
            threadPoolTaskExecutor.setMaxPoolSize(50);
            threadPoolTaskExecutor.setQueueCapacity(0);
            threadPoolTaskExecutor.afterPropertiesSet();
            factory.setTaskExecutor(threadPoolTaskExecutor);
    
            return factory;
        }
    
    }
    

    How to Reproduce

    Generate a jHipster microservice project and integrate Rqueue in it.
    Here is the error I'm getting:

    ERROR 2704 --- [  restartedMain] o.s.b.d.LoggingFailureAnalysisReporter   : 
    
    ***************************
    APPLICATION FAILED TO START
    ***************************
    
    Description:
    
    The bean 'rqueueMetrics' could not be injected as a 'com.github.sonus21.rqueue.metrics.RqueueMetrics' because it is a JDK dynamic proxy that implements:
    	org.springframework.context.ApplicationListener
    
    
    Action:
    
    Consider injecting the bean as one of its interfaces or forcing the use of CGLib-based proxies by setting proxyTargetClass=true on @EnableAsync and/or @EnableCaching.
    
    
    Process finished with exit code 1
    

    Additional resources

    Use https://start.jhipster.tech/ to generate a jHipster app in about a minute. Here is my configuration: https://i.ibb.co/rZ7mrsS/image.png

    Please help me with this.

    bug 
    opened by chlegou 20
  • Dashboard resources aren't accessible from gateway

    Hi again :)

    I have noticed that the Rqueue dashboard isn't accessible through the gateway of a microservice architecture.

    How to Reproduce

    I have checked the source code of the dashboard template, and it looks like the href links (CSS + JS files) are hard-coded to the direct endpoint. See here.
    Because of this, the resource files aren't accessible through the gateway. Please change the href links to accept any entry path.

    Additional Details

    Here is an example of the dashboard URL when called directly on the microservice and when called through the gateway.

    MS: http://localhost:8001/rqueue   
    Gateway: http://localhost:8000/services/rqueue-service/rqueue
    

    Please make resources accessible from any dynamic endpoint.

    Thanks. :)

    bug 
    opened by chlegou 18
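
The request in the issue above amounts to building dashboard resource links relative to whatever path prefix the gateway adds. A minimal sketch of that idea follows. DashboardPaths and resolve are hypothetical names, the resource path in the usage note is made up for illustration, and none of this is Rqueue's actual dashboard code.

```java
/** Hypothetical helper that prepends an optional gateway prefix to a resource path. */
final class DashboardPaths {

  private DashboardPaths() {}

  /** Joins prefix and resourcePath so that the result starts with one slash and has no doubled slash at the join. */
  static String resolve(String prefix, String resourcePath) {
    String cleanPrefix = prefix == null ? "" : prefix.trim();
    // Drop trailing slashes so the join below never produces "//".
    while (cleanPrefix.endsWith("/")) {
      cleanPrefix = cleanPrefix.substring(0, cleanPrefix.length() - 1);
    }
    if (!cleanPrefix.isEmpty() && !cleanPrefix.startsWith("/")) {
      cleanPrefix = "/" + cleanPrefix;
    }
    String cleanPath = resourcePath.startsWith("/") ? resourcePath : "/" + resourcePath;
    return cleanPrefix + cleanPath;
  }
}
```

With the gateway URL from the issue, resolve("/services/rqueue-service", "rqueue/css/app.css") yields a link under the gateway path, while a null or empty prefix leaves the direct microservice path unchanged.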
  • Periodic task add

    I think support for periodic tasks is needed in some cases, and periodic tasks should allow cancellation. I hope this feature can be implemented.

    Many thanks!!

    enhancement 
    opened by yuluo-zy 16
  • Too many queue and rqueue don't working anymore

    Describe the bug

    We currently have 72 queues in production. I added a 73rd queue today and Rqueue stopped working. If I remove the last one, Rqueue works normally.

    How to Reproduce

    Add a 73rd queue and try it.

    Library Dependencies

    • Spring Boot: 2.4.4
    • Spring Messaging: 5.3.5
    • Spring Data Redis: 2.4.6

    Additional Details

    I tried on Windows and in Docker, and both have the same problem. I checked with the debugger and the listener method is not executed. If I check /rqueue/running, the task is shown as running but the method is not triggered.

    not-a-bug 
    opened by j0nathan33 15
  • Option to add dashboard path prefix using rqueue.web.url.prefix config

    The rqueue.web.url.prefix config option appears to have been partially removed in v2.7.0.
    Commit: https://github.com/sonus21/rqueue/pull/81/commits/c96d816302de13580bda45a292d90103859748da

    It has been removed from:

    • rqueue-core/src/main/java/com/github/sonus21/rqueue/config/RqueueWebConfig.java
    • rqueue-spring-boot-example/src/main/resources/application.properties

    but it is still present in the view controller and referenced in the dashboard HTML template and JS file.

    Was this an accidental removal?

    enhancement 
    opened by patkinson01 14
  • argument type mismatch

    Thank you for this library :) I got java.lang.IllegalStateException: argument type mismatch error while trying to send objects to the queue.

    Stacktrace:

    java.lang.IllegalStateException: argument type mismatch
    Endpoint [com.octovan.service.redis.queue.RedisMessageListener]
    Method [public void com.octovan.service.redis.queue.RedisMessageListener.onMessage(com.octovan.service.redis.queue.JobX) throws java.lang.Exception] with argument values:
     [0] [type=com.octovan.service.redis.queue.JobX] [value=JobX(message=Test2, id=id)] 
    	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:176)
    	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
    	at org.springframework.messaging.handler.invocation.AbstractMethodMessageHandler.handleMatch(AbstractMethodMessageHandler.java:565)
    	at org.springframework.messaging.handler.invocation.AbstractMethodMessageHandler.handleMessageInternal(AbstractMethodMessageHandler.java:520)
    	at org.springframework.messaging.handler.invocation.AbstractMethodMessageHandler.handleMessage(AbstractMethodMessageHandler.java:454)
    	at com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer$MessageExecutor.run(RqueueMessageListenerContainer.java:506)
    	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    	at java.base/java.lang.Thread.run(Thread.java:834)
    Caused by: java.lang.IllegalArgumentException: argument type mismatch
    	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
    	... 8 common frames omitted
    

    My classes:

    Object that I want to send over queue

    public class JobX {
        private String message;
        private String id;
    
        public JobX() {
        }
    
        public JobX(String message, String id) {
            this.message = message;
            this.id = id;
        }
    
        public String getMessage() {
            return message;
        }
    
        public void setMessage(String message) {
            this.message = message;
        }
    
        public String getId() {
            return id;
        }
    
        public void setId(String id) {
            this.id = id;
        }
    }
    

    My queue listener

    @RqueueListener(value = "delayed-queue-job", numRetries = "1", deadLetterQueue = "failed-job-queue")
        public void onMessage(JobX job) throws Exception {
            logger.info("Job:" + job);
        }
    

    My message send function

    public void sendDelayedMessage(String message, String id){
            rqueueMessageSender.put("delayed-queue-job", new JobX(message, id), 1);
        }
    

    Thanks. Edit: Markdown

    bug 
    opened by mertkiray 13
  • Enqueue a list of Objects in Rqueue

    Describe the bug

    I enqueued a List<MyObject> in Rqueue, which was successful, but I got a ClassCastException in the message executor (@RqueueListener) once the message was loaded from the database in the run method. Is it possible to enqueue a list of objects?

    How to Reproduce

    the enqueue method:

    rqueueMessageSender.enqueueIn(myQueue, Lists.newArrayList(MyObjectInstance), 1, TimeUnit.SECONDS);
    

    the listener:

    @RqueueListener(value = myQueue)
    public void informNextRoll(List<MyObject> MyObjectList) { ... }
    

    Additional Details

    here is the stack trace:

        at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:176)
        at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
        at org.springframework.messaging.handler.invocation.AbstractMethodMessageHandler.handleMatch(AbstractMethodMessageHandler.java:565)
        at org.springframework.messaging.handler.invocation.AbstractMethodMessageHandler.handleMessageInternal(AbstractMethodMessageHandler.java:520)
        at org.springframework.messaging.handler.invocation.AbstractMethodMessageHandler.handleMessage(AbstractMethodMessageHandler.java:454)
        at com.github.sonus21.rqueue.listener.RqueueExecutor.start(RqueueExecutor.java:158)
        at com.github.sonus21.rqueue.listener.MessageContainerBase.run(MessageContainerBase.java:50)
        at io.github.jhipster.async.ExceptionHandlingAsyncTaskExecutor.lambda$createWrappedRunnable$1(ExceptionHandlingAsyncTaskExecutor.java:78)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.base/java.lang.Thread.run(Unknown Source)
    Caused by: java.lang.IllegalArgumentException: java.lang.ClassCastException@1a1632c2
        at jdk.internal.reflect.GeneratedMethodAccessor118.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.base/java.lang.reflect.Method.invoke(Unknown Source)
        at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
        ... 10 common frames omitted
    

    thanks for looking into this :)

    enhancement 
    opened by chlegou 12
  • Listeners are running at lower concurrency than the configured concurrency

    Describe the bug

    I'm doing some tests with Rqueue and want to find out how many concurrent jobs our server can handle. Currently, I have two queues with these configs:

    @RqueueListener(value = "tags", concurrency = "10-20", deadLetterQueue = "tags-failed")
    public void sendTag(String message) {
        log.info("tags: {}", message);
        String url = "https://jsonplaceholder.typicode.com/todos/1";
        this.restTemplate.getForObject(url, String.class);
        rqueueMessageEnqueuer.enqueue("notifications", "notifications added");
    
    }
    
    @RqueueListener(value = "notifications", concurrency = "10-20", deadLetterQueue = "notifications-failed")
    public void sendNotifications(String message) {
        log.info("notifications: {}", message);
        String url = "https://jsonplaceholder.typicode.com/todos/1";
        this.restTemplate.getForObject(url, String.class);
    }
    

    However, the number of concurrent jobs always throttles at two, regardless of the concurrency value that I set. I noticed that the server successfully creates 10 different consumers to handle the jobs, but it always ends up using only the last two consumers, regardless of how many consumers are created (see the log for more details).
    The machine I run the code on has good specs, and most of the CPU cores are idle while the queue is running (check the top command output picture).

    Screenshots & logs

    Sample log: notice how only the last two workers keep working while the rest are idle.

    2021-05-04 09:44:16.260  INFO 28882 --- [ tagsConsumer-1] c.p.p.Queue.MessageListener              : tags: {"msg":"\"Rqueue is configured\"","name":"java.lang.String"}
    2021-05-04 09:44:16.346  INFO 28882 --- [ tagsConsumer-2] c.p.p.Queue.MessageListener              : tags: {"msg":"\"Rqueue is configured\"","name":"java.lang.String"}
    2021-05-04 09:44:16.521  INFO 28882 --- [ tagsConsumer-3] c.p.p.Queue.MessageListener              : tags: {"msg":"\"Rqueue is configured\"","name":"java.lang.String"}
    2021-05-04 09:44:16.655  INFO 28882 --- [tionsConsumer-1] c.p.p.Queue.MessageListener              : notifications: {"msg":"\"notifications added\"","name":"java.lang.String"}
    2021-05-04 09:44:16.723  INFO 28882 --- [ tagsConsumer-4] c.p.p.Queue.MessageListener              : tags: {"msg":"\"Rqueue is configured\"","name":"java.lang.String"}
    2021-05-04 09:44:16.805  INFO 28882 --- [tionsConsumer-2] c.p.p.Queue.MessageListener              : notifications: {"msg":"\"notifications added\"","name":"java.lang.String"}
    2021-05-04 09:44:16.843  INFO 28882 --- [ tagsConsumer-5] c.p.p.Queue.MessageListener              : tags: {"msg":"\"Rqueue is configured\"","name":"java.lang.String"}
    2021-05-04 09:44:16.926  INFO 28882 --- [tionsConsumer-3] c.p.p.Queue.MessageListener              : notifications: {"msg":"\"notifications added\"","name":"java.lang.String"}
    2021-05-04 09:44:17.019  INFO 28882 --- [ tagsConsumer-6] c.p.p.Queue.MessageListener              : tags: {"msg":"\"Rqueue is configured\"","name":"java.lang.String"}
    2021-05-04 09:44:17.077  INFO 28882 --- [tionsConsumer-4] c.p.p.Queue.MessageListener              : notifications: {"msg":"\"notifications added\"","name":"java.lang.String"}
    2021-05-04 09:44:17.137  INFO 28882 --- [ tagsConsumer-7] c.p.p.Queue.MessageListener              : tags: {"msg":"\"Rqueue is configured\"","name":"java.lang.String"}
    2021-05-04 09:44:17.195  INFO 28882 --- [tionsConsumer-5] c.p.p.Queue.MessageListener              : notifications: {"msg":"\"notifications added\"","name":"java.lang.String"}
    2021-05-04 09:44:17.258  INFO 28882 --- [ tagsConsumer-8] c.p.p.Queue.MessageListener              : tags: {"msg":"\"Rqueue is configured\"","name":"java.lang.String"}
    2021-05-04 09:44:17.314  INFO 28882 --- [tionsConsumer-6] c.p.p.Queue.MessageListener              : notifications: {"msg":"\"notifications added\"","name":"java.lang.String"}
    2021-05-04 09:44:17.383  INFO 28882 --- [ tagsConsumer-9] c.p.p.Queue.MessageListener              : tags: {"msg":"\"Rqueue is configured\"","name":"java.lang.String"}
    2021-05-04 09:44:17.427  INFO 28882 --- [tionsConsumer-7] c.p.p.Queue.MessageListener              : notifications: {"msg":"\"notifications added\"","name":"java.lang.String"}
    2021-05-04 09:44:17.540  INFO 28882 --- [tagsConsumer-10] c.p.p.Queue.MessageListener              : tags: {"msg":"\"Rqueue is configured\"","name":"java.lang.String"}
    2021-05-04 09:44:17.566  INFO 28882 --- [tionsConsumer-8] c.p.p.Queue.MessageListener              : notifications: {"msg":"\"notifications added\"","name":"java.lang.String"}
    2021-05-04 09:44:17.622  INFO 28882 --- [ tagsConsumer-9] c.p.p.Queue.MessageListener              : tags: {"msg":"\"Rqueue is configured\"","name":"java.lang.String"}
    2021-05-04 09:44:17.665  INFO 28882 --- [tionsConsumer-9] c.p.p.Queue.MessageListener              : notifications: {"msg":"\"notifications added\"","name":"java.lang.String"}
    2021-05-04 09:44:17.702  INFO 28882 --- [ tagsConsumer-8] c.p.p.Queue.MessageListener              : tags: {"msg":"\"Rqueue is configured\"","name":"java.lang.String"}
    2021-05-04 09:44:17.773  INFO 28882 --- [ionsConsumer-10] c.p.p.Queue.MessageListener              : notifications: {"msg":"\"notifications added\"","name":"java.lang.String"}
    2021-05-04 09:44:17.808  INFO 28882 --- [tagsConsumer-10] c.p.p.Queue.MessageListener              : tags: {"msg":"\"Rqueue is configured\"","name":"java.lang.String"}
    2021-05-04 09:44:17.890  INFO 28882 --- [tionsConsumer-9] c.p.p.Queue.MessageListener              : notifications: {"msg":"\"notifications added\"","name":"java.lang.String"}
    2021-05-04 09:44:17.940  INFO 28882 --- [ tagsConsumer-8] c.p.p.Queue.MessageListener              : tags: {"msg":"\"Rqueue is configured\"","name":"java.lang.String"}
    2021-05-04 09:44:17.984  INFO 28882 --- [ionsConsumer-10] c.p.p.Queue.MessageListener              : notifications: {"msg":"\"notifications added\"","name":"java.lang.String"}
    2021-05-04 09:44:18.071  INFO 28882 --- [tagsConsumer-10] c.p.p.Queue.MessageListener              : tags: {"msg":"\"Rqueue is configured\"","name":"java.lang.String"}
    2021-05-04 09:44:18.117  INFO 28882 --- [tionsConsumer-9] c.p.p.Queue.MessageListener              : notifications: {"msg":"\"notifications added\"","name":"java.lang.String"}
    2021-05-04 09:44:18.158  INFO 28882 --- [ tagsConsumer-8] c.p.p.Queue.MessageListener              : tags: {"msg":"\"Rqueue is configured\"","name":"java.lang.String"}
    2021-05-04 09:44:18.186  INFO 28882 --- [ionsConsumer-10] c.p.p.Queue.MessageListener              : notifications: {"msg":"\"notifications added\"","name":"java.lang.String"}
    2021-05-04 09:44:18.231  INFO 28882 --- [tagsConsumer-10] c.p.p.Queue.MessageListener              : tags: {"msg":"\"Rqueue is configured\"","name":"java.lang.String"}
    2021-05-04 09:44:18.257  INFO 28882 --- [tionsConsumer-9] c.p.p.Queue.MessageListener              : notifications: {"msg":"\"notifications added\"","name":"java.lang.String"}
    2021-05-04 09:44:18.300  INFO 28882 --- [ tagsConsumer-8] c.p.p.Queue.MessageListener              : tags: {"msg":"\"Rqueue is configured\"","name":"java.lang.String"}
    2021-05-04 09:44:18.314  INFO 28882 --- [ionsConsumer-10] c.p.p.Queue.MessageListener              : notifications: {"msg":"\"notifications added\"","name":"java.lang.String"}
    2021-05-04 09:44:18.371  INFO 28882 --- [tagsConsumer-10] c.p.p.Queue.MessageListener              : tags: {"msg":"\"Rqueue is configured\"","name":"java.lang.String"}
    2021-05-04 09:44:18.378  INFO 28882 --- [tionsConsumer-9] c.p.p.Queue.MessageListener              : notifications: {"msg":"\"notifications added\"","name":"java.lang.String"}
    2021-05-04 09:44:18.431  INFO 28882 --- [ionsConsumer-10] c.p.p.Queue.MessageListener              : notifications: {"msg":"\"notifications added\"","name":"java.lang.String"}
    2021-05-04 09:44:18.431  INFO 28882 --- [ tagsConsumer-8] c.p.p.Queue.MessageListener              : tags: {"msg":"\"Rqueue is configured\"","name":"java.lang.String"}
    2021-05-04 09:44:18.495  INFO 28882 --- [tionsConsumer-9] c.p.p.Queue.MessageListener              : notifications: {"msg":"\"notifications added\"","name":"java.lang.String"}
    2021-05-04 09:44:18.495  INFO 28882 --- [tagsConsumer-10] c.p.p.Queue.MessageListener              : tags: {"msg":"\"Rqueue is configured\"","name":"java.lang.String"}
    2021-05-04 09:44:18.557  INFO 28882 --- [ tagsConsumer-8] c.p.p.Queue.MessageListener              : tags: {"msg":"\"Rqueue is configured\"","name":"java.lang.String"}
    2021-05-04 09:44:18.557  INFO 28882 --- [ionsConsumer-10] c.p.p.Queue.MessageListener              : notifications: {"msg":"\"notifications added\"","name":"java.lang.String"}
    2021-05-04 09:44:18.605  INFO 28882 --- [ tagsConsumer-9] c.p.p.Queue.MessageListener              : tags: {"msg":"\"Rqueue is configured\"","name":"java.lang.String"}
    2021-05-04 09:44:18.619  INFO 28882 --- [tionsConsumer-9] c.p.p.Queue.MessageListener              : notifications: {"msg":"\"notifications added\"","name":"java.lang.String"}
    2021-05-04 09:44:18.673  INFO 28882 --- [ tagsConsumer-8] c.p.p.Queue.MessageListener              : tags: {"msg":"\"Rqueue is configured\"","name":"java.lang.String"}
    

    Number of pending and running jobs

    top command output while the queue is running

    Library Dependencies

    • Spring Boot:
    • Spring Messaging:
    • Spring Data Redis:

    Extra notes

    this is how I run the war file for the application:

    java -Dgrails.env=prod -jar sample-app-0.0.1-test.war -Xms2g -Xmx2g -Xmn150m -XX:GCTimeRatio=2 -XX:ParallelGCThreads=20 -XX:+UseParNewGC -XX:MaxGCPauseMillis=50 -XX:+DisableExplicitGC
    
    on hold 
    opened by OmarBish 10
  • How to guarantee that one message consumed exactly once?

    Describe the bug

    I have 3 job queues. Jobs in the first queue create jobs in the second queue. Even though I set numRetries to 0, the second RqueueListener seems to consume one message twice. How can I configure Rqueue to guarantee that a message is consumed exactly once? Thank you.

    How to Reproduce

        @RqueueListener(value = FLASH_BID_BOT_QUEUE, numRetries = "0", deadLetterQueueListenerEnabled = "true", deadLetterQueue = "failed-bot-queue", concurrency = "3-5")
        public void executeFlashBidBot(Bot bot) {
        ...
        }
    
        @RqueueListener(value = FLASH_BID_AFTER_MIN_PRICE_BOT_QUEUE, numRetries = "0", deadLetterQueueListenerEnabled = "true", deadLetterQueue = "failed-after-min-price-bot-queue", concurrency = "1-3")
        public void makeInstantBidAfterMinPrice(LateBot lateBot) {
        ...
        }
    
       @RqueueListener(value = FLASH_BID_INSTANT_BID_PADDING_QUEUE, numRetries = "0", deadLetterQueueListenerEnabled = "true", deadLetterQueue = "failed-instant-bid-padding-queue", concurrency = "1-3")
        private void padInstantBid(PaddingInstantBid instantBid) {
        ...
        }
    

    Library Dependencies

    • Spring Boot:
    • Spring Data Redis:
    more information required 
    opened by vking34 10
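
Because delivery is at least once, a listener that must not repeat its side effects usually has to detect duplicates itself. Below is a minimal sketch of an idempotent consumer, assuming each message carries a unique id. IdempotentHandler and handleOnce are hypothetical names, not Rqueue API. The in-memory set only deduplicates within one JVM; a multi-instance deployment would need a shared store instead, for example a Redis key written with SET NX.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical duplicate filter for a listener that may receive a message twice. */
final class IdempotentHandler {

  // Ids of messages already handled by this JVM; grows without bound in this sketch.
  private final Set<String> processedIds = ConcurrentHashMap.newKeySet();

  /**
   * Runs work only the first time messageId is seen.
   * Returns true if work ran, false if the message was a duplicate.
   */
  boolean handleOnce(String messageId, Runnable work) {
    if (!processedIds.add(messageId)) {
      return false; // duplicate delivery, skip the side effects
    }
    try {
      work.run();
      return true;
    } catch (RuntimeException e) {
      // Forget the id so that a later retry of the same message can still run.
      processedIds.remove(messageId);
      throw e;
    }
  }
}
```

A listener body would wrap its real work in handleOnce, keyed by the id of the message it received.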
  • RqueueListener doesn't unwrap String messages

    What's not working?

    I have the following code snippets:

    Producer.java:

    RqueueMessageEnqueuer enqueuer;
    enqueuer.enqueue("my-topic", "test message");
    

    Consumer.java:

    @RqueueListener(value="my-topic")
    public void processMessage(String msg) {
      logger.info("Message: {}", msg);
    }
    

    I'd expect the logger to log Message: test message but instead it logs Message: {"msg":"\"test message\"","name":"java.lang.String"}

    The listener correctly unwraps messages that aren't of type String, but doesn't do it here.

    Should I just use ObjectMapper to handle this json? Is there something I'm doing wrong?

    (Some more explanation: the "test message" string I'm sending is actually serialized JSON itself, since I don't want to share a dependency jar with a single Java object between my two services that produce and consume this message type. I noticed the GenericMessageConverter also does an extra layer of character escaping, which is a little annoying but workable.)

    What're application dependencies ?

    • Rqueue Version: 2.8.1-RELEASE
    • Spring Boot Version: 1.4.3
    • Spring Messaging Version: 5.3.4
    • Spring Data Redis Version: 2.4.5
    • Any other spring library dependencies and their version: mostly 5.3.4
    bug 
    opened by damien-swarm 9
  • What's not working?


    Describe the bug

    My project is composed of a user-side module, a management-side module, and a public (shared) module. I created a mail entity class in the public module. The management side puts mail entities into the queue and then consumes them. The user side and the management side use the same Redis. Today a problem appeared: a mail entity message enqueued by the management side was consumed successfully on the management side, but the web (user) side reported an error. A String could not be converted to the mail entity, so deserialization failed. This confuses me.

    Exception:

        ERROR rqueueMessageListenerContainer-6 com.github.sonus21.rqueue.listener.RqueueExecutor - [email-queue_low] Message consumer failed
        org.springframework.messaging.MessagingException: An exception occurred while invoking the handler method; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [com.xxxx.queue.EmailEntity] for GenericMessage
            at com.github.sonus21.rqueue.listener.RqueueMessageHandler.processHandlerMethodException(RqueueMessageHandler.java:290) ~[rqueue-2.0.0-RELEASE.jar!/:?]
            at org.springframework.messaging.handler.invocation.AbstractMethodMessageHandler.handleMatch(AbstractMethodMessageHandler.java:581) ~[spring-messaging-5.2.6.RELEASE.jar!/:5.2.6.RELEASE]
            at org.springframework.messaging.handler.invocation.AbstractMethodMessageHandler.handleMessageInternal(AbstractMethodMessageHandler.java:520) ~[spring-messaging-5.2.6.RELEASE.jar!/:5.2.6.RELEASE]
            at org.springframework.messaging.handler.invocation.AbstractMethodMessageHandler.handleMessage(AbstractMethodMessageHandler.java:454) ~[spring-messaging-5.2.6.RELEASE.jar!/:5.2.6.RELEASE]
            at com.github.sonus21.rqueue.listener.RqueueExecutor.start(RqueueExecutor.java:377) [rqueue-2.0.0-RELEASE.jar!/:?]
            at com.github.sonus21.rqueue.listener.MessageContainerBase.run(MessageContainerBase.java:90) [rqueue-2.0.0-RELEASE.jar!/:?]
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_212]
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_212]
            at java.lang.Thread.run(Thread.java:748) [?:1.8.0_212]

    How to Reproduce

    • Steps to reproduce the behavior: start two services that share one Redis and listen to the same queue. When one service produces a message, the other service sometimes reports an error.

    • A sample reproducible code if possible.

      Send code (here the priority code used is low):

        private void sendEmailQueue(EmailEntity emailEntity, Integer priorityCode) {
            String priority;
            try {
                priority = EmailPriorityEnum.getDescription(priorityCode);
                rqueueMessageSender.enqueueWithPriority("email-queue", priority, emailEntity);
            } catch (Exception e) {
                log.error("Failed to enqueue email", e);
            }
        }

      Consume code:

        @RqueueListener(value = "email-queue", numRetries = "3", priority = "critical=10,high=8,medium=4,low=1")
        public void onMessage(EmailEntity emailEntity) {
            log.info("Start consuming email {},{},{}", new Date(), emailEntity.getTitle(), emailEntity.getReceiver());
            try {
                if (!EmptyUtils.isEmpty(emailEntity.getReceiver())) {
                    log.info("Sending HTML email to: {}", emailEntity.getReceiver());
                    sendHtmlEmail(emailEntity.getTitle(), emailEntity.getText(), emailEntity.getReceiver());
                }
            } catch (EmailException e) {
                log.error("Failed to send email {} {} ", emailEntity.getReceiver(), emailEntity.getTitle());
                log.error("Email content: --------" + emailEntity.getText());
            } catch (Exception e) {
                log.error("Failed to consume email", e);
            }
        }

    Library Dependencies

    • Spring Boot: 2.3.0.RELEASE
    • Rqueue: 2.0.0-RELEASE
    • redis.clients.jedis: 2.8.2
    • Any other Spring library dependencies
    help wanted 
    opened by lihongze 9
  • Support for Spring Boot 3

    What's not working?

    Sending message with RqueueMessageEnqueuer.

    What're application dependencies ?

    • Rqueue Version: 2.13.0-RELEASE
    • Spring Boot Version: 3.0.0
    • Spring Messaging Version
    • Spring Data Redis Version 3.00
    • Any other spring library dependencies and their version

    How to Reproduce (optional)?

    • Steps to reproduce the behaviour: add Rqueue to a Spring Boot 3 application.
    • A sample reproducible code if possible:

        private final RqueueMessageEnqueuer rqueueMessageEnqueuer;

        @Override
        public void execute(Long orderId, Long time) {
            log.info("Scheduling order {} to expire in {} minutes", orderId, time);
            rqueueMessageEnqueuer.enqueueIn(ORDER_SCHEDULE_EXPIRATION_QUEUE.getQueueName(), orderId, Duration.ofMinutes(time));
            log.info("Order {} scheduled", orderId);
        }

    Additional Details (optional) Error msg: Consider defining a bean of type 'com.github.sonus21.rqueue.core.RqueueMessageEnqueuer' in your configuration.

    Add any other context about the problem that would be helpful like OS, Redis, Docker etc

    opened by matheuscoimbra 0
  • Even distribution of messages at consumer level?


    Let's say we have 4 producers (instances of the same application) enqueuing messages to a queue, and 4 consumers (instances of the same application), each with the concurrency limit set to 20-25 in its @RqueueListener settings. Is it guaranteed that all consumers receive messages evenly? If not, is there a way to make consumers receive messages from the queue evenly, as in a round-robin process?
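
    In a competing-consumers setup (see Features), each worker typically takes the next message whenever it becomes free, so the split between workers depends on timing rather than on a fixed rotation. A minimal sketch of that pattern, using a plain BlockingQueue in place of Redis; the CompetingConsumersDemo class is hypothetical and does not reflect Rqueue internals:

```java
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerArray;

/** Hypothetical demo (not Rqueue code): several workers pull from one shared queue. */
final class CompetingConsumersDemo {

    /** Drains the queue with the given number of workers; returns messages handled per worker. */
    static int[] consume(BlockingQueue<Integer> queue, int workers) {
        AtomicIntegerArray handled = new AtomicIntegerArray(workers);
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int w = 0; w < workers; w++) {
            final int worker = w;
            pool.execute(() -> {
                // Each worker takes whatever message is next when it is free.
                while (queue.poll() != null) {
                    handled.incrementAndGet(worker);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        int[] counts = new int[workers];
        for (int w = 0; w < workers; w++) {
            counts[w] = handled.get(w);
        }
        return counts;
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 1000; i++) {
            queue.add(i);
        }
        // Every message is handled exactly once, but the per-worker split varies between runs.
        System.out.println(Arrays.toString(consume(queue, 4)));
    }
}
```

    The per-worker counts always sum to the number of enqueued messages, but nothing in this pattern forces them to be equal.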

    opened by nazmul-prince 1
  • Pass multiple messages to handler using batchSize

    Is your feature request related to a problem? Please describe. Currently we can fetch multiple messages using batchSize, but the messages are still passed to the executor one at a time.

    Describe the solution you'd like. We would like a new feature that passes the batched messages to the executor together. In other words, we want to write a listener like the one below.

    @RqueueListener(value = ["email-queue"], concurrency="1", batchSize = "3")
    fun handleEmailMessage(messages: Collection<GenericMessage<Any>>) {
      println("received email message: $messages")
    }
    

    I would appreciate it if you could point me to a way of doing this with the current implementation, or share any thoughts about my request.
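
    One possible workaround on the application side is to buffer single messages and hand them over in groups. A minimal sketch, assuming a hypothetical BatchBuffer class that is not part of Rqueue. Note that a buffered message has already been handed to the listener, so it may be lost if the process stops before the buffer is flushed:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper (not part of Rqueue): groups single messages into fixed-size batches. */
final class BatchBuffer<T> {
    private final int batchSize;
    private final Consumer<List<T>> batchHandler;
    private final List<T> pending = new ArrayList<>();

    BatchBuffer(int batchSize, Consumer<List<T>> batchHandler) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be >= 1");
        }
        this.batchSize = batchSize;
        this.batchHandler = batchHandler;
    }

    /** Call from the single-message listener; flushes once batchSize messages are buffered. */
    synchronized void add(T message) {
        pending.add(message);
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    /** Hands over whatever is buffered, e.g. from a timer, so a partial batch is not stuck. */
    synchronized void flush() {
        if (pending.isEmpty()) {
            return;
        }
        List<T> batch = new ArrayList<>(pending);
        pending.clear();
        batchHandler.accept(batch);
    }

    public static void main(String[] args) {
        BatchBuffer<String> buffer =
            new BatchBuffer<>(3, batch -> System.out.println("received email batch: " + batch));
        for (String message : Arrays.asList("a", "b", "c", "d")) {
            buffer.add(message);
        }
        buffer.flush(); // deliver the trailing partial batch
    }
}
```

    Running main prints "received email batch: [a, b, c]" followed by "received email batch: [d]".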

    enhancement 
    opened by ryukato 1
  • RqueueMessageManager.deleteMessage(queueName, Id) not deleting the message from the Queue


    What's not working?

    RqueueMessageManager.deleteMessage(queueName, Id) does not delete the message from the queue. After I call it, RqueueMessageManager.getMessage() still returns the message.

    While RqueueMessageManager.deleteAllMessages(queueName) does work

    What're application dependencies ?

    • Rqueue Version: 2.10.2-RELEASE
    • Spring Boot Version: 2.6.8
    • Spring Data Redis Version: 2.6.4

    Sample code:

    @Slf4j
    public abstract class AbstractTaskScheduler {
        @Value("${auction-segment.online.rqueue.enabled:true}")
        private boolean isRqueueEnabled;
        @Value("${auction-segment.online.rqueue.slack-offset:100ms}")
        private Duration slackOffset;
    
        private final RqueueMessageEnqueuer rqueueMessageEnqueuer;
        private final RqueueMessageManager rqueueMessageManager;
        private final String queueName;
    
        protected final TaskScheduler scheduler;
        protected Map<Long, ScheduledFuture<?>> jobsMap = new HashMap<>();
    
        public AbstractTaskScheduler(
            TaskScheduler scheduler,
            RqueueMessageEnqueuer rqueueMessageEnqueuer,
            RqueueMessageManager rqueueMessageManager,
            String queueName
        ) {
            this.scheduler = scheduler;
            this.rqueueMessageEnqueuer = rqueueMessageEnqueuer;
            this.rqueueMessageManager = rqueueMessageManager;
            this.queueName = queueName;
        }
    
        @SuppressWarnings("PMD.DoNotUseThreads")
        protected abstract Runnable getTask(Long id);
    
        public List<Long> getTaskIds() {
            if (isRqueueEnabled) {
                List<Object> taskIds = rqueueMessageManager.getAllMessages(queueName);
                return !CollectionUtils.isEmpty(taskIds)
                    ? taskIds.stream().map(t -> Long.valueOf(t.toString()))
                    .collect(Collectors.toList())
                    : Collections.emptyList();
            } else {
                return jobsMap != null
                    ? new ArrayList<>(jobsMap.keySet())
                    : Collections.emptyList();
            }
        }
    
        public RqueueMessage getTaskById(Long id) {
            if (isRqueueEnabled) {
                return rqueueMessageManager.getRqueueMessage(queueName, String.valueOf(id));
            }
            return null;
        }
    
        public boolean taskExists(Long id) {
            return getTaskById(id) != null;
        }
    
        @Retryable(
            maxAttempts = 3,
            backoff = @Backoff(delay = 30),
            value = {Exception.class}
        )
        protected void addTaskToScheduler(long id, Instant startDateTime) {
            if (isRqueueEnabled) {
                log.info("adding message to {} queue - message id {} - to be started at {}", queueName, id, startDateTime);
                boolean success = rqueueMessageEnqueuer.enqueueAt(
                    queueName,
                    String.valueOf(id),
                    id,
                    startDateTime.minusMillis(slackOffset.toMillis())
                );
    
                if (!success) {
                   throw new IllegalStateException(String.format("Failed to enqueue message %s for listing %s", queueName, id));
                }
            } else {
                ScheduledFuture<?> scheduledTask = scheduler.schedule(getTask(id), startDateTime);
                jobsMap.put(id, scheduledTask);
            }
        }
    
        public void removeTaskFromScheduler(long id) {
            if (isRqueueEnabled) {
                log.info("removing message from {} queue - message id {}", queueName, id);
                rqueueMessageManager.deleteMessage(queueName, String.valueOf(id));
            } else {
                ScheduledFuture<?> scheduledTask = jobsMap.get(id);
                if(scheduledTask != null) {
                    scheduledTask.cancel(true);
                    jobsMap.remove(id); // drop the key so getTaskIds() no longer reports the cancelled task
                }
            }
        }
    
        @EventListener({ ContextRefreshedEvent.class })
        protected abstract void contextRefreshedEvent();
    }
    

    I am not sure if this is related to this part of your code:

      @Override
      public boolean deleteMessage(String queueName, String messageId, Duration duration) {
        String lockValue = UUID.randomUUID().toString();
        try {
          if (lockManager.acquireLock(messageId, lockValue, Duration.ofSeconds(1))) {
            String id = RqueueMessageUtils.getMessageMetaId(queueName, messageId);
            MessageMetadata messageMetadata = rqueueMessageMetadataDao.get(id);
            if (messageMetadata == null) {
              messageMetadata = new MessageMetadata(id, MessageStatus.DELETED);
            }
            messageMetadata.setDeleted(true);
            messageMetadata.setDeletedOn(System.currentTimeMillis());
            save(messageMetadata, duration);
            return true;
          }
        } finally {
          lockManager.releaseLock(messageId, lockValue);
        }
        return false;
      }
    

    It looks like the lock is not released when execution enters the if branch.
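
    In Java, a finally block runs even when the try block exits through return, so the quoted method does release the lock on the success path. The quoted code also only flags the message metadata as deleted, which may be why the message is still returned afterwards. A minimal sketch of the try/finally behaviour, with a hypothetical FakeLock class standing in for the lockManager:

```java
/** Hypothetical demo (not Rqueue code): a return inside try still triggers finally. */
final class FinallyReleaseDemo {

    /** Hypothetical stand-in for a lock manager. */
    static final class FakeLock {
        private boolean held;

        boolean acquire() {
            if (held) {
                return false;
            }
            held = true;
            return true;
        }

        void release() {
            held = false;
        }

        boolean isHeld() {
            return held;
        }
    }

    /** Mirrors the shape of the quoted deleteMessage: return inside try, release in finally. */
    static boolean deleteWithLock(FakeLock lock) {
        try {
            if (lock.acquire()) {
                return true; // finally still runs before the method returns
            }
        } finally {
            lock.release();
        }
        return false;
    }

    public static void main(String[] args) {
        FakeLock lock = new FakeLock();
        boolean deleted = deleteWithLock(lock);
        System.out.println("deleted=" + deleted + ", lockStillHeld=" + lock.isHeld());
    }
}
```

    Running main prints "deleted=true, lockStillHeld=false".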

    not-a-bug 
    opened by pcastroadc 5
  • With active-active + cluster mode enabled often messages get processed twice


    We originally implemented a scheduler using your library with Redis ElastiCache in AWS with cluster mode disabled. Everything seemed to work out great. For context, our production service that implements the scheduler runs 3 instances. The scheduled task was always dispatched to a single instance, as expected.

    We have now tried to switch to an enterprise version of redis with cluster mode enabled and an active-active database. By that I mean that we are running our service in two regions (x3 instances per region) both pointing to different redis databases (i.e. redis in region A and redis in region b) that are replicated in between.

    What're application dependencies ?

    • Rqueue Version: 2.8.0-RELEASE
    • Spring Boot Version: 2.4.4
    • Spring Data Redis Version: 2.4.6

    How to Reproduce (optional)?

    • This happens maybe 1 in every 10 times we schedule a job.

    We aren't running under any load, as we currently only really run smoke tests. We schedule a job with a wait of x seconds. x seconds later the job is consumed and executed by two instances. The instances are always in different regions. I'm guessing the library was just never built to handle this case?

    limitation 
    opened by Dbasdeo 7
Releases(v2.12.0)
  • v2.12.0(Dec 23, 2022)

    What's Changed

    • Message scheduler reliability: sometimes message consumers stopped suddenly due to a failure with Redis
    • Display DateTime based on the current time zone setting

    Full Changelog: https://github.com/sonus21/rqueue/compare/v2.10.2...2.12

  • v2.10.2(Jul 16, 2022)

    What's Changed

    Fixes

    • Message status was always shown as enqueued; it should reflect the actual state (successful, failed, etc.)
    • Weighted queue with size 1 was not working
    • Deleted message reappears

    Features

    • Disable Rqueue using rqueue.enabled flag

    Full Changelog: https://github.com/sonus21/rqueue/compare/v2.10.1...v2.10.2

  • v2.10.1(Oct 27, 2021)

  • v2.10.0(Oct 10, 2021)

    Fixes

    • Fixes for post processor calls (post processor calls were not made)
    • Fixes the message count used when moving messages (by default 1000 messages are moved)
    • Potential issue in rename collection
    • More than one (-) sign in the dashboard
    • Fixes for server context path. Rqueue end points would be served relative to x-forwarded-prefix/server.servlet.context-path

    Features

    • Display completed jobs in the dashboard
    • Option to choose number of days in the chart
  • v2.8.1(Jul 19, 2021)

    Option to add an Rqueue web URL prefix. The prefix is configured in the application.properties file using rqueue.web.url.prefix=my-application; the Rqueue dashboard is then served at my-application/rqueue instead of /rqueue. This configuration has higher priority than the HTTP request header x-forwarded-prefix.

  • v2.8.0(Jun 8, 2021)

    Added

    • Pause/Unpause queue from dashboard
    • Pause/Unpause queue programmatically
    • Batch message fetching
    • Default queue priority to WEIGHTED
    • Added an API to update the visibility timeout of running job

    Fixes

    • Producer mode is not honored in Message scheduler
    • Message scheduler disable flag is not honored
    • Aggregator should not be running in producer mode
    • Listener concurrency is not reached, even though messages are in queue
    • Register queue in producer mode for all listener methods
  • v2.7.0(Apr 13, 2021)

    Fixes

    • Spring Boot app could not start on Boot 2.0 due to a class-not-found error
    • Utility UI message move not working due to invalid data type

    Added

    • Support for Reactive Redis and Spring Webflux
    • Delete message metadata when rqueue.message.durability.in-terminal-state is less than or equal to zero
    • Delete job detail when rqueue.job.durability.in-terminal-state is less than or equal to zero
  • v2.6.1(Mar 17, 2021)

  • v2.6.0(Mar 17, 2021)

  • v2.5.0(Feb 9, 2021)

  • v2.4.0(Feb 6, 2021)

    • Job Middleware: One or more middleware can be added to the message listener flow; each middleware is called in the order it was added. A middleware can skip processing of the current message using the job.release method.
    • Delay execution of message when it's moved to enqueue instead of consuming it immediately.
  • v2.3.0(Jan 2, 2021)

    Added

    • Job checkin for long-running tasks
    • Display job and failure details in UI for each message
    • Allow deleting messages from normal and scheduled queues instead of only dead letter queue.
    • Scan only required beans for RqueueListener annotated methods

    Fixes

    • Redis string deserialization issue: strings were inserted without quotes
    • Dashboard CSP rule error for inline javascript
    • Double minus sign (--) in UI

    Miscellaneous

    • Delete message metadata along with messages using background job
    • Potential error for a periodic message, if period was longer than 24 hours
    • Add retry-limit-exceeded messages at the front of the dead letter queue instead of at the back.
  • v2.1.1(Sep 24, 2020)

  • v2.1.0(Sep 16, 2020)

    Added

    • Allow application to provide message id while enqueuing messages
    • Unique message enqueue
    • API to check whether a message was enqueued
    • API to delete a single message
    • Proxy for outbound http connection
    • Enqueue list of objects and process them, like batch-processing

    Fixes:

    • Registered queues should not be deleted when used in producer mode
  • v2.0.4(Aug 4, 2020)

  • v2.0.2(Jul 13, 2020)

  • v2.0.1(May 17, 2020)

    1. Fixed issue in queue deletion
    2. Fixed bug of argument mismatch
    3. New apis to enqueue messages using enqueueAt
    4. Refined apis for enqueueIn using Duration, TimeUnit
  • v2.0.0(May 10, 2020)

    New Features

    • Web Interface
      • Web interface to visualize queue
      • Latency visualizer
      • Delete message from the queue
      • Move message from one queue to another
    • Allow prefixing redis keys to avoid accidental key delete
    • Allow deactivating a consumer in a given environment
    • Redis cluster support
    • Queue concurrency
    • Queue priority (Weighted and strict)
    • Queue priority at group level
    • Queue priority at sub queue level like critical, high, medium, low

    Breaking Changes

    • Queue names are prefixed, version 1.0 users need to set a redis key __rq::version with value 1
    • Renamed annotation field maxJobExecutionTime to visibilityTimeout

    Fixes

    • Spring Optional Micrometer: in older versions the config class imported Micrometer-related classes, which could lead to errors if those classes were not found. The code now depends on the bean name using the DependsOn annotation.
    • Complete isolation of Redis, allow application to configure one Redis for the application and one for the Rqueue
  • v1.3(Dec 11, 2019)

    1. Expose 6 queue metrics using micrometer. (queue-size, delay queue size, processing queue size, dead letter queue size, execution counter, failure counter)
    2. Fix an issue in the scheduler, which was being scheduled with a delay of 5 seconds. This led to messages not being copied from the delayed queue to the main queue under high load.
    3. An api to move messages from dead letter queue to other queue (any source queue to target queue).
  • v1.2(Nov 18, 2019)

    • A message can be delayed for an arbitrary period of time or delivered immediately.
    • Multiple messages can be consumed in parallel by different workers.
    • Message delivery: It's guaranteed that a message is consumed at least once. (Message would be consumed by a worker more than once due to the failure in the underlying worker/restart-process etc, otherwise exactly one delivery)