Reactive stubs for gRPC

Overview

CircleCI

What is reactive-grpc?

Reactive gRPC is a suite of libraries for using gRPC with Reactive Streams programming libraries. Using a protocol buffers compiler plugin, Reactive gRPC generates alternative gRPC bindings for each reactive technology. The reactive bindings support unary and streaming operations in both directions. Reactive gRPC also builds on top of gRPC's back-pressure support, to deliver end-to-end back-pressure-based flow control in line with Reactive Streams back-pressure model.

Reactive gRPC supports the following reactive programming models:

Akka gRPC is now mature and production ready. Use that for Akka-based services.

Usage

See the readme in each technology-specific sub-directory for usage details.

Demos

Android support

Reactive gRPC supports Android to the same level of the underlying reactive technologies.

  • Rx-Java - Generated code targets Java 6, so it should work with all versions of Android >= 2.3 (SDK 9).
  • Spring Reactor - Not officially supported. "Reactor 3 does not officially support or target Android, however, it should work fine with Android SDK 26 (Android O) and above."

Back-pressure

Reactive gRPC stubs support bi-directional streaming with back-pressure. Under the hood, Reactive gRPC is built atop the vanilla gRPC service stubs generated by protoc. As such, they inherit gRPC's HTTP/2-based back-pressure model.

Internally, gRPC and Reactive gRPC implement a pull-based back-pressure strategy. At the HTTP/2 layer, gRPC maintains a buffer of serialized protocol buffer messages. As frames are consumed on the consumer side, the producer is signaled to transmit more frames. If this producer-side transmit buffer fills, the HTTP/2 layer signals to the gRPC messaging layer to stop producing new messages in the stream. Reactive gRPC handles this signal, applying back-pressure to Reactive Streams using the Publisher api. Reactive gRPC also implements Publisher back-pressure on the consumer side of a stream. As messages are consumed by the consumer-side Publisher, signals are sent down through gRPC and HTTP/2 to request more data.

An example of back-pressure in action can be found in BackpressureIntegrationTest.java.

backpressure

Understanding Reactive gRPC Flow Control

For simple unary request and response services, Reactive gRPC's flow control model is transparent. However, Reactive gRPC is built on top of three different interacting flow control models, and, as a result, backpressure doesn't always behave exactly as you would expect. For streaming services, flow control isn't always intuitive, especially when infinite streams are involved.

  • At the bottom is the HTTP/2's byte-based flow control. HTTP/2 works on streams of bytes and is completely unaware of gRPC messages or reactive streams. By default, the stream consumer allocates a budget of 65536 bytes. The stream producer can send up to this many bytes before backpressure engages. As the consumer reads bytes, WINDOW_UPDATE messages are sent to the producer to increase its send budget.

  • In the middle is the gRPC-Java message-based flow control. gRPC's flow control adapts the stream-based flow control of HTTP/2 to a message-based flow control model. Importantly, gRPC's flow control is aware of how it interacts with HTTP/2 and the network.

    On producing side, an on-ready handler reads a message, serializes it into bytes using protobuf, and then queues it up for transmission over the HTTP/2 byte stream. If there is insuficient room in the HTTP/2 flow control window to transmit, backpressure engages an no more messages are requested from the producer until space becomes available.

    On the consuming side, each time a consumer calls request(x), gRPC attempts to read and deserialize x messages from the HTTP/2 stream. Since the size of a protobuf encoded message is variable, there is not a one-to-one correlation between pulling messages from gRPC and pulling bytes over HTTP/2.

  • At the top is the Reactive Streams message-based flow control. Reactive Streams' flow control is designed for producing and consuming messages from end to end. Since the producer and consumer are in the same address space, when the consumer calls request(x), the producer creates a message and calls onNext() x times. Reactive Streams flow control assumes all the parts of the chain are linked by method calls. Inserting gRPC and HTTP/2 in the middle of a reactive stream is a bending of the protocol.

When reasoning about flow control with Reactive gRPC, you cannot assume everything works like Reactive Streams. A call to request(1) on the consuming side of the wire will not necessarially result in a request(1) call on the producing side. Zero or more messages may be requested from the producer based on the state of the HTTP/2 flow control window, and the serialized size of each protobuf message. Instead, you need to think about how each stage in processing interacts with the stage before and after it.

ReactivegRPCHTTP/2...HTTP/2gRPCReactive

flow control

Exception Handling

Exception handling with Reactive gRPC is a little strange due to the way gRPC deals with errors. Servers that produce an error by calling onError(Throwable) will terminate the call with a StatusRuntimeException. The client will have its onError(Throwable) subscription handler called as expected.

Exceptions going from client to server are a little less predictable. Depending on the timing, gRPC may cancel the request before sending any messages due to an exception in the outbound stream.

Contributing

Found a bug? Think you've got an awesome feature you want to add? We welcome contributions!

Submitting a Contribution

  1. Search for an existing issue. If none exists, create a new issue so that other contributors can keep track of what you are trying to add/fix and offer suggestions (or let you know if there is already an effort in progress). Be sure to clearly state the problem you are trying to solve and an explanation of why you want to use the strategy you're proposing to solve it.
  2. Fork this repository on GitHub and create a branch for your feature.
  3. Clone your fork and branch to your local machine.
  4. Commit changes to your branch.
  5. Push your work up to GitHub.
  6. Submit a pull request so that we can review your changes.

Make sure that you rebase your branch off of master before opening a new pull request. We might also ask you to rebase it if master changes after you open your pull request.

Acceptance Criteria

We love contributions, but it's important that your pull request adhere to the standards that we maintain in this repository.

  • All tests must be passing
  • All code changes require tests
  • All code changes must be consistent with our Checkstyle rules. We use the Google Java Style Guide with a few small alterations.
  • Code should have great inline comments
Comments
  • Feature/rx java3

    Feature/rx java3

    PR for merge of work by Andreas Larson in 2020...for moving to RxJava3 rather than RxJava2, and work by Scott Lewis adjusting test cases (and in case of fusion removing test cases).

    cla:signed 
    opened by scottslewis 16
  • reactor fails to generate gRPC stub in latest version

    reactor fails to generate gRPC stub in latest version

    Hey,

    |gRPC|protoc|reactive| |---|---|---| |1.29.0|3.12.2|1.0.1|

    I just updated to the latest gRPC/protoc/reactive-grpc and the compiler fails with the following message:

    AdminServiceGrpc.java:[76,29] error: cannot find symbol
      symbol:   class StubFactory
      location: class AbstractStub
    

    And the generated code:

      public static AdminServiceBlockingStub newBlockingStub(
          io.grpc.Channel channel) {
        io.grpc.stub.AbstractStub.StubFactory<AdminServiceBlockingStub> factory =
          new io.grpc.stub.AbstractStub.StubFactory<AdminServiceBlockingStub>() {
            @java.lang.Override
            public AdminServiceBlockingStub newStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
              return new AdminServiceBlockingStub(channel, callOptions);
            }
          };
        return AdminServiceBlockingStub.newStub(factory, channel);
      }
    

    I'm using this maven plugin to generate the stubs:

      <properties>
        <protobuf.version>3.12.2</protobuf.version>
        <grpc.version>1.29.0</grpc.version>
      </properties>
    
      <build>
        <plugins>
          <plugin>
            <groupId>org.xolstice.maven.plugins</groupId>
            <artifactId>protobuf-maven-plugin</artifactId>
            <version>0.6.1</version>
            <extensions>true</extensions>
            <configuration>
              <pluginId>grpc-java</pluginId>
              <protocArtifact>
                com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}
              </protocArtifact>
              <pluginArtifact>
                io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}
              </pluginArtifact>
            </configuration>
            <executions>
              <execution>
                <goals>
                  <goal>compile</goal>
                  <goal>compile-custom</goal>
                </goals>
              </execution>
            </executions>
          </plugin>
        </plugins>
      </build>
    

    I also tried your protoc plugin, but I get the same issues.

    I cannot find a table of version mapping, so I'm not sure if the versions do not match or the maven plugin doesn't work correctly or something else. Any help is appreciated.

    bug 
    opened by dimi-nk 15
  • fixes problem racing and late subscription in case of switchOnFirst usage

    fixes problem racing and late subscription in case of switchOnFirst usage

    NOTE. This is not the final changes. Work In Progress

    I provided this PR just to show where is the problem. Probably we need to eliminate all CountDownLatches and ensure correctness of implemented operators. From my experience, Reactive-Streams-TCK does not provide a correct test-suite for multithreading usage of Publisher. Therefore, even non-thread safe impl (which is an incorrect one) can easily pass all tests.

    In general, this draft PR fixes possible race condition and eliminate deadlock in case onSubscribe on downstream called too late (for example when the switchOnFirst operator is used) (See #136)

    cla:signed 
    opened by OlegDokuka 14
  • Non-Mono overload for a non-streamable argument

    Non-Mono overload for a non-streamable argument

    Hi!

    Currently, the code generator creates the following signature:

    public Flux<...> doSomething(Mono<SomeType> argument) {
    }
    

    And when someone calls it, they have no options but:

    stub.doSomething(Mono.just(SomeRequest.builder().build()));
    

    Would be nice to generate an overloaded method:

    public Flux<...> doSomething(SomeType argument) {
        return doSomething(Mono.just(argument));
    }
    

    I know that Mono is evaluated lazily (as any other reactive type) but sometimes it's not necessary

    enhancement 
    opened by bsideup 13
  • Code generator uses proto directory hierarchy as part of the outer class name

    Code generator uses proto directory hierarchy as part of the outer class name

    In Rx*Grpc.java files, rxgprc code generation creates wrong package paths when referring to generated message classes. Here is an example method. Notice that the qualified path to HeartbeatRequest includes slashes:

    public io.reactivex.Single<com.google.protobuf.Empty> heartbeat(io.reactivex.Single<com.example.v1.Com/example/v1/frontend.HeartbeatRequest> rxRequest) {
        return com.salesforce.rxgrpc.stub.ClientCalls.oneToOne(rxRequest, delegateStub::heartbeat);
    }
    

    the grpc equivalent:

    public void heartbeat(com.example.v1.FrontendOuterClass.HeartbeatRequest request,
        io.grpc.stub.StreamObserver<com.google.protobuf.Empty> responseObserver) {
      asyncUnimplementedUnaryCall(getHeartbeatMethod(), responseObserver);
    }
    

    The protocol buffers specifications looks like this:

    syntax = "proto3";
    
    package com.example.v1;
    
    import "google/protobuf/timestamp.proto";
    import "google/protobuf/empty.proto";
    
    service Frontend {
    	rpc Heartbeat(HeartbeatRequest) returns (google.protobuf.Empty);
    }
    message HeartbeatRequest {
    	google.protobuf.Timestamp timestamp = 1;
    }
    
    

    I tested the code generation in both a Java and an Android projects with java and javalite for code generation on each. Here are two example build.gradle files:

    • Java project, IntelliJ IDEA, java code generator:
    buildscript {
        repositories {
            mavenCentral()
        }
        dependencies {
            classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.3'
        }
    }
    
    
    plugins {
        id "java"
        id "com.google.protobuf" version "0.8.3"
        id "idea"
        id "application"
    }
    
    version '1.0'
    
    sourceCompatibility = 1.8
    targetCompatibility = 1.8
    
    mainClassName = "Client"
    
    repositories {
        mavenCentral()
    }
    
    def grpcVersion = '1.9.0'
    def reactiveGrpcVersion = '0.8.0'
    
    dependencies {
        compile 'com.google.protobuf:protobuf-java:3.5.1'
        compile "io.grpc:grpc-okhttp:${grpcVersion}"
        compile "io.grpc:grpc-stub:${grpcVersion}"
        compile "io.grpc:grpc-protobuf:${grpcVersion}"
        compile "com.salesforce.servicelibs:rxgrpc-stub:${reactiveGrpcVersion}"
    }
    
    jar {
        manifest {
            attributes 'Main-Class': 'Client'
        }
    }
    
    sourceSets {
        main {
            proto {
                srcDir '../kidswatch-watch-appscomm/protocol'
            }
            java {
                srcDirs 'build/generated/source/proto/main/grpc'
                srcDirs 'build/generated/source/proto/main/javalite'
            }
        }
    }
    
    protobuf {
        protoc {
            artifact = "com.google.protobuf:protoc:3.5.1-1"
        }
    
        plugins {
            grpc {
                artifact = "io.grpc:protoc-gen-grpc-java:${grpcVersion}"
            }
            rxgrpc {
                artifact = "com.salesforce.servicelibs:rxgrpc:${reactiveGrpcVersion}:jdk8@jar"
            }
        }
    
        generateProtoTasks {
            all().each { task ->
                task.plugins {
                    grpc {}
                    rxgrpc {}
                }
            }
        }
    }
    
    • Android library module, Android Studio, javalite code generator:
    buildscript {
        ext.grpc_version = '1.9.1'
        ext.rxgrpc_version = '0.8.0'
    
        repositories {
            mavenCentral()
        }
        dependencies {
            classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.4'
        }
    }
    
    apply plugin: 'com.android.library'
    apply plugin: 'com.google.protobuf'
    
    android {
        compileSdkVersion 26
    
        defaultConfig {
            minSdkVersion 19
            targetSdkVersion 26
            versionCode 1
            versionName "1.0"
        }
    
        sourceSets {
            main {
                proto {
                    srcDir '../server-protocol-specification/protocol'
                }
            }
            debug {
                java {
                    srcDirs 'build/generated/source/proto/debug/grpc'
                    srcDirs 'build/generated/source/proto/debug/rxgrpc'
                    srcDirs 'build/generated/source/proto/debug/javalite'
                }
            }
            release {
                java {
                    srcDirs 'build/generated/source/proto/release/grpc'
                    srcDirs 'build/generated/source/proto/release/rxgrpc'
                    srcDirs 'build/generated/source/proto/release/javalite'
                }
            }
        }
    
        lintOptions {
            lintConfig file("../lint.xml")
        }
    }
    
    protobuf {
        protoc {
            artifact = 'com.google.protobuf:protoc:3.5.1-1'
        }
        plugins {
            javalite {
                // The code generator for lite comes as a separate artifact
                artifact = "com.google.protobuf:protoc-gen-javalite:3.0.0"
            }
            grpc {
                artifact = "io.grpc:protoc-gen-grpc-java:$grpc_version"
            }
            rxgrpc {
                artifact = "com.salesforce.servicelibs:rxgrpc:$rxgrpc_version:jdk8@jar"
            }
            sourceSet
        }
        generateProtoTasks {
            all().each { task ->
                task.builtins {
                    // In most cases you don't need the full Java output if you use the lite output.
                    remove java
                }
                task.plugins {
                    javalite {}
                    grpc {
                        // Options added to --grpc_out
                        option 'lite'
                    }
                    rxgrpc {
                    }
                }
            }
        }
    }
    
    dependencies {
        // You need to build grpc-java to obtain these libraries below.
        api "io.grpc:grpc-okhttp:$grpc_version"
        api "io.grpc:grpc-protobuf-lite:$grpc_version"
        api "io.grpc:grpc-stub:$grpc_version"
        api 'javax.annotation:javax.annotation-api:1.2'
        api "com.salesforce.servicelibs:rxgrpc-stub:$rxgrpc_version"
    }
    
    
    bug 
    opened by p-fischer 13
  • Allow to override prefetch on client and server side

    Allow to override prefetch on client and server side

    Default Flowable behavior on Client and Server sending side is to send messages into the queue with length 512. When there is too many concurrent calls and when messages are quite big it can cause OOM. This pull requests allows to override these for such RPC calls.

    Client example:

            RxNassStub api = RxNassGrpc.newRxStub(channel)
                .withOption(ClientCalls.CALL_OPTIONS_PREFETCH, 16)
                .withOption(ClientCalls.CALL_OPTIONS_LOW_TIDE, 4);
    

    Server example:

    public class TestingService extends NassImplBase {
    
        @Override
        public int getLowTide(int methodId) {
            return 4;
        }
    
        @Override
        public int getPrefetch(int methodId) {
            return 8;
        }
    
    cla:signed 
    opened by koldat 12
  • Gradle Plugin fails when importing protobufs with underscore in filename

    Gradle Plugin fails when importing protobufs with underscore in filename

    I have the following protobuf definitions in src/main/proto:

    some_parameter.proto:

    syntax = "proto3";
    
    package my.some_parameters;
    
    message SomeParameter {
        string id = 1;
    }
    

    my_service.proto:

    syntax = "proto3";
    
    package my.some_service;
    
    import "some_parameter.proto";
    
    service MyService {
        rpc DoStuff (my.some_parameters.SomeParameter) returns (my.some_parameters.SomeParameter);
    }
    

    gradle.build:

    buildscript {
        repositories {
            mavenCentral()
        }
        dependencies {
            classpath "com.google.protobuf:protobuf-gradle-plugin:0.8.3"
        }
    }
    
    apply plugin: 'java'
    apply plugin: 'com.google.protobuf'
    
    repositories {
        mavenCentral()
    }
    
    dependencies {
        compile "com.google.protobuf:protobuf-java:3.4.0"
        compile "io.grpc:grpc-all:1.8.0"
        compile 'com.salesforce.servicelibs:reactor-grpc-stub:0.7.1'
    }
    
    protobuf {
        protoc {
            // Download from repositories
            artifact = "com.google.protobuf:protoc:3.4.0"
        }
        plugins {
            grpc {
                artifact = "io.grpc:protoc-gen-grpc-java:1.8.0"
            }
            reactor {
                artifact = "com.salesforce.servicelibs:reactor-grpc:0.7.1:jdk8@jar"
            }
        }
        generateProtoTasks {
            all().each { task ->
                task.plugins {
                    grpc {}
                    reactor {} // including this fails with:
                    // /Users/dsandjaja/development/projects/testing/build/generated/source/proto/main/reactor/my/some_service/ReactorMyServiceGrpc.java:28: error: package my.some_parameters.SomeParameter does not exist
                    // public reactor.core.publisher.Mono<my.some_parameters.SomeParameter.SomeParameter> doStuff(reactor.core.publisher.Mono<my.some_parameters.SomeParameter.SomeParameter> reactorRequest) {
                    //                                                                                                                                                        ^
                    }
            }
        }
    }
    

    Gradle error message:

    gradle clean build
    :clean
    :extractIncludeProto
    :extractProto UP-TO-DATE
    :generateProto
    :compileJava
    /Users/dsandjaja/development/projects/testing/build/generated/source/proto/main/reactor/my/some_service/ReactorMyServiceGrpc.java:28: error: package my.some_parameters.SomeParameter does not exist
            public reactor.core.publisher.Mono<my.some_parameters.SomeParameter.SomeParameter> doStuff(reactor.core.publisher.Mono<my.some_parameters.SomeParameter.SomeParameter> reactorRequest) {
                                                                                                                                                                   ^
    /Users/dsandjaja/development/projects/testing/build/generated/source/proto/main/reactor/my/some_service/ReactorMyServiceGrpc.java:28: error: package my.some_parameters.SomeParameter does not exist
            public reactor.core.publisher.Mono<my.some_parameters.SomeParameter.SomeParameter> doStuff(reactor.core.publisher.Mono<my.some_parameters.SomeParameter.SomeParameter> reactorRequest) {
                                                                               ^
    /Users/dsandjaja/development/projects/testing/build/generated/source/proto/main/reactor/my/some_service/ReactorMyServiceGrpc.java:36: error: package my.some_parameters.SomeParameter does not exist
            public reactor.core.publisher.Mono<my.some_parameters.SomeParameter.SomeParameter> doStuff(reactor.core.publisher.Mono<my.some_parameters.SomeParameter.SomeParameter> request) {
                                                                                                                                                                   ^
    /Users/dsandjaja/development/projects/testing/build/generated/source/proto/main/reactor/my/some_service/ReactorMyServiceGrpc.java:36: error: package my.some_parameters.SomeParameter does not exist
            public reactor.core.publisher.Mono<my.some_parameters.SomeParameter.SomeParameter> doStuff(reactor.core.publisher.Mono<my.some_parameters.SomeParameter.SomeParameter> request) {
                                                                               ^
    /Users/dsandjaja/development/projects/testing/build/generated/source/proto/main/reactor/my/some_service/ReactorMyServiceGrpc.java:46: error: package my.some_parameters.SomeParameter does not exist
                                                my.some_parameters.SomeParameter.SomeParameter,
                                                                                ^
    /Users/dsandjaja/development/projects/testing/build/generated/source/proto/main/reactor/my/some_service/ReactorMyServiceGrpc.java:47: error: package my.some_parameters.SomeParameter does not exist
                                                my.some_parameters.SomeParameter.SomeParameter>(
                                                                                ^
    /Users/dsandjaja/development/projects/testing/build/generated/source/proto/main/reactor/my/some_service/ReactorMyServiceGrpc.java:73: error: package my.some_parameters.SomeParameter does not exist
                        com.salesforce.reactorgrpc.stub.ServerCalls.oneToOne((my.some_parameters.SomeParameter.SomeParameter) request,
                                                                                                              ^
    /Users/dsandjaja/development/projects/testing/build/generated/source/proto/main/reactor/my/some_service/ReactorMyServiceGrpc.java:74: error: package my.some_parameters.SomeParameter does not exist
                                (io.grpc.stub.StreamObserver<my.some_parameters.SomeParameter.SomeParameter>) responseObserver,
                                                                                             ^
    Note: /Users/dsandjaja/development/projects/testing/build/generated/source/proto/main/reactor/my/some_service/ReactorMyServiceGrpc.java uses or overrides a deprecated API.
    Note: Recompile with -Xlint:deprecation for details.
    8 errors
    :compileJava FAILED
    
    FAILURE: Build failed with an exception.
    
    * What went wrong:
    Execution failed for task ':compileJava'.
    > Compilation failed; see the compiler error output for details.
    
    * Try:
    Run with --stacktrace option to get the stack trace. Run with --info or --debug option to get more log output.
    
    BUILD FAILED
    
    Total time: 1.472 secs
    

    If I rename some_parameter.proto to someparameter.proto and change the imports accordingly, compilation and Reactor class generation works.

    It also works if I add the following to some_parameter.proto:

    option java_outer_classname = "SomeParameterProto";
    

    As it works without the reactor plugin, I would consider this a bug.

    bug 
    opened by dadadom 12
  • NetworkOnMainThreadException on two way stream.

    NetworkOnMainThreadException on two way stream.

    Hello, I'm sporadically having a NetworkOnMainThreadException when pushing an action on a twoway stream. The action is pushed through a subject, code below.

    public class GrpcApiImpl implements GrpcApi {
    
        private final BehaviorProcessor<Session.Action> stringAsyncProcessor;
        private RxSessionServiceGrpc.RxSessionServiceStub stub;
    
        public GrpcApiImpl(RxSessionServiceGrpc.RxSessionServiceStub stub) {
            this.stub = stub;
            stringAsyncProcessor = BehaviorProcessor.create();
        }
    
        @Override
        public Flowable<Session.Command> listenForCommands()  {
            return stub.startSession(stringAsyncProcessor.onBackpressureBuffer());
    
        }
    
        @Override
        public void postAction(Session.Action action){
            stringAsyncProcessor.onNext(action);
        }
    }
    

    Where is the thread in which the action is fired defined?

    opened by emanueleDiVizio 11
  • Lambda expressions break Android

    Lambda expressions break Android

    Could u explain? how to add u library to Android Studio?

    All classes have generated. But after call stub I have a crash. crash Multidex is enabled. MinSdk 21. grpc

    opened by ozh-dev 11
  • Best way to retry on a reactive-grpc stream after an error

    Best way to retry on a reactive-grpc stream after an error

    Hello,

    I have a pretty simple use case: a gRPC service with a method taking a stream as argument and returning a stream.

    I would like to resubscribe, if for example the remote server goes down. On a normal stream if would just use retry (with some delay) to resubscribe on error:

    Flux<ResultType> resulfFlux = myService.myMethod(sourceStream).retry();

    However it is not possible with reactive-grpc:

    Exception in thread "grpc-default-executor-0" java.lang.NullPointerException: You cannot directly subscribe to a gRPC service multiple times concurrently. Use Flowable.share() instead.
    	at com.salesforce.reactorgrpc.stub.SubscribeOnlyOnceLifter$1.onSubscribe(SubscribeOnlyOnceLifter.java:32)
    	at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.onSubscribe(FluxPublishOn.java:202)
    

    (btw, this should be Flux.share() instead of Flowable.share())

    Do you have any elegant way to achieve this retry on error?

    A naive approach with recursion to resume the stream will of course result in a stackoverflow after some time:

    Flux<ResultType> getResultStream(Flux<SourceType> sourceStream){
            return myService.myMethod(sourceStream)
                    .onErrorResume(err -> Flux.defer(() -> getResultStream(sourceStream)))
                    
    }
    

    The only partially working approach I found is to chain it with thenMany to avoid a stackoverflow, but it is then not possible to subscribe to the result stream since thenMany waits for the completion of the previous stream...

    Flux<ResultType> getResultStream(Flux<SourceType> sourceStream){
            return myService.myMethod(sourceStream)
                     .onErrorResume(err -> Flux.empty())
                     .thenMany(Flux.defer(() -> getResultStream(sourceStream)))    
    }
    
    opened by matgabriel 11
  • BackpressureIntegrationTest failing

    BackpressureIntegrationTest failing

    I'm running this on macOS Sierra 10.12.6 MacBookPro, 2.5GH Intel Core 17 16GB. It passes if I set madMultipleCutoff = 1. Anything higher will fail at least one of the tests.

    opened by dturanski 11
  • UnsupportedOperationException thrown in the generated stub with Java 17

    UnsupportedOperationException thrown in the generated stub with Java 17

    Hello! I'm getting and UnsupportedOperationException when running ClientCalls.manytoMany with the stub generated with both rx3Java and rxJava. I'm using Java 17. What would be the possible cause of the problem?

    opened by ivy-kl-chan 0
  • In ManyToOne and ManyToMany, remote method is called before the client subscribe

    In ManyToOne and ManyToMany, remote method is called before the client subscribe

    Hi folks, I found a problem when I use it like below.

    stub = xxx.newReactorStub(channel);
    stub.manyToMany(Flux.range(1, 10).map(i -> Message.newBuilder().setNumber(i).build())
                    .doOnNext(System.out::println));
    

    When I did not subscribe, the client terminal printed out the data, and the server side also showed that the data was received. Is this a bug? Or are there any other considerations?

    Maybe we should add a doOnSubscribe after Flux.from(xxx) and defer delegate.apply until it is subscribed? https://github.com/salesforce/reactive-grpc/blob/4040f60842e8ddbb244049e1b1409c2e5360bcfe/reactor/reactor-grpc-stub/src/main/java/com/salesforce/reactorgrpc/stub/ClientCalls.java#L137-L139

    opened by JooKS-me 0
  • Possible to have non-publisher for RPC method parameter?

    Possible to have non-publisher for RPC method parameter?

    Using the example in the README, can we have this method signature

    ReactorGreeterGrpc.GreeterImplBase svc = new ReactorGreeterGrpc.GreeterImplBase() {
        @Override
        public Mono<HelloResponse> sayHello(HelloRequest request) {
            return greet("Hello", request);
        }
    };
    

    instead of

    ReactorGreeterGrpc.GreeterImplBase svc = new ReactorGreeterGrpc.GreeterImplBase() {
        @Override
        public Mono<HelloResponse> sayHello(Mono<HelloRequest> request) {
            return request.map(protoRequest -> greet("Hello", protoRequest));
        }
    };
    

    ?

    This way, we can access the request object from anywhere in the method. It will prevent us from writing an extra map function as in the example above.

    opened by deepak-auto 2
  • host_javabase is deprecated

    host_javabase is deprecated

    Since bazel 5.0 the parameter used here is deprecated:

    https://github.com/salesforce/reactive-grpc/blob/071f153e61c2f8dcd1df287b51dd71cd34a39fd3/bazel/java_reactive_grpc_library.bzl#L74

    Generating the following error:

    Traceback (most recent call last):
    	File "/home/juan/.cache/bazel/_bazel_juan/7e6e30ecb52b472e0900bbc0b820a797/external/com_salesforce_servicelibs_reactive_grpc/bazel/java_reactive_grpc_library.bzl", line 71, column 36, in _reactive_grpc_library_impl
    		java_info = java_common.compile(
    Error in compile: in call to compile(), parameter 'host_javabase' is deprecated and will be removed soon. It may be temporarily re-enabled by setting --incompatible_java_common_parameters=false
    

    The documentation suggests that it can be just dropped: image

    Do you find any risk on doing this?

    opened by juanpmarin 1
  • Add onDiscard hook support to AbstractStreamObserverAndPublisher

    Add onDiscard hook support to AbstractStreamObserverAndPublisher

    Currently, upon cancellation it just clears the queue, but its elements, for example, can be backed by reference counted objects which need to be released before being garbage collected (it's possible thanks to https://github.com/GoogleCloudPlatform/grpc-gcp-java/pull/77 ) An example of when and how discarding is done can be found in FluxFlatMap, for example, or any other operator impl with a queue

    opened by bruto1 0
Releases(v1.2.3)
  • v1.2.3(Oct 10, 2021)

  • v1.2.2(Sep 30, 2021)

  • v1.2.0(Aug 23, 2021)

    Added

    • RxJava 3 support #246 🎉 Thanks @scottslewis @AndreasLarssons

    Updated

    • Updated to gRPC 1.39.0

    Fixed

    • Java path separator for Windows. #250 Thanks @Zetten
    Source code(tar.gz)
    Source code(zip)
  • v1.1.0(Jul 6, 2021)

    ⚠️ Starting with this version, Java8 is required ⚠️

    Added

    • Add Bazel instructions #231
    • Support for proto3 optional directives
    • Reactor retries now correctly support versions >= 3.3.9 #241

    Updated

    • jProtoc updated, fixes snake case file name bug.
    • Canteen updated, adds Mac and Linux ARM support.
    • Upgraded to latest RxJava and Reactor

    Fixed

    • NullPointerException in server side during closing stream #221
    • Dependency conflicts on io.netty:netty-common, leading to inconsistent program behaviors #226
    • Shade and relocate dependencies in reactor-grpc and rx-grpc plugin #214
    Source code(tar.gz)
    Source code(zip)
  • v1.0.1(May 16, 2020)

    Added

    • Added CallOptions for controlling flow control prefetch, useful for managing memory consumption with large message payloads. Thanks @koldat

    Updated

    • Improved Reactor documentation regarding Flux mutualization operations. Thanks @mlex
    Source code(tar.gz)
    Source code(zip)
  • v1.0.0(Aug 22, 2019)

    Added

    • 1.0.0 🎉 Feature complete
    • Gradle binaries now work for Windows
    • Bazel build files - thanks @mjduijn
    • Interoperability tests between Java, C#, and Go

    Updated

    • Overall documentation
    • Better explained gRPC Context propagation for Reactor

    Fixed

    • Improved error handling for empty Mono<T> server responses - thanks @krakowski

    Thanks so much to everybody who contributed! Special thanks to @cbornet and @OlegDokuka for basically rewriting the project, twice.

    Source code(tar.gz)
    Source code(zip)
  • v0.10.0(May 30, 2019)

    Added

    • Now compiles with JDK 11

    Updated

    • Minor doc improvements
    • Moved to CircleCI

    Fixed

    • Protobuf comments with asterisks now generate correct javadoc

    Thanks to

    • @engineerdev
    • @chbatey
    Source code(tar.gz)
    Source code(zip)
  • v0.10.0-RC1(Mar 11, 2019)

    v0.10.0 is a major rewrite of the internal reactive pipeline in Reactive-gRPC by @OlegDokuka.

    Added

    • OSGI manifest metadata @chijoungso
    • Command line debugging support for Reactive-gRPC generators
    • JMH benchmark harness

    Updated

    • Demo improvements
    • Completely rewrote internal reactive operator pipeline
    • Updated gRPC to 1.19.0
    • Updated RxJava to 2.2.7
    • Updated Reactor to 3.1.15.RELEASE

    Fixed

    • Gradle demo now compiles correctly @bkolb
    • Illegal classname characters in proto filenames now generates correctly named classes @ehiggs
    • gRPC in-process channel no longer hangs @mjduijn
    • Reactor switchOnFirst operator now works correctly @simonbasle
    • Nested proto enums with clashing names now correctly generate @AntKos
    • gRPC documentation now generates correctly closed <pre> tags
    Source code(tar.gz)
    Source code(zip)
  • v0.9.0(Jul 12, 2018)

    Added

    • GrpcRetry for transparently re-establishing streaming gRPC service calls without breaking the reactive stream.
    • RxJava GrpcContextOnScheduleHook for propagating gRPC Context objects between RxJava threads.
    • Lots of demos: Android, JavaFX, and reactive backpressure.

    Updated

    • Converted Reactor tests to use fluent APIs. Thanks @rickeyski!

    Fixed

    • Better support for old Android API versions: don't use method references in generated rxjava code.
    • Fixed undeliverable cancelled before receiving half close errors.
    • Fixed cancellation when consumers of response streams dispose their subscription.
    Source code(tar.gz)
    Source code(zip)
  • v0.8.2(Jun 1, 2018)

    Added

    • Unary request client stubs now accept simple types in addition to types wrapped in Single/Mono.

    Updated

    • Upgraded gRPC to 1.12.0

    Fixed

    • Downstream terminal events now correctly propagate upstream cancellation.
    • Unary services that complete with no response now correctly signal error.
    • Improved compliance with Reactive Streams.
    • Fixed "Queue Full?!" errors.
    Source code(tar.gz)
    Source code(zip)
  • v0.8.1(Mar 16, 2018)

    Added

    • Generated client stubs now extend AbstractStub<T>, thereby gaining its powers

    Updated

    • More concise example for stream resumption by @matgabriel
    • Improved context propagation documentation by @alexnederlof
    • Documentation and examples now use compose() and as() for fluent functional chaining
    • Upgrade RxJava to 2.1.10
    • Upgrade Reactor to 3.1.5.RELEASE
    • Upgrade reactive-streams-tck to 1.0.2
    • Upgrade Kotlin to 1.2.30

    Fixed

    • Correctly generate stubs for .proto files in nested directories
    • Correctly generate stubs for .proto RPC operations with underscores in the name
    • Finally fixed spec317 TCK tests - TCK 💯
    • Stubs now correctly inherit their gRPC channel executor

    Known Issues

    • Cancellation-related issues - #61 and #68
    Source code(tar.gz)
    Source code(zip)
  • 0.8.0(Feb 2, 2018)

    Added

    • Android support for RxGrpc 🤖
      • Reactive-gRPC shared libraries compiled against Java6
      • RxGrpc stubs generate Java6 compatible code
      • Spring Reactor is not officially Android compatible, so Reactor-gRPC is not officially Android compatible
    • Demos for automatic stream reestablishment in the face of a server error

    Updated

    • Upgraded to gRPC 1.9.0

    Fixed

    • Client stubs would refuse to gracefully shutdown under rare circumstances involving server initiated stream cancellation
    • Deprecation warnings when generating stubs against gRPC 1.9.0
    Source code(tar.gz)
    Source code(zip)
  • 0.7.3(Dec 20, 2017)

  • 0.7.2(Dec 6, 2017)

  • 0.7.1(Nov 30, 2017)

  • 0.7.0(Nov 27, 2017)

    • Support for Spring Reactor, thanks to @cbornet.
    • Generated *Impl service base classes are now stand-alone. They no longer derive from gRPC's generated base classes.
    Source code(tar.gz)
    Source code(zip)
Owner
Salesforce
A variety of vendor agnostic projects which power Salesforce
Salesforce
SCG used as as proxy to connect gRPC-Web and back end gRPC services

gRPC-Web Spring Cloud Gateway Spring Cloud Gateway 3.1.1 supports for gRPC and HTTP/2. It is possible to use Spring Cloud Gateway to connect gRPC-Web

null 1 Apr 4, 2022
LINE 4.1k Dec 31, 2022
The Java gRPC implementation. HTTP/2 based RPC

gRPC-Java - An RPC library and framework gRPC-Java works with JDK 7. gRPC-Java clients are supported on Android API levels 16 and up (Jelly Bean and l

grpc 10.2k Jan 1, 2023
Book Finder application is a client-server application (gRPC) for educational purposes.

Book-Finder Book Finder application is a client-server application (gRPC) for educational purposes. Instalation These projects (Client/Server) are Mav

Mihai-Lucian Rîtan 21 Oct 27, 2022
gRPC Facade for Transactional Keyvalue Stores

lionrock An implementation agnostic client/server communication protocol (using protobuf and grpc) inspired heavily by FoundationDB (https://github.co

Clement Pang 23 Dec 8, 2022
Akka gRPC - Support for building streaming gRPC servers and clients on top of Akka Streams.

akka-grpc Support for building streaming gRPC servers and clients on top of Akka Streams. This library is meant to be used as a building block in proj

Akka Project 420 Dec 29, 2022
SCG used as as proxy to connect gRPC-Web and back end gRPC services

gRPC-Web Spring Cloud Gateway Spring Cloud Gateway 3.1.1 supports for gRPC and HTTP/2. It is possible to use Spring Cloud Gateway to connect gRPC-Web

null 1 Apr 4, 2022
OpenAPI Generator allows generation of API client libraries (SDK generation), server stubs, documentation and configuration automatically given an OpenAPI Spec (v2, v3)

OpenAPI Generator Master (5.4.x): 6.0.x (6.0.x): ⭐ ⭐ ⭐ If you would like to contribute, please refer to guidelines and a list of open tasks. ⭐ ⭐ ⭐ ‼️

OpenAPI Tools 14.8k Dec 30, 2022
LINE 4.1k Jan 2, 2023
LINE 4.1k Dec 31, 2022
Reactive Streams Utilities - Future standard utilities library for Reactive Streams.

Reactive Streams Utilities This is an exploration of what a utilities library for Reactive Streams in the JDK might look like. Glossary: A short gloss

Lightbend 61 May 27, 2021
SpringBoot show case application for reactive-pulsar library (Reactive Streams adapter for Apache Pulsar Java Client)

Reactive Pulsar Client show case application Prerequisites Cloning reactive-pulsar Running this application requires cloning https://github.com/lhotar

Lari Hotari 9 Nov 10, 2022
gRPC and protocol buffers for Android, Kotlin, and Java.

Wire “A man got to have a code!” - Omar Little See the project website for documentation and APIs. As our teams and programs grow, the variety and vol

Square 3.9k Dec 23, 2022
The Java gRPC implementation. HTTP/2 based RPC

gRPC-Java - An RPC library and framework gRPC-Java works with JDK 7. gRPC-Java clients are supported on Android API levels 16 and up (Jelly Bean and l

grpc 10.2k Jan 1, 2023
gRPC and protocol buffers for Android, Kotlin, and Java.

Wire “A man got to have a code!” - Omar Little See the project website for documentation and APIs. As our teams and programs grow, the variety and vol

Square 3.9k Jan 5, 2023
Spring Boot starter module for gRPC framework.

Spring Boot starter module for gRPC framework.

Michael Zhang 2.8k Jan 4, 2023
Spring Boot starter module for gRPC framework.

Spring Boot starter module for gRPC framework.

Michael Zhang 1.8k Mar 17, 2021
Book Finder application is a client-server application (gRPC) for educational purposes.

Book-Finder Book Finder application is a client-server application (gRPC) for educational purposes. Instalation These projects (Client/Server) are Mav

Mihai-Lucian Rîtan 21 Oct 27, 2022