RSocket is a binary protocol for use on byte stream transports such as TCP, WebSockets, and Aeron

Overview

RSocket

Join the chat at https://gitter.im/RSocket/RSocket-Java

RSocket is a binary protocol for use on byte stream transports such as TCP, WebSockets, and Aeron.

It enables the following symmetric interaction models via async message passing over a single connection:

  • request/response (stream of 1)
  • request/stream (finite stream of many)
  • fire-and-forget (no response)
  • event subscription (infinite stream of many)

Learn more at http://rsocket.io

Build and Binaries

⚠️ The master branch is now dedicated to development of the 1.1.x line.

Releases and milestones are available via Maven Central.

Example:

repositories {
    mavenCentral()
    maven { url 'https://repo.spring.io/milestone' }  // Reactor milestones (if needed)
}
dependencies {
    implementation 'io.rsocket:rsocket-core:1.1.1'
    implementation 'io.rsocket:rsocket-transport-netty:1.1.1'
}

Snapshots are available via oss.jfrog.org (OJO).

Example:

repositories {
    maven { url 'https://maven.pkg.github.com/rsocket/rsocket-java' }
    maven { url 'https://repo.spring.io/snapshot' }  // Reactor snapshots (if needed)
}
dependencies {
    implementation 'io.rsocket:rsocket-core:1.1.2-SNAPSHOT'
    implementation 'io.rsocket:rsocket-transport-netty:1.1.2-SNAPSHOT'
}

Development

Install the google-java-format plugin in IntelliJ from the Plugins preferences. Enable it under Preferences -> Other Settings -> google-java-format Settings.

Format automatically with

$ ./gradlew goJF

Debugging

Frames can be printed out to help with debugging. Set the logger io.rsocket.FrameLogger to DEBUG level to print the frames.

Requirements

Trivial Client

package io.rsocket.transport.netty;

import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.core.RSocketConnector;
import io.rsocket.transport.netty.client.WebsocketClientTransport;
import io.rsocket.util.DefaultPayload;
import reactor.core.publisher.Flux;

import java.net.URI;

public class ExampleClient {
    public static void main(String[] args) {
        WebsocketClientTransport ws = WebsocketClientTransport.create(URI.create("ws://rsocket-demo.herokuapp.com/ws"));
        RSocket clientRSocket = RSocketConnector.connectWith(ws).block();

        try {
            Flux<Payload> s = clientRSocket.requestStream(DefaultPayload.create("peace"));

            s.take(10).doOnNext(p -> System.out.println(p.getDataUtf8())).blockLast();
        } finally {
            clientRSocket.dispose();
        }
    }
}

Zero Copy

By default, to make RSocket easier to use, it copies the incoming Payload. Copying the payload comes at a cost to performance and latency. If you want to use zero copy, you must disable this copying by setting a payloadDecoder on your RSocketServer or RSocketConnector, as in the examples below. This lets you manage the Payload without copying the data from the underlying transport. You must release each Payload when you are done with it, or you will get a memory leak. Used correctly, this reduces latency and increases performance.

Example Server setup

RSocketServer.create(new PingHandler())
        // Enable Zero Copy
        .payloadDecoder(PayloadDecoder.ZERO_COPY)
        .bind(TcpServerTransport.create(7878))
        .block()
        .onClose()
        .block();

Example Client setup

RSocket clientRSocket =
        RSocketConnector.create()
            // Enable Zero Copy
            .payloadDecoder(PayloadDecoder.ZERO_COPY)
            .connect(TcpClientTransport.create(7878))
            .block();
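
The release obligation described above can be illustrated without RSocket itself. The following is a minimal sketch using only the JDK: `CountedBuffer` and `handle` are hypothetical names for a toy reference-counted buffer and a handler, not RSocket types. It only shows the general pattern of reading what is needed and then releasing in a `finally` block, so the buffer is freed even when processing throws.

```java
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.concurrent.atomic.AtomicInteger;

final class ZeroCopySketch {

    /** Toy stand-in for a reference-counted payload (hypothetical, not an RSocket type). */
    static final class CountedBuffer {
        private final byte[] data;
        private final AtomicInteger refCnt = new AtomicInteger(1);

        CountedBuffer(String text) {
            this.data = text.getBytes(StandardCharsets.UTF_8);
        }

        String readUtf8() {
            if (refCnt.get() <= 0) {
                throw new IllegalStateException("buffer already released");
            }
            return new String(data, StandardCharsets.UTF_8);
        }

        /** Drops one reference and returns true once no references remain. */
        boolean release() {
            int remaining = refCnt.decrementAndGet();
            if (remaining < 0) {
                throw new IllegalStateException("released more often than retained");
            }
            return remaining == 0;
        }

        int refCnt() {
            return refCnt.get();
        }
    }

    /** Reads what it needs, then always releases, even if processing throws. */
    static String handle(CountedBuffer payload) {
        try {
            return payload.readUtf8().toUpperCase(Locale.ROOT);
        } finally {
            payload.release();
        }
    }

    public static void main(String[] args) {
        CountedBuffer payload = new CountedBuffer("ping");
        System.out.println(handle(payload));   // prints "PING"
        System.out.println(payload.refCnt());  // prints 0
    }
}
```

Reading from the buffer after `handle` returns fails in this model, which mirrors why data should be extracted before the release.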

Bugs and Feedback

For bugs, questions, and discussions, please use GitHub Issues.

LICENSE

Copyright 2015-2020 the original author or authors.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Comments
  • reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...)

    I am using RSocket REQUEST_STREAM to implement a query whose results are streamed back to the client. The client sends REQUEST_N 512 and successfully receives results. Once the client has received more than 256 results, it requests an additional 256. The communication works fine, and FrameLogger shows the expected receiving and sending frames. After some time, the following exception is thrown on the client when an additional REQUEST_N is sent:

    2020-11-18 13:27:47.944 DEBUG 1 --- [tor-tcp-epoll-2] io.rsocket.FrameLogger                   : receiving -> 
    Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 296
    Data:
    ......
    
    2020-11-18 13:27:47.945 DEBUG 1 --- [pool-5-thread-5] io.rsocket.FrameLogger                   : sending -> 
    Frame => Stream ID: 1 Type: REQUEST_N Flags: 0b0 Length: 10 RequestN: 256
    Data:
    
    2020-11-18 13:27:47.956 ERROR 1 --- [tor-tcp-epoll-2]  : 
    
    reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...)
    	at reactor.core.Exceptions.failWithOverflow(Exceptions.java:221)
    	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
    Assembly trace from producer [reactor.core.publisher.FluxSourceFuseable] :
    	reactor.core.publisher.Flux.from(Flux.java:957)
    	io.rsocket.core.RSocketRequester.handleRequestStream(RSocketRequester.java:338)
    Error has been observed at the following site(s):
    	|_        Flux.from ? at io.rsocket.core.RSocketRequester.handleRequestStream(RSocketRequester.java:338)
    	|_ Flux.subscribeOn ? at io.rsocket.core.RSocketRequester.handleRequestStream(RSocketRequester.java:393)
    

    This happens as part of a large application. Therefore I do not have a standalone replication example at the moment.

    • RSocket version(s) used: 1.0.3
    • Other relevant libraries versions: spring-boot: 2.3.5, netty 4.1.53
    • Platform: OpenJDK 64-Bit Server VM AdoptOpenJDK (build 11.0.8+10, mixed mode)
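
The request pattern described in this report (an initial demand of 512, topped up by 256 each time 256 items have been consumed) can be modeled with plain credit accounting. The sketch below is a toy model using only the JDK, with hypothetical names; it is not RSocket or Reactor code and does not reproduce the reported bug. It only shows the invariant that an overflow error signals: an item arrived when no credit was outstanding.

```java
final class CreditWindowSketch {
    private final int refill;         // credits requested on each replenishment
    private long outstanding;         // items the producer may still send
    private int consumedSinceRefill;  // items consumed since the last REQUEST_N

    CreditWindowSketch(int initialRequest, int refill) {
        this.outstanding = initialRequest;
        this.refill = refill;
    }

    /**
     * Accounts for one received item.
     *
     * @return extra credits to request now (0 if no REQUEST_N is due)
     */
    int onNext() {
        if (outstanding == 0) {
            throw new IllegalStateException("received more items than requested");
        }
        outstanding--;
        consumedSinceRefill++;
        if (consumedSinceRefill < refill) {
            return 0;
        }
        consumedSinceRefill = 0;
        outstanding += refill;
        return refill;
    }

    long outstanding() {
        return outstanding;
    }

    public static void main(String[] args) {
        CreditWindowSketch window = new CreditWindowSketch(512, 256);
        int requestNFrames = 0;
        for (int i = 0; i < 1000; i++) {
            if (window.onNext() > 0) {
                requestNFrames++;
            }
        }
        // prints "3 REQUEST_N frames, 280 credits left"
        System.out.println(requestNFrames + " REQUEST_N frames, " + window.outstanding() + " credits left");
    }
}
```

With these numbers the outstanding credit never drops below 256, so in this model a well-behaved producer cannot overrun the receiver.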
    bug superseded 
    opened by lkolisko 34
  • Handling network errors in ClientServerInputMultiplexer

    We get these network failures logged as errors, but in some cases they are not really errors. If the other side stops while the client is still trying to communicate with it, you will get such errors. It would be great to have some callback support so that clients can decide what is worth logging as an error.

    2020-05-04 09:49:39,725 [RSocket-epoll-2] ERROR io.rsocket.FrameLogger - Error receiving frame: 
    io.netty.channel.unix.Errors$NativeIoException: readAddress(..) failed: Connection reset by peer
    
    question 
    opened by adrian-tarau 30
  • Server dies on keep-alive ack timeout

    The rsocket server dies on keep-alive ack timeout. I've tried adding onErrorResume, but to no avail. How can I prevent the server from closing its socket on error?

    Error

    [2019-05-20 11:12:21.438] ERROR [parallel-1] RegistryRSocketServer: Error occurred during session
    io.rsocket.exceptions.ConnectionErrorException: No keep-alive acks for 60000 ms
    at io.rsocket.keepalive.KeepAliveConnection.lambda$startKeepAlives$1(KeepAliveConnection.java:97)
    at reactor.core.publisher.LambdaMonoSubscriber.onNext(LambdaMonoSubscriber.java:137)
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1476)
    at reactor.core.publisher.MonoProcessor.onNext(MonoProcessor.java:389)
    at io.rsocket.keepalive.KeepAliveHandler.doCheckTimeout(KeepAliveHandler.java:112)
    at io.rsocket.keepalive.KeepAliveHandler$Server.onIntervalTick(KeepAliveHandler.java:128)
    at io.rsocket.keepalive.KeepAliveHandler.lambda$start$0(KeepAliveHandler.java:63)
    at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:130)
    at reactor.core.publisher.FluxInterval$IntervalRunnable.run(FluxInterval.java:123)
    at reactor.core.scheduler.PeriodicWorkerTask.call(PeriodicWorkerTask.java:59)
    at reactor.core.scheduler.PeriodicWorkerTask.run(PeriodicWorkerTask.java:73)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
    
    io.rsocket.exceptions.ConnectionErrorException: No keep-alive acks for 60000 ms
    at io.rsocket.keepalive.KeepAliveConnection.lambda$startKeepAlives$1(KeepAliveConnection.java:97)
    at reactor.core.publisher.LambdaMonoSubscriber.onNext(LambdaMonoSubscriber.java:137)
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1476)
    at reactor.core.publisher.MonoProcessor.onNext(MonoProcessor.java:389)
    at io.rsocket.keepalive.KeepAliveHandler.doCheckTimeout(KeepAliveHandler.java:112)
    at io.rsocket.keepalive.KeepAliveHandler$Server.onIntervalTick(KeepAliveHandler.java:128)
    at io.rsocket.keepalive.KeepAliveHandler.lambda$start$0(KeepAliveHandler.java:63)
    at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:130)
    at reactor.core.publisher.FluxInterval$IntervalRunnable.run(FluxInterval.java:123)
    at reactor.core.scheduler.PeriodicWorkerTask.call(PeriodicWorkerTask.java:59)
    at reactor.core.scheduler.PeriodicWorkerTask.run(PeriodicWorkerTask.java:73)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    

    Version: 0.12.2-RC2

    Server Code

    server = RSocketFactory
            .receive()
            .frameDecoder(ZERO_COPY)
            .addConnectionPlugin(micrometerDuplexConnectionInterceptor)
            .errorConsumer(e -> log.error("Error occurred during session", e))
            .acceptor(socketAcceptor)
            .transport(serverTransport)
            .start()
            .onErrorResume(e -> Mono.empty())
            .subscribe();
    
    Acceptor
    @Override
      public Mono<RSocket> accept(ConnectionSetupPayload connectionSetupPayload, RSocket rSocket) {
        return Mono.just(new RegistryRSocket(scheduler));
      }
    
    requestStream
    return Mono.just(payload)
            .map(this::getRequestFromPayload)
            .flux()
            .map(/*..Does Something..*/)
            .onErrorResume(throwable ->
                Flux.just(createPayloadFromThrowable(throwable)));
    
    private Payload createPayloadFromThrowable(Throwable t) {
        return ByteBufPayload.create(ErrorFrameFlyweight.encode(DEFAULT, 0, t));
      }
    

    Any help would be greatly appreciated
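
The error above reports that no keep-alive acknowledgement arrived within 60000 ms. As a rough illustration of that kind of check, here is a toy model using only the JDK. The class and method names are hypothetical, and this is not the library's KeepAliveHandler; it only shows a timeout being measured from the last acknowledgement seen on one connection.

```java
final class KeepAliveSketch {
    private final long timeoutMillis;
    private long lastAckMillis;

    KeepAliveSketch(long timeoutMillis, long nowMillis) {
        this.timeoutMillis = timeoutMillis;
        this.lastAckMillis = nowMillis;
    }

    /** Records that the peer acknowledged a keep-alive at the given time. */
    void onKeepAliveAck(long nowMillis) {
        lastAckMillis = nowMillis;
    }

    /** Returns true when no acknowledgement has been seen for the whole timeout. */
    boolean isTimedOut(long nowMillis) {
        return nowMillis - lastAckMillis >= timeoutMillis;
    }

    public static void main(String[] args) {
        KeepAliveSketch connection = new KeepAliveSketch(60_000, 0);
        connection.onKeepAliveAck(20_000);
        System.out.println(connection.isTimedOut(70_000)); // false: 50 s since the last ack
        System.out.println(connection.isTimedOut(80_000)); // true: 60 s without an ack
    }
}
```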

    opened by lksvenoy-r7 27
  • Detect closed server connection and automatically reconnect.

    I have a simple setup between two services that I've built from provided examples. I have one service that listens for incoming connections:

            TcpServerTransport tcp = TcpServerTransport.create(configProps.getPort());
            RSocketFactory.receive()
                    .acceptor(itemSocketAcceptor)
                    .transport(tcp)
                    .start().log()
                    .subscribe();
    

    Another service attempts to make a connection:

            this.socket = RSocketFactory
                    .connect()
                    .transport(TcpClientTransport.create(<host>, <port>))
                    .start()
                    .block();
    

    If I redeploy the receiving service, the client service never reports an error (unless I set a timeout on the requestResponse method). I can't seem to find a way for the client side to detect that the underlying connection has closed and to automatically try to reconnect. To boot, these are web applications in Spring Boot, so I'm never able to attempt to call block() during a web request.

    The onClose method seems to be an indicator that the connection has closed, but I'm not sure how to use that in the context of a web request to be able to attempt to rebuild the connection which necessarily requires me to call block.

    Not sure if this is an actual issue or if I've simply overlooked some documentation or examples. I'm anxious to adopt rsocket in my company, so any direction would be greatly appreciated.
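
One common way to approach the reconnect part of this question is a retry loop with exponential backoff around the connect call. The sketch below uses only the JDK and hypothetical names (`connectWithBackoff`, and a supplier standing in for the real connect call); it is not an RSocket API. It also blocks with `Thread.sleep`, which is only for illustration: in a reactive web application the same policy would normally be expressed with a non-blocking retry operator.

```java
import java.util.function.Supplier;

final class ReconnectSketch {

    /** Calls connect until it succeeds or maxAttempts is used up, doubling the delay each time. */
    static <T> T connectWithBackoff(Supplier<T> connect, int maxAttempts, long firstDelayMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long delay = firstDelayMillis;
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return connect.get();
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again
            }
            if (attempt < maxAttempts) {
                sleep(delay);
                delay *= 2;
            }
        }
        throw last;
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting to reconnect", e);
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Hypothetical connect call that fails twice before succeeding.
        String result = connectWithBackoff(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("connection refused");
            }
            return "connected";
        }, 5, 10);
        System.out.println(result + " after " + calls[0] + " attempts"); // connected after 3 attempts
    }
}
```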

    opened by joshfix 27
  • Spring Boot 2.3.0.RC1 - RSocket over Websocket transport - requestStream - produces additional request(255)

    I created this issue in the Spring Boot tracker, but maybe it should be posted here instead: https://github.com/spring-projects/spring-boot/issues/21360

    With the latest 2.3.0.RC1, RSocket requestStream started producing an additional request(255) to the Flux inside the controller.

    Configuration to reproduce the issue:

    • Spring Boot 2.3.0.RC1
    • RSocket plugged to server
    • Transport - Websockets
    • Using @MessageMapping controller
    • RSocket connection with Composite Metadata ('message/x.rsocket.composite-metadata.v0'), Binary MimeType ('application/octet-stream') and Buffer encoders
    • Tested with javascript RSocket client v0.0.19

    The previous version, Spring Boot 2.3.0.M4, does not have this issue.

    bug 
    opened by maxim-bandurko-lsvt 22
  • publish to maven central

    I see some releases going into jcenter, https://jcenter.bintray.com/io/rsocket/rsocket-core/

    Do you plan to publish them to maven central too? There are only some old releases with groupId io.reactivesocket there. But they do not work for me.

    opened by tsachev 20
  • Improving `ReactiveSocket` abstraction.

    Today we have the following primary interfaces that application writers interact with:

    ReactiveSocket

    Primary abstraction for the multiplexed, full duplex connection following the protocol.

    ConnectionSetupHandler

    Used only at the server when a new connection is accepted.

    RequestHandler

    Used only at the server to handle various interaction models requested by the client.

    Problem

    There are a few issues and missing features with the above abstraction and current implementations:

    • The client has no way of accepting requests from the server (this isn't currently implemented, and I can imagine using ConnectionSetupHandler and RequestHandler for this purpose).
    • RequestHandler and ReactiveSocket have very similar methods (primarily around the various request interaction models), so it is confusing why they are two different contracts.
    • The client does not have a way to accept leases from the server (this is also a missing feature today, but not impossible with the current abstractions).

    Proposal

    We can replace the 3 interfaces with two interfaces

    ReactiveSocket

    Keep the current interface but use it both as a way to request and respond to requests on a ReactiveSocket. This means for every connection there will be two ReactiveSocket instances on each end.

    ReactiveSocketFactory (May be a better name)

    On either end (client/server) there will be a factory which may look something like the below.

    public interface ReactiveSocketFactory {
    
        /**
         * {@code ReactiveSocket} is a symmetric protocol where everything that a client can do can be done on the server
         * and vice-versa. So, every transport connection is associated with two {@code ReactiveSocket} instances, one for
         * responding to requests from the peer and another for sending requests to the peer.
         * This method is the converter that accepts a remote {@code ReactiveSocket} which is used to send requests to the
         * peer and returns a local {@code ReactiveSocket} that responds to requests from the peer.
         * 
         * @param requestingSocket Socket used to send request to the peer.
         * 
         * @return {@code ReactiveSocket} that is used to accept and respond to request from the peer.
         */
        ReactiveSocket accept(ReactiveSocket requestingSocket);
    }
    

    I am not yet sure, but this may look different on the client than on the server because of how each side sends or replies to a ConnectionSetupFrame. In any case, this would replace ConnectionSetupHandler and either generalize it or provide for client-side use cases.

    This change will simplify the API a bit and also provide symmetric APIs for both client and server.

    IncompatibleApiChange 0.5.x 
    opened by NiteshKant 20
  • fixes performance degradation when fragmentation is used

    When a custom MTU is defined for the fragment size, performance degrades significantly. The attached example code sends 1M records of 5 bytes each. Before the fix it takes 39 seconds; after the fix it takes 5 seconds (the same time as with no custom fragmentation). We need to enable fragmentation because the WebSocket max data size is 64 kB and we support both transports.

    See #994
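
For readers unfamiliar with fragmentation, the basic idea is to split a payload that exceeds the MTU into MTU-sized pieces. The sketch below is a simplified illustration using only the JDK, with hypothetical names; it ignores frame headers and metadata and is not the library's fragmentation code. It shows why a 5-byte record, as in the benchmark described above, needs no splitting at all when the MTU is larger than the record.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class FragmentSketch {

    /** Splits payload into chunks of at most mtu bytes; a payload that fits yields one chunk. */
    static List<byte[]> fragment(byte[] payload, int mtu) {
        if (mtu <= 0) {
            throw new IllegalArgumentException("mtu must be positive");
        }
        List<byte[]> fragments = new ArrayList<>();
        for (int offset = 0; offset < payload.length; offset += mtu) {
            int end = Math.min(payload.length, offset + mtu);
            fragments.add(Arrays.copyOfRange(payload, offset, end));
        }
        if (fragments.isEmpty()) {
            fragments.add(new byte[0]); // an empty payload still produces one (empty) chunk
        }
        return fragments;
    }

    public static void main(String[] args) {
        System.out.println(fragment(new byte[5], 64).size());   // 1: small record, no splitting
        System.out.println(fragment(new byte[150], 64).size()); // 3: chunks of 64, 64 and 22 bytes
    }
}
```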

    enhancement 
    opened by koldat 17
  • Load balancer throwing several exceptions when refreshing available sockets

    I wanted to use LoadBalancedRSocketMono in combination with Spring's ReactiveDiscoveryClient, but this doesn't work well at present.

    The behaviour can be simulated by adding the following to LoadBalancedRSocketMonoTest:

    @Test
    public void refreshSocketsTest() {
        LoadBalancedRSocketMono refreshingLoadBalancedRSocketMono = Flux
                .interval(Duration.ZERO, Duration.ofMillis(10))
                .doOnEach(System.out::println)
                .flatMap(i -> getSuppliers().collectList())
                .as(LoadBalancedRSocketMono::create);
    
        Flux
                .range(0, 1000)
                .flatMap(i -> refreshingLoadBalancedRSocketMono)
                .delayUntil(rSocket -> Flux
                        .interval(Duration.ZERO, Duration.ofMillis(1))
                        .filter(i -> rSocket.availability() > 0.0)
                        .next()
                )
                .flatMap(rSocket -> rSocket.requestResponse(
                        DefaultPayload.create("Hello from refreshing load balanced RSocket"))
                )
                .doOnNext(next -> System.out.println(Objects.requireNonNull(next.getDataUtf8())))
                .blockLast();
    }
    
    private Flux<RSocketSupplier> getSuppliers() {
        return Flux
                .fromIterable(Arrays.asList(1, 2, 3))
                .map(serviceId -> new RSocketSupplier(() -> Mono
                        .delay(Duration.ofSeconds(1))
                        .as(longMono -> Mono.just(new TestingRSocket(a ->
                                DefaultPayload.create(a.getDataUtf8() + " number " + serviceId)
                        )))
                ));
    }
    

    This test mostly works; however, from time to time it throws one of two different exceptions:

    Index 2 out of bounds for length 2
    java.lang.IndexOutOfBoundsException: Index 2 out of bounds for length 2
    at java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64)
    at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70)
    at java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:248)
    at java.base/java.util.Objects.checkIndex(Objects.java:372)
    at java.base/java.util.ArrayList.get(ArrayList.java:458)
    at io.rsocket.client.LoadBalancedRSocketMono.select(LoadBalancedRSocketMono.java:399)
    at reactor.core.publisher.MonoSupplier.subscribe(MonoSupplier.java:56)
    at io.rsocket.client.LoadBalancedRSocketMono$2.subscribe(LoadBalancedRSocketMono.java:231)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4210)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:418)
    at reactor.core.publisher.FluxRange$RangeSubscription.slowPath(FluxRange.java:154)
    at reactor.core.publisher.FluxRange$RangeSubscription.request(FluxRange.java:109)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:363)
    at reactor.core.publisher.FluxRange.subscribe(FluxRange.java:68)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8264)
    at reactor.core.publisher.Flux.blockLast(Flux.java:2484)
    at io.rsocket.client.LoadBalancedRSocketMonoTest.refreshSocketsTest(LoadBalancedRSocketMonoTest.java:124)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
    at org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:40)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
    at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
    at org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80)
    at org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:71)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:220)
    at org.junit.platform.launcher.core.DefaultLauncher.lambda$execute$6(DefaultLauncher.java:188)
    at org.junit.platform.launcher.core.DefaultLauncher.withInterceptedStreams(DefaultLauncher.java:202)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:181)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:128)
    at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.processAllTestClasses(JUnitPlatformTestClassProcessor.java:99)
    at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.access$000(JUnitPlatformTestClassProcessor.java:79)
    at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor.stop(JUnitPlatformTestClassProcessor.java:75)
    at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.stop(SuiteTestClassProcessor.java:61)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
    at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
    at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
    at com.sun.proxy.$Proxy2.stop(Unknown Source)
    at org.gradle.api.internal.tasks.testing.worker.TestWorker.stop(TestWorker.java:132)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
    at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:182)
    at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:164)
    at org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:412)
    at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:64)
    at org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:48)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:56)
    at java.base/java.lang.Thread.run(Thread.java:834)
    Suppressed: java.lang.Exception: #block terminated with an error
        at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
        at reactor.core.publisher.Flux.blockLast(Flux.java:2485)
        ... 67 more

    java.util.ConcurrentModificationException
    at java.base/java.util.ArrayList$Itr.checkForComodification(ArrayList.java:1042)
    at java.base/java.util.ArrayList$Itr.next(ArrayList.java:996)
    at io.rsocket.client.LoadBalancedRSocketMono.refreshAperture(LoadBalancedRSocketMono.java:295)
    at io.rsocket.client.LoadBalancedRSocketMono.refreshSockets(LoadBalancedRSocketMono.java:241)
    at io.rsocket.client.LoadBalancedRSocketMono.select(LoadBalancedRSocketMono.java:377)
    at reactor.core.publisher.MonoSupplier.subscribe(MonoSupplier.java:56)
    at io.rsocket.client.LoadBalancedRSocketMono$2.subscribe(LoadBalancedRSocketMono.java:231)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4210)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:418)
    at reactor.core.publisher.FluxRange$RangeSubscription.slowPath(FluxRange.java:154)
    at reactor.core.publisher.FluxRange$RangeSubscription.request(FluxRange.java:109)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:363)
    at reactor.core.publisher.FluxRange.subscribe(FluxRange.java:68)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8264)
    at reactor.core.publisher.Flux.blockLast(Flux.java:2484)
    at io.rsocket.client.LoadBalancedRSocketMonoTest.refreshSocketsTest(LoadBalancedRSocketMonoTest.java:121)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
    at org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:40)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
    at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
    at org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80)
    at org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:71)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:220)
    at org.junit.platform.launcher.core.DefaultLauncher.lambda$execute$6(DefaultLauncher.java:188)
    at org.junit.platform.launcher.core.DefaultLauncher.withInterceptedStreams(DefaultLauncher.java:202)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:181)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:128)
    at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.processAllTestClasses(JUnitPlatformTestClassProcessor.java:99)
    at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.access$000(JUnitPlatformTestClassProcessor.java:79)
    at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor.stop(JUnitPlatformTestClassProcessor.java:75)
    at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.stop(SuiteTestClassProcessor.java:61)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
    at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
    at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
    at com.sun.proxy.$Proxy2.stop(Unknown Source)
    at org.gradle.api.internal.tasks.testing.worker.TestWorker.stop(TestWorker.java:132)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
    at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:182)
    at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:164)
    at org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:412)
    at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:64)
    at org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:48)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:56)
    at java.base/java.lang.Thread.run(Thread.java:834)
    Suppressed: java.lang.Exception: #block terminated with an error
        at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
        at reactor.core.publisher.Flux.blockLast(Flux.java:2485)
        ... 67 more

    bug superseded 
    opened by PMacho 17
  • Payload reclaim code assumes synchronous onNext() dispatch which is not guaranteed by RS specification

    Payload reclaim code assumes synchronous onNext() dispatch which is not guaranteed by RS specification

    I have been playing around with the latest code from the 0.5.x branch. I started with the following code from the examples subproject:

    Flowable.fromPublisher(c.underlyingSocket.requestResponse(new PayloadImpl("Hello")))
      .doOnNext(toConsumer(x => println(s"### client ### ${ByteBufferUtil.toUtf8String(x.getData)}")))
      .blockingFirst()
    

    which prints out the response as expected. However when trying to connect the Publisher that is returned from the requestResponse to Akka Streams with the following code:

    Source.fromPublisher(c.underlyingSocket.requestResponse(new PayloadImpl("Hello")))
      .map { x => println(s"### client ### ${ByteBufferUtil.toUtf8String(x.getData)}"); x }
      .runWith(Sink.head)
    

    I get an exception:

    io.netty.util.IllegalReferenceCountException: refCnt: 0
            at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1407)
            at io.netty.buffer.AbstractByteBuf.checkIndex(AbstractByteBuf.java:1353)
            at io.netty.buffer.AbstractByteBuf.getInt(AbstractByteBuf.java:415)
            at io.netty.buffer.PooledSlicedByteBuf.getInt(PooledSlicedByteBuf.java:188)
            at io.reactivesocket.transport.tcp.MutableDirectByteBuf.getInt(MutableDirectByteBuf.java:284)
            at io.reactivesocket.frame.FrameHeaderFlyweight.frameLength(FrameHeaderFlyweight.java:229)
            at io.reactivesocket.frame.FrameHeaderFlyweight.dataLength(FrameHeaderFlyweight.java:251)
            at io.reactivesocket.frame.FrameHeaderFlyweight.sliceFrameData(FrameHeaderFlyweight.java:202)
            at io.reactivesocket.Frame.getData(Frame.java:98)
            at akka.stream.alpakka.reactivesocket.scaladsl.HelloWorld$$anonfun$start$1$$anonfun$2.apply(HelloWorld.scala:113)
            at akka.stream.alpakka.reactivesocket.scaladsl.HelloWorld$$anonfun$start$1$$anonfun$2.apply(HelloWorld.scala:113)
            at akka.stream.impl.fusing.Map$$anon$8.onPush(Ops.scala:43)
            at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:747)
            at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:710)
            at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:616)
            at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:471)
            at akka.stream.impl.fusing.GraphInterpreterShell.receive(ActorGraphInterpreter.scala:410)
            at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:603)
            at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:618)
            at akka.actor.Actor$class.aroundReceive(Actor.scala:484)
            at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:529)
            at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
            at akka.actor.ActorCell.invoke(ActorCell.scala:495)
            at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
            at akka.dispatch.Mailbox.run(Mailbox.scala:224)
            at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
            at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
            at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
            at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
            at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    

    Is there a race condition between what a Publisher from reactivesocket expects and my call to getData?

    P.S. Currently I am publishing the latest code of the 0.5.x branch locally. Would it be possible to get a milestone release, so it is easier to share and run these experiments?
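
    The hazard described in this issue can be sketched without RSocket or Netty. The example below uses a hypothetical `CountedBuffer` class (not part of any library) as a stand-in for a reference-counted buffer. The producer releases the buffer as soon as `onNext` returns, so a consumer that reads it later on another thread sees `refCnt: 0`, while a consumer that copies the data inside `onNext` is safe. It illustrates the failure mode under those assumptions and is not the library's actual reclaim code.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical stand-in for a reference-counted buffer; not a Netty or RSocket class.
final class CountedBuffer {
  private final byte[] bytes;
  private final AtomicInteger refCnt = new AtomicInteger(1);

  CountedBuffer(String text) {
    this.bytes = text.getBytes(StandardCharsets.UTF_8);
  }

  String readUtf8() {
    if (refCnt.get() <= 0) {
      throw new IllegalStateException("refCnt: 0");
    }
    return new String(bytes, StandardCharsets.UTF_8);
  }

  void release() {
    refCnt.decrementAndGet();
  }
}

final class ReclaimDemo {
  // Producer that reclaims the buffer as soon as onNext returns,
  // i.e. it assumes the consumer is done with the buffer by then.
  static void emit(CountedBuffer buffer, Consumer<CountedBuffer> onNext) {
    try {
      onNext.accept(buffer);
    } finally {
      buffer.release();
    }
  }

  // Unsafe: hands the buffer itself to another thread and reads it after release.
  static String readLater(String text) {
    CompletableFuture<CountedBuffer> handOff = new CompletableFuture<>();
    emit(new CountedBuffer(text), handOff::complete);
    try {
      return handOff.thenApplyAsync(CountedBuffer::readUtf8).join();
    } catch (CompletionException e) {
      return "error: " + e.getCause().getMessage();
    }
  }

  // Safe: copies the content inside onNext, before the producer releases the buffer.
  static String copyInside(String text) {
    CompletableFuture<String> handOff = new CompletableFuture<>();
    emit(new CountedBuffer(text), b -> handOff.complete(b.readUtf8()));
    return handOff.thenApplyAsync(s -> s).join();
  }
}
```

    The same choice applies to any reference-counted payload: either extract what you need before the callback returns, or take your own reference before handing the object to another thread.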

    opened by 2m 17
  • SocketAcceptorInterceptor receives an unexpected payload object rather than a setup payload object

    SocketAcceptorInterceptor receives an unexpected payload object rather than a setup payload object

    We create a client and a server with the reconnection feature enabled. The server registers an interceptor that receives the setup payload and deserializes it for authentication.

    The client has a daemon thread, Thread A, that periodically sends data over RSocket. We call this data payload-A.

    After the server is stopped, the periodic sends fail, as expected. After the server is started again, the client begins to reconnect. We can see the socket acceptor receive a payload, but it is not the setup data generated by our getAuthInfo() method. Instead, the server-side sidecar acceptor receives an ordinary (non-setup) payload, which is very strange.

    Related server and client code:

    // client 
            requesterMono = rsocketRequesterBuilder.setupRoute(SETUP_ROUTE)
                .setupData(new ObjectMapper().writeValueAsString(getAuthInfo()))
                .dataMimeType(MimeTypeUtils.parseMimeType(WellKnownMimeType.APPLICATION_JSON.getString()))
                .rsocketConnector(connector -> connector.acceptor(responder).fragment(MTU_BYTE_SIZE)
                    .keepAlive(Duration.ofSeconds(KEEP_ALIVE_INTERVAL_SEC), Duration.ofSeconds(KEEP_ALIVE_MAX_TIME_SEC))
                    .reconnect(Retry.fixedDelay(Integer.MAX_VALUE, Duration.ofSeconds(RSOCKET_RETRY_INTERVAL_SECONDS))
                        .doAfterRetry(retrySignal -> log.warn("RSocket client is reconnecting to get the newest connection...."))))
                .connect(TcpClientTransport.create(TcpClient.create()
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, TCP_CONN_TIMEOUT)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .host(consoleDomain)
                    .port(SERVER_PORT)
                    .secure(ssl -> ssl.sslContext(sslContext))));
    
    // register a socket acceptor interceptor to receive the authentication setup payload object in the server side
        protected SocketAcceptorInterceptor genAcceptorAuthInterceptor() {
            SocketAcceptorInterceptor interceptor = acceptor -> (setup, sendingSocket) -> {
                try {
                    log.info("receive an task connection try to init.");
                    LocalTaskAuthInfo authInfo = new ObjectMapper().readValue(setup.getDataUtf8(), LocalTaskAuthInfo.class);
    
                  //ignore some non-related code...
                    return acceptor.accept(setup, sendingSocket);
                } catch (Exception e) {
                    String errMsg = "try to accept a connection error.msg:" + ExceptionUtils.getRootCauseMessage(e);
                    log.warn(errMsg, e);
                    exceptionService.saveException(new SidecarRuntimeException(e));
                    return Mono.error(new RSocketAuthException(errMsg));
                }
            };
            return interceptor;
        }
    
    
    
    //server
     RSocketServer.create(routeHandler.responder()).fragment(RSocketConnConstans.MTU_BYTE_SIZE)
                .interceptors(registry -> registry.forSocketAcceptor(interceptor))
                .bind(TcpServerTransport.create(TcpServer.create()
                    .option(EpollChannelOption.SO_KEEPALIVE, true)
                    .option(EpollChannelOption.TCP_NODELAY, true)
                    .secure(ssl -> ssl.sslContext(sslContext))
                    .port(Integer.valueOf(consoleConfig.getRsocketConsolePort()))
                    .doOnConnection(
                        t -> log.info("New sidecar CONNECTED. Pay attention that this is maybe a resumed connection..."))))
                .subscribe();
    

    Expected Behavior

    Receive setup payload in the socket acceptor

    Actual Behavior

    Receive a normal payload data.

    Steps to Reproduce

    Unfortunately, we can't reproduce it reliably. The issue occurs only sometimes after the server is restarted. Usually everything works fine after a restart.

    Possible Solution

    This may be a concurrency bug, since it occurs only now and then. I know this issue may be a little difficult. Is there any suggestion for where I should set a breakpoint? The key point is to find out why ordinary payload data replaces the setup payload data and then causes the deserialization problem.

    Your Environment

    • RSocket version(s) used: rsocket-core-1.0.2 and springboot-starter-rsocket 2.3.4.RELEASE
    • Platform (eg. JVM version (java -version) or Node version (node --version)): OpenJDK 1.8
    • OS and version (eg uname -a): Mac OS 10.15.4
    bug superseded 
    opened by KaimingWan 16
  • #1077 type safe discard element consumer

    #1077 type safe discard element consumer

    Discard element consumer is accepting other elements than ReferenceCounted

    Motivation:

    When I use DefaultRSocketClient, I see warning entries like this in the log files:

    ] WARN  reactor.core.publisher.Operators        : Error in discard hook
    java.lang.ClassCastException: class org.springframework.core.io.buffer.NettyDataBuffer cannot be cast to class io.netty.util.ReferenceCounted (org.springframework.core.io.buffer.NettyDataBuffer and io.netty.util.ReferenceCounted are in unnamed module of loader 'app')
    

    Modifications:

    Make the discard consumer accept types other than ReferenceCounted, avoiding the warnings in the logs.

    Result:

    The warning log entries are not created by DefaultRSocketClient.

    opened by luczus 0
  • SendUtils onDiscard handler class cast exception

    SendUtils onDiscard handler class cast exception

    I'm using RSocket Java with Spring Webflux and RSocket JS in the front-end, using REQUEST_STREAM with an infinite Kafka source. The returned Flux receives quite a lot of small messages and does quite a bit of filtering and buffering. The relevant code in SendUtils does an unchecked cast of each discarded object to a ReferenceCounted which seems to throw a class cast exception that is caught and ignored.
    The problem is that, due to the sheer number of objects being discarded (from filtering and from buffers discarded on disconnect), this uses a substantial amount of CPU according to the profiler.

    profiler

    The returned Fluxes are scheduled on Schedulers.boundedElastic(), and for some reason entire threads are started up just for this exception: flamegraph

    Expected Behavior

    Not use up so much CPU resources

    Actual Behavior

    Uses a lot of CPU resources

    Steps to Reproduce

    Possible Solution

    Not exactly sure, but perhaps an instanceof check:

    if (data instanceof ReferenceCounted) ((ReferenceCounted) data).release();
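
    The difference between the unchecked cast and the suggested `instanceof` check can be sketched as follows. `RefCounted` is a hypothetical stand-in for `io.netty.util.ReferenceCounted` so that the example needs no dependencies, and `DiscardDemo` is an illustrative name; this is not the actual SendUtils code.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical stand-in for io.netty.util.ReferenceCounted.
interface RefCounted {
  boolean release();
}

final class DiscardDemo {
  // Counts how many reference-counted objects were actually released.
  static final AtomicInteger released = new AtomicInteger();

  // Unchecked variant: throws ClassCastException for anything that is not RefCounted.
  static final Consumer<Object> UNCHECKED = data -> ((RefCounted) data).release();

  // Type-safe variant: silently ignores objects that carry no reference count.
  static final Consumer<Object> TYPE_SAFE =
      data -> {
        if (data instanceof RefCounted) {
          ((RefCounted) data).release();
        }
      };

  // Creates a buffer stand-in whose release() is recorded in the counter above.
  static RefCounted countingBuffer() {
    return () -> {
      released.incrementAndGet();
      return true;
    };
  }

  // Reports whether the given hook throws ClassCastException for the given element.
  static boolean throwsClassCast(Consumer<Object> hook, Object data) {
    try {
      hook.accept(data);
      return false;
    } catch (ClassCastException e) {
      return true;
    }
  }
}
```

    With the type-safe variant, discarding a plain object costs one type check instead of constructing and catching an exception, which is where the cost reported above comes from.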
    

    Your Environment

    • RSocket version(s) used: rsocket-core:1.1.3, rsocket-transport-netty:1.1.3
    • Other relevant libraries versions (eg. netty, ...): Spring Webflux 5.3.23
    • Platform (eg. JVM version (java -version) or Node version (node --version)): 17
    • OS and version (eg uname -a): Happens on all tested platforms (Debian, Mac)
    enhancement 
    opened by Svenskunganka 4
  • Ability to store session object inside RSocketRequester chain

    Ability to store session object inside RSocketRequester chain

    Hello.

    It would be very nice to add, somewhere inside RSocketRequester, a field of type AtomicReference<Object> that caches the session object for reuse. It would be especially convenient to obtain its value inside MessageMappings in Spring Boot.

    I am coming from #831 and have implemented a basic wrapper around WebsocketDuplexConnection to cache this value. It is not a big deal, since it only requires reflection on each WebSocket route to get the session object, but it would be nice to have a native public method, with less chaining, to obtain it.

    Thank you.

    opened by maxim-bandurko-lsvt 1
  • CompositeByteBuf memory allocation for Frame messages

    CompositeByteBuf memory allocation for Frame messages

    Motivation

    I have been evaluating RSocket and did some profiling. It looks like creating a Frame message uses many ByteBuf buffers (e.g. header, payload, frame size). Those buffers are composed using CompositeByteBuf. The ByteBuf instances can be reused, but a new CompositeByteBuf is always created. A single CompositeByteBuf allocation doesn't take much memory, but it adds up when sending many Frame messages. Here is an example memory allocation profile from async-profiler:

    [Screenshot: async-profiler memory allocation]

    Also when Netty sends the Frame messages it needs to duplicate internal DirectByteBuffer objects because of CompositeByteBuf usage:

    [Screenshot: DirectByteBuffer duplication]

    This wouldn't be the case if only one ByteBuf were used instead. As far as I understand, it is done in order to avoid copying buffers. Nevertheless maybe there is a way of improving it?

    Considered alternatives

    Alternatives would be:

    • avoid using CompositeByteBuf and just use one ByteBuf per Frame message.
    • provide a way to customize the creation of Frame messages. The current implementation uses the static SendUtils.sendReleasingPayload() method, and there is no way of replacing it.
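
    The trade-off between the two approaches can be sketched with plain `java.nio.ByteBuffer`, used here only as a stand-in for Netty's ByteBuf and CompositeByteBuf. The class name `FrameAssemblyDemo` and the header bytes in the usage are placeholders, not the real RSocket frame layout. Keeping the parts separate copies nothing but allocates a wrapper per frame; merging them needs one buffer per frame but copies the bytes.

```java
import java.nio.ByteBuffer;

final class FrameAssemblyDemo {
  // Composite style: the parts stay separate. No bytes are copied,
  // but a new wrapper (here an array of views) is allocated for every frame.
  static ByteBuffer[] asParts(ByteBuffer header, ByteBuffer payload) {
    return new ByteBuffer[] {header.duplicate(), payload.duplicate()};
  }

  // Single-buffer style: one buffer per frame that holds a copy of both parts.
  static ByteBuffer asSingle(ByteBuffer header, ByteBuffer payload) {
    ByteBuffer frame = ByteBuffer.allocate(header.remaining() + payload.remaining());
    // duplicate() leaves the positions of the caller's buffers untouched.
    frame.put(header.duplicate()).put(payload.duplicate());
    frame.flip();
    return frame;
  }

  // Total number of readable bytes across all parts.
  static int totalBytes(ByteBuffer... parts) {
    int total = 0;
    for (ByteBuffer part : parts) {
      total += part.remaining();
    }
    return total;
  }
}
```

    Which variant is cheaper depends on payload size and buffer pooling, so this only frames the question raised above and does not answer it.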

    Additional context

    I'm using rsocket-java version 1.1.2 with TCP transport.

    opened by agusevas 1
  • Ability to limit number of open requests at transport level

    Ability to limit number of open requests at transport level

    Lease frames allow a responder to control the total number of requests from the other end, but it is an optional feature, and there should also be a way to limit the total number of requests at the transport level.

    For the implementation, subscribers created by RSocketResponder will need a callback on DuplexConnection#sendFrame to notify them when those frames have been sent. This will allow them to keep track of the number of open requests. It will also enable keeping track of and limiting the overall number of messages buffered per stream.
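
    The bookkeeping described above can be sketched as follows. `OpenRequestLimiter` is a hypothetical helper, not a class shipped by rsocket-java, and the "frame sent" callback is assumed to exist as proposed. A permit is taken when a request arrives and returned once the terminal frame of that stream has been written.

```java
import java.util.concurrent.Semaphore;

// Hypothetical helper; rsocket-java does not ship a class with this name.
final class OpenRequestLimiter {
  private final int maxOpenRequests;
  private final Semaphore permits;

  OpenRequestLimiter(int maxOpenRequests) {
    this.maxOpenRequests = maxOpenRequests;
    this.permits = new Semaphore(maxOpenRequests);
  }

  // Called when a request frame arrives; false means the limit is reached.
  boolean tryOpen() {
    return permits.tryAcquire();
  }

  // Called from the assumed "frame sent" callback once the terminal frame
  // (complete, error, or cancel) of a stream has actually been written.
  void onTerminalFrameSent() {
    permits.release();
  }

  // Number of requests currently counted as open.
  int openRequests() {
    return maxOpenRequests - permits.availablePermits();
  }
}
```

    A responder could call `tryOpen()` before creating a subscriber and reject the request when it returns false; how that rejection is signalled to the peer is left open here.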

    opened by rstoyanchev 0
Releases(1.1.3)
  • 1.1.3(Sep 14, 2022)

    What's Changed

    • improves BaseDuplexConnection and fixes PingClient impl by @OlegDokuka in https://github.com/rsocket/rsocket-java/pull/1062
    • ensures SetupFrame is available for future use by @OlegDokuka in https://github.com/rsocket/rsocket-java/pull/1046
    • adds message counting to protect against malicious overflow by @OlegDokuka in https://github.com/rsocket/rsocket-java/pull/1067
    • Adds reflection hints for native-image support 1.1.x by @violetagg in https://github.com/rsocket/rsocket-java/pull/1073
    • introduces onClose listener for RSocketClient and connect method by @OlegDokuka in https://github.com/rsocket/rsocket-java/pull/1063

    Full Changelog: https://github.com/rsocket/rsocket-java/compare/1.1.2...1.1.3

  • 1.1.2(Aug 17, 2022)

    What's Changed

    • fixes typo in LeaseSpec initialization https://github.com/rsocket/rsocket-java/pull/1024
    • adds first frame handling timeout https://github.com/rsocket/rsocket-java/pull/1027
    • bumps libs versions and provides a few UnboundedProcessor fixes https://github.com/rsocket/rsocket-java/pull/1028
    • eliminate boxing in RequesterResponderSupport when using IntObjectMap https://github.com/rsocket/rsocket-java/pull/1029
    • adds tests for WeightedLoadbalanceStrategy https://github.com/rsocket/rsocket-java/pull/1035
    • migrates from deprecated API, updates dependencies https://github.com/rsocket/rsocket-java/pull/1042
    • adds routing example with TaggingMetadata and CompositeMetadata https://github.com/rsocket/rsocket-java/pull/1021
    • fixes block() in MetadataPushRequesterMono/FnfRequesterMono https://github.com/rsocket/rsocket-java/pull/1044

    New Contributors

    • @arodionov made their first contribution in https://github.com/rsocket/rsocket-java/pull/1024
    • @olme04 made their first contribution in https://github.com/rsocket/rsocket-java/pull/1029

    Full Changelog: https://github.com/rsocket/rsocket-java/compare/1.1.1...1.1.2

  • 1.1.1(Jun 9, 2021)

    :star: New Features

    • Improve Lease API #877
    • Refactor InMemoryResumableFramesStore and improve test coverage #1014
    • JCStress tests for the Requester and Responder operators #999
    • Add per-stream MimeType extension codecs #998
    • "Automatic-Module-Name" for each module #1007

    :beetle: Bug Fixes

    • Remove failed loadbalance targets from the active list #982
    • Local and remote resume state disagreement #973
    • Fix memory Leak related to keepAlive and resume, unreleased buffers #1009
    • Fix memory leak and add test for live lock on queue.poll() #989
    • Add null-safe iteration of active streams #1004
    • NullPointerException on connection disposal #914

    :notebook_with_decorative_cover: Documentation

    • Update Javadoc for load-balancing #1000

    :hammer: Other

    • Fully migrate tests to JUnit 5 #1016
    • Replace use of deprecated MonoProcessor API #1003
    • Replace use of deprecated Processor APIs #957
    • Fix LoadbalanceTest issues #983
    • Updates sample code in RSocketConnector Javadoc #977
    • Upgrade to Reactor 2020.0.4 #981

    :heart: Contributors

    We'd like to thank all the contributors who worked on this release!

  • 1.0.5(Jun 9, 2021)

  • 1.0.4(Mar 4, 2021)

    :star: New Features

    • Fix performance degradation when fragmentation is used #995

    :beetle: Bug Fixes

    • Fix deadlock on multiconsumer clear/poll in UnboundedProcessor #990
    • Fix OverflowException if UnicastProcessor request and onNext are in a race #985
    • Fix RequestOperator to subscribe to the source at later phase #963
    • Ensure DefaultPayload#create methods make a copy of ByteBuf content #993
    • Ensure Subscriber is removed from sendingSubscriptions #962

    :hammer: Other

    • Upgrade to Reactor Dysprosium-SR17 and Netty 4.1.59 #980

    :heart: Contributors

    We'd like to thank all the contributors who worked on this release!

  • 1.1.0(Oct 27, 2020)

    :star: New Features

    • Provide request intercepting api #944
    • Migrate weighted loadbalance strategy to use new RequestInterceptors for Stats tracking #946
    • Update WeightedLoadbalanceStrategy to use a Builder #949

    :beetle: Bug Fixes

    • Improve Loadbalance implementations and test coverage #953
    • LoadBalancedRSocketMono (LatencySubscriber) does not seem to propagate subscriber context properly #822
    • Load balancer throwing several exceptions when refreshing available sockets #786
    • LoadBalancedRSocketMono loses sockets on server restart #633

    :hammer: Other

    • Upgrade to Reactor 2020.0.0 #952
  • 1.0.3(Oct 26, 2020)

    :star: New Features

    • Avoid queueing in UnicastProcessor receivers #932
    • Update username length to align with the spec (uint8 vs uint16) #938
    • Use heap buffers in the default payload decoder #945

    :beetle: Bug Fixes

    • Safer iteration when cancelling subscriptions #941

    :hammer: Other

    • Upgrade to Reactor Dysprosium-SR13 #951
    • Dependencies for 1.1.0-M02 missing from Maven Central? #940
    • Use static errors in RSocket default method implementations #933

    :heart: Contributors

    We'd like to thank all the contributors who worked on this release!

  • 1.1.0-RC1(Oct 12, 2020)

    :star: New Features

    • Resume mechanism improvements & refactor #766

    :beetle: Bug Fixes

    • Ensures that resumability implementation is stable #934
    • Resume related Netty ByteBuf leak #779

    :hammer: Other

    • Upgrade to Reactor 2020.0 RC2 #943
  • 1.1.0-M2(Sep 14, 2020)

    :star: New Features

    • Improve DuplexConnection api and rework Resumability #923
    • Changes LoadbalanceStrategy to accept List #919
    • Expose remote address via DuplexConnection #929
    • Ability to intercept requests and access channel information such as remote address #735

    :beetle: Bug Fixes

    • Ensure RRRSubscriber doesn't cancel subscription on onNext #918

    :hammer: Other

    • Make LoadbalanceStrategy implementations public #928
  • 1.1.0-M1(Aug 10, 2020)

    :star: New Features

    • Introduces RSocketClient as way to use RSocket from the connector side #850
    • Provides new Loadbalancer implementation #899
    • Implements dedicated Publisher/Subscriber for each request type #761
    • Reworks multiplexer and improves tests to check async interceptors #909
    • Improves ref-counting by eliminating redundant retain/release #902
    • Reject frames less than min MTU size #913

    :hammer: Other

    • Removes deprecations dedicated for 1.1 #906
    • Upgrade to Reactor 2020.0.0-M2 #890
  • 1.0.2(Aug 10, 2020)

    :star: New Features

    • Reject any frames before SETUP/LEASE #904
    • Reject empty fragments #903
    • RSocket server is subject to empty frames flood denial-of-service #895
    • provides setup for maxInboundPayloadSize and maxFrameSize #876

    :beetle: Bug Fixes

    • fixes extra Payload release on racing complete and cancel #894
    • fixes DefaultPayload.create(ByteBuf, ByteBuf) to release params #886

    :notebook_with_decorative_cover: Documentation

    • provides an example of peer to peer communication #868
  • 1.0.1(Jun 8, 2020)

    :star: New Features

    • Exposes API for the full customization of websocket sending / handling #835

    :beetle: Bug Fixes

    • Provides error handling in case Bubbling / NIC exceptions #863
    • Fixes responder to handle errors according to the spec #861
    • Fixes ConnectionSetupPayload refcnt management #854
    • Ensures streams are terminated if racing terminal and new stream #848
    • Fixes ReconnectMono behaviour when racing invalidate and subscribe #847
    • Bugfix RSocketConnector#setupPayload to copy the content #843

    :notebook_with_decorative_cover: Documentation

    • RSocketClient and RSocketConnector Javadoc update #856
    • Provides well documented lease example #840
    • Update WebSocketHeadersSample #839

    :hammer: Other

    • Ensures there is no extra logging about a dropped error #859
    • Ensures lease is used exactly on request frame sending #855
    • Forward compatibility with Reactor Netty 1.0 #851
    • Apply FragmentationDuplexConnection from a single place #836
  • 1.0.0(May 12, 2020)

    :star: New Features

    • Includes LimitRateInterceptor as a built-in interceptor #834
    • Provides bindNow shortcut for the server #830
    • Removes limit rate operator #829
    • Provides support for TracingMetadata #824
    • Renames Flyweight classes to Codec #820
    • Relaxed stream id supplement #814
    • Provides Payload refCnt verification #809
    • Deprecate AbstractRSocket and provide alternative #805
    • Add Zipkin Tracing Flyweight #583

    :beetle: Bug Fixes

    • Fixes onClose behaviour to not error on shutdown #833
    • Provides ordered stream id issuing #811
    • Adds missing NPE catch in order to handle rare racing cases #808
    • NullPointerException on stream cancellation #757
    • Requests may have non-sequential stream ids #749

    :notebook_with_decorative_cover: Documentation

    • Provides extra @NonNullApi annotation for all packages #826
    • Add Javadoc to RSocketConnector and RSocketServer #813

    :hammer: Other

    • Removes deprecated errorConsumer #832
    • Provides supportive class to hide concurrency complexity #807
    • Reduces maintenance complexity #806
    • Removes TupleByteBufs #804
    • Deprecate ResponderRSocket #802
    • Rename XXXFlyweight in frames package to XXXCodec #787
  • 1.0.0-RC7(Apr 28, 2020)

    :star: New Features

    • Deprecate ResumeStrategy and replace with Reactor Retry #798
    • Send root-cause errors from connection to onClose of RSocket #797
    • Consolidate use of single ByteBufAllocator exposed from the transport #796
    • Fixes Payload#hasMetadata to strictly match the flag in the frame #783
    • Add RSocketConnector and RSocketServer as replacements for RSocketFactory #780
    • Support for reconnectable, shared Mono #759
    • Provides Prioritised delivering of Zero Streams Frames #718

    :beetle: Bug Fixes

    • Provides more ByteBuf leaks fixes #803
    • Apply recent changes in UnicastProcessor to UnboundedProcessor related to buffer leaks #799
    • Fixes requestChannel to ensure support of half-closed state #794
    • Use doOnDiscard to release cached Payloads #777
    • Enforces reassembly of fragments to always be enabled #775
    • Pre-validate Payload length at stream level to avoid closing connection #768
    • NullPointerException on stream cancellation #757
    • Fixes behavior of RequestChannel #736

    :hammer: Other

    • Default errorConsumer to no-op in the new API #801
    • Fix logging in examples and load-balancer tests #800
    • Drop UriHandler and UriTransportRegistry #795
    • Avoid sending of redundant ERROR and CANCEL frames #792
    • Upgrade to Reactor Dysprosium-SR7 #789
    • Remove package cycles involving lease, internal, exception, and frame #774
    • Move RSocketFactory implementation to core sub-package to avoid dependencies from top-level package #770
    • Provides support for the JDK 14 based builds #763
    • Disables gradle modules feature #751
    • Provides rollback to CompositeByteBuf usage #750
    • Add MESSAGE_RSOCKET_MIMETYPE and MESSAGE_RSOCKET_ACCEPT_MIMETYPES #744
  • 1.0.0-RC6(Feb 4, 2020)

  • 1.0.0-RC3(Aug 30, 2019)

  • 1.0.0-RC2(Aug 1, 2019)

  • 0.12.2-RC4(Jun 11, 2019)

    Implementation of CompositeMetadata Extension - https://github.com/rsocket/rsocket/blob/master/Extensions/CompositeMetadata.md

    Fixes bugs found with fragmentation

  • 0.12.2-RC3(May 22, 2019)

  • 0.12.2-RC2(Apr 18, 2019)

  • 0.12.2-RC1(Apr 16, 2019)

    Fixes a bug in a request channel with long running streams where the channel would stop emitting data

    Fixes a bug in fragmentation when the mtu is 64 bytes that caused an illegal refcnt exception

    Replaces compositebytebufs with specialized ByteBuf tuples of 2 and 3

  • 0.12.1-RC3(Apr 8, 2019)

  • 0.11.21(Mar 28, 2019)

    This release provides some major bugfixes related to connection closing and introduces a more flexible way to convert RSocketFactory.receiver() to ServerTransport.ConnectionAcceptor, making it easy to integrate with WebsocketRoutingTransport and an existing HttpClient.

  • 0.11.17.2(Mar 28, 2019)

  • 0.11.18(Mar 13, 2019)

  • 0.12.1-RC2(Feb 28, 2019)

  • 0.12.1-RC1(Feb 27, 2019)

  • 0.11.17(Feb 19, 2019)

    Fixes fragmentation, which was leading to an out-of-bounds exception when a metadata frame isn't present. Fragmentation now takes into account that a metadata frame may be absent.

  • 0.11.16(Feb 4, 2019)

  • 0.11.15(Dec 20, 2018)

Owner
RSocket
Streaming message protocol with Reactive Extension/Stream semantics
RSocket
Realtime Client Server Framework for the JVM, supporting WebSockets with Cross-Browser Fallbacks

Welcome to Atmosphere: The Event Driven Framework supporting WebSocket and HTTP The Atmosphere Framework contains client and server side components fo

Atmosphere Framework 3.6k Jan 3, 2023
A small java project consisting of Client and Server, that communicate via TCP/UDP protocols.

Ninja Battle A small java project consisting of Client and Server, that communicate via TCP/UDP protocols. Client The client is equipped with a menu i

Steliyan Dobrev 2 Jan 14, 2022
TCP/UDP client/server library for Java, based on Kryo

KryoNet can be downloaded on the releases page. Please use the KryoNet discussion group for support. Overview KryoNet is a Java library that provides

Esoteric Software 1.7k Jan 2, 2023
A High Performance Network ( TCP/IP ) Library

Chronicle-Network About A High Performance Network library Purpose This library is designed to be lower latency and support higher throughputs by empl

Chronicle Software : Open Source 231 Dec 31, 2022
TCP/IP packet demultiplexer. Download from:

TCPFLOW 1.5.0 Downloads directory: http://digitalcorpora.org/downloads/tcpflow/ Installation Most common GNU/Linux distributions ship tcpflow in their

Simson L. Garfinkel 1.5k Jan 4, 2023
FileServer - A multithreaded client-server program that uses Java Sockets to establish TCP/IP connection

A multithreaded client-server program that uses Java Sockets to establish TCP/IP connection. The server allows multiple clients to upload, retrieve and delete files on/from the server.

Lokesh Bisht 3 Nov 13, 2022
IoT Platform, Device management, data collection, processing and visualization, multi protocol, rule engine, netty mqtt client

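GIoT GIoT: GIoT is an open-source IoT platform that supports device management, thing models, product and device management, a rule engine, multiple storage backends, multiple sinks, multiple protocols (HTTP, MQTT, TCP, custom protocols), multi-tenant management, and more, and offers plugin-based development. Documentation Quick Start Module -> giot-starte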

gerry 34 Sep 13, 2022
A Java library that implements a ByteChannel interface over SSLEngine, enabling easy-to-use (socket-like) TLS for Java applications.

TLS Channel TLS Channel is a library that implements a ByteChannel interface over a TLS (Transport Layer Security) connection. It delegates all crypto

Mariano Barrios 149 Dec 31, 2022
Java library for representing, parsing and encoding URNs as in RFC2141 and RFC8141

urnlib Java library for representing, parsing and encoding URNs as specified in RFC 2141 and RFC 8141. The initial URN RFC 2141 of May 1997 was supers

SLUB 24 May 10, 2022
Pcap editing and replay tools for *NIX and Windows - Users please download source from

Tcpreplay Tcpreplay is a suite of GPLv3 licensed utilities for UNIX (and Win32 under Cygwin) operating systems for editing and replaying network traff

AppNeta, Inc. 956 Dec 30, 2022
Android application allowing to sniff and inject Zigbee, Mosart and Enhanced ShockBurst packets on a Samsung Galaxy S20

RadioSploit 1.0 This Android application allows to sniff and inject Zigbee, Mosart and Enhanced ShockBurst packets from a Samsung Galaxy S20 smartphon

Romain Cayre 52 Nov 1, 2022
Telegram API Client and Telegram Bot API library and framework in pure Java.

Javagram Telegram API Client and Telegram Bot API library and framework in pure Java. Hello Telegram You can use Javagram for both Telegram API Client

Java For Everything 3 Oct 17, 2021
Fibers and actors for web development

COMSAT Scalable, Concurrent Web Apps Getting started Add the following Maven/Gradle dependencies: Feature Artifact Servlet integration for defining fi

Parallel Universe 600 Dec 23, 2022
An annotation-based Java library for creating Thrift serializable types and services.

Drift Drift is an easy-to-use, annotation-based Java library for creating Thrift clients and serializable types. The client library is similar to JAX-

null 225 Dec 24, 2022
ssh, scp and sftp for java

sshj - SSHv2 library for Java To get started, have a look at one of the examples. Hopefully you will find the API pleasant to work with :) Getting SSH

Jeroen van Erp 2.2k Jan 8, 2023
Efficient reliable UDP unicast, UDP multicast, and IPC message transport

Aeron Efficient reliable UDP unicast, UDP multicast, and IPC message transport. Java and C++ clients are available in this repository, and a .NET clie

Real Logic 6.3k Dec 27, 2022
LINE 4.1k Dec 31, 2022
Square’s meticulous HTTP client for the JVM, Android, and GraalVM.

OkHttp See the project website for documentation and APIs. HTTP is the way modern applications network. It’s how we exchange data & media. Doing HTTP

Square 43.4k Jan 9, 2023