RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.

Overview

RxJava: Reactive Extensions for the JVM

codecov.io Maven Central

RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.

It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.

Version 3.x (Javadoc)

  • single dependency: Reactive-Streams
  • Java 8+ (Android desugar friendly)
  • Java 8 lambda-friendly API
  • fixed API mistakes and many limits of RxJava 2
  • intended to be a replacement for RxJava 2 with relatively few binary incompatible changes
  • non-opinionated about the source of concurrency (threads, pools, event loops, fibers, actors, etc.)
  • async or synchronous execution
  • virtual time and schedulers for parameterized concurrency
  • test and diagnostic support via test schedulers, test consumers and plugin hooks

Learn more about RxJava in general on the Wiki Home.

ℹ️ Please read the What's different in 3.0 for details on the changes and migration information when upgrading from 2.x.

Version 2.x

The 2.x version is end-of-life as of February 28, 2021. No further development, support, maintenance, PRs and updates will happen. The Javadoc of the very last version, 2.2.21, will remain accessible.

Version 1.x

The 1.x version is end-of-life as of March 31, 2018. No further development, support, maintenance, PRs and updates will happen. The Javadoc of the very last version, 1.3.8, will remain accessible.

Getting started

Setting up the dependency

The first step is to include RxJava 3 into your project, for example, as a Gradle compile dependency:

implementation "io.reactivex.rxjava3:rxjava:3.x.y"

(Please replace x and y with the latest version numbers: Maven Central )

Hello World

The second is to write the Hello World program:

package rxjava.examples;

import io.reactivex.rxjava3.core.*;

public class HelloWorld {
    public static void main(String[] args) {
        Flowable.just("Hello world").subscribe(System.out::println);
    }
}

Note that RxJava 3 components now live under io.reactivex.rxjava3 and the base classes and interfaces live under io.reactivex.rxjava3.core.

Base classes

RxJava 3 features several base classes you can discover operators on:

Some terminology

Upstream, downstream

The dataflows in RxJava consist of a source, zero or more intermediate steps followed by a data consumer or combinator step (where the step is responsible to consume the dataflow by some means):

source.operator1().operator2().operator3().subscribe(consumer);

source.flatMap(value -> source.operator1().operator2().operator3());

Here, if we imagine ourselves on operator2, looking to the left towards the source is called the upstream. Looking to the right towards the subscriber/consumer is called the downstream. This is often more apparent when each element is written on a separate line:

source
  .operator1()
  .operator2()
  .operator3()
  .subscribe(consumer)

Objects in motion

In RxJava's documentation, emission, emits, item, event, signal, data and message are considered synonyms and represent the object traveling along the dataflow.

Backpressure

When the dataflow runs through asynchronous steps, each step may perform different things with different speed. To avoid overwhelming such steps, which usually would manifest itself as increased memory usage due to temporary buffering or the need for skipping/dropping data, so-called backpressure is applied, which is a form of flow control where the steps can express how many items are they ready to process. This allows constraining the memory usage of the dataflows in situations where there is generally no way for a step to know how many items the upstream will send to it.

In RxJava, the dedicated Flowable class is designated to support backpressure and Observable is dedicated to the non-backpressured operations (short sequences, GUI interactions, etc.). The other types, Single, Maybe and Completable don't support backpressure nor should they; there is always room to store one item temporarily.

Assembly time

The preparation of dataflows by applying various intermediate operators happens in the so-called assembly time:

Flowable<Integer> flow = Flowable.range(1, 5)
.map(v -> v * v)
.filter(v -> v % 3 == 0)
;

At this point, the data is not flowing yet and no side-effects are happening.

Subscription time

This is a temporary state when subscribe() is called on a flow that establishes the chain of processing steps internally:

flow.subscribe(System.out::println)

This is when the subscription side-effects are triggered (see doOnSubscribe). Some sources block or start emitting items right away in this state.

Runtime

This is the state when the flows are actively emitting items, errors or completion signals:

Observable.create(emitter -> {
     while (!emitter.isDisposed()) {
         long time = System.currentTimeMillis();
         emitter.onNext(time);
         if (time % 2 != 0) {
             emitter.onError(new IllegalStateException("Odd millisecond!"));
             break;
         }
     }
})
.subscribe(System.out::println, Throwable::printStackTrace);

Practically, this is when the body of the given example above executes.

Simple background computation

One of the common use cases for RxJava is to run some computation, network request on a background thread and show the results (or error) on the UI thread:

import io.reactivex.rxjava3.schedulers.Schedulers;

Flowable.fromCallable(() -> {
    Thread.sleep(1000); //  imitate expensive computation
    return "Done";
})
  .subscribeOn(Schedulers.io())
  .observeOn(Schedulers.single())
  .subscribe(System.out::println, Throwable::printStackTrace);

Thread.sleep(2000); // <--- wait for the flow to finish

This style of chaining methods is called a fluent API which resembles the builder pattern. However, RxJava's reactive types are immutable; each of the method calls returns a new Flowable with added behavior. To illustrate, the example can be rewritten as follows:

Flowable<String> source = Flowable.fromCallable(() -> {
    Thread.sleep(1000); //  imitate expensive computation
    return "Done";
});

Flowable<String> runBackground = source.subscribeOn(Schedulers.io());

Flowable<String> showForeground = runBackground.observeOn(Schedulers.single());

showForeground.subscribe(System.out::println, Throwable::printStackTrace);

Thread.sleep(2000);

Typically, you can move computations or blocking IO to some other thread via subscribeOn. Once the data is ready, you can make sure they get processed on the foreground or GUI thread via observeOn.

Schedulers

RxJava operators don't work with Threads or ExecutorServices directly but with so-called Schedulers that abstract away sources of concurrency behind a uniform API. RxJava 3 features several standard schedulers accessible via Schedulers utility class.

  • Schedulers.computation(): Run computation intensive work on a fixed number of dedicated threads in the background. Most asynchronous operators use this as their default Scheduler.
  • Schedulers.io(): Run I/O-like or blocking operations on a dynamically changing set of threads.
  • Schedulers.single(): Run work on a single thread in a sequential and FIFO manner.
  • Schedulers.trampoline(): Run work in a sequential and FIFO manner in one of the participating threads, usually for testing purposes.

These are available on all JVM platforms but some specific platforms, such as Android, have their own typical Schedulers defined: AndroidSchedulers.mainThread(), SwingScheduler.instance() or JavaFXSchedulers.gui().

In addition, there is an option to wrap an existing Executor (and its subtypes such as ExecutorService) into a Scheduler via Schedulers.from(Executor). This can be used, for example, to have a larger but still fixed pool of threads (unlike computation() and io() respectively).

The Thread.sleep(2000); at the end is no accident. In RxJava the default Schedulers run on daemon threads, which means once the Java main thread exits, they all get stopped and background computations may never happen. Sleeping for some time in this example situations lets you see the output of the flow on the console with time to spare.

Concurrency within a flow

Flows in RxJava are sequential in nature split into processing stages that may run concurrently with each other:

Flowable.range(1, 10)
  .observeOn(Schedulers.computation())
  .map(v -> v * v)
  .blockingSubscribe(System.out::println);

This example flow squares the numbers from 1 to 10 on the computation Scheduler and consumes the results on the "main" thread (more precisely, the caller thread of blockingSubscribe). However, the lambda v -> v * v doesn't run in parallel for this flow; it receives the values 1 to 10 on the same computation thread one after the other.

Parallel processing

Processing the numbers 1 to 10 in parallel is a bit more involved:

Flowable.range(1, 10)
  .flatMap(v ->
      Flowable.just(v)
        .subscribeOn(Schedulers.computation())
        .map(w -> w * w)
  )
  .blockingSubscribe(System.out::println);

Practically, parallelism in RxJava means running independent flows and merging their results back into a single flow. The operator flatMap does this by first mapping each number from 1 to 10 into its own individual Flowable, runs them and merges the computed squares.

Note, however, that flatMap doesn't guarantee any order and the items from the inner flows may end up interleaved. There are alternative operators:

  • concatMap that maps and runs one inner flow at a time and
  • concatMapEager which runs all inner flows "at once" but the output flow will be in the order those inner flows were created.

Alternatively, the Flowable.parallel() operator and the ParallelFlowable type help achieve the same parallel processing pattern:

Flowable.range(1, 10)
  .parallel()
  .runOn(Schedulers.computation())
  .map(v -> v * v)
  .sequential()
  .blockingSubscribe(System.out::println);

Dependent sub-flows

flatMap is a powerful operator and helps in a lot of situations. For example, given a service that returns a Flowable, we'd like to call another service with values emitted by the first service:

Flowable<Inventory> inventorySource = warehouse.getInventoryAsync();

inventorySource
    .flatMap(inventoryItem -> erp.getDemandAsync(inventoryItem.getId())
            .map(demand -> "Item " + inventoryItem.getName() + " has demand " + demand))
    .subscribe(System.out::println);

Continuations

Sometimes, when an item has become available, one would like to perform some dependent computations on it. This is sometimes called continuations and, depending on what should happen and what types are involved, may involve various operators to accomplish.

Dependent

The most typical scenario is to given a value, invoke another service, await and continue with its result:

service.apiCall()
.flatMap(value -> service.anotherApiCall(value))
.flatMap(next -> service.finalCall(next))

It is often the case also that later sequences would require values from earlier mappings. This can be achieved by moving the outer flatMap into the inner parts of the previous flatMap for example:

service.apiCall()
.flatMap(value ->
    service.anotherApiCall(value)
    .flatMap(next -> service.finalCallBoth(value, next))
)

Here, the original value will be available inside the inner flatMap, courtesy of lambda variable capture.

Non-dependent

In other scenarios, the result(s) of the first source/dataflow is irrelevant and one would like to continue with a quasi independent another source. Here, flatMap works as well:

Observable continued = sourceObservable.flatMapSingle(ignored -> someSingleSource)
continued.map(v -> v.toString())
  .subscribe(System.out::println, Throwable::printStackTrace);

however, the continuation in this case stays Observable instead of the likely more appropriate Single. (This is understandable because from the perspective of flatMapSingle, sourceObservable is a multi-valued source and thus the mapping may result in multiple values as well).

Often though there is a way that is somewhat more expressive (and also lower overhead) by using Completable as the mediator and its operator andThen to resume with something else:

sourceObservable
  .ignoreElements()           // returns Completable
  .andThen(someSingleSource)
  .map(v -> v.toString())

The only dependency between the sourceObservable and the someSingleSource is that the former should complete normally in order for the latter to be consumed.

Deferred-dependent

Sometimes, there is an implicit data dependency between the previous sequence and the new sequence that, for some reason, was not flowing through the "regular channels". One would be inclined to write such continuations as follows:

AtomicInteger count = new AtomicInteger();

Observable.range(1, 10)
  .doOnNext(ignored -> count.incrementAndGet())
  .ignoreElements()
  .andThen(Single.just(count.get()))
  .subscribe(System.out::println);

Unfortunately, this prints 0 because Single.just(count.get()) is evaluated at assembly time when the dataflow hasn't even run yet. We need something that defers the evaluation of this Single source until runtime when the main source completes:

AtomicInteger count = new AtomicInteger();

Observable.range(1, 10)
  .doOnNext(ignored -> count.incrementAndGet())
  .ignoreElements()
  .andThen(Single.defer(() -> Single.just(count.get())))
  .subscribe(System.out::println);

or

AtomicInteger count = new AtomicInteger();

Observable.range(1, 10)
  .doOnNext(ignored -> count.incrementAndGet())
  .ignoreElements()
  .andThen(Single.fromCallable(() -> count.get()))
  .subscribe(System.out::println);

Type conversions

Sometimes, a source or service returns a different type than the flow that is supposed to work with it. For example, in the inventory example above, getDemandAsync could return a Single<DemandRecord>. If the code example is left unchanged, this will result in a compile-time error (however, often with a misleading error message about lack of overload).

In such situations, there are usually two options to fix the transformation: 1) convert to the desired type or 2) find and use an overload of the specific operator supporting the different type.

Converting to the desired type

Each reactive base class features operators that can perform such conversions, including the protocol conversions, to match some other type. The following matrix shows the available conversion options:

Flowable Observable Single Maybe Completable
Flowable toObservable first, firstOrError, single, singleOrError, last, lastOrError1 firstElement, singleElement, lastElement ignoreElements
Observable toFlowable2 first, firstOrError, single, singleOrError, last, lastOrError1 firstElement, singleElement, lastElement ignoreElements
Single toFlowable3 toObservable toMaybe ignoreElement
Maybe toFlowable3 toObservable toSingle ignoreElement
Completable toFlowable toObservable toSingle toMaybe

1: When turning a multi-valued source into a single-valued source, one should decide which of the many source values should be considered as the result.

2: Turning an Observable into Flowable requires an additional decision: what to do with the potential unconstrained flow of the source Observable? There are several strategies available (such as buffering, dropping, keeping the latest) via the BackpressureStrategy parameter or via standard Flowable operators such as onBackpressureBuffer, onBackpressureDrop, onBackpressureLatest which also allow further customization of the backpressure behavior.

3: When there is only (at most) one source item, there is no problem with backpressure as it can be always stored until the downstream is ready to consume.

Using an overload with the desired type

Many frequently used operator has overloads that can deal with the other types. These are usually named with the suffix of the target type:

Operator Overloads
flatMap flatMapSingle, flatMapMaybe, flatMapCompletable, flatMapIterable
concatMap concatMapSingle, concatMapMaybe, concatMapCompletable, concatMapIterable
switchMap switchMapSingle, switchMapMaybe, switchMapCompletable

The reason these operators have a suffix instead of simply having the same name with different signature is type erasure. Java doesn't consider signatures such as operator(Function<T, Single<R>>) and operator(Function<T, Maybe<R>>) different (unlike C#) and due to erasure, the two operators would end up as duplicate methods with the same signature.

Operator naming conventions

Naming in programming is one of the hardest things as names are expected to be not long, expressive, capturing and easily memorable. Unfortunately, the target language (and pre-existing conventions) may not give too much help in this regard (unusable keywords, type erasure, type ambiguities, etc.).

Unusable keywords

In the original Rx.NET, the operator that emits a single item and then completes is called Return(T). Since the Java convention is to have a lowercase letter start a method name, this would have been return(T) which is a keyword in Java and thus not available. Therefore, RxJava chose to name this operator just(T). The same limitation exists for the operator Switch, which had to be named switchOnNext. Yet another example is Catch which was named onErrorResumeNext.

Type erasure

Many operators that expect the user to provide some function returning a reactive type can't be overloaded because the type erasure around a Function<T, X> turns such method signatures into duplicates. RxJava chose to name such operators by appending the type as suffix as well:

Flowable<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper)

Flowable<R> flatMapMaybe(Function<? super T, ? extends MaybeSource<? extends R>> mapper)

Type ambiguities

Even though certain operators have no problems from type erasure, their signature may turn up being ambiguous, especially if one uses Java 8 and lambdas. For example, there are several overloads of concatWith taking the various other reactive base types as arguments (for providing convenience and performance benefits in the underlying implementation):

Flowable<T> concatWith(Publisher<? extends T> other);

Flowable<T> concatWith(SingleSource<? extends T> other);

Both Publisher and SingleSource appear as functional interfaces (types with one abstract method) and may encourage users to try to provide a lambda expression:

someSource.concatWith(s -> Single.just(2))
.subscribe(System.out::println, Throwable::printStackTrace);

Unfortunately, this approach doesn't work and the example does not print 2 at all. In fact, since version 2.1.10, it doesn't even compile because at least 4 concatWith overloads exist and the compiler finds the code above ambiguous.

The user in such situations probably wanted to defer some computation until the someSource has completed, thus the correct unambiguous operator should have been defer:

someSource.concatWith(Single.defer(() -> Single.just(2)))
.subscribe(System.out::println, Throwable::printStackTrace);

Sometimes, a suffix is added to avoid logical ambiguities that may compile but produce the wrong type in a flow:

Flowable<T> merge(Publisher<? extends Publisher<? extends T>> sources);

Flowable<T> mergeArray(Publisher<? extends T>... sources);

This can get also ambiguous when functional interface types get involved as the type argument T.

Error handling

Dataflows can fail, at which point the error is emitted to the consumer(s). Sometimes though, multiple sources may fail at which point there is a choice whether or not wait for all of them to complete or fail. To indicate this opportunity, many operator names are suffixed with the DelayError words (while others feature a delayError or delayErrors boolean flag in one of their overloads):

Flowable<T> concat(Publisher<? extends Publisher<? extends T>> sources);

Flowable<T> concatDelayError(Publisher<? extends Publisher<? extends T>> sources);

Of course, suffixes of various kinds may appear together:

Flowable<T> concatArrayEagerDelayError(Publisher<? extends T>... sources);

Base class vs base type

The base classes can be considered heavy due to the sheer number of static and instance methods on them. RxJava 3's design was heavily influenced by the Reactive Streams specification, therefore, the library features a class and an interface per each reactive type:

Type Class Interface Consumer
0..N backpressured Flowable Publisher1 Subscriber
0..N unbounded Observable ObservableSource2 Observer
1 element or error Single SingleSource SingleObserver
0..1 element or error Maybe MaybeSource MaybeObserver
0 element or error Completable CompletableSource CompletableObserver

1The org.reactivestreams.Publisher is part of the external Reactive Streams library. It is the main type to interact with other reactive libraries through a standardized mechanism governed by the Reactive Streams specification.

2The naming convention of the interface was to append Source to the semi-traditional class name. There is no FlowableSource since Publisher is provided by the Reactive Streams library (and subtyping it wouldn't have helped with interoperation either). These interfaces are, however, not standard in the sense of the Reactive Streams specification and are currently RxJava specific only.

R8 and ProGuard settings

By default, RxJava itself doesn't require any ProGuard/R8 settings and should work without problems. Unfortunately, the Reactive Streams dependency since version 1.0.3 has embedded Java 9 class files in its JAR that can cause warnings with the plain ProGuard:

Warning: org.reactivestreams.FlowAdapters$FlowPublisherFromReactive: can't find superclass or interface java.util.concurrent.Flow$Publisher
Warning: org.reactivestreams.FlowAdapters$FlowToReactiveProcessor: can't find superclass or interface java.util.concurrent.Flow$Processor
Warning: org.reactivestreams.FlowAdapters$FlowToReactiveSubscriber: can't find superclass or interface java.util.concurrent.Flow$Subscriber
Warning: org.reactivestreams.FlowAdapters$FlowToReactiveSubscription: can't find superclass or interface java.util.concurrent.Flow$Subscription
Warning: org.reactivestreams.FlowAdapters: can't find referenced class java.util.concurrent.Flow$Publisher

It is recommended one sets up the following -dontwarn entry in the application's proguard-ruleset file:

-dontwarn java.util.concurrent.Flow*

For R8, the RxJava jar includes the META-INF/proguard/rxjava3.pro with the same no-warning clause and should apply automatically.

Further reading

For further details, consult the wiki.

Communication

Versioning

Version 3.x is in development. Bugfixes will be applied to both 2.x and 3.x branches, but new features will only be added to 3.x.

Minor 3.x increments (such as 3.1, 3.2, etc) will occur when non-trivial new functionality is added or significant enhancements or bug fixes occur that may have behavioral changes that may affect some edge cases (such as dependence on behavior resulting from a bug). An example of an enhancement that would classify as this is adding reactive pull backpressure support to an operator that previously did not support it. This should be backwards compatible but does behave differently.

Patch 3.x.y increments (such as 3.0.0 -> 3.0.1, 3.3.1 -> 3.3.2, etc) will occur for bug fixes and trivial functionality (like adding a method overload). New functionality marked with an @Beta or @Experimental annotation can also be added in the patch releases to allow rapid exploration and iteration of unstable new functionality.

@Beta

APIs marked with the @Beta annotation at the class or method level are subject to change. They can be modified in any way, or even removed, at any time. If your code is a library itself (i.e. it is used on the CLASSPATH of users outside your control), you should not use beta APIs, unless you repackage them (e.g. using ProGuard, shading, etc).

@Experimental

APIs marked with the @Experimental annotation at the class or method level will almost certainly change. They can be modified in any way, or even removed, at any time. You should not use or rely on them in any production code. They are purely to allow broad testing and feedback.

@Deprecated

APIs marked with the @Deprecated annotation at the class or method level will remain supported until the next major release but it is recommended to stop using them.

io.reactivex.rxjava3.internal.*

All code inside the io.reactivex.rxjava3.internal.* packages are considered private API and should not be relied upon at all. It can change at any time.

Full Documentation

Binaries

Binaries and dependency information for Maven, Ivy, Gradle and others can be found at http://search.maven.org.

Example for Gradle:

implementation 'io.reactivex.rxjava3:rxjava:x.y.z'

and for Maven:

<dependency>
    <groupId>io.reactivex.rxjava3</groupId>
    <artifactId>rxjava</artifactId>
    <version>x.y.z</version>
</dependency>

and for Ivy:

<dependency org="io.reactivex.rxjava3" name="rxjava" rev="x.y.z" />

Snapshots

Snapshots are available via https://oss.jfrog.org/libs-snapshot/io/reactivex/rxjava3/rxjava/

repositories {
    maven { url 'https://oss.jfrog.org/libs-snapshot' }
}

dependencies {
    compile 'io.reactivex.rxjava3:rxjava:3.0.0-SNAPSHOT'
}

JavaDoc snapshots are available at http://reactivex.io/RxJava/3.x/javadoc/snapshot

Build

To build:

$ git clone [email protected]:ReactiveX/RxJava.git
$ cd RxJava/
$ ./gradlew build

Further details on building can be found on the Getting Started page of the wiki.

Bugs and Feedback

For bugs, questions and discussions please use the Github Issues.

LICENSE

Copyright (c) 2016-present, RxJava Contributors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Comments
  • Idiomatic Scala Support

    Idiomatic Scala Support

    As of version 0.11.0 Scala support is provided through the use of implicits. Conversations on Twitter are bringing up other possible improvements. Let's use this issue to discuss.

    opened by benjchristensen 96
  • 2.x: Handling null values

    2.x: Handling null values

    With the upcoming RxJava2 release one of the important changes is that null is no longer accepted as a stream element.

    Honestly, I have mixed feelings about this change and part of me understands that it will enforce clean APIs, but I can see a number of use cases when this might be a problem.

    For instance, in my app I have an in-memory cache:

    @Nullable CacheItem findCacheItem(long id);
    

    CacheItem might not be present in cache, so method might return null value.

    The way it is used with Rx* - is as following:

    Observable<CacheItem> getStream(final long id) {
        return Observable.fromCallable(new Callable<CacheItem>() {
            @Override public CacheItem call() throws Exception {
                return findCacheItem(id);
            }
        });
    }
    

    So with this approach, I might get null in my stream which is totally valid situation, so it is handled properly on receiving side - let's say UI changes its state if item is not present in cache:

    Observable.just(user)
              .map(user -> user.getName())
              .map(name -> convertNameToId(name))
              .flatMap(id -> getStream(id))
              .map(cacheItem -> getUserInfoFromCacheItem(cacheItem))
              .subscribe(
                  userInfo -> {
                      if(userInfo != null) showUserInfo();
                      else showPrompt();
                  }
              );
    

    With RxJava2 I am no longer allowed to post null down the stream, so I either need to wrap my CacheItem into some other class and make my stream produce that wrapper instead or make quite big architectural changes.

    Wrapping every single stream element into nullable counterpart doesn't look right to me.

    Am I missing something fundamental here?

    It seems like the situation like mine is quite popular, so Im curious what is the recommended strategy to tackle this problem given new "no null" policy in RxJava2?

    Question 
    opened by paveldudka 89
  • OnSubscribeRedo - fix race conditions

    OnSubscribeRedo - fix race conditions

    While searching for the cause of #2863 I bumped into this race condition (which doesn't fix #2863):

    If a request is made between L238 and L239 then consumerCapacity may become Long.MAX_VALUE on arriving at L239 in which case we don't wish to decrement it. To fix, used compareAndSet.

    What is interesting about this fix is that in the test loop of 5000 in OperatorRetryTest I see many more occurrences of the failure on average (3 -> 50) presumably because the extra time to perform the compareAndSet action has expanded the window for the race condition causing the failures.

    Bug 
    opened by davidmoten 83
  • Observer reference is not released on unsubscribe

    Observer reference is not released on unsubscribe

    With the following code http://pastebin.com/VUZ5aApe and using the latest version of rxjava (0.18.4),

    rotations should result in the activity calling unsubscribe on my subscription.

    This should release the reference to my observer, which should in turn release the reference to my activity.

    However, the memory tests I've done show that as I rotate the phone, my activity count keeps going up, which means the reference to my activity is not being released.

    (let me know if the pastebin expires for some reason, I'll get something back up)

    opened by fdoyle 74
  • Roadmap to 1.0

    Roadmap to 1.0

    I want everyone to know where we're heading and what's left before we hit 1.0.

    From the beginning we have allowed ourselves to do breaking changes on each 0.x release as we were aware that the design was not finished when we started this project. We are getting close to being done. Our goal is to release 1.0 in the coming months after stabilizing the API so that it can be relied upon without breaking every couple months.

    Project Structure

    When we hit 1.0 we intend on splitting out the language adaptors into their own top-level projects such as RxScala, RxClojure, RxGroovy, RxKotlin, RxJRuby etc.

    This will allow each project to iterate as needed at their own pace, especially since some will need to continue iterating while the RxJava core stabilizes. For example, if RxScala needs breaking changes it can bump it's major version while RxJava does not. This is particularly important to RxScala for handling changes such as Scala 2.10 -> 2.11 -> 2.12 etc.

    Major contrib modules will also be moved out, such as RxAndroid which also needs its own life-cycle.

    Outstanding Work

    The major items of work to be finished before 1.0 are:

    • ~~Backpressure: https://github.com/Netflix/RxJava/issues/1000~~ [Completed]
    • ~~Serialization Behavior: https://github.com/Netflix/RxJava/issues/998~~ [Completed in new merge implementation]
    • ~~Scheduler API: https://github.com/Netflix/RxJava/issues/997~~ Completed
    • ~~Remove all deprecated methods/classes~~ [Completed] #1053 #1621
    • ~~Finish migrating all operators to using lift and chaining the Subscription via Subscriber (current priority)~~ [Completed]

    The primary goal is to nail down the public API. New functionality can come in 1.1, 1.2, etc. The secondary goal is for all operators to work as advertised (regarding unsubscribe, back pressure and non-blocking). There will always be bugs, that's why 1.x.y will still be active after release, but the desire is to not need to ship 2.x soon after 1.x as this is a low level library that once entrenched becomes hard to migrate (we create significant pain at Netflix on each 0.x release).

    Going Forward

    Please comment if you feel there are other critical things to achieve before 1.0. The fastest way to getting us to 1.0 is helping us achieve the work stated above.

    Information 
    opened by benjchristensen 59
  • Version 0.17.0 Release Notes [Preview]

    Version 0.17.0 Release Notes [Preview]

    #0.17.0 Release Notes

    Version 0.17.0 contains some significant signature changes that allow us to significantly improve handling of synchronous Observables and simplify Schedulers. Many of the changes have backwards compatible deprecated methods to ease the migration while some are breaking.

    The new signatures related to Observable in this release are:

    // A new create method takes `OnSubscribe` instead of `OnSubscribeFunc`
    public final static <T> Observable<T> create(OnSubscribe<T> f)
    
    // The new OnSubscribe type accepts a Subscriber instead of Observer and does not return a Subscription
    public static interface OnSubscribe<T> extends Action1<Subscriber<? super T>>
    
    // Subscriber is an Observer + Subscription
    public abstract class Subscriber<T> implements Observer<T>, Subscription
    
    // The main `subscribe` behavior receives a Subscriber instead of Observer
    public final Subscription subscribe(Subscriber<? super T> observer)
    
    // Subscribing with an Observer however is still appropriate
    // and the Observer is automatically converted into a Subscriber
    public final Subscription subscribe(Observer<? super T> observer)
    
    // A new 'lift' function allows composing Operator implementations together
    public <R> Observable<R> lift(Func1<Subscriber<? super R>, Subscriber<? super T>> lift)
    
    

    Also changed is the Scheduler interface which is much simpler:

    public abstract class Scheduler {
        public Subscription schedule(Action1<Scheduler.Inner> action);
        public Subscription schedule(Action1<Scheduler.Inner> action, long delayTime, TimeUnit unit);
        public Subscription schedulePeriodically(Action1<Scheduler.Inner> action, long initialDelay, long period, TimeUnit unit);
        public long now();
        public int degreeOfParallelism();
    
        public static class Inner implements Subscription {
            public abstract void schedule(Action1<Scheduler.Inner> action, long delayTime, TimeUnit unit);
            public abstract void schedule(Action1<Scheduler.Inner> action);
            public long now();
        }
    }
    

    This release applies many lessons learned over the past year and seeks to streamline the API before we hit 1.0.

    As shown in the code above the changes fall into 2 major sections:

    1) Lift/OnSubscribe/Subscriber

    Changes that allow unsubscribing from synchronous Observables without needing to add concurrency.

    2) Schedulers

    Simplification of the Scheduler interface and make clearer the concept of "outer" and "inner" Schedulers for recursion.

    Lift/OnSubscribe/Subscriber

    New types Subscriber and OnSubscribe along with the new lift operator have been added. The reasons and benefits are as follows:

    1) Synchronous Unsubscribe

    RxJava versions up until 0.16.x are unable to unsubscribe from a synchronous Observable such as this:

    Observable<Integer> oi = Observable.create(new OnSubscribe<Integer>() {
    
        @Override
        public void call(Observer<? super Integer> Observer) {
            for (int i = 1; i < 1000000; i++) {
                subscriber.onNext(i);
            }
            subscriber.onCompleted();
        }
    });
    

    Subscribing to this Observable will always emit all 1,000,000 values even if unsubscribed such as via oi.take(10).

    Version 0.17.0 fixes this issue by injecting the Subscription into the OnSubscribe function to allow code like this:

    Observable<Integer> oi = Observable.create(new OnSubscribe<Integer>() {
    
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            // we now receive a Subscriber instead of Observer
            for (int i = 1; i < 1000000; i++) {
                // the OnSubscribe can now check for isUnsubscribed
                if (subscriber.isUnsubscribed()) {
                    return;
                }
                subscriber.onNext(i);
            }
            subscriber.onCompleted();
        }
    
    });
    

    Subscribing to this will now correctly only emit 10 onNext and unsubscribe:

    // subscribe with an Observer
    oi.take(10).subscribe(new Observer<Integer>() {
    
        @Override
        public void onCompleted() {
    
        }
    
        @Override
        public void onError(Throwable e) {
    
        }
    
        @Override
        public void onNext(Integer t) {
            println("Received: " + t);
        }
    
    })
    

    Or the new Subscriber type can be used and the Subscriber itself can unsubscribe:

    // or subscribe with a Subscriber which supports unsubscribe
    oi.subscribe(new Subscriber<Integer>() {
    
        @Override
        public void onCompleted() {
    
        }
    
        @Override
        public void onError(Throwable e) {
    
        }
    
        @Override
        public void onNext(Integer t) {
            println("Received: " + t);
            if(t >= 10) {
                // a Subscriber can unsubscribe
                this.unsubscribe();
            }
        }
    
    })
    

    2) Custom Operator Chaining

    Because Java doesn't support extension methods, the only approach to applying custom operators without getting them added to rx.Observable is using static methods. This has meant code like this:

    MyCustomerOperators.operate(observable.map(...).filter(...).take(5)).map(...).subscribe()
    

    In reality we want:

    observable.map(...).filter(...).take(5).myCustomOperator().map(...).subscribe()
    

    Using the newly added lift we can get quite close to this:

    observable.map(...).filter(...).take(5).lift(MyCustomOperator.operate()).map(...).subscribe()
    

    Here is how the proposed lift method looks if all operators were applied with it:

    Observable<String> os = OBSERVABLE_OF_INTEGERS.lift(TAKE_5).lift(MAP_INTEGER_TO_STRING);
    

    Along with the lift function comes a new Operator signature:

    public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>>
    

    All operator implementations in the rx.operators package will over time be migrated to this new signature.

    3) Simpler Operator Implementations

    The lift operator injects the necessary Observer and Subscription instances (via the new Subscriber type) and eliminates (for most use cases) the need for manual subscription management. Because the Subscription is available in-scope there are no awkward coding patterns needed for creating a Subscription, closing over it and returning and taking into account synchronous vs asynchronous.

    For example, the body of fromIterable is simply:

    public void call(Subscriber<? super T> o) {
        for (T i : is) {
            if (o.isUnsubscribed()) {
                return;
            }
            o.onNext(i);
        }
        o.onCompleted();
    }
    

    The take operator is:

    public final class OperatorTake<T> implements Operator<T, T> {
    
        final int limit;
    
        public OperatorTake(int limit) {
            this.limit = limit;
        }
    
        @Override
        public Subscriber<? super T> call(final Subscriber<? super T> o) {
            CompositeSubscription parent = new CompositeSubscription();
            if (limit == 0) {
                o.onCompleted();
                parent.unsubscribe();
            }
            return new Subscriber<T>(parent) {
    
                int count = 0;
                boolean completed = false;
    
                @Override
                public void onCompleted() {
                    if (!completed) {
                        o.onCompleted();
                    }
                }
    
                @Override
                public void onError(Throwable e) {
                    if (!completed) {
                        o.onError(e);
                    }
                }
    
                @Override
                public void onNext(T i) {
                    if (!isUnsubscribed()) {
                        o.onNext(i);
                        if (++count >= limit) {
                            completed = true;
                            o.onCompleted();
                            unsubscribe();
                        }
                    }
                }
    
            };
        }
    
    }
    

    4) Recursion/Loop Performance

    The fromIterable use case is 20x faster when implemented as a loop instead of recursive scheduler (see https://github.com/Netflix/RxJava/commit/a18b8c1a572b7b9509b7a7fe1a5075ce93657771).

    Several places we can remove recursive scheduling used originally for unsubscribe support and use a loop instead.

    Schedulers

    Schedulers were greatly simplified to a design based around Action1<Inner>.

    public abstract class Scheduler {
        public Subscription schedule(Action1<Scheduler.Inner> action);
        public Subscription schedule(Action1<Scheduler.Inner> action, long delayTime, TimeUnit unit);
        public Subscription schedulePeriodically(Action1<Scheduler.Inner> action, long initialDelay, long period, TimeUnit unit);
        public long now();
        public int degreeOfParallelism();
    
        public static class Inner implements Subscription {
            public abstract void schedule(Action1<Scheduler.Inner> action, long delayTime, TimeUnit unit);
            public abstract void schedule(Action1<Scheduler.Inner> action);
            public long now();
        }
    }
    

    This design change originated from three findings:

    1. It was very easy to cause memory leaks or inadvertent parallel execution since the distinction between outer and inner scheduling was not obvious.

    To solve this the new design explicitly has the outer Scheduler and then Scheduler.Inner for recursion.

    1. The passing of state is not useful since scheduling over network boundaries with this model does not work.

    In this new design all state passing signatures have been removed. This was determined while implementing a RemoteScheduler that attempted to use observeOn to transition execution from one machine to another. This does not work because of the requirement for serializing/deserializing the state of the entire execution stack. Migration of work over the network has been bound to be better suited to explicit boundaries established by Subjects. Thus, the complications within the Schedulers are unnecessary.

    1. The number of overloads with different ways of doing the same things were confusing.

    This new design removes all but the essential and simplest methods.

    1. A scheduled task could not do work in a loop and easily be unsubscribed which generally meant less efficient recursive scheduling.

    This new design applies similar principles as done with lift/create/OnSubscribe/Subscriber and injects the Subscription via the Inner interface so a running task can check isUnsubscribed().

    WIth this new design, the simplest execution of a single task is:

    Schedulers.newThread().schedule(new Action1<Inner>() {
    
        @Override
        public void call(Inner inner) {
            doWork();
        }
    
    });
    

    Recursion is easily invoked:

    Schedulers.newThread().schedule(new Action1<Inner>() {
    
        @Override
        public void call(Inner inner) {
            doWork();
            // recurse until unsubscribed (the schedule will do nothing if unsubscribed)
            inner.schedule(this);
        }
    
    });
    

    The use of Action1<Inner> on both the outer and inner levels makes it so recursion that refer to this and it works easily.

    Similar to the new lift/create pattern with Subscriber the Inner is also a Subscription so it allows efficient loops with unsubscribe support:

    Schedulers.newThread().schedule(new Action1<Inner>() {
    
        @Override
        public void call(Inner inner) {
            while(!inner.isUnsubscribed()) {
                doWork();
            }
        }
    
    });
    

    An action can now unsubscribe the Scheduler.Inner:

    Schedulers.newThread().schedule(new Action1<Inner>() {
    
        @Override
        public void call(Inner inner) {
            while(!inner.isUnsubscribed()) {
                int i = doOtherWork();
                if(i > 100) {
                    // an Action can cause the Scheduler to unsubscribe and stop
                    inner.unsubscribe();
                }
            }
        }
    
    });
    

    Typically just stopping is sufficient:

    Schedulers.newThread().schedule(new Action1<Inner>() {
    
        @Override
        public void call(Inner inner) {
            int i = doOtherWork();
            if (i < 10) {
                // recurse until done 10
                inner.schedule(this);
            }
        }
    
    });
    

    but if other work in other tasks is being done and you want to unsubscribe conditionally you could:

    Schedulers.newThread().schedule(new Action1<Inner>() {
    
        @Override
        public void call(Inner inner) {
            int i = doOtherWork();
            if (i < 10) {
                // recurse until done 10
                inner.schedule(this);
            } else {
                inner.unsubscribe();
            }
        }
    
    });
    

    and the recursion can be delayed:

    Schedulers.newThread().schedule(new Action1<Inner>() {
    
        @Override
        public void call(Inner inner) {
            doWork();
            // recurse until unsubscribed ... but delay the recursion
            inner.schedule(this, 500, TimeUnit.MILLISECONDS);
        }
    
    });
    

    The methods on the Inner never return a Subscription because they are always a single thread/event-loop/actor/etc and controlled by the Subscription returned by the initial Scheduler.schedule method. This is part of clarifying the contract.

    Thus an unsubscribe controlled from the outside would be done like this:

    Subscription s = Schedulers.newThread().schedule(new Action1<Inner>() {
    
        @Override
        public void call(Inner inner) {
            while(!inner.isUnsubscribed()) {
                doWork();
            }
        }
    
    });
    
    // unsubscribe from outside
    s.unsubscribe();
    

    Migration Path

    1) Lift/OnSubscribe/Subscriber

    The lift function will not be used by most and is additive so will not affect backwards compatibility. The Subscriber type is also additive and for most use cases does not need to be used directly, the Observer interface can continue being used.

    The previous create(OnSubscribeFunc f) signature has been deprecated so code will work but now have warnings. Please begin migrating code as this will be deleted prior to the 1.0 release.

    Code such as this:

    Observable.create(new OnSubscribeFunc<Integer>() {
    
        @Override
        public Subscription onSubscribe(Observer<? super Integer> o) {
            o.onNext(1);
            o.onNext(2);
            o.onCompleted();
            return Subscriptions.empty();
        }
    });
    

    should change to this:

    Observable.create(new OnSubscribe<Integer>() {
    
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            subscriber.onNext(1);
            subscriber.onNext(2);
            subscriber.onCompleted();
        }
    });
    

    If concurrency was being injected:

    Observable.create(new OnSubscribeFunc<Integer>() {
    
        @Override
        public Subscription onSubscribe(final Observer<? super Integer> o) {
            final BooleanSubscription s = new BooleanSubscription();
            Thread t = new Thread(new Runnable() {
    
                @Override
                public void run() {
                    int i = 0;
                    while (s.isUnsubscribed()) {
                        o.onNext(i++);
                    }
                }
    
            });
            t.start();
            return s;
        }
    });
    

    you may no longer need it and can implement like this instead:

    Observable.create(new OnSubscribe<Integer>() {
    
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            int i = 0;
            while (subscriber.isUnsubscribed()) {
                subscriber.onNext(i++);
            }
        }
    });
    

    or can just simplify the Subscription management:

    Observable.create(new OnSubscribe<Integer>() {
    
        @Override
        public void call(final Subscriber<? super Integer> subscriber) {
            Thread t = new Thread(new Runnable() {
    
                @Override
                public void run() {
                    int i = 0;
                    while (subscriber.isUnsubscribed()) {
                        subscriber.onNext(i++);
                    }
                }
    
            });
            t.start();
        }
    });
    

    or use a Scheduler:

    Observable.create(new OnSubscribe<Integer>() {
    
        @Override
        public void call(final Subscriber<? super Integer> subscriber) {
            Schedulers.io().schedule(new Action1<Inner>() {
    
                @Override
                public void call(Inner inner) {
                    int i = 0;
                    while (subscriber.isUnsubscribed()) {
                        subscriber.onNext(i++);
                    }
                }
    
            });
        }
    });
    

    or use subscribeOn which now works to make synchronous Observables async while supporting unsubscribe (this didn't work before):

    Observable.create(new OnSubscribe<Integer>() {
    
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            int i = 0;
            while (subscriber.isUnsubscribed()) {
                subscriber.onNext(i++);
            }
        }
    }).subscribeOn(Schedulers.newThread());
    

    2) Schedulers

    Custom Scheduler implementations will need to be re-implemented and any direct use of the Scheduler interface will also need to be updated.

    3) Subscription

    If you have custom Subscription implementations you will see they now need an isUnsubscribed() method.

    You can either add this method, or wrap your function using Subscriptions.create and it will handle the isUnsubscribed behavior and execute your function when unsubscribe() is called.

    The Future...

    This is hopefully the last of the major refactors to rxjava-core and we're approaching version 1.0. We have most if not all operators from Rx.Net that we want or intend to port. We think we have got the create/subscribe signatures as we want and the Subscription and Scheduler interfaces are now clean.

    We need to improve on some of the Subject implementations still, particularly ReplaySubject. We are beginning to focus after this release on cleaning up all of the operator implementations, stabilizing, fixing bugs and performance tuning.

    We appreciate your usage, feedback and contributions and hope the library is creating value for you!

    opened by benjchristensen 57
  • Proposed Scheduler Interface Change for 0.18 (yes, again)

    Proposed Scheduler Interface Change for 0.18 (yes, again)

    Reviewing the Scheduler interface changes of 0.17 with @headinthebox revealed that we're not 100% happy with the outcome, particularly after learning that Java 8 does not allow referencing this from within a lambda.

    The Scheduler interface as of 0.17 is:

    class Scheduler {
        public abstract Subscription schedule(Action1<Scheduler.Inner> action);
        public abstract Subscription schedule(Action1<Scheduler.Inner> action, final long delayTime, final TimeUnit unit);
        public Subscription scheduleRecursive(Action1<Recurse> action);
        public Subscription schedulePeriodically(Action1<Scheduler.Inner> action, long initialDelay, long period, TimeUnit unit);
        public int degreeOfParallelism();
        public long now();
    
        public static final class Recurse {
            public void schedule() {
            public void schedule(long delay, TimeUnit unit) {
        }
    
        public abstract static class Inner implements Subscription {
            public abstract void schedule(Action1<Scheduler.Inner> action, long delayTime, TimeUnit unit);
            public abstract void schedule(Action1<Scheduler.Inner> action);
            public long now();
        }
    }
    

    We have determined two problems with this:

    1. Inner/Outer Dance

    In practice we have found that usage is always one of two things, either you just interact with the outer and don't care about the Inner, or you immediately need the Inner and have to do an awkward first scheduling just to get access to the Inner. (See here and weep.)

    1. Recursion

    The Action1<Scheduler.Inner> signature was chosen and put on both outer and inner so that an inner class could refer to itself using this to simply reschedule itself from the outer onto the inner.

    It was assumed this would work in Java 8 lambdas but unfortunately we did not prove it.

    This works with anonymous classes:

    Schedulers.newThread().schedule(new Action1<Inner>() {
    
        @Override
        public void call(Inner inner) {
            System.out.println("do stuff");
            // recurse
            inner.schedule(this);
        }
    
    });
    

    but this does not with lambdas:

    Schedulers.newThread().schedule((inner) -> {
        System.out.println("do stuff");
        inner.schedule(this); // doesn't compile
    });
    

    So we end up with this:

    Schedulers.newThread().scheduleRecursive((recurse) -> {
        System.out.println("do stuff");
        recurse.schedule();
    });
    

    At that point it's clear that Inner is not working well and we have Recurse to fix the problem.

    Thus, the proposed changes (breaking again) are:

    class Scheduler {
        public final Subscription schedule(Action1<Recurse> action);
        public final Subscription schedule(Action1<Recurse> action, final long delayTime, final TimeUnit unit);
        public final Subscription schedulePeriodically(Action1<Recurse> action, long initialDelay, long period, TimeUnit unit);
        public abstract Inner createInner(); // for advanced use cases like `observeOn`
        public int degreeOfParallelism();
        public long now();
    
        // now the primary interface
        public static final class Recurse {
            public final void schedule();
            public final void schedule(long delay, TimeUnit unit);
            public final void schedule(Action1<Recurse> action);
            public final void schedule(Action1<Recurse> action, final long delayTime, final TimeUnit unit);
        }
    
        // now mostly an implementation detail except for advanced use cases
        public abstract static class Inner implements Subscription {
            public abstract void schedule(Action1<Recurse> action, long delayTime, TimeUnit unit);
            public abstract void schedule(Action1<Recurse> action);
            public long now();
        }
    }
    

    The name of Recurse is up for debate. It may be possible to merge Recurse and Inner but I haven't figured it out yet. The reason is that Inner is a single instance representing a thread or event-loop whereas Recurse represents an Action or work. Thus a given Inner could have multiple Recurse actions scheduled on to it. It is being an Action that allows it to recurse by invoking schedule() that just reschedules itself.

    This would make it better support Java 8 lambdas and simply recursion, while also better supporting (via the createInner() method) the more complicated use cases like observeOn where current code is very awkward.

    This needs to be the last refactor of this so we nail it down and stop breaking things and can get to 1.0.

    Let the discussion begin ...

    opened by benjchristensen 55
  • Profiling Memory Usage and Object Creation

    Profiling Memory Usage and Object Creation

    We need to spend time profiling memory and object allocation and finding places where we can improve.

    I would really appreciate help diving into this and finding problem areas. Even if you don't fix them, but just identity use cases, operators, etc that would be very valuable.

    This is partly a result of the fact that in Netflix production we have seen an increase in YoungGen GCs since 0.17.x.

    The areas to start should probably be:

    • Observable.create
    • Observable.lift
    • Subscriber
    • CompositeSubscription
    • map
    • flatMap

    If you can or want to get involved in this please comment here so we all can collaborate together.

    opened by benjchristensen 54
  • Experimental Proposal of rx.Task

    Experimental Proposal of rx.Task

    Adds rx.Task as a "scalar Observable" for representing work with a single return value.

    See https://github.com/ReactiveX/RxJava/issues/1594 rx.Future/Task

    This provides a type similar to Future in that it represents a scalar unit of work, but it is lazy like an Observable and many Tasks can be combined into an Observable stream. Note how Task.zip returns Task<R> whereas Task.merge returns Observable<R>.

    NOTE: This is for experimentation and feedback at this time.

    Items requiring review and work that I'm particularly aware of:

    • naming of OnExecute
    • naming of TaskObserver (this one in particular I don't like)
    • design and implementation of Task.Promise
    • should the public lift use the Observable.Operator or should that only be for internal reuse?
    • should we have a public lift that uses a Task.Operator?
    • the Task.toObservable implementation right now is efficient but will likely break something so it likely needs to change to use subscribe
    • implementation of this merge variant: Task<T> merge(final Task<? extends Task<? extends T>> source)
    • several operators currently just wrap as Observable to reuse existing operators ... is that okay performance wise?
    • Javadocs

    Examples of using this class:

    import rx.Observable;
    import rx.Task;
    import rx.Task.Promise;
    
    public class TaskExamples {
    
        public static void main(String... args) {
            // scalar synchronous value
            Task<String> t1 = Task.create(t -> {
                t.onSuccess("Hello World!");
            });
    
            // scalar synchronous value using helper method
            Task<Integer> t2 = Task.just(1);
    
            // synchronous error
            Task<String> error = Task.create(t -> {
                t.onError(new RuntimeException("failed!"));
            });
    
            // executing
            t1.subscribe(System.out::println);
            t2.subscribe(System.out::println);
            error.subscribe(System.out::println, e -> System.out.println(e.getMessage()));
    
            // scalar Tasks for request/response like a Future
            getData(1).subscribe(System.out::println);
            getDataUsingPromise(2).subscribe(System.out::println);
    
            // combining Tasks into another Task
            Task<String> zipped = Task.zip(t1, t2, (a, b) -> a + " -- " + b);
    
            // combining Tasks into an Observable stream
            Observable<String> merged = Task.merge(t1, t2.map(String::valueOf), getData(3));
            Observable<String> mergeWith = t1.mergeWith(t2.map(String::valueOf));
    
            zipped.subscribe(v -> System.out.println("zipped => " + v));
            merged.subscribe(v -> System.out.println("merged => " + v));
            mergeWith.subscribe(v -> System.out.println("mergeWith => " + v));
        }
    
        /**
         * Example of an async scalar execution using Task.create
         * <p>
         * This shows the lazy, idiomatic approach for Rx exactly like an Observable except scalar.
         *
         * @param arg
         * @return
         */
        public static Task<String> getData(int arg) {
            return Task.create(s -> {
                new Thread(() -> {
                    try {
                        Thread.sleep(500);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    // deliver value
                        s.onSuccess("Data=" + arg);
                    }).start();
            });
        }
    
        /**
         * Example of an async scalar execution using a Task.Promise
         * <p>
         * This shows how an eager (hot) process would work like using a Future.
         *
         * @param arg
         * @return
         */
        public static Task<String> getDataUsingPromise(int arg) {
            Task.Promise<String> p = Promise.create();
    
            new Thread(() -> {
                try {
                    Thread.sleep(500);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                // deliver value
                    p.onSuccess("Data=" + arg);
                }).start();
    
            return p.getTask();
        }
    }
    
    Enhancement 
    opened by benjchristensen 50
  • Stacktraces and subscribeOn/observeOn

    Stacktraces and subscribeOn/observeOn

    RxJava in not nice when it comes to stacktraces.

    The worst thing is observeOn that schedules execution on different threads. Every time we use observeOn we get our stacktrace from the process root, all the call history gets erased.

    The idea is to make schedulers save the caller's stack, wrap action calls and attach the stacktrace to each exception that has been thrown by scheduled actions.

    Pros: traceable exceptions Cons: Performance cost Leaks because of recursion

    I think that we can have Schedulers.io(boolean trace) alternative that will save the stacktrace, so we could call it Schedules.io(DEBUG) to turn stacktracing off on production for performance critical parts.

    How do you guys find this idea?

    Question 
    opened by konmik 46
  • Adding super/extends so that Observable is covariant

    Adding super/extends so that Observable is covariant

    Ok, so this pull request changes a lot of lines. It's mostly generalizing all the FuncXs to be used like FuncX[-T1, -T2, ..., -TX, +R] (contravariant parameters, covariant return type) and all the Observers to be used "in a contravariant way". A few of the Observable uses are covariant, now, too (mostly zip).

    This is the pull request for #326.

    This doesn't look very good in the code (thanks Java). Also, it doesn't seem to make Scala interop easier at all (at least not yet).

    Please take a look. I'm not exactly happy with the result. - Maybe I'm doing something wrong here? - I've still got hope that there's an easier way...

    The pull request compiles and tests ok for me (except for the Clojure module, but that's another story and not due to my changes).

    opened by jmhofer 46
  • Bump com.vanniktech.maven.publish from 0.19.0 to 0.23.1

    Bump com.vanniktech.maven.publish from 0.19.0 to 0.23.1

    Bumps com.vanniktech.maven.publish from 0.19.0 to 0.23.1.

    Release notes

    Sourced from com.vanniktech.maven.publish's releases.

    0.23.1

    Changelog

    0.23.0

    Changelog

    0.22.0

    CHANGELOG

    0.21.0

    Changelog

    0.20.0

    Changelog

    Changelog

    Sourced from com.vanniktech.maven.publish's changelog.

    Version 0.23.1 (2022-12-30)

    • also support publishing sources for the java-test-fixtures plugin in Kotlin/JVM projects
    • suppress Gradle warnings when publishing a project that uses java-test-fixtures

    Version 0.23.0 (2022-12-29)

    Updated docs can be found on the new website.

    • NEW: It is now possible to set group id, artifact id directly through the DSL
      mavenPublishing {
        coordinates("com.example", "library", "1.0.3")
      }
      
    • project.group and project.version will still be used as default values for group and version if the GROUP/VERSION_NAME Gradle properties do not exist and coordinates was not called, however there are 2 behavior changes:
      • The GROUP and VERSION_NAME Gradle properties take precedence over project.group and project.version instead of being overwritten by them. If you need to define the properties but replace them for some projects, please use the new coordinates method instead.
      • The GROUP and VERSION_NAME Gradle properties will not be explicitly set as project.group and project.version anymore.
    • NEW: Added dropRepository task that will drop a Sonatype staging repository. It is possible to specify which repository to drop by adding a --repository parameter with the id of the staging repository that was printed during publish. If no repository is specified and there is only one staging repository, that one will be dropped.
    • Added workaround to also publish sources for the java-test-fixtures plugin
    • Fixed publishing Kotlin/JS projects with the base plugin.
    • Fixed that a POM configured through the DSL is incomplete when publishing Gradle plugins.
    • The minimum supported Gradle version has been increased to 7.3.

    Version 0.22.0 (2022-09-09)

    • NEW: When publishing to maven central by setting SONATYPE_HOST or calling publishToMavenCentral(...) the plugin will now explicitly create a staging repository on Sonatype. This avoids issues where a single build would create multiple repositories
    • The above change means that the plugin supports parallel builds and it is not neccessary anymore to use --no-parallel and --no-daemon together with publish
    • NEW: When publishing with the publish or publishAllPublicationsToMavenCentralRepository tasks the plugin will automatically close the staging repository at the end of the build if it was successful.
    • NEW: Option to also automatically release the staging repository after closing was susccessful
    SONATYPE_HOST=DEFAULT # or S01
    SONATYPE_AUTOMATIC_RELEASE=true
    

    or

    mavenPublishing {
      publishToMavenCentral("DEFAULT", true)
      // or publishToMavenCentral("S01", true)
    </tr></table> 
    

    ... (truncated)

    Commits
    • 55b2fbf Prepare for release 0.23.1.
    • 925f5d2 also setup test fixtures sources jar for Kotlin/JVM, suppress warnings (#486)
    • a05892d update branch name in actions
    • 08f2406 test against Gradle 8.0-rc-1 (#485)
    • 6f60a18 remove unused test dependencies (#484)
    • aae16f7 Update dependency com.vanniktech:gradle-maven-publish-plugin to v0.23.0 (#483)
    • 8d84276 Prepare for next development version
    • 1f8a8c1 Prepare for release 0.23.0.
    • c6d8640 typo2
    • bc3a43e typo
    • Additional commits viewable in compare view

    Dependabot compatibility score

    Dependabot will resolve any conflicts with this PR as long as you don't alter it yourself. You can also trigger a rebase manually by commenting @dependabot rebase.


    Dependabot commands and options

    You can trigger Dependabot actions by commenting on this PR:

    • @dependabot rebase will rebase this PR
    • @dependabot recreate will recreate this PR, overwriting any edits that have been made to it
    • @dependabot merge will merge this PR after your CI passes on it
    • @dependabot squash and merge will squash and merge this PR after your CI passes on it
    • @dependabot cancel merge will cancel a previously requested merge and block automerging
    • @dependabot reopen will reopen this PR if it is closed
    • @dependabot close will close this PR and stop Dependabot recreating it. You can achieve the same result by closing it manually
    • @dependabot ignore this major version will close this PR and stop Dependabot creating any more for this major version (unless you reopen the PR or upgrade to it yourself)
    • @dependabot ignore this minor version will close this PR and stop Dependabot creating any more for this minor version (unless you reopen the PR or upgrade to it yourself)
    • @dependabot ignore this dependency will close this PR and stop Dependabot creating any more for this dependency (unless you reopen the PR or upgrade to it yourself)
    dependencies java 
    opened by dependabot[bot] 0
  • Bump testng from 7.5 to 7.7.1

    Bump testng from 7.5 to 7.7.1

    Bumps testng from 7.5 to 7.7.1.

    Release notes

    Sourced from testng's releases.

    TestNG v7.7.1

    What's Changed

    Full Changelog: https://github.com/cbeust/testng/compare/7.7.0...7.7.1

    TestNG v7.7.0

    What's Changed

    New Contributors

    ... (truncated)

    Changelog

    Sourced from testng's changelog.

    7.7.1 Fixed: GITHUB-2854: overloaded assertEquals methods do not work from Groovy (Krishnan Mahadevan)

    7.7.0 Fixed: GITHUB-2852: [SECURITY] Fix Zip Slip Vulnerability (Jonathan Leitschuh) Fixed: GITHUB-2792: JUnitTestClass sets XmlTest as null when running JUnit 4 Tests using TestNG (Krishnan Mahadevan) Fixed: GITHUB-2847: Deprecate support for running JUnit tests (Krishnan Mahadevan) Fixed: GITHUB-2844: Deprecate support for running Spock Tests (Krishnan Mahadevan) Fixed: GITHUB-550: Weird @​BeforeMethod and @​AfterMethod behaviour with dependsOnMethods (Krishnan Mahadevan) Fixed: GITHUB-893: TestNG should provide an Api which allow to find all dependent of a specific test (Krishnan Mahadevan) New: Added .yml file extension for yaml suite files, previously only .yaml was allowed for yaml (Steven Jubb) Fixed: GITHUB-141: regular expression in "dependsOnMethods" does not work (Krishnan Mahadevan) Fixed: GITHUB-2770: FileAlreadyExistsException when report is generated (melloware) Fixed: GITHUB-2825: Programmatically Loading TestNG Suite from JAR File Fails to Delete Temporary Copy of Suite File (Steven Jubb) Fixed: GITHUB-2818: Add configuration key for callback discrepancy behavior (Krishnan Mahadevan) Fixed: GITHUB-2819: Ability to retry a data provider in case of failures (Krishnan Mahadevan) Fixed: GITHUB-2308: StringIndexOutOfBoundsException in findClassesInPackage - Surefire/Maven - JDK 11 fails (Krishnan Mahadevan) Fixed: GITHUB:2788: TestResult.isSuccess() is TRUE when test fails due to expectedExceptions (Krishnan Mahadevan) Fixed: GITHUB-2800: Running Test Classes with Inherited @​Factory and @​DataProvider Annotated Non-Static Methods Fail (Krishnan Mahadevan) New: Ability to provide custom error message for assertThrows\expectThrows methods (Anatolii Yuzhakov) Fixed: GITHUB-2780: Use SpotBugs instead of abandoned FindBugs Fixed: GITHUB-2801: JUnitReportReporter is too slow Fixed: GITHUB-2807: buildStackTrace should be fail-safe (Sergey Chernov) Fixed: GITHUB-2830: TestHTMLReporter parameter toString should be fail-safe (Sergey Chernov) Fixed: GITHUB-2798: Parallel executions coupled with retry analyzer results in duplicate retry analyzer instances being created (Krishnan Mahadevan)

    7.6.1 Fixed: GITHUB-2761: Exception: ERROR java.nio.file.NoSuchFileException: /tmp/testngXmlPathInJar-15086412835569336174 (Krishnan Mahadevan) 7.6.0 Fixed: GITHUB-2741: Show fully qualified name of the test instead of just the function name for better readability of test output.(Krishnan Mahadevan) Fixed: GITHUB-2725: Honour custom attribute values in TestNG default reports (Krishnan Mahadevan) Fixed: GITHUB-2726: @​AfterClass config method is executed for EACH @​Test method when parallel == methods (Krishnan Mahadevan) Fixed: GITHUB-2752: TestListener is being lost when implenting both IClassListener and ITestListener (Krishnan Mahadevan) New: GITHUB-2724: DataProvider: possibility to unload dataprovider class, when done with it (Dzmitry Sankouski) Fixed: GITHUB-217: Configure TestNG to fail when there's a failure in data provider (Krishnan Mahadevan) Fixed: GITHUB-2743: SuiteRunner could not be initial by default Configuration (Nan Liang) Fixed: GITHUB-2729: beforeConfiguration() listener method should be invoked for skipped configurations as well(Nan Liang) Fixed: assertEqualsNoOrder for Collection and Iterators size check was missing (Adam Kaczmarek) Fixed: GITHUB-2709: Testnames not working together with suites in suite (Martin Aldrin) Fixed: GITHUB-2704: IHookable and IConfigurable callback discrepancy (Krishnan Mahadevan) Fixed: GITHUB-2637: Upgrade to JDK11 as the minimum JDK requirements (Krishnan Mahadevan) Fixed: GITHUB-2734: Keep the initial order of listeners (Andrei Solntsev) Fixed: GITHUB-2359: Testng @​BeforeGroups is running in parallel with testcases in the group (Anton Velma) Fixed: Possible StringIndexOutOfBoundsException in XmlReporter (Anton Velma) Fixed: GITHUB-2754: @​AfterGroups is executed for each "finished" group when it has multiple groups defined (Anton Velma)

    Commits
    • b94395d Bump version to 7.7.1 for release
    • 89dc584 Streamline overloaded assertion methods for Groovy
    • 5ac0021 Adding release notes
    • c0e1e77 Adjust version reference in deprecation msgs.
    • 011527d Bump version to 7.7.0 for release
    • 7846c44 Deprecate support for running JUnit tests
    • 8630a7e Ensure ITestContext available for JUnit4 tests
    • 7070b02 Streamline dependsOnMethods for configurations
    • d7e0bb1 Deprecate support for running Spock Tests
    • ca7a3a2 Ensure All tests run all the time
    • Additional commits viewable in compare view

    Dependabot compatibility score

    Dependabot will resolve any conflicts with this PR as long as you don't alter it yourself. You can also trigger a rebase manually by commenting @dependabot rebase.


    Dependabot commands and options

    You can trigger Dependabot actions by commenting on this PR:

    • @dependabot rebase will rebase this PR
    • @dependabot recreate will recreate this PR, overwriting any edits that have been made to it
    • @dependabot merge will merge this PR after your CI passes on it
    • @dependabot squash and merge will squash and merge this PR after your CI passes on it
    • @dependabot cancel merge will cancel a previously requested merge and block automerging
    • @dependabot reopen will reopen this PR if it is closed
    • @dependabot close will close this PR and stop Dependabot recreating it. You can achieve the same result by closing it manually
    • @dependabot ignore this major version will close this PR and stop Dependabot creating any more for this major version (unless you reopen the PR or upgrade to it yourself)
    • @dependabot ignore this minor version will close this PR and stop Dependabot creating any more for this minor version (unless you reopen the PR or upgrade to it yourself)
    • @dependabot ignore this dependency will close this PR and stop Dependabot creating any more for this dependency (unless you reopen the PR or upgrade to it yourself)
    dependencies java 
    opened by dependabot[bot] 0
  • Bump actions/cache from 3.0.11 to 3.2.2

    Bump actions/cache from 3.0.11 to 3.2.2

    Bumps actions/cache from 3.0.11 to 3.2.2.

    Release notes

    Sourced from actions/cache's releases.

    v3.2.2

    What's Changed

    New Contributors

    Full Changelog: https://github.com/actions/cache/compare/v3.2.1...v3.2.2

    v3.2.1

    What's Changed

    Full Changelog: https://github.com/actions/cache/compare/v3.2.0...v3.2.1

    v3.2.0

    What's Changed

    New Contributors

    ... (truncated)

    Changelog

    Sourced from actions/cache's changelog.

    3.0.11

    • Update toolkit version to 3.0.5 to include @actions/core@^1.10.0
    • Update @actions/cache to use updated saveState and setOutput functions from @actions/core@^1.10.0

    3.1.0-beta.1

    • Update @actions/cache on windows to use gnu tar and zstd by default and fallback to bsdtar and zstd if gnu tar is not available. (issue)

    3.1.0-beta.2

    • Added support for fallback to gzip to restore old caches on windows.

    3.1.0-beta.3

    • Bug fixes for bsdtar fallback if gnutar not available and gzip fallback if cache saved using old cache action on windows.

    3.2.0-beta.1

    • Added two new actions - restore and save for granular control on cache.

    3.2.0

    • Released the two new actions - restore and save for granular control on cache

    3.2.1

    • Update @actions/cache on windows to use gnu tar and zstd by default and fallback to bsdtar and zstd if gnu tar is not available. (issue)
    • Added support for fallback to gzip to restore old caches on windows.
    • Added logs for cache version in case of a cache miss.

    3.2.2

    • Reverted the changes made in 3.2.1 to use gnu tar and zstd by default on windows.
    Commits
    • 4723a57 Revert compression changes related to windows but keep version logging (#1049)
    • d1507cc Merge pull request #1042 from me-and/correct-readme-re-windows
    • 3337563 Merge branch 'main' into correct-readme-re-windows
    • 60c7666 save/README.md: Fix typo in example (#1040)
    • b053f2b Fix formatting error in restore/README.md (#1044)
    • 501277c README.md: remove outdated Windows cache tip link
    • c1a5de8 Upgrade codeql to v2 (#1023)
    • 9b0be58 Release compression related changes for windows (#1039)
    • c17f4bf GA for granular cache (#1035)
    • ac25611 docs: fix an invalid link in workarounds.md (#929)
    • Additional commits viewable in compare view

    Dependabot compatibility score

    Dependabot will resolve any conflicts with this PR as long as you don't alter it yourself. You can also trigger a rebase manually by commenting @dependabot rebase.


    Dependabot commands and options

    You can trigger Dependabot actions by commenting on this PR:

    • @dependabot rebase will rebase this PR
    • @dependabot recreate will recreate this PR, overwriting any edits that have been made to it
    • @dependabot merge will merge this PR after your CI passes on it
    • @dependabot squash and merge will squash and merge this PR after your CI passes on it
    • @dependabot cancel merge will cancel a previously requested merge and block automerging
    • @dependabot reopen will reopen this PR if it is closed
    • @dependabot close will close this PR and stop Dependabot recreating it. You can achieve the same result by closing it manually
    • @dependabot ignore this major version will close this PR and stop Dependabot creating any more for this major version (unless you reopen the PR or upgrade to it yourself)
    • @dependabot ignore this minor version will close this PR and stop Dependabot creating any more for this minor version (unless you reopen the PR or upgrade to it yourself)
    • @dependabot ignore this dependency will close this PR and stop Dependabot creating any more for this dependency (unless you reopen the PR or upgrade to it yourself)
    dependencies github_actions 
    opened by dependabot[bot] 1
  • ThreadDeath is deprecated for removal in Java 20

    ThreadDeath is deprecated for removal in Java 20

    src/main/java/io/reactivex/rxjava3/exceptions/Exceptions.java:70: warning: [removal] ThreadDeath in java.lang has been deprecated and marked for removal
            } else if (t instanceof ThreadDeath) {
                                    ^
    src/main/java/io/reactivex/rxjava3/exceptions/Exceptions.java:71: warning: [removal] ThreadDeath in java.lang has been deprecated and marked for removal
                throw (ThreadDeath) t;
                       ^
    src/test/java/io/reactivex/rxjava3/exceptions/ExceptionsTest.java:143: warning: [removal] ThreadDeath in java.lang has been deprecated and marked for removal
        @Test(expected = ThreadDeath.class)
                         ^
    src/test/java/io/reactivex/rxjava3/exceptions/ExceptionsTest.java:164: warning: [removal] ThreadDeath in java.lang has been deprecated and marked for removal
                    throw new ThreadDeath();
                              ^
    src/test/java/io/reactivex/rxjava3/exceptions/ExceptionsTest.java:179: warning: [removal] ThreadDeath in java.lang has been deprecated and marked for removal
                Exceptions.throwIfFatal(new ThreadDeath());
                                            ^
    src/test/java/io/reactivex/rxjava3/exceptions/ExceptionsTest.java:181: warning: [removal] ThreadDeath in java.lang has been deprecated and marked for removal
            } catch (ThreadDeath ex) {
                     ^
    
    Enhancement 3.x 
    opened by akarnokd 0
  • 3.x: Add onDropped callbacks to operators

    3.x: Add onDropped callbacks to operators

    Add onDropped callback overloads to most operators that drop items that can't be recovered by other means.

    • [x] throttleLatest PR #7457
    • [x] throttleFirst PR #7482
    • [x] throttleLast / sample PR #7488
    • [ ] throttleWithTimeout / debounce (PR TBD)
    • [ ] onBackpressureLatest (PR TBD)
    • [ ] onBackpressureBuffer (PR TBD)
      • 📓 The current overloads with Action are somewhat unhelpful, however, we'll have to add overloads in a way that avoids lambda ambiguity.
    Enhancement 3.x 
    opened by akarnokd 7
Releases(v3.1.5)
  • v3.1.5(Jun 1, 2022)

  • v3.1.4(Mar 21, 2022)

  • v3.1.3(Nov 23, 2021)

  • v3.1.2(Oct 12, 2021)

    Maven JavaDocs

    Compatibility

    • Add full Java 9 module descriptor. (#7241)

    Bugfixes

    • Fix missing nullability on Single.subscribe(BiConsumer). (#7331)

    Documentation

    • Fix javadoc wording of {Publish|Behavior}Processor::offer(). (#7328)
    • Indicate takeUntil stops on completion of other. (#7341)

    Other

    • Update assert messages format to be compliant with GradleRunner and JUnitRunner. (#7345)
    Source code(tar.gz)
    Source code(zip)
  • v3.1.1(Aug 30, 2021)

    Maven JavaDocs

    API promotions

    • The operator fusion-related interfaces and two atomic queue implementations have been promoted to standard, thus officially supported in the io.reactivex.rxjava3.operators package. (#7320)

    Bugfixes

    • Specify proper OSGi unique bundle symbolic name of io.reactivex.rxjava3.rxjava. (#7319)
    • Fix ExecutorScheduler initializing Schedulers prematurely when using RxJavaPlugins.createExecutorScheduler. (#7323)
    • Fix the LamdbaConsumerIntrospection of Completable's lambda-based observer to use the same missing onError indicator as the other types' lambda-based consumers. (#7326)
    Source code(tar.gz)
    Source code(zip)
  • v3.1.0(Aug 9, 2021)

    Maven JavaDocs

    :warning: With this release, the minimum required Android API level is API 21 (Android 5.0).

    :warning: Note that the 3.0.x patch line won't be developed further.

    API promotions

    • Flowable.onBackpressureReduce() + 1 (#7296)
    • RxJavaPlugins.getOnParallelSubscribe() and RxJavaPlugins.setOnParallelSubscribe() (#7296)
    • TestScheduler([...] boolean useOnScheduleHook) (#7296)

    API additions

    • subscribe([...], DisposableContainer) for better Disposable management and reference cleanup. (#7298)
    • RxJavaPlugins.createExecutorScheduler() for creating an Executor-based Scheduler before the Schedulers class (and thus the standard schedulers) gets initialized. (#7306)

    Behavior changes

    • The scheduler purge thread has been removed. Removing cancelled timed operations is now managed by the setRemoveOnCancelPolicy of the underlying ScheduledExecutorService. (#7293)

    Documentation

    • Fixed wording of the fair parameter of Schedulers.from. (#7301)
    • Update withLatestFrom javadoc about upstream early complete (#7289)

    Other

    • @NonNull annotations on generic type arguments were made consistent across. (#7302, #7303)
    Source code(tar.gz)
    Source code(zip)
  • v3.0.13(Jun 1, 2021)

    Maven JavaDocs

    :warning: RxJava is now signed with a new private key. The new public key fingerprint is 1D9AA7F9E1E2824728B8CD1794B291AEF984A085.

    Documentation

    • Fix wording of *OnSubscribe interfaces (#7274)

    Other

    • Mitigated the security risks caused by the Codecov backdoor (#7237).
    • Improve the build process (#7225, #7253, #7255, #7257, #7258, #7260, #7261, #7262, #7264, #7263)
    • Upgrade to Gradle 7.0 (#7259)
    Source code(tar.gz)
    Source code(zip)
  • v3.0.13-RC5(May 26, 2021)

  • v3.0.13-RC4(May 3, 2021)

  • v3.0.13-RC3(Apr 26, 2021)

  • v3.0.13-RC2(Apr 17, 2021)

  • v3.0.13-RC1(Apr 17, 2021)

  • v3.0.12(Apr 8, 2021)

    Maven JavaDocs

    Bugfix

    • CompositeException.printStackTrace to write directly into PrintStream/PrintWriter. (#7212)

    Documentation

    • Fix wrong reference in Single.flattenStreamAsObservable javadoc. (#7206)
    • Fix style violating Javadoc. (#7210)

    Other

    • Fix POM_URL (#7214)
    • Upgrade Gradle to 6.8.3 (#7208)
    • Bump gradle to 6.8.3 & optimize gradle config (#7207)
    • Added Javadoc checks to Checkstyle. Fix violating Javadoc. (#7210)
    • Modernize gradle plugin block, change maven to maven-publish (#7219)
    Source code(tar.gz)
    Source code(zip)
  • v3.0.12-RC1(Apr 8, 2021)

  • v3.0.11(Mar 6, 2021)

    Maven JavaDocs

    ℹ️ RxJava 2 is now end-of-life (EOL) and no further development or support will be provided by the project.

    Enhancement

    • Add onSubscribe hook to ParallelFlowable operators (#7191)

    Bugfix

    • Allow Single.zip and Maybe.zip result to be garbage collected (#7196)
    • Direct scheduling via Schedulers.from to honor the interruptibleWorker setting (#7203)

    Documentation

    • Fix typos in Schedulers.java (#7178)

    Other

    • Release to Sonatype directly (#7181)
    • Upgrade to Gradle 6.8.2 (#7184)
    • Cleanup of source code headers (#7205)
    Source code(tar.gz)
    Source code(zip)
  • v3.0.11-RC5(Feb 16, 2021)

  • v3.0.11-RC4(Feb 13, 2021)

  • v2.2.21(Feb 13, 2021)

    Maven JavaDocs

    :warning: This is the last planned update for the 2.x version line. After February 28, 2021, 2.x becomes End-of-Life (EoL); no further patches, bugfixes, enhancements, documentation or support will be provided by the project.

    Enhancements

    • Add a system parameter to allow scheduled worker release in the Io Scheduler. (#7162)
    • Add a system parameter to allow Schedulers to use System.nanoTime() for now(). (#7170)
    Source code(tar.gz)
    Source code(zip)
  • v3.0.11-RC3(Feb 5, 2021)

  • v3.0.11-RC2(Feb 5, 2021)

    Specify the staging profile name to be "io.reactivex" so the close operation finds the repo.

    Unfortunately, there is no other way to test the release process.

    Source code(tar.gz)
    Source code(zip)
  • v3.0.11-RC1(Feb 5, 2021)

  • v3.0.10(Feb 1, 2021)

    Maven JavaDocs

    Enhancement

    • Add a system parameter to allow scheduled worker release in the Io Scheduler. (#7160)
    • Add TestScheduler option to use onSchedule hook. (#7163)
    • Add a system parameter to allow Schedulers to use System.nanoTime() for now(). (#7169)
    • Add fusion support to concatMap{Maybe|Single|Completable}. (#7165)

    Documentation

    • Update marbles of amb(), ambArray() and ambWith() (#7144)
    • Fix take() mentioning the old limit() operator (#7145)
    • Document Schedulers.from vs. RejectedExecutionException behavior. (#7150)
    • Update documentation for NewThreadWorker.scheduleActual method. (#7164)
    • Improve Javadocs style of Schedulers. (#7168)

    Other

    • onReduceBackpressure internals cleanup (#7151)
    • Workaround for FutureTask.toString recursion on JDK 10+. (#7173)
    Source code(tar.gz)
    Source code(zip)
  • v3.0.9(Dec 30, 2020)

  • v3.0.8(Dec 2, 2020)

  • v3.0.8-RC3(Dec 2, 2020)

    Maven JavaDocs

    This is a pre-release for 3.0.8 to verify the release process still works after the switch to GitHub actions (#7114).

    Bugfixes

    • Remove unnecessary cancel/dispose calls from terminating using (#7121)

    Documentation

    • Flowable scan/scanWith backpressure documentation update (#7110)
    Source code(tar.gz)
    Source code(zip)
  • v3.0.8-RC2(Dec 2, 2020)

    Maven JavaDocs

    This is a pre-release for 3.0.8 to verify the release process still works after the switch to GitHub actions (#7114).

    Bugfixes

    • Remove unnecessary cancel/dispose calls from terminating using (#7121)

    Documentation

    • Flowable scan/scanWith backpressure documentation update (#7110)
    Source code(tar.gz)
    Source code(zip)
  • v3.0.8-RC1(Dec 2, 2020)

    Maven JavaDocs

    This is a pre-release for 3.0.8 to verify the release process still works after the switch to GitHub actions (#7114).

    Bugfixes

    • Remove unnecessary cancel/dispose calls from terminating using (#7121)

    Documentation

    • Flowable scan/scanWith backpressure documentation update (#7110)
    Source code(tar.gz)
    Source code(zip)
  • v3.0.7(Oct 7, 2020)

    Maven JavaDocs

    Bugfixes

    • Fix Observable.toFlowable(ERROR) not cancelling on MissingBackpressureException. (#7083)
    • Fix Flowable.concatMap backpressure with scalars. (#7089)

    Documentation

    • fromRunnable/fromAction javadoc improvements. (#7071)
    • Patch out duplicate @NonNull annotation in generated javadocs. (#7073)
    • Clarify the documentation for scan operators. (#7093)
    Source code(tar.gz)
    Source code(zip)
  • v2.2.20(Oct 6, 2020)

    Maven JavaDocs

    :warning: The 2.x version line is now in maintenance mode and will be supported only through bugfixes until February 28, 2021. No new features, behavior changes or documentation adjustments will be accepted or applied to 2.x. It is recommended to migrate to 3.x within this time period.

    Bugfixes

    • Fix Observable.flatMap with maxConcurrency hangs (#6960)
    • Fix Observable.toFlowable(ERROR) not cancelling upon MissingBackpressureException (#7084)
    • Fix Flowable.concatMap backpressure with scalars. (#7091)
    Source code(tar.gz)
    Source code(zip)
  • v3.0.6(Aug 20, 2020)

Owner
ReactiveX
Reactive Extensions for Async Programming
ReactiveX
jproblemgenerator creates scenarios in which Java programs leak memory or crash the JVM

jproblemgenerator creates scenarios in which Java programs leak memory or crash the JVM. It is intended to train the use of debugging tools

null 1 Jan 6, 2022
🔥Android懒人框架,基于谷歌最新AAC架构,MVVM设计模式,组件化开发的一套快速开发库,整合Okhttp+RxJava+Retrofit+Glide等主流模块,满足日常开发需求。使用该框架可以快速开发一个高质量、易维护的Android应用

MvvmLazy Android懒人框架 目前,android流行的MVC、MVP模式的开发框架很多,然而一款基于MVVM模式开发框架却很少。 个人搜寻了市面上大量的开源框架,秉承减少重复造轮子的原则,汲取了各位大神的框架优点,集成了大量常用的开源框架和工具类,进行了部分公用模块封装,丰富了Bind

rui 20 May 25, 2022
Chronicle Bytes has a similar purpose to Java NIO's ByteBuffer with many extensions

Chronicle-Bytes Chronicle-Bytes Chronicle Bytes contains all the low level memory access wrappers. It is built on Chronicle Core’s direct memory and O

Chronicle Software : Open Source 334 Jan 1, 2023
Zero-dependency Reactive Streams publishers library

⚡️ Mutiny Zero: a zero-dependency Reactive Streams publishers library for Java Mutiny Zero is a minimal API for creating reactive-streams compliant pu

SmallRye 14 Dec 14, 2022
Immutable in-memory R-tree and R*-tree implementations in Java with reactive api

rtree In-memory immutable 2D R-tree implementation in java using RxJava Observables for reactive processing of search results. Status: released to Mav

Dave Moten 999 Dec 20, 2022
Reactive Programming for Android

Reactive Programming for Android Agera is a set of classes and interfaces to help write functional, asynchronous, and reactive applications for Androi

Google 7.3k Jan 5, 2023
Fault tolerance and resilience patterns for the JVM

Failsafe Failsafe is a lightweight, zero-dependency library for handling failures in Java 8+, with a concise API for handling everyday use cases and t

Jonathan Halterman 3.9k Jan 2, 2023
A fast object pool for the JVM

Stormpot Stormpot is an object pooling library for Java. Use it to recycle objects that are expensive to create. The library will take care of creatin

Chris Vest 302 Nov 14, 2022
A Java library for quickly and efficiently parsing and writing UUIDs

fast-uuid fast-uuid is a Java library for quickly and efficiently parsing and writing UUIDs. It yields the most dramatic performance gains when compar

Jon Chambers 142 Jan 1, 2023
An embedded database implemented in pure java based on bitcask which is a log-structured hash table for K/V Data.

Baka Db An embedded database implemented in pure java based on bitcask which is a log-structured hash table for K/V Data. Usage import cn.ryoii.baka.B

ryoii 3 Dec 20, 2021
Hollow is a java library and toolset for disseminating in-memory datasets from a single producer to many consumers for high performance read-only access.

Hollow Hollow is a java library and toolset for disseminating in-memory datasets from a single producer to many consumers for high performance read-on

Netflix, Inc. 1.1k Dec 25, 2022
LWJGL is a Java library that enables cross-platform access to popular native APIs useful in the development of graphics (OpenGL, Vulkan), audio (OpenAL), parallel computing (OpenCL, CUDA) and XR (OpenVR, LibOVR) applications.

LWJGL - Lightweight Java Game Library 3 LWJGL (https://www.lwjgl.org) is a Java library that enables cross-platform access to popular native APIs usef

Lightweight Java Game Library 4k Dec 29, 2022
A modern I/O library for Android, Kotlin, and Java.

Okio See the project website for documentation and APIs. Okio is a library that complements java.io and java.nio to make it much easier to access, sto

Square 8.2k Dec 31, 2022
Jalgorithm is an open-source Java library which has implemented various algorithms and data structure

We loved Java and algorithms, so We made Jalgorithm ❤ Jalgorithm is an open-source Java library which has implemented various algorithms and data stru

Muhammad Karbalaee 35 Dec 15, 2022
based client the 2nd, for 1.17

Atomic client WARNING This is under heavy development, and not yet ready to use. Why? This is the sequel to cornos, but ported to 1.17, with a new loo

Cornos client 83 Jan 14, 2022
Fast integer compression in C using the StreamVByte codec

streamvbyte StreamVByte is a new integer compression technique that applies SIMD instructions (vectorization) to Google's Group Varint approach. The n

Daniel Lemire 281 Dec 27, 2022
A fork of Cliff Click's High Scale Library. Improved with bug fixes and a real build system.

High Scale Lib This is Boundary's fork of Cliff Click's high scale lib. We will be maintaining this fork with bug fixes, improvements and versioned bu

BMC TrueSight Pulse (formerly Boundary) 402 Jan 2, 2023
A Primitive Collection library that reduces memory usage and improves performance

Primitive-Collections This is a Simple Primitive Collections Library i started as a hobby Project. It is based on Java's Collection Library and FastUt

Speiger 26 Dec 25, 2022