Library for composability of interdependent non-blocking I/O tasks

Overview

Composer

Release License Android Arsenal

Composer helps you to organize and execute multiple interdependent asynchronous input/output tasks such as webservice calls, database read/writes and file i/o together with concurrency support using java.util.concurrent APIs. It is compatible with Java 8 & above on all JVM based platforms including Android.

Here is an example of how you can use Composer to create a chain of tasks. Consider a scenario where you want to get an associated Twitter account details for your app user, fetch different kinds of twitter data for that user, show them on app UI and then track the event in your analytics database. All of these tasks are asynchronous (except refreshing the UI) and dependent on each other.

Composer.startWith(currentUser.getUserId(), err -> logger.error("Error executing tasks", err))
        .thenExecute(userId -> { return accountService.getTwitterAccountDetails(userId); })
        .thenContinueIf(response -> response.status.isOk())
        .thenExecuteTogether(
            Results::new,
            response -> twitterService.getTweets(response.username), 
            response -> twitterService.getMedia(response.username), 
            response -> twitterService.getFollowers(response.username), 
        )
        .thenWaitFor(results -> { refreshUI(results); })
        .thenExecute(() -> { analyticsDb.trackEvent("get_twitter_details"); })
        .thenFinish();

Please note that Composer does not aim to provide an extensible API for managing asynchronous tasks. Instead, it aims to provide a minimal, easy to use API which can be useful for the scenarios where interdependency between such tasks forces you to write boilerplate code for managing state, validating conditions or handling errors. Most client-side mobile/web applications and backend services communicating with each other require an extensible framework in which interdependent asynchronous tasks can be glued together. Composer serves only specific use cases and it may not be a good fit all the use cases, especially when having an extensibe asynchronous framework is critical to the application.

For detailed usage information, please refer Getting Started section.

Table of Contents

Setup

  • Gradle:
allprojects {
        repositories {
                ...
                maven { url 'https://jitpack.io' }
        }
}

dependencies {
        implementation 'com.github.krupalshah:Composer:2.0.1'
}
  • Maven:
<repositories>
        <repository>
            <id>jitpack.io</id>
            <url>https://jitpack.io</url>
        </repository>
</repositories>

<dependency>
    <groupId>com.github.krupalshah</groupId>
    <artifactId>Composer</artifactId>
    <version>2.0.1</version>
</dependency>

Getting Started

Overview

The API consists of an interface Composable and its implementation Composer. The implementation serves as an entrypoint and returns Composable at each step of execution until chaining is discontinued.

Use startWith() to create your first Composable like below:

Composer.startWith(someInputOrTask, err -> logger.error("Error executing tasks", err))

The first param is only required if you want to pass some pre-known value as an input, or a task that may produce the same.
The second param ErrorStream receives all errors during execution.

If you don't have any pre-known input or task, you can simply create your first Composable by just providing an ErrorStream like below:

Composer.startWith(err -> logger.error("Error executing tasks", err))

Use thenFinish() to discontinue further chaining and complete the awaiting task execution. Between startWith and thenFinish, chain your tasks according to their dependencies.

Chaining tasks

A Task can be of type

  • SimpleTask if it takes no input and returns no output.
  • ConsumingTask<Input> if it takes an input but returns no output.
  • ProducingTask<Output> if it takes no input but returns an output.
  • TransformingTask<Input,Output> if it takes an input and converts it into output.

Consider a very straightforward scenario in which some independent data is to be fetched from remote data source via webservice, converted into csv format, written to a file, and an email is to triggered when all of this is done.

Given this information, a chain can be as written as below:

Composer.startWith(() -> service.fetchData(), err -> logger.error("Error executing tasks", err))
        .thenExecute(response -> { return converter.convertToCsv(response.data); })
        .thenExecute(csv -> { writer.writeCsvFile(csv); })
        .thenExecute(() -> { mailer.sendEmail("All Tasks Completed"); })
        .thenFinish();

Each step returns Composable, which can be detached and glued wherever required:

Composable<String> myComposable = Composer.startWith(() -> service.fetchData(), err -> logger.error("Error executing tasks", err))
        .thenExecute(response -> { return converter.convertToCsv(response.data); })

doSomething();

myComposable.thenExecute(csv -> { writer.writeCsvFile(csv); })
        .thenExecute(() -> { mailer.sendEmail("All Tasks Completed"); })
        .thenFinish();

Please note that chained tasks are executed asynchronously by default. Hence, in the above example there is no guarantee that doSomething() will be run after the data is converted to csv. If something needs to be executed synchronously in-between, chain it as specified under Executing task synchronously section.

Executing tasks concurrently

Different method variants have been provided to execute multiple tasks concurrently. All you have to do is to specify a collection of tasks to be executed in parallel. The order of execution is never guaranteed.

  • Executing multiple tasks

Consider a slight modification in the previous scenario where converted csv is persisted in the database along with a file.

In that case, both tasks can be executed concurrently using thenExecuteTogether() variants like below:

Composer.startWith(() -> service.fetchData(), err -> logger.error("Error executing tasks", err))
        .thenExecute(response -> { return converter.convertToCsv(response.data); })
        .thenExecuteTogether( 
            csv -> writer.writeCsvFile(csv),
            db.storeCsv(csv) //both tasks will be executed concurrently
        )
        .thenExecute(() -> { mailer.sendEmail("All Tasks Completed"); })
        .thenFinish();
  • Collecting output from multiple tasks

In the cases where a task produces an output, concurrent variants can execute any number of tasks with the same type of output, or maximum three tasks with different types of output.

Such tasks will require a Collector as an additional parameter. A Collector collects results from multiple producer tasks and returns something which can hold those results.

Consider a modification in the first scenario where data is to be converted into multiple formats such as csv, xml and yaml. In that case, we can use concurrent method variants and collect results like below:

Composer.startWith(() -> service.fetchData(), err -> logger.error("Error executing tasks", err))
        .thenExecuteTogether(
                (response, csv, xml, yaml) -> new ConvertedData(csv, xml, yaml), //ConvertedData is a pojo returned from collector to hold outputs from concurrently executing tasks
                response -> converter.convertToCsv(response.data),
                response -> converter.convertToXml(response.data),
                response -> converter.convertToYaml(response.data)
        )
        .thenExecuteTogether(
            convertedData -> writer.writeCsvFile(convertedData.csv),
            convertedData -> writer.writeXmlFile(convertedData.xml),
            convertedData -> writer.writeYamlFile(convertedData.yaml)
        )
        .thenExecute(() -> mailer.sendEmail("All Tasks Completed"))
        .thenFinish();
  • Iterating over upstream results

In the cases where an upstream output contains a collection, and you want to execute a task concurrently for each value in that collection, use thenExecuteForEach() variants.

Consider a scenario where you need to fetch some posts from a service and then fetch comments for each post in the response. In that case, you will need to expand the upstream response to a collection of posts, provide the task to be executed concurrently for each post and finally collect the comments grouped by posts like below:

Composer.startWith(() -> service.fetchPosts(), err -> logger.error("Error executing tasks", err))
        .thenExecuteForEach(
                response -> response.getPosts(), //provide a collection to iterate over
                post -> service.fetchComments(post), //this task will be applied for each post in the list
                (response, postAndComments) -> new GroupedData(postAndComments) //collector will receive results as pairs of <Post,List<Comment>> assuming that the service is retuning the list of comments for a specific post
        )
        .thenExecute(data -> { db.insertPostsAndComments(data); })
        .thenExecute(() -> { mailer.sendEmail("All Tasks Completed"); })
        .thenFinish();

Validating task output

A task output must be non-null. Any task in a chain that receives null as an input will discontinue further execution.

Use thenContinueIf() to validate the task output before it is used as an input of dependent tasks. If the condition specified returns false, you will receive a ComposerException on the ErrorStream provided. Further execution will be discontinued and downstream consuming tasks will receive null as a result.

For example, in the first scenario, consider that you want to check the status and size of the data in response before converting to csv:

Composer.startWith(() -> service.fetchData(), err -> logger.error("Error executing tasks", err))
        .thenContinueIf(response -> response.status.isOk() && !response.data.isEmpty()) //this will discontinue further execution if the specified condition returns false.
        .thenExecute(response -> { return converter.convertToCsv(response.data); })
        .thenExecute(csv -> { writer.writeCsvFile(csv); })
        .thenExecute(() -> { mailer.sendEmail("All Tasks Completed"); })
        .thenFinish();

Executing task synchronously

By default, all tasks will be executed asynchronously. If you want to execute something synchronously on the same thread the method has been called (in most cases - the application main thread), thenWaitFor variants can be used like below:

Composer.startWith(() -> produceSomething(), err -> logger.error("Error executing tasks", err))
        .thenWaitFor(data -> { showOnUI(data); })
        .thenFinish();

Providing custom executor service

Finally, Composer uses an ExecutorService that creates a cached thread pool internally. If you want to provide your custom executor service, pass it as a third param of startWith() like below (not recommended unless required):

Composer.startWith(() -> produceSomething(), err -> logger.error("Error executing tasks", err), customExecutorService)

Change Log

  • 2.0.1

    • Minor changes to avoid compiler warnings.
  • 2.0.0

    • This release contains breaking changes. Major API refactorings include renaming all methods to reduce verbosity.
    • Collection parameters in then..Together variants have been replaced with varargs.
  • 1.0.1

    • Fixed a bug where an ErrorStream was not transmitting errors synchronously.

Licence

Copyright 2020 Krupal Shah

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
You might also like...

Simple & Lightweight Netty packet library + event system

Minimalistic Netty-Packet library Create packets with ease Bind events to packets Example Packet: public class TestPacket extends Packet { privat

Dec 7, 2022

A Minecraft library for working with minecraft packets on various platforms, using MCProtocolLib

BetterProtocol A Minecraft library for working with minecraft packets on various platforms, using MCProtocolLib This library is still based on the pro

Jul 2, 2022

Telegram API Client and Telegram BOT API Library and Framework in Pure java.

Javagram Telegram API Client and Telegram Bot API library and framework in pure Java. Hello Telegram You can use Javagram for both Telegram API Client

Oct 17, 2021

๐Ÿงšโ€โ™€๏ธ Java library to interact with YouTrack's REST API.

YouTrack API for Java ๐Ÿงšโ€ Java library to interact with YouTrack's REST API.

Oct 1, 2021

This is library that look like Scarlet Wrapper Socket.io

This is library that look like Scarlet Wrapper Socket.io

Jan 2, 2023

An netty based asynchronous socket library for benchion java applications

An netty based asynchronous socket library for benchion java applications

Benchion Sockets Library An netty based asynchronous socket library for benchion java applications ๐Ÿ“– Documents ๐Ÿ“– Report Bug ยท Request Feature Conten

Dec 25, 2022

Tasks Planner : A minimalist collaborative app for scheduling and managing your tasks with the team and getting notifications through discord.

Tasks Planner : A minimalist collaborative app for scheduling and managing your tasks with the team and getting notifications through discord.

Tasks Planner โœจ Overview Tasks planner is a minimalist collaborative app for planning and managing your tasks with the team and get notifications thro

Dec 1, 2022

Low-overhead, non-blocking I/O, external Process implementation for Java

Low-overhead, non-blocking I/O, external Process implementation for Java

NuProcess NuProcess is proud to power Facebook's Buck build. A low-overhead, non-blocking I/O, external Process execution implementation for Java. It

Dec 29, 2022

High performance non-blocking webserver

Undertow Undertow is a Java web server based on non-blocking IO. It consists of a few different parts: A core HTTP server that supports both blocking

Jan 1, 2023

Magician is an asynchronous non-blocking network protocol analysis package, supports TCP, UDP protocol, built-in Http, WebSocket decoder

Magician is an asynchronous non-blocking network protocol analysis package, supports TCP, UDP protocol, built-in Http, WebSocket decoder

An asynchronous non-blocking network protocol analysis package Project Description Magician is an asynchronous non-blocking network protocol analysis

Nov 30, 2022

Non-Blocking Reactive Foundation for the JVM

Non-Blocking Reactive Foundation for the JVM

Reactor Core Non-Blocking Reactive Streams Foundation for the JVM both implementing a Reactive Extensions inspired API and efficient event streaming s

Dec 30, 2022

vสŒvr (formerly called Javaslang) is a non-commercial, non-profit object-functional library that runs with Java 8+. It aims to reduce the lines of code and increase code quality.

vสŒvr (formerly called Javaslang) is a non-commercial, non-profit object-functional library that runs with Java 8+. It aims to reduce the lines of code and increase code quality.

Vavr is an object-functional language extension to Java 8, which aims to reduce the lines of code and increase code quality. It provides persistent co

Jan 3, 2023

Blocking the your minecraft plugins to show from server member

BSP Blocking your minecraft server plugins to show. ํ•œ๊ตญ์–ด README Features Blocking your minecraft server plugins to show. Custom Events. Install Install

Jan 22, 2022

Protect your Spigot server against IP forwarding exploits, as well as blocking unknown BungeeCord and/or Velocity proxies.

Sentey Protect your Spigot server against IP forwarding exploits, as well as blocking unknown BungeeCord and/or Velocity proxies. But firewalls are a

Dec 28, 2022

Distributed lock for your scheduled tasks

Distributed lock for your scheduled tasks

ShedLock ShedLock makes sure that your scheduled tasks are executed at most once at the same time. If a task is being executed on one node, it acquire

Jan 7, 2023

A small client useful for a variety of tasks ranging from raiding to duping.

A small client useful for a variety of tasks ranging from raiding to duping.

CornClient A small utility mod for Minecraft useful for a variety of tasks ranging from raiding to duping. Support You can open an issue for help or f

Jan 4, 2022

Force clear delaying & no longer needed Gradle tasks.

Force clear delaying & no longer needed Gradle tasks.

gradle-cleaner-intellij-plugin Force clear delaying & no longer needed Gradle tasks. Description Plugin for Intellij IDEA which performs simple comman

Oct 28, 2021

tasks, async await, actors and channels for java

AsyncUtils tasks, async, await, actors and channels for java This project tries to explore several approaches to simplify async/concurrent programming

Dec 1, 2022

A smart personal voice assistant powered by Alan AI. Enjoy hands free application that can manage daily tasks

Todogenix A smart personal voice assistant using Alan AI. Intro Todogenix is a personal assistant app powered by Alan AI that helps store and manage o

Mar 15, 2022
Releases(1321527775)
Owner
krupal
krupal
tasks, async await, actors and channels for java

AsyncUtils tasks, async, await, actors and channels for java This project tries to explore several approaches to simplify async/concurrent programming

Michael Hoffer 6 Dec 1, 2022
TCP/UDP client/server library for Java, based on Kryo

KryoNet can be downloaded on the releases page. Please use the KryoNet discussion group for support. Overview KryoNet is a Java library that provides

Esoteric Software 1.7k Jan 2, 2023
An annotation-based Java library for creating Thrift serializable types and services.

Drift Drift is an easy-to-use, annotation-based Java library for creating Thrift clients and serializable types. The client library is similar to JAX-

null 225 Dec 24, 2022
A Java library that implements a ByteChannel interface over SSLEngine, enabling easy-to-use (socket-like) TLS for Java applications.

TLS Channel TLS Channel is a library that implements a ByteChannel interface over a TLS (Transport Layer Security) connection. It delegates all crypto

Mariano Barrios 149 Dec 31, 2022
Java library for representing, parsing and encoding URNs as in RFC2141 and RFC8141

urnlib Java library for representing, parsing and encoding URNs as specified in RFC 2141 and RFC 8141. The initial URN RFC 2141 of May 1997 was supers

SLUB 24 May 10, 2022
A High Performance Network ( TCP/IP ) Library

Chronicle-Network About A High Performance Network library Purpose This library is designed to be lower latency and support higher throughputs by empl

Chronicle Software : Open Source 231 Dec 31, 2022
Unconventional I/O library for Java

one-nio one-nio is a library for building high performance Java servers. It features OS capabilities and JDK internal APIs essential for making your h

OK.ru 589 Dec 29, 2022
A Java library for capturing, crafting, and sending packets.

Japanese Logos Pcap4J Pcap4J is a Java library for capturing, crafting and sending packets. Pcap4J wraps a native packet capture library (libpcap, Win

Kaito Yamada 1k Dec 30, 2022
Full-featured Socket.IO Client Library for Java, which is compatible with Socket.IO v1.0 and later.

Socket.IO-client Java This is the Socket.IO Client Library for Java, which is simply ported from the JavaScript client. See also: Android chat demo en

Socket.IO 5k Jan 4, 2023
Asynchronous Http and WebSocket Client library for Java

Async Http Client Follow @AsyncHttpClient on Twitter. The AsyncHttpClient (AHC) library allows Java applications to easily execute HTTP requests and a

AsyncHttpClient 6k Dec 31, 2022