Powerful event-bus optimized for high throughput in multi-threaded applications. Features: Sync and Async event publication, weak/strong references, event filtering, annotation driven

Overview


MBassador

MBassador is a light-weight, high-performance event bus implementing the publish subscribe pattern. It is designed for ease of use and aims to be feature rich and extensible while preserving resource efficiency and performance.

The core of MBassador is built around a custom data structure that provides non-blocking reads and minimized lock contention for writes such that performance degradation of concurrent read/write access is minimal. Benchmarks that illustrate the advantages of this design are available in this github repository.


The code is production ready: 86% instruction coverage and 82% branch coverage with randomized and concurrently run test sets, and no major bug has been reported in the last 18 months. No modifications to the core will be made without thoroughly testing the code.

Usage | Features | Installation | Wiki | Release Notes | Integrations | Credits | Contribute | License

Usage

Using MBassador in your project is straightforward. Create as many instances of MBassador as you like (usually a singleton will do) with bus = new MBassador(), mark and configure your message handlers with @Handler annotations, and register the listeners with any MBassador instance using bus.subscribe(aListener). Then send messages to your listeners using one of MBassador's publication methods: bus.post(message).now() or bus.post(message).asynchronously().

As a first reference, consider this illustrative example. You might also want to have a look at the collection of examples to see its features in more detail.

      
// Define your handlers

@Listener(references = References.Strong)
class SimpleFileListener{

    @Handler
    public void handle(File file){
      // do something with the file
    }
    
    @Handler(delivery = Invoke.Asynchronously)
    public void expensiveOperation(File file){
      // do something with the file
    }
    
    @Handler(condition = "msg.size >= 10000")
    @Enveloped(messages = {HashMap.class, LinkedList.class})
    public void handleLarge(MessageEnvelope envelope) {
       // handle objects without common super type
    }

}

// somewhere else in your code

MBassador bus = new MBassador();
bus.subscribe (new SimpleFileListener());
bus.post(new File("/tmp/smallfile.csv")).now();
bus.post(new File("/tmp/bigfile.csv")).asynchronously();

Features

Annotation driven

Annotation Function
@Handler Mark a method as message handler
@Listener Can be used to customize listener wide configuration like the used reference type
@Enveloped A message envelope can be used to pass messages of different types into a single handler
@Filter Add filtering to prevent certain messages from being published

Delivers everything, respects type hierarchy

Messages do not need to implement any interface and can be of any type. The class hierarchy of a message is considered during message delivery, such that handlers will also receive subtypes of the message type they consume for - e.g. a handler of Object.class receives everything. Messages that do not match any handler result in the publication of a DeadMessage object which wraps the original message. DeadMessage events can be handled by registering listeners that handle DeadMessage.
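The delivery rule described above can be illustrated without the library. The following is a minimal JDK-only sketch, not MBassador's implementation: the class HierarchyDispatchSketch and its nested Dead wrapper are hypothetical names that only mimic the behaviour of subtype-aware delivery and of DeadMessage.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy bus (hypothetical, not MBassador code) that delivers by runtime type. */
public class HierarchyDispatchSketch {

    /** Hypothetical stand-in for a wrapper around messages nobody handled. */
    public static final class Dead {
        public final Object original;
        Dead(Object original) { this.original = original; }
    }

    private static final class Entry {
        final Class<?> type;
        final Consumer<Object> handler;
        Entry(Class<?> type, Consumer<Object> handler) {
            this.type = type;
            this.handler = handler;
        }
    }

    private final List<Entry> entries = new ArrayList<>();

    public <T> void subscribe(Class<T> type, Consumer<? super T> handler) {
        entries.add(new Entry(type, msg -> handler.accept(type.cast(msg))));
    }

    /** Returns the number of handlers that received the original message. */
    public int publish(Object message) {
        int delivered = deliver(message);
        // Unhandled messages are wrapped once and offered to Dead handlers.
        if (delivered == 0 && !(message instanceof Dead)) {
            deliver(new Dead(message));
        }
        return delivered;
    }

    private int deliver(Object message) {
        int count = 0;
        for (Entry e : entries) {
            // isInstance matches the declared type and all of its subtypes.
            if (e.type.isInstance(message)) {
                e.handler.accept(message);
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        HierarchyDispatchSketch bus = new HierarchyDispatchSketch();
        List<String> log = new ArrayList<>();
        bus.subscribe(Number.class, n -> log.add("number:" + n));
        bus.subscribe(Dead.class, d -> log.add("dead:" + d.original));
        bus.publish(42);      // Integer is a subtype of Number
        bus.publish("hello"); // no String handler, so it is wrapped
        System.out.println(log); // prints [number:42, dead:hello]
    }
}
```

A handler subscribed for Object.class in this sketch would receive every message, including the wrapper, which mirrors the "a handler of Object.class receives everything" rule.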

Synchronous and asynchronous message delivery

There are two types of (a-)synchronicity when using MBassador: message dispatch and handler invocation.

Message dispatch

Synchronous dispatch means that the publish method blocks until all handlers have been processed. Note: This does not necessarily imply that each handler has been invoked and has received the message, because synchronous dispatch can be combined with asynchronous handlers. These are the semantics of publish(Object obj) and post(Object obj).now().

Asynchronous dispatch means that the publish method returns immediately and the message will be dispatched in another thread (fire and forget). These are the semantics of publishAsync(Object obj) and post(Object obj).asynchronously().

Handler invocation

Synchronous handlers are invoked sequentially and from the same thread within a running publication. For asynchronous handlers, the actual handler invocation is pushed to a queue that is processed by a pool of worker threads.
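The difference between the two handler invocation modes can be sketched with plain JDK classes. This is only an illustration under simplifying assumptions (one hypothetical worker thread named "worker", handlers modelled as Runnable); it is not how MBassador is implemented internally.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class DispatchModesSketch {

    /** Runs one "publication" and reports which thread each handler ran on. */
    public static List<String> run() {
        ExecutorService workers = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "worker");
            t.setDaemon(true);
            return t;
        });
        List<String> seen = new CopyOnWriteArrayList<>();
        CountDownLatch asyncDone = new CountDownLatch(1);

        Runnable syncHandler =
            () -> seen.add("sync on " + Thread.currentThread().getName());
        Runnable asyncHandler = () -> {
            seen.add("async on " + Thread.currentThread().getName());
            asyncDone.countDown();
        };

        // Synchronous dispatch: the publishing thread walks all handlers itself.
        syncHandler.run();            // synchronous handler runs inline
        workers.submit(asyncHandler); // asynchronous handler is only queued here
        // The dispatch is complete now, although asyncHandler may not have run yet.

        try {
            // Only the demo waits; a real publisher would simply continue.
            asyncDone.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            workers.shutdown();
        }
        return seen;
    }

    public static void main(String[] args) {
        Thread.currentThread().setName("publisher");
        System.out.println(run()); // prints [sync on publisher, async on worker]
    }
}
```

The point of the sketch is the comment in the middle: a synchronous dispatch can return before an asynchronous handler has seen the message.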

Configurable reference types

By default, MBassador uses weak references for listeners. This relieves the programmer of the need to explicitly unsubscribe listeners that are no longer used and avoids memory leaks. It is very convenient in container-managed environments where listeners are created and destroyed by frameworks, e.g. Spring or Guice. Just add everything to the bus: it will ignore objects without handlers and automatically clean up orphaned weak references after the garbage collector has done its job.

Alternatively, a listener can be held by strong references using @Listener(references=References.Strong). Strongly referenced listeners stick around until they are explicitly unsubscribed.
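To make the trade-off concrete, here is a minimal JDK-only sketch of a registry that holds some listeners weakly and others strongly. ReferenceTypesSketch and its methods are hypothetical and much simpler than MBassador's data structure (for one thing, the sketch is not thread-safe); it only shows why weakly held listeners need no explicit unsubscribe.

```java
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

public class ReferenceTypesSketch {

    // Each entry yields the listener, or null once a weakly held one was collected.
    private final List<Supplier<Runnable>> listeners = new ArrayList<>();

    /** The registry does not keep the listener alive. */
    public void subscribeWeak(Runnable listener) {
        WeakReference<Runnable> ref = new WeakReference<>(listener);
        listeners.add(ref::get);
    }

    /** The registry keeps the listener alive until it is removed. */
    public void subscribeStrong(Runnable listener) {
        listeners.add(() -> listener);
    }

    /** Invokes live listeners, prunes collected ones, returns the invocation count. */
    public int publish() {
        int invoked = 0;
        Iterator<Supplier<Runnable>> it = listeners.iterator();
        while (it.hasNext()) {
            Runnable listener = it.next().get();
            if (listener == null) {
                it.remove(); // orphaned weak reference: clean up lazily
                continue;
            }
            listener.run();
            invoked++;
        }
        return invoked;
    }

    public static void main(String[] args) {
        ReferenceTypesSketch bus = new ReferenceTypesSketch();
        Runnable kept = () -> System.out.println("weak listener, still referenced");
        bus.subscribeWeak(kept);
        bus.subscribeStrong(() -> System.out.println("strong listener"));
        System.out.println("invoked: " + bus.publish());
    }
}
```

When a weakly subscribed listener becomes unreachable elsewhere, it silently drops out at some point after garbage collection; the exact moment is up to the JVM, which is why the sketch makes no claim about it.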

Message filtering

MBassador offers static message filtering. Filters are configured using annotations and multiple filters can be attached to a single message handler. Since version 1.2.0 Java EL expressions in @Handler are another way to define conditional message dispatch. Messages that have matching handlers but do not pass the configured filters result in the publication of a FilteredMessage object which wraps the original message. FilteredMessage events can be handled by registering listeners that handle FilteredMessage.

Note: Since version 1.3.1 it is possible to wrap a filter in a custom annotation for reuse:


    public static final class RejectAllFilter implements IMessageFilter {

        @Override
        public boolean accepts(Object event,  SubscriptionContext context) {
            return false;
        }
    }

    @IncludeFilters({@Filter(RejectAllFilter.class)})
    @Retention(RetentionPolicy.RUNTIME)
    public @interface RejectAll {}
    
    public static class FilteredMessageListener{
    
        // will cause republication of a FilteredMessage
        @Handler
        @RejectAll
        public void handleNone(Object any){
            FilteredEventCounter.incrementAndGet();
        }

        
    }

Enveloped messages

Message handlers can declare that they receive an enveloped message using @Enveloped. The envelope can wrap different types of messages to allow a single handler to handle multiple, unrelated message types.

Handler priorities

A handler can be associated with a priority to influence the order in which messages are delivered when multiple matching handlers exist.
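As a rough illustration of priority-ordered delivery, the following JDK-only sketch keeps handlers sorted by priority. It assumes that larger values are delivered first; that ordering, like the class PrioritySketch and its handler names, is an assumption of the sketch and should be checked against the @Handler javadoc.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class PrioritySketch {

    private static final class Entry {
        final String name;
        final int priority;
        Entry(String name, int priority) {
            this.name = name;
            this.priority = priority;
        }
    }

    private final List<Entry> handlers = new ArrayList<>();

    public void register(String name, int priority) {
        handlers.add(new Entry(name, priority));
        // Assumption of this sketch: larger priority values are delivered first.
        handlers.sort(Comparator.comparingInt((Entry e) -> e.priority).reversed());
    }

    /** Names of the handlers in the order they would be invoked. */
    public List<String> deliveryOrder() {
        List<String> order = new ArrayList<>();
        for (Entry e : handlers) {
            order.add(e.name);
        }
        return order;
    }

    public static void main(String[] args) {
        PrioritySketch bus = new PrioritySketch();
        bus.register("audit", 1);
        bus.register("validate", 10);
        bus.register("log", 5);
        System.out.println(bus.deliveryOrder()); // prints [validate, log, audit]
    }
}
```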

Custom error handling

Errors during message delivery are sent to all registered error handlers which can be added to the bus as necessary.

Extensibility

MBassador is designed to be extensible with custom implementations of various components like message dispatchers and handler invocations (using the decorator pattern), metadata reader (you can add your own annotations) and factories for different kinds of objects. A configuration object is used to customize the different configurable parts, see Features

Installation

MBassador is available from the Maven Central Repository using the following coordinates:

<dependency>
    <groupId>net.engio</groupId>
    <artifactId>mbassador</artifactId>
    <version>{see.git.tags.for.latest.version}</version>
</dependency>

You can also download the binary release and javadoc from the Maven Central Repository. Of course, you can always clone the repository and build from source.

Documentation

There is ongoing effort to extend documentation and provide code samples and detailed explanations of how the message bus works. Code samples can also be found in the various test cases. Please read about the terminology used in this project to avoid confusion and misunderstanding.

Integrations

There is a spring-extension available to support CDI-like transactional message sending in a Spring environment. This is a good example of integration with other frameworks. Another example is the Guice integration.

Credits

The initial inspiration for creating this component comes from Google Guava's event bus implementation. I liked the simplicity of its design and I trust in the code quality of google libraries. Unfortunately it uses strong references only.

Thanks to all contributors, especially

Many thanks also to ej-technologies for providing an open source license of JProfiler and Jetbrains for a license of IntelliJ IDEA

OSS used by MBassador: jUnit | maven | mockito | slf4j | Odysseus JUEL

Contribute

Pick an issue from the list of open issues and start implementing. Make your PRs small and provide test code! Take a look at this issue for a good example.

Note: Due to the complexity of the data structure and synchronization code it took quite a while to get a stable core. New features will only be implemented if they do not require significant modification to the core. The primary focus of MBassador is to provide high-performance extended pub/sub.

Sample code and documentation are both much appreciated contributions. Integration with different frameworks is especially valuable. Feel free to create Wiki pages to share your code and ideas. Example: Guice integration

License

This project is distributed under the terms of the MIT License. See file "LICENSE" for further reference.

Comments
  • Send a message to a specific listener

    Hello! is there a way to post a message to a specific listener. e.g: Listener1 - with receiverId field value = 1067 Listener2 - with receiverId field value = 2098 both listeners are handling same SuperMessage class

    bus.post(message, 1067);

    And only Listener1 will receive a message as a result

    P.S: Maybe it can be implemented as a Filter

    opened by vrudikov 19
  • Concurrent map

    starting place for enhancements. This by itself isn't a huge improvement, as ultimately the async threading model is what needs to be tweaked. ConcurrentHashMapV8 really shines once those tweaks are made.

    Changing the IConcurrentSet -> Set, simplifies testing with different data structures, since there are many different Weak/Strong/Soft concurrent implementations available.

    opened by dorkbox 14
  • bugfix: exception raised by error handler terminates the async msg dispatcher

    An exception in an async event handler followed by an exception in the error handler prevents further async messages from being dispatched. Submitting a bugfix plus a unit test that covers the scenario.

    opened by yaronyam 12
  • mbassador in the GWT environment

    In the GWT or SmartGWT I found two issues:

    1. The library cannot be used because there is no GWT module.
    2. If I try to include the source to be compiled in the GWT environment it won’t compile due to the reference to threading API.

    My request is that one of the two solutions below to be provided:

    A. A version of the library that 1) also has a GWT module for inheritance and 2) has no reference to threading API

    B. A source version which includes no reference to threading API that can be included and compiled along with the application code. A is preferred but I understand the B is less work. Either would be usable.

    Just as a motivator I believe that Web applications are trending to the AJAX paradigm and also that GWT and/or SmartGWT is very popular with Java developers. As you've pointed out event driven UI applications is a great loosely coupled paradigm and helps support the MVC pattern. This is a great library that could be used with a GWT application and reduce a lot of hard-wired code.

    GWT's built-in EventBus is pretty messy, complicated, and hard to read.

    Thanks.

    enhancement 
    opened by engineeringatreplystream 12
  • Error Handler Not Recognized

    After upgrading to MBassador 1.2.1, I've simplified my event bus initialization code by using the default constructor, while still opting to add an error handler via the MBassador object itself, instead of in a BusConfiguration:

    final MBassador <Event> eventBus = new MBassador <> ();
    
    eventBus.addErrorHandler (new IPublicationErrorHandler() {
      @Override
      public void handleError (final PublicationError error) {
        log.error (error.toString(), error.getCause());
      }
    });
    

    However, I now receive the following warning, which only began to occur in 1.2.1:

    WARN: No error handler configured to handle exceptions during publication.
        Error handlers can be added to any instance of  AbstractPubSubSupport or via BusConfiguration.
        Falling back to console logger.
    

    What's the point of allowing the above method of adding an error handler in conjunction with a default constructor, if it doesn't recognize it? I hope it's just a simple bug...

    I really don't want to have to do this, although it does eliminate the error message:

    final MBassador <Event> eventBus = new MBassador <> (new BusConfiguration()
             .addFeature (Feature.SyncPubSub.Default())
             .addFeature (Feature.AsynchronousHandlerInvocation.Default())
             .addFeature (Feature.AsynchronousMessageDispatch.Default())
             .setProperty (Properties.Handler.PublicationError, new IPublicationErrorHandler() {
                    @Override
                    public void handleError (final PublicationError error) {
                      log.error (error.toString(), error.getCause());
                    }
    }));
    

    P.S. Your warning log message has an extra space between the two words of AbstractPubSubSupport.

    opened by ghost 11
  • Listeners loaded in different class loader are not called.

    We have a plugin-based system. MBassador works very well as long as the event bus and the listeners are loaded by the same class loader. However, when the listeners are loaded by a different class loader, they are not called even though they are subscribed.

    Here's our scenario.

    • Main application holds a singleton instance of event bus
    • Main application creates a new class loader for each plugin and loads the plugin.
    • Once the plugin is loaded, plugin manager gives newly loaded plugin, a chance to register its listeners and passes the event bus as parameter. Here plugin subscribes listeners.
    • When main application receives a message, it will publish it using event bus.
    • Listeners in plugins (different class loader) will not get called.

    Tried @Listener(references=References.Strong) too.

    bug wontfix 
    opened by snimavat 11
  • Unit Tests failing

    Just downloaded mbassador to try out in code I'm writing. The following tests are failing for me:

    1. net.engio.mbassy.MBassadorTest.testAsynchronousMessagePublication()
    2. net.engio.mbassy.SyncBusTest.testSynchronousMessagePublication()
    3. net.engio.mbassy.DeadMessageTest.testDeadMessage()

    They all seem to fail in the same way: the expected count and the actual count of iterations run are not equal. Each time I run, I get different values for the actual iterations, so my expectation is that something isn't waiting around for the concurrent tests to complete. I've glanced briefly at the code and it looks as though it ought to work, but it still fails.

    In addition, if I run the tests using the debugger, some of the failing tests succeed (although it's not the same set every time), but running outside of the debugger all three fail every time.

    My configuration is as follows:

    Win 7, 32GB RAM, IBM RSA 8.5 (running Java 1.6)

    opened by sdussin 11
  • Method `addPublicationErrorHandler` not found on `BusConfiguration` class in 1.2.1.

    Good: The method BusConfiguration::addPublicationErrorHandler method is found in the source code on GitHub. Perhaps that is the unreleased 1.2.2 code?

    Bad: That method is not found by NetBeans 8.0.2 on Java 8 Update 51 when using a dependency of 1.2.1. I have no other problems with MBassador except this one method. Commenting out that one method calls makes the compiler happy.

    Example app:

    package space.dawg.mavenprojectbogus;
    
    import net.engio.mbassy.bus.MBassador;
    import net.engio.mbassy.bus.common.Properties;
    import net.engio.mbassy.bus.config.BusConfiguration;
    import net.engio.mbassy.bus.config.Feature;
    
    public class Main
    {
    
        public static void main ( String[] args ) {
            System.out.println( "BASIL - sRunning main method of Bogus." );
    
            MBassador globalBus = new MBassador( new BusConfiguration()
                    .addFeature( Feature.SyncPubSub.Default() )
                    .addFeature( Feature.AsynchronousHandlerInvocation.Default() )
                    .addFeature( Feature.AsynchronousMessageDispatch.Default() )
                    .addPublicationErrorHandler( null )
                    .setProperty( Properties.Common.Id , "global bus" )
            );
    
        }
    
    }
    

    …with this pom.xml…

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>space.dawg</groupId>
        <artifactId>mavenprojectBogus</artifactId>
        <version>1.0-SNAPSHOT</version>
        <packaging>jar</packaging>
        <dependencies>
            <dependency>
                <groupId>net.engio</groupId>
                <artifactId>mbassador</artifactId>
                <version>1.2.1</version>
            </dependency>
        </dependencies>
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <maven.compiler.source>1.7</maven.compiler.source>
            <maven.compiler.target>1.7</maven.compiler.target>
        </properties>
    </project>
    

    …reports this compiler error…

    cannot find symbol
      symbol:   method addPublicationErrorHandler(<null>)
      location: interface IBusConfiguration
    

    Passing null is not the issue. Code-completion of the addPublicationErrorHandler method fails, as the compiler cannot identify any such method.

    opened by basil-bourque 9
  • Need best practices example of construction.

    Unfortunately both examples of constructing MBassador in the documentation are incorrect. The instructions say to use new MBassador(BusConfiguration.SyncAsync()), which has been deprecated (at least in version 1.2.0). And the code has new MBassador<TestMessage>(), which doesn't even work --- there doesn't appear to be a default constructor for MBassador. The BusConfiguration documentation says to "Use feature driven configuration instead". But what does that mean? What are best practices? What should I do by default?

    Supposedly MBassador is very simple to get up and running. The documentation should at least show correctly how to do that. Thanks.

    opened by garretwilson 8
  • Messages pooling

    Hi! Is it possible to pool created messages? I mean, that I can simply obtain new messages/events from my own pool, but I can't find a proper moment to free those objects (put them back into the pool).

    enhancement 
    opened by Namek 8
  • Concurrent access to HashMap

    In AbstractSyncMessageBus field subscriptionsPerListener is accessed asynchronously. Simplest solution would be to use ReadWriteLock or ConcurrentHashMap. Please see this discussion:

    http://www.coderanch.com/t/232685/threads/java/Concurrent-access-HashMap

    bug 
    opened by ikoblik 8
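    The ConcurrentHashMap option suggested in this issue could look roughly like the sketch below. It is not the project's actual fix: SubscriptionRegistrySketch is a hypothetical class, and only the field name subscriptionsPerListener is taken from the report; the value type and the stress helper are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

public class SubscriptionRegistrySketch {

    // Thread-safe replacement for a plain HashMap keyed by listener class.
    private final Map<Class<?>, List<Object>> subscriptionsPerListener =
        new ConcurrentHashMap<>();

    public void subscribe(Object listener) {
        // computeIfAbsent creates the per-class list atomically, so two threads
        // registering the same class at once cannot overwrite each other's list.
        subscriptionsPerListener
            .computeIfAbsent(listener.getClass(), k -> new CopyOnWriteArrayList<>())
            .add(listener);
    }

    public int count(Class<?> type) {
        List<Object> subscribed = subscriptionsPerListener.get(type);
        return subscribed == null ? 0 : subscribed.size();
    }

    /** Registers listeners from several threads and returns the final count. */
    public static int stress(int threads, int perThread) {
        SubscriptionRegistrySketch registry = new SubscriptionRegistrySketch();
        List<Thread> pool = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread thread = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    registry.subscribe(new Object());
                }
            });
            pool.add(thread);
            thread.start();
        }
        for (Thread thread : pool) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return registry.count(Object.class);
    }

    public static void main(String[] args) {
        System.out.println(stress(4, 1000)); // prints 4000
    }
}
```

    With a plain HashMap and ArrayList in the same positions, concurrent subscribe calls could lose entries or corrupt the map, which is the problem the linked discussion describes.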
  • Build fails using Maven 3.8.3 and JDK 19.0.1 on Arch Linux

    $ java -version
    openjdk version "19.0.1" 2022-10-18
    OpenJDK Runtime Environment (build 19.0.1+11)
    OpenJDK 64-Bit Server VM (build 19.0.1+11, mixed mode, sharing)
    $ mvn --version
    Apache Maven 3.8.3 (ff8e977a158738155dc465c6a97ffaf31982d739)
    Maven home: /opt/maven
    Java version: 19.0.1, vendor: BellSoft, runtime: /opt/jdk-19.0.1-full
    Default locale: en_CA, platform encoding: UTF-8
    OS name: "linux", version: "6.0.12-arch1-1", arch: "amd64", family: "unix"
    $ cd /tmp
    $ git clone https://github.com/bennidi/mbassador
    $ cd mb*
    $ mvn package
    

    Expected Build succeeds, archives created.

    Actual Build fails.

    From surefire-report.html.txt:

       [Summary] [Package List] [Test Cases]
       testHandlePublicationError_handlers_present_async
       java.lang.ExceptionInInitializerError:
       net.engio.mbassy.bus.AbstractPubSubSupportTest:
       testHandlePublicationError_constrcut_with_handler_async
       java.lang.NoClassDefFoundError: Could not initialize class
       org.mockito.internal.creation.cglib.ClassImposterizer$3
       net.engio.mbassy.bus.AbstractPubSubSupportTest:
       testHandlePublicationError_raises_exception
       java.lang.NoClassDefFoundError: Could not initialize class
       org.mockito.internal.creation.cglib.ClassImposterizer$3
       net.engio.mbassy.bus.AbstractPubSubSupportTest:
       testHandlePublicationError_construct_with_handler_sync
       java.lang.NoClassDefFoundError: Could not initialize class
       org.mockito.internal.creation.cglib.ClassImposterizer$3
       net.engio.mbassy.bus.AbstractPubSubSupportTest:
       testHandlePublicationError_handlers_present_sync
       java.lang.NoClassDefFoundError: Could not initialize class
       org.mockito.internal.creation.cglib.ClassImposterizer$3
       net.engio.mbassy.bus.AbstractPubSubSupportTest:
       testHandlePublicationError_default_construct_async
       java.lang.NoClassDefFoundError: Could not initialize class
       org.mockito.internal.creation.cglib.ClassImposterizer$3
       net.engio.mbassy.bus.AbstractPubSubSupportTest:
       testHandlePublicationError_no_handlers_present_construct_with_config_async
       java.lang.NoClassDefFoundError: Could not initialize class
       org.mockito.internal.creation.cglib.ClassImposterizer$3
       net.engio.mbassy.bus.AbstractPubSubSupportTest:
       testHandlePublicationError_default_construct_sync
       java.lang.NoClassDefFoundError: Could not initialize class
       org.mockito.internal.creation.cglib.ClassImposterizer$3
       net.engio.mbassy.bus.AbstractPubSubSupportTest:
    
    opened by DaveJarvis 0
  • Potential lambda enhancement

    I wanted to find a way to use lambdas with this library, so that listeners can be created like this:

    MBassadorLFP<Date> bus = new MBassadorLFP<Date>();
    var listener1 = bus.subscribe(d -> {
    	System.out.println("event consumer 1: " + d);
    });
    

    I was able to achieve this by adding 2 methods and a couple of hacks that depend on this library: https://github.com/jhalterman/typetools/

    The methods are:

    public Consumer<T> subscribe(Consumer<T> listener);
    
    public Consumer<T> subscribe(Consumer<T> listener, HandlerOptions handlerOptions);
    

    This allows me to simplify code as such:

    Code:

    MBassadorLFP<Date> bus = new MBassadorLFP<Date>();
    var listener1 = bus.subscribe(d -> {
    	System.out.println("event consumer 1: " + d);
    });
    Consumer<Date> listener2 = d -> {
    	System.out.println("event consumer 2: " + d);
    };
    bus.subscribe(listener2);
    System.out.println("should print 1 & 2");
    bus.post(new Date()).now();
    bus.unsubscribe(listener1);
    System.out.println("should print 2");
    bus.post(new Date()).now();
    bus.unsubscribe(listener2);
    System.out.println("should print none");
    bus.post(new Date()).now();
    System.err.println("done");
    

    Output:

    should print 1 & 2
    event consumer 2: Thu Feb 25 18:30:13 EST 2021
    event consumer 1: Thu Feb 25 18:30:13 EST 2021
    should print 2
    event consumer 2: Thu Feb 25 18:30:13 EST 2021
    should print none
    

    The HandlerOptions class represents a builder class for the handler annotation. It is accessed in a hacky way by tracking threads. This works for me, but it'd be great if there was a way to have something similar implemented without the current thread hack. Below are the two class modifications:

    MBassadorLFP.java

    @SuppressWarnings("rawtypes")
    public class MBassadorLFP<T> extends MBassador<T> {
    
    	private static final Object CONSUMER_ACCEPT_METHOD_NAME = "accept";
    
    	private static final Map<Thread, HandlerOptions> HANDLER_OPTIONS_TRACKER = new ConcurrentHashMap<>();
    
    	/**
    	 * Default constructor using default setup. super() will also add a default
    	 * publication error logger
    	 */
    	public MBassadorLFP() {
    		this(getDefaultConfiguration());
    	}
    
    	/**
    	 * Construct with default settings and specified publication error handler
    	 *
    	 * @param errorHandler
    	 */
    	public MBassadorLFP(IPublicationErrorHandler errorHandler) {
    		this(getDefaultConfiguration().addPublicationErrorHandler(errorHandler));
    	}
    
    	/**
    	 * Construct with fully specified configuration
    	 *
    	 * @param configuration
    	 */
    	public MBassadorLFP(IBusConfiguration configuration) {
    		super(modifyConfiguration(configuration));
    	}
    
    	public Consumer<T> subscribe(Consumer<T> listener) {
    		return subscribe(listener, null);
    	}
    
    	public Consumer<T> subscribe(Consumer<T> listener, HandlerOptions handlerOptions) {
    		if (handlerOptions == null)
    			handlerOptions = new HandlerOptions();
    		var thread = Thread.currentThread();
    		HANDLER_OPTIONS_TRACKER.put(thread, handlerOptions);
    		try {
    			subscribe((Object) listener);
    		} finally {
    			HANDLER_OPTIONS_TRACKER.remove(thread);
    		}
    		return listener;
    	}
    
    	private static IBusConfiguration modifyConfiguration(IBusConfiguration configuration) {
    		if (configuration == null)
    			configuration = getDefaultConfiguration();
    		var syncPubSub = configuration.getFeature(Feature.SyncPubSub.class);
    		if (syncPubSub == null) {
    			syncPubSub = Feature.SyncPubSub.Default();
    			configuration.addFeature(syncPubSub);
    		}
    		var delegate = syncPubSub.getMetadataReader();
    		syncPubSub.setMetadataReader(new MetadataReader() {
    
    			@Override
    			public MessageListener getMessageListener(Class target) {
    				MessageListener messageListener;
    				if (delegate != null)
    					messageListener = delegate.getMessageListener(target);
    				else
    					messageListener = super.getMessageListener(target);
    				return modifyMessageListener(target, messageListener);
    			}
    		});
    		return configuration;
    	}
    
    	private static MessageListener modifyMessageListener(Class target, MessageListener messageListener) {
    		if (!Consumer.class.isAssignableFrom(target))
    			return messageListener;
    		var handlers = messageListener.getHandlers();
    		if (handlers != null && handlers.length > 0)
    			return messageListener;
    		MessageHandler messageHandler = getConsumerMessageHandler(target, messageListener);
    		if (messageHandler != null)
    			messageListener.addHandler(messageHandler);
    		return messageListener;
    	}
    
    	@SuppressWarnings("unchecked")
    	private static MessageHandler getConsumerMessageHandler(Class target, MessageListener messageListener) {
    		var handlerOptions = HANDLER_OPTIONS_TRACKER.get(Thread.currentThread());
    		if (handlerOptions == null)
    			return null;
    		var applyMethods = getConsumerAcceptMethods(target);
    		if (applyMethods.length != 1)
    			return null;
    //*** WHERE THINGS GET SPICY ***
    		var rawArgument = TypeResolver.resolveRawArgument(Consumer.class, target);
    		if (rawArgument == null || TypeResolver.Unknown.class.equals(rawArgument))
    			return null;
    		var handler = applyMethods[0];
    		Handler handlerConfig = handlerOptions.toHandler();
    		Enveloped enveloped = new Enveloped() {
    
    			@Override
    			public Class<? extends Annotation> annotationType() {
    				return Enveloped.class;
    			}
    
    			@Override
    			public Class[] messages() {
    				return new Class[] { rawArgument };
    			}
    		};
    		if (!handlerConfig.enabled())
    			return null;
    		var handlerProperties = MessageHandler.Properties.Create(handler, handlerConfig, enveloped,
    				new IMessageFilter[0], messageListener);
    		handlerProperties.put(MessageHandler.Properties.Enveloped, false);
    		MessageHandler handlerMetadata = new MessageHandler(handlerProperties);
    		return handlerMetadata;
    	}
    
    	private static Method[] getConsumerAcceptMethods(Class target) {
    		var methods = ReflectionUtils.getMethods(m -> {
    			if (Modifier.isAbstract(m.getModifiers()))
    				return false;
    			if (!CONSUMER_ACCEPT_METHOD_NAME.equals(m.getName()))
    				return false;
    			if (m.getParameterCount() != 1)
    				return false;
    			if (!Object.class.isAssignableFrom(m.getParameterTypes()[0]))
    				return false;
    			return true;
    		}, target);
    		return methods;
    	}
    
    	private static IBusConfiguration getDefaultConfiguration() {
    		return new BusConfiguration().addFeature(Feature.SyncPubSub.Default())
    				.addFeature(Feature.AsynchronousHandlerInvocation.Default())
    				.addFeature(Feature.AsynchronousMessageDispatch.Default());
    	}
    
    }
    

    HandlerOptions.java

    public class HandlerOptions {
    
    	private static final Method HandlerOptions_toHandler_METHOD;
    	static {
    		try {
    			HandlerOptions_toHandler_METHOD = HandlerOptions.class.getMethod("toHandler");
    		} catch (NoSuchMethodException e) {
    			throw new RuntimeException(e);
    		}
    	}
    
    	private Filter[] filters;
    	private String condition;
    	private Invoke delivery;
    	private Integer priority;
    	private Boolean rejectSubtypes;
    	private Boolean enabled;
    	private Class<? extends HandlerInvocation> invocation;
    
    	@Handler
    	public Handler toHandler() {
    		var handler = HandlerOptions_toHandler_METHOD.getAnnotation(Handler.class);
    		return new Handler() {
    
    			@Override
    			public Class<? extends Annotation> annotationType() {
    				return handler.annotationType();
    			}
    
    			@Override
    			public Filter[] filters() {
    				return Optional.ofNullable(filters).orElse(handler.filters());
    			}
    
    			@Override
    			public String condition() {
    				return Optional.ofNullable(condition).orElse(handler.condition());
    			}
    
    			@Override
    			public Invoke delivery() {
    				return Optional.ofNullable(delivery).orElse(handler.delivery());
    			}
    
    			@Override
    			public int priority() {
    				return Optional.ofNullable(priority).orElse(handler.priority());
    			}
    
    			@Override
    			public boolean rejectSubtypes() {
    				return Optional.ofNullable(rejectSubtypes).orElse(handler.rejectSubtypes());
    			}
    
    			@Override
    			public boolean enabled() {
    				return Optional.ofNullable(enabled).orElse(handler.enabled());
    			}
    
    			@Override
    			public Class<? extends HandlerInvocation> invocation() {
    				// Plain null check avoids the raw (Class) cast and its unchecked warning.
    				return invocation != null ? invocation : handler.invocation();
    			}
    		};
    	}
    
    	public HandlerOptions() {
    
    	}
    
    	public HandlerOptions(Filter[] filters, String condition, Invoke delivery, Integer priority, Boolean rejectSubtypes,
    			Boolean enabled, Class<? extends HandlerInvocation> invocation) {
    		this.filters = filters;
    		this.condition = condition;
    		this.delivery = delivery;
    		this.priority = priority;
    		this.rejectSubtypes = rejectSubtypes;
    		this.enabled = enabled;
    		this.invocation = invocation;
    	}
    
    	public HandlerOptions setFilters(Filter[] filters) {
    		this.filters = filters;
    		return HandlerOptions.this;
    	}
    
    	public Filter[] getFilters() {
    		return this.filters;
    	}
    
    	public HandlerOptions setCondition(String condition) {
    		this.condition = condition;
    		return HandlerOptions.this;
    	}
    
    	public String getCondition() {
    		return this.condition;
    	}
    
    	public HandlerOptions setDelivery(Invoke delivery) {
    		this.delivery = delivery;
    		return HandlerOptions.this;
    	}
    
    	public Invoke getDelivery() {
    		return this.delivery;
    	}
    
    	public HandlerOptions setPriority(Integer priority) {
    		this.priority = priority;
    		return HandlerOptions.this;
    	}
    
    	public Integer getPriority() {
    		return this.priority;
    	}
    
    	public HandlerOptions setRejectSubtypes(Boolean rejectSubtypes) {
    		this.rejectSubtypes = rejectSubtypes;
    		return HandlerOptions.this;
    	}
    
    	public Boolean getRejectSubtypes() {
    		return this.rejectSubtypes;
    	}
    
    	public HandlerOptions setEnabled(Boolean enabled) {
    		this.enabled = enabled;
    		return HandlerOptions.this;
    	}
    
    	public Boolean getEnabled() {
    		return this.enabled;
    	}
    
    	public HandlerOptions setInvocation(Class<? extends HandlerInvocation> invocation) {
    		this.invocation = invocation;
    		return HandlerOptions.this;
    	}
    
    	public Class<? extends HandlerInvocation> getInvocation() {
    		return this.invocation;
    	}
    
    }
    
    
    opened by regbo 0
  • Q: How to create a blocking wait for messages programmatically?

    Hi all, this sounds a bit weird, but I need something like a blocking call that waits for a message to arrive, just like the plain Java concurrent queues, but with all the pros of MBassador.

    The background is that my message receiver must run in a specific (thread) environment, which must be the same from one handler call to the next. I have no influence on that environment.

    Thanks for any hint.

    Regards Gerry
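
    One possible approach, sketched under assumptions: let the handler do nothing but enqueue each message into a `java.util.concurrent.BlockingQueue`, and let the thread that must process messages block on the queue. The class name `BlockingStringReceiver`, its method names, and the `String` message type are hypothetical. In a real project `handle` would carry MBassador's `@Handler` annotation and the instance would be registered with `bus.subscribe(...)`; both are left out here so the sketch has no dependencies.

    ```java
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;

    // Hypothetical bridge between a push-style handler and a pull-style consumer.
    public class BlockingStringReceiver {

        private final BlockingQueue<String> inbox = new LinkedBlockingQueue<>();

        // Assumption: with MBassador on the classpath this method would be
        // annotated with @Handler. It only enqueues, so it returns immediately
        // no matter which thread the bus uses to invoke it.
        public void handle(String message) {
            inbox.offer(message);
        }

        // Blocks the calling thread until a message is available.
        public String awaitNext() throws InterruptedException {
            return inbox.take();
        }

        // Blocks for at most the given time; returns null on timeout.
        public String awaitNext(long timeout, TimeUnit unit) throws InterruptedException {
            return inbox.poll(timeout, unit);
        }

        public static void main(String[] args) throws InterruptedException {
            BlockingStringReceiver receiver = new BlockingStringReceiver();
            // Stand-in for the bus delivering a message from another thread.
            new Thread(() -> receiver.handle("hello")).start();
            // The consuming thread (here: main) is always the one that processes.
            System.out.println("received: " + receiver.awaitNext());
        }
    }
    ```

    Because all processing happens in whichever thread calls `awaitNext`, the thread environment stays the same from one message to the next, regardless of how the bus dispatches.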

    documentation 
    opened by Gerry33 2
  • Explain the supported use cases of generics in message handlers

    #101 shows an unsupported use case of generics in MBassador. The extent to which generics are supported, and the current limitations, should be clearly documented.

    documentation 
    opened by bennidi 0
  • IMessageFilter Context?

    Is it currently possible to use the SubscriptionContext of IMessageFilter to get the actual object of the handler currently being tested, rather than just the method that will be called if the filter passes? I can't seem to find a way to do this, if one exists at the moment.

    enhancement 
    opened by GregoryHlavac 1
Releases(1.3.0)
Owner
Benjamin Diedrichsen