MQTT broker(java实现高性能的、可扩展、支持集群)

Overview

image SMQTT是一款开源的MQTT消息代理Broker,

SMQTT基于Netty开发,底层采用Reactor3反应堆模型,支持单机部署,支持容器化部署,具备低延迟,高吞吐量,支持百万TCP连接,同时支持多种协议交互,是一款非常优秀的消息中间件!

smqtt目前拥有的功能如下:

架构图

  1. 消息质量等级实现(支持qos0,qos1,qos2)
  2. topicFilter支持
    • topic分级(test/test)
    • +支持(单层匹配)
    • #支持(多层匹配)
  3. 会话消息
    • 默认内存存储
    • 支持持久化(redis/db)
  4. 保留消息
    • 默认内存存储
    • 支持持久化(redis/db)
  5. 遗嘱消息

    设备掉线时候触发

  6. 客户端认证
    • 支持spi注入外部认证
  7. tls加密
    • 支持tls加密(mqtt端口/http端口)
  8. websocket协议支持

    使用websocket协议包装mqtt协议

  9. http协议交互
    • 支持http接口推送消息
    • 支持spi扩展http接口
  10. SPI接口扩展支持
    • 消息管理接口(会话消息/保留消息管理)
    • 通道管理接口 (管理系统的客户端连接)
    • 认证接口 (用于自定义外部认证)
    • 拦截器 (用户自定义拦截消息)
  11. 集群支持(gossip协议实现)
  12. 容器化支持

    默认镜像最新tag: 1ssqq1lxr/smqtt

  13. 持久化支持(session 保留消息)
  14. 管理后台

    请参考smqtt文档如何启动管理后台

main方式启动

引入依赖

<!--smqtt依赖 -->
<dependency>
  <groupId>io.github.quickmsg</groupId>
  <artifactId>smqtt-core</artifactId>
  <version>1.0.6</version>
</dependency>
<!--集群依赖 -->
<dependency>
   <artifactId>smqtt-registry-scube</artifactId>
   <groupId>io.github.quickmsg</groupId>
   <version>1.0.6</version>
</dependency>
<!--管理ui依赖 -->
<dependency>
   <artifactId>smqtt-ui</artifactId>
   <groupId>io.github.quickmsg</groupId>
   <version>1.0.6</version> 
</dependency>

阻塞式启动服务:

Bootstrap.builder()
          .rootLevel(Level.INFO)
          .wiretap(false)
          .port(8555)
          .websocketPort(8999)
          .options(channelOptionMap -> { })//netty options设置
          .childOptions(channelOptionMap -> { }) //netty childOptions设置
          .highWaterMark(1000000)
          .reactivePasswordAuth((U, P) -> true)
          .lowWaterMark(1000)
          .ssl(false)
          .sslContext(new SslContext("crt", "key"))
          .isWebsocket(true)
          .httpOptions(Bootstrap.HttpOptions.builder().enableAdmin(true).ssl(false).accessLog(true).build())
          .clusterConfig(
               ClusterConfig.builder()
                                .clustered(false)
                                .port(7773)
                                .nodeName("node-2")
                                .clusterUrl("127.0.0.1:7771,127.0.0.1:7772")
                                .build()
           )
           .build()
           .startAwait();

非阻塞式启动服务:

 
Bootstrap bootstrap = Bootstrap.builder()
          .rootLevel(Level.INFO)
          .wiretap(false)
          .port(8555)
          .websocketPort(8999)
          .options(channelOptionMap -> { })//netty options设置
          .childOptions(channelOptionMap -> { }) //netty childOptions设置
          .highWaterMark(1000000)
          .reactivePasswordAuth((U, P) -> true)
          .lowWaterMark(1000)
          .ssl(false)
          .sslContext(new SslContext("crt", "key"))
          .isWebsocket(true)
          .httpOptions(Bootstrap.HttpOptions.builder().enableAdmin(true).ssl(false).accessLog(true).build())
          .clusterConfig(
               ClusterConfig.builder()
                                .clustered(false)
                                .port(7773)
                                .nodeName("node-2")
                                .clusterUrl("127.0.0.1:7771,127.0.0.1:7772")
                                .build()
           )
           .build()
           .start().block();

jar方式

  1. 下载源码 mvn compile package -Dmaven.test.skip=true -P jar,web
  在smqtt-bootstrap/target目录下生成jar
  1. 准备配置文件 config.properties
    
# 日志级别 ALL|TRACE|DEBUG|INFO|WARN|ERROR|OFF
smqtt.log.level=INFO
# 开启tcp端口
smqtt.tcp.port=1883
# 高水位
smqtt.tcp.lowWaterMark=4000000
# 低水位
smqtt.tcp.highWaterMark=80000000
# 开启ssl加密
smqtt.tcp.ssl=false
# 证书crt smqtt.tcp.ssl.crt =
# 证书key smqtt.tcp.ssl.key =
# 开启日志
smqtt.tcp.wiretap=false
# boss线程
smqtt.tcp.bossThreadSize=4
# work线程
smqtt.tcp.workThreadSize=8
# websocket端口
smqtt.websocket.port=8999
# websocket开启
smqtt.websocket.enable=true
# smqtt用户
smqtt.tcp.username=smqtt
# smqtt密码
smqtt.tcp.password=smqtt
# 开启http
smqtt.http.enable=true
# 开启http日志
smqtt.http.accesslog=true
# 开启ssl
smqtt.http.ssl.enable=false
# smqtt.http.ssl.crt =
# smqtt.http.ssl.key =
# 开启管理后台(必须开启http)
smqtt.http.admin.enable=true
# 管理后台登录用户
smqtt.http.admin.username=smqtt
# 管理后台登录密码
smqtt.http.admin.password=smqtt
# 开启集群
smqtt.cluster.enable=false
# 集群节点地址
smqtt.cluster.url=127.0.0.1:7771,127.0.0.1:7772
# 节点端口
smqtt.cluster.port=7771
# 节点名称
smqtt.cluster.node=node-1
# 容器集群映射主机
# smqtt.cluster.external.host = localhost
# 容器集群映射port
smqtt.cluster.external.port
  1. 启动服务
  java -jar smqtt-bootstrap-1.0.1-SNAPSHOT.jar <conf.properties路径>

docker 方式

拉取镜像

# 拉取docker镜像地址
docker pull 1ssqq1lxr/smqtt:latest

启动镜像默认配置

# 启动服务
docker run -it  -p 1883:1883 1ssqq1lxr/smqtt

启动镜像使用自定义配置(同上准备配置文件conf.properties)

# 启动服务
docker run -it  -v <配置文件路径目录>:/conf -p 1883:1883  -p 1999:1999 1ssqq1lxr/smqtt

测试服务(启动http端口)

  • 启动客户端订阅主题 test/+

  • 使用http接口推送mqtt消息

# 推送消息
curl -H "Content-Type: application/json" -X POST -d '{"topic": "test/teus", "qos":2, "retain":true, "message":"我来测试保留消息3" }' "http://localhost:1999/smqtt/publish"

管理后台(60000端口)

如何开启

  • main启动

    设置httpOptions && enableAdmin = true

    Bootstrap.httpOptions(Bootstrap.HttpOptions.builder().enableAdmin(true).ssl(false).accessLog(true).build())
    
    
  • jar / docker 启动

    设置config.properties

    # 开启http
    smqtt.http.enable=true
    # 开启http日志
    smqtt.http.accesslog=true
    # 开启ssl
    smqtt.http.ssl.enable=false
    # smqtt.http.ssl.crt =
    # smqtt.http.ssl.key =
    # 开启管理后台(必须开启http)
    smqtt.http.admin.enable=true
    # 管理后台登录用户
    smqtt.http.admin.username=smqtt
    # 管理后台登录密码
    smqtt.http.admin.password=smqtt  
    

页面预览

image

压测文档

点这里

wiki地址

集群类配置参考文档:

smqtt文档

License

Apache License, Version 2.0

麻烦关注下公众号!

image

  • 添加微信号Lemon877164954,拉入smqtt官方交流群
  • 加入qq群 700152283
You might also like...

Rqueue aka Redis Queue [Task Queue, Message Broker] for Spring framework

Rqueue aka Redis Queue [Task Queue, Message Broker] for Spring framework

Rqueue: Redis Queue, Task Queue, Scheduled Queue for Spring and Spring Boot Rqueue is an asynchronous task executor(worker) built for spring and sprin

Jan 5, 2023
Comments
  • mqtt over websocket的8999的端口如何连接?

    mqtt over websocket的8999的端口如何连接?

    大佬您好,1883mqtt的可以连接发送消息,mqtt over websocket 的有什么客户端可以测试么?网上找的mqtt over websocket连接界面,可以连接其他人的mqtt over websocket,连接123.57.69.210:8999的会失败,请问知道是哪块配置错了吗?或者这块有什么注意事项或者推荐吗

    opened by aboluo-gy 4
  • Update MqttChannel.java

    Update MqttChannel.java

    Reduce the generated IDs repetition probability,optimize the generation policy of this method (com.fairland.aiot.mqtt.common.channel.MqttChannel#generateId).

    opened by wakafff 1
  • cluster集群创建问题

    cluster集群创建问题

    cluster集群属性配置: cluster: # 集群配置 enable: ${CLUSTER_ENABLE:true} # 集群开关 url: ${CLUSTER_URL:192.168.26.123:7771,192.168.26.124:7771,192.168.25.100:7771} # 启动节点 port: ${CLUSTER_PORT:7771} # 端口 7772 node: ${CLUSTER_NODE:node-1} # 集群节点名称 唯一,123:node-2,124:node-3 namespace: ${CLUSTER_NAMESPACE:topband} # 集群空间,需要一致才能通信 集群以docker方式启动(--net host),192.168.25.100为本机节点IP,DEBUG日志可以看到与123、124建立了连接: DEBUG r.n.resources.NewConnectionProvider - [id:43e77142, L:/192.168.25.100:55297 - R:/192.168.26.123:7771] Connected new channel DEBUG r.n.resources.NewConnectionProvider - [id:43e77142, L:/192.168.25.100:55297 - R:/192.168.26.123:7771] onStateChange([connected], SimpleConnection{channel=[id: 0x43e77142, L:/192.168.25.100:55297 - R:/192.168.26.123:7771]}) DEBUG r.n.resources.NewConnectionProvider - [id:43e77142, L:/192.168.25.100:55297 - R:/192.168.26.123:7771] onStateChange([configured], ChannelOperations{SimpleConnection{channel=[id: 0x43e77142, L:/192.168.25.100:55297 - R:/192.168.26.123:7771]}}) DEBUG r.n.resources.NewConnectionProvider - [id:46eb97b2, L:/192.168.25.100:55298 - R:/192.168.26.124:7771] Connected new channel DEBUG r.n.resources.NewConnectionProvider - [id:46eb97b2, L:/192.168.25.100:55298 - R:/192.168.26.124:7771] onStateChange([connected], SimpleConnection{channel=[id: 0x46eb97b2, L:/192.168.25.100:55298 - R:/192.168.26.124:7771]}) DEBUG r.n.resources.NewConnectionProvider - [id:46eb97b2, L:/192.168.25.100:55298 - R:/192.168.26.124:7771] onStateChange([configured], ChannelOperations{SimpleConnection{channel=[id: 0x46eb97b2, L:/192.168.25.100:55298 - R:/192.168.26.124:7771]}})

    本地机器ScubeClusterRegistry类的spreadMessage()方法向集群传播消息时,打印cluster.otherMembers(): INFO i.g.q.registry.ScubeClusterRegistry - cluster: [topband:[email protected]:7771] 而124机器上打印cluster.otherMembers():[],为空 请问这是怎么回事?

    opened by airdive 0
  • 遇到空指针异常

    遇到空指针异常

    12:13:48.153 [business-io-1262] ERROR i.g.q.core.protocol.PublishProtocol - error
    java.lang.NullPointerException: null
    	at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1693)
    	at io.github.quickmsg.core.spi.DefaultMessageRegistry.saveSessionMessage(DefaultMessageRegistry.java:32)
    	at io.github.quickmsg.core.protocol.PublishProtocol.filterOfflineSession(PublishProtocol.java:128)
    	at io.github.quickmsg.core.protocol.PublishProtocol.lambda$send$0(PublishProtocol.java:102)
    	at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:178)
    	at java.base/java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1707)
    	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
    	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
    	at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
    	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    	at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
    	at io.github.quickmsg.core.protocol.PublishProtocol.send(PublishProtocol.java:109)
    	at io.github.quickmsg.core.protocol.PublishProtocol.parseProtocol(PublishProtocol.java:68)
    	at io.github.quickmsg.common.protocol.Protocol.lambda$doParseProtocol$0(Protocol.java:27)
    	at reactor.core.publisher.MonoDeferContextual.subscribe(MonoDeferContextual.java:47)
    	at reactor.core.publisher.Mono.subscribe(Mono.java:4338)
    	at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.run(MonoSubscribeOn.java:126)
    	at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
    	at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
    	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    	at java.base/java.lang.Thread.run(Thread.java:833)
    
    opened by feimenggo 7
Releases(1.1.4)
Owner
quickmsg
quick message transport!
quickmsg
An Open-Source, Distributed MQTT Broker for IoT.

简体中文 | English MMQ broker MMQ broker 是一款完全开源,高度可伸缩,高可用的分布式 MQTT 消息服务器,适用于 IoT、M2M 和移动应用程序。 MMQ broker 完整支持MQTT V3.1 和 V3.1.1。 特征 分布式MQTT服务 万级连接数并发(3台

Solley 60 Dec 15, 2022
Rqueue aka Redis Queue [Task Queue, Message Broker] for Spring framework

Rqueue: Redis Queue, Task Queue, Scheduled Queue for Spring and Spring Boot Rqueue is an asynchronous task executor(worker) built for spring and sprin

Sonu Kumar 221 Jan 5, 2023
An Open-Source, Distributed MQTT Message Broker for IoT.

MMQ broker MMQ broker 是一款完全开源,高度可伸缩,高可用的分布式 MQTT 消息服务器,适用于 IoT、M2M 和移动应用程序。 MMQ broker 完整支持MQTT V3.1 和 V3.1.1。 安装 MMQ broker 是跨平台的,支持 Linux、Unix、macOS

Solley 60 Dec 15, 2022
Hi, Spring fans! In this installment we look Spring Integration's support for MQTT and the HiveMQ broker

Spring Integration MQTT & HiveMQ Hi, Spring fans! In this installment we look Spring Integration's support for MQTT and the HiveMQ broker. I'm joined

Spring Tips 5 Nov 21, 2022
An Open-Source, Distributed MQTT Broker for IoT.

简体中文 | English MMQ broker MMQ broker 是一款完全开源,高度可伸缩,高可用的分布式 MQTT 消息服务器,适用于 IoT、M2M 和移动应用程序。 MMQ broker 完整支持MQTT V3.1 和 V3.1.1。 特征 分布式MQTT服务 万级连接数并发(3台

Solley 60 Dec 15, 2022
IoT Platform, Device management, data collection, processing and visualization, multi protocol, rule engine, netty mqtt client

GIoT GIoT: GIoT是一个开源的IoT平台,支持设备管理、物模型,产品、设备管理、规则引擎、多种存储、多sink、多协议(http、mqtt、tcp,自定义协议)、多租户管理等等,提供插件化开发 Documentation Quick Start Module -> giot-starte

gerry 34 Sep 13, 2022
ZapIt - An advanced MQTT client made for the modern age

ZapIt An advanced MQTT client made for the modern age. Documentation Please refer to the Wiki Section. Installing Go to the Releases section and downl

chocoearly44 4 Oct 8, 2022
Fast and reliable message broker built on top of Kafka.

Hermes Hermes is an asynchronous message broker built on top of Kafka. We provide reliable, fault tolerant REST interface for message publishing and a

Allegro Tech 742 Jan 3, 2023
Android app for Ribbit, Broker API Reference App

Ribbit Reference Implementation (Android) The reference implementation for designing the Android user interface of a broker-dealer trading application

Alpaca 12 Nov 24, 2022